In [19]:
import pandas as pd
import pyarrow
import numpy as np
from textwrap import dedent
from datetime import datetime

In [20]:
# convert files to pandas dataframe
def to_pandas_df_converter(path: str, datatype: str, keep_default_na=True):
    """
    Transforms data from csv or parquet files into pandas dataframe
    :param path: path to file for processing. In case of parquet files it could be folder with several parquet files inside
    :param datatype: definition of data type. Could be csv or parquet only
    :param keep_default_na: optional parameter to handle NaN values. Forwarded to pandas.read_csv only;
                            it is not used when reading parquet
    :return: pandas dataframe
    :raises ValueError: if datatype is neither 'csv' nor 'parquet'
    :raises Exception: if pandas raises ValueError or PermissionError while reading the path
    """
    # Validate the datatype before entering the try block: otherwise this ValueError
    # would be caught below and re-reported as a (misleading) path problem.
    if datatype not in ('csv', 'parquet'):
        raise ValueError('Improper datatype is chosen. Choose csv or parquet')

    try:
        if datatype == 'csv':
            return pd.read_csv(path, keep_default_na=keep_default_na)
        return pd.read_parquet(path)
    except (ValueError, PermissionError) as e:
        # Chain the original error so the real cause stays visible in the traceback.
        raise Exception("Path to file is not defined propery. Please, check if path is proper in accordance with chosen datatype") from e

In [21]:
# Completeness check: detect empty (blank) values in the selected columns

def check_for_empy_fields(path: str, datatype: str, columns_to_check: list=False):
    """
    Checks if file contains empty values in defined columns.
    NOTE: ('NA', 'nan' values etc. are not considered as empty. Only empty string like ' ' is defined as empty)
    :param path: path to file for processing. In case of parquet files it could be folder with several parquet files inside
    :param datatype: definition of data type. Could be csv or parquet only
    :param columns_to_check: columns to check for absence of empty values. If False (default) or None then all columns are checked
    :return: pandas dataframe row with results of check
    """

    # get table name for logs: last path component, cut at the first dot
    table_name = path.split('/')[-1].split('.')[0]

    # keep_default_na=False is passed so the csv reader does not apply its default NaN markers
    dataframe = to_pandas_df_converter(path, datatype, keep_default_na=False)

    # get names of all columns if specific columns are not defined
    # (identity checks: `== False` is ambiguous when an array-like selection is passed)
    if columns_to_check is None or columns_to_check is False:
        columns_to_check = list(dataframe.columns.values)

    # create string with columns names for logs
    string_of_cols = ", ".join(str(item) for item in columns_to_check)

    dataframe_to_check = dataframe[columns_to_check]

    # Build a per-row flag column by column. This avoids DataFrame.applymap,
    # which is deprecated in recent pandas releases.
    rows_with_empty = pd.Series(False, index=dataframe_to_check.index)
    for _, column in dataframe_to_check.items():
        is_blank = column.map(lambda value: str(value).strip() == '').astype(bool)
        rows_with_empty = rows_with_empty | is_blank
    count_of_rows_with_nulls = int(rows_with_empty.sum())

    if count_of_rows_with_nulls == 0:
        status = 'Passed'
        bad_data_desc = ''
    else:
        status = 'Failed'
        bad_data_desc = f"There are {count_of_rows_with_nulls} rows with epmty values in any of the chosen columns"

    data = {'Table': [table_name],
            'File path': [path],
            'DQ check': ['Completeness by empty/non empty fields'],
            'Column': [string_of_cols],
            'Status': [status],
            'Bad Data': [bad_data_desc]}

    result_table = pd.DataFrame(data)

    return result_table


# usage example: run the empty-field check on the flights csv for an explicit list of columns
path = './Data/source/flights.csv'
path_parquet = './Data/raw/flights'

list_of_columns = [
    'Year', 'Month', 'DayofMonth', 'DayOfWeek',
    'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
    'UniqueCarrier', 'FlightNum', 'TailNum',
    'ActualElapsedTime', 'CRSElapsedTime', 'AirTime',
    'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance',
    'TaxiIn', 'TaxiOut', 'Cancelled', 'Diverted',
    'CarrierDelay', 'WeatherDelay', 'NASDelay',
    'SecurityDelay', 'LateAircraftDelay',
]

nn_result = check_for_empy_fields(path=path, datatype='csv', columns_to_check=list_of_columns)
nn_result

Unnamed: 0,Table,File path,DQ check,Column,Status,Bad Data
0,flights,./Data/source/flights.csv,Completeness by empty/non empty fields,"Year, Month, DayofMonth, DayOfWeek, DepTime, C...",Failed,There are 34 rows with epmty values in any of ...


In [22]:
# Check validity by range

def check_validity_by_range(path: str, datatype: str, columns_to_check: list, min_val: int, max_val: int):
    """
    Checks if in the specific columns there are values out of the [min, max] range.
    NOTE: NaN values are not considered to be out of range.
    :param path: path to the file for processing. In case of parquet files it could be folder with several parquet files inside
    :param datatype: definition of data type. Could be csv or parquet only
    :param columns_to_check: columns to check
    :param min_val: min value in range (inclusive)
    :param max_val: max value in range (inclusive)
    :return: pandas dataframe row with results of check
    """
    # get table name for logs: last path component, cut at the first dot
    table_name = path.split('/')[-1].split('.')[0]

    dataframe = to_pandas_df_converter(path, datatype)

    # create string with columns names for logs
    string_of_cols = ", ".join(str(item) for item in columns_to_check)

    out_of_range_values = []
    for column_name in columns_to_check:
        column = dataframe[column_name]
        # Drop missing values (NaN / None) first so they are never compared with the bounds.
        present_values = column[column.notna()]
        is_out_of_range = (present_values < min_val) | (present_values > max_val)
        out_of_range_values.extend(present_values[is_out_of_range].tolist())

    if len(out_of_range_values) == 0:
        status = 'Passed'
        bad_data_desc = ''
    else:
        status = 'Failed'
        # De-duplicate before truncating, so the sample really holds up to 20 distinct values.
        distinct_examples = list(dict.fromkeys(out_of_range_values))[:20]
        bad_data_desc = f"There are {len(out_of_range_values)} out of range values. \nList of up to 20 distinct out of range values: {set(distinct_examples)}"

    data = {'Table': [table_name],
            'File path': [path],
            'DQ check': [f'Validity by range: min value: {min_val}, max value: {max_val}'],
            'Column': [string_of_cols],
            'Status': [status],
            'Bad Data': [bad_data_desc]}
    result_table = pd.DataFrame(data)

    return result_table


# usage example: validate DepTime and CRSDepTime of the flights csv against the [0, 2359] range
path = './Data/source/flights.csv'
path_parquet = './Data/raw/flights'

range_result = check_validity_by_range(
    path=path,
    datatype='csv',
    columns_to_check=['DepTime', 'CRSDepTime'],
    min_val=0,
    max_val=2359,
)
range_result


Unnamed: 0,Table,File path,DQ check,Column,Status,Bad Data
0,flights,./Data/source/flights.csv,"Validity by range: min value: 0, max value: 2359","DepTime, CRSDepTime",Failed,There are 1 out of range values. \nList of up ...


In [23]:
# Check uniqueness

def check_uniqueness(path: str, datatype: str, columns_to_check: list=False):
    """
    Checks if file contains duplicates by defined columns.
    NOTE: rows with a missing value in any of the checked columns are dropped before the duplicate search.
    :param path: path to file for processing. In case of parquet files it could be folder with several parquet files inside
    :param datatype: definition of data type. Could be csv or parquet only
    :param columns_to_check: set of columns, which should uniquely define values. If False (default) or None then all columns are checked
    :return: pandas dataframe row with results of check
    """
    # get table name for logs: last path component, cut at the first dot
    table_name = path.split('/')[-1].split('.')[0]

    dataframe = to_pandas_df_converter(path, datatype)

    # get dataframe columns for checks
    # (identity checks: `== False` is ambiguous when an array-like selection is passed)
    if columns_to_check is None or columns_to_check is False:
        columns_to_check = list(dataframe.columns.values)

    # create string with columns names for logs
    string_of_cols = ", ".join(str(item) for item in columns_to_check)

    dataframe_to_check = dataframe[columns_to_check].dropna()
    # duplicated() flags every repeat after the first occurrence of a row
    duplicates = dataframe_to_check[dataframe_to_check.duplicated()]
    duplicates_count = duplicates.shape[0]

    if duplicates_count == 0:
        status = 'Passed'
        bad_data_desc = ''
    else:
        status = 'Failed'
        bad_data_desc = f"There are {duplicates_count} duplicates. List of up to 10 duplicated values:\n {duplicates.to_string(index=False, max_rows=10, col_space=10)}"

    data = {'Table': [table_name],
            'File path': [path],
            'DQ check': ['Uniqueness by set of columns'],
            'Column': [string_of_cols],
            'Status': [status],
            'Bad Data': [bad_data_desc]}
    result_table = pd.DataFrame(data)

    return result_table


# usage example: the 'iata' column of the airports csv is checked for duplicates
path = './Data/source/airports.csv'
path_parquet = './Data/raw/airports'

unq_result = check_uniqueness(path=path, datatype='csv', columns_to_check=['iata'])
unq_result

Unnamed: 0,Table,File path,DQ check,Column,Status,Bad Data
0,airports,./Data/source/airports.csv,Uniqueness by set of columns,iata,Failed,There are 6 duplicates. List of up to 10 dupli...


In [24]:
# consistency check. Compares unique values in 2 columns

def check_consistency(first_path: str, first_datatype: str, first_column_to_check: str, second_path: str, second_datatype: str, second_column_to_check: str, comparison_type: str):
    """
    Checks consistency between 2 columns by comparing their distinct non-null values
    :param first_path: path to the first file for consistency check. In case of parquet files it could be folder with several parquet files inside
    :param first_datatype: definition of data type for the first file. Could be csv or parquet only
    :param first_column_to_check: column from the first file for consistency check
    :param second_path: path to the second file for consistency check. In case of parquet files it could be folder with several parquet files inside
    :param second_datatype: definition of data type for the second file. Could be csv or parquet only
    :param second_column_to_check: column from the second file for consistency check
    :param comparison_type: type of comparison. Appropriate values:
                            'full_match' - unique values in both columns should be the same,
                            'first_match_second' - unique values in the first table should exist in the second table,
                            'second_match_first' - unique values in the second table should exist in the first table
    :return: pandas dataframe row with results of check
    :raises ValueError: if comparison_type is not one of the three supported values
    """
    # Fail fast on a bad comparison type, before any file is read.
    if comparison_type not in ('full_match', 'first_match_second', 'second_match_first'):
        raise ValueError("Comparison type is not defined properly. Should be 'full_match' or first_match_second' or 'second_match_first' only.")

    first_dataframe = to_pandas_df_converter(first_path, first_datatype)
    first_dataframe_column = first_dataframe[[first_column_to_check]].drop_duplicates().dropna()
    first_table_name = first_path.split('/')[-1].split('.')[0]

    second_dataframe = to_pandas_df_converter(second_path, second_datatype)
    second_dataframe_column = second_dataframe[[second_column_to_check]].drop_duplicates().dropna()
    second_table_name = second_path.split('/')[-1].split('.')[0]

    # Outer join with indicator: '_merge' tells for every distinct value whether it was
    # found in the first table only, the second table only, or in both.
    result_join = first_dataframe_column.merge(second_dataframe_column,
                                               left_on=[first_column_to_check],
                                               right_on=[second_column_to_check],
                                               how='outer',
                                               indicator=True)

    first_only_values = result_join.loc[result_join['_merge'] == 'left_only', first_column_to_check].tolist()
    second_only_values = result_join.loc[result_join['_merge'] == 'right_only', second_column_to_check].tolist()

    first_only_example = f"Example of up to 20 values, which exist in first table only: {first_only_values[:20]}."
    second_only_example = f"Example of up to 20 values, which exist in second table only: {second_only_values[:20]}."

    # The report text is assembled from explicit lines, so no source-code indentation leaks into it.
    if comparison_type == 'full_match':
        bad_values_count = len(first_only_values) + len(second_only_values)
        report_lines = [
            f"There are {bad_values_count} inappropriate values. {len(first_only_values)} in first table. {len(second_only_values)} in second table.",
            first_only_example,
            second_only_example,
        ]
    elif comparison_type == 'first_match_second':
        bad_values_count = len(first_only_values)
        report_lines = [f"There are {bad_values_count} inappropriate values.", first_only_example]
    else:  # 'second_match_first'
        bad_values_count = len(second_only_values)
        report_lines = [f"There are {bad_values_count} inappropriate values.", second_only_example]

    if bad_values_count == 0:
        status = 'Passed'
        bad_data_desc = ''
    else:
        status = 'Failed'
        bad_data_desc = "\n".join(report_lines)

    data = {'Table': [f'''First table: {first_table_name}. \nSecond table: {second_table_name}'''],
            'File path': [f"First path: {first_path}. \nSecond path: {second_path}"],
            'DQ check': ['Consistency between 2 columns'],
            'Column': [f"Column from first table: {first_column_to_check}. \nColumn from second table: {second_column_to_check}"],
            'Status': [status],
            'Bad Data': [bad_data_desc]}
    result_table = pd.DataFrame(data)

    return result_table


# usage example: every UniqueCarrier value found in flights is looked up among the carriers' Code values
first_path = './Data/source/carriers.csv'
second_path = './Data/source/flights.csv'

cons_result = check_consistency(
    first_path=first_path,
    first_datatype='csv',
    first_column_to_check='Code',
    second_path=second_path,
    second_datatype='csv',
    second_column_to_check='UniqueCarrier',
    comparison_type='second_match_first',
)
cons_result


Unnamed: 0,Table,File path,DQ check,Column,Status,Bad Data
0,First table: carriers. \nSecond table: flights,First path: ./Data/source/carriers.csv. \nSeco...,Consistency between 2 columns,Column from first table: Code. \nColumn from s...,Failed,There are 2 inappropriate values.\n Exa...


In [25]:
# completeness check. Full join comparison 

def check_pk_completeness(first_path: str, first_datatype: str, second_path: str, second_datatype: str, first_join_keys: list=False, second_join_keys: list=False):
    """
    Check if all Primary keys exist in both tables (full outer join on the given keys)
    :param first_path: path to the first file. In case of parquet files it could be folder with several parquet files inside
    :param first_datatype: definition of data type for the first file. Could be csv or parquet only
    :param second_path: path to the second file. In case of parquet files it could be folder with several parquet files inside
    :param second_datatype: definition of data type for the second file. Could be csv or parquet only
    :param first_join_keys: columns from the first file for joining. If False (default) or None - all columns
    :param second_join_keys: columns from the second file for joining. If False (default) or None - all columns
    :return: pandas dataframe row with results of check
    """
    first_dataframe = to_pandas_df_converter(first_path, first_datatype)
    # every column is cast to str so that the join keys of both sides share one dtype
    first_dataframe = first_dataframe.astype(str)
    first_table_name = first_path.split('/')[-1].split('.')[0]
    # get all columns (identity checks: `== False` is ambiguous for array-like key selections)
    if first_join_keys is None or first_join_keys is False:
        first_join_keys = list(first_dataframe.columns.values)

    second_dataframe = to_pandas_df_converter(second_path, second_datatype)
    second_dataframe = second_dataframe.astype(str)
    second_table_name = second_path.split('/')[-1].split('.')[0]
    # get all columns
    if second_join_keys is None or second_join_keys is False:
        second_join_keys = list(second_dataframe.columns.values)

    result_join = first_dataframe.merge(second_dataframe, left_on=first_join_keys, right_on=second_join_keys, how='outer', suffixes=('_ss', '_sap'), indicator=True)

    # rows whose key was found on one side only
    check_result = result_join[result_join['_merge'] != 'both']

    if check_result.shape[0] == 0:
        status = 'Passed'
        # must be set on this path too: the report row below always reads it
        bad_data_desc = ''
    else:
        status = 'Failed'
        first_only_count = int((check_result['_merge'] == 'left_only').sum())
        second_only_count = int((check_result['_merge'] == 'right_only').sum())
        bad_data_desc = (
            f"There are {first_only_count} rows which exist in the first table, while are absent in the second one. \n"
            f"There are {second_only_count} rows which exist in the second table, while are absent in the first one.\n"
            "Up to 10 rows with difference:\n"
            f"        {check_result.to_string(index=False, max_rows=10, col_space=10)}"
        )

    data = {'Table': [f'''First table: {first_table_name}. \nSecond table: {second_table_name}'''],
            'File path': [f"First path: {first_path}. \nSecond path: {second_path}"],
            # label corrected: it used to repeat the consistency check's name
            'DQ check': ['Completeness by primary keys (full join)'],
            'Column': [f"Join keys from the first table: {first_join_keys}. \nJoin keys from the second table: {second_join_keys}"],
            'Status': [status],
            'Bad Data': [bad_data_desc]}
    result_table = pd.DataFrame(data)

    return result_table



In [26]:
now = datetime.now()

# Every check returns a one-row dataframe. The rows are collected per table and concatenated
# once at the end, so no result is lost (the completeness results used to be computed but never
# added to the report) and no concatenation with an empty placeholder frame is needed.

# Carriers checks
carriers_results = [
    check_uniqueness('./Data/source/carriers.csv', 'csv', False),
    check_for_empy_fields('./Data/source/carriers.csv', 'csv', False),
    check_pk_completeness(first_path='./Data/source/carriers.csv', first_datatype='csv', first_join_keys=['Code', 'Description'],
                          second_path='./Data/raw/carriers', second_datatype='parquet', second_join_keys=['code', 'description']),
]

# Airports checks
airports_results = [
    check_uniqueness('./Data/source/airports.csv', 'csv', ['iata']),
    check_for_empy_fields('./Data/source/airports.csv', 'csv', False),
    check_validity_by_range('./Data/source/airports.csv', 'csv', ['lat'], -90, 90),
    check_validity_by_range('./Data/source/airports.csv', 'csv', ['long'], -180, 180),
    check_pk_completeness(first_path='./Data/source/airports.csv', first_datatype='csv', first_join_keys=['iata'],
                          second_path='./Data/raw/airports', second_datatype='parquet', second_join_keys=['iata']),
]

# Flights checks
list_of_not_empty_columns = \
['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime',
'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum',
'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay',
'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 
'Cancelled', 'Diverted', 'CarrierDelay', 'WeatherDelay',
'NASDelay', 'SecurityDelay', 'LateAircraftDelay']

flights_results = [
    check_uniqueness('./Data/source/flights.csv', 'csv', ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']),
    check_for_empy_fields('./Data/source/flights.csv', 'csv', list_of_not_empty_columns),
    check_validity_by_range('./Data/source/flights.csv', 'csv', ['DepTime', 'CRSDepTime'], 0, 2359),
    check_validity_by_range('./Data/source/flights.csv', 'csv', ['ArrTime', 'CRSArrTime'], 0, 2359),
    check_consistency(first_path='./Data/source/carriers.csv',
                      first_datatype='csv',
                      first_column_to_check='Code',
                      second_path='./Data/source/flights.csv',
                      second_datatype='csv',
                      second_column_to_check='UniqueCarrier',
                      comparison_type='second_match_first'),
    check_consistency(first_path='./Data/source/airports.csv',
                      first_datatype='csv',
                      first_column_to_check='iata',
                      second_path='./Data/source/flights.csv',
                      second_datatype='csv',
                      second_column_to_check='Origin',
                      comparison_type='second_match_first'),
    check_consistency(first_path='./Data/source/airports.csv',
                      first_datatype='csv',
                      first_column_to_check='iata',
                      second_path='./Data/source/flights.csv',
                      second_datatype='csv',
                      second_column_to_check='Dest',
                      comparison_type='second_match_first'),
    check_pk_completeness(first_path='./Data/source/flights.csv', first_datatype='csv', first_join_keys=['Year', 'Month', 'DayofMonth', 'FlightNum'],
                          second_path='./Data/raw/flights', second_datatype='parquet', second_join_keys=['Year', 'Month', 'DayofMonth', 'FlightNum']),
]

# single result table with columns: Table, File path, DQ check, Column, Status, Bad Data
result_table = pd.concat(carriers_results + airports_results + flights_results, ignore_index=True)

# write results to excel file (':' from the timestamp is replaced by '-' in the file name)
result_path=f'./Results/total_result_{now}.xlsx'.replace(':','-')
result_table.to_excel(excel_writer=result_path, sheet_name='results')
