# Lab 07: Workflow Agent - From Simple to Complex

Think of workflows like a restaurant kitchen during dinner rush. The head chef doesn't cook every dish alone - they coordinate specialized stations (salad prep, grill, desserts) that work in parallel and sequence to deliver complete meals efficiently.

In this notebook, you'll learn to build workflows that coordinate specialized AI agents to tackle complex tasks that would overwhelm a single agent.

**What You'll Build:**
1. Simple sequential workflow (manual approach)
2. Parallel processing workflow (using workflow tool)
3. Complex branching workflow with dependencies

## Setup

First, let's apply a workaround for using strands-agents-tools in code-server:

In [None]:
# Workaround for strands-agents-tools in code-server
import logging
import sys

# Patch rich Console before importing strands tools library
from rich.console import Console
_orig_init = Console.__init__
def _patched_init(self, *args, **kwargs):
    kwargs['force_terminal'] = True
    kwargs['force_jupyter'] = False
    _orig_init(self, *args, **kwargs)
Console.__init__ = _patched_init

logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s', stream=sys.stderr, force=True)

In [None]:
# Additional setup and imports
import warnings
warnings.filterwarnings(action="ignore", message=r"datetime.datetime.utcnow") 

sys.path.append('../..')
from util.strands_retry import call_with_retry
from strands import Agent
from strands_tools import workflow
import time

# Test the setup
agent = Agent(
    model="us.anthropic.claude-sonnet-4-20250514-v1:0",
    system_prompt="You are a helpful assistant that provides concise responses.",
    callback_handler=None
)

response = call_with_retry(lambda: agent("Hello! Tell me a joke."))
print(f"Setup test:\n{response}")

## Part 1: Simple Sequential Workflow (Manual)

Let's start with a real scenario: **analyzing a new product idea**. We'll break this into steps:
1. Research the market
2. Analyze the findings
3. Write a final report

This is like an assembly line - each step builds on the previous one.

In [None]:
# Create our specialized team
researcher = Agent(
    system_prompt="You research market trends, competitors, and opportunities. Keep responses concise.",
    callback_handler=None
)

analyst = Agent(
    system_prompt="You analyze research data and identify key insights and patterns. Keep responses concise.",
    callback_handler=None
)

writer = Agent(
    system_prompt="You write clear, actionable business reports based on analysis. Keep responses concise."
)

def analyze_product_idea(product_idea):
    print(f"üîç Starting analysis for: {product_idea}\n")
    
    # Step 1: Research
    print("üìä Step 1: Market Research")
    research = call_with_retry(lambda: researcher(f"Research the market for {product_idea}. Focus on market size, competitors, and trends."))
    print(f"Research complete: {str(research)[:100]}...\n")
    
    # Step 2: Analysis
    print("üß† Step 2: Data Analysis")
    analysis = call_with_retry(lambda: analyst(f"Analyze this research and identify opportunities: {research}"))
    print(f"Analysis complete: {str(analysis)[:100]}...\n")
    
    # Step 3: Report
    print("üìù Step 3: Report Writing")
    report = call_with_retry(lambda: writer(f"Create a business report based on this analysis: {analysis}"))
    
    return report

# Test it out
result = analyze_product_idea("AI-powered fitness app")
print("\n" + "="*50)
print("FINAL REPORT:")
print("="*50)
print(result)

## Part 2: Parallel Processing with Workflow Tool

The manual approach works, but what if we want **parallel processing**? 

Imagine you're planning a software project. You need:
- Market research (can run independently)
- Technical feasibility study (can run independently) 
- Final recommendation (needs both above to complete)

This is where the workflow tool shines!

In [None]:
# Create an orchestrator agent
orchestrator = Agent(tools=[workflow])

# Define our parallel workflow
print("üèóÔ∏è Creating parallel workflow...")
call_with_retry(lambda:
    orchestrator.tool.workflow(action="create",
    workflow_id="saas_analysis", 
    tasks=[
        {
            "task_id": "market_research",
            "description": "Research the market for AI-powered project management tools",
            "system_prompt": "You are a market researcher. Focus on market size, competitors, and user needs.",
            "priority": 5
        },
        {
            "task_id": "tech_feasibility",
            "description": "Analyze technical requirements for building an AI project management tool", 
            "system_prompt": "You are a senior software architect. Focus on technical challenges and solutions.",
            "priority": 5  # Same priority = can run in parallel
        },
        {
            "task_id": "final_recommendation",
            "description": "Create final recommendation based on market and technical analysis",
            "dependencies": ["market_research", "tech_feasibility"],  # Wait for both
            "system_prompt": "You are a product strategist. Combine insights to make recommendations.",
            "priority": 3
        }
    ]
))

print("‚úÖ Workflow created!")

# Start execution
print("\nüöÄ Starting workflow execution...")
start_time = time.time()
call_with_retry(lambda: orchestrator.tool.workflow(action="start", workflow_id="saas_analysis"))


# Check status
status = call_with_retry(lambda: orchestrator.tool.workflow(action="status", workflow_id="saas_analysis"))
end_time = time.time()

print(f"\n‚è±Ô∏è Execution completed in {end_time - start_time:.2f} seconds")
print("\n" + "="*50)
print("WORKFLOW STATUS:")
print("="*50)
print(status)

# Clean up
call_with_retry(lambda: orchestrator.tool.workflow(action="delete", workflow_id="saas_analysis"))

## Part 3: Real-World Example - Code Review Pipeline

Let's build something every developer faces: **automated code reviews**.

Our pipeline:
1. **Security scan** (runs immediately)
2. **Performance check** (runs immediately, parallel to security)
3. **Style review** (runs immediately, parallel to both above)
4. **Final summary** (waits for all three to complete)

This shows the power of parallel processing with a final aggregation step.

In [None]:
# Create code review workflow
code_reviewer = Agent(tools=[workflow])

print("üîç Setting up code review pipeline...")
call_with_retry(lambda: code_reviewer.tool.workflow(
    code_reviewer,
    action="create",
    workflow_id="code_review",
    tasks=[
        {
            "task_id": "security_scan",
            "description": "Scan Python code for security vulnerabilities like SQL injection, XSS, etc.",
            "system_prompt": "You are a security expert. Look for vulnerabilities and provide specific recommendations.",
            "priority": 5
        },
        {
            "task_id": "performance_check",
            "description": "Check Python code for performance issues like inefficient loops, memory leaks, etc.", 
            "system_prompt": "You are a performance expert. Identify bottlenecks and suggest optimizations.",
            "priority": 5  # Runs parallel with security
        },
        {
            "task_id": "style_review",
            "description": "Review Python code style, naming conventions, and best practices",
            "system_prompt": "You enforce PEP 8 and Python best practices. Focus on readability and maintainability.",
            "priority": 5  # Runs parallel with both above
        },
        {
            "task_id": "final_summary",
            "description": "Combine all reviews into prioritized, actionable feedback for developers",
            "dependencies": ["security_scan", "performance_check", "style_review"],
            "system_prompt": "You create clear, prioritized feedback. Focus on what developers should fix first.",
            "priority": 1
        }
    ]
))

print("‚úÖ Code review pipeline ready!")

# Simulate code review
sample_code = """
def get_user_data(user_id):
    query = "SELECT * FROM users WHERE id = " + str(user_id)
    result = db.execute(query)
    users = []
    for row in result:
        users.append(row)
    return users
"""

print(f"\nüìù Reviewing this code:\n{sample_code}")
print("\nüîÑ Running parallel code review...")

start_time = time.time()
call_with_retry(lambda: code_reviewer.tool.workflow(action="start", workflow_id="code_review"))

# Get results
call_with_retry(lambda: code_reviewer.tool.workflow(code_reviewer, action="status", workflow_id="code_review"))
end_time = time.time()

print(f"\n‚è±Ô∏è Review completed in {end_time - start_time:.2f} seconds")
print("\n" + "="*50)
print("CODE REVIEW RESULTS:")
print("="*50)
print(status)

# Clean up
call_with_retry(lambda: code_reviewer.tool.workflow(action="delete", workflow_id="code_review"))

## Part 4: Advanced - Blog Content Pipeline

Let's build a complete **blog content creation pipeline** that shows complex dependencies:

1. **Outline creation** (starts first)
2. **SEO research** (can run parallel with outline)
3. **Draft writing** (needs outline to complete)
4. **SEO optimization** (needs both draft AND SEO research)
5. **Final editing** (needs optimized content)

This shows how workflows handle complex dependency chains.

In [None]:
# Create blog content pipeline
content_creator = Agent(tools=[workflow])

print("üìù Setting up blog content pipeline...")
call_with_retry(lambda: content_creator.tool.workflow(
    action="create",
    workflow_id="blog_pipeline",
    tasks=[
        {
            "task_id": "outline",
            "description": "Create a detailed outline for a blog post about AI in healthcare",
            "system_prompt": "You create structured, engaging blog outlines with clear sections and key points.",
            "priority": 5
        },
        {
            "task_id": "seo_research", 
            "description": "Research SEO keywords and optimization strategies for AI healthcare content",
            "system_prompt": "You are an SEO expert. Find relevant keywords and optimization opportunities.",
            "priority": 5  # Can run parallel with outline
        },
        {
            "task_id": "draft_writing",
            "description": "Write a compelling blog post draft based on the outline",
            "dependencies": ["outline"],  # Needs outline first
            "system_prompt": "You write engaging, informative blog content that connects with readers.",
            "priority": 4
        },
        {
            "task_id": "seo_optimization",
            "description": "Optimize the draft for SEO using the research findings",
            "dependencies": ["draft_writing", "seo_research"],  # Needs both!
            "system_prompt": "You optimize content for search engines while maintaining readability.",
            "priority": 3
        },
        {
            "task_id": "final_editing",
            "description": "Final editing and polishing of the optimized content",
            "dependencies": ["seo_optimization"],
            "system_prompt": "You are a professional editor. Polish content for clarity and impact.",
            "priority": 2
        }
    ]
))

print("‚úÖ Blog pipeline ready!")
print("\nüìä Dependency chain:")
print("   outline ‚îÄ‚îÄ‚îÄ‚îÄ‚îê")
print("               ‚îú‚îÄ‚Üí draft_writing ‚îÄ‚îÄ‚îÄ‚îÄ‚îê")
print("   seo_research ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îº‚îÄ‚Üí seo_optimization ‚îÄ‚Üí final_editing")
print("                                    ‚îÇ")
print("                                    ‚îî‚îÄ‚Üí (waits for both)")

print("\nüöÄ Starting blog creation...")
start_time = time.time()
call_with_retry(lambda: content_creator.tool.workflow(action="start", workflow_id="blog_pipeline"))

# Get final results
status = call_with_retry(lambda: content_creator.tool.workflow(action="status", workflow_id="blog_pipeline"))
end_time = time.time()

print(f"\n‚è±Ô∏è Blog creation completed in {end_time - start_time:.2f} seconds")
print("\n" + "="*50)
print("BLOG CREATION RESULTS:")
print("="*50)
print(status)

# Clean up
call_with_retry(lambda: content_creator.tool.workflow(action="delete", workflow_id="blog_pipeline"))

## Key Takeaways

üéØ **When to use each approach:**
- **Manual sequential**: Simple 2-3 step processes where you need full control
- **Workflow tool**: Complex processes with parallel tasks and dependencies

‚ö° **Performance benefits:**
- Parallel processing can significantly reduce total execution time
- Dependencies ensure tasks run in the right order
- Priorities control which ready tasks run first

üèóÔ∏è **Design patterns you learned:**
1. **Sequential pipeline**: A ‚Üí B ‚Üí C (like our product analysis)
2. **Fan-out, fan-in**: A ‚Üí [B,C,D] ‚Üí E (like code review)
3. **Complex dependencies**: Mixed parallel and sequential (like blog pipeline)

üöÄ **Next steps:**
- Try building workflows for your own use cases
- Experiment with different dependency patterns
- Monitor execution times to optimize performance

Remember: Workflows are like project management for AI agents. Start simple, then add complexity as needed!