We bring the imports we need

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras.models import Sequential
from keras.layers import LSTM, Dense, GRU, SimpleRNN, Dropout
from keras.optimizers import Adam
from keras.callbacks import LearningRateScheduler


## Load dataset

We will use yfinance to download the price history for *Microsoft* (ticker `MSFT`).

In [None]:
# Download daily MSFT prices from the same calendar day in 2019 up to now.
end = datetime.now()
# 2019 is not a leap year, so datetime(2019, 2, 29) would raise ValueError when
# this cell is run on Feb 29; fall back to Feb 28 in that single case.
start_day = 28 if (end.month, end.day) == (2, 29) else end.day
start = datetime(2019, end.month, start_day)
# auto_adjust=False keeps the separate 'Adj Close' column that later cells plot
# (recent yfinance releases adjust prices and drop that column by default).
dataset = yf.download("MSFT", start, end, auto_adjust=False)
# Recent yfinance releases may return (field, ticker) MultiIndex columns even for
# a single ticker; flatten them so `dataset['Close']` is a plain Series.
if isinstance(dataset.columns, pd.MultiIndex):
    dataset.columns = dataset.columns.get_level_values(0)
dataset.shape


## Inspect dataset

Let's see the dataset that we have

In [None]:
# Display the last five rows of the downloaded frame.
dataset.tail()

Here we remove the last row and save its closing price in a separate variable, so that we can later compare the models' predicted price with the actual price of that held-out day.

In [None]:
# Keep the closing price of the most recent row as the ground truth for the
# final comparison, then remove that row so the models never see it.
# NOTE: this cell mutates `dataset` in place, so re-running it drops one more row.
actual_close = dataset['Close'].iloc[-1]
dataset.drop(index=dataset.index[-1], inplace=True)



Let's check for null data

In [None]:
# Per-column count of missing values.
dataset.isna().sum()

We will check what type of data are the columns from dataset

In [None]:
# Print index type, column dtypes and non-null counts.
dataset.info()

Let's check for duplicates

In [None]:
# Number of rows whose column values exactly repeat an earlier row
# (the date index itself is not part of the comparison).
dataset.duplicated().sum()

Let's observe the dataset in a plot

In [None]:
# Overlay the five daily price series on one wide figure.
price_columns = ['Open', 'High', 'Low', 'Close', 'Adj Close']
palette = sns.color_palette("husl", 5)

plt.figure(figsize=(15, 5))
sns.lineplot(
    data=dataset[price_columns],
    palette=palette,
)
plt.title('Stock Price')
plt.xlabel('Time [Days]')
plt.ylabel('Price')
# Relabel the legend explicitly, in the same order the columns were drawn.
plt.legend(labels=price_columns, loc='best')
plt.show()

In [None]:
# Take a real snapshot of the untransformed frame. A plain assignment would only
# create a second name for the same object, so any later in-place change to
# `dataset` would silently change `original_dataset` as well.
original_dataset = dataset.copy()

## Transform the data

We normalize the data

In [None]:
# scaler = MinMaxScaler()
# columns_to_normalize = ['Open', 'High', 'Low', 'Adj Close', 'Volume']
# dataset[columns_to_normalize] = scaler.fit_transform(dataset[columns_to_normalize])


In [None]:
# Scale the closing prices to [0, 1] using statistics from the training period
# only. Fitting on the whole series would leak the test period's min/max into
# training. The 0.8 fraction must match the train/test split applied to the
# sequences further down; the first 80% of rows always lies inside that
# training portion.
close_prices = dataset['Close'].values.reshape(-1, 1)
scaler_fit_rows = int(len(close_prices) * 0.8)

scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(close_prices[:scaler_fit_rows])
# Test-period values may fall outside [0, 1]; that is expected and is undone
# exactly by `scaler.inverse_transform`.
scaled_data = scaler.transform(close_prices)

Splitting the data
X_train, y_train: Training set, used to train the model.
X_test, y_test: Test set, used to evaluate the final performance of the trained model.
Note: no separate validation set (X_val, y_val) is created in this notebook; the sequences are split chronologically, 80% for training and 20% for testing.

In [None]:
# X = dataset[['Open', 'High', 'Low', 'Adj Close']]
# y = dataset[['Close']]
# X_train, X_val_test, y_train, y_val_test = train_test_split(X, y, test_size=0.2, shuffle=False)
# X_val, X_test, y_val, y_test = train_test_split(X_val_test, y_val_test, test_size=0.5, shuffle=False)


We set the number of time steps to 10, meaning each input sequence contains 10 consecutive data points.
X, y = create_sequences(scaled_data, sequence_length): This creates the sequences (X contains the input sequences, and y contains the value that immediately follows each sequence).
X_train, y_train: The first 80% of the sequences, used for training.
X_test, y_test: The remaining 20% of the sequences, used for testing.

In [None]:
def create_sequences(data, seq_length):
    """Slice `data` into overlapping windows and their next-step targets.

    Args:
        data: Sliceable sequence or array indexed along its first axis.
        seq_length: Number of consecutive elements in each input window.

    Returns:
        A tuple `(X, y)` of NumPy arrays. `X[k]` is `data[k:k + seq_length]`
        and `y[k]` is `data[k + seq_length]`. Both arrays are empty when
        `data` has no more than `seq_length` elements.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    targets = [data[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(targets)

# Window the scaled closes into fixed-length sequences (the length is tunable).
sequence_length = 10
X, y = create_sequences(scaled_data, sequence_length)

# Chronological 80/20 split: the earliest windows train, the latest windows test.
train_size = int(len(X) * 0.8)
test_size = len(X) - train_size
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]


Learning rate

In [None]:
def create_lr_scheduler(initial_lr, decay_factor, decay_epochs):
    """Build a Keras callback that applies step decay to the learning rate.

    Args:
        initial_lr: Learning rate set at epoch 0. The original version accepted
            this argument but never used it, so the optimizer's own default
            silently determined the starting rate.
        decay_factor: Multiplier applied to the current learning rate at each
            decay step.
        decay_epochs: Number of epochs between decay steps; must be positive.

    Returns:
        A `LearningRateScheduler` callback wrapping the schedule.

    Raises:
        ValueError: If `decay_epochs` is not positive (it is used as a modulus).
    """
    if decay_epochs <= 0:
        raise ValueError("decay_epochs must be a positive integer")

    def lr_scheduler(epoch, lr):
        # Start every training run from the requested rate.
        if epoch == 0:
            return float(initial_lr)
        # Decay once every `decay_epochs` epochs, otherwise keep the rate.
        if epoch % decay_epochs == 0:
            return lr * decay_factor
        return lr

    return LearningRateScheduler(lr_scheduler)

# Example usage:
# NOTE(review): `lr_callback` is not passed to any `fit` call in the visible
# notebook; the models use `global_lr_callback` instead.
lr_callback = create_lr_scheduler(initial_lr=0.001, decay_factor=0.9, decay_epochs=15)


In [None]:
# Step-decay schedule passed to every model's `fit` call:
# the learning rate is multiplied by 0.9 every 20 epochs.
global_lr_callback = create_lr_scheduler(
    initial_lr=0.001,
    decay_factor=0.9,
    decay_epochs=20,
)

## Create Model RNN Sequential LSTM

We create a sequential model using an LSTM layer followed by a Dense output layer.
We are predicting the next value of the 'Close' price of the stock based only on its own past 'Close' values (the other price columns are not used as inputs).

Input: Each input sequence consists of the scaled 'Close' prices of 10 consecutive time steps, i.e. it has shape (10, 1).
Output: The model is trained to predict the next scaled 'Close' price following the input sequence.

In [None]:
# LSTM regressor: one recurrent layer over (sequence_length, 1) windows,
# followed by a single linear output unit for the next scaled close.
model = Sequential()
model.add(LSTM(units=128, activation='relu', input_shape=(sequence_length, 1)))
model.add(Dense(units=1))

Define the optimizer and compile the model

We train the model:
X_train: This is the input data that we use to train the model. Here X_train contains sequences of past scaled 'Close' prices, and each sequence has a length of 10 time steps.
y_train: This is the target data that we try to predict during training. In this case, y_train contains the next scaled 'Close' price corresponding to each sequence in X_train.

In [None]:
# Minimise mean squared error with Adam.
model.compile(loss='mean_squared_error', optimizer='adam')

# Fit on the training windows, applying the shared learning-rate schedule.
model.fit(
    x=X_train,
    y=y_train,
    epochs=100,
    batch_size=32,
    callbacks=[global_lr_callback],
)

In [None]:
# Score the trained LSTM on both splits (loss in scaled units), without progress output.
train_loss, test_loss = (
    model.evaluate(features, targets, verbose=0)
    for features, targets in ((X_train, y_train), (X_test, y_test))
)

print(f'Training Loss: {train_loss}', f'Test Loss: {test_loss}', sep='\n')

Inverse transform the predictions

In [None]:
# Predict on both splits and map the scaled outputs straight back to price units.
train_predict = scaler.inverse_transform(model.predict(X_train))
test_predict = scaler.inverse_transform(model.predict(X_test))

# Ground-truth targets in price units, as column vectors.
y_train_inv, y_test_inv = (
    scaler.inverse_transform(targets.reshape(-1, 1))
    for targets in (y_train, y_test)
)

We plot the prediction and the initial data

In [None]:
# Plot actual vs. predicted closes for the test period.
# Each target is the close that follows a `sequence_length`-row window, so the
# first test target sits at row `sequence_length + len(y_train_inv)` of
# `dataset`. The original slice omitted the window offset and drew both curves
# `sequence_length` rows too early. (Disabled training-set plots were removed.)
test_start = sequence_length + len(y_train_inv)
test_dates = dataset.index[test_start:test_start + len(y_test_inv)]

plt.figure(figsize=(14, 7))
plt.plot(test_dates, y_test_inv, label='Actual (Testing)')
plt.plot(test_dates, test_predict, label='Predicted (Testing)')
plt.title('Microsoft Stock Price Prediction using LSTM')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.legend()
plt.show()

## GRU model

We will create a GRU model to see how it will perform

In [None]:
# GRU regressor with the same shape as the LSTM model: one recurrent layer
# over (sequence_length, 1) windows, then a single linear output unit.
gru_model = Sequential()
gru_model.add(GRU(units=128, activation='relu', input_shape=(sequence_length, 1)))
gru_model.add(Dense(units=1))

Define optimizer and compile the model

Model training

In [None]:
# Minimise mean squared error with Adam.
gru_model.compile(loss='mean_squared_error', optimizer='adam')

# Fit on the training windows, applying the shared learning-rate schedule.
gru_model.fit(
    x=X_train,
    y=y_train,
    epochs=100,
    batch_size=32,
    callbacks=[global_lr_callback],
)

In [None]:
# Score the trained GRU on both splits (loss in scaled units), without progress output.
train_loss, test_loss = (
    gru_model.evaluate(features, targets, verbose=0)
    for features, targets in ((X_train, y_train), (X_test, y_test))
)

print(f'Training Loss: {train_loss}', f'Test Loss: {test_loss}', sep='\n')

Scale the data to initial state

In [None]:
# Predict on both splits and map the scaled outputs straight back to price units.
gru_train_predict = scaler.inverse_transform(gru_model.predict(X_train))
gru_test_predict = scaler.inverse_transform(gru_model.predict(X_test))

# Ground-truth targets in price units, as column vectors.
y_train_inv, y_test_inv = (
    scaler.inverse_transform(targets.reshape(-1, 1))
    for targets in (y_train, y_test)
)

Plot the prediction and the initial data

In [None]:
# Plot actual vs. predicted closes for the test period.
# Each target is the close that follows a `sequence_length`-row window, so the
# first test target sits at row `sequence_length + len(y_train_inv)` of
# `dataset`. The original slice omitted the window offset and drew both curves
# `sequence_length` rows too early. (Disabled training-set plots were removed.)
test_start = sequence_length + len(y_train_inv)
test_dates = dataset.index[test_start:test_start + len(y_test_inv)]

plt.figure(figsize=(14, 7))
plt.plot(test_dates, y_test_inv, label='Actual (Testing)')
plt.plot(test_dates, gru_test_predict, label='Predicted (Testing)')
plt.title('Microsoft Stock Price Prediction using GRU')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.legend()
plt.show()

Actual price from today compared with both models

In [None]:
# `actual_close` belongs to the row that was removed from `dataset`, i.e. the
# day AFTER the last scaled value. The last test prediction targets the final
# row still in `dataset`, so comparing it with `actual_close` was off by one
# day. Predict the held-out day from the most recent `sequence_length` closes.
latest_window = scaled_data[-sequence_length:].reshape(1, sequence_length, 1)

last_predicted_close_lstm = scaler.inverse_transform(
    model.predict(latest_window, verbose=0)
)[0][0]
last_predicted_close_gru = scaler.inverse_transform(
    gru_model.predict(latest_window, verbose=0)
)[0][0]

print("Actual price: ", actual_close)
print("LSTM Prediction: ", last_predicted_close_lstm)
print("GRU Prediction", last_predicted_close_gru)

## RNN Simple model

In [None]:
# SimpleRNN baseline with the same shape as the other models: one recurrent
# layer over (sequence_length, 1) windows, then a single linear output unit.
rnn_model = Sequential()
rnn_model.add(SimpleRNN(units=128, activation='relu', input_shape=(sequence_length, 1)))
rnn_model.add(Dense(units=1))

In [None]:
# Minimise mean squared error with Adam.
rnn_model.compile(loss='mean_squared_error', optimizer='adam')

# Fit on the training windows, applying the shared learning-rate schedule.
rnn_model.fit(
    x=X_train,
    y=y_train,
    epochs=100,
    batch_size=32,
    callbacks=[global_lr_callback],
)

In [None]:
# Score the trained SimpleRNN on both splits (loss in scaled units), without progress output.
train_loss, test_loss = (
    rnn_model.evaluate(features, targets, verbose=0)
    for features, targets in ((X_train, y_train), (X_test, y_test))
)

print(f'Training Loss: {train_loss}', f'Test Loss: {test_loss}', sep='\n')

In [None]:
# Predict on both splits and map the scaled outputs straight back to price units.
rnn_train_predict = scaler.inverse_transform(rnn_model.predict(X_train))
rnn_test_predict = scaler.inverse_transform(rnn_model.predict(X_test))

# Ground-truth targets in price units, as column vectors.
y_train_inv, y_test_inv = (
    scaler.inverse_transform(targets.reshape(-1, 1))
    for targets in (y_train, y_test)
)

In [None]:
# Plot actual vs. predicted closes for the test period.
# Each target is the close that follows a `sequence_length`-row window, so the
# first test target sits at row `sequence_length + len(y_train_inv)` of
# `dataset`. The original slice omitted the window offset and drew both curves
# `sequence_length` rows too early. (Disabled training-set plots were removed.)
test_start = sequence_length + len(y_train_inv)
test_dates = dataset.index[test_start:test_start + len(y_test_inv)]

plt.figure(figsize=(14, 7))
plt.plot(test_dates, y_test_inv, label='Actual (Testing)')
plt.plot(test_dates, rnn_test_predict, label='Predicted (Testing)')
plt.title('Microsoft Stock Price Prediction using RNN Simple')
plt.xlabel('Date')
plt.ylabel('Stock Price')
plt.legend()
plt.show()

In [None]:
# `actual_close` belongs to the row that was removed from `dataset`, i.e. the
# day AFTER the last scaled value. The last test prediction targets the final
# row still in `dataset`, so comparing it with `actual_close` was off by one
# day. Predict the held-out day from the most recent `sequence_length` closes.
latest_window = scaled_data[-sequence_length:].reshape(1, sequence_length, 1)

last_predicted_close_lstm = scaler.inverse_transform(
    model.predict(latest_window, verbose=0)
)[0][0]
last_predicted_close_gru = scaler.inverse_transform(
    gru_model.predict(latest_window, verbose=0)
)[0][0]
last_predicted_close_rnn = scaler.inverse_transform(
    rnn_model.predict(latest_window, verbose=0)
)[0][0]

print("Actual price: ", actual_close)
print("LSTM Prediction: ", last_predicted_close_lstm)
print("GRU Prediction: ", last_predicted_close_gru)
print("RNN Prediction: ", last_predicted_close_rnn)

In [None]:
# Recursive 10-step forecast with the LSTM model.
# Seed with the most recent `sequence_length` scaled closes. The original used
# `X_test[-1]`, which stops one row short of the latest known close, so its
# "day 1" re-predicted a day that is already in `dataset`. Day 1 is now the
# first day after the last row of `dataset`.
last_sequence = scaled_data[-sequence_length:].reshape(1, sequence_length, 1)

# Generate predictions for the next 10 days, feeding each one back as input.
predictions_next_10_days = []
for _ in range(10):
    # Take the scalar out of the (1, 1) output so the assignment below does not
    # rely on NumPy's deprecated implicit array-to-scalar conversion.
    next_day_prediction = model.predict(last_sequence, verbose=0)[0, 0]
    predictions_next_10_days.append(next_day_prediction)
    # np.roll returns a new array, so `scaled_data` itself is never modified.
    last_sequence = np.roll(last_sequence, -1, axis=1)  # Shift the window by one day
    last_sequence[0, -1, 0] = next_day_prediction  # Newest slot holds the new prediction

# Transform the predictions back to the original price scale
predictions_next_10_days = scaler.inverse_transform(np.array(predictions_next_10_days).reshape(-1, 1))

# Print the predictions for the next 10 days
print("Predictions for the next 10 days:")
for i, prediction in enumerate(predictions_next_10_days, start=1):
    print(f"Day {i}: Predicted Price = {prediction[0]}")