## Project Part 1: Distributed Spark ML Pipeline

This notebook implements the mandatory steps for Project Part 1, focusing on distributed data handling, resource management, and training a machine learning model using Spark MLlib on a cluster with one master and two worker nodes.

---

### Step 1: Cluster Setup & Initial Spark Session

**Requirement:** Set up a Spark cluster using Docker, VMware, or VirtualBox with 1 master node and 2 worker nodes. The setup must be configured for distributed task management and communication.

In [None]:
from pyspark.sql import SparkSession

# Note: Initial configuration for cluster connection and basic resources is set here.
# Resource settings are reviewed in Step 6, and shuffle partitions are tuned in Step 6.4.
spark = (
    SparkSession.builder.appName("FlightDelay-SparkSQL-Preprocessing")
    .master("spark://spark-master:7077")
    .config("spark.sql.shuffle.partitions", "6")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

print("Spark Session initialized on cluster master.")
print("Spark UI (Monitor): http://localhost:4040")

### Steps 2 & 3: Manual Data Splitting & Loading on Each Node

**Requirement:** The dataset is manually split across the master and worker nodes (`/data/part1.csv`, `/data/part2.csv`, `/data/part3.csv`). Each node loads its part, and Spark combines them into a unified DataFrame for processing.
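As a minimal sketch of the manual split itself (not necessarily how the three part files were actually produced), the helper below cuts CSV text into a fixed number of parts and repeats the header row in each one, so that every part can be read with `header=True`. The function name `split_csv_text` and the sample rows are hypothetical.

```python
import csv
import io


def split_csv_text(csv_text, n_parts):
    """Split CSV text into n_parts CSV strings, each repeating the header row."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    # Ceiling division so that every data row lands in exactly one part.
    chunk = -(-len(body) // n_parts)
    parts = []
    for i in range(n_parts):
        out = io.StringIO()
        writer = csv.writer(out, lineterminator="\n")
        writer.writerow(header)
        writer.writerows(body[i * chunk:(i + 1) * chunk])
        parts.append(out.getvalue())
    return parts


# Tiny made-up sample, not real flight data.
sample = "ArrDelay,DepDelay\n5,3\n20,18\n-2,0\n7,9\n1,1\n"
parts = split_csv_text(sample, 3)
for i, part in enumerate(parts, start=1):
    n_rows = len(part.splitlines()) - 1  # subtract the header line
    print(f"part{i}.csv would hold {n_rows} data rows")
```

Repeating the header in every part matters because each file is parsed independently when it is loaded on its own.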


In [None]:
import socket

hostname = socket.gethostname()
print(f"I am running on: {hostname}")

part_map = {
    "spark-master":  "/data/part1.csv",
    "spark-worker1": "/data/part2.csv",
    "spark-worker2": "/data/part3.csv"
}

# 3. Data loading on each node (demonstration):
# This cell executes on the driver, so `hostname` is the driver's host and only that node's part is read here.
# Hostnames missing from part_map fall back to part1.
my_file = part_map.get(hostname, "/data/part1.csv")
df_my_part = spark.read.csv(my_file, header=True, inferSchema=True)

print(f"I only loaded: {my_file}")
print(f"Rows in my part: {df_my_part.count():,}")

# The simplest way to load the unified dataset is a wildcard path; Spark then distributes the file splits across executors.
# With local paths, this assumes every matching file is readable at the same path on each node that runs a read task.
df_full = spark.read.csv("/data/part*.csv", header=True, inferSchema=True).withColumnRenamed("ArrDelay", "label")
df_full.createOrReplaceTempView("flights_raw")
print(f"\nFull unified dataset (Raw): {df_full.count():,} rows")

### Step 4: Data Partitioning and Processing with Spark SQL

**Requirement:** Repartition the dataset for parallel processing, perform initial data cleaning, exploration, and feature engineering using SQL queries.
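The three engineered features in this step are simple row-wise rules. As a plain-Python sketch of the same logic (for illustration only, since the notebook computes them in Spark SQL), assuming `DayOfWeek` values 6 and 7 denote the weekend as the SQL condition implies. The function name `engineer_features` and the sample record are hypothetical.

```python
MILES_TO_KM = 1.60934  # same conversion factor as the SQL in this step


def engineer_features(day_of_week, distance_miles, dep_delay_min):
    """Return the three engineered features for one flight record."""
    return {
        "IsWeekend": 1.0 if day_of_week in (6, 7) else 0.0,
        "Distance_km": distance_miles * MILES_TO_KM,
        # Strictly greater than 15 minutes, matching the SQL CASE expression.
        "WasDepartureDelayed": 1.0 if dep_delay_min > 15 else 0.0,
    }


# Hypothetical record: day 6, 100 miles, departed 20 minutes late.
features = engineer_features(day_of_week=6, distance_miles=100.0, dep_delay_min=20.0)
print(features)
```

Note that a departure delay of exactly 15 minutes is not flagged, because the rule uses `>` rather than `>=`.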

In [None]:
# 4a. Cleaning, Transformation, and Repartitioning (via DataFrame API for caching)
# The `.repartition(6)` step distributes the data across the cluster for parallel processing.
clean_df = spark.table("flights_raw") \
    .where("Cancelled = 0 AND Diverted = 0 AND label IS NOT NULL") \
    .selectExpr(
        "CAST(label AS DOUBLE) AS label",
        "CAST(DepDelay AS DOUBLE) AS DepDelay",
        "CAST(Distance AS DOUBLE) AS Distance",
        "CAST(TaxiIn AS DOUBLE) AS TaxiIn",
        "CAST(TaxiOut AS DOUBLE) AS TaxiOut",
        "UniqueCarrier",
        "Origin",
        "Dest",
        "DayOfWeek"
) \
    .repartition(6) \
    .cache()

print(f"Dataset repartitioned into {clean_df.rdd.getNumPartitions()} partitions")
print(f"Rows after filtering: {clean_df.count():,}")

# 4b. Feature Engineering and Exploration using Spark SQL
clean_df.createOrReplaceTempView("flights_sql")

# Add feature engineering using SQL
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW flights_features AS
SELECT 
    *,
    -- Feature engineering with SQL
    CASE WHEN DayOfWeek IN (6,7) THEN 1.0 ELSE 0.0 END AS IsWeekend,
    Distance * 1.60934 AS Distance_km,
    CASE WHEN DepDelay > 15 THEN 1.0 ELSE 0.0 END AS WasDepartureDelayed
FROM flights_sql
""")

print("\n--- Data Exploration ---")
spark.sql("SELECT COUNT(*) AS total_flights FROM flights_features").show(1)
spark.sql("SELECT AVG(label) AS avg_delay_min FROM flights_features").show(1)
spark.sql("SELECT UniqueCarrier, AVG(label) AS avg_delay, COUNT(*) AS flights FROM flights_features GROUP BY UniqueCarrier ORDER BY avg_delay DESC LIMIT 5").show()

### Step 5: Machine Learning Pipeline (MLlib)

**Requirement:** Implement a machine learning pipeline using Spark MLlib, including feature transformations, model training, and model evaluation.
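For reference, the regression metrics reported in this step (RMSE, MAE, R²) follow the usual textbook definitions. The sketch below computes them in plain Python on a few made-up values, so the numbers are illustrative and not results from the flight data. The function name `regression_metrics` is hypothetical.

```python
import math


def regression_metrics(labels, predictions):
    """Compute RMSE, MAE and R² from paired label/prediction lists."""
    n = len(labels)
    errors = [p - y for y, p in zip(labels, predictions)]
    mean_label = sum(labels) / n
    ss_res = sum(e * e for e in errors)                  # residual sum of squares
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    return {
        "rmse": math.sqrt(ss_res / n),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1.0 - ss_res / ss_tot,
    }


# Made-up arrival delays (minutes) and predictions.
labels = [10.0, 0.0, 30.0, 20.0]
predictions = [12.0, -2.0, 26.0, 20.0]
metrics = regression_metrics(labels, predictions)
print({name: round(value, 4) for name, value in metrics.items()})
```

RMSE penalizes large errors more heavily than MAE, which is why both are worth reporting for delay data with occasional extreme values.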

In [None]:
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time

# --- 5a. Data Preparation and Preprocessing Pipeline ---
data = spark.table("flights_features").cache()
train_df, test_df = data.randomSplit([0.8, 0.2], seed=42)
print(f"Training set: {train_df.count():,} rows")
print(f"Test set: {test_df.count():,} rows")

# Reduce cardinality for categorical features to keep the Random Forest model performant
# (MLlib tree models need maxBins, default 32, to be at least the number of categories of any categorical feature).
# Less frequent values are replaced with 'Other'; this is done outside the MLlib Pipeline.
top_origins = [r[0] for r in train_df.groupBy("Origin").count().orderBy("count", ascending=False).limit(30).select("Origin").collect()]
top_dests = [r[0] for r in train_df.groupBy("Dest").count().orderBy("count", ascending=False).limit(30).select("Dest").collect()]

def apply_grouping(df):
    df = df.withColumn("Origin_group", when(col("Origin").isin(top_origins), col("Origin")).otherwise("Other"))
    df = df.withColumn("Dest_group",   when(col("Dest").isin(top_dests),   col("Dest")).otherwise("Other"))
    return df

train_grouped = apply_grouping(train_df)
test_grouped = apply_grouping(test_df)

# Define MLlib preprocessing stages
indexer_carrier = StringIndexer(inputCol="UniqueCarrier", outputCol="carrier_idx", handleInvalid="keep")
indexer_origin = StringIndexer(inputCol="Origin_group", outputCol="origin_idx", handleInvalid="keep")
indexer_dest = StringIndexer(inputCol="Dest_group", outputCol="dest_idx", handleInvalid="keep")

assembler = VectorAssembler(
    inputCols=["DepDelay", "Distance_km", "TaxiIn", "TaxiOut", "IsWeekend", 
               "carrier_idx", "origin_idx", "dest_idx"], 
    outputCol="features"
)

# Define models (estimators)
lr = LinearRegression(featuresCol="features", labelCol="label", maxIter=20)
rf = RandomForestRegressor(featuresCol="features", labelCol="label", numTrees=50, maxDepth=10, seed=42)

# Build two full pipelines: Preprocessing + Model
lr_pipeline = Pipeline(stages=[indexer_carrier, indexer_origin, indexer_dest, assembler, lr])
rf_pipeline = Pipeline(stages=[indexer_carrier, indexer_origin, indexer_dest, assembler, rf])

print("\nMLlib pipelines defined and ready for fitting (training).")

In [None]:
# --- 5b. Model Training & Regression Evaluation ---
evaluator_rmse = RegressionEvaluator(labelCol="label", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="label", metricName="r2")
evaluator_mae = RegressionEvaluator(labelCol="label", metricName="mae")

print("\n--- Training Linear Regression ---")
start_lr = time.time()
lr_model = lr_pipeline.fit(train_grouped)
lr_time = time.time() - start_lr
# Cache before evaluating so the three metrics reuse one computation of the predictions
lr_pred = lr_model.transform(test_grouped).cache()
lr_rmse = evaluator_rmse.evaluate(lr_pred)
lr_r2 = evaluator_r2.evaluate(lr_pred)
lr_mae = evaluator_mae.evaluate(lr_pred)

print("\n--- Training Random Forest ---")
start_rf = time.time()
rf_model = rf_pipeline.fit(train_grouped)
rf_time = time.time() - start_rf
# Cache before evaluating so the three metrics reuse one computation of the predictions
rf_pred = rf_model.transform(test_grouped).cache()
rf_rmse = evaluator_rmse.evaluate(rf_pred)
rf_r2 = evaluator_r2.evaluate(rf_pred)
rf_mae = evaluator_mae.evaluate(rf_pred)

print("\n--- Regression Results ---")
print(f"Linear Regression → RMSE: {lr_rmse:.2f} min | R²: {lr_r2:.4f} | MAE: {lr_mae:.2f} min | Time: {lr_time:.1f}s")
print(f"Random Forest → RMSE: {rf_rmse:.2f} min | R²: {rf_r2:.4f} | MAE: {rf_mae:.2f} min | Time: {rf_time:.1f}s")

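# Save models for later use (overwrite so this cell can be re-run without a "path already exists" error)
lr_model.write().overwrite().save("/workspace/models/lr_model_final")
rf_model.write().overwrite().save("/workspace/models/rf_model_final")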
print("\nModels saved successfully!")

In [None]:
# --- 5c. Classification Evaluation (Optional but helpful) ---

# Convert to binary classification: delayed >= 15 min (used in the project requirements markdown)
lr_pred_class = lr_pred.withColumn("delayed_15", (col("label") >= 15).cast("double")) \
    .withColumn("pred_delayed_15", (col("prediction") >= 15).cast("double"))

rf_pred_class = rf_pred.withColumn("delayed_15", (col("label") >= 15).cast("double")) \
    .withColumn("pred_delayed_15", (col("prediction") >= 15).cast("double"))

# Evaluators (AUC uses the continuous regression prediction as the ranking score)
bin_eval = BinaryClassificationEvaluator(labelCol="delayed_15", rawPredictionCol="prediction", metricName="areaUnderROC")
multi_eval = MulticlassClassificationEvaluator(labelCol="delayed_15", predictionCol="pred_delayed_15")

lr_auc = bin_eval.evaluate(lr_pred_class)
rf_auc = bin_eval.evaluate(rf_pred_class)
lr_acc = multi_eval.evaluate(lr_pred_class, {multi_eval.metricName: "accuracy"})
rf_acc = multi_eval.evaluate(rf_pred_class, {multi_eval.metricName: "accuracy"})
lr_f1 = multi_eval.evaluate(lr_pred_class, {multi_eval.metricName: "f1"})
rf_f1 = multi_eval.evaluate(rf_pred_class, {multi_eval.metricName: "f1"})

# Print classification results
print("\n--- Classification Results (Delayed ≥ 15 min?) ---")
print(f"Linear Regression → AUC: {lr_auc:.4f} | Accuracy: {lr_acc:.4f} | F1-score: {lr_f1:.4f}")
print(f"Random Forest → AUC: {rf_auc:.4f} | Accuracy: {rf_acc:.4f} | F1-score: {rf_f1:.4f}")

### Step 6: Resource Management

**Requirement:** Configure Spark resource settings (`spark.executor.memory`, `spark.executor.cores`) and monitor utilization. Fine-tune `spark.sql.shuffle.partitions`.
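One rough way to reason about these settings is to count task slots. As a simplified sketch, assuming one task occupies one core, the number of "waves" needed to run a stage is the partition count divided by the total executor cores, rounded up. The layout below (2 executors with 2 cores each) is an assumption for illustration; this notebook does not state the actual core count per worker, and `task_waves` is a hypothetical helper.

```python
import math


def task_waves(num_partitions, num_executors, cores_per_executor):
    """Estimate how many scheduling waves a stage needs, at one task per core."""
    slots = num_executors * cores_per_executor
    return math.ceil(num_partitions / slots)


# Assumed layout: 2 executors x 2 cores = 4 concurrent task slots.
for partitions in (6, 12, 24):
    waves = task_waves(partitions, num_executors=2, cores_per_executor=2)
    print(f"{partitions:2d} partitions -> {waves} wave(s)")
```

A partition count that is a small multiple of the slot count tends to keep all cores busy, while very high counts add scheduling overhead for small datasets.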



In [None]:
print("=== SPARK CLUSTER & RESOURCE CONFIGURATION ===")

# 6.1 & 6.2: Configuring Memory and Cores (Master-side check)
# These settings are primarily controlled by the Docker/VM setup (e.g., spark-defaults.conf or environment variables).
# We print the current values for documentation.

print(f"Shuffle partitions (Initial/Driver Config) : {spark.conf.get('spark.sql.shuffle.partitions', 'Not set')}")
print(f"Executor memory (Initial/Driver Config)    : {spark.conf.get('spark.executor.memory', 'Not set')}")

try:
    # Note: spark.executor.cores is often a fixed setting per worker set in spark-defaults.conf
    print(f"Executor cores (Config)                    : {spark.conf.get('spark.executor.cores', 'Not set')}")
    print(f"Default parallelism (from executors)     : {spark.sparkContext.defaultParallelism}")
except Exception:
    print("Executor cores/Parallelism not directly readable via spark.conf.get for static setup.")

print(f"Dynamic allocation enabled (Check)           : {spark.conf.get('spark.dynamicAllocation.enabled', 'false')}")

print("\n--- Monitoring (Step 6.3 & 7) ---")
print("Resource utilization is monitored using the Spark Web UI (http://localhost:4040). " +
    "This includes Task Distribution, Memory, and CPU usage on the Executors tab.")

### Steps 6.4 & 7: Fine-Tuning Shuffle Partitions & Performance Monitoring

**Requirement:** Adjust shuffle partitions to balance parallelism and memory efficiency, and monitor performance using a micro-benchmark (Step 7).
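A single timed run is sensitive to noise such as JVM warm-up or a busy worker. One common mitigation, sketched here in plain Python, is to repeat the job several times and take the median. The helper `time_repeated` is hypothetical and the workload is a stand-in; in this notebook the timed job would be the Spark aggregation.

```python
from statistics import median
from time import perf_counter


def time_repeated(job, repeats=3):
    """Run job() `repeats` times and return the median wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        t0 = perf_counter()
        job()
        timings.append(perf_counter() - t0)
    return median(timings)


# Stand-in workload, used only to show the calling pattern.
elapsed = time_repeated(lambda: sum(range(100_000)), repeats=5)
print(f"Median of 5 runs: {elapsed:.6f} s")
```

The median is preferred over the mean here because one unusually slow run does not skew it.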

In [None]:
from time import perf_counter

# flights_features is a view over clean_df, which was cached in Step 4
df_to_shuffle = spark.table("flights_features")

def time_shuffle_job(df, partitions):
    spark.conf.set('spark.sql.shuffle.partitions', str(partitions))
    # Clear cache to ensure a fresh run if same DF is used (optional, but safer)
    df.unpersist(blocking=True)
    t0 = perf_counter()
    # Lightweight aggregation that triggers shuffle and forces re-evaluation
    _ = df.groupBy('UniqueCarrier').count().count()
    t1 = perf_counter()
    return t1 - t0


print("\n--- Fine-Tuning spark.sql.shuffle.partitions ---")

# Number of partitions to test
partitions_to_test = [6, 12, 24]
results = {}

for p in partitions_to_test:
    t = time_shuffle_job(df_to_shuffle, p)
    results[p] = t
    print(f"Shuffle partitions = {p:2d} → Execution time: {t:.3f} seconds")

best_p = min(results, key=results.get)
print(f"\nObservation: Shuffle partitions = {best_p} was the fastest in this single-run benchmark.")
spark.conf.set('spark.sql.shuffle.partitions', str(best_p)) # Apply best setting
print(f"Final Shuffle Partitions set to: {best_p}")

### Step 8: Tuning the Machine Learning Model (Cross-Validation)

**Requirement:** Fine-tune the machine learning model by adjusting hyperparameters and running cross-validation to find the best-performing configuration.
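To gauge the cost of a grid search before running it, it helps to count the model fits: every parameter combination is trained once per fold, and the winning combination is then refit once on the full training set. The plain-Python sketch below does that arithmetic for a 2 x 2 grid with 3 folds, the same shape as the search in this step. The helper `cv_fit_count` is hypothetical.

```python
from itertools import product


def cv_fit_count(grid, num_folds):
    """Return (total fits, parameter combinations) for a k-fold grid search."""
    combos = list(product(*grid.values()))
    # One fit per combination per fold, plus one final refit of the best combination.
    return len(combos) * num_folds + 1, combos


grid = {"regParam": [0.01, 0.1], "elasticNetParam": [0.0, 0.5]}
total_fits, combos = cv_fit_count(grid, num_folds=3)
print(f"{len(combos)} combinations x 3 folds + 1 refit = {total_fits} fits")
```

Because the count grows multiplicatively with each added hyperparameter, small grids are a sensible starting point on a two-worker cluster.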

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Build a fresh LinearRegression estimator for tuning so the parameter grid refers to this exact instance.
# The preprocessing stages from Step 5 (indexer_carrier, indexer_origin, indexer_dest, assembler) are reused unchanged.
lr_to_tune = LinearRegression(featuresCol="features", labelCol="label", maxIter=30)
lr_pipeline_tune = Pipeline(stages=lr_pipeline.getStages()[:-1] + [lr_to_tune])

# Define the hyperparameter grid for Linear Regression
paramGrid = ParamGridBuilder() \
    .addGrid(lr_to_tune.regParam, [0.01, 0.1]) \
    .addGrid(lr_to_tune.elasticNetParam, [0.0, 0.5]) \
    .build()

cv = CrossValidator(estimator=lr_pipeline_tune, estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(labelCol="label"),
                    numFolds=3, parallelism=4, seed=42)

print("Running 3-fold Cross-Validation on Linear Regression to tune: regParam and elasticNetParam...")
cvModel = cv.fit(train_grouped)

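best_rmse = RegressionEvaluator(labelCol="label", metricName="rmse").evaluate(cvModel.transform(test_grouped))
print(f"\nBest model after Cross-Validation → RMSE: {best_rmse:.2f} minutes")
# bestModel is a PipelineModel, so read the winning grid entry from avgMetrics instead (lower RMSE is better)
best_idx = min(range(len(paramGrid)), key=lambda i: cvModel.avgMetrics[i])
print("Best parameters found:", {p.name: v for p, v in paramGrid[best_idx].items()})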

# Save the best tuned model
cvModel.bestModel.write().overwrite().save("/workspace/models/lr_best_tuned")
print("Best tuned model saved to /workspace/models/lr_best_tuned")

## Summary of Part 1 & Next Steps

This notebook has successfully completed all mandatory steps for Part 1:

* **Cluster Setup & Data Loading (Steps 1-3):** A Spark session was established, and the distributed loading of the manually split dataset was demonstrated.
* **Data Processing (Step 4):** Cleaning, feature engineering (like `IsWeekend`, `Distance_km`, `WasDepartureDelayed`), and repartitioning were executed using Spark SQL/DataFrame API.
* **ML Pipeline & Evaluation (Step 5):** Two distributed machine learning pipelines (`LinearRegression` and `RandomForestRegressor`) were trained and evaluated for regression and classification metrics.
* **Resource Management & Tuning (Steps 6-8):** The current resource configuration was checked, shuffle partitions were micro-tuned, and the Linear Regression model was fine-tuned using Cross-Validation.

All key outcomes and required steps have been documented and implemented in the runnable cells. You can now use the results from the executed cells to draft the **Methodology** and **Results** sections of your report.

In [None]:
# Stop Spark Session after all work is complete
spark.stop()