In [None]:
"""
ML-Based Python Code Summarization
Course: Machine Learning for Software Analysis

This project implements a machine learning model that generates
natural language summaries from Python code snippets.

Author: Amerigo Giommetti
Academic Year: 2025/2026
"""

In [None]:
# ============================================================
# 1. Imports and Global Configuration
# ============================================================

# General imports
import random
import math
import time

# Data manipulation imports
import numpy as np
import pandas as pd

# ML imports
import torch
import torch.nn as nn
import torch.optim as optim

# Plotting and evaluation imports
import matplotlib.pyplot as plt
from tqdm import tqdm as tdqm
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")

SEED = 42                       # Seed passed to set_seed() for reproducibility
BATCH_SIZE = 128                # Number of code/summary pairs per DataLoader batch
EMBEDDING_DIM = 512             # Embedding size, also used as the Transformer d_model
NUM_EPOCHS = 50                 # Maximum number of passes over the training set (early stopping may end sooner)
LEARNING_RATE = 5e-5            # Initial learning rate of the Adam optimizer
MAX_CODE_LEN = 200              # Fixed length, in token ids, of every encoded code snippet
MAX_SUMMARY_LEN = 100           # Fixed length, in token ids, of every encoded summary (also the default decoding limit)
MAX_SAMPLES_TRAIN = 100000      # Max number of code/summary pairs taken from the training split
MAX_SAMPLES_VALIDATION = 1000   # Max number of code/summary pairs taken from the validation split
MAX_SAMPLES_TEST = 500          # Max number of code/summary pairs taken from the test split
SRC_VOCAB_SIZE = 10000          # Vocabulary size requested for the code BPE tokenizer (and size of the source embedding)
TGT_VOCAB_SIZE = 10000          # Vocabulary size requested for the summary BPE tokenizer (and size of the output layer)

In [None]:
# ============================================================
# 2. Reproducibility and Utility Functions
# ============================================================

def set_seed(seed):
    """Seed every random number generator used in this notebook.

    The same value is applied, in order, to Python's ``random`` module,
    NumPy's global generator, PyTorch's default generator and the
    generators of all CUDA devices.

    Args:
        seed: Integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)
# Apply the SEED defined in section 1 (global configuration) to all random generators
set_seed(SEED)

# Helper function for input length normalization
def pad_sequence(seq, max_len, pad_value=0):
    """Return ``seq`` cut or right-padded to ``max_len`` items.

    Args:
        seq: List to normalize.
        max_len: Length of the returned list.
        pad_value: Item appended when ``seq`` is shorter than ``max_len``.

    Returns:
        A new list; ``seq`` itself is not modified.
    """
    missing = max_len - len(seq)
    filler = [pad_value] * missing if missing > 0 else []
    return seq[:max_len] + filler

In [None]:
# ===========================================================================
# 3. Dataset Loading code_x_glue (Direct Download from S3/GitHub Source)
# ===========================================================================

# Import for huggingface dataset loads
from datasets import load_dataset

def load_data_final():
    """Load the CodeXGLUE code-to-text (Python) dataset as three DataFrames.

    The dataset's own train/validation/test splits are used. Each split is
    limited to its first MAX_SAMPLES_TRAIN / MAX_SAMPLES_VALIDATION /
    MAX_SAMPLES_TEST rows.

    Returns:
        A tuple ``(train_df, val_df, test_df)``. Every frame has a ``code``
        column and a ``summary`` column (taken from the dataset's
        ``docstring`` field), with every whitespace run collapsed to a
        single space and both ends stripped.
    """
    def head(split, limit):
        # Keep at most `limit` leading rows of a split
        return split.select(range(min(limit, len(split))))

    def to_df(ds):
        # Shape a split into a DataFrame and normalize whitespace in both columns
        frame = pd.DataFrame({'code': ds['code'], 'summary': ds['docstring']})
        for column in ('code', 'summary'):
            frame[column] = frame[column].str.replace(r'\s+', ' ', regex=True).str.strip()
        return frame

    print("Loading dataset CodeXGLUE...")
    dataset = load_dataset("code_x_glue_ct_code_to_text", "python")

    split_limits = (
        ('train', MAX_SAMPLES_TRAIN),
        ('validation', MAX_SAMPLES_VALIDATION),
        ('test', MAX_SAMPLES_TEST),
    )
    return tuple(to_df(head(dataset[name], limit)) for name, limit in split_limits)

# DataFrames used by the rest of the notebook
train_df, val_df, test_df = load_data_final()
print(f"Dataset loaded! Number of rows: {len(train_df) + len(test_df) + len(val_df)}")

In [None]:
# ============================================================
# 4. Vocabulary and Tokenization
# ============================================================

# Import and installation for tokenizers BPE approach
!pip install tokenizers
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

# Function to train the tokenizer on sub-words
def train_bpe_tokenizer(data, vocab_size):
    """Train a whitespace-pre-tokenized BPE tokenizer on an iterable of texts.

    The special tokens are registered in the order <pad>, <sos>, <eos>, <unk>.

    Args:
        data: Iterable of strings used as training corpus.
        vocab_size: Vocabulary size requested from the BPE trainer.

    Returns:
        The trained ``Tokenizer``.
    """
    reserved_tokens = ["<pad>", "<sos>", "<eos>", "<unk>"]
    bpe_trainer = BpeTrainer(vocab_size=vocab_size, special_tokens=reserved_tokens)

    bpe_tokenizer = Tokenizer(BPE(unk_token="<unk>"))
    bpe_tokenizer.pre_tokenizer = Whitespace()
    bpe_tokenizer.train_from_iterator(data, bpe_trainer)
    return bpe_tokenizer

# Tokenizers are fitted on the training split only (one for code, one for summaries)
print("Training BPE Tokenizer...")
code_tokenizer = train_bpe_tokenizer(train_df['code'].tolist(), vocab_size=SRC_VOCAB_SIZE)
summary_tokenizer = train_bpe_tokenizer(train_df['summary'].tolist(), vocab_size=TGT_VOCAB_SIZE)

# Encoding function
def encode_bpe(text, tokenizer, max_len, pad_id=0, sos_id=1, eos_id=2):
    """Encode ``text`` into a list of exactly ``max_len`` token ids.

    The tokenizer output is wrapped as ``<sos> ... <eos>``. Shorter sequences
    are right-padded with ``pad_id``; longer ones are cut so that the last
    kept position is always ``eos_id``.

    Args:
        text: String to encode.
        tokenizer: Object whose ``encode(text)`` result exposes an ``ids`` list.
        max_len: Length of the returned list; must be at least 2 so that both
            ``<sos>`` and ``<eos>`` fit.
        pad_id: Id of the padding token (default 0).
        sos_id: Id of the start-of-sequence token (default 1).
        eos_id: Id of the end-of-sequence token (default 2).

    Returns:
        A list of ``max_len`` integers.

    Raises:
        ValueError: If ``max_len`` is smaller than 2.
    """
    if max_len < 2:
        raise ValueError(f"max_len must be >= 2 to hold <sos> and <eos>, got {max_len}")

    ids = [sos_id] + list(tokenizer.encode(text).ids) + [eos_id]  # Adding <sos> and <eos>
    if len(ids) < max_len:
        ids.extend([pad_id] * (max_len - len(ids)))  # Padding
    else:
        ids = ids[:max_len - 1] + [eos_id]           # Truncating, <eos> stays last
    return ids

# Generating Tensors for the dataloader
def _encode_column(texts, tokenizer, max_len):
    """Encode every text with encode_bpe and stack the ids into one tensor on DEVICE."""
    return torch.tensor([encode_bpe(text, tokenizer, max_len) for text in texts]).to(DEVICE)

X_train = _encode_column(train_df['code'], code_tokenizer, MAX_CODE_LEN)
Y_train = _encode_column(train_df['summary'], summary_tokenizer, MAX_SUMMARY_LEN)
X_val = _encode_column(val_df['code'], code_tokenizer, MAX_CODE_LEN)
Y_val = _encode_column(val_df['summary'], summary_tokenizer, MAX_SUMMARY_LEN)
X_test = _encode_column(test_df['code'], code_tokenizer, MAX_CODE_LEN)
Y_test = _encode_column(test_df['summary'], summary_tokenizer, MAX_SUMMARY_LEN)

In [None]:
# ============================================================
# 4.1 Positional encoding
# ============================================================

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal positional encodings to a batch of embeddings.

    Even feature indices hold ``sin(pos * w_k)`` and odd indices hold
    ``cos(pos * w_k)``, with ``w_k = exp(-2k * ln(10000) / emb_dim)``.

    Args:
        emb_dim: Size of the last dimension of the embeddings. Both even and
            odd values are supported.
        max_len: Number of positions precomputed; inputs to ``forward`` must
            not be longer than this along dimension 1.
    """

    def __init__(self, emb_dim, max_len=500):
        super().__init__()

        pe = torch.zeros(max_len, emb_dim)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, emb_dim, 2) * (-math.log(10000.0) / emb_dim)
        )
        angles = position * div_term

        pe[:, 0::2] = torch.sin(angles)
        # With an odd emb_dim there is one cosine column fewer than sine
        # columns, so only the first emb_dim // 2 frequencies are used here.
        pe[:, 1::2] = torch.cos(angles[:, : emb_dim // 2])

        # Leading dimension of size 1 so the table broadcasts over the batch.
        # Registered as a buffer: it follows .to(device) and is stored in the
        # state_dict, but it is not a trainable parameter.
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, sequence, emb_dim).
        """
        return x + self.pe[:, :x.size(1)]

In [None]:
# ============================================================
# 5. Model Definition
# ============================================================

class TransformerSeq2Seq(nn.Module):
    """Encoder-decoder Transformer mapping code token ids to summary token logits.

    Source and target ids are embedded separately (EMBEDDING_DIM features),
    receive the same sinusoidal positional encoding, and are processed by a
    4+4 layer ``nn.Transformer`` with 8 heads in batch-first layout. A final
    linear layer projects decoder outputs onto the target vocabulary.

    Args:
        src_vocab_size: Number of rows of the source (code) embedding table.
        tgt_vocab_size: Number of rows of the target (summary) embedding table
            and size of the output logits.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size):
        super().__init__()

        self.src_embedding = nn.Embedding(src_vocab_size, EMBEDDING_DIM)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, EMBEDDING_DIM)
        self.positional_encoding = PositionalEncoding(EMBEDDING_DIM)

        self.transformer = nn.Transformer(
            d_model=EMBEDDING_DIM,
            nhead=8,
            num_encoder_layers=4,
            num_decoder_layers=4,
            dim_feedforward=4 * EMBEDDING_DIM,
            dropout=0.3,
            batch_first=True
        )

        self.fc_out = nn.Linear(EMBEDDING_DIM, tgt_vocab_size)

    def forward(self, src, tgt, src_padding_mask=None, tgt_padding_mask=None):
        """Compute next-token logits for every target position.

        Args:
            src: Source token ids, indexed as (batch, source length).
            tgt: Target token ids fed to the decoder, indexed as
                (batch, target length).
            src_padding_mask: Optional boolean mask with the same layout as
                ``src``; True marks <pad> positions to ignore.
            tgt_padding_mask: Optional boolean mask with the same layout as
                ``tgt``; True marks <pad> positions to ignore.

        Returns:
            Logits indexed as (batch, target length, tgt_vocab_size).
        """
        src_emb = self.positional_encoding(self.src_embedding(src))
        tgt_emb = self.positional_encoding(self.tgt_embedding(tgt))

        # Causal mask so a position cannot attend to later target positions.
        # It is placed on the input's device instead of the global DEVICE so
        # the module also works when it lives on a different device.
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt.size(1)).to(tgt.device)

        out = self.transformer(
            src=src_emb,
            tgt=tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_padding_mask,    # Ignores <pad> in encoder self-attention
            tgt_key_padding_mask=tgt_padding_mask,    # Ignores <pad> in decoder self-attention
            # src_key_padding_mask is applied only inside the encoder; without
            # this argument the decoder cross-attention would still attend to
            # the encoder outputs at <pad> positions.
            memory_key_padding_mask=src_padding_mask
        )

        return self.fc_out(out)

In [None]:
# ============================================================
# 6. Training Loop
# ============================================================

# Dataloader import
from torch.utils.data import TensorDataset, DataLoader

# The model is fed mini-batches of BATCH_SIZE samples through DataLoaders
# instead of receiving the whole encoded tensors in a single forward pass

# Training loader: reshuffled at every epoch
train_dataset = TensorDataset(X_train, Y_train)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Validation loader: fixed order
val_dataset = TensorDataset(X_val, Y_val)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# Test loader: fixed order
test_dataset = TensorDataset(X_test, Y_test)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Per-epoch average losses, filled by the training loop and used for plotting
train_losses = []
val_losses = []

# Initialization of the Transformer model, the optimizer and the loss
model = TransformerSeq2Seq(
    src_vocab_size=SRC_VOCAB_SIZE,
    tgt_vocab_size=TGT_VOCAB_SIZE
).to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, betas=(0.9, 0.98), eps=1e-9)
criterion = nn.CrossEntropyLoss(ignore_index=0) # Targets equal to 0 (<pad>) do not contribute to the loss
# The scheduler halves the learning rate (factor=0.5) when the monitored
# validation loss has stopped decreasing for more than `patience=1` epoch
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=1)

# Variables needed for early stopping
best_val_loss = float('inf')  # Best validation loss seen so far, starts at infinity
patience = 5                  # Consecutive non-improving epochs tolerated before stopping
counter = 0                   # Consecutive epochs without validation improvement

# Training loop
for epoch in range(NUM_EPOCHS):
    # --- TRAINING PHASE ---
    model.train()
    epoch_train_loss = 0
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        # Key-padding masks: True where the token id is 0 (<pad>)
        src_mask = (batch_x == 0)
        tgt_pad_mask = (batch_y[:, :-1] == 0)

        # Teacher forcing: the decoder receives the target without its last
        # token and is trained to predict the target shifted left by one
        output = model(batch_x, batch_y[:, :-1], src_padding_mask=src_mask, tgt_padding_mask=tgt_pad_mask)
        loss = criterion(output.reshape(-1, output.size(-1)), batch_y[:, 1:].reshape(-1))
        loss.backward()
        # Between loss.backward() and the optimization step the gradient norm
        # is clipped to 1.0 so that exploding gradients cannot derail training
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        epoch_train_loss += loss.item()

    avg_train_loss = epoch_train_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # --- VALIDATION PHASE ---
    model.eval()
    epoch_val_loss = 0
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            src_mask = (batch_x == 0)
            tgt_pad_mask = (batch_y[:, :-1] == 0)
            output = model(batch_x, batch_y[:, :-1], src_padding_mask=src_mask, tgt_padding_mask=tgt_pad_mask)
            loss = criterion(output.reshape(-1, output.size(-1)), batch_y[:, 1:].reshape(-1))
            epoch_val_loss += loss.item()

    avg_val_loss = epoch_val_loss / len(val_loader)
    val_losses.append(avg_val_loss)

    # Update the scheduler with THIS epoch's validation loss, right after it
    # has been measured, so a plateau lowers the learning rate for the very
    # next epoch instead of one epoch later
    scheduler.step(avg_val_loss)

    print(f"Epoch {epoch+1} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # --- EARLY STOPPING LOGIC ---
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), 'best_model.pth') # Saving the best model
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Perplexity of each epoch: exponential of the average training loss
train_perplexity = [np.exp(l) for l in train_losses]

In [None]:
# ============================================================
# 6.1 Training plotting
# ============================================================

# Epochs (x axis), numbered from 1
epochs = range(1, len(train_losses) + 1)

# Validation perplexity: exponential of each epoch's validation loss
val_perplexity = [np.exp(epoch_loss) for epoch_loss in val_losses]

# One figure with two side-by-side panels
fig, (loss_ax, perplexity_ax) = plt.subplots(1, 2, figsize=(12, 5))

# Panel 1: Training & Validation Loss
loss_ax.plot(epochs, train_losses, label="Training Loss")
loss_ax.plot(epochs, val_losses, label="Validation Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.set_title("Training & Validation Loss")
loss_ax.legend()
loss_ax.grid(True)

# Panel 2: Perplexity
perplexity_ax.plot(epochs, train_perplexity, label="Training Perplexity")
perplexity_ax.plot(epochs, val_perplexity, label="Validation Perplexity")
perplexity_ax.set_xlabel("Epoch")
perplexity_ax.set_ylabel("Perplexity")
perplexity_ax.set_title("Perplexity")
perplexity_ax.legend()
perplexity_ax.grid(True)

# Polished layout
fig.tight_layout()
plt.show()

In [None]:
# ============================================================
# 6.2 Best model loading
# ============================================================

# Reload the checkpoint written by the training loop whenever the validation
# loss improved. map_location remaps the stored tensors to the current DEVICE,
# so a checkpoint saved on a GPU can also be loaded on a CPU-only runtime.
model.load_state_dict(torch.load('best_model.pth', map_location=DEVICE))

In [None]:
# ============================================================
# 7. Inference / Code Summarization (Using Beam Search)
# ============================================================

def summarize_code_transformer(model, code_sentence, code_tokenizer, summary_tokenizer, beam_size=3, max_len=MAX_SUMMARY_LEN):
    """Generate a summary for one code snippet using beam search.

    Args:
        model: Sequence-to-sequence model called as
            ``model(src=..., tgt=..., src_padding_mask=...)`` and returning
            logits indexed as (batch, target length, vocabulary).
        code_sentence: Source code string to summarize.
        code_tokenizer: Tokenizer used to encode the code.
        summary_tokenizer: Tokenizer used to decode the generated ids.
        beam_size: Number of hypotheses kept after every decoding step.
        max_len: Maximum number of decoding steps.

    Returns:
        The decoded text of the highest-scoring hypothesis, with special
        tokens removed.
    """
    model.eval()
    with torch.no_grad():
        # Encoding input (padded to MAX_CODE_LEN with <pad> = 0)
        src_ids = encode_bpe(code_sentence, code_tokenizer, MAX_CODE_LEN)
        src_tensor = torch.tensor(src_ids).unsqueeze(0).to(DEVICE)
        # Same <pad> mask used during training and validation, so that the
        # padded tail of the source is ignored at inference time as well
        src_padding_mask = (src_tensor == 0)

        # Init of Beam Search: (cumulative log-probability, sequence)
        beams = [(0.0, [1])] # 1 is <sos>

        for _ in range(max_len):
            new_beams = []
            for score, seq in beams:
                if seq[-1] == 2: # Hypothesis already ended with <eos>: keep it unchanged
                    new_beams.append((score, seq))
                    continue

                tgt_tensor = torch.tensor(seq).unsqueeze(0).to(DEVICE)
                output = model(src=src_tensor, tgt=tgt_tensor, src_padding_mask=src_padding_mask)

                # Log-probabilities of the next token (last target position)
                log_probs = torch.log_softmax(output[0, -1, :], dim=-1)

                # Expand this hypothesis with its beam_size best continuations
                top_v, top_i = log_probs.topk(beam_size)
                for token_score, token_id in zip(top_v.tolist(), top_i.tolist()):
                    new_beams.append((score + token_score, seq + [token_id]))

            # We only keep the best "beam_size" hypotheses
            beams = sorted(new_beams, key=lambda x: x[0], reverse=True)[:beam_size]

            # If all the beams end in <eos> we exit
            if all(seq[-1] == 2 for _, seq in beams):
                break

        # Keep the highest score sequence
        best_seq = beams[0][1]

        # Decode ignoring <sos>, <eos>, <pad>
        return summary_tokenizer.decode(best_seq, skip_special_tokens=True)

# Function that runs the number of tests given in input
def run_test_samples_transformer(n=5):
    """Print a table comparing real and generated summaries for random test rows.

    ``n`` rows of ``test_df`` are drawn with ``random.randint`` (the same row
    may be drawn more than once). Code and real summary are shortened with a
    trailing '...' to fit their columns; the generated summary is printed in full.
    """
    def shorten(text, column_width):
        # Leave room for the three dots inside the column width
        keep = column_width - 3
        return text[:keep] + '...' if len(text) > keep else text

    print(f"{'ORIGINAL CODE':<50} | {'REAL SUMMARY':<30} | {'GENERATED SUMMARY'}")
    print("-" * 130)

    for _ in range(n):
        sample = test_df.iloc[random.randint(0, len(test_df) - 1)]
        original_code, real_summary = sample['code'], sample['summary']

        generated_summary = summarize_code_transformer(
            model,
            original_code,
            code_tokenizer,
            summary_tokenizer,
            beam_size=3,
            max_len=MAX_SUMMARY_LEN
        )

        short_code = shorten(original_code, 50)
        short_real = shorten(real_summary, 30)
        print(f"{short_code:<50} | {short_real:<30} | {generated_summary}")

# Running the test on 5 samples from test set
run_test_samples_transformer(n=5)

In [None]:
# ============================================================
# 8. Evaluation Metrics
# ============================================================

!pip install rouge_score
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def evaluate_transformer_bleu_rouge(model, test_df, code_tokenizer, summary_tokenizer, n_samples):
    """Print the average BLEU and ROUGE-L of generated summaries on random test rows.

    Args:
        model: Model passed to ``summarize_code_transformer``.
        test_df: DataFrame with ``code`` and ``summary`` columns.
        code_tokenizer: Tokenizer used to encode the code.
        summary_tokenizer: Tokenizer used both for decoding and for splitting
            reference and prediction into BLEU tokens.
        n_samples: Number of rows to sample (capped at ``len(test_df)``).

    Returns:
        None; the averages are printed.
    """
    model.eval()
    rouge_l = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

    # Selecting random samples from test set
    samples = test_df.sample(n=min(n_samples, len(test_df)))
    print(f"Evaluating on {len(samples)} samples...")

    bleu_scores, rouge_scores = [], []
    for code, reference in zip(samples['code'], samples['summary']):
        # Beam-search prediction from the inference function of section 7
        prediction = summarize_code_transformer(
            model,
            code,
            code_tokenizer,
            summary_tokenizer,
            beam_size=3
        )

        # BLEU is computed on the sub-word tokens of the summary tokenizer
        ref_tokens = summary_tokenizer.encode(reference).tokens
        pred_tokens = summary_tokenizer.encode(prediction).tokens
        bleu_scores.append(
            sentence_bleu(
                [ref_tokens],
                pred_tokens,
                smoothing_function=SmoothingFunction().method1
            )
        )

        # ROUGE-L F-measure on the raw strings
        rouge_scores.append(rouge_l.score(reference, prediction)['rougeL'].fmeasure)

    separator = "-" * 30
    print(separator)
    print(f"Final results:")
    print(f"AVG BLEU:   {np.mean(bleu_scores):.4f}")
    print(f"AVG ROUGE-L: {np.mean(rouge_scores):.4f}")
    print(separator)

# Run the evaluation on 100 random samples of the test set
evaluate_transformer_bleu_rouge(
    model,
    test_df,
    code_tokenizer,
    summary_tokenizer,
    n_samples=100
)

In [None]:
# ============================================================
# 9. Downloading results
# ============================================================

import os
import torch
import shutil

# Export folder with one sub-folder for the model and one for the tokenizers
os.makedirs('progetto_scaricabile/models', exist_ok=True)
os.makedirs('progetto_scaricabile/tokenizers', exist_ok=True)

# Saves the weights currently held by `model`
# (the best checkpoint when section 6.2 has been run before this cell)
torch.save(model.state_dict(), 'progetto_scaricabile/models/best_model.pth')

# Saves both BPE tokenizers as JSON files
code_tokenizer.save("progetto_scaricabile/tokenizers/code_bpe.json")
summary_tokenizer.save("progetto_scaricabile/tokenizers/summary_bpe.json")

print("File organizedd in 'progetto_scaricabile'")



# NOTE: google.colab can only be imported inside a Google Colab runtime
from google.colab import files

# Creating a ZIP archive of the whole export folder
shutil.make_archive('modello_summarization_completo', 'zip', 'progetto_scaricabile')

# Triggers the browser download of the archive to the local machine
files.download('modello_summarization_completo.zip')