# Query 4

Compute a weekly "penalty" score for each airport that depends on both its incoming and outgoing flights. The score adds 0.5 for each incoming flight that is more than 15 minutes late (arrival delay > 15 min), and 1 for each outgoing flight that is more than 15 minutes late (departure delay > 15 min).

### The dataset has the following columns:

<br>1 Year -> 1994-2008 
<br>2 Month -> 1-12
<br>3 DayofMonth -> 1-31
<br>4 DayOfWeek -> 1 (Monday) - 7 (Sunday)
<br>5 DepTime -> actual departure time (local, hhmm)
<br>6 CRSDepTime -> scheduled departure time (local, hhmm)
<br>7 ArrTime -> actual arrival time (local, hhmm)
<br>8 CRSArrTime -> scheduled arrival time (local, hhmm)
<br>9 UniqueCarrier -> unique carrier code
<br>10 FlightNum -> flight number
<br>11 TailNum -> plane tail number
<br>12 ActualElapsedTime -> in minutes
<br>13 CRSElapsedTime -> in minutes
<br>14 AirTime -> in minutes
<br>15 ArrDelay -> arrival delay, in minutes
<br>16 DepDelay -> departure delay, in minutes
<br>17 Origin -> origin IATA airport code
<br>18 Dest -> destination IATA airport code
<br>19 Distance -> in miles
<br>20 TaxiIn -> taxi in time, in minutes
<br>21 TaxiOut -> taxi out time, in minutes
<br>22 Cancelled -> was the flight cancelled?
<br>23 CancellationCode -> reason for cancellation
        (A = carrier, B = weather, C = NAS, D = security)
<br>24 Diverted -> 1 = yes, 0 = no
<br>25 CarrierDelay -> in minutes
<br>26 WeatherDelay -> in minutes
<br>27 NASDelay -> in minutes
<br>28 SecurityDelay -> in minutes
<br>29 LateAircraftDelay -> in minutes

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import to_date, col, lit, concat, weekofyear
from pyspark.sql.types import IntegerType
import pandas as pd
import math

# SQL entry point used by loadYearCsv to read the yearly CSV files.
# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext injected by the pyspark shell / notebook kernel — confirm.
sqlContext = SQLContext(sc)

In [2]:
def loadYearCsv(year, dataDir="./Data"):
    """Load one year of flight records from ``<dataDir>/<year>.csv``.

    Only the columns needed by the penalty query are kept. The CSV is read
    without schema inference, so every source column is a string.

    Two derived columns are added:
    - ``Date``: a date built from Year/Month/Day.
    - ``Week``: ``weekofyear(Date)``, the ISO-8601 week number.

    Args:
        year: the year whose file is loaded (e.g. 1994).
        dataDir: directory containing the ``<year>.csv`` files.

    Returns:
        DataFrame[Year: string, Month: string, Day: string, DayOfWeek: string,
        Origin: string, DepDelay: string, Dest: string, ArrDelay: string,
        Date: date, Week: int], sorted by ``Date`` ascending.
    """
    import os

    filePath = os.path.join(dataDir, str(year) + ".csv")

    # Load the dataset from the CSV file (header row gives the column names).
    df = sqlContext.read.csv(filePath, header='true')

    df = df.select("Year", "Month", "DayofMonth", "DayOfWeek",
                   "Origin", "DepDelay", "Dest", "ArrDelay") \
           .withColumnRenamed('DayofMonth', 'Day')

    # Date (YYYY-M-D; to_date accepts non zero-padded month/day).
    df = df.withColumn('Date',
                       to_date(concat(col("Year"), lit('-'), col("Month"),
                                      lit('-'), col("Day"))))

    df = df.withColumn("Week", weekofyear(col("Date")))

    # Sort on the real date column. Year/Month/Day are strings, so sorting on
    # them is lexicographic ("10" < "2"). fromMonToThu relies on first()
    # returning the earliest day of the year.
    return df.orderBy("Date")

In [3]:
def fromMonToThu(df):
    """Return True when the first row of ``df`` falls on Monday-Thursday.

    ``DayOfWeek`` is a string where 1 = Monday ... 7 = Sunday. fixFirstRows
    passes the year's DataFrame as returned by loadYearCsv, so the first row
    is expected to be 1 January.
    """
    firstWeekday = int(df.first().DayOfWeek)
    return firstWeekday <= 4

In [4]:
def fromThuToSun(df):
    """Return True when the first row of ``df`` falls on Thursday-Sunday.

    ``DayOfWeek`` is a string where 1 = Monday ... 7 = Sunday. fixLastRows
    passes the year's rows sorted by Date descending, so the inspected row
    is the last day of the year.
    """
    weekday = int(df.first().DayOfWeek)
    return weekday >= 4

In [5]:
def fixFirstRows(df, year):
    """Align the start of ``df`` with week 1 of ``year``.

    The query works on whole weeks (1..52/53 as produced by ``weekofyear``)
    rather than on calendar years. This function adjusts the rows around
    1 January depending on its weekday:

    - Monday: nothing to do; ``df`` is returned unchanged.
    - Tuesday-Thursday: 1 January belongs to week 1. The December rows of
      ``year - 1`` that are also in week 1 are loaded and unioned in front
      of ``df``.
    - Friday-Sunday: 1 January belongs to the last week of the previous
      year. January rows with week >= 52 are dropped and the result is
      re-sorted by Date.
    """
    if not fromMonToThu(df):
        trimmed = df.where((col("Month") != '1') | (col("Week") < 52))
        return trimmed.orderBy("Date")

    if df.first().DayOfWeek == '1':
        return df

    # The original note says 1994 never reaches this branch (1 January 1994
    # was a Saturday), so no 1993 file is required.
    previousYear = year - 1
    lateDecember = loadYearCsv(previousYear).where(
        (col("Month") == '12')
        & (col("Week") == 1)
        & (col("Year") == str(previousYear))
    )
    return lateDecember.union(df)



In [6]:
def fixLastRows(df, year):
    """Align the end of ``df`` with the last week (52/53) of ``year``.

    The decision is based on the weekday of the latest date of ``year``
    present in ``df`` (31 December for a complete year):

    - Sunday: the last week is already complete; ``df`` is returned
      unchanged.
    - Thursday-Saturday: 31 December is in week 52/53. The January rows of
      ``year + 1`` that still fall in week >= 52 are loaded and appended.
    - Monday-Wednesday: 31 December already belongs to week 1 of the next
      year. December rows with week 1 are dropped, except those whose Year
      is ``year - 1`` (the ones fixFirstRows may have added).
    """
    # Rows of this year only, newest first, so first() is the last day.
    newestFirst = df.where(col("Year") == str(year)) \
                    .orderBy(["Date"], ascending=False)

    if not fromThuToSun(newestFirst):
        return df.where((col("Month") != '12')
                        | (col("Week") != 1)
                        | (col("Year") == str(year - 1)))

    if newestFirst.first().DayOfWeek == '7':
        return df

    # The original note says 2008 never reaches this branch, so no 2009
    # file is required.
    spillover = loadYearCsv(year + 1).where(
        (col("Month") == '1') & (col("Week") >= 52)
    )
    return df.union(spillover)

In [7]:
def startEndWeek(df):
    """Return the first (Monday) and last (Sunday) date of every week in ``df``.

    Returns:
        DataFrame[Week, StartDay, EndDay], built by joining the distinct
        Monday dates and the distinct Sunday dates on ``Week``.
    """
    calendar = df.select("Date", "DayOfWeek", "Week")

    def _datesOn(weekday, alias):
        # Distinct (Date, Week) pairs for one weekday, with Date renamed.
        return calendar.filter(col("DayOfWeek") == weekday) \
                       .drop("DayOfWeek") \
                       .distinct() \
                       .withColumnRenamed("Date", alias)

    mondays = _datesOn('1', "StartDay")
    sundays = _datesOn('7', "EndDay")
    return mondays.join(sundays, "Week")

In [8]:
def _lateFlightsPerWeek(df, airportCol, delayCol, countCol, lateThreshold=15):
    """Count, per (airport, week), the flights whose delay exceeds the threshold.

    Args:
        df: flights DataFrame with a ``Week`` column.
        airportCol: airport column to group by ("Origin" or "Dest").
        delayCol: delay column to test ("DepDelay" or "ArrDelay").
        countCol: name given to the resulting count column.
        lateThreshold: a flight is late when its delay is strictly greater
            than this many minutes.

    Returns:
        DataFrame[Airport, Week, <countCol>].
    """
    # Delays are read as strings. 'NA' (or anything non-numeric) casts to
    # null, and null > threshold is not true, so such rows are dropped.
    late = df.select(col(airportCol).alias("Airport"), "Week",
                     col(delayCol).cast("double").alias("Delay")) \
             .filter(col("Delay") > lateThreshold)
    return late.groupBy("Airport", "Week").count() \
               .withColumnRenamed("count", countCol)


def checkPenalty(year):
    """Compute the weekly delay penalty of every airport for ``year``.

    Penalty = 0.5 * (incoming flights > 15 min late)
            + 1.0 * (outgoing flights > 15 min late).
    Weeks follow ``weekofyear`` (1..52/53); fixFirstRows and fixLastRows
    pull in or drop the days at the year boundaries accordingly.

    Returns:
        DataFrame[YearWeek, Airport, StartDay, EndDay,
        IncomingDelaysPerAirport, OutgoingDelaysPerAirport, Penalty],
        ordered by week number then airport.
    """
    df = loadYearCsv(year)
    df = fixFirstRows(df, year)
    df = fixLastRows(df, year)

    outgoing = _lateFlightsPerWeek(df, "Origin", "DepDelay",
                                   "OutgoingDelaysPerAirport")
    incoming = _lateFlightsPerWeek(df, "Dest", "ArrDelay",
                                   "IncomingDelaysPerAirport")

    # Outer join: an airport with late flights in only one direction in a
    # given week must still be scored. An inner join would drop it. The
    # missing side means "no late flights", i.e. 0.
    perAirport = incoming.join(outgoing, ["Airport", "Week"], "outer") \
        .na.fill(0, subset=["IncomingDelaysPerAirport",
                            "OutgoingDelaysPerAirport"])

    # Left join so a week is not silently dropped when its Monday or Sunday
    # has no rows (StartDay/EndDay are then null).
    perAirport = perAirport.join(startEndWeek(df), "Week", "left")

    perAirport = perAirport.withColumn(
        "Penalty",
        0.5 * col("IncomingDelaysPerAirport") + col("OutgoingDelaysPerAirport"))

    # "Year-Week" label (e.g. "1994-9").
    perAirport = perAirport.withColumn(
        "YearWeek", concat(lit(year), lit('-'), col("Week")))

    # YearWeek is an unpadded string, so sorting on it would put "1994-10"
    # before "1994-2". Sort on the integer Week instead, then project.
    return perAirport.orderBy("Week", "Airport") \
        .select("YearWeek", "Airport", "StartDay", "EndDay",
                "IncomingDelaysPerAirport", "OutgoingDelaysPerAirport",
                "Penalty")


In [9]:
import os

finalPath = "./Results/Query4/query4.csv"
years = list(range(1994, 2009))

# The results directory may not exist yet.
os.makedirs(os.path.dirname(finalPath), exist_ok=True)

# 'w' instead of 'a': re-running the cell must replace the file, not append
# a second copy of every year (with a second header).
# newline='' is what pandas recommends for file handles passed to to_csv.
with open(finalPath, 'w', newline='') as f:
    for i, y in enumerate(years):
        # Header only before the first year's rows, whatever the start year.
        # The pandas index restarts at 0 for each year and carries no
        # meaning, so it is not written.
        checkPenalty(y).toPandas().to_csv(f, header=(i == 0), index=False)