# TTS v5 - Local Version (Kokoro v1.0)
- Adapted for running locally with Kokoro TTS v1.0
- Features 54 voices across 8 languages
- Includes sentence tracking and timeline manifest generation
- Released January 2025

## 0) Environment Setup (Optional)

**This step helps you manage Python packages and avoid conflicts with your system installation.**

- If you have **conda** installed, you can create a fresh environment for this notebook
- Or use an existing environment by providing its name
- At the end of the notebook, you can easily clean up and delete the environment to free storage

In [1]:
import subprocess
import sys
import os

# Flag to track if we created an environment in this notebook
environment_created_by_notebook = False
environment_name = None

# Check if conda is installed
try:
    result = subprocess.run(['conda', '--version'], capture_output=True, text=True, check=True)
    conda_available = True
    print(f"✓ Conda detected: {result.stdout.strip()}")
except (subprocess.CalledProcessError, FileNotFoundError):
    conda_available = False
    print("✗ Conda not found - skipping environment management")
    print("Packages will be installed in your current Python environment")

if conda_available:
    print("\n" + "="*60)
    print("ENVIRONMENT SETUP OPTIONS")
    print("="*60)
    
    choice = input("\nDo you want to:\n  [1] Create a NEW conda environment (recommended)\n  [2] Use an EXISTING environment\n  [3] Skip and use current environment\n\nEnter choice (1/2/3): ").strip()
    
    if choice == "1":
        # Create new environment
        env_name = input("\nEnter name for new environment (default: kokoro_tts): ").strip()
        if not env_name:
            env_name = "kokoro_tts"
        
        print(f"\n→ Creating conda environment: {env_name}")
        print("  This may take a few minutes...")
        
        try:
            # Create environment with Python 3.10
            subprocess.run(['conda', 'create', '-n', env_name, 'python=3.10', '-y'], 
                          check=True, capture_output=True)
            
            environment_created_by_notebook = True
            environment_name = env_name
            
            print(f"✓ Environment '{env_name}' created successfully!")
            print(f"\n{'='*60}")
            print("IMPORTANT: Restart your Jupyter kernel and select the new environment:")
            print(f"  Kernel → Change Kernel → {env_name}")
            print(f"{'='*60}\n")
            
        except subprocess.CalledProcessError as e:
            print(f"✗ Failed to create environment: {e}")
            print("Continuing with current environment...")
    
    elif choice == "2":
        # Use existing environment
        env_name = input("\nEnter name of existing environment: ").strip()
        if env_name:
            environment_name = env_name
            print(f"\n✓ Using existing environment: {env_name}")
            print(f"\n{'='*60}")
            print("IMPORTANT: Make sure your kernel is using this environment:")
            print(f"  Kernel → Change Kernel → {env_name}")
            print(f"{'='*60}\n")
        else:
            print("✗ No environment name provided - using current environment")
    
    else:
        print("\n✓ Using current environment")

print("\nYou can now proceed with the rest of the notebook.")

✓ Conda detected: conda 25.7.0

ENVIRONMENT SETUP OPTIONS

✓ Using current environment

You can now proceed with the rest of the notebook.


## 1) Install Dependencies

In [2]:
# Core TTS + I/O deps
!pip install "kokoro>=1.0" soundfile misaki[en] pypdf ebooklib pydub

# Advanced PDF extraction
!pip install "unstructured[local-inference]"
!pip install "detectron2@git+https://github.com/facebookresearch/detectron2.git@v0.6#egg=detectron2"

# Note: ffmpeg should be installed on your system for MP3 encoding
# Linux: sudo apt-get install ffmpeg
# macOS: brew install ffmpeg
# Windows: Download from https://ffmpeg.org/

# Silence overly chatty logs
import logging
logging.getLogger("phonemizer").setLevel(logging.ERROR)
logging.getLogger("unstructured").setLevel(logging.ERROR)
logging.getLogger("pypdf").setLevel(logging.CRITICAL)

zsh:1: no matches found: misaki[en]
Collecting detectron2@ git+https://github.com/facebookresearch/detectron2.git@v0.6#egg=detectron2
  Cloning https://github.com/facebookresearch/detectron2.git (to revision v0.6) to /private/var/folders/4t/cg7byznj6zz9d4cdtnn5rvy40000gn/T/pip-install-doef3e7w/detectron2_80b2a62340b9474b8b1199a4d59e7e91
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/detectron2.git /private/var/folders/4t/cg7byznj6zz9d4cdtnn5rvy40000gn/T/pip-install-doef3e7w/detectron2_80b2a62340b9474b8b1199a4d59e7e91
  Running command git checkout -q d1e04565d3bec8719335b88be9e9b961bf3ec464
  Resolved https://github.com/facebookresearch/detectron2.git to commit d1e04565d3bec8719335b88be9e9b961bf3ec464
  Preparing metadata (setup.py) ... [?25ldone


## 2) Configuration and Setup

In [3]:
import os
from pathlib import Path

# --- Output directory setup ---
OUTPUT_DIR = Path(".")  # Use current directory (same as notebook location)
print(f"Output directory: {OUTPUT_DIR.resolve()}")

# --- Device selection ---
# DEVICE_MODE: "auto" (default), "cuda", or "cpu"
DEVICE_MODE = "auto"

import torch
def _pick_device():
    if DEVICE_MODE == "cuda":
        return "cuda"
    if DEVICE_MODE == "cpu":
        return "cpu"
    return "cuda" if torch.cuda.is_available() else "cpu"

DEVICE = _pick_device()
print(f"Using device: {DEVICE}")

Output directory: /Users/simon/Documents/GitHub/ttsweb.github.io
Using device: cpu


## 3) Helper Functions (PDF/EPUB extraction & TTS synthesis)

In [4]:
import numpy as np
import soundfile as sf
import re, io, zipfile, torch
from pathlib import Path
from typing import List, Tuple, Dict, Union
from functools import lru_cache

from pypdf import PdfReader
from ebooklib import epub
from kokoro import KPipeline
from pydub import AudioSegment

# Imports for advanced PDF extraction
from unstructured.partition.auto import partition

# Sentence-ish split; keeps chunks small (avoids 510-phoneme truncation)
SPLIT_PATTERN = r"[.?!]\s+|[\n]{2,}"
SPLIT_PATTERN_CAP = r"([.?!]\s+|[\n]{2,})"


# --- PDF Extraction using unstructured.io ---
def extract_text_from_pdf(file_like: io.BytesIO) -> List[Dict]:
    """Extract text from PDF using unstructured.io with layout analysis."""
    print("Parsing PDF with layout analysis (strategy='hi_res')...")
    try:
        partitioned_elements = partition(file=file_like, strategy="hi_res", content_type="application/pdf", include_page_breaks=True)
        print(f"Unstructured 'hi_res' returned {len(partitioned_elements)} raw elements.")
    except Exception as e:
        print(f"Unstructured 'hi_res' strategy failed: {e}. Falling back to 'fast'.")
        try:
            file_like.seek(0)
            partitioned_elements = partition(file=file_like, strategy="fast", content_type="application/pdf", include_page_breaks=True)
            print(f"Unstructured 'fast' returned {len(partitioned_elements)} raw elements.")
        except Exception as e2:
            print(f"Unstructured 'fast' strategy also failed: {e2}.")
            return [{"text": "Error: Unstructured parsing failed.", "metadata": {"page_number": 1, "points": None}}]

    element_list = []
    current_page = 1
    print("\n--- Processing elements (checking for points) ---")

    for i, el in enumerate(partitioned_elements):
        meta_dict = el.metadata.to_dict()

        page_num_meta = meta_dict.get("page_number")
        if page_num_meta is not None:
             current_page = page_num_meta

        # Extract coordinate points if available
        points = None
        coords_meta = meta_dict.get("coordinates")
        if coords_meta:
            points = coords_meta.get("points")

        location_data = {
            "page_number": current_page,
            "points": points
        }

        element_text = str(el).strip()
        if element_text:
            element_list.append({
                "text": element_text,
                "metadata": location_data
            })

    print("--- Finished processing elements ---")
    print(f"Unstructured: Found {len(element_list)} text elements.")
    if not element_list:
         return [{"text": "Warning: Unstructured found no text elements.", "metadata": {"page_number": 1, "points": None}}]
    return element_list


# --- EPUB Extraction ---
def extract_chapters_from_epub(file_like: io.BytesIO):
    bk = epub.read_epub(file_like)
    chapters = []
    for item in bk.get_items_of_type(epub.ITEM_DOCUMENT):
        if getattr(item, "is_nav", False): continue
        html = item.get_content().decode("utf-8", errors="ignore")
        text = re.sub(r"<(script|style).*?>.*?</\1>", " ", html, flags=re.S|re.I)
        text = re.sub(r"<br\s*/?>", "\n", text, flags=re.I)
        text = re.sub(r"</p>|</div>|</h\d>", "\n\n", text, flags=re.I)
        text = re.sub(r"<[^>]+>", " ", text)
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        if text:
            title = Path(item.file_name).stem
            first = text.splitlines()[0] if text else ""; m = re.match(r"(?i)\s*(chapter|part|book)\b[^\n]{0,80}", first)
            if m: title = first[:60]
            chapters.append((title, text))
    if not chapters:
        blobs = [];
        for item in bk.get_items_of_type(epub.ITEM_DOCUMENT):
             if getattr(item, "is_nav", False): continue
             blobs.append(item.get_content().decode("utf-8", errors="ignore"))
        html = " ".join(blobs)
        text = re.sub(r"<(script|style).*?>.*?</\1>", " ", html, flags=re.S|re.I)
        text = re.sub(r"<br\s*/?>", "\n", text, flags=re.I)
        text = re.sub(r"</p>|</div>|</h\d>", "\n\n", text, flags=re.I)
        text = re.sub(r"<[^>]+>", " ", text)
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        if text: chapters = [("Chapter 1", text)]
    return chapters

def safe_name(s: str) -> str:
    s = re.sub(r"[^\w\-]+", "_", s).strip("_"); return s or "chapter"

# --- Pipeline cache ---
@lru_cache(maxsize=4)
def get_pipeline(lang_code='a', device=DEVICE):
    return KPipeline(lang_code=lang_code, device=device)

def _synthesize_sentence(pipe: KPipeline, sentence: str, voice='af_heart', speed=1.0) -> np.ndarray:
    subchunks = [];
    for _, _, audio in pipe(sentence, voice=voice, speed=speed, split_pattern=None): subchunks.append(audio)
    if not subchunks: return np.zeros((0,), dtype=np.float32)
    return np.concatenate(subchunks, axis=0)

def split_sentences_keep_delim(text: str) -> List[str]:
    parts = re.split(SPLIT_PATTERN_CAP, text); sents = []
    for i in range(0, len(parts), 2):
        chunk = (parts[i] or "").strip(); sep = parts[i+1] if i+1 < len(parts) else ""
        if not chunk: continue
        if sep and not sep.isspace(): chunk = (chunk + " " + sep.strip()).strip()
        sents.append(chunk)
    return sents

# --- Synthesizer ---
def synth_text_to_wav_and_manifest(
    text_or_elements: Union[str, List[Dict]],
    voice='af_heart',
    speed=1.0,
    lang_code='a',
    device=DEVICE) -> Tuple[bytes, Dict]:
    pipe = get_pipeline(lang_code=lang_code, device=device)
    sr = 24000

    if isinstance(text_or_elements, str):
        elements = [{"text": text_or_elements, "metadata": {"page_number": 1, "points": None}}]
    else:
        elements = text_or_elements

    pcm_all = []; timeline = []; t = 0.0; sentence_index = 0
    print(f"Synthesizing {len(elements)} text elements...")

    for element in elements:
        element_text = element.get("text", "")
        element_meta = element.get("metadata", {})

        sentences = split_sentences_keep_delim(element_text)

        for sent in sentences:
            if not sent: continue
            pcm = _synthesize_sentence(pipe, sent, voice=voice, speed=speed)
            dur = pcm.shape[0] / sr
            timeline.append({
                "i": sentence_index,
                "start": round(t, 3),
                "end": round(t + dur, 3),
                "text": sent.strip(),
                "location": element_meta
            })
            pcm_all.append(pcm); t += dur; sentence_index += 1

    pcm_cat = np.concatenate(pcm_all, axis=0) if pcm_all else np.zeros((sr//10,), dtype=np.float32)
    buf = io.BytesIO(); sf.write(buf, pcm_cat, sr, format='WAV'); buf.seek(0)
    manifest = {"audioUrl": "", "sentences": timeline}
    return buf.read(), manifest

def wav_to_mp3_bytes(wav_bytes: bytes, bitrate="128k") -> bytes:
    audio = AudioSegment.from_file(io.BytesIO(wav_bytes), format="wav"); out = io.BytesIO()
    audio.export(out, format="mp3", bitrate=bitrate); out.seek(0); return out.read()

## 4) High-Level Synthesis Wrappers

In [5]:
def synth_string(text: str,
                 voice="af_heart",
                 speed=1.0,
                 out_format="wav",
                 lang_code="a",
                 device=None,
                 basename="kokoro_text",
                 output_dir=None):
    device = device or DEVICE
    output_dir = Path(output_dir) if output_dir else OUTPUT_DIR

    elements = [{
        "text": text,
        "metadata": {"page_number": 1, "source": "string", "coordinates": None}
    }]

    wav_bytes, manifest = synth_text_to_wav_and_manifest(
        elements,
        voice=voice, speed=speed, lang_code=lang_code, device=device
    )

    out_base = output_dir / basename

    if out_format.lower() == "mp3":
        mp3 = wav_to_mp3_bytes(wav_bytes)
        audio_path = str(out_base) + ".mp3"
        with open(audio_path, "wb") as f: f.write(mp3)
    else:
        audio_path = str(out_base) + ".wav"
        with open(audio_path, "wb") as f: f.write(wav_bytes)

    manifest_path = str(out_base) + "_manifest.json"
    manifest["audioUrl"] = Path(audio_path).name
    with open(manifest_path, "w", encoding="utf-8") as f:
        import json; json.dump(manifest, f, ensure_ascii=False, indent=2)

    return audio_path, manifest_path

def synth_pdf(file_path_or_bytes,
              voice="af_heart",
              speed=1.0,
              out_format="wav",
              lang_code="a",
              device=None,
              basename=None,
              output_dir=None):
    device = device or DEVICE
    output_dir = Path(output_dir) if output_dir else OUTPUT_DIR
    
    if isinstance(file_path_or_bytes, (str, Path)):
        with open(file_path_or_bytes, "rb") as fh:
            pdf_bytes = io.BytesIO(fh.read())
        stem = Path(file_path_or_bytes).stem
    else:
        pdf_bytes = file_path_or_bytes
        stem = basename or "document"

    elements = extract_text_from_pdf(pdf_bytes)

    wav_bytes, manifest = synth_text_to_wav_and_manifest(
        elements,
        voice=voice, speed=speed, lang_code=lang_code, device=device
    )

    out_base = output_dir / f"{(basename or stem)}_tts"

    if out_format.lower() == "mp3":
        mp3 = wav_to_mp3_bytes(wav_bytes)
        audio_path = str(out_base) + ".mp3"
        with open(audio_path, "wb") as f: f.write(mp3)
    else:
        audio_path = str(out_base) + ".wav"
        with open(audio_path, "wb") as f: f.write(wav_bytes)

    manifest_path = str(out_base) + "_manifest.json"
    manifest["audioUrl"] = Path(audio_path).name
    with open(manifest_path, "w", encoding="utf-8") as f:
        import json; json.dump(manifest, f, ensure_ascii=False, indent=2)

    return audio_path, manifest_path

def synth_epub(file_path_or_bytes,
               voice="af_heart",
               speed=1.0,
               per_chapter_format="wav",
               lang_code="a",
               device=None,
               zip_name=None,
               output_dir=None):
    device = device or DEVICE
    output_dir = Path(output_dir) if output_dir else OUTPUT_DIR

    if isinstance(file_path_or_bytes, (str, Path)):
        with open(file_path_or_bytes, "rb") as fh:
            epub_bytes = io.BytesIO(fh.read())
        stem = Path(file_path_or_bytes).stem
    else:
        epub_bytes = file_path_or_bytes
        stem = "book"

    chapters = extract_chapters_from_epub(epub_bytes)
    assert chapters, "No chapters detected in EPUB."

    zip_buf = io.BytesIO()
    with zipfile.ZipFile(zip_buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for idx, (title, body) in enumerate(chapters, 1):
            name = f"{idx:02d}_{safe_name(title)[:40]}"

            chapter_elements = [{
                "text": body,
                "metadata": {
                    "chapter_index": idx,
                    "chapter_title": title,
                    "page_number": 1,
                    "coordinates": None
                }
            }]

            wav_bytes, manifest = synth_text_to_wav_and_manifest(
                chapter_elements,
                voice=voice, speed=speed, lang_code=lang_code, device=device
            )

            if per_chapter_format.lower() == "mp3":
                data = wav_to_mp3_bytes(wav_bytes)
                audio_name = f"{name}.mp3"
                zf.writestr(audio_name, data)
            else:
                audio_name = f"{name}.wav"
                zf.writestr(audio_name, wav_bytes)

            manifest["audioUrl"] = audio_name
            import json
            zf.writestr(f"{name}_manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))

    zip_buf.seek(0)
    zpath = str(output_dir / f"{zip_name or (stem + '_chapters')}.zip")
    with open(zpath, "wb") as f:
        f.write(zip_buf.read())
    return zpath

## Usage Examples

Below are examples for synthesizing text, PDFs, and EPUBs locally.

### A) String → Audio

In [None]:
# Configuration
VOICE = "af_heart"
SPEED = 1.0
FORMAT = "mp3"  # "wav" or "mp3"
LANG = "a"
BASENAME = "kokoro_text"

# Text to synthesize
TEXT = """Paste or type your text here.
It can be multiple paragraphs. Chapters aren't needed for this path.
"""

# Run synthesis
audio_path, manifest_path = synth_string(
    TEXT, 
    voice=VOICE, 
    speed=SPEED,
    out_format=FORMAT, 
    lang_code=LANG,
    basename=BASENAME
)

print(f"Audio saved to: {audio_path}")
print(f"Manifest saved to: {manifest_path}")

### B) PDF → Audio (with manifest)

In [6]:
# Configuration
VOICE = "af_heart"
SPEED = 1.0
FORMAT = "mp3"  # "wav" or "mp3"
LANG = "a"

# Specify the path to your PDF file (relative to notebook location)
PDF_PATH = "Case1Writeup.pdf"  # Change this to your PDF filename

# Run synthesis
audio_path, manifest_path = synth_pdf(
    PDF_PATH, 
    voice=VOICE, 
    speed=SPEED,
    out_format=FORMAT, 
    lang_code=LANG
)

print(f"Audio saved to: {audio_path}")
print(f"Manifest saved to: {manifest_path}")

Parsing PDF with layout analysis (strategy='hi_res')...


yolox_l0.05.onnx:   0%|          | 0.00/217M [00:00<?, ?B/s]

Unstructured 'hi_res' strategy failed: Unable to get page count. Is poppler installed and in PATH?. Falling back to 'fast'.
Unstructured 'fast' returned 16 raw elements.

--- Processing elements (checking for points) ---
--- Finished processing elements ---
Unstructured: Found 14 text elements.


config.json: 0.00B [00:00, ?B/s]

  WeightNorm.apply(module, name, dim)


kokoro-v1_0.pth:   0%|          | 0.00/327M [00:00<?, ?B/s]

Synthesizing 14 text elements...


voices/af_heart.pt:   0%|          | 0.00/523k [00:00<?, ?B/s]

Audio saved to: Case1Writeup_tts.mp3
Manifest saved to: Case1Writeup_tts_manifest.json


### C) EPUB → ZIP (Per-Chapter Audio + Manifests)

In [None]:
# Configuration
VOICE = "af_heart"
SPEED = 1.0
CHAPTER_FORMAT = "wav"  # "wav" or "mp3"
LANG = "a"
ZIP_NAME = ""  # Optional: custom name for the output ZIP file

# Specify the path to your EPUB file (relative to notebook location)
EPUB_PATH = "book.epub"  # Change this to your EPUB filename

# Run synthesis
zip_path = synth_epub(
    EPUB_PATH, 
    voice=VOICE, 
    speed=SPEED,
    per_chapter_format=CHAPTER_FORMAT,
    lang_code=LANG,
    zip_name=(ZIP_NAME or None)
)

print(f"ZIP archive saved to: {zip_path}")

## Notes

- **Output Directory**: By default, all outputs are saved to the same directory as the notebook. You can change this by modifying `OUTPUT_DIR` in the Configuration cell.
- **Input Files**: Place your PDF/EPUB files in the same directory as the notebook, or provide relative/absolute paths.
- **Device Selection**: The notebook will automatically use CUDA if available, otherwise CPU. You can override this by setting `DEVICE_MODE` in the Configuration cell.
- **Voice Options**: Kokoro v1.0 includes 54 voices across 8 languages:
  - **US Female**: af_alloy, af_aoede, af_bella, af_heart, af_jessica, af_kore, af_nicole, af_nova, af_river, af_sarah, af_sky
  - **US Male**: am_adam, am_echo, am_eric, am_fenrir, am_liam, am_michael, am_onyx, am_puck
  - **British Female**: bf_alice, bf_emma, bf_isabella, bf_lily
  - **British Male**: bm_daniel, bm_fable, bm_george, bm_lewis
  - **Additional languages**: French, Japanese, Korean, Chinese (and more)
  - **Voice Blending**: You can blend voice embeddings to create personalized voices
- **PDF Extraction**: The notebook uses `unstructured.io` for advanced PDF extraction with layout analysis. This may take longer but provides better results.
- **Manifest Files**: Each audio output includes a JSON manifest file with sentence-level timing information and metadata.

## Cleanup: Delete Environment (Optional)

**If you created a new environment at the beginning of this notebook**, you can delete it here to free up storage space.

⚠️ **Warning**: This will permanently delete the environment and all installed packages!

In [None]:
import subprocess

# Check if we created an environment in this notebook
if 'environment_created_by_notebook' not in globals():
    print("✗ No environment tracking found")
    print("This cell only works if you ran the environment setup cell at the beginning")
elif not environment_created_by_notebook:
    print("✗ No environment was created by this notebook")
    print("You can only delete environments that were created in this session")
else:
    print(f"Environment '{environment_name}' was created by this notebook")
    print(f"\n{'='*60}")
    print("DELETE ENVIRONMENT")
    print(f"{'='*60}")
    
    confirm = input(f"\nAre you sure you want to DELETE '{environment_name}'?\nType 'yes' to confirm: ").strip().lower()
    
    if confirm == 'yes':
        print(f"\n→ Deleting environment '{environment_name}'...")
        print("  This may take a moment...")
        
        try:
            subprocess.run(['conda', 'env', 'remove', '-n', environment_name, '-y'], 
                          check=True, capture_output=True)
            print(f"✓ Environment '{environment_name}' deleted successfully!")
            print("  Storage space has been freed.")
            
            # Reset the flag
            environment_created_by_notebook = False
            environment_name = None
            
        except subprocess.CalledProcessError as e:
            print(f"✗ Failed to delete environment: {e}")
            print("You may need to delete it manually with: conda env remove -n {environment_name}")
    else:
        print("\n✗ Deletion cancelled - environment preserved")