In [484]:
# Read source files
import pandas as pd

bucket = 'iskldl02-projectby-local-bucket'
bucket_path = 'gs://' + bucket
folder = '/source/'

flights = 'flights.csv'
airports = 'airports.csv'
carriers = 'carriers.csv'

source_flights = pd.read_csv(bucket_path+folder+flights, dtype=str)
source_airports = pd.read_csv(bucket_path+folder+airports, dtype=str)
source_carriers = pd.read_csv(bucket_path+folder+carriers, dtype=str)

source_flights.head()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2008,5,1,4,859,905,1025,1030,AA,1755,...,5,10,0,,0,,,8.0,,
1,2008,5,1,4,859,905,1025,1030,AA,1755,...,5,10,0,,0,,5.0,,,
2,2008,5,1,4,859,905,1025,1030,AA,1856,...,5,10,0,,0,3.0,,,,
3,2008,5,2,5,901,905,1104,1030,AA,1755,...,7,35,0,,0,0.0,0.0,34.0,0.0,0.0
4,2008,5,3,6,901,905,1034,1030,AA,1755,...,5,17,0,,0,,,,,


In [526]:
# Read target files
from google.cloud import storage

# sub_folders
flt_prefix = 'raw/flights'
apt_prefix = 'raw/airports'
carr_prefix = 'raw/carriers'

# lists to store parquet files
flt_files, apt_files, carr_files = [], [], []
# raw_flt, raw_apt, raw_carr = pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

parquet_files = {flt_prefix: [flt_files, 'raw_flt'], apt_prefix: [apt_files, 'raw_apt'], carr_prefix: [carr_files, 'raw_carr']}

client = storage.Client()

# read all files in bucket and extract parquet ones only. Add parquet files to respective lists, declared above
for prfx, folders in parquet_files.items():
    for blob in client.list_blobs(bucket, prefix=prfx):
        if 'parquet' in str(blob):
            folders[0].append(bucket_path + '/' + blob.name)

def concat_parquet_files(files_folder, df):
    df = pd.DataFrame()
    for file in files_folder:
        df = pd.concat([pd.read_parquet(file), df])
    return df

raw_apt = concat_parquet_files (apt_files, 'raw_apt')
raw_flt = concat_parquet_files (flt_files, 'raw_flt') 
raw_carr = concat_parquet_files (carr_files, 'raw_carr') 

raw_flt.head()

Unnamed: 0,ActualElapsedTime,AirTime,ArrDelay,ArrTime,CRSArrTime,CRSDepTime,CRSElapsedTime,CancellationCode,Cancelled,CarrierDelay,...,NASDelay,Origin,SecurityDelay,TailNum,TaxiIn,TaxiOut,UniqueCarrier,WeatherDelay,WeatherDelayStr,Year
0,61,44,11,2016,2005,1905,60,,0,0,...,0,LAS,0,N490WN,4,13,WN,0,,2008
1,66,44,65,1945,1840,1740,60,,0,5,...,6,LAS,0,N692SW,4,18,WN,0,0.0,2008
2,60,49,42,2307,2225,2125,60,,0,17,...,0,LAS,0,N646SW,3,8,WN,0,0.0,2008
3,61,48,34,1604,1530,1425,65,,0,9,...,0,LAS,0,N414WN,3,10,WN,0,0.0,2008
4,74,47,72,1727,1615,1515,60,,0,0,...,19,LAS,0,N405WN,3,24,WN,0,0.0,2008


In [486]:
# flights completeness: check all expected columns exist in target

expected_cols = ['Year', 'Month', 'DayofMonth', 'DayofWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay'
]
target_cols = raw_flights.columns

# lower both lists for comparison
exp_lower = [col.lower() for col in expected_cols]
target_lower = [col.lower() for col in target_cols]

# compare columns in terms of upper/lower case:
for col in expected_cols:
    if col not in target_cols:
        print("WrongCase or Lost: "+col)

for col in target_cols:
    if col not in expected_cols:
        print("WrongCase or Unexpected: "+col)

print('---')

# compare columns ignoring the case:
for col in expected_cols:
    if col.lower() not in target_lower:
        print("Lost: "+col)

for col in target_cols:
    if col.lower() not in exp_lower:
        print("Unexpected: "+col)
        

WrongCase or Lost: DayofWeek
WrongCase or Unexpected: CarrierDelayStr
WrongCase or Unexpected: DayOfWeek
WrongCase or Unexpected: WeatherDelayStr
---
Unexpected: CarrierDelayStr
Unexpected: WeatherDelayStr


In [487]:
# flights completeness: 
# 1. check that all primary keys satisfying non-Null requirements are loaded from source to target
# 2. check that rows with NULL primary keys are not loaded to target

# find source rows where all key columns are non-Null
valid_source = source_flights[source_flights[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']].notnull().all(1)]

# find mismatches between valid source vs target (expected: 0 rows). _merge column: left_only means records got lost, right_only means invalid records got into target
mismatches = valid_source_data.merge(raw_flt, how='outer', on=['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum'], indicator=True).query('_merge != "both"')
mismatches[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum', '_merge']].head(10)

Unnamed: 0,Year,Month,DayofMonth,DepTime,FlightNum,_merge
736,2008.0,1.0,3.0,,1146.0,right_only
737,2008.0,1.0,3.0,,469.0,right_only
738,2008.0,1.0,3.0,,618.0,right_only
739,2008.0,1.0,3.0,,2528.0,right_only
740,,,,,,right_only
741,,2.0,2.0,520.0,724.0,right_only
742,2008.0,,2.0,520.0,724.0,right_only
743,2008.0,2.0,,520.0,724.0,right_only
744,2008.0,2.0,2.0,,724.0,right_only
745,2008.0,2.0,2.0,1519.0,,right_only


In [488]:
# flights completeness: check that target columns where NULLs are not allowed don't have NULLs

# create df from raw_flights from only columns where no Null values are expected
expected_nonan = raw_flt.drop(["CancellationCode"], axis=1, inplace=False)

# from that new dataframe, create series with Nulls
nan_columns = expected_nonan.isnull().any(axis=1)

# select rows where any value is null
target_rows_with_nan = expected_nonan[nans]

# print out those rows: expected 0 rows
target_rows_with_nan.head(3)

Unnamed: 0,ActualElapsedTime,AirTime,ArrDelay,ArrTime,CRSArrTime,CRSDepTime,CRSElapsedTime,Cancelled,CarrierDelay,CarrierDelayStr,...,NASDelay,Origin,SecurityDelay,TailNum,TaxiIn,TaxiOut,UniqueCarrier,WeatherDelay,WeatherDelayStr,Year


In [527]:
# flights completeness: verify all as-is columns values for valid primary keys got loaded as is to target

# 1.Subset valid source rows with no expected data modification
valid_source_as_is = valid_source[['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'AirTime', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay'
]]
# 2.Intersect valid as is from source with target
common = valid_source_as_is.merge(raw_flt[['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'AirTime', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']],
                                how='inner',suffixes=('_x', ''))
# 3.Find discrepancies
diff = valid_source_as_is.merge(common, how='left', indicator=True, suffixes=('_src', '_raw')).query('_merge=="left_only"')
diff.head() # expected: 0 rows

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,_merge
0,2008,5,1,4,859,905,1025,1030,AA,1755,...,5,10,0,0,,,8.0,,,left_only
1,2008,5,1,4,859,905,1025,1030,AA,1755,...,5,10,0,0,,5.0,,,,left_only
2,2008,5,1,4,859,905,1025,1030,AA,1856,...,5,10,0,0,3.0,,,,,left_only
4,2008,5,3,6,901,905,1034,1030,AA,1755,...,5,17,0,0,,,,,,left_only
5,2008,5,4,7,851,905,1019,1030,AA,1755,...,4,20,0,0,,,,,,left_only


In [489]:
# flights: Validity by Time Range for DepTime. Expected time range 0000-2359. Some flights appear to have departure time as 'NA' or empty (not null or NaN), so this needs to be clarified

valid_dep_time = raw_flt.loc[raw_flt['DepTime'].astype(str).str.isdigit()]

below_0_dep_time = valid_dep_time[pd.to_numeric(valid_dep_time['DepTime']) < 0]
invalid_minutes = valid_dep_time[pd.to_numeric(valid_dep_time['DepTime'].str[3:]) > 59]
invalid_hours = valid_dep_time[pd.to_numeric(valid_dep_time['DepTime'].str[-4:-2]) > 23]

print("Below 0:") 
print(below_0_dep_time[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']])
print('---')
print("Invalid minutes:") 
print(invalid_minutes[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']])
print('---')
print("Invalid hours:") 
print(invalid_hours[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']])

Below 0:
Empty DataFrame
Columns: [Year, Month, DayofMonth, DepTime, FlightNum]
Index: []
---
Invalid minutes:
Empty DataFrame
Columns: [Year, Month, DayofMonth, DepTime, FlightNum]
Index: []
---
Invalid hours:
    Year Month DayofMonth DepTime FlightNum
59  2008    10         23    2500      3053


In [490]:
# Flights: Validity by Time Range for CRSDepTime: find non-digit CRSDepTime (expected: 0 rows)

invalid_csr_time = raw_flt.loc[~raw_flt['CRSDepTime'].astype(str).str.isdigit()]
invalid_csr_time.head(10)

Unnamed: 0,ActualElapsedTime,AirTime,ArrDelay,ArrTime,CRSArrTime,CRSDepTime,CRSElapsedTime,CancellationCode,Cancelled,CarrierDelay,...,NASDelay,Origin,SecurityDelay,TailNum,TaxiIn,TaxiOut,UniqueCarrier,WeatherDelay,WeatherDelayStr,Year
250,,,,,,,,,0,0,...,0,BTV,0,,4,13,,0,,
256,164.0,149.0,56.0,704.0,608.0,,188.0,,0,0,...,0,BQN,0,N634JB,6,9,B6,0,0.0,2008.0


In [491]:
# Flights: Validity by Time Range for CRSDepTime. Expected time range 0000-2359. Some flights appear to have departure time as  'NA' (not null or NaN), so this needs to be clarified

valid_csr_time = raw_flt.loc[raw_flt['CRSDepTime'].astype(str).str.isdigit()]

below_0_crs_time = valid_dep_time[pd.to_numeric(valid_dep_time['CRSDepTime']) < 0]
invalid_minutes = valid_dep_time[pd.to_numeric(valid_dep_time['CRSDepTime'].str[3:]) > 59]
invalid_hours = valid_dep_time[pd.to_numeric(valid_dep_time['CRSDepTime'].str[-4:-2]) > 23]

print("Below 0:") 
print(below_0_crs_time[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']])
print('---')
print("Invalid minutes:") 
print(invalid_minutes[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']])
print('---')
print("Invalid hours:") 
print(invalid_hours[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']])

Below 0:
Empty DataFrame
Columns: [Year, Month, DayofMonth, DepTime, FlightNum]
Index: []
---
Invalid minutes:
Empty DataFrame
Columns: [Year, Month, DayofMonth, DepTime, FlightNum]
Index: []
---
Invalid hours:
Empty DataFrame
Columns: [Year, Month, DayofMonth, DepTime, FlightNum]
Index: []


In [492]:
# Airports: Uniqueness by PK. PK = iata. We would expect each iata to have count of 1 row

iata_cnt = raw_apt.groupby('iata').size()  # count rows by iata
dups = iata_cnt[iata_cnt > 1]  # return dups dataframe where count of rows per iata > 1. Expected: 0 rows
dups.head(10)

iata
       2
00M    2
00R    2
00V    2
Z08    2
Z09    2
Z13    2
dtype: int64

In [493]:
# Flights: Consistency check for Unique Carrier

unexpected_codes = raw_flt.merge(raw_carr,how='left', left_on='UniqueCarrier', right_on='code', indicator=True).query('_merge == "left_only"')
unexpected_codes[['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum', 'UniqueCarrier', 'code']].head(10)

Unnamed: 0,Year,Month,DayofMonth,DepTime,FlightNum,UniqueCarrier,code
284,2008,5,5,918,1055,AXX,
285,2008,5,7,918,1055,AXX,
286,2008,5,8,918,1055,AXX,
303,2008,1,10,1425,5932,1B9,
304,2008,2,11,1428,5932,1B9,
305,2008,3,12,1430,5932,1B9,
306,2008,4,13,1425,5932,1B9,
307,2008,5,14,1428,5932,1B9,
308,2008,6,15,1430,5932,1B9,
