In [None]:
import math
import time
import torch

import yfinance as yf
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from torch.autograd import Variable

torch.manual_seed(1)

In [None]:
def data_preprocessing(df):
    df = df.fillna(method='ffill')
    scaler = MinMaxScaler(feature_range=(-1, 1))
    df['Close'] = scaler.fit_transform(df['Close'].values.reshape(-1, 1))
    return df

def load_data(stock, look_back):
    data_raw = stock.values
    data = []
    
    # Split data into sequences (move 1 each time)
    for index in range(len(data_raw)-look_back):
        data.append(data_raw[index: index + look_back])
    
    data = np.array(data)
    dt_size = data.shape[0]
    test_set_size = int(np.round(0.2 * dt_size))
    train_set_size = dt_size - (test_set_size)
    
    # Split by batch, for each sequence, choose the last as y
    # Shape = [batches_num, lookback_len, 1]
    x_train = data[:train_set_size, :-1, :]
    y_train = data[:train_set_size, -1, :]
    
    x_test = data[train_set_size:, :-1, :]
    y_test = data[train_set_size:, -1, :]
    
    return [x_train, y_train, x_test, y_test]

In [None]:
df_hsi = stock_data[['Close']]
df_hsi = data_preprocessing(df_hsi)

look_back = 60 # choose sequence length
x_train, y_train, x_test, y_test = load_data(df_hsi, look_back)

# make training and test sets in torch
x_train = torch.from_numpy(x_train).type(torch.Tensor).to(device='cuda')
x_test = torch.from_numpy(x_test).type(torch.Tensor).to(device='cuda')
y_train = torch.from_numpy(y_train).type(torch.Tensor).to(device='cuda')
y_test = torch.from_numpy(y_test).type(torch.Tensor).to(device='cuda')

print('x_train.shape = ', x_train.shape)
print('y_train.shape = ', y_train.shape)
print('x_test.shape = ', x_test.shape)
print('y_test.shape = ', y_test.shape)

In [None]:
class LSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTM, self).__init__()
        # Hidden dimensions
        self.hidden_dim = hidden_dim
        # Number of hidden layers
        self.num_layers = num_layers
        
        # batch_first=True causes input/output tensors to be of shape
        # (batch_dim, seq_dim, feature_dim)
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        # Readout layer
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        # Initialize hidden state with zeros
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_().to(device='cuda')
        # Initialize cell state
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).requires_grad_().to(device='cuda')
        # We need to detach as we are doing truncated backpropagation through time (BPTT)
        # If we don't, we'll backprop all the way to the start even after going through another batch
        out, (hn, cn) = self.lstm(x, (h0.detach(), c0.detach()))
        
        # Index hidden state of last time step
        # out.size() --> 100, 32, 100
        # out[:, -1, :] --> 100, 100 --> just want last time step hidden states! 
        out = self.fc(out[:, -1, :])
        return out

In [None]:
input_dim = 1
hidden_dim = 32
num_layers = 2 
output_dim = 1

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers).cuda()
loss_fn = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

In [None]:
print(model)
print(len(list(model.parameters())))
for i in range(len(list(model.parameters()))):
    print(list(model.parameters())[i].size())

In [None]:
"""
在pytorch里我算的是(正向)h_n[-2,:,:]=output[:,-1,:hiddensize], 
（反向）h_n[-1,:,:]=output[:,0,-hiddensize:]，
如果单纯取output[:,-1,:]确实只包含了最后一个时间的输出，
这对于正向是正确的，但对反向有问题。
而正确的做法是取output[:,-1,:hiddensize]为正向最后一个时间的输出，
以及output[:,0,-hiddensize:]为反向最后一个时间（即正向第一个时间）的输出
"""


In [None]:
# Train model
#####################
num_epochs = 100
hist = np.zeros(num_epochs)

# Number of steps to unroll
seq_dim =look_back-1  

for t in range(num_epochs):
    # Initialise hidden state
    # Don't do this if you want your LSTM to be stateful
    #model.hidden = model.init_hidden()
    
    # Forward pass
    y_train_pred = model(x_train)

    loss = loss_fn(y_train_pred, y_train)
    if t % 10 == 0 and t !=0:
        print("Epoch ", t, "MSE: ", loss.item())
    hist[t] = loss.item()

    # Zero out gradient, else they will accumulate between epochs
    optimiser.zero_grad()
    
    # Backward pass
    loss.backward()

    # Update parameters
    optimiser.step()

In [None]:
plt.plot(hist, label="Training loss")
plt.legend()
plt.show()

In [None]:
# make predictions
y_test_pred = model(x_test)

# invert predictions
y_train_pred = scaler.inverse_transform(y_train_pred.detach().cpu().numpy())
y_train = scaler.inverse_transform(y_train.detach().cpu().numpy())
y_test_pred = scaler.inverse_transform(y_test_pred.detach().cpu().numpy())
y_test = scaler.inverse_transform(y_test.detach().cpu().numpy())

In [None]:
from sklearn.metrics import mean_squared_error

# calculate root mean squared error
trainScore = math.sqrt(mean_squared_error(y_train[:,0], y_train_pred[:,0]))
print('Train Score: %.2f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(y_test[:,0], y_test_pred[:,0]))
print('Test Score: %.2f RMSE' % (testScore))

In [None]:
# Visualising the results
figure, axes = plt.subplots(figsize=(15, 6))
axes.xaxis_date()

axes.plot(df_hsi[len(df_hsi)-len(y_test):].index, y_test, color = 'red', label = 'Real HSI Stock Price')
axes.plot(df_hsi[len(df_hsi)-len(y_test):].index, y_test_pred, color = 'blue', label = 'Predicted HSI Stock Price')
plt.title('HSI Stock Price Prediction')
plt.xlabel('Time')
plt.ylabel('HSI Stock Price')
plt.legend()
plt.show()