# Optimization: Model Migration
----

This notebook focuses on **Workload Optimization** for model migration (e.g., GPT-4.x → GPT-5.x) using the **Azure OpenAI Responses API**.

You will learn:
- What changes when migrating to GPT-5.x (quality/latency/cost/reasoning controls)
- Responses API parameter differences (e.g., `max_output_tokens` replaces `max_tokens`)
- Migration checklist (regions, model availability, deployment naming, caching, evaluation)
- Weighted routing (canary rollout) across two models and how to analyze the results

## Table of contents

- [Why Model Migration Needs a Plan](#why-model-migration-needs-a-plan)
- [Migration Checklist (GPT-4.x → GPT-5.x)](#migration-checklist-gpt-4x--gpt-5x)
- [Responses API Migration Notes](#responses-api-migration-notes)
- [Region / Model Availability Checks](#region--model-availability-checks)
- [Example: APIM Weighted Routing + Analysis](#example-apim-weighted-routing--analysis)
- [Wrap-up](#wrap-up)

## Why Model Migration Needs a Plan

Model migration is not just a “drop-in replacement”. Even when an answer looks correct, the system-level behavior can change:

- **Output shape**: verbosity, formatting, instruction-following fidelity
- **Latency profile**: time-to-first-token (streaming) and tail latency
- **Cost profile**: tokenization differences + caching behavior + pricing deltas
- **Reasoning controls**: GPT-5.x introduces explicit reasoning controls (`reasoning.effort`)
- **Availability**: model versions and SKUs vary by **region** and **deployment type**

A practical approach is to run a controlled migration: **evaluate → canary → ramp → finalize**, with routing weights and measurable acceptance criteria.
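
The ramp decision can be expressed as a small gate function. The sketch below is illustrative only: the `CanaryMetrics` fields, the `next_stage` helper, and the threshold defaults are assumptions introduced here, not values prescribed by this notebook. The stage weights mirror the ramp used in the checklist (95/5, 80/20, 50/50, 0/100).

```python
from dataclasses import dataclass
from typing import List, Tuple

# (baseline %, candidate %) per rollout stage.
RAMP_STAGES: List[Tuple[int, int]] = [(95, 5), (80, 20), (50, 50), (0, 100)]


@dataclass
class CanaryMetrics:
    """Hypothetical per-model metrics collected during one rollout stage."""
    error_rate: float     # fraction of requests that failed (e.g., 429/5xx)
    p95_latency_s: float  # 95th percentile latency in seconds
    accuracy: float       # fraction of evaluation items answered correctly


def next_stage(
    current: int,
    baseline: CanaryMetrics,
    candidate: CanaryMetrics,
    *,
    max_error_delta: float = 0.01,    # assumed acceptance threshold
    max_latency_ratio: float = 1.2,   # assumed acceptance threshold
    max_accuracy_drop: float = 0.02,  # assumed acceptance threshold
) -> int:
    """Return the next stage index: advance if all criteria pass, else step back."""
    passed = (
        candidate.error_rate - baseline.error_rate <= max_error_delta
        and candidate.p95_latency_s <= baseline.p95_latency_s * max_latency_ratio
        and candidate.accuracy >= baseline.accuracy - max_accuracy_drop
    )
    if not passed:
        return max(current - 1, 0)
    return min(current + 1, len(RAMP_STAGES) - 1)
```

Keeping the criteria in one function makes each ramp decision reproducible and easy to review after the fact.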

## Migration Checklist (GPT-4.x → GPT-5.x)

Use this checklist before you switch production traffic.

### 1) Inventory & Compatibility

- Confirm your current **deployment names**, **API version**, and any gateway/proxy policies (APIM, custom router).
- Identify prompts that rely on “quirks” (formatting, strict JSON, special tokens) and add contract tests.
- Decide whether you need **streaming** and measure **TTFT** separately from total latency.
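
A contract test for a strict-JSON prompt can be as simple as parsing the model output and checking for required keys. This is a minimal sketch; `check_json_contract` and the key names in the usage example are hypothetical, and a production contract would likely validate types or a full schema as well.

```python
import json
from typing import Any, Dict, Iterable


def check_json_contract(raw_output: str, required_keys: Iterable[str]) -> Dict[str, Any]:
    """Check that a model output is a JSON object containing all required keys."""
    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError as exc:
        return {"ok": False, "reason": f"not valid JSON: {exc.msg}"}
    if not isinstance(parsed, dict):
        return {"ok": False, "reason": "top-level value is not an object"}
    missing = sorted(set(required_keys) - set(parsed))
    if missing:
        return {"ok": False, "reason": f"missing keys: {missing}"}
    return {"ok": True, "reason": ""}


# Usage: run the same check against baseline and candidate outputs.
print(check_json_contract('{"answer": "42", "confidence": 0.9}', ["answer", "confidence"]))
print(check_json_contract('Sure! {"answer": "42"}', ["answer"]))
```

Running the same check over both models' outputs shows quickly whether the candidate adds wrapper text or drops fields that downstream code depends on.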

### 2) Parameter & API Differences (High impact)

- If you migrate to **Responses API**, you typically use:
  - `instructions` + `input` (instead of Chat Completions `messages`)
  - `max_output_tokens` (instead of `max_tokens`)
  - `reasoning: { effort: ... }` for GPT-5.x reasoning control
- If you keep Chat Completions, expect some parameter names and behaviors to differ by model family and SDK version.

### 3) Region / SKU / Model Availability

- **Do not assume** the same model/version exists in every region.
- Verify the target model is available in your region and SKU (GlobalStandard / Standard / PTU, etc.).
- If you use multi-region routing, validate cross-region latency and compliance requirements.

### 4) Evaluation & Rollout

- Run an evaluation set representative of your workload: accuracy, refusal rate, formatting compliance, safety constraints.
- Start with a **canary weight** (e.g., 95/5), then ramp (80/20 → 50/50 → 0/100) based on metrics.
- Monitor: error rate (429/5xx), latency percentiles, cache hit %, and cost/request.
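
The monitored metrics can be computed from per-request records. The sketch below assumes a hypothetical record shape (`status`, `latency`, `input_tokens`, `cached_tokens`) and uses a nearest-rank percentile; adapt both to whatever your logging actually captures.

```python
import math
from typing import Any, Dict, List


def percentile(values: List[float], pct: float) -> float:
    """Nearest-rank percentile (pct in 0..100) of a non-empty list."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def summarize(records: List[Dict[str, Any]]) -> Dict[str, float]:
    """Summarize error rate, latency percentiles, and cache hit % for one model."""
    if not records:
        raise ValueError("records must not be empty")
    succeeded = [r for r in records if r["status"] < 400]
    failed = [r for r in records if r["status"] == 429 or r["status"] >= 500]
    latencies = [r["latency"] for r in succeeded]
    input_tokens = sum(r.get("input_tokens", 0) for r in succeeded)
    cached_tokens = sum(r.get("cached_tokens", 0) for r in succeeded)
    return {
        "requests": len(records),
        "error_rate": len(failed) / len(records),
        "p50_latency_s": percentile(latencies, 50) if latencies else float("nan"),
        "p95_latency_s": percentile(latencies, 95) if latencies else float("nan"),
        "cache_hit_pct": 100.0 * cached_tokens / input_tokens if input_tokens else 0.0,
    }
```

Computing the same summary per model (or per routed backend) gives directly comparable numbers for each ramp decision.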

## Responses API Migration Notes

If you are moving from Chat Completions to Responses API, the most common “gotchas” are parameter names and payload shape changes.

### Common Mapping

| Legacy (Chat Completions) | Responses API | Notes |
|---|---|---|
| `messages=[{role, content}]` | `instructions=...` + `input=...` | Split system vs user content |
| `max_tokens` | `max_output_tokens` | Responses API uses output token cap |
| `stream=True` | `stream=True` | Streaming event types differ |
| `temperature`, `top_p` | `temperature`, `top_p` | Still exist, but defaults/behavior can vary by model |
| (N/A) | `reasoning={effort: ...}` | GPT-5.x reasoning control (optional) |

### Important Notes

- If your code still passes `max_tokens`, the call may ignore it or fail with an error, depending on the SDK and API version. Use `max_output_tokens` instead.
- GPT-5.x may behave differently with the same prompt; treat it as a **behavioral migration** and re-run evaluations.
- Prompt caching in Azure requires a sufficiently large stable prefix; align your `instructions` / prefix for cache eligibility.
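
The mapping table can be applied mechanically to existing call sites. The helper below is a sketch, not an official migration utility: `chat_to_responses_kwargs` is a hypothetical name, and it assumes every message `content` is a plain string (no tool calls, images, or other structured content).

```python
from typing import Any, Dict, List

SYSTEM_ROLES = ("system", "developer")


def chat_to_responses_kwargs(chat_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Translate Chat Completions-style kwargs into Responses API-style kwargs."""
    messages: List[Dict[str, str]] = chat_kwargs.get("messages", [])
    out: Dict[str, Any] = {"model": chat_kwargs["model"]}

    # System-level content becomes `instructions`.
    system_parts = [m["content"] for m in messages if m["role"] in SYSTEM_ROLES]
    if system_parts:
        out["instructions"] = "\n\n".join(system_parts)

    # A single user message becomes a plain string; otherwise keep the turn list.
    turns = [m for m in messages if m["role"] not in SYSTEM_ROLES]
    if len(turns) == 1 and turns[0]["role"] == "user":
        out["input"] = turns[0]["content"]
    else:
        out["input"] = turns

    # `max_tokens` is renamed; sampling and streaming flags keep their names.
    if "max_tokens" in chat_kwargs:
        out["max_output_tokens"] = chat_kwargs["max_tokens"]
    for name in ("temperature", "top_p", "stream"):
        if name in chat_kwargs:
            out[name] = chat_kwargs[name]
    return out
```

The result can be passed as `client.responses.create(**kwargs)`. Streaming consumers still need to be rewritten by hand because the event types differ.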

In [None]:
# Environment setup and imports
import os
import sys
import time
import json
import random
import statistics
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from dotenv import load_dotenv

load_dotenv(override=True)

# Verify required packages
try:
    from openai import AzureOpenAI
    print("✅ Azure OpenAI package imported successfully")
except ImportError:
    print("❌ ERROR: openai package not installed.")
    print('   Run: pip install "openai>=1.60.0"')
    raise

# Load Foundry project settings (created by 0_setup/1_setup.ipynb)
config_file = "../0_setup/.foundry_config.json"
try:
    with open(config_file, "r", encoding="utf-8") as f:
        config = json.load(f)
    print(f"✅ Loaded settings from '{config_file}'")
except FileNotFoundError:
    print(f"⚠️ Could not find '{config_file}'.")
    print("💡 Run 0_setup/1_setup.ipynb first to create it.")
    config = {}

AZURE_OPENAI_ENDPOINT = os.environ.get("AZURE_OPENAI_ENDPOINT", "")
AZURE_OPENAI_API_KEY = os.environ.get("AZURE_OPENAI_API_KEY", "")
AZURE_OPENAI_API_VERSION = "2025-04-01-preview"

print(f"\n📌 Azure OpenAI Endpoint: {AZURE_OPENAI_ENDPOINT[:50]}..." if AZURE_OPENAI_ENDPOINT else "⚠️ AZURE_OPENAI_ENDPOINT not set")
print(f"📌 API Key: {'✅ Set' if AZURE_OPENAI_API_KEY else '⚠️ Not set'}")

client: Optional[AzureOpenAI] = None
if AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY:
    client = AzureOpenAI(
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AZURE_OPENAI_API_VERSION,
    )
    print("✅ Azure OpenAI client initialized")
    # With `azure_endpoint` + `api_version`, requests go to {endpoint}/openai/...?api-version=...
    print(f"   Base URL: {AZURE_OPENAI_ENDPOINT.rstrip('/')}/openai/")
else:
    print("\n⚠️ Live calls disabled (missing endpoint/key). You can still read the notebook.")

In [28]:
# Pricing (per 1M tokens) - update to match your contract
PRICING: Dict[str, Dict[str, float]] = {
    "gpt-4o": {"input": 2.50, "cached_input": 1.25, "output": 10.00},
    "gpt-5.1": {"input": 1.25, "cached_input": 0.13, "output": 10.00},
}

# Select TWO models for a migration experiment
MODEL_A = {
    "label": "gpt-4o (baseline)",
    "api_model": "gpt-4o",
    "price_key": "gpt-4o",
    "reasoning_effort": None,
}
MODEL_B = {
    "label": "gpt-5.1 (candidate)",
    "api_model": "gpt-5.1",
    "price_key": "gpt-5.1",
    "reasoning_effort": "low",  # none/low/medium/high (only for GPT-5.x)
}

MODELS = [MODEL_A, MODEL_B]
MODEL_LABELS = [m["label"] for m in MODELS]

# Validate price keys exist
for m in MODELS:
    if m["price_key"] not in PRICING:
        raise KeyError(f"Missing PRICING entry for price_key='{m['price_key']}'")

print("🧪 Models Configured")
print("=" * 60)
for m in MODELS:
    print(f"  • {m['label']}: api_model={m['api_model']} price_key={m['price_key']} reasoning_effort={m.get('reasoning_effort')}")
print("=" * 60)

🧪 Models Configured
  • gpt-4o (baseline): api_model=gpt-4o price_key=gpt-4o reasoning_effort=None
  • gpt-5.1 (candidate): api_model=gpt-5.1 price_key=gpt-5.1 reasoning_effort=low


In [29]:
def calculate_cost(price_key: str, input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> float:
    """Calculate request cost in USD using PRICING (per 1M tokens)."""
    pricing = PRICING[price_key]
    uncached_tokens = max(0, input_tokens - cached_tokens)
    return (
        uncached_tokens * pricing["input"]
        + cached_tokens * pricing["cached_input"]
        + output_tokens * pricing["output"]
    ) / 1_000_000


def check_answer(response: str, correct_variants: List[str]) -> bool:
    """Simple keyword-based check (use a stronger eval for production)."""
    text = (response or "").lower().strip()
    return any(v.lower() in text for v in correct_variants)


def call_with_responses_api(
    client: AzureOpenAI,
    api_model: str,
    instructions: str,
    user_input: str,
    *,
    max_output_tokens: int = 200,
    reasoning_effort: Optional[str] = None,
    prompt_cache_key: Optional[str] = None,
    stream: bool = False,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
) -> Dict[str, Any]:
    """
    Wrapper for Azure OpenAI Responses API calls.

    Notes:
    - Responses API uses `max_output_tokens` (NOT `max_tokens`).
    - GPT-5.x can accept `reasoning={effort: ...}`.
    - Azure prompt caching can be routed via `extra_body={"prompt_cache_key": ...}`.
    """
    params: Dict[str, Any] = {
        "model": api_model,
        "instructions": instructions,
        "input": user_input,
        "max_output_tokens": max_output_tokens,
    }
    if prompt_cache_key:
        params["extra_body"] = {"prompt_cache_key": prompt_cache_key}
    if reasoning_effort:
        params["reasoning"] = {"effort": reasoning_effort}
    if temperature is not None:
        params["temperature"] = temperature
    if top_p is not None:
        params["top_p"] = top_p
    if stream:
        params["stream"] = True

    start = time.time()
    if not stream:
        response = client.responses.create(**params)
        latency = time.time() - start

        usage = getattr(response, "usage", None)
        input_tokens = getattr(usage, "input_tokens", 0) if usage else 0
        output_tokens = getattr(usage, "output_tokens", 0) if usage else 0
        cached_tokens = 0
        details = getattr(usage, "input_tokens_details", None) if usage else None
        if details:
            cached_tokens = getattr(details, "cached_tokens", 0) or 0

        content = ""
        if getattr(response, "output", None):
            for item in response.output:
                if hasattr(item, "content"):
                    for part in item.content:
                        if hasattr(part, "text") and part.text:
                            content += part.text

        return {
            "success": True,
            "latency": latency,
            "first_token_time": latency,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cached_tokens": cached_tokens,
            "content": content,
            "stream": False,
        }

    # Streaming mode (TTFT included)
    stream_iter = client.responses.create(**params)
    first_token_time: Optional[float] = None
    content = ""
    input_tokens = 0
    output_tokens = 0
    cached_tokens = 0

    for event in stream_iter:
        if first_token_time is None and hasattr(event, "type"):
            if event.type in [
                "response.output_item.added",
                "response.content_part.added",
                "response.output_text.delta",
            ]:
                first_token_time = time.time() - start

        if hasattr(event, "type") and event.type == "response.output_text.delta":
            delta = getattr(event, "delta", None)
            if delta:
                content += delta

        if hasattr(event, "type") and event.type == "response.completed":
            resp = getattr(event, "response", None)
            usage = getattr(resp, "usage", None) if resp else None
            if usage:
                input_tokens = getattr(usage, "input_tokens", 0)
                output_tokens = getattr(usage, "output_tokens", 0)
                details = getattr(usage, "input_tokens_details", None)
                if details:
                    cached_tokens = getattr(details, "cached_tokens", 0) or 0

    latency = time.time() - start
    return {
        "success": True,
        "latency": latency,
        "first_token_time": first_token_time or latency,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cached_tokens": cached_tokens,
        "content": content,
        "stream": True,
    }


print("✅ Helpers ready: calculate_cost / check_answer / call_with_responses_api")

✅ Helpers ready: calculate_cost / check_answer / call_with_responses_api
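
To see how cached tokens affect cost, here is a worked example that uses the same formula and per-1M-token prices as the cells above. The request size (10,000 input tokens with 8,000 cached, 500 output tokens) is an arbitrary illustration, and `estimate_cost` is a standalone copy of the cost formula so that the snippet runs on its own.

```python
from typing import Dict

# Same per-1M-token prices as the PRICING table; update to match your contract.
EXAMPLE_PRICING: Dict[str, Dict[str, float]] = {
    "gpt-4o": {"input": 2.50, "cached_input": 1.25, "output": 10.00},
    "gpt-5.1": {"input": 1.25, "cached_input": 0.13, "output": 10.00},
}


def estimate_cost(price_key: str, input_tokens: int, output_tokens: int, cached_tokens: int = 0) -> float:
    """Cost in USD: uncached input + cached input + output, priced per 1M tokens."""
    pricing = EXAMPLE_PRICING[price_key]
    uncached_tokens = max(0, input_tokens - cached_tokens)
    return (
        uncached_tokens * pricing["input"]
        + cached_tokens * pricing["cached_input"]
        + output_tokens * pricing["output"]
    ) / 1_000_000


for key in EXAMPLE_PRICING:
    cost = estimate_cost(key, input_tokens=10_000, output_tokens=500, cached_tokens=8_000)
    print(f"{key}: ${cost:.5f}")
# gpt-4o: $0.02000   (2,000 x 2.50 + 8,000 x 1.25 + 500 x 10.00) / 1M
# gpt-5.1: $0.00854  (2,000 x 1.25 + 8,000 x 0.13 + 500 x 10.00) / 1M
```

With these list prices, output tokens make up most of the candidate's cost in this example, so changes in verbosity or reasoning effort can matter more than the input price difference.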


### Code Call Differences: GPT-4.x vs GPT-5.x

In practice, the biggest code-level changes are:

- **Responses API output cap**: use `max_output_tokens` (not `max_tokens`)
- **GPT-5.x reasoning**: optionally set `reasoning={"effort": "low|medium|high"}`
- **Deployment naming**: on Azure, `model=...` is often the deployment name (not the base model family name)

The cell below shows a minimal side-by-side call pattern.

In [30]:
# Minimal side-by-side call example (safe by default)
enable_live_calls = bool(globals().get("ENABLE_LIVE_CALLS", False))
instructions = globals().get(
    "INSTRUCTIONS",
    "You are a helpful assistant. Answer concisely and directly. "
    "For questions requiring a specific format, follow the format exactly.",
)
max_output_tokens = int(globals().get("MAX_OUTPUT_TOKENS", 120))
prompt_cache_key = globals().get("PROMPT_CACHE_KEY", "migration_canary_v1")

if not client:
    print("⚠️ Client not initialized; skipping live example.")
elif not enable_live_calls:
    print("ℹ️ Set ENABLE_LIVE_CALLS=True (defined later in this notebook) to run live calls.")
else:
    question = "Explain this code: def f(n): return n if n<=1 else f(n-1)+f(n-2). Answer in 2 sentences."
    print("\n🧪 GPT-4.x-style (baseline) call")
    res_a = call_with_responses_api(
        client=client,
        api_model=MODEL_A["api_model"],
        instructions=instructions,
        user_input=question,
        max_output_tokens=max_output_tokens,
        prompt_cache_key=prompt_cache_key,
        stream=False,
    )
    print(f"  latency={res_a['latency']:.3f}s input={res_a['input_tokens']} output={res_a['output_tokens']}")
    print(f"  content={res_a['content'][:120]}...")

    print("\n🧪 GPT-5.x-style (candidate) call with reasoning control")
    res_b = call_with_responses_api(
        client=client,
        api_model=MODEL_B["api_model"],
        instructions=instructions,
        user_input=question,
        max_output_tokens=max_output_tokens,
        reasoning_effort=MODEL_B.get("reasoning_effort"),
        prompt_cache_key=prompt_cache_key,
        stream=False,
    )
    print(f"  latency={res_b['latency']:.3f}s input={res_b['input_tokens']} output={res_b['output_tokens']}")
    print(f"  content={res_b['content'][:120]}...")

ℹ️ Set ENABLE_LIVE_CALLS=True (defined later in this notebook) to run live calls.


## Region / Model Availability Checks

A frequent migration blocker is **availability**: the target GPT-5.x model (and the specific version/SKU) may not be available in the same region as your existing GPT-4.x deployment.

### What to verify

- Your Azure OpenAI / Foundry resource **location (region)**
- The target model **is supported in that region** and in your intended **SKU**
- You have a deployment created for the target model, and your code uses the correct **deployment name** (often passed as `model=...`)

Below is a helper cell that prints your endpoint and (optionally) queries Azure if you provide resource info.

In [None]:
import shutil
import subprocess

print("🔎 Endpoint")
print("=" * 60)
print(AZURE_OPENAI_ENDPOINT or "(not set)")
print("=" * 60)

# Optional: if you know the Azure resource name and resource group, we can query its region via Azure CLI.
# Set these (env vars or inline) to enable the checks below.
AZURE_OPENAI_RESOURCE_GROUP = os.environ.get("AZURE_OPENAI_RESOURCE_GROUP", "")
AZURE_OPENAI_RESOURCE_NAME = os.environ.get("AZURE_OPENAI_RESOURCE_NAME", "")

if not shutil.which("az"):
    print("⚠️ Azure CLI not found in PATH. Install/login if you want CLI-based checks.")
elif not AZURE_OPENAI_RESOURCE_GROUP or not AZURE_OPENAI_RESOURCE_NAME:
    print("ℹ️ To query region via Azure CLI, set:")
    print("  export AZURE_OPENAI_RESOURCE_GROUP='...'")
    print("  export AZURE_OPENAI_RESOURCE_NAME='...'")
else:
    print("✅ Azure CLI detected, querying resource...")
    cmd = [
        "az",
        "cognitiveservices",
        "account",
        "show",
        "-g",
        AZURE_OPENAI_RESOURCE_GROUP,
        "-n",
        AZURE_OPENAI_RESOURCE_NAME,
        "-o",
        "json",
    ]
    p = subprocess.run(cmd, capture_output=True, text=True)
    if p.returncode != 0:
        print("❌ Failed to query resource via Azure CLI:")
        print(p.stderr.strip() or p.stdout.strip())
    else:
        data = json.loads(p.stdout)
        print(f"📍 Resource location: {data.get('location')}")
        print(f"🏷️  Kind: {data.get('kind')}")
        print(f"✅ Provisioning state: {data.get('properties', {}).get('provisioningState')}")

print("\n💡 Region availability tip:")
print("- Validate model availability by region before rollout (models/SKU differ by region).")
print("- If you use multi-region routing, consider using response headers (e.g., x-ms-region) in gateway tests to confirm backend selection.")
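
Once you have a list of models available to your resource, checking for the target model and SKU is a simple filter. The sketch below assumes each entry looks like `{"name", "version", "skus": [{"name": ...}]}`, which is the shape one would expect from a command such as `az cognitiveservices account list-models`; verify the actual output of your CLI version before relying on it. `find_model` and the sample data are hypothetical, and the versions are placeholders.

```python
from typing import Any, Dict, List, Optional


def find_model(
    models: List[Dict[str, Any]],
    name: str,
    version: Optional[str] = None,
    sku: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Return entries matching the model name and, if given, version and SKU."""
    matches = []
    for entry in models:
        if entry.get("name") != name:
            continue
        if version is not None and entry.get("version") != version:
            continue
        sku_names = {s.get("name") for s in entry.get("skus") or []}
        if sku is not None and sku not in sku_names:
            continue
        matches.append(entry)
    return matches


# Hand-written sample in the assumed shape (not real CLI output).
sample_models = [
    {"name": "gpt-4o", "version": "example-version-a",
     "skus": [{"name": "Standard"}, {"name": "GlobalStandard"}]},
    {"name": "gpt-5.1", "version": "example-version-b",
     "skus": [{"name": "GlobalStandard"}]},
]

if find_model(sample_models, "gpt-5.1", sku="Standard"):
    print("Target model is available with the Standard SKU")
else:
    print("Target model/SKU not found for this resource; check another region or SKU")
```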

## Example: APIM Weighted Routing + Analysis

The client-side router above is great for quick experiments, but many production migrations prefer **centralized routing** (policy/gateway) so that rollouts don’t require redeploying every client.

This section provides a minimal **Azure API Management (APIM)** example that:
- Creates (or reuses) an APIM instance
- Configures an API endpoint that **routes traffic by weight** to two Azure OpenAI backends (baseline vs candidate)
- Adds a response header (`x-routed-backend`) so you can **analyze which backend served each request**
- Runs a small load test and summarizes distribution + latency

This is inspired by the AI-Gateway backend pool load-balancing lab, but simplified for a two-model migration scenario.
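
Before touching APIM, it can help to simulate the routing rule locally. The sketch below mirrors the threshold approach used by the policy in this section (draw an integer in 0..99 and send the request to backend A when it falls below the threshold) and counts how requests would be distributed. The function names and the fixed seed are assumptions made for the illustration.

```python
import random
from collections import Counter


def weight_threshold(weight_a: int, weight_b: int) -> int:
    """Convert two non-negative weights into a 0..100 threshold for backend A."""
    if weight_a < 0 or weight_b < 0 or (weight_a + weight_b) <= 0:
        raise ValueError("weights must be non-negative and sum to > 0")
    return int(round(100.0 * weight_a / (weight_a + weight_b)))


def route(roll: int, threshold: int) -> str:
    """Pick backend 'A' when the roll is below the threshold, else 'B'."""
    return "A" if roll < threshold else "B"


def simulate(n_requests: int, weight_a: int, weight_b: int, seed: int = 0) -> Counter:
    """Count simulated routing decisions for n_requests."""
    rng = random.Random(seed)
    threshold = weight_threshold(weight_a, weight_b)
    return Counter(route(rng.randrange(100), threshold) for _ in range(n_requests))


counts = simulate(10_000, weight_a=90, weight_b=10)
total = sum(counts.values())
for label in ("A", "B"):
    print(f"backend {label}: {counts[label]} requests ({100 * counts[label] / total:.1f}%)")
```

The same counting logic applies to real traffic: group responses by the `x-routed-backend` header instead of a simulated roll. Small samples can deviate noticeably from the configured split, so judge the distribution over enough requests.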

> Note: APIM provisioning can take a long time depending on SKU/region. The cells below are written so you can **skip creation** and only apply policy to an existing APIM.

In [32]:
# APIM routing configuration (fill these in)
import shutil
import subprocess
import textwrap
from pathlib import Path

# Toggle infra actions
CREATE_APIM = True  # If True, create RG + APIM (can take time)

AZURE_SUBSCRIPTION_ID = os.environ.get("AZURE_SUBSCRIPTION_ID", "")
APIM_LOCATION = os.environ.get("APIM_LOCATION", "eastus")
APIM_RESOURCE_GROUP = os.environ.get("APIM_RESOURCE_GROUP", "rg-model-migration")
APIM_SERVICE_NAME = os.environ.get("APIM_SERVICE_NAME", "apim-model-migration")  # must be globally unique in Azure
APIM_PUBLISHER_EMAIL = os.environ.get("APIM_PUBLISHER_EMAIL", "you@example.com")
APIM_PUBLISHER_NAME = os.environ.get("APIM_PUBLISHER_NAME", "Model Migration Lab")

# API surface exposed by APIM
APIM_API_ID = "migration-router"
APIM_API_PATH = "inference"  # https://{apim}.azure-api.net/inference/...
APIM_OPERATION_ID = "responses"  # POST /responses

# Responses API version (same family as used elsewhere in this repo)
RESPONSES_API_VERSION = "2025-04-01-preview"

# Backend A (baseline) and Backend B (candidate)
# Provide TWO backend endpoints and keys so APIM can route between them.
BACKEND_A = {
    "label": MODEL_A["label"],
    "endpoint": os.environ.get("BACKEND_A_AZURE_OPENAI_ENDPOINT", ""),  # e.g., https://xxx.openai.azure.com
    "api_key": os.environ.get("BACKEND_A_AZURE_OPENAI_API_KEY", ""),
    "deployment": os.environ.get("BACKEND_A_DEPLOYMENT", MODEL_A["api_model"]),
}
BACKEND_B = {
    "label": MODEL_B["label"],
    "endpoint": os.environ.get("BACKEND_B_AZURE_OPENAI_ENDPOINT", ""),
    "api_key": os.environ.get("BACKEND_B_AZURE_OPENAI_API_KEY", ""),
    "deployment": os.environ.get("BACKEND_B_DEPLOYMENT", MODEL_B["api_model"]),
}

# Weight split (baseline vs candidate)
APIM_WEIGHTS = {
    "A": int(os.environ.get("APIM_WEIGHT_A", "90")),
    "B": int(os.environ.get("APIM_WEIGHT_B", "10")),
}

# Optional: APIM subscription key (if your APIM requires it)
APIM_SUBSCRIPTION_KEY = os.environ.get("APIM_SUBSCRIPTION_KEY", "")

def require_nonempty(name: str, value: str) -> None:
    if not value:
        raise ValueError(f"Missing required value: {name}")

def run_az(args: List[str]) -> str:
    cmd = ["az"] + args
    print("\n$ " + " ".join(cmd))
    p = subprocess.run(cmd, capture_output=True, text=True)
    if p.returncode != 0:
        raise RuntimeError((p.stderr or p.stdout).strip())
    return p.stdout.strip()

def maybe_set_subscription() -> None:
    sub = (AZURE_SUBSCRIPTION_ID or "").strip()
    if sub:
        run_az(["account", "set", "--subscription", sub])

def ensure_resource_group() -> None:
    exists = run_az(["group", "exists", "-n", APIM_RESOURCE_GROUP]).strip().lower() == "true"
    if exists:
        print(f"ℹ️ Resource group exists: {APIM_RESOURCE_GROUP} (skip create)")
        return
    print(f"🆕 Creating resource group: {APIM_RESOURCE_GROUP} ({APIM_LOCATION})")
    run_az(["group", "create", "-n", APIM_RESOURCE_GROUP, "-l", APIM_LOCATION])

def apim_service_exists() -> bool:
    try:
        # `-o none` keeps output small; command returns non-zero if not found
        run_az(["apim", "show", "-g", APIM_RESOURCE_GROUP, "-n", APIM_SERVICE_NAME, "-o", "none"])
        return True
    except Exception:
        return False

print("🔧 APIM config")
print("=" * 100)
print(f"CREATE_APIM={CREATE_APIM}")
print(f"Subscription override: {'set' if AZURE_SUBSCRIPTION_ID else '(use current az context)'}")
print(f"APIM: rg={APIM_RESOURCE_GROUP} name={APIM_SERVICE_NAME} location={APIM_LOCATION}")
print(f"API: https://{APIM_SERVICE_NAME}.azure-api.net/{APIM_API_PATH} (POST /responses)")
print(f"Weights: A={APIM_WEIGHTS['A']} B={APIM_WEIGHTS['B']}")
print("=" * 100)

🔧 APIM config
CREATE_APIM=True
Subscription override: set
APIM: rg=rg-model-migration name=apim-model-migration location=eastus
API: https://apim-model-migration.azure-api.net/inference (POST /responses)
Weights: A=90 B=10


In [None]:
# (Optional) Create resource group + APIM (can take a while)
import time as _time

def wait_for_apim_ready(timeout_minutes: int = 60, poll_interval_s: int = 30) -> bool:
    """Poll APIM provisioningState until Succeeded or timeout."""
    deadline = _time.time() + timeout_minutes * 60
    print(f"\n⏳ Waiting for APIM provisioning (timeout={timeout_minutes}m, poll every {poll_interval_s}s)...")
    while _time.time() < deadline:
        try:
            out = run_az([
                "apim", "show",
                "-g", APIM_RESOURCE_GROUP,
                "-n", APIM_SERVICE_NAME,
                "--query", "provisioningState",
                "-o", "tsv",
            ])
            state = out.strip()
            print(f"   provisioningState: {state}")
            if state == "Succeeded":
                return True
            if state in ("Failed", "Canceled"):
                print(f"❌ APIM provisioning ended with state: {state}")
                return False
        except Exception as e:
            print(f"   (poll error: {e})")
        _time.sleep(poll_interval_s)
    print("❌ Timeout waiting for APIM provisioning")
    return False


def get_apim_subscription_key() -> str:
    """
    Retrieve the primary key from the built-in 'all-access' subscription.
    If not found, try 'master' subscription. Returns empty string on failure.
    """
    for sub_name in ["Built-in all-access subscription", "master"]:
        try:
            key = run_az([
                "apim", "subscription", "show",
                "-g", APIM_RESOURCE_GROUP,
                "-n", APIM_SERVICE_NAME,
                "--subscription-id", sub_name,
                "--query", "primaryKey",
                "-o", "tsv",
            ]).strip()
            if key:
                return key
        except Exception:
            pass
    # Fallback: list subscriptions and pick first one with a key
    try:
        keys_json = run_az([
            "apim", "subscription", "list",
            "-g", APIM_RESOURCE_GROUP,
            "-n", APIM_SERVICE_NAME,
            "--query", "[0].primaryKey",
            "-o", "tsv",
        ]).strip()
        if keys_json:
            return keys_json
    except Exception:
        pass
    return ""


if not shutil.which("az"):
    print("⚠️ Azure CLI not found in PATH. Install/login to run APIM steps.")
else:
    maybe_set_subscription()
    ensure_resource_group()

    exists = apim_service_exists()
    if exists:
        print(f"ℹ️ APIM service exists: {APIM_SERVICE_NAME} (skip create)")
    elif not CREATE_APIM:
        print("⚠️ APIM service not found, and CREATE_APIM=False.")
        print("   Either set CREATE_APIM=True or point to an existing APIM via APIM_SERVICE_NAME/APIM_RESOURCE_GROUP.")
    else:
        # SKU note: Developer is common for labs but can still take time to provision.
        # Adjust SKU to your needs (and cost constraints).
        run_az([
            "apim", "create",
            "-g", APIM_RESOURCE_GROUP,
            "-n", APIM_SERVICE_NAME,
            "--location", APIM_LOCATION,
            "--publisher-email", APIM_PUBLISHER_EMAIL,
            "--publisher-name", APIM_PUBLISHER_NAME,
            "--sku-name", "Developer",
            "--no-wait",
        ])

        print("\n⏳ APIM provisioning started (Developer SKU can take 30-60 minutes).")
        print("   You can also poll manually with:")
        print(f"   az apim show -g {APIM_RESOURCE_GROUP} -n {APIM_SERVICE_NAME} -o table")

        # Wait for provisioning to complete
        ready = wait_for_apim_ready(timeout_minutes=70, poll_interval_s=60)
        if ready:
            print(f"\n✅ APIM ready: {APIM_SERVICE_NAME}")
        else:
            raise RuntimeError("APIM provisioning did not succeed in time")

    # Retrieve and set APIM subscription key if not already set
    if not APIM_SUBSCRIPTION_KEY:
        print("\n🔑 APIM_SUBSCRIPTION_KEY not set; attempting to retrieve from APIM...")
        retrieved_key = get_apim_subscription_key()
        if retrieved_key:
            # Update the global variable and environment
            globals()["APIM_SUBSCRIPTION_KEY"] = retrieved_key
            os.environ["APIM_SUBSCRIPTION_KEY"] = retrieved_key
            print("✅ Retrieved APIM subscription key and set in environment")
            print(f"   Key (first 2 chars): {retrieved_key[:2]}...")
            # Do not print the full key: notebook outputs are often saved and shared.
            print("\n💡 To persist, add to your .env file:")
            print('   APIM_SUBSCRIPTION_KEY="<your APIM subscription key>"')
        else:
            print("⚠️ Could not retrieve APIM subscription key automatically.")
            print("   You may need to create a subscription manually or set APIM_SUBSCRIPTION_KEY in .env")
    else:
        print("\n🔑 APIM_SUBSCRIPTION_KEY already set")


$ az account set --subscription 3d4d3dd0-79d4-40cf-a94e-b4154812c6ca



$ az group exists -n rg-model-migration
ℹ️ Resource group exists: rg-model-migration (skip create)

$ az apim show -g rg-model-migration -n apim-model-migration -o none
ℹ️ APIM service exists: apim-model-migration (skip create)

🔑 APIM_SUBSCRIPTION_KEY already set (first 8 chars): 8bce805a...


In [None]:
# Configure an APIM API + weighted routing policy for Responses API
from xml.sax.saxutils import escape as xml_escape

def get_subscription_id() -> str:
    # Prefer explicit override, else use current az context.
    if (AZURE_SUBSCRIPTION_ID or "").strip():
        return AZURE_SUBSCRIPTION_ID.strip()
    return run_az(["account", "show", "--query", "id", "-o", "tsv"]).strip()

def apply_apim_api_policy(policy_xml: str) -> None:
    """Apply API-level policy. Uses CLI if available, else falls back to az rest."""
    policy_path = Path("apim_migration_policy.xml")
    policy_path.write_text(policy_xml, encoding="utf-8")

    # 1) Try CLI subcommand (may be missing depending on az version/extensions)
    try:
        run_az([
            "apim", "api", "policy", "create",
            "-g", APIM_RESOURCE_GROUP,
            "--service-name", APIM_SERVICE_NAME,
            "--api-id", APIM_API_ID,
            "--xml-content", f"@{policy_path}",
        ])
        print("✅ Policy applied via: az apim api policy create")
        return
    except Exception as e:
        print("ℹ️ az apim api policy create not available or failed; trying az rest fallback")
        print(f"   ({type(e).__name__}) {str(e).splitlines()[-1] if str(e) else str(e)}")

    sub_id = get_subscription_id()
    uri = (
        "https://management.azure.com"
        f"/subscriptions/{sub_id}"
        f"/resourceGroups/{APIM_RESOURCE_GROUP}"
        f"/providers/Microsoft.ApiManagement/service/{APIM_SERVICE_NAME}"
        f"/apis/{APIM_API_ID}"
        "/policies/policy"
        "?api-version=2022-08-01"
    )
    payload = {"properties": {"format": "xml", "value": policy_xml}}
    payload_path = Path("apim_policy_payload.json")
    payload_path.write_text(json.dumps(payload), encoding="utf-8")
    run_az([
        "rest",
        "--method", "put",
        "--uri", uri,
        "--body", f"@{payload_path}",
        "--headers", "Content-Type=application/json",
    ])
    print("✅ Policy applied via: az rest (Management API)")
    print(f"   Policy file: {policy_path}")


# Azure CLI is used to create/configure APIM; this cell is idempotent-ish.
if not shutil.which("az"):
    print("⚠️ Azure CLI not found; cannot configure APIM via CLI.")
else:
    maybe_set_subscription()
    ensure_resource_group()

    if not apim_service_exists():
        print(f"❌ APIM service not found: {APIM_SERVICE_NAME} (rg={APIM_RESOURCE_GROUP})")
        print("   Create it first (set CREATE_APIM=True in the previous cell) or point to an existing APIM.")
        raise RuntimeError("APIM service missing")

    # Validate required backend inputs
    require_nonempty("BACKEND_A.endpoint", BACKEND_A["endpoint"])
    require_nonempty("BACKEND_A.api_key", BACKEND_A["api_key"])
    require_nonempty("BACKEND_A.deployment", BACKEND_A["deployment"])
    require_nonempty("BACKEND_B.endpoint", BACKEND_B["endpoint"])
    require_nonempty("BACKEND_B.api_key", BACKEND_B["api_key"])
    require_nonempty("BACKEND_B.deployment", BACKEND_B["deployment"])

    weight_a = int(APIM_WEIGHTS["A"])
    weight_b = int(APIM_WEIGHTS["B"])
    if weight_a < 0 or weight_b < 0 or (weight_a + weight_b) <= 0:
        raise ValueError("APIM_WEIGHTS must be non-negative and sum to > 0")
    threshold = int(round(100.0 * (weight_a / (weight_a + weight_b))))

    # Build APIM policy XML (routes by weight, rewrites to /openai/responses, overrides `model`).
    # Note: policy expressions use Newtonsoft JSON objects (JObject).
    policy_xml = f"""<policies>
  <inbound>
    <base />
    <set-variable name=\"roll\" value=\"@((new System.Random()).Next(0, 100))\" />
    <choose>
      <when condition=\"@(((int)context.Variables[&quot;roll&quot;]) &lt; {threshold})\">
        <set-variable name=\"backendLabel\" value=\"A\" />
        <set-variable name=\"targetModel\" value=\"{xml_escape(BACKEND_A['deployment'])}\" />
        <set-backend-service base-url=\"{xml_escape(BACKEND_A['endpoint'].rstrip('/'))}\" />
        <set-header name=\"api-key\" exists-action=\"override\">
          <value>{xml_escape(BACKEND_A['api_key'])}</value>
        </set-header>
      </when>
      <otherwise>
        <set-variable name=\"backendLabel\" value=\"B\" />
        <set-variable name=\"targetModel\" value=\"{xml_escape(BACKEND_B['deployment'])}\" />
        <set-backend-service base-url=\"{xml_escape(BACKEND_B['endpoint'].rstrip('/'))}\" />
        <set-header name=\"api-key\" exists-action=\"override\">
          <value>{xml_escape(BACKEND_B['api_key'])}</value>
        </set-header>
      </otherwise>
    </choose>

    <set-query-parameter name=\"api-version\" exists-action=\"override\">
      <value>{RESPONSES_API_VERSION}</value>
    </set-query-parameter>

    <rewrite-uri template=\"/openai/responses\" />

    <set-body><![CDATA[
@{{
    var body = context.Request.Body.As<Newtonsoft.Json.Linq.JObject>(preserveContent: true);
    body[\"model\"] = (string)context.Variables[\"targetModel\"];
    return body.ToString(Newtonsoft.Json.Formatting.None);
}}
    ]]></set-body>
  </inbound>

  <backend>
    <base />
  </backend>

  <outbound>
    <base />
    <set-header name=\"x-routed-backend\" exists-action=\"override\">
      <value>@((string)context.Variables.GetValueOrDefault(\"backendLabel\", \"unknown\"))</value>
    </set-header>
  </outbound>

  <on-error>
    <base />
  </on-error>
</policies>
"""

    # Create API (idempotent-ish) and operation, then apply policy
    try:
        run_az([
            "apim", "api", "create",
            "-g", APIM_RESOURCE_GROUP,
            "--service-name", APIM_SERVICE_NAME,
            "--api-id", APIM_API_ID,
            "--path", APIM_API_PATH,
            "--display-name", "Model Migration Router",
            "--protocols", "https",
            "--service-url", "https://example.com",
            "--subscription-required", "false",
        ])
    except Exception as e:
        print(f"ℹ️ API create may have failed (already exists?): {e}")

    try:
        run_az([
            "apim", "api", "operation", "create",
            "-g", APIM_RESOURCE_GROUP,
            "--service-name", APIM_SERVICE_NAME,
            "--api-id", APIM_API_ID,
            "--operation-id", APIM_OPERATION_ID,
            "--display-name", "Responses",
            "--method", "POST",
            "--url-template", "/responses",
        ])
    except Exception as e:
        print(f"ℹ️ Operation create may have failed (already exists?): {e}")

    apply_apim_api_policy(policy_xml)

    # Re-check subscription key from environment (may have been set in APIM creation cell)
    _current_key = os.environ.get("APIM_SUBSCRIPTION_KEY", "") or APIM_SUBSCRIPTION_KEY
    print("\n✅ APIM policy applied")
    print(f"   Endpoint: https://{APIM_SERVICE_NAME}.azure-api.net/{APIM_API_PATH}/responses")
    if _current_key:
        print(f"   Using APIM_SUBSCRIPTION_KEY (first 2 chars): {_current_key[:2]}...")
    else:
        print("   ⚠️ No APIM_SUBSCRIPTION_KEY set; assumes subscription not required")

In [35]:
# Invoke APIM endpoint N times and analyze routing distribution + latency
import time
import requests
from collections import Counter

APIM_BASE_URL = f"https://{APIM_SERVICE_NAME}.azure-api.net"
APIM_URL = f"{APIM_BASE_URL}/{APIM_API_PATH}/responses"

# Re-read subscription key from env (may have been set by APIM creation cell)
_APIM_SUB_KEY = os.environ.get("APIM_SUBSCRIPTION_KEY", "") or APIM_SUBSCRIPTION_KEY
if _APIM_SUB_KEY:
    print("🔑 Using APIM subscription key...")
else:
    print("⚠️ No APIM subscription key set. Assuming subscription not required on API.")

def percentile(data: list, p: float) -> float:
    """Calculate the p-th percentile of a list of numbers."""
    if not data:
        return 0.0
    sorted_data = sorted(data)
    k = (len(sorted_data) - 1) * (p / 100.0)
    f = int(k)
    c = f + 1 if f + 1 < len(sorted_data) else f
    return sorted_data[f] + (sorted_data[c] - sorted_data[f]) * (k - f)

def apim_post_responses(payload: dict, timeout_s: float = 60.0) -> dict:
    headers = {"Content-Type": "application/json"}
    if _APIM_SUB_KEY:
        headers["Ocp-Apim-Subscription-Key"] = _APIM_SUB_KEY
    t0 = time.perf_counter()
    resp = requests.post(APIM_URL, headers=headers, json=payload, timeout=timeout_s)
    latency_s = time.perf_counter() - t0
    routed = resp.headers.get("x-routed-backend", "unknown")
    
    # Capture error details before raising
    if not resp.ok:
        error_detail = f"{resp.status_code} {resp.reason}: {resp.text[:500]}"
        raise requests.HTTPError(error_detail, response=resp)
    
    data = resp.json()
    return {"latency_s": latency_s, "routed": routed, "response": data}

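def extract_text_from_responses_json(data: dict) -> str:
    # The Responses API returns a list of output items. Reasoning models may emit a
    # "reasoning" item before the "message" item, so scan every item instead of
    # assuming the text lives at output[0].content[0].text.
    parts = []
    for item in data.get("output") or []:
        if not isinstance(item, dict) or item.get("type", "message") != "message":
            continue
        for part in item.get("content") or []:
            if isinstance(part, dict) and isinstance(part.get("text"), str):
                parts.append(part["text"])
    return "".join(parts)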

def extract_usage_from_responses_json(data: dict) -> dict:
    usage = data.get("usage") or {}
    input_tokens = int(usage.get("input_tokens") or 0)
    output_tokens = int(usage.get("output_tokens") or 0)
    cached_tokens = 0
    details = usage.get("input_tokens_details") or {}
    if isinstance(details, dict):
        cached_tokens = int(details.get("cached_tokens") or 0)
    return {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cached_tokens": cached_tokens,
    }

APIM_TEST_RUNS = int(os.getenv("APIM_TEST_RUNS", "20"))
APIM_TIMEOUT_S = float(os.getenv("APIM_TIMEOUT_S", "60"))

# Use a small fixed prompt to keep costs predictable
payload = {
    "model": "will-be-overridden-by-apim",
    "instructions": "Answer briefly.",
    "input": "Say 'ok' and a random 3-digit number.",
    "max_output_tokens": 50,
}

records = []
failures = 0
for i in range(APIM_TEST_RUNS):
    try:
        result = apim_post_responses(payload, timeout_s=APIM_TIMEOUT_S)
        text = extract_text_from_responses_json(result["response"])
        usage = extract_usage_from_responses_json(result["response"])
        routed = result["routed"]
        model_by_backend = {"A": MODEL_A, "B": MODEL_B}
        model_cfg = model_by_backend.get(routed)
        cost_usd = None
        if model_cfg and "price_key" in model_cfg:
            cost_usd = calculate_cost(
                model_cfg["price_key"],
                usage["input_tokens"],
                usage["output_tokens"],
                usage["cached_tokens"],
            )
        records.append({
            "i": i,
            "routed": routed,
            "latency_s": result["latency_s"],
            "text": text,
            **usage,
            "cost_usd": cost_usd,
        })
    except Exception as e:
        failures += 1
        records.append({"i": i, "routed": "error", "latency_s": None, "text": str(e)})

if not records:
    raise RuntimeError("No APIM calls were recorded")

routed_counts = Counter([r["routed"] for r in records if r.get("routed")])
latencies = [r["latency_s"] for r in records if isinstance(r.get("latency_s"), (int, float))]

print(f"APIM URL: {APIM_URL}")
print(f"Runs: {APIM_TEST_RUNS} | Failures: {failures}")
print("Routing counts:")
for k, v in sorted(routed_counts.items(), key=lambda kv: kv[0]):
    print(f"  {k}: {v} ({v / APIM_TEST_RUNS:.1%})")

if latencies:
    print(f"Latency avg: {sum(latencies)/len(latencies):.3f}s | p50: {percentile(latencies, 50):.3f}s | p95: {percentile(latencies, 95):.3f}s")
else:
    print("No successful latencies to summarize")

# Show error details if any failures occurred
error_records = [r for r in records if r.get("routed") == "error"]
if error_records:
    print(f"\n❌ Error details (first 5 of {len(error_records)}):")
    for r in error_records[:5]:
        print(f"  [{r['i']}] {r.get('text', 'unknown error')[:300]}")

🔑 Using APIM subscription key...


APIM URL: https://apim-model-migration.azure-api.net/inference/responses
Runs: 10 | Failures: 0
Routing counts:
  A: 8 (80.0%)
  B: 2 (20.0%)
Latency avg: 1.347s | p50: 1.177s | p95: 1.998s


### Notes
- This test is meaningful only if the APIM policy cell ran successfully and your APIM instance is reachable.
- If you see `x-routed-backend=unknown`, verify the APIM policy was applied to the API you're calling and that outbound headers are not stripped by another policy.
- Increase `APIM_TEST_RUNS` (env var) for a tighter observed distribution around your configured weights.
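
To get a feel for how many runs are needed, the sketch below estimates the range of candidate (B) traffic share you should expect to observe for a given number of runs. It is a rough illustration only: the function name `expected_share_interval` is hypothetical, the 90/10 weights are an assumption chosen to match the 10% expected share in the recorded run, and the normal approximation to the binomial is crude for small run counts.

```python
import math

def expected_share_interval(weight_a: int, weight_b: int, runs: int, z: float = 1.96):
    """Return (expected_b_share, low, high) using a normal approximation to the binomial.

    Hypothetical helper for illustration; not part of the notebook's own code.
    """
    total = weight_a + weight_b
    if total <= 0 or runs <= 0:
        raise ValueError("weights must sum to > 0 and runs must be > 0")
    p = weight_b / total
    # Standard error of an observed proportion over `runs` independent requests.
    se = math.sqrt(p * (1.0 - p) / runs)
    return p, max(0.0, p - z * se), min(1.0, p + z * se)

# Assumed 90/10 split; adjust to your own APIM_WEIGHTS.
for runs in (10, 100, 1000):
    p, low, high = expected_share_interval(90, 10, runs)
    print(f"runs={runs:>4}: expected B share {p:.1%}, approx. 95% range {low:.1%} to {high:.1%}")
```

With only 10 runs the approximate range spans roughly 0% to 29%, so an observed 20% share for B against a 10% target is consistent with sampling noise rather than a routing fault.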

### Acceptance Criteria Summary

The following criteria are used to judge migration readiness. All thresholds can be customized via environment variables.

| Metric | Criteria | Default | Env Variable |
|--------|----------|---------|--------------|
| **Overall Success Rate** | Must be ≥ threshold | 98% | `AC_MIN_SUCCESS_RATE` |
| **Overall Error Rate** | Must be ≤ threshold | 2% | `AC_MAX_ERROR_RATE` |
| **Candidate p95 Latency** | Must be ≤ absolute max | 5.0s | `AC_MAX_P95_LATENCY_S` |
| **Candidate p95 vs Baseline** | Increase must be ≤ threshold | +20% | `AC_MAX_CANDIDATE_P95_INCREASE` |
| **Candidate Avg Cost** | Must be ≤ absolute max | (optional) | `AC_MAX_AVG_COST_USD` |
| **Candidate Cost vs Baseline** | Increase must be ≤ threshold | (optional) | `AC_MAX_CANDIDATE_COST_INCREASE` |

**Verdict Logic:**
- ✅ **PASS**: All criteria met → safe to increase candidate traffic weight
- ❌ **FAIL**: Any criterion violated → investigate before ramping up

In [36]:
# Acceptance criteria: auto-judge migration readiness from APIM test results
from dataclasses import dataclass
from typing import Optional, List, Tuple

def _fmt_pct(x: float) -> str:
    return f"{x*100:.2f}%"

def _fmt_s(x: float) -> str:
    return f"{x:.3f}s"

def _fmt_usd(x: Optional[float]) -> str:
    return "—" if x is None else f"${x:.6f}"

def _safe_mean(xs: List[float]) -> Optional[float]:
    xs2 = [x for x in xs if isinstance(x, (int, float))]
    return (sum(xs2) / len(xs2)) if xs2 else None

def _safe_p(xs: List[float], p: float) -> Optional[float]:
    xs2 = [x for x in xs if isinstance(x, (int, float))]
    return percentile(xs2, p) if xs2 else None

@dataclass(frozen=True)
class AcceptanceCriteria:
    max_error_rate: float = float(os.getenv("AC_MAX_ERROR_RATE", "0.02"))  # e.g., 2%
    max_p95_latency_s: float = float(os.getenv("AC_MAX_P95_LATENCY_S", "5.0"))
    max_avg_cost_usd: Optional[float] = (
        float(os.getenv("AC_MAX_AVG_COST_USD")) if os.getenv("AC_MAX_AVG_COST_USD") else None
    )
    max_candidate_p95_increase: float = float(os.getenv("AC_MAX_CANDIDATE_P95_INCREASE", "0.20"))  # +20%
    max_candidate_cost_increase: Optional[float] = (
        float(os.getenv("AC_MAX_CANDIDATE_COST_INCREASE")) if os.getenv("AC_MAX_CANDIDATE_COST_INCREASE") else None
    )
    min_success_rate: float = float(os.getenv("AC_MIN_SUCCESS_RATE", "0.98"))  # overall successes

criteria = AcceptanceCriteria()

backend_labels = {"A": MODEL_A.get("label", "A"), "B": MODEL_B.get("label", "B")}
expected_b_share = (APIM_WEIGHTS["B"] / (APIM_WEIGHTS["A"] + APIM_WEIGHTS["B"])) if (APIM_WEIGHTS["A"] + APIM_WEIGHTS["B"]) else None

def summarize_backend(records: list, backend: str) -> dict:
    subset = [r for r in records if r.get("routed") == backend]
    n = len(subset)
    lat = [r.get("latency_s") for r in subset if isinstance(r.get("latency_s"), (int, float))]
    costs = [r.get("cost_usd") for r in subset if isinstance(r.get("cost_usd"), (int, float))]
    input_tokens = [r.get("input_tokens") for r in subset if isinstance(r.get("input_tokens"), (int, float))]
    output_tokens = [r.get("output_tokens") for r in subset if isinstance(r.get("output_tokens"), (int, float))]

    return {
        "n": n,
        "lat_avg": _safe_mean(lat),
        "lat_p50": _safe_p(lat, 50),
        "lat_p95": _safe_p(lat, 95),
        "avg_cost_usd": _safe_mean(costs) if costs else None,
        "avg_input_tokens": _safe_mean(input_tokens),
        "avg_output_tokens": _safe_mean(output_tokens),
    }

total_runs = len(records)
successes = len([r for r in records if r.get("routed") not in (None, "error") and isinstance(r.get("latency_s"), (int, float))])
overall_success_rate = (successes / total_runs) if total_runs else 0.0
overall_error_rate = 1.0 - overall_success_rate

sum_a = summarize_backend(records, "A")
sum_b = summarize_backend(records, "B")

def judge() -> Tuple[bool, List[str]]:
    reasons: List[str] = []
    ok = True

    if overall_success_rate < criteria.min_success_rate:
        ok = False
        reasons.append(f"Overall success rate {_fmt_pct(overall_success_rate)} < {_fmt_pct(criteria.min_success_rate)}")

    # Absolute p95 ceiling for the candidate (B); the baseline (A) p95 is only used for the relative check below.
    lat_p95_a = sum_a.get("lat_p95")
    lat_p95_b = sum_b.get("lat_p95")
    if lat_p95_b is not None and lat_p95_b > criteria.max_p95_latency_s:
        ok = False
        reasons.append(f"Candidate p95 {_fmt_s(lat_p95_b)} > max {_fmt_s(criteria.max_p95_latency_s)}")

    # Relative p95 increase: candidate vs baseline (only if both exist)
    if lat_p95_a is not None and lat_p95_b is not None and lat_p95_a > 0:
        rel_inc = (lat_p95_b / lat_p95_a) - 1.0
        if rel_inc > criteria.max_candidate_p95_increase:
            ok = False
            reasons.append(f"Candidate p95 +{rel_inc*100:.1f}% > +{criteria.max_candidate_p95_increase*100:.1f}% vs baseline")

    # Cost criteria (only if we can compute cost; requires price_key mapping)
    cost_a = sum_a.get("avg_cost_usd")
    cost_b = sum_b.get("avg_cost_usd")
    if criteria.max_avg_cost_usd is not None and cost_b is not None and cost_b > criteria.max_avg_cost_usd:
        ok = False
        reasons.append(f"Candidate avg cost {_fmt_usd(cost_b)} > max {_fmt_usd(criteria.max_avg_cost_usd)}")
    if criteria.max_candidate_cost_increase is not None and cost_a is not None and cost_b is not None and cost_a > 0:
        rel_cost = (cost_b / cost_a) - 1.0
        if rel_cost > criteria.max_candidate_cost_increase:
            ok = False
            reasons.append(f"Candidate cost +{rel_cost*100:.1f}% > +{criteria.max_candidate_cost_increase*100:.1f}% vs baseline")

    # Error rate criterion (overall)
    if overall_error_rate > criteria.max_error_rate:
        ok = False
        reasons.append(f"Overall error rate {_fmt_pct(overall_error_rate)} > max {_fmt_pct(criteria.max_error_rate)}")

    return ok, reasons

ok, reasons = judge()

def print_markdown_table(rows: list, headers: list) -> None:
    # Simple markdown table printer (no external deps)
    print("| " + " | ".join(headers) + " |")
    print("| " + " | ".join(["---"] * len(headers)) + " |")
    for r in rows:
        print("| " + " | ".join(r) + " |")

# Row verdicts mirror judge(): a row passes only if every configured check for that metric passes.
def _within_rel(candidate: Optional[float], baseline: Optional[float], max_increase: Optional[float]) -> bool:
    # The relative check is skipped (treated as passing) when a value or threshold is missing.
    if candidate is None or baseline is None or max_increase is None or baseline <= 0:
        return True
    return (candidate / baseline) - 1.0 <= max_increase

p95_ok = (
    sum_b['lat_p95'] is None
    or (
        sum_b['lat_p95'] <= criteria.max_p95_latency_s
        and _within_rel(sum_b['lat_p95'], sum_a['lat_p95'], criteria.max_candidate_p95_increase)
    )
)
cand_cost = sum_b.get('avg_cost_usd')
cost_ok = (
    cand_cost is None
    or (
        (criteria.max_avg_cost_usd is None or cand_cost <= criteria.max_avg_cost_usd)
        and _within_rel(cand_cost, sum_a.get('avg_cost_usd'), criteria.max_candidate_cost_increase)
    )
)

rows = []
rows.append([
    "Traffic share (B)",
    "—",
    _fmt_pct((sum_b['n'] / total_runs) if total_runs else 0.0),
    _fmt_pct(expected_b_share) if expected_b_share is not None else "—",
    "(info)",
])
rows.append([
    "p95 latency",
    _fmt_s(sum_a['lat_p95']) if sum_a['lat_p95'] is not None else "—",
    _fmt_s(sum_b['lat_p95']) if sum_b['lat_p95'] is not None else "—",
    f"<= {_fmt_s(criteria.max_p95_latency_s)} and <= +{criteria.max_candidate_p95_increase*100:.0f}% vs A",
    "PASS" if p95_ok else "FAIL",
])
rows.append([
    "Error rate (overall)",
    "—",
    _fmt_pct(overall_error_rate),
    f"<= {_fmt_pct(criteria.max_error_rate)} (and success >= {_fmt_pct(criteria.min_success_rate)})",
    "PASS" if (overall_error_rate <= criteria.max_error_rate and overall_success_rate >= criteria.min_success_rate) else "FAIL",
])
rows.append([
    "Avg cost / req",
    _fmt_usd(sum_a.get('avg_cost_usd')),
    _fmt_usd(sum_b.get('avg_cost_usd')),
    (f"<= {_fmt_usd(criteria.max_avg_cost_usd)}" if criteria.max_avg_cost_usd is not None else "(optional)") + (f" and <= +{criteria.max_candidate_cost_increase*100:.0f}% vs A" if criteria.max_candidate_cost_increase is not None else ""),
    "PASS" if cost_ok else "FAIL",
])

print("\n🧪 Acceptance criteria evaluation")
print("=" * 100)
print(f"Baseline(A): {backend_labels['A']} | Candidate(B): {backend_labels['B']}")
print(f"Overall success rate: {_fmt_pct(overall_success_rate)} | Overall error rate: {_fmt_pct(overall_error_rate)}")
print("=" * 100)
print_markdown_table(rows, headers=["Metric", "Baseline (A)", "Candidate (B)", "Criteria", "Verdict"])

print("\n✅ MIGRATION: PASS" if ok else "\n❌ MIGRATION: FAIL")
if reasons:
    print("Reasons:")
    for r in reasons:
        print(" - " + r)


🧪 Acceptance criteria evaluation
Baseline(A): gpt-4o (baseline) | Candidate(B): gpt-5.1 (candidate)
Overall success rate: 100.00% | Overall error rate: 0.00%
| Metric | Baseline (A) | Candidate (B) | Criteria | Verdict |
| --- | --- | --- | --- | --- |
| Traffic share (B) | — | 20.00% | 10.00% | (info) |
| p95 latency | 1.547s | 2.260s | <= 5.000s and <= +20% vs A | FAIL |
| Error rate (overall) | — | 0.00% | <= 2.00% (and success >= 98.00%) | PASS |
| Avg cost / req | $0.000125 | $0.000101 | (optional) | PASS |

❌ MIGRATION: FAIL
Reasons:
 - Candidate p95 +46.1% > +20.0% vs baseline


## Wrap-up

You now have a practical migration workflow:

- A checklist for GPT-4.x → GPT-5.x migration planning
- Responses API migration notes (parameter and payload differences)
- A simple **weight-based canary router** with measurable outcomes

Suggested next steps:
1. Replace the toy scenarios with your enterprise eval set and stricter validators (JSON schema, regex, unit tests).
2. Run canary at small weight, then ramp based on acceptance criteria (accuracy, latency percentiles, cost).
3. If you route across regions (or via APIM), capture response headers (e.g., region) during testing to validate the backend selection behavior.
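
As one way to act on step 2, the sketch below turns a PASS/FAIL verdict into the next set of routing weights. Everything here is hypothetical: the stage list `RAMP_STAGES`, the helper names, and the rule of falling back one stage on failure are assumptions for illustration, not part of the notebook's code. The returned dictionary only mirrors the `{"A": ..., "B": ...}` shape of `APIM_WEIGHTS`.

```python
from typing import Dict, List

# Hypothetical ramp schedule: candidate (B) share in percent at each stage.
RAMP_STAGES: List[int] = [5, 10, 25, 50, 100]

def next_candidate_weight(current_b: int, passed: bool, stages: List[int] = RAMP_STAGES) -> int:
    """Advance one stage on PASS; fall back one stage (or to 0) on FAIL."""
    ordered = sorted(stages)
    lower = [s for s in ordered if s < current_b]
    higher = [s for s in ordered if s > current_b]
    if passed:
        # Stay at the current weight once the final stage is reached.
        return higher[0] if higher else current_b
    return lower[-1] if lower else 0

def to_apim_weights(candidate_b: int) -> Dict[str, int]:
    """Convert a candidate percentage into baseline/candidate weights that sum to 100."""
    if not 0 <= candidate_b <= 100:
        raise ValueError("candidate_b must be between 0 and 100")
    return {"A": 100 - candidate_b, "B": candidate_b}

# Example: the canary at 10% passed, so move to the next stage.
print(to_apim_weights(next_candidate_weight(10, passed=True)))
```

Falling back a single stage rather than straight to zero keeps some candidate traffic flowing for diagnosis; a stricter rollout might prefer an immediate return to 0.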