In [None]:
import os
import shutil

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator


In [None]:
# Create (or reuse) the Spark session that every later cell runs against.
spark = (
    SparkSession.builder
    .appName("AirlinesDelay-SQL-Notebook")
    .getOrCreate()
)

# Handle to the underlying SparkContext of that session.
sc = spark.sparkContext


In [None]:
# Path of the input CSV, relative to the notebook's working directory.
INPUT_CSV = "data/airlines_delay.csv"

# Read the CSV with a header row and let Spark infer the column types.
df_raw = spark.read.csv(INPUT_CSV, header=True, inferSchema=True)

# Quick look at the inferred schema and the first few rows.
df_raw.printSchema()
df_raw.show(5, truncate=False)

In [None]:
# Columns forced to integer type and columns whose surrounding whitespace is stripped.
INT_COLS = ["Time", "Length", "DayOfWeek", "Class"]
TEXT_COLS = ["Airline", "AirportFrom", "AirportTo"]

df = df_raw

# Cast the numeric columns to int, one at a time.
for col_name in INT_COLS:
    df = df.withColumn(col_name, F.col(col_name).cast("int"))

# Trim whitespace from the categorical string columns.
for col_name in TEXT_COLS:
    df = df.withColumn(col_name, F.trim(F.col(col_name)))

# Drop rows with a null in any column used downstream; cache because the
# frame is reused by the count/groupBy below and by later cells.
required_cols = ["Time","Length","DayOfWeek","Airline","AirportFrom","AirportTo","Class"]
df = df.dropna(subset=required_cols)
df = df.cache()

print("Rows:", df.count())

# Label distribution after cleaning.
class_counts = df.groupBy("Class").count().orderBy("Class")
class_counts.show()

In [None]:
# 80/20 random split with a fixed seed.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print("train:", train_df.count())
print("test :", test_df.count())

# Expose the splits and the full cleaned frame to Spark SQL as temp views.
for view_name, frame in (
    ("train_flights", train_df),
    ("test_flights", test_df),
    ("flights", df),
):
    frame.createOrReplaceTempView(view_name)

### Advanced SQL Query 1 (Window): Airline × Hour + Moving Average

In [None]:
# Per-airline, per-hour delay rate with a centered 3-hour moving average.
#
# Fix: the moving-average frame is a RANGE frame on `hour` rather than a ROWS
# frame. A ROWS frame takes the physically adjacent rows, so when an airline
# has no flights in some hour the "3h" average silently mixed in hours that
# are further apart. RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING only includes
# rows whose `hour` is within [hour - 1, hour + 1].
#
# Note: the flight_cnt >= 30 filter is applied after the window, so the moving
# average is still computed over all hourly rows, including low-volume hours.
sql_airline_hour_ma = spark.sql("""
WITH hourly AS (
  -- One row per (Airline, hour); Time is divided by 60 and floored to get the hour bucket.
  SELECT
    Airline,
    CAST(FLOOR(Time/60) AS INT) AS hour,
    COUNT(*) AS flight_cnt,
    AVG(CAST(Class AS DOUBLE)) AS delay_rate
  FROM flights
  GROUP BY Airline, CAST(FLOOR(Time/60) AS INT)
),
with_ma AS (
  SELECT
    Airline,
    hour,
    flight_cnt,
    delay_rate,
    -- Value-based frame: only hours h-1, h, h+1 that actually exist for this airline.
    AVG(delay_rate) OVER (
      PARTITION BY Airline
      ORDER BY hour
      RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING
    ) AS delay_rate_ma_3h
  FROM hourly
)
SELECT *
FROM with_ma
WHERE flight_cnt >= 30
ORDER BY Airline, hour
""")

sql_airline_hour_ma.show(50, truncate=False)


### Advanced SQL Query 2 (CTE + Rank + Percentile): Origin bazında top rotalar

In [None]:
# Top-3 most delayed routes per origin airport, restricted to routes with at
# least 50 flights, alongside the origin's 90th-percentile route delay rate.
#
# Fix: the rank is now computed only among routes that pass the
# flight_cnt >= 50 threshold. Previously DENSE_RANK ran over every route and
# the volume filter was applied afterwards together with rnk <= 3, so
# low-volume routes with extreme delay rates occupied the top ranks and were
# then discarded, leaving origins with fewer than three (or zero) rows even
# when eligible routes existed.
#
# p90_delay_rate_origin is still computed over ALL routes of the origin
# (before the volume filter), matching the original output column.
sql_top_routes_by_origin = spark.sql("""
WITH route_stats AS (
  -- One row per (origin, destination) route.
  SELECT
    AirportFrom,
    AirportTo,
    COUNT(*) AS flight_cnt,
    AVG(CAST(Class AS DOUBLE)) AS delay_rate,
    AVG(Length) AS avg_length,
    AVG(Time) AS avg_time
  FROM flights
  GROUP BY AirportFrom, AirportTo
),
with_origin_p90 AS (
  -- Origin-level p90 over every route, evaluated before any volume filtering.
  SELECT
    *,
    percentile_approx(delay_rate, 0.90) OVER (PARTITION BY AirportFrom) AS p90_delay_rate_origin
  FROM route_stats
),
ranked AS (
  -- WHERE is applied before the window, so the rank is among eligible routes only.
  SELECT
    *,
    DENSE_RANK() OVER (
      PARTITION BY AirportFrom
      ORDER BY delay_rate DESC
    ) AS rnk
  FROM with_origin_p90
  WHERE flight_cnt >= 50
)
SELECT
  AirportFrom, AirportTo, flight_cnt, delay_rate, avg_length, avg_time, p90_delay_rate_origin
FROM ranked
WHERE rnk <= 3
ORDER BY AirportFrom, delay_rate DESC
""")

sql_top_routes_by_origin.show(80, truncate=False)


### “SQL used in application”: Train’den feature üret → Train/Test’e join

In [None]:
# Aggregate features are computed from the training view only.

# Per-airline flight count and delay rate.
AIRLINE_FEATS_SQL = """
SELECT
  Airline,
  COUNT(*) AS airline_flight_cnt_train,
  AVG(CAST(Class AS DOUBLE)) AS airline_delay_rate_train
FROM train_flights
GROUP BY Airline
"""

# Per-route (origin, destination) flight count and delay rate.
ROUTE_FEATS_SQL = """
SELECT
  AirportFrom,
  AirportTo,
  COUNT(*) AS route_flight_cnt_train,
  AVG(CAST(Class AS DOUBLE)) AS route_delay_rate_train
FROM train_flights
GROUP BY AirportFrom, AirportTo
"""

airline_feats = spark.sql(AIRLINE_FEATS_SQL)
route_feats = spark.sql(ROUTE_FEATS_SQL)

# Preview both feature tables.
for feats in (airline_feats, route_feats):
    feats.show(5, truncate=False)


In [None]:
# Overall delay rate on the training split; used as the fallback value for
# airlines/routes that have no row in the train-derived feature tables.
global_delay = (
    train_df
    .agg(F.avg(F.col("Class").cast("double")).alias("g"))
    .first()["g"]
)
print("Global train delay rate:", global_delay)

# Defaults applied to feature columns left null by the left joins.
FEATURE_DEFAULTS = {
    "airline_flight_cnt_train": 0,
    "airline_delay_rate_train": global_delay,
    "route_flight_cnt_train": 0,
    "route_delay_rate_train": global_delay,
}


def _attach_train_features(base_df):
    """Left-join the airline and route feature tables onto ``base_df``.

    Rows without a match in a feature table get a count of 0 and the global
    train delay rate for the corresponding columns.
    """
    with_airline = base_df.join(airline_feats, on="Airline", how="left")
    with_route = with_airline.join(route_feats, on=["AirportFrom","AirportTo"], how="left")
    return with_route.fillna(FEATURE_DEFAULTS)


# NOTE(review): for training rows the joined rates are aggregated from the same
# train split, so each row's own label contributes to its feature value.
train_fe = _attach_train_features(train_df).cache()
test_fe = _attach_train_features(test_df).cache()

preview_cols = ["Airline", "airline_delay_rate_train", "route_delay_rate_train", "Class"]
train_fe.select(*preview_cols).show(5, truncate=False)


### Mini “Application”: Bu SQL feature’larla model eğitip metrik bas (raporda çok iyi durur)

In [None]:
# Column roles for the model.
label_col = "Class"
cat_cols = ["Airline", "AirportFrom", "AirportTo"]
num_cols = [
    "Time", "Length", "DayOfWeek",
    "airline_flight_cnt_train", "airline_delay_rate_train",
    "route_flight_cnt_train", "route_delay_rate_train"
]

# Derived column names for the indexed and one-hot encoded categoricals.
idx_cols = [f"{c}_idx" for c in cat_cols]
ohe_cols = [f"{c}_ohe" for c in cat_cols]

# One StringIndexer per categorical column; "keep" so unseen values do not fail.
indexers = []
for raw_col, idx_col in zip(cat_cols, idx_cols):
    indexers.append(StringIndexer(inputCol=raw_col, outputCol=idx_col, handleInvalid="keep"))

encoder = OneHotEncoder(inputCols=idx_cols, outputCols=ohe_cols, handleInvalid="keep")

# Numeric columns first, then the one-hot vectors, assembled into "features".
assembler = VectorAssembler(inputCols=num_cols + ohe_cols, outputCol="features")

lr = LogisticRegression(featuresCol="features", labelCol=label_col, maxIter=50)

# Fit on the train features, score the test features.
stages = indexers + [encoder, assembler, lr]
pipe = Pipeline(stages=stages)
m = pipe.fit(train_fe)
pred = m.transform(test_fe)

acc_eval = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="accuracy")
f1_eval  = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="f1")
auc_eval = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Report the three metrics in a fixed order.
for metric_label, evaluator in (
    ("SQL-feature LR -> Accuracy:", acc_eval),
    ("SQL-feature LR -> F1      :", f1_eval),
    ("SQL-feature LR -> AUC     :", auc_eval),
):
    print(metric_label, evaluator.evaluate(pred))

# Confusion matrix: rows are the true Class, columns the predicted value.
confusion = pred.groupBy("Class").pivot("prediction", [0.0, 1.0]).count()
confusion.na.fill(0).show()


In [None]:
# Output directories for the two SQL result sets and the two feature tables.
OUT_SQL1 = "outputs/sql/airline_hour_ma"
OUT_SQL2 = "outputs/sql/top_routes_by_origin"
OUT_FEATS_AIRLINE = "outputs/sql_features/airline_feats"
OUT_FEATS_ROUTE   = "outputs/sql_features/route_feats"

# Remove any output left over from a previous run.
for p in (OUT_SQL1, OUT_SQL2, OUT_FEATS_AIRLINE, OUT_FEATS_ROUTE):
    if not os.path.exists(p):
        continue
    shutil.rmtree(p)


In [None]:
# (DataFrame, destination directory) pairs, written in this order.
exports = [
    (sql_airline_hour_ma, OUT_SQL1),
    (sql_top_routes_by_origin, OUT_SQL2),
    (airline_feats, OUT_FEATS_AIRLINE),
    (route_feats, OUT_FEATS_ROUTE),
]

# Each result is coalesced to a single partition and written as CSV with a
# header row, overwriting any existing output at the destination.
for frame, target_dir in exports:
    writer = frame.coalesce(1).write.mode("overwrite").option("header", True)
    writer.csv(target_dir)

print("Saved:")
for _, target_dir in exports:
    print(" -", target_dir)
