# 🎙️ Audio Transcription with Google Drive Integration

This notebook transcribes audio files stored in your Google Drive using the `transcript_pkg` package.

## Features:
- 📁 Transcribe single files or entire folders
- 🌐 Support for multiple languages (English, Portuguese, Auto-detect)
- 📝 Multiple output formats (TXT, SRT, VTT)
- 🚀 GPU acceleration support
- 💾 Save results directly to Google Drive
- 📊 Segment-based progress tracking with ETA
- 🔄 Connection keepalive for long transcriptions


## Tips and Best Practices

### Model Selection Guide:
- **tiny**: Fastest (39M parameters) - Good for quick drafts
- **base**: Balanced (74M parameters) - Recommended for most use cases
- **small**: Better accuracy (244M parameters) - Good for important content
- **medium**: High accuracy (769M parameters) - For professional use
- **large**: Best accuracy (1550M parameters) - When quality is critical
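
As a rough guide to what these parameter counts mean for memory, the sketch below estimates the size of the weights alone. It assumes 2 bytes per parameter for `float16` and 1 byte for `int8`; the helper name `estimate_weight_size_mb` is hypothetical, and real memory use will be higher because of activations and decoding buffers.

```python
# Parameter counts (in millions) taken from the model selection guide above.
MODEL_PARAMS_M = {"tiny": 39, "base": 74, "small": 244, "medium": 769, "large": 1550}

# Assumed storage cost per parameter for each compute type.
BYTES_PER_PARAM = {"float16": 2, "int8": 1}


def estimate_weight_size_mb(model_size: str, compute_type: str = "float16") -> float:
    """Return the approximate size of the weights in megabytes (10^6 bytes)."""
    params = MODEL_PARAMS_M[model_size] * 1_000_000
    return params * BYTES_PER_PARAM[compute_type] / 1_000_000


for name in MODEL_PARAMS_M:
    print(f"{name:>6}: ~{estimate_weight_size_mb(name):.0f} MB of weights in float16")
```

Treat the numbers as a lower bound when deciding whether a model fits on the available GPU.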

### Performance Tips:
1. **GPU Acceleration**: Google Colab offers free GPU access, subject to availability. Check that a GPU is enabled under Runtime → Change runtime type → GPU
2. **Batch Processing**: Point the notebook at a folder to transcribe several files in one run; the model is loaded only once
3. **File Size**: For very long audio files (>1 hour), the notebook includes:
   - Connection keepalive to prevent timeouts
   - Automatic checkpointing to Google Drive
   - Resume capability if kernel restarts
   - Memory management to prevent crashes
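
The checkpoint and resume idea above can be sketched as a small JSON state file that records which files are done. This is a minimal illustration, not the notebook's own class: the helper names `save_state` and `load_state` are hypothetical, and the write-then-rename step is an extra safeguard that assumes a local filesystem (behaviour on a mounted Drive folder may differ).

```python
import json
import os
import tempfile
from pathlib import Path


def save_state(state: dict, state_file: Path) -> None:
    """Write the state to a temporary file, then rename it over the target."""
    state_file.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = state_file.with_name(state_file.name + ".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_file, state_file)  # swap the new file in as a single step


def load_state(state_file: Path) -> dict:
    """Return the saved state, or a fresh one if nothing usable is on disk."""
    try:
        with open(state_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {"files_completed": [], "current_file": None}


# Demo in a temporary directory (on Colab this would be a folder in Drive).
with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "transcription_state.json"

    state = load_state(state_file)
    state["files_completed"].append("interview.mp3")
    save_state(state, state_file)

    # After a restart, only files missing from the checkpoint are processed.
    completed = load_state(state_file)["files_completed"]
    remaining = [f for f in ["interview.mp3", "lecture.wav"] if f not in completed]
    print(remaining)
```

Writing to a temporary file first means an interrupted save is less likely to leave a half-written checkpoint behind.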

### Language Settings:
- Use `auto` when you are unsure of the language; it is detected automatically
- Use a specific language code (`en`, `pt`) for better accuracy when the language is known
- Enable `multilingual` mode for content that mixes several languages
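
A minimal sketch of how these settings could be turned into keyword arguments for the transcription call, mirroring the `LANGUAGE_MAP` used later in this notebook (`auto` becomes `None`, which leaves detection to the model). The function name `build_transcribe_options` is hypothetical, and raising on unknown codes is a choice made for this example only.

```python
def build_transcribe_options(language: str = "auto", multilingual: bool = False) -> dict:
    """Translate the notebook's language setting into keyword arguments."""
    supported = {"en": "en", "pt": "pt", "auto": None}
    if language not in supported:
        # Fail loudly instead of silently falling back to auto-detection.
        raise ValueError(f"Unsupported language setting: {language!r}")
    return {"language": supported[language], "multilingual": multilingual}


print(build_transcribe_options("auto"))
print(build_transcribe_options("pt"))
print(build_transcribe_options("auto", multilingual=True))
```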

### Output Formats:
- **txt**: Plain text, best for reading and searching
- **srt**: Subtitle format with timestamps, compatible with most video players
- **vtt**: WebVTT format, ideal for web-based video players
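
The two subtitle formats differ mainly in small details: SRT numbers each cue and uses a comma before the milliseconds, while WebVTT starts with a `WEBVTT` header and uses a period. The sketch below illustrates this; the helper names `srt_cue` and `vtt_cue` are hypothetical, and this `format_timestamp` is a standalone variant written for the example.

```python
def format_timestamp(seconds: float, decimal_mark: str = ".") -> str:
    """Format seconds as HH:MM:SS followed by milliseconds."""
    total_ms = int(round(seconds * 1000))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, ms = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}{decimal_mark}{ms:03d}"


def srt_cue(index: int, start: float, end: float, text: str) -> str:
    """Build one SRT cue: sequence number, comma-separated milliseconds."""
    return f"{index}\n{format_timestamp(start, ',')} --> {format_timestamp(end, ',')}\n{text}\n"


def vtt_cue(start: float, end: float, text: str) -> str:
    """Build one WebVTT cue: no sequence number, period before milliseconds."""
    return f"{format_timestamp(start)} --> {format_timestamp(end)}\n{text}\n"


print(srt_cue(1, 0.0, 2.5, "Hello"))
print("WEBVTT\n")
print(vtt_cue(0.0, 2.5, "Hello"))
```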

### Troubleshooting:
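- If transcription fails, check that the file format is supported
- Ensure there is sufficient Google Drive storage space
- For large batches, monitor Colab runtime limits
- Free Colab disk space if needed by deleting specific folders you no longer need, e.g. `!rm -rf /content/whisper_models`. Avoid `!rm -rf /content/*`: it also targets the mounted Drive at `/content/drive` and the `transcript_pkg` folder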
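
The first two checks can be done before starting a long run. A small sketch, assuming the same extension list that the transcription module defines later in this notebook; the helper names `is_supported` and `free_space_gb` are hypothetical. `shutil.disk_usage` reports on the filesystem that holds the given path, so the figure for a mounted Drive folder may not match your Drive quota.

```python
import shutil
from pathlib import Path

# Same extensions as AUDIO_EXTENSIONS in the transcription module.
SUPPORTED = {
    ".mp3", ".wav", ".flac", ".ogg", ".m4a", ".mp4", ".aac",
    ".wma", ".opus", ".webm", ".mkv", ".avi", ".mov", ".m4v",
}


def is_supported(path) -> bool:
    """Check the file extension case-insensitively."""
    return Path(path).suffix.lower() in SUPPORTED


def free_space_gb(path=".") -> float:
    """Return free space (in GB) on the filesystem containing the path."""
    return shutil.disk_usage(path).free / 1e9


print(is_supported("talk.MP3"))
print(is_supported("notes.pdf"))
print(f"{free_space_gb():.1f} GB free on the local disk")
```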


## Step 1: Mount Google Drive


In [None]:
from google.colab import drive
drive.mount('/content/drive')
print("✅ Google Drive mounted successfully!")


## Step 2: Install Dependencies

In [None]:
# Install required packages
!pip install -q faster-whisper rich tinytag scipy sounddevice numpy

# Install ffmpeg for audio processing
!apt-get -qq install -y ffmpeg

print("✅ All dependencies installed successfully!")
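
If a later cell fails with an import error, a quick check like the one below can show what is missing. It is only a sketch: the helper name `missing_modules` is hypothetical, and the list covers just the packages the transcription module imports directly.

```python
import importlib.util
import shutil


def missing_modules(names):
    """Return the module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


required = ["faster_whisper", "rich", "tinytag"]
missing = missing_modules(required)

print("Missing Python packages:", missing or "none")
print("ffmpeg on PATH:", shutil.which("ffmpeg") is not None)
```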


## Step 3: Setup transcript_pkg


In [None]:
# Set up transcript_pkg in the Colab runtime
# Either clone your repository (Option 1) or create a minimal package here (Option 2)

import os
import shutil

# Option 1: If you have the transcript_pkg in your repository
# !git clone https://github.com/yourusername/transcriber.git /content/transcriber

# Option 2: Create a minimal version for Colab
os.makedirs('/content/transcript_pkg', exist_ok=True)

# Create __init__.py
with open('/content/transcript_pkg/__init__.py', 'w') as f:
    f.write('"""Audio transcription tools for live and file-based transcription."""\n\n__version__ = "0.1.0"\n')

print("✅ transcript_pkg directory created!")


In [None]:
# Create an enhanced file_transcribe module for Google Colab with checkpointing and segment progress
file_transcribe_code = '''"""Enhanced file transcription module for Google Colab with checkpointing, connection keepalive and segment-based progress tracking."""

import time
from pathlib import Path
from typing import Optional, List, Dict, Tuple
import numpy as np
import os
import threading
from datetime import datetime
from collections import deque
import signal
import sys
import json
import gc

from faster_whisper import WhisperModel
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn, ProgressColumn, Task
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.live import Live
from rich.prompt import Confirm
from tinytag import TinyTag

console = Console()

AUDIO_EXTENSIONS = {
    ".mp3", ".wav", ".flac", ".ogg", ".m4a", ".mp4", 
    ".aac", ".wma", ".opus", ".webm", ".mkv", ".avi", ".mov", ".m4v"
}

LANGUAGE_MAP = {
    "en": "en",
    "pt": "pt", 
    "auto": None
}

# Colab keepalive settings
COLAB_KEEPALIVE_INTERVAL = 20  # seconds (reduced for better keepalive)
CHECKPOINT_SAVE_INTERVAL = 30  # Save checkpoint every 30 segments

def format_timestamp(seconds: float) -> str:
    """Convert seconds to timestamp format."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60
    return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"

def format_duration(seconds: float) -> str:
    """Format duration in human-readable format."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    elif seconds < 3600:
        return f"{seconds/60:.1f}m"
    else:
        return f"{seconds/3600:.1f}h"

def format_duration_hhmmss(seconds: float) -> str:
    """Format duration in seconds as HH:MM:SS."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"

def get_audio_duration(file_path: Path) -> float:
    """Get actual audio duration using TinyTag."""
    try:
        tag = TinyTag.get(str(file_path))  # pass a plain string for older tinytag versions
        return tag.duration or 0.0
    except Exception:
        return 0.0  # Return 0 if duration can't be read

class TranscriptionCheckpoint:
    """Manage transcription progress checkpoints for Google Drive."""
    
    def __init__(self, checkpoint_dir: Path):
        """Initialize checkpoint manager."""
        self.checkpoint_dir = checkpoint_dir
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = self.checkpoint_dir / "transcription_state.json"
        self.state = {
            "files_completed": [],
            "current_file": None,
            "current_file_segments": [],
            "current_file_position": 0,
            "total_files": 0,
            "model_size": None,
            "language": None,
            "output_format": None,
            "multilingual": False,
            "timestamp": None
        }
        
    def load(self) -> bool:
        """Load checkpoint state from Google Drive."""
        if self.state_file.exists():
            try:
                with open(self.state_file, 'r') as f:
                    self.state = json.load(f)
                return True
            except Exception as e:
                console.print(f"[yellow]Warning: Could not load checkpoint: {e}[/yellow]")
                return False
        return False
        
    def save(self):
        """Save checkpoint state to Google Drive."""
        self.state["timestamp"] = datetime.now().isoformat()
        try:
            with open(self.state_file, 'w') as f:
                json.dump(self.state, f, indent=2)
            # Ask the OS to flush pending writes (the upload to Drive itself may still lag)
            os.sync()
        except Exception as e:
            console.print(f"[yellow]Warning: Could not save checkpoint: {e}[/yellow]")
            
    def mark_file_complete(self, file_name: str):
        """Mark a file as completed."""
        if file_name not in self.state["files_completed"]:
            self.state["files_completed"].append(file_name)
        self.state["current_file"] = None
        self.state["current_file_segments"] = []
        self.state["current_file_position"] = 0
        self.save()
        
    def set_current_file(self, file_name: str):
        """Set the current file being processed."""
        self.state["current_file"] = file_name
        self.save()
        
    def update_progress(self, segment_data: dict, position: float):
        """Update progress for current file."""
        self.state["current_file_segments"].append(segment_data)
        self.state["current_file_position"] = position
        # Save every CHECKPOINT_SAVE_INTERVAL segments
        if len(self.state["current_file_segments"]) % CHECKPOINT_SAVE_INTERVAL == 0:
            self.save()
            
    def get_resume_info(self) -> dict:
        """Get information for resuming transcription."""
        return {
            "completed_files": self.state["files_completed"],
            "current_file": self.state["current_file"],
            "current_position": self.state["current_file_position"],
            "segments": self.state["current_file_segments"]
        }
        
    def clear(self):
        """Clear all checkpoint data."""
        try:
            if self.state_file.exists():
                self.state_file.unlink()
            # Clear state
            self.state = {
                "files_completed": [],
                "current_file": None,
                "current_file_segments": [],
                "current_file_position": 0,
                "total_files": 0,
                "model_size": None,
                "language": None,
                "output_format": None,
                "multilingual": False,
                "timestamp": None
            }
        except Exception as e:
            console.print(f"[yellow]Warning: Could not clear checkpoint: {e}[/yellow]")

class ColabKeepalive:
    """Keep Colab connection alive by periodically outputting status."""
    
    def __init__(self, interval: int = COLAB_KEEPALIVE_INTERVAL):
        self.interval = interval
        self.running = False
        self.thread = None
        self.last_status = ""
        
    def start(self):
        """Start the keepalive thread."""
        self.running = True
        self.thread = threading.Thread(target=self._keepalive_loop, daemon=True)
        self.thread.start()
        
    def stop(self):
        """Stop the keepalive thread."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=2)
            
    def update_status(self, status: str):
        """Update the status message."""
        self.last_status = status
        
    def _keepalive_loop(self):
        """Main keepalive loop."""
        from IPython.display import clear_output
        while self.running:
            # Output a minimal update to keep connection alive
            timestamp = datetime.now().strftime('%H:%M:%S')
            # Use IPython display for better Colab compatibility
            clear_output(wait=True)
            print(f"⏳ {timestamp} - {self.last_status}")
            time.sleep(self.interval)

class RemainingAudioDurationColumn(ProgressColumn):
    """Custom column showing the calculated ETA with a live countdown."""

    def __init__(self):
        super().__init__()
        self.task_etas = {}  # task_id -> (eta_in_seconds, last_update_time)
        self.lock = threading.Lock()

    def set_eta(self, task_id: int, eta: float):
        """Set the ETA for a specific task."""
        with self.lock:
            self.task_etas[task_id] = (eta, time.time())

    def render(self, task: Task) -> Text:
        """Render the ETA by calculating the remaining time since the last update."""
        with self.lock:
            eta_info = self.task_etas.get(task.id)
            if eta_info:
                eta, last_update_time = eta_info
                time_elapsed = time.time() - last_update_time
                current_eta = eta - time_elapsed

                if current_eta > 0:
                    return Text(format_duration_hhmmss(current_eta), style="bold")

        return Text("--:--:--", style="bold")

class StreamingTranscriptionWriter:
    """Handles streaming output of transcription data."""
    
    def __init__(self, output_file: Path, format_type: str, multilingual: bool = False):
        self.output_file = output_file
        self.format_type = format_type
        self.multilingual = multilingual
        self.segment_count = 0
        self.file_handle = None
        self._initialize_file()
    
    def _initialize_file(self):
        """Initialize the output file with headers if needed."""
        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        
        if self.format_type == "txt":
            self.file_handle = open(self.output_file, "w", encoding="utf-8")
            if self.multilingual:
                self.file_handle.write("[Multilingual transcription - language shown in brackets]\\n\\n")
        elif self.format_type == "vtt":
            self.file_handle = open(self.output_file, "w", encoding="utf-8")
            self.file_handle.write("WEBVTT\\n\\n")
        elif self.format_type == "srt":
            self.file_handle = open(self.output_file, "w", encoding="utf-8")
    
    def write_segment(self, segment):
        """Write a single segment to the output file."""
        if not self.file_handle:
            return
        
        self.segment_count += 1
        
        if self.format_type == "txt":
            if self.multilingual and hasattr(segment, "language"):
                self.file_handle.write(f"[{segment.language}] {segment.text.strip()}\\n")
            else:
                self.file_handle.write(f"{segment.text.strip()}\\n")
        
        elif self.format_type == "srt":
            self.file_handle.write(f"{self.segment_count}\\n")
            start = format_timestamp(segment.start).replace(".", ",")
            end = format_timestamp(segment.end).replace(".", ",")
            self.file_handle.write(f"{start} --> {end}\\n")
            if self.multilingual and hasattr(segment, "language"):
                self.file_handle.write(f"[{segment.language}] {segment.text.strip()}\\n\\n")
            else:
                self.file_handle.write(f"{segment.text.strip()}\\n\\n")
        
        elif self.format_type == "vtt":
            start = format_timestamp(segment.start)
            end = format_timestamp(segment.end)
            self.file_handle.write(f"{start} --> {end}\\n")
            if self.multilingual and hasattr(segment, "language"):
                self.file_handle.write(f"[{segment.language}] {segment.text.strip()}\\n\\n")
            else:
                self.file_handle.write(f"{segment.text.strip()}\\n\\n")
        
        self.file_handle.flush()
    
    def close(self):
        """Close the output file."""
        if self.file_handle:
            self.file_handle.close()

def detect_device_and_compute_type() -> Tuple[str, str]:
    """Detect if GPU is available and return appropriate device and compute type."""
    try:
        import torch
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            console.print(f"[green]✓[/green] GPU detected: {gpu_name}")
            return "cuda", "float16"
    except ImportError:
        pass
    
    console.print("[yellow]ℹ[/yellow] Using CPU for transcription")
    return "cpu", "int8"

def load_whisper_model_with_retry(model_size: str, device: str, compute_type: str, max_retries: int = 3):
    """Load Whisper model with retry logic for handling Hugging Face connectivity issues."""
    
    for attempt in range(max_retries):
        try:
            console.print(f"[cyan]Loading model (attempt {attempt + 1}/{max_retries})...[/cyan]")
            
            # Try different loading strategies
            try:
                # First try: Normal loading with local_files_only=False
                model = WhisperModel(
                    model_size, 
                    device=device, 
                    compute_type=compute_type,
                    local_files_only=False
                )
                return model
            except Exception as e1:
                console.print(f"[yellow]Standard loading failed: {str(e1)[:100]}...[/yellow]")
                
                # Second try: Download path explicitly
                try:
                    # Disable Hugging Face Hub telemetry before the second attempt
                    os.environ["HF_HUB_DISABLE_TELEMETRY"] = "1"
                    
                    # Try with download_root
                    download_root = "/content/whisper_models"
                    os.makedirs(download_root, exist_ok=True)
                    
                    model = WhisperModel(
                        model_size,
                        device=device,
                        compute_type=compute_type,
                        download_root=download_root
                    )
                    return model
                except Exception as e2:
                    console.print(f"[yellow]Download root method failed: {str(e2)[:100]}...[/yellow]")
                    
                    # Both strategies failed: wait and retry, or give up on the last attempt
                    if attempt < max_retries - 1:
                        wait_time = (attempt + 1) * 5
                        console.print(f"[yellow]Waiting {wait_time} seconds before retry...[/yellow]")
                        time.sleep(wait_time)
                    else:
                        raise Exception(f"Failed to load model after {max_retries} attempts")
                        
        except Exception as e:
            if attempt == max_retries - 1:
                console.print(f"[red]Failed to load model: {e}[/red]")
                console.print("[yellow]Troubleshooting tips:[/yellow]")
                console.print("1. Try restarting the Colab runtime")
                console.print("2. Check your internet connection")
                console.print("3. Try a different model size (e.g., 'tiny' or 'small')")
                console.print("4. Clear Colab cache: !rm -rf /root/.cache/huggingface")
                raise
    
    return None

def transcribe_file_with_progress(
    audio_file: Path,
    model: WhisperModel,
    output_dir: Path,
    language: str = "auto",
    output_format: str = "txt",
    multilingual: bool = False,
    keepalive: Optional[ColabKeepalive] = None,
    progress: Optional[Progress] = None,
    task_id: Optional[int] = None,
    eta_column: Optional[RemainingAudioDurationColumn] = None,
    checkpoint: Optional[TranscriptionCheckpoint] = None,
    resume_position: float = 0
) -> Dict:
    """Transcribe a single audio file with segment-based progress tracking."""
    
    # Get audio duration
    audio_duration = get_audio_duration(audio_file)
    
    # Prepare output file
    output_file = output_dir / f"{audio_file.stem}.{output_format}"
    
    # Create writer
    writer = StreamingTranscriptionWriter(output_file, output_format, multilingual)
    
    # Start transcription
    console.print(f"[cyan]Processing:[/cyan] {audio_file.name} ({format_duration(audio_duration)})")
    start_time = time.time()
    
    # Update keepalive status
    if keepalive:
        keepalive.update_status(f"Transcribing {audio_file.name}")
    
    try:
        # Transcribe
        segments_iterator, info = model.transcribe(
            str(audio_file),
            language=LANGUAGE_MAP.get(language),
            beam_size=5,
            vad_filter=True,
            vad_parameters=dict(min_silence_duration_ms=500),
            multilingual=multilingual
        )
        
        # Process segments with progress tracking
        segment_count = 0
        audio_position = 0
        session_start_time = time.time()
        
        # Update progress bar total if available
        if progress and task_id is not None:
            progress.update(task_id, total=info.duration)
        
        for segment in segments_iterator:
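            # Skip segments that end before the resume position.
            # Note: the writer reopens the output file in "w" mode, so the
            # skipped segments are not written again to the resumed output.
            if resume_position > 0 and segment.end <= resume_position:
                continue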
                
            # Write segment
            writer.write_segment(segment)
            segment_count += 1
            audio_position = segment.end
            
            # Update checkpoint
            if checkpoint:
                segment_data = {
                    "text": segment.text,
                    "start": segment.start,
                    "end": segment.end,
                    "language": getattr(segment, "language", None)
                }
                checkpoint.update_progress(segment_data, audio_position)
            
            # Update progress
            if progress and task_id is not None:
                progress.update(task_id, completed=audio_position)
                
                # Calculate and update ETA
                if eta_column and audio_position > 5:  # Wait for 5 seconds before calculating ETA
                    elapsed_time = time.time() - session_start_time
                    rate = audio_position / elapsed_time
                    if rate > 0:
                        remaining_audio = info.duration - audio_position
                        eta = remaining_audio / rate
                        eta_column.set_eta(task_id, eta)
            
            # Update keepalive with segment info
            if keepalive and segment_count % 5 == 0:  # Update every 5 segments
                keepalive.update_status(
                    f"Transcribing {audio_file.name} - {segment_count} segments, "
                    f"{audio_position:.1f}s/{info.duration:.1f}s ({audio_position/info.duration*100:.1f}%)"
                )
                
            # Periodic memory cleanup to prevent kernel crashes
            if segment_count % 100 == 0:
                gc.collect()
                try:
                    import torch
                    if torch.cuda.is_available():
                        torch.cuda.empty_cache()
                except Exception:
                    pass
        
        # Close writer
        writer.close()
        
        # Calculate stats
        process_time = time.time() - start_time
        speed = info.duration / process_time if process_time > 0 else 0
        
        # Final progress update
        if progress and task_id is not None:
            progress.update(task_id, completed=info.duration)
        
        console.print(
            f"[green]✓[/green] {audio_file.name} "
            f"[dim]({info.duration:.1f}s @ {speed:.1f}x speed, {segment_count} segments)[/dim]"
        )
        
        return {
            "success": True,
            "file": audio_file.name,
            "duration": info.duration,
            "segments": segment_count,
            "process_time": process_time,
            "speed": speed,
            "output_file": str(output_file),
            "detected_language": info.language if language == "auto" else language
        }
        
    except Exception as e:
        writer.close()
        console.print(f"[red]✗[/red] Error processing {audio_file.name}: {e}")
        return {
            "success": False,
            "file": audio_file.name,
            "error": str(e)
        }

def find_audio_files(input_path: Path) -> List[Path]:
    """Find all audio files in the given path."""
    audio_files = []
    
    if input_path.is_file():
        if input_path.suffix.lower() in AUDIO_EXTENSIONS:
            audio_files.append(input_path)
    elif input_path.is_dir():
        # Match suffixes case-insensitively so names like "talk.Mp3" are found
        audio_files = [
            p for p in input_path.iterdir()
            if p.is_file() and p.suffix.lower() in AUDIO_EXTENSIONS
        ]
    
    return sorted(audio_files)

def transcribe_folder(
    input_path: Path,
    output_path: Path,
    model_size: str = "base",
    language: str = "auto",
    output_format: str = "txt",
    multilingual: bool = False,
    checkpoint_dir: Optional[Path] = None
) -> List[Dict]:
    """Transcribe all audio files in a folder with checkpoint support."""
    
    # Set checkpoint directory
    if checkpoint_dir is None:
        checkpoint_dir = output_path / ".checkpoints"
    
    # Initialize checkpoint
    checkpoint = TranscriptionCheckpoint(checkpoint_dir)
    
    # Check for existing checkpoint
    resume_info = None
    if checkpoint.load():
        console.print(Panel(
            f"[yellow]Found previous transcription checkpoint[/yellow]\\n"
            f"Completed files: {len(checkpoint.state['files_completed'])}\\n"
            f"Timestamp: {checkpoint.state['timestamp']}",
            title="[bold cyan]Resume Transcription?[/bold cyan]",
            border_style="cyan"
        ))
        
        # In Colab, default to resuming to avoid losing progress
        resume = True
        console.print("[green]Resuming from checkpoint...[/green]")
            
        if resume:
            resume_info = checkpoint.get_resume_info()
        else:
            checkpoint.clear()
    
    # Find audio files
    audio_files = find_audio_files(input_path)
    
    if not audio_files:
        console.print("[red]No audio files found![/red]")
        return []
    
    # Filter out completed files if resuming
    if resume_info and resume_info["completed_files"]:
        original_count = len(audio_files)
        audio_files = [f for f in audio_files if f.name not in resume_info["completed_files"]]
        console.print(f"[green]Skipping {original_count - len(audio_files)} already completed files[/green]")
    
    console.print(f"[green]Found {len(audio_files)} audio files to process[/green]")
    
    # Analyze total audio duration
    total_duration = 0
    console.print("[cyan]Analyzing audio files...[/cyan]")
    for audio_file in audio_files:
        duration = get_audio_duration(audio_file)
        total_duration += duration
        console.print(f"  • {audio_file.name}: {format_duration(duration)}")
    
    console.print(f"[green]Total audio duration: {format_duration(total_duration)}[/green]\\n")
    
    # Create output directory
    output_path.mkdir(parents=True, exist_ok=True)
    
    # Load model with retry logic
    device, compute_type = detect_device_and_compute_type()
    console.print(f"[cyan]Loading Whisper {model_size} model...[/cyan]")
    
    try:
        with console.status("[bold cyan]Loading model...[/bold cyan]"):
            model = load_whisper_model_with_retry(model_size, device=device, compute_type=compute_type)
        
        console.print(f"[green]✓[/green] Model loaded successfully!")
    except Exception as e:
        console.print(f"[red]Error loading model: {e}[/red]")
        console.print("\\n[yellow]Alternative solution:[/yellow]")
        console.print("Try running this command first to clear cache:")
        console.print("[cyan]!rm -rf /root/.cache/huggingface[/cyan]")
        console.print("Then restart the runtime and try again.")
        return []
    
    # Initialize keepalive
    keepalive = ColabKeepalive()
    keepalive.start()
    
    # Process files with enhanced progress
    results = []
    eta_column = RemainingAudioDurationColumn()
    
    try:
        with Progress(
            SpinnerColumn(spinner_name="dots12", style="cyan"),
            TextColumn("[bold blue]{task.description}"),
            BarColumn(bar_width=40, style="cyan", complete_style="green"),
            TaskProgressColumn(),
            "•",
            TimeElapsedColumn(),
            "•",
            eta_column,
            console=console,
            refresh_per_second=1
        ) as progress:
            # Main task for overall progress
            main_task = progress.add_task(
                f"[cyan]Overall Progress ({len(audio_files)} files)", 
                total=total_duration
            )
            
            # Individual file task
            file_task = progress.add_task("[yellow]Current file", visible=False)
            
            total_processed = 0
            
            for idx, audio_file in enumerate(audio_files, 1):
                # Check if this is the file we need to resume
                resume_position = 0
                if resume_info and resume_info["current_file"] == audio_file.name:
                    resume_position = resume_info["current_position"]
                    console.print(f"[yellow]Resuming {audio_file.name} from position {resume_position:.1f}s[/yellow]")
                
                # Set current file in checkpoint
                checkpoint.set_current_file(audio_file.name)
                
                # Update file task
                progress.update(
                    file_task, 
                    description=f"[yellow]File {idx}/{len(audio_files)}: {audio_file.name}",
                    visible=True,
                    completed=resume_position
                )
                
                try:
                    # Transcribe with progress
                    result = transcribe_file_with_progress(
                        audio_file, model, output_path, 
                        language, output_format, multilingual,
                        keepalive, progress, file_task, eta_column,
                        checkpoint, resume_position
                    )
                    
                    results.append(result)
                    
                    # Update overall progress
                    if result["success"]:
                        total_processed += result["duration"]
                        progress.update(main_task, completed=total_processed)
                        # Mark file as complete in checkpoint
                        checkpoint.mark_file_complete(audio_file.name)
                    
                    # Hide file task between files
                    progress.update(file_task, visible=False)
                    
                    # Force save checkpoint after each file
                    checkpoint.save()
                    
                    # Memory cleanup between files
                    gc.collect()
                    try:
                        import torch
                        if torch.cuda.is_available():
                            torch.cuda.empty_cache()
                    except Exception:
                        pass
                        
                except Exception as e:
                    console.print(f"[red]Error processing {audio_file.name}: {e}[/red]")
                    results.append({
                        "success": False,
                        "file": audio_file.name,
                        "error": str(e)
                    })
                    # Continue with next file even if one fails
                    continue
            
            # Ensure main task shows completion
            progress.update(main_task, completed=total_duration)
    
    finally:
        # Stop keepalive
        keepalive.stop()
        print()  # Clear the keepalive line
    
    return results

def display_results_summary(results: List[Dict]):
    """Display a summary of transcription results."""
    if not results:
        return
    
    # Create summary table
    table = Table(title="📊 Transcription Results", show_header=True, header_style="bold cyan")
    table.add_column("File", style="cyan", width=30)
    table.add_column("Status", style="green")
    table.add_column("Duration", justify="right")
    table.add_column("Segments", justify="right")
    table.add_column("Speed", justify="right")
    
    successful = 0
    failed = 0
    total_duration = 0
    total_time = 0
    
    for result in results:
        if result["success"]:
            successful += 1
            total_duration += result["duration"]
            total_time += result["process_time"]
            
            table.add_row(
                result["file"][:30] + "..." if len(result["file"]) > 30 else result["file"],
                "✅ Success",
                format_duration(result["duration"]),
                str(result["segments"]),
                f"{result['speed']:.1f}x"
            )
        else:
            failed += 1
            error_msg = str(result["error"])[:20] + "..." if len(str(result["error"])) > 20 else str(result["error"])
            table.add_row(
                result["file"][:30] + "..." if len(result["file"]) > 30 else result["file"],
                f"❌ {error_msg}",
                "-",
                "-",
                "-"
            )
    
    console.print("\\n")
    console.print(table)
    
    # Summary statistics
    if successful > 0:
        avg_speed = total_duration / total_time if total_time > 0 else 0
        console.print("\\n")
        console.print(Panel(
            f"[bold]Summary:[/bold]\\n\\n"
            f"✅ Successful: {successful} files\\n"
            f"❌ Failed: {failed} files\\n"
            f"⏱️  Total audio: {format_duration(total_duration)}\\n"
            f"⚡ Processing time: {format_duration(total_time)}\\n"
            f"🚀 Average speed: {avg_speed:.1f}x realtime",
            title="[bold green]Transcription Complete![/bold green]",
            border_style="green"
        ))
'''

with open('/content/transcript_pkg/file_transcribe.py', 'w') as f:
    f.write(file_transcribe_code)

print("✅ Enhanced file_transcribe.py created with segment progress tracking and keepalive!")


### ⚠️ Troubleshooting: Model Loading Issues

A **"502 Bad Gateway"** or similar error while loading the model usually points to a temporary Hugging Face connectivity problem. The notebook includes retry logic that handles this automatically. If the error persists, clear the cached model files with the cell below and then restart the runtime:


In [None]:
# Clear Hugging Face cache if you encounter model loading errors
# Uncomment and run if needed:
# !rm -rf /root/.cache/huggingface
# !rm -rf /content/whisper_models

# After running this, restart the runtime: Runtime → Restart runtime
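
The retry logic mentioned above is not shown in this section. As a rough illustration of the idea only, here is a minimal sketch of retrying a flaky call with exponential backoff. `retry_with_backoff` and its parameters are hypothetical names chosen for this example, not part of `transcript_pkg`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    load_fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call load_fn, retrying with exponential backoff if it raises."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return load_fn()
        except Exception as exc:
            if attempt == attempts:
                raise  # Out of retries: surface the last error
            # Delay doubles each time: base_delay, 2x, 4x, ...
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)
```

Any zero-argument callable that loads the model could be passed as `load_fn`. The `sleep` parameter exists only so the waiting can be replaced in tests.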


## Step 4: Configure Transcription Settings

Modify the parameters below according to your needs:


In [None]:
# Configuration parameters - MODIFY THESE AS NEEDED

# Input path in Google Drive (can be a file or folder)
INPUT_PATH = "/content/drive/MyDrive/AudioFiles"  # @param {type:"string"}

# Output path in Google Drive  
OUTPUT_PATH = "/content/drive/MyDrive/Transcriptions"  # @param {type:"string"}

# Model size: tiny, base, small, medium, large
MODEL_SIZE = "base"  # @param ["tiny", "base", "small", "medium", "large"]

# Language: en (English), pt (Portuguese), auto (auto-detect)
LANGUAGE = "auto"  # @param ["auto", "en", "pt"]

# Output format: txt, srt, vtt
OUTPUT_FORMAT = "txt"  # @param ["txt", "srt", "vtt"]

# Enable multilingual mode (shows language for each segment)
MULTILINGUAL = False  # @param {type:"boolean"}

print("📋 Configuration:")
print(f"  Input: {INPUT_PATH}")
print(f"  Output: {OUTPUT_PATH}")
print(f"  Model: {MODEL_SIZE}")
print(f"  Language: {LANGUAGE}")
print(f"  Format: {OUTPUT_FORMAT}")
print(f"  Multilingual: {MULTILINGUAL}")
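
Before starting a long run, it can help to sanity-check these values. The sketch below is one possible way to do that. `validate_config` and the `VALID_*` sets are illustrative helpers rather than part of `transcript_pkg`, and the allowed values simply mirror the dropdown options in the configuration cell.

```python
from pathlib import Path
from typing import List

# Allowed values, copied from the dropdown options in the configuration cell
VALID_MODELS = {"tiny", "base", "small", "medium", "large"}
VALID_LANGUAGES = {"auto", "en", "pt"}
VALID_FORMATS = {"txt", "srt", "vtt"}


def validate_config(
    input_path: str, model_size: str, language: str, output_format: str
) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    if not Path(input_path).exists():
        problems.append(f"Input path not found: {input_path}")
    if model_size not in VALID_MODELS:
        problems.append(f"Unknown model size: {model_size}")
    if language not in VALID_LANGUAGES:
        problems.append(f"Unsupported language: {language}")
    if output_format not in VALID_FORMATS:
        problems.append(f"Unsupported output format: {output_format}")
    return problems
```

Calling `validate_config(INPUT_PATH, MODEL_SIZE, LANGUAGE, OUTPUT_FORMAT)` and printing each returned message would flag typos before any model is downloaded.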


In [None]:
# Optional: Clear checkpoints to start fresh
# Uncomment the lines below if you want to remove previous checkpoint data

# import shutil
# from pathlib import Path
# checkpoint_path = Path(OUTPUT_PATH) / ".checkpoints"
# if checkpoint_path.exists():
#     shutil.rmtree(checkpoint_path)
#     print("✅ Checkpoints cleared!")
# else:
#     print("ℹ️ No checkpoints found.")


## Step 5: Run Transcription

Execute the cell below to start the transcription process. The notebook will:
- Keep your Colab connection alive during long transcriptions
- Show segment-by-segment progress with ETA
- Automatically handle connection issues


In [None]:
from pathlib import Path
import sys
sys.path.append('/content')

from transcript_pkg.file_transcribe import transcribe_folder, display_results_summary
from rich.console import Console
from rich.panel import Panel

console = Console()

# Convert paths
input_path = Path(INPUT_PATH)
output_path = Path(OUTPUT_PATH)

# Check if input exists
if not input_path.exists():
    console.print(f"[red]Error: Input path does not exist: {INPUT_PATH}[/red]")
    console.print("[yellow]Please check your INPUT_PATH and ensure the folder/file exists in your Google Drive.[/yellow]")
else:
    # Run transcription
    console.print(Panel(
        "[bold green]Starting Transcription Process[/bold green]\n\n"
        "This may take a while depending on the size and number of files.\n"
        "The process will use GPU acceleration if available.\n\n"
        "[cyan]Features:[/cyan]\n"
        "• Segment-by-segment progress tracking\n"
        "• Real-time ETA calculations\n"
        "• Connection keepalive for long files\n"
        "• Automatic checkpointing to Google Drive\n"
        "• Resume capability if kernel restarts\n"
        "• Memory management to prevent crashes",
        title="🎙️ Transcription",
        border_style="green"
    ))
    
    results = transcribe_folder(
        input_path=input_path,
        output_path=output_path,
        model_size=MODEL_SIZE,
        language=LANGUAGE,
        output_format=OUTPUT_FORMAT,
        multilingual=MULTILINGUAL
    )
    
    # Display results summary
    if results:
        display_results_summary(results)
        
        # Show output location
        console.print(f"\n📁 [bold cyan]Output files saved to:[/bold cyan] {output_path}")
        console.print("[dim]You can find your transcriptions in the specified Google Drive folder.[/dim]")
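
The summary reports speed as audio duration divided by processing time. The same ratio can give a rough ETA while a file is still being processed. The function below is a minimal sketch under that assumption. `estimate_eta` is a hypothetical helper, and the package's own ETA calculation may differ.

```python
from typing import Optional


def estimate_eta(
    processed_audio: float, total_audio: float, elapsed: float
) -> Optional[float]:
    """Estimate remaining seconds from audio processed so far.

    All arguments are in seconds. Returns None until there is
    enough progress to measure a speed.
    """
    if processed_audio <= 0 or elapsed <= 0:
        return None
    speed = processed_audio / elapsed  # e.g. 6.0 means 6x realtime
    remaining_audio = max(total_audio - processed_audio, 0.0)
    return remaining_audio / speed
```

For example, 60 seconds of audio processed in 10 seconds is a speed of 6x, so the remaining 240 seconds of a 300-second file should take about 40 seconds.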


## Step 6: View Transcription Files

List the transcription files saved to the output folder:


In [None]:
# List all transcription files created
import os
from rich.table import Table

if output_path.exists():
    files = list(output_path.glob(f"*.{OUTPUT_FORMAT}"))
    
    if files:
        # Create a table to display files
        table = Table(title=f"📄 Transcription Files ({len(files)} total)", show_header=True)
        table.add_column("File Name", style="cyan")
        table.add_column("Size", justify="right", style="yellow")
        table.add_column("Path", style="dim")
        
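        drive_root = Path("/content/drive")
        for file in sorted(files):
            size_kb = os.path.getsize(file) / 1024
            # Show the path relative to the Drive mount when possible
            try:
                shown_path = file.relative_to(drive_root)
            except ValueError:
                shown_path = file  # Output folder is outside /content/drive
            table.add_row(
                file.name,
                f"{size_kb:.1f} KB",
                str(shown_path)
            )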
        
        console.print(table)
    else:
        console.print("[yellow]No transcription files found in the output directory.[/yellow]")
else:
    console.print("[red]Output directory does not exist.[/red]")


## Step 7: Preview a Transcription

Preview the content of the first transcription file:


In [None]:
# Preview a transcription file
from rich.text import Text

if output_path.exists():
    files = list(output_path.glob(f"*.{OUTPUT_FORMAT}"))
    
    if files:
        # Report how many files were found; only the first is previewed
        console.print(f"\n[cyan]Found {len(files)} transcription file(s). Showing preview of the first one.[/cyan]")
        
        # Get the first file in sorted order
        preview_file = sorted(files)[0]
        
        console.print(f"\n[bold]File: {preview_file.name}[/bold]\n")
        
        # Read and display content (limit to first 1000 characters for preview)
        try:
            with open(preview_file, 'r', encoding='utf-8') as f:
                content = f.read()
                preview_length = 1000
                
                if len(content) > preview_length:
                    preview = content[:preview_length] + "\n\n[... truncated for preview ...]"
                else:
                    preview = content
                
                # Wrap in Text so brackets in the transcript are not parsed as Rich markup
                console.print(Panel(
                    Text(preview),
                    title=f"📄 {preview_file.name}",
                    border_style="blue",
                    padding=(1, 2)
                ))
                
                console.print(f"\n[dim]Full file location: {preview_file}[/dim]")
                console.print(f"[dim]Total content length: {len(content)} characters[/dim]")
                
        except Exception as e:
            console.print(f"[red]Error reading file: {e}[/red]")
    else:
        console.print("[yellow]No transcription files found to preview.[/yellow]")
else:
    console.print("[red]Output directory does not exist.[/red]")
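
The preview cell always shows the first file in sorted order. To preview a specific file instead, one option is a small selection helper like the sketch below. `pick_preview_file` and its `name` argument are hypothetical additions for illustration, not part of the notebook.

```python
from pathlib import Path
from typing import List, Optional


def pick_preview_file(files: List[Path], name: Optional[str] = None) -> Optional[Path]:
    """Return the file whose name matches, else the first in sorted order."""
    ordered = sorted(files)
    if not ordered:
        return None
    if name:
        for candidate in ordered:
            if candidate.name == name:
                return candidate
    # Fall back to the first file when no name is given or nothing matches
    return ordered[0]
```

Replacing `sorted(files)[0]` with `pick_preview_file(files, "interview.txt")` would preview that file when it exists (the file name here is only an example).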
