# 🎙️ Japanese Audio → SRT Subtitles (JP + EN)

This notebook:
1. **Transcribes** uploaded Japanese audio using [Qwen/Qwen3-ASR-1.7B](https://huggingface.co/Qwen/Qwen3-ASR-1.7B) with word-level timestamps via the ForcedAligner
2. **Generates** an SRT subtitle file from the transcription
3. **Translates** the Japanese SRT to English using [Helsinki-NLP/opus-mt-ja-en](https://huggingface.co/Helsinki-NLP/opus-mt-ja-en)

**Requirements:** A Colab runtime with a **T4 GPU** (free tier works).

> ⚠️ Make sure you've selected **Runtime → Change runtime type → T4 GPU** before running.

## 1 · Install Dependencies

In [None]:
!pip install -q qwen-asr transformers sentencepiece sacremoses

In [None]:
!pip install -U flash-attn --no-build-isolation

## 2 · Upload Japanese Audio File

Supported formats: `.wav`, `.mp3`, `.flac`, `.ogg`, `.m4a`, etc.

In [None]:
import ipywidgets as widgets
from IPython.display import display, Audio, HTML
import os

# Path of the most recently uploaded file; filled in by the upload callback.
AUDIO_PATH = None

# Status line shown under the button; the callback rewrites its HTML.
status = widgets.HTML(value="<i>No file selected.</i>")

# Single-file picker restricted to common audio extensions.
uploader = widgets.FileUpload(
    description="Select audio",
    button_style="primary",
    accept=".wav,.mp3,.flac,.ogg,.m4a,.aac,.wma,.opus",
    multiple=False,
    layout=widgets.Layout(width="300px"),
)


def on_upload(change):
    """Save the uploaded audio file under /content and remember its path.

    Intended to be registered as an observer on the upload widget's ``value``
    trait. Both payload shapes used by ipywidgets are accepted:

    * ipywidgets 8.x: a tuple of dicts, each holding ``"name"`` and ``"content"``;
    * ipywidgets 7.x: a dict mapping file name to a dict that holds ``"content"``.

    Args:
        change: traitlets change dict; only ``change["new"]`` is read.

    Side effects:
        Writes the file to disk, sets the module-level ``AUDIO_PATH`` and
        updates the ``status`` widget. Does nothing when the payload is empty.
    """
    global AUDIO_PATH
    uploaded = change["new"]
    if not uploaded:
        return

    if isinstance(uploaded, dict):
        # ipywidgets 7.x: {filename: {"metadata": ..., "content": ...}}
        name, file_info = next(iter(uploaded.items()))
    else:
        # ipywidgets 8.x: ({"name": ..., "content": ...},)
        file_info = uploaded[0]
        name = file_info["name"]
    content = file_info["content"]

    # The name comes from the browser; keep only its final component so the
    # file cannot be written outside the target directory.
    name = os.path.basename(name)
    AUDIO_PATH = os.path.join("/content", name)
    with open(AUDIO_PATH, "wb") as f:
        f.write(content)
    size_mb = len(content) / (1024 * 1024)
    status.value = f"‚úÖ <b>{name}</b> uploaded ({size_mb:.1f} MB)"


# Stack the picker above its status line, hook up the callback, then render.
upload_panel = widgets.VBox([uploader, status])
uploader.observe(on_upload, names="value")
display(upload_panel)

In [None]:
import os

# Preview the uploaded audio.
#
# AUDIO_PATH is only set by the upload callback, so it can still be unset (or
# undefined) when this cell runs. Fall back to a file already on disk in that
# case, but never overwrite the path of a successful upload.
FALLBACK_AUDIO_PATH = "ja_audio.mp3"

AUDIO_PATH = globals().get("AUDIO_PATH")
if not (AUDIO_PATH and os.path.exists(AUDIO_PATH)):
    if os.path.exists(FALLBACK_AUDIO_PATH):
        AUDIO_PATH = FALLBACK_AUDIO_PATH

if AUDIO_PATH and os.path.exists(AUDIO_PATH):
    display(Audio(AUDIO_PATH))
else:
    print("‚ö†Ô∏è  Please upload an audio file in the cell above first.")

## 3 · Transcribe with Qwen3-ASR-1.7B

To fit on a T4 (15 GB VRAM), we run ASR and alignment as **two separate steps** so both models are never loaded at the same time.

In [None]:
# @title Configuration { display-mode: "form" }

# @markdown **Chunk length (seconds)** ‚Äî Each audio chunk is processed separately
# @markdown to fit in GPU memory. Shorter = less VRAM but more chunks.
# @markdown 20 s works on a free-tier T4 (15 GB). Increase if you have more VRAM.
# Multiplied by the sample rate in the audio-loading cell to get samples per chunk.
CHUNK_SEC = 20  # @param {type:"slider", min:5, max:120, step:5}

# @markdown ---
# @markdown **Gemini translation batch size** ‚Äî Number of subtitle lines sent
# @markdown per API call. Larger = fewer calls but longer prompts.
# Used as the default `batch_size` of translate_with_gemini. That default is
# captured when the function is defined, so re-run the defining cell after
# changing this value.
GEMINI_BATCH_SIZE = 100  # @param {type:"slider", min:10, max:500, step:10}

In [None]:
import gc
import os
import numpy as np
import torch
import librosa
from qwen_asr import Qwen3ASRModel

# Allocator hint: expandable segments let PyTorch reuse freed VRAM fragments.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

assert AUDIO_PATH and os.path.exists(AUDIO_PATH), (
    "No audio file found. Run the upload cell above first."
)

# --- Decode the file and cut it into fixed-length windows ---
# Attention cost in the audio tower grows quadratically with sequence length,
# so each short window is later fed to the model on its own.
SR = 16_000  # qwen-asr expects 16 kHz

print(f"Loading audio: {os.path.basename(AUDIO_PATH)} ‚Ä¶")
full_wav, _ = librosa.load(AUDIO_PATH, sr=SR, mono=True)
total_dur = len(full_wav) / SR
print(f"Duration: {total_dur:.1f} s  ({total_dur / 60:.1f} min)")

chunk_samples = CHUNK_SEC * SR
windows = (
    (begin, full_wav[begin : begin + chunk_samples])
    for begin in range(0, len(full_wav), chunk_samples)
)
# Each entry is (offset in seconds, samples); a tail shorter than 0.5 s is dropped.
audio_chunks = [
    (float(begin) / SR, window) for begin, window in windows if len(window) >= SR // 2
]

print(f"Split into {len(audio_chunks)} chunks of ‚â§{CHUNK_SEC} s each.")

# --- Load ASR model ---
import importlib.util

# FlashAttention-2 is documented as supporting Ampere (compute capability 8.x)
# or newer GPUs and needs the optional flash-attn package. A T4 reports 7.5, so
# only request it when both conditions hold; otherwise leave the attention
# implementation at the library default instead of failing at load time.
use_flash_attn = (
    torch.cuda.is_available()
    and torch.cuda.get_device_capability(0)[0] >= 8
    and importlib.util.find_spec("flash_attn") is not None
)

asr_kwargs = dict(
    dtype=torch.bfloat16,
    device_map="cuda:0",
    max_inference_batch_size=1,  # one chunk per forward pass
    max_new_tokens=4096,
)
if use_flash_attn:
    asr_kwargs["attn_implementation"] = "flash_attention_2"

print("\nLoading Qwen3-ASR-1.7B ‚Ä¶")
if not use_flash_attn:
    print("FlashAttention-2 not usable on this runtime; using the default attention.")
asr_model = Qwen3ASRModel.from_pretrained("Qwen/Qwen3-ASR-1.7B", **asr_kwargs)
print("‚úÖ ASR model loaded.")

In [None]:
# Run ASR one chunk at a time so only a single short window is on the GPU.
all_texts = []
n_chunks = len(audio_chunks)
for chunk_no, (offset, chunk_wav) in enumerate(audio_chunks, start=1):
    chunk_end = offset + len(chunk_wav) / SR
    print(
        f"  Chunk {chunk_no}/{n_chunks}  "
        f"[{offset:.1f}s ‚Äì {chunk_end:.1f}s] ‚Ä¶",
        end=" ",
    )
    asr_results = asr_model.transcribe(
        audio=(chunk_wav, SR),
        language="Japanese",
        return_time_stamps=False,
    )
    chunk_transcript = asr_results[0].text.strip()
    all_texts.append(chunk_transcript)
    # Echo only the first 80 characters to keep the log compact.
    print(chunk_transcript[:80])

# Chunk texts are concatenated without a separator.
transcribed_text = "".join(all_texts)
print(f"\n{'‚îÄ' * 60}")
print(f"Full transcription ({len(transcribed_text)} chars):\n{transcribed_text}")

# Drop the ASR model and release cached VRAM before the aligner is loaded.
del asr_model
gc.collect()
torch.cuda.empty_cache()
print("\n‚úÖ ASR model unloaded ‚Äî GPU memory freed.")

In [None]:
# --- Step 2: Forced Aligner for word-level timestamps ---
from dataclasses import replace
from qwen_asr import Qwen3ForcedAligner

import importlib.util

print("Loading Qwen3-ForcedAligner-0.6B ‚Ä¶")
# FlashAttention-2 is documented as supporting Ampere (compute capability 8.x)
# or newer GPUs and needs the optional flash-attn package. A T4 reports 7.5, so
# only request it when both conditions hold; otherwise leave the attention
# implementation at the library default instead of failing at load time.
aligner_kwargs = dict(dtype=torch.bfloat16, device_map="cuda:0")
if (
    torch.cuda.is_available()
    and torch.cuda.get_device_capability(0)[0] >= 8
    and importlib.util.find_spec("flash_attn") is not None
):
    aligner_kwargs["attn_implementation"] = "flash_attention_2"
else:
    print("FlashAttention-2 not usable on this runtime; using the default attention.")
aligner = Qwen3ForcedAligner.from_pretrained(
    "Qwen/Qwen3-ForcedAligner-0.6B", **aligner_kwargs
)
print("‚úÖ Aligner loaded.")

# Align each chunk separately (same chunking as ASR) and shift timestamps
# from chunk-relative to file-relative time.
print("Aligning timestamps ‚Ä¶")
time_stamps = []
for i, (offset, chunk_wav) in enumerate(audio_chunks):
    chunk_text = all_texts[i]
    if not chunk_text.strip():
        # Nothing was transcribed for this chunk, so there is nothing to align.
        continue
    print(f"  Aligning chunk {i + 1}/{len(audio_chunks)} ‚Ä¶")
    alignment = aligner.align(
        audio=(chunk_wav, SR),
        text=chunk_text,
        language="Japanese",
    )
    # Shift timestamps by the chunk's offset. `replace` builds a new object
    # because the stamps are frozen dataclasses.
    for stamp in alignment[0]:
        shifted = replace(
            stamp,
            start_time=stamp.start_time + offset,
            end_time=stamp.end_time + offset,
        )
        time_stamps.append(shifted)

print(f"\nTimestamp segments: {len(time_stamps)}")
if time_stamps:
    print(
        f"First: {time_stamps[0].text} [{time_stamps[0].start_time:.2f}s ‚Äì {time_stamps[0].end_time:.2f}s]"
    )

# Free the aligner and release cached VRAM for the translation model.
del aligner
gc.collect()
torch.cuda.empty_cache()
print("\n‚úÖ Aligner unloaded ‚Äî GPU memory freed.")

## 4 · Generate SRT Subtitles

In [None]:
from datetime import timedelta


def format_srt_time(seconds: float) -> str:
    """Convert a time in seconds to an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        seconds: position in seconds. Negative values are clamped to zero,
            because SRT has no representation for them. Any value accepted by
            ``float()`` works.

    Returns:
        The timestamp string. Sub-millisecond precision is truncated. Hours
        are zero-padded to two digits and grow wider past 99 hours.
    """
    # Going through timedelta keeps the fractional part in exact integer
    # microseconds, which avoids float artefacts such as 2.675 * 1000 -> 2674.99...
    td = max(timedelta(seconds=float(seconds)), timedelta(0))
    total_millis = td // timedelta(milliseconds=1)
    total_secs, millis = divmod(total_millis, 1000)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def group_timestamps_to_subtitles(
    stamps, max_chars: int = 40, max_duration: float = 7.0, gap_threshold: float = 0.6
):
    """
    Group word-level timestamps into subtitle segments.

    Consecutive stamps are concatenated (no separator) until adding the next
    one would break a limit, at which point the running segment is closed and
    a new one starts with that stamp.

    Args:
        stamps: sequence of timestamp objects with .text, .start_time, .end_time
        max_chars: max characters per subtitle line
        max_duration: max duration (seconds) per subtitle
        gap_threshold: silence gap (seconds) that forces a new subtitle

    Returns:
        List of ``(start, end, text)`` tuples in input order, with text
        stripped. Segments whose text is blank after stripping are dropped,
        since an empty cue body is not a valid SRT entry.
    """
    if not stamps:
        return []

    subtitles = []

    def _close_segment(start, end, text):
        # Emit the running segment unless it contains only whitespace.
        text = text.strip()
        if text:
            subtitles.append((start, end, text))

    first, *rest = stamps
    current_text = first.text
    current_start = first.start_time
    current_end = first.end_time

    for stamp in rest:
        gap = stamp.start_time - current_end  # silence since the previous word
        new_duration = stamp.end_time - current_start
        new_len = len(current_text) + len(stamp.text)

        if gap > gap_threshold or new_duration > max_duration or new_len > max_chars:
            _close_segment(current_start, current_end, current_text)
            current_text = stamp.text
            current_start = stamp.start_time
        else:
            current_text += stamp.text
        current_end = stamp.end_time

    # Don't forget the last segment
    _close_segment(current_start, current_end, current_text)

    return subtitles


def build_srt(subtitles) -> str:
    """Render ``(start, end, text)`` tuples as an SRT document string.

    Cues are numbered from 1 in input order. Each cue is its index line, a
    ``start --> end`` timing line, the text line, then an empty line.
    """
    rendered = []
    for number, (start, end, text) in enumerate(subtitles, start=1):
        timing = f"{format_srt_time(start)} --> {format_srt_time(end)}"
        # The trailing empty string becomes the blank separator line.
        rendered.extend((str(number), timing, text, ""))
    return "\n".join(rendered)


# Group the aligned stamps into cues and render them as the Japanese SRT.
subtitles_ja = group_timestamps_to_subtitles(time_stamps)
srt_ja = build_srt(subtitles_ja)

# Output files are named after the audio file's stem.
audio_filename = os.path.basename(AUDIO_PATH)
base_name = os.path.splitext(audio_filename)[0]
srt_ja_path = f"/content/{base_name}_ja.srt"
with open(srt_ja_path, "w", encoding="utf-8") as srt_file:
    srt_file.write(srt_ja)

print(f"‚úÖ Japanese SRT saved to: {srt_ja_path}")
print(f"   {len(subtitles_ja)} subtitle segments\n")
print("--- Preview (first 10 segments) ---")
# Each cue takes four lines, so 40 lines cover the first ten cues.
preview_lines = srt_ja.split("\n")[:40]
print("\n".join(preview_lines))

## 5 · Translate Subtitles to English (opus-mt, local)

Uses [Helsinki-NLP/opus-mt-ja-en](https://huggingface.co/Helsinki-NLP/opus-mt-ja-en) — a lightweight MarianMT model for Japanese → English. Fast and runs entirely on-device, but quality is limited for nuanced text.

In [None]:
from transformers import MarianMTModel, MarianTokenizer

# Hugging Face model id of the Japanese -> English MarianMT checkpoint.
TRANSLATION_MODEL = "Helsinki-NLP/opus-mt-ja-en"

print(f"Loading translation model: {TRANSLATION_MODEL} ‚Ä¶")
trans_tokenizer = MarianTokenizer.from_pretrained(TRANSLATION_MODEL)
trans_model = MarianMTModel.from_pretrained(TRANSLATION_MODEL)
# Move the weights to the GPU; inputs are sent to the same device later.
trans_model = trans_model.to("cuda")
print("‚úÖ Translation model loaded.")

In [None]:
def translate_texts(texts: list[str], batch_size: int = 32) -> list[str]:
    """Translate Japanese strings to English with the loaded MarianMT model.

    Args:
        texts: Japanese strings to translate.
        batch_size: number of strings tokenized and generated together.

    Returns:
        The English strings, in the same order as ``texts``.
    """
    english = []
    for begin in range(0, len(texts), batch_size):
        japanese_batch = texts[begin : begin + batch_size]
        # Pad to the longest item in the batch; truncate inputs at 512 tokens.
        encoded = trans_tokenizer(
            japanese_batch,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        )
        encoded = encoded.to("cuda")
        # Inference only, so skip gradient tracking.
        with torch.no_grad():
            generated_ids = trans_model.generate(**encoded, max_length=512)
        english += trans_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)
    return english


# Pull the text field out of every Japanese cue.
ja_texts = [cue[2] for cue in subtitles_ja]

print(f"Translating {len(ja_texts)} subtitle segments ‚Ä¶")
en_texts = translate_texts(ja_texts)
print("‚úÖ Translation complete.")

# Pair each translation with the timing of the cue it came from.
subtitles_en = [
    (cue[0], cue[1], en_line) for cue, en_line in zip(subtitles_ja, en_texts)
]
srt_en = build_srt(subtitles_en)

# Write the English SRT next to the Japanese one.
srt_en_path = f"/content/{base_name}_en.srt"
with open(srt_en_path, "w", encoding="utf-8") as srt_file:
    srt_file.write(srt_en)

print(f"\n‚úÖ English SRT saved to: {srt_en_path}")
print(f"   {len(subtitles_en)} subtitle segments\n")
print("--- Preview (first 10 segments) ---")
# Each cue takes four lines, so 40 lines cover the first ten cues.
preview_lines = srt_en.split("\n")[:40]
print("\n".join(preview_lines))

## 6 · Translate Subtitles with Gemini 3 Flash (free via Colab)

Google Colab provides free access to Gemini models through the built-in `google.generativeai` API. Gemini 3 Flash produces much higher-quality translations than the small opus-mt model — especially for nuanced or conversational Japanese.

In [None]:
import google.generativeai as genai
import json
import time

# Colab provides a free Gemini API key automatically
from google.colab import userdata

# Look the key up in Colab Secrets first. A missing secret and a secret this
# notebook has not been granted access to are both treated as "not available
# here", so the environment variable is tried next.
try:
    api_key = userdata.get("GOOGLE_API_KEY")
except (userdata.SecretNotFoundError, userdata.NotebookAccessError):
    # Fallback: read the key from the process environment
    import os

    api_key = os.environ.get("GOOGLE_API_KEY", "")

assert api_key, (
    "No Gemini API key found. In Colab, go to üîë Secrets (left sidebar) "
    "and add GOOGLE_API_KEY, or enable the free Gemini integration."
)

genai.configure(api_key=api_key)
gemini_model = genai.GenerativeModel("gemini-3-flash")

# Instructions prepended to every translation request. The reply is parsed as
# JSON, so the prompt insists on a bare array of strings.
SYSTEM_PROMPT = (
    "You are a professional Japanese-to-English subtitle translator. "
    "You will receive numbered Japanese subtitle lines. "
    "Return ONLY a JSON array of strings ‚Äî one English translation per line, "
    "in the same order. Keep translations concise and natural for subtitles. "
    "Preserve the original meaning and tone. Do NOT add numbering or extra text."
)


def _parse_gemini_json_array(raw: str, expected_len: int) -> list[str]:
    """Parse a model reply into exactly ``expected_len`` strings.

    Raises:
        ValueError: if the reply is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError``), is not a list of the expected length, or
            contains non-string items.
    """
    raw = raw.strip()
    # Strip markdown code fences if present: drop the opening fence line
    # (for example a line tagged "json") and everything from the last fence on.
    if raw.startswith("```"):
        _, _, raw = raw.partition("\n")
        raw = raw.rsplit("```", 1)[0]
    parsed = json.loads(raw)
    if not isinstance(parsed, list) or len(parsed) != expected_len:
        raise ValueError(f"expected a JSON array of {expected_len} items")
    if not all(isinstance(item, str) for item in parsed):
        raise ValueError("expected every array item to be a string")
    return parsed


def translate_with_gemini(
    texts: list[str], batch_size: int = GEMINI_BATCH_SIZE
) -> list[str]:
    """Translate Japanese texts to English using Gemini 3 Flash in batches.

    Each batch is sent as numbered lines and the reply must be a JSON array
    with one string per line. A batch is tried up to three times with
    exponential back-off; if every attempt fails, the original Japanese lines
    are kept so the output always lines up with ``texts``.

    Args:
        texts: Japanese subtitle lines.
        batch_size: number of lines per API call.

    Returns:
        A list with the same length and order as ``texts``.
    """
    max_attempts = 3
    total_batches = (len(texts) + batch_size - 1) // batch_size
    all_translations = []

    for batch_num, batch_start in enumerate(range(0, len(texts), batch_size), start=1):
        batch = texts[batch_start : batch_start + batch_size]
        print(f"  Batch {batch_num}/{total_batches} ({len(batch)} lines) ‚Ä¶", end=" ")

        # Build numbered input
        numbered = "\n".join(f"{i + 1}. {t}" for i, t in enumerate(batch))
        prompt = f"{SYSTEM_PROMPT}\n\nSubtitle lines:\n{numbered}"

        for attempt in range(max_attempts):
            try:
                response = gemini_model.generate_content(prompt)
                translations = _parse_gemini_json_array(response.text, len(batch))
            except Exception as e:
                # Deliberately broad: API errors, blocked replies and malformed
                # JSON are all handled the same way (retry, then fall back).
                if attempt < max_attempts - 1:
                    print(f"‚ö†Ô∏è retry ({e.__class__.__name__}) ‚Ä¶", end=" ")
                    time.sleep(2**attempt)
                else:
                    # Fallback: return originals for this batch
                    print(f"‚ùå fallback (kept Japanese)")
                    print(f"    last error: {e.__class__.__name__}: {e}")
                    all_translations.extend(batch)
            else:
                all_translations.extend(translations)
                print("‚úÖ")
                break

        # Respect free-tier rate limits (no pause after the final batch)
        if batch_start + batch_size < len(texts):
            time.sleep(1)

    return all_translations


# --- Translate every Japanese cue with Gemini ---
ja_texts_gemini = [cue[2] for cue in subtitles_ja]
print(f"Translating {len(ja_texts_gemini)} subtitles with Gemini 3 Flash ‚Ä¶\n")
en_texts_gemini = translate_with_gemini(ja_texts_gemini)
print(f"\n‚úÖ Gemini translation complete.")

# Pair each translation with the timing of the cue it came from.
subtitles_en_gemini = [
    (cue[0], cue[1], en_line) for cue, en_line in zip(subtitles_ja, en_texts_gemini)
]
srt_en_gemini = build_srt(subtitles_en_gemini)

# Write the Gemini SRT alongside the other outputs.
srt_en_gemini_path = f"/content/{base_name}_en_gemini.srt"
with open(srt_en_gemini_path, "w", encoding="utf-8") as srt_file:
    srt_file.write(srt_en_gemini)

print(f"‚úÖ Gemini English SRT saved to: {srt_en_gemini_path}")
print(f"   {len(subtitles_en_gemini)} subtitle segments\n")
print("--- Preview (first 10 segments) ---")
# Each cue takes four lines, so 40 lines cover the first ten cues.
preview_lines = srt_en_gemini.split("\n")[:40]
print("\n".join(preview_lines))

## 7 · Side-by-Side Comparison (opus-mt vs Gemini)

In [None]:
# Fixed-width table: index, time range, then the three text columns.
header = (
    f"{'#':>3}  {'Time':^27}  {'Japanese':<30}  {'opus-mt':<30}  {'Gemini 3 Flash':<30}"
)
print(header)
print("‚îÄ" * len(header))

row_limit = 30  # show at most this many cues
aligned_rows = list(zip(subtitles_ja, subtitles_en, subtitles_en_gemini))
for row_no, (ja_cue, opus_cue, gemini_cue) in enumerate(aligned_rows[:row_limit], start=1):
    begin, finish, ja_line = ja_cue
    _, _, opus_line = opus_cue
    _, _, gemini_line = gemini_cue
    time_str = f"{format_srt_time(begin)} ‚Üí {format_srt_time(finish)}"
    print(f"{row_no:>3}  {time_str}  {ja_line:<30}  {opus_line:<30}  {gemini_line:<30}")

# Mention the cues that were cut off, but only when the limit was reached.
hidden = len(subtitles_ja) - row_limit
if len(aligned_rows) >= row_limit and hidden > 0:
    print(f"\n... and {hidden} more segments.")

## 8 · Download SRT Files

In [None]:
# Offer the three SRT files as browser downloads when running inside Colab;
# elsewhere just report where they were written.
try:
    from google.colab import files

    pending_downloads = (
        ("Downloading Japanese SRT ‚Ä¶", srt_ja_path),
        ("Downloading English SRT (opus-mt) ‚Ä¶", srt_en_path),
        ("Downloading English SRT (Gemini) ‚Ä¶", srt_en_gemini_path),
    )
    for message, srt_path in pending_downloads:
        print(message)
        files.download(srt_path)
except ImportError:
    print("Not running in Colab ‚Äî files saved at:")
    print(f"  Japanese:        {srt_ja_path}")
    print(f"  English (opus):  {srt_en_path}")
    print(f"  English (Gemini): {srt_en_gemini_path}")

## 9 · Cleanup (Optional)

Free GPU memory if you want to run other things in this session.

In [None]:
import gc

# Drop the translation model and tokenizer, then hand cached VRAM back to the driver.
del trans_model
del trans_tokenizer
gc.collect()
torch.cuda.empty_cache()
print("‚úÖ GPU memory freed.")