# Switch-Case Edge Group - Deterministic Multi-Way Routing

## Overview
This notebook demonstrates **switch-case edge groups** for deterministic one-of-N routing. You'll learn how to:
- Use `Case` and `Default` for cleaner conditional routing than multiple `add_edge()` calls
- Implement three-way decision logic (NotSpam/Spam/Uncertain)
- Share state across executors using `ctx.set_shared_state()` / `ctx.get_shared_state()`
- Keep executor responsibilities narrow with typed data contracts
- Build robust workflows with structured agent outputs

## What This Sample Does
The workflow implements an **email triage system** in four stages, ending in one of three outcomes:
1. **store_email** - Persists email once in shared state
2. **spam_detection_agent** - Classifies email as NotSpam/Spam/Uncertain
3. **to_detection_result** - Transforms agent response into typed `DetectionResult`
4. **Switch-Case Routing:**
   - **Case 1 (NotSpam)** → Draft professional reply
   - **Case 2 (Spam)** → Block and log
   - **Default (Uncertain)** → Flag for human review

**Input:** Ambiguous email text (from `resources/ambiguous_email.txt`)  
**Output:** Drafted reply, spam notice, OR uncertain flag (based on classification)

## Key Differences from `edge_condition.ipynb`
- **Cleaner Syntax:** `add_switch_case_edge_group()` vs multiple `add_edge()` calls
- **Explicit Default:** `Default(target=...)` makes fallback path clear
- **Ordered Evaluation:** Cases checked in order, first match wins
- **Better Readability:** Switch-case intent is obvious from code structure

## Step 1: Import Required Libraries

In [None]:
# Copyright (c) Microsoft. All rights reserved.

from dotenv import load_dotenv
import asyncio
import os
from dataclasses import dataclass
from typing import Any, Literal
from uuid import uuid4

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    Case,
    ChatMessage,
    Default,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    executor,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel
from typing_extensions import Never

load_dotenv('../../.env')

endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
deployment_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")


## Step 2: Define Constants and Models

**Constants:**
- `EMAIL_STATE_PREFIX` - Prefix for email storage keys
- `CURRENT_EMAIL_ID_KEY` - Key for tracking current email ID

**Pydantic Models (Agent Outputs):**
- `DetectionResultAgent` - Structured JSON from spam detector
- `EmailResponse` - Structured JSON from email assistant

**Dataclasses (Internal Types):**
- `DetectionResult` - Internal typed payload for routing
- `Email` - In-memory email record

In [None]:
EMAIL_STATE_PREFIX = "email:"
CURRENT_EMAIL_ID_KEY = "current_email_id"


class DetectionResultAgent(BaseModel):
    """Structured output returned by the spam detection agent."""

    # The agent classifies the email and provides a rationale.
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str


class EmailResponse(BaseModel):
    """Structured output returned by the email assistant agent."""

    # The drafted professional reply.
    response: str


@dataclass
class DetectionResult:
    # Internal typed payload used for routing and downstream handling.
    spam_decision: str
    reason: str
    email_id: str


@dataclass
class Email:
    # In memory record of the email content stored in shared state.
    email_id: str
    email_content: str

## Step 3: Create Condition Factory for Cases
`get_case` is a factory function that returns a predicate matching one specific `spam_decision` value.

**How It Works:**
- `get_case("NotSpam")` returns predicate checking `msg.spam_decision == "NotSpam"`
- `get_case("Spam")` returns predicate checking `msg.spam_decision == "Spam"`
- An `isinstance` check ensures that only `DetectionResult` messages can match

In [None]:
def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        # Only match when the upstream payload is a DetectionResult with the expected decision.
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition
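
To see what the factory produces, the predicates can be exercised directly, without building a workflow. The sketch below redeclares `DetectionResult` and `get_case` so that it runs on its own; the sample values (`"illustrative reason"`, `"demo-id"`) are made up for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class DetectionResult:
    spam_decision: str
    reason: str
    email_id: str


def get_case(expected_decision: str):
    """Factory that returns a predicate matching a specific spam_decision value."""

    def condition(message: Any) -> bool:
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition


is_spam = get_case("Spam")
sample = DetectionResult(spam_decision="Spam", reason="illustrative reason", email_id="demo-id")

print(is_spam(sample))              # True: type and decision both match
print(get_case("NotSpam")(sample))  # False: the decision differs
print(is_spam("Spam"))              # False: a bare string is not a DetectionResult
```

Because the predicate checks the type first, an unexpected payload fails the case instead of raising `AttributeError`.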

## Step 4: Define Executor Functions

**store_email:**
- Persists email once in shared state with unique ID
- Avoids redundant data passing between executors
- Forwards email as `AgentExecutorRequest` to spam detector

**to_detection_result:**
- Transforms `AgentExecutorResponse` → `DetectionResult`
- Attaches email ID for downstream state lookups

**submit_to_email_assistant:**
- Retrieves original email from shared state
- Only handles NotSpam path (defensive check)

**finalize_and_send:**
- Terminal executor for NotSpam path
- Parses and yields drafted reply

**handle_spam:**
- Terminal executor for Spam path
- Yields spam notice with reason

**handle_uncertain:**
- Terminal executor for Uncertain path (Default case)
- Surfaces original content for human review

In [None]:
@executor(id="store_email")
async def store_email(email_text: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    # Persist the raw email once. Store under a unique key and set the current pointer for convenience.
    new_email = Email(email_id=str(uuid4()), email_content=email_text)
    await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email)
    await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id)

    # Kick off the detector by forwarding the email as a user message to the spam_detection_agent.
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=new_email.email_content)], should_respond=True)
    )


@executor(id="to_detection_result")
async def to_detection_result(response: AgentExecutorResponse, ctx: WorkflowContext[DetectionResult]) -> None:
    # Parse the detector JSON into a typed model. Attach the current email id for downstream lookups.
    parsed = DetectionResultAgent.model_validate_json(response.agent_run_response.text)
    email_id: str = await ctx.get_shared_state(CURRENT_EMAIL_ID_KEY)
    await ctx.send_message(DetectionResult(spam_decision=parsed.spam_decision, reason=parsed.reason, email_id=email_id))


@executor(id="submit_to_email_assistant")
async def submit_to_email_assistant(detection: DetectionResult, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
    # Only proceed for the NotSpam branch. Guard against accidental misrouting.
    if detection.spam_decision != "NotSpam":
        raise RuntimeError("This executor should only handle NotSpam messages.")

    # Load the original content from shared state using the id carried in DetectionResult.
    email: Email = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
    await ctx.send_message(
        AgentExecutorRequest(messages=[ChatMessage(Role.USER, text=email.email_content)], should_respond=True)
    )


@executor(id="finalize_and_send")
async def finalize_and_send(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    # Terminal step for the drafting branch. Yield the email response as output.
    parsed = EmailResponse.model_validate_json(response.agent_run_response.text)
    await ctx.yield_output(f"Email sent: {parsed.response}")


@executor(id="handle_spam")
async def handle_spam(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    # Spam path terminal. Include the detector's rationale.
    if detection.spam_decision == "Spam":
        await ctx.yield_output(f"Email marked as spam: {detection.reason}")
    else:
        raise RuntimeError("This executor should only handle Spam messages.")


@executor(id="handle_uncertain")
async def handle_uncertain(detection: DetectionResult, ctx: WorkflowContext[Never, str]) -> None:
    # Uncertain path terminal. Surface the original content to aid human review.
    if detection.spam_decision == "Uncertain":
        email: Email | None = await ctx.get_shared_state(f"{EMAIL_STATE_PREFIX}{detection.email_id}")
        await ctx.yield_output(
            f"Email marked as uncertain: {detection.reason}. Email content: {getattr(email, 'email_content', '')}"
        )
    else:
        raise RuntimeError("This executor should only handle Uncertain messages.")

## Step 5: Build the Switch-Case Workflow

**Workflow Structure:**
```
store_email → spam_detection_agent → to_detection_result
                                           │
                        ┌──────────────────┼──────────────────┐
                        │                  │                  │
                   NotSpam              Spam            Uncertain
                        │                  │              (Default)
                        ▼                  ▼                  ▼
         submit_to_email_assistant   handle_spam      handle_uncertain
                        │
                        ▼
              email_assistant_agent
                        │
                        ▼
               finalize_and_send
```

**Key Points:**
- `add_switch_case_edge_group()` replaces multiple conditional `add_edge()` calls
- Cases evaluated in order: NotSpam → Spam → Default
- `Default` catches any unmatched cases (Uncertain)
- The routing reads as a single block, rather than as the separate conditional `add_edge()` calls used in `edge_condition.ipynb`

In [None]:
async def run_workflow():
    """Main function to run the workflow."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    deployment_name = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME")
    chat_client = AzureOpenAIChatClient(
        deployment_name=deployment_name,
        endpoint=endpoint,
        credential=AzureCliCredential()
    )

    # Agents. response_format enforces that the LLM returns JSON that Pydantic can validate.
    spam_detection_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are a spam detection assistant that identifies spam emails. "
                "Be less confident in your assessments. "
                "Always return JSON with fields 'spam_decision' (one of NotSpam, Spam, Uncertain) "
                "and 'reason' (string)."
            ),
            response_format=DetectionResultAgent,
        ),
        id="spam_detection_agent",
    )

    email_assistant_agent = AgentExecutor(
        chat_client.create_agent(
            instructions=(
                "You are an email assistant that helps users draft responses to emails with professionalism."
            ),
            response_format=EmailResponse,
        ),
        id="email_assistant_agent",
    )

    # Build workflow: store -> detection agent -> to_detection_result -> switch (NotSpam or Spam or Default).
    # The switch-case group evaluates cases in order, then falls back to Default when none match.
    workflow = (
        WorkflowBuilder()
        .set_start_executor(store_email)
        .add_edge(store_email, spam_detection_agent)
        .add_edge(spam_detection_agent, to_detection_result)
        .add_switch_case_edge_group(
            to_detection_result,
            [
                Case(condition=get_case("NotSpam"), target=submit_to_email_assistant),
                Case(condition=get_case("Spam"), target=handle_spam),
                Default(target=handle_uncertain),
            ],
        )
        .add_edge(submit_to_email_assistant, email_assistant_agent)
        .add_edge(email_assistant_agent, finalize_and_send)
        .build()
    )

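    # Read ambiguous email if available. Otherwise use a simple inline sample.
    # Notebooks do not define __file__, so resolve the path from the working directory instead.
    # Assumption: the kernel's working directory is this notebook's folder, with resources/ one level up.
    resources_path = os.path.join(os.path.dirname(os.getcwd()), "resources", "ambiguous_email.txt")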
    if os.path.exists(resources_path):
        with open(resources_path, encoding="utf-8") as f:  # noqa: ASYNC230
            email = f.read()
    else:
        print("Unable to find resource file, using default text.")
        email = (
            "Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. "
            "Let me know if you'd like more details."
        )

    print("=== Email Content ===")
    print(email[:200] + "..." if len(email) > 200 else email)
    print("\n=== Processing ===")

    # Run and print the outputs from whichever branch completes.
    events = await workflow.run(email)
    outputs = events.get_outputs()
    if outputs:
        print("\n=== Workflow Output ===")
        for output in outputs:
            print(output)

## Step 6: Execute the Workflow
**Note:** Requires Azure OpenAI credentials. Authenticate with `az login`.

In [None]:
# Uncomment to run (requires Azure OpenAI access).
# Notebooks support top-level await; in a plain script, call asyncio.run(run_workflow()) instead.
# await run_workflow()

## Expected Output (Uncertain Path)
```
=== Email Content ===
Hey there, I noticed you might be interested in our latest offer—no pressure, but it expires soon. 
Let me know if you'd like more details.

=== Processing ===

=== Workflow Output ===
Email marked as uncertain: The email contains promotional language but lacks typical spam indicators. 
Requires human review. Email content: Hey there, I noticed you might be interested in our latest offer...
```

## Key Takeaways

### Switch-Case Edge Groups
✅ **Cleaner Syntax:** One `add_switch_case_edge_group()` vs multiple `add_edge()` calls  
✅ **Explicit Default:** `Default(target=...)` makes fallback obvious  
✅ **Ordered Evaluation:** Cases checked sequentially, first match wins  
✅ **Better Intent:** Switch-case structure is self-documenting

### Syntax Comparison

**Edge Conditions (Old Style):**
```python
workflow.add_edge(classifier, path_a, condition=lambda m: m.decision == 'A')
workflow.add_edge(classifier, path_b, condition=lambda m: m.decision == 'B')
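# Each edge condition is checked on its own, so the fallback must exclude the other cases explicitly.
workflow.add_edge(classifier, path_default, condition=lambda m: m.decision not in ('A', 'B'))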
```

**Switch-Case (New Style):**
```python
workflow.add_switch_case_edge_group(
    classifier,
    [
        Case(condition=lambda m: m.decision == 'A', target=path_a),
        Case(condition=lambda m: m.decision == 'B', target=path_b),
        Default(target=path_default),
    ],
)
```

**Benefits:**
- All cases grouped together (easier to read)
- Explicit `Default` vs implicit fallback
- Evaluation order is visually clear
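
One caveat when translating between the two styles: independent conditional edges are, as far as the API suggests, each evaluated on their own, so a catch-all predicate such as `lambda m: True` would fire in addition to any specific edge that matches, whereas `Default` fires only when no case matched. The plain-Python model below illustrates that difference. `route_independent` and `route_switch` are hypothetical stand-ins written for this sketch, not framework functions.

```python
from types import SimpleNamespace


def route_independent(message, edges):
    """Model of independent conditional edges: every edge whose predicate passes fires."""
    return [target for condition, target in edges if condition(message)]


def route_switch(message, cases, default):
    """Model of a switch-case group: the first matching case wins, else the default."""
    for condition, target in cases:
        if condition(message):
            return [target]
    return [default]


msg = SimpleNamespace(decision="A")

edges = [
    (lambda m: m.decision == "A", "path_a"),
    (lambda m: m.decision == "B", "path_b"),
    (lambda m: True, "path_default"),  # catch-all written as an ordinary condition
]

independent_targets = route_independent(msg, edges)
switch_targets = route_switch(msg, edges[:2], "path_default")

print(independent_targets)  # ['path_a', 'path_default']
print(switch_targets)       # ['path_a']
```

With independent edges, the fallback therefore has to negate the other conditions by hand; the switch-case group gets one-of-N behavior from its structure.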

### Shared State Pattern
**Why Use Shared State?**
- Avoid passing large data (emails) between every executor
- Store once, retrieve anywhere in the workflow
- Lightweight message passing (IDs instead of content)

**Pattern:**
```python
# Store once
email = Email(email_id=str(uuid4()), email_content=text)
await ctx.set_shared_state(f"email:{email.email_id}", email)
await ctx.set_shared_state("current_email_id", email.email_id)

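# Pass only the ID plus the small routing fields (parsed is the validated agent output)
await ctx.send_message(
    DetectionResult(spam_decision=parsed.spam_decision, reason=parsed.reason, email_id=email.email_id)
)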

# Retrieve when needed
email_id = detection.email_id
email = await ctx.get_shared_state(f"email:{email_id}")
```
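
The store-once, pass-the-ID pattern can be tried outside a workflow with a small in-memory stand-in. In this sketch, `InMemoryContext` is a hypothetical class that mimics only the two shared-state calls used in this notebook; it is not the framework's `WorkflowContext`, and the email text is invented.

```python
import asyncio
from dataclasses import dataclass
from uuid import uuid4


@dataclass
class Email:
    email_id: str
    email_content: str


class InMemoryContext:
    """Hypothetical stand-in for WorkflowContext; mimics only the shared-state calls."""

    def __init__(self) -> None:
        self._state: dict[str, object] = {}

    async def set_shared_state(self, key: str, value: object) -> None:
        self._state[key] = value

    async def get_shared_state(self, key: str) -> object:
        return self._state[key]


async def demo() -> str:
    ctx = InMemoryContext()

    # Entry step: store the full email once under a prefixed key.
    email = Email(email_id=str(uuid4()), email_content="Hello, is the offer still open?")
    await ctx.set_shared_state(f"email:{email.email_id}", email)

    # Downstream step: only the ID travels in the message; the content is looked up on demand.
    email_id = email.email_id
    stored = await ctx.get_shared_state(f"email:{email_id}")
    return stored.email_content


content = asyncio.run(demo())
print(content)
```

The message between steps stays small regardless of how large the stored email is.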

### Typed Data Contracts
**Dataclasses for Internal Types:**
```python
@dataclass
class DetectionResult:
    spam_decision: str  # NotSpam, Spam, Uncertain
    reason: str
    email_id: str  # Reference to shared state
```

**Benefits:**
- Type hints for IDE support
- Clear executor input/output contracts
- Easier to validate and debug

**Pydantic for Agent I/O:**
```python
class DetectionResultAgent(BaseModel):
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str
```

**Benefits:**
- Automatic validation
- Constrained values (Literal types)
- JSON schema generation
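
A short sketch of what these benefits look like in practice, assuming Pydantic v2 (which the `model_validate_json` calls in this notebook imply). The JSON payloads are invented for illustration.

```python
from typing import Literal

from pydantic import BaseModel, ValidationError


class DetectionResultAgent(BaseModel):
    spam_decision: Literal["NotSpam", "Spam", "Uncertain"]
    reason: str


# A well-formed payload parses into a typed object.
ok = DetectionResultAgent.model_validate_json('{"spam_decision": "Spam", "reason": "illustrative"}')
print(ok.spam_decision)

# A value outside the Literal set is rejected before it can reach the routing step.
try:
    DetectionResultAgent.model_validate_json('{"spam_decision": "Maybe", "reason": "illustrative"}')
    rejected = False
except ValidationError:
    rejected = True
print(rejected)

# The generated JSON schema describes both fields.
schema = DetectionResultAgent.model_json_schema()
print(sorted(schema["properties"]))
```

Rejecting unknown values at the parsing boundary means the switch-case predicates only ever see one of the three expected decisions.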

## Switch-Case Routing Flow

### NotSpam Path:
1. `to_detection_result` sends `DetectionResult(spam_decision="NotSpam", ...)`
2. **Case 1** (`get_case("NotSpam")`) evaluates to `True`
3. Routes to `submit_to_email_assistant`
4. Retrieves email from shared state
5. Sends to `email_assistant_agent`
6. `finalize_and_send` yields drafted reply

### Spam Path:
1. `to_detection_result` sends `DetectionResult(spam_decision="Spam", ...)`
2. **Case 1** (`get_case("NotSpam")`) does not match; **Case 2** (`get_case("Spam")`) matches
3. Routes to `handle_spam`
4. Yields spam notice

### Uncertain Path:
1. `to_detection_result` sends `DetectionResult(spam_decision="Uncertain", ...)`
2. **Case 1** (`get_case("NotSpam")`) and **Case 2** (`get_case("Spam")`) both fail to match
3. **Default** catches unmatched case
4. Routes to `handle_uncertain`
5. Retrieves email from shared state
6. Yields uncertain notice with content
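
The three paths above can be traced with a plain-Python model of the selection step. This is a sketch of the described behavior (ordered cases, then default), not the framework's implementation: `select_target`, `CASES`, and `DEFAULT_TARGET` are hypothetical names, and targets are represented by their executor ids as strings.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DetectionResult:
    spam_decision: str
    reason: str
    email_id: str


def get_case(expected_decision: str) -> Callable[[Any], bool]:
    def condition(message: Any) -> bool:
        return isinstance(message, DetectionResult) and message.spam_decision == expected_decision

    return condition


# Ordered (predicate, target id) pairs mirroring the Case entries, plus the Default target.
CASES = [
    (get_case("NotSpam"), "submit_to_email_assistant"),
    (get_case("Spam"), "handle_spam"),
]
DEFAULT_TARGET = "handle_uncertain"


def select_target(message: Any) -> str:
    """Hypothetical model of one-of-N selection: first matching case, else the default."""
    for condition, target in CASES:
        if condition(message):
            return target
    return DEFAULT_TARGET


for decision in ("NotSpam", "Spam", "Uncertain"):
    result = DetectionResult(spam_decision=decision, reason="illustrative", email_id="demo-id")
    print(f"{decision} -> {select_target(result)}")
```

Note that the default branch receives anything the cases reject, not only `"Uncertain"`, which is one reason `handle_uncertain` still checks its input.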

## Best Practices

### When to Use Switch-Case
✅ **3+ branches** - Clearer than multiple conditional edges  
✅ **Mutually exclusive** - Only one path should execute  
✅ **Known options** - Finite set of possible values  
✅ **Default handling** - Need explicit fallback path

### When to Use Edge Conditions
✅ **Binary decisions** - Simple if/else (2 branches)  
✅ **Complex predicates** - Multi-field validation  
✅ **Independent conditions** - Edges don't form a set

### Shared State Guidelines
- **Use prefixes** for namespacing (`EMAIL_STATE_PREFIX`)
- **Store once** at workflow entry
- **Pass IDs** instead of full objects
- **Clean up** if state is large (not shown in sample)

### Defensive Coding
```python
# Always validate executor inputs
if detection.spam_decision != "NotSpam":
    raise RuntimeError("This executor should only handle NotSpam messages.")
```

**Why?**
- Catches routing bugs early
- Makes contracts explicit
- Prevents silent failures
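
If several executors repeat the same guard, it could be factored into a small helper. The sketch below is one possible shape; `require_decision` is a hypothetical function introduced here, not part of the sample or the framework.

```python
from dataclasses import dataclass


@dataclass
class DetectionResult:
    spam_decision: str
    reason: str
    email_id: str


def require_decision(detection: DetectionResult, expected: str) -> None:
    """Hypothetical helper: raise if a message reached the wrong branch."""
    if detection.spam_decision != expected:
        raise RuntimeError(
            f"This executor should only handle {expected} messages, got {detection.spam_decision!r}."
        )


misrouted = DetectionResult(spam_decision="Spam", reason="illustrative", email_id="demo-id")

try:
    require_decision(misrouted, "NotSpam")
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```

Including the actual value in the message makes a routing bug quicker to diagnose than a generic error would.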

## Pattern Variations

### Priority-Based Routing
```python
workflow.add_switch_case_edge_group(
    classifier,
    [
        Case(condition=lambda m: m.priority == 'critical', target=urgent_handler),
        Case(condition=lambda m: m.priority == 'high', target=high_handler),
        Case(condition=lambda m: m.priority == 'normal', target=normal_handler),
        Default(target=low_handler),
    ],
)
```

### Status-Based Routing
```python
workflow.add_switch_case_edge_group(
    validator,
    [
        Case(condition=lambda m: m.status == 'approved', target=process),
        Case(condition=lambda m: m.status == 'rejected', target=reject),
        Default(target=review_queue),  # pending, uncertain
    ],
)
```

### Category-Based Routing
```python
workflow.add_switch_case_edge_group(
    categorizer,
    [
        Case(condition=lambda m: m.category == 'question', target=qa_agent),
        Case(condition=lambda m: m.category == 'feedback', target=feedback_handler),
        Case(condition=lambda m: m.category == 'complaint', target=escalation),
        Default(target=general_handler),
    ],
)
```
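
All three variations test one attribute for equality, so the lambdas could be generated by a factory in the same spirit as `get_case`. A minimal sketch, assuming messages expose the routed field as an attribute; `attr_equals` is a hypothetical helper, and `SimpleNamespace` stands in for a real message type.

```python
from types import SimpleNamespace
from typing import Any, Callable

# Sentinel so that messages lacking the attribute fail the check instead of raising.
_MISSING = object()


def attr_equals(attribute: str, expected: Any) -> Callable[[Any], bool]:
    """Hypothetical helper: predicate that is True when message.<attribute> == expected."""

    def condition(message: Any) -> bool:
        return getattr(message, attribute, _MISSING) == expected

    return condition


is_critical = attr_equals("priority", "critical")

print(is_critical(SimpleNamespace(priority="critical")))  # True
print(is_critical(SimpleNamespace(priority="normal")))    # False
print(is_critical(SimpleNamespace(status="approved")))    # False: no priority attribute
```

Such a predicate would slot into a case as `Case(condition=attr_equals("priority", "critical"), target=urgent_handler)`.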

## Next Steps
- Compare with `edge_condition.ipynb` to see the difference in syntax
- Explore `multi_selection_edge_group.ipynb` for **fan-out** (one-to-many routing)
- See `simple_loop.ipynb` for iterative patterns with feedback loops