In [16]:
import pandas as pd
import spacy
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import random

# Seed every RNG used in this notebook so the train/validation split and the
# teacher-forcing / weight-initialisation randomness are reproducible.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# Load spacy model
spacy_en = spacy.load('en_core_web_sm')

# Load dataset (the rest of the notebook reads its 'question' and 'answer' columns)
df = pd.read_csv('qa_dataset.csv')

# Split dataset into train and validation sets; a fixed random_state keeps the
# CSV files written below identical from run to run.
train_df, val_df = train_test_split(df, test_size=0.2, random_state=SEED)

# Save the split datasets
train_df.to_csv('train_qa_dataset.csv', index=False)
val_df.to_csv('val_qa_dataset.csv', index=False)

# Tokenizer and vocabulary
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

def tokenize(text):
    """Split ``text`` into tokens using the module-level ``tokenizer``."""
    tokens = tokenizer(text)
    return tokens

def yield_tokens(data_iter):
    """Lazily yield the token list of each text in ``data_iter``, in order."""
    yield from (tokenize(sample) for sample in data_iter)

def build_vocab(data):
    """Build a torchtext vocabulary from an iterable of raw text strings.

    The special tokens are registered through the ``specials`` argument of
    ``build_vocab_from_iterator`` so they are present before they are looked
    up. Unknown tokens map to the index of ``<unk>``.

    Args:
        data: iterable of strings to tokenize and count.

    Returns:
        The vocabulary, with ``<unk>`` set as the default index.
    """
    specials = ["<unk>", "<pad>", "<bos>", "<eos>"]
    vocab = build_vocab_from_iterator(yield_tokens(data), specials=specials)
    # Set the default only after "<unk>" exists: looking up a missing token on
    # a vocabulary with no default index raises instead of returning an index.
    vocab.set_default_index(vocab["<unk>"])
    return vocab

# Define Dataset class
class QADataset(Dataset):
    """Dataset of (question, answer) pairs encoded as vocabulary-index tensors.

    Each sequence is wrapped in ``<bos>`` / ``<eos>``. The training loop in
    this file feeds ``trg[0]`` to the decoder as the start symbol and skips
    position 0 in the loss, and ``predict`` wraps its input the same way and
    stops on ``<eos>``; without the markers the first answer token would never
    be a training target and the model would never learn to emit ``<eos>``.

    Args:
        dataframe: pandas DataFrame with 'question' and 'answer' columns.
        vocab: mapping from token to index, indexable with ``vocab[token]``.
    """

    def __init__(self, dataframe, vocab):
        self.dataframe = dataframe
        self.vocab = vocab

    def __len__(self):
        """Return the number of rows in the underlying dataframe."""
        return len(self.dataframe)

    def _encode(self, text):
        """Tokenize ``text`` and return ``<bos> ids... <eos>`` as a 1-D long tensor."""
        ids = [self.vocab["<bos>"]]
        ids.extend(self.vocab[token] for token in tokenize(text))
        ids.append(self.vocab["<eos>"])
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        """Return ``(question_tensor, answer_tensor)`` for the row at position ``idx``."""
        row = self.dataframe.iloc[idx]
        return self._encode(row['question']), self._encode(row['answer'])

# Build the vocabulary from the training split only, so no validation tokens
# leak into it; tokens seen only in validation map to "<unk>".
train_questions = train_df['question'].tolist()
train_answers = train_df['answer'].tolist()
vocab = build_vocab(train_questions + train_answers)

# Make sure every special token exists, and only then point the default index
# at "<unk>" (looking "<unk>" up before it exists would raise).
specials = ["<unk>", "<pad>", "<bos>", "<eos>"]
for special in specials:
    if special not in vocab:
        vocab.insert_token(special, len(vocab))
vocab.set_default_index(vocab["<unk>"])


def collate_pairs(batch):
    """Turn a list of (question, answer) tensor pairs into (questions, answers) tuples.

    Padding is left to the training / evaluation functions.
    """
    return tuple(zip(*batch))


# Create datasets and dataloaders
train_dataset = QADataset(train_df, vocab)
val_dataset = QADataset(val_df, vocab)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_pairs)
val_loader = DataLoader(val_dataset, batch_size=16, collate_fn=collate_pairs)

# Define the Attention mechanism
class BahdanauAttention(nn.Module):
    """Additive attention producing one weight per encoder time step.

    As called by the Decoder in this file, ``hidden`` is (batch, hidden_size)
    and ``encoder_outputs`` is (src_len, batch, hidden_size).
    """

    def __init__(self, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # Projects the concatenated [decoder state; encoder output] to hidden_size.
        self.attn = nn.Linear(2 * hidden_size, hidden_size)
        # Learned vector that reduces each projected energy to a scalar score.
        self.v = nn.Parameter(torch.rand(hidden_size))

    def forward(self, hidden, encoder_outputs):
        """Return softmax-normalised attention weights shaped (batch, 1, src_len)."""
        src_len = encoder_outputs.size(0)
        # Copy the decoder state once per source position, then go batch-first.
        tiled_hidden = hidden.repeat(src_len, 1, 1).permute(1, 0, 2)
        batch_first_outputs = encoder_outputs.permute(1, 0, 2)
        scores = self.score(tiled_hidden, batch_first_outputs)
        weights = torch.softmax(scores, dim=1)
        return weights.unsqueeze(1)

    def score(self, hidden, encoder_outputs):
        """Return unnormalised scores (batch, src_len) for batch-first inputs."""
        batch_size = encoder_outputs.size(0)
        features = torch.cat([hidden, encoder_outputs], dim=2)
        # (batch, src_len, hidden) -> (batch, hidden, src_len) for the bmm below.
        energy = torch.tanh(self.attn(features)).transpose(1, 2)
        v = self.v.repeat(batch_size, 1, 1)
        return torch.bmm(v, energy).squeeze(1)

# Define Encoder
class Encoder(nn.Module):
    """Embeds source token indices and runs them through a multi-layer LSTM."""

    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        """Return ``(outputs, hidden, cell)`` from the LSTM for index tensor ``src``.

        The LSTM is sequence-first (``batch_first`` is not set), so ``src`` is
        expected as (src_len, batch).
        """
        token_embeddings = self.embedding(src)
        rnn_in = self.dropout(token_embeddings)
        outputs, final_state = self.rnn(rnn_in)
        hidden, cell = final_state
        return outputs, hidden, cell

# Define Decoder
class Decoder(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs.

    Args:
        output_dim: size of the target vocabulary (width of the prediction).
        emb_dim: embedding size for target tokens.
        hidden_dim: LSTM hidden size; must match the encoder outputs' last dim.
        n_layers: number of stacked LSTM layers.
        dropout: dropout probability for the embeddings and between LSTM layers.
        attention: callable ``(hidden, encoder_outputs) -> weights`` returning
            weights shaped (batch, 1, src_len).
    """

    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim)
        # The LSTM consumes the token embedding concatenated with the context vector.
        self.rnn = nn.LSTM(hidden_dim + emb_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell, encoder_outputs):
        """Decode one time step.

        Args:
            input: (batch,) indices of the previous target tokens.
            hidden, cell: LSTM state, each (n_layers, batch, hidden_dim).
            encoder_outputs: (src_len, batch, hidden_dim).

        Returns:
            ``(prediction, hidden, cell)`` where prediction is (batch, output_dim).
        """
        input = input.unsqueeze(0)                            # (1, batch)
        embedded = self.dropout(self.embedding(input))        # (1, batch, emb_dim)
        # Attention weights are already batch-first: (batch, 1, src_len).
        # They must NOT be permuted before the bmm below: swapping the first
        # two axes makes the batch dimension 1 and bmm fails for batch > 1.
        a = self.attention(hidden[-1], encoder_outputs)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)    # (batch, src_len, hidden_dim)
        weighted = torch.bmm(a, encoder_outputs)              # (batch, 1, hidden_dim)
        weighted = weighted.permute(1, 0, 2)                  # (1, batch, hidden_dim)
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)
        prediction = self.fc_out(torch.cat((output, weighted), dim=1))
        return prediction, hidden, cell

# Define Seq2Seq
class Seq2Seq(nn.Module):
    """Couples an encoder and a step-wise decoder into one trainable module."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step and return the per-step predictions.

        ``trg`` is indexed as (trg_len, batch). Row 0 of the result stays zero
        because decoding starts from ``trg[0]`` and writes from step 1 onward.
        At each step the next decoder input is the ground-truth token with
        probability ``teacher_forcing_ratio``, otherwise the argmax prediction.
        """
        trg_len, batch_size = trg.shape[0], trg.shape[1]
        vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, vocab_size, device=self.device)

        encoder_outputs, hidden, cell = self.encoder(src)

        decoder_input = trg[0, :]
        for step in range(1, trg_len):
            step_scores, hidden, cell = self.decoder(decoder_input, hidden, cell, encoder_outputs)
            outputs[step] = step_scores
            greedy_tokens = step_scores.argmax(1)
            if random.random() < teacher_forcing_ratio:
                decoder_input = trg[step]
            else:
                decoder_input = greedy_tokens
        return outputs

# Initialize model
# `device` must exist before the model is built: use the GPU when one is
# available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Source and target share one vocabulary, so both dimensions are its size.
INPUT_DIM = len(vocab)
OUTPUT_DIM = len(vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
attn = BahdanauAttention(HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attn)
model = Seq2Seq(enc, dec, device).to(device)

# Initialize optimizer and criterion
optimizer = optim.Adam(model.parameters())
# Padded target positions are excluded from the loss.
TRG_PAD_IDX = vocab['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

# Training function
def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch and return the mean loss per batch.

    Args:
        model: seq2seq module called as ``model(src, trg)``.
        iterator: yields ``(src, trg)`` batches, each a sequence of 1-D index tensors.
        optimizer: optimizer over ``model``'s parameters.
        criterion: loss taking ``(logits, targets)``.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        Sum of the batch losses divided by ``len(iterator)``.
    """
    model.train()
    # Batches are created on the CPU; move them to wherever the model lives so
    # training also works when the model is on a GPU.
    device = next(model.parameters()).device
    pad_idx = vocab['<pad>']
    epoch_loss = 0
    for batch in iterator:
        src, trg = batch
        # pad_sequence stacks variable-length sequences as (max_len, batch).
        src = torch.nn.utils.rnn.pad_sequence(src, padding_value=pad_idx).to(device)
        trg = torch.nn.utils.rnn.pad_sequence(trg, padding_value=pad_idx).to(device)
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        # Position 0 is only the decoder's start input, never a prediction target.
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# Validation function
def evaluate(model, iterator, criterion):
    """Compute the mean loss per batch without updating the model.

    Args:
        model: seq2seq module called as ``model(src, trg, teacher_forcing_ratio)``.
        iterator: yields ``(src, trg)`` batches, each a sequence of 1-D index tensors.
        criterion: loss taking ``(logits, targets)``.

    Returns:
        Sum of the batch losses divided by ``len(iterator)``.
    """
    model.eval()
    # Batches are created on the CPU; move them to wherever the model lives so
    # evaluation also works when the model is on a GPU.
    device = next(model.parameters()).device
    pad_idx = vocab['<pad>']
    epoch_loss = 0
    with torch.no_grad():
        for batch in iterator:
            src, trg = batch
            src = torch.nn.utils.rnn.pad_sequence(src, padding_value=pad_idx).to(device)
            trg = torch.nn.utils.rnn.pad_sequence(trg, padding_value=pad_idx).to(device)
            # Teacher forcing ratio 0: the decoder consumes its own predictions.
            output = model(src, trg, 0)
            output_dim = output.shape[-1]
            # Position 0 is only the decoder's start input, never a prediction target.
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

# Training the model: run a fixed number of epochs and checkpoint the weights
# whenever the validation loss reaches a new minimum.
N_EPOCHS, CLIP = 10, 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, val_loader, criterion)

    improved = valid_loss < best_valid_loss
    if improved:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'seq2seq-model.pt')

    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Val. Loss: {valid_loss:.3f}')

# Load the best model (the checkpoint with the lowest validation loss)
best_state = torch.load('seq2seq-model.pt')
model.load_state_dict(best_state)

# Inference function
def predict(model, question, vocab, max_len=50):
    """Greedily decode an answer for ``question``.

    Args:
        model: trained seq2seq module exposing ``encoder`` and ``decoder``.
        question: raw question string.
        vocab: torchtext vocabulary containing ``<bos>`` and ``<eos>``.
        max_len: maximum number of tokens to generate.

    Returns:
        List of generated token strings, without ``<bos>`` or ``<eos>``.
    """
    model.eval()
    # Take the device from the model rather than relying on a global.
    device = next(model.parameters()).device
    bos_idx = vocab["<bos>"]
    eos_idx = vocab["<eos>"]

    src_indexes = [bos_idx] + [vocab[token] for token in tokenize(question)] + [eos_idx]
    # Shape (src_len, 1): a batch containing a single sequence.
    src_tensor = torch.tensor(src_indexes, dtype=torch.long, device=device).unsqueeze(1)

    generated = []
    with torch.no_grad():
        encoder_outputs, hidden, cell = model.encoder(src_tensor)
        prev_token = bos_idx
        for _ in range(max_len):
            trg_tensor = torch.tensor([prev_token], dtype=torch.long, device=device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell, encoder_outputs)
            prev_token = output.argmax(1).item()
            if prev_token == eos_idx:
                break
            # Only real tokens are collected, so nothing has to be sliced off
            # afterwards and the last token survives when max_len is reached
            # without an "<eos>".
            generated.append(prev_token)
    # Map indices back to strings in one call.
    return vocab.lookup_tokens(generated)




ModuleNotFoundError: No module named 'spacy'

In [15]:
# Test the model interactively
def interactive_test(model, questions, vocab):
    """Let the user pick a question, type an answer, and compare it with the model's.

    Args:
        model: trained model passed through to ``predict``.
        questions: sequence of question strings to choose from.
        vocab: vocabulary passed through to ``predict``.

    The selection is re-prompted until it is a number between 1 and
    ``len(questions)``. Previously 0 silently selected the last question and
    non-numeric or out-of-range input raised an exception.
    """
    total = len(questions)
    if total == 0:
        print("No questions available.")
        return

    print("Select a question to answer:")
    for i, q in enumerate(questions):
        print(f"{i + 1}. {q}")

    while True:
        raw_choice = input("Enter the number of the question: ")
        try:
            choice = int(raw_choice) - 1
        except ValueError:
            choice = -1
        if 0 <= choice < total:
            break
        print(f"Please enter a number between 1 and {total}.")

    user_input = input("Enter your answer: ")
    prediction = predict(model, questions[choice], vocab)
    print(f"Model Prediction: {' '.join(prediction)}")
    print(f"Your Answer: {user_input}")

# Candidate questions: each distinct value of the dataset's 'question' column
questions = pd.unique(df['question'])

# Start the interactive prompt with the trained model
interactive_test(model, questions, vocab)

Welcome to the AI Quiz Game!

Question 1: What is AI?

Player 1's turn:
Player 1's Score for this question: 0

Player 2's turn:
Player 2's Score for this question: 0

Question 2: What is ML?

Player 1's turn:
Player 1's Score for this question: 93.32372523065641

Player 2's turn:
Player 2's Score for this question: 43.3237252306564

--- Final Scores ---
Player 1: 93.32372523065641
Player 2: 43.3237252306564
Player 1 wins!
