# Qwen3 Audiobook Studio (All in One)

This notebook merges:
- Qwen3-TTS features (voice cloning, custom voice, voice design)
- Audiobook conversion (ebook or text to audiobook)
- A single Gradio UI with tabs

Notes:
- The converter uses the local Gradio API at http://127.0.0.1:7860
- The same app exposes the API and the UI
- Recommended runtime: GPU
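
Because the converter reaches the app over HTTP, it can help to confirm that the address above answers before starting a long conversion. The following is a minimal sketch using only the standard library; `wait_for_app` and its parameters are hypothetical names for illustration and are not part of the notebook, the converter, or Gradio.

```python
import time
import urllib.error
import urllib.request


def wait_for_app(url: str = "http://127.0.0.1:7860",
                 timeout: float = 60.0,
                 interval: float = 1.0) -> bool:
    """Poll `url` until it answers over HTTP or `timeout` seconds elapse."""
    # Bypass any proxy configured in the environment: the target is local.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    deadline = time.monotonic() + timeout
    while True:
        try:
            with opener.open(url, timeout=max(interval, 0.1)):
                return True
        except urllib.error.HTTPError:
            # The server answered, even if with an error status.
            return True
        except (urllib.error.URLError, OSError):
            # Nothing is listening yet; retry until the deadline.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Calling `wait_for_app()` with the defaults waits up to a minute for the local app; a `False` result suggests the Gradio cell has not been launched yet.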

## 0) Check GPU (optional)

In [1]:
!nvidia-smi -L || true
import torch, platform
print("torch:", torch.__version__)
print("cuda available:", torch.cuda.is_available())
print("python:", platform.python_version())

if torch.cuda.is_available():
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    try:
        torch.set_float32_matmul_precision("high")
    except Exception:
        pass
    print("GPU optimizations enabled")
else:
    print("CUDA not available, using CPU")

GPU 0: Tesla T4 (UUID: GPU-7a02db82-230d-a6ec-4644-50b0d59b743b)
torch: 2.10.0+cu128
cuda available: True
python: 3.12.11
GPU optimizations enabled
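
If the GPU name is needed programmatically (for example, to log which card produced a run), the `nvidia-smi -L` line shown above can be parsed. This is a sketch that assumes the line format seen in this output; `GpuInfo` and `parse_nvidia_smi_list` are hypothetical helper names, and other driver versions may print the line differently.

```python
import re
from typing import NamedTuple


class GpuInfo(NamedTuple):
    index: int
    name: str
    uuid: str


# Matches lines such as "GPU 0: Tesla T4 (UUID: GPU-...)".
_GPU_LINE = re.compile(r"^GPU (\d+): (.+?) \(UUID: ([^)]+)\)$")


def parse_nvidia_smi_list(output: str) -> list[GpuInfo]:
    """Extract (index, name, uuid) from the text printed by `nvidia-smi -L`."""
    gpus = []
    for line in output.splitlines():
        match = _GPU_LINE.match(line.strip())
        if match:
            gpus.append(GpuInfo(int(match.group(1)), match.group(2), match.group(3)))
    return gpus


sample = "GPU 0: Tesla T4 (UUID: GPU-7a02db82-230d-a6ec-4644-50b0d59b743b)"
print(parse_nvidia_smi_list(sample))
```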


## 1) Install dependencies

In [2]:
!sudo apt-get update
!sudo apt-get install -y ffmpeg sox lsof
!pip install -U pip
# pin requests to keep Colab compatibility (google-colab requires requests==2.32.4)
# Force upgrade of scientific stack but KEEP numpy < 2.0 to avoid binary incompatibilities with Gradio/Matplotlib
%pip install "numpy<2.0" -U qwen-tts gradio gradio_client num2words regex soundfile huggingface_hub "requests==2.32.4" "transformers>=4.52.0" accelerate sentencepiece tokenizers torchvision "scipy>=1.13.0" "scikit-learn>=1.5.0" "pandas>=2.2.0"

# Optional: FlashAttention (can fail, safe to ignore)
try:
    import torch
    if torch.cuda.is_available():
        %pip install -U flash-attn --no-build-isolation
except Exception as e:
    print("flash-attn install failed, continuing:", e)

Get:1 https://nvidia.github.io/libnvidia-container/stable/deb/amd64  InRelease [1477 B]
Get:2 https://download.docker.com/linux/ubuntu noble InRelease [48.5 kB]       
Get:3 https://cli.github.com/packages stable InRelease [3917 B]                
Hit:4 https://us-east-1.ec2.archive.ubuntu.com/ubuntu noble InRelease          
Get:5 https://us-east-1.ec2.archive.ubuntu.com/ubuntu noble-updates InRelease [126 kB]
Get:6 https://us-east-1.ec2.archive.ubuntu.com/ubuntu noble-backports InRelease [126 kB]
Get:7 https://us-east-1.ec2.archive.ubuntu.com/ubuntu noble-security InRelease [126 kB]
Get:8 https://security.ubuntu.com/ubuntu noble-security InRelease [126 kB]     
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2404/x86_64  InRelease [1581 B]
Get:10 https://packages.cloud.google.com/apt cloud-sdk InRelease [1620 B]      
Get:11 https://download.docker.com/linux/ubuntu noble/stable amd64 Packages [54.3 kB]
Get:12 https://download.docker.com/linux/ubuntu noble/stable 
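
After the install cell finishes, the two pins it relies on (`numpy<2.0` and `requests==2.32.4`) can be verified before moving on. The sketch below uses `importlib.metadata` from the standard library; `check_pins` is a hypothetical helper, and the `get_version` parameter exists only so the check can be exercised without touching the real environment.

```python
from importlib import metadata


def check_pins(get_version=metadata.version) -> dict[str, str]:
    """Return {package: problem} for each pin from the install cell that is not met."""
    problems = {}

    try:
        numpy_version = get_version("numpy")
        # "numpy<2.0" means the major version must be below 2.
        if int(numpy_version.split(".")[0]) >= 2:
            problems["numpy"] = f"expected <2.0, found {numpy_version}"
    except metadata.PackageNotFoundError:
        problems["numpy"] = "not installed"

    try:
        requests_version = get_version("requests")
        if requests_version != "2.32.4":
            problems["requests"] = f"expected 2.32.4, found {requests_version}"
    except metadata.PackageNotFoundError:
        problems["requests"] = "not installed"

    return problems


print(check_pins())
```

An empty dictionary means both pins hold; otherwise the kernel may need a restart after reinstalling so the new versions are picked up.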

## 2) Clone audiobook converter repo

In [3]:
import os

# Prefer local repo; clone automatically if missing
REPO = "https://github.com/brunoferreira94/Qwen3-Audiobook-Converter.git"
DEST = "/teamspace/studios/this_studio/Qwen3-Audiobook-Converter"

if os.path.exists(DEST):
    print("Using local repo:", DEST)
else:
    print("Local repo not found. Cloning:", REPO)
    !git clone --depth 1 {REPO} {DEST}

%cd {DEST}
!pip -q install -r requirements.txt

Using local repo: /teamspace/studios/this_studio/Qwen3-Audiobook-Converter
/teamspace/studios/this_studio/Qwen3-Audiobook-Converter




## 3) Launch unified Gradio app

This single app provides:
- Voice cloning (Base model)
- Custom voices (CustomVoice model)
- Voice design (VoiceDesign model)
- Audiobook conversion (uses the same local API)

The app runs on port 7860.
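
Since the app is tied to a fixed port, a quick check of whether something is already listening there can explain a failed launch. This is a minimal sketch with the standard `socket` module; `port_in_use` is a hypothetical helper name and is not used elsewhere in the notebook.

```python
import socket


def port_in_use(port: int = 7860, host: str = "127.0.0.1") -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 on success and an errno value otherwise.
        return sock.connect_ex((host, port)) == 0
```

A `True` result before launching suggests that a previous app instance is still running and should be stopped first.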

In [3]:
import os
import signal
import subprocess

def _kill_port_7860():
    pids = []
    for cmd in (["/usr/sbin/lsof", "-t", "-i:7860"], ["lsof", "-t", "-i:7860"], ["fuser", "-k", "7860/tcp"]):
        try:
            out = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode().strip()
            if cmd[0] == "fuser":
                return True
            if out:
                pids = out.split()
                break
        except Exception:
            continue
    for pid in pids:
        try:
            os.kill(int(pid), signal.SIGKILL)
        except Exception:
            pass
    return bool(pids)

killed = _kill_port_7860()
print("Port 7860 cleared" if killed else "No process found on port 7860")


In [2]:
import os, pathlib, shutil, subprocess, zipfile, tempfile, re, json, time, gc, contextlib
import gradio as gr
import torch
import soundfile as sf
from num2words import num2words
from qwen_tts import Qwen3TTSModel
from transformers import pipeline

REPO_DIR = DEST if "DEST" in globals() else "/teamspace/studios/this_studio/Qwen3-Audiobook-Converter"
if not os.path.exists(REPO_DIR):
    raise FileNotFoundError(f"Repo not found: {REPO_DIR}")

BOOK_DIR = os.path.join(REPO_DIR, "book_to_convert")
OUT_DIR  = os.path.join(REPO_DIR, "audiobooks")
VOICES_DIR = os.path.join(REPO_DIR, "saved_voices") # Directory for saved voices

pathlib.Path(BOOK_DIR).mkdir(parents=True, exist_ok=True)
pathlib.Path(OUT_DIR).mkdir(parents=True, exist_ok=True)
pathlib.Path(VOICES_DIR).mkdir(parents=True, exist_ok=True)

AUDIO_EXTS = {".mp3", ".wav", ".m4a", ".flac", ".ogg"}

MODEL_BASE = "Qwen/Qwen3-TTS-12Hz-1.7B-Base"
MODEL_CUSTOM = "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice"
MODEL_DESIGN = "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign"

current_model = None
current_model_type = None

# --- GPU / dtype helpers ---
def _torch_dtype():
    return torch.float16 if torch.cuda.is_available() else torch.float32

def _autocast():
    if torch.cuda.is_available():
        return torch.amp.autocast(device_type="cuda", dtype=_torch_dtype())
    return contextlib.nullcontext()

def _cleanup():
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()

# --- Model loader (single shared) ---
def load_model(model_type: str):
    global current_model, current_model_type

    if current_model_type == model_type and current_model is not None:
        return current_model

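    if current_model is not None:
        # Drop the reference instead of using `del`, so the global name stays
        # defined (as None) even if loading the next model fails.
        current_model = None
        current_model_type = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()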

    if model_type == "base":
        model_name = MODEL_BASE
    elif model_type == "custom":
        model_name = MODEL_CUSTOM
    elif model_type == "design":
        model_name = MODEL_DESIGN
    else:
        raise ValueError("Unknown model type")

    current_model = Qwen3TTSModel.from_pretrained(
        model_name,
        device_map="cuda:0" if torch.cuda.is_available() else "cpu",
        dtype=_torch_dtype(),
        attn_implementation="sdpa"
    )
    current_model_type = model_type
    return current_model

# --- I/O helpers ---
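def _write_wav(wavs, sr):
    # Reserve a temp path and close the handle right away, so no file
    # descriptor is leaked; soundfile then writes to the path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        out_path = temp_file.name
    sf.write(out_path, wavs[0], sr)
    return out_path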

# --- Saved Voices Helpers ---
def list_saved_voices():
    if not os.path.exists(VOICES_DIR):
        return []
    return sorted([f.name for f in pathlib.Path(VOICES_DIR).glob("*") if f.suffix.lower() in AUDIO_EXTS])

def save_new_voice(audio_file, voice_name):
    if not audio_file:
        return "Please upload an audio file first.", list_saved_voices()
    if not voice_name or not voice_name.strip():
        return "Please provide a name for the voice.", list_saved_voices()
    
    # Sanitize filename
    safe_name = re.sub(r'[^\w\-_\. ]', '_', voice_name.strip())
    original_ext = pathlib.Path(audio_file).suffix
    if not safe_name.endswith(original_ext):
        safe_name += original_ext
        
    dest_path = os.path.join(VOICES_DIR, safe_name)
    shutil.copy(audio_file, dest_path)
    
    return f"Voice '{safe_name}' saved successfully!", list_saved_voices()

def get_voice_path(voice_name):
    if not voice_name:
        return None
    p = os.path.join(VOICES_DIR, voice_name)
    if os.path.exists(p):
        return p
    return None

def delete_saved_voice(voice_name):
    if not voice_name:
        return "No voice selected.", list_saved_voices()
    p = os.path.join(VOICES_DIR, voice_name)
    if os.path.exists(p):
        os.remove(p)
        return f"Voice '{voice_name}' deleted.", list_saved_voices()
    return "Voice not found.", list_saved_voices()

def refresh_voices_dropdown():
    return gr.Dropdown(choices=list_saved_voices())

# --------------------------
# Language / normalization helpers (restored)
# --------------------------
# simple heuristics used by detect_lang_auto
_PT_HINTS = re.compile(r"\b(que|não|para|com|uma|você|vocês|também|mais|menos|porque|pois|então|como|quando|onde|capítulo|prefácio|introdução)\b", re.IGNORECASE)
_EN_HINTS = re.compile(r"\b(the|and|or|you|your|with|from|that|this|there|because|then|how|when|where|chapter|preface|introduction)\b", re.IGNORECASE)

def detect_lang_auto(text: str) -> str:
    """Decide 'pt' or 'en' based on simple keyword heuristics and Portuguese diacritics."""
    if not text:
        return "pt"
    pt = len(_PT_HINTS.findall(text))
    en = len(_EN_HINTS.findall(text))
    if pt == en:
        if re.search(r"[ãõçáéíóúâêôà]", text, re.IGNORECASE):
            return "pt"
    return "en" if en > pt else "pt"

def _norm_language(lang: str) -> str:
    """Normalize incoming language strings for TTS/numbers. Returns 'pt','en' or 'Auto'."""
    if lang is None:
        return "pt"
    s = str(lang).strip()
    if s == "":
        return "pt"
    low = s.lower()
    if low in ("auto", "auto-detect"):
        return "Auto"
    if low.startswith("pt"):
        return "pt"
    if low.startswith("en"):
        return "en"
    if low in ("english", "ingles", "inglês"):
        return "en"
    if low in ("portuguese", "portugues", "português"):
        return "pt"
    return "pt"

def _norm_speaker(spk: str) -> str:
    """Normalize speaker name to one of the known custom voices (case-insensitive).
    Falls back to 'ryan' when unknown."""
    if spk is None:
        return CUSTOM_VOICES[0] if CUSTOM_VOICES else "ryan"
    s = str(spk).strip()
    if s == "":
        return CUSTOM_VOICES[0] if CUSTOM_VOICES else "ryan"
    low = s.lower()
    for x in CUSTOM_VOICES:
        if x.lower() == low:
            return x
    # allow a few common aliases
    aliases = {"male": "ryan", "female": "serena"}
    if low in aliases:
        return aliases[low]
    return CUSTOM_VOICES[0] if CUSTOM_VOICES else "ryan"

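# NOTE: this definition is shadowed by the second `spell_digits` defined further
# down (next to DIGITS_PT / SEP_PT); the later one is what runs at call time.
def spell_digits(text: str, lang: str) -> str: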
    lang_eff = resolve_lang(lang)
    dmap = DIGITS_PT if lang_eff == "pt" else DIGITS_EN
    smap = SEP_PT if lang_eff == "pt" else SEP_EN
    out = []
    for ch in text:
        if ch.isdigit():
            out.append(dmap.get(ch, ch))
        elif ch in smap:
            out.append(" " if ch == " " else smap[ch])
        else:
            out.append(ch)

    final = []
    for tok in out:
        if tok == " ":
            final.append(" ")
        else:
            if final and final[-1] != " ":
                final.append(" ")
            final.append(tok)
    return "".join(final).replace("  ", " ").strip()

def speak_protected(kind: str, original: str, lang: str) -> str:
    """Return a spoken form for protected tokens (ISBN, VERSION, etc.)."""
    s = original.strip()
    # resolve_lang is defined later in this cell, so it exists by the time this runs
    lang_eff = resolve_lang(lang)

    if kind == "VERSION":
        if re.match(r"^v\d", s, re.IGNORECASE):
            s2 = s[1:]
            prefix = "versão " if lang_eff == "pt" else "version "
            return prefix + spell_digits(s2, lang_eff)
        return spell_digits(s, lang_eff)

    if kind == "ISBN":
        rest = re.sub(r"^ISBN(?:-1[03])?:?\s*", "", s, flags=re.IGNORECASE)
        return "ISBN " + spell_digits(rest, lang_eff)

    return spell_digits(s, lang_eff)

# --------------------------
# TTS generation functions
# --------------------------
def voice_clone(text, reference_audio, saved_voice, ref_transcript, use_fast_mode):
    # Determine which audio to use: uploaded > saved
    audio_path = reference_audio
    if not audio_path and saved_voice:
        audio_path = get_voice_path(saved_voice)

    if not text or not audio_path:
        return None
        
    model = load_model("base")
    if use_fast_mode or not ref_transcript:
        prompt_items = model.create_voice_clone_prompt(
            ref_audio=audio_path,
            x_vector_only_mode=True
        )
    else:
        prompt_items = model.create_voice_clone_prompt(
            ref_audio=audio_path,
            ref_text=ref_transcript,
            x_vector_only_mode=False
        )
    with torch.inference_mode():
        with _autocast():
            wavs, sr = model.generate_voice_clone(
                text=text,
                voice_clone_prompt=prompt_items
            )
    out_path = _write_wav(wavs, sr)
    _cleanup()
    return out_path

CUSTOM_VOICES = ["serena", "vivian", "ono_anna", "sohee", "aiden", "dylan", "eric", "ryan", "uncle_fu"]

def custom_voice(text, voice_name, instruction):
    if not text:
        return None
    model = load_model("custom")
    with torch.inference_mode():
        with _autocast():
            if instruction and instruction.strip():
                wavs, sr = model.generate_custom_voice(
                    text=text,
                    speaker=voice_name,
                    instruct=instruction
                )
            else:
                wavs, sr = model.generate_custom_voice(
                    text=text,
                    speaker=voice_name
                )
    out_path = _write_wav(wavs, sr)
    _cleanup()
    return out_path

def voice_design(text, voice_description):
    if not text or not voice_description:
        return None
    model = load_model("design")
    with torch.inference_mode():
        with _autocast():
            wavs, sr = model.generate_voice_design(
                text=text,
                instruct=voice_description
            )
    out_path = _write_wav(wavs, sr)
    _cleanup()
    return out_path

# API endpoint used by audiobook_converter.py
def generate_custom_voice(text, language="Auto", speaker=None, instruct="", model_size="auto", seed=0, **kwargs):
    """Public API (keeps original contract). Honors language variants like pt-BR by
    appending a short instruction to bias accent when possible."""
    model = load_model("custom")
    spk = _norm_speaker(speaker)

    # normalize language for model and build a helpful instruction
    low = (language or "").strip().lower()
    
    # Map to model-compatible language codes (model expects 'english' not 'en', 'portuguese' not 'pt')
    if low.startswith("pt"):
        lang_for_model = "portuguese"
    elif low.startswith("en"):
        lang_for_model = "english"
    else:
        lang_for_model = low or "Auto"
    
    # add a short variant instruction (pt-BR / pt-PT) to bias accent when needed
    variant_instr = ""
    if low == "pt-br":
        variant_instr = "Use Brazilian Portuguese (pt-BR)."
    elif low == "pt-pt":
        variant_instr = "Use European Portuguese (pt-PT)."

    full_instr = (str(instruct or "").strip() + " " + variant_instr).strip()

    try:
        seed_int = int(seed) if seed is not None else 0
        torch.manual_seed(seed_int)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed_int)
    except Exception:
        pass

    with torch.inference_mode():
        with _autocast():
            wavs, sr = model.generate_custom_voice(
                text=text,
                language=lang_for_model,
                speaker=spk,
                instruct=full_instr or ""
            )
    out_path = _write_wav(wavs, sr)
    _cleanup()
    return out_path, sr

# --- Transcription helper ---
_asr_pipeline = None
def loaded_asr_pipeline():
    global _asr_pipeline
    if _asr_pipeline is None:
        # torch and pipeline are already imported at the top of this cell
        _asr_pipeline = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-tiny",  # lightweight and fast, good enough for a demo
            device="cuda:0" if torch.cuda.is_available() else "cpu"
        )
    return _asr_pipeline

def transcribe_audio_api(audio):
    if not audio:
        return ""
    pipe = loaded_asr_pipeline()
    result = pipe(audio)
    return result.get("text", "")

# --- Voice Clone API for audiobook_converter ---
def generate_voice_clone_api_for_converter(ref_audio, ref_text, target_text, language="Auto", use_xvector_only=False, model_size="1.7B", max_chunk_chars=200, chunk_gap=0, seed=-1, **kwargs):
    model = load_model("base")
    
    try:
        if seed is not None and int(seed) != -1:
            torch.manual_seed(int(seed))
            if torch.cuda.is_available():
                torch.cuda.manual_seed_all(int(seed))
    except Exception:
        # Invalid seed: continue without seeding (non-deterministic output)
        pass
    
    if not ref_text or not ref_text.strip():
        # Fallback to xvector if no transcription
        use_xvector_only = True

    if use_xvector_only:
        prompt_items = model.create_voice_clone_prompt(
            ref_audio=ref_audio,
            x_vector_only_mode=True
        )
    else:
        prompt_items = model.create_voice_clone_prompt(
            ref_audio=ref_audio,
            ref_text=ref_text,
            x_vector_only_mode=False
        )
        
    with torch.inference_mode():
        with _autocast():
            wavs, sr = model.generate_voice_clone(
                text=target_text,
                voice_clone_prompt=prompt_items
            )
    
    out_path = _write_wav(wavs, sr)
    _cleanup()
    return out_path, sr

# --------------------------
# TXT preprocessing / protection (uses speak_protected)
# --------------------------
PROTECT_PATTERNS = {
    "DATE_DDMMYYYY": re.compile(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b"),
    "DATE_YYYYMMDD": re.compile(r"\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b"),
    "TIME_HHMM": re.compile(r"\b\d{1,2}:\d{2}\b"),
    "ISBN": re.compile(r"\bISBN(?:-1[03])?:?\s*(?:97[89][-\s]?)?\d{1,5}[-\s]?\d{1,7}[-\s]?\d{1,7}[-\s]?\d\b", re.IGNORECASE),
    "VERSION": re.compile(r"\bv?\d+\.\d+(?:\.\d+){0,3}\b", re.IGNORECASE),
    "LONG_ID": re.compile(r"\b\d{7,}\b"),
    "MIXED_CODE": re.compile(r"\b[A-Z]{2,}\d{2,}[A-Z0-9-]*\b"),
}

DIGITS_PT = {"0":"zero","1":"um","2":"dois","3":"três","4":"quatro","5":"cinco","6":"seis","7":"sete","8":"oito","9":"nove"}
DIGITS_EN = {"0":"zero","1":"one","2":"two","3":"three","4":"four","5":"five","6":"six","7":"seven","8":"eight","9":"nine"}
SEP_PT = {"/":"barra","-":"traço",".":"ponto",":":"dois pontos"," ":" "}
SEP_EN = {"/":"slash","-":"dash",".":"dot",":":"colon"," ":" "}

def resolve_lang(lang: str) -> str:
    """Normalize language for number conversion: returns 'pt' or 'en'.
    Accepts variants like 'pt-BR' and 'pt-PT'."""
    low = (lang or "pt").strip().lower()
    if low == "auto":
        return "pt"
    if low.startswith("en"):
        return "en"
    if low.startswith("pt"):
        return "pt"
    return "pt"

def spell_digits(text: str, lang: str) -> str:
    lang_eff = resolve_lang(lang)
    dmap = DIGITS_PT if lang_eff == "pt" else DIGITS_EN
    smap = SEP_PT if lang_eff == "pt" else SEP_EN
    out = []
    for ch in text:
        if ch.isdigit():
            out.append(dmap.get(ch, ch))
        elif ch in smap:
            out.append(" " if ch == " " else smap[ch])
        else:
            out.append(ch)
    # Collapse every run of whitespace; a single replace("  ", " ") leaves doubles behind
    return re.sub(r"\s+", " ", " ".join(o for o in out if o != "")).strip()

def protect_text(text: str, enable: bool):
    if not enable:
        return text, {}
    placeholder_map = {}
    idx = 0
    def _make_placeholder():
        nonlocal idx
        idx += 1
        return f"__KEEP_{idx:06d}__"
    for kind, pat in PROTECT_PATTERNS.items():
        def repl(m, k=kind):
            ph = _make_placeholder()
            placeholder_map[ph] = {"kind": k, "text": m.group(0)}
            return ph
        text = pat.sub(repl, text)
    return text, placeholder_map

def unprotect_text(text: str, placeholder_map: dict, as_digits: bool, lang: str):
    for ph, payload in placeholder_map.items():
        original = payload["text"]
        kind = payload.get("kind")
        repl = speak_protected(kind, original, lang) if as_digits else original
        text = text.replace(ph, repl)
    return text

def numbers_to_words_pt_en(text: str, lang: str) -> str:
    def repl(m):
        raw = m.group(0)

        # lang == "auto": decide from the local context (a short window)
        if (lang or "").strip().lower() == "auto":
            # look at a small slice around the match to detect the language
            start = max(0, m.start() - 40)
            end = min(len(text), m.end() + 40)
            lang_eff = detect_lang_auto(text[start:end])
        else:
            lang_eff = resolve_lang(lang)
        code = "pt_BR" if lang_eff == "pt" else "en"

        if "," in raw or "." in raw:
            sep = "," if "," in raw else "."
            left, right = raw.split(sep, 1)
            try:
                left_words = num2words(int(left), lang=code)
                if right.startswith("0"):
                    # Keep leading zeros: "1.05" must not be read as "one point five"
                    right_words = " ".join(num2words(int(d), lang=code) for d in right)
                else:
                    right_words = num2words(int(right), lang=code)
            except ValueError:
                return raw
            joiner = "vírgula" if lang_eff == "pt" else "point"
            return f"{left_words} {joiner} {right_words}"
        try:
            n = int(raw)
        except ValueError:
            return raw
        return num2words(n, lang=code)

    pattern = re.compile(r"(?<![A-Za-zÀ-ÿ_])(\d{1,9}(?:[.,]\d{1,3})?)(?![A-Za-zÀ-ÿ_])")
    return pattern.sub(repl, text)

def _normalize_signature_lines(text: str) -> str:
    """Normalize citation/signature lines like '~ JOHN WOODEN' to natural case."""
    def repl(m):
        name = m.group(1).strip()
        name = re.sub(r"\s+", " ", name)
        return "— " + name.title()
    return re.sub(r"(?m)^\s*~\s*([A-Z][A-Z\s\.'-]{2,})\s*$", repl, text)

def preprocess_txt(input_txt: str, lang: str, pause_sec: float, do_numbers: bool, protect_codes: bool, protected_as_digits: bool, split_blocks: bool):
    text = pathlib.Path(input_txt).read_text(encoding="utf-8", errors="ignore").replace("\r\n","\n").replace("\r","\n")
    text = _normalize_signature_lines(text)
    text, keep = protect_text(text, enable=protect_codes)
    if do_numbers:
        text = numbers_to_words_pt_en(text, lang)
    text = unprotect_text(text, keep, as_digits=protected_as_digits, lang=lang)
    if pause_sec and pause_sec > 0:
        text = re.sub(r"\n\s*\n", "\n...\n", text)
    if split_blocks:
        paras = [p.strip() for p in text.split("\n") if p.strip()]
        chunk_size = 140
        chunks = ["\n".join(paras[i:i+chunk_size]) for i in range(0, len(paras), chunk_size)]
        text = "\n\n".join(chunks)
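    # Build the output name from the stem: str.replace would touch every ".txt"
    # in the path and would overwrite the input for an uppercase ".TXT" extension.
    src = pathlib.Path(input_txt)
    out_file = str(src.with_name(src.stem + "_processed.txt"))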
    pathlib.Path(out_file).write_text(text, encoding="utf-8")
    return out_file

def loudnorm_measure(in_file: str, target_lufs: float, true_peak: float, lra: float):
    cmd = ["ffmpeg","-hide_banner","-y","-i",in_file,"-af",f"loudnorm=I={target_lufs}:TP={true_peak}:LRA={lra}:print_format=json","-f","null","-"]
    r = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if r.returncode != 0:
        raise RuntimeError(r.stdout[-1200:])
    blocks = re.findall(r"\{[\s\S]*?\}", r.stdout)
    if not blocks:
        raise RuntimeError("Could not extract loudnorm JSON.\n" + r.stdout[-1200:])
    return json.loads(blocks[-1])

def loudnorm_apply_2pass(in_file: str, out_file: str, target_lufs: float, true_peak: float, lra: float, meas: dict):
    fi = str(meas.get("input_i")); ftp = str(meas.get("input_tp"))
    flra = str(meas.get("input_lra")); fth = str(meas.get("input_thresh"))
    foff = str(meas.get("target_offset"))
    af = (f"loudnorm=I={target_lufs}:TP={true_peak}:LRA={lra}:"
          f"measured_I={fi}:measured_TP={ftp}:measured_LRA={flra}:"
          f"measured_thresh={fth}:offset={foff}:linear=true:print_format=summary")
    cmd = ["ffmpeg","-hide_banner","-y","-i",in_file,"-af",af,out_file]
    r = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if r.returncode != 0:
        raise RuntimeError(r.stdout[-1200:])
    return r.stdout

def normalize_volume_folder_2pass(folder: str, target_lufs: float, true_peak: float, lra: float):
    folder_path = pathlib.Path(folder)
    files = [p for p in folder_path.rglob("*") if p.is_file() and p.suffix.lower() in AUDIO_EXTS]
    if not files:
        return "No audio found to normalize."
    logs = []
    for p in files:
        tmp = p.with_suffix(".normtmp" + p.suffix)
        try:
            meas = loudnorm_measure(str(p), target_lufs, true_peak, lra)
            loudnorm_apply_2pass(str(p), str(tmp), target_lufs, true_peak, lra, meas)
            tmp.replace(p)
            logs.append(f"OK {p.name}: normalized (2-pass)")
        except Exception as e:
            if tmp.exists():
                tmp.unlink(missing_ok=True)
            logs.append(f"FAIL {p.name}: {str(e)[:800]}")
    return "\n".join(logs[:220]) + ("\n... log truncated" if len(logs) > 220 else "")

def run_convert(book_file, lang, pause_sec, do_numbers, protect_codes, protected_as_digits, split_blocks, normalize_audio, target_lufs, true_peak, lra, ref_audio_file=None, saved_voice=None):
    if book_file is None:
        return "Please upload a file first.", None

    for item in pathlib.Path(OUT_DIR).rglob("*"):
        if item.is_file():
            item.unlink()

    placeholder = os.path.join(BOOK_DIR, "input_here.txt")
    if os.path.isfile(placeholder):
        os.remove(placeholder)

    dst = os.path.join(BOOK_DIR, os.path.basename(book_file))
    shutil.copy(book_file, dst)

    input_for_converter = dst
    if dst.lower().endswith(".txt"):
        input_for_converter = preprocess_txt(
            dst, lang, pause_sec, do_numbers, protect_codes, protected_as_digits, split_blocks
        )

    # Build command arguments
    cmd_args = ["python", "audiobook_converter.py"]
    
    # Check if using saved voice
    if saved_voice:
        voice_path = get_voice_path(saved_voice)
        if voice_path:
            ref_audio_file = voice_path

    # Add voice clone args if reference audio is provided
    if ref_audio_file:
        cmd_args.append("--voice-clone")
        cmd_args.append("--voice-sample")
        cmd_args.append(ref_audio_file)
    
    p = subprocess.Popen(
        cmd_args,
        cwd=REPO_DIR,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True
    )
    p.stdin.write(input_for_converter + "\n")
    p.stdin.flush()

    logs = []
    for line in p.stdout:
        logs.append(line)
    rc = p.wait()
    log_text = "".join(logs)
    if rc != 0:
        return f"Conversion failed (exit={rc}).\n\n{log_text[-9000:]}", None

    norm_text = ""
    if normalize_audio:
        norm_text += "\n\n=== Loudness normalization ===\n"
        norm_text += normalize_volume_folder_2pass(OUT_DIR, float(target_lufs), float(true_peak), float(lra))

    zip_path = os.path.join(REPO_DIR, "audiobooks_output.zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for pth in pathlib.Path(OUT_DIR).rglob("*"):
            if pth.is_file():
                z.write(str(pth), str(pth.relative_to(REPO_DIR)))

    return f"Done.\n\n{log_text[-9000:]}{norm_text}", zip_path

with gr.Blocks(title="Qwen3 Audiobook Studio") as demo:
    gr.Markdown("# Qwen3 Audiobook Studio")

    # --- Saved Voices Tab ---
    with gr.Tab("Saved Voices"):
        gr.Markdown("### Manage Your Voice Profiles")
        with gr.Row():
            with gr.Column():
                sv_upload = gr.Audio(label="Upload New Audio Sample", type="filepath")
                sv_name = gr.Textbox(label="Voice Name (e.g., 'MyNarration')")
                sv_save_btn = gr.Button("Save Voice Profile", variant="primary")
                sv_result = gr.Textbox(label="Status")
            
            with gr.Column():
                sv_list_box = gr.Dropdown(choices=list_saved_voices(), label="Existing Voices", interactive=True)
                sv_delete_btn = gr.Button("Delete Selected Voice", variant="stop")
        
        # Actions
        sv_save_btn.click(
            save_new_voice, 
            inputs=[sv_upload, sv_name], 
            outputs=[sv_result, sv_list_box]
        )
        
        sv_delete_btn.click(
            delete_saved_voice,
            inputs=[sv_list_box],
            outputs=[sv_result, sv_list_box]
        )

    with gr.Tab("Voice Cloning"):
        clone_text = gr.Textbox(label="Text", lines=4)
        with gr.Row():
            clone_audio = gr.Audio(label="Reference audio (upload)", type="filepath")
            clone_saved_voice = gr.Dropdown(label="OR Select a Saved Voice", choices=list_saved_voices(), value=None)
        
        # Gradio does not sync components across tabs automatically, so the
        # save/delete buttons of the "Saved Voices" tab are also wired to
        # refresh this dropdown.
        sv_save_btn.click(refresh_voices_dropdown, outputs=clone_saved_voice)
        sv_delete_btn.click(refresh_voices_dropdown, outputs=clone_saved_voice)

        clone_transcript = gr.Textbox(label="Transcript (optional)", lines=3)
        clone_fast_mode = gr.Checkbox(label="Fast mode", value=True)
        clone_btn = gr.Button("Generate", variant="primary")
        clone_output = gr.Audio(label="Output")
        
        clone_btn.click(
            voice_clone, 
            inputs=[clone_text, clone_audio, clone_saved_voice, clone_transcript, clone_fast_mode], 
            outputs=clone_output
        )

    with gr.Tab("Custom Voice"):
        custom_text = gr.Textbox(label="Text", lines=4)
        custom_voice_name = gr.Dropdown(choices=CUSTOM_VOICES, value="serena", label="Voice")
        custom_instruction = gr.Textbox(label="Style instruction (optional)", lines=2)
        custom_btn = gr.Button("Generate", variant="primary")
        custom_output = gr.Audio(label="Output")
        custom_btn.click(custom_voice, inputs=[custom_text, custom_voice_name, custom_instruction], outputs=custom_output)

    with gr.Tab("Voice Design"):
        design_text = gr.Textbox(label="Text", lines=4)
        design_description = gr.Textbox(label="Voice description", lines=4)
        design_btn = gr.Button("Generate", variant="primary")
        design_output = gr.Audio(label="Output")
        design_btn.click(voice_design, inputs=[design_text, design_description], outputs=design_output)

    with gr.Tab("Audiobook Conversion"):
        book = gr.File(label="Book file (PDF/EPUB/DOCX/TXT)", type="filepath")
        lang = gr.Dropdown(["auto", "pt-BR", "pt-PT", "en"], value="auto", label="Language for numbers")
        
        with gr.Row():
            # Reference Audio input
            ref_audio_input = gr.Audio(label="Reference Audio (Upload)", type="filepath")
            # Saved Voice Select
            ref_saved_voice = gr.Dropdown(label="OR Select Saved Voice", choices=list_saved_voices(), value=None)
            
            sv_save_btn.click(refresh_voices_dropdown, outputs=ref_saved_voice)
            sv_delete_btn.click(refresh_voices_dropdown, outputs=ref_saved_voice)

        with gr.Accordion("TXT preprocessing", open=True):
            do_numbers = gr.Checkbox(value=True, label="Convert numbers to words (TXT only)")
            protect_codes = gr.Checkbox(value=True, label="Protect dates/ISBN/codes (TXT only)")
            protected_as_digits = gr.Checkbox(value=True, label="Read protected patterns as digits")
            pause_sec = gr.Slider(0, 3, value=0.0, step=0.5, label="Insert pause between paragraphs (seconds)")
            split_blocks = gr.Checkbox(value=False, label="Split TXT into blocks")
        with gr.Accordion("Audio post-processing", open=True):
            normalize_audio = gr.Checkbox(value=True, label="Normalize loudness (2-pass)")
            target_lufs = gr.Slider(-24, -12, value=-16, step=1, label="Target LUFS")
            true_peak = gr.Slider(-6, 0, value=-1.5, step=0.5, label="True peak (dBTP)")
            lra = gr.Slider(1, 20, value=11, step=1, label="LRA")
        convert_btn = gr.Button("Convert", variant="primary")
        out_log = gr.Textbox(label="Logs", lines=12)
        out_zip = gr.File(label="Download ZIP", elem_id="auto_zip_file")
        convert_evt = convert_btn.click(
            run_convert,
            # Gradio passes inputs positionally, so this order must match run_convert's parameters
            inputs=[
                book, lang, pause_sec, do_numbers, protect_codes, protected_as_digits,
                split_blocks, normalize_audio, target_lufs, true_peak, lra,
                ref_audio_input, ref_saved_voice,
            ],
            outputs=[out_log, out_zip]
        )

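        # After the conversion finishes, auto-click the ZIP download link in the browser.
        # fn=None means this step runs only the JavaScript, with no Python callback.
        convert_evt.then(
            fn=None,
            js="""() => {
                setTimeout(() => {
                    const link = document.querySelector('#auto_zip_file a[href]');
                    if (link) { link.click(); }
                }, 700);
            }"""
        )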

        # --- Compatibility API for audiobook_converter.py ---
        # Hidden components expose /generate_custom_voice, /transcribe_audio and
        # /generate_voice_clone as named API endpoints, so the converter code needs no changes
        with gr.Row(visible=False):
            # API: generate_custom_voice
            api_text = gr.Textbox(value="", visible=False)
            api_language = gr.Textbox(value="Auto", visible=False)
            api_speaker = gr.Textbox(value="ryan", visible=False)
            api_instruct = gr.Textbox(value="", visible=False)
            api_model_size = gr.Textbox(value="auto", visible=False)
            api_seed = gr.Number(value=0, visible=False)
            api_audio = gr.Audio(type="filepath", visible=False)
            api_sr = gr.Number(visible=False)
            api_btn = gr.Button(visible=False)
            api_btn.click(
                fn=generate_custom_voice,
                inputs=[api_text, api_language, api_speaker, api_instruct, api_model_size, api_seed],
                outputs=[api_audio, api_sr],
                api_name="generate_custom_voice"
            )
            
            # API: transcribe_audio
            api_trans_audio = gr.Audio(type="filepath", visible=False)
            api_trans_out = gr.Textbox(visible=False)
            api_trans_btn = gr.Button(visible=False)
            api_trans_btn.click(
                fn=transcribe_audio_api,
                inputs=[api_trans_audio],
                outputs=[api_trans_out],
                api_name="transcribe_audio"
            )
            
            # API: generate_voice_clone
            api_vc_ref_audio = gr.Audio(type="filepath", visible=False)
            api_vc_ref_text = gr.Textbox(visible=False)
            api_vc_target_text = gr.Textbox(visible=False)
            api_vc_lang = gr.Textbox(visible=False)
            api_vc_xvec = gr.Checkbox(visible=False)
            api_vc_model = gr.Textbox(visible=False)
            api_vc_chars = gr.Number(visible=False)
            api_vc_gap = gr.Number(visible=False)
            api_vc_seed = gr.Number(visible=False)
            
            api_vc_out = gr.Audio(visible=False)
            api_vc_sr = gr.Number(visible=False)
            
            api_vc_btn = gr.Button(visible=False)
            api_vc_btn.click(
                fn=generate_voice_clone_api_for_converter,
                inputs=[api_vc_ref_audio, api_vc_ref_text, api_vc_target_text, api_vc_lang, api_vc_xvec, api_vc_model, api_vc_chars, api_vc_gap, api_vc_seed],
                outputs=[api_vc_out, api_vc_sr],
                api_name="generate_voice_clone"
            )

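# Bound the request queue, then serve on port 7860 (the port used by the converter's local API URL).
# share=True additionally creates a temporary public gradio.live link.
demo.queue(max_size=32)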
demo.launch(server_port=7860, share=True, debug=False)
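
The hidden compatibility endpoints above can also be called from outside the UI. The following is a minimal sketch, not the converter's actual code: `custom_voice_args` and `call_custom_voice` are hypothetical helper names, and the defaults simply mirror the initial values of the hidden `api_*` components. It assumes the client object behaves like `gradio_client.Client`, whose `predict()` takes the inputs positionally plus an `api_name`.

```python
from typing import Any, Tuple

# Local Gradio API address stated in the notebook notes
LOCAL_URL = "http://127.0.0.1:7860"


def custom_voice_args(
    text: str,
    language: str = "Auto",
    speaker: str = "ryan",
    instruct: str = "",
    model_size: str = "auto",
    seed: int = 0,
) -> Tuple[Any, ...]:
    """Return positional arguments in the same order as the hidden API inputs."""
    return (text, language, speaker, instruct, model_size, seed)


def call_custom_voice(client: Any, text: str, **overrides: Any) -> Any:
    """Call /generate_custom_voice via any object exposing a gradio_client-style predict()."""
    args = custom_voice_args(text, **overrides)
    return client.predict(*args, api_name="/generate_custom_voice")


# Usage (needs the app running and gradio_client installed):
#   from gradio_client import Client
#   audio_path, sample_rate = call_custom_voice(Client(LOCAL_URL), "Hello")
```

Passing the client in as a parameter keeps the helper testable without a running server. The endpoint has two outputs (`api_audio`, `api_sr`), so the result unpacks into an audio file path and a sample rate.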

2026-02-22 14:35:41.587118531 [W:onnxruntime:Default, device_discovery.cc:211 DiscoverDevicesForPlatform] GPU device discovery failed: device_discovery.cc:91 ReadFileContents Failed to open file: "/sys/class/drm/card0/device/vendor"


* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://4dc8196607cc3c062b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




Device set to use cuda:0
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.12/site-packages/gradio/queueing.py", line 766, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.12/site-packages/gradio/route_utils.py", line 355, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.12/site-packages/gradio/blocks.py", line 2157, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.12/site-packages/gradio/blocks.py", line 1634, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/zeus/miniconda3/envs/cl