# Flights data preparation

In [14]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark.sql.types import *
import pandas as pd
from io import StringIO
import matplotlib.pyplot as plt
# Fetch the Hadoop configuration object through the context's private `_jsc`
# handle and set the "hive.execution.engine" property to "mr" on it.
# NOTE(review): `sc` is not created in the cells shown here — presumably it is
# injected by the Spark kernel; confirm before running on a fresh kernel.
hc = sc._jsc.hadoopConfiguration()
hc.set("hive.execution.engine", "mr")

## Function to parse CSV

In [15]:
import csv

def parseCsv(csvStr):
    """Parse one line of CSV text into a list of field strings.

    Quoted fields are unquoted by the ``csv`` module, so a comma inside
    quotes does not split the field.

    Args:
        csvStr: a single CSV record as a string.

    Returns:
        list of str: the fields of the first record found in ``csvStr``, or
        an empty list when ``csvStr`` holds no record (e.g. an empty line).
    """
    reader = csv.reader(StringIO(csvStr), delimiter=',')
    # Pass a default so an empty line yields [] instead of raising
    # StopIteration: this function is called from RDD.map(), and a
    # StopIteration escaping a mapped function can be taken for
    # end-of-iteration by the consuming loop rather than reported as an error.
    return next(reader, [])

# Smoke check: parse a sample carriers line and print its first two fields,
# one per output line.
scsv = '"02Q","Titan Airways"'
row = parseCsv(scsv)
print(row[0], row[1], sep="\n")

02Q
Titan Airways


## Parse and convert Carrier data to parquet

In [7]:
# Read the raw carriers CSV and drop every line equal to the header text.
carriersHeader = 'Code,Description'
carriersText = (
    sc.textFile("s3a://dlab6th-dmytro-liaskovskyi-bucket/carriers.csv")
    .filter(lambda line: line != carriersHeader)
)

# Parse each line into fields and build one Row per carrier; the RDD is
# cached before it is converted to a DataFrame.
carriers = (
    carriersText
    .map(parseCsv)
    .map(lambda fields: Row(code=fields[0], description=fields[1]))
    .cache()
    .toDF()
)

# Write the result as parquet with mode "overwrite", register it as the
# "carriers" table, and preview the first 20 rows.
carriers.write.mode("overwrite").parquet("s3a://dlab6th-dmytro-liaskovskyi-bucket/processed3/carriers")
sqlContext.registerDataFrameAsTable(carriers, "carriers")
carriers.limit(20).toPandas()

Unnamed: 0,code,description
0,02Q,Titan Airways
1,04Q,Tradewind Aviation
2,05Q,"Comlux Aviation, AG"
3,06Q,Master Top Linhas Aereas Ltd.
4,07Q,Flair Airlines Ltd.
5,09Q,"Swift Air, LLC"
6,0BQ,DCA
7,0CQ,ACM AIR CHARTER GmbH
8,0FQ,"Maine Aviation Aircraft Charter, LLC"
9,0GQ,"Inter Island Airways, d/b/a Inter Island Air"


## Parse and convert Airport data to parquet

In [12]:
airportsHeader = '"iata","airport","city","state","country","lat","long"'


def _airportRow(fields):
    """Build an airports Row from the parsed CSV fields of one line.

    Fields 0-4 are stored as parsed; fields 5 and 6 are converted to float
    and stored as ``lat`` and ``longt``.
    """
    return Row(
        iata=fields[0],
        airport=fields[1],
        city=fields[2],
        state=fields[3],
        country=fields[4],
        lat=float(fields[5]),
        longt=float(fields[6]),
    )


# Read the raw airports CSV, drop every line equal to the header text, parse
# the remaining lines into Rows, cache the RDD, and convert it to a DataFrame.
airports = (
    sc.textFile("s3a://dlab6th-dmytro-liaskovskyi-bucket/airports.csv")
    .filter(lambda line: line != airportsHeader)
    .map(parseCsv)
    .map(_airportRow)
    .cache()
    .toDF()
)

# Write the result as parquet with mode "overwrite", register it as the
# "airports" table, and preview the first 20 rows.
airports.write.mode("overwrite").parquet("s3a://dlab6th-dmytro-liaskovskyi-bucket/processed3/airports")
sqlContext.registerDataFrameAsTable(airports, "airports")
airports.limit(20).toPandas()

Unnamed: 0,airport,city,country,iata,lat,longt,state
0,Thigpen,Bay Springs,USA,00M,31.953765,-89.234505,MS
1,Livingston Municipal,Livingston,USA,00R,30.685861,-95.017928,TX
2,Meadow Lake,Colorado Springs,USA,00V,38.945749,-104.569893,CO
3,Perry-Warsaw,Perry,USA,01G,42.741347,-78.052081,NY
4,Hilliard Airpark,Hilliard,USA,01J,30.688012,-81.905944,FL
5,Tishomingo County,Belmont,USA,01M,34.491667,-88.201111,MS
6,Gragg-Wade,Clanton,USA,02A,32.850487,-86.611453,AL
7,Capitol,Brookfield,USA,02C,43.08751,-88.177869,WI
8,Columbiana County,East Liverpool,USA,02G,40.673313,-80.641406,OH
9,Memphis Memorial,Memphis,USA,03D,40.447259,-92.226961,MO


## Parse and convert Flights data to parquet

In [16]:
flightsHeader = 'Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay'


def _naAsZeroInt(text):
    """Convert a field to int after replacing the text "NA" with "0"."""
    return int(text.replace("NA", "0"))


def _flightRow(p):
    """Build a flights Row from the parsed CSV fields of one line.

    Date parts and Distance are converted with ``int``; the delay fields are
    converted with ``_naAsZeroInt``; every other field is stored as parsed.
    CarrierDelayStr and WeatherDelayStr keep the unconverted text of the
    corresponding delay fields alongside the integer versions.
    """
    return Row(
        Year=int(p[0]),
        Month=int(p[1]),
        DayofMonth=int(p[2]),
        DayOfWeek=int(p[3]),
        DepTime=p[4],
        CRSDepTime=p[5],
        ArrTime=p[6],
        CRSArrTime=p[7],
        UniqueCarrier=p[8],
        FlightNum=p[9],
        TailNum=p[10],
        ActualElapsedTime=p[11],
        CRSElapsedTime=p[12],
        AirTime=p[13],
        ArrDelay=_naAsZeroInt(p[14]),
        DepDelay=_naAsZeroInt(p[15]),
        Origin=p[16],
        Dest=p[17],
        Distance=int(p[18]),
        TaxiIn=p[19],
        TaxiOut=p[20],
        Cancelled=p[21],
        CancellationCode=p[22],
        Diverted=p[23],
        CarrierDelay=_naAsZeroInt(p[24]),
        CarrierDelayStr=p[24],
        WeatherDelay=_naAsZeroInt(p[25]),
        WeatherDelayStr=p[25],
        NASDelay=_naAsZeroInt(p[26]),
        SecurityDelay=_naAsZeroInt(p[27]),
        LateAircraftDelay=_naAsZeroInt(p[28]),
    )


# Read the compressed flights CSV, drop every line equal to the header text,
# parse the remaining lines into Rows, and convert the RDD to a DataFrame.
flights = (
    sc.textFile("s3a://dlab6th-dmytro-liaskovskyi-bucket/2008.csv.bz2")
    .filter(lambda line: line != flightsHeader)
    .map(parseCsv)
    .map(_flightRow)
    .toDF()
)

# NOTE(review): this write uses mode "ignore" whereas the carriers and
# airports writes use "overwrite" — presumably to avoid rewriting output that
# already exists; confirm this is intended.
flights.write.mode("ignore").parquet("s3a://dlab6th-dmytro-liaskovskyi-bucket/processed3/flights")
sqlContext.registerDataFrameAsTable(flights, "flights")

# Preview the first 10 rows, restricted to the delay columns and Distance.
previewColumns = ["ArrDelay", "CarrierDelay", "CarrierDelayStr", "WeatherDelay", "WeatherDelayStr", "Distance"]
flights.limit(10).toPandas()[previewColumns]

Unnamed: 0,ArrDelay,CarrierDelay,CarrierDelayStr,WeatherDelay,WeatherDelayStr,Distance
0,-14,0,,0,,810
1,2,0,,0,,810
2,14,0,,0,,515
3,-6,0,,0,,515
4,34,2,2.0,0,0.0,515
5,11,0,,0,,688
6,57,10,10.0,0,0.0,1591
7,-18,0,,0,,1591
8,2,0,,0,,451
9,-16,0,,0,,451
