In [1]:
import os, pickle, glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, mean, stddev, min, max, countDistinct, sum as spark_sum
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from itertools import product
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.storagelevel import StorageLevel
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorSlicer

Matplotlib created a temporary cache directory at /scratch/brocchio/job_39796952/matplotlib-iao7xbdv because the default path (/home/jovyan/.cache/matplotlib) is not a writable directory; it is highly recommended to set the MPLCONFIGDIR environment variable to a writable directory, in particular to speed up the import of Matplotlib and to better support multiprocessing.


In [22]:
try:
    spark.stop()
except Exception:
    pass

In [2]:
spill_dir = os.environ["SLURM_TMPDIR"]

sc = SparkSession.builder \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.executor.memoryOverhead", "4g")\
    .config('spark.executor.instances', 2)\
    .config('spark.executor.cores', 4)\
    .config('spark.drive.cores', 4)\
    .config("spark.local.dir", spill_dir)\
    .getOrCreate()

print("Spark spill directory:", spill_dir)

Spark spill directory: /scratch/brocchio/job_39796952


In [3]:
data_path = 'project_data/train.parquet/'
df = sc.read.parquet(data_path)

#df.printSchema()

#df.show(5)

## Pre-processing Pipeline

In [4]:
feature_cols = [c for c in df.columns if c.startswith("feature_")]
categorical_cols = ["symbol_id"]
label_col = "responder_7"

imputer = Imputer(strategy="median", inputCols=feature_cols,
                  outputCols=[f"{c}_imputed" for c in feature_cols])

assembler = VectorAssembler(
    inputCols=[*imputer.getOutputCols(), *categorical_cols],
    outputCol="features_vec"
)

scaler = StandardScaler(inputCol="features_vec",
                        outputCol="scaled_features",
                        withMean=True, withStd=True)

pipeline_prep = Pipeline(stages=[imputer, assembler, scaler])
prep_model = pipeline_prep.fit(df)

## Linear Regression Model

In [8]:
all_features = assembler.getInputCols()
feature_pos = {f: i for i, f in enumerate(all_features)}

top5_names = [
    "feature_56_imputed",
    "feature_45_imputed",
    "feature_19_imputed",
    "feature_66_imputed",
    "feature_06_imputed",
]

top5_idx = [feature_pos[i] for i in top5_names]

slicer = VectorSlicer(
    inputCol="scaled_features",
    outputCol="feat_vec",
    indices=top5_idx 
)

linreg = LinearRegression(
    featuresCol="feat_vec",
    labelCol=label_col,
    maxIter=100,
    regParam=0.01,
    elasticNetParam=0.5
)

train_df = df.filter("date_id <= 1350")
val_df   = df.filter("date_id BETWEEN 1351 AND 1500")
test_df  = df.filter("date_id > 1500")

pipe_lr = Pipeline(stages=[prep_model, slicer, linreg])
lr_model   = pipe_lr.fit(train_df)

In [9]:
evaluator = (RegressionEvaluator(labelCol=label_col,
                                 predictionCol="prediction",
                                 metricName="rmse"))

pred_train = lr_model.transform(train_df)
pred_val   = lr_model.transform(val_df)
pred_test  = lr_model.transform(test_df)

rmse_train = evaluator.evaluate(pred_train)
rmse_val   = evaluator.evaluate(pred_val)
rmse_test  = evaluator.evaluate(pred_test)

print(f"Linear‑Regression RMSE train {rmse_train:.4f}  | "
      f"val {rmse_val:.4f}  | test {rmse_test:.4f}")

Linear‑Regression RMSE train 0.9351  | val 0.9274  | test 0.8065


## Random Forest Top 5 Features

In [13]:
all_features = assembler.getInputCols()
feature_pos = {f: i for i, f in enumerate(all_features)}

top5_names = [
    "feature_56_imputed",
    "feature_45_imputed",
    "feature_19_imputed",
    "feature_66_imputed",
    "feature_06_imputed",
]

top5_idx = [feature_pos[i] for i in top5_names]

slicer = VectorSlicer(
    inputCol="scaled_features",
    outputCol="feat_vec",
    indices=top5_idx 
)

rf = (RandomForestRegressor(featuresCol="feat_vec",
                            labelCol=label_col,
                            numTrees=30,
                            maxDepth=4,
                            subsamplingRate=0.6,
                            featureSubsetStrategy="sqrt",
                            seed=42))

pipe_rf5 = Pipeline(stages=[prep_model, slicer, rf])

train_df = df.filter("date_id <= 1350")
val_df   = df.filter("date_id BETWEEN 1351 AND 1500")
test_df  = df.filter("date_id > 1500")

rf5_model = pipe_rf5.fit(train_df)

In [14]:
evaluator = RegressionEvaluator(labelCol=label_col,
                                predictionCol="prediction",
                                metricName="rmse")

pred_train = rf5_model.transform(train_df)
pred_val   = rf5_model.transform(val_df)
pred_test  = rf5_model.transform(test_df)

rmse_train = evaluator.evaluate(pred_train)
rmse_val   = evaluator.evaluate(pred_val)
rmse_test  = evaluator.evaluate(pred_test)

print(f"Random‑Forest RMSE train {rmse_train:.4f}  | "
      f"val {rmse_val:.4f}  | test {rmse_test:.4f}")

Random‑Forest RMSE train 0.9349  | val 0.9270  | test 0.8063
