In [18]:
from dataset import train_data, test_data
import numpy as np 

In [19]:
[train_data.keys()]

[dict_keys(['good', 'bad', 'happy', 'sad', 'not good', 'not bad', 'not happy', 'not sad', 'very good', 'very bad', 'very happy', 'very sad', 'i am happy', 'this is good', 'i am bad', 'this is bad', 'i am sad', 'this is sad', 'i am not happy', 'this is not good', 'i am not bad', 'this is not sad', 'i am very happy', 'this is very good', 'i am very bad', 'this is very sad', 'this is very happy', 'i am good not bad', 'this is good not bad', 'i am bad not good', 'i am good and happy', 'this is not good and not happy', 'i am not at all good', 'i am not at all bad', 'i am not at all happy', 'this is not at all sad', 'this is not at all happy', 'i am good right now', 'i am bad right now', 'this is bad right now', 'i am sad right now', 'i was good earlier', 'i was happy earlier', 'i was bad earlier', 'i was sad earlier', 'i am very bad right now', 'this is very good right now', 'this is very sad right now', 'this was bad earlier', 'this was very good earlier', 'this was very bad earlier', 'thi

In [20]:
vocab = list(set([w for text in train_data.keys() for w in text.split(" ") ]))
vocab_size = len(vocab)
print(f'{vocab_size}unique words found')

30unique words found


In [21]:

w2idx = {w:i for i,w in enumerate(vocab)} # lookup table
idx2w = {i:w for i,w in enumerate(vocab)} # loopup table
w2idx["earlier"], idx2w[1]

(16, 'not')

In [22]:

def word2vec(input_text:str):
    """
    - simply one-hot-encoding
    """
    vectors = []
    for word in input_text.split(" "):
        vector = np.zeros((vocab_size,1))
        vector[w2idx[word]] = 1
        vectors.append(vector)
    return vectors

vec = word2vec("very good very delighted")
np.array(vec).shape

(4, 30, 1)

### in vanilla RNN , we have 3 weights(xh,hh,hy) and 2 biases (bh,by) 

### lets build the forward phase

In [23]:
from typing import List
class VanillaRNN:
    def __init__(self,input_size,output_size,hidden_size=64):
        # weights 
        self.Whh = np.random.randn(hidden_size,hidden_size)/1000
        self.Wxh = np.random.randn(hidden_size,input_size)/1000
        self.Why = np.random.randn(output_size,hidden_size)/1000

        # biases
        self.bh = np.zeros((hidden_size,1))
        self.by = np.zeros((output_size,1))

    def forward(self,inputs:List[List]):
        """
        performs forward pass for the given list of input vectors
        returns the final output and the hidden state
        designed for many to one problem task, so produced single, final output.
        """
        # initialize the hidden state chain : first hidden state
        # since there is no previous h
        h = np.zeros((self.Whh.shape[0],1))

        for i,x in enumerate(inputs):
            h = np.tanh(self.Wxh@x + self.Whh@h + self.bh)
        
        y = self.Why@h + self.by

        return y,h
    


def softmax(xs):
  """to convert the output logits to prob."""
  return np.exp(xs) / sum(np.exp(xs))

rnn = VanillaRNN(vocab_size, 2) # we want to generate 1 and -1 logits.

inputs = word2vec('i am very good and delighted')
out, h = rnn.forward(inputs)
probs = softmax(out)
print(probs) # [[0.50000095], [0.49999905]]



[[0.49999996]
 [0.50000004]]


### Backward Pass for training
- since our task is classification, lets use cross-entropy loss
    which is -ln(p)

- y : raw output from VanillaRNN
- p : final probs = softmax(y)
- c : correct class
- L : cross-entropy loss
- Wxh,Whh,Why : associated weights
- bh,by : associated biases

In [24]:
# to proceed, we need to have access to last_inputs and last hidden states

# GRADIENT of the Loss: dL/dy
"""
L = -ln(p_c) = -ln(softmax(y_c))
dL/dy : 
{
    pi   if  i!=c
    pi-1 if  i==c
}
For example, if we have p=[0.2,0.2,0.6] and the correct class is c=0, 
then we'd get ∂L∂y = [ -0.8 , -0.8 , 0.6 ]
"""


class VanillaRNN:
    def __init__(self,input_size,output_size,hidden_size=64):
        # weights 
        self.Whh = np.random.randn(hidden_size,hidden_size)/1000
        self.Wxh = np.random.randn(hidden_size,input_size)/1000
        self.Why = np.random.randn(output_size,hidden_size)/1000

        # biases
        self.bh = np.zeros((hidden_size,1))
        self.by = np.zeros((output_size,1))

    def forward(self,inputs:List[List]):
        """
        performs forward pass for the given list of input vectors
        returns the final output and the hidden state
        """
        # initialize the hidden state chain : first hidden state
        # since there is no previous h
        h = np.zeros((self.Whh.shape[0],1))

        self.last_inputs = inputs
        self.last_hs = {0:h}

        # hidden state chain
        for i,x in enumerate(inputs):
            h = np.tanh(self.Wxh@x + self.Whh@h + self.bh)
            self.last_hs.update({i+1:h})

        y = self.Why@h + self.by
        #print(self.last_hs)
        return y,h
    

    def backpropagate(self,d_y, learning_rate = 2e-2):
        """
        performs backward pass
        d_y : calculated loss
        """
        
        n = len(self.last_inputs)

        # Calculate dL/dWhy and dL/dby.
        d_Why = d_y @ self.last_hs[n].T
        d_by = d_y

        # Initialize dL/dWhh, dL/dWxh, and dL/dbh to zero.
        d_Whh = np.zeros(self.Whh.shape) # or use zeros_like
        d_Wxh = np.zeros(self.Wxh.shape)
        d_bh = np.zeros(self.bh.shape)

        # Calculate dL/dh for the last h.
        d_h = self.Why.T @ d_y

        # Backpropagate through time.
        for t in reversed(range(n)):
            # An intermediate value: dL/dh * (1 - h^2)
            temp = ((1 - self.last_hs[t + 1] ** 2) * d_h) # related to tanh

            #---GRADIENT ACCUMULATION---#
            
            # dL/db = dL/dh * (1 - h^2)
            d_bh += temp

            # dL/dWhh = dL/dh * (1 - h^2) * h_{t-1}
            d_Whh += temp @ self.last_hs[t].T

            # dL/dWxh = dL/dh * (1 - h^2) * x
            d_Wxh += temp @ self.last_inputs[t].T

            # Next dL/dh = dL/dh * (1 - h^2) * Whh
            d_h = self.Whh @ temp

        # Clip to prevent exploding gradients.
        for d in [d_Wxh, d_Whh, d_Why, d_bh, d_by]:
            np.clip(d, -1, 1, out=d)

        # Update weights and biases using gradient descent.
        self.Whh -= learning_rate * d_Whh
        self.Wxh -= learning_rate * d_Wxh
        self.Why -= learning_rate * d_Why
        self.bh -= learning_rate * d_bh
        self.by -= learning_rate * d_by

In [25]:
model = VanillaRNN(vocab_size, 2,hidden_size=64)

for x,y in train_data.items():
    inputs = word2vec(x)
    target = int(y)

    # forward pass 
    out, _ = model.forward(inputs)
    probs = softmax(out)

    # calculate dL/dy
    probs[target] -=1 
    dL_dy = probs 
    # backward pass 
    model.backpropagate(dL_dy)

resource: https://victorzhou.com/blog/intro-to-rnns/

In [33]:
model = VanillaRNN(vocab_size, 2,hidden_size=64)
import random
def processData(data, backprop=True):

    '''
    Returns the RNN's loss and accuracy for the given data.
    - data is a dictionary mapping text to True or False.
    - backprop determines if the backward phase should be run.
    '''
    items = list(data.items())
    random.shuffle(items)

    loss = 0
    num_correct = 0

    for x, y in items:
        inputs = word2vec(x)
        target = int(y)

        # Forward
        out, _ = model.forward(inputs)
        probs = softmax(out)
        
        # Calculate loss / accuracy
        loss -= np.log(probs[target])
        num_correct += int(np.argmax(probs) == target)

        if backprop:
            # Build dL/dy
            d_L_d_y = probs
            d_L_d_y[target] -= 1

            # Backward
            model.backpropagate(d_L_d_y)

    return loss / len(data), num_correct / len(data)

In [27]:
model.Whh[0].shape

(64,)

In [34]:
# Training loop
for epoch in range(1001):
  train_loss, train_acc = processData(train_data)

  if epoch % 100 == 0:
    print('--- Epoch %d' % (epoch + 1))
    print('Train:\tLoss %.3f | Accuracy: %.3f' % (train_loss, train_acc))

    test_loss, test_acc = processData(test_data, backprop=False)
    print('Test:\tLoss %.3f | Accuracy: %.3f' % (test_loss, test_acc))

--- Epoch 1
Train:	Loss 0.695 | Accuracy: 0.510
Test:	Loss 0.697 | Accuracy: 0.457


  print('Train:\tLoss %.3f | Accuracy: %.3f' % (train_loss, train_acc))
  print('Test:\tLoss %.3f | Accuracy: %.3f' % (test_loss, test_acc))


--- Epoch 101
Train:	Loss 0.660 | Accuracy: 0.603
Test:	Loss 0.701 | Accuracy: 0.471
--- Epoch 201
Train:	Loss 0.090 | Accuracy: 0.974
Test:	Loss 0.336 | Accuracy: 0.943
--- Epoch 301
Train:	Loss 0.004 | Accuracy: 1.000
Test:	Loss 0.303 | Accuracy: 0.929
--- Epoch 401
Train:	Loss 0.002 | Accuracy: 1.000
Test:	Loss 0.310 | Accuracy: 0.943
--- Epoch 501
Train:	Loss 0.001 | Accuracy: 1.000
Test:	Loss 0.325 | Accuracy: 0.957
--- Epoch 601
Train:	Loss 0.001 | Accuracy: 1.000
Test:	Loss 0.320 | Accuracy: 0.971
--- Epoch 701
Train:	Loss 0.001 | Accuracy: 1.000
Test:	Loss 0.349 | Accuracy: 0.971
--- Epoch 801
Train:	Loss 0.001 | Accuracy: 1.000
Test:	Loss 0.346 | Accuracy: 0.957
--- Epoch 901
Train:	Loss 0.000 | Accuracy: 1.000
Test:	Loss 0.403 | Accuracy: 0.943
--- Epoch 1001
Train:	Loss 0.000 | Accuracy: 1.000
Test:	Loss 0.362 | Accuracy: 0.957


In [35]:
def predict(input_text):
    
    inputs = word2vec(input_text)
    
    # Forward
    out, _ = model.forward(inputs)
    probs = softmax(out)
    print(probs)
    return bool(np.argmax(probs))

In [38]:
predict('this is not at all joyful'), test_data['this is not at all joyful']

[[0.99476383]
 [0.00523617]]


(False, False)

In [54]:
cls = predict('this is joyful at all')
print("NEGATIVE") if cls==0 else "POSITIVE"

[[0.00856183]
 [0.99143817]]


'POSITIVE'

In [61]:
predict('this is not joyful at all'), predict('this is not bad at all')

[[9.99999534e-01]
 [4.65783008e-07]]
[[0.01279713]
 [0.98720287]]


(False, True)