<a href="https://colab.research.google.com/github/ShaliniAnandaPhD/Neuron/blob/main/Tutorial_9_Circuit_Design_Your_First_Agent_Network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In previous tutorials, you've built individual agents with communication, memory, rules, monitoring, configuration, and CLI management. Now we're creating circuits - structured networks of agents that work together to solve complex problems through coordinated workflows and specialized roles.

 What you'll build:

 • Circuit framework for connecting agents in structured workflows

 • Sequential, parallel, and hierarchical processing patterns

 • Agent specialization and role-based task distribution

 • Circuit templates and reusable network patterns

 • Dynamic circuit reconfiguration and load balancing

 • Circuit monitoring and performance optimization

Why this matters:

Real-world AI systems rarely use single agents. Instead, they employ networks
of specialized agents working together - like a research team where each person has expertise in different areas. Circuits provide the structure for building these collaborative AI systems at scale.

By the end, you'll understand:

• How to design effective agent collaboration patterns

• Sequential vs parallel vs hierarchical processing strategies

• Agent specialization and role-based architectures

• Circuit lifecycle management and optimization

• Scaling strategies for large agent networks

In [15]:
print("Tutorial 9: Circuit Design - Your First Agent Network")
print("=" * 54)
print()
print("Building structured agent networks for collaborative problem solving...")
print()

Tutorial 9: Circuit Design - Your First Agent Network

Building structured agent networks for collaborative problem solving...



In [2]:
# Essential imports
import uuid
import time
import threading
import queue
import json
import copy
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Set, Callable, Union, Tuple
from enum import Enum
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor, as_completed


In [3]:
# Import our foundation from previous tutorials
AgentID = str
MessageID = str
CircuitID = str

class MessagePriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4

@dataclass
class Message:
    id: MessageID
    sender: AgentID
    recipients: List[AgentID]
    content: Any
    priority: MessagePriority = MessagePriority.NORMAL
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)

    @classmethod
    def create(cls, sender: AgentID, recipients: List[AgentID], content: Any,
               priority: MessagePriority = MessagePriority.NORMAL) -> 'Message':
        return cls(
            id=str(uuid.uuid4()),
            sender=sender,
            recipients=recipients,
            content=content,
            priority=priority
        )

class CircuitTopology(Enum):
    """
    Different circuit topology patterns for agent networks

    These define how agents are connected and how data flows
    through the circuit for different types of processing.
    """
    SEQUENTIAL = "sequential"      # Linear chain: A → B → C → D
    PARALLEL = "parallel"          # Parallel branches: A → [B,C,D] → E
    HIERARCHICAL = "hierarchical"  # Tree structure: A → [B,C] → [D,E,F,G] → H
    MESH = "mesh"                 # Fully connected: A ↔ B ↔ C ↔ D
    PIPELINE = "pipeline"         # Multi-stage pipeline with feedback
    STAR = "star"                 # Hub and spoke: B,C,D ↔ A ↔ E,F,G

class ExecutionMode(Enum):
    """How the circuit executes tasks"""
    SYNCHRONOUS = "sync"          # Wait for each stage to complete
    ASYNCHRONOUS = "async"        # Process tasks as they arrive
    BATCH = "batch"              # Process tasks in batches
    STREAMING = "streaming"       # Continuous data processing

@dataclass
class CircuitConnection:
    """
    A connection between agents in a circuit

    This defines how data flows from one agent to another,
    including any transformation or filtering rules.
    """
    source_agent: AgentID                          # Agent sending data
    target_agent: AgentID                          # Agent receiving data
    connection_type: str = "default"               # Type of connection
    weight: float = 1.0                           # Connection weight/priority
    enabled: bool = True                          # Is connection active
    transform_function: Optional[Callable] = None # Data transformation
    filter_function: Optional[Callable] = None   # Data filtering
    metadata: Dict[str, Any] = field(default_factory=dict)

    def process_data(self, data: Any) -> Tuple[bool, Any]:
        """
        Process data through this connection

        Returns (should_forward, processed_data)
        """
        if not self.enabled:
            return False, data

        processed_data = data

        # Apply filter first
        if self.filter_function:
            try:
                if not self.filter_function(data):
                    return False, data
            except Exception as e:
                print(f"⚠️  Filter error in connection {self.source_agent[:8]}... → {self.target_agent[:8]}...: {e}")
                return False, data

        # Apply transformation
        if self.transform_function:
            try:
                processed_data = self.transform_function(data)
            except Exception as e:
                print(f"⚠️  Transform error in connection {self.source_agent[:8]}... → {self.target_agent[:8]}...: {e}")
                return False, data

        return True, processed_data

@dataclass
class CircuitTask:
    """
    A task that flows through the circuit

    This represents work that needs to be processed by
    the circuit, including input data and metadata.
    """
    id: str
    data: Any                                     # Task input data
    task_type: str = "default"                   # Type of task
    priority: MessagePriority = MessagePriority.NORMAL
    created_at: float = field(default_factory=time.time)
    started_at: Optional[float] = None           # When processing started
    completed_at: Optional[float] = None         # When processing completed
    current_stage: str = "input"                # Current processing stage
    results: Dict[str, Any] = field(default_factory=dict)  # Results from each stage
    metadata: Dict[str, Any] = field(default_factory=dict)

    def mark_started(self):
        """Mark task as started"""
        self.started_at = time.time()

    def mark_completed(self):
        """Mark task as completed"""
        self.completed_at = time.time()

    def get_processing_time(self) -> Optional[float]:
        """Get total processing time"""
        if self.started_at and self.completed_at:
            return self.completed_at - self.started_at
        return None

    def add_result(self, stage: str, result: Any):
        """Add result from a processing stage"""
        self.results[stage] = result
        self.current_stage = stage

class CircuitAgent:
    """
    An agent designed to work within circuits

    This extends our basic agent with circuit-specific capabilities
    like stage processing, result forwarding, and coordination.
    """

    def __init__(self, agent_id: Optional[AgentID] = None, name: str = "",
                 role: str = "processor", specialization: str = "general"):
        self.id = agent_id or str(uuid.uuid4())
        self.name = name or f"agent_{self.id[:8]}"
        self.role = role  # processor, coordinator, aggregator, filter, etc.
        self.specialization = specialization  # domain expertise

        # Circuit integration
        self.circuit: Optional['Circuit'] = None
        self.stage_name: str = ""             # Stage in the circuit
        self.input_connections: List[CircuitConnection] = []
        self.output_connections: List[CircuitConnection] = []

        # Processing
        self._task_queue = queue.PriorityQueue()
        self._result_queue = queue.Queue()
        self._stop_event = threading.Event()
        self._processing_thread = None
        self._running = False

        # Performance tracking
        self.tasks_processed = 0
        self.processing_time_total = 0.0
        self.errors = 0
        self.last_activity = None

        print(f"🔧 Created CircuitAgent: {self.name} ({self.role}, {self.specialization})")

    def start(self) -> bool:
        """Start the agent"""
        if self._running:
            return False

        try:
            self._stop_event.clear()
            self._processing_thread = threading.Thread(
                target=self._processing_loop,
                daemon=True,
                name=f"CircuitAgent-{self.name}"
            )
            self._processing_thread.start()
            self._running = True

            print(f"▶️  Started CircuitAgent: {self.name}")
            return True

        except Exception as e:
            print(f"❌ Failed to start agent {self.name}: {e}")
            return False

    def stop(self) -> bool:
        """Stop the agent"""
        if not self._running:
            return False

        try:
            self._stop_event.set()

            if self._processing_thread:
                self._processing_thread.join(timeout=2.0)

            self._running = False
            print(f"⏹️  Stopped CircuitAgent: {self.name}")
            return True

        except Exception as e:
            print(f"❌ Failed to stop agent {self.name}: {e}")
            return False

    def add_input_connection(self, connection: CircuitConnection):
        """Add an input connection"""
        self.input_connections.append(connection)

    def add_output_connection(self, connection: CircuitConnection):
        """Add an output connection"""
        self.output_connections.append(connection)

    def receive_task(self, task: CircuitTask, priority: MessagePriority = MessagePriority.NORMAL):
        """Receive a task for processing"""
        priority_value = 5 - priority.value  # Invert for queue ordering
        self._task_queue.put((priority_value, time.time(), task))
        self.last_activity = time.time()

    def process_task(self, task: CircuitTask) -> Any:
        """
        Process a task - override in subclasses for specific behavior

        This is where the agent's domain expertise is applied.
        """
        # Default processing: echo the input
        result = f"Processed by {self.name}: {task.data}"

        # Simulate processing time based on specialization
        processing_time = {
            "fast": 0.01,
            "general": 0.02,
            "analytical": 0.05,
            "complex": 0.1
        }.get(self.specialization, 0.02)

        time.sleep(processing_time)

        return result

    def forward_result(self, task: CircuitTask, result: Any):
        """Forward result to connected agents"""
        if not self.output_connections:
            # No output connections - this might be a final stage
            if self.circuit:
                self.circuit._handle_final_result(task, result)
            return

        # Send to all connected agents
        for connection in self.output_connections:
            if not connection.enabled:
                continue

            # Process data through connection
            should_forward, processed_result = connection.process_data(result)

            if should_forward and self.circuit:
                target_agent = self.circuit.get_agent(connection.target_agent)
                if target_agent:
                    # Create new task for next stage
                    next_task = CircuitTask(
                        id=task.id,  # Keep same task ID
                        data=processed_result,
                        task_type=task.task_type,
                        priority=task.priority,
                        created_at=task.created_at,
                        started_at=task.started_at,
                        current_stage=target_agent.stage_name,
                        results=task.results.copy(),
                        metadata=task.metadata.copy()
                    )

                    target_agent.receive_task(next_task, task.priority)

    def _processing_loop(self):
        """Main processing loop"""
        while not self._stop_event.is_set():
            try:
                # Get task with timeout
                priority_value, timestamp, task = self._task_queue.get(timeout=0.1)

                start_time = time.time()

                # Process the task
                result = self.process_task(task)

                # Update task with result
                task.add_result(self.stage_name, result)

                # Forward result to next stage
                self.forward_result(task, result)

                # Update metrics
                processing_time = time.time() - start_time
                self.tasks_processed += 1
                self.processing_time_total += processing_time
                self.last_activity = time.time()

                self._task_queue.task_done()

            except queue.Empty:
                continue
            except Exception as e:
                self.errors += 1
                print(f"❌ Error in {self.name}: {e}")

    def get_status(self) -> Dict[str, Any]:
        """Get agent status"""
        avg_processing_time = 0.0
        if self.tasks_processed > 0:
            avg_processing_time = self.processing_time_total / self.tasks_processed

        return {
            'id': self.id,
            'name': self.name,
            'role': self.role,
            'specialization': self.specialization,
            'stage': self.stage_name,
            'running': self._running,
            'tasks_processed': self.tasks_processed,
            'avg_processing_time': avg_processing_time,
            'errors': self.errors,
            'queue_size': self._task_queue.qsize(),
            'input_connections': len(self.input_connections),
            'output_connections': len(self.output_connections),
            'last_activity': self.last_activity
        }

class SpecializedAgent(CircuitAgent):
    """
    Specialized agents for different types of processing

    This demonstrates how to create domain-specific agents
    with different capabilities and processing patterns.
    """

    def __init__(self, agent_id: Optional[AgentID] = None, name: str = "",
                 role: str = "processor", specialization: str = "general",
                 processing_config: Dict[str, Any] = None):
        super().__init__(agent_id, name, role, specialization)
        self.processing_config = processing_config or {}

    def process_task(self, task: CircuitTask) -> Any:
        """Specialized processing based on role and specialization"""

        if self.role == "filter":
            return self._filter_process(task)
        elif self.role == "transformer":
            return self._transform_process(task)
        elif self.role == "aggregator":
            return self._aggregate_process(task)
        elif self.role == "analyzer":
            return self._analyze_process(task)
        elif self.role == "validator":
            return self._validate_process(task)
        else:
            return super().process_task(task)

    def _filter_process(self, task: CircuitTask) -> Any:
        """Filter processing: decide what data to pass through"""
        data = task.data

        # Example filters based on specialization
        if self.specialization == "text":
            if isinstance(data, str) and len(data) > 10:
                return data
            return None
        elif self.specialization == "numeric":
            if isinstance(data, (int, float)) and data > 0:
                return data
            return None
        else:
            return data  # Pass through everything

    def _transform_process(self, task: CircuitTask) -> Any:
        """Transform processing: modify the data"""
        data = task.data

        if self.specialization == "text":
            if isinstance(data, str):
                return data.upper().strip()
        elif self.specialization == "numeric":
            if isinstance(data, (int, float)):
                return data * 2
        elif self.specialization == "json":
            try:
                if isinstance(data, str):
                    return json.loads(data)
                else:
                    return json.dumps(data)
            except:
                return data

        return data

    def _aggregate_process(self, task: CircuitTask) -> Any:
        """Aggregate processing: combine multiple inputs"""
        # For aggregators, we might wait for multiple inputs
        # This is a simplified version

        if isinstance(task.data, list):
            if self.specialization == "sum":
                return sum(x for x in task.data if isinstance(x, (int, float)))
            elif self.specialization == "concat":
                return " ".join(str(x) for x in task.data)
            elif self.specialization == "count":
                return len(task.data)

        return task.data

    def _analyze_process(self, task: CircuitTask) -> Any:
        """Analysis processing: extract insights"""
        data = task.data

        analysis = {
            'input': data,
            'type': type(data).__name__,
            'analyzed_by': self.name,
            'timestamp': time.time()
        }

        if self.specialization == "text":
            if isinstance(data, str):
                analysis.update({
                    'length': len(data),
                    'word_count': len(data.split()),
                    'has_numbers': any(c.isdigit() for c in data)
                })
        elif self.specialization == "numeric":
            if isinstance(data, (int, float)):
                analysis.update({
                    'value': data,
                    'is_positive': data > 0,
                    'magnitude': abs(data)
                })

        return analysis

    def _validate_process(self, task: CircuitTask) -> Any:
        """Validation processing: check data quality"""
        data = task.data

        validation = {
            'data': data,
            'valid': True,
            'issues': [],
            'validated_by': self.name
        }

        if self.specialization == "text":
            if isinstance(data, str):
                if len(data.strip()) == 0:
                    validation['valid'] = False
                    validation['issues'].append('Empty or whitespace-only text')
                if len(data) > 1000:
                    validation['issues'].append('Text too long')
        elif self.specialization == "numeric":
            if isinstance(data, (int, float)):
                if data < 0:
                    validation['issues'].append('Negative value')
                if abs(data) > 1000000:
                    validation['issues'].append('Value too large')
            else:
                validation['valid'] = False
                validation['issues'].append('Not a numeric value')

        return validation

class Circuit:
    """
    A structured network of agents working together

    This manages the overall workflow, routing, and coordination
    of agents in a specific topology to solve problems.
    """

    def __init__(self, circuit_id: Optional[CircuitID] = None, name: str = "",
                 topology: CircuitTopology = CircuitTopology.SEQUENTIAL,
                 execution_mode: ExecutionMode = ExecutionMode.SYNCHRONOUS):
        self.id = circuit_id or str(uuid.uuid4())
        self.name = name or f"circuit_{self.id[:8]}"
        self.topology = topology
        self.execution_mode = execution_mode

        # Circuit structure
        self.agents: Dict[AgentID, CircuitAgent] = {}
        self.connections: List[CircuitConnection] = []
        self.stages: Dict[str, List[AgentID]] = {}  # Stage name -> Agent IDs

        # Task management
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        self.active_tasks: Dict[str, CircuitTask] = {}

        # Circuit control
        self._running = False
        self._stop_event = threading.Event()
        self._coordinator_thread = None

        # Performance tracking
        self.tasks_completed = 0
        self.total_processing_time = 0.0
        self.errors = 0
        self.throughput_history = deque(maxlen=100)

        print(f"🔌 Created Circuit: {self.name} ({topology.value}, {execution_mode.value})")

    def add_agent(self, agent: CircuitAgent, stage: str = "default") -> bool:
        """Add an agent to the circuit"""
        if agent.id in self.agents:
            print(f"⚠️  Agent {agent.name} already in circuit")
            return False

        self.agents[agent.id] = agent
        agent.circuit = self
        agent.stage_name = stage

        # Add to stage
        if stage not in self.stages:
            self.stages[stage] = []
        self.stages[stage].append(agent.id)

        print(f"➕ Added agent {agent.name} to circuit {self.name} (stage: {stage})")
        return True

    def remove_agent(self, agent_id: AgentID) -> bool:
        """Remove an agent from the circuit"""
        if agent_id not in self.agents:
            return False

        agent = self.agents[agent_id]

        # Remove connections
        self.connections = [c for c in self.connections
                          if c.source_agent != agent_id and c.target_agent != agent_id]

        # Remove from stage
        for stage_agents in self.stages.values():
            if agent_id in stage_agents:
                stage_agents.remove(agent_id)

        # Remove from circuit
        del self.agents[agent_id]
        agent.circuit = None

        print(f"➖ Removed agent {agent.name} from circuit {self.name}")
        return True

    def add_connection(self, source: AgentID, target: AgentID,
                      connection_type: str = "default", weight: float = 1.0,
                      transform_function: Callable = None,
                      filter_function: Callable = None) -> bool:
        """Add a connection between agents"""
        if source not in self.agents or target not in self.agents:
            print(f"⚠️  Cannot connect: source or target agent not in circuit")
            return False

        connection = CircuitConnection(
            source_agent=source,
            target_agent=target,
            connection_type=connection_type,
            weight=weight,
            transform_function=transform_function,
            filter_function=filter_function
        )

        self.connections.append(connection)

        # Update agent connections
        self.agents[source].add_output_connection(connection)
        self.agents[target].add_input_connection(connection)

        source_name = self.agents[source].name
        target_name = self.agents[target].name
        print(f"🔗 Connected {source_name} → {target_name} ({connection_type})")

        return True

    def get_agent(self, agent_id: AgentID) -> Optional[CircuitAgent]:
        """Get an agent by ID"""
        return self.agents.get(agent_id)

    def start(self) -> bool:
        """Start the circuit"""
        if self._running:
            print(f"⚠️  Circuit {self.name} is already running")
            return False

        try:
            # Start all agents
            for agent in self.agents.values():
                if not agent.start():
                    print(f"❌ Failed to start agent {agent.name}")
                    return False

            # Start coordinator
            self._stop_event.clear()
            self._coordinator_thread = threading.Thread(
                target=self._coordination_loop,
                daemon=True,
                name=f"Circuit-{self.name}"
            )
            self._coordinator_thread.start()
            self._running = True

            print(f"▶️  Started circuit: {self.name} with {len(self.agents)} agents")
            return True

        except Exception as e:
            print(f"❌ Failed to start circuit {self.name}: {e}")
            return False

    def stop(self) -> bool:
        """Stop the circuit"""
        if not self._running:
            return False

        try:
            self._stop_event.set()

            # Stop coordinator
            if self._coordinator_thread:
                self._coordinator_thread.join(timeout=2.0)

            # Stop all agents
            for agent in self.agents.values():
                agent.stop()

            self._running = False
            print(f"⏹️  Stopped circuit: {self.name}")
            return True

        except Exception as e:
            print(f"❌ Failed to stop circuit {self.name}: {e}")
            return False

    def submit_task(self, data: Any, task_type: str = "default",
                   priority: MessagePriority = MessagePriority.NORMAL) -> str:
        """Submit a task to the circuit"""
        task = CircuitTask(
            id=str(uuid.uuid4()),
            data=data,
            task_type=task_type,
            priority=priority
        )

        self.input_queue.put(task)
        self.active_tasks[task.id] = task

        print(f"📥 Submitted task {task.id[:8]}... to circuit {self.name}")
        return task.id

    def get_result(self, timeout: float = None) -> Optional[Tuple[str, Any]]:
        """Get a completed task result"""
        try:
            task_id, result = self.output_queue.get(timeout=timeout)
            return task_id, result
        except queue.Empty:
            return None

    def _coordination_loop(self):
        """Main coordination loop"""
        print(f"🔄 Circuit {self.name} coordination loop started")

        while not self._stop_event.is_set():
            try:
                # Process input tasks
                try:
                    task = self.input_queue.get_nowait()
                    self._route_initial_task(task)
                except queue.Empty:
                    pass

                # Check for completed tasks timeout
                self._check_task_timeouts()

                time.sleep(0.01)

            except Exception as e:
                self.errors += 1
                print(f"❌ Error in circuit coordination: {e}")

        print(f"🛑 Circuit {self.name} coordination loop stopped")

    def _route_initial_task(self, task: CircuitTask):
        """Route a task to the first stage of the circuit"""
        task.mark_started()

        # Find input stage based on topology
        if self.topology == CircuitTopology.SEQUENTIAL:
            # Route to first stage
            if self.stages:
                first_stage = list(self.stages.keys())[0]
                first_agents = self.stages[first_stage]
                if first_agents:
                    agent = self.agents[first_agents[0]]
                    agent.receive_task(task, task.priority)

        elif self.topology == CircuitTopology.PARALLEL:
            # Route to all first-stage agents
            if "input" in self.stages:
                for agent_id in self.stages["input"]:
                    agent = self.agents[agent_id]
                    agent.receive_task(task, task.priority)

        else:
            # For other topologies, route to agents with no input connections
            for agent in self.agents.values():
                if not agent.input_connections:
                    agent.receive_task(task, task.priority)
                    break

    def _handle_final_result(self, task: CircuitTask, result: Any):
        """Handle a final result from the circuit"""
        task.mark_completed()

        # Update metrics
        processing_time = task.get_processing_time()
        if processing_time:
            self.total_processing_time += processing_time
            self.throughput_history.append(1.0 / processing_time)  # Tasks per second

        self.tasks_completed += 1

        # Output result
        self.output_queue.put((task.id, result))

        # Clean up
        if task.id in self.active_tasks:
            del self.active_tasks[task.id]

        print(f"✅ Circuit {self.name} completed task {task.id[:8]}... in {processing_time:.3f}s")

    def _check_task_timeouts(self):
        """Check for tasks that might have timed out"""
        current_time = time.time()
        timeout_threshold = 30.0  # 30 seconds

        timed_out_tasks = []
        for task_id, task in self.active_tasks.items():
            if task.started_at and (current_time - task.started_at) > timeout_threshold:
                timed_out_tasks.append(task_id)

        for task_id in timed_out_tasks:
            print(f"⏰ Task {task_id[:8]}... timed out in circuit {self.name}")
            del self.active_tasks[task_id]
            self.errors += 1

    def get_status(self) -> Dict[str, Any]:
        """Get circuit status"""
        avg_processing_time = 0.0
        if self.tasks_completed > 0:
            avg_processing_time = self.total_processing_time / self.tasks_completed

        avg_throughput = 0.0
        if self.throughput_history:
            avg_throughput = sum(self.throughput_history) / len(self.throughput_history)

        return {
            'id': self.id,
            'name': self.name,
            'topology': self.topology.value,
            'execution_mode': self.execution_mode.value,
            'running': self._running,
            'agents': len(self.agents),
            'connections': len(self.connections),
            'stages': len(self.stages),
            'active_tasks': len(self.active_tasks),
            'tasks_completed': self.tasks_completed,
            'avg_processing_time': avg_processing_time,
            'avg_throughput': avg_throughput,
            'errors': self.errors,
            'input_queue_size': self.input_queue.qsize(),
            'output_queue_size': self.output_queue.qsize()
        }

    def get_topology_info(self) -> Dict[str, Any]:
        """Get detailed topology information"""
        return {
            'stages': {stage: [self.agents[aid].name for aid in agent_ids]
                      for stage, agent_ids in self.stages.items()},
            'connections': [
                {
                    'source': self.agents[conn.source_agent].name,
                    'target': self.agents[conn.target_agent].name,
                    'type': conn.connection_type,
                    'weight': conn.weight,
                    'enabled': conn.enabled
                }
                for conn in self.connections
            ]
        }

class CircuitBuilder:
    """
    Builder for creating common circuit patterns

    This provides templates and helpers for building standard
    circuit topologies without manual wiring.
    """

    @staticmethod
    def create_sequential_circuit(name: str, agent_configs: List[Dict[str, Any]]) -> Circuit:
        """
        Create a sequential processing circuit: A → B → C → D

        Args:
            name: Circuit name
            agent_configs: List of agent configurations with 'name', 'role', 'specialization'
        """
        circuit = Circuit(
            name=name,
            topology=CircuitTopology.SEQUENTIAL,
            execution_mode=ExecutionMode.SYNCHRONOUS
        )

        agents = []
        for i, config in enumerate(agent_configs):
            agent = SpecializedAgent(
                name=config.get('name', f'agent_{i}'),
                role=config.get('role', 'processor'),
                specialization=config.get('specialization', 'general')
            )
            circuit.add_agent(agent, f"stage_{i}")
            agents.append(agent)

        # Connect agents sequentially
        for i in range(len(agents) - 1):
            circuit.add_connection(agents[i].id, agents[i + 1].id, "sequential")

        print(f"🔗 Created sequential circuit '{name}' with {len(agents)} agents")
        return circuit

    @staticmethod
    def create_parallel_circuit(name: str, input_config: Dict[str, Any],
                              parallel_configs: List[Dict[str, Any]],
                              output_config: Dict[str, Any]) -> Circuit:
        """
        Create a parallel processing circuit: A → [B,C,D] → E

        Args:
            name: Circuit name
            input_config: Configuration for input agent
            parallel_configs: Configurations for parallel processing agents
            output_config: Configuration for output aggregation agent
        """
        circuit = Circuit(
            name=name,
            topology=CircuitTopology.PARALLEL,
            execution_mode=ExecutionMode.ASYNCHRONOUS
        )

        # Create input agent
        input_agent = SpecializedAgent(
            name=input_config.get('name', 'input'),
            role=input_config.get('role', 'distributor'),
            specialization=input_config.get('specialization', 'general')
        )
        circuit.add_agent(input_agent, "input")

        # Create parallel agents
        parallel_agents = []
        for i, config in enumerate(parallel_configs):
            agent = SpecializedAgent(
                name=config.get('name', f'parallel_{i}'),
                role=config.get('role', 'processor'),
                specialization=config.get('specialization', 'general')
            )
            circuit.add_agent(agent, "parallel")
            parallel_agents.append(agent)

        # Create output agent
        output_agent = SpecializedAgent(
            name=output_config.get('name', 'output'),
            role=output_config.get('role', 'aggregator'),
            specialization=output_config.get('specialization', 'general')
        )
        circuit.add_agent(output_agent, "output")

        # Connect input to all parallel agents
        for agent in parallel_agents:
            circuit.add_connection(input_agent.id, agent.id, "distribute")

        # Connect all parallel agents to output
        for agent in parallel_agents:
            circuit.add_connection(agent.id, output_agent.id, "aggregate")

        print(f"🔗 Created parallel circuit '{name}' with {1 + len(parallel_agents) + 1} agents")
        return circuit

    @staticmethod
    def create_analysis_pipeline(name: str) -> Circuit:
        """Create a predefined data analysis pipeline"""
        return CircuitBuilder.create_sequential_circuit(
            name=name,
            agent_configs=[
                {'name': 'validator', 'role': 'validator', 'specialization': 'general'},
                {'name': 'preprocessor', 'role': 'transformer', 'specialization': 'text'},
                {'name': 'analyzer', 'role': 'analyzer', 'specialization': 'text'},
                {'name': 'summarizer', 'role': 'aggregator', 'specialization': 'concat'}
            ]
        )

class CircuitManager:
    """
    Manager for multiple circuits and circuit orchestration

    This handles coordination between multiple circuits and
    provides system-wide monitoring and control.
    """

    def __init__(self):
        self.circuits: Dict[CircuitID, Circuit] = {}
        self.circuit_registry: Dict[str, Circuit] = {}  # Name -> Circuit
        self.running_circuits: Set[CircuitID] = set()

        # System monitoring
        self.system_start_time = time.time()

        print("🏭 CircuitManager initialized")

    def register_circuit(self, circuit: Circuit) -> bool:
        """Register a circuit with the manager"""
        if circuit.id in self.circuits:
            print(f"⚠️  Circuit {circuit.name} already registered")
            return False

        self.circuits[circuit.id] = circuit
        self.circuit_registry[circuit.name] = circuit

        print(f"📋 Registered circuit: {circuit.name}")
        return True

    def start_circuit(self, circuit_name: str) -> bool:
        """Start a circuit by name"""
        if circuit_name not in self.circuit_registry:
            print(f"❌ Circuit not found: {circuit_name}")
            return False

        circuit = self.circuit_registry[circuit_name]
        if circuit.start():
            self.running_circuits.add(circuit.id)
            return True
        return False

    def stop_circuit(self, circuit_name: str) -> bool:
        """Stop a circuit by name"""
        if circuit_name not in self.circuit_registry:
            print(f"❌ Circuit not found: {circuit_name}")
            return False

        circuit = self.circuit_registry[circuit_name]
        if circuit.stop():
            self.running_circuits.discard(circuit.id)
            return True
        return False

    def start_all_circuits(self) -> int:
        """Start all registered circuits"""
        started = 0
        for circuit in self.circuits.values():
            if circuit.start():
                self.running_circuits.add(circuit.id)
                started += 1

        print(f"▶️  Started {started}/{len(self.circuits)} circuits")
        return started

    def stop_all_circuits(self) -> int:
        """Stop all running circuits"""
        stopped = 0
        for circuit_id in list(self.running_circuits):
            circuit = self.circuits[circuit_id]
            if circuit.stop():
                stopped += 1

        self.running_circuits.clear()
        print(f"⏹️  Stopped {stopped} circuits")
        return stopped

    def get_circuit(self, name: str) -> Optional[Circuit]:
        """Get a circuit by name"""
        return self.circuit_registry.get(name)

    def list_circuits(self) -> List[Dict[str, Any]]:
        """List all registered circuits"""
        return [
            {
                'name': circuit.name,
                'id': circuit.id,
                'topology': circuit.topology.value,
                'running': circuit.id in self.running_circuits,
                'agents': len(circuit.agents),
                'tasks_completed': circuit.tasks_completed
            }
            for circuit in self.circuits.values()
        ]

    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        uptime = time.time() - self.system_start_time
        total_agents = sum(len(c.agents) for c in self.circuits.values())
        total_tasks = sum(c.tasks_completed for c in self.circuits.values())
        total_errors = sum(c.errors for c in self.circuits.values())

        return {
            'uptime': uptime,
            'total_circuits': len(self.circuits),
            'running_circuits': len(self.running_circuits),
            'total_agents': total_agents,
            'total_tasks_completed': total_tasks,
            'total_errors': total_errors,
            'error_rate': total_errors / max(total_tasks, 1),
            'avg_tasks_per_circuit': total_tasks / max(len(self.circuits), 1)
        }

# =============================================================================
# INITIALIZATION COMPLETE
# =============================================================================

print("🔧 Tutorial 8 initialization complete!")
print("✅ All classes loaded successfully:")
print("   - CircuitConnection for agent connectivity")
print("   - CircuitTask for workflow data")
print("   - CircuitAgent and SpecializedAgent for circuit processing")
print("   - Circuit for network coordination")
print("   - CircuitBuilder for common patterns")
print("   - CircuitManager for system orchestration")
print()
print("🚀 Ready to build collaborative agent networks!")
print()

🔧 Tutorial 8 initialization complete!
✅ All classes loaded successfully:
   - CircuitConnection for agent connectivity
   - CircuitTask for workflow data
   - CircuitAgent and SpecializedAgent for circuit processing
   - Circuit for network coordination
   - CircuitBuilder for common patterns
   - CircuitManager for system orchestration

🚀 Ready to build collaborative agent networks!



In [4]:
# DEMO SECTION: Let's build our first agent circuits!
# =============================================================================

print("=" * 54)
print("🚀 Tutorial 8: Circuit Design - Your First Agent Network")
print("=" * 54)
print()


🚀 Tutorial 8: Circuit Design - Your First Agent Network



In [5]:
# Step 1: Create a simple sequential circuit
print("📝 Step 1: Creating a simple sequential processing circuit...")

# Build a data processing pipeline
sequential_circuit = CircuitBuilder.create_sequential_circuit(
    name="data_pipeline",
    agent_configs=[
        {'name': 'validator', 'role': 'validator', 'specialization': 'text'},
        {'name': 'cleaner', 'role': 'transformer', 'specialization': 'text'},
        {'name': 'analyzer', 'role': 'analyzer', 'specialization': 'text'},
        {'name': 'formatter', 'role': 'transformer', 'specialization': 'json'}
    ]
)

print(f"   ✅ Created sequential circuit with {len(sequential_circuit.agents)} agents")

# Display circuit topology
topology_info = sequential_circuit.get_topology_info()
print("   Circuit structure:")
for stage, agents in topology_info['stages'].items():
    print(f"     {stage}: {agents}")

print("   Connections:")
for conn in topology_info['connections']:
    print(f"     {conn['source']} → {conn['target']} ({conn['type']})")

print()


📝 Step 1: Creating a simple sequential processing circuit...
🔌 Created Circuit: data_pipeline (sequential, sync)
🔧 Created CircuitAgent: validator (validator, text)
➕ Added agent validator to circuit data_pipeline (stage: stage_0)
🔧 Created CircuitAgent: cleaner (transformer, text)
➕ Added agent cleaner to circuit data_pipeline (stage: stage_1)
🔧 Created CircuitAgent: analyzer (analyzer, text)
➕ Added agent analyzer to circuit data_pipeline (stage: stage_2)
🔧 Created CircuitAgent: formatter (transformer, json)
➕ Added agent formatter to circuit data_pipeline (stage: stage_3)
🔗 Connected validator → cleaner (sequential)
🔗 Connected cleaner → analyzer (sequential)
🔗 Connected analyzer → formatter (sequential)
🔗 Created sequential circuit 'data_pipeline' with 4 agents
   ✅ Created sequential circuit with 4 agents
   Circuit structure:
     stage_0: ['validator']
     stage_1: ['cleaner']
     stage_2: ['analyzer']
     stage_3: ['formatter']
   Connections:
     validator → cleaner (seque

In [6]:
# Step 2: Create a parallel processing circuit
print("📝 Step 2: Creating a parallel processing circuit...")

parallel_circuit = CircuitBuilder.create_parallel_circuit(
    name="parallel_processor",
    input_config={'name': 'distributor', 'role': 'filter', 'specialization': 'general'},
    parallel_configs=[
        {'name': 'worker_1', 'role': 'processor', 'specialization': 'fast'},
        {'name': 'worker_2', 'role': 'processor', 'specialization': 'analytical'},
        {'name': 'worker_3', 'role': 'processor', 'specialization': 'complex'}
    ],
    output_config={'name': 'aggregator', 'role': 'aggregator', 'specialization': 'concat'}
)

print(f"   ✅ Created parallel circuit with {len(parallel_circuit.agents)} agents")

# Display parallel topology
parallel_topology = parallel_circuit.get_topology_info()
print("   Parallel circuit structure:")
for stage, agents in parallel_topology['stages'].items():
    print(f"     {stage}: {agents}")

print()


📝 Step 2: Creating a parallel processing circuit...
🔌 Created Circuit: parallel_processor (parallel, async)
🔧 Created CircuitAgent: distributor (filter, general)
➕ Added agent distributor to circuit parallel_processor (stage: input)
🔧 Created CircuitAgent: worker_1 (processor, fast)
➕ Added agent worker_1 to circuit parallel_processor (stage: parallel)
🔧 Created CircuitAgent: worker_2 (processor, analytical)
➕ Added agent worker_2 to circuit parallel_processor (stage: parallel)
🔧 Created CircuitAgent: worker_3 (processor, complex)
➕ Added agent worker_3 to circuit parallel_processor (stage: parallel)
🔧 Created CircuitAgent: aggregator (aggregator, concat)
➕ Added agent aggregator to circuit parallel_processor (stage: output)
🔗 Connected distributor → worker_1 (distribute)
🔗 Connected distributor → worker_2 (distribute)
🔗 Connected distributor → worker_3 (distribute)
🔗 Connected worker_1 → aggregator (aggregate)
🔗 Connected worker_2 → aggregator (aggregate)
🔗 Connected worker_3 → aggreg

In [7]:
# Step 3: Create a specialized analysis pipeline
print("📝 Step 3: Creating a specialized content analysis pipeline...")

analysis_circuit = CircuitBuilder.create_analysis_pipeline("content_analysis")

print(f"   ✅ Created analysis pipeline with {len(analysis_circuit.agents)} agents")
print()


📝 Step 3: Creating a specialized content analysis pipeline...
🔌 Created Circuit: content_analysis (sequential, sync)
🔧 Created CircuitAgent: validator (validator, general)
➕ Added agent validator to circuit content_analysis (stage: stage_0)
🔧 Created CircuitAgent: preprocessor (transformer, text)
➕ Added agent preprocessor to circuit content_analysis (stage: stage_1)
🔧 Created CircuitAgent: analyzer (analyzer, text)
➕ Added agent analyzer to circuit content_analysis (stage: stage_2)
🔧 Created CircuitAgent: summarizer (aggregator, concat)
➕ Added agent summarizer to circuit content_analysis (stage: stage_3)
🔗 Connected validator → preprocessor (sequential)
🔗 Connected preprocessor → analyzer (sequential)
🔗 Connected analyzer → summarizer (sequential)
🔗 Created sequential circuit 'content_analysis' with 4 agents
   ✅ Created analysis pipeline with 4 agents



In [8]:
# Step 4: Set up circuit manager and start circuits
print("📝 Step 4: Setting up circuit manager and starting circuits...")

circuit_manager = CircuitManager()

# Register all circuits
circuits = [sequential_circuit, parallel_circuit, analysis_circuit]
for circuit in circuits:
    circuit_manager.register_circuit(circuit)

# Start all circuits
started = circuit_manager.start_all_circuits()
print(f"   ✅ Started {started} circuits")

# Wait for circuits to stabilize
time.sleep(0.5)
print()

📝 Step 4: Setting up circuit manager and starting circuits...
🏭 CircuitManager initialized
📋 Registered circuit: data_pipeline
📋 Registered circuit: parallel_processor
📋 Registered circuit: content_analysis
▶️  Started CircuitAgent: validator
▶️  Started CircuitAgent: cleaner
▶️  Started CircuitAgent: analyzer
▶️  Started CircuitAgent: formatter
🔄 Circuit data_pipeline coordination loop started
▶️  Started circuit: data_pipeline with 4 agents
▶️  Started CircuitAgent: distributor
▶️  Started CircuitAgent: worker_1
▶️  Started CircuitAgent: worker_2
▶️  Started CircuitAgent: worker_3
▶️  Started CircuitAgent: aggregator
🔄 Circuit parallel_processor coordination loop started
▶️  Started circuit: parallel_processor with 5 agents
▶️  Started CircuitAgent: validator
▶️  Started CircuitAgent: preprocessor
▶️  Started CircuitAgent: analyzer
▶️  Started CircuitAgent: summarizer
🔄 Circuit content_analysis coordination loop started
▶️  Started circuit: content_analysis with 4 agents
▶️  Started 

In [9]:
# Step 5: Test sequential circuit with tasks
print("📝 Step 5: Testing sequential circuit with sample tasks...")

test_data = [
    "Hello world! This is a test message.",
    "   Another piece of text with extra spaces   ",
    "Short text",
    "This is a longer piece of text that contains more information and should be processed through our sequential pipeline to demonstrate the circuit functionality."
]

sequential_results = []
for i, data in enumerate(test_data):
    print(f"   Submitting task {i+1}: '{data[:30]}{'...' if len(data) > 30 else ''}'")
    task_id = sequential_circuit.submit_task(data, f"test_task_{i}")

    # Get result with timeout
    result = sequential_circuit.get_result(timeout=2.0)
    if result:
        task_id, processed_data = result
        sequential_results.append(processed_data)
        print(f"   ✅ Result {i+1}: {type(processed_data).__name__}")
    else:
        print(f"   ⏰ Task {i+1} timed out")

print(f"   Processed {len(sequential_results)}/{len(test_data)} tasks successfully")
print()


📝 Step 5: Testing sequential circuit with sample tasks...
   Submitting task 1: 'Hello world! This is a test me...'
📥 Submitted task d9faaa5a... to circuit data_pipeline
✅ Circuit data_pipeline completed task d9faaa5a... in 0.000s
   ✅ Result 1: str
   Submitting task 2: '   Another piece of text with ...'
📥 Submitted task 59b6a677... to circuit data_pipeline
✅ Circuit data_pipeline completed task 59b6a677... in 0.000s
   ✅ Result 2: str
   Submitting task 3: 'Short text'
📥 Submitted task 5be5d811... to circuit data_pipeline
✅ Circuit data_pipeline completed task 5be5d811... in 0.000s
   ✅ Result 3: str
   Submitting task 4: 'This is a longer piece of text...'
📥 Submitted task 01a879fe... to circuit data_pipeline
✅ Circuit data_pipeline completed task 01a879fe... in 0.000s
   ✅ Result 4: str
   Processed 4/4 tasks successfully



In [10]:
# Step 6: Test parallel circuit with concurrent tasks
print("📝 Step 6: Testing parallel circuit with concurrent processing...")

parallel_test_data = [
    "Parallel task 1",
    "Parallel task 2",
    "Parallel task 3",
    "Parallel task 4"
]

print(f"   Submitting {len(parallel_test_data)} tasks concurrently...")
task_ids = []
for i, data in enumerate(parallel_test_data):
    task_id = parallel_circuit.submit_task(data, f"parallel_task_{i}")
    task_ids.append(task_id)

# Collect results
parallel_results = []
for i in range(len(task_ids)):
    result = parallel_circuit.get_result(timeout=2.0)
    if result:
        task_id, processed_data = result
        parallel_results.append(processed_data)
        print(f"   ✅ Collected result {i+1}")
    else:
        print(f"   ⏰ Result {i+1} timed out")

print(f"   Collected {len(parallel_results)}/{len(task_ids)} results")
print()


📝 Step 6: Testing parallel circuit with concurrent processing...
   Submitting 4 tasks concurrently...
📥 Submitted task 3e4d9cac... to circuit parallel_processor
📥 Submitted task 4cbefb76... to circuit parallel_processor
📥 Submitted task f8791750... to circuit parallel_processor
📥 Submitted task 2d309a83... to circuit parallel_processor
✅ Circuit parallel_processor completed task 3e4d9cac... in 0.011s
   ✅ Collected result 1
✅ Circuit parallel_processor completed task 4cbefb76... in 0.011s
   ✅ Collected result 2
✅ Circuit parallel_processor completed task f8791750... in 0.011s
   ✅ Collected result 3
✅ Circuit parallel_processor completed task 2d309a83... in 0.011s
   ✅ Collected result 4
   Collected 4/4 results



In [11]:
# Step 7: Performance analysis and monitoring
print("📝 Step 7: Analyzing circuit performance...")

# Get status for all circuits
for circuit in circuits:
    status = circuit.get_status()
    print(f"   Circuit: {status['name']}")
    print(f"     Topology: {status['topology']}")
    print(f"     Tasks completed: {status['tasks_completed']}")
    print(f"     Average processing time: {status['avg_processing_time']:.3f}s")
    print(f"     Average throughput: {status['avg_throughput']:.2f} tasks/sec")
    print(f"     Errors: {status['errors']}")
    print()

# System-wide status
system_status = circuit_manager.get_system_status()
print("   System-wide performance:")
for key, value in system_status.items():
    if isinstance(value, float):
        print(f"     {key}: {value:.3f}")
    else:
        print(f"     {key}: {value}")

print()

📝 Step 7: Analyzing circuit performance...
   Circuit: data_pipeline
     Topology: sequential
     Tasks completed: 4
     Average processing time: 0.000s
     Average throughput: 3547.95 tasks/sec
     Errors: 0

   Circuit: parallel_processor
     Topology: parallel
     Tasks completed: 12
     Average processing time: 0.119s
     Average throughput: 36.87 tasks/sec
     Errors: 0

   Circuit: content_analysis
     Topology: sequential
     Tasks completed: 0
     Average processing time: 0.000s
     Average throughput: 0.00 tasks/sec
     Errors: 0

   System-wide performance:
     uptime: 48.304
     total_circuits: 3
     running_circuits: 3
     total_agents: 13
     total_tasks_completed: 16
     total_errors: 0
     error_rate: 0.000
     avg_tasks_per_circuit: 5.333



In [12]:
# Step 8: Advanced circuit features demonstration
print("📝 Step 8: Demonstrating advanced circuit features...")

# Create a custom circuit with specialized connections
print("   Creating custom circuit with transformation functions...")

custom_circuit = Circuit(name="custom_transform", topology=CircuitTopology.SEQUENTIAL)

# Create specialized agents
input_agent = SpecializedAgent(name="input", role="processor", specialization="general")
transform_agent = SpecializedAgent(name="transformer", role="transformer", specialization="numeric")
output_agent = SpecializedAgent(name="output", role="aggregator", specialization="sum")

# Add agents to circuit
custom_circuit.add_agent(input_agent, "input")
custom_circuit.add_agent(transform_agent, "transform")
custom_circuit.add_agent(output_agent, "output")

# Add connections with transformation functions
def extract_numbers(data):
    """Extract numbers from text"""
    import re
    numbers = re.findall(r'\d+', str(data))
    return [int(n) for n in numbers] if numbers else [0]

def multiply_by_two(data):
    """Multiply numeric data by 2"""
    if isinstance(data, list):
        return [x * 2 for x in data]
    return data

custom_circuit.add_connection(
    input_agent.id,
    transform_agent.id,
    "extract_transform",
    transform_function=extract_numbers
)

custom_circuit.add_connection(
    transform_agent.id,
    output_agent.id,
    "multiply_transform",
    transform_function=multiply_by_two
)

# Register and start custom circuit
circuit_manager.register_circuit(custom_circuit)
circuit_manager.start_circuit("custom_transform")

print("   ✅ Created custom circuit with transformation functions")

# Test custom circuit
custom_test_data = [
    "I have 5 apples and 3 oranges",
    "The temperature is 25 degrees",
    "There are 100 people in the room"
]

print("   Testing custom transformations...")
for i, data in enumerate(custom_test_data):
    task_id = custom_circuit.submit_task(data)
    result = custom_circuit.get_result(timeout=1.0)
    if result:
        task_id, processed_data = result
        print(f"     '{data}' → {processed_data}")

print()

📝 Step 8: Demonstrating advanced circuit features...
   Creating custom circuit with transformation functions...
🔌 Created Circuit: custom_transform (sequential, sync)
🔧 Created CircuitAgent: input (processor, general)
🔧 Created CircuitAgent: transformer (transformer, numeric)
🔧 Created CircuitAgent: output (aggregator, sum)
➕ Added agent input to circuit custom_transform (stage: input)
➕ Added agent transformer to circuit custom_transform (stage: transform)
➕ Added agent output to circuit custom_transform (stage: output)
🔗 Connected input → transformer (extract_transform)
🔗 Connected transformer → output (multiply_transform)
📋 Registered circuit: custom_transform
▶️  Started CircuitAgent: input
▶️  Started CircuitAgent: transformer
▶️  Started CircuitAgent: output
🔄 Circuit custom_transform coordination loop started
▶️  Started circuit: custom_transform with 3 agents
   ✅ Created custom circuit with transformation functions
   Testing custom transformations...
📥 Submitted task 6655edb

In [13]:
# Step 9: Visualization of circuit networks
print("📝 Step 9: Creating circuit visualization...")

try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots

    # Create circuit network visualization
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'Sequential Circuit Topology',
            'Parallel Circuit Topology',
            'Circuit Performance Comparison',
            'Task Processing Timeline'
        ),
        specs=[[{"type": "scatter"}, {"type": "scatter"}],
               [{"type": "bar"}, {"type": "scatter"}]]
    )

    # Sequential circuit topology
    seq_agents = list(sequential_circuit.agents.values())
    seq_x = list(range(len(seq_agents)))
    seq_y = [0] * len(seq_agents)
    seq_names = [agent.name for agent in seq_agents]

    # Add nodes
    fig.add_trace(
        go.Scatter(
            x=seq_x, y=seq_y,
            mode='markers+text',
            marker=dict(size=20, color='lightblue'),
            text=seq_names,
            textposition='top center',
            name='Sequential Agents',
            showlegend=False
        ),
        row=1, col=1
    )

    # Parallel circuit topology
    par_agents = list(parallel_circuit.agents.values())

    # Position agents: input at left, parallel in middle, output at right
    input_agents = [a for a in par_agents if a.stage_name == "input"]
    parallel_agents = [a for a in par_agents if a.stage_name == "parallel"]
    output_agents = [a for a in par_agents if a.stage_name == "output"]

    par_x = []
    par_y = []
    par_names = []

    # Input agents
    for i, agent in enumerate(input_agents):
        par_x.append(0)
        par_y.append(0)
        par_names.append(agent.name)

    # Parallel agents
    for i, agent in enumerate(parallel_agents):
        par_x.append(1)
        par_y.append(i - len(parallel_agents)//2)
        par_names.append(agent.name)

    # Output agents
    for i, agent in enumerate(output_agents):
        par_x.append(2)
        par_y.append(0)
        par_names.append(agent.name)

    fig.add_trace(
        go.Scatter(
            x=par_x, y=par_y,
            mode='markers+text',
            marker=dict(size=20, color='lightgreen'),
            text=par_names,
            textposition='top center',
            name='Parallel Agents',
            showlegend=False
        ),
        row=1, col=2
    )

    # Performance comparison
    circuit_names = [c.name for c in circuits]
    tasks_completed = [c.tasks_completed for c in circuits]

    fig.add_trace(
        go.Bar(
            x=circuit_names,
            y=tasks_completed,
            name='Tasks Completed',
            marker_color='lightcoral',
            text=tasks_completed,
            textposition='auto'
        ),
        row=2, col=1
    )

    # Processing timeline (simulated)
    timeline_x = list(range(10))
    timeline_y = [i * 1.2 + 1 for i in timeline_x]  # Simulated cumulative tasks

    fig.add_trace(
        go.Scatter(
            x=timeline_x,
            y=timeline_y,
            mode='lines+markers',
            name='Tasks Over Time',
            line=dict(color='green', width=3),
            marker=dict(size=8)
        ),
        row=2, col=2
    )

    # Update layout
    fig.update_layout(
        title_text="Circuit Network Analysis Dashboard",
        height=800,
        showlegend=True,
        template='plotly_white'
    )

    # Update axes
    fig.update_xaxes(title_text="Agent Sequence", row=1, col=1)
    fig.update_yaxes(title_text="", row=1, col=1)

    fig.update_xaxes(title_text="Stage", row=1, col=2)
    fig.update_yaxes(title_text="Parallel Position", row=1, col=2)

    fig.update_xaxes(title_text="Circuits", row=2, col=1)
    fig.update_yaxes(title_text="Tasks Completed", row=2, col=1)

    fig.update_xaxes(title_text="Time", row=2, col=2)
    fig.update_yaxes(title_text="Cumulative Tasks", row=2, col=2)

    fig.show()

    print("   ✅ Circuit network visualization created!")
    print("   📊 The dashboard shows:")
    print("      - Sequential circuit topology and agent flow")
    print("      - Parallel circuit structure and connections")
    print("      - Performance comparison across circuits")
    print("      - Task processing timeline")
    print()

except ImportError:
    print("   ⚠️  Plotly not available - skipping visualization")
    print("   💡 To see circuit visualizations, install: pip install plotly")
    print("   📊 Circuit summary:")
    print(f"      Total circuits: {len(circuits)}")
    print(f"      Total agents: {sum(len(c.agents) for c in circuits)}")
    print(f"      Total tasks processed: {sum(c.tasks_completed for c in circuits)}")
    print()


📝 Step 9: Creating circuit visualization...


   ✅ Circuit network visualization created!
   📊 The dashboard shows:
      - Sequential circuit topology and agent flow
      - Parallel circuit structure and connections
      - Performance comparison across circuits
      - Task processing timeline



In [14]:
# Step 10: Cleanup and final analysis
print("📝 Step 10: Final analysis and cleanup...")

# Final performance summary
print("   Final circuit performance summary:")
for circuit in circuits + [custom_circuit]:
    status = circuit.get_status()
    efficiency = status['tasks_completed'] / max(status.get('avg_processing_time', 1), 0.001)
    print(f"     {status['name']}:")
    print(f"       Tasks: {status['tasks_completed']}, Time: {status.get('avg_processing_time', 0):.3f}s")
    print(f"       Efficiency: {efficiency:.1f} tasks/second")

# Agent specialization analysis
print("\n   Agent specialization effectiveness:")
all_agents = []
for circuit in circuits + [custom_circuit]:
    all_agents.extend(circuit.agents.values())

specialization_stats = defaultdict(lambda: {'count': 0, 'total_tasks': 0, 'total_time': 0})
for agent in all_agents:
    stats = specialization_stats[agent.specialization]
    stats['count'] += 1
    stats['total_tasks'] += agent.tasks_processed
    stats['total_time'] += agent.processing_time_total

for spec, stats in specialization_stats.items():
    avg_time = stats['total_time'] / max(stats['total_tasks'], 1)
    print(f"     {spec}: {stats['count']} agents, {stats['total_tasks']} tasks, {avg_time:.3f}s avg")

# Stop all circuits
print("\n   Stopping all circuits...")
stopped = circuit_manager.stop_all_circuits()
print(f"   ✅ Stopped {stopped} circuits")

print("\n✅ Tutorial 9 Complete!")
print()


📝 Step 10: Final analysis and cleanup...
   Final circuit performance summary:
     data_pipeline:
       Tasks: 4, Time: 0.000s
       Efficiency: 4000.0 tasks/second
     parallel_processor:
       Tasks: 12, Time: 0.119s
       Efficiency: 100.8 tasks/second
     content_analysis:
       Tasks: 0, Time: 0.000s
       Efficiency: 0.0 tasks/second
     custom_transform:
       Tasks: 3, Time: 0.021s
       Efficiency: 145.6 tasks/second

   Agent specialization effectiveness:
     text: 5 agents, 12 tasks, 0.000s avg
     json: 1 agents, 4 tasks, 0.000s avg
     general: 3 agents, 7 tasks, 0.009s avg
     fast: 1 agents, 4 tasks, 0.010s avg
     analytical: 1 agents, 4 tasks, 0.050s avg
     complex: 1 agents, 4 tasks, 0.100s avg
     concat: 2 agents, 12 tasks, 0.000s avg
     numeric: 1 agents, 3 tasks, 0.000s avg
     sum: 1 agents, 3 tasks, 0.000s avg

   Stopping all circuits...
🛑 Circuit data_pipeline coordination loop stopped
⏹️  Stopped CircuitAgent: validator
⏹️  Stopped Circ

In [16]:
# SUMMARY OF WHAT WE LEARNED
# =============================================================================

print("📚 WHAT WE LEARNED:")

print("=" * 40)

print("1. 🔌 Built a comprehensive circuit framework")

print("   - Circuit topologies (sequential, parallel, hierarchical)")

print("   - Agent specialization and role-based processing")

print("   - Connection management with transformation functions")

print("   - Task routing and workflow coordination")

print()

print("2. 🏗️  Implemented circuit design patterns")

print("   - CircuitBuilder for common topologies")

print("   - Template-based circuit creation")

print("   - Specialized agent roles and capabilities")

print("   - Dynamic connection configuration")

print()

print("3. 🚀 Created production-ready circuit management")

print("   - CircuitManager for system orchestration")

print("   - Multi-circuit coordination and monitoring")

print("   - Performance tracking and optimization")

print("   - Scalable agent network architectures")

print()

print("4. 📊 Added advanced circuit features")

print("   - Real-time performance monitoring")

print("   - Task transformation and filtering")

print("   - Circuit topology visualization")

print("   - Comparative performance analysis")

print()


📚 WHAT WE LEARNED:
1. 🔌 Built a comprehensive circuit framework
   - Circuit topologies (sequential, parallel, hierarchical)
   - Agent specialization and role-based processing
   - Connection management with transformation functions
   - Task routing and workflow coordination

2. 🏗️  Implemented circuit design patterns
   - CircuitBuilder for common topologies
   - Template-based circuit creation
   - Specialized agent roles and capabilities
   - Dynamic connection configuration

3. 🚀 Created production-ready circuit management
   - CircuitManager for system orchestration
   - Multi-circuit coordination and monitoring
   - Performance tracking and optimization
   - Scalable agent network architectures

4. 📊 Added advanced circuit features
   - Real-time performance monitoring
   - Task transformation and filtering
   - Circuit topology visualization
   - Comparative performance analysis



In [17]:
# COMMON ERRORS AND SOLUTIONS
# =============================================================================

print("⚠️  COMMON ERRORS AND SOLUTIONS:")

print("=" * 40)

print("1. 🐛 Tasks getting stuck in circuits")

print("   Problem: Deadlocks or missing connections")

print("   Solution: Verify all agents have proper input/output connections")

print("   Solution: Add timeout handling for stuck tasks")

print("   Solution: Use circuit topology validation")

print()

print("2. 🐛 Poor circuit performance")

print("   Problem: Bottlenecks or inefficient agent placement")

print("   Solution: Profile individual agent performance")

print("   Solution: Balance load across parallel agents")

print("   Solution: Optimize specialization assignments")

print()

print("3. 🐛 Connection transformation errors")

print("   Problem: Transform functions failing with unexpected data")

print("   Solution: Add robust error handling in transform functions")

print("   Solution: Validate data types before transformation")

print("   Solution: Provide fallback behavior for edge cases")

print()

print("4. 🐛 Circuit startup/shutdown issues")

print("   Problem: Agents not starting in proper order")

print("   Solution: Implement proper dependency ordering")

print("   Solution: Add startup verification and health checks")

print("   Solution: Use graceful shutdown with cleanup")

print()

print("5. 🐛 Memory leaks in long-running circuits")

print("   Problem: Tasks accumulating without cleanup")

print("   Solution: Implement task timeout and cleanup mechanisms")

print("   Solution: Monitor queue sizes and clear old tasks")

print("   Solution: Use bounded queues with overflow handling")

print()

print("6. 🐛 Agent specialization mismatches")

print("   Problem: Wrong agent types for specific tasks")

print("   Solution: Design clear agent role specifications")

print("   Solution: Add task-agent compatibility checking")

print("   Solution: Implement dynamic agent selection")

print()

print("🎉 Ready for Tutorial 10: Memory Systems!")
print("   Next we'll explore advanced memory architectures...")
print("\n🌟 You've now mastered:")
print("   • Individual agent intelligence and communication")
print("   • Multi-agent networks and collaborative processing")
print("   • Production monitoring, configuration, and management")
print("   • Structured agent circuits for complex workflows")
print("\n🚀 Ready to build enterprise-scale agent systems!")

⚠️  COMMON ERRORS AND SOLUTIONS:
1. 🐛 Tasks getting stuck in circuits
   Problem: Deadlocks or missing connections
   Solution: Verify all agents have proper input/output connections
   Solution: Add timeout handling for stuck tasks
   Solution: Use circuit topology validation

2. 🐛 Poor circuit performance
   Problem: Bottlenecks or inefficient agent placement
   Solution: Profile individual agent performance
   Solution: Balance load across parallel agents
   Solution: Optimize specialization assignments

3. 🐛 Connection transformation errors
   Problem: Transform functions failing with unexpected data
   Solution: Add robust error handling in transform functions
   Solution: Validate data types before transformation
   Solution: Provide fallback behavior for edge cases

4. 🐛 Circuit startup/shutdown issues
   Problem: Agents not starting in proper order
   Solution: Implement proper dependency ordering
   Solution: Add startup verification and health checks
   Solution: Use graceful

🔒 INTELLECTUAL PROPERTY & LICENSE NOTICE

This tutorial and its contents — including code, architecture, narrative examples, and educational structure — are the intellectual property of Shalini Ananda, PhD and part of the Neuron Framework under a Modified MIT License with Attribution.

Commercial use, redistribution, or derivative works must include clear and visible attribution to the original author.
Use in products, consulting engagements, or educational materials must reference this repository and author name.
Removal of author credit or misrepresentation of origin constitutes a violation of the license and may trigger legal action.
You may not white-label, obfuscate, or rebrand this work without explicit, written permission.
Use of this tutorial in Colab or any other platform implies agreement with these terms.

📘 License: LICENSE.md
📌 Notice: NOTICE.md
🧠 Author: Shalini Ananda, PhD