### Importing Libraries

In [11]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.functions import col, sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, monotonically_increasing_id, lit, date_add, explode
import numpy as np
import matplotlib.pyplot as plt
import warnings
import plotly.express as px
# NOTE(review): this silences *every* Python warning for the rest of the kernel
# session (including deprecation warnings from PySpark/plotly); consider
# narrowing it to specific categories or modules.
warnings.filterwarnings('ignore')


### Build Spark Session

In [12]:
# Create (or reuse) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("Netflix Stock Price Forecasting")
    .getOrCreate()
)

# Raw daily NFLX price history, relative to this notebook's directory.
DATA_PATH = "../data/raw/NFLX.csv"

# Load the data. inferSchema types the numeric columns; "Date" is cast
# explicitly because CSV inference of "yyyy-MM-dd" values can yield a date,
# timestamp or string depending on the Spark version, and the train/test split
# does date arithmetic on this column. Casting an already-date column is a no-op.
df = (
    spark.read.csv(DATA_PATH, header=True, inferSchema=True)
    .withColumn("Date", col("Date").cast("date"))
)
df.show(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2018-02-05|     262.0|267.899994|250.029999|254.259995|254.259995|11896100|
|2018-02-06|247.699997|266.700012|     245.0|265.720001|265.720001|12595800|
|2018-02-07|266.579987|272.450012|264.329987|264.559998|264.559998| 8981500|
|2018-02-08|267.079987|267.619995|     250.0|250.100006|250.100006| 9306700|
|2018-02-09|253.850006|255.800003|236.110001|249.470001|249.470001|16906900|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



### Splitting the data into train and test datasets

In [13]:
# Fraction of the covered *time period* (not of the rows) used for training.
TRAIN_FRACTION = 0.8

# Get the earliest and latest dates in a single aggregation pass
# (one Spark job instead of two separate min/max collects).
min_date, max_date = df.selectExpr("min(Date)", "max(Date)").first()
if min_date is None or max_date is None:
    raise ValueError("Date column is empty or entirely null; cannot build a time-based split")

# Calculate the time delta between the first and last date
delta = max_date - min_date

# Define a cutoff date: the first TRAIN_FRACTION of the time period is training data
cutoff_date = min_date + datetime.timedelta(days=int(delta.days * TRAIN_FRACTION))

# Split the dataset:
# - train_df: data with Date less than cutoff_date (older data)
# - test_df: data with Date on or after cutoff_date (newer data)
# NOTE: rows with a null Date match neither filter and are dropped from both sets.
train_df = df.filter(col("Date") < cutoff_date)
test_df = df.filter(col("Date") >= cutoff_date)

# Print out the counts in each set
n_train = train_df.count()
n_test = test_df.count()
print("Number of training samples: ", n_train)
print("Number of testing samples: ", n_test)

# A degenerate date range (e.g. a single day) would leave one side empty.
assert n_train > 0 and n_test > 0, "time-based split produced an empty train or test set"

Number of training samples:  805
Number of testing samples:  204


In [14]:
# Sanity-check the training split by printing its first few rows.
train_df.show(n=5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2018-02-05|     262.0|267.899994|250.029999|254.259995|254.259995|11896100|
|2018-02-06|247.699997|266.700012|     245.0|265.720001|265.720001|12595800|
|2018-02-07|266.579987|272.450012|264.329987|264.559998|264.559998| 8981500|
|2018-02-08|267.079987|267.619995|     250.0|250.100006|250.100006| 9306700|
|2018-02-09|253.850006|255.800003|236.110001|249.470001|249.470001|16906900|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



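Time-based split: splitting by date rather than at random helps prevent “look-ahead bias”, where information from the future inadvertently influences model training. This only protects the train/test boundary. Random k-fold cross-validation within the training period, and same-day features such as High and Low, can still let later information leak into the model.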


### Build the Linear Regression model

In [15]:
from pyspark.ml.feature import StandardScaler

# Raw columns used as model inputs.
# NOTE(review): High and Low are only known once the trading day has closed,
# so using them to predict that same day's Close gives the model same-day
# information it would not have when forecasting ahead; this is a likely
# reason the test R2 is so close to 1. Consider lagged features instead.
feature_columns = ["Open", "High", "Low", "Volume"]

# Create a VectorAssembler to combine these columns into one (unscaled)
# vector column "features"; scaling into "scaledFeatures" happens in the pipeline.
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features"
)

# Reduce each split to exactly two columns: "features" and "label" (= Close).
# NOTE: this overwrites train_df/test_df, so re-running this cell on its own
# fails (the raw columns are gone); re-run the split cell first.
train_df = assembler.transform(train_df).select("features", col("Close").alias("label"))
test_df = assembler.transform(test_df).select("features", col("Close").alias("label"))

# Standardize the assembled vector (zero mean, unit variance) into "scaledFeatures".
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

# Linear regression on the scaled features.
linear_regressor = LinearRegression(
    featuresCol="scaledFeatures",
    labelCol="label"
)

# Pipeline: scaling followed by regression (assembly was already applied above).
pipeline = Pipeline(stages=[scaler, linear_regressor])

# Hyperparameter grid for the grid search (3 x 3 x 3 = 27 combinations).
paramGrid = ParamGridBuilder() \
    .addGrid(linear_regressor.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(linear_regressor.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(linear_regressor.maxIter, [50, 100, 200]) \
    .build()

In [16]:
# Set up cross-validation, scoring each candidate by RMSE on the held-out fold.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Explicit fold-assignment seed so the selected hyperparameters are reproducible
# across kernel restarts (PySpark's default seed is derived from Python's hash()
# of the class name, which is randomized per interpreter process).
CV_SEED = 42

# NOTE(review): CrossValidator assigns rows to folds at random, so each
# validation fold mixes dates from across the training period (future rows can
# inform a model validated on past rows). A forward-chaining, time-ordered
# validation scheme would be more faithful for forecasting.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,  # Number of folds for cross-validation
    seed=CV_SEED,
)

# Fit the cross-validator to find the best model
cv_model = crossval.fit(train_df)

# Get the best model (which is a fitted pipeline)
best_pipeline_model = cv_model.bestModel
best_model = best_pipeline_model.stages[-1]  # Extract the LinearRegression model

print("Best Model Params:")
print("  Regularization Param (regParam):", best_model.getRegParam())
print("  ElasticNet Param (elasticNetParam):", best_model.getElasticNetParam())
print("  Maximum Iterations (maxIter):", best_model.getMaxIter())

Best Model Params:
  Regularization Param (regParam): 0.01
  ElasticNet Param (elasticNetParam): 0.0
  Maximum Iterations (maxIter): 50


In [None]:
# Score the held-out (post-cutoff) rows with the tuned pipeline.
predictions = best_pipeline_model.transform(test_df)

# One evaluator reused for every metric; the metric is overridden per call.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

metric_names = ("mse", "rmse", "mae", "r2")
scores = {
    name: evaluator.evaluate(predictions, {evaluator.metricName: name})
    for name in metric_names
}
mse, rmse, mae, r2 = (scores[name] for name in metric_names)

for caption, value in (("MSE:", mse), ("RMSE:", rmse), ("MAE:", mae), ("R2 Score:", r2)):
    print(caption, round(value, 3))

MSE: 17.161
RMSE: 4.143
MAE: 3.07
R2 Score: 0.997


### Save model for inference