In [1]:
# import the necessary modules
import os
import pandas as pd
import numpy as np
import pyarrow.parquet as pq

In [2]:
# data from csv and parquet
root_path = os.path.expanduser('~')
airports = pd.read_csv(os.path.join(root_path, 'source', 'airports.csv'))
airports_parquet = pd.read_parquet(os.path.join(root_path, 'raw', 'airports'))
airports.columns = airports.columns.str.title()
airports_parquet.columns = airports_parquet.columns.str.title()
carriers = pd.read_csv(os.path.join(root_path, 'source', 'carriers.csv'))
flights = pd.read_csv(os.path.join(root_path, 'source', 'flights.csv'))

In [3]:
# create result list for checks
result_list = []
column_result_list = ['Table', 'DQ_check', 'Column', 'Status', 'Bad_Data']

### Airports: Completeness by sum (for numeric columns)

In [4]:
airports.head()

Unnamed: 0,Iata,Airport,City,State,Country,Lat,Long
0,00M,Thigpen,Bay Springs,MS,USA,31.953765,-89.234505
1,00R,Livingston Municipal,Livingston,TX,USA,30.685861,-95.017928
2,00V,Meadow Lake,Colorado Springs,CO,USA,38.945749,-104.569893
3,01G,Perry-Warsaw,Perry,NY,USA,42.741347,-78.052081
4,01J,Hilliard Airpark,Hilliard,FL,USA,30.688012,-81.905944


In [5]:
#airports.dtypes
airports_parquet.head()

Unnamed: 0,Airport,City,Country,Iata,Lat,Longt,State
0,Thigpen,Bay Springs,USA,00M,31.95376472,-89.23450472,MS
1,Livingston Municipal,Livingston,USA,00R,30.68586111,-95.01792778,TX
2,Meadow Lake,Colorado Springs,USA,00V,38.94574889,-104.5698933,CO
3,Perry-Warsaw,Perry,USA,01G,42.74134667,-78.05208056,NY
4,Hilliard Airpark,Hilliard,USA,01J,30.6880125,-81.90594389,FL


In [6]:
# Airports: Completeness by sum (for numeric columns).
# print(airports['Lat'].sum())
# print(airports_parquet['Lat'].sum())
def completeness_by_sum(df, df_parquet, column):
    res =[]
    if column in df_parquet:     
        df_parquet[column] = pd.to_numeric(df_parquet[column])
        df_diff = df[column].sum() - df_parquet[column].sum()          
        result_list.append(['airports' ,
                       'Completeness',
                       column,
                       'Passed' if df_diff == 0 else 'Failed',
                       'No bad data.' if df_diff == 0 else f'Difference between parquet and csv files - sum by column {column} is  {df_diff }.'])
    else:
        result_list.append(['airports' ,
                       'Completeness',
                       column,
                        'Failed',
                       f'Column {column} is not present in parquet file'])   

In [7]:

result = completeness_by_sum(airports, airports_parquet, 'Lat')
result = completeness_by_sum(airports, airports_parquet, 'Long')
print(result_list)

[['airports', 'Completeness', 'Lat', 'Passed', 'No bad data.'], ['airports', 'Completeness', 'Long', 'Failed', 'Column Long is not present in parquet file']]


### Carrier: Uniqueness by PK

In [8]:
carriers.head()

Unnamed: 0,Code,Description
0,02Q,Titan Airways
1,04Q,Tradewind Aviation
2,05Q,"Comlux Aviation, AG"
3,06Q,Master Top Linhas Aereas Ltd.
4,07Q,Flair Airlines Ltd.


In [9]:
# Carrier: Uniqueness by PK
def uniqueness(df, columns):
    duplicates = df[df.duplicated(subset=columns, keep=False)]
    bad_data = duplicates[columns]
    if len(duplicates)>0:     
        result_list.append(['carriers' ,
                           'uniqueness',
                           columns,
                            'Failed',
                           f'There are {len(duplicates)} duplicated rows: {bad_data.head()}',])         
    else:
        result_list.append(['carriers' ,
                           'uniqueness',
                           columns,
                           'Passed',
                           'There are NO duplicated rows']) 

In [10]:
result = uniqueness(carriers, ['Code', 'Description'])
result = uniqueness(airports, ['Iata','City', 'Airport'])
result = uniqueness(airports, ['Iata'])
result = uniqueness(flights, ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum'])
print(result_list)

[['airports', 'Completeness', 'Lat', 'Passed', 'No bad data.'], ['airports', 'Completeness', 'Long', 'Failed', 'Column Long is not present in parquet file'], ['carriers', 'uniqueness', ['Code', 'Description'], 'Failed', 'There are 7 duplicated rows:      Code          Description\n4     07Q  Flair Airlines Ltd.\n6     0BQ                  DCA\n1494  ZUQ  Zuliana De Aviacion\n1501  07Q  Flair Airlines Ltd.\n1502  0BQ                  DCA'], ['carriers', 'uniqueness', ['Iata', 'City', 'Airport'], 'Failed', 'There are 2 duplicated rows:      Iata      City   Airport\n3368  Z09  Kasigluk  Kasigluk\n3386  Z09  Kasigluk  Kasigluk'], ['carriers', 'uniqueness', ['Iata'], 'Failed', 'There are 14 duplicated rows:   Iata\n0  00M\n1  00R\n2  00V\n8  NaN\n9  NaN'], ['carriers', 'uniqueness', ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum'], 'Failed', 'There are 24 duplicated rows:       Year  Month  DayofMonth  DepTime  FlightNum\n0   2008.0    5.0         1.0    859.0     1755.0\n1   2008.0

### Flights: Validity by Time Range for ArrTime and ArrDepTime

In [11]:
flights.head()
# print(len(flights))

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,2008.0,5.0,1.0,4.0,859.0,905.0,1025.0,1030.0,AA,1755.0,...,5.0,10.0,0.0,,0.0,,,8.0,,
1,2008.0,5.0,1.0,4.0,859.0,905.0,1025.0,1030.0,AA,1755.0,...,5.0,10.0,0.0,,0.0,,5.0,,,
2,2008.0,5.0,1.0,4.0,859.0,905.0,1025.0,1030.0,AA,1856.0,...,5.0,10.0,0.0,,0.0,3.0,,,,
3,2008.0,5.0,2.0,5.0,901.0,905.0,1104.0,1030.0,AA,1755.0,...,7.0,35.0,0.0,,0.0,0.0,0.0,34.0,0.0,0.0
4,2008.0,5.0,3.0,6.0,901.0,905.0,1034.0,1030.0,AA,1755.0,...,5.0,17.0,0.0,,0.0,,,,,


In [12]:
# Flights: Validity by Time Range for ArrTime and ArrDepTime
def range_column(df, column, value_min, value_max):
    df_range = df[(df[column] >value_max) | (df[column] < value_min) | (df[column].isnull())]
    bad_data = df_range[[column]]
    if len(df_range)>0:     
        result_list.append(['flights' ,
                           'validity',
                           column,
                            'Failed',
                           f'Number of rows with NO VALID column "{column}"  :  {len(df_range)}: {bad_data.head()}' ])                                     
    else:
        result_list.append(['flights' ,
                           'validity',
                           column,
                            'Passed',
                           'All rows are valid in range'])                             

In [13]:
result = range_column(flights,'ArrTime', 0, 2359)
result = range_column(flights,'DepTime', 0, 2359)
result = range_column(flights,'CRSArrTime', 0, 2359)
result = range_column(flights,'CRSDepTime', 0, 2359)
print(result_list)

[['airports', 'Completeness', 'Lat', 'Passed', 'No bad data.'], ['airports', 'Completeness', 'Long', 'Failed', 'Column Long is not present in parquet file'], ['carriers', 'uniqueness', ['Code', 'Description'], 'Failed', 'There are 7 duplicated rows:      Code          Description\n4     07Q  Flair Airlines Ltd.\n6     0BQ                  DCA\n1494  ZUQ  Zuliana De Aviacion\n1501  07Q  Flair Airlines Ltd.\n1502  0BQ                  DCA'], ['carriers', 'uniqueness', ['Iata', 'City', 'Airport'], 'Failed', 'There are 2 duplicated rows:      Iata      City   Airport\n3368  Z09  Kasigluk  Kasigluk\n3386  Z09  Kasigluk  Kasigluk'], ['carriers', 'uniqueness', ['Iata'], 'Failed', 'There are 14 duplicated rows:   Iata\n0  00M\n1  00R\n2  00V\n8  NaN\n9  NaN'], ['carriers', 'uniqueness', ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum'], 'Failed', 'There are 24 duplicated rows:       Year  Month  DayofMonth  DepTime  FlightNum\n0   2008.0    5.0         1.0    859.0     1755.0\n1   2008.0

### Flights: Consistency check for CancellationCode

In [14]:
# Check:
# NULL if Cancelled = 0
# A/B/C if Cancelled = 1
def consistency_CancellationCode(df, column):
    cancelled_flights = df[(df["Cancelled"] ==0)]
    bad_data = cancelled_flights.CancellationCode.unique()
    if len(cancelled_flights.CancellationCode.value_counts ()) > 1:     
        result_list.append(['flights' ,
                           'consistency',
                           column,
                            'Failed',
                           f' Consinstency check is failed. if Cancelled = 0 then CancellationCode  can be not only NULL. Possible values:  {bad_data}' ])         
    else:
        result_list.append(['flights' ,
                           'consistency',
                           column,
                            'Passed',
                           'consistency check CancellationCode is passed'])                           

In [15]:
result = consistency_CancellationCode(flights, 'CancellationCode')
print(result_list)

[['airports', 'Completeness', 'Lat', 'Passed', 'No bad data.'], ['airports', 'Completeness', 'Long', 'Failed', 'Column Long is not present in parquet file'], ['carriers', 'uniqueness', ['Code', 'Description'], 'Failed', 'There are 7 duplicated rows:      Code          Description\n4     07Q  Flair Airlines Ltd.\n6     0BQ                  DCA\n1494  ZUQ  Zuliana De Aviacion\n1501  07Q  Flair Airlines Ltd.\n1502  0BQ                  DCA'], ['carriers', 'uniqueness', ['Iata', 'City', 'Airport'], 'Failed', 'There are 2 duplicated rows:      Iata      City   Airport\n3368  Z09  Kasigluk  Kasigluk\n3386  Z09  Kasigluk  Kasigluk'], ['carriers', 'uniqueness', ['Iata'], 'Failed', 'There are 14 duplicated rows:   Iata\n0  00M\n1  00R\n2  00V\n8  NaN\n9  NaN'], ['carriers', 'uniqueness', ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum'], 'Failed', 'There are 24 duplicated rows:       Year  Month  DayofMonth  DepTime  FlightNum\n0   2008.0    5.0         1.0    859.0     1755.0\n1   2008.0

##### REPORT

In [16]:
pd.set_option('display.max_colwidth', None)
df_result = pd.DataFrame (result_list, columns = column_result_list)
df_result

Unnamed: 0,Table,DQ_check,Column,Status,Bad_Data
0,airports,Completeness,Lat,Passed,No bad data.
1,airports,Completeness,Long,Failed,Column Long is not present in parquet file
2,carriers,uniqueness,"[Code, Description]",Failed,There are 7 duplicated rows: Code Description\n4 07Q Flair Airlines Ltd.\n6 0BQ DCA\n1494 ZUQ Zuliana De Aviacion\n1501 07Q Flair Airlines Ltd.\n1502 0BQ DCA
3,carriers,uniqueness,"[Iata, City, Airport]",Failed,There are 2 duplicated rows: Iata City Airport\n3368 Z09 Kasigluk Kasigluk\n3386 Z09 Kasigluk Kasigluk
4,carriers,uniqueness,[Iata],Failed,There are 14 duplicated rows: Iata\n0 00M\n1 00R\n2 00V\n8 NaN\n9 NaN
5,carriers,uniqueness,"[Year, Month, DayofMonth, DepTime, FlightNum]",Failed,There are 24 duplicated rows: Year Month DayofMonth DepTime FlightNum\n0 2008.0 5.0 1.0 859.0 1755.0\n1 2008.0 5.0 1.0 859.0 1755.0\n15 2008.0 7.0 15.0 1726.0 401.0\n16 2008.0 7.0 15.0 1726.0 401.0\n17 2008.0 7.0 15.0 1726.0 401.0
6,flights,validity,ArrTime,Failed,"Number of rows with NO VALID column ""ArrTime"" : 8: ArrTime\n61 2926.0\n187 NaN\n382 NaN\n408 NaN\n410 NaN"
7,flights,validity,DepTime,Failed,"Number of rows with NO VALID column ""DepTime"" : 8: DepTime\n59 2500.0\n187 NaN\n382 NaN\n408 NaN\n410 NaN"
8,flights,validity,CRSArrTime,Failed,"Number of rows with NO VALID column ""CRSArrTime"" : 2: CRSArrTime\n513 NaN\n521 NaN"
9,flights,validity,CRSDepTime,Failed,"Number of rows with NO VALID column ""CRSDepTime"" : 2: CRSDepTime\n513 NaN\n519 NaN"


In [17]:
# csv report 
df_result.to_csv(os.path.join(root_path, 'test_result.csv'))

In [18]:
# html report
html_report = df_result.to_html()
  
# write html to file
text_file = open("html_report.html", "w")
text_file.write(html_report)
text_file.close()