# Multi-Agent Communication and Coordination with AILF ZeroMQ Utilities

This notebook demonstrates how multiple agents can communicate and coordinate using the `ailf.messaging.zmq` utilities to accomplish a common, distributed task. We'll implement a simple pipeline pattern:

1.  **Task Generator Agent**: Creates tasks and distributes them.
2.  **Task Processor Agent(s)**: Receive tasks, perform simulated work, and send back results.
3.  **Result Collector Agent**: Gathers results from the processor agents.

This example uses ZMQ's PUSH/PULL socket types via the `ZMQPush` and `ZMQPull` classes from `ailf.messaging.zmq`, which are well-suited for task distribution and collection pipelines.
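
Before adding the AILF wrappers, it may help to see the bare PUSH/PULL pattern on its own. The following is a minimal sketch using raw PyZMQ only (not the `ZMQPush`/`ZMQPull` classes). The `inproc://pipeline-demo` endpoint name is an arbitrary choice for illustration, and the `inproc` transport is used so that no TCP port is needed.

```python
import zmq

ctx = zmq.Context()

# The PULL side binds first; inproc endpoints exist only inside this context
sink = ctx.socket(zmq.PULL)
sink.bind("inproc://pipeline-demo")

# The PUSH side connects and sends JSON-serializable dicts
source = ctx.socket(zmq.PUSH)
source.connect("inproc://pipeline-demo")

for i in range(3):
    source.send_json({"task": i})

# Each recv_json() call blocks until one message is available
received = [sink.recv_json() for _ in range(3)]
print(received)

source.close()
sink.close()
ctx.term()
```

With a single PULL peer, messages arrive in the order they were pushed. With several PULL peers connected, the PUSH socket spreads messages across them, which is what the task pipeline in this notebook relies on.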

## 1. Setup

Import the necessary libraries. We'll use the `ailf.messaging.zmq` utilities for ZeroMQ communication patterns, along with `threading` to run agents concurrently, `time` to simulate work, `uuid` for unique task IDs, and `json` for manual serialization in the raw ZMQ fallback.

In [None]:
import time
import threading
import uuid
import json
from typing import Any, Dict

import zmq  # PyZMQ is required in both modes

# Prefer the AILF wrappers; fall back to raw PyZMQ sockets if AILF is not installed
try:
    from ailf.messaging import (
        ZMQPush,
        ZMQPull,
        ZMQPublisher,
        ZMQSubscriber,
    )
    print(f"Using AILF Messaging ZMQ utilities with PyZMQ version: {zmq.pyzmq_version()}")
    using_ailf = True
except ImportError:
    print(f"AILF Messaging not found. Using raw PyZMQ version: {zmq.pyzmq_version()}")
    print("To install AILF Messaging, run: pip install -e /workspaces/template-python-dev")
    using_ailf = False

In [None]:
def create_zmq_push(context=None, use_ailf=True):
    """Create a PUSH socket using either AILF or raw ZMQ."""
    if use_ailf and using_ailf:
        return ZMQPush(context)
    else:
        socket = context.socket(zmq.PUSH) if context else zmq.Context.instance().socket(zmq.PUSH)
        return socket

def create_zmq_pull(context=None, use_ailf=True):
    """Create a PULL socket using either AILF or raw ZMQ."""
    if use_ailf and using_ailf:
        return ZMQPull(context)
    else:
        socket = context.socket(zmq.PULL) if context else zmq.Context.instance().socket(zmq.PULL)
        return socket
        
def create_zmq_pub(context=None, use_ailf=True):
    """Create a PUB socket using either AILF or raw ZMQ."""
    if use_ailf and using_ailf:
        return ZMQPublisher(context)
    else:
        socket = context.socket(zmq.PUB) if context else zmq.Context.instance().socket(zmq.PUB)
        return socket
        
def create_zmq_sub(context=None, use_ailf=True):
    """Create a SUB socket using either AILF or raw ZMQ."""
    if use_ailf and using_ailf:
        return ZMQSubscriber(context)
    else:
        socket = context.socket(zmq.SUB) if context else zmq.Context.instance().socket(zmq.SUB)
        return socket
        
def send_json(socket, data, use_ailf=True):
    """Send JSON data using either AILF or raw ZMQ."""
    if use_ailf and using_ailf and hasattr(socket, 'push_json'):
        socket.push_json(data)
    elif use_ailf and using_ailf and hasattr(socket, 'publish'):
        socket.publish('', data)  # Empty topic for broadcast
    elif hasattr(socket, 'send_json'):
        socket.send_json(data)
    else:
        socket.send(json.dumps(data).encode('utf-8'))
        
def recv_json(socket, use_ailf=True):
    """Receive JSON data using either AILF or raw ZMQ."""
    if use_ailf and using_ailf and hasattr(socket, 'pull_json'): # For ZMQPull
        return socket.pull_json()
    elif use_ailf and using_ailf and hasattr(socket, 'receive'): # For ZMQSubscriber
        # AILF's ZMQSubscriber.receive() returns (topic, message)
        # The message might already be deserialized by AILF if it was JSON.
        _topic, received_message = socket.receive()
        
        if isinstance(received_message, (dict, list)):
            # Message is already a Python dict/list (deserialized by AILF)
            return received_message
        elif isinstance(received_message, str):
            # Message is a string, attempt to parse as JSON
            try:
                return json.loads(received_message)
            except json.JSONDecodeError as e:
                raise ValueError(f"Failed to decode JSON string from AILF subscriber: {received_message}") from e
        elif isinstance(received_message, bytes):
            # Message is bytes, attempt to decode UTF-8 then parse as JSON
            try:
                return json.loads(received_message.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                raise ValueError(f"Failed to decode JSON bytes from AILF subscriber: {received_message}") from e
        else:
            # Unexpected message type
            raise TypeError(f"Unexpected message type from AILF subscriber: {type(received_message)}, value: {received_message}")
    elif hasattr(socket, 'recv_json'): # Raw ZMQ with built-in json support
        return socket.recv_json()
    else: # Raw ZMQ, manual deserialization
        data = socket.recv()
        try:
            return json.loads(data.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            raise ValueError(f"Failed to decode JSON from raw ZMQ socket: {data}") from e
        
def bind_socket(socket, address, use_ailf=True):
    """Bind socket; the AILF and raw ZMQ paths both call socket.bind()."""
    # use_ailf is kept only so that all helpers share the same signature
    socket.bind(address)

def connect_socket(socket, address, use_ailf=True):
    """Connect socket; the AILF and raw ZMQ paths both call socket.connect()."""
    # use_ailf is kept only so that all helpers share the same signature
    socket.connect(address)
        
def subscribe_topic(socket, topic, use_ailf=True):
    """Subscribe to topic using either AILF or raw ZMQ."""
    if use_ailf and using_ailf and hasattr(socket, 'subscribe'):
        socket.subscribe(topic)
    else:
        socket.setsockopt(zmq.SUBSCRIBE, topic.encode('utf-8') if isinstance(topic, str) else topic)
        
def close_socket(socket, use_ailf=True):
    """Close socket; the AILF and raw ZMQ paths both call socket.close()."""
    # use_ailf is kept only so that all helpers share the same signature
    if hasattr(socket, 'close'):
        socket.close()
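
The payload handling inside `recv_json` can be tried out without any sockets. The sketch below mirrors that branching in a standalone function. `decode_payload` is a hypothetical name used here for illustration and is not part of AILF or PyZMQ.

```python
import json
from typing import Any

def decode_payload(message: Any) -> Any:
    """Return a Python object for a payload that may be dict/list, str, or bytes."""
    if isinstance(message, (dict, list)):
        return message  # already deserialized
    if isinstance(message, bytes):
        # bytes -> str; UnicodeDecodeError is a ValueError subclass
        message = message.decode("utf-8")
    if isinstance(message, str):
        try:
            return json.loads(message)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Not valid JSON: {message!r}") from exc
    raise TypeError(f"Unexpected payload type: {type(message).__name__}")

task = {"task_id": "abc", "data": "payload"}
as_text = json.dumps(task)

# All three representations decode to the same dict
decoded = [decode_payload(task), decode_payload(as_text), decode_payload(as_text.encode("utf-8"))]
print(all(item == task for item in decoded))
```

Malformed input raises `ValueError`, and an unsupported type such as an `int` raises `TypeError`, matching the error categories used in `recv_json`.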

## 2. ZMQ Configuration and Message Structures

Define the network addresses for our ZMQ sockets and simple structures for messages.

In [None]:
# Addresses for ZMQ sockets
TASK_VENTILATOR_ADDR = "tcp://127.0.0.1:5557"  # For sending tasks from generator to processors
RESULT_COLLECTOR_ADDR = "tcp://127.0.0.1:5558" # For sending results from processors to collector
CONTROL_SYNC_ADDR = "tcp://127.0.0.1:5559" # For synchronizing start of workers

# Message structure examples (can be formalized with Pydantic if needed)
def create_task_message(data: Any) -> Dict[str, Any]:
    return {"task_id": str(uuid.uuid4()), "data": data, "timestamp": time.time()}

def create_result_message(task_id: str, result_data: Any, worker_id: str) -> Dict[str, Any]:
    return {"task_id": task_id, "result_data": result_data, "worker_id": worker_id, "timestamp": time.time()}
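
If a more formal message type is wanted, one option is sketched below. It uses the standard-library `dataclasses` module rather than Pydantic so that it runs without extra dependencies. The `TaskMessage` class and its `to_dict`/`from_dict` methods are hypothetical names for illustration, not part of AILF.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict

@dataclass
class TaskMessage:
    """Typed counterpart of the dict returned by create_task_message()."""
    data: Any
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

    def __post_init__(self) -> None:
        # Minimal validation; a Pydantic model could enforce much more
        if not isinstance(self.task_id, str) or not self.task_id:
            raise ValueError("task_id must be a non-empty string")

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "TaskMessage":
        missing = {"task_id", "data", "timestamp"} - raw.keys()
        if missing:
            raise ValueError(f"missing fields: {sorted(missing)}")
        return cls(data=raw["data"], task_id=raw["task_id"], timestamp=raw["timestamp"])

# Round trip through JSON, as the message would travel over a socket
msg = TaskMessage(data="Task_1_payload")
wire = json.dumps(msg.to_dict())
restored = TaskMessage.from_dict(json.loads(wire))
print(restored == msg)
```

Note that the `END_OF_TASKS` sentinel used later in this notebook carries no `timestamp`, so a strict `from_dict` like this one would reject it. The sentinel would need to be checked before validation or given its own type.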

## 3. Agent Implementations

The wrapper functions defined in the Setup section hide the difference between the two implementations (AILF ZMQ or raw ZMQ), so the agent functions below work with either one.

### 3.1 Task Generator Agent

This agent generates a specified number of tasks and PUSHes them to the Task Processor agents.

In [None]:
def task_generator_agent(num_tasks: int, num_workers: int, use_ailf: bool = True):
    """Generates tasks and sends them to processor agents."""
    context = zmq.Context()
    
    # Socket to send messages on (PUSH)
    ventilator_socket = create_zmq_push(context, use_ailf)
    bind_socket(ventilator_socket, TASK_VENTILATOR_ADDR, use_ailf)
    print(f"[Generator] Task ventilator bound to {TASK_VENTILATOR_ADDR}")

    # Socket for worker synchronization (PUB)
    sync_socket = create_zmq_pub(context, use_ailf)
    bind_socket(sync_socket, CONTROL_SYNC_ADDR, use_ailf)
    print(f"[Generator] Sync service bound to {CONTROL_SYNC_ADDR}")

    # Crude synchronization: sleep so workers have time to start and connect.
    # PUB silently drops messages for subscribers that are not yet connected.
    print(f"[Generator] Waiting for {num_workers} worker(s) to be ready...")
    time.sleep(num_workers * 0.5 + 1)
    
    # Send synchronization signal to all workers
    print("[Generator] Sending start signal to workers.")
    if use_ailf and using_ailf and hasattr(sync_socket, 'publish'):
        sync_socket.publish("START", "")
    else:
        sync_socket.send_string("START")

    print(f"[Generator] Starting to send {num_tasks} tasks...")
    for i in range(num_tasks):
        task_data = f"Task_{i+1}_payload"
        message = create_task_message(task_data)
        send_json(ventilator_socket, message, use_ailf)
        print(f"[Generator] Sent task: {message['task_id']} ({task_data})")
        time.sleep(0.1)  # Simulate some delay between sending tasks
    
    # Send one END_OF_TASKS sentinel per worker. PUSH distributes messages
    # round-robin, so each connected worker should receive exactly one.
    for _ in range(num_workers):
        send_json(ventilator_socket, {"task_id": "END_OF_TASKS", "data": None}, use_ailf)
        print("[Generator] Sent END_OF_TASKS signal.")

    print("[Generator] All tasks sent. Closing sockets.")
    close_socket(ventilator_socket, use_ailf)
    close_socket(sync_socket, use_ailf)
    context.term()
    print("[Generator] Finished.")
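
The reason the generator sends exactly `num_workers` sentinels can be illustrated with a small model of round-robin distribution. This is a simplified, pure-Python sketch, not ZMQ itself: a real PUSH socket only rotates across peers that are connected and able to accept messages, so the result below assumes all workers are connected before the first task is sent. `distribute_round_robin` is a hypothetical helper name.

```python
from itertools import cycle
from typing import List

def distribute_round_robin(messages: List[str], num_workers: int) -> List[List[str]]:
    """Assign messages to worker inboxes in strict rotation."""
    inboxes: List[List[str]] = [[] for _ in range(num_workers)]
    for inbox, message in zip(cycle(inboxes), messages):
        inbox.append(message)
    return inboxes

num_tasks, num_workers = 10, 2
stream = [f"task-{i + 1}" for i in range(num_tasks)] + ["END_OF_TASKS"] * num_workers
inboxes = distribute_round_robin(stream, num_workers)

for worker, inbox in enumerate(inboxes, start=1):
    sentinels = inbox.count("END_OF_TASKS")
    print(f"worker {worker}: {len(inbox) - sentinels} tasks, {sentinels} sentinel(s)")
```

Because the sentinels are sent as `num_workers` consecutive messages, strict rotation places one in each inbox regardless of how many tasks came before them.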

### 3.2 Task Processor Agent

This agent (of which there can be multiple instances) PULLs tasks, simulates processing, and PUSHes results.

In [None]:
def task_processor_agent(worker_id: str, use_ailf: bool = True):
    """Receives tasks, processes them, and sends results."""
    context = zmq.Context()
    
    # Socket to receive messages from (PULL)
    receiver_socket = create_zmq_pull(context, use_ailf)
    connect_socket(receiver_socket, TASK_VENTILATOR_ADDR, use_ailf)
    print(f"[Worker-{worker_id}] Connected to task ventilator at {TASK_VENTILATOR_ADDR}")

    # Socket to send results to (PUSH)
    results_sender_socket = create_zmq_push(context, use_ailf)
    connect_socket(results_sender_socket, RESULT_COLLECTOR_ADDR, use_ailf)
    print(f"[Worker-{worker_id}] Connected to result collector at {RESULT_COLLECTOR_ADDR}")

    # Socket to receive synchronization signal (SUB)
    sync_subscriber = create_zmq_sub(context, use_ailf)
    connect_socket(sync_subscriber, CONTROL_SYNC_ADDR, use_ailf)
    subscribe_topic(sync_subscriber, "START", use_ailf)
    print(f"[Worker-{worker_id}] Subscribed to sync service at {CONTROL_SYNC_ADDR}")

    # Wait for the start signal
    print(f"[Worker-{worker_id}] Waiting for start signal...")
    if use_ailf and using_ailf and hasattr(sync_subscriber, 'receive'):
        topic, _ = sync_subscriber.receive()
        sync_message = topic  # In AILF, the topic is the message for simple string messages
    else:
        sync_message = sync_subscriber.recv_string()
        
    if sync_message == "START":
        print(f"[Worker-{worker_id}] Received START signal. Beginning task processing.")
    else:
        print(f"[Worker-{worker_id}] Received unexpected sync message: {sync_message}. Exiting.")
        close_socket(receiver_socket, use_ailf)
        close_socket(results_sender_socket, use_ailf)
        close_socket(sync_subscriber, use_ailf)
        context.term()
        return

    while True:
        try:
            task_message = recv_json(receiver_socket, use_ailf)
            task_id = task_message.get("task_id")
            
            if task_id == "END_OF_TASKS":
                print(f"[Worker-{worker_id}] Received END_OF_TASKS. Shutting down.")
                # Tell the collector that this worker is done
                send_json(results_sender_socket, {"task_id": "WORKER_DONE", "worker_id": worker_id}, use_ailf)
                break
            
            task_data = task_message.get("data")
            print(f"[Worker-{worker_id}] Received task: {task_id} ({task_data})")
            
            # Simulate work (assumes worker_id is a numeric string such as "1")
            processing_time = 0.5 + (int(worker_id) * 0.1) # Vary processing time slightly per worker
            time.sleep(processing_time)
            result_data = f"Result_for_({task_data})_by_Worker-{worker_id}"
            
            result_message = create_result_message(task_id, result_data, worker_id)
            send_json(results_sender_socket, result_message, use_ailf)
            print(f"[Worker-{worker_id}] Sent result for task: {task_id}")
            
        except zmq.error.ContextTerminated:
            print(f"[Worker-{worker_id}] Context terminated, exiting loop.")
            break
        except Exception as e:
            print(f"[Worker-{worker_id}] Error processing task: {e}")
            break # Exit on error for simplicity

    print(f"[Worker-{worker_id}] Closing sockets.")
    close_socket(receiver_socket, use_ailf)
    close_socket(results_sender_socket, use_ailf)
    close_socket(sync_subscriber, use_ailf)
    context.term()
    print(f"[Worker-{worker_id}] Finished.")

### 3.3 Result Collector Agent

This agent PULLs results from the Task Processor agents and prints them.

In [None]:
def result_collector_agent(num_expected_tasks: int, num_workers: int, use_ailf: bool = True):
    """Collects results from processor agents."""
    context = zmq.Context()
    
    # Socket to receive results on (PULL)
    results_receiver_socket = create_zmq_pull(context, use_ailf)
    bind_socket(results_receiver_socket, RESULT_COLLECTOR_ADDR, use_ailf)
    print(f"[Collector] Result collector bound to {RESULT_COLLECTOR_ADDR}")

    collected_results = []
    workers_done_count = 0

    print(f"[Collector] Waiting for results... Expected {num_expected_tasks} task results and {num_workers} worker done signals.")
    
    # We expect num_expected_tasks results + num_workers WORKER_DONE signals
    total_expected_messages = num_expected_tasks + num_workers
    received_messages = 0

    while received_messages < total_expected_messages:
        try:
            result_message = recv_json(results_receiver_socket, use_ailf)
            received_messages += 1
            task_id = result_message.get("task_id")

            if task_id == "WORKER_DONE":
                workers_done_count += 1
                print(f"[Collector] Worker {result_message.get('worker_id')} reported done. ({workers_done_count}/{num_workers} workers done)")
            else:
                collected_results.append(result_message)
                print(f"[Collector] Received result for task: {task_id} from Worker-{result_message.get('worker_id')}. Data: {result_message.get('result_data')}")
            
        except zmq.error.ContextTerminated:
            print("[Collector] Context terminated, exiting loop.")
            break
        except Exception as e:
            print(f"[Collector] Error receiving result: {e}")
            break # Exit on error for simplicity

    print(f"\n[Collector] --- Summary ---")
    print(f"[Collector] Collected {len(collected_results)} results out of {num_expected_tasks} expected task results.")
    print(f"[Collector] {workers_done_count} workers reported done.")
    # for res in collected_results:
    #     print(f"  - Task {res['task_id']}: {res['result_data']} (from Worker-{res['worker_id']})")
    
    print("[Collector] Closing socket.")
    close_socket(results_receiver_socket, use_ailf)
    context.term()
    print("[Collector] Finished.")
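
The summary printed by the collector could be extended with a per-worker tally and a check for missing or duplicated task IDs. The sketch below is one way to do that. `summarize_results` is a hypothetical helper, and `sample_results` is made-up illustrative data shaped like the dicts produced by `create_result_message`.

```python
from collections import Counter
from typing import Any, Dict, List

def summarize_results(results: List[Dict[str, Any]], num_expected_tasks: int) -> Dict[str, Any]:
    """Summarize collected result messages by count, gaps, and worker."""
    unique_ids = {r["task_id"] for r in results}
    return {
        "received": len(results),
        "missing": max(num_expected_tasks - len(unique_ids), 0),
        "duplicates": len(results) - len(unique_ids),
        "per_worker": dict(Counter(r["worker_id"] for r in results)),
    }

# Illustrative data only: three results when four tasks were expected
sample_results = [
    {"task_id": "a", "worker_id": "1", "result_data": "..."},
    {"task_id": "b", "worker_id": "2", "result_data": "..."},
    {"task_id": "c", "worker_id": "1", "result_data": "..."},
]
summary = summarize_results(sample_results, num_expected_tasks=4)
print(summary)
```

Inside `result_collector_agent`, such a helper could be called as `summarize_results(collected_results, num_expected_tasks)` just before the socket is closed.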

## 4. Orchestration and Demonstration

Now, let's run these agents concurrently using threads.

In [None]:
def run_demo(use_ailf: bool = True):
    """Run the multi-agent ZMQ demonstration.
    
    Args:
        use_ailf: Whether to use AILF ZMQ utilities (if available)
    """
    NUM_TASKS_TO_GENERATE = 10
    NUM_PROCESSOR_WORKERS = 2

    implementation = "AILF ZMQ utilities" if use_ailf and using_ailf else "raw ZMQ"
    print(f"--- Starting Multi-Agent {implementation} Demonstration ---")

    # Create threads for each agent
    collector_thread = threading.Thread(
        target=result_collector_agent, 
        args=(NUM_TASKS_TO_GENERATE, NUM_PROCESSOR_WORKERS, use_ailf)
    )
    
    worker_threads = []
    for i in range(NUM_PROCESSOR_WORKERS):
        worker_id = str(i + 1)
        thread = threading.Thread(
            target=task_processor_agent, 
            args=(worker_id, use_ailf)
        )
        worker_threads.append(thread)
        
    generator_thread = threading.Thread(
        target=task_generator_agent, 
        args=(NUM_TASKS_TO_GENERATE, NUM_PROCESSOR_WORKERS, use_ailf)
    )

    # Start threads
    print("\nStarting Result Collector Agent...")
    collector_thread.start()
    
    time.sleep(0.5) # Give collector a moment to bind

    print("\nStarting Task Processor Agents...")
    for thread in worker_threads:
        thread.start()
        
    time.sleep(1) # Give workers time to connect to sync service

    print("\nStarting Task Generator Agent...")
    generator_thread.start()

    # Wait for all threads to complete
    print("\nWaiting for agents to finish...")
    generator_thread.join()
    print("Generator agent joined.")
    for thread in worker_threads:
        thread.join()
    print("All worker agents joined.")
    collector_thread.join()
    print("Collector agent joined.")

    print(f"\n--- Multi-Agent {implementation} Demonstration Complete ---")

if __name__ == "__main__":
    # Prefer using AILF ZMQ utilities if available, otherwise fall back to raw ZMQ
    run_demo(use_ailf=True)
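
The coordination logic of the demo (tasks, one sentinel per worker, `WORKER_DONE` notifications, and a collector that counts messages) can also be tried without ZMQ. The sketch below is an in-process analogue built on the standard-library `queue.Queue`. It is not the notebook's implementation, and unlike PUSH/PULL round-robin, a shared queue hands each task to whichever worker asks first. All function names here are hypothetical.

```python
import queue
import threading

SENTINEL = "END_OF_TASKS"

def generator(tasks_q, num_tasks, num_workers):
    """Put all tasks on the queue, then one sentinel per worker."""
    for i in range(num_tasks):
        tasks_q.put({"task_id": f"t{i + 1}", "data": f"Task_{i + 1}_payload"})
    for _ in range(num_workers):
        tasks_q.put({"task_id": SENTINEL, "data": None})

def worker(worker_id, tasks_q, results_q):
    """Process tasks until a sentinel arrives, then report done."""
    while True:
        task = tasks_q.get()
        if task["task_id"] == SENTINEL:
            results_q.put({"task_id": "WORKER_DONE", "worker_id": worker_id})
            break
        results_q.put({
            "task_id": task["task_id"],
            "worker_id": worker_id,
            "result_data": f"Result_for_({task['data']})",
        })

def collector(results_q, num_tasks, num_workers, out):
    """Read exactly num_tasks results plus num_workers done signals."""
    for _ in range(num_tasks + num_workers):
        message = results_q.get()
        if message["task_id"] != "WORKER_DONE":
            out.append(message)

num_tasks, num_workers = 10, 2
tasks_q, results_q, collected = queue.Queue(), queue.Queue(), []

threads = [threading.Thread(target=collector, args=(results_q, num_tasks, num_workers, collected))]
threads += [
    threading.Thread(target=worker, args=(str(i + 1), tasks_q, results_q))
    for i in range(num_workers)
]
threads.append(threading.Thread(target=generator, args=(tasks_q, num_tasks, num_workers)))

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"Collected {len(collected)} results")
```

Each worker exits after consuming one sentinel, so two sentinels are enough to stop two workers, and the collector terminates once it has seen every result and every done signal.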

## 5. Conclusion

This notebook demonstrated a basic multi-agent system using AILF's ZeroMQ utilities for communication and coordination. We saw:

- A **Task Generator** distributing work using a ZMQPush socket.
- Multiple **Task Processors** receiving work via ZMQPull sockets, processing it, and sending results via ZMQPush sockets.
- A **Result Collector** gathering results using a ZMQPull socket.
- A simple synchronization mechanism using ZMQPublisher/ZMQSubscriber to coordinate the start of workers.
- Fallback to raw ZMQ when AILF messaging utilities are not available.

**Advantages of Using AILF Messaging:**

1. **Abstraction**: Higher-level abstractions over ZeroMQ operations
2. **Resource Management**: Automatic resource cleanup and better error handling
3. **Type Safety**: Method signatures provide better guidance on usage
4. **Consistent API**: Uniform interface for different messaging patterns
5. **Integration**: Seamless integration with other AILF components like logging and monitoring

**Potential Extensions:**

- **Error Handling**: Implementing more robust error handling using AILF's logging mechanisms
- **Monitoring**: Adding metrics collection for performance and reliability
- **Dynamic Workers**: Creating workers that can join and leave the pool dynamically
- **Complex Task Payloads**: Using Pydantic models for structured message validation
- **Integration with AI Logic**: Each `task_processor_agent` could internally use an AI engine (like `AIEngine`) from the AILF framework to perform its task

The AILF messaging utilities provide a robust foundation for building distributed agent systems without the need to handle many of the low-level ZeroMQ details directly.