In [0]:
# -------------------------------PYSPARK MINI PROJECT FINAL SUBMISSION--------------------------------

# Name : Jagadeesh Relli

# DATASET USED : AIRLINE ON TIME DATASET (2007)
#             (https://dataverse.harvard.edu/file.xhtml?persistentId=doi:10.7910/DVN/HG7NV7/2BHLWK&version=1.0#)
             
# REQUIREMENTS :

# 1)Find the mot frequent tail number which is getting in destination by maximum
# 2)Find out the cancelled flight  details for the last quarter of the year 2007
# 3)Find out the average weather delays for a particular flight per month
# 4)Inspite of NASDelay, SecurityDelay, LateAircraftDelay,Weatherdealy which flight reached on time
# 5)Month wise total distance travelled by each flight number in every month
# 6)Month wise how many flights get diverted(origin to destination)
# 7)Week and month wise number of trips in all the flights
# 8)Which flights covered maximum origin and destination by month wise
# 9)Average month wise arrival delay (flightnum wise)
# 10)Average month wise departure delay (flightnum wise)

In [0]:
#creating the spark session

from pyspark.sql import SparkSession
from pyspark.sql.functions import ceil
spark = SparkSession.builder.appName("Mini Project-2143518").getOrCreate()



In [0]:
#loading the dataset into a dataframe

df=spark.read.options(inferSchema='True',header='True',delimiter=',').csv("/FileStore/tables/2007_csv.bz2")


In [0]:
#displaying the schema of the dataframe

df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

In [0]:
#displaying first 10 rows of the dataframe

df.show(10)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    1|         1|        1|   1232|      1225|   1341|      1340|           WN|     2891

In [0]:
#displaying the shape of dataframe

print((df.count(), len(df.columns)))

(7453215, 29)


In [0]:
#REQUIREMENTS SOLUTION SCRIPTS

In [0]:
# 1) Find the most frequent tail number which is getting in destination by maximum

df1=df.select("Dest","TailNum")\
.filter("TailNum != '0' AND TailNum != '000000'")\
.groupBy("Dest", "TailNum")\
.count()\
.sort("count", ascending=False)\
.dropDuplicates(["Dest"])\
.show()

+----+-------+-----+
|Dest|TailNum|count|
+----+-------+-----+
| ABE| N25504|   97|
| ABI| N902AE|   93|
| ABQ| N12163|  111|
| ABY| N919EV|   74|
| ACK| N258JB|    7|
| ACT| N286AE|  167|
| ACV| N294SW|  108|
| ACY| N921EV|   13|
| ADK| N768AS|   22|
| ADQ| N762AS|   37|
| AEX| N324AE|   84|
| AGS| N641AS|   82|
| AKN| N768AS|   40|
| ALB| N713EV|   41|
| ALO| 80339E|    6|
| AMA| N519SW|   51|
| ANC| N768AS|  728|
| APF| N878AS|   41|
| ASE| N446YV|  190|
| ATL| N634AS| 1066|
+----+-------+-----+
only showing top 20 rows



In [0]:
# 2)Find out the cancelled flight  details for the last quarter of the year 2007

df2=df.select("*")\
.filter("Cancelled == 1 and month >9")\
.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|   10|         2|        2|     NA|      1930|     NA|      2150|           WN|      195

In [0]:
# 3)Find out the average weather delays for a particular flight per month

df3=df.select("FlightNum","Month","WeatherDelay")\
.groupby("FlightNum","Month")\
.agg({"WeatherDelay":"avg"})\
.withColumnRenamed("avg(WeatherDelay)","AverageWeatherDelay")\
.sort("FlightNum","Month")\
.show()

+---------+-----+--------------------+
|FlightNum|Month| AverageWeatherDelay|
+---------+-----+--------------------+
|        1|    1| 0.40476190476190477|
|        1|    2|  3.0197368421052633|
|        1|    3|                0.89|
|        1|    4|  0.5185185185185185|
|        1|    5| 0.17829457364341086|
|        1|    6|  0.1589958158995816|
|        1|    7|  0.8492871690427699|
|        1|    8|  0.7018255578093306|
|        1|    9|  0.2597938144329897|
|        1|   10|  0.9749518304431599|
|        1|   11|  0.3134715025906736|
|        1|   12|  0.8363636363636363|
|        2|    1| 0.05555555555555555|
|        2|    2| 0.21923076923076923|
|        2|    3| 0.18055555555555555|
|        2|    4| 0.07547169811320754|
|        2|    5|0.050397877984084884|
|        2|    6|0.027548209366391185|
|        2|    7|  0.0427807486631016|
|        2|    8|  0.4037940379403794|
+---------+-----+--------------------+
only showing top 20 rows



In [0]:
# 4)Inspite of NASDelay, SecurityDelay, LateAircraftDelay,Weatherdealy which flight reached on time

df4=df.select("FlightNum","ArrDelay","NASDelay","SecurityDelay","LateAircraftDelay","WeatherDelay").\
filter("(NASDelay!=0 or SecurityDelay!=0 or LateAircraftDelay!=0 or WeatherDelay!=0) and (ArrDelay<=0)")\
.show()


+---------+--------+--------+-------------+-----------------+------------+
|FlightNum|ArrDelay|NASDelay|SecurityDelay|LateAircraftDelay|WeatherDelay|
+---------+--------+--------+-------------+-----------------+------------+
+---------+--------+--------+-------------+-----------------+------------+



In [0]:
# 5) Month wise total distance travelled by each flight number in every month

df5=df.select("FlightNum","Month","Distance")\
.groupby("Month","FlightNum")\
.agg({"Distance":"sum"})\
.withColumnRenamed("sum(Distance)","TotalDistanceTravelled")\
.show()

+-----+---------+----------------------+
|Month|FlightNum|TotalDistanceTravelled|
+-----+---------+----------------------+
|    1|     1208|                168829|
|    1|     2545|                 46943|
|    1|      587|                228368|
|    1|     1702|                244257|
|    1|     2483|                 20571|
|    1|     1720|                135178|
|    1|     1677|                187618|
|    1|      869|                117514|
|    1|     1892|                 79096|
|    1|     2324|                 68316|
|    1|     3452|                  1848|
|    1|     2667|                 34723|
|    1|     2202|                 20083|
|    1|     2086|                 76858|
|    1|     2870|                 38722|
|    1|     7340|                 19824|
|    1|     5501|                 13702|
|    1|     4979|                  1152|
|    1|     5668|                 46004|
|    1|     4949|                  7506|
+-----+---------+----------------------+
only showing top

In [0]:
# 6)Month wise how many flights get diverted(origin to destination)

df6=df.select("Month","FlightNum","Diverted")\
.filter("Diverted == 1")\
.groupby("Month")\
.agg({"FlightNum":"Count"})\
.withColumnRenamed("Count(FlightNum)","TotalNumOfDivertedFlights")\
.sort("Month")\
.show()

+-----+-------------------------+
|Month|TotalNumOfDivertedFlights|
+-----+-------------------------+
|    1|                     1200|
|    2|                     1261|
|    3|                     1275|
|    4|                     1193|
|    5|                     1442|
|    6|                     2199|
|    7|                     2150|
|    8|                     2101|
|    9|                      962|
|   10|                     1000|
|   11|                      881|
|   12|                     1515|
+-----+-------------------------+



In [0]:
# 7) Week and month wise number of trips in all the flights

df7=df.select("Month","DayOfMonth")\
.withColumn('WeekNumber',(ceil(df["DayOfMonth"]/7)))\
.groupby("Month","WeekNumber")\
.count()\
.withColumnRenamed("count","NumberOfTrips")\
.sort("Month","WeekNumber")\
.show()

+-----+----------+-------------+
|Month|WeekNumber|NumberOfTrips|
+-----+----------+-------------+
|    1|         1|       141364|
|    1|         2|       139730|
|    1|         3|       139647|
|    1|         4|       139631|
|    1|         5|        61187|
|    2|         1|       139397|
|    2|         2|       139549|
|    2|         3|       143252|
|    2|         4|       143406|
|    3|         1|       144133|
|    3|         2|       144485|
|    3|         3|       144860|
|    3|         4|       144769|
|    3|         5|        60962|
|    4|         1|       144603|
|    4|         2|       143815|
|    4|         3|       142766|
|    4|         4|       142434|
|    4|         5|        41030|
|    5|         1|       142879|
+-----+----------+-------------+
only showing top 20 rows



In [0]:
# 8) Which flights covered maximum origin and destination by month wise

df8=df.select("FlightNum","Month","Origin","Dest")\
.groupBy("Month","Origin","Dest","FlightNum")\
.count()\
.sort("count",ascending=False)\
.withColumnRenamed("count","DistanceCovered")\
.dropDuplicates(["Month"])\
.sort("Month")\
.show()


+-----+------+----+---------+---------------+
|Month|Origin|Dest|FlightNum|DistanceCovered|
+-----+------+----+---------+---------------+
|    1|   SFO| JFK|       16|             62|
|    2|   LIH| HNL|      110|             56|
|    3|   DEN| PDX|      521|             62|
|    4|   SFO| JFK|       16|             60|
|    5|   LAX| JFK|       30|             62|
|    6|   LAX| JFK|       30|             60|
|    7|   JFK| SFO|       15|             62|
|    8|   JFK| SFO|       15|             62|
|    9|   JFK| LAX|      133|             60|
|   10|   JFK| SFO|       15|             62|
|   11|   SFO| JFK|       16|             60|
|   12|   ATL| DFW|     1209|             62|
+-----+------+----+---------+---------------+



In [0]:
# 9)Average month wise arrival delay (flightnum wise)

df9=df.select("Month","FlightNum","DepDelay")\
.groupBy("Month","FlightNum")\
.agg({"DepDelay":"avg"})\
.withColumnRenamed("avg(DepDelay)","AverageArrivalDelay")\
.sort("Month")\
.show()

+-----+---------+-------------------+
|Month|FlightNum|AverageArrivalDelay|
+-----+---------+-------------------+
|    1|     1208|  8.094890510948906|
|    1|     2545|  5.462686567164179|
|    1|      587|  6.336956521739131|
|    1|     1702|  7.805471124620061|
|    1|     2483|  6.814285714285714|
|    1|     1720|  11.01530612244898|
|    1|     1677| 14.131428571428572|
|    1|      869|  6.867469879518072|
|    1|     1892|  4.377551020408164|
|    1|     2324|  5.641304347826087|
|    1|     3452|                3.0|
|    1|     2667| 12.347826086956522|
|    1|     2202|  8.090909090909092|
|    1|     2086|              11.25|
|    1|     2870|            3.71875|
|    1|     7340|            15.8125|
|    1|     5501| 10.416666666666666|
|    1|     4979|               31.0|
|    1|     5668|  7.225806451612903|
|    1|     4949|  4.461538461538462|
+-----+---------+-------------------+
only showing top 20 rows



In [0]:
# 10)Average month wise departure delay (flightnum wise)

df10=df.select("Month","FlightNum","DepDelay")\
.groupBy("Month","FlightNum")\
.agg({"DepDelay":"avg"})\
.withColumnRenamed("avg(DepDelay)","AverageDepartureDelay")\
.sort("Month")\
.show()

+-----+---------+---------------------+
|Month|FlightNum|AverageDepartureDelay|
+-----+---------+---------------------+
|    1|     1208|    8.094890510948906|
|    1|     2545|    5.462686567164179|
|    1|      587|    6.336956521739131|
|    1|     1702|    7.805471124620061|
|    1|     2483|    6.814285714285714|
|    1|     1720|    11.01530612244898|
|    1|     1677|   14.131428571428572|
|    1|      869|    6.867469879518072|
|    1|     1892|    4.377551020408164|
|    1|     2324|    5.641304347826087|
|    1|     3452|                  3.0|
|    1|     2667|   12.347826086956522|
|    1|     2202|    8.090909090909092|
|    1|     2086|                11.25|
|    1|     2870|              3.71875|
|    1|     7340|              15.8125|
|    1|     5501|   10.416666666666666|
|    1|     4979|                 31.0|
|    1|     5668|    7.225806451612903|
|    1|     4949|    4.461538461538462|
+-----+---------+---------------------+
only showing top 20 rows

