# Linear Regressor

## Import Libraries

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, monotonically_increasing_id, lit, date_add, explode
import numpy as np
import matplotlib.pyplot as plt
import warnings
import plotly.express as px
warnings.filterwarnings('ignore')

## Initialize Spark session

In [2]:
sc = SparkContext(master = 'local')
spark = SparkSession.builder \
          .appName("Python Spark Linear Regressor") \
          .getOrCreate()

25/01/03 20:05:39 WARN Utils: Your hostname, Khim3 resolves to a loopback address: 127.0.1.1; using 192.168.75.108 instead (on interface wlo1)
25/01/03 20:05:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/03 20:05:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/03 20:05:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Load the data

In [3]:
df = spark.read.csv('../data/NFLX.csv', header=True, inferSchema=True)
df.show(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2018-02-05|     262.0|267.899994|250.029999|254.259995|254.259995|11896100|
|2018-02-06|247.699997|266.700012|     245.0|265.720001|265.720001|12595800|
|2018-02-07|266.579987|272.450012|264.329987|264.559998|264.559998| 8981500|
|2018-02-08|267.079987|267.619995|     250.0|250.100006|250.100006| 9306700|
|2018-02-09|253.850006|255.800003|236.110001|249.470001|249.470001|16906900|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



## Split the data into training and test sets

In [4]:
df = df.orderBy('Date')
# Calculate split index
split_index = int(df.count() * 0.8)

# Split the dataset into training and testing sets
train = df.limit(split_index)  # Take the first 80% of rows
test = df.subtract(train)      # Subtract the training set from the original DataFrame to get the test set
test_copy = test.select("*") 
# Display row counts of the resulting DataFrames to verify the split
print(f"Training set row count: {train.count()}")
print(f"Testing set row count: {test.count()}")
test.show(5)

Training set row count: 807
Testing set row count: 202
+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2021-08-31|566.119995| 569.47998|561.609985|569.190002|569.190002| 2431900|
|2021-10-04|613.390015|626.130005|594.679993|603.349976|603.349976| 4995900|
|2021-05-17|485.589996|492.709991|482.809998|488.940002|488.940002| 2705200|
|2021-04-21|     508.0|515.460022|503.600006|508.899994|508.899994|22897400|
|2021-04-27|512.619995| 512.98999|504.579987|505.549988|505.549988| 3761300|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



## Set pipeline stages

### Vector Assembler

In [5]:
# 1. Assemble feature columns into a single vector column
feature_columns = ["Open", "High", "Low", "Volume"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

train = assembler.transform(train).select("features", col("Close").alias("label"))
test = assembler.transform(test).select("features", col("Close").alias("label"))
test.show(5)

+--------------------+----------+
|            features|     label|
+--------------------+----------+
|[508.0,515.460022...|508.899994|
|[513.820007,513.9...|508.779999|
|[509.01001,509.70...|505.549988|
|[506.76001,510.48...|510.299988|
|[512.619995,512.9...|505.549988|
+--------------------+----------+
only showing top 5 rows



### Train with Grid Search and Cross Validation

In [6]:
# 2. Initialize and fit the Linear Regression model
linear_regressor = LinearRegression(featuresCol="features", labelCol="label")

paramGrid = ParamGridBuilder() \
    .addGrid(linear_regressor.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(linear_regressor.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
crossval = CrossValidator(
    estimator=linear_regressor,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3  # Number of folds for cross-validation
)

cv_model = crossval.fit(train)

# Get the best model
best_model = cv_model.bestModel
print("Best Model Params:")
print(f"  Regularization Param (regParam): {best_model._java_obj.getRegParam()}")
print(f"  ElasticNet Param (elasticNetParam): {best_model._java_obj.getElasticNetParam()}")

# Make predictions on the test set
predictions = best_model.transform(test)
# 4. Evaluate the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction")

# Metrics
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print("MSE:", round(mse, 3))
print("RMSE:", round(rmse, 3))
print("MAE:", round(mae, 3))
print("R2 Score:", round(r2, 3))

25/01/03 20:05:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/01/03 20:05:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Best Model Params:
  Regularization Param (regParam): 0.01
  ElasticNet Param (elasticNetParam): 0.0
MSE: 17.157
RMSE: 4.142
MAE: 3.067
R2 Score: 0.997


## Save model for inference

In [7]:
best_model.save("../models/linear_regressor")

## Prepare data for visualization

In [8]:
# Join the predictions with the test DataFrame to add a 'Close_Prediction' column
test_pred = test.join(
    predictions.select("features", "prediction"), 
    on="features", 
    how="inner"
)

# Rename the 'prediction' column to 'Close_Prediction'
test_pred = test_pred.withColumnRenamed("prediction", "Close_Prediction")

# Optionally drop the 'features' column if no longer needed
test_pred = test_pred.drop("features")

# Show the resulting DataFrame
test_pred.show(5)

+----------+-----------------+
|     label| Close_Prediction|
+----------+-----------------+
|508.899994|511.3810775079306|
|508.779999|503.8861933081949|
|505.549988|503.3259849143498|
|510.299988| 506.864291968483|
|505.549988|506.7309388916451|
+----------+-----------------+
only showing top 5 rows



In [9]:
# Merge the DataFrames on `Close` from test_copy and `label` from test_pred
merged_df = test_copy.join(test_pred, test_copy["Close"] == test_pred["label"], how="inner")

# Drop the duplicate column `label` after the join
merged_df = merged_df.drop("label")

# Show the merged DataFrame
merged_df.show(5)


+----------+----------+----------+----------+----------+----------+--------+-----------------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume| Close_Prediction|
+----------+----------+----------+----------+----------+----------+--------+-----------------+
|2021-04-21|     508.0|515.460022|503.600006|508.899994|508.899994|22897400|511.3810775079306|
|2021-04-22|513.820007|513.960022|500.549988|508.779999|508.779999| 9061100|503.8861933081949|
|2021-04-23| 509.01001|509.700012|500.700012|505.549988|505.549988| 7307700|503.3259849143498|
|2021-04-27|512.619995| 512.98999|504.579987|505.549988|505.549988| 3761300|503.3259849143498|
|2021-04-26| 506.76001|510.480011|     503.0|510.299988|510.299988| 4388800| 506.864291968483|
+----------+----------+----------+----------+----------+----------+--------+-----------------+
only showing top 5 rows



In [10]:
merge_df = merged_df.toPandas()
merge_df.shape

(206, 8)

In [11]:
merge_df

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Close_Prediction
0,2021-04-21,508.000000,515.460022,503.600006,508.899994,508.899994,22897400,511.381078
1,2021-04-22,513.820007,513.960022,500.549988,508.779999,508.779999,9061100,503.886193
2,2021-04-23,509.010010,509.700012,500.700012,505.549988,505.549988,7307700,503.325985
3,2021-04-27,512.619995,512.989990,504.579987,505.549988,505.549988,3761300,503.325985
4,2021-04-26,506.760010,510.480011,503.000000,510.299988,510.299988,4388800,506.864292
...,...,...,...,...,...,...,...,...
201,2022-01-31,401.970001,427.700012,398.200012,427.140015,427.140015,20047500,419.836906
202,2022-02-01,432.959991,458.480011,425.540009,457.130005,457.130005,22542300,447.908175
203,2022-02-02,448.250000,451.980011,426.480011,429.480011,429.480011,14346000,434.647086
204,2022-02-03,421.440002,429.260010,404.279999,405.600006,405.600006,9905200,414.419024


In [12]:
# Sort the DataFrame by Date
merged_df_pandas = merge_df.sort_values(by="Date")

# Plot using Plotly
fig = px.line(
    merged_df_pandas,
    x="Date",
    y=["Close", "Close_Prediction"],
    title="Close Price vs Close Price Prediction"
)

# Adjust the layout for better visualization
fig.update_layout(width=900, height=600)

# Show the plot
fig.show()


### Remark: Linear Regression performs great on test dataset.