In [1]:
import kagglehub
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

#Import data
import math
path = kagglehub.dataset_download("joebeachcapital/s-and-p500-index-stocks-daily-updated")

Downloading from https://www.kaggle.com/api/v1/datasets/download/joebeachcapital/s-and-p500-index-stocks-daily-updated?dataset_version_number=166...


100%|██████████| 52.3M/52.3M [00:01<00:00, 37.9MB/s]

Extracting files...





In [2]:
# Create the spark app
spark = SparkSession.builder.appName("ML").getOrCreate()

# Create a spark dataframe with that data
data = spark.read.csv(path, header=True, inferSchema=True)

# Data cleaning and preprocessing
data = data.dropna()

#No duplicates (Yay)
data.exceptAll(data.dropDuplicates()).show()


+----+------+----+----+---+-----+---------+------+
|Date|Ticker|Open|High|Low|Close|Adj Close|Volume|
+----+------+----+----+---+-----+---------+------+
+----+------+----+----+---+-----+---------+------+



In [4]:
from typing_extensions import final
from pyspark.sql.functions import col

# Convert the date into separate columns so that they are pure numbers
data = data.withColumn("Year", year("Date")).withColumn("Month", month("Date")).withColumn("Day", dayofmonth("Date"))

# OneHot Encoding on Ticker (Names of companies) change them to numerical values so that we can analyze it
if "TickerIndex" not in data.columns:
    indexer = StringIndexer(inputCol="Ticker", outputCol="TickerIndex")
    data = indexer.fit(data).transform(data)

# this range will be excluded
exclude_condition = (
    (col("Year") == 2007) & (col("Month") <= 12) & (col("Month")>=2) |
    (col("Year") > 2007) & (col("Year") < 2009) |
    (col("Year") == 2009) & (col("Month") <= 4)
)

# Filter the dataset to exclude the specified range
filtered_data = data.filter(~exclude_condition)


# Keep track of the mapping between ticker and tickerindex so we know what the numbers represent what stock
# A list of mappings here
mapping = data.select("Ticker","TickerIndex").distinct().collect()


# Adj Close is what we will be predicting we are extracting the columns that will not be our label columns here
feature_columns = [col for col in data.columns if col not in ['Date','Ticker'] and col!= 'Adj Close']

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
assembled_data = assembler.transform(data)


assembler_excluded = VectorAssembler(inputCols=[col for col in filtered_data.columns if col not in ['Date','Ticker'] and col!='Adj Close'],outputCol="features")
assembled_data_excluded = assembler_excluded.transform(filtered_data)

final_data_excluded = assembled_data_excluded.select("features", "Adj Close")

# select features vector and adjacent close
final_data = assembled_data.select("features", "Adj Close")

# Train will be 80% of the data and test will be 20% , we will set the seed to ensure that the split remains consistent
train,test = final_data.randomSplit([0.8,0.2], seed=42)

# Train for the filtered data, this is splitting for the model with the financial crisis excluded
train_excluded,test_excluded = final_data_excluded.randomSplit([0.8,0.2], seed=42)

In [None]:
rf_filtered = RandomForestRegressor(featuresCol="features", labelCol="Adj Close", seed=42, numTrees=100,maxBins=502)
rf_filtered.setSeed(42)

# without financial crisis
model_filtered = rf_filtered.fit(train_excluded)

# Make predictions
predictions = model_filtered.transform(test_excluded)

#Evaluate how good the prediction is
evaluator = RegressionEvaluator(labelCol="Adj Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) for financial crisis excluded: {rmse}")

# show results
predictions.select("prediction", "Adj Close", "features").show(5)

Root Mean Squared Error (RMSE) for financial crisis excluded: 29.16253793990679
+-----------------+---------+--------------------+
|       prediction|Adj Close|            features|
+-----------------+---------+--------------------+
|12.38860180961822| 0.142361|[0.143229,0.15277...|
|12.38860180961822| 0.159444|[0.147778,0.19055...|
|12.38860180961822| 0.145833|[0.152778,0.15277...|
|12.38860180961822| 0.159722|[0.159722,0.15972...|
|12.38860180961822| 0.166667|[0.163194,0.16666...|
+-----------------+---------+--------------------+
only showing top 5 rows



In [None]:
rf = RandomForestRegressor(featuresCol="features", labelCol="Adj Close", seed=42, numTrees=100,maxBins=502)
rf.setSeed(42)

# Fit model to the whole dataset, without any filters or excluded periods
model = rf.fit(train)

# Make prediction
predictions = model.transform(test)

#Evaluate how good the prediction is
evaluator = RegressionEvaluator(labelCol="Adj Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) for general model: {rmse}")

# Show results
predictions.select("prediction", "Adj Close", "features").show(5)

Root Mean Squared Error (RMSE) for general model: 29.136094870639774
+-----------------+---------+--------------------+
|       prediction|Adj Close|            features|
+-----------------+---------+--------------------+
|12.17149784444751| 0.142361|[0.143229,0.15277...|
|12.17149784444751| 0.159444|[0.147778,0.19055...|
|12.17149784444751| 0.145833|[0.152778,0.15277...|
|12.17149784444751| 0.159722|[0.159722,0.15972...|
|12.17149784444751| 0.166667|[0.163194,0.16666...|
+-----------------+---------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import to_date

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RandomForestRegressor") \
    .getOrCreate()

# Download the dataset using Kaggle Hub API
path = kagglehub.dataset_download("joebeachcapital/s-and-p500-index-stocks-daily-updated")

# Load the dataset into Spark Dataframe
df = spark.read.csv(path, header=True, inferSchema=True)

# Filter for ticker 'ULA'
df_ula = df.filter(df['Ticker'] == 'UAL')

# check if everything is loaded correctly
print(df_ula)

# Ensure the date column is in the correct format
df_ula = df_ula.withColumn('Date', to_date(df_ula['Date'], 'yyyy-MM-dd'))

# Select relevant columns
df_ula = df_ula.select('Date', 'Open', 'High', 'Low', 'Volume', 'Adj Close')

# --- Create datasets for training models --

# Exclude the period from March 2020 to November 2020 which is the COVID period (according to the report)
df_ula_excluded = df_ula.filter((df_ula['Date'] < '2020-03-01') | (df_ula['Date'] > '2020-11-30'))

# Include the full period for the ULA stock (no exclusion)
df_ula_included = df_ula

# Assemble features for both datasets
assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')

df_ula_excluded = assembler.transform(df_ula_excluded)
df_ula_included = assembler.transform(df_ula_included)

# Split data into training and test sets for both models using a 80/20 split
train_data_excluded, test_data_excluded = df_ula_excluded.randomSplit([0.8, 0.2], seed=1234)
train_data_included, test_data_included = df_ula_included.randomSplit([0.8, 0.2], seed=1234)

# Initialize the RandomForestRegressor with our features
rf = RandomForestRegressor(featuresCol='features', labelCol='Adj Close')

# Train models on both datasets
model_excluded = rf.fit(train_data_excluded)
model_included = rf.fit(train_data_included)

# Make predictions with the model excluding March 2020 to November 2020
predictions_excluded_ULA = model_excluded.transform(test_data_excluded)

# Make predictions with the model including the full period
predictions_included_ULA = model_included.transform(test_data_included)

In [None]:
# Order predictions by most current date
predictions_excluded = predictions_excluded_ULA.orderBy(col('Date').desc())
predictions_included = predictions_included_ULA.orderBy(col('Date').desc())

# Show predictions for the model excluding the period (March 2020 - Nov 2020)
print("Predictions with March 2020 to November 2020 Excluded for ULA:")
predictions_excluded.select('Date', 'Adj Close', 'prediction').show()

# Show predictions for the model including the full period
print("Predictions with March 2020 to November 2020 Included for ULA:")
predictions_included.select('Date', 'Adj Close', 'prediction').show()

Predictions with March 2020 to November 2020 Excluded for ULA:
+----------+---------+-----------------+
|      Date|Adj Close|       prediction|
+----------+---------+-----------------+
|2024-12-16|    95.48|91.91758769935635|
|2024-12-12|    95.97|91.76909395785975|
|2024-12-09|    96.02|93.10904919643271|
|2024-12-04|    99.25|92.83645499353416|
|2024-11-29|    96.83|91.76909395785975|
|2024-11-26|    96.51|94.18720559029455|
|2024-11-15|    91.17|92.83645499353416|
|2024-11-14|    91.16|93.10904919643271|
|2024-11-13|    89.78|94.18720559029455|
|2024-10-31|    78.26|81.53468107150185|
|2024-10-25|    74.64|75.44595261833332|
|2024-10-18|    74.15|75.44595261833332|
|2024-10-14|    63.53|61.04721559992731|
|2024-10-10|    60.26|60.22971514722123|
|2024-10-09|    59.43|61.04721559992731|
|2024-09-27|    57.99|58.85991700937852|
|2024-09-19|    52.59|53.10741635041886|
|2024-09-05|    45.12|44.78940737269865|
|2024-09-03|    43.87|43.36739274476998|
|2024-08-22|    41.44| 42.059539345

In [None]:
# Extract the year from the date column for both predictions, as we are averaging on that
predictions_excluded_UAL = predictions_excluded.withColumn('year', year(predictions_excluded['date']))
predictions_included_UAL = predictions_included.withColumn('year', year(predictions_included['date']))

# Calculate average predcted 'Adj Close' per year for the excluded model
avg_predictions_excluded_UAL = predictions_excluded_UAL.groupBy('year').agg(avg('prediction').alias('avg_prediction_excluded'))

# Calculate average predicted 'Adj Close' per year for the included model
avg_predictions_included_UAL = predictions_included_UAL.groupBy('year').agg(avg('prediction').alias('avg_prediction_included'))

# Show results for both models
avg_predictions_excluded_UAL.show()
avg_predictions_included_UAL.show()

# Join the two dataframes on 'year'
combined_avg_predictions_UAL = avg_predictions_excluded_UAL.join(
    avg_predictions_included_UAL, on='year', how='inner'
)

# Sort by year
combined_avg_predictions_UAL = combined_avg_predictions_UAL.orderBy('year')

# Calculate the difference between the predictions
combined_avg_predictions_UAL = combined_avg_predictions_UAL.withColumn(
    'avg_difference',
    col('avg_prediction_included') - col('avg_prediction_excluded')
)

# Calculate the average difference across all years
average_difference_UAL = combined_avg_predictions_UAL.agg(avg('avg_difference')).collect()[0][0]

# Show the combined table
combined_avg_predictions_UAL.show()

# Display the average difference
print(f"Average difference between the included and excluded models for UAL: {average_difference_UAL}")

+----+-----------------------+
|year|avg_prediction_excluded|
+----+-----------------------+
|2007|      41.56448663717772|
|2018|      78.51366831327475|
|2015|         60.56287020196|
|2023|      46.14816236598309|
|2006|      34.13746484957114|
|2022|      40.10374645605257|
|2013|     30.840157695158073|
|2014|      48.09991846465564|
|2019|      86.39343898605311|
|2020|      66.82956132592668|
|2012|     21.144983636153935|
|2009|      7.186706660296881|
|2016|      52.78022757251588|
|2024|     55.354360482156274|
|2010|     21.497608288154733|
|2011|      21.06704034069232|
|2008|     15.330729998397471|
|2017|      69.21397555153219|
|2021|      48.63963051333712|
+----+-----------------------+

+----+-----------------------+
|year|avg_prediction_included|
+----+-----------------------+
|2007|      41.56756582890751|
|2018|      78.45727383037509|
|2015|     60.330512559399075|
|2023|     45.356449580670635|
|2006|      34.09219986086741|
|2022|      41.05528013716842|
|2013| 

In [None]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("RandomForestRegressor") \
    .getOrCreate()

# Download the dataset using Kaggle Hub API
path = kagglehub.dataset_download("joebeachcapital/s-and-p500-index-stocks-daily-updated")

# Load the dataset
df = spark.read.csv(path, header=True, inferSchema=True)

# Filter for ticker 'MRNA'
df_mrna = df.filter(df['Ticker'] == 'MRNA')

# Check if its loaded properly
print(df_mrna)

# Ensure date column is in good format
df_mrna = df_mrna.withColumn('date', to_date(df_mrna['date'], 'yyyy-MM-dd'))

# Select relevant columns
df_mrna = df_mrna.select('Date', 'Open', 'High', 'Low', 'Volume', 'Adj Close')

# --- Create datasets for training models ---

# Exclude the period from March 2020 to November 2020
df_mrna_excluded = df_mrna.filter((df_mrna['date'] < '2020-03-01') | (df_mrna['date'] > '2020-11-30'))

# Include the full period (no exclusion)
df_mrna_included = df_mrna

# Assemble features for both datasets
assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Volume'], outputCol='features')

df_mrna_excluded = assembler.transform(df_mrna_excluded)
df_mrna_included = assembler.transform(df_mrna_included)

# Split data into training and test sets for both models using 80/20 split
train_data_excluded, test_data_excluded = df_mrna_excluded.randomSplit([0.8, 0.2], seed=1234)
train_data_included, test_data_included = df_mrna_included.randomSplit([0.8, 0.2], seed=1234)

# Initialize the RandomForestRegressor
rf = RandomForestRegressor(featuresCol='features', labelCol='Adj Close')

# Train the two models on both datasets separatly
model_excluded = rf.fit(train_data_excluded)
model_included = rf.fit(train_data_included)

# Make predictions with the model excluding March 2020 to November 2020
predictions_excluded = model_excluded.transform(test_data_excluded)

# Make predictions with the model including the full period
predictions_included = model_included.transform(test_data_included)

DataFrame[Date: date, Ticker: string, Open: double, High: double, Low: double, Close: double, Adj Close: double, Volume: int]


In [None]:
# Order predictions by most current date for MRNA
predictions_excluded = predictions_excluded.orderBy(col('Date').desc())
predictions_included = predictions_included.orderBy(col('Date').desc())

# Show predictions for the model excluding the period (March 2020 - Nov 2020)
print("Predictions with March 2020 to November 2020 Excluded:")
predictions_excluded.select('Date', 'Adj Close', 'prediction').show()

# Show predictions for the model including the full period
print("Predictions with March 2020 to November 2020 Included:")
predictions_included.select('Date', 'Adj Close', 'prediction').show()

Predictions with March 2020 to November 2020 Excluded:
+----------+---------+------------------+
|      Date|Adj Close|        prediction|
+----------+---------+------------------+
|2024-12-10|    41.51| 49.48295474782432|
|2024-12-09|    45.65| 52.97521993187932|
|2024-11-27|    43.39| 37.98185406613086|
|2024-11-18|    39.51| 37.98185406613086|
|2024-11-08|    46.83| 52.21777993187932|
|2024-10-25|    53.09| 56.83336278061413|
|2024-10-23|    53.39| 56.83336278061413|
|2024-10-15|    57.31| 56.83336278061413|
|2024-09-25|    63.64| 56.83336278061413|
|2024-09-23|    64.14| 64.24921921181814|
|2024-09-11|    79.51| 82.33938105757241|
|2024-09-04|    72.49| 73.37721880738123|
|2024-08-29|    77.59| 78.88745996180083|
|2024-08-21|    86.65| 83.39868088633952|
|2024-08-13|     82.9| 82.33938105757241|
|2024-08-08|    84.32| 82.33938105757241|
|2024-08-02|    86.58| 91.42338352932623|
|2024-07-30|   118.84|123.93514167600327|
|2024-07-23|   121.25| 123.7435991925184|
|2024-07-18|   121.32

In [None]:
from pyspark.sql.functions import avg

# Extract the year from the date column for both predictions
predictions_excluded = predictions_excluded.withColumn('year', year(predictions_excluded['date']))
predictions_included = predictions_included.withColumn('year', year(predictions_included['date']))

# Calculate average predicted 'Adj Close' per year for the excluded model
avg_predictions_excluded = predictions_excluded.groupBy('year').agg(avg('prediction').alias('avg_prediction_excluded'))

# Calculate average predicted 'Adj Close' per year for the included model
avg_predictions_included = predictions_included.groupBy('year').agg(avg('prediction').alias('avg_prediction_included'))

# Show results for both models
avg_predictions_excluded.show()
avg_predictions_included.show()

# Join the two dataframes on 'year'
combined_avg_predictions = avg_predictions_excluded.join(
    avg_predictions_included, on='year', how='inner'
)

# Sort by year
combined_avg_predictions = combined_avg_predictions.orderBy('year')

# Calculate the difference between the predictions
combined_avg_predictions = combined_avg_predictions.withColumn(
    'avg_difference',
    col('avg_prediction_included') - col('avg_prediction_excluded')
)

# Calculate the average difference across all years
average_difference = combined_avg_predictions.agg(avg('avg_difference')).collect()[0][0]

# Show the combined table
combined_avg_predictions.show()

# Display the average difference
print(f"Average difference between the included and excluded models: {average_difference}")



+----+-----------------------+
|year|avg_prediction_excluded|
+----+-----------------------+
|2018|     15.072866430304833|
|2023|      124.3708852895942|
|2022|     153.51758897310285|
|2019|      18.70342812154918|
|2020|      68.33704624971848|
|2024|      99.60344564583029|
|2021|     236.94594864113688|
+----+-----------------------+

+----+-----------------------+
|year|avg_prediction_included|
+----+-----------------------+
|2018|     15.774466118934521|
|2023|     121.21571485849024|
|2022|     156.60215192121143|
|2019|      18.70475089859806|
|2020|      65.75601057507264|
|2024|     107.72543459448099|
|2021|      255.2800051063015|
+----+-----------------------+

+----+-----------------------+-----------------------+--------------------+
|year|avg_prediction_excluded|avg_prediction_included|      avg_difference|
+----+-----------------------+-----------------------+--------------------+
|2018|     15.072866430304833|     15.774466118934521|  0.7015996886296882|
|2019|      

In [None]:
# Exclude condition for covid, we will take it out the dataset to check its performance
exclude_condition = (
        (col("Year") == 2020) & (col("Month") <= 11) & (col("Month") >= 3)
)

# Filter the dataset to exclude the specified range, COVID
filtered_data = data.filter(~exclude_condition)

assembler_excluded = VectorAssembler(inputCols=[col for col in filtered_data.columns if col not in ['Date','Ticker'] and col!='Adj Close'],outputCol="features")
assembled_data_excluded = assembler_excluded.transform(filtered_data)

final_data_excluded = assembled_data_excluded.select("features", "Adj Close")

train_excluded,test_excluded = final_data_excluded.randomSplit([0.8,0.2], seed=42)

In [None]:
rf_filtered = RandomForestRegressor(featuresCol="features", labelCol="Adj Close", seed=42, numTrees=100,maxBins=502)
rf_filtered.setSeed(42)

model_filtered = rf_filtered.fit(train_excluded)

# Make predictions for model without COVID period
predictions = model_filtered.transform(test_excluded)

#Check how good the prediction is
evaluator = RegressionEvaluator(labelCol="Adj Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

predictions.select("prediction", "Adj Close", "features").show(5)

Root Mean Squared Error (RMSE): 32.785446738044705
+------------------+---------+--------------------+
|        prediction|Adj Close|            features|
+------------------+---------+--------------------+
|12.239416838123175| 0.142361|[0.143229,0.15277...|
|12.239416838123175| 0.159444|[0.147778,0.19055...|
|12.239416838123175| 0.145833|[0.152778,0.15277...|
|12.239416838123175| 0.159722|[0.159722,0.15972...|
|12.239416838123175| 0.166667|[0.163194,0.16666...|
+------------------+---------+--------------------+
only showing top 5 rows



In [5]:
#This section is to exclude both covid and the financial crisis
exclude_condition = (
        (col("Year") == 2020) & (col("Month") <= 11) & (col("Month") >= 3)|
        (col("Year") == 2007) & (col("Month") <= 12) & (col("Month")>=2) |
        (col("Year") > 2007) & (col("Year") < 2009) |
        (col("Year") == 2009) & (col("Month") <= 4)
)


# Filter the dataset to exclude the specified range
filtered_data = data.filter(~exclude_condition)


assembler_excluded = VectorAssembler(inputCols=[col for col in filtered_data.columns if col not in ['Date','Ticker'] and col!='Adj Close'],outputCol="features")

assembled_data_excluded = assembler_excluded.transform(filtered_data)

final_data_excluded = assembled_data_excluded.select("features", "Adj Close")

train_excluded,test_excluded = final_data_excluded.randomSplit([0.8,0.2], seed=42)

In [6]:
rf_filtered = RandomForestRegressor(featuresCol="features", labelCol="Adj Close", seed=42, numTrees=100,maxBins=502)
rf_filtered.setSeed(42)

model_filtered = rf_filtered.fit(train_excluded)


# Make the predictions for model without both periods
predictions = model_filtered.transform(test_excluded)


#Check how good the prediction is
evaluator = RegressionEvaluator(labelCol="Adj Close", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

predictions.select("prediction", "Adj Close", "features").show(5)

Root Mean Squared Error (RMSE): 28.050697753545084
+------------------+---------+--------------------+
|        prediction|Adj Close|            features|
+------------------+---------+--------------------+
|12.595238883717467| 0.142361|[0.143229,0.15277...|
|12.595238883717467| 0.159444|[0.147778,0.19055...|
|12.595238883717467| 0.145833|[0.152778,0.15277...|
|12.595238883717467| 0.159722|[0.159722,0.15972...|
|12.595238883717467| 0.166667|[0.163194,0.16666...|
+------------------+---------+--------------------+
only showing top 5 rows

