In [1]:
!pip install -q ffmpeg-python requests
!apt-get -y install ffmpeg




ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 123 not upgraded.


In [2]:
from pathlib import Path
import os, time, json
import ffmpeg
import requests
from kaggle_secrets import UserSecretsClient

In [3]:
# Config
# INPUT_PATH must point to a local .mp4 file or to a local folder containing
# .mp4 files. "URL" is only a placeholder: replace it before running the
# pipeline, otherwise the input lookup fails with FileNotFoundError.
INPUT_PATH = Path("URL")
# or: INPUT_PATH = Path("/kaggle/input/my-folder-with-mp4s")

# Folder that receives the extracted WAV files, the diarized TXT transcripts
# and the raw JSON responses. Created on the spot if it does not exist yet.
OUT_DIR = Path("/kaggle/working/transcricoes")
OUT_DIR.mkdir(parents=True, exist_ok=True)

LANGUAGE_CODE = "pt"  # optional. Set None to send no language_code (intended to let AssemblyAI auto-detect).
ENABLE_SPEAKER_LABELS = True  # forwarded as "speaker_labels" in the transcript request

# IMPORTANT: store your key in Kaggle Secrets as ASSEMBLYAI_API_KEY
user_secrets = UserSecretsClient()
ASSEMBLYAI_API_KEY = user_secrets.get_secret("ASSEMBLYAI_API_KEY")

# Fail fast on an empty key so that no API request is attempted without credentials.
if not ASSEMBLYAI_API_KEY:
    raise RuntimeError("Missing ASSEMBLYAI_API_KEY. Add it in Kaggle -> Add-ons -> Secrets.")

In [4]:
# Root of the AssemblyAI v2 REST API; the upload, submit and poll helpers
# append their endpoint paths ("/upload", "/transcript", ...) to it.
BASE_URL = "https://api.assemblyai.com/v2"

In [5]:
def resolve_video_list(input_path: Path):
    """Resolve the configured input into a list of .mp4 files to process.

    Args:
        input_path: Path (or path string) of a single .mp4 file, or of a
            directory whose direct children are scanned for .mp4 files.

    Returns:
        A list of ``Path`` objects: the file itself for a file input, or the
        sorted .mp4 files found directly inside a directory input.

    Raises:
        ValueError: ``input_path`` is a file whose extension is not .mp4.
        FileNotFoundError: ``input_path`` does not exist, or it is a directory
            that contains no .mp4 file.
    """
    # Accept plain strings as well as Path objects.
    input_path = Path(input_path)

    if input_path.is_file():
        if input_path.suffix.lower() != ".mp4":
            raise ValueError(f"Not an .mp4 file: {input_path}")
        return [input_path]

    if input_path.is_dir():
        # Compare the extension case-insensitively, exactly like the
        # single-file branch, so "CLIP.MP4" is not silently skipped. Also
        # ignore directories that merely happen to be named "*.mp4".
        videos = sorted(
            candidate
            for candidate in input_path.iterdir()
            if candidate.is_file() and candidate.suffix.lower() == ".mp4"
        )
        if not videos:
            raise FileNotFoundError(f"No .mp4 found in: {input_path}")
        return videos

    raise FileNotFoundError(f"INPUT_PATH does not exist: {input_path}")

In [6]:
def extract_audio(video_path: Path, wav_path: Path):
    """Extract the audio track of ``video_path`` into a WAV file at ``wav_path``.

    The audio goes through ffmpeg's ``dynaudnorm`` filter and is written as
    16-bit PCM, one channel, 16000 Hz (a good default for speech recognition).
    An existing file at ``wav_path`` is overwritten.
    """
    source = ffmpeg.input(str(video_path))
    normalized_audio = source.audio.filter("dynaudnorm")

    encode_options = {
        "format": "wav",
        "acodec": "pcm_s16le",
        "ac": 1,
        "ar": 16000,
    }
    job = ffmpeg.output(normalized_audio, str(wav_path), **encode_options)
    job = job.overwrite_output()
    # quiet=True keeps ffmpeg's console output out of the notebook.
    job.run(quiet=True)

In [7]:
def aai_upload_file(file_path: Path, timeout_s: float = 300.0) -> str:
    """Upload a local file to AssemblyAI and return its ``upload_url``.

    Args:
        file_path: Local file whose raw bytes are streamed as the request body.
        timeout_s: Seconds to wait on a stalled socket before giving up.
            Connecting is limited to 10 seconds.

    Returns:
        The ``upload_url`` field of the JSON response.

    Raises:
        requests.HTTPError: The API answered with an error status.
        requests.Timeout: The server did not respond within the timeout.
    """
    headers = {
        "authorization": ASSEMBLYAI_API_KEY,
        "content-type": "application/octet-stream",
    }

    # Passing the open file object lets requests stream it instead of loading
    # the whole file into memory. Without a timeout a stalled connection
    # would block the notebook forever.
    with open(file_path, "rb") as f:
        r = requests.post(
            f"{BASE_URL}/upload",
            headers=headers,
            data=f,
            timeout=(10, timeout_s),
        )
    r.raise_for_status()

    return r.json()["upload_url"]

In [8]:
def aai_submit_transcript(audio_url: str, timeout_s: float = 30.0) -> str:
    """Submit a transcription job to AssemblyAI and return the transcript id.

    Args:
        audio_url: URL of the audio to transcribe (e.g. an ``upload_url``).
        timeout_s: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        requests.HTTPError: The API answered with an error status.
        requests.Timeout: The server did not respond within ``timeout_s``.
    """
    headers = {"authorization": ASSEMBLYAI_API_KEY, "content-type": "application/json"}
    payload = {
        "audio_url": audio_url,
        "speaker_labels": bool(ENABLE_SPEAKER_LABELS),
        # Optional quality-of-life settings:
        "punctuate": True,
        "format_text": True,
    }

    if LANGUAGE_CODE:
        payload["language_code"] = LANGUAGE_CODE
    else:
        # No language configured: explicitly request automatic language
        # detection. Merely omitting language_code does not turn it on.
        payload["language_detection"] = True

    # A timeout keeps a stalled connection from blocking the notebook forever.
    r = requests.post(
        f"{BASE_URL}/transcript",
        headers=headers,
        json=payload,
        timeout=timeout_s,
    )
    r.raise_for_status()

    return r.json()["id"]

In [9]:
def aai_poll_transcript(
    transcript_id: str,
    poll_interval_s: int = 5,
    timeout_s: int = 3600,
    request_timeout_s: float = 30.0,
) -> dict:
    """Poll a transcript until it finishes and return the final transcript JSON.

    Args:
        transcript_id: Id returned when the transcription job was submitted.
        poll_interval_s: Seconds to wait between two status requests.
        timeout_s: Overall time budget, in seconds, for the whole polling loop.
        request_timeout_s: Seconds to wait for each individual HTTP request.

    Returns:
        The decoded JSON of the first response whose status is "completed".

    Raises:
        RuntimeError: The transcript status is "error".
        TimeoutError: The job did not complete within ``timeout_s`` seconds.
        requests.HTTPError: A status request was answered with an error status.
    """
    headers = {"authorization": ASSEMBLYAI_API_KEY}
    # A monotonic clock is immune to wall-clock adjustments during long waits.
    deadline = time.monotonic() + timeout_s

    while True:
        r = requests.get(
            f"{BASE_URL}/transcript/{transcript_id}",
            headers=headers,
            timeout=request_timeout_s,
        )
        r.raise_for_status()
        data = r.json()
        status = data.get("status")

        if status == "completed":
            return data
        if status == "error":
            raise RuntimeError(f"AssemblyAI error: {data.get('error')}")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Transcription timed out.")
        # Never sleep past the deadline; the last poll happens right at it.
        time.sleep(min(poll_interval_s, remaining))

In [10]:
def ms_to_hhmmss(ms: int) -> str:
    """Format a millisecond offset as ``HH:MM:SS``, floored to whole seconds.

    Each field is zero-padded to at least two digits; the hours field grows
    as needed and is not wrapped at 24.
    """
    total_seconds = int(ms // 1000)
    hours, leftover = divmod(total_seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

In [11]:
def save_outputs(transcript_json: dict, out_txt: Path, out_json: Path | None = None):
    # Save diarized transcript as TXT + optional raw JSON.
    utterances = transcript_json.get("utterances") or []
    lines = []

    # When speaker_labels=True, AssemblyAI returns 'utterances' with speaker + start/end (ms) + text.
    for u in utterances:
        spk = u.get("speaker", "UNKNOWN")
        start = ms_to_hhmmss(u.get("start", 0))
        end = ms_to_hhmmss(u.get("end", 0))
        text = (u.get("text") or "").strip()
        if text:
            lines.append(f"[{start} - {end}] SPEAKER_{spk}: {text}")

    # Fallback: if no utterances (speaker labels off), save plain text.
    if not lines and transcript_json.get("text"):
        lines = [transcript_json["text"]]

    out_txt.write_text("\n".join(lines), encoding="utf-8")

    if out_json:
        out_json.write_text(json.dumps(transcript_json, ensure_ascii=False, indent=2), encoding="utf-8")


In [12]:
# Pipeline: for every input video, extract the audio, upload it, run the
# transcription job, wait for the result, and save the TXT + JSON outputs.

video_files = resolve_video_list(INPUT_PATH)
print("Videos found:", len(video_files))

for video_path in video_files:
    # The marker was stored mis-encoded ("â–¶"); this is the intended "▶".
    print("\n▶ Processing:", video_path.name)

    # All artifacts of one video share its stem inside OUT_DIR.
    wav_path = OUT_DIR / f"{video_path.stem}.wav"
    out_txt = OUT_DIR / f"{video_path.stem}_assemblyai_diarized.txt"
    out_json = OUT_DIR / f"{video_path.stem}_assemblyai_raw.json"

    # Step 1: extract the audio track to a WAV file and make sure it exists.
    extract_audio(video_path, wav_path)
    if not wav_path.exists():
        raise FileNotFoundError(f"WAV not created: {wav_path}")

    # Step 2: upload the WAV; the returned URL is what the job references.
    upload_url = aai_upload_file(wav_path)
    print("  Uploaded.")

    # Step 3: start the transcription job.
    transcript_id = aai_submit_transcript(upload_url)
    print("  Transcript job id:", transcript_id)

    # Step 4: block until the job finishes (up to 2 hours per video).
    result = aai_poll_transcript(transcript_id, poll_interval_s=5, timeout_s=7200)
    print("  Status:", result.get("status"))

    # Step 5: write the diarized text and the raw JSON response.
    save_outputs(result, out_txt=out_txt, out_json=out_json)
    print("Saved:", out_txt)

print("\nDone. Outputs in:", OUT_DIR)


FileNotFoundError: INPUT_PATH does not exist: URL