# Stonk
We aim to forcast stock price value by using LSTM with Pytorch.

# Setup

In [182]:
import numpy as np
import pandas as pd

import yfinance as yf

from tqdm.notebook import tqdm

# Data

In [183]:
def get_stock_data(ticker, start, end):
    stock_data = yf.Ticker(ticker).history(start=start, end=end)
    return stock_data

In [184]:
snp_hist = get_stock_data('^GSPC', start="2004-08-19", end="2019-10-05")

snp_hist

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2004-08-19 00:00:00-04:00,1095.170044,1095.170044,1086.280029,1091.229980,1249400000,0.0,0.0
2004-08-20 00:00:00-04:00,1091.229980,1100.260010,1089.569946,1098.349976,1199900000,0.0,0.0
2004-08-23 00:00:00-04:00,1098.349976,1101.400024,1094.729980,1095.680054,1021900000,0.0,0.0
2004-08-24 00:00:00-04:00,1095.680054,1100.939941,1092.819946,1096.189941,1092500000,0.0,0.0
2004-08-25 00:00:00-04:00,1096.189941,1106.290039,1093.239990,1104.959961,1192200000,0.0,0.0
...,...,...,...,...,...,...,...
2019-09-30 00:00:00-04:00,2967.070068,2983.850098,2967.070068,2976.739990,3249130000,0.0,0.0
2019-10-01 00:00:00-04:00,2983.689941,2992.530029,2938.699951,2940.250000,3560750000,0.0,0.0
2019-10-02 00:00:00-04:00,2924.780029,2924.780029,2874.929932,2887.610107,3914180000,0.0,0.0
2019-10-03 00:00:00-04:00,2885.379883,2911.129883,2855.939941,2910.629883,3515130000,0.0,0.0


In [185]:
df = snp_hist[['Open', 'High', 'Low', 'Close', 'Volume']].copy()

df

Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2004-08-19 00:00:00-04:00,1095.170044,1095.170044,1086.280029,1091.229980,1249400000
2004-08-20 00:00:00-04:00,1091.229980,1100.260010,1089.569946,1098.349976,1199900000
2004-08-23 00:00:00-04:00,1098.349976,1101.400024,1094.729980,1095.680054,1021900000
2004-08-24 00:00:00-04:00,1095.680054,1100.939941,1092.819946,1096.189941,1092500000
2004-08-25 00:00:00-04:00,1096.189941,1106.290039,1093.239990,1104.959961,1192200000
...,...,...,...,...,...
2019-09-30 00:00:00-04:00,2967.070068,2983.850098,2967.070068,2976.739990,3249130000
2019-10-01 00:00:00-04:00,2983.689941,2992.530029,2938.699951,2940.250000,3560750000
2019-10-02 00:00:00-04:00,2924.780029,2924.780029,2874.929932,2887.610107,3914180000
2019-10-03 00:00:00-04:00,2885.379883,2911.129883,2855.939941,2910.629883,3515130000


In [186]:
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
pio.templates.default = "plotly_white"

plot_template = dict(
    layout=go.Layout({
        "font_size": 18,
        "xaxis_title_font_size": 24,
        "yaxis_title_font_size": 24})
)

fig = px.line(df['Open'], labels=dict(
    created_at="Date", value="Open", variable="Sensor"
))
fig.update_layout(
  template=plot_template, legend=dict(orientation='h', y=1.02, title_text="")
)
fig.show()

## Create the target variable

In [187]:
def shift_lead(df, target, target_col, forecast_lead):
    df[target_col] = df[target].shift(-forecast_lead)
    df = df.iloc[:-forecast_lead]
    return df

In [188]:
target = "Open"
features = list(df.columns.difference([target]))

forecast_lead = 1
target_col = f"{target}_lead{forecast_lead}"

df = shift_lead(df, target, target_col, forecast_lead)

df

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Open_lead1
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2004-08-19 00:00:00-04:00,1095.170044,1095.170044,1086.280029,1091.229980,1249400000,1091.229980
2004-08-20 00:00:00-04:00,1091.229980,1100.260010,1089.569946,1098.349976,1199900000,1098.349976
2004-08-23 00:00:00-04:00,1098.349976,1101.400024,1094.729980,1095.680054,1021900000,1095.680054
2004-08-24 00:00:00-04:00,1095.680054,1100.939941,1092.819946,1096.189941,1092500000,1096.189941
2004-08-25 00:00:00-04:00,1096.189941,1106.290039,1093.239990,1104.959961,1192200000,1104.959961
...,...,...,...,...,...,...
2019-09-27 00:00:00-04:00,2985.469971,2987.310059,2945.530029,2961.790039,3246480000,2967.070068
2019-09-30 00:00:00-04:00,2967.070068,2983.850098,2967.070068,2976.739990,3249130000,2983.689941
2019-10-01 00:00:00-04:00,2983.689941,2992.530029,2938.699951,2940.250000,3560750000,2924.780029
2019-10-02 00:00:00-04:00,2924.780029,2924.780029,2874.929932,2887.610107,3914180000,2885.379883


In [189]:
print(f"features: {features}, target_col: {target_col}")

features: ['Close', 'High', 'Low', 'Volume'], target_col: Open_lead1


## Create a hold-out test set and preprocess the data

In [190]:
test_start = "2019-01-01"
val_start = "2018-01-01"

df_train = df.loc[:val_start].copy()
df_val = df.loc[val_start:test_start].copy()
df_test = df.loc[test_start:].copy()

print("Test set fraction:", len(df_test) / len(df))

Test set fraction: 0.050157563025210086


## Standardize the features and target, based on the training set

In [191]:
target_mean = df_train[target].mean()
target_stdev = df_train[target].std()

for c in df_train.columns:
    mean = df_train[c].mean()
    stdev = df_train[c].std()

    df_train[c] = (df_train[c] - mean) / stdev
    df_val[c] = (df_val[c] - mean) / stdev
    df_test[c] = (df_test[c] - mean) / stdev

## Create datasets that PyTorch `DataLoader` can work with

In [192]:
import torch
from torch.utils.data import Dataset

class SequenceDataset(Dataset):
    def __init__(self, dataframe, target, features, sequence_length=5):
        self.features = features
        self.target = target
        self.sequence_length = sequence_length
        self.y = torch.tensor(dataframe[self.target].values).float()
        self.X = torch.tensor(dataframe[self.features].values).float()

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, i):
        if i >= self.sequence_length - 1:
            i_start = i - self.sequence_length + 1
            x = self.X[i_start:(i + 1), :]
        else:
            padding = self.X[0].repeat(self.sequence_length - i - 1, 1)
            x = self.X[0:(i + 1), :]
            x = torch.cat((padding, x), 0)

        return x, self.y[i]

In [193]:
i = 4
sequence_length = 4

train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_length=sequence_length
)

print(f"train len: {len(train_dataset)}")

X, y = train_dataset[i]
print(X)

train len: 3366
tensor([[-0.9922, -1.0070, -0.9909, -1.8666],
        [-0.9983, -1.0044, -0.9792, -2.0027],
        [-0.9971, -1.0055, -0.9836, -1.9487],
        [-0.9773, -0.9934, -0.9826, -1.8725]])


In [194]:
from torch.utils.data import DataLoader
torch.manual_seed(99)

train_loader = DataLoader(train_dataset, batch_size=3, shuffle=True)

X, y = next(iter(train_loader))
print(X.shape)
print(X)

torch.Size([3, 4, 4])
tensor([[[-0.7814, -0.7791, -0.7606, -0.9342],
         [-0.7834, -0.7940, -0.7678, -1.4545],
         [-0.7588, -0.7754, -0.7624, -1.4286],
         [-0.7627, -0.7719, -0.7442, -1.4310]],

        [[ 0.0068, -0.0048,  0.0086, -0.3342],
         [-0.0824, -0.0111, -0.0626,  0.3984],
         [-0.0695, -0.0834, -0.0849, -0.1287],
         [-0.0395, -0.0564, -0.0551, -0.2532]],

        [[ 0.0338,  0.0672,  0.0355,  0.4668],
         [ 0.0103,  0.0215,  0.0182,  0.1918],
         [ 0.0411,  0.0249,  0.0258, -0.0537],
         [ 0.0575,  0.0468,  0.0457, -0.5050]]])


## Create the datasets and data loaders for real

In this tutorial we will
use sequences of length 60 (60 days) to forcast 1 day ahead.

The PyTorch `DataLoader` is a very convenient way to iterate through these datasets. For
the training set we'll shuffle (the rows *within* each training sequence are not
shuffled, only the order in which we draw those blocks). For the test set, shuffling
isn't necessary.

In [195]:
torch.manual_seed(101)

batch_size = 32
sequence_length = 60

train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_length=sequence_length
)
val_dataset = SequenceDataset(
    df_val,
    target=target,
    features=features,
    sequence_length=sequence_length
)
test_dataset = SequenceDataset(
    df_test,
    target=target,
    features=features,
    sequence_length=sequence_length
)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

X, y = next(iter(train_loader))

print("Features shape:", X.shape)
print("Target shape:", y.shape)

Features shape: torch.Size([32, 60, 4])
Target shape: torch.Size([32])


# Evaluate Function

In [196]:
import numpy as np
import math
from sklearn.metrics import mean_squared_error

def MAPE(Y_actual,Y_Predicted):
    mape = np.mean(np.abs((Y_actual - Y_Predicted)/Y_actual))*100
    return mape

def MASE(pred, y):
    return float(np.mean(np.abs(pred - y) / np.mean(np.abs(y[1:] - y[:-1]))))

def SMAPE(pred, y):
    return float(200 * np.mean(np.abs(pred - y) / (np.abs(y) + np.abs(pred))))

def MAE(pred, y):
    return float(np.mean(np.abs(pred - y)))

def sharp_ratio(pred, y):
    return float(np.mean((pred - y) / np.std(y)))

def directional_accuracy(Y_actual, Y_predicted):
    actual_changes = np.sign(np.diff(Y_actual))
    predicted_changes = np.sign(np.diff(Y_predicted))
    correct_predictions = np.sum(actual_changes == predicted_changes)
    total_predictions = len(actual_changes)
    directional_accuracy = correct_predictions / total_predictions * 100
    return directional_accuracy

def evaluate(y_true, y_pred, save=False, model_name="model_name"):
    performance = {}
    performance['MAPE'] = MAPE(y_true, y_pred)
    performance['MASE'] = MASE(y_true, y_pred)
    performance['RMSE'] = math.sqrt(mean_squared_error(y_true, y_pred))
    performance['SMAPE'] = SMAPE(y_true, y_pred)
    performance['MAE'] = MAE(y_true, y_pred)
    performance['sharp_ratio'] = sharp_ratio(y_true, y_pred)
    performance['Directional Accuracy'] = directional_accuracy(y_true, y_pred)

    if save:
        pd.DataFrame(performance, index=[model_name]).to_csv(f'../performance/{model_name}.csv')
        
    return performance


In [197]:
evaluate(df_test[target_col], df_test[target_col], save=True, model_name="test")

{'MAPE': 0.0,
 'MASE': nan,
 'RMSE': 0.0,
 'SMAPE': 0.0,
 'MAE': 0.0,
 'sharp_ratio': 0.0,
 'Directional Accuracy': 100.0}

# LSTM

## The model and learning algorithm

![picture](https://i.stack.imgur.com/SjnTl.png)

Credit : https://stackoverflow.com/questions/48302810/whats-the-difference-between-hidden-and-output-in-pytorch-lstm

In [198]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [199]:
from torch import nn

class ShallowRegressionLSTM(nn.Module):
    def __init__(self, num_features, hidden_units):
        super().__init__()
        self.num_features = num_features
        self.hidden_units = hidden_units
        self.num_layers = 4

        self.lstm = nn.LSTM(
            input_size=num_features,
            hidden_size=hidden_units,
            num_layers=self.num_layers,
            batch_first=True,
        )

        self.linear = nn.Linear(in_features=self.hidden_units, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]
        # print("batch_size :",batch_size)
        # initialize the hidden and cell state of the LSTM layer
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).to(device).requires_grad_()
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).to(device).requires_grad_()

        # _, (hn, _) = self.lstm(x, (h0, c0))
        out, (hn, cn) = self.lstm(x, (h0, c0))
        # print(out.shape, hn.shape, cn.shape)

        out = self.linear(hn[-1]).flatten()  # get the output of the last hidden layer
        # out = self.linear(out[-1]).flatten()  # get the output of the last hidden layer
        return out


In [200]:
learning_rate = 5e-4
num_hidden_units = 60

model = ShallowRegressionLSTM(num_features=len(features), hidden_units=num_hidden_units)
model.to(device)

model_out = model(X.to(device))
print(X.shape, model_out.shape)

torch.Size([32, 60, 4]) torch.Size([32])


In [201]:
from torchinfo import summary

summary(model, input_size=(32, 60, 4))

Layer (type:depth-idx)                   Output Shape              Param #
ShallowRegressionLSTM                    [32]                      --
├─LSTM: 1-1                              [32, 60, 60]              103,680
├─Linear: 1-2                            [32, 1]                   61
Total params: 103,741
Trainable params: 103,741
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 199.07
Input size (MB): 0.03
Forward/backward pass size (MB): 0.92
Params size (MB): 0.41
Estimated Total Size (MB): 1.37

In [202]:
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

## Train

In [203]:
def train_model(data_loader, model, loss_function, optimizer):
    num_batches = len(data_loader)
    # print( num_batches )
    total_loss = 0
    model.train()

    for X, y in data_loader:
        # print(X.shape, y.shape)
        X = X.to(device)
        y = y.to(device)
        output = model(X)
        loss = loss_function(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / num_batches
    print(f"Train loss: {avg_loss}")

def test_model(data_loader, model, loss_function, best_val_loss):

    num_batches = len(data_loader)
    total_loss = 0

    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            X = X.to(device)
            y = y.to(device)
            output = model(X)
            total_loss += loss_function(output, y).item()

    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    if avg_loss < best_val_loss:
        best_val_loss = avg_loss
        torch.save(model.state_dict(), '../model/lstm.pth')
        print('Save new best model')
    return best_val_loss

In [None]:
best_val_loss = torch.inf
for ix_epoch in tqdm(range(100)):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model, loss_function, optimizer=optimizer)
    best_val_loss = test_model(val_loader, model, loss_function, best_val_loss)
    print()

## Evaluation

In [205]:
def predict(data_loader, model):
    """Just like `test_loop` function but keep track of the outputs instead of the loss
    function.
    """
    output = torch.tensor([])
    model.eval()
    with torch.no_grad():
        for X, _ in data_loader:
            X = X.to(device)
            y_star = model(X)
            output = torch.cat((output, y_star.detach().cpu()), 0)

    return output

In [206]:
PATH = '../model/lstm.pth'
model.load_state_dict(torch.load(PATH))

<All keys matched successfully>

In [207]:
# print lstm params
# for name, param in model.named_parameters():
#     if param.requires_grad:
#         print(name, param.data)

In [208]:
train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

ystar_col = "Model forecast"
df_train[ystar_col] = predict(train_eval_loader, model).numpy()
df_val[ystar_col] = predict(val_loader, model).numpy()
df_test[ystar_col] = predict(test_loader, model).numpy()

df_out = pd.concat((df_train, df_val, df_test))[[target, ystar_col]]

for c in df_out.columns:
    df_out[c] = df_out[c] * target_stdev + target_mean

display(df_out)

df_out.to_csv('../prediction/lstm.csv')

Unnamed: 0_level_0,Open,Model forecast
Date,Unnamed: 1_level_1,Unnamed: 2_level_1
2004-08-19 00:00:00-04:00,1095.170044,1115.828125
2004-08-20 00:00:00-04:00,1091.229980,1115.854248
2004-08-23 00:00:00-04:00,1098.349976,1115.919189
2004-08-24 00:00:00-04:00,1095.680054,1116.028198
2004-08-25 00:00:00-04:00,1096.189941,1116.210938
...,...,...
2019-09-27 00:00:00-04:00,2985.469971,2465.731201
2019-09-30 00:00:00-04:00,2967.070068,2465.727051
2019-10-01 00:00:00-04:00,2983.689941,2465.704834
2019-10-02 00:00:00-04:00,2924.780029,2465.605469


In [209]:
evaluate(df_test['Open_lead1'], df_test['Model forecast'], save=True, model_name="lstm")

{'MAPE': 30.134621746888286,
 'MASE': inf,
 'RMSE': 0.9434990008011135,
 'SMAPE': 35.72705156374499,
 'MAE': 0.9143282990935728,
 'sharp_ratio': 22.887196882760197,
 'Directional Accuracy': 55.26315789473685}

In [210]:
fig = px.line(df_out, labels={'value': "Open", 'created_at': 'Date'})
fig.add_vline(x=val_start, line_width=4, line_dash="dash")
fig.add_vline(x=test_start, line_width=4, line_dash="dash")
# fig.add_annotation(xref="paper", x=0.75, yref="paper", y=0.8, text="Test set start", showarrow=False)
fig.update_layout(
  template=plot_template, legend=dict(orientation='h', y=1.02, title_text="")
)
fig.show()

# GRU

## The model and learning algorithm

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
from torch import nn

class ShallowRegressionGRU(nn.Module):
    def __init__(self, num_features, hidden_units):
        super().__init__()
        self.num_features = num_features  # this is the number of features
        self.hidden_units = hidden_units
        self.num_layers = 4

        self.gru = nn.GRU(
            input_size=num_features,
            hidden_size=hidden_units,
            batch_first=True,
            num_layers=self.num_layers
        )

        self.linear = nn.Linear(in_features=self.hidden_units, out_features=1)

    def forward(self, x):
        batch_size = x.shape[0]

        # initialize the hidden and cell state of the LSTM layer
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_units).to(device).requires_grad_()

        _, hn = self.gru(x, h0)
        out = self.linear(hn[-1]).flatten()  # get the output of the last hidden layer
        return out


In [None]:
learning_rate = 5e-4
num_hidden_units = 60

model = ShallowRegressionGRU(num_features=len(features), hidden_units=num_hidden_units)
model.to(device)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
from torchinfo import summary
summary(model, input_size=(32, 60, 4))

## Train

In [None]:
from tqdm.notebook import tqdm

In [None]:
def train_model(data_loader, model, loss_function, optimizer):
    num_batches = len(data_loader)
    total_loss = 0
    model.train()

    for X, y in data_loader:
        X = X.to(device)
        y = y.to(device)
        output = model(X)
        loss = loss_function(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / num_batches
    print(f"Train loss: {avg_loss}")

def test_model(data_loader, model, loss_function, best_val_loss):

    num_batches = len(data_loader)
    total_loss = 0

    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            X = X.to(device)
            y = y.to(device)
            output = model(X)
            total_loss += loss_function(output, y).item()

    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    if avg_loss < best_val_loss:
        best_val_loss = avg_loss
        torch.save(model.state_dict(), '../model/gru.pth')
        print('Save new best model')
    return best_val_loss

In [None]:
best_val_loss = torch.inf
for ix_epoch in tqdm(range(100)):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model, loss_function, optimizer=optimizer)
    best_val_loss = test_model(val_loader, model, loss_function, best_val_loss)
    print()

## Evaluation

In [None]:
def predict(data_loader, model):
    """Just like `test_loop` function but keep track of the outputs instead of the loss
    function.
    """
    output = torch.tensor([])
    model.eval()
    with torch.no_grad():
        for X, _ in data_loader:
            X = X.to(device)
            y_star = model(X)
            output = torch.cat((output, y_star.detach().cpu()), 0)

    return output

In [None]:
PATH = './model_gru.pth'
model.load_state_dict(torch.load(PATH))

In [None]:
train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

ystar_col = "Model forecast"
df_train[ystar_col] = predict(train_eval_loader, model).numpy()
df_val[ystar_col] = predict(val_loader, model).numpy()
df_test[ystar_col] = predict(test_loader, model).numpy()

df_out = pd.concat((df_train, df_val, df_test))[[target, ystar_col]]

for c in df_out.columns:
    df_out[c] = df_out[c] * target_stdev + target_mean

print(df_out)

df_out.to_csv('../prediction/gru.csv')

In [None]:
import numpy as np
import math
from sklearn.metrics import mean_squared_error

def MAPE(Y_actual,Y_Predicted):
    mape = np.mean(np.abs((Y_actual - Y_Predicted)/Y_actual))*100
    return mape

print( 'MPAE =', MAPE(df_test['Open_lead1'], df_test['Model forecast']) )
print( 'RMSE =', math.sqrt(mean_squared_error(df_val['Open_lead1'], df_val['Model forecast'])) )

evaluate(df_test['Open_lead1'], df_test['Model forecast'], save=True, model_name="gru")

In [None]:
fig = px.line(df_out, labels={'value': "Open", 'created_at': 'Date'})
fig.add_vline(x=val_start, line_width=4, line_dash="dash")
fig.add_vline(x=test_start, line_width=4, line_dash="dash")
# fig.add_annotation(xref="paper", x=0.75, yref="paper", y=0.8, text="Test set start", showarrow=False)
fig.update_layout(
  template=plot_template, legend=dict(orientation='h', y=1.02, title_text="")
)
fig.show()

# ResNLS

**Data**


*   10 consecutive trading days as the unit step
*   clean


*   normalize it to eliminate scale differences
*   transform it into tensors

**Train** : closing price data of the SSE Composite
Index from January 1, 2011 to December 31, 2020

**Test** : closing price data from January 1, 2021 to December 31, 2021



**Conv**


*   filter = 64
*   kernel size = 3

*   applies ReLU as the activation
function
*   weight decay of 1e-5

**dropout layer** with a retain probability of 0.8

**LSTM**


*   hidden size = 32

**optimizer** Adam

batch size (64) with a larger initial
learning rate (1e-3) are preferred

epoch = 50



## Data

In [None]:
features

In [None]:
torch.manual_seed(101)
# aj
# batch_size = 32
# sequence_length = 60

# # best in paper
batch_size = 64
sequence_length = 5

train_dataset = SequenceDataset(
    df_train,
    target=target,
    features=features,
    sequence_length=sequence_length
)
val_dataset = SequenceDataset(
    df_val,
    target=target,
    features=features,
    sequence_length=sequence_length
)
test_dataset = SequenceDataset(
    df_test,
    target=target,
    features=features,
    sequence_length=sequence_length
)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

X, y = next(iter(train_loader))

print("Features shape:", X.shape)
print("Target shape:", y.shape)

## The model and learning algorithm

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# input = (batch,10,4)
# output = (batch,10,4)
# filter of each conv = 64
# conv1 = (10) -> (64)
# conv2 = (64) -> (64)
# linear = (64) -> (10)
# shortcut = (10) -> (10)
class ResBlock(torch.nn.Module):
    def __init__(self, in_channels, out_channels, filter, kernel_size=3, stride=1, padding=1):
        super(ResBlock, self).__init__()
        self.conv1 = torch.nn.Conv1d(in_channels, filter, kernel_size, stride, padding)
        self.conv2 = torch.nn.Conv1d(filter, filter, kernel_size, stride, padding)
        self.linear = torch.nn.Linear(filter, out_channels)
        self.relu = torch.nn.ReLU()
        self.bn1 = torch.nn.BatchNorm1d(filter, eps=1e-5)
        self.bn2 = torch.nn.BatchNorm1d(filter, eps=1e-5)
        self.dropout = nn.Dropout(0.2)
        # intialise weights of the attention mechanism
        self.weight = nn.Parameter(torch.zeros(1)).to(device)


    def forward(self, x):
        out = self.conv1(x)
        out = self.relu(out)
        out = self.bn1(out)

        out = self.dropout(out)

        out = self.conv2(out)
        out = self.relu(out)
        out = self.bn2(out)

        out = out.view(out.size(0), -1)
        out = self.linear(out)
        out = self.weight*out
        out += x.view(x.size(0), -1)
        return out

# model = ResBlock(10, 10, 64)
# summary(model, input_size=(1, 10, 1))

In [None]:
class ResNLS(torch.nn.Module):
    def __init__(self, in_channels, out_channels, hidden_layer, num_layer, num_feature=4, filter=64, kernel_size=3):
        super(ResNLS, self).__init__()
        self.resblock1 = ResBlock(in_channels, in_channels, filter)
        self.resblock2 = ResBlock(in_channels, in_channels, filter)
        self.resblock3 = ResBlock(in_channels, in_channels, filter)
        self.resblock4 = ResBlock(in_channels, in_channels, filter)
        self.num_layers = num_layer
        self.hidden_layer = hidden_layer
        self.lstm = torch.nn.LSTM(num_feature, hidden_layer, num_layer, batch_first=True)
        self.linear = torch.nn.Linear(hidden_layer, out_channels)


    def forward(self, x):
        batch_size = x.size(0)
        out = torch.split(x, 1, 2)
        out1 = self.resblock1(out[0])
        out2 = self.resblock2(out[1])
        out3 = self.resblock3(out[2])
        out4 = self.resblock4(out[3])
        out1 = out1.unsqueeze(2)
        out2 = out2.unsqueeze(2)
        out3 = out3.unsqueeze(2)
        out4 = out4.unsqueeze(2)
        out = torch.cat((out1, out2, out3, out4), 2)
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_layer).to(device).requires_grad_()
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_layer).to(device).requires_grad_()
        _, (hn, _) = self.lstm(out, (h0, c0))
        out = self.linear(hn[-1]).flatten()
        return out

In [None]:
# Init from paper
num_consecutive_days = sequence_length
days_pred = 1
# num_lstm_hiddensize = 32
num_lstm_hiddensize = 64
# lstm_layers = 4 # from aj
# lstm_layers = 64 # from paper
lstm_layers = 1

num_features = len(features)
filters = 64  # Number of filters in convolutional layers
kernel_size = 3  # Kernel size for convolutional layers

model = ResNLS(num_consecutive_days,
               days_pred,
               num_lstm_hiddensize,
               lstm_layers,
               num_features,
               filters,
               kernel_size)
model.to(device)
summary(model, input_size=(batch_size, num_consecutive_days, num_features))

In [None]:
parameters_to_decay = []
for name, param in model.named_parameters():
  if 'conv1' in name or 'conv2' in name:
        parameters_to_decay.append(param)

weight_decay=1e-5
learning_rate = 1e-3 # Optimizer lr
# optimizer = torch.optim.Adam([{'params': parameters_to_decay, 'weight_decay': weight_decay}], lr=learning_rate)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)


loss_function = nn.MSELoss() # We use Mean Absolute Error (MAE), Mean Squared Error (MSE) and Root Mean Squared Error (RMSE)

In [None]:
# class RealResNLS(nn.Module):

#     def __init__(self):
#         super(ResNLS, self).__init__()

#         # intialise weights of the attention mechanism
#         self.weight = nn.Parameter(torch.zeros(1)).to(device)

#         # intialise cnn structure
#         self.cnn = nn.Sequential(
#             nn.Conv1d(in_channels=1, out_channels=n_hidden, kernel_size=3, stride=1, padding=1), # ((5 + 1*2 - 3)/1 + 1) = 5
#             nn.ReLU(inplace=True),
#             nn.BatchNorm1d(n_hidden, eps=1e-5),
#             nn.Dropout(0.1),

#             nn.Conv1d(in_channels=n_hidden, out_channels=n_hidden, kernel_size=3, stride=1, padding=1), # ((5 + 1*2 - 3)/1 + 1) = 5
#             nn.ReLU(inplace=True),
#             nn.BatchNorm1d(n_hidden, eps=1e-5),

#             nn.Flatten(),
#             nn.Linear(n_input * n_hidden, n_input)
#         )

#         # intialise lstm structure
#         self.lstm = nn.LSTM(n_input, n_hidden, batch_first=True, bidirectional=False)
#         self.linear = nn.Linear(n_hidden, 1)


#     def forward(self, x):

#         cnn_output = self.cnn(x)
#         cnn_output = cnn_output.view(-1, 1, n_input)

#         residuals = x + self.weight * cnn_output

#         _, (h_n, _)  = self.lstm(residuals)
#         y_hat = self.linear(h_n[0,:,:])

#         return y_hat

# optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

## Train

In [None]:
from tqdm.notebook import tqdm

def train_model(data_loader, model, loss_function, optimizer):
    num_batches = len(data_loader)
    total_loss = 0
    model.train()

    for X, y in data_loader:
        X = X.to(device)
        y = y.to(device)
        output = model(X)
        loss = loss_function(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / num_batches
    print(f"Train loss: {avg_loss}")

def test_model(data_loader, model, loss_function, best_val_loss):

    num_batches = len(data_loader)
    total_loss = 0

    model.eval()
    with torch.no_grad():
        for X, y in data_loader:
            X = X.to(device)
            y = y.to(device)
            output = model(X)
            total_loss += loss_function(output, y).item()

    avg_loss = total_loss / num_batches
    print(f"Test loss: {avg_loss}")
    if avg_loss < best_val_loss:
        best_val_loss = avg_loss
        torch.save(model.state_dict(), 'model.pth')
        print('Save new best model')
    return best_val_loss

In [None]:
best_val_loss = torch.inf
for ix_epoch in tqdm(range(100)):
    print(f"Epoch {ix_epoch}\n---------")
    train_model(train_loader, model, loss_function, optimizer=optimizer)
    best_val_loss = test_model(val_loader, model, loss_function, best_val_loss)
    print()

## Eval

In [None]:
def predict(data_loader, model):
    """Just like `test_loop` function but keep track of the outputs instead of the loss
    function.
    """
    output = torch.tensor([])
    model.eval()
    with torch.no_grad():
        for X, _ in data_loader:
            X = X.to(device)
            y_star = model(X)
            output = torch.cat((output, y_star.detach().cpu()), 0)

    return output

In [None]:
# PATH = './model_gru.pth'
# model.load_state_dict(torch.load(PATH))

In [None]:
train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

ystar_col = "Model forecast"
df_train[ystar_col] = predict(train_eval_loader, model).numpy()
df_val[ystar_col] = predict(val_loader, model).numpy()
df_test[ystar_col] = predict(test_loader, model).numpy()

df_out = pd.concat((df_train, df_val, df_test))[[target, ystar_col]]

for c in df_out.columns:
    df_out[c] = df_out[c] * target_stdev + target_mean

print(df_out)

df_out.to_csv('../prediction/resnls.csv')

In [None]:
def MASE(pred, y):
    pred = torch.tensor(pred)
    y = torch.tensor(y)
    return float(torch.mean(torch.abs(pred - y) / torch.mean(torch.abs(y[1:] - y[:-1]))))

def SMAPE(pred, y):
    pred = torch.tensor(pred)
    y = torch.tensor(y)
    return float(200 * torch.mean(torch.abs(pred - y) / (torch.abs(y) + torch.abs(pred))))

def MAE(pred, y):
    pred = torch.tensor(pred)
    y = torch.tensor(y)
    return float(torch.mean(torch.abs(pred - y)))

def sharp_ratio(pred, y):
    pred = torch.tensor(pred)
    y = torch.tensor(y)
    return float(torch.mean((pred - y) / torch.std(y)))

def directional_accuracy(Y_actual, Y_predicted):
    """
    Calculate the directional accuracy of predictions.

    Parameters:
        Y_actual (array-like): Array of actual stock prices.
        Y_predicted (array-like): Array of predicted stock prices.

    Returns:
        float: Directional accuracy percentage.
    """
    actual_changes = np.sign(np.diff(Y_actual))
    predicted_changes = np.sign(np.diff(Y_predicted))
    correct_predictions = np.sum(actual_changes == predicted_changes)
    total_predictions = len(actual_changes)
    directional_accuracy = correct_predictions / total_predictions * 100
    return directional_accuracy

def evaluate(y_true, y_pred, save=False, model_name="ModelName"):
    performance = {}
    performance['MAPE'] = MAPE(y_true, y_pred)
    performance['MASE'] = MASE(y_true, y_pred)
    performance['RMSE'] = math.sqrt(mean_squared_error(y_true, y_pred))
    performance['SMAPE'] = SMAPE(y_true, y_pred)
    performance['MAE'] = MAE(y_true, y_pred)
    performance['sharp_ratio'] = sharp_ratio(y_true, y_pred)
    performance['Directional Accuracy'] = directional_accuracy(y_true, y_pred)

    if save:
        pd.DataFrame(performance, index=[model_name]).to_csv(f'../performance/{model_name}.csv')
        
    return performance

print( 'MASE =', MASE(df_test['Open_lead1'], df_test['Model forecast']))
print( 'RMSE =', math.sqrt(mean_squared_error(df_test['Open_lead1'], df_test['Model forecast'])) )
print( 'SMAPE =', SMAPE(df_test['Open_lead1'], df_test['Model forecast']))
print( 'MAE =', MAE(df_test['Open_lead1'], df_test['Model forecast']))
print( 'sharp_ratio =', sharp_ratio(df_test['Open_lead1'], df_test['Model forecast']))
print('Directional Accuracy =', directional_accuracy(df_test['Open_lead1'], df_test['Model forecast']))

In [None]:
perf = evaluate(df_test['Open_lead1'], df_test['Model forecast'])
perf = pd.DataFrame(perf, index=['ResNLS'])
display(perf)

perf.to_csv('../performance/resnls.csv')

In [None]:
fig = px.line(df_out, labels={'value': "Open", 'created_at': 'Date'})
fig.add_vline(x=val_start, line_width=4, line_dash="dash")
fig.add_vline(x=test_start, line_width=4, line_dash="dash")
# fig.add_annotation(xref="paper", x=0.75, yref="paper", y=0.8, text="Test set start", showarrow=False)
fig.update_layout(
  template=plot_template, legend=dict(orientation='h', y=1.02, title_text="")
)
fig.show()