# QA System Neural Network

This notebook implements a Sequence-to-Sequence (Seq2Seq) model for Question Answering using PyTorch.

In [1]:
%pip install torch pandas numpy scikit-learn -q

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import numpy as np
import collections
import random
import math
import time
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# Pick the compute device once for the whole notebook: CUDA when PyTorch
# reports it as available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

Note: you may need to restart the kernel to use updated packages.
Using device: cpu


In [2]:
# Read the preprocessed QA table from disk
df = pd.read_csv('../data/processed/qa_dataset.csv')

# Lift the column-width cap so long text cells print untruncated,
# then preview the first rows
pd.options.display.max_colwidth = None
print(df.head())

                                                                                                                                 context  \
0                                            Item: Christian Religious Rubber Bracelet with Card, 2 1/4 Inch | Cost: 14.99 | Rating: 4.3   
1                                            Item: Christian Religious Rubber Bracelet with Card, 2 1/4 Inch | Cost: 14.99 | Rating: 4.3   
2                                            Item: Christian Religious Rubber Bracelet with Card, 2 1/4 Inch | Cost: 14.99 | Rating: 4.3   
3                                            Item: Christian Religious Rubber Bracelet with Card, 2 1/4 Inch | Cost: 14.99 | Rating: 4.3   
4  Item: 2 Pcs Mickey Ears, Minnie Costume Ears Headband for Children Mom Baby Boys Girls Women Party (RED 2) | Cost: 8.79 | Rating: 4.5   

                    question  \
0      What's the price tag?   
1        What is the rating?   
2       Name of the product?   
3  Is it a top selling item?   

In [3]:
class Tokenizer:
    """Word-level tokenizer: lowercases text and splits it on whitespace.

    Ids 0-4 are reserved for the special tokens <PAD>, <SOS>, <EOS>, <UNK>
    and <SEP>; every other word receives the next free id in the order in
    which it is first seen by `fit_on_text`.
    """

    def __init__(self):
        # Special tokens
        self.word2idx = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3, "<SEP>": 4}
        # Derive the reverse map and the size from word2idx so the three
        # attributes cannot drift apart.
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.vocab_size = len(self.word2idx)

    @staticmethod
    def _split(text):
        """Normalise `text` to a list of words: str() -> lowercase -> whitespace split."""
        return str(text).lower().split()

    def fit_on_text(self, text_list, min_freq=1):
        """Extend the vocabulary with the words found in `text_list`.

        Args:
            text_list: iterable of texts; each item is passed through str(),
                so non-string values (e.g. NaN) are tokenized by their string form.
            min_freq: minimum number of occurrences *within this call* a word
                needs in order to be added. The default of 1 keeps every word
                (the original behaviour); a higher value drops rare words,
                which then encode to <UNK> and keep the embedding/output
                layers smaller.
        """
        counter = collections.Counter()
        for text in text_list:
            counter.update(self._split(text))

        # Counter preserves first-seen order, so ids are assigned in order of
        # first appearance.
        for word, count in counter.items():
            if count < min_freq or word in self.word2idx:
                continue
            self.word2idx[word] = self.vocab_size
            self.idx2word[self.vocab_size] = word
            self.vocab_size += 1

    def encode(self, text):
        """Convert `text` to a list of token ids; unknown words map to <UNK>."""
        unk_idx = self.word2idx["<UNK>"]
        return [self.word2idx.get(word, unk_idx) for word in self._split(text)]

    def decode(self, indices):
        """Convert an iterable of token ids to a space-joined string.

        Ids that are not in the vocabulary are rendered as "<UNK>".
        """
        return ' '.join(self.idx2word.get(idx, "<UNK>") for idx in indices)

# Build one shared vocabulary from every text column of the dataset
tokenizer = Tokenizer()
# Column names are lowercase, as in the CSV file
text_columns = ['context', 'question', 'answer']
all_text = [text for column in text_columns for text in df[column].tolist()]
tokenizer.fit_on_text(all_text)
print(f"Vocabulary Size: {tokenizer.vocab_size}")

Vocabulary Size: 169717


In [4]:
class QADataset(Dataset):
    """Dataset that turns rows of a QA DataFrame into (source, target) id tensors.

    Each row must provide the lowercase columns 'context', 'question' and
    'answer'. The tokenizer must expose `word2idx` (containing "<SEP>",
    "<SOS>" and "<EOS>") and an `encode(text)` method returning a list of ids.
    """

    def __init__(self, df, tokenizer):
        # Positional access via .iloc below relies on a clean 0..n-1 index
        self.df = df.reset_index(drop=True)
        self.tokenizer = tokenizer

        special = self.tokenizer.word2idx
        self.sep_idx = special["<SEP>"]
        self.sos_idx = special["<SOS>"]
        self.eos_idx = special["<EOS>"]

    def __len__(self):
        """Number of QA rows."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return `(source, target)` as 1-D long tensors for row `idx`.

        source = context ids + <SEP> + question ids
        target = <SOS> + answer ids + <EOS>
        """
        row = self.df.iloc[idx]
        encode = self.tokenizer.encode

        source_ids = [*encode(row['context']), self.sep_idx, *encode(row['question'])]
        target_ids = [self.sos_idx, *encode(row['answer']), self.eos_idx]

        source = torch.tensor(source_ids, dtype=torch.long)
        target = torch.tensor(target_ids, dtype=torch.long)
        return source, target

def collate_fn(batch, pad_idx=None):
    """Collate a list of `(input, target)` 1-D tensors into two padded batches.

    Args:
        batch: sequence of `(input_tensor, target_tensor)` pairs, as produced
            by a dataset's `__getitem__`.
        pad_idx: id used to right-pad shorter sequences. When omitted, the
            <PAD> id of the notebook-level `tokenizer` is used, so the function
            can still be passed to `DataLoader(collate_fn=collate_fn)` as-is.

    Returns:
        `(inputs_padded, targets_padded)`, each of shape
        `[batch_size, longest_sequence_in_batch]`.
    """
    if pad_idx is None:
        pad_idx = tokenizer.word2idx["<PAD>"]

    inputs, targets = zip(*batch)

    # Pad sequences to the same length within the batch
    inputs_padded = nn.utils.rnn.pad_sequence(inputs, batch_first=True, padding_value=pad_idx)
    targets_padded = nn.utils.rnn.pad_sequence(targets, batch_first=True, padding_value=pad_idx)

    return inputs_padded, targets_padded

In [5]:
class EncoderRNN(nn.Module):
    """GRU encoder: token ids -> embeddings -> dropout -> single-layer GRU.

    Args:
        input_size: number of rows in the embedding table (vocabulary size).
        hidden_size: width of both the embeddings and the GRU state.
        dropout_p: dropout probability applied to the embeddings.
    """

    def __init__(self, input_size, hidden_size, dropout_p=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=hidden_size)
        self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size, batch_first=True)
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, input):
        """Encode a batch of id sequences.

        Every position of `input` is embedded and fed to the GRU; no position
        is masked or skipped here.

        Returns:
            `(output, hidden)` exactly as returned by the GRU.
        """
        token_vectors = self.embedding(input)
        return self.gru(self.dropout(token_vectors))

class DecoderRNN(nn.Module):
    """GRU decoder step: ids -> embedding -> dropout -> ReLU -> GRU -> vocabulary logits.

    Args:
        hidden_size: width of the embeddings and of the GRU state.
        output_size: vocabulary size (embedding rows and logit width).
        dropout_p: dropout probability applied to the embeddings.
    """

    def __init__(self, hidden_size, output_size, dropout_p=0.1):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=hidden_size)
        self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size, batch_first=True)
        self.out = nn.Linear(in_features=hidden_size, out_features=output_size)
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, input, hidden):
        """Run the decoder on `input` starting from `hidden`.

        Args:
            input: token ids, [batch_size, 1] when decoding one step at a time.
            hidden: GRU state to continue from.

        Returns:
            `(prediction, hidden)`: unnormalised logits over the vocabulary for
            each input position, and the updated GRU state.
        """
        activated = F.relu(self.dropout(self.embedding(input)))
        rnn_out, hidden = self.gru(activated, hidden)
        return self.out(rnn_out), hidden

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing.

    Args:
        encoder: module whose call returns `(outputs, hidden)`.
        decoder: module called as `decoder(input, hidden)` returning
            `(logits, hidden)`; it must expose `out.out_features`
            (the vocabulary size).
        device: stored on the instance for callers that read `model.device`.
            `forward` does not depend on it; it allocates on the device of
            its input instead.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode `trg_len - 1` steps and return the per-step logits.

        Args:
            src: [batch_size, src_len] source token ids.
            trg: [batch_size, trg_len] target token ids; column 0 is the first
                decoder input (<SOS>).
            teacher_forcing_ratio: probability, drawn once per step for the
                whole batch, of feeding the ground-truth token instead of the
                model's own prediction as the next input.

        Returns:
            [batch_size, trg_len, vocab_size] tensor of logits. Position 0 is
            left as zeros because no prediction is made for the first token.
        """
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.out.out_features

        # Allocate the result buffer directly on the input's device instead of
        # creating it on the default device and copying it to a stored device
        # that may no longer match where the model lives.
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=src.device)

        # Last hidden state of the encoder is the context
        _, hidden = self.encoder(src)

        # First input to the decoder is the <SOS> token
        input = trg[:, 0].unsqueeze(1)

        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t, :] = output.squeeze(1)

            # Teacher forcing: decide if next input is from target or prediction.
            # The argmax over the vocabulary is only computed when it is used.
            teacher_force = random.random() < teacher_forcing_ratio
            input = trg[:, t].unsqueeze(1) if teacher_force else output.argmax(2)

        return outputs

In [None]:
# Hyperparameters
SEED = 42
INPUT_DIM = tokenizer.vocab_size
OUTPUT_DIM = tokenizer.vocab_size
HIDDEN_DIM = 256
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
LEARNING_RATE = 0.001
BATCH_SIZE = 32
EPOCHS = 10

# Reproducibility: seed every RNG that influences a run *before* any weights
# are created. Python's `random` drives the teacher-forcing coin flips in
# Seq2Seq.forward; torch's RNG drives weight initialisation and dropout
# (and, by default, DataLoader shuffling).
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Model Initialization
enc = EncoderRNN(INPUT_DIM, HIDDEN_DIM, ENC_DROPOUT).to(device)
dec = DecoderRNN(HIDDEN_DIM, OUTPUT_DIM, DEC_DROPOUT).to(device)
model = Seq2Seq(enc, dec, device).to(device)

# Optimization
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Padding positions in the target contribute nothing to the loss
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.word2idx["<PAD>"])

# Data Preparation: 85% train / 15% validation, split at row level
train_df, val_df = train_test_split(df, test_size=0.15, random_state=SEED)

train_dataset = QADataset(train_df, tokenizer)
val_dataset = QADataset(val_df, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

print(f"Model Parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

Model Parameters: 131,301,877


: 

In [None]:
def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: called as `model(src, trg)`; must return logits of shape
            [batch size, trg len, output dim].
        iterator: sized iterable of `(src, trg)` tensor batches.
        optimizer: optimizer over `model`'s parameters.
        criterion: loss taking `(logits [N, output dim], targets [N])`.
        clip: maximum gradient norm passed to `clip_grad_norm_`.

    Returns:
        Sum of the per-batch losses divided by `len(iterator)`.

    Raises:
        ZeroDivisionError: if `iterator` has length 0.
    """
    model.train()
    # Move batches to wherever the model's weights live rather than relying on
    # a notebook-level `device` variable.
    model_device = next(model.parameters()).device
    epoch_loss = 0

    for src, trg in iterator:
        src, trg = src.to(model_device), trg.to(model_device)

        optimizer.zero_grad()

        output = model(src, trg)
        # output: [batch size, trg len, output dim]
        # trg: [batch size, trg len]

        output_dim = output.shape[-1]

        # Flatten outputs and targets for loss calculation
        # Skip the first token (<SOS>)
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Compute the mean batch loss over `iterator` without updating the model.

    The model is switched to eval mode and called with a teacher-forcing
    ratio of 0, so every decoder step after the first is fed the model's own
    previous prediction.

    Args:
        model: called as `model(src, trg, 0)`; must return logits of shape
            [batch size, trg len, output dim].
        iterator: sized iterable of `(src, trg)` tensor batches.
        criterion: loss taking `(logits [N, output dim], targets [N])`.

    Returns:
        Sum of the per-batch losses divided by `len(iterator)`.

    Raises:
        ZeroDivisionError: if `iterator` has length 0.
    """
    model.eval()
    # Move batches to wherever the model's weights live rather than relying on
    # a notebook-level `device` variable.
    model_device = next(model.parameters()).device
    epoch_loss = 0

    with torch.no_grad():
        for src, trg in iterator:
            src, trg = src.to(model_device), trg.to(model_device)

            output = model(src, trg, 0) # Turn off teacher forcing

            output_dim = output.shape[-1]

            # Drop the <SOS> position and flatten for the loss
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            loss = criterion(output, trg)
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

print("Starting training...")
for epoch in range(EPOCHS):
    start_time = time.time()

    # One optimisation pass (gradient norm clipped at 1), then one validation pass
    train_loss = train(model, train_loader, optimizer, criterion, 1)
    val_loss = evaluate(model, val_loader, criterion)

    # Wall-clock duration of the epoch, split into whole minutes and leftover seconds
    elapsed = time.time() - start_time
    epoch_mins, epoch_secs = elapsed // 60, elapsed % 60

    # Perplexity is reported as exp(mean loss)
    print(f'Epoch: {epoch+1:02} | Time: {int(epoch_mins)}m {int(epoch_secs)}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {val_loss:.3f} |  Val. PPL: {math.exp(val_loss):7.3f}')

Starting training...


In [None]:
def predict_answer(model, context, question, max_len=50):
    """Greedily decode an answer for `question` given `context`.

    The input is encoded with the notebook-level `tokenizer` as
    `context ids + <SEP> + question ids`, run through `model.encoder`, and
    then `model.decoder` is stepped from <SOS>, always feeding back the
    highest-scoring token.

    Args:
        model: module exposing `encoder` and `decoder`; it is switched to
            eval mode and left in eval mode.
        context: context text.
        question: question text.
        max_len: maximum number of decoder steps.

    Returns:
        The decoded answer string, without <SOS> and without the terminating
        <EOS> (if one was produced within `max_len` steps).
    """
    model.eval()

    sos_idx = tokenizer.word2idx["<SOS>"]
    eos_idx = tokenizer.word2idx["<EOS>"]
    sep_idx = tokenizer.word2idx["<SEP>"]

    # Build tensors on the device the model's weights live on instead of
    # relying on a notebook-level `device` variable.
    model_device = next(model.parameters()).device

    # Prepare input: a batch of one sequence, [1, src_len]
    input_indices = tokenizer.encode(context) + [sep_idx] + tokenizer.encode(question)
    src_tensor = torch.tensor(input_indices, dtype=torch.long, device=model_device).unsqueeze(0)

    predicted_tokens = []
    with torch.no_grad():
        _, hidden = model.encoder(src_tensor)

        # Start decoding with SOS
        prev_token = sos_idx
        for _ in range(max_len):
            step_input = torch.tensor([[prev_token]], dtype=torch.long, device=model_device)  # [1, 1]
            output, hidden = model.decoder(step_input, hidden)

            prev_token = output.argmax(2).item()
            # Stop at EOS; it is not part of the returned answer
            if prev_token == eos_idx:
                break
            predicted_tokens.append(prev_token)

    return tokenizer.decode(predicted_tokens)

# Qualitative check on the first three validation rows: print the reference
# answer next to the model's greedy prediction.
# NOTE(review): assumes val_df has at least three rows; `.iloc[i]` raises
# IndexError otherwise.
print("--- Inference Examples ---")
for i in range(3):
    sample = val_df.iloc[i]
    print(f"Example {i+1}")
    # Only the first 100 characters of the context are printed; the model
    # below still receives the full context.
    print(f"Context: {sample['context'][:100]}...")
    print(f"Question: {sample['question']}")
    print(f"True Answer: {sample['answer']}")
    prediction = predict_answer(model, sample['context'], sample['question'])
    print(f"Predicted Answer: {prediction}")
    print("-" * 50)

# Interactive Chat with History
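Test the QA system in a multi-turn setting: each new question is answered against the product context with the most recent question-answer turns appended as conversation history.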

In [None]:
# Chat History Management
class ChatSession:
    """Multi-turn wrapper around `predict_answer` that remembers earlier turns.

    Args:
        model: model handed to `predict_answer` on every question.
        tokenizer: stored on the instance; not read by the methods below.
        device: stored on the instance; not read by the methods below.
        context: base context text every question is asked against.
    """

    def __init__(self, model, tokenizer, device, context):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.context = context
        self.history = []  # (question, answer) tuples, oldest first

    def ask(self, question):
        """Answer `question`, record the turn in the history and return the answer.

        The text passed to `predict_answer` as context is the base context
        followed by the last three recorded turns, each rendered as
        " Q: <question> A: <answer>".
        """
        recent_turns = self.history[-3:]
        history_str = "".join(f" Q: {q} A: {a}" for q, a in recent_turns)

        # Base context first, then the rendered turns
        full_context = f"{self.context}{history_str}"

        answer = predict_answer(self.model, full_context, question)
        self.history.append((question, answer))
        return answer

    def reset(self):
        """Forget all recorded turns."""
        self.history = []

# Demo: Interactive session with a sample product
# The context of the first validation row serves as the product description
# for the whole conversation.
sample_product = val_df.iloc[0]['context']
print(f"Product Context: {sample_product}\n")

session = ChatSession(model, tokenizer, device, sample_product)

# Simulate a multi-turn conversation
# Each answer is stored in the session, so later questions are asked with the
# earlier turns appended to the context.
demo_questions = [
    "What is the price?",
    "How many stars does it have?",
    "Is this a best seller?"
]

print("--- Multi-turn Conversation Demo ---")
for q in demo_questions:
    answer = session.ask(q)
    print(f"User: {q}")
    print(f"Bot: {answer}")
    print()

# To enable true interactivity, uncomment below:
# while True:
#     q = input("You: ")
#     if q.lower() == 'exit': break
#     print(f"Bot: {session.ask(q)}")