In [1]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import spacy
from collections import Counter

In [2]:
train_file = "leaf_village_train2.csv"
validation_file = "leaf_village_validation2.csv"

In [3]:
train_data = pd.read_csv(train_file)
validation_data = pd.read_csv(validation_file)

In [4]:
train_data

Unnamed: 0,question,answer
0,Where is the nearest ramen shop?,"Ichiraku Ramen, near Market Street"
1,Can you guide me to the training grounds?,Take the path behind the academy
2,What is the quickest route to the Hokage Rock?,Head north from the village square
3,How far is the Ninja Academy from here?,About a 5 minute walk from the gate
4,Where can I find the Anbu headquarters?,Near the village outskirts
...,...,...
59,Are there recreational spaces?,"Yes, near the training grounds."
60,Where can I read about Leaf Village history?,At the library.
61,Is there a gym in the village?,"Yes, at the training grounds."
62,Are there places to meditate?,"Yes, near the Hyuga Compound."


In [5]:
# Load SpaCy tokenizer
spacy_en = spacy.load("en_core_web_sm")

# Tokenizer function
def tokenize(text):
    return [token.text.lower() for token in spacy_en.tokenizer(text) if not token.is_punct]


# Special tokens
PAD_TOKEN = "<pad>"
START_TOKEN = "<start>"
END_TOKEN = "<end>"
UNK_TOKEN = "<unk>"

In [6]:
# Vocabulary creation
class Vocabulary:
    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.word2idx[PAD_TOKEN] = 0
        self.word2idx[START_TOKEN] = 1
        self.word2idx[END_TOKEN] = 2
        self.word2idx[UNK_TOKEN] = 3
        self.idx2word = {idx: word for word, idx in self.word2idx.items()}
        self.counter = Counter()

    def build_vocab(self, tokenized_texts, max_size=10000, min_freq=1):
        for tokens in tokenized_texts:
            self.counter.update(tokens)
        for word, freq in self.counter.items():
            if freq >= min_freq and word.isalnum() and len(self.word2idx) < max_size:
                idx = len(self.word2idx)
                self.word2idx[word] = idx
                self.idx2word[idx] = word

In [7]:
# Custom dataset
class QADataset(Dataset):
    def __init__(self, csv_file, question_vocab, answer_vocab):
        self.data = pd.read_csv(csv_file)
        self.question_vocab = question_vocab
        self.answer_vocab = answer_vocab

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        question = tokenize(self.data.iloc[idx, 0])  # Tokenize question
        answer = tokenize(self.data.iloc[idx, 1])  # Tokenize answer

        # Convert to integer sequences
        question_seq = [self.question_vocab.word2idx.get(w, self.question_vocab.word2idx[UNK_TOKEN]) for w in question]
        answer_seq = [self.answer_vocab.word2idx.get(w, self.answer_vocab.word2idx[UNK_TOKEN]) for w in answer]

        # Add special tokens
        question_seq = [self.question_vocab.word2idx[START_TOKEN]] + question_seq + [self.question_vocab.word2idx[END_TOKEN]]
        answer_seq = [self.answer_vocab.word2idx[START_TOKEN]] + answer_seq + [self.answer_vocab.word2idx[END_TOKEN]]

        return torch.tensor(question_seq), torch.tensor(answer_seq)

In [8]:
# Collate function for DataLoader
def collate_fn(batch):
    questions, answers = zip(*batch)
    questions = torch.nn.utils.rnn.pad_sequence(questions, batch_first=True, padding_value=0)
    answers = torch.nn.utils.rnn.pad_sequence(answers, batch_first=True, padding_value=0)
    return questions, answers

In [9]:
def preprocess_data(train_csv, val_csv):
    # Load CSV files into pandas DataFrames
    train_data = pd.read_csv(train_csv)
    val_data = pd.read_csv(val_csv)

    # Tokenize all data
    train_questions = [tokenize(q) for q in train_data['question']]
    train_answers = [tokenize(a) for a in train_data['answer']]
    val_questions = [tokenize(q) for q in val_data['question']]
    val_answers = [tokenize(a) for a in val_data['answer']]

    # Build vocabularies
    question_vocab = Vocabulary()
    answer_vocab = Vocabulary()
    question_vocab.build_vocab(train_questions)
    answer_vocab.build_vocab(train_answers)

    # Create datasets
    train_dataset = QADataset(train_csv, question_vocab, answer_vocab)
    val_dataset = QADataset(val_csv, question_vocab, answer_vocab)

    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

    return train_loader, val_loader, question_vocab, answer_vocab



In [10]:
train_loader, val_loader, question_vocab, answer_vocab = preprocess_data(
    "leaf_village_train2.csv",
    "leaf_village_validation2.csv"
)


In [11]:
for questions, answers in train_loader:
    print("Questions batch:", questions)
    print("Answers batch:", answers)
    break  # Check the first batch only


Questions batch: tensor([[  1,  47,  32, 153,  50, 154,   2,   0,   0,   0,   0,   0],
        [  1, 145,  36,  28, 146, 147, 148,  14,   2,   0,   0,   0],
        [  1,  10,  28,  97,  33,  15,  98,   2,   0,   0,   0,   0],
        [  1,  22,  36,  28,  37, 113,   6,  46,   2,   0,   0,   0],
        [  1,  17,  47,   6,  69,  70,  44,   6,  45,  46,   2,   0],
        [  1,   4,   5,   6,   7,   8,   9,   2,   0,   0,   0,   0],
        [  1,   5,  32,  33, 165,  44,   6,  46,   2,   0,   0,   0],
        [  1,   4,  10,  28,  29,  33,  57,   2,   0,   0,   0,   0],
        [  1,   4,  10,  28,  29,  96,  44,   6,  45,  46,   2,   0],
        [  1,  47,  32, 107, 108,   2,   0,   0,   0,   0,   0,   0],
        [  1,  47,  32, 104, 105,  50, 106,   2,   0,   0,   0,   0],
        [  1,  47,  32,  48,  99, 100, 101,   2,   0,   0,   0,   0],
        [  1,  47,  32,  59,  44,   6,  45,  46,   2,   0,   0,   0],
        [  1,  17,   3,   6,  79,  80,  50,  81,  44,   6,  46,   2],
   

In [12]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_dim, embed_dim, hidden_dim, num_layers, dropout=0.1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)

    def forward(self, x):
        embedded = self.embedding(x)  # (batch_size, seq_len, embed_dim)
        outputs, (hidden, cell) = self.lstm(embedded)  # Outputs ignored; focus on hidden and cell
        return hidden, cell  # (num_layers, batch_size, hidden_dim)


In [13]:
class Decoder(nn.Module):
    def __init__(self, output_dim, embed_dim, hidden_dim, num_layers, dropout=0.1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell):
        # x: (batch_size) -> Need to reshape to (batch_size, 1)
        x = x.unsqueeze(1)
        embedded = self.embedding(x)  # (batch_size, 1, embed_dim)
        outputs, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        predictions = self.fc(outputs.squeeze(1))  # (batch_size, output_dim)
        return predictions, hidden, cell


In [14]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        trg_len = trg.size(1)
        trg_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        hidden, cell = self.encoder(src)

        # First input to the decoder is the <start> token
        input = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t, :] = output
            top1 = output.argmax(1)  # Get the highest probability token
            input = trg[:, t] if torch.rand(1).item() < teacher_forcing_ratio else top1

        return outputs


In [15]:
# Define hyperparameters
INPUT_DIM = len(question_vocab.word2idx)
OUTPUT_DIM = len(answer_vocab.word2idx)
EMBED_DIM = 256
HIDDEN_DIM = 512
NUM_LAYERS = 2
DROPOUT = 0.5

In [16]:
# Initialize components
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(INPUT_DIM, EMBED_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
decoder = Decoder(OUTPUT_DIM, EMBED_DIM, HIDDEN_DIM, NUM_LAYERS, DROPOUT)
model = Seq2Seq(encoder, decoder, device).to(device)

# Define optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore <pad> token

In [17]:
# Training loop
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for src, trg in iterator:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()

        output = model(src, trg)  # (batch_size, trg_len, trg_vocab_size)
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [18]:
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for src, trg in iterator:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, 0)  # Turn off teacher forcing
            output_dim = output.shape[-1]

            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)

            loss = criterion(output, trg)
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [19]:
def train_model(model, train_loader, val_loader, optimizer, criterion, n_epochs, clip):
    for epoch in range(1, n_epochs + 1):
        # Training
        train_loss = train(model, train_loader, optimizer, criterion, clip)

        # Validation
        val_loss = evaluate(model, val_loader, criterion)

        print(f'Epoch {epoch:02} | Train Loss: {train_loss:.3f} | Val Loss: {val_loss:.3f}')


In [31]:
# Hyperparameters
N_EPOCHS = 20
CLIP = 1

# Train the model
train_model(model, train_loader, val_loader, optimizer, criterion, N_EPOCHS, CLIP)

Epoch 01 | Train Loss: 1.494 | Val Loss: 3.356
Epoch 02 | Train Loss: 1.384 | Val Loss: 3.394
Epoch 03 | Train Loss: 1.379 | Val Loss: 3.342
Epoch 04 | Train Loss: 1.217 | Val Loss: 3.498
Epoch 05 | Train Loss: 1.235 | Val Loss: 3.471
Epoch 06 | Train Loss: 1.091 | Val Loss: 3.687
Epoch 07 | Train Loss: 1.117 | Val Loss: 3.565
Epoch 08 | Train Loss: 0.974 | Val Loss: 3.598
Epoch 09 | Train Loss: 0.956 | Val Loss: 3.867
Epoch 10 | Train Loss: 0.853 | Val Loss: 3.862
Epoch 11 | Train Loss: 0.773 | Val Loss: 3.883
Epoch 12 | Train Loss: 0.748 | Val Loss: 3.903
Epoch 13 | Train Loss: 0.639 | Val Loss: 4.058
Epoch 14 | Train Loss: 0.635 | Val Loss: 4.084
Epoch 15 | Train Loss: 0.570 | Val Loss: 4.206
Epoch 16 | Train Loss: 0.524 | Val Loss: 4.312
Epoch 17 | Train Loss: 0.493 | Val Loss: 4.195
Epoch 18 | Train Loss: 0.432 | Val Loss: 4.302
Epoch 19 | Train Loss: 0.406 | Val Loss: 4.457
Epoch 20 | Train Loss: 0.347 | Val Loss: 4.369


In [32]:
import math

def calculate_perplexity(loss):
    return math.exp(loss)

# Example usage
val_loss = evaluate(model, val_loader, criterion)
perplexity = calculate_perplexity(val_loss)
print(f"Validation Perplexity: {perplexity:.2f}")


Validation Perplexity: 78.96


In [33]:
def test_model(model, question, question_vocab, answer_vocab, max_len=20):
    model.eval()

    # Tokenize and numericalize the question
    tokens = [question_vocab.word2idx.get(word, question_vocab.word2idx["<unk>"]) for word in tokenize(question)]
    tokens = [question_vocab.word2idx["<start>"]] + tokens + [question_vocab.word2idx["<end>"]]
    src = torch.tensor(tokens).unsqueeze(0).to(device)  # (1, seq_len)

    with torch.no_grad():
        hidden, cell = model.encoder(src)

        # Start decoding
        trg_indices = [answer_vocab.word2idx["<start>"]]
        for _ in range(max_len):
            trg_tensor = torch.tensor([trg_indices[-1]]).to(device)
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)
            pred_token = output.argmax(1).item()
            trg_indices.append(pred_token)

            if pred_token == answer_vocab.word2idx["<end>"]:
                break

    # Convert indices back to words
    trg_tokens = [answer_vocab.idx2word[idx] for idx in trg_indices]
    return " ".join(trg_tokens[1:-1])  # Remove <start> and <end>


In [34]:
# Unseen questions
test_questions = [
    "Where is the nearest ramen shop?",
    "Where can I see the Hokage Monument?",
    "Are there festivals in the Leaf Village?"
]

for question in test_questions:
    answer = test_model(model, question, question_vocab, answer_vocab)
    print(f"Q: {question}")
    print(f"A: {answer}\n")


Q: Where is the nearest ramen shop?
A: ichiraku ramen near market street

Q: Where can I see the Hokage Monument?
A: about hokage rock

Q: Are there festivals in the Leaf Village?
A: yes the fire festival



In [36]:
# save model 
torch.save(model.state_dict(), "seq2seq.pth")