# Audio Transcription Pipeline

This notebook transcribes all audio files in a GCS bucket using a Gemini model (configured below as `gemini-2.0-flash-exp`) and uploads the transcripts to the same bucket under a `transcripts/` folder.

In [None]:
import os
import sys
import yaml
import pandas as pd
from pathlib import Path
from datetime import datetime
import json
from tqdm import tqdm
from IPython.display import display, HTML, clear_output
import concurrent.futures
from threading import Lock
import time

# Add src directory to path
sys.path.append('../src')

from clients.gcs_client import GCSClient
from clients.gemini_client import GeminiClient
from utils.prompt_manager import PromptManager

## Configuration

Load the configuration and set up clients for GCS, Gemini, and Prompt Manager.

In [None]:
# --- CONFIGURATION ---
# Path to the experiment YAML file that supplies project, location and bucket.
CONFIG_PATH = '../config/sample_experiments.yaml'

# Load YAML config. A missing file is reported and replaced by an empty
# config so the cell still completes; the values read below are then None.
try:
    with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document; fall back to {} so
        # the .get() lookups below do not fail with AttributeError.
        config = yaml.safe_load(f) or {}
    print("Configuration loaded successfully.")
except FileNotFoundError:
    print(f"ERROR: Configuration file not found at {CONFIG_PATH}")
    config = {}

project_id = config.get('project')
location = config.get('location')
bucket_name = config.get('bucket')

# Model configuration.
# NOTE: the model actually used is Gemini 2.0 Flash (experimental), not 2.5 Pro.
MODEL_ID = "gemini-2.0-flash-exp"  # Using Gemini 2.0 Flash as it supports audio
GENERATION_CONFIG = {
    "temperature": 0.1,  # Low temperature for accurate transcription
    "top_p": 0.8,
    "top_k": 32,
    "max_output_tokens": 8192,
}

print(f"Project: {project_id}")
print(f"Location: {location}")
print(f"Bucket: {bucket_name}")
print(f"Model: {MODEL_ID}")

In [None]:
# Build the three service clients used by the rest of the notebook.
# A construction failure is reported and then re-raised so the run stops here.
try:
    gcs_client = GCSClient(bucket_name=bucket_name)
    prompt_manager = PromptManager(project=project_id, location=location)
    gemini_client = GeminiClient(
        model_id=MODEL_ID,
        config=dict(project_id=project_id, location=location),
    )
    print("✅ All clients initialized successfully.")
except Exception as init_error:
    print(f"❌ Error initializing clients: {init_error}")
    raise

## Prompt Configuration

Set your transcription prompt ID here. The prompt should instruct the model to transcribe the audio accurately.

In [None]:
# Set your transcription prompt ID
# NOTE(review): the value below is a placeholder; until it is replaced the
# load presumably fails and the default prompt in the except branch is used.
TRANSCRIPTION_PROMPT_ID = "your-transcription-prompt-id"  # Replace with your actual prompt ID

# Load the transcription prompt
# `transcription_prompt` is read by the test-run and pipeline cells below, so
# this cell always leaves it defined: either the loaded prompt or the default.
try:
    transcription_prompt = prompt_manager.load(TRANSCRIPTION_PROMPT_ID)
    print("✅ Transcription prompt loaded successfully:")
    print(f"Prompt: {transcription_prompt}")
except Exception as e:
    # Any failure while loading (or printing) the prompt lands here; the error
    # is shown and the notebook continues with the built-in default prompt.
    print(f"❌ Error loading transcription prompt: {e}")
    print("Please ensure you have created a transcription prompt and set the correct ID above.")
    # Use a default prompt for demonstration
    # (the literal keeps its line break and the indentation of its second line)
    transcription_prompt = """Please transcribe the following audio accurately. 
    Return only the transcription text without any additional commentary or formatting."""
    print(f"Using default prompt: {transcription_prompt}")

## Audio File Discovery

Find all audio files in the bucket that need to be transcribed.

In [None]:
# Glob patterns handed to the GCS client to locate audio objects:
# bucket root first, then the "calls/" and "recordings/" folders.
audio_patterns = [
    "*.wav", "*.mp3", "*.m4a", "*.flac",
    "calls/*.wav", "calls/*.mp3",
    "recordings/*.wav", "recordings/*.mp3",
]

print("Searching for audio files...")
audio_files = gcs_client.list_audio_files(audio_patterns)

print(f"✅ Found {len(audio_files)} audio files")

if not audio_files:
    print("No audio files found. Please check your bucket and patterns.")
else:
    # Preview at most the first ten URIs and summarise the remainder.
    df_audio = pd.DataFrame(audio_files[:10], columns=['Audio File URI'])
    display(df_audio)
    if len(audio_files) > 10:
        print(f"... and {len(audio_files) - 10} more files")

## Helper Functions

Define utility functions for processing audio files and handling transcriptions.

In [None]:
def get_mime_type_from_path(path: str) -> str:
    """Map a file path to an audio MIME type using its extension.

    The lookup key is whatever follows the last '.' in the lower-cased path;
    when the path contains no '.', the whole lower-cased path is the key.
    Keys that are not recognised resolve to 'audio/wav'.
    """
    known_types = {
        'wav': 'audio/wav',
        'mp3': 'audio/mpeg',
        'm4a': 'audio/mp4',
        'flac': 'audio/flac',
        'aac': 'audio/aac',
    }
    _, _, suffix = path.lower().rpartition('.')
    return known_types.get(suffix, 'audio/wav')

def generate_transcript_path(audio_path: str) -> str:
    """Return the object path under 'transcripts/' for an audio file.

    Only the final path component is kept (directories are dropped), its last
    extension is removed, and '_transcript.txt' is appended.
    """
    base_name = Path(audio_path).with_suffix('').name
    return "transcripts/" + base_name + "_transcript.txt"

def check_transcript_exists(transcript_path: str) -> bool:
    """Check if a transcript already exists in GCS to avoid reprocessing.

    Args:
        transcript_path: Object path of the transcript inside the bucket.

    Returns:
        True when `gcs_client.get_file_metadata` completes without raising,
        False when it raises any `Exception`.
    """
    try:
        gcs_client.get_file_metadata(transcript_path)
    except Exception:
        # Best-effort check: every lookup error is treated as "not there".
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so a long run can still be cancelled.
        return False
    return True

def transcribe_audio_file_optimized(gcs_uri: str, prompt: str, skip_existing: bool = True) -> dict:
    """Transcribe one audio object and report the outcome as a dict.

    Args:
        gcs_uri: URI of the audio object, parsed with `gcs_client.parse_gcs_uri`.
        prompt: Instruction text sent to the model together with the audio.
        skip_existing: When true, return a "skipped" result without calling
            the model if the transcript object is already present.

    Returns:
        A dict with the keys 'success', 'transcript', 'metadata', 'audio_path',
        'mime_type' and 'error'. Skipped results additionally carry
        'transcript_path' and 'saved_to_gcs'. Exceptions are not raised; they
        are reported through 'success': False and 'error'.
    """
    # Until the URI has been parsed, the raw URI is the only identifier
    # available for the error report.
    audio_path = gcs_uri
    try:
        _, audio_path = gcs_client.parse_gcs_uri(gcs_uri)
        transcript_path = generate_transcript_path(audio_path)

        already_done = skip_existing and check_transcript_exists(transcript_path)
        if already_done:
            return {
                'success': True,
                'transcript': '[SKIPPED - Already exists]',
                'metadata': {'skipped': True},
                'audio_path': audio_path,
                'mime_type': 'skipped',
                'error': None,
                'transcript_path': transcript_path,
                'saved_to_gcs': True,
            }

        # Fetch the raw bytes, then hand them to Gemini with the MIME type
        # derived from the file extension.
        audio_data = gcs_client.download_bytes(audio_path)
        mime_type = get_mime_type_from_path(audio_path)
        response = gemini_client.generate_from_audio(
            audio_data=audio_data,
            prompt=prompt,
            generation_config=GENERATION_CONFIG,
            mime_type=mime_type,
        )

        return {
            'success': True,
            'transcript': response.get('response_text', ''),
            'metadata': response.get('metadata', {}),
            'audio_path': audio_path,
            'mime_type': mime_type,
            'error': None,
        }

    except Exception as exc:
        return {
            'success': False,
            'transcript': '',
            'metadata': {},
            'audio_path': audio_path,
            'mime_type': '',
            'error': str(exc),
        }

def save_transcript_to_gcs_optimized(transcript: str, transcript_path: str) -> bool:
    """Save a transcript to GCS as a plain-text object.

    The text is written to a local temporary file, which is then uploaded
    with `gcs_client.upload_file` and removed afterwards.

    Args:
        transcript: Transcript text to store.
        transcript_path: Destination object path inside the bucket.

    Returns:
        True when the upload call completed, False when writing or uploading
        raised an exception (the error is printed, not re-raised).
    """
    import tempfile

    temp_path = None
    try:
        # Write as UTF-8 explicitly: the platform default encoding may be
        # unable to represent the transcript, and the verification cell
        # reads transcripts back as UTF-8.
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.txt', delete=False, encoding='utf-8'
        ) as f:
            # Record the name first so cleanup still happens if write() fails.
            temp_path = f.name
            f.write(transcript)

        # The file is closed (and therefore flushed) before it is uploaded.
        gcs_client.upload_file(temp_path, transcript_path)
        return True

    except Exception as e:
        print(f"Error saving transcript: {e}")
        return False

    finally:
        # Clean up temp file; a cleanup failure must not turn a successful
        # upload into a reported failure.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass

def process_audio_batch(audio_files_batch: list, prompt: str, skip_existing: bool = True) -> list:
    """Transcribe every file in a batch and store the new transcripts.

    Each file is transcribed in turn; a fresh transcript is uploaded to GCS.
    Every returned result dict carries a 'saved_to_gcs' flag, and
    'transcript_path' is set when an upload succeeded.

    Args:
        audio_files_batch: Audio file URIs to process, in order.
        prompt: Transcription prompt passed through to the model.
        skip_existing: Forwarded to the transcription step.

    Returns:
        One result dict per input file, in input order.
    """
    processed = []
    for uri in audio_files_batch:
        outcome = transcribe_audio_file_optimized(uri, prompt, skip_existing)

        if outcome['metadata'].get('skipped', False):
            # Transcript was already present in the bucket.
            outcome['saved_to_gcs'] = True
        elif outcome['success']:
            destination = generate_transcript_path(outcome['audio_path'])
            if save_transcript_to_gcs_optimized(outcome['transcript'], destination):
                outcome['transcript_path'] = destination
                outcome['saved_to_gcs'] = True
            else:
                outcome['saved_to_gcs'] = False
        else:
            outcome['saved_to_gcs'] = False

        processed.append(outcome)

    return processed

print("✅ Optimized helper functions defined successfully.")

## Test Run - Process First 2 Files

Let's test the transcription pipeline with the first 2 audio files to ensure everything works correctly.

In [None]:
# Test with first 2 files (slicing already copes with shorter lists)
test_files = audio_files[:2]
print(f"🧪 Testing transcription with {len(test_files)} file(s)...")

test_results = []

for i, audio_file in enumerate(test_files, 1):
    print(f"\n--- Processing Test File {i}/{len(test_files)} ---")
    print(f"File: {audio_file}")

    # Transcribe the audio
    result = transcribe_audio_file_optimized(audio_file, transcription_prompt)

    if result['success']:
        was_skipped = result['metadata'].get('skipped', False)
        if was_skipped:
            print("⏭️  File already transcribed, skipping")
        else:
            print("✅ Transcription successful")
            print(f"Transcript length: {len(result['transcript'])} characters")
            print(f"Processing time: {result['metadata'].get('latency_seconds', 'N/A')} seconds")

            # Show first 200 characters of transcript
            preview = result['transcript'][:200] + "..." if len(result['transcript']) > 200 else result['transcript']
            print(f"Preview: {preview}")

        # Generate transcript path
        transcript_path = generate_transcript_path(result['audio_path'])
        print(f"Will save to: gs://{bucket_name}/{transcript_path}")

        # Save transcript to GCS if not skipped, and record the outcome so
        # the summary below can tell a stored transcript from a lost one.
        if was_skipped:
            result['transcript_path'] = transcript_path
            result['saved_to_gcs'] = True
        elif save_transcript_to_gcs_optimized(result['transcript'], transcript_path):
            print("✅ Transcript saved to GCS")
            result['transcript_path'] = transcript_path
            result['saved_to_gcs'] = True
        else:
            print("❌ Failed to save transcript to GCS")
            result['saved_to_gcs'] = False

    else:
        print(f"❌ Transcription failed: {result['error']}")
        result['saved_to_gcs'] = False

    test_results.append(result)
    print("-" * 50)

# Summary of test results: a file only counts as processed when it was
# transcribed (or skipped) AND its transcript is in GCS.
successful_tests = sum(
    1 for r in test_results if r['success'] and r.get('saved_to_gcs', False)
)
print(f"\n🧪 Test Summary: {successful_tests}/{len(test_results)} files processed successfully")

if not test_results:
    print("⚠️  No audio files were available to test.")
elif successful_tests == len(test_results):
    print("✅ Test run completed successfully! Ready to process all files.")
elif successful_tests > 0:
    print("⚠️  Test run only partially succeeded. Please review the errors above before proceeding.")
else:
    print("❌ Test run failed. Please check the errors above before proceeding.")

## Full Transcription Pipeline (Optimized)

Process all audio files in the bucket using parallel processing and other optimizations for speed.

In [None]:
# Performance Configuration
MAX_WORKERS = 5  # Number of parallel workers (adjust based on API rate limits)
BATCH_SIZE = 10  # Files to process in each batch
SKIP_EXISTING = True  # Skip files that already have transcripts

print("⚡ Performance Settings:")
print(f"   Max parallel workers: {MAX_WORKERS}")
print(f"   Batch size: {BATCH_SIZE}")
print(f"   Skip existing transcripts: {SKIP_EXISTING}")
print(f"   Total files to process: {len(audio_files)}")

# Check how many already exist. Only the first 10 files are probed and the
# hit ratio is extrapolated to the full list, so the figure is an estimate.
if SKIP_EXISTING:
    existing_count = 0
    print("🔍 Checking for existing transcripts...")
    sample_files = audio_files[:10]  # Sample check
    for audio_file in sample_files:
        _, gcs_path = gcs_client.parse_gcs_uri(audio_file)
        transcript_path = generate_transcript_path(gcs_path)
        if check_transcript_exists(transcript_path):
            existing_count += 1

    # Guard against an empty file list, which would otherwise divide by zero.
    if sample_files:
        estimated_existing = (existing_count / len(sample_files)) * len(audio_files)
    else:
        estimated_existing = 0
    print(f"📊 Estimated {estimated_existing:.0f} files already transcribed (will be skipped)")

In [None]:
# Optimized Parallel Processing Pipeline
# Splits the file list into batches, runs process_audio_batch on a thread
# pool, and tallies the outcomes as batches complete.
total_files = len(audio_files)
print(f"🚀 Starting OPTIMIZED transcription of {total_files} audio files...")
print("⚡ Using parallel processing for maximum speed!")

# Confirm before processing all files
print(f"⚠️  About to process {total_files} audio files with {MAX_WORKERS} parallel workers.")
print("This will be much faster but may incur API costs.")
print("\nPress Enter to continue or Ctrl+C to cancel...")
input()

# Initialize results tracking
all_results = []
successful_transcriptions = 0
failed_transcriptions = 0
skipped_transcriptions = 0
start_time = datetime.now()

# Lock guarding the counters
results_lock = Lock()

def update_counters(results_batch):
    """Add the outcomes in `results_batch` to the module-level counters.

    A result counts as successful only when the transcript was also stored
    in GCS; a transcription whose upload failed counts as failed, matching
    the legacy sequential pipeline.
    """
    global successful_transcriptions, failed_transcriptions, skipped_transcriptions
    with results_lock:
        for result in results_batch:
            if not result['success']:
                failed_transcriptions += 1
            elif result['metadata'].get('skipped', False):
                skipped_transcriptions += 1
            elif result.get('saved_to_gcs', False):
                successful_transcriptions += 1
            else:
                failed_transcriptions += 1

# Create batches for processing
audio_batches = [audio_files[i:i + BATCH_SIZE] for i in range(0, len(audio_files), BATCH_SIZE)]
total_batches = len(audio_batches)

print(f"📦 Created {total_batches} batches of ~{BATCH_SIZE} files each")

# Process batches in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # Submit all batch jobs
    future_to_batch = {
        executor.submit(process_audio_batch, batch, transcription_prompt, SKIP_EXISTING): i
        for i, batch in enumerate(audio_batches)
    }

    # Process completed batches with progress bar
    with tqdm(total=total_batches, desc="Processing batches") as pbar:
        for future in concurrent.futures.as_completed(future_to_batch):
            batch_idx = future_to_batch[future]

            # Only the worker's own failure is handled here, so a batch can
            # never be recorded twice (once as results, once as failures).
            try:
                batch_results = future.result()
            except Exception as e:
                print(f"\n❌ Error processing batch {batch_idx}: {e}")
                # Record one failed result per file, keeping the real URI so
                # the failure can be traced in the results table.
                batch_results = [{
                    'success': False,
                    'transcript': '',
                    'metadata': {},
                    'audio_path': failed_uri,
                    'mime_type': '',
                    'error': str(e),
                    'saved_to_gcs': False
                } for failed_uri in audio_batches[batch_idx]]

            all_results.extend(batch_results)
            update_counters(batch_results)

            # Update progress
            pbar.set_postfix({
                'Success': successful_transcriptions,
                'Skipped': skipped_transcriptions,
                'Failed': failed_transcriptions
            })
            pbar.update(1)

# Final summary
end_time = datetime.now()
total_time = end_time - start_time

print("\n🎉 OPTIMIZED TRANSCRIPTION PIPELINE COMPLETED!")
print("=" * 60)
print("📊 FINAL SUMMARY:")
print(f"   Total files processed: {len(all_results)}")
print(f"   ✅ Successful transcriptions: {successful_transcriptions}")
print(f"   ⏭️  Skipped (already existed): {skipped_transcriptions}")
print(f"   ❌ Failed transcriptions: {failed_transcriptions}")
print(f"   ⏱️  Total processing time: {total_time}")
# An empty file list yields no results; avoid dividing by zero in that case.
if all_results:
    print(f"   📈 Success rate: {((successful_transcriptions + skipped_transcriptions)/len(all_results)*100):.1f}%")
else:
    print("   📈 Success rate: n/a (no files processed)")

if successful_transcriptions > 0:
    avg_time = total_time.total_seconds() / successful_transcriptions
    print(f"   ⏱️  Average time per NEW transcription: {avg_time:.1f} seconds")

print(f"\n⚡ Speed improvement: ~{MAX_WORKERS}x faster with parallel processing!")
print(f"💰 Cost savings: Skipped {skipped_transcriptions} already-processed files")

## Legacy Sequential Pipeline (Slower)

This is the original sequential processing method. Use only if you encounter issues with the parallel version above.

In [None]:
# LEGACY SEQUENTIAL PROCESSING (Use only if parallel version fails)
# Confirm before processing all files
total_files = len(audio_files)
print(f"⚠️  About to process {total_files} audio files SEQUENTIALLY (SLOW).")
print("Consider using the parallel version above for much faster processing.")
print("This may take a significant amount of time and will incur API costs.")
print("\nPress Enter to continue or Ctrl+C to cancel...")
input()

print(f"🐌 Starting SEQUENTIAL transcription of {total_files} audio files...")

# Initialize results tracking
all_results = []
successful_transcriptions = 0
failed_transcriptions = 0
start_time = datetime.now()

# Process all files with progress bar
for i, audio_file in enumerate(tqdm(audio_files, desc="Transcribing audio files"), 1):
    try:
        # Clear previous output and show current progress
        clear_output(wait=True)
        print(f"🎵 Processing file {i}/{total_files}")
        print(f"Current file: {audio_file}")
        print(f"✅ Successful: {successful_transcriptions}")
        print(f"❌ Failed: {failed_transcriptions}")

        # ETA is based on the i - 1 files finished so far; the current file
        # still counts as remaining.
        elapsed = datetime.now() - start_time
        if i > 1:
            avg_time_per_file = elapsed.total_seconds() / (i - 1)
            remaining_files = total_files - i + 1
            eta = remaining_files * avg_time_per_file
            print(f"⏱️  ETA: {eta/60:.1f} minutes")

        # Transcribe the audio file
        result = transcribe_audio_file_optimized(audio_file, transcription_prompt)

        if result['success']:
            # Generate transcript path and save to GCS
            transcript_path = generate_transcript_path(result['audio_path'])

            if not result['metadata'].get('skipped', False):
                if save_transcript_to_gcs_optimized(result['transcript'], transcript_path):
                    result['transcript_path'] = transcript_path
                    result['saved_to_gcs'] = True
                    successful_transcriptions += 1
                else:
                    # Transcribed but not stored: counted as a failure.
                    result['saved_to_gcs'] = False
                    failed_transcriptions += 1
            else:
                # Already transcribed earlier; counted as successful here.
                result['transcript_path'] = transcript_path
                result['saved_to_gcs'] = True
                successful_transcriptions += 1
        else:
            failed_transcriptions += 1

        all_results.append(result)

    except KeyboardInterrupt:
        print("\n⏹️  Processing interrupted by user")
        break
    except Exception as e:
        print(f"\n❌ Unexpected error processing {audio_file}: {e}")
        failed_transcriptions += 1
        all_results.append({
            'success': False,
            'transcript': '',
            'metadata': {},
            'audio_path': audio_file,
            'mime_type': '',
            'error': str(e),
            'saved_to_gcs': False
        })

# Final summary
end_time = datetime.now()
total_time = end_time - start_time

clear_output(wait=True)
print("🎉 SEQUENTIAL TRANSCRIPTION PIPELINE COMPLETED!")
print("=" * 50)
print("📊 SUMMARY:")
print(f"   Total files processed: {len(all_results)}")
print(f"   ✅ Successful transcriptions: {successful_transcriptions}")
print(f"   ❌ Failed transcriptions: {failed_transcriptions}")
print(f"   ⏱️  Total processing time: {total_time}")
# No results (empty bucket, or interrupted on the first file) would
# otherwise divide by zero.
if all_results:
    print(f"   📈 Success rate: {(successful_transcriptions/len(all_results)*100):.1f}%")
else:
    print("   📈 Success rate: n/a (no files processed)")

if successful_transcriptions > 0:
    avg_time = total_time.total_seconds() / successful_transcriptions
    print(f"   ⏱️  Average time per file: {avg_time:.1f} seconds")

## Results Analysis

Analyze the transcription results and display detailed statistics.

In [None]:
# Create detailed results DataFrame
# Columns are declared up front so the frame has them even when there are no
# results; otherwise the 'Success' lookups below raise KeyError.
result_columns = [
    'Audio File',
    'Success',
    'Transcript Length',
    'Processing Time (s)',
    'Saved to GCS',
    'Error',
    'Skipped',
]

results_data = []

for result in all_results:
    results_data.append({
        'Audio File': result.get('audio_path', 'Unknown'),
        'Success': result['success'],
        'Transcript Length': len(result['transcript']) if result['transcript'] else 0,
        'Processing Time (s)': result.get('metadata', {}).get('latency_seconds', 0),
        'Saved to GCS': result.get('saved_to_gcs', False),
        'Error': result.get('error', ''),
        # Skipped files carry a placeholder transcript and no latency, so
        # they are flagged and left out of the statistics below.
        'Skipped': bool(result.get('metadata', {}).get('skipped', False)),
    })

df_results = pd.DataFrame(results_data, columns=result_columns)

print("📋 DETAILED RESULTS:")
print("=" * 50)

# astype(bool) gives a proper boolean mask even for an empty frame.
success_mask = df_results['Success'].astype(bool)

# Display successful transcriptions
successful_df = df_results[success_mask]
if len(successful_df) > 0:
    print(f"\n✅ SUCCESSFUL TRANSCRIPTIONS ({len(successful_df)} files):")
    display(successful_df[['Audio File', 'Transcript Length', 'Processing Time (s)', 'Saved to GCS']])

    # Statistics cover newly transcribed files only.
    transcribed_df = successful_df[~successful_df['Skipped'].astype(bool)]
    if len(transcribed_df) > 0:
        print("\n📊 STATISTICS:")
        print(f"   Average transcript length: {transcribed_df['Transcript Length'].mean():.0f} characters")
        print(f"   Average processing time: {transcribed_df['Processing Time (s)'].mean():.1f} seconds")
        print(f"   Total characters transcribed: {transcribed_df['Transcript Length'].sum():,}")

# Display failed transcriptions
failed_df = df_results[~success_mask]
if len(failed_df) > 0:
    print(f"\n❌ FAILED TRANSCRIPTIONS ({len(failed_df)} files):")
    display(failed_df[['Audio File', 'Error']])

# Save results to CSV for future reference
results_filename = f"transcription_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df_results.to_csv(results_filename, index=False)
print(f"\n💾 Results saved to: {results_filename}")

## Verify Transcripts in GCS

Check that the transcripts were successfully uploaded to the GCS bucket.

In [None]:
# List transcript files in GCS
print("🔍 Checking transcripts in GCS bucket...")

try:
    # List files in the transcripts folder (the client's listing helper is
    # reused here with transcript patterns instead of audio patterns).
    transcript_patterns = ["transcripts/*.json", "transcripts/*.txt"]
    transcript_files = gcs_client.list_audio_files(transcript_patterns)

    print(f"✅ Found {len(transcript_files)} transcript files in GCS:")

    if transcript_files:
        # Display transcript files
        df_transcripts = pd.DataFrame(transcript_files, columns=['Transcript File URI'])
        display(df_transcripts)

        # Sample the first transcript file to verify content
        print(f"\n📄 Sampling content from: {transcript_files[0]}")
        try:
            # Bind the parsed bucket to its own name so the notebook-wide
            # `bucket_name` used in the final message is not overwritten.
            sample_bucket, sample_path = gcs_client.parse_gcs_uri(transcript_files[0])
            sample_content = gcs_client.download_bytes(sample_path).decode('utf-8')

            # Display the raw transcript content
            print(f"Transcript preview: {sample_content[:300]}...")

        except Exception as e:
            print(f"Error reading sample file: {e}")
    else:
        print("⚠️  No transcript files found in the transcripts folder.")

except Exception as e:
    print(f"❌ Error checking transcripts in GCS: {e}")

print("\n🎯 Transcription pipeline completed!")
print(f"📁 All transcripts are saved in: gs://{bucket_name}/transcripts/")