# IMPORT LIBS

In [1]:
import json
import pandas as pd
import pymongo as mongo
from itertools import compress
import glob
import os

In [None]:
from configparser import ConfigParser

# Load the Data Quality settings from the INI file.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result is turned into an explicit
# error here instead of a later, opaque KeyError on the missing section.
CONFIG = ConfigParser()
if not CONFIG.read('../config/dq.ini'):
    raise FileNotFoundError("Configuration file not found or unreadable: '../config/dq.ini'")
MONGO_SERVER = CONFIG["MONGO_SERVER"]

# Connection settings read from the [MONGO_SERVER] section.
HOST = MONGO_SERVER['MONGO_HOST']
PORT = int(MONGO_SERVER['MONGO_PORT'])  # INI values are strings; the port is converted to int
DB = MONGO_SERVER['MONGO_DB']
COL = MONGO_SERVER['MONGO_COLLECTIONS']

In [None]:
# MongoDB client shared by the cells below, built from the configured host and port.
client = mongo.MongoClient(HOST, PORT)

# DEFINE FUNCTIONS

In [2]:
def openValidation(jsonPath: str) -> dict:
    """Load a validation result JSON file and return its parsed content.

    The file is decoded explicitly as UTF-8 so the result does not depend on
    the platform's default locale encoding.

    Args:
        jsonPath: Path of the JSON file to read.

    Returns:
        The deserialized JSON document.

    Raises:
        Exception: Any error raised while opening or parsing the file is
            reported on stdout and then re-raised unchanged.
    """
    try:
        with open(jsonPath, 'r', encoding='utf-8') as result_json:
            validation = json.load(result_json)
    except Exception as e:
        print(f'ERROR - READING VALIDATION FILE - {jsonPath}: \n {e}')
        raise

    # Announce success only once the content has actually been read and parsed.
    print('FILE OPENED WITH SUCCESS !')
    return validation

def _formatExpectation(expectation: dict) -> dict:
    """Flatten one entry of the validation 'results' list into a summary dict."""
    config = expectation['expectation_config']
    outcome = expectation['result']

    # Build a filtered copy of the kwargs rather than deleting 'batch_id' in
    # place: the caller's document is left untouched, and a missing 'batch_id'
    # (or missing kwargs) no longer raises.
    kwargs = config.get('kwargs')
    if kwargs is None:
        rule_detail = None
    else:
        rule_detail = {key: value for key, value in kwargs.items() if key != 'batch_id'}

    return {
        'rule': config.get('expectation_type'),
        'rule_detail': rule_detail,
        'rows_evaluated': outcome.get('element_count'),
        'unexpected_count': outcome.get('unexpected_count'),
        'unexpected_percent': outcome.get('unexpected_percent'),
        'rule_result': expectation.get('success'),
    }


def formatResults(validationJSON: dict) -> dict:
    """Summarize a validation result document into a flat, storable dict.

    Reads the 'meta', 'results', 'statistics' and 'success' entries of
    ``validationJSON``. Optional leaf values that are absent come back as
    ``None``. The 'batch_id' entry is dropped from every rule's kwargs.

    Args:
        validationJSON: Parsed validation result document.

    Returns:
        A new dict with the suite/checkpoint/run identification, one entry per
        evaluated expectation under 'results', the run 'statistics' and the
        overall 'expectation_result'. The input document is not modified.

    Raises:
        KeyError: If a required section ('meta', 'active_batch_definition',
            'batch_spec', 'run_id', 'results', 'statistics', 'success', or a
            result's 'expectation_config' / 'result') is missing.
    """
    meta = validationJSON['meta']
    run_id = meta['run_id']
    statistics = validationJSON['statistics']

    return {
        'expectation_suite_name': meta.get('expectation_suite_name'),
        'checkpoint_name': meta.get('checkpoint_name'),
        'data_asset_name': meta['active_batch_definition'].get('data_asset_name'),
        'path_file': meta['batch_spec'].get('path'),
        'run_name': run_id.get('run_name'),
        'run_time': run_id.get('run_time'),

        'results': [
            _formatExpectation(expectation)
            for expectation in validationJSON['results']
        ],

        'statistics': {
            'evaluated_expectations': statistics.get('evaluated_expectations'),
            'success_percent': statistics.get('success_percent'),
            'successful_expectations': statistics.get('successful_expectations'),
            'unsuccessful_expectations': statistics.get('unsuccessful_expectations'),
        },

        'expectation_result': validationJSON['success'],
    }

def registerValidation(mongoClient, databaseName: str, collectionName: str, formatedResults: dict) -> None:
    """Insert a formatted validation document into a MongoDB collection.

    The operation is best-effort: a failure is reported on stdout and never
    propagated to the caller. Processing stops at the first failing step, so a
    single problem produces a single error message instead of a cascade of
    follow-up errors about unbound handles.

    Args:
        mongoClient: Client object exposing ``get_database(name)``.
        databaseName: Name of the target database.
        collectionName: Name of the target collection.
        formatedResults: Document to insert.

    Returns:
        None, whether or not the insert succeeded.
    """
    try:
        db = mongoClient.get_database(databaseName)
    except Exception as e:
        print(f'ERROR - MONGO CONNECITON - GET DATABASE: \n {e}')
        return  # no database handle: the remaining steps cannot run

    try:
        dbColl = db.get_collection(collectionName)
    except Exception as e:
        print(f'ERROR - MONGO CONNECITON - GET COLLECTION: \n {e}')
        return  # no collection handle: nothing to insert into

    try:
        dbColl.insert_one(formatedResults)
    except Exception as e:
        print(f'ERROR - MONGO CONNECITON - SAVING RESULTS: \n {e}')

# RUN THE PROCESS

In [4]:
# Collect every validation JSON under the Data Quality output tree and keep the
# most recent one according to os.path.getctime.
val = glob.glob('../output/validations/DataQuality/**/*.json', recursive=True)
if not val:
    # max() on an empty list raises an uninformative ValueError; report the
    # actual problem (nothing matched the pattern) instead.
    raise FileNotFoundError(
        "No validation JSON found under '../output/validations/DataQuality/'"
    )
latest_file = max(val, key=os.path.getctime)

In [6]:
# Single-file run: load the most recent validation file selected in the previous cell.
validationJSON = openValidation(latest_file)

# Reduce the raw validation document to the summary structure that gets stored.
formatedResults = formatResults(validationJSON)

# Store the summary in the database/collection named in the configuration.
registerValidation(client, DB, COL, formatedResults)

FILE OPENED WITH SUCCESS !


In [3]:
# Bulk load: walk every validation JSON under the output tree and register each one.
# NOTE(review): the database and collection names are hard-coded here, while the
# single-file cell passes DB and COL from the configuration — confirm which
# target is intended.
for validation in glob.iglob('../output/validations/DataQuality/**/*.json', recursive=True):
    validationJSON = openValidation(validation)

    formatedResults = formatResults(validationJSON)
    
    registerValidation(client, 'greatExpectations', 'expectationsResults', formatedResults)

FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !
FILE OPENED WITH SUCCESS !


# TESTS

In [4]:
# Scratch input: parse the sample validation document straight from the file handle.
with open('result.json', 'r') as result_json:
    result = json.load(result_json)

In [5]:
# Build the summary through the shared helper instead of repeating its body
# inline, so this scratch cell cannot drift away from formatResults.
geResult = formatResults(result)

In [6]:
# Handles used by the manual insert in the next cell.
# NOTE(review): this targets the 'expectations' collection, whereas the bulk-load
# loop writes to 'expectationsResults' — confirm which one is intended.
db = client.get_database('greatExpectations')
dbColl = db.get_collection('expectations')

In [7]:
# Manually store the sample summary document in the collection selected above.
dbColl.insert_one(geResult)

<pymongo.results.InsertOneResult at 0x20f2b898e40>