In [1]:
from pyspark.sql import SparkSession

In [3]:
spark = (
    SparkSession.builder
    .appName("Spark flight Analysis")
    .getOrCreate()
)

In [4]:
data = "./data/flights.csv"
df = (
    spark.read.format("csv")
      .option("inferSchema", "true")
      .option('header', 'true')
      .load(data)
)

                                                                                

In [5]:
df.printSchema()

root
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- WEEKDAY: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORG_AIR: string (nullable = true)
 |-- DEST_AIR: string (nullable = true)
 |-- SCHED_DEP: integer (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DIST: integer (nullable = true)
 |-- SCHED_ARR: integer (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- DIVERTED: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)



In [6]:
df.createOrReplaceTempView("flight_data")

In [7]:
df.show()

+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+
|MONTH|DAY|WEEKDAY|AIRLINE|ORG_AIR|DEST_AIR|SCHED_DEP|DEP_DELAY|AIR_TIME|DIST|SCHED_ARR|ARR_DELAY|DIVERTED|CANCELLED|
+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+
|    1|  1|      4|     WN|    LAX|     SLC|     1625|     58.0|    94.0| 590|     1905|     65.0|       0|        0|
|    1|  1|      4|     UA|    DEN|     IAD|      823|      7.0|   154.0|1452|     1333|    -13.0|       0|        0|
|    1|  1|      4|     MQ|    DFW|     VPS|     1305|     36.0|    85.0| 641|     1453|     35.0|       0|        0|
|    1|  1|      4|     AA|    DFW|     DCA|     1555|      7.0|   126.0|1192|     1935|     -7.0|       0|        0|
|    1|  1|      4|     WN|    LAX|     MCI|     1720|     48.0|   166.0|1363|     2225|     39.0|       0|        0|
|    1|  1|      4|     UA|    IAH|     SAN|     1450|  

In [8]:
# Count the number of flights for each AIRLINE
airline_count_query = '''
        SELECT AIRLINE, COUNT(*) AS flight_count from flight_data GROUP BY AIRLINE
'''
airline_count_res = spark.sql(airline_count_query)
airline_count_res.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+------------+
|AIRLINE|flight_count|
+-------+------------+
|     UA|        7792|
|     NK|        1516|
|     AA|        8900|
|     EV|        5858|
|     B6|         543|
|     DL|       10601|
|     OO|        6588|
|     F9|        1317|
|     US|        1615|
|     MQ|        3471|
|     HA|         112|
|     AS|         768|
|     VX|         993|
|     WN|        8418|
+-------+------------+



                                                                                

In [9]:
# Find the average departure delay for each weekday
avg_dep_delay_query = '''
    SELECT WEEKDAY, AVG(DEP_DELAY) as avg_dep_delay from flight_data GROUP BY WEEKDAY
'''

avg_dep_delay = spark.sql(avg_dep_delay_query)
avg_dep_delay.show()

+-------+------------------+
|WEEKDAY|     avg_dep_delay|
+-------+------------------+
|      1|12.546447445684088|
|      6| 9.333668775158136|
|      3|10.582200534448704|
|      5|10.789574366331898|
|      4|11.505021018215787|
|      7| 11.14237123420797|
|      2|10.256382852392827|
+-------+------------------+



In [11]:
# Filter flights with a distance greater than 1000 miles and positive arrival delay
distance_arr_delay_query = '''
    SELECT * FROM flight_data WHERE DIST > 1000 AND ARR_DELAY > 0
'''
distance_arr_delay_result = spark.sql(distance_arr_delay_query)
distance_arr_delay_result.show(10)

+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+
|MONTH|DAY|WEEKDAY|AIRLINE|ORG_AIR|DEST_AIR|SCHED_DEP|DEP_DELAY|AIR_TIME|DIST|SCHED_ARR|ARR_DELAY|DIVERTED|CANCELLED|
+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+
|    1|  1|      4|     WN|    LAX|     MCI|     1720|     48.0|   166.0|1363|     2225|     39.0|       0|        0|
|    1|  1|      4|     NK|    DEN|     DTW|     1952|     37.0|   124.0|1123|       31|     54.0|       0|        0|
|    1|  1|      4|     AA|    LAX|     AUS|     1430|     33.0|   157.0|1242|     1925|     41.0|       0|        0|
|    1|  1|      4|     AA|    PHX|     MIA|     1559|    115.0|   230.0|1972|     2203|    117.0|       0|        0|
|    1|  1|      4|     AA|    LAS|     DFW|     1210|     12.0|   132.0|1055|     1650|     17.0|       0|        0|
|    1|  1|      4|     AA|    DFW|     DCA|     2010|  

In [13]:
# Now we will create new columns TOTAL_DELAY
from pyspark.sql import functions as F
df = df.withColumn("TOTAL_DELAY", F.col("DEP_DELAY") + F.col("ARR_DELAY"))

In [14]:
df.printSchema()

root
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- WEEKDAY: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- ORG_AIR: string (nullable = true)
 |-- DEST_AIR: string (nullable = true)
 |-- SCHED_DEP: integer (nullable = true)
 |-- DEP_DELAY: double (nullable = true)
 |-- AIR_TIME: double (nullable = true)
 |-- DIST: integer (nullable = true)
 |-- SCHED_ARR: integer (nullable = true)
 |-- ARR_DELAY: double (nullable = true)
 |-- DIVERTED: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- TOTAL_DELAY: double (nullable = true)



In [15]:
df.show(10)

+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+-----------+
|MONTH|DAY|WEEKDAY|AIRLINE|ORG_AIR|DEST_AIR|SCHED_DEP|DEP_DELAY|AIR_TIME|DIST|SCHED_ARR|ARR_DELAY|DIVERTED|CANCELLED|TOTAL_DELAY|
+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+-----------+
|    1|  1|      4|     WN|    LAX|     SLC|     1625|     58.0|    94.0| 590|     1905|     65.0|       0|        0|      123.0|
|    1|  1|      4|     UA|    DEN|     IAD|      823|      7.0|   154.0|1452|     1333|    -13.0|       0|        0|       -6.0|
|    1|  1|      4|     MQ|    DFW|     VPS|     1305|     36.0|    85.0| 641|     1453|     35.0|       0|        0|       71.0|
|    1|  1|      4|     AA|    DFW|     DCA|     1555|      7.0|   126.0|1192|     1935|     -7.0|       0|        0|        0.0|
|    1|  1|      4|     WN|    LAX|     MCI|     1720|     48.0|   166.0|1363|     2225|  

In [18]:
(
    df.select("AIRLINE", "AIR_TIME", "DIST")
    .where(F.col('TOTAL_DELAY') > 0)
    .orderBy(F.desc("DIST"))
 ).show(10)

+-------+--------+----+
|AIRLINE|AIR_TIME|DIST|
+-------+--------+----+
|     DL|   554.0|4502|
|     DL|   504.0|4502|
|     DL|   524.0|4502|
|     UA|   501.0|4243|
|     UA|   506.0|4243|
|     UA|   497.0|4243|
|     UA|   501.0|4243|
|     UA|   500.0|4243|
|     UA|   499.0|4243|
|     UA|   501.0|4243|
+-------+--------+----+
only showing top 10 rows



In [14]:
from pyspark.sql.functions import max, min

df.select(max(col("DIST"))).show()

+---------+
|max(DIST)|
+---------+
|     4502|
+---------+



In [15]:
max_dist = df.select(max(col("DIST"))).first()[0]
max_dist

4502

In [16]:
(
    df.select("AIRLINE", "AIR_TIME")
    .where(col("DIST") == max_dist)
).show()

+-------+--------+
|AIRLINE|AIR_TIME|
+-------+--------+
|     DL|   566.0|
|     DL|   551.0|
|     DL|   554.0|
|     DL|   560.0|
|     DL|   539.0|
|     DL|   504.0|
|     DL|   524.0|
|     DL|   533.0|
|     DL|   560.0|
|     DL|   577.0|
+-------+--------+



Windowing 

In [17]:
import pyspark.sql as sql
import pyspark.sql.functions as F

In [18]:

airline_window = sql.Window.partitionBy('AIRLINE').orderBy(F.col("DIST").desc())

df = df.withColumn("MAX_DIST", max("DIST").over(airline_window))

df.show()

+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+-----------+--------+
|MONTH|DAY|WEEKDAY|AIRLINE|ORG_AIR|DEST_AIR|SCHED_DEP|DEP_DELAY|AIR_TIME|DIST|SCHED_ARR|ARR_DELAY|DIVERTED|CANCELLED|TOTAL_DELAY|MAX_DIST|
+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+-----------+--------+
|    2|  4|      3|     AA|    DFW|     HNL|     1315|     15.0|   484.0|3784|     1758|      6.0|       0|        0|       21.0|    3784|
|    2|  9|      1|     AA|    DFW|     HNL|     1315|     -5.0|   489.0|3784|     1758|     -4.0|       0|        0|       -9.0|    3784|
|    2| 24|      2|     AA|    DFW|     HNL|     1315|    169.0|   484.0|3784|     1758|    153.0|       0|        0|      322.0|    3784|
|    3|  7|      6|     AA|    DFW|     HNL|     1550|     56.0|   507.0|3784|     2032|     61.0|       0|        0|      117.0|    3784|
|    3| 18|      3|     AA|