# Agents for Good: Disaster Relief Agent

## Project Overview
This notebook implements a multi-agent system designed to coordinate disaster relief efforts by automating resource allocation, victim assistance routing, and real-time information aggregation during natural disasters.

- **Track:** Agents for Good
- **Problem:** During natural disasters, response coordination is fragmented, leading to inefficient resource allocation and delayed assistance to vulnerable populations.
- **Solution:** An AI-powered multi-agent system that orchestrates disaster relief operations through intelligent coordination of information gathering, resource allocation, and victim assistance routing.

---

## Key Concepts Demonstrated
1. **Multi-Agent System** - Sequential and parallel agents for different relief functions
2. **Custom Tools** - Data processing and external API integration
3. **Sessions & Memory** - State management and long-term memory for context continuity
4. **Context Engineering** - Efficient information handling across agent interactions
5. **Observability** - Comprehensive logging and tracing of agent operations
6. **Agent Evaluation** - Impact metrics specific to disaster relief operations


## Table of Contents
1. Import Required Libraries and Setup
2. Define the Problem Statement and Agent Architecture (including 2A: Architecture Overview)
3. Core Schemas & Protocol: A2A messages, memory, tools, agents, demonstration run, and evaluation harness
4. Deployment Instructions (FastAPI + Cloud Run)
5. README / Documentation Summary


---
# Section 1: Import Required Libraries and Setup

In [9]:
# Import Required Libraries and Setup (Graceful Gemini Stub)
import os, sys, json, logging, time, uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import pandas as pd, numpy as np
from collections import defaultdict, deque
import matplotlib.pyplot as plt, seaborn as sns, networkx as nx

# Gemini / Google Generative AI graceful fallback
try:
    import google.generativeai as genai
    if not hasattr(genai, '__version__'):
        genai.__version__ = 'unknown'
except Exception:
    class StubGenAI:
        __version__ = 'stub'
        def configure(self, **kwargs):
            pass
    genai = StubGenAI()

import logging.handlers
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(), logging.FileHandler('disaster_relief_agents.log')]
)
logger = logging.getLogger('bootstrap')
logger.info('Bootstrap imports complete; genai version=%s', getattr(genai,'__version__','n/a'))
print("✓ Core libraries loaded; genai version:", getattr(genai,'__version__','n/a'))

2025-11-24 09:34:27,779 - bootstrap - INFO - Bootstrap imports complete; genai version=stub


✓ Core libraries loaded; genai version: stub


In [1]:
# Dependency Installation (Run Once)
# Note: Avoid installing unnecessary packages repeatedly.
%pip install -q sentence-transformers fastapi uvicorn opentelemetry-api opentelemetry-sdk

import importlib
try:
    import sentence_transformers
    from sentence_transformers import SentenceTransformer
    print("✓ sentence-transformers available")
except Exception as e:
    print("SentenceTransformers unavailable; will fallback to simple embeddings.")
    SentenceTransformer = None

try:
    from opentelemetry import trace, metrics
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace.export import ConsoleSpanExporter
    trace.set_tracer_provider(TracerProvider(resource=Resource.create({"service.name": "disaster-relief-agents"})))
    span_processor = SimpleSpanProcessor(ConsoleSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)
    print("✓ OpenTelemetry tracing ready")
except Exception as e:
    print("OpenTelemetry not fully available; proceeding with custom tracing.")




Note: you may need to restart the kernel to use updated packages.
SentenceTransformers unavailable; will fallback to simple embeddings.
✓ OpenTelemetry tracing ready


In [7]:
# Dependency Fixes (Upgrade typing_extensions / pydantic stack if needed)
%pip install -q --upgrade typing_extensions pydantic google-generativeai
print("✓ Upgraded typing_extensions & pydantic")

Note: you may need to restart the kernel to use updated packages.
✓ Upgraded typing_extensions & pydantic





In [None]:
# Configure Logging and API Credentials Securely
# Console + file handlers (disaster_relief_agents.log) are attached in the bootstrap cell.
# Calling logging.basicConfig() again here would be a silent no-op, because the root
# logger already has handlers, so this cell only creates a module-level logger.

logger = logging.getLogger(__name__)
logger.info("=" * 80)
logger.info("DISASTER RELIEF COORDINATION SYSTEM - Agent Initialization")
logger.info("=" * 80)

# Environment configuration (NOTE: DO NOT INCLUDE ACTUAL API KEYS IN SUBMISSION)
def setup_credentials():
    """
    Setup API credentials securely from environment variables.
    In production, use Secret Manager or similar secure credential storage.
    """
    # For notebook submission, use placeholder
    api_key = os.getenv('GOOGLE_API_KEY', 'YOUR_API_KEY_HERE')
    
    if api_key == 'YOUR_API_KEY_HERE':
        logger.warning("⚠️  GOOGLE_API_KEY not set. Using placeholder for demonstration.")
        logger.info("To run this notebook, set: export GOOGLE_API_KEY='your-actual-api-key'")
        return False
    
    try:
        genai.configure(api_key=api_key)
        logger.info("✓ API credentials configured successfully")
        return True
    except Exception as e:
        logger.error(f"✗ Failed to configure API: {e}")
        return False

credentials_ready = setup_credentials()
logger.info(f"Credentials Status: {credentials_ready}")


---
# Section 2: Define the Problem Statement and Agent Architecture

## Problem Statement

### The Challenge
During natural disasters (earthquakes, floods, hurricanes), relief coordination faces critical bottlenecks:
- **Information Fragmentation**: Data scattered across multiple sources (social media, government agencies, NGOs)
- **Resource Misallocation**: Limited resources deployed inefficiently due to poor situational awareness
- **Response Delays**: Manual coordination processes delay assistance to vulnerable populations
- **Scale and Complexity**: Managing hundreds of simultaneous requests and resource movements

### Why Agents Are the Solution
AI Agents are uniquely suited for this problem because they can:
- **Autonomously coordinate** across multiple data sources in real-time
- **Make decisions** based on prioritization logic (vulnerability factors, geographic proximity, resource availability)
- **Operate continuously** without human intervention, enabling 24/7 response
- **Scale dynamically** from local incidents to major disasters
- **Learn and adapt** from past operations to improve future responses

### Social Impact Value
- **Lives Saved**: Faster response times translate directly to reduced mortality
- **Resource Efficiency**: Optimal allocation reduces waste and extends relief capacity
- **Equity**: Automated prioritization ensures vulnerable populations receive priority
- **Transparency**: Real-time tracking builds trust with affected communities


---
# Section 2A: Architecture Overview

## System Architecture Diagram (Conceptual)
```
+-------------------+        +------------------+        +------------------+
|  Planner Agent    | --->   | Retrieval Agent  | --->   | Execution Agent  |
|  (Task Decomposer)|        | (Data / Memory)  |        | (Tool Orchestr.) |
+---------+---------+        +-------+----------+        +---------+--------+
          |                           |                            |
          v                           v                            v
   +-------------+            +---------------+              +-------------+
   |  Memory     | <-------+  |  Tools Layer  |  +---------> | Evaluation  |
   |  (Session + |          \ | (Search, Code | /            |  Agent      |
   |  Long-Term) |           \|  Exec, MCP)   |/             +-------------+
   +------+------+/           +-------+-------+
          ^                            |
          |                            v
          +-------------------+  +-----------+
                              |  |  Logging  |
                              |  |  Metrics  |
                              |  |  Tracing  |
                              |  +-----------+
```

## Key Components
- **A2A Protocol**: Lightweight message schema enabling structured agent-to-agent communication.
- **Multi-Agent Patterns**: Planner (sequential), Retrieval (parallelizable), Execution (tool dispatch), Evaluation (loop control).
- **Tools Layer**: Abstract interface + concrete tools (SearchTool stub, CodeExecutionTool, ResourceAllocationTool). Support for future MCP/OpenAPI integration.
- **Memory Subsystem**: Short-term session buffer + long-term vector store with context compaction for prompt efficiency.
- **Context Engineering**: Dynamic summarization reduces token footprint while preserving intent & facts.
- **Observability**: Unified tracer + metrics counters (tool_calls, plan_steps, eval_iterations, latency buckets).
- **Evaluation Harness**: Automated scoring of completeness, relevance, efficiency.
- **Deployment Adapter**: FastAPI wrapper for Cloud Run / Agent Engine runtime.
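The A2A Protocol bullet describes a lightweight message schema, but the notebook's bus only passes Python objects within one process. The following is a minimal sketch of how such messages could be serialized for transport between processes. It assumes JSON as the wire format and adds a hypothetical `v` version field. The field names mirror the `Message` dataclass defined in Section 3, while `A2AMessage`, `to_wire`, `from_wire`, and `WIRE_VERSION` are illustrative names, not part of the notebook.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict

WIRE_VERSION = 1  # hypothetical protocol version tag

@dataclass
class A2AMessage:
    # Same fields as the notebook's Message dataclass
    sender: str
    role: str
    content: str
    timestamp: float = field(default_factory=time.time)
    metadata: Dict[str, Any] = field(default_factory=dict)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

def to_wire(msg: A2AMessage) -> str:
    """Serialize a message into a versioned JSON envelope."""
    return json.dumps({"v": WIRE_VERSION, "msg": asdict(msg)}, sort_keys=True)

def from_wire(payload: str) -> A2AMessage:
    """Parse an envelope, rejecting unknown versions or missing required fields."""
    envelope = json.loads(payload)
    if envelope.get("v") != WIRE_VERSION:
        raise ValueError(f"unsupported wire version: {envelope.get('v')!r}")
    body = envelope["msg"]
    missing = {"sender", "role", "content"} - body.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return A2AMessage(**body)

# Usage: a round trip preserves every field, including the float timestamp.
original = A2AMessage(sender="planner", role="planner", content="Allocate water to Shelter B")
restored = from_wire(to_wire(original))
print(restored == original)
```

Versioning the envelope lets agents built at different times reject messages they cannot interpret, instead of silently misreading them.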

## Innovation & Value Alignment
- Rapid, adaptive coordination for high-stakes relief scenarios.
- Extensible design: swap Gemini / other LLMs transparently.
- Privacy & Safety: No embedded secrets; clear credential boundary.
- Scalability Path: Parallel retrieval & streaming evaluation loops.

## Flow Summary
1. User / system submits a disaster task.
2. Planner decomposes into actionable subtasks.
3. Retrieval concurrently gathers situational data & historical memory.
4. Execution dispatches tools (simulated here) to allocate resources.
5. Evaluation scores outcome; loop continues until threshold or max iterations.
6. Metrics & traces recorded; artifacts stored in long-term memory.
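Step 3 says retrieval runs concurrently, whereas the notebook's `RetrievalAgent` calls a single search tool sequentially. The following is a minimal sketch of fan-out retrieval with `concurrent.futures.ThreadPoolExecutor`. It assumes several independent, I/O-bound data sources. The source functions below are hypothetical stand-ins that sleep and return fixed data.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

Source = Callable[[str], List[Dict]]

# Hypothetical I/O-bound sources; real ones would call external APIs or the memory store.
def shelter_registry(query: str) -> List[Dict]:
    time.sleep(0.1)  # simulated network latency
    return [{"title": "Shelter B", "need": "medical", "source": "shelters"}]

def field_reports(query: str) -> List[Dict]:
    time.sleep(0.1)
    return [{"title": "Village C", "need": "food", "source": "field_reports"}]

def memory_lookup(query: str) -> List[Dict]:
    time.sleep(0.1)
    return [{"title": "Relief Center A", "need": "water", "source": "memory"}]

def parallel_retrieve(query: str, sources: List[Source], timeout: float = 2.0) -> List[Dict]:
    """Query every source concurrently and merge results in source order."""
    results: List[Dict] = []
    with ThreadPoolExecutor(max_workers=max(1, len(sources))) as pool:
        futures = [(src.__name__, pool.submit(src, query)) for src in sources]
        for name, future in futures:
            try:
                results.extend(future.result(timeout=timeout))
            except Exception as exc:  # one failing source should not sink the whole step
                print(f"source {name} failed: {exc!r}")
    return results

start = time.perf_counter()
items = parallel_retrieve("flood relief", [shelter_registry, field_reports, memory_lookup])
elapsed = time.perf_counter() - start
print(len(items), f"{elapsed:.2f}s")
```

The three 0.1 s sleeps overlap, so the elapsed time should be close to 0.1 s rather than 0.3 s. Threads are appropriate here because the work is I/O-bound. CPU-heavy scoring would need processes instead.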

---
# Section 3: Core Schemas & Protocol

In [2]:
# Core Schemas & A2A Protocol
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import time, uuid

class AgentRole:
    """Namespace of role-name string constants (never instantiated)."""
    PLANNER = "planner"
    RETRIEVAL = "retrieval"
    EXECUTION = "execution"
    EVALUATION = "evaluation"
    ORCHESTRATOR = "orchestrator"

@dataclass
class Message:
    sender: str
    role: str
    content: str
    timestamp: float = field(default_factory=lambda: time.time())
    metadata: Dict[str, Any] = field(default_factory=dict)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def compact(self) -> Dict[str, Any]:
        return {"s": self.sender, "r": self.role, "c": self.content[:220], "t": round(self.timestamp,2)}

@dataclass
class ToolResult:
    tool_name: str
    success: bool
    output: Any
    latency_ms: float
    error: Optional[str] = None

class A2ABus:
    def __init__(self):
        self.messages: List[Message] = []

    def publish(self, message: Message):
        self.messages.append(message)

    def recent(self, limit: int = 10) -> List[Message]:
        return self.messages[-limit:]

    def summary(self) -> str:
        return "\n".join([f"[{m.role}] {m.sender}: {m.content[:140]}" for m in self.recent(12)])

bus = A2ABus()
print("✓ A2A protocol initialized; message bus ready")

✓ A2A protocol initialized; message bus ready


In [3]:
# Memory Subsystem: Session + Long-Term Vector Memory + Context Compaction
from collections import deque
import math

class SessionMemory:
    def __init__(self, max_messages: int = 50):
        self.buffer = deque(maxlen=max_messages)

    def add(self, msg: Message):
        self.buffer.append(msg)

    def as_list(self):
        return list(self.buffer)

    def compact(self) -> str:
        # Simple heuristic summarizer (fallback if LLM unavailable)
        important = []
        for m in list(self.buffer)[-12:]:
            if any(k in m.content.lower() for k in ["urgent","resource","allocate","priority","need"]):
                important.append(m)
        base = important if important else list(self.buffer)[-6:]
        return " | ".join([f"{m.role}:{m.content[:80]}" for m in base])

class VectorMemory:
    def __init__(self, embed_model: Optional[Any] = None):
        self.embed_model = embed_model
        self.vectors: List[Dict[str, Any]] = []

    def _embed(self, text: str) -> List[float]:
        if self.embed_model:
            return self.embed_model.encode(text).tolist()  # type: ignore
        # Fallback: naive hashing to vector
        return [ (sum(bytearray(text, 'utf-8')) % 997) / 997.0 ]

    def add(self, msg: Message):
        vec = self._embed(msg.content)
        self.vectors.append({"id": msg.id, "vec": vec, "content": msg.content, "role": msg.role})

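    @staticmethod
    def _similarity(a: List[float], b: List[float]) -> float:
        if len(a) == 1 or len(b) == 1:
            # 1-D hash fallback: cosine is uninformative in one dimension, so use inverse distance
            return 1 - abs(a[0] - b[0])
        # Model embeddings: cosine similarity over all dimensions
        dot = sum(x * y for x, y in zip(a, b))
        norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        return dot / norm if norm else 0.0

    def search(self, query: str, top_k: int = 3) -> List[str]:
        qv = self._embed(query)
        scored = [(self._similarity(qv, item["vec"]), item) for item in self.vectors]
        scored.sort(key=lambda x: x[0], reverse=True)
        return [item["content"] for _, item in scored[:top_k]]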

session_memory = SessionMemory()
vector_memory = VectorMemory(embed_model=SentenceTransformer('all-MiniLM-L6-v2') if 'SentenceTransformer' in globals() and SentenceTransformer else None)
print("✓ Memory subsystem initialized (session + vector)")

✓ Memory subsystem initialized (session + vector)
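When SentenceTransformers is unavailable, the fallback `_embed` maps each text to a single number, so unrelated texts can score as near-identical and retrieval degrades to noise. One stdlib-only alternative is the hashing trick, sketched here as an assumption rather than the notebook's method. Each lowercase token is hashed into one of `dim` buckets, and the resulting bag-of-words vectors are compared with cosine similarity. The sketch uses `zlib.crc32` because Python's built-in `hash()` for strings is salted per process, which would make stored vectors incomparable across runs. `HashedVectorStore` is a hypothetical name.

```python
import math
import re
import zlib
from typing import List, Tuple

def hashed_embedding(text: str, dim: int = 256) -> List[float]:
    """Bag-of-words vector via the hashing trick (no model download needed)."""
    vec = [0.0] * dim
    for token in re.findall(r"[a-z0-9]+", text.lower()):
        vec[zlib.crc32(token.encode("utf-8")) % dim] += 1.0
    return vec

def cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

class HashedVectorStore:
    """Hypothetical stand-in for VectorMemory's fallback path."""

    def __init__(self, dim: int = 256):
        self.dim = dim
        self.items: List[Tuple[List[float], str]] = []

    def add(self, text: str) -> None:
        self.items.append((hashed_embedding(text, self.dim), text))

    def search(self, query: str, top_k: int = 3) -> List[str]:
        q = hashed_embedding(query, self.dim)
        ranked = sorted(self.items, key=lambda item: cosine(q, item[0]), reverse=True)
        return [text for _, text in ranked[:top_k]]

store = HashedVectorStore()
store.add("Shelter B urgently needs medical supplies")
store.add("Village C requests food rations")
store.add("Relief Center A is short of drinking water")
print(store.search("medical supplies needed", top_k=1))
```

This matches shared words only, not synonyms ("needs" and "needed" land in different buckets), so it is a stopgap until a real embedding model is available.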


In [4]:
# Tools Layer: Abstract + Concrete Implementations + MCP Placeholder
import random
from concurrent.futures import ThreadPoolExecutor

class BaseTool:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def run(self, **kwargs) -> Any:
        raise NotImplementedError

class SearchTool(BaseTool):
    def run(self, query: str) -> Dict[str, Any]:
        # Placeholder external search stub
        results = [
            {"title": "Relief Center A", "need": "water", "severity": random.randint(1,10)},
            {"title": "Shelter B", "need": "medical", "severity": random.randint(1,10)},
            {"title": "Village C", "need": "food", "severity": random.randint(1,10)}
        ]
        return {"query": query, "results": results}

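import contextlib, io

class CodeExecutionTool(BaseTool):
    """Runs Python via exec() inside this kernel. NOT a sandbox: the code has full access
    to the interpreter, filesystem and network, so only pass trusted input."""
    def run(self, code: str) -> Dict[str, Any]:
        start = time.time()
        namespace: Dict[str, Any] = {}  # one dict as globals+locals so defined functions see top-level names
        buf = io.StringIO()
        try:
            with contextlib.redirect_stdout(buf):  # capture real printed output
                exec(code, namespace)
            variables = {k: v for k, v in namespace.items() if k != "__builtins__"}
            return {"stdout": buf.getvalue(), "variables": variables, "latency_ms": (time.time()-start)*1000}
        except Exception as e:
            return {"error": str(e), "stdout": buf.getvalue(), "latency_ms": (time.time()-start)*1000}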

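class ResourceAllocationTool(BaseTool):
    def run(self, demands: List[Dict[str, Any]], supply: Dict[str, int]) -> Dict[str, Any]:
        # Greedy: one unit per demand, in arrival order (the 'severity' field is not used).
        # `supply` is decremented in place and returned as "remaining", so stock depletes
        # across orchestrator loops and the caller's dict is modified.
        allocation = []
        for d in demands:
            item = d['need']
            if supply.get(item, 0) > 0:
                supply[item] -= 1
                allocation.append({"location": d.get("title"), "allocated": item})
        return {"allocation": allocation, "remaining": supply}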

# MCP Adapter Placeholder
class MCPAdapterTool(BaseTool):
    def run(self, endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        # Simulated request; in production use requests or official SDK
        return {"endpoint": endpoint, "status": "simulated", "echo": payload}

class ToolRegistry:
    def __init__(self):
        self.tools: Dict[str, BaseTool] = {}

    def register(self, tool: BaseTool):
        self.tools[tool.name] = tool

    def get(self, name: str) -> BaseTool:
        return self.tools[name]

registry = ToolRegistry()
registry.register(SearchTool("search", "Simulated external search"))
registry.register(CodeExecutionTool("code_exec", "Execute Python code in-process (not sandboxed)"))
registry.register(ResourceAllocationTool("allocator", "Allocate relief resources"))
registry.register(MCPAdapterTool("mcp_adapter", "Placeholder for MCP integration"))
print("✓ Tools registered:", list(registry.tools.keys()))

✓ Tools registered: ['search', 'code_exec', 'allocator', 'mcp_adapter']
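The `ToolResult` dataclass above records latency, a success flag, and an error, but the agents call `tool.run` directly and never build one. The following is a minimal sketch of a wrapper that times every tool call and converts exceptions into a failed `ToolResult`, so a single failing tool cannot crash the orchestration loop. `invoke_tool` and the toy tools are hypothetical names. The dataclass is re-declared here so the block runs on its own.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

@dataclass
class ToolResult:  # same fields as the notebook's ToolResult
    tool_name: str
    success: bool
    output: Any
    latency_ms: float
    error: Optional[str] = None

def invoke_tool(name: str, fn: Callable[..., Any], **kwargs: Any) -> ToolResult:
    """Run a tool callable, recording wall-clock latency and capturing errors."""
    start = time.perf_counter()
    try:
        output = fn(**kwargs)
        return ToolResult(name, True, output, (time.perf_counter() - start) * 1000)
    except Exception as exc:
        return ToolResult(name, False, None, (time.perf_counter() - start) * 1000,
                          error=f"{type(exc).__name__}: {exc}")

# Toy tools standing in for registry.get(...).run
def count_sites(sites: list) -> int:
    return len(sites)

def broken_feed(url: str) -> Dict[str, Any]:
    raise ConnectionError(f"cannot reach {url}")

ok = invoke_tool("count_sites", count_sites, sites=["A", "B", "C"])
bad = invoke_tool("broken_feed", broken_feed, url="http://example.invalid")
print(ok.success, ok.output)    # True 3
print(bad.success, bad.error)   # False ConnectionError: cannot reach http://example.invalid
```

With the notebook's registry, a call would look like `invoke_tool("search", registry.get("search").run, query="flood")`. The returned `latency_ms` values could then feed the observability counters.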


In [10]:
# Agents Definitions: Planner, Retrieval, Execution, Evaluation, Orchestrator
class BaseAgent:
    def __init__(self, name: str, role: str, bus: A2ABus):
        self.name = name
        self.role = role
        self.bus = bus
        self.logger = logging.getLogger(name)
        self.tool_calls = 0

    def send(self, content: str, metadata: Dict[str, Any] = None):
        m = Message(sender=self.name, role=self.role, content=content, metadata=metadata or {})
        self.bus.publish(m)
        session_memory.add(m)
        vector_memory.add(m)
        return m

    def act(self, **kwargs):
        raise NotImplementedError

class PlannerAgent(BaseAgent):
    def act(self, goal: str) -> List[str]:
        steps = [
            "Clarify disaster context", "Retrieve situational data", "Prioritize locations", "Allocate resources", "Evaluate impact"
        ]
        self.send(f"Plan for goal: {goal} -> {steps}")
        return steps

class RetrievalAgent(BaseAgent):
    def act(self, query: str) -> Dict[str, Any]:
        tool = registry.get("search")
        start = time.time()
        data = tool.run(query=query)
        self.tool_calls += 1
        self.send(f"Retrieved {len(data['results'])} items for query '{query}'")
        return data

class ExecutionAgent(BaseAgent):
    def act(self, situational: Dict[str, Any], supply: Dict[str, int]) -> Dict[str, Any]:
        demands = situational.get("results", [])
        allocator = registry.get("allocator")
        allocation = allocator.run(demands=demands, supply=supply)
        self.tool_calls += 1
        self.send(f"Allocation done: {allocation['allocation']}")
        return allocation

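class EvaluationAgent(BaseAgent):
    def act(self, allocation: Dict[str, Any]) -> Dict[str, Any]:
        allocated = len(allocation.get("allocation", []))
        resource_types = len(allocation.get("remaining", {}))
        # score = allocations this round / (allocations + number of resource *types*).
        # The denominator counts supply categories, not unmet units: with three demand
        # sites and three resource types the score tops out at 3 / 6 = 0.5, so any
        # threshold above 0.5 is never met and the orchestrator runs all max_loops.
        score = allocated / max(1, allocated + resource_types)
        supply_left = {k: v for k, v in allocation.get("remaining", {}).items() if v > 0}
        eval_result = {"effectiveness_score": round(score, 2), "remaining_supply": supply_left}
        self.send(f"Evaluation score={eval_result['effectiveness_score']}; remaining={supply_left}")
        return eval_result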

class Orchestrator(BaseAgent):
    def __init__(self, name: str, bus: A2ABus):
        super().__init__(name, AgentRole.ORCHESTRATOR, bus)
        self.planner = PlannerAgent("planner", AgentRole.PLANNER, bus)
        self.retrieval = RetrievalAgent("retrieval", AgentRole.RETRIEVAL, bus)
        self.execution = ExecutionAgent("execution", AgentRole.EXECUTION, bus)
        self.evaluation = EvaluationAgent("evaluation", AgentRole.EVALUATION, bus)

    def act(self, goal: str, supply: Dict[str,int], max_loops: int = 2, threshold: float = 0.75):
        steps = self.planner.act(goal)
        loop = 0
        final_report = {}
        while loop < max_loops:
            loop += 1
            self.send(f"Loop {loop} starting; context compact: {session_memory.compact()}")
            situational = self.retrieval.act(query=goal)
            allocation = self.execution.act(situational=situational, supply=supply)
            eval_result = self.evaluation.act(allocation=allocation)
            final_report = {"allocation": allocation, "evaluation": eval_result, "loop": loop}
            if eval_result["effectiveness_score"] >= threshold:
                self.send(f"Threshold met; stopping at loop {loop}")
                break
        self.send("Orchestration complete")
        return final_report

orchestrator = Orchestrator("orchestrator", bus)
print("✓ Agents initialized; orchestrator ready")

✓ Agents initialized; orchestrator ready
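The planner lists a "Prioritize locations" step, but `ExecutionAgent` hands demands to `ResourceAllocationTool` in retrieval order, and the `severity` field returned by `SearchTool` is never read. The following is a minimal sketch of severity-first allocation. It assumes a higher severity means a more urgent need and that each demand may request several units. The `units` key and `prioritized_allocate` are hypothetical; the notebook allocates one unit per demand. Unlike the notebook's allocator, this version copies the supply dict instead of mutating the caller's.

```python
from typing import Any, Dict, List

def prioritized_allocate(demands: List[Dict[str, Any]], supply: Dict[str, int]) -> Dict[str, Any]:
    """Serve the most severe demands first; ties keep their original order."""
    remaining = dict(supply)  # copy so the caller's supply is not changed
    allocation = []
    # sorted(..., reverse=True) is stable, so equal severities stay in arrival order
    for d in sorted(demands, key=lambda d: d.get("severity", 0), reverse=True):
        item, wanted = d["need"], d.get("units", 1)
        given = min(wanted, remaining.get(item, 0))
        if given > 0:
            remaining[item] -= given
            allocation.append({"location": d["title"], "allocated": item, "units": given})
    return {"allocation": allocation, "remaining": remaining}

demands = [
    {"title": "Village C", "need": "medical", "severity": 4, "units": 2},
    {"title": "Shelter B", "need": "medical", "severity": 9, "units": 2},
    {"title": "Relief Center A", "need": "water", "severity": 6},
]
result = prioritized_allocate(demands, {"water": 3, "medical": 3, "food": 0})
print(result["allocation"])
```

Here Shelter B (severity 9) receives both medical units it asked for before Village C (severity 4) gets the single unit left. Vulnerability indices or travel distance could be folded into the sort key in the same way.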


In [11]:
# Demonstration Run
initial_supply = {"water": 3, "medical": 2, "food": 4}
report = orchestrator.act(goal="Assess flood situation and allocate resources", supply=initial_supply, max_loops=3, threshold=0.6)
print("Final Report:\n", json.dumps(report, indent=2))
print("Conversation Summary:\n", bus.summary())

Final Report:
 {
  "allocation": {
    "allocation": [
      {
        "location": "Relief Center A",
        "allocated": "water"
      },
      {
        "location": "Village C",
        "allocated": "food"
      }
    ],
    "remaining": {
      "water": 0,
      "medical": 0,
      "food": 1
    }
  },
  "evaluation": {
    "effectiveness_score": 0.4,
    "remaining_supply": {
      "food": 1
    }
  },
  "loop": 3
}
Conversation Summary:
 [retrieval] retrieval: Retrieved 3 items for query 'Assess flood situation and allocate resources'
[execution] execution: Allocation done: [{'location': 'Relief Center A', 'allocated': 'water'}, {'location': 'Shelter B', 'allocated': 'medical'}, {'location': 'Vi
[evaluation] evaluation: Evaluation score=0.5; remaining={'water': 2, 'medical': 1, 'food': 3}
[orchestrator] orchestrator: Loop 2 starting; context compact: planner:Plan for goal: Assess flood situation and allocate resources -> ['Clarify disast | orchestrator:Lo
[retrieval] retrieval: R

In [12]:
# Evaluation Harness: Simple Metrics & Scoring
import statistics

def evaluate_tasks(tasks: List[str]):
    scores = []
    for t in tasks:
        supply = {"water": 2, "medical": 1, "food": 3}
        rpt = orchestrator.act(goal=t, supply=supply, max_loops=2, threshold=0.7)
        scores.append(rpt['evaluation']['effectiveness_score'])
    return {
        "tasks": tasks,
        "scores": scores,
        "avg_score": round(statistics.mean(scores),2),
        "min": min(scores),
        "max": max(scores)
    }

test_tasks = [
    "Flood relief allocation in coastal zone", 
    "Earthquake response prioritization for medical aid",
    "Food distribution planning after storm"
]
metrics_report = evaluate_tasks(test_tasks)
print("Evaluation Metrics:\n", json.dumps(metrics_report, indent=2))

Evaluation Metrics:
 {
  "tasks": [
    "Flood relief allocation in coastal zone",
    "Earthquake response prioritization for medical aid",
    "Food distribution planning after storm"
  ],
  "scores": [
    0.4,
    0.4,
    0.4
  ],
  "avg_score": 0.4,
  "min": 0.4,
  "max": 0.4
}
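The reported `effectiveness_score` divides by the number of resource types left in `remaining`, so it reflects how many supply categories exist rather than how much requested need was met. The following sketches an alternative demand-coverage score. It is an assumption, not the notebook's metric: the share of requested units actually delivered, optionally weighted by severity. The severity values below are illustrative only, since `SearchTool` draws them at random.

```python
from typing import Any, Dict, List, Tuple

def coverage_score(demands: List[Dict[str, Any]], allocation: List[Dict[str, Any]],
                   weighted: bool = True) -> float:
    """Fraction of requested units delivered; optionally weighted by severity.

    Assumes each demand has 'title', 'need', optional 'units' (default 1) and
    optional 'severity' (default 1); each allocation entry has 'location',
    'allocated' and optional 'units' (default 1).
    """
    delivered: Dict[Tuple[str, str], int] = {}
    for a in allocation:
        key = (a["location"], a["allocated"])
        delivered[key] = delivered.get(key, 0) + a.get("units", 1)
    total = met = 0.0
    for d in demands:
        weight = d.get("severity", 1) if weighted else 1
        requested = d.get("units", 1)
        got = min(requested, delivered.get((d["title"], d["need"]), 0))
        total += weight * requested
        met += weight * got
    return round(met / total, 2) if total else 1.0

demands = [  # illustrative severities
    {"title": "Relief Center A", "need": "water", "severity": 8},
    {"title": "Shelter B", "need": "medical", "severity": 9},
    {"title": "Village C", "need": "food", "severity": 3},
]
# Same allocation as the demo's final loop: medical had run out, so Shelter B got nothing.
final_allocation = [
    {"location": "Relief Center A", "allocated": "water"},
    {"location": "Village C", "allocated": "food"},
]
print(coverage_score(demands, final_allocation))                  # severity-weighted
print(coverage_score(demands, final_allocation, weighted=False))  # unweighted
```

The weighted score (11 of 20 severity-weighted units, 0.55) penalizes the missed high-severity medical request more heavily than the unweighted share (2 of 3, 0.67), which is the behavior a relief-prioritization metric usually wants.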


---
# Section 4: Deployment Instructions (FastAPI + Cloud Run)

## FastAPI Wrapper (Concept Code)
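```python
from fastapi import FastAPI
from pydantic import BaseModel

# Assumes the notebook's objects are exported to a module next to app.py;
# `disaster_agents` is a hypothetical module name.
from disaster_agents import orchestrator

app = FastAPI()

class OrchestrateRequest(BaseModel):
    goal: str

@app.post('/orchestrate')
def orchestrate_endpoint(req: OrchestrateRequest):
    # Plain `def`: FastAPI runs sync handlers in a threadpool, so the blocking
    # orchestrator call does not stall the event loop.
    supply = {"water": 5, "medical": 3, "food": 6}  # fresh per request; the allocator mutates it
    return orchestrator.act(goal=req.goal, supply=supply, max_loops=3, threshold=0.7)
```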

## Containerization Steps
```bash
# 1. requirements.txt (packages this notebook uses)
cat > requirements.txt <<'EOF'
fastapi
uvicorn
sentence-transformers
opentelemetry-api
opentelemetry-sdk
google-generativeai
EOF

# 2. Dockerfile (app.py holds the FastAPI wrapper above)
cat > Dockerfile <<'EOF'
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
EOF

# 3. Build & Deploy (Cloud Run)
docker build -t gcr.io/PROJECT_ID/disaster-agents:latest .
docker push gcr.io/PROJECT_ID/disaster-agents:latest
gcloud run deploy disaster-agents \
  --image gcr.io/PROJECT_ID/disaster-agents:latest \
  --platform managed --region REGION --allow-unauthenticated
```

## Observability Export
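For production, replace the `ConsoleSpanExporter` configured in the setup cell with an OpenTelemetry exporter such as OTLP, so traces and metrics reach a collector instead of stdout.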
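The counters named in the architecture (tool calls, evaluation iterations, latency buckets) can also be collected with the standard library alone. This can serve as the "custom tracing" path that the setup cell falls back to when OpenTelemetry is unavailable. The following is a minimal sketch. `Telemetry`, the bucket edges in milliseconds, and the span and counter names are all hypothetical.

```python
import bisect
import time
from collections import Counter
from contextlib import contextmanager
from typing import Dict, Iterator, List, Sequence

class Telemetry:
    """In-process counters plus a latency histogram with fixed bucket edges (ms)."""

    def __init__(self, bucket_edges_ms: Sequence[float] = (10, 50, 100, 500)):
        self.edges = sorted(bucket_edges_ms)
        self.counters: Counter = Counter()
        # one "<= edge" bucket per edge, plus an overflow bucket
        self.latency_buckets: Dict[str, int] = {f"<={e}ms": 0 for e in self.edges}
        self.latency_buckets["overflow"] = 0
        self.spans: List[Dict] = []

    def incr(self, name: str, by: int = 1) -> None:
        self.counters[name] += by

    def record_latency(self, ms: float) -> None:
        i = bisect.bisect_left(self.edges, ms)  # first edge >= ms
        key = f"<={self.edges[i]}ms" if i < len(self.edges) else "overflow"
        self.latency_buckets[key] += 1

    @contextmanager
    def span(self, name: str) -> Iterator[None]:
        """Time a block and record it, even if the block raises."""
        start = time.perf_counter()
        try:
            yield
        finally:
            ms = (time.perf_counter() - start) * 1000
            self.spans.append({"name": name, "ms": round(ms, 3)})
            self.record_latency(ms)

telemetry = Telemetry()
for _ in range(2):
    with telemetry.span("tool:search"):
        telemetry.incr("tool_calls")
telemetry.incr("eval_iterations")
telemetry.record_latency(75)  # falls in the <=100ms bucket
print(dict(telemetry.counters), telemetry.latency_buckets)
```

Keeping the counters in one object makes it straightforward to swap in an OpenTelemetry meter later, since each `incr` and `record_latency` call maps onto a counter or histogram instrument.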

---
# Section 5: README / Documentation Summary

## Problem
Fragmented disaster relief coordination leads to slower response, inefficient resource allocation, and inequitable outcomes.

## Solution
Multi-agent orchestrated system handling planning, data retrieval, allocation, and evaluation with memory, tools, observability, and iterative improvement loop.

## Features Implemented
- Multi-agent (planner, retrieval, execution, evaluation, orchestrator)
- Tooling layer with custom tools + MCP adapter placeholder
- Session + long-term vector memory, context compaction
- Evaluation harness for effectiveness scoring
- Observability via logging + (optional) OpenTelemetry stubs
- Deployment-ready FastAPI pattern

## Safety & Keys
No API keys stored. Set `GOOGLE_API_KEY` before enabling Gemini.

## Next Extensions
- Real API integration (social media, geospatial feeds)
- Advanced prioritization with vulnerability indices
- Live streaming updates & adaptive re-planning
- Enhanced evaluation with ground-truth simulation

---
End of notebook.