<a href="https://colab.research.google.com/github/DamlaSuYayla/Data-Analysis-01/blob/main/PdfSummurizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# libraries
%pip install -q transformers datasets evaluate rouge_score sentencepiece pdfplumber PyPDF2 gradio accelerate einops PyMuPDF gtts nltk evaluate

  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.8/67.8 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.0/60.0 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m51.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m18.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.1/24.1 MB[0m [31m86.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.2/98.2 kB[0m [31m12.0 MB/s[0m eta [36m0:00:00

In [2]:
# Consolidated imports
import os
import re
import warnings
import subprocess
import glob
import json
from pathlib import Path
from datetime import datetime
from typing import List, Optional, Dict
import numpy as np

# default model name
model_name = 'allenai/led-base-16384'

# Core ML / NLP libraries
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from datasets import Dataset
import evaluate
import gradio as gr

# PDF and audio helpers
try:
    import fitz  # PyMuPDF
    FITZ_AVAILABLE = True
except Exception:
    FITZ_AVAILABLE = False

try:
    import pdfplumber
    PDFPLUMBER_AVAILABLE = True
except Exception:
    PDFPLUMBER_AVAILABLE = False

try:
    import PyPDF2
    PYPDF2_AVAILABLE = True
except Exception:
    PYPDF2_AVAILABLE = False

try:
    from gtts import gTTS
    GTTS_AVAILABLE = True
except Exception:
    GTTS_AVAILABLE = False

# NLTK setup
import nltk
try:
    nltk.data.find('tokenizers/punkt')
except Exception:
    nltk.download('punkt', quiet=True)

print(f"Imports ready.")

Imports ready.


In [3]:
# GPU check
print('CUDA available:', torch.cuda.is_available())
try:
    device_name = torch.cuda.get_device_name(0)
    print('Device:', device_name)
    import subprocess
    subprocess.run(['nvidia-smi'])
    if 'T4' not in device_name.upper():
        print('\nWarning: This GPU is not a T4.')
except Exception as e:
    print('GPU query error:', e)


CUDA available: True
Device: Tesla T4


In [4]:
# training_data directories
try:
    from google.colab import drive
    IN_COLAB = True
except Exception:
    IN_COLAB = False

if IN_COLAB:
    try:
        drive.mount('/content/drive', force_remount=False)
        print('Google Drive mounted at /content/drive')
    except Exception as e:
        print('Drive mount failed:', e)

os.makedirs('/content/training_data/pdfs', exist_ok=True)
os.makedirs('/content/training_data/summaries', exist_ok=True)

print('Directories ready')

Mounted at /content/drive
Google Drive mounted at /content/drive
Directories ready


In [5]:
# PDF extraction, cleaner, chunker, summarizer, and TTS

warnings.filterwarnings('ignore')

# sentence tokenizer with fallback
def sent_tokenize(text: str):
    try:
        return nltk.sent_tokenize(text)
    except Exception:
        return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]

# Config
import torch
DEVICE = 'cuda' if (torch.cuda.is_available() if 'torch' in globals() else False) else 'cpu'
# Use the default model_name defined in the imports cell
MODEL_NAME = model_name
MAX_TOKENS = 4096
OVERLAP = 0.15

# Data cleaner
class DataCleaner:
    PATTERNS = {
        'refs': [r'\[[\d,\s-]+\]', r'\([\w\s]+,\s*\d{4}\)', r'(?:References|Bibliography)[\s\S]*$'],
        'tables': [r'(?:Table)\s+\d+[.:][^\n]*', r'[-|+]{3,}'],
        'figures': [r'(?:Figure|Fig)\s+\d+[.:][^\n]*'],
        'footnotes': [r'^\s*\d+\s+[A-Z].*$', r'^\s*\*+\s*.+$'],
        'code': [r'```[\s\S]*?```', r'(?:def|class|import)\s+\w+']
    }

    def clean(self, text: str) -> str:
        if not text:
            return ''
        for patterns in self.PATTERNS.values():
            for p in patterns:
                text = re.sub(p, '', text, flags=re.MULTILINE | re.IGNORECASE)
        text = re.sub(r'(\w)-\s*\n\s*(\w)', r'\1\2', text)
        text = re.sub(r' +', ' ', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r'^\s*\d+\s*$', '', text, flags=re.MULTILINE)
        sentences = sent_tokenize(text)
        return ' '.join(s for s in sentences if len(s.split()) >= 5)

# Chunker
class Chunker:
    def __init__(self, tokenizer=None):
        self.tokenizer = tokenizer

    def chunk(self, text: str) -> List[str]:
        if not self.tokenizer:
            return [text]
        sentences = sent_tokenize(text)
        if not sentences:
            return []

        chunks, current = [], [sentences[0]]
        for i in range(1, len(sentences)):
            chunk_text = ' '.join(current + [sentences[i]])
            tokens = len(self.tokenizer.encode(chunk_text, add_special_tokens=False))
            if tokens <= MAX_TOKENS:
                current.append(sentences[i])
            else:
                chunks.append(' '.join(current))
                current = [sentences[i]]
        if current:
            chunks.append(' '.join(current))

        if len(chunks) > 1:
            for i in range(1, len(chunks)):
                prev_words = chunks[i-1].split()
                overlap_words = prev_words[-max(1, int(len(prev_words)*OVERLAP)):]
                chunks[i] = ' '.join(overlap_words) + ' ' + chunks[i]
        return chunks

# Summarizer
class Summarizer:
    def __init__(self, model_path=MODEL_NAME):
        self.model_path = model_path
        self.model = None
        self.tokenizer = None
        self.cleaner = DataCleaner()
        self.chunker = None
        self.max_chunks = 20
        self.max_depth = 3

    def _load(self):
        if self.model is None:
            from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
            self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_path).to(DEVICE).eval()
            self.chunker = Chunker(self.tokenizer)

    def _clean_summary(self, text: str) -> str:
        text = re.sub(r'\[\d+[\d,\s-]*\]', '', text)
        text = re.sub(r'\([A-Za-z\s]+,?\s*\d{4}\)', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        text = re.sub(r'^[.,;:!?\-–—\s]+', '', text)
        if text and not text.endswith(('.', '!', '?')):
            sentences = text.rsplit('.', 1)
            if len(sentences) > 1:
                text = sentences[0] + '.'
        return text

    def _summarize_chunk(self, text: str) -> str:
        self._load()
        inputs = self.tokenizer(text, max_length=MAX_TOKENS, truncation=True, return_tensors='pt').to(DEVICE)
        import torch
        global_attention_mask = torch.zeros_like(inputs['input_ids'])
        global_attention_mask[:, 0] = 1
        with torch.no_grad():
            out = self.model.generate(
                **inputs,
                global_attention_mask=global_attention_mask,
                max_length=400,
                min_length=50,
                num_beams=2,
                length_penalty=1.5,
                early_stopping=True,
                no_repeat_ngram_size=3
            )
        return self._clean_summary(self.tokenizer.decode(out[0], skip_special_tokens=True))

    def summarize(self, text: str, depth: int = 0) -> str:
        text = self.cleaner.clean(text)
        if not text:
            return 'Text could not be extracted.'
        self._load()
        tokens = len(self.tokenizer.encode(text))
        if tokens <= MAX_TOKENS:
            return self._summarize_chunk(text)
        if depth >= self.max_depth:
            return self._summarize_chunk(text[:MAX_TOKENS * 4])
        chunks = self.chunker.chunk(text)
        if len(chunks) > self.max_chunks:
            step = len(chunks) // self.max_chunks
            chunks = chunks[::step][:self.max_chunks]
        print(f'Summarizing {len(chunks)} chunks...')
        summaries = [self._summarize_chunk(c) for c in chunks]
        combined = ' '.join(summaries)
        if len(self.tokenizer.encode(combined)) > MAX_TOKENS:
            return self.summarize(combined, depth + 1)
        return self._summarize_chunk(combined)

# PDF extraction helper
def extract_pdf(path: str) -> str:
    try:
        if FITZ_AVAILABLE:
            doc = fitz.open(path)
            text = '\n'.join(p.get_text() for p in doc)
            doc.close()
            return text
    except Exception:
        pass
    # fallback to pdfplumber
    try:
        import pdfplumber
        texts = []
        with pdfplumber.open(path) as pdf:
            for p in pdf.pages:
                texts.append(p.extract_text() or '')
        return '\n'.join(texts)
    except Exception:
        try:
            import PyPDF2
            reader = PyPDF2.PdfReader(path)
            texts = []
            for p in range(len(reader.pages)):
                try:
                    texts.append(reader.pages[p].extract_text() or '')
                except:
                    texts.append('')
            return '\n'.join(texts)
        except Exception as e:
            print('PDF reading error:', e)
            return ''

# TTS helper
def text_to_speech(text: str) -> Optional[str]:
    if not text:
        return None
    if not GTTS_AVAILABLE:
        print('gTTS not available; install gtts to enable audio output.')
        return None
    # Ensure the output directory exists
    os.makedirs('/content/outputs', exist_ok=True)
    out_path = f"/content/outputs/audio_{datetime.now():%Y%m%d_%H%M%S}.mp3"
    gTTS(text=text, lang='en').save(out_path)
    return out_path

In [7]:
# Building training data: PDF -> (input, summary)

pdf_paths = sorted(glob.glob('/content/training_data/pdfs/*.pdf'))
inputs = []
targets = []

for p in pdf_paths:
    base = os.path.splitext(os.path.basename(p))[0]
    summary_path = f'/content/training_data/summaries/{base}.txt'
    if os.path.exists(summary_path):
        txt = extract_pdf(p)
        with open(summary_path, 'r', encoding='utf-8') as f:
            summary = f.read().strip()
        if len(txt.strip()) > 200 and len(summary) > 20:
            inputs.append(txt)
            targets.append(summary)
        else:
            print(f'Skipped (too short): {p}')
    else:
        print(f'Missing summary for {p}')

print('Total pairs:', len(inputs))

# Example
if len(inputs) > 0:
    with open('/content/sample_pair.jsonl','w',encoding='utf-8') as f:
        json.dump({'text': inputs[0], 'summary': targets[0]}, f, ensure_ascii=False, indent=2)
    print('Sample saved to /content/sample_pair.jsonl')

Total pairs: 8
Sample saved to /content/sample_pair.jsonl


In [8]:
# Tokenization and sliding-window long text chunking

model_name = 'allenai/led-base-16384'
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
max_input_length = 16384
stride = 1024  # overlap
max_target_length = 512

all_input_ids = []
all_attention_masks = []
all_global_att = []
all_labels = []

for doc, summary in zip(inputs, targets):
    enc = tokenizer(doc, max_length=max_input_length, truncation=True, return_overflowing_tokens=True, stride=stride)
    input_id_chunks = enc['input_ids']
    attention_chunks = enc['attention_mask']
    for ids, att in zip(input_id_chunks, attention_chunks):
        # first token of chunk
        gatt = [0] * len(ids)
        if len(gatt) > 0:
            gatt[0] = 1
        all_input_ids.append(ids)
        all_attention_masks.append(att)
        all_global_att.append(gatt)
        label_ids = tokenizer(summary, truncation=True, max_length=max_target_length).input_ids
        all_labels.append(label_ids)

print('Total chunks:', len(all_input_ids))

tokenizer_config.json:   0%|          | 0.00/27.0 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

Total chunks: 10


In [9]:
# Creating Dataset and train/test split

data_dict = {
    'input_ids': all_input_ids,
    'attention_mask': all_attention_masks,
    'global_attention_mask': all_global_att,
    'labels': all_labels
}
raw_ds = Dataset.from_dict(data_dict)
raw_ds = raw_ds.train_test_split(test_size=0.1)
print(raw_ds)


DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'global_attention_mask', 'labels'],
        num_rows: 9
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask', 'global_attention_mask', 'labels'],
        num_rows: 1
    })
})


In [10]:
# Data collator and metric function

rouge = evaluate.load('rouge')

def compute_metrics(eval_pred):
    preds, labels = eval_pred
    if isinstance(preds, tuple):
        preds = preds[0]
    decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
    result = rouge.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
    # simplify floats
    return {k: round(float(v.mid.fmeasure) if hasattr(v, 'mid') else float(v), 4) for k,v in result.items()}


def collate_fn(batch: List[Dict]):
    input_ids = [torch.tensor(x['input_ids'], dtype=torch.long) for x in batch]
    attention_mask = [torch.tensor(x['attention_mask'], dtype=torch.long) for x in batch]
    global_att = [torch.tensor(x['global_attention_mask'], dtype=torch.long) for x in batch]
    labels = [torch.tensor(x['labels'], dtype=torch.long) for x in batch]

    input_ids = torch.nn.utils.rnn.pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id)
    attention_mask = torch.nn.utils.rnn.pad_sequence(attention_mask, batch_first=True, padding_value=0)
    global_att = torch.nn.utils.rnn.pad_sequence(global_att, batch_first=True, padding_value=0)
    labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value=-100)

    return {
        'input_ids': input_ids,
        'attention_mask': attention_mask,
        'global_attention_mask': global_att,
        'labels': labels
    }


Downloading builder script: 0.00B [00:00, ?B/s]

In [11]:
# Loading model and training settings

from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments


model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
model.gradient_checkpointing_enable()

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

training_args = Seq2SeqTrainingArguments(
    output_dir='/content/led_finetuned',
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    gradient_accumulation_steps=8,
    fp16=True,
    eval_strategy='steps',
    eval_steps=500,
    save_steps=500,
    save_total_limit=3,
    num_train_epochs=3,
    learning_rate=3e-5,
    predict_with_generate=True,
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model='rouge1'
)

trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=raw_ds['train'],
    eval_dataset=raw_ds['test'],
    tokenizer=tokenizer,
    data_collator=collate_fn,
    compute_metrics=compute_metrics
)

print('Trainer ready.')

pytorch_model.bin:   0%|          | 0.00/648M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/168 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/648M [00:00<?, ?B/s]

Trainer ready.


In [12]:
import os
from transformers.integrations import WandbCallback

os.environ["WANDB_DISABLED"] = "true"

if 'trainer' in globals():
    trainer.args.report_to = []
    try:
        trainer.remove_callback(WandbCallback)
    except Exception:
        pass

# Run training
if len(raw_ds['train']) > 0:
    print("Starting training...")
    trainer.train()

    # Example: short test run
    trainer.args.num_train_epochs = 1
    trainer.train()
    trainer.save_model('/content/drive/MyDrive/summarizer-test')
else:
    print("Training skipped: The training dataset is empty (0 samples). Please upload PDFs to '/content/training_data/pdfs/' and summaries to '/content/training_data/summaries/'.")

Starting training...


Input ids are automatically padded from 8868 to 9216 to be a multiple of `config.attention_window`: 1024
Input ids are automatically padded from 6553 to 7168 to be a multiple of `config.attention_window`: 1024
Input ids are automatically padded from 9915 to 10240 to be a multiple of `config.attention_window`: 1024
Input ids are automatically padded from 15332 to 15360 to be a multiple of `config.attention_window`: 1024
Input ids are automatically padded from 3990 to 4096 to be a multiple of `config.attention_window`: 1024
Input ids are automatically padded from 12540 to 13312 to be a multiple of `config.attention_window`: 1024
Input ids are automatically padded from 6650 to 7168 to be a multiple of `config.attention_window`: 1024


Step,Training Loss,Validation Loss


Step,Training Loss,Validation Loss


In [13]:
# Example: save and load the model
save_path = '/content/drive/MyDrive/led-summarizer'

print(f'To save the model: trainer.save_model("{save_path}") and tokenizer.save_pretrained("{save_path}")')

To save the model: trainer.save_model("/content/drive/MyDrive/led-summarizer") and tokenizer.save_pretrained("/content/drive/MyDrive/led-summarizer")


In [14]:
# Inference function: long document -> chunk summaries -> combined summary

def summarize_long_document(doc_text: str, model, tokenizer, max_input_length=16384, stride=1024, max_target_length=512, device=None):
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    enc = tokenizer(doc_text, max_length=max_input_length, truncation=True, return_overflowing_tokens=True, stride=stride)
    parts = []
    for ids, att in zip(enc['input_ids'], enc['attention_mask']):
        gatt = [0]*len(ids)
        if len(gatt)>0:
            gatt[0]=1
        input_ids = torch.tensor([ids], dtype=torch.long).to(device)
        attention_mask = torch.tensor([att], dtype=torch.long).to(device)
        global_att = torch.tensor([gatt], dtype=torch.long).to(device)
        out = model.generate(input_ids=input_ids, attention_mask=attention_mask, global_attention_mask=global_att, max_length=max_target_length, num_beams=4)
        s = tokenizer.decode(out[0], skip_special_tokens=True)
        parts.append(s)
    # second-level summarization (hierarchical)
    combined = '\n'.join(parts)
    if len(combined.strip())==0:
        return ''
    enc2 = tokenizer(combined, truncation=True, max_length=max_input_length, return_tensors='pt').to(device)
    out2 = model.generate(**enc2, max_length=max_target_length, num_beams=4)
    final = tokenizer.decode(out2[0], skip_special_tokens=True)
    return final



In [15]:
# Gradio Blocks UI: Upload PDF -> summary (+ optional audio)
import gradio as gr
from pathlib import Path
import traceback

# Detect fine-tuned model if available
candidates = [
    Path('/content/drive/MyDrive/summarizer-test'), # Correct path from training
    Path('/content/drive/MyDrive/led-summarizer'),
    Path('/content/led_finetuned'),
    Path('/content/checkpoints/final_model')
]
# Find the first existing path, fallback to base model
model_path = next((str(p) for p in candidates if p.exists()), model_name)
print(f"Using model from: {model_path}")

try:
    summarizer = Summarizer(model_path)
except Exception as e:
    print(f"Failed to load model from {model_path}. Falling back to default.")
    summarizer = Summarizer(model_name)

def process_pdf(file):
    if not file:
        return 'Please upload a PDF file first.'
    try:
        path = file.name if hasattr(file, 'name') else file
        print(f"Processing file: {path}")

        text = extract_pdf(path)
        print(f"Extracted text length: {len(text)}")

        if not text or len(text.strip()) < 10:
            return "Error: No text could be extracted from this PDF. It might be an image-based PDF (scanned) or empty."

        summary = summarizer.summarize(text)
        return summary
    except Exception as e:
        traceback.print_exc()
        return f"An error occurred during summarization: {str(e)}"

def get_audio(text):
    if not text or text.startswith('Error') or text.startswith('Please') or text.startswith('An error'):
        return None, None
    try:
        path = text_to_speech(text)
        return path, path
    except Exception as e:
        print(f"TTS Error: {e}")
        return None, None

with gr.Blocks(title='PDF Summarizer', theme=gr.themes.Default()) as app:
    gr.Markdown('# 📄 PDF Summarizer')
    gr.Markdown('*Transform lengthy documents into concise summaries with AI*')

    with gr.Row():
        with gr.Column(scale=1):
            pdf_input = gr.File(label='Upload PDF')
            summarize_btn = gr.Button('Generate Summary', variant='primary')
        with gr.Column(scale=2):
            summary_output = gr.Textbox(label='Summary', lines=12, placeholder='Your summary will appear here...')

    gr.Markdown('### 🔊 Text-to-Speech')
    with gr.Row():
        audio_btn = gr.Button('Convert to Audio')
        audio_output = gr.Audio(label='Listen', type='filepath')
        download_output = gr.File(label='Download MP3')

    summarize_btn.click(fn=process_pdf, inputs=[pdf_input], outputs=[summary_output])
    audio_btn.click(fn=get_audio, inputs=[summary_output], outputs=[audio_output, download_output])

print('Gradio Blocks UI ready.')

Using model from: /content/drive/MyDrive/summarizer-test
Gradio Blocks UI ready.


In [None]:
import gradio as gr

# Clean up previous sessions
gr.close_all()

try:
    app.launch(share=True, debug=True)
except KeyboardInterrupt:
    print("\nThe application was closed by the user. Resources are being released...")
    gr.close_all()
except Exception as e:
    print(f"An unexpected error occurred:{e}")
    gr.close_all()

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://044557afc7c84e505c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
