In [None]:
import pandas as pd
import numpy as np

In [None]:
final_result = [] # table/DQ check/ Column/ Status/ Bad Data

In [None]:
def read_parquet_files(parquet_files):
    dataframes = [pd.read_parquet(file) for file in parquet_files]
    my_df = pd.concat(dataframes, ignore_index=True)
    return my_df 

In [None]:
def get_column_value(column_raw):
    if column_raw == "NA":
        return 0
    else:
        return int(column_raw or 0)

In [None]:
def check_column_value_is_empty(column_value):
    return True if (column_value is None or column_value == "") else False

In [None]:
def check_time_format(row, column_name):
    value = get_column_value(row[column_name])
    if not len(str(get_column_value(row[column_name]))) == 4:
        final_result.append(('Flights', 'Validity', column_name, "Failed", f"Time format for {row['TailNum']}/{row['FlightNum']}:  {value}"))
    else:
        if not 0 <= value <= 2359:
            final_result.append(('Flights', 'Validity', column_name, "Failed", f"Time format for {row['TailNum']}/{row['FlightNum']}:  {value}"))

In [None]:
### Airports: Consistency check for state
def check_consistency():
    parquet_files = ["part-00000-a9aee747-6f56-4317-bf6b-075fe3b3ed5f-c000.snappy.parquet",
                     "part-00001-a9aee747-6f56-4317-bf6b-075fe3b3ed5f-c000.snappy.parquet"]

    my_df = read_parquet_files(parquet_files)

    valid_states = [
        "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA",
        "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
        "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT",
        "VA", "WA", "WV", "WI", "WY", "DC"]
    inconsistency = []

    for index, row in my_df.iterrows():
        country = row["country"]
        state = row["state"]
        if country == "USA" and state in valid_states:
            continue
        else:
            if state not in inconsistency and country != "":
                final_result.append(['Airports', 'Consistency', 'State', 'Failed', state])
                inconsistency.append(state)


In [None]:
### Flights: Validity by time range
def check_validity():
    parquet_files = ["part-00000-55c5be74-a9db-4265-8f2c-bceb8279269e-c000.snappy.parquet",
                         "part-00001-55c5be74-a9db-4265-8f2c-bceb8279269e-c000.snappy.parquet"]
    
    my_df = read_parquet_files(parquet_files)
    
    for index,row in my_df.iterrows():
        check_time_format(row, "ArrTime")
        check_time_format(row, "CRSArrTime")
        check_time_format(row, "DepTime")
        check_time_format(row, "CRSDepTime")

In [None]:
### Flights: Consisntency check for ActualElapsedTime
def check_consistency_for_ActualElapsedTime():
    parquet_files = ["part-00000-55c5be74-a9db-4265-8f2c-bceb8279269e-c000.snappy.parquet",
                     "part-00001-55c5be74-a9db-4265-8f2c-bceb8279269e-c000.snappy.parquet"]

    my_df = read_parquet_files(parquet_files)

    for index, row in my_df.iterrows():
        actual_elapsed_time = get_column_value(row["ActualElapsedTime"])
        arrival = get_column_value(row["ArrTime"])
        departure = get_column_value(row["DepTime"])
        expected_diff = arrival - departure
        if actual_elapsed_time != expected_diff:
            flightnum = row["FlightNum"]
            plane = row["TailNum"]
            final_result.append(('Flights', 'Consistency', "ActualElapsedTime", "Failed", f"{plane}/{flightnum}: actual = {actual_elapsed_time}, expected = {expected_diff}"))

In [None]:
### Carrier: Completeness by records
def check_completeness():
    parquet_files = ["part-00000-366e67ad-4fd6-41cd-af99-1ff7b3e314db-c000.snappy.parquet",
                     "part-00001-366e67ad-4fd6-41cd-af99-1ff7b3e314db-c000.snappy.parquet"]

    my_df = read_parquet_files(parquet_files)

    for index,row in my_df.iterrows():
        if check_column_value_is_empty(row["code"]) or check_column_value_is_empty(row["description"]):
            final_result.append(["Carriers", "Completeness", ["code", "description"], "Failed", f"code = {row['code']} description = {row['description']}"])

In [None]:
check_consistency()
check_validity()
check_consistency_for_ActualElapsedTime()
check_completeness()

In [None]:
df = pd.DataFrame(final_result, columns=["Table", "DQ check", "Column", "Status", "Bad Data"])

pd.set_option('display.max_rows', df.shape[0]+1)
df.index = np.arange(1, len(df) + 1)

df