# Xponentially Workflow Testing Notebook

This notebook allows you to test the planner/executor/worker architecture step-by-step.

## Features
- Test individual workers in isolation
- See state changes at each step
- Debug planner and executor decisions
- Visualize execution flow

## Setup: Imports and Configuration

In [1]:
import json
from datetime import datetime
from typing import Dict, Any
from pprint import pprint

# Import our components
from helpers.state import State
from helpers.planner import planner_node
from helpers.executor import executor_node
from workers.todoist_fetcher import todoist_fetcher_node
from workers.task_classifier import task_classifier_node
from workers.research_processor import research_processor_node
from workers.next_action_processor import next_action_processor_node
from workers.markdown_writer import markdown_writer_node

print("‚úì All imports successful!")

‚úì All imports successful!


## Mock Data: Sample Todoist Tasks

In [2]:
# Create sample tasks to avoid API calls during testing
sample_tasks = [
    {
        "id": "task_001",
        "content": "Research quantum computing frameworks",
        "description": "Look into Qiskit, Cirq, and other quantum computing frameworks",
        "labels": ["research", "technology"],
        "priority": 4,
        "due_date": "2025-11-07",
        "project_id": "proj_123"
    },
    {
        "id": "task_002",
        "content": "Buy groceries",
        "description": "Milk, bread, eggs",
        "labels": ["errands"],
        "priority": 2,
        "due_date": "2025-11-07",
        "project_id": "proj_456"
    },
    {
        "id": "task_003",
        "content": "Build mental model of transformer architecture",
        "description": "Understand attention mechanism, positional encoding, and multi-head attention",
        "labels": ["learning", "ai"],
        "priority": 3,
        "due_date": "2025-11-07",
        "project_id": "proj_789"
    }
]

print(f"Created {len(sample_tasks)} sample tasks:")
for task in sample_tasks:
    print(f"  - {task['content']}")

Created 3 sample tasks:
  - Research quantum computing frameworks
  - Buy groceries
  - Build mental model of transformer architecture


## Helper Functions: State Visualization

In [3]:
def print_state_summary(state: Dict[str, Any], title: str = "State"):
    """Print a formatted summary of the current state"""
    print(f"\n{'='*60}")
    print(f"  {title}")
    print(f"{'='*60}")
    
    print(f"\nüìã Tasks: {len(state.get('todoist_tasks', []))} tasks")
    if state.get('todoist_tasks'):
        for i, task in enumerate(state['todoist_tasks'], 1):
            print(f"   {i}. {task['content']}")
    
    print(f"\nüè∑Ô∏è  Classifications: {len(state.get('task_classifications', {}))}")
    if state.get('task_classifications'):
        for task_id, task_type in state['task_classifications'].items():
            print(f"   {task_id}: {task_type}")
    
    print(f"\n‚úÖ Processed Results: {len(state.get('processed_results', {}))}")
    if state.get('processed_results'):
        for task_id, result in state['processed_results'].items():
            preview = result[:80] + "..." if len(result) > 80 else result
            print(f"   {task_id}: {preview}")
    
    print(f"\nüìù Plan Steps: {len(state.get('plan', {}))}")
    if state.get('plan'):
        for step, details in state['plan'].items():
            print(f"   Step {step}: {details['agent']} - {details['action']}")
    
    print(f"\nüìç Current Step: {state.get('current_step', 'N/A')}")
    print(f"üì¨ Messages: {len(state.get('messages', []))}")
    print(f"\n{'='*60}\n")


def print_messages(state: Dict[str, Any], last_n: int = None):
    """Print message history with color coding"""
    messages = state.get('messages', [])
    if last_n:
        messages = messages[-last_n:]
    
    print(f"\n{'='*60}")
    print(f"  Message History ({len(messages)} messages)")
    print(f"{'='*60}\n")
    
    for i, msg in enumerate(messages, 1):
        sender = getattr(msg, 'name', 'unknown')
        content = msg.content[:200] + "..." if len(msg.content) > 200 else msg.content
        print(f"{i}. [{sender}]:")
        print(f"   {content}\n")


def show_state_diff(old_state: Dict, new_state: Dict):
    """Show what changed between two states"""
    print(f"\n{'='*60}")
    print(f"  State Changes")
    print(f"{'='*60}\n")
    
    # Check key fields
    fields_to_check = [
        'todoist_tasks', 'task_classifications', 'processed_results',
        'plan', 'current_step', 'messages', 'current_task_index'
    ]
    
    for field in fields_to_check:
        old_val = old_state.get(field)
        new_val = new_state.get(field)
        
        if old_val != new_val:
            print(f"üîÑ {field}:")
            
            if field == 'messages':
                old_count = len(old_val) if old_val else 0
                new_count = len(new_val) if new_val else 0
                print(f"   {old_count} ‚Üí {new_count} messages")
                if new_count > old_count:
                    new_msgs = new_val[old_count:]
                    for msg in new_msgs:
                        sender = getattr(msg, 'name', 'unknown')
                        content_preview = msg.content[:100]
                        print(f"   + [{sender}]: {content_preview}...")
            
            elif isinstance(old_val, (dict, list)):
                old_len = len(old_val) if old_val else 0
                new_len = len(new_val) if new_val else 0
                print(f"   Length: {old_len} ‚Üí {new_len}")
            
            else:
                print(f"   {old_val} ‚Üí {new_val}")
            
            print()

print("‚úì Helper functions defined!")

‚úì Helper functions defined!


## Test 1: Initial State Setup

In [4]:
# Create initial state with sample tasks
initial_state = {
    "user_query": "Process today's Todoist tasks",
    "messages": [],
    "todoist_tasks": sample_tasks,  # Use mock data
    "task_limit": 3,
    "enabled_agents": [
        "todoist_fetcher",
        "task_classifier",
        "research_processor",
        "next_action_processor",
        "markdown_writer"
    ],
    "current_task_index": 0,
}

print_state_summary(initial_state, "Initial State")


  Initial State

üìã Tasks: 3 tasks
   1. Research quantum computing frameworks
   2. Buy groceries
   3. Build mental model of transformer architecture

üè∑Ô∏è  Classifications: 0

‚úÖ Processed Results: 0

üìù Plan Steps: 0

üìç Current Step: N/A
üì¨ Messages: 0




## Test 2: Planner Node

In [6]:
# Test the planner
print("Running planner...\n")

planner_result = planner_node(initial_state)

# Extract the update from Command
state_after_planner = {**initial_state, **planner_result.update}

print(f"Planner routing to: {planner_result.goto}")
print(f"\nGenerated Plan:")
print(json.dumps(state_after_planner.get('plan'), indent=2))

print_messages(state_after_planner, last_n=1)

Running planner...

Planner routing to: executor

Generated Plan:
{
  "1": {
    "agent": "todoist_fetcher",
    "action": "Fetch today's tasks from Todoist"
  },
  "2": {
    "agent": "task_classifier",
    "action": "Classify each task into type"
  },
  "3": {
    "agent": "research_processor",
    "action": "Process research tasks"
  },
  "4": {
    "agent": "next_action_processor",
    "action": "Process short tasks"
  },
  "5": {
    "agent": "markdown_writer",
    "action": "Generate markdown report from all results"
  }
}

  Message History (1 messages)

1. [planner]:
   Created execution plan:
Step 1: todoist_fetcher - Fetch today's tasks from Todoist
Step 2: task_classifier - Classify each task into type
Step 3: research_processor - Process research tasks
Step 4: ne...



## Test 3: Task Classifier

In [7]:
# Test task classifier with our sample tasks
print("Running task_classifier...\n")

classifier_result = task_classifier_node(state_after_planner)

# Apply updates
state_after_classifier = {**state_after_planner, **classifier_result.update}

print(f"Classifier routing to: {classifier_result.goto}")
print(f"\nTask Classifications:")
pprint(state_after_classifier.get('task_classifications'))

print("\nState changes:")
show_state_diff(state_after_planner, state_after_classifier)

Running task_classifier...

Classifier routing to: executor

Task Classifications:
{'task_001': 'research', 'task_002': 'short', 'task_003': 'learning'}

State changes:

  State Changes

üîÑ task_classifications:
   None ‚Üí {'task_001': 'research', 'task_002': 'short', 'task_003': 'learning'}

üîÑ messages:
   1 ‚Üí 1 messages



## Test 4: Executor Node - First Decision

In [8]:
# Test executor decision making
print("Running executor (first decision)...\n")

# Set current step to 1
state_for_executor = {**state_after_classifier, 'current_step': 1}

executor_result = executor_node(state_for_executor)

state_after_executor = {**state_for_executor, **executor_result.update}

print(f"Executor decision: Route to '{executor_result.goto}'")
print(f"Agent query: {state_after_executor.get('agent_query', 'N/A')}")
print(f"Next step: {state_after_executor.get('current_step')}")

print_messages(state_after_executor, last_n=2)

Running executor (first decision)...

Executor decision: Route to 'todoist_fetcher'
Agent query: Fetch today's tasks from Todoist API
Next step: 2

  Message History (1 messages)

1. [executor]:
   Step 1: Routing to todoist_fetcher. Reason: The current plan step is to fetch tasks, which hasn't been executed yet.



## Test 5: Next Action Processor

In [None]:
# Test next action processor with a short task
print("Running next_action_processor...\n")

# Set up state for processor
state_for_processor = {
    **state_after_classifier,
    'current_task_index': 1,  # "Buy groceries" task
    'agent_query': 'Process the short task'
}

processor_result = next_action_processor_node(state_for_processor)

state_after_processor = {**state_for_processor, **processor_result.update}

print(f"Processor routing to: {processor_result.goto}")
print(f"\nProcessed Results:")
pprint(state_after_processor.get('processed_results'))

print("\nLatest message:")
print_messages(state_after_processor, last_n=1)

## Test 6: Research Processor

In [None]:
# Test research processor with a research task
print("Running research_processor...\n")

state_for_research = {
    **state_after_processor,
    'current_task_index': 0,  # "Research quantum computing" task
    'agent_query': 'Process the research task'
}

research_result = research_processor_node(state_for_research)

state_after_research = {**state_for_research, **research_result.update}

print(f"Research processor routing to: {research_result.goto}")
print(f"\nProcessed Results:")
for task_id, result in state_after_research.get('processed_results', {}).items():
    print(f"\n{task_id}:")
    print(f"  {result[:200]}...")

print("\nLatest message:")
print_messages(state_after_research, last_n=1)

## Test 7: Markdown Writer

In [None]:
# Test markdown writer
print("Running markdown_writer...\n")

markdown_result = markdown_writer_node(state_after_research)

state_after_markdown = {**state_after_research, **markdown_result.update}

print(f"Markdown writer routing to: {markdown_result.goto}")

# Find the markdown content in messages
messages = state_after_markdown.get('messages', [])
for msg in reversed(messages):
    if hasattr(msg, 'name') and msg.name == 'markdown_writer':
        print("\nGenerated Markdown Report:")
        print("=" * 60)
        print(msg.content)
        break

## Test 8: Complete State Summary

In [None]:
# Show final state
print_state_summary(state_after_markdown, "Final State After All Workers")

print("\n" + "=" * 60)
print("  All Messages in Order")
print("=" * 60)
print_messages(state_after_markdown)

## Test 9: Execution Flow Visualization

In [None]:
def visualize_execution_flow(state: Dict[str, Any]):
    """Visualize which workers were executed in order"""
    messages = state.get('messages', [])
    
    print("\n" + "=" * 60)
    print("  Execution Flow")
    print("=" * 60 + "\n")
    
    workers = []
    for msg in messages:
        if hasattr(msg, 'name'):
            workers.append(msg.name)
    
    # Create flow diagram
    flow = " ‚Üí ".join(workers)
    print(flow)
    
    # Count worker invocations
    print("\n" + "=" * 60)
    print("  Worker Invocation Count")
    print("=" * 60 + "\n")
    
    from collections import Counter
    counts = Counter(workers)
    
    for worker, count in counts.items():
        bar = "‚ñà" * count
        print(f"{worker:25} {bar} ({count})")

# Visualize the flow
visualize_execution_flow(state_after_markdown)

## Test 10: Step-by-Step Executor Loop

This cell simulates how the executor makes decisions through multiple steps

In [None]:
def run_executor_steps(initial_state: Dict, max_steps: int = 5):
    """Run executor for multiple steps and show decisions"""
    current_state = initial_state.copy()
    
    print("\n" + "=" * 60)
    print("  Executor Decision Loop")
    print("=" * 60 + "\n")
    
    for i in range(max_steps):
        print(f"\n--- Step {i+1} ---")
        print(f"Current plan step: {current_state.get('current_step', 'N/A')}")
        
        try:
            result = executor_node(current_state)
            
            print(f"Decision: Route to '{result.goto}'")
            print(f"Next step: {result.update.get('current_step', 'N/A')}")
            
            # Check if we're done
            if result.goto == "__end__" or result.goto == "END":
                print("\n‚úì Workflow complete!")
                break
            
            # Apply updates
            current_state = {**current_state, **result.update}
            
        except Exception as e:
            print(f"Error: {e}")
            break
    
    return current_state

# Run the executor loop
# Note: This requires a valid plan in the state
if state_after_planner.get('plan'):
    final_state = run_executor_steps(state_after_planner, max_steps=10)
else:
    print("Need to run planner first to get a plan!")

## Debugging: Inspect Any State Field

In [None]:
# Use this cell to inspect any field in detail
# Uncomment and modify as needed:

# print("Plan:")
# pprint(state_after_planner.get('plan'))

# print("\nClassifications:")
# pprint(state_after_classifier.get('task_classifications'))

# print("\nProcessed Results:")
# pprint(state_after_research.get('processed_results'))

# print("\nAll Tasks:")
# pprint(sample_tasks)