In [None]:
import pandas as pd #For working with data tables and dataframes
import numpy as np #Fro numerical operations
import matplotlib.pyplot as plt #For making plots

In [12]:
from azureml.core import Workspace, Dataset

subscription_id = 'b23ddd02-1d4d-4c80-8ef3-f68a97a0dab6'
resource_group = 'MLDev'
workspace_name = 'project_groundhog'

workspace = Workspace(subscription_id, resource_group, workspace_name)

dataset = Dataset.get_by_name(workspace, name='Treasury')
dataset.to_pandas_dataframe()

{'infer_column_types': 'False', 'activity': 'to_pandas_dataframe'}
{'infer_column_types': 'False', 'activity': 'to_pandas_dataframe', 'activityApp': 'TabularDataset'}


Unnamed: 0,observation_date,DGS10
0,1962-01-02,4.06
1,1962-01-03,4.03
2,1962-01-04,3.99
3,1962-01-05,4.02
4,1962-01-08,4.03
...,...,...
16575,2025-07-15,4.50
16576,2025-07-16,4.46
16577,2025-07-17,4.47
16578,2025-07-18,4.44


The power of the LSTM is in its cell. The cell, in an RNN, is basically the fundamental building block that processes sequential data. It takes in both the current input and the previous cell's output (the hidden state) as input,  and produces a new output and hidden state. 

The gates in an LSTM are what allows it to maintain a long and short term memory. There are 3. 

The Input Gate decides how much new information to keep in the cell state. It uses a sigmoid function tofilter incoming data, determining what weights and biases need to be updated.

The Forget Gate determines how much old information to remove from the cell state. It also uses a sigmoid function to review current inputs and past outputs, and decides wether to retain or discard previous states. The goal is to keep the network free form unnecessary/irrelivant data/

The Output Gate manages what the next hidden layer or the output will receive from the cell state. It uses tanh function on the filtered cell state to scale the values, deciding, based on the sigmoid's state what should be passed onto the output.

This will be shown later

WeightInitializer
- Blueprint for initializing weights for the neural network

In [None]:
class WeightInitializer: 

    #setup function that runs when yyou create a new weight initializer. It remembers the method you want to use for initializing weights, an ddefaults to 'random' if you don't specify one.

    def __init__(self, method='random'): 
        self.method = method

    #This function creates the actual weights. The "shape" tells it how many weights to create and how to arrange them

    def initialize(self, shape):
        if self.method == 'random':
            return np.random.randn(*shape) #Creates random numbers from a bell curve
        elif self.method == 'xavier':
            return np.random.randn(*shape) / np.sqrt(shape[0]) #Divides those random numbers by the square root of the first dimension - this helps prevent numbers from getting too big or small
        elif self.method == 'he':
            return np.random.randn(*shape) * np.sqrt(2. / shape[0]) #Multiplies random numbers by a specific factor
        elif self.method == 'uniform':
            return np.random.uniform(-1, 1, shape) #Creates random numbers evenly spread between -1 and 1
        else:
            raise ValueError("Unknown initialization method: {}".format(self.method))

PlotManager
- Blueprint for plotting loss functions

In [None]:
class PlotManager:

    #This sets up a figure with space for 3 graphs stacked vertically, sized 6 inches wide by 4 inches tall.

    def __init__(self):
        self.fig, self.ax = plt.subplots(3, 1, figsize=(6, 4))

    #This function draws two lines on a graph: One line showing how training loss changed over time. Another line showing validation loss (how well the model performs on data it hasn't seen)

    def plot_losses(self, train_losses, val_losses):
        self.ax.plot(train_losses, label='Training Loss')
        self.ax.plot(val_losses, label='Validation Loss')
        self.ax.set_title('Training and Validation Losses')
        self.ax.set_xlabel('Epoch')
        self.ax.set_ylabel('Loss')
        self.ax.legend()

    #shows the plots

    def show_plots(self):
        plt.tight_layout()

EarlyStopping
- Blueprint for early stopping after a given epoch, in order to prevent overfitting. 
- This class monitors the validation loss and stops training if it does not improve for a specified number of epochs (patience).
- It can also print messages when the loss does not improve, depending on the verbose flag.
Args:
- patience (int): Number of epochs to wait before stopping the training. Default is 7.
- verbose (bool): If True, prints a message for each epoch where the loss does not improve.
- delta (float): Minimum change in the monitored quantity to qualify as an improvement. Default is 0.

In [None]:
class EarlyStopping:

    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience #Number of epochs to wait before stopping
        self.verbose = verbose #If True, prints messages when the loss does not improve
        self.counter = 0 #Counter for how many epochs the loss has not improved
        self.best_score = None #Best score seen so far
        self.early_stop = False #Flag to indicate if training should stop
        self.delta = delta #Minimum change in the monitored quantity to qualify as an improvement

    #This function is called at the end of each epoch to check if the model should stop training based on the validation loss.
    #If the validation loss does not improve for a number of epochs equal to patience, it sets early_stop to True.
    #If verbose is True, it prints a message indicating that the loss did not improve.

    def __call__(self, val_loss):
        #Determines if the model should stop training.

        # Args: val_loss (float): The loss of the model on the validation set.
        
        score = -val_loss #flips the sign of the validation loss to make it a score (lower loss is better)

        if self.best_score is None: 
            self.best_score = score # self.best_score is set to the first score seen

        elif score < self.best_score + self.delta:
            self.counter += 1 # If the score has not improved by at least delta, increment the counter

            if self.counter >= self.patience:
                self.early_stop = True # If the counter exceeds patience, set early_stop to True
        else:
            self.best_score = score
            self.counter = 0 # If the score has improved, reset the counter to 0

Class for LSTM Network

    - input_size: int, dimensionality of input space
    - hidden_size: int, number of LSTM units
    - output_size: int, dimensionality of output space
    - init_method: str, weight initialization method (default: 'xavier')

In [None]:
class LSTM:
    """
    Long Short-Term Memory (LSTM) network.

    Parameters:
    - input_size: int, dimensionality of input space
    - hidden_size: int, number of LSTM units
    - output_size: int, dimensionality of output space
    - init_method: str, weight initialization method (default: 'xavier')
    """
    def __init__(self, input_size, hidden_size, output_size, init_method='xavier'):
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.weight_initializer = WeightInitializer(method=init_method)

        # Initialize weights
        self.wf = self.weight_initializer.initialize((hidden_size, hidden_size + input_size)) # Forget gate weights
        self.wi = self.weight_initializer.initialize((hidden_size, hidden_size + input_size)) # Input gate weights
        self.wo = self.weight_initializer.initialize((hidden_size, hidden_size + input_size)) # Output gate weights
        self.wc = self.weight_initializer.initialize((hidden_size, hidden_size + input_size)) # Candiate gate weights. Creates new candidate information to potentially store in the cell state

        # Initialize biases for all gates starting at zero--industry standard
        self.bf = np.zeros((hidden_size, 1)) 
        self.bi = np.zeros((hidden_size, 1))
        self.bo = np.zeros((hidden_size, 1))
        self.bc = np.zeros((hidden_size, 1))

        # Initialize output layer weights and biases. 
        # These are for the final output of the LSTM network, which maps the hidden state to the desired output space.
        self.why = self.weight_initializer.initialize((output_size, hidden_size)) 
        self.by = np.zeros((output_size, 1))

    @staticmethod
    def sigmoid(z):
        """
        Sigmoid activation function.

        Parameters:
        - z: np.ndarray, input to the activation function

        Returns:
        - np.ndarray, output of the activation function
        """
        return 1 / (1 + np.exp(-z))

    @staticmethod
    def dsigmoid(y):
        """
        Derivative of the sigmoid activation function.

        Parameters:
        - y: np.ndarray, output of the sigmoid activation function

        Returns:
        - np.ndarray, derivative of the sigmoid function
        """
        return y * (1 - y)

    @staticmethod
    def dtanh(y):
        """
        Derivative of the hyperbolic tangent activation function.

        Parameters:
        - y: np.ndarray, output of the hyperbolic tangent activation function

        Returns:
        - np.ndarray, derivative of the hyperbolic tangent function
        """
        return 1 - y * y

    def forward(self, x):

        # This function processes input data through the LSTM network, moving one time step at a time.

        """
        Forward pass through the LSTM network.

        Parameters:
        - x: np.ndarray, input to the network

        Returns:
        - np.ndarray, output of the network
        - list, caches containing intermediate values for backpropagation
        """
        caches = [] # List to store intermediate values for backpropagation
        h_prev = np.zeros((self.hidden_size, 1)) #previous hidden state. Short term memory of the LSTM
        c_prev = np.zeros((self.hidden_size, 1)) # Previous cell state. Long term memory of the LSTM
        h = h_prev 
        c = c_prev

        # Loop through each time step in the input sequence
        # x.shape[0] is the number of time steps in the input sequence
        # x[t] is the input at time step t, reshaped to match the expected input shape
        # combined is the concatenation of the previous hidden state and the current input at time step t
        # f, i, o, c_ are the forget, input, output gates and candidate cell state respectively
        # c is the new cell state and h is the new hidden state
        # The caches list stores all intermediate values needed for backpropagation
        # The final output y is computed by multiplying the hidden state h with the output layer weights and adding the output layer bias
        # The function returns the output y and the caches list


        #Add in for loop to forget weights after 60 steps

        for t in range(x.shape[0]): 
            x_t = x[t].reshape(-1, 1)
            combined = np.vstack((h_prev, x_t))

            f = self.sigmoid(np.dot(self.wf, combined) + self.bf) # Decides what to forget from long term memory. 
            i = self.sigmoid(np.dot(self.wi, combined) + self.bi) # How much of the new information to store
            o = self.sigmoid(np.dot(self.wo, combined) + self.bo) # What parts of the memory to output
            c_ = np.tanh(np.dot(self.wc, combined) + self.bc) # deals with new candidate information to potentially store in the cell state

            c = f * c_prev + i * c_ #New long-term memory = (forget gate × old memory) + (input gate × new candidate)
            h = o * np.tanh(c) #New short-term memory = output gate × processed long-term memory

            cache = (h_prev, c_prev, f, i, o, c_, x_t, combined, c, h) #Saves all intermediate values needed for backpropagation
            caches.append(cache)

            h_prev, c_prev = h, c # Update previous hidden and cell states for the next time step

        y = np.dot(self.why, h) + self.by # Final output of the network, computed by multiplying the hidden state with the output layer weights and adding the output layer bias
        return y, caches

    def backward(self, dy, caches, clip_value=1.0):
        """
        Backward pass through the LSTM network.

        Parameters:
        - dy: np.ndarray, gradient of the loss with respect to the output
        - caches: list, caches from the forward pass
        - clip_value: float, value to clip gradients to (default: 1.0)

        Returns:
        - tuple, gradients of the loss with respect to the parameters
        """
        dWf, dWi, dWo, dWc = [np.zeros_like(w) for w in (self.wf, self.wi, self.wo, self.wc)] #create empty arrays for gradients of weights. 
            # np.zeros_like(w) means "make an array of zeros that's the exact same size and shape as self.wf"
        dbf, dbi, dbo, dbc = [np.zeros_like(b) for b in (self.bf, self.bi, self.bo, self.bc)] #create empty arrays for gradients of biases
        dWhy = np.zeros_like(self.why)
        dby = np.zeros_like(self.by)

        # Ensure dy is reshaped to match output size
        dy = dy.reshape(self.output_size, -1)
        dh_next = np.zeros((self.hidden_size, 1))  # shape must match hidden_size
        dc_next = np.zeros_like(dh_next) # shape must match hidden_size

        for cache in reversed(caches):
            h_prev, c_prev, f, i, o, c_, x_t, combined, c, h = cache # For each time step, we are unpacking the cached values from the forward pass

            # Add gradient from next step to current output gradient
            dh = np.dot(self.why.T, dy) + dh_next # Calculates how much the hidden state should change based on the output gradient and the previous hidden state gradient
            dc = dc_next + (dh * o * self.dtanh(np.tanh(c))) # How much the cell state should change based on the hidden state gradient and the output gate
        
            # Calculate gradients for each gate. Figuring out how much of the blame for the error should go to each gate
            # df, di, do, dc_ are the gradients for the forget gate, input gate, output gate, and candidate cell state respectively
            # They are calculated using the chain rule and the derivatives of the activation functions
            df = dc * c_prev * self.dsigmoid(f)
            di = dc * c_ * self.dsigmoid(i)
            do = dh * self.dtanh(np.tanh(c))
            dc_ = dc * i * self.dtanh(c_)

            #

            dcombined_f = np.dot(self.wf.T, df)
            dcombined_i = np.dot(self.wi.T, di)
            dcombined_o = np.dot(self.wo.T, do)
            dcombined_c = np.dot(self.wc.T, dc_)

            dcombined = dcombined_f + dcombined_i + dcombined_o + dcombined_c
            dh_next = dcombined[:self.hidden_size]
            dc_next = f * dc

            dWf += np.dot(df, combined.T)
            dWi += np.dot(di, combined.T)
            dWo += np.dot(do, combined.T)
            dWc += np.dot(dc_, combined.T)

            dbf += df.sum(axis=1, keepdims=True)
            dbi += di.sum(axis=1, keepdims=True)
            dbo += do.sum(axis=1, keepdims=True)
            dbc += dc_.sum(axis=1, keepdims=True)

        dWhy += np.dot(dy, h.T)
        dby += dy

        gradients = (dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby)

        # Gradient clipping
        for i in range(len(gradients)):
            np.clip(gradients[i], -clip_value, clip_value, out=gradients[i])

        return gradients

    def update_params(self, grads, learning_rate):
        """
        Update the parameters of the network using the gradients.

        Parameters:
        - grads: tuple, gradients of the loss with respect to the parameters
        - learning_rate: float, learning rate
        """
        dWf, dWi, dWo, dWc, dbf, dbi, dbo, dbc, dWhy, dby = grads

        self.wf -= learning_rate * dWf
        self.wi -= learning_rate * dWi
        self.wo -= learning_rate * dWo
        self.wc -= learning_rate * dWc

        self.bf -= learning_rate * dbf
        self.bi -= learning_rate * dbi
        self.bo -= learning_rate * dbo
        self.bc -= learning_rate * dbc

        self.why -= learning_rate * dWhy
        self.by -= learning_rate * dby

In [10]:
class LSTMTrainer:
    """
    Trainer for the LSTM network.

    Parameters:
    - model: LSTM, the LSTM network to train
    - learning_rate: float, learning rate for the optimizer
    - patience: int, number of epochs to wait before early stopping
    - verbose: bool, whether to print training information
    - delta: float, minimum change in validation loss to qualify as an improvement
    """
    def __init__(self, model, learning_rate=0.01, patience=7, verbose=True, delta=0):
        self.model = model
        self.learning_rate = learning_rate
        self.train_losses = []
        self.val_losses = []
        self.early_stopping = EarlyStopping(patience, verbose, delta)

    def train(self, X_train, y_train, X_val=None, y_val=None, epochs=10, batch_size=1, clip_value=1.0):
        """
        Train the LSTM network.

        Parameters:
        - X_train: np.ndarray, training data
        - y_train: np.ndarray, training labels
        - X_val: np.ndarray, validation data
        - y_val: np.ndarray, validation labels
        - epochs: int, number of training epochs
        - batch_size: int, size of mini-batches
        - clip_value: float, value to clip gradients to
        """
        for epoch in range(epochs):
            epoch_losses = []
            for i in range(0, len(X_train), batch_size):
                batch_X = X_train[i:i + batch_size]
                batch_y = y_train[i:i + batch_size]
                losses = []

                for x, y_true in zip(batch_X, batch_y):
                    y_pred, caches = self.model.forward(x)
                    loss = self.compute_loss(y_pred, y_true.reshape(-1, 1))
                    losses.append(loss)

                    # Backpropagation to get gradients
                    dy = y_pred - y_true.reshape(-1, 1)
                    grads = self.model.backward(dy, caches, clip_value=clip_value)
                    self.model.update_params(grads, self.learning_rate)

                batch_loss = np.mean(losses)
                epoch_losses.append(batch_loss)

            avg_epoch_loss = np.mean(epoch_losses)
            self.train_losses.append(avg_epoch_loss)

            if X_val is not None and y_val is not None:
                val_loss = self.validate(X_val, y_val)
                self.val_losses.append(val_loss)
                print(f'Epoch {epoch + 1}/{epochs} - Loss: {avg_epoch_loss:.5f}, Val Loss: {val_loss:.5f}')

                # Check early stopping condition
                self.early_stopping(val_loss)
                if self.early_stopping.early_stop:
                    print("Early stopping")
                    break
            else:
                print(f'Epoch {epoch + 1}/{epochs} - Loss: {avg_epoch_loss:.5f}')

    def compute_loss(self, y_pred, y_true):
        """
        Compute mean squared error loss.
        """
        return np.mean((y_pred - y_true) ** 2)

    def validate(self, X_val, y_val):
        """
        Validate the model on a separate set of data.
        """
        val_losses = []
        for x, y_true in zip(X_val, y_val):
            y_pred, _ = self.model.forward(x)
            loss = self.compute_loss(y_pred, y_true.reshape(-1, 1))
            val_losses.append(loss)
        return np.mean(val_losses)

In [11]:
class TimeSeriesDataset:
    """
    Dataset class for time series data.

    Parameters:
    - ticker: str, stock ticker symbol
    - start_date: str, start date for data retrieval
    - end_date: str, end date for data retrieval
    - look_back: int, number of previous time steps to include in each sample
    - train_size: float, proportion of data to use for training
    """
    def __init__(self, start_date, end_date, look_back=1, train_size=0.67):
        self.start_date = start_date
        self.end_date = end_date
        self.look_back = look_back
        self.train_size = train_size

    def load_data(self):
        """
        Load stock data.

        Returns:
        - np.ndarray, training data
        - np.ndarray, testing data
        """
        df = pd.read_csv('data/google.csv')
        df = df[(df['Date'] >= self.start_date) &amp; (df['Date'] <= self.end_date)]
        df = df.sort_index()
        df = df.loc[self.start_date:self.end_date]
        df = df[['Close']].astype(float)  # Use closing price
        df = self.MinMaxScaler(df.values)  # Convert DataFrame to numpy array
        train_size = int(len(df) * self.train_size)
        train, test = df[0:train_size,:], df[train_size:len(df),:]
        return train, test

    def MinMaxScaler(self, data):
        """
        Min-max scaling of the data.

        Parameters:
        - data: np.ndarray, input data
        """
        numerator = data - np.min(data, 0)
        denominator = np.max(data, 0) - np.min(data, 0)
        return numerator / (denominator + 1e-7)

    def create_dataset(self, dataset):
        """
        Create the dataset for time series prediction.

        Parameters:
        - dataset: np.ndarray, input data

        Returns:
        - np.ndarray, input data
        - np.ndarray, output data
        """
        dataX, dataY = [], []
        for i in range(len(dataset)-self.look_back):
            a = dataset[i:(i + self.look_back), 0]
            dataX.append(a)
            dataY.append(dataset[i + self.look_back, 0])
        return np.array(dataX), np.array(dataY)

    def get_train_test(self):
        """
        Get the training and testing data.

        Returns:
        - np.ndarray, training input
        - np.ndarray, training output
        - np.ndarray, testing input
        - np.ndarray, testing output
        """
        train, test = self.load_data()
        trainX, trainY = self.create_dataset(train)
        testX, testY = self.create_dataset(test)
        return trainX, trainY, testX, testY

SyntaxError: invalid syntax (2334281612.py, line 27)

In [None]:
# Instantiate the dataset
dataset = TimeSeriesDataset( '2010-1-1', '2020-12-31', look_back=1)
trainX, trainY, testX, testY = dataset.get_train_test()

# Reshape input to be [samples, time steps, features]
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))

look_back = 1  # Number of previous time steps to include in each sample
hidden_size = 256  # Number of LSTM units
output_size = 1  # Dimensionality of the output space

lstm = LSTM(input_size=1, hidden_size=hidden_size, output_size=output_size)

trainer = LSTMTrainer(lstm, learning_rate=1e-3, patience=50, verbose=True, delta=0.001)
trainer.train(trainX, trainY, testX, testY, epochs=1000, batch_size=32)

plot_manager = PlotManager()

# Inside your training loop
plot_manager.plot_losses(trainer.train_losses, trainer.val_losses)

# After your training loop
plot_manager.show_plots()