In [143]:
from IPython.core.debugger import set_trace

%load_ext nb_black

import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import time

plt.style.use(style="seaborn")
%matplotlib inline

ModuleNotFoundError: No module named 'nb_black'

In [None]:
df = pd.read_csv("data/MSFT-1Y-Hourly.csv")

In [None]:
df.info()

In [None]:
df.set_index("date", drop=True, inplace=True)

We'll use only the close feature.

In [None]:
df = df[["close"]]

In [None]:
type(df["close"]) == type(df[["close"]])

In [None]:
type(df[["close"]])

In [None]:
df.describe()

In [None]:
plt.figure(1, figsize=(16, 6))
_ = plt.plot(df.close)

Calculate the percentage change.

The reason for using pct_change instead of the prices is the benefit of normalization as we can measure all variables in a comparable metric. Also returns have more manageable statistical properties than prices such as stationarity, as in most cases we don't have stationary prices but we can have stationary returns.

A stationary time series is one where statistical properties such as mean, variance, correlation, etc are constant over time.
For more details on stationarity: https://en.wikipedia.org/wiki/Stationary_process

In [None]:
df["returns"] = df.close.pct_change()

In [None]:
df.head(5)

In [None]:
134.75 / 132.89 - 1

Calculate the log returns.

Why use log returns: https://quantivity.wordpress.com/2011/02/21/why-log-returns/

In [None]:
df["log_returns"] = np.log(1 + df["returns"])

In [None]:
df.head(5)

In [None]:
plt.figure(1, figsize=(16, 4))
plt.plot(df.log_returns)

In [None]:
df.dropna(inplace=True)
X = df[["close", "log_returns"]].values

In [None]:
X

In [None]:
from sklearn.preprocessing import MinMaxScaler

In [None]:
scaler = MinMaxScaler(feature_range=(0, 1)).fit(X)
X_scaled = scaler.transform(X)

In [None]:
X_scaled[:5]

In [None]:
y = [x[0] for x in X_scaled]

In [None]:
y[:5]

#### Train test split

In [None]:
split = int(len(X_scaled) * 0.8)
print(split)

In [None]:
X_train = X_scaled[:split]
X_test = X_scaled[split : len(X_scaled)]
y_train = y[:split]
y_test = y[split : len(y)]

In [None]:
assert len(X_train) == len(y_train)
assert len(X_test) == len(y_test)

#### Labeling

We want to predict the stock price at a future time. We'll predict the stock price at time t + 1 relative to the stock price at time t.

As we're going to use an LSTM architecture, we know that it has memory and that it is maintained by setting the time step, basically how many steps in the past we want the LSTM to use.

The time step  refers to how many steps in time we want the backpropagation algorithm to use when calculating gradients for weight updates during training.

So we can use a method to create both the timestep and the output variable, the t + 1.

In [None]:
n = 3
Xtrain = []
ytrain = []
Xtest = []
ytest = []
for i in range(n, len(X_train)):
    Xtrain.append(X_train[i - n : i, : X_train.shape[1]])
    ytrain.append(y_train[i])  # predict next record
for i in range(n, len(X_test)):
    Xtest.append(X_test[i - n : i, : X_test.shape[1]])
    ytest.append(y_test[i])  # predict next record

In [None]:
df.head(5)

In [None]:
Xtrain[0]

In [None]:
ytrain[0]

In [None]:
val = np.array(ytrain[0])
val = np.c_[val, np.zeros(val.shape)]

In [None]:
scaler.inverse_transform(val)

In an LSTM network the input for each LSTM layer needs to contain the following information:
- The number of observations
- The time steps
- The features

Therefore we need to add a temporal dimension compared to a classical network:

(number of observations, number of steps, number of features per step)

In [None]:
Xtrain, ytrain = (np.array(Xtrain), np.array(ytrain))
Xtrain = np.reshape(Xtrain, (Xtrain.shape[0], Xtrain.shape[1], Xtrain.shape[2]))

Xtest, ytest = (np.array(Xtest), np.array(ytest))
Xtest = np.reshape(Xtest, (Xtest.shape[0], Xtest.shape[1], Xtest.shape[2]))

In [None]:
print(Xtrain.shape)
print(ytrain.shape)
print("---")
print(Xtest.shape)
print(ytest.shape)

#### LSTM Model

In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Dense

In [None]:
model = Sequential()
model.add(LSTM(4, input_shape=(Xtrain.shape[1], Xtrain.shape[2])))
model.add(Dense(1))
model.compile(loss="mean_squared_error", optimizer="adam")
model.fit(
    Xtrain, ytrain, epochs=100, validation_data=(Xtest, ytest), batch_size=16, verbose=1
)

In [None]:
model.summary()

In [None]:
trainPredict = model.predict(Xtrain)
testPredict = model.predict(Xtest)



In [None]:
trainPredict = np.c_[trainPredict, np.zeros(trainPredict.shape)]
testPredict = np.c_[testPredict, np.zeros(testPredict.shape)]

In [None]:
# invert predictions
trainPredict = scaler.inverse_transform(trainPredict)
trainPredict = [x[0] for x in trainPredict]

testPredict = scaler.inverse_transform(testPredict)
testPredict = [x[0] for x in testPredict]


In [None]:
print(trainPredict[:5])
print(testPredict[:5])

In [None]:
from sklearn.metrics import mean_squared_error

In [None]:
# calculate root mean squared error
trainScore = mean_squared_error([x[0][0] for x in Xtrain], trainPredict, squared=False)
print("Train Score: %.2f RMSE" % (trainScore))

testScore = mean_squared_error([x[0][0] for x in Xtest], testPredict, squared=False)
print("Test Score: %.2f RMSE" % (testScore))