In [None]:
!pip install faster_whisper

Collecting faster_whisper
  Downloading faster_whisper-1.2.1-py3-none-any.whl.metadata (16 kB)
Collecting ctranslate2<5,>=4.0 (from faster_whisper)
  Downloading ctranslate2-4.6.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (10 kB)
Collecting onnxruntime<2,>=1.14 (from faster_whisper)
  Downloading onnxruntime-1.23.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Collecting av>=11 (from faster_whisper)
  Downloading av-16.0.1-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting coloredlogs (from onnxruntime<2,>=1.14->faster_whisper)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime<2,>=1.14->faster_whisper)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading faster_whisper-1.2.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m28.3 MB/s[0m eta [3

In [None]:
from __future__ import annotations
import os, sys, shutil, subprocess, time, json, re
from typing import Optional, Iterable
from google import genai
from google.genai import types
from faster_whisper import WhisperModel
from faster_whisper.utils import download_model
# ================== CONFIG ==================
# Whisper model to load; can be overridden through the FW_MODEL environment variable.
FW_MODEL      = os.environ.get("FW_MODEL", "Systran/faster-distil-whisper-large-v2")
# Forwarded as `language=` to model.transcribe().
FORCE_LANG    = None          # "it","en", or None for autodetect
# Forwarded as `chunk_length=` to model.transcribe().
CHUNK_SECONDS = 30
# Forwarded as `vad_filter=` to model.transcribe().
USE_VAD       = True
# Forwarded as `beam_size=` to model.transcribe().
BEAM_SIZE     = 3
# Forwarded as `word_timestamps=` to model.transcribe().
WORD_TS       = False
# Gemini model name; can be overridden through the GEMINI_MODEL environment variable.
GEMINI_MODEL  = os.environ.get("GEMINI_MODEL", "gemini-flash-latest")
# Maximum number of transcript characters packed into one Gemini request (see _smart_chunks).
MAX_MD_CHARS  = 130_000
# Used by generate_notes_with_genai() when its `enable_search` argument is None.
ENABLE_SEARCH_DEFAULT = False  # set True only if you need web tools
# File extensions accepted as input audio (compared lower-cased).
ALLOWED_EXTS  = {".wav", ".mp3", ".flac"}
# Mount point handed to google.colab.drive.mount().
DRIVE_MOUNT   = "/content/drive"
# Drive folder scanned for input audio.
DRIVE_IN_DIR  = f"{DRIVE_MOUNT}/MyDrive/asr_in"
# Drive folder that receives one sub-folder per processed file.
DRIVE_OUT_DIR = f"{DRIVE_MOUNT}/MyDrive/asr_out"
# Drive folder used as `cache_dir` when downloading the Whisper model.
MODEL_CACHE   = f"{DRIVE_MOUNT}/MyDrive/.ctranslate2-cache"
# Local (non-Drive) folder the selected audio file is copied into before transcription.
LOCAL_IN_DIR  = "/content/_in"

# ================== CT2 (cache) helpers ==================
def _is_ct2_dir(path: str) -> bool:
    """Valid CT2 layouts:
    - distilled: model.bin (+ config/tokenizer)
    - split:     encoder*.bin + decoder*.bin (+ config)
    """
    if not (os.path.isdir(path) and any(os.scandir(path))):
        return False
    files = {f.name for f in os.scandir(path) if f.is_file()}
    has_model_bin = ("model.bin" in files) or any(n.startswith("model.") and n.endswith(".bin") for n in files)
    has_split = any(n.startswith("encoder") and n.endswith(".bin") for n in files) and \
                any(n.startswith("decoder") and n.endswith(".bin") for n in files)
    has_cfg = ("config.json" in files) or ("tokenizer.json" in files)
    return (has_model_bin or has_split) and has_cfg

def _find_ct2_dir(base: str) -> Optional[str]:
    if not os.path.isdir(base):
        return None
    if _is_ct2_dir(base):
        return base
    for root, dirs, files in os.walk(base):
        if _is_ct2_dir(root):
            return root
    return None

def _print_tree(path: str, max_items: int = 60) -> None:
    shown = 0
    for root, dirs, files in os.walk(path):
        rel = os.path.relpath(root, path)
        print(f"   [{rel}]")
        for d in dirs:
            print(f"     <DIR> {d}")
            shown += 1
            if shown >= max_items: return
        for f in files:
            print(f"           {f}")
            shown += 1
            if shown >= max_items: return

def _is_repo_id(model_id: str) -> bool:
    return "/" in model_id  # e.g. "Systran/faster-distil-whisper-large-v2"

def _hf_repo_cache_root(cache_dir: str, repo_id: str) -> str:
    # HF cache convention: models--ORG--REPO
    return os.path.join(cache_dir, "models--" + repo_id.replace("/", "--"))

def _find_ct2_dir_for_repo(cache_dir: str, repo_id: str) -> Optional[str]:
    """Return the CT2 folder cached for ``repo_id`` under ``cache_dir``, or None.

    The search starts at the repo's cache root and descends into it; the model
    usually sits under ``.../snapshots/<rev>/...``.
    """
    repo_root = _hf_repo_cache_root(cache_dir, repo_id)
    return _find_ct2_dir(repo_root) if os.path.isdir(repo_root) else None

# ================== Drive ==================
def mount_drive() -> None:
    """Mount Google Drive at DRIVE_MOUNT and create the working folders.

    Creates the Drive input/output folders, the model cache and the local
    input folder if they do not exist yet.
    """
    from google.colab import drive as colab_drive

    colab_drive.mount(DRIVE_MOUNT)
    required_dirs = [DRIVE_IN_DIR, DRIVE_OUT_DIR, MODEL_CACHE, LOCAL_IN_DIR]
    for folder in required_dirs:
        os.makedirs(folder, exist_ok=True)

# ================== Deps ==================
def pip_install() -> None:
    """Install/upgrade the runtime packages and drop the legacy Gemini SDK.

    The install step raises ``subprocess.CalledProcessError`` on failure; the
    uninstall step is best-effort (its exit code and output are ignored).
    """
    pip_cmd = [sys.executable, "-m", "pip"]
    wanted = ["faster-whisper>=1.0.0", "google-genai>=0.2.0"]
    subprocess.check_call(pip_cmd + ["install", "-q", "-U"] + wanted)

    # avoid import confusion with the old SDK
    subprocess.call(
        pip_cmd + ["uninstall", "-y", "google-generativeai"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

def ensure_model_cached() -> str:
    """Download FW_MODEL if missing and return the *repo-specific* CT2 folder.

    When FW_MODEL is a repo id and a valid CT2 folder for it already exists
    under MODEL_CACHE, that folder is returned without touching the network.
    Otherwise the model is downloaded into MODEL_CACHE.

    Returns:
        Path of the directory holding the CT2 model files.

    Raises:
        RuntimeError: if no valid CT2 layout is found after the download.
    """
    os.makedirs(MODEL_CACHE, exist_ok=True)

    if _is_repo_id(FW_MODEL):
        existing = _find_ct2_dir_for_repo(MODEL_CACHE, FW_MODEL)
        if existing:
            print(f"   Model cache → OK for {FW_MODEL} ({existing})")
            return existing

    print(f"   Model cache missing for {FW_MODEL} → downloading once")
    # Remember the exact prior state (None == unset). Defaulting to "1" here
    # would leave the process in offline mode after the download when the
    # variable was not set to begin with.
    prev = os.environ.get("HF_HUB_OFFLINE")
    os.environ["HF_HUB_OFFLINE"] = "0"
    try:
        local_dir = download_model(FW_MODEL, cache_dir=MODEL_CACHE, local_files_only=False)
    finally:
        if prev is None:
            os.environ.pop("HF_HUB_OFFLINE", None)
        else:
            os.environ["HF_HUB_OFFLINE"] = prev

    # Prefer the repo-specific cache folder; fall back to whatever path the
    # downloader reported.
    ct2_dir = _find_ct2_dir_for_repo(MODEL_CACHE, FW_MODEL) or _find_ct2_dir(local_dir)
    if not ct2_dir:
        print("   DEBUG tree:")
        _print_tree(local_dir)
        raise RuntimeError(f"Incomplete CT2 under {local_dir} for FW_MODEL='{FW_MODEL}'")

    print(f"   Model cache ready → {ct2_dir}")
    return ct2_dir

# ================== Audio I/O ==================
def pick_latest_from_drive() -> Optional[str]:
    """Return the most recently modified supported audio file in DRIVE_IN_DIR.

    Returns None when the folder is missing or holds no file whose extension
    (case-insensitive) is in ALLOWED_EXTS.
    """
    if not os.path.isdir(DRIVE_IN_DIR):
        return None

    audio_names = [
        entry for entry in os.listdir(DRIVE_IN_DIR)
        if os.path.splitext(entry)[1].lower() in ALLOWED_EXTS
    ]
    audio_paths = [
        full for full in (os.path.join(DRIVE_IN_DIR, entry) for entry in audio_names)
        if os.path.isfile(full)
    ]
    if not audio_paths:
        return None
    # max() keeps the first of several equally recent files, like a stable
    # descending sort would.
    return max(audio_paths, key=os.path.getmtime)

def prepare_audio() -> tuple[str, str, str]:
    """Select the newest input audio on Drive and copy it to local storage.

    Returns:
        ``(drive_path, base_name, local_path)`` where ``base_name`` is the file
        name without extension and ``local_path`` is the copy in LOCAL_IN_DIR.

    Raises:
        FileNotFoundError: no supported audio file is available in DRIVE_IN_DIR.
        ValueError: the selected file has an unsupported extension.
    """
    drive_src = pick_latest_from_drive()
    if not drive_src:
        # List what is actually in the folder so the user can see why nothing matched.
        leftovers = []
        if os.path.isdir(DRIVE_IN_DIR):
            leftovers = [
                entry for entry in os.listdir(DRIVE_IN_DIR)
                if os.path.isfile(os.path.join(DRIVE_IN_DIR, entry))
            ]
        raise FileNotFoundError(
            f"Nessun file valido in {DRIVE_IN_DIR}. Metti .wav/.mp3/.flac e riesegui. "
            f"Presenti: {leftovers or '—'}"
        )

    stem, suffix = os.path.splitext(os.path.basename(drive_src))
    suffix = suffix.lower()
    if suffix not in ALLOWED_EXTS:
        raise ValueError(f"Estensione non supportata: {suffix}")

    local_copy = os.path.join(LOCAL_IN_DIR, f"{stem}{suffix}")
    # Drop any stale copy from a previous run before copying.
    if os.path.exists(local_copy):
        os.remove(local_copy)
    shutil.copyfile(drive_src, local_copy)
    return drive_src, stem, local_copy

def run_dir_for(base_name: str) -> str:
    """Return the output folder ``DRIVE_OUT_DIR/<base_name>``, creating it if needed."""
    target = os.path.join(DRIVE_OUT_DIR, base_name)
    os.makedirs(target, exist_ok=True)
    return target

def detect_device() -> str:
    """Return "cuda" when torch reports an available CUDA device, else "cpu".

    Any failure while importing or querying torch is treated as "no GPU".
    """
    device = "cpu"
    try:
        import torch

        if torch.cuda.is_available():
            device = "cuda"
    except Exception:
        pass
    return device

# ================== Transcribe ==================
def transcribe(input_audio_path: str, out_txt_path: str) -> dict:
    """Transcribe an audio file and write the cleaned transcript to a text file.

    The transcript is assembled in memory first and only written once the
    whole file has been decoded and found non-empty, so a failure part-way
    through (or a silent recording) does not truncate an existing transcript.

    Args:
        input_audio_path: Audio file to transcribe.
        out_txt_path: Destination of the UTF-8 transcript (single line + newline).

    Returns:
        Dict with ``language``, ``duration_min``, ``device`` and ``compute``.

    Raises:
        RuntimeError: if the transcript is empty (no speech detected).
    """
    device = detect_device()
    compute_type = "int8_float16" if device == "cuda" else "int8"

    local_model_dir = ensure_model_cached()

    model = WhisperModel(
        local_model_dir,              # use resolved CT2 path (not a model name)
        device=device,
        compute_type=compute_type,
        download_root=MODEL_CACHE,
        # os.cpu_count() may return None; fall back to 0 instead of passing None.
        cpu_threads=os.cpu_count() or 0,
    )

    segments_iter, info = model.transcribe(
        input_audio_path,
        language=FORCE_LANG,
        vad_filter=USE_VAD,
        vad_parameters={"min_silence_duration_ms": 300},
        beam_size=BEAM_SIZE,
        chunk_length=CHUNK_SECONDS,
        condition_on_previous_text=False,
        word_timestamps=WORD_TS,
        no_speech_threshold=0.6,
        compression_ratio_threshold=2.6,
        log_prob_threshold=-1.0
    )

    # Consume the segment iterator before opening the output file.
    fragments = []
    for seg in segments_iter:
        piece = (seg.text or "").strip()
        if piece:
            fragments.append(piece)

    text = " ".join(fragments)
    text = re.sub(r"\s+", " ", text).strip()
    # Remove the space that joining leaves in front of sentence punctuation.
    text = re.sub(r"\s+([.!?…])", r"\1", text)

    if not text:
        raise RuntimeError("Empty transcript: no speech detected.")

    with open(out_txt_path, "w", encoding="utf-8") as tf:
        tf.write(text + "\n")

    return {
        "language": getattr(info, "language", FORCE_LANG),
        "duration_min": round(getattr(info, "duration", 0)/60, 1),
        "device": device,
        "compute": compute_type,
    }

# ================== Gemini ==================
def _get_gemini_api_key_any() -> str:
    key = os.environ.get("GEMINI_API_KEY", "")
    if key:
        return key
    try:
        from google.colab import userdata
        key = userdata.get("GOOGLE_API_KEY")
        if key:
            os.environ["GEMINI_API_KEY"] = key
            return key
    except Exception:
        pass
    raise RuntimeError("API key mancante. Imposta GEMINI_API_KEY o Colab Secret GOOGLE_API_KEY.")

def _smart_chunks(txt: str, max_chars: int = MAX_MD_CHARS) -> list[str]:
    """Split ``txt`` into consecutive chunks of at most ``max_chars`` characters.

    Text is split at blank-line paragraph breaks first. A paragraph that is
    itself longer than ``max_chars`` (for example a transcript stored as one
    long line) is further cut at sentence or word boundaries instead of being
    passed through whole. Concatenating the returned chunks reproduces ``txt``.

    Args:
        txt: Text to split.
        max_chars: Maximum chunk length; values below 1 disable the
            oversized-paragraph cutting.

    Returns:
        List of chunks (a single-element list when ``txt`` already fits).
    """
    if len(txt) <= max_chars:
        return [txt]
    parts, buf, size = [], [], 0
    # The capturing group keeps the separators so no characters are lost.
    for para in re.split(r"(\n{2,})", txt):
        for piece in _split_oversized(para, max_chars):
            if size + len(piece) > max_chars and buf:
                parts.append("".join(buf))
                buf, size = [], 0
            buf.append(piece)
            size += len(piece)
    if buf:
        parts.append("".join(buf))
    return parts


def _split_oversized(text: str, max_chars: int) -> list[str]:
    """Cut ``text`` into consecutive pieces no longer than ``max_chars``."""
    if max_chars < 1 or len(text) <= max_chars:
        return [text]
    pieces, start, total = [], 0, len(text)
    while total - start > max_chars:
        window_end = start + max_chars
        # Prefer a sentence end in the second half of the window so pieces stay
        # reasonably full; the punctuation and its space stay with the left piece.
        cut = max(
            text.rfind(mark, start + max_chars // 2, window_end)
            for mark in (". ", "! ", "? ", "… ")
        )
        if cut != -1:
            cut += 2
        else:
            # Otherwise break after the last space, or hard-cut if there is none.
            cut = text.rfind(" ", start, window_end)
            cut = cut + 1 if cut != -1 else window_end
        pieces.append(text[start:cut])
        start = cut
    pieces.append(text[start:])
    return pieces

def _lesson_system_instruction(lang_hint: str | None) -> str:
    return f"""You are an accurate note-taker. Convert the transcript into clean LESSON NOTES in Markdown.
- Keep the original language; if unclear use '{lang_hint or "auto"}'.
- No timestamps. Do not invent content. Keep technical terms verbatim.
- Prefer bullet points and short lines.
Structure:
# Titolo sintetico
## Obiettivi della lezione
## Outline (sequenza degli argomenti)
## Concetti chiave
## Esempi/Case
## Termini e definizioni (formato **Termine:** definizione)
## Citazioni utili (tra virgolette)
## Domande aperte
## Action items / Compiti
Output: valid Markdown only.
""".strip()

def _stream_text(chunks: Iterable) -> str:
    out = []
    for ch in chunks:
        t = getattr(ch, "text", None)
        if t:
            out.append(t)
    return "".join(out)

def generate_notes_with_genai(raw_text: str,
                              lang_hint: str | None = None,
                              enable_search: bool | None = None) -> str:
    """Turn a transcript into Markdown lesson notes with Gemini.

    The transcript is split with ``_smart_chunks``; one streaming request is
    sent per chunk and the per-chunk notes are joined with blank lines.

    Args:
        raw_text: Transcript text.
        lang_hint: Fallback language mentioned in the system instruction.
        enable_search: Attach the URL-context and Google-Search tools. ``None``
            means "use ENABLE_SEARCH_DEFAULT".

    Returns:
        The merged Markdown, terminated by a single newline.

    Raises:
        RuntimeError: if the transcript is blank, the API key is missing, or
            Gemini returns empty text for a chunk.
    """
    # Fail early instead of asking the model to summarize nothing.
    if not raw_text or not raw_text.strip():
        raise RuntimeError("Empty transcript: nothing to summarize.")

    client = genai.Client(api_key=_get_gemini_api_key_any())
    system_instruction = [types.Part.from_text(text=_lesson_system_instruction(lang_hint))]
    chunks = _smart_chunks(raw_text, max_chars=MAX_MD_CHARS)
    if enable_search is None:
        enable_search = ENABLE_SEARCH_DEFAULT

    # Both configs are identical for every chunk, so build them once.
    base_config = types.GenerateContentConfig(
        system_instruction=system_instruction,
        temperature=0.2,
        top_p=0.95,
        max_output_tokens=8192,
        thinking_config=types.ThinkingConfig(thinking_budget=1200),
    )
    cfg = base_config
    if enable_search:
        try:
            cfg = types.GenerateContentConfig(
                system_instruction=system_instruction,
                temperature=base_config.temperature,
                top_p=base_config.top_p,
                max_output_tokens=base_config.max_output_tokens,
                thinking_config=base_config.thinking_config,
                tools=[
                    types.Tool(url_context=types.UrlContext()),
                    types.Tool(googleSearch=types.GoogleSearch()),
                ],
            )
        except Exception as exc:
            # Tools are optional: continue without them rather than abort.
            print(f"   Warning: could not enable search tools ({exc.__class__.__name__}: {exc})")
            cfg = base_config

    md_parts = []
    for i, ch in enumerate(chunks, 1):
        contents = [types.Content(
            role="user",
            parts=[types.Part.from_text(
                text=f"Part {i}/{len(chunks)}. Apply the system instruction and produce LESSON NOTES.\n\n{ch}"
            )],
        )]
        try:
            stream = client.models.generate_content_stream(
                model=GEMINI_MODEL, contents=contents, config=cfg
            )
            md = _stream_text(stream).strip()
        except Exception as exc:
            # One retry with the plain (tool-less) config; report the first
            # failure instead of swallowing it silently.
            print(f"   Warning: Gemini request failed ({exc.__class__.__name__}: {exc}); retrying without tools")
            stream = client.models.generate_content_stream(
                model=GEMINI_MODEL, contents=contents, config=base_config
            )
            md = _stream_text(stream).strip()

        if not md:
            raise RuntimeError("Gemini returned empty text.")
        md_parts.append(md)

    # Every part is stripped and non-empty, so the merged text never ends with
    # a newline on its own; add exactly one.
    merged = ("\n\n".join(md_parts)).strip()
    return merged + "\n"

# ================== State / Resume (per-run subfolder) ==================
def run_dir_for(base_name: str) -> str:
    """Return the per-run output folder ``DRIVE_OUT_DIR/<base_name>``, creating it if needed.

    NOTE(review): this repeats, line for line, the ``run_dir_for`` defined earlier
    in this cell (Audio I/O section). Being the later definition, this is the one
    bound when the main flow runs. Consider deleting one of the two.
    """
    rd = os.path.join(DRIVE_OUT_DIR, base_name)
    os.makedirs(rd, exist_ok=True)
    return rd

def _state_path(base_name: str) -> str:
    """Return the path of ``state.json`` inside the run folder of ``base_name``.

    The run folder is created as a side effect of resolving it.
    """
    run_folder = run_dir_for(base_name)
    return os.path.join(run_folder, "state.json")

def save_state(base_name: str, status: str, meta: dict, out_txt: str, out_md: str, src_audio_drive_path: str | None = None):
    """Write the run's ``state.json`` (overwriting any previous one).

    Args:
        base_name: Run identifier; also selects the run folder.
        status: Pipeline stage, e.g. TRANSCRIBED_READY_FOR_NOTES or DONE.
        meta: Transcription metadata stored verbatim.
        out_txt: Path of the transcript file.
        out_md: Path of the notes file.
        src_audio_drive_path: Original audio location on Drive, if known.
    """
    snapshot = dict(
        base_name=base_name,
        status=status,  # TRANSCRIBED_READY_FOR_NOTES | DONE
        out_txt=out_txt,
        out_md=out_md,
        meta=meta,
        src_audio_drive_path=src_audio_drive_path,
        ts=time.time(),
    )
    target = _state_path(base_name)
    with open(target, "w", encoding="utf-8") as handle:
        handle.write(json.dumps(snapshot, ensure_ascii=False, indent=2))

def load_pending_state() -> Optional[dict]:
    """Return the newest saved state still waiting for the notes step, or None.

    Scans every ``<DRIVE_OUT_DIR>/<name>/state.json`` and keeps those whose
    status is TRANSCRIBED_READY_FOR_NOTES; the one with the most recent file
    modification time wins. Unreadable or malformed state files are skipped.
    """
    if not os.path.isdir(DRIVE_OUT_DIR):
        return None

    newest_mtime = None
    newest_state = None
    for entry in os.listdir(DRIVE_OUT_DIR):
        candidate = os.path.join(DRIVE_OUT_DIR, entry, "state.json")
        if not os.path.isfile(candidate):
            continue
        try:
            with open(candidate, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
            if loaded.get("status") != "TRANSCRIBED_READY_FOR_NOTES":
                continue
            mtime = os.path.getmtime(candidate)
        except Exception:
            continue
        # Strict comparison keeps the first of several equally recent states.
        if newest_mtime is None or mtime > newest_mtime:
            newest_mtime, newest_state = mtime, loaded
    return newest_state

# ================== Auto-compression (asr_in) ==================
def auto_compress_audio(src_drive_path: str) -> Optional[str]:
    """Compress the source audio next to itself with ffmpeg and delete the original.

    WAV is converted to FLAC, MP3 to Opus at 64 kbps. FLAC files and files with
    any other extension are left untouched. The original is removed only after
    ffmpeg succeeded and the output file exists and is non-empty.

    Args:
        src_drive_path: Path of the audio file to compress.

    Returns:
        Path of the compressed file, the unchanged input path when nothing was
        done, or None when the input path is empty or not a file.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        OSError: ffmpeg could not be started.
        RuntimeError: ffmpeg reported success but produced no usable output.
    """
    if not (src_drive_path and os.path.isfile(src_drive_path)):
        return None

    in_dir = os.path.dirname(src_drive_path)
    base = os.path.basename(src_drive_path)
    name, ext = os.path.splitext(base)
    ext = ext.lower()

    if ext == ".wav":
        dst_path = os.path.join(in_dir, f"{name}.flac")
        print("   Compressing WAV → FLAC (lossless)")
        cmd = ["ffmpeg", "-y", "-i", src_drive_path, "-compression_level", "5", dst_path]
    elif ext == ".mp3":
        dst_path = os.path.join(in_dir, f"{name}.opus")
        print("   Compressing MP3 → Opus (64 kbps)")
        cmd = ["ffmpeg", "-y", "-i", src_drive_path, "-b:a", "64k", dst_path]
    elif ext == ".flac":
        print("   Source already FLAC — no compression needed.")
        return src_drive_path
    else:
        print(f"   Unsupported extension for compression: {ext}")
        return src_drive_path

    dst_preexisting = os.path.exists(dst_path)
    try:
        subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
    except (subprocess.CalledProcessError, OSError):
        # Do not leave a partial output behind; a file that was already there
        # before this call is not ours to delete.
        if not dst_preexisting and os.path.exists(dst_path):
            try:
                os.remove(dst_path)
            except OSError:
                pass
        raise

    # Never delete the original unless the replacement really exists.
    if not (os.path.isfile(dst_path) and os.path.getsize(dst_path) > 0):
        raise RuntimeError(f"ffmpeg produced no output at {dst_path}; original kept.")

    try:
        os.remove(src_drive_path)
        print(f"   Deleted original → {base}")
    except Exception as e:
        print(f"   Warning: could not delete original ({e.__class__.__name__})")

    return dst_path

# ================== MAIN ==================
# End-to-end run: mount Drive, install packages, cache the model, transcribe
# (or resume from a saved transcript), generate notes, compress the source
# audio and write meta.json. t0 is the reference for the total-time printouts.
t0 = time.perf_counter()
print("0/7 Mounting Drive…"); mount_drive()
print(f"   IN:  {DRIVE_IN_DIR}\n   OUT: {DRIVE_OUT_DIR}\n   CACHE: {MODEL_CACHE}")

# NOTE(review): the faster_whisper / google.genai imports at the top of this cell
# have already executed by the time pip_install() runs.
print("1/7 Install packages…"); pip_install()
# The model is resolved here even when the run below resumes and skips transcription.
print("2/7 Cache Whisper model…"); ensure_model_cached()

# A saved state with status TRANSCRIBED_READY_FOR_NOTES takes priority over
# picking a new input file.
resume = load_pending_state()
if resume:
    print("3/7 Resume found → skip transcription.")
    base_name = resume["base_name"]
    run_dir   = run_dir_for(base_name)
    out_txt   = resume["out_txt"]
    out_md    = resume["out_md"]
    # No local audio copy is made when resuming.
    local_audio = None
    meta = resume.get("meta", {})
    src_path = resume.get("src_audio_drive_path")
else:
    print("3/7 Preparing input…")
    src_path, base_name, local_audio = prepare_audio()
    run_dir = run_dir_for(base_name)
    out_txt = os.path.join(run_dir, f"{base_name}.txt")
    out_md  = os.path.join(run_dir, f"{base_name}_lezione.md")
    print(f"   File: {src_path}")

    print("4/7 Transcription…")
    meta = transcribe(local_audio, out_txt)
    print(f"   OK. language={meta['language']} duration≈{meta['duration_min']} min device={meta['device']} compute={meta['compute']}")
    # Checkpoint: from here on a failed notes step can be retried without re-transcribing.
    save_state(base_name, "TRANSCRIBED_READY_FOR_NOTES", meta, out_txt, out_md, src_audio_drive_path=src_path)

if resume:
    print("4/7 Transcription… [SKIP]")

print("5/7 Gemini Lesson Notes…")
try:
    with open(out_txt, "r", encoding="utf-8") as f:
        raw = f.read().strip()
    # Web tools are explicitly disabled here, regardless of ENABLE_SEARCH_DEFAULT.
    notes_md = generate_notes_with_genai(raw_text=raw, lang_hint=(FORCE_LANG or "auto"), enable_search=False)
    with open(out_md, "w", encoding="utf-8") as f:
        f.write(notes_md)
    save_state(base_name, "DONE", meta, out_txt, out_md, src_audio_drive_path=src_path)
    print(f"   Notes → {out_md}")
except Exception as e:
    # The state file keeps its previous status, so the next run resumes at the notes step.
    print("   ERROR in Notes step — state saved. You can retry from the .txt")
    print("   Details:", repr(e))
    print("7/7 Partial. Total time:", round(time.perf_counter()-t0, 1), "s")
    # Stops the cell here with exit status 0 (raises SystemExit).
    sys.exit(0)

print("6/7 Auto compression & cleanup in asr_in…")
try:
    if src_path:
        new_audio_path = auto_compress_audio(src_path)
        if new_audio_path:
            # Recorded in meta.json only; state.json was already written above.
            meta["compressed_audio"] = new_audio_path
except Exception as e:
    # Compression is best-effort: a failure is reported but does not abort the run.
    print(f"   Compression skipped ({e.__class__.__name__}): {e}")

# Persist meta.json to the run subfolder
try:
    with open(os.path.join(run_dir, "meta.json"), "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)
    print(f"   Metadata → {os.path.join(run_dir, 'meta.json')}")
except Exception as e:
    print(f"   Warning: could not write meta.json ({e.__class__.__name__})")

print("7/7 Done. Total time:", round(time.perf_counter()-t0, 1), "s")

0/7 Mounting Drive…
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
   IN:  /content/drive/MyDrive/asr_in
   OUT: /content/drive/MyDrive/asr_out
   CACHE: /content/drive/MyDrive/.ctranslate2-cache
1/7 Install packages…
2/7 Cache Whisper model…
   Model cache → OK for Systran/faster-distil-whisper-large-v2 (/content/drive/MyDrive/.ctranslate2-cache/models--Systran--faster-distil-whisper-large-v2/snapshots/fe9b404fc56de3f7c38606ef9ba6fd83526d05e4)
3/7 Preparing input…
   File: /content/drive/MyDrive/asr_in/audiostorto.mp3
4/7 Transcription…
   Model cache → OK for Systran/faster-distil-whisper-large-v2 (/content/drive/MyDrive/.ctranslate2-cache/models--Systran--faster-distil-whisper-large-v2/snapshots/fe9b404fc56de3f7c38606ef9ba6fd83526d05e4)
   OK. language=it duration≈28.5 min device=cuda compute=int8_float16
5/7 Gemini Lesson Notes…
   Notes → /content/drive/MyDrive/asr_out/audiostorto/audiostorto_lezione