In [4]:
# api_testing_notebook.ipynb
"""
Interactive Whisper API Testing Notebook

This notebook helps you:
1. Test the API with various audio files
2. Understand performance characteristics
3. Test concurrency and error handling
4. Explore different models and configurations
"""

'\nInteractive Whisper API Testing Notebook\n\nThis notebook helps you:\n1. Test the API with various audio files\n2. Understand performance characteristics\n3. Test concurrency and error handling\n4. Explore different models and configurations\n'

## Whisper API Testing & Exploration

 
This notebook provides hands-on testing of your Whisper API so you can understand:

- How different audio formats perform
- Response times for various file sizes
- Concurrency behavior
- Error handling
- Memory usage patterns

## 1. Setup and Health Check

In [5]:
"""
One-cell smoke + stress test for Whisper API.

- Autodetect API base URL from .env or by probing ports (8000/8001).
- Health check, warm-up request, and concurrent burst with safe retries.
- Prints a concise summary (200 vs 503 vs failures).
"""

from __future__ import annotations

import io
import json
import math
import os
import time
import wave
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import requests
from requests.adapters import HTTPAdapter


# --------------------------- Config / Auto-detect -----------------------------

def read_port_from_env(env_path: Path) -> Optional[int]:
    """Return PORT from a .env file as an int, or None if absent or invalid.

    Accepts the common dotenv spellings ``PORT=8000``, ``PORT = 8000`` and
    ``export PORT=8000``. Surrounding quotes and a trailing ``# comment`` are
    stripped. Only the first PORT assignment in the file is considered.

    Args:
        env_path: Path to the .env file.

    Returns:
        The port number, or None when the file is missing, contains no PORT
        entry, or the first PORT value is not an integer in 1..65535.
    """
    if not env_path.is_file():
        return None
    for line in env_path.read_text(encoding="utf-8").splitlines():
        s = line.strip()
        if not s or s.startswith("#"):
            continue
        # Shell-style files often prefix assignments with `export`.
        if s.startswith("export "):
            s = s[len("export "):].lstrip()
        key, sep, raw = s.partition("=")
        if not sep or key.strip() != "PORT":
            continue
        raw = raw.split("#", 1)[0].strip().strip('"').strip("'")
        try:
            port = int(raw)
        except ValueError:
            return None
        # A port outside the TCP range can never yield a usable base URL.
        return port if 0 < port < 65536 else None
    return None


def probe_base_url(ports: List[int], timeout_s: float = 2.0) -> Optional[str]:
    """Find the first localhost port whose /health endpoint answers HTTP 200.

    Ports are tried in the order given. A request error or a non-200 reply
    moves on to the next candidate. Returns the base URL
    (``http://localhost:<port>``) or None if no candidate responded with 200.
    """
    for port in ports:
        candidate = f"http://localhost:{port}"
        try:
            status = requests.get(f"{candidate}/health", timeout=timeout_s).status_code
        except requests.RequestException:
            continue
        if status == 200:
            return candidate
    return None


# Resolve where the API lives: the PORT from ./.env (if any) is tried first,
# followed by the usual defaults, without duplicates.
repo_root = Path.cwd()
env_port = read_port_from_env(repo_root / ".env")

candidate_ports: List[int] = [env_port] if env_port else []
for common in (8000, 8001):
    if common in candidate_ports:
        continue
    candidate_ports.append(common)

# Take the first port that answers /health; if none does, fall back to the
# configured port (or 8000).
API_BASE_URL = probe_base_url(candidate_ports)
if not API_BASE_URL:
    API_BASE_URL = f"http://localhost:{env_port or 8000}"

TRANSCRIBE_URL = f"{API_BASE_URL}/v1/transcribe"
HEALTH_URL = f"{API_BASE_URL}/health"
METRICS_URL = f"{API_BASE_URL}/metrics"

# Headers sent only when a caller asks for RapidAPI-style requests; the proxy
# secret comes from the environment and defaults to an empty string.
RAPIDAPI_HEADERS = {
    "X-RapidAPI-Proxy-Secret": os.environ.get("RAPIDAPI_PROXY_SECRET", ""),
    "X-RapidAPI-User": "test-user@example.com",
}

print("API candidates:", ", ".join(f"http://localhost:{p}" for p in candidate_ports))
print("Selected base: ", API_BASE_URL)


# ------------------------------- HTTP helpers --------------------------------

def build_session(pool_maxsize: int = 32) -> requests.Session:
    """Create a Session whose HTTP and HTTPS adapters share an enlarged pool.

    The adapter is built without a ``max_retries`` argument, so retry
    behaviour is left at the adapter's default.
    """
    pooled_adapter = HTTPAdapter(
        pool_connections=pool_maxsize,
        pool_maxsize=pool_maxsize,
    )
    session = requests.Session()
    for scheme in ("http://", "https://"):
        session.mount(scheme, pooled_adapter)
    return session


# Shared session used by the request helpers below.
HTTP_SESSION = build_session()


def check_api_health(timeout_s: float = 5.0) -> bool:
    """Query /health, print a short status report, and return reachability.

    Args:
        timeout_s: Timeout in seconds for the health request.

    Returns:
        True when /health answers HTTP 200 with a JSON body; False when the
        request fails, the status is not 200, or the body is not JSON.
    """
    try:
        resp = requests.get(HEALTH_URL, timeout=timeout_s)
    except requests.RequestException as exc:
        print("❌ Cannot connect to API. Start with: ./start.sh start")
        print(f"   Details: {exc}")
        return False

    if resp.status_code != 200:
        print(f"❌ /health returned HTTP {resp.status_code}")
        print(f"   Body: {resp.text[:300]}")
        return False

    # ValueError is what the other helpers in this notebook catch around
    # resp.json(); it also covers JSON decode errors that do not derive from
    # the stdlib json.JSONDecodeError.
    try:
        data = resp.json()
    except ValueError:
        print("❌ /health did not return JSON")
        print(f"   Body: {resp.text[:300]}")
        return False

    # A JSON array or scalar has no .get(); report healthy without details.
    if not isinstance(data, dict):
        data = {}

    mem = data.get("memory_percent")
    tasks = data.get("active_tasks")
    print("✅ API is healthy!")
    if mem is not None:
        print(f"   Memory Usage: {float(mem):.1f}% (system RAM)")
    if tasks is not None:
        print(f"   Active Tasks: {int(tasks)}")
    return True


# ------------------------------ Audio synthesis ------------------------------

def create_sine_wav(
    duration_s: float = 2.0,
    frequency_hz: float = 440.0,
    sample_rate: int = 16_000,
    amplitude: float = 0.2,
) -> bytes:
    """Synthesize a sine tone and return it as an in-memory WAV file.

    The output is mono, 16-bit PCM, ``sample_rate`` frames per second and
    ``int(sample_rate * duration_s)`` frames long.

    Raises:
        ValueError: If ``duration_s`` or ``sample_rate`` is not positive.
    """
    if duration_s <= 0:
        raise ValueError("duration_s must be > 0")
    if sample_rate <= 0:
        raise ValueError("sample_rate must be > 0")

    n_samples = int(sample_rate * duration_s)
    timeline = np.linspace(0, duration_s, n_samples, endpoint=False)
    angular = 2 * math.pi * frequency_hz
    samples = amplitude * np.sin(angular * timeline)
    # Clamp to [-1, 1] before scaling so the int16 conversion cannot overflow.
    pcm16 = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)

    out = io.BytesIO()
    with wave.open(out, "wb") as writer:
        writer.setnchannels(1)
        writer.setsampwidth(2)  # two bytes per sample = 16-bit
        writer.setframerate(sample_rate)
        writer.writeframes(pcm16.tobytes())
    return out.getvalue()


# ------------------------ Single request with retries ------------------------

def _build_files_payload(audio_bytes: bytes, filename: str) -> Dict[str, tuple]:
    """Return fresh files payload per attempt (do not reuse BytesIO)."""
    return {"file": (filename, io.BytesIO(audio_bytes), "audio/wav")}


def transcribe_audio(
    audio_bytes: bytes,
    filename: str = "audio.wav",
    use_rapidapi: bool = False,
    timeout_s: float = 120.0,
    max_attempts: int = 2,
) -> Dict[str, Any]:
    """Upload WAV bytes to /v1/transcribe, retrying transient failures.

    Request-level errors and HTTP 503 replies are retried until
    ``max_attempts`` attempts have been made. The wait before a 503 retry
    uses the ``Retry-After`` header when it parses as a number. Any other
    status is returned immediately.

    Returns:
        A dict with ``status_code``, ``ok`` and ``latency_s`` plus, depending
        on the outcome, ``response_json``, ``response_text``, ``retry_after``
        or ``error``. ``status_code`` and ``latency_s`` are None when no HTTP
        response was obtained.
    """
    request_headers: Dict[str, str] = {"Connection": "close"}
    if use_rapidapi:
        request_headers.update(RAPIDAPI_HEADERS)

    attempt = 0
    while True:
        attempt += 1
        retries_left = attempt < max_attempts
        # Fresh payload for every attempt (the helper never reuses a BytesIO).
        upload = _build_files_payload(audio_bytes, filename)
        started = time.time()
        try:
            resp = HTTP_SESSION.post(
                TRANSCRIBE_URL,
                files=upload,
                headers=request_headers,
                timeout=timeout_s,
            )
        except requests.RequestException as exc:
            if not retries_left:
                return {
                    "status_code": None,
                    "ok": False,
                    "latency_s": None,
                    "error": f"request-failed: {exc}",
                }
            time.sleep(0.2 * attempt)
            continue
        latency = time.time() - started

        status = resp.status_code
        if status == 200:
            try:
                body = resp.json()
            except ValueError:
                # Keep a bounded slice of the raw body for debugging.
                body = {"text": None, "raw": resp.text[:2000]}
            return {
                "status_code": 200,
                "ok": True,
                "latency_s": latency,
                "response_json": body,
            }

        if status == 503 and retries_left:
            try:
                delay = float(resp.headers.get("Retry-After"))
            except (TypeError, ValueError):
                # Header missing or not numeric: short attempt-scaled backoff.
                delay = min(0.5 * attempt, 2.0)
            time.sleep(delay)
            continue

        failure: Dict[str, Any] = {
            "status_code": status,
            "ok": False,
            "latency_s": latency,
            "retry_after": resp.headers.get("Retry-After"),
        }
        try:
            failure["response_json"] = resp.json()
        except ValueError:
            failure["response_text"] = resp.text[:2000]
        return failure


# ---------------------------- Concurrency test -------------------------------

def run_concurrency_test(num_requests: int = 6, stagger_s: float = 0.02) -> None:
    """Post ``num_requests`` tones in parallel and print an outcome summary.

    Submissions are spaced ``stagger_s`` seconds apart. Results are grouped
    into 200s, 503s, other HTTP statuses and requests with no status at all,
    and up to two examples per group are printed.
    """
    import concurrent.futures  # local import to keep top clean

    print(f"Sending {num_requests} concurrent requests...")

    # One tone per request, each at a different frequency.
    payloads: List[Tuple[str, bytes]] = [
        (
            f"concurrent_{idx}.wav",
            create_sine_wav(duration_s=2.0, frequency_hz=440 + (idx * 100)),
        )
        for idx in range(num_requests)
    ]

    results: List[Dict[str, Any]] = []
    started = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as pool:
        pending = {}
        for name, audio in payloads:
            time.sleep(stagger_s)
            pending[pool.submit(transcribe_audio, audio, name)] = name

        for done in concurrent.futures.as_completed(pending):
            try:
                outcome = done.result()
            except Exception as exc:  # rare; record it as a failed request
                outcome = {
                    "status_code": None,
                    "ok": False,
                    "latency_s": None,
                    "error": f"unexpected-exception: {exc}",
                }
            outcome["filename"] = pending[done]
            results.append(outcome)

    elapsed = time.time() - started

    # Bucket by status code in one pass, keeping completion order.
    ok: List[Dict[str, Any]] = []
    lim: List[Dict[str, Any]] = []
    oth: List[Dict[str, Any]] = []
    fail: List[Dict[str, Any]] = []
    for outcome in results:
        code = outcome.get("status_code")
        if code == 200:
            ok.append(outcome)
        elif code == 503:
            lim.append(outcome)
        elif code is None:
            fail.append(outcome)
        else:
            oth.append(outcome)

    print(f"\nCompleted in {elapsed:.2f} s")
    print(f"  Successful (200): {len(ok)}")
    print(f"  Rejected  (503):  {len(lim)}")
    print(f"  Other HTTP errs:  {len(oth)}")
    print(f"  Failed requests:  {len(fail)}")

    def peek(label: str, items: List[Dict[str, Any]], n: int = 2) -> None:
        """Print the first ``n`` entries of a bucket with a short preview."""
        print(f"\n{label} (up to {n}):")
        for entry in items[:n]:
            print(
                f"  • {entry.get('filename')}: "
                f"status={entry.get('status_code')} "
                f"latency={entry.get('latency_s')}"
            )
            if "error" in entry:
                print(f"    error: {entry['error']}")
                continue
            if "response_json" in entry:
                preview = json.dumps(entry["response_json"])[:160]
                print(f"    json: {preview}...")
                continue
            text = (entry.get("response_text") or "")[:160]
            print(f"    text: {text}...")

    for label, bucket in (
        ("Successful", ok),
        ("Rejected (503)", lim),
        ("Other HTTP errors", oth),
        ("Failed requests", fail),
    ):
        peek(label, bucket)


# ------------------------------- Run the test --------------------------------

print(time.strftime("Testing at: %Y-%m-%d %H:%M:%S"))
if not check_api_health():
    raise SystemExit("API not reachable. Start with: ./start.sh start")

# Warm-up: small 1s tone to trigger model/caches
warm = transcribe_audio(create_sine_wav(1.0, 500.0), "warmup.wav")
# latency_s is None when the request never produced an HTTP response, so it
# must not be passed to a float format spec unconditionally.
warm_latency = warm.get("latency_s")
warm_latency_text = f"{warm_latency:.3f}s" if warm_latency is not None else "n/a"
print("Warm-up:", warm.get("status_code"), warm_latency_text)
if "error" in warm:
    print("   Warm-up error:", warm["error"])

# Concurrency: with MAX_CONCURRENT_JOBS=4 the server may answer some of the 6
# requests with 503. transcribe_audio retries a 503 (max_attempts=2 by
# default), so the summary can still show all 6 as 200.
run_concurrency_test(num_requests=6, stagger_s=0.02)

API candidates: http://localhost:8000, http://localhost:8001
Selected base:  http://localhost:8000
Testing at: 2025-08-27 22:49:30
✅ API is healthy!
   Memory Usage: 32.4% (system RAM)
   Active Tasks: 0
Warm-up: 200 0.508s
Sending 6 concurrent requests...

Completed in 3.12 s
  Successful (200): 6
  Rejected  (503):  0
  Other HTTP errs:  0
  Failed requests:  0

Successful (up to 2):
  • concurrent_0.wav: status=200 latency=0.5552840232849121
    json: {"text": ""}...
  • concurrent_1.wav: status=200 latency=1.037302017211914
    json: {"text": "Thank you."}...

Rejected (503) (up to 2):

Other HTTP errors (up to 2):

Failed requests (up to 2):


# 2) Audio synthesis utility

In [None]:
"""
Utilities to create small synthetic WAV files for testing.
"""

import numpy as np


def create_synthetic_audio(
    duration_seconds: float = 2.0,
    frequency: float = 440.0,
    sample_rate: int = 16_000,
    amplitude: float = 0.2,
) -> bytes:
    """Generate a sine tone and return it as the bytes of a WAV file.

    The WAV is mono, 16-bit PCM, with ``int(sample_rate * duration_seconds)``
    frames at ``sample_rate`` frames per second.

    Raises:
        ValueError: If ``duration_seconds`` or ``sample_rate`` is not positive.
    """
    if duration_seconds <= 0:
        raise ValueError("duration_seconds must be > 0")
    if sample_rate <= 0:
        raise ValueError("sample_rate must be > 0")

    frame_count = int(sample_rate * duration_seconds)
    times = np.linspace(0, duration_seconds, frame_count, endpoint=False)
    omega = 2 * math.pi * frequency
    signal = amplitude * np.sin(omega * times)

    # Clamp to [-1, 1], scale to the int16 range, then truncate to 16-bit PCM.
    pcm16 = (np.clip(signal, -1.0, 1.0) * 32767).astype(np.int16)

    sink = io.BytesIO()
    with wave.open(sink, "wb") as writer:
        writer.setnchannels(1)
        writer.setsampwidth(2)  # 16-bit
        writer.setframerate(sample_rate)
        writer.writeframes(pcm16.tobytes())

    return sink.getvalue()

In [None]:
import os

import requests

url = "https://whisper-transcription-pro.p.rapidapi.com/v1/transcribe"

# Form fields sent alongside the file. The generated snippet also carried a
# placeholder entry with an unterminated multi-line string; it was a syntax
# error and has been dropped.
payload = {"language": "en"}

# The API key is read from the environment so it never lands in the notebook.
# A missing RAPIDAPI_KEY fails fast with KeyError.
headers = {
    "X-RapidAPI-Key": os.environ["RAPIDAPI_KEY"],
    "X-RapidAPI-Host": "whisper-transcription-pro.p.rapidapi.com",
}

# Upload the real file handle (not a string containing the text "open(...)")
# and close it as soon as the request completes.
with open("LJ001-0001.wav", "rb") as audio_file:
    files = {"file": audio_file}
    response = requests.post(
        url, data=payload, files=files, headers=headers, timeout=120
    )

print(response.json())

In [7]:
import os

import requests

url = "https://audio-transcription-speech-to-text-api.p.rapidapi.com/v1/transcribe"

# NOTE(review): a literal RapidAPI key used to be embedded here; treat it as
# leaked and rotate it. The key is now read from the environment, and a
# missing RAPIDAPI_KEY fails fast with KeyError.
headers = {
    "X-RapidAPI-Key": os.environ["RAPIDAPI_KEY"],
    "X-RapidAPI-Host": "audio-transcription-speech-to-text-api.p.rapidapi.com",
}

# open the audio file in binary mode
with open("tests/data/ljs/LJ001-0001.wav", "rb") as f:
    files = {"file": f}
    data = {"language": "en"}  # optional, can be omitted for auto-detect

    # An explicit timeout keeps the cell from hanging on a stalled connection.
    response = requests.post(
        url, headers=headers, files=files, data=data, timeout=120
    )

print(response.status_code)
print(response.json())


200
{'text': 'Printing, in the only sense with which we are at present concerned, differs from most, if not from all, the arts and crafts represented in the exhibition'}


# 3) Single request helper

In [None]:
"""
HTTP helper to POST an audio file to /v1/transcribe.
"""


def transcribe_audio(
    audio_bytes: bytes,
    filename: str = "audio.wav",
    use_rapidapi: bool = False,
    timeout_s: float = 60.0,
) -> Dict[str, Any]:
    """Send one WAV upload to /v1/transcribe (single attempt, no retries).

    Returns:
        A dict with ``status_code``, ``ok``, ``latency_s`` and ``retry_after``
        plus either ``response_json`` or a truncated ``response_text``. When
        the request itself fails, ``status_code`` and ``latency_s`` are None
        and ``error`` describes the failure.
    """
    request_headers = {}
    if use_rapidapi:
        request_headers = RAPIDAPI_HEADERS

    upload = {"file": (filename, io.BytesIO(audio_bytes), "audio/wav")}

    started = time.time()
    try:
        resp = requests.post(
            TRANSCRIBE_URL,
            files=upload,
            headers=request_headers,
            timeout=timeout_s,
        )
    except requests.RequestException as exc:
        return {
            "status_code": None,
            "ok": False,
            "latency_s": None,
            "error": f"request-failed: {exc}",
        }
    latency = time.time() - started

    outcome: Dict[str, Any] = {
        "status_code": resp.status_code,
        "ok": resp.ok,
        "latency_s": latency,
        "retry_after": resp.headers.get("Retry-After"),
    }

    # Prefer the parsed body; fall back to a bounded slice of the raw text.
    try:
        outcome["response_json"] = resp.json()
    except ValueError:
        outcome["response_text"] = resp.text[:2000]

    return outcome

# 4) Concurrency test (fixed)

In [None]:
"""
Concurrency test that creates unique 2s sine WAVs and posts them in parallel.
"""

import concurrent.futures


def test_concurrent_requests(num_requests: int = 6) -> List[Dict[str, Any]]:
    """Post ``num_requests`` synthetic tones in parallel and report outcomes.

    Returns the per-request result dicts (each tagged with ``filename``) in
    completion order, after printing counts and up to two examples per group.

    Raises:
        ValueError: If ``num_requests`` is not positive.
    """
    if num_requests <= 0:
        raise ValueError("num_requests must be > 0")

    print(f"Sending {num_requests} concurrent requests...")

    # A different frequency per request makes each upload unique.
    audio_payloads: List[Tuple[str, bytes]] = [
        (
            f"concurrent_{idx}.wav",
            create_synthetic_audio(duration_seconds=2.0, frequency=440 + (idx * 100)),
        )
        for idx in range(num_requests)
    ]

    results: List[Dict[str, Any]] = []
    started = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as pool:
        pending = {}
        for upload_name, upload_bytes in audio_payloads:
            pending[pool.submit(transcribe_audio, upload_bytes, upload_name)] = upload_name

        for done in concurrent.futures.as_completed(pending):
            try:
                outcome = done.result()
            except requests.RequestException as exc:
                outcome = {
                    "status_code": None,
                    "ok": False,
                    "latency_s": None,
                    "error": f"request-exception: {exc}",
                }
            except Exception as exc:
                outcome = {
                    "status_code": None,
                    "ok": False,
                    "latency_s": None,
                    "error": f"unexpected-exception: {exc}",
                }
            outcome["filename"] = pending[done]
            results.append(outcome)

    total_time = time.time() - started

    # Group by status code in a single pass, keeping completion order.
    successful: List[Dict[str, Any]] = []
    rejected_503: List[Dict[str, Any]] = []
    others: List[Dict[str, Any]] = []
    failed_requests: List[Dict[str, Any]] = []
    for outcome in results:
        code = outcome.get("status_code")
        if code is None:
            failed_requests.append(outcome)
        elif code == 200:
            successful.append(outcome)
        elif code == 503:
            rejected_503.append(outcome)
        else:
            others.append(outcome)

    print(f"\nCompleted in {total_time:.2f} seconds")
    print(f"  Successful:     {len(successful)}")
    print(f"  Rejected (503): {len(rejected_503)}")
    print(f"  Other errors:   {len(others)}")
    print(f"  Failed req:     {len(failed_requests)}")

    if rejected_503:
        print("  Note: 503s are expected when exceeding MAX_CONCURRENT_JOBS.")
        seen_retry_after = sorted(
            {r.get("retry_after") for r in rejected_503 if r.get("retry_after")}
        )
        print(f"  Retry-After values seen: {seen_retry_after}")

    def _peek(label: str, items: List[Dict[str, Any]], n: int = 2) -> None:
        """Print the first ``n`` entries of a group with a short preview."""
        print(f"\n{label} (showing up to {n}):")
        for entry in items[:n]:
            print(
                f"  • {entry.get('filename')}: "
                f"status={entry.get('status_code')} "
                f"latency={entry.get('latency_s')}"
            )
            if "error" in entry:
                print(f"    error: {entry['error']}")
                continue
            if "response_json" in entry:
                # Only the start of the body, to keep the output readable.
                preview = json.dumps(entry["response_json"])[:160]
                print(f"    json: {preview}...")
                continue
            print(f"    text: {entry.get('response_text', '')[:160]}...")

    for label, group in (
        ("Successful", successful),
        ("Rejected (503)", rejected_503),
        ("Other errors", others),
        ("Failed requests", failed_requests),
    ):
        _peek(label, group)

    return results

In [None]:
def transcribe_audio(
    audio_bytes: bytes,
    filename: str = "test.wav",
    headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """Send audio to the API for transcription and report size/timing stats.

    Args:
        audio_bytes: Raw bytes of the audio file to upload.
        filename: File name reported in the multipart upload.
        headers: Optional extra HTTP headers; None means no extra headers.

    Returns:
        On an HTTP response: ``status_code``, ``elapsed_seconds``, ``size_kb``
        and ``kb_per_second``, plus ``text``/``text_length`` for a 200 or
        ``error`` (the response body) otherwise. A 200 whose body is not a
        JSON object still reports its status, with an empty ``text`` and an
        ``error`` note. If the request itself fails: ``status_code`` of None,
        ``error`` and ``elapsed_seconds``.
    """
    files = {"file": (filename, io.BytesIO(audio_bytes), "audio/wav")}
    size_kb = len(audio_bytes) / 1024

    start_time = time.time()
    try:
        response = requests.post(
            TRANSCRIBE_URL,
            files=files,
            headers=headers or {},
            timeout=60
        )
        elapsed = time.time() - start_time

        result: Dict[str, Any] = {
            "status_code": response.status_code,
            "elapsed_seconds": elapsed,
            "size_kb": size_kb,
            "kb_per_second": size_kb / elapsed if elapsed > 0 else 0
        }

        if response.status_code == 200:
            # Parse defensively: a bad body must not discard the status and
            # timing information collected above.
            try:
                body = response.json()
            except ValueError:
                body = None
            if isinstance(body, dict):
                text = body.get("text") or ""
            else:
                text = ""
                result["error"] = f"unexpected 200 body: {response.text[:200]}"
            result["text"] = text
            result["text_length"] = len(text)
        else:
            result["error"] = response.text

        return result

    except requests.exceptions.Timeout:
        return {
            "status_code": None,
            "error": "Request timed out",
            "elapsed_seconds": time.time() - start_time,
        }
    except Exception as e:
        # Deliberately broad: this helper reports failures instead of raising.
        return {
            "status_code": None,
            "error": str(e),
            "elapsed_seconds": time.time() - start_time,
        }




# 5) Run the tests

In [None]:
# Check health in this cell: `api_is_running` is otherwise only assigned by a
# later cell, so a fresh-kernel run would stop here with a NameError.
api_is_running = check_api_health()
if not api_is_running:
    raise SystemExit("API is not running; start it first: ./start.sh start")

# Warm-up: a single quick request so the model/cache initializes
warmup_audio = create_synthetic_audio(duration_seconds=1.0, frequency=500.0)
warmup = transcribe_audio(warmup_audio, "warmup.wav")
# The transcribe_audio variants in this notebook report timing under either
# "latency_s" or "elapsed_seconds", depending on which one was defined last.
warmup_latency = warmup.get("latency_s", warmup.get("elapsed_seconds"))
print("Warm-up:", warmup.get("status_code"), warmup_latency)

# Concurrency: with MAX_CONCURRENT_JOBS=4, expect 4x 200 and ~2x 503 for 6 req
concurrent_results = test_concurrent_requests(6)

In [None]:
"""
HTTP session configuration for stable concurrent posting.
"""

from __future__ import annotations

import time
from typing import Any, Dict, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry


def build_session(pool_maxsize: int = 16, total_retries: int = 0) -> requests.Session:
    """Build a Session with a larger connection pool and an explicit Retry policy.

    The same adapter instance is mounted for both ``http://`` and
    ``https://``. ``total_retries`` is passed to the Retry policy as
    ``total``; backoff is left to the caller.
    """
    # Retry/backoff is implemented around the call site, so the adapter-level
    # policy carries no backoff and no forced-retry status codes.
    retry_policy = Retry(
        total=total_retries,
        backoff_factor=0,
        status_forcelist=(),
        allowed_methods=False,
        raise_on_status=False,
        raise_on_redirect=False,
    )
    pooled_adapter = HTTPAdapter(
        pool_connections=pool_maxsize,
        pool_maxsize=pool_maxsize,
        max_retries=retry_policy,
    )

    session = requests.Session()
    for prefix in ("http://", "https://"):
        session.mount(prefix, pooled_adapter)
    return session


# Shared session for the request helpers defined after this cell.
HTTP_SESSION = build_session(pool_maxsize=32, total_retries=0)

In [None]:
"""
Robust POST to /v1/transcribe with retry/backoff and no keep-alive.
"""

import io
import json


def transcribe_audio(
    audio_bytes: bytes,
    filename: str = "audio.wav",
    use_rapidapi: bool = False,
    timeout_s: float = 120.0,
    max_attempts: int = 3,
) -> Dict[str, Any]:
    """POST a WAV file to /v1/transcribe with retry/backoff.

    Request-level errors and HTTP 503 replies are retried until
    ``max_attempts`` attempts have been made. Any other status is returned
    immediately.

    Args:
        audio_bytes: Raw bytes of the WAV file to upload.
        filename: File name reported in the multipart upload.
        use_rapidapi: When True, add RAPIDAPI_HEADERS to the request.
        timeout_s: Per-attempt request timeout in seconds.
        max_attempts: Maximum number of attempts (at least one is always made).

    Returns:
        A dict with ``status_code``, ``ok`` and ``latency_s`` plus, depending
        on the outcome, ``response_json``, ``response_text``, ``retry_after``
        or ``error``. ``status_code`` and ``latency_s`` are None when no HTTP
        response was obtained.
    """
    url = TRANSCRIBE_URL
    base_headers: Dict[str, str] = {"Connection": "close"}
    if use_rapidapi:
        base_headers.update(RAPIDAPI_HEADERS)

    attempt = 0

    while True:
        attempt += 1
        # Build the payload inside the loop: a BytesIO that was already read
        # for one attempt is at end-of-stream, so reusing it on a retry would
        # upload an empty file.
        files = {
            "file": (filename, io.BytesIO(audio_bytes), "audio/wav"),
        }
        t0 = time.time()
        try:
            resp = HTTP_SESSION.post(
                url,
                files=files,
                headers=base_headers,
                timeout=timeout_s,
            )
            latency = time.time() - t0
        except requests.exceptions.RequestException as exc:
            # Retry on low-level connection issues.
            if attempt < max_attempts:
                sleep_s = min(0.2 * attempt, 1.0)
                time.sleep(sleep_s)
                continue
            return {
                "status_code": None,
                "ok": False,
                "latency_s": None,
                "error": f"request-failed: {exc}",
            }

        # Successful HTTP response
        if resp.status_code == 200:
            try:
                payload = resp.json()
            except ValueError:
                payload = {"text": None, "raw": resp.text[:2000]}
            return {
                "status_code": 200,
                "ok": True,
                "latency_s": latency,
                "response_json": payload,
            }

        # Respect 503 Retry-After if present and numeric
        if resp.status_code == 503 and attempt < max_attempts:
            retry_after = resp.headers.get("Retry-After")
            try:
                sleep_s = float(retry_after)
            except (TypeError, ValueError):
                sleep_s = min(0.5 * attempt, 2.0)
            time.sleep(sleep_s)
            continue

        # Other non-200s: return details for debugging
        result: Dict[str, Any] = {
            "status_code": resp.status_code,
            "ok": False,
            "latency_s": latency,
            "retry_after": resp.headers.get("Retry-After"),
        }
        try:
            result["response_json"] = resp.json()
        except ValueError:
            result["response_text"] = resp.text[:2000]
        return result

In [None]:
"""
Fire num_requests concurrent POSTs; small stagger to avoid socket thrash.
"""

import concurrent.futures
from typing import List, Tuple


def test_concurrent_requests(num_requests: int = 6) -> List[Dict[str, Any]]:
    """Fire ``num_requests`` staggered parallel uploads and summarize them.

    Each upload is a 2-second tone at its own frequency. Submissions are
    spaced 20 ms apart. Returns the result dicts (tagged with ``filename``)
    in completion order.

    Raises:
        ValueError: If ``num_requests`` is not positive.
    """
    if num_requests <= 0:
        raise ValueError("num_requests must be > 0")

    print(f"Sending {num_requests} concurrent requests...")

    audio_payloads: List[Tuple[str, bytes]] = [
        (
            f"concurrent_{i}.wav",
            create_synthetic_audio(duration_seconds=2.0, frequency=440 + (i * 100)),
        )
        for i in range(num_requests)
    ]

    results: List[Dict[str, Any]] = []
    started = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as pool:
        name_by_future = {}
        for upload_name, upload_bytes in audio_payloads:
            # Small stagger reduces chance of RemoteDisconnected with keep-alives.
            time.sleep(0.02)  # 20 ms
            name_by_future[pool.submit(transcribe_audio, upload_bytes, upload_name)] = upload_name

        for finished in concurrent.futures.as_completed(name_by_future):
            try:
                outcome = finished.result()
            except Exception as exc:
                outcome = {
                    "status_code": None,
                    "ok": False,
                    "latency_s": None,
                    "error": f"unexpected-exception: {exc}",
                }
            outcome["filename"] = name_by_future[finished]
            results.append(outcome)

    total_time = time.time() - started

    def _with_status(predicate) -> List[Dict[str, Any]]:
        """Select results whose status code satisfies ``predicate``."""
        return [r for r in results if predicate(r.get("status_code"))]

    successful = _with_status(lambda code: code == 200)
    rejected_503 = _with_status(lambda code: code == 503)
    others = _with_status(lambda code: code is not None and code not in (200, 503))
    failed_requests = _with_status(lambda code: code is None)

    print(f"\nCompleted in {total_time:.2f} seconds")
    print(f"  Successful:     {len(successful)}")
    print(f"  Rejected (503): {len(rejected_503)}")
    print(f"  Other errors:   {len(others)}")
    print(f"  Failed req:     {len(failed_requests)}")

    if rejected_503:
        print("  Note: 503s are expected when exceeding MAX_CONCURRENT_JOBS.")
        retry_vals = [
            value
            for value in {r.get("retry_after") for r in rejected_503}
            if value
        ]
        if retry_vals:
            print(f"  Retry-After values seen: {sorted(retry_vals)}")

    def _peek(label: str, items: List[Dict[str, Any]], n: int = 2) -> None:
        """Print up to ``n`` entries of a group with a short body preview."""
        print(f"\n{label} (up to {n}):")
        for entry in items[:n]:
            print(
                f"  • {entry.get('filename')}: "
                f"status={entry.get('status_code')} "
                f"latency={entry.get('latency_s')}"
            )
            if "error" in entry:
                print(f"    error: {entry['error']}")
                continue
            if "response_json" in entry:
                preview = json.dumps(entry["response_json"])[:160]
                print(f"    json: {preview}...")
                continue
            print(f"    text: {entry.get('response_text', '')[:160]}...")

    for label, group in (
        ("Successful", successful),
        ("Rejected (503)", rejected_503),
        ("Other errors", others),
        ("Failed requests", failed_requests),
    ):
        _peek(label, group)

    return results

In [None]:
# Re-check health and keep the result in `api_is_running`; stop the cell if
# the API did not answer.
api_is_running = check_api_health()
if not api_is_running:
    raise SystemExit("API not running; start with ./start.sh start")

# Warm-up so models/caches are ready: one 1-second, 500 Hz tone.
warmup_audio = create_synthetic_audio(duration_seconds=1.0, frequency=500.0)
warmup = transcribe_audio(warmup_audio, "warmup.wav")
print("Warm-up:", warmup.get("status_code"), warmup.get("latency_s"))

# Now concurrency (expect ~4 OK + ~2 503 if MAX_CONCURRENT_JOBS=4).
# NOTE(review): the retrying transcribe_audio variants retry 503 replies, so
# the summary may show fewer 503s than the server actually returned.
concurrent_results = test_concurrent_requests(6)

## 4. Concurrent Request Testing


In [None]:
def test_concurrent_requests(num_requests: int = 6):
    """Test how the API handles multiple concurrent requests.

    Builds one 2-second tone per request (each at a different frequency),
    posts them all at once from a thread pool, and prints how many finished
    with 200, with 503, or with anything else.

    Args:
        num_requests: Number of parallel uploads; must be positive.

    Returns:
        The list of result dicts, each tagged with ``filename``, in
        completion order.

    Raises:
        ValueError: If ``num_requests`` is not positive.
    """
    if num_requests <= 0:
        raise ValueError("num_requests must be > 0")

    print(f"Sending {num_requests} concurrent requests...")

    # Create different audio files for each request
    audio_files = []
    for i in range(num_requests):
        # Different frequencies to create unique files
        freq = 440 + (i * 100)
        # create_synthetic_audio takes no `filename` argument; the upload
        # name is carried separately in the tuple below.
        audio = create_synthetic_audio(
            duration_seconds=2.0,
            frequency=freq,
        )
        audio_files.append((f"concurrent_{i}.wav", audio))

    results = []
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_requests) as executor:
        # Submit all requests at once
        futures = {
            executor.submit(transcribe_audio, audio, filename): filename
            for filename, audio in audio_files
        }

        # Collect results as they complete
        for future in concurrent.futures.as_completed(futures):
            filename = futures[future]
            try:
                result = future.result()
            except Exception as exc:
                # One failing worker must not discard the other results.
                result = {
                    "status_code": None,
                    "error": f"unexpected-exception: {exc}",
                }
            result["filename"] = filename
            results.append(result)

    total_time = time.time() - start_time

    # Analyze results ("other" includes requests that produced no status)
    successful = [r for r in results if r.get("status_code") == 200]
    failed_503 = [r for r in results if r.get("status_code") == 503]
    other_errors = [r for r in results if r.get("status_code") not in [200, 503]]

    print(f"\nCompleted in {total_time:.2f} seconds")
    print(f"  Successful: {len(successful)}")
    print(f"  Rejected (503): {len(failed_503)}")
    print(f"  Other errors: {len(other_errors)}")

    if failed_503:
        print("\n  Note: 503 errors are expected when exceeding MAX_CONCURRENT_JOBS")

    return results

In [None]:
# Send 6 requests at once. The server's concurrency limit is presumably 4
# (MAX_CONCURRENT_JOBS) — confirm against the server config; above that limit
# some requests may be answered with 503.
concurrent_results = test_concurrent_requests(6)

## 5. Test Different Audio Formats


In [None]:
def test_real_audio_files():
    """Transcribe whichever of a few known audio files exist and print results.

    Works with both result shapes produced by the ``transcribe_audio``
    variants in this notebook: timing under ``elapsed_seconds`` or
    ``latency_s``, and the transcript either as a top-level ``text`` or
    nested inside ``response_json``.

    Returns:
        None. Prints a notice and returns early when no candidate file exists.
    """
    test_paths = [
        Path("tests/data/ljs/LJ001-0001.wav"),
        Path("test_data/sample.mp3"),
        Path("audio.wav"),
    ]

    found_files = [path for path in test_paths if path.exists()]

    if not found_files:
        print("No real audio files found. Create some in test_data/ to test.")
        return

    print(f"Found {len(found_files)} real audio files:\n")

    for audio_path in found_files:
        print(f"Testing: {audio_path}")
        audio_bytes = audio_path.read_bytes()

        result = transcribe_audio(
            audio_bytes,
            audio_path.name
        )
        body = result.get("response_json")

        if result.get("status_code") == 200:
            # Indexing result['elapsed_seconds'] / result['text'] directly
            # raises KeyError with the latency_s/response_json result shape.
            elapsed = result.get("elapsed_seconds", result.get("latency_s"))
            text = result.get("text")
            if text is None and isinstance(body, dict):
                text = body.get("text")
            text = text or ""
            timing = f"{elapsed:.2f}s" if elapsed is not None else "n/a"
            print(f"  ✅ Transcribed in {timing}")
            print(f"     Text: '{text[:100]}...'")
        else:
            # Fall back to the response body when there is no "error" entry.
            detail = result.get("error")
            if detail is None:
                detail = body if body is not None else result.get("response_text")
            print(f"  ❌ Failed: {detail}")
        print()

In [None]:
# Run the real-file check; it prints a notice and returns if none of its
# candidate files exist.
test_real_audio_files()

## 6. Memory and Performance Monitoring

In [None]:
# %%
def monitor_api_performance(
    num_requests: int = 5,
    delay_seconds: float = 0.5,
    timeout_s: float = 10.0,
):
    """Print API metrics and memory usage before and after a batch of requests.

    Reads METRICS_URL and HEALTH_URL, sends ``num_requests`` one-second
    synthetic clips through ``transcribe_audio``, reads both endpoints again,
    and prints the before/after values together with the deltas.

    Args:
        num_requests: How many transcription requests to send.
        delay_seconds: Pause after each request; values <= 0 disable it.
        timeout_s: Timeout for each metrics/health GET so that an
            unresponsive server cannot hang the cell indefinitely.

    Returns:
        None. Everything is reported on stdout.

    Raises:
        KeyError: If a metrics/health payload lacks an expected field.
    """

    def _get_json(url):
        # Always pass a timeout: requests waits forever by default.
        return requests.get(url, timeout=timeout_s).json()

    print("Monitoring API performance...\n")

    # Get initial metrics
    metrics_before = _get_json(METRICS_URL)
    health_before = _get_json(HEALTH_URL)

    print("Initial State:")
    print(f"  Total Requests: {metrics_before['total_requests']}")
    print(f"  Memory Usage: {health_before['memory_percent']:.1f}%")
    print()

    # Make several requests
    print(f"Making {num_requests} test requests...")
    for i in range(num_requests):
        audio = create_synthetic_audio(duration_seconds=1.0)
        result = transcribe_audio(audio, f"perf_test_{i}.wav")

        # A failed request may carry no timing; report it instead of crashing.
        elapsed = result.get("elapsed_seconds")
        if elapsed is None:
            reason = result.get("error") or result.get("status_code", "no status")
            print(f"  Request {i+1}: failed ({reason})")
        else:
            print(f"  Request {i+1}: {elapsed:.2f}s")

        if delay_seconds > 0:
            time.sleep(delay_seconds)  # Small delay between requests

    # Get final metrics
    metrics_after = _get_json(METRICS_URL)
    health_after = _get_json(HEALTH_URL)

    print("\nFinal State:")
    print(f"  Total Requests: {metrics_after['total_requests']}")
    print(f"  Accepted: {metrics_after['accepted_requests']}")
    print(f"  Rejected: {metrics_after['rejected_requests']}")
    print(f"  Avg Processing Time: {metrics_after['avg_processing_time_ms']:.1f}ms")
    print(f"  Memory Usage: {health_after['memory_percent']:.1f}%")

    # Calculate changes
    requests_processed = metrics_after['total_requests'] - metrics_before['total_requests']
    memory_change = health_after['memory_percent'] - health_before['memory_percent']

    print("\nChanges:")
    print(f"  Requests Processed: {requests_processed}")
    print(f"  Memory Change: {memory_change:+.1f}%")

In [None]:
# Prints metrics/health before and after 5 one-second synthetic requests (0.5 s pause after each).
monitor_api_performance()

## 7. Error Handling Tests

In [None]:
def test_error_handling():
    """Test various error conditions."""
    print("Testing error handling:\n")
    
    # Test 1: Oversized file
    print("1. Testing oversized file (>200MB)...")
    huge_data = b"x" * (201 * 1024 * 1024)  # 201MB
    result = transcribe_audio(huge_data, "huge.wav")
    expected = 413
    actual = result.get("status_code", "no status")
    print(f"   Expected: {expected}, Got: {actual}")
    print(f"   {'✅ PASS' if actual == expected else '❌ FAIL'}")
    print()
    
    # Test 2: Wrong file type
    print("2. Testing wrong content type...")
    files = {"file": ("test.txt", io.BytesIO(b"not audio"), "text/plain")}
    response = requests.post(TRANSCRIBE_URL, files=files)
    expected = 415
    actual = response.status_code
    print(f"   Expected: {expected}, Got: {actual}")
    print(f"   {'✅ PASS' if actual == expected else '❌ FAIL'}")
    print()
    
    # Test 3: Memory pressure (simulate)
    print("3. Checking memory threshold behavior...")
    health = requests.get(HEALTH_URL).json()
    print(f"   Current memory: {health['memory_percent']:.1f}%")
    print(f"   Threshold: 85.0%")
    if health['memory_percent'] > 85:
        print("   ⚠️  Memory is high, requests may be rejected")
    else:
        print("   ✅ Memory is healthy")

In [None]:
# Heads-up: the oversized-file case builds a ~201 MB payload in this kernel's memory before sending it.
test_error_handling()

## 8. Configuration Testing


In [None]:
# %%
def test_with_different_models():
    """Print a how-to for switching Whisper models and a size/speed cheat sheet.

    No request is made here: the function only reports the MODEL_NAME value
    found in the process environment ("unknown" when unset), lists the manual
    steps for each candidate model, and prints the expected tradeoffs.
    """
    candidate_models = (
        "mlx-community/whisper-tiny-mlx",
        "mlx-community/whisper-base-mlx",
        "mlx-community/whisper-large-v3-mlx",
    )
    tradeoff_rows = (
        "  tiny:  ~39MB, fastest, least accurate",
        "  base:  ~74MB, balanced",
        "  large: ~1.5GB, slowest, most accurate",
    )

    print("Testing different models (requires .env changes and restart):\n")
    active_model = os.getenv("MODEL_NAME", "unknown")
    print("Current model from .env:", active_model)

    # The same three manual steps are repeated for every candidate model.
    print("\nTo test different models:")
    for model_id in candidate_models:
        steps = (
            f"  1. Set MODEL_NAME={model_id}",
            "  2. Restart API: ./start.sh restart",
            "  3. Run this cell again",
        )
        print(*steps, sep="\n")

    print("\nModel size vs speed tradeoff:")
    print(*tradeoff_rows, sep="\n")

In [None]:
# Prints model-switching instructions only; no API request is made.
test_with_different_models()

## 9. RapidAPI Testing (When Ready)

In [None]:
# %%
def test_rapidapi_auth():
    """Compare a transcription request sent with and without RapidAPI headers.

    When RAPIDAPI_PROXY_SECRET is unset or empty in the environment, a
    reminder is printed and nothing else happens. Otherwise the same
    synthetic clip is submitted twice (once plain, once with
    RAPIDAPI_HEADERS) and the status code of each attempt is printed.
    """
    print("Testing RapidAPI authentication:\n")

    # Nothing to verify until the proxy secret has been configured.
    if not os.environ.get("RAPIDAPI_PROXY_SECRET"):
        print("⚠️  RAPIDAPI_PROXY_SECRET not set in environment")
        print("   Set it when ready for RapidAPI deployment")
        return

    def _show(label, outcome):
        # A result without a status code is reported as 'error'.
        print(f"{label}: {outcome.get('status_code', 'error')}")

    print("Testing with RapidAPI headers...")
    clip = create_synthetic_audio(duration_seconds=1.0)

    # Plain request first (should work now, fail in production) ...
    _show("Without auth", transcribe_audio(clip, "no_auth.wav"))
    # ... then the same clip with the RapidAPI headers attached.
    _show("With auth", transcribe_audio(clip, "with_auth.wav", RAPIDAPI_HEADERS))

    print("\nNote: Auth will be enforced when REQUIRE_AUTH=true in production")

In [None]:
# Prints a reminder and returns early unless RAPIDAPI_PROXY_SECRET is set in the environment.
test_rapidapi_auth()