In [1]:
# Mount Google Drive at /content/drive (Colab-only); DATASET_PATH below points inside this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# System dependencies: the codec cells below invoke `ffmpeg` and `ffprobe` through subprocess.
!apt-get update
!apt-get install -y ffmpeg libavcodec-extra

In [3]:
# Install required packages
!pip install -q google-generativeai openai-whisper librosa soundfile pesq pystoi numpy scipy matplotlib seaborn pandas scikit-learn ffmpeg-python

[?25l     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/803.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m803.2/803.2 kB[0m [31m44.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for openai-whisper (pyproject.toml) ... [?25l[?25hdone
  Building wheel for pesq (setup.py) ... [?25l[?25hdone


In [4]:
import os
import json
import random
import subprocess
import numpy as np
import librosa
import soundfile as sf
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field, asdict
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from scipy import stats
import google.generativeai as genai
import whisper
from pesq import pesq
from pystoi import stoi
import warnings
# NOTE(review): this silences every warning category for the whole notebook - consider narrowing the filter.
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility (Python's `random` module and NumPy's global RNG)
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)



In [5]:
# You'll need to update these paths.
# NOTE: both names are reassigned (as pathlib.Path objects) in the "Configuration" cell below,
# so the values set there are the ones in effect for every later cell.
DATASET_PATH = "/content/drive/MyDrive/adversarial-audio/Normal-Examples"
OUTPUT_DIR = "/content/outputs"

In [None]:
# Read the Gemini API key from the GEMINI_API_KEY environment variable; if it is unset or empty,
# prompt for it without echoing. The key is never stored in the notebook source, so it cannot
# leak when the file is shared or committed.
import getpass

GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") or getpass.getpass("Enter Gemini API key: ")

In [None]:
# Configuration
# GEMINI_API_KEY is assigned in the preceding cell and is not redefined here.
DATASET_PATH = Path("/content/drive/MyDrive/adversarial-audio/Normal-Examples")  # Path object; supersedes the string assigned in the earlier paths cell
OUTPUT_DIR = Path("./agent_orchestration_outputs")
ARTIFACTS_DIR = OUTPUT_DIR / "artifacts"
RESULTS_DIR = OUTPUT_DIR / "results"

# Create output directories (OUTPUT_DIR first, because the other two live inside it
# and mkdir is called without parents=True)
OUTPUT_DIR.mkdir(exist_ok=True)
ARTIFACTS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

# Audio processing parameters
TARGET_SR = 16000  # Hz; Whisper input rate and the rate passed to ffmpeg via -ar
NORMALIZE_PEAK = 0.99
TARGET_LUFS = -23.0  # Broadcast standard

# Codec configuration (opus and amr-wb only)
# Note: libopencore_amrwb is decoder only, use libvo_amrwbenc for encoding
# "codec" is the ffmpeg encoder name; "bitrates" are in kbit/s (passed to ffmpeg as "<value>k").
CODECS = {
    "opus": {"codec": "libopus", "bitrates": [32, 64, 96, 128]},
    "amr-wb": {"codec": "libvo_amrwbenc", "bitrates": [6.6, 8.85, 12.65, 14.25, 15.85, 18.25, 19.85, 23.05, 23.85]},
}

# NOTE(review): not referenced in the visible cells; presumably the number of samples for
# expectation-over-transformation averaging - confirm against the cells that use it.
EOT_NUM_SAMPLES = 10

# ASR model
ASR_MODEL_NAME = "base"  # Options: tiny, base, small, medium, large


# Perturbation constraints (STRICT - for high-quality imperceptible attacks)
MAX_LINF = 0.008  # Maximum L∞ norm (reduced for imperceptibility)
MAX_L2 = 0.08     # Maximum L2 norm (reduced for imperceptibility)
MIN_PESQ = 3.5    # Minimum PESQ score (3.5+ = "good" quality)
MIN_STOI = 0.85   # Minimum STOI score (0.85+ = highly intelligible)
MIN_SNR = 20.0    # Minimum SNR in dB (high to ensure perturbation is subtle)
TARGET_SNR = 30.0 # Target SNR in dB (very high for near-imperceptible noise)

# LLM parameters
MAX_ITERATIONS = 5  # Maximum feedback loop iterations
STRATEGY_TOP_K = 3  # Top-k strategies to return to LLM

print("Configuration loaded successfully!")

Configuration loaded successfully!


In [8]:
class AudioNormalizer:
    """Bring audio files to a common sample rate and level.

    Loading resamples to ``target_sr`` and downmixes to mono. The waveform is
    then peak-scaled and afterwards rescaled to an RMS level derived from
    ``target_lufs`` (an RMS approximation, not a true LUFS measurement).
    """

    def __init__(self, target_sr: int = TARGET_SR, target_lufs: float = TARGET_LUFS, peak: float = NORMALIZE_PEAK):
        self.target_sr, self.target_lufs, self.peak = target_sr, target_lufs, peak

    def normalize(self, audio_path: Path) -> Tuple[np.ndarray, int]:
        """Load ``audio_path`` and return ``(float32 waveform, sample rate)``.

        Args:
            audio_path: File to load; passed to ``librosa.load`` as a string.

        Returns:
            The normalized mono waveform clipped to [-1, 1] as float32, and
            ``self.target_sr``.
        """
        samples, _ = librosa.load(str(audio_path), sr=self.target_sr, mono=True)

        # Stage 1 - peak scaling: the largest absolute sample becomes self.peak.
        loudest = np.max(np.abs(samples))
        if loudest > 0:
            samples = samples * (self.peak / loudest)

        # Stage 2 - level scaling using RMS as a stand-in for LUFS.
        # For production, use pyloudnorm or similar.
        current_rms = np.sqrt(np.mean(samples**2))
        desired_rms = 10 ** (self.target_lufs / 20) * 0.1  # approximate dB -> linear conversion
        if current_rms > 0:
            samples = samples * (desired_rms / current_rms)

        # Guard against samples outside the valid range before the float32 cast.
        return np.clip(samples, -1.0, 1.0).astype(np.float32), self.target_sr

    def save_normalized(self, audio: np.ndarray, output_path: Path, sr: int = TARGET_SR):
        """Write ``audio`` to ``output_path`` at sample rate ``sr`` via soundfile."""
        sf.write(str(output_path), audio, sr)


class ASRBaseline:
    """Whisper-based ASR baseline for transcription and error-rate scoring."""

    def __init__(self, model_name: str = ASR_MODEL_NAME):
        """Load the Whisper checkpoint called ``model_name`` and keep it on the instance."""
        print(f"Loading Whisper model: {model_name}")
        self.model = whisper.load_model(model_name)
        self.model_name = model_name

    def transcribe(self, audio: np.ndarray, sr: int = TARGET_SR) -> str:
        """Transcribe a waveform to text.

        Args:
            audio: Waveform samples; cast to float32 if needed.
            sr: Sample rate of ``audio``. Anything other than 16000 is resampled to 16 kHz.

        Returns:
            The stripped ``"text"`` field of the Whisper result.
        """
        # Whisper expects float32 audio in range [-1, 1]
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)

        # Resample if needed (Whisper expects 16kHz)
        if sr != 16000:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)

        result = self.model.transcribe(audio, language="en", fp16=False)
        return result["text"].strip()

    @staticmethod
    def _edit_distance(reference: List[str], hypothesis: List[str]) -> int:
        """Return the unit-cost Levenshtein distance between two token sequences.

        Only two rows of the DP table are kept, so memory is proportional to
        ``len(hypothesis)`` and the arithmetic stays in plain Python ints.
        """
        previous = list(range(len(hypothesis) + 1))
        for i, ref_token in enumerate(reference, start=1):
            current = [i]
            for j, hyp_token in enumerate(hypothesis, start=1):
                if ref_token == hyp_token:
                    current.append(previous[j - 1])
                else:
                    current.append(1 + min(
                        previous[j],       # deletion
                        current[j - 1],    # insertion
                        previous[j - 1],   # substitution
                    ))
            previous = current
        return previous[-1]

    def _error_rate(self, reference: List[str], hypothesis: List[str]) -> float:
        """Edit distance normalized by reference length.

        An empty reference yields 1.0 when the hypothesis is non-empty and 0.0 otherwise.
        """
        if not reference:
            return 1.0 if hypothesis else 0.0
        return self._edit_distance(reference, hypothesis) / len(reference)

    def compute_wer(self, reference: str, hypothesis: str) -> float:
        """Compute Word Error Rate (WER) on lower-cased, whitespace-split words."""
        return self._error_rate(reference.lower().split(), hypothesis.lower().split())

    def compute_cer(self, reference: str, hypothesis: str) -> float:
        """Compute Character Error Rate (CER) on lower-cased text with spaces removed."""
        return self._error_rate(
            list(reference.lower().replace(" ", "")),
            list(hypothesis.lower().replace(" ", "")),
        )


# Initialize components
# (constructing ASRBaseline calls whisper.load_model, so this cell loads the model weights)
normalizer = AudioNormalizer()
asr_baseline = ASRBaseline()

print("Audio normalizer and ASR baseline initialized!")


Loading Whisper model: base


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 139M/139M [00:00<00:00, 199MiB/s]


Audio normalizer and ASR baseline initialized!


In [9]:
class CodecDetector:
    """Detect codec information from audio files."""

    # Extension-based guesses used when ffprobe cannot be run or its output is unusable.
    _EXTENSION_FALLBACKS = {
        ".wav": {"codec_name": "pcm", "bitrate_kbps": 1411},
        ".flac": {"codec_name": "flac", "bitrate_kbps": 1000},
        ".opus": {"codec_name": "opus", "bitrate_kbps": 96},
        ".amr": {"codec_name": "amr-wb", "bitrate_kbps": 24},
    }
    _UNKNOWN_FALLBACK = {"codec_name": "unknown", "bitrate_kbps": 128}

    @staticmethod
    def _to_int(value: Any, default: int) -> int:
        """Convert an ffprobe field to int, returning ``default`` for missing or non-numeric values."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def detect(self, audio_path: Path) -> Dict[str, Any]:
        """Detect codec using ffprobe, falling back to a guess from the file extension.

        Args:
            audio_path: File to inspect.

        Returns:
            A dict with ``codec_name``, ``bitrate_kbps``, ``sample_rate``,
            ``channels``, ``container`` and ``detected``. ``detected`` is True
            when the values come from ffprobe. When it is False the values are
            extension-based guesses and an ``error`` key holds the reason.
        """
        try:
            cmd = [
                "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format",
                "-show_streams", str(audio_path)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            data = json.loads(result.stdout)

            # Use the first *audio* stream: index 0 is not guaranteed to be audio
            # (for example when the container also carries cover art).
            streams = data.get("streams") or []
            stream = next((s for s in streams if s.get("codec_type") == "audio"), None)
            if stream is None:
                raise ValueError("ffprobe reported no audio stream")
            format_info = data.get("format") or {}

            # Prefer the container-level bit rate and fall back to the stream-level one.
            # A non-numeric value no longer discards the whole probe result.
            bits_per_second = (
                self._to_int(format_info.get("bit_rate"), 0)
                or self._to_int(stream.get("bit_rate"), 0)
            )

            return {
                "codec_name": stream.get("codec_name", "unknown"),
                "bitrate_kbps": bits_per_second // 1000,  # Convert to kbps
                "sample_rate": self._to_int(stream.get("sample_rate"), TARGET_SR),
                "channels": self._to_int(stream.get("channels"), 1),
                "container": format_info.get("format_name", "").split(",")[0],
                "detected": True
            }
        except Exception as e:
            # Deliberately broad: any probe failure (ffprobe missing, non-zero exit,
            # malformed JSON, no audio stream) degrades to the extension heuristic.
            ext = audio_path.suffix.lower()
            default = self._EXTENSION_FALLBACKS.get(ext, self._UNKNOWN_FALLBACK)
            return {
                **default,
                "sample_rate": TARGET_SR,
                "channels": 1,
                "container": ext.lstrip("."),
                "detected": False,
                "error": str(e)
            }


class CodecStack:
    """Codec stack for transcoding audio with the ffmpeg command-line tool."""

    # Map codec names to proper file extensions (opus and amr-wb only)
    CODEC_EXTENSIONS = {
        "opus": ".opus",
        "amr-wb": ".amr",
    }

    def __init__(self, codecs: Dict[str, Dict] = CODECS):
        """Store the codec table (codec name -> {"codec": ffmpeg encoder, "bitrates": [...]})."""
        self.codecs = codecs

    def _get_output_path(self, output_path: Path, codec_name: str) -> Path:
        """Return ``output_path`` with the extension that belongs to ``codec_name``.

        Codecs without an entry in ``CODEC_EXTENSIONS`` get ``.tmp``.

        NOTE: ``Path.with_suffix`` replaces whatever follows the last dot in the
        file name, so a name such as ``encoded_amr-wb_6.6`` becomes
        ``encoded_amr-wb_6.amr``. Callers should avoid dots in the stem.
        """
        ext = self.CODEC_EXTENSIONS.get(codec_name, ".tmp")
        if output_path.suffix != ext:
            # Replace extension
            return output_path.with_suffix(ext)
        return output_path

    @staticmethod
    def _run_ffmpeg(cmd: List[str], action: str) -> bool:
        """Run an ffmpeg command; print ``"<action> failed: ..."`` and return False on failure."""
        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            return True
        except subprocess.CalledProcessError as e:
            print(f"{action} failed: {e.stderr}")
            return False
        except FileNotFoundError as e:
            # The ffmpeg executable is not installed or not on PATH. Report it the
            # same way as an encoder error instead of crashing the caller.
            print(f"{action} failed: {e}")
            return False

    def encode(self, audio_path: Path, codec_name: str, bitrate: Any, output_path: Path) -> bool:
        """Encode audio using specified codec and bitrate.

        Args:
            audio_path: Input audio file.
            codec_name: Key into the codec table, e.g. ``"opus"`` or ``"amr-wb"``.
            bitrate: Bitrate in kbit/s; passed to ffmpeg as ``-b:a <bitrate>k``.
            output_path: Desired output location. The file is actually written to
                ``self._get_output_path(output_path, codec_name)``.

        Returns:
            True when ffmpeg exits successfully, False otherwise.

        Raises:
            ValueError: If ``codec_name`` is not in the codec table.
        """
        if codec_name not in self.codecs:
            raise ValueError(f"Unsupported codec: {codec_name}")

        codec = self.codecs[codec_name]["codec"]

        # Ensure proper file extension
        output_path = self._get_output_path(output_path, codec_name)

        cmd = [
            "ffmpeg", "-y", "-i", str(audio_path),
            "-acodec", codec,
            "-ar", str(TARGET_SR),
            "-ac", "1",  # Mono
            "-b:a", f"{bitrate}k",
            str(output_path),
        ]
        return self._run_ffmpeg(cmd, "Encoding")

    def decode(self, encoded_path: Path, output_path: Path) -> bool:
        """Decode encoded audio back to 16-bit PCM mono WAV at ``TARGET_SR``.

        Returns:
            True when ffmpeg exits successfully, False otherwise.
        """
        cmd = [
            "ffmpeg", "-y", "-i", str(encoded_path),
            "-acodec", "pcm_s16le",
            "-ar", str(TARGET_SR),
            "-ac", "1",
            str(output_path)
        ]
        return self._run_ffmpeg(cmd, "Decoding")

# Initialize codec components (CodecStack uses the CODECS table from the configuration cell by default)
codec_detector = CodecDetector()
codec_stack = CodecStack()

print("Codec detector and stack initialized!")


Codec detector and stack initialized!


In [10]:
# Initialize Gemini: hand the API key to the google.generativeai client
genai.configure(api_key=GEMINI_API_KEY)

@dataclass
class PerturbationStrategy:
    """Structured perturbation strategy from LLM.

    Instances are built by ``LLMOrchestrator._parse_strategy`` from the model's
    JSON reply, or by ``LLMOrchestrator._default_strategy`` when generation or
    parsing fails.
    """
    # Identifier of the strategy.
    name: str
    # Perturbation family; the executor's fallback matches on substrings of this value.
    family: str  # e.g., "narrowband_spectral_noise", "phase_only", "micro_time_warp", "spread_spectrum"
    optimizer: str  # e.g., "CMA-ES", "gradient", "black_box"
    # Limits read by the executor, e.g. "max_linf", "max_l2", "min_pesq", "min_stoi".
    constraints: Dict[str, float]
    codec_config: Dict[str, Any]  # target codecs and bitrate range (no chaining)
    # Python source executed by PerturbationExecutor.apply_perturbation.
    code_snippet: str
    # Family-specific knobs such as "noise_level" or "frequency_bands".
    parameters: Dict[str, Any] = field(default_factory=dict)
    # Free-text explanation of the strategy.
    description: str = ""


class LLMOrchestrator:
    """Gemini-based LLM orchestrator for strategy generation.

    Builds a prompt from codec information and earlier feedback, asks the model
    for a JSON strategy, and converts the reply into a ``PerturbationStrategy``.
    A built-in default strategy is returned whenever generation or parsing fails.
    """

    def __init__(self, model_name: Optional[str] = None):
        """Create the Gemini model handle.

        Args:
            model_name: Optional model to try before the built-in candidates.

        Raises:
            RuntimeError: If no candidate model could be constructed.
        """
        # Candidate models in order of preference.
        gemini_models = [
            "gemini-2.5-flash-lite",
        ]

        if model_name:
            gemini_models.insert(0, model_name)

        self.model = None
        self.model_name = None

        # NOTE(review): constructing a GenerativeModel presumably does not contact the
        # API, so an unavailable model may only surface on the first generate_content
        # call - confirm against the google-generativeai documentation.
        for model in gemini_models:
            try:
                self.model = genai.GenerativeModel(model)
                self.model_name = model
                print(f"Successfully loaded Gemini model: {model}")
                break
            except Exception as e:
                print(f"Could not load {model}: {e}")
                continue

        if self.model is None:
            raise RuntimeError("Failed to load any Gemini model. Please check your API key and model availability.")

    def generate_strategy(
        self,
        codec_info: Dict[str, Any],
        available_codecs: Dict[str, Dict],
        previous_feedback: Optional[str] = None,
        iteration: int = 1
    ) -> PerturbationStrategy:
        """Generate perturbation strategy based on codec information.

        Args:
            codec_info: Codec description of the input audio (see ``CodecDetector.detect``);
                may also carry a ``target_codec`` entry.
            available_codecs: Codec table whose entries expose a ``"bitrates"`` list.
            previous_feedback: Summary of earlier results, inserted into the prompt when given.
            iteration: Feedback-loop iteration number; forwarded to the prompt builder.

        Returns:
            The parsed strategy, or the default strategy if the request or parsing fails.
        """
        prompt = self._build_strategy_prompt(
            codec_info, available_codecs, previous_feedback, iteration
        )

        try:
            response = self.model.generate_content(prompt)
            return self._parse_strategy(response.text, codec_info)
        except Exception as e:
            # Any request problem degrades to the default strategy.
            print(f"LLM generation failed: {e}")
            return self._default_strategy(codec_info)

    def _build_strategy_prompt(
        self,
        codec_info: Dict[str, Any],
        available_codecs: Dict[str, Dict],
        previous_feedback: Optional[str],
        iteration: int
    ) -> str:
        """Build prompt for strategy generation.

        Thresholds quoted in the prose are interpolated from the configuration
        constants so the text cannot drift from the values that are enforced.
        ``iteration`` is accepted for interface compatibility and is not used in the text.
        """
        codec_list = ", ".join([f"{k} (bitrates: {v['bitrates']})"
                                for k, v in available_codecs.items()])

        feedback_section = ""
        if previous_feedback:
            feedback_section = f"""
Previous iteration feedback:
{previous_feedback}

Based on this feedback, revise your strategy.
"""

        rule = "━" * 41  # visual separator around the constraint list

        prompt = f"""You are an expert in adversarial audio generation for ASR systems. Your task is to design perturbation strategies that survive lossy codec compression.

Current codec context:
- Detected codec: {codec_info.get('codec_name', 'unknown')}
- Bitrate: {codec_info.get('bitrate_kbps', 'unknown')} kbps
- Sample rate: {codec_info.get('sample_rate', TARGET_SR)} Hz
- Target codec for attack: {codec_info.get('target_codec', 'unknown')}

Available codecs (applied individually, no chaining):
{codec_list}

⚠️ STRICT QUALITY CONSTRAINTS (MUST BE MET):
{rule}
- Maximum L∞ norm: {MAX_LINF} (perturbation must be tiny!)
- Maximum L2 norm: {MAX_L2} (overall perturbation energy limit)
- Minimum PESQ: {MIN_PESQ} (PESQ {MIN_PESQ}+ = "Good" quality - HARD REQUIREMENT)
- Minimum STOI: {MIN_STOI} (STOI {MIN_STOI}+ = highly intelligible - HARD REQUIREMENT)
- Minimum SNR: {MIN_SNR} dB (perturbation must be {MIN_SNR:g}dB below signal)
- Target SNR: {TARGET_SNR} dB (aim for {TARGET_SNR:g}dB - near imperceptible)
{rule}

CRITICAL: The perturbation MUST be nearly imperceptible to human listeners.
- PESQ below {MIN_PESQ} = FAILURE (audio sounds degraded)
- STOI below {MIN_STOI} = FAILURE (speech becomes unclear)
- SNR below {MIN_SNR:g}dB = FAILURE (noise is audible)

{feedback_section}

Think step-by-step:
1. What frequency bands are least perceptible to humans but affect ASR?
2. How can I exploit codec compression artifacts without adding audible noise?
3. What perturbation pattern will survive codec quantization?
4. How do I maximize ASR confusion while keeping PESQ > {MIN_PESQ} and STOI > {MIN_STOI}?

Execution contract for "code_snippet":
- The code runs with these names already defined: audio (1-D numpy array of samples), sr (sample rate in Hz), np (numpy), signal (scipy.signal), librosa.
- It MUST assign the perturbed waveform, with the same length as audio, to a variable named perturbed_audio.

Generate a perturbation strategy in the following JSON format:
{{
    "name": "strategy_name",
    "family": "narrowband_spectral_noise|phase_only|micro_time_warp|spread_spectrum|psychoacoustic|hybrid",
    "optimizer": "CMA-ES|gradient|black_box",
    "constraints": {{
        "max_linf": {MAX_LINF},
        "max_l2": {MAX_L2},
        "min_pesq": {MIN_PESQ},
        "min_stoi": {MIN_STOI},
        "min_snr": {MIN_SNR},
        "target_snr": {TARGET_SNR}
    }},
    "codec_config": {{
        "target_codecs": ["opus", "amr-wb"],
        "bitrate_range": [32, 128]
    }},
    "parameters": {{
        "frequency_bands": [3000, 4000],
        "noise_level": 0.002,
        "time_warp_factor": 0.005,
        "phase_shift": 0.005
    }},
    "code_snippet": "Python code implementing the perturbation; it must set perturbed_audio",
    "description": "Detailed description of the strategy"
}}

Focus on strategies that:
1. Exploit codec-specific vulnerabilities (e.g., Opus's variable bitrate encoding, AMR-WB's speech-optimized compression)
2. Use frequency-domain perturbations that survive quantization
3. Apply phase-only modifications that are less perceptible
4. Ensure robustness when audio is compressed with Opus or AMR-WB codecs

Return ONLY the JSON, no additional text."""

        return prompt

    @staticmethod
    def _as_text(value: Any, default: str) -> str:
        """Return ``value`` when it is a non-empty string, otherwise ``default``."""
        return value if isinstance(value, str) and value else default

    @staticmethod
    def _as_dict(value: Any, default: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``value`` when it is a dict, otherwise ``default``."""
        return value if isinstance(value, dict) else default

    @staticmethod
    def _sanitize_constraints(raw: Any) -> Dict[str, Any]:
        """Merge model-supplied constraints with the configured limits.

        The model may tighten a limit but never loosen it: ``max_*`` values are
        capped at the configured maximum and the remaining thresholds are raised
        to at least the configured minimum. Missing, non-numeric or non-finite
        entries fall back to the configured value. Unknown keys are kept as supplied.
        """
        limits: Dict[str, Any] = {
            "max_linf": MAX_LINF,
            "max_l2": MAX_L2,
            "min_pesq": MIN_PESQ,
            "min_stoi": MIN_STOI,
            "min_snr": MIN_SNR,
            "target_snr": TARGET_SNR,
        }
        if not isinstance(raw, dict):
            return limits

        for key, bound in list(limits.items()):
            value = raw.get(key)
            # bool is an int subclass, so it has to be rejected explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                continue
            if not np.isfinite(value):
                continue
            if key.startswith("max_"):
                if value > 0:
                    limits[key] = min(float(value), bound)
            else:
                limits[key] = max(float(value), bound)

        merged = {k: v for k, v in raw.items() if k not in limits}
        merged.update(limits)
        return merged

    def _parse_strategy(self, response_text: str, codec_info: Dict[str, Any]) -> PerturbationStrategy:
        """Parse strategy from LLM response.

        The outermost ``{...}`` span of the reply is decoded as JSON. Every field
        is type-checked, so a ``null`` or wrongly typed value falls back to a
        default instead of breaking later code that calls ``.get``/``.lower`` on it.
        Returns the default strategy when no valid JSON object is found.
        """
        import re  # local import: `re` is not among the notebook's top-level imports

        default_codec_config = {"target_codecs": ["opus", "amr-wb"], "bitrate_range": [32, 128]}
        try:
            # Greedy match from the first "{" to the last "}" so nested objects are included.
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if not json_match:
                raise ValueError("No JSON found in response")
            strategy_dict = json.loads(json_match.group())

            return PerturbationStrategy(
                name=self._as_text(strategy_dict.get("name"), "default_strategy"),
                family=self._as_text(strategy_dict.get("family"), "narrowband_spectral_noise"),
                optimizer=self._as_text(strategy_dict.get("optimizer"), "CMA-ES"),
                constraints=self._sanitize_constraints(strategy_dict.get("constraints")),
                codec_config=self._as_dict(strategy_dict.get("codec_config"), default_codec_config),
                code_snippet=self._as_text(strategy_dict.get("code_snippet"), ""),
                parameters=self._as_dict(strategy_dict.get("parameters"), {}),
                description=self._as_text(strategy_dict.get("description"), "")
            )
        except Exception as e:
            print(f"Failed to parse strategy: {e}")
            return self._default_strategy(codec_info)

    def _default_strategy(self, codec_info: Dict[str, Any]) -> PerturbationStrategy:
        """Return default strategy if LLM fails.

        ``codec_info`` is accepted for interface symmetry and is not used.
        The snippet assigns ``perturbed_audio``, the variable the executor looks for.
        """
        return PerturbationStrategy(
            name="default_narrowband_noise",
            family="narrowband_spectral_noise",
            optimizer="CMA-ES",
            constraints={
                "max_linf": MAX_LINF,
                "max_l2": MAX_L2,
                "min_pesq": MIN_PESQ,
                "min_stoi": MIN_STOI,
                "min_snr": MIN_SNR,
                "target_snr": TARGET_SNR
            },
            codec_config={
                "target_codecs": ["opus", "amr-wb"],
                "bitrate_range": [32, 128]
            },
            code_snippet="""
def apply_perturbation(audio, sr=16000):
    # Narrowband noise injection
    noise = np.random.randn(len(audio)) * 0.005
    # Filter to 3-4 kHz band
    from scipy import signal
    b, a = signal.butter(4, [3000/(sr/2), 4000/(sr/2)], btype='band')
    noise = signal.filtfilt(b, a, noise)
    return audio + noise

perturbed_audio = apply_perturbation(audio, sr)
""",
            parameters={"frequency_bands": [3000, 4000], "noise_level": 0.005},
            description="Default narrowband spectral noise injection"
        )

    def generate_feedback_summary(
        self,
        results: List[Dict[str, Any]],
        top_k: int = STRATEGY_TOP_K
    ) -> str:
        """Generate feedback summary for LLM based on results.

        Args:
            results: Result dicts; the keys read are ``strategy_name``, ``wer_delta``,
                ``cer_delta``, ``pesq``, ``stoi``, ``snr`` and ``codec``.
            top_k: Number of highest-``wer_delta`` results to list.

        Returns:
            A plain-text summary, or a fixed message when ``results`` is empty.
        """
        if not results:
            return "No results available yet."

        # Best attacks first: largest WER increase.
        ranked = sorted(results, key=lambda x: x.get("wer_delta", 0), reverse=True)
        top_results = ranked[:top_k]

        parts = [f"Top {len(top_results)} strategies:\n\n"]
        for i, result in enumerate(top_results, 1):
            parts.append(f"{i}. {result.get('strategy_name', 'unknown')}:\n")
            parts.append(f"   - WER delta: {result.get('wer_delta', 0):.3f}\n")
            parts.append(f"   - CER delta: {result.get('cer_delta', 0):.3f}\n")
            parts.append(f"   - PESQ: {result.get('pesq', 0):.2f}\n")
            parts.append(f"   - STOI: {result.get('stoi', 0):.3f}\n")
            parts.append(f"   - SNR: {result.get('snr', 0):.2f} dB\n")
            parts.append(f"   - Codec: {result.get('codec', 'unknown')}\n\n")

        # Results whose WER increase stayed below 0.1 count as failures.
        # NOTE(review): the three bullet lines are generic hypotheses, not derived from
        # the individual results - consider reporting measured violations instead.
        failures = [r for r in results if r.get("wer_delta", 0) < 0.1]
        if failures:
            parts.append(f"\nFailure modes ({len(failures)} strategies):\n")
            parts.append("- Low WER increase despite perturbation\n")
            parts.append("- Constraint violations (PESQ/STOI too low)\n")
            parts.append("- Codec-specific robustness issues\n")

        return "".join(parts)


# Initialize LLM orchestrator (raises RuntimeError if no Gemini model could be constructed)
llm_orchestrator = LLMOrchestrator()

print("LLM orchestrator initialized!")


Successfully loaded Gemini model: gemini-2.5-flash-lite
LLM orchestrator initialized!


In [11]:
class PerturbationExecutor:
    """Execute perturbation strategies on audio and round-trip audio through codecs."""

    # Band used when a strategy supplies no usable "frequency_bands" parameter.
    _DEFAULT_BAND_HZ = (3000.0, 4000.0)

    def __init__(self, codec_stack: CodecStack, normalizer: AudioNormalizer):
        """Keep references to the codec stack (used for encode/decode) and the normalizer."""
        self.codec_stack = codec_stack
        self.normalizer = normalizer

    def apply_perturbation(
        self,
        audio: np.ndarray,
        strategy: PerturbationStrategy,
        sr: int = TARGET_SR
    ) -> np.ndarray:
        """Apply perturbation based on strategy.

        The strategy's ``code_snippet`` is executed with ``np``, ``librosa``,
        ``signal`` (scipy.signal), ``audio`` (a copy), ``sr`` and ``strategy`` in
        scope. The result is taken from, in order: a ``perturbed_audio`` variable,
        a ``result`` variable, or the return value of an ``apply_perturbation(audio, sr)``
        function defined by the snippet. If none is present, or the snippet raises,
        the family-based default perturbation is used. The norm and quality
        constraints are enforced on whatever was produced.

        Returns:
            The constrained perturbed waveform as float32.
        """
        # scipy.signal is exposed to the snippet under the name "signal".
        from scipy import signal as scipy_signal

        try:
            exec_globals = {
                "np": np,
                "librosa": librosa,
                "audio": audio.copy(),
                "sr": sr,
                "strategy": strategy,
                "signal": scipy_signal,
            }

            # SECURITY: this runs model-generated Python with the full privileges of the
            # kernel. Only execute it in a disposable, sandboxed environment.
            exec(strategy.code_snippet, exec_globals)

            if "perturbed_audio" in exec_globals:
                perturbed = exec_globals["perturbed_audio"]
            elif "result" in exec_globals:
                perturbed = exec_globals["result"]
            elif callable(exec_globals.get("apply_perturbation")):
                # The snippet only defined the function (as the built-in default
                # snippet does). Call it rather than discarding the strategy.
                perturbed = exec_globals["apply_perturbation"](audio.copy(), sr)
            else:
                # Fallback: apply default perturbation
                perturbed = self._apply_default_perturbation(audio, strategy, sr)

            perturbed = self._enforce_constraints_with_quality(audio, perturbed, strategy, sr)
            return perturbed.astype(np.float32)
        except Exception as e:
            print(f"Perturbation execution failed: {e}")
            # Apply default perturbation with constraints
            perturbed = self._apply_default_perturbation(audio, strategy, sr)
            perturbed = self._enforce_constraints_with_quality(audio, perturbed, strategy, sr)
            return perturbed.astype(np.float32)

    @classmethod
    def _resolve_band(cls, bands: Any, sr: int) -> Tuple[float, float]:
        """Turn a ``frequency_bands`` parameter into a valid ``(low, high)`` pair in Hz.

        Malformed values fall back to the default band, and the edges are kept
        strictly between 0 and the Nyquist frequency so that ``scipy.signal.butter``
        accepts them. A well-formed band below Nyquist is returned unchanged.
        """
        try:
            low, high = float(bands[0]), float(bands[1])
        except (TypeError, ValueError, IndexError, KeyError):
            low, high = cls._DEFAULT_BAND_HZ
        if not (0 < low < high):  # also rejects NaN
            low, high = cls._DEFAULT_BAND_HZ

        nyquist = sr / 2
        high = min(high, 0.99 * nyquist)
        if low >= high:
            low = 0.5 * high
        return low, high

    def _apply_default_perturbation(
        self,
        audio: np.ndarray,
        strategy: PerturbationStrategy,
        sr: int
    ) -> np.ndarray:
        """Apply default perturbation based on family.

        The family is matched by substring, in this order: narrowband/spectral,
        phase, time_warp, spread_spectrum; anything else gets additive Gaussian
        noise. Random draws come from NumPy's global RNG. The result is NOT yet
        constrained; callers pass it through the constraint enforcement.
        """
        family = (strategy.family or "").lower()
        params = strategy.parameters if isinstance(strategy.parameters, dict) else {}

        if "narrowband" in family or "spectral" in family:
            # Narrowband spectral noise
            noise_level = params.get("noise_level", 0.005)
            low_hz, high_hz = self._resolve_band(params.get("frequency_bands", [3000, 4000]), sr)

            from scipy import signal
            noise = np.random.randn(len(audio)) * noise_level
            b, a = signal.butter(4, [low_hz/(sr/2), high_hz/(sr/2)], btype='band')
            noise = signal.filtfilt(b, a, noise)
            return audio + noise

        elif "phase" in family:
            # Phase-only modification: keep FFT magnitudes, jitter the phases.
            fft = np.fft.fft(audio)
            magnitude = np.abs(fft)
            phase = np.angle(fft)
            phase_shift = params.get("phase_shift", 0.01) * np.random.randn(len(phase))
            new_fft = magnitude * np.exp(1j * (phase + phase_shift))
            return np.real(np.fft.ifft(new_fft))

        elif "time_warp" in family:
            # Micro time warping: resample at sinusoidally displaced sample positions.
            from scipy.interpolate import interp1d
            warp_factor = params.get("time_warp_factor", 0.01)
            n = len(audio)
            indices = np.arange(n) + warp_factor * np.sin(2 * np.pi * np.arange(n) / (n/10))
            indices = np.clip(indices, 0, n-1)
            f = interp1d(np.arange(n), audio, kind='linear', fill_value='extrapolate')
            return f(indices)

        elif "spread_spectrum" in family:
            # Spread spectrum pattern: Gaussian noise amplitude-modulated by a chirp.
            noise_level = params.get("noise_level", 0.003)
            noise = np.random.randn(len(audio)) * noise_level
            t = np.arange(len(audio)) / sr
            chirp = np.sin(2 * np.pi * (1000 + 2000 * t) * t)
            noise = noise * (1 + 0.1 * chirp)
            return audio + noise

        else:
            # Default: additive Gaussian noise
            noise_level = params.get("noise_level", 0.005)
            noise = np.random.randn(len(audio)) * noise_level
            return audio + noise

    @staticmethod
    def _strategy_constraints(strategy: PerturbationStrategy) -> Dict[str, Any]:
        """Return the strategy's constraint dict, or an empty dict if it is not a dict."""
        constraints = strategy.constraints
        return constraints if isinstance(constraints, dict) else {}

    @staticmethod
    def _clamp_perturbation(perturbation: np.ndarray, constraints: Dict[str, Any]) -> np.ndarray:
        """Scale ``perturbation`` down so it satisfies the L∞ limit and then the L2 limit."""
        if perturbation.size == 0:
            return perturbation

        # L∞ constraint
        max_linf = constraints.get("max_linf", MAX_LINF)
        peak = np.max(np.abs(perturbation))
        if peak > max_linf:
            perturbation = perturbation * (max_linf / peak)

        # L2 constraint
        max_l2 = constraints.get("max_l2", MAX_L2)
        l2_norm = np.linalg.norm(perturbation)
        if l2_norm > max_l2:
            perturbation = perturbation * (max_l2 / l2_norm)

        return perturbation

    def _enforce_constraints_with_quality(
        self,
        original: np.ndarray,
        perturbed: np.ndarray,
        strategy: PerturbationStrategy,
        sr: int = TARGET_SR,
        max_iterations: int = 10
    ) -> np.ndarray:
        """Enforce perturbation constraints including perceptual quality (PESQ/STOI).

        The perturbation is first clamped to the L∞/L2 limits. It is then scaled
        by 0.7 per iteration until both PESQ and STOI meet their minimums, the
        scale drops below 0.01, or ``max_iterations`` is reached. A metric that
        raises counts as 0.0, i.e. as a violation.

        Returns:
            The clipped perturbed waveform. If the quality targets were never met,
            a warning is printed and the most strongly attenuated version is returned.
        """
        constraints = self._strategy_constraints(strategy)
        min_pesq = constraints.get("min_pesq", MIN_PESQ)
        min_stoi = constraints.get("min_stoi", MIN_STOI)

        perturbation = self._clamp_perturbation(perturbed - original, constraints)

        scale_factor = 1.0
        last_scores = None  # (pesq, stoi) of the most recent evaluation
        for iteration in range(max_iterations):
            current_perturbed = np.clip(original + perturbation * scale_factor, -1.0, 1.0)

            try:
                current_pesq = pesq(sr, original, current_perturbed, 'wb')
            except Exception:
                current_pesq = 0.0

            try:
                current_stoi = stoi(original, current_perturbed, sr, extended=False)
            except Exception:
                current_stoi = 0.0

            last_scores = (current_pesq, current_stoi)

            if current_pesq >= min_pesq and current_stoi >= min_stoi:
                if iteration > 0:
                    print(f"  ✓ Quality constraints met after {iteration} scaling iterations (PESQ={current_pesq:.2f}, STOI={current_stoi:.3f})")
                return current_perturbed

            # Reduce by 30% each iteration
            scale_factor *= 0.7
            if scale_factor < 0.01:
                break

        # Reached both when the scale bottoms out and when the iteration budget is
        # exhausted. The warning was previously printed only in the first case,
        # which the default budget of 10 iterations can never reach (0.7**10 > 0.01).
        if last_scores is not None:
            print(f"  ⚠️ Could not meet quality constraints, using minimal perturbation (PESQ={last_scores[0]:.2f}, STOI={last_scores[1]:.3f})")

        return np.clip(original + perturbation * scale_factor, -1.0, 1.0)

    def _enforce_constraints(
        self,
        original: np.ndarray,
        perturbed: np.ndarray,
        strategy: PerturbationStrategy
    ) -> np.ndarray:
        """Legacy method - enforce only L∞ and L2 constraints (no quality check)."""
        perturbation = self._clamp_perturbation(
            perturbed - original, self._strategy_constraints(strategy)
        )
        return np.clip(original + perturbation, -1.0, 1.0)

    def apply_single_codec(
        self,
        audio: np.ndarray,
        codec_name: str,
        bitrate: Optional[float] = None,
        output_dir: Optional[Path] = None,
        sr: int = TARGET_SR
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Compress audio to a single codec and decode back.

        Args:
            audio: Input audio as numpy array
            codec_name: Name of codec to use (e.g., "opus", "amr-wb")
            bitrate: Bitrate to use. If None, randomly selects from codec's bitrates
            output_dir: Directory for temporary files (defaults to /tmp)
            sr: Sample rate

        Returns:
            Tuple of (decoded_audio, metadata_dict). ``metadata_dict`` holds
            ``codec``, ``bitrate`` and ``success``. When encoding or decoding
            fails, a copy of the input audio is returned with ``success`` False.

        Raises:
            ValueError: If ``codec_name`` is not a key of ``CODECS``.
        """
        if codec_name not in CODECS:
            raise ValueError(f"Unknown codec: {codec_name}")

        if bitrate is None:
            bitrate = np.random.choice(CODECS[codec_name]["bitrates"])
        if isinstance(bitrate, np.generic):
            # Store a native int/float in the metadata (numpy integer scalars are
            # not JSON-serializable). The formatted value is unchanged.
            bitrate = bitrate.item()

        if output_dir is None:
            output_dir = Path("/tmp")
        temp_dir = output_dir / "codec_temp"
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Keep dots out of the file stem: Path.with_suffix would otherwise treat the
        # fractional part of e.g. 23.05 and 23.85 as an extension and map both
        # bitrates to the same encoded file name.
        bitrate_tag = str(bitrate).replace(".", "p")
        temp_original = temp_dir / f"temp_input_{codec_name}.wav"
        encoded_path = self.codec_stack._get_output_path(
            temp_dir / f"encoded_{codec_name}_{bitrate_tag}", codec_name
        )
        decoded_path = temp_dir / f"decoded_{codec_name}_{bitrate_tag}.wav"

        def failed() -> Tuple[np.ndarray, Dict[str, Any]]:
            return audio.copy(), {"codec": codec_name, "bitrate": bitrate, "success": False}

        try:
            sf.write(str(temp_original), audio, sr)

            if not self.codec_stack.encode(temp_original, codec_name, bitrate, encoded_path):
                return failed()
            if not encoded_path.exists():
                return failed()
            if not self.codec_stack.decode(encoded_path, decoded_path):
                return failed()

            decoded_audio, _ = librosa.load(str(decoded_path), sr=sr, mono=True)
        finally:
            # Best-effort cleanup on every exit path, including failures.
            for leftover in (temp_original, encoded_path, decoded_path):
                try:
                    leftover.unlink()
                except OSError:
                    pass

        return decoded_audio, {"codec": codec_name, "bitrate": bitrate, "success": True}
# Initialize executor with the codec stack and normalizer created in earlier cells
perturbation_executor = PerturbationExecutor(codec_stack, normalizer)

print("Perturbation executor initialized!")


Perturbation executor initialized!


In [12]:
class MetricsComputer:
    """Compute all evaluation metrics.

    Combines transcript-based metrics from the ASR baseline (WER/CER) with
    perceptual metrics (PESQ, STOI) and signal-level metrics (SNR, an
    RMS-based loudness value, L2 / L-infinity perturbation norms).

    Numeric values computed here are returned as built-in ``float`` so that
    result dictionaries serialize to JSON as numbers instead of falling back
    to ``str`` for NumPy scalar types.
    """

    def __init__(self, asr_baseline: ASRBaseline):
        # ASR wrapper; this class calls its transcribe(), compute_wer() and
        # compute_cer() methods.
        self.asr = asr_baseline

    def compute_all_metrics(
        self,
        original_audio: np.ndarray,
        perturbed_audio: np.ndarray,
        original_transcript: str,
        sr: int = TARGET_SR
    ) -> Dict[str, Any]:
        """Compute all metrics for a perturbed audio sample.

        Args:
            original_audio: Reference signal.
            perturbed_audio: Signal to evaluate; compared sample-by-sample
                against ``original_audio`` over their common length.
            original_transcript: Reference text for WER/CER.
            sr: Sample rate passed to the ASR model, PESQ and STOI.

        Returns:
            Dict with keys ``wer``, ``cer``, ``perturbed_transcript``,
            ``pesq``, ``stoi``, ``snr``, ``lufs``, ``l2_norm``, ``linf_norm``.
            ``pesq`` / ``stoi`` are 0.0 when the respective library call fails.
        """
        metrics: Dict[str, Any] = {}

        # ASR metrics
        perturbed_transcript = self.asr.transcribe(perturbed_audio, sr)
        metrics["wer"] = self.asr.compute_wer(original_transcript, perturbed_transcript)
        metrics["cer"] = self.asr.compute_cer(original_transcript, perturbed_transcript)
        metrics["perturbed_transcript"] = perturbed_transcript

        # Sample-wise metrics need equal lengths: compare the common prefix.
        min_len = min(len(original_audio), len(perturbed_audio))
        orig_trimmed = original_audio[:min_len]
        pert_trimmed = perturbed_audio[:min_len]

        if min_len == 0:
            # No overlapping samples: np.max() on an empty array would raise,
            # so report the degenerate values explicitly instead of crashing.
            metrics["pesq"] = 0.0
            metrics["stoi"] = 0.0
            metrics["snr"] = float('inf')
            metrics["lufs"] = float('-inf')
            metrics["l2_norm"] = 0.0
            metrics["linf_norm"] = 0.0
            return metrics

        # Perceptual quality metrics. Library failures (e.g. unsupported input)
        # degrade to 0.0; `except Exception` keeps Ctrl-C / SystemExit working.
        try:
            metrics["pesq"] = float(pesq(sr, orig_trimmed, pert_trimmed, 'wb'))
        except Exception:
            metrics["pesq"] = 0.0

        try:
            metrics["stoi"] = float(stoi(orig_trimmed, pert_trimmed, sr, extended=False))
        except Exception:
            metrics["stoi"] = 0.0

        # Signal metrics: SNR of the original relative to the added perturbation.
        perturbation = pert_trimmed - orig_trimmed
        signal_power = np.mean(orig_trimmed ** 2)
        noise_power = np.mean(perturbation ** 2)
        if noise_power > 0:
            metrics["snr"] = float(10 * np.log10(signal_power / noise_power))
        else:
            # Identical signals: no perturbation energy at all.
            metrics["snr"] = float('inf')

        # LUFS (simplified RMS-based approximation, in dB relative to RMS 0.1)
        rms_pert = np.sqrt(np.mean(pert_trimmed ** 2))
        metrics["lufs"] = float(20 * np.log10(rms_pert / 0.1)) if rms_pert > 0 else float('-inf')

        # Norms of the perturbation
        metrics["l2_norm"] = float(np.linalg.norm(perturbation))
        metrics["linf_norm"] = float(np.max(np.abs(perturbation)))

        return metrics

    def compute_baseline_metrics(
        self,
        original_audio: np.ndarray,
        original_transcript: str,
        sr: int = TARGET_SR
    ) -> Dict[str, Any]:
        """Compute baseline metrics for original audio.

        Re-transcribes the original audio and scores it against
        ``original_transcript``; the quality metrics are fixed constants
        standing for "unperturbed".
        """
        # Verify transcription
        verified_transcript = self.asr.transcribe(original_audio, sr)
        wer = self.asr.compute_wer(original_transcript, verified_transcript)
        cer = self.asr.compute_cer(original_transcript, verified_transcript)

        return {
            "wer": wer,
            "cer": cer,
            "transcript": verified_transcript,
            "snr": float('inf'),  # No noise in original
            "pesq": 5.0,  # Perfect quality
            "stoi": 1.0   # Perfect intelligibility
        }

    def compute_delta_metrics(
        self,
        baseline: Dict[str, float],
        perturbed: Dict[str, float]
    ) -> Dict[str, float]:
        """Compute delta metrics (perturbed - baseline).

        When the baseline SNR is infinite (as produced by
        ``compute_baseline_metrics``) a plain difference would be meaningless,
        so ``snr_delta`` is reported as the negated perturbed SNR instead.
        """
        if baseline["snr"] != float('inf'):
            snr_delta = perturbed["snr"] - baseline["snr"]
        else:
            snr_delta = -perturbed["snr"]

        return {
            "wer_delta": perturbed["wer"] - baseline["wer"],
            "cer_delta": perturbed["cer"] - baseline["cer"],
            "pesq_delta": perturbed["pesq"] - baseline["pesq"],
            "stoi_delta": perturbed["stoi"] - baseline["stoi"],
            "snr_delta": snr_delta
        }


# Initialize metrics computer: the notebook-wide MetricsComputer, backed by the
# asr_baseline object (must already be defined) for transcript-based metrics.
metrics_computer = MetricsComputer(asr_baseline)

print("Metrics computer initialized!")


Metrics computer initialized!


In [None]:
@dataclass
class ExperimentResult:
    """Results from a single experiment run.

    AgentOrchestrator.run_experiment creates one instance per
    (codec, iteration) pass; save_results serializes them with asdict().
    """
    # Input audio file path, stored as a string.
    audio_file: str
    # Name of the perturbation strategy used in this iteration.
    strategy_name: str
    # 1-based iteration index within the current codec's loop.
    iteration: int
    # Metrics of the unperturbed audio (MetricsComputer.compute_baseline_metrics).
    baseline_metrics: Dict[str, float]
    # Metrics of the evaluated audio; run_experiment stores the metrics of the
    # codec-compressed signal here.
    perturbed_metrics: Dict[str, float]
    # List of merged dicts (codec metadata + metrics + deltas); run_experiment
    # stores a single entry per result.
    eot_results: List[Dict[str, Any]]
    # Perturbed-minus-baseline deltas (MetricsComputer.compute_delta_metrics).
    delta_metrics: Dict[str, float]
    # Detected codec info of the input file plus a "target_codec" entry.
    codec_info: Dict[str, Any]
    # The strategy object converted to a plain dict via asdict().
    strategy: Dict[str, Any]
    # ISO-8601 creation time, filled in automatically at construction.
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())


class AgentOrchestrator:
    """Main orchestrator for the entire pipeline."""

    def __init__(
        self,
        normalizer: AudioNormalizer,
        asr_baseline: ASRBaseline,
        codec_detector: CodecDetector,
        codec_stack: CodecStack,
        llm_orchestrator: LLMOrchestrator,
        perturbation_executor: PerturbationExecutor,
        metrics_computer: MetricsComputer
    ):
        """Keep references to every pipeline component the orchestrator drives.

        Nothing is computed here; each argument is stored unchanged under an
        attribute of the same name.
        """
        # Audio loading and codec handling
        self.normalizer = normalizer
        self.codec_detector = codec_detector
        self.codec_stack = codec_stack

        # Strategy generation and application
        self.llm_orchestrator = llm_orchestrator
        self.perturbation_executor = perturbation_executor

        # Evaluation
        self.asr_baseline = asr_baseline
        self.metrics_computer = metrics_computer

    def run_experiment(
        self,
        audio_path: Path,
        reference_transcript: Optional[str] = None,
        max_iterations: int = MAX_ITERATIONS,
        codecs_to_test: Optional[List[str]] = None
    ) -> Tuple[List[ExperimentResult], Dict[str, np.ndarray], np.ndarray, Dict[Tuple[str, int], np.ndarray]]:
        """Run full experiment pipeline - one codec at a time.

        For every codec the method iterates up to ``max_iterations`` times:
        ask the LLM for a strategy, perturb the original audio, round-trip it
        through the codec at a randomly chosen bitrate, score the result and
        feed a summary back into the next strategy request. The loop for a
        codec stops early once the WER delta exceeds 0.3 while the PESQ, STOI
        and SNR constraints (``MIN_PESQ`` / ``MIN_STOI`` / ``MIN_SNR``) are all
        met - the same constraints ``find_best_iteration_per_codec`` applies.

        Args:
            audio_path: Path to input audio file
            reference_transcript: Optional reference transcript (auto-generated if None)
            max_iterations: Number of iterations per codec
            codecs_to_test: List of codec names to test; each must be a key of
                ``CODECS``. Defaults to ``["amr-wb"]`` when None.

        Returns:
            Tuple of (all_results, final_perturbed_audios, original_audio, perturbed_audios_by_iteration)
            - all_results: List of ExperimentResult for all iterations
            - final_perturbed_audios: Dict mapping codec_name -> perturbed audio of the
              last iteration that ran (None if no iteration ran)
            - original_audio: Original normalized audio
            - perturbed_audios_by_iteration: Dict mapping (codec_name, iteration) -> perturbed audio
              (pre-compression)
        """
        # A list literal as default would be shared between calls; resolve here.
        if codecs_to_test is None:
            codecs_to_test = ["amr-wb"]

        print(f"\n{'='*80}")
        print(f"Starting experiment for: {audio_path.name}")
        print(f"Testing codecs: {', '.join(codecs_to_test)}")
        print(f"{'='*80}\n")

        # Step 1: Normalize audio
        print("Step 1: Normalizing audio...")
        original_audio, sr = self.normalizer.normalize(audio_path)
        print(f"  Loaded audio: {len(original_audio)/sr:.2f}s, {sr} Hz")

        # Get or generate reference transcript
        if reference_transcript is None:
            print("  Generating reference transcript...")
            reference_transcript = self.asr_baseline.transcribe(original_audio, sr)
        print(f"  Reference: '{reference_transcript}'")

        # Step 2: Compute baseline metrics
        print("\nStep 2: Computing baseline metrics...")
        baseline_metrics = self.metrics_computer.compute_baseline_metrics(
            original_audio, reference_transcript, sr
        )
        print(f"  Baseline WER: {baseline_metrics['wer']:.3f}")
        print(f"  Baseline CER: {baseline_metrics['cer']:.3f}")

        # Step 3: Detect codec of the input file (for LLM context)
        print("\nStep 3: Detecting codec...")
        codec_info = self.codec_detector.detect(audio_path)
        print(f"  Codec: {codec_info['codec_name']}")
        print(f"  Bitrate: {codec_info['bitrate_kbps']} kbps")

        # Main experiment: Loop over codecs first, then iterations
        all_results = []
        final_perturbed_audios = {}  # Track final perturbed audio for each codec
        perturbed_audios_by_iteration = {}  # Track perturbed audio for each (codec, iteration)

        for codec_name in codecs_to_test:
            print(f"\n{'#'*80}")
            print(f"# Testing Codec: {codec_name.upper()}")
            print(f"{'#'*80}\n")

            # Reset feedback for each codec
            previous_feedback = None
            final_perturbed_audio = None

            for iteration in range(1, max_iterations + 1):
                print(f"\n{'='*80}")
                print(f"Codec: {codec_name.upper()} | Iteration {iteration}/{max_iterations}")
                print(f"{'='*80}\n")

                # Show feedback from previous iteration (if any)
                if previous_feedback:
                    print("üìã Feedback from previous iteration:")
                    print("-" * 80)
                    print(previous_feedback)
                    print("-" * 80)
                    print()

                # Step 4: Generate strategy from LLM
                print("Step 4: Generating perturbation strategy from LLM...")
                # Copy so the detected codec_info is not mutated; add the codec under test
                current_codec_info = codec_info.copy()
                current_codec_info['target_codec'] = codec_name
                strategy = self.llm_orchestrator.generate_strategy(
                    current_codec_info, CODECS, previous_feedback, iteration
                )
                print(f"  Strategy: {strategy.name}")
                print(f"  Family: {strategy.family}")
                print(f"  Optimizer: {strategy.optimizer}")

                # Step 5: Apply perturbation (always to the original, not cumulatively)
                print("\nStep 5: Applying perturbation...")
                perturbed_audio = self.perturbation_executor.apply_perturbation(
                    original_audio, strategy, sr
                )
                print(f"  Perturbation applied: L‚àû={np.max(np.abs(perturbed_audio - original_audio)):.6f}")

                # Step 6: Compress to single codec (no chains)
                print(f"\nStep 6: Compressing to {codec_name.upper()} codec...")
                # Randomly select bitrate for this codec
                bitrate = np.random.choice(CODECS[codec_name]["bitrates"])
                print(f"  Using bitrate: {bitrate} kbps")

                compressed_audio, codec_metadata = self.perturbation_executor.apply_single_codec(
                    perturbed_audio, codec_name, bitrate, ARTIFACTS_DIR, sr
                )

                if not codec_metadata.get("success", False):
                    print(f"  ‚ö†Ô∏è  Codec compression failed, using perturbed audio directly")
                    compressed_audio = perturbed_audio.copy()
                else:
                    print(f"  ‚úì Successfully compressed and decoded")

                # Step 7: Pass compressed audio to ASR (Whisper).
                # compute_all_metrics transcribes the audio itself, so its
                # transcript is reused here instead of running ASR a second time.
                print(f"\nStep 7: Running ASR on compressed audio...")
                compressed_metrics = self.metrics_computer.compute_all_metrics(
                    original_audio, compressed_audio, reference_transcript, sr
                )
                compressed_transcript = compressed_metrics["perturbed_transcript"]
                print(f"  Compressed transcript: '{compressed_transcript}'")

                # Step 8: Report metrics on compressed audio
                print("\nStep 8: Computing metrics on compressed audio...")
                delta_metrics = self.metrics_computer.compute_delta_metrics(
                    baseline_metrics, compressed_metrics
                )

                print(f"  WER: {compressed_metrics['wer']:.3f} (Œî: {delta_metrics['wer_delta']:+.3f})")
                print(f"  CER: {compressed_metrics['cer']:.3f} (Œî: {delta_metrics['cer_delta']:+.3f})")
                print(f"  PESQ: {compressed_metrics['pesq']:.2f}")
                print(f"  STOI: {compressed_metrics['stoi']:.3f}")
                print(f"  SNR: {compressed_metrics['snr']:.2f} dB")

                # Store result for this iteration
                eot_results = [{
                    **codec_metadata,
                    **compressed_metrics,
                    **delta_metrics
                }]

                # Create result
                result = ExperimentResult(
                    audio_file=str(audio_path),
                    strategy_name=strategy.name,
                    iteration=iteration,
                    baseline_metrics=baseline_metrics,
                    perturbed_metrics=compressed_metrics,  # Use compressed metrics
                    eot_results=eot_results,
                    delta_metrics=delta_metrics,
                    codec_info={**codec_info, "target_codec": codec_name},
                    strategy=asdict(strategy)
                )
                all_results.append(result)

                # Step 9: Generate feedback for next iteration
                print("\nStep 9: Generating feedback summary...")
                feedback_data = [{
                    "strategy_name": strategy.name,
                    "wer_delta": delta_metrics["wer_delta"],
                    "cer_delta": delta_metrics["cer_delta"],
                    "pesq": compressed_metrics["pesq"],
                    "stoi": compressed_metrics["stoi"],
                    "snr": compressed_metrics["snr"],
                    "codec": codec_name
                }]

                previous_feedback = self.llm_orchestrator.generate_feedback_summary(
                    feedback_data, top_k=1
                )
                print(f"\nüìä Feedback summary for next iteration:")
                print("=" * 80)
                print(previous_feedback)
                print("=" * 80)

                # Track perturbed audio for this iteration
                perturbed_audios_by_iteration[(codec_name, iteration)] = perturbed_audio.copy()

                # Track final perturbed audio for this codec (will be saved after all iterations)
                final_perturbed_audio = perturbed_audio

                # Stop early only when the attack is strong AND the iteration
                # satisfies every quality constraint used for best-iteration
                # selection; stopping on PESQ alone could end the search on an
                # iteration that is later rejected for STOI or SNR.
                meets_quality = (
                    compressed_metrics["pesq"] >= MIN_PESQ
                    and compressed_metrics["stoi"] >= MIN_STOI
                    and compressed_metrics["snr"] >= MIN_SNR
                )
                if delta_metrics["wer_delta"] > 0.3 and meets_quality:
                    print(f"\n‚úì Good results achieved! WER delta: {delta_metrics['wer_delta']:.3f}")
                    break

            # Store final perturbed audio for this codec
            final_perturbed_audios[codec_name] = final_perturbed_audio
            print(f"\n‚úì Completed {codec_name.upper()} codec testing")

        # Return all results, final perturbed audios, and per-iteration perturbed audios
        return all_results, final_perturbed_audios, original_audio, perturbed_audios_by_iteration

    def save_results(self, results: List[ExperimentResult], output_path: Path):
        """Write the experiment results to ``output_path`` as indented JSON.

        Every result is converted to a plain dict with ``asdict``; values the
        JSON encoder cannot handle natively are written as ``str(value)``.
        """
        payload = [asdict(entry) for entry in results]
        with open(output_path, 'w') as handle:
            handle.write(json.dumps(payload, indent=2, default=str))
        print(f"\nResults saved to: {output_path}")

    def save_best_perturbed_audio(
        self,
        original_audio: np.ndarray,
        best_info: Dict[str, Any],
        perturbed_audios_by_iteration: Dict[Tuple[str, int], np.ndarray],
        audio_name: str,
        output_dir: Path,
        sr: int = TARGET_SR
    ):
        """Save the best perturbed audio file based on best iteration.

        Writes ``<audio_name>_original.wav`` and
        ``<audio_name>_perturbed_best_<codec>_iter<N>.wav`` into
        ``<output_dir>/artifacts/best``. Nothing is written (a message is
        printed instead) when ``best_info`` is None, lacks ``'codec'`` /
        ``'iteration'``, or no audio is stored for that (codec, iteration).

        Args:
            original_audio: Original signal to save alongside the perturbed one.
            best_info: Best-iteration record; its ``'codec'`` and
                ``'iteration'`` entries select the audio to save.
            perturbed_audios_by_iteration: Mapping (codec_name, iteration) -> audio.
            audio_name: Stem used for the output file names.
            output_dir: Base output directory; a ``str`` or ``Path`` is accepted.
            sr: Sample rate written to the WAV files.
        """
        if best_info is None:
            print("  No best iteration found, skipping audio save.")
            return

        codec_name = best_info.get('codec')
        best_iteration = best_info.get('iteration')

        if codec_name is None or best_iteration is None:
            print("  Missing codec or iteration info, skipping audio save.")
            return

        # Get the perturbed audio for the best iteration
        key = (codec_name, best_iteration)
        if key not in perturbed_audios_by_iteration:
            print(f"  Perturbed audio not found for {codec_name} iteration {best_iteration}, skipping save.")
            return

        perturbed_audio = perturbed_audios_by_iteration[key]

        # Path() makes a plain-string output directory work with the "/" joins.
        artifact_dir = Path(output_dir) / "artifacts" / "best"
        artifact_dir.mkdir(parents=True, exist_ok=True)

        # Save original (once)
        orig_path = artifact_dir / f"{audio_name}_original.wav"
        sf.write(str(orig_path), original_audio, sr)
        print(f"  Original audio saved to: {orig_path}")

        # Save best perturbed audio
        pert_path = artifact_dir / f"{audio_name}_perturbed_best_{codec_name}_iter{best_iteration}.wav"
        sf.write(str(pert_path), perturbed_audio, sr)
        print(f"  Best perturbed audio ({codec_name}, iteration {best_iteration}) saved to: {pert_path}")

    def save_final_perturbed_audio(
        self,
        original_audio: np.ndarray,
        perturbed_audios: Dict[str, np.ndarray],
        audio_name: str,
        output_dir: Path,
        sr: int = TARGET_SR
    ):
        """Save final perturbed audio files after all iterations (one per codec).

        Writes ``<audio_name>_original.wav`` plus one
        ``<audio_name>_perturbed_<codec>_final.wav`` per non-None entry of
        ``perturbed_audios`` into ``<output_dir>/artifacts/final``. Returns
        without writing anything when ``perturbed_audios`` is empty.

        Args:
            original_audio: Original signal to save alongside the perturbed ones.
            perturbed_audios: Mapping codec_name -> final perturbed audio
                (entries whose value is None are skipped).
            audio_name: Stem used for the output file names.
            output_dir: Base output directory; a ``str`` or ``Path`` is accepted.
            sr: Sample rate written to the WAV files.
        """
        if not perturbed_audios:
            print("  No perturbed audio to save.")
            return

        # Path() makes a plain-string output directory work with the "/" joins.
        artifact_dir = Path(output_dir) / "artifacts" / "final"
        artifact_dir.mkdir(parents=True, exist_ok=True)

        # Save original (once)
        orig_path = artifact_dir / f"{audio_name}_original.wav"
        sf.write(str(orig_path), original_audio, sr)
        print(f"  Original audio saved to: {orig_path}")

        # Save final perturbed audio for each codec
        for codec_name, perturbed_audio in perturbed_audios.items():
            if perturbed_audio is None:
                continue
            pert_path = artifact_dir / f"{audio_name}_perturbed_{codec_name}_final.wav"
            sf.write(str(pert_path), perturbed_audio, sr)
            print(f"  Final perturbed audio ({codec_name}) saved to: {pert_path}")

    def find_best_iteration_per_codec(
        self,
        results: List[ExperimentResult],
        min_pesq: float = MIN_PESQ,
        min_stoi: float = MIN_STOI,
        min_snr: float = MIN_SNR
    ) -> Dict[str, Optional[Dict[str, Any]]]:
        """Pick, per codec, the constraint-satisfying iteration with the largest WER delta.

        Results are bucketed by ``codec_info['target_codec']`` (``'unknown'``
        when absent). Within a bucket only results whose PESQ, STOI and SNR all
        reach the given minimums are eligible; among those the first one with
        the strictly highest ``wer_delta`` wins.

        Args:
            results: List of ExperimentResult from experiment
            min_pesq: Minimum PESQ threshold
            min_stoi: Minimum STOI threshold
            min_snr: Minimum SNR threshold

        Returns:
            Dict mapping codec_name -> summary dict of the winning iteration,
            or None for a codec where no iteration qualifies.
        """
        # Bucket the results by the codec they targeted
        grouped = {}
        for entry in results:
            target = entry.codec_info.get('target_codec', 'unknown')
            grouped.setdefault(target, []).append(entry)

        metric_keys = ('wer', 'cer', 'stoi', 'pesq', 'snr')
        delta_keys = ('wer_delta', 'cer_delta')

        summary = {}
        for target, entries in grouped.items():
            winner = None
            top_delta = -float('inf')

            for entry in entries:
                quality = entry.perturbed_metrics
                passes = (
                    quality.get('pesq', 0) >= min_pesq
                    and quality.get('stoi', 0) >= min_stoi
                    and quality.get('snr', -float('inf')) >= min_snr
                )
                if not passes:
                    continue

                # Strict ">" keeps the earliest iteration on ties
                candidate_delta = entry.delta_metrics.get('wer_delta', 0)
                if candidate_delta > top_delta:
                    top_delta, winner = candidate_delta, entry

            if not winner:
                summary[target] = None
                continue

            record = {
                'codec': target,
                'iteration': winner.iteration,
                'strategy': winner.strategy_name,
            }
            record.update({k: winner.perturbed_metrics.get(k, 0) for k in metric_keys})
            record.update({k: winner.delta_metrics.get(k, 0) for k in delta_keys})
            summary[target] = record

        return summary

    def save_artifacts(
        self,
        original_audio: np.ndarray,
        perturbed_audio: np.ndarray,
        audio_name: str,
        strategy_name: str,
        iteration: int,
        sr: int = TARGET_SR
    ):
        """Write one iteration's original and perturbed audio as WAV files.

        Files go to ``ARTIFACTS_DIR/<audio_name>/iter_<iteration>/`` as
        ``original.wav`` and ``perturbed_<strategy_name>.wav``; the directory
        is created if needed.
        """
        target_dir = ARTIFACTS_DIR / audio_name / f"iter_{iteration}"
        target_dir.mkdir(parents=True, exist_ok=True)

        # (file name, samples) pairs, written in this order
        outputs = (
            ("original.wav", original_audio),
            (f"perturbed_{strategy_name}.wav", perturbed_audio),
        )
        for filename, samples in outputs:
            sf.write(str(target_dir / filename), samples, sr)

        print(f"  Artifacts saved to: {target_dir}")


# Initialize main orchestrator: wire the component instances built in the
# preceding cells into the single AgentOrchestrator used by the cells below.
orchestrator = AgentOrchestrator(
    normalizer=normalizer,
    asr_baseline=asr_baseline,
    codec_detector=codec_detector,
    codec_stack=codec_stack,
    llm_orchestrator=llm_orchestrator,
    perturbation_executor=perturbation_executor,
    metrics_computer=metrics_computer
)

print("Main orchestrator initialized!")



Main orchestrator initialized!


In [None]:
# Batch processing for 10 audio files from each category.
# For every file: run the AMR-WB experiment, pick the best iteration that meets
# the PESQ/STOI/SNR constraints, save its audio, and dump per-file results.
import traceback

SIGNAL_CATEGORIES = ["long-signals", "medium-signals", "short-signals"]
SAMPLES_PER_CATEGORY = 10
BASE_PATH = Path("/content/drive/MyDrive/adversarial-audio/Normal-Examples")

# Batch results collector
batch_results = []
# Bookkeeping for the summary: count what actually ran instead of assuming
# every category exists and holds SAMPLES_PER_CATEGORY files.
files_completed = 0
failed_files = []

print(f"{'='*80}")
print("STARTING BATCH PROCESSING")
print(f"{'='*80}\n")

# Process each category
for category in SIGNAL_CATEGORIES:
    print(f"\n{'#'*80}")
    print(f"# Processing Category: {category.upper()}")
    print(f"{'#'*80}\n")

    category_path = BASE_PATH / category
    if not category_path.exists():
        print(f"‚ö†Ô∏è Category path not found: {category_path}")
        continue

    # Get the first SAMPLES_PER_CATEGORY audio files (sorted for determinism)
    audio_files = sorted(category_path.glob("*.wav"))[:SAMPLES_PER_CATEGORY]

    if len(audio_files) == 0:
        print(f"‚ö†Ô∏è No audio files found in {category_path}")
        continue

    print(f"Found {len(audio_files)} audio files to process\n")

    # Process each audio file
    for file_idx, audio_file in enumerate(audio_files, 1):
        print(f"\n{'='*80}")
        print(f"[{file_idx}/{len(audio_files)}] Processing: {audio_file.name}")
        print(f"Category: {category}")
        print(f"{'='*80}\n")

        try:
            # Run experiment with AMR-WB only
            results, final_perturbed_audios, original_audio, perturbed_audios_by_iteration = orchestrator.run_experiment(
                audio_file,
                reference_transcript=None,
                max_iterations=MAX_ITERATIONS,
                codecs_to_test=["amr-wb"]
            )

            # Find best iteration for each codec
            best_iterations = orchestrator.find_best_iteration_per_codec(
                results,
                min_pesq=MIN_PESQ,
                min_stoi=MIN_STOI,
                min_snr=MIN_SNR
            )

            # Add entries for each codec's best iteration and save best perturbed audio
            for codec_name, best_info in best_iterations.items():
                if best_info is not None:
                    batch_results.append({
                        "original_audio_file": audio_file.name,
                        "category": category,
                        "codec": codec_name,
                        "best_iteration": best_info['iteration'],
                        "strategy_used": best_info['strategy'],
                        "wer": best_info['wer'],
                        "cer": best_info['cer'],
                        "stoi": best_info['stoi'],
                        "pesq": best_info['pesq'],
                        "snr": best_info['snr'],
                        "wer_delta": best_info['wer_delta'],
                        "cer_delta": best_info['cer_delta']
                    })
                    print(f"\n‚úì Best {codec_name.upper()} iteration: {best_info['iteration']}")
                    print(f"  Strategy: {best_info['strategy']}")
                    print(f"  WER: {best_info['wer']:.3f} (Œî: {best_info['wer_delta']:+.3f})")
                    print(f"  PESQ: {best_info['pesq']:.2f}, STOI: {best_info['stoi']:.3f}, SNR: {best_info['snr']:.2f} dB")

                    # Save best perturbed audio for this codec
                    print(f"\nüíæ Saving best perturbed audio...")
                    orchestrator.save_best_perturbed_audio(
                        original_audio,
                        best_info,
                        perturbed_audios_by_iteration,
                        audio_file.stem,
                        OUTPUT_DIR,
                        TARGET_SR
                    )
                else:
                    print(f"\n‚ö†Ô∏è No {codec_name.upper()} iteration met constraints")

            # Save individual experiment results
            results_path = RESULTS_DIR / f"experiment_{audio_file.stem}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            orchestrator.save_results(results, results_path)

            files_completed += 1

        except Exception as e:
            # Best effort: log the failure and move on to the next file
            print(f"\n‚ùå Error processing {audio_file.name}: {e}")
            traceback.print_exc()
            failed_files.append(audio_file.name)

    print(f"\n‚úì Completed category: {category}")

# Save consolidated batch results
batch_results_path = RESULTS_DIR / f"batch_results_all_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(batch_results_path, 'w') as f:
    json.dump(batch_results, f, indent=2, default=str)

print(f"\n{'='*80}")
print("BATCH PROCESSING COMPLETE")
print(f"{'='*80}")
print(f"\nConsolidated results saved to: {batch_results_path}")

# Print summary statistics (based on the files that actually ran to completion)
total_files = files_completed
total_entries = len(batch_results)
print(f"\nSummary:")
print(f"  Total audio files processed: {total_files}")
print(f"  Total successful best iterations: {total_entries}")
print(f"  Expected entries (1 per file for AMR-WB): {total_files}")
if failed_files:
    print(f"  Files that failed with an error: {len(failed_files)} ({', '.join(failed_files)})")

# Statistics by category
for category in SIGNAL_CATEGORIES:
    category_entries = [r for r in batch_results if r['category'] == category]
    print(f"\n{category}:")
    print(f"  Total best iterations: {len(category_entries)}")
    amrwb_count = len([r for r in category_entries if r['codec'] == 'amr-wb'])
    print(f"  AMR-WB: {amrwb_count}")

print(f"\n{'='*80}")



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
   - Codec: opus


Failure modes (1 strategies):
- Low WER increase despite perturbation
- Constraint violations (PESQ/STOI too low)
- Codec-specific robustness issues


Codec: OPUS | Iteration 3/5

üìã Feedback from previous iteration:
--------------------------------------------------------------------------------
Top 1 strategies:

1. default_narrowband_noise:
   - WER delta: 0.000
   - CER delta: 0.000
   - PESQ: 3.52
   - STOI: 0.999
   - SNR: 31.53 dB
   - Codec: opus


Failure modes (1 strategies):
- Low WER increase despite perturbation
- Constraint violations (PESQ/STOI too low)
- Codec-specific robustness issues

--------------------------------------------------------------------------------

Step 4: Generating perturbation strategy from LLM...
  Strategy: opus_amrwb_psychoacoustic_freq_mod
  Family: psychoacoustic
  Optimizer: black_box

Step 5: Applying perturbation...
  ‚úì Quality constraints met after 6 s

In [None]:
# Simple test: Single audio example - Attack + AMR-WB compression
# This demonstrates the attack on a single file and saves results for comparison:
# normalize -> reference transcript -> LLM strategy -> perturb -> AMR-WB round trip
# -> metrics -> save original / perturbed / compressed WAVs.

print("="*80)
print("SINGLE AUDIO TEST: Attack + AMR-WB Compression")
print("="*80)

# Select a single audio file for testing: first candidate that exists wins
TEST_AUDIO_CANDIDATES = [
    Path("/content/drive/MyDrive/adversarial-audio/Normal-Examples/short-signals/sample-053322.wav"),
    Path("/content/drive/MyDrive/adversarial-audio/Normal-Examples/medium-signals/sample-070169.wav"),
]
TEST_AUDIO_PATH = next((candidate for candidate in TEST_AUDIO_CANDIDATES if candidate.exists()), None)

if TEST_AUDIO_PATH is None:
    # Fail here with a clear message instead of deep inside the audio loader
    raise FileNotFoundError(
        "None of the test audio files exist: "
        + ", ".join(str(candidate) for candidate in TEST_AUDIO_CANDIDATES)
    )

print(f"\nüìÅ Test audio file: {TEST_AUDIO_PATH.name}")
print(f"   Path: {TEST_AUDIO_PATH}")

# Create output directory for test
TEST_OUTPUT_DIR = OUTPUT_DIR / "single_test"
TEST_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Step 1: Normalize and load audio
print("\n" + "="*80)
print("Step 1: Loading and normalizing audio...")
print("="*80)
original_audio, sr = normalizer.normalize(TEST_AUDIO_PATH)
print(f"‚úì Loaded: {len(original_audio)/sr:.2f}s, {sr} Hz")

# Step 2: Get reference transcript
print("\n" + "="*80)
print("Step 2: Generating reference transcript...")
print("="*80)
reference_transcript = asr_baseline.transcribe(original_audio, sr)
print(f"‚úì Reference transcript: '{reference_transcript}'")

# Step 3: Generate a perturbation strategy
print("\n" + "="*80)
print("Step 3: Generating perturbation strategy...")
print("="*80)
codec_info = codec_detector.detect(TEST_AUDIO_PATH)
codec_info['target_codec'] = 'amr-wb'
strategy = llm_orchestrator.generate_strategy(codec_info, CODECS, None, 1)
print(f"‚úì Strategy: {strategy.name}")
print(f"  Family: {strategy.family}")

# Step 4: Apply perturbation
print("\n" + "="*80)
print("Step 4: Applying perturbation...")
print("="*80)
perturbed_audio = perturbation_executor.apply_perturbation(original_audio, strategy, sr)
perturbation_magnitude = np.max(np.abs(perturbed_audio - original_audio))
print(f"‚úì Perturbation applied: L‚àû={perturbation_magnitude:.6f}")

# Step 5: Compress with AMR-WB
print("\n" + "="*80)
print("Step 5: Compressing with AMR-WB...")
print("="*80)
bitrate = 15.85  # Use a mid-range bitrate
compressed_audio, codec_metadata = perturbation_executor.apply_single_codec(
    perturbed_audio, 'amr-wb', bitrate, TEST_OUTPUT_DIR, sr
)
codec_succeeded = codec_metadata.get('success', False)
if codec_succeeded:
    print(f"‚úì Compressed with AMR-WB at {bitrate} kbps")
else:
    # On failure apply_single_codec hands back a copy of its input, so everything
    # below would silently describe the uncompressed perturbed audio.
    print(f"WARNING: AMR-WB compression at {bitrate} kbps failed; "
          "metrics and the 'compressed' file below reflect the uncompressed perturbed audio")
print(f"  Success: {codec_succeeded}")

# Step 6: Compute metrics
print("\n" + "="*80)
print("Step 6: Computing metrics...")
print("="*80)
metrics = metrics_computer.compute_all_metrics(original_audio, compressed_audio, reference_transcript, sr)
print(f"‚úì Metrics computed:")
print(f"  WER: {metrics['wer']:.4f}")
print(f"  CER: {metrics['cer']:.4f}")
print(f"  PESQ: {metrics['pesq']:.2f}")
print(f"  STOI: {metrics['stoi']:.3f}")
print(f"  SNR: {metrics['snr']:.2f} dB")

# Step 7: Save all versions for comparison
print("\n" + "="*80)
print("Step 7: Saving audio files for comparison...")
print("="*80)

audio_name = TEST_AUDIO_PATH.stem

# Save original
orig_path = TEST_OUTPUT_DIR / f"{audio_name}_original.wav"
sf.write(str(orig_path), original_audio, sr)
print(f"‚úì Original saved: {orig_path}")

# Save perturbed (before compression)
pert_path = TEST_OUTPUT_DIR / f"{audio_name}_perturbed.wav"
sf.write(str(pert_path), perturbed_audio, sr)
print(f"‚úì Perturbed (pre-compression) saved: {pert_path}")

# Save compressed (after AMR-WB)
compressed_path = TEST_OUTPUT_DIR / f"{audio_name}_compressed_amrwb_{bitrate}kbps.wav"
sf.write(str(compressed_path), compressed_audio, sr)
print(f"‚úì Compressed (AMR-WB) saved: {compressed_path}")

# Summary: thresholds come from the pipeline constants so this report agrees with
# the constraints used when selecting best iterations.
print("\n" + "="*80)
print("TEST COMPLETE!")
print("="*80)
print(f"\nüìä Summary:")
print(f"  Original transcript: '{reference_transcript}'")
print(f"  Compressed transcript: '{metrics.get('perturbed_transcript', 'N/A')}'")
print(f"  WER: {metrics['wer']:.4f}")
print(f"  PESQ: {metrics['pesq']:.2f} ({'‚úì Meets threshold' if metrics['pesq'] >= MIN_PESQ else '‚úó Below threshold'})")
print(f"  STOI: {metrics['stoi']:.3f} ({'‚úì Meets threshold' if metrics['stoi'] >= MIN_STOI else '‚úó Below threshold'})")
print(f"  SNR: {metrics['snr']:.2f} dB ({'‚úì Meets threshold' if metrics['snr'] >= MIN_SNR else '‚úó Below threshold'})")
print(f"\nüíæ Files saved to: {TEST_OUTPUT_DIR}")
print(f"  - {orig_path.name} (original)")
print(f"  - {pert_path.name} (perturbed, before compression)")
print(f"  - {compressed_path.name} (compressed with AMR-WB)")

