In [222]:
import os
import pandas as pd
import pyarrow.parquet as pq

# Adjust display settings
pd.set_option('display.max_colwidth', None)  # Show full content of DataFrame cells

In [223]:
# File-system locations used by the validation helpers in this notebook

# Validation report CSV (read and written by the Raw_Data_Validation helper functions)
csv_path = '/home/datalab-user/Raw_Data_Validation.csv'

# Source CSV files, one per table
csv_carriers = '/home/datalab-user/source_ok/carriers.csv'
csv_airports = '/home/datalab-user/source_ok/airports.csv'
csv_flights = '/home/datalab-user/source_ok/flights.csv'

# Target folders, one per table (presumably each holds that table's .parquet files — confirm)
parquet_carriers = '/home/datalab-user/raw_ok/carriers'
parquet_airports = '/home/datalab-user/raw_ok/airports'
parquet_flights = '/home/datalab-user/raw_ok/flights'

In [224]:
#  Functions for the DataFrame and .csv file Raw_Data_Validation to store check results

def initialize_validation_dataframe():
    """
    Return the validation results as a DataFrame.

    When the CSV at `csv_path` exists it is read (with `keep_default_na=False`, so no
    string is converted to NaN); otherwise an empty DataFrame that only carries the
    report columns is returned.
    """
    if not os.path.exists(csv_path):
        # No report on disk yet: start from an empty frame with the report columns
        report_columns = ['Num', 'Table', 'DQ check', 'Column', 'Status', 'Bad Data', 'Test Case']
        return pd.DataFrame(columns=report_columns)

    # A report already exists: continue from its content
    return pd.read_csv(csv_path, keep_default_na=False)

def save_validation_results(Raw_Data_Validation):
    """
    Write the given validation results DataFrame to the CSV at `csv_path`,
    leaving out the DataFrame index.
    """
    Raw_Data_Validation.to_csv(path_or_buf=csv_path, index=False)

def add_validation_result(Raw_Data_Validation, table, dq_check, column, status, bad_data, tc_num):
    """
    Add a new validation result to the Raw_Data_Validation DataFrame.

    The DataFrame passed in is not modified; a new DataFrame holding the existing
    rows plus the new one is returned.

    Arguments:
    Raw_Data_Validation (pd.DataFrame): Current validation results.
    table (str or list of str): Table name(s) the check was run on.
    dq_check (str): Name of the data quality check.
    column (str or list of str): Column name(s) the check was run on.
    status (str): Outcome of the check.
    bad_data (str): Summary of the bad data found by the check.
    tc_num (str): Test Case number.

    Returns:
    pd.DataFrame: Validation results with the new row appended. 'Num' of the new row
    is the previous number of rows + 1.
    """
    new_row = {
        'Num': len(Raw_Data_Validation) + 1,
        'Table': table,
        'DQ check': dq_check,
        'Column': column,
        'Status': status,
        'Bad Data': bad_data,
        'Test Case': tc_num
    }

    # DataFrame.append was removed in pandas 2.0, so the row is wrapped in a one-row
    # DataFrame and concatenated. Building it from a list of dicts keeps a list value
    # (e.g. several column names) inside a single cell.
    new_row_df = pd.DataFrame([new_row])

    if len(Raw_Data_Validation) == 0:
        # Nothing to concatenate with: return the single row in the existing column order
        # (this also avoids pandas' FutureWarning about concatenating empty frames)
        columns = list(Raw_Data_Validation.columns)
        columns += [name for name in new_row_df.columns if name not in columns]
        return new_row_df.reindex(columns=columns)

    return pd.concat([Raw_Data_Validation, new_row_df], ignore_index=True)

In [225]:
def drop_validation_dataframe():
    """
    Reset the validation results.

    Deletes the CSV at `csv_path` when it is present and returns a new, empty
    DataFrame that only carries the report columns.
    """
    # Remove the persisted report, if there is one
    report_exists = os.path.exists(csv_path)
    if report_exists:
        os.remove(csv_path)

    # Hand back a blank report with the expected columns
    report_columns = ['Num', 'Table', 'DQ check', 'Column', 'Status', 'Bad Data', 'Test Case']
    return pd.DataFrame(columns=report_columns)

In [226]:
def check_null_values_csv(folder_path, table_name, dq_check, column_name, tc_num):
    """
    Perform completeness check for NULL or empty values in a specific column of a CSV file.

    Arguments:
    folder_path (str): Path to the CSV file (passed directly to pd.read_csv).
    table_name (str): Name of the table, as recorded in the validation results.
    dq_check (str): Name of the data quality check.
    column_name (str): Name of the column to perform completeness check on.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_null_values.
    """
    # keep_default_na=False: pandas does not convert any string to NaN,
    # so empty cells reach the check as empty strings
    source_df = pd.read_csv(folder_path, keep_default_na=False)

    # check_null_values loads and saves the validation results itself,
    # so they are not loaded a second time here
    check_null_values(source_df, table_name, dq_check, column_name, tc_num)

def check_null_values_parquet(folder_path, table_name, dq_check, column_name, tc_num):
    """
    Perform completeness check for NULL or empty values in a specific column of Parquet files.

    Arguments:
    folder_path (str): Path to the folder containing Parquet files.
    table_name (str): Name of the table, as recorded in the validation results.
    dq_check (str): Name of the data quality check.
    column_name (str): Name of the column to perform completeness check on.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_null_values.

    Raises:
    ValueError: If the folder contains no '.parquet' files.
    """
    # os.listdir returns names in arbitrary order; sorting them makes the row numbers
    # reported by the check the same on every run
    parquet_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.parquet'))
    if not parquet_files:
        # pd.concat would raise the same exception type with a less helpful message
        raise ValueError(f"No .parquet files found in '{folder_path}'")

    # Load all Parquet files and concatenate them into one DataFrame with a fresh 0..n-1 index
    target_df = pd.concat(
        [pq.read_table(os.path.join(folder_path, f)).to_pandas() for f in parquet_files],
        ignore_index=True,
    )

    # Perform the check and save the result into the validation results
    check_null_values(target_df, table_name, dq_check, column_name, tc_num)


def check_null_values(table_df, table_name, dq_check, column_name, tc_num):
    """
    Completeness check on one column of an already loaded table.

    A value counts as missing when it is null or an empty string. The outcome is
    added to the validation results, which are then saved back to the CSV.

    Arguments:
    table_df (pd.DataFrame): Table to check.
    table_name (str): Name of the table, as recorded in the validation results.
    dq_check (str): Name of the data quality check.
    column_name (str): Name of the column to check.
    tc_num (str): Test Case number.

    Returns:
    None.
    """
    # Current validation results (loaded from CSV or newly created)
    results = initialize_validation_dataframe()

    # Flag cells that are null or hold an empty string
    column_values = table_df[column_name]
    is_missing = column_values.isnull() | (column_values == '')

    # Shift the index labels by one so that reported row numbers start from 1
    missing_row_numbers = [label + 1 for label in table_df[is_missing].index]

    if missing_row_numbers:
        status = 'Failed'
        rows_text = ', '.join(str(number) for number in missing_row_numbers)
        bad_data_summary = f"Total missing values: {len(missing_row_numbers)}; Rows #: {rows_text}"
    else:
        status = 'Passed'
        bad_data_summary = 'No missing values in Table'

    # Record the outcome and persist the results
    results = add_validation_result(
        results, table_name, dq_check, column_name, status, bad_data_summary, tc_num
    )
    save_validation_results(results)

In [227]:
def compare_values_source_target(source_csv_path, source_table_name, source_column_name,
                           target_folder_path, target_table_name, target_column_name, dq_check, tc_num):
    """
    Perform check if all values from source column are present in target column of parquet file and vice versa.

    Only distinct values are compared; how often a value occurs is ignored.

    Arguments:
    source_csv_path (str): Path to the source CSV file.
    source_table_name (str): Name of the source table, as recorded in the validation results.
    source_column_name (str): Name of the source column to check.
    target_folder_path (str): Path to the folder containing target Parquet files.
    target_table_name (str): Name of the target table, as recorded in the validation results.
    target_column_name (str): Name of the target column to check.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results and saved to the CSV.
    """
    # Initialize or load the Raw_Data_Validation DataFrame
    Raw_Data_Validation = initialize_validation_dataframe()

    # Read source CSV file into a DataFrame (no string is converted to NaN)
    source_df = pd.read_csv(source_csv_path, keep_default_na=False)

    # Load Parquet files from the specified target folder and concatenate them into one DataFrame
    target_df = pd.concat([pq.read_table(os.path.join(target_folder_path, f)).to_pandas()
                           for f in os.listdir(target_folder_path) if f.endswith('.parquet')],
                          ignore_index=True)

    source_values_set = set(source_df[source_column_name])
    target_values_set = set(target_df[target_column_name])

    # Values present on one side only, in a stable order for the report
    source_missing_values = _sorted_for_report(source_values_set - target_values_set)
    target_missing_values = _sorted_for_report(target_values_set - source_values_set)

    if not source_missing_values and not target_missing_values:
        status = 'Passed'
        bad_data_summary = 'Same values in Source and Target'
    else:
        status = 'Failed'
        details = []
        if source_missing_values:
            details.append("Missing in Target: " + ", ".join(f"'{val}'" for val in source_missing_values))
        if target_missing_values:
            details.append("Missing in Source: " + ", ".join(f"'{val}'" for val in target_missing_values))
        # The total counts distinct mismatching values (the message text is kept as it was)
        total_mismatches = len(source_missing_values) + len(target_missing_values)
        bad_data_summary = f"Total rows with bad data: {total_mismatches}; " + "; ".join(details)

    # Add validation result to Raw_Data_Validation DataFrame
    Raw_Data_Validation = add_validation_result(
        Raw_Data_Validation, [source_table_name, target_table_name], dq_check,
        [source_column_name, target_column_name], status, bad_data_summary, tc_num
    )

    # Save the updated Raw_Data_Validation DataFrame to CSV
    save_validation_results(Raw_Data_Validation)


def _sorted_for_report(values):
    """Return the values as a sorted list; order by str() when they are not mutually comparable."""
    try:
        return sorted(values)
    except TypeError:
        # Mixed types (e.g. None next to strings) cannot be compared with '<'
        return sorted(values, key=str)

In [228]:
def compare_row_counts(source_csv_path, source_table_name, target_folder_path, target_table_name, dq_check, tc_num):
    """
    Perform row count comparison between source CSV file and target Parquet files.

    Arguments:
    source_csv_path (str): Path to the source CSV file.
    source_table_name (str): Name of the source table. Accepted for symmetry with the
        other checks; only target_table_name is written to the validation results.
    target_folder_path (str): Path to the folder containing target Parquet files.
    target_table_name (str): Name of the target table, as recorded in the validation results.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results and saved to the CSV.
    """
    # Initialize or load the Raw_Data_Validation DataFrame
    Raw_Data_Validation = initialize_validation_dataframe()

    # Count rows in the source CSV file
    source_row_count = len(pd.read_csv(source_csv_path, keep_default_na=False))

    # Count rows in the target Parquet files from their footer metadata,
    # which avoids loading the data itself into memory
    target_row_count = sum(
        pq.read_metadata(os.path.join(target_folder_path, f)).num_rows
        for f in os.listdir(target_folder_path) if f.endswith('.parquet')
    )

    # Compare row counts between source and target
    if source_row_count == target_row_count:
        status = 'Passed'
        bad_data_summary = 'Same Row counts in Source and Target'
    else:
        status = 'Failed'
        bad_data_summary = f"Source row counts: {source_row_count}; Target row counts: {target_row_count}"

    # Add validation result to Raw_Data_Validation DataFrame (the check has no column, hence '')
    Raw_Data_Validation = add_validation_result(
        Raw_Data_Validation, target_table_name, dq_check, '', status, bad_data_summary, tc_num
    )

    # Save the updated Raw_Data_Validation DataFrame to CSV
    save_validation_results(Raw_Data_Validation)

In [229]:
def check_for_duplicates_csv(source_csv_path, table_name, columns, dq_check, tc_num):
    """
    Run the duplicate check on a source CSV file.

    Arguments:
    source_csv_path (str): Path to the source CSV file.
    table_name (str): Name of the table, as recorded in the validation results.
    columns (list of str): Column names that together identify a duplicate.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_for_duplicates.
    """
    # Load the CSV (no string is converted to NaN)
    csv_frame = pd.read_csv(source_csv_path, keep_default_na=False)

    # Delegate the check and the saving of its result
    check_for_duplicates(csv_frame, table_name, columns, dq_check, tc_num)

def check_for_duplicates_parquet(target_path, table_name, columns, dq_check, tc_num):
    """
    Perform duplicate check in the target Parquet files based on specified columns.

    Arguments:
    target_path (str): Path to the folder containing the target Parquet files.
    table_name (str): Name of the table, as recorded in the validation results.
    columns (list of str): List of column names to check for duplicates.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_for_duplicates.

    Raises:
    ValueError: If the folder contains no '.parquet' files.
    """
    # The folder comes from the `target_path` parameter (the body previously used an
    # undefined name, which raised NameError). os.listdir order is arbitrary, so the names
    # are sorted to keep the reported row numbers the same on every run.
    parquet_files = sorted(f for f in os.listdir(target_path) if f.endswith('.parquet'))
    if not parquet_files:
        # pd.concat would raise the same exception type with a less helpful message
        raise ValueError(f"No .parquet files found in '{target_path}'")

    # Load all Parquet files and concatenate them into one DataFrame with a fresh 0..n-1 index
    target_df = pd.concat(
        [pq.read_table(os.path.join(target_path, f)).to_pandas() for f in parquet_files],
        ignore_index=True,
    )

    # Perform the check and save the result into the validation results
    check_for_duplicates(target_df, table_name, columns, dq_check, tc_num)
    
def check_for_duplicates(table_df, table_name, columns, dq_check, tc_num):
    """
    Duplicate check on an already loaded table.

    Every row whose values in `columns` occur more than once is counted (all
    occurrences, not only the repeats). The outcome is added to the validation
    results, which are then saved back to the CSV.

    Arguments:
    table_df (pd.DataFrame): Table to check.
    table_name (str): Name of the table, as recorded in the validation results.
    columns (list of str): Column names that together identify a duplicate.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None.
    """
    # Current validation results (loaded from CSV or newly created)
    results = initialize_validation_dataframe()

    # keep=False marks every member of a duplicate group
    is_duplicate = table_df.duplicated(subset=columns, keep=False)
    duplicate_labels = table_df[is_duplicate].index.tolist()

    # Shift the index labels by one so that reported row numbers start from 1
    duplicate_row_numbers = [label + 1 for label in duplicate_labels]

    if not duplicate_labels:
        status = 'Passed'
        bad_data_summary = 'No duplicates'
    else:
        status = 'Failed'
        rows_text = ', '.join(str(number) for number in duplicate_row_numbers)
        bad_data_summary = f"Duplicates: {len(duplicate_labels)}; Duplicate Row Indices: {rows_text}"

    # Record the outcome and persist the results
    results = add_validation_result(
        results, table_name, dq_check, columns, status, bad_data_summary, tc_num
    )
    save_validation_results(results)

In [230]:
def check_state_consistency_csv(table_path, table_name, state_column, country_column, dq_check, tc_num):
    """
    Run the state/country consistency check on a source CSV file.

    Arguments:
    table_path (str): Path to the source CSV file.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_state_consistency.
    """
    # Load the CSV; with keep_default_na=False the text 'NA' stays a string
    csv_frame = pd.read_csv(table_path, keep_default_na=False)

    # Delegate the check and the saving of its result
    check_state_consistency(csv_frame, table_name, state_column, country_column, dq_check, tc_num)

def check_state_consistency_parquet(table_path, table_name, state_column, country_column, dq_check, tc_num):
    """
    Run the state/country consistency check on the Parquet files of a target folder.

    Arguments:
    table_path (str): Path to the folder containing the target Parquet files.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_state_consistency.
    """
    # Read every '.parquet' file of the folder into its own DataFrame
    frames = []
    for file_name in os.listdir(table_path):
        if file_name.endswith('.parquet'):
            frames.append(pq.read_table(os.path.join(table_path, file_name)).to_pandas())

    # Stack the pieces into one table with a fresh index
    parquet_frame = pd.concat(frames, ignore_index=True)

    # Delegate the check and the saving of its result
    check_state_consistency(parquet_frame, table_name, state_column, country_column, dq_check, tc_num)

def check_state_consistency(table_df, table_name, state_column, country_column, dq_check, tc_num):
    """
    State/country consistency check on an already loaded table.

    A row is reported as incorrect when its state is 'NA' while the country is 'USA',
    or when its state is not 'NA' while the country is not 'USA'. Incorrect rows are
    listed by their 'iata' value. The outcome is added to the validation results,
    which are then saved back to the CSV.

    Arguments:
    table_df (pd.DataFrame): Table to check; must contain an 'iata' column.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None.
    """
    # Current validation results (loaded from CSV or newly created)
    results = initialize_validation_dataframe()

    # Order the rows by 'iata' and renumber them, so the report lists codes in ascending order
    ordered_df = table_df.sort_values(by=['iata'], ascending=True).reset_index(drop=True)

    state_is_na = ordered_df[state_column] == 'NA'
    country_is_usa = ordered_df[country_column] == 'USA'

    # Inconsistent: a USA row without a state, or a non-USA row with a state
    incorrect_records = ordered_df[(state_is_na & country_is_usa) | (~state_is_na & ~country_is_usa)]
    num_incorrect_records = len(incorrect_records)

    if num_incorrect_records > 0:
        status = 'Failed'
        # Quote each 'iata' value and join them with commas
        quoted_iata = ["'" + str(code) + "'" for code in incorrect_records['iata'].tolist()]
        incorrect_iata_values = ', '.join(quoted_iata)
        bad_data_summary = f"Number of incorrect records: {num_incorrect_records}; Incorrect records 'iata': {incorrect_iata_values}"
    else:
        status = 'Passed'
        bad_data_summary = 'No incorrect NA values'

    # Record the outcome and persist the results
    results = add_validation_result(
        results, table_name, dq_check, [state_column, country_column], status, bad_data_summary, tc_num
    )
    save_validation_results(results)

In [231]:
def check_missing_state_usa_csv(table_path, table_name, state_column, country_column, dq_check, tc_num):
    """
    Run the missing-state check for 'USA' rows on a source CSV file.

    Arguments:
    table_path (str): Path to the source CSV file.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_missing_state_usa.
    """
    # Load the CSV; with keep_default_na=False empty cells and 'NA' stay strings
    csv_frame = pd.read_csv(table_path, keep_default_na=False)

    # Delegate the check and the saving of its result
    check_missing_state_usa(csv_frame, table_name, state_column, country_column, dq_check, tc_num)

def check_missing_state_usa_parquet(table_path, table_name, state_column, country_column, dq_check, tc_num):
    """
    Run the missing-state check for 'USA' rows on the Parquet files of a target folder.

    Arguments:
    table_path (str): Path to the folder containing the target Parquet files.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_missing_state_usa.
    """
    # Read every '.parquet' file of the folder into its own DataFrame
    frames = []
    for file_name in os.listdir(table_path):
        if file_name.endswith('.parquet'):
            frames.append(pq.read_table(os.path.join(table_path, file_name)).to_pandas())

    # Stack the pieces into one table with a fresh index
    parquet_frame = pd.concat(frames, ignore_index=True)

    # Delegate the check and the saving of its result
    check_missing_state_usa(parquet_frame, table_name, state_column, country_column, dq_check, tc_num)

def check_missing_state_usa(table_df, table_name, state_column, country_column, dq_check, tc_num):
    """
    Missing-state check for 'USA' rows of an already loaded table.

    A row is reported when its country is 'USA' and its state is an empty string or
    'NA'. Reported rows are listed by their 'iata' value. The outcome is added to the
    validation results, which are then saved back to the CSV.

    Arguments:
    table_df (pd.DataFrame): Table to check; must contain an 'iata' column.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None.
    """
    # Current validation results (loaded from CSV or newly created)
    results = initialize_validation_dataframe()

    # Order the rows by 'iata' and renumber them, so the report lists codes in ascending order
    ordered_df = table_df.sort_values(by=['iata'], ascending=True).reset_index(drop=True)

    state_values = ordered_df[state_column]
    country_is_usa = ordered_df[country_column] == 'USA'

    # USA rows whose state is empty or 'NA'
    state_is_missing = (state_values == '') | (state_values == 'NA')
    missing_state_records = ordered_df[state_is_missing & country_is_usa]
    num_missing_state_records = len(missing_state_records)

    if num_missing_state_records > 0:
        status = 'Failed'
        # Quote each 'iata' value and join them with commas
        quoted_iata = ["'" + str(code) + "'" for code in missing_state_records['iata'].tolist()]
        missing_iata_values = ', '.join(quoted_iata)
        bad_data_summary = f"Number of incorrect records: {num_missing_state_records}; Incorrect records 'iata': {missing_iata_values}"
    else:
        status = 'Passed'
        bad_data_summary = 'No missing or NA states for the USA country'

    # Record the outcome and persist the results
    results = add_validation_result(
        results, table_name, dq_check, [state_column, country_column], status, bad_data_summary, tc_num
    )
    save_validation_results(results)

In [232]:
# Two-letter abbreviations of the 50 US states; used to validate the 'state' column of 'USA' rows.
# NOTE(review): 'DC' and US territories (e.g. 'PR', 'GU', 'VI') are not in this list, so rows
# carrying them are reported as invalid — confirm this is intended.
valid_state_abbreviations = [
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS',
    'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY',
    'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY'
]

def check_valid_state_abbreviations_csv(table_path, table_name, state_column, country_column, dq_check, tc_num):
    """
    Run the state abbreviation check for 'USA' rows on a source CSV file.

    Arguments:
    table_path (str): Path to the source CSV file.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_valid_state_abbreviations.
    """
    # Load the CSV (no string is converted to NaN)
    csv_frame = pd.read_csv(table_path, keep_default_na=False)

    # Delegate the check and the saving of its result
    check_valid_state_abbreviations(csv_frame, table_name, state_column, country_column, dq_check, tc_num)

def check_valid_state_abbreviations_parquet(table_path, table_name, state_column, country_column, dq_check, tc_num):
    """
    Run the state abbreviation check for 'USA' rows on the Parquet files of a target folder.

    Arguments:
    table_path (str): Path to the folder containing the target Parquet files.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_valid_state_abbreviations.
    """
    # Read every '.parquet' file of the folder into its own DataFrame
    frames = []
    for file_name in os.listdir(table_path):
        if file_name.endswith('.parquet'):
            frames.append(pq.read_table(os.path.join(table_path, file_name)).to_pandas())

    # Stack the pieces into one table with a fresh index
    parquet_frame = pd.concat(frames, ignore_index=True)

    # Delegate the check and the saving of its result
    check_valid_state_abbreviations(parquet_frame, table_name, state_column, country_column, dq_check, tc_num)

def check_valid_state_abbreviations(table_df, table_name, state_column, country_column, dq_check, tc_num):
    """
    Check that every 'USA' row carries a state abbreviation from valid_state_abbreviations.

    Rows are sorted by 'iata' before the check, so the invalid values are reported in
    that order. The outcome is added to the validation results, which are then saved
    back to the CSV.

    Arguments:
    table_df (pd.DataFrame): Table to check; must contain an 'iata' column.
    table_name (str): Name of the table, as recorded in the validation results.
    state_column (str): Name of the 'state' column.
    country_column (str): Name of the 'country' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None.
    """
    # Initialize or load the Raw_Data_Validation DataFrame
    Raw_Data_Validation = initialize_validation_dataframe()

    # Sort by 'iata' and renumber the rows
    sort_columns = ['iata']
    table_df = table_df.sort_values(by=sort_columns, ascending=True).reset_index(drop=True)

    # Keep the rows where 'country' is 'USA'
    usa_records = table_df[table_df[country_column] == 'USA']

    # USA rows whose state is not one of the valid abbreviations
    invalid_state_records = usa_records[~usa_records[state_column].isin(valid_state_abbreviations)]
    num_invalid_state_records = len(invalid_state_records)

    # Determine status based on state abbreviation check
    if num_invalid_state_records == 0:
        status = 'Passed'
        bad_data_summary = 'No incorrect state abbreviation'
    else:
        status = 'Failed'
        # Convert each value to str before joining: an invalid state may be a non-string
        # value (e.g. None or NaN), on which str.join raises TypeError
        incorrect_abbreviations = invalid_state_records[state_column].tolist()
        incorrect_abbreviations_str = ', '.join(str(val) for val in incorrect_abbreviations)
        bad_data_summary = f"Incorrect state abbreviation: {num_invalid_state_records}; Incorrect records: {incorrect_abbreviations_str}"

    # Add validation result to Raw_Data_Validation DataFrame
    Raw_Data_Validation = add_validation_result(
        Raw_Data_Validation, table_name, dq_check, [state_column, country_column], status, bad_data_summary, tc_num
    )

    # Save the updated Raw_Data_Validation DataFrame to CSV
    save_validation_results(Raw_Data_Validation)

In [233]:
def check_cancellation_codes_csv(table_path, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num):
    """
    Run the cancellation code check on a source CSV file.

    Arguments:
    table_path (str): Path to the source CSV file.
    table_name (str): Name of the table, as recorded in the validation results.
    cancelled_column (str): Name of the 'Cancelled' column.
    cancellation_code_column (str): Name of the 'CancellationCode' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_cancellation_codes.
    """
    # Load the CSV (no string is converted to NaN)
    csv_frame = pd.read_csv(table_path, keep_default_na=False)

    # Delegate the check and the saving of its result
    check_cancellation_codes(csv_frame, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num)
    
    
def check_cancellation_codes_parquet(table_path, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num):
    """
    Run the cancellation code check on the Parquet files of a target folder.

    Arguments:
    table_path (str): Path to the folder containing the target Parquet files.
    table_name (str): Name of the table, as recorded in the validation results.
    cancelled_column (str): Name of the 'Cancelled' column.
    cancellation_code_column (str): Name of the 'CancellationCode' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results by check_cancellation_codes.
    """
    # Read every '.parquet' file of the folder into its own DataFrame
    frames = []
    for file_name in os.listdir(table_path):
        if file_name.endswith('.parquet'):
            frames.append(pq.read_table(os.path.join(table_path, file_name)).to_pandas())

    # Stack the pieces into one table with a fresh index
    parquet_frame = pd.concat(frames, ignore_index=True)

    # Delegate the check and the saving of its result
    check_cancellation_codes(parquet_frame, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num)


def check_cancellation_codes(table_df, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num):
    """
    Check that cancellation codes agree with the 'Cancelled' flag and record the result.

    Rules applied:
    - rows with flag 1 must have a cancellation code 'A', 'B' or 'C';
    - rows with flag 0 must have an empty cancellation code ('').

    The flag is compared numerically, so it is recognised whether the column holds
    numbers or the strings '0'/'1'. Rows are sorted by Year, Month, DayofMonth, DepTime
    and FlightNum before the check; the reported row numbers (starting from 1) refer to
    that sorted order, not to the order of the input.

    Arguments:
    table_df (pd.DataFrame): Table to check; must contain the sort columns listed above.
    table_name (str): Name of the table, as recorded in the validation results.
    cancelled_column (str): Name of the 'Cancelled' column.
    cancellation_code_column (str): Name of the 'CancellationCode' column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The result is added to the validation results and saved to the CSV.
    """
    # Initialize or load the Raw_Data_Validation DataFrame
    Raw_Data_Validation = initialize_validation_dataframe()

    # Sort the rows and renumber them
    sort_columns = ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']
    table_df = table_df.sort_values(by=sort_columns, ascending=True).reset_index(drop=True)

    # Compare the flag as a number. pd.read_csv parses a 0/1 column as integers, and an
    # integer column compared with the strings '1'/'0' never matches, which made the check
    # pass without testing anything. Values that are not numeric become NaN and match neither rule.
    cancelled_flag = pd.to_numeric(table_df[cancelled_column], errors='coerce')
    cancellation_code = table_df[cancellation_code_column]

    # NOTE(review): a null (rather than '') code on a non-cancelled row is still reported
    # as incorrect, as before — confirm whether nulls should count as empty.
    incorrect_records = table_df[
        ((cancelled_flag == 1) & ~cancellation_code.isin(['A', 'B', 'C'])) |
        ((cancelled_flag == 0) & (cancellation_code != ''))
    ]

    num_incorrect_records = len(incorrect_records)

    # Determine status based on cancellation code check
    if num_incorrect_records == 0:
        status = 'Passed'
        bad_data_summary = 'No incorrect values'
    else:
        status = 'Failed'
        # Row numbers of the incorrect rows, shifted to start counting from 1
        incorrect_indices = [idx + 1 for idx in incorrect_records.index.tolist()]
        bad_data_summary = (
            f"Number of incorrect records: {num_incorrect_records}; "
            f"Rows with bad data: {', '.join(map(str, incorrect_indices))}"
        )

    # Add validation result to Raw_Data_Validation DataFrame
    Raw_Data_Validation = add_validation_result(
        Raw_Data_Validation, table_name, dq_check, [cancelled_column, cancellation_code_column], status, bad_data_summary, tc_num
    )

    # Save the updated Raw_Data_Validation DataFrame to CSV
    save_validation_results(Raw_Data_Validation)

In [234]:
def check_elapsed_time_calculation_csv(
        table_path, table_name, elapsed_time_column, arr_time_column, dep_time_column, dq_check, tc_num):
    """
    Run the CRSElapsedTime calculation check against the source flights CSV file.

    The CSV is loaded with keep_default_na=False and handed to
    check_elapsed_time_calculation, which performs the comparison and stores
    the outcome in Raw_Data_Validation.

    Arguments:
    table_path (str): Path to the source CSV file (flights.csv).
    table_name (str): Name of the source table (equal to the CSV file name).
    elapsed_time_column (str): Name of the 'CRSElapsedTime' column in the flights table.
    arr_time_column (str): Name of the 'CRSArrTime' (scheduled arrival time) column.
    dep_time_column (str): Name of the 'CRSDepTime' (scheduled departure time) column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The validation result is persisted by check_elapsed_time_calculation.
    """
    # Load the flights CSV file into a DataFrame
    flights_source_df = pd.read_csv(table_path, keep_default_na=False)

    # Delegate the actual check (and result persistence) to the shared implementation
    check_elapsed_time_calculation(
        table_df=flights_source_df,
        table_name=table_name,
        elapsed_time_column=elapsed_time_column,
        arr_time_column=arr_time_column,
        dep_time_column=dep_time_column,
        dq_check=dq_check,
        tc_num=tc_num,
    )
    
    
def check_elapsed_time_calculation_parquet(
        table_path, table_name, elapsed_time_column, arr_time_column, dep_time_column, dq_check, tc_num):
    """
    Run the CRSElapsedTime calculation check against the target flights parquet folder.

    Every '*.parquet' file found directly in table_path is read and the pieces are
    concatenated into one DataFrame, which is handed to check_elapsed_time_calculation.
    That function performs the comparison and stores the outcome in Raw_Data_Validation.

    Arguments:
    table_path (str): Path to the target parquet folder (flights).
    table_name (str): Name of the target parquet table.
    elapsed_time_column (str): Name of the 'CRSElapsedTime' column in the flights table.
    arr_time_column (str): Name of the 'CRSArrTime' (scheduled arrival time) column.
    dep_time_column (str): Name of the 'CRSDepTime' (scheduled departure time) column.
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The validation result is persisted by check_elapsed_time_calculation.
    """
    # Read each parquet part file in the target folder
    parquet_frames = []
    for file_name in os.listdir(table_path):
        if file_name.endswith('.parquet'):
            part_path = os.path.join(table_path, file_name)
            parquet_frames.append(pq.read_table(part_path).to_pandas())

    # Combine all parts into a single DataFrame with a fresh index
    target_df = pd.concat(parquet_frames, ignore_index=True)

    # Delegate the actual check (and result persistence) to the shared implementation
    check_elapsed_time_calculation(
        table_df=target_df,
        table_name=table_name,
        elapsed_time_column=elapsed_time_column,
        arr_time_column=arr_time_column,
        dep_time_column=dep_time_column,
        dq_check=dq_check,
        tc_num=tc_num,
    )


def convert_time_to_hhmm(time_str):
    """
    Convert an HHMM-style time value to a zero-padded 'HH:MM' string.

    Non-digit characters are stripped and the remaining digits are left-padded
    with zeros to four characters, so '930', '9:30' and 930 all become '09:30'.

    Arguments:
    time_str (str | int | float | None): Time value as read from the table. Numeric
        values are accepted because typed columns (e.g. from parquet) may hold numbers.

    Returns:
    str | None: 'HH:MM' for a valid time. None when the value is missing
        (None, NaN, empty/blank string) or cannot be interpreted as a time of day;
        for the latter an error message is printed.
    """
    # A missing value carries no time information; do not turn it into '00:00'
    if time_str is None:
        return None

    try:
        if isinstance(time_str, float):
            if time_str != time_str:  # NaN is the only value not equal to itself
                return None
            if not time_str.is_integer():
                raise ValueError(f"Invalid time format: {time_str}")
            # 1430.0 -> 1430, otherwise the trailing '0' would be taken as a digit of the time
            time_text = str(int(time_str))
        else:
            time_text = str(time_str)

        # Blank input is treated as missing rather than as an error
        if not time_text.strip():
            return None

        # Clean the time string by removing non-numeric characters
        cleaned_time_str = ''.join(filter(str.isdigit, time_text))
        if not cleaned_time_str:
            raise ValueError(f"Invalid time format: {time_str}")

        # Ensure the cleaned time string has at least four digits (HHMM format)
        cleaned_time_str = cleaned_time_str.zfill(4)

        # Extract hours and minutes
        hours = int(cleaned_time_str[:2])
        minutes = int(cleaned_time_str[2:])

        # Validate hours and minutes ranges (0-23 for hours, 0-59 for minutes)
        if 0 <= hours <= 23 and 0 <= minutes <= 59:
            return f'{hours:02}:{minutes:02}'
        raise ValueError(f"Invalid time format: {time_str}")
    except (TypeError, ValueError) as e:
        # Best-effort conversion: report the bad value and let the caller treat it as missing
        print(f"Error converting time '{time_str}': {e}")
        return None
    
    
def check_elapsed_time_calculation(
    table_df, table_name, elapsed_time_column, arr_time_column, dep_time_column, dq_check, tc_num):
    """
    Check that the recorded elapsed time equals scheduled arrival minus scheduled departure.

    The expected value is computed in minutes from the two time-of-day columns.
    When the arrival clock time is earlier than the departure clock time the flight
    is assumed to arrive on the following day, so 24 hours are added.
    A row is reported as incorrect when the recorded value differs from the expected
    one, or when either side cannot be computed (unparseable time or non-numeric
    elapsed time).

    Arguments:
    table_df (pd.DataFrame): Flights data; must contain the sort columns
        'Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum' and the three checked columns.
    table_name (str): Table name stored with the validation result.
    elapsed_time_column (str): Name of the elapsed-time column (minutes).
    arr_time_column (str): Name of the scheduled arrival time column (HHMM-style values).
    dep_time_column (str): Name of the scheduled departure time column (HHMM-style values).
    dq_check (str): Name of the data quality check.
    tc_num (str): Test Case number.

    Returns:
    None. The outcome is appended to Raw_Data_Validation and saved to the CSV file.
    """
    minutes_per_day = 24 * 60

    # Initialize or load the Raw_Data_Validation DataFrame
    Raw_Data_Validation = initialize_validation_dataframe()

    # Sort by a fixed key and renumber rows so that reported row numbers
    # refer to the same ordering for the CSV source and the parquet target
    sort_columns = ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']
    table_df = table_df.sort_values(by=sort_columns, ascending=True).reset_index(drop=True)

    # Parse both time columns; values that cannot be converted become NaT
    arrival_time = pd.to_datetime(table_df[arr_time_column].apply(convert_time_to_hhmm), format='%H:%M')
    departure_time = pd.to_datetime(table_df[dep_time_column].apply(convert_time_to_hhmm), format='%H:%M')

    # Expected elapsed time in whole minutes
    expected_elapsed_time = ((arrival_time - departure_time).dt.total_seconds() / 60).round()

    # Overnight schedules: arrival earlier than departure means arrival on the next day.
    # The modulo maps negative differences into [0, 1440) and leaves NaN untouched.
    expected_elapsed_time = expected_elapsed_time % minutes_per_day

    # Recorded elapsed time as numbers; non-numeric values become NaN
    recorded_elapsed_time = pd.to_numeric(table_df[elapsed_time_column], errors='coerce')

    # NaN on either side compares as not equal, so such rows are reported as incorrect
    is_elapsed_time_correct = recorded_elapsed_time == expected_elapsed_time

    # Filter out rows with incorrect CRSElapsedTime
    incorrect_records = table_df[~is_elapsed_time_correct]
    num_incorrect_records = len(incorrect_records)

    # Determine status based on CRSElapsedTime consistency check
    if num_incorrect_records == 0:
        status = 'Passed'
        bad_data_summary = 'No incorrect records'
    else:
        status = 'Failed'
        # Report 1-based row numbers (positions in the sorted table) of incorrect rows
        incorrect_indices = [idx + 1 for idx in incorrect_records.index.tolist()]
        bad_data_summary = (
            f"Number of incorrect records: {num_incorrect_records}; "
            f"Rows #: {', '.join(map(str, incorrect_indices))}"
        )

    # Add validation result to Raw_Data_Validation DataFrame
    Raw_Data_Validation = add_validation_result(
        Raw_Data_Validation, table_name, dq_check, [elapsed_time_column, arr_time_column, dep_time_column],
        status, bad_data_summary, tc_num
    )

    # Save the updated Raw_Data_Validation DataFrame to CSV
    save_validation_results(Raw_Data_Validation)

    return

In [235]:
# Prepare the Raw_Data_Validation results before running the DQ checks below.
# NOTE(review): drop_validation_dataframe is defined earlier in the notebook;
# presumably it clears previously saved results so this run starts empty — confirm.
drop_validation_dataframe()

Unnamed: 0,Num,Table,DQ check,Column,Status,Bad Data,Test Case


In [236]:
# DQ_CHECK #1:

# Source table 'carriers.csv'
# Check that there are no NULL or empty values in the 'Code' PK column

# Initial parameters:
folder_path = csv_carriers  # despite the name, this is the path to the CSV file itself
table_name = 'carriers.csv'
dq_check = 'Completeness'
column_name = 'Code'
tc_num = 'TC #1'

# Perform completeness check and store results in Raw_Data_Validation DataFrame
check_null_values_csv(folder_path, table_name, dq_check, column_name, tc_num)

In [237]:
# DQ_CHECK #2:

# Source table 'carriers.csv'
# Check that there are no NULL or empty values in the 'Description' PK column

# Initial parameters:
folder_path = csv_carriers  # despite the name, this is the path to the CSV file itself
table_name = 'carriers.csv'
dq_check = 'Completeness'
column_name = 'Description'
tc_num = 'TC #2'

# Perform completeness check and store results in Raw_Data_Validation DataFrame
check_null_values_csv(folder_path, table_name, dq_check, column_name, tc_num)

In [238]:
# DQ_CHECK #3:

# Target table 'carriers', parquet
# Check that there are no NULL or empty values in the 'code' PK column

# Initial parameters:
folder_path = parquet_carriers
table_name = 'carriers'
dq_check = 'Completeness'
column_name = 'code'
tc_num = 'TC #3'

# Perform completeness check and store results in Raw_Data_Validation DataFrame
check_null_values_parquet(folder_path, table_name, dq_check, column_name, tc_num)


In [239]:
# DQ_CHECK #4:

# Target table 'carriers', parquet
# Check that there are no NULL or empty values in the 'description' PK column

# Initial parameters:
dq_check = 'Completeness'
folder_path = parquet_carriers
table_name = 'carriers'
column_name = 'description'
tc_num = 'TC #4'

# Perform completeness check and store results in Raw_Data_Validation DataFrame
check_null_values_parquet(folder_path, table_name, dq_check, column_name, tc_num)

In [240]:
# DQ_CHECK #5:

# Source 'carriers.csv' vs. target 'carriers' (parquet)
# Check that all values of the source column are present in the corresponding
# target column (no data gaps)

# Initial parameters:
dq_check = 'Completeness'
source_csv_path = csv_carriers
source_table_name = 'carriers.csv'
source_column_name = 'Code'
target_folder_path = parquet_carriers
target_table_name = 'carriers'
target_column_name = 'code'
tc_num = 'TC #5'

# Perform completeness check on values in the source column 'Code' and target column 'code'
compare_values_source_target(source_csv_path, source_table_name, source_column_name,
                       target_folder_path, target_table_name, target_column_name, dq_check, tc_num)

In [241]:
# DQ_CHECK #6:

# Source 'carriers.csv' vs. target 'carriers' (parquet)
# Check that all values of the source column are present in the corresponding
# target column (no data gaps)

# Initial parameters:
source_csv_path = csv_carriers
source_table_name = 'carriers.csv'
source_column_name = 'Description'
target_folder_path = parquet_carriers
target_table_name = 'carriers'
target_column_name = 'description'
dq_check = 'Completeness'
tc_num = 'TC #6'

# Perform completeness check on values in the source column 'Description' and target column 'description'
compare_values_source_target(source_csv_path, source_table_name, source_column_name,
                       target_folder_path, target_table_name, target_column_name, dq_check, tc_num)

In [242]:
# DQ_CHECK #7:

# Source 'carriers.csv' vs. target 'carriers' (parquet)
# Check that row counts in source and target tables are equal

# Initial parameters:
source_csv_path = csv_carriers
source_table_name = 'carriers.csv'
target_folder_path = parquet_carriers
# NOTE(review): this value names both tables; it appears verbatim in the 'Table'
# column of the TC #7 result row, so presumably compare_row_counts records only
# target_table_name — confirm against its definition.
target_table_name = 'carriers, carriers.csv'
dq_check = 'Completeness'
tc_num = 'TC #7'

# Perform row count comparison between source and target
compare_row_counts(source_csv_path, source_table_name, target_folder_path, target_table_name, dq_check, tc_num)

In [243]:
# DQ_CHECK #8:

# Source table 'carriers.csv'
# Check that there are no duplicate rows for the ('Code', 'Description') column combination

# Initial parameters:
source_csv_path = csv_carriers
table_name = 'carriers.csv'
columns_to_check = ['Code', 'Description']
# NOTE(review): the duplicate check is recorded under the 'Completeness' dimension — confirm this is intended
dq_check = 'Completeness'
tc_num = 'TC #8'

# Perform duplicate check based on specified columns
check_for_duplicates_csv(source_csv_path, table_name, columns_to_check, dq_check, tc_num)

In [244]:
# DQ_CHECK #9:

# Target table 'carriers', parquet
# Check that there are no duplicate rows for the ('code', 'description') column combination

# Initial parameters:
target_path = parquet_carriers
table_name = 'carriers'
columns_to_check = ['code', 'description']
# NOTE(review): the duplicate check is recorded under the 'Completeness' dimension — confirm this is intended
dq_check = 'Completeness'
tc_num = 'TC #9'

# Perform duplicate check based on specified columns
check_for_duplicates_parquet(target_path, table_name, columns_to_check, dq_check, tc_num)

In [245]:
# DQ_CHECK #10:

# Source table 'airports.csv'
# Check that the state column has no 'NA' values for the 'USA' country
# and has 'NA' values for every non-'USA' country

# Initial parameters:
table_path = csv_airports
table_name = 'airports.csv'
state_column = 'state'
country_column = 'country'
dq_check = 'Consistency'
tc_num = 'TC #10'

# Perform state/country consistency check based on specified columns
check_state_consistency_csv(table_path, table_name, state_column, country_column, dq_check, tc_num)

In [246]:
# DQ_CHECK #11:

# Target table 'airports', parquet
# Check that the state column has no 'NA' values for the 'USA' country
# and has 'NA' values for every non-'USA' country

# Initial parameters:
table_path = parquet_airports
table_name = 'airports'
state_column = 'state'
country_column = 'country'
dq_check = 'Consistency'
tc_num = 'TC #11'

# Perform state/country consistency check based on specified columns
check_state_consistency_parquet(table_path, table_name, state_column, country_column, dq_check, tc_num)

In [247]:
# DQ_CHECK #12:

# Source table 'airports.csv'
# Check that a state is specified for all records where the country is 'USA'

# Initial parameters:
table_path = csv_airports
table_name = 'airports.csv'
state_column = 'state'
country_column = 'country'
dq_check = 'Consistency'
tc_num = 'TC #12'

# Perform missing-state check for 'USA' records based on specified columns
check_missing_state_usa_csv(table_path, table_name, state_column, country_column, dq_check, tc_num)

In [248]:
# DQ_CHECK #13:

# Target table 'airports', parquet
# Check that a state is specified for all records where the country is 'USA'

# Initial parameters:
table_path = parquet_airports
table_name = 'airports'
state_column = 'state'
country_column = 'country'
dq_check = 'Consistency'
tc_num = 'TC #13'

# Perform missing-state check for 'USA' records based on specified columns
check_missing_state_usa_parquet(table_path, table_name, state_column, country_column, dq_check, tc_num)

In [249]:
# DQ_CHECK #14:

# Source 'airports.csv' vs. target 'airports' (parquet)
# Compare values in source and target table for the 'state' column

# Initial parameters:
source_csv_path = csv_airports
source_table_name = 'airports.csv'
source_column_name = 'state'
target_folder_path = parquet_airports
target_table_name = 'airports'
target_column_name = 'state'
dq_check = 'Consistency'
tc_num = 'TC #14'

# Perform consistency check on values in the source and target column 'state'
compare_values_source_target(source_csv_path, source_table_name, source_column_name,
                       target_folder_path, target_table_name, target_column_name, dq_check, tc_num)

In [250]:
# DQ_CHECK #15:  (optional for the Consistency checks)

# Source table 'airports.csv'
# Check that the abbreviation in the state column is correct for the 'USA' country

# Initial parameters:
table_path = csv_airports
table_name = 'airports.csv'
state_column = 'state'
country_column = 'country'
dq_check = 'Consistency'
tc_num = 'TC #15'

# Perform abbreviation check based on specified columns
check_valid_state_abbreviations_csv(table_path, table_name, state_column, country_column, dq_check, tc_num)

In [251]:
# DQ_CHECK #16:  (optional for the Consistency checks)

# Target table 'airports', parquet
# Check that the abbreviation in the state column is correct for the 'USA' country

# Initial parameters:
table_path = parquet_airports
table_name = 'airports'
state_column = 'state'
country_column = 'country'
dq_check = 'Consistency'
tc_num = 'TC #16'

# Perform abbreviation check based on specified columns
check_valid_state_abbreviations_parquet(table_path, table_name, state_column, country_column, dq_check, tc_num)

In [252]:
# DQ_CHECK #17:

# Source table 'flights.csv'
# Check that if 'Cancelled' is 0 then 'CancellationCode' is empty (NULL)
# and if 'Cancelled' is 1 then 'CancellationCode' is one of ['A', 'B', 'C']

# Initial parameters:
table_path = csv_flights
table_name = 'flights.csv'
cancelled_column = 'Cancelled'
cancellation_code_column = 'CancellationCode'
dq_check = 'Consistency'
tc_num = 'TC #17'

# Perform cancellation code consistency check based on specified columns
check_cancellation_codes_csv(table_path, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num)

In [253]:
# DQ_CHECK #18:

# Target table 'flights', parquet
# Check that if 'Cancelled' is 0 then 'CancellationCode' is empty (NULL)
# and if 'Cancelled' is 1 then 'CancellationCode' is one of ['A', 'B', 'C']

# Initial parameters:
table_path = parquet_flights
table_name = 'flights'
cancelled_column = 'Cancelled'
cancellation_code_column = 'CancellationCode'
dq_check = 'Consistency'
tc_num = 'TC #18'

# Perform cancellation code consistency check based on specified columns
check_cancellation_codes_parquet(table_path, table_name, cancelled_column, cancellation_code_column, dq_check, tc_num)

In [254]:
# DQ_CHECK #19:

# Source 'flights.csv' vs. target 'flights' (parquet)
# Compare values in source and target table for the 'CancellationCode' column

# Initial parameters:
source_csv_path = csv_flights
source_table_name = 'flights.csv'
source_column_name = 'CancellationCode'
target_folder_path = parquet_flights
target_table_name = 'flights'
target_column_name = 'CancellationCode'
dq_check = 'Consistency'
tc_num = 'TC #19'

# Perform consistency check on values in the source and target column 'CancellationCode'
compare_values_source_target(source_csv_path, source_table_name, source_column_name,
                       target_folder_path, target_table_name, target_column_name, dq_check, tc_num)

In [255]:
# DQ_CHECK #20:

# Source table 'flights.csv'
# Check that 'CRSElapsedTime' column values are calculated correctly
# from 'CRSArrTime' and 'CRSDepTime'

# Initial parameters:
table_path = csv_flights
table_name = 'flights.csv'
elapsed_time_column = 'CRSElapsedTime'
arr_time_column = 'CRSArrTime'
dep_time_column = 'CRSDepTime'
dq_check = 'Consistency'
tc_num = 'TC #20'

# Perform elapsed time calculation check based on specified columns.
# Time values that cannot be converted are reported on stdout by convert_time_to_hhmm.
check_elapsed_time_calculation_csv(
    table_path, table_name, elapsed_time_column, arr_time_column, dep_time_column, dq_check, tc_num)

Error converting time '870': Invalid time format: 870
Error converting time '1275': Invalid time format: 1275


In [256]:
# DQ_CHECK #21:

# Target table 'flights', parquet
# Check that 'CRSElapsedTime' column values are calculated correctly
# from 'CRSArrTime' and 'CRSDepTime'

# Initial parameters:
table_path = parquet_flights
table_name = 'flights'
elapsed_time_column = 'CRSElapsedTime'
arr_time_column = 'CRSArrTime'
dep_time_column = 'CRSDepTime'
dq_check = 'Consistency'
tc_num = 'TC #21'

# Perform elapsed time calculation check based on specified columns.
# Time values that cannot be converted are reported on stdout by convert_time_to_hhmm.
check_elapsed_time_calculation_parquet(
    table_path, table_name, elapsed_time_column, arr_time_column, dep_time_column, dq_check, tc_num)

Error converting time '870': Invalid time format: 870
Error converting time '1275': Invalid time format: 1275


In [257]:
# DQ_CHECK #22:

# Source 'flights.csv' vs. target 'flights' (parquet)
# Compare values in source and target table for the 'CRSElapsedTime' column

# Initial parameters:
source_csv_path = csv_flights
source_table_name = 'flights.csv'
source_column_name = 'CRSElapsedTime'
target_folder_path = parquet_flights
target_table_name = 'flights'
target_column_name = 'CRSElapsedTime'
dq_check = 'Consistency'
tc_num = 'TC #22'

# Perform consistency check on values in the source and target column 'CRSElapsedTime'
compare_values_source_target(source_csv_path, source_table_name, source_column_name,
                       target_folder_path, target_table_name, target_column_name, dq_check, tc_num)

In [258]:
# Print check results:

# Reload the results saved by the checks above from the CSV file and show the first 30 rows.
pd.set_option('display.max_colwidth', None)  # Show full content of DataFrame cells
Raw_Data_Validation = pd.read_csv(csv_path, keep_default_na=False)
Raw_Data_Validation.head(30)

Unnamed: 0,Num,Table,DQ check,Column,Status,Bad Data,Test Case
0,1,carriers.csv,Completeness,Code,Failed,"Total missing values: 4; Rows #: 8, 36, 52, 79",TC #1
1,2,carriers.csv,Completeness,Description,Failed,"Total missing values: 3; Rows #: 8, 15, 16",TC #2
2,3,carriers,Completeness,code,Failed,"Total missing values: 4; Rows #: 8, 36, 52, 79",TC #3
3,4,carriers,Completeness,description,Failed,"Total missing values: 3; Rows #: 8, 15, 16",TC #4
4,5,"['carriers.csv', 'carriers']",Completeness,"['Code', 'code']",Passed,Same values in Source and Target,TC #5
5,6,"['carriers.csv', 'carriers']",Completeness,"['Description', 'description']",Passed,Same values in Source and Target,TC #6
6,7,"carriers, carriers.csv",Completeness,,Passed,Same Row counts in Source and Target,TC #7
7,8,carriers.csv,Completeness,"['Code', 'Description']",Failed,"Duplicates: 7; Duplicate Row Indices: 5, 7, 1495, 1502, 1503, 1504, 1505",TC #8
8,9,carriers,Completeness,"['code', 'description']",Failed,"Duplicates: 7; Duplicate Row Indices: 5, 7, 1495, 1502, 1503, 1504, 1505",TC #9
9,10,airports.csv,Consistency,"['state', 'country']",Failed,"Number of incorrect records: 12; Incorrect records 'iata': '', '01B', '01C', '01D', 'CLD', 'HHH', 'MIB', 'MQT', 'RCA', 'RDR', 'SCE', 'SKA'",TC #10
