In [1]:
import os
import sys
import random
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchinfo import summary

# Seed torch's global RNG so repeated runs produce the same results.
# NOTE(review): `random` and `numpy` are imported above but are not seeded here.
torch.manual_seed(123)

# Make the parent directory importable from this notebook.
sys.path.append(os.path.abspath(".."))

### Loading the Dataset

In [2]:
# Load the question/answer pairs from a path relative to the working directory.
# NOTE(review): the displayed frame has an 'Unnamed: 0' column, which looks like a
# saved index - consider index_col=0 if it is not needed.
df = pd.read_csv('../datasets/question_answer_dataset.csv')
df

Unnamed: 0,question,answer
0,What is the capital of France?,Paris
1,What is the capital of Germany?,Berlin
2,Who wrote 'To Kill a Mockingbird'?,Harper-Lee
3,What is the largest planet in our solar system?,Jupiter
4,What is the boiling point of water in Celsius?,100
...,...,...
85,Who directed the movie 'Titanic'?,JamesCameron
86,Which superhero is also known as the Dark Knight?,Batman
87,What is the capital of Brazil?,Brasilia
88,Which fruit is known as the king of fruits?,Mango


### Tokenize the dataset

In [3]:
def tokenize(text):
  """Lower-case `text`, remove every '?' and "'" character, then split on whitespace.

  Returns the resulting list of tokens (empty when nothing is left).
  """
  cleaned = text.lower()
  for unwanted in ('?', "'"):
    cleaned = cleaned.replace(unwanted, '')
  return cleaned.split()

tokenize('What is the capital of France?')

['what', 'is', 'the', 'capital', 'of', 'france']

### Build Vocabulary

In [4]:
# Token -> integer id mapping, pre-populated with the '<UNK>' entry at id 0.
vocab = {'<UNK>': 0}

def build_vocab(row):
  """Add every unseen token from a row's question and answer to the global `vocab`.

  `row` must support `row['question']` and `row['answer']`. Question tokens are
  processed before answer tokens, and each new token receives the next free id
  (the size of `vocab` at the moment it is added).
  """
  tokens = [*tokenize(row['question']), *tokenize(row['answer'])]
  for token in tokens:
    # setdefault leaves existing ids untouched and only inserts new tokens.
    vocab.setdefault(token, len(vocab))

In [5]:
# build_vocab is applied row by row purely for its side effect on `vocab`.
df.apply(build_vocab, axis=1)
print(f"vocab size: {len(vocab)}")

vocab size: 324


In [6]:
def text_to_indices(text, vocab):
  """Tokenize `text` and translate each token to its id in `vocab`.

  Tokens that are missing from `vocab` are mapped to `vocab['<UNK>']`.
  Returns a list with one id per token.
  """
  return [
    vocab[token] if token in vocab else vocab['<UNK>']
    for token in tokenize(text)
  ]

In [7]:
# "campusx" is not in the vocabulary, so it comes back as the '<UNK>' id.
text_to_indices("What is campusx", vocab)

[1, 2, 0]

### Defining the DataLoader

In [8]:
class MyDataset(Dataset):
  """Dataset of (question, answer) pairs encoded as tensors of vocabulary ids.

  Args:
    df: DataFrame with text columns 'question' and 'answer'.
    vocab: mapping from token to integer id; unknown tokens use vocab['<UNK>'].
  """

  def __init__(self, df, vocab):
    self.df = df
    self.vocab = vocab

  def __len__(self):
    """Return the number of rows (question/answer pairs)."""
    return self.df.shape[0]

  def __getitem__(self, index):
    """Return `(question_ids, answer_ids)` for the row at position `index`.

    Both elements are 1-D int64 tensors with one entry per token.
    """
    # Fetch the row once instead of building it separately for each column.
    row = self.df.iloc[index]
    numerical_question = text_to_indices(row['question'], self.vocab)
    numerical_answer = text_to_indices(row['answer'], self.vocab)
    # Explicit dtype: an empty list would otherwise become a float32 tensor,
    # which nn.Embedding does not accept as indices.
    return (torch.tensor(numerical_question, dtype=torch.long),
            torch.tensor(numerical_answer, dtype=torch.long))

In [9]:
dataset = MyDataset(df, vocab)
# batch_size=1: questions tokenize to different lengths and no padding/collate_fn is
# supplied, so samples cannot be stacked into larger batches as-is.
train_dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

### Test the DataLoader

In [10]:
# Peek at the first five batches (indices 0-4) to check shapes and contents.
# Note: `batch_x` / `batch_y` stay bound to the last batch after the loop; later
# cells pass `batch_x` to summary() and to the layer-by-layer walkthrough.
for batch_idx, (batch_x, batch_y) in enumerate(train_dataloader):
  print(f"batch: {batch_idx}, {batch_x.shape}, {batch_y.shape}")
  print("\t", batch_x, batch_y)
  if batch_idx >= 4:
    break

batch: 0, torch.Size([1, 7]), torch.Size([1, 1])
	 tensor([[78, 79, 80, 81, 82, 83, 84]]) tensor([[85]])
batch: 1, torch.Size([1, 7]), torch.Size([1, 1])
	 tensor([[ 42, 255,   2, 256,  83, 257, 258]]) tensor([[259]])
batch: 2, torch.Size([1, 7]), torch.Size([1, 1])
	 tensor([[ 1,  2,  3, 33, 34,  5, 35]]) tensor([[36]])
batch: 3, torch.Size([1, 5]), torch.Size([1, 1])
	 tensor([[10, 29,  3, 30, 31]]) tensor([[32]])
batch: 4, torch.Size([1, 7]), torch.Size([1, 1])
	 tensor([[ 42, 167,   2,   3,  17, 168, 169]]) tensor([[170]])


### Design the Model

In [11]:
class SimpleRNN(nn.Module):
  """Embedding -> single-layer RNN -> linear classifier over the vocabulary.

  Args:
    vocab_size: number of embedding rows and number of output logits.
    embedding_dim: size of each token embedding (default 50).
    hidden_size: size of the RNN hidden state (default 64).
  """

  def __init__(self, vocab_size, embedding_dim=50, hidden_size=64):
    super().__init__()
    # Layers are created in the same order as before so that seeded
    # initialisation with the default sizes is unchanged.
    self.embedding = nn.Embedding(vocab_size, embedding_dim=embedding_dim)
    self.rnn = nn.RNN(embedding_dim, hidden_size, batch_first=True)
    self.fc = nn.Linear(hidden_size, vocab_size)

  def forward(self, question):
    """Return logits of shape (batch, vocab_size) for a (batch, seq_len) tensor of token ids."""
    embedded = self.embedding(question)
    # nn.RNN returns (per-step outputs, final hidden state); only the final state is used.
    _, hidden_state = self.rnn(embedded)
    # hidden_state is (1, batch, hidden_size) for this single-layer RNN; drop the leading axis.
    logits = self.fc(hidden_state.squeeze(0))
    return logits

In [12]:
# Training hyperparameters.
epochs = 20
learning_rate = 0.001

model = SimpleRNN(len(vocab))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# `batch_x` is the last batch left over from the DataLoader test loop; it is used
# here only as a sample input for the summary.
model_stats = summary(model, input_data=batch_x, verbose=2, col_width=16,
                      col_names=["kernel_size", "input_size", "output_size", "num_params"])

Layer (type:depth-idx)                   Kernel Shape     Input Shape      Output Shape     Param #
SimpleRNN                                --               [1, 7]           [1, 324]         --
├─Embedding: 1-1                         --               [1, 7]           [1, 7, 50]       16,200
│    └─weight                            [50, 324]                                          └─16,200
├─RNN: 1-2                               --               [1, 7, 50]       [1, 7, 64]       7,424
│    └─weight_ih_l0                      [64, 50]                                           ├─3,200
│    └─weight_hh_l0                      [64, 64]                                           ├─4,096
│    └─bias_ih_l0                        [64]                                               ├─64
│    └─bias_hh_l0                        [64]                                               └─64
├─Linear: 1-3                            --               [1, 64]          [1, 324]         21,060
│    └─weight 

### Analyze The Model Layers

In [13]:
# Walk one sample batch (`batch_x`, left over from the DataLoader test loop)
# through each layer by hand and print the shape after every stage.
emb_layer_output = model.embedding(batch_x)
print("Input sequence shape\t :", batch_x.shape)
print("Embedding output shape\t :", emb_layer_output.shape)

# The RNN returns the per-step outputs and the final hidden state.
internal_states, hidden_state = model.rnn(emb_layer_output)
print("\nRNN internal states shape:", internal_states.shape)
print("RNN final state shape\t :", hidden_state.shape)

# squeeze(0) drops the leading axis of the final state before the linear layer,
# mirroring SimpleRNN.forward.
fc_layer_output = model.fc(hidden_state.squeeze(0))
print("\nFC layer output shape\t :", fc_layer_output.shape)

Input sequence shape	 : torch.Size([1, 7])
Embedding output shape	 : torch.Size([1, 7, 50])

RNN internal states shape: torch.Size([1, 7, 64])
RNN final state shape	 : torch.Size([1, 1, 64])

FC layer output shape	 : torch.Size([1, 324])


### Train the Model

In [14]:
# One optimizer step per (question, answer) pair; the loader yields batches of size 1.
for epoch in range(epochs):
  # Loss summed (not averaged) over every batch of the epoch.
  epoch_loss = 0
  model = model.train()  # put the model in training mode at the start of each epoch
  for question, answer in train_dataloader:
    # forward pass
    logits = model(question)

    # loss: logits are (1, vocab_size); answer[0] drops the batch axis of the target.
    # NOTE(review): the shapes only line up when the answer is a single token -
    # confirm the dataset guarantees this.
    loss = criterion(logits, answer[0])

    # backward pass
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    epoch_loss += loss.item()

  print(f'Epoch: {epoch+1:03d}/{epochs:03d} | epoch_loss: {epoch_loss:.2f}')

Epoch: 001/020 | epoch_loss: 531.15
Epoch: 002/020 | epoch_loss: 459.17
Epoch: 003/020 | epoch_loss: 380.93
Epoch: 004/020 | epoch_loss: 322.42
Epoch: 005/020 | epoch_loss: 272.64
Epoch: 006/020 | epoch_loss: 225.88
Epoch: 007/020 | epoch_loss: 182.12
Epoch: 008/020 | epoch_loss: 143.38
Epoch: 009/020 | epoch_loss: 110.73
Epoch: 010/020 | epoch_loss: 85.50
Epoch: 011/020 | epoch_loss: 66.07
Epoch: 012/020 | epoch_loss: 51.40
Epoch: 013/020 | epoch_loss: 40.57
Epoch: 014/020 | epoch_loss: 32.63
Epoch: 015/020 | epoch_loss: 26.63
Epoch: 016/020 | epoch_loss: 22.01
Epoch: 017/020 | epoch_loss: 18.40
Epoch: 018/020 | epoch_loss: 15.65
Epoch: 019/020 | epoch_loss: 13.40
Epoch: 020/020 | epoch_loss: 11.64


### Make Prediction

In [15]:
def predict(model, question, threshold=0.5):
  """Predict a one-token answer for `question` using the global `vocab`.

  Args:
    model: module mapping a (1, seq_len) tensor of token ids to (1, vocab_size) logits.
    question: raw question text.
    threshold: minimum softmax probability required to return an answer.

  Returns:
    `(answer, confidence)`: the predicted token and its probability as a float,
    or `("I don't know", confidence)` when the confidence is below `threshold`.
    A question that yields no tokens returns `("I don't know", 0.0)`.
  """
  numerical_question = text_to_indices(question, vocab)
  if not numerical_question:
    # Nothing to embed: an empty list would become a float tensor that
    # nn.Embedding rejects, so answer explicitly instead of crashing.
    return "I don't know", 0.0

  question_tensor = torch.tensor(numerical_question, dtype=torch.long).unsqueeze(0)

  # Inference only: no need to record an autograd graph.
  with torch.no_grad():
    logits = model(question_tensor)
    # convert logits to probs
    probs = nn.functional.softmax(logits, dim=1)
    prob, index = torch.max(probs, dim=1)

  confidence = prob.item()
  if confidence < threshold:
    return "I don't know", confidence

  # Look the token up by its id rather than by its position in the dict.
  index_to_token = {idx: token for token, idx in vocab.items()}
  return index_to_token[index.item()], confidence

In [16]:
# Spot-check a single question taken from the dataset shown above.
prediction, confidence = predict(model, "What is the largest planet in our solar system?")
print(f"Confidence: {confidence:.2f}")
print(f"Answer: {prediction}")

Confidence: 0.86
Answer: jupiter


### Evaluate the Model

In [17]:
def calculate_accuracy(model):
    """Return the fraction of rows in the global `df` that `model` answers correctly.

    The model is switched to eval mode, `predict` is run on every question under
    `torch.inference_mode()`, and each prediction is compared with the stored
    answer case-insensitively.
    """
    model.eval()
    with torch.inference_mode():
        matches = [
            predict(model, question)[0].casefold() == answer.casefold()
            for question, answer in zip(df['question'], df['answer'])
        ]
    return sum(matches) / len(matches)

train_acc = calculate_accuracy(model)
print(f"Train Accuracy: {train_acc:.2f}")

Train Accuracy: 1.00
