In [1]:
"""
Wake Word Detection Data Generation Pipeline

This module generates training data (activates, negatives, backgrounds) 
for wake-word detection using the ElevenLabs TTS API.
"""

import glob
import json
import logging
import os
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Dict, List, Tuple

import IPython
import requests
from dotenv import load_dotenv
from IPython.display import Audio, display
from pydub import AudioSegment

In [2]:
# ============================================================================
# Configuration
# ============================================================================

@dataclass
class PipelineConfig:
    """
    Configuration parameters for the wake word data pipeline.

    Only ``api_key`` is required; every other field has a default.

    The API key is excluded from the generated ``repr`` so that displaying or
    logging a config object (for example as a notebook cell output) does not
    expose the credential. It remains an ordinary field in every other
    respect: it is accepted by the constructor, readable as an attribute,
    compared by ``==`` and included in ``dataclasses.asdict`` output.
    """

    # API Configuration
    # Secret sent in the "xi-api-key" request header; hidden from repr().
    api_key: str = field(repr=False)
    # Base TTS endpoint; "/{voice_id}/stream" is appended per request.
    elevenlabs_url: str = "https://api.elevenlabs.io/v1/text-to-speech"

    # Directory Configuration
    # Root under which the activates/negatives/backgrounds/processed/metadata
    # folders are created.
    base_folder: str = "../data"
    # NOTE(review): not read by any pipeline method in this notebook; the
    # 'processed' folder is derived from base_folder instead.
    output_folder: str = "../data/processed"

    # Wake Word Configuration
    # Text synthesized for positive samples; also used to build their filenames.
    wake_word: str = "Hey Jerry"

    # Audio Configuration
    # Frame rate and channel count applied to every converted/processed clip.
    audio_sample_rate: int = 44100
    audio_channels: int = 1
    # Background clips are padded with silence / trimmed to exactly this length.
    background_duration_ms: int = 10000

    # Model Configuration
    # NOTE(review): tx / ty / n_freq are not read by the pipeline code in this
    # notebook (they are only written to the metadata file); presumably they
    # describe the downstream model's input/output shape - TODO confirm.
    tx: int = 5511
    ty: int = 1375
    n_freq: int = 101

    # Generation Configuration
    # Upper bound on how many voices are sampled for each negative phrase.
    negatives_per_phrase: int = 4
    # Sent to the API as voice_settings.stability / voice_settings.similarity_boost.
    stability: float = 0.75
    similarity_boost: float = 0.75

    # Performance Configuration
    # Size of the thread pool used for concurrent TTS requests.
    max_workers: int = 4
    # Total number of attempts per request.
    retry_attempts: int = 3
    # Base backoff in seconds; the wait is retry_delay * 2**attempt.
    retry_delay: float = 1.0

In [3]:
# ============================================================================
# Pipeline Class
# ============================================================================

class WakeWordDataPipeline:
    """
    Pipeline for generating wake word detection training data.
    
    This class handles:
    - Positive sample generation (wake word)
    - Negative sample generation (similar words)
    - Audio format conversion
    - File organization and standardization
    - Metadata generation
    """
    
    def __init__(self, config: PipelineConfig):
        """
        Initialize the pipeline with configuration.
        
        Args:
            config: PipelineConfig object containing all settings
        """
        self.config = config
        self.logger = self._setup_logging()
        self.voice_ids = self._get_voice_ids()
        self.negative_phrases = self._get_negative_phrases()
        self.folders = self._create_directory_structure()
    
    # ------------------------------------------------------------------------
    # Setup Methods
    # ------------------------------------------------------------------------
    
    def _setup_logging(self) -> logging.Logger:
        """Configure logging for the pipeline."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[logging.StreamHandler()]
        )
        return logging.getLogger(__name__)
    
    def _get_voice_ids(self) -> List[str]:
        """
        Get list of ElevenLabs voice IDs from a txt file.
        Lines starting with '#' are ignored.

        Returns:
            List of active voice ID strings
        """
        file_path = Path("../data/config/voice_ids.txt")
        if not file_path.exists():
            raise FileNotFoundError(f"Voice IDs file not found: {file_path}")

        with file_path.open("r") as f:
            voice_ids = [line.strip() for line in f if line.strip() and not line.startswith("#")]

        return voice_ids
    
    def _get_negative_phrases(self) -> List[str]:
        """
        Get list of negative phrases (similar-sounding words) from a txt file.
        Lines starting with '#' are ignored.

        Returns:
            List of phrase strings
        """
        file_path = Path("../data/config/negative_phrases.txt")
        if not file_path.exists():
            raise FileNotFoundError(f"Negative phrases file not found: {file_path}")

        with file_path.open("r") as f:
            negatives = [line.strip() for line in f if line.strip() and not line.startswith("#")]

        return negatives
    
    def _create_directory_structure(self) -> Dict[str, str]:
        """
        Create necessary directories for data storage.
        
        Returns:
            Dictionary mapping folder names to paths
        """
        base_path = Path(self.config.base_folder)
        
        folders = {
            'base': str(base_path),
            'activates': str(base_path / 'activates'),
            'negatives': str(base_path / 'negatives'),
            'backgrounds': str(base_path / 'backgrounds'),
            'processed': str(base_path / 'processed'),
            'metadata': str(base_path / 'metadata')
        }
        
        for folder_path in folders.values():
            os.makedirs(folder_path, exist_ok=True)
        
        return folders
    
    # ------------------------------------------------------------------------
    # API Methods
    # ------------------------------------------------------------------------
    
    def _make_api_request(
        self, 
        voice_id: str, 
        text: str, 
        output_path: str
    ) -> Tuple[bool, str]:
        """
        Make TTS API request with retry logic.
        
        Args:
            voice_id: ElevenLabs voice ID
            text: Text to synthesize
            output_path: Where to save the audio file
            
        Returns:
            Tuple of (success: bool, message: str)
        """
        url = f"{self.config.elevenlabs_url}/{voice_id}/stream"
        
        headers = {
            "xi-api-key": self.config.api_key,
            "Content-Type": "application/json"
        }
        
        payload = {
            "text": text,
            "voice_settings": {
                "stability": self.config.stability,
                "similarity_boost": self.config.similarity_boost
            }
        }
        
        for attempt in range(self.config.retry_attempts):
            try:
                response = requests.post(
                    url, 
                    headers=headers, 
                    json=payload, 
                    timeout=30
                )
                
                if response.status_code == 200:
                    with open(output_path, 'wb') as f:
                        f.write(response.content)
                    return True, f"Success: {os.path.basename(output_path)}"
                
                if attempt == self.config.retry_attempts - 1:
                    return False, f"API Error {response.status_code}: {response.text}"
                
                time.sleep(self.config.retry_delay * (2 ** attempt))
                
            except requests.exceptions.RequestException as e:
                if attempt == self.config.retry_attempts - 1:
                    return False, f"Request failed: {str(e)}"
                time.sleep(self.config.retry_delay * (2 ** attempt))
        
        return False, "Max retries exceeded"
    
    # ------------------------------------------------------------------------
    # Generation Methods
    # ------------------------------------------------------------------------
    
    def generate_positive_samples(self) -> int:
        """
        Generate positive samples (wake word audio).
        
        Returns:
            Number of successfully generated samples
        """
        self.logger.info("Generating positive samples...")
        
        tasks = [
            (
                voice_id,
                self.config.wake_word,
                os.path.join(
                    self.folders['activates'],
                    f"pos_{self.config.wake_word.replace(' ', '_').lower()}_{voice_id}.mp3"
                )
            )
            for voice_id in self.voice_ids
        ]
        
        return self._execute_generation_tasks(tasks)
    
    def generate_negative_samples(self) -> int:
        """
        Generate negative samples (similar-sounding words).
        
        Returns:
            Number of successfully generated samples
        """
        self.logger.info("Generating negative samples...")
        
        tasks = []
        for phrase in self.negative_phrases:
            selected_voices = random.sample(
                self.voice_ids,
                min(self.config.negatives_per_phrase, len(self.voice_ids))
            )
            
            for voice_id in selected_voices:
                clean_phrase = phrase.replace(" ", "_").replace("'", "").lower()
                output_path = os.path.join(
                    self.folders['negatives'],
                    f"neg_{clean_phrase}_{voice_id}.mp3"
                )
                tasks.append((voice_id, phrase, output_path))
        
        return self._execute_generation_tasks(tasks)
    
    def _execute_generation_tasks(
        self, 
        tasks: List[Tuple[str, str, str]]
    ) -> int:
        """
        Execute generation tasks in parallel.
        
        Args:
            tasks: List of (voice_id, text, output_path) tuples
            
        Returns:
            Number of successful generations
        """
        success_count = 0
        
        with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
            future_to_task = {
                executor.submit(self._make_api_request, v, t, p): (v, t, p)
                for v, t, p in tasks
            }
            
            for future in as_completed(future_to_task):
                success, message = future.result()
                if success:
                    success_count += 1
                self.logger.info(message)
        
        return success_count
    
    # ------------------------------------------------------------------------
    # Audio Processing Methods
    # ------------------------------------------------------------------------
    
    def convert_to_wav(self, folder_path: str) -> int:
        """
        Convert all audio files in folder to WAV format.
        
        Args:
            folder_path: Path to folder containing audio files
            
        Returns:
            Number of files converted
        """
        self.logger.info(f"Converting audio in {folder_path} to WAV...")
        
        audio_extensions = ['*.mp3', '*.m4a', '*.mp4', '*.flac', '*.ogg']
        audio_files = [
            f for ext in audio_extensions 
            for f in glob.glob(os.path.join(folder_path, ext))
        ]
        
        count = 0
        for file_path in audio_files:
            try:
                audio = AudioSegment.from_file(file_path)
                audio = audio.set_channels(self.config.audio_channels)
                audio = audio.set_frame_rate(self.config.audio_sample_rate)
                
                wav_path = os.path.splitext(file_path)[0] + '.wav'
                audio.export(wav_path, format='wav')
                
                os.remove(file_path)
                count += 1
                
                self.logger.info(f"Converted: {os.path.basename(file_path)}")
                
            except Exception as e:
                self.logger.error(f"Error converting {file_path}: {str(e)}")
        
        return count
    
    def standardize_filenames(self, folder_path: str, prefix: str) -> int:
        """
        Rename files to standardized format (prefix-0001.wav, etc).
        
        Args:
            folder_path: Path to folder containing files
            prefix: Prefix for renamed files (e.g., 'pos', 'neg', 'bg')
            
        Returns:
            Number of files renamed
        """
        self.logger.info(f"Standardizing filenames in {folder_path}...")
        
        wav_files = sorted(glob.glob(os.path.join(folder_path, '*.wav')))
        
        for i, file_path in enumerate(wav_files, 1):
            new_path = os.path.join(folder_path, f"{prefix}-{i:04d}.wav")
            os.rename(file_path, new_path)
        
        return len(wav_files)
    
    def process_background_audio(self) -> int:
        """
        Process background audio to standard duration and format.
        
        Returns:
            Number of background files processed
        """
        self.logger.info("Processing background audio...")
        
        bg_files = glob.glob(os.path.join(self.folders['backgrounds'], '*.wav'))
        
        for file_path in bg_files:
            audio = AudioSegment.from_wav(file_path)
            audio = audio.set_channels(self.config.audio_channels)
            audio = audio.set_frame_rate(self.config.audio_sample_rate)
            
            if len(audio) < self.config.background_duration_ms:
                silence_duration = self.config.background_duration_ms - len(audio)
                audio += AudioSegment.silent(duration=silence_duration)
            
            audio = audio[:self.config.background_duration_ms]
            audio.export(file_path, format='wav')
        
        return len(bg_files)
    
    # ------------------------------------------------------------------------
    # Utility Methods
    # ------------------------------------------------------------------------
    
    def generate_metadata(self) -> None:
        """Generate and save dataset metadata."""
        metadata = {
            'config': asdict(self.config),
            'folders': self.folders
        }
        
        metadata_path = os.path.join(
            self.folders['metadata'],
            'dataset_info.json'
        )
        
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)
        
        self.logger.info(f"Metadata saved at {metadata_path}")
    
    def load_raw_audio(self) -> Tuple[List[AudioSegment], List[AudioSegment], List[AudioSegment]]:
        """
        Load generated audio files for verification.
        
        Returns:
            Tuple of (activates, negatives, backgrounds) audio segments
        """
        activates = self._load_audio_from_folder(self.folders['activates'])
        negatives = self._load_audio_from_folder(self.folders['negatives'])
        backgrounds = self._load_audio_from_folder(self.folders['backgrounds'])
        
        return activates, negatives, backgrounds
    
    def _load_audio_from_folder(self, folder_path: str) -> List[AudioSegment]:
        """
        Load all WAV files from a folder.
        
        Args:
            folder_path: Path to folder containing WAV files
            
        Returns:
            List of AudioSegment objects
        """
        audio_segments = []
        
        for filename in os.listdir(folder_path):
            if filename.endswith(".wav"):
                file_path = os.path.join(folder_path, filename)
                audio = AudioSegment.from_wav(file_path)
                audio_segments.append(audio)
        
        return audio_segments
    
    # ------------------------------------------------------------------------
    # Main Pipeline
    # ------------------------------------------------------------------------
    
    def run_full_pipeline(self) -> None:
        """Execute the complete data generation pipeline."""
        self.logger.info("Starting full pipeline...")
        
        # Generate samples
        self.generate_positive_samples()
        self.generate_negative_samples()
        
        # Convert to WAV
        self.convert_to_wav(self.folders['activates'])
        self.convert_to_wav(self.folders['negatives'])
        self.convert_to_wav(self.folders['backgrounds'])
        
        # Standardize filenames
        self.standardize_filenames(self.folders['activates'], 'pos')
        self.standardize_filenames(self.folders['negatives'], 'neg')
        self.standardize_filenames(self.folders['backgrounds'], 'bg')
        
        # Process backgrounds
        self.process_background_audio()
        
        # Generate metadata
        self.generate_metadata()
        
        self.logger.info("Pipeline finished!")

In [4]:
# ============================================================================
# Main Execution
# ============================================================================
def main():
    """
    Run the pipeline end to end and print a summary of what was produced.

    Reads ELEVEN_API_KEY from the environment (after loading the .env file),
    builds the configuration, runs the full pipeline, then loads the
    resulting audio and prints per-category counts.

    Returns:
        Tuple of (config, activates, negatives, backgrounds) so the notebook
        can inspect them in later cells.

    Raises:
        ValueError: If ELEVEN_API_KEY is not set.
    """
    load_dotenv()
    api_key = os.getenv("ELEVEN_API_KEY")
    if not api_key:
        raise ValueError("ELEVEN_API_KEY not found. Please add it to your .env file.")

    config = PipelineConfig(
        api_key=api_key,
        base_folder="../data",
        wake_word="Hey Jerry"
    )

    pipeline = WakeWordDataPipeline(config)
    pipeline.run_full_pipeline()

    print("\nLoading generated audio files...")
    activates, negatives, backgrounds = pipeline.load_raw_audio()

    # (label, count) pairs shared by both report sections below.
    sample_counts = (
        ("activate", len(activates)),
        ("negative", len(negatives)),
        ("background", len(backgrounds)),
    )

    for label, count in sample_counts:
        print(f"✓ Loaded {count} {label} samples")

    divider = "=" * 60
    print(divider)
    print("DATASET SUMMARY")
    print(divider)
    for label, count in sample_counts:
        print(f"Total {label} samples: {count}")
    print("\nDataset ready for model training!")

    # Hand the objects back for inspection in the notebook.
    return config, activates, negatives, backgrounds


In [5]:
# Run the full pipeline (this calls the TTS API and rewrites files under the
# data folder) and keep the returned objects for the inspection cells below.
config, activates, negatives, backgrounds = main()

2025-10-10 10:45:37,986 - INFO - Starting full pipeline...
2025-10-10 10:45:37,988 - INFO - Generating positive samples...


2025-10-10 10:45:39,502 - INFO - Success: pos_hey_jerry_ErXwobaYiN019PkySvjV.mp3
2025-10-10 10:45:39,707 - INFO - Success: pos_hey_jerry_pNInz6obpgDQGcFmaJgB.mp3
2025-10-10 10:45:40,708 - INFO - Success: pos_hey_jerry_VR6AewLTigWG4xSOukaG.mp3
2025-10-10 10:45:40,719 - INFO - Success: pos_hey_jerry_Xb7hH8MSUJpSbSDYk0k2.mp3
2025-10-10 10:45:42,515 - INFO - Success: pos_hey_jerry_wViXBPUzp2ZZixB1xQuM.mp3
2025-10-10 10:45:44,160 - INFO - Success: pos_hey_jerry_9BWtsMINqrJLrRacOk9x.mp3
2025-10-10 10:45:44,160 - INFO - Generating negative samples...
2025-10-10 10:45:45,473 - INFO - Success: neg_terry_VR6AewLTigWG4xSOukaG.mp3
2025-10-10 10:45:45,716 - INFO - Success: neg_terry_Xb7hH8MSUJpSbSDYk0k2.mp3
2025-10-10 10:45:46,858 - INFO - Success: neg_harry_Xb7hH8MSUJpSbSDYk0k2.mp3
2025-10-10 10:45:46,919 - INFO - Success: neg_harry_VR6AewLTigWG4xSOukaG.mp3
2025-10-10 10:45:48,183 - INFO - Success: neg_harry_wViXBPUzp2ZZixB1xQuM.mp3
2025-10-10 10:45:49,118 - INFO - API Error 429: {"detail":{"statu


Loading generated audio files...
✓ Loaded 6 activate samples
✓ Loaded 14 negative samples
✓ Loaded 2 background samples
DATASET SUMMARY
Total activate samples: 6
Total negative samples: 14
Total background samples: 2

Dataset ready for model training!


In [6]:
# Listen to the first standardized positive (wake word) sample.
Audio(config.base_folder + "/activates/pos-0001.wav")


In [7]:
# Listen to the first standardized negative (similar-sounding) sample.
Audio(config.base_folder + "/negatives/neg-0001.wav")

In [8]:
# Listen to the first standardized background clip.
Audio(config.base_folder + "/backgrounds/bg-0001.wav")