In [None]:
import numpy as np
import pandas as pd
from keras import Sequential
from keras.src.layers import LSTM, Dense
from keras.src.optimizers import Adam
from sklearn.preprocessing import StandardScaler
import plotly.graph_objects as go
import plotly.express as px

In [None]:
# Load the SBUX price history; "Adj Close" is dropped and the remaining
# column names are lower-cased.
df = pd.read_csv("SBUX.csv")
df = df.drop("Adj Close", axis=1)
df.columns = [name.lower() for name in df.columns]

# Keep only the rows from position len/1.5 up to and including len/1.2.
# Integer positions are used instead of float labels so the selection does not
# depend on pandas casting float slice bounds on the default integer index;
# `stop + 1` keeps the end row, as an inclusive label slice would.
# .copy() makes df an independent frame before columns are added to it later.
start = int(len(df) // 1.5)
stop = int(len(df) // 1.2)
df = df.iloc[start:stop + 1].copy()
df.head(10)

In [None]:
# First attempt: use a single feature, the close price, as a column vector.
# NOTE: the author flags modelling the raw price this way as the wrong method.
series = df[['close']].to_numpy()
N = series.shape[0]

In [None]:
# Standardise the close prices. The scaler statistics are estimated on the
# leading half of the series only (the training side of the later split), so
# the held-out half does not leak into the scaling. The whole series is then
# transformed with those statistics and flattened back to 1-D.
ss = StandardScaler()
ss.fit(series[:-N // 2])
series = ss.transform(series).flatten()

In [None]:
# Build the supervised dataset: each sample is T consecutive values of the
# series and its target is the value that immediately follows the window.
T = 10  # time steps per sample
D = 1   # features per time step

window_starts = range(N - T)
X = np.array([series[s:s + T] for s in window_starts]).reshape(-1, T, D)
Y = np.array([series[s + T] for s in window_starts])

# From here on N is the number of samples, not the length of the series.
N = len(X)
print(f"X.shape: {X.shape}\nY.shape: {Y.shape}")

In [None]:
# Autoregressive RNN model: one small LSTM layer feeding a single-unit Dense output.
model = Sequential([
    LSTM(units=5, input_shape=(T, D)),
    Dense(units=1),
])
model.compile(loss='mse', optimizer=Adam(learning_rate=0.1))

# The leading part of the samples is used for fitting, the rest for validation.
X_fit, X_val = X[:-N // 2], X[-N // 2:]
Y_fit, Y_val = Y[:-N // 2], Y[-N // 2:]
r = model.fit(
    X_fit,
    Y_fit,
    epochs=80,
    validation_data=(X_val, Y_val),
    batch_size=16,
)

In [None]:
def draw_loss(history):
    """Plot the training and validation loss curves of a fitted model.

    Args:
        history: Object returned by ``model.fit``. Its ``epoch`` list is used
            for the x-axis and ``history['loss']`` / ``history['val_loss']``
            supply the two curves, so the model must have been fit with
            validation data.
    """
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=history.epoch, y=history.history['loss'], name='loss'))
    fig.add_trace(go.Scatter(x=history.epoch, y=history.history['val_loss'], name='val_loss'))
    # Label the figure so it can be understood without reading the code.
    fig.update_layout(title='Loss per epoch', xaxis_title='epoch', yaxis_title='loss')
    fig.show()


draw_loss(r)

In [None]:
# One-step forecast: every prediction is computed from a window of true values.
outputs = model.predict(X)

# Targets as the base line plot, model outputs overlaid as a second trace.
fig = px.line(Y).add_scatter(y=outputs.ravel())
fig.show()

In [None]:
# Multistep forecast: start from the first validation window and feed every
# prediction back in as the newest input value.
validation_targets = Y[-N // 2:]

# Forecast exactly as many steps as there are validation targets so the two
# plotted curves cover the same range (instead of a fixed 750 steps).
horizon = len(validation_targets)

predictions = []
last_x = X[-N // 2].copy()  # work on a copy so X itself is never modified

for _ in range(horizon):
    p = model.predict(last_x.reshape(-1, T, D), verbose=0)[0, 0]
    predictions.append(p)
    # Drop the oldest time step and append the new prediction at the end.
    last_x = np.roll(last_x, -1, axis=0)
    last_x[-1] = p

fig = px.line(validation_targets)
fig.add_scatter(y=predictions)
fig.show()

In [None]:
# Stock return: relative change of the close price versus the previous row.
# The first row has no previous close, so its return is NaN.
df['prev_close'] = df['close'].shift(1)
df['return'] = df['close'].sub(df['prev_close']).div(df['prev_close'])
px.histogram(df['return'])

In [None]:
# Second attempt: predict the stock return instead of the price.
# The first return is skipped ([1:]); it has no previous close to compare to.
returns = df['return'].values[1:]
series = returns.reshape(-1, 1)

# Scaler statistics come from the leading part of the series only.
ss = StandardScaler().fit(series[: - len(series) // 2])
series = ss.transform(series).flatten()

In [None]:
# Same windowing as for the prices, now applied to the scaled returns:
# T consecutive values form a sample, the following value is its target.
T = 10
D = 1

n_windows = len(series) - T
samples = []
targets = []
for k in range(n_windows):
    samples.append(series[k:k + T])
    targets.append(series[k + T])

X = np.array(samples).reshape(-1, T, D)
Y = np.array(targets)
N = len(X)  # number of samples

In [None]:
# Autoregressive RNN model for the returns: same layout as before
# (LSTM with 5 units, single-unit Dense output), trained with a smaller learning rate.
layers = [
    LSTM(units=5, input_shape=(T, D)),
    Dense(units=1),
]
model = Sequential(layers)
model.compile(loss='mse', optimizer=Adam(learning_rate=0.01))

# Leading part of the samples for fitting, the remainder for validation.
r = model.fit(
    X[:-N // 2],
    Y[:-N // 2],
    epochs=80,
    validation_data=(X[-N // 2:], Y[-N // 2:]),
    batch_size=16,
)

In [None]:
# One-step forecast on the returns: predictions from windows of true values,
# with the true targets overlaid as a trace named 'true'.
outputs = model.predict(X)

fig = px.line(outputs.ravel()).add_scatter(y=Y, name='true')
fig.show()

In [None]:
# Multistep forecast on the returns: start from the first validation window
# and feed every prediction back in as the newest input value.
validation_targets = Y[-N // 2:]

# Forecast exactly as many steps as there are validation targets so both
# plotted curves span the same range (instead of a fixed 750 steps).
horizon = len(validation_targets)

predictions = []
last_x = X[-N // 2].copy()  # work on a copy so X itself is never modified

for _ in range(horizon):
    p = model.predict(last_x.reshape(-1, T, D), verbose=0)[0, 0]
    predictions.append(p)
    # Drop the oldest time step and append the new prediction at the end.
    last_x = np.roll(last_x, -1, axis=0)
    last_x[-1] = p

fig = px.line(predictions)
fig.add_scatter(y=validation_targets, name='true')
fig.show()

In [None]:
# Turn the regression problem into binary classification: the model will only
# predict whether the return is a gain or a loss.
# Inputs are the five price/volume columns; labels start out as the raw returns.
feature_columns = ['open', 'high', 'low', 'close', 'volume']
data = df[feature_columns].to_numpy()
labels = df["return"].to_numpy()
labels

In [None]:
# Window length, number of features per time step, and number of windows
# that have a following row available as a label.
T = 10
n_rows, D = data.shape
N = n_rows - T

In [None]:
# The first two thirds of the rows define the number of training windows.
train_N = len(data) * 2 // 3

# Fit the scaler only on rows that appear in training windows. Window t covers
# rows t .. t + T - 1, so the last training window (t = train_N - 1) ends at
# row train_N + T - 2; row train_N + T - 1 is used as input only by test
# windows and is therefore excluded from the fit.
ss = StandardScaler()
ss.fit(data[:train_N + T - 1])
data = ss.transform(data)

In [None]:
# Training set: window `start` holds rows start .. start + T - 1 and its label
# is 1.0 when the return of the row right after the window is positive, else 0.0.
X_train = np.zeros((train_N, T, D))
Y_train = np.zeros(train_N)

for start in range(train_N):
    stop = start + T
    X_train[start] = data[start:stop]
    Y_train[start] = labels[stop] > 0

In [None]:
# Test set: the windows that start at train_N and later (the last 1/3 of the
# data), built and labelled the same way as the training windows.
n_test = N - train_N
X_test = np.zeros((n_test, T, D))
Y_test = np.zeros(n_test)

for u, t in enumerate(range(train_N, N)):
    X_test[u] = data[t:t + T]
    Y_test[u] = labels[t + T] > 0

In [None]:
# Binary classifier: a 50-unit LSTM followed by a single sigmoid unit,
# trained with binary cross-entropy and tracked with accuracy.
model = Sequential([
    LSTM(units=50, input_shape=(T, D)),
    Dense(units=1, activation='sigmoid'),
])

model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['accuracy'],
)

In [None]:
# Train the classifier; the test windows double as validation data so that
# val_loss / val_accuracy are recorded for every epoch.
r = model.fit(
    X_train,
    Y_train,
    epochs=300,
    batch_size=32,
    validation_data=(X_test, Y_test),
)

In [None]:
def draw_accuracy(history):
    """Plot the training and validation accuracy curves of a fitted model.

    Args:
        history: Object returned by ``model.fit``. Its ``epoch`` list is used
            for the x-axis and ``history['accuracy']`` /
            ``history['val_accuracy']`` supply the two curves, so the model
            must have been compiled with the 'accuracy' metric and fit with
            validation data.
    """
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=history.epoch, y=history.history['accuracy'], name='accuracy'))
    fig.add_trace(go.Scatter(x=history.epoch, y=history.history['val_accuracy'], name='val_accuracy'))
    # Label the figure so it can be understood without reading the code.
    fig.update_layout(title='Accuracy per epoch', xaxis_title='epoch', yaxis_title='accuracy')
    fig.show()


draw_loss(r)
draw_accuracy(r)