In [1]:
# 1) Colab install: prefers faster-whisper on Py<=3.11, falls back to openai-whisper on Py>=3.12 ---
import sys, subprocess, textwrap
py_minor = sys.version_info.minor

!pip -q install --upgrade pip

COMMON = [
    "yt-dlp==2025.1.26",
    "pydub==0.25.1",
    "librosa==0.10.2.post1",
    "soundfile==0.12.1",
    "noisereduce==3.0.2",
    "rich==13.7.1",
]

if py_minor <= 11:
    # Preferred path (fast + accurate)
    !pip -q install "ctranslate2==4.4.0" "faster-whisper==1.0.1" { " ".join(COMMON) }
else:
    # Python 3.12+ fallback (very accurate, slower)
    !pip -q install "openai-whisper==20240930" { " ".join(COMMON) }

!apt -y -qq install ffmpeg


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.8/1.8 MB[0m [31m65.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m43.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for openai-whisper (pyproject.toml) ... [?25l[?25hdone
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [2]:
!pip -q install --upgrade pip
!pip -q install faster-whisper==1.0.3
!apt -y -qq install ffmpeg

ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [3]:
# Backend auto-detect
BACKEND = None
try:
    from faster_whisper import WhisperModel
    BACKEND = "faster"
except Exception:
    import whisper as whisper_og
    BACKEND = "openai"

import torch
CUDA_OK = torch.cuda.is_available()
DEVICE = "cuda" if CUDA_OK else "cpu"

def pick_compute_type():
    if DEVICE == "cuda":
        return "int8_float16"   # fast & accurate on GPU (faster-whisper only)
    return "int8"


In [4]:
# 2) IMPORTS & UTILITIES
import os, re, io, json, math, tempfile, subprocess, shutil, sys, time
from datetime import datetime
from pathlib import Path
import numpy as np
import soundfile as sf
from pydub import AudioSegment, effects
import librosa
import noisereduce as nr
from rich import print as rprint
from google.colab import files

# faster-whisper
from faster_whisper import WhisperModel

# Hardware selection
import torch
CUDA_OK = torch.cuda.is_available()
DEVICE = "cuda" if CUDA_OK else "cpu"

  m = re.match('([su]([0-9]{1,2})p?) \(([0-9]{1,2}) bit\)$', token)
  m2 = re.match('([su]([0-9]{1,2})p?)( \(default\))?$', token)
  elif re.match('(flt)p?( \(default\))?$', token):
  elif re.match('(dbl)p?( \(default\))?$', token):


In [6]:
# spacing for Hindi + English text
def smart_space_fix(text):

    if not text:
        return text

    #  Ensuring a space after punctuation if next char isn't space
    text = re.sub(r'([,;:!?])(?!\s)', r'\1 ', text)
    # Support Hindi danda/ dandi
    text = re.sub(r'(।)(?!\s)', r'\1 ', text)

    #  Removing extra spaces BEFORE punctuation
    text = re.sub(r'\s+([,;:!?।])', r'\1', text)

    # Adding a space at script boundaries (Devanagari <-> Latin/number)
    # Devanagari range: \u0900-\u097F
    text = re.sub(r'([\u0900-\u097F])([A-Za-z0-9])', r'\1 \2', text)
    text = re.sub(r'([A-Za-z0-9])([\u0900-\u097F])', r'\1 \2', text)

    # Splitting common glued Hindi function words on both sides
    common = ['है','हैं','था','थी','थे','भी','पर','और','या','कि','जो','जब','जहाँ','क्यों','तो','ना','में','से','के']
    for w in common:
        # add space before the word if preceded by a Devanagari letter w/o space
        text = re.sub(rf'([\u0900-\u097F])({w})', r'\1 \2', text)
        # add space after the word if followed by a Devanagari letter w/o space
        text = re.sub(rf'({w})([\u0900-\u097F])', r'\1 \2', text)

    # 5) Normalize multiple spaces
    text = re.sub(r'\s{2,}', ' ', text).strip()
    return text


def collapse_word_runs(text, max_repeat=3):

    if not text: return text
    tokens = re.findall(r'\w+|[^\w\s]', text, flags=re.UNICODE)
    out = []
    run_word = None
    run_len = 0
    for tok in tokens:
        # treat words separately from punctuation
        if re.match(r'\w+', tok, flags=re.UNICODE):
            if tok == run_word:
                run_len += 1
                if run_len <= max_repeat:
                    out.append(tok)
            else:
                run_word = tok
                run_len = 1
                out.append(tok)
        else:
            # reset run on punctuation
            run_word = None
            run_len = 0
            out.append(tok)
    # Re-join with spaces where appropriate
    txt = ""
    for i, t in enumerate(out):
        if i and re.match(r'\w+', t, flags=re.UNICODE) and re.match(r'\w+', out[i-1], flags=re.UNICODE):
            txt += " "
        elif i and t.isalnum() and out[i-1].isalnum():
            txt += " "
        txt += t
    # squeeze spaces before punctuation
    txt = re.sub(r'\s+([,.;:!?])', r'\1', txt)
    return txt.strip()

def trim_low_info_tail(text, min_unique_ratio=0.25, tail_window=60):

    if not text or len(text) < tail_window:
        return text
    head, tail = text[:-tail_window], text[-tail_window:]
    words = re.findall(r'\w+', tail, flags=re.UNICODE)
    if not words:
        return text
    uniq = set(words)
    ratio = len(uniq) / max(1, len(words))
    if ratio < min_unique_ratio:
        return head.rstrip()
    return text

def remove_repetitions(text, sim_threshold=0.72):
    if not text: return text
    # sentence-level dedupe (your original idea)
    sents = re.split(r'(?<=[.!?])\s+', text.strip())
    out = []
    prev = ""
    for s in sents:
        s_clean = s.strip()
        if not s_clean:
            continue
        if s_clean.lower() != prev.lower() and jaccard_sim(s_clean, prev) < sim_threshold:
            out.append(s_clean)
            prev = s_clean
    cleaned = " ".join(out)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    # NEW: word-run collapse + tail trim
    cleaned = collapse_word_runs(cleaned, max_repeat=3)
    cleaned = trim_low_info_tail(cleaned, min_unique_ratio=0.25, tail_window=80)
    return cleaned


def pick_compute_type():
    """
    Heuristic for stable GPU/CPU precision that avoids OOM but keeps accuracy high.
    """
    if DEVICE == "cuda":
        # Use mixed int8/float16 on consumer GPUs for Large-v3
        return "int8_float16"
    # CPU path
    return "int8"  # CTranslate2 quantization for speed on CPU

def human_bytes(n):
    units = ["B","KB","MB","GB","TB"]
    i = 0
    while n >= 1024 and i < len(units)-1:
        n /= 1024.0; i += 1
    return f"{n:.1f} {units[i]}"


# 3) MEDIA HELPERS (FFmpeg-based)

def run_ffmpeg(cmd_args, quiet=True):
    base = ["ffmpeg", "-y"]
    if quiet:
        base += ["-hide_banner", "-loglevel", "error"]
    proc = subprocess.run(base + cmd_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.decode("utf-8", errors="ignore"))
    return True

def extract_wav_16k_mono(input_path, out_path):
    # 16kHz mono, 16-bit PCM (s16le)
    run_ffmpeg([
        "-i", input_path,
        "-vn",
        "-ac", "1",
        "-ar", "16000",
        "-c:a", "pcm_s16le",
        out_path
    ])

def normalize_lufs(audio_wav_path, target_lufs=-23.0):

    # Lightweight loudness normalization using pydub effects.normalize (peak) plus gain tweak.
    # For full EBU R128 you'd use ffmpeg loudnorm, but this keeps deps simple & fast.

    audio = AudioSegment.from_wav(audio_wav_path)
    audio = audio.set_channels(1).set_frame_rate(16000)
    # peak normalize
    audio = effects.normalize(audio)

    audio = audio.apply_gain(0.0)
    audio.export(audio_wav_path, format="wav")

def reduce_noise(audio_wav_path, out_path=None, prop_decrease=0.7):

    y, sr = librosa.load(audio_wav_path, sr=16000, mono=True)

    # Build a small noise profile from head (fallback: simple median)
    profile_len = min(len(y), int(0.5 * sr))
    y_noise = y[:profile_len] if profile_len > 0 else None

    try:
        y_nr = nr.reduce_noise(
            y=y,
            sr=sr,
            y_noise=y_noise,          # ok to pass None in v3.x
            stationary=True,
            prop_decrease=prop_decrease
        )
    except TypeError:

        y_nr = nr.reduce_noise(
            y=y,
            sr=sr,
            stationary=True,
            prop_decrease=prop_decrease
        )

    # Normalize to prevent clipping
    peak = np.max(np.abs(y_nr)) or 1.0
    y_nr = y_nr / peak

    if out_path is None:
        out_path = audio_wav_path
    sf.write(out_path, y_nr, sr)
    return out_path


# 4) YT DOWNLOAD

def download_youtube_wav(url, out_wav="yt_audio_16k.wav"):

    # temp output (m4a/webm)
    base = "yt_tmp_audio"
    # extract-audio wav can be brittle; we keep original then convert ourselves.
    ytdlp_cmd = [
        "yt-dlp",
        "-f", "bestaudio/best",
        "-o", f"{base}.%(ext)s",
        url
    ]
    proc = subprocess.run(ytdlp_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.decode("utf-8", errors="ignore"))

    # Find the downloaded file (unknown ext)
    dl = None
    for ext in (".m4a", ".webm", ".mp3", ".mp4", ".opus"):
        cand = base + ext
        if os.path.exists(cand):
            dl = cand
            break
    if dl is None:

        matches = list(Path(".").glob("yt_tmp_audio.*"))
        if matches:
            dl = str(matches[0])
    if dl is None:
        raise FileNotFoundError("yt-dlp download failed to produce an audio file.")

    extract_wav_16k_mono(dl, out_wav)
    try:
        os.remove(dl)
    except:
        pass
    return out_wav


# 5) TEXT POST-PROCESSING

def jaccard_sim(a, b):
    if not a or not b: return 0.0
    s1, s2 = set(a.lower().split()), set(b.lower().split())
    if not s1 or not s2: return 0.0
    inter = len(s1 & s2)
    union = len(s1 | s2)
    return inter / union if union else 0.0

def remove_repetitions(text, sim_threshold=0.72):
    if not text: return text
    # split on sentence boundaries; keep ? ! .
    sents = re.split(r'(?<=[.!?])\s+', text.strip())
    out = []
    prev = ""
    for s in sents:
        s_clean = s.strip()
        if not s_clean: continue
        if s_clean.lower() != prev.lower() and jaccard_sim(s_clean, prev) < sim_threshold:
            out.append(s_clean)
            prev = s_clean
    # Clean joins
    cleaned = " ".join(out)
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned

def context_aware_corrections(text):
    if not text: return text

    corrections = [
        (r'\bi\b', 'I'),
        (r'\bu\b', 'you'),
        (r'\bur\b', 'your'),
        (r'\br\b', 'are'),
        (r'\bpls\b', 'please'),
        (r'\bthx\b', 'thanks'),
        (r'\bwanna\b', 'want to'),
        (r'\bgonna\b', 'going to'),
        (r'\bkinda\b', 'kind of'),
        (r'\bsorta\b', 'sort of'),
        (r'\bgimme\b', 'give me'),
        (r'\bdunno\b', 'do not know'),
    ]
    for pat, repl in corrections:
        text = re.sub(pat, repl, text, flags=re.IGNORECASE)

    # Specific pattern cleanups
    pattern_fixes = [
        (r'\.\s*\.+', '.'),               # collapse multiple dots
        (r'\s+,', ','),                   # space before comma
        (r'\s+\.', '.'),                  # space before period
        (r'(\s*\?)\?+', r'\1'),           # multi question marks
        (r'(\s*!)!+', r'\1'),             # multi exclamations
    ]
    for pat, repl in pattern_fixes:
        text = re.sub(pat, repl, text)

    # Sentence-case pass
    def _cap(m):
        s = m.group(0)
        return s[0].upper() + s[1:]
    text = re.sub(r'(^|\.\s+|\?\s+|!\s+)([a-z])', lambda m: m.group(1) + m.group(2).upper(), text)

    return text.strip()

# 6) SRT / JSON EXPORT

def srt_timestamp(t):
    # t in seconds (float)
    if t is None: t = 0.0
    h = int(t // 3600)
    m = int((t % 3600) // 60)
    s = int(t % 60)
    ms = int((t - int(t)) * 1000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

def write_srt(segments, out_path):
    with open(out_path, "w", encoding="utf-8") as f:
        for i, seg in enumerate(segments, start=1):
            start = srt_timestamp(seg["start"])
            end = srt_timestamp(seg["end"])
            txt = seg["text"].strip()
            f.write(f"{i}\n{start} --> {end}\n{txt}\n\n")

def write_json_words(segments, out_path):
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(segments, f, ensure_ascii=False, indent=2)


# 7) TRANSCRIBER (faster-whisper)

class TranscriberPro:
    def __init__(self,
                 preferred_model="large-v3",
                 language=None,                   # None = auto
                 beam_size=5,
                 vad=True,
                 suppress_repetition=True):
        self.device = DEVICE
        self.compute_type = pick_compute_type()
        self.language = language
        self.beam_size = beam_size
        self.vad = vad
        self.suppress_repetition = suppress_repetition
        self.model_name = self._choose_model(preferred_model)
        rprint(f"[bold green]Device:[/bold green] {self.device}  "
               f"[bold green]Compute:[/bold green] {self.compute_type}  "
               f"[bold green]Model:[/bold green] {self.model_name}")
        self.model = WhisperModel(self.model_name, device=self.device, compute_type=self.compute_type)

    def _choose_model(self, preferred):
      if self.device == "cuda":
          return preferred
      else:
          return "medium.en"


    # Media Ingestion
    def from_youtube(self, url):
        rprint("[bold cyan]Downloading YouTube audio...[/bold cyan]")
        wav = download_youtube_wav(url, out_wav="youtube_16k.wav")
        self._post_ingest_cleanup(wav)
        return wav

    def from_video(self, video_path):
        rprint("[bold cyan]Extracting audio from video...[/bold cyan]")
        wav = "video_16k.wav"
        extract_wav_16k_mono(video_path, wav)
        self._post_ingest_cleanup(wav)
        return wav

    def from_audio(self, audio_path):
        rprint("[bold cyan]Standardizing audio...[/bold cyan]")
        wav = "input_16k.wav"
        extract_wav_16k_mono(audio_path, wav)
        self._post_ingest_cleanup(wav)
        return wav

    def _post_ingest_cleanup(self, wav_path):
        rprint("[bold cyan]Enhancing audio (normalize + noise reduce)...[/bold cyan]")
        normalize_lufs(wav_path)
        reduce_noise(wav_path, out_path=wav_path, prop_decrease=0.7)

    #  Transcription Core

    def transcribe(self, wav_path, temperature=0.0):
        rprint("[bold magenta]Transcribing with optimized parameters...[/bold magenta]")

        vad_params = dict(
            min_speech_duration_ms=250,
            max_speech_duration_s=30,
            min_silence_duration_ms=450,
            speech_pad_ms=120
        )

        segs, words = [], []

        generator, info = self.model.transcribe(
            wav_path,
            language=self.language,
            beam_size=self.beam_size,
            best_of=None,
            temperature=temperature,
            vad_filter=self.vad,
            vad_parameters=vad_params,
            condition_on_previous_text=False,
            compression_ratio_threshold=2.2,
            no_speech_threshold=0.60,
            log_prob_threshold=-1.2,
            word_timestamps=True,
            suppress_blank=True

        )

        for seg in generator:
            text = (seg.text or "").strip()
            if not text:
                continue
            start = float(seg.start or 0.0)
            end   = float(seg.end or (start + 1.0))
            avg_lp = float(seg.avg_logprob or -999.0)
            no_sp  = float(seg.no_speech_prob or 0.0)


            toks = re.findall(r'\w+', text, flags=re.UNICODE)
            uniq_ratio = (len(set(toks)) / max(1, len(toks))) if toks else 1.0
            if (no_sp > 0.80 and avg_lp < -0.8) or avg_lp < -1.2 or uniq_ratio < 0.25:
                continue

            segs.append({
                "start": start,
                "end": end,
                "text": text,
                "avg_logprob": avg_lp,
                "no_speech_prob": no_sp
            })

            if getattr(seg, "words", None):
                for w in seg.words:
                    words.append({
                        "word": w.word,
                        "start": float(w.start or start),
                        "end": float(w.end or start),
                        "prob": float(w.probability or 0.0)
                    })

        # Build text and clean it thoroughly
        raw_text = " ".join(s["text"] for s in segs).strip()
        if self.suppress_repetition:
            raw_text = remove_repetitions(raw_text, sim_threshold=0.70)
        final_text = context_aware_corrections(raw_text)

        final_text = collapse_word_runs(final_text, max_repeat=3)
        final_text = trim_low_info_tail(final_text, min_unique_ratio=0.25, tail_window=80)
        final_text = smart_space_fix(final_text)


        return final_text, segs, words


# 8) MAIN UX (upload file / YouTube)

def main():
    rprint("[bold yellow] Video Transcription System[/bold yellow]")
    rprint("="*60)
    rprint("1) Upload a VIDEO or AUDIO file")
    rprint("2) Enter a YouTube URL")

    choice = input("Choose option (1/2): ").strip()

    # Create transcriber (Large-v3 on GPU, Medium on CPU)
    transcriber = TranscriberPro(
        preferred_model="large-v3",
        language=None,            # auto-detect; set "en" for English-only
        beam_size=5,
        vad=True,
        suppress_repetition=True
    )

    src_name = ""
    wav_path = None

    if choice == "1":
        rprint("[bold cyan]Please upload a video or audio file...[/bold cyan]")
        uploaded = files.upload()
        if uploaded:
            in_path = list(uploaded.keys())[0]
            src_name = in_path
            # Decide by extension
            ext = Path(in_path).suffix.lower()
            if ext in [".wav", ".mp3", ".m4a", ".flac", ".ogg", ".opus"]:
                wav_path = transcriber.from_audio(in_path)
            else:
                wav_path = transcriber.from_video(in_path)
        else:
            rprint("[red]No file uploaded.[/red]")
            return

    elif choice == "2":
        url = input("Enter YouTube URL: ").strip()
        if not url:
            rprint("[red]No URL provided.[/red]")
            return
        src_name = url
        try:
            wav_path = transcriber.from_youtube(url)
        except Exception as e:
            rprint(f"[red]YouTube download failed: {e}[/red]")
            return


    else:
        rprint("[red]Invalid choice.[/red]")
        return

    # Transcribe
    try:
        final_text, segs, words = transcriber.transcribe(wav_path, temperature=0.0)
    except RuntimeError as e:
        # If GPU OOM with large-v3, fallback to medium.en automatically
        if "out of memory" in str(e).lower() or "cuda" in str(e).lower():
            rprint("[yellow]GPU ran out of memory. Falling back to 'medium.en'...[/yellow]")
            transcriber = TranscriberPro(
                preferred_model="medium.en",
                language=None,
                beam_size=5,
                vad=True,
                suppress_repetition=True
            )
            final_text, segs, words = transcriber.transcribe(wav_path, temperature=0.0)
        else:
            raise

    # Save outputs
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    base = f"transcription_{ts}"
    txt_path = f"{base}.txt"
    srt_path = f"{base}.srt"
    json_path = f"{base}_words.json"

    with open(txt_path, "w", encoding="utf-8") as f:
        f.write("ENHANCED VIDEO TRANSCRIPTION\n")
        f.write("="*50 + "\n\n")
        f.write(f"Source: {src_name}\n")
        f.write(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("="*50 + "\n\n")
        f.write(final_text.strip() + "\n")

    # Build SRT segments with cleaned sentence chunks (merge short lines)
    srt_ready = []
    for s in segs:
        if s["text"].strip():
            srt_ready.append({
                "start": s["start"],
                "end": s["end"],
                "text": s["text"].strip()
            })
    write_srt(srt_ready, srt_path)
    write_json_words(words, json_path)

    rprint("\n" + "="*60)
    rprint("[bold green]ENHANCED TRANSCRIPTION RESULT:[/bold green]")
    rprint("="*60)
    print(final_text)

    rprint(f"\n[bold]Saved:[/bold] {txt_path}, {srt_path}, {json_path}")
    # Offer downloads
    files.download(txt_path)
    files.download(srt_path)
    files.download(json_path)

if __name__ == "__main__":
    main()


Choose option (1/2): 1


Saving WIN_20250831_04_55_17_Pro.mp4 to WIN_20250831_04_55_17_Pro.mp4


Hello, my name is Anishka Raz.I am third year undergrad of IIT Kanpur.I am from Ayodhya, Uttar Pradesh, India.I am doing my B.Tech from Mechanical Engineering in IIT Kanpur.


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>