# Getting Started with Orchestrator Framework

This tutorial introduces the core concepts of the Orchestrator framework for building AI/LLM workflows.

## What is Orchestrator?

Orchestrator is a Python framework for creating, managing, and executing multi-step AI workflows. It provides:

- **Task Management**: Define and organize individual work units
- **Pipeline Orchestration**: Create dependency graphs between tasks
- **Model Integration**: Work with multiple AI models (OpenAI, Anthropic, local models)
- **State Management**: Checkpoint and resume workflows
- **YAML Configuration**: Define workflows declaratively

## Installation

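Make sure the `orchestrator` package is importable. If you have not installed it, the first cell adds the repository's `src` directory to the import path; adjust `'../src'` if this notebook does not sit one level below the repository root.

In [None]:
# If the package is not installed, make the repository's src/ directory importable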
import sys
sys.path.insert(0, '../src')

# Import core components
import os
from orchestrator.core.task import Task
from orchestrator.core.pipeline import Pipeline
from orchestrator.models.openai_model import OpenAIModel
from orchestrator.utils.api_keys import load_api_keys
from orchestrator.orchestrator import Orchestrator
from orchestrator.state.state_manager import InMemoryStateManager

print("✅ Orchestrator imported successfully!")

## Core Concepts

### 1. Tasks

Tasks are the fundamental building blocks of workflows. Each task represents a single operation.

In [None]:
# Create a simple task
task = Task(
    id="hello_world",
    name="Hello World Task",
    action="generate",
    parameters={
        "prompt": "Say hello to the world",
        "max_tokens": 50
    }
)

print(f"Task ID: {task.id}")
print(f"Task Name: {task.name}")
print(f"Task Status: {task.status}")
print(f"Task Parameters: {task.parameters}")
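
To picture what a task carries, here is a hypothetical stand-in built on a dataclass. `TaskSketch` and `TaskStatus` are our names, not Orchestrator's classes. The fields mirror the constructor arguments above plus the `dependencies`, `status`, and `result` attributes used later in this tutorial; the assumption that new tasks start as pending is ours:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set


class TaskStatus(Enum):
    # Hypothetical lifecycle states, matching the progress keys used later
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class TaskSketch:
    """Illustrative task record; not the library's Task implementation."""
    id: str
    name: str
    action: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None

    def is_ready(self, completed_ids: Set[str]) -> bool:
        # A pending task may run once every dependency has completed
        return self.status is TaskStatus.PENDING and all(
            dep in completed_ids for dep in self.dependencies
        )


hello = TaskSketch(
    id="hello_world",
    name="Hello World Task",
    action="generate",
    parameters={"prompt": "Say hello to the world", "max_tokens": 50},
)
print(hello.status)           # TaskStatus.PENDING
print(hello.is_ready(set()))  # True (no dependencies)
```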

### 2. Models

Models represent the AI/LLM backends that execute tasks. The cell below creates an OpenAI model, so it needs a valid key in the `OPENAI_API_KEY` environment variable:

In [None]:
# Load API keys from environment
load_api_keys()

# Create a real OpenAI model
model = OpenAIModel(
    name="gpt-3.5-turbo",
    api_key=os.environ.get("OPENAI_API_KEY"),  # Loaded from environment
)

print(f"Model: {model.name}")
print("Provider: OpenAI")
print("Ready for real AI interactions!")

# Note: Make sure you have set your OPENAI_API_KEY environment variable
# You can also use AnthropicModel with ANTHROPIC_API_KEY if preferred

### 3. Pipelines

Pipelines organize tasks and define their execution order through dependencies:

In [None]:
# Create a simple pipeline
pipeline = Pipeline(
    id="demo_pipeline",
    name="Demo Pipeline",
    description="A simple demonstration pipeline"
)

# Add our task to the pipeline
pipeline.add_task(task)

print(f"Pipeline: {pipeline.name}")
print(f"Number of tasks: {len(pipeline)}")
print(f"Tasks: {list(pipeline)}")
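
Since `len(pipeline)` and `list(pipeline)` both work, a pipeline behaves like a container of tasks. The sketch below shows one way such a container could be written. `PipelineSketch` and `SimpleTask` are hypothetical, and we assume that iterating yields task IDs in insertion order and that duplicate IDs are rejected:

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List


@dataclass
class SimpleTask:
    # Minimal hypothetical task: only what the container needs
    id: str
    dependencies: List[str] = field(default_factory=list)


class PipelineSketch:
    """Hypothetical task container supporting len() and iteration."""

    def __init__(self, id: str, name: str) -> None:
        self.id = id
        self.name = name
        self._tasks: Dict[str, SimpleTask] = {}  # dicts preserve insertion order

    def add_task(self, task: SimpleTask) -> None:
        if task.id in self._tasks:
            raise ValueError(f"Duplicate task id: {task.id}")
        self._tasks[task.id] = task

    def get_task(self, task_id: str) -> SimpleTask:
        return self._tasks[task_id]

    def __len__(self) -> int:
        return len(self._tasks)

    def __iter__(self) -> Iterator[str]:
        # Iterating yields task IDs in the order they were added
        return iter(self._tasks)


demo = PipelineSketch(id="demo_pipeline", name="Demo Pipeline")
demo.add_task(SimpleTask("hello_world"))
print(len(demo), list(demo))  # 1 ['hello_world']
```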

### 4. Orchestrator

The `Orchestrator` runs pipelines using the models registered with it:

In [None]:
# Create an orchestrator with in-memory state management
state_manager = InMemoryStateManager()
orchestrator = Orchestrator(state_manager=state_manager)

# Register our model
orchestrator.register_model(model)

print(f"Orchestrator created with {len(orchestrator.models)} model(s)")
print(f"Available models: {list(orchestrator.models.keys())}")
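
`orchestrator.models` behaves like a dictionary keyed by model name. Here is a minimal, hypothetical registry along those lines; `ModelRegistry`, `FakeModel`, and the `get_model` helper are ours, and replacing a model registered under an existing name is a choice of this sketch, not documented library behavior:

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class FakeModel:
    # Hypothetical stand-in for a model object; only `name` matters here
    name: str


class ModelRegistry:
    """Name-keyed model registry, a guess at what register_model maintains."""

    def __init__(self) -> None:
        self.models: Dict[str, FakeModel] = {}

    def register_model(self, model: FakeModel) -> None:
        # In this sketch, re-registering a name replaces the earlier model
        self.models[model.name] = model

    def get_model(self, name: str) -> FakeModel:
        try:
            return self.models[name]
        except KeyError:
            available = ", ".join(sorted(self.models)) or "none"
            raise KeyError(f"Unknown model {name!r}; registered: {available}") from None


registry = ModelRegistry()
registry.register_model(FakeModel("gpt-3.5-turbo"))
print(list(registry.models.keys()))  # ['gpt-3.5-turbo']
```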

## Running Your First Pipeline

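Now let's execute our pipeline. `execute_pipeline` is asynchronous, so the cell awaits it; Jupyter allows `await` at the top level of a cell: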

In [None]:

async def run_pipeline():
    """Execute the demo pipeline."""
    result = await orchestrator.execute_pipeline(pipeline)
    return result

# Run the pipeline
result = await run_pipeline()

print(f"Pipeline execution result: {result}")
print(f"Task status: {task.status}")
print(f"Task result: {task.result}")
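
The bare `await` works because Jupyter already runs an event loop. A plain Python script has no running loop, so you would start one with `asyncio.run`. In this standalone sketch, the hypothetical `fake_execute_pipeline` coroutine stands in for `orchestrator.execute_pipeline`:

```python
import asyncio


async def fake_execute_pipeline(pipeline_id: str) -> dict:
    # Hypothetical stand-in for orchestrator.execute_pipeline(...)
    await asyncio.sleep(0.01)  # simulate waiting on a model API
    return {"pipeline_id": pipeline_id, "status": "completed"}


async def run_demo() -> dict:
    return await fake_execute_pipeline("demo_pipeline")


if __name__ == "__main__":
    # asyncio.run creates an event loop, runs the coroutine to completion,
    # then closes the loop
    demo_result = asyncio.run(run_demo())
    print(demo_result)
```

Don't call `asyncio.run` inside the notebook itself: it raises `RuntimeError` when an event loop is already running.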

## Building a Multi-Task Pipeline

Let's create a more complex pipeline with multiple tasks and dependencies:

In [None]:
# Create a complex pipeline
complex_pipeline = Pipeline(
    id="analysis_pipeline",
    name="Text Analysis Pipeline",
    description="Analyze text through multiple steps"
)

# Task 1: Generate content
generate_task = Task(
    id="generate_content",
    name="Generate Content",
    action="generate",
    parameters={
        "prompt": "Write a short story about a robot learning to paint",
        "max_tokens": 200
    }
)

# Task 2: Analyze sentiment (depends on task 1)
sentiment_task = Task(
    id="analyze_sentiment",
    name="Analyze Sentiment",
    action="analyze",
    parameters={
        "prompt": "Analyze the sentiment of this text: {generate_content.result}",
        "analysis_type": "sentiment"
    },
    dependencies=["generate_content"]
)

# Task 3: Extract themes (depends on task 1)
theme_task = Task(
    id="extract_themes",
    name="Extract Themes",
    action="analyze",
    parameters={
        "prompt": "Extract the main themes from this text: {generate_content.result}",
        "analysis_type": "themes"
    },
    dependencies=["generate_content"]
)

# Task 4: Summarize analysis (depends on tasks 2 and 3)
summary_task = Task(
    id="summarize_analysis",
    name="Summarize Analysis",
    action="generate",
    parameters={
        "prompt": "Summarize this analysis: Sentiment: {analyze_sentiment.result}, Themes: {extract_themes.result}",
        "max_tokens": 100
    },
    dependencies=["analyze_sentiment", "extract_themes"]
)

# Add tasks to pipeline
# Use a distinct loop variable so the earlier `task` (hello_world) is not overwritten
for new_task in [generate_task, sentiment_task, theme_task, summary_task]:
    complex_pipeline.add_task(new_task)

print(f"Complex pipeline created with {len(complex_pipeline)} tasks")
print(f"Tasks: {list(complex_pipeline)}")
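
The `{generate_content.result}` placeholders tell the framework to inject an upstream task's output into a prompt. How Orchestrator resolves them isn't shown here; the following is a minimal regex-based sketch of the idea, where `render_prompt` is a hypothetical helper and a missing result raises an error instead of leaving the placeholder in place:

```python
import re
from typing import Dict

# Matches placeholders such as {generate_content.result}
PLACEHOLDER = re.compile(r"\{(\w+)\.result\}")


def render_prompt(template: str, results: Dict[str, str]) -> str:
    """Replace {task_id.result} placeholders with upstream task results."""

    def substitute(match: re.Match) -> str:
        task_id = match.group(1)
        if task_id not in results:
            raise KeyError(f"Result for dependency {task_id!r} is not available yet")
        return str(results[task_id])

    return PLACEHOLDER.sub(substitute, template)


prompt = render_prompt(
    "Analyze the sentiment of this text: {generate_content.result}",
    {"generate_content": "The robot dipped its brush in blue."},
)
print(prompt)  # Analyze the sentiment of this text: The robot dipped its brush in blue.
```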

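Let's inspect the execution order. Tasks in the same level do not depend on each other, so they can run in parallel. The critical path is the longest chain of dependent tasks: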

In [None]:
# Get the execution order
execution_order = complex_pipeline.get_execution_order()

print("Execution order (parallel groups):")
for i, level in enumerate(execution_order):
    print(f"  Level {i+1}: {level}")

# Get the critical path
critical_path = complex_pipeline.get_critical_path()
print(f"\nCritical path: {' -> '.join(critical_path)}")
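
Execution levels like these can be computed with Kahn's topological sort: repeatedly collect every task whose dependencies have all finished. The sketch below applies it to this pipeline's dependency lists and derives a critical path by counting each task as one unit of work. It is an illustration, not the library's implementation: `get_critical_path()` may weight tasks differently (for example, by estimated duration), and the alphabetical tie-breaking is our choice:

```python
from typing import Dict, List

# The analysis pipeline's dependency graph: task ID -> IDs it depends on
DEPS: Dict[str, List[str]] = {
    "generate_content": [],
    "analyze_sentiment": ["generate_content"],
    "extract_themes": ["generate_content"],
    "summarize_analysis": ["analyze_sentiment", "extract_themes"],
}


def execution_levels(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group tasks into levels with Kahn's algorithm; each level can run in parallel."""
    unknown = {d for ds in deps.values() for d in ds} - set(deps)
    if unknown:
        raise ValueError(f"Unknown dependencies: {sorted(unknown)}")
    remaining = {task: set(ds) for task, ds in deps.items()}
    levels: List[List[str]] = []
    while remaining:
        # Every task whose dependencies are all satisfied joins the next level
        ready = sorted(t for t, ds in remaining.items() if not ds)
        if not ready:
            raise ValueError(f"Cycle detected among: {sorted(remaining)}")
        levels.append(ready)
        for t in ready:
            del remaining[t]
        for ds in remaining.values():
            ds.difference_update(ready)
    return levels


def critical_path(deps: Dict[str, List[str]]) -> List[str]:
    """Longest dependency chain, counting every task as one unit of work."""
    longest: Dict[str, List[str]] = {}  # task -> longest chain ending at that task
    for level in execution_levels(deps):
        for t in level:
            # Extend the longest chain among this task's dependencies
            # (ties resolve to the alphabetically first dependency)
            prev = max((longest[d] for d in sorted(deps[t])), key=len, default=[])
            longest[t] = prev + [t]
    return max(longest.values(), key=len, default=[])


print(execution_levels(DEPS))
# [['generate_content'], ['analyze_sentiment', 'extract_themes'], ['summarize_analysis']]
print(" -> ".join(critical_path(DEPS)))
# generate_content -> analyze_sentiment -> summarize_analysis
```

Python 3.9+ also ships `graphlib.TopologicalSorter` in the standard library if you prefer not to hand-roll the sort.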

Now let's execute the complex pipeline. Because the tasks call a real model, their outputs are generated at run time and may vary from run to run:

In [None]:
async def run_complex_pipeline():
    """Execute the complex analysis pipeline."""
    print("🚀 Starting complex pipeline execution...\n")
    
    # Execute the pipeline
    result = await orchestrator.execute_pipeline(complex_pipeline)
    
    print(f"\n✅ Pipeline execution completed with result: {result}\n")
    
    # Show results for each task
    print("📊 Task Results:")
    print("="*50)
    
    # Walk every level in execution order, however many levels the pipeline has
    for level in execution_order:
        for task_id in level:
            task = complex_pipeline.get_task(task_id)
            print(f"\n🔸 {task.name} ({task.id})")
            print(f"   Status: {task.status}")
            if task.result:
                # Convert to text and truncate long results for display
                result_text = str(task.result)
                if len(result_text) > 200:
                    result_text = result_text[:200] + "..."
                print(f"   Result: {result_text}")
    
    return result

# Execute the complex pipeline
complex_result = await run_complex_pipeline()
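
Whether Orchestrator runs the tasks of a level concurrently is not shown in this tutorial. If you wanted that behavior, `asyncio.gather` is the standard tool: await each level's tasks together, then move on to the next level. In this sketch, `fake_run_task` is a hypothetical stand-in for a model call:

```python
import asyncio
from typing import Dict, List

# Levels for the analysis pipeline, written out by hand
LEVELS: List[List[str]] = [
    ["generate_content"],
    ["analyze_sentiment", "extract_themes"],
    ["summarize_analysis"],
]


async def fake_run_task(task_id: str, upstream: Dict[str, str]) -> str:
    # Hypothetical stand-in for a model call; a real implementation would build
    # the prompt from the `upstream` results and await the provider's API
    await asyncio.sleep(0.01)
    return f"output of {task_id}"


async def run_levels(levels: List[List[str]]) -> Dict[str, str]:
    results: Dict[str, str] = {}
    for level in levels:
        # Tasks within a level don't depend on each other, so await them together;
        # gather returns outputs in the same order as its arguments
        outputs = await asyncio.gather(*(fake_run_task(t, results) for t in level))
        results.update(zip(level, outputs))
    return results
```

In the notebook, run it with `level_results = await run_levels(LEVELS)`; in a script, use `asyncio.run(run_levels(LEVELS))`.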

## Pipeline Progress Monitoring

Let's check the progress of our pipeline:

In [None]:
# Get pipeline progress
progress = complex_pipeline.get_progress()

print("📈 Pipeline Progress:")
print(f"   Total tasks: {progress['total']}")
print(f"   Completed: {progress['completed']}")
print(f"   Running: {progress['running']}")
print(f"   Pending: {progress['pending']}")
print(f"   Failed: {progress['failed']}")
print(f"   Skipped: {progress['skipped']}")

print(f"\n✅ Pipeline complete: {complex_pipeline.is_complete()}")
print(f"❌ Pipeline failed: {complex_pipeline.is_failed()}")
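
The progress dictionary is essentially a count of task statuses. Here is a hypothetical sketch of how such a summary could be computed with `collections.Counter`; the status names match the keys printed above, and treating skipped tasks as finished in `is_complete` is an assumption of this sketch:

```python
from collections import Counter
from typing import Dict, Iterable

STATUSES = ("completed", "running", "pending", "failed", "skipped")


def summarize_progress(statuses: Iterable[str]) -> Dict[str, int]:
    """Count tasks per status, always reporting every known status."""
    counts = Counter(statuses)
    unknown = set(counts) - set(STATUSES)
    if unknown:
        raise ValueError(f"Unknown status values: {sorted(unknown)}")
    summary = {s: counts[s] for s in STATUSES}  # Counter returns 0 for missing keys
    summary["total"] = sum(counts.values())
    return summary


def is_complete(summary: Dict[str, int]) -> bool:
    # Assumption of this sketch: skipped tasks count as finished
    return summary["completed"] + summary["skipped"] == summary["total"]


sample = summarize_progress(["completed", "completed", "running", "pending"])
print(sample)
# {'completed': 2, 'running': 1, 'pending': 1, 'failed': 0, 'skipped': 0, 'total': 4}
```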

## State Management and Checkpointing

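The Orchestrator framework provides state management for long-running workflows: you can save a checkpoint of a pipeline's state, list the checkpoints for an execution, and load one back later: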

In [None]:
from datetime import datetime, timezone

# Save a checkpoint
checkpoint_id = "complex_pipeline_checkpoint"
execution_id = "demo_execution_001"

checkpoint_data = {
    "pipeline_id": complex_pipeline.id,
    "completed_tasks": complex_pipeline.get_completed_tasks(),
    "progress": complex_pipeline.get_progress(),
    "metadata": {
        "execution_time": datetime.now(timezone.utc).isoformat(),  # actual save time (UTC)
        "user": "demo_user"
    }
}

# Save checkpoint
success = await state_manager.save_checkpoint(
    checkpoint_id=checkpoint_id,
    execution_id=execution_id,
    data=checkpoint_data
)

print(f"✅ Checkpoint saved: {success}")

# List checkpoints
checkpoints = await state_manager.list_checkpoints(execution_id)
print(f"📂 Available checkpoints: {len(checkpoints)}")

# Load checkpoint
loaded_checkpoint = await state_manager.load_checkpoint(checkpoint_id)
if loaded_checkpoint:
    print(f"📥 Loaded checkpoint data: {loaded_checkpoint['data']['progress']}")
else:
    print("❌ Failed to load checkpoint")
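
`InMemoryStateManager` presumably keeps checkpoints in process memory, so they would not survive a kernel restart. Durable checkpoints need a persistent backend. Here is a hypothetical JSON-file store whose method names mirror the calls in the cell above; whether Orchestrator accepts such an object as its `state_manager` is not confirmed here:

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Optional


class JsonFileCheckpointStore:
    """Hypothetical file-backed checkpoint store, one JSON file per checkpoint.

    File I/O is synchronous inside the async methods, which is acceptable
    for small checkpoints in a sketch like this.
    """

    def __init__(self, directory: str) -> None:
        self.root = Path(directory)
        self.root.mkdir(parents=True, exist_ok=True)

    def _path(self, checkpoint_id: str) -> Path:
        return self.root / f"{checkpoint_id}.json"

    async def save_checkpoint(self, checkpoint_id: str, execution_id: str,
                              data: Dict[str, Any]) -> bool:
        record = {"checkpoint_id": checkpoint_id,
                  "execution_id": execution_id,
                  "data": data}
        target = self._path(checkpoint_id)
        tmp = target.with_suffix(".tmp")
        # Write a temporary file, then rename it over the target so readers
        # never see a half-written checkpoint (the rename is atomic on POSIX)
        tmp.write_text(json.dumps(record, default=str))
        tmp.replace(target)
        return True

    async def load_checkpoint(self, checkpoint_id: str) -> Optional[Dict[str, Any]]:
        path = self._path(checkpoint_id)
        if not path.exists():
            return None
        return json.loads(path.read_text())

    async def list_checkpoints(self, execution_id: str) -> List[str]:
        # Scan every checkpoint file and keep those belonging to this execution
        ids: List[str] = []
        for path in sorted(self.root.glob("*.json")):
            record = json.loads(path.read_text())
            if record["execution_id"] == execution_id:
                ids.append(record["checkpoint_id"])
        return ids
```

Checkpoint data must be JSON-serializable; because of `default=str`, values `json` cannot encode (such as `datetime` objects) are stored as strings.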

## Working with Real Models

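The cells above already use a real OpenAI model. You can register models from other providers in the same way. The commented-out cell below shows how OpenAI GPT-4 and Anthropic Claude models could be configured: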

In [None]:
# Configuring additional providers (commented out so the cell runs without extra API keys)

# from orchestrator.models.openai_model import OpenAIModel
# from orchestrator.models.anthropic_model import AnthropicModel

# # OpenAI model (read the key from the environment rather than hard-coding it)
# openai_model = OpenAIModel(
#     name="gpt-4",
#     api_key=os.environ.get("OPENAI_API_KEY"),
#     model="gpt-4"
# )

# # Anthropic model
# anthropic_model = AnthropicModel(
#     name="claude-3-sonnet",
#     api_key=os.environ.get("ANTHROPIC_API_KEY"),
#     model="claude-3-sonnet-20240229"
# )

# # Register with orchestrator
# orchestrator.register_model(openai_model)
# orchestrator.register_model(anthropic_model)

print("💡 In production, configure real models with API keys")
print("💡 Support for OpenAI, Anthropic, Google, and local models")
print("💡 See integration tests for examples of real API usage")
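
`os.environ.get` returns `None` when a variable is unset, so a missing key may only surface on the first API call. A small helper can fail earlier with a clearer message; `require_api_key` is a hypothetical helper, not part of Orchestrator:

```python
import os


def require_api_key(var_name: str) -> str:
    """Return the API key stored in `var_name`, or raise if it is missing or blank."""
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"{var_name} is not set; export it in your shell before creating the model"
        )
    return key


# Hypothetical usage with the model class from this tutorial:
# model = OpenAIModel(name="gpt-3.5-turbo", api_key=require_api_key("OPENAI_API_KEY"))
```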

## Pipeline Serialization

Pipelines can be serialized to and from dictionaries for storage or transmission:

In [None]:
# Serialize pipeline to dictionary
pipeline_dict = complex_pipeline.to_dict()

print("📄 Pipeline serialized to dictionary:")
print(f"   ID: {pipeline_dict['id']}")
print(f"   Name: {pipeline_dict['name']}")
print(f"   Tasks: {len(pipeline_dict['tasks'])}")
print(f"   Created: {pipeline_dict['created_at']}")

# Recreate pipeline from dictionary
recreated_pipeline = Pipeline.from_dict(pipeline_dict)

print(f"\n🔄 Pipeline recreated: {recreated_pipeline.name}")
print(f"   Tasks: {len(recreated_pipeline)}")
print(f"   Same ID: {recreated_pipeline.id == complex_pipeline.id}")
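
A dictionary is only half the story if you want to store a pipeline as JSON: values such as `datetime` objects must be converted first. The sketch below shows a JSON round trip for a hypothetical `PipelineRecord` dataclass that stores `created_at` as an ISO 8601 string; whether Orchestrator's `to_dict()` already does this conversion is not shown above:

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List


@dataclass
class PipelineRecord:
    # Hypothetical serializable pipeline summary, not the library's Pipeline
    id: str
    name: str
    tasks: List[Dict[str, Any]] = field(default_factory=list)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def to_dict(self) -> Dict[str, Any]:
        # Convert the datetime to an ISO 8601 string so json.dumps accepts it
        return {"id": self.id, "name": self.name, "tasks": list(self.tasks),
                "created_at": self.created_at.isoformat()}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PipelineRecord":
        # Parse the ISO string back into a timezone-aware datetime
        return cls(id=data["id"], name=data["name"], tasks=list(data["tasks"]),
                   created_at=datetime.fromisoformat(data["created_at"]))


original = PipelineRecord("analysis_pipeline", "Text Analysis Pipeline",
                          tasks=[{"id": "generate_content", "dependencies": []}])
restored = PipelineRecord.from_dict(json.loads(json.dumps(original.to_dict())))
print(restored == original)  # True
```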

## Summary

In this tutorial, you learned:

1. **Core Components**: Tasks, Models, Pipelines, and Orchestrator
2. **Pipeline Creation**: Building workflows with dependencies
3. **Execution**: Running pipelines with a real OpenAI model
4. **Monitoring**: Tracking progress and analyzing results
5. **State Management**: Saving and loading checkpoints
6. **Serialization**: Saving and loading pipeline configurations

## Next Steps

- Explore the **YAML Configuration** tutorial for declarative pipeline definitions
- Learn about **Advanced Model Integration** for real AI providers
- Try the **Error Handling and Recovery** tutorial for robust workflows
- Check out **Production Deployment** patterns for scaling

---

**Happy orchestrating! 🎵**