In [0]:
import pyspark
from pyspark import SparkConf, SparkContext

In [0]:
conf = SparkConf().setAppName("Flight data")
sc = SparkContext.getOrCreate(conf=conf)

In [0]:
df = spark.read.options(inferSchema='True', header='True', delimiter=',').csv('s3://flightdata-cdb/2007.csv')

In [0]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

In [0]:
#Finding the most frequent tail number which is in destination by maximum and not 0
df1=df.filter((df.TailNum!='0') & (df.TailNum!='000000')) 
df1.groupBy("Dest", "TailNum").count().orderBy("count", ascending=False).show(1)

+----+-------+-----+
|Dest|TailNum|count|
+----+-------+-----+
| HNL| N655BR| 2241|
+----+-------+-----+
only showing top 1 row



In [0]:
#Finding out the cancelled flight details for the last quarter of the year 2007
df.filter((df.Year==2007) & (df.Month>=10) & (df.Cancelled==1)).select('FlightNum', 'Year', 'Month','Cancelled').show()

+---------+----+-----+---------+
|FlightNum|Year|Month|Cancelled|
+---------+----+-----+---------+
|      195|2007|   10|        1|
|      710|2007|   10|        1|
|     1191|2007|   10|        1|
|      514|2007|   10|        1|
|     1799|2007|   10|        1|
|       30|2007|   10|        1|
|      507|2007|   10|        1|
|      449|2007|   10|        1|
|      820|2007|   10|        1|
|     1036|2007|   10|        1|
|     1243|2007|   10|        1|
|     1804|2007|   10|        1|
|     2010|2007|   10|        1|
|     1235|2007|   10|        1|
|     1970|2007|   10|        1|
|      336|2007|   10|        1|
|      507|2007|   10|        1|
|     2843|2007|   10|        1|
|      162|2007|   10|        1|
|     2980|2007|   10|        1|
+---------+----+-----+---------+
only showing top 20 rows



In [0]:
#Finding out the average weather delays for a particular flight per month
df.groupBy(df.FlightNum, df.Month).agg({'WeatherDelay': 'avg'}).orderBy(df.FlightNum, df.Month).show()

+---------+-----+--------------------+
|FlightNum|Month|   avg(WeatherDelay)|
+---------+-----+--------------------+
|        1|    1| 0.40476190476190477|
|        1|    2|  3.0197368421052633|
|        1|    3|                0.89|
|        1|    4|  0.5185185185185185|
|        1|    5| 0.17829457364341086|
|        1|    6|  0.1589958158995816|
|        1|    7|  0.8492871690427699|
|        1|    8|  0.7018255578093306|
|        1|    9|  0.2597938144329897|
|        1|   10|  0.9749518304431599|
|        1|   11|  0.3134715025906736|
|        1|   12|  0.8363636363636363|
|        2|    1| 0.05555555555555555|
|        2|    2| 0.21923076923076923|
|        2|    3| 0.18055555555555555|
|        2|    4| 0.07547169811320754|
|        2|    5|0.050397877984084884|
|        2|    6|0.027548209366391185|
|        2|    7|  0.0427807486631016|
|        2|    8|  0.4037940379403794|
+---------+-----+--------------------+
only showing top 20 rows



In [0]:
#Inspite of NASDelay, SecurityDelay, LateAircraftDelay,Weatherdelay which flight reached exactly on time
from pyspark.sql.types import IntegerType
df2 = df.withColumn("ArrDelay", df["ArrDelay"].cast(IntegerType()))
df2.filter((df.NASDelay>0) & (df.SecurityDelay>0) & (df.LateAircraftDelay>0) & (df.WeatherDelay>0) & (df.ArrDelay<=0)).select('FlightNum','ArrDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay','LateAircraftDelay','Month', 'Year').show()

+---------+--------+------------+--------+-------------+-----------------+-----+----+
|FlightNum|ArrDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|Month|Year|
+---------+--------+------------+--------+-------------+-----------------+-----+----+
+---------+--------+------------+--------+-------------+-----------------+-----+----+



In [0]:
#Month wise total distance travelled by each flight number
df.groupBy(df.FlightNum,df.Month).agg({'Distance': 'sum'}).withColumnRenamed('sum(Distance)','total_distance').show()

+---------+-----+--------------+
|FlightNum|Month|total_distance|
+---------+-----+--------------+
|     1504|    1|         85915|
|     1384|    1|         26070|
|     1161|    1|        172275|
|      307|    1|        218173|
|     2781|    1|         38181|
|     2141|    1|         44717|
|     2395|    1|         41289|
|     2566|    1|         22842|
|     2328|    1|         61944|
|     2599|    1|         12066|
|     2509|    1|         31729|
|     1655|    1|         58396|
|     3021|    1|         17631|
|     3231|    1|          5498|
|     7152|    1|         12519|
|     7405|    1|         14604|
|     5296|    1|          3627|
|     5373|    1|          8250|
|     5193|    1|         33124|
|     5014|    1|           834|
+---------+-----+--------------+
only showing top 20 rows



In [0]:
#Month wise how many flights are getting diverted (origin to destination)
df.filter(df.Diverted==1).select('Month','Diverted').groupBy('Month').agg({'Diverted': 'sum'}).withColumnRenamed('sum(Diverted)','total_diverted_flights').show()


+-----+----------------------+
|Month|total_diverted_flights|
+-----+----------------------+
|    1|                  1200|
|    2|                  1261|
|    3|                  1275|
|    4|                  1193|
|    5|                  1442|
|    6|                  2199|
|    7|                  2150|
|    8|                  2101|
|    9|                   962|
|   10|                  1000|
|   11|                   881|
|   12|                  1515|
+-----+----------------------+



In [0]:
#Week and month wise number of trips in all the flights
df.select('FlightNum','Month','DayOfWeek').groupBy(df.Month,df.DayOfWeek).agg({'FlightNum':'count'}).withColumnRenamed('count(FlightNum)','total_flights').orderBy('Month','DayOfWeek').show()

+-----+---------+-------------+
|Month|DayOfWeek|total_flights|
+-----+---------+-------------+
|    1|        1|       102646|
|    1|        2|       101698|
|    1|        3|       102465|
|    1|        4|        83417|
|    1|        5|        83547|
|    1|        6|        69351|
|    1|        7|        78435|
|    2|        1|        83765|
|    2|        2|        81929|
|    2|        3|        82770|
|    2|        4|        83924|
|    2|        5|        84168|
|    2|        6|        70209|
|    2|        7|        78839|
|    3|        1|        85108|
|    3|        2|        84442|
|    3|        3|        84860|
|    3|        4|       106468|
|    3|        5|       106655|
|    3|        6|        91017|
+-----+---------+-------------+
only showing top 20 rows



In [0]:
#Which flights covered maximum origin and destination by month wise
from pyspark.sql.functions import countDistinct, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
new_df = df.filter(df.Cancelled == 0).groupBy('Month', 'FlightNum').agg(countDistinct('Origin').alias("Max_Origins")).sort('Month', desc('Max_Origins'))
windowDept = Window.partitionBy('Month').orderBy(col("Max_Origins").desc())
new_df.withColumn("row",row_number().over(windowDept)).filter(col("row") == 1).drop("row").show()

+-----+---------+-----------+
|Month|FlightNum|Max_Origins|
+-----+---------+-----------+
|    1|      433|         18|
|    2|      500|         18|
|    3|      644|         18|
|    4|      644|         17|
|    5|      644|         17|
|    6|      226|         18|
|    7|      425|         17|
|    8|       67|         18|
|    9|       62|         20|
|   10|       66|         20|
|   11|      303|         21|
|   12|      151|         22|
+-----+---------+-----------+



In [0]:
#Average month wise arrival delay (flightnum wise)
import pyspark.sql.functions as func
df.select('FlightNum','Month','ArrDelay').groupBy('FlightNum','Month').agg(func.avg('ArrDelay').alias('average_arrival_delay')).orderBy('FlightNum','Month').show()

+---------+-----+---------------------+
|FlightNum|Month|average_arrival_delay|
+---------+-----+---------------------+
|        1|    1|   10.715568862275449|
|        1|    2|   18.306397306397308|
|        1|    3|    8.577608142493638|
|        1|    4|   1.6835164835164835|
|        1|    5|    3.216374269005848|
|        1|    6|    9.654008438818565|
|        1|    7|    6.444444444444445|
|        1|    8|    8.573770491803279|
|        1|    9|   1.2655601659751037|
|        1|   10|    7.056420233463035|
|        1|   11|  -1.4661458333333333|
|        1|   12|    8.182539682539682|
|        2|    1|   10.943462897526501|
|        2|    2|    6.039840637450199|
|        2|    3|    9.851590106007068|
|        2|    4|    2.574193548387097|
|        2|    5|   5.2761394101876675|
|        2|    6|   15.546218487394958|
|        2|    7|   10.869209809264305|
|        2|    8|    9.385245901639344|
+---------+-----+---------------------+
only showing top 20 rows



In [0]:
#Average month wise departure delay (flightnum wise)
df.select('FlightNum','Month','DepDelay').groupBy('FlightNum','Month').agg(func.avg('DepDelay').alias('average_deparatur_delay')).show()

+---------+-----+-----------------------+
|FlightNum|Month|average_deparatur_delay|
+---------+-----+-----------------------+
|     1504|    1|      8.994444444444444|
|     1384|    1|     1.6382978723404256|
|     1161|    1|     10.056410256410256|
|      307|    1|     11.352173913043478|
|     2781|    1|      6.705882352941177|
|     2141|    1|     1.7654320987654322|
|     2395|    1|      4.313253012048193|
|     2566|    1|      5.431818181818182|
|     2328|    1|     12.347826086956522|
|     2599|    1|                   11.5|
|     2509|    1|      0.803921568627451|
|     1655|    1|     1.2037037037037037|
|     3021|    1|               -4.28125|
|     3231|    1|      7.147058823529412|
|     7152|    1|       5.45945945945946|
|     7405|    1|     22.916666666666668|
|     5296|    1|     -2.111111111111111|
|     5373|    1|                  14.24|
|     5193|    1|                   15.4|
|     5014|    1|      6.333333333333333|
+---------+-----+-----------------