<a href="https://colab.research.google.com/github/SingularitySmith/PRUT-Transcriber/blob/main/PRUT_Transcriber4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# WhisperX Ultra-Resilient Transcription System
# Single-cell design with aggressive memory management

"""
CODE BLOCK 1: COMPLETE SYSTEM - RUN THIS SINGLE CELL REPEATEDLY
This cell contains the entire system and can be run after crashes
"""

import os
import json
import time
import gc
import subprocess
import sys
from datetime import datetime


In [3]:

# ============================================
# CODE BLOCK 1.1: CONFIGURATION
# ============================================

SOURCE_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/Recordings_PRUT'
OUTPUT_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/Transcripts'
WAV_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/WAV_Cache'
CHECKPOINT_FILE = '/content/drive/MyDrive/PRUT-Transcriptions/checkpoint.json'
LOG_FILE = '/content/drive/MyDrive/PRUT-Transcriptions/processing.log'

# Create directories
for dir_path in [OUTPUT_DIR, WAV_DIR]:
    os.makedirs(dir_path, exist_ok=True)


In [4]:

# ============================================
# CODE BLOCK 1.2: LOGGING SYSTEM
# ============================================

def log_message(message, level="INFO"):
    """Log to both console and file"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {level}: {message}"
    print(log_entry)

    try:
        with open(LOG_FILE, 'a') as f:
            f.write(log_entry + "\n")
    except:
        pass  # Don't fail if can't write log


In [5]:

# ============================================
# CODE BLOCK 1.3: CHECKPOINT MANAGEMENT
# ============================================

def load_checkpoint():
    """Load progress from checkpoint file"""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            return json.load(f)
    return {
        'processed_files': [],
        'failed_files': {},
        'current_mode': 'ultra_minimal',  # Start with absolute minimum
        'model_loaded': None,
        'last_update': None,
        'session_count': 0
    }

def save_checkpoint(checkpoint):
    """Save progress to checkpoint file"""
    checkpoint['last_update'] = datetime.now().isoformat()
    checkpoint['session_count'] = checkpoint.get('session_count', 0) + 1
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f, indent=2)
    log_message(f"Checkpoint saved (session #{checkpoint['session_count']})")


In [6]:

# ============================================
# CODE BLOCK 1.4: PROCESSING MODES
# ============================================

PROCESSING_MODES = {
    'ultra_minimal': {
        'method': 'whisper_api',  # Use OpenAI Whisper API directly
        'model': 'tiny',           # Smallest possible model
        'skip_vad': True,          # Skip voice activity detection
        'skip_align': True,        # Skip alignment
        'skip_diarize': True,      # Skip diarization
        'chunk_duration': 180      # 3-minute chunks
    },
    'minimal': {
        'method': 'whisperx',
        'model': 'base',
        'skip_vad': True,
        'skip_align': False,
        'skip_diarize': True,
        'chunk_duration': 300
    },
    'standard': {
        'method': 'whisperx',
        'model': 'small',
        'skip_vad': False,
        'skip_align': False,
        'skip_diarize': True,
        'chunk_duration': 600
    },
    'high': {
        'method': 'whisperx',
        'model': 'medium',
        'skip_vad': False,
        'skip_align': False,
        'skip_diarize': False,
        'chunk_duration': 900
    }
}


In [7]:

# ============================================
# CODE BLOCK 1.5: SAFE DEPENDENCY INSTALLATION
# ============================================

def ensure_dependencies(mode):
    """Install only necessary dependencies for current mode"""
    log_message(f"Checking dependencies for {mode} mode...")

    # Basic dependencies always needed
    basic_deps = ['pydub']
    for dep in basic_deps:
        try:
            __import__(dep)
        except ImportError:
            log_message(f"Installing {dep}...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', dep])

    # FFmpeg
    if subprocess.call(['which', 'ffmpeg'], stdout=subprocess.DEVNULL) != 0:
        log_message("Installing ffmpeg...")
        subprocess.call(['apt-get', '-qq', 'update'])
        subprocess.call(['apt-get', '-qq', 'install', 'ffmpeg'])

    # Mode-specific dependencies
    if PROCESSING_MODES[mode]['method'] == 'whisper_api':
        try:
            import whisper
        except ImportError:
            log_message("Installing OpenAI Whisper...")
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'openai-whisper'])

    elif PROCESSING_MODES[mode]['method'] == 'whisperx':
        try:
            import whisperx
        except ImportError:
            log_message("Installing WhisperX...")
            # Install with specific order to avoid conflicts
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'torch==2.0.0', 'torchaudio==2.0.0', '--index-url', 'https://download.pytorch.org/whl/cu118'])
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', 'git+https://github.com/m-bain/whisperx.git'])


In [8]:

# ============================================
# CODE BLOCK 1.6: ULTRA-MINIMAL TRANSCRIPTION
# ============================================

def transcribe_ultra_minimal(audio_path):
    """Use OpenAI Whisper directly - most stable option"""
    import whisper

    log_message("Loading tiny Whisper model...")
    model = whisper.load_model("tiny")

    log_message("Transcribing with OpenAI Whisper...")
    result = model.transcribe(audio_path, language='en')

    # Convert to WhisperX-like format
    segments = []
    if 'segments' in result:
        for seg in result['segments']:
            segments.append({
                'start': seg['start'],
                'end': seg['end'],
                'text': seg['text']
            })
    else:
        # Fallback if no segments
        segments.append({
            'start': 0,
            'end': 0,
            'text': result.get('text', '')
        })

    # Clean up model
    del model
    gc.collect()

    return {'segments': segments}


In [9]:

# ============================================
# CODE BLOCK 1.7: WHISPERX TRANSCRIPTION
# ============================================

def transcribe_whisperx(audio_path, mode_config):
    """Use WhisperX with configurable features"""
    import whisperx
    import torch

    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Load only necessary models
    log_message(f"Loading {mode_config['model']} model on {device}...")

    # Load with minimal configuration
    model = whisperx.load_model(
        mode_config['model'],
        device,
        compute_type="int8",  # Always use int8 for stability
        language='en',
        asr_options={
            "suppress_numerals": True,
            "max_new_tokens": None,
            "clip_timestamps": None,
            "hallucination_silence_threshold": None,
            "hotwords": None
        } if mode_config['skip_vad'] else {}
    )

    # Load audio
    audio = whisperx.load_audio(audio_path)

    # Transcribe with minimal batch size
    log_message("Transcribing...")
    result = model.transcribe(audio, batch_size=1)

    # Optional alignment
    if not mode_config['skip_align']:
        try:
            log_message("Aligning transcript...")
            model_a, metadata = whisperx.load_align_model(language_code='en', device=device)
            result = whisperx.align(result["segments"], model_a, metadata, audio, device)
            del model_a
        except Exception as e:
            log_message(f"Alignment failed: {e}", "WARNING")

    # Clean up
    del model
    torch.cuda.empty_cache() if device == "cuda" else None
    gc.collect()

    return result


In [10]:

# ============================================
# CODE BLOCK 1.8: AUDIO PROCESSING
# ============================================

def convert_to_wav(input_path, output_path):
    """Convert audio file to WAV format"""
    try:
        from pydub import AudioSegment
        log_message(f"Converting to WAV: {os.path.basename(input_path)}")

        audio = AudioSegment.from_file(input_path)
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio.export(output_path, format="wav")

        log_message(f"Saved WAV to cache: {os.path.basename(output_path)}")
        return True
    except Exception as e:
        log_message(f"Conversion failed: {e}", "ERROR")
        return False

def split_audio_for_processing(wav_path, chunk_duration):
    """Split audio into smaller chunks"""
    from pydub import AudioSegment

    audio = AudioSegment.from_wav(wav_path)
    total_duration = len(audio) / 1000  # seconds

    if total_duration <= chunk_duration:
        return [(wav_path, 0)]  # No need to split

    chunks = []
    chunk_ms = chunk_duration * 1000

    for i in range(0, len(audio), chunk_ms):
        chunk = audio[i:i + chunk_ms]
        chunk_path = wav_path.replace('.wav', f'_chunk_{i//1000}.wav')
        chunk.export(chunk_path, format="wav")
        chunks.append((chunk_path, i/1000))

    log_message(f"Split into {len(chunks)} chunks of {chunk_duration}s each")
    return chunks


In [11]:

# ============================================
# CODE BLOCK 1.9: MAIN PROCESSING FUNCTION
# ============================================

def process_single_file(filepath, checkpoint):
    """Process a single file with current mode"""
    mode = checkpoint['current_mode']
    mode_config = PROCESSING_MODES[mode]
    base_name = os.path.splitext(os.path.basename(filepath))[0]

    log_message(f"\n{'='*60}")
    log_message(f"Processing: {os.path.basename(filepath)}")
    log_message(f"Mode: {mode}")
    log_message(f"{'='*60}")

    try:
        # Get or create WAV file
        wav_path = os.path.join(WAV_DIR, f"{base_name}.wav")
        if not os.path.exists(wav_path):
            if not filepath.endswith('.wav'):
                if not convert_to_wav(filepath, wav_path):
                    raise Exception("Failed to convert to WAV")
            else:
                import shutil
                shutil.copy2(filepath, wav_path)

        # Check file size and split if needed
        file_size_mb = os.path.getsize(wav_path) / (1024*1024)
        log_message(f"File size: {file_size_mb:.1f} MB")

        chunks = split_audio_for_processing(wav_path, mode_config['chunk_duration'])
        all_segments = []

        # Process each chunk
        for chunk_idx, (chunk_path, start_offset) in enumerate(chunks):
            if len(chunks) > 1:
                log_message(f"Processing chunk {chunk_idx+1}/{len(chunks)}...")

            # Choose transcription method
            if mode_config['method'] == 'whisper_api':
                result = transcribe_ultra_minimal(chunk_path)
            else:
                result = transcribe_whisperx(chunk_path, mode_config)

            # Adjust timestamps and collect segments
            for segment in result.get('segments', []):
                segment['start'] = segment.get('start', 0) + start_offset
                segment['end'] = segment.get('end', 0) + start_offset
                all_segments.append(segment)

            # Clean up chunk if it's temporary
            if chunk_path != wav_path:
                os.remove(chunk_path)

            # Force garbage collection after each chunk
            gc.collect()
            time.sleep(1)  # Brief pause

        # Save transcript
        output_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcript.txt")
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(f"# Transcription of: {os.path.basename(filepath)}\n")
            f.write(f"# Processing mode: {mode}\n")
            f.write(f"# Processed on: {datetime.now().isoformat()}\n\n")

            for seg_idx, segment in enumerate(all_segments):
                start = segment.get('start', 0)
                end = segment.get('end', 0)
                text = segment.get('text', '').strip()

                if text:  # Only write non-empty segments
                    f.write(f"[{start:.2f}-{end:.2f}] {text}\n")

        log_message(f"SUCCESS: Saved transcript to {os.path.basename(output_path)}")

        # Update checkpoint
        checkpoint['processed_files'].append(os.path.basename(filepath))
        save_checkpoint(checkpoint)

        return True

    except Exception as e:
        log_message(f"FAILED: {e}", "ERROR")

        # Log failure
        checkpoint['failed_files'][os.path.basename(filepath)] = {
            'error': str(e),
            'mode': mode,
            'timestamp': datetime.now().isoformat()
        }
        save_checkpoint(checkpoint)

        return False


In [12]:

# ============================================
# CODE BLOCK 1.10: GET PENDING FILES
# ============================================

def get_pending_files(checkpoint):
    """Get list of files not yet processed"""
    all_files = []
    supported_formats = ['.mp4', '.mp3', '.wav', '.m4a', '.flac', '.ogg']

    try:
        for filename in os.listdir(SOURCE_DIR):
            if os.path.splitext(filename)[1].lower() in supported_formats:
                all_files.append(filename)
    except Exception as e:
        log_message(f"Error reading source directory: {e}", "ERROR")
        return []

    # Filter out already processed files
    pending = []
    for f in all_files:
        if f not in checkpoint['processed_files']:
            base_name = os.path.splitext(f)[0]
            transcript_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcript.txt")

            if not os.path.exists(transcript_path):
                pending.append(f)
            else:
                # File was processed but not in checkpoint
                checkpoint['processed_files'].append(f)
                log_message(f"Found existing transcript for {f}, updating checkpoint")

    return sorted(pending)  # Sort for consistent ordering


In [13]:

# ============================================
# CODE BLOCK 1.11: MAIN EXECUTION
# ============================================

def main():
    """Main execution function"""

    # Mount Google Drive
    try:
        from google.colab import drive
        if not os.path.exists('/content/drive'):
            drive.mount('/content/drive')
            log_message("Google Drive mounted")
        else:
            log_message("Google Drive already mounted")
    except Exception as e:
        log_message(f"Could not mount Drive: {e}", "WARNING")

    # Load checkpoint
    checkpoint = load_checkpoint()

    log_message("\n" + "="*60)
    log_message(f"SESSION #{checkpoint.get('session_count', 0) + 1} STARTING")
    log_message(f"Progress: {len(checkpoint['processed_files'])} files completed")
    log_message(f"Failed: {len(checkpoint['failed_files'])} files")
    log_message(f"Current mode: {checkpoint['current_mode']}")
    log_message("="*60)

    # Get pending files
    pending_files = get_pending_files(checkpoint)
    log_message(f"Files remaining: {len(pending_files)}")

    if not pending_files:
        log_message("\nAll files processed in current mode!")

        # Check if we should upgrade mode
        modes = list(PROCESSING_MODES.keys())
        current_idx = modes.index(checkpoint['current_mode'])

        if current_idx < len(modes) - 1:
            next_mode = modes[current_idx + 1]
            log_message(f"\nUpgrading to {next_mode} mode for better quality...")
            checkpoint['current_mode'] = next_mode
            checkpoint['processed_files'] = []
            checkpoint['failed_files'] = {}
            save_checkpoint(checkpoint)
            pending_files = get_pending_files(checkpoint)
        else:
            log_message("\nALL PROCESSING COMPLETE!")
            return

    # Ensure dependencies for current mode
    ensure_dependencies(checkpoint['current_mode'])

    # Process files one by one
    processed_count = 0
    max_files_per_session = 3  # Process fewer files per session for stability

    for filename in pending_files[:max_files_per_session]:
        filepath = os.path.join(SOURCE_DIR, filename)

        # Check if file exists
        if not os.path.exists(filepath):
            log_message(f"File not found: {filename}", "WARNING")
            continue

        success = process_single_file(filepath, checkpoint)
        processed_count += 1

        # Longer cooldown between files
        if processed_count < len(pending_files):
            log_message("Cooling down for 15 seconds...")
            time.sleep(15)

        # Force garbage collection
        gc.collect()

    # Summary
    log_message("\n" + "="*60)
    log_message(f"SESSION COMPLETE")
    log_message(f"Processed this session: {processed_count}")
    log_message(f"Total completed: {len(checkpoint['processed_files'])}")
    log_message(f"Still pending: {len(pending_files) - processed_count}")
    log_message("="*60)

    if len(pending_files) > processed_count:
        log_message("\nRun this cell again to continue processing!")

    # Final cleanup
    gc.collect()


In [None]:

# ============================================
# CODE BLOCK 1.12: EXECUTE MAIN
# ============================================

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        log_message("\nProcess interrupted by user", "WARNING")
    except Exception as e:
        log_message(f"\nFATAL ERROR: {e}", "ERROR")
        import traceback
        log_message(traceback.format_exc(), "ERROR")


[2025-06-18 19:43:46] INFO: Google Drive already mounted
[2025-06-18 19:43:46] INFO: 
[2025-06-18 19:43:46] INFO: SESSION #1 STARTING
[2025-06-18 19:43:46] INFO: Progress: 0 files completed
[2025-06-18 19:43:46] INFO: Failed: 0 files
[2025-06-18 19:43:46] INFO: Current mode: ultra_minimal
[2025-06-18 19:43:46] ERROR: Error reading source directory: [Errno 2] No such file or directory: '/content/drive/MyDrive/PRUT-Transcriptions/Recordings_PRUT'
[2025-06-18 19:43:46] INFO: Files remaining: 0
[2025-06-18 19:43:46] INFO: 
All files processed in current mode!
[2025-06-18 19:43:46] INFO: 
Upgrading to minimal mode for better quality...
[2025-06-18 19:43:46] INFO: Checkpoint saved (session #1)
[2025-06-18 19:43:46] ERROR: Error reading source directory: [Errno 2] No such file or directory: '/content/drive/MyDrive/PRUT-Transcriptions/Recordings_PRUT'
[2025-06-18 19:43:46] INFO: Checking dependencies for minimal mode...
[2025-06-18 19:43:46] INFO: Installing WhisperX...


In [None]:

# ============================================
# CODE BLOCK 1.13: USAGE INSTRUCTIONS
# ============================================

"""
CRASH-PROOF TRANSCRIPTION SYSTEM

SETUP:
1. GPU Runtime: Use T4 or CPU (system adapts automatically)
2. Just run this single cell - it handles everything

AFTER CRASH:
1. Reconnect to runtime
2. Run this same cell again
3. It automatically resumes from checkpoint

PROCESSING MODES (AUTOMATIC PROGRESSION):
1. ultra_minimal: OpenAI Whisper tiny model (most stable)
2. minimal: WhisperX base model, no VAD
3. standard: WhisperX small model with VAD
4. high: WhisperX medium model with diarization

FEATURES:
- Saves after EVERY file
- Logs all operations
- 3-minute chunks for stability
- Automatic mode progression
- Crash recovery built-in

MONITORING:
- Check progress: cat /content/drive/MyDrive/PRUT-Transcriptions/checkpoint.json
- View logs: cat /content/drive/MyDrive/PRUT-Transcriptions/processing.log

TROUBLESHOOTING:
- If crashes persist, manually edit checkpoint.json:
  "current_mode": "ultra_minimal"
- Delete specific files from "processed_files" array to reprocess
"""

# V7

## First Time Setup:

python# Just run the main cell - it will:
 1. Mount Drive
 2. Install dependencies (only first time)
 3. Create all directories
 4. Start processing with minimal quality

## After a Crash:

python# Simply run the same cell again!
 It will:
 1. Read checkpoint.json
 2. Skip already processed files
 3. Continue from where it stopped

In [1]:
# WhisperX Checkpoint-Based Transcription System
# Designed to survive crashes and resume from last position

# ============================================
# SINGLE CELL - RUN THIS REPEATEDLY AFTER CRASHES
# ============================================

import os
import json
import time
import gc
import torch
from datetime import datetime

# Configuration
SOURCE_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/Recordings_PRUT'
OUTPUT_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/Transcripts'
WAV_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/WAV_Cache'
CHECKPOINT_FILE = '/content/drive/MyDrive/PRUT-Transcriptions/checkpoint.json'

# Create directories
for dir_path in [OUTPUT_DIR, WAV_DIR]:
    os.makedirs(dir_path, exist_ok=True)


In [5]:

# ============================================
# CHECKPOINT MANAGEMENT
# ============================================

def load_checkpoint():
    """Load progress from checkpoint file"""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            return json.load(f)
    return {
        'processed_files': [],
        'failed_files': {},
        'current_quality': 'minimal',
        'model_loaded': None,
        'last_update': None
    }

def save_checkpoint(checkpoint):
    """Save progress to checkpoint file"""
    checkpoint['last_update'] = datetime.now().isoformat()
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f, indent=2)
    print(f"✓ Checkpoint saved at {checkpoint['last_update']}")

def get_pending_files(checkpoint):
    """Get list of files not yet processed"""
    all_files = []
    supported_formats = ['.mp4', '.mp3', '.wav', '.m4a', '.flac', '.ogg']

    for filename in os.listdir(SOURCE_DIR):
        if os.path.splitext(filename)[1].lower() in supported_formats:
            all_files.append(filename)

    # Filter out already processed files
    pending = [f for f in all_files if f not in checkpoint['processed_files']]

    # Also check if transcript exists (in case checkpoint was corrupted)
    verified_pending = []
    for f in pending:
        base_name = os.path.splitext(f)[0]
        transcript_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcript.txt")
        if not os.path.exists(transcript_path):
            verified_pending.append(f)
        else:
            # File was processed but not in checkpoint, update checkpoint
            checkpoint['processed_files'].append(f)

    return verified_pending


In [6]:

# ============================================
# QUALITY LEVELS CONFIGURATION
# ============================================

QUALITY_LEVELS = {
    'minimal': {
        'model': 'base',
        'batch_size': 1,
        'compute_type': 'int8',
        'chunk_duration': 300,  # 5 minutes
        'device': 'cuda',
        'skip_diarization': True
    },
    'standard': {
        'model': 'small',
        'batch_size': 2,
        'compute_type': 'float16',
        'chunk_duration': 600,  # 10 minutes
        'device': 'cuda',
        'skip_diarization': True
    },
    'high': {
        'model': 'medium',
        'batch_size': 4,
        'compute_type': 'float16',
        'chunk_duration': 900,  # 15 minutes
        'device': 'cuda',
        'skip_diarization': False
    },
    'maximum': {
        'model': 'large-v3',
        'batch_size': 4,
        'compute_type': 'float32',
        'chunk_duration': 1800,  # 30 minutes (full file)
        'device': 'cuda',
        'skip_diarization': False
    }
}


In [7]:

# ============================================
# DEPENDENCY INSTALLATION (ONLY IF NEEDED)
# ============================================

def ensure_dependencies():
    """Install dependencies only if not already installed"""
    try:
        import whisperx
        print("✓ WhisperX already installed")
    except ImportError:
        print("Installing WhisperX and dependencies...")
        os.system('pip install -q git+https://github.com/m-bain/whisperx.git')
        os.system('pip install -q pydub')
        print("✓ Dependencies installed")

    # Ensure ffmpeg
    if os.system('which ffmpeg > /dev/null 2>&1') != 0:
        print("Installing ffmpeg...")
        os.system('apt-get -qq update && apt-get -qq install ffmpeg')

    return True


In [8]:

# ============================================
# MODEL LOADING WITH MEMORY MANAGEMENT
# ============================================

def load_models_for_quality(quality_level):
    """Load models appropriate for quality level"""
    config = QUALITY_LEVELS[quality_level]

    # Clear any existing models first
    if 'whisperx' in globals():
        torch.cuda.empty_cache()
        gc.collect()

    # Import after ensuring dependencies
    global whisperx
    import whisperx

    print(f"\nLoading {quality_level} quality models...")
    print(f"Model: {config['model']}, Device: {config['device']}")

    try:
        # Load main model
        model = whisperx.load_model(
            config['model'],
            config['device'],
            compute_type=config['compute_type'],
            language='en'
        )

        # Load alignment model
        model_a, metadata = whisperx.load_align_model(
            language_code='en',
            device=config['device']
        )

        # Diarization model (optional)
        diarize_model = None
        if not config['skip_diarization']:
            try:
                # Try to get HF token
                from google.colab import userdata
                HF_TOKEN = userdata.get('HF_TOKEN')

                # Import and load diarization
                from pyannote.audio import Pipeline
                diarize_model = Pipeline.from_pretrained(
                    "pyannote/speaker-diarization@2.1",
                    use_auth_token=HF_TOKEN
                )
                if config['device'] == 'cuda':
                    diarize_model.to(torch.device('cuda'))
                print("✓ Diarization model loaded")
            except Exception as e:
                print(f"⚠ Diarization not available: {e}")
                diarize_model = None

        print(f"✓ Models loaded for {quality_level} quality")
        return model, model_a, metadata, diarize_model, config

    except Exception as e:
        print(f"❌ Failed to load {quality_level} models: {e}")
        if quality_level != 'minimal':
            print("Falling back to minimal quality...")
            return load_models_for_quality('minimal')
        else:
            raise e


In [9]:

# ============================================
# AUDIO PROCESSING FUNCTIONS
# ============================================

def convert_to_wav(input_path, output_path):
    """Convert audio file to WAV format"""
    try:
        from pydub import AudioSegment
        print(f"Converting to WAV: {os.path.basename(input_path)}")

        audio = AudioSegment.from_file(input_path)
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio.export(output_path, format="wav")

        print(f"✓ Saved WAV to cache: {os.path.basename(output_path)}")
        return True
    except Exception as e:
        print(f"❌ Conversion failed: {e}")
        return False

def split_audio_file(wav_path, chunk_duration):
    """Split audio into chunks for processing"""
    from pydub import AudioSegment

    audio = AudioSegment.from_wav(wav_path)
    chunks = []

    # Calculate chunks
    total_duration = len(audio)
    chunk_ms = chunk_duration * 1000

    for i in range(0, total_duration, chunk_ms):
        chunk = audio[i:i + chunk_ms]
        chunk_path = wav_path.replace('.wav', f'_chunk_{i//1000}.wav')
        chunk.export(chunk_path, format="wav")
        chunks.append((chunk_path, i/1000))  # path and start time

    print(f"✓ Split into {len(chunks)} chunks of {chunk_duration}s each")
    return chunks

def transcribe_chunk(chunk_path, model, model_a, metadata, config):
    """Transcribe a single audio chunk"""
    audio = whisperx.load_audio(chunk_path)

    # Transcribe with retry logic
    for attempt in range(3):
        try:
            result = model.transcribe(audio, batch_size=config['batch_size'])
            break
        except RuntimeError as e:
            if "out of memory" in str(e) and attempt < 2:
                print(f"OOM - Retrying with smaller batch...")
                config['batch_size'] = max(1, config['batch_size'] // 2)
                torch.cuda.empty_cache()
                gc.collect()
            else:
                raise e

    # Align
    try:
        result = whisperx.align(
            result["segments"], model_a, metadata,
            audio, config['device'],
            return_char_alignments=False
        )
    except:
        print("⚠ Alignment failed, using unaligned segments")

    return result


In [10]:

# ============================================
# MAIN PROCESSING FUNCTION
# ============================================

def process_single_file(filepath, checkpoint, models):
    """Process a single file with checkpointing"""
    model, model_a, metadata, diarize_model, config = models
    base_name = os.path.splitext(os.path.basename(filepath))[0]

    print(f"\n{'='*60}")
    print(f"Processing: {os.path.basename(filepath)}")
    print(f"Quality: {checkpoint['current_quality']}")
    print(f"{'='*60}")

    try:
        # Step 1: Get or create WAV file
        wav_path = os.path.join(WAV_DIR, f"{base_name}.wav")
        if not os.path.exists(wav_path):
            if filepath.endswith('.wav'):
                # Copy WAV to cache
                import shutil
                shutil.copy2(filepath, wav_path)
            else:
                # Convert to WAV
                if not convert_to_wav(filepath, wav_path):
                    raise Exception("Failed to convert to WAV")
        else:
            print(f"✓ Using cached WAV file")

        # Step 2: Process in chunks if needed
        file_size = os.path.getsize(wav_path) / (1024*1024)  # MB
        print(f"File size: {file_size:.1f} MB")

        if file_size > 50 and config['chunk_duration'] < 1800:
            # Split large files
            chunks = split_audio_file(wav_path, config['chunk_duration'])
            all_segments = []

            for chunk_idx, (chunk_path, start_offset) in enumerate(chunks):
                print(f"\nProcessing chunk {chunk_idx+1}/{len(chunks)}...")

                result = transcribe_chunk(chunk_path, model, model_a, metadata, config)

                # Adjust timestamps
                for segment in result.get('segments', []):
                    segment['start'] += start_offset
                    segment['end'] += start_offset
                    all_segments.append(segment)

                # Clean up chunk
                os.remove(chunk_path)
                torch.cuda.empty_cache()
                gc.collect()

            result = {'segments': all_segments}
        else:
            # Process whole file
            result = transcribe_chunk(wav_path, model, model_a, metadata, config)

        # Step 3: Save transcript
        output_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcript.txt")
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(f"# Transcription of: {os.path.basename(filepath)}\n")
            f.write(f"# Quality level: {checkpoint['current_quality']}\n")
            f.write(f"# Processed on: {datetime.now().isoformat()}\n\n")

            for segment in result.get('segments', []):
                start = segment.get('start', 0)
                end = segment.get('end', 0)
                text = segment.get('text', '').strip()
                speaker = segment.get('speaker', 'Speaker1')

                f.write(f"{speaker} [{start:.2f}-{end:.2f}]: {text}\n")

        print(f"\n✅ SUCCESS: Saved transcript to {output_path}")

        # Update checkpoint
        checkpoint['processed_files'].append(os.path.basename(filepath))
        if os.path.basename(filepath) in checkpoint['failed_files']:
            del checkpoint['failed_files'][os.path.basename(filepath)]
        save_checkpoint(checkpoint)

        return True

    except Exception as e:
        print(f"\n❌ FAILED: {e}")

        # Record failure
        checkpoint['failed_files'][os.path.basename(filepath)] = {
            'error': str(e),
            'quality': checkpoint['current_quality'],
            'timestamp': datetime.now().isoformat()
        }
        save_checkpoint(checkpoint)

        return False


In [11]:

# ============================================
# MAIN EXECUTION LOOP
# ============================================

def main():
    """Main execution function - run this after each crash"""

    # Mount Google Drive if needed
    try:
        from google.colab import drive
        drive.mount('/content/drive', force_remount=True)
        print("✓ Google Drive mounted")
    except:
        print("⚠ Could not mount Drive - assuming already mounted")

    # Ensure dependencies
    ensure_dependencies()

    # Load checkpoint
    checkpoint = load_checkpoint()
    print(f"\n📊 Progress Report:")
    print(f"Files processed: {len(checkpoint['processed_files'])}")
    print(f"Files failed: {len(checkpoint['failed_files'])}")
    print(f"Current quality: {checkpoint['current_quality']}")

    # Get pending files
    pending_files = get_pending_files(checkpoint)
    print(f"Files remaining: {len(pending_files)}")

    if not pending_files:
        print("\n✅ All files processed!")

        # Check if we should upgrade quality
        qualities = list(QUALITY_LEVELS.keys())
        current_idx = qualities.index(checkpoint['current_quality'])

        if current_idx < len(qualities) - 1:
            next_quality = qualities[current_idx + 1]
            print(f"\n🔄 Upgrading to {next_quality} quality and reprocessing...")
            checkpoint['current_quality'] = next_quality
            checkpoint['processed_files'] = []
            save_checkpoint(checkpoint)
            pending_files = get_pending_files(checkpoint)
        else:
            print("\n🎉 All files processed at maximum quality!")
            return

    # Load models for current quality
    try:
        models = load_models_for_quality(checkpoint['current_quality'])
        checkpoint['model_loaded'] = checkpoint['current_quality']
        save_checkpoint(checkpoint)
    except Exception as e:
        print(f"❌ Failed to load models: {e}")
        return

    # Process files one by one
    processed_count = 0
    for filename in pending_files:
        filepath = os.path.join(SOURCE_DIR, filename)

        success = process_single_file(filepath, checkpoint, models)
        processed_count += 1

        # Cool down between files
        if processed_count < len(pending_files):
            print("\n⏳ Cooling down for 10 seconds...")
            time.sleep(10)

        # Check if we've processed enough for this session
        if processed_count >= 5:  # Process 5 files per run to avoid timeout
            print(f"\n⚠ Processed {processed_count} files. Run again to continue.")
            break

    # Final cleanup
    torch.cuda.empty_cache()
    gc.collect()

    print(f"\n📊 Session Summary:")
    print(f"Files processed this session: {processed_count}")
    print(f"Total files completed: {len(checkpoint['processed_files'])}")
    print(f"Files remaining: {len(pending_files) - processed_count}")
    print("\n💡 If runtime crashed, just run this cell again!")


In [2]:

# ============================================
# RUN THE MAIN FUNCTION
# ============================================

if __name__ == "__main__":
    main()


NameError: name 'main' is not defined

In [None]:

# ============================================
# USAGE INSTRUCTIONS
# ============================================
"""
SETUP:
1. Set runtime to GPU (T4 or better)
2. Add HF_TOKEN to Colab secrets (optional, for diarization)
3. Run this single cell

AFTER CRASH:
1. Reconnect to runtime
2. Run this same cell again
3. It will automatically resume from where it left off

FEATURES:
- Saves progress after each file
- Caches WAV conversions
- Starts with fast/minimal quality
- Automatically upgrades quality after all files processed
- Splits large files into chunks
- Handles out-of-memory errors gracefully

QUALITY PROGRESSION:
1. Minimal: base model, no diarization (fastest)
2. Standard: small model, better accuracy
3. High: medium model with diarization
4. Maximum: large-v3 model, best quality

FILES ARE SAVED TO:
- Transcripts: /PRUT-Transcriptions/Transcripts/
- WAV Cache: /PRUT-Transcriptions/WAV_Cache/
- Checkpoint: /PRUT-Transcriptions/checkpoint.json
"""

# V6

In [1]:
# WhisperX Transcription with Speaker Diarization
# Updated for English transcription with MP4 support

# ============================================
# STEP 1: GPU Setup and Verification
# ============================================
# First, set Runtime to GPU (T4) in Colab: Runtime > Change runtime type > GPU

import tensorflow as tf
import torch

# Verify GPU availability
tf_device = tf.test.gpu_device_name()
torch_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

if tf_device != '/device:GPU:0' or torch_device.type != 'cuda':
    raise SystemError('GPU not found. Please enable GPU in Runtime settings.')

print(f'TensorFlow GPU: {tf_device}')
print(f'PyTorch GPU: {torch_device}')
print(f'CUDA available: {torch.cuda.is_available()}')

# Check GPU details
!nvidia-smi


TensorFlow GPU: /device:GPU:0
PyTorch GPU: cuda
CUDA available: True
Wed Jun 18 19:01:01 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   40C    P0             26W /   70W |     102MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+---

In [2]:

# ============================================
# STEP 2: Install Dependencies
# ============================================
!pip install -q pydub
!pip install -q git+https://github.com/m-bain/whisperx.git

# Additional dependencies for video processing
!apt-get -qq install ffmpeg


  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.6/59.6 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.4/37.4 MB[0m [31m27.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m68.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.4/16.4 MB[0m [31m110.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m129.3 MB/s[0m eta [36m0:00:00

In [3]:

# ============================================
# STEP 3: Import Libraries and Set Locale
# ============================================
import os
import gc
import locale
from pydub import AudioSegment
from google.colab import drive, userdata
import whisperx

# Set UTF-8 locale
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
os.environ['LC_ALL'] = 'C.UTF-8'
os.environ['LANG'] = 'C.UTF-8'

print(f"Locale encoding: {locale.getpreferredencoding()}")


Locale encoding: utf-8


In [2]:

# ============================================
# STEP 4: Mount Google Drive
# ============================================
drive.mount('/content/drive')

# Update these paths to your actual directories
SOURCE_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/Recordings_PRUT'  # UPDATE THIS
OUTPUT_DIR = '/content/drive/MyDrive/PRUT-Transcriptions/Transcripts'  # UPDATE THIS
TEMP_AUDIO_DIR = '/content/temp_audio'  # Temporary directory for WAV files

# Create directories if they don't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TEMP_AUDIO_DIR, exist_ok=True)


NameError: name 'drive' is not defined

In [6]:

# ============================================
# STEP 5: Configure WhisperX with Secrets
# ============================================
# Store your HuggingFace token in Colab secrets:
# Click the key icon in the left sidebar > Add a secret named 'HF_TOKEN'

try:
    HF_TOKEN = userdata.get('HF_TOKEN')
except:
    print("Warning: HF_TOKEN not found in secrets. Using hardcoded token.")
    HF_TOKEN = "your_huggingface_token_here"  # Fallback - replace with your token

# WhisperX configuration
device = "cuda"
batch_size = 4  # Adjust based on GPU memory (start with 4, can try 6 or 8)
compute_type = "float32"  # Options: "float32", "float16", "int8"
language = "en"  # Changed from 'de' to 'en'


In [7]:

# ============================================
# STEP 6: Load WhisperX Models
# ============================================
print("Loading WhisperX models...")

# Load main transcription model
model = whisperx.load_model("large-v3", device, language=language, compute_type=compute_type)

# Load alignment model
model_a, metadata = whisperx.load_align_model(language_code=language, device=device)

# Load diarization model
diarize_model = whisperx.DiarizationPipeline(use_auth_token=HF_TOKEN, device=device)

print("All models loaded successfully!")


Loading WhisperX models...


DEBUG:speechbrain.utils.checkpoints:Registered checkpoint save hook for _speechbrain_save
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint load hook for _speechbrain_load
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint save hook for save
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint load hook for load
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint save hook for _save
DEBUG:speechbrain.utils.checkpoints:Registered checkpoint load hook for _recover
INFO:pytorch_lightning.utilities.migration.utils:Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.5.1.post0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../usr/local/lib/python3.11/dist-packages/whisperx/assets/pytorch_model.bin`


>>Performing voice activity detection using Pyannote...
Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.6.0+cu124. Bad things might happen unless you revert torch to 1.x.


Downloading: "https://download.pytorch.org/torchaudio/models/wav2vec2_fairseq_base_ls960_asr_ls960.pth" to /root/.cache/torch/hub/checkpoints/wav2vec2_fairseq_base_ls960_asr_ls960.pth
100%|██████████| 360M/360M [00:01<00:00, 275MB/s]


AttributeError: module 'whisperx' has no attribute 'DiarizationPipeline'

In [9]:

# ============================================
# STEP 7: Audio Conversion Functions
# ============================================
def convert_to_wav(input_path, output_path):
    """Convert MP4/MP3/other formats to WAV"""
    try:
        audio = AudioSegment.from_file(input_path)
        # Convert to mono, 16kHz for WhisperX
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio.export(output_path, format="wav")
        return True
    except Exception as e:
        print(f"Error converting {input_path}: {e}")
        return False


In [10]:

# ============================================
# STEP 8: Main Transcription Function
# ============================================
def transcribe_with_diarization(audio_path, min_speakers=1, max_speakers=10):
    """Transcribe audio with speaker diarization"""

    # Load audio
    audio = whisperx.load_audio(audio_path)

    # Transcribe
    print(f"Transcribing {os.path.basename(audio_path)}...")
    result = model.transcribe(audio, batch_size=batch_size)

    # Align whisper output
    print("Aligning transcript...")
    result = whisperx.align(result["segments"], model_a, metadata, audio, device,
                           return_char_alignments=False)

    # Diarize
    print("Performing speaker diarization...")
    diarize_segments = diarize_model(audio, min_speakers=min_speakers, max_speakers=max_speakers)

    # Assign speakers to words
    result = whisperx.assign_word_speakers(diarize_segments, result)

    return result


In [1]:

# ============================================
# STEP 9: Process All Files
# ============================================
# Define speaker mapping
speaker_labels = {}
speaker_counter = 1

# Supported formats
supported_formats = ['.mp4', '.mp3', '.wav', '.m4a', '.flac', '.ogg']

# Process all files
for filename in os.listdir(SOURCE_DIR):
    file_ext = os.path.splitext(filename)[1].lower()

    if file_ext in supported_formats:
        try:
            input_path = os.path.join(SOURCE_DIR, filename)
            base_name = os.path.splitext(filename)[0]

            # Convert to WAV if needed
            if file_ext != '.wav':
                print(f"\nConverting {filename} to WAV...")
                wav_path = os.path.join(TEMP_AUDIO_DIR, f"{base_name}.wav")
                if not convert_to_wav(input_path, wav_path):
                    continue
            else:
                wav_path = input_path

            # Transcribe with diarization
            result = transcribe_with_diarization(wav_path)

            # Map speakers to sequential labels
            for segment in result["segments"]:
                if 'speaker' in segment and segment['speaker'] not in speaker_labels:
                    speaker_labels[segment['speaker']] = f"Speaker{speaker_counter}"
                    speaker_counter += 1

            # Save transcription
            output_path = os.path.join(OUTPUT_DIR, f"{base_name}_transcript.txt")
            with open(output_path, 'w', encoding='utf-8') as f:
                for segment in result["segments"]:
                    speaker = speaker_labels.get(segment.get('speaker', 'Unknown'), 'Unknown')
                    start = segment['start']
                    end = segment['end']
                    text = segment['text']
                    f.write(f"{speaker} [{start:.2f}-{end:.2f}]: {text}\n")

            print(f"✓ Saved transcript to {output_path}")

            # Clean up temporary WAV file
            if file_ext != '.wav' and os.path.exists(wav_path):
                os.remove(wav_path)

        except RuntimeError as e:
            if "out of memory" in str(e):
                print(f"⚠️  Out of memory for {filename}. Clearing cache...")
                torch.cuda.empty_cache()
                gc.collect()
                # Optionally reduce batch size
                batch_size = max(1, batch_size - 1)
                print(f"Reduced batch size to {batch_size}")
                continue
            else:
                print(f"❌ Error processing {filename}: {e}")

        except Exception as e:
            print(f"❌ Error processing {filename}: {e}")

        # Clear GPU memory after each file
        torch.cuda.empty_cache()
        gc.collect()

print("\n✅ Transcription complete!")
print(f"Processed files saved to: {OUTPUT_DIR}")


NameError: name 'os' is not defined

In [None]:

# ============================================
# STEP 10: Clean Up (Optional)
# ============================================
# Run this to free GPU memory when done
del model, model_a, diarize_model
torch.cuda.empty_cache()
gc.collect()
print("GPU memory cleared")


In [None]:

# ============================================
# OPTIONAL: Advanced Configuration
# ============================================
"""
Advanced options you can modify:

1. Batch Size:
   - Start with 4
   - Increase to 6 or 8 if GPU has enough memory
   - Decrease to 2 or 1 if you get out-of-memory errors

2. Compute Type:
   - "float32": Best accuracy (default)
   - "float16": Faster, slightly less accurate
   - "int8": Fastest, least accurate

3. Model Size:
   - "large-v3": Best accuracy (current)
   - "medium": Faster, good accuracy
   - "small": Much faster, lower accuracy
   - "base": Fastest, lowest accuracy

4. Speaker Count:
   - Adjust min_speakers and max_speakers based on your audio
   - Set both to same number if you know exact speaker count

5. Language:
   - Change language parameter for other languages
   - See WhisperX documentation for supported languages
"""