<a href="https://colab.research.google.com/github/ezadEzanee/ASR/blob/main/Untitled1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Create /content/venv reliably (fallback to virtualenv if needed) and ensure pip exists.
import os, subprocess, sys, pathlib

VENV_DIR = "/content/venv"

def run(cmd):
    """Echo a shell command, execute it, and abort on a non-zero exit status.

    Args:
        cmd: Command line handed to the shell as a single string.

    Raises:
        SystemExit: carrying the command's exit status when it is not 0.
    """
    print("$", cmd)
    status = subprocess.run(cmd, shell=True).returncode
    if status == 0:
        return
    raise SystemExit(status)

pip_path = f"{VENV_DIR}/bin/pip"
py_path  = f"{VENV_DIR}/bin/python"

# Create the venv unless its interpreter is already present. Testing for
# bin/python instead of the bare directory means a half-built venv left behind
# by an earlier failed attempt is rebuilt rather than silently reused.
if not os.path.exists(py_path):
    # Try stdlib venv first; the status is checked by hand (not via run())
    # so that a failure falls through to virtualenv instead of aborting.
    rc = subprocess.call(f"python -m venv {VENV_DIR}", shell=True)
    if rc != 0:
        # Fallback to virtualenv
        run("pip -q install virtualenv")
        run(f"python -m virtualenv {VENV_DIR}")

# Ensure pip inside venv (some Colab images miss ensurepip)
if not os.path.exists(pip_path):
    run("wget -q https://bootstrap.pypa.io/get-pip.py -O /tmp/get-pip.py")
    run(f"{py_path} /tmp/get-pip.py")

# Upgrade pip
run(f"{pip_path} -q install --upgrade pip")

# Make this notebook import from the venv first.
# sorted() makes the pick deterministic, and the explicit check replaces the
# opaque StopIteration a bare next() would raise when nothing matches.
site_dirs = sorted(pathlib.Path(f"{VENV_DIR}/lib").glob("python*/site-packages"))
if not site_dirs:
    raise RuntimeError(f"No site-packages directory found under {VENV_DIR}/lib")
venv_site = site_dirs[0]
# NOTE(review): prepending the venv's site-packages to this kernel's sys.path
# presumably requires the venv Python to match the kernel's version — confirm.
if str(venv_site) not in sys.path:
    sys.path.insert(0, str(venv_site))
os.environ["VIRTUAL_ENV"] = VENV_DIR
os.environ["PYTHONNOUSERSITE"] = "1"   # ignore user site-packages

print("Using venv:", VENV_DIR)
print("Site-packages:", venv_site)
print("Python in venv:", subprocess.check_output([py_path, "-V"]).decode().strip())

$ pip -q install virtualenv
$ python -m virtualenv /content/venv
$ /content/venv/bin/pip -q install --upgrade pip
Using venv: /content/venv
Site-packages: /content/venv/lib/python3.12/site-packages
Python in venv: Python 3.12.11


In [None]:
import os, subprocess

VENV_DIR = "/content/venv"
pip = f"{VENV_DIR}/bin/pip"

# Pins that play nice with pyannote + torch
# NOTE(review): pyannote.audio 3.1.1 may predate NumPy 2.x support — confirm
# that the numpy pin below is actually compatible with it.
subprocess.check_call([pip, "install", "-q", "pandas==2.2.3", "numpy==2.2.6"])

# Torch/torchaudio: try the CUDA 12.1 index, then CUDA 11.8, then pip's default
# index, stopping at the first install that succeeds. Argument lists (no shell)
# avoid quoting problems, and every exit status is inspected.
torch_indexes = (
    "https://download.pytorch.org/whl/cu121",
    "https://download.pytorch.org/whl/cu118",
    None,  # no --index-url: pip's default index
)
for index_url in torch_indexes:
    torch_cmd = [pip, "-q", "install", "torch", "torchaudio"]
    if index_url is not None:
        torch_cmd += ["--index-url", index_url]
    rc = subprocess.call(torch_cmd)
    if rc == 0:
        break
else:
    # Best effort, as before: keep going, but say so instead of staying silent.
    print("WARNING: every torch/torchaudio install attempt failed; "
          "the imports below will likely fail.")

# ASR + diarization
subprocess.check_call([pip, "install", "-q", "faster-whisper", "ctranslate2", "pyannote.audio==3.1.1"])

# Verify versions (importing from venv thanks to Cell 1)
# NOTE(review): the recorded run printed numpy 2.0.2 from
# /usr/local/lib/.../dist-packages even though 2.2.6 is pinned above —
# presumably numpy was already imported by the kernel before the venv path was
# added. Restart the runtime (or query the venv interpreter) to verify the pin.
import numpy, pandas, torch, torchaudio
print("numpy     :", numpy.__version__, numpy.__file__)
print("pandas    :", pandas.__version__, pandas.__file__)
print("torch     :", torch.__version__)
print("torchaudio:", torchaudio.__version__)


numpy     : 2.0.2 /usr/local/lib/python3.12/dist-packages/numpy/__init__.py
pandas    : 2.2.3 /content/venv/lib/python3.12/site-packages/pandas/__init__.py
torch     : 2.5.1+cu121
torchaudio: 2.5.1+cu121


In [None]:
# Mount Drive if your file is there
from google.colab import drive
drive.mount("/content/drive")

import os

# ---- INPUT ----
INPUT_MP4 = "/content/drive/MyDrive/ASR/Process Walkthrough Session #2 part1.mp4"   # <-- change this to your file path

# ---- Transcription settings ----
MODEL_ID   = "large-v3"       # or "medium" if you want faster
FORCE_LANG = None             # "en" to force English, None for auto
TRANSLATE  = True             # True to translate Malay/English → English

# ---- Diarization settings ----
PYANNOTE_MODEL = "pyannote/speaker-diarization-3.1"

# Pull HF token from Colab secrets (secure, no hardcoding).
# A missing secret, or one this notebook has not been granted access to, raises
# instead of returning None; map both cases to None so the pipeline can skip
# diarization ("skipped (no HF token)") rather than crash here.
from google.colab import userdata
try:
    HF_TOKEN = userdata.get("HF_TOKEN")
except (userdata.SecretNotFoundError, userdata.NotebookAccessError) as err:
    print(f"⚠️ HF_TOKEN unavailable ({type(err).__name__}); diarization will be skipped.")
    HF_TOKEN = None

NUM_SPEAKERS = None           # e.g., 3 to force exactly 3
MIN_SPEAKERS = None           # used only if NUM_SPEAKERS is None
MAX_SPEAKERS = None           # used only if NUM_SPEAKERS is None

# Domain prompt to bias ASR (optional)
DOMAIN_PROMPT = (
    "Oracle E-Business Suite, Discrete Manufacturing, BOM, WIP, subinventory, "
    "AHP, AAVE, AERP, standard cost, machine rate, labor rate, overhead, work order, "
    "job completion, MU18, Order Management, delivery order, UAT, CRP."
)

print("✅ Settings loaded. HF_TOKEN found?", bool(HF_TOKEN))

ValueError: mount failed

In [None]:
import os, json, time, tempfile, subprocess
from datetime import datetime, timedelta

def ts():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
def step_start(i, n, msg):
    """Print a "[i/n] msg (start <timestamp>)" banner and start a timer.

    Returns:
        The ``time.perf_counter()`` reading taken right after the banner is
        printed; pass it to ``step_end`` as ``t0``.
    """
    print("[{}/{}] {} (start {})".format(i, n, msg, ts()))
    return time.perf_counter()
def step_end(i, n, t0, extra=None):
    """Print a "[i/n] end <timestamp> (<elapsed>s)" line for a finished step.

    Args:
        i: Step number.
        n: Total number of steps.
        t0: ``time.perf_counter()`` reading taken when the step started.
        extra: Optional string appended after " | " when truthy.
    """
    elapsed = time.perf_counter() - t0
    if extra:
        suffix = " | " + extra
    else:
        suffix = ""
    print(f"[{i}/{n}] end {ts()} ({elapsed:.1f}s){suffix}")
def srt_time(s):
    """Format an offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The fields are computed arithmetically rather than by slicing
    ``str(timedelta)``: that string has no fractional part for whole-second
    values (so slicing cut off the seconds), uses a single-digit hour, and
    switches to "N day, ..." past 24 hours.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped to 0.

    Returns:
        Zero-padded timestamp with a comma before the milliseconds.
    """
    # Round to whole milliseconds first so a carry (e.g. 59.9996 s) propagates
    # into the seconds/minutes fields.
    total_ms = max(0, int(round(s * 1000)))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    seconds, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{millis:03d}"
def vtt_time(s):
    """Format an offset in seconds as a WebVTT timestamp ``HH:MM:SS.mmm``.

    The fields are computed arithmetically rather than by slicing
    ``str(timedelta)``: that string has no fractional part for whole-second
    values (so slicing cut off the seconds), uses a single-digit hour, and
    switches to "N day, ..." past 24 hours.

    Args:
        s: Offset in seconds (int or float). Negative values are clamped to 0.

    Returns:
        Zero-padded timestamp with a period before the milliseconds.
    """
    # Round to whole milliseconds first so a carry (e.g. 59.9996 s) propagates
    # into the seconds/minutes fields.
    total_ms = max(0, int(round(s * 1000)))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    seconds, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"

# Fail fast with a real exception; `assert` is stripped when Python runs with -O.
if not os.path.exists(INPUT_MP4):
    raise FileNotFoundError(f"Input not found: {INPUT_MP4}")

# All cache/output files live next to the input and share its base name.
base, _ = os.path.splitext(INPUT_MP4)
p2 = base + " - Process 2.json"   # transcription cache
p3 = base + " - Process 3.json"   # diarization cache
t4 = base + " - Process 4.txt"
s4 = base + " - Process 4.srt"
v4 = base + " - Process 4.vtt"

# ---------- Process 2 ----------
# Transcribe (or translate) the input, caching the segments as JSON in p2 so
# later runs skip the model entirely.
t0 = step_start(2,4,"Transcribing with faster-whisper")
from faster_whisper import WhisperModel
import torch
device  = "cuda" if torch.cuda.is_available() else "cpu"
compute = "float16" if device=="cuda" else "int8"
task    = "translate" if TRANSLATE else "transcribe"

if not os.path.exists(p2):
    wmodel = WhisperModel(MODEL_ID, device=device, compute_type=compute)
    seg_iter, info = wmodel.transcribe(
        INPUT_MP4,
        language=FORCE_LANG,
        task=task,
        beam_size=5,
        vad_filter=True,
        vad_parameters=dict(min_speech_duration_ms=300),
        initial_prompt=DOMAIN_PROMPT,
        condition_on_previous_text=True,
    )
    # Consuming seg_iter here is what materializes the segments into a list.
    segs = [{"start": float(s.start), "end": float(s.end), "text": s.text.strip()} for s in seg_iter]
    lang = getattr(info, "language", None) or FORCE_LANG or "unknown"
    with open(p2, "w", encoding="utf-8") as f:
        json.dump({"lang": lang, "segments": segs}, f, ensure_ascii=False, indent=2)
    step_end(2,4,t0,f"cache -> {p2}")
else:
    with open(p2, "r", encoding="utf-8") as f:
        data = json.load(f)
    segs, lang = data["segments"], data["lang"]
    step_end(2,4,t0,"loaded cache")

# ---------- Process 3 ----------
# Speaker diarization, cached as JSON in p3. `diar` stays None when the step
# is skipped (no token, or the pipeline could not be initialised).
t1 = step_start(3,4,"Running diarization")
diar = None
if os.path.exists(p3):
    with open(p3, "r", encoding="utf-8") as f:
        diar = json.load(f)
    step_end(3,4,t1,"loaded cache")
elif not HF_TOKEN:
    step_end(3,4,t1,"skipped (no HF token)")
else:
    # Initialise the pipeline before extracting audio, so a failed init does
    # not cost an ffmpeg run or leave a temp file behind.
    try:
        from pyannote.audio import Pipeline
        pipeline = Pipeline.from_pretrained(PYANNOTE_MODEL, use_auth_token=HF_TOKEN)
    except Exception as e:
        # Deliberate best effort: diarization is optional, so report and go on.
        print(f"[Diarization] Init failed: {e}")
        pipeline = None
    if pipeline is None:
        step_end(3,4,t1,"skipped")
    else:
        if device == "cuda":
            # Use the same GPU as the transcription step; the pipeline is
            # otherwise left on whatever device from_pretrained put it on.
            pipeline.to(torch.device("cuda"))
        kwargs = {}
        if NUM_SPEAKERS is not None:
            kwargs["num_speakers"] = NUM_SPEAKERS
        else:
            if MIN_SPEAKERS is not None: kwargs["min_speakers"] = MIN_SPEAKERS
            if MAX_SPEAKERS is not None: kwargs["max_speakers"] = MAX_SPEAKERS
        # Extract mono/16k wav into a temp file that is always removed afterwards.
        wav_handle = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        tmp_wav = wav_handle.name
        wav_handle.close()   # only the path is needed; ffmpeg overwrites it (-y)
        try:
            subprocess.run(["ffmpeg", "-y", "-i", INPUT_MP4, "-ar", "16000", "-ac", "1", tmp_wav], check=True)
            ann = pipeline({"audio": tmp_wav}, **kwargs)
        finally:
            if os.path.exists(tmp_wav):
                os.remove(tmp_wav)
        diar = [{"start": float(t.start), "end": float(t.end), "speaker": spk}
                for t,_,spk in ann.itertracks(yield_label=True)]
        with open(p3, "w", encoding="utf-8") as f:
            json.dump(diar, f, ensure_ascii=False, indent=2)
        step_end(3,4,t1,f"cache -> {p3}")
# ---------- Process 4 ----------
# Start the step-4 timer: t2 holds the perf_counter reading that
# step_end(4,4,t2) consumes once all three output files are written.
t2 = step_start(4,4,"Merging segments + diarization")

def best_speaker(seg_start, seg_end, diar_list):
    """Pick the speaker whose diarization turn overlaps a segment the most.

    Args:
        seg_start: Segment start time.
        seg_end: Segment end time.
        diar_list: Iterable of dicts with "start", "end" and "speaker" keys,
            or a falsy value (None / empty) when no diarization is available.

    Returns:
        The "speaker" value of the turn with the largest strictly positive
        overlap (the earliest such turn wins ties), or the placeholder
        "SPEAKER" when nothing overlaps or ``diar_list`` is falsy.
    """
    fallback = "SPEAKER"
    if not diar_list:
        return fallback

    winner, longest = fallback, 0.0
    for turn in diar_list:
        turn_start = float(turn["start"])
        turn_end = float(turn["end"])
        # Negative when the intervals are disjoint; such values can never
        # exceed `longest`, which starts at 0.0 and only grows.
        shared = min(seg_end, turn_end) - max(seg_start, turn_start)
        if shared > longest:
            winner, longest = turn["speaker"], shared
    return winner

# Label every transcript segment with the diarization speaker that overlaps it
# most (best_speaker supplies the placeholder when diar is empty/None).
for seg in segs:
    seg["speaker"] = best_speaker(seg["start"], seg["end"], diar)

# Plain-text transcript: a language header, then one "[H:MM:SS] speaker: text"
# line per segment (start time truncated to whole seconds).
with open(t4, "w", encoding="utf-8") as f:
    print(f"[Language: {lang}]", end="\n\n", file=f)
    for seg in segs:
        hhmmss = str(timedelta(seconds=int(seg["start"])))
        print("[{}] {}: {}".format(hhmmss, seg["speaker"], seg["text"]), file=f)

# SRT: 1-based cue index, timing line, text, blank line.
with open(s4, "w", encoding="utf-8") as f:
    for i, seg in enumerate(segs, 1):
        print(
            i,
            f"{srt_time(seg['start'])} --> {srt_time(seg['end'])}",
            f"{seg['speaker']}: {seg['text']}",
            sep="\n",
            end="\n\n",
            file=f,
        )

# WebVTT: "WEBVTT" header, then timing line + text per cue (no cue index).
with open(v4, "w", encoding="utf-8") as f:
    print("WEBVTT", end="\n\n", file=f)
    for seg in segs:
        print(
            f"{vtt_time(seg['start'])} --> {vtt_time(seg['end'])}",
            f"{seg['speaker']}: {seg['text']}",
            sep="\n",
            end="\n\n",
            file=f,
        )

step_end(4,4,t2)

# Summary of where the three output files were written.
print("\n=== Outputs ===")
for label, out_path in (("Text:", t4), ("SRT :", s4), ("VTT :", v4)):
    print(label, out_path)

