In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

conf = SparkConf()
conf.setAppName("ML Data Preparation")
conf.setMaster("local")
conf.set("spark.hadoop.fs.defaultFS", "file:///")
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
spark = SparkSession.builder.appName("App").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/21 21:08:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
dataframe = spark.read.parquet("data_processed_job/2007")

# Rename column "ArrDelay" to "label", as this is the default in Spark
dataframe = dataframe.withColumn("label", col("ArrDelay")).drop("ArrDelay")

In [3]:
dataframe.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- DepTimeT: integer (nullable = true)
 |-- CRSDepTimeT: integer (nullable = true)
 |-- CRSArrTimeT: integer (nullable = true)
 |-- PunctualCarrier: integer (nullable = true)
 |-- AverageCarrier: integer (nullable = true)
 |-- label: integer (nullable = true)



First, we split our data into a *train set* (80%) and *test set* (20%).

In [4]:
FEATURE_COLS = ["Month", "DayofMonth", "DepTimeT", "CRSDepTimeT", "CRSArrTimeT", "DepDelay", "CRSElapsedTime", "PunctualCarrier", "AverageCarrier"]

train_data, test_data = dataframe.select(*(FEATURE_COLS + ["label"])).randomSplit([0.8, 0.2], seed=3)
train_data.cache()

DataFrame[Month: int, DayofMonth: int, DepTimeT: int, CRSDepTimeT: int, CRSArrTimeT: int, DepDelay: int, CRSElapsedTime: int, PunctualCarrier: int, AverageCarrier: int, label: int]

# Linear Regression

## On Raw Features

In [None]:
FEATURE_COLS = ["Month", "DayofMonth", "DepTimeT", "CRSDepTimeT", "CRSArrTimeT", "DepDelay", "CRSElapsedTime", "PunctualCarrier", "AverageCarrier"]

vector_assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
lr = LinearRegression()
pipeline_lr = Pipeline(stages=[vector_assembler, lr])

# train_data_lr = train_data.select(*(FEATURE_COLS + ["label"]))
model_lr = pipeline_lr.fit(train_data)

## On Normalized Features

In [None]:
# Define groups of columns for preprocessing
# Use Min-Max Scaling for features with approximately uniform distribution
MINMAX_COLS = ["Month", "DayofMonth", "DepTimeT", "CRSDepTimeT", "CRSArrTimeT"]
# Use Standard Scaling for features which deviate from a mean
STANDARD_COLS = ["DepDelay", "CRSElapsedTime"]
# One-Hot encoding was already performed in the preparation phase
ONE_HOT_COLS = ["PunctualCarrier", "AverageCarrier"]

# ALL_COLS = MINMAX_COLS + STANDARD_COLS + ONE_HOT_COLS

# ++++ Define stages for the pipeline ++++
minmax_assembler = VectorAssembler(inputCols=MINMAX_COLS, outputCol="minmax_features")
minmax_scaler = MinMaxScaler(inputCol="minmax_features", outputCol="scaled_minmax_features")

standard_assembler = VectorAssembler(inputCols=STANDARD_COLS, outputCol="standard_features")
standard_scaler = StandardScaler(inputCol="standard_features", outputCol="scaled_standard_features", withMean=True, withStd=True)

final_assembler = VectorAssembler(
    inputCols=["scaled_minmax_features", "scaled_standard_features"] + ONE_HOT_COLS,
    outputCol="features"
)

lr_normalized = LinearRegression()

# ++++ Create a pipeline ++++
pipeline_lr_normalized = Pipeline(stages=[
    minmax_assembler, 
    minmax_scaler, 
    standard_assembler, 
    standard_scaler, 
    final_assembler, 
    lr_normalized
])

# ++++ Fit the pipeline ++++
model_lr_normalized = pipeline_lr_normalized.fit(train_data)

## Interpretation & Evaluation

Display the coefficients learned by the models (raw + normalized features).

In [None]:
model_lr_model = model_lr.stages[-1]
model_lr_normalized_model = model_lr_normalized.stages[-1]

print(f"{'Feature':<15}{'Raw Features':>15}{'Normalized Features':>20}")
print(f"{'Intercept':<15}{model_lr_model.intercept:>15.2f}{model_lr_normalized_model.intercept:>20.2f}")

for i in range(len(FEATURE_COLS)):
    print(f"{FEATURE_COLS[i]:<15}{model_lr_model.coefficients[i]:>15.2f}{model_lr_normalized_model.coefficients[i]:>20.2f}")

Based on the LR using the **raw features**, we can see that a `DepDelay` of one minute corresponds to about one minute of `ArrDelay`.
The coefficients of the Linear Regression with **normalized features** tells us that `DepDelay` has by far the strongest influence on `ArrDelay`. (Which coincides with our findings from the correlation matrix, as both consider linear relationships.)

In [None]:
predictions = model_lr.transform(test_data.select(*(FEATURE_COLS + ["label"])))

evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")

To evaluate our Linear Regression model, we compute three metrics: Root Mean Squared Error (**RMSE**), Mean Absolute Error (**MAE**), and the coefficient of determination (**R²**).

The RMSE is the most common evaluation metric of regression models, and it penalizes greater errors stronger. We will use it later to compare the performances between our models and choose the best-performing one.

The MAE is more robust to outliers. Our LR model's predictions are off by 9.43 minutes on average.

R² measures the percentage of variance which can be explained by our model—about 86.9%—the rest is noise which cannot be explained by our model.

In [None]:
model_lr_path = "./models/lr"
model_lr.write().overwrite().save(model_lr_path)
print(f"Linear regression model saved to {model_lr_path}.")

# Decision Tree

In [None]:
vector_assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
dt = DecisionTreeRegressor(maxMemoryInMB=1024, seed=3)
pipeline_dt = Pipeline(stages=[vector_assembler, dt])

# ++++ Define parameter grid for grid search ++++
param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 8, 12, 15]) \
    .addGrid(dt.maxBins, [64, 128, 256]) \
    .build()

# ++++ Define evaluator for cross validation ++++
evaluator = RegressionEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="rmse"
)

# ++++ Define cross validation (using 3 folds) ++++
crossval = CrossValidator(
    estimator=pipeline_dt,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4  # Number of threads for parallel processing
)

cv_model = crossval.fit(train_data)

In [None]:
# ++++ Extract the best model ++++
best_model = cv_model.bestModel

# print(model_dt.stages[-1].toDebugString)

### Top 5 Models
- (1) RMSE = 14.5918    maxDepth = 12	maxBins = 256
- (2) RMSE = 14.8100    maxDepth = 12	maxBins = 128
- (3) RMSE = 14.8242    maxDepth = 8	maxBins = 256
- (4) RMSE = 14.8342    maxDepth = 15	maxBins = 256
- (5) RMSE = 14.9673    maxDepth = 15	maxBins = 128

In [None]:
# Access avgMetrics and estimatorParamMaps
metrics = cv_model.avgMetrics  # List of average metrics for each model
param_maps = cv_model.getEstimatorParamMaps()  # List of hyperparameter combinations

# Combine metrics and hyperparameters into a sorted list
results = [(metric, param_map) for metric, param_map in zip(metrics, param_maps)]
sorted_results = sorted(results, key=lambda x: x[0])  # Sort by metric (ascending for RMSE)

# Display top 5 models
print("Top 5 Models:")
for rank, (metric, param_map) in enumerate(sorted_results[:5], start=1):
    params_str = "\t".join([f"{param.name} = {value}" for param, value in param_map.items()])
    print(f"({rank}) RMSE = {metric:.4f}    {params_str}")

In [None]:
predictions = best_model.transform(test_data)

evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")

In [None]:
best_model_path = "./models/dt"
best_model.write().overwrite().save(best_model_path)
print(f"Best model saved to {best_model_path}")

# Random Forest

In [None]:
vector_assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
rf = RandomForestRegressor(maxMemoryInMB=1024, seed=3)
pipeline_rf = Pipeline(stages=[vector_assembler, rf])

# ++++ Define parameter grid for grid search ++++
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3, 5, 7]) \
    .addGrid(rf.maxBins, [32, 64]) \
    .addGrid(rf.numTrees, [20, 50, 100]) \
    .build()

# ++++ Define evaluator for cross validation ++++
evaluator = RegressionEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="rmse"
)

# ++++ Define cross validation (using 3 folds) ++++
crossval = CrossValidator(
    estimator=pipeline_rf,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4  # Number of threads for parallel processing
)

cv_model = crossval.fit(train_data)

In [None]:
# Access avgMetrics and estimatorParamMaps
metrics = cv_model.avgMetrics  # List of average metrics for each model
param_maps = cv_model.getEstimatorParamMaps()  # List of hyperparameter combinations

# Combine metrics and hyperparameters into a sorted list
results = [(metric, param_map) for metric, param_map in zip(metrics, param_maps)]
sorted_results = sorted(results, key=lambda x: x[0])  # Sort by metric (ascending for RMSE)

# Display top 5 models
print("Top 5 Models:")
for rank, (metric, param_map) in enumerate(sorted_results[:5], start=1):
    params_str = "\t".join([f"{param.name} = {value}" for param, value in param_map.items()])
    print(f"({rank}) RMSE = {metric:.4f}    {params_str}")

In [None]:
best_model = cv_model.bestModel

predictions = best_model.transform(test_data)

evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")

In [None]:
best_model_path = "./models/rf"
best_model.write().overwrite().save(best_model_path)
print(f"Best model saved to {best_model_path}")

# Gradient-boosted Trees

In [5]:
vector_assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
gbt = GBTRegressor(maxMemoryInMB=1024, seed=3)
pipeline_gbt = Pipeline(stages=[vector_assembler, gbt])

# ++++ Define parameter grid for grid search ++++
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [50, 100]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()

# ++++ Define evaluator for cross validation ++++
evaluator = RegressionEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="rmse"
)

# ++++ Define cross validation (using 3 folds) ++++
crossval = CrossValidator(
    estimator=pipeline_gbt,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4  # Number of threads for parallel processing
)

cv_model = crossval.fit(train_data)

24/12/21 21:09:45 WARN MemoryStore: Not enough space to cache rdd_97_0 in memory! (computed 235.8 MiB so far)
24/12/21 21:09:45 WARN BlockManager: Persisting block rdd_97_0 to disk instead.
24/12/21 21:09:52 WARN MemoryStore: Not enough space to cache rdd_102_0 in memory! (computed 235.8 MiB so far)
24/12/21 21:09:52 WARN BlockManager: Persisting block rdd_102_0 to disk instead.
24/12/21 21:09:59 WARN MemoryStore: Not enough space to cache rdd_113_0 in memory! (computed 235.8 MiB so far)
24/12/21 21:09:59 WARN BlockManager: Persisting block rdd_113_0 to disk instead.
24/12/21 21:10:06 WARN MemoryStore: Not enough space to cache rdd_119_0 in memory! (computed 235.8 MiB so far)
24/12/21 21:10:06 WARN BlockManager: Persisting block rdd_119_0 to disk instead.
24/12/21 21:10:42 WARN MemoryStore: Not enough space to cache rdd_177_0 in memory! (computed 19.5 MiB so far)
24/12/21 21:10:42 WARN BlockManager: Persisting block rdd_177_0 to disk instead.
24/12/21 21:10:43 WARN MemoryStore: Not eno

KeyboardInterrupt: 

24/12/23 23:37:19 WARN MemoryStore: Not enough space to cache rdd_8584_0 in memory! (computed 24.3 MiB so far)
24/12/23 23:37:22 WARN MemoryStore: Not enough space to cache rdd_8587_0 in memory! (computed 24.3 MiB so far)
24/12/23 23:37:24 WARN Executor: Issue communicating with driver in heartbeater 
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at or

In [None]:
# Access avgMetrics and estimatorParamMaps
metrics = cv_model.avgMetrics  # List of average metrics for each model
param_maps = cv_model.getEstimatorParamMaps()  # List of hyperparameter combinations

# Combine metrics and hyperparameters into a sorted list
results = [(metric, param_map) for metric, param_map in zip(metrics, param_maps)]
sorted_results = sorted(results, key=lambda x: x[0])  # Sort by metric (ascending for RMSE)

# Display top 5 models
print("Top 5 Models:")
for rank, (metric, param_map) in enumerate(sorted_results[:5], start=1):
    params_str = "\t".join([f"{param.name} = {value}" for param, value in param_map.items()])
    print(f"({rank}) RMSE = {metric:.4f}    {params_str}")

In [None]:
best_model = cv_model.bestModel

predictions = best_model.transform(test_data)

evaluator = RegressionEvaluator()
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")

In [None]:
best_model_path = "./models/gbt"
best_model.write().overwrite().save(best_model_path)
print(f"Best model saved to {best_model_path}")