<a href="https://colab.research.google.com/github/agdelfini/FYP-001333164-T5vsDNN/blob/main/T5_2_0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from datasets import load_dataset
# NOTE(review): `dataset` is not referenced again in the visible notebook; the
# data used for training and evaluation below is read from news_summary.csv.
# If the CNN/DailyMail corpus is not needed, this load can be dropped — confirm.
dataset = load_dataset("cnn_dailymail", "3.0.0")


In [None]:
!pip install lightning

In [None]:
!pip install transformers

In [None]:
!pip install rouge_score

import pandas as pd
import numpy as np
from rouge_score import rouge_scorer
from tqdm.auto import tqdm
import torch

In [None]:
import json
import pandas as pd
import numpy as np
import torch
from pathlib import Path
import lightning as pl

In [None]:
from sklearn.model_selection import train_test_split
from termcolor import colored
import textwrap

In [None]:
from torch.utils.data import Dataset, DataLoader
from lightning.pytorch import Trainer
from lightning.pytorch.callbacks import ModelCheckpoint

from lightning.pytorch.loggers import TensorBoardLogger
from transformers import T5ForConditionalGeneration, T5TokenizerFast as T5Tokenizer
from torch.optim import AdamW
from tqdm.auto import tqdm

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt


In [None]:
# Fix the random seed once, before any data splitting or model creation, so
# that later randomized steps are repeatable between runs.
pl.seed_everything(1234)

In [None]:
# Load the raw article/summary table (the file is read as Latin-1).
df = pd.read_csv("/content/news_summary.csv", encoding="latin-1")
# The code below wraps both columns in str(), which would turn a missing
# article ("ctext") or summary ("text") into the literal string "nan" and feed
# it to training and scoring. Drop such rows up front instead.
df = df.dropna(subset=["ctext", "text"])

In [None]:
df.shape

In [None]:
# Hold out 10% of the rows. NewsDataModule serves this same hold-out frame
# from both its validation and its test dataloader.
train_df, test_df = train_test_split(df, test_size=0.1)
print(f"Shape of the Train Set: {train_df.shape}\nShape of the Test Set: {test_df.shape}")

In [None]:
class NewsDataset(Dataset):
    """Dataset of tokenized (news article, summary) pairs.

    Every row of ``data`` must provide a ``"ctext"`` column (the article) and a
    ``"text"`` column (its summary).
    """

    # Value written over padding positions in the labels; Hugging Face
    # seq2seq models skip label positions equal to -100 when computing loss.
    IGNORE_INDEX = -100

    def __init__(self, data, tokenizer, text_max_token_len=512, summary_max_token_len=128):
        """
        Args:
        - data (pd.DataFrame): The data that contains the news articles and their summaries.
        - tokenizer (transformers.tokenization_*) : The tokenizer used to tokenize the text and summary.
        - text_max_token_len (int, optional): The maximum length of the text in terms of tokens. Defaults to 512.
        - summary_max_token_len (int, optional): The maximum length of the summary in terms of tokens. Defaults to 128.
        """
        self.tokenizer = tokenizer
        self.data = data
        self.text_max_token_len = text_max_token_len
        self.summary_max_token_len = summary_max_token_len

    def __len__(self):
        """
        Returns:
        - The number of samples in the dataset.
        """
        return len(self.data)

    def _encode(self, text, max_length):
        """Tokenize ``text``, padded/truncated to exactly ``max_length`` tokens."""
        return self.tokenizer(
            text,
            max_length=max_length,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors="pt"
        )

    def __getitem__(self, index):
        """
        Get a sample from the dataset.

        Args:
        - index (int): The positional index of the sample to get.

        Returns:
        - A dictionary that contains the following:
            - text (str): The article, prefixed with the "summarize: " task prompt.
            - summary (str): The summary of the news article.
            - text_input_ids (torch.Tensor): The input IDs of the text after tokenization.
            - text_attention_mask (torch.Tensor): The attention mask of the text after tokenization.
            - labels (torch.Tensor): The input IDs of the summary, with padding replaced by -100.
            - labels_attention_mask (torch.Tensor): The attention mask of the summary after tokenization.
        """
        data_row = self.data.iloc[index]
        text = "summarize: " + str(data_row["ctext"])
        summary = str(data_row["text"])

        text_encoding = self._encode(text, self.text_max_token_len)
        summary_encoding = self._encode(summary, self.summary_max_token_len)

        # Mask padding in the labels. Ask the tokenizer for its padding id
        # rather than assuming 0, and only fall back to 0 when it has none.
        pad_token_id = getattr(self.tokenizer, "pad_token_id", None)
        if pad_token_id is None:
            pad_token_id = 0
        labels = summary_encoding['input_ids']
        labels[labels == pad_token_id] = self.IGNORE_INDEX

        return {
            'text': text,
            'summary': summary,
            'text_input_ids': text_encoding['input_ids'].flatten(),
            'text_attention_mask': text_encoding['attention_mask'].flatten(),
            'labels': labels.flatten(),
            'labels_attention_mask': summary_encoding["attention_mask"].flatten()
        }

In [None]:
class NewsDataModule(pl.LightningDataModule):
    """Lightning data module wrapping the train and hold-out news frames.

    The hold-out frame (``test_df``) backs both the validation and the test
    dataloader.
    """

    def __init__(self,
                 train_df,
                 test_df,
                 tokenizer,
                 batch_size=8,
                 text_max_token_len=152,
                 summary_max_token_len=128):
        """
        Store the frames and tokenization settings; datasets are built in ``setup``.

        Args:
        - train_df (pandas.DataFrame): the training dataset
        - test_df (pandas.DataFrame): the testing dataset
        - tokenizer (transformers.PreTrainedTokenizer): the tokenizer to be used
        - batch_size (int): the batch size
        - text_max_token_len (int): the maximum number of tokens for the text
        - summary_max_token_len (int): the maximum number of tokens for the summary

        NOTE(review): the 152-token default for ``text_max_token_len`` differs
        from NewsDataset's own default of 512 — confirm this is intended.
        """
        super().__init__()

        self.train_df, self.test_df = train_df, test_df
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.text_max_token_len = text_max_token_len
        self.summary_max_token_len = summary_max_token_len

    def _build_dataset(self, frame):
        """Wrap ``frame`` in a NewsDataset using this module's tokenizer settings."""
        return NewsDataset(
            frame,
            self.tokenizer,
            self.text_max_token_len,
            self.summary_max_token_len)

    def _build_loader(self, dataset, shuffle):
        """Create a DataLoader over ``dataset`` with this module's batch size."""
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=shuffle)

    def setup(self, stage=None):
        """
        Builds the train and test datasets (``stage`` is ignored).
        """
        self.train_dataset = self._build_dataset(self.train_df)
        self.test_dataset = self._build_dataset(self.test_df)

    def train_dataloader(self):
        """
        Returns a shuffled DataLoader over the training set.
        """
        return self._build_loader(self.train_dataset, shuffle=True)

    def test_dataloader(self):
        """
        Returns an unshuffled DataLoader over the testing set.
        """
        return self._build_loader(self.test_dataset, shuffle=False)

    def val_dataloader(self):
        """
        Returns an unshuffled DataLoader for validation, which reuses the testing set.
        """
        return self._build_loader(self.test_dataset, shuffle=False)

In [None]:
# Pretrained checkpoint name. It is used for the tokenizer here and read again
# by SummaryModel when it loads the model weights, so the two stay in sync.
MODEL_NAME = "t5-base"
# `T5Tokenizer` is the import alias for T5TokenizerFast in this notebook.
tokenizer = T5Tokenizer.from_pretrained(MODEL_NAME)

In [None]:
def _token_count(value):
    """Return how many token ids the tokenizer produces for ``str(value)``."""
    return len(tokenizer.encode(str(value)))


# Per-row token counts of the training articles and of their summaries.
text_token_counts = [_token_count(article) for article in train_df["ctext"]]
summary_token_counts = [_token_count(summary) for summary in train_df["text"]]


In [None]:
# Training hyper-parameters.
N_EPOCHS = 3
BATCH_SIZE = 8

# The token-length limits are left at NewsDataModule's own defaults.
data_module = NewsDataModule(train_df, test_df, tokenizer, batch_size=BATCH_SIZE)

Model

In [None]:
from rouge_score import rouge_scorer
from nltk import sent_tokenize
import nltk

# Ensure the tokenizer data needed by sent_tokenize is downloaded.
# NOTE(review): newer NLTK releases appear to look up a 'punkt_tab' resource
# for sent_tokenize instead of 'punkt' — confirm against the installed NLTK
# version whether that resource has to be downloaded as well.
nltk.download('punkt')

def calculate_novelty_score(source_text, generated_summary, n_sentences_bias=3):
    """
    Score how little a summary borrows from the opening of its source.

    The first ``n_sentences_bias`` sentences of ``source_text`` form the "lead".
    The result is ``1 - P``, where ``P`` is the ROUGE-2 precision of
    ``generated_summary`` measured against that lead. A summary copied from
    the lead scores 0.0; one sharing no bigrams with it scores 1.0.
    """
    # The lead: the first few sentences of the article, re-joined with spaces.
    lead_sentences = sent_tokenize(source_text)[:n_sentences_bias]
    lead_text = " ".join(lead_sentences)

    # Precision = share of the summary's bigrams that also occur in the lead.
    bigram_scorer = rouge_scorer.RougeScorer(['rouge2'], use_stemmer=True)
    lead_overlap = bigram_scorer.score(lead_text, generated_summary)['rouge2'].precision

    return 1.0 - lead_overlap

In [None]:
class SummaryModel(pl.LightningModule):
    """Lightning wrapper that fine-tunes the pretrained T5 checkpoint ``MODEL_NAME``."""

    def __init__(self):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME, return_dict=True)

    def forward(self, input_ids, attention_mask, decoder_attention_mask, labels=None):
        """Run the wrapped model and return its ``(loss, logits)`` outputs."""
        output = self.model(
            input_ids,
            attention_mask=attention_mask,
            labels=labels,
            decoder_attention_mask=decoder_attention_mask
        )
        return output.loss, output.logits

    def shared_step(self, batch, batch_idx, stage):
        """Compute the loss for one batch and log it as ``"<stage>_loss"``.

        ``batch`` is a dict with the ``text_input_ids``, ``text_attention_mask``,
        ``labels`` and ``labels_attention_mask`` entries produced by NewsDataset.
        """
        input_ids = batch['text_input_ids']
        attention_mask = batch["text_attention_mask"]
        labels = batch["labels"]
        labels_attention_mask = batch["labels_attention_mask"]

        loss, _ = self(
            input_ids=input_ids,
            attention_mask=attention_mask,
            decoder_attention_mask=labels_attention_mask,
            labels=labels
        )

        self.log(f"{stage}_loss", loss, prog_bar=True, logger=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss for ``batch`` (logged as ``train_loss``)."""
        return self.shared_step(batch, batch_idx, 'train')

    def validation_step(self, batch, batch_idx):
        """Return the validation loss for ``batch`` (logged as ``val_loss``)."""
        return self.shared_step(batch, batch_idx, 'val')

    def test_step(self, batch, batch_idx):
        """Return the test loss for ``batch`` (logged as ``test_loss``)."""
        return self.shared_step(batch, batch_idx, 'test')

    def configure_optimizers(self):
        """Optimize all parameters with AdamW at a fixed learning rate of 1e-4.

        This was always the definition in effect: an earlier duplicate
        ``configure_optimizers`` built on ``Adafactor`` (a name the notebook
        never imports) was shadowed by this one and has been removed as dead
        code.
        """
        return AdamW(self.parameters(), lr=0.0001)


In [None]:
# Instantiate the model to be fine-tuned; its constructor loads the
# pretrained MODEL_NAME weights.
model_1 = SummaryModel()

In [None]:
# TensorBoard event files go under lightning_logs/news_summary.
logger = TensorBoardLogger("lightning_logs", name="news_summary")

# Keep only the single checkpoint with the lowest logged "val_loss".
# NOTE(review): the checkpoint directory is a Kaggle path while the CSV is
# read from /content — confirm which environment this is meant to run in.
callbacks = ModelCheckpoint(
    monitor="val_loss",
    mode='min',
    save_top_k=1,
    dirpath="/kaggle/working/checkpoints",
    filename="base-checkpoint",
    verbose=True,
)

# Train for N_EPOCHS on one GPU.
trainer = Trainer(
    accelerator='gpu',
    devices=1,
    max_epochs=N_EPOCHS,
    logger=logger,
    callbacks=callbacks,
)

In [None]:
# Run fine-tuning: model_1 is trained on the loaders supplied by data_module.
trainer.fit(model_1, data_module)

In [None]:
# Reload the checkpoint that the trainer's checkpoint callback recorded as
# best, then freeze it because it is only used for inference from here on.
best_checkpoint_path = trainer.checkpoint_callback.best_model_path
best_model = SummaryModel.load_from_checkpoint(best_checkpoint_path)
best_model.freeze()

In [None]:
def encode_text(text, max_length=512):
    """Tokenize ``text`` into model inputs placed on ``best_model``'s device.

    Args:
    - text (str): The text to encode.
    - max_length (int, optional): Length the input is padded/truncated to. Defaults to 512.

    Returns:
    - A tuple ``(input_ids, attention_mask)`` of tensors, each holding a
      single padded sequence.
    """
    # Call the tokenizer directly, as NewsDataset does; `encode_plus` is
    # deprecated in favour of `__call__`.
    encoding = tokenizer(
        text,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )
    # Follow the model's device, whichever it is, rather than assuming a GPU.
    device = best_model.device
    return encoding["input_ids"].to(device), encoding["attention_mask"].to(device)

def generate_summary(input_ids, attention_mask):
    """Generate summary token ids for already-encoded input using ``best_model``.

    Decoding is beam search without sampling, capped at 150 tokens, with
    repetition discouraged through an n-gram block and a repetition penalty.
    """
    decoding_options = dict(
        max_length=150,
        # Beam search with sampling switched off.
        num_beams=4,
        do_sample=False,
        # Forbid repeating any 3-gram already generated, and penalize
        # repeated tokens in general.
        no_repeat_ngram_size=3,
        repetition_penalty=2.0,
        length_penalty=1.0,
        early_stopping=True,
    )
    return best_model.model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        **decoding_options,
    )

def decode_summary(generated_ids):
    """Decode each generated sequence to text and concatenate the results.

    Special tokens are dropped and tokenization spaces cleaned up. The decoded
    sequences are joined with no separator between them.
    """
    return "".join(
        tokenizer.decode(
            sequence,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )
        for sequence in generated_ids
    )

def summarize(text):
    """Return the model-generated summary of ``text``.

    The model is fine-tuned on inputs of the form ``"summarize: <article>"``
    (see NewsDataset), so the same task prefix is prepended here unless the
    caller already supplied it. Without it, inference inputs would not match
    the training inputs.

    Args:
    - text (str): The article to summarize.

    Returns:
    - The decoded summary string.
    """
    task_prefix = "summarize: "
    prompt = text if text.startswith(task_prefix) else task_prefix + text

    input_ids, attention_mask = encode_text(prompt)
    generated_ids = generate_summary(input_ids, attention_mask)
    return decode_summary(generated_ids)

In [None]:
# Summarize the *article* ("ctext") of one held-out row. The "text" column is
# the reference summary, so feeding it to the model would only re-summarize a
# summary and make the article/reference/model comparison meaningless.
sample_row = test_df.iloc[150]
text = str(sample_row["ctext"])
model_summary = summarize(text)

In [None]:
text

In [None]:
sample_row["text"]

In [None]:
model_summary

In [None]:
import pandas as pd
import numpy as np
from rouge_score import rouge_scorer
from tqdm.auto import tqdm

# 1. Setup the ROUGE Scorer
# One shared scorer for the whole evaluation. calculate_metrics_for_row reads
# the F-measure of all three variants for the standard report and the 'rougeL'
# precision for the granularity/novelty score.
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)

def calculate_metrics_for_row(source_text, reference_summary, generated_summary):
    """
    Score one generated summary against its reference and against its source.

    Returns a dict with:
    - "rouge1", "rouge2", "rougeL": F-measures of the generated summary versus
      the reference summary.
    - "granularity": ROUGE-L precision of the generated summary versus the
      source text, i.e. how much of the summary overlaps with the article.
    - "novelty": ``1.0 - granularity``.
    """
    # Generated summary measured against the human reference.
    against_reference = scorer.score(reference_summary, generated_summary)
    # Generated summary measured against the full article.
    against_source = scorer.score(source_text, generated_summary)

    granularity = against_source['rougeL'].precision

    metrics = {
        variant: against_reference[variant].fmeasure
        for variant in ("rouge1", "rouge2", "rougeL")
    }
    metrics["novelty"] = 1.0 - granularity
    metrics["granularity"] = granularity
    return metrics

def evaluate_model(model, tokenizer, test_data, num_samples=50):
    """
    Summarize the first ``num_samples`` rows of ``test_data`` and score them.

    ``model`` is only switched to eval mode and ``tokenizer`` is not used
    here: summaries come from the module-level ``summarize`` function. Rows
    whose generation raises are reported and skipped.

    Returns a DataFrame with one row of metrics (see
    ``calculate_metrics_for_row``) per successfully summarized sample.
    """
    model.eval()

    # Only a leading slice is evaluated because generation is slow; pass
    # num_samples=len(test_data) for a full evaluation.
    subset = test_data.iloc[:num_samples]
    subset_size = len(subset)

    print(f"Starting evaluation on {subset_size} samples...")

    collected = []
    for row_index, record in tqdm(subset.iterrows(), total=subset_size):
        article = str(record['ctext'])
        reference = str(record['text'])

        try:
            candidate = summarize(article)
        except Exception as error:
            print(f"Error generating summary for index {row_index}: {error}")
            continue

        collected.append(calculate_metrics_for_row(article, reference, candidate))

    return pd.DataFrame(collected)

# --- Run the Evaluation ---
# We limit to 50 samples for speed. Increase this number for better accuracy.
results_df = evaluate_model(best_model, tokenizer, test_df, num_samples=50)

# --- Calculate and Print Averages ---
avg_results = results_df.mean()

print("\n" + "="*40)
print("FINAL EVALUATION METRICS")
print("="*40)
if results_df.empty:
    # evaluate_model skips rows whose generation failed; if all of them
    # failed there are no metric columns to average.
    print("No summaries were generated; no metrics to report.")
else:
    print(f"ROUGE-1 (F1)   : {avg_results['rouge1']:.4f}")
    print(f"ROUGE-2 (F1)   : {avg_results['rouge2']:.4f}")
    print(f"ROUGE-L (F1)   : {avg_results['rougeL']:.4f}")
    print("-" * 40)
    print(f"Granularity (G) : {avg_results['granularity']:.4f}")
    print(f"Novelty (N)     : {avg_results['novelty']:.4f}")
print("="*40)


# --- Visual Comparison of a Specific Example ---
print("\n" + "="*40)
print("EXAMPLE COMPARISON")
print("="*40)

# Let's take the first item from our test set
sample_row = test_df.iloc[0]
sample_source = str(sample_row['ctext'])
sample_ref = str(sample_row['text'])

# Generate T5 Summary
sample_gen = summarize(sample_source)

# Calculate metrics for this specific example
sample_metrics = calculate_metrics_for_row(sample_source, sample_ref, sample_gen)

# Print the article itself under its header (previously only the header was
# printed), followed by the reference and the generated summary.
print(f"**Original Article:**\n{sample_source}\n")
print("-" * 20)
print(f"**Original Reference Summary (Human):**\n{sample_ref}\n")
print("-" * 20)
print(f"**T5 Generated Summary (Model):**\n{sample_gen}\n")
print("-" * 20)
print("**Metrics for this example:**")
print(f"Novelty: {sample_metrics['novelty']:.4f} | ROUGE-1: {sample_metrics['rouge1']:.4f}")
print("="*40)