# Qwen3-TTS Audiobook Converter

**Main Use Case:** Upload an EPUB + voice sample → Get a voice-cloned audiobook

**Supported Environments:**
- Google Colab (with GPU - recommended)
- VS Code with Jupyter extension
- Standard Jupyter notebooks

---

## Important: Enable GPU Runtime (Colab)

If using Google Colab:
1. Go to **Runtime** > **Change runtime type**
2. Select **T4 GPU** (or A100 if available)
3. Click **Save**

---

## 1. Setup & Installation

In [None]:
# Detect environment
import sys

try:
    import google.colab
    IN_COLAB = True
    print("Environment: Google Colab")
except ImportError:
    IN_COLAB = False
    print("Environment: Local Jupyter (VS Code or standalone)")

# Install dependencies
print("\nInstalling dependencies...")

if IN_COLAB:
    !pip install -q qwen-tts>=0.0.5 openai-whisper soundfile ebooklib PyPDF2 pydub beautifulsoup4
else:
    %pip install -q qwen-tts>=0.0.5 openai-whisper soundfile ebooklib PyPDF2 pydub beautifulsoup4 ipywidgets

print("Installation complete!")

In [None]:
# Core imports
import os
import posixpath
import re
import subprocess
import tempfile
import time
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import dataclass
from html import unescape
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import unquote

import numpy as np
import torch
import soundfile as sf
import whisper
from qwen_tts import Qwen3TTSModel
from tqdm.auto import tqdm
from IPython.display import Audio, display, HTML, clear_output

# Book processing
import PyPDF2
from pydub import AudioSegment

try:
    from bs4 import BeautifulSoup
    BS4_AVAILABLE = True
except ImportError:
    BS4_AVAILABLE = False

# Environment-specific imports
if IN_COLAB:
    from google.colab import files
    WIDGETS_AVAILABLE = False
else:
    try:
        import ipywidgets as widgets
        from IPython.display import display
        WIDGETS_AVAILABLE = True
    except ImportError:
        WIDGETS_AVAILABLE = False

print("All imports successful!")
if not IN_COLAB:
    print(f"File picker widgets: {'Available' if WIDGETS_AVAILABLE else 'Not available (using text input)'}")

In [None]:
# Device detection
def get_device_and_dtype():
    """Choose the compute device and the tensor dtype to load models with.

    Devices are tried in the order CUDA, Apple MPS, CPU, and a one-line
    description of the winner is printed.

    Returns:
        A ``(device, dtype)`` tuple: ``("cuda", torch.bfloat16)`` when a CUDA
        GPU is available, otherwise ``("mps", torch.float32)`` or
        ``("cpu", torch.float32)``.
    """
    if torch.cuda.is_available():
        name = torch.cuda.get_device_name(0)
        memory_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f"GPU: {name} ({memory_gb:.1f} GB)")
        return "cuda", torch.bfloat16

    # Older torch builds have no `mps` backend attribute at all.
    if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        print("Device: Apple Silicon (MPS)")
        return "mps", torch.float32

    print("Device: CPU (this will be slow)")
    return "cpu", torch.float32


DEVICE, DTYPE = get_device_and_dtype()
print(f"Using: {DEVICE} with {DTYPE}")

## 2. Helper Functions

In [None]:
# File handling with ipywidgets support for local Jupyter

class FileUploader:
    """Cross-platform file uploader that works in Colab and local Jupyter.

    Three back-ends are used depending on the environment: Colab's
    ``files.upload()``, an ipywidgets ``FileUpload`` control, or a plain
    ``input()`` prompt asking for a path. Each returns the path of the chosen
    file, or ``None`` when nothing was selected.
    """

    def __init__(self):
        self.uploaded_file = None
        # Set by the widget callbacks; initialised here so it always exists.
        self._upload_done = False

    def upload(self, prompt: str, accept: str = "", optional: bool = False) -> Optional[str]:
        """Ask the user for a file and return its path.

        Args:
            prompt: Text printed before the upload control.
            accept: Comma-separated extensions for the widget file filter
                (only used by the ipywidgets back-end).
            optional: When true, a hint is printed and the widget back-end
                shows a "Skip" button.

        Returns:
            Path of the selected file, or ``None`` if none was provided.
        """
        print(prompt)
        if optional:
            print("(Optional - press Enter or Cancel to skip)")

        if IN_COLAB:
            return self._colab_upload(optional)
        elif WIDGETS_AVAILABLE:
            return self._widget_upload(accept, optional)
        else:
            return self._text_upload(optional)

    def _colab_upload(self, optional: bool) -> Optional[str]:
        """Upload using Colab's built-in uploader.

        Returns the first uploaded file name, or ``None`` when nothing was
        uploaded. Errors are swallowed only for optional uploads.
        """
        try:
            uploaded = files.upload()
            if uploaded:
                return list(uploaded.keys())[0]
        except Exception:
            if optional:
                return None
            raise
        return None

    @staticmethod
    def _extract_upload(value):
        """Return ``(filename, content_bytes)`` for the first uploaded file.

        Accepts both shapes of ``FileUpload.value``: a dict keyed by file name
        (ipywidgets 7) and a sequence of dicts carrying ``name`` and
        ``content`` (ipywidgets 8). Returns ``None`` when the value is empty.
        Any directory part of the name is dropped so the file is always
        written into the current directory.
        """
        if not value:
            return None
        if isinstance(value, dict):
            raw_name, info = next(iter(value.items()))
        else:
            info = value[0]
            raw_name = info['name']
        filename = os.path.basename(str(raw_name).replace('\\', '/'))
        if not filename:
            return None
        # ipywidgets 8 hands the content over as a memoryview.
        return filename, bytes(info['content'])

    def _widget_upload(self, accept: str, optional: bool) -> Optional[str]:
        """Upload using ipywidgets FileUpload.

        Shows the upload control with a confirm button (and a skip button for
        optional uploads), saves the confirmed file into the current
        directory and returns its name.
        """
        uploader = widgets.FileUpload(accept=accept, multiple=False)
        button = widgets.Button(description="Confirm Upload", button_style='success')
        skip_button = widgets.Button(description="Skip", button_style='warning') if optional else None
        output = widgets.Output()

        self.uploaded_file = None
        self._upload_done = False

        def on_confirm(b):
            with output:
                picked = self._extract_upload(uploader.value)
                if picked is not None:
                    filename, content = picked
                    # Save to local file
                    with open(filename, 'wb') as f:
                        f.write(content)
                    self.uploaded_file = filename
                    print(f"Saved: {filename}")
                else:
                    print("No file selected")
            self._upload_done = True

        def on_skip(b):
            with output:
                print("Skipped")
            self._upload_done = True

        button.on_click(on_confirm)
        if skip_button:
            skip_button.on_click(on_skip)
            display(widgets.HBox([uploader, button, skip_button]), output)
        else:
            display(widgets.HBox([uploader, button]), output)

        # Wait for user action
        # NOTE(review): this loop keeps the cell busy; if the front-end only
        # delivers widget callbacks between cell executions it never exits -
        # confirm in the target environment.
        while not self._upload_done:
            time.sleep(0.1)

        return self.uploaded_file

    def _text_upload(self, optional: bool) -> Optional[str]:
        """Upload using text input (fallback).

        Reads a path from ``input()``, strips surrounding quotes, and returns
        it if the file exists; otherwise returns ``None``.
        """
        print("Enter file path (drag & drop or type):")
        path = input().strip().strip("'\"")
        if not path:
            return None
        if os.path.exists(path):
            return path
        print(f"File not found: {path}")
        return None


# Global uploader instance
uploader = FileUploader()


def read_text_file(file_path: str) -> str:
    """Read a text file, guessing its encoding, and return the stripped text.

    UTF-16 is attempted only when the file starts with a UTF-16 byte-order
    mark; without that check an even-length single-byte file decodes
    "successfully" as UTF-16 garbage. UTF-8 is read as ``utf-8-sig`` so a BOM
    is dropped, and ``cp1252`` is tried before ``latin-1`` because ``latin-1``
    accepts every byte sequence and would otherwise shadow it.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded contents with leading/trailing whitespace removed.

    Raises:
        ValueError: If no candidate encoding can decode the file.
        OSError: If the file cannot be opened.
    """
    with open(file_path, 'rb') as f:
        head = f.read(2)

    encodings = ['utf-8-sig', 'cp1252', 'latin-1']
    if head in (b'\xff\xfe', b'\xfe\xff'):
        encodings.insert(0, 'utf-16')

    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read().strip()
        except UnicodeDecodeError:
            continue
    raise ValueError(f"Could not decode file: {file_path}")


def download_file(filepath: str):
    """Hand a finished file to the user.

    In Colab this triggers a browser download; elsewhere the file is already
    on the local disk, so only its absolute path is printed.
    """
    if not IN_COLAB:
        print(f"File saved to: {os.path.abspath(filepath)}")
        return
    files.download(filepath)


def play_audio(audio_path: str):
    """Render an inline audio player for the file at ``audio_path``."""
    player = Audio(audio_path)
    display(player)


# Create output directory
# The path is relative, so it resolves against the notebook's working directory.
OUTPUT_DIR = Path("audiobooks")
OUTPUT_DIR.mkdir(exist_ok=True)  # no error when the directory already exists
print(f"Output directory: {OUTPUT_DIR.absolute()}")

## 3. Load Models

Downloads ~6GB on first run (Qwen3-TTS) plus the Whisper `base` checkpoint (74M parameters, roughly 140MB)

In [None]:
# Load the speech-recognition model (the "base" Whisper checkpoint).
# Presumably handed to AudiobookConverter later in the notebook, which uses it
# to transcribe the reference clip when no transcript is supplied.
print("Loading Whisper model...")
whisper_model = whisper.load_model("base")
print("Whisper loaded!\n")

print("Loading Qwen3-TTS model...")
print("(First run downloads ~6GB)\n")

# DEVICE and DTYPE come from get_device_and_dtype() in the device-detection cell.
tts_model = Qwen3TTSModel.from_pretrained(
    "Qwen/Qwen3-TTS-12Hz-1.7B-Base",
    device_map=DEVICE,
    torch_dtype=DTYPE
)

print("\n" + "="*50)
print("MODELS READY")
print("="*50)

## 4. Book Processing Functions

In [None]:
# ============================================================
# EPUB PARSER - Proper reading order and chapter detection
# ============================================================

@dataclass
class Chapter:
    """Represents a chapter with its content and metadata.

    Instances are created by ``EPUBParser.get_chapters``, one per spine
    document that still has text after cleaning.
    """
    id: str  # manifest/spine item id of the source document
    title: str  # NCX label for the document, or the item id when no label exists
    file_path: str  # path of the source document inside the EPUB archive
    content: str  # plain text after clean_html() and clean_for_tts()


class EPUBParser:
    """Parse EPUB files using OPF spine for reading order and NCX for chapter titles.

    Typical use is ``EPUBParser(path).parse()`` followed by
    ``get_chapters(...)``. After ``parse()`` the instance exposes:

    * ``metadata`` - ``title``, ``author`` and, when declared, ``cover_id``
    * ``manifest`` - item id -> ``{'href', 'media_type'}``
    * ``spine`` - ids of linear (X)HTML documents in reading order
    * ``ncx_chapters`` - NCX ``src`` (without fragment) -> title
    * ``cover_path`` - path of the extracted cover image, or ``None``
    """

    # XML Namespaces
    NS = {
        'container': 'urn:oasis:names:tc:opendocument:xmlns:container',
        'opf': 'http://www.idpf.org/2007/opf',
        'dc': 'http://purl.org/dc/elements/1.1/',
        'ncx': 'http://www.daisy.org/z3986/2005/ncx/'
    }

    def __init__(self, epub_path: str):
        self.epub_path = epub_path
        self.opf_path = None
        self.opf_dir = None
        self.metadata = {}
        self.manifest = {}
        self.spine = []
        self.ncx_chapters = {}
        self.cover_path = None

    def parse(self) -> 'EPUBParser':
        """Parse the EPUB file and return ``self`` for chaining.

        Raises:
            ValueError: If the container or package document lacks a required
                element (rootfile, manifest or spine).
            KeyError: If ``META-INF/container.xml`` or the OPF file is missing
                from the archive.
        """
        with zipfile.ZipFile(self.epub_path, 'r') as z:
            self._find_opf(z)
            self._parse_opf(z)
            self._parse_ncx(z)
            self._find_cover(z)
        return self

    def _zip_path(self, href: str) -> str:
        """Turn an OPF-relative href into the matching ZIP member name.

        The fragment is dropped, percent-escapes are decoded (hrefs are URLs,
        member names are not) and ``.``/``..`` segments are collapsed. ZIP
        names always use forward slashes, hence ``posixpath``.
        """
        decoded = unquote((href or '').split('#')[0])
        return posixpath.normpath(posixpath.join(self.opf_dir or '', decoded))

    def _find_opf(self, z: zipfile.ZipFile):
        """Find OPF file location from container.xml."""
        # Bytes are passed straight to the parser so the XML declaration's
        # encoding is honoured.
        root = ET.fromstring(z.read('META-INF/container.xml'))
        rootfile = root.find('.//container:rootfile', self.NS)
        if rootfile is None or not rootfile.get('full-path'):
            raise ValueError("Invalid EPUB: container.xml has no rootfile path")
        self.opf_path = rootfile.get('full-path')
        # '' (not '.') for an OPF stored at the archive root, so member paths
        # are not prefixed with './', which never matches a ZIP entry.
        self.opf_dir = posixpath.dirname(self.opf_path)

    def _parse_opf(self, z: zipfile.ZipFile):
        """Parse OPF for metadata, manifest, and spine."""
        root = ET.fromstring(z.read(self.opf_path))

        # Metadata
        metadata = root.find('opf:metadata', self.NS)
        if metadata is not None:
            title = metadata.find('dc:title', self.NS)
            creator = metadata.find('dc:creator', self.NS)
            # An element can be present but empty, in which case .text is None.
            self.metadata['title'] = (title.text or 'Unknown') if title is not None else 'Unknown'
            self.metadata['author'] = (creator.text or 'Unknown') if creator is not None else 'Unknown'

            # Cover reference
            for meta in metadata.findall('opf:meta', self.NS):
                if meta.get('name') == 'cover':
                    self.metadata['cover_id'] = meta.get('content')

        # Manifest
        manifest = root.find('opf:manifest', self.NS)
        if manifest is None:
            raise ValueError("Invalid EPUB: package document has no manifest")
        for item in manifest.findall('opf:item', self.NS):
            item_id = item.get('id')
            self.manifest[item_id] = {
                'href': item.get('href'),
                'media_type': item.get('media-type')
            }
            # EPUB 3 marks the cover on the manifest item instead of a <meta>.
            if 'cover-image' in (item.get('properties') or '').split():
                self.metadata.setdefault('cover_id', item_id)

        # Spine (reading order)
        spine = root.find('opf:spine', self.NS)
        if spine is None:
            raise ValueError("Invalid EPUB: package document has no spine")
        for itemref in spine.findall('opf:itemref', self.NS):
            idref = itemref.get('idref')
            linear = itemref.get('linear', 'yes')
            if linear == 'yes' and idref in self.manifest:
                media_type = self.manifest[idref]['media_type']
                if media_type in ('application/xhtml+xml', 'text/html'):
                    self.spine.append(idref)

    def _parse_ncx(self, z: zipfile.ZipFile):
        """Parse NCX for chapter titles (best effort; a missing or broken NCX is skipped)."""
        # Find NCX file
        ncx_id = next(
            (item_id for item_id, item in self.manifest.items()
             if item['media_type'] == 'application/x-dtbncx+xml'),
            None
        )
        if not ncx_id:
            return

        ncx_path = self._zip_path(self.manifest[ncx_id]['href'])

        try:
            root = ET.fromstring(z.read(ncx_path))
        except (KeyError, ET.ParseError) as e:
            print(f"Warning: could not read NCX '{ncx_path}': {e}")
            return

        for navpoint in root.findall('.//ncx:navPoint', self.NS):
            label = navpoint.find('ncx:navLabel/ncx:text', self.NS)
            content = navpoint.find('ncx:content', self.NS)
            if label is None or content is None:
                continue

            src = content.get('src')
            # Remove anchor if present
            file_ref = src.split('#')[0] if src else None
            title = (label.text or '').strip()

            if file_ref and title:
                # The first navPoint for a file wins: nested navPoints point
                # into the same file with anchors and would otherwise replace
                # the chapter title with the last subsection's title.
                self.ncx_chapters.setdefault(file_ref, title)

    def _find_cover(self, z: zipfile.ZipFile):
        """Find and extract cover image (best effort)."""
        cover_id = self.metadata.get('cover_id')
        if not cover_id or cover_id not in self.manifest:
            return

        cover_path = self._zip_path(self.manifest[cover_id]['href'])
        try:
            cover_data = z.read(cover_path)
        except KeyError:
            return

        # Extract cover to temp file (system temp dir rather than a hard-coded
        # /tmp, which does not exist on every platform).
        ext = posixpath.splitext(cover_path)[1]
        temp_cover = os.path.join(tempfile.gettempdir(), f"cover{ext}")
        try:
            with open(temp_cover, 'wb') as f:
                f.write(cover_data)
        except OSError as e:
            print(f"Warning: could not extract cover: {e}")
            return
        self.cover_path = temp_cover

    def get_chapters(self, include_ids: List[str] = None, exclude_ids: List[str] = None) -> List['Chapter']:
        """Get chapters in reading order with optional filtering.

        Args:
            include_ids: When non-empty, only spine items with these ids are kept.
            exclude_ids: Spine items with these ids are dropped.

        Returns:
            One ``Chapter`` per remaining spine document that still contains
            text after cleaning. Documents that cannot be read from the
            archive are skipped with a warning.
        """
        chapters = []

        with zipfile.ZipFile(self.epub_path, 'r') as z:
            for item_id in self.spine:
                # Apply filters
                if include_ids and item_id not in include_ids:
                    continue
                if exclude_ids and item_id in exclude_ids:
                    continue

                href = self.manifest[item_id]['href']
                file_path = self._zip_path(href)

                # Get title from NCX or use item_id
                title = self.ncx_chapters.get(href, item_id)

                # Extract content
                try:
                    raw = z.read(file_path)
                except (KeyError, zipfile.BadZipFile) as e:
                    print(f"Warning: skipping '{file_path}': {e}")
                    continue

                clean_content = clean_html(raw.decode('utf-8', errors='ignore'))
                clean_content = clean_for_tts(clean_content)

                if clean_content.strip():
                    chapters.append(Chapter(
                        id=item_id,
                        title=title,
                        file_path=file_path,
                        content=clean_content
                    ))

        return chapters


# ============================================================
# TEXT CLEANING FOR TTS
# ============================================================

def clean_html(html_content: str) -> str:
    """Strip markup from an HTML document and return single-spaced plain text.

    BeautifulSoup is used when it is installed; if it is missing or raises,
    a regex-based fallback removes ``<style>``/``<script>`` blocks and tags,
    decodes entities and collapses whitespace. Empty input yields ``""``.
    """
    if not html_content:
        return ""

    if BS4_AVAILABLE:
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            for node in soup(["script", "style"]):
                node.decompose()
            pieces = []
            for raw_line in soup.get_text().splitlines():
                # Double spaces are treated as phrase separators within a line.
                for phrase in raw_line.strip().split("  "):
                    phrase = phrase.strip()
                    if phrase:
                        pieces.append(phrase)
            return ' '.join(pieces)
        except Exception:
            pass

    # Fallback
    dotall_ci = re.DOTALL | re.IGNORECASE
    stripped = re.sub(r'<style[^>]*>.*?</style>', '', html_content, flags=dotall_ci)
    stripped = re.sub(r'<script[^>]*>.*?</script>', '', stripped, flags=dotall_ci)
    stripped = re.sub(r'<[^>]+>', ' ', stripped)
    stripped = unescape(stripped)
    return re.sub(r'\s+', ' ', stripped).strip()


def fix_spaced_capitals(text: str) -> str:
    """Collapse letter-spaced capitals into one capitalised word.

    A match is a run of at least two single capital letters, each followed by
    whitespace, that ends in one or more capitals - for example
    ``'C H A P T E R'`` or ``'T H E END'``. All whitespace inside the match is
    removed and everything after the first letter is lower-cased, giving
    ``'Chapter'`` and ``'Theend'``.

    NOTE(review): small-caps artefacts such as ``'A H OUSE D IVIDED'`` are only
    partly handled - the result is ``'Ahouse D IVIDED'``, because a single
    letter followed by a capital run (``'D IVIDED'``) does not match. Widening
    the pattern would also hit ordinary all-caps text (``'A NEW HOPE'``), so
    the matching rule is left as it was.
    """
    def merge(match):
        # The pattern allows any whitespace between letters (including the
        # newlines found in PDF text), so strip all of it, not only spaces.
        letters = re.sub(r'\s+', '', match.group(0))
        return letters[0] + letters[1:].lower()

    # Pattern: Capital, whitespace, repeated at least twice, then capitals
    pattern = r'\b([A-Z]\s+){2,}[A-Z]+\b'
    return re.sub(pattern, merge, text)


def remove_footnote_markers(text: str) -> str:
    """Replace footnote symbols (``*``, daggers, section signs, superscript
    digits, ...) with a space and collapse the resulting whitespace.

    Every run of marker characters, together with the whitespace around it,
    becomes a single space. The result is not stripped.
    """
    marker_class = r'[*†‡§¶‖¹²³⁴⁵⁶⁷⁸⁹⁰]+'
    marker_run = re.compile(r'\s*' + marker_class + r'\s*')
    without_markers = marker_run.sub(' ', text)
    return re.sub(r'\s+', ' ', without_markers)


# Typographic characters mapped to plain-ASCII equivalents. They are written as
# escapes so the table survives editors and exporters that "straighten" quotes,
# which would silently turn the quote replacements into no-ops.
_TTS_CHAR_MAP = str.maketrans({
    '\u2014': ' - ',   # em dash
    '\u2013': ' - ',   # en dash
    '\u201c': '"',     # left double quotation mark
    '\u201d': '"',     # right double quotation mark
    '\u2018': "'",     # left single quotation mark
    '\u2019': "'",     # right single quotation mark / apostrophe
    '\u2026': '...',   # horizontal ellipsis
})


def normalize_special_chars(text: str) -> str:
    """Normalize special characters for better TTS.

    Em and en dashes become a spaced hyphen, curly quotes become straight
    quotes and the ellipsis character becomes three dots. Other characters
    are returned unchanged.
    """
    return text.translate(_TTS_CHAR_MAP)


def clean_for_tts(text: str, remove_footnotes: bool = True) -> str:
    """Run the text-cleaning passes that prepare text for speech synthesis.

    The passes run in a fixed order: letter-spaced capitals are merged,
    footnote markers are removed (unless ``remove_footnotes`` is false),
    typographic characters are normalised, and finally all whitespace is
    collapsed to single spaces and the ends are trimmed.
    """
    passes = [fix_spaced_capitals]
    if remove_footnotes:
        passes.append(remove_footnote_markers)
    passes.append(normalize_special_chars)

    cleaned = text
    for apply_pass in passes:
        cleaned = apply_pass(cleaned)

    return re.sub(r'\s+', ' ', cleaned).strip()


# ============================================================
# LEGACY FLAT-TEXT EXTRACTION (EPUB/PDF/TXT, no chapter structure)
# ============================================================

def extract_text_from_epub(file_path: str) -> str:
    """Flat EPUB extraction: cleaned text of every HTML member, joined.

    HTML/XHTML members are processed in sorted name order (not spine order),
    skipping anything under ``__MACOSX``. Members that fail to process are
    skipped. Sections are separated by a blank line.
    """
    markup_suffixes = ('.html', '.xhtml', '.htm')
    sections = []

    with zipfile.ZipFile(file_path, 'r') as archive:
        members = []
        for member in archive.namelist():
            lowered = member.lower()
            if lowered.endswith(markup_suffixes) and not lowered.startswith('__macosx'):
                members.append(member)
        members.sort()

        for member in members:
            try:
                markup = archive.read(member).decode('utf-8', errors='ignore')
                plain = clean_for_tts(clean_html(markup))
                if plain.strip():
                    sections.append(plain)
            except Exception:
                continue

    return '\n\n'.join(sections)


def extract_text_from_pdf(file_path: str) -> str:
    """Extract the text of every PDF page and clean it for TTS.

    Pages whose extraction raises, or that yield no text, are skipped. Page
    texts are separated by a blank line before cleaning.
    """
    page_texts = []
    with open(file_path, 'rb') as handle:
        reader = PyPDF2.PdfReader(handle)
        for page in reader.pages:
            try:
                extracted = page.extract_text()
            except Exception:
                continue
            if extracted:
                page_texts.append(extracted)
    joined = "\n\n".join(page_texts)
    return clean_for_tts(joined.strip())


def extract_text(file_path: str) -> str:
    """Extract cleaned text from a book file, dispatching on its extension.

    Supports ``.epub``, ``.pdf`` and ``.txt`` (case-insensitive).

    Raises:
        ValueError: For any other extension.
    """
    suffix = Path(file_path).suffix.lower()
    if suffix == '.epub':
        return extract_text_from_epub(file_path)
    if suffix == '.pdf':
        return extract_text_from_pdf(file_path)
    if suffix == '.txt':
        return clean_for_tts(read_text_file(file_path))
    raise ValueError(f"Unsupported format: {suffix}")


def split_into_chunks(text: str, chunk_size_words: int = 1500) -> List[str]:
    """Group sentences into chunks of at most ``chunk_size_words`` words.

    Whitespace is collapsed first, then the text is split after ``.``, ``!``
    or ``?``. Sentences are packed greedily; a sentence is never split, so a
    single sentence longer than the limit becomes its own oversized chunk.
    Empty input returns an empty list.
    """
    normalized = re.sub(r'\s+', ' ', text).strip()

    chunks = []
    pending = []        # sentences collected for the chunk being built
    pending_words = 0
    for sentence in re.split(r'(?<=[.!?])\s+', normalized):
        n_words = len(sentence.split())
        if pending_words + n_words > chunk_size_words:
            # Adding this sentence would overflow: close the current chunk.
            if pending:
                chunks.append(" ".join(pending))
            pending = [sentence]
            pending_words = n_words
        else:
            pending.append(sentence)
            pending_words += n_words

    if pending:
        chunks.append(" ".join(pending))
    return [chunk for chunk in chunks if chunk.strip()]


print("Book processing functions loaded!")

## 5. Audiobook Converter

In [None]:
# ============================================================
# AUDIOBOOK CONVERTER - Chapter-aware with M4B output
# ============================================================

@dataclass
class ChapterAudio:
    """Audio data for a single chapter."""
    id: str  # copied from Chapter.id
    title: str  # copied from Chapter.title; written as the chapter marker title
    audio: np.ndarray  # concatenated samples: optional announcement, chunks, trailing pause
    sample_rate: int  # rate returned by the TTS call
    start_time: float = 0.0  # seconds from the start of the book; set by convert_epub
    duration: float = 0.0  # seconds, len(audio) / sample_rate


class AudiobookConverter:
    """Convert books to audiobooks with chapter markers and M4B output.

    EPUBs are converted chapter by chapter (``convert_epub``) and written as
    an M4B with chapter markers and cover art, falling back to WAV when ffmpeg
    is missing or fails. Other formats use the flat ``convert`` path, which
    always writes a WAV.

    Recognised ``config`` keys: ``announce_chapters`` (default ``True``),
    ``chapter_pause`` in seconds (``2.5``), ``chunk_size`` in words (``1500``)
    and ``output_format`` (``'m4b'`` or ``'wav'``).
    """

    # Default content to include for EPUBs
    DEFAULT_INCLUDE = [
        'foreword', 'introduction',
        'chapter01', 'chapter02', 'chapter03', 'chapter04', 'chapter05',
        'chapter06', 'chapter07', 'chapter08', 'chapter09', 'chapter10',
        'chapter11', 'chapter12', 'chapter13', 'chapter14', 'chapter15',
    ]

    # Content to exclude
    DEFAULT_EXCLUDE = [
        'frontcoverImage', 'fm01', 'title', 'copyrightPage',
        'contents', 'mapsandillus', 'acknowledgements',
        'timeline', 'genealogical', 'transliteration',
        'maps', 'maps-1', 'maps-2',
        'further', 'notes', 'bibliography', 'index'
    ]

    def __init__(
        self,
        tts_model,
        whisper_model,
        ref_audio: str,
        ref_text: Optional[str] = None,
        config: Optional[Dict] = None
    ):
        """Store the models and settings and resolve the reference transcript.

        Args:
            tts_model: Model exposing ``generate_voice_clone``.
            whisper_model: Model exposing ``transcribe``; used only when
                ``ref_text`` is not given.
            ref_audio: Path of the voice sample to clone.
            ref_text: Exact transcript of ``ref_audio``. When empty or
                ``None`` the sample is transcribed automatically.
            config: Optional settings dict (see the class docstring).
        """
        self.tts_model = tts_model
        self.whisper_model = whisper_model
        self.ref_audio = ref_audio
        self.config = config or {}

        # Defaults
        self.announce_chapters = self.config.get('announce_chapters', True)
        self.chapter_pause = self.config.get('chapter_pause', 2.5)
        self.chunk_size = self.config.get('chunk_size', 1500)
        self.output_format = self.config.get('output_format', 'm4b')

        # Get reference text
        if ref_text:
            self.ref_text = ref_text
            print(f"Using provided transcript: {self.ref_text[:100]}...")
        else:
            print("Transcribing reference audio with Whisper...")
            result = self.whisper_model.transcribe(ref_audio)
            self.ref_text = result["text"].strip()
            print(f"Auto-transcription: {self.ref_text}")

    def _generate_audio(self, text: str) -> Tuple[np.ndarray, int]:
        """Generate audio using TTS model.

        Returns:
            ``(samples, sample_rate)`` for the first waveform the model returns.
        """
        wavs, sr = self.tts_model.generate_voice_clone(
            text=text,
            language="English",
            ref_audio=self.ref_audio,
            ref_text=self.ref_text,
            x_vector_only_mode=False,
            non_streaming_mode=True
        )
        return wavs[0], sr

    def _create_silence(self, duration_sec: float, sample_rate: int) -> np.ndarray:
        """Create a float32 array of zeros lasting ``duration_sec`` seconds."""
        return np.zeros(int(duration_sec * sample_rate), dtype=np.float32)

    def _generate_chapter_audio(self, chapter: 'Chapter', sample_rate: int = None) -> 'ChapterAudio':
        """Generate audio for a single chapter with optional announcement.

        The chapter title is spoken first (when ``announce_chapters`` is on),
        followed by one second of silence, then the content chunk by chunk,
        then ``chapter_pause`` seconds of silence. A failed announcement or
        chunk is reported and skipped.

        Raises:
            RuntimeError: If no audio at all could be generated.
        """
        audio_parts = []
        sr = sample_rate

        # Generate chapter announcement
        if self.announce_chapters:
            print(f"  Generating announcement: {chapter.title[:50]}...")
            try:
                ann_audio, sr = self._generate_audio(chapter.title)
                audio_parts.append(ann_audio)
                # Brief pause after announcement
                audio_parts.append(self._create_silence(1.0, sr))
            except Exception as e:
                print(f"  Warning: Announcement failed: {e}")

        # Generate content in chunks
        chunks = split_into_chunks(chapter.content, self.chunk_size)
        print(f"  Generating {len(chunks)} chunks...")

        for i, chunk in enumerate(tqdm(chunks, desc=f"  {chapter.title[:30]}", leave=False)):
            try:
                audio, sr = self._generate_audio(chunk)
                audio_parts.append(audio)
            except Exception as e:
                print(f"  Warning: Chunk {i+1} failed: {e}")

        # Concatenate
        if not audio_parts:
            raise RuntimeError(f"No audio generated for chapter: {chapter.title}")

        # Add pause after chapter (sr is set here: at least one call succeeded)
        audio_parts.append(self._create_silence(self.chapter_pause, sr))

        full_audio = np.concatenate(audio_parts)

        return ChapterAudio(
            id=chapter.id,
            title=chapter.title,
            audio=full_audio,
            sample_rate=sr,
            duration=len(full_audio) / sr
        )

    def convert_epub(
        self,
        epub_path: str,
        include_ids: List[str] = None,
        exclude_ids: List[str] = None
    ) -> str:
        """Convert EPUB to audiobook with chapters.

        Args:
            epub_path: Path of the EPUB file.
            include_ids: Spine item ids to convert; defaults to
                ``DEFAULT_INCLUDE``. If nothing matches, every item that is
                not excluded is converted.
            exclude_ids: Spine item ids to skip; defaults to ``DEFAULT_EXCLUDE``.

        Returns:
            Path of the written audiobook file.

        Raises:
            RuntimeError: If no chapter produced any audio.
        """
        book_name = Path(epub_path).stem

        # Parse EPUB
        print(f"\nParsing EPUB: {Path(epub_path).name}")
        parser = EPUBParser(epub_path).parse()

        print(f"Title: {parser.metadata.get('title', 'Unknown')}")
        print(f"Author: {parser.metadata.get('author', 'Unknown')}")
        print(f"Cover: {'Found' if parser.cover_path else 'Not found'}")

        # Get chapters with filtering
        include = include_ids or self.DEFAULT_INCLUDE
        exclude = exclude_ids or self.DEFAULT_EXCLUDE

        chapters = parser.get_chapters(include_ids=include, exclude_ids=exclude)

        if not chapters:
            # Fall back to getting all content chapters
            print("No chapters matched filters, using all content...")
            chapters = parser.get_chapters(exclude_ids=exclude)

        print(f"\nChapters to convert: {len(chapters)}")
        for i, ch in enumerate(chapters):
            word_count = len(ch.content.split())
            print(f"  {i+1}. {ch.title} ({word_count:,} words)")

        total_words = sum(len(ch.content.split()) for ch in chapters)
        print(f"\nTotal: {total_words:,} words")

        # Generate audio for each chapter
        print("\n" + "="*50)
        print("GENERATING AUDIO")
        print("="*50)

        chapter_audios = []
        current_time = 0.0

        for i, chapter in enumerate(chapters):
            print(f"\n[{i+1}/{len(chapters)}] {chapter.title}")

            try:
                ch_audio = self._generate_chapter_audio(chapter)
                ch_audio.start_time = current_time
                current_time += ch_audio.duration
                chapter_audios.append(ch_audio)

                print(f"  Duration: {ch_audio.duration/60:.1f} minutes")
            except Exception as e:
                # A failed chapter is reported and left out of the book.
                print(f"  ERROR: {e}")

        if not chapter_audios:
            raise RuntimeError("No audio generated")

        # Save output
        print("\n" + "="*50)
        print("SAVING OUTPUT")
        print("="*50)

        output_path = self._save_audiobook(
            chapter_audios,
            book_name,
            parser.metadata,
            parser.cover_path
        )

        total_duration = sum(ca.duration for ca in chapter_audios)
        print(f"\nTotal duration: {total_duration/60:.1f} minutes")
        print(f"Output: {output_path}")

        return output_path

    @staticmethod
    def _build_ffmpeg_cmd(
        wav_path: str,
        metadata_path: str,
        output_path: str,
        cover_path: Optional[str] = None
    ) -> List[str]:
        """Build the ffmpeg argument list for the M4B encode.

        ffmpeg applies options to the next file named on the command line, so
        every ``-i`` input is listed before any output option; output options
        placed ahead of a later ``-i`` are rejected. Inputs are numbered in
        order: 0 = audio, 1 = metadata, 2 = cover (when given).
        """
        cmd = ["ffmpeg", "-y", "-i", wav_path, "-i", metadata_path]
        if cover_path:
            cmd.extend(["-i", cover_path])

        cmd.extend([
            "-map", "0:a",
            "-map_metadata", "1",
            "-map_chapters", "1",
            "-c:a", "aac",
            "-b:a", "128k",
        ])

        # Add cover if available
        if cover_path:
            cmd.extend([
                "-map", "2:v",
                "-c:v", "mjpeg",
                "-disposition:v:0", "attached_pic"
            ])

        cmd.append(output_path)
        return cmd

    def _save_audiobook(
        self,
        chapter_audios: List['ChapterAudio'],
        book_name: str,
        metadata: Dict,
        cover_path: str = None
    ) -> str:
        """Save audiobook with chapter markers.

        The audio is always written as a temporary WAV first. For
        ``output_format == 'wav'`` that file is simply renamed; otherwise it
        is encoded to M4B with ffmpeg. If ffmpeg is missing or exits with an
        error, the WAV is kept as the result instead.

        Returns:
            Path of the file that was actually written (M4B or WAV).
        """
        # Concatenate all audio
        all_audio = np.concatenate([ca.audio for ca in chapter_audios])
        sample_rate = chapter_audios[0].sample_rate

        # Save as WAV first
        wav_path = str(OUTPUT_DIR / f"{book_name}_temp.wav")
        sf.write(wav_path, all_audio, sample_rate)
        wav_output = str(OUTPUT_DIR / f"{book_name}.wav")

        if self.output_format == 'wav':
            # os.replace overwrites an existing file on every platform.
            os.replace(wav_path, wav_output)
            return wav_output

        # Create M4B with chapters
        output_path = str(OUTPUT_DIR / f"{book_name}.m4b")
        metadata_path = self._create_ffmetadata(chapter_audios, metadata)
        usable_cover = cover_path if cover_path and os.path.exists(cover_path) else None
        cmd = self._build_ffmpeg_cmd(wav_path, metadata_path, output_path, usable_cover)

        print("Creating M4B with chapter markers...")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except FileNotFoundError:
            print("FFmpeg not found - saving as WAV")
            os.replace(wav_path, wav_output)
            return wav_output
        finally:
            # The metadata file is only an ffmpeg input; never leave it behind.
            if os.path.exists(metadata_path):
                os.remove(metadata_path)

        if result.returncode != 0:
            # The actual error is at the end of stderr; the start is the banner.
            print(f"FFmpeg warning: {result.stderr[-500:] if result.stderr else 'Unknown'}")
            if os.path.exists(output_path):
                os.remove(output_path)  # drop the incomplete M4B
            # Fall back to WAV
            os.replace(wav_path, wav_output)
            print(f"Saved as WAV instead: {wav_output}")
            return wav_output

        # Clean up
        os.remove(wav_path)
        print(f"M4B created with {len(chapter_audios)} chapters")
        return output_path

    @staticmethod
    def _escape_ffmetadata(value) -> str:
        """Backslash-escape the characters FFMETADATA treats as special.

        ``=``, ``;``, ``#``, ``\\`` and newline must be escaped in keys and
        values, otherwise a title containing them corrupts the file.
        """
        return re.sub(r'([=;#\\\n])', r'\\\1', str(value))

    def _create_ffmetadata(self, chapter_audios: List['ChapterAudio'], metadata: Dict) -> str:
        """Create FFMETADATA file for chapter markers.

        Writes the book title/author and one ``[CHAPTER]`` section per chapter
        with millisecond start/end times.

        Returns:
            Path of the written metadata file.
        """
        book_title = self._escape_ffmetadata(metadata.get('title', 'Audiobook'))
        author = self._escape_ffmetadata(metadata.get('author', 'Unknown'))

        lines = [";FFMETADATA1"]
        lines.append(f"title={book_title}")
        lines.append(f"artist={author}")
        lines.append(f"album={book_title}")
        lines.append("")

        for ca in chapter_audios:
            start_ms = int(ca.start_time * 1000)
            end_ms = int((ca.start_time + ca.duration) * 1000)

            lines.append("[CHAPTER]")
            lines.append("TIMEBASE=1/1000")
            lines.append(f"START={start_ms}")
            lines.append(f"END={end_ms}")
            lines.append(f"title={self._escape_ffmetadata(ca.title)}")
            lines.append("")

        metadata_path = str(OUTPUT_DIR / "ffmetadata.txt")
        with open(metadata_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))

        return metadata_path

    # Legacy method for non-EPUB files
    def convert(self, book_path: str, chunk_size_words: int = 1500) -> str:
        """Convert PDF/TXT to audiobook (legacy flat conversion).

        EPUB paths are forwarded to ``convert_epub``. Everything else is
        extracted as one text, synthesised chunk by chunk and written as a
        single WAV without chapter markers.

        Args:
            book_path: Path of the book file.
            chunk_size_words: Maximum words per synthesis call. Note that this
                argument, not ``config['chunk_size']``, controls the flat path.

        Returns:
            Path of the written audio file.

        Raises:
            RuntimeError: If every chunk failed to synthesise.
        """
        ext = Path(book_path).suffix.lower()

        if ext == '.epub':
            return self.convert_epub(book_path)

        book_name = Path(book_path).stem
        output_path = str(OUTPUT_DIR / f"{book_name}.wav")

        print(f"\nExtracting text from {Path(book_path).name}...")
        text = extract_text(book_path)
        word_count = len(text.split())
        print(f"Extracted {word_count:,} words")

        chunks = split_into_chunks(text, chunk_size_words)
        print(f"Split into {len(chunks)} chunks\n")

        audio_segments = []
        sample_rate = None

        for i, chunk in enumerate(tqdm(chunks, desc="Generating audio")):
            try:
                audio, sr = self._generate_audio(chunk)
                audio_segments.append(audio)
                sample_rate = sr
            except Exception as e:
                print(f"\nWarning: Chunk {i+1} failed: {e}")

        if not audio_segments:
            raise RuntimeError("No audio generated")

        print(f"\nCombining {len(audio_segments)} segments...")
        combined = np.concatenate(audio_segments)
        sf.write(output_path, combined, sample_rate)

        duration_min = len(combined) / sample_rate / 60
        print(f"\nSaved: {output_path}")
        print(f"Duration: {duration_min:.1f} minutes")

        return output_path


# Status line printed once the definitions above in this cell have executed.
print("AudiobookConverter ready!")

---

# Convert EPUB to Voice-Cloned Audiobook

**Features:**
- Chapter-aware extraction with proper reading order (OPF spine)
- Chapter titles from NCX navigation
- Clean text processing (removes footnotes, fixes formatting)
- M4B output with embedded chapter markers
- Cover art embedding (from EPUB)
- Configurable content inclusion/exclusion

**Upload your files:**
1. **Voice sample** (WAV/MP3/FLAC) - 5-30 seconds of clear speech
2. **Transcript** (optional .txt) - exact words in the voice sample
3. **Book file** (EPUB recommended, also PDF/TXT)

---

In [None]:
# Step 1: Collect the reference voice recording.
print("=" * 50, "STEP 1: VOICE SAMPLE", "=" * 50, sep="\n")

# `accept` lists the same extensions the prompt names.
voice_audio_path = uploader.upload(
    "Upload your voice sample (WAV, MP3, or FLAC):",
    accept=".wav,.mp3,.flac",
)

# A falsy result means nothing was selected; Step 4 checks this value again.
if voice_audio_path:
    print(f"\nVoice sample: {voice_audio_path}", "\nPreview:", sep="\n")
    play_audio(voice_audio_path)

In [None]:
# Step 2: Optionally supply the text spoken in the voice sample.
print(
    "=" * 50,
    "STEP 2: TRANSCRIPT (OPTIONAL)",
    "=" * 50,
    "\nProviding a transcript improves voice cloning quality.",
    "If skipped, Whisper will auto-transcribe.\n",
    sep="\n",
)

# Reset first so a previous run's transcript never leaks into this one.
transcript_text = None
transcript_path = uploader.upload(
    "Upload transcript file (.txt):",
    accept=".txt",
    optional=True,
)

if not transcript_path:
    # transcript_text stays None in this branch.
    print("\nNo transcript provided - will auto-transcribe.")
else:
    transcript_text = read_text_file(transcript_path)
    print(f"\nTranscript loaded: {transcript_text}")

In [None]:
# Step 3: Pick the book to narrate.
print("=" * 50, "STEP 3: BOOK FILE", "=" * 50, sep="\n")

# `accept` lists the same extensions the prompt names.
book_path = uploader.upload(
    "Upload your book (EPUB, PDF, or TXT):",
    accept=".epub,.pdf,.txt",
)

# Echo the selection; the format shown is the upper-cased file extension.
if book_path:
    print(f"\nBook: {book_path}")
    print(f"Format: {Path(book_path).suffix.upper()}")

In [None]:
# Step 4: Configure and Convert!
# The earlier step cells may not have been run in this session. Look their
# results up defensively so that a skipped cell leads to the hint at the
# bottom of this cell instead of a NameError.
voice_audio_path = globals().get('voice_audio_path')
book_path = globals().get('book_path')
transcript_text = globals().get('transcript_text')  # Step 2 is optional

if voice_audio_path and book_path:
    print("="*50)
    print("CONFIGURATION")
    print("="*50)
    
    # ============================================================
    # CONVERSION SETTINGS - Modify these as needed
    # ============================================================
    
    config = {
        # Chapter announcements (TTS reads chapter title before content)
        'announce_chapters': True,
        
        # Pause duration between chapters (seconds)
        'chapter_pause': 2.5,
        
        # Words per TTS chunk (larger = fewer TTS calls, smaller = better for long content)
        'chunk_size': 1500,
        
        # Output format: 'm4b' (with chapters) or 'wav' (lossless)
        # NOTE: the PDF/TXT path (converter.convert) always writes a .wav file.
        'output_format': 'm4b',
    }
    
    # Content to INCLUDE (for EPUBs) - set to None to use defaults
    # Default includes: foreword, introduction, chapter01-15
    include_content = None  # Or specify: ['foreword', 'introduction', 'chapter01', ...]
    
    # Content to EXCLUDE (for EPUBs) - set to None to use defaults
    # Default excludes: TOC, copyright, notes, index, maps, etc.
    exclude_content = None  # Or specify: ['notes', 'index', 'bibliography', ...]
    
    # ============================================================
    
    print(f"Announce chapters: {config['announce_chapters']}")
    print(f"Chapter pause: {config['chapter_pause']}s")
    print(f"Output format: {config['output_format'].upper()}")
    
    print("\n" + "="*50)
    print("CONVERTING TO AUDIOBOOK")
    print("="*50)
    
    converter = AudiobookConverter(
        tts_model=tts_model,
        whisper_model=whisper_model,
        ref_audio=voice_audio_path,
        ref_text=transcript_text,
        config=config
    )
    
    # Use appropriate conversion method
    ext = Path(book_path).suffix.lower()
    if ext == '.epub':
        output_file = converter.convert_epub(
            book_path,
            include_ids=include_content,
            exclude_ids=exclude_content
        )
    else:
        # Forward the configured chunk size explicitly; convert() would
        # otherwise fall back to its own default and ignore the setting above.
        output_file = converter.convert(
            book_path,
            chunk_size_words=config['chunk_size']
        )
    
    print("\n" + "="*50)
    print("DONE!")
    print("="*50)
    print("\nPreview (first 30 seconds):")
    play_audio(output_file)
else:
    print("Please complete Steps 1 and 3 above first.")

In [None]:
# Step 5: Report on the finished audiobook and hand it to the downloader.
# The name check comes first so this cell is safe to run before Step 4.
if not ('output_file' in dir() and output_file and os.path.exists(output_file)):
    print("Run Step 4 first.")
else:
    file_size_mb = os.path.getsize(output_file) / (1024 * 1024)  # bytes -> MiB
    print(f"Output file: {output_file}", f"File size: {file_size_mb:.1f} MB", sep="\n")

    # Player hints are only shown for M4B output.
    if output_file.endswith('.m4b'):
        print(
            "\nM4B files can be played in:",
            "  - Apple Books / iTunes",
            "  - VLC Media Player",
            "  - Most podcast/audiobook apps",
            "  - Supports chapter navigation!",
            sep="\n",
        )

    print(f"\nDownloading: {Path(output_file).name}")
    download_file(output_file)

---

## Cleanup (Optional)

In [None]:
import gc

# Run the garbage collector first: tensors kept alive only by reference
# cycles are released here, and empty_cache() can only give back cached
# blocks that no live tensor occupies any more.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()
# NOTE: objects that are still referenced (for example the loaded models)
# are not freed by this cell.
print("Memory cleared")