# Introduction and scope of the project

This document gives an overview of the rationale behind the choice of data and the ETL process implemented in the present project.

The aim is to gather information about commercial flight delays in the United States and consolidate it with meteorological data (precipitation) at the departure and destination airports, with the final objective of building a dataset for analytical purposes. The resulting dataset can be used for research purposes or to feed a dashboard or other types of data visualization, enabling end users to investigate, for example, correlations between precipitation and delays or flight cancellations, controlling for date, day of the week, carrier, geographical position of the airports, etc.

Data about commercial flight delays in the U.S. are available through the Bureau of Transportation Statistics website (https://www.transtats.bts.gov); however, there is no API available to access the dataset programmatically. For the purpose of the present exercise, a simplified version of the data (available on https://www.kaggle.com/yuanyuwendymu/airline-delay-and-cancellation-data-2009-2018) was stored as .csv files in a publicly accessible S3 bucket. Each year includes approx. 5 million observations.

Daily precipitation values can be accessed in JSON format through the API of the Applied Climate Information System (ACIS, http://www.rcc-acis.org/docs_webservices.html#title44). The MultiStnData web service returns a single measurement per day for each station in the dataset. As detailed below, the dataset is queried only for the stations which are also airports and are represented in the delays dataset. As the number of airports in the dataset in a given year is about 300, each year will include approx. 110,000 observations (about 300 stations × 365 days).


# Data exploration and assessment

## Delays dataset
The delays dataset gives a granular overview of the timing and delays of commercial flights, with information at flight level. The dataset taken into consideration contains neither duplicate entries nor completely empty rows. However, since duplicates would clearly be erroneous, as a precaution we drop duplicates as well as entries that are missing the date, origin or destination, since such entries would not be usable for our purposes. We also include only the entries between the desired start and end dates.

Below we walk through some of the steps that make up the ETL process in order to get a better understanding of the dataset and the steps required. Given the different purpose, some of the code presented here differs slightly from the ETL code, where some passages have been omitted as not necessary for the process itself.

In [1]:
import pandas as pd
import zipfile
import gzip
import glob
from datetime import datetime, timedelta
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_add, lit, rank, to_date, sequence, regexp_extract, concat_ws, udf, col, explode, explode_outer, year, month, dayofmonth, hour, weekofyear, date_format, split, expr,array_contains, row_number
from pyspark.sql.window import Window
from pyspark.sql import types as T
import json
import urllib.request
from urllib.request import urlopen
import requests
import psycopg2
import time
import boto3
from ast import literal_eval

In [61]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.sql.broadcastTimeout", "36000") \
        .getOrCreate()
    return spark



In [3]:
spark=create_spark_session()

In [4]:
delays=spark.read.format('csv').options(header='true', inferSchema='true').load("{}".format(config.get("STAGE","source_delays")))

In [5]:
delays.printSchema()

root
 |-- FL_DATE: timestamp (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: double (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- TAXI_OUT: double (nullable = true)
 |-- WHEELS_OFF: double (nullable = true)
 |-- WHEELS_ON: double (nullable = true)
 |-- TAXI_IN: double (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: double (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- CANCELLED: double (nullable = true)
 |-- CANCELLATION_CODE: string (nullable = true)
 |-- DIVERTED: double (nullable = true)
 |-- CRS_ELAPSED_TIME: double (nullable = true)
 |-- ACTUAL_ELAPSED_TIME: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- CARRIER_DELAY: double (nullable = true)
 |-- WEATHER_DELAY:

In [6]:
delays.groupBy(delays.columns) \
    .count() \
    .where(col("count") > 1).show()


+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------+-----+
|FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|DEST|CRS_DEP_TIME|DEP_TIME|DEP_DELAY|TAXI_OUT|WHEELS_OFF|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|ARR_DELAY|CANCELLED|CANCELLATION_CODE|DIVERTED|CRS_ELAPSED_TIME|ACTUAL_ELAPSED_TIME|AIR_TIME|DISTANCE|CARRIER_DELAY|WEATHER_DELAY|NAS_DELAY|SECURITY_DELAY|LATE_AIRCRAFT_DELAY|Unnamed: 27|count|
+-------+----------+-----------------+------+----+------------+--------+---------+--------+----------+---------+-------+------------+--------+---------+---------+-----------------+--------+----------------+-------------------+--------+--------+-------------+-------------+---------+--------------+-------------------+-----------

In [7]:
delays.count()-delays.na.drop(subset=("FL_DATE","ORIGIN","DEST")).count()

0

In [8]:
delays.where(delays.FL_DATE >= to_date(lit("{}".format(config.get("STAGE","sdate"))))) \
      .where(delays.FL_DATE <= to_date(lit("{}".format(config.get("STAGE","edate"))))) \
      .distinct().na.drop(subset=("FL_DATE","ORIGIN","DEST")) \
      .createOrReplaceTempView("delays_sql")

In [9]:
spark.sql("""
select * from delays_sql limit 5""").toPandas()

Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27
0,2017-01-01,AA,203,PHL,BOS,1750,1743.0,-7.0,20.0,1803.0,...,79.0,77.0,50.0,280.0,,,,,,
1,2017-01-01,AA,292,LAX,JFK,600,555.0,-5.0,13.0,608.0,...,329.0,300.0,280.0,2475.0,,,,,,
2,2017-01-01,AA,1161,ORD,MCI,2020,2019.0,-1.0,16.0,2035.0,...,92.0,84.0,64.0,403.0,,,,,,
3,2017-01-01,AA,1172,BOS,MIA,915,912.0,-3.0,14.0,926.0,...,216.0,197.0,176.0,1258.0,,,,,,
4,2017-01-01,B6,403,JFK,SJU,1208,1202.0,-6.0,15.0,1217.0,...,223.0,205.0,187.0,1598.0,,,,,,


In order to import the precipitation data, we first need a list of the FAA codes of the airports in the dataset, which is used to filter the ACIS dataset (which contains data for hundreds of stations that are not relevant for the analysis).

In [10]:
list_airports=spark.sql("""
WITH append_airports
AS
(
SELECT
DISTINCT origin
FROM
delays_sql

UNION

SELECT
DISTINCT dest
FROM
delays_sql
)

SELECT 
DISTINCT * 
FROM
append_airports
""").select('origin').rdd.flatMap(lambda x: x).collect()

separator=","
string_airports=separator.join(list_airports)
print(string_airports)

BGM,INL,PSE,MSY,PPG,GEG,BUR,SNA,GRB,GTF,IDA,GRR,EUG,PSG,PVD,MYR,GSO,OAK,MQT,MSN,FSM,FAR,BTM,COD,SCC,ESC,DCA,CID,GTR,LWS,MLU,PIB,WRG,HLN,CIU,IAG,RDM,LEX,JMS,ORF,SCE,KTN,EVV,CRW,CWA,SAV,GCK,TRI,CDV,CMH,CAK,TYR,ADK,CHO,MOB,PNS,LIH,IAH,HNL,SHV,ERI,SJC,CVG,LGA,TLH,BUF,CDC,ACT,HPN,RDD,AUS,GCC,MLI,SJU,ATW,DHN,LGB,AVL,GJT,GFK,BFL,RNO,SRQ,EYW,SBN,BJI,TTN,JAC,RST,CHS,RSW,TUL,HRL,ISP,AMA,BOS,MAF,MLB,EWR,LAS,BIS,JAN,FAI,ITO,IMT,XNA,DLH,DEN,EWN,RHI,SGU,CPR,ALB,LNK,OME,GRI,IAD,BOI,SBA,PSP,LAR,MEI,HOB,DRO,BRO,BRD,BMI,RKS,SEA,LAN,LRD,PBG,HYS,VLD,MCI,FLG,GRK,CLT,BNA,CLL,TVC,BLI,PSC,ORH,PBI,ABQ,SDF,ACV,LAW,DAL,BDL,MRY,ITH,CLE,PDX,MFR,MIA,TWF,TPA,BWI,CMX,APN,OKC,ROA,SMF,SPI,OTH,BRW,ABI,MBS,ELM,PHX,FCA,DVL,ABR,STL,PWM,ABY,BET,DFW,TXK,MHT,ABE,GSP,MMH,LSE,STX,FAY,HDN,GUC,LBB,EKO,CRP,EGE,FSD,SWF,SUN,BQK,CSG,SFO,MEM,SAF,ELP,BHM,FLL,ATL,FNT,PIH,RIC,DAY,PHF,OMA,SJT,LCH,VPS,BPT,LIT,FAT,ECP,ICT,CAE,ORD,AVP,LBE,BTV,MKG,SPS,AEX,BIL,ILM,PIA,GUM,RDU,BQN,MFE,HIB,MKE,ISN,SYR,LFT,HSV,PIT,TUS,MTJ,ROW,ACY,MDW,AZO,PLN,OAJ,

## ACIS precipitations dataset
We next define the functions that will enable us to access the JSON output of the ACIS web service and put it in tabular format. The JSON export has several nested fields which require some wrangling in order to be stored in tabular format. The example below contains observations from three stations for the three days from 1 to 3 January 2017.

In [11]:
previewurl='http://data.rcc-acis.org/MultiStnData?&sdate=20170101&edate=20170103&elems=pcpn&sids=BGM,INL,MSY&output=json&meta=name,state,ll,uid,elev,sids'
previewjsonData = urlopen(previewurl).read().decode('utf-8')
previewrdd = spark.sparkContext.parallelize([previewjsonData])
    
previewto_explode =spark.read.option("multiline","true").json(previewrdd)
previewto_explode.toPandas()

Unnamed: 0,data
0,"[([['0.01'], ['0.02'], ['0.58']], (1593.0, [-7..."


In [12]:
previewto_explode.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |-- meta: struct (nullable = true)
 |    |    |    |-- elev: double (nullable = true)
 |    |    |    |-- ll: array (nullable = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- sids: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- uid: long (nullable = true)



We first explode the outer `data` array in order to have one row per station:

In [13]:
previewexploded=previewto_explode.select(explode(previewto_explode.data))
previewexploded.toPandas()

Unnamed: 0,col
0,"([[0.01], [0.02], [0.58]], (1593.0, [-75.97993..."
1,"([[0.02], [0.25], [0.30]], (1157.0, [-93.39554..."
2,"([[2.13], [0.42], [0.00]], (-3.0, [-90.27772, ..."


We then expand each row into different columns through a custom function (from https://stackoverflow.com/a/50156142):

In [14]:
def flatten(schema, prefix=None):
    fields = []
    for field in schema.fields:
        name = prefix + '.' + field.name if prefix else field.name
        dtype = field.dataType
        if isinstance(dtype, T.ArrayType):
            dtype = dtype.elementType

        if isinstance(dtype, T.StructType):
            fields += flatten(dtype, prefix=name)
        else:
            fields.append(name)

    return fields
#credit to https://stackoverflow.com/a/50156142

previewflattened=previewexploded.select(flatten((previewexploded.schema)))
previewflattened.toPandas()

Unnamed: 0,data,elev,ll,name,sids,state,uid
0,"[[0.01], [0.02], [0.58]]",1593.0,"[-75.97993, 42.20678]",BINGHAMTON (GREATER AP),"[04725 1, 300687 2, BGM 3, 72515 4, KBGM 5, US...",NY,18571
1,"[[0.02], [0.25], [0.30]]",1157.0,"[-93.39554, 48.55943]",INTERNATIONAL FALLS INTL AP,"[14918 1, 214026 2, INL 3, 72747 4, KINL 5, US...",MN,10639
2,"[[2.13], [0.42], [0.00]]",-3.0,"[-90.27772, 29.99755]",NEW ORLEANS AP,"[12916 1, 166660 2, 166295 2, MSY 3, 72231 4, ...",LA,9580
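
To make the recursion in `flatten` easier to follow, here is a minimal plain-Python analogue that runs without Spark. It is only a sketch: `flatten_paths` and the dict-based schema representation are hypothetical stand-ins for the Spark `StructType`/`ArrayType` objects, with field names taken from the schema printed above.

```python
# Plain-Python analogue of the Spark `flatten` helper (illustrative only).
# A struct is modelled as a dict, an array as a one-element list holding its
# element type, and anything else is treated as a leaf type name.
def flatten_paths(schema, prefix=None):
    paths = []
    for name, dtype in schema.items():
        full_name = f"{prefix}.{name}" if prefix else name
        # Arrays are unwrapped one level only, as in the Spark version
        if isinstance(dtype, list):
            dtype = dtype[0]
        if isinstance(dtype, dict):
            # Nested struct: recurse, carrying the dotted prefix along
            paths += flatten_paths(dtype, prefix=full_name)
        else:
            paths.append(full_name)
    return paths


# Mirrors the schema of the exploded preview DataFrame
exploded_schema = {
    "col": {
        "data": [["string"]],
        "meta": {
            "elev": "double",
            "ll": ["double"],
            "name": "string",
            "sids": ["string"],
            "state": "string",
            "uid": "long",
        },
    }
}

paths = flatten_paths(exploded_schema)
print(paths)
```

Selecting these dotted paths in Spark yields columns named after the last path component, which is why the flattened output above has the columns `data`, `elev`, `ll`, `name`, `sids`, `state` and `uid`.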


We then store longitude and latitude into different columns and extract the relevant FAA (Federal Aviation Administration) identifier from the string array storing all the identifiers for each station (in the dataset, the FAA codes are identified with a "3" following them):

In [15]:
previewflattened=previewflattened.select("name",
                                         "state",
                                         "sids",
                                         concat_ws(", ","sids").alias("sids_string"),
                                         "uid",
                                         previewflattened.ll[0].alias("long"),
                                         previewflattened.ll[1].alias("lat"),
                                         "elev",
                                         "data") \
                                         .withColumn("faa",regexp_extract('sids_string', r"(\w+(?=\s+3))", 1))
previewflattened.toPandas()

Unnamed: 0,name,state,sids,sids_string,uid,long,lat,elev,data,faa
0,BINGHAMTON (GREATER AP),NY,"[04725 1, 300687 2, BGM 3, 72515 4, KBGM 5, US...","04725 1, 300687 2, BGM 3, 72515 4, KBGM 5, USW...",18571,-75.97993,42.20678,1593.0,"[[0.01], [0.02], [0.58]]",BGM
1,INTERNATIONAL FALLS INTL AP,MN,"[14918 1, 214026 2, INL 3, 72747 4, KINL 5, US...","14918 1, 214026 2, INL 3, 72747 4, KINL 5, USW...",10639,-93.39554,48.55943,1157.0,"[[0.02], [0.25], [0.30]]",INL
2,NEW ORLEANS AP,LA,"[12916 1, 166660 2, 166295 2, MSY 3, 72231 4, ...","12916 1, 166660 2, 166295 2, MSY 3, 72231 4, K...",9580,-90.27772,29.99755,-3.0,"[[2.13], [0.42], [0.00]]",MSY
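
The lookahead pattern can be tried outside Spark with Python's `re` module. The sketch below applies the same pattern to the first five identifiers shown for Binghamton and compares it with a stricter split-based variant. The helper names `faa_by_regex` and `faa_by_type` are hypothetical and are not part of the ETL code.

```python
import re

# Same pattern as in the Spark step: a word followed by whitespace and a "3"
FAA_PATTERN = re.compile(r"(\w+(?=\s+3))")


def faa_by_regex(sids_string):
    """Mimic regexp_extract: return the first match, or "" when none is found."""
    match = FAA_PATTERN.search(sids_string)
    return match.group(1) if match else ""


def faa_by_type(sids):
    """Stricter variant: split each 'identifier type' pair and keep type '3'."""
    for sid in sids:
        parts = sid.split()
        if len(parts) == 2 and parts[1] == "3":
            return parts[0]
    return ""


# First five identifiers shown for Binghamton in the output above
sids = ["04725 1", "300687 2", "BGM 3", "72515 4", "KBGM 5"]
print(faa_by_regex(", ".join(sids)))
print(faa_by_type(sids))
```

Both helpers return `BGM` for this input. The regex would also accept a type code that merely begins with 3 (a hypothetical `"XYZ 31"`), which the split-based variant rejects. Whether such codes occur in ACIS responses is not established here.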


Next we filter out any rows where the FAA identifier is missing (this step is purely precautionary, since only stations with a valid FAA identifier are requested) and explode the measurements array, in order to have a row for each day:

In [16]:
previewflattened=previewflattened.where(previewflattened.faa != "") \
                                 .select("name",
                                         "state",
                                         "sids",
                                         "faa",
                                         "uid",
                                         "long",
                                         "lat",
                                         "elev",
                                         explode_outer(previewflattened.data).alias("measure")) \
                                  .withColumn('s_date',lit("2017-01-01")) 

previewflattened.toPandas()

Unnamed: 0,name,state,sids,faa,uid,long,lat,elev,measure,s_date
0,BINGHAMTON (GREATER AP),NY,"[04725 1, 300687 2, BGM 3, 72515 4, KBGM 5, US...",BGM,18571,-75.97993,42.20678,1593.0,[0.01],2017-01-01
1,BINGHAMTON (GREATER AP),NY,"[04725 1, 300687 2, BGM 3, 72515 4, KBGM 5, US...",BGM,18571,-75.97993,42.20678,1593.0,[0.02],2017-01-01
2,BINGHAMTON (GREATER AP),NY,"[04725 1, 300687 2, BGM 3, 72515 4, KBGM 5, US...",BGM,18571,-75.97993,42.20678,1593.0,[0.58],2017-01-01
3,INTERNATIONAL FALLS INTL AP,MN,"[14918 1, 214026 2, INL 3, 72747 4, KINL 5, US...",INL,10639,-93.39554,48.55943,1157.0,[0.02],2017-01-01
4,INTERNATIONAL FALLS INTL AP,MN,"[14918 1, 214026 2, INL 3, 72747 4, KINL 5, US...",INL,10639,-93.39554,48.55943,1157.0,[0.25],2017-01-01
5,INTERNATIONAL FALLS INTL AP,MN,"[14918 1, 214026 2, INL 3, 72747 4, KINL 5, US...",INL,10639,-93.39554,48.55943,1157.0,[0.30],2017-01-01
6,NEW ORLEANS AP,LA,"[12916 1, 166660 2, 166295 2, MSY 3, 72231 4, ...",MSY,9580,-90.27772,29.99755,-3.0,[2.13],2017-01-01
7,NEW ORLEANS AP,LA,"[12916 1, 166660 2, 166295 2, MSY 3, 72231 4, ...",MSY,9580,-90.27772,29.99755,-3.0,[0.42],2017-01-01
8,NEW ORLEANS AP,LA,"[12916 1, 166660 2, 166295 2, MSY 3, 72231 4, ...",MSY,9580,-90.27772,29.99755,-3.0,[0.00],2017-01-01


Finally, we number the rows within each station (partitioning by the ACIS `uid`) and derive the date of each entry by adding the row offset to the start date:

In [17]:
w = Window().partitionBy('uid').orderBy("s_date")
    
previewflattened = previewflattened.withColumn('row_id',row_number().over(w)) \
                       .withColumn('date',expr("date_add(s_date, row_id-1)")) \
                       .withColumn('year', date_format(col('date'),'y')) \
                       .withColumn('month', date_format(col('date'),'M')) 
    
    
previewacis_data=previewflattened.select("name",
                               "faa",
                               "state", 
                               "long", 
                               "lat", 
                               "elev", 
                               "date",
                               "year",
                               "month",
                               previewflattened.measure[0].alias("pcpn")) 
previewacis_data.toPandas()

Unnamed: 0,name,faa,state,long,lat,elev,date,year,month,pcpn
0,INTERNATIONAL FALLS INTL AP,INL,MN,-93.39554,48.55943,1157.0,2017-01-01,2017,1,0.02
1,INTERNATIONAL FALLS INTL AP,INL,MN,-93.39554,48.55943,1157.0,2017-01-02,2017,1,0.25
2,INTERNATIONAL FALLS INTL AP,INL,MN,-93.39554,48.55943,1157.0,2017-01-03,2017,1,0.3
3,NEW ORLEANS AP,MSY,LA,-90.27772,29.99755,-3.0,2017-01-01,2017,1,2.13
4,NEW ORLEANS AP,MSY,LA,-90.27772,29.99755,-3.0,2017-01-02,2017,1,0.42
5,NEW ORLEANS AP,MSY,LA,-90.27772,29.99755,-3.0,2017-01-03,2017,1,0.0
6,BINGHAMTON (GREATER AP),BGM,NY,-75.97993,42.20678,1593.0,2017-01-01,2017,1,0.01
7,BINGHAMTON (GREATER AP),BGM,NY,-75.97993,42.20678,1593.0,2017-01-02,2017,1,0.02
8,BINGHAMTON (GREATER AP),BGM,NY,-75.97993,42.20678,1593.0,2017-01-03,2017,1,0.58
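
The date assignment assumes that the measurements of each station stay in chronological order. Because `s_date` is the same for every row, ordering the window by it does not by itself determine the row order, so the result relies on the exploded rows keeping their array order. The intended logic can be sketched in plain Python, where the position in the array is used directly as the day offset. The helper `assign_dates` is hypothetical. In Spark, `posexplode_outer` would expose the same position explicitly, which is a possible alternative rather than what the ETL code does.

```python
from datetime import date, timedelta


def assign_dates(measures, sdate):
    """Pair the i-th measurement of a station with sdate + i days."""
    start = date.fromisoformat(sdate)
    return [
        ((start + timedelta(days=offset)).isoformat(), value[0])
        for offset, value in enumerate(measures)
    ]


# Measurements returned for Binghamton (BGM) in the preview above
bgm_measures = [["0.01"], ["0.02"], ["0.58"]]
bgm_rows = assign_dates(bgm_measures, "2017-01-01")
for row in bgm_rows:
    print(row)
```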


The function below performs the whole transformation process, taking as inputs the start and end dates and the list of FAA ids, and returning the result as a Spark DataFrame.

In [18]:
def get_acis_data(sdate,
                  edate,
                  sids="{}".format(string_airports)
                 ):
    
        
    url='http://data.rcc-acis.org/MultiStnData?&sdate='+sdate+'&edate='+edate+'&elems=pcpn&sids='+sids+'&output=json&meta=name,state,ll,uid,elev,sids'
    jsonData = urlopen(url).read().decode('utf-8')
    rdd = spark.sparkContext.parallelize([jsonData])
    
    to_explode =spark.read.option("multiline","true").json(rdd)
    exploded=to_explode.select(explode(to_explode.data))
    flattened=exploded.select(flatten((exploded.schema)))
    
    flattened=flattened.select("name",
                               "state",
                               "sids",
                               concat_ws(", ","sids").alias("sids_string"),
                               "uid",
                               flattened.ll[0].alias("long"),
                               flattened.ll[1].alias("lat"),
                               "elev",
                               "data") \
                        .withColumn("faa",regexp_extract('sids_string', r"(\w+(?=\s+3))", 1))
    
    
    flattened=flattened.where(flattened.faa != "") \
                       .select("name",
                               "state",
                               "sids",
                               "faa",
                               "uid",
                               "long",
                               "lat",
                               "elev",
                               explode_outer(flattened.data).alias("measure")) \
                       .withColumn('s_date',lit(sdate)) 
    
    
    
    w = Window().partitionBy('uid').orderBy("s_date")
    
    flattened = flattened.withColumn('row_id',row_number().over(w)) \
                       .withColumn('date',expr("date_add(s_date, row_id-1)")) \
                       .withColumn('year', date_format(col('date'),'y')) \
                       .withColumn('month', date_format(col('date'),'M')) 
                      
    
    
    acis_data=flattened.select("name",
                               "faa",
                               "state", 
                               "long", 
                               "lat", 
                               "elev", 
                               "date",
                               "year",
                               "month",
                               flattened.measure[0].alias("pcpn")) 
    
    return acis_data
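
The request URL that `get_acis_data` concatenates by hand could also be assembled with `urllib.parse.urlencode`, which keeps the parameters in one place and escapes unexpected characters. This is a minimal sketch: `build_acis_url` is a hypothetical helper, and it takes the station codes as a list rather than as the comma-separated string used above. The parameter names and values are those of the preview URL.

```python
from urllib.parse import urlencode

ACIS_ENDPOINT = "http://data.rcc-acis.org/MultiStnData"


def build_acis_url(sdate, edate, sids):
    """Build a MultiStnData request for daily precipitation (pcpn)."""
    params = {
        "sdate": sdate,
        "edate": edate,
        "elems": "pcpn",
        "sids": ",".join(sids),
        "output": "json",
        "meta": "name,state,ll,uid,elev,sids",
    }
    # safe="," keeps the comma-separated lists readable instead of %2C
    return ACIS_ENDPOINT + "?" + urlencode(params, safe=",")


url = build_acis_url("2017-01-01", "2017-01-03", ["BGM", "INL", "MSY"])
print(url)
```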

As an example with the full list of airports created above:

In [19]:
get_acis_data(sdate='2017-01-01',edate='2017-01-03').createOrReplaceTempView("acis_data_sql")

In [20]:
spark.sql("""
select * from acis_data_sql limit 5""").toPandas()

Unnamed: 0,name,faa,state,long,lat,elev,date,year,month,pcpn
0,COLORADO SPRINGS MUNICIPAL AP,COS,CO,-104.68873,38.80949,6182.0,2017-01-01,2017,1,0.0
1,COLORADO SPRINGS MUNICIPAL AP,COS,CO,-104.68873,38.80949,6182.0,2017-01-02,2017,1,0.0
2,COLORADO SPRINGS MUNICIPAL AP,COS,CO,-104.68873,38.80949,6182.0,2017-01-03,2017,1,0.02
3,FORT WAYNE INTL AP,FWA,IN,-85.20636,40.97248,797.0,2017-01-01,2017,1,0.0
4,FORT WAYNE INTL AP,FWA,IN,-85.20636,40.97248,797.0,2017-01-02,2017,1,0.48


## Data Model

The objective of the process is the creation of an analytical dataset where information about commercial flight delays can be compared with precipitation information at the departure and destination airport.

As the precipitation dataset contains information at daily level and details about individual flights are not required, the information will be grouped at route-per-day level; each row will therefore be uniquely identified by an origin-destination-date combination. Two additional dimensions will be included: one for the airports in the dataset (containing full name, latitude, longitude and elevation) and one for the different routes in the dataset, identified by flight number and day of the week, and including information about the carrier, origin and destination, and first and last flight date in the dataset. We finally include a date dimension.
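
The grouping at route per day level can be illustrated with a small plain-Python sketch. Everything in it is an assumption made for illustration: the sample rows are invented, and the chosen measures (number of flights, number of cancellations, average departure delay) are examples rather than the exact columns of the fact table.

```python
from collections import defaultdict

# Hypothetical flight-level rows: (date, origin, dest, dep_delay, cancelled)
sample_flights = [
    ("2017-01-01", "PHL", "BOS", -7.0, 0),
    ("2017-01-01", "PHL", "BOS", -5.0, 0),
    ("2017-01-01", "PHL", "BOS", None, 1),  # cancelled, no delay recorded
    ("2017-01-01", "LAX", "JFK", 12.0, 0),
]


def aggregate_routes(rows):
    """Group flights by (origin, dest, date) and compute example measures."""
    groups = defaultdict(list)
    for fl_date, origin, dest, dep_delay, cancelled in rows:
        groups[(origin, dest, fl_date)].append((dep_delay, cancelled))

    facts = {}
    for key, route_flights in groups.items():
        # Cancelled flights have no departure delay and are left out of the mean
        delays = [d for d, _ in route_flights if d is not None]
        facts[key] = {
            "n_flights": len(route_flights),
            "n_cancelled": sum(c for _, c in route_flights),
            "avg_dep_delay": sum(delays) / len(delays) if delays else None,
        }
    return facts


facts = aggregate_routes(sample_flights)
print(facts[("PHL", "BOS", "2017-01-01")])
```

Each key of `facts` corresponds to one row of the route-level table, which can then be joined with the precipitation data on (origin, date) and on (destination, date).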

The tables are created from the datasets described above and loaded to S3 as a staging area, before being loaded to Redshift. As an example, below we show what the process looks like for the flights dimension table.

In [21]:
flights = spark.sql(
        """
        select distinct 
        OP_CARRIER carrier,
        OP_CARRIER_FL_NUM fl_num,
        ORIGIN,
        DEST,
        CRS_DEP_TIME,
        CRS_ARR_TIME,
        (weekday(FL_DATE)) weekday,
        min(FL_DATE) first_flight_in_ds,
        max(FL_DATE) last_flight_in_ds
        
        FROM
        delays_sql
        
        group by 
        carrier, 
        fl_num,
        ORIGIN,
        weekday,
        DEST,
        CRS_DEP_TIME,
        CRS_ARR_TIME
        """)

In [22]:
if flights.count()>0:
    flights.write.mode("overwrite").csv("{}".format(config.get("STAGE","staging_flights_buck"))+"/flights.csv")
else:
    print("Flights table has 0 entries")

In [64]:
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS flights (
    carrier varchar(2),
    fl_num int,
    origin varchar(3),
    dest varchar(3),
    crs_dep_time int,
    crs_arr_time int,
    weekday int,
    first_flight_in_ds date,
    last_flight_in_ds date
    );
""")
conn.commit()

cur.execute(("""
copy flights 
     from {}
     iam_role '{}'
     CSV 
     DATEFORMAT 'YYYY-MM-DD'
     TIMEFORMAT 'auto'
     compupdate off 
     region \'us-east-1\';
""").format(config.get("S3","staging_flights_file"), config.get("IAM_ROLE", "ARN")))

conn.commit()


conn.close()
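
Redshift's `COPY` expects the S3 location as a single-quoted string. The statement above inserts the `staging_flights_file` value without adding quotes, so it presumably relies on the value in `dl.cfg` already containing them. A minimal sketch of a helper that adds the quotes itself is shown below. The name `build_copy_statement`, the bucket path and the role ARN are all hypothetical placeholders.

```python
def build_copy_statement(table, s3_path, iam_role_arn, region="us-east-1"):
    """Return a Redshift COPY statement for CSV files staged on S3."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "CSV\n"
        "DATEFORMAT 'YYYY-MM-DD'\n"
        "TIMEFORMAT 'auto'\n"
        "COMPUPDATE OFF\n"
        f"REGION '{region}';"
    )


statement = build_copy_statement(
    "flights",
    "s3://example-bucket/flights.csv",                  # hypothetical path
    "arn:aws:iam::123456789012:role/example-redshift",  # hypothetical role
)
print(statement)
```

The resulting string can be passed to `cur.execute` in place of the formatted statement above, provided the configuration values are stored without surrounding quotes.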