## üîê Prerequisites

Before running the first cell, make sure you're authenticated with Azure CLI:

```bash
az login
```

# üîÑ Workflow Observability

## Industry Use Case: Loan Processing Pipeline

This notebook demonstrates **telemetry for workflows** with multiple executors.

| Feature | FSI Application |
|---------|-----------------|
| **Workflow Spans** | Track loan processing stages |
| **Executor Tracing** | Monitor validation and approval steps |
| **Message Flow** | Trace data between pipeline stages |

In [None]:
import os
from dotenv import load_dotenv

load_dotenv('../../.env', override=True)

print(f"‚úÖ Environment loaded")

In [None]:
import asyncio
from random import uniform

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    handler,
)
from agent_framework.observability import configure_otel_providers, get_tracer
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_trace_id
from typing_extensions import Never

print("‚úÖ All imports loaded")

## Define Loan Processing Executors

A simple loan processing pipeline:
1. **ValidationExecutor**: Validates loan application data
2. **CreditCheckExecutor**: Performs credit check
3. **ApprovalExecutor**: Makes final approval decision

In [None]:
# Loan application data structure
class LoanApplication:
    def __init__(self, applicant: str, amount: float, income: float):
        self.applicant = applicant
        self.amount = amount
        self.income = income
        self.credit_score = 0
        self.status = "pending"


class ValidationExecutor(Executor):
    """Validates loan application data."""

    @handler
    async def validate(self, app: LoanApplication, ctx: WorkflowContext[LoanApplication]) -> None:
        print(f"[Validation] Processing application for {app.applicant}")
        await asyncio.sleep(uniform(0.1, 0.3))  # Simulate validation
        
        # Basic validation
        if app.amount > app.income * 5:
            app.status = "rejected - amount too high"
            print(f"[Validation] ‚ùå Rejected: Loan amount exceeds 5x income")
        else:
            app.status = "validated"
            print(f"[Validation] ‚úÖ Passed: ${app.amount:,.2f} loan for {app.applicant}")
        
        await ctx.send_message(app)


class CreditCheckExecutor(Executor):
    """Performs credit check on the applicant."""

    @handler
    async def check_credit(self, app: LoanApplication, ctx: WorkflowContext[LoanApplication]) -> None:
        print(f"[CreditCheck] Checking credit for {app.applicant}")
        await asyncio.sleep(uniform(0.2, 0.5))  # Simulate credit API call
        
        if app.status == "validated":
            # Simulate credit score
            app.credit_score = int(uniform(600, 850))
            print(f"[CreditCheck] Credit score: {app.credit_score}")
        else:
            print(f"[CreditCheck] Skipped - application already {app.status}")
        
        await ctx.send_message(app)


class ApprovalExecutor(Executor):
    """Makes final approval decision."""

    @handler
    async def approve(self, app: LoanApplication, ctx: WorkflowContext[Never, str]) -> None:
        print(f"[Approval] Making decision for {app.applicant}")
        await asyncio.sleep(uniform(0.1, 0.2))  # Simulate decision
        
        if app.status != "validated":
            result = f"DENIED: {app.status}"
        elif app.credit_score >= 700:
            result = f"APPROVED: ${app.amount:,.2f} loan at 5.5% APR (Credit: {app.credit_score})"
        elif app.credit_score >= 650:
            result = f"APPROVED: ${app.amount:,.2f} loan at 8.5% APR (Credit: {app.credit_score})"
        else:
            result = f"DENIED: Credit score {app.credit_score} below minimum 650"
        
        print(f"[Approval] Decision: {result}")
        await ctx.yield_output(result)


print("‚úÖ Executors defined: ValidationExecutor, CreditCheckExecutor, ApprovalExecutor")

## Configure Observability & Run Workflow

In [None]:
async def run_loan_processing_workflow():
    """Run loan processing workflow with observability."""
    print("\n--- Loan Processing Workflow with Observability ---\n")
    
    # Configure OpenTelemetry providers (uses console exporter by default)
    # Set OTEL_EXPORTER_OTLP_ENDPOINT in .env to export to a collector
    configure_otel_providers()
    print("‚úÖ Observability configured\n")
    
    # Create executors
    validation = ValidationExecutor(id="loan-validation")
    credit_check = CreditCheckExecutor(id="credit-check")
    approval = ApprovalExecutor(id="loan-approval")
    
    # Build workflow pipeline
    workflow = (
        WorkflowBuilder()
        .add_edge(validation, credit_check)
        .add_edge(credit_check, approval)
        .set_start_executor(validation)
        .build()
    )
    
    # Test loan applications
    applications = [
        LoanApplication("Alice Johnson", 50000, 75000),   # Should approve
        LoanApplication("Bob Smith", 500000, 60000),      # Amount too high
    ]
    
    with get_tracer().start_as_current_span("Loan Processing Batch", kind=SpanKind.CLIENT) as span:
        trace_id = format_trace_id(span.get_span_context().trace_id)
        print(f"üîç Trace ID: {trace_id}\n")
        
        for app in applications:
            print(f"\n{'='*50}")
            print(f"Processing: {app.applicant} - ${app.amount:,.2f} (Income: ${app.income:,.2f})")
            print(f"{'='*50}")
            
            result = None
            async for event in workflow.run_stream(app):
                if isinstance(event, WorkflowOutputEvent):
                    result = event.data
            
            print(f"\nüìã Final Result: {result}")

await run_loan_processing_workflow()

## Key Takeaways

| Feature | Description |
|---------|-------------|
| `configure_otel_providers()` | Set up OpenTelemetry tracing |
| `WorkflowBuilder` | Create sequential/parallel pipelines |
| `@handler` | Mark executor methods for workflow |
| `ctx.send_message()` | Pass data to next executor |
| `ctx.yield_output()` | Return final workflow result |

## Telemetry Collected

| Span Type | Description |
|-----------|-------------|
| `workflow.build` | Workflow construction |
| `workflow.run` | Overall execution |
| `executor.process` | Each executor invocation |
| `message.send` | Data flow between executors |