In [0]:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("ganesh").getOrCreate()

In [0]:
spark

### Import flights dataset into dataframe using built in csv reader

In [0]:
flights_df=spark.read.csv("/FileStore/tables/flights_small.csv", header=True)

###Include a new column called duration_hrs

In [0]:
flight_dur_df=flights_df.withColumn("duration_hrs", flights_df.air_time/60)
flight_dur_df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

###Define first filter to only keep flights from SEA to PDX

In [0]:
flight_SEA_PDX_df=flights_df.filter(flights_df.origin == "SEA").filter(flights_df.dest == "PDX")
flight_SEA_PDX_df.count()

Out[9]: 157

###Filter flights which have 1000 distance (covered in km)

In [0]:
flight_long_df=flights_df.filter(flights_df.distance > 1000)
flight_long_df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20|
|2014|   11| 12|    2346|       -4|     217|      -28|     AS| N765AS|   121|   SEA| ANC|     183|    1448|  23|    46|
|2014|    8| 11|    1017|       -3|    1

###Calculate average speed by dividing the distance by the air_time

In [0]:
flight_avgspeed_df = flight_dur_df.select('*', (flight_dur_df.distance/flight_dur_df.duration_hrs).alias("Avg Speed"))
flight_avgspeed_df.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|         Avg Speed|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2| 433.6363636363636|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0| 446.1666666666667|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|367.02702702702703|
|2014|    4|  9|    1705|       45|    1839|       3

###Count the number of flights each plane made

In [0]:
flights_count_df=flights_df.groupBy("tailnum").count()
flights_count_df.orderBy("count", ascending=False).show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N612AS|   51|
| N224AG|   50|
| N219AG|   50|
| N218AG|   49|
| N223AG|   49|
| N611AS|   46|
| N607AS|   45|
| N225AG|   45|
| N435AS|   45|
| N615AS|   45|
| N794AS|   42|
| N215AG|   41|
| N627AS|   41|
| N216AG|   41|
| N644AS|   41|
| N619AS|   40|
| N622AS|   40|
| N227AG|   40|
| N413AS|   40|
| N626AS|   40|
+-------+-----+
only showing top 20 rows



###Find the length of the shortest (in terms of distance) flight that left PDX

In [0]:
flights_df=flights_df.withColumn("distance", flights_df.distance.cast("int"))
flight_short_dst_df=flights_df.filter(flights_df.origin=="PDX").groupBy().min("distance").alias("Shorteste Distance")
flight_short_dst_df.show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



###Find the length of the longest (in terms of time) flight that left SEA

In [0]:
flights_df=flights_df.withColumn("air_time", flights_df.air_time.cast("int"))
flight_long_tym_df=flights_df.filter(flights_df.origin=="SEA").groupBy().max("air_time").alias("longest time")
flight_long_tym_df.show()

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



###Get the average air time of Delta Airlines flights  that left SEA

In [0]:
from pyspark.sql.functions import *
flights_dl_df=flights_df.filter(flights_df.carrier=="DL").filter(flights_df.origin=="SEA").groupBy().agg(avg("air_time").alias("AVG_TIME"))
flights_dl_df.show()

+------------------+
|          AVG_TIME|
+------------------+
|188.20689655172413|
+------------------+



###Get the total number of hours all planes in this dataset spent in the air

In [0]:
flight_total_tym_df=flight_dur_df.groupBy().sum("duration_hrs").alias("Total Time")
flight_total_tym_df.show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



###Find the avg of the air_time column to find average duration of flights from PDX and SEA

In [0]:
flights_avg_df=flight_dur_df.filter(flight_dur_df.origin=="PDX").filter(flight_dur_df.dest=="SEA").groupBy().avg("duration_hrs").alias("AVG AIR TIME")
flights_avg_df.show()

+------------------+
| avg(duration_hrs)|
+------------------+
|0.5821256038647341|
+------------------+



###Average departure delay by month and year

In [0]:
flights_df=flights_df.withColumn("dep_delay", flights_df.dep_delay.cast("int"))
flight_avg_dl_df=flights_df.groupBy("year", "month").avg("dep_delay").alias("DEP_DELAY")
flight_avg_dl_df.show()

+----+-----+------------------+
|year|month|    avg(dep_delay)|
+----+-----+------------------+
|2014|    2|             9.712|
|2014|   11| 4.818998716302953|
|2014|    8| 6.423584905660378|
|2014|    5|3.0617577197149646|
|2014|   12|  9.78027465667915|
|2014|    3| 5.276649746192893|
|2014|    6| 6.630214205186021|
|2014|    7| 6.895277207392197|
|2014|    4| 5.722699386503067|
|2014|   10|3.9950738916256157|
|2014|    9| 4.798561151079137|
|2014|    1| 6.374149659863946|
+----+-----+------------------+



###Join the airplanes dataframe with flights dataframe

In [0]:
airports_df=spark.read.csv("/FileStore/tables/airports.csv", header=True, sep=",")
airports_df.show()
airports_new_df=airports_df.withColumnRenamed("faa", "dest")
flights_with_airports= flights_df.join(airports_new_df, on='dest', how='leftouter')
flights_with_airports.show()

+---+--------------------+-----------+------------+----+---+---+
|faa|                name|        lat|         lon| alt| tz|dst|
+---+--------------------+-----------+------------+----+---+---+
|04G|   Lansdowne Airport| 41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...| 32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional| 41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport|  41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...| 31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...| 36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...| 41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...| 42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...| 39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...| 48.0538086|-122.8106436| 108| -8|  A|
|0W3|Harford County Ai...| 39.5668378| -76.2024028| 409| -5|  A|
|10C|  Galt Field Airport| 42.4028889| -88.3751111| 875| -6|  U|
|17G|Port Bucyrus-Craw...

###Join the flights and plane table use key as tailnum column

In [0]:
planes_df=spark.read.csv("/FileStore/tables/planes.csv", header=True, sep=",")
planes_df.show()
# Rename year column on panes to avoid duplicate column name
planes_new_df = planes_df.withColumnRenamed('year', 'plane_year')
#join the flights and plane table use key as tailnum column
model_data = flights_df.join(planes_df, on='tailnum', how='leftouter')
model_data.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA