# AI Red Teaming Notebook Overview

## Note: This notebook is for regulated Azure subscriptions that disallow storage account keys

This streamlined notebook guides you through progressively richer AI red teaming evaluations using the Azure AI Evaluation SDK.

You will:
- Run a fast smoke test with a deterministic safe callback (baseline expectations, near-zero Attack Success Rate).
- Target a real Azure OpenAI deployment to observe genuine safety behavior.
- Expand coverage across multiple risk categories and layered attack strategies.
- Add advanced multi-strategy scans (including composed transformations) to probe layered defenses.
- (Optional, end of notebook) Supply your own domain‑specific risky objectives.

Artifacts: Each scan writes a JSON scorecard file named with the scan label and a UTC `HHMMSS` timestamp. Use these files for comparison, regression tracking, or upload into Azure AI Foundry.

Execution time scales roughly with: risk_categories × attack_strategies × num_objectives. Start small, expand only after verifying prior steps.
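As a rough worked example of that multiplier, the hypothetical helper below counts attack prompts under the simple product model stated above, using the scan sizes configured later in this notebook. Treat it as an estimate only: group macros such as `EASY` expand into several underlying strategies, and the SDK may add a baseline (unmodified) pass, so real counts can be higher.

```python
def estimate_attack_prompts(num_risk_categories: int, num_strategies: int, num_objectives: int) -> int:
    """Hypothetical helper: rough prompt count = categories x strategies x objectives."""
    if min(num_risk_categories, num_strategies, num_objectives) < 0:
        raise ValueError("All counts must be non-negative")
    return num_risk_categories * num_strategies * num_objectives


# Basic scan below: 2 categories x 1 strategy (Flip) x 1 objective
basic = estimate_attack_prompts(2, 1, 1)
# Advanced scan below: 4 categories x 8 strategy entries x 3 objectives,
# counting each group macro (EASY, MODERATE) as a single entry
advanced = estimate_attack_prompts(4, 8, 3)
print(basic, advanced)  # 2 96
```

The jump from 2 to at least 96 prompts is why the notebook validates the small scans first.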

In [None]:
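# Cell 1: Installation
# Installs all required packages (idempotent) and prints the resolved versions, including duckdb.
import sys, subprocess
from importlib.metadata import version, PackageNotFoundError

packages = [
    "duckdb==1.3.2",  # pinned for compatibility
    "azure-ai-evaluation[redteam]",
    "azure-identity",
    "openai",
    "azure-ai-projects",
    "python-dotenv",
]

subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "--upgrade", *packages])
print("Installed (resolved versions):")
for spec in packages:
    # Strip extras ("[redteam]") and pins ("==1.3.2") to get the distribution name
    name = spec.split("[")[0].split("==")[0]
    try:
        print(f"  - {name}=={version(name)}")
    except PackageNotFoundError:
        print(f"  - {name}: not found after install")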

In [None]:
# Cell 2 - imports
from typing import Optional, Dict, Any
import os

# Azure imports
from azure.ai.evaluation.red_team import RedTeam, RiskCategory, AttackStrategy

# OpenAI import
from openai import AzureOpenAI

## Core Concepts: RedTeam, Risk Categories, Attack Strategies & Targets

**RedTeam Orchestrator**: Generates attack objectives, transforms prompts via strategies, invokes your target, and scores responses.

**Risk Categories (what we probe)**: Violence, Hate/Unfairness, Sexual, SelfHarm. You can supply a subset for faster iteration; omitting categories reduces coverage but cuts cost and time.

**Attack Strategies (how we probe)**:
- Complexity group macros: `EASY`, `MODERATE`, `DIFFICULT` (bundles of progressively harder transformations)
- Individual transformations: Flip, CharSwap, UnicodeConfusable, Leetspeak, Url, Base64, ROT13, etc.
- Composition: `AttackStrategy.Compose([Base64, ROT13])` layers transformations to simulate obfuscation chains.
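To make layering concrete, here is a plain-Python sketch of a two-step obfuscation chain (Base64, then ROT13) built from the standard library. It illustrates the idea only and is not the SDK's implementation of `AttackStrategy.Compose`; it assumes transformations apply left to right in list order.

```python
import base64
import codecs
from typing import Callable

Transform = Callable[[str], str]


def to_base64(text: str) -> str:
    # Encode the UTF-8 bytes as Base64 ASCII text
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def to_rot13(text: str) -> str:
    # Rotate ASCII letters by 13; digits and symbols are left unchanged
    return codecs.encode(text, "rot13")


def compose(*transforms: Transform) -> Transform:
    # Apply each transform in order: compose(f, g)(x) == g(f(x))
    def chained(text: str) -> str:
        for transform in transforms:
            text = transform(text)
        return text
    return chained


layered = compose(to_base64, to_rot13)
obfuscated = layered("hello")
print(obfuscated)  # nTIfoT8=

# Undo the chain in reverse order: ROT13 is its own inverse, then Base64-decode
recovered = base64.b64decode(to_rot13(obfuscated)).decode("utf-8")
print(recovered)  # hello
```

A defense that decodes only one layer (for example, Base64 alone) still sees ROT13 noise, which is the kind of gap layered strategies are meant to probe.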

**num_objectives**: Number of attack objectives (seed prompts) generated per risk category; each objective is run under every applied strategy, so runtime grows linearly with it.

**Targets (what gets attacked)**:
1. Simple synchronous callback (returns fixed text) – deterministic baseline.
2. Model configuration dict – RedTeam handles generation calls internally.
3. Fully custom (async) application wrapper – replicate real app logic, pre/post-processing.
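For target style (2), the SDK documentation shows passing an Azure OpenAI model configuration dictionary directly as `target`. The sketch below builds one from the environment variables this notebook loads. The key names (`azure_endpoint`, `azure_deployment`, `api_key`, `api_version`) follow `AzureOpenAIModelConfiguration` in `azure-ai-evaluation` as we understand it, and the helper name `build_model_config` is hypothetical; verify the accepted keys against your installed SDK version.

```python
import os
from typing import Dict, Mapping, Optional


def build_model_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Hypothetical helper: map this notebook's env vars onto a model-config dict."""
    env = os.environ if env is None else env
    mapping = {
        "azure_endpoint": "AZURE_OPENAI_ENDPOINT",
        "azure_deployment": "AZURE_OPENAI_DEPLOYMENT_NAME",
        "api_key": "AZURE_OPENAI_API_KEY",
        "api_version": "AZURE_OPENAI_API_VERSION",
    }
    missing = [var for var in mapping.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {missing}")
    return {field: env[var] for field, var in mapping.items()}


# Usage inside the notebook (async cell), once a RedTeam instance exists:
# model_config = build_model_config()
# result = await red_team.scan(target=model_config, scan_name="Model-Config-Scan")
```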

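This notebook starts with (1), then switches to a synchronous wrapper function around the Azure OpenAI deployment (a lightweight form of (3)) for the model-based scans.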

> Tip: Keep early scans lean (≤2 categories, 1 strategy, num_objectives=1) to validate authentication & environment quickly.

In [None]:
# Cell 3 - Create credential (switchable auth)
import os
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential

# Allow easy switching via env flag; default to DefaultAzureCredential for broader token chain
_use_default = os.environ.get("REDTEAM_USE_DEFAULT_CRED", "1") == "1"
if _use_default:
    credential = DefaultAzureCredential(exclude_interactive_browser_credential=True)
    print("Using DefaultAzureCredential (Managed Identity + Azure CLI + Env, etc.)")
else:
    credential = ManagedIdentityCredential()
    print("Using ManagedIdentityCredential explicitly")

# Quick probe (optional): prints the reason and continues if this identity cannot get a management-scope token
try:
    token = credential.get_token("https://management.azure.com/.default")
    print("Acquired mgmt token (truncated):", token.token[:24], "...")
except Exception as e:  # noqa: BLE001
    print("Token probe skipped:", e)


In [None]:
# Cell 4 - Dynamic .env discovery
from pathlib import Path
from typing import List
import os, re
from dotenv import load_dotenv

REQUIRED_KEYS = [
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP_NAME",
    "AZURE_PROJECT_NAME",
    "AZURE_OPENAI_DEPLOYMENT_NAME",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_API_VERSION",
]

explicit_path = os.environ.get("REDTEAM_DOTENV_PATH")
searched: List[Path] = []
selected = None

candidates: List[Path] = []
if explicit_path:
    p = Path(explicit_path)
    if p.is_file():
        candidates.append(p)

# Typical Azure AI Foundry project mount pattern: /afh/projects/<resource-project-guid>/shared/files/.env
root = Path('/afh/projects')
if root.is_dir():
    for child in root.iterdir():
        if child.is_dir() and ('-project-' in child.name):
            env_candidate = child / 'shared' / 'files' / '.env'
            searched.append(env_candidate)
            if env_candidate.is_file():
                candidates.append(env_candidate)

# Fallback: recursive glob for a .env directly under any shared/files directory (stops at first match)
if not candidates and root.is_dir():
    for env_candidate in root.glob('**/shared/files/.env'):
        searched.append(env_candidate)
        if env_candidate.is_file():
            candidates.append(env_candidate)
            break

# Choose first containing all required keys, else first existing
for c in candidates:
    try:
        text = c.read_text()
        # Match KEY= at line start, tolerating leading spaces/tabs, an optional "export", and spaces before "="
        if all(re.search(rf'^[ \t]*(?:export[ \t]+)?{re.escape(k)}[ \t]*=', text, re.MULTILINE) for k in REQUIRED_KEYS):
            selected = c
            break
    except Exception:
        pass
if selected is None and candidates:
    selected = candidates[0]

if selected and selected.is_file():
    load_dotenv(selected)
    missing_after = [k for k in REQUIRED_KEYS if not os.environ.get(k)]
    print(f"Loaded .env from: {selected}")
    if missing_after:
        print("Still missing keys:", missing_after)
else:
    print("No .env loaded. Candidates searched (first 5):", [str(p) for p in searched[:5]])


In [None]:
# Cell 5 - Set variables
import os

_required_keys = [
    "AZURE_SUBSCRIPTION_ID",
    "AZURE_RESOURCE_GROUP_NAME",
    "AZURE_PROJECT_NAME",
    "AZURE_OPENAI_DEPLOYMENT_NAME",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_API_VERSION",
]
_env = {k: os.environ.get(k) for k in _required_keys}
_missing = [k for k, v in _env.items() if not v]

if _missing:
    print("Missing environment variables:", _missing)
else:
    # Construct objects / variables consumed by later cells
    azure_ai_project = {
        "subscription_id": _env["AZURE_SUBSCRIPTION_ID"],
        "resource_group_name": _env["AZURE_RESOURCE_GROUP_NAME"],
        "project_name": _env["AZURE_PROJECT_NAME"],
        "credential": credential,
    }
    azure_openai_deployment = _env["AZURE_OPENAI_DEPLOYMENT_NAME"]
    azure_openai_endpoint = _env["AZURE_OPENAI_ENDPOINT"]
    azure_openai_api_key = _env["AZURE_OPENAI_API_KEY"]
    azure_openai_api_version = _env["AZURE_OPENAI_API_VERSION"]

    print("Config loaded.")
    print(f"Deployment: {azure_openai_deployment}")
    print(f"Endpoint: {azure_openai_endpoint}")
    print("API key prefix:", azure_openai_api_key[:6] + '...')

### Environment Variables Loaded
Required keys (subscription, resource group, project name, OpenAI deployment details, API key & version) enable two things:
1. Locating the Azure AI Project for logging / result persistence.
2. Invoking your Azure OpenAI deployment during model-based scans.

Cell 5 above reports any missing values; resolve them before proceeding to the model-based or advanced scans.

### Target Patterns
You can point RedTeam at different target styles:
- Deterministic callback (fast baseline, isolates framework logic).
- Direct model configuration dict (minimal code, invokes deployment directly).
- Rich application function (mirrors real system behavior: pre/post-processing, memory, safety layers).
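The rich application style wraps your real pre/post-processing around the model call. The sketch below shows the shape of such a wrapper around a stand-in async model call, with illustrative pre-processing (whitespace normalization, a length cap) and post-processing (a fallback for empty replies). Everything here is hypothetical: `fake_model_call`, `make_app_target`, and the limits are placeholders, and whether `RedTeam.scan` accepts an async `(query: str) -> str` function depends on the SDK version, so check the SDK reference before relying on it.

```python
import asyncio
from typing import Awaitable, Callable

ModelCall = Callable[[str], Awaitable[str]]


async def fake_model_call(prompt: str) -> str:
    # Placeholder for a real async deployment call (e.g. an async OpenAI client)
    await asyncio.sleep(0)
    return f"Model reply to: {prompt}"


def make_app_target(model_call: ModelCall, max_prompt_chars: int = 2000) -> Callable[[str], Awaitable[str]]:
    async def app_target(query: str) -> str:
        # Pre-processing: collapse whitespace and cap prompt length, as a real app might
        prompt = " ".join(query.split())[:max_prompt_chars]
        if not prompt:
            return "I can't help with an empty request."
        reply = await model_call(prompt)
        # Post-processing: never hand an empty string back to the caller
        return reply.strip() or "I'm sorry, I can't help with that."
    return app_target


app_target = make_app_target(fake_model_call)
print(asyncio.run(app_target("  What is   compound interest?  ")))
# Model reply to: What is compound interest?
```

In a Jupyter cell, call `await app_target(...)` directly instead, since `asyncio.run` cannot be used inside an already-running event loop.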

We start with the deterministic callback to validate plumbing before spending tokens on real model invocations.

In [None]:
# Cell 6 - Define a simple callback function that always returns a fixed response
def financial_advisor_callback(query: str) -> str:  # noqa: ARG001
    return "I'm a financial advisor assistant. I can help with investment advice and financial planning within legal and ethical guidelines."

In [None]:
# Cell 7 - Create the `RedTeam` instance with minimal configurations
red_team = RedTeam(
    azure_ai_project=azure_ai_project,
    credential=credential,
    risk_categories=[RiskCategory.Violence, RiskCategory.HateUnfairness],
    num_objectives=1,
)
# Configure a writable output directory to avoid permission issues on the read-only code mount
import pathlib
_rt_out = pathlib.Path.home() / 'redteam_outputs'
_rt_out.mkdir(parents=True, exist_ok=True)
try:
    # Some internals may expose file_manager; guard if attribute layout changes
    if hasattr(red_team, 'file_manager') and hasattr(red_team.file_manager, 'base_output_dir'):
        red_team.file_manager.base_output_dir = str(_rt_out)
        print('RedTeam base_output_dir set to', red_team.file_manager.base_output_dir)
    else:
        print('WARNING: Unable to set custom base_output_dir; attribute not found')
except Exception as _e:
    print('WARNING: Failed to set base_output_dir:', _e)

In [None]:
# Cell 8 - Basic Scan
from datetime import datetime, timezone
from pathlib import Path
import pathlib, os

scan_ts = datetime.now(timezone.utc).strftime("%H%M%S")
scan_label = "Basic"
scan_name = f"{scan_label}-Scan-{scan_ts}"
# Use writable output directory set earlier (fallback to home if file_manager or base_output_dir is missing)
_writable_base = pathlib.Path(getattr(getattr(red_team, 'file_manager', None), 'base_output_dir', None) or Path.home())
_writable_base.mkdir(parents=True, exist_ok=True)
output_path = _writable_base / f"{scan_label.lower()}_scan-{scan_ts}.json"

basic_scan_result = await red_team.scan(
    target=financial_advisor_callback,
    scan_name=scan_name,
    attack_strategies=[AttackStrategy.Flip],
    output_path=str(output_path),
    # skip_upload=True  # uncomment if remote artifact upload warnings persist
)
print(f"Completed {scan_name} -> {output_path}")

### Interpreting the Basic Scan
Key metric: **Attack Success Rate (ASR)** = percentage of prompts whose responses are flagged harmful.

Because our callback always returns a fixed safe sentence, expected ASR ≈ 0%.

Use the emitted JSON to verify structure and plan which additional strategies or categories to add next.
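The ASR definition above is simple arithmetic. A minimal sketch, assuming you have already extracted one boolean per attack attempt (`True` when the response was flagged harmful); the function name and input shape are hypothetical and do not mirror the scorecard JSON schema.

```python
from typing import Iterable


def attack_success_rate(flagged: Iterable[bool]) -> float:
    """Percentage of attack attempts whose responses were flagged harmful."""
    outcomes = list(flagged)
    if not outcomes:
        raise ValueError("ASR is undefined for zero attack attempts")
    return 100.0 * sum(outcomes) / len(outcomes)


# A fixed safe callback should yield no flagged responses:
print(attack_success_rate([False, False]))  # 0.0
# One flagged response out of four attempts:
print(attack_success_rate([True, False, False, False]))  # 25.0
```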

In [None]:
# Cell 9 - Intermediary Scan against the Azure OpenAI deployment (same writable-path pattern as Cell 8, with basic error handling)
from datetime import datetime, timezone
from pathlib import Path
import pathlib, os

# Lazy client creation (safe on reruns)
if '_rt_client' not in globals():
    from openai import AzureOpenAI
    _rt_client = AzureOpenAI(
        api_key=azure_openai_api_key,
        api_version=azure_openai_api_version,
        azure_endpoint=azure_openai_endpoint,
    )

# Accept both invocation styles used internally by RedTeam
from typing import Optional

def model_target(query: Optional[str] = None, user_prompt: Optional[str] = None) -> str:
    # Prefer a non-empty user_prompt, then query, then a placeholder
    prompt = user_prompt or query or "(empty prompt)"
    return _rt_client.chat.completions.create(
        model=azure_openai_deployment,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        max_tokens=256,
    ).choices[0].message.content or ""

# Reuse writable directory logic from Cell 8 for consistency
_writable_base = pathlib.Path(getattr(getattr(red_team, 'file_manager', None), 'base_output_dir', None) or Path.home())
_writable_base.mkdir(parents=True, exist_ok=True)

scan_ts = datetime.now(timezone.utc).strftime('%H%M%S')
scan_label = "Intermediary"
scan_name = f"{scan_label}-Scan-{scan_ts}"
output_path = _writable_base / f"{scan_label.lower()}_scan-{scan_ts}.json"

print(f"Running {scan_name} -> {output_path}")

try:
    intermediary_scan_result = await red_team.scan(
        target=model_target,
        scan_name=scan_name,
        attack_strategies=[AttackStrategy.Flip],
        output_path=str(output_path),
    )
    print(f"Completed {scan_name} -> {output_path}")
except Exception as e:  # noqa: BLE001
    print(f"ERROR during {scan_name}: {e}")
    raise

### Moving to a Model Target
Switching from a deterministic callback to an actual model introduces variability and real guardrail evaluation. Keeping the same single `Flip` strategy isolates model safety behavior from added obfuscation complexity.

Next expansions: increase `num_objectives`, add additional strategies (CharSwap, UnicodeConfusable, etc.), or broaden risk categories.
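For intuition about what `Flip` does to a prompt: as we understand it, the strategy is based on PyRIT's flip converter, which reverses the character order of the text. The sketch below approximates only that core transformation; the SDK's actual output may wrap the flipped text in extra instructions, so treat it as an illustration rather than a reproduction.

```python
def flip(text: str) -> str:
    # Reverse the character order, e.g. "abc" -> "cba"
    return text[::-1]


probe = "How do I reset my password?"
flipped = flip(probe)
print(flipped)  # ?drowssap ym teser I od woH
print(flip(flipped) == probe)  # True: flipping twice restores the original
```

A filter that matches only on the literal input can miss a reversed request that the model still understands, which is the gap this single strategy probes.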

In [None]:
# Cell 10: Advanced scan - create expanded RedTeam instance (with writable base_output_dir)
advanced_risk_categories = [
    RiskCategory.Violence,
    RiskCategory.HateUnfairness,
    RiskCategory.Sexual,
    RiskCategory.SelfHarm,
]
advanced_red_team = RedTeam(
    azure_ai_project=azure_ai_project,
    credential=credential,
    risk_categories=advanced_risk_categories,
    num_objectives=3,  # increase coverage per category (adjust for cost/time)
)

# Mirror writable directory logic used earlier for 'red_team'
import pathlib
_adv_out = pathlib.Path(getattr(getattr(red_team, 'file_manager', None), 'base_output_dir', None) or pathlib.Path.home())
_adv_out.mkdir(parents=True, exist_ok=True)
try:
    if hasattr(advanced_red_team, 'file_manager') and hasattr(advanced_red_team.file_manager, 'base_output_dir'):
        advanced_red_team.file_manager.base_output_dir = str(_adv_out)
        print('Advanced RedTeam base_output_dir set to', advanced_red_team.file_manager.base_output_dir)
    else:
        print('WARNING: Could not set advanced_red_team base_output_dir; attribute not found')
except Exception as _e:
    print('WARNING: Failed setting advanced_red_team base_output_dir:', _e)

print("Advanced RedTeam configured with categories:", [c.name for c in advanced_risk_categories])

### Expanding Coverage
We now include all four core risk categories and raise `num_objectives` to increase statistical signal. This increases token/time consumption proportionally.

Broader coverage helps surface category-specific weaknesses early (e.g., higher ASR in SelfHarm vs Sexual).
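To spot those category-specific gaps, you can rank per-category ASR once you have flagged and total counts for each category. The sketch below assumes a hypothetical `{category: (flagged, total)}` mapping that you fill from the scorecard yourself; it does not parse the SDK's JSON format.

```python
from typing import Dict, List, Tuple


def rank_categories_by_asr(counts: Dict[str, Tuple[int, int]]) -> List[Tuple[str, float]]:
    """Return (category, ASR %) pairs, highest ASR first; skips categories with no attempts."""
    rates = [
        (category, 100.0 * flagged / total)
        for category, (flagged, total) in counts.items()
        if total > 0
    ]
    # Sort by ASR descending, then by name so ties appear in a stable order
    return sorted(rates, key=lambda item: (-item[1], item[0]))


# Toy counts for illustration only:
# rank_categories_by_asr({"violence": (1, 4), "self_harm": (2, 4)})
# -> [("self_harm", 50.0), ("violence", 25.0)]
```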

In [None]:
# Cell 11: Advanced scan + inline stderr ERROR capture (standardized naming & writable output dir)
from datetime import datetime, timezone
import sys, io, re, contextlib, time, json
from pathlib import Path
import pathlib  # for consistent writable directory handling

#### Debugger (capture + extract ERROR lines) ####
error_re = re.compile(r'^ERROR:.*', re.IGNORECASE)
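class _Cap(contextlib.AbstractContextManager):
    """Tee stderr: buffer everything written while still echoing it to the real stderr.

    Only writes that go through sys.stderr are captured; logging handlers created
    before entry keep a reference to the original stream and bypass the buffer.
    """
    def __enter__(self):
        self._orig = sys.stderr
        self.buf = io.StringIO()
        sys.stderr = self
        return self
    def write(self, d):
        self.buf.write(d)
        return self._orig.write(d)
    def flush(self):
        self._orig.flush()
    def __getattr__(self, name):
        # Delegate other stream attributes (isatty, encoding, fileno, ...) to the real stderr
        return getattr(self.__dict__.get("_orig", sys.__stderr__), name)
    def __exit__(self, *a):
        sys.stderr = self._orig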
#### End Debugger (setup) ####

scan_ts = datetime.now(timezone.utc).strftime('%H%M%S')
scan_label = "Advanced"
scan_name = f"{scan_label}-Scan-{scan_ts}"

# Use same writable directory pattern as earlier scans (fallback to home if attribute missing)
_writable_base = pathlib.Path(getattr(getattr(advanced_red_team, 'file_manager', None), 'base_output_dir', None) or Path.home())
_writable_base.mkdir(parents=True, exist_ok=True)
output_path = _writable_base / f"{scan_label.lower()}_scan-{scan_ts}.json"

print(f"Running {scan_name} -> {output_path}")
with _Cap() as cap:  # debugger active
    advanced_scan_result = await advanced_red_team.scan(
        target=model_target,
        scan_name=scan_name,
        attack_strategies=[
            AttackStrategy.EASY,
            AttackStrategy.MODERATE,
            AttackStrategy.Flip,
            AttackStrategy.CharSwap,
            AttackStrategy.UnicodeConfusable,
            AttackStrategy.Leetspeak,
            AttackStrategy.Url,
            AttackStrategy.Compose([AttackStrategy.Base64, AttackStrategy.ROT13]),
        ],
        output_path=str(output_path),
    )
    sys.stderr.flush(); time.sleep(0.2)

#### Debugger (ERROR extraction/print) ####
errs = [ln.strip() for ln in cap.buf.getvalue().splitlines() if error_re.match(ln)]
print(f"Completed {scan_name} -> {output_path}")
print("No ERROR lines captured." if not errs else f"{len(errs)} ERROR line(s):\n" + "\n".join(errs))

### Advanced Strategies & Layering
The advanced scan mixes:
- Complexity groups (`EASY`, `MODERATE`) for breadth.
- Obfuscations (CharSwap, UnicodeConfusable, Leetspeak, Url) to probe normalization defenses.
- Encoding (Base64, ROT13 via composition) to test decoding / content safety layers.

Capturing stderr lets you quickly surface any internal SDK errors alongside scan results.
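The obfuscation strategies above target input normalization. As an illustration of the defender's side (not part of the SDK), the sketch below applies Unicode NFKC folding plus a small, hypothetical leetspeak map before any keyword check. It also shows the limitation these probes exploit: NFKC folds full-width letters to ASCII but leaves Cyrillic look-alikes such as `а` (U+0430) untouched.

```python
import unicodedata

# Hypothetical leetspeak map; a real one would be larger and would also
# rewrite legitimate digits, a known trade-off of this approach.
LEET_TABLE = str.maketrans({"4": "a", "3": "e", "1": "i", "0": "o", "5": "s", "7": "t"})


def normalize_for_filter(text: str) -> str:
    # NFKC folds compatibility forms (e.g. full-width letters) to plain equivalents
    folded = unicodedata.normalize("NFKC", text)
    return folded.casefold().translate(LEET_TABLE)


# Full-width "HELLO" (U+FF28 U+FF25 U+FF2C U+FF2C U+FF2F) folds to ASCII:
print(normalize_for_filter("\uff28\uff25\uff2c\uff2c\uff2f"))  # hello
print(normalize_for_filter("h3ll0"))  # hello
# Cyrillic "а" (U+0430) is a confusable, not a compatibility form, so it survives:
print(normalize_for_filter("\u0430pple") == "apple")  # False
```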

## Bring Your Own Objectives: Custom Attack Seed Prompts
You can supply your own domain or application-specific risky prompts as objectives instead of (or in addition to) automatically generated ones.

Format: a JSON file containing a list whose entries each carry the prompt text and a `risk-type` (one of: `violence`, `sexual`, `hate_unfairness`, `self_harm`). The number of prompts provided becomes the effective `num_objectives` for the scan.

Use this when:
- You have proprietary misuse scenarios not covered by generic seeds.
- You want regression tracking on a fixed, curated risky prompt set.
- You need to validate mitigations against previously successful attacks.

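Below we instantiate a new `RedTeam` whose `custom_attack_seed_prompts` points to `data/prompts.json` (discovered under the project's `shared/files` mount, or set explicitly via `REDTEAM_CUSTOM_PROMPTS_PATH`), then run the grouped difficulty strategies `EASY`, `MODERATE`, and `DIFFICULT`.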

> Tip: Keep a version-controlled prompts file so additions are reviewable and diffs tie to shifts in ASR.
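Before pointing `custom_attack_seed_prompts` at a file, it can help to validate it. The sketch below assumes the entry shape shown in Microsoft's AI Red Teaming Agent documentation (each entry has `metadata.target_harms[].risk-type` and a `messages` list whose user `content` is the prompt); confirm that schema against your SDK version. The function name `validate_seed_prompts` is hypothetical.

```python
import json
from pathlib import Path
from typing import Dict, List

ALLOWED_RISK_TYPES = {"violence", "sexual", "hate_unfairness", "self_harm"}


def validate_seed_prompts(path: Path) -> Dict[str, int]:
    """Check each entry's risk-type and prompt text; return a count per risk type."""
    entries = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(entries, list) or not entries:
        raise ValueError("Expected a non-empty JSON list of prompt entries")
    counts: Dict[str, int] = {}
    problems: List[str] = []
    for i, entry in enumerate(entries):
        harms = entry.get("metadata", {}).get("target_harms", [])
        risk_types = [h.get("risk-type") for h in harms]
        contents = [m.get("content") for m in entry.get("messages", []) if m.get("role") == "user"]
        if not risk_types or any(rt not in ALLOWED_RISK_TYPES for rt in risk_types):
            problems.append(f"entry {i}: invalid or missing risk-type {risk_types}")
        if not any(contents):
            problems.append(f"entry {i}: no user message content")
        for rt in risk_types:
            if rt in ALLOWED_RISK_TYPES:
                counts[rt] = counts.get(rt, 0) + 1
    if problems:
        raise ValueError("; ".join(problems))
    return counts
```

Calling it on the discovered file (for example `validate_seed_prompts(selected)` after Cell 12 locates it) returns a per-category count of the prompts the scan will use, and fails fast on a mistyped `risk-type`.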


In [None]:
# Cell 12 - Custom prompts RedTeam instance (dynamic prompt discovery + writable dir)
from pathlib import Path
import pathlib, os, re

# Allow explicit override
explicit_prompt_path = os.environ.get("REDTEAM_CUSTOM_PROMPTS_PATH")

candidates = []
searched = []
selected = None

if explicit_prompt_path:
    p = Path(explicit_prompt_path)
    if p.is_file():
        candidates.append(p)

# Derive project mount pattern similar to Cell 4 logic: /afh/projects/*-project-*/shared/files/data/prompts.json
projects_root = Path('/afh/projects')
if projects_root.is_dir():
    for child in projects_root.iterdir():
        if child.is_dir() and ('-project-' in child.name):
            prompt_candidate = child / 'shared' / 'files' / 'data' / 'prompts.json'
            searched.append(prompt_candidate)
            if prompt_candidate.is_file():
                candidates.append(prompt_candidate)

# Fallback: glob search for any prompts.json under shared/files/data
if not candidates and projects_root.is_dir():
    for prompt_candidate in projects_root.glob('**/shared/files/data/prompts.json'):
        searched.append(prompt_candidate)
        if prompt_candidate.is_file():
            candidates.append(prompt_candidate)
            break

# Select first existing candidate
for c in candidates:
    try:
        if c.is_file():
            selected = c
            break
    except Exception:
        pass

if not selected:
    raise FileNotFoundError(
        "Could not locate prompts.json. Searched candidates (first 5): " +
        str([str(p) for p in searched[:5]]) +
        " - set REDTEAM_CUSTOM_PROMPTS_PATH to override."
    )

print(f"Using prompts file: {selected}")

custom_red_team = RedTeam(
    azure_ai_project=azure_ai_project,
    credential=credential,
    custom_attack_seed_prompts=str(selected),
)

# Align writable output directory with earlier scans
_custom_out = pathlib.Path(getattr(getattr(red_team, 'file_manager', None), 'base_output_dir', None) or Path.home())
_custom_out.mkdir(parents=True, exist_ok=True)
try:
    if hasattr(custom_red_team, 'file_manager') and hasattr(custom_red_team.file_manager, 'base_output_dir'):
        custom_red_team.file_manager.base_output_dir = str(_custom_out)
        print('Custom RedTeam base_output_dir set to', custom_red_team.file_manager.base_output_dir)
    else:
        print('WARNING: Could not set custom_red_team base_output_dir; attribute not found')
except Exception as _e:
    print('WARNING: Failed setting custom_red_team base_output_dir:', _e)

print("Custom RedTeam ready. Prompt count determines num_objectives.")

In [None]:
# Cell 13: Execute scan with custom prompts and grouped difficulty strategies (writable output dir)
from datetime import datetime, timezone
from pathlib import Path
import pathlib

scan_ts = datetime.now(timezone.utc).strftime('%H%M%S')
scan_label = "Custom"
scan_name = f"{scan_label}-Prompt-Scan-{scan_ts}"

# Use the custom_red_team writable directory (fallback to home)
_writable_base = pathlib.Path(getattr(getattr(custom_red_team, 'file_manager', None), 'base_output_dir', None) or Path.home())
_writable_base.mkdir(parents=True, exist_ok=True)
output_path = _writable_base / f"custom_prompt_scan-{scan_ts}.json"
print(f"Running {scan_name} -> {output_path}")

try:
    custom_result = await custom_red_team.scan(
        target=model_target,  # reuse earlier model target callback
        scan_name=scan_name,
        attack_strategies=[
            AttackStrategy.EASY,
            AttackStrategy.MODERATE,
            AttackStrategy.DIFFICULT,
        ],
        output_path=str(output_path),
    )
    print(f"Completed {scan_name} -> {output_path}")
except Exception as e:  # noqa: BLE001
    print(f"ERROR during {scan_name}: {e}")
    raise