In [22]:
# Clear the interactive namespace before re-running the notebook; in this form
# IPython asks for confirmation first (see the y/[n] prompt in the output below).
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])?  y


###################################################################       
#Script Name    :                                                                                              
#Description    :                                                                                 
#Args           :                                                                                           
#Author         : Nor Raymond                                                
#Email          : nraymond@appen.com                                          
###################################################################

In [1]:
import os, sys
import pandas as pd
import numpy as np
import yaml
from IPython.core.display import display, HTML

In [2]:
# Function to load yaml configuration file
def load_config(config_name):
    """Parse a YAML configuration file and return its contents.

    The file is looked up as ``<config_path>/<config_name>``, where
    ``config_path`` is the module-level setting; the path is therefore
    resolved against the current working directory when ``config_path``
    is relative.

    Args:
        config_name: file name of the YAML document, e.g. ``"catalog.yml"``.

    Returns:
        Whatever ``yaml.safe_load`` produces for the document.
    """
    config_file = os.path.join(config_path, config_name)
    with open(config_file, 'r') as handle:
        return yaml.safe_load(handle)

config_path = "conf/base"

# Locate the project root. First assume the current working directory already
# contains `conf/base/catalog.yml`; if that attempt fails for any reason, step
# up one directory and retry once.
try:

    # load yaml catalog configuration file
    config = load_config("catalog.yml")

    os.chdir(config["project_path"])
    root_path = os.getcwd()

except Exception:
    # `Exception` instead of a bare `except:` so that KeyboardInterrupt and
    # SystemExit are not swallowed by the fallback. A failure in the retry
    # below is intentionally left to propagate.
    os.chdir('..')
    # load yaml catalog configuration file
    config = load_config("catalog.yml")

    os.chdir(config["project_path"])
    root_path = os.getcwd()

### Functions to initialize data ingestion

In [3]:
def raw_file_checker(files):
    """Check that a raw file exists for each of RC, Vocab_1 and Vocab_2.

    A file name matches a keyword when the keyword occurs anywhere in the
    name (case-sensitive substring test). When several file names match the
    same keyword, the last one in ``files`` wins.

    The check passes only when every keyword was matched at least once.
    Counting the number of matches is not sufficient: two 'RC' files plus one
    'Vocab_1' file give three matches while 'Vocab_2' is still missing.

    Args:
        files: iterable of file names (not full paths).

    Returns:
        (condition, file_exists) where ``condition`` is True when all three
        keywords were found and ``file_exists`` maps each matched keyword to
        its file name.
    """
    keyword = ['RC', 'Vocab_2', 'Vocab_1']
    file_exists = {}
    for fname in files:
        for key in keyword:
            if key in fname:
                file_exists[key] = fname

    # every keyword must have been seen, regardless of how many files matched
    condition = all(key in file_exists for key in keyword)

    if condition:
        print("PASS: All files exists!")
    else:
        print("FAIL: Not all file exists! Please check the raw data folder to ensure RC, Vocab_1 and Vocab_2 file exists.")

    return condition, file_exists


def data_ingestion_initialize(root_path):
    """Resolve the data folders from the catalog and locate the raw workbooks.

    Reads ``<root_path>/<config_path>/catalog.yml`` (``config_path`` is the
    module-level setting), loads the reference column list from the
    "columns_check" sheet of the reference workbook, lists the ``.xls`` files
    in the raw data folder and picks the RC / Vocab_1 / Vocab_2 workbooks
    via ``raw_file_checker``.

    Args:
        root_path: project root directory; all catalog paths are joined to it.

    Returns:
        Tuple of (raw_data_path, out_data_path, ref_path, ref_filepath,
        ref_data, ref_data_cols, files, file_exists, rc_filepath,
        v1_filepath, v2_filepath).

    Raises:
        KeyError: when a catalog key is absent, or when no raw file matched
            one of 'RC', 'Vocab_1', 'Vocab_2' (the message names the missing
            keyword(s) and the folder that was searched).
    """
    # load yaml catalog configuration file (resolved against root_path, not the CWD)
    catalog_file = os.path.join(root_path, config_path, "catalog.yml")
    with open(catalog_file, 'r') as file:
        config = yaml.safe_load(file)

    print("Initialize data ingestion and file checking...")

    # define input and output data paths
    raw_data_path = os.path.join(root_path, config["data_path"]["input"])
    out_data_path = os.path.join(root_path, config["data_path"]["output"])

    # define reference file paths
    ref_path = os.path.join(root_path, config["data_path"]["ref"])
    ref_filepath = os.path.join(ref_path, config["filenames"]["rc_col_ref"])
    # header=None: the sheet has no header row, column 0 holds the expected names
    ref_data = pd.read_excel(io = ref_filepath, sheet_name="columns_check", header=None)
    ref_data_cols = ref_data[0].tolist()

    # get the list of files in raw folder (only legacy .xls workbooks are considered)
    files = [f for f in os.listdir(raw_data_path) if f.endswith('.xls')]

    condition, file_exists = raw_file_checker(files)

    # Fail with an actionable message instead of a bare KeyError('RC') below.
    # KeyError is kept so the exception type seen by callers does not change.
    missing = [key for key in ('RC', 'Vocab_1', 'Vocab_2') if key not in file_exists]
    if missing:
        raise KeyError(
            f"No .xls file found in '{raw_data_path}' for: {', '.join(missing)}"
        )

    ## Define raw data filepaths
    rc_filepath = os.path.join(raw_data_path, file_exists['RC'])
    v1_filepath = os.path.join(raw_data_path, file_exists['Vocab_1'])
    v2_filepath = os.path.join(raw_data_path, file_exists['Vocab_2'])

    return raw_data_path, out_data_path, ref_path, ref_filepath, ref_data, ref_data_cols, files, file_exists, rc_filepath, v1_filepath, v2_filepath

# Run the ingestion set-up once. The names bound here are module-level globals:
# later cells read rc_filepath / v1_filepath / v2_filepath and ref_data_cols directly.
raw_data_path, out_data_path, ref_path, ref_filepath, ref_data, ref_data_cols, files, file_exists, rc_filepath, v1_filepath, v2_filepath = data_ingestion_initialize(root_path)

Initialize data ingestion and file checking...
PASS: All files exists!


### Function to create dataframes

In [26]:
def create_dataframes(file_initial, rc_filepath, v1_filepath , v2_filepath):

    '''
    Load the "Summary", "Data" and "Answer Key" sheets of one raw workbook.

    file_initial choices -
    RC: Reading Comprehension
    Vocab_1: Vocabulary 1
    Vocab_2: Vocabulary 2

    Returns (df_summary, df_summary_cols, df_data, df_data_cols,
    df_ans_key, df_ans_key_cols), where each *_cols is the list of column
    labels of the matching dataframe.

    Raises ValueError when file_initial is not one of the choices above.
    '''

    filepaths = {'RC': rc_filepath, 'Vocab_1': v1_filepath, 'Vocab_2': v2_filepath}
    if file_initial not in filepaths:
        # previously this fell through and crashed later with an unbound `filepath`
        raise ValueError(
            f"Unknown file_initial {file_initial!r}; expected one of {list(filepaths)}"
        )
    filepath = filepaths[file_initial]

    # Passing a list of sheet names returns a dict {sheet name: dataframe},
    # so the workbook is opened and parsed once instead of three times.
    sheets = pd.read_excel(io=filepath, sheet_name=["Summary", "Data", "Answer Key"])

    # create dataframe from 'Summary' sheet
    df_summary = sheets["Summary"]
    df_summary_cols = list(df_summary.columns)

    # create dataframe from 'Data' sheet
    df_data = sheets["Data"]
    df_data_cols = list(df_data.columns)

    # create dataframe from 'Answer Key' sheet
    df_ans_key = sheets["Answer Key"]
    df_ans_key_cols = list(df_ans_key.columns)

    print(f"Dataframe created from {file_initial} file")

    return df_summary, df_summary_cols, df_data, df_data_cols, df_ans_key, df_ans_key_cols

#df_summary, df_summary_cols, df_data, df_data_cols, df_ans_key, df_ans_key_cols = create_dataframes('RC', rc_filepath, v1_filepath , v2_filepath)

### Data integrity scanning functions

In [27]:
class color:
    """ANSI escape sequences used to style text printed to the console."""

    # foreground colours
    PURPLE = '\x1b[95m'
    CYAN = '\x1b[96m'
    DARKCYAN = '\x1b[36m'
    BLUE = '\x1b[94m'
    GREEN = '\x1b[92m'
    YELLOW = '\x1b[93m'
    RED = '\x1b[91m'

    # text attributes
    BOLD = '\x1b[1m'
    UNDERLINE = '\x1b[4m'

    # reset: append after styled text to return to the default rendering
    END = '\x1b[0m'

def print_scan_results(col_condition_num, scan_num, file_initial , sheets = 'Summary'):
    """Print the header line and the coloured PASS/FAIL line for one scan.

    Args:
        col_condition_num: outcome of the scan; truthy prints PASS, falsy FAIL.
        scan_num: scan identifier, 1-7. Scans 3 and 6 share the same wording.
            Any other value prints nothing.
        file_initial: label of the file being scanned ('RC', 'Vocab_1', ...).
            Scan 7 is reported only when this is 'RC'.
        sheets: sheet name shown in the header line.

    Returns:
        None. Output goes to stdout.
    """
    # Both worker-id scans (Summary sheet = 3, Data sheet = 6) use this wording.
    # The column is spelled '_worker_id'; the messages used to print '_workder_id'.
    worker_id_messages = (
        "Checking if '_worker_id' column name is correct ...",
        "valid '_worker_id' column name",
        "invalid '_worker_id' column name",
    )

    # scan number -> (header text, PASS detail, FAIL detail)
    scan_messages = {
        1: (
            "Checking if the sheet contains either 'Language' and 'Market' columns ...",
            "'Summary' sheet contains both 'Language' and 'Market' columns",
            "'Summary' sheet does not contain either 'Language' and 'Market' columns",
        ),
        2: (
            "Checking if 'Language' and 'Market' columns are empty ...",
            "Both 'Language' and 'Market' columns in 'Summary' contains complete data",
            "Both or either 'Language' and 'Market' columns in 'Summary' sheet are empty or incomplete",
        ),
        3: worker_id_messages,
        4: (
            "Checking if sheet contains 'Language' column ...",
            "'Data' sheet contains 'Language' columns",
            "'Data' sheet does not contain 'Language' columns",
        ),
        5: (
            "Checking if 'Language' column are empty ...",
            "'Language' column in 'Data' contains complete data",
            "'Language' column in 'Data' sheet are empty or incomplete",
        ),
        6: worker_id_messages,
        7: (
            "checking if columns in the 'Data' sheet are identical to the reference columns ...",
            "The columns in the 'Data' sheet are identical to the reference",
            "The columns in the 'Data' sheet are not identical to the reference",
        ),
    }

    # The reference-column scan is only reported for the RC file.
    if scan_num == 7 and file_initial != 'RC':
        return

    messages = scan_messages.get(scan_num)
    if messages is None:
        # unknown scan numbers are silently ignored
        return

    header, pass_detail, fail_detail = messages
    print(f"\nSCAN-{scan_num} : {file_initial} - {sheets} : {header}")
    if bool(col_condition_num):
        print(color.GREEN + "PASS" + color.END + ": " + pass_detail)
    else:
        print(color.RED + "FAIL" + color.END + ": " + fail_detail)
            
def summary_col_check(df_summary, df_summary_cols, file_initial , sheets = 'Summary'):
    """SCAN-1: check that the 'Summary' column list has 'Language' and 'Market'.

    Only ``df_summary_cols`` is inspected; the other arguments are accepted
    for signature parity with the other scan helpers.

    Returns:
        (col_condition_1, scan_num): True only when both column names are
        present, and the scan number 1.
    """
    required_columns = ('Language', 'Market')

    # membership flag per required column
    is_present = {name: name in df_summary_cols for name in required_columns}

    # PASS requires both columns
    return all(is_present.values()), 1

def summary_col_value_check(df_summary, file_initial, sheets = 'Summary'):
    """SCAN-2: check that 'Language' and 'Market' in 'Summary' have no nulls.

    Both columns must exist in ``df_summary``; the column-presence scan is
    expected to have passed before this one is called.

    Returns:
        (col_condition_2, scan_num): True only when neither column contains
        a null value, and the scan number 2.
    """
    required_columns = ('Language', 'Market')

    # A list (not a generator) so that both columns are always inspected.
    fully_populated = [
        bool(df_summary[name].notnull().values.all())
        for name in required_columns
    ]

    return all(fully_populated), 2

def col_header_check(df_summary_data, file_initial, sheets):
    """SCAN-3: check the spelling of the worker-id column header.

    The first column whose name contains 'worker' (case-sensitive) is taken
    as the worker-id column. It passes when the name is 10 characters long
    and starts with an underscore, i.e. the '_worker_id' form rather than
    the 9-character 'worker_id' form.

    Any other situation - no such column, or a name of a different shape -
    is reported as a failed check rather than raising.

    Args:
        df_summary_data: dataframe whose column headers are inspected.
        file_initial: unused; kept for signature parity with the other scans.
        sheets: unused; kept for signature parity with the other scans.

    Returns:
        (col_condition_3, scan_num): the check outcome and the scan number,
        which is always 3 (callers reusing this for another sheet override it).
    """
    scan_num = 3

    # First header containing 'worker'; non-string headers are skipped.
    worker_col = next(
        (col for col in df_summary_data.columns
         if isinstance(col, str) and 'worker' in col),
        None,
    )
    if worker_col is None:
        return False, scan_num

    col_condition_3 = len(worker_col) == 10 and worker_col[0] == "_"
    return col_condition_3, scan_num

def data_col_check(df_data, df_data_cols, file_initial, sheets = 'Data'):
    """SCAN-4: check that the 'Data' column list contains 'Language'.

    Only ``df_data_cols`` is inspected; the other arguments are accepted for
    signature parity with the other scan helpers.

    Returns:
        (col_condition_4, scan_num): True when 'Language' is present, and
        the scan number 4.
    """
    has_language = 'Language' in df_data_cols
    return has_language, 4

def data_col_value_check(df_data, file_initial, sheets = 'Data'):
    """SCAN-5: check that the 'Language' column in 'Data' has no nulls.

    The column must exist in ``df_data``; the column-presence scan is
    expected to have passed before this one is called.

    Returns:
        (col_condition_5, scan_num): True when no value in 'Language' is
        null, and the scan number 5.
    """
    language_complete = bool(df_data['Language'].notnull().values.all())
    return language_complete, 5

def data_col_header_check(df_data_cols, ref_data_cols, file_initial, sheets = 'Data'):
    """SCAN-7: check that the 'Data' columns match the reference columns.

    The two collections are compared after sorting, so order does not matter
    but duplicates do. Sorted copies are compared: the inputs are left
    untouched (sorting them in place would reorder the caller's lists).

    Args:
        df_data_cols: column names found in the 'Data' sheet.
        ref_data_cols: expected column names from the reference file.
        file_initial: unused; kept for signature parity with the other scans.
        sheets: unused; kept for signature parity with the other scans.

    Returns:
        (col_condition_7, scan_num): True when both hold the same names, and
        the scan number 7.
    """
    scan_num = 7
    col_condition_7 = sorted(ref_data_cols) == sorted(df_data_cols)
    return col_condition_7, scan_num

def data_integrity_check(df_summary, df_summary_cols, df_data, df_data_cols, file_initial):
    """Run all integrity scans on one file and print a PASS/FAIL report.

    Scans, in order:
      1. 'Summary' has 'Language' and 'Market' columns
      2. those columns hold no nulls (only run when scan 1 passed)
      3. worker-id header of 'Summary' is well-formed
      4. 'Data' has a 'Language' column
      5. that column holds no nulls (only run when scan 4 passed)
      6. worker-id header of 'Data' is well-formed
      7. 'Data' columns match the reference list (only for file_initial 'RC')

    NOTE: scan 7 reads the module-level ``ref_data_cols``; it must have been
    defined before this function is called with file_initial 'RC'.

    Args:
        df_summary: dataframe of the 'Summary' sheet.
        df_summary_cols: list of its column names.
        df_data: dataframe of the 'Data' sheet.
        df_data_cols: list of its column names.
        file_initial: 'RC', 'Vocab_1' or 'Vocab_2'.

    Returns:
        (integrity_result, conditions_list): True only when every scan that
        ran passed, plus the individual outcomes in execution order.
    """
    print(color.BOLD + f"Reading {file_initial} raw data and perform data integrity scanning...:\n" + color.END)

    conditions_list = []

    # SCAN-1
    col_condition_1, scan_num = summary_col_check(df_summary, df_summary_cols, file_initial , 'Summary')
    print_scan_results(col_condition_1, scan_num, file_initial , sheets = 'Summary')
    conditions_list.append(col_condition_1)

    # SCAN-2
    # Runs only when col_condition_1 returns True (the columns must exist to be inspected)
    if col_condition_1:
        col_condition_2, scan_num = summary_col_value_check(df_summary, file_initial, 'Summary')
        print_scan_results(col_condition_2, scan_num, file_initial , sheets = 'Summary')
        conditions_list.append(col_condition_2)

    # SCAN-3
    col_condition_3, scan_num = col_header_check(df_summary, file_initial, 'Summary')
    print_scan_results(col_condition_3, scan_num, file_initial , sheets = 'Summary')
    conditions_list.append(col_condition_3)

    # SCAN-4
    col_condition_4, scan_num = data_col_check(df_data, df_data_cols, file_initial, sheets = 'Data')
    print_scan_results(col_condition_4, scan_num, file_initial , sheets = 'Data')
    conditions_list.append(col_condition_4)

    # SCAN-5
    # Runs only when col_condition_4 returns True
    if col_condition_4:
        col_condition_5, scan_num = data_col_value_check(df_data, file_initial, sheets = 'Data')
        print_scan_results(col_condition_5, scan_num, file_initial , sheets = 'Data')
        conditions_list.append(col_condition_5)

    # SCAN-6
    # Same header check as SCAN-3, applied to the 'Data' sheet; the helper
    # always reports scan number 3, so the number is set explicitly here.
    col_condition_6, _ = col_header_check(df_data, file_initial, 'Data')
    print_scan_results(col_condition_6, 6, file_initial , 'Data')
    conditions_list.append(col_condition_6)

    # SCAN-7
    if file_initial == 'RC':
        # Copies are passed so the comparison can never reorder the caller's lists.
        col_condition_7, scan_num = data_col_header_check(list(df_data_cols), list(ref_data_cols), file_initial, sheets = 'Data')
        print_scan_results(col_condition_7, scan_num, file_initial , 'Data')
        conditions_list.append(col_condition_7)

    # Final data integrity results after all checks
    # PASS -> when all scans return True/PASS
    # (scans 1, 3, 4 and 6 always run, so the list is never empty)
    integrity_result = all(conditions_list)
    if integrity_result:
        print(color.BOLD + f'\n{file_initial} data integrity result:' + color.GREEN + ' PASS' + color.END + '\n')
    else:
        print(color.BOLD + f'\n{file_initial} data integrity result:' + color.RED + ' FAIL' + color.END + '\n')

    return integrity_result, conditions_list

#### Generate the data integrity report

In [28]:
def main():
    """Run the data integrity scan on the RC, Vocab_1 and Vocab_2 workbooks.

    Reads the workbook paths from the module-level ``rc_filepath``,
    ``v1_filepath`` and ``v2_filepath``; the report is printed as each file
    is processed.

    Returns:
        dict mapping each file initial to its overall integrity result
        (True = every scan passed). Previously the dict was built and
        discarded.
    """
    file_initials = ['RC', 'Vocab_1', 'Vocab_2']

    int_results = {}
    for file_initial in file_initials:
        # the Answer Key sheet is loaded by create_dataframes but not used by the scans
        df_summary, df_summary_cols, df_data, df_data_cols, _, _ = create_dataframes(file_initial, rc_filepath, v1_filepath , v2_filepath)
        integrity_result, _ = data_integrity_check(df_summary, df_summary_cols, df_data, df_data_cols, file_initial)
        int_results[file_initial] = integrity_result

    return int_results

# Entry point: generate the report when this cell/script is executed directly
# (skipped when the code is imported as a module).
if __name__ == "__main__":

    main()

Dataframe created from RC file
[1mReading RC raw data and perform data integrity scanning...:
[0m

SCAN-1 : RC - Summary : Checking if the sheet contains either 'Language' and 'Market' columns ...
[91mFAIL[0m: 'Summary' sheet does not contain either 'Language' and 'Market' columns

SCAN-3 : RC - Summary : Checking if '_worker_id' column name is correct ...
[92mPASS[0m: valid '_workder_id' column name

SCAN-4 : RC - Data : Checking if sheet contains 'Language' column ...
[91mFAIL[0m: 'Data' sheet does not contain 'Language' columns

SCAN-6 : RC - Data : Checking if '_worker_id' column name is correct ...
[92mPASS[0m: valid '_workder_id' column name

SCAN-7 : RC - Data : checking if columns in the 'Data' sheet are identical to the reference columns ...
[91mFAIL[0m: The columns in the 'Data' sheet are not identical to the reference
[1m
RC data integrity result:[91m FAIL[0m

Dataframe created from Vocab_1 file
[1mReading Vocab_1 raw data and perform data integrity scanning.