In [86]:
%%writefile airlines.csv
"AMD","COK","2024-02-24 08:00:00","2024-02-24 10:30:00","2024-02-24 08:50:00","2024-02-24 11:25:00","Akasa Air","airTraffic"
"COK","SXR","2024-02-25 12:00:00","2024-02-25 18:30:00","2024-02-25 12:15:00","2024-02-25 19:15:00","Vistara","mechanical"
"COK","AMD","2024-03-24 08:00:00","2024-03-24 10:40:00","2024-03-24 08:17:00","2024-03-24 11:00:00","Akasa Air","storm"
"SXR","AMD","2024-04-28 05:30:00","2024-04-28 07:35:00","2024-04-28 05:50:00","2024-04-28 07:55:00","Vistara","visibility"
"AMD","SXR","2024-07-25 07:00:00","2024-07-25 09:10:00","2024-07-25 07:10:00","2024-07-25 09:20:00","Akasa Air","airTraffic"
"AMD","CCU","2024-03-28 18:00:00","2024-03-28 20:25:00","2024-03-28 19:00:00","2024-03-28 21:15:00","Vistara","storm"
"SXR","COK","2024-06-28 23:00:00","2024-06-29 04:40:00","2024-06-28 23:40:00","2024-06-29 05:20:00","Vistara","airTraffic"
"COK","CCU","2024-06-24 18:00:00","2024-06-24 21:05:00","2024-06-24 18:05:00","2024-06-24 21:25:00","IndiGo","visibility"
"CCU","AMD","2024-07-25 04:00:00","2024-07-25 06:45:00","2024-07-25 04:30:00","2024-07-25 07:05:00","IndiGo","airTraffic"
"CCU","COK","2024-02-28 14:00:00","2024-02-28 17:10:00","2024-02-28 14:10:00","2024-02-28 17:40:00","IndiGo","mechanical"
"CCU","SXR","2024-04-21 17:00:00","2024-04-21 19:50:00","2024-04-21 17:30:00","2024-04-21 20:05:00","IndiGo","airTraffic"

Overwriting airlines.csv


In [87]:
#Schema to be imposed
schema = (
"origin_airport STRING, destination_airport STRING, scheduled_departure_time STRING, scheduled_arrival_time STRING, actual_departure_time STRING, actual_arrival_time STRING, carrier STRING, delay_reason STRING")

In [88]:
#Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [139]:
#Create dataframe for the given data using enforced Schema method
airline_df = spark.read.csv('/user/purvanakh61vgre/airlines.csv', schema=schema)

In [90]:
#Create a SparkSession
spark = SparkSession.builder.appName('pspyk11_purva').getOrCreate()
spark

In [140]:
airline_df.show()

+--------------+-------------------+------------------------+----------------------+---------------------+-------------------+---------+------------+
|origin_airport|destination_airport|scheduled_departure_time|scheduled_arrival_time|actual_departure_time|actual_arrival_time|  carrier|delay_reason|
+--------------+-------------------+------------------------+----------------------+---------------------+-------------------+---------+------------+
|           AMD|                COK|     2024-02-24 08:00:00|   2024-02-24 10:30:00|  2024-02-24 08:50:00|2024-02-24 11:25:00|Akasa Air|  airTraffic|
|           COK|                SXR|     2024-02-25 12:00:00|   2024-02-25 18:30:00|  2024-02-25 12:15:00|2024-02-25 19:15:00|  Vistara|  mechanical|
|           COK|                AMD|     2024-03-24 08:00:00|   2024-03-24 10:40:00|  2024-03-24 08:17:00|2024-03-24 11:00:00|Akasa Air|       storm|
|           SXR|                AMD|     2024-04-28 05:30:00|   2024-04-28 07:35:00|  2024-04-28 05:

In [141]:
airline_df = airline_df.withColumn('scheduled_departure_time', airline_df['scheduled_departure_time'].cast('timestamp'))
airline_df = airline_df.withColumn('scheduled_arrival_time', airline_df['scheduled_arrival_time'].cast('timestamp'))
airline_df = airline_df.withColumn('actual_departure_time', airline_df['actual_departure_time'].cast('timestamp'))
airline_df = airline_df.withColumn('actual_arrival_time', airline_df['actual_arrival_time'].cast('timestamp'))
airline_df.printSchema()


root
 |-- origin_airport: string (nullable = true)
 |-- destination_airport: string (nullable = true)
 |-- scheduled_departure_time: timestamp (nullable = true)
 |-- scheduled_arrival_time: timestamp (nullable = true)
 |-- actual_departure_time: timestamp (nullable = true)
 |-- actual_arrival_time: timestamp (nullable = true)
 |-- carrier: string (nullable = true)
 |-- delay_reason: string (nullable = true)



In [142]:
airline_df = airline_df.withColumn('Delay', expr('unix_timestamp(actual_departure_time) - unix_timestamp(scheduled_departure_time)'))

In [144]:
#Filter the dataframe to display all delayed flights
airline_df.filter(airline_df.Delay > 0).show()

+--------------+-------------------+------------------------+----------------------+---------------------+-------------------+---------+------------+-----+
|origin_airport|destination_airport|scheduled_departure_time|scheduled_arrival_time|actual_departure_time|actual_arrival_time|  carrier|delay_reason|Delay|
+--------------+-------------------+------------------------+----------------------+---------------------+-------------------+---------+------------+-----+
|           AMD|                COK|     2024-02-24 08:00:00|   2024-02-24 10:30:00|  2024-02-24 08:50:00|2024-02-24 11:25:00|Akasa Air|  airTraffic| 3000|
|           COK|                SXR|     2024-02-25 12:00:00|   2024-02-25 18:30:00|  2024-02-25 12:15:00|2024-02-25 19:15:00|  Vistara|  mechanical|  900|
|           COK|                AMD|     2024-03-24 08:00:00|   2024-03-24 10:40:00|  2024-03-24 08:17:00|2024-03-24 11:00:00|Akasa Air|       storm| 1020|
|           SXR|                AMD|     2024-04-28 05:30:00|   

In [146]:
#Calculate Average delay per source airport
airline_df.groupBy('origin_airport').agg(avg('Delay')).show()

+--------------+----------+
|origin_airport|avg(Delay)|
+--------------+----------+
|           SXR|    1800.0|
|           AMD|    2400.0|
|           CCU|    1400.0|
|           COK|     740.0|
+--------------+----------+



In [148]:
##Calculate Average delay per destination airport
airline_df.groupBy('destination_airport').agg(avg('Delay')).show()

+-------------------+----------+
|destination_airport|avg(Delay)|
+-------------------+----------+
|                SXR|    1100.0|
|                AMD|    1340.0|
|                CCU|    1950.0|
|                COK|    2000.0|
+-------------------+----------+



In [150]:
#Calculate Average delay per airline
airline_df.groupBy('carrier').agg(avg('Delay')).show()

+---------+----------+
|  carrier|avg(Delay)|
+---------+----------+
|Akasa Air|    1540.0|
|   IndiGo|    1125.0|
|  Vistara|    2025.0|
+---------+----------+



In [154]:
#Fetch airline with maximum delay (in single flight)
airline_df.orderBy('Delay',ascending=False).collect()[0]['carrier']

'Vistara'

In [183]:
#Fetch airline with maximum delay (overall time delay)
df = airline_df.groupBy('carrier').agg({'Delay': 'sum'})
df.orderBy('sum(Delay)',ascending=False).collect()[0][0]

'Vistara'

In [184]:
#Fetch airline with minimum delay (in single flight)
df = airline_df.groupBy('carrier').agg({'Delay': 'sum'})
df.orderBy('sum(Delay)').collect()[0][0]

'IndiGo'

In [185]:
#Fetch airline with minimum delay (overall time delay)
df = airline_df.groupBy('carrier').agg({'Delay': 'sum'})
df.orderBy('sum(Delay)').collect()[0][0]

'IndiGo'

In [190]:
#Calculate Average delay per reason
airline_df.groupBy('delay_reason').agg(avg('Delay')).show()

+------------+----------+
|delay_reason|avg(Delay)|
+------------+----------+
|  visibility|     750.0|
|  airTraffic|    1920.0|
|       storm|    2310.0|
|  mechanical|     750.0|
+------------+----------+



In [198]:
#Find most recurring cause of delays
airline_df.groupBy('delay_reason').count().orderBy('count', ascending=False).collect()[0][0]

'airTraffic'

In [216]:
#Identify the airline with best recovery time
# (Even though the flight started late, the airline recovered a few minutes in flight time)
df1 = airline_df.withColumn('arrival_delay', expr('unix_timestamp(actual_arrival_time) - unix_timestamp(scheduled_arrival_time)')).groupBy('carrier').agg(sum('arrival_delay')).orderBy('sum(arrival_delay)')
mini = df1.agg({'sum(arrival_delay)': 'min'}).first()[0]
df2 = df1.filter(col('sum(arrival_delay)') == mini).show()
d

+---------+------------------+
|  carrier|sum(arrival_delay)|
+---------+------------------+
|Akasa Air|              5100|
|   IndiGo|              5100|
+---------+------------------+



In [217]:
df2.rdd.flatMap(lambda x : [x[0]]).collect()

AttributeError: 'NoneType' object has no attribute 'rdd'