# Table of contents<a class="anchor" id="table"></a>

* [1 Working with RDD](#1)
  * [1.1 Data Preparation and Loading](#1.1)
    * [1.1.1 Create SparkSession and SparkContext](#OneOneOne)
    * [1.1.2 Import CSV files and Make RDD for each file](#OneOneTwo)
      * [1.1.2.1 Flights RDD](#1.1.2.1)
      * [1.1.2.2 Airports RDD](#1.1.2.2)
    * [1.1.3 Show RDD number of columns, and number of records](#1.1.3)
  * [1.2 Dataset Partitioning](#1.2)
    * [1.2.1 Obtain the maximum arrival time](#1.2.1)
    * [1.2.2 Obtain the minimum arrival time](#1.2.2)
    * [1.2.3 Define hash partitioning function](#1.2.3)
    * [1.2.4 Display the records in each partition](#1.2.4)
  * [1.3 Query RDD](#1.3)
    * [1.3.1 Collect a total number of flights for each month](#1.3.1)
    * [1.3.2 Collect the average delay for each month](#1.3.2)
* [2 Working with DataFrames](#2)
  * [2.1 Data Preparation and Loading](#2.1)
    * [2.1.1 Define DataFrames](#2.1.1)
    * [2.1.2 Display the Schema of DataFrames](#2.1.2)
    * [2.1.3 Transform date-time and location column](#2.1.3)
  * [2.2 Query Analysis](#2.2)
    * [2.2.1 January Flights Events with ANC airport](#2.2.1)
    * [2.2.2 Average Arrival Delay From Origin to Destination](#2.2.2)
    * [2.2.3 Join Query with Airports DataFrame](#2.2.3)
  * [2.3 Analysis](#2.3)
    * [2.3.1 Relationship between day of week with mean arrival delay, total time delay, and count flights](#2.3.1)
    * [2.3.2 Display mean arrival delay each month](#2.3.2)
    * [2.3.3 Relationship between mean departure delay and mean arrival delay](#2.3.3)
* [3 RDDs vs DataFrame vs Spark SQL](#3)
  * [3.1 RDD Operation](#3.1)
  * [3.2 DataFrame Operation](#3.2)
  * [3.3 Spark SQL Operation](#3.3)
  * [3.4 Discussion](#3.4)


# 1 Working with RDD<a class="anchor" id="1"></a>
## 1.1 Data Preparation and Loading<a class="anchor" id="1.1"></a>
### 1.1.1 Create SparkSession and SparkContext<a class="anchor" id="OneOneOne"></a>
[Back to top](#table)

In [273]:
from pyspark import SparkConf
from pyspark import SparkContext 
from pyspark.sql import SparkSession
from pyspark.rdd import RDD

master = "local[*]"
app_name = "Assignment1"
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

### 1.1.2 Import CSV files and Make RDD for each file<a class="anchor" id="OneOneTwo"></a>
[Back to top](#table)

In [274]:
# define a function that reads a file (or file pattern) and loads it into an RDD
def read_file_rdd(file):
    rdd=sc.textFile(file)
    #remove the header
    header=rdd.first()
    rdd=rdd.filter(lambda row: row!=header)
    return rdd

def transtype(rows):
    rows=rows.split(",")
    return rows

# Convert the numeric flight columns from strings to numbers.
# Columns that are always present are parsed as int; columns that may be
# empty are parsed as float, with missing values replaced by 0.0.
INT_COLUMNS = (0, 1, 2, 3, 5)
FLOAT_COLUMNS = (11, 12, 15, 16, 17, 19, 22)  # column 21 is left as a string

def transtype_flights(rows):
    rows=rows.split(",")
    for i in INT_COLUMNS:
        rows[i]=int(rows[i])
    for i in FLOAT_COLUMNS:
        rows[i]=float(rows[i]) if rows[i]!='' else 0.0
    return rows
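
The conversion rule used above can be tried without Spark. The sketch below is only an illustration: `to_float` and the sample record are made up for this example and are not part of the notebook. It shows how an empty CSV field ends up as a default value instead of raising a `ValueError`.

```python
def to_float(field, default=0.0):
    """Convert a CSV field to float; empty fields fall back to `default`."""
    return float(field) if field != '' else default

# Made-up record with four fields; the last one is empty (a missing value)
sample_line = "2015,1,-82,"
fields = sample_line.split(",")

year = int(fields[0])
delay = to_float(fields[2])
missing = to_float(fields[3])

print(year, delay, missing)
```

Replacing missing values with 0 keeps the rows usable for `max` and `min`, but it also means those rows take part in any later average. Passing `default=None` instead would make it possible to skip them explicitly.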


#### 1.1.2.1 Flights RDD <a class="anchor" id="1.1.2.1"></a>
[Back to top](#table)

In [275]:
# Read the 20 files of “flight*.csv” file into a single RDD (flights_rdd) 
flights_rdd=read_file_rdd("flight*.csv")

flights_rdd=flights_rdd.map(transtype_flights)

print(flights_rdd.take(3))
print(flights_rdd.count())

print("Number of partitions of flights:{}".format(flights_rdd.getNumPartitions()))


Number of partitions of flights:20


#### 1.1.2.2 Airports RDD <a class="anchor" id="1.1.2.2"></a>
[Back to top](#table)

In [276]:
# read file airports.csv into airports_rdd
airports_rdd=read_file_rdd("airports.csv")
print(airports_rdd.take(3))
print()

print("Number of partitions of airports:{}".format(airports_rdd.getNumPartitions()))

['ABE,Lehigh Valley International Airport,Allentown,PA,USA,40.65236,-75.44040', 'ABI,Abilene Regional Airport,Abilene,TX,USA,32.41132,-99.68190', 'ABQ,Albuquerque International Sunport,Albuquerque,NM,USA,35.04022,-106.60919']

Number of partitions of airports:2


### 1.1.3 Show RDD number of columns, and number of records <a class="anchor" id="1.1.3"></a>
[Back to top](#table)

In [277]:
def num_of_columns(rdd):
    return len(rdd.take(1)[0])


# number of columns of airports_rdd
airports_columns=num_of_columns(airports_rdd.map(transtype))
print('number of columns of airports_rdd: {}'.format(airports_columns))

# number of columns of flights_rdd
flights_columns=num_of_columns(flights_rdd)
print('number of columns of flights_rdd: {}'.format(flights_columns))


# total number of records of airports_rdd
total_airports_records=airports_rdd.count()
print('number of records of airports_rdd: ',total_airports_records)

# total number of records of flights_rdd
total_flights_records=flights_rdd.count()
print('number of records of flights_rdd: ',total_flights_records)

number of columns of airports_rdd: 7
number of columns of flights_rdd: 31
number of records of airports_rdd:  322
number of records of flights_rdd:  582184


## 1.2 Dataset Partitioning <a class="anchor" id="1.2"></a>
### 1.2.1 Obtain the maximum arrival time <a class="anchor" id="1.2.1"></a>
[Back to top](#table)

In [278]:
# Column 22 holds ARRIVAL_DELAY, so max() with this key returns the flight
# record with the largest arrival delay.

maximum_arrival_time=flights_rdd.max(key=lambda x:x[22])
print(maximum_arrival_time)
print(" maximum arrival delay is {}".format(maximum_arrival_time[22]))


[2015, 9, 13, 7, 'AA', 1063, 'N3CAAA', 'SAN', 'DFW', '700', '1050', 1670.0, 26.0, '1116', '179', 174.0, 142.0, 1171.0, '1538', 6.0, '1159', '1544', 1665.0, '0', '0', '', '0', '0', '1665', '0', '0']
 maximum arrival delay is 1665.0


### 1.2.2 Obtain the minimum arrival time <a class="anchor" id="1.2.2"></a>
[Back to top](#table)

In [279]:
minimum_arrival_time=flights_rdd.min(key=lambda x:x[22])
print(minimum_arrival_time)
print(" minimum arrival delay is {}".format(minimum_arrival_time[22]))

[2015, 1, 21, 3, 'AS', 11, 'N467AS', 'EWR', 'SEA', '1720', '1705', -15.0, 13.0, '1718', '389', 322.0, 305.0, 2402.0, '1923', 4.0, '2049', '1927', -82.0, '0', '0', '', '', '', '', '', '']
 minimum arrival delay is -82.0


### 1.2.3 Define hash partitioning function <a class="anchor" id="1.2.3"></a>
[Back to top](#table)

In [280]:
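# Hash function to implement hash partitioning:
# sums the decimal digits of the key and skips non-digit characters,
# such as the '-' and '.' in a key like -82.0

def hash_function(k):
    total = 0
    for digit in str(k):
        if digit.isdigit():
            total += int(digit)
    return total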
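
To see what a digit-sum hash does to the data layout, the following pure-Python sketch mimics the rule PySpark's `partitionBy(numPartitions, partitionFunc)` applies to an RDD of (key, value) pairs: a record goes to partition `partitionFunc(key) % numPartitions`. The names `digit_sum_hash` and `assign_partitions` and the sample pairs are assumptions made for this illustration only.

```python
from collections import defaultdict

def digit_sum_hash(key):
    """Sum the decimal digits of a key, ignoring characters such as '-' or '.'."""
    return sum(int(ch) for ch in str(key) if ch.isdigit())

def assign_partitions(pairs, num_partitions, partition_func):
    """Group (key, value) pairs by partition_func(key) % num_partitions."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return dict(buckets)

# Made-up (arrival delay, flight label) pairs
pairs = [(13, "flight A"), (-5, "flight B"), (15, "flight C"),
         (1665, "flight D"), (7, "flight E")]

partitions = assign_partitions(pairs, 4, digit_sum_hash)
for index in sorted(partitions):
    print(index, partitions[index])
```

Keys with the same digit sum modulo the partition count share a partition, so this hash can produce skewed partitions when many keys have similar digit sums.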


### 1.2.4 Display the records in each partition <a class="anchor" id="1.2.4"></a>
[Back to top](#table)

In [281]:
from pyspark.rdd import RDD
#A Function to print the data items in each RDD
#WARNING: this function is only for demo purpose, it should not be used on large dataset
def print_partitions(data):
    if isinstance(data, RDD):
        numPartitions = data.getNumPartitions()
        partitions = data.glom().collect()
    else:
        numPartitions = data.rdd.getNumPartitions()
        partitions = data.rdd.glom().collect()
    
    print(f"####### NUMBER OF PARTITIONS: {numPartitions}")
    for index, partition in enumerate(partitions):
        # show partition if it is not empty
        if len(partition) > 0:
            print(f"Partition {index}: {len(partition)} records")
            print(partition)
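# hash partitioning
no_of_partitions=40
# NOTE: partitionBy() only works on an RDD of (key, value) pairs, so flights_rdd
# would first have to be mapped to such pairs. The call is left commented out,
# which is why the output below still reports the 20 input partitions and no partitioner.
# flights_hash_partitioned_rdd = flights_rdd.partitionBy(no_of_partitions, hash_function)
# flights_hash_partitioned_rdd.collect()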

print("Number of partitions:{}".format(flights_rdd.getNumPartitions()))
print("Partitioner:{}".format(flights_rdd.partitioner))
# print_partitions(flights_rdd)  

Number of partitions:20
Partitioner:None


## 1.3 Query RDD  <a class="anchor" id="1.3"></a>
### 1.3.1 Collect a total number of flights for each month <a class="anchor" id="1.3.1"></a>
[Back to top](#table)

In [282]:
flights_rdd.map(lambda x:(x[1],x[5])).countByKey()

# flights_rdd.map(lambda x:(x[0],x[1],x[5])).collect()

defaultdict(int,
            {6: 50256,
             12: 47866,
             1: 47136,
             11: 46809,
             4: 48810,
             8: 50524,
             7: 52065,
             10: 48680,
             3: 50816,
             5: 49691,
             9: 46733,
             2: 42798})
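
`countByKey()` counts how many records share each key and returns the result to the driver as a dictionary; the value of each pair (here the flight number) plays no role in the count. A minimal plain-Python equivalent, using made-up pairs, could look like this:

```python
from collections import Counter

# Made-up (month, flight_number) pairs standing in for the mapped RDD
pairs = [(1, 98), (1, 2336), (2, 840), (1, 258), (2, 135), (6, 806)]

# Only the key matters for the count; the value is ignored
flights_per_month = Counter(month for month, _ in pairs)

for month in sorted(flights_per_month):
    print(month, flights_per_month[month])
```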

### 1.3.2 Collect the average delay for each month <a class="anchor" id="1.3.2"></a>
[Back to top](#table)

In [283]:
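# Average arrival delay (column 22) per month (column 1).
# aggregateByKey keeps a running (sum, count) pair for every key:
#   seqOp  folds one delay value into a partition-local (sum, count)
#   combOp merges the (sum, count) pairs coming from two partitions
# Missing delays were converted to 0 in transtype_flights and are still counted,
# so these means are pulled toward 0 compared with averages that skip missing values.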

seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

avgdelay_rdd=flights_rdd.map(lambda x:(x[1],x[22])).aggregateByKey((0,0),seqOp,combOp).map(lambda x: [x[0],x[1][0]/x[1][1]])
print(avgdelay_rdd.collect())


[[1, 5.652155465037339], [2, 7.722627225571288], [3, 4.889286838790932], [4, 3.1355050194632246], [5, 4.644402406874485], [6, 9.534662527857371], [7, 6.701373283395755], [8, 4.652501781331645], [9, -0.8448847709327456], [10, -0.5383935907970419], [11, 0.8206114208805999], [12, 6.035244223457151]]
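
The (sum, count) bookkeeping behind `aggregateByKey` can be followed by hand on a tiny example. The sketch below uses `functools.reduce` to stand in for what happens inside each partition and then merges the partial results; the delay values and the two-partition split are made up for illustration.

```python
from functools import reduce

def seq_op(acc, value):
    """Fold one value into a partition-local (sum, count) accumulator."""
    return (acc[0] + value, acc[1] + 1)

def comb_op(left, right):
    """Merge the (sum, count) accumulators of two partitions."""
    return (left[0] + right[0], left[1] + right[1])

# Made-up delays for a single month, split across two partitions
partition_1 = [10.0, -4.0, 0.0]
partition_2 = [6.0, 8.0]

zero = (0, 0)
local_1 = reduce(seq_op, partition_1, zero)   # (6.0, 3)
local_2 = reduce(seq_op, partition_2, zero)   # (14.0, 2)

total, count = comb_op(local_1, local_2)      # (20.0, 5)
print(total / count)                          # 4.0
```

Carrying the count next to the sum is what makes the result correct: averaging the two partition means directly (2.0 and 7.0) would give 4.5 rather than 4.0, because the partitions hold different numbers of records.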


# 2 Working with DataFrame <a class="anchor" id="2"></a>
## 2.1 Data Preparation and Loading <a class="anchor" id="2.1"></a>
### 2.1.1 Define dataframes and loading scheme<a class="anchor" id="2.1.1"></a>
[Back to top](#table)

In [284]:
# 1. Load all flights and airports data into two separate dataframes. Name the dataframes as flightsDf and airportsDf respectively.
# Hint : use the module spark.read.format("csv"), with header option is true and inferSchema is true
# NOTE: inferSchema is not set here, so every column is loaded as a string
# (see the schema below); numeric columns are cast or implicitly converted in later queries.
airportsDf= spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .load('airports.csv')

flightsDf= spark.read.format('csv')\
            .option('header',True).option('escape','"')\
            .load('flight*.csv')

### 2.1.2 Display the schema of the final two dataframes<a class="anchor" id="2.1.2"></a>
[Back to top](#table)

In [285]:
# 2. Display the schema of the final two dataframes

# display the schema of dataframe: airportsDf
airportsDf.printSchema()
# display the schema of dataframe: flightsDf
flightsDf.printSchema()

# display the first five rows of each dataframe
airportsDf.show(5)
flightsDf.show(5)

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nu

## 2.2 Query Analysis <a class="anchor" id="2.2"></a>
### 2.2.1 January flight events with ANC airport <a class="anchor" id="2.2.1"></a>
[Back to top](#table)

In [286]:
# 1.Display all the flight events in January 2015 with five columns (Month, Origin Airport, Destination Airport, Distance, and Arrival Delay), 
# where the origin airport 'ANC' and name this dataframe as janFlightEventsAncDf.

janFlightEventAncDf=flightsDf.filter(flightsDf.YEAR==2015)\
    .filter(flightsDf.MONTH==1)\
    .filter(flightsDf.ORIGIN_AIRPORT=='ANC')\
    .select("MONTH","ORIGIN_AIRPORT","DESTINATION_AIRPORT","DISTANCE","ARRIVAL_DELAY")
janFlightEventAncDf.show()



+-----+--------------+-------------------+--------+-------------+
|MONTH|ORIGIN_AIRPORT|DESTINATION_AIRPORT|DISTANCE|ARRIVAL_DELAY|
+-----+--------------+-------------------+--------+-------------+
|    1|           ANC|                SEA|    1448|          -13|
|    1|           ANC|                SEA|    1448|           -4|
|    1|           ANC|                JNU|     571|           17|
|    1|           ANC|                CDV|     160|           20|
|    1|           ANC|                BET|     399|          -20|
|    1|           ANC|                SEA|    1448|          -15|
|    1|           ANC|                SEA|    1448|          -11|
|    1|           ANC|                ADQ|     253|          -16|
|    1|           ANC|                SEA|    1448|           17|
|    1|           ANC|                BET|     399|           -9|
|    1|           ANC|                SEA|    1448|           15|
|    1|           ANC|                FAI|     261|           -6|
|    1|   

### 2.2.2 Average Arrival Delay From Origin to Destination <a class="anchor" id="2.2.2"></a>
[Back to top](#table)

In [287]:
# 2.From the query results on query no.1, please display a new query. Then please group by ‘ORIGIN_AIRPORT’ AND ‘DESTINATION_AIRPORT’.
# Add a new column and name it as ‘AVERAGE_DELAY’. This column value is the average from all‘ARRIVAL_DELAY’ values. 
# Then sort it based on ‘AVERAGE_DELAY’. Please name this dataframe as janFlightEventsAncAvgDf.
import pyspark.sql.functions as F
import pyspark.sql.types as T
# from pyspark.sql.types import DecimalType
janFlightEventAncDf=janFlightEventAncDf.withColumn("ARRIVAL_DELAY",janFlightEventAncDf["ARRIVAL_DELAY"].cast(T.DecimalType()))
janFlightEventsAncAvgDf=janFlightEventAncDf.groupby(['ORIGIN_AIRPORT','DESTINATION_AIRPORT'])\
.agg(F.avg('ARRIVAL_DELAY').alias('AVERAGE_DELAY'))
janFlightEventsAncAvgDf=janFlightEventsAncAvgDf.orderBy(janFlightEventsAncAvgDf.AVERAGE_DELAY)
janFlightEventsAncAvgDf.show()




+--------------+-------------------+-------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AVERAGE_DELAY|
+--------------+-------------------+-------------+
|           ANC|                ADK|     -27.0000|
|           ANC|                HNL|     -20.0000|
|           ANC|                MSP|     -19.2500|
|           ANC|                BET|      -9.0909|
|           ANC|                SEA|      -6.4902|
|           ANC|                BRW|      -4.3333|
|           ANC|                OME|      -3.0000|
|           ANC|                ADQ|      -2.6667|
|           ANC|                CDV|       1.0000|
|           ANC|                OTZ|       1.2500|
|           ANC|                PHX|       2.0000|
|           ANC|                DEN|       3.3333|
|           ANC|                PDX|       3.5000|
|           ANC|                JNU|       5.0000|
|           ANC|                LAS|       9.0000|
|           ANC|                SCC|      16.6667|
|           ANC|               

### 2.2.3 Join Query with Airports DataFrame <a class="anchor" id="2.2.3"></a>
[Back to top](#table)

In [288]:
# 3.Join the results on query no. 2 janFlightEventsAncAvgDf and airportsDf using inner join operation. You may name this dataset as joinedSqlDf.

# inner join using janFlightEventsAncAvgDf.ORIGIN_AIRPORT==airportsDf.IATA_CODE
joinedSqlDf=janFlightEventsAncAvgDf.join(airportsDf, janFlightEventsAncAvgDf.ORIGIN_AIRPORT==airportsDf.IATA_CODE)
joinedSqlDf.show()

# inner join using janFlightEventsAncAvgDf.DESTINATION_AIRPORT==airportsDf.IATA_CODE
joinedSqlDf=janFlightEventsAncAvgDf.join(airportsDf, janFlightEventsAncAvgDf.DESTINATION_AIRPORT==airportsDf.IATA_CODE)
joinedSqlDf.show()




+--------------+-------------------+-------------+---------+--------------------+---------+-----+-------+--------+----------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|AVERAGE_DELAY|IATA_CODE|             AIRPORT|     CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+--------------+-------------------+-------------+---------+--------------------+---------+-----+-------+--------+----------+
|           ANC|                BRW|      -4.3333|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                ADK|     -27.0000|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                OME|      -3.0000|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                JNU|       5.0000|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149.99619|
|           ANC|                LAS|       9.0000|      ANC|Ted Stevens Ancho...|Anchorage|   AK|    USA|61.17432|-149

## 2.3 Analysis <a class="anchor" id="2.3"></a>
### 2.3.1 Relationship between day of week with mean arrival delay, total time delay, and count flights <a class="anchor" id="2.3.1"></a>
[Back to top](#table)

In [290]:
#  1. Find the total number of flights events, total time delay and average of arrival delay for each day of week (‘DAY_OF_WEEK’) 
# sorted by the value of NumOfFlights in descending order. This query represents the summary of all 2015 flights. 
# What can you analyse from this query results?

# create views from dataframe
airportsDf.createOrReplaceTempView("sql_airports")
flightsDf.createOrReplaceTempView("sql_flights")

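# NOTE: the WHERE clause restricts the summary to December, January and February of 2015,
# so the counts below cover those three months rather than the whole year.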
sql_flights_query=spark.sql('''
select DAY_OF_WEEK, avg(ARRIVAL_DELAY) as MeanArrivalDelay, sum(ARRIVAL_DELAY) as TotalTimeDelay,count(*) as NumOfFlights
from sql_flights
where YEAR=2015 and MONTH in ('12','1','2')
group by DAY_OF_WEEK
order by NumOfFlights desc
''')
sql_flights_query.show()



+-----------+-----------------+--------------+------------+
|DAY_OF_WEEK| MeanArrivalDelay|TotalTimeDelay|NumOfFlights|
+-----------+-----------------+--------------+------------+
|          4|6.036164255716286|      129355.0|       21819|
|          5|4.501339698933113|       92399.0|       20826|
|          3|5.863307196723768|      117401.0|       20434|
|          2|6.848719138188753|      132872.0|       20180|
|          1|9.925783642336173|      178277.0|       19136|
|          7|9.933103249928099|      172687.0|       18149|
|          6|3.747166885363235|       62825.0|       17256|
+-----------+-----------------+--------------+------------+



### 2.3.2 Display mean arrival delay each month <a class="anchor" id="2.3.2"></a>
[Back to top](#table)

In [291]:
# 2. Find the average of arrival delay, total time delay, and total number of flight events for each month (‘MONTH’) sorted by MeanArrivalDelay in ascending order (default).
# What can you analyse from this query results?

sql_flights_query2=spark.sql('''
select MONTH, avg(ARRIVAL_DELAY) as MeanArrivalDelay, sum(ARRIVAL_DELAY) as TotalTimeDelay,count(*) as NumOfFlights
from sql_flights
group by MONTH
order by MeanArrivalDelay 
''')
sql_flights_query2.show()

+-----+-------------------+--------------+------------+
|MONTH|   MeanArrivalDelay|TotalTimeDelay|NumOfFlights|
+-----+-------------------+--------------+------------+
|    9|-0.8498676252179341|      -39484.0|       46733|
|   10| -0.541989784312509|      -26209.0|       48680|
|   11| 0.8313745860658399|       38412.0|       46809|
|    4|  3.173803944339603|      153044.0|       48810|
|    5| 4.7121097658084405|      230785.0|       49691|
|    8|  4.713893233866763|      235063.0|       50524|
|    3|  5.011173860427592|      248454.0|       50816|
|    1|  5.804357298474946|      266420.0|       47136|
|   12|   6.15837046195826|      288883.0|       47866|
|    7|  6.786093552465234|      348907.0|       52065|
|    2|  8.123906203913085|      330513.0|       42798|
|    6|  9.747630090727856|      479174.0|       50256|
+-----+-------------------+--------------+------------+
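
The monthly means in this table differ slightly from the RDD results in section 1.3.2 (January: about 5.80 here versus about 5.65 there). A likely explanation is the treatment of missing values: SQL `avg()` ignores NULLs, while the RDD parsing replaced empty delays with 0 and still counted those rows. The sketch below reproduces both behaviours on a made-up sample; the function names are hypothetical.

```python
def mean_skipping_missing(values):
    """Average that ignores None, as SQL avg() does with NULL."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

def mean_zero_filled(values):
    """Average that counts every None as 0, as the RDD parsing does with ''."""
    return sum(0 if v is None else v for v in values) / len(values)

delays = [10, None, -2, None, 4]      # made-up sample
print(mean_skipping_missing(delays))  # 4.0
print(mean_zero_filled(delays))       # 2.4

# January figures taken from the tables in this notebook:
# TotalTimeDelay divided by NumOfFlights gives the RDD mean, not the SQL mean
print(266420 / 47136)
```

Dividing January's `TotalTimeDelay` by `NumOfFlights` gives roughly 5.652, the RDD figure, which is consistent with `count(*)` including rows whose delay is missing while `avg()` leaves them out.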



### 2.3.3 Relationship between mean departure delay and mean arrival delay <a class="anchor" id="2.3.3"></a>
[Back to top](#table)

In [292]:
# 3.Display the mean departure delay (MeanDeptDelay) and mean arrival delay (MeanArrivalDelay) for each month (‘MONTH’) sorted by MeanDeptDelay in
# descending order. What you can analyse from the relationship between two columns: Mean Departure Delay and Mean Arrival Delay?

sql_flights_query3=spark.sql('''
select MONTH,avg(DEPARTURE_DELAY) as MeanDeptDelay,avg(ARRIVAL_DELAY) as MeanArrivalDelay
from sql_flights
group by MONTH
order by MeanDeptDelay desc
''')
sql_flights_query3.show()

+-----+------------------+-------------------+
|MONTH|     MeanDeptDelay|   MeanArrivalDelay|
+-----+------------------+-------------------+
|    6|  13.9730063585922|  9.747630090727856|
|   12|11.821651454043728|   6.15837046195826|
|    7|11.708608758020432|  6.786093552465234|
|    2|11.620796080832823|  8.123906203913085|
|    8|10.086906141367324|  4.713893233866763|
|    1|  9.75401499511029|  5.804357298474946|
|    3| 9.718308159530178|  5.011173860427592|
|    5| 9.550310180006102| 4.7121097658084405|
|    4| 7.737554783759199|  3.173803944339603|
|   11| 6.630585898709037| 0.8313745860658399|
|   10| 5.243436261558784| -0.541989784312509|
|    9| 4.728506981740065|-0.8498676252179341|
+-----+------------------+-------------------+



# 3 RDDs vs DataFrame vs Spark SQL <a class="anchor" id="3"></a>


Implement the following query using RDDs, DataFrames and Spark SQL separately. Log the time taken by each approach using the "%%time" built-in magic command in Jupyter Notebook, and discuss the performance differences among the three approaches.

<strong>Find the MONTH and DAY_OF_WEEK, number of flights, and average delay where TAIL_NUMBER = 'N407AS'. Note that the number of flights and the average delay should be aggregated separately. The average delay should be grouped by both MONTH and DAY_OF_WEEK.</strong>
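
When comparing the three approaches, it helps to remember that Spark transformations are lazy: a timed cell only measures real work if it ends in an action such as `collect()` or `show()`. Outside Jupyter, a rough equivalent of `%%time` can be sketched with `time.perf_counter`. The helper `timed` and the names `some_rdd` and `some_df` in the comment are hypothetical, and the workload is a plain-Python stand-in.

```python
import time

def timed(label, action):
    """Run a zero-argument callable, print the elapsed wall time, return its result."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.4f} s")
    return result

# Stand-in workload; with Spark this would wrap an action, for example
# `lambda: some_rdd.collect()` or `lambda: some_df.show()`
total = timed("sum of squares", lambda: sum(i * i for i in range(100_000)))
```

A single run is noisy, especially for the first query, which may also pay for reading and parsing the files. Repeating each measurement a few times gives a fairer comparison.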

## 3.1 RDD Operation<a class="anchor" id="3.1"></a>
[Back to top](#table)

In [293]:
%%time

# keep only flights whose TAIL_NUMBER (column 6) is 'N407AS', then key each
# record by (MONTH, DAY_OF_WEEK) with ARRIVAL_DELAY as the value
rdd=flights_rdd.filter(lambda x: x[6]=='N407AS').map(lambda x :((x[1],x[3]),x[22]))


seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
rdd=rdd.aggregateByKey((0,0),seqOp,combOp)

rdd=rdd.map(lambda x : [x[0],x[1][0]/x[1][1], x[1][1]])
print(rdd.sortBy(lambda x: x[0]).collect())

[[(1, 1), -6.0, 1], [(1, 2), 17.5, 2], [(1, 3), -27.0, 1], [(1, 5), -21.0, 2], [(1, 6), 4.333333333333333, 3], [(2, 1), -2.5, 2], [(2, 2), -9.5, 2], [(2, 3), -11.5, 2], [(2, 4), -11.0, 2], [(2, 5), -31.0, 1], [(2, 7), 6.5, 2], [(3, 1), 29.0, 1], [(3, 2), -28.0, 2], [(3, 3), 3.0, 1], [(3, 4), 2.0, 1], [(3, 5), 6.666666666666667, 3], [(3, 6), -3.0, 1], [(4, 1), 0.0, 1], [(4, 2), 6.0, 1], [(4, 3), -7.0, 1], [(4, 4), -0.6666666666666666, 3], [(4, 6), -20.0, 1], [(4, 7), -22.0, 1], [(5, 1), 4.666666666666667, 3], [(5, 2), 0.8, 5], [(5, 3), 30.0, 1], [(5, 5), 6.0, 1], [(5, 6), -3.0, 2], [(5, 7), -7.666666666666667, 3], [(6, 1), 7.0, 4], [(6, 2), 35.0, 1], [(6, 3), -10.666666666666666, 3], [(6, 4), -15.0, 1], [(6, 6), -7.666666666666667, 3], [(7, 1), -1.0, 1], [(7, 3), -5.333333333333333, 3], [(7, 4), -4.0, 2], [(7, 5), -4.0, 1], [(7, 7), 15.2, 5], [(8, 1), -13.0, 2], [(8, 2), -11.0, 2], [(8, 3), -4.0, 1], [(8, 4), -5.0, 1], [(8, 5), -10.0, 3], [(8, 6), -5.0, 2], [(8, 7), 60.5, 2], [(9, 1), -

## 3.2 DataFrame Operation<a class="anchor" id="3.2"></a>
[Back to top](#table)

In [294]:
%%time
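# NOTE: this cast changes DEPARTURE_DELAY, which the query below does not use;
# ARRIVAL_DELAY is still a string column and avg() converts it implicitly.
flightsDf=flightsDf.withColumn('DEPARTURE_DELAY',flightsDf['DEPARTURE_DELAY'].cast(T.DecimalType()))

df_op=flightsDf.filter(flightsDf.TAIL_NUMBER =='N407AS')\
    .groupby(['MONTH','DAY_OF_WEEK'])\
    .agg(F.avg('ARRIVAL_DELAY').alias('MeanArrivalDelay'),F.count('MONTH').alias('NumOfFlights'))

# MONTH is a string column, so the ordering is lexicographic (1, 10, 11, 12, ...)
df_op.orderBy(df_op.MONTH,df_op.DAY_OF_WEEK).show()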


+-----+-----------+-------------------+------------+
|MONTH|DAY_OF_WEEK|   MeanArrivalDelay|NumOfFlights|
+-----+-----------+-------------------+------------+
|    1|          1|               -6.0|           1|
|    1|          2|               17.5|           2|
|    1|          3|              -27.0|           1|
|    1|          5|              -21.0|           2|
|    1|          6|  4.333333333333333|           3|
|   10|          1|               15.5|           2|
|   10|          3|                1.0|           2|
|   10|          4|               -6.0|           1|
|   10|          5|-3.6666666666666665|           3|
|   11|          1|               35.0|           1|
|   11|          2|              -23.0|           1|
|   11|          4|               -1.0|           2|
|   11|          5|               12.0|           1|
|   11|          7|               -4.0|           3|
|   12|          1|               -1.0|           1|
|   12|          2|              -11.5|       

## 3.3 Spark SQL Operation<a class="anchor" id="3.3"></a>
[Back to top](#table)

In [295]:
%%time
sql_flights_op=spark.sql('''
select MONTH,DAY_OF_WEEK,avg(ARRIVAL_DELAY) as MeanArrivalDelay,count(*) as NumOfFlights
from sql_flights
where TAIL_NUMBER ='N407AS'
group by MONTH,DAY_OF_WEEK
order by MONTH,DAY_OF_WEEK
''')
sql_flights_op.show()

+-----+-----------+-------------------+------------+
|MONTH|DAY_OF_WEEK|   MeanArrivalDelay|NumOfFlights|
+-----+-----------+-------------------+------------+
|    1|          1|               -6.0|           1|
|    1|          2|               17.5|           2|
|    1|          3|              -27.0|           1|
|    1|          5|              -21.0|           2|
|    1|          6|  4.333333333333333|           3|
|   10|          1|               15.5|           2|
|   10|          3|                1.0|           2|
|   10|          4|               -6.0|           1|
|   10|          5|-3.6666666666666665|           3|
|   11|          1|               35.0|           1|
|   11|          2|              -23.0|           1|
|   11|          4|               -1.0|           2|
|   11|          5|               12.0|           1|
|   11|          7|               -4.0|           3|
|   12|          1|               -1.0|           1|
|   12|          2|              -11.5|       

## 3.4 Discussion<a class="anchor" id="3.4"></a>
[Back to top](#table)