<a href="https://colab.research.google.com/github/JRK-007/AI-Agents-Google-Hackathon/blob/main/AI_AGENTS_HACKATHON.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
supplychain_eye.py

SupplyChain-Eye: Multi-Agent Logistics Routing & Anomaly Detection Assistant
- Single-file demo agent system
- No API keys required (uses DummyLLMClient). Replace with real LLM client in production.
- Demonstrates: Multi-agent flows, tools (code exec), sessions & memory, context compaction,
  observability, agent evaluation, A2A protocol, pause/resume (checkpointing).

Author: Generated for Kaggle Agents Capstone (Freestyle Track)
"""

import asyncio
import json
import logging
import math
import os
import time
import uuid
from collections import defaultdict, deque
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

# -----------------------
# Basic configuration
# -----------------------
LOG = logging.getLogger("SupplyChainEye")
LOG.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(trace_id)s %(name)s: %(message)s")
handler.setFormatter(fmt)
LOG.addHandler(handler)

# helper to inject trace_id into logs
def log_with_trace(trace_id, level, msg, **kwargs):
    """Log ``msg`` at ``level`` on the module logger, tagged with ``trace_id``.

    The trace id is attached through the ``extra`` mapping so the formatter's
    ``%(trace_id)s`` field can be filled in. Any additional keyword arguments
    are forwarded unchanged to ``LOG.log``.
    """
    LOG.log(level, msg, extra={"trace_id": trace_id}, **kwargs)


# -----------------------
# Utilities & Tools
# -----------------------

def haversine_km(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Return the haversine (great-circle) distance in kilometers.

    Args:
        a: First point as ``(lat, lon)`` in degrees.
        b: Second point as ``(lat, lon)`` in degrees.

    Returns:
        Distance between the two points on a sphere of radius 6371 km.
    """
    lat1, lon1 = a
    lat2, lon2 = b
    R = 6371.0
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    x = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlambda/2)**2
    # Floating-point rounding can push x a hair outside [0, 1] for
    # (near-)antipodal points, which would make sqrt/asin raise ValueError.
    x = min(1.0, max(0.0, x))
    return 2*R*math.asin(math.sqrt(x))


class ToolExecutor:
    """
    Built-in tool for code execution / deterministic models.
    Method calls are synchronous and deterministic; used for distance, ETA, fuel estimates.
    """
    @staticmethod
    def compute_trip_metrics(path: Optional[List[Tuple[float, float]]], speed_kmh: float = 40.0,
                             stops_minutes: float = 10.0, fuel_l_per_100km: float = 8.0):
        """Compute distance, duration and fuel use for a multi-leg trip.

        Args:
            path: Ordered ``(lat, lon)`` waypoints. ``None``, an empty list or a
                single waypoint describe a trip with no legs.
            speed_kmh: Travel speed applied to every leg; must be positive.
            stops_minutes: Dwell time added once per leg.
            fuel_l_per_100km: Fuel consumption used by the simplified fuel model.

        Returns:
            Dict with ``km``, ``time_h`` and ``fuel_liters``.

        Raises:
            ValueError: If ``speed_kmh`` is not positive.
        """
        if speed_kmh <= 0:
            raise ValueError(f"speed_kmh must be positive, got {speed_kmh!r}")
        waypoints = list(path) if path else []
        # A trip with fewer than two waypoints has no legs; without the clamp
        # an empty path would count -1 stops and report a negative duration.
        legs = max(len(waypoints) - 1, 0)
        total_km = 0.0
        total_time_h = 0.0
        for start, end in zip(waypoints, waypoints[1:]):
            d = haversine_km(start, end)
            total_km += d
            total_time_h += d / speed_kmh
        # add stops (one per leg)
        total_time_h += legs * (stops_minutes/60.0)
        # simplified fuel model (liters per 100 km)
        fuel_liters = total_km * (fuel_l_per_100km/100.0)
        return {"km": total_km, "time_h": total_time_h, "fuel_liters": fuel_liters}


# -----------------------
# LLM Client Interface
# -----------------------

class LLMClient:
    """
    Interface every LLM backend used by the agents must satisfy.

    Subclasses override ``send(context, prompt)`` and return a dict that
    carries a ``'text'`` entry and, optionally, a ``'structured'`` (JSON-like)
    entry.
    """

    async def send(self, context: str, prompt: str) -> Dict[str, Any]:
        """Send ``prompt`` with ``context``; the base class always raises."""
        raise NotImplementedError()


class DummyLLMClient(LLMClient):
    """Offline stand-in for an LLM: canned answers chosen by keywords in the prompt."""

    async def send(self, context: str, prompt: str) -> Dict[str, Any]:
        """Return a fixed response picked from cue words found in ``prompt``.

        ``context`` is accepted for interface compatibility but not inspected.
        """
        # Short pause before answering, mimicking a remote call.
        await asyncio.sleep(0.2)
        cue = prompt.lower()

        # Classification / detection requests.
        if any(word in cue for word in ("classify", "detect")):
            return {
                "text": "anomaly_detected:high_idle_time;recommendation:investigate_loading;confidence:0.90",
                "structured": {
                    "anomaly": "high_idle_time",
                    "recommendation": "investigate_loading",
                    "confidence": 0.9,
                },
            }

        # Optimization / re-routing requests.
        if any(word in cue for word in ("optimize", "re-route")):
            return {
                "text": "suggested:reassign_stop_3_to_truck_2;eta_reduction:0.24;fuel_saving_l:3.2",
                "structured": {
                    "action": "reassign_stop_3_to_truck_2",
                    "eta_reduction_h": 0.24,
                    "fuel_saving_l": 3.2,
                },
            }

        # Anything else gets the generic summary.
        return {
            "text": "summary: no major issues found;confidence:0.75",
            "structured": {"summary": "no major issues", "confidence": 0.75},
        }


# -----------------------
# Session & Memory
# -----------------------

class InMemorySessionService:
    """
    Keeps session records in a dict and can checkpoint each one to a JSON file
    under ``storage_dir`` so a run can be paused and resumed.
    """

    def __init__(self, storage_dir="sessions"):
        """Start with no sessions and make sure ``storage_dir`` exists."""
        self.sessions: Dict[str, Dict[str, Any]] = {}
        os.makedirs(storage_dir, exist_ok=True)
        self.storage_dir = storage_dir

    def _checkpoint_path(self, session_id: str) -> str:
        """Location of the JSON checkpoint file for ``session_id``."""
        return os.path.join(self.storage_dir, f"{session_id}.json")

    def create_session(self, session_id: Optional[str] = None):
        """Register a fresh session and return its id (a new UUID when none is given)."""
        if not session_id:
            session_id = str(uuid.uuid4())
        fresh = {"created_at": time.time(), "history": [], "state": {}}
        self.sessions[session_id] = fresh
        return session_id

    def get(self, session_id: str):
        """Return the session record, or ``None`` when the id is unknown."""
        return self.sessions.get(session_id)

    def append_history(self, session_id: str, entry: Dict[str, Any]):
        """Add ``entry`` to the end of the session's history list."""
        record = self.sessions[session_id]
        record["history"].append(entry)

    def save(self, session_id: str):
        """Write the session to its checkpoint file and return the file path."""
        target = self._checkpoint_path(session_id)
        with open(target, "w") as fh:
            # default=str: values json cannot encode natively are stringified.
            json.dump(self.sessions[session_id], fh, default=str)
        return target

    def load(self, session_id: str):
        """Read the session back from its checkpoint file.

        Raises:
            FileNotFoundError: If no checkpoint file exists for ``session_id``.
        """
        source = self._checkpoint_path(session_id)
        if os.path.exists(source):
            with open(source, "r") as fh:
                self.sessions[session_id] = json.load(fh)
            return self.sessions[session_id]
        raise FileNotFoundError(source)


class MemoryBank:
    """
    Persistent memory for long term patterns (simple JSON store).
    Stores historical metrics for comparison and compaction.
    """
    def __init__(self, filename="memory_bank.json"):
        """Load the store from ``filename`` if it exists, else start empty."""
        self.filename = filename
        if os.path.exists(filename):
            with open(filename, "r") as f:
                self.store = json.load(f)
        else:
            self.store = {"runs": []}

    def record_run(self, metadata: Dict[str, Any]):
        """Append ``metadata`` to the run list and rewrite the JSON file."""
        # setdefault: a store loaded from disk may lack the "runs" key.
        self.store.setdefault("runs", []).append(metadata)
        with open(self.filename, "w") as f:
            json.dump(self.store, f, default=str)

    def compact_context(self, last_n=5):
        """Return a one-line summary (average time and fuel) of the last ``last_n`` runs.

        Returns ``"No prior runs"`` when there is nothing to summarize,
        including when ``last_n`` is zero or negative.
        """
        runs = self.store.get("runs", [])
        # runs[-0:] would return the whole list, so handle non-positive
        # window sizes explicitly.
        recent = runs[-last_n:] if last_n > 0 else []
        if not recent:
            return "No prior runs"
        avg_eta = sum(r.get("total_time_h", 0) for r in recent)/len(recent)
        avg_fuel = sum(r.get("fuel_liters", 0) for r in recent)/len(recent)
        return f"Recent {len(recent)} runs: avg_eta_h={avg_eta:.2f}, avg_fuel_l={avg_fuel:.2f}"


# -----------------------
# A2A Protocol
# -----------------------
def make_message(sender: str, recipient: str, intent: str, payload: Dict[str, Any], trace_id: Optional[str] = None):
    """Build an agent-to-agent message envelope.

    Args:
        sender: Name of the originating agent (or user).
        recipient: Name of the target agent.
        intent: Short label describing what the recipient should do.
        payload: Message body, passed through untouched.
        trace_id: Correlation id for the run; a new UUID is generated when omitted.

    Returns:
        Dict with ``message_id``, ``timestamp``, ``trace_id``, ``sender``,
        ``recipient``, ``intent`` and ``payload``.
    """
    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
    # tzinfo so the ISO string keeps its previous offset-free format.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "message_id": str(uuid.uuid4()),
        "timestamp": now_utc.isoformat(),
        "trace_id": trace_id or str(uuid.uuid4()),
        "sender": sender,
        "recipient": recipient,
        "intent": intent,
        "payload": payload
    }


# -----------------------
# Observability / Metrics
# -----------------------

class Observability:
    """In-process metrics counters plus a list of structured event logs."""

    def __init__(self):
        # Counters default to 0 on first access.
        self.metrics = defaultdict(int)
        # Events in the order they were recorded.
        self.logs = []

    def incr(self, metric_name: str, amount: int = 1):
        """Increase the counter ``metric_name`` by ``amount``."""
        self.metrics[metric_name] += amount

    def record_log(self, trace_id: str, event: str, details: Dict[str, Any]):
        """Store an event entry and mirror it to the logger at INFO level."""
        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
        # tzinfo so the ISO string keeps its previous offset-free format.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        self.logs.append({"trace_id": trace_id, "event": event, "details": details, "time": stamp})
        log_with_trace(trace_id, logging.INFO, f"Event={event} details={details}")


# -----------------------
# Base Agent
# -----------------------

class Agent:
    """Base class for agents: owns an inbox queue and a loop that dispatches messages.

    Subclasses implement ``handle_message``.
    """

    def __init__(self, name: str, llm: LLMClient, session_service: InMemorySessionService, memory: MemoryBank, obs: Observability):
        self.name = name
        self.llm = llm
        self.session = session_service
        self.memory = memory
        self.obs = obs
        self.inbox = asyncio.Queue()
        # Cleared from outside to ask run() to exit.
        self.running = False

    async def send(self, msg: Dict[str, Any]):
        """Enqueue ``msg`` for this agent."""
        await self.inbox.put(msg)

    async def handle_message(self, msg: Dict[str, Any]):
        """Process one message. Must be overridden by subclasses."""
        raise NotImplementedError()

    async def run(self):
        """Consume the inbox until ``self.running`` is set to False.

        A failure in ``handle_message`` is logged and the loop keeps going, so
        one bad message does not stop the agent.
        """
        self.running = True
        while self.running:
            try:
                # The timeout lets the loop re-check self.running regularly.
                msg = await asyncio.wait_for(self.inbox.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # loop agent behavior can be implemented by subclass
                await asyncio.sleep(0.05)
                continue
            trace_id = msg.get("trace_id", str(uuid.uuid4()))
            self.obs.record_log(trace_id, f"{self.name}.received", {"msg_id": msg.get("message_id"), "intent": msg.get("intent")})
            try:
                await self.handle_message(msg)
            except Exception as e:
                # exc_info=True keeps the traceback; the message alone rarely
                # says where in the handler the failure happened.
                log_with_trace(trace_id, logging.ERROR, f"Agent {self.name} failed: {e}", exc_info=True)


# -----------------------
# Agents Implementation
# -----------------------

class DataIntakeAgent(Agent):
    """
    Ingests shipments/trips: accepts a payload like:
    {
      "shipments": [
        {"id":"s1","path":[(lat,lon),(lat,lon),...],"vehicle":"truckA","timestamp":"..."},
        ...
      ],
      "constraints": {...}
    }
    Produces a standardized session state and emits to RouteAnalyzer
    """
    async def handle_message(self, msg: Dict[str, Any]):
        """Normalize the incoming shipments, store them in the session, notify RouteAnalyzer.

        The payload may carry a ``session_id``; when it is absent a new session
        is created, and when it names a session the service does not know yet
        that session is registered under the given id.
        """
        trace_id = msg["trace_id"]
        payload = msg["payload"]
        session_id = payload.get("session_id")
        if not session_id:
            session_id = self.session.create_session()
        elif self.session.get(session_id) is None:
            # A caller-chosen id that was never registered would otherwise
            # fail with KeyError when the state is written below.
            session_id = self.session.create_session(session_id)
        self.obs.incr("data_intake_msgs")
        # Standardize the input: keep known fields, fill defaults for the rest.
        standardized = [
            {
                "id": s["id"],
                "path": s.get("path"),
                "vehicle": s.get("vehicle", "truck"),
                "speed_kmh": s.get("speed_kmh", 40.0),
                "stops_minutes": s.get("stops_minutes", 10.0)
            }
            for s in payload.get("shipments", [])
        ]
        session_state = {"session_id": session_id, "shipments": standardized, "created": time.time()}
        self.session.sessions[session_id]["state"].update(session_state)
        self.session.append_history(session_id, {"actor": self.name, "event": "intake", "data": standardized, "time": time.time()})
        self.obs.record_log(trace_id, f"{self.name}.standardized", {"session_id": session_id, "shipments_count": len(standardized)})
        # A2A to RouteAnalyzer (via the module-level coordinator)
        msg2 = make_message(self.name, "RouteAnalyzer", "analyze_routes", {"session_id": session_id}, trace_id=trace_id)
        await coordinator.send_to_agent("RouteAnalyzer", msg2)


class RouteAnalyzerAgent(Agent):
    """
    Computes per-trip metrics, applies simple threshold heuristics, and asks the
    LLM for an anomaly label and recommendation for each trip.
    """

    @staticmethod
    def _heuristic_anomaly(trip: Dict[str, Any], trip_metrics: Dict[str, Any]):
        """Return the heuristic anomaly label for a trip, or ``None``.

        The fuel check takes precedence over the duration check when both fire.
        """
        if trip_metrics["fuel_liters"] > 50:
            return "high_fuel"
        # arbitrary heuristic: time budget grows with the number of waypoints
        if trip_metrics["time_h"] > (len(trip["path"]) * 0.1 + 1.5):
            return "high_total_time"
        return None

    async def handle_message(self, msg: Dict[str, Any]):
        """Analyze every shipment of the session, store the result, notify Optimizer."""
        trace_id = msg["trace_id"]
        session_id = msg["payload"]["session_id"]
        self.obs.incr("analyze_requests")
        record = self.session.get(session_id)

        findings = []
        for trip in record["state"]["shipments"]:
            trip_metrics = ToolExecutor.compute_trip_metrics(trip["path"], trip["speed_kmh"], trip["stops_minutes"])
            flag = self._heuristic_anomaly(trip, trip_metrics)
            # Ask LLM for a classification/explanation (simulated)
            question = (
                f"Classify trip {trip['id']} metrics={trip_metrics}, heuristics anomaly={flag}. "
                "Provide anomaly label and recommendation."
            )
            reply = await self.llm.send(self.memory.compact_context(), question)
            llm_view = reply.get("structured", {})
            findings.append({
                "id": trip["id"],
                "metrics": trip_metrics,
                "heuristic_anomaly": flag,
                "llm": llm_view,
            })
            self.obs.record_log(
                trace_id,
                f"{self.name}.trip_analysis",
                {"id": trip["id"], "metrics": trip_metrics, "llm": llm_view},
            )

        # Persist the analysis in the session state and its history.
        record["state"]["analysis"] = findings
        self.session.append_history(
            session_id,
            {"actor": self.name, "event": "analysis", "data": findings, "time": time.time()},
        )
        # Hand over to the Optimizer.
        follow_up = make_message(self.name, "Optimizer", "optimize", {"session_id": session_id}, trace_id=trace_id)
        await coordinator.send_to_agent("Optimizer", follow_up)


class OptimizerAgent(Agent):
    """
    Suggests optimizations: reassign stops, suggest route variants, perform fuel-aware tradeoffs.
    Implements sequential & parallel behaviors (parallel evaluation of candidate plans).
    """
    async def handle_message(self, msg: Dict[str, Any]):
        """Build candidate plans from the analysis, score them concurrently, keep the best.

        The chosen plan and all candidate results are written to the session
        state under ``"optimization"`` and the Reporter is notified.
        """
        trace_id = msg["trace_id"]
        session_id = msg["payload"]["session_id"]
        self.obs.incr("optimize_requests")
        session = self.session.get(session_id)
        analysis = session["state"].get("analysis", [])
        # Candidate 0: do nothing (baseline)
        candidates = [{"name": "baseline", "changes": []}]
        # One reassignment candidate per trip flagged by either the heuristic
        # or the LLM (any anomaly label counts, not a specific one).
        for a in analysis:
            if a["heuristic_anomaly"] or (a["llm"].get("anomaly") is not None):
                candidates.append({"name": f"fix_{a['id']}_reassign", "changes":[{"action":"reassign","trip":a["id"]}]})
        # Evaluate candidates in parallel
        results = await asyncio.gather(*[self.evaluate_candidate(session, c, trace_id) for c in candidates])
        # choose best (by eta reduction and fuel saving); ties keep the earliest candidate
        best = max(results, key=lambda r: (r.get("eta_reduction_h",0) + (r.get("fuel_saving_l",0)/100.0)))
        session["state"]["optimization"] = {"candidates": results, "best": best}
        self.session.append_history(session_id, {"actor": self.name, "event": "optimization", "data": session["state"]["optimization"], "time": time.time()})
        self.obs.record_log(trace_id, f"{self.name}.best", {"best": best})
        # send to Reporter
        msg2 = make_message(self.name, "Reporter", "report", {"session_id": session_id}, trace_id=trace_id)
        await coordinator.send_to_agent("Reporter", msg2)

    async def evaluate_candidate(self, session: Dict[str, Any], candidate: Dict[str, Any], trace_id: str):
        """Score one candidate plan against the session's baseline totals.

        Returns:
            Dict with the ``candidate``, its ``eta_reduction_h`` and
            ``fuel_saving_l``, and the LLM's structured reply under ``llm``.
        """
        # Simulate candidate evaluation. For real deployment plug in route engine / maps API.
        await asyncio.sleep(0.2)
        shipments = session["state"]["shipments"]
        # Baseline totals: compute each trip's metrics once and reuse them
        # for both sums.
        per_trip = [
            ToolExecutor.compute_trip_metrics(s["path"], s["speed_kmh"], s["stops_minutes"])
            for s in shipments
        ]
        total_time_h = sum(m["time_h"] for m in per_trip)
        total_fuel_l = sum(m["fuel_liters"] for m in per_trip)
        # Naive "improvement": any non-baseline candidate cuts time by 10% and fuel by 5%.
        changes_plan = candidate["name"] != "baseline"
        eta_reduction_h = total_time_h * 0.10 if changes_plan else 0.0
        fuel_saving_l = total_fuel_l * 0.05 if changes_plan else 0.0
        # ask LLM for explanation of candidate (simulated)
        prompt = f"Evaluate candidate {candidate['name']} with naive metrics: eta_reduction={eta_reduction_h}, fuel_saving={fuel_saving_l}."
        llm_out = await self.llm.send(self.memory.compact_context(), prompt)
        structured = llm_out.get("structured", {})
        res = {"candidate": candidate, "eta_reduction_h": eta_reduction_h, "fuel_saving_l": fuel_saving_l, "llm": structured}
        self.obs.record_log(trace_id, f"{self.name}.candidate_eval", {"candidate": candidate["name"], "eta_reduction_h": eta_reduction_h})
        return res


class ReporterAgent(Agent):
    """
    Generates the final report and evaluation metrics.
    """
    async def handle_message(self, msg: Dict[str, Any]):
        """Assemble the session report, record run totals in memory, print the report."""
        trace_id = msg["trace_id"]
        session_id = msg["payload"]["session_id"]
        session = self.session.get(session_id)
        optimization = session["state"].get("optimization", {})
        analysis = session["state"].get("analysis", [])
        # Build human readable report
        best = optimization.get("best", {})
        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
        # tzinfo so the ISO string keeps its previous offset-free format.
        generated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        summary = {
            "session_id": session_id,
            "timestamp": generated_at,
            "total_trips": len(session["state"].get("shipments", [])),
            "best_plan": best,
            "analysis": analysis
        }
        # persist to memory bank
        # compute totals for memory
        total_time_h = sum(a["metrics"]["time_h"] for a in analysis)
        total_fuel_l = sum(a["metrics"]["fuel_liters"] for a in analysis)
        metadata = {"session_id": session_id, "total_time_h": total_time_h, "fuel_liters": total_fuel_l, "best": best}
        self.memory.record_run(metadata)
        self.session.append_history(session_id, {"actor": self.name, "event": "report", "data": summary, "time": time.time()})
        self.obs.record_log(trace_id, f"{self.name}.final_report", {"session_id": session_id, "summary": {"total_time_h": total_time_h, "fuel_liters": total_fuel_l}})
        # print/send report to UI / return in message bus
        print("\n=== SupplyChain-Eye Report ===")
        print(json.dumps(summary, indent=2, default=str))
        print("=== End Report ===\n")


# -----------------------
# Coordinator (runs agents)
# -----------------------

class Coordinator:
    """
    Manages agents, message passing, pause/resume, and top-level orchestration.
    """
    def __init__(self):
        """Create the shared services. Agents are added later by ``setup_agents``."""
        self.session_service = InMemorySessionService()
        self.memory = MemoryBank()
        self.obs = Observability()
        self.llm = DummyLLMClient()  # replace with real implementation
        # instantiate agents
        self.agents: Dict[str, Agent] = {}
        # Bound in start(). Asking asyncio for a loop here would run at import
        # time with no loop running, which is deprecated and fails on recent
        # Python versions.
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.tasks = []

    def setup_agents(self):
        """Instantiate the four pipeline agents, all sharing this coordinator's services."""
        self.agents = {
            "DataIntake": DataIntakeAgent("DataIntake", self.llm, self.session_service, self.memory, self.obs),
            "RouteAnalyzer": RouteAnalyzerAgent("RouteAnalyzer", self.llm, self.session_service, self.memory, self.obs),
            "Optimizer": OptimizerAgent("Optimizer", self.llm, self.session_service, self.memory, self.obs),
            "Reporter": ReporterAgent("Reporter", self.llm, self.session_service, self.memory, self.obs),
        }

    async def start(self):
        """Start one background task per agent on the running event loop."""
        self.loop = asyncio.get_running_loop()
        # start all agent run loops
        for agent in self.agents.values():
            self.tasks.append(asyncio.create_task(agent.run()))
        self.obs.record_log("coordinator", "started", {"agents": list(self.agents.keys())})

    async def stop(self):
        """Ask every agent to exit, then cancel and await the agent tasks."""
        for a in self.agents.values():
            a.running = False
        # Short grace period before cancelling.
        await asyncio.sleep(0.3)
        for t in self.tasks:
            t.cancel()
        # Wait for the cancellations to take effect so no task is left
        # pending; return_exceptions keeps CancelledError from propagating.
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.tasks.clear()
        self.obs.record_log("coordinator", "stopped", {})

    async def send_to_agent(self, agent_name: str, msg: Dict[str, Any]):
        """Deliver ``msg`` to the named agent's inbox.

        Raises:
            ValueError: If no agent is registered under ``agent_name``.
        """
        if agent_name not in self.agents:
            raise ValueError(f"{agent_name} not registered")
        await self.agents[agent_name].send(msg)
        self.obs.incr("messages_sent")

    def persist_checkpoint(self, session_id: str):
        """Save the session to disk and return the session and memory file paths."""
        # The memory bank is written on every record_run, so only its path is reported.
        session_path = self.session_service.save(session_id)
        memory_path = self.memory.filename
        self.obs.record_log(session_id, "checkpoint_saved", {"session_path": session_path, "memory_path": memory_path})
        return {"session_path": session_path, "memory_path": memory_path}

    def resume_from_checkpoint(self, session_id: str):
        """Reload a previously saved session into the session service."""
        self.session_service.load(session_id)
        self.obs.record_log(session_id, "checkpoint_loaded", {"session_id": session_id})

    # convenience - send initial dataset
    async def ingest(self, payload: Dict[str, Any]):
        """Send ``payload`` to DataIntake under a fresh trace id and return that id."""
        trace_id = str(uuid.uuid4())
        msg = make_message("User", "DataIntake", "ingest", payload, trace_id=trace_id)
        await self.send_to_agent("DataIntake", msg)
        return trace_id


# global coordinator for ease of A2A
# The agent classes forward messages through this module-level name
# (coordinator.send_to_agent), so it has to exist before any agent handles a
# message. Constructing it has side effects: the session service creates its
# storage directory and the memory bank reads its JSON file if one exists.
coordinator = Coordinator()
# Agents are registered in a separate step; until then send_to_agent rejects every name.
coordinator.setup_agents()

# -----------------------
# Agent Evaluation Module
# -----------------------

def evaluate_session_metrics(memory: MemoryBank, last_n: int = 5) -> Dict[str, Any]:
    """Summarize the memory bank for evaluation purposes.

    Args:
        memory: Memory bank holding the recorded runs.
        last_n: How many of the most recent runs the compact summary covers.

    Returns:
        Dict with ``compact`` (the memory bank's summary string) and
        ``runs_stored`` (total number of recorded runs).
    """
    report: Dict[str, Any] = {}
    report["compact"] = memory.compact_context(last_n=last_n)
    report["runs_stored"] = len(memory.store.get("runs", []))
    return report


# -----------------------
# Demo / Example usage
# -----------------------

# Example input for the demo: two shipments, each with a three-waypoint path of
# (lat, lon) tuples plus its vehicle name, speed in km/h and per-stop minutes.
# demo_run forwards only the "shipments" list; "constraints" is not read by
# any code in this file.
SAMPLE_PAYLOAD = {
    "shipments": [
        {"id":"S1", "path":[(12.9716,77.5946),(12.9352,77.6245),(12.9081,77.6476)], "vehicle":"truckA", "speed_kmh":40.0, "stops_minutes":12},
        {"id":"S2", "path":[(12.9716,77.5946),(13.0358,77.5970),(13.0500,77.6000)], "vehicle":"truckB", "speed_kmh":45.0, "stops_minutes":8}
    ],
    "constraints": {"max_fuel_l": 100}
}


async def demo_run():
    """Run the sample payload through the pipeline, checkpoint it, print an evaluation.

    The agents are stopped even when one of the steps raises.
    """
    await coordinator.start()
    try:
        await coordinator.ingest({"shipments": SAMPLE_PAYLOAD["shipments"]})
        # allow some time for pipeline to process
        await asyncio.sleep(2.0)
        # checkpoint session for pause/resume demo
        # dicts keep insertion order, so the last key is the newest session
        session_ids = list(coordinator.session_service.sessions)
        if session_ids:
            cp = coordinator.persist_checkpoint(session_ids[-1])
            print("Checkpoint saved:", cp)
        else:
            # Intake failed or has not run yet: nothing to checkpoint.
            print("No session was created; skipping checkpoint")
        # run evaluator
        eval_res = evaluate_session_metrics(coordinator.memory, last_n=3)
        print("Evaluation compact:", eval_res)
    finally:
        # stop agents
        await coordinator.stop()


if __name__ == "__main__":
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running (plain script): asyncio.run owns the loop.
        try:
            asyncio.run(demo_run())
        except KeyboardInterrupt:
            print("Interrupted")
    else:
        # A loop is already running (e.g. a notebook kernel), where
        # asyncio.run() would raise RuntimeError. Schedule the demo on that
        # loop instead and keep a reference so the task is not garbage-collected.
        _demo_task = asyncio.ensure_future(demo_run())
