# Strands Swarm Testing and Development

This notebook demonstrates and tests the EdgeMind MEC orchestration system using Strands agents.

## Overview

- **5 Specialized Agents**: OrchestratorAgent, LoadBalancerAgent, DecisionCoordinatorAgent, ResourceMonitorAgent, CacheManagerAgent
- **Swarm Coordination**: Real Strands multi-agent consensus for MEC site selection
- **Threshold Integration**: ThresholdMonitor → SwarmCoordinator → Strands Swarm
- **Performance Target**: Sub-100ms orchestration decisions

## Prerequisites

```bash
pip install 'strands-agents[openai]' strands-agents-tools
```

Set your OpenAI API key:

```bash
export OPENAI_API_KEY="your-api-key-here"
```


## 1. Setup and Imports


In [None]:
import os
import sys
import asyncio
import time
import json
from datetime import UTC, datetime
from typing import Dict, Any, List

# Put the parent directory on the import path so project modules can be
# imported (presumably the notebook lives one level below the project root —
# confirm). The membership guard keeps the cell idempotent: re-running it no
# longer appends a duplicate ".." entry each time.
PROJECT_ROOT = ".."
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

# Warn early when the OpenAI key is missing. Only the presence of the
# variable is checked; its value is never printed.
if os.getenv("OPENAI_API_KEY"):
    print("✅ OpenAI API key is configured")
else:
    print(
        "⚠️  WARNING: OPENAI_API_KEY not set. Please set it before running Strands agents."
    )
    print("   export OPENAI_API_KEY='your-api-key-here'")

In [None]:
# Import the Strands framework (Agent and the multi-agent Swarm).
# On failure, print an install hint and re-raise the ImportError.
# The original called sys.exit(1), which inside a notebook kernel surfaces as
# a SystemExit and hides the real import traceback; re-raising keeps the
# traceback visible and still stops a "Run All".
try:
    from strands import Agent
    from strands.multiagent import Swarm
except ImportError as e:
    print(f"❌ Failed to import Strands: {e}")
    print(
        "   Please install: pip install 'strands-agents[openai]' strands-agents-tools"
    )
    raise
else:
    print("✅ Strands framework imported successfully")

In [None]:
# Import the project's MEC orchestration components: threshold config,
# metrics model, threshold monitor types, the five specialized agents and the
# swarm coordinator.
# On failure, print a hint and re-raise. Every later cell uses these names,
# so swallowing the ImportError (as the original did) only postponed the
# failure to a confusing NameError further down the notebook.
try:
    from config import ThresholdConfig
    from src.data.metrics_generator import MECMetrics
    from src.orchestrator.threshold_monitor import (
        ThresholdMonitor,
        ThresholdEvent,
        SeverityLevel,
        EventType,
    )
    from src.agents.orchestrator_agent import OrchestratorAgent
    from src.agents.load_balancer_agent import LoadBalancerAgent
    from src.agents.decision_coordinator_agent import DecisionCoordinatorAgent
    from src.agents.resource_monitor_agent import ResourceMonitorAgent
    from src.agents.cache_manager_agent import CacheManagerAgent
    from src.swarm.swarm_coordinator import SwarmCoordinator
except ImportError as e:
    print(f"❌ Failed to import MEC components: {e}")
    print("   Make sure you're running from the project root directory")
    raise
else:
    print("✅ MEC orchestration components imported successfully")

## 2. Test Individual Strands Agents

Let's test each specialized agent individually to understand their behavior and system prompts.


In [None]:
# Instantiate the OrchestratorAgent for site MEC_A and print its identity,
# status and a preview of its system prompt. `orchestrator` is reused by
# later cells (swarm creation, handoff visualization, appendix).
print("=== Testing OrchestratorAgent ===")
orchestrator = OrchestratorAgent(mec_site="MEC_A")

print(f"Agent ID: {orchestrator.agent_id}")
print(f"MEC Site: {orchestrator.mec_site}")

orchestrator_status = orchestrator.get_agent_status()
print(f"Status: {orchestrator_status}")

# Show only the first 200 characters of the prompt to keep the output short.
orchestrator_prompt = orchestrator.agent.system_prompt
print(f"System Prompt Preview: {orchestrator_prompt[:200]}...")

In [None]:
# Instantiate the LoadBalancerAgent for site MEC_B and print its id, the
# "specialization" entry of its status and a preview of its system prompt.
# `load_balancer` is reused by later cells.
print("=== Testing LoadBalancerAgent ===")
load_balancer = LoadBalancerAgent(mec_site="MEC_B")
print(f"Agent ID: {load_balancer.agent_id}")

load_balancer_status = load_balancer.get_agent_status()
print(f"Specialization: {load_balancer_status['specialization']}")

# Show only the first 200 characters of the prompt to keep the output short.
load_balancer_prompt = load_balancer.agent.system_prompt
print(f"System Prompt Preview: {load_balancer_prompt[:200]}...")

In [None]:
# Instantiate the DecisionCoordinatorAgent for site MEC_C and print its id,
# the "specialization" entry of its status and a preview of its system
# prompt. `decision_coordinator` is reused by later cells.
print("=== Testing DecisionCoordinatorAgent ===")
decision_coordinator = DecisionCoordinatorAgent(mec_site="MEC_C")
print(f"Agent ID: {decision_coordinator.agent_id}")

decision_coordinator_status = decision_coordinator.get_agent_status()
print(f"Specialization: {decision_coordinator_status['specialization']}")

# Show only the first 200 characters of the prompt to keep the output short.
decision_coordinator_prompt = decision_coordinator.agent.system_prompt
print(f"System Prompt Preview: {decision_coordinator_prompt[:200]}...")

In [None]:
# Instantiate the ResourceMonitorAgent for site MEC_A and print its id, the
# "specialization" entry of its status and a preview of its system prompt.
# `resource_monitor` is reused by later cells.
print("=== Testing ResourceMonitorAgent ===")
resource_monitor = ResourceMonitorAgent(mec_site="MEC_A")
print(f"Agent ID: {resource_monitor.agent_id}")

resource_monitor_status = resource_monitor.get_agent_status()
print(f"Specialization: {resource_monitor_status['specialization']}")

# Show only the first 200 characters of the prompt to keep the output short.
resource_monitor_prompt = resource_monitor.agent.system_prompt
print(f"System Prompt Preview: {resource_monitor_prompt[:200]}...")

In [None]:
# Instantiate the CacheManagerAgent for site MEC_B and print its id, the
# "specialization" entry of its status and a preview of its system prompt.
# `cache_manager` is reused by later cells.
print("=== Testing CacheManagerAgent ===")
cache_manager = CacheManagerAgent(mec_site="MEC_B")
print(f"Agent ID: {cache_manager.agent_id}")

cache_manager_status = cache_manager.get_agent_status()
print(f"Specialization: {cache_manager_status['specialization']}")

# Show only the first 200 characters of the prompt to keep the output short.
cache_manager_prompt = cache_manager.agent.system_prompt
print(f"System Prompt Preview: {cache_manager_prompt[:200]}...")

## 3. Create and Test Strands Swarm

Now let's create a Strands swarm with our specialized agents and test the coordination.


In [None]:
# Build a Strands Swarm from the five agents created in section 2, with the
# orchestrator as the entry point, then echo the key configuration values.
print("=== Creating Strands Swarm ===")

# The Swarm operates on the underlying Strands Agent objects (`.agent`), not
# on the project wrapper classes.
swarm_agents = [
    orchestrator.agent,
    load_balancer.agent,
    decision_coordinator.agent,
    resource_monitor.agent,
    cache_manager.agent,
]

# The agent list is passed positionally rather than as `agents=...`.
# NOTE(review): the Strands SDK names this first parameter `nodes`, so the
# `agents=` keyword raises TypeError there — confirm against the installed
# version. Positional passing works regardless of the parameter's name.
swarm = Swarm(
    swarm_agents,
    entry_point=orchestrator.agent,
    max_handoffs=10,
    max_iterations=15,
    execution_timeout=30.0,  # 30 seconds for testing
    node_timeout=10.0,  # 10 seconds per agent
    repetitive_handoff_detection_window=6,
    repetitive_handoff_min_unique_agents=3,
)

# Count the list we passed in instead of reading `swarm.agents`, an attribute
# the Swarm object is not documented to expose.
print(f"✅ Swarm created with {len(swarm_agents)} agents")
print(f"Entry point: {swarm.entry_point.name}")
print(f"Max handoffs: {swarm.max_handoffs}")
print(f"Execution timeout: {swarm.execution_timeout}s")

## 4. Test Simple Swarm Coordination

Let's test the swarm with a simple MEC orchestration scenario.


In [None]:
# Run the swarm once on a hand-written MEC scenario, time the call, and
# report status, the order in which nodes ran, and the final output text.
print("=== Testing Simple Swarm Coordination ===")

simple_task = """
MEC ORCHESTRATION REQUEST

Scenario: Gaming application experiencing high latency
Current Site: MEC_A (CPU: 85%, GPU: 90%, Latency: 120ms)
Available Sites: MEC_B (CPU: 60%, GPU: 55%), MEC_C (CPU: 40%, GPU: 35%)

Task: Coordinate as a swarm to select the optimal MEC site for load balancing.
Each agent should contribute their expertise:
- LoadBalancer: Assess site capacity and performance
- ResourceMonitor: Provide current metrics analysis
- CacheManager: Consider model availability and cache performance
- DecisionCoordinator: Facilitate consensus and make final decision

Target: Sub-100ms total coordination time
"""

start_time = time.perf_counter()

try:
    # Execute swarm coordination (synchronous call; blocks until done).
    result = swarm(simple_task)

    # Wall-clock time measured here, in milliseconds.
    coordination_time = (time.perf_counter() - start_time) * 1000

    print(f"\n✅ Swarm coordination completed in {coordination_time:.2f}ms")
    print(f"Status: {result.status}")
    print(f"Execution count: {result.execution_count}")
    print(f"Execution time: {result.execution_time}ms")

    # Show agent participation in execution order.
    print("\nAgent participation:")
    for position, node in enumerate(result.node_history, start=1):
        print(f"  {position}. {node.node_id}")

    # Show final result.
    # Per the Strands multi-agent API, a swarm result keeps per-node outputs
    # in `results` (keyed by node id); there is no single `.result` attribute.
    # The original `result.result` access raised AttributeError, which the
    # handler below then misreported as a coordination failure. The output of
    # the last node that ran is used as the final answer.
    final_text = ""
    if result.node_history:
        last_node_id = result.node_history[-1].node_id
        last_node_result = result.results.get(last_node_id)
        if last_node_result is not None:
            final_text = str(last_node_result.result)

    print("\nFinal Result:")
    # Truncate long outputs to 500 characters to keep the cell readable.
    print(final_text[:500] + "..." if len(final_text) > 500 else final_text)

except Exception as e:
    # Deliberate best-effort: the swarm depends on external LLM calls, so any
    # failure is reported here instead of aborting the notebook.
    coordination_time = (time.perf_counter() - start_time) * 1000
    print(f"❌ Swarm coordination failed after {coordination_time:.2f}ms")
    print(f"Error: {e}")

## 5. Test Threshold-Triggered Swarm Activation

Now let's test the complete integration: ThresholdMonitor → SwarmCoordinator → Strands Swarm


In [None]:
# Wire up the threshold-triggered pipeline: build a ThresholdConfig, a
# ThresholdMonitor that uses it and a SwarmCoordinator, register the
# coordinator's `activate_swarm` as the monitor's breach callback, then print
# the coordinator's status. `monitor` and `coordinator` are reused below.
print("=== Setting up Threshold-Triggered Swarm System ===")

# Threshold configuration (constructed with its defaults).
thresholds = ThresholdConfig()
print(
    f"Thresholds: CPU {thresholds.cpu_threshold_percent}%, "
    f"GPU {thresholds.gpu_threshold_percent}%, "
    f"Latency {thresholds.latency_threshold_ms}ms"
)

# Threshold monitor
monitor = ThresholdMonitor(thresholds)
print("✅ ThresholdMonitor created")

# Swarm coordinator
coordinator = SwarmCoordinator()
coordinator_agent_count = len(coordinator.agents)
print(f"✅ SwarmCoordinator created with {coordinator_agent_count} agents")

# Register the coordinator as the monitor's breach callback.
monitor.add_breach_callback(coordinator.activate_swarm)
print("✅ Threshold monitor connected to swarm coordinator")

# System status snapshot
swarm_status = coordinator.get_swarm_status()
print("\nSystem Status:")
print(f"  Swarm state: {swarm_status['state']}")
print(f"  Total agents: {swarm_status['total_agents']}")
print(
    f"  Healthy MEC sites: {swarm_status['healthy_sites']}"
    f"/{swarm_status['total_sites']}"
)
print(f"  Swarm available: {swarm_status['swarm_available']}")

In [None]:
# Feed the monitor one deliberately overloaded sample for MEC_A, time the
# threshold check, and list every event it returns.
print("=== Testing Threshold Breach Scenario ===")

# Values are meant to exceed the configured limits (original notes: 80% for
# CPU/GPU, 50 for queue depth, 100ms for response time) — confirm against
# ThresholdConfig.
breach_sample = {
    "site_id": "MEC_A",
    "cpu_utilization": 95.0,
    "gpu_utilization": 92.0,
    "memory_utilization": 75.0,
    "queue_depth": 60,
    "response_time_ms": 150.0,
    "network_latency": {"MEC_B": 18.0, "MEC_C": 22.0},
    "timestamp": datetime.now(UTC),
}
breach_metrics = MECMetrics(**breach_sample)

print(
    f"Breach metrics: CPU {breach_metrics.cpu_utilization}%, "
    f"GPU {breach_metrics.gpu_utilization}%, "
    f"Latency {breach_metrics.response_time_ms}ms"
)

# Time the threshold check. A breach callback was registered on the monitor
# in the setup cell, so this call may also activate the swarm.
start_time = time.perf_counter()
threshold_events = monitor.check_thresholds(breach_metrics)
total_time = (time.perf_counter() - start_time) * 1000  # milliseconds

print("\n📊 Threshold Check Results:")
print(f"  Events generated: {len(threshold_events)}")
print(f"  Total time: {total_time:.2f}ms")

for breach in threshold_events:
    print(
        f"  🚨 {breach.metric_name}: {breach.current_value} > "
        f"{breach.threshold_value} (severity: {breach.severity.value})"
    )

In [None]:
# Inspect the coordinator's event history and print the most recent event
# and, when present, the decision attached to it. `swarm_events` is reused by
# later cells.
print("=== Swarm Coordination Results ===")

swarm_events = coordinator.get_event_history()
print(f"Swarm events generated: {len(swarm_events)}")

if not swarm_events:
    print("No swarm events generated - check if thresholds were breached")
else:
    latest_event = swarm_events[-1]
    print("\n📋 Latest Swarm Event:")
    print(f"  Event type: {latest_event['event_type']}")
    print(f"  Success: {latest_event['success']}")
    print(f"  Duration: {latest_event['duration_ms']}ms")
    print(f"  Participants: {latest_event['participants']}")

    # A decision is optional on an event; skip the details when it is absent
    # or empty.
    decision = latest_event.get("decision")
    if decision:
        print("\n🎯 Swarm Decision:")
        print(f"  Selected site: {decision['selected_site']}")
        print(f"  Confidence: {decision['confidence_score']:.2f}")
        print(f"  Execution time: {decision['execution_time_ms']}ms")
        print(f"  Reasoning: {decision['reasoning']}")

        if decision.get("participants"):
            participant_list = ", ".join(decision["participants"])
            print(f"  Participating agents: {participant_list}")

## 6. Performance Analysis

Let's analyze the performance of our swarm coordination system.


In [None]:
# Summarize the coordinator's metrics, compare the average event duration
# with the 100ms target, and list the status of every agent.
# Relies on `swarm_events` collected in the previous section.
print("=== Performance Analysis ===")

# Swarm metrics, printed as "label: value[unit]" in a fixed order.
swarm_metrics = coordinator.get_swarm_metrics()
print("\n📈 Swarm Performance Metrics:")
for label, key, unit in (
    ("Total decisions", "total_decisions", ""),
    ("Total events", "total_events", ""),
    ("Agent count", "agent_count", ""),
    ("Execution timeout", "execution_timeout", "s"),
    ("Max handoffs", "max_handoffs", ""),
):
    print(f"  {label}: {swarm_metrics[key]}{unit}")

# Timing analysis; skipped when there are no events so the average is never
# a division by zero.
if swarm_events:
    durations_ms = [entry["duration_ms"] for entry in swarm_events]
    total_orchestration_time = sum(durations_ms)
    avg_orchestration_time = total_orchestration_time / len(swarm_events)

    print("\n⏱️  Timing Analysis:")
    print(f"  Total orchestration time: {total_orchestration_time:.2f}ms")
    print(f"  Average orchestration time: {avg_orchestration_time:.2f}ms")
    print("  Target: <100ms per decision")

    target_met = avg_orchestration_time < 100
    if target_met:
        print("  ✅ PERFORMANCE TARGET MET!")
    else:
        print("  ⚠️  Performance target not met - optimization needed")

# Agent status summary (one line per agent).
agent_status = coordinator.get_agent_status()
print("\n🤖 Agent Status Summary:")
for status in agent_status.values():
    print(
        f"  {status['agent_type']}: {status['status']} "
        f"(site: {status['mec_site']})"
    )

## 7. Test Multiple Scenarios

Let's test different threshold breach scenarios to see how the swarm responds.


In [None]:
# Run three hand-written metric samples (one per MEC site) through the
# threshold monitor, timing each check and collecting a per-scenario summary
# in `scenario_results`.
print("=== Testing Multiple Scenarios ===")

scenarios = [
    {
        "name": "Gaming - High CPU Load",
        "metrics": MECMetrics(
            site_id="MEC_A",
            cpu_utilization=88.0,
            gpu_utilization=75.0,
            memory_utilization=65.0,
            queue_depth=45,
            response_time_ms=85.0,
            network_latency={"MEC_B": 15.0, "MEC_C": 20.0},
            timestamp=datetime.now(UTC),
        ),
    },
    {
        "name": "Automotive - High Latency",
        "metrics": MECMetrics(
            site_id="MEC_B",
            cpu_utilization=70.0,
            gpu_utilization=65.0,
            memory_utilization=60.0,
            queue_depth=35,
            response_time_ms=125.0,  # High latency
            network_latency={"MEC_A": 25.0, "MEC_C": 18.0},
            timestamp=datetime.now(UTC),
        ),
    },
    {
        "name": "Healthcare - Queue Overload",
        "metrics": MECMetrics(
            site_id="MEC_C",
            cpu_utilization=75.0,
            gpu_utilization=70.0,
            memory_utilization=68.0,
            queue_depth=65,  # High queue depth
            response_time_ms=95.0,
            network_latency={"MEC_A": 20.0, "MEC_B": 22.0},
            timestamp=datetime.now(UTC),
        ),
    },
]

scenario_results = []

# Loop variables use scenario-specific names so this cell does not rebind
# `result` (the swarm result from section 4) or `start_time` to unrelated
# objects in the shared notebook namespace.
for scenario_number, scenario in enumerate(scenarios, start=1):
    print(f"\n--- Scenario {scenario_number}: {scenario['name']} ---")

    scenario_start = time.perf_counter()

    # Check thresholds for this scenario's metrics.
    scenario_events = monitor.check_thresholds(scenario["metrics"])

    scenario_time = (time.perf_counter() - scenario_start) * 1000  # ms

    scenario_summary = {
        "scenario": scenario["name"],
        "events": len(scenario_events),
        "time_ms": scenario_time,
        "breaches": [
            f"{breach.metric_name}:{breach.current_value}"
            for breach in scenario_events
        ],
    }
    scenario_results.append(scenario_summary)

    print(f"  Events: {len(scenario_events)}")
    print(f"  Time: {scenario_time:.2f}ms")
    for breach in scenario_events:
        print(
            f"    🚨 {breach.metric_name}: {breach.current_value} (severity: {breach.severity.value})"
        )

# Summary: one line per scenario.
print("\n📊 Scenario Summary:")
for scenario_summary in scenario_results:
    print(
        f"  {scenario_summary['scenario']}: {scenario_summary['events']} events, {scenario_summary['time_ms']:.2f}ms"
    )

## 8. Visualize Agent Handoffs

Let's create a simple visualization of how agents hand off to each other in the swarm.


In [None]:
# Print a text "visualization" of the most recent swarm decision: the entry
# point, the participating agents in order, and a fixed six-step flow summary.
print("=== Agent Handoff Visualization ===")

# Fetch the event history here instead of reusing the snapshot taken in the
# results cell. That way the decision shown reflects scenarios run since
# then, and the "run a breach scenario first" hint below works on re-run.
handoff_events = coordinator.get_event_history()
latest_decision = handoff_events[-1].get("decision") if handoff_events else None

if latest_decision:
    participants = latest_decision.get("participants", [])

    print("\n🔄 Agent Handoff Flow:")
    print(f"  Entry Point: {orchestrator.agent_id}")

    if participants:
        print(f"  Participants: {len(participants)} agents")
        last_position = len(participants)
        for position, agent_id in enumerate(participants, start=1):
            # Every participant except the last gets a trailing arrow.
            arrow = " → " if position < last_position else ""
            print(f"    {position}. {agent_id}{arrow}")

    # Simple text-based flow diagram; steps 1-4 are static descriptions.
    print("\n📋 Coordination Flow:")
    print("  1. 🚨 Threshold Breach Detected")
    print("  2. 🤖 OrchestratorAgent activates swarm")
    print("  3. 🔄 Agents collaborate via handoffs")
    print("  4. 🎯 DecisionCoordinator reaches consensus")
    print(f"  5. ✅ Decision executed: {latest_decision['selected_site']}")
    print(f"  6. 📊 Performance: {latest_decision['execution_time_ms']}ms")
else:
    print("No swarm coordination results available for visualization")
    print("Try running a threshold breach scenario first")

## 9. Test Error Handling and Edge Cases

Let's test how the system handles various error conditions and edge cases.


In [None]:
# Exercise the monitor with a below-threshold sample and an extreme sample,
# then print the coordinator's final status and metrics.
print("=== Testing Edge Cases ===")

# Test 1: metrics intended to stay under every threshold.
print("\n1. Testing normal metrics (no threshold breach):")
normal_sample = {
    "site_id": "MEC_A",
    "cpu_utilization": 45.0,
    "gpu_utilization": 40.0,
    "memory_utilization": 50.0,
    "queue_depth": 25,
    "response_time_ms": 75.0,
    "network_latency": {"MEC_B": 15.0, "MEC_C": 18.0},
    "timestamp": datetime.now(UTC),
}
normal_metrics = MECMetrics(**normal_sample)

normal_events = monitor.check_thresholds(normal_metrics)
# The expectation is only printed, not asserted.
print(f"  Events generated: {len(normal_events)} (expected: 0)")

# Test 2: metrics far above the normal sample on every axis.
print("\n2. Testing extreme threshold breaches:")
extreme_sample = {
    "site_id": "MEC_A",
    "cpu_utilization": 99.0,
    "gpu_utilization": 98.0,
    "memory_utilization": 95.0,
    "queue_depth": 100,
    "response_time_ms": 500.0,
    "network_latency": {"MEC_B": 50.0, "MEC_C": 60.0},
    "timestamp": datetime.now(UTC),
}
extreme_metrics = MECMetrics(**extreme_sample)

extreme_events = monitor.check_thresholds(extreme_metrics)
print(f"  Events generated: {len(extreme_events)}")
for breach in extreme_events:
    print(
        f"    🔥 {breach.metric_name}: {breach.current_value} "
        f"(severity: {breach.severity.value})"
    )

# Test 3: coordinator status after all the checks above.
print("\n3. Final system status:")
final_status = coordinator.get_swarm_status()
final_metrics = coordinator.get_swarm_metrics()

print(f"  Swarm state: {final_status['state']}")
print(f"  Total decisions made: {final_metrics['total_decisions']}")
print(f"  Total events logged: {final_metrics['total_events']}")
# "Operational" here means the reported agent count equals the number of
# agents the coordinator holds.
all_agents_operational = final_status["total_agents"] == len(coordinator.agents)
print(f"  All agents operational: {all_agents_operational}")

## 10. Summary and Next Steps

Let's summarize our testing results and identify areas for improvement.


In [None]:
# Print the closing report: event and decision totals, the average
# coordination time against the 100ms target, a static architecture
# checklist, and the list of next steps.
print("=== Testing Summary ===")

# Fetch the event history once and use it for both the total and the average.
# The original counted events from a fresh history but averaged over the
# `swarm_events` snapshot taken in section 5, so the two figures could
# describe different sets of events.
event_history = coordinator.get_event_history()
total_events = len(event_history)
total_decisions = coordinator.get_swarm_metrics()["total_decisions"]

print("\n📊 Test Results:")
print("  ✅ Strands agents created and configured: 5")
print("  ✅ Swarm coordination system: Operational")
print("  ✅ Threshold monitoring integration: Working")
print(f"  ✅ Total swarm events generated: {total_events}")
print(f"  ✅ Total decisions made: {total_decisions}")

# Performance assessment; skipped when there are no events so the average is
# never a division by zero.
if event_history:
    avg_time = sum(e["duration_ms"] for e in event_history) / len(event_history)
    performance_status = (
        "✅ MET" if avg_time < 100 else "⚠️  NEEDS OPTIMIZATION"
    )
    print(f"  🎯 Performance target (<100ms): {performance_status}")
    print(f"     Average coordination time: {avg_time:.2f}ms")

# Static checklist of the five agent roles (not computed from runtime state).
print("\n🔧 System Architecture Validation:")
print("  ✅ OrchestratorAgent: Entry point and coordination trigger")
print("  ✅ LoadBalancerAgent: MEC site selection specialist")
print("  ✅ DecisionCoordinatorAgent: Consensus management")
print("  ✅ ResourceMonitorAgent: Performance monitoring")
print("  ✅ CacheManagerAgent: Model caching optimization")

print("\n🚀 Next Steps for Development:")
print("  1. Implement real MCP tools (metrics_monitor, container_ops, etc.)")
print("  2. Add comprehensive unit and integration tests")
print("  3. Optimize swarm configuration for sub-100ms targets")
print("  4. Create Streamlit dashboard integration")
print("  5. Add error handling and fallback mechanisms")
print("  6. Implement performance monitoring and alerting")

print("\n✅ Strands Swarm Testing Complete!")
print("   Ready for Task 3.5: Building comprehensive test suite")

## Appendix: Agent System Prompts

For reference, here are the complete system prompts for each agent:


In [None]:
# Print the full system prompt of each of the five agents created in
# section 2, with its id and MEC site, separated by an 80-character rule.
print("=== Agent System Prompts ===")

agents_info = [
    ("OrchestratorAgent", orchestrator),
    ("LoadBalancerAgent", load_balancer),
    ("DecisionCoordinatorAgent", decision_coordinator),
    ("ResourceMonitorAgent", resource_monitor),
    ("CacheManagerAgent", cache_manager),
]

separator = "-" * 80

for agent_name, agent_obj in agents_info:
    print(f"\n--- {agent_name} ---")
    print(f"Agent ID: {agent_obj.agent_id}")
    print(f"MEC Site: {agent_obj.mec_site}")
    print("System Prompt:")
    # The prompt is read from the wrapped Strands agent, not the wrapper.
    full_prompt = agent_obj.agent.system_prompt
    print(full_prompt)
    print(separator)