In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Build (or reuse) the Spark session used by every cell below. The master URL,
# the 16g memory settings and the off-heap options are hard-coded here.
spark = (
    SparkSession.builder
    .master("local[11]")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .appName("Airline")
    .getOrCreate()
)

# Keep the notebook output readable: only warnings and errors are logged.
spark.sparkContext.setLogLevel("WARN")

22/11/29 22:52:31 WARN Utils: Your hostname, LinuxGUI resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface eno1)
22/11/29 22:52:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/29 22:52:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/29 22:52:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


**Variables**

In [2]:
# Year of the flight file that is loaded; also used as the output directory name.
YEAR = 2013

# Region boxes. The area-assignment cell compares each point against the FIRST
# tuple only, as (latitude upper bound, longitude upper bound). The second
# tuple is presumably the matching lower bounds -- TODO confirm; it is not read
# anywhere in this notebook. Longitudes are positive because the geo cell
# multiplies the raw longitude by -1.
SE  = [(37,  80), (24,  65)]
NE  = [(50,  80), (37,  65)]
MSE = [(37,  95), (24,  80)]
MNE = [(50,  95), (37,  80)]
MSW = [(37, 110), (24,  95)]
MNW = [(50, 110), (37,  95)]
SW  = [(37, 125), (24, 110)]
NW  = [(50, 125), (37, 110)]

**Load preprocessed airport-station dataset and GHCND weather data**

In [25]:
# Airport -> weather-station mapping and the pre-filtered GHCND observations,
# both read as CSV with a header row (all columns arrive as strings).
airport_station_df = spark.read.csv('./data/airports_stations.csv', header=True)
ghcnd_all = (
    spark.read.csv('./data/filtered_weather.csv', header=True)
    .drop('M_FLAG', 'Q_FLAG', 'S_FLAG', 'OBS_TIME')
)

# Tag every observation with the airport code / station id from the mapping.
# Left outer join: observations without a mapped station are kept.
selected_airport_station = airport_station_df.select('ORIGIN', 'STATION')
ghcnd_all = ghcnd_all.join(
    selected_airport_station,
    ghcnd_all.STATION_ID == selected_airport_station.STATION,
    "LeftOuter",
)

In [26]:
# Show one joined weather row to check the added ORIGIN / STATION columns.
ghcnd_all.first()

Row(STATION_ID='AQW00061705', AIRPORT_CODE='PPG', DATE='20090101', ELEMENT='TMAX', DATA_VALUE='322', ORIGIN='PPG', STATION='AQW00061705')

**Load geographical info (latitude/longitude) for each airport and station**

In [27]:
# Station catalogue without a header row, so columns are named _c0, _c1, _c2, ...
full_stations_info = spark.read.csv('./data/ghcnd-stations.csv', header=False)

# One row per airport with its station id and that station's coordinates.
# _c0 is matched against the station id; _c1 / _c2 are taken as latitude /
# longitude and cast to double.
geo = (
    airport_station_df
    .join(
        full_stations_info,
        airport_station_df.STATION == full_stations_info._c0,
        'leftouter',
    )
    .select(
        col('ORIGIN').alias("AIRPORT_CODE"),
        'STATE',
        'CITY',
        'NAME',
        'STATION',
        col('_c1').alias('LAITITUDE').cast('Double'),
        col('_c2').alias('LONGITUDE').cast('Double'),
    )
    # Flip the sign so the values line up with the positive longitude bounds
    # declared in the variables cell.
    .withColumn('LONGITUDE', col('LONGITUDE') * -1)
)

In [28]:
# Show one airport row with its station coordinates.
geo.first()

Row(AIRPORT_CODE='BGM', STATE='NY', CITY='Binghamton', NAME='Greater Binghamton/Edwin A Link field', STATION='USW00014738', LAITITUDE=42.0833, LONGITUDE=76.1)

__Assign an area to each airport/station__

In [29]:
# Label every airport/station with a coarse geographic area.
#
# The rules are evaluated top to bottom and the first match wins, so each
# in-grid rule only needs the upper latitude / longitude bound of its box
# (element [0] of the region constant). The trailing rules push points that lie
# outside the grid into the nearest corner region.
#
# Rows that match no rule keep their station id as the area label, e.g.
# latitude >= 50 with longitude between 80 and 125, or null coordinates.
#
# The original chain repeated the SE condition a second time after the SW
# overflow rule; that branch could never be reached and has been removed.
areas = (
    geo.withColumn(
        "AREA",
        # In-grid boxes, east to west, south before north.
        when((geo.LAITITUDE < SE[0][0]) & (geo.LONGITUDE < SE[0][1]), "SE")
        .when((geo.LAITITUDE < NE[0][0]) & (geo.LONGITUDE < NE[0][1]), "NE")
        .when((geo.LAITITUDE < MSE[0][0]) & (geo.LONGITUDE < MSE[0][1]), "MSE")
        .when((geo.LAITITUDE < MNE[0][0]) & (geo.LONGITUDE < MNE[0][1]), "MNE")
        .when((geo.LAITITUDE < MSW[0][0]) & (geo.LONGITUDE < MSW[0][1]), "MSW")
        .when((geo.LAITITUDE < MNW[0][0]) & (geo.LONGITUDE < MNW[0][1]), "MNW")
        .when((geo.LAITITUDE < SW[0][0]) & (geo.LONGITUDE < SW[0][1]), "SW")
        .when((geo.LAITITUDE < NW[0][0]) & (geo.LONGITUDE < NW[0][1]), "NW")
        # Outside the grid: beyond the NW corner, west of SW, north of NE.
        .when((geo.LAITITUDE > NW[0][0]) & (geo.LONGITUDE > NW[0][1]), "NW")
        .when((geo.LAITITUDE < SW[0][0]) & (geo.LONGITUDE > SW[0][1]), "SW")
        .when((geo.LAITITUDE > NE[0][0]) & (geo.LONGITUDE < NE[0][1]), "NE")
        .otherwise(geo.STATION),
    )
    # Coordinates are only needed to derive AREA.
    .drop('LAITITUDE', 'LONGITUDE')
)

In [30]:
# Show one airport row with its assigned AREA.
areas.first()

Row(AIRPORT_CODE='BGM', STATE='NY', CITY='Binghamton', NAME='Greater Binghamton/Edwin A Link field', STATION='USW00014738', AREA='NE')

**Load preprocessed airport-station dataset and flight data for the selected year**

In [31]:
# Airport/station mapping plus the flight records of the configured YEAR,
# both read as CSV with a header row.
airports_stations = spark.read.csv('./data/airports_stations.csv', header=True)
fl = spark.read.csv('./data/flights/{}.csv'.format(YEAR), header=True)

In [32]:
# Show one raw flight record to see the available columns.
fl.first()

Row(FL_DATE='2013-01-01', OP_CARRIER='VX', OP_CARRIER_FL_NUM='108', ORIGIN='LAX', DEST='IAD', CRS_DEP_TIME='700', DEP_TIME='700.0', DEP_DELAY='0.0', TAXI_OUT='8.0', WHEELS_OFF='708.0', WHEELS_ON='1411.0', TAXI_IN='7.0', CRS_ARR_TIME='1445', ARR_TIME='1418.0', ARR_DELAY='-27.0', CANCELLED='0.0', CANCELLATION_CODE=None, DIVERTED='0.0', CRS_ELAPSED_TIME='285.0', ACTUAL_ELAPSED_TIME='258.0', AIR_TIME='243.0', DISTANCE='2288.0', CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None, Unnamed: 27=None)

In [33]:
# Keep only the flight identifiers, schedule, delay and status columns that
# the rest of the pipeline uses.
df = fl.select(
    'FL_DATE',
    'OP_CARRIER',
    'OP_CARRIER_FL_NUM',
    'ORIGIN',
    'DEST',
    'CRS_DEP_TIME',
    'DEP_DELAY',
    'CRS_ARR_TIME',
    'DIVERTED',
    'CANCELLED',
    'ARR_DELAY',
)


In [34]:
# Look up the station and area of the origin airport, then of the destination
# airport. The airport code is dropped as soon as it has served as join key, and
# the looked-up columns are renamed with an ORIG_ / DEST_ prefix.
# Left outer joins: flights whose airport has no entry in `areas` are kept.
df = (
    df.join(areas, fl.ORIGIN == areas.AIRPORT_CODE, 'LeftOuter')
      .drop('AIRPORT_CODE', 'STATE', 'CITY', 'NAME', 'ORIGIN')
      .withColumnRenamed('STATION', 'ORIG_STATION')
      .withColumnRenamed('AREA', 'ORIG_AREA')
      .join(areas, fl.DEST == areas.AIRPORT_CODE, 'LeftOuter')
      .drop('AIRPORT_CODE', 'STATE', 'CITY', 'NAME', 'DEST')
      .withColumnRenamed('STATION', 'DEST_STATION')
      .withColumnRenamed('AREA', 'DEST_AREA')
)

In [35]:
# Attach weather observations to every flight: first those of the origin
# station, then those of the destination station. Both are inner joins on the
# station id and on the flight date rendered as yyyyMMdd (the format of
# ghcnd_all.DATE), so flights without a matching observation on either side are
# removed.
# Each join adds one row per matching observation (presumably one per weather
# ELEMENT per station and day -- confirm), so a flight is repeated for every
# origin/destination observation pair until the later groupBy collapses them.
# `df.DEST_STATION` / `df.FL_DATE` in the second join are evaluated on the `df`
# that exists before this statement is assigned.
# The filter keeps a row when the origin OR the destination observation has a
# non-zero value and an element containing PRCP, SNOW or SNWD or starting with
# WD / WS; the raw values are dropped afterwards.
# NOTE(review): ghcnd_all's ORIGIN column is not in either drop list, so it
# stays in the result; looks harmless because the later groupBy does not
# reference it -- confirm before changing the drop lists.
df = df.join(ghcnd_all,(df.ORIG_STATION == ghcnd_all.STATION_ID) & (ghcnd_all.DATE == date_format(df.FL_DATE,"yyyyMMdd")),'Inner') \
                .drop('STATION','AIRPORT_CODE','DATE','STATION_ID') \
                .withColumnRenamed('ELEMENT', 'ORIG_WEATHER') \
                .withColumnRenamed('DATA_VALUE', 'ORIG_WEATHER_DATA') \
        .join(ghcnd_all, (df.DEST_STATION == ghcnd_all.STATION_ID) & (ghcnd_all.DATE == date_format(df.FL_DATE,"yyyyMMdd")),'Inner') \
                .drop('STATION','AIRPORT_CODE','DATE','STATION_ID') \
                .withColumnRenamed('ELEMENT', 'DEST_WEATHER') \
                .withColumnRenamed('DATA_VALUE', 'DEST_WEATHER_DATA') \
        .filter(((col('ORIG_WEATHER_DATA').cast('Double') != 0) & (col('ORIG_WEATHER').rlike('(PRCP|SNOW|SNWD|^WD+|^WS+)'))) | \
                ((col('DEST_WEATHER_DATA').cast('Double') != 0) & (col('DEST_WEATHER').rlike('(PRCP|SNOW|SNWD|^WD+|^WS+)')))) \
        .drop('ORIG_WEATHER_DATA','DEST_WEATHER_DATA')

In [36]:
# Replace every element code that is not precipitation / snow / wind related
# (same pattern as the filter in the previous cell) with the marker "SUNNY".
# Matching codes are left untouched.
df = (
    df.withColumn(
        'ORIG_WEATHER',
        when(~df.ORIG_WEATHER.rlike('(PRCP|SNOW|SNWD|^WD+|^WS+)'), "SUNNY")
        .otherwise(df.ORIG_WEATHER),
    )
    .withColumn(
        'DEST_WEATHER',
        when(~df.DEST_WEATHER.rlike('(PRCP|SNOW|SNWD|^WD+|^WS+)'), "SUNNY")
        .otherwise(df.DEST_WEATHER),
    )
)

In [37]:
# Collapse the per-observation rows back to one row per flight, gathering all
# origin and destination element codes of that flight into two array columns.
df = (
    df.groupBy([
        'FL_DATE',
        'OP_CARRIER',
        'OP_CARRIER_FL_NUM',
        'CRS_DEP_TIME',
        'DEP_DELAY',
        'CRS_ARR_TIME',
        'ARR_DELAY',
        'DIVERTED',
        'CANCELLED',
        'ORIG_STATION',
        'ORIG_AREA',
        'DEST_STATION',
        'DEST_AREA',
    ])
    .agg(
        collect_list('ORIG_WEATHER').alias('ORIG_WEATHERS'),
        collect_list('DEST_WEATHER').alias('DEST_WEATHERS'),
    )
)

In [38]:
def _classify_weather(elements):
    """Turn an array column of weather element codes into a single label.

    Precedence is Snow, then Rain, then Wind; an array holding none of the
    listed codes is labelled 'Sunny'.

    NOTE(review): the WT18 / WT16 / WT11 checks are kept from the original
    code, but the earlier relabelling cell rewrites every code that does not
    match '(PRCP|SNOW|SNWD|^WD+|^WS+)' to "SUNNY", so those three codes cannot
    be present in the arrays built by this notebook. Codes starting with WS
    survive that cell but are not tested here, so they end up as 'Sunny'.
    """
    def contains_any(*codes):
        # OR together one array_contains test per code, left to right.
        matches = array_contains(elements, codes[0])
        for code in codes[1:]:
            matches = matches | array_contains(elements, code)
        return matches

    return (
        when(contains_any('SNWD', 'SNOW', 'WT18'), 'Snow')
        .when(contains_any('PRCP', 'WT16'), 'Rain')
        .when(contains_any('WDF2', 'WDF5', 'WT11'), 'Wind')
        .otherwise('Sunny')
    )


# Same classification for both ends of the flight; the helper replaces the two
# copy-pasted when-chains. The code arrays and station ids are not part of the
# final output, so they are dropped.
df = (
    df.withColumn('ORIG_WEATHER', _classify_weather(df.ORIG_WEATHERS))
      .withColumn('DEST_WEATHER', _classify_weather(df.DEST_WEATHERS))
      .drop('ORIG_WEATHERS', 'DEST_WEATHERS', 'ORIG_STATION', 'DEST_STATION')
)

In [24]:
# Show one finished row before writing the result.
df.first()

                                                                                

Row(FL_DATE='2013-01-01', OP_CARRIER='9E', OP_CARRIER_FL_NUM='3284', CRS_DEP_TIME='1545', DEP_DELAY='0.0', CRS_ARR_TIME='1740', ARR_DELAY='-23.0', ORIG_STATION='USW00094847', ORIG_AREA='MNE', DEST_STATION='USW00094789', DEST_AREA='NE', ORIG_WEATHER='Snow', DEST_WEATHER='Snow')

In [39]:
# Write the result as a single CSV part file (with header) under ./output/<YEAR>.
df.coalesce(1).write.csv('./output/{}'.format(YEAR), header=True)

                                                                                