In [None]:
!pip install youtube_transcript_api



In [None]:
from huggingface_hub import login
import os
from dotenv import load_dotenv

# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# The Hugging Face access token is read from the "HF" environment variable.
your_token = os.environ.get("HF")

# Authenticate against the Hugging Face Hub with that token.
login(token=your_token)

In [36]:
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import re
import math

# -------------------------
# Model & tokenizer
# -------------------------
model_name = "facebook/bart-large-cnn"

# Pick the device at runtime instead of hard-coding "cuda", so the notebook also
# runs on machines without a GPU. Half precision is only used on the GPU; on CPU
# the model is kept in float32.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(
    model_name,
    torch_dtype=MODEL_DTYPE,
    low_cpu_mem_usage=True
).to(DEVICE)

# Maximum input window: prefer the model config, then the tokenizer's limit.
MODEL_MAX_TOKENS = getattr(model.config, "max_position_embeddings", None) or getattr(tokenizer, "model_max_length", None)
if MODEL_MAX_TOKENS is None:
    # Neither source reported a limit: fall back to a fixed cap.
    MODEL_MAX_TOKENS = 1024

# Chunks are kept SAFETY_MARGIN tokens below the model window, and never
# smaller than 64 tokens.
SAFETY_MARGIN = 64
SAFE_CHUNK_TOKENS = max(64, MODEL_MAX_TOKENS - SAFETY_MARGIN)


# -------------------------
# Token utilities
# -------------------------
def encode_ids(text, add_special_tokens=True):
    """Tokenize `text` without truncation and return its ids as a 1D tensor.

    Args:
        text: The string to tokenize.
        add_special_tokens: Forwarded to the tokenizer; controls whether the
            tokenizer's special tokens are included in the result.

    Returns:
        The first (and only) row of the tokenizer's `input_ids` tensor.
    """
    encoded = tokenizer(
        text,
        return_tensors="pt",
        truncation=False,
        add_special_tokens=add_special_tokens,
    )
    first_row = encoded.input_ids[0]
    return first_row

def tokens_to_text(token_ids):
    """Decode token ids back into a string, leaving out special tokens."""
    decoded = tokenizer.decode(token_ids, skip_special_tokens=True)
    return decoded

def split_token_tensor(token_ids, max_tokens=None):
    """Split a 1D token tensor into consecutive pieces of at most `max_tokens`.

    Args:
        token_ids: 1D torch tensor of token ids.
        max_tokens: Maximum length of each piece. When omitted (None), the
            module-level SAFE_CHUNK_TOKENS is looked up at call time, so a
            re-run of the configuration is picked up without redefining this
            function.

    Returns:
        A list of 1D tensors (slices of `token_ids`) in original order; an
        empty list when `token_ids` is empty.

    Raises:
        ValueError: If `max_tokens` is not positive. Previously 0 surfaced as
            an unrelated `range()` error and negative values silently returned
            an empty list, dropping every token.
    """
    if max_tokens is None:
        max_tokens = SAFE_CHUNK_TOKENS
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be a positive integer, got {max_tokens}")

    total = token_ids.size(0)
    return [token_ids[start:start + max_tokens] for start in range(0, total, max_tokens)]


# -------------------------
# Safe summarization primitives
# -------------------------
def generate_from_token_chunk(token_chunk, max_length=150, min_length=20, num_beams=4):
    """Run beam-search generation on a single, unbatched chunk of token ids.

    Args:
        token_chunk: 1D torch tensor of token ids (no batch dimension). It is
            batched and moved to the model's device here.
        max_length: `max_length` passed to `model.generate`.
        min_length: `min_length` passed to `model.generate`.
        num_beams: Number of beams passed to `model.generate`.

    Returns:
        The first generated sequence decoded to text, special tokens skipped.

    Raises:
        ValueError: If the chunk is longer than SAFE_CHUNK_TOKENS.
    """
    # Refuse oversized input up front rather than letting generate() see it.
    if not token_chunk.size(0) <= SAFE_CHUNK_TOKENS:
        raise ValueError(f"generate_from_token_chunk called with {token_chunk.size(0)} tokens > SAFE_CHUNK_TOKENS {SAFE_CHUNK_TOKENS}")

    generation_options = {
        "max_length": max_length,
        "min_length": min_length,
        "num_beams": num_beams,
        "early_stopping": True,
    }

    # Add the batch dimension and place the ids on the model's device.
    batch = token_chunk.unsqueeze(0).to(model.device)
    generated = model.generate(input_ids=batch, **generation_options)

    best_sequence = generated[0]
    return tokenizer.decode(best_sequence, skip_special_tokens=True)


def summarize_text_safe(text, max_length=150, min_length=20):
    """Summarize arbitrary-length text chunk by chunk.

    Steps:
     - tokenize the text WITHOUT special tokens
     - split the ids into pieces that leave room for the special tokens
     - re-frame every piece with the tokenizer's BOS/EOS ids (when defined)
     - generate one summary per piece

    This does NOT attempt to produce a final merged summary.

    Args:
        text: Text to summarize.
        max_length: `max_length` used for each per-chunk generation.
        min_length: `min_length` used for each per-chunk generation.

    Returns:
        A list with one summary string per chunk, in order. Empty or
        whitespace-only input yields an empty list.
    """
    # The previous guard checked the length of ids that always included the
    # special tokens, so it could never fire; check the text itself instead.
    if not text or not text.strip():
        return []

    # Tokenize without special tokens: splitting a sequence that is already
    # framed leaves the first piece without EOS, the last without BOS, and the
    # middle ones with neither.
    body_ids = encode_ids(text, add_special_tokens=False)
    if body_ids.size(0) == 0:
        return []

    # Assumes the tokenizer frames a single sequence as BOS + tokens + EOS
    # (whichever of the two it defines) - TODO confirm if the model is swapped.
    prefix = [tokenizer.bos_token_id] if tokenizer.bos_token_id is not None else []
    suffix = [tokenizer.eos_token_id] if tokenizer.eos_token_id is not None else []
    body_budget = SAFE_CHUNK_TOKENS - len(prefix) - len(suffix)

    summaries = []
    for body_chunk in split_token_tensor(body_ids, body_budget):
        # new_tensor keeps the dtype/device of the chunk for the framing ids.
        framed_chunk = torch.cat([
            body_chunk.new_tensor(prefix),
            body_chunk,
            body_chunk.new_tensor(suffix),
        ])
        summaries.append(
            generate_from_token_chunk(framed_chunk, max_length=max_length, min_length=min_length)
        )
    return summaries


# -------------------------
# Iterative reducer (multi-pass)
# -------------------------
def iterative_compress(text, per_chunk_max_len=150, per_chunk_min_len=20, final_max_len=200, max_iterations=6):
    """Reduce `text` to a single summary using repeated chunked summarization.

    The text is first summarized chunk by chunk. The joined chunk summaries are
    then re-summarized until they fit into SAFE_CHUNK_TOKENS, at which point one
    last generation produces the final summary. model.generate is never called
    with more than SAFE_CHUNK_TOKENS tokens.

    Args:
        text: Text to summarize.
        per_chunk_max_len: `max_length` for every per-chunk generation.
        per_chunk_min_len: `min_length` for every generation (including the
            final one).
        final_max_len: `max_length` for the final generation.
        max_iterations: Upper bound on reduction passes before the combined
            text is truncated to SAFE_CHUNK_TOKENS and summarized as is.

    Returns:
        The final summary string, or "" when there is nothing to summarize.
    """
    if not text or not text.strip():
        return ""

    # First pass: summarize the original text into per-chunk summaries.
    summaries = summarize_text_safe(text, max_length=per_chunk_max_len, min_length=per_chunk_min_len)
    if not summaries:
        return ""

    combined = " ".join(summaries)
    combined_ids = encode_ids(combined)

    # A single chunk summary that already fits the final length budget IS the
    # summary; running it through the model again only degrades it.
    if len(summaries) == 1 and combined_ids.size(0) <= final_max_len:
        return combined

    for iter_count in range(1, max_iterations + 1):
        token_len = combined_ids.size(0)
        # debug info
        print(f"[iter {iter_count}] combined token length = {token_len}; SAFE_CHUNK_TOKENS = {SAFE_CHUNK_TOKENS}")

        if token_len <= SAFE_CHUNK_TOKENS:
            # Fits in one window: produce the final summary in one generation.
            return generate_from_token_chunk(combined_ids, max_length=final_max_len, min_length=per_chunk_min_len)

        # Still too long: summarize its token chunks again and re-join.
        compressed_parts = summarize_text_safe(combined, max_length=per_chunk_max_len, min_length=per_chunk_min_len)
        combined = " ".join(compressed_parts)
        combined_ids = encode_ids(combined)

        # Another pass cannot help if this one did not shrink the text.
        if combined_ids.size(0) >= token_len:
            print("[warning] no size reduction achieved; truncating combined to SAFE_CHUNK_TOKENS and summarizing final chunk.")
            break
    else:
        print("[warning] reached maximum iterations; truncating combined to SAFE_CHUNK_TOKENS and summarizing final chunk.")

    # Fallback: hard-truncate to the safe window (a no-op when it already fits).
    final_ids = combined_ids[:SAFE_CHUNK_TOKENS]
    return generate_from_token_chunk(final_ids, max_length=final_max_len, min_length=per_chunk_min_len)


# -------------------------
# YouTube utilities
# -------------------------
def extract_youtube_id(url):
    """Extract the 11-character video id from a YouTube URL.

    Supported forms:
      - youtube.com / *.youtube.com with a `v=` query parameter
      - youtube.com / *.youtube.com paths /embed/<id>, /v/<id>, /shorts/<id>,
        /live/<id>
      - youtu.be/<id>
    URLs without a scheme (e.g. "youtube.com/watch?v=...") are accepted.

    Args:
        url: The URL string (may be None or empty).

    Returns:
        The video id, or None when the input is empty, malformed, not a
        YouTube host, or carries no valid id.
    """
    if not url:
        return None

    candidate = url.strip()
    # urlparse only fills in the hostname when a scheme (or leading "//") is
    # present, so give scheme-less input a default one.
    if "://" not in candidate and not candidate.startswith("//"):
        candidate = "https://" + candidate

    try:
        parsed = urlparse(candidate)
        hostname = parsed.hostname or ""
    except ValueError:
        # e.g. malformed bracketed host
        return None

    id_pattern = r"[A-Za-z0-9_-]{11}"

    # Match the domain itself or a real subdomain. A bare endswith("youtube.com")
    # would also accept look-alike hosts such as "notyoutube.com".
    if hostname == "youtube.com" or hostname.endswith(".youtube.com"):
        for value in parse_qs(parsed.query).get("v", []):
            if re.fullmatch(id_pattern, value):
                return value
        m = re.search(r"/(?:embed|v|shorts|live)/(" + id_pattern + r")", parsed.path)
        if m:
            return m.group(1)

    if hostname in ("youtu.be", "www.youtu.be"):
        vid = parsed.path.strip("/")
        if re.fullmatch(id_pattern, vid):
            return vid

    return None

# def youtubeTranscript(url):
#     video_id = extract_youtube_id(url)
#     if not video_id:
#         return "Invalid YouTube URL"
#     try:
#         transcript = YouTubeTranscriptApi().fetch(video_id)
#         finalSTR=''
#         for i in range(len(transcript)):
#             tempSTR = str(transcript[i])
#             finalSTR += tempSTR[24:] + ' '
#         return finalSTR
#     except Exception as e:
#         return f"Error fetching transcript: {str(e)}"
# def youtubeTranscript(url):
#     video_id = extract_youtube_id(url)
#     if not video_id:
#         return "Invalid YouTube URL"
#     try:
#         transcript = YouTubeTranscriptApi().fetch(video_id)
#         finalSTR = ""
#         for item in transcript:
#             finalSTR += item["text"] + " "
#         return finalSTR
#     except Exception as e:
#         return f"Error fetching transcript: {str(e)}"

# def youtubeTranscript(url):
#     video_id = extract_youtube_id(url)
#     if not video_id:
#         return "Invalid YouTube URL"
#     try:
#         transcript = YouTubeTranscriptApi().fetch(video_id)
#         final = []
#         for item in transcript:
#             try:
#                 # Normal format
#                 final.append(item["text"])
#             except:
#                 # Snippet fallback
#                 final.append(item.text)

#         return " ".join(final)

#     except Exception as e:
#         return f"Error fetching transcript: {str(e)}"

# def youtubeTranscript(url):
#     video_id = extract_youtube_id(url)
#     if not video_id:
#         return "Invalid YouTube URL"
#     try:
#         # try english first
#         transcript = YouTubeTranscriptApi().list_transcripts(video_id)

#         # priority:
#         preferred = None

#         # 1. manually written English (best)
#         if transcript.find_transcript(['en'],).is_translatable:
#             preferred = transcript.find_transcript(['en'])

#         # 2. auto-generated English
#         if preferred is None:
#             try:
#                 preferred = transcript.find_transcript(['en'])
#             except:
#                 pass

#         # 3. fallback Kannada
#         if preferred is None:
#             preferred = transcript.find_transcript(['kn'])

#         fetched = preferred.fetch()

#         final = []
#         for item in fetched:
#             try:
#                 final.append(item["text"])
#             except:
#                 final.append(item.text)

#         return " ".join(final)

#     except Exception as e:
#         return f"Error fetching transcript: {str(e)}"



from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled

def youtubeTranscript(url):
    """Return the transcript of a YouTube video as a single string.

    An English transcript is preferred; if looking it up fails for any reason,
    the first transcript in the video's transcript list is used instead.

    Args:
        url: URL of the video; the video id is taken from it with
            extract_youtube_id.

    Returns:
        The snippet texts joined by single spaces, or one of these messages
        when no transcript can be produced:
          - "Invalid YouTube URL"
          - "No transcript found"
          - "Transcripts are disabled for this video"
          - "Error fetching transcript: <details>"
    """
    video_id = extract_youtube_id(url)
    if not video_id:
        return "Invalid YouTube URL"

    try:
        available = YouTubeTranscriptApi().list(video_id)

        # First choice: English. A failed lookup is not an error here, it just
        # means the fallback below is used.
        chosen = None
        try:
            chosen = available.find_transcript(["en"])
        except Exception:
            chosen = None

        # Fallback: whichever transcript the list yields first (None if empty).
        if chosen is None:
            chosen = next(iter(available), None)

        if chosen is None:
            return "No transcript found"

        # Each fetched item exposes its caption text as `.text`.
        return " ".join(snippet.text for snippet in chosen.fetch())

    except TranscriptsDisabled:
        return "Transcripts are disabled for this video"
    except Exception as e:
        return f"Error fetching transcript: {str(e)}"

# -------------------------
# High-level final function
# -------------------------
def final(url_or_text, is_url=True):
    """Summarize a YouTube video (by URL) or a raw text and print the result.

    Args:
        url_or_text: A YouTube URL when `is_url` is True, otherwise the text
            to summarize.
        is_url: Whether `url_or_text` should be treated as a YouTube URL.

    Returns:
        The final summary. When the transcript cannot be obtained, the failure
        message from youtubeTranscript is printed and returned instead; when
        there is no text at all, "No text to summarize" is printed and
        returned.
    """
    if is_url:
        txt = youtubeTranscript(url_or_text)
        # Every failure message youtubeTranscript can return. The earlier check
        # only looked for "Invalid"/"Error" prefixes, so "No transcript found"
        # and "Transcripts are disabled ..." were summarized as if they were
        # transcripts, and real transcripts starting with those words were
        # rejected.
        failure_messages = (
            "Invalid YouTube URL",
            "No transcript found",
            "Transcripts are disabled for this video",
        )
        if txt in failure_messages or txt.startswith("Error fetching transcript:"):
            print(txt)
            return txt
    else:
        txt = url_or_text

    # Nothing to summarize: avoid running the model on an empty sequence.
    if not txt or not txt.strip():
        message = "No text to summarize"
        print(message)
        return message

    print(f"Original transcript token length = {encode_ids(txt).size(0)}")
    result = iterative_compress(txt, per_chunk_max_len=150, per_chunk_min_len=40, final_max_len=200)
    print("\n=== FINAL SUMMARY ===\n")
    print(result)
    return result


# -------------------------
# Example run
# -------------------------
if __name__ == "__main__":
    # Demo run: summarize the transcript of a sample YouTube video and print
    # the result (final() does the printing).
    final("https://www.youtube.com/watch?v=AMX1kwIASZ4", is_url=True)
    # Alternative demo: feed a very long raw string instead of a URL to
    # exercise the multi-chunk path without fetching a transcript.
    # long_text = "This is a sentence. " * 2000
    # final(long_text, is_url=False)


Original transcript token length = 1110
[iter 1] combined token length = 110; SAFE_CHUNK_TOKENS = 960

=== FINAL SUMMARY ===

Khloe's breakup with Connor is one of the most emotionally devastating moments of the series. A father-son storyline where Jim discovers creativity through Connor could deepen both characters. By weaving Connor into everyone's arcs, Georgie and Mandy's first marriage can transform him.
Original transcript token length = 10003
[iter 1] combined token length = 623; SAFE_CHUNK_TOKENS = 960

=== FINAL SUMMARY ===

This is a sentences. These are the sentences that make up a sentence, or a series of sentences, or even just a single sentence. They all start with the same word, and end with a different word, or sometimes even a different sentence.


In [39]:
import gradio as gr
# Close any Gradio apps still open from earlier runs of this cell.
gr.close_all()

# Single text box in (the YouTube URL), single text box out (the summary).
# `final` is called with the URL only, so its `is_url` default applies.
demo = gr.Interface(
    fn=final,
    inputs=[gr.Textbox(label="Input YouTube Url to summarize")],
    outputs=[gr.Textbox(label="Summarized text")],
    title="YouTube Script Summarizer",
    description="SUMMARIZE THE YOUTUBE VIDEO SCRIPT.",
)

demo.launch()

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://8babde29255366f4b2.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


