<a href="https://colab.research.google.com/github/DavisRayM/msft-stock-prediction/blob/main/msft-prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 4

Author: Davis Muro

For CPSC 5610

Microsoft Stock Prediction

In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Seed both NumPy's and TensorFlow's global random generators with the same
# value so repeated runs of the notebook start from the same random state.
RANDOM_SEED = 25
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

In [None]:
# Load the raw price history from disk and preview the first five rows.
df = pd.read_csv(filepath_or_buffer='data/MSFT.csv')
df.head(n=5)

## Preprocessing

In [None]:
# One pass over the frame:
#   1. parse `Date` into real datetimes,
#   2. drop the unused `Adj Close` column,
#   3. order the rows chronologically (oldest first).
df = (
    df.assign(Date=pd.to_datetime(df['Date']))
      .drop(columns=['Adj Close'])
      .sort_values(by='Date')
)

df.head(n=5)

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Normalize numerical columns using MinMaxScaler.
#
# The scaler is fitted on the chronologically-first 80% of the rows only (the
# frame is already sorted by `Date`), then applied to every row. Fitting on the
# whole frame would let the min/max of the later, held-out period leak into the
# training features. Scaled values in the held-out rows may therefore fall
# outside [0, 1].
# NOTE: keep the 80% fraction in sync with the train/test split used for windowing.
feature_cols = ['Close', 'High', 'Low', 'Open', 'Volume']
n_fit_rows = df.shape[0] * 80 // 100

scaler = MinMaxScaler()
scaler.fit(df[feature_cols].iloc[:n_fit_rows])
df[feature_cols] = scaler.transform(df[feature_cols])

df.head(5)

In [None]:
# Set `Date` as index. It must be unique, so fail loudly with a real exception
# (not an `assert`, which is skipped under `python -O`) if any date repeats.
if df['Date'].duplicated().any():
    raise ValueError("Duplicate dates found")

df.set_index('Date', inplace=True)
df.head(5)

In [None]:
# (number of rows, number of columns) of the preprocessed frame.
(len(df.index), len(df.columns))

## Windowing

In [None]:
# 80/20 split by row count: integer arithmetic for the training share, and the
# remainder goes to the test share so the two always sum to the total.
n_rows = len(df)
train_size = (n_rows * 80) // 100
test_size = n_rows - train_size
(train_size, test_size)

In [None]:
# Use a sliding window of N days (e.g., 20 days) to predict the next day's values.
n_steps = 20
window_length = n_steps + 1  # n_steps input rows plus the one row to predict

# Chronological split: the first `train_size` rows are for training and the
# remaining rows are for testing. The test slice must start at `train_size`;
# slicing `[:test_size]` would take the earliest rows again, which are already
# part of the training set.
train_data = tf.data.Dataset.from_tensor_slices(df.values[:train_size])
test_data = tf.data.Dataset.from_tensor_slices(df.values[train_size:])

# One window per starting row; incomplete trailing windows are dropped.
train_data = train_data.window(window_length, shift=1, drop_remainder=True)
test_data = test_data.window(window_length, shift=1, drop_remainder=True)

In [None]:
# Peek at the first two training windows: row count, then the rows themselves.
for win in train_data.take(2):
    rows = [*win.as_numpy_iterator()]
    print(len(rows))
    print(rows)

In [None]:
# Peek at the first two test windows: row count, then the rows themselves.
for win in test_data.take(2):
    rows = [*win.as_numpy_iterator()]
    print(len(rows))
    print(rows)

In [None]:
def show_dataset(dataset, n):
    """Print the first ``n`` elements of ``dataset``.

    Args:
        dataset: Object exposing ``prefetch`` and ``take`` (the datasets built
            in this notebook do). ``prefetch(1)`` is applied before taking.
        n: Number of leading elements to print, one ``print`` call each.
    """
    for element in dataset.prefetch(1).take(n):
        print(element)
# Each window is itself a small dataset of rows. Batching it by its full length
# and flattening yields one tensor per window instead of nested datasets.
def _window_to_tensor(win):
    return win.batch(window_length)

train_data = train_data.flat_map(_window_to_tensor)
test_data = test_data.flat_map(_window_to_tensor)

show_dataset(train_data, 2)

In [None]:
# Group the per-window tensors into mini-batches, the same way for both splits.
batch_size = 32
train_data, test_data = (ds.batch(batch_size) for ds in (train_data, test_data))
show_dataset(train_data, 2)

In [None]:
# Split every batch of windows into (inputs, target): all rows but the last of
# each window are the inputs, and the last row is the value to predict.
def _split_inputs_target(batch_of_windows):
    return batch_of_windows[:, :-1, :], batch_of_windows[:, -1, :]

train_data = train_data.map(_split_inputs_target)
test_data = test_data.map(_split_inputs_target)
show_dataset(train_data, 2)

In [None]:
# Shape your input as (samples, timesteps, features).
# Prefetch one batch ahead, then print the shapes of the first (inputs, target) pair.
train_data = train_data.prefetch(1)
for pair in train_data.take(1):
    print(*(part.shape for part in pair))

In [None]:
# Same as for the training split: prefetch one batch ahead and print the shapes
# of the first (inputs, target) pair.
test_data = test_data.prefetch(1)
for pair in test_data.take(1):
    print(*(part.shape for part in pair))

## Modelling

In [None]:
from tensorflow.keras.layers import BatchNormalization, Layer, SimpleRNN
from tensorflow.keras import activations


class BatchNormSimpleRNN(Layer):
    """SimpleRNN whose output is batch-normalized before the tanh activation.

    The wrapped ``SimpleRNN`` is created with ``activation=None``; its output is
    passed through ``BatchNormalization`` and then ``tanh``.

    Args:
        units: Number of units of the wrapped ``SimpleRNN``.
        return_sequences: Forwarded to ``SimpleRNN``.
        dropout: Forwarded to ``SimpleRNN``.
        recurrent_dropout: Forwarded to ``SimpleRNN``.
        **kwargs: Base ``Layer`` options only (e.g. ``name``).
    """

    def __init__(self, units, return_sequences=False, dropout=0.0,
                 recurrent_dropout=0.0, **kwargs):
        # RNN-specific options are named explicitly instead of travelling in
        # **kwargs: the base Layer constructor rejects keyword arguments it
        # does not recognise, so forwarding `dropout` to it raises.
        super().__init__(**kwargs)
        self.simple_rnn = SimpleRNN(
            units,
            activation=None,  # activation is applied after batch normalization
            return_sequences=return_sequences,
            dropout=dropout,
            recurrent_dropout=recurrent_dropout,
        )
        self.batch_norm = BatchNormalization()
        self.activation = activations.tanh  # or any other activation you want

    def call(self, inputs, training=None):
        """Run RNN -> batch norm -> tanh, passing ``training`` to both sub-layers."""
        # Pass `training` explicitly so dropout and batch-norm both follow the
        # caller's train/inference mode.
        x = self.simple_rnn(inputs, training=training)
        x = self.batch_norm(x, training=training)
        return self.activation(x)

In [None]:
from typing import Tuple, List, Sequence

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, SimpleRNN, LSTM, GRU, Input


def create_model(
    kind: str = "simple",
    input_shape: Tuple[int, int] = (n_steps, 5),
    hidden_units: Sequence[int] = (128, 64),
):
    """Build and compile a stacked recurrent model.

    The model is ``Input`` -> one recurrent layer per entry of ``hidden_units``
    -> ``Dense(input_shape[1])``. Every recurrent layer except the last returns
    full sequences so the next recurrent layer receives a sequence. All
    recurrent layers use ``dropout=0.2`` and ``recurrent_dropout=0.2``.

    Args:
        kind: ``"simple"`` (``BatchNormSimpleRNN``), ``"lstm"`` or ``"gru"``.
        input_shape: ``(timesteps, features)`` of one sample.
        hidden_units: Units of each recurrent layer, first to last.

    Returns:
        The model compiled with MSE loss, the Adam optimizer and an MAE metric.

    Raises:
        ValueError: If ``kind`` is not supported or ``hidden_units`` is empty.
    """
    recurrent_layers = {"simple": BatchNormSimpleRNN, "lstm": LSTM, "gru": GRU}
    if kind not in recurrent_layers:
        raise ValueError(f"Unsupported kind: {kind}")
    if len(hidden_units) == 0:
        raise ValueError("hidden_units must contain at least one layer size")
    layer_cls = recurrent_layers[kind]

    model = Sequential()
    model.add(Input(shape=input_shape))

    last_index = len(hidden_units) - 1
    for index, units in enumerate(hidden_units):
        model.add(
            layer_cls(
                units,
                # Only the final recurrent layer collapses the time axis.
                return_sequences=(index < last_index),
                dropout=0.2,
                recurrent_dropout=0.2,
            )
        )

    # One output per input feature.
    model.add(Dense(input_shape[1]))
    model.compile(loss="mse", optimizer="adam", metrics=["mae"])
    return model

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Upper bound on training epochs; early stopping may end a run sooner.
epochs = 20

# Stop when `val_mae` has not improved for 3 epochs, and restore the weights
# from the best epoch.
early_stopping = EarlyStopping(monitor='val_mae', patience=3, restore_best_weights=True)

In [None]:
# Two-layer batch-normalized SimpleRNN model (128 then 64 units).
simple = create_model("simple", hidden_units=[128, 64])
simple.summary()

In [None]:
# Train the SimpleRNN model, validating on `test_data` after every epoch.
simple.fit(
    x=train_data,
    validation_data=test_data,
    epochs=epochs,
    callbacks=[early_stopping],
)

In [None]:
# Per-epoch metrics recorded by the SimpleRNN model's last `fit` call.
simple_history = simple.history.history
result = pd.DataFrame(data=simple_history)
result.head(n=5)

In [None]:
# Training vs. validation MAE per epoch.
result.loc[:, ['mae', 'val_mae']].plot.line(title="Mean Absolute Error vs Epoch")

In [None]:
# Training vs. validation loss per epoch.
result.loc[:, ['loss', 'val_loss']].plot.line(title="Loss vs Epoch")

In [None]:
# Two-layer GRU model (128 then 64 units).
gru = create_model("gru", hidden_units=[128, 64])
gru.summary()

In [None]:
# Train the GRU model, validating on `test_data` after every epoch.
gru.fit(
    x=train_data,
    validation_data=test_data,
    epochs=epochs,
    callbacks=[early_stopping],
)

In [None]:
# Per-epoch metrics recorded by the GRU model's last `fit` call. Read them from
# `gru`, not `simple`, so the plots that follow show the GRU run.
result = pd.DataFrame(gru.history.history)
result.head()

In [None]:
# Training vs. validation MAE per epoch, from the current `result` frame.
result.loc[:, ['mae', 'val_mae']].plot.line(title="Mean Absolute Error vs Epoch")

In [None]:
# Training vs. validation loss per epoch, from the current `result` frame.
result.loc[:, ['loss', 'val_loss']].plot.line(title="Loss vs Epoch")