In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import  floor
SparkSession.builder.appName("Mini_Project").getOrCreate()

In [0]:
df = spark.read.options(inferSchema='True', header='True', delimiter=',').csv('s3://flightdata-cdb/2007.csv')

In [0]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

In [0]:
df.show(10)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    1|         1|        1|   1232|      1225|   1341|      1340|           WN|     2891

In [0]:
# 1st-> most frequent tail number which is getting in destination by maximum
df1 = df1.groupBy("Dest","TailNum").count().orderBy("count",ascending=False)
df1.show()

+----+-------+-----+
|Dest|TailNum|count|
+----+-------+-----+
| ORD|      0|11709|
| DFW|      0| 9361|
| EWR|      0| 5194|
| LGA|      0| 3935|
| ORD| 000000| 3201|
| JFK|      0| 3123|
| BOS|      0| 2736|
| LAX|      0| 2274|
| HNL| N655BR| 2241|
| HNL| N651BR| 2173|
| HNL| N654BR| 2138|
| DTW|      0| 2108|
| HNL| N693BR| 2067|
| DCA|      0| 2062|
| HNL| N479HA| 2038|
| HNL| N478HA| 2024|
| HNL| N485HA| 1984|
| HNL| N480HA| 1976|
| IAH|      0| 1967|
| HNL| N484HA| 1944|
+----+-------+-----+
only showing top 20 rows



In [0]:
# 2nd-> Find out the cancelled flight  details for the last quarter of the year 2007
df2 = df.filter((df.Month>9) & (df.Cancelled == 1))
df2.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|   10|         2|        2|     NA|      1930|     NA|      2150|           WN|      195

In [0]:
#3rd-> Find out the average weather delays for a particular flight per month
df3 = df.groupBy("FlightNum", "Month").mean("WeatherDelay")
df3.show()

+---------+-----+-------------------+
|FlightNum|Month|  avg(WeatherDelay)|
+---------+-----+-------------------+
|     1504|    1|                0.0|
|     1384|    1|                0.0|
|     1161|    1| 1.0402010050251256|
|      307|    1| 0.5899581589958159|
|     2781|    1|0.14444444444444443|
|     2141|    1|                0.0|
|     2395|    1| 0.1566265060240964|
|     2566|    1|0.08695652173913043|
|     2328|    1| 1.8712871287128714|
|     2599|    1|                0.0|
|     2509|    1| 1.4528301886792452|
|     1655|    1|                0.0|
|     3021|    1|                0.0|
|     3231|    1|0.34210526315789475|
|     7152|    1|                0.0|
|     7405|    1|                0.0|
|     5296|    1| 1.7777777777777777|
|     5373|    1|               3.52|
|     5193|    1|  2.980769230769231|
|     5014|    1|                0.0|
+---------+-----+-------------------+
only showing top 20 rows



In [0]:
#4th-> Inspite of NASDelay, SecurityDelay, LateAircraftDelay,Weatherdealy which flight reached on time
df4 = df.filter(((df.NASDelay > 0) | (df.SecurityDelay >0) | (df.LateAircraftDelay>0) | (df.WeatherDelay>0) & (df.ArrDelay==0)))
df4.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|    1|         1|        1|   2206|      2130|   2334|      2300|           WN|     1229

In [0]:
# 5th-> Month wise total distance travelled by each flight number in every month
df5 = df.groupBy("FlightNum", "Month").sum("Distance")
df5.show()

+---------+-----+-------------+
|FlightNum|Month|sum(Distance)|
+---------+-----+-------------+
|     1504|    1|        85915|
|     1384|    1|        26070|
|     1161|    1|       172275|
|      307|    1|       218173|
|     2781|    1|        38181|
|     2141|    1|        44717|
|     2395|    1|        41289|
|     2566|    1|        22842|
|     2328|    1|        61944|
|     2599|    1|        12066|
|     2509|    1|        31729|
|     1655|    1|        58396|
|     3021|    1|        17631|
|     3231|    1|         5498|
|     7152|    1|        12519|
|     7405|    1|        14604|
|     5296|    1|         3627|
|     5373|    1|         8250|
|     5193|    1|        33124|
|     5014|    1|          834|
+---------+-----+-------------+
only showing top 20 rows



In [0]:
# 6th-> Month wise how many flights get diverted(origin to destination)
df6 = df.groupBy("Month").sum("Diverted")
df6.show()

+-----+-------------+
|Month|sum(Diverted)|
+-----+-------------+
|    1|         1200|
|    2|         1261|
|    3|         1275|
|    4|         1193|
|    5|         1442|
|    6|         2199|
|    7|         2150|
|    8|         2101|
|    9|          962|
|   10|         1000|
|   11|          881|
|   12|         1515|
+-----+-------------+



In [0]:
# 7th-> Week and month wise number of trips in all the flights
df7 = df.withColumn('Week',floor(df['DayofMonth']/7)+1)
df7 = df7.groupBy('Week','Month').count().orderBy("Week","Month",ascending=True)
df7.show()

+----+-----+------+
|Week|Month| count|
+----+-----+------+
|   1|    1|121534|
|   1|    2|119117|
|   1|    3|122937|
|   1|    4|126450|
|   1|    5|121806|
|   1|    6|122905|
|   1|    7|120625|
|   1|    8|126446|
|   1|    9|117234|
|   1|   10|122124|
|   1|   11|121221|
|   1|   12|119123|
|   2|    1|140028|
|   2|    2|139371|
|   2|    3|144478|
|   2|    4|143693|
|   2|    5|142977|
|   2|    6|147617|
|   2|    7|148339|
|   2|    8|148161|
+----+-----+------+
only showing top 20 rows



In [0]:
# 8th -> Which flights covered maximum origin and destination by month wise
df8 = df.groupBy("TailNum","Cancelled","Month","Origin").count().orderBy("Month",ascending=True).filter(df.Cancelled == 0)
df8 = df8.select("TailNum","Origin","count").orderBy("count",ascending=False)
df8.show()

+-------+------+-----+
|TailNum|Origin|count|
+-------+------+-----+
| N655BR|   HNL|  226|
| N654BR|   HNL|  210|
| N655BR|   HNL|  208|
| N655BR|   HNL|  208|
| N828AL|   HNL|  207|
| N654BR|   HNL|  206|
| N693BR|   HNL|  206|
| N651BR|   HNL|  206|
| N651BR|   HNL|  203|
| N654BR|   HNL|  202|
| N836AL|   HNL|  202|
| N808AL|   HNL|  201|
| N651BR|   HNL|  200|
| N651BR|   HNL|  198|
| N837AL|   HNL|  198|
| N693BR|   HNL|  197|
| N810AL|   HNL|  197|
| N654BR|   HNL|  196|
| N836AL|   HNL|  196|
| N651BR|   HNL|  194|
+-------+------+-----+
only showing top 20 rows



In [0]:
# 9th-> Average month wise arrival delay (flightnum wise)
df9 = df.withColumn("ArrDelay",df.ArrDelay.cast("integer"))
df9.groupBy("FlightNum", "Month").mean("ArrDelay").show()

+---------+-----+--------------------+
|FlightNum|Month|       avg(ArrDelay)|
+---------+-----+--------------------+
|     1504|    1|   9.488888888888889|
|     1384|    1| -2.9361702127659575|
|     1161|    1|   6.123076923076923|
|      307|    1|   12.78695652173913|
|     2781|    1|   4.905882352941177|
|     2141|    1| -0.7654320987654321|
|     2395|    1|  1.9518072289156627|
|     2566|    1|                4.75|
|     2328|    1|   10.98913043478261|
|     2599|    1|  25.785714285714285|
|     2509|    1|                 5.6|
|     1655|    1|0.046296296296296294|
|     3021|    1|              1.4375|
|     3231|    1|  3.7941176470588234|
|     7152|    1|   7.916666666666667|
|     7405|    1|               29.75|
|     5296|    1|   4.444444444444445|
|     5373|    1|               18.28|
|     5193|    1|               11.34|
|     5014|    1|                 6.0|
+---------+-----+--------------------+
only showing top 20 rows



In [0]:
# 10th-> Average month wise departure delay (flightnum wise)
df10 = df.withColumn("DepDelay",df.DepDelay.cast("integer")).groupBy("FlightNum", "Month").mean("DepDelay")
df10.show()

+---------+-----+------------------+
|FlightNum|Month|     avg(DepDelay)|
+---------+-----+------------------+
|     1504|    1| 8.994444444444444|
|     1384|    1|1.6382978723404256|
|     1161|    1|10.056410256410256|
|      307|    1|11.352173913043478|
|     2781|    1| 6.705882352941177|
|     2141|    1|1.7654320987654322|
|     2395|    1| 4.313253012048193|
|     2566|    1| 5.431818181818182|
|     2328|    1|12.347826086956522|
|     2599|    1|              11.5|
|     2509|    1| 0.803921568627451|
|     1655|    1|1.2037037037037037|
|     3021|    1|          -4.28125|
|     3231|    1| 7.147058823529412|
|     7152|    1|  5.45945945945946|
|     7405|    1|22.916666666666668|
|     5296|    1|-2.111111111111111|
|     5373|    1|             14.24|
|     5193|    1|              15.4|
|     5014|    1| 6.333333333333333|
+---------+-----+------------------+
only showing top 20 rows

