# FLAN-T5-small LoRA su CNN/DailyMail (PyTorch)

In [None]:
# Notebook shell command: installs the third-party packages imported in the next cell.
!pip install transformers datasets torch nltk

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq,
    get_linear_schedule_with_warmup,
)
from datasets import load_dataset
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Download the NLTK "punkt" resource.
nltk.download("punkt")

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")


## Dataset CNN/DailyMail 3.0.0

In [None]:
# Load configuration "3.0.0" of the CNN/DailyMail dataset.
dataset = load_dataset(path="abisee/cnn_dailymail", name="3.0.0")

# Print the first training example as a quick look at the record layout.
first_train_example = dataset["train"][0]
print(first_train_example)

## Tokenizer FLAN-T5-small e preprocessing

In [None]:
# Tokenizer loaded from the "google/flan-t5-small" checkpoint; used by the
# preprocessing function below.
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-small")

def preprocess_data(examples):
    """Tokenize a batch of articles and highlights for seq2seq training.

    Args:
        examples: Batch mapping with an "article" list (source texts) and a
            "highlights" list (reference summaries).

    Returns:
        The tokenizer output for the prompted articles ("input_ids",
        "attention_mask"), extended with:

        * "labels": target token ids, padded/truncated to 256 tokens.
        * "decoder_input_ids": the same targets shifted one position to the
          right, so the decoder input at step ``t`` is the target token of
          step ``t - 1`` and never the token it is asked to predict.
    """
    inputs = [f"Summarize: {art}" for art in examples["article"]]
    targets = list(examples["highlights"])

    model_inputs = tokenizer(
        inputs,
        max_length=256,
        truncation=True,
        padding="max_length",
    )

    # `text_target=` is the replacement for the deprecated
    # `as_target_tokenizer()` context manager.
    labels = tokenizer(
        text_target=targets,
        max_length=256,
        truncation=True,
        padding="max_length",
    )
    label_ids = labels["input_ids"]

    # T5 starts decoding from the pad token. Feeding the labels unshifted
    # would hand the decoder the answer at every position, so build the
    # right-shifted sequence explicitly.
    decoder_start_id = tokenizer.pad_token_id

    model_inputs["labels"] = label_ids
    model_inputs["decoder_input_ids"] = [
        [decoder_start_id, *ids[:-1]] for ids in label_ids
    ]

    return model_inputs

# Tokenize a fixed-size prefix of each split.
train_subset = dataset["train"].select(range(10000))
train_dataset = train_subset.map(preprocess_data, batched=True)

test_subset = dataset["test"].select(range(500))
test_dataset = test_subset.map(preprocess_data, batched=True)

# Expose only the model-facing columns, returned as torch tensors.
columns_to_keep = ["input_ids", "attention_mask", "decoder_input_ids", "labels"]
for tokenized_split in (train_dataset, test_dataset):
    tokenized_split.set_format(type="torch", columns=columns_to_keep)

## DataLoader

In [None]:
# Collator that batches the tokenized examples; no model is attached to it.
data_collator = DataCollatorForSeq2Seq(tokenizer, model=None)

train_dataloader = DataLoader(
    train_dataset,
    shuffle=True,
    batch_size=32,
    collate_fn=data_collator,
)
# With shuffle=False the sample order is deterministic, so for a fixed seed and
# model you always get the same batches and the same per-batch metrics. With
# shuffle=True the order changes on every pass unless the seeds are fixed and
# reset before each epoch.
# The evaluation loader therefore keeps the dataset order, so a metric computed
# on its first batches refers to the same examples on every run.
test_dataloader = DataLoader(
    test_dataset,
    shuffle=False,
    batch_size=32,
    collate_fn=data_collator,
)

## Modello FLAN-T5-small e definizione LoRA

In [None]:
# Load the pretrained FLAN-T5-small seq2seq model and place it on `device`.
model = AutoModelForSeq2SeqLM.from_pretrained("google/flan-t5-small").to(device)

class LoRALayer(nn.Module):
    """Wrap an ``nn.Linear`` with a trainable low-rank (LoRA) update.

    The output is ``dense(x) + x @ w_a @ w_b``. The wrapped layer is frozen
    when the wrapper is built, so only ``w_a`` (``in_features x rank``) and
    ``w_b`` (``rank x out_features``) are trainable.

    Args:
        dense: Linear layer to adapt. Its weight and bias are frozen in place.
        rank: Inner dimension of the low-rank update.
    """

    def __init__(self, dense: nn.Linear, rank: int = 4):
        super().__init__()
        self.dense = dense
        self.rank = rank

        # Freeze the base layer once, at construction time, so an optimizer
        # built before the first forward pass already excludes it.
        for param in dense.parameters():
            param.requires_grad = False

        # Create the adapter next to the wrapped weight (same device/dtype) so
        # the wrapper also works when `dense` already lives on an accelerator.
        factory = {"device": dense.weight.device, "dtype": dense.weight.dtype}
        self.w_a = nn.Parameter(
            torch.randn(dense.in_features, rank, **factory) * 0.01
        )
        # Zero-initialised so the update is exactly zero at the start and the
        # wrapped model initially reproduces the pretrained layer.
        self.w_b = nn.Parameter(torch.zeros(rank, dense.out_features, **factory))

    @property
    def weight(self):
        """Weight of the wrapped layer, for code that inspects ``module.weight``."""
        return self.dense.weight

    @property
    def bias(self):
        """Bias of the wrapped layer (``None`` when it has no bias)."""
        return self.dense.bias

    def forward(self, inputs):
        """Return the frozen layer output plus the low-rank correction."""
        original_output = self.dense(inputs)
        lora_output = inputs @ self.w_a @ self.w_b
        return original_output + lora_output





In [None]:
def _describe_linear(name, module):
    """Collect shape, bias and trainability information for one nn.Linear."""
    has_bias = module.bias is not None
    weight_trainable = module.weight.requires_grad
    bias_trainable = module.bias.requires_grad if has_bias else False

    weight_params = module.weight.numel()
    bias_params = module.bias.numel() if has_bias else 0

    return {
        'nome': name,
        'in_features': module.in_features,
        'out_features': module.out_features,
        'ha_bias': has_bias,
        'weight_trainable': weight_trainable,
        'bias_trainable': bias_trainable,
        'n_params': weight_params + bias_params,
        # Weight and bias are counted independently: either can be frozen
        # while the other is still trainable.
        'trainable_params': (weight_params if weight_trainable else 0)
        + (bias_params if bias_trainable else 0),
    }


def _print_layer_section(title, layers, leading_blank, limit=None,
                         group=None, show_bias=True):
    """Print one titled section of the report.

    Args:
        title: Heading printed between two 80-character rules.
        layers: Layer descriptions produced by ``_describe_linear``.
        leading_blank: Whether the first rule is preceded by an empty line.
        limit: Maximum number of layers to list (``None`` lists all of them).
        group: Group name used in the "... and N more" line when truncated.
        show_bias: Whether the bias details are printed for each layer.
    """
    rule = "=" * 80
    print("\n" + rule if leading_blank else rule)
    print(title)
    print(rule)

    shown = layers if limit is None else layers[:limit]
    for layer in shown:
        status = "✅ TRAIN" if layer['weight_trainable'] else "🔒 FROZEN"
        print(f"\n{status} | {layer['nome']}")
        print(f"         Shape: {layer['in_features']} → {layer['out_features']}")
        if show_bias:
            print(f"         Parametri: {layer['n_params']:,} | Bias: {'Sì' if layer['ha_bias'] else 'No'}")
        else:
            print(f"         Parametri: {layer['n_params']:,}")
        print(f"         weight.requires_grad = {layer['weight_trainable']}")
        if show_bias and layer['ha_bias']:
            print(f"         bias.requires_grad = {layer['bias_trainable']}")

    hidden = len(layers) - len(shown)
    if hidden > 0:
        print(f"\n... e altri {hidden} layer {group}")


print("=" * 80)
print("STRUTTURA FLAN-T5-SMALL - Layer Lineari e Parametri")
print("=" * 80)

# Count total and trainable parameters of the whole model.
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"\n📊 PARAMETRI TOTALI: {total_params:,}")
print(f"✅ PARAMETRI TRAINABLE: {trainable_params:,}")
print(f"🔒 PARAMETRI FROZEN: {total_params - trainable_params:,}")
print(f"📈 % TRAINABLE: {100 * trainable_params / total_params:.2f}%\n")

# One description per nn.Linear found anywhere in the model.
linear_layers = [
    _describe_linear(name, module)
    for name, module in model.named_modules()
    if isinstance(module, nn.Linear)
]

# Group the layers by the part of the model their qualified name mentions.
encoder_layers = [l for l in linear_layers if 'encoder' in l['nome']]
decoder_layers = [l for l in linear_layers if 'decoder' in l['nome']]
altri = [l for l in linear_layers if 'encoder' not in l['nome'] and 'decoder' not in l['nome']]

_print_layer_section(
    "🔵 ENCODER LAYERS (primi 8 esempi):",
    encoder_layers,
    leading_blank=False,
    limit=8,
    group="encoder",
)
_print_layer_section(
    "🟢 DECODER LAYERS (primi 8 esempi):",
    decoder_layers,
    leading_blank=True,
    limit=8,
    group="decoder",
)
_print_layer_section(
    "🟡 ALTRI LAYERS (LM Head, Embeddings):",
    altri,
    leading_blank=True,
    show_bias=False,
)



In [None]:
def count_parameters(model):
    """Return ``(total, trainable)`` parameter element counts for ``model``.

    ``total`` sums ``numel()`` over every parameter; ``trainable`` only over
    those whose ``requires_grad`` is true.
    """
    total = 0
    trainable = 0
    for param in model.parameters():
        size = param.numel()
        total += size
        if param.requires_grad:
            trainable += size
    return total, trainable

def print_parameters(model, label="model"):
    """Print the total and trainable parameter counts of ``model`` under ``label``."""
    n_total, n_trainable = count_parameters(model)
    report = [
        f"{label}:",
        f"  totali      : {n_total:,}",
        f"  trainabili  : {n_trainable:,}",
    ]
    print("\n".join(report))

# Baseline parameter counts, printed under the label "Prima di LoRA" ("before LoRA").
print_parameters(model, "Prima di LoRA")

## Applicare LoRA: congelare encoder/shared e wrappare decoder + lm_head

In [None]:
# Freeze the shared input embedding, then the whole encoder.
for frozen_module in (model.get_input_embeddings(), model.encoder):
    for param in frozen_module.parameters():
        param.requires_grad = False

# Freeze the original weight (and bias, when present) of every Linear layer
# found in the decoder.
for decoder_module in model.decoder.modules():
    if not isinstance(decoder_module, nn.Linear):
        continue
    decoder_module.weight.requires_grad = False
    if decoder_module.bias is not None:
        decoder_module.bias.requires_grad = False

# Recursively wrap the Linear layers below a module with LoRA.
def wrap_linear_lora(mod, rank=4):
    """Replace every ``nn.Linear`` below ``mod`` with a ``LoRALayer``, in place.

    Args:
        mod: Module whose descendants are rewritten.
        rank: LoRA rank passed to each wrapper.
    """
    # Snapshot the children first: setattr below mutates them while looping.
    for name, submodule in list(mod.named_children()):
        if isinstance(submodule, LoRALayer):
            # Already wrapped (e.g. the cell was run twice). Descending would
            # find the inner `dense` Linear and wrap it a second time.
            continue
        if isinstance(submodule, nn.Linear):
            wrapper = LoRALayer(submodule, rank=rank)
            # Keep the adapter matrices on the same device as the layer they adapt.
            setattr(mod, name, wrapper.to(submodule.weight.device))
        else:
            wrap_linear_lora(submodule, rank=rank)

# NOTE(review): the decoder-wide wrapping call below is disabled, so only
# lm_head receives a LoRA adapter. Confirm this is intended before enabling it.
#wrap_linear_lora(model.decoder, rank=4)

# lm_head with LoRA. The wrapper must be installed on the model: a wrapper that
# only lives in a local variable is never called by the model's forward pass,
# and its w_a / w_b matrices are not part of model.parameters(), so an
# optimizer built from the model would never update them.
lm_head = model.lm_head
if isinstance(lm_head, LoRALayer):
    # The cell was run again: reuse the wrapper that is already installed.
    lora_lm_head = lm_head
    lm_head = lora_lm_head.dense
else:
    # Freeze the base projection right away so it is already excluded from an
    # optimizer that is created before the first forward pass.
    for p in lm_head.parameters():
        p.requires_grad = False
    lora_lm_head = LoRALayer(lm_head, rank=4).to(device)
    model.lm_head = lora_lm_head



## CHECK model summary

In [None]:
# (here you apply your LoRALinear to the decoder + lm_head)
# Parameter counts printed under the label "Dopo applicazione LoRA"
# ("after applying LoRA").
print_parameters(model, "Dopo applicazione LoRA")


## Ottimizzatore, scheduler e setup training (3 epoche)

In [None]:
from torch.optim import AdamW
from tqdm.auto import tqdm

# Hand the optimizer only the parameters that are still trainable.
trainable_parameters = [
    param for param in model.parameters() if param.requires_grad
]
optimizer = AdamW(trainable_parameters, lr=1e-3)

# One scheduler step per optimizer step, over all epochs.
num_epochs = 3
steps_per_epoch = len(train_dataloader)
num_training_steps = num_epochs * steps_per_epoch
# Warm-up length: 10% of the total number of steps, truncated to an integer.
num_warmup_steps = int(0.1 * num_training_steps)

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=num_training_steps,
)
# Trailing expression: shown as the cell output in a notebook.
num_training_steps, num_warmup_steps

## Training loop

In [None]:
# Training loop: one pass over train_dataloader per epoch, reporting the
# token-weighted average loss.
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    total_tokens = 0

    progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for batch in progress_bar:
        batch = {k: v.to(device) for k, v in batch.items()}

        # Padded target positions must not contribute to the loss: mark them
        # with -100, the index the loss ignores. This is a no-op for labels
        # that are already masked.
        labels = batch["labels"].masked_fill(
            batch["labels"] == tokenizer.pad_token_id, -100
        )

        # decoder_input_ids are deliberately not passed: given only `labels`,
        # the model builds them itself by shifting the labels one step to the
        # right. Feeding the labels unshifted would show the decoder the very
        # token it has to predict at every position.
        outputs = model(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=labels,
        )
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # The batch loss is an average over the non-ignored tokens, so weight
        # it by their count to obtain a per-token running average.
        valid_tokens = max((labels != -100).sum().item(), 1)
        total_loss += loss.item() * valid_tokens
        total_tokens += valid_tokens

        progress_bar.set_postfix({"loss": total_loss / total_tokens})

    # max(..., 1) keeps an empty dataloader from raising ZeroDivisionError.
    epoch_loss = total_loss / max(total_tokens, 1)
    print(f"Epoch {epoch+1}: train loss {epoch_loss:.4f}")

## Salvataggio modello

In [None]:
# Directory that receives both the model files and the tokenizer files.
save_dir = "./summarisation_model_lora"
# NOTE(review): this saves whatever modules are currently installed in `model`.
# If custom wrapper modules have replaced standard layers, the saved parameter
# names may not match what `from_pretrained` expects — confirm that reloading
# from `save_dir` works.
model.save_pretrained(save_dir)
tokenizer.save_pretrained(save_dir)

## Valutazione: BLEU su un sottoinsieme del test set

In [None]:
import nltk
# Download the "punkt" and "punkt_tab" NLTK resources (presumably what
# nltk.word_tokenize needs — confirm for the installed NLTK version).
for resource in ("punkt", "punkt_tab"):
    nltk.download(resource)

def calculate_bleu(reference, hypothesis):
    """Return the smoothed sentence-level BLEU of ``hypothesis`` against ``reference``.

    Both arguments are strings; each is split with ``nltk.word_tokenize``.
    The score uses a single reference and ``SmoothingFunction().method4``.
    """
    smoother = SmoothingFunction()
    references = [nltk.word_tokenize(reference)]
    candidate = nltk.word_tokenize(hypothesis)
    return sentence_bleu(
        references,
        candidate,
        smoothing_function=smoother.method4,
    )

def evaluate_bleu(dataloader, num_batches=1):
    """Average sentence-level BLEU of generated summaries over the first batches.

    Puts the global ``model`` in eval mode, generates with beam search
    (4 beams, at most 128 tokens) and scores each generated summary against
    the decoded labels of the same example.

    Args:
        dataloader: Iterable of batches providing "input_ids",
            "attention_mask" and "labels" tensors.
        num_batches: Number of leading batches to evaluate.

    Returns:
        Mean BLEU over all evaluated examples, or 0.0 when none was evaluated.
    """
    model.eval()
    scores = []

    with torch.no_grad():
        for batch_idx, batch in enumerate(dataloader):
            if batch_idx >= num_batches:
                break

            # Only the encoder inputs are needed on the device for generation.
            outputs = model.generate(
                input_ids=batch["input_ids"].to(device),
                attention_mask=batch["attention_mask"].to(device),
                max_length=128,
                num_beams=4,
                early_stopping=True,
            )

            # -100 marks label positions ignored by the loss and is not a
            # valid token id; map it back to the pad token so decoding cannot
            # fail on it (skip_special_tokens then drops the padding).
            label_ids = batch["labels"]
            label_ids = label_ids.masked_fill(
                label_ids == -100, tokenizer.pad_token_id
            )

            references = tokenizer.batch_decode(
                label_ids, skip_special_tokens=True
            )
            hypotheses = tokenizer.batch_decode(
                outputs, skip_special_tokens=True
            )
            scores.extend(
                calculate_bleu(reference, hypothesis)
                for reference, hypothesis in zip(references, hypotheses)
            )

    return sum(scores) / len(scores) if scores else 0.0

# Score the first batch of the test loader. The message names the test set
# because that is the split test_dataloader is built from.
avg_bleu = evaluate_bleu(test_dataloader, num_batches=1)
print(f"Average BLEU score on test set: {avg_bleu}")
