# Checkpoint 4 - Data Cleaning and ETL Process

### Import Packages

In [23]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine

ModuleNotFoundError: No module named 'psycopg2'

In [24]:
pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.6-cp39-cp39-macosx_10_9_x86_64.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.6
Note: you may need to restart the kernel to use updated packages.


### Import Dataset

In [12]:
# import dataset 
flights_2018 = pd.read_csv('Combined_Flights_2018.csv')
flights_2019 = pd.read_csv('Combined_Flights_2019.csv')
flights_2020 = pd.read_csv('Combined_Flights_2020.csv')
flights_2021 = pd.read_csv('Combined_Flights_2021.csv')
flights_2022 = pd.read_csv('Combined_Flights_2022.csv')

# combine dataset 
df = pd.concat([flights_2018, flights_2019,flights_2020,flights_2021,flights_2022])

# inspect dataset 
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 29193782 entries, 0 to 4078317
Data columns (total 61 columns):
 #   Column                                   Dtype  
---  ------                                   -----  
 0   FlightDate                               object 
 1   Airline                                  object 
 2   Origin                                   object 
 3   Dest                                     object 
 4   Cancelled                                bool   
 5   Diverted                                 bool   
 6   CRSDepTime                               int64  
 7   DepTime                                  float64
 8   DepDelayMinutes                          float64
 9   DepDelay                                 float64
 10  ArrTime                                  float64
 11  ArrDelayMinutes                          float64
 12  AirTime                                  float64
 13  CRSElapsedTime                           float64
 14  ActualElapsedTime

In [13]:
# inspect dataset 
df.head

<bound method NDFrame.head of          FlightDate            Airline Origin Dest  Cancelled  Diverted  \
0        2018-01-23  Endeavor Air Inc.    ABY  ATL      False     False   
1        2018-01-24  Endeavor Air Inc.    ABY  ATL      False     False   
2        2018-01-25  Endeavor Air Inc.    ABY  ATL      False     False   
3        2018-01-26  Endeavor Air Inc.    ABY  ATL      False     False   
4        2018-01-27  Endeavor Air Inc.    ABY  ATL      False     False   
...             ...                ...    ...  ...        ...       ...   
4078313  2022-03-31  Republic Airlines    MSY  EWR      False      True   
4078314  2022-03-17  Republic Airlines    CLT  EWR       True     False   
4078315  2022-03-08  Republic Airlines    ALB  ORD      False     False   
4078316  2022-03-25  Republic Airlines    EWR  PIT      False      True   
4078317  2022-03-07  Republic Airlines    EWR  RDU      False      True   

         CRSDepTime  DepTime  DepDelayMinutes  DepDelay  ...  WheelsO

In [16]:
# check for null value
null_count_by_column = df.isnull().sum()
for col, count in null_count_by_column.items():
    if count > 0:
        print(f"Column '{col}' has {count} null values")

Column 'DepTime' has 761652 null values
Column 'DepDelayMinutes' has 763084 null values
Column 'DepDelay' has 763084 null values
Column 'ArrTime' has 786177 null values
Column 'ArrDelayMinutes' has 846183 null values
Column 'AirTime' has 852561 null values
Column 'CRSElapsedTime' has 22 null values
Column 'ActualElapsedTime' has 845637 null values
Column 'Tail_Number' has 267613 null values
Column 'DepDel15' has 763084 null values
Column 'DepartureDelayGroups' has 763084 null values
Column 'TaxiOut' has 780561 null values
Column 'WheelsOff' has 780551 null values
Column 'WheelsOn' has 793133 null values
Column 'TaxiIn' has 793143 null values
Column 'ArrDelay' has 846183 null values
Column 'ArrDel15' has 846183 null values
Column 'ArrivalDelayGroups' has 846183 null values
Column 'DivAirportLandings' has 90 null values


In [15]:
# drop na
#df = df.dropna(subset=['ArrDelay'])

# check for null value
#null_count_by_column = df.isnull().sum()
#for col, count in null_count_by_column.items():
#    if count > 0:
#        print(f"Column '{col}' has {count} null values")

### Create Table

In [26]:
# Define the connection parameters
host = "localhost"
port = 5432
dbname = "5310teamproject"
user = "postgres"
password = "123"

# Create a connection string in the required format
conn_url = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}"

# Create an engine that connects to the PostgreSQL server
engine = create_engine(conn_url)

# Establish a connection
connection = engine.connect()

In [27]:
# Pass the SQL statements that create all tables
CreatTableCMD = """
CREATE TABLE airline (
  company_code 		VARCHAR(55) PRIMARY KEY,
  company_name 		VARCHAR(50) NOT NULL,
  DOT_ID			INT NOT NULL
);

CREATE TABLE code_share (
  operated_or_branded_code_share_partners 		VARCHAR(2) NOT NULL,
  marketing_airline 				VARCHAR(2) NOT NULL,
  PRIMARY KEY (operated_or_branded_code_share_partners),
  FOREIGN KEY (marketing_airline) REFERENCES airline(company_code)
);

CREATE TABLE state (
  state_code 		VARCHAR(2) PRIMARY KEY,
  state_name 	VARCHAR(50) NOT NULL,
  state_fips 	INT NOT NULL,
  wac 			INT
);

CREATE TABLE city (
  city_ID 		INT PRIMARY KEY,
  cityName 		VARCHAR(50) NOT NULL,
  state_code 		VARCHAR(2) NOT NULL,
  cityMarketID 	INT NOT NULL,
  FOREIGN KEY (state_code) REFERENCES state(state_code)
);


CREATE TABLE date_info (
  flightdate 	DATE PRIMARY KEY,
  year 			INT NOT NULL,
  quarter 		INT NOT NULL,
  month 		INT NOT NULL,
  dayofmonth 	INT NOT NULL,
  dayofweek 	INT NOT NULL
);


CREATE TABLE airport (
  airport_CODE 	VARCHAR(3) PRIMARY KEY,
  airport_ID 	INT NOT NULL,
  airportseq_ID 	INT NOT NULL,
  city_id 		INT NOT NULL,
  FOREIGN KEY (city_id) REFERENCES city(city_id)
);


CREATE TABLE route (
  route_ID 	INT PRIMARY KEY,
  origin 	VARCHAR(3) NOT NULL,
  dest 		VARCHAR(3) NOT NULL,
  distance 	INT NOT NULL,
   FOREIGN KEY (origin) REFERENCES airport(airport_code),
   FOREIGN KEY (dest) REFERENCES airport(airport_code)
);

CREATE TABLE flight (
  flightdate 						DATE NOT NULL,
  operating_airline 					VARCHAR(2) NOT NULL,
  flight_number_operating_airline		INT NOT NULL,
  route_ID 								INT NOT NULL,
  cancelled 								BOOLEAN NOT NULL,
  diverted 									BOOLEAN NOT NULL,
  CRSdepTime 								INT NOT NULL,
  depTime 									INT ,
  dep_delay_minutes 						INT ,
  dep_delay 									INT ,
  arr_time 										INT ,
  arr_delay_minutes 								INT ,
  air_time 										INT ,
  CRS_elapsed_time 								INT ,
  actual_elapsed_time 							INT ,
  operated_or_branded_code_share_partners 		VARCHAR(55),
  PRIMARY KEY (flightDate,operating_airline,flight_number_operating_airline,route_ID),
  FOREIGN KEY (flightdate) REFERENCES date_info(flightdate),
  FOREIGN KEY (operating_airline) REFERENCES airline(company_code),
  FOREIGN KEY (route_ID) REFERENCES route(route_ID),
  FOREIGN KEY (operated_or_branded_code_share_partners) REFERENCES Code_share(Operated_or_Branded_Code_Share_Partners)
);
"""

# Execute the statement to create tables
connection.execute(CreatTableCMD)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fb7cef81fa0>

#### --- airlines ---

In [28]:
# Rename the columns 'Operating_Airline', 'Airline', and 'DOT_ID_Operating_Airline' to 'company_code', 'company_name', and 'DOT_ID', respectively
df = df.rename(columns={'Operating_Airline': 'company_code', 'Airline': 'company_name', 'DOT_ID_Operating_Airline': 'dot_id'})

# Select only the renamed columns 'company_code', 'company_name', and 'DOT_ID' using double square brackets
airline_df = df[['company_code', 'company_name', 'dot_id']]

# Create a new DataFrame with unique values of the 'company_code' column from the 'airline_df' DataFrame
temp_airline_df = pd.DataFrame(airline_df.company_code.unique(), columns=['company_code'])

# Merge the new DataFrame with the 'airline_df' DataFrame to get the corresponding 'company_name' and 'DOT_ID' values for each unique 'company_code'
temp_airline_df = temp_airline_df.merge(airline_df.drop_duplicates(subset='company_code'), on='company_code')

# Print the first five rows of the resulting DataFrame
print(temp_airline_df.head())

  company_code              company_name  dot_id
0           9E         Endeavor Air Inc.   20363
1           B6           JetBlue Airways   20409
2           EV  ExpressJet Airlines Inc.   20366
3           G4             Allegiant Air   20368
4           HA    Hawaiian Airlines Inc.   19690


In [29]:
# Write the contents of the 'temp_airline_df' DataFrame to a SQL database table named 'airline'
temp_airline_df.to_sql(name='airline', con=engine, if_exists='append', index=False)

28

#### --- code_share ---

In [30]:
# Rename the columns 'Marketing_Airline_Network' and 'Operated_or_Branded_Code_Share_Partners'
df = df.rename(columns={'Marketing_Airline_Network': 'marketing_airline', 'Operated_or_Branded_Code_Share_Partners': 'operated_or_branded_code_share_partners'})

# Select only the columns 'operated_or_branded_code_share_partners' and 'marketing_airline'
code_share_df = df[['operated_or_branded_code_share_partners', 'marketing_airline']]

# Create a new DataFrame with unique values of the 'operated_or_branded_code_share_partners' column from the 'code_share_df' DataFrame
temp_code_share_df = pd.DataFrame(code_share_df.operated_or_branded_code_share_partners.unique(), columns=['operated_or_branded_code_share_partners'])

# Merge the new DataFrame with the 'code_share_df' DataFrame to get the corresponding 'marketing_airline' values for each unique 'operated_or_branded_code_share_partners'
temp_code_share_df = temp_code_share_df.merge(code_share_df.drop_duplicates(subset='operated_or_branded_code_share_partners'), on='operated_or_branded_code_share_partners')

# Print the first five rows of the resulting DataFrame
print(temp_code_share_df.head())

  operated_or_branded_code_share_partners marketing_airline
0                            DL_CODESHARE                DL
1                                      B6                B6
2                            UA_CODESHARE                UA
3                                      G4                G4
4                                      HA                HA


In [31]:
# Write the contents of the 'temp_code_share_df' DataFrame to a SQL database table named 'code_share'
temp_code_share_df.to_sql(name='code_share', con=engine, if_exists='append', index=False)

DataError: (psycopg2.errors.StringDataRightTruncation) value too long for type character varying(2)

[SQL: INSERT INTO code_share (operated_or_branded_code_share_partners, marketing_airline) VALUES (%(operated_or_branded_code_share_partners)s, %(marketing_airline)s)]
[parameters: ({'operated_or_branded_code_share_partners': 'DL_CODESHARE', 'marketing_airline': 'DL'}, {'operated_or_branded_code_share_partners': 'B6', 'marketing_airline': 'B6'}, {'operated_or_branded_code_share_partners': 'UA_CODESHARE', 'marketing_airline': 'UA'}, {'operated_or_branded_code_share_partners': 'G4', 'marketing_airline': 'G4'}, {'operated_or_branded_code_share_partners': 'HA', 'marketing_airline': 'HA'}, {'operated_or_branded_code_share_partners': 'NK', 'marketing_airline': 'NK'}, {'operated_or_branded_code_share_partners': 'AS_CODESHARE', 'marketing_airline': 'AS'}, {'operated_or_branded_code_share_partners': 'F9', 'marketing_airline': 'F9'}  ... displaying 10 of 16 total bound parameter sets ...  {'operated_or_branded_code_share_partners': 'AA_CODESHARE', 'marketing_airline': 'AA'}, {'operated_or_branded_code_share_partners': 'AA', 'marketing_airline': 'AA'})]
(Background on this error at: https://sqlalche.me/e/14/9h9h)

#### --- state ---

In [None]:
# Rename the columns 'OriginState' to 'state_code', 'OriginStateName' to 'state_name', 'OriginStateFips' to 'state_fips', and 'OriginWac' to 'wac'
df = df.rename(columns={'OriginState': 'state_code', 'OriginStateName': 'state_name', 'OriginStateFips': 'state_fips', 'OriginWac': 'wac'})

# Select only the columns 'state_code', 'state_name', 'state_fips', and 'wac'
state_df = df[['state_code', 'state_name', 'state_fips', 'wac']]

# Select the unique values of the 'state_code' column from the 'state_df' DataFrame and create a new DataFrame 'temp_state_df'
temp_state_df = pd.DataFrame(state_df.state_code.unique(), columns=['state_code'])

# Merge the unique values with the original 'state_df' DataFrame based on the 'state_code' column
temp_state_df = temp_state_df.merge(state_df.drop_duplicates(subset='state_code'), on='state_code')

# Print the resulting DataFrame
print(temp_state_df.head())

In [None]:
# Write the contents of the 'temp_state_df' DataFrame to a SQL database table named 'state'
temp_state_df.to_sql(name='state', con=engine, if_exists='append', index=False)

#### --- city ---

In [None]:
# Rename the columns 'OriginCityName' to 'cityName' and 'OriginCityMarketID' to 'cityMarketID'
df = df.rename(columns={'OriginCityName': 'cityname', 'OriginCityMarketID': 'cityMarketID'})

# Select only the columns 'cityName', 'state_code', and 'cityMarketID'
city_df = df[['cityname', 'state_code', 'cityMarketID']]

# Select the unique values from the 'cityName' column of the 'city_df' DataFrame
temp_city_df = pd.DataFrame(city_df.cityname.unique(), columns=['cityname'])

# Add a new 'city_id' column with a sequence of integers starting from 1
temp_city_df.insert(0, 'city_id', range(1, 1 + len(temp_city_df)))

# Merge 'temp_city_df' with 'city_df' on the 'cityname' column
temp_city_df = temp_city_df.merge(city_df.drop_duplicates(subset='cityname'), on='cityname')

# Print the resulting DataFrame
print(temp_city_df.head())

In [None]:
# adding id back to df
city_id_list = [temp_city_df.city_id[temp_city_df.cityname == i].values[0] for i in df.cityname]

df.insert(5, 'city_id', city_id_list)

In [None]:
# Split the 'cityName' column by a comma delimiter and keep only the city name
temp_city_df['cityname'] = temp_city_df['cityname'].str.split(',').str.get(0)

# Print the resulting DataFrame
print(temp_city_df.head())

In [None]:
# Fix error, rename the wrong column
temp_city_df = temp_city_df.rename(columns={'cityMarketID':'citymarketid'})

In [None]:
temp_city_df.to_sql(name='city', con=engine, if_exists='append', index=False)

#### --- date_info ---

In [24]:
# creating our date_info table 
date_info_df = df[['FlightDate', 'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek']]

# drop duplicate
date_info_df = date_info_df.drop_duplicates().reset_index(drop=True)

# rename
date_info_df = date_info_df.rename(columns={
    'FlightDate': 'flightdate',
    'Year': 'year',
    'Quarter': 'quarter',
    'Month': 'month',
    'DayofMonth': 'dayofmonth',
    'DayOfWeek': 'dayofweek'
})

date_info_df.head()

Unnamed: 0,flightdate,year,quarter,month,dayofmonth,dayofweek
0,2018-01-23,2018,1,1,23,2
1,2018-01-24,2018,1,1,24,3
2,2018-01-25,2018,1,1,25,4
3,2018-01-26,2018,1,1,26,5
4,2018-01-27,2018,1,1,27,6


In [25]:
# load the relation to sql
date_info_df.to_sql(name='date_info', con=engine, if_exists='append', index=False)

673

#### --- airport ---

In [31]:
# creating our airport table 
ori_df = df[['Origin', 'OriginAirportID', 'OriginAirportSeqID', 'city_id']]
ori_df = ori_df.rename(columns={'Origin': 'airport_CODE', 'OriginAirportID': 'airport_ID', 'OriginAirportSeqID': 'airportseq_ID'})

des_df = df[['Dest', 'DestAirportID', 'DestAirportSeqID', 'city_id']]
des_df = des_df.rename(columns={'Dest': 'airport_CODE', 'DestAirportID': 'airport_ID', 'DestAirportSeqID': 'airportseq_ID'})


airport_df = airport_df = pd.concat([ori_df, des_df])

# drop duplicate
airport_df = airport_df.drop_duplicates().reset_index(drop=True)

airport_df.head()

Unnamed: 0,airport_CODE,airport_ID,airportseq_ID,city_id
0,ABY,10146,1014602,"Albany, GA"
1,ATL,10397,1039707,"Atlanta, GA"
2,MOB,13422,1342202,"Mobile, AL"
3,BUF,10792,1079206,"Buffalo, NY"
4,BTV,10785,1078502,"Burlington, VT"


In [32]:
# load the relation to sql
airport_df.to_sql(name='airport', con=engine, if_exists='append', index=False)

ProgrammingError: (psycopg2.errors.UndefinedColumn) column "airport_CODE" of relation "airport" does not exist
LINE 1: INSERT INTO airport ("airport_CODE", "airport_ID", "airports...
                             ^

[SQL: INSERT INTO airport ("airport_CODE", "airport_ID", "airportseq_ID", city_id) VALUES (%(airport_CODE)s, %(airport_ID)s, %(airportseq_ID)s, %(city_id)s)]
[parameters: ({'airport_CODE': 'ABY', 'airport_ID': 10146, 'airportseq_ID': 1014602, 'city_id': 'Albany, GA'}, {'airport_CODE': 'ATL', 'airport_ID': 10397, 'airportseq_ID': 1039707, 'city_id': 'Atlanta, GA'}, {'airport_CODE': 'MOB', 'airport_ID': 13422, 'airportseq_ID': 1342202, 'city_id': 'Mobile, AL'}, {'airport_CODE': 'BUF', 'airport_ID': 10792, 'airportseq_ID': 1079206, 'city_id': 'Buffalo, NY'}, {'airport_CODE': 'BTV', 'airport_ID': 10785, 'airportseq_ID': 1078502, 'city_id': 'Burlington, VT'}, {'airport_CODE': 'CVG', 'airport_ID': 11193, 'airportseq_ID': 1119302, 'city_id': 'Cincinnati, OH'}, {'airport_CODE': 'LGA', 'airport_ID': 12953, 'airportseq_ID': 1295304, 'city_id': 'New York, NY'}, {'airport_CODE': 'CHO', 'airport_ID': 10990, 'airportseq_ID': 1099005, 'city_id': 'Charlottesville, VA'}  ... displaying 10 of 431 total bound parameter sets ...  {'airport_CODE': 'TBN', 'airport_ID': 15138, 'airportseq_ID': 1513802, 'city_id': 'Fort Leonard Wood, MO'}, {'airport_CODE': 'BIH', 'airport_ID': 10617, 'airportseq_ID': 1061705, 'city_id': 'Bishop, CA'})]
(Background on this error at: https://sqlalche.me/e/14/f405)

#### --- route ---

In [None]:
# creating our route table 
route_df = df[['Origin', 'Dest', 'Distance']]

route_df = route_df.rename(columns={'Origin': 'origin', 'Dest': 'dest', 'Distance': 'distance'})

route_df.head()

In [None]:
# adding route_id to our route table
route_df.insert(0, 'route_id', range(1, 1+len(route)))

route_df.head()

In [None]:
# load the relation to sql
route_df.to_sql(name='route', con=engine, if_exists='append', index=False)

In [None]:
# read id to to a seperate route_id_list
route_id_list = [route_df.route_id[route_df.route_id == i].values[0] for i in df.route_id]

# adding route_id to df
df.insert(5, 'route_id', route_id_list)

#### --- flight ---

In [None]:
# creating our flight table 
flight_df = df[['FlightDate', 'Operating_Airline', 'Flight_Number_Operating_Airline', 'route_ID', 'cancelled'
                , 'Diverted', 'CRSDepTime', 'DepTime', 'DepDelayMinutes', 'DepDelay', 'ArrTime', 'ArrDelayMinutes'
               , 'AirTime', 'CRSElapsedTime', 'ActualElapsedTime', 'Operated_or_Branded_Code_Share_Partners  ']]

flight_df = flight_df.rename(columns={'FlightDate': 'flightdate', 'Operating_Airline': 'operating_airline', 
                                      'Flight_Number_Operating_Airline': 'flight_number_operating_airline', 
                                      'route_ID': 'route_ID', 'cancelled': 'cancelled', 'Diverted': 'diverted', 
                                      'CRSDepTime': 'CRSdepTime', 'DepTime': 'depTime', 
                                      'DepDelayMinutes': 'dep_delay_minutes', 'DepDelay': 'dep_delay', 
                                      'ArrTime': 'arr_time', 'ArrDelayMinutes': 'arr_delay_minutes',
                                      'AirTime': 'air_time', 'CRSElapsedTime': 'CRS_elapsed_time', 
                                      'ActualElapsedTime': 'actual_elapsed_time',
                                      'Operated_or_Branded_Code_Share_Partners': 'operated_or_branded_code_share_partners'})

flight_df.head()

In [None]:
# load the relation to sql
flight_df.to_sql(name='flight', con=engine, if_exists='append', index=False)