In [89]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = (
    SparkSession.builder
    .appName("Read data")
    .getOrCreate()
)

# Storage layout used by the checks: <bucket_path>/<layer>/<object>
bucket_path = 'gs://iskldl01-projectby-local-bucket'
source = 'source'  # layer holding the CSV extracts
target = 'raw'     # layer holding the parquet copies

# Result structure: one row per executed DQ check, every field a nullable string.
result_schema = StructType(
    [
        StructField(result_field, StringType(), True)
        for result_field in ("Table", "DQ check", "Column", "Status", "Bad data")
    ]
)

# Empty result table; each check function appends one row to it.
result_df = spark.createDataFrame([], result_schema)

In [90]:
# Register the source (CSV) and raw (parquet) copies of every object as temporary
# views, so the DQ checks can address them in SQL as src_<object> / raw_<object>.

# Source layer: CSV extracts with a header row.
src_airports = spark.read.option("header", "true").csv(f"{bucket_path}/{source}/airports.csv")
src_carriers = spark.read.option("header", "true").csv(f"{bucket_path}/{source}/carriers.csv")
src_flights = spark.read.option("header", "true").csv(f"{bucket_path}/{source}/flights.csv")

# Raw layer: parquet stores its own schema; the CSV-only "header" option has no effect here.
raw_airports = spark.read.parquet(f"{bucket_path}/{target}/airports")
raw_carriers = spark.read.parquet(f"{bucket_path}/{target}/carriers")
raw_flights = spark.read.parquet(f"{bucket_path}/{target}/flights")

for view_name, frame in [
    ("src_airports", src_airports),
    ("src_carriers", src_carriers),
    ("src_flights", src_flights),
    ("raw_airports", raw_airports),
    ("raw_carriers", raw_carriers),
    ("raw_flights", raw_flights),
]:
    frame.createOrReplaceTempView(view_name)

In [91]:
# Check completeness regardless of the object
def ck_completeness(object_name):
    """Row-by-row completeness check between the source and raw copies of an object.

    The same ordered column set is selected from the ``src_<object_name>`` and
    ``raw_<object_name>`` temp views. Every value is trimmed, and empty strings
    become NULL. The two selections are compared with ``exceptAll`` in both
    directions, so missing, extra, altered and duplicated rows are all detected.
    One result row is appended to the global ``result_df``, and the accumulated
    table is shown.

    Parameters
    ----------
    object_name : str
        ``'carriers'``, ``'flights'`` or ``'airports'``.

    Raises
    ------
    ValueError
        If no column list is defined for ``object_name``.
    """
    global result_df

    # Column set per object, using the source column names.
    # Ideally, this would be pulled directly from a DM, source names mapped to target, etc.
    column_lists = {
        'carriers': ['Code', 'Description'],
        'flights': ['Year', 'Month', 'DayofMonth', 'DayofWeek', 'DepTime', 'CRSDepTime', 'ArrTime',
                    'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime',
                    'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance',
                    'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay',
                    'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay'],
        'airports': ['iata', 'airport', 'city', 'state', 'country', 'lat', 'long'],
    }
    # Source column name -> raw column name, for columns the raw layer renamed.
    raw_renames = {
        'airports': {'long': 'longt'},
    }

    if object_name not in column_lists:
        raise ValueError(f"No completeness column list defined for object '{object_name}'")
    column_list = column_lists[object_name]

    def select_sql(table, renames):
        # Alias every column with its source name, so both sides carry identical labels.
        items = ', '.join(
            f"NULLIF(TRIM(`{renames.get(col, col)}`), '') AS `{col}`" for col in column_list
        )
        return f"select {items} from {table}"

    # Select all from tables with the same order of columns
    src_rbr = spark.sql(select_sql(f"src_{object_name}", {}))
    raw_rbr = spark.sql(select_sql(f"raw_{object_name}", raw_renames.get(object_name, {})))

    # Row-by-row check. exceptAll keeps duplicates, so two empty differences mean
    # the sides are equal multisets, which already implies equal row counts.
    bad_rows = (
        [('source only', row) for row in src_rbr.exceptAll(raw_rbr).collect()]
        + [('raw only', row) for row in raw_rbr.exceptAll(src_rbr).collect()]
    )

    # Assessment of test execution
    if bad_rows:
        status = 'Failed'
        column_names = src_rbr.columns
        # "Bad data" is a StringType column, so render the offending rows as text explicitly.
        bad_data = '\n'.join(
            f"{origin}: " + ', '.join(f"{name}={value}" for name, value in zip(column_names, row))
            for origin, row in bad_rows
        )
    else:
        status = 'Passed'
        bad_data = ''

    res_to_add = spark.createDataFrame(
        [(object_name, 'Completeness', 'All', status, bad_data)], result_schema
    )

    # Append the new result row to the accumulated results
    result_df = result_df.union(res_to_add)
    result_df.show()


In [93]:
# Row-by-row comparison over every mapped column, not just the key columns
# (a key-only comparison would also pass for this object).
ck_completeness(object_name='carriers')

                                                                                

+--------+------------+------+------+--------+
|   Table|    DQ check|Column|Status|Bad data|
+--------+------------+------+------+--------+
|carriers|Completeness|   All|Passed|        |
+--------+------------+------+------+--------+



In [94]:
# List of US states
_US_STATES = frozenset([
    'AL', 'AK', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA',
    'KS', 'KY', 'LA', 'ME', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ',
    'NM', 'NY', 'NC', 'ND', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT',
    'VA', 'WA', 'WV', 'WI', 'WY',
])
_ACCEPTED_CANCELLATION_CODES = frozenset(['A', 'B', 'C'])
_MINUTES_PER_DAY = 24 * 60


def _blank_to_none(value):
    """Return ``value`` as a stripped string, or None when it is NULL or blank."""
    if value is None:
        return None
    text = str(value).strip()
    return text or None


def _hhmm_to_minutes(hhmm):
    """Convert an hhmm clock value (e.g. 1435) to minutes after midnight."""
    return hhmm // 100 * 60 + hhmm % 100


def _check_airport_state(object_name):
    """Return messages for airports whose state does not fit their country."""
    rows = spark.sql(f"select iata, state, country from raw_{object_name}").collect()
    messages = []
    for row in rows:
        iata = row['iata']
        state = _blank_to_none(row['state'])
        # 'NA' and blank are both treated as a missing state.
        if state == 'NA':
            state = None
        if _blank_to_none(row['country']) == 'USA':
            if state is None:
                messages.append(f'Should be a US state but is NULL. Iata: {iata},\n')
            elif state not in _US_STATES:
                messages.append(f'State not in the US. Iata: {iata},\n')
        elif state is not None:
            # Outside the US, a missing state is expected; any actual value is invalid.
            messages.append(f'Invalid state value. Iata: {iata},\n')
    return messages


def _check_cancellation_code(object_name):
    """Return messages for flights whose CancellationCode contradicts Cancelled."""
    rows = spark.sql(
        f"select Year, Month, DayofMonth, FlightNum, Cancelled, CancellationCode from raw_{object_name}"
    ).collect()
    messages = []
    for row in rows:
        # str() normalisation makes the comparison work whether the raw column is string or numeric.
        cancelled = _blank_to_none(row["Cancelled"])
        canc_code = _blank_to_none(row["CancellationCode"])
        if cancelled == '0' and canc_code is not None:
            messages.append(f'Invalid value. Should be Null. Row: {row},\n')
        elif cancelled == '1' and canc_code not in _ACCEPTED_CANCELLATION_CODES:
            messages.append(f'Invalid value. Should be A, B, or C. Row: {row},\n')
    return messages


def _check_crs_elapsed_time(object_name):
    """Return messages for flights whose CRSElapsedTime differs from CRSArrTime - CRSDepTime."""
    rows = spark.sql(
        f"select Year, Month, DayofMonth, FlightNum, CRSArrTime, CRSDepTime, CRSElapsedTime from raw_{object_name}"
    ).collect()
    messages = []
    for row in rows:
        try:
            arr_time = int(row["CRSArrTime"])
            dep_time = int(row["CRSDepTime"])
            act_diff = int(row["CRSElapsedTime"])
        except (TypeError, ValueError) as e:
            messages.append(f"An error occurred: {e}. Row: {row},\n")
            continue  # nothing to compare for this row
        # Modulo one day covers arrivals after midnight (arrival clock earlier than departure clock).
        # NOTE(review): if CRS times are local clock times at different airports, flights that
        # cross time zones will be reported here; confirm whether a time-zone adjustment is needed.
        exp_diff = (_hhmm_to_minutes(arr_time) - _hhmm_to_minutes(dep_time)) % _MINUTES_PER_DAY
        if exp_diff != act_diff:
            messages.append(
                f'Inconsistent value of CRSElapsedTime. Actual: {act_diff}, Expected: {exp_diff}. Row: {row},\n'
            )
    return messages


# (object_name, field_name) -> check returning a list of problem messages.
_CONSISTENCY_CHECKS = {
    ('airports', 'state'): _check_airport_state,
    ('flights', 'CancellationCode'): _check_cancellation_code,
    ('flights', 'CRSElapsedTime'): _check_crs_elapsed_time,
}


def ck_consistency(object_name, field_name):
    """Run the consistency check registered for (object_name, field_name) on the raw view.

    Supported pairs are the keys of ``_CONSISTENCY_CHECKS``. For those, one row is
    appended to the global ``result_df`` ('Failed' when any problem message was
    produced, otherwise 'Passed'), and the accumulated table is shown. An
    unsupported pair only prints a notice.

    Errors while running a check are printed rather than raised. This is
    best-effort, so one broken check does not stop the later checks in the notebook.
    """
    global result_df
    check = _CONSISTENCY_CHECKS.get((object_name, field_name))
    if check is None:
        print('Not yet included in the test')
        return

    try:
        messages = check(object_name)
        bad_data_message = ''.join(messages)
        status = 'Failed' if messages else 'Passed'

        res_to_add = spark.createDataFrame(
            [(object_name, 'Consistency', field_name, status, bad_data_message)], result_schema
        )
        # Append the new result row to the accumulated results
        result_df = result_df.union(res_to_add)
        result_df.show()
    except Exception as e:
        print(f'Consistency check {object_name}.{field_name} could not be executed. {e}')

In [95]:
# The last pair has no check implemented; it shows how unsupported checks are reported.
for table_name, column_name in [
    ('flights', 'CancellationCode'),
    ('flights', 'CRSElapsedTime'),
    ('airports', 'state'),
    ('airports', 'check_other'),
]:
    ck_consistency(table_name, column_name)

+--------+------------+----------------+------+--------------------+
|   Table|    DQ check|          Column|Status|            Bad data|
+--------+------------+----------------+------+--------------------+
|carriers|Completeness|             All|Passed|                    |
| flights| Consistency|CancellationCode|Failed|Invalid value. Sh...|
+--------+------------+----------------+------+--------------------+

+--------+------------+----------------+------+--------------------+
|   Table|    DQ check|          Column|Status|            Bad data|
+--------+------------+----------------+------+--------------------+
|carriers|Completeness|             All|Passed|                    |
| flights| Consistency|CancellationCode|Failed|Invalid value. Sh...|
| flights| Consistency|  CRSElapsedTime|Failed|Inconsitent value...|
+--------+------------+----------------+------+--------------------+

+--------+------------+----------------+------+--------------------+
|   Table|    DQ check|         