In [None]:
import numpy as np
import torch.optim as optim
import os
import matplotlib.pyplot as plt
%matplotlib inline
import random

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise run everything on the CPU.
if torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')
print('Device available now:', device)

In [None]:
# Module-level sequence length drawn uniformly from [100, 110].
seqlen = random.randint(100, 110)
# 8x8 integer lookup table: row k is the one-hot vector for symbol index k.
rep = F.one_hot(torch.arange(8), num_classes=8)

In [None]:
# presumably the maximum sequence length (matches the upper bound used when
# sampling lengths); not read anywhere else in the visible code.
input_no = 110
# One-hot width of the input and output alphabets.
inputsize = outputsize = 8
# Hidden widths of the three stacked recurrent stages.
hiddensize1, hiddensize2, hiddensize3 = 2, 4, 8
numlayers = 1
learning_rate = 0.1
batchsize = 1
# Number of training sequences (num) and of test sequences per test set (nts).
num = nts = 3000

In [None]:
# Input alphabet: 'E' opens a sequence, 'B' closes it, 'a'-'d' are filler symbols
# and 'X'/'Y' are the marker symbols whose order determines the class.
inputchar = 'EBabcdXY'
# Output alphabet: one class symbol per ordered triple of markers (see labeldict).
outputchar = 'QRSUVABC'
# Map every symbol to its position in its alphabet, i.e. its one-hot index.
# Derived from the alphabets themselves rather than a hard-coded range(8).
inputdict = {symbol: index for index, symbol in enumerate(inputchar)}
outputdict = {symbol: index for index, symbol in enumerate(outputchar)}
# Class symbol for each possible ordering of the three X/Y markers in a sequence.
labeldict = {"XXX":"Q","XXY":"R","XYX":"S","XYY":"U","YXX":"V","YXY":"A","YYX":"B","YYY":"C"}

In [None]:
def generate_inputs(num):
  """Generate `num` random symbol sequences and their one-hot encodings.

  Each sequence has a length drawn from [100, 110], starts with 'E', ends with
  'B', is filled with random symbols from 'a'-'d', and has one 'X' or 'Y' marker
  written at a random position inside each of the windows [10, 20], [33, 43]
  and [66, 76].

  Returns:
    seqs: list of `num` float tensors of shape (length, 8), one one-hot row per symbol.
    strings: list of `num` lists of single-character symbols.
    lens: float tensor of shape (num,) holding each sequence's length.
  """
  filler_symbols = ('a', 'b', 'c', 'd')
  marker_symbols = ('X', 'Y')
  marker_windows = ((10, 20), (33, 43), (66, 76))

  encoded_seqs, symbol_seqs = [], []
  lens = torch.zeros(num)
  for seq_idx in range(num):
    length = random.randint(100, 110)
    lens[seq_idx] = length

    # Random draws happen in a fixed order: marker positions, then the filler
    # symbols, then the marker symbols themselves.
    marker_positions = [random.randint(low, high) for low, high in marker_windows]
    symbols = ['E']
    symbols.extend(random.choice(filler_symbols) for _ in range(length - 2))
    symbols.append('B')
    for position in marker_positions:
      symbols[position] = random.choice(marker_symbols)

    # Look up the one-hot row of every symbol and store it as floats.
    encoded = torch.zeros((length, 8))
    encoded.copy_(rep[[inputdict[symbol] for symbol in symbols]])

    encoded_seqs.append(encoded)
    symbol_seqs.append(symbols)

  return encoded_seqs, symbol_seqs, lens

In [None]:
def generate_output(inputstring):
  """Derive the class label of every sequence from the order of its X/Y markers.

  Args:
    inputstring: list of symbol sequences (each a list of single characters).

  Returns:
    op: list with one class symbol per sequence, looked up in `labeldict` using
      the concatenation of the sequence's 'X'/'Y' symbols in order of appearance.
    seqs: float tensor of shape (len(inputstring), outputsize) whose row i is
      the one-hot encoding of op[i].

  Raises:
    KeyError: if a sequence's X/Y pattern is not a key of `labeldict`.
  """
  seqs = torch.zeros((len(inputstring), outputsize))
  op = []
  for row, symbols in enumerate(inputstring):
    marker_pattern = "".join(symbol for symbol in symbols if symbol in ('X', 'Y'))
    label = labeldict[marker_pattern]
    op.append(label)
    seqs[row] = rep[outputdict[label]]
  return op, seqs

In [None]:
# Build the training set: encoded sequences, their symbol strings and lengths,
# then the class symbol and one-hot target for every sequence.
inputs,inputstring,lens = generate_inputs(num)
outputstring,outputs = generate_output(inputstring)
# Build an initial test set the same way.
testinputs,teststrings,testlens = generate_inputs(nts)
testoutputstrings,testoutputs = generate_output(teststrings)

In [None]:
class RNN(nn.Module):
  """Three chained nn.RNN stages followed by a linear classifier.

  The stages map input_size -> hiddensize1 -> hiddensize2 -> hiddensize3, and
  the linear layer maps the last time step of the final stage to output_size
  scores.

  Args:
    input_size: width of each input time step.
    hiddensize1, hiddensize2, hiddensize3: hidden widths of the three stages.
    num_layers: number of stacked layers inside every stage.
    output_size: number of output classes.
  """

  def __init__(self,input_size,hiddensize1,hiddensize2,hiddensize3,num_layers,output_size):
    super(RNN, self).__init__()
    self.num_layers = num_layers
    self.hiddensize1 = hiddensize1
    self.hiddensize2 = hiddensize2
    self.hiddensize3 = hiddensize3
    # Sizes come from the constructor arguments, not from notebook globals.
    self.rnn1 = nn.RNN(input_size, hiddensize1, num_layers, batch_first=True)
    self.rnn2 = nn.RNN(hiddensize1, hiddensize2, num_layers, batch_first=True)
    self.rnn3 = nn.RNN(hiddensize2, hiddensize3, num_layers, batch_first=True)
    self.fc = nn.Linear(hiddensize3, output_size)
    # Initialise the first-layer input weights and biases exactly once. Doing
    # this inside forward() would overwrite the learned values on every call.
    for stage, bias in ((self.rnn1, -2.0), (self.rnn2, -4.0), (self.rnn3, -6.0)):
      nn.init.uniform_(stage.weight_ih_l0, -0.1, 0.1)
      nn.init.constant_(stage.bias_ih_l0, bias)

  def _initial_state(self, reference, hidden_size):
    """Return a random initial hidden state in [10, 20) on `reference`'s device."""
    state = 10 * torch.rand(self.num_layers, reference.size(0), hidden_size) + 10
    return state.to(reference.device)

  def forward(self, x):
    """Classify a batch of sequences.

    Args:
      x: float tensor of shape (batch, seq_len, input_size).

    Returns:
      Tensor of shape (batch, output_size) with unnormalised class scores.
    """
    # NOTE(review): every stage starts from a fresh random hidden state, so
    # repeated calls on the same input are not deterministic.
    out1, _ = self.rnn1(x, self._initial_state(x, self.hiddensize1))
    out2, _ = self.rnn2(out1, self._initial_state(out1, self.hiddensize2))
    out3, _ = self.rnn3(out2, self._initial_state(out2, self.hiddensize3))
    # Only the last time step feeds the classifier.
    last_step = out3[:, -1, :]
    return self.fc(last_step)

In [None]:
class LSTM(nn.Module):
  """Three chained nn.LSTM stages followed by a linear classifier.

  The stages map input_size -> hiddensize1 -> hiddensize2 -> hiddensize3, and
  the linear layer maps the last time step of the final stage to output_size
  scores.

  Args:
    input_size: width of each input time step.
    hiddensize1, hiddensize2, hiddensize3: hidden widths of the three stages.
    num_layers: number of stacked layers inside every stage.
    output_size: number of output classes.
  """

  def __init__(self,input_size,hiddensize1,hiddensize2,hiddensize3,num_layers,output_size):
    super(LSTM, self).__init__()
    self.num_layers = num_layers
    self.hiddensize1 = hiddensize1
    self.hiddensize2 = hiddensize2
    self.hiddensize3 = hiddensize3
    # Sizes come from the constructor arguments, not from notebook globals.
    self.lstm1 = nn.LSTM(input_size, hiddensize1, num_layers, batch_first=True)
    self.lstm2 = nn.LSTM(hiddensize1, hiddensize2, num_layers, batch_first=True)
    self.lstm3 = nn.LSTM(hiddensize2, hiddensize3, num_layers, batch_first=True)
    self.fc = nn.Linear(hiddensize3, output_size)
    # Initialise the first-layer input weights and biases exactly once. Doing
    # this inside forward() would overwrite the learned values on every call.
    for stage, bias in ((self.lstm1, -2.0), (self.lstm2, -4.0), (self.lstm3, -6.0)):
      nn.init.uniform_(stage.weight_ih_l0, -0.1, 0.1)
      nn.init.constant_(stage.bias_ih_l0, bias)

  def _initial_state(self, reference, hidden_size):
    """Return random (hidden, cell) states in [10, 20) on `reference`'s device."""
    shape = (self.num_layers, reference.size(0), hidden_size)
    hidden = (10 * torch.rand(*shape) + 10).to(reference.device)
    cell = (10 * torch.rand(*shape) + 10).to(reference.device)
    return hidden, cell

  def forward(self, x):
    """Classify a batch of sequences.

    Args:
      x: float tensor of shape (batch, seq_len, input_size).

    Returns:
      Tensor of shape (batch, output_size) with unnormalised class scores.
    """
    # NOTE(review): every stage starts from fresh random hidden/cell states, so
    # repeated calls on the same input are not deterministic.
    out1, _ = self.lstm1(x, self._initial_state(x, self.hiddensize1))
    out2, _ = self.lstm2(out1, self._initial_state(out1, self.hiddensize2))
    out3, _ = self.lstm3(out2, self._initial_state(out2, self.hiddensize3))
    # Only the last time step feeds the classifier.
    last_step = out3[:, -1, :]
    return self.fc(last_step)

In [None]:
class Attention_RNN(nn.Module):
  """Three chained nn.RNN stages with dot-product attention over the last stage.

  The stages map input_size -> hiddensize1 -> hiddensize2 -> hiddensize3. The
  final hidden state of the last stage scores every time step of that stage's
  output; the softmax-weighted sum of the outputs feeds a linear classifier.

  Args:
    input_size: width of each input time step.
    hiddensize1, hiddensize2, hiddensize3: hidden widths of the three stages.
    num_layers: number of stacked layers inside every stage.
    output_size: number of output classes.
  """

  def __init__(self,input_size,hiddensize1,hiddensize2,hiddensize3,num_layers,output_size):
    super(Attention_RNN, self).__init__()
    self.num_layers = num_layers
    self.hiddensize1 = hiddensize1
    self.hiddensize2 = hiddensize2
    self.hiddensize3 = hiddensize3
    # Sizes come from the constructor arguments, not from notebook globals.
    self.rnn1 = nn.RNN(input_size, hiddensize1, num_layers, batch_first=True)
    self.rnn2 = nn.RNN(hiddensize1, hiddensize2, num_layers, batch_first=True)
    self.rnn3 = nn.RNN(hiddensize2, hiddensize3, num_layers, batch_first=True)
    self.fc = nn.Linear(hiddensize3, output_size)
    # Initialise the first-layer input weights and biases exactly once. Doing
    # this inside forward() would overwrite the learned values on every call.
    for stage, bias in ((self.rnn1, -2.0), (self.rnn2, -4.0), (self.rnn3, -6.0)):
      nn.init.uniform_(stage.weight_ih_l0, -0.1, 0.1)
      nn.init.constant_(stage.bias_ih_l0, bias)

  def _initial_state(self, reference, hidden_size):
    """Return a random initial hidden state in [10, 20) on `reference`'s device."""
    state = 10 * torch.rand(self.num_layers, reference.size(0), hidden_size) + 10
    return state.to(reference.device)

  def forward(self, x):
    """Classify a batch of sequences.

    Args:
      x: float tensor of shape (batch, seq_len, input_size).

    Returns:
      Tensor of shape (batch, output_size) with unnormalised class scores.
    """
    # NOTE(review): every stage starts from a fresh random hidden state, so
    # repeated calls on the same input are not deterministic.
    out1, _ = self.rnn1(x, self._initial_state(x, self.hiddensize1))
    out2, _ = self.rnn2(out1, self._initial_state(out1, self.hiddensize2))
    out3, final_hidden = self.rnn3(out2, self._initial_state(out2, self.hiddensize3))
    # Use the top layer's final hidden state as the query: (batch, hiddensize3).
    # Indexing with [-1] also works when num_layers > 1, unlike squeeze(0).
    query = final_hidden[-1]
    # (batch, seq, hidden) x (batch, hidden, 1) -> one score per time step.
    weights_attention = torch.bmm(out3, query.unsqueeze(2)).squeeze(2)
    soft_weights = F.softmax(weights_attention, 1)
    # Attention-weighted sum of the per-step outputs: (batch, hiddensize3).
    context = torch.bmm(out3.transpose(1, 2), soft_weights.unsqueeze(2)).squeeze(2)
    return self.fc(context)

In [None]:
# Instantiate the three models with identical sizes and move each to `device`.
rnn_model = RNN(inputsize, hiddensize1,hiddensize2,hiddensize3, numlayers, outputsize).to(device)
lstm_model = LSTM(inputsize, hiddensize1,hiddensize2,hiddensize3, numlayers, outputsize).to(device)
atten_model = Attention_RNN(inputsize, hiddensize1,hiddensize2,hiddensize3, numlayers, outputsize).to(device)

In [None]:
def train_model(num,inputs,outputs,model):
  """Train `model` online, one sequence at a time, with Adam and cross-entropy.

  Every sequence receives 15 consecutive optimisation steps. The loss after the
  last step is recorded and printed. Training stops early once at least 2000
  consecutive sequences finished with a loss below 0.3 and the current loss is
  below 0.1.

  Args:
    num: maximum number of sequences to train on.
    inputs: indexable collection of encoded sequences; each element must be
      reshapeable to (batchsize, seq_len, inputsize).
    outputs: indexable collection of one-hot targets; each element must be
      reshapeable to (batchsize, outputsize).
    model: the nn.Module to optimise (updated in place).

  Returns:
    List with the final loss of every sequence that was trained on.
  """
  steps_per_sequence = 15
  consecutive_low = 0  # sequences in a row whose final loss was below 0.3
  total = 0
  losses = []
  loss_fn = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
  for s in range(num):
    total += 1
    # Build the batch once per sequence. The sequence length is inferred from
    # the tensor itself, and the caller's inputs/outputs are left untouched.
    sequence = torch.reshape(inputs[s], (batchsize, -1, inputsize)).to(device, dtype=torch.float)
    label = torch.reshape(outputs[s], (batchsize, outputsize)).to(device, dtype=torch.float)
    for _ in range(steps_per_sequence):
      model_output = model(sequence)
      # The target is a one-hot (class-probability) vector, not a class index.
      loss = loss_fn(model_output, label)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
    final_loss = loss.item()
    losses.append(final_loss)
    print(final_loss)
    if final_loss < 0.3:
      consecutive_low += 1
    else:
      consecutive_low = 0
    if consecutive_low >= 2000 and final_loss < 0.1:
      break
  print("No of input sequences to attain stopping condition is",total)
  return losses

In [None]:
# Train the plain RNN on the training set and keep its per-sequence losses.
rnn_losses = train_model(num,inputs,outputs,rnn_model)

In [None]:
# rnn_losses holds one value per training sequence (the loss after the last
# update on that sequence), so the x-axis counts sequences, not epochs.
plt.plot(rnn_losses)
plt.xlabel('training sequence')
plt.ylabel('RNN Loss')
plt.title('RNN')
plt.show()

In [None]:
# Train the LSTM on the same training set and keep its per-sequence losses.
lstm_losses = train_model(num,inputs,outputs,lstm_model)

In [None]:
# lstm_losses holds one value per training sequence (the loss after the last
# update on that sequence), so the x-axis counts sequences, not epochs.
plt.plot(lstm_losses)
plt.xlabel('training sequence')
plt.ylabel('LSTM Loss')
plt.title('LSTM')
plt.show()

In [None]:
# Train the attention RNN on the same training set and keep its per-sequence losses.
attn_losses = train_model(num,inputs,outputs,atten_model)

In [None]:
# attn_losses holds one value per training sequence (the loss after the last
# update on that sequence), so the x-axis counts sequences, not epochs.
plt.plot(attn_losses)
plt.xlabel('training sequence')
plt.ylabel('Attention Loss')
plt.title('RNN Attention')
plt.show()

In [None]:
def test_model(nts,testinputs,testoutputs,model):
  """Count how many of the first `nts` test sequences `model` misclassifies.

  Args:
    nts: number of test sequences to evaluate.
    testinputs: indexable collection of encoded sequences; each element must be
      reshapeable to (batchsize, seq_len, inputsize).
    testoutputs: indexable collection of one-hot targets; each element must be
      reshapeable to (batchsize, outputsize).
    model: the trained nn.Module to evaluate.

  Returns:
    Number of sequences whose predicted class differs from the target class.
  """
  expected = torch.zeros(nts)
  predicted = torch.zeros(nts)
  # No gradients are needed for evaluation; skipping them saves time and memory.
  with torch.no_grad():
    for testseq in range(nts):
      # The sequence length is inferred from the tensor itself, and the
      # caller's testinputs/testoutputs are left untouched.
      sequence = torch.reshape(testinputs[testseq], (batchsize, -1, inputsize)).to(device, dtype=torch.float)
      testlabel = torch.reshape(testoutputs[testseq], (batchsize, outputsize)).to(device, dtype=torch.float)
      testmodel_output = model(sequence)
      expected[testseq] = testlabel.argmax(dim=1)
      predicted[testseq] = testmodel_output.argmax(dim=1)
  num_correct = (expected == predicted).sum().item()
  return (nts-num_correct)
  


In [None]:
# Evaluate the RNN on 10 freshly generated test sets of nts sequences each;
# wp collects the number of wrong predictions for every test set.
wp=[]
for i in range(10):
  testinputs,teststrings,testlens = generate_inputs(nts)
  testoutputstrings,testoutputs = generate_output(teststrings)
  wp.append(test_model(nts,testinputs,testoutputs,rnn_model))

In [None]:
# Report the mean of the wrong-prediction counts currently stored in wp.
from statistics import mean
print("The average number of wrong predictions on the test set in RNN model are",mean(wp))

In [None]:
# Evaluate the LSTM on 10 freshly generated test sets of nts sequences each;
# wp collects the number of wrong predictions for every test set.
wp=[]
for i in range(10):
  testinputs,teststrings,testlens = generate_inputs(nts)
  testoutputstrings,testoutputs = generate_output(teststrings)
  wp.append(test_model(nts,testinputs,testoutputs,lstm_model))

In [None]:
# Report the mean of the wrong-prediction counts currently stored in wp.
from statistics import mean
print("The average number of wrong predictions on the test set in LSTM model are",mean(wp))

In [None]:
# Evaluate the attention RNN on 10 freshly generated test sets of nts sequences
# each; wp collects the number of wrong predictions for every test set.
wp=[]
for i in range(10):
  testinputs,teststrings,testlens = generate_inputs(nts)
  testoutputstrings,testoutputs = generate_output(teststrings)
  wp.append(test_model(nts,testinputs,testoutputs,atten_model))
  # Show the wrong-prediction count of the test set just evaluated.
  print(wp[i])

In [None]:
# Report the mean of the wrong-prediction counts currently stored in wp.
from statistics import mean
print("The average number of wrong predictions on the test set in RNN Attention model are",mean(wp))

According to the research paper "Long Short-Term Memory", the test accuracy is above 99% when the model is trained on 571,100 sequences. Because of limited computational resources, I trained the model on only 3,000 sequences, so the test accuracy is around 18%.

- As an experiment, I also set the sequence length to 8 instead of 100. The test accuracy was then around 60%, which suggests that learning sequences of length 100 requires training on much more data.