In [None]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .master("local[4]") \
    .appName("Stock Price Forecast") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
from datetime import datetime
import os
import glob
from pyspark.sql.functions import col, isnan, when, count, lit, lead, lag, avg, stddev, min, max, concat_ws
from pyspark.sql.window import Window
from pyspark.sql.functions import year, col, min as spark_min, max as spark_max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, IntegerType
from pyspark.sql.functions import sqrt, avg, lead, col, expr, to_date, year, mean, log, row_number
from pyspark.sql.window import Window
from pyspark.ml.feature import  VectorAssembler, StandardScaler, PCA
from pyspark.sql.functions import abs as spark_abs
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor, FMRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

def scale_features_improved(train_df, val_df, test_df, numerical_cols, categorical_cols):
    """
    Scale numerical features while properly handling categorical features

    Args:
        train_df, val_df, test_df: DataFrames for each dataset
        numerical_cols: List of numerical feature column names
        categorical_cols: List of categorical feature column names

    Returns:
        Tuple of (scaled_train_df, scaled_val_df, scaled_test_df)
    """
    # 1. Process numerical features
    num_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features_vec")

    # Apply assembler to each dataset
    train_num = num_assembler.transform(train_df)
    val_num = num_assembler.transform(val_df)
    test_num = num_assembler.transform(test_df)

    # Fit scaler on training data only
    num_scaler = StandardScaler(inputCol="numerical_features_vec", outputCol="scaled_numerical_features")
    scaler_model = num_scaler.fit(train_num)

    # Transform all datasets
    train_scaled = scaler_model.transform(train_num)
    val_scaled = scaler_model.transform(val_num)
    test_scaled = scaler_model.transform(test_num)

    # 2. Process categorical features (assuming they're already indexed/encoded)
    cat_assembler = VectorAssembler(inputCols=categorical_cols, outputCol="categorical_features_vec")

    # Apply to each dataset
    train_final = cat_assembler.transform(train_scaled)
    val_final = cat_assembler.transform(val_scaled)
    test_final = cat_assembler.transform(test_scaled)

    # 3. Combine numerical and categorical features
    final_assembler = VectorAssembler(
        inputCols=["scaled_numerical_features", "categorical_features_vec"],
        outputCol="features"
    )

    train_result = final_assembler.transform(train_final)
    val_result = final_assembler.transform(val_final)
    test_result = final_assembler.transform(test_final)

    return train_result, val_result, test_result


def prepare_pca_for_all_folds(input_dir, output_dir, fold_indices, label_col="Target_Return_1d", pca_k=20):
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import col
    os.makedirs(output_dir, exist_ok=True)

    print("Fitting scaler and PCA using fold 6")
    train_6 = spark.read.parquet(os.path.join(input_dir, "train_fold_6"))
    price_cols = ["Open", "High", "Low", "Close", "Volume", "Adjusted Close"]
    derived_cols = [f.name for f in train_6.schema.fields if isinstance(f.dataType, (DoubleType, IntegerType)) and f.name not in price_cols and not f.name.startswith("Target_")]
    numerical_cols = price_cols + derived_cols
    categorical_cols = [f.name for f in train_6.schema.fields if f.name.endswith("_vec")]

    df_6_scaled, _, _ = scale_features_improved(train_6, train_6, train_6, numerical_cols, categorical_cols)
    pca = PCA(inputCol="features", outputCol="pca_features", k=pca_k)
    pca_model = pca.fit(df_6_scaled)

    def transform_and_save(fold_index):
        print(f"Processing fold {fold_index}")
        df_train = spark.read.parquet(os.path.join(input_dir, f"train_fold_{fold_index}"))
        df_scaled, _, _ = scale_features_improved(df_train, df_train, df_train, numerical_cols, categorical_cols)
        df_pca = pca_model.transform(df_scaled)

        df_clean = df_pca.select("pca_features", label_col)
        df_clean = df_clean.withColumn("pca_array", vector_to_array("pca_features"))
        for i in range(pca_k):
            df_clean = df_clean.withColumn(f"pca_feature_{i}", col("pca_array")[i])
        df_clean = df_clean.drop("pca_features", "pca_array")

        save_path = os.path.join(output_dir, f"train_fold_{fold_index}_pca.parquet")
        df_clean.write.mode("overwrite").parquet(save_path)

    for i in fold_indices:
        transform_and_save(i)

    print("Processing test set")
    df_test = spark.read.parquet(os.path.join(input_dir, "test_set"))
    df_test_scaled, _, _ = scale_features_improved(df_test, df_test, df_test, numerical_cols, categorical_cols)
    df_test_pca = pca_model.transform(df_test_scaled)

    df_test_clean = df_test_pca.select("pca_features", label_col)
    df_test_clean = df_test_clean.withColumn("pca_array", vector_to_array("pca_features"))
    for i in range(pca_k):
        df_test_clean = df_test_clean.withColumn(f"pca_feature_{i}", col("pca_array")[i])
    df_test_clean = df_test_clean.drop("pca_features", "pca_array")
    df_test_clean.write.mode("overwrite").parquet(os.path.join(output_dir, "test_set_pca.parquet"))

    print("All folds processed and saved.")





In [21]:
input_dir = r"D:\bigdata_project\sliding_time_splits_nasdaq"
output_dir = r"D:\bigdata_project\sliding_time_splits_nasdaq_clean"
fold_indices = list(range(7))  # [0, 1, 2, 3, 4, 5, 6]
label_col = "Target_Return_1d"
pca_k = 20

prepare_pca_for_all_folds(
    input_dir=input_dir,
    output_dir=output_dir,
    fold_indices=fold_indices,
    label_col=label_col,
    pca_k=pca_k
)


Fitting scaler and PCA using fold 6
Processing fold 0
Processing fold 1
Processing fold 2
Processing fold 3
Processing fold 4
Processing fold 5
Processing fold 6
Processing test set
All folds processed and saved.


In [28]:
import os
from pyspark.ml.regression import LinearRegression, FMRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

def load_pca_folds(input_dir, fold_indices, label_col="Target_Return_1d"):
    train_folds = []
    for i in fold_indices:
        df = spark.read.parquet(os.path.join(input_dir, f"train_fold_{i}_pca.parquet"))
        train_folds.append(df)

    test_df = spark.read.parquet(os.path.join(input_dir, "test_set_pca.parquet"))
    return train_folds, test_df

def train_models_on_folds(train_folds, label_col="Target_Return_1d"):
    all_models = []
    for i, df in enumerate(train_folds):
        print(f"Training models for fold {i}")

        feature_cols = [f"pca_feature_{j}" for j in range(20)]
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="pca_features")
        df = assembler.transform(df)

        lr_model = LinearRegression(featuresCol="pca_features", labelCol=label_col, regParam=0.01, elasticNetParam=0.0, maxIter=10).fit(df)
        print(f"Training models for fold {i} LR model")
        fm_model = FMRegressor(featuresCol="pca_features", labelCol=label_col, stepSize=0.001, regParam=0.1, maxIter=100).fit(df)
        print(f"Training models for fold {i} FM model")

        all_models.append((lr_model, fm_model))
    return all_models
def ensemble_predict(models, test_df, label_col="Target_Return_1d"):
    feature_cols = [f"pca_feature_{j}" for j in range(20)]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="pca_features")
    test_df = assembler.transform(test_df)

    preds = []
    weights = [i + 1 for i in range(len(models))]  # Recency-weighted: fold 0 = 1, fold 6 = 7
    total_weight = sum(weights)

    for i, (lr_model, fm_model) in enumerate(models):
        weight = weights[i] / total_weight

        lr_pred = lr_model.transform(test_df).select(col("prediction").alias("lr_pred"))
        fm_pred = fm_model.transform(test_df).select(col("prediction").alias("fm_pred"))

        combined = lr_pred.withColumn("fm_pred", fm_pred["fm_pred"])
        combined = combined.withColumn("weighted_avg", (col("lr_pred") + col("fm_pred")) / 2 * lit(weight))

        preds.append(combined.select("weighted_avg"))

    # Sum all weighted predictions
    from functools import reduce
    from pyspark.sql import DataFrame
    combined_preds = reduce(lambda df1, df2: df1.withColumn("weighted_avg", col("weighted_avg") + df2["weighted_avg"]), preds)

    final_df = test_df.select(label_col).withColumn("ensemble_pred", combined_preds["weighted_avg"])

    evaluators = {
        "rmse": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="rmse"),
        "mae": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="mae"),
        "mse": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="mse"),
        "r2": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="r2"),
    }

    for name, evaluator in evaluators.items():
        score = evaluator.evaluate(final_df)
        print(f"Final Ensemble {name.upper()}: {score:.6f}")

    return final_df


In [26]:
output_dir = "sliding_time_splits_forbes2000_clean"
fold_indices = list(range(7))  # 0 to 6
label_col = "Target_Return_1d"
train_folds, test_df = load_pca_folds(output_dir, fold_indices, label_col)
models_per_fold = train_models_on_folds(train_folds, label_col)


Training models for fold 0
Training models for fold 1
Training models for fold 2
Training models for fold 3
Training models for fold 4
Training models for fold 5
Training models for fold 6


In [1]:
def ensemble_predict(models, test_df, label_col="Target_Return_1d"):
    feature_cols = [f"pca_feature_{j}" for j in range(20)]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="pca_features")
    test_df = assembler.transform(test_df).withColumn("row_id", monotonically_increasing_id())

    preds = []
    weights = [i + 1 for i in range(len(models))]  # Recency-weighted: fold 0 = 1, fold 6 = 7
    total_weight = sum(weights)

    for i, (lr_model, fm_model) in enumerate(models):
        weight = weights[i] / total_weight

        lr_pred = lr_model.transform(test_df).select("row_id", col("prediction").alias("lr_pred"))
        fm_pred = fm_model.transform(test_df).select("row_id", col("prediction").alias("fm_pred"))

        combined = lr_pred.join(fm_pred, on="row_id")
        combined = combined.withColumn(f"weighted_{i}", ((col("lr_pred") + col("fm_pred")) / 2) * lit(weight))

        preds.append(combined.select("row_id", f"weighted_{i}"))

    from functools import reduce
    from pyspark.sql import DataFrame
    combined_preds = reduce(lambda df1, df2: df1.join(df2, on="row_id"), preds)

    weight_cols = [f"weighted_{i}" for i in range(len(models))]
    combined_preds = combined_preds.withColumn("ensemble_pred", sum([col(c) for c in weight_cols]))

    final_df = test_df.select("row_id", label_col).join(combined_preds.select("row_id", "ensemble_pred"), on="row_id")
    final_df = final_df.select(label_col, "ensemble_pred")

    evaluators = {
        "rmse": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="rmse"),
        "mae": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="mae"),
        "mse": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="mse"),
        "r2": RegressionEvaluator(labelCol=label_col, predictionCol="ensemble_pred", metricName="r2"),
    }

    for name, evaluator in evaluators.items():
        score = evaluator.evaluate(final_df)
        print(f"Final Ensemble {name.upper()}: {score:.6f}")

    return final_df



Running evaluation for dataset: sliding_time_splits_forbes2000_clean
Final Ensemble RMSE: 0.048952
Final Ensemble MAE: 0.015401
Final Ensemble MSE: 0.002542
Final Ensemble R2: -0.013567


## Nasdaq dataset

In [None]:
output_dir = "sliding_time_splits_nasdaq_clean"
fold_indices = list(range(7))  # 0 to 6
label_col = "Target_Return_1d"
train_folds, test_df = load_pca_folds(output_dir, fold_indices, label_col)
models_per_fold = train_models_on_folds(train_folds, label_col)
final_df = ensemble_predict(models_per_fold, test_df, label_col)

Training models for fold 0
Training models for fold 1
Training models for fold 2
Training models for fold 3
Training models for fold 4
Training models for fold 5
Training models for fold 6
Final Ensemble RMSE: 0.065031
Final Ensemble MAE: 0.022865
Final Ensemble MSE: 0.004225
Final Ensemble R2: 0.015129


## Forbes dataset

In [None]:
output_dir = "sliding_time_splits_forbes2000_clean"
fold_indices = list(range(7))  # 0 to 6
label_col = "Target_Return_1d"
train_folds, test_df = load_pca_folds(output_dir, fold_indices, label_col)
models_per_fold = train_models_on_folds(train_folds, label_col)
final_df = ensemble_predict(models_per_fold, test_df, label_col)

Training models for fold 0
Training models for fold 1
Training models for fold 2
Training models for fold 3
Training models for fold 4
Training models for fold 5
Training models for fold 6
Final Ensemble RMSE: 0.078433
Final Ensemble MAE: 0.027189
Final Ensemble MSE: 0.008305
Final Ensemble R2: 0.017433
