# Vanilla RNN for char level seq prediction

In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets
import matplotlib.pyplot as plt

In [2]:
class MyData(Dataset):
    
    def __init__(self, txt_file, root_dir, transform=None):
        #read from csv file:
        data = []
        with open(root_dir+txt_file, 'r') as file:
            data.append(str(file.read().replace('\n', '').replace('\'', '').replace('"','')))
        #Preprocess data
        data = list(data[0])
        #make a dictionary of words
        full_set = set([])
        full_set = full_set.union(set(data)) 
        full_set = dict(enumerate(full_set))
        self.full_set = {c: i for i, c in full_set.items()}
        data = [self.full_set[i] for i in data]
        self.data = np.array(data)
        self.root_dir = root_dir
        self.transform = transform
        
    #function to return length of data
    def  __len__(self):
        #we can take first 10 entries a s we will not be having last 10 features for them
        return len(self.data)-10
    
    #function to get data
    def __getitem__(self, idx):
        
        idx_label = idx+10
        #for letter i, features are last 10 letter
        idx_features = [idx_label-i for i in range (1,11)]
       
        #we assign next index as label for current sequence
        sample = {'features' : [self.data[idx_features[i]] for i in range(10)],
                     'label' : self.data[idx_label] 
                 }
        
        #apply transformation
        if(self.transform):
            sample = self.transform(sample)
        
        return sample


In [3]:
#custom ToTensor class
class ToTensor(object):
    def __call__(self, sample):
        
        feature = sample['features']
        #one hot encoding of features
        arr = np.zeros((len(feature),72))
        for i in range(len(feature)):
            arr[i,feature[i]] = 1
            
        label = sample['label']
        
        return [ torch.tensor(arr), torch.tensor(label)]

In [4]:
#Obtain data
data = MyData(txt_file='text_data.txt', 
              root_dir='./../0. Data/',
              transform = T.Compose([
                ToTensor()
                ]))

#we do not split data to train and test as it is a generative model with no fix target for any sequence
data_loader = DataLoader(data,
                        batch_size=32,
                        shuffle=True)

In [5]:
next(iter(data_loader))[1]

tensor([ 2,  7,  2,  2, 46, 13, 29, 27,  2,  8, 17,  6, 26, 55, 27, 18, 18, 21,
         2, 29, 18, 55, 17,  7, 44,  8, 18, 64, 27, 46, 55, 55])

In [6]:
#Check for GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [7]:
#Create Vanilla RNN class
class Vanilla_RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(Vanilla_RNN, self).__init__()
        #save variables to use in other functions
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        #define RNN layer
        self.rnn = nn.RNN(input_size=input_size, 
                          hidden_size=hidden_size, 
                          num_layers=num_layers, 
                          batch_first=True)
        
        #convert output to desired output dimension(readout layer)
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)
        
    def forward(self, x):
        #call RNN layer
        out, _ = self.rnn(x)
        
        #We will use only last output
        out = self.fc(out[:,-1,:].view(x.shape[0],self.hidden_size))
        return out
        


In [8]:
#define training function
def train(Model, max_epoch):
    for epoch in range(max_epoch):
        Train_Loss = []
        Val_Loss =[]
        loader = data_loader
        
        #Train on training data
        for i, sample in enumerate(loader):
            
            #set model to train mode
            Model.train()
            #set gradients to zero
            optimizer.zero_grad()
            #obtain output
            output = Model(sample[0].float().to(device).view(len(sample[0]),10,72)).to(device)
            #compute loss
            loss = loss_function(output, sample[1].to(device))
            #compute gradients
            loss.backward()
            #optimize weights
            optimizer.step()
            #record train loss
            Train_Loss.append(loss.item())
        
        
        #print losses in every epoch
        print('epoch = ', epoch,'; Train_loss  ',np.round(np.mean(Train_Loss),4))

In [9]:
#function to test model
def test(Model, seq, l):
    
    # save sequence to output
    out = [i for i in seq]
    
    #define dictionaries to convert
    char2int = data.full_set
    int2char = {i : c for c , i in char2int.items()}
    
    seq = [char2int[i] for i in seq]
    features = torch.zeros(10,72)
    #one hot encoding
    for i in range(10):
        features[i,seq[i]] = 1
    
    #we run this for l iteration, in each iteration, we get letter at position l+i
    with torch.no_grad():
        for i in range(l):
            #set model to evaluation mode
            Model.eval()
            output = Model(features.to(device).view(1,10,72))
            #calculate output by argmax
            output = torch.argmax(output, 1)
            #append  word to output
            out.append(int2char[output.item()])
            features = features[1:]
            temp = torch.zeros(1,72)
            temp[0,output] = 1
            features = torch.cat((features,temp), dim=0)
            
        print(''.join([i for i in out]))

In [10]:
#define loss function
loss_function = nn.CrossEntropyLoss()

In [11]:
#Create Model
Model = Vanilla_RNN(input_size=72,
                    hidden_size=64,
                    num_layers=3,
                    output_size=72).to(device)
#Define optimizer
optimizer = optim.Adam(Model.parameters())
#train model with validation
train(Model, max_epoch=100)

epoch =  0 ; Train_loss   3.0453
epoch =  1 ; Train_loss   2.8284
epoch =  2 ; Train_loss   2.697
epoch =  3 ; Train_loss   2.65
epoch =  4 ; Train_loss   2.6303
epoch =  5 ; Train_loss   2.6797
epoch =  6 ; Train_loss   2.6469
epoch =  7 ; Train_loss   2.5785
epoch =  8 ; Train_loss   2.5419
epoch =  9 ; Train_loss   2.5352
epoch =  10 ; Train_loss   2.5321
epoch =  11 ; Train_loss   2.5288
epoch =  12 ; Train_loss   2.4495
epoch =  13 ; Train_loss   2.3938
epoch =  14 ; Train_loss   2.4209
epoch =  15 ; Train_loss   2.3972
epoch =  16 ; Train_loss   2.336
epoch =  17 ; Train_loss   2.3233
epoch =  18 ; Train_loss   2.2894
epoch =  19 ; Train_loss   2.2605
epoch =  20 ; Train_loss   2.235
epoch =  21 ; Train_loss   2.2188
epoch =  22 ; Train_loss   2.2163
epoch =  23 ; Train_loss   2.197
epoch =  24 ; Train_loss   2.2026
epoch =  25 ; Train_loss   2.1845
epoch =  26 ; Train_loss   2.163
epoch =  27 ; Train_loss   2.1479
epoch =  28 ; Train_loss   2.164
epoch =  29 ; Train_loss   2.159

In [12]:
seq1 = ['You have s']
seq2 = ['i will be ']

In [14]:
#Let's test model now
test(Model,list(seq1[0]),100)

You have s rnt revcawedht  ehnar ehscre iec eee vws atr vteeasnaete   n  nnhvAWcost  eeneer hstrd n ete heotot


In [15]:
test(Model,list(seq2[0]),100)

i will be ntenyislvt  d  n dehAcaWtot  e hneh hAcvte  atenkehvstned   ee  d oAAs ascotn nest ew ottr hw at eae


Don't be surprized by results, we have implemented a very basic model and language modelling is a very dificult task. It requires huge architectures to generate something meaningful. So, we move to NLP where we will learn how to represent natural language

In [16]:
#Finally, let's save our model
torch.save(Model.state_dict(), './saved_models/vanilla_RNN_char.pth')

In [None]:
#To Retrieve
Modelx = Vanilla_RNN(input_size=72,
                    hidden_size=64,
                    num_layers=3,
                    output_size=72).to(device)
Modelx.load_state_dict(torch.load('./saved_models/vanilla_RNN_char.pth'))