In [None]:
# --- INSTALL A RELIABLE BLEU SCORE LIBRARY ---
!pip install sacrebleu

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import sacrebleu
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import os
import json
import re
import time
from typing import List, Dict, Tuple

# --- Configuration & Hyperparameters (QUALITY-FOCUSED) ---
# Directory holding the BPE files; the tokenizer is loaded from the
# 'bpe_combined' prefix inside it (…_vocab.json / …_merges.json).
BPE_FILES_PATH = "/kaggle/input/temp-bpe"
# NumPy array of pre-trained embeddings copied into both embedding layers.
WORD_2_VEC_EMBEDDINGS_PATH = "/kaggle/input/word2vec-combine-only/word2vec_FINAL_combined_embeddings_50k.npy"

# Model Hyperparameters
# Embedding width; despite the name it is passed to the Encoder AND the Decoder.
ENCODER_EMBED_DIM = 300
# Hidden size of each encoder LSTM direction and of the decoder LSTM.
HIDDEN_DIM = 256
# Number of stacked LSTM layers in both encoder and decoder.
LSTM_LAYERS = 2
ENCODER_DROPOUT = 0.4
DECODER_DROPOUT = 0.4
# Initial Adam learning rate (reduced on validation-loss plateaus by the scheduler).
LEARNING_RATE = 0.001
BATCH_SIZE = 64
NUM_EPOCHS = 8

# Data Settings
# Code token sequences are truncated / right-padded to this length.
MAX_CODE_LEN = 256
# Summary length INCLUDING the <SOS> and <EOS> markers added by the Dataset.
MAX_SUMMARY_LEN = 50
# Training rows are randomly sub-sampled down to this count; a value <= 0
# (or a split that is already smaller) disables sub-sampling.
TRAINING_SUBSET_SIZE = 250000

# --- Device Configuration ---
# Use the GPU when one is visible, otherwise fall back to CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {DEVICE}")

# --- BPE Tokenizer Class (No change) ---
class BPE_Loader:
    """Byte-pair-encoding tokenizer backed by a pre-trained vocab and merge table.

    ``token_to_id`` maps token strings to integer ids, ``id_to_token`` is its
    inverse, and ``merges`` maps a pair of adjacent symbols to its merge rank
    (the pair with the lowest rank is merged first). Every word is terminated
    with the ``'</w>'`` end-of-word symbol before merging.
    """

    # Text is split into runs of word characters or single non-space symbols.
    # Compiled once at class level instead of on every ``encode`` call.
    _WORD_PATTERN = re.compile(r"\w+|\S")

    def __init__(self):
        self.token_to_id: Dict[str, int] = {}
        self.id_to_token: Dict[int, str] = {}
        self.merges: Dict[Tuple[str, str], int] = {}

    def load(self, file_prefix: str):
        """Load ``<file_prefix>_vocab.json`` and ``<file_prefix>_merges.json``.

        Raises:
            FileNotFoundError: if either file does not exist.
        """
        vocab_path = f"{file_prefix}_vocab.json"
        merges_path = f"{file_prefix}_merges.json"
        # Check both files up front so nothing is half-loaded on failure.
        if not os.path.exists(vocab_path):
            raise FileNotFoundError(f"Missing: {vocab_path}")
        if not os.path.exists(merges_path):
            raise FileNotFoundError(f"Missing: {merges_path}")
        with open(vocab_path, 'r', encoding='utf-8') as f:
            self.token_to_id = json.load(f)
        self.id_to_token = {i: t for t, i in self.token_to_id.items()}
        with open(merges_path, 'r', encoding='utf-8') as f:
            merges_loaded = json.load(f)
        # Merge keys are stored as "left right" strings in the JSON file.
        self.merges = {tuple(k.split(' ')): v for k, v in merges_loaded.items()}
        print(f"Loaded BPE tokenizer from '{file_prefix}'. Vocab size: {len(self.token_to_id)}")

    def decode(self, token_ids: List[int]) -> str:
        """Turn token ids back into text; unknown ids are rendered as ``<UNK>``."""
        tokens = [self.id_to_token.get(i, '<UNK>') for i in token_ids]
        return "".join(tokens).replace('</w>', ' ').strip()

    def _tokenize_word(self, word: str) -> List[str]:
        """Split one word into BPE symbols by repeatedly applying the best merge."""
        symbols = tuple(word) + ('</w>',)
        while True:
            pairs = zip(symbols[:-1], symbols[1:])
            ranked = {p: self.merges[p] for p in pairs if p in self.merges}
            if not ranked:
                break
            # Lowest rank wins; ties resolve to the left-most pair.
            best_pair = min(ranked, key=ranked.get)
            merged_symbol = "".join(best_pair)
            merged, i = [], 0
            while i < len(symbols):
                if i < len(symbols) - 1 and (symbols[i], symbols[i + 1]) == best_pair:
                    merged.append(merged_symbol)
                    i += 2
                else:
                    merged.append(symbols[i])
                    i += 1
            symbols = tuple(merged)
        return list(symbols)

    def encode(self, text: str) -> List[int]:
        """Encode ``text`` into a list of token ids.

        Non-string input yields an empty list. Tokens missing from the
        vocabulary are mapped to the ``<UNK>`` id.

        Raises:
            KeyError: if a token is out of vocabulary and the vocabulary has no
                ``<UNK>`` entry (previously ``None`` was emitted as an id, which
                only failed later when the ids were turned into a tensor).
        """
        if not isinstance(text, str):
            return []
        unk_id = self.token_to_id.get('<UNK>')
        ids: List[int] = []
        for word in self._WORD_PATTERN.findall(text):
            for token in self._tokenize_word(word):
                token_id = self.token_to_id.get(token, unk_id)
                if token_id is None:
                    raise KeyError(
                        f"Token {token!r} is not in the vocabulary and no '<UNK>' id is defined"
                    )
                ids.append(token_id)
        return ids

In [None]:
# --- Check for existing df ---
# `df` must already exist in the notebook namespace (created by an earlier
# data-loading cell); fail with an actionable message if it does not.
try:
    print(f"Using existing DataFrame. Shape: {df.shape}")
except NameError:
    # `from None` drops the uninformative chained "name 'df' is not defined" traceback.
    raise NameError("DataFrame 'df' not found. Please run your data loading cell first.") from None

# --- Load Tokenizer and Pre-trained Embeddings ---
print("Loading BPE and Word2Vec...")
tokenizer = BPE_Loader()
tokenizer.load(os.path.join(BPE_FILES_PATH, 'bpe_combined'))
pretrained_embeddings = np.load(WORD_2_VEC_EMBEDDINGS_PATH)
pretrained_embeddings = torch.tensor(pretrained_embeddings, dtype=torch.float32)
print("Word2Vec embeddings loaded.")

# --- Define Special Token IDs ---
PAD_TOKEN, SOS_TOKEN, EOS_TOKEN = '<PAD>', '<SOS>', '<EOS>'
PAD_ID, SOS_ID, EOS_ID = tokenizer.token_to_id[PAD_TOKEN], tokenizer.token_to_id[SOS_TOKEN], tokenizer.token_to_id[EOS_TOKEN]
VOCAB_SIZE = len(tokenizer.token_to_id)

# The embedding layers are created as (VOCAB_SIZE, ENCODER_EMBED_DIM) and filled
# with `copy_`, which either fails with an opaque size error or silently
# broadcasts when the matrix has a different shape. Validate here, before any
# expensive data preparation starts.
if tuple(pretrained_embeddings.shape) != (VOCAB_SIZE, ENCODER_EMBED_DIM):
    raise ValueError(
        f"Pre-trained embeddings have shape {tuple(pretrained_embeddings.shape)}, "
        f"expected {(VOCAB_SIZE, ENCODER_EMBED_DIM)} (vocab size x embedding dim)."
    )

# --- PyTorch Dataset Class ---
class CodeSummaryDataset(Dataset):
    """Dataset of (code, summary) pairs tokenized on the fly into fixed-length id tensors.

    Rows with a missing 'code' or 'summary' value are dropped. Each item is a
    pair of ``torch.long`` tensors: the code ids truncated to ``max_code_len``
    and right-padded with ``PAD_ID``, and the summary ids wrapped in
    ``SOS_ID`` / ``EOS_ID`` and right-padded with ``PAD_ID`` up to
    ``max_summary_len``.
    """

    def __init__(self, dataframe, tokenizer, max_code_len, max_summary_len):
        cleaned = dataframe.dropna(subset=['code', 'summary'])
        self.df = cleaned.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_code_len = max_code_len
        self.max_summary_len = max_summary_len
        # Plain arrays give cheap positional access in __getitem__.
        self.codes = self.df['code'].values
        self.summaries = self.df['summary'].values

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Two summary slots are reserved for the <SOS> and <EOS> markers.
        summary_budget = self.max_summary_len - 2

        code_ids = self.tokenizer.encode(str(self.codes[idx]))[:self.max_code_len]
        summary_ids = self.tokenizer.encode(str(self.summaries[idx]))[:summary_budget]

        code_padding = [PAD_ID] * (self.max_code_len - len(code_ids))
        summary_padding = [PAD_ID] * (summary_budget - len(summary_ids))

        code_tensor = torch.tensor(code_ids + code_padding, dtype=torch.long)
        summary_tensor = torch.tensor(
            [SOS_ID] + summary_ids + [EOS_ID] + summary_padding,
            dtype=torch.long,
        )
        return code_tensor, summary_tensor

# --- Prepare DataLoaders from df ---
print("Preparing DataLoaders...")
# Split the frame by its 'partition' column; copies keep the splits independent of `df`.
train_df_split = df.loc[df['partition'] == 'train'].copy()
valid_df_split = df.loc[df['partition'] == 'valid'].copy()
test_df_split = df.loc[df['partition'] == 'test'].copy()

# Optionally shrink the training split to a fixed-size, reproducible random subset.
if 0 < TRAINING_SUBSET_SIZE < len(train_df_split):
    train_df_split = train_df_split.sample(n=TRAINING_SUBSET_SIZE, random_state=42)

train_dataset = CodeSummaryDataset(
    train_df_split, tokenizer, MAX_CODE_LEN, MAX_SUMMARY_LEN
)
valid_dataset = CodeSummaryDataset(
    valid_df_split, tokenizer, MAX_CODE_LEN, MAX_SUMMARY_LEN
)

# num_workers=0 keeps loading in the main process, avoiding the
# multiprocessing assertion errors seen on Kaggle.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
)

print(f"Data ready. Training batches: {len(train_loader)}, Validation batches: {len(valid_loader)}")

In [None]:
# --- Task 6: Model Implementation (BiLSTM Encoder-Decoder) ---
class Encoder(nn.Module):
    """Bidirectional LSTM encoder over frozen pre-trained token embeddings.

    ``forward`` returns the initial ``(hidden, cell)`` state for the decoder:
    the last layer's forward and backward final states are concatenated,
    projected down to ``hidden_dim`` through a tanh-activated linear layer and
    repeated for each of the ``n_layers`` decoder layers.

    Padding positions are excluded from the recurrence, so the encoding of a
    sequence does not depend on how much right-padding follows it. Without
    this, training (always padded to a fixed length) and inference (unpadded)
    would feed the decoder differently distributed states.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, n_layers, dropout, pretrained_embeddings):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=PAD_ID)
        # Load the pre-trained vectors and keep them fixed during training.
        self.embedding.weight.data.copy_(pretrained_embeddings)
        self.embedding.weight.requires_grad = False
        self.lstm = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout, bidirectional=True, batch_first=True)
        # Project the concatenated (forward, backward) states to the decoder size.
        self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.n_layers = n_layers

    def forward(self, src):
        """Encode a batch of token ids, shape (batch, seq_len), into decoder states.

        Returns:
            ``(hidden, cell)``, each of shape (n_layers, batch, hidden_dim).
        """
        embedded = self.dropout(self.embedding(src))

        # Count real tokens per row. Assumes the pad id only occurs as trailing
        # padding. Rows that are entirely padding are treated as length 1
        # because packing rejects zero-length sequences.
        lengths = (src != self.embedding.padding_idx).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )
        _, (hidden, cell) = self.lstm(packed)

        # hidden/cell are (n_layers * 2, batch, hidden_dim); the last two
        # entries are the top layer's forward and backward final states.
        hidden_cat = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        cell_cat = torch.cat((cell[-2, :, :], cell[-1, :, :]), dim=1)
        new_hidden = torch.tanh(self.fc_hidden(hidden_cat)).unsqueeze(0).repeat(self.n_layers, 1, 1)
        new_cell = torch.tanh(self.fc_cell(cell_cat)).unsqueeze(0).repeat(self.n_layers, 1, 1)
        return new_hidden, new_cell

class Decoder(nn.Module):
    """Single-step LSTM decoder over frozen pre-trained token embeddings.

    Each call consumes one token per batch row plus the current recurrent
    state and returns vocabulary logits for the next token together with the
    updated state.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, n_layers, dropout, pretrained_embeddings):
        super().__init__()
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=PAD_ID,
        )
        # Overwrite the random init with the pre-trained vectors, then freeze them.
        with torch.no_grad():
            self.embedding.weight.copy_(pretrained_embeddings)
        self.embedding.weight.requires_grad_(False)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            dropout=dropout,
            batch_first=True,
        )
        self.fc_out = nn.Linear(in_features=hidden_dim, out_features=vocab_size)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input_token, hidden, cell):
        """Run one decoding step.

        ``input_token`` holds one token id per batch row; a length-1 time axis
        is added before the embedding lookup and removed again before the
        output projection.
        """
        step_input = input_token.unsqueeze(1)
        step_embedding = self.dropout(self.embedding(step_input))
        lstm_out, (next_hidden, next_cell) = self.lstm(step_embedding, (hidden, cell))
        logits = self.fc_out(lstm_out.squeeze(1))
        return logits, next_hidden, next_cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder, self.decoder, self.device = encoder, decoder, device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Return per-step vocabulary logits of shape (batch, trg_len, vocab).

        Position 0 of the output is left as zeros: ``trg[:, 0]`` is only used as
        the first decoder input. At every later step the decoder is fed either
        the ground-truth token (with probability ``teacher_forcing_ratio``) or
        its own arg-max prediction.
        """
        batch_size, trg_len = trg.shape[0], trg.shape[1]
        trg_vocab_size = self.decoder.fc_out.out_features
        # Allocate directly on the target device. Building this
        # (batch, trg_len, vocab) buffer on the CPU and copying it over on
        # every forward pass is a large, avoidable host-to-device transfer.
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=self.device)
        hidden, cell = self.encoder(src)
        input_token = trg[:, 0]
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input_token, hidden, cell)
            outputs[:, t] = output
            # One random draw per step decides teacher forcing for the whole batch.
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            # The arg-max is only computed when it is actually fed back.
            input_token = trg[:, t] if teacher_force else output.argmax(1)
        return outputs

# --- Initialize Model, Optimizer, and NEW Scheduler ---
print(f"Initializing BiLSTM Seq2Seq on {DEVICE}...")
# Encoder and decoder share the same embedding width and pre-trained vectors.
enc = Encoder(VOCAB_SIZE, ENCODER_EMBED_DIM, HIDDEN_DIM, LSTM_LAYERS, ENCODER_DROPOUT, pretrained_embeddings)
dec = Decoder(VOCAB_SIZE, ENCODER_EMBED_DIM, HIDDEN_DIM, LSTM_LAYERS, DECODER_DROPOUT, pretrained_embeddings)
model = Seq2Seq(enc, dec, DEVICE).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Padding positions in the target do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID)

# ===> ADDING THE LEARNING RATE SCHEDULER <===
# This will reduce LR (x0.1) when validation loss has not improved for more than
# 2 epochs. The `verbose` argument is deliberately not passed: it is deprecated
# in recent PyTorch releases. Read `optimizer.param_groups[0]['lr']` to observe
# the current learning rate.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.1, patience=2)

# Frozen embedding weights have requires_grad=False and are excluded from the count.
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Model initialized. Trainable parameters: {trainable_params:,}")

In [None]:
# --- Task 7: Training Procedures ---
def train_step(model, dataloader, optimizer, criterion, clip):
    """Train ``model`` for one pass over ``dataloader`` and return the mean batch loss.

    The first target position (the start token) is excluded from the loss, and
    gradients are clipped to a maximum norm of ``clip`` before each optimizer step.
    """
    model.train()
    running_loss = 0
    progress = tqdm(dataloader, desc="Training", leave=False)
    for src, trg in progress:
        src = src.to(DEVICE)
        trg = trg.to(DEVICE)

        optimizer.zero_grad()
        logits = model(src, trg)

        # Flatten (batch, time) so the criterion sees one prediction per row.
        vocab_dim = logits.shape[-1]
        flat_logits = logits[:, 1:].reshape(-1, vocab_dim)
        flat_targets = trg[:, 1:].reshape(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss / len(dataloader)

def evaluate_step(model, dataloader, criterion):
    """Return the mean batch loss of ``model`` over ``dataloader`` without updating it.

    Decoding runs with teacher forcing disabled, so the model is fed its own
    predictions. The first target position is excluded from the loss.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        progress = tqdm(dataloader, desc="Validating", leave=False)
        for src, trg in progress:
            src = src.to(DEVICE)
            trg = trg.to(DEVICE)

            logits = model(src, trg, teacher_forcing_ratio=0)

            vocab_dim = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab_dim)
            flat_targets = trg[:, 1:].reshape(-1)

            running_loss += criterion(flat_logits, flat_targets).item()
    return running_loss / len(dataloader)

# --- Main Execution Loop (with Scheduler) ---
print("Starting Training...")
best_valid_loss = float('inf')
train_losses, valid_losses = [], []
MODEL_SAVE_PATH = 'bilstm_model.pt'

for epoch in range(NUM_EPOCHS):
    start_time = time.time()
    train_loss = train_step(model, train_loader, optimizer, criterion, clip=1.0)
    valid_loss = evaluate_step(model, valid_loader, criterion)

    # ===> SCHEDULER STEP <===
    scheduler.step(valid_loss)  # Pass validation loss to the scheduler
    # Read the learning rate back from the optimizer so any reduction made by
    # the scheduler shows up in the epoch log.
    current_lr = optimizer.param_groups[0]['lr']

    end_time = time.time()
    epoch_mins, epoch_secs = divmod(end_time - start_time, 60)
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)

    # Checkpoint only when validation loss improves, so the saved file always
    # holds the best model seen so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        save_msg = "-> Model Saved"
    else:
        save_msg = ""
    print(f'Epoch: {epoch+1:02} | Time: {int(epoch_mins)}m {int(epoch_secs)}s | LR: {current_lr:.2e} {save_msg}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {np.exp(train_loss):.2f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {np.exp(valid_loss):.2f}')
print("Training Complete.")

In [None]:
# --- Task 7: Comprehensive Evaluation ---
# 1. Convergence Analysis (Plot)
# Per-epoch training and validation loss on one set of axes; the figure is
# written to disk before it is displayed.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_losses, label='Train Loss')
ax.plot(valid_losses, label='Validation Loss')
ax.set_title('Training Convergence Analysis')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid(True)
fig.savefig('training_convergence.png')
plt.show()

# ===> NEW INFERENCE FUNCTION: BEAM SEARCH <===
def generate_summary_beam_search(model, tokenizer, code_text, device, max_len=MAX_SUMMARY_LEN, beam_width=5):
    """Generate a summary for ``code_text`` with beam search.

    Args:
        model: seq2seq model exposing ``encoder`` and ``decoder`` sub-modules.
        tokenizer: object providing ``encode(str) -> list[int]`` and
            ``decode(list[int]) -> str``.
        code_text: source code to summarize (converted with ``str`` first, as
            the training Dataset does).
        device: device on which the input tensors are created.
        max_len: maximum number of decoding steps.
        beam_width: number of hypotheses kept after every step.

    Returns:
        The decoded text of the highest-scoring hypothesis (sum of token
        log-probabilities), without the start marker and without the end
        marker if one was produced.

    Raises:
        ValueError: if ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError(f"beam_width must be >= 1, got {beam_width}")

    model.eval()
    with torch.no_grad():
        tokens = tokenizer.encode(str(code_text))[:MAX_CODE_LEN]
        # The training Dataset right-pads every code sequence to MAX_CODE_LEN.
        # Do the same here so the encoder sees inputs laid out like its
        # training data, and so an empty snippet does not produce a
        # zero-length sequence (which the LSTM rejects).
        tokens = tokens + [PAD_ID] * (MAX_CODE_LEN - len(tokens))
        src_tensor = torch.tensor(tokens, dtype=torch.long, device=device).unsqueeze(0)
        hidden, cell = model.encoder(src_tensor)

        # Each beam is (sequence_of_ids, cumulative_log_prob, hidden_state, cell_state).
        beams = [([SOS_ID], 0.0, hidden, cell)]

        for _ in range(max_len):
            candidates = []
            for seq, score, h, c in beams:
                if seq[-1] == EOS_ID:
                    # Finished hypotheses are carried over unchanged.
                    candidates.append((seq, score, h, c))
                    continue

                last_token = torch.tensor([seq[-1]], dtype=torch.long, device=device)
                output, new_h, new_c = model.decoder(last_token, h, c)

                log_probs = torch.log_softmax(output, dim=1)
                # topk cannot return more entries than the vocabulary holds.
                k = min(beam_width, log_probs.size(1))
                top_log_probs, top_indices = log_probs.topk(k)

                for log_prob, next_token_id in zip(top_log_probs[0].tolist(), top_indices[0].tolist()):
                    candidates.append((seq + [next_token_id], score + log_prob, new_h, new_c))

            # Keep the `beam_width` best hypotheses by cumulative log-probability.
            beams = sorted(candidates, key=lambda x: x[1], reverse=True)[:beam_width]

            if all(b[0][-1] == EOS_ID for b in beams):
                break

    # Drop the leading <SOS>. Only strip the last token when it really is <EOS>:
    # a hypothesis cut off at max_len ends in a real token that must be kept.
    best_seq = beams[0][0][1:]
    if best_seq and best_seq[-1] == EOS_ID:
        best_seq = best_seq[:-1]
    generated_summary = tokenizer.decode(best_seq)
    return generated_summary

# 2. BLEU Score Evaluation
print("\nStarting BLEU Score Evaluation on Test Set...")
# map_location lets a checkpoint saved on a GPU be restored on whatever device
# this session is using (including CPU-only).
model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=DEVICE))
# Only rows with both a code snippet and a reference summary can be evaluated.
test_clean_df = test_df_split.dropna(subset=['code', 'summary'])
test_eval_df = test_clean_df.head(1000)
predictions, references = [], []

for _, row in tqdm(test_eval_df.iterrows(), total=len(test_eval_df), desc="Generating Summaries for BLEU"):
    # Cast to str exactly as the training Dataset does.
    code_text, reference_summary = str(row['code']), str(row['summary'])
    # Call the new beam search function
    predicted_summary = generate_summary_beam_search(model, tokenizer, code_text, DEVICE)
    predictions.append(predicted_summary)
    references.append(reference_summary)

# sacrebleu expects a list of reference streams; there is one reference per prediction.
bleu = sacrebleu.corpus_bleu(predictions, [references])
print(f"\n>>> Final Test BLEU Score: {bleu.score:.2f} <<<")

# 3. Generation Quality Examples
print("\n--- Qualitative Examples (Test Set) ---")
# Sample from the cleaned rows (a missing code value cannot be sliced or
# summarized) and never ask for more rows than exist.
for _, row in test_clean_df.sample(min(3, len(test_clean_df)), random_state=123).iterrows():
    code_in, truth = str(row['code']), row['summary']
    # Call the new beam search function
    gen_summary = generate_summary_beam_search(model, tokenizer, code_in, DEVICE)
    print(f"\nInput Code Snippet:\n{code_in[:200]}...")
    print(f"Ground Truth Summary: {truth}")
    print(f"Generated Summary:    {gen_summary}")
    print("-" * 50)