In [None]:
import yfinance as yf
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import r2_score
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import xgboost as xgb
import seaborn as sns

from src.data.preprocess import extend_market_data

# Getting and preprocessing the data

In [None]:
# Download the daily history of the ZW=F ticker from 2014 onwards and discard
# the corporate-action columns, which are not used as features.
SRW = yf.Ticker("ZW=F")
SRW_history = SRW.history(start="2014-01-01")
SRW_data = SRW_history.drop(columns=['Dividends', 'Stock Splits'])

# Add the derived columns produced by the project helper (the feature columns
# and 'Log_Return' used below are expected to come from it).
SRW_data = extend_market_data(SRW_data)

# The target of a row is the log return of the *next* row. shift(-1) leaves the
# last row without a target, so it is removed together with any other row that
# contains a missing value.
SRW_data['Target'] = SRW_data['Log_Return'].shift(-1)
SRW_data = SRW_data.dropna()
SRW_data.head()

# Defining the feature columns and the train test split

In [None]:
# Columns fed to every model, and the date separating training from test rows.
feature_cols = [
    'Close', 'Volume', 'Day_Of_Year', 'Year', 'Month', 'Day', 'DTE',
    '7D_Volatility', '14D_ATR', '7D_MA', '7D_EMA', '14D_RSI',
]
splitting_point = '2024-01-01'

# Chronological split: rows dated before the splitting point are used for
# training, rows on or after it for testing.
is_train_row = SRW_data.index < splitting_point
is_test_row = SRW_data.index >= splitting_point

X_train_pd = SRW_data.loc[is_train_row, feature_cols]
y_train = SRW_data.loc[is_train_row, 'Target']
X_test_pd = SRW_data.loc[is_test_row, feature_cols]
y_test = SRW_data.loc[is_test_row, 'Target']

# Standardise the features using statistics of the training rows only; the
# test rows are transformed with those same statistics.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_pd)
X_test = scaler.transform(X_test_pd)

# Linear regression model with CV

In [None]:
# Time-ordered cross-validation on the training period: each fold validates on
# rows that come after the rows it was trained on.
tscv = TimeSeriesSplit(n_splits=5)
model = LinearRegression()

cv_scores = cross_val_score(estimator=model, X=X_train, y=y_train, scoring='r2', cv=tscv)
print(f'Cross-validation R^2 scores on the training set: {cv_scores}')
print(f'Average R^2 score: {np.mean(cv_scores):.4f}')

# Refit on the complete training period and score the held-out test period.
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(f"The R^2 score on the testing set: {r2_score(y_test, y_pred):.4f}")

# Predicted vs. actual target over the test period.
fig_test, ax_test = plt.subplots(figsize=(16, 6))
ax_test.set_title('The Linear Regression Model on the testing set')
sns.lineplot(x=y_test.index, y=y_pred, label='prediction', ax=ax_test)
sns.lineplot(x=y_test.index, y=y_test, label='actual', ax=ax_test)

# Same comparison on the training data, restricted to the rows from position
# 2000 onwards so the chart stays readable.
y_train_tail = y_train[2000:]
fig_train, ax_train = plt.subplots(figsize=(16, 6))
ax_train.set_title('The Linear Regression Model on the training set')
sns.lineplot(x=y_train_tail.index, y=model.predict(X_train[2000:]), label='prediction', ax=ax_train)
sns.lineplot(x=y_train_tail.index, y=y_train_tail, label='actual', alpha=0.5, ax=ax_train)



In [None]:
# Fraction of test rows where the prediction and the realised target share the
# same sign (a product of exactly zero is counted as a miss).
same_sign = (y_pred * y_test) > 0
same_sign.sum() / len(y_pred)

In [None]:
# Number of test-set predictions, i.e. the denominator of the hit rate.
y_pred.shape[0]

### Understanding the Feature Importance

In [None]:
# Rank the fitted coefficients by absolute size. The model was fitted on
# standardised features, so the magnitudes can be compared across features.
coefficients = model.coef_
coefficients_df = (
    pd.DataFrame({"Feature": feature_cols, "Coefficient": coefficients})
    .sort_values(by="Coefficient", key=abs, ascending=False)
)

fig, ax = plt.subplots(figsize=(10, 5))
ax.barh(coefficients_df['Feature'], coefficients_df['Coefficient'], color='skyblue')
ax.set_xlabel('Coefficient Value')
ax.set_ylabel('Feature')
ax.set_title('Feature Importance in Linear Regression')
ax.invert_yaxis()  # put the largest |coefficient| at the top
plt.show()

In [None]:
# Pairwise correlations between the raw (unscaled) training features and the
# training target, joined on their shared date index.
train_with_target = pd.merge(X_train_pd, y_train, left_index=True, right_index=True)
correlation_matrix = train_with_target.corr()

fig, ax = plt.subplots(figsize=(10, 10))
sns.heatmap(correlation_matrix, annot=True, fmt=".2f", cmap="coolwarm", linewidths=0.5, ax=ax)
ax.set_title("Feature Correlation Matrix")
plt.show()

# Lasso regression on the market data

In [None]:
from sklearn.linear_model import Lasso

# L1-regularised linear model. No fit is needed before cross-validation:
# cross_val_score fits its own clone of the estimator on every fold.
lasso_model = Lasso(alpha=0.0001)

cv_scores = cross_val_score(lasso_model, X_train, y_train, cv=tscv, scoring='r2')
print(f'Cross-validation R^2 scores on the training set: {cv_scores}')
print(f'Average R^2 score: {np.mean(cv_scores):.4f}')

# Fit once on the complete training period and score the held-out test period.
lasso_model.fit(X_train, y_train)
y_pred = lasso_model.predict(X_test)
print(f"The Lasso Regression R^2 score on the testing set: {r2_score(y_test, y_pred):.4f}")

# Predicted vs. actual target over the test period.
plt.figure(figsize=(16, 6))
plt.title('The Lasso Regression Model on the testing set')
sns.lineplot(x=y_test.index, y=y_pred, label='prediction')
sns.lineplot(x=y_test.index, y=y_test, label='actual')

# Same comparison on the training rows from position 1000 onwards.
plt.figure(figsize=(16, 6))
plt.title('The Lasso Regression Model on the training set')
sns.lineplot(x=y_train[1000:].index, y=lasso_model.predict(X_train[1000:]), label='prediction')
sns.lineplot(x=y_train[1000:].index, y=y_train[1000:], label='actual', alpha = 0.3)

# Coefficients ranked by absolute size; features eliminated by the L1 penalty
# appear with a coefficient of exactly zero.
coefficients = lasso_model.coef_
coefficients_df = pd.DataFrame({"Feature": feature_cols, "Coefficient": coefficients}).sort_values(by="Coefficient", key=abs, ascending=False)
plt.figure(figsize=(10, 5))
plt.barh(coefficients_df['Feature'], coefficients_df['Coefficient'], color='skyblue')
plt.xlabel('Coefficient Value')
plt.ylabel('Feature')
plt.title('Feature Importance in Lasso Regression')
plt.gca().invert_yaxis()  # Invert y-axis for better readability
plt.show()

## Linear regression model on the training set without 2022 data

In [None]:
# Remove every training row dated in 2022 from both the features and the target.
outside_2022 = X_train_pd.index.year != 2022
X_train_new = X_train_pd[outside_2022]
y_train_new = y_train[outside_2022]

# NOTE: this refits the shared `scaler` on the reduced training set and
# overwrites `X_test`, so from here on the test features are scaled with the
# statistics of the reduced training set.
X_train_new = scaler.fit_transform(X_train_new)
X_test = scaler.transform(X_test_pd)

tscv = TimeSeriesSplit(n_splits=5)
model = LinearRegression()

cv_scores = cross_val_score(estimator=model, X=X_train_new, y=y_train_new, scoring='r2', cv=tscv)
print(f'Cross-validation R^2 scores on the training set: {cv_scores}')
print(f'Average R^2 score: {np.mean(cv_scores):.4f}')

# Refit on the reduced training set and score the held-out test period.
model.fit(X_train_new, y_train_new)
y_pred = model.predict(X_test)
print(f"The R^2 score on the testing set: {r2_score(y_test, y_pred):.4f}")

# Predicted vs. actual target over the test period.
fig_test, ax_test = plt.subplots(figsize=(16, 6))
ax_test.set_title('The Linear Regression Model on the testing set')
sns.lineplot(x=y_test.index, y=y_pred, label='prediction', ax=ax_test)
sns.lineplot(x=y_test.index, y=y_test, label='actual', ax=ax_test)

# Same comparison on the reduced training set, from position 1500 onwards.
y_train_new_tail = y_train_new[1500:]
fig_train, ax_train = plt.subplots(figsize=(16, 6))
ax_train.set_title('The Linear Regression Model on the training set')
sns.lineplot(x=y_train_new_tail.index, y=model.predict(X_train_new[1500:]), label='prediction', ax=ax_train)
sns.lineplot(x=y_train_new_tail.index, y=y_train_new_tail, label='actual', alpha=0.3, ax=ax_train)


In [None]:
# Coefficients of the most recently fitted linear model, ranked by absolute
# size (the features were standardised before fitting).
coefficients = model.coef_
coefficients_df = (
    pd.DataFrame({"Feature": feature_cols, "Coefficient": coefficients})
    .sort_values(by="Coefficient", key=abs, ascending=False)
)

fig, ax = plt.subplots(figsize=(10, 5))
ax.barh(coefficients_df['Feature'], coefficients_df['Coefficient'], color='skyblue')
ax.set_xlabel('Coefficient Value')
ax.set_ylabel('Feature')
ax.set_title('Feature Importance in Linear Regression')
ax.invert_yaxis()  # put the largest |coefficient| at the top
plt.show()

# XGB Model with CV

In [None]:
# Ten time-ordered folds for the gradient-boosted model.
tscv = TimeSeriesSplit(n_splits=10)

# Active hyperparameters: many shallow trees with a small learning rate and
# light L1/L2 regularisation. Options left disabled in this run: device='cuda',
# gamma=0.1, early_stopping_rounds=250, min_child_weight=5, subsample=0.7,
# colsample_bytree=0.7.
xgb_params = dict(
    n_estimators=1000,
    learning_rate=0.01,
    max_depth=3,
    eval_metric=r2_score,
    reg_alpha=0.1,
    reg_lambda=0.1,
)
xgb_model = xgb.XGBRegressor(**xgb_params)

# R^2 of each fold on the training set without the 2022 rows.
cv_scores = cross_val_score(estimator=xgb_model, X=X_train_new, y=y_train_new, scoring='r2', cv=tscv)
print(f'Cross-validation R2 scores: {cv_scores}')
print(f'Average R2: {np.mean(cv_scores):.4f}')

In [None]:
# Fit on the reduced training set. Both the training and the test data are
# passed as evaluation sets so their metric is logged every 50 rounds; early
# stopping is not enabled on the estimator, so this is reporting only.
eval_sets = [(X_train_new, y_train_new), (X_test, y_test)]
xgb_model.fit(X_train_new, y_train_new, eval_set=eval_sets, verbose=50)
y_pred = xgb_model.predict(X_test)

# Predicted vs. actual target over the test period.
fig_test, ax_test = plt.subplots(figsize=(16, 6))
ax_test.set_title('The XGBoost Model on the testing set')
sns.lineplot(x=y_test.index, y=y_pred, label='prediction', ax=ax_test)
sns.lineplot(x=y_test.index, y=y_test, label='actual', alpha=0.3, ax=ax_test)

# Same comparison on the training rows from position 2000 onwards.
y_train_new_tail = y_train_new[2000:]
fig_train, ax_train = plt.subplots(figsize=(16, 6))
ax_train.set_title('The XGBoost Model on the training set')
sns.lineplot(x=y_train_new_tail.index, y=xgb_model.predict(X_train_new[2000:]), label='prediction', ax=ax_train)
sns.lineplot(x=y_train_new_tail.index, y=y_train_new_tail, label='actual', alpha=0.3, ax=ax_train)



In [None]:
# Fraction of test rows where the latest predictions and the realised target
# have the same sign (a product of exactly zero is counted as a miss).
same_sign = (y_pred * y_test) > 0
same_sign.sum() / len(y_pred)

# NN Model

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

In [None]:
# Convert the (2022-free) training set and the test set to float32 tensors.
# Every input goes through np.asarray first so that torch always receives a
# plain array: the targets are pandas Series with a date index, and handing a
# Series to torch.tensor directly relies on torch reading it element by element
# through integer lookups (NOTE(review): slow and pandas-version dependent).
# NOTE: this rebinds X_train / y_train / X_test / y_test to tensors, so the
# scikit-learn cells above must be re-run before they can be used again.
X_train = torch.tensor(np.asarray(X_train_new), dtype=torch.float32)
y_train = torch.tensor(np.asarray(y_train_new), dtype=torch.float32)
X_test = torch.tensor(np.asarray(X_test), dtype=torch.float32)
y_test = torch.tensor(np.asarray(y_test), dtype=torch.float32)

# Mini-batch loaders: training batches are reshuffled every epoch, test batches
# keep their chronological order.
BATCH_SIZE = 32
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Choose the device used for training. `torch.accelerator` is only present in
# recent PyTorch releases, so fall back to the CUDA check (and finally to the
# CPU) when the attribute is missing instead of failing with AttributeError.
_accelerator_api = getattr(torch, "accelerator", None)
if _accelerator_api is not None and _accelerator_api.is_available():
    device = _accelerator_api.current_accelerator().type
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    """Fully connected regressor with two ReLU hidden layers and one output.

    Args:
        input_dim: Number of input features per sample. Defaults to 12, the
            value previously hard-coded in the first layer.
        hidden_dim: Width of both hidden layers. Defaults to 128.

    The forward pass maps a tensor of shape ``(batch, input_dim)`` to a tensor
    of shape ``(batch, 1)``.
    """

    def __init__(self, input_dim=12, hidden_dim=128):
        super().__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x):
        """Return the network output for ``x``; the trailing axis has size 1."""
        return self.linear_relu_stack(x)

# Build the network, move its parameters to the selected device and print the
# layer layout. This rebinds `model`, which earlier cells used for a
# scikit-learn estimator.
model = NeuralNetwork()
model = model.to(device)
print(model)

In [None]:
# Mean-squared-error objective, minimised with stochastic gradient descent at a
# learning rate of 1e-3.
loss_fn = nn.MSELoss()
optimizer = optim.SGD(params=model.parameters(), lr=1e-3)

def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch over ``dataloader``.

    Args:
        dataloader: Iterable of ``(X, y)`` batches whose ``dataset`` supports ``len``.
        model: The ``nn.Module`` to optimise; it is switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer bound to ``model``'s parameters.

    The loss of every tenth batch is printed. Nothing is returned.
    """
    size = len(dataloader.dataset)

    # Use the device the model already lives on instead of a notebook global,
    # so the function does not depend on another cell having been run.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(target_device), y.to(target_device)

        # Compute prediction error. A model emitting (batch, 1) against targets
        # of shape (batch,) would make the loss broadcast both to
        # (batch, batch) and optimise the wrong quantity, so the trailing
        # singleton axis is removed when the target does not have it.
        pred = model(X)
        if pred.dim() == y.dim() + 1 and pred.shape[-1] == 1:
            pred = pred.squeeze(-1)
        loss = loss_fn(pred, y)

        # Backpropagation; gradients are cleared first so that stale gradients
        # left on the parameters cannot leak into this step.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 10 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
#    test_loss /= num_batches
    print(f"Test loss: \n: {test_loss:>0.8f} \n")

In [None]:
# Train for a fixed number of epochs, reporting the test loss after each one.
epochs = 500
for epoch_idx in range(epochs):
    print(f"Epoch {epoch_idx+1}\n-------------------------------")
    train(dataloader=train_loader, model=model, loss_fn=loss_fn, optimizer=optimizer)
    test(dataloader=test_loader, model=model, loss_fn=loss_fn)
print("Done!")

In [None]:
# Predict the whole test set in one pass, without tracking gradients.
model.eval()
with torch.no_grad():
    test_output = model(X_test.to(device))
y_pred_test = test_output.squeeze().cpu().numpy()

# Column vectors for plotting and scoring. No scaling was applied to the
# target, so the "_rescaled" arrays hold the raw values.
y_pred_test_rescaled = y_pred_test.reshape(-1, 1)
y_test_rescaled = y_test.numpy().reshape(-1, 1)

# Predicted vs. actual target over the test set (x axis is the row position).
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(y_pred_test_rescaled, label='Predicted')
ax.plot(y_test_rescaled, label='Actual')
ax.legend()
ax.set_title("NN Model Predictions on testing set")
plt.show()

print(r2_score(y_test_rescaled, y_pred_test_rescaled))

In [None]:
# Fraction of test rows where the network's prediction and the realised target
# have the same sign (a product of exactly zero is counted as a miss).
same_sign = (y_test_rescaled * y_pred_test_rescaled) > 0
same_sign.sum() / len(y_test_rescaled)

In [None]:
# Predict the whole training set in one pass, without tracking gradients.
model.eval()
with torch.no_grad():
    y_pred_train = model(X_train.to(device)).squeeze().cpu().numpy()

# Column vectors for plotting and scoring (the target was never scaled).
y_pred_train_rescaled = y_pred_train.reshape(-1, 1)
y_train_rescaled = y_train.numpy().reshape(-1, 1)

# Plot results for the training rows from position 2000 onwards. The legend
# labels now match the plotted series: y_train_rescaled holds the actual
# targets and y_pred_train_rescaled the model output.

plt.figure(figsize=(16, 10))
plt.plot(y_train_rescaled[2000:], label='Actual', alpha=0.7)
plt.plot(y_pred_train_rescaled[2000:], label='Predicted')
plt.legend()
plt.title("NN Model Predictions on Training set")
plt.show()

print(r2_score(y_train_rescaled , y_pred_train_rescaled))

# LSTM Model

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler


In [None]:
def create_sequences(data, seq_length, target_col=-1):
    """Cut a 2-D table into overlapping windows with next-row targets.

    Window ``i`` holds rows ``i .. i + seq_length - 1`` (all columns); its
    target is the value in column ``target_col`` of row ``i + seq_length``.

    Args:
        data: Array-like of shape ``(n_rows, n_columns)``; it is converted with
            ``np.asarray``, so DataFrames and nested lists are accepted too.
        seq_length: Number of consecutive rows per window; must be at least 1.
        target_col: Column that supplies the target. Defaults to the last one.

    Returns:
        ``(sequences, targets)`` with shapes
        ``(n_rows - seq_length, seq_length, n_columns)`` and
        ``(n_rows - seq_length,)``. When the input has too few rows for a
        single window, empty arrays with those same trailing dimensions are
        returned so downstream reshapes keep working.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.
    """
    if seq_length < 1:
        raise ValueError(f"seq_length must be at least 1, got {seq_length}")

    values = np.asarray(data)
    n_windows = len(values) - seq_length
    if n_windows <= 0:
        empty_windows = np.empty((0, seq_length) + values.shape[1:], dtype=values.dtype)
        return empty_windows, np.empty((0,), dtype=values.dtype)

    sequences = np.stack([values[start:start + seq_length] for start in range(n_windows)])
    # Copy so the targets do not alias the caller's array.
    targets = values[seq_length:, target_col].copy()
    return sequences, targets

# Features plus the target, in that order, so the target is the last column.
data = SRW_data[feature_cols + ['Target']]

SEQ_LENGTH = 60   # rows per input window
TEST_SIZE = 500   # number of trailing windows held out for testing

# Fit the scaler on the training rows only. The targets of the last TEST_SIZE
# windows are exactly the last TEST_SIZE rows of `data`, so fitting on the full
# table would leak the test period's min/max into the training inputs. Values
# in the test period may therefore fall slightly outside [-1, 1].
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(data.iloc[:-TEST_SIZE])
data_scaled = scaler.transform(data)

X, y = create_sequences(data_scaled, SEQ_LENGTH)

# Convert to PyTorch tensors, keeping the chronological split.
X_train, y_train = torch.tensor(X[:-TEST_SIZE], dtype=torch.float32), torch.tensor(y[:-TEST_SIZE], dtype=torch.float32)
X_test, y_test = torch.tensor(X[-TEST_SIZE:], dtype=torch.float32), torch.tensor(y[-TEST_SIZE:], dtype=torch.float32)

# Reshape for LSTM (batch_size, seq_length, num_features); the feature count
# is taken from the table instead of being hard-coded.
n_features = data.shape[1]
X_train = X_train.view(-1, SEQ_LENGTH, n_features)
X_test = X_test.view(-1, SEQ_LENGTH, n_features)

# Create DataLoader
BATCH_SIZE = 32
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear read-out of the final time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Size of the prediction produced per sequence.

    Inputs are batch-first: ``(batch, seq_length, input_dim)``; the output has
    shape ``(batch, output_dim)``.
    """

    def __init__(self, input_dim=13, hidden_dim=64, num_layers=2, output_dim=1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Encode the sequence and project the last time step's hidden output."""
        sequence_output, _final_state = self.lstm(x)
        last_step = sequence_output[:, -1, :]
        return self.fc(last_step)

# Instantiate the LSTM with its default sizes written out explicitly. This
# rebinds `model`, replacing the feed-forward network trained earlier.
model = LSTMModel(input_dim=13, hidden_dim=64, num_layers=2, output_dim=1)


In [None]:
# Define Loss and Optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training Loop
EPOCHS = 1000
for epoch in range(EPOCHS):
    model.train()
    epoch_loss = 0.0
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        y_pred = model(batch_x)
        # Remove only the trailing output axis: (batch, 1) -> (batch,). A bare
        # squeeze() would also drop the batch axis when the final batch holds a
        # single sample, leaving a 0-d prediction against a 1-d target.
        loss = criterion(y_pred.squeeze(-1), batch_y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    # Report the mean batch loss every tenth epoch.
    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{EPOCHS}], Loss: {epoch_loss/len(train_loader):.6f}')

In [None]:
# Predict every test window in one pass, without tracking gradients.
model.eval()
with torch.no_grad():
    test_output = model(X_test)
y_pred_test = test_output.squeeze().cpu().numpy()

# Predictions and targets stay in the scaler's units; no inverse transform is
# applied (the fitted scaler covers all columns, not the target alone).
y_pred_test_rescaled = y_pred_test.reshape(-1, 1)
y_test_rescaled = y_test.numpy().reshape(-1, 1)

# The last 500 rows of `data` are the rows whose targets form the test set.
test_dates = data.index[-500:]
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(test_dates, y_test_rescaled, label='Actual')
ax.plot(test_dates, y_pred_test_rescaled, label='Predicted')
ax.legend()
ax.set_title("LSTM Model Predictions on testing set")
plt.show()

print(f"r2 score {r2_score(y_test_rescaled, y_pred_test_rescaled):.4f}")

In [None]:
# Predict every training window in one pass, without tracking gradients.
model.eval()
with torch.no_grad():
    train_output = model(X_train)
y_pred_train = train_output.squeeze().cpu().numpy()

# Predictions and targets stay in the scaler's units; no inverse transform is
# applied (the fitted scaler covers all columns, not the target alone).
y_pred_train_rescaled = y_pred_train.reshape(-1, 1)
y_train_rescaled = y_train.numpy().reshape(-1, 1)

# Training targets start after the first 60-row window and stop before the
# last 500 rows, which belong to the test set.
train_dates = data.index[60:-500]
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(train_dates, y_train_rescaled, label='Actual')
ax.plot(train_dates, y_pred_train_rescaled, label='Predicted')
ax.legend()
ax.set_title("LSTM Model Predictions on training set")
plt.show()

print(f"r2 score {r2_score(y_train_rescaled, y_pred_train_rescaled):.4f}")