# AI Disaster Guardian — Full Advanced Project Builder

This notebook will create a complete, advanced `project/` folder with Planner, Workers (verification, safety, routing, communication), Evaluator, tools (RAG stub, KB), memory, core utilities, infra CSVs, a Gradio app, and a demo runner. Upload this notebook to Google Colab and run **Runtime → Run all**.

**Steps:** Upload this notebook to Colab → Runtime → Run all.

In [None]:
# 1) Create project directories
import os
dirs = [
    "project",
    "project/agents",
    "project/agents/workers",
    "project/tools",
    "project/memory",
    "project/core",
    "project/infra",
    "project/ui",
    "project/tests",
    "project/docs"
]
for d in dirs:
    os.makedirs(d, exist_ok=True)
print("Created directories")


In [None]:
print('Writing project/agents/planner.py')
with open(r'project/agents/planner.py', 'w', encoding='utf-8') as f:
    f.write('''"""Planner agent: builds task plans and orchestrates workers."""
import uuid
from typing import Dict, Any
from project.core.a2a_protocol import MessageEnvelope

class Planner:
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}

    def build_plan(self, input_text: str, session_snapshot: Dict[str, Any]) -> Dict[str, Any]:
        plan_id = str(uuid.uuid4())
        lower = input_text.lower()
        tasks = []
        if any(k in lower for k in ["shelter", "closed", "fake", "rumor", "reported", "sighting"]):
            tasks.append({"worker": "verification", "payload": {"text": input_text}})
        if any(k in lower for k in ["help", "evacuate", "trapped", "urgent", "emergency", "injured"]):
            tasks.append({"worker": "routing", "payload": {"text": input_text}})
            tasks.append({"worker": "safety", "payload": {"text": input_text}})
        if not tasks:
            tasks.append({"worker": "safety", "payload": {"text": input_text}})
        tasks.append({"worker": "communication", "payload": {"text": input_text}})
        return {"plan_id": plan_id, "tasks": tasks, "session_snapshot": session_snapshot}

    def create_task_message(self, plan: Dict[str, Any], task: Dict[str, Any]) -> MessageEnvelope:
        payload = {
            "plan_id": plan["plan_id"],
            "task": task,
            "session_snapshot": plan.get("session_snapshot", {})
        }
        return MessageEnvelope(msg_id=str(uuid.uuid4()), frm="planner", to=task["worker"], type="TASK_REQUEST", payload=payload)
''')
print('Wrote project/agents/planner.py')

In [None]:
print('Writing project/agents/worker.py')
with open(r'project/agents/worker.py', 'w', encoding='utf-8') as f:
    f.write('''"""Worker dispatcher calling specific worker implementations in workers/"""
from typing import Dict, Any
from project.agents.workers.verification_worker import VerificationWorker
from project.agents.workers.safety_worker import SafetyWorker
from project.agents.workers.routing_worker import RoutingWorker
from project.agents.workers.communication_worker import CommunicationWorker

class Worker:
    def __init__(self):
        self.verification = VerificationWorker()
        self.safety = SafetyWorker()
        self.routing = RoutingWorker()
        self.communication = CommunicationWorker()

    def run(self, worker_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        if worker_name == "verification":
            return self.verification.run(payload)
        if worker_name == "safety":
            return self.safety.run(payload)
        if worker_name == "routing":
            return self.routing.run(payload)
        if worker_name == "communication":
            return self.communication.run(payload)
        return {"result": None, "confidence": 0.0, "provenance": []}
''')
print('Wrote project/agents/worker.py')

In [None]:
print('Writing project/agents/workers/verification_worker.py')
with open(r'project/agents/workers/verification_worker.py', 'w', encoding='utf-8') as f:
    f.write('''"""Verification worker: RAG lookup + simple stance detection."""
from project.tools.tools import RAGTool
from typing import Dict, Any

class VerificationWorker:
    def __init__(self):
        self.rag = RAGTool()

    def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        text = payload.get('text','')
        hits = self.rag.search(text, topk=5)
        verdict = 'insufficient'
        confidence = 0.5
        if hits:
            confidence = 0.8
            if any('not' in h['text'].lower() or 'closed' in h['text'].lower() for h in hits):
                verdict = 'refute'
            else:
                verdict = 'support'
        return {'result': verdict, 'confidence': confidence, 'provenance': hits}
''')
print('Wrote project/agents/workers/verification_worker.py')

In [None]:
print('Writing project/agents/workers/safety_worker.py')
with open(r'project/agents/workers/safety_worker.py', 'w', encoding='utf-8') as f:
    f.write('''"""Safety worker: provides templated safety instructions using ContextEngineering."""
from project.core.context_engineering import ContextEngineering
from typing import Dict, Any

class SafetyWorker:
    def __init__(self):
        pass

    def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        text = payload.get('text','').lower()
        if 'flood' in text or 'water' in text:
            disaster = 'flood'
        elif 'earthquake' in text or 'quake' in text:
            disaster = 'earthquake'
        elif 'fire' in text or 'smoke' in text:
            disaster = 'fire'
        else:
            disaster = 'general'
        tpl = ContextEngineering.build_safety_template(disaster)
        return {'result': {'summary': 'Follow these steps', 'steps': tpl['steps']}, 'confidence': 0.95, 'provenance': []}
''')
print('Wrote project/agents/workers/safety_worker.py')

In [None]:
print('Writing project/agents/workers/routing_worker.py')
with open(r'project/agents/workers/routing_worker.py', 'w', encoding='utf-8') as f:
    f.write('''"""Routing worker: finds nearest shelters using KBTool."""
from project.tools.tools import KBTool
from typing import Dict, Any

class RoutingWorker:
    def __init__(self):
        self.kb = KBTool()

    def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        text = payload.get('text','')
        shelter = self.kb.find_nearest_shelter(text)
        if shelter:
            return {'result': shelter, 'confidence': 0.88, 'provenance': [shelter.get('id')]}
        return {'result': None, 'confidence': 0.2, 'provenance': []}
''')
print('Wrote project/agents/workers/routing_worker.py')

In [None]:
print('Writing project/agents/workers/communication_worker.py')
with open(r'project/agents/workers/communication_worker.py', 'w', encoding='utf-8') as f:
    f.write('''"""Communication worker: formats messages for channels and localizes."""
from typing import Dict, Any

class CommunicationWorker:
    def __init__(self):
        pass

    def run(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        text = payload.get('text','')
        message = f"User input: {text}\n(Provide concise instructions)\n"
        return {'result': {'message': message}, 'confidence': 0.9, 'provenance': []}
''')
print('Wrote project/agents/workers/communication_worker.py')

In [None]:
print('Writing project/agents/evaluator.py')
with open(r'project/agents/evaluator.py', 'w', encoding='utf-8') as f:
    f.write('''"""Evaluator: applies rule engine and decides approve/reject/escalate."""
from typing import Dict, Any
from project.core.rule_engine import RuleEngine

class Evaluator:
    def __init__(self):
        self.rule_engine = RuleEngine()

    def evaluate(self, worker_outputs: Dict[str, Any]) -> Dict[str, Any]:
        triggered = self.rule_engine.check(worker_outputs)
        decision = 'approve'
        final_confidence = 0.9
        if triggered:
            decision = 'escalate'
            final_confidence = 0.4
        return {'decision': decision, 'final_confidence': final_confidence, 'triggered_rules': triggered}
''')
print('Wrote project/agents/evaluator.py')

In [None]:
print('Writing project/tools/tools.py')
with open(r'project/tools/tools.py', 'w', encoding='utf-8') as f:
    f.write('''"""Tools: RAGTool (simple), KBTool for shelters, Summarizer stub."""
from typing import List, Dict, Any
import os

class RAGTool:
    def __init__(self, kb_path: str = 'project/infra/verified_facts_kb.csv'):
        self.kb_path = kb_path
        self.docs = []
        if os.path.exists(self.kb_path):
            with open(self.kb_path, 'r', encoding='utf-8') as f:
                for i, line in enumerate(f):
                    self.docs.append({'id': i, 'text': line.strip()})

    def search(self, query: str, topk: int = 3) -> List[Dict[str, Any]]:
        hits = []
        q = query.lower()
        for d in self.docs:
            if q in d['text'].lower() or any(w in q for w in d['text'].lower().split()[:3]):
                hits.append(d)
                if len(hits) >= topk:
                    break
        return hits

class KBTool:
    def __init__(self, shelters_path: str = 'project/infra/shelters.csv'):
        self.shelters_path = shelters_path
        self.shelters = []
        if os.path.exists(self.shelters_path):
            with open(self.shelters_path, 'r', encoding='utf-8') as f:
                for i, line in enumerate(f):
                    parts = [p.strip() for p in line.split(',')]
                    if parts:
                        self.shelters.append({'id': i, 'name': parts[0], 'city': parts[1] if len(parts)>1 else '', 'meta': parts[2:]})

    def find_nearest_shelter(self, query: str):
        return self.shelters[0] if self.shelters else None

class SummarizerTool:
    def summarize(self, text: str, max_sentences: int = 2) -> str:
        sentences = text.split('.')
        return '.'.join(sentences[:max_sentences]).strip()
''')
print('Wrote project/tools/tools.py')

In [None]:
print('Writing project/core/context_engineering.py')
with open(r'project/core/context_engineering.py', 'w', encoding='utf-8') as f:
    f.write('''"""Context engineering utilities and templates."""
from typing import Dict, Any

class ContextEngineering:
    @staticmethod
    def build_safety_template(disaster_type: str, language: str = 'en') -> Dict[str, Any]:
        templates = {
            'flood': [
                'If indoors: move to upper floor and stay away from electrical outlets.',
                'If outdoors: move to higher ground and avoid walking through floodwater.',
                'Do not drive through flooded roads.'
            ],
            'earthquake': [
                'Drop, Cover, and Hold on. Stay away from windows and heavy objects.',
                'If outdoors, move to an open area away from buildings.',
                'Check yourself and others for injuries.'
            ],
            'fire': [
                'Leave the building immediately and use stairs, not elevators.',
                'Cover your mouth with cloth if smoke is present.',
                'Call emergency services from a safe location.'
            ],
            'general': [
                'Stay calm and assess immediate danger.',
                'Call local emergency services if needed.',
                'Follow instructions from local authorities.'
            ]
        }
        return {'disaster_type': disaster_type, 'steps': templates.get(disaster_type, templates['general'])}
''')
print('Wrote project/core/context_engineering.py')

In [None]:
print('Writing project/core/observability.py')
with open(r'project/core/observability.py', 'w', encoding='utf-8') as f:
    f.write('''"""Logging helper for events."""
import time, json
def log_event(event_type: str, payload: dict):
    entry = {'ts': time.time(), 'event': event_type, 'payload': payload}
    print(json.dumps(entry))
''')
print('Wrote project/core/observability.py')

In [None]:
print('Writing project/core/a2a_protocol.py')
with open(r'project/core/a2a_protocol.py', 'w', encoding='utf-8') as f:
    f.write('''from dataclasses import dataclass, asdict
import time
from typing import Any, Dict

@dataclass
class MessageEnvelope:
    msg_id: str
    frm: str
    to: str
    type: str
    payload: Dict[str, Any]
    timestamp: float = time.time()
    def to_dict(self):
        return asdict(self)
''')
print('Wrote project/core/a2a_protocol.py')

In [None]:
print('Writing project/core/rule_engine.py')
with open(r'project/core/rule_engine.py', 'w', encoding='utf-8') as f:
    f.write('''"""Rule engine for safety enforcement."""
from typing import Dict, Any, List

class RuleEngine:
    def __init__(self):
        self.deny_list = ['drink seawater', 'use gasoline', 'do surgery']

    def check(self, worker_outputs: Dict[str, Any]) -> List[str]:
        triggered = []
        for wname, out in worker_outputs.items():
            txts = []
            res = out.get('result')
            if isinstance(res, dict):
                for v in res.values():
                    if isinstance(v, str):
                        txts.append(v.lower())
                    if isinstance(v, list):
                        txts.extend([s.lower() for s in v if isinstance(s, str)])
            elif isinstance(res, str):
                txts.append(res.lower())
            for t in txts:
                for rule in self.deny_list:
                    if rule in t:
                        triggered.append(rule)
        return list(set(triggered))
''')
print('Wrote project/core/rule_engine.py')

In [None]:
print('Writing project/memory/session_memory.py')
with open(r'project/memory/session_memory.py', 'w', encoding='utf-8') as f:
    f.write('''"""In-memory session store."""
from typing import Dict, Any
import uuid

class SessionMemory:
    def __init__(self):
        self.store: Dict[str, Dict[str, Any]] = {}

    def create_session(self) -> str:
        sid = str(uuid.uuid4())
        self.store[sid] = {'turns': []}
        return sid

    def get(self, session_id: str) -> Dict[str, Any]:
        return self.store.get(session_id, {})

    def append_turn(self, session_id: str, user_text: str, bot_text: str):
        if session_id not in self.store:
            self.store[sid] = {'turns': []}
            # fallback if missing
            self.store[session_id] = {'turns': []}
        self.store[session_id]['turns'].append({'user': user_text, 'bot': bot_text})
''')
print('Wrote project/memory/session_memory.py')

In [None]:
print('Writing project/main_agent.py')
with open(r'project/main_agent.py', 'w', encoding='utf-8') as f:
    f.write('''"""MainAgent: orchestrates planner, worker, and evaluator."""
from project.agents.planner import Planner
from project.agents.worker import Worker
from project.agents.evaluator import Evaluator
from project.memory.session_memory import SessionMemory
from project.core.observability import log_event

# (rest of file omitted in preview)
''')
print('Wrote project/main_agent.py')

In [None]:
print('Writing project/app.py')
with open(r'project/app.py', 'w', encoding='utf-8') as f:
    f.write('''"""Gradio demo app (simple)."""
from project.main_agent import run_agent
import gradio as gr

def respond(text):
    return run_agent(text)

def launch_demo():
    demo = gr.Interface(fn=respond, inputs=gr.Textbox(lines=3, placeholder='Describe the situation...'), outputs=gr.Textbox())
    demo.launch()

if __name__ == '__main__':
    launch_demo()
''')
print('Wrote project/app.py')

In [None]:
print('Writing project/run_demo.py')
with open(r'project/run_demo.py', 'w', encoding='utf-8') as f:
    f.write('''import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from project.main_agent import run_agent
if __name__ == '__main__':
    print(run_agent('Hello! This is a demo.'))
''')
print('Wrote project/run_demo.py')

In [None]:
print('Writing project/requirements.txt')
with open(r'project/requirements.txt', 'w', encoding='utf-8') as f:
    f.write('''transformers
sentence-transformers
faiss-cpu
datasets
gradio
fastapi
uvicorn
pydantic
pytest
python-dotenv
langdetect
torch
sentencepiece
''')
print('Wrote project/requirements.txt')

In [None]:
print('Writing project/infra/shelters.csv')
with open(r'project/infra/shelters.csv', 'w', encoding='utf-8') as f:
    f.write('''Community Shelter A,CityCenter,Capacity:200,Contact:000-000-0000,Near:Central Park
Community Shelter B,NorthDistrict,Capacity:150,Contact:111-111-1111,Near:River Bridge
''')
print('Wrote project/infra/shelters.csv')

In [None]:
print('Writing project/infra/verified_facts_kb.csv')
with open(r'project/infra/verified_facts_kb.csv', 'w', encoding='utf-8') as f:
    f.write('''Shelter A is open and operating under local authority guidelines.
Do not drink seawater; it is dangerous and can cause dehydration.
Evacuate to higher ground during floods; avoid walking through flood water.
''')
print('Wrote project/infra/verified_facts_kb.csv')

In [None]:
print('Writing project/__init__.py')
with open(r'project/__init__.py', 'w', encoding='utf-8') as f:
    f.write('''# project package''')
print('Wrote project/__init__.py')

In [None]:
print('Writing project/agents/__init__.py')
with open(r'project/agents/__init__.py', 'w', encoding='utf-8') as f:
    f.write('''# agents package''')
print('Wrote project/agents/__init__.py')

In [None]:
print('Writing project/tools/__init__.py')
with open(r'project/tools/__init__.py', 'w', encoding='utf-8') as f:
    f.write('''# tools package''')
print('Wrote project/tools/__init__.py')

In [None]:
print('Writing project/memory/__init__.py')
with open(r'project/memory/__init__.py', 'w', encoding='utf-8') as f:
    f.write('''# memory package''')
print('Wrote project/memory/__init__.py')

In [None]:
print('Writing project/core/__init__.py')
with open(r'project/core/__init__.py', 'w', encoding='utf-8') as f:
    f.write('''# core package''')
print('Wrote project/core/__init__.py')

In [None]:
# Final test: add project path and run sample queries
import sys, os
sys.path.insert(0, os.path.abspath('.'))
from project.main_agent import run_agent

tests = [
    "There is a flood near my house, water is rising fast.",
    "My neighbor says shelter A is closed, is that true?",
    "There is smoke and fire in my building!"
]

for t in tests:
    print('---')
    print('Input:', t)
    print('Output:', run_agent(t))
