In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
import pandas as pd

# Read the raw SPY and VIX price files with Spark. No header option is passed, so the
# columns come back named _c0, _c1, ... and the CSV header line becomes the first row.
spy_spark_df = spark.read.csv("SPY.csv")
vix_spark_df = spark.read.csv("VIX.csv")

# Collect both Spark DataFrames into pandas, where the cleaning happens.
spy_df, vix_df = spy_spark_df.toPandas(), vix_spark_df.toPandas()

In [None]:
# Give the SPY columns readable names, then drop row 0: it holds the CSV header text,
# because Spark read the file without a header option.
spy_df = spy_df.rename(columns={
    "_c0": "Date",
    "_c1": "Open",
    "_c2": "High",
    "_c3": "Low",
    "_c4": "Close",
    "_c5": "Adj Close",
    "_c6": "Volume",
}).drop(index=[0])

# From VIX keep only Date (_c0) and Close (_c4, renamed "VIX"); likewise drop its header row.
vix_df = (
    vix_df
    .drop(columns=['_c1', '_c2', '_c3', '_c5', '_c6'])
    .rename(columns={"_c0": "Date", "_c4": "VIX"})
    .drop(index=[0])
)

In [None]:
# Outer-merge SPY and VIX on their Date strings so no date from either file is lost yet.
combined_df = pd.merge(spy_df, vix_df, how='outer', on='Date')

# Use Date (parsed to datetime) as the DataFrame index.
combined_df['Date'] = pd.to_datetime(combined_df['Date'])
combined_df = combined_df.set_index('Date')

# The columns arrive from Spark as strings; convert them to numbers.
# errors='coerce' turns non-numeric placeholders (e.g. "null") into NaN instead of leaving
# the whole column as strings (the behavior of errors='ignore', deprecated in pandas 2.2).
combined_df = combined_df.apply(pd.to_numeric, errors='coerce')

# Drop incomplete rows (e.g. dates present in only one of SPY/VIX after the outer merge, or
# a trailing row with a missing value). Filtering by content rather than slicing off the last
# row by position never discards a complete observation, and returns a fresh frame (no
# SettingWithCopyWarning on the assignment below).
combined_df = combined_df.dropna()

# Express Volume in millions, rounded to one decimal place. Random-forest splits are not
# affected by rescaling, so this mainly aids readability; the rounding does coarsen the feature.
combined_df["Volume"] = combined_df["Volume"].div(1_000_000).round(1)

combined_df.head()

In [None]:
# "SPY Tomorrow" = the next row's adjusted close; "Target" = 1 if it is higher than today's, else 0.
# NOTE: the final row has no next row, so its "SPY Tomorrow" is NaN and the comparison yields 0
# (NaN > x is False). That row's Target is therefore not a real label.
next_adj_close = combined_df["Adj Close"].shift(-1)
combined_df["SPY Tomorrow"] = next_adj_close
combined_df["Target"] = next_adj_close.gt(combined_df["Adj Close"]).astype(int)

combined_df.head()

In [None]:
# Restrict the data to dates on or after START_DATE (a copy, so later column writes are safe).
START_DATE = "2000-01-01"
combined_df = combined_df.loc[START_DATE:].copy()

In [None]:
# Baseline model: a random forest on the raw daily price, volume and VIX features.
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(n_estimators=100, min_samples_split=100, random_state=1)

# Keep only rows with a known next-day close. On the final row "SPY Tomorrow" is NaN, so its
# Target came out as 0 (NaN > x is False). That fabricated label would otherwise land in the
# test set and skew the precision score.
labeled_df = combined_df[combined_df["SPY Tomorrow"].notna()]

# Train/test split by position: hold out the last TEST_DAYS rows (the most recent dates).
TEST_DAYS = 250
train = labeled_df.iloc[:-TEST_DAYS]  # Training data
test = labeled_df.iloc[-TEST_DAYS:]   # Test data

# Same-day features used to predict the next day's direction.
predictors = ["Open", "High", "Low", "Close", "Adj Close", "Volume", "VIX"]

# Fit the model.
model.fit(train[predictors], train["Target"])

In [None]:
from sklearn.metrics import precision_score

# Precision on the held-out rows: of the days predicted "up" (1), the fraction whose
# Target is actually 1. Predictions keep the test index so they align with the labels.
preds = pd.Series(model.predict(test[predictors]), index=test.index)
precision_score(test["Target"], preds)

In [None]:
# Lag features & simple moving average (SMA).
lag_features = ['Adj Close']  # columns to build lagged copies of
lags = range(1, 4)            # lags of 1, 2 and 3 rows
for feature in lag_features:
    for lag in lags:
        combined_df[f'{feature} Lag {lag}'] = combined_df[feature].shift(lag)

# Rolling mean of 'Adj Close' over the last `window` rows, including the current one.
# The column name follows `window` so the two cannot drift apart.
window = 5
combined_df[f'SMA {window}'] = combined_df['Adj Close'].rolling(window=window).mean()

# Drop rows left incomplete: the leading rows lack lag/SMA values, and the final
# row has a NaN "SPY Tomorrow".
combined_df = combined_df.dropna()

In [None]:
# Second model: same hyperparameters as the baseline, trained on the lag/SMA-augmented features.
model_1 = RandomForestClassifier(n_estimators=100, min_samples_split=100, random_state=1)

# Hold out the last n_test rows (the most recent dates) for testing.
n_test = 250
train_1, test_1 = combined_df.iloc[:-n_test], combined_df.iloc[-n_test:]

# Predictors: the baseline list without Open/Close, plus the lag and SMA columns.
predictors_1 = [
    "High", "Low", "Adj Close", "Volume", "VIX",
    "Adj Close Lag 1", "Adj Close Lag 2", "Adj Close Lag 3",
    "SMA 5",
]

# Fit the feature-augmented model.
model_1.fit(train_1[predictors_1], train_1["Target"])

In [None]:
# Precision of the feature-augmented model on its held-out rows (positive class = "up", 1).
preds_1 = pd.Series(model_1.predict(test_1[predictors_1]), index=test_1.index)
precision_score(test_1["Target"], preds_1)

In [None]:
# Load the 10-year Treasury file (TNX) the same way as SPY/VIX: no header option, so the
# columns come back as _c0, _c1, ... and row 0 holds the CSV header text.
tnx_spark_df = spark.read.csv("TNX.csv")

# Convert to pandas and keep only Date (_c0) and Close (_c4).
tnx_df = tnx_spark_df.toPandas()
tnx_df = tnx_df.drop(columns=['_c1', '_c2', '_c3', '_c5', '_c6'])

# Rename the columns and drop the header-text row.
tnx_df = tnx_df.rename(columns={"_c0": "Date", "_c4": "10-Y Treasury"})
tnx_df = tnx_df.drop([0])

# Use Date as a datetime index so it lines up with combined_df's index in the merge.
tnx_df['Date'] = pd.to_datetime(tnx_df['Date'])
tnx_df = tnx_df.set_index('Date')

# Convert the remaining columns to numbers. errors='coerce' maps placeholders such as "null"
# to NaN (removable with dropna) instead of leaving the whole column as strings, which is what
# the deprecated errors='ignore' does and which would later break model fitting.
tnx_df = tnx_df.apply(pd.to_numeric, errors='coerce')

In [None]:
# Merge the Treasury yield into combined_df on the shared "Date" index level.
combined_df = pd.merge(combined_df, tnx_df, how="outer", on="Date")

# Keep 2000-01-01 onward, then drop incomplete rows: the outer merge introduces NaNs on dates
# present in only one frame. dropna() also handles an incomplete trailing row, so no row is
# sliced off by position (which would discard a complete most-recent observation).
combined_df = combined_df.loc["2000-01-01":].copy()
combined_df = combined_df.dropna()

In [None]:
# Third model: same hyperparameters, now also given the 10-year Treasury yield.
model_2 = RandomForestClassifier(n_estimators=100, min_samples_split=100, random_state=1)

# Hold out the last n_test rows (the most recent dates) for testing.
n_test = 250
train_2, test_2 = combined_df.iloc[:-n_test], combined_df.iloc[-n_test:]

# Predictors: the lag/SMA feature set plus "10-Y Treasury".
predictors_2 = [
    "High", "Low", "Adj Close", "Volume", "VIX",
    "Adj Close Lag 1", "Adj Close Lag 2", "Adj Close Lag 3",
    "SMA 5", "10-Y Treasury",
]

# Fit the model that includes the Treasury yield.
model_2.fit(train_2[predictors_2], train_2["Target"])

In [None]:
# Precision of the Treasury-augmented model on its held-out rows (positive class = "up", 1).
preds_2 = pd.Series(model_2.predict(test_2[predictors_2]), index=test_2.index)
precision_score(test_2["Target"], preds_2)

In [None]:
# Plot SPY's adjusted close, the VIX and the 10-year Treasury yield over time.
# Their value ranges differ widely, so each gets its own panel (sharing the date axis)
# instead of one y-axis on which the smaller-valued series would be flattened.
axes = combined_df.plot.line(
    y=['Adj Close', 'VIX', '10-Y Treasury'],
    use_index=True,
    subplots=True,
    sharex=True,
    figsize=(10, 8),
    title="SPY Adj Close, VIX and 10-Y Treasury yield",
)