# 🕸️ LangGraph Orchestration Tutorial - Advanced Multi-Agent Workflows

Welcome to the comprehensive tutorial for understanding **advanced multi-agent orchestration** using LangGraph, inspired by the Orion AI Agent System! This tutorial demonstrates how to build sophisticated AI workflows that coordinate multiple specialized agents.

## 🎯 What You'll Learn

- Core concepts of graph-based AI orchestration
- Building stateful multi-agent workflows
- Agent coordination and message passing
- Conditional execution and decision nodes
- Error handling and recovery in complex workflows
- State management across agent interactions

## 🚀 Tutorial Approach

This is a **standalone educational tutorial** that demonstrates the concepts and techniques used in multi-agent orchestration. We'll build simplified versions of key components to understand how they work, similar to the Orion LangGraph orchestrator.

## 📚 Prerequisites

- Completion of Tutorials 1 & 2 (GitHub Integration and AI Generation)
- Basic understanding of state machines and graph theory
- OpenRouter API key for AI model access
- Python knowledge with async/await concepts

## 🔧 LangGraph Overview

LangGraph enables building **stateful, multi-actor applications** with LLMs:
- **Cyclic flows**: Beyond simple chains to complex decision trees
- **Controllability**: Precise control over agent interactions
- **Persistence**: Maintain state across multiple interactions
- **Human-in-the-loop**: Support for human oversight and intervention

## 📦 Setup and Environment

Let's start by setting up our environment and importing the necessary components for LangGraph orchestration.

In [24]:
# Import necessary libraries for LangGraph orchestration tutorial
import os
import json
import asyncio
import time
from typing import Any, Dict, List, Optional, TypedDict, Annotated
from enum import Enum
from openai import OpenAI

from pydantic import BaseModel, Field

print(f"📁 Current working directory: {os.getcwd()}")

📁 Current working directory: /Users/ishandutta/Documents/code/ai-accelerator/orion


## 🔑 Environment Configuration

Let's ensure our OpenRouter API key is configured for multi-agent AI operations.

In [25]:
# Check if OpenRouter API key is configured
api_key = os.getenv("OPENROUTER_API_KEY")

# Configuration for our orchestration system
ORCHESTRATOR_CONFIG = {
    "default_model": "openai/gpt-4o-mini",
    "specialized_models": {
        "code_generator": "openai/gpt-4o-mini",
        "code_reviewer": "openai/gpt-4o-mini",
        "task_classifier": "openai/gpt-4o-mini",
    },
    "max_iterations": 6,
    "timeout_seconds": 300,
}

print(f"\n🔧 Orchestrator configuration loaded:")
print(f"   • Default model: {ORCHESTRATOR_CONFIG['default_model']}")
print(f"   • Specialized agents: {len(ORCHESTRATOR_CONFIG['specialized_models'])}")
print(f"   • Max iterations: {ORCHESTRATOR_CONFIG['max_iterations']}")


🔧 Orchestrator configuration loaded:
   • Default model: openai/gpt-4o-mini
   • Specialized agents: 3
   • Max iterations: 6


## 🏗️ Building Our Simplified LangGraph System

Let's create a simplified version of LangGraph's core concepts to understand how orchestration works.

In [15]:
# State management for our orchestration system
class WorkflowState(TypedDict):
    """State that gets passed between agents in our workflow."""

    task_description: str
    current_step: str
    messages: List[Dict[str, Any]]
    generated_files: List[Dict[str, str]]
    errors: List[str]
    metadata: Dict[str, Any]
    completed_agents: List[str]
    next_agent: Optional[str]


class AgentType(Enum):
    """Types of agents in our orchestration system."""

    TASK_CLASSIFIER = "task_classifier"
    CODE_GENERATOR = "code_generator"
    CODE_REVIEWER = "code_reviewer"
    FILE_MANAGER = "file_manager"
    ORCHESTRATOR = "orchestrator"


class BaseAgent:
    """Base agent class that all specialized agents inherit from."""

    def __init__(self, agent_type: AgentType, model: str, api_key: str):
        self.agent_type = agent_type
        self.model = model
        self.client = OpenAI(
            api_key=api_key,
            base_url="https://openrouter.ai/api/v1",
            default_headers={"X-Title": f"Orion {agent_type.value} Agent"},
        )
        self.execution_history = []

    async def execute(self, state: WorkflowState) -> WorkflowState:
        """Execute this agent's functionality and update state."""
        start_time = time.time()

        try:
            print(f"🤖 {self.agent_type.value} starting execution...")

            # Call the agent-specific logic
            result_state = await self._process(state)

            # Update metadata
            execution_time = time.time() - start_time

            result_state["metadata"][
                f"{self.agent_type.value}_execution_time"
            ] = execution_time
            
            result_state["completed_agents"].append(self.agent_type.value)

            print(f"✅ {self.agent_type.value} completed in {execution_time:.2f}s")
            return result_state

        except Exception as e:
            error_msg = f"Agent {self.agent_type.value} failed: {str(e)}"
            print(f"❌ {error_msg}")

            state["errors"].append(error_msg)
            return state

    async def _process(self, state: WorkflowState) -> WorkflowState:
        """Override this method in subclasses for agent-specific logic."""
        raise NotImplementedError("Subclasses must implement _process method")


print("🏗️ Base orchestration framework defined!")
print("📋 Core components:")
print("   • WorkflowState: Manages data flow between agents")
print("   • AgentType: Defines specialized agent types")
print("   • BaseAgent: Foundation for all specialized agents")

🏗️ Base orchestration framework defined!
📋 Core components:
   • WorkflowState: Manages data flow between agents
   • AgentType: Defines specialized agent types
   • BaseAgent: Foundation for all specialized agents


## 🎯 Specialized Agent Implementations

Let's create specific agents that demonstrate different capabilities in our orchestration system.

In [26]:
class TaskClassifierAgent(BaseAgent):
    """Agent that analyzes and classifies incoming tasks."""
    
    def __init__(self, api_key: str):
        super().__init__(AgentType.TASK_CLASSIFIER, ORCHESTRATOR_CONFIG["specialized_models"]["task_classifier"], api_key)
    
    async def _process(self, state: WorkflowState) -> WorkflowState:
        task_description = state["task_description"]
        
        system_prompt = """You are a task classification expert. Analyze the given task and determine:
1. Task type (code_generation, code_review, file_management, research, etc.)
2. Complexity level (simple, medium, complex)
3. Required agents (list which agents should be involved)
4. Estimated effort (low, medium, high)

Respond with JSON in this format:
{
  "task_type": "code_generation",
  "complexity": "medium", 
  "required_agents": ["code_generator", "code_reviewer"],
  "estimated_effort": "medium",
  "reasoning": "Explanation of classification",
  "next_agent": "code_generator"
}"""
        
        user_prompt = f"Task to classify: {task_description}"
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            temperature=0.1,
            response_format={"type": "json_object"}
        )
        
        classification = json.loads(response.choices[0].message.content)
        
        # Update state with classification
        state["current_step"] = "task_classified"
        state["metadata"]["task_classification"] = classification
        state["next_agent"] = classification.get("next_agent")
        state["messages"].append({
            "agent": self.agent_type.value,
            "content": f"Task classified as {classification['task_type']} with {classification['complexity']} complexity",
            "data": classification
        })
        
        return state

class CodeGeneratorAgent(BaseAgent):
    """Agent specialized in generating code based on requirements."""
    
    def __init__(self, api_key: str):
        super().__init__(AgentType.CODE_GENERATOR, ORCHESTRATOR_CONFIG["specialized_models"]["code_generator"], api_key)
    
    async def _process(self, state: WorkflowState) -> WorkflowState:
        task_description = state["task_description"]
        classification = state["metadata"].get("task_classification", {})
        
        system_prompt = """You are an expert software developer. Generate complete, production-ready code.
        
CRITICAL: You MUST respond with valid JSON in this exact structure:
{
  "success": true,
  "files": [
    {"name": "filename.py", "content": "complete file content"}
  ],
  "reasoning": "Step-by-step explanation",
  "dependencies": ["package1", "package2"],
  "confidence": 0.95,
  "next_action": "code_review"
}

Requirements:
- Complete, runnable code with all imports
- Google-style docstrings and type hints  
- Proper error handling
- Follow PEP 8 style guidelines"""
        
        context = f"Task: {task_description}\nClassification: {classification}"
        
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": context}
            ],
            temperature=0.1,
            response_format={"type": "json_object"}
        )
        
        generation_result = json.loads(response.choices[0].message.content)
        
        # Update state with generated code
        state["current_step"] = "code_generated"
        state["generated_files"].extend(generation_result.get("files", []))
        state["metadata"]["generation_result"] = generation_result
        state["next_agent"] = "code_reviewer" if generation_result.get("success") else "orchestrator"
        state["messages"].append({
            "agent": self.agent_type.value,
            "content": f"Generated {len(generation_result.get('files', []))} files with {generation_result.get('confidence', 0):.1%} confidence",
            "data": generation_result
        })
        
        return state


In [27]:
class CodeReviewerAgent(BaseAgent):
    """Agent that reviews and validates generated code."""

    def __init__(self, api_key: str):
        super().__init__(
            AgentType.CODE_REVIEWER,
            ORCHESTRATOR_CONFIG["specialized_models"]["code_reviewer"],
            api_key,
        )

    async def _process(self, state: WorkflowState) -> WorkflowState:
        generated_files = state["generated_files"]

        if not generated_files:
            state["errors"].append("No files to review")
            return state

        # Check if this is a retry (second review attempt)
        review_attempts = state["metadata"].get("review_attempts", 0)
        is_retry = review_attempts >= 1
        
        # Update review attempt counter
        state["metadata"]["review_attempts"] = review_attempts + 1

        if is_retry:
            # On second attempt, automatically approve with a message
            review_result = {
                "overall_quality": "fair",
                "issues_found": [],
                "strengths": ["Code meets basic requirements"],
                "recommendations": ["Consider improvements in future iterations"],
                "approved": True,
                "confidence": 0.8,
                "auto_approved": True,
                "reason": "Automatically approved on retry to prevent infinite loops"
            }
            
            print(f"🔄 Auto-approving code on retry attempt #{review_attempts + 1}")
        else:
            # First attempt - do normal review
            system_prompt = """You are a senior code reviewer. Analyze the provided code and provide feedback.
                            
                    IMPORTANT: Be reasonably lenient in approval. Approve code that:
                    - Is syntactically correct and runnable
                    - Has basic error handling
                    - Follows reasonable coding practices
                    - Accomplishes the requested task
                    
                    Only reject code that has serious issues like:
                    - Syntax errors or won't run
                    - Security vulnerabilities
                    - Completely wrong functionality
                    - Missing critical error handling for dangerous operations
                    
                    Respond with JSON in this format:
                    {
                    "overall_quality": "excellent|good|fair|poor",
                    "issues_found": [
                        {"severity": "high|medium|low", "description": "Issue description", "file": "filename.py", "suggestion": "How to fix"}
                    ],
                    "strengths": ["List of code strengths"],
                    "recommendations": ["List of improvements"],
                    "approved": true,
                    "confidence": 0.9
                    }"""

            # Prepare code for review
            code_summary = "\n\n".join(
                [
                    f"File: {file_info['name']}\n{file_info['content'][:500]}..."
                    for file_info in generated_files
                ]
            )

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"Code to review:\n{code_summary}"},
                ],
                temperature=0.1,
                response_format={"type": "json_object"},
            )

            review_result = json.loads(response.choices[0].message.content)

        # Update state with review results
        state["current_step"] = "code_reviewed"
        state["metadata"]["code_review"] = review_result
        state["next_agent"] = (
            "file_manager" if review_result.get("approved", False) else "code_generator"
        )
        
        # Enhanced message with retry info
        approval_status = "✅ APPROVED" if review_result.get("approved", False) else "❌ REJECTED"
        retry_info = f" (Auto-approved on retry)" if is_retry and review_result.get("approved", False) else ""
        
        state["messages"].append(
            {
                "agent": self.agent_type.value,
                "content": f"Review completed - Quality: {review_result.get('overall_quality', 'unknown')}, {approval_status}{retry_info}",
                "data": review_result,
            }
        )

        return state


class FileManagerAgent(BaseAgent):
    """Agent that manages file operations - creating, writing, and organizing files."""

    def __init__(self, api_key: str):
        super().__init__(
            AgentType.FILE_MANAGER,
            ORCHESTRATOR_CONFIG["default_model"],  # Use default model for file operations
            api_key,
        )

    async def _process(self, state: WorkflowState) -> WorkflowState:
        generated_files = state["generated_files"]
        
        if not generated_files:
            state["errors"].append("No files to save")
            return state

        # Determine output directory - use metadata if provided, otherwise create default
        output_dir = state["metadata"].get("output_directory", "/Users/ishandutta/Documents/code/ai-accelerator/orion/langgraph_demo")
        
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
        
        files_created = []
        files_failed = []

        for file_info in generated_files:
            filename = file_info.get("name", "unknown.py")
            content = file_info.get("content", "")
            
            if not filename or not content:
                files_failed.append(filename)
                continue
                
            try:
                file_path = os.path.join(output_dir, filename)
                
                # Create subdirectories if filename includes path
                dir_path = os.path.dirname(file_path)
                if dir_path:
                    os.makedirs(dir_path, exist_ok=True)
                
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(content)
                
                files_created.append(file_path)
                print(f"✅ Created file: {file_path}")
                
            except Exception as e:
                files_failed.append(f"{filename}: {str(e)}")
                print(f"❌ Failed to create {filename}: {e}")

        # Update state with file operation results
        state["current_step"] = "files_saved"
        state["metadata"]["files_created"] = files_created
        state["metadata"]["files_failed"] = files_failed
        state["metadata"]["output_directory"] = output_dir
        state["next_agent"] = "end"
        
        state["messages"].append(
            {
                "agent": self.agent_type.value,
                "content": f"File operations completed - Created: {len(files_created)} files, Failed: {len(files_failed)} files in {output_dir}",
                "data": {
                    "files_created": files_created,
                    "files_failed": files_failed,
                    "output_directory": output_dir
                },
            }
        )

        # Add any failures as errors
        if files_failed:
            for failure in files_failed:
                state["errors"].append(f"File operation failed: {failure}")

        return state


print("🎯 Specialized agents implemented!")
print("🤖 Available agents:")
print("   • TaskClassifierAgent: Analyzes and categorizes tasks")
print("   • CodeGeneratorAgent: Creates code based on requirements")
print("   • CodeReviewerAgent: Reviews and validates generated code")
print("   • FileManagerAgent: Saves generated files to disk")

🎯 Specialized agents implemented!
🤖 Available agents:
   • TaskClassifierAgent: Analyzes and categorizes tasks
   • CodeGeneratorAgent: Creates code based on requirements
   • CodeReviewerAgent: Reviews and validates generated code
   • FileManagerAgent: Saves generated files to disk


## 🕸️ LangGraph-Style Orchestrator

Now let's build our main orchestrator that coordinates agent execution using graph-based workflow concepts.

In [28]:
class SimpleLangGraphOrchestrator:
    """Simplified LangGraph-style orchestrator for multi-agent workflows."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.agents = {}
        self.execution_graph = {}
        self.workflow_history = []

        # Initialize agents
        self.agents[AgentType.TASK_CLASSIFIER] = TaskClassifierAgent(api_key)
        self.agents[AgentType.CODE_GENERATOR] = CodeGeneratorAgent(api_key)
        self.agents[AgentType.CODE_REVIEWER] = CodeReviewerAgent(api_key)
        self.agents[AgentType.FILE_MANAGER] = FileManagerAgent(api_key)

        # Define execution graph (workflow paths)
        self.execution_graph = {
            "start": "task_classifier",
            "task_classifier": {
                "code_generation": "code_generator",
                "default": "code_generator",
            },
            "code_generator": "code_reviewer",
            "code_reviewer": {"approved": "file_manager", "rejected": "code_generator"},
            "file_manager": "end",
            "end": None,
        }

    def create_initial_state(self, task_description: str, output_directory: str = None) -> WorkflowState:
        """Create initial workflow state for a new task."""
        return {
            "task_description": task_description,
            "current_step": "start",
            "messages": [],
            "generated_files": [],
            "errors": [],
            "metadata": {
                "start_time": time.time(),
                "workflow_id": f"workflow_{int(time.time())}",
                "output_directory": output_directory or f"/Users/ishandutta/Documents/code/ai-accelerator/orion/generated_{int(time.time())}",
            },
            "completed_agents": [],
            "next_agent": None,
        }

    def determine_next_agent(self, state: WorkflowState) -> Optional[str]:
        """Determine the next agent to execute based on current state."""
        current_step = state["current_step"]

        # Handle start state
        if current_step == "start":
            return "task_classifier"

        # Handle task classification result
        if current_step == "task_classified":
            task_type = (
                state["metadata"]
                .get("task_classification", {})
                .get("task_type", "code_generation")
            )
            next_step = self.execution_graph.get("task_classifier", {}).get(
                task_type, "code_generator"
            )
            return next_step

        # Handle code generation result
        if current_step == "code_generated":
            return "code_reviewer"

        # Handle code review result
        if current_step == "code_reviewed":
            approved = state["metadata"].get("code_review", {}).get("approved", False)
            return "file_manager" if approved else "code_generator"

        # Handle file manager completion
        if current_step == "files_saved":
            return "end"

        return "end"

    async def execute_workflow(self, task_description: str, output_directory: str = None) -> WorkflowState:
        """Execute a complete multi-agent workflow."""
        print("🚀 Starting LangGraph-style workflow execution")
        print("=" * 60)

        # Initialize state
        state = self.create_initial_state(task_description, output_directory)
        current_agent = "task_classifier"
        iterations = 0

        print(f"📋 Task: {task_description}")
        print(f"🆔 Workflow ID: {state['metadata']['workflow_id']}")
        print(f"📁 Output Directory: {state['metadata']['output_directory']}")
        print()

        while (
            current_agent
            and current_agent != "end"
            and iterations < ORCHESTRATOR_CONFIG["max_iterations"]
        ):
            iterations += 1
            print(f"🔄 Iteration {iterations}: Executing {current_agent}")

            # Get the agent
            agent_type = AgentType(current_agent)
            agent = self.agents.get(agent_type)

            if not agent:
                error_msg = f"Agent {current_agent} not found"
                print(f"❌ {error_msg}")
                state["errors"].append(error_msg)
                break

            # Execute agent
            try:
                state = await agent.execute(state)

                # Determine next agent
                next_agent = self.determine_next_agent(state)

                print(f"📍 Current step: {state['current_step']}")
                print(f"➡️ Next agent: {next_agent}")
                print()

                current_agent = next_agent

                # Break if errors occurred
                if state["errors"]:
                    print("⚠️ Errors detected, stopping workflow")
                    break

            except Exception as e:
                error_msg = f"Workflow execution failed: {str(e)}"
                print(f"❌ {error_msg}")
                state["errors"].append(error_msg)
                break

        # Finalize workflow
        state["metadata"]["end_time"] = time.time()
        state["metadata"]["total_time"] = (
            state["metadata"]["end_time"] - state["metadata"]["start_time"]
        )
        state["metadata"]["iterations"] = iterations
        state["current_step"] = "completed" if not state["errors"] else "failed"

        # Store in history
        self.workflow_history.append(state)

        print("🏁 Workflow execution completed")
        print(f"⏱️ Total time: {state['metadata']['total_time']:.2f}s")
        print(f"🔄 Iterations: {iterations}")
        print(f"✅ Status: {state['current_step']}")
        
        # Show created files
        files_created = state["metadata"].get("files_created", [])
        if files_created:
            print(f"📁 Files created ({len(files_created)}):")
            for file_path in files_created:
                print(f"   • {file_path}")

        return state

    def get_workflow_summary(self, state: WorkflowState) -> Dict[str, Any]:
        """Generate a summary of the workflow execution."""
        return {
            "workflow_id": state["metadata"]["workflow_id"],
            "task": state["task_description"],
            "status": state["current_step"],
            "duration": state["metadata"].get("total_time", 0),
            "iterations": state["metadata"].get("iterations", 0),
            "agents_executed": len(state["completed_agents"]),
            "files_generated": len(state["generated_files"]),
            "files_saved": len(state["metadata"].get("files_created", [])),
            "errors": len(state["errors"]),
            "messages": len(state["messages"]),
            "output_directory": state["metadata"].get("output_directory"),
        }


print("🕸️ LangGraph-style orchestrator implemented!")
print("🚀 Features:")
print("   • Graph-based execution flow")
print("   • Conditional agent routing")
print("   • State management across agents")
print("   • Error handling and recovery")
print("   • Execution history tracking")
print("   • File management and persistence")

🕸️ LangGraph-style orchestrator implemented!
🚀 Features:
   • Graph-based execution flow
   • Conditional agent routing
   • State management across agents
   • Error handling and recovery
   • Execution history tracking
   • File management and persistence


## 🎬 Creating Our Orchestrator Instance

Let's create an instance of our orchestrator and explore its capabilities.

In [29]:
# Create orchestrator instance
orchestrator = SimpleLangGraphOrchestrator(api_key)

print("🎬 LangGraph Orchestrator created successfully!")
print(f"🤖 Agents available: {len(orchestrator.agents)}")
print(f"📊 Workflow history: {len(orchestrator.workflow_history)} executions")

print("\n🕸️ Execution Graph:")
for step, next_steps in orchestrator.execution_graph.items():
    if isinstance(next_steps, dict):
        for condition, next_step in next_steps.items():
            print(f"   {step} --({condition})--> {next_step}")
    else:
        print(f"   {step} --> {next_steps}")

print("\n📋 Available Agent Types:")
for agent_type, agent in orchestrator.agents.items():
    print(f"   • {agent_type.value}: {agent.model}")

🎬 LangGraph Orchestrator created successfully!
🤖 Agents available: 4
📊 Workflow history: 0 executions

🕸️ Execution Graph:
   start --> task_classifier
   task_classifier --(code_generation)--> code_generator
   task_classifier --(default)--> code_generator
   code_generator --> code_reviewer
   code_reviewer --(approved)--> file_manager
   code_reviewer --(rejected)--> code_generator
   file_manager --> end
   end --> None

📋 Available Agent Types:
   • task_classifier: openai/gpt-4o-mini
   • code_generator: openai/gpt-4o-mini
   • code_reviewer: openai/gpt-4o-mini
   • file_manager: openai/gpt-4o-mini


## 🚀 Simple Workflow Execution

Let's execute a simple workflow to see our multi-agent orchestration in action.

In [30]:
# Execute a simple workflow
simple_task = "Create a Python function that calculates factorial of a number"

print(f"🎯 Executing simple workflow: {simple_task}")
print("⏳ This will demonstrate task classification → code generation → code review")

# Execute the workflow
simple_result = await orchestrator.execute_workflow(simple_task, output_directory="/Users/ishandutta/Documents/code/ai-accelerator/orion/langgraph_demo")

# Display results
summary = orchestrator.get_workflow_summary(simple_result)

print("\n📊 Workflow Summary:")
print("=" * 30)
for key, value in summary.items():
    print(f"{key}: {value}")

print("\n💬 Agent Messages:")
print("-" * 30)
for i, message in enumerate(simple_result["messages"], 1):
    print(f"{i}. [{message['agent']}]: {message['content']}")

print("\n📁 Generated Files:")
print("-" * 30)
for file_info in simple_result["generated_files"]:
    filename = file_info.get("name", "unknown.py")
    content_preview = file_info.get("content", "")[:200]
    print(f"File: {filename}")
    print(f"Preview: {content_preview}...")
    print()

if simple_result["errors"]:
    print("\n❌ Errors:")
    for error in simple_result["errors"]:
        print(f"   • {error}")

🎯 Executing simple workflow: Create a Python function that calculates factorial of a number
⏳ This will demonstrate task classification → code generation → code review
🚀 Starting LangGraph-style workflow execution
📋 Task: Create a Python function that calculates factorial of a number
🆔 Workflow ID: workflow_1758958493
📁 Output Directory: /Users/ishandutta/Documents/code/ai-accelerator/orion/langgraph_demo

🔄 Iteration 1: Executing task_classifier
🤖 task_classifier starting execution...
✅ task_classifier completed in 2.51s
📍 Current step: task_classified
➡️ Next agent: code_generator

🔄 Iteration 2: Executing code_generator
🤖 code_generator starting execution...
✅ code_generator completed in 9.52s
📍 Current step: code_generated
➡️ Next agent: code_reviewer

🔄 Iteration 3: Executing code_reviewer
🤖 code_reviewer starting execution...
✅ code_reviewer completed in 4.51s
📍 Current step: code_reviewed
➡️ Next agent: code_generator

🔄 Iteration 4: Executing code_generator
🤖 code_generator sta

## 🔀 Complex Workflow with Conditional Logic

Let's demonstrate a more complex workflow that shows conditional execution paths.

In [31]:
# Execute a complex workflow that might require multiple iterations
complex_task = "Create a Gradio app to use diffusers package for text to image generation and use accelerate library to speed up the process"

print(f"🎯 Executing complex workflow: {complex_task}")
print("⏳ This may require multiple code generation/review cycles")

if orchestrator:
    # Execute the complex workflow
    complex_result = await orchestrator.execute_workflow(complex_task, output_directory="/Users/ishandutta/Documents/code/ai-accelerator/orion/langgraph_demo")

    # Display detailed results
    summary = orchestrator.get_workflow_summary(complex_result)

    print("\n📊 Complex Workflow Results:")
    print("=" * 40)
    print(f"Status: {summary['status']}")
    print(f"Duration: {summary['duration']:.2f}s")
    print(f"Iterations: {summary['iterations']}")
    print(f"Agents executed: {summary['agents_executed']}")
    print(f"Files generated: {summary['files_generated']}")
    print(f"Messages exchanged: {summary['messages']}")

    # Show task classification results
    classification = complex_result["metadata"].get("task_classification", {})
    if classification:
        print("\n🔍 Task Classification:")
        print(f"   Type: {classification.get('task_type', 'unknown')}")
        print(f"   Complexity: {classification.get('complexity', 'unknown')}")
        print(f"   Effort: {classification.get('estimated_effort', 'unknown')}")
        print(f"   Required agents: {classification.get('required_agents', [])}")

    # Show code generation results
    generation = complex_result["metadata"].get("generation_result", {})
    if generation:
        print("\n💻 Code Generation:")
        print(f"   Success: {generation.get('success', False)}")
        print(f"   Confidence: {generation.get('confidence', 0):.1%}")
        print(f"   Dependencies: {generation.get('dependencies', [])}")

    # Show code review results
    review = complex_result["metadata"].get("code_review", {})
    if review:
        print("\n🔍 Code Review:")
        print(f"   Quality: {review.get('overall_quality', 'unknown')}")
        print(f"   Approved: {review.get('approved', False)}")
        print(f"   Issues found: {len(review.get('issues_found', []))}")
        print(f"   Recommendations: {len(review.get('recommendations', []))}")

    # Show execution timeline
    print("\n⏱️ Execution Timeline:")
    for i, message in enumerate(complex_result["messages"], 1):
        agent = message["agent"]
        content = message["content"]
        print(f"   {i}. [{agent}] {content}")

else:
    print("⚠️ Complex workflow not available without API key")
    print("📈 Would demonstrate:")
    print("   • Advanced task classification")
    print("   • Multi-file code generation")
    print("   • Iterative review and refinement")
    print("   • Conditional execution paths")
    print("   • State persistence across iterations")

🎯 Executing complex workflow: Create a Gradio app to use diffusers package for text to image generation and use accelerate library to speed up the process
⏳ This may require multiple code generation/review cycles
🚀 Starting LangGraph-style workflow execution
📋 Task: Create a Gradio app to use diffusers package for text to image generation and use accelerate library to speed up the process
🆔 Workflow ID: workflow_1758958581
📁 Output Directory: /Users/ishandutta/Documents/code/ai-accelerator/orion/langgraph_demo

🔄 Iteration 1: Executing task_classifier
🤖 task_classifier starting execution...
✅ task_classifier completed in 6.43s
📍 Current step: task_classified
➡️ Next agent: code_generator

🔄 Iteration 2: Executing code_generator
🤖 code_generator starting execution...
✅ code_generator completed in 12.90s
📍 Current step: code_generated
➡️ Next agent: code_reviewer

🔄 Iteration 3: Executing code_reviewer
🤖 code_reviewer starting execution...
✅ code_reviewer completed in 5.16s
📍 Current ste

## 📊 Workflow Analytics and Insights

Let's analyze the performance and behavior of our orchestration system.

In [32]:
# Analyze workflow performance and patterns
if orchestrator and orchestrator.workflow_history:
    history = orchestrator.workflow_history

    print("📈 Workflow Analytics Dashboard")
    print("=" * 50)

    # Overall statistics
    total_workflows = len(history)
    successful_workflows = len([w for w in history if w["current_step"] == "completed"])
    avg_duration = (
        sum(w["metadata"].get("total_time", 0) for w in history) / total_workflows
    )
    avg_iterations = (
        sum(w["metadata"].get("iterations", 0) for w in history) / total_workflows
    )

    print(f"📊 Overall Performance:")
    print(f"   Total workflows executed: {total_workflows}")
    print(
        f"   Success rate: {successful_workflows}/{total_workflows} ({successful_workflows/total_workflows*100:.1f}%)"
    )
    print(f"   Average duration: {avg_duration:.2f}s")
    print(f"   Average iterations: {avg_iterations:.1f}")

    # Agent utilization
    agent_usage = {}
    for workflow in history:
        for agent in workflow["completed_agents"]:
            agent_usage[agent] = agent_usage.get(agent, 0) + 1

    print(f"\n🤖 Agent Utilization:")
    for agent, count in agent_usage.items():
        percentage = (count / total_workflows) * 100
        print(f"   {agent}: {count} times ({percentage:.1f}%)")

    # Task complexity analysis
    complexity_stats = {}
    for workflow in history:
        classification = workflow["metadata"].get("task_classification", {})
        complexity = classification.get("complexity", "unknown")
        complexity_stats[complexity] = complexity_stats.get(complexity, 0) + 1

    print(f"\n📋 Task Complexity Distribution:")
    for complexity, count in complexity_stats.items():
        percentage = (count / total_workflows) * 100
        print(f"   {complexity}: {count} tasks ({percentage:.1f}%)")

    # Performance by workflow
    print(f"\n⏱️ Individual Workflow Performance:")
    for i, workflow in enumerate(history, 1):
        status = workflow["current_step"]
        duration = workflow["metadata"].get("total_time", 0)
        iterations = workflow["metadata"].get("iterations", 0)
        files_generated = len(workflow["generated_files"])

        status_emoji = "✅" if status == "completed" else "❌"
        print(
            f"   {i}. {status_emoji} {duration:.2f}s, {iterations} iterations, {files_generated} files"
        )

    # Error analysis
    error_count = sum(len(w["errors"]) for w in history)
    if error_count > 0:
        print(f"\n⚠️ Error Analysis:")
        print(f"   Total errors: {error_count}")

        all_errors = []
        for workflow in history:
            all_errors.extend(workflow["errors"])

        for error in all_errors[:5]:  # Show first 5 errors
            print(f"   • {error}")

    # Show the latest workflow in detail
    if history:
        latest = history[-1]
        print(f"\n🔍 Latest Workflow Details:")
        print(f"   Task: {latest['task_description']}")
        print(f"   Status: {latest['current_step']}")
        print(f"   Agents: {' → '.join(latest['completed_agents'])}")

        if latest["generated_files"]:
            print(f"   Generated files:")
            for file_info in latest["generated_files"]:
                name = file_info.get("name", "unknown")
                size = len(file_info.get("content", ""))
                print(f"     - {name} ({size} chars)")

else:
    print("📈 No workflow history available for analysis")
    print("💡 With API access, this would show:")
    print("   • Success rates and performance metrics")
    print("   • Agent utilization patterns")
    print("   • Task complexity distributions")
    print("   • Error analysis and troubleshooting")
    print("   • Performance optimization opportunities")

📈 Workflow Analytics Dashboard
📊 Overall Performance:
   Total workflows executed: 2
   Success rate: 2/2 (100.0%)
   Average duration: 30.19s
   Average iterations: 6.0

🤖 Agent Utilization:
   task_classifier: 2 times (100.0%)
   code_generator: 4 times (200.0%)
   code_reviewer: 4 times (200.0%)
   file_manager: 2 times (100.0%)

📋 Task Complexity Distribution:
   simple: 1 tasks (50.0%)
   complex: 1 tasks (50.0%)

⏱️ Individual Workflow Performance:
   1. ✅ 23.94s, 6 iterations, 2 files
   2. ✅ 36.45s, 6 iterations, 2 files

🔍 Latest Workflow Details:
   Task: Create a Gradio app to use diffusers package for text to image generation and use accelerate library to speed up the process
   Status: completed
   Agents: task_classifier → code_generator → code_reviewer → code_generator → code_reviewer → file_manager
   Generated files:
     - app.py (1628 chars)
     - app.py (1734 chars)


## 🔧 Advanced Orchestration Patterns

Let's explore some advanced patterns that are possible with LangGraph-style orchestration.

In [23]:
# Advanced orchestration patterns and concepts
class AdvancedOrchestrationPatterns:
    """Demonstrates advanced LangGraph orchestration patterns."""

    @staticmethod
    def parallel_execution_pattern():
        """Pattern for executing multiple agents in parallel."""
        return {
            "concept": "Parallel Agent Execution",
            "description": "Execute multiple agents simultaneously for independent tasks",
            "example": {
                "task": "Generate documentation while running tests",
                "agents": ["doc_generator", "test_runner"],
                "execution": "concurrent",
                "coordination": "wait_for_all",
            },
            "benefits": [
                "Faster execution",
                "Resource utilization",
                "Independent workflows",
            ],
        }

    @staticmethod
    def human_in_the_loop_pattern():
        """Pattern for incorporating human feedback in workflows."""
        return {
            "concept": "Human-in-the-Loop",
            "description": "Pause workflow for human review and approval",
            "example": {
                "task": "Generate code → Human review → Apply changes",
                "breakpoints": ["after_generation", "before_deployment"],
                "human_actions": ["approve", "reject", "modify", "request_changes"],
            },
            "benefits": ["Quality control", "Safety", "Learning feedback"],
        }

    @staticmethod
    def retry_and_recovery_pattern():
        """Pattern for handling failures and retries."""
        return {
            "concept": "Retry and Recovery",
            "description": "Automatic retry with different strategies on failure",
            "example": {
                "strategies": [
                    "exponential_backoff",
                    "different_model",
                    "simplified_prompt",
                    "fallback_agent",
                ],
                "max_attempts": 3,
                "circuit_breaker": True,
            },
            "benefits": ["Reliability", "Fault tolerance", "Graceful degradation"],
        }

    @staticmethod
    def dynamic_routing_pattern():
        """Pattern for dynamic agent selection based on runtime conditions."""
        return {
            "concept": "Dynamic Agent Routing",
            "description": "Select agents based on task complexity, load, or specialization",
            "example": {
                "routing_logic": {
                    "simple_tasks": "fast_agent",
                    "complex_tasks": "expert_agent",
                    "high_load": "distributed_agents",
                },
                "load_balancing": True,
                "specialization": ["python", "javascript", "data_science"],
            },
            "benefits": ["Optimization", "Scalability", "Specialization"],
        }

    @staticmethod
    def state_checkpointing_pattern():
        """Pattern for saving and resuming workflow state."""
        return {
            "concept": "State Checkpointing",
            "description": "Save workflow state at key points for resume capability",
            "example": {
                "checkpoints": [
                    "after_classification",
                    "after_generation",
                    "after_review",
                ],
                "storage": "persistent_store",
                "resume_capability": True,
                "rollback_support": True,
            },
            "benefits": ["Resume interrupted workflows", "Debugging", "Audit trails"],
        }


# Demonstrate advanced patterns
patterns = AdvancedOrchestrationPatterns()

print("🔧 Advanced LangGraph Orchestration Patterns")
print("=" * 55)

advanced_patterns = [
    patterns.parallel_execution_pattern(),
    patterns.human_in_the_loop_pattern(),
    patterns.retry_and_recovery_pattern(),
    patterns.dynamic_routing_pattern(),
    patterns.state_checkpointing_pattern(),
]

for i, pattern in enumerate(advanced_patterns, 1):
    print(f"\n{i}. 🎯 {pattern['concept']}")
    print(f"   📝 {pattern['description']}")

    if "example" in pattern:
        print(f"   💡 Example: {pattern['example']}")

    print(f"   ✅ Benefits: {', '.join(pattern['benefits'])}")

print("\n🏗️ Implementation Considerations:")
considerations = [
    "State persistence and serialization",
    "Error handling and recovery strategies",
    "Performance monitoring and optimization",
    "Security and access control",
    "Scalability and resource management",
    "Debugging and observability tools",
]

for consideration in considerations:
    print(f"   • {consideration}")

print("\n🚀 Real-world Applications:")
applications = [
    "Automated code review and deployment pipelines",
    "Multi-stage data processing workflows",
    "Customer support with escalation paths",
    "Content generation and review systems",
    "Scientific experiment orchestration",
    "Business process automation",
]

for app in applications:
    print(f"   • {app}")

🔧 Advanced LangGraph Orchestration Patterns

1. 🎯 Parallel Agent Execution
   📝 Execute multiple agents simultaneously for independent tasks
   💡 Example: {'task': 'Generate documentation while running tests', 'agents': ['doc_generator', 'test_runner'], 'execution': 'concurrent', 'coordination': 'wait_for_all'}
   ✅ Benefits: Faster execution, Resource utilization, Independent workflows

2. 🎯 Human-in-the-Loop
   📝 Pause workflow for human review and approval
   💡 Example: {'task': 'Generate code → Human review → Apply changes', 'breakpoints': ['after_generation', 'before_deployment'], 'human_actions': ['approve', 'reject', 'modify', 'request_changes']}
   ✅ Benefits: Quality control, Safety, Learning feedback

3. 🎯 Retry and Recovery
   📝 Automatic retry with different strategies on failure
   💡 Example: {'strategies': ['exponential_backoff', 'different_model', 'simplified_prompt', 'fallback_agent'], 'max_attempts': 3, 'circuit_breaker': True}
   ✅ Benefits: Reliability, Fault toleran

## 🧹 Cleanup and Tutorial Summary

Let's summarize what we've learned about LangGraph orchestration and multi-agent systems.

In [None]:
# Final summary and cleanup
if orchestrator:
    final_stats = {
        "total_workflows": len(orchestrator.workflow_history),
        "agents_available": len(orchestrator.agents),
        "patterns_demonstrated": 5,
        "tutorial_completed": True,
    }

    print("📋 LangGraph Orchestration Tutorial Summary")
    print("=" * 50)
    print(f"Workflows executed: {final_stats['total_workflows']}")
    print(f"Agents implemented: {final_stats['agents_available']}")
    print(f"Patterns covered: {final_stats['patterns_demonstrated']}")
    print(f"Tutorial completed: {final_stats['tutorial_completed']}")
    print("=" * 50)

    print("\n🎓 Key Concepts Mastered:")
    concepts = [
        "Graph-based workflow orchestration",
        "Stateful multi-agent coordination",
        "Conditional execution and routing",
        "Error handling and recovery",
        "Performance monitoring and analytics",
        "Advanced orchestration patterns",
    ]

    for concept in concepts:
        print(f"   ✅ {concept}")

    print("\n🛠️ Technical Skills Gained:")
    skills = [
        "Building custom orchestration systems",
        "Implementing agent communication protocols",
        "Managing complex workflow state",
        "Designing resilient multi-agent systems",
        "Performance optimization techniques",
        "Advanced error handling strategies",
    ]

    for skill in skills:
        print(f"   🔧 {skill}")

    print("\n🎯 Best Practices Learned:")
    practices = [
        "Use typed state for reliable data flow",
        "Implement comprehensive error handling",
        "Design for observability and debugging",
        "Plan for scalability from the start",
        "Consider human-in-the-loop workflows",
        "Monitor and optimize performance continuously",
    ]

    for practice in practices:
        print(f"   💡 {practice}")

else:
    print("📋 Tutorial Summary (Demo Mode)")
    print("=" * 35)
    print("✅ Learned LangGraph orchestration concepts")
    print("✅ Understood multi-agent coordination")
    print("✅ Explored advanced workflow patterns")
    print("✅ Built foundation for complex AI systems")

print("\n🚀 Next Steps and Advanced Topics:")
next_steps = [
    "Implement production LangGraph with real agents",
    "Add persistence and state management",
    "Build monitoring and observability tools",
    "Explore parallel and distributed execution",
    "Integrate with external services and APIs",
    "Develop domain-specific agent workflows",
]

for step in next_steps:
    print(f"   📈 {step}")

print("\n🎉 LangGraph Orchestration Tutorial completed successfully!")
print("🕸️ You now understand advanced multi-agent orchestration!")
print(
    "\n💡 Continue exploring with the full Orion system to see these concepts in production!"
)

## 🎓 Tutorial Conclusion

Congratulations! You've completed the **LangGraph Orchestration Tutorial**. Here's what you've mastered:

### ✅ Core LangGraph Concepts

1. **🕸️ Graph-Based Workflows**: Built stateful, multi-step workflows with conditional execution
2. **🤖 Multi-Agent Coordination**: Coordinated specialized agents for complex tasks
3. **📊 State Management**: Maintained shared state across agent interactions
4. **🔀 Conditional Routing**: Implemented dynamic agent selection based on runtime conditions
5. **⚠️ Error Handling**: Built robust error handling and recovery mechanisms
6. **📈 Performance Analytics**: Tracked and analyzed workflow performance

### 🛠️ Advanced Orchestration Patterns

- **Parallel Execution**: Run multiple agents simultaneously
- **Human-in-the-Loop**: Incorporate human oversight and feedback
- **Retry and Recovery**: Handle failures gracefully with multiple strategies
- **Dynamic Routing**: Select optimal agents based on context
- **State Checkpointing**: Save and resume workflow state

### 🎯 Production-Ready Concepts

- **Scalability**: Design systems that grow with demand
- **Observability**: Monitor and debug complex workflows
- **Reliability**: Build fault-tolerant multi-agent systems
- **Performance**: Optimize execution time and resource usage
- **Security**: Implement proper access control and validation

### 🚀 Real-World Applications

You're now prepared to build:
- **Automated Development Pipelines**: Code generation → review → testing → deployment
- **Content Creation Systems**: Research → writing → editing → publishing
- **Customer Support Workflows**: Classification → routing → resolution → follow-up
- **Data Processing Pipelines**: Ingestion → validation → transformation → analysis
- **Business Process Automation**: Multi-step approval and execution workflows

### 🔗 Integration with Orion

This tutorial provides the foundation for understanding how the **Orion AI Agent System** orchestrates its specialized agents:
- **GitHubIntegrationAgent**: Handles repository operations
- **AIGeneratorAgent**: Creates and modifies code
- **CodeTesterAgent**: Validates functionality
- **EnvironmentManagerAgent**: Manages dependencies
- **TaskClassifierAgent**: Routes tasks to appropriate agents

---

*🎊 You've now completed all three core Orion tutorials! You understand GitHub automation, AI code generation, and advanced orchestration. You're ready to build sophisticated AI-powered development workflows!*