In [13]:
import pandas as pd

train_data = pd.read_csv("data/train.csv")
test_data = pd.read_csv("data/test.csv")

In [14]:
# Create the vocabulary.
vocab = list(set([w for text in train_data["text"] for w in text.split(" ")]))
vocab_size = len(vocab)
print("Vocabulary size:", vocab_size)

Vocabulary size: 18


In [15]:
# Assign indices to each word.
word_to_idx = {w: i for i, w in enumerate(vocab)}
word_to_idx

{'and': 0,
 'good': 1,
 'this': 2,
 'happy': 3,
 'bad': 4,
 'or': 5,
 'at': 6,
 'is': 7,
 'right': 8,
 'all': 9,
 'am': 10,
 'was': 11,
 'sad': 12,
 'now': 13,
 'very': 14,
 'not': 15,
 'earlier': 16,
 'i': 17}

In [16]:
import numpy as np


def createInputs(text):
    """
    Returns an array of one-hot vectors representing the words
    in the input text string.
    - text is a string
    - Each one-hot vector has shape (vocab_size, 1)
    """
    inputs = []
    for w in text.split(" "):
        v = np.zeros((vocab_size, 1))
        v[word_to_idx[w]] = 1
        inputs.append(v)
    return inputs

In [17]:
import numpy as np
from numpy.random import randn
from torch import tensor
from torch.nn import Sigmoid

# set random state to 42 for reproducibility
np.random.seed(42)


# default vectors is column vectors
# RNN Training: Stochastic Gradient Descent
# many-to-one architecture
class RNN:
    def __init__(self, m, d, p, lr):
        # m is the number of hidden units
        self.m = m
        # d is the number of inputs
        self.d = d
        # p is the number of classes
        self.p = p
        # lr is the learning rate
        self.lr = lr
        # initialize weights
        self.wi = randn(self.d, self.m) / 1000
        self.wh = randn(self.m, self.m) / 1000
        self.wo = randn(self.m, self.p) / 1000
        # initialize bias vectors
        self.bo = np.zeros((self.p, 1))
        self.bh = np.zeros((self.m, 1))
        # activation function
        self.sigmoid = Sigmoid()

    def forward_propagation(self, x, y):
        # feed-forward phase
        # create a 3d matrix to store the hidden states
        self.h = np.zeros((self.tau + 1, self.m, 1))
        for t in range(1, self.tau + 1):
            # activation function of the hidden layer is tanh
            self.h[t] = np.tanh(
                (self.wi.T @ x[t - 1]) + (self.wh.T @ self.h[t - 1]) + self.bh
            )
        self.o = tensor((self.wo.T @ self.h[self.tau]) + self.bo)
        self.o = self.sigmoid(self.o)
        self.o = np.array(self.o)
        self.n += int(np.argmax(self.o) == np.argmax(y))
        self.loss += -np.sum(np.log(self.o) * y)  # logarithm in base e

    def backpropagation(self):
        # backpropagation phase (loss function is cross-entropy)
        dL_do = self.o - self.y  # y is one-hot vector

        # calculate gradient of wo
        dL_dwo = self.h[self.tau] @ dL_do.T
        # calculate gradient of bo
        dL_dbo = dL_do

        # initialize gradients of weights to zero
        dL_dwh = np.zeros(self.wh.shape)
        dL_dwi = np.zeros(self.wi.shape)

        # initialize gradients of bias vectors to zero
        dL_dbh = np.zeros(self.bh.shape)

        # initialize gradients of hidden states to zero
        dL_dh = np.zeros((self.tau + 1, self.m, 1))

        # net gradients at h(tau)
        dL_dh[self.tau] = (1 - self.h[self.tau] ** 2) * (self.wo @ dL_do)

        # net gradients at h(t)
        for t in range(self.tau - 1, 0, -1):
            dL_dh[t] = (1 - self.h[t] ** 2) * (self.wh @ dL_dh[t + 1])

        for t in range(1, self.tau + 1):
            # gradients of weights
            dL_dwh += self.h[t - 1] @ dL_dh[t].T
            dL_dwi += self.x[t - 1] @ dL_dh[t].T

            # gradients of bias vectors
            dL_dbh += dL_dh[t]

        # update weights using gradient descent
        self.wo -= self.lr * dL_dwo
        self.wh -= self.lr * dL_dwh
        self.wi -= self.lr * dL_dwi

        # update bias vectors using gradient descent
        self.bo -= self.lr * dL_dbo
        self.bh -= self.lr * dL_dbh

    def train(self, data, test=False):
        self.n = 0  # number of correct predictions
        self.loss = 0  # total loss
        # data is a list of sequences
        for x, y in data:
            self.x = createInputs(x)
            self.tau = len(self.x)  # length of training sequence
            self.y = y
            # convert y to one-hot vector
            self.y = np.array([[1], [0]]) if y else np.array([[0], [1]])
            # feed-forward phase
            self.forward_propagation(self.x, self.y)
            if not test:
                # backpropagation phase
                self.backpropagation()
        return self.n / len(data), self.loss / len(data)

In [18]:
import random

# random seed for reproducibility
random.seed(42)

# m is the number of hidden units
m = 5
# d is the number of inputs
d = vocab_size
# p is the number of classes
p = 2
# learning rate
lr = 2e-2

# create the RNN model
rnn = RNN(m, d, p, lr)

# maxiter is the number of epochs
maxiter = 500

train_data = [(x, y) for x, y in zip(train_data["text"], train_data["label"])]
test_data = [(x, y) for x, y in zip(test_data["text"], test_data["label"])]

for i in range(maxiter):
    # shuffle the training data
    random.shuffle(train_data)
    acc, loss = rnn.train(train_data)
    if i % 100 == 99:
        print("--- Epoch %d" % (i + 1))
        print("Train:\tLoss %.3f | Accuracy: %.3f" % (loss, acc))

        # test the model with forward propagation
        acc, loss = rnn.train(test_data, test=True)
        print("Test:\tLoss %.3f | Accuracy: %.3f" % (loss, acc))

--- Epoch 100
Train:	Loss 0.690 | Accuracy: 0.552
Test:	Loss 0.698 | Accuracy: 0.500
--- Epoch 200
Train:	Loss 0.646 | Accuracy: 0.707
Test:	Loss 0.634 | Accuracy: 0.800
--- Epoch 300
Train:	Loss 0.657 | Accuracy: 0.621
Test:	Loss 0.699 | Accuracy: 0.500
--- Epoch 400
Train:	Loss 0.021 | Accuracy: 1.000
Test:	Loss 0.027 | Accuracy: 1.000
--- Epoch 500
Train:	Loss 0.009 | Accuracy: 1.000
Test:	Loss 0.011 | Accuracy: 1.000
