# LAB 3.1 - CNS (BPTT)


Import the required libraries and fix the random seed for reproducibility.

In [1]:
import random
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm


def set_seed(seed=0):
    """
    Seed both random number generators used by this notebook.

    seed: Value passed to Python's `random` module and to NumPy's global generator.
    """
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


# Fix the seed once so that weight initialisation is repeatable.
set_seed(0)

# Bonus track 3 - Backpropagation Through Time (BPTT) algorithm from scratch


Data loading.

In [2]:
def load_data(
        train: bool = True,
        data_dir: str = 'ECG5000_dataset'
) -> tuple[np.ndarray, np.ndarray]:
    """
    Function able to load one split of the ECG5000 dataset.

    Each row of the text file holds the class label in the first column
    followed by the values of one sequence.

    train: Flag selecting the 'TRAIN' file (True) or the 'TEST' file (False).
    data_dir: Directory containing the `ECG5000_TRAIN.txt` / `ECG5000_TEST.txt` files.

    return:
        tuple[np.ndarray, np.ndarray]: X with shape (time, samples, 1) and the
        one-hot encoded labels with shape (samples, classes).
    """
    filename = 'TRAIN' if train else 'TEST'
    # ndmin=2 keeps the (samples, columns) layout even for a single-row file.
    data = np.loadtxt(f'{data_dir}/ECG5000_{filename}.txt', ndmin=2)
    # Time-major layout: transpose to (time, samples), then add a feature axis.
    X = np.expand_dims(data[:, 1:].T, -1)
    Y = data[:, 0].astype(int)
    # Shift the labels so that the smallest one becomes class 0.
    Y -= Y.min()
    # Size the encoding by the largest label rather than by the number of
    # distinct labels, so that gaps in the label set cannot index out of range.
    one_hot = np.zeros((Y.shape[0], Y.max() + 1))
    one_hot[np.arange(Y.shape[0]), Y] = 1
    return X, one_hot


# NOTE(review): the 'TEST' file is loaded as the training set and the 'TRAIN'
# file as the test set. The recorded shapes show the 'TEST' file is the larger
# split (4500 vs 500 sequences), so this looks intentional - confirm.
X_TR_SET, Y_TR_SET = TR_SET = load_data(train=False)
X_TS_SET, Y_TS_SET = TS_SET = load_data(train=True)

# Notebook cell output: shapes of inputs and one-hot targets for both splits.
X_TR_SET.shape, Y_TR_SET.shape, X_TS_SET.shape, Y_TS_SET.shape

((140, 4500, 1), (4500, 5), (140, 500, 1), (500, 5))

Helper function that plots the training and test curves of a loss or a metric.

In [3]:
def _base_plot(tr: list, ts: list, name: str, yscale: str = None) -> None:
    """
    Function able to plot training and test loss or metric results.

    tr: Per-epoch values measured on the training set.
    ts: Per-epoch values measured on the test set.
    name: Name of the plotted quantity, used in the legend labels.
    yscale: Optional scale of the y axis (e.g. 'log'); the default scale is kept when None.
    """
    plt.figure(figsize=(20, 10))
    plt.plot(tr, label=f'tr_{name}')
    plt.plot(ts, label=f'ts_{name}')
    if yscale is not None:
        # Apply the scale that was asked for instead of always forcing 'log'.
        plt.yscale(yscale)
    plt.legend()
    plt.grid(True)
    plt.show()

Definition of the mean squared error (MSE) loss function.

In [4]:
def mse(model, X: np.ndarray, Y: np.ndarray) -> float:
    """
    MSE loss function (halved sum of squared errors, averaged over the samples).

    model: Model used to retrieve predictions given an input X.
    X: Input data; its second axis is used as the number of samples.
    Y: Output data.

    returns:
        float: MSE result.
    """
    residual = Y - model(X)
    # Parenthesised on purpose: `/ 2 * n` would divide by 2 and then multiply by n.
    return (residual ** 2).sum() / (2 * X.shape[1])

Accuracy metric function definition.

In [11]:
def accuracy(model, X: np.ndarray, Y: np.ndarray) -> float:
    """
    Accuracy metric function.

    model: Model used to retrieve predictions given an input X.
    X: Input data.
    Y: Output data, one row of class scores (e.g. one-hot) per sample.

    returns:
        float: Fraction of samples whose predicted class (argmax over the last
        axis) matches the target class.
    """
    hits = Y.argmax(-1) == model(X).argmax(-1)
    # Count in NumPy instead of iterating the array with the built-in sum.
    return np.count_nonzero(hits) / Y.shape[0]

Class representing the vanilla RNN model, which implements the BPTT learning algorithm.

In [12]:
class VanillaRNN:
    """
    Vanilla RNN model class.

    The recurrence is `h_t = tanh(x_t @ w_in + h_{t-1} @ w_hh)` and the
    prediction is a linear readout of the last state, `h_T @ w_out`.
    Gradients are computed with BackPropagation Through Time (BPTT).
    """

    def __init__(
            self,
            input_size: int,
            hidden_size: int,
            output_size: int
    ) -> None:
        """
        Vanilla RNN model constructor method.

        input_size: Input size.
        hidden_size: Hidden size.
        output_size: Output size.
        """
        self.hidden_size = hidden_size

        # NOTE(review): np.random.rand draws from [0, 1), so every weight starts
        # non-negative and unscaled; a zero-centred, scaled initialisation may
        # train better. Left unchanged here.
        self.w_in = np.random.rand(input_size, hidden_size)
        self.w_hh = np.random.rand(hidden_size, hidden_size)
        self.w_out = np.random.rand(hidden_size, output_size)

        # States of the most recent forward pass: pre-activations z_1..z_T and
        # hidden states h_0..h_T (h_list[t] is the state fed to time step t).
        self.z_list = []
        self.h_list = []

    def __call__(self, *args, **kwargs) -> np.ndarray:
        """
        Method able to call the forward function and return its output.

        returns:
            np.ndarray: Forward output tensor.
        """
        return self.forward(*args, **kwargs)[0]

    def forward(
            self,
            X: np.ndarray,
            h: np.ndarray = None
    ) -> tuple[np.ndarray, np.ndarray]:
        """
        Method able to compute the forward step of the model.

        X: Input data to predict with forward method, iterated along its first
            (time) axis; each step is a (samples, input_size) matrix.
        h: Initial hidden state of shape (samples, hidden_size); zeros when None,
            so that repeated calls give the same predictions.

        returns:
            tuple[np.ndarray, np.ndarray]: Output and last hidden state.
        """
        if h is None:
            h = np.zeros((X.shape[1], self.hidden_size))
        # Start from fresh caches on every call: appending to the old lists
        # would grow memory without bound and leave stale states for backward.
        self.z_list = []
        self.h_list = [h]
        for x in X:
            z = x @ self.w_in + h @ self.w_hh
            h = np.tanh(z)
            self.z_list.append(z)
            self.h_list.append(h)
        output = h @ self.w_out
        return output, h

    def backward(
            self,
            X: np.ndarray,
            Y: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Method able to compute the backward pass of the algorithm.

        Runs a forward pass and back-propagates through time the gradient of
        `sum((Y - preds) ** 2) / n_samples`, visiting every time step once.

        X: Input data.
        Y: Output data.

        returns:
            tuple[np.ndarray, np.ndarray, np.ndarray]: Gradients with respect to
            w_in, w_hh and w_out, each with the shape of its weight matrix.
        """
        preds, h_last = self.forward(X)
        g_mse = - 2 / X.shape[1] * (Y - preds)
        g_out = h_last.T @ g_mse
        g_w_in = np.zeros_like(self.w_in)
        g_w_hh = np.zeros_like(self.w_hh)
        # Gradient with respect to the last hidden state, through the readout.
        g_h = g_mse @ self.w_out.T
        for t in reversed(range(X.shape[0])):
            # tanh'(z) = 1 - tanh(z)^2, applied element-wise per sample and unit.
            g_z = g_h * (1 - np.tanh(self.z_list[t]) ** 2)
            g_w_in += X[t].T @ g_z
            # h_list[t] is the state that entered time step t.
            g_w_hh += self.h_list[t].T @ g_z
            # Propagate to the previous hidden state.
            g_h = g_z @ self.w_hh.T
        return g_w_in, g_w_hh, g_out

    @staticmethod
    def _clip_by_norm(grad: np.ndarray, clip: float) -> np.ndarray:
        """
        Rescale grad to norm `clip` only when its norm exceeds `clip`.
        """
        norm = np.linalg.norm(grad)
        if norm > clip:
            return grad * (clip / norm)
        # Small (including all-zero) gradients are returned untouched.
        return grad

    @staticmethod
    def _momentum_step(
            weights: np.ndarray,
            grad: np.ndarray,
            velocity: np.ndarray,
            lr: float,
            momentum: float,
            reg: float
    ) -> np.ndarray:
        """
        Update weights in place with momentum and L2 decay; return the new velocity.
        """
        velocity = momentum * velocity + grad
        # The gradient of the penalty reg * ||w||^2 is 2 * reg * w.
        weights -= lr * velocity + 2 * reg * weights
        return velocity

    def train(
            self,
            TR: tuple[np.ndarray, np.ndarray],
            TS: tuple[np.ndarray, np.ndarray],
            epochs: int = 10,
            lr: float = 0.1,
            momentum: float = 0.9,
            reg: float = 0.01,
            clip: float = 1
    ) -> tuple[list, list, list, list]:
        """
        Method able to train the model.

        Performs one full-batch update per epoch and records loss and accuracy
        on both sets after each update.

        TR: Training set.
        TS: Test set.
        epochs: Number of epochs used to train the model.
        lr: Learning rate of SGD.
        momentum: Hyperparameter of momentum.
        reg: Hyperparameter of Tikhonov regularization.
        clip: Maximum norm allowed for each gradient; larger gradients are
            rescaled to this norm.

        returns:
            tuple[list, list, list, list]: Training and test mse and accuracies.
        """
        X_TR, Y_TR = TR
        X_TS, Y_TS = TS
        # The weight arrays are updated in place, so these aliases stay valid.
        weights = (self.w_in, self.w_hh, self.w_out)
        velocities = [np.zeros_like(w) for w in weights]
        tr_loss, ts_loss = [], []
        tr_metric, ts_metric = [], []
        for _ in tqdm(range(epochs)):
            grads = self.backward(X_TR, Y_TR)
            for i, (w, g) in enumerate(zip(weights, grads)):
                velocities[i] = self._momentum_step(
                    w, self._clip_by_norm(g, clip), velocities[i], lr, momentum, reg
                )

            tr_loss.append(mse(self, X_TR, Y_TR))
            ts_loss.append(mse(self, X_TS, Y_TS))
            tr_metric.append(accuracy(self, X_TR, Y_TR))
            ts_metric.append(accuracy(self, X_TS, Y_TS))
        return tr_loss, ts_loss, tr_metric, ts_metric

Model creation and training.

In [None]:
# Input and output sizes are read off the last axis of the training arrays;
# the hidden layer has 100 units.
model = VanillaRNN(
    input_size=X_TR_SET.shape[-1],
    hidden_size=100,
    output_size=Y_TR_SET.shape[-1],
)

# Train for 20 epochs and keep the per-epoch loss and accuracy histories.
tr_loss, ts_loss, tr_metric, ts_metric = model.train(
    TR=TR_SET,
    TS=TS_SET,
    epochs=20,
    lr=0.01,
    momentum=0.9,
    reg=0.0001,
    clip=0.8,
)

  0%|          | 0/20 [00:00<?, ?it/s]

Plot of training and test MSE.

In [None]:
# Loss curves for both sets; a logarithmic y axis is requested.
_base_plot(tr=tr_loss, ts=ts_loss, name='mse', yscale='log')

Plot of training and test Accuracy.

In [None]:
# Accuracy curves for both sets on the default (linear) y axis.
_base_plot(tr=tr_metric, ts=ts_metric, name='accuracy')