In [1]:
import pandas as pd
import numpy as np
import requests
import zipfile
from configdef import *
from sqlalchemy import exc
from sqlalchemy import event


In [25]:
def airline_csv_to_sql(years=[2020], months=[2], path ='data/', table='flights', airports=[]):
    '''Downloads and unzips the flight data from BTS'''
    
    # Establish db connection
    params = config(section='postgres')
    engine = pg_engine_connection(**params)
    engine.dispose()

    # Loop through months
    for year in years:
        for month in months:
            # Get the file
            zip_file = f'On_Time_Reporting_Carrier_On_Time_Performance_1987_present_{year}_{month}.zip'
            csv_file = f'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_{year}_{month}.csv'
            url = (f'https://transtats.bts.gov/PREZIP/{zip_file}')
            arg = f' -P {path} --no-check-certificate'
            !wget {url}{arg} 

            # unzip
            with zipfile.ZipFile(path+zip_file, 'r') as zip_ref:
                zip_ref.extractall(path)
            
            # prepare df
            df_airline = pd.read_csv(path+csv_file, low_memory=False)
            df_airline = clean_airline_df(df_airline)
            
            # Select specific airports 
            if len(airports) > 0:
                df_airline = select_airport(df_airline, airports)

            # to SQL
            print(f'starting uploading {df_airline.shape[0]} rows from {year}-{month}')
            df_airline.to_sql(table, engine, index=False, if_exists="append", 
                  method='multi', chunksize=5000)
            print(f'done uploading {year}-{month}')

In [3]:

# Columns from downloaded file that are to be kept

columns_to_keep = [
                'Year',
                'Month',
                'DayofMonth',
                'FlightDate',
                'DepTime',
                'CRSDepTime',
                'DepDelay',
                'ArrTime',
                'CRSArrTime',
                'ArrDelay',
                'Reporting_Airline',
                'Tail_Number',
                'Flight_Number_Reporting_Airline',
                'Origin',
                'Dest',
                'AirTime',
                'Distance',
                'Cancelled',
                'Diverted'
]

In [4]:
# New column names for SQL database table

new_column_names = [
                    'year',
                    'month',
                    'day',
                    'flightdate',
                    'dep_time',
                    'sched_dep_time',
                    'dep_delay',
                    'arr_time',
                    'sched_arr_time',
                    'arr_delay',
                    'carrier',
                    'tailnum',
                    'flight',
                    'origin',
                    'dest',
                    'air_time',
                    'distance',
                    'cancelled',
                    'diverted',
]

In [14]:
def clean_airline_df(df, rows=10, starting_row=0):
    '''
    Transforms a df made from BTS csv file into a df that is ready to be uploaded to SQL
    Set rows=0 for no filtering
    '''
    # sub set of data
    if rows > 0:
        df_airline = df.iloc[starting_row:(starting_row+rows),]
    
    df_airline = df.loc[starting_row:,columns_to_keep]
    
    
    # Clean data types and NULLs
    df_airline['Year']= pd.to_numeric(df_airline['Year'], downcast='integer')
    df_airline['Month']= pd.to_numeric(df_airline['Month'], downcast='integer')
    df_airline['DayofMonth']= pd.to_numeric(df_airline['DayofMonth'], downcast='integer')
    df_airline['FlightDate']= pd.to_datetime(df_airline['FlightDate'], yearfirst=True)
    df_airline['CRSArrTime']= pd.to_numeric(df_airline['CRSArrTime'], downcast='integer', errors='coerce')
    df_airline['Cancelled']= pd.to_numeric(df_airline['Cancelled'], downcast='integer')
    df_airline['Diverted']= pd.to_numeric(df_airline['Diverted'], downcast='integer')
    
    # rename columns
    df_airline.columns = new_column_names
    
    return df_airline

In [16]:
def select_airport(df, airports):
    ''' Helper function for filtering airline df for a subset of airports'''
    df_out = df.loc[(df.origin.isin(airports)) | (df.dest.isin(airports))]
    return df_out

In [26]:
def airline_csv_to_sql_nodl(years=[2020], months=[2], path ='data/', table='flights', airports=[]):
    '''Sends BTS data to SQL'''

    
    # establish db connection
    params = config(section='postgres')
    engine = pg_engine_connection(**params)
    engine.dispose()

    # Loop through months
    for year in years:
        for month in months:
            # Get the file
#            zip_file = f'On_Time_Reporting_Carrier_On_Time_Performance_1987_present_{year}_{month}.zip'
            csv_file = f'On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_{year}_{month}.csv'
#            url = (f'https://transtats.bts.gov/PREZIP/{zip_file}')
#            arg = f' -P {path} --no-check-certificate'
#            !wget {url}{arg} 

            # unzip
#            with zipfile.ZipFile(path+zip_file, 'r') as zip_ref:
#                zip_ref.extractall(path)
            
            # prepare df
            df_airline = pd.read_csv(path+csv_file, low_memory=False)
            df_airline = clean_airline_df(df_airline)
            
            # Select specific airports 
            if len(airports) > 0:
                df_airline = select_airport(df_airline, airports)

            # to SQL
            print(f'starting uploading {df_airline.shape[0]} rows from {year}-{month}')
            df_airline.to_sql(table, engine, index=False, if_exists="append", 
                  method='multi', chunksize=5000)
            print(f'done uploading {year}-{month}')

In [29]:
airline_csv_to_sql(years=[2019], months=[1, 2, 3, 4, 5, 6, 7, 8, 10, 12], path ='data/', table='nyflights', 
                   airports=['LGA','JFK', 'EWR'])

Postgres Database connection successful
--2021-03-23 21:14:12--  https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_1.zip
Resolving transtats.bts.gov... 204.68.194.70
Connecting to transtats.bts.gov|204.68.194.70|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 29527308 (28M) [application/x-zip-compressed]
Saving to: 'data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_1.zip'


2021-03-23 21:15:26 (397 KB/s) - 'data/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_1.zip' saved [29527308/29527308]

starting uploading 69780 rows from 2019-1
done uploading 2019-1
--2021-03-23 21:16:10--  https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2019_2.zip
Resolving transtats.bts.gov... 204.68.194.70
Connecting to transtats.bts.gov|204.68.194.70|:443... connected.
  Unable to locally verify the issuer's a

In [2]:
params = config(section='postgres')
engine = pg_engine_connection(**params)
engine.dispose()

Postgres Database connection successful


In [3]:
result = engine.execute('select * from nyflights limit 3').fetchall()
display(result)

[(2019, 9, 18, datetime.datetime(2019, 9, 18, 0, 0), 732.0, 733, -1.0, 908.0, 900, 8.0, 'YX', 'N809MD', 5944, 'PIT', 'LGA', 65.0, 335.0, 0, 0),
 (2019, 9, 18, datetime.datetime(2019, 9, 18, 0, 0), 657.0, 700, -3.0, 807.0, 817, -10.0, 'YX', 'N870RW', 5946, 'LGA', 'DCA', 38.0, 214.0, 0, 0),
 (2019, 9, 18, datetime.datetime(2019, 9, 18, 0, 0), 611.0, 615, -4.0, 756.0, 759, -3.0, 'YX', 'N874RW', 5948, 'CMH', 'LGA', 86.0, 479.0, 0, 0)]

In [4]:
engine.execute('select count(*) from nyflights').fetchall()

[(1259729,)]

In [None]:
airport_columns = ["id", "name", "city", "country", "faa", "icao", "lat", "lon", "alt", "tz", "dst", "region", 'airport', 'source']
airport_columns_to_keep = ['faa', 'name', 'lat', 'lon', 'alt', 'tz', 'dst', 'city', 'country']

In [None]:
# All columns in source table
airport_columns = ["id", "name", "city", "country", "faa", "icao", "lat", "lon", "alt", "tz", "dst", "region", 'airport', 'source']
# Columns needed in sql db
airport_columns_to_keep = ['faa', 'name', 'lat', 'lon', 'alt', 'tz', 'dst', 'city', 'country']

def run_aiports_table_pipeline():
    ''' Create the airports table in SQL based on remote source'''
    
    df_airports = pd.read_csv("https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat", 
                              names=airport_columns)
    df_airports = df_airports.loc[:,airport_columns_to_keep]
    # Clean NULL values
    df_airports = df_airports.loc[df_airports['faa']!='\\N']
    df_airports.replace('\\N',np.NaN, inplace=True)
    
    df_airports['tz']= pd.to_numeric(df_airports['tz'], errors='coerce', downcast='integer')
    
    params = config(section='postgres')
    engine = pg_engine_connection(**params)
    if engine!=None:

        try:
            # sending df to spl
            df_airports.to_sql('airports', con=engine, if_exists='replace', index=False)
            print(f"airports table is imported successfully.")

        except exc.SQLAlchemyError as e:
            print(type(e))
            # set the connection to 'None' in case of error
            engine = None

In [None]:
engine.dispose()

def run_aiports_alt_table_pipeline():
    df_airports_alt = pd.read_csv('data/L_AIRPORT.csv')
    df_airports_alt.columns = ['airport_code', 'airport_description']
    
    params = config(section='postgres')
    engine = pg_engine_connection(**params)
    if engine!=None:

        try:
            # sending df to spl
            df_airports_alt.to_sql('airports_alt', con=engine, if_exists='replace', index=False)
            print(f"airports_alt table is imported successfully.")

        except exc.SQLAlchemyError as e:
            print(type(e))
            # set the connection to 'None' in case of error
            engine = None

In [None]:
def run_carriers_table_pipeline():
    df_carriers = pd.read_csv('data/L_UNIQUE_CARRIERS.csv')
    df_carriers.columns = ['carrier', 'name']
    
    params = config(section='postgres')
    engine = pg_engine_connection(**params)
    if engine!=None:

        try:
            # sending df to spl
            df_carriers.to_sql('carriers', con=engine, if_exists='replace', index=False)
            print(f"carriers table is imported successfully.")

        except exc.SQLAlchemyError as e:
            print(type(e))
            # set the connection to 'None' in case of error
            engine = None

In [None]:
run_aiports_alt_table_pipeline()

In [None]:
    df_carriers = pd.read_csv('data/L_UNIQUE_CARRIERS.csv')
    df_carriers.columns = ['carrier', 'name']
    df_to_sql(df_carriers, 'carriers')

In [None]:
run_carriers_table_pipeline()