Single trial: podcast transcription only, using faster-whisper.

In [None]:
# Install dependencies
!pip install -q yt-dlp faster-whisper

import os
from pathlib import Path

import torch  # fixed: was misspelled "torchs"; torch.cuda is used further down
import yt_dlp
from faster_whisper import WhisperModel

# --- CONFIG ---
# Source video, Whisper checkpoint size, and the working folders for this trial.
YOUTUBE_URL   = "https://www.youtube.com/watch?v=MPQrgicE0D4"  # Replace with your podcast URL
MODEL_SIZE    = "tiny"  # tiny | base | small | medium | large-v3
AUDIO_DIR     = Path("/content/audios")
OUTPUT_DIR    = Path("/content/transcripts")

# Create both folders up front so the download and transcript writes cannot
# fail on a missing directory.
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# --- Download Audio ---
print(f"[INFO] Downloading audio from: {YOUTUBE_URL}")

# Ask yt-dlp for an audio-only stream (m4a preferred) and have its FFmpeg
# post-processor extract an MP3 from whatever was downloaded.
ytdl_opts = dict(
    format="bestaudio[ext=m4a]/bestaudio/best",
    outtmpl=str(AUDIO_DIR / "%(title)s.%(ext)s"),
    postprocessors=[
        dict(
            key="FFmpegExtractAudio",
            preferredcodec="mp3",
            preferredquality="192",
        ),
    ],
    quiet=True,
)

with yt_dlp.YoutubeDL(ytdl_opts) as ydl:
    info = ydl.extract_info(YOUTUBE_URL, download=True)
    downloaded_file = ydl.prepare_filename(info)

# The post-processor targets MP3, so point at the .mp3 sibling of the
# filename yt-dlp computed for this entry.
audio_path = Path(downloaded_file).with_suffix(".mp3")

print(f"[OK] Audio downloaded: {audio_path.name}")

# --- Transcribe ---
# Use the GPU with float16 when CUDA is available; otherwise run int8 on CPU.
if torch.cuda.is_available():
    device_fw, compute_type_fw = "cuda", "float16"
else:
    device_fw, compute_type_fw = "cpu", "int8"

print(f"[INFO] Transcribing with faster-whisper ({MODEL_SIZE}) on {device_fw}...")
fw_model = WhisperModel(MODEL_SIZE, device=device_fw, compute_type=compute_type_fw)

segments_iter, info = fw_model.transcribe(
    str(audio_path), vad_filter=False, beam_size=5, temperature=0.0, word_timestamps=False,
)

# Save transcript: one whitespace-stripped segment per line, named after the audio file.
output_txt = OUTPUT_DIR / (audio_path.stem + ".txt")
with output_txt.open("w", encoding="utf-8") as transcript:
    transcript.writelines(f"{segment.text.strip()}\n" for segment in segments_iter)

print(f"[DONE] Transcript saved at: {output_txt}")

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.9/175.9 kB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m85.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m55.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.5/40.5 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.8/38.8 MB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.4/17.4 MB[0m [31m117.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.0/46.0 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25h[INFO] Downloading audio from: https://www.yo



[OK] Audio downloaded: ¿Hasta dónde puede llegar el Gobierno？.mp3
[INFO] Transcribing with faster-whisper (small) on cuda...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


vocabulary.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.bin:   0%|          | 0.00/484M [00:00<?, ?B/s]

[DONE] Transcript saved at: /content/transcripts/¿Hasta dónde puede llegar el Gobierno？.txt


First batch of podcast transcriptions

In [None]:
# Install dependencies
!pip install -q yt-dlp faster-whisper

import os
import math
import time
from pathlib import Path
import yt_dlp
import torch
from faster_whisper import WhisperModel

# ---------------- CONFIG ----------------
# Videos to process. Each entry is a single-video link; the playlist suffix is
# not expanded because "noplaylist" is enabled in YTDLP_OPTS.
URLS = [
    "https://youtu.be/MvCPYH4p0jc?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY",
    "https://youtu.be/oEqOUDDwlUk?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY",
    "https://youtu.be/Fe787vCVg_0?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY",
    "https://youtu.be/6BTWzVjvTuM?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY",
    "https://youtu.be/7mbWvYnV0d0?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY",
    "https://youtu.be/1UyuBWYuDNU?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY",
]

# Whisper checkpoint: tiny | base | small | medium | large-v3
MODEL_SIZE = "medium"

# Working folders and the archive produced at the end of the run.
AUDIO_DIR = Path("/content/audios")
TRANS_DIR = Path("/content/transcripts")
ZIP_PATH = Path("/content/transcripts_batch.zip")

# yt-dlp settings shared by every download: audio-only stream converted to
# MP3, a few retries, a socket timeout, and no playlist expansion.
YTDLP_OPTS = dict(
    format="bestaudio[ext=m4a]/bestaudio/best",
    outtmpl=str(AUDIO_DIR / "%(title)s-%(id)s.%(ext)s"),
    postprocessors=[
        dict(
            key="FFmpegExtractAudio",
            preferredcodec="mp3",
            preferredquality="192",
        ),
    ],
    retries=3,
    socket_timeout=30,
    noplaylist=True,  # we supply single video URLs
    quiet=True,
)

# ---------------- PREP ----------------
for _folder in (AUDIO_DIR, TRANS_DIR):
    _folder.mkdir(parents=True, exist_ok=True)

# float16 on GPU, int8 on CPU.
if torch.cuda.is_available():
    device_fw, compute_type_fw = "cuda", "float16"
else:
    device_fw, compute_type_fw = "cpu", "int8"
print(f"[INFO] Device: {device_fw} | compute_type: {compute_type_fw}")

# Instantiate the model a single time; every transcription reuses it.
model = WhisperModel(MODEL_SIZE, device=device_fw, compute_type=compute_type_fw)

def download_audio(url: str) -> Path:
    """Fetch the audio of one video with yt-dlp and return the MP3 path.

    The download uses the shared YTDLP_OPTS. The returned path is the
    filename yt-dlp computed for the entry with its suffix replaced by
    ``.mp3``, the codec requested from the FFmpeg post-processor.
    """
    with yt_dlp.YoutubeDL(YTDLP_OPTS) as downloader:
        metadata = downloader.extract_info(url, download=True)
        source_name = downloader.prepare_filename(metadata)
    return Path(source_name).with_suffix(".mp3")

def safe_stem(p: Path) -> str:
    """Return the stem of *p* reduced to filesystem-friendly characters.

    Alphanumeric characters and `` .-_()`` are kept; anything else becomes
    ``_``. Surrounding whitespace and trailing dots/underscores are removed.
    """
    allowed_punct = " .-_()"
    cleaned = []
    for ch in p.stem:
        keep = ch.isalnum() or ch in allowed_punct
        cleaned.append(ch if keep else "_")
    return "".join(cleaned).strip().rstrip("._")

def transcribe_file(audio_path: Path, out_dir: Path) -> Path:
    """Transcribe an audio file to a plain-text transcript and return its path.

    The transcript is named after the sanitized audio stem and holds one
    stripped segment per line. If that file already exists the transcription
    is skipped and the existing path is returned.

    The text is first written to a ``.part`` file and renamed only after all
    segments were written. An interrupted or failed run therefore never leaves
    a truncated ``.txt`` behind that a later run would skip as already done.

    Args:
        audio_path: Audio file to transcribe.
        out_dir: Directory that receives the ``.txt`` transcript.

    Returns:
        Path of the (existing or newly written) transcript.
    """
    out_txt = out_dir / f"{safe_stem(audio_path)}.txt"
    if out_txt.exists():
        print(f"  ↳ Transcript exists, skipping: {out_txt.name}")
        return out_txt

    print(f"  ↳ Transcribing: {audio_path.name}")
    segments_iter, _info = model.transcribe(
        str(audio_path),
        vad_filter=False,     # no VAD
        beam_size=5,
        temperature=0.0,
        word_timestamps=False
    )

    partial_txt = out_dir / f"{out_txt.name}.part"
    try:
        with open(partial_txt, "w", encoding="utf-8") as f:
            for seg in segments_iter:
                f.write(seg.text.strip() + "\n")
    except BaseException:
        # Also covers KeyboardInterrupt: drop the incomplete file, then re-raise.
        partial_txt.unlink(missing_ok=True)
        raise
    partial_txt.replace(out_txt)
    return out_txt

# ---------------- RUN ----------------
# Download and transcribe every URL in turn. A failure in either step is
# reported and the loop moves on to the next URL.
all_outputs = []
start_all = time.time()

for i, url in enumerate(URLS, start=1):
    print(f"\n[{i}/{len(URLS)}] URL: {url}")
    try:
        audio_path = download_audio(url)
        print(f"  ↳ Downloaded: {audio_path.name}")
    except Exception as e:
        print(f"  ! Download failed, skipping: {e}")
        continue

    try:
        out_txt = transcribe_file(audio_path, TRANS_DIR)
        print(f"  ✓ Transcript: {out_txt.name}")
        all_outputs.append(out_txt)
    except Exception as e:
        print(f"  ! Transcription failed, skipping: {e}")

# Report the wall-clock time as well (it was measured but never shown before).
elapsed = time.time() - start_all
print(f"\n[INFO] Completed {len(all_outputs)}/{len(URLS)} in {elapsed:.1f}s")

# Zip transcripts for easy download
if all_outputs:
    # remove old zip if any
    if ZIP_PATH.exists():
        ZIP_PATH.unlink()
    # IPython expands {name} in shell lines, so the archive and folder follow
    # the CONFIG constants instead of repeating the literal paths.
    !zip -qr {ZIP_PATH} {TRANS_DIR}
    print(f"[INFO] ZIP ready: {ZIP_PATH}")
else:
    print("[INFO] No transcripts were produced.")

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.9/175.9 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m91.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m76.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.5/40.5 MB[0m [31m21.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.8/38.8 MB[0m [31m20.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.4/17.4 MB[0m [31m114.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.0/46.0 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25h[INFO] Device: cuda | compute_type: float16


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


vocabulary.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.bin:   0%|          | 0.00/1.53G [00:00<?, ?B/s]


[1/6] URL: https://youtu.be/MvCPYH4p0jc?list=PLVzyrmx4CxOD0y9j5EcTM2BakMD0FGPhY


KeyboardInterrupt: 

Batch transcription with chunk generation for sentiment analysis and style (speaker) detection

In [None]:
# ============================================================
# Robust download with cookies + web-only clients (no android)
# Transcribe (full .txt) + on-the-fly sentence chunks (CSV)
# ============================================================
!python -m pip -q install --upgrade "yt-dlp @ https://github.com/yt-dlp/yt-dlp/archive/refs/heads/master.zip" faster-whisper nltk

import os, csv, time, re
from pathlib import Path
import yt_dlp
import torch
from faster_whisper import WhisperModel

import nltk
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
from nltk.tokenize import sent_tokenize

from google.colab import files

# ------------------ SETTINGS ------------------
# When True, the notebook asks for a cookies.txt upload and hands it to yt-dlp.
USE_COOKIES = True

# Episodes to process (playlist parameters are stripped before download).
URLS = [
    "https://www.youtube.com/watch?v=z9g_ZB-ffwE&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=37",
    "https://www.youtube.com/watch?v=gMt3r9nyg8M&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=36",
    "https://www.youtube.com/watch?v=qEXT4Kx2h-I&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=35",
    "https://www.youtube.com/watch?v=tG6HT244UmI&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=34",
    "https://www.youtube.com/watch?v=V9w9ay3AV8I&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=33",
    "https://www.youtube.com/watch?v=35nFSKJp93I&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=32",
    "https://www.youtube.com/watch?v=3CWCA1eRPGQ&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=31",
]

# Whisper decoding settings handed to WhisperModel / model.transcribe.
MODEL_SIZE = "large-v3-turbo"  # e.g. tiny | base | small | medium | large-v3 | large-v3-turbo
FORCE_LANG = "es"              # language code such as "es"; "" lets the model auto-detect
BEAM_SIZE = 5
TEMPERATURE = (0.0, 0.2, 0.4)

# Chunking limits handed to ChunkWriter.
MAX_CHARS_PER_CHUNK = 5000
MAX_SENTS_PER_CHUNK = 1
SENT_OVERLAP = 0

# Paths: working folders, output archives, and the uploaded cookie file.
AUDIO_DIR = Path("/content/audios")
FULL_TXT_DIR = Path("/content/transcripts_full")
CSV_DIR = Path("/content/transcripts_chunks/csv")
ZIP_TXT = Path("/content/transcripts_full.zip")
ZIP_CSV = Path("/content/transcripts_chunks.zip")
COOKIE_PATH = Path("/content/cookies.txt")

# Make sure every working folder exists before anything is written.
for _folder in (AUDIO_DIR, FULL_TXT_DIR, CSV_DIR):
    _folder.mkdir(parents=True, exist_ok=True)

# ---- Upload cookies ----
# Ask for a cookies.txt and store the first uploaded file at COOKIE_PATH.
# If the dialog is dismissed without a file, warn and carry on instead of
# crashing on an empty result; the downloader only uses the cookie file when
# it exists.
if USE_COOKIES:
    print("⬆️ Upload cookies.txt (exported from a logged-in YouTube session; Netscape format)")
    uploaded = files.upload()
    if uploaded:
        fname = next(iter(uploaded))
        COOKIE_PATH.write_bytes(uploaded[fname])
    else:
        print("[WARN] No cookies file was uploaded; continuing without new cookies.")

def safe_name(s: str) -> str:
    """Turn *s* into a filename-safe string.

    Alphanumerics and `` .-_()`` are kept, every other character becomes
    ``_``; surrounding whitespace and trailing dots/underscores are removed.
    A falsy *s* (``None`` or ``""``) yields ``""``.
    """
    keep = " .-_()"
    mapped = [ch if (ch.isalnum() or ch in keep) else "_" for ch in (s or "")]
    return "".join(mapped).strip().rstrip("._")

def normalize_text(t: str) -> str:
    """Collapse every run of whitespace in *t* to one space and trim the ends.

    A falsy *t* (``None`` or ``""``) yields ``""``.
    """
    raw = t if t else ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()

def strip_playlist_params(url: str) -> str:
    """Reduce a YouTube link to the bare single-video URL.

    Handles both link forms used in this notebook:

    * ``https://[www.|m.|music.]youtube.com/watch?...v=ID...`` becomes
      ``<scheme>://<host>/watch?v=ID``; the ``v`` parameter may appear
      anywhere in the query string, not only first.
    * ``https://youtu.be/ID?...`` becomes ``https://youtu.be/ID``.

    Anything that matches neither form is returned unchanged.
    """
    video_id = r"[A-Za-z0-9_-]{11}"

    # Lazy optional prefix: prefer a leading v=, else the first later "&v=".
    watch = re.search(
        r"(https?://(?:www\.|m\.|music\.)?youtube\.com/watch\?)(?:[^#\s]*?&)??v=(" + video_id + r")",
        url,
    )
    if watch:
        return f"{watch.group(1)}v={watch.group(2)}"

    short = re.search(r"https?://youtu\.be/" + video_id, url)
    if short:
        return short.group(0)

    return url

def ydl_base_opts():
    """Build a fresh yt-dlp options dict shared by every download attempt.

    A new dict is returned on each call, so callers can add per-attempt keys
    without affecting later attempts.
    """
    browser_ua = (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/123.0 Safari/537.36"
    )
    to_mp3 = {"key": "FFmpegExtractAudio", "preferredcodec": "mp3"}

    opts = {
        "format": "bestaudio[ext=m4a]/bestaudio/best",
        "outtmpl": str(AUDIO_DIR / "%(title)s-%(id)s.%(ext)s"),
        "postprocessors": [to_mp3],
        "retries": 5,
        "socket_timeout": 30,
        "noplaylist": True,
        "quiet": True,
    }
    # Author's note: a chunk size below 10 MB avoids YouTube throttling quirks
    # (per the yt-dlp FAQ).
    opts["http_chunk_size"] = 9 * 1024 * 1024
    opts["http_headers"] = {"User-Agent": browser_ua}
    return opts

# Player clients to attempt, one per download attempt, in this order
# (author's notes on each):
#   web_safari   - often exposes HLS that avoids PO-token; works with cookies
#   web          - works sometimes; SABR may be enforced
#   web_embedded - only embeddable videos, but sometimes bypasses SABR
CLIENT_TRIES = [[client] for client in ("web_safari", "web", "web_embedded")]

def download_audio(url: str):
    """Download one video's audio as MP3, trying each client in CLIENT_TRIES.

    The URL is first reduced with strip_playlist_params. For every client
    list a fresh options dict is built, the cookie file is attached when
    cookies are enabled and the file exists, and yt-dlp is run. The first
    attempt that leaves the expected ``.mp3`` on disk wins.

    Returns:
        ``(mp3_path, video_id, title)``; ``title`` falls back to ``"audio"``
        and ``video_id`` to ``"unknown"`` when the metadata lacks them.

    Raises:
        RuntimeError: when no attempt produced the MP3. The message carries
            the last failure, including the "file not found after download"
            case, which was previously reported as ``None``.
    """
    url = strip_playlist_params(url)
    last_err = None
    for clients in CLIENT_TRIES:
        opts = ydl_base_opts()
        opts["extractor_args"] = {"youtube": {"player_client": clients}}
        if USE_COOKIES and COOKIE_PATH.exists():
            opts["cookiefile"] = str(COOKIE_PATH)

        print(f"  ↳ Trying player_client={clients} …")
        try:
            with yt_dlp.YoutubeDL(opts) as ydl:
                info = ydl.extract_info(url, download=True)
                mp3_path = Path(ydl.prepare_filename(info)).with_suffix(".mp3")
        except Exception as e:
            last_err = e
            print(f"    ! attempt failed: {e}")
            continue

        if mp3_path.exists():
            title = info.get("title") or "audio"
            vid = info.get("id") or "unknown"
            return mp3_path, vid, title

        # yt-dlp returned without raising but the MP3 is not where expected:
        # record that explicitly so the final error is informative.
        last_err = FileNotFoundError(f"expected audio file not found: {mp3_path}")
        print(f"    ! attempt failed: {last_err}")

    # if all attempts failed
    raise RuntimeError(f"All client attempts failed. Last error: {last_err}") from last_err

class ChunkWriter:
    """Group sentences into chunks and write them as CSV rows.

    Each row is ``[video_id, chunk_id, start, end, text]`` with times rounded
    to two decimals and ``chunk_id`` counting up from 0.

    Args:
        csv_writer: Object with a ``writerow(list)`` method.
        video_id: Value written in the first column of every row.
        max_chars: Character cap per chunk; a single sentence longer than the
            cap is still emitted on its own.
        max_sents: Maximum number of sentences per chunk.
        overlap: Number of trailing sentences repeated at the start of the
            next chunk; negative values are treated as 0.
    """

    def __init__(self, csv_writer, video_id, max_chars=450, max_sents=5, overlap=1):
        self.w = csv_writer
        self.video_id = video_id
        self.max_chars = max_chars
        self.max_sents = max_sents
        self.overlap = max(0, overlap)
        self.buffer = []   # pending sentences: {"text", "start", "end"}
        self.chunk_id = 0  # id assigned to the next row written

    def add_segment_sentences(self, sents, seg_start, seg_end):
        """Queue the sentences of one segment and write every chunk that is ready.

        The segment's time span is divided evenly among its sentences to give
        each one an approximate start and end.
        """
        if not sents:
            return
        seg_start = float(seg_start or 0.0)
        seg_end = float(seg_end or seg_start)
        dur = max(0.001, seg_end - seg_start)
        per = dur / max(1, len(sents))
        for i, s in enumerate(sents):
            self.buffer.append({
                "text": s,
                "start": seg_start + i * per,
                "end": seg_start + (i + 1) * per,
            })
        self._flush_complete()

    def _flush_complete(self, force=False):
        """Write chunks from the front of the buffer.

        Without *force*, writing stops once only the overlap carry-over is
        left (at most ``overlap`` sentences that also fit within
        ``max_sents``), so those sentences can open the next chunk. With
        *force*, the buffer is drained completely.
        """
        while self.buffer:
            # Take sentences greedily until a sentence or character limit is hit.
            txt = ""
            count = 0
            last = -1
            for i, it in enumerate(self.buffer):
                if count >= self.max_sents:
                    break
                cand = (" " + it["text"]) if txt else it["text"]
                if len((txt + cand).strip()) > self.max_chars and count > 0:
                    break
                txt = (txt + cand).strip()
                count += 1
                last = i

            if last == -1:
                # Nothing could be taken (max_sents <= 0): emit the first
                # sentence alone so the loop always makes progress.
                it = self.buffer[0]
                self._write(it["start"], it["end"], it["text"])
                del self.buffer[0]
                continue

            if not force and count <= self.overlap and len(self.buffer) <= self.max_sents:
                break

            self._write(self.buffer[0]["start"], self.buffer[last]["end"], txt)
            # Leave the last `overlap` sentences in place, but always drop at
            # least one. self.overlap is clamped to >= 0 in __init__.
            remove_n = max(1, count - self.overlap)
            self.buffer = self.buffer[remove_n:]
        # The loop ends with an empty buffer or via the non-forced break, so
        # no separate forced drain is needed here.

    def _write(self, st, en, text):
        """Write one CSV row and advance the chunk counter."""
        self.w.writerow([self.video_id, self.chunk_id, round(st, 2), round(en, 2), text])
        self.chunk_id += 1

    def close(self):
        """Write whatever is still buffered."""
        self._flush_complete(force=True)

# ---- Transcribe + write full transcript + chunks ----
# Load the model once, then for every URL: download the audio, write the full
# transcript (.txt) and the sentence chunks (.csv), and finally zip both
# output folders.
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"
print(f"[INFO] faster-whisper: {MODEL_SIZE} on {device} ({compute_type})")
model = WhisperModel(MODEL_SIZE, device=device, compute_type=compute_type)
force_language = FORCE_LANG.strip().lower() or None  # "" -> None (auto-detect)

t0 = time.time()
done = 0
for idx, url in enumerate(URLS, 1):
    print(f"\n[{idx}/{len(URLS)}] {url}")
    try:
        audio_path, video_id, title = download_audio(url)
        base = safe_name(f"{title}-{video_id}")
        full_txt_path = FULL_TXT_DIR / f"{base}.txt"
        csv_path      = CSV_DIR / f"{base}.csv"

        if full_txt_path.exists() and csv_path.exists():
            print(f"  ↳ Already processed: {base}")
            done += 1
            continue

        print(f"  ↳ Transcribing -> {full_txt_path.name} + {csv_path.name}")
        finished = False
        try:
            with open(full_txt_path, "w", encoding="utf-8") as ftxt, \
                 open(csv_path, "w", encoding="utf-8", newline="") as fcsv:
                writer = csv.writer(fcsv)
                writer.writerow(["video_id", "chunk_id", "start", "end", "text"])
                cw = ChunkWriter(writer, video_id,
                                 max_chars=MAX_CHARS_PER_CHUNK,
                                 max_sents=MAX_SENTS_PER_CHUNK,
                                 overlap=SENT_OVERLAP)

                seg_iter, info = model.transcribe(
                    str(audio_path),
                    language=force_language,
                    vad_filter=False,
                    beam_size=BEAM_SIZE,
                    temperature=TEMPERATURE,
                    word_timestamps=False
                )
                for seg in seg_iter:
                    line = normalize_text(seg.text)
                    if not line:
                        continue
                    ftxt.write(line + "\n")
                    sents = [s.strip() for s in sent_tokenize(line, language="spanish") if s.strip()]
                    cw.add_segment_sentences(sents, seg.start, seg.end)
                cw.close()
            finished = True
        finally:
            if not finished:
                # Runs on errors and on a manual interrupt alike: delete the
                # partial outputs so a rerun does not treat this video as
                # "Already processed".
                full_txt_path.unlink(missing_ok=True)
                csv_path.unlink(missing_ok=True)

        print(f"  ✓ Done: {base}")
        done += 1
    except Exception as e:
        print(f"  ! Error: {e}")

if ZIP_TXT.exists(): ZIP_TXT.unlink()
if ZIP_CSV.exists(): ZIP_CSV.unlink()
!zip -qr /content/transcripts_full.zip /content/transcripts_full
!zip -qr /content/transcripts_chunks.zip /content/transcripts_chunks
print(f"\n[DONE] {done}/{len(URLS)} processed in {time.time() - t0:.1f}s")
print(f"Full transcripts: {FULL_TXT_DIR} (zip: {ZIP_TXT})")
print(f"Chunk CSVs:       {CSV_DIR}      (zip: {ZIP_CSV})")

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
⬆️ Upload cookies.txt (exported from a logged-in YouTube session; Netscape format)


Saving cookies.txt to cookies.txt
[INFO] faster-whisper: large-v3-turbo on cuda (float16)

[1/7] https://www.youtube.com/watch?v=z9g_ZB-ffwE&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=37
  ↳ Trying player_client=['web_safari'] …




  ↳ Transcribing -> SE ACABÓ LO QUE SE DABA _ La Pija y la Quinqui 3x40-z9g_ZB-ffwE.txt + SE ACABÓ LO QUE SE DABA _ La Pija y la Quinqui 3x40-z9g_ZB-ffwE.csv
  ✓ Done: SE ACABÓ LO QUE SE DABA _ La Pija y la Quinqui 3x40-z9g_ZB-ffwE

[2/7] https://www.youtube.com/watch?v=gMt3r9nyg8M&list=PL6CFCewUyIQJ5lDLoHL6PRk2xBqJ6Pz9C&index=36
  ↳ Trying player_client=['web_safari'] …






KeyboardInterrupt: 

In [None]:
import shutil
import os

# Path to the folder you want to clean
folder_path = '/content'

# Delete every entry directly inside the folder (files, links, and whole
# sub-directories). Failures are reported and counted so the final message
# reflects what actually happened.
failures = 0
for item in os.listdir(folder_path):
    item_path = os.path.join(folder_path, item)
    try:
        if os.path.isfile(item_path) or os.path.islink(item_path):
            os.unlink(item_path)  # Remove file or link
        elif os.path.isdir(item_path):
            shutil.rmtree(item_path)  # Remove directory
    except Exception as e:
        failures += 1
        print(f'Failed to delete {item_path}. Reason: {e}')

if failures:
    print(f"⚠️ {failures} item(s) inside '{folder_path}' could not be deleted.")
else:
    print("✅ All content inside '/content' has been deleted.")

✅ All content inside '/content' has been deleted.


Batch transcription with proper chunk logic (one complete sentence per chunk)

In [None]:
# -*- coding: utf-8 -*-
import re, csv, time, torch, yt_dlp
from pathlib import Path
from faster_whisper import WhisperModel
import nltk
# Sentence tokenizer data. Newer NLTK releases load the Punkt model from the
# 'punkt_tab' resource, so fetch both names, as the earlier batch cell does;
# otherwise sent_tokenize can fail with a LookupError on a fresh runtime.
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
from nltk.tokenize import sent_tokenize
from google.colab import files

# ----------------- CONFIG -----------------
# When True, the notebook asks for a cookies.txt upload that is passed to yt-dlp.
USE_COOKIES = True
# Videos to process, in order. The list mixes full watch URLs and youtu.be
# short links; none of them carries playlist parameters.
URLS = [
    "https://www.youtube.com/watch?v=MPQrgicE0D4",
    "https://www.youtube.com/watch?v=_WOBtEhxw9E",
    "https://www.youtube.com/watch?v=Xm27EegJYoY",
    "https://www.youtube.com/watch?v=ODQARH3WQDs",
    "https://www.youtube.com/watch?v=0oCyBIHsbBY",
    "https://www.youtube.com/watch?v=UVg5XSKY-NE",
    "https://www.youtube.com/watch?v=z0lPrJ8PkL0",
    "https://www.youtube.com/watch?v=qWKfxlG5xmg",
    "https://www.youtube.com/watch?v=gWG0y0Xr2Cw",
    "https://www.youtube.com/watch?v=2EYQVnI9Uc8",
    "https://youtu.be/MvCPYH4p0jc",
    "https://youtu.be/oEqOUDDwlUk",
    "https://youtu.be/Fe787vCVg_0",
    "https://youtu.be/6BTWzVjvTuM",
    "https://youtu.be/7mbWvYnV0d0",
    "https://youtu.be/1UyuBWYuDNU",
    "https://youtu.be/4Tt28gX1gV0",
    "https://youtu.be/EdKTNYLmExw",
    "https://youtu.be/k7faw7tk7zc",
    "https://youtu.be/qbKnSzJQLJA",
    "https://youtu.be/JjDWIT3CG7I",
    "https://youtu.be/wFycgD4-a44",
    "https://youtu.be/4xvbfyiqFl0",
    "https://youtu.be/diUd0oAdcPg",
    "https://youtu.be/2OSCAVhZfPQ",
    "https://youtu.be/TftCmJPhVzc",
    "https://youtu.be/dqZGCZ8h4qc",
    "https://youtu.be/aWmCEc1nBOM",
    "https://youtu.be/V9wJxbP0Xs0",
    "https://youtu.be/Y-7sBt0YE_s",
    "https://youtu.be/q_4SsQn9VcQ",
    "https://youtu.be/wByhoIpmJhA",
    "https://youtu.be/my6GTM9kG_g",
    "https://youtu.be/aN76rS0rTvs",
    "https://youtu.be/IboeL6CgtAE",
    "https://youtu.be/d1Ar3LnYBr8",
    "https://youtu.be/Z7Kh_AzcVHk",
    "https://youtu.be/ZPmJteFJAT0",
    "https://youtu.be/J9vWEcQDq0w",
    "https://youtu.be/O6facBzawUE",
    "https://www.youtube.com/watch?v=XojAvxb8ltI",
    "https://www.youtube.com/watch?v=NqFxStjnkJo",
    "https://www.youtube.com/watch?v=IFLCpt821HI",
    "https://www.youtube.com/watch?v=hHTbKTCgAEQ",
    "https://www.youtube.com/watch?v=Xlzcgmh2jlg",
    "https://www.youtube.com/watch?v=Diu_LYkLsPo",
    "https://www.youtube.com/watch?v=36iupsRxNNE",
    "https://www.youtube.com/watch?v=P9Gy3KNLwsA",
    "https://www.youtube.com/watch?v=ZYYa4e-DxQE",
    "https://www.youtube.com/watch?v=6p1N0rXA1Og",
    "https://www.youtube.com/watch?v=YG6sEHnYRpM",
    "https://www.youtube.com/watch?v=FZjGU7Ww2Yc",
    "https://www.youtube.com/watch?v=NeutxLBEfIE",
    "https://www.youtube.com/watch?v=DO8ikVs5ODc",
    "https://www.youtube.com/watch?v=9WTE-VPUS6c",
    "https://www.youtube.com/watch?v=DjZe2NMHDvQ",
    "https://www.youtube.com/watch?v=m5hTQAV_kt0",
    "https://www.youtube.com/watch?v=TL9b56iPWis",
    "https://www.youtube.com/watch?v=kPbe4cfL8cM",
    "https://www.youtube.com/watch?v=fCGUKzzBS1Y",
    "https://www.youtube.com/watch?v=ZtXrrDs9jdE",
    "https://www.youtube.com/watch?v=SY1HTy9qGvU",
    "https://www.youtube.com/watch?v=O8xr_3QCX-Y",
    "https://www.youtube.com/watch?v=jYfK0HF3EGE",
    "https://www.youtube.com/watch?v=DxYltGrOBBI",
    "https://www.youtube.com/watch?v=NwzGmP_BJaE",
    "https://www.youtube.com/watch?v=MrSiUHNgdo4",
    "https://www.youtube.com/watch?v=JnPEoaMtMHs",
    "https://www.youtube.com/watch?v=4ATkdGhm070",
    "https://www.youtube.com/watch?v=D1lo7HTUCgQ",
    "https://www.youtube.com/watch?v=-JMGeHrL7sY",
    "https://www.youtube.com/watch?v=xO9HD0c91xE",
    "https://www.youtube.com/watch?v=NbKgcnQFDsA",
    "https://www.youtube.com/watch?v=PmJKePlmj9A",
    "https://www.youtube.com/watch?v=JdTFyUV1FXY",
    "https://www.youtube.com/watch?v=nyaFnYfnvqk",
    "https://www.youtube.com/watch?v=XmT_iDHrXpM",
    "https://www.youtube.com/watch?v=3ghcInncQp4",
    "https://www.youtube.com/watch?v=O7wanpqZGSM",
    "https://www.youtube.com/watch?v=CryGY3yeQFs",
    "https://www.youtube.com/watch?v=fqqVw9Lc-9U",
    "https://www.youtube.com/watch?v=t5-pYIMu_yQ",
    "https://www.youtube.com/watch?v=2wWyTcchyx0",
    "https://www.youtube.com/watch?v=mPRPlUosQvE",
    "https://www.youtube.com/watch?v=mQox-dAoEaI",
    "https://www.youtube.com/watch?v=n48NKtTtme0",
    "https://www.youtube.com/watch?v=iNyZ_164mfQ",
    "https://www.youtube.com/watch?v=VPnMSbGarpw",
    "https://www.youtube.com/watch?v=kn8jseFJ-9A",
    "https://www.youtube.com/watch?v=XlteT9dv7XE"
]

# Whisper decoding settings.
MODEL_SIZE = "large-v3-turbo"
FORCE_LANG = "es"  # whisper language hint
BEAM_SIZE = 5
TEMPERATURE = (0.0, 0.2, 0.4)

# --- One sentence per chunk; no overlap ---
MAX_CHARS_PER_CHUNK = 2000  # generous cap so long sentences are not split by the character limit
MAX_SENTS_PER_CHUNK = 1     # exactly one sentence per chunk
SENT_OVERLAP = 0            # no sentence is repeated across chunks

# Working folders, output archives, and the uploaded cookie file.
AUDIO_DIR = Path("/content/audios")
FULL_TXT_DIR = Path("/content/transcripts_full")
CSV_DIR = Path("/content/transcripts_chunks/csv")
ZIP_TXT = Path("/content/transcripts_full.zip")
ZIP_CSV = Path("/content/transcripts_chunks.zip")
COOKIE_PATH = Path("/content/cookies.txt")

# Make sure every working folder exists before anything is written.
for _folder in (AUDIO_DIR, FULL_TXT_DIR, CSV_DIR):
    _folder.mkdir(parents=True, exist_ok=True)

# Ask for a cookies.txt and store the first uploaded file at COOKIE_PATH.
# If the dialog is dismissed without a file, warn and carry on instead of
# crashing on an empty result; the downloader only uses the cookie file when
# it exists.
if USE_COOKIES:
    print("⬆️ Upload cookies.txt (exported from a logged-in YouTube session; Netscape format)")
    uploaded = files.upload()
    if uploaded:
        fname = next(iter(uploaded))
        COOKIE_PATH.write_bytes(uploaded[fname])
    else:
        print("[WARN] No cookies file was uploaded; continuing without new cookies.")

def safe_name(s: str) -> str:
    """Turn *s* into a filename-safe string.

    Alphanumerics and `` .-_()`` are kept, every other character becomes
    ``_``; surrounding whitespace and trailing dots/underscores are removed.
    A falsy *s* (``None`` or ``""``) yields ``""``.
    """
    source = s or ""
    permitted = " .-_()"
    pieces = []
    for ch in source:
        pieces.append(ch if (ch.isalnum() or ch in permitted) else "_")
    return "".join(pieces).strip().rstrip("._")

def normalize_text(t: str) -> str:
    """Collapse every run of whitespace in *t* to one space and trim the ends.

    A falsy *t* (``None`` or ``""``) yields ``""``.
    """
    raw = t if t else ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()

def strip_playlist_params(url: str) -> str:
    """Reduce a YouTube link to the bare single-video URL.

    Handles both link forms that appear in URLS:

    * ``https://[www.|m.|music.]youtube.com/watch?...v=ID...`` becomes
      ``<scheme>://<host>/watch?v=ID``; the ``v`` parameter may appear
      anywhere in the query string, not only first.
    * ``https://youtu.be/ID?...`` becomes ``https://youtu.be/ID``.

    Anything that matches neither form is returned unchanged.
    """
    video_id = r"[A-Za-z0-9_-]{11}"

    # Lazy optional prefix: prefer a leading v=, else the first later "&v=".
    watch = re.search(
        r"(https?://(?:www\.|m\.|music\.)?youtube\.com/watch\?)(?:[^#\s]*?&)??v=(" + video_id + r")",
        url,
    )
    if watch:
        return f"{watch.group(1)}v={watch.group(2)}"

    short = re.search(r"https?://youtu\.be/" + video_id, url)
    if short:
        return short.group(0)

    return url

def ydl_base_opts():
    """Build a fresh yt-dlp options dict shared by every download attempt.

    A new dict is returned on each call, so callers can add per-attempt keys
    without affecting later attempts.
    """
    to_mp3 = {"key": "FFmpegExtractAudio", "preferredcodec": "mp3"}
    opts = {
        "format": "bestaudio/best",
        "outtmpl": str(AUDIO_DIR / "%(title)s-%(id)s.%(ext)s"),
        "postprocessors": [to_mp3],
        "retries": 5,
        "socket_timeout": 30,
        "noplaylist": True,
        "quiet": True,
    }
    # NOTE: the "http_chunk_size" (9 MiB) and custom "http_headers" User-Agent
    # options tried in an earlier version are intentionally left disabled here.
    return opts

# yt-dlp player clients to attempt, one per download attempt, in this order.
CLIENT_TRIES = [[client] for client in ("web_safari", "web", "web_embedded")]

def download_audio(url: str):
    """Download one video's audio as MP3, trying each client in CLIENT_TRIES.

    The URL is first reduced with strip_playlist_params. For every client
    list a fresh options dict is built, the cookie file is attached when
    cookies are enabled and the file exists, and yt-dlp is run. The first
    attempt that leaves the expected ``.mp3`` on disk wins.

    Returns:
        ``(mp3_path, video_id, title)``; ``title`` falls back to ``"audio"``
        and ``video_id`` to ``"unknown"`` when the metadata lacks them.

    Raises:
        RuntimeError: when no attempt produced the MP3. The message carries
            the last failure, including the "file not found after download"
            case, which was previously reported as ``None``.
    """
    url = strip_playlist_params(url)
    last_err = None
    for clients in CLIENT_TRIES:
        opts = ydl_base_opts()
        opts["extractor_args"] = {"youtube": {"player_client": clients}}
        if USE_COOKIES and COOKIE_PATH.exists():
            opts["cookiefile"] = str(COOKIE_PATH)

        print(f"  ↳ Trying player_client={clients} …")
        try:
            with yt_dlp.YoutubeDL(opts) as ydl:
                info = ydl.extract_info(url, download=True)
                mp3_path = Path(ydl.prepare_filename(info)).with_suffix(".mp3")
        except Exception as e:
            last_err = e
            print(f"    ! attempt failed: {e}")
            continue

        if mp3_path.exists():
            title = info.get("title") or "audio"
            vid = info.get("id") or "unknown"
            return mp3_path, vid, title

        # yt-dlp returned without raising but the MP3 is not where expected:
        # record that explicitly so the final error is informative.
        last_err = FileNotFoundError(f"expected audio file not found: {mp3_path}")
        print(f"    ! attempt failed: {last_err}")

    # if all attempts failed
    raise RuntimeError(f"All client attempts failed. Last error: {last_err}") from last_err

# ----------------- ChunkWriter (UNCHANGED) -----------------
class ChunkWriter:
    """Group sentences into chunks and write them as CSV rows.

    Each row is ``[video_id, chunk_id, start, end, text]`` with times rounded
    to two decimals and ``chunk_id`` counting up from 0. Row output is the
    same as before; only an unreachable branch was dropped and the code was
    laid out one statement per line.

    Args:
        csv_writer: Object with a ``writerow(list)`` method.
        video_id: Value written in the first column of every row.
        max_chars: Character cap per chunk; a single sentence longer than the
            cap is still emitted on its own.
        max_sents: Maximum number of sentences per chunk.
        overlap: Number of trailing sentences repeated at the start of the
            next chunk; negative values are treated as 0.
    """

    def __init__(self, csv_writer, video_id, max_chars=450, max_sents=5, overlap=1):
        self.w = csv_writer
        self.video_id = video_id
        self.max_chars = max_chars
        self.max_sents = max_sents
        self.overlap = max(0, overlap)
        self.buffer = []   # pending sentences: {"text", "start", "end"}
        self.chunk_id = 0  # id assigned to the next row written

    def add_segment_sentences(self, sents, seg_start, seg_end):
        """Queue the sentences of one segment and write every chunk that is ready.

        The segment's time span is divided evenly among its sentences to give
        each one an approximate start and end.
        """
        if not sents:
            return
        seg_start = float(seg_start or 0.0)
        seg_end = float(seg_end or seg_start)
        dur = max(0.001, seg_end - seg_start)
        per = dur / max(1, len(sents))
        for i, s in enumerate(sents):
            self.buffer.append({
                "text": s,
                "start": seg_start + i * per,
                "end": seg_start + (i + 1) * per,
            })
        self._flush_complete()

    def _flush_complete(self, force=False):
        """Write chunks from the front of the buffer.

        Without *force*, writing stops once only the overlap carry-over is
        left (at most ``overlap`` sentences that also fit within
        ``max_sents``), so those sentences can open the next chunk. With
        *force*, the buffer is drained completely.
        """
        while self.buffer:
            # Take sentences greedily until a sentence or character limit is hit.
            txt = ""
            count = 0
            last = -1
            for i, it in enumerate(self.buffer):
                if count >= self.max_sents:
                    break
                cand = (" " + it["text"]) if txt else it["text"]
                if len((txt + cand).strip()) > self.max_chars and count > 0:
                    break
                txt = (txt + cand).strip()
                count += 1
                last = i

            if last == -1:
                # Nothing could be taken (max_sents <= 0): emit the first
                # sentence alone so the loop always makes progress.
                it = self.buffer[0]
                self._write(it["start"], it["end"], it["text"])
                del self.buffer[0]
                continue

            if not force and count <= self.overlap and len(self.buffer) <= self.max_sents:
                break

            self._write(self.buffer[0]["start"], self.buffer[last]["end"], txt)
            # Leave the last `overlap` sentences in place, but always drop at
            # least one. self.overlap is clamped to >= 0 in __init__.
            remove_n = max(1, count - self.overlap)
            self.buffer = self.buffer[remove_n:]
        # The loop ends with an empty buffer or via the non-forced break, so
        # no separate forced drain is needed here.

    def _write(self, st, en, text):
        """Write one CSV row and advance the chunk counter."""
        self.w.writerow([self.video_id, self.chunk_id, round(st, 2), round(en, 2), text])
        self.chunk_id += 1

    def close(self):
        """Write whatever is still buffered."""
        self._flush_complete(force=True)

# ----------------- NLTK Sentence Assembler -----------------
class NLTKSentenceAssembler:
    """
    Accumulates text across Whisper segments and emits only COMPLETE sentences.

    The buffered text is re-split on every call. All sentences except the last
    are emitted; the last one is kept as a tail until more text arrives, so
    callers do not receive mid-sentence fragments.

    Timestamps are approximate: each emitted sentence gets the min start / max
    end of the segments buffered since the previous emission. After an
    emission the window restarts at the segment that produced the new tail, so
    later sentences do not inherit the start time of the very first segment.
    """

    def __init__(self, language="spanish", tokenizer=None):
        """
        Args:
            language: Language name passed to NLTK's ``sent_tokenize`` when no
                custom tokenizer is supplied.
            tokenizer: Optional callable ``text -> iterable of sentences`` used
                instead of ``sent_tokenize``.
        """
        self.lang = language
        self._tokenizer = tokenizer
        self.buf_text = ""
        self.buf_st = None
        self.buf_en = None

    def _split(self, text):
        """Split *text* into stripped, non-empty sentences."""
        if self._tokenizer is not None:
            raw = self._tokenizer(text)
        else:
            raw = sent_tokenize(text, language=self.lang)
        return [s.strip() for s in raw if s.strip()]

    def add_segment(self, text, seg_start, seg_end):
        """Append one segment and return the sentences that are now complete.

        Args:
            text: Segment text; ``None`` or blank text is ignored.
            seg_start: Segment start time; a falsy value is treated as 0.0.
            seg_end: Segment end time; a falsy value falls back to the start.

        Returns:
            A list of ``(sentence, start, end)`` tuples, possibly empty.
        """
        text = (text or "").strip()
        if not text:
            return []

        # Normalise both times up front so both branches tolerate None.
        start = float(seg_start or 0.0)
        end = float(seg_end or start)

        if self.buf_text:
            self.buf_text = self.buf_text + " " + text
            self.buf_st = start if self.buf_st is None else min(self.buf_st, start)
            self.buf_en = end if self.buf_en is None else max(self.buf_en, end)
        else:
            self.buf_text = text
            self.buf_st = start
            self.buf_en = end

        sents = self._split(self.buf_text)
        if len(sents) < 2:
            # Zero or one sentence: nothing is known to be complete yet.
            return []

        # Heuristic: everything but the last sentence is complete.
        *complete, tail = sents
        emitted = [(s, self.buf_st, self.buf_en) for s in complete]

        # Keep only the tail. Its text begins in the segment just added, so
        # the time window restarts there instead of growing forever.
        self.buf_text = tail
        self.buf_st = start
        self.buf_en = end
        return emitted

    def flush(self):
        """Emit the remaining tail as a final sentence (even if incomplete) and reset."""
        if not self.buf_text.strip():
            return []
        st = float(self.buf_st or 0.0)
        en = float(self.buf_en or st)
        out = [(self.buf_text.strip(), st, en)]
        self.buf_text = ""
        self.buf_st = None
        self.buf_en = None
        return out

# ----------------- Transcribe + write -----------------
device = "cuda" if torch.cuda.is_available() else "cpu"
# Half precision when running on CUDA, 8-bit integers on CPU.
compute_type = "float16" if device == "cuda" else "int8"
print(f"[INFO] faster-whisper: {MODEL_SIZE} on {device} ({compute_type})")
model = WhisperModel(MODEL_SIZE, device=device, compute_type=compute_type)
# A blank FORCE_LANG collapses to None, which is what gets passed as `language`.
force_language = FORCE_LANG.strip().lower() or None

t0 = time.time()
done = 0
for idx, url in enumerate(URLS, 1):
    print(f"\n[{idx}/{len(URLS)}] {url}")
    # Temp files to remove if this URL does not run to completion.
    partials = ()
    try:
        audio_path, video_id, title = download_audio(url)
        base = safe_name(f"{title}-{video_id}")
        full_txt_path = FULL_TXT_DIR / f"{base}.txt"
        csv_path = CSV_DIR / f"{base}.csv"

        if full_txt_path.exists() and csv_path.exists():
            print(f"  ↳ Already processed: {base}")
            done += 1
            continue

        # Write to "<name>.part" and rename only on success. Otherwise a crash
        # or interrupt mid-transcription would leave truncated files that the
        # check above treats as already processed on the next run.
        txt_part = full_txt_path.with_name(full_txt_path.name + ".part")
        csv_part = csv_path.with_name(csv_path.name + ".part")
        partials = (txt_part, csv_part)

        print(f"  ↳ Transcribing -> {full_txt_path.name} + {csv_path.name}")
        with open(txt_part, "w", encoding="utf-8") as ftxt, \
             open(csv_part, "w", encoding="utf-8", newline="") as fcsv:
            writer = csv.writer(fcsv)
            writer.writerow(["video_id", "chunk_id", "start", "end", "text"])
            cw = ChunkWriter(writer, video_id,
                             max_chars=MAX_CHARS_PER_CHUNK,    # one sentence shouldn’t hit this, but safe
                             max_sents=MAX_SENTS_PER_CHUNK,
                             overlap=SENT_OVERLAP)

            assembler = NLTKSentenceAssembler(language="spanish")

            seg_iter, info = model.transcribe(
                str(audio_path),
                language=force_language,
                vad_filter=False,
                beam_size=BEAM_SIZE,
                temperature=TEMPERATURE,
                word_timestamps=False  # set True if you later want precise per-sentence times
            )

            for seg in seg_iter:
                line = normalize_text(seg.text)
                if not line:
                    continue
                # Keep a raw line-by-line file of Whisper segments (handy for debugging)
                ftxt.write(line + "\n")

                # Emit ONLY complete sentences reconstructed across segments
                for sent_text, st, en in assembler.add_segment(line, seg.start, seg.end):
                    # Feed as a single-sentence "segment" into ChunkWriter
                    cw.add_segment_sentences([sent_text], st, en)

            # Optionally flush the final tail as a last sentence (remove if you want strictly terminated sentences)
            for sent_text, st, en in assembler.flush():
                cw.add_segment_sentences([sent_text], st, en)

            cw.close()

        # Both files are complete and closed: publish them under their final names.
        txt_part.replace(full_txt_path)
        csv_part.replace(csv_path)

        print(f"  ✓ Done: {base}")
        done += 1
    except Exception as e:
        # Best effort per URL: report and move on to the next one.
        print(f"  ! Error: {e}")
    finally:
        # Runs on success, on errors and on KeyboardInterrupt. After a
        # successful rename the .part files no longer exist.
        for partial in partials:
            if partial.exists():
                partial.unlink()

# ----------------- Zip outputs -----------------
if ZIP_TXT.exists(): ZIP_TXT.unlink()
if ZIP_CSV.exists(): ZIP_CSV.unlink()
!zip -qr /content/transcripts_full.zip /content/transcripts_full
!zip -qr /content/transcripts_chunks.zip /content/transcripts_chunks
print(f"\n[DONE] {done}/{len(URLS)} processed")
print(f"Full transcripts: {FULL_TXT_DIR} (zip: {ZIP_TXT})")
print(f"Chunk CSVs:       {CSV_DIR}      (zip: {ZIP_CSV})")

⬆️ Upload cookies.txt (exported from a logged-in YouTube session; Netscape format)


Saving cookies.txt to cookies (4).txt
[INFO] faster-whisper: large-v3-turbo on cuda (float16)

[1/90] https://www.youtube.com/watch?v=MPQrgicE0D4
  ↳ Trying player_client=['web_safari'] …




KeyboardInterrupt: 