In [None]:
pip install -r requirements.txt

In [None]:
import numpy as np
import random
import pandas as pd
from pylab import mpl, plt
mpl.rcParams['font.family'] = 'serif'

import math, time
import itertools
import datetime
from operator import itemgetter
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from math import sqrt
import torch
import torch.nn as nn
from torch.autograd import Variable

# Verify that the data exists in the correct directory: list at most the
# first five files of every folder found under 'data'.
import os
for dirname, _, filenames in os.walk('data'):
    for filename in filenames[:5]:
        print(os.path.join(dirname, filename))

In [None]:
def stocks_data(symbols, data_dir="data"):
    """Load the closing prices of several symbols into one DataFrame.

    Each symbol is read from ``{data_dir}/{symbol}:US.csv``; the file must
    provide a ``Date`` column (parsed as dates and used as the index) and a
    ``Close($)`` column.

    Parameters
    ----------
    symbols : iterable of str
        Ticker symbols to load, e.g. ``['AAPL']``.
    data_dir : str or path-like, default "data"
        Directory that contains the per-symbol CSV files.

    Returns
    -------
    pandas.DataFrame
        One column per symbol (named after the symbol), indexed by ``Date``.
        The per-symbol frames are concatenated column-wise, so they are
        aligned on their date index. An empty DataFrame is returned when
        ``symbols`` is empty.
    """
    frames = []
    for symbol in symbols:
        prices = pd.read_csv(
            f"{data_dir}/{symbol}:US.csv",
            parse_dates=['Date'],
            index_col='Date',
        )
        # Keep only the close column and label it with the ticker symbol.
        frames.append(prices[['Close($)']].rename(columns={'Close($)': symbol}))

    # pd.concat raises ValueError on an empty list; return an empty frame instead.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, axis=1)

In [None]:
symbols = ['AAPL']
df = stocks_data(symbols)

# Draw the first symbol's closing prices on an explicitly created Axes.
fig, ax = plt.subplots(figsize=(10, 6))
df[symbols[0]].plot(ax=ax)
ax.set(
    xlabel='Date',
    ylabel='Close Price ($)',
    title=f'Close Price for {symbols[0]}',
)
plt.show()

df.head()

In [None]:
# Read the raw AAPL file, using its first column (parsed as dates) as the index.
df_aapl = pd.read_csv("data/AAPL:US.csv", index_col=0, parse_dates=True)

# Plot the closing price and label the Axes that pandas created.
close_ax = df_aapl[['Close($)']].plot(figsize=(15, 6))
close_ax.set_ylabel("stock_price")
close_ax.set_title("AAPL Stock")
plt.show()

In [None]:
# Reduce the frame to the closing-price column only (it stays a one-column
# DataFrame), then print a summary of its index, dtype and non-null count.
df_aapl = df_aapl.loc[:, ['Close($)']]
df_aapl.info()

In [None]:
# Forward-fill missing closes with the last known value. DataFrame.ffill()
# replaces fillna(method='ffill'), which is deprecated in recent pandas.
df_aapl = df_aapl.ffill()

# Rescale the closing prices into [-1, 1]. The fitted scaler is kept so that
# predictions can be mapped back to dollars with scaler.inverse_transform.
# NOTE(review): the scaler is fit on the whole series, i.e. including the rows
# that later end up in the test split. Fitting on the training portion only
# would avoid leaking test-set min/max into training - confirm intent.
scaler = MinMaxScaler(feature_range=(-1, 1))
df_aapl['Close($)'] = scaler.fit_transform(df_aapl['Close($)'].values.reshape(-1, 1))

In [None]:
def load_data(stock, look_back, test_fraction=0.2):
    """Cut a time-ordered series into sliding windows and split them train/test.

    Every window holds ``look_back`` consecutive rows: the first
    ``look_back - 1`` rows are the model input and the last row is the target.
    The split is chronological (no shuffling): the earliest windows form the
    training set and the final ``test_fraction`` of the windows the test set.

    Note that ``len(stock) - look_back`` windows are produced, so the very
    last row of ``stock`` never appears in a window. This count is kept as-is
    so that the shapes of the returned arrays do not change.

    Parameters
    ----------
    stock : pandas.DataFrame, pandas.Series or array-like
        Observations in time order, shape ``(n_rows, n_features)``. A 1-D
        input is treated as a single feature.
    look_back : int
        Window length, counting both the input rows and the target row.
    test_fraction : float, default 0.2
        Fraction of the windows (rounded with ``np.round``) assigned to the
        test set.

    Returns
    -------
    list
        ``[x_train, y_train, x_test, y_test]`` where the ``x`` arrays have
        shape ``(n_windows, look_back - 1, n_features)`` and the ``y`` arrays
        have shape ``(n_windows, n_features)``.

    Raises
    ------
    ValueError
        If ``stock`` has too few rows to build a single window.
    """
    data_raw = np.asarray(stock)  # convert to numpy array
    if data_raw.ndim == 1:
        # Treat a plain series as one feature column.
        data_raw = data_raw.reshape(-1, 1)

    n_windows = len(data_raw) - look_back
    if n_windows <= 0:
        raise ValueError(
            f"need more than look_back={look_back} rows to build a window, "
            f"got {len(data_raw)}"
        )

    # Stack the windows into shape (n_windows, look_back, n_features).
    data = np.array([data_raw[start:start + look_back] for start in range(n_windows)])

    test_set_size = int(np.round(test_fraction * data.shape[0]))
    train_set_size = data.shape[0] - test_set_size

    # Within a window: everything but the last row is input, the last row is target.
    x_train = data[:train_set_size, :-1, :]
    y_train = data[:train_set_size, -1, :]

    x_test = data[train_set_size:, :-1, :]
    y_test = data[train_set_size:, -1, :]

    return [x_train, y_train, x_test, y_test]

look_back = 4  # window length: look_back - 1 input rows followed by 1 target row
x_train, y_train, x_test, y_test = load_data(df_aapl, look_back)

# Report the shape of every split.
for caption, split in (
    ('x_train.shape = ', x_train),
    ('y_train.shape = ', y_train),
    ('x_test.shape = ', x_test),
    ('y_test.shape = ', y_test),
):
    print(caption, split.shape)

In [None]:
# Convert every split to a float32 torch tensor. torch.as_tensor accepts both
# NumPy arrays and existing tensors, so this cell can be re-run safely;
# torch.from_numpy raises TypeError once the names already hold tensors.
x_train = torch.as_tensor(x_train, dtype=torch.float32)
x_test = torch.as_tensor(x_test, dtype=torch.float32)
y_train = torch.as_tensor(y_train, dtype=torch.float32)
y_test = torch.as_tensor(y_test, dtype=torch.float32)


In [None]:
# Display the target and input tensor sizes side by side.
y_train.shape, x_train.shape

In [None]:
# Model hyper-parameters.
input_dim, output_dim = 1, 1  # features fed in per time step / values predicted
hidden_dim = 32               # size of the LSTM hidden state
num_layers = 2                # number of stacked LSTM layers

import torch
import torch.nn as nn

class LSTM(nn.Module):
    """Stacked LSTM whose last time-step output is mapped through a linear layer.

    Parameters
    ----------
    input_dim : int
        Number of features per time step.
    hidden_dim : int
        Size of the LSTM hidden state.
    num_layers : int
        Number of stacked LSTM layers.
    output_dim : int
        Number of values predicted per sequence.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # batch_first=True: inputs are laid out as (batch, seq_len, input_dim).
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Predict one output vector per sequence.

        Parameters
        ----------
        x : torch.Tensor
            Batch of sequences, shape ``(batch, seq_len, input_dim)``.

        Returns
        -------
        torch.Tensor
            Shape ``(batch, output_dim)``.
        """
        # Start every batch from zero hidden/cell states. new_zeros allocates
        # them with x's dtype and device, so the model also works after
        # .double() or .to(device) (plain torch.zeros is always CPU float32).
        state_shape = (self.num_layers, x.size(0), self.hidden_dim)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        out, _ = self.lstm(x, (h0, c0))
        # Only the output at the final time step feeds the linear head.
        return self.fc(out[:, -1, :])
    
# Seed PyTorch's RNG before the weights are created so that a fresh
# "Restart & Run All" starts from the same initial parameters.
torch.manual_seed(42)

model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)

loss_fn = torch.nn.MSELoss()

optimiser = torch.optim.Adam(model.parameters(), lr=0.01)

# Print the architecture, the number of parameter tensors and each tensor's size.
print(model)
params = list(model.parameters())  # materialise once instead of once per iteration
print(len(params))
for param in params:
    print(param.size())

In [None]:
num_epochs = 600
hist = np.zeros(num_epochs)  # training loss recorded for every epoch
seq_dim = look_back - 1

for epoch in range(num_epochs):
    # Clear the gradients of the previous step, then run a full-batch forward pass.
    optimiser.zero_grad()
    y_train_pred = model(x_train)
    loss = loss_fn(y_train_pred, y_train)

    # Record the loss; report it every 10th epoch (epoch 0 is skipped).
    loss_value = loss.item()
    hist[epoch] = loss_value
    if epoch > 0 and epoch % 10 == 0:
        print("Epoch ", epoch, "MSE: ", loss_value)

    # Back-propagate and update the weights.
    loss.backward()
    optimiser.step()

In [None]:
# Training curve: hist holds one MSE loss value per epoch. Axis labels and a
# title are set so the figure can be read on its own.
fig, ax = plt.subplots()
ax.plot(hist, label="Training loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("MSE loss")
ax.set_title("Training loss per epoch")
ax.legend()
plt.show()
np.shape(y_train_pred)

In [None]:
x_full = torch.cat((x_train, x_test), dim=0)  # Combine train and test data

# Predict without tracking gradients; the resulting tensors can be turned
# into NumPy arrays directly.
with torch.no_grad():
    y_train_pred = model(x_train)
    y_test_pred = model(x_test)
    y_full_pred = model(x_full)

# Map the scaled predictions back to dollar prices.
y_train_pred = scaler.inverse_transform(y_train_pred.numpy())
y_test_pred = scaler.inverse_transform(y_test_pred.numpy())
y_full_pred = scaler.inverse_transform(y_full_pred.numpy())

# The unscaled targets go into new names: y_train / y_test must stay scaled
# tensors, otherwise re-running this cell or the training cell fails.
y_train_actual = scaler.inverse_transform(y_train.numpy())
y_test_actual = scaler.inverse_transform(y_test.numpy())

# Root-mean-squared error in dollars for each split.
trainScore = math.sqrt(mean_squared_error(y_train_actual[:, 0], y_train_pred[:, 0]))
testScore = math.sqrt(mean_squared_error(y_test_actual[:, 0], y_test_pred[:, 0]))

print('Train Score: %.2f RMSE' % (trainScore))
print('Test Score: %.2f RMSE' % (testScore))



In [None]:
figure, axes = plt.subplots(figsize=(15, 6))
axes.xaxis_date()

# Actual closing prices, mapped back from the scaled column to dollars.
actual_prices = scaler.inverse_transform(df_aapl[['Close($)']])
axes.plot(df_aapl.index, actual_prices, color='red', label='Actual Apple Stock Price')

# load_data builds window i from rows i .. i + look_back - 1 and uses the last
# of those rows as the target, so the first prediction belongs to row
# look_back - 1. Starting the slice at look_back would draw every prediction
# one row too late.
first_target = look_back - 1
pred_dates = df_aapl.index[first_target:first_target + len(y_full_pred)]
axes.plot(pred_dates, y_full_pred, color='blue', label='Predicted Apple Stock Price')

axes.set_title('Apple Stock Price Prediction')
axes.set_xlabel('Time')
axes.set_ylabel('Apple Stock Price')
axes.legend()
# Save before show(): the figure may be empty once it has been shown.
figure.savefig('AApred.png')
plt.show()