## LSTM sample

In [None]:
# Ticker symbol whose minute bars are downloaded below; it also appears in the
# title of the final prediction plot.
symbol = "TSLA"

### Imports

In [None]:
from datetime import datetime, date, timedelta
from typing import List, Tuple

import alpaca_trade_api as tradeapi
import numpy as np
import pandas as pd
from pandas import DataFrame as df
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
# NOTE: wildcard import; its position relative to the lines above and below is
# kept because it can rebind names that were imported earlier.
from fastai.data.all import *
import matplotlib.pyplot as plt

### Load data

In [None]:
# Request window: the nine days leading up to today, passed as ISO date strings.
window_start = str(date.today() - timedelta(days=9))
window_end = str(date.today())

# No credentials are passed explicitly here.
# NOTE(review): presumably the client picks them up from the environment - confirm.
api = tradeapi.REST(base_url="https://api.alpaca.markets")

# Aggregates for `symbol` with multiplier 1 and timespan "minute"; `.df` is the
# tabular form of the response that the following cells work on.
ohlc_data = api.polygon.historic_agg_v2(
    symbol, 1, "minute", _from=window_start, to=window_end
).df

In [None]:
# Cell output: dimensions of the downloaded frame, as a sanity check on the request.
ohlc_data.shape

In [None]:
# Keep the original index around, then replace it with a plain 0..n-1 counter.
x_dates = ohlc_data.index

# The counter is stored as a regular `date` column first ...
ohlc_data["date"] = np.arange(len(ohlc_data), dtype="int")
# ... and then also installed as the index. Passing the Series (rather than the
# column name) leaves the `date` column in the frame.
ohlc_data = ohlc_data.set_index(ohlc_data.date)

In [None]:
# Quick visual check of the unscaled closing prices.
ohlc_data["close"].plot()
plt.show()

### Pre-processing

In [None]:
# Scale each price column independently into [-1, 1].
scaler = MinMaxScaler(feature_range=(-1, 1))

# The same scaler object is re-fitted on every call, so once this cell has run
# `scaler` holds the fit for `close`, the last column processed.
# NOTE(review): the fit uses the full series, including the samples that are
# later held out as test data.
# NOTE(review): the name `open` shadows the built-in `open` for the rest of the
# notebook; it is kept unchanged so that existing references keep working.
open = scaler.fit_transform(ohlc_data.open.values.reshape(-1, 1)).flatten()
high = scaler.fit_transform(ohlc_data.high.values.reshape(-1, 1)).flatten()
low = scaler.fit_transform(ohlc_data.low.values.reshape(-1, 1)).flatten()
close = scaler.fit_transform(ohlc_data.close.values.reshape(-1, 1)).flatten()

# DataFrame.plot() opens a figure of its own, so the size has to be handed to
# it directly; a separate plt.figure(figsize=...) beforehand only leaves an
# empty figure behind and the size is never applied to the chart.
df(close).plot(figsize=(10, 4))

## Training

In [None]:
# Number of trailing samples held out for evaluation.
test_data_size = 120

# Training part: everything before the hold-out, minus the first two samples.
# Test part: the final `test_data_size` samples.
train_data_close, test_data = close[2:-test_data_size], close[-test_data_size:]

# Flat float tensor of the held-out values; its shape is the cell output.
test_data_normalized = torch.FloatTensor(test_data).view(-1)
test_data_normalized.shape

In [None]:
# Re-wrap the training values as a pandas Series built from a plain list.
train_values = train_data_close.tolist()
train_data_close = pd.Series(train_values)

# Float tensor fed to the sequence builder; its shape is the cell output.
train_data_normalized = torch.FloatTensor(train_data_close)
train_data_normalized.shape

In [None]:
# Number of consecutive samples the model sees before predicting the next one.
train_window = 60


def create_inout_sequences(input_data, tw):
    """Cut a series into sliding (window, next-value) pairs.

    For every offset ``i`` that leaves room for a full window followed by one
    more element, the pair is
    ``(input_data[i:i + tw], input_data[i + tw:i + tw + 1])``.
    The label is a length-one slice of the input, not a scalar.

    Args:
        input_data: Any sliceable sequence supporting ``len()``.
        tw: Window length.

    Returns:
        A list of ``(window, label)`` tuples; empty when the input holds no
        more than ``tw`` elements.
    """
    pair_count = len(input_data) - tw
    return [
        (input_data[start:start + tw], input_data[start + tw:start + tw + 1])
        for start in range(pair_count)
    ]

In [None]:
# Sliding-window (input, label) pairs over the training tensor, using windows
# of `train_window` samples; the training loop iterates over this list.
train_in_seq = create_inout_sequences(train_data_normalized, train_window)

In [None]:
class LSTM(nn.Module):
    """Stacked LSTM with a linear head that predicts the value after a sequence.

    The module handles one sequence at a time (batch size 1) and keeps the
    recurrent state in ``self.hidden_cell`` between calls. Callers that treat
    every sequence independently reset that attribute before each forward
    pass, either by assigning zero tensors or via :meth:`init_hidden`.

    Args:
        input_size: Number of features per time step.
        hidden_layer_size: Width of every LSTM layer.
        output_size: Number of values produced by the linear head.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between LSTM layers. It is ignored when
            ``num_layers`` is 1, since there is no inter-layer connection to
            apply it to.
    """

    def __init__(self, input_size=1, hidden_layer_size=100, output_size=1,
                 num_layers=2, dropout=0.5):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size,
            hidden_layer_size,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=False,
        )

        self.linear = nn.Linear(hidden_layer_size, output_size)

        self.hidden_cell = self.init_hidden()

    def init_hidden(self):
        """Return a fresh all-zero ``(h_0, c_0)`` pair for a batch of one.

        Each tensor has shape ``(num_layers, 1, hidden_layer_size)`` and lives
        on the same device as the linear layer's weights.
        """
        shape = (self.num_layers, 1, self.hidden_layer_size)
        device = self.linear.weight.device
        return (torch.zeros(shape, device=device),
                torch.zeros(shape, device=device))

    def forward(self, input_seq):
        """Run one sequence through the network and return the last prediction.

        Args:
            input_seq: Tensor whose first dimension is time. It is reshaped to
                ``(len(input_seq), 1, features)``, so a 1-D tensor is treated
                as a single-feature series.

        Returns:
            The linear head's output for the final time step, a tensor of
            shape ``(output_size,)``.

        Side effects:
            ``self.hidden_cell`` is replaced by the state after this sequence.
        """
        steps = len(input_seq)
        # The feature dimension is inferred rather than fixed at 1, so the
        # module also works when it was built with input_size > 1.
        lstm_out, self.hidden_cell = self.lstm(
            input_seq.view(steps, 1, -1), self.hidden_cell
        )
        predictions = self.linear(lstm_out.view(steps, -1))
        return predictions[-1]

In [None]:
# Network with its default configuration.
model = LSTM()

# Mean-squared error between the predicted and the actual next value.
loss_function = nn.MSELoss()

# Adam (AMSGrad variant) over all model parameters.
optimizer = optim.Adam(model.parameters(), lr=1e-5, amsgrad=True)

In [None]:
# Cell output: the module's printed structure.
model

### Train

In [None]:
epochs = 100

# A freshly built module is already in training mode, but the `model.eval()`
# issued by the prediction cells persists on the module. Switching back
# explicitly keeps dropout active when this cell is re-run afterwards.
model.train()

# Shape of each of the (h_0, c_0) tensors: one slice per stacked LSTM layer,
# batch of one. Taken from the model rather than hard-coded.
state_shape = (model.lstm.num_layers, 1, model.hidden_layer_size)

for i in range(epochs):
    for seq, labels in train_in_seq:
        optimizer.zero_grad()
        # Every window is trained as an independent sequence, so the recurrent
        # state is zeroed before each forward pass.
        model.hidden_cell = (torch.zeros(state_shape), torch.zeros(state_shape))

        y_pred = model(seq)

        single_loss = loss_function(y_pred, labels)
        single_loss.backward()
        optimizer.step()

    # Periodic progress line; the value shown is the loss of the last window
    # of the epoch, not an epoch average.
    if i % 25 == 1:
        print(f"epoch: {i:3} loss: {single_loss.item():10.8f}")

print(f"epoch: {i:3} loss: {single_loss.item():10.10f}")

## Predicting

In [None]:
# Plain-list copy of the held-out tensor.
test_inputs = test_data_normalized.tolist()

# DataFrame built from the held-out tensor; plotted here and again in the
# final comparison chart.
test = df(test_data_normalized)
test.plot()

In [None]:
# Switch the module to evaluation mode before predicting.
model.eval()

In [None]:
model.eval()

train_data_normalized = torch.FloatTensor(train_data_close)

# Seed the forecast with the first `train_window` held-out samples; every
# further value is predicted from the most recent `train_window` entries of
# this same list, so later predictions are built on earlier ones.
predicted = test_data_normalized[:train_window].tolist()
print(len(predicted))

zero_shape = (2, 1, model.hidden_layer_size)
steps_ahead = test_data_size - train_window

# No gradients are needed while forecasting.
with torch.no_grad():
    for i in range(steps_ahead):
        seq = torch.FloatTensor(predicted[-train_window:])
        # Fresh recurrent state for every window, as in training.
        model.hidden_cell = (torch.zeros(zero_shape), torch.zeros(zero_shape))
        item = model(seq).item()
        predicted.append(item)

In [None]:
# Wrap the list (seed samples followed by the forecasts) in a DataFrame for
# display and plotting.
predicted = df(predicted)

In [None]:
# Cell output: the frame of seed samples and forecasts.
predicted

In [None]:
# Overlay the held-out series and the model output on one set of axes.
plt.title(f'{symbol} LSTM predictions')
plt.grid(True)
plt.autoscale(axis='x', tight=True)

# Drawn in this order: actual values first, then the forecast.
for series in (test, predicted):
    plt.plot(series)

plt.show()