# Importing packages

In [1]:
import re
import os
import time
import nltk
import string
import random
import numpy as np
from numpy.random import randn
from nltk.tokenize import word_tokenize
from sklearn.metrics import accuracy_score

# Defining all functions

#### Functons to load data

In [2]:
def dicList(keys):
    return {key : [] for key in keys}
def loadReview(path):
    file = open(path, "r", encoding = "utf8")
    review = file.read()
    file.close()
    return(review)
def loadData(path):
    data = dicList(dataLabel)
    for Label in dataLabel:
        pathNew = os.path.join(path, Label)
        for review in os.listdir(pathNew):
            reviewPath = os.path.join(pathNew, review)
            review = loadReview(reviewPath)
            data[Label].append(review)
    return data

#### Function to process the data

In [3]:
# Function to remove digits, punctuation and extra space
def dataPreprocessing(text):
    # Removs digits
    def removeDigits(text): 
        result = re.sub(r'\d+', '', text) 
        return result 
    # Removes punctuation 
    def removePunctuation(text): 
        translator = str.maketrans('', '', string.punctuation.replace("'", "")) 
        return text.translate(translator) 
    # Removes whitespace from text 
    def removeSpaces(text): 
        return  " ".join(text.split())
    # Main function
    if __name__=="__main__":
        text = text.lower()
        text = removeDigits(text)
        text = removePunctuation(text)
        text = removeSpaces(text)
        return text

def performProcessing(data):
    for Label in dataLabel:
        for index in range(len(data[Label])):
            data[Label][index] = dataPreprocessing(data[Label][index])
    return data

In [4]:
# Function to return unique words
def getUniquewords(data):
    dataVocab = set()
    for Label in dataLabel:
        for review in data[Label]:
            dataVocab.update(review.split())
    return dataVocab

In [5]:
# Function to check stoword
def checkStopwords(word):
    stopwords = ['i', 'me', 'my', 'myself', 'we', 'our', 'you', "you're", "you've", "you'll", "you'd", 'your', 
                 'yours', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'now', 'd', 'll', 'm', 'o', 're',
                 've', 'y', 'will', 'just', 's', 't', 'to', 'from', 'up', 'down', 'in', 'the', 'of', 'for', 
                 'if', 'or', 'a', 'an']
    if word in stopwords:
        return True
    return False

#### Functons to create Glove vocab and embedding

In [6]:
def get_glove_vocab(path):
    vocab = {}
    with open(path, 'r', encoding = "utf8") as f:
        for line in f:
            values = line.strip().split(' ')
            word = values[0]
            if word in dataVocab:
                vector = np.asarray(values[1:], "float32")
                vocab[word] = vector
    return vocab
def create_embedding(data):
    embaddedDic = dicList(dataLabelEncod.values())
    for Label in dataLabel:
        dataCount = 0
        for review in data[Label]:
            emb = np.zeros((0,300))
            wordcount = 0
            for word in review.split():
                if not checkStopwords(word):
                    emb = np.concatenate((emb, gloveVocab.get(word, np.zeros((300)))), axis = None)
                    wordcount += 1
            emb = emb.reshape(wordcount,300)
            embaddedDic[dataLabelEncod[Label]].append(emb)
    return embaddedDic

#### Functions to create the input

In [7]:
def randomShuffle(X, Y):
    perm = np.random.permutation(len(X))
    return(X[perm], Y[perm])
# Function to create the input
def getInputVec(data):
    X = []
    Y = []
    for Label in dataLabelEncod.values():
        for emb in data[Label]:
            X.append(emb)
            Y.append(Label)
    X, Y = np.array(X), np.array(Y)
    X, Y = randomShuffle(X, Y)
    return X, Y

#### RNN function

In [8]:
class RNN:
    """
     Initialization
    """
    def __init__(self, input_size,  hidden_size ,output_size, learning_rate= 0.001):
        # Initialize Weights and biases
        ## Weights corresponding to input and hidden layer
        self.Wxh = randn(hidden_size, input_size) / 1000
        ## Weights corresponding to two hidden layers
        self.Whh = randn(hidden_size, hidden_size) / 1000
        ## Weights corresponding to ouput and hidden layer
        self.Why = randn(output_size, hidden_size) / 1000
        ## Biases corresponding to hidden layer
        self.bh = np.zeros((hidden_size, 1))
        ## Biases corresponding to output layer
        self.by = np.zeros((output_size, 1))
        ## Learning rate
        self.learning_rate = learning_rate
    """"
     Activation Functions
    """
    ## Tanh activation
    def tanh(self,x):
        return np.tanh(x)
    ## Softmax Activation
    def softmax(self,x):
        return np.exp(x) / sum(np.exp(x))
    """
     Gradient descent
    """
    def update_param(self, param_grad_pair):
        x =param_grad_pair[0]
        d_x = param_grad_pair[1]
        x -= self.learning_rate * d_x
        return x
    """
     Expleding gradient handler
    """
    def not_explode_grad(self,x):
        x = np.clip(x, -1, 1)
        return x
    """
     Loss function calculator
    """
    def calculate_loss(self, probs, target):
        loss = - np.log(probs[target])
        return loss
    """
     Forward propagation
    """
    def forward_prop(self, inputs, target):
        ## Inputs and targets
        self.inputs = inputs
        self.target = int(target)
        ## Store h values in different time steps (Memory of RNN)
        self.h_values = {}
        ## Initialize the hidden node values 
        h = np.zeros((self.Whh.shape[0], 1))
        self.h_values[0] = h
        for i, x in enumerate(inputs):
            ## Previous hidden layer values is being used here
            Z = self.Wxh @ x + self.Whh @ h + self.bh
            ## Tanh activation on hidden layer
            h = self.tanh(Z)
            ## Store the current h for next time step
            self.h_values[i + 1] = h
        ## Compute output in the final time step
        y = self.Why @ h + self.by
        ## Softmax for probabilities
        probs = self.softmax(y)
        self.probs = probs
        ## Calculate Loss
        loss = self.calculate_loss(probs, target)
        return(y, h, probs, loss)
    """
     Backpropagation
    """
    def BPTT(self):
        ## Gradient of loss w.r.t y
        d_y = self.probs
        d_y[self.target] -= 1
        # Initialize the gradients of loss w.r.t the paramters
        d_Whh = np.zeros(self.Whh.shape)
        d_Wxh = np.zeros(self.Wxh.shape)
        d_bh = np.zeros(self.bh.shape)
        ## No of inputs for a input data
        N = len(self.inputs)
        # Following gradient depends only on ouput and last time step hidden values
        d_Why = d_y @ self.h_values[N].T
        d_by = d_y
        # Gradient of loss w.r.t last time step h values
        d_h = self.Why.T @ d_y
        ## Backpropagate through time.
        for t in reversed(range(N-1,-1,-1)):
            # Derivative of tanh(x) w.r.t x is (1- tanh(x))^2
            ## Need the following value in computation of gradients
            temp = ((1 - self.h_values[t + 1] ** 2) * d_h)
            # Gradient of loss w.r.t bh
            d_bh += temp
            # Gradient of loss w.r.t Whh
            d_Whh += temp @ self.h_values[t].T
            # Gradient of loss w.r.t Wxh
            d_Wxh += temp @ self.inputs[t].T
            # Gradient of loss w.r.t h
            d_h = self.Whh @ temp
        ## Get rid of exploding gradients.
        d_Wxh, d_Whh, d_Why, d_bh, d_by = list(map(self.not_explode_grad,[d_Wxh, d_Whh, d_Why, d_bh, d_by]))
        ## Update weights and biases using gradient descent.
        param_grad_pair = [(self.Whh,d_Whh),(self.Wxh,d_Wxh),(self.Why,d_Why),(self.bh,d_bh),(self.by,d_by)]
        self.Whh,self.Wxh,self.Why,self.bh,self.by = list(map(self.update_param,param_grad_pair))
    """    
     Fitting model
    """
    # Fitting with one epoch
    def fit(self, X, Y, BPTT = True):
        total_cost = 0
        correct_pred = 0
        ## No of total data points
        N = len(Y)
        for i in range(N):
            inputs = X[i]
            label = Y[i]
            inputShape = inputs.shape
            inputs = inputs.reshape(inputShape[0], inputShape[1], 1)
            out, h, probs, loss = model.forward_prop(inputs, label)
            total_cost += loss
            correct_pred += int(np.argmax(probs) == label)
            if BPTT:
                model.BPTT()
        avg_loss = total_cost / N
        accuracy = correct_pred / N
        return(avg_loss, accuracy)
    # Defining epoch wise learning
    def train(self, X, Y, epoch = 5):
        for e in range(epoch):
            loss, accuracy = model.fit(X, Y)
            print('Epoch {}'.format(e + 1))
            print('Loss: {} and Accuracy: {}'.format(loss[0], accuracy))
    """
     Prediction
    """
    def predict(self, X):
        prediction = []
        N = len(X)
        for i in range(N):
            ## Inputs
            inputs = X[i]
            inputShape = inputs.shape
            inputs = inputs.reshape(inputShape[0], inputShape[1], 1)
            ## Store h values in different time steps (Memory of RNN)
            self.h_values = {}
            ## Initialize the hidden node values 
            h = np.zeros((self.Whh.shape[0], 1))
            self.h_values[0] = h
            for i, x in enumerate(inputs):
                ## Previous hidden layer values is being used here
                Z = self.Wxh @ x + self.Whh @ h + self.bh
                ## Tanh activation on hidden layer
                h = self.tanh(Z)
                ## Store the current h for next time step
                self.h_values[i + 1] = h
            ## Compute output in the final time step
            y = self.Why @ h + self.by
            ## Softmax for probabilities
            probs = self.softmax(y)
            pred = int(np.argmax(probs))
            prediction.append(pred)
        return prediction

# Main

In [9]:
TrainPath = "D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/aclImdb/train"
TestPath = "D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/aclImdb/test"
glovePath = "D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/glove.840B.300d/glove.840B.300d.txt"

dataType = ["train", "test"]
dataLabel = ["pos", "neg"]
dataLabelEncod = {"pos" : 0, "neg" : 1}

Loading train and test data

In [10]:
trainData = loadData(TrainPath)
testData = loadData(TestPath)

Processing the data

In [11]:
trainData = performProcessing(trainData)
testData = performProcessing(testData)

Getting vocabulary

In [12]:
dataVocab = getUniquewords(trainData)
dataVocab.update(testData)

Creating glove embedding

In [14]:
gloveVocab = get_glove_vocab(glovePath)      # Loading pretrained glove model
embaddedTrain = create_embedding(trainData)  # Building embedding for train data
embaddedTest = create_embedding(testData)    # Building embedding for test data

Creating input vectors

In [15]:
trainX, trainY = getInputVec(embaddedTrain)
testX, testY = getInputVec(embaddedTest)

In [None]:
"""
  Writting embedding vectors in a file
"""
np.save("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/trainX", trainX) 
np.save("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/trainY", trainY)
np.save("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/testX", testX)
np.save("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/testY", testY)

Loading data for training

In [None]:
"""
  Loading embedding vectors from file
"""
trainX = np.load("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/trainX.npy", allow_pickle = True)
trainY = np.load("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/trainY.npy")
testX = np.load("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/trainY.npy", allow_pickle = True)
testY = np.load("D:/Academics/CMI/MSC_4th_sem/NLP/My_work/Assignment_3/Data/OutputData/testY.npy")

Training the RNN Model

In [31]:
input_size = 300
hidden_size = 25
output_size = 2
learning_rate = 0.001
model = RNN(input_size, hidden_size, output_size, learning_rate)

In [32]:
model.train(trainX, trainY, epoch = 5)

Epoch 1
Loss: 0.6933052304800627 and Accuracy: 0.49632
Epoch 2
Loss: 0.6904994498227014 and Accuracy: 0.53324
Epoch 3
Loss: 0.6801195955770556 and Accuracy: 0.561
Epoch 4
Loss: 0.6792281988009802 and Accuracy: 0.5612
Epoch 5
Loss: 0.6791135560802545 and Accuracy: 0.561


Testing with test data

In [41]:
predicted = model.predict(testX)

In [42]:
accuracy_score(predicted, testY)

0.56232

#### Exploring with other hyper parameter

In [51]:
input_size = 300
hidden_size = 5
output_size = 2
learning_rate = 0.001
model2 = RNN(input_size, hidden_size, output_size, learning_rate)

In [52]:
model2.train(trainX[:5000], trainY[:5000], epoch = 5)

Epoch 1
Loss: 0.6772098003265945 and Accuracy: 0.573
Epoch 2
Loss: 0.6771208034356248 and Accuracy: 0.5736
Epoch 3
Loss: 0.6770216645948988 and Accuracy: 0.5732
Epoch 4
Loss: 0.6769168445064423 and Accuracy: 0.5736
Epoch 5
Loss: 0.6768107202754022 and Accuracy: 0.574


In [53]:
predicted = model2.predict(testX)
accuracy_score(predicted, testY)

0.50136