# github_link : https://github.com/SiddharthaShandilya/cts-data-engineerinf/blob/master/trial_mini_project-Copy1.ipynb

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc, count, window
spark = SparkSession.builder.appName('trial_mini_project').getOrCreate()



# importing data from datastet

df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)
#df_q2 = df.select('Dest', 'TailNum').where(df["Dest"] != '0')

df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

# 1) Find the most frequent tail number which is getting in destination 


# ----------------------------------------------------------------------

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc, count, countDistinct, row_number
from pyspark.sql.window import Window

#importing data from local data folder
df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)

df_q1 = df
df_q1.na.drop(subset=['TailNum','Dest','Month'])

#Calculating which flights covered maximum origins by month wise

df_q11 = df_q1.filter(df_q1.Cancelled == 0).groupBy('Dest','TailNum').agg(count('TailNum').alias("Count_TailNum")).sort(desc('Count_TailNum'))


maxDest = Window.partitionBy('Dest').orderBy(col("Count_TailNum").desc())


flight_with_maxDest = df_q11.withColumn("row",row_number().over(maxDest)).filter(col("row") == 1).drop("row")

print("Flights that covered maximum Dest month wise")

flight_with_maxDest.sort(desc('Count_TailNum')).show() 

Flights that covered maximum Dest month wise
+----+-------+-------------+
|Dest|TailNum|Count_TailNum|
+----+-------+-------------+
| HNL| N655BR|         2227|
| DEN| N455YV|         1377|
| LAX| N313AE|         1250|
| ORD| N680AE|         1177|
| DFW| N286AE|         1161|
| PHX| N988HA|         1079|
| SEA| N556AS|         1076|
| ATL| N634AS|         1017|
| LGA| N916DE|         1003|
| IAH| N15941|          994|
| SLC| N457SW|          982|
| OGG| N479HA|          853|
| SJC| N841AE|          835|
| CVG| N656CA|          760|
| CLT| N906FJ|          706|
| JFK| N197JB|          702|
| ANC| N768AS|          701|
| IAD| N859MJ|          676|
| SFO| N293SW|          668|
| PHL| N944UW|          636|
+----+-------+-------------+
only showing top 20 rows



In [3]:
#Method 2 for validating the values


"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc, count, window
#importing data from local data folder
df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)

#selecting the required columns , this makes processing faster
df_q2 = df.select('Dest','TailNum', 'Cancelled')

df_q2_no_null = df_q2.filter((df_q2["TailNum"] != "0") & (df_q2["TailNum"] != "000000") & (df_q2["Cancelled"] != 1)) #removing rows with TailNum = 0/000000

#print(type(df_q2))
print(" +++++ "*10)
most_frequent = df_q2_no_null.groupBy("TailNum",'Dest').agg(count("TailNum").alias("Count_TailNum")).sort(col("Count_TailNum").desc()).show(1000)


#most_frequent.distinct().show()
"""

'\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql.functions import avg, col, desc, count, window\n#importing data from local data folder\ndf = spark.read.csv(\'Data/2007.csv\',inferSchema=True,header=True)\n\n#selecting the required columns , this makes processing faster\ndf_q2 = df.select(\'Dest\',\'TailNum\', \'Cancelled\')\n\ndf_q2_no_null = df_q2.filter((df_q2["TailNum"] != "0") & (df_q2["TailNum"] != "000000") & (df_q2["Cancelled"] != 1)) #removing rows with TailNum = 0/000000\n\n#print(type(df_q2))\nprint(" +++++ "*10)\nmost_frequent = df_q2_no_null.groupBy("TailNum",\'Dest\').agg(count("TailNum").alias("Count_TailNum")).sort(col("Count_TailNum").desc()).show(1000)\n\n\n#most_frequent.distinct().show()\n'

# ----------------------------------------------------------------------

# 2) Find out the cancelled flight  details for the last quarter of the year 2007



# ----------------------------------------------------------------------

In [4]:
df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)

df.na.drop()

df.filter((df.Month >=10) & (df.Month <=12) & (df.Cancelled == 1)).show(5)


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2007|   10|         2|        2|     NA|      1930|     NA|      2150|           WN|      195

# ----------------------------------------------------------------------

# 3) Find out the average weather delays for a particular flight per month


# ----------------------------------------------------------------------

In [5]:
from pyspark.sql.functions import avg, col, desc

df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)

df.groupBy("Month").avg('WeatherDelay').sort(col("Month")).show()

+-----+-------------------+
|Month|  avg(WeatherDelay)|
+-----+-------------------+
|    1| 0.8126742594025668|
|    2| 1.1426651862433788|
|    3| 0.6333765638468795|
|    4|   0.51643216930666|
|    5| 0.6052272846017077|
|    6| 1.2763936562420544|
|    7| 1.0766004687307265|
|    8| 0.8375915956275956|
|    9|0.41135346150449775|
|   10|0.45674389516057345|
|   11| 0.3357768086867862|
|   12| 1.1352771929481762|
+-----+-------------------+



# ----------------------------------------------------------------------

# 4) Inspite of NASDelay, SecurityDelay, LateAircraftDelay,Weatherdealy which flight reached on time



# ----------------------------------------------------------------------

In [6]:
df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)


total_count = df.filter(((df.NASDelay > 0) | (df.SecurityDelay > 0) | (df.LateAircraftDelay > 0) | (df.WeatherDelay > 0)) & (df.ArrDelay <= 0) ).count()

print("\n total number of flights that are on time | -- ", total_count)


 total number of flights that are on time | --  0


# ----------------------------------------------------------------------

# 5) Month wise total distance travelled by each flight number in every month

# ----------------------------------------------------------------------

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc


df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)
df_q2 = df.select('Month','FlightNum', 'Distance').filter((df["TailNum"] != "0") & (df["TailNum"] != "000000"))

print(type(df_q2))

total_distance = df_q2.groupBy("Month","FlightNum").sum("Distance").sort(col("Month")).show()

#total_count = df_q2.groupBy("TailNum").sum("Distance").sort(col("sum(Distance)").desc()).show()


<class 'pyspark.sql.dataframe.DataFrame'>
+-----+---------+-------------+
|Month|FlightNum|sum(Distance)|
+-----+---------+-------------+
|    1|      739|       218313|
|    1|     2344|        71799|
|    1|     2285|        39326|
|    1|     2847|        73732|
|    1|      547|       215454|
|    1|     1726|       139989|
|    1|     2367|       103377|
|    1|     2478|        76863|
|    1|      847|       103350|
|    1|      381|       263587|
|    1|      152|       347567|
|    1|      541|       170169|
|    1|     2215|        54846|
|    1|     2682|        32871|
|    1|     2250|        36816|
|    1|     1207|       130163|
|    1|     2267|        28446|
|    1|     2699|        12831|
|    1|     7187|         5328|
|    1|     7174|        29747|
+-----+---------+-------------+
only showing top 20 rows



# ----------------------------------------------------------------------

# 6) Month wise how many flights get diverted(origin to destination)



# ----------------------------------------------------------------------

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc


df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)
df_q2 = df.select('Origin','Month','Dest','TailNum','Diverted').filter((df["TailNum"] != "0") & (df["TailNum"] != "000000"))

print(" ++++ "*10)

# monthwise total diveted flight
total_sum_diverted = df_q2.groupBy("Month").sum("Diverted").sort(col("Month")).show()

print(" ++++ "*10)

# monthwise total diveted flight with respect to each origin and destination
total_distance = df_q2.groupBy("Month",'Origin','Dest').sum("Diverted").sort(col("Month")).show(12)

print(" ++++ "*10)

 ++++  ++++  ++++  ++++  ++++  ++++  ++++  ++++  ++++  ++++ 
+-----+-------------+
|Month|sum(Diverted)|
+-----+-------------+
|    1|         1200|
|    2|         1261|
|    3|         1275|
|    4|         1193|
|    5|         1442|
|    6|         2199|
|    7|         2150|
|    8|         2100|
|    9|          942|
|   10|          977|
|   11|          845|
|   12|         1488|
+-----+-------------+

 ++++  ++++  ++++  ++++  ++++  ++++  ++++  ++++  ++++  ++++ 
+-----+------+----+-------------+
|Month|Origin|Dest|sum(Diverted)|
+-----+------+----+-------------+
|    1|   RSW| PHL|            0|
|    1|   TUL| IAH|            0|
|    1|   STL| IAH|            0|
|    1|   CLE| SAT|            0|
|    1|   HDN| DEN|            0|
|    1|   IAD| BNA|            0|
|    1|   ORD| GSP|            3|
|    1|   CID| CVG|            0|
|    1|   ATL| BDL|            0|
|    1|   SLC| MCI|            0|
|    1|   DEN| MRY|            0|
|    1|   LAS| SFO|            0|
+-----+------+-

# ----------------------------------------------------------------------

# 7) Week and month wise number of trips in all the flights



# ----------------------------------------------------------------------

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc


df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)
df_no_cancel = df.filter('Cancelled== 0') 
df_q2 = df_no_cancel.select('Month','DayOfWeek','TailNum').filter((df["TailNum"] != "0") & (df["TailNum"] != "000000") & (df["Cancelled"] == 0))



total_distance = df_q2.groupBy("Month",'DayOfWeek','TailNum').agg(count("TailNum").alias("total_trips")).sort(['Month','DayOfWeek']).show()

+-----+---------+-------+-----------+
|Month|DayOfWeek|TailNum|total_trips|
+-----+---------+-------+-----------+
|    1|        1| N511UA|         14|
|    1|        1| N434YV|         34|
|    1|        1| N16170|         21|
|    1|        1| N443CA|         21|
|    1|        1| N495CA|         29|
|    1|        1| N708CA|         22|
|    1|        1| N929LR|         30|
|    1|        1| N829UA|         19|
|    1|        1| N680AW|         23|
|    1|        1| N813AW|         24|
|    1|        1| N625AW|         22|
|    1|        1| N627AW|         13|
|    1|        1| N752UW|         26|
|    1|        1| N601DL|         15|
|    1|        1| N503AE|         28|
|    1|        1| N513AE|         25|
|    1|        1| N620NW|         17|
|    1|        1| N508US|         13|
|    1|        1| N583AA|         21|
|    1|        1| N958AS|         27|
+-----+---------+-------+-----------+
only showing top 20 rows



# ----------------------------------------------------------------------

# 8) Which flights covered maximum origin and destination by month wise


# ----------------------------------------------------------------------

--------------------------------------------------------------------
METHOD 1: FLIGHT WITH THEIR TRIP BETWEEN EACH ORIGIN AND DESTINATION
--------------------------------------------------------------------

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc,expr

#importing data from local data folder
df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)

#selecting the required columns , this makes processing faster
df_q2 = df.select('Origin','Month','Dest','TailNum','FlightNum','Cancelled').filter((df["TailNum"] != "0") & (df["TailNum"] != "000000"))

df_q2_no_null = df_q2.filter((df_q2["Origin"] != "null") & (df_q2["Dest"] != "null") & (df_q2["Cancelled"] != 1) ) #removing rows with null values

print(type(df_q2))

print(" --++-- "*10 + " \n \t\t\t With respect to Tail Number \n" + " --++-- "*10)

most_frequent_TailNum = df_q2_no_null.groupBy('Month','Origin','Dest','TailNum').agg(count("TailNum").alias("Total_trips_TailNum")).sort(col("Total_trips_TailNum").desc()).show()

print(" --++-- "*10 + " \n  \t\t\t With respect to Flight Number \n" + " --++-- "*10)

most_frequent_FlightNum = df_q2_no_null.groupBy('Month','Origin','Dest','FlightNum').agg(count("FlightNum").alias("Total_trips_FlightNum")).sort(col("Total_trips_FlightNum").desc())

most_frequent_FlightNum.show()

<class 'pyspark.sql.dataframe.DataFrame'>
 --++--  --++--  --++--  --++--  --++--  --++--  --++--  --++--  --++--  --++--  
 			 With respect to Tail Number 
 --++--  --++--  --++--  --++--  --++--  --++--  --++--  --++--  --++--  --++-- 
+-----+------+----+-------+-------------------+
|Month|Origin|Dest|TailNum|Total_trips_TailNum|
+-----+------+----+-------+-------------------+
|    8|   HNL| OGG| N841AL|                 94|
|   12|   HNL| OGG| N841AL|                 86|
|   12|   OGG| HNL| N841AL|                 86|
|    7|   HNL| OGG| N841AL|                 86|
|    8|   OGG| HNL| N485HA|                 85|
|   10|   KOA| HNL| N655BR|                 84|
|   10|   HNL| KOA| N655BR|                 84|
|    1|   KOA| HNL| N646BR|                 83|
|    1|   HNL| KOA| N646BR|                 82|
|    7|   LIH| HNL| N841AL|                 80|
|    5|   HNL| OGG| N836AL|                 79|
|    8|   KOA| HNL| N655BR|                 78|
|    5|   LGA| BOS| N908DE|              

# ----------------------------------------------------------------------

----------------------------------------------------------------------
METHOD 2 USING DEST AND ORIGIN SEPERATELY TO COUNT THE MAX TRIP
----------------------------------------------------------------------

In [11]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc,expr

#importing data from local data folder
df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)

#Removing null values
df8=df
df8.na.drop(subset=['Origin','Dest','Month','FlightNum','Cancelled'])

#Calculating which flights covered maximum origins by month wise

df81 = df8.filter(df8.Cancelled == 0).groupBy('Month', 'FlightNum').agg(countDistinct('Origin').alias("Max_Origins")).sort('Month', desc('Max_Origins'))

maxorigin = Window.partitionBy('Month').orderBy(col("Max_Origins").desc())

flight_with_maxorigin = df81.withColumn("row",row_number().over(maxorigin)).filter(col("row") == 1).drop("row")

print(" -- ++ -- "*12 + "\n \t\t Flights that covered maximum origins month wise \n" + " -- ++ -- "*12)

flight_with_maxorigin.show()

#Calculating which flights covered maximum destinations by month wise

df82 = df8.filter(df8.Cancelled == 0).groupBy('Month', 'FlightNum').agg(countDistinct('Dest').alias("Max_Destinations")).sort('Month', desc('Max_Destinations'))

maxdest = Window.partitionBy('Month').orderBy(col("Max_Destinations").desc())

flight_with_maxdest=df82.withColumn("row",row_number().over(maxdest)).filter(col("row") == 1).drop("row")

print(" -- ++ -- "*12 + "\n \t\t Flights that covered maximum destinations month wise \n" + " -- ++ -- "*12)
flight_with_maxdest.show()




 -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ -- 
 		 Flights that covered maximum origins month wise 
 -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ -- 
+-----+---------+-----------+
|Month|FlightNum|Max_Origins|
+-----+---------+-----------+
|   12|      151|         22|
|    1|      433|         18|
|    6|      226|         18|
|    3|      644|         18|
|    5|      644|         17|
|    9|       62|         20|
|    4|      644|         17|
|    8|       67|         18|
|    7|      425|         17|
|   10|       66|         20|
|   11|      303|         21|
|    2|      500|         18|
+-----+---------+-----------+

 -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- ++ -- 
 		 Flights that covered maximum destinations month wise 
 -- ++ --  -- ++ --  -- ++ --  -- ++ --  -- +

# ----------------------------------------------------------------------

# 9) Average month wise arrival delay (flightnum wise)



# ----------------------------------------------------------------------

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc


df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)
df_no_cancel = df.filter('Cancelled== 0') 
df_q2 = df_no_cancel.select('Month','ArrDelay','FlightNum')



total_distance = df_q2.groupBy("Month", "FlightNum").agg(avg("ArrDelay").alias("avg_monthly_arrival_delay")).sort(['Month']).show()

+-----+---------+-------------------------+
|Month|FlightNum|avg_monthly_arrival_delay|
+-----+---------+-------------------------+
|    1|      739|        0.558252427184466|
|    1|     2344|        4.923809523809524|
|    1|     2285|      -12.209302325581396|
|    1|     2847|        1.251572327044025|
|    1|      547|        16.72151898734177|
|    1|     1726|        3.877551020408163|
|    1|     2367|       -5.546511627906977|
|    1|     2478|       -4.820224719101123|
|    1|      847|       10.861842105263158|
|    1|      381|       1.4029850746268657|
|    1|      152|                 2.359375|
|    1|      541|       12.755458515283843|
|    1|     2215|        2.629032258064516|
|    1|     2682|        5.919354838709677|
|    1|     2250|                      1.6|
|    1|     1207|        7.872340425531915|
|    1|     2267|        39.67272727272727|
|    1|     2699|        6.310344827586207|
|    1|     7187|                   19.125|
|    1|     7174|        10.9836

# ----------------------------------------------------------------------

# 10) Average month wise departure delay (flightnum wise)

# ----------------------------------------------------------------------

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, desc


df = spark.read.csv('Data/2007.csv',inferSchema=True,header=True)
df_no_cancel = df.filter('Cancelled== 0') 
df_q2 = df_no_cancel.select('Month','DepDelay','FlightNum')



total_distance = df_q2.groupBy("Month","FlightNum").agg(avg("DepDelay").alias("avg_monthly_departure_delay")).sort(['Month']).show()

+-----+---------+---------------------------+
|Month|FlightNum|avg_monthly_departure_delay|
+-----+---------+---------------------------+
|    1|     1208|          8.094890510948906|
|    1|     2545|          5.462686567164179|
|    1|      587|          6.336956521739131|
|    1|     1702|          7.805471124620061|
|    1|     2483|          6.814285714285714|
|    1|     1720|          11.01530612244898|
|    1|     1677|         14.131428571428572|
|    1|      869|          6.867469879518072|
|    1|     1892|          4.377551020408164|
|    1|     2324|          5.641304347826087|
|    1|     3452|                        3.0|
|    1|     2667|         12.347826086956522|
|    1|     2202|          8.090909090909092|
|    1|     2086|                      11.25|
|    1|     2870|                    3.71875|
|    1|     7340|                    15.8125|
|    1|     5501|         10.416666666666666|
|    1|     4979|                       31.0|
|    1|     5668|          7.22580

# ----------------------------------------------------------------------