In [12]:
import uuid
from db_utils import DatabaseManager
from text_chunking_util import SimpleTextChunkCreator
from models.db_models import AudioChunk


def saveChunksForJob(db_manager: DatabaseManager, job_id: str):
    """
    Split a job's firstDraft into text chunks and store them on the job as AudioChunk records.

    The call is idempotent: when the job already carries audio chunks, nothing is
    regenerated or written and the existing list is returned as-is.

    Args:
        db_manager: Database manager used to fetch and persist the job
        job_id: ID of the job to process

    Returns:
        The list of AudioChunk objects attached to the job (existing or newly created)

    Raises:
        ValueError: If the job cannot be found or its firstDraft is empty
    """
    job_record = db_manager.get_job(job_id)
    if not job_record:
        raise ValueError(f"Job with ID {job_id} not found")

    # Short-circuit when chunks were already created on a previous run
    existing_chunks = job_record.audioChunks
    if existing_chunks and len(existing_chunks) > 0:
        print(f"✓ Audio chunks already exist for job {job_id}")
        print(f"  Existing chunks: {len(existing_chunks)}")
        return existing_chunks

    draft_text = job_record.firstDraft
    if not draft_text:
        raise ValueError(f"Job {job_id} has no firstDraft content")

    # Split the draft using a newline as the separator
    pieces = SimpleTextChunkCreator().convertTextToChunks(draft_text, '\n')

    # One AudioChunk per piece; the ID is the first 8 characters of a random UUID.
    # outputAudioFilePath starts empty and is filled in during audio generation.
    new_chunks = [
        AudioChunk(
            chunkId=str(uuid.uuid4())[:8],
            text=piece,
            outputAudioFilePath="",
        )
        for piece in pieces
    ]

    # Attach the chunks and persist the whole job object
    job_record.audioChunks = new_chunks
    db_manager.update_job_field(job_id, job_record)

    print(f"✓ Created {len(new_chunks)} audio chunks for job {job_id}")
    return new_chunks

In [13]:
import re
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from tts_util import OpenAiTTSProvider
from typing import Tuple, List
from models.db_models import AudioChunk


def convertTextChunksToAudio(job_id: str, db_manager: DatabaseManager) -> Tuple[List[AudioChunk], str]:
    """
    Convert a job's text chunks to audio files using parallel processing.

    Only chunks whose outputAudioFilePath is empty are sent to the TTS provider,
    so the function can be re-run to retry chunks that failed earlier. Per-chunk
    failures are reported and counted but do not raise; the job is persisted once
    after all submitted chunks have finished.

    Args:
        job_id: ID of the job to process
        db_manager: Database manager instance

    Returns:
        Tuple[List[AudioChunk], str]: (audio_chunks, folder_path). Chunks that
        failed keep an empty outputAudioFilePath.

    Raises:
        ValueError: If the job does not exist or has no audio chunks
    """
    # Fetch job
    job = db_manager.get_job(job_id)
    if not job:
        raise ValueError(f"Job with ID {job_id} not found")

    # Check if audioChunks array is populated
    if not job.audioChunks or len(job.audioChunks) == 0:
        raise ValueError(f"Job {job_id} has no audio chunks to process")

    # Build the folder name from the first chunk's text (limited to 50 chars)
    first_chunk_text = job.audioChunks[0].text[:50]
    # Keep word characters, whitespace and hyphens; drop everything else
    clean_text = re.sub(r'[^\w\s-]', '', first_chunk_text).strip()
    # Collapse runs of whitespace/hyphens into a single underscore
    clean_text = re.sub(r'[-\s]+', '_', clean_text)

    # If nothing usable survives the cleanup (e.g. the first chunk is only
    # punctuation), every such job would share the folder "YYYYMMDD_" and, since
    # the stitched file is named after the folder, overwrite each other's output.
    # Fall back to the (sanitized) job id in that degenerate case only.
    if not clean_text.strip('_'):
        clean_text = re.sub(r'[^\w-]', '_', str(job_id)) or "job"

    # Folder name uses the YYYYMMDD_text format
    date_str = datetime.now().strftime('%Y%m%d')
    folder_name = f"{date_str}_{clean_text}"
    # NOTE(review): the directory is not created here; folder_path is handed to the
    # TTS provider, which presumably creates it — confirm in tts_util.
    folder_path = f"./audio_outputs/audio_chunks/{folder_name}"

    # Identify chunks that still need audio generation
    chunks_to_process = []
    completed_count = 0

    for i, chunk in enumerate(job.audioChunks):
        if chunk.outputAudioFilePath and chunk.outputAudioFilePath.strip():
            completed_count += 1
        else:
            chunks_to_process.append((i, chunk))

    print(f"Audio generation status for job {job_id}:")
    print(f"  Already completed: {completed_count}")
    print(f"  Needs processing: {len(chunks_to_process)}")
    print(f"  Total chunks: {len(job.audioChunks)}")

    # If all chunks are already completed, return early
    if not chunks_to_process:
        print(f"✓ All audio chunks already generated for job {job_id}")
        return job.audioChunks, folder_path

    print(f"Creating audio files in folder: {folder_path}")

    # Initialize TTS provider
    tts_provider = OpenAiTTSProvider(max_retries=3, retry_delay=1.0)

    def process_chunk(index, audio_chunk):
        """Generate audio for one chunk; returns (index, file_path, error_message)."""
        try:
            file_path = tts_provider.generate_speech(
                voice="echo",  # Default voice
                instructions="Clear and engaging narration",  # Default instructions
                input_text=audio_chunk.text,
                folder=folder_path,
                chunk_index=index,
                chunk_id=audio_chunk.chunkId
            )
            return index, file_path, None
        except Exception as e:
            # Report the failure to the caller instead of raising, so one bad
            # chunk does not abort the remaining ones
            return index, None, str(e)

    processed_count = 0
    failed_count = 0

    # Process only chunks that need audio generation
    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_index = {
            executor.submit(process_chunk, i, chunk): i
            for i, chunk in chunks_to_process
        }

        # Handle results as they complete; the job object is only mutated here,
        # in the calling thread, never inside the workers
        for future in as_completed(future_to_index):
            index, file_path, error = future.result()

            if error:
                print(f"✗ Failed to process chunk {index}: {error}")
                failed_count += 1
            else:
                # Update the chunk with the file path
                job.audioChunks[index].outputAudioFilePath = file_path
                print(f"✓ Processed chunk {index}: {file_path}")
                processed_count += 1

    # Persist the updated job (including any newly generated file paths)
    db_manager.update_job_field(job_id, job)

    print("\n✓ Audio generation completed!")
    print(f"  Successfully processed: {processed_count}")
    print(f"  Failed: {failed_count}")
    print(f"  Previously completed: {completed_count}")
    print(f"  Total chunks: {len(job.audioChunks)}")

    return job.audioChunks, folder_path

In [None]:
import os
from pydub import AudioSegment
from pydub.silence import Silence


def stitchAudioChunks(job_id: str, folder_path: str, db_manager: DatabaseManager, gap_seconds: float = 1.0) -> str:
    """
    Concatenate a job's audio chunk files into one WAV file, separated by silence.

    Every chunk must already have an outputAudioFilePath recorded. Files that are
    missing on disk or fail to load are reported and skipped; the remaining
    chunks are still stitched.

    Args:
        job_id: ID of the job to process
        folder_path: Chunk folder path; only its last component is used, as the
            base name of the final file
        db_manager: Database manager instance
        gap_seconds: Silence inserted between consecutive chunks, in seconds (default: 1.0)

    Returns:
        str: Path to the final stitched audio file

    Raises:
        ValueError: If the job is missing, has no chunks, has chunks without a
            recorded audio path, or no chunk could be loaded
    """
    job_record = db_manager.get_job(job_id)
    if not job_record:
        raise ValueError(f"Job with ID {job_id} not found")

    if not job_record.audioChunks or len(job_record.audioChunks) == 0:
        raise ValueError(f"Job {job_id} has no audio chunks to stitch")

    # Refuse to stitch while any chunk still lacks a recorded audio path
    indices_without_audio = [
        position
        for position, item in enumerate(job_record.audioChunks)
        if not item.outputAudioFilePath or not item.outputAudioFilePath.strip()
    ]
    if indices_without_audio:
        raise ValueError(f"Job {job_id} has chunks without audio files at indices: {indices_without_audio}")

    total_chunks = len(job_record.audioChunks)
    print(f"Stitching {total_chunks} audio chunks for job {job_id}")
    print(f"Gap between chunks: {gap_seconds} seconds")

    # pydub durations are expressed in milliseconds
    silence_between = AudioSegment.silent(duration=int(gap_seconds * 1000))

    stitched = None
    added_count = 0

    for position, item in enumerate(job_record.audioChunks):
        try:
            wav_path = item.outputAudioFilePath
            if not os.path.exists(wav_path):
                print(f"⚠️  Audio file not found: {wav_path}")
                continue

            segment = AudioSegment.from_wav(wav_path)

            # The first loaded chunk starts the track; later ones are preceded by the gap
            stitched = segment if stitched is None else stitched + silence_between + segment

            added_count += 1
            print(f"✓ Added chunk {position+1}/{total_chunks}: {os.path.basename(wav_path)}")

        except Exception as exc:
            # Best effort: report the problem and continue with the next chunk
            print(f"✗ Failed to process chunk {position}: {exc}")

    if stitched is None or added_count == 0:
        raise ValueError(f"No audio chunks could be processed for job {job_id}")

    # Final files live in a fixed output directory
    output_dir = "./audio_outputs/final_audio"
    os.makedirs(output_dir, exist_ok=True)

    # The final file is named after the chunk folder
    base_name = os.path.basename(folder_path.rstrip('/'))
    final_file_path = os.path.join(output_dir, f"{base_name}.wav")

    stitched.export(final_file_path, format="wav")

    # Record the final audio path on the job and persist it
    job_record.finalAudioFilePath = final_file_path
    db_manager.update_job_field(job_id, job_record)

    print(f"\n✓ Audio stitching completed!")
    print(f"  Processed chunks: {added_count}/{total_chunks}")
    print(f"  Final audio duration: {len(stitched)/1000:.1f} seconds")
    print(f"  Final audio saved to: {final_file_path}")

    return final_file_path

In [14]:
# Test tts_util.py
from tts_util import OpenAiTTSProvider

ttsProvider = OpenAiTTSProvider()

# Neutral sample sentence for the smoke test: this text is sent to an external
# TTS service and saved with the notebook, so keep it free of personal details
# and offensive language.
tts_smoke_test_text = (
    "Hello, world! How is it going? This is a test audio clip. "
    "I like to build cool things."
)

# Positional arguments, in order: voice, instructions, input text, output folder,
# chunk index, chunk id (presumably matching generate_speech's signature — confirm in tts_util).
print(ttsProvider.generate_speech('echo', 'Calm and soothing', tts_smoke_test_text, './audio_outputs', 1, 'test'))

INFO:tts_util:Generating audio for chunk 1_test, attempt 1
INFO:tts_util:Successfully generated audio: ./audio_outputs/001_test.wav


./audio_outputs/001_test.wav


In [15]:
# Test chunk creation util
from text_chunking_util import SimpleTextChunkCreator
# Smoke test for SimpleTextChunkCreator: split a sample string using "\n" as the separator.
chunkCreator = SimpleTextChunkCreator()
# NOTE: this sample contains no newline character, so the recorded output is a
# single chunk holding the whole string; add "\n" to the sample to see a real split.
text = """Hey there. I am testing chunking logic.It should chunk based on new line characters."""
chunks = chunkCreator.convertTextToChunks(text, "\n")
print(chunks)

['Hey there. I am testing chunking logic.It should chunk based on new line characters.']


In [None]:
import os
from dotenv import load_dotenv
from db_utils import DatabaseManager

if __name__ == "__main__":
    # Load environment variables (e.g. from a .env file) before creating clients
    load_dotenv()

    # Initialize database manager
    db_manager = DatabaseManager()

    # Run the pipeline inside try/finally so the DB connection is closed even
    # when one of the steps raises (missing job, failed chunks, stitching error)
    try:
        # Set the job ID (replace with actual job ID from story generation)
        job_id = "68d19cc430f1f23c880f32a4"

        # Step 1: Create and save audio chunks for the job
        print("=== STEP 1: Creating Audio Chunks ===")
        audio_chunks = saveChunksForJob(db_manager, job_id)

        print(f"Processed {len(audio_chunks)} chunks:")
        for i, chunk in enumerate(audio_chunks):
            print(f"  Chunk {i+1}: ID={chunk.chunkId}, Length={len(chunk.text)} chars")

        # Step 2: Convert text chunks to audio files
        print("\n=== STEP 2: Converting Chunks to Audio ===")
        processed_chunks, folder_path = convertTextChunksToAudio(job_id, db_manager)

        print("\n✓ Audio pipeline completed!")
        print(f"  Audio files saved to: {folder_path}")
        print(f"  Total processed chunks: {len(processed_chunks)}")

        # Step 3: Stitch audio chunks into final audio file
        print("\n=== STEP 3: Stitching Audio Chunks ===")
        final_audio_path = stitchAudioChunks(job_id, folder_path, db_manager, gap_seconds=1.0)

        print("\n🎉 Complete audio generation pipeline finished!")
        print(f"  Final audio file: {final_audio_path}")
    finally:
        # Close DB connection when done, whether or not the pipeline succeeded
        db_manager.close()