In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, when

# Get the active Spark session, or create one with the application name "flights".
spark = (
    SparkSession.builder
    .appName("flights")
    .getOrCreate()
)

In [2]:
# Relative path to the US departure-delays CSV file that is loaded below.
csvFile = "data/departuredelays.csv"

In [3]:
# Read the CSV, treating the first line as the header and letting Spark
# infer each column's type from the data.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(csvFile)
)

In [4]:
# Register the DataFrame as a temporary view so it can be queried by name via spark.sql().
df.createOrReplaceTempView("us_delay_flights_tbl")

The US flight delays data set has five columns:

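• The date column contains a string like 02190925. When converted, this maps to
02-19 09:25 am. Note that because the CSV is read with inferSchema, Spark loads
this column as an integer, so a leading zero is dropped (01011245 is displayed as
1011245 in the output below).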

• The delay column gives the delay in minutes between the scheduled and actual
departure times. Early departures show negative numbers.

• The distance column gives the distance in miles from the origin airport to the
destination airport.

• The origin column contains the origin IATA airport code.

• The destination column contains the destination IATA airport code.

In [5]:
# Preview the first 10 rows of the flight-delay data.
df.show(10)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+
only showing top 10 rows


In [6]:
# Flights whose distance is greater than 1,000 miles, longest first.

# SQL version, run against the temporary view.
spark.sql("""
    SELECT distance, origin, destination
    FROM us_delay_flights_tbl
    WHERE distance > 1000
    ORDER BY distance DESC
""").show(10)

# Equivalent DataFrame API version (col and desc come from the import cell at the top).
(
    df.select("distance", "origin", "destination")
    .where(col("distance") > 1000)
    .orderBy(desc("distance"))
    .show(10)
)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows


In [16]:
# All flights from San Francisco (SFO) to Chicago (ORD) delayed by more than 2 hours.

# SQL version, run against the temporary view.
spark.sql("""
    SELECT *
    FROM us_delay_flights_tbl
    WHERE origin = 'SFO' AND destination = 'ORD' AND delay > 120
""").show(10)

# DataFrame API version: same filter, but the columns are selected in a
# different order than SELECT * returns them.
(
    df.select("distance", "origin", "destination", "delay", "date")
    .where(col("origin") == "SFO")
    .where(col("destination") == "ORD")
    .where(col("delay") > 120)
    .show(n=10)
)

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011410|  124|    1604|   SFO|        ORD|
|1022330|  326|    1604|   SFO|        ORD|
|1021410|  190|    1604|   SFO|        ORD|
|1101410|  184|    1604|   SFO|        ORD|
|1190925|  297|    1604|   SFO|        ORD|
|1241110|  139|    1604|   SFO|        ORD|
|1301800|  167|    1604|   SFO|        ORD|
|1011237|  122|    1604|   SFO|        ORD|
|1032258|  163|    1604|   SFO|        ORD|
|1031920|  193|    1604|   SFO|        ORD|
+-------+-----+--------+------+-----------+
only showing top 10 rows
+--------+------+-----------+-----+-------+
|distance|origin|destination|delay|   date|
+--------+------+-----------+-----+-------+
|    1604|   SFO|        ORD|  124|1011410|
|    1604|   SFO|        ORD|  326|1022330|
|    1604|   SFO|        ORD|  190|1021410|
|    1604|   SFO|        ORD|  184|1101410|
|    1604|   SFO|        ORD|  297|1190925|
|    16

Next, we label all US flights, regardless of origin and destination,
with an indication of the delays they experienced: Very Long Delays (> 6 hours),
Long Delays (2–6 hours), etc. We’ll add these human-readable labels in a new column
called Flight_Delays.

In [24]:
# Label every flight with a human-readable delay category in a new
# Flight_Delays column, then list the rows by origin with the longest
# delays first.
#
# Bucket upper bounds are inclusive (<=) so that delays of exactly 60, 120
# or 360 minutes land in a delay bucket instead of falling through to
# 'Early', and so that the SQL and DataFrame versions agree.

# SQL version, run against the temporary view.
spark.sql("""
    SELECT *,
        CASE
            WHEN delay > 360 THEN 'Very Long Delays'
            WHEN delay > 120 AND delay <= 360 THEN 'Long Delays'
            WHEN delay > 60 AND delay <= 120 THEN 'Short Delays'
            WHEN delay > 0 AND delay <= 60 THEN 'Tolerable Delays'
            WHEN delay = 0 THEN 'No Delays'
            ELSE 'Early'
        END AS Flight_Delays
    FROM us_delay_flights_tbl
    ORDER BY origin, delay DESC
""").show(10)

# Equivalent DataFrame API version (col and when come from the import cell at the top).
(
    df.withColumn(
        "Flight_Delays",
        when(col("delay") > 360, "Very Long Delays")
        .when((col("delay") > 120) & (col("delay") <= 360), "Long Delays")
        .when((col("delay") > 60) & (col("delay") <= 120), "Short Delays")
        .when((col("delay") > 0) & (col("delay") <= 60), "Tolerable Delays")
        .when(col("delay") == 0, "No Delays")
        .otherwise("Early"),
    )
    .orderBy(col("origin"), col("delay").desc())
    .show(n=10)
)

+-------+-----+--------+------+-----------+-------------+
|   date|delay|distance|origin|destination|Flight_Delays|
+-------+-----+--------+------+-----------+-------------+
|1220625|  333|     602|   ABE|        ATL|  Long Delays|
|2120625|  305|     602|   ABE|        ATL|  Long Delays|
|3021725|  275|     602|   ABE|        ATL|  Long Delays|
|2150625|  257|     602|   ABE|        ATL|  Long Delays|
|2211215|  247|     602|   ABE|        ATL|  Long Delays|
|2211245|  247|     369|   ABE|        DTW|  Long Delays|
|1220607|  219|     569|   ABE|        ORD|  Long Delays|
|3201725|  211|     602|   ABE|        ATL|  Long Delays|
|3121245|  197|     369|   ABE|        DTW|  Long Delays|
|2141628|  192|     569|   ABE|        ORD|  Long Delays|
+-------+-----+--------+------+-----------+-------------+
only showing top 10 rows
+-------+-----+--------+------+-----------+-------------+
|   date|delay|distance|origin|destination|Flight_Delays|
+-------+-----+--------+------+-----------+----

In [25]:
# For each table, Spark keeps metadata: the table name, schema, description, columns, partitions, and physical location.
# All of this is stored in a central metastore.
# By default, Spark uses the Apache Hive metastore.
