# Author: Petros Polyhchronis

## Dataset overview


| Column ID | Column Name       | Description |
| --------- | ----------------- | ----------- |
| 1         | Year              | 2007 |
| 2         | Month             | 1-12 |
| 3         | DayofMonth        | 1-31 |
| 4         | DayOfWeek         | 1 (Monday) - 7 (Sunday) |
| 5         | DepTime           | actual departure time (local, hhmm) |
| 6         | CRSDepTime        | scheduled departure time (local, hhmm) |
| 7         | ArrTime           | actual arrival time (local, hhmm) |
| 8         | CRSArrTime        | scheduled arrival time (local, hhmm) |
| 9         | UniqueCarrier     | unique carrier code |
| 10        | FlightNum         | flight number |
| 11        | TailNum           | plane tail number |
| 12        | ActualElapsedTime | in minutes |
| 13        | CRSElapsedTime    | in minutes |
| 14        | AirTime           | in minutes |
| 15        | ArrDelay          | arrival delay, in minutes |
| 16        | DepDelay          | departure delay, in minutes |
| 17        | Origin            | origin IATA airport code |
| 18        | Dest              | destination IATA airport code |
| 19        | Distance          | in miles |
| 20        | TaxiIn            | taxi in time, in minutes |
| 21        | TaxiOut           | taxi out time, in minutes |
| 22        | Cancelled         | was the flight cancelled? (1 = yes, 0 = no) |
| 23        | CancellationCode  | reason for cancellation (A = carrier, B = weather, C = NAS, D = security) |
| 24        | Diverted          | 1 = yes, 0 = no |
| 25        | CarrierDelay      | in minutes |
| 26        | WeatherDelay      | in minutes |
| 27        | NASDelay          | in minutes |
| 28        | SecurityDelay     | in minutes |
| 29        | LateAircraftDelay | in minutes |


# Setup

In [1]:
# Init pyspark
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Init sparksql -- Only used to format the output nicely!
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

rows = sc.textFile("/air_transit_2007.csv")
data = rows.map(lambda line: line.split(","))
# data.cache()



In [2]:
import statistics
from pyspark.sql.functions import col
from operator import add

# Sample Query

In [3]:
# Flights departing Rochester (ROC) on 12 March 2007: (CRSDepTime, Dest)
sample_results = data.filter(lambda r: r[16] == 'ROC' and r[1] == '3' and r[2] == '12') \
    .map(lambda r: (r[5], r[17])) \
    .collect()

# format nicely!
sqlContext.createDataFrame(sample_results, ['Sched. Departure', 'Destination']).show(n=100)

+----------------+-----------+
|Sched. Departure|Destination|
+----------------+-----------+
|            1455|        CLE|
|            1840|        CLE|
|             659|        CLE|
|            1750|        EWR|
|             630|        EWR|
|            1650|        EWR|
|            1255|        EWR|
|            1025|        EWR|
|            1034|        IAD|
|            1607|        IAD|
|            1006|        ORD|
|             600|        ORD|
|            2000|        ORD|
|             628|        JFK|
|             638|        CVG|
|            1100|        CVG|
|            1141|        JFK|
|            1530|        JFK|
|            1700|        JFK|
|            1827|        CVG|
|             720|        ORD|
|            1725|        ORD|
|             800|        CLT|
|             720|        PHL|
|             600|        ATL|
|            1521|        ATL|
|            1730|        ATL|
|            1236|        ATL|
|             935|        ATL|
|       

___

#### Some additional tools to be used later

In [4]:
from operator import add  # to perform addition within reduceByKey()
from pyspark.sql.functions import col  # to preview selected rows from dataframes
import statistics

___

## Initial Data Inspection and Filtering

In what follows, we lay out a few tools created to facilitate data inspection and filtering. In particular, we find all non-arithmetic entries in the numerical columns using two custom functions, `distinct(x, y)` and `find_non_arithmetics(x)`. Both are used in a map-reduce context in which each column name is a key. `distinct()` serves as the reducer: it concatenates the values of a column into a single string, skipping any value that already occurs in that string. `find_non_arithmetics()` then extracts every run of non-digit characters from the concatenated string. Because the values are concatenated without a separator, adjacent tokens can fuse (e.g., `NA` followed by `-5` yields the run `NA-`). These results help us identify the irrelevant/NA values to exclude from our calculations throughout the report.

Columns inspected: all columns except *Year*, *UniqueCarrier*, *Origin*, *Dest* and *CancellationCode* (*TailNum* is included, although it is alphanumeric).

In [5]:
def distinct(x, y):
    # Reducer: append value y to the accumulated string x unless y already occurs in x
    if y in x:
        return x
    else:
        return x + y

In [6]:
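def find_non_arithmetics(x):
    # x is a (column name, concatenated values) pair produced by reduceByKey(distinct)
    import re
    pattern = r'\D+'  # one or more consecutive non-digit characters
    words = re.findall(pattern, x[1])
    words = list(set(words))  # keep each non-arithmetic run only once
    return (x[0], words)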
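To see what the two helpers produce, here is a minimal local sketch that applies them with Python's `functools.reduce` to a hypothetical list of *ArrDelay* values, standing in for the per-key reduction Spark performs. It sorts the extracted runs for a deterministic printout, whereas the notebook version returns them in set order. Because values are concatenated without a separator, the reported tokens depend on the order in which values are combined (which, in Spark, depends on partitioning): `NA` and `-` may be reported separately or fused into `NA-`.

```python
import re
from functools import reduce


def distinct(x, y):
    # Append y to the accumulated string x unless it already occurs in x as a substring
    if y in x:
        return x
    return x + y


def find_non_arithmetics(x):
    # x is a (column name, concatenated values) pair; keep each run of non-digits once
    column, values = x
    return (column, sorted(set(re.findall(r'\D+', values))))


# Hypothetical ArrDelay values, combined in two different orders
order_a = ['12', 'NA', '-5', '30']
order_b = ['12', '-5', 'NA', '30']

concat_a = reduce(distinct, order_a)   # '12NA-530'
concat_b = reduce(distinct, order_b)   # '12-5NA30'

print(find_non_arithmetics(('ArrDelay', concat_a)))  # ('ArrDelay', ['NA-'])
print(find_non_arithmetics(('ArrDelay', concat_b)))  # ('ArrDelay', ['-', 'NA'])
```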

In [7]:
filters = data.flatMap(lambda x:(
                               (('Month')            , x[1]),
                               (('DayofMonth')       , x[2]),
                               (('DayOfWeek')        , x[3]),
                               (('DepTime')          , x[4]),
                               (('CRSDepTime')       , x[5]),
                               (('ArrTime')          , x[6]),
                               (('CRSArrTime')       , x[7]),
                               (('FlightNum')        , x[9]),
                               (('TailNum')          , x[10]),
                               (('ActualElapsedTime'), x[11]),
                               (('CRSElapsedTime')   , x[12]),
                               (('AirTime')          , x[13]),
                               (('ArrDelay')         , x[14]),
                               (('DepDelay')         , x[15]),
                               (('Distance')         , x[18]),
                               (('TaxiIn')           , x[19]),
                               (('TaxiOut')          , x[20]),
                               (('Cancelled')        , x[21]),
                               (('Diverted')         , x[23]),
                               (('CarrierDelay')     , x[24]),
                               (('WeatherDelay')     , x[25]),
                               (('NASDelay')         , x[26]),
                               (('SecurityDelay')    , x[27]),
                               (('LateAircraftDelay'), x[28])))\
                      .reduceByKey(distinct)\
                      .map(find_non_arithmetics)\
                      .collect()

In [8]:
df_filters = sqlContext.createDataFrame(filters, ['Column Name', 'Non-arithmetic Values'])
df_filters.show(30)

+-----------------+---------------------+
|      Column Name|Non-arithmetic Values|
+-----------------+---------------------+
|     WeatherDelay|       [WeatherDelay]|
|   CRSElapsedTime| [-, NA, CRSElapse...|
|        FlightNum|          [FlightNum]|
|ActualElapsedTime| [NA, ActualElapse...|
|          TaxiOut|            [TaxiOut]|
|          ArrTime|        [NA, ArrTime]|
|         ArrDelay| [NA-, NA, ArrDela...|
|         NASDelay|           [NASDelay]|
|    SecurityDelay|      [SecurityDelay]|
|       CRSDepTime|         [CRSDepTime]|
|       CRSArrTime|         [CRSArrTime]|
|         Distance|           [Distance]|
|           TaxiIn|             [TaxiIn]|
|            Month|              [Month]|
|LateAircraftDelay|  [LateAircraftDelay]|
|         Diverted|           [Diverted]|
|         DepDelay| [-, NA-, NA, DepD...|
|        Cancelled|          [Cancelled]|
|        DayOfWeek|          [DayOfWeek]|
|          AirTime|        [AirTime, NA]|
|       DayofMonth|         [Dayof

For each exercise, we select and print the relevant columns of this table to check whether they contain 'NA' values or negative values, the latter indicated by hyphens in the list of non-arithmetic values.

In addition, we observe that each column contains its own column name as a non-arithmetic value. This indicates that there is probably a row (the CSV header) that contains the column names as values. We can verify this as seen below:

In [9]:
print(data.filter(lambda x: x[1]== 'Month').collect())

[['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']]


There is one line in the dataset containing the names of the columns!

## General Methodology
We follow the general MapReduce concept discussed in class. The two main building blocks are the Map and Reduce functions. The mapping task amounts to establishing what the (key, value) pair for each row should be. By and large, the key is the attribute by which we want to aggregate our results, and the value is a tuple of attributes which, in the interest of computational efficiency, should contain only as much information as is needed to reach our analysis goals. The Reduce step then consists of choosing the aggregation function that matches our analytical objective (e.g., count, sum). These two actions form the cornerstone of our approach to all questions. In addition, as mentioned in the assignment description, we filter the data as a first step so that only the relevant rows are processed, which improves efficiency and reduces computation time. Finally, we format our results as (sorted) dataframes to ensure readability and ease of use.
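The filter, map and reduce pattern can be sketched without Spark. The helper below, `local_map_reduce` (a hypothetical name, not a PySpark function), is a minimal pure-Python stand-in for `rdd.filter(keep).map(mapper).reduceByKey(reducer)`, run on a few made-up (Month, UniqueCarrier, Cancelled) records. Unlike Spark, it processes everything in one process and groups values in memory, so it only illustrates the logic, not the distribution of work.

```python
from collections import defaultdict
from functools import reduce
from operator import add


def local_map_reduce(records, keep, mapper, reducer):
    """Hypothetical local stand-in for rdd.filter(keep).map(mapper).reduceByKey(reducer)."""
    groups = defaultdict(list)
    for record in records:
        if keep(record):                  # filter step: drop irrelevant rows early
            key, value = mapper(record)   # map step: emit one (key, value) pair per row
            groups[key].append(value)
    # reduce step: fold the values of each key with the aggregation function
    return {key: reduce(reducer, values) for key, values in groups.items()}


# Hypothetical (Month, UniqueCarrier, Cancelled) records
records = [('1', 'AA', '0'), ('1', 'UA', '0'), ('2', 'AA', '1'), ('2', 'AA', '0'), ('1', 'AA', '0')]

# Count the operated (non-cancelled) flights per carrier
counts = local_map_reduce(records,
                          keep=lambda r: r[2] == '0',
                          mapper=lambda r: (r[1], 1),
                          reducer=add)
print(counts)  # {'AA': 3, 'UA': 1}
```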

## Q1 : Compute the total number of records.

There are various ways to count the number of records. We could use `.count()` directly, but I prefer to do it manually using the map and reduce functions. The steps are as follows:
 1. Filter out the row containing the column names using `.filter()`.
 2. Map each row to the value of 1 (as little data as possible) using `.map()`
 3. Use the `.reduce()` function to sum up the 1s. 
 
*Note:* We cannot use `.reduceByKey()` here, because it operates on (key, value) pairs, whereas each row is mapped to a bare number (1) with no key. `.reduce()`, in contrast, combines all elements of the RDD pairwise with the given function, whatever their form, and therefore works.
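A small pure-Python illustration of the note, on hypothetical rows: after the header row is filtered out, each row becomes a bare `1`, and `functools.reduce` (the local analogue of `RDD.reduce()`) simply folds these with `+`. A bare integer cannot be split into a key and a value, which is what a by-key aggregation needs; in PySpark, calling `.reduceByKey()` on such an RDD would be expected to fail once the job runs.

```python
from functools import reduce

# Hypothetical parsed rows; the first one mimics the header row found earlier
rows = [
    ['Year', 'Month', 'DayofMonth'],
    ['2007', '1', '1'],
    ['2007', '1', '2'],
    ['2007', '2', '1'],
]

# Filter out the header, map every row to a bare 1, then fold with +
ones = [1 for r in rows if r[1] != 'Month']
total = reduce(lambda x, y: x + y, ones)
print(total)  # 3

# A by-key aggregation needs (key, value) pairs; a bare int cannot be unpacked into one
try:
    key, value = ones[0]
except TypeError as err:
    print('not a (key, value) pair:', err)
```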

In [10]:
number_of_records = data.filter(lambda x: x[1]!='Month')\
                        .map(lambda r: 1) \
                        .reduce(lambda x, y: x+y)

print(number_of_records)

7453215


**Q1 ANSWER: There are 7,453,215 recorded flights in the dataset.**

Alternatively...

In [11]:
number_of_records = data.filter(lambda x: x[1]!='Month')\
                        .count()

print(number_of_records)

7453215


## Q2 : Find total number of operated flights per month, sorted by the month.

In this exercise we make use of the columns:
- *Month* (column 2)
- *Cancelled* (column 22)

We inspect what non-arithmetic values exist in the columns to be used.

In [12]:
df_filters.filter(col('Column Name').isin(['Month', 'Cancelled'])).show()

+-----------+---------------------+
|Column Name|Non-arithmetic Values|
+-----------+---------------------+
|      Month|              [Month]|
|  Cancelled|          [Cancelled]|
+-----------+---------------------+



We see that:
- there are no negative numbers (no hyphens)
- no NA entries for the columns *Month* and *Cancelled*

#### Assumptions and Code Steps:

*Assumptions*
- Cancelled flights are not considered as operated since they never took place.

*Code*

1. **Filter**: Discard all cancelled flights using `.filter()`. Condition: `x[21] == '0'`.
2. **Map**: Map each row to a key-value pair (Key = Month, Value = 1) using `.map()`. Since the goal is simply to count the operated flights, we discard all other attributes to reduce the amount of data processed. We also convert the month from a string to an integer using `int()`, so that the months are sorted numerically later on.
3. **Reduce**: Aggregate (here: sum) the 1s for each key (= month) using `reduceByKey()` and the `add()` function (library: `operator`). Alternatively, `lambda x, y: x + y` can be used for the summation.
4. **Sorting**: Sort by key (= month) in ascending order (the default) using `.sortByKey()`.
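The `int()` conversion in the Map step matters for the Sorting step: `.sortByKey()` orders keys by their natural Python ordering, so month keys left as strings would sort lexicographically. A quick local check on hypothetical keys:

```python
# Month keys as they come out of line.split(",")
month_keys = ['1', '2', '10', '11', '12', '3']

as_strings = sorted(month_keys)                    # lexicographic order
as_integers = sorted(int(m) for m in month_keys)   # numeric order

print(as_strings)   # ['1', '10', '11', '12', '2', '3']
print(as_integers)  # [1, 2, 3, 10, 11, 12]
```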

In [13]:
flights_by_month = data.filter(lambda x: x[21] =='0') \
                       .map(lambda x: (int(x[1]), 1)) \
                       .reduceByKey(add) \
                       .sortByKey()

The results are then summarised in the following table.

In [14]:
df2 = sqlContext.createDataFrame(flights_by_month, ['Month', 'Number of Flights'])
df2.show(12)  # .show() prints the table and returns None, so it is not chained into the assignment

+-----+-----------------+
|Month|Number of Flights|
+-----+-----------------+
|    1|           605782|
|    2|           540139|
|    3|           622332|
|    4|           603510|
|    5|           624768|
|    6|           612037|
|    7|           635054|
|    8|           640984|
|    9|           593680|
|   10|           622665|
|   11|           598870|
|   12|           592646|
+-----+-----------------+



## Q3 : Find the plane with the highest number of flights. Each plane has a unique TailNum.

In this exercise we make use of the columns:
- *TailNum* (column 11)
- *Cancelled* (column 22)


We only inspect the new column, i.e. *TailNum*.

In [15]:
tailnumbers = df_filters.filter(col('Column Name').isin(['TailNum'])).select('Non-arithmetic Values').collect()
tailnumbers

[Row(Non-arithmetic Values=['EEAAN', 'CTAAN', 'WDAAN', 'BPAAN', 'DEAAN', 'WNAAN', 'FFAAN', 'JN', 'YBAAN', 'EMAAN', 'BCN', 'TSAAN', 'UAN', 'SN', 'LRN', 'RN', 'WMAAN', 'YAAAN', 'TKAAN', 'CCAAN', 'AKAAN', 'NWN', 'CN', 'AUN', 'YHAAN', 'XN', 'FRN', 'TDAAN', 'BYAAN', 'ACAAN', 'EUAAN', 'BAAAN', 'KN', 'FAAAN', 'WAAAN', 'YN', 'CLN', 'TGAAN', 'E', 'EVN', 'CYAAN', 'YEAAN', 'DYAAN', 'ADAAN', 'FBAAN', 'ANAAN', 'MJN', 'LEN', 'XMAAN', 'QCN', 'AUAAN', 'DKAAN', 'WVAAN', 'HAN', 'MRN', 'YJAAN', 'NW', 'QXN', 'AYN', 'TN', 'BGN', 'AFAAN', 'FTAAN', 'WXAAN', 'TWN', 'LLN', 'CSAAN', 'YFAAN', 'SWN', 'NRN', 'DGAAN', 'TBN', 'EGAAN', 'FEAAN', 'EJAAN', 'TMAAN', 'JCN', 'DPAAN', 'TFAAN', 'MBN', 'DHAAN', 'DNN', 'GBN', 'WWAAN', 'WYAAN', 'ALNHZOALN', 'XLAAN', 'DFAAN', 'BFAAN', 'FGAAN', 'XXAAN', 'BJAAN', 'EXAAN', 'FRAAN', 'AXN', 'WEAAN', 'PSN', 'CBAAN', 'NCN', 'JSN', 'DAN', 'ALN', 'XGAAN', 'BNAAN', 'XEAAN', 'JHN', 'ASAAN', 'SAN', 'VN', 'CA', 'PCN', 'TEAAN', 'BVAAN', 'JAN', 'DMAAN', 'ENAAN', 'TSN', 'DXAAN', 'MSN', 'DRAAN',

In [16]:
'NA' in tailnumbers

False

We see that:
- TailNums are strings and not numbers.
- We do not have any 'NA' tailnumber

#### Assumptions and Code Steps:

*Assumptions*
- Each plane can be uniquely identified via its tail number.
- Cancelled flights can be either filtered out or kept in the dataset, depending on the application context. In this case, we chose to discard them.

 *Code*
 1. **Filter**: We discard all cancelled flights using `.filter()`. Condition: `x[21] =='0'`.
 2. **Map**: We map over the rows and create key-value pairs (Key = TailNum, Value = 1) using `.map()`. Since the goal is simply to count the operated flights of each plane, we discard all other attributes to reduce the amount of data processed.
 3. **Reduce**: We reduce the results by aggregating (here: adding) the 1s for each separate key using `reduceByKey()` and `add`.
 4. **Sorting**: We use `sortBy()` to sort by the Number of Flights in descending order.
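Since only the top of the ranking is of interest, an alternative to sorting the whole RDD is PySpark's `RDD.takeOrdered(num, key=...)`, e.g. `takeOrdered(10, key=lambda x: -x[1])`, which returns the first `num` elements to the driver as a list. The sketch below mimics both options locally on hypothetical (TailNum, count) pairs, using `sorted` for the full sort and `heapq.nlargest` for the top-k selection.

```python
import heapq

# Hypothetical (TailNum, number of flights) pairs, as produced by reduceByKey(add)
counts = [('N001XX', 1200), ('N002XX', 4000), ('N003XX', 3100), ('N004XX', 3900)]

# Full sort in descending order, mirroring sortBy(lambda x: -x[1])
by_flights = sorted(counts, key=lambda x: -x[1])

# Top-k selection only, similar in spirit to takeOrdered(2, key=lambda x: -x[1])
top_two = heapq.nlargest(2, counts, key=lambda x: x[1])

print(by_flights[0])  # ('N002XX', 4000)
print(top_two)        # [('N002XX', 4000), ('N004XX', 3900)]
```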

In [17]:
planes_by_flights = data.filter(lambda x: x[21]=='0') \
                   .map(lambda x: (x[10],1)) \
                   .reduceByKey(add)\
                   .sortBy(lambda x: -x[1])

The results are then summarised in the following table.

In [18]:
df3 = sqlContext.createDataFrame(planes_by_flights, ['TailNum', 'Number of Flights'])
df3.show(100)  # .show() returns None, so keep the DataFrame and display it separately

+-------+-----------------+
|TailNum|Number of Flights|
+-------+-----------------+
| N655BR|             4457|
| N479HA|             4359|
| N651BR|             4324|
| N478HA|             4316|
| N654BR|             4252|
| N480HA|             4225|
| N485HA|             4203|
| N484HA|             4126|
| N693BR|             4088|
| N481HA|             4045|
| N487HA|             4038|
| N477HA|             3958|
| N810AL|             3939|
| N837AL|             3881|
| N475HA|             3854|
| N486HA|             3820|
| N476HA|             3685|
| N836AL|             3680|
| N808AL|             3676|
| N824AL|             3672|
| N646BR|             3502|
| N828AL|             3493|
| N295SW|             3466|
| N226SW|             3462|
| N835AL|             3440|
| N292SW|             3437|
| N220SW|             3435|
| N295UX|             3426|
| N234SW|             3365|
| N294SW|             3321|
| N250YV|             3303|
| N218SW|             3285|
| N393SW|           

## Q4 : Compute the total flight time of each airplane, sorted by flight time in descending order.

Each airplane is uniquely identified via its Tail number (*TailNum*) as explained in Q3. Moreover, the flight time of each flight is captured in the attribute *ActualElapsedTime*, as indicated in the assignment clarification on Insendi.

In this exercise we make use of the columns:
- *TailNum* (column 11)
- *Cancelled* (column 22)
- *ActualElapsedTime* (column 12)


We only inspect the new column, i.e. *ActualElapsedTime*.

In [19]:
AET = df_filters.filter(col('Column Name').isin(['ActualElapsedTime'])).select('Non-arithmetic Values').collect()
AET

[Row(Non-arithmetic Values=['NA', 'ActualElapsedTime'])]

We see that:
- *ActualElapsedTime* contains NA values.
- *ActualElapsedTime* does not have any negative values (no hyphens).

#### Assumptions and Code Steps:

*Assumptions*
- Each plane can be uniquely identified via its tail number.
- Cancelled flights should not be counted as flight time since the planes have not executed them.

 *Code*
 1. **Filter**: We discard all cancelled flights using `.filter()` and the condition: `x[21] =='0'`. We exclude NA values using `x[11]!= 'NA'`
 2. **Map**: We map over tuples and create key-value pairs (Key= TailNum, value= float(*ActualElapsedTime*)) using `.map()`. No other attribute is required for the calculations.
 3. **Reduce**: We reduce the results by aggregating (here: adding) the *ActualElapsedTime* values for each separate key using `reduceByKey()` and `add`.
 4. **Sorting**: We use `sortBy()` to sort by the total flight time in descending order.

In [20]:
flight_time_by_plane = data.filter(lambda x: x[21]=='0' and x[11]!='NA') \
                           .map(lambda x: (x[10], float(x[11]))) \
                           .reduceByKey(add) \
                           .sortBy(lambda x:-x[1])

In [21]:
df4 = sqlContext.createDataFrame(flight_time_by_plane, ['TailNum', 'Flight time'])
df4.show(100)  # .show() returns None, so keep the DataFrame and display it separately

+-------+-----------+
|TailNum|Flight time|
+-------+-----------+
| N556AS|   592470.0|
| N637JB|   296443.0|
| N636JB|   296091.0|
| N646JB|   293882.0|
| N607JB|   293853.0|
| N624JB|   292416.0|
| N599JB|   291035.0|
| N640JB|   290752.0|
| N590NW|   290597.0|
| N625JB|   290284.0|
| N639JB|   289787.0|
| N557UA|   289540.0|
| N649JB|   289044.0|
| N618JB|   288891.0|
| N645JB|   288835.0|
| N523JB|   287243.0|
| N524JB|   286692.0|
| N634JB|   286676.0|
| N630JB|   286470.0|
| N633JB|   286107.0|
| N621JB|   285990.0|
| N638JB|   285929.0|
| N579JB|   285598.0|
| N632JB|   285468.0|
| N597UA|   285104.0|
| N641JB|   284748.0|
| N520JB|   284729.0|
| N629JB|   284394.0|
| N623JB|   284146.0|
| N603JB|   283655.0|
| N613JB|   283401.0|
| N585JB|   283278.0|
| N608JB|   283055.0|
| N590UA|   283007.0|
| N569JB|   282939.0|
| N558AS|   281977.0|
| N585NW|   281768.0|
| N648JB|   281498.0|
| N587JB|   281450.0|
| N582NW|   281266.0|
| N505UA|   281144.0|
| N592JB|   281092.0|
| N589JB| 

*Important Note*  
Inspecting the table, we observe that the total flight time of the first airplane, N556AS (592,470 minutes, \~592K), is double that of the second (296,443 minutes, \~296K). Moreover, a non-leap year such as 2007 has only 365 × 24 × 60 = 525,600 minutes, so N556AS would have logged more flight minutes than there are minutes in the whole year. This strongly suggests that a mistake was made when tabulating the data for N556AS. Thus, in the absence of further information, we should not regard the result for the first plane as accurate until more information comes to light. The remaining 99 planes previewed in the table have much more similar flight times, indicating that their results are probably accurate.
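The plausibility check behind this note is simple arithmetic, reproduced here with the first two totals from the table: it counts the minutes in 2007 (a non-leap year) and compares them with the total reported for N556AS.

```python
minutes_per_year = 365 * 24 * 60            # 2007 was not a leap year
top_plane, runner_up = 592470.0, 296443.0   # N556AS and N637JB, from the table

print(minutes_per_year)                     # 525600
print(top_plane > minutes_per_year)         # True
print(round(top_plane / runner_up, 2))      # 2.0
```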

## Q5 : Find the busiest airport (in terms of number of departures + arrivals of all operated flights) for each month.

An airport is uniquely identified by its IATA airport code. Each record in the dataset contains the origin airport (column: *Origin*) and the destination airport (column: *Dest*) of the flight. As instructed, we consider only the operated flights, i.e., we filter out the cancelled ones.

In this exercise we make use of the columns:
- *Origin* (column 17)
- *Dest* (column 18)
- *Cancelled* (column 22)

Both new columns are strings containing codes, so we cannot easily check them for mistakes with the non-arithmetic inspection.

#### Assumptions and Code Steps:

*Assumptions*
- Each airport can be uniquely identified through its IATA code.
- Cancelled flights should not be counted as departures or arrivals, since they never took place.

 *Code*
 1. **Filter**: We discard all cancelled flights using `.filter()` and the condition: `x[21] =='0'`.
 2. **Map**: We use `.flatMap()` to create key-value pairs (Key= (Month, Airport), Value= 1) for each of the destination and origin airport values contained in the record.
 3. **Reduce**: We reduce the results by aggregating (here: adding) the 1s for separate keys using `reduceByKey()` and `add`. Each reducer handles one Month-Airport combination.

In [22]:
busiest_airports = data.filter(lambda x: x[21] == '0')\
                       .flatMap(lambda x: (((int(x[1]), x[16]), 1),   # departure: (Month, Origin)
                                           ((int(x[1]), x[17]), 1)    # arrival: (Month, Dest)
                                           ))\
                       .reduceByKey(add)

4. We define a custom function `find_max()` to select the airport with the highest number of incoming/outgoing flights for each month. The function:
    - takes two tuples as input (typical `reduce` syntax) of the form (Airport, Number of flights)
    - finds the larger number of flights of the two, and returns the corresponding Airport together with that number

In [23]:
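def find_max(x, y):
    # x, y: (name, value) pairs, e.g. (airport, number of flights)
    # keep the pair with the larger value; on a tie, x is kept
    b = max(x[1], y[1])
    if b == x[1]:
        a = x[0]
    else:
        a = y[0]
    return (a, b)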

5. We remap and use our custom function within `.reduceByKey()` to find the airport with the highest traffic for each month.
6. We sort by key(=month) using `.sortByKey()`.
7. We remap the tuples for easier printing of the dataframe

In [24]:
busiest_airports_ordered = busiest_airports.map(lambda x: (x[0][0], (x[0][1],x[1])))\
                                           .reduceByKey(find_max)\
                                           .sortByKey()\
                                           .map(lambda x: (x[0], x[1][0], x[1][1]))

In [25]:
df5 = sqlContext.createDataFrame(busiest_airports_ordered, ['Month','Airport','Number of Flights'])

The results can be seen below. Atlanta's airport (ATL) consistently exhibits the highest traffic throughout the year.

In [26]:
df5.show(12)

+-----+-------+-----------------+
|Month|Airport|Number of Flights|
+-----+-------+-----------------+
|    1|    ATL|            62955|
|    2|    ATL|            57654|
|    3|    ATL|            66750|
|    4|    ATL|            64953|
|    5|    ATL|            68000|
|    6|    ATL|            70168|
|    7|    ATL|            72327|
|    8|    ATL|            72850|
|    9|    ATL|            67732|
|   10|    ATL|            73126|
|   11|    ATL|            68883|
|   12|    ATL|            67674|
+-----+-------+-----------------+
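The per-month selection (steps 5-7) can be mimicked locally on hypothetical ((Month, Airport), count) pairs. The sketch uses a compact `find_max` that behaves like the notebook's version (it keeps the first argument on ties) and folds each month's values one at a time, as `.reduceByKey()` does per key. In Spark the order in which values are combined is not fixed, so on an exact tie the reported airport could depend on partitioning.

```python
def find_max(x, y):
    # Compact equivalent of the notebook's find_max: larger count wins, ties keep x
    return x if x[1] >= y[1] else y


# Hypothetical ((Month, Airport), count) pairs, as produced by reduceByKey(add)
airport_counts = [((1, 'ATL'), 900), ((1, 'ORD'), 850), ((2, 'ORD'), 700), ((2, 'ATL'), 650)]

# Remap to Month -> (Airport, count) and fold each month's values with find_max
busiest = {}
for (month, airport), count in airport_counts:
    value = (airport, count)
    busiest[month] = find_max(busiest[month], value) if month in busiest else value

print(sorted(busiest.items()))  # [(1, ('ATL', 900)), (2, ('ORD', 700))]
```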



## Q6 : Find the airline with highest average delay of each type in March 2007.

*Note: do not write separate code for each error type. You should compute a single RDD where each row contains the delay type, the airline that is worst regarding that delay type, and its average delay of that type in minutes*

In this exercise we make use of the columns:


- *ArrDelay* (column 15)
- *DepDelay* (column 16)
- *CarrierDelay* (column 25)
- *WeatherDelay* (column 26)
- *NASDelay* (column 27)
- *SecurityDelay* (column 28)
- *LateAircraftDelay* (column 29)
- *Cancelled* (column 22)

Let's inspect the delay values:

In [27]:
df_filters.filter(col('Column Name').isin(['ArrDelay', 'DepDelay', 'CarrierDelay', 'WeatherDelay',
                                           'NASDelay', 'SecurityDelay', 'LateAircraftDelay'])).select('Non-arithmetic Values').collect()

[Row(Non-arithmetic Values=['WeatherDelay']),
 Row(Non-arithmetic Values=['NA-', 'NA', 'ArrDelay', '-']),
 Row(Non-arithmetic Values=['NASDelay']),
 Row(Non-arithmetic Values=['SecurityDelay']),
 Row(Non-arithmetic Values=['LateAircraftDelay']),
 Row(Non-arithmetic Values=['-', 'NA-', 'NA', 'DepDelay']),
 Row(Non-arithmetic Values=['CarrierDelay'])]

We see that:
- *ArrDelay* and *DepDelay* contain both negative values (hyphens) as well as NAs. They are the only ones that need filtering out.
- The remaining delay columns have neither negative values nor NAs

#### Assumptions and Code Steps:

*Assumptions*
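- We filter out negative delays, as they can skew the results. Flying earlier might be convenient, but passengers are usually concerned with the average delay when there is indeed a delay, so we find it more insightful to exclude negative delays. Because the filter is applied to whole records, a flight with a negative *ArrDelay* or *DepDelay* is also excluded from the averages of the other five delay types.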

 *Code*
 1. **Filter**: We use `.filter()` to:
     - Discard all cancelled flights: `x[21] == '0'`.
     - Select only data from March: `Month == '3'`.
     - Filter out NAs (e.g., `ArrDelay != 'NA'`) and negative values (e.g., `float(ArrDelay) >= 0`).
 2. **Map**: We use `.flatMap()` to create a separate key-value pair for each delay type, of the form (Key = (Airline, DelayType), Value = (Delay value, 1)).
 3. **Reduce**: We aggregate the values for each key using `reduceByKey()` with a lambda expression: the delays are summed and the 1s are added up to count the flights of each Airline-DelayType combination (each reducer handles data for one such combination).

In [28]:
airline_delays = data.filter(lambda x: x[1] == '3' and x[21] == '0'
                             and x[14] != 'NA' and float(x[14]) >= 0
                             and x[15] != 'NA' and float(x[15]) >= 0)\
                     .flatMap(lambda x: (((x[8], "ArrDelay"),          (float(x[14]), 1)),
                                         ((x[8], "DepDelay"),          (float(x[15]), 1)),
                                         ((x[8], "CarrierDelay"),      (float(x[24]), 1)),
                                         ((x[8], "WeatherDelay"),      (float(x[25]), 1)),
                                         ((x[8], "NASDelay"),          (float(x[26]), 1)),
                                         ((x[8], "SecurityDelay"),     (float(x[27]), 1)),
                                         ((x[8], "LateAircraftDelay"), (float(x[28]), 1))))\
                     .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))  # (sum of delays, number of flights)

4. We remap the tuples using `.map()` so that they take the form (Key: (Airline, DelayType), Value: average delay), dividing the summed delay by the number of flights.

In [29]:
airline_delays2 = airline_delays.map(lambda x: ((x[0][0],x[0][1]), x[1][0]/x[1][1]))

5. We reduce the results by finding the maximum average delay for each delay type using `.reduceByKey()` with our custom function `find_max()` as argument

In [30]:
airline_delays3 = airline_delays2.map(lambda x: (x[0][1], (x[0][0],round(x[1],3))))\
                                 .reduceByKey(find_max)

6. We remap for easier printing: the tuple takes the form: (delay type, airline, average delay)

In [31]:
airline_delays4 = airline_delays3.map(lambda x: (x[0], x[1][0], x[1][1]))

In [32]:
df6 = sqlContext.createDataFrame(airline_delays4, ['DelayType', 'Airlines', 'Average Delay'])
df6.show(7)  # .show() returns None, so keep the DataFrame and display it separately

+-----------------+--------+-------------+
|        DelayType|Airlines|Average Delay|
+-----------------+--------+-------------+
|     WeatherDelay|      OH|        6.014|
|         ArrDelay|      XE|       52.735|
|         NASDelay|      CO|       16.745|
|    SecurityDelay|      AQ|        0.333|
|LateAircraftDelay|      B6|        22.13|
|         DepDelay|      XE|       49.326|
|     CarrierDelay|      EV|       20.649|
+-----------------+--------+-------------+
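The averaging in steps 2-5 relies on carrying (sum, count) pairs through the reduce. Below is a local sketch on hypothetical records in which `functools.reduce` stands in for `.reduceByKey()`. Like `reduceByKey()`, `functools.reduce` returns a single value unchanged without calling the combining function, so a key with only one record keeps its (delay, 1) pair exactly as mapped. This is why the delay should already be a number when it leaves the map step rather than being converted inside the reducer.

```python
from collections import defaultdict
from functools import reduce


def add_pairs(x, y):
    # Element-wise sum of two (total delay, number of flights) pairs
    return (x[0] + y[0], x[1] + y[1])


# Hypothetical ((Airline, DelayType), (delay, 1)) pairs, delays already converted to float
pairs = [
    (('XE', 'ArrDelay'), (60.0, 1)),
    (('XE', 'ArrDelay'), (40.0, 1)),
    (('OH', 'ArrDelay'), (30.0, 1)),
    (('OH', 'WeatherDelay'), (12.0, 1)),  # the only record for this key
]

# Group by key, then fold each group: the local analogue of reduceByKey(add_pairs)
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)
totals = {key: reduce(add_pairs, values) for key, values in groups.items()}

# Average per (Airline, DelayType), then keep the worst airline per delay type
worst = {}
for (airline, delay_type), (delay_sum, n_flights) in totals.items():
    candidate = (airline, round(delay_sum / n_flights, 3))
    if delay_type not in worst or candidate[1] > worst[delay_type][1]:
        worst[delay_type] = candidate

print(worst)  # {'ArrDelay': ('XE', 50.0), 'WeatherDelay': ('OH', 12.0)}
```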



## Q7 : Compute median, mean, and mode of columns 12-16, 19-21 and 25-29 for the flights in the third week of 2007.
*Note: Exclude the non-numeric values*

In this exercise we make use of the columns:

- *ActualElapsedTime* (column 12)
- *CRSElapsedTime*  (column 13)
- *AirTime* (column 14)
- *Distance* (column 19)
- *TaxiIn* (column 20)
- *TaxiOut* (column 21)
- Delay types (columns 15-16, 25-29)

The new columns will be inspected below:

In [33]:
df_filters.filter(col('Column Name').isin(['CRSElapsedTime', 'AirTime', 'Distance', 'TaxiIn',
                                           'TaxiOut'])).select('Non-arithmetic Values').collect()

[Row(Non-arithmetic Values=['-', 'NA', 'CRSElapsedTime']),
 Row(Non-arithmetic Values=['TaxiOut']),
 Row(Non-arithmetic Values=['Distance']),
 Row(Non-arithmetic Values=['TaxiIn']),
 Row(Non-arithmetic Values=['AirTime', 'NA'])]

We see that:
- *CRSElapsedTime* contains both negative values (hyphens) as well as NAs. These need filtering out.
- *TaxiOut*, *TaxiIn* and *Distance* have neither negative values nor NAs.
- *AirTime* has NA values that need filtering.

#### Assumptions and Code Steps:

*Assumptions*
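- The third week of 2007 is taken to be 15-21 January: 1 January 2007 was a Monday, so the third Monday-to-Sunday week of the year covers these days.
- Negative values of *CRSElapsedTime*, *ArrDelay* and *DepDelay* are excluded, consistent with Q6.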

 *Code*
 1. **Filter**: We use `.filter()` to:
     - Select only data from January: `Month == '1'`.
     - Select only the third week of January: `int(Day) >= 15 and int(Day) <= 21`
     - Filter out NAs (e.g., `ArrDelay != 'NA'`) and negative values (e.g., `float(ArrDelay) >= 0`).
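The 15-21 January window can be checked with Python's `datetime` module, assuming "third week" means the third ISO week (weeks start on Monday, and week 1 contains the year's first Thursday). `isocalendar()` also numbers weekdays from 1 (Monday) to 7 (Sunday), the same encoding as the *DayOfWeek* column. The helper name `iso_week_and_day` is illustrative only.

```python
import datetime


def iso_week_and_day(year, month, day):
    # Hypothetical helper: (ISO week number, ISO weekday) with Monday = 1 ... Sunday = 7
    iso = datetime.date(year, month, day).isocalendar()
    return iso[1], iso[2]


print(iso_week_and_day(2007, 1, 1))   # (1, 1)  1 January 2007 was a Monday
print(iso_week_and_day(2007, 1, 15))  # (3, 1)  first day of week 3
print(iso_week_and_day(2007, 1, 21))  # (3, 7)  last day of week 3
print(iso_week_and_day(2007, 1, 22))  # (4, 1)
```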

In [34]:
dataq7 = data.filter(lambda x: x[1]=='1' and int(x[2])>=15 and int(x[2])<=21)

In [35]:
dataq7 = dataq7.filter(lambda x: 
                 x[11]!= 'NA' and                       #ActualElapsedTime
                 x[12]!= 'NA' and float(x[12])>=0 and   #CRSElapsedTime
                 x[13]!= 'NA' and                       #Airtime  
                 x[14]!= 'NA' and float(x[14])>=0 and   #ArrDelay
                 x[15]!= 'NA' and float(x[15])>=0)      #DepDelay

2. **Create a custom function `calculate_statistics`**: We create a function that:
     - takes a list-type object of numbers as input:
     - calculates the median, mean and mode of those numbers
     - outputs the median, mean and mode as a tuple (median, mean, mode)

In [36]:
def calculate_statistics(list_of_numbers):
    import statistics
    values = list(list_of_numbers)  # materialise the iterable produced by groupByKey()
    median = statistics.median(values)
    mean = statistics.mean(values)
    mode = statistics.mode(values)  # on ties, Python 3.8+ returns the first mode encountered
    return (median, mean, mode)

3. **Map**: We use `.flatMap()` to:
     - create a new row for each of the columns we would like to compute statistics for
4. **Aggregate/Reduce**: We use `.groupByKey()` to get all values for each column in a list-type object
5. **Map the values to our custom function**: We employ `.mapValues()` to
    - send the list of values of each column to a reducer
    - the reducer uses our custom function `calculate_statistics()` to return a tuple with the median, mean, mode for each column
6. **Remap the values for printing purposes**: We use `.map()` to rearrange the terms - makes it easier to preview as a dataframe.

In [37]:
column_statistics  = dataq7.flatMap(lambda x:(
                               ('ActualElapsedTime', float(x[11])),
                               ('CRSElapsedTime'   , float(x[12])),
                               ('AirTime'          , float(x[13])),
                               ('ArrDelay'         , float(x[14])),
                               ('DepDelay'         , float(x[15])),
                               ('Distance'         , float(x[18])),
                               ('TaxiIn'           , float(x[19])),
                               ('TaxiOut'          , float(x[20])),
                               ('CarrierDelay'     , float(x[24])),
                               ('WeatherDelay'     , float(x[25])),
                               ('NASDelay'         , float(x[26])),
                               ('SecurityDelay'    , float(x[27])),
                               ('LateAircraftDelay', float(x[28]))))\
            .groupByKey()\
            .mapValues(calculate_statistics)\
            .map(lambda x: (x[0], x[1][0], round(x[1][1],3), x[1][2]))

The resulting table can be seen below.

In [38]:
# .show() returns None, so create the DataFrame first and display it separately
df7 = sqlContext.createDataFrame(column_statistics, ['Column Name', 'Median', 'Mean', 'Mode'])
df7.show(20)

+-----------------+------+-------+-----+
|      Column Name|Median|   Mean| Mode|
+-----------------+------+-------+-----+
|     WeatherDelay|   0.0|  3.979|  0.0|
|   CRSElapsedTime| 109.0|125.735| 75.0|
|ActualElapsedTime| 114.0|131.264| 80.0|
|          TaxiOut|  16.0| 21.318| 10.0|
|         ArrDelay|  26.0| 43.785|  0.0|
|         NASDelay|   0.0| 10.966|  0.0|
|    SecurityDelay|   0.0|  0.028|  0.0|
|         Distance| 556.0|697.689|370.0|
|           TaxiIn|   6.0|  7.527|  4.0|
|LateAircraftDelay|   0.0|  16.99|  0.0|
|         DepDelay|  21.0| 38.255|  0.0|
|          AirTime|  85.0| 102.42| 59.0|
|     CarrierDelay|   0.0|  9.656|  0.0|
+-----------------+------+-------+-----+



## Q8 : Assume that a passenger wants to travel from Philadelphia International Airport (airport code: PHL) to Los Angeles International Airport (airport code: LAX), and then go back to Philadelphia (PHL). He departs PHL not earlier than 5:59 am (scheduled time), stays at least 3:01 hours in Los Angeles and then arrives at PHL not later than 11pm. Based on the "scheduled" times, find which carrier has the highest number of flights with these constraints. Limit your analysis to February 2007 and use scheduled times.

We will use columns:
- *Month*  (column 2)
- *DayofMonth* (column 3)
- *Origin* (column 17)
- *Dest*   (column 18)
- *CRSDepTime* (column 6)
- *CRSArrTime* (column 8)
- *UniqueCarrier* (column 9)

We have inspected all columns in previous steps.

#### Assumptions and Code Steps:

*Assumptions*
-  We do not take indirect flights into account
-  We base our analysis on round trips: a round trip is counted when a carrier offers both an outbound and a return flight on the same day that satisfy the given constraints.
-  We do take cancelled flights into account. Flights get cancelled for unsystematic reasons, so if we want to use the data from February 2007 as an indication of the number of round trips available in the following year, including the flights that were scheduled but later cancelled gives a better overview of the situation.

 *Code*
 1. **Filter**: We use `.filter()` to:
     - Select only data from February: `Month == '2'`.
     - Create two RDDs where:
         - The 1st RDD (`PHL_LAX_departures`) contains all flights from PHL to LAX that are scheduled to depart no earlier than 05:59 AND whose scheduled departure time is not later than their scheduled arrival time (this filters out overnight flights).
         - The 2nd RDD (`LAX_PHL_departures`) contains all flights from LAX to PHL that are scheduled to arrive in Philadelphia no later than 23:00 AND whose scheduled departure time is not later than their scheduled arrival time (this filters out overnight flights).
2. **Map**: We use `.map()` to:
    - create (key, value) pairs of the form (Key = (Carrier, Day of the Month), Value = (Scheduled Departure Time, Scheduled Arrival Time))
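A plain-Python sketch of the two filters and the keying step, run on a few made-up rows. `make_row` is a hypothetical helper that fills only the columns used in Q8 (with the notebook's 0-based indices); the `XX` and `YY` carriers are invented:

```python
def make_row(month, day, crs_dep, crs_arr, carrier, origin, dest):
    # Hypothetical helper: a 29-field row with only the Q8 columns filled in
    row = ["NA"] * 29
    row[1], row[2] = month, day
    row[5], row[7] = crs_dep, crs_arr
    row[8], row[16], row[17] = carrier, origin, dest
    return row

def is_outbound(x):
    # PHL -> LAX, scheduled departure not earlier than 05:59, no overnight arrival
    return x[16] == "PHL" and x[17] == "LAX" and int(x[5]) >= 559 and int(x[5]) <= int(x[7])

def is_return(x):
    # LAX -> PHL, scheduled arrival not later than 23:00, no overnight arrival
    return x[16] == "LAX" and x[17] == "PHL" and int(x[7]) <= 2300 and int(x[5]) <= int(x[7])

def to_pair(x):
    # Key = (carrier, day of month), value = (scheduled departure, scheduled arrival)
    return ((x[8], int(x[2])), (int(x[5]), int(x[7])))

rows = [
    make_row("2", "14", "700", "1001", "UA", "PHL", "LAX"),   # kept
    make_row("2", "14", "530", "830", "XX", "PHL", "LAX"),    # departs before 05:59
    make_row("2", "14", "1353", "2153", "UA", "LAX", "PHL"),  # kept
    make_row("2", "14", "2230", "645", "YY", "LAX", "PHL"),   # overnight, dropped
]

feb = [x for x in rows if x[1] == "2"]
outbound = [to_pair(x) for x in feb if is_outbound(x)]
returns = [to_pair(x) for x in feb if is_return(x)]
print(outbound)  # [(('UA', 14), (700, 1001))]
print(returns)   # [(('UA', 14), (1353, 2153))]
```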

In [39]:
dataq8 = data.filter(lambda x: x[1]=='2')

In [40]:
PHL_LAX_departures =  dataq8.filter(lambda x: (x[16]=='PHL' and x[17]=='LAX' and int(x[5])>= 559 and (int(x[5]) <= int(x[7]))))\
                            .map(lambda x: ((x[8], int(x[2])), (int(x[5]),int(x[7]))))

In [41]:
LAX_PHL_departures =  dataq8.filter(lambda x: (x[16]=='LAX' and x[17]=='PHL' and int(x[7])<=2300 and (int(x[5]) <= int(x[7]))))\
                            .map(lambda x: ((x[8], int(x[2])), (int(x[5]), int(x[7]))))

3. **Merge Datasets and Filter**: We use `.join()` to merge the datasets so that:
    - rows are joined by key, i.e., each row is a combination of an outbound and a return flight from the same airline on the same day
    - only combinations where the scheduled LAX departure is at least 3:01 hours (3 hours and 1 minute) after the scheduled LAX arrival are kept (we use `.filter()`)

The resulting dataset contains all valid flight combinations for each day in February and for each airline.
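The layover condition subtracts hhmm integers (for example `1353 - 1001`), which is not a difference in minutes in general. The sketch below, with hypothetical helper names, converts times to minutes and compares both checks over every pair of clock times. For this particular threshold they agree: the minute parts only decide the outcome when the hour difference is exactly 3, and in that case both checks reduce to "the departure minute is at least 1 later". It then replays the join on `(carrier, day)` with plain dictionaries, using the UA times from the results table as toy input:

```python
def hhmm_to_minutes(t):
    # Convert an hhmm integer to minutes after midnight, e.g. 1353 -> 833
    return (t // 100) * 60 + t % 100

MIN_STAY = 3 * 60 + 1  # 3 h 01 min = 181 minutes

def long_enough(lax_arrival, lax_departure):
    # Layover measured in real minutes
    return hhmm_to_minutes(lax_departure) - hhmm_to_minutes(lax_arrival) >= MIN_STAY

# Compare the minute-based check with the notebook's hhmm check (d - a >= 301)
# over every pair of clock times within one day
times = [h * 100 + m for h in range(24) for m in range(60)]
minutes = {t: hhmm_to_minutes(t) for t in times}
mismatches = sum(1 for a in times for d in times
                 if (minutes[d] - minutes[a] >= MIN_STAY) != (d - a >= 301))
print(mismatches)  # 0

# Plain-dict version of the join on (carrier, day), followed by the layover filter
outbound = {("UA", 14): [(700, 1001)]}   # (PHL departure, LAX arrival)
inbound = {("UA", 14): [(1353, 2153)]}   # (LAX departure, PHL arrival)
joined = [(key, (o, r)) for key in outbound if key in inbound
          for o in outbound[key] for r in inbound[key]]
trips = [(key, (o, r)) for key, (o, r) in joined if long_enough(o[1], r[0])]
print(trips)  # [(('UA', 14), ((700, 1001), (1353, 2153)))]
```

For another threshold (say 2:45) the hhmm shortcut would no longer be safe, so converting to minutes is the more robust habit.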

4. **Remap the values for printing**: We use `.map()` to flatten each tuple into (Airline, Day, PHL Departure, LAX Arrival, LAX Departure, PHL Arrival).
5. **Sort by day for printing**: We sort by day in ascending order using `.sortBy()`.

In [42]:
round_trips = PHL_LAX_departures.join(LAX_PHL_departures)\
                                .filter(lambda x: (x[1][1][0] - x[1][0][1])>= 301)\
                                .map(lambda x: (x[0][0], x[0][1], x[1][0][0], x[1][0][1], x[1][1][0], x[1][1][1]))\
                                .sortBy(lambda x: int(x[1]))

The results can be seen below.

In [43]:
# .show() returns None, so create the DataFrame first and display it separately
df_round_trips = sqlContext.createDataFrame(round_trips, ['Airline', 'Day', 'PHL Departure', 'LAX Arrival', 
                                                         'LAX Departure', 'PHL Arrival'])
df_round_trips.show(20)

+-------+---+-------------+-----------+-------------+-----------+
|Airline|Day|PHL Departure|LAX Arrival|LAX Departure|PHL Arrival|
+-------+---+-------------+-----------+-------------+-----------+
|     UA| 14|          700|       1001|         1353|       2153|
|     UA| 15|          700|       1001|         1353|       2153|
|     UA| 16|          700|       1001|         1353|       2153|
|     UA| 17|          700|       1001|         1353|       2153|
|     UA| 18|          700|       1001|         1353|       2153|
|     UA| 19|          700|       1001|         1353|       2153|
|     UA| 20|          700|       1001|         1353|       2153|
|     UA| 21|          700|       1001|         1353|       2153|
|     UA| 22|          700|       1001|         1353|       2153|
|     UA| 23|          700|       1001|         1353|       2153|
|     UA| 24|          700|       1001|         1353|       2153|
|     UA| 25|          700|       1001|         1353|       2153|
|     UA| 

Taking a quick look at the table, it becomes clear that "UA" is the only airline that offered flights from PHL to LAX and back in February 2007 meeting the specified constraints. These flights were scheduled daily from the 14th of February until the end of the month, and the identical departure and arrival times on every row show that it is the same scheduled pair of flights each day.

## How many flights in total?

In [44]:
round_trips.count()

15

There were 15 round trips scheduled in February 2007 meeting those constraints.

___

## Q9 : Generate the *departure flights* board of the Los Angeles Airport at 12 Jan 2007 at 13:00. The board should contain flights with actual departure times between 12:00 and 14:00, sorted by scheduled departure time. The resulting table should at least contain scheduled departure time, actual departure time (if departed), airline code, and destination

We will use columns:

- *Month* (column 2)
- *Day* (column 3)
- *DepTime*(column 5)
- *CRSDepTime* (column 6)
- *UniqueCarrier* (column 9)
- *Origin* (column 17)
- *Dest* (column 18)

*DepTime* still needs to be inspected (the remaining columns have been inspected before).

In [45]:
df_filters.filter(col('Column Name').isin(['DepTime'])).select('Non-arithmetic Values').collect()

[Row(Non-arithmetic Values=['NA', 'DepTime'])]

We see that:
- *DepTime* contains NA values that require filtering.

#### Assumptions and Code Steps:

*Assumptions*
-  We inspect the dataset to find whether there are cancelled flights on the day. There are none.
-  For flights that had departed by 13:00 we show the actual departure time; for those that departed later, we show a hyphen (-).

To check for cancelled flights on the day we use `x[21]=='1'` (for the other filter conditions, please see the next code snippet).

In [48]:
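# Cancelled flights from LAX on 12 January (Cancelled == '1').
# Caveat: a cancelled flight normally has no actual departure time (DepTime == 'NA'),
# so the x[4] != 'NA' condition may exclude such flights; dropping it gives a stricter check.
dataq9_cancelled = data.filter(lambda x: x[1]=='1'
                     and x[2]=='12'
                     and x[16]=='LAX'
                     and x[4]!='NA'
                     and x[21]=='1')\
                      .map(lambda x: (x[4], x[5]))\
                      .collect()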
        
len(dataq9_cancelled)

0

No cancelled flights on the day.

 *Code*
 1. **Filter**: We use `.filter()` to:
     - Select only data from January: `Month == '1'`.
     - Select only data from the 12th of January: `DayofMonth == '12'`.
     - Select only flights departing from LAX: `Origin == 'LAX'`.
     - Filter out NA values from *DepTime*: `DepTime != 'NA'`.
     - Select only flights with an actual departure time between 12:00 and 14:00: `1200 <= DepTime <= 1400`.
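The same filter can be sketched as a named predicate in plain Python. `make_row` and `in_board_window` are hypothetical names, and the rows are made up (the first mirrors a flight from the results table). The sketch also shows why the order of the conditions matters:

```python
def make_row(month, day, dep, crs_dep, carrier, origin, dest):
    # Hypothetical helper: a 29-field row using the notebook's 0-based indices
    row = ["NA"] * 29
    row[1], row[2], row[4], row[5] = month, day, dep, crs_dep
    row[8], row[16], row[17] = carrier, origin, dest
    return row

def in_board_window(x, start=1200, end=1400):
    # The NA check must come before int(x[4]): int("NA") raises ValueError,
    # and `and` short-circuits, so int() is never called on "NA"
    return (x[1] == "1" and x[2] == "12" and x[16] == "LAX"
            and x[4] != "NA" and start <= int(x[4]) <= end)

rows = [
    make_row("1", "12", "1208", "1100", "UA", "LAX", "ORD"),  # kept
    make_row("1", "12", "NA", "1230", "UA", "LAX", "SFO"),    # no actual departure
    make_row("1", "12", "1415", "1400", "AA", "LAX", "JFK"),  # outside the window
    make_row("1", "13", "1230", "1230", "AA", "LAX", "JFK"),  # wrong day
]

kept = [x for x in rows if in_board_window(x)]
print([(x[5], x[4], x[8], x[17]) for x in kept])  # [('1100', '1208', 'UA', 'ORD')]
```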

In [49]:
dataq9 = data.filter(lambda x: x[1]=='1'
                     and x[2]=='12'
                     and x[16]=='LAX'
                     and x[4]!='NA'
                     and (int(x[4])>=1200 and int(x[4])<=1400))

2. **Use map with our custom function `map9()`**: We build a function that:
    - takes a full row of the data as input
    - outputs a tuple that contains
        - the scheduled departure time as the key
        - a tuple of values (actual departure time or hyphen, carrier, destination)

We use `.map()` with `map9()` as its argument to map each row to the form (Key: Scheduled Departure Time, Value: (Actual Departure Time/Hyphen, Carrier, Destination)).

In [50]:
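def map9(x):
    # The board is shown at 13:00: display the actual departure time only if
    # the flight had already left by then, otherwise a hyphen
    if int(x[4])<=1300:
        dep = x[4]
    else:
        dep = '-'
    # Key: scheduled departure time; value: (actual departure/hyphen, carrier, destination)
    return (int(x[5]), (dep, x[8], x[17]))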

3. **Sort**: We sort by key (`sortByKey()`), i.e., the Scheduled Departure Time, as requested.
4. **Remap the values for easy printing** using `.map()`
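A plain-Python sketch of steps 2 to 4, assuming rows that already passed the filter. `make_row` is a hypothetical helper, the first three rows mirror the results table, and the `JFK` row is invented to show the hyphen case; `sorted()` with a key plays the role of `sortByKey()`:

```python
def make_row(dep, crs_dep, carrier, dest):
    # Hypothetical helper: only the fields map9 reads are filled in
    row = ["NA"] * 29
    row[4], row[5], row[8], row[17] = dep, crs_dep, carrier, dest
    return row

def map9(x, board_time=1300):
    # Actual departure time if the flight left by the board time, else a hyphen
    dep = x[4] if int(x[4]) <= board_time else "-"
    return (int(x[5]), (dep, x[8], x[17]))

rows = [
    make_row("1208", "1100", "UA", "ORD"),
    make_row("1340", "1330", "AA", "JFK"),  # invented row: departs after 13:00
    make_row("1202", "1140", "AA", "DFW"),
    make_row("1218", "1140", "OO", "OXR"),
]

# Sort by scheduled departure time, then flatten (key, value) into one tuple
board = [(k, v[0], v[1], v[2]) for k, v in sorted(map(map9, rows), key=lambda kv: kv[0])]
for line in board:
    print(line)
```

Python's `sorted` is stable, so flights sharing a scheduled time keep their input order here; with Spark's `sortByKey()` we should not rely on any particular order for ties such as the two 11:40 flights in the table.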

In [51]:
departure_table = dataq9.map(map9)\
                        .sortByKey()\
                        .map(lambda x: (x[0], x[1][0], x[1][1], x[1][2]))

The results can be seen below (first 100 rows only).

In [52]:
# .show() returns None, so create the DataFrame first and display it separately
df9 = sqlContext.createDataFrame(departure_table, ['Scheduled Departure Time', 'Actual Departure Time', 'Carrier', 'Destination'])
df9.show(100)

+------------------------+---------------------+-------+-----------+
|Scheduled Departure Time|Actual Departure Time|Carrier|Destination|
+------------------------+---------------------+-------+-----------+
|                    1100|                 1208|     UA|        ORD|
|                    1140|                 1218|     OO|        OXR|
|                    1140|                 1202|     AA|        DFW|
|                    1142|                 1206|     DL|        CVG|
|                    1145|                 1210|     WN|        SLC|
|                    1155|                 1252|     MQ|        SJC|
|                    1155|                 1242|     AA|        LAS|
|                    1157|                 1216|     OO|        PSP|
|                    1200|                 1256|     WN|        OAK|
|                    1200|                 1213|     CO|        IAH|
|                    1205|                 1206|     AA|        MIA|
|                    1211|        