#Mount Google Drive (model checkpoints are loaded from it later)

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the model folders used later in this
# notebook are addressed through paths under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


#Import

In [None]:
import os
import re
import html
import torch
from datasets import load_dataset, DatasetDict
import nltk
nltk.download('punkt_tab')
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


#Summary Process

In [None]:
class SummaryProcessor:
    """
    Data processing for the Summarization Task (Model 1, CNN/DailyMail).

    Offers text cleaning for articles/highlights and a helper that loads a
    cleaned, size-limited subset of the dataset.
    """

    def __init__(self):
        # CNN/DailyMail specific boilerplate. All patterns are compiled with
        # MULTILINE, so "^" and "$" anchor at every line of the text.
        self.BOILERPLATE_PATTERNS = [
            r"^Editor.?s Note:.*$",
            r"^READ:\s.*$",
            r"^WATCH:\s.*$",
            # Consume *every* dash after the "(CNN)" dateline: with a single
            # optional dash, "(CNN) -- text" left a stray "- " behind.
            r"^\(CNN\)\s*[-–—]*\s*",
        ]
        self.BP_RE = [
            re.compile(p, re.IGNORECASE | re.MULTILINE)
            for p in self.BOILERPLATE_PATTERNS
        ]
        self.URL_RE = re.compile(r"https?://\S+")

    def clean_text(self, t: str) -> str:
        """
        Clean one article or highlight string.

        Steps, in order: HTML-unescape, drop URLs, drop boilerplate lines and
        the "(CNN)" dateline, replace non-breaking spaces, collapse all
        whitespace runs to a single space.

        Returns "" for any non-string input (e.g. None).
        """
        if not isinstance(t, str):
            return ""
        t = html.unescape(t)
        t = self.URL_RE.sub("", t)
        for pat in self.BP_RE:
            t = pat.sub("", t)
        t = t.replace("\u00A0", " ")
        return re.sub(r"\s+", " ", t).strip()

    def _cleaner_batch(self, batch):
        """Batched `map` callback: clean the 'article' and 'highlights' columns."""
        return {
            "article": [self.clean_text(a) for a in batch["article"]],
            "highlights": [self.clean_text(h) for h in batch["highlights"]],
        }

    def get_dataset(self, train_size=15000, val_size=3000, test_size=3000):
        """
        Load CNN/DailyMail 3.0.0 and return a cleaned subset.

        Args:
            train_size: Number of leading train examples to keep.
            val_size: Number of leading validation examples to keep.
            test_size: Number of leading test examples to keep.

        Returns:
            A DatasetDict with "train", "validation" and "test" splits whose
            'article' and 'highlights' columns went through `clean_text`.
        """
        print("Loading CNN/DailyMail dataset...")
        raw = load_dataset("cnn_dailymail", "3.0.0")

        requested = {"train": train_size, "validation": val_size, "test": test_size}
        # Clamp to the split length so an oversized request keeps the whole
        # split instead of failing inside `select`.
        raw_subset = DatasetDict({
            split: raw[split].select(range(min(size, len(raw[split]))))
            for split, size in requested.items()
        })

        return raw_subset.map(self._cleaner_batch, batched=True)

#Simplify Process

In [None]:
class SimplifyProcessor:
    """
    Text cleaning for the Simplification Task.

    Source sentences are read from the "Normal" column and target sentences
    from the "Simple" column of a batch.
    """

    def __init__(self):
        self.source_text_column = "Normal"
        self.target_text_column = "Simple"
        # http/https URLs, up to the next whitespace character.
        self.URL_RE = re.compile(r"https?://\S+")

    def clean_text(self, t: str) -> str:
        """
        Normalise a single text value.

        None becomes ""; anything else is converted with `str()`, then
        HTML-unescaped, stripped of URLs, has non-breaking spaces replaced,
        and finally has every whitespace run collapsed to one space.
        """
        if t is None:
            return ""

        text = html.unescape(str(t))
        text = self.URL_RE.sub("", text).replace("\u00A0", " ")
        return re.sub(r"\s+", " ", text).strip()

    def _cleaner_batch(self, batch):
        """
        Dataset-mapping helper: clean the source and target columns of a
        batch and return them under their original column names.
        """
        src_col = self.source_text_column
        tgt_col = self.target_text_column
        cleaned_sources = list(map(self.clean_text, batch[src_col]))
        cleaned_targets = list(map(self.clean_text, batch[tgt_col]))
        return {src_col: cleaned_sources, tgt_col: cleaned_targets}

#Pipeline Class

In [None]:
class SumSimPipeline:
    """
    Summarize-and-simplify pipeline built from two seq2seq models.

    `process` dispatches on the task text: "normal summary" runs the
    summarizer only; "simple summary" simplifies the article sentence by
    sentence and then summarizes the simplified article.
    """

    # Checkpoint used when no usable local simplifier is available.
    DEFAULT_SIMPLIFIER = "t5-small"

    def __init__(self, summarizer_path, simplifier_path):
        """
        Args:
            summarizer_path (str): Path to the summarization model folder (e.g. on Drive)
            simplifier_path (str): Path to the simplification model folder

        Raises:
            FileNotFoundError: If `summarizer_path` does not exist.
            RuntimeError: If the summarizer cannot be loaded from that path.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"--- Initializing SumSimple Pipeline on {self.device.upper()} ---")

        # 1. Initialize Independent Processors
        self.sum_proc = SummaryProcessor()
        self.simp_proc = SimplifyProcessor()

        # --- LOAD SUMMARIZER (mandatory) ---
        print(f"Checking Summarizer path: {summarizer_path}")
        if not os.path.exists(summarizer_path):
            raise FileNotFoundError(f"Could not find summarizer directory at: {summarizer_path}\nMake sure Drive is mounted and the path is correct.")

        print("Loading Summarizer from Drive...")
        try:
            self.sum_tokenizer = AutoTokenizer.from_pretrained(summarizer_path, local_files_only=True)
            self.sum_model = AutoModelForSeq2SeqLM.from_pretrained(summarizer_path, local_files_only=True).to(self.device)
        except Exception as e:
            # Chain the cause so the original traceback stays visible.
            raise RuntimeError(f"Failed to load Summarizer model: {e}") from e

        # --- LOAD SIMPLIFIER (optional, falls back to the default checkpoint) ---
        print(f"Checking Simplifier path: {simplifier_path}")
        if os.path.exists(simplifier_path):
            print("Loading Simplifier from local/Drive path...")
            try:
                self.simp_tokenizer = AutoTokenizer.from_pretrained(simplifier_path, local_files_only=True)
                self.simp_model = AutoModelForSeq2SeqLM.from_pretrained(simplifier_path, local_files_only=True).to(self.device)
            except Exception as e:
                print(f"Warning: Failed to load local simplifier: {e}")
                self._load_default_simplifier()
        else:
            print(f"Simplifier path not found at '{simplifier_path}'. Loading default 't5-small'...")
            self._load_default_simplifier()

    def _load_default_simplifier(self):
        """
        Load the fallback simplifier (`DEFAULT_SIMPLIFIER`).

        Unlike the local loads, this is not restricted to local files, so the
        checkpoint may be fetched from the Hugging Face Hub.
        """
        self.simp_tokenizer = AutoTokenizer.from_pretrained(self.DEFAULT_SIMPLIFIER)
        self.simp_model = AutoModelForSeq2SeqLM.from_pretrained(self.DEFAULT_SIMPLIFIER).to(self.device)

    def _generate_summary(self, raw_text):
        """
        Summarize `raw_text` with Model 1.

        The text is cleaned with `SummaryProcessor.clean_text`, prefixed with
        'summarize: ', truncated to 512 tokens and decoded with beam search
        (4 beams, 40-128 tokens, length_penalty 2.0).

        Returns:
            The decoded summary string without special tokens.
        """
        clean_input = self.sum_proc.clean_text(raw_text)
        input_text = "summarize: " + clean_input

        inputs = self.sum_tokenizer(
            input_text,
            return_tensors="pt",
            max_length=512,
            truncation=True
        ).to(self.device)

        with torch.no_grad():
            summary_ids = self.sum_model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_length=128,       # Concise summary
                min_length=40,        # Enforce minimum content
                num_beams=4,
                length_penalty=2.0,
                early_stopping=True
            )

        return self.sum_tokenizer.decode(summary_ids[0], skip_special_tokens=True)

    def _generate_simplification(self, text_input):
        """
        Simplify `text_input` with the simplification model.

        The text is cleaned with `SimplifyProcessor.clean_text`, prefixed with
        'simplify: ', truncated to 256 source tokens and decoded with beam
        search (4 beams, at most 128 target tokens).

        Returns:
            The decoded simplified string without special tokens.
        """
        clean_input = self.simp_proc.clean_text(text_input)
        input_text = "simplify: " + clean_input

        inputs = self.simp_tokenizer(
            input_text,
            return_tensors="pt",
            max_length=256,       # max source length
            truncation=True
        ).to(self.device)

        with torch.no_grad():
            simple_ids = self.simp_model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_length=128,       # max target length
                num_beams=4,
                early_stopping=True
            )

        return self.simp_tokenizer.decode(simple_ids[0], skip_special_tokens=True)

    def process(self, prompt_task, article_text):
        """
        Run the task named in `prompt_task` on `article_text`.

        Args:
            prompt_task (str): Free text containing "normal summary" or
                "simple summary" (case-insensitive).
            article_text (str): The article to process.

        Returns:
            The generated summary, or the string "Error: Invalid task." when
            the task is not recognised (no model is run in that case).
        """
        task = prompt_task.lower().strip()

        if "normal summary" in task:
            print(">> Running Summarizer...")
            return self._generate_summary(article_text)

        if "simple summary" in task:
            print(">> Running Simple Summary Pipeline (Simplify → Summarize)...")

            # Simplify the full article sentence by sentence, then summarize
            # the simplified article (not the other way round).
            sentences = nltk.sent_tokenize(article_text)
            simplified_article = " ".join(
                self._generate_simplification(s) for s in sentences
            )

            print(">> Running Summarizer...")
            return self._generate_summary(simplified_article)

        return "Error: Invalid task."

#Metric(FKGL)

In [None]:
VOWELS = "aeiouy"


def count_syllables(word: str) -> int:
    """
    Estimate the number of syllables in *word*.

    Only the letters a-z (after lower-casing) are considered. Each run of
    consecutive vowels counts as one syllable, a trailing "e" is treated as
    silent when more than one run was found, and any word that still has
    letters yields at least 1. A word without letters yields 0.
    """
    letters = re.sub(r"[^a-z]", "", word.lower())
    if not letters:
        return 0

    # A vowel run starts wherever a vowel follows a non-vowel; the leading
    # space stands in for "nothing before the first letter".
    vowel_runs = sum(
        1
        for before, current in zip(" " + letters, letters)
        if current in VOWELS and before not in VOWELS
    )

    if vowel_runs > 1 and letters.endswith("e"):
        vowel_runs -= 1

    return max(vowel_runs, 1)

def fkgl(text: str) -> float:
    """
    Compute the Flesch-Kincaid Grade Level of *text*.

    Formula: 0.39 * (words / sentences) + 11.8 * (syllables / words) - 15.59,
    where sentences come from `nltk.sent_tokenize`, words are runs of ASCII
    letters, and syllables are estimated by `count_syllables`.

    Returns 0.0 when the text is None, blank, or contains no alphabetic
    words, since the formula is meaningless without words.
    """
    text = (text or "").strip()
    if not text:
        return 0.0

    # Word extraction (alphabetic "words" only)
    words = re.findall(r"[A-Za-z]+", text)
    if not words:
        # Digits/punctuation only: the bare formula would report a bogus
        # negative grade (about -15.2), so treat it like empty input.
        return 0.0

    # Sentence segmentation (handles multi-sentence summaries)
    num_sent = max(len(nltk.sent_tokenize(text)), 1)
    num_words = len(words)
    syllables = sum(count_syllables(w) for w in words)

    words_per_sent = num_words / num_sent
    syllables_per_word = syllables / num_words

    return 0.39 * words_per_sent + 11.8 * syllables_per_word - 15.59


# QAGS Function Code

In [None]:
# =========================================
# QAGS Faithfulness Metric
# =========================================

import torch
import spacy
import logging
import re
import numpy as np
import collections
import string
from typing import List, Tuple, Optional, Dict
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# Configure the root logger at INFO level and create the named logger that the
# QAGS metric code uses for its warnings.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("OptimizedQAGS")

class OptimizedQAGS:
    """
    QAGS-style faithfulness scorer.

    Answer candidates are extracted from the summary with spaCy, turned into
    questions by a T5 question-generation model, and every question is then
    answered against both the source and the summary with an extractive QA
    pipeline. The score is the mean token-level F1 between the two answers,
    optionally scaled by the QA confidence.
    """

    # spaCy entity labels accepted as answer candidates -> sort priority.
    _ENTITY_PRIORITY = {
        "PERSON": 10, "ORG": 9, "GPE": 8, "LOC": 7,
        "DATE": 9, "CARDINAL": 8, "MONEY": 8,
        "PERCENT": 8, "EVENT": 8, "WORK_OF_ART": 7,
        "ORDINAL": 7, "TIME": 8, "QUANTITY": 7
    }

    # Pronouns/determiners that make a noun chunk useless as an answer.
    _CHUNK_STOP_WORDS = frozenset({
        "it", "he", "she", "they", "this", "that", "these",
        "those", "we", "you", "i", "me", "my", "your", "his",
        "her", "our", "their", "its"
    })

    # A generated question is kept only if its first word is one of these.
    _QUESTION_STARTERS = frozenset({
        "what", "who", "where", "when", "why", "how", "which",
        "whose", "whom", "did", "does", "do", "is", "are", "was",
        "were", "has", "have", "had", "can", "could", "will", "would",
        "should", "may", "might"
    })

    def __init__(self, device_id: int = 0, batch_size: int = 8, use_fp16: bool = True):
        """
        Args:
            device_id: CUDA device index, or a negative value for CPU. A
                non-negative index silently degrades to CPU (with a warning)
                when CUDA is not available.
            batch_size: Batch size handed to the QA pipeline.
            use_fp16: Load the question-generation model in float16; only
                honoured when running on a CUDA device.
        """
        device_id = self._resolve_device(device_id)
        self.device = device_id
        self.batch_size = batch_size
        self.device_str = f"cuda:{device_id}" if device_id >= 0 else "cpu"
        self.torch_dtype = torch.float16 if use_fp16 and device_id >= 0 else torch.float32

        # Load spacy
        self.nlp = spacy.load("en_core_web_sm", disable=["lemmatizer", "textcat"])

        # Question generation model
        self.qg_model_name = "valhalla/t5-base-qg-hl"
        self.qg_tokenizer = AutoTokenizer.from_pretrained(self.qg_model_name)
        self.qg_model = AutoModelForSeq2SeqLM.from_pretrained(
            self.qg_model_name,
            torch_dtype=self.torch_dtype
        ).to(self.device_str)
        self.qg_model.eval()

        # Question answering pipeline (SQuAD2-style model; it may answer
        # "impossible", which comes back as an empty answer)
        self.qa_pipeline = pipeline(
            "question-answering",
            model="deepset/roberta-base-squad2",
            tokenizer="deepset/roberta-base-squad2",
            device=self.device,
            handle_impossible_answer=True,
            batch_size=self.batch_size
        )

    @staticmethod
    def _resolve_device(device_id: int) -> int:
        """Return `device_id`, or -1 (CPU) if a GPU was requested but CUDA is unavailable."""
        if device_id >= 0 and not torch.cuda.is_available():
            logging.getLogger("OptimizedQAGS").warning(
                "CUDA is not available; falling back to CPU instead of cuda:%s", device_id
            )
            return -1
        return device_id

    def extract_candidates(self, text: str, max_candidates: int = 25) -> List[Dict[str, object]]:
        """
        Extract answer candidates from `text`.

        Named entities with a label in `_ENTITY_PRIORITY` come first; if fewer
        than `max_candidates` were found, 2-6 word noun chunks are added with
        priority 4. Candidates are de-duplicated case-insensitively.

        Returns:
            Up to `max_candidates` dicts with keys 'text', 'start', 'end'
            (character offsets into the stripped text), 'priority' and
            'type', sorted by descending priority.
        """
        text = (text or "").strip()
        if not text:
            return []

        doc = self.nlp(text)
        candidates = []
        seen = set()

        # Extract named entities
        for ent in doc.ents:
            if ent.label_ not in self._ENTITY_PRIORITY:
                continue
            cand_text = ent.text.strip()
            cand_lower = cand_text.lower()
            if cand_text and cand_lower not in seen and len(cand_text.split()) <= 8:
                seen.add(cand_lower)
                candidates.append({
                    'text': cand_text,
                    'start': ent.start_char,
                    'end': ent.end_char,
                    'priority': self._ENTITY_PRIORITY[ent.label_],
                    'type': ent.label_
                })

        # Add noun chunks for additional coverage
        if len(candidates) < max_candidates:
            stop_words = self._CHUNK_STOP_WORDS

            for chunk in doc.noun_chunks:
                clean = chunk.text.strip()
                clean_lower = clean.lower()
                words = clean.split()

                if not clean or clean_lower in seen or clean_lower in stop_words:
                    continue
                if not 2 <= len(words) <= 6:
                    continue
                if all(w.lower() in stop_words for w in words):
                    continue
                # Skip if it's mostly punctuation/whitespace
                if sum(c.isalnum() for c in clean) <= len(clean) * 0.5:
                    continue

                seen.add(clean_lower)
                candidates.append({
                    'text': clean,
                    'start': chunk.start_char,
                    'end': chunk.end_char,
                    'priority': 4,
                    'type': 'NOUN_CHUNK'
                })

                if len(candidates) >= max_candidates:
                    break

        # Sort by priority (stable, so document order is kept within a priority)
        candidates.sort(key=lambda x: x['priority'], reverse=True)
        return candidates[:max_candidates]

    def highlight_answer_in_context(self, context: str, answer: str,
                                     start: int, end: int) -> str:
        """
        Build the question-generation input: `context` with the span
        [start, end) replaced by "<hl> answer <hl>".
        """
        return f"{context[:start]}<hl> {answer} <hl>{context[end:]}"

    def generate_questions_batch(self, summary: str, candidates: List[Dict],
                                 min_questions: int = 10, max_questions: int = 20) -> List[Tuple[str, str]]:
        """
        Generate one question per candidate and keep the plausible ones.

        Args:
            summary: Text the candidates were extracted from.
            candidates: Dicts as returned by `extract_candidates`.
            min_questions: Currently unused; kept for interface compatibility.
            max_questions: Maximum number of (question, answer) pairs to
                return; at most twice as many candidates are tried.

        Returns:
            A list of (question, expected_answer) tuples.
        """
        summary = (summary or "").strip()
        if not summary or not candidates:
            return []

        # Prepare inputs
        selected = candidates[:max_questions * 2]
        input_texts = [
            self.highlight_answer_in_context(summary, cand['text'], cand['start'], cand['end'])
            for cand in selected
        ]
        if not input_texts:
            return []

        # Generate questions
        inputs = self.qg_tokenizer(
            input_texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        ).to(self.device_str)

        with torch.no_grad():
            outputs = self.qg_model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_length=64,
                num_beams=5,
                early_stopping=True,
                no_repeat_ngram_size=3,
                length_penalty=1.2,
                num_return_sequences=1
            )

        questions = self.qg_tokenizer.batch_decode(outputs, skip_special_tokens=True)

        # Filter and validate
        qa_pairs = []
        seen_questions = set()

        for q, cand_dict in zip(questions, selected):
            q_clean = q.strip()
            if not q_clean:
                continue

            # Add question mark if missing
            if not q_clean.endswith("?"):
                q_clean = q_clean + "?"

            q_lower = q_clean.lower()
            words = q_clean.split()

            # Must be reasonable length
            if len(words) < 4 or len(words) > 20:
                continue

            # Must start with question word
            first_word = words[0].lower().strip("?.,!")
            if first_word not in self._QUESTION_STARTERS:
                continue

            cand_text = cand_dict['text']

            # Named entities are accepted even if the answer leaks into the
            # question; generic noun chunks are dropped when a short question
            # simply repeats the answer.
            if cand_dict['type'] in ['NOUN_CHUNK']:
                q_no_punct = re.sub(r'[^\w\s]', ' ', q_lower).strip()
                if cand_text.lower() in q_no_punct and len(words) < 8:
                    continue

            # Deduplicate questions that use the same bag of words
            q_normalized = ' '.join(sorted(q_lower.split()))
            if q_normalized not in seen_questions:
                seen_questions.add(q_normalized)
                qa_pairs.append((q_clean, cand_text))

                if len(qa_pairs) >= max_questions:
                    break

        return qa_pairs

    def get_answers_batch(self, context: str, questions: List[str]) -> List[Tuple[str, float]]:
        """
        Answer every question against `context`.

        Returns:
            One (answer, confidence) tuple per question. ("", 0.0) is used
            when the context is empty, the pipeline raises, the score is
            below 0.01, or the answer is empty / a "no answer" placeholder.
        """
        context = (context or "").strip()
        if not context or not questions:
            return [("", 0.0)] * len(questions)

        try:
            preds = self.qa_pipeline(
                question=questions,
                context=[context] * len(questions),
                doc_stride=128,
                max_seq_len=512,
                max_answer_len=50,
                top_k=1
            )
        except Exception as e:
            # Best effort: a failing QA call degrades to "no answers".
            logger.warning("QA pipeline error: %s", e)
            return [("", 0.0)] * len(questions)

        # A single question may come back as one dict rather than a list.
        if not isinstance(preds, list):
            preds = [preds]

        answers = []
        for p in preds:
            score = float(p.get("score", 0.0))
            ans = (p.get("answer") or "").strip()

            if score < 0.01 or not ans or ans.lower() in {"no answer", "unknown", "none"}:
                answers.append(("", 0.0))
            else:
                answers.append((ans, score))

        return answers

    def compute_f1(self, a_gold: str, a_pred: str) -> Optional[float]:
        """
        Token-level F1 between two answers.

        Answers are normalised by lower-casing, removing the articles
        a/an/the and replacing punctuation with spaces.

        Returns:
            None when both answers are empty (nothing to compare), 0.0 when
            exactly one is empty or no token overlaps, 1.0 on an exact match
            after normalisation, otherwise the F1 of the token overlap.
        """
        def normalize(s):
            s = re.sub(r'\b(a|an|the)\b', ' ', s.lower())
            s = ''.join(ch if ch not in string.punctuation else ' ' for ch in s)
            return ' '.join(s.split())

        a_gold = (a_gold or "").strip()
        a_pred = (a_pred or "").strip()

        # Both empty = can't evaluate
        if not a_gold and not a_pred:
            return None

        # One empty = complete mismatch
        if not a_gold or not a_pred:
            return 0.0

        gold_norm = normalize(a_gold)
        pred_norm = normalize(a_pred)

        # Same two rules again, now for answers that normalise to nothing
        if not gold_norm and not pred_norm:
            return None

        if not gold_norm or not pred_norm:
            return 0.0

        # Exact match after normalization
        if gold_norm == pred_norm:
            return 1.0

        gold_toks = gold_norm.split()
        pred_toks = pred_norm.split()

        # Multiset intersection of the tokens
        common = collections.Counter(gold_toks) & collections.Counter(pred_toks)
        num_same = sum(common.values())

        if num_same == 0:
            return 0.0

        precision = num_same / len(pred_toks)
        recall = num_same / len(gold_toks)
        return (2 * precision * recall) / (precision + recall)

    def calculate_score(self, source: str, summary: str, verbose: bool = False,
                       confidence_weight: bool = True) -> float:
        """
        Calculate the QAGS score of `summary` with respect to `source`.

        Args:
            source: Source document
            summary: Summary to evaluate
            verbose: Print detailed output
            confidence_weight: Scale each F1 by (0.5 + 0.5 * c), where c is
                the lower of the two QA confidences, before averaging.

        Returns:
            The mean (scaled) F1 over all questions with a comparable answer,
            or 0.0 when an input is empty or no candidate, question or
            comparable answer pair could be produced.
        """
        source = (source or "").strip()
        summary = (summary or "").strip()

        if not source or not summary:
            logger.warning("Empty source or summary")
            return 0.0

        # Extract candidates from summary
        candidates = self.extract_candidates(summary, max_candidates=25)

        if verbose:
            print(f"Extracted {len(candidates)} candidates")
            print(f"Top candidates: {[c['text'] for c in candidates[:8]]}\n")

        if not candidates:
            logger.warning("No candidates extracted from summary")
            return 0.0

        # Generate questions
        qa_pairs = self.generate_questions_batch(summary, candidates,
                                                  min_questions=10, max_questions=20)

        if verbose:
            print(f"Generated {len(qa_pairs)} questions\n")

        if not qa_pairs:
            logger.warning("No valid questions generated")
            return 0.0

        # Get answers with confidence
        questions = [q for q, _ in qa_pairs]
        expected_answers = [a for _, a in qa_pairs]

        ans_src = self.get_answers_batch(source, questions)
        ans_sum = self.get_answers_batch(summary, questions)

        # Calculate F1 scores
        scores = []
        confidences = []

        for i, (q, exp_ans, (a_sum, conf_sum), (a_src, conf_src)) in enumerate(
            zip(questions, expected_answers, ans_sum, ans_src)):

            f1 = self.compute_f1(a_src, a_sum)

            if verbose:
                print(f"{i+1}. Q: {q}")
                print(f"   Expected: {exp_ans}")
                print(f"   Summary: '{a_sum}' (conf: {conf_sum:.2f})")
                print(f"   Source:  '{a_src}' (conf: {conf_src:.2f})")

                if f1 is None:
                    print("   F1: None (both empty)")
                else:
                    print(f"   F1: {f1:.2f}")

                # Highlight potential hallucinations
                if f1 is not None and f1 < 0.3 and conf_sum > 0.5:
                    print("   ⚠️  POTENTIAL HALLUCINATION (low F1, high summary confidence)")
                print()

            # Questions unanswerable on both sides (f1 is None) are skipped.
            if f1 is not None:
                scores.append(f1)
                # Use minimum confidence (if either is low, penalize)
                min_conf = min(conf_src, conf_sum) if confidence_weight else 1.0
                confidences.append(min_conf)

        if not scores:
            logger.warning("No valid F1 scores computed")
            return 0.0

        if confidence_weight and sum(confidences) > 0:
            # Scale each F1 into [0.5*F1, F1] by its confidence, then average.
            weighted_scores = [s * (0.5 + 0.5 * c) for s, c in zip(scores, confidences)]
            final_score = float(np.mean(weighted_scores))
        else:
            final_score = float(np.mean(scores))

        if verbose:
            print(f"{'='*70}")
            print(f"Final QAGS Score: {final_score:.3f}")
            print(f"Valid comparisons: {len(scores)}/{len(qa_pairs)}")
            avg_f1 = np.mean(scores)
            avg_conf = np.mean(confidences) if confidences else 0
            print(f"Average F1: {avg_f1:.3f} | Average Confidence: {avg_conf:.3f}")
            print(f"{'='*70}")

        return final_score

def interpret(s):
    """
    Map a QAGS score to a coarse faithfulness label.

    Returns "High Faithfulness" above 0.9, "Mixed/Partial" above 0.6,
    "Low Faithfulness" otherwise, and "N/A" for NaN (a score that could not
    be computed must not be reported as low faithfulness).
    """
    if s != s:  # NaN is the only value that is not equal to itself
        return "N/A"
    if s > 0.9:
        return "High Faithfulness"
    if s > 0.6:
        return "Mixed/Partial"
    return "Low Faithfulness"

# Shared QAGS scorer. Pick the device explicitly: GPU 0 when CUDA is present,
# otherwise CPU (device_id=-1), so the cell also runs on a CPU-only runtime.
qags = OptimizedQAGS(
    device_id=0 if torch.cuda.is_available() else -1,
    batch_size=16,
    use_fp16=True,
)

# FactCC Function Code

In [None]:
# =========================================
# FactCC Faithfulness Metric
# =========================================

import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Folder holding the fine-tuned faithfulness classifier. This notebook mounts
# Drive at /content/drive (not /content/gdrive), so the path must start there.
# NOTE(review): confirm the folder name below matches your Drive layout.
LOAD_DIR = "/content/drive/MyDrive/faithfulness_model_cnn_dm"

factcc_tokenizer = AutoTokenizer.from_pretrained(LOAD_DIR)
factcc_model = AutoModelForSequenceClassification.from_pretrained(LOAD_DIR).to(device)

def check_faithfulness(
    source_text: str,
    summary_text: str,
    max_length: int = 512,
) -> float:
    """
    Score a (source, summary) pair with the FactCC classifier.

    The pair is tokenized as one sequence pair, truncated and padded to
    `max_length`, and run through the model without gradients.

    Returns:
        The softmax probability of class index 1 as a Python float.
        NOTE(review): index 1 is assumed to be the "faithful" label; confirm
        against the label mapping used when the model was trained.
    """
    factcc_model.eval()

    encoded_pair = factcc_tokenizer(
        source_text,
        summary_text,
        return_tensors="pt",
        padding="max_length",
        max_length=max_length,
        truncation=True,
    )
    encoded_pair = encoded_pair.to(device)

    with torch.no_grad():
        model_output = factcc_model(**encoded_pair)

    # logits: one row with two class scores -> drop the batch dimension
    class_probs = F.softmax(model_output.logits, dim=-1).squeeze(0)
    return class_probs[1].item()


#Testing

In [None]:
def _safe_metric(tag, metric_fn, *args, **kwargs):
    """Run one metric call; on failure report it under *tag* and return NaN."""
    try:
        return metric_fn(*args, **kwargs)
    except Exception as e:
        # Evaluation boundary: a failing metric must not abort the whole run
        # or discard the scores that were computed successfully.
        print(f"[{tag}] Error computing scores: {e}")
        return float("nan")


if __name__ == "__main__":
    # 1. Setup Pipeline
    summarizer_path = "/content/drive/MyDrive/MSCS Semester 3/NLP/SumSim Models/t5-summarizer"
    simplifier_path = "/content/drive/MyDrive/MSCS Semester 3/NLP/SumSim Models/t5-simplifier"
    # Not named `pipeline`: that would shadow `transformers.pipeline`, which
    # OptimizedQAGS needs whenever it is constructed.
    sumsim = SumSimPipeline(
        summarizer_path=summarizer_path,
        simplifier_path=simplifier_path
    )

    # 2. Load Dataset
    dataset_config = "3.0.0"
    print(f"\nLoading CNN/DailyMail ({dataset_config}) to fetch test examples...")
    raw_data = load_dataset("cnn_dailymail", dataset_config)

    # 3. Select samples located AFTER the first `test_size` test examples
    test_size = 3000
    num_samples = 10

    print(f"Model code used indices 0-{test_size}. Selecting indices {test_size}-{test_size + num_samples}...")

    test_subset = raw_data["test"].select(range(test_size, test_size + num_samples))

    # 4. Iterate and Test
    print("\n" + "="*60)
    print(f"       STARTING PIPELINE TEST RUN (SAMPLES {test_size}-{test_size + num_samples})")
    print("="*60 + "\n")

    for i, item in enumerate(test_subset):
        article = item["article"]
        reference = item["highlights"]
        actual_idx = test_size + i

        print(article + "\n")
        print(f"--- [Dataset Index: {actual_idx}] (Sample {i+1}/{num_samples}) ---")
        print(f"Original Article Length: {len(article.split())} words")

        # TASK A: Normal Summary
        print(f"\n>> Processing: Normal Summary...")
        normal_out = sumsim.process("task: normal summary", article)

        # TASK B: Simple Summary
        print(f">> Processing: Simple Summary...")
        simple_out = sumsim.process("task: simple summary", article)

        # Display Results
        print("-" * 20)
        print(f"\n[NORMAL SUMMARY]:\n{normal_out}")
        print(f"\n[SIMPLE SUMMARY]:\n{simple_out}")

        # Clean up reference for printing (remove newlines)
        clean_ref = reference.replace("\n", " ")
        print(f"\n[REFERENCE HIGHLIGHTS]:\n{clean_ref}")

        # Display FKGL
        fk_normal = fkgl(normal_out)
        fk_simple = fkgl(simple_out)
        fk_ref_hl = fkgl(clean_ref)
        print(f"\n[FKGL - NORMAL SUMMARY]: {fk_normal:.2f}")
        print(f"[FKGL - SIMPLE SUMMARY]: {fk_simple:.2f}")
        print(f"[FKGL - REFERENCE HIGHLIGHTS]: {fk_ref_hl:.2f}")

        # =========================================
        # QAGS scores (source vs summaries)
        # Each score is computed independently so one failure only blanks
        # that single score.
        # =========================================
        qags_normal = _safe_metric("QAGS", qags.calculate_score, article, normal_out, verbose=False)
        qags_simple = _safe_metric("QAGS", qags.calculate_score, article, simple_out, verbose=False)

        print(f"\n[QAGS - NORMAL SUMMARY]: {qags_normal:.4f} ({interpret(qags_normal)})")
        print(f"[QAGS - SIMPLE SUMMARY]: {qags_simple:.4f} ({interpret(qags_simple)})")

        # =========================================
        # FactCC scores (source vs summaries)
        # =========================================
        factcc_normal = _safe_metric("FactCC", check_faithfulness, article, normal_out)
        factcc_simple = _safe_metric("FactCC", check_faithfulness, article, simple_out)
        factcc_ref = _safe_metric("FactCC", check_faithfulness, article, clean_ref)

        print(f"\n[FactCC - NORMAL SUMMARY]: {factcc_normal:.4f}")
        print(f"[FactCC - SIMPLE SUMMARY]: {factcc_simple:.4f}")
        print(f"[FactCC - REFERENCE HIGHLIGHTS]: {factcc_ref:.4f}")

        print("\n" + "="*60 + "\n")

--- Initializing SumSimple Pipeline on CPU ---
Checking Summarizer path: /content/drive/MyDrive/MSCS Semester 3/NLP/SumSim Models/t5-summarizer
Loading Summarizer from Drive...
Checking Simplifier path: /content/drive/MyDrive/MSCS Semester 3/NLP/SumSim Models/t5-simplifier
Loading Simplifier from local/Drive path...

Loading CNN/DailyMail (3.0.0) to fetch test examples...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

3.0.0/train-00000-of-00003.parquet:   0%|          | 0.00/257M [00:00<?, ?B/s]

3.0.0/train-00001-of-00003.parquet:   0%|          | 0.00/257M [00:00<?, ?B/s]

3.0.0/train-00002-of-00003.parquet:   0%|          | 0.00/259M [00:00<?, ?B/s]

3.0.0/validation-00000-of-00001.parquet:   0%|          | 0.00/34.7M [00:00<?, ?B/s]

3.0.0/test-00000-of-00001.parquet:   0%|          | 0.00/30.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/287113 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/13368 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/11490 [00:00<?, ? examples/s]

Model code used indices 0-3000. Selecting indices 3000-3010...

       STARTING PIPELINE TEST RUN (SAMPLES 3000-3010)

Hannover fired coach Tayfun Korkut on Monday after a run of 13 games without a win left the club close to the Bundesliga's relegation zone. Michael Frontzek has been named as his successor, signing a contract valid for the remaining five matches of the season. The 51-year-old Frontzek worked as an assistant at Hannover from 2004 to 2005. Tayfun Korkut has been sacked by Hannover after a long winless run saw the club fall down the league table . Michael Frontzek will take over at Hannover and has five games left this season to avoid relegation . He is returning to the Bundesliga as coach for the first time since 2011, when he was fired by Borussia Moenchengladbach. Korkut's departure follows a 4-0 loss to Bayer Leverkusen on Saturday. Hannover are now only one place and two points above Paderborn, who currently occupies the relegation play-off place. The Bundesliga's bo