# Tourist Guide TTS Workflow

This notebook implements a Text-to-Speech (TTS) workflow specifically designed for creating tourist guide audio recordings. Unlike podcast-style content, tourist guides require:

- **Clear, informative narration** with appropriate pacing
- **Consistent voice and tone** across multiple locations
- **Structured content** for location descriptions, historical facts, and practical information
- **Professional audio quality** suitable for mobile apps and tourism platforms

## Workflow Overview:
1. Install and configure TTS models
2. Structure tourist guide content
3. Generate audio for individual locations
4. Process multiple locations in batch
5. Apply audio enhancements
6. Export with proper metadata

## 1. Install and Import Required Libraries

We'll use modern TTS models that are compatible with our Python environment and suitable for clear, professional narration.

In [2]:
# Install required packages (run once)
# Note: Some packages may need specific Python versions
# !pip install torch transformers pydub scipy numpy tqdm IPython psutil
# !pip install TTS  # Coqui TTS - alternative to parler-tts
# !pip install gTTS  # Google Text-to-Speech as fallback option

In [None]:
# Import core libraries
import torch
import numpy as np
import json
import os
from pathlib import Path
from tqdm import tqdm
from datetime import datetime
import psutil  # For system memory monitoring

# Audio processing
from pydub import AudioSegment
from pydub.effects import normalize, compress_dynamic_range
from pydub.utils import which
import io

# TTS models (we'll implement multiple options)
from transformers import AutoProcessor
from IPython.display import Audio, display
import IPython.display as ipd

# Report the runtime environment so the device choice made in later cells can be understood
print("Libraries imported successfully!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
# The MPS backend is only queried when torch.backends exposes it; otherwise False is reported
print(f"MPS available: {torch.backends.mps.is_available() if hasattr(torch.backends, 'mps') else False}")

# ffmpeg check for pydub MP3 decode/encode support
# Only a warning is printed; execution continues even when ffmpeg is missing
if not which("ffmpeg"):
    print("⚠️ ffmpeg not found. MP3 decoding/encoding/export will not work. On macOS install with: brew install ffmpeg")

# Device configuration with support for CUDA, MPS (Apple Silicon), and CPU
def get_optimal_device():
    """Pick the compute device to use, preferring CUDA, then MPS, then CPU.

    Prints a one-line notice describing the choice.

    Returns:
        The device name as a string: "cuda", "mps" or "cpu".
    """
    # NVIDIA GPU takes priority when present
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        print(f"🚀 Using NVIDIA GPU: {gpu_name}")
        return "cuda"

    # Apple Silicon: the mps backend attribute may be absent on some builds
    mps_backend = getattr(torch.backends, 'mps', None)
    if mps_backend is not None and mps_backend.is_available():
        print(f"🍎 Using Apple Silicon GPU (Metal Performance Shaders)")
        return "mps"

    # Nothing accelerated is available
    print(f"💻 Using CPU (consider using GPU for faster processing)")
    return "cpu"

# Reproducibility helper
def set_global_seed(seed: int = 42):
    """Seed PyTorch's random number generators so repeated runs match.

    Seeds the default generator and, when CUDA is available, every CUDA
    device. A failure is reported with a warning instead of being raised.

    Args:
        seed: Seed value handed to PyTorch.
    """
    try:
        torch.manual_seed(seed)
        cuda_present = torch.cuda.is_available()
        if cuda_present:
            torch.cuda.manual_seed_all(seed)
        # MPS has no dedicated seeding call; torch.manual_seed above is relied on
    except Exception as err:
        print(f"⚠️ Failed to set global seed: {err}")
    else:
        print(f"🔑 Global seed set to {seed}")

Libraries imported successfully!
PyTorch version: 2.8.0
CUDA available: False
MPS available: True


## 2. Load and Configure TTS Models

We'll set up TTS models optimized for clear, engaging narration suitable for tourist guides. We'll implement multiple model options for flexibility.

In [None]:
# Configure device for TTS processing
# `device` is a notebook-level global: later cells pass it to TouristGuideTTS
# and optimize_for_apple_silicon() reads it directly.
device = get_optimal_device()
print(f"Selected device: {device}")

# Set reproducibility seed
set_global_seed(42)

# Model configuration for tourist guides
# NOTE(review): in the cells visible here these keys are only stored (and later
# extended with the Apple Silicon settings); confirm which later cells consume them.
TTS_CONFIG = {
    "voice_style": "professional_guide",  # Clear, informative, friendly
    "speaking_rate": "moderate",          # Not too fast, easy to follow
    "emphasis": "educational",           # Suitable for learning content
    "language": "en",                    # Can be extended for multilingual guides
    "device": device,                    # Store device info
}

🍎 Using Apple Silicon GPU (Metal Performance Shaders)
Selected device: mps


In [None]:
# Runtime configuration for generation and chunking
GEN_CONFIG = {
    # Voice and sampling settings forwarded to tts_system.set_config() below
    "voice_preset": "v2/en_speaker_9",
    "temperature": 0.7,
    "semantic_temperature": 0.8,
    "do_sample": True,
    "seed": 42,
    # Chunking
    # Read by generate_location_audio(): maximum characters per synthesized
    # chunk and the silence (in milliseconds) inserted between chunks
    "chunk_max_chars": 220,
    "chunk_gap_ms": 350
}

# Apply the settings only when the TTS system already exists and a model was
# loaded; if this cell runs before the model-loading cell, re-run it afterwards.
if 'tts_system' in globals() and model_loaded:
    tts_system.set_config(
        voice_preset=GEN_CONFIG["voice_preset"],
        temperature=GEN_CONFIG["temperature"],
        semantic_temperature=GEN_CONFIG["semantic_temperature"],
        do_sample=GEN_CONFIG["do_sample"],
        seed=GEN_CONFIG["seed"]
    )

In [None]:
# TTS Model Loader Class with Multi-Device Support
class TouristGuideTTS:
    """Text-to-speech wrapper with a Bark back end and a gTTS fallback.

    Load a back end with ``load_bark_model()`` or ``load_gtts_fallback()``,
    optionally adjust generation settings with ``set_config()``, then call
    ``generate_speech()`` to obtain ``(samples, sampling_rate)``.
    """

    def __init__(self, device="cpu", seed: int = 42):
        """Store the target device, pick a dtype for it and seed the RNGs.

        Args:
            device: Device string such as "cuda", "mps" or "cpu".
            seed: Value forwarded to ``set_global_seed``.
        """
        self.device = device
        self.model = None
        self.processor = None
        self.sampling_rate = None

        # Device-specific configurations
        self.torch_dtype = self._get_optimal_dtype()
        self.voice_preset = None
        # Default generation params (can be overridden via set_config)
        self.gen_params = {
            "temperature": 0.7,
            "semantic_temperature": 0.8,
            "do_sample": True,
        }
        set_global_seed(seed)

    def set_config(self, voice_preset=None, temperature=None, semantic_temperature=None, do_sample=None, seed=None):
        """Set runtime configuration for voice and generation behavior.

        Every argument is optional; ``None`` leaves the current value untouched.
        Passing ``seed`` reseeds the global RNGs via ``set_global_seed``.
        """
        if voice_preset is not None:
            self.voice_preset = voice_preset
        if temperature is not None:
            self.gen_params["temperature"] = temperature
        if semantic_temperature is not None:
            self.gen_params["semantic_temperature"] = semantic_temperature
        if do_sample is not None:
            self.gen_params["do_sample"] = do_sample
        if seed is not None:
            set_global_seed(seed)
        print("⚙️ TTS config updated:", {
            "voice_preset": self.voice_preset,
            **self.gen_params
        })

    def _get_optimal_dtype(self):
        """Return the tensor dtype for ``self.device``: float16 on CUDA, float32 otherwise."""
        if self.device == "cuda":
            return torch.float16  # Use half precision for CUDA
        # MPS and CPU both run in float32
        return torch.float32

    def load_bark_model(self):
        """Load the Suno Bark processor and model and move the model to the device.

        If placing the model on MPS fails, the instance switches to CPU/float32.

        Returns:
            True on success, False if loading failed (the error is printed).
        """
        try:
            from transformers import BarkModel, AutoProcessor
            self.processor = AutoProcessor.from_pretrained("suno/bark")

            print(f"Loading Bark model on {self.device} with {self.torch_dtype}...")
            self.model = BarkModel.from_pretrained(
                "suno/bark",
                torch_dtype=self.torch_dtype
            )

            # Move model to device
            try:
                self.model = self.model.to(self.device)
            except Exception as e:
                # Only MPS placement gets a CPU fallback; other devices propagate the error
                if self.device != "mps":
                    raise
                print(f"⚠️ MPS placement failed, falling back to CPU: {e}")
                self.device = "cpu"
                self.torch_dtype = torch.float32
                self.model = self.model.to(self.device)

            self.sampling_rate = 24000
            if self.voice_preset is None:
                self.voice_preset = "v2/en_speaker_9"  # Professional, clear voice
            print(f"✅ Bark model loaded successfully on {self.device}")
            return True
        except Exception as e:
            print(f"❌ Failed to load Bark model: {e}")
            return False

    def load_gtts_fallback(self):
        """Make Google TTS available as the fallback back end.

        Returns:
            True when the ``gtts`` package could be imported, False otherwise.
        """
        try:
            from gtts import gTTS
            self.gtts = gTTS
            self.sampling_rate = 22050  # Standard for gTTS
            print("✅ Google TTS loaded as fallback")
            return True
        except Exception as e:
            print(f"❌ Failed to load Google TTS: {e}")
            return False

    def generate_speech(self, text, method="bark"):
        """Generate speech from text using the requested back end.

        Args:
            text: Text to synthesize.
            method: "bark" or "gtts"; the matching back end must be loaded.

        Returns:
            Tuple ``(samples, sampling_rate)``.

        Raises:
            ValueError: If the method is unknown or its back end is not loaded.
        """
        if method == "bark" and self.model is not None:
            return self._generate_bark_speech(text)
        if method == "gtts" and getattr(self, "gtts", None) is not None:
            return self._generate_gtts_speech(text)
        raise ValueError(f"Method '{method}' not available or not loaded")

    def _to_device_with_dtype(self, inputs):
        """Move inputs to the target device and coerce floating tensors to ``self.torch_dtype``.

        Entries that cannot be moved (no ``.to`` method, or ``.to`` raised) are
        kept unchanged so nothing the processor produced is lost.
        """
        moved = {}
        for key, value in inputs.items():
            if not hasattr(value, 'to'):
                # Previously such entries were dropped from the result entirely
                moved[key] = value
                continue
            try:
                # Preserve integer types; coerce only floating tensors
                if hasattr(value, 'is_floating_point') and value.is_floating_point():
                    moved[key] = value.to(self.device, dtype=self.torch_dtype)
                else:
                    moved[key] = value.to(self.device)
            except Exception:
                # Best effort: fall back to the value as it was
                moved[key] = value
        return moved

    def _mps_empty_cache(self):
        """Release cached MPS memory when running on MPS; failures are ignored (best effort)."""
        try:
            if self.device == "mps" and hasattr(torch, "mps") and hasattr(torch.mps, "empty_cache"):
                torch.mps.empty_cache()
        except Exception:
            pass

    def _generate_bark_speech(self, text):
        """Generate speech with Bark, retrying on CPU if the current device fails.

        Returns:
            Tuple ``(samples, sampling_rate)`` where ``samples`` is a float32
            NumPy array.

        Raises:
            Exception: Whatever generation raised once the CPU attempt has
                failed too (or when already running on CPU).
        """
        try:
            # Ensure processor returns tensors for device moves
            inputs = self.processor(text, voice_preset=self.voice_preset, return_tensors="pt")

            # Move inputs to device
            inputs = self._to_device_with_dtype(inputs)

            from contextlib import nullcontext
            # Mixed precision is only enabled on CUDA
            amp_ctx = (
                torch.autocast(device_type='cuda', dtype=self.torch_dtype)
                if self.device == 'cuda' else nullcontext()
            )

            def run_generate(extra_kwargs):
                with torch.inference_mode():
                    with amp_ctx:
                        return self.model.generate(
                            **inputs,
                            **extra_kwargs
                        )

            # Try with configured kwargs first
            try:
                speech_output = run_generate(self.gen_params)
            except TypeError as te:
                # Some versions/models may not accept certain kwargs like semantic_temperature
                print(f"ℹ️ Retrying generate without semantic_temperature: {te}")
                fallback_kwargs = {
                    k: v for k, v in self.gen_params.items() if k != "semantic_temperature"
                }
                try:
                    speech_output = run_generate(fallback_kwargs)
                except TypeError as te2:
                    print(f"ℹ️ Retrying generate with minimal kwargs: {te2}")
                    speech_output = run_generate({})

            # Hand back float32: float16 samples (CUDA half precision) cannot
            # represent 32767 exactly and overflow when later scaled to int16
            audio_arr = speech_output[0].detach().cpu().float().numpy()
            return audio_arr, self.sampling_rate
        except Exception as e:
            print(f"❌ Bark generation failed: {e}")
            # Fallback to CPU if there are device issues
            if self.device != "cpu":
                print("🔄 Attempting generation on CPU...")
                # Match the CPU dtype chosen by _get_optimal_dtype(): move the
                # weights and convert them to float32 together
                self.model = self.model.to("cpu").float()
                # Clear the cache while self.device still names the old device;
                # _mps_empty_cache() is a no-op once the device is "cpu"
                self._mps_empty_cache()
                self.device = "cpu"
                self.torch_dtype = torch.float32
                return self._generate_bark_speech(text)
            raise

    def _generate_gtts_speech(self, text):
        """Generate speech using Google TTS with simple retry/backoff for transient failures.

        Returns:
            Tuple ``(samples, frame_rate)`` where ``samples`` is a mono float32
            array normalized to a peak of 1.0.

        Raises:
            Exception: The last error seen once every attempt has failed.
        """
        import time
        backoff_delays = [0.5, 1.5, 3.0]
        last_err = None
        # The first attempt runs immediately; each retry sleeps for its delay first
        for attempt, delay in enumerate([0.0] + backoff_delays, start=1):
            try:
                if delay:
                    time.sleep(delay)
                tts = self.gtts(text=text, lang='en', slow=False)
                # Write the MP3 into memory and decode it from there
                mp3_buffer = io.BytesIO()
                tts.write_to_fp(mp3_buffer)
                mp3_buffer.seek(0)

                # Convert to audio array
                audio_segment = AudioSegment.from_file(mp3_buffer, format="mp3")
                audio_arr = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)
                if audio_segment.channels == 2:
                    # Samples are interleaved L/R; average them down to mono
                    audio_arr = audio_arr.reshape((-1, 2)).mean(axis=1)
                max_abs = np.max(np.abs(audio_arr)) if audio_arr.size else 0.0
                if max_abs > 0:
                    audio_arr = audio_arr / max_abs  # Normalize

                return audio_arr, audio_segment.frame_rate
            except Exception as e:
                last_err = e
                print(f"⚠️ gTTS attempt {attempt} failed: {e}")
        print(f"❌ Google TTS generation failed after retries: {last_err}")
        raise last_err

# Initialize TTS system with automatic device detection
tts_system = TouristGuideTTS(device=device)

# Try to load models (in order of preference)
model_loaded = False
# Defined up front so the name exists even when no back end could be loaded
current_tts_method = None
if tts_system.load_bark_model():
    model_loaded = True
    current_tts_method = "bark"
elif tts_system.load_gtts_fallback():
    model_loaded = True
    current_tts_method = "gtts"
else:
    print("❌ No TTS models could be loaded")

if model_loaded:
    print(f"🎯 TTS system ready using: {current_tts_method}")
    # Report the device the TTS system actually ended up on: load_bark_model()
    # switches tts_system.device to "cpu" when MPS placement fails, so the
    # originally requested `device` can be out of date here.
    active_device = tts_system.device
    print(f"💾 Memory usage optimized for: {active_device}")
    if active_device == "mps":
        print("📱 Apple Silicon optimizations enabled")

In [None]:
# Apple Silicon Optimizations and Memory Management
def optimize_for_apple_silicon():
    """Report memory status and clear caches when the notebook runs on MPS.

    Reads the notebook-level ``device``. On any device other than "mps" it
    does nothing and returns an empty dict.

    Returns:
        A dict with the keys "use_mps_fallback", "batch_size" and
        "memory_efficient" on MPS, otherwise ``{}``.
    """
    if device != "mps":
        return {}

    print("🍎 Applying Apple Silicon optimizations...")

    # How much RAM is currently free, in GiB
    free_gb = psutil.virtual_memory().available / (1024**3)
    print(f"💾 Available RAM: {free_gb:.1f} GB")

    # Drop cached MPS allocations (best effort), then run the Python GC
    try:
        if hasattr(torch, "mps") and hasattr(torch.mps, "empty_cache"):
            torch.mps.empty_cache()
    except Exception:
        pass
    import gc
    gc.collect()
    print("   ✅ Cleared memory cache")

    # Describe the memory tier the machine falls into
    if free_gb > 16:
        tier_message = "✅ High memory system detected - using full precision"
    elif free_gb > 8:
        tier_message = "⚠️ Medium memory system - optimizing batch sizes"
    else:
        tier_message = "⚠️ Low memory system - using conservative settings"
    print(tier_message)

    # Settings handed back to the caller
    return {
        "use_mps_fallback": True,
        "batch_size": "auto",
        "memory_efficient": free_gb < 16
    }

# Apply optimizations if on Apple Silicon
# Returns an empty dict on any device other than "mps"
apple_optimizations = optimize_for_apple_silicon()

# Store optimization settings for later use
if apple_optimizations:
    print(f"✅ Applied {len(apple_optimizations)} Apple Silicon optimizations")
    # These settings can be used later in model loading and batch processing
    # Merged into TTS_CONFIG in place, so the new keys appear alongside the voice settings
    TTS_CONFIG.update(apple_optimizations)

# Device-specific performance tips
# Keyed on the originally detected `device`; purely informational output
if device == "mps":
    print("\n🚀 Apple Silicon Performance Tips:")
    print("   • Close other memory-intensive apps")
    print("   • Use smaller chunk sizes if you encounter memory issues")
    print("   • MPS fallback to CPU is automatic if needed")
    print("   • Monitor Activity Monitor for memory pressure")
elif device == "cuda":
    print("\n🚀 NVIDIA GPU Performance Tips:")
    print("   • Monitor GPU memory usage")
    print("   • Use mixed precision for faster inference")
elif device == "cpu":
    print("\n🚀 CPU Performance Tips:")
    print("   • Consider using a GPU for faster processing")
    print("   • Process smaller text chunks to avoid memory issues")

🍎 Applying Apple Silicon optimizations...
💾 Available RAM: 100.0 GB
   ✅ Cleared memory cache
✅ High memory system detected - using full precision
✅ Applied 3 Apple Silicon optimizations

🚀 Apple Silicon Performance Tips:
   • Close other memory-intensive apps
   • Use smaller batch sizes if you encounter memory issues
   • MPS fallback to CPU is automatic if needed
   • Monitor Activity Monitor for memory pressure


## 3. Prepare Tourist Guide Content

Structure and format tourist guide text content, including location descriptions, historical facts, and visitor information.

In [8]:
# Tourist Guide Content Structure
class TouristLocation:
    """One stop on the tour: its name, category, narration text and audio bookkeeping."""

    def __init__(self, name, location_type, content_sections):
        """Store the location details; audio fields start out unset.

        Args:
            name: Display name of the location.
            location_type: Category such as monument, museum, park, restaurant.
            content_sections: Mapping of section name to narration text.
        """
        self.name = name
        self.location_type = location_type  # monument, museum, park, restaurant, etc.
        self.content_sections = content_sections
        # Filled in once audio has been generated / written
        self.audio_duration = None
        self.audio_file_path = None

    def get_full_script(self):
        """Build the narration script from the sections that are present.

        Sections are emitted in a fixed order (introduction, description,
        history, practical_info, conclusion); history and practical_info get a
        spoken lead-in. Parts are joined with " ... " to act as a pause.
        """
        # (section key, spoken lead-in or None) in narration order
        narration_order = (
            ("introduction", None),
            ("description", None),
            ("history", "Here's some historical context: "),
            ("practical_info", "Visitor information: "),
            ("conclusion", None),
        )

        pieces = []
        for key, lead_in in narration_order:
            if key not in self.content_sections:
                continue
            text = self.content_sections[key]
            pieces.append(text if lead_in is None else f"{lead_in}{text}")

        # Join with appropriate pauses
        return " ... ".join(pieces)

    def to_dict(self):
        """Return the location as a plain dict suitable for JSON serialization."""
        exported_fields = (
            "name",
            "location_type",
            "content_sections",
            "audio_duration",
            "audio_file_path",
        )
        return {field: getattr(self, field) for field in exported_fields}

# Sample tourist locations for testing
# Each entry supplies all five sections that TouristLocation.get_full_script()
# knows about: introduction, description, history, practical_info, conclusion.
sample_locations = [
    TouristLocation(
        name="Statue of Liberty",
        location_type="monument",
        content_sections={
            "introduction": "Welcome to the Statue of Liberty, one of New York's most iconic landmarks.",
            "description": "Standing at 305 feet tall, this magnificent copper statue represents freedom and democracy. The statue depicts Libertas, the Roman goddess of liberty, holding a torch and a tablet inscribed with the date of American independence.",
            "history": "The statue was a gift from France to the United States in 1886, designed by Frédéric Auguste Bartholdi. It served as a beacon of hope for millions of immigrants arriving in New York Harbor.",
            "practical_info": "Ferries depart from Battery Park every 30 minutes. Advance reservations are recommended, especially for crown access. The statue is open daily except December 25th.",
            "conclusion": "Take your time to appreciate this symbol of freedom and the incredible views of New York Harbor."
        }
    ),
    TouristLocation(
        name="Central Park",
        location_type="park",
        content_sections={
            "introduction": "You're now entering Central Park, Manhattan's green oasis spanning 843 acres.",
            "description": "This urban park features rolling meadows, tranquil lakes, and winding paths. You'll find iconic spots like Bethesda Fountain, Strawberry Fields, and the Central Park Zoo.",
            "history": "Designed by Frederick Law Olmsted and Calvert Vaux in the 1850s, Central Park was the first landscaped public park in the United States.",
            "practical_info": "The park is open from 6 AM to 1 AM daily. Bike rentals are available at several locations. Free walking tours are offered on weekends.",
            "conclusion": "Enjoy your stroll through this beautiful park and don't forget to visit the famous landmarks along the way."
        }
    )
]

# Summarize what was created so the cell output confirms the data loaded
print(f"✅ Created {len(sample_locations)} sample tourist locations")
for location in sample_locations:
    print(f"   • {location.name} ({location.location_type})")

✅ Created 2 sample tourist locations
   • Statue of Liberty (monument)
   • Central Park (park)


In [9]:
# Preview the full script for a location
# Uses the first sample location; `script` is the text that would be sent to the TTS
preview_location = sample_locations[0]
script = preview_location.get_full_script()

print(f"📝 Full script for {preview_location.name}:")
print("=" * 60)
print(script)
print("=" * 60)
# The character count indicates how many chunks the script will be split into later
print(f"Script length: {len(script)} characters")

📝 Full script for Statue of Liberty:
Welcome to the Statue of Liberty, one of New York's most iconic landmarks. ... Standing at 305 feet tall, this magnificent copper statue represents freedom and democracy. The statue depicts Libertas, the Roman goddess of liberty, holding a torch and a tablet inscribed with the date of American independence. ... Here's some historical context: The statue was a gift from France to the United States in 1886, designed by Frédéric Auguste Bartholdi. It served as a beacon of hope for millions of immigrants arriving in New York Harbor. ... Visitor information: Ferries depart from Battery Park every 30 minutes. Advance reservations are recommended, especially for crown access. The statue is open daily except December 25th. ... Take your time to appreciate this symbol of freedom and the incredible views of New York Harbor.
Script length: 825 characters


## 4. Generate Audio for Single Location

Convert text content for a single tourist location into speech audio, with appropriate pacing and emphasis for guide narration.

In [None]:
# --- Chunking utilities for long scripts ---
import re

def split_into_sentences(text: str, max_chars: int = 250):
    """Split text into sentence-like chunks and pack them to <= max_chars each.

    The text is split after ".", "!" or "?" followed by whitespace, and
    consecutive sentences are packed into one chunk while they fit. A single
    sentence longer than ``max_chars`` is wrapped on word boundaries so words
    are not cut in half before being sent to the TTS model (only a single
    word longer than ``max_chars`` is broken).

    Args:
        text: Text to split.
        max_chars: Maximum length of each returned chunk; must be positive.

    Returns:
        List of chunk strings (empty for empty or whitespace-only text).

    Raises:
        ValueError: If ``max_chars`` is not positive.
    """
    import textwrap  # local import: only needed for over-long sentences

    if max_chars <= 0:
        # A negative limit used to drop long sentences silently
        raise ValueError(f"max_chars must be a positive integer, got {max_chars!r}")

    sentences = re.split(r"(?<=[\.!?])\s+", text.strip())
    chunks = []
    buf = ""
    for sentence in sentences:
        if not sentence:
            continue
        candidate = (buf + " " + sentence).strip() if buf else sentence
        if len(candidate) <= max_chars:
            buf = candidate
            continue
        # The sentence does not fit into the current chunk: flush the chunk
        if buf:
            chunks.append(buf)
            buf = ""
        if len(sentence) <= max_chars:
            buf = sentence
        else:
            # Very long sentence: wrap at whitespace; keep hyphenated words whole
            chunks.extend(
                textwrap.wrap(
                    sentence,
                    width=max_chars,
                    break_long_words=True,
                    break_on_hyphens=False,
                )
            )
    if buf:
        chunks.append(buf)
    return chunks


def concat_segments(segments, gap_ms: int = 300):
    """Join pydub segments end to end with ``gap_ms`` of silence between neighbours.

    An empty input yields a zero-length silent segment.
    """
    if not segments:
        return AudioSegment.silent(duration=0)

    pause = AudioSegment.silent(duration=gap_ms)
    combined = segments[0]
    # Append pause-then-segment for everything after the first one
    for piece in segments[1:]:
        combined = combined + pause
        combined = combined + piece
    return combined


def generate_location_audio(location, tts_system, method="bark", use_chunking=True):
    """Generate audio for a single tourist location.

    With ``use_chunking`` and the "bark" method the script is split into
    chunks, each chunk is synthesized separately and the pieces are stitched
    together with short silences. Otherwise the whole script is synthesized
    in one call.

    Args:
        location: Object providing ``name``, ``get_full_script()`` and a
            writable ``audio_duration`` attribute.
        tts_system: Object providing ``generate_speech(text, method=...)``
            returning ``(samples, sampling_rate)``.
        method: TTS back end name passed through to ``tts_system``.
        use_chunking: Whether to chunk the script (only applies to "bark").

    Returns:
        ``(samples, sampling_rate)``. The chunked path returns int16 samples;
        the single-shot path returns the back end's array unchanged. Returns
        ``(None, None)`` when the script is empty or generation failed.
    """
    print(f"🎙️ Generating audio for: {location.name}")

    # Get the complete script
    script = location.get_full_script()
    if not script or not script.strip():
        # Nothing to narrate: report it the same way as a failed generation
        print("⚠️ Empty script - nothing to synthesize")
        return None, None

    try:
        if use_chunking and method == "bark":
            # Chunk settings come from the notebook-level GEN_CONFIG when that cell has run
            gen_config = globals().get("GEN_CONFIG") or {}
            max_chars = gen_config.get("chunk_max_chars", 220)
            gap_ms = gen_config.get("chunk_gap_ms", 350)

            # Split and synthesize per chunk for stability
            chunks = split_into_sentences(script, max_chars=max_chars)
            print(f"   ✂️ Chunking into {len(chunks)} pieces")
            segs = []
            sr = None
            for idx, chunk in enumerate(chunks, 1):
                audio_arr, sampling_rate = tts_system.generate_speech(chunk, method=method)
                if sr is None:
                    # The first chunk's rate is reported for the stitched result
                    sr = sampling_rate
                seg = audio_to_segment(audio_arr, sampling_rate)
                segs.append(seg)
                print(f"   • Chunk {idx}/{len(chunks)}: {len(seg)} ms")
            stitched = concat_segments(segs, gap_ms=gap_ms)
            # len() of a pydub segment is its duration in milliseconds
            location.audio_duration = len(stitched) / 1000.0
            print(f"✅ Audio generated successfully (chunked)")
            print(f"   Duration: {location.audio_duration:.1f} seconds")
            print(f"   Sampling rate: {sr} Hz")
            return np.array(stitched.get_array_of_samples(), dtype=np.int16), sr

        # Generate speech in one go
        audio_arr, sampling_rate = tts_system.generate_speech(script, method=method)

        # Calculate duration (unknown when the back end reports no sampling rate)
        duration = len(audio_arr) / sampling_rate if sampling_rate else None
        location.audio_duration = duration

        print(f"✅ Audio generated successfully")
        if duration is not None:
            # Formatting None with ".1f" would raise, so only print a known duration
            print(f"   Duration: {duration:.1f} seconds")
        print(f"   Sampling rate: {sampling_rate} Hz")
        print(f"   Audio shape: {audio_arr.shape}")

        return audio_arr, sampling_rate

    except Exception as e:
        # Deliberate best effort: callers check for None instead of catching
        print(f"❌ Error generating audio: {e}")
        return None, None


def audio_to_segment(audio_arr, sampling_rate):
    """Convert a mono sample array to a 16-bit pydub AudioSegment.

    int16 input is used as is. Any other dtype is peak-normalized and scaled
    to the int16 range.

    Args:
        audio_arr: Array-like of samples, or None.
        sampling_rate: Frame rate in Hz for the resulting segment.

    Returns:
        The AudioSegment; a zero-length silent segment for None/empty input.
    """
    if audio_arr is None:
        return AudioSegment.silent(duration=0)
    # Accept plain sequences as well as NumPy arrays
    samples = np.asarray(audio_arr)
    if samples.size == 0:
        return AudioSegment.silent(duration=0)

    if samples.dtype != np.int16:
        if samples.dtype == np.float16:
            # float16 rounds 32767 up to 32768, which overflows int16 at the
            # peak sample; do the scaling in float32 instead
            samples = samples.astype(np.float32)
        # Normalize and convert to 16-bit PCM
        max_abs = np.max(np.abs(samples))
        if max_abs > 0:
            samples = samples / max_abs
        samples = (samples * 32767).astype(np.int16)

    # Create audio segment
    return AudioSegment(
        samples.tobytes(),
        frame_rate=sampling_rate,
        sample_width=2,  # 16-bit = 2 bytes
        channels=1
    )

In [None]:
# --- Chunking utilities for long scripts ---
# Simple sentence splitter without external deps
import re
def split_into_sentences(text: str, max_chars: int = 250):
    """Split text into sentence-like chunks and pack them to <= max_chars each.

    The text is split after ".", "!" or "?" followed by whitespace, and
    consecutive sentences are packed into one chunk while they fit. A single
    sentence longer than ``max_chars`` is wrapped on word boundaries so words
    are not cut in half before being sent to the TTS model (only a single
    word longer than ``max_chars`` is broken).

    Args:
        text: Text to split.
        max_chars: Maximum length of each returned chunk; must be positive.

    Returns:
        List of chunk strings (empty for empty or whitespace-only text).

    Raises:
        ValueError: If ``max_chars`` is not positive.
    """
    import textwrap  # local import: only needed for over-long sentences

    if max_chars <= 0:
        # A negative limit used to drop long sentences silently
        raise ValueError(f"max_chars must be a positive integer, got {max_chars!r}")

    # Split by sentence terminators, then pack into chunks <= max_chars
    sentences = re.split(r"(?<=[\.!?])\s+", text.strip())
    chunks = []
    buf = ""
    for sentence in sentences:
        if not sentence:
            continue
        candidate = (buf + " " + sentence).strip() if buf else sentence
        if len(candidate) <= max_chars:
            buf = candidate
            continue
        # The sentence does not fit into the current chunk: flush the chunk
        if buf:
            chunks.append(buf)
            buf = ""
        if len(sentence) <= max_chars:
            buf = sentence
        else:
            # Very long sentence: wrap at whitespace; keep hyphenated words whole
            chunks.extend(
                textwrap.wrap(
                    sentence,
                    width=max_chars,
                    break_long_words=True,
                    break_on_hyphens=False,
                )
            )
    if buf:
        chunks.append(buf)
    return chunks

def concat_segments(segments, gap_ms: int = 300):
    """Concatenate pydub segments with small silences in between.

    Args:
        segments: Sequence of pydub AudioSegment objects.
        gap_ms: Length of the silence inserted between neighbours, in ms.

    Returns:
        The combined segment; a zero-length silent segment when ``segments``
        is empty (indexing the first element would otherwise raise IndexError).
    """
    if not segments:
        return AudioSegment.silent(duration=0)
    gap = AudioSegment.silent(duration=gap_ms)
    out = segments[0]
    for s in segments[1:]:
        out = out + gap + s
    return out

def generate_location_audio(location, tts_system, method="bark", use_chunking=True):
    """Generate audio for a single tourist location.

    With ``use_chunking`` and the "bark" method the script is split into
    chunks, each chunk is synthesized separately and the pieces are stitched
    together with short silences. Otherwise the whole script is synthesized
    in one call.

    Args:
        location: Object providing ``name``, ``get_full_script()`` and a
            writable ``audio_duration`` attribute.
        tts_system: Object providing ``generate_speech(text, method=...)``
            returning ``(samples, sampling_rate)``.
        method: TTS back end name passed through to ``tts_system``.
        use_chunking: Whether to chunk the script (only applies to "bark").

    Returns:
        ``(samples, sampling_rate)``. The chunked path returns int16 samples;
        the single-shot path returns the back end's array unchanged. Returns
        ``(None, None)`` when the script is empty or generation failed.
    """
    print(f"🎙️ Generating audio for: {location.name}")

    # Get the complete script
    script = location.get_full_script()
    if not script or not script.strip():
        # Nothing to narrate: report it the same way as a failed generation
        print("⚠️ Empty script - nothing to synthesize")
        return None, None

    try:
        if use_chunking and method == "bark":
            # Honour the notebook-level GEN_CONFIG chunk settings when that
            # cell has run; otherwise use the previous fixed values
            gen_config = globals().get("GEN_CONFIG") or {}
            max_chars = gen_config.get("chunk_max_chars", 220)
            gap_ms = gen_config.get("chunk_gap_ms", 350)

            # Split and synthesize per chunk for stability
            chunks = split_into_sentences(script, max_chars=max_chars)
            print(f"   ✂️ Chunking into {len(chunks)} pieces")
            segs = []
            sr = None
            for idx, chunk in enumerate(chunks, 1):
                audio_arr, sampling_rate = tts_system.generate_speech(chunk, method=method)
                if sr is None:
                    # The first chunk's rate is reported for the stitched result
                    sr = sampling_rate
                seg = audio_to_segment(audio_arr, sampling_rate)
                segs.append(seg)
                print(f"   • Chunk {idx}/{len(chunks)}: {len(seg)} ms")
            audio_segment = concat_segments(segs, gap_ms=gap_ms)
            # len() of a pydub segment is its duration in milliseconds
            location.audio_duration = len(audio_segment) / 1000.0
            print(f"✅ Audio generated successfully (chunked)")
            print(f"   Duration: {location.audio_duration:.1f} seconds")
            print(f"   Sampling rate: {sr} Hz")
            return np.array(audio_segment.get_array_of_samples(), dtype=np.int16), sr

        # Generate speech in one go
        audio_arr, sampling_rate = tts_system.generate_speech(script, method=method)

        # Calculate duration (unknown when the back end reports no sampling rate)
        duration = len(audio_arr) / sampling_rate if sampling_rate else None
        location.audio_duration = duration

        print(f"✅ Audio generated successfully")
        if duration is not None:
            # Formatting None with ".1f" would raise, so only print a known duration
            print(f"   Duration: {duration:.1f} seconds")
        print(f"   Sampling rate: {sampling_rate} Hz")
        print(f"   Audio shape: {audio_arr.shape}")

        return audio_arr, sampling_rate

    except Exception as e:
        # Deliberate best effort: callers check for None instead of catching
        print(f"❌ Error generating audio: {e}")
        return None, None

def audio_to_segment(audio_arr, sampling_rate):
    """Convert a mono sample array to a 16-bit pydub AudioSegment.

    int16 input is used as is. Any other dtype is peak-normalized and scaled
    to the int16 range.

    Args:
        audio_arr: Array-like of samples, or None.
        sampling_rate: Frame rate in Hz for the resulting segment.

    Returns:
        The AudioSegment; a zero-length silent segment for None/empty input.
    """
    if audio_arr is None:
        return AudioSegment.silent(duration=0)
    # Accept plain sequences as well as NumPy arrays
    samples = np.asarray(audio_arr)
    if samples.size == 0:
        return AudioSegment.silent(duration=0)

    if samples.dtype != np.int16:
        if samples.dtype == np.float16:
            # float16 rounds 32767 up to 32768, which overflows int16 at the
            # peak sample; do the scaling in float32 instead
            samples = samples.astype(np.float32)
        # Normalize and convert to 16-bit PCM
        max_abs = np.max(np.abs(samples))
        if max_abs > 0:
            samples = samples / max_abs
        samples = (samples * 32767).astype(np.int16)

    # Create audio segment
    return AudioSegment(
        samples.tobytes(),
        frame_rate=sampling_rate,
        sample_width=2,  # 16-bit = 2 bytes
        channels=1
    )

In [16]:
# Test audio generation for the first location
# Runs only when the model-loading cell succeeded; current_tts_method is the
# back end name ("bark" or "gtts") chosen there.
if model_loaded:
    test_location = sample_locations[0]
    audio_data, sample_rate = generate_location_audio(test_location, tts_system, method=current_tts_method)
    
    # generate_location_audio() returns (None, None) on failure
    if audio_data is not None:
        # Display audio player
        display(Audio(audio_data, rate=sample_rate))
        
        # Convert to AudioSegment for further processing
        audio_segment = audio_to_segment(audio_data, sample_rate)
        # len() of a pydub segment is its duration in milliseconds
        print(f"📊 Audio segment created: {len(audio_segment)}ms duration")
else:
    print("⚠️ No TTS model loaded. Please check the model loading section above.")

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.


🎙️ Generating audio for: Statue of Liberty
✅ Audio generated successfully
   Duration: 14.0 seconds
   Sampling rate: 24000 Hz
   Audio shape: (335040,)
✅ Audio generated successfully
   Duration: 14.0 seconds
   Sampling rate: 24000 Hz
   Audio shape: (335040,)


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.


🎙️ Generating audio for: Statue of Liberty
✅ Audio generated successfully
   Duration: 14.0 seconds
   Sampling rate: 24000 Hz
   Audio shape: (335040,)
✅ Audio generated successfully
   Duration: 14.0 seconds
   Sampling rate: 24000 Hz
   Audio shape: (335040,)


📊 Audio segment created: 13960ms duration


## 5. Batch Process Multiple Locations

Process multiple tourist locations in batch, generating individual audio files for each location with consistent voice and quality.

In [None]:
def batch_process_locations(locations, tts_system, method="bark", output_dir="./tourist_guides", use_chunking=True):
    """Generate one WAV file per location and collect a per-location report.

    Args:
        locations: Sized iterable of location objects. Each must expose
            ``name``; ``audio_file_path`` is set on it after a successful
            export. ``audio_duration`` is read when present (presumably set
            by ``generate_location_audio`` - confirm against that helper).
        tts_system: TTS wrapper exposing a ``device`` attribute; forwarded to
            ``generate_location_audio``.
        method: TTS method name forwarded to ``generate_location_audio``.
        output_dir: Directory that receives the WAV files. It is created
            together with any missing parent directories.
        use_chunking: Forwarded to ``generate_location_audio``.

    Returns:
        Tuple ``(results, output_path)``. ``results`` is a list with one dict
        per location holding ``location``, ``status`` (``"success"`` or
        ``"failed"``) and ``device_used``, plus ``file_path``, ``duration``
        and ``file_size_mb`` on success or ``error`` on failure.
        ``output_path`` is the output directory as a ``Path``.
    """
    import gc

    # Create output directory (parents=True so nested paths also work)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    results = []
    failed_locations = []

    print(f"🎵 Starting batch processing of {len(locations)} locations...")
    print(f"📁 Output directory: {output_path.absolute()}")
    print(f"🔧 Using device: {tts_system.device}")

    # Reduce the device to its bare type so "cuda:0" or a torch.device object
    # select the same cache-clearing branch as the plain string "cuda".
    device_type = str(tts_system.device).split(":")[0]

    def clear_device_cache():
        """Release cached accelerator memory for the current device type."""
        if device_type == "cuda":
            torch.cuda.empty_cache()
        elif device_type == "mps":
            # Use torch.mps.empty_cache when available, plus gc as a fallback
            try:
                if hasattr(torch, "mps") and hasattr(torch.mps, "empty_cache"):
                    torch.mps.empty_cache()
            except Exception:
                # Deliberately best-effort: a failed cache flush must not
                # abort the batch.
                pass
            gc.collect()
        # CPU doesn't need cache clearing

    for i, location in enumerate(tqdm(locations, desc="Processing locations")):
        try:
            # Clear cache before each location for memory efficiency
            clear_device_cache()

            # Generate audio
            audio_data, sample_rate = generate_location_audio(
                location, tts_system, method=method, use_chunking=use_chunking
            )

            if audio_data is None:
                failed_locations.append(location.name)
                results.append({
                    "location": location.name,
                    "status": "failed",
                    "error": "Audio generation failed",
                    "device_used": tts_system.device
                })
                continue

            # Convert to AudioSegment
            audio_segment = audio_to_segment(audio_data, sample_rate)

            # Build a filesystem-safe filename; fall back to a fixed stem when
            # the name contains no usable characters at all.
            safe_name = "".join(c for c in location.name if c.isalnum() or c in (' ', '-', '_')).rstrip()
            safe_name = safe_name.replace(' ', '_') or "location"
            filename = f"{i+1:02d}_{safe_name}.wav"
            filepath = output_path / filename

            # Save audio file
            audio_segment.export(filepath, format="wav")
            location.audio_file_path = str(filepath)

            # Prefer the duration recorded on the location; if it is missing,
            # derive it from the segment length (milliseconds) so a saved file
            # is never reported as a failure because of formatting None.
            duration = getattr(location, "audio_duration", None)
            if duration is None:
                duration = len(audio_segment) / 1000.0

            results.append({
                "location": location.name,
                "status": "success",
                "file_path": str(filepath),
                "duration": duration,
                "file_size_mb": filepath.stat().st_size / (1024 * 1024),
                "device_used": tts_system.device
            })

            print(f"✅ {location.name}: {duration:.1f}s → {filename}")

        except Exception as e:
            # Annotate device-specific errors with a hint for the user
            error_msg = str(e)
            lowered = error_msg.lower()
            if "mps" in lowered or "metal" in lowered:
                error_msg += " (MPS/Metal issue - try reducing chunk size)"
            elif "cuda" in lowered or "memory" in lowered:
                error_msg += " (GPU memory issue - try smaller models)"

            failed_locations.append(location.name)
            results.append({
                "location": location.name,
                "status": "failed",
                "error": error_msg,
                "device_used": tts_system.device
            })
            print(f"❌ {location.name}: {error_msg}")

            # Clear cache after errors
            clear_device_cache()

    # Final cache cleanup
    clear_device_cache()

    # Summary
    successful = sum(1 for r in results if r["status"] == "success")
    print("\n📊 Batch processing complete:")
    print(f"   ✅ Successful: {successful}/{len(locations)}")
    print(f"   ❌ Failed: {len(failed_locations)}")
    print(f"   🔧 Device used: {tts_system.device}")

    if failed_locations:
        print(f"   Failed locations: {', '.join(failed_locations)}")

    return results, output_path

# Batch generation needs a loaded TTS backend; bail out with a notice otherwise
if not model_loaded:
    print("⚠️ Skipping batch processing - no TTS model loaded")
else:
    batch_results, output_directory = batch_process_locations(
        sample_locations,
        tts_system,
        use_chunking=True,
        method=current_tts_method,
    )

🎵 Starting batch processing of 2 locations...
📁 Output directory: /Users/david/Development/text-to-speech/tourist_guides
🔧 Using device: mps


Processing locations:   0%|          | 0/2 [00:00<?, ?it/s]

🎙️ Generating audio for: Statue of Liberty


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.
Processing locations:  50%|█████     | 1/2 [01:18<01:18, 78.42s/it]The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:10000 for open-end generation.


✅ Audio generated successfully
   Duration: 14.5 seconds
   Sampling rate: 24000 Hz
   Audio shape: (347520,)
✅ Statue of Liberty: 14.5s → 01_Statue_of_Liberty.wav
🎙️ Generating audio for: Central Park


Processing locations: 100%|██████████| 2/2 [02:37<00:00, 78.85s/it]

✅ Audio generated successfully
   Duration: 14.4 seconds
   Sampling rate: 24000 Hz
   Audio shape: (346240,)
✅ Central Park: 14.4s → 02_Central_Park.wav

📊 Batch processing complete:
   ✅ Successful: 2/2
   ❌ Failed: 0
   🔧 Device used: mps





## 6. Audio Post-processing and Enhancement

Apply audio enhancements such as normalization, dynamic range compression, silence padding, and fades, and optionally mix in a subtle background ambience suitable for tourist guides.

In [None]:
def enhance_audio_for_tourism(audio_segment, enhancement_options=None):
    """Apply enhancements to make audio suitable for tourist guides.

    Steps run in this order, each controlled by a boolean key in
    ``enhancement_options`` (a missing key counts as enabled):

    1. ``"normalize"``   - normalize levels.
    2. ``"compress"``    - gentle dynamic range compression.
    3. ``"fade_in_out"`` - 300 ms fade in and 500 ms fade out.
    4. ``"add_silence"`` - 0.5 s of silence before and 1 s after.

    Fades are applied before the silence padding so that they shape the
    speech itself; fading after padding would only fade silence.

    Args:
        audio_segment: pydub ``AudioSegment`` holding the narration.
        enhancement_options: Optional dict of the flags listed above.
            ``None`` enables every step.

    Returns:
        The enhanced ``AudioSegment``; the input segment is not modified.
    """
    options = {} if enhancement_options is None else enhancement_options

    enhanced = audio_segment

    print(f"🎛️ Enhancing audio ({len(enhanced)}ms)...")

    # Normalize audio levels
    if options.get("normalize", True):
        enhanced = normalize(enhanced)
        print("   ✅ Normalized audio levels")

    # Apply gentle compression for consistent volume
    if options.get("compress", True):
        enhanced = compress_dynamic_range(enhanced, threshold=-20.0, ratio=2.0)
        print("   ✅ Applied dynamic range compression")

    # Add fade in/out for professional sound (on the speech, before padding)
    if options.get("fade_in_out", True):
        enhanced = enhanced.fade_in(300).fade_out(500)
        print("   ✅ Added fade in/out")

    # Add silence at beginning and end for smooth playback
    if options.get("add_silence", True):
        # Match the speech frame rate: silent() otherwise defaults to
        # 11025 Hz and concatenation would resample lower-rate input.
        rate = enhanced.frame_rate
        silence_start = AudioSegment.silent(duration=500, frame_rate=rate)   # 0.5 seconds
        silence_end = AudioSegment.silent(duration=1000, frame_rate=rate)    # 1 second
        enhanced = silence_start + enhanced + silence_end
        print("   ✅ Added silence padding")

    print(f"   📊 Final duration: {len(enhanced)}ms")
    return enhanced

def add_background_ambience(speech_audio, ambience_type="subtle", volume_reduction=-20):
    """Add subtle background ambience for tourism context.

    Currently only a placeholder tone is implemented: a 220 Hz (A3) sine
    wave of the same length as the speech, attenuated by
    ``volume_reduction`` and overlaid on the speech. In practice you might
    load actual ambient sound files instead.

    Args:
        speech_audio: pydub ``AudioSegment`` with the narration.
        ambience_type: Only ``"subtle"`` is handled; any other value returns
            ``speech_audio`` unchanged.
        volume_reduction: Gain in dB applied to the background tone
            (negative values make it quieter).

    Returns:
        The mixed ``AudioSegment``, or ``speech_audio`` itself when
        ``ambience_type`` is not ``"subtle"``.
    """
    if ambience_type != "subtle":
        return speech_audio

    from pydub.generators import Sine

    # Generate the tone at the speech's own frame rate. The generator
    # defaults to 44.1 kHz and overlay() syncs both segments to the higher
    # rate, which would silently resample the narration.
    tone = Sine(220, sample_rate=speech_audio.frame_rate)  # A3 note
    background = tone.to_audio_segment(duration=len(speech_audio))
    background = background + volume_reduction  # Make it very quiet

    # Mix with speech (speech dominates)
    mixed = speech_audio.overlay(background)
    print("   🎵 Added subtle background ambience")
    return mixed

# Try the enhancement chain on the sample clip, when one was generated earlier
if not model_loaded or 'audio_segment' not in locals():
    print("⚠️ No audio available for enhancement testing")
else:
    print("Testing audio enhancement...")

    # Report length and loudness of the untouched clip
    print(f"Original audio: {len(audio_segment)}ms, {audio_segment.dBFS:.1f} dBFS")

    # Run the full enhancement chain with default options
    enhanced_audio = enhance_audio_for_tourism(audio_segment)

    # Inline players for an A/B comparison
    print("\n🎧 Original audio:")
    display(Audio(data=audio_segment.get_array_of_samples(), rate=audio_segment.frame_rate))

    print("\n🎧 Enhanced audio:")
    display(Audio(data=enhanced_audio.get_array_of_samples(), rate=enhanced_audio.frame_rate))

    print(f"\nEnhanced audio: {len(enhanced_audio)}ms, {enhanced_audio.dBFS:.1f} dBFS")

## 7. Export Audio Files with Metadata

Save processed audio files with proper naming conventions and metadata including location names, duration, and guide information.

In [None]:
def export_enhanced_guide(location, audio_segment, output_dir="./enhanced_guides", metadata=None):
    """Export enhanced audio as WAV (and MP3 when ffmpeg exists) plus JSON metadata.

    Args:
        location: Location object exposing ``name``, ``location_type``,
            ``content_sections`` (a mapping) and ``get_full_script()``.
        audio_segment: pydub ``AudioSegment`` to export.
        output_dir: Target directory; created together with any missing
            parent directories.
        metadata: Optional dict merged over the generated metadata (its keys
            win on conflict). Values must be JSON-serializable.

    Returns:
        Tuple ``(exports, metadata_info)``. ``exports`` maps ``"wav"``,
        optionally ``"mp3"``, and ``"metadata"`` to the written ``Path``;
        ``metadata_info`` is the dict that was written to the JSON file.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Create safe filename; fall back to a fixed stem when the name contains
    # no usable characters, so files are never named just "_guide.wav".
    safe_name = "".join(c for c in location.name if c.isalnum() or c in (' ', '-', '_')).rstrip()
    safe_name = safe_name.replace(' ', '_') or "location"

    # Export in multiple formats
    exports = {}

    # High quality WAV for archival
    wav_path = output_path / f"{safe_name}_guide.wav"
    audio_segment.export(wav_path, format="wav")
    exports["wav"] = wav_path

    # Compressed MP3 for mobile apps (requires ffmpeg)
    from pydub.utils import which as _which
    if _which("ffmpeg"):
        mp3_path = output_path / f"{safe_name}_guide.mp3"
        audio_segment.export(mp3_path, format="mp3", bitrate="128k")
        exports["mp3"] = mp3_path
    else:
        print("⚠️ Skipping MP3 export: ffmpeg not found. Install with 'brew install ffmpeg' on macOS.")

    # Segment length in milliseconds, computed once for all duration fields
    duration_ms = len(audio_segment)
    minutes, remainder_ms = divmod(duration_ms, 60000)

    bytes_per_mb = 1024 * 1024
    file_sizes = {"wav": wav_path.stat().st_size / bytes_per_mb}
    if "mp3" in exports:
        file_sizes["mp3"] = exports["mp3"].stat().st_size / bytes_per_mb

    # Create metadata file
    metadata_info = {
        "location_name": location.name,
        "location_type": location.location_type,
        "duration_seconds": duration_ms / 1000,
        "duration_formatted": f"{minutes}:{remainder_ms // 1000:02d}",
        "file_size_mb": file_sizes,
        "created_date": datetime.now().isoformat(),
        "content_sections": list(location.content_sections.keys()),
        "script_length_chars": len(location.get_full_script()),
        "audio_format": {
            "sample_rate": audio_segment.frame_rate,
            "channels": audio_segment.channels,
            "sample_width": audio_segment.sample_width
        }
    }

    if metadata:
        metadata_info.update(metadata)

    # Save metadata as JSON
    metadata_path = output_path / f"{safe_name}_metadata.json"
    with open(metadata_path, 'w', encoding='utf-8') as f:
        json.dump(metadata_info, f, indent=2, ensure_ascii=False)

    exports["metadata"] = metadata_path

    return exports, metadata_info

## Next Steps and Extensions

This notebook provides a foundation for creating tourist guide audio content. Here are some suggestions for extending the workflow:

### 🌍 **Multilingual Support**
- Add support for multiple languages
- Implement language-specific TTS models
- Create multilingual metadata

### 🎵 **Advanced Audio Features**
- Add real background ambient sounds (city sounds, nature, etc.)
- Implement audio transitions between locations
- Add music intros/outros

### 📱 **Mobile App Integration**
- Generate different quality levels for different connection speeds
- Create chunked audio for progressive loading
- Add GPS-triggered audio cues

### 🧠 **AI Enhancements**
- Use AI to generate location descriptions from photos
- Implement personalized content based on visitor interests
- Add interactive Q&A capabilities

### 📊 **Analytics and Optimization**
- Track which sections are most engaging
- A/B test different narration styles
- Optimize content length based on visitor behavior

In [None]:
# Recap of what this session produced, followed by suggested follow-ups
print("🎯 Tourist Guide TTS Workflow Summary")
print("=" * 50)

if not model_loaded:
    print("⚠️ No TTS models loaded")
    print("   Please check the model loading section and install required dependencies.")
else:
    print(f"✅ TTS Model: {current_tts_method}")
    print(f"✅ Sample locations: {len(sample_locations)}")

    # Each optional line is shown only if the corresponding cell was run
    if 'batch_results' in locals():
        successful_count = sum(1 for r in batch_results if r["status"] == "success")
        print(f"✅ Batch processing: {successful_count}/{len(sample_locations)} successful")

    if 'enhanced_audio' in locals():
        print("✅ Audio enhancement: Completed")

    if 'exports' in locals():
        print(f"✅ Export formats: {', '.join(exports.keys())}")

    print("\n🚀 Ready for production use!")
    print("   You can now add more locations and scale up the generation.")

print("\n📚 Next steps:")
print("   1. Add more tourist locations to the sample_locations list")
print("   2. Customize voice settings and enhancement options")
print("   3. Implement multilingual support")
print("   4. Add background ambient sounds")
print("   5. Integrate with mobile app or web platform")

In [None]:
# Device-Specific Troubleshooting and Performance Monitoring
def check_device_status():
    """Print device status and troubleshooting information.

    Reads the module-level ``device`` value and prints a report for the
    matching branch (``"cuda"``, ``"mps"`` or ``"cpu"``); any other value
    prints only the general PyTorch information. Returns ``None``.

    Note: the CPU branch samples CPU usage over a 0.1 s interval, so it
    blocks for that long.
    """
    print("🔍 Device Status Check")
    print("=" * 40)

    # General PyTorch info
    print(f"PyTorch version: {torch.__version__}")
    print(f"Current device: {device}")

    if device == "cuda":
        print("\n🚀 NVIDIA GPU Information:")
        print(f"   GPU Name: {torch.cuda.get_device_name(0)}")
        print(f"   GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
        print(f"   Memory Allocated: {torch.cuda.memory_allocated() / 1e9:.2f} GB")
        print(f"   Memory Cached: {torch.cuda.memory_reserved() / 1e9:.2f} GB")

    elif device == "mps":
        print("\n🍎 Apple Silicon Information:")
        print(f"   MPS Available: {torch.backends.mps.is_available()}")
        print(f"   MPS Built: {torch.backends.mps.is_built()}")

        # System memory info
        memory = psutil.virtual_memory()
        print(f"   System Memory: {memory.total / 1e9:.1f} GB")
        print(f"   Available Memory: {memory.available / 1e9:.1f} GB")
        print(f"   Memory Usage: {memory.percent}%")

        # Apple Silicon specific recommendations
        print("\n💡 Apple Silicon Tips:")
        if memory.available / 1e9 < 4:
            print("   ⚠️ Low memory - consider closing other apps")
        print("   • Use Safari instead of Chrome to save memory")
        print("   • Close unnecessary applications")
        print("   • Monitor Activity Monitor for memory pressure")

    elif device == "cpu":
        # cpu_count() counts logical cores by default; logical=False gives
        # physical cores (and may be None when it cannot be determined).
        physical_cores = psutil.cpu_count(logical=False)
        logical_cores = psutil.cpu_count()

        print("\n💻 CPU Information:")
        print(f"   CPU Cores: {physical_cores} physical, {logical_cores} logical")
        # A non-zero interval is required here: the first cpu_percent() call
        # without one returns a meaningless 0.0.
        print(f"   CPU Usage: {psutil.cpu_percent(interval=0.1)}%")
        print(f"   Available Memory: {psutil.virtual_memory().available / 1e9:.1f} GB")

        print("\n💡 CPU Optimization Tips:")
        print("   • Consider using a GPU for faster processing")
        print("   • Process one location at a time to avoid memory issues")
        print("   • Use Google TTS as a lightweight alternative")

def monitor_performance():
    """Create a sampler that reports resource usage since this call.

    Returns:
        A zero-argument function. Each call returns a dict with
        ``elapsed_time`` (seconds since ``monitor_performance`` was called),
        ``cpu_percent``, ``memory_percent`` and ``memory_available_gb`` as
        reported by psutil. When the module-level ``device`` equals
        ``"cuda"`` the dict also holds ``gpu_memory_allocated_gb`` and
        ``gpu_memory_cached_gb``.
    """
    import time

    # Reference point for every later elapsed-time reading
    started_at = time.time()

    def get_performance_stats():
        snapshot = {"elapsed_time": time.time() - started_at}
        snapshot["cpu_percent"] = psutil.cpu_percent()
        snapshot["memory_percent"] = psutil.virtual_memory().percent
        snapshot["memory_available_gb"] = psutil.virtual_memory().available / 1e9

        # GPU counters are only gathered on the CUDA path
        if device != "cuda":
            return snapshot

        snapshot["gpu_memory_allocated_gb"] = torch.cuda.memory_allocated() / 1e9
        snapshot["gpu_memory_cached_gb"] = torch.cuda.memory_reserved() / 1e9
        return snapshot

    return get_performance_stats

# Print the device report, then start the performance sampler
check_device_status()
perf_monitor = monitor_performance()

print(f"\n✅ System ready for TTS processing on {device}")

# One closing line describing the active backend
if device == "cuda":
    print("🚀 NVIDIA GPU detected - high performance mode")
elif device == "mps":
    print("🍎 Apple Silicon detected - optimizations applied")
else:
    print("💻 CPU mode - consider GPU for better performance")