In [0]:
import time
from pyspark.sql.functions import col, avg, max, count, when, sum, round

# Read the flight departure-delay CSV. The first row supplies the column
# names and Spark infers each column's type from the data.
flights_csv_path = "/databricks-datasets/flights/departuredelays.csv"
df_base = spark.read.csv(flights_csv_path, header=True, inferSchema=True)

# Start from the base frame and append it to itself nine more times, giving
# ten stacked copies of every row (presumably to get a larger dataset to
# exercise the later cells -- the code itself does not say why).
df_large = df_base
for i in range(9):
    df_large = df_large.union(df_base)

In [0]:
# 1. Data Processing Pipeline
# Every step in this cell is a transformation: it only extends the query plan.
# Nothing is computed until an action (show, write, count, ...) is called on
# one of the resulting DataFrames.

# Filter 1: keep only flights delayed by more than 60 minutes
df_filtered1 = df_large.filter(col("delay") > 60)

# Filter 2: keep only flights originating from New York airports
df_filtered2 = df_filtered1.filter(col("origin").isin(["JFK", "LGA", "EWR"]))

# Column transformation using withColumn:
# bucket each remaining flight by delay -> "Very High" (> 180), "High" (> 120),
# otherwise "Moderate" (everything here is already > 60 because of Filter 1).
df_transformed = df_filtered2.withColumn(
    "delay_category",
    when(col("delay") > 180, "Very High")
    .when(col("delay") > 120, "High")
    .otherwise("Moderate")
)

# groupBy with (complex) aggregation
## Goal: For each origin-destination pair:
### 1. Average delay
### 2. Maximum delay
### 3. Count of flights
### 4. Count of "Very High" delay flights
### 5. Percentage of flights with delay > 120 minutes
## Only pairs with more than 50 flights are kept.
#
# NOTE: avg, max, count, sum and round below are the pyspark.sql.functions
# versions imported at the top of the notebook, not the Python builtins.
df_agg = df_transformed.groupBy("origin", "destination") \
    .agg(
        avg("delay").alias("avg_delay"),
        max("delay").alias("max_delay"),
        count("*").alias("num_flights"),
        # when(...) without otherwise() yields NULL for non-matching rows,
        # and count() skips NULLs, so this counts only "Very High" rows.
        count(when(col("delay_category") == "Very High", 1)).alias("very_high_delay_count"),
        # share of the group's flights with delay > 120, as a percentage
        # rounded to 2 decimals
        round(
            100 * sum(when(col("delay") > 120, 1).otherwise(0)) / count("*"), 2
        ).alias("pct_delay_over_120")
    ) \
    .filter(col("num_flights") > 50)

In [0]:
# SQL queries
# Expose the aggregated DataFrame to Spark SQL under the name "flights_summary".
df_agg.createOrReplaceTempView("flights_summary")

# Query 1: routes where more than 30% of the flights have pct_delay_over_120
# above the threshold, highest percentage first.
high_pct_routes_query = """
    SELECT origin, destination, avg_delay, pct_delay_over_120
    FROM flights_summary
    WHERE pct_delay_over_120 > 30
    ORDER BY pct_delay_over_120 DESC
"""
sql_result1 = spark.sql(high_pct_routes_query)
sql_result1.show(5)  # first 5 rows only

# Query 2: per origin, the number of routes whose average delay exceeds 120,
# origins with the most such routes first.
high_avg_routes_query = """
    SELECT origin, COUNT(*) AS num_high_delay_routes
    FROM flights_summary
    WHERE avg_delay > 120
    GROUP BY origin
    ORDER BY num_high_delay_routes DESC
"""
sql_result2 = spark.sql(high_avg_routes_query)
sql_result2.show()

+------+-----------+------------------+------------------+
|origin|destination|         avg_delay|pct_delay_over_120|
+------+-----------+------------------+------------------+
|   JFK|        EGE|261.61538461538464|             84.62|
|   JFK|        SAT|175.66666666666666|             83.33|
|   JFK|        HOU|228.85714285714286|             71.43|
|   EWR|        STT|             152.5|             66.67|
|   JFK|        PIT|143.11111111111111|             66.67|
+------+-----------+------------------+------------------+
only showing top 5 rows
+------+---------------------+
|origin|num_high_delay_routes|
+------+---------------------+
|   JFK|                   44|
|   EWR|                   34|
|   LGA|                   28|
+------+---------------------+



In [0]:
# Persist the aggregated summary as a table; "overwrite" mode replaces the
# contents of any existing table with the same name.
summary_writer = df_agg.write.mode("overwrite")
summary_writer.saveAsTable("flights_summary_table")

# Read the saved table back through SQL and display its first 10 rows.
saved_summary = spark.sql("SELECT * FROM flights_summary_table")
saved_summary.show(10)

+------+-----------+------------------+---------+-----------+---------------------+------------------+
|origin|destination|         avg_delay|max_delay|num_flights|very_high_delay_count|pct_delay_over_120|
+------+-----------+------------------+---------+-----------+---------------------+------------------+
|   EWR|        ORF|  94.3529411764706|      170|        170|                    0|             17.65|
|   EWR|        FLL|120.73873873873873|      307|       1110|                  160|             41.44|
|   EWR|        DEN|131.43548387096774|      292|        620|                  140|             45.16|
|   EWR|        DSM|102.28571428571429|      127|         70|                    0|             28.57|
|   EWR|        SAV|121.36363636363636|      246|        220|                   50|             40.91|
|   EWR|        GSP|107.95454545454545|      229|        220|                   30|             27.27|
|   EWR|        SAN|145.55555555555554|      348|        180|            

In [0]:
# 2. Performance Analysis
# Print the extended explanation of df_agg: the logical plans as well as the
# physical plan, not just the physical one.
df_agg.explain(extended=True)

== Parsed Logical Plan ==
'Filter '`>`('num_flights, 50)
+- 'Aggregate ['origin, 'destination], ['origin, 'destination, 'avg('delay) AS avg_delay#11225, 'max('delay) AS max_delay#11226, 'count(*) AS num_flights#11227, 'count('when('`==`('delay_category, Very High), 1)) AS very_high_delay_count#11228, 'round('`/`('`*`(100, 'sum('when('`>`('delay, 120), 1, 0))), 'count(*)), 2) AS pct_delay_over_120#11229]
   +- Project [date#11125, delay#11126, distance#11127, origin#11128, destination#11129, CASE WHEN (delay#11126 > 180) THEN Very High WHEN (delay#11126 > 120) THEN High ELSE Moderate END AS delay_category#11223]
      +- Filter origin#11128 IN (JFK,LGA,EWR)
         +- Filter (delay#11126 > 60)
            +- Union false, false
               :- Union false, false
               :  :- Union false, false
               :  :  :- Union false, false
               :  :  :  :- Union false, false
               :  :  :  :  :- Union false, false
               :  :  :  :  :  :- Union false, fa

In [0]:
# 3. Actions vs. Transformations
# Times one transformation and two actions on the same DataFrame.
# time.perf_counter() is used because it is a monotonic, high-resolution clock
# intended for measuring elapsed intervals.

# Transformation: Select specific columns (lazy -- only the plan is built here)
start_transformation_time = time.perf_counter()
transformation = df_large.select("date", "delay", "distance", "origin", "destination")
end_transformation_time = time.perf_counter()
transformation_time = end_transformation_time - start_transformation_time

# Action: Count the number of records (eager)
start_action_count_time = time.perf_counter()
record_count = transformation.count()
end_action_count_time = time.perf_counter()
action_count_time = end_action_count_time - start_action_count_time

# Action: Show the first few rows (eager)
# show() prints the rows and returns None, so its result is not captured.
start_action_show_time = time.perf_counter()
transformation.show()
end_action_show_time = time.perf_counter()
action_show_time = end_action_show_time - start_action_show_time

# Display results
print(f"Transformation Time: {transformation_time:.4f} seconds")
print(f"Record Count: {record_count}")
print(f"Action (Count) Time: {action_count_time:.4f} seconds")
print(f"Action (Show) Time: {action_show_time:.4f} seconds")

+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
|1030605|    0|     602|   ABE|        ATL|
|1041243|   10|     602|   ABE|        ATL|
|1040605|   28|     602|   ABE|        ATL|
|1051245|   88|     602|   ABE|        ATL|
|1050605|    9|     602|   ABE|        ATL|
|1061215|   -6|     602|   ABE|        ATL|
|1061725|   69|     602|   ABE|        ATL|
|1061230|    0|     369|   ABE|        DTW|
|1060625|   -3|     602|   ABE|        ATL|
|1070600|    0|     369|   ABE|        DTW|
|1071725|    0|     602|   ABE|        ATL|
|1071230|    0|     369|   ABE|        DTW|
|1070625|    0|     602|   ABE|        ATL|
|1071219|    0|     569|   ABE|        ORD|
|1080600|    0|     369|   ABE| 

In [0]:
# 4. Machine Learning
# Train a small random forest that predicts whether a flight is delayed from
# its distance, origin and destination.
from pyspark.sql.functions import when
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier

# One seed shared by the train/test split and the forest so re-runs are reproducible.
SEED = 42

# Create a new column 'Delayed' (1 if delay > 0, 0 otherwise) -- this is the label
df_large = df_large.withColumn("Delayed", when(col("delay") > 0, 1).otherwise(0))

# Index categorical features and one-hot encode the indexed features
origin_indexer = StringIndexer(inputCol="origin", outputCol="origin_index")
destination_indexer = StringIndexer(inputCol="destination", outputCol="destination_index")
origin_encoder = OneHotEncoder(inputCols=["origin_index"], outputCols=["origin_vec"])
destination_encoder = OneHotEncoder(inputCols=["destination_index"], outputCols=["destination_vec"])

# Assemble every prepared feature into a single vector column. "destination_vec"
# is included so the destination indexer/encoder stages in the pipeline actually
# contribute to the model instead of being computed and then ignored.
features = ["distance", "origin_vec", "destination_vec"]
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")

random_forest = RandomForestClassifier(
    numTrees=5,
    maxDepth=5,
    featuresCol="features",
    labelCol="Delayed",
    seed=SEED,
)

# Stage order matters: each indexer must run before its encoder, and both
# encoders before the assembler that consumes their output columns.
pipeline = Pipeline(stages=[
    origin_indexer,
    destination_indexer,
    origin_encoder,
    destination_encoder,
    vector_assembler,
    random_forest,
])

# NOTE(review): df_large is built by unioning df_base with itself repeatedly in
# the first cell, so identical rows can land in both train_data and test_data.
# Any accuracy measured on test_data will therefore be optimistic; split
# de-duplicated data if a real evaluation is needed.
train_data, test_data = df_large.randomSplit([0.8, 0.2], seed=SEED)
model = pipeline.fit(train_data)

predictions = model.transform(test_data)

# Show some predictions
predictions.select("delay", "Delayed", "prediction").show(10)


+-----+-------+----------+
|delay|Delayed|prediction|
+-----+-------+----------+
|   -8|      0|       0.0|
|   -4|      0|       0.0|
|   -7|      0|       0.0|
|    0|      0|       0.0|
|    5|      1|       0.0|
|    0|      0|       0.0|
|   -6|      0|       0.0|
|   -3|      0|       0.0|
|    0|      0|       0.0|
|   -9|      0|       0.0|
+-----+-------+----------+
only showing top 10 rows
