In [1]:
#Important libraries
import pandas as pd
import numpy as np
import random
import duckdb
import yaml
from datetime import datetime
import logging

### Extracting relevant info to build the contract

In [None]:
# Load the working dataset from mejorado.csv and display it
df = pd.read_csv("mejorado.csv")
df

An age column (`ep_edad`) is added to the dataset as an example field for the contract. Its values are random integers in the range 18–100.

In [None]:
# DO NOT RUN AGAIN: the generated column is already saved in mejorado.csv.
# One random age in [18, 100] per row. Sizing by len(df) instead of a fixed
# 1000 keeps the assignment valid whatever the number of rows in the file
# (assign() raises when the list length differs from the frame length).
age = [random.randint(18, 100) for _ in range(len(df))]

df = df.assign(ep_edad=age)
df

In [None]:
# DO NOT RUN AGAIN: overwrites mejorado.csv with the frame that now has ep_edad.
# index=False keeps the positional index out of the file; without it each
# save/load round trip adds another "Unnamed: 0" column to the data.
df.to_csv('mejorado.csv', index=False)

In [None]:
# Sorted distinct values present in ep_tipo_exp
np.unique(df['ep_tipo_exp'])

In [None]:
# Sorted distinct values present in ep_estado
print(np.unique(df['ep_estado']))

In [None]:
# Sorted distinct values present in ep_ubicacion
print(np.unique(df['ep_ubicacion']))

In [None]:
# Summary statistics (count, mean, min, max, quartiles) of the age column
df['ep_edad'].describe()

### Contract example using the four variables described earlier

In [2]:
# Send log records to ContractFiles.log (opened in write mode), each line
# prefixed with its timestamp
logging.basicConfig(
    filename="ContractFiles.log",
    filemode='w',
    format='%(asctime)s %(message)s',
)

# Root logger, with every level from DEBUG upwards enabled
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Establishing the connection with the DuckDB database file
conn = duckdb.connect('file.db')

In [3]:
# Create the findings table if it is not there yet, then display its contents.
# IF NOT EXISTS makes the cell safe to re-run without a bare except, which
# would also have reported unrelated failures (closed connection, SQL syntax
# errors, ...) as "table already exists".
conn.sql("""CREATE TABLE IF NOT EXISTS resultados(fecha_val_contrato DATETIME,
        contract_id INTEGER,
        table_name VARCHAR(100),
        type_error VARCHAR(50),
        column_name VARCHAR(200))""")

conn.table('resultados').show()

resultados table already exists
┌─────────────────────┬─────────────┬────────────┬───────────────────────────────┬──────────────┐
│ fecha_val_contrato  │ contract_id │ table_name │          type_error           │ column_name  │
│      timestamp      │    int32    │  varchar   │            varchar            │   varchar    │
├─────────────────────┼─────────────┼────────────┼───────────────────────────────┼──────────────┤
│ 2023-11-02 12:02:12 │  1234567890 │ tester.csv │ Different type vals           │ ep_tipo_exp  │
│ 2023-11-02 12:02:12 │  1234567890 │ tester.csv │ Unidentified categorical vals │ ep_tipo_exp  │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Unidentified categorical vals │ ep_estado    │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Null values present           │ ep_estado    │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Different type vals           │ ep_ubicacion │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Unidentified categorical vals │ ep_

In [4]:
# This is the CSV that will be passed to the validator; DuckDB reads the file
# directly from its quoted name and the result is converted to a DataFrame
result = duckdb.query('SELECT * FROM "tester.csv"').to_df()
result.head(125)

Unnamed: 0,ep_operacion,ep_expediente,ep_tipo_exp,ep_estado,ep_ubicacion,ep_cliente,ep_descripcion,ep_ult_mod,ep_edad
0,1,50041781,C1,N,5,704501,traslado por actualizacion de cambios en segur...,56:08.6,28
1,2,20037263,C1,N,5,280988,Crédito cancelado,20:18.0,87
2,3,10254701,C1,N,5,264989,Expediente cancelado con bolsa de testimonio e...,52:17.1,54
3,4,10255729,C1,N,5,145769,Expediente y Testimonio original,40:31.3,47
4,5,50014116,C2,N,22,480545,Creacion de etiqueta para expediente,46:11.7,27
...,...,...,...,...,...,...,...,...,...
120,1715,10321126,C1,N,5,744224,Expediente y Testimonio original,35:24.1,28
121,1716,10321504,85,N,5,116229,Expediente y Testimonio original,35:24.3,85
122,1717,10320512,C1,N,5,1017704,Expediente y Testimonio original,10:58.5,52
123,1763,10238317,C1,N,5,237705,Crédito cancelado,32:27.7,59


In [10]:
# Parse the contract definition stored in Contract.yml into Python objects
with open('Contract.yml', 'rb') as contract_file:
    conf = yaml.safe_load(contract_file)

In [5]:
def SQLWriter(contract, table, problem, column):
    """
    Records one contract finding as a new row in the RESULTADOS table.

    The values are bound as query parameters instead of being pasted into the
    SQL text, so a quote inside a table, column or problem name can neither
    break the statement nor inject SQL.
     Args:
        contract (int): The contract ID that failed
        table (str): The table that has a problem
        problem (str): The type of problem detected
        column (str): The column that is being affected by the problem
    Returns:
        1 when the execution is over
    """

    # Second precision, like the '%Y-%m-%d %H:%M:%S' strings stored before
    validated_at = datetime.now().replace(microsecond=0)

    insert_query = """
    INSERT INTO resultados
    (fecha_val_contrato, contract_id, table_name, type_error, column_name)
    VALUES (?, ?, ?, ?, ?)
    """

    # Uses the notebook-level DuckDB connection `conn`
    conn.execute(insert_query, [validated_at, contract, table, problem, column])
    return 1
    

In [7]:
def _is_missing(val):
    """True for the null markers a Series may yield: None, NaN or pd.NA."""
    return val is None or val is pd.NA or (isinstance(val, float) and val != val)


def tipo(arr, typ):
    """
    Receives an array and verifies that all the values in
    the array have the type they are expected to have.

    For 'str' every value must be a string that is not purely numeric;
    missing values are skipped. A Series holding non-string values now
    yields False instead of raising AttributeError from the .str accessor.
     Args:
        arr (pd.Series): The values to verify
        typ (str): the type that the whole array needs to have
                   ('str', 'int', 'float' or 'bool')
    Returns:
        True if all the values are correct, False if any value is wrong,
        None when typ is not one of the supported names
    """

    # NOTE(review): 'bool' has always been validated exactly like 'str'
    # (no numeric strings allowed); confirm how boolean columns are encoded
    # before making this branch stricter.
    if typ in ('str', 'bool'):
        for val in arr:
            if _is_missing(val):
                continue
            if not isinstance(val, str) or val.isnumeric():
                return False
        return True
    elif typ == 'int':
        return all(isinstance(x, int) for x in arr)
    elif typ == 'float':
        return all(isinstance(x, float) for x in arr)

    # Unknown logical type: nothing can be verified
    return None

In [8]:
# Contract enforcer
def _distinct_values(columna, tabla):
    """Returns the distinct values of one column of the table as a list."""
    rows = duckdb.query(f'SELECT DISTINCT {columna} FROM {tabla}').fetchall()
    return [row[0] for row in rows]


def _check_types(contract_id, tabla, columna, distinct_vals, tipos):
    """Logs and records a finding when the values fail the tipo() check."""
    # Only an explicit False is a failure; any other result is not reported
    if tipo(pd.Series(distinct_vals), tipos) is False:
        logger.warning("Col %s values distinct type from %s", columna, tipos)
        SQLWriter(contract_id, tabla, "Different type vals", columna)


def verifier(yaml):
    """
    Receives the parsed YAML contract with the table and columns to verify.
    The general findings are stored in a table called RESULTADOS (i.e. if a
    column has atypical values or doesn't exist) and the specific problems in
    a .log file (i.e. the atypical values per column). The logging is done
    here and the INSERT is done by SQLWriter, which is called from here.

    Column and table names are interpolated into the SQL text as written in
    the contract, so the contract file must come from a trusted source.
     Args:
        yaml (dict): All the information of the contract: 'contractId',
                     'tableName' and 'columns', where each column entry has
                     'column', 'values', 'logicalType', 'isCategorical' and
                     'isNullable'. The name shadows the yaml module inside
                     this function only.
    Returns:
        1 when the execution is over
    """

    contract_id = yaml["contractId"]
    tabla = yaml["tableName"]

    logger.info("Contract ID: %s", contract_id)

    # Cycle to move through all the columns that the contract defines
    for spec in yaml['columns']:

        columna = spec['column']
        valores = spec['values']
        tipos = spec['logicalType']
        es_categorica = spec['isCategorical']

        # Whether the column could be queried; gates the null check below
        exists = True

        try:
            # Types verifier (shared by categorical and non categorical columns)
            listData = _distinct_values(columna, tabla)
            _check_types(contract_id, tabla, columna, listData, tipos)

            if es_categorica:
                # Unique values verifier: anything outside the allowed set
                desconocidos = list(set(listData).difference(valores))
                if len(desconocidos) == 0:
                    logger.info("Column %s correct", columna)
                else:
                    logger.warning("Col %s unknown values: %s", columna, desconocidos)
                    SQLWriter(contract_id, tabla, "Unidentified categorical vals", columna)
            else:
                # Min/Max values verifier: valores holds [min, max]
                fuera = duckdb.query(f'''SELECT {columna} FROM {tabla}
                                    WHERE {columna} < {valores[0]} OR {columna} > {valores[1]}''').fetchall()
                if len(fuera) != 0:
                    logger.warning("Col %s wrong vals: %s", columna, fuera)
                    SQLWriter(contract_id, tabla, "Values out of bounds", columna)
                else:
                    logger.info("Column %s correct", columna)
        except Exception as exc:
            # Any failure while querying or checking is recorded as a missing
            # column; the underlying error goes to the log so that other
            # causes can still be diagnosed
            logger.error("Column %s doesnt exist", columna)
            logger.debug("Underlying error for column %s: %r", columna, exc)
            SQLWriter(contract_id, tabla, "Column doesnt exist", columna)
            exists = False

        # Checking for nulls, using the nullability of THIS column (the flag
        # of the first column used to be applied to every column)
        if exists and spec['isNullable'] is False:
            nulls = duckdb.query(f'''select {columna} from {tabla} 
                        WHERE {columna} IS NULL''').fetchall()
            if len(nulls) != 0:
                logger.warning("Column %s have nulls", columna)
                SQLWriter(contract_id, tabla, "Null values present", columna)
            else:
                logger.info("No Null values in %s", columna)

    logger.info("---------------------------------------------------------")

    return 1

In [11]:
# Run the contract check, then display the rows stored in resultados
verifier(conf)
conn.table('resultados').show()

┌─────────────────────┬─────────────┬────────────┬───────────────────────────────┬──────────────┐
│ fecha_val_contrato  │ contract_id │ table_name │          type_error           │ column_name  │
│      timestamp      │    int32    │  varchar   │            varchar            │   varchar    │
├─────────────────────┼─────────────┼────────────┼───────────────────────────────┼──────────────┤
│ 2023-11-02 12:02:12 │  1234567890 │ tester.csv │ Different type vals           │ ep_tipo_exp  │
│ 2023-11-02 12:02:12 │  1234567890 │ tester.csv │ Unidentified categorical vals │ ep_tipo_exp  │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Unidentified categorical vals │ ep_estado    │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Null values present           │ ep_estado    │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Different type vals           │ ep_ubicacion │
│ 2023-11-02 12:02:13 │  1234567890 │ tester.csv │ Unidentified categorical vals │ ep_ubicacion │
│ 2023-11-02 12:02:1

In [12]:
# Close the DuckDB connection opened on file.db
conn.close()