RNN (Recurrent Neural Network): a type of artificial neural network designed to process sequential data, where the order of the elements matters. It keeps a memory of past inputs in its hidden state.

![img](rnn.png)

In [None]:
import pandas as pd

# Load the question/answer dataset; later cells read its 'question' and 'answer' columns.
df = pd.read_csv('100_Unique_QA_Dataset.csv')

# Preview the first rows.
df.head()

In [None]:
# tokenize
def tokenize(text):
  """Split *text* into lowercase, whitespace-separated tokens.

  Question marks and apostrophes are deleted before splitting, so
  "What's up?" becomes ['whats', 'up'].
  """
  # One translation pass removes both characters at once.
  strip_table = str.maketrans('', '', "?'")
  return text.lower().translate(strip_table).split()

In [None]:
# Sanity check: expected result is ['what', 'is', 'the', 'capital', 'of', 'france'].
tokenize('What is the capital of France?')

In [None]:
# Vocabulary: maps token -> integer index. Index 0 is reserved for unknown tokens.
vocab = {'<UNK>':0}

In [None]:
def build_vocab(row):
  """Register every token of one dataset row in the global ``vocab``.

  Tokens from row['question'] are handled first, then those from
  row['answer']. Each token not yet present receives the next free index,
  i.e. the vocabulary size at the moment it is added.
  """
  question_tokens = tokenize(row['question'])
  answer_tokens = tokenize(row['answer'])

  for word in [*question_tokens, *answer_tokens]:
    if word in vocab:
      continue
    vocab[word] = len(vocab)


In [None]:
# Run build_vocab on every row; it is called for its side effect of filling `vocab`.
df.apply(build_vocab, axis=1)

In [None]:
# Number of entries in the vocabulary, including the '<UNK>' entry.
len(vocab)

In [None]:
def text_to_indices(text, vocab):
  """Convert *text* into a list of vocabulary indices.

  The text is split with ``tokenize``. A token present in *vocab* maps to its
  index; any other token maps to the index stored under '<UNK>'.
  """
  return [
    vocab[token] if token in vocab else vocab['<UNK>']
    for token in tokenize(text)
  ]

In [None]:
# Presumably 'campusx' is not in the vocabulary, so it should come back as the '<UNK>' index (0).
text_to_indices("What is campusx", vocab)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

In [None]:
class QADataset(Dataset):
  """Serve each row of a question/answer DataFrame as a pair of index tensors."""

  def __init__(self, df, vocab):
    # Only references are stored; rows are converted on demand in __getitem__.
    self.vocab = vocab
    self.df = df

  def __len__(self):
    """Return the number of rows in the DataFrame."""
    row_count = self.df.shape[0]
    return row_count

  def __getitem__(self, index):
    """Return (question_tensor, answer_tensor) for the row at position *index*."""
    row = self.df.iloc[index]

    question_ids = text_to_indices(row['question'], self.vocab)
    answer_ids = text_to_indices(row['answer'], self.vocab)

    return torch.tensor(question_ids), torch.tensor(answer_ids)

In [None]:
# Wrap the DataFrame and the vocabulary so that rows can be served as index tensors.
dataset = QADataset(df, vocab)

In [None]:
# NOTE(review): batch_size=1 presumably because QADataset does no padding, so
# samples of different lengths could not be stacked into one batch — confirm.
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

In [None]:
# Inspect the batches: `question` keeps its batch axis, answer[0] selects the single sample.
for question, answer in dataloader:
  print(question, answer[0])

In [None]:
import torch.nn as nn

RNN Architecture:

![rnn_arch](rnn_arch.png)

In [None]:
# RNN model
#
# nn.Sequential is not used here because nn.RNN returns a tuple
# (per-step outputs, final hidden state) and only the final hidden state is
# passed on to the linear layer; making that selection needs a custom forward().

class SimpleRNN(nn.Module):
  """Embedding -> single-layer RNN -> linear classifier over the vocabulary.

  Args:
    vocab_size: number of vocabulary entries; used as the embedding table
      size and as the number of output logits (one score per word).
    embedding_dim: size of each token embedding vector (default 50).
    hidden_size: size of the RNN hidden state (default 64).
  """

  def __init__(self, vocab_size, embedding_dim=50, hidden_size=64):
    super().__init__()
    # Layers are created in the same order as before so seeded initialisation is unchanged.
    self.embedding = nn.Embedding(vocab_size, embedding_dim=embedding_dim)
    self.rnn = nn.RNN(embedding_dim, hidden_size, batch_first=True)
    self.fc = nn.Linear(hidden_size, vocab_size)

  def forward(self, question):
    """Return logits of shape (batch, vocab_size).

    Args:
      question: integer tensor of token indices, shape (batch, seq_len).
    """
    embedded_question = self.embedding(question)   # (batch, seq_len, embedding_dim)
    _, final_hidden = self.rnn(embedded_question)  # final_hidden: (1, batch, hidden_size)
    # Drop the leading num_layers axis before the classifier.
    return self.fc(final_hidden.squeeze(0))

In [None]:
# Debugging the model: push one sample through stand-alone copies of each
# layer and print the tensor shape after every step.
x = nn.Embedding(len(vocab), embedding_dim=50)  # sized from the vocabulary, not a literal
y = nn.RNN(50, 64, batch_first=True)
z = nn.Linear(64, len(vocab))

# Add the batch axis without hard-coding the question length.
a = dataset[0][0].unsqueeze(0)
print("shape of a:", a.shape)
b = x(a)
print("shape of b:", b.shape)
# c holds the per-step outputs, d the final hidden state.
c, d = y(b)
print("shape of c:", c.shape)
print("shape of d:", d.shape)

# Drop d's leading axis before the linear layer, as SimpleRNN.forward does.
e = z(d.squeeze(0))

print("shape of e:", e.shape)

In [None]:
# Training hyperparameters: optimizer step size and number of passes over the data.
learning_rate = 0.001
epochs = 20

In [None]:
# The vocabulary size determines both the embedding table and the output layer.
model = SimpleRNN(len(vocab))

In [None]:
# Cross-entropy over the vocabulary logits; Adam updates every model parameter.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Training loop: one optimizer step per (question, answer) pair, reporting the
# loss summed over the epoch.

for epoch in range(epochs):

  total_loss = 0.0

  for question, answer in dataloader:

    optimizer.zero_grad()

    # Forward pass; with batch_size=1 the logits have a single row.
    output = model(question)

    # answer[0] drops the batch axis of the target.
    # NOTE(review): the shapes only match when the answer is a single token —
    # confirm this holds for every row of the dataset.
    loss = criterion(output, answer[0])

    # Backpropagate and update the parameters.
    loss.backward()
    optimizer.step()

    total_loss += loss.item()

  # ".4f" prints four decimals; the previous "4f" meant "minimum width 4".
  print(f"Epoch: {epoch+1}, Loss: {total_loss:.4f}")

In [None]:
def predict(model, question, threshold=0.5):
  """Print the model's one-word answer to *question*, or "I don't know".

  Uses the module-level ``vocab`` both to encode the question and to decode
  the predicted index.

  Args:
    model: callable mapping a (1, seq_len) index tensor to (1, vocab_size) logits.
    question: raw question text.
    threshold: minimum softmax probability required to print the top word.
  """
  # Convert the question to vocabulary indices.
  numerical_question = text_to_indices(question, vocab)

  # An empty token list cannot be fed to the embedding layer.
  if not numerical_question:
    print("I don't know")
    return

  # Add the batch axis expected by the model.
  question_tensor = torch.tensor(numerical_question).unsqueeze(0)

  # Inference only: no autograd graph is needed.
  with torch.no_grad():
    output = model(question_tensor)

  # Convert logits to probabilities and pick the most likely word.
  probs = torch.nn.functional.softmax(output, dim=1)
  value, index = torch.max(probs, dim=1)

  # Below the threshold, report uncertainty and stop instead of also
  # printing the low-confidence word.
  if value.item() < threshold:
    print("I don't know")
    return

  print(list(vocab.keys())[index.item()])

In [None]:
# Try the trained model on a sample question; predict prints its answer.
predict(model, "What is the largest planet in our solar system?")

In [None]:
# Token at position 7 of the vocabulary's keys (dicts keep insertion order).
list(vocab.keys())[7]