## Agent Communication Architecture

In [5]:
import asyncio
import json
import uuid
from datetime import datetime
from typing import Dict, List, Any
from dataclasses import dataclass

print("ADK AGENT COMMUNICATION")
print("=" * 30)

@dataclass
class SimpleTask:
    task_id: str
    task_type: str
    payload: Dict[str, Any]
    result: Any = None

print("Core types ready")

ADK AGENT COMMUNICATION
Core types ready


## Agent Coordination System

In [6]:
# Simple Agent Coordinator (Fixed ADK Syntax)
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.models.lite_llm import LiteLlm
from google.genai import types

class SimpleAgentCoordinator:
    """Basic agent communication using ADK with local LLaMA"""
    
    def __init__(self):
        self.workers = {}
        self.tasks = {}
        
        # Use local LLaMA model
        self.local_model = LiteLlm(model="ollama_chat/llama3.2:3b")
        
    async def setup_coordinator(self):
        """Initialize coordinator agent with local model"""
        self.coordinator = Agent(
            name="Coordinator",
            model=self.local_model,
            instruction="You coordinate tasks between worker agents. Communicate with structured JSON."
        )
        
        self.coord_session = InMemorySessionService()
        self.coord_runner = Runner(
            agent=self.coordinator,
            app_name="coordination", 
            session_service=self.coord_session
        )
        
        await self.coord_session.create_session(
            app_name="coordination",
            user_id="system", 
            session_id="coord"
        )
        print("Coordinator agent ready (local LLaMA)")
    
    async def add_worker(self, worker_id: str, specialization: str):
        """Add worker agent with local model"""
        worker = Agent(
            name=f"Worker_{worker_id}",
            model=self.local_model,
            instruction=f"You are a {specialization} worker. Process tasks and return JSON results."
        )
        
        session = InMemorySessionService()
        runner = Runner(
            agent=worker,
            app_name="coordination",
            session_service=session
        )
        
        await session.create_session(
            app_name="coordination",
            user_id="system",
            session_id=worker_id
        )
        
        self.workers[worker_id] = {
            'agent': worker,
            'runner': runner,
            'session': session,
            'specialization': specialization
        }
        
        print(f"Worker {worker_id} ({specialization}) ready (local LLaMA)")
    
    async def send_task_to_worker(self, worker_id: str, task: SimpleTask):
        """Send task to specific worker agent"""
        worker = self.workers[worker_id]
        
        # Create task message
        task_message = f"""
        Process this task:

        Task ID: {task.task_id}
        Type: {task.task_type}
        Data: {json.dumps(task.payload)}

        Return JSON with: {{"task_id": "{task.task_id}", "result": "your_result", "status": "completed"}}
        """
        
        # Send to worker agent
        message = types.Content(role="user", parts=[types.Part(text=task_message)])
        
        async for event in worker['runner'].run_async(
            user_id="system",
            session_id=worker_id, 
            new_message=message
        ):
            if event.is_final_response():
                response = event.content.parts[0].text
                
                try:
                    result = json.loads(response)
                    task.result = result
                except:
                    task.result = {"raw_response": response}
                
                break
        
        self.tasks[task.task_id] = task
        print(f"Task {task.task_id[:8]} → {worker_id}")
        print(f"Result: {str(task.result)[:100]}...")
        
        return task.result

In [7]:
coordinator = SimpleAgentCoordinator()
await coordinator.setup_coordinator()

# Add workers
await coordinator.add_worker("data_worker", "data processing")
await coordinator.add_worker("content_worker", "content generation")

print("\nAgent communication system ready (using local LLaMA 3.2:3b)")

Coordinator agent ready (local LLaMA)
Worker data_worker (data processing) ready (local LLaMA)
Worker content_worker (content generation) ready (local LLaMA)

Agent communication system ready (using local LLaMA 3.2:3b)


## Testing

In [8]:
# Agent Communication Test
async def test_agent_communication():
    """Test agent-to-agent communication patterns"""
    
    print("TESTING AGENT COMMUNICATION")
    print("=" * 35)
    
    # Test tasks for different workers
    tasks = [
        SimpleTask(
            task_id=str(uuid.uuid4()),
            task_type="data_analysis", 
            payload={"dataset": "sales_data.csv", "operation": "calculate_trends"}
        ),
        SimpleTask(
            task_id=str(uuid.uuid4()),
            task_type="content_creation",
            payload={"topic": "Agent Communication", "length": "500 words"}
        ),
        SimpleTask(
            task_id=str(uuid.uuid4()),
            task_type="data_processing", 
            payload={"data": [1,2,3,4,5], "operation": "calculate_statistics"}
        )
    ]
    
    print("Processing tasks through agent communication:")
    
    # Distribute tasks
    for task in tasks:
        if "data" in task.task_type:
            await coordinator.send_task_to_worker("data_worker", task)
        else:
            await coordinator.send_task_to_worker("content_worker", task)
        
        await asyncio.sleep(1)  # Brief pause between tasks
    
    print(f"\nProcessed {len(tasks)} tasks via agent communication")
    print(f"Agent Messages: {len(tasks) * 2} (request + response)")
    print(f"Workers Used: {len(coordinator.workers)}")
    
    return tasks

# Run demonstration
completed_tasks = await test_agent_communication()

# NEW SECTION - Show detailed agent responses
print("\nDETAILED AGENT RESPONSES:")
print("=" * 40)

for i, task in enumerate(completed_tasks, 1):
    print(f"\nTask {i}: {task.task_type}")
    print(f"Input Payload: {task.payload}")
    
    if task.result:
        if 'raw_response' in task.result:
            print(f"   Agent Response: {task.result['raw_response']}...")
        else:
            print(f"   Parsed JSON: {task.result}")
            if 'result' in task.result:
                print(f"Task Result: {task.result['result']}")
    else:
        print(f"No response received")

TESTING AGENT COMMUNICATION
Processing tasks through agent communication:
Task fdd42bdd → data_worker
Result: {'raw_response': '```json\n{\n  "task_id": "fdd42bdd-a94e-44ee-96ce-6ef9ba50b8d9",\n  "result": "{\\...
Task 2dd44445 → content_worker
Result: {'raw_response': 'Here is the processed task result in JSON format:\n\n```\n{\n  "task_id": "2dd4444...
Task ade847f3 → data_worker
Result: {'raw_response': '```json\n{\n  "task_id": "ade847f3-56da-4d49-b134-af331e5a459d",\n  "result": "{\\...

Processed 3 tasks via agent communication
Agent Messages: 6 (request + response)
Workers Used: 2

DETAILED AGENT RESPONSES:

📋 Task 1: data_analysis
   Input Payload: {'dataset': 'sales_data.csv', 'operation': 'calculate_trends'}
   Agent Response: ```json
{
  "task_id": "fdd42bdd-a94e-44ee-96ce-6ef9ba50b8d9",
  "result": "{\"trend\": \"increasing\", \"growth_rate\": 0.05}",
  "status": "completed"
}
```

I processed the task and calculated the trends in the sales data. The result indicates that t