In [None]:
# Use the %pip magic rather than !pip so the packages are installed into the
# environment of the kernel that is running this notebook.
%pip install imageio-ffmpeg torch torchvision

In [None]:
import os
import sys
import logging
import tempfile
import shutil
import subprocess
import traceback
import gc
from typing import List, Set, Dict, Optional, Any
from dataclasses import dataclass, field
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
import threading
from dataclasses import asdict
from sagemaker.pytorch import PyTorchProcessor
from sagemaker import get_execution_role
from sagemaker.processing import ProcessingOutput
from datetime import datetime
import asyncio
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
import nest_asyncio

nest_asyncio.apply()

In [None]:
# Select the "spawn" multiprocessing start method; force=True replaces a start
# method that was already chosen earlier in this kernel session.
# NOTE(review): presumably chosen so worker processes do not inherit CUDA state
# from the parent -- confirm before changing.
mp.set_start_method("spawn", force=True) 

# Per-process cache used by _transcribe_worker: the ASR pipeline it built and the
# device id that pipeline was built for. Both stay None until the first call.
_PIPELINE = None
_PIPELINE_DEVICE = None

In [None]:
# Import torch first to avoid registration conflicts
import torch
import torchvision  # Import this explicitly before transformers

# Env setup
import os
import imageio_ffmpeg, os
# Put the directory of the ffmpeg binary bundled with imageio-ffmpeg at the front of PATH.
os.environ["PATH"] = os.path.dirname(imageio_ffmpeg.get_ffmpeg_exe()) + os.pathsep + os.environ.get("PATH","")
# Default the model/cache directories to /tmp. setdefault leaves values that are
# already present in the environment untouched.
# NOTE(review): these are set before `transformers` is imported further down in this
# cell; presumably the libraries read them at import time -- confirm before reordering.
os.environ.setdefault("TRANSFORMERS_CACHE", "/tmp/transformers_cache")
os.environ.setdefault("HF_HOME", "/tmp/hf_home")
os.environ.setdefault("TORCH_HOME", "/tmp/torch_home")

import boto3
from botocore.exceptions import ClientError
import pandas as pd
from tqdm import tqdm
import imageio_ffmpeg
from transformers import pipeline
import argparse, os, logging

In [None]:
# SECURITY: AWS credentials must never be hard-coded in a notebook. The key pair that
# used to be embedded in this cell has to be treated as compromised: deactivate and
# rotate it in IAM, and scrub it from the repository history and any saved outputs.
#
# boto3 resolves credentials through its default provider chain, so supply them from
# outside the notebook instead: exported AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# (plus AWS_SESSION_TOKEN for temporary credentials), a profile in ~/.aws/credentials,
# or the execution role attached to the instance.
#
# Only the non-secret default region is configured here.
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

In [None]:
# Tunable knobs for the ASR stage.
# ASR_WORKERS: number of pipeline instances TranscriptionManager creates when the
#              config object has no `asr_workers` attribute.
# BATCH_SIZE:  `batch_size` handed to every pipeline built by
#              TranscriptionManager._build_pipeline.
ASR_WORKERS = 1
BATCH_SIZE = 4

In [None]:
# Smoke test: confirm the bucket can be listed with the credentials boto3 resolves.
# Prints the "KeyCount" field of a single list_objects_v2 response, i.e. the number
# of keys in that one response, not necessarily the size of the whole bucket.
s3 = boto3.client("s3")
print(s3.list_objects_v2(Bucket="asrelder-data")["KeyCount"])

In [None]:
# Logging setup: INFO and above are written to stdout with a timestamped format.
# force=True discards handlers that were already attached to the root logger, so
# re-running this cell does not produce duplicated log lines.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s %(levelname)s - %(message)s",
    level=logging.INFO,
    force=True,
)
logger = logging.getLogger("asr_pipeline")

In [None]:
# =============================================== Config ===============================================
@dataclass
class Config:
    """Settings for the transcription pipeline.

    One instance is shared by the S3, transcription, CSV and orchestration classes
    in this notebook. Every field has a default, so ``Config()`` is a usable
    configuration.
    """
    # --- S3 settings ---
    # s3:// URI of the prefix that is scanned for audio objects.
    s3_input: str = "s3://asrelder-data/common_voice/23/cv-corpus-23.0-2025-09-05/en/clips/"
    # Local CSV that results are appended to; also read back when resuming.
    output_local_csv: str = "./transcripts_from_prefix.csv"
    # The CSV is uploaded only when write_back_to_s3 is True AND output_s3_uri is set.
    write_back_to_s3: bool = False
    output_s3_uri: Optional[str] = None
    # Example destination when uploading is wanted:
    #   output_s3_uri = "s3://asrelder-data/outputs/transcripts_from_prefix.csv"
    # CSV holding the allow-list of clips, and the column that contains the file names.
    validation_csv_path: str = "common_voices_23_train_with_validated_votes.csv"
    validation_csv_column: str = "path"

    # --- Processing settings ---
    # Cap on the number of keys collected by the listing code that honours it;
    # a falsy value (None or 0) disables the cap.
    max_files: Optional[int] = 1500
    # Thread count for concurrent downloads; the synchronous pipeline also sizes its
    # download batches as twice this value.
    download_workers: int = 8  # 24
    # Buffered result rows are flushed to the CSV once at least this many have accumulated.
    append_every_n: int = 200
    # When True, keys already present in the output CSV are skipped.
    resume_from_csv: bool = True

    # --- Model settings ---
    # Model identifier handed to transformers.pipeline.
    model_id: str = "openai/whisper-base"
    # language / task are forwarded through generate_kwargs only when truthy.
    language: Optional[str] = "en"
    task: str = "transcribe"
    # Chunking parameters forwarded to the pipeline unchanged.
    chunk_length_s: int = 30
    stride_length_s: tuple = (5, 5)

    # --- File settings ---
    # Suffixes that mark an S3 key as audio. Keys are lower-cased before the
    # comparison, so entries here must be lower-case.
    audio_extensions: List[str] = field(default_factory=lambda: [".mp3", ".wav", ".flac", ".m4a", ".ogg"])

In [None]:
def _build_worker_pipeline(cfg_dict, device_id):
    """Create an ASR pipeline from a plain-dict configuration.

    Args:
        cfg_dict: Mapping with the keys ``model_id``, ``chunk_length_s`` and
            ``stride_length_s``; ``language`` and ``task`` are optional and only
            forwarded when truthy.
        device_id: CUDA device index. A negative value, or CUDA being unavailable,
            selects the CPU (device ``-1``).

    Returns:
        The object returned by ``transformers.pipeline``.
    """
    # float16 on GPU, float32 on CPU.
    on_gpu = torch.cuda.is_available() and device_id >= 0
    if on_gpu:
        target_device, precision = device_id, torch.float16
    else:
        target_device, precision = -1, torch.float32

    # Only forward decoding options that are actually set.
    decode_opts = {
        name: cfg_dict[name]
        for name in ("language", "task")
        if cfg_dict.get(name)
    }

    return pipeline(
        "automatic-speech-recognition",
        model=cfg_dict["model_id"],
        device=target_device,
        torch_dtype=precision,
        return_timestamps=True,
        chunk_length_s=cfg_dict["chunk_length_s"],
        stride_length_s=tuple(cfg_dict["stride_length_s"]),
        generate_kwargs=decode_opts if decode_opts else None,
    )

def _transcribe_worker(args):
    """Transcribe one file using a pipeline cached in module globals.

    Args:
        args: Tuple ``(audio_path, cfg_dict, device_id)``.

    Returns:
        Dict with the keys ``path``, ``text`` and ``error``. ``error`` is the empty
        string on success; on failure ``text`` is empty and ``error`` holds
        ``"<ExceptionType>: <message>"``.
    """
    global _PIPELINE, _PIPELINE_DEVICE

    audio_path, cfg_dict, device_id = args

    # (Re)build the cached pipeline on first use or when the requested device changed.
    # A failure while building is not caught here and propagates to the caller.
    cache_is_stale = _PIPELINE is None or _PIPELINE_DEVICE != device_id
    if cache_is_stale:
        _PIPELINE = _build_worker_pipeline(cfg_dict, device_id)
        _PIPELINE_DEVICE = device_id

    try:
        raw = _PIPELINE(audio_path)
        if isinstance(raw, dict):
            transcript = raw.get("text", "")
        else:
            transcript = str(raw)
    except Exception as exc:
        return {"path": audio_path, "text": "", "error": f"{type(exc).__name__}: {exc}"}

    return {"path": audio_path, "text": transcript, "error": ""}

In [None]:
# =============================================== Core Components ===============================================
class FFmpegSetup:
    """Makes an ``ffmpeg`` executable reachable through PATH."""

    @staticmethod
    def ensure_available() -> Optional[str]:
        """Ensure ``ffmpeg`` resolves on PATH and return where it was found.

        The binary reported by imageio-ffmpeg (if any) has its directory prepended
        to PATH. If ``ffmpeg`` still does not resolve, a small wrapper script named
        ``ffmpeg`` is written to ``~/.local/bin`` and that directory is prepended too.

        Returns:
            The path returned by ``shutil.which("ffmpeg")``, or ``None`` when no
            executable could be made available.
        """
        try:
            bundled = imageio_ffmpeg.get_ffmpeg_exe()
        except Exception as e:
            bundled = None
            logger.warning(f"imageio-ffmpeg error: {e}")

        have_bundled = bool(bundled) and os.path.exists(bundled)
        if have_bundled:
            os.environ["PATH"] = os.pathsep.join(
                [os.path.dirname(bundled), os.environ.get("PATH", "")]
            )

        resolved = shutil.which("ffmpeg")
        if resolved:
            FFmpegSetup._log_version(resolved)
        elif have_bundled:
            # The bundled binary exists but is not reachable under the name "ffmpeg".
            resolved = FFmpegSetup._install_shim(bundled)

        if not resolved:
            logger.warning("FFmpeg not found; use torchaudio fallback")

        return resolved

    @staticmethod
    def _log_version(resolved: str) -> None:
        """Log the first line of ``ffmpeg -version``; any failure is logged, not raised."""
        try:
            probe = subprocess.run(
                [resolved, "-version"],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                check=True,
                timeout=5,
            )
            banner = probe.stdout.decode('utf-8').splitlines()[0]
            logger.info(f"ffmpeg: {resolved} | {banner}")
        except Exception:
            logger.info(f"ffmpeg: {resolved} (version check failed)")

    @staticmethod
    def _install_shim(ff: str) -> Optional[str]:
        """Write an executable ``ffmpeg`` wrapper for ``ff`` and put its directory on PATH."""
        bin_dir = os.path.expanduser("~/.local/bin")
        os.makedirs(bin_dir, exist_ok=True)

        shim = os.path.join(bin_dir, "ffmpeg")
        with open(shim, "w") as f:
            f.write(f"#!/usr/bin/env bash\n\"{ff}\" \"$@\"\n")
        os.chmod(shim, 0o755)

        os.environ["PATH"] = os.pathsep.join([bin_dir, os.environ.get("PATH", "")])

        resolved = shutil.which("ffmpeg")
        if resolved:
            logger.info(f"ffmpeg shim created: {resolved}")
        return resolved

class S3Manager:
    """Thin wrapper around a boto3 S3 client: list, download and upload objects."""

    def __init__(self, config: "Config"):
        """Store the configuration and create the S3 client.

        Args:
            config: Pipeline configuration; ``audio_extensions`` and ``max_files``
                are read by the methods below.
        """
        self.config = config
        self.client = boto3.client("s3")

    def parse_uri(self, uri: str) -> tuple[str, str]:
        """Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

        The URI is split by hand instead of with ``urlparse``: S3 keys may legally
        contain ``#`` and ``?``, which a URL parser would cut off as fragment/query.

        Args:
            uri: URI starting with ``s3://``.

        Returns:
            ``(bucket, key)``; leading slashes are stripped from the key, which is
            empty when the URI names only a bucket.

        Raises:
            ValueError: If the URI does not start with ``s3://`` or has no bucket.
        """
        scheme = "s3://"
        if not uri.startswith(scheme):
            raise ValueError(f"Invalid S3 URI: {uri}")
        bucket, _, key = uri[len(scheme):].partition("/")
        if not bucket:
            raise ValueError(f"Invalid S3 URI: {uri}")
        return bucket, key.lstrip("/")

    def is_audio_file(self, key: str) -> bool:
        """Return True when ``key`` ends with a configured audio extension (case-insensitive)."""
        lowered = key.lower()
        return any(lowered.endswith(ext) for ext in self.config.audio_extensions)

    def list_audio_keys(self, bucket: str, prefix: str, allowed_filenames: Optional[Set[str]] = None) -> List[str]:
        """List audio keys under ``prefix``.

        Args:
            bucket: Bucket name.
            prefix: Key prefix. If the prefix itself looks like an audio file it is
                returned as the only key without listing.
            allowed_filenames: Optional allow-list of basenames. An empty or missing
                set disables the filter.

        Returns:
            Matching keys in listing order, capped at ``config.max_files`` when that
            is truthy.
        """
        if self.is_audio_file(prefix):
            return [prefix]

        keys = []
        paginator = self.client.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if key.endswith("/") or not self.is_audio_file(key):
                    continue
                if allowed_filenames and os.path.basename(key) not in allowed_filenames:
                    continue
                keys.append(key)
                if self.config.max_files and len(keys) >= self.config.max_files:
                    return keys
        return keys

    def download_to_temp(self, bucket: str, key: str) -> str:
        """Download an object into a new temporary file and return its path.

        The temp file keeps the key's extension (``.mp3`` when the key has none).
        The caller owns the returned file and must delete it. If the download
        fails, the partially written temp file is removed before the error is
        re-raised, so failed downloads do not accumulate in the temp directory.
        """
        _, ext = os.path.splitext(key)
        fd, tmp_path = tempfile.mkstemp(suffix=ext or ".mp3")
        os.close(fd)

        try:
            with open(tmp_path, "wb") as f:
                self.client.download_fileobj(bucket, key, f)
        except BaseException:
            # Covers KeyboardInterrupt as well; the original error is always re-raised.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

        return tmp_path

    def upload_file(self, local_path: str, s3_uri: str):
        """Upload ``local_path`` to the object named by ``s3_uri``."""
        bucket, key = self.parse_uri(s3_uri)
        self.client.upload_file(local_path, bucket, key)
        logger.info(f"Uploaded to {s3_uri}")

class TranscriptionManager:
    """Owns the ASR pipelines and serialises access to each of them.

    Pipelines are created eagerly in ``__init__``. Every pipeline has its own
    ``threading.Lock`` so one pipeline instance is only called by one thread at a time.
    """

    def __init__(self, config: "Config"):
        """Create the worker pipelines.

        Args:
            config: Pipeline configuration. An optional ``asr_workers`` attribute
                overrides the module-level ``ASR_WORKERS`` default.
        """
        self.config = config
        self.pipelines = []
        self.pipeline_locks = []

        num_workers = getattr(config, 'asr_workers', ASR_WORKERS)

        for i in range(num_workers):
            # Spread workers round-robin over the visible GPUs; -1 selects the CPU.
            if torch.cuda.is_available():
                device_id = i % torch.cuda.device_count()
            else:
                device_id = -1

            logger.info(f"Creating ASR worker {i+1}/{num_workers} on device {device_id}")
            self.pipelines.append(self._build_pipeline(device_id=device_id))
            self.pipeline_locks.append(threading.Lock())

    def _build_pipeline(self, device_id=-1):
        """Build one ASR pipeline on ``device_id`` (float16 on GPU, float32 on CPU)."""
        use_cuda = torch.cuda.is_available() and device_id >= 0
        dtype = torch.float16 if use_cuda else torch.float32

        # Forward decoding options only when they are set.
        generate_kwargs = {}
        if self.config.language:
            generate_kwargs["language"] = self.config.language
        if self.config.task:
            generate_kwargs["task"] = self.config.task

        logger.info(f"Loading ASR: {self.config.model_id} (device={device_id}, dtype={dtype})")

        return pipeline(
            "automatic-speech-recognition",
            model=self.config.model_id,
            device=device_id,
            torch_dtype=dtype,
            return_timestamps=True,
            chunk_length_s=self.config.chunk_length_s,
            stride_length_s=self.config.stride_length_s,
            generate_kwargs=generate_kwargs or None,
            batch_size=BATCH_SIZE,
        )

    @staticmethod
    def _extract_text(result: Any) -> str:
        """Return the ``text`` entry of a dict result, or ``str(result)`` otherwise."""
        return result.get("text", "") if isinstance(result, dict) else str(result)

    def transcribe(self, audio_path: str) -> Dict[str, Any]:
        """Transcribe a single file with the first pipeline."""
        return self._transcribe_with_pipeline(audio_path, 0)

    def transcribe_batch_parallel(self, paths_and_keys: List[tuple]) -> List[Dict[str, Any]]:
        """Transcribe several files concurrently, one thread per pipeline.

        Args:
            paths_and_keys: ``(local_path, s3_key)`` pairs. Files are assigned to
                pipelines round-robin.

        Returns:
            One result row per input, in completion order. A worker that raises is
            recorded as an error row for its key instead of being dropped, so every
            input is accounted for in the output.
        """
        if not paths_and_keys:
            return []

        num_workers = len(self.pipelines)
        results = []

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            future_to_key = {
                executor.submit(
                    self._transcribe_with_pipeline_and_key,
                    path,
                    key,
                    i % num_workers,
                ): key
                for i, (path, key) in enumerate(paths_and_keys)
            }

            for future in as_completed(future_to_key):
                key = future_to_key[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Transcription failed: {e}")
                    results.append({
                        "s3_key": key,
                        "filename": os.path.basename(key),
                        "transcribed_text": "",
                        "error": f"{type(e).__name__}: {e}",
                    })

        return results

    def _transcribe_with_pipeline(self, audio_path: str, pipeline_idx: int) -> Dict[str, Any]:
        """Transcribe ``audio_path`` with pipeline ``pipeline_idx``.

        Returns:
            ``{"text": str, "error": None}`` on success, otherwise
            ``{"text": "", "error": "<ExceptionType>: <message>"}``. Errors that
            mention ffmpeg are retried through the torchaudio fallback.
        """
        lock = self.pipeline_locks[pipeline_idx]
        try:
            with lock:
                result = self.pipelines[pipeline_idx](audio_path)
                text = self._extract_text(result)
        except Exception as e:
            # The lock is released by the time we get here. The fallback takes the
            # same (non-reentrant) lock itself, so calling it while still holding
            # the lock would deadlock this worker.
            if "ffmpeg" in str(e).lower():
                return self._fallback_transcribe(audio_path, pipeline_idx)
            return {"text": "", "error": f"{type(e).__name__}: {e}"}
        return {"text": text, "error": None}

    def _transcribe_with_pipeline_and_key(self, audio_path: str, s3_key: str, pipeline_idx: int) -> Dict[str, Any]:
        """Transcribe a file, log the outcome and return a CSV-ready row.

        Returns:
            Dict with the keys ``s3_key``, ``filename``, ``transcribed_text`` and
            ``error`` (empty string when there was no error).
        """
        result = self._transcribe_with_pipeline(audio_path, pipeline_idx)

        if result["text"]:
            preview = result["text"][:100] + "..." if len(result["text"]) > 100 else result["text"]
            logger.info(f"✓ Worker {pipeline_idx}: {os.path.basename(s3_key)}: {preview}")
        else:
            logger.warning(f"✗ Worker {pipeline_idx}: {os.path.basename(s3_key)}: {result['error']}")

        return {
            "s3_key": s3_key,
            "filename": os.path.basename(s3_key),
            "transcribed_text": result["text"],
            "error": result["error"] or ""
        }

    def _fallback_transcribe(self, audio_path: str, pipeline_idx: int) -> Dict[str, Any]:
        """Decode the file with torchaudio and transcribe the raw waveform.

        Must be called without holding the pipeline lock; it acquires the lock
        itself around the pipeline call.
        """
        try:
            import torchaudio
            waveform, sr = torchaudio.load(audio_path)
            # Down-mix multi-channel audio to mono.
            if waveform.ndim == 2:
                waveform = waveform.mean(dim=0, keepdim=True)

            # Raw audio has to be passed as a dict carrying its sampling rate;
            # `sampling_rate` is not a call-time keyword of the ASR pipeline.
            audio = {"raw": waveform.squeeze(0).numpy(), "sampling_rate": sr}

            with self.pipeline_locks[pipeline_idx]:
                result = self.pipelines[pipeline_idx](audio)
            return {"text": self._extract_text(result), "error": None}
        except Exception as e:
            return {"text": "", "error": f"Fallback failed: {e}"}

    def cleanup_gpu_memory(self):
        """Release cached CUDA memory (when CUDA is available) and run the garbage collector."""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()


class CSVManager:
    """Reads and appends to the local results CSV."""

    def __init__(self, csv_path: str):
        """Remember the path of the results CSV; nothing is opened or created here."""
        self.csv_path = csv_path

    def read_processed_keys(self) -> Set[str]:
        """Return the set of ``s3_key`` values already present in the CSV.

        Returns an empty set when the file does not exist or cannot be parsed
        (a warning is logged in the latter case). Blank keys are ignored.
        """
        if not os.path.exists(self.csv_path):
            return set()
        try:
            df = pd.read_csv(self.csv_path, usecols=["s3_key"])
            # Drop blanks first so they do not turn into the literal key "nan".
            return set(df["s3_key"].dropna().astype(str).tolist())
        except Exception as e:
            logger.warning(f"Could not read existing CSV: {e}")
            return set()

    def append_results(self, results: List[Dict[str, Any]]):
        """Append result rows to the CSV, creating it when needed.

        The header row is written when the file is missing OR exists but is empty.
        Deciding on existence alone would append header-less rows to an empty file,
        after which ``read_processed_keys`` can no longer find the ``s3_key`` column
        and resuming silently stops working.

        Args:
            results: Row dicts; an empty list is a no-op.
        """
        if not results:
            return

        needs_header = (
            not os.path.exists(self.csv_path)
            or os.path.getsize(self.csv_path) == 0
        )

        # Mode "a" creates the file when it does not exist yet.
        pd.DataFrame(results).to_csv(self.csv_path, index=False, mode="a", header=needs_header)
        logger.info(f"Appended {len(results)} rows to {self.csv_path}")

In [None]:
class ValidationCSVManager:
    """Loads the allow-list of file names from the validation CSV."""

    def __init__(self, csv_path: str, column_name: str):
        """Remember where the validation CSV lives and which column holds the file names."""
        self.csv_path = csv_path
        self.column_name = column_name

    def read_allowed_filenames(self) -> Set[str]:
        """Return the distinct, non-empty file names listed in the validation CSV.

        Missing cells are dropped BEFORE the values are converted to strings;
        converting first would turn every blank cell into the bogus file name "nan".
        Surrounding whitespace is stripped.

        Raises:
            FileNotFoundError: If the CSV does not exist.
            Exception: Any error raised while reading the CSV is logged and re-raised.
        """
        if not os.path.exists(self.csv_path):
            raise FileNotFoundError(f"Validation CSV not found: {self.csv_path}")

        try:
            df = pd.read_csv(self.csv_path, usecols=[self.column_name])
            stripped = (str(value).strip() for value in df[self.column_name].dropna())
            filenames = [name for name in stripped if name]
            logger.info(f"Loaded {len(filenames)} allowed filenames from {self.csv_path}")
            return set(filenames)
        except Exception as e:
            logger.error(f"Failed to read validation CSV: {e}")
            raise

In [None]:
class ProductionPipeline:
    """Synchronous orchestration: list S3 keys, download, transcribe, write the CSV."""

    def __init__(self, config: "Config"):
        """Create the S3, transcription and CSV helpers.

        Constructing ``TranscriptionManager`` builds the ASR pipelines, so
        instantiating this class already loads the model(s).
        """
        self.config = config
        self.s3_manager = S3Manager(config)
        self.transcription_manager = TranscriptionManager(config)
        self.csv_manager = CSVManager(config.output_local_csv)
        self.validation_csv_manager = ValidationCSVManager(
            config.validation_csv_path,
            config.validation_csv_column
        )
        # Rows waiting to be written to the CSV.
        self.results_buffer: List[Dict[str, Any]] = []

    def run(self):
        """Execute the transcription pipeline end to end.

        Steps: make ffmpeg available, load the allow-list, list matching S3 keys
        (capped at ``config.max_files`` when set), drop keys already present in the
        output CSV when resuming, process the rest, flush, and optionally upload the
        CSV to S3.
        """
        FFmpegSetup.ensure_available()

        # Allow-list entries may carry directory parts; S3 keys are matched on
        # their basename only, so normalise the entries once up front.
        allowed_filenames = {
            os.path.basename((name or "").strip().lstrip("/"))
            for name in self.validation_csv_manager.read_allowed_filenames()
        }
        logger.info(f"Will only process files from validation CSV (normalized): {len(allowed_filenames)}")

        bucket, prefix = self.s3_manager.parse_uri(self.config.s3_input)
        prefix_norm = prefix.rstrip("/") + "/"

        all_keys = self._list_matching_keys(bucket, prefix_norm, allowed_filenames)
        logger.info(f"Found {len(all_keys)} matching files in S3")

        processed_keys = set()
        if self.config.resume_from_csv:
            processed_keys = self.csv_manager.read_processed_keys()

        keys_to_process = [k for k in all_keys if k not in processed_keys]

        # Report how many of the listed keys were skipped, not the size of the CSV.
        logger.info(
            f"Processing {len(keys_to_process)} files "
            f"(skipped {len(all_keys) - len(keys_to_process)} already done)"
        )

        if not keys_to_process:
            logger.info("No files to process")
            return

        # Process with concurrent downloads
        self._process_with_concurrency(bucket, keys_to_process)

        # Final flush
        self._flush_results()

        # Upload to S3 if configured
        if self.config.write_back_to_s3 and self.config.output_s3_uri:
            self.s3_manager.upload_file(
                self.config.output_local_csv,
                self.config.output_s3_uri
            )

        logger.info("Pipeline complete!")

    def _list_matching_keys(self, bucket: str, prefix: str, allowed_filenames: Set[str]) -> List[str]:
        """List audio keys under ``prefix`` whose basename is in ``allowed_filenames``.

        Listing stops as soon as ``config.max_files`` keys were collected; a falsy
        ``max_files`` means no cap.
        """
        limit = self.config.max_files
        keys: List[str] = []
        paginator = self.s3_manager.client.get_paginator("list_objects_v2")

        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if key.endswith("/") or not self.s3_manager.is_audio_file(key):
                    continue
                if os.path.basename(key) not in allowed_filenames:
                    continue
                keys.append(key)
                if limit and len(keys) >= limit:
                    return keys
        return keys

    def _process_with_concurrency(self, bucket: str, keys: List[str]):
        """Process keys in batches: download concurrently, then transcribe in parallel.

        Each batch holds ``2 * download_workers`` keys. A failed download is
        recorded as an error row. Temp files of a batch are removed even when
        transcription raises.
        """
        batch_size = max(1, self.config.download_workers * 2)

        with ThreadPoolExecutor(max_workers=self.config.download_workers) as pool:
            for i in range(0, len(keys), batch_size):
                batch = keys[i:i + batch_size]

                # Download batch
                downloaded = []
                futures = {
                    pool.submit(self.s3_manager.download_to_temp, bucket, k): k
                    for k in batch
                }

                progress = tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc=f"Downloading batch {i//batch_size + 1}"
                )

                try:
                    for future in progress:
                        key = futures[future]
                        try:
                            local_path = future.result()
                            downloaded.append((local_path, key))
                        except Exception as e:
                            logger.error(f"Download failed for {key}: {e}")
                            self.results_buffer.append({
                                "s3_key": key,
                                "filename": os.path.basename(key),
                                "transcribed_text": "",
                                "error": f"Download failed: {e}"
                            })

                    # Transcribe downloaded files in parallel
                    if downloaded:
                        logger.info(f"Transcribing {len(downloaded)} files in parallel...")
                        results = self.transcription_manager.transcribe_batch_parallel(downloaded)
                        self.results_buffer.extend(results)
                finally:
                    # Best-effort removal of this batch's temp files.
                    for local_path, _ in downloaded:
                        try:
                            os.remove(local_path)
                        except OSError:
                            pass

                # Periodic flush
                if len(self.results_buffer) >= self.config.append_every_n:
                    self._flush_results()

                # Memory cleanup
                self.transcription_manager.cleanup_gpu_memory()

    def _process_single_file(self, future, key: str):
        """Transcribe the file produced by a download future and buffer the result.

        Any failure is recorded as an error row for ``key``; the temp file is
        removed in all cases.
        """
        local_path = None
        try:
            # Get downloaded file
            local_path = future.result()

            # Transcribe
            result = self.transcription_manager.transcribe(local_path)

            # Store result
            self.results_buffer.append({
                "s3_key": key,
                "filename": os.path.basename(key),
                "transcribed_text": result["text"],
                "error": result["error"] or ""
            })

            if result["text"]:
                preview = result["text"][:100] + "..." if len(result["text"]) > 100 else result["text"]
                logger.info(f"{os.path.basename(key)}: {preview}")
            else:
                logger.warning(f"{os.path.basename(key)}: {result['error']}")

            # Periodic flush
            if len(self.results_buffer) >= self.config.append_every_n:
                self._flush_results()

            # Memory cleanup
            self.transcription_manager.cleanup_gpu_memory()

        except Exception as e:
            tb = traceback.format_exc(limit=2)
            self.results_buffer.append({
                "s3_key": key,
                "filename": os.path.basename(key),
                "transcribed_text": "",
                "error": f"{type(e).__name__}: {e} | {tb}"
            })
            logger.error(f"Failed to process {key}: {e}")

        finally:
            # Clean up temp file
            if local_path and os.path.exists(local_path):
                try:
                    os.remove(local_path)
                except OSError:
                    pass

    def _flush_results(self):
        """Append the buffered rows to the CSV and empty the buffer.

        The buffer is cleared only after the write returned, so rows are kept when
        the write raises.
        """
        if self.results_buffer:
            self.csv_manager.append_results(self.results_buffer)
            self.results_buffer.clear()

In [None]:
class AsyncProductionPipeline(ProductionPipeline):
    """Asyncio variant: each file is downloaded and transcribed as its own task.

    Blocking work (S3 download, model inference) runs in threads via
    ``asyncio.to_thread``. ``results_buffer`` is only touched on the event-loop
    thread, which is what keeps buffering and flushing free of races.
    """

    def _iter_matching_keys(self, bucket: str, prefix_norm: str, allowed_filenames: set[str]):
        """Yield audio keys under the prefix whose basename is in the allow-list."""
        paginator = self.s3_manager.client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                if key.endswith("/"):
                    continue
                if not self.s3_manager.is_audio_file(key):
                    continue
                if os.path.basename(key) in allowed_filenames:
                    yield key

    def _list_all_audio(self, bucket: str, prefix_norm: str, allowed_filenames: set[str]) -> list[str]:
        """List matching audio keys, stopping at ``config.max_files`` when it is set.

        The number of selected keys is logged whether or not the cap was reached.
        """
        limit = self.config.max_files
        keys = []
        for key in self._iter_matching_keys(bucket, prefix_norm, allowed_filenames):
            keys.append(key)
            if limit and len(keys) >= limit:
                break

        logger.info(f"Using {len(keys)} keys from allowed list (after intersection)")
        return keys

    def _filter_unprocessed(self, all_keys: list[str]) -> list[str]:
        """Drop keys already present in the output CSV (only when resuming is enabled)."""
        processed_keys = set()
        if self.config.resume_from_csv:
            processed_keys = self.csv_manager.read_processed_keys()

        keys_to_process = [k for k in all_keys if k not in processed_keys]
        # Report how many of the listed keys were skipped, not the size of the CSV.
        logger.info(
            f"Processing {len(keys_to_process)} files "
            f"(skipped {len(all_keys) - len(keys_to_process)} already done)"
        )
        return keys_to_process

    async def _download_file(self, bucket: str, key: str) -> str:
        """Download one object in a worker thread and return the temp-file path."""
        return await asyncio.to_thread(self.s3_manager.download_to_temp, bucket, key)

    async def _transcribe_file(self, local_path: str, key: str, pipeline_idx: int) -> dict:
        """Run transcription in a worker thread and return the CSV-ready row."""
        return await asyncio.to_thread(
            self.transcription_manager._transcribe_with_pipeline_and_key,
            local_path, key, pipeline_idx
        )

    async def _process_one(self, sem: asyncio.Semaphore, bucket: str, key: str, pipeline_idx: int):
        """One unit of work: download, transcribe, buffer the row, delete the temp file.

        Any exception is converted into an error row for ``key``.
        """
        async with sem:
            local_path = None
            try:
                local_path = await self._download_file(bucket, key)
                result = await self._transcribe_file(local_path, key, pipeline_idx)
                self.results_buffer.append(result)
            except Exception as e:
                self.results_buffer.append({
                    "s3_key": key,
                    "filename": os.path.basename(key),
                    "transcribed_text": "",
                    "error": f"Error: {e}"
                })
            finally:
                if local_path and os.path.exists(local_path):
                    try:
                        os.remove(local_path)
                    except OSError:
                        pass

            if len(self.results_buffer) >= self.config.append_every_n:
                # Flush on the event-loop thread, without awaiting. Handing the flush
                # to a worker thread lets other tasks append to the buffer between the
                # CSV write and the clear() (those rows are lost) and lets several
                # tasks start overlapping flushes of the same rows (duplicates).
                self._flush_results()
                self.transcription_manager.cleanup_gpu_memory()

    async def _process_with_asyncio(self, bucket: str, keys: list[str], concurrency: int = 16):
        """Run all per-file tasks with at most ``concurrency`` in flight at once.

        Files are assigned to pipelines round-robin. Progress is logged every 10
        files and at the end; remaining rows are flushed when all tasks finished.
        """
        sem = asyncio.Semaphore(concurrency)
        num_pipelines = len(self.transcription_manager.pipelines)
        tasks = [
            self._process_one(sem, bucket, key, i % num_pipelines)
            for i, key in enumerate(keys)
        ]
        total = len(tasks)
        start_time = datetime.now()
        completed = 0

        for fut in tqdm(asyncio.as_completed(tasks), total=total, desc="Processing files"):
            await fut
            completed += 1
            if completed % 10 == 0 or completed == total:
                elapsed = (datetime.now() - start_time).total_seconds()
                rate = completed / elapsed if elapsed > 0 else 0
                remaining = total - completed
                eta_min = (remaining / rate / 60) if rate > 0 else float("inf")
                logger.info(f"{completed}/{total} done ({rate:.2f} files/s) — ETA {eta_min:.1f} min")

        # All tasks are done; write whatever is still buffered.
        self._flush_results()

    def run(self):
        """Run the pipeline using the asyncio implementation.

        Works both from a plain script (no event loop running) and from a notebook
        cell, where a loop is already running and nested execution relies on
        ``nest_asyncio.apply()`` having been called.
        """
        FFmpegSetup.ensure_available()
        allowed_filenames = self.validation_csv_manager.read_allowed_filenames()
        allowed_filenames = {os.path.basename(f.strip()) for f in allowed_filenames}

        bucket, prefix = self.s3_manager.parse_uri(self.config.s3_input)
        prefix_norm = prefix.rstrip("/") + "/"

        all_keys = self._list_all_audio(bucket, prefix_norm, allowed_filenames)
        keys_to_process = self._filter_unprocessed(all_keys)
        if not keys_to_process:
            logger.info("No files to process.")
            return

        logger.info(f"Starting async transcription on {len(keys_to_process)} files...")

        # Only the loop detection is guarded. Wrapping the whole run in
        # `except RuntimeError` would restart the entire job (and duplicate rows)
        # whenever anything inside the pipeline raised a RuntimeError.
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            running_loop = None

        work = self._process_with_asyncio(bucket, keys_to_process)
        if running_loop is not None:
            # Re-entering a running loop requires nest_asyncio.
            running_loop.run_until_complete(work)
        else:
            asyncio.run(work)

        # Same write-back behaviour as the synchronous pipeline.
        if self.config.write_back_to_s3 and self.config.output_s3_uri:
            self.s3_manager.upload_file(
                self.config.output_local_csv,
                self.config.output_s3_uri
            )

        logger.info("Async pipeline complete!")

In [None]:
# ======================================== Run main loop ========================================
def main():
    """Build the default configuration, run the async pipeline and return an exit code.

    Returns:
        0 on success, 130 when interrupted with Ctrl-C, 1 on any other error
        (the error and its traceback are logged).
    """
    exit_code = 0
    try:
        # Swap in ProductionPipeline here for the synchronous variant.
        runner = AsyncProductionPipeline(Config())
        runner.run()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        exit_code = 130
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        logger.error(traceback.format_exc())
        exit_code = 1
    return exit_code


if __name__ == "__main__":
    # Run the pipeline and report the outcome; `result` keeps the exit code.
    result = main()
    print(
        "Pipeline completed successfully!"
        if result == 0
        else f"Pipeline failed with code: {result}"
    )