# Civil Protection Agent (Colab Demo) 🇦🇱🇬🇧
This notebook demonstrates how to build a **simple, auditable agent** for the domain of **Civil Protection / Mbrojtja Civile**.

## What the agent does (triage workflow)
Given an incident report (free text), the agent:
1. **Classifies** the incident type (flood / fire / earthquake)
2. **Assesses priority** (LOW / MEDIUM / HIGH)
3. **Retrieves** the relevant *official protocol* from a mini knowledge base using embeddings + FAISS
4. **Proposes actions** based on the protocol, adapted by priority
5. Runs a **safety/policy check** so risky actions require human approval
6. Produces an **audited report** with:
   - evidence used (protocol IDs, scores, metadata)
   - actions (approved + blocked)
   - a full tool-call audit log with timings

## Why this is an "agent"
It has:
- **Goal** (what we want to achieve)
- **State** (what we know so far)
- **Tools** (functions the agent can call)
- **Memory** (stored in state + audit log)
- **Orchestrator** (Observe → Plan → Act loop with step limits)

> Important: This demo is **safe-by-design**:
> - tools are **whitelisted**
> - there is a **max step limit**
> - actions that notify the public (e.g., issuing a warning) are **blocked** unless a human approves

In [None]:
# =========================================================
# 0) Install dependencies (Colab)
# =========================================================
# - sentence-transformers: multilingual embeddings for Albanian + English
# - faiss-cpu: fast similarity search (vector index)
# - pandas/numpy: data handling
!pip -q install sentence-transformers faiss-cpu pandas numpy

In [None]:
# =========================================================
# 1) Imports + configuration
# =========================================================
import os
import re
import time
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

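# Optional: hide Hugging Face warning about missing HF_TOKEN.
# Public models do NOT require authentication.
# Set this before importing sentence_transformers: huggingface_hub reads the
# variable when it is first imported, so setting it afterwards has no effect.
os.environ["HF_HUB_DISABLE_IMPLICIT_TOKEN"] = "1"

import numpy as np
import pandas as pd
import faiss
from sentence_transformers import SentenceTransformer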

np.random.seed(42)

## 2) Mini "protocol knowledge base" (toy)
In a real Civil Protection setting, this would be:
- a repository of SOPs/protocols, emergency plans, legal docs, etc.
- indexed via RAG: chunking → embeddings → vector DB

Here we use **3 toy protocols** tagged `source: official` (bilingual text) plus structured action lists.
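
Because every later step relies on the same fields being present, a small validation pass over the KB can catch malformed entries early. The sketch below is only an illustration: `REQUIRED_KEYS` and `validate_protocol` are hypothetical helpers (not part of the notebook), and the key names are taken from the toy entries in the next cell.

```python
from typing import Any, Dict, List

# Keys used by the toy protocol entries in this notebook.
REQUIRED_KEYS = {
    "protocol_id", "incident_type", "source", "year", "title", "text", "actions",
}


def validate_protocol(entry: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems (empty list means OK)."""
    problems: List[str] = []

    missing = REQUIRED_KEYS - set(entry)
    if missing:
        problems.append(f"missing keys: {sorted(missing)}")

    actions = entry.get("actions", [])
    if not isinstance(actions, list) or not all(
        isinstance(a, str) and a.strip() for a in actions
    ):
        problems.append("actions must be a list of non-empty strings")
    elif "actions" in entry and not actions:
        problems.append("actions list is empty")

    return problems


# A complete (shortened) entry and a deliberately broken one.
good_entry = {
    "protocol_id": "FLOOD-2023",
    "incident_type": "flood",
    "source": "official",
    "year": 2023,
    "title": "Flood SOP (2023)",
    "text": "EN: If flood risk is confirmed, ...",
    "actions": ["Activate emergency coordination center."],
}
bad_entry = {"protocol_id": "X", "actions": []}

good_problems = validate_protocol(good_entry)
bad_problems = validate_protocol(bad_entry)
```

In the notebook, a call such as `[validate_protocol(p) for p in PROTOCOLS]` could run right after the KB cell, before any embedding work is done.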

In [None]:
# =========================================================
# 2) Protocol database (toy KB)
# =========================================================
PROTOCOLS = [
    {
        "protocol_id": "FLOOD-2023",
        "incident_type": "flood",
        "source": "official",
        "year": 2023,
        "title": "Flood SOP (2023) / Procedura për Përmbytje (2023)",
        "text": (
            "EN: If flood risk is confirmed, the municipality issues official warnings, "
            "activates emergency coordination, and communicates evacuation guidance.\n"
            "SQ: Në rast rreziku për përmbytje, Bashkia lëshon paralajmërime zyrtare, "
            "aktivizon koordinimin e emergjencës dhe komunikon udhëzime për evakuim."
        ),
        "actions": [
            "Issue official warning (municipality/civil protection).",
            "Activate emergency coordination center.",
            "Assess affected zones and vulnerable groups.",
            "Prepare evacuation routes and shelters.",
            "Communicate updates at fixed intervals.",
        ],
    },
    {
        "protocol_id": "FIRE-2022",
        "incident_type": "fire",
        "source": "official",
        "year": 2022,
        "title": "Fire SOP (2022) / Procedura për Zjarr (2022)",
        "text": (
            "EN: Confirm fire location and spread risk. Dispatch firefighting units, "
            "coordinate with police, and protect critical infrastructure.\n"
            "SQ: Konfirmo lokacionin e zjarrit dhe rrezikun e përhapjes. Dërgo njësitë zjarrfikëse, "
            "koordino me policinë dhe mbro infrastrukturën kritike."
        ),
        "actions": [
            "Verify fire location, wind conditions, and spread risk.",
            "Dispatch firefighting units and request reinforcements if needed.",
            "Coordinate traffic control and perimeter with police.",
            "Prioritize hospitals/schools/critical infrastructure.",
            "Issue public safety instructions (smoke, evacuation if needed).",
        ],
    },
    {
        "protocol_id": "QUAKE-2021",
        "incident_type": "earthquake",
        "source": "official",
        "year": 2021,
        "title": "Earthquake SOP (2021) / Procedura për Tërmet (2021)",
        "text": (
            "EN: After an earthquake, check for building damage, aftershocks, and casualties. "
            "Mobilize search-and-rescue and medical triage.\n"
            "SQ: Pas një tërmeti, kontrollo dëmet në ndërtesa, pasgoditjet dhe të lënduarit. "
            "Mobilizo kërkim-shpëtimin dhe triazhin mjekësor."
        ),
        "actions": [
            "Check for aftershocks and issue safety messaging.",
            "Assess structural damage; restrict access to unsafe buildings.",
            "Mobilize search-and-rescue + medical triage.",
            "Open temporary shelters and reunification points.",
            "Document situation reports every N minutes.",
        ],
    },
]

protocol_df = pd.DataFrame(PROTOCOLS)
protocol_df[["protocol_id","incident_type","source","year","title"]]

## 3) Build a semantic index (FAISS) for protocol retrieval
This is the RAG-style "offline" step for our tiny KB:
- embed each protocol text (multilingual)
- store vectors in FAISS for similarity search
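
The indexing cell below normalizes the embeddings so that an inner-product index returns cosine similarities. The following sketch checks that identity on two made-up 3-dimensional vectors using only the standard library; `l2_normalize`, `dot`, and `cosine` are illustrative helpers, not notebook functions.

```python
import math
from typing import List


def l2_normalize(v: List[float]) -> List[float]:
    """Scale a vector to unit length (L2 norm of 1)."""
    norm = math.sqrt(sum(x * x for x in v))
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in v]


def dot(a: List[float], b: List[float]) -> float:
    """Inner product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity computed directly from the definition."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


# Hypothetical "embeddings" (real ones have hundreds of dimensions).
query_vec = [1.0, 2.0, 2.0]
doc_vec = [2.0, 0.0, 1.0]

# Inner product of the normalized vectors ...
ip_of_normalized = dot(l2_normalize(query_vec), l2_normalize(doc_vec))

# ... equals the cosine similarity of the original vectors.
assert math.isclose(ip_of_normalized, cosine(query_vec, doc_vec))
```

This is why an inner-product index over unit-length vectors ranks results by cosine similarity.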

In real systems:
- you would chunk long docs into passages
- use ANN index types (HNSW/IVF/PQ)
- add metadata filtering (source, year, authority, jurisdiction)
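
Two of the points above, chunking and metadata filtering, can be sketched without any vector library. This is a minimal illustration under simple assumptions (whitespace tokenization, exact-match filters); `chunk_words` and `filter_by_metadata` are hypothetical helpers, and `DRAFT-X` is a made-up record.

```python
from typing import Any, Dict, List


def chunk_words(text: str, max_words: int = 50, overlap: int = 10) -> List[str]:
    """Split text into passages of at most `max_words` words.

    Consecutive passages share `overlap` words so that a sentence cut at a
    boundary still appears whole in at least one passage.
    """
    if max_words <= 0 or not 0 <= overlap < max_words:
        raise ValueError("need max_words > 0 and 0 <= overlap < max_words")
    words = text.split()
    step = max_words - overlap
    chunks: List[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_words]))
        if start + max_words >= len(words):
            break  # the last window already reached the end of the text
    return chunks


def filter_by_metadata(records: List[Dict[str, Any]], **required: Any) -> List[Dict[str, Any]]:
    """Keep only records whose metadata matches every required key/value."""
    return [r for r in records if all(r.get(k) == v for k, v in required.items())]


# 12 placeholder words, windows of 5 with 2 words of overlap.
sample_text = " ".join(f"w{i}" for i in range(12))
chunks = chunk_words(sample_text, max_words=5, overlap=2)

records = [
    {"protocol_id": "FLOOD-2023", "source": "official", "year": 2023},
    {"protocol_id": "DRAFT-X", "source": "draft", "year": 2024},  # made-up entry
]
official_only = filter_by_metadata(records, source="official")
```

In a real pipeline each chunk would carry its parent document's metadata, so the filter can be applied to search results, or pushed into the vector store where it supports that.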

In [None]:
# =========================================================
# 3) Embeddings + FAISS index (offline indexing)
# =========================================================
embed_model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

# Encode all protocol texts. Normalize -> dot product == cosine similarity.
X = embed_model.encode(protocol_df["text"].tolist(), normalize_embeddings=True).astype("float32")

d = X.shape[1]
index = faiss.IndexFlatIP(d)  # exact search (fine for tiny demo)
index.add(X)

print("Indexed protocols:", len(protocol_df), "| embedding dim:", d)

def retrieve_protocol(query: str, incident_type: Optional[str] = None, k: int = 3) -> List[Dict[str, Any]]:
    """Tool: retrieve relevant protocols (vector search).

    Parameters:
      - query: user report + optional hints
      - incident_type: metadata filter (if known)
      - k: how many results to return

    Returns:
      list of protocol dicts with a similarity score.
    """
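    q = embed_model.encode([query], normalize_embeddings=True).astype("float32")
    # Search the whole (tiny) index so the metadata filter below cannot starve
    # the result list; a large index would over-fetch a multiple of k instead.
    scores, idxs = index.search(q, index.ntotal)

    results = []
    for i, s in zip(idxs[0], scores[0]):
        if i < 0:  # FAISS pads missing results with -1
            continue
        row = protocol_df.iloc[int(i)].to_dict()
        if incident_type is not None and row["incident_type"] != incident_type:
            continue
        row["score"] = float(s)
        results.append(row)

    results.sort(key=lambda r: -r["score"])
    return results[:k]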

## 4) Additional tools (classification, priority, policy)
Agents become powerful when they can call **tools**.
In critical domains, tools should be:
- deterministic where possible
- constrained (whitelist)
- logged
- safe (policy checks)

Here we implement **simple rule-based tools** to keep the demo runnable without keys.
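
One caveat with plain substring checks such as `w in t` is that they also match inside unrelated words. The sketch below shows a hedged alternative that anchors each keyword stem at a word boundary while still allowing suffixes (useful for Albanian inflections). `compile_stems` is a hypothetical helper, not something the notebook defines.

```python
import re
from typing import Iterable


def compile_stems(stems: Iterable[str]) -> re.Pattern:
    """Build a case-insensitive pattern matching any stem at the start of a word."""
    alternation = "|".join(re.escape(s) for s in stems)
    # \b anchors the stem to a word start; suffixes after the stem are allowed.
    return re.compile(rf"\b(?:{alternation})", re.IGNORECASE)


MANY = compile_stems(["many", "shumë", "multiple", "several"])
FLOOD = compile_stems(["flood", "përmbyt"])

# Word-start matching accepts genuine hits ...
hit_en = MANY.search("Many houses are affected") is not None
hit_sq = FLOOD.search("Raport: Përmbytje në zonën pranë lumit") is not None

# ... and rejects a hit buried inside another word,
miss = MANY.search("Report from Germany") is None

# whereas the plain substring check fires on the same text.
substring_fires = "many" in "report from germany"
```

Swapping this into `classify_incident` or `assess_priority` is optional; the trade-off is slightly more setup in exchange for fewer accidental matches.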

In [None]:
# =========================================================
# 4) Other tools (safe, deterministic)
# =========================================================
def classify_incident(report_text: str) -> Dict[str, Any]:
    """Tool: classify incident type using simple keyword rules.

    Returns:
      {incident_type: 'flood'|'fire'|'earthquake'|'unknown', confidence: float}
    """
    t = report_text.lower()
    if any(w in t for w in ["flood", "përmbyt", "overflow", "inond"]):
        return {"incident_type": "flood", "confidence": 0.8}
    if any(w in t for w in ["fire", "zjarr", "smoke", "flames"]):
        return {"incident_type": "fire", "confidence": 0.8}
    if any(w in t for w in ["earthquake", "tërmet", "aftershock", "seismic"]):
        return {"incident_type": "earthquake", "confidence": 0.8}
    return {"incident_type": "unknown", "confidence": 0.2}


def assess_priority(report_text: str) -> Dict[str, Any]:
    """Tool: estimate priority (toy heuristic).

    In practice, priority should be based on:
    - number of affected people
    - casualties/injuries
    - critical infrastructure risk
    - time sensitivity and escalation rules
    """
    t = report_text.lower()
    score = 0

    if any(w in t for w in ["many", "shumë", "multiple", "several"]): score += 1
    if any(w in t for w in ["injured", "lënduar", "casualties"]): score += 2
    if any(w in t for w in ["hospital", "spital", "school", "shkoll"]): score += 2
    if any(w in t for w in ["collapsed", "shemb", "trapped", "bllokuar"]): score += 2
    if any(w in t for w in ["urgent", "immediately", "menjëherë"]): score += 1

    if score >= 4: level = "HIGH"
    elif score >= 2: level = "MEDIUM"
    else: level = "LOW"

    return {"priority": level, "risk_score": score}


def safety_policy_check(proposed_actions: List[str]) -> Dict[str, Any]:
    """Tool: policy/safety check on actions.

    Example rule:
    - actions that notify the public should require a human approval step
    """
    blocked = []
    allowed = []
    for a in proposed_actions:
        if re.search(r"\b(send|broadcast|issue)\b.*\b(alert|warning)\b", a.lower()):
            blocked.append({"action": a, "reason": "Requires human approval to notify public."})
        else:
            allowed.append(a)
    return {"allowed": allowed, "blocked": blocked}

## 5) Agent state + audit logging
We keep everything **JSON-serializable** so it can be stored, inspected, or replayed.
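
As a quick illustration of what JSON-serializable buys us, the sketch below round-trips a dataclass through `dataclasses.asdict` and `json`. `MiniState` is a hypothetical, trimmed-down stand-in for the notebook's state class, and the score value is made up.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class MiniState:
    """Hypothetical, reduced version of an agent state."""
    goal: str
    incident_type: str = "unknown"
    evidence: List[Dict[str, Any]] = field(default_factory=list)


snapshot = MiniState(
    goal="demo",
    incident_type="flood",
    evidence=[{"protocol_id": "FLOOD-2023", "score": 0.71}],  # made-up score
)

# Store: dataclass -> plain dict -> JSON text (keep non-ASCII readable).
payload = json.dumps(asdict(snapshot), ensure_ascii=False)

# Replay: JSON text -> dict -> dataclass.
restored = MiniState(**json.loads(payload))
assert restored == snapshot
```

The round trip only works while every field holds plain JSON types (strings, numbers, lists, dicts, booleans, `None`), which is a reason to convert NumPy scalars to Python floats before storing them in state.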

In [None]:
# =========================================================
# 5) Agent state + tool-call audit log
# =========================================================
@dataclass
class AgentState:
    goal: str
    incident_report: str
    incident_type: str = "unknown"
    priority: str = "UNKNOWN"
    evidence: List[Dict[str, Any]] = field(default_factory=list)
    proposed_actions: List[str] = field(default_factory=list)
    approved_actions: List[str] = field(default_factory=list)
    blocked_actions: List[Dict[str, Any]] = field(default_factory=list)
    steps_taken: List[str] = field(default_factory=list)
    done: bool = False
    failure_reason: Optional[str] = None


@dataclass
class ToolCallLog:
    step: int
    tool_name: str
    tool_input: Dict[str, Any]
    tool_output: Dict[str, Any]
    elapsed_ms: float

## 6) Orchestrator (Observe → Plan → Act)
The orchestrator enforces:
- **tool whitelist**
- **max steps**
- **audit logging**

In [None]:
# =========================================================
# 6) Civil Protection Agent (Orchestrator)
# =========================================================
class CivilProtectionAgent:
    def __init__(self, max_steps: int = 8):
        self.max_steps = max_steps
        self.allowed_tools = {
            "classify_incident": classify_incident,
            "assess_priority": assess_priority,
            "retrieve_protocol": retrieve_protocol,
            "safety_policy_check": safety_policy_check,
        }
        self.audit_log: List[ToolCallLog] = []

    def _call_tool(self, step: int, tool_name: str, **kwargs) -> Any:
        if tool_name not in self.allowed_tools:
            raise ValueError(f"Tool not allowed: {tool_name}")
        fn = self.allowed_tools[tool_name]
        t0 = time.perf_counter()  # monotonic, high-resolution clock for timings
        out = fn(**kwargs)
        t1 = time.perf_counter()

        out_dict = out if isinstance(out, dict) else {"result": out}
        self.audit_log.append(
            ToolCallLog(
                step=step,
                tool_name=tool_name,
                tool_input=kwargs,
                tool_output=out_dict,
                elapsed_ms=(t1 - t0) * 1000.0,
            )
        )
        return out

    def _plan_next(self, state: AgentState) -> str:
        if state.incident_type == "unknown":
            return "classify"
        if state.priority == "UNKNOWN":
            return "priority"
        if not state.evidence:
            return "retrieve_protocol"
        if not state.proposed_actions:
            return "propose_actions"
        if not state.approved_actions and not state.blocked_actions:
            return "safety_check"
        return "finish"

    def run(self, goal: str, incident_report: str) -> Tuple[AgentState, List[ToolCallLog]]:
        state = AgentState(goal=goal, incident_report=incident_report)
        self.audit_log = []  # fresh log per run so repeated runs do not mix entries

        for step in range(1, self.max_steps + 1):
            if state.done:
                break

            decision = self._plan_next(state)
            state.steps_taken.append(decision)

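            if decision == "classify":
                out = self._call_tool(step, "classify_incident", report_text=state.incident_report)
                state.incident_type = out["incident_type"]
                if state.incident_type == "unknown":
                    # Stop early: re-running the same deterministic classifier
                    # would only repeat this step until max_steps is exhausted.
                    state.failure_reason = "Could not classify incident type."
                    state.done = True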

            elif decision == "priority":
                out = self._call_tool(step, "assess_priority", report_text=state.incident_report)
                state.priority = out["priority"]

            elif decision == "retrieve_protocol":
                query = f"{state.incident_report}\nIncident type: {state.incident_type}"
                results = self._call_tool(step, "retrieve_protocol", query=query, incident_type=state.incident_type, k=3)
                state.evidence = results if isinstance(results, list) else results.get("result", [])
                if not state.evidence:
                    state.failure_reason = "No protocol found for incident type."
                    state.done = True

            elif decision == "propose_actions":
                best = sorted(state.evidence, key=lambda r: -r.get("score", 0.0))[0]
                base_actions = list(best.get("actions", []))
                if state.priority == "HIGH":
                    base_actions.insert(0, "[PRIORITY=HIGH] Activate immediate coordination and escalation.")
                state.proposed_actions = base_actions

            elif decision == "safety_check":
                out = self._call_tool(step, "safety_policy_check", proposed_actions=state.proposed_actions)
                state.approved_actions = out["allowed"]
                state.blocked_actions = out["blocked"]

            elif decision == "finish":
                state.done = True

            else:
                state.failure_reason = f"Unknown decision: {decision}"
                state.done = True

        if not state.done:
            state.done = True
            state.failure_reason = "Max steps reached (stop condition)."

        return state, self.audit_log

## 7) Audited report formatter

In [None]:
# =========================================================
# 7) Audited report formatting
# =========================================================
def format_audited_report(state: AgentState, audit_log: List[ToolCallLog]) -> str:
    lines = []
    lines.append("=== CIVIL PROTECTION AGENT REPORT (AUDITED) ===")
    lines.append(f"Goal / Qëllimi: {state.goal}")
    lines.append(f"Incident report / Raporti: {state.incident_report}")
    lines.append("")
    lines.append(f"Classified incident type / Tip incidenti: {state.incident_type}")
    lines.append(f"Priority / Prioriteti: {state.priority}")
    lines.append("")

    lines.append("Evidence consulted / Evidenca e konsultuar:")
    if state.evidence:
        for ev in state.evidence:
            lines.append(f"- {ev['protocol_id']} ({ev['source']}, {ev['year']}) score={ev.get('score',0):.3f}")
            lines.append(f"  Title: {ev['title']}")
    else:
        lines.append("- None")

    lines.append("")
    lines.append("Proposed actions / Veprime të propozuara:")
    for a in state.proposed_actions:
        lines.append(f"- {a}")

    lines.append("")
    lines.append("Approved actions / Veprime të lejuara:")
    for a in state.approved_actions:
        lines.append(f"- {a}")

    if state.blocked_actions:
        lines.append("")
        lines.append("Blocked actions (need human approval) / Veprime të bllokuara (kërkojnë aprovimin njerëzor):")
        for b in state.blocked_actions:
            lines.append(f"- {b['action']}  | Reason: {b['reason']}")

    if state.failure_reason:
        lines.append("")
        lines.append(f"Stop reason / Arsye ndalimi: {state.failure_reason}")

    lines.append("")
    lines.append("Audit log (tool calls) / Regjistër auditimi (thirrje mjetesh):")
    for entry in audit_log:
        lines.append(f"- Step {entry.step}: {entry.tool_name} ({entry.elapsed_ms:.1f} ms)")
        lines.append(f"  input: {json.dumps(entry.tool_input, ensure_ascii=False)}")
        out_str = json.dumps(entry.tool_output, ensure_ascii=False)
        lines.append(f"  output: {out_str[:350]}{'...' if len(out_str) > 350 else ''}")

    return "\n".join(lines)

## 8) Run the agent (example)
Try editing the incident report to test other incident types (fire, earthquake).

In [None]:
# =========================================================
# 8) Run example
# =========================================================
agent = CivilProtectionAgent(max_steps=8)

goal = (
    "Generate an action plan based ONLY on official protocol, with ordered steps and references. / "
    "Gjenero një plan veprimi bazuar vetëm në protokoll zyrtar, me hapa të renditur dhe referenca."
)

incident_report = (
    "Raport: Përmbytje në zonën pranë lumit. Ka disa familje të bllokuara dhe "
    "raportohet për persona të lënduar. Nevojitet ndërhyrje menjëherë. "
    "EN summary: Flood near river; trapped families; injured people; urgent."
)

state, audit = agent.run(goal=goal, incident_report=incident_report)
print(format_audited_report(state, audit))

## 9) Exercises (optional)
1. Add a new protocol (e.g., landslide / rrëshqitje dheu) and test classification + retrieval.
2. Replace the rule-based `classify_incident` with an LLM call (but keep tool whitelisting and audit logs).
3. Add a human approval step: blocked actions become allowed only if user confirms.
4. Add extra tools (weather forecast, maps, contact lists) with the same whitelist + audit pattern.
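
As a possible starting point for exercise 3, the sketch below moves blocked actions to the approved list only when an approver callback says yes. It is one way to structure the step, not the notebook's implementation: `apply_human_approval` and the `approver` callback are hypothetical names, and the blocked-item shape (`action`, `reason`) follows the output of `safety_policy_check`.

```python
from typing import Callable, Dict, List, Tuple

Blocked = Dict[str, str]  # {"action": ..., "reason": ...}


def apply_human_approval(
    approved: List[str],
    blocked: List[Blocked],
    approver: Callable[[str, str], bool],
) -> Tuple[List[str], List[Blocked]]:
    """Return new (approved, still_blocked) lists after asking the approver.

    The inputs are not mutated, so the pre-approval state stays available
    for the audit trail.
    """
    new_approved = list(approved)
    still_blocked: List[Blocked] = []
    for item in blocked:
        if approver(item["action"], item["reason"]):
            new_approved.append(item["action"])
        else:
            still_blocked.append(item)
    return new_approved, still_blocked


# Deterministic stand-in for a human: approve only official warnings.
def demo_approver(action: str, reason: str) -> bool:
    return action.startswith("Issue official warning")


approved_after, blocked_after = apply_human_approval(
    approved=["Activate emergency coordination center."],
    blocked=[
        {"action": "Issue official warning (municipality/civil protection).",
         "reason": "Requires human approval to notify public."},
        {"action": "Broadcast alert to all residents.",  # made-up action
         "reason": "Requires human approval to notify public."},
    ],
    approver=demo_approver,
)
```

In Colab, the approver could wrap `input()`, for example `lambda a, r: input(f"Approve '{a}'? [y/N] ").strip().lower() == "y"`. Each decision should also be written to the audit log so the report shows who approved what.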