In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
import os
import shutil
import json

In [None]:
runid = datetime.now().strftime("%Y%m%d_%H%M%S%f")
runner_cache_folder = os.path.join(os.getcwd(),'runner_cache', runid)
os.makedirs(runner_cache_folder)

In [None]:

'''
    Cheatsheet
    Each value in the list represents single column used in comparison
        - source_name: name of the column in the source csv file (usually file extracted from Power BI)
        - target_name: name of the column in the target csv file (usually extracted from the Synapse query result)
        - key: if True, will be used to join the extract, if False - column will be used for the comparison
        - comparison: only applicable if key = False. Type of the comparison. Available options:
            "numeric" - comparison based on subtraction of the columns
            "string" - comparison bsed on theexact value of he column
        - clean: only applicable if key = False and comparison = numeric. Removes spaces $ and % signs from the coluns in Power BI extract
        - precision: only applicable if key = False and comparison = numeric. Sets how many digits after 0 should be considered as discrepancy
        - negative_format: only applicable if key = False and comparison = numeric. if the value is "parentheses" then 
                the value in parentheses is replaced by the negative number, e.g. (156.23) -> -156.23. Useful for the data extracted from Power BI Desktop
        - ignore_case : only applicable if key = False and comparison = string. Ignores letter case during comparison if set to True
        - replace_nulls: only applicable if key = False. if set, then NULLs in the comparison columns are replaced by zeros.
                available options: 
                    "pbi" - replace NULLs in Power BI extract only
                    "dm" - replace NULLs in Data Mart extract (NaN and numpy inf both considered as NULLs)
                    "both": replaces value in both extracts
'''


In [None]:
standalone_mode = False

if standalone_mode:
    # Set up the variables
    test_name = "comparison_shipping_history"
   # src_extract_path = r"C:\Users\12000498\Downloads\shipping_efficency\test_page_1.csv" # path to file extracted from Power BI Desktop
   # tgt_extract_path = r"C:\Users\12000498\Downloads\shipping_efficency\prod_page_1.csv" # path to file extracted from the synapse query result
   # columns = [
   # {'source_name': 'Product Category', 'target_name': 'Product Category', 'key': True},
   # {'source_name': 'Product Sub Category', 'target_name': 'Product Sub Category', 'key': True},
   # {'source_name': 'Individual Unit Container', 'target_name': 'Individual Unit Container', 'key': True},
   # {'source_name': 'Bottle Size', 'target_name': 'Bottle Size', 'key': True},
   # {'source_name': 'Supply Area', 'target_name': 'Supply Area', 'key': True},
   # {'source_name': 'Site Name', 'target_name': 'Site Name', 'key': True},
   # {'source_name': 'Brand', 'target_name': 'Brand', 'key': True},
   # {'source_name': 'Year', 'target_name': 'Year', 'key': True},
   # {'source_name': 'Month', 'target_name': 'Month', 'key': True},
#
   # {'source_name': 'Efficient Ship','target_name': 'Efficient Ship', 'key': False, 'comparison': 'numeric', 'clean': False, 'replace_nulls': 'both', 'precision': 0},
   # {'source_name': 'Non-Efficient Ship','target_name': 'Non-Efficient Ship', 'key': False, 'comparison': 'numeric', 'clean': False ,'replace_nulls': 'both', 'precision': 0},
   # {'source_name': 'Efficient Ship %','target_name': 'Efficient Ship %', 'key': False, 'comparison': 'numeric', 'clean': 'both', 'replace_nulls': 'both', 'precision': 3}
   # ]

   # src_extract_path = r"C:\Users\12000498\Downloads\shipping_efficency\test_page_2.csv" # path to file extracted from Power BI Desktop
   # tgt_extract_path = r"C:\Users\12000498\Downloads\shipping_efficency\prod_page_2.csv" # path to file extracted from the synapse query result
   # columns = [
   # {'source_name': 'Channel', 'target_name': 'Channel', 'key': True},
   # {'source_name': 'level_3_sap_company_name', 'target_name': 'level_3_sap_company_name', 'key': True},
   # {'source_name': 'Year', 'target_name': 'Year', 'key': True},
   # {'source_name': 'Month', 'target_name': 'Month', 'key': True},
 #
 
   # {'source_name': 'Efficient Ship','target_name': 'Efficient Ship', 'key': False, 'comparison': 'numeric', 'clean': False, 'replace_nulls': 'both', 'precision': 0},
   # {'source_name': 'Non-Efficient Ship','target_name': 'Non-Efficient Ship', 'key': False, 'comparison': 'numeric', 'clean': False ,'replace_nulls': 'both', 'precision': 0},
   # {'source_name': 'Efficient Ship %','target_name': 'Efficient Ship %', 'key': False, 'comparison': 'numeric', 'clean': 'both', 'replace_nulls': 'both', 'precision': 3}
   # ]
    src_extract_path = r"C:\Users\12000498\Downloads\shipping_efficency\test_page_3_2023.csv" # path to file extracted from Power BI Desktop
    tgt_extract_path = r"C:\Users\12000498\Downloads\shipping_efficency\prod_page_3_2023.csv" # path to file extracted from the synapse query result
    columns = [
    {'source_name': 'Origin Supply Area', 'target_name': 'Origin Supply Area', 'key': True},
    {'source_name': 'Origin Plant', 'target_name': 'Origin Plant', 'key': True},
    {'source_name': 'Prod Plant', 'target_name': 'Prod Plant', 'key': True},
    {'source_name': 'Bottle Size', 'target_name': 'Bottle Size', 'key': True},
    {'source_name': 'Material', 'target_name': 'Material', 'key': True},
    {'source_name': 'Material Name', 'target_name': 'Material Name', 'key': True},

    {'source_name': 'CE','target_name': 'CE', 'key': False, 'comparison': 'numeric', 'clean': 'both', 'replace_nulls': 'both', 'precision': 0},
    {'source_name': '% of Total','target_name': '% of Total', 'key': False, 'comparison': 'numeric', 'clean': 'both' ,'replace_nulls': 'both', 'precision': 3}
    ]
else:
    # read setting from config
    target_param_file = os.path.join(runner_cache_folder, "params.json")
    shutil.copyfile("params.json", target_param_file)
    with open(target_param_file, "r") as f: 
        params = json.load(f)
    src_extract_path = params["source_path"]
    tgt_extract_path = params["target_path"]
    columns = params["column_mapping"]
    test_name = params["name"]

In [None]:
# Copy csvs to the cache folder
cache_src_file = os.path.join(runner_cache_folder, 'src.csv')
cache_tgt_file = os.path.join(runner_cache_folder, 'tgt.csv')
comparison_file = os.path.join(runner_cache_folder, 'comparison.csv')
discrepancy_file = os.path.join(runner_cache_folder, 'discrep.csv')
results_file = os.path.join(runner_cache_folder, 'result.json')

shutil.copyfile(src_extract_path, cache_src_file)
shutil.copyfile(tgt_extract_path, cache_tgt_file)

In [None]:
# Settings for columns 
src_key_col_names = [column_def["source_name"] for column_def in columns if column_def["key"]]
tgt_key_col_names = [column_def["target_name"] for column_def in columns if column_def["key"]]
comparison_columns = [column_def for column_def in columns if (not column_def["key"] and "comparison" in column_def)]

In [None]:
# load source extract to df
df_src = pd.read_csv(cache_src_file)
rename_mapping = dict([[col["source_name"], col["target_name"]] for col in columns])
# rename columns in source  extract so they match target extract
df_src.rename(columns=rename_mapping, inplace=True)
# delete undefined columns
columns_to_drop = set(df_src.columns) - set(rename_mapping.values())
df_src.drop(columns_to_drop, axis=1, inplace=True)
# convert key columns to upper index
for column_name in tgt_key_col_names:
    if df_src[column_name].dtype == object:
        df_src[column_name] = df_src[column_name].str.upper()
# set index
df_src.set_index(tgt_key_col_names, inplace=True)


# load target extract to df
df_tgt = pd.read_csv(cache_tgt_file)
# convert key columns to upper index
for column_name in tgt_key_col_names:
    if df_tgt[column_name].dtype == object:
        df_tgt[column_name] = df_tgt[column_name].str.upper()
# set index
df_tgt.set_index(tgt_key_col_names, inplace=True)

In [None]:
# change format for negative numbers
columns_to_update_format = [[column_def["target_name"], column_def["negative_format"]] for column_def in comparison_columns
    if column_def["comparison"] == 'numeric' and "negative_format" in column_def #and column_def["negative_format"] == 'parentheses'
    ]
for column, negative_format_settings in columns_to_update_format:
    if negative_format_settings['type'] == 'parentheses':
        negative_format_value = negative_format_settings['value']
        if negative_format_value == 'source' or negative_format_value == 'both':
            df_src[column] = df_src[column].map(
            lambda x : str(x)
                    .replace("(","-")
                    .replace(")","")
            )
        if negative_format_value == 'target' or negative_format_value == 'both':
            df_tgt[column] = df_tgt[column].map(
            lambda x : str(x)
                    .replace("(","-")
                    .replace(")","")
            )

# clean the data if needed
columns_to_clean = [[column_def["target_name"], column_def["clean"]] for column_def in comparison_columns 
    if column_def["comparison"] == 'numeric' and "clean" in column_def and column_def["clean"]
    ]
# remove spaces, $ and % signs if exists
for column_name, clean in columns_to_clean:
    # source
    if clean == 'source' or clean == 'both':
        if df_src[column_name].dtype == object:
            df_src[column_name] = df_src[column_name].map(
                lambda x : str(x).replace("$", "")
                    .replace("%", "")
                    .replace(" ","")
                    ).astype(np.float64)
    
    if clean == 'target' or clean == 'both':
        if df_tgt[column_name].dtype == object:
            df_tgt[column_name] = df_tgt[column_name].map(
                lambda x : str(x).replace("$", "")
                    .replace("%", "")
                    .replace(" ","")
                    ).astype(np.float64)
    

# replace nulls with zeros in compared columns
columns_to_replace = [[column_def["target_name"], column_def["replace_nulls"], column_def["comparison"]] for column_def in comparison_columns 
    if "replace_nulls" in column_def # column_def["comparison"] == 'numeric' and
    ]
for column_name, replace_nulls, comparison_type in columns_to_replace:
    replacement_value = 0 if comparison_type=='numeric' else ''
    if replace_nulls == 'source' or replace_nulls == 'both': 
        df_src[column_name] = df_src[column_name].fillna(replacement_value).replace(np.inf, replacement_value)
    if replace_nulls == 'target' or replace_nulls == 'both': 
        df_tgt[column_name] = df_tgt[column_name].fillna(replacement_value).replace(np.inf, replacement_value)


In [None]:
# join into single dataframe
comparison_df = df_src.join(df_tgt
    , how="outer", lsuffix='_src', rsuffix='_tgt')
comparison_df["discrepancy"] = False

In [None]:
# compare values

# numeric comparison
# subtract values in column
numeric_columns = [col for col in comparison_columns if col['comparison']=='numeric']
for comparison_column in numeric_columns:
    column_name =  comparison_column["target_name"]
    target_col_name = f'{column_name}_diff'
    comparison_df[target_col_name] = comparison_df[f"{column_name}_src"] - comparison_df[f"{column_name}_tgt"]
    comparison_precision = comparison_column.get("precision", 0)
    comparison_df["discrepancy"] = comparison_df["discrepancy"] | (comparison_df[target_col_name].map(lambda x: abs(x)) >= 0.1**comparison_precision)
    comparison_df["discrepancy"] = comparison_df["discrepancy"] | comparison_df[f"{column_name}_src"].isna() | comparison_df[f"{column_name}_tgt"].isna()
    
# string comparison

string_columns = [col for col in comparison_columns if col['comparison']=='string']
if string_columns:
    # create column to save list of columns that are not matching
    comparison_df['string_comparison_diff'] = ""
    for comparison_column in string_columns:
        column_name =  comparison_column["target_name"]

        if 'ignore_case' in comparison_column and comparison_column['ignore_case']:
            comparison_df.loc[comparison_df[f'{column_name}_src'].str.lower() != comparison_df[f'{column_name}_tgt'].str.lower(), 'string_comparison_diff'] = \
                comparison_df.loc[comparison_df[f'{column_name}_src'].str.lower() != comparison_df[f'{column_name}_tgt'].str.lower(), 'string_comparison_diff'] + ' ' + column_name
        else:
            comparison_df.loc[comparison_df[f'{column_name}_src'] != comparison_df[f'{column_name}_tgt'], 'string_comparison_diff'] = \
                comparison_df.loc[comparison_df[f'{column_name}_src'] != comparison_df[f'{column_name}_tgt'], 'string_comparison_diff'] + ' ' + column_name

    # write result
    comparison_df.loc[comparison_df['string_comparison_diff']!='','discrepancy'] = True

In [None]:
# show records that doesn't match
result  = comparison_df[comparison_df["discrepancy"]]
result_description = dict()
result_description['cache_folder'] = runner_cache_folder
if len(result) > 0:
    result_description["status"] = "Failed"
    result_description["discrepancy_file"] = discrepancy_file
    result.to_csv(discrepancy_file)
else:
    result_description["status"] = "Passed"

# write execution details to json
with open(results_file,"w") as f:
    json.dump(result_description, f)

In [None]:
# return path to the cache folder with the results of execution
if not standalone_mode:
    shutil.copyfile(results_file, "runner_result.json")