In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("FlightdelayAnalysis").getOrCreate()

# Importing the Data from to SPARK Data Frames
df_fl = spark.read.option("header", "true").csv("flights.csv")
df_ap = spark.read.option("header", "true").csv("airports.csv")
df_al = spark.read.option("header", "true").csv("airlines.csv")


In [2]:
# Removing unused/Not required columns from Flights dataset.

cols_to_drop = ['TAXI_OUT', 'WHEELS_OFF','WHEELS_ON','TAXI_IN']
df_fl = df_fl.drop(*cols_to_drop)


In [3]:
# Showing Schenmas for all Datasets 

df_fl.printSchema()
df_ap.printSchema()
df_al.printSchema()


root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- DIVERTED: string (nullable = true)
 |-- CANCELLED: string (nullable = true)
 |-- CANCELLATION_REASON: string (nullable = true)
 |-- AIR_SYSTEM_DELAY: string (nulla

In [4]:
# Joined all the data set and stored DataFrames
flights_with_airports= df_fl.join(df_ap, on=(df_fl.DESTINATION_AIRPORT == df_ap.IATA_CODE), how='leftouter')
flights_wi_airline_airpo= flights_with_airports.join(df_al,on=(df_al.AIRLINE==flights_with_airports.AIRLINE),how='leftouter')
flights_wi_airline_airpo.count()
flights_wi_airline_airpo.printSchema()



root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- DIVERTED: string (nullable = true)
 |-- CANCELLED: string (nullable = true)
 |-- CANCELLATION_REASON: string (nullable = true)
 |-- AIR_SYSTEM_DELAY: string (nulla

In [5]:
## Month Wise totaldistance travelled by each Airplane

df_fl_1= df_fl.select("month",'TAIL_NUMBER',"distance",)
df_total_dis_Flight = df_fl_1.filter(df_fl_1.month.isNotNull()).groupby("month","TAIL_NUMBER").agg({"distance":"sum"}).orderBy("month","TAIL_NUMBER").withColumnRenamed("sum(distance)","Total_Distance")
df_total_dis_Flight.printSchema()
df_total_dis_Flight.na.drop(how="any").show(10)



root
 |-- month: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- Total_Distance: double (nullable = true)

+-----+-----------+--------------+
|month|TAIL_NUMBER|Total_Distance|
+-----+-----------+--------------+
|    1|     N001AA|       47547.0|
|    1|     N002AA|       42052.0|
|    1|     N003AA|       46306.0|
|    1|     N004AA|       58632.0|
|    1|     N005AA|       52377.0|
|    1|     N006AA|       40502.0|
|    1|     N007AA|       52032.0|
|    1|     N008AA|       18789.0|
|    1|     N009AA|       50562.0|
|    1|     N010AA|       34216.0|
+-----+-----------+--------------+
only showing top 10 rows



In [6]:
# Month Wise How many get Diverted from Origin to Destination

df_fl_2 = df_fl.select("month","AIRLINE","FLIGHT_NUMBER","diverted")
df_flight_div = df_fl_2.filter((df_fl_2.month.isNotNull()) & (df_fl_2.diverted=='1')).groupby("month","AIRLINE","FLIGHT_NUMBER").agg({"diverted":"sum"}).orderBy("month","AIRLINE").withColumnRenamed("sum(diverted)","Total_diverted")
df_flight_div.printSchema()
df_flight_div.show(10)

root
 |-- month: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- Total_diverted: double (nullable = true)

+-----+-------+-------------+--------------+
|month|AIRLINE|FLIGHT_NUMBER|Total_diverted|
+-----+-------+-------------+--------------+
|    1|     AA|         2454|           1.0|
|    1|     AA|         1110|           1.0|
|    1|     AA|         1447|           1.0|
|    1|     AA|         2395|           1.0|
|    1|     AA|         2280|           1.0|
|    1|     AA|         1138|           1.0|
|    1|     AA|           19|           1.0|
|    1|     AA|         1529|           1.0|
|    1|     AA|         1419|           1.0|
|    1|     AA|          235|           1.0|
+-----+-------+-------------+--------------+
only showing top 10 rows



In [7]:
# Arrival Delay count at airports 

df_join = df_fl.join( df_ap, (df_fl.DESTINATION_AIRPORT == df_ap.IATA_CODE),how='leftouter')
df_out=df_join.filter(col("ARRIVAL_DELAY")>"0").select(["AIRPORT","CITY","STATE"]).groupBy(["AIRPORT","CITY","STATE"]).count().orderBy(desc("count")).withColumnRenamed('count','NUm_Of_Flights')
df_out.na.drop(how="any").show(10, truncate=False)

+------------------------------------------------+-----------------+-----+--------------+
|AIRPORT                                         |CITY             |STATE|NUm_Of_Flights|
+------------------------------------------------+-----------------+-----+--------------+
|Hartsfield-Jackson Atlanta International Airport|Atlanta          |GA   |105603        |
|Chicago O'Hare International Airport            |Chicago          |IL   |104224        |
|Dallas/Fort Worth International Airport         |Dallas-Fort Worth|TX   |83312         |
|Los Angeles International Airport               |Los Angeles      |CA   |81985         |
|Denver International Airport                    |Denver           |CO   |71443         |
|San Francisco International Airport             |San Francisco    |CA   |55846         |
|Phoenix Sky Harbor International Airport        |Phoenix          |AZ   |54619         |
|George Bush Intercontinental Airport            |Houston          |TX   |53844         |
|McCarran 

In [8]:
# Departure Delay count at airports 

df_join = df_fl.join( df_ap, (df_fl.DESTINATION_AIRPORT == df_ap.IATA_CODE),how='leftouter')
df_out=df_join.filter(col("DEPARTURE_DELAY")>"0").select(["AIRPORT","CITY","STATE"]).groupBy(["AIRPORT","CITY","STATE"]).count().orderBy(desc("count")).withColumnRenamed('count','NUm_Of_Flights')
df_out.na.drop(how="any").show(10,truncate=False)

+------------------------------------------------+-----------------+-----+--------------+
|AIRPORT                                         |CITY             |STATE|NUm_Of_Flights|
+------------------------------------------------+-----------------+-----+--------------+
|Hartsfield-Jackson Atlanta International Airport|Atlanta          |GA   |110754        |
|Chicago O'Hare International Airport            |Chicago          |IL   |97990         |
|Los Angeles International Airport               |Los Angeles      |CA   |81828         |
|Denver International Airport                    |Denver           |CO   |77972         |
|Dallas/Fort Worth International Airport         |Dallas-Fort Worth|TX   |76151         |
|San Francisco International Airport             |San Francisco    |CA   |65917         |
|McCarran International Airport                  |Las Vegas        |NV   |54693         |
|Phoenix Sky Harbor International Airport        |Phoenix          |AZ   |54289         |
|George Bu

In [9]:
# Number of FLights between Airports In US

df_join.createOrReplaceTempView('flights')
df_Source_dest_airlines=df_fl.select(['ORIGIN_AIRPORT','DESTINATION_AIRPORT']).groupby('ORIGIN_AIRPORT','DESTINATION_AIRPORT').count().orderBy(desc("count")).withColumnRenamed('count','No_of_Active_flights')
df_Source_dest_airlines.show(10)

+--------------+-------------------+--------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|No_of_Active_flights|
+--------------+-------------------+--------------------+
|           SFO|                LAX|               13744|
|           LAX|                SFO|               13457|
|           JFK|                LAX|               12016|
|           LAX|                JFK|               12015|
|           LAS|                LAX|                9715|
|           LGA|                ORD|                9639|
|           LAX|                LAS|                9594|
|           ORD|                LGA|                9575|
|           SFO|                JFK|                8440|
|           JFK|                SFO|                8437|
+--------------+-------------------+--------------------+
only showing top 10 rows



In [10]:
# Flight Delayed resons and no of flights delyed with those reasons
df_cancelreason = df_fl.select("CANCELLATION_REASON")
df_out_new=df_cancelreason.na.drop(how="any").groupBy("CANCELLATION_REASON").count()

df_flight_delayed_Reas = df_out_new.withColumn("CANCELLATION_REASON", when(df_out_new.CANCELLATION_REASON == "D","SECURITY_REASON")       .when(df_out_new.CANCELLATION_REASON == "C","NATIONAL_AIRSYSTEM_DELAY")       .when(df_out_new.CANCELLATION_REASON == 'B',"WEATHER_DELAY")       .when(df_out_new.CANCELLATION_REASON == 'A',"AIRLINE_DELAY")       .otherwise(df_out_new.CANCELLATION_REASON))       .orderBy(desc("count"))      .withColumnRenamed('count','Numberofflights')
df_flight_delayed_Reas.show(truncate=False)

+------------------------+---------------+
|CANCELLATION_REASON     |Numberofflights|
+------------------------+---------------+
|WEATHER_DELAY           |48851          |
|AIRLINE_DELAY           |25262          |
|NATIONAL_AIRSYSTEM_DELAY|15749          |
|SECURITY_REASON         |22             |
+------------------------+---------------+



In [11]:
# Create a table and caluculate the average speed of each flight in both ways
# Calculate average speed by dividing the distance by the air_time (converted to hours).Use the .alias() method name
# Define avg_speed
avg_speed = (df_fl.DISTANCE/(df_fl.AIR_TIME/60)).alias("avg_speed")
speed_1 = df_fl.select('AIRLINE','ORIGIN_AIRPORT','DESTINATION_AIRPORT','TAIL_NUMBER', avg_speed)

#Using the Spark DataFrame method .selectExpr() 
speed_2 =df_fl.selectExpr('AIRLINE','ORIGIN_AIRPORT','DESTINATION_AIRPORT','TAIL_NUMBER','DISTANCE/(AIR_TIME/60) as avg_speed').orderBy(desc("avg_speed"))
speed_2.show(10)

+-------+--------------+-------------------+-----------+-----------------+
|AIRLINE|ORIGIN_AIRPORT|DESTINATION_AIRPORT|TAIL_NUMBER|        avg_speed|
+-------+--------------+-------------------+-----------+-----------------+
|     F9|           TTN|                UST|     N926FR|787.8688524590165|
|     AA|           DFW|                TUS|     N425AA| 786.774193548387|
|     OO|           DIK|                MSP|     N913EV|            780.0|
|     OO|           ESC|                DTW|     N449SW|            765.0|
|     AA|           CLT|                ORD|     N940UW|764.6808510638298|
|     F9|           TTN|                UST|     N902FR|762.8571428571428|
|     F9|           TTN|                UST|     N939FR|762.8571428571428|
|     F9|           LAS|                CVG|     N918FR|762.7272727272726|
|     NK|           IAH|                FLL|     N508NK|762.6315789473684|
|     EV|           STL|                MSP|     N695CA|746.6666666666667|
+-------+--------------+-