In [None]:
# Setup: Install required packages
# Run this cell first to ensure all dependencies are installed

import subprocess
import sys

# Packages this lab needs: the OpenAI SDK, Azure credential helpers, and .env loading.
packages = [
    "openai",
    "azure-identity",
    "python-dotenv",
]

# Install everything in a single pip invocation so the resolver sees all
# requirements together (one resolution pass instead of one per package).
# sys.executable makes pip target the interpreter this kernel is running on.
# check_call raises CalledProcessError if pip exits with a non-zero status.
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", *packages])

print("✅ All packages installed successfully!")

In [None]:
# Configuration: Load environment variables
# Set up your Azure OpenAI connection

import os
from dotenv import load_dotenv

# Load environment variables from .env file (if present)
load_dotenv()

# Check required configuration
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
# The deployment name is optional and falls back to "gpt-4o-mini".
deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o-mini")

if endpoint:
    print(f"✅ Azure OpenAI Endpoint: {endpoint}")
    print(f"✅ Deployment Name: {deployment}")
else:
    # Only warn here; this cell just reports what is configured.
    print("⚠️ AZURE_OPENAI_ENDPOINT not set")
    print("Please set the following environment variable:")
    print("  - AZURE_OPENAI_ENDPOINT")
    print("And optionally:")
    print("  - AZURE_OPENAI_DEPLOYMENT_NAME (default: gpt-4o-mini)")
    print("  - AZURE_OPENAI_API_KEY (for API key auth)")

# Check authentication method
api_key = os.getenv("AZURE_OPENAI_API_KEY")
client_id = os.getenv("AZURE_CLIENT_ID")

# A service principal login needs the tenant id, client id AND client secret.
# AZURE_CLIENT_ID on its own is not enough, so only report "Service Principal"
# when all three are present. The secret is tested for presence only and is
# deliberately not kept in a notebook variable.
has_service_principal = all(
    os.getenv(var_name)
    for var_name in ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")
)

if api_key:
    print("✅ Authentication: API Key")
elif has_service_principal:
    print("✅ Authentication: Service Principal")
else:
    print("✅ Authentication: Azure CLI / Managed Identity")

In [None]:
# Create Azure OpenAI Client
# This demonstrates how to set up the client with different auth methods

import os
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# Read connection settings from the environment
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o-mini")
api_key = os.getenv("AZURE_OPENAI_API_KEY")

# Nothing can be built without an endpoint, so stop right away
if not endpoint:
    raise ValueError("AZURE_OPENAI_ENDPOINT environment variable must be set")

# Arguments shared by both authentication paths
shared_client_args = dict(azure_endpoint=endpoint, api_version="2024-10-21")

if not api_key:
    # No key configured: authenticate with a token from DefaultAzureCredential
    # (Service Principal, Managed Identity, or Azure CLI)
    credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(
        credential,
        "https://cognitiveservices.azure.com/.default",
    )
    client = AzureOpenAI(azure_ad_token_provider=token_provider, **shared_client_args)
    print("✅ Created AzureOpenAI client with token-based auth")
else:
    # A key is configured: authenticate with it directly
    client = AzureOpenAI(api_key=api_key, **shared_client_args)
    print("✅ Created AzureOpenAI client with API Key auth")

In [None]:
# Define the Support Ticket model
# This is the common data model used across all workflow patterns

from dataclasses import dataclass
from typing import Optional

@dataclass
class SupportTicket:
    """One customer support request, the data model shared by the workflow patterns.

    The four identifying/descriptive fields are required. ``category`` and
    ``priority`` default to ``None`` and ``status`` defaults to ``"Open"``.
    """
    # Required when the ticket is created
    ticket_id: str
    customer_name: str
    subject: str
    description: str
    # Optional triage fields, None by default
    category: Optional[str] = None
    priority: Optional[str] = None
    # Every new ticket starts out as "Open"
    status: str = "Open"

# Sample ticket; category, priority and status keep their defaults
example_ticket = SupportTicket(
    "TICKET-001",
    "John Smith",
    "Cannot access my account",
    "I've been trying to log in for the past hour but keep getting an 'invalid credentials' error. I'm sure my password is correct.",
)

print("✅ SupportTicket model defined")
print()
print("Example Ticket:")
# Show the headline fields, one "  Label: value" line each
for label, shown in (
    ("ID", example_ticket.ticket_id),
    ("Customer", example_ticket.customer_name),
    ("Subject", example_ticket.subject),
    ("Status", example_ticket.status),
):
    print(f"  {label}: {shown}")

In [None]:
# Workflow Building Blocks - Executor Base Class
# This is the foundation for all workflow patterns

from dataclasses import dataclass
from typing import Any

@dataclass
class WorkflowEvent:
    """Event record produced while a workflow runs.

    Carries the id of the executor it is attributed to and an arbitrary payload.
    """
    executor_id: str
    data: Any


class Executor:
    """Parent class for workflow steps; concrete steps override ``execute``."""

    def __init__(self, name: str):
        # The name is the only state the base class keeps
        self.name = name

    async def execute(self, input_data: Any) -> tuple[Any, WorkflowEvent]:
        """Process ``input_data`` and return ``(result, event)``.

        The base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError("Subclasses must implement execute()")


# One print call with a newline separator prints the same five lines
print(
    "✅ Workflow base classes defined",
    "",
    "Building Blocks:",
    "  - WorkflowEvent: Tracks execution events",
    "  - Executor: Base class for workflow steps",
    sep="\n",
)

In [None]:
# Test the Azure OpenAI connection with a simple workflow
# This verifies everything is working before the hands-on exercises

async def test_ai_call():
    """Test a simple AI call."""
    response = client.chat.completions.create(
        model=deployment,
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Say 'Hello from Workflow Lab!' in exactly 5 words."}
        ],
        max_tokens=50
    )
    return response.choices[0].message.content

# Run the test
import asyncio
result = await test_ai_call()

print("Response from Azure OpenAI:")
print(f"  {result}")
print()
print(f"✅ Azure OpenAI connection successful!")
print("You're ready to start the hands-on exercises!")

# Workflow Concepts - Python

This notebook explains the key concepts of building AI workflows using Python with Azure OpenAI.

## Table of Contents

1. [What is a Workflow?](#1-what-is-a-workflow)
2. [Core Building Blocks](#2-core-building-blocks)
3. [Sequential Workflow](#3-sequential-workflow)
4. [Concurrent Workflow](#4-concurrent-workflow)
5. [Human-in-the-Loop Workflow](#5-human-in-the-loop-workflow)
6. [Best Practices](#6-best-practices)

## 1. What is a Workflow?

A **Workflow** is a series of **Executors** connected in sequence or parallel. Data flows through the workflow, being processed by each executor.

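```
+------------+      +------------+      +------------+
|  Executor  | ---> |  Executor  | ---> |  Executor  |
|     A      |      |     B      |      |     C      |
+------------+      +------------+      +------------+
```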

### Key Benefits

| Benefit | Description |
|---------|-------------|
| **Composability** | Build complex workflows from simple, reusable components |
| **AI Integration** | Seamlessly integrate AI agents into the workflow |
| **Human Oversight** | Pause workflows to get human input or approval |
| **Async Support** | Leverage Python's asyncio for concurrent execution |

## 2. Core Building Blocks

### 2.1 Executors

An **Executor** is a unit of work in a workflow. It receives input, processes it, and returns output.

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class WorkflowEvent:
    """Event emitted during workflow execution."""
    executor_id: str
    data: Any


class Executor:
    """Base class for workflow executors."""
    
    def __init__(self, name: str):
        self.name = name
    
    async def execute(self, input_data: Any) -> tuple[Any, WorkflowEvent]:
        """Execute the function and return result with event."""
        raise NotImplementedError
```

### Example: Ticket Intake Executor

```python
class TicketIntakeExecutor(Executor):
    """Executor that handles ticket intake and validation."""
    
    def __init__(self):
        super().__init__("TicketIntake")
    
    async def execute(self, ticket: SupportTicket) -> tuple[str, WorkflowEvent]:
        if not ticket.subject or not ticket.description:
            raise ValueError("Support ticket must have a subject and description.")
        
        ticket_text = f"""
Ticket ID: {ticket.ticket_id}
Customer: {ticket.customer_name}
Subject: {ticket.subject}
Description: {ticket.description}
"""
        event = WorkflowEvent(executor_id=self.name, data=ticket_text)
        return ticket_text, event
```

### 2.2 AI Agents

Integrate AI models (like Azure OpenAI) into workflows:

`python
import asyncio

from openai import AzureOpenAI

class AICategorizationExecutor(Executor):
    """Executor that asks the AI model to categorize a ticket.

    The system prompt asks for a JSON object with a ``category``
    (BILLING / TECHNICAL / GENERAL) and a ``priority`` (HIGH / MEDIUM / LOW).
    """
    
    def __init__(self, client: AzureOpenAI, deployment: str):
        """Store the client, the deployment name, and the categorization prompt."""
        super().__init__("CategorizationAgent")
        self.client = client
        self.deployment = deployment
        self.instructions = """
You are a customer support ticket categorization specialist.
Analyze the incoming support ticket and categorize it into:
- BILLING: Payment issues, invoices, subscription
- TECHNICAL: Software bugs, errors, how-to questions
- GENERAL: Account inquiries, feedback

Respond with JSON: {"category": "CATEGORY", "priority": "HIGH|MEDIUM|LOW"}
"""
    
    async def execute(self, ticket_text: str) -> tuple[str, WorkflowEvent]:
        """Send the ticket text to the model and return ``(reply, event)``.

        The reply is the raw message content; it is not parsed as JSON here.
        """
        # The client's create() call is synchronous. Running it in a worker
        # thread keeps the event loop free while the request is in flight,
        # instead of blocking every other coroutine.
        response = await asyncio.to_thread(
            self.client.chat.completions.create,
            model=self.deployment,
            messages=[
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": ticket_text}
            ]
        )
        result = response.choices[0].message.content
        event = WorkflowEvent(executor_id=self.name, data=result)
        return result, event
`

### 2.3 Azure OpenAI Client Factory

Support multiple authentication methods:

`python
import os
from openai import AzureOpenAI
from azure.identity import (
    AzureCliCredential,
    ChainedTokenCredential,
    DefaultAzureCredential,
    get_bearer_token_provider,
)

def create_chat_client() -> AzureOpenAI:
    """Create an Azure OpenAI client using an API key or an Azure credential.

    If AZURE_OPENAI_API_KEY is set, the key is used. Otherwise the client
    authenticates with Microsoft Entra ID tokens, trying the Azure CLI login
    first and then the DefaultAzureCredential chain.

    Returns:
        A configured AzureOpenAI client.

    Raises:
        ValueError: If AZURE_OPENAI_ENDPOINT is not set.
    """
    endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
    if not endpoint:
        raise ValueError("AZURE_OPENAI_ENDPOINT environment variable is required")
    
    # Same API version as the client created in the notebook's setup cell.
    api_version = "2024-10-21"
    api_key = os.environ.get("AZURE_OPENAI_API_KEY")
    
    if api_key:
        # Use API key authentication
        return AzureOpenAI(
            azure_endpoint=endpoint,
            api_key=api_key,
            api_version=api_version
        )
    
    # Use Azure credential (CLI, Managed Identity, or Service Principal).
    # A bearer token must not be passed as `api_key`: it is not an API key,
    # and a token fetched once here would expire and never be refreshed.
    # A token provider lets the client request a fresh token when needed.
    # Tokens are now acquired on first use, so a credential failure surfaces
    # on the first API call rather than inside this factory.
    credential = ChainedTokenCredential(AzureCliCredential(), DefaultAzureCredential())
    token_provider = get_bearer_token_provider(
        credential,
        "https://cognitiveservices.azure.com/.default"
    )
    return AzureOpenAI(
        azure_endpoint=endpoint,
        azure_ad_token_provider=token_provider,
        api_version=api_version
    )
`

## 3. Sequential Workflow

A **Sequential Workflow** processes data through a linear pipeline where each step depends on the previous one.

### Architecture

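```
+----------+     +----------------+     +------------+     +------------+
|  Ticket  | --> | Categorization | --> |  Response  | --> |   Final    |
|  Intake  |     |    AI Agent    |     |  AI Agent  |     |  Response  |
+----------+     +----------------+     +------------+     +------------+
```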

### Implementation

```python
class SequentialWorkflow:
    """A workflow that executes steps sequentially."""
    
    def __init__(self):
        self.steps: list[Executor] = []
    
    def add_step(self, executor: Executor) -> "SequentialWorkflow":
        """Add a step to the workflow."""
        self.steps.append(executor)
        return self
    
    async def run(self, input_data: Any) -> list[WorkflowEvent]:
        """Execute the workflow and return all events."""
        events = []
        current_data = input_data
        
        for executor in self.steps:
            current_data, event = await executor.execute(current_data)
            events.append(event)
        
        return events
```

### Usage Example

```python
# Create executors
ticket_intake = TicketIntakeExecutor()
categorization_agent = AICategorizationExecutor(client, deployment)
response_agent = AIResponseExecutor(client, deployment)

# Build the sequential workflow
workflow = SequentialWorkflow()
workflow.add_step(ticket_intake)
workflow.add_step(categorization_agent)
workflow.add_step(response_agent)

# Execute
events = await workflow.run(sample_ticket)
```

### Use Cases

- **Document Processing**: Parse → Extract → Summarize → Store
- **Order Processing**: Validate → Calculate → Confirm → Fulfill
- **Support Tickets**: Intake → Categorize → Respond

## 4. Concurrent Workflow

A **Concurrent Workflow** distributes work to multiple executors simultaneously and aggregates results.

### Architecture

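```
                    +--------------------+
               +--> |   Billing Expert   | --+
+----------+   |    +--------------------+   |    +------------+
| Customer | --+                             +--> |  Combined  |
| Question | --+                             +--> |  Response  |
+----------+   |    +--------------------+   |    +------------+
               +--> |  Technical Expert  | --+
                    +--------------------+

            Fan-Out                        Fan-In
```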

### Implementation

`python
import asyncio

class AIAgent:
    """An AI agent with a specific expertise, defined by its system instructions."""
    
    def __init__(self, client: AzureOpenAI, deployment: str, name: str, instructions: str):
        """Store the client, deployment name, agent name, and system instructions."""
        self.client = client
        self.deployment = deployment
        self.name = name
        self.instructions = instructions
    
    async def answer(self, question: str) -> str:
        """Ask the model the question and return the reply text.

        Args:
            question: The user message sent after the agent's instructions.

        Returns:
            The content of the first choice's message.
        """
        # The client's create() call is synchronous. Called directly, it would
        # block the event loop, so agents started with asyncio.gather() would
        # run one after another. Running it in a worker thread lets several
        # agents have requests in flight at the same time.
        response = await asyncio.to_thread(
            self.client.chat.completions.create,
            model=self.deployment,
            messages=[
                {"role": "system", "content": self.instructions},
                {"role": "user", "content": question}
            ],
            max_tokens=200
        )
        return response.choices[0].message.content


class ConcurrentWorkflow:
    """Fan-out/fan-in workflow: ask every agent the same question, collect replies by name."""
    
    def __init__(self, agents: list[AIAgent]):
        self.agents = agents
    
    async def run(self, question: str) -> dict[str, str]:
        """Return a mapping of agent name to that agent's answer."""
        # Fan-out: schedule every agent's answer() together
        replies = await asyncio.gather(
            *(member.answer(question) for member in self.agents)
        )
        
        # Fan-in: gather() keeps input order, so pair each reply with its agent
        return {member.name: reply for member, reply in zip(self.agents, replies)}
`

### Usage Example

```python
# Create specialized agents
billing_expert = AIAgent(client, deployment, "BillingExpert",
    "You are an expert in billing and subscription matters.")

technical_expert = AIAgent(client, deployment, "TechnicalExpert",
    "You are an expert in technical support and troubleshooting.")

# Create and run workflow
workflow = ConcurrentWorkflow([billing_expert, technical_expert])
results = await workflow.run("My subscription was charged twice and the app crashes.")

for name, response in results.items():
    print(f"[{name}]: {response}")
```

### Use Cases

- **Multi-Expert Analysis**: Get perspectives from multiple specialists
- **Parallel Processing**: Process multiple items simultaneously
- **Consensus Building**: Aggregate opinions from multiple AI agents

## 5. Human-in-the-Loop Workflow

A **Human-in-the-Loop Workflow** pauses execution to get human input, approval, or oversight.

### Architecture

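```
+----------+     +------------+     +----------------+     +------------+
|  Ticket  | --> |  AI Draft  | --> |  Human Review  | --> |  Finalize  |
|  Intake  |     |   Agent    |     |  (Supervisor)  |     |  Response  |
+----------+     +------------+     +----------------+     +------------+
                                            |
                                            v
                                    +----------------+
                                    |  Supervisor    |
                                    |  - Approve     |
                                    |  - Edit        |
                                    |  - Escalate    |
                                    +----------------+
```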

### Key Components

```python
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable

class ReviewAction(Enum):
    APPROVE = "Approve"
    EDIT = "Edit"
    ESCALATE = "Escalate"

@dataclass
class SupervisorReviewRequest:
    """Request sent to supervisor for review."""
    ticket_id: str
    category: str
    priority: str
    draft_response: str

@dataclass
class SupervisorDecision:
    """Supervisor's decision after reviewing the draft."""
    action: ReviewAction
    modified_response: Optional[str]
    notes: str
```

### Implementation

`python
class HumanInTheLoopWorkflow:
    """A workflow that pauses for human review/approval.

    Steps: ticket intake, AI draft, supervisor review through `input_handler`,
    then finalization based on the supervisor's decision.
    """
    
    def __init__(self, client: AzureOpenAI, deployment: str):
        """Create the intake and draft executors; no ticket is active yet."""
        self.ticket_intake = TicketIntakeExecutor()
        self.draft_agent = AIDraftExecutor(client, deployment)
        self.current_ticket: Optional[SupportTicket] = None
    
    async def run(self, ticket: SupportTicket, input_handler: Callable) -> str:
        """Run the workflow with human review.

        Args:
            ticket: The ticket to process.
            input_handler: Awaitable callback that receives a
                SupervisorReviewRequest and returns the supervisor's decision.

        Returns:
            Whatever `_finalize` produces for the supervisor's decision.
        """
        self.current_ticket = ticket
        
        # Step 1: Ticket Intake
        ticket_text, _ = await self.ticket_intake.execute(ticket)
        
        # Step 2: AI Draft Generation
        draft_response, _ = await self.draft_agent.execute(ticket_text)
        
        # SupportTicket.priority is declared Optional[str], so `.priority.value`
        # raises AttributeError for both a plain string and None. Accept a
        # plain string, an enum-like object with `.value`, or None.
        raw_priority = getattr(ticket.priority, "value", ticket.priority)
        # NOTE(review): "UNSPECIFIED" is a placeholder for tickets with no
        # priority yet - confirm the label the supervisor UI should show.
        priority = raw_priority if raw_priority is not None else "UNSPECIFIED"
        
        # Step 3: Create review request
        review_request = SupervisorReviewRequest(
            ticket_id=ticket.ticket_id,
            category=self._determine_category(ticket.subject),
            priority=priority,
            draft_response=draft_response
        )
        
        # Step 4: PAUSE - Get human supervisor decision
        decision = await input_handler(review_request)
        
        # Step 5: Finalize based on decision
        return self._finalize(decision)
`

### Console Input Handler

```python
async def console_supervisor_handler(review_request: SupervisorReviewRequest) -> SupervisorDecision:
    print(f"Ticket: {review_request.ticket_id}")
    print(f"AI Draft: {review_request.draft_response}")
    print("[1] Approve [2] Edit [3] Escalate")
    
    choice = input("Enter choice (1-3): ")
    
    if choice == "1":
        return SupervisorDecision(action=ReviewAction.APPROVE, modified_response=None, notes="Approved")
    elif choice == "2":
        modified = input("Enter modified response: ")
        return SupervisorDecision(action=ReviewAction.EDIT, modified_response=modified, notes="Modified")
    else:
        reason = input("Enter escalation reason: ")
        return SupervisorDecision(action=ReviewAction.ESCALATE, modified_response=None, notes=reason)
```

### Use Cases

- **Content Approval**: AI drafts, human approves before publishing
- **Financial Decisions**: AI recommends, human authorizes large transactions
- **Customer Escalation**: AI handles routine, human handles exceptions
- **Quality Control**: AI processes, human spot-checks

## 6. Best Practices

### 6.1 Executor Design

- **Single Responsibility**: Each executor should do one thing well
- **Stateless When Possible**: Avoid storing state in executors
- **Clear Naming**: Use descriptive names that indicate the executor's purpose
- **Type Hints**: Use Python type hints for better code clarity

### 6.2 AI Agent Instructions

```python
# Good: Specific, structured output
instructions = """
Categorize tickets into: BILLING, TECHNICAL, GENERAL
Output JSON: {"category": "CATEGORY", "confidence": 0.0-1.0}
Be concise. Only output the JSON.
"""

# Bad: Vague, unstructured
instructions = "Help with customer support"
```

### 6.3 Error Handling

```python
try:
    events = await workflow.run(sample_ticket)
except Exception as e:
    print(f"Workflow error: {e}")
    # Log, retry, or escalate
```

### 6.4 Configuration Management

```python
import os

# Support multiple auth methods via environment variables
# 1. API Key: AZURE_OPENAI_API_KEY
# 2. Azure CLI: (no env vars needed)
# 3. Service Principal: AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET

endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o-mini")
```

## Summary

| Pattern | When to Use | Key Methods |
|---------|-------------|-------------|
| **Sequential** | Linear processing pipeline | add_step(), run() |
| **Concurrent** | Parallel processing, multi-expert | asyncio.gather() |
| **Human-in-the-Loop** | Approval, oversight, exceptions | Input handler callback |

### Quick Reference

```python
# Sequential workflow
workflow = SequentialWorkflow()
workflow.add_step(executor_a)
workflow.add_step(executor_b)
events = await workflow.run(input_data)

# Concurrent workflow
workflow = ConcurrentWorkflow([agent_a, agent_b])
results = await workflow.run(question)

# Human-in-the-loop workflow
workflow = HumanInTheLoopWorkflow(client, deployment)
result = await workflow.run(ticket, input_handler)
```