In [None]:
!pip install torch==2.1.0 torchtext==0.16.0 numpy==1.24.2

Collecting torch==2.1.0
  Downloading torch-2.1.0-cp311-cp311-manylinux1_x86_64.whl.metadata (25 kB)
Collecting torchtext==0.16.0
  Downloading torchtext-0.16.0-cp311-cp311-manylinux1_x86_64.whl.metadata (7.5 kB)
Collecting numpy==1.24.2
  Downloading numpy-1.24.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.1.0)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.1.0)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch==2.1.0)
  Downloading nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch==2.1.0)
  Downloading nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu1

In [None]:
from google.colab import drive
import json

# Mount Google Drive: the dataset, the pickled vocabulary and the model checkpoints used
# further down are all read from / written to /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import math
import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from collections import Counter


# Imports for the data pipeline (tokenizer, vocabulary, dataset utilities)
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader, Dataset, random_split
from collections import Counter
import json

import random
# Seed Python's and PyTorch's global RNGs so repeated runs start from the same random state
# (random_split and the weight initialisation below draw from the PyTorch generator).
random.seed(42)
torch.manual_seed(42)


# Load the QA dataset from a JSON file
def read_qa_data(file_path):
    """Parse a JSON QA file into ``(input_text, answer)`` string pairs.

    The file must contain a list of records, each providing ``"question"``,
    ``"context"`` and ``"answers"``; ``answers["text"]`` is indexed at 0, so only
    the first answer text of every record is kept. All three strings are stripped
    of surrounding whitespace.

    Args:
        file_path: path of the UTF-8 encoded JSON file.

    Returns:
        A list of tuples ``(question + " [SEP] " + context, answer)`` in file order.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        records = json.load(handle)

    def _to_pair(record):
        # Same key access order as a plain loop: question, context, then answer.
        question = record["question"].strip()
        context = record["context"].strip()
        answer = record["answers"]["text"][0].strip()
        return (" [SEP] ".join((question, context)), answer)

    return [_to_pair(record) for record in records]

# Randomly partition the data into train / validation / test lists
def split_dataset(data, train_split=0.7, val_split=0.15, test_split=0.15):
    """Split ``data`` into three disjoint random subsets.

    The train and validation sizes are ``int(len(data) * fraction)``; the test set
    receives every remaining item. ``test_split`` is accepted for symmetry but is
    not used in the computation.

    Args:
        data: sized, indexable collection of samples.
        train_split: fraction of samples assigned to training.
        val_split: fraction of samples assigned to validation.
        test_split: unused; the test set is whatever is left over.

    Returns:
        ``(train, val, test)`` as three plain lists.
    """
    n_total = len(data)
    n_train = int(n_total * train_split)
    n_val = int(n_total * val_split)
    n_test = n_total - n_train - n_val

    # Two-stage split: carve off the training part first, then divide the rest.
    train_subset, held_out = random_split(data, [n_train, n_total - n_train])
    val_subset, test_subset = random_split(held_out, [n_val, n_test])

    return [*train_subset], [*val_subset], [*test_subset]

# Dataset wrapper over a sequence of (source, target) text pairs
class QADataset(Dataset):
    """Map-style dataset that serves ``(source_text, target_text)`` pairs unchanged."""

    def __init__(self, data):
        """Store ``data``, a sequence of 2-item ``(source, target)`` entries."""
        self.data = data

    def __len__(self):
        """Number of stored pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the pair stored at position ``idx``."""
        return self.data[idx]

    def get_raw_texts(self):
        """Return all pairs as a fresh list of ``(source, target)`` tuples."""
        pairs = []
        for source_text, target_text in self.data:
            pairs.append((source_text, target_text))
        return pairs

# Use torchtext's 'basic_english' tokenizer; the same instance is reused for vocabulary
# building, batch collation and inference so all three tokenize text identically.
tokenizer_en = get_tokenizer('basic_english')

# Build the token vocabulary from a dataset of text pairs
def build_vocabulary(tokenizer, dataset, min_freq=2):
    """Create a torchtext vocabulary covering both sides of every pair in ``dataset``.

    Args:
        tokenizer: callable turning a string into a list of tokens.
        dataset: object exposing ``get_raw_texts()`` that returns ``(source, target)`` pairs.
        min_freq: minimum number of occurrences for a token to be kept.

    Returns:
        The vocabulary, with ``<pad>``, ``<unk>``, ``<sos>`` and ``<eos>`` registered as
        special tokens and unknown tokens mapped to the index of ``<unk>``.
    """
    special_tokens = ["<pad>", "<unk>", "<sos>", "<eos>"]

    def token_stream(pairs):
        # One token list per text: source first, then target, for every pair.
        for pair in pairs:
            source_text, target_text = pair
            for text in (source_text, target_text):
                yield tokenizer(text)

    vocabulary = build_vocab_from_iterator(
        token_stream(dataset.get_raw_texts()),
        min_freq=min_freq,
        specials=special_tokens,
    )
    unk_index = vocabulary["<unk>"]
    vocabulary.set_default_index(unk_index)
    return vocabulary

# Constants
MAX_PADDING = 300  # fixed length every source/target sequence is padded to in generate_batch
BATCH_SIZE = 64    # samples per DataLoader batch

# Read and split data (default split_dataset fractions: 70% train, 15% validation, rest test)
file_path = "/content/drive/MyDrive/dataset.json"
raw_data = read_qa_data(file_path)
train_data_raw, val_data_raw, test_data_raw = split_dataset(raw_data)

# Create datasets
train_dataset = QADataset(train_data_raw)
valid_dataset = QADataset(val_data_raw)
test_dataset = QADataset(test_data_raw)

# Build vocab from the training split only, so validation/test tokens that never occur in
# training resolve to <unk>
vocab = build_vocabulary(tokenizer_en, train_dataset)
import pickle

# Save the vocab to your Google Drive
# NOTE(review): pickle files must only be loaded back from a trusted location.
with open('/content/drive/MyDrive/vocab.pkl', 'wb') as f:
    pickle.dump(vocab, f)


# Indices of the special tokens, used by batching, the loss (ignore_index) and decoding
PAD_IDX = vocab['<pad>']
SOS_IDX = vocab['<sos>']
EOS_IDX = vocab['<eos>']

# Batch generation function (DataLoader collate_fn)
def generate_batch(data_batch):
    """Turn a list of ``(source_text, target_text)`` pairs into two padded id tensors.

    Each text is tokenized, mapped through ``vocab``, wrapped in ``<sos>`` ... ``<eos>``
    and then brought to exactly ``MAX_PADDING`` ids: shorter sequences are right-padded
    with ``PAD_IDX``, longer ones are cut off at ``MAX_PADDING`` (which drops the
    trailing ``<eos>``, the same result the previous negative-width ``F.pad`` produced).

    The batch is assembled on the CPU and moved to the GPU (when available) in a single
    transfer per tensor instead of one transfer per sample.

    Args:
        data_batch: list of ``(source_text, target_text)`` string pairs.

    Returns:
        ``(src, tgt)``: two ``torch.long`` tensors of shape ``(len(data_batch), MAX_PADDING)``
        located on CUDA if available, otherwise on the CPU.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _encode(text):
        # <sos> + token ids + <eos>, truncated / padded to the fixed length.
        ids = [SOS_IDX] + [vocab[token] for token in tokenizer_en(text)] + [EOS_IDX]
        ids = ids[:MAX_PADDING]
        ids.extend([PAD_IDX] * (MAX_PADDING - len(ids)))
        return ids

    src_rows = [_encode(src_text) for src_text, _ in data_batch]
    tgt_rows = [_encode(tgt_text) for _, tgt_text in data_batch]

    # The training / evaluation loops consume these tensors directly, so they are
    # delivered on the compute device.
    src_batch = torch.tensor(src_rows, dtype=torch.long).to(device)
    tgt_batch = torch.tensor(tgt_rows, dtype=torch.long).to(device)
    return src_batch, tgt_batch

# DataLoader setup
# Training: reshuffle every epoch and drop the incomplete final batch.
train_iter = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True, collate_fn=generate_batch)
# Validation / test: keep every sample (drop_last=True would silently discard up to
# BATCH_SIZE - 1 examples, and yield zero batches for a split smaller than one batch)
# and keep a fixed order so reported losses are comparable between runs.
valid_iter = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False, collate_fn=generate_batch)
test_iter = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False, collate_fn=generate_batch)
print(len(vocab))

class Embeddings(nn.Module):
    """Token-id to vector lookup whose output is scaled by ``sqrt(d_model)``."""

    def __init__(self, vocab_size: int, d_model: int):
        """
        Args:
          vocab_size:    number of rows in the lookup table
          d_model:       width of each embedding vector
        """
        super().__init__()
        self.d_model = d_model
        self.lut = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """
        Args:
          x:        integer token ids, e.g. (batch_size, seq_length)

        Returns:  looked-up embeddings multiplied by sqrt(d_model)
        """
        scale = math.sqrt(self.d_model)
        token_vectors = self.lut(x)
        return token_vectors * scale

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to embeddings, followed by dropout.

    For position ``pos`` and pair index ``i`` (``0 <= i < d_model // 2``):

        pe[pos, 2i]     = sin(pos / 10000 ** (2i / d_model))
        pe[pos, 2i + 1] = cos(pos / 10000 ** (2i / d_model))

    The frequency base is 10000 as in the original Transformer formulation (the
    previous value of 100 compressed the wavelength range). When ``d_model`` is odd
    the last column is left at zero. The table is registered as a buffer, so it is
    part of ``state_dict`` and a loaded checkpoint restores the table it was saved with.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_length: int = 5000):
        """
        Args:
            d_model:     dimension of embeddings
            dropout:     dropout probability
            max_length:  max sequence length for positional encoding
        """
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        n_pairs = d_model // 2
        positions = torch.arange(max_length, dtype=torch.float32).unsqueeze(1)   # (max_length, 1)
        pair_index = torch.arange(n_pairs, dtype=torch.float32)                  # (n_pairs,)
        wavelengths = 10000.0 ** ((2.0 * pair_index) / d_model)                  # (n_pairs,)
        theta = positions / wavelengths                                          # (max_length, n_pairs)

        # Vectorised fill: even columns get the sine, odd columns the cosine.
        pe = torch.zeros(max_length, d_model)
        pe[:, 0:2 * n_pairs:2] = torch.sin(theta)
        pe[:, 1:2 * n_pairs:2] = torch.cos(theta)

        self.register_buffer("pe", pe)

    def forward(self, x):
        """
        Args:
            x: embeddings (batch_size, seq_length, d_model)
        Returns:
            embeddings + positional encodings (batch_size, seq_length, d_model)
        Raises:
            ValueError: if seq_length exceeds the ``max_length`` the table was built for.
        """
        seq_length = x.size(1)
        if seq_length > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_length} exceeds positional encoding max_length {self.pe.size(0)}"
            )
        # Buffers carry no gradient, so the table acts as a constant offset.
        x = x + self.pe[:seq_length].unsqueeze(0)
        return self.dropout(x)

import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with learned Q/K/V/output projections."""

    def __init__(self, d_model, n_heads, dropout: float = 0.1):
        """
        Args:
            d_model:      dimension of embeddings
            n_heads:      number of attention heads
            dropout:      dropout probability

        Raises:
            ValueError: if ``n_heads`` is not positive or does not divide ``d_model``
                (the per-head split in ``forward`` would otherwise fail with an
                opaque reshape error).
        """
        super().__init__()
        if n_heads <= 0 or d_model % n_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive n_heads ({n_heads})"
            )

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_key = d_model // n_heads  # width of each head

        self.Wq = nn.Linear(d_model, d_model)
        self.Wk = nn.Linear(d_model, d_model)
        self.Wv = nn.Linear(d_model, d_model)
        self.Wo = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        """
        Args:
            query: (batch_size, q_len, d_model)
            key:   (batch_size, k_len, d_model)
            value: (batch_size, v_len, d_model)
            mask:  optional mask (batch_size, 1, 1, k_len)  or (batch_size, 1, q_len, k_len);
                   positions where the mask equals 0 / False receive no attention

        Returns:
            output:     (batch_size, q_len, d_model)
            attn_probs: (batch_size, n_heads, q_len, k_len), after dropout
        """
        batch_size = query.size(0)

        Q = self.Wq(query)  # (batch_size, q_len, d_model)
        K = self.Wk(key)    # (batch_size, k_len, d_model)
        V = self.Wv(value)  # (batch_size, v_len, d_model)

        # Split heads
        Q = Q.view(batch_size, -1, self.n_heads, self.d_key).transpose(1, 2)  # (batch_size, n_heads, q_len, d_key)
        K = K.view(batch_size, -1, self.n_heads, self.d_key).transpose(1, 2)  # (batch_size, n_heads, k_len, d_key)
        V = V.view(batch_size, -1, self.n_heads, self.d_key).transpose(1, 2)  # (batch_size, n_heads, v_len, d_key)

        # Attention scores, scaled by sqrt(d_key)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_key)  # (batch_size, n_heads, q_len, k_len)

        if mask is not None:
            # mask shape must broadcast to (batch_size, n_heads, q_len, k_len).
            # A query row whose keys are ALL masked would softmax to NaN; callers must
            # leave at least one visible key per row.
            scores = scores.masked_fill(mask == 0, float('-inf'))

        attn_probs = F.softmax(scores, dim=-1)
        attn_probs = self.dropout(attn_probs)

        A = torch.matmul(attn_probs, V)  # (batch_size, n_heads, q_len, d_key)

        # Merge heads back into a single d_model-wide vector per position
        A = A.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads * self.d_key)  # (batch_size, q_len, d_model)
        output = self.Wo(A)

        return output, attn_probs


def create_padding_mask(seq, pad_idx):
    """
    Build a boolean mask that is True wherever ``seq`` holds a real (non-padding) token.

    Args:
      seq: tensor (batch_size, seq_len)
      pad_idx: padding token index

    Returns:
      mask: (batch_size, 1, 1, seq_len)
    """
    is_token = seq.ne(pad_idx)  # (batch_size, seq_len)
    # Insert two singleton axes after the batch axis.
    return is_token.unsqueeze(1).unsqueeze(1)


class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            d_model: input and output width
            d_ffn: width of the hidden layer
            dropout: dropout probability applied after the activation
        """
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ffn)
        self.linear2 = nn.Linear(d_ffn, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """
        Args:
            x: input tensor (batch_size, seq_length, d_model)

        Returns:
            tensor of the same shape as ``x``
        """
        hidden = F.relu(self.linear1(x))
        hidden = self.dropout(hidden)
        return self.linear2(hidden)


class EncoderLayer(nn.Module):
    """One encoder block: self-attention and a feed-forward network, each wrapped in
    dropout, a residual connection and a LayerNorm applied after the sum."""

    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
        """
        Args:
            d_model: embedding width
            n_heads: number of attention heads
            d_ffn: hidden width of the feed-forward sub-layer
            dropout: dropout probability shared by all sub-layers
        """
        super().__init__()
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.attn_layer_norm = nn.LayerNorm(d_model)
        self.positionwise_fnn = PositionwiseFeedForward(d_model, d_ffn, dropout)
        self.fnn_layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """
        Args:
            src: (batch_size, seq_len, d_model)
            src_mask: padding mask passed to the self-attention

        Returns:
            (transformed src, self-attention probabilities)
        """
        # Sub-layer 1: self-attention over the source sequence
        attended, attn_probs = self.attention(src, src, src, src_mask)
        src = self.attn_layer_norm(src + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward
        transformed = self.positionwise_fnn(src)
        src = self.fnn_layer_norm(src + self.dropout(transformed))

        return src, attn_probs


class Encoder(nn.Module):
    """Stack of ``n_layers`` EncoderLayer blocks applied in sequence."""

    def __init__(self, d_model: int, n_layers: int, n_heads: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            d_model: embedding width
            n_layers: number of stacked encoder layers
            n_heads: attention heads per layer
            d_ffn: hidden width of each feed-forward sub-layer
            dropout: dropout probability
        """
        super().__init__()
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, n_heads, d_ffn, dropout)
            for _ in range(n_layers)
        ])
        # Kept for interface compatibility; not applied in forward().
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """
        Args:
            src: embedded input sequence (batch, seq_len, d_model)
            src_mask: padding mask

        Returns:
            Output of the last layer, same shape as ``src``. The last layer's attention
            map is stored on ``self.attn_probs`` (``None`` when the stack is empty).
        """
        # Initialised up front so an empty stack does not leave the name unbound.
        attn_probs = None
        for layer in self.layers:
            src, attn_probs = layer(src, src_mask)

        self.attn_probs = attn_probs  # Save last attention map if needed
        return src

class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, attention over the encoder output, and a
    feed-forward network, each followed by dropout, a residual sum and LayerNorm."""

    def __init__(self, d_model: int, n_heads: int, d_ffn: int, dropout: float):
        """
        Args:
            d_model: embedding width
            n_heads: number of attention heads
            d_ffn: hidden width of the feed-forward sub-layer
            dropout: dropout probability shared by all sub-layers
        """
        super().__init__()
        # Self-attention over the target
        self.masked_attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.masked_attn_layer_norm = nn.LayerNorm(d_model)

        # Attention from target positions to encoder outputs
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.attn_layer_norm = nn.LayerNorm(d_model)

        # Feed-forward sub-layer
        self.positionwise_fnn = PositionwiseFeedForward(d_model, d_ffn, dropout)
        self.fnn_layer_norm = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, src, trg_mask, src_mask):
        """
        Args:
            trg: (batch, tgt_len, d_model) - decoder input
            src: (batch, src_len, d_model) - encoder output
            trg_mask: (batch, 1, tgt_len, tgt_len) - causal mask
            src_mask: (batch, 1, 1, src_len) - padding mask

        Returns:
            (updated trg, attention probabilities of the encoder-decoder attention);
            the self-attention probabilities are discarded.
        """
        self_attended, _ = self.masked_attention(trg, trg, trg, trg_mask)
        trg = self.masked_attn_layer_norm(trg + self.dropout(self_attended))

        cross_attended, attn_probs = self.attention(trg, src, src, src_mask)
        trg = self.attn_layer_norm(trg + self.dropout(cross_attended))

        transformed = self.positionwise_fnn(trg)
        trg = self.fnn_layer_norm(trg + self.dropout(transformed))

        return trg, attn_probs


class Decoder(nn.Module):
    """Stack of ``n_layers`` DecoderLayer blocks followed by a projection to vocabulary logits."""

    def __init__(self, vocab_size: int, d_model: int, n_layers: int, n_heads: int, d_ffn: int, dropout: float = 0.1):
        """
        Args:
            vocab_size: size of the output vocabulary (width of the logits)
            d_model: embedding width
            n_layers: number of stacked decoder layers
            n_heads: attention heads per layer
            d_ffn: hidden width of each feed-forward sub-layer
            dropout: dropout probability
        """
        super().__init__()
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, n_heads, d_ffn, dropout)
            for _ in range(n_layers)
        ])
        # Kept for interface compatibility; not applied in forward().
        self.dropout = nn.Dropout(dropout)
        self.Wo = nn.Linear(d_model, vocab_size)

    def forward(self, trg, src, trg_mask, src_mask):
        """
        Args:
            trg: embedded target tokens (batch, tgt_len, d_model)
            src: encoder output (batch, src_len, d_model)
            trg_mask: causal mask for target (batch, 1, tgt_len, tgt_len)
            src_mask: padding mask for encoder output (batch, 1, 1, src_len)

        Returns:
            Unnormalised logits (batch, tgt_len, vocab_size). The last layer's attention
            map is stored on ``self.attn_probs`` (``None`` when the stack is empty).
        """
        # Initialised up front so an empty stack does not leave the name unbound.
        attn_probs = None
        for layer in self.layers:
            trg, attn_probs = layer(trg, src, trg_mask, src_mask)

        self.attn_probs = attn_probs
        return self.Wo(trg)


class Transformer(nn.Module):
    """Encoder-decoder wrapper that builds the attention masks and runs both stacks."""

    def __init__(self, encoder: nn.Module, decoder: nn.Module, src_embed: nn.Module,
                 trg_embed: nn.Module, src_pad_idx: int, trg_pad_idx: int, device):
        """
        Args:
            encoder:        encoder stack
            decoder:        decoder stack
            src_embed:      module embedding question/context token ids
            trg_embed:      module embedding answer token ids
            src_pad_idx:    padding index for input
            trg_pad_idx:    padding index for output
            device:         cpu or gpu (stored for reference; masks follow their input's device)
        """
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.trg_embed = trg_embed
        self.device = device
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx

    def make_src_mask(self, src):
        """
        Creates padding mask for encoder input

        Returns:
            bool tensor (batch_size, 1, 1, src_len), True at non-padding positions
        """
        return (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

    def make_trg_mask(self, trg):
        """
        Creates padding + causal mask for decoder input

        Returns:
            bool tensor (batch_size, 1, trg_len, trg_len); entry [b, 0, i, j] is True when
            key position j is not padding and j <= i.
        """
        seq_length = trg.size(1)
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
        # Build the causal part on the input's own device: the stored self.device can be
        # stale after the model is moved or a checkpoint is loaded elsewhere, which would
        # make the `&` below fail with a device mismatch.
        trg_sub_mask = torch.tril(torch.ones((seq_length, seq_length), device=trg.device)).bool()
        return trg_pad_mask & trg_sub_mask

    def forward(self, src, trg):
        """
        src: (batch_size, src_seq_length) - question + context
        trg: (batch_size, trg_seq_length) - answer (shifted)
        Returns:
            logits: (batch_size, trg_seq_length, vocab_size)
        """
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        memory = self.encoder(self.src_embed(src), src_mask)
        output = self.decoder(self.trg_embed(trg), memory, trg_mask, src_mask)

        return output




def make_model(device, src_vocab, trg_vocab, n_layers: int = 3, d_model: int = 256,
               d_ffn: int = 2048, n_heads: int = 8, dropout: float = 0.1,
               max_length: int = 5000):
    """
    Assemble a Transformer for answer generation and initialise its weights.

    A single PositionalEncoding instance is shared by the source and target embedding
    pipelines. Every parameter with more than one dimension is re-initialised with
    Xavier-uniform. The model is NOT moved to ``device`` here; ``device`` is only handed
    to the Transformer constructor.

    Args:
        device: torch device forwarded to the Transformer
        src_vocab: source vocabulary (e.g. for question + context)
        trg_vocab: target vocabulary (e.g. for answer)
        n_layers: number of encoder & decoder layers
        d_model: embedding dimension
        d_ffn: feed-forward hidden dimension
        n_heads: number of attention heads
        dropout: dropout probability
        max_length: max sequence length for positional encoding

    Returns:
        A full Transformer model instance
    """
    src_vocab_size, trg_vocab_size = len(src_vocab), len(trg_vocab)

    # Construction order is kept stable so seeded runs initialise identically.
    encoder_stack = Encoder(d_model, n_layers, n_heads, d_ffn, dropout)
    decoder_stack = Decoder(trg_vocab_size, d_model, n_layers, n_heads, d_ffn, dropout)
    src_lookup = Embeddings(src_vocab_size, d_model)
    trg_lookup = Embeddings(trg_vocab_size, d_model)
    positional = PositionalEncoding(d_model, dropout, max_length)

    # Padding ids fall back to 0 when a vocabulary has no "<pad>" entry.
    src_pad = src_vocab.get_stoi().get("<pad>", 0)
    trg_pad = trg_vocab.get_stoi().get("<pad>", 0)

    model = Transformer(
        encoder_stack,
        decoder_stack,
        nn.Sequential(src_lookup, positional),
        nn.Sequential(trg_lookup, positional),
        src_pad_idx=src_pad,
        trg_pad_idx=trg_pad,
        device=device,
    )

    weight_matrices = (param for param in model.parameters() if param.dim() > 1)
    for weight in weight_matrices:
        nn.init.xavier_uniform_(weight)

    return model


import math
# Set device for model (GPU if available, otherwise CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One vocabulary (built above from the training pairs) is passed as both the source and the
# target vocabulary. To use separate vocabularies instead, build them with build_vocabulary
# and pass them to make_model, e.g.:
# vocab_src = build_vocabulary(tokenizer_en, train_dataset)  # For question + context
# vocab_trg = build_vocabulary(tokenizer_en, train_dataset)  # For answer (it might differ in some cases)

# Create the model using make_model function.
# max_length=300 equals MAX_PADDING, the fixed length generate_batch pads every sequence to.
model = make_model(device, vocab, vocab,  # Assuming vocab_src and vocab_trg are the same for now
                   n_layers=3, n_heads=8, d_model=256,
                   d_ffn=512, max_length=300)

# Move model to the correct device (GPU or CPU); make_model itself does not do this
model.to(device)

6681


Transformer(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-2): 3 x EncoderLayer(
        (attention): MultiHeadAttention(
          (Wq): Linear(in_features=256, out_features=256, bias=True)
          (Wk): Linear(in_features=256, out_features=256, bias=True)
          (Wv): Linear(in_features=256, out_features=256, bias=True)
          (Wo): Linear(in_features=256, out_features=256, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (attn_layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (positionwise_fnn): PositionwiseFeedForward(
          (linear1): Linear(in_features=256, out_features=512, bias=True)
          (linear2): Linear(in_features=512, out_features=256, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (fnn_layer_norm): LayerNorm((256,), eps=1e-05, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
    (dropout): Dropout(p=0.1, inplace=False)

In [None]:
# Learning rate (constant; no scheduler is attached below)
LEARNING_RATE = 0.0005

# Optimizer: Adam over all model parameters
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Loss function - CrossEntropyLoss with padding index, so <pad> target positions
# contribute nothing to the loss
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

def train(model, iterator, optimizer, criterion, clip):
    """
    Train the model for one epoch with teacher forcing.

    For each batch the decoder is fed ``trg[:, :-1]`` and scored against ``trg[:, 1:]``.
    Batches are moved to the device of the model's parameters (a no-op when they are
    already there), so the loop does not depend on the collate function doing it.

    Args:
        model: The Transformer model to be trained.
        iterator: Iterable of ``(src, trg)`` batches (e.g. a DataLoader).
        optimizer: Optimizer updating ``model``'s parameters.
        criterion: Loss function taking ``(N, vocab)`` logits and ``(N,)`` targets.
        clip: Maximum gradient norm for clipping.

    Returns:
        epoch_loss: The mean of the per-batch losses.

    Raises:
        ValueError: if ``iterator`` yields no batches (previously a ZeroDivisionError).
    """
    # Set the model to training mode
    model.train()

    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    epoch_loss = 0.0
    n_batches = 0

    for src, trg in iterator:
        if device is not None:
            src, trg = src.to(device), trg.to(device)

        optimizer.zero_grad()

        # Decoder input excludes the last token; the expected output excludes the first.
        logits = model(src, trg[:, :-1])
        expected_output = trg[:, 1:]

        # Flatten to (batch * seq, vocab) vs (batch * seq,) for the loss
        loss = criterion(logits.reshape(-1, logits.shape[-1]),
                         expected_output.reshape(-1))

        loss.backward()

        # Clip the gradients to prevent gradient explosion
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()
        n_batches += 1

    if n_batches == 0:
        raise ValueError("train() received an iterator with no batches "
                         "(dataset smaller than one batch with drop_last=True?)")

    return epoch_loss / n_batches



def evaluate(model, iterator, criterion):
    """
    Compute the mean per-batch loss of the model on validation or test data.

    The model is switched to evaluation mode (and left in it) and gradients are
    disabled. Batches are moved to the device of the model's parameters, which is a
    no-op when they are already there.

    Args:
        model: The Transformer model to be evaluated.
        iterator: Iterable of ``(src, trg)`` batches (e.g. a DataLoader).
        criterion: Loss function taking ``(N, vocab)`` logits and ``(N,)`` targets.

    Returns:
        epoch_loss: The mean of the per-batch losses.

    Raises:
        ValueError: if ``iterator`` yields no batches (previously a ZeroDivisionError).
    """
    # Set the model to evaluation mode
    model.eval()

    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    epoch_loss = 0.0
    n_batches = 0

    # Disable gradient computation for evaluation
    with torch.no_grad():
        for src, trg in iterator:
            if device is not None:
                src, trg = src.to(device), trg.to(device)

            # Decoder input excludes the last token; the expected output excludes the first.
            logits = model(src, trg[:, :-1])
            expected_output = trg[:, 1:]

            loss = criterion(logits.reshape(-1, logits.shape[-1]),
                             expected_output.reshape(-1))

            epoch_loss += loss.item()
            n_batches += 1

    if n_batches == 0:
        raise ValueError("evaluate() received an iterator with no batches "
                         "(dataset smaller than one batch with drop_last=True?)")

    return epoch_loss / n_batches


def epoch_time(start_time, end_time):
    """
    Break the interval between two timestamps into whole minutes and whole seconds.

    Args:
        start_time: Timestamp in seconds taken before the work started.
        end_time: Timestamp in seconds taken after the work finished.

    Returns:
        (minutes, seconds): both truncated to integers; ``seconds`` is what remains
        after the whole minutes are removed.
    """
    total_seconds = end_time - start_time
    whole_minutes = int(total_seconds / 60)
    leftover_seconds = total_seconds - 60 * whole_minutes
    return whole_minutes, int(leftover_seconds)

import time
import torch

N_EPOCHS = 10  # number of full passes over train_iter
CLIP = 1       # max gradient norm handed to train()
SAVE_PATH = "/content/drive/MyDrive/transformer_checkpoint.pt"  # where to save model
best_valid_loss = float('inf')  # lowest validation loss seen so far

for epoch in range(N_EPOCHS):
    start_time = time.time()

    train_loss = train(model, train_iter, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iter, criterion)

    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # Checkpoint only when validation improves, so SAVE_PATH always holds the best epoch.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'valid_loss': valid_loss,
            'epoch': epoch,
        }, SAVE_PATH)

    # Perplexity is reported as exp(loss)
    print(f"Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s")
    print(f"\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}")
    print(f"\tValid Loss: {valid_loss:.3f} | Valid PPL: {math.exp(valid_loss):7.3f}")

 # Load best checkpoint
# Restore the best-validation weights from SAVE_PATH, the same location the training loop
# wrote to (previously the path was typed out a second time and could drift apart).
checkpoint = torch.load(SAVE_PATH, map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()

# Evaluate on test data
test_loss = evaluate(model, test_iter, criterion)
print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f}')

# Export only the model weights (no optimizer state) for later inference.
from google.colab import drive
drive.mount('/content/drive')
torch.save(model.state_dict(), '/content/drive/MyDrive/transformer1_model.pth')


# Reload the exported weights. map_location=device remaps the stored tensors to the current
# device, so a file written on a GPU runtime can still be loaded on a CPU-only runtime.
from google.colab import drive
drive.mount('/content/drive')

model.load_state_dict(torch.load('/content/drive/MyDrive/transformer1_model.pth', map_location=device))
model.eval()


def generate_answer(model, src_sentence, src_vocab, trg_vocab, tokenizer, max_len=300, device='cuda'):
    """
    Greedily decode an answer for one input sentence with the trained Transformer.

    The source is lower-cased, tokenized, wrapped in ``<sos>`` / ``<eos>`` and encoded
    once. Decoding then starts from ``<sos>`` and repeatedly appends the arg-max token
    until ``<eos>`` is predicted or ``max_len`` tokens have been produced.

    Args:
        model: Trained Transformer model.
        src_sentence: Source sentence (question + context) as a string.
        src_vocab: Vocabulary object for source side (``get_stoi()``).
        trg_vocab: Vocabulary object for target side (``get_stoi()`` / ``get_itos()``).
        tokenizer: Tokenizer function returning a list of tokens for a string.
        max_len: Maximum number of tokens to generate.
        device: Device the input tensors are created on.

    Returns:
        generated_answer: The generated tokens joined by single spaces, without
        ``<sos>`` / ``<eos>``. When ``max_len`` is reached before ``<eos>``, every
        generated token is kept (previously the last one was cut off).
    """
    model.eval()

    # get_stoi() / get_itos() build full copies of the vocabulary; fetch them once
    # instead of once per token / decoding step.
    src_stoi = src_vocab.get_stoi()
    trg_stoi = trg_vocab.get_stoi()
    trg_itos = trg_vocab.get_itos()
    unk_idx = src_stoi['<unk>']
    sos_idx = trg_stoi['<sos>']
    eos_idx = trg_stoi['<eos>']

    # Tokenize and numericalize
    tokens = ['<sos>'] + tokenizer(src_sentence.lower()) + ['<eos>']
    src_indexes = [src_stoi.get(token, unk_idx) for token in tokens]
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)  # shape (1, src_len)

    trg_indexes = [sos_idx]  # decoder input so far, starting with <sos>

    with torch.no_grad():
        # Encode the source once; the memory is reused at every decoding step.
        src_mask = model.make_src_mask(src_tensor)
        memory = model.encoder(model.src_embed(src_tensor), src_mask)

        for _ in range(max_len):
            trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(0).to(device)  # shape (1, len_so_far)
            trg_mask = model.make_trg_mask(trg_tensor)

            output = model.decoder(model.trg_embed(trg_tensor), memory, trg_mask, src_mask)
            pred_token = output.argmax(-1)[:, -1].item()  # prediction for the last position

            # Stop at <eos> without storing it, so nothing has to be trimmed afterwards.
            if pred_token == eos_idx:
                break
            trg_indexes.append(pred_token)

    # Skip the leading <sos> and map ids back to tokens
    return ' '.join(trg_itos[i] for i in trg_indexes[1:])

# Example usage: run sample questions through the trained model and print each answer
for src_sentence in ("how do i activate my card?", "how to apply for a loan?"):
    print(src_sentence)
    generated = generate_answer(model, src_sentence, vocab, vocab, tokenizer_en, device=device)
    print("Generated Answer:", generated)



Epoch: 01 | Time: 3m 15s
	Train Loss: 3.129 | Train PPL:  22.860
	Valid Loss: 1.714 | Valid PPL:   5.553
Epoch: 02 | Time: 3m 18s
	Train Loss: 1.552 | Train PPL:   4.721
	Valid Loss: 1.284 | Valid PPL:   3.612
Epoch: 03 | Time: 3m 18s
	Train Loss: 1.276 | Train PPL:   3.584
	Valid Loss: 1.126 | Valid PPL:   3.082
Epoch: 04 | Time: 3m 18s
	Train Loss: 1.145 | Train PPL:   3.142
	Valid Loss: 1.042 | Valid PPL:   2.836
Epoch: 05 | Time: 3m 18s
	Train Loss: 1.063 | Train PPL:   2.894
	Valid Loss: 0.987 | Valid PPL:   2.682
Epoch: 06 | Time: 3m 18s
	Train Loss: 1.004 | Train PPL:   2.729
	Valid Loss: 0.950 | Valid PPL:   2.586
Epoch: 07 | Time: 3m 18s
	Train Loss: 0.959 | Train PPL:   2.609
	Valid Loss: 0.925 | Valid PPL:   2.521
Epoch: 08 | Time: 3m 18s
	Train Loss: 0.923 | Train PPL:   2.517
	Valid Loss: 0.903 | Valid PPL:   2.468
Epoch: 09 | Time: 3m 18s
	Train Loss: 0.894 | Train PPL:   2.444
	Valid Loss: 0.886 | Valid PPL:   2.425
Epoch: 10 | Time: 3m 18s
	Train Loss: 0.868 | Train PPL

In [None]:

# One more sample query; only a question is supplied (no " [SEP] context" part).
src_sentence = "how to lock my card?"
print(src_sentence)
generated = generate_answer(model, src_sentence, vocab, vocab, tokenizer_en, device=device)
print("Generated Answer:", generated)


how to lock my card?
Generated Answer: sure ! i ' m here to assist you with locking your card . to lock your card , please follow these steps 1 . log in to your online banking account or mobile banking app . 2 . navigate to the cards or account section . 3 . look for the option to lock or freeze your card . 4 . click on that option and follow the prompts to confirm the card lock . if you encounter any difficulties or have further questions , please don ' t hesitate to let me know . i ' m here to help ! let me know if you need any more details .
