In [1]:
import pyspark

pyspark.__version__

'4.0.0'

## Simple Project Setup

In [None]:
import os
import shutil


os.makedirs("data/raw", exist_ok=True)
os.makedirs("data/lake", exist_ok=True)
os.makedirs("outputs/tables/", exist_ok=True)

shutil.move("flights.csv", "data/raw/flights.csv")
shutil.move("airlines.csv", "data/raw/airlines.csv")
shutil.move("airports.csv", "data/raw/airports.csv")

In [2]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
        .appName("Flights-Delay")
        .master("local[*]")
        .getOrCreate())

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/20 15:13:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Data


In [3]:
from pyspark.sql import types as T

flight_schema = T.StructType([
    T.StructField("YEAR", T.IntegerType(), True),
    T.StructField("MONTH", T.IntegerType(), True),
    T.StructField("DAY", T.IntegerType(), True),
    T.StructField("DAY_OF_WEEK", T.IntegerType(), True),
    T.StructField("AIRLINE", T.StringType(),  True),
    T.StructField("FLIGHT_NUMBER", T.StringType(),  True),
    T.StructField("TAIL_NUMBER", T.StringType(),  True),
    T.StructField("ORIGIN_AIRPORT", T.StringType(),  True),
    T.StructField("DESTINATION_AIRPORT", T.StringType(),  True),
    T.StructField("SCHEDULED_DEPARTURE", T.IntegerType(), True),
    T.StructField("DEPARTURE_TIME", T.IntegerType(), True),
    T.StructField("DEPARTURE_DELAY", T.DoubleType(),  True),
    T.StructField("TAXI_OUT", T.DoubleType(),  True),
    T.StructField("WHEELS_OFF", T.IntegerType(), True),
    T.StructField("SCHEDULED_TIME", T.DoubleType(),  True),
    T.StructField("ELAPSED_TIME", T.DoubleType(),  True),
    T.StructField("AIR_TIME", T.DoubleType(),  True),
    T.StructField("DISTANCE", T.DoubleType(),  True),
    T.StructField("WHEELS_ON", T.IntegerType(), True),
    T.StructField("TAXI_IN", T.DoubleType(),  True),
    T.StructField("SCHEDULED_ARRIVAL", T.IntegerType(), True),
    T.StructField("ARRIVAL_TIME", T.IntegerType(), True),
    T.StructField("ARRIVAL_DELAY", T.DoubleType(),  True),
    T.StructField("DIVERTED", T.IntegerType(), True),
    T.StructField("CANCELLED", T.IntegerType(), True),
    T.StructField("CANCELLATION_REASON", T.StringType(),  True),
    T.StructField("AIR_SYSTEM_DELAY", T.DoubleType(),  True),
    T.StructField("SECURITY_DELAY", T.DoubleType(),  True),
    T.StructField("AIRLINE_DELAY", T.DoubleType(),  True),
    T.StructField("LATE_AIRCRAFT_DELAY", T.DoubleType(),  True),
    T.StructField("WEATHER_DELAY", T.DoubleType(),  True),
])

fl_raw = (spark.read.option("header", True).schema(flight_schema)
          .csv("data/raw/flights.csv"))

fl_raw.show(10)


25/09/20 15:13:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [4]:
airline_schema = T.StructType([
    T.StructField("IATA_CODE", T.StringType(), True),
    T.StructField("AIRLINE",   T.StringType(), True),
])

airlines = (spark.read
            .option("header", True)
            .schema(airline_schema)
            .csv("data/raw/airlines.csv"))

airlines.show()

+---------+--------------------+
|IATA_CODE|             AIRLINE|
+---------+--------------------+
|       UA|United Air Lines ...|
|       AA|American Airlines...|
|       US|     US Airways Inc.|
|       F9|Frontier Airlines...|
|       B6|     JetBlue Airways|
|       OO|Skywest Airlines ...|
|       AS|Alaska Airlines Inc.|
|       NK|    Spirit Air Lines|
|       WN|Southwest Airline...|
|       DL|Delta Air Lines Inc.|
|       EV|Atlantic Southeas...|
|       HA|Hawaiian Airlines...|
|       MQ|American Eagle Ai...|
|       VX|      Virgin America|
+---------+--------------------+



In [5]:
airport_schema = T.StructType([
    T.StructField("IATA_CODE", T.StringType(), True),
    T.StructField("AIRPORT",   T.StringType(), True),
    T.StructField("CITY",      T.StringType(), True),
    T.StructField("STATE",     T.StringType(), True),
    T.StructField("COUNTRY",   T.StringType(), True),
    T.StructField("LATITUDE",  T.DoubleType(), True),
    T.StructField("LONGITUDE", T.DoubleType(), True),
])

airports = (spark.read
            .option("header", True)
            .schema(airport_schema)
            .csv("data/raw/airports.csv"))

airports.show()

+---------+--------------------+-------------+-----+-------+--------+----------+
|IATA_CODE|             AIRPORT|         CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+---------+--------------------+-------------+-----+-------+--------+----------+
|      ABE|Lehigh Valley Int...|    Allentown|   PA|    USA|40.65236|  -75.4404|
|      ABI|Abilene Regional ...|      Abilene|   TX|    USA|32.41132|  -99.6819|
|      ABQ|Albuquerque Inter...|  Albuquerque|   NM|    USA|35.04022|-106.60919|
|      ABR|Aberdeen Regional...|     Aberdeen|   SD|    USA|45.44906| -98.42183|
|      ABY|Southwest Georgia...|       Albany|   GA|    USA|31.53552| -84.19447|
|      ACK|Nantucket Memoria...|    Nantucket|   MA|    USA|41.25305| -70.06018|
|      ACT|Waco Regional Air...|         Waco|   TX|    USA|31.61129| -97.23052|
|      ACV|      Arcata Airport|Arcata/Eureka|   CA|    USA|40.97812|-124.10862|
|      ACY|Atlantic City Int...|Atlantic City|   NJ|    USA|39.45758| -74.57717|
|      ADK|        Adak Airp

# Write Data

In [6]:
from pyspark.sql import functions as F

fl = fl_raw.withColumn(
    "FL_DATE",
    F.to_date(F.format_string("%04d-%02d-%02d", F.col("YEAR"), F.col("MONTH"), F.col("DAY")))
)


(fl.write.mode("overwrite")
   .partitionBy("YEAR","MONTH")
   .parquet("data/lake/flights_parquet"))

airlines.write.mode("overwrite").parquet("data/lake/airlines_parquet")
airports.write.mode("overwrite").parquet("data/lake/airports_parquet")


25/09/20 15:13:58 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
25/09/20 15:13:58 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 84,44% for 9 writers
25/09/20 15:13:58 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 76,00% for 10 writers
25/09/20 15:13:58 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 69,09% for 11 writers
25/09/20 15:13:58 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 63,33% for 12 writers
25/09/20 15:13:59 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 69,09% for 11 writers
25/09/20 15:13:59 WARN MemoryManager: Total allocation exceeds 95,

# Read Parquet

In [7]:
flights = spark.read.parquet("data/lake/flights_parquet")
airlines = spark.read.parquet("data/lake/airlines_parquet")
airports = spark.read.parquet("data/lake/airports_parquet")

# Transformations

In [8]:
@F.udf(returnType=T.IntegerType())
def hhmm_to_hour_safe(x):
    if x is None:
        return None
    try:
        v = int(x)
        if v == 2400:
            return 0
        if v < 0:
            return None
        hh, mm = v // 100, v % 100
        return hh if 0 <= hh <= 23 and 0 <= mm <= 59 else None
    except Exception:
        return None

In [9]:
_cc_map = {"A": "Carrier", "B": "Weather", "C": "NAS", "D": "Security"}
@F.udf(T.StringType())
def cancel_code_to_label(c):
    if c is None:
        return None
    return _cc_map.get(str(c).strip().upper(), "Other/Unknown")

In [10]:
flights_transformed = (flights
    .withColumn("DEP_HOUR", (F.col("SCHEDULED_DEPARTURE")/100).cast("int")) # native
    .withColumn("DEP_HOUR_SAFE", hhmm_to_hour_safe("SCHEDULED_DEPARTURE")) # UDF
    .withColumn("IS_DELAYED_15", (F.col("ARRIVAL_DELAY") >= 15).cast("int")) # native
    .withColumn("DOW", F.date_format("FL_DATE", "E")) # native
    .withColumn("CANCEL_REASON_LABEL", cancel_code_to_label("CANCELLATION_REASON")) # UDF
)

flights_transformed.select("FL_DATE","DOW","DAY_OF_WEEK", "DEP_HOUR","DEP_HOUR_SAFE","IS_DELAYED_15", "CANCEL_REASON_LABEL").show(10)


[Stage 9:>                                                          (0 + 1) / 1]

+----------+---+-----------+--------+-------------+-------------+-------------------+
|   FL_DATE|DOW|DAY_OF_WEEK|DEP_HOUR|DEP_HOUR_SAFE|IS_DELAYED_15|CANCEL_REASON_LABEL|
+----------+---+-----------+--------+-------------+-------------+-------------------+
|2015-07-13|Mon|          1|      20|           20|            1|               NULL|
|2015-07-16|Thu|          4|      13|           13|            0|               NULL|
|2015-07-30|Thu|          4|       9|            9|            0|               NULL|
|2015-07-07|Tue|          2|       8|            8|            0|               NULL|
|2015-07-17|Fri|          5|       6|            6|            0|               NULL|
|2015-07-15|Wed|          3|      14|           14|            0|               NULL|
|2015-07-19|Sun|          7|      11|           11|            1|               NULL|
|2015-07-02|Thu|          4|      21|           21|            0|               NULL|
|2015-07-10|Fri|          5|       8|            8|   

                                                                                

# Enrichment

In [11]:
# Airlines Join
alN = airlines.withColumnRenamed("AIRLINE", "CARRIER_NAME")
flights_airlines_enriched = flights_transformed.join(F.broadcast(alN), flights_transformed.AIRLINE == alN.IATA_CODE, "left")

flights_airlines_enriched.select("CARRIER_NAME", "AIRLINE").show(10)


+--------------------+-------+
|        CARRIER_NAME|AIRLINE|
+--------------------+-------+
|Southwest Airline...|     WN|
|American Airlines...|     AA|
|American Airlines...|     AA|
|Southwest Airline...|     WN|
|Skywest Airlines ...|     OO|
|Skywest Airlines ...|     OO|
|Delta Air Lines Inc.|     DL|
|American Airlines...|     AA|
|Delta Air Lines Inc.|     DL|
|Delta Air Lines Inc.|     DL|
+--------------------+-------+
only showing top 10 rows


In [12]:
# Airport Join
airports_origin = airports.select(F.col("IATA_CODE").alias("ORIGIN_CODE"),
                F.col("AIRPORT").alias("ORIGIN_AIRPORT_NAME"),
                F.col("STATE").alias("ORIGIN_STATE"))

airports_dest = airports.select(F.col("IATA_CODE").alias("DEST_CODE"),
                F.col("AIRPORT").alias("DEST_AIRPORT_NAME"),
                F.col("STATE").alias("DEST_STATE"))

flights_enriched = (flights_airlines_enriched
  .join(F.broadcast(airports_origin), flights_airlines_enriched.ORIGIN_AIRPORT == F.col("ORIGIN_CODE"), "left")
  .join(F.broadcast(airports_dest), flights_airlines_enriched.DESTINATION_AIRPORT == F.col("DEST_CODE"), "left"))

flights_enriched.select("CARRIER_NAME", "ORIGIN_AIRPORT_NAME", "DEST_AIRPORT_NAME").show(10)

+--------------------+--------------------+--------------------+
|        CARRIER_NAME| ORIGIN_AIRPORT_NAME|   DEST_AIRPORT_NAME|
+--------------------+--------------------+--------------------+
|Southwest Airline...|Baltimore-Washing...|McCarran Internat...|
|American Airlines...|Miami Internation...|Chicago O'Hare In...|
|American Airlines...|LaGuardia Airport...|Chicago O'Hare In...|
|Southwest Airline...|John Wayne Airpor...|Oakland Internati...|
|Skywest Airlines ...|Nashville Interna...|Cincinnati/Northe...|
|Skywest Airlines ...|George Bush Inter...|El Paso Internati...|
|Delta Air Lines Inc.|Los Angeles Inter...|John F. Kennedy I...|
|American Airlines...|Philadelphia Inte...|Gen. Edward Lawre...|
|Delta Air Lines Inc.|Salt Lake City In...|Minneapolis-Saint...|
|Delta Air Lines Inc.|Hartsfield-Jackso...|Salt Lake City In...|
+--------------------+--------------------+--------------------+
only showing top 10 rows


In [13]:
# Save enriched data
(flights_enriched.write.mode("overwrite")
   .partitionBy("YEAR","MONTH")
   .parquet("data/lake/enriched_parquet"))


25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 84,44% for 9 writers
25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 76,00% for 10 writers
25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 69,09% for 11 writers
25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 76,00% for 10 writers
25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 69,09% for 11 writers
25/09/20 15:14:18 WARN MemoryManager: Total allocation exceeds 95,

# Flights Analysis (Filtering & Aggregating)

In [14]:
df = spark.read.parquet("data/lake/enriched_parquet")
df = df.filter((F.col("CANCELLED")==0) & (F.col("DIVERTED")==0))
df.show(5)

+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+----------+--------+-------------+-------------+---+-------------------+---------+--------------------+-----------+--------------------+------------+---------+--------------------+----------+----+-----+
|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|   FL_DATE|DEP_HOUR|DEP_HOU

In [22]:
by_airline = (df.groupBy("AIRLINE","CARRIER_NAME")
  .agg(F.count("*").alias("flights"),
       F.avg("IS_DELAYED_15").alias("p_delay"),
       F.avg("ARRIVAL_DELAY").alias("avg_arr_delay"))
  .filter("flights >= 500")
  .orderBy(F.desc("p_delay")))

by_airline.show(10, truncate=False)

+-------+----------------------------+-------+-------------------+------------------+
|AIRLINE|CARRIER_NAME                |flights|p_delay            |avg_arr_delay     |
+-------+----------------------------+-------+-------------------+------------------+
|NK     |Spirit Air Lines            |15259  |0.2965457455321419 |15.202253934382503|
|F9     |Frontier Airlines Inc.      |11808  |0.26027163235670964|12.036644742461775|
|B6     |JetBlue Airways             |34718  |0.2258594873902704 |6.753324917060568 |
|MQ     |American Eagle Airlines Inc.|38303  |0.21712083677401597|6.220919350399119 |
|UA     |United Air Lines Inc.       |67043  |0.2071143969578984 |5.590725226112382 |
|EV     |Atlantic Southeast Airlines |74357  |0.1966379334238754 |6.765445272873417 |
|VX     |Virgin America              |8046   |0.19512807634354595|4.687343043696635 |
|WN     |Southwest Airlines Co.      |164040 |0.19037268234682123|4.414619948228242 |
|US     |US Airways Inc.             |25833  |0.187881

In [23]:
by_origin = (df.groupBy("ORIGIN_AIRPORT","ORIGIN_AIRPORT_NAME","ORIGIN_STATE")
  .agg(F.count("*").alias("n"),
       F.avg("IS_DELAYED_15").alias("p_delay"))
  .filter("n >= 300")
  .orderBy(F.desc("p_delay")))

by_origin.show(10, truncate=False)


+--------------+------------------------------------------+------------+-----+-------------------+
|ORIGIN_AIRPORT|ORIGIN_AIRPORT_NAME                       |ORIGIN_STATE|n    |p_delay            |
+--------------+------------------------------------------+------------+-----+-------------------+
|ASE           |Aspen-Pitkin County Airport               |CO          |435  |0.3112244897959184 |
|ACY           |Atlantic City International Airport       |NJ          |486  |0.24634655532359082|
|ORD           |Chicago O'Hare International Airport      |IL          |36968|0.2437456324248777 |
|LGA           |LaGuardia Airport (Marine Air Terminal)   |NY          |12899|0.2434435575826682 |
|HPN           |Westchester County Airport                |NY          |966  |0.23969631236442515|
|BWI           |Baltimore-Washington International Airport|MD          |11140|0.23127005960568547|
|CID           |The Eastern Iowa Airport                  |IA          |904  |0.23103448275862068|
|SHV      

In [24]:
by_route = (df.groupBy("ORIGIN_AIRPORT","DESTINATION_AIRPORT")
  .agg(F.count("*").alias("n"),
       F.avg("IS_DELAYED_15").alias("p_delay"),
       F.avg("ARRIVAL_DELAY").alias("avg_arr_delay"))
  .filter("n >= 200")
  .orderBy(F.desc("p_delay")))

by_route.show(10, truncate=False)

+--------------+-------------------+---+-------------------+------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|n  |p_delay            |avg_arr_delay     |
+--------------+-------------------+---+-------------------+------------------+
|DFW           |TYS                |224|0.365296803652968  |22.164383561643834|
|ORD           |EWR                |500|0.34375            |19.84375          |
|STL           |LGA                |207|0.34054054054054056|14.886486486486486|
|LGA           |RDU                |265|0.3333333333333333 |14.710843373493976|
|BWI           |BUF                |221|0.3287037037037037 |14.337962962962964|
|RDU           |LGA                |230|0.3285024154589372 |14.159420289855072|
|MDW           |LGA                |275|0.3228346456692913 |16.992125984251967|
|ORD           |MSP                |863|0.32037691401649   |18.021201413427562|
|ORD           |IAH                |556|0.3192660550458716 |14.023853211009174|
|IAH           |LAX                |597|

In [25]:
by_hour = (df.groupBy("DEP_HOUR")
  .agg(F.count("*").alias("n"),
       F.avg("IS_DELAYED_15").alias("p_delay"))
  .orderBy("DEP_HOUR"))

by_hour.show(24, truncate=False)


+--------+-----+-------------------+
|DEP_HOUR|n    |p_delay            |
+--------+-----+-------------------+
|0       |1901 |0.15127388535031847|
|1       |659  |0.17619783616692428|
|2       |180  |0.16853932584269662|
|3       |99   |0.23958333333333334|
|4       |71   |0.18571428571428572|
|5       |15184|0.06982208794897617|
|6       |52906|0.0890655449862041 |
|7       |51059|0.11199362993928536|
|8       |49607|0.12959507944643772|
|9       |45540|0.1402918569678066 |
|10      |48586|0.1541017870205873 |
|11      |46627|0.16558512259918992|
|12      |46389|0.17477769503701432|
|13      |47316|0.19083100708610692|
|14      |42971|0.210315579984837  |
|15      |47576|0.2113329343365253 |
|16      |43712|0.23320878094348435|
|17      |50583|0.24563173398975022|
|18      |43051|0.25665733137133906|
|19      |43050|0.2615832859579306 |
|20      |34006|0.2629058490452382 |
|21      |24440|0.24499351979597808|
|22      |15318|0.22377808895112355|
|23      |5649 |0.19080007159477358|
+

In [27]:
by_dow = (df.groupBy("DOW")
  .agg(F.count("*").alias("n"),
       F.avg("IS_DELAYED_15").alias("p_delay"))
  .orderBy("DOW"))

by_dow.show(7, truncate=False)


+---+------+-------------------+
|DOW|n     |p_delay            |
+---+------+-------------------+
|Fri|111871|0.1913202491309386 |
|Mon|112362|0.19869793335836133|
|Sat|91519 |0.16002086408381147|
|Sun|106635|0.18315682922869775|
|Thu|113698|0.1987688783909527 |
|Tue|109415|0.18094562956468208|
|Wed|110980|0.18232241086277162|
+---+------+-------------------+



In [30]:
season = (F.when(F.col("MONTH").isin(12,1,2), "winter")
            .when(F.col("MONTH").isin(3,4,5), "spring")
            .when(F.col("MONTH").isin(6,7,8), "summer")
            .otherwise("autumn"))
df = df.withColumn("SEASON", season)



by_season = (df.groupBy("SEASON")
  .agg(F.count("*").alias("n"),
       F.avg("IS_DELAYED_15").alias("p_delay"),
       F.avg("ARRIVAL_DELAY").alias("avg_arr_delay"))
  .orderBy("SEASON"))

by_season.show(truncate=False)


+------+------+-------------------+--------------------+
|SEASON|n     |p_delay            |avg_arr_delay       |
+------+------+-------------------+--------------------+
|autumn|184480|0.13400619675517353|-0.27690315247624303|
|spring|193238|0.18290699082385273|4.219379938825685   |
|summer|199569|0.21078596083941717|6.876841697976805   |
|winter|179193|0.2151129959780116 |6.6963744482730805  |
+------+------+-------------------+--------------------+



In [32]:
dist_bins = (F.when(F.col("DISTANCE") < 500, "short")
               .when(F.col("DISTANCE") < 1500, "medium")
               .otherwise("long"))
df = df.withColumn("DIST_BIN", dist_bins)


by_dist = (df.groupBy("DIST_BIN")
  .agg(F.count("*").alias("n"),
       F.avg("IS_DELAYED_15").alias("p_delay"),
       F.avg("ARRIVAL_DELAY").alias("avg_arr_delay"))
  .orderBy("DIST_BIN"))

by_dist.show(truncate=False)


+--------+------+-------------------+------------------+
|DIST_BIN|n     |p_delay            |avg_arr_delay     |
+--------+------+-------------------+------------------+
|long    |102489|0.1838206993200233 |1.7663011832976403|
|medium  |378297|0.18938189720789997|4.619977460939917 |
|short   |275694|0.18148559991104193|5.066937247488788 |
+--------+------+-------------------+------------------+



In [34]:
probe = spark.read.parquet("data/lake/enriched_parquet")
print("###### Shuffle partitions: ######", spark.conf.get("spark.sql.shuffle.partitions"))

probe_rep = probe.repartition(48, "MONTH")  # mehr Parallelismus für Shuffles
print("###### Partitions nach repartition: ######", probe_rep.rdd.getNumPartitions())

# Partition Pruning (Sommer)
(probe_rep.filter("MONTH in (6,7,8)")
          .groupBy("AIRLINE")
          .agg(F.avg("ARRIVAL_DELAY").alias("avg_arr"))
          .explain(True))


                                                                                

###### Shuffle partitions: ###### 200




###### Partitions nach repartition: ###### 48
== Parsed Logical Plan ==
'Aggregate ['AIRLINE], ['AIRLINE, 'avg('ARRIVAL_DELAY) AS avg_arr#2327]
+- Filter MONTH#2325 IN (6,7,8)
   +- RepartitionByExpression [MONTH#2325], 48
      +- Relation [DAY#2281,DAY_OF_WEEK#2282,AIRLINE#2283,FLIGHT_NUMBER#2284,TAIL_NUMBER#2285,ORIGIN_AIRPORT#2286,DESTINATION_AIRPORT#2287,SCHEDULED_DEPARTURE#2288,DEPARTURE_TIME#2289,DEPARTURE_DELAY#2290,TAXI_OUT#2291,WHEELS_OFF#2292,SCHEDULED_TIME#2293,ELAPSED_TIME#2294,AIR_TIME#2295,DISTANCE#2296,WHEELS_ON#2297,TAXI_IN#2298,SCHEDULED_ARRIVAL#2299,ARRIVAL_TIME#2300,ARRIVAL_DELAY#2301,DIVERTED#2302,CANCELLED#2303,CANCELLATION_REASON#2304,AIR_SYSTEM_DELAY#2305,... 20 more fields] parquet

== Analyzed Logical Plan ==
AIRLINE: string, avg_arr: double
Aggregate [AIRLINE#2283], [AIRLINE#2283, avg(ARRIVAL_DELAY#2301) AS avg_arr#2327]
+- Filter MONTH#2325 IN (6,7,8)
   +- RepartitionByExpression [MONTH#2325], 48
      +- Relation [DAY#2281,DAY_OF_WEEK#2282,AIRLINE#2283,FLI



In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

ml = (df
      .withColumn("label", F.col("IS_DELAYED_15").cast("double"))
      .select("label","DEP_HOUR","MONTH","DISTANCE",
              "AIRLINE","ORIGIN_AIRPORT","DESTINATION_AIRPORT")
      .na.drop())

cat = ["AIRLINE","ORIGIN_AIRPORT","DESTINATION_AIRPORT"]
idx = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in cat]
ohe = OneHotEncoder(inputCols=[f"{c}_idx" for c in cat],
                    outputCols=[f"{c}_ohe" for c in cat])
vec = VectorAssembler(
    inputCols=["DEP_HOUR","MONTH","DISTANCE"] + [f"{c}_ohe" for c in cat],
    outputCol="features"
)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)

pipe = Pipeline(stages=idx + [ohe, vec, lr])
train, test = ml.randomSplit([0.8, 0.2], seed=42)
model = pipe.fit(train)
pred  = model.transform(test)

auc = BinaryClassificationEvaluator(labelCol="label").evaluate(pred)
acc = pred.select((F.col("prediction")==F.col("label")).cast("int").alias("ok")).agg(F.avg("ok")).first()[0]
print(f"AUC={auc:.3f} | Accuracy={acc:.3f}")


25/09/20 15:59:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

AUC=0.637 | Accuracy=0.814


                                                                                