# Set up the tools

In [4]:
# Import all necessary libraries
import pandas as pd
import numpy as np
import requests
from zipfile import *
from configdef import *
from sqlalchemy import exc #SQLAlchemy provides a nice “Pythonic” way of interacting with databases.
from sqlalchemy import event

# Establish db connection

# Get connection details from configdef file into a list
params = config(section='postgres')

# Use sql alchemy to create connection to database, which is contained within the engine object
engine = pg_engine_connection(**params)

# Cleans up unnecessary database connections
engine.dispose()

Postgres Database connection successful


In [6]:
engine.execute("SELECT user;").fetchall()

[('dauser',)]

### Setup in DBeaver

In [5]:
engine.execute("CREATE TABLE IF NOT EXISTS flights_cd_d AS SELECT * FROM flights AS f;")

<sqlalchemy.engine.result.ResultProxy at 0x7fa103b196a0>

In [7]:
engine.execute("DELETE FROM flights_cd_d *;")

<sqlalchemy.engine.result.ResultProxy at 0x7fa103acbc70>

In [9]:
engine.execute("ALTER TABLE flights_cd_d ADD COLUMN IF NOT EXISTS div1_airport VARCHAR;")

<sqlalchemy.engine.result.ResultProxy at 0x7fa106214940>

In [11]:
engine.execute("ALTER TABLE flights_cd_d ADD COLUMN IF NOT EXISTS cancellation_code VARCHAR;")

<sqlalchemy.engine.result.ResultProxy at 0x7fa103b731f0>

In [12]:
engine.execute("ALTER TABLE flights_cd_d ADD COLUMN IF NOT EXISTS weather_delay int;")

<sqlalchemy.engine.result.ResultProxy at 0x7fa103b195b0>

### Getting Data via CSV Download

This is essentially the method of Philipp.

In [16]:
def clean_airline_df(df):
    '''
    Transforms a df made from BTS csv file into a df that is ready to be uploaded to SQL
    Set rows=0 for no filtering
    '''

    # Build dataframe including only the columns you want to keep
    df_airline = df.loc[:,columns_to_keep]
     
    # Clean data types and NULLs
    df_airline['Year']= pd.to_numeric(df_airline['Year'], downcast='integer')
    df_airline['Month']= pd.to_numeric(df_airline['Month'], downcast='integer')
    df_airline['DayofMonth']= pd.to_numeric(df_airline['DayofMonth'], downcast='integer')
    df_airline['FlightDate']= pd.to_datetime(df_airline['FlightDate'], yearfirst=True)
    df_airline['CRSArrTime']= pd.to_numeric(df_airline['CRSArrTime'], downcast='integer', errors='coerce')
    df_airline['Cancelled']= pd.to_numeric(df_airline['Cancelled'], downcast='integer')
    df_airline['Diverted']= pd.to_numeric(df_airline['Diverted'], downcast='integer')
    df_airline['WeatherDelay']= pd.to_numeric(df_airline['WeatherDelay'], downcast='integer')
    
    # Rename columns
    df_airline.columns = new_column_names
    
    return df_airline


# Specify the airports you are interested in and put them as a list in the function.
def select_airport(df, airports):
    ''' Helper function for filtering airline df for a subset of airports'''
    df_out = df.loc[(df.origin.isin(airports)) | (df.dest.isin(airports))]
    return df_out


def airline_csv_to_sql(years=[2021], months=[2], path ='data/', table='flights_cd_d', airports=['ICT', 'DFW', 'LIT', 'OKC','ANC']):
    '''Downloads and unzips the flight data from BTS'''
    
    # Establish db connection
    params = config(section='postgres')
    engine = pg_engine_connection(**params)
    engine.dispose()

    # Loop through months
    for year in years:
        for month in months:
            # Get the file
            zip_file = f'On_Time_Reporting_Carrier_On_Time_Performance_1987_present_{year}_{month}.zip'
            csv_file = f'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_{year}_{month}.csv'
            url = (f'https://transtats.bts.gov/PREZIP/{zip_file}')
            arg = f' -P {path} --no-check-certificate'
            !wget {url}{arg}
            
            # unzip
            with ZipFile(path+zip_file, 'r') as zip_ref:
                zip_ref.extractall(path)
            
            # prepare df
            df_airline = pd.read_csv(path+csv_file, low_memory=False)
            df_airline = clean_airline_df(df_airline)
            
            # Select specific airports 
            if len(airports) > 0:
                df_airline = select_airport(df_airline, airports)

            # to SQL
            print(f'starting uploading {df_airline.shape[0]} rows from {year}-{month}')
            df_airline.to_sql(table, engine, index=False, if_exists="append", 
                  method='multi', chunksize=5000)
            print(f'done uploading {year}-{month}')

In [17]:
columns_to_keep = [
                'Year',
                'Month',
                'DayofMonth',
                'FlightDate',
                'DepTime',
                'CRSDepTime',
                'DepDelay',
                'ArrTime',
                'CRSArrTime',
                'ArrDelay',
                'Reporting_Airline',
                'Tail_Number',
                'Flight_Number_Reporting_Airline',
                'Origin',
                'Dest',
                'AirTime',
                'Distance',
                'Cancelled',
                'Diverted',
                'Div1Airport',
                'CancellationCode',
                'WeatherDelay'
]

# The columns in the DB have different naming as in the source csv files. Lets get the names from the DB
table_name_sql = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'flights_cd' ORDER BY ordinal_position"
c_names = engine.execute(table_name_sql).fetchall()
c_names

[('year',),
 ('month',),
 ('day',),
 ('flightdate',),
 ('dep_time',),
 ('sched_dep_time',),
 ('dep_delay',),
 ('arr_time',),
 ('sched_arr_time',),
 ('arr_delay',),
 ('carrier',),
 ('tailnum',),
 ('flight',),
 ('origin',),
 ('dest',),
 ('air_time',),
 ('distance',),
 ('cancelled',),
 ('diverted',),
 ('div1_airport',),
 ('cancellation_code',),
 ('weather_delay',)]

In [18]:
# we can clean up the results into a clean list
new_column_names=[]
for name in c_names:
    new_column_names.append(name[0])
new_column_names   

['year',
 'month',
 'day',
 'flightdate',
 'dep_time',
 'sched_dep_time',
 'dep_delay',
 'arr_time',
 'sched_arr_time',
 'arr_delay',
 'carrier',
 'tailnum',
 'flight',
 'origin',
 'dest',
 'air_time',
 'distance',
 'cancelled',
 'diverted',
 'div1_airport',
 'cancellation_code',
 'weather_delay']

In [19]:
# Marked as a commend for savety.
airline_csv_to_sql()

Postgres Database connection successful
--2021-06-06 15:07:09--  https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2021_2.zip
Resolving transtats.bts.gov... 204.68.194.70
Connecting to transtats.bts.gov|204.68.194.70|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 16566293 (16M) [application/x-zip-compressed]
Saving to: 'data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2021_2.zip.18'


2021-06-06 15:08:00 (325 KB/s) - 'data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2021_2.zip.18' saved [16566293/16566293]

starting uploading 41034 rows from 2021-2
done uploading 2021-2


### First Clean up in DBeaver

In [None]:
SELECT f.flightdate,
	 f.origin,
	 f.dest,
	 dep_time_f_utc,
	 arr_time_f_utc,
	 air_time_f,
	 travel_time_utc,
	 MAKE_TIMESTAMP(DATE_PART('year', f.flightdate)::Int,
	  				 DATE_PART('month', f.flightdate)::Int,
	  				 DATE_PART('day', f.flightdate)::Int,
	  				 DATE_PART('hour', dep_time_f_utc)::Int,
	  				 DATE_PART('minute', dep_time_f_utc)::Int,
	  				 0) AS dep_timestamp_utc,
	 MAKE_TIMESTAMP(DATE_PART('year', f.flightdate)::Int,
	  				 DATE_PART('month', f.flightdate)::Int,
	  				 DATE_PART('day', f.flightdate)::Int,
	  				 DATE_PART('hour',arr_time_f_utc)::Int,
	  				 DATE_PART('minute', arr_time_f_utc)::Int,
	  				 0) AS arr_timestamp_utc
FROM (SELECT flightdate,
			 origin,
		dest,
		MAKE_TIME(dep_time/100, MOD(dep_time, 100), 0) - MAKE_INTERVAL(hours => CAST(a.tz AS INT)) AS dep_time_f_utc,
		MAKE_TIME(arr_time/100, MOD(arr_time, 100), 0) - MAKE_INTERVAL(hours => CAST(aa.tz AS INT)) AS arr_time_f_utc,
		MAKE_INTERVAL(mins => air_time) AS air_time_f,
		(MAKE_TIME((arr_time/100), (arr_time%100),0) - MAKE_INTERVAL(hours => CAST(a.tz AS INT))) - (MAKE_TIME((dep_time/100),(dep_time%100),0) - MAKE_INTERVAL(hours => CAST(a.tz AS INT))) AS travel_time_utc
	 FROM flights_cd_pro 
	LEFT JOIN airports AS a
	ON origin = a.faa
	LEFT JOIN airports AS aa 
	ON dest = aa.faa) AS f;