# FirstLight LLM Control Plane -- MAIA Analytics Demo

**Geospatial Event Intelligence, steered by AI.**

---

FirstLight converts **(area, time window, event type)** into actionable decision products
-- flood extent maps, damage assessments, infrastructure impact reports -- using satellite imagery
and contextual data.

What makes FirstLight different from a traditional geospatial pipeline is its **LLM Control Plane**:
an API surface purpose-built for AI agents to **discover**, **submit**, **steer**, and **audit**
analysis jobs without human intervention.

This notebook walks through the full platform capability using a **Houston, TX flood detection**
scenario.

### What We Will Cover

| Section | Capability | Why It Matters |
|---------|-----------|----------------|
| 1 | Setup & Health Check | Verify the deployment is live |
| 2 | LLM Router (Tool Discovery) | AI agents discover algorithms at runtime -- no hardcoded knowledge |
| 3 | Job Creation | Submit analysis with AOI geometry + LLM reasoning |
| 4 | Pipeline Phase Transitions | Atomic state machine with TOCTOU concurrency guards |
| 5 | LLM Reasoning Injection | AI records its chain of thought with confidence scores |
| 6 | Parameter Tuning | Adjust algorithms mid-flight via JSON merge-patch |
| 7 | Escalation Workflow | Human-in-the-loop when AI confidence is low |
| 8 | Context Lakehouse | Spatial queries across accumulated satellite data, buildings, infrastructure, weather |
| 9 | Partner Integration (Metrics & Queue) | Pipeline health dashboard data |
| 10 | SSE Event Stream | Real-time event delivery with CloudEvents v1.0 |
| 11 | Standards: OGC & STAC | Interoperability with GIS tools and open geospatial standards |
| 12 | Summary | Architecture recap and endpoint map |

### API Surfaces

| Prefix | Audience | Purpose |
|--------|----------|--------|
| `/control/v1/*` | LLM Agents (MAIA) | Job lifecycle, reasoning, escalations, tool discovery |
| `/internal/v1/*` | Partner Backend | SSE events, webhooks, metrics, queue status |
| `/oapi/*` | GIS Tools | OGC API Processes -- standards-compliant interface |
| `/stac/*` | Any Client | STAC Catalog -- discover published analysis results |

---

## 1. Setup & Configuration

Configure the API base URL, authentication, and helper functions.

Authentication uses the **`X-API-Key`** header -- a simple, stateless key-based scheme
that the server maps to a tenant context. Each API key is scoped to a single
customer, which enforces multi-tenant isolation at the middleware layer.

In [None]:
import json
import requests
from datetime import datetime, timezone
from IPython.display import display, Markdown, HTML

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
BASE_URL = "http://localhost:8000"
API_KEY = "demo-18254ee7d18f5926"

HEADERS = {
    "X-API-Key": API_KEY,
    "Content-Type": "application/json",
}

# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------

def api_get(path, params=None):
    """Authenticated GET request. Returns (status_code, parsed_json_or_text)."""
    url = f"{BASE_URL}{path}"
    try:
        resp = requests.get(url, headers=HEADERS, params=params, timeout=30)
        try:
            return resp.status_code, resp.json()
        except ValueError:
            return resp.status_code, resp.text
    except requests.exceptions.ConnectionError as e:
        print(f"ERROR: Cannot connect to {url}")
        print(f"Is the FirstLight API running? Start with: uvicorn api.main:app")
        return 0, None


def api_post(path, body=None):
    """Authenticated POST request."""
    url = f"{BASE_URL}{path}"
    try:
        resp = requests.post(url, headers=HEADERS, json=body, timeout=30)
        try:
            return resp.status_code, resp.json()
        except ValueError:
            return resp.status_code, resp.text
    except requests.exceptions.ConnectionError as e:
        print(f"ERROR: Cannot connect to {url}")
        return 0, None


def api_patch(path, body=None):
    """Authenticated PATCH request."""
    url = f"{BASE_URL}{path}"
    try:
        resp = requests.patch(url, headers=HEADERS, json=body, timeout=30)
        try:
            return resp.status_code, resp.json()
        except ValueError:
            return resp.status_code, resp.text
    except requests.exceptions.ConnectionError as e:
        print(f"ERROR: Cannot connect to {url}")
        return 0, None


def api_delete(path):
    """Authenticated DELETE request."""
    url = f"{BASE_URL}{path}"
    try:
        resp = requests.delete(url, headers=HEADERS, timeout=30)
        if resp.status_code == 204:
            return 204, None
        try:
            return resp.status_code, resp.json()
        except ValueError:
            return resp.status_code, resp.text
    except requests.exceptions.ConnectionError as e:
        print(f"ERROR: Cannot connect to {url}")
        return 0, None


def pp(data):
    """Pretty-print JSON data."""
    if data is not None:
        print(json.dumps(data, indent=2, default=str))
    else:
        print("(no data)")


print(f"Target API:  {BASE_URL}")
print(f"API Key:     {API_KEY[:12]}...")
print(f"Timestamp:   {datetime.now(timezone.utc).isoformat()}")
print("\nReady.")

### Health Check

Verify the API is reachable and responsive.

In [None]:
status_code, health = api_get("/api/v1/health")
print(f"HTTP {status_code}")
pp(health)

---

## 2. LLM Router: Tool Discovery

**`GET /control/v1/tools`**

This is where the LLM Control Plane starts. Before an AI agent can submit a job, it needs to
know **what analysis algorithms are available** and **what parameters they accept**.

FirstLight returns tool schemas in **OpenAI function-calling format**: a name, a description,
and a JSON Schema object describing the parameters. Other LLM tool-use APIs (Claude, Gemini)
accept JSON Schema-style parameter definitions too, under slightly different envelope keys, so
MAIA can discover FirstLight's capabilities at runtime without any hardcoded knowledge of the platform.

> **Why This Matters:** Traditional geospatial platforms require custom integrations per algorithm.
> With the LLM Router, adding a new algorithm to FirstLight automatically makes it available
> to every connected AI agent -- zero integration work on the partner side.

In [None]:
status_code, tools_response = api_get("/control/v1/tools")
print(f"HTTP {status_code}\n")

if status_code == 200 and tools_response:
    tools = tools_response.get("tools", [])
    print(f"Discovered {len(tools)} tool(s):\n")
    
    for tool in tools:
        print(f"  Name:        {tool.get('name')}")
        print(f"  Description: {tool.get('description', '')[:120]}")
        params = tool.get('parameters', {})
        prop_names = list(params.get('properties', {}).keys())
        if prop_names:
            print(f"  Parameters:  {', '.join(prop_names)}")
        print()
else:
    print("Could not retrieve tools.")
    pp(tools_response)

### How an LLM Uses These Schemas

Each tool schema follows the OpenAI function-calling convention. Here is how it plugs into
an LLM's tool-use system:

In [None]:
# Show the full schema of the first tool as an example
if tools_response and tools_response.get("tools"):
    example_tool = tools_response["tools"][0]
    
    print("=" * 70)
    print("EXAMPLE: Full tool schema (OpenAI function-calling format)")
    print("=" * 70)
    pp(example_tool)
    
    print("\n" + "=" * 70)
    print("How this would appear in an LLM API call:")
    print("=" * 70)
    
    # Show how this maps to the OpenAI tools parameter
    openai_format = {
        "type": "function",
        "function": example_tool
    }
    print("\n# In an OpenAI Chat Completions call, this tool schema becomes:")
    pp(openai_format)
    
    print("\n# The LLM would call it like:")
    print(f'# tool_call(name="{example_tool["name"]}", arguments={{...}})')
else:
    print("No tools available to display.")
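
The `{"type": "function", "function": ...}` wrapper shown above is the OpenAI Chat Completions envelope. Other providers key the same schema differently; the Anthropic Messages API, for example, expects `name`, `description`, and `input_schema`. The sketch below adapts one schema to both shapes. It assumes each FirstLight tool entry carries `name`, `description`, and `parameters` keys (as the discovery cell above reads them); `sample_tool` is a hypothetical entry, not a real FirstLight algorithm.

```python
def to_openai_tool(tool: dict) -> dict:
    """Wrap a {name, description, parameters} schema for OpenAI Chat Completions."""
    return {"type": "function", "function": tool}


def to_anthropic_tool(tool: dict) -> dict:
    """Re-key the same schema for the Anthropic Messages API (input_schema)."""
    return {
        "name": tool["name"],
        "description": tool.get("description", ""),
        "input_schema": tool.get("parameters", {"type": "object", "properties": {}}),
    }


# Hypothetical tool entry, shaped like the ones printed by the discovery cell.
sample_tool = {
    "name": "flood_detection",
    "description": "Detect flood extent from SAR and optical imagery.",
    "parameters": {
        "type": "object",
        "properties": {"sensitivity": {"type": "string"}},
        "required": [],
    },
}

openai_tool = to_openai_tool(sample_tool)
anthropic_tool = to_anthropic_tool(sample_tool)
print(openai_tool["function"]["name"])
print(sorted(anthropic_tool))
```

Keeping the adapters this thin means a new algorithm on the FirstLight side needs no per-provider work beyond the re-keying.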

---

## 3. Create an Analysis Job

**`POST /control/v1/jobs`**

### Scenario: Houston Flood Detection

MAIA's NLP pipeline has detected flood-related NOAA weather alerts in the Houston, TX area.
Multiple river gauges along Buffalo Bayou are reporting above-flood-stage water levels.

The AI agent creates a flood detection job covering the **Houston Ship Channel / Buffalo Bayou**
area -- approximately 43 km² (about 7.7 km by 5.6 km) of historically flood-prone terrain.

The AOI (Area of Interest) is a GeoJSON Polygon in **WGS84 (EPSG:4326)**.

> **Why This Matters:** The job creation request carries not just the technical parameters,
> but also the AI's **reasoning** for initiating the analysis. This creates an audit trail
> from the very first decision -- why did the AI think a flood analysis was needed?

In [None]:
# Houston Ship Channel / Buffalo Bayou AOI
# WGS84 (EPSG:4326), ~43 km² of historically flood-prone area
HOUSTON_AOI = {
    "type": "Polygon",
    "coordinates": [[
        [-95.3698, 29.7604],   # NW corner -- near downtown Houston
        [-95.2900, 29.7604],   # NE corner -- east of Ship Channel
        [-95.2900, 29.7100],   # SE corner -- south of Turning Basin
        [-95.3698, 29.7100],   # SW corner
        [-95.3698, 29.7604],   # close the ring
    ]],
}

create_body = {
    "event_type": "flood",
    "aoi": HOUSTON_AOI,
    "parameters": {
        "sensitivity": "medium",
        "min_area_km2": 0.1,
        "include_sar": True,
    },
    "reasoning": (
        "Detected potential flood event in Houston area based on "
        "NOAA Weather Alert WEA-2026-0451. Multiple river gauges "
        "reporting above-flood-stage levels along Buffalo Bayou. "
        "Initiating SAR and optical flood detection analysis."
    ),
}

print("Request body:")
pp(create_body)
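
Before submitting, it can be worth sanity-checking the AOI locally: that the ring is closed and that its size is in the expected range. The sketch below is one way to do that, using a shoelace area on a local equirectangular projection. This is an approximation that is reasonable for city-scale polygons, not the method the server uses to compute area. The helper names are illustrative only.

```python
import math

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius


def ring_is_closed(ring):
    """A GeoJSON linear ring needs at least 4 positions, first equal to last."""
    return len(ring) >= 4 and ring[0] == ring[-1]


def approx_area_km2(ring):
    """Shoelace area on a local equirectangular projection (small AOIs only)."""
    # Scale longitude by cos(mean latitude) so both axes are in kilometres.
    lat0 = math.radians(sum(lat for _, lat in ring[:-1]) / (len(ring) - 1))
    km_per_deg = math.pi * EARTH_RADIUS_KM / 180.0
    pts = [(lon * km_per_deg * math.cos(lat0), lat * km_per_deg) for lon, lat in ring]
    twice_area = sum(x1 * y2 - x2 * y1 for (x1, y1), (x2, y2) in zip(pts, pts[1:]))
    return abs(twice_area) / 2.0


# Same exterior ring as HOUSTON_AOI above.
ring = [
    [-95.3698, 29.7604],
    [-95.2900, 29.7604],
    [-95.2900, 29.7100],
    [-95.3698, 29.7100],
    [-95.3698, 29.7604],
]
assert ring_is_closed(ring)
area_km2 = approx_area_km2(ring)
print(f"Approximate AOI area: {area_km2:.1f} km2")
```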

In [None]:
status_code, job_response = api_post("/control/v1/jobs", create_body)
print(f"HTTP {status_code}\n")
pp(job_response)

# Extract the job_id for use in subsequent calls
JOB_ID = None
if status_code == 201 and job_response:
    JOB_ID = job_response.get("job_id")
    print(f"\nJob ID: {JOB_ID}")
    print(f"Phase:  {job_response.get('phase')}")
    print(f"Status: {job_response.get('status')}")
else:
    print("\nJob creation failed. Check API connectivity and authentication.")

### Retrieve Full Job Detail

**`GET /control/v1/jobs/{job_id}`**

The detail view includes the stored AOI geometry, computed area in km², current parameters,
and hypermedia links to related resources (events, checkpoints).

In [None]:
if JOB_ID:
    status_code, detail = api_get(f"/control/v1/jobs/{JOB_ID}")
    print(f"HTTP {status_code}\n")
    pp(detail)
else:
    print("No job ID available -- skipping.")

---

## 4. Pipeline Phase Transitions

**`POST /control/v1/jobs/{job_id}/transition`**

A job moves through seven phases:

```
QUEUED --> DISCOVERING --> INGESTING --> NORMALIZING --> ANALYZING --> REPORTING --> COMPLETE
```

Each transition is **atomic** and protected by a **TOCTOU (Time-of-Check-Time-of-Use) guard**.
The caller must specify the **expected** current state and the **target** state. If another actor
(another AI agent, a human operator, or the pipeline itself) has already moved the job, the
transition fails with **HTTP 409 Conflict** instead of silently corrupting state.

```json
{
  "expected_phase": "QUEUED",
  "expected_status": "PENDING",
  "target_phase": "QUEUED",
  "target_status": "VALIDATING",
  "reason": "Starting input validation"
}
```

> **Why This Matters:** In a multi-agent system, multiple AI agents might try to advance the
> same job simultaneously. The TOCTOU guard ensures exactly-once state transitions -- the
> foundation for reliable distributed pipelines.

In [None]:
# Define the full transition path from QUEUED/PENDING through to ANALYZING/ANALYZING
# We will complete the pipeline in a later section after reasoning and escalation.

TRANSITIONS = [
    {
        "expected_phase": "QUEUED", "expected_status": "PENDING",
        "target_phase": "QUEUED", "target_status": "VALIDATING",
        "reason": "Validating input parameters and AOI geometry",
    },
    {
        "expected_phase": "QUEUED", "expected_status": "VALIDATING",
        "target_phase": "QUEUED", "target_status": "VALIDATED",
        "reason": "AOI is a valid WGS84 polygon with reasonable bounds",
    },
    {
        "expected_phase": "QUEUED", "expected_status": "VALIDATED",
        "target_phase": "DISCOVERING", "target_status": "DISCOVERING",
        "reason": "Searching satellite data catalogs (STAC, Copernicus, USGS)",
    },
    {
        "expected_phase": "DISCOVERING", "expected_status": "DISCOVERING",
        "target_phase": "DISCOVERING", "target_status": "DISCOVERED",
        "reason": "Found 3 Sentinel-1 GRD and 2 Sentinel-2 L2A scenes",
    },
    {
        "expected_phase": "DISCOVERING", "expected_status": "DISCOVERED",
        "target_phase": "INGESTING", "target_status": "INGESTING",
        "reason": "Downloading and staging satellite imagery from AWS S3",
    },
    {
        "expected_phase": "INGESTING", "expected_status": "INGESTING",
        "target_phase": "INGESTING", "target_status": "INGESTED",
        "reason": "5 scenes staged, checksums verified (2.3 GB total)",
    },
    {
        "expected_phase": "INGESTING", "expected_status": "INGESTED",
        "target_phase": "NORMALIZING", "target_status": "NORMALIZING",
        "reason": "Band alignment, CRS reprojection to UTM 15N (EPSG:32615)",
    },
    {
        "expected_phase": "NORMALIZING", "expected_status": "NORMALIZING",
        "target_phase": "NORMALIZING", "target_status": "NORMALIZED",
        "reason": "All scenes co-registered, resampled to 10m, radiometrically calibrated",
    },
    {
        "expected_phase": "NORMALIZING", "expected_status": "NORMALIZED",
        "target_phase": "ANALYZING", "target_status": "ANALYZING",
        "reason": "Running flood detection: SAR coherence + NDWI + ML classifier",
    },
]

print(f"Walking through {len(TRANSITIONS)} transitions...\n")

if JOB_ID:
    for i, t in enumerate(TRANSITIONS, 1):
        sc, body = api_post(f"/control/v1/jobs/{JOB_ID}/transition", t)
        phase = body.get("phase", "?") if isinstance(body, dict) else "?"
        status_val = body.get("status", "?") if isinstance(body, dict) else "?"
        
        marker = "OK" if 200 <= sc < 300 else f"ERR {sc}"
        print(f"  [{marker}] {i:2d}. {t['expected_phase']}/{t['expected_status']} "
              f"--> {phase}/{status_val}")
        print(f"       Reason: {t['reason']}")
    
    print(f"\nJob is now at: {phase}/{status_val}")
else:
    print("No job ID available -- skipping transitions.")
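
The guard described above is a compare-and-set: the check of the expected state and the write of the target state happen as one indivisible step. The toy model below illustrates the idea in memory with a lock. It is only a sketch; `JobState` and `TransitionConflict` are hypothetical names, and a real server would more likely enforce this in a single conditional database update.

```python
import threading


class TransitionConflict(Exception):
    """Raised when the job is no longer in the state the caller expected."""


class JobState:
    """Toy in-memory job record used to illustrate the guard."""

    def __init__(self, phase="QUEUED", status="PENDING"):
        self.phase, self.status = phase, status
        self._lock = threading.Lock()

    def transition(self, expected_phase, expected_status, target_phase, target_status):
        # Check and update happen under one lock, so no other caller can
        # slip in between the comparison and the write.
        with self._lock:
            if (self.phase, self.status) != (expected_phase, expected_status):
                raise TransitionConflict(
                    f"expected {expected_phase}/{expected_status}, "
                    f"found {self.phase}/{self.status}"
                )
            self.phase, self.status = target_phase, target_status
            return self.phase, self.status


job = JobState()
first = job.transition("QUEUED", "PENDING", "QUEUED", "VALIDATING")

# A second agent replays the same request with a now-stale expectation.
try:
    job.transition("QUEUED", "PENDING", "QUEUED", "VALIDATING")
    second_outcome = "applied"
except TransitionConflict as exc:
    second_outcome = f"409 Conflict: {exc}"

print(first)
print(second_outcome)
```

On a 409, a client would typically re-read the job with `GET /control/v1/jobs/{job_id}` and decide whether its intended transition still makes sense.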

---

## 5. LLM Reasoning Injection

**`POST /control/v1/jobs/{job_id}/reasoning`**

At any point during the job lifecycle, an AI agent can inject its **analytical reasoning** into
the event log. Each reasoning entry includes:

- **reasoning** -- Free-text chain of thought (up to 64 KB)
- **confidence** -- Numeric score (0.0 to 1.0)
- **payload** -- Structured data supporting the reasoning

These entries become part of the immutable event log alongside state transitions, creating
a complete audit trail of both **algorithmic results** and **AI interpretation**.

> **Why This Matters:** When a flood map goes to an emergency manager, they need to know
> not just *what* the algorithm found, but *why* the AI decided this was significant.
> Reasoning entries close that gap.

In [None]:
# Reasoning 1: SAR coherence analysis interpretation

reasoning_sar = {
    "reasoning": (
        "SAR coherence analysis indicates significant ground displacement in grid "
        "cells 4-7, consistent with surface water accumulation. Coherence drop from "
        "0.85 to 0.23 in the Buffalo Bayou corridor. Cross-referencing with USGS "
        "gauge station 08074000 confirms water levels 4.2 ft above flood stage. "
        "Confidence: HIGH. Recommending focused analysis on eastern quadrant where "
        "displacement is greatest."
    ),
    "confidence": 0.85,
    "payload": {
        "affected_grid_cells": [4, 5, 6, 7],
        "coherence_drop": {"from": 0.85, "to": 0.23},
        "gauge_station": "USGS-08074000",
        "water_level_above_flood_stage_ft": 4.2,
    },
}

if JOB_ID:
    sc, body = api_post(f"/control/v1/jobs/{JOB_ID}/reasoning", reasoning_sar)
    print(f"HTTP {sc}")
    pp(body)
else:
    print("No job ID available.")
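
Because the limits on a reasoning entry are documented (confidence in 0.0 to 1.0, reasoning text up to 64 KB), an agent can check them before posting. The helper below is a sketch of that client-side check. `build_reasoning_entry` is a hypothetical name, and it assumes "64 KB" means 65,536 bytes of UTF-8; the server's exact rule may differ.

```python
MAX_REASONING_BYTES = 64 * 1024  # assumption: 64 KB = 65,536 bytes of UTF-8


def build_reasoning_entry(reasoning, confidence, payload=None):
    """Validate the documented limits locally and return a request body."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError(f"confidence must be in [0.0, 1.0], got {confidence}")
    size = len(reasoning.encode("utf-8"))
    if size > MAX_REASONING_BYTES:
        raise ValueError(f"reasoning is {size} bytes, limit is {MAX_REASONING_BYTES}")
    return {"reasoning": reasoning, "confidence": confidence, "payload": payload or {}}


entry = build_reasoning_entry(
    "Coherence drop from 0.85 to 0.23 in the Buffalo Bayou corridor.",
    0.85,
    {"coherence_drop": {"from": 0.85, "to": 0.23}},
)
print(entry["confidence"])
```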

In [None]:
# Transition to QUALITY_CHECK substatus

if JOB_ID:
    sc, body = api_post(f"/control/v1/jobs/{JOB_ID}/transition", {
        "expected_phase": "ANALYZING",
        "expected_status": "ANALYZING",
        "target_phase": "ANALYZING",
        "target_status": "QUALITY_CHECK",
        "reason": "Running QA checks on detection output",
    })
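    if 200 <= sc < 300:
        print(f"HTTP {sc} -- Transitioned to ANALYZING/QUALITY_CHECK")
    else:
        print(f"HTTP {sc} -- Transition to ANALYZING/QUALITY_CHECK failed")
    pp(body)
else:
    print("No job ID available.")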

---

## 6. Parameter Tuning Mid-Flight

**`PATCH /control/v1/jobs/{job_id}/parameters`**

Based on intermediate results, the AI agent can **adjust algorithm parameters without restarting
the job**. This uses **JSON merge-patch** semantics (RFC 7396):

- Send only the keys you want to change
- Set a key to `null` to remove it
- Unmentioned keys are preserved

Here, MAIA raises sensitivity to `high`, sets the change threshold to 0.15, and focuses the
analysis on the NE quadrant after observing strong SAR coherence signals in the eastern quadrant.

> **Why This Matters:** Traditional pipelines require resubmission to change parameters.
> Mid-flight tuning lets the AI react to intermediate results in real time -- adjusting
> the analysis as evidence accumulates, not after the fact.

In [None]:
parameter_patch = {
    "sensitivity": "high",
    "min_change_threshold": 0.15,
    "focus_quadrant": "NE",
}

print("Patch body (only changed keys):")
pp(parameter_patch)

if JOB_ID:
    sc, body = api_patch(f"/control/v1/jobs/{JOB_ID}/parameters", parameter_patch)
    print(f"\nHTTP {sc}")
    print("\nResulting merged parameters:")
    pp(body)
else:
    print("\nNo job ID available.")
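
To preview what the server should return, the merge can be reproduced locally. The function below is a small implementation of the JSON Merge Patch algorithm from RFC 7396 (which obsoletes RFC 7386), applied to the parameters used in this notebook. It is a sketch for reasoning about the result, not FirstLight's server code.

```python
def json_merge_patch(target, patch):
    """Apply an RFC 7396 merge patch and return the result (inputs are not mutated)."""
    if not isinstance(patch, dict):
        return patch  # a non-object patch replaces the target wholesale
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # null removes the key
        else:
            result[key] = json_merge_patch(result.get(key), value)
    return result


original = {"sensitivity": "medium", "min_area_km2": 0.1, "include_sar": True}
patch = {"sensitivity": "high", "min_change_threshold": 0.15, "focus_quadrant": "NE"}

merged = json_merge_patch(original, patch)
without_focus = json_merge_patch(merged, {"focus_quadrant": None})

print(merged)
print(without_focus)
```

One consequence of these semantics is that a parameter can never be set to an explicit `null`, since `null` always means "remove".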

---

## 7. Escalation Workflow

**`POST /control/v1/jobs/{job_id}/escalations`** -- Create escalation  
**`PATCH /control/v1/jobs/{job_id}/escalations/{eid}`** -- Resolve escalation

When the AI encounters something **outside its confidence threshold**, it escalates to a
human operator. Escalations have:

- **severity** -- `LOW`, `MEDIUM`, `HIGH`, `CRITICAL`
- **reason** -- Why the AI is uncertain
- **context** -- Structured data to help the human reviewer

The job **continues processing** while awaiting human input -- escalation is not a stop signal,
it is a parallel review request.

### Scenario

MAIA detects an anomalous SAR reflectance pattern in grid cell 6 that could indicate either:
- **(a)** Genuine flood extent beyond historical norms, or
- **(b)** A SAR sensor artifact from wind-roughened water surface

Confidence is 0.62 -- below the 0.75 autonomous reporting threshold.

> **Why This Matters:** This is the **human-in-the-loop** pattern done right. The AI does not
> stop and wait -- it flags the issue, provides structured context, and lets the human resolve
> it when ready. The full escalation lifecycle is recorded in the audit trail.

In [None]:
# Create the escalation

escalation_body = {
    "severity": "HIGH",
    "reason": (
        "Detected anomalous reflectance pattern in grid cell 6 that could indicate "
        "either (a) genuine flash flood extent beyond historical norms or (b) SAR "
        "sensor artifact from wind-roughened water surface. Confidence in flood "
        "classification: 0.62 -- below the 0.75 threshold for autonomous reporting. "
        "Requesting human review of SAR backscatter imagery before finalizing results."
    ),
    "context": {
        "grid_cell": 6,
        "confidence": 0.62,
        "threshold": 0.75,
        "possible_causes": [
            "genuine_flood_extent",
            "wind_roughened_surface_artifact",
        ],
        "recommended_action": "Review SAR backscatter in QuickLook viewer",
    },
}

ESCALATION_ID = None

if JOB_ID:
    sc, body = api_post(f"/control/v1/jobs/{JOB_ID}/escalations", escalation_body)
    print(f"HTTP {sc}")
    pp(body)
    
    if isinstance(body, dict):
        ESCALATION_ID = body.get("escalation_id")
        print(f"\nEscalation ID: {ESCALATION_ID}")
        print(f"Severity:      {body.get('severity')}")
else:
    print("No job ID available.")
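
On the agent side, the decision to escalate reduces to a threshold comparison. The sketch below shows one way to turn a confidence score into an escalation body. The 0.75 threshold comes from the scenario above; the severity rule (gap of 0.10 or more maps to `HIGH`) and the function name are hypothetical choices made for illustration.

```python
AUTONOMOUS_THRESHOLD = 0.75  # threshold quoted in the scenario


def escalation_request(confidence, reason, context=None, threshold=AUTONOMOUS_THRESHOLD):
    """Return an escalation body when confidence is below threshold, else None."""
    if confidence >= threshold:
        return None
    # Hypothetical severity rule: the wider the gap, the higher the severity.
    gap = threshold - confidence
    severity = "HIGH" if gap >= 0.10 else "MEDIUM"
    return {
        "severity": severity,
        "reason": reason,
        "context": {"confidence": confidence, "threshold": threshold, **(context or {})},
    }


low = escalation_request(0.62, "Ambiguous SAR pattern in grid cell 6", {"grid_cell": 6})
high = escalation_request(0.91, "Flood extent confirmed")

print(low["severity"] if low else "no escalation")
print(high)
```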

In [None]:
# Resolve the escalation -- human operator confirms genuine flood extent

resolve_body = {
    "resolution": (
        "Reviewed SAR backscatter in QuickLook viewer. Pattern in grid cell 6 is "
        "consistent with genuine flood extent -- the irregular boundary matches the "
        "Greens Bayou overflow pattern from historical events. Wind artifact ruled out "
        "based on VV/VH polarization ratio analysis. Classification confidence upgraded "
        "to 0.91. Approved for reporting."
    ),
}

if JOB_ID and ESCALATION_ID:
    sc, body = api_patch(
        f"/control/v1/jobs/{JOB_ID}/escalations/{ESCALATION_ID}",
        resolve_body,
    )
    print(f"HTTP {sc}")
    pp(body)
    
    if isinstance(body, dict) and body.get("resolved_at"):
        print(f"\nEscalation resolved at: {body['resolved_at']}")
else:
    print("No escalation to resolve.")

### Complete the Pipeline

With the escalation resolved and confidence upgraded, we walk through the final pipeline
stages to **COMPLETE**.

In [None]:
# Final transitions: QUALITY_CHECK --> ANALYZED --> REPORTING --> ASSEMBLING --> REPORTED --> COMPLETE

FINAL_TRANSITIONS = [
    {
        "expected_phase": "ANALYZING", "expected_status": "QUALITY_CHECK",
        "target_phase": "ANALYZING", "target_status": "ANALYZED",
        "reason": "Analysis complete -- flood extent map generated",
    },
    {
        "expected_phase": "ANALYZING", "expected_status": "ANALYZED",
        "target_phase": "REPORTING", "target_status": "REPORTING",
        "reason": "Generating analysis products and report",
    },
    {
        "expected_phase": "REPORTING", "expected_status": "REPORTING",
        "target_phase": "REPORTING", "target_status": "ASSEMBLING",
        "reason": "Assembling final deliverables (GeoTIFF, GeoJSON, PDF)",
    },
    {
        "expected_phase": "REPORTING", "expected_status": "ASSEMBLING",
        "target_phase": "REPORTING", "target_status": "REPORTED",
        "reason": "Report assembled and validated",
    },
    {
        "expected_phase": "REPORTING", "expected_status": "REPORTED",
        "target_phase": "COMPLETE", "target_status": "COMPLETE",
        "reason": "Job complete -- results published to STAC catalog",
    },
]

if JOB_ID:
    # Inject final reasoning before completing
    reasoning_final = {
        "reasoning": (
            "Flood detection analysis complete for Houston Ship Channel area. "
            "Key findings: (1) Buffalo Bayou overflow detected along 4.2 km stretch "
            "between Waugh Dr and US-59. (2) Estimated flood extent: 3.8 km2. "
            "(3) 94 structures identified within flood boundary using building footprint "
            "overlay. (4) SAR coherence method corroborated by NDWI optical analysis with "
            "87% spatial agreement. Confidence in overall assessment: 0.91."
        ),
        "confidence": 0.91,
        "payload": {
            "flood_extent_km2": 3.8,
            "affected_structures": 94,
            "sar_optical_agreement": 0.87,
            "primary_method": "SAR coherence change detection",
            "corroboration": "NDWI optical thresholding",
        },
    }
    sc, body = api_post(f"/control/v1/jobs/{JOB_ID}/reasoning", reasoning_final)
    print(f"Final reasoning recorded: HTTP {sc}")
    if isinstance(body, dict):
        print(f"  Event sequence: {body.get('event_seq')}")
    print()
    
    # Walk through final transitions
    for t in FINAL_TRANSITIONS:
        sc, body = api_post(f"/control/v1/jobs/{JOB_ID}/transition", t)
        phase = body.get("phase", "?") if isinstance(body, dict) else "?"
        status_val = body.get("status", "?") if isinstance(body, dict) else "?"
        marker = "OK" if 200 <= sc < 300 else f"ERR {sc}"
        print(f"  [{marker}] {phase}/{status_val} -- {t['reason']}")
    
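    if (phase, status_val) == ("COMPLETE", "COMPLETE"):
        print("\nPipeline COMPLETE.")
    else:
        print(f"\nPipeline stopped at {phase}/{status_val} -- check the errors above.")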
else:
    print("No job ID available.")

---

## 8. Context Lakehouse

The Context Lakehouse is PostGIS-backed storage that accumulates **all contextual data**
across analysis jobs -- satellite datasets, building footprints, critical infrastructure,
and weather observations.

This is what turns individual analyses into **compound intelligence**. When a second flood
event hits the same area, the satellite scenes and building data from the first analysis
are already cached -- reducing ingestion time and enabling temporal comparisons.

All endpoints accept **spatial filters** (bbox) and return paginated results.

> **Why This Matters:** MAIA does not just get analysis results -- it gets the raw spatial
> context to perform its own reasoning. "37 buildings in the flood extent", "2 hospitals
> within 2 km", "12,400 population affected" -- these decision products come from
> spatial joins against the lakehouse.

### 8a. Query Datasets

**`GET /control/v1/context/datasets`**

Returns accumulated satellite scene metadata: source catalog, footprint geometry, bands,
resolution, cloud cover, acquisition date. Supports spatial (bbox) and temporal (date range)
filters.

In [None]:
# Query datasets in the Houston AOI bbox
HOUSTON_BBOX = "-95.3698,29.7100,-95.2900,29.7604"

sc, body = api_get("/control/v1/context/datasets", params={"bbox": HOUSTON_BBOX, "page_size": 5})
print(f"HTTP {sc}")
pp(body)

### 8b. Query Buildings

**`GET /control/v1/context/buildings`**

Returns building footprints from OpenStreetMap and other sources within the specified bbox.

In [None]:
sc, body = api_get("/control/v1/context/buildings", params={"bbox": HOUSTON_BBOX, "page_size": 5})
print(f"HTTP {sc}")
pp(body)

### 8c. Query Critical Infrastructure

**`GET /control/v1/context/infrastructure`**

Returns hospitals, fire stations, schools, and other critical facilities. Supports
filtering by type (e.g., `?type=hospital`).

In [None]:
sc, body = api_get("/control/v1/context/infrastructure", params={"bbox": HOUSTON_BBOX, "page_size": 5})
print(f"HTTP {sc}")
pp(body)

### 8d. Query Weather Observations

**`GET /control/v1/context/weather`**

Returns weather observations (rain gauges, NOAA alerts, station data) with spatial and
temporal filtering.

In [None]:
sc, body = api_get("/control/v1/context/weather", params={"bbox": HOUSTON_BBOX, "page_size": 5})
print(f"HTTP {sc}")
pp(body)

### 8e. Lakehouse Summary

**`GET /control/v1/context/summary`**

Returns aggregate statistics across the entire lakehouse: row counts per table,
spatial extent, distinct data sources, and usage statistics.

In [None]:
sc, body = api_get("/control/v1/context/summary")
print(f"HTTP {sc}")
pp(body)

### 8f. Per-Job Context Usage

**`GET /control/v1/jobs/{job_id}/context`**

Shows how much context data was **ingested** (fetched fresh) versus **reused** (already in
the lakehouse from a previous job) for a specific job. This is the key metric for the
lakehouse effect -- over time, reuse increases and ingestion costs decrease.

In [None]:
if JOB_ID:
    sc, body = api_get(f"/control/v1/jobs/{JOB_ID}/context")
    print(f"HTTP {sc}")
    pp(body)
else:
    print("No job ID available.")

---

## 9. Partner Integration: Metrics & Queue

These endpoints are designed for MAIA's **operations dashboard** -- real-time visibility
into pipeline health.

Metrics are served from **materialized views** that refresh every 30 seconds. Responses
therefore come back in under 100 ms regardless of how many jobs are in the system, at the
cost of figures that can be up to 30 seconds stale. The dashboard can poll every few
seconds without impacting API performance.

> **Why This Matters:** MAIA's operations team needs to know at a glance: How many jobs
> completed? How many failed? What is the P95 latency? Are any jobs stuck? These
> endpoints provide that view with zero computation at query time.

### 9a. Pipeline Health Metrics

**`GET /internal/v1/metrics`**

Aggregated metrics: jobs completed/failed (1h and 24h windows), P50/P95 job duration,
active SSE connections, and webhook delivery success rate.

In [None]:
sc, body = api_get("/internal/v1/metrics")
print(f"HTTP {sc}")
pp(body)

### 9b. Queue Summary

**`GET /internal/v1/queue/summary`**

Per-phase job counts, stuck job count, and jobs awaiting human review (open escalations).

In [None]:
sc, body = api_get("/internal/v1/queue/summary")
print(f"HTTP {sc}")
pp(body)

---

## 10. SSE Event Stream

**`GET /internal/v1/events/stream`**

FirstLight provides a **Server-Sent Events (SSE)** endpoint for real-time event delivery.
Every state transition, reasoning entry, escalation, and parameter change is broadcast
as a **CloudEvents v1.0** envelope.

### Capabilities

| Feature | Description |
|---------|-------------|
| **Last-Event-ID** | Reconnect and replay missed events from a specific sequence number |
| **customer_id scoping** | Multi-tenant isolation -- each customer only sees their events |
| **Event type filtering** | `?type=job.transition` to subscribe to specific event types |
| **Backpressure** | Max 500 buffered events per connection -- slow consumers are disconnected gracefully |
| **Heartbeat** | 30-second keepalive comments to detect dead connections |

The SSE endpoint is best consumed from a **backend service or CLI** rather than a notebook.
Here is how to connect:

In [None]:
# SSE is a long-lived streaming connection -- not practical to run in a notebook cell.
# Here is how to connect from a backend or CLI:

curl_command = f"""# Connect to SSE event stream (run in terminal)
curl -N -H "X-API-Key: {API_KEY}" \\
     -H "Accept: text/event-stream" \\
     "{BASE_URL}/internal/v1/events/stream"

# With Last-Event-ID for replay after reconnection:
curl -N -H "X-API-Key: {API_KEY}" \\
     -H "Accept: text/event-stream" \\
     -H "Last-Event-ID: 42" \\
     "{BASE_URL}/internal/v1/events/stream"

# Filtered by event type:
curl -N -H "X-API-Key: {API_KEY}" \\
     -H "Accept: text/event-stream" \\
     "{BASE_URL}/internal/v1/events/stream?type=job.transition"
"""

print(curl_command)

print("\n" + "=" * 70)
print("CloudEvents v1.0 envelope example:")
print("=" * 70)

example_event = {
    "specversion": "1.0",
    "id": "evt-42",
    "source": "firstlight/control-plane",
    "type": "job.transition",
    "subject": JOB_ID or "<job-uuid>",
    "time": datetime.now(timezone.utc).isoformat(),
    "datacontenttype": "application/json",
    "data": {
        "job_id": JOB_ID or "<job-uuid>",
        "phase": "ANALYZING",
        "status": "QUALITY_CHECK",
        "actor": "maia-agent",
        "reasoning": "Running QA checks on detection output",
    },
}
pp(example_event)
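
For a backend consumer, the core of an SSE client is a small line parser plus bookkeeping of the last seen event ID, which is what gets sent back as `Last-Event-ID` on reconnect. The sketch below parses the standard `text/event-stream` framing and checks the CloudEvents v1.0 required attributes. It assumes the server sets the SSE `id:` field to the event sequence number and puts the CloudEvents envelope in `data:`; the sample stream is made up for illustration.

```python
import json

REQUIRED_ATTRIBUTES = {"specversion", "id", "source", "type"}  # CloudEvents v1.0


def parse_sse(lines):
    """Yield (event_id, event_type, data) tuples from an iterable of SSE text lines."""
    event_id, event_type, data_lines = None, "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the event; dispatch only if data was received.
            if data_lines:
                yield event_id, event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. a keepalive heartbeat
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "id":
            event_id = value  # persists across events, per the SSE spec
        elif field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)


# Made-up stream fragment: one heartbeat comment, then one event.
stream = [
    ": heartbeat\n",
    "id: 42\n",
    "event: job.transition\n",
    'data: {"specversion": "1.0", "id": "evt-42", '
    '"source": "firstlight/control-plane", "type": "job.transition", '
    '"data": {"phase": "ANALYZING", "status": "QUALITY_CHECK"}}\n',
    "\n",
]

last_event_id = None
events = []
for event_id, event_type, data in parse_sse(stream):
    envelope = json.loads(data)
    missing = REQUIRED_ATTRIBUTES - set(envelope)
    if missing:
        raise ValueError(f"not a CloudEvents v1.0 envelope, missing: {sorted(missing)}")
    last_event_id = event_id  # send as Last-Event-ID when reconnecting
    events.append(envelope)

print(last_event_id, events[0]["type"])
```

In practice the `lines` argument would come from a streaming HTTP response (for example `requests` with `stream=True` and `iter_lines`), which is left out here to keep the sketch runnable offline.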

### Webhook Registration

For delivery without a long-lived connection (instead of holding an SSE stream open), MAIA can register webhooks:

| Method | Endpoint | Description |
|--------|---------|-------------|
| `POST` | `/internal/v1/webhooks` | Register a new webhook (HTTPS required, SSRF-protected) |
| `GET` | `/internal/v1/webhooks` | List registered webhooks (cursor-based pagination) |
| `DELETE` | `/internal/v1/webhooks/{id}` | Deregister a webhook |

In [None]:
# List existing webhook subscriptions
sc, body = api_get("/internal/v1/webhooks")
print(f"HTTP {sc}")
pp(body)
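
Since registration requires HTTPS and is SSRF-protected, a client can catch the obvious rejections before calling the API. The check below is a hypothetical client-side pre-check, not FirstLight's server-side rule set: it rejects non-HTTPS schemes, `localhost`, and literal IP addresses in private, loopback, link-local, or reserved ranges. It does not resolve DNS names, which a real SSRF guard would also need to do.

```python
import ipaddress
from urllib.parse import urlsplit


def webhook_url_problem(url):
    """Return a reason string if the URL looks unsafe to register, else None."""
    parts = urlsplit(url)
    if parts.scheme != "https":
        return "scheme must be https"
    host = parts.hostname
    if not host:
        return "missing host"
    if host == "localhost":
        return "host must be publicly routable"
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        return None  # a DNS name; a real guard would also resolve and re-test it
    if addr.is_private or addr.is_loopback or addr.is_link_local or addr.is_reserved:
        return "host must be publicly routable"
    return None


for candidate in (
    "http://hooks.example.com/firstlight",
    "https://169.254.169.254/latest/meta-data",
    "https://hooks.example.com/firstlight",
):
    print(candidate, "->", webhook_url_problem(candidate) or "ok")
```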

---

## 11. Standards Integration: OGC & STAC

FirstLight does not lock data into a proprietary format. The same algorithms and results
are accessible through **open geospatial standards**.

### OGC API Processes (`/oapi/*`)

The same flood detection algorithms available through the LLM Router are also exposed
as OGC API Processes. This means QGIS, ArcGIS Pro, and any OGC-compliant client can
submit and monitor jobs without using the LLM Control Plane.

### STAC Catalog (`/stac/*`)

Every completed analysis is published as a STAC Item. The STAC catalog makes results
discoverable by spatial and temporal search -- standard practice in the Earth observation
community.
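
A spatial and temporal search of this kind can be expressed as a STAC API Item Search body. The sketch below only builds that body; it assumes the deployment exposes the standard search endpoint at `/stac/search`, and the collection id `flood-extent` and the bounding box are placeholders.

```python
from datetime import datetime, timezone


def build_stac_search(bbox, start, end, collections=None, limit=10):
    """Build a STAC API Item Search body for a bounding box and time window."""
    if len(bbox) != 4:
        raise ValueError("bbox must be [west, south, east, north]")

    def rfc3339(dt):
        # STAC expects RFC 3339 timestamps; normalize to UTC with a 'Z' suffix.
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    body = {
        "bbox": list(bbox),
        "datetime": f"{rfc3339(start)}/{rfc3339(end)}",  # closed interval
        "limit": limit,
    }
    if collections:
        body["collections"] = list(collections)
    return body


search = build_stac_search(
    [-95.8, 29.5, -95.0, 30.1],
    datetime(2017, 8, 25, tzinfo=timezone.utc),
    datetime(2017, 9, 3, tzinfo=timezone.utc),
    collections=["flood-extent"],  # hypothetical collection id
)
print(search)
```

The same body works with `pystac-client` or a plain HTTP POST, which is what makes the catalog usable outside the LLM Control Plane.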

> **Why This Matters:** MAIA is not the only consumer. Emergency managers use QGIS.
> Researchers use Python + pystac. GIS teams use ArcGIS. Open standards mean FirstLight
> fits into existing workflows without lock-in.

In [None]:
# Check OGC API Processes
print("OGC API Processes (/oapi/processes):")
print("=" * 50)
sc, body = api_get("/oapi/processes")
print(f"HTTP {sc}")
if sc == 200:
    if isinstance(body, dict):
        processes = body.get("processes", [])
        print(f"Available processes: {len(processes)}")
        for p in processes[:5]:
            print(f"  - {p.get('id', '?')}: {(p.get('title') or p.get('description') or '')[:60]}")
    else:
        pp(body)
elif sc == 404:
    print("OGC endpoint not deployed on this instance (pygeoapi optional).")
else:
    pp(body)

In [None]:
# Check STAC Catalog
print("STAC Catalog (/stac):")
print("=" * 50)
sc, body = api_get("/stac")
print(f"HTTP {sc}")
if sc == 200:
    pp(body)
elif sc == 404:
    print("STAC endpoint not deployed on this instance (stac-fastapi optional).")
else:
    pp(body)

print("\nSTAC Collections (/stac/collections):")
print("=" * 50)
sc, body = api_get("/stac/collections")
print(f"HTTP {sc}")
if sc == 200 and isinstance(body, dict):
    collections = body.get("collections", [])
    print(f"Collections: {len(collections)}")
    for c in collections[:5]:
        print(f"  - {c.get('id')}: {(c.get('title') or c.get('description') or '')[:60]}")
elif sc == 404:
    print("STAC collections not yet available.")
else:
    pp(body)

---

## 12. Summary

### What We Demonstrated

| # | Capability | What Happened |
|---|-----------|---------------|
| 1 | **Tool Discovery** | AI agent discovered available algorithms at runtime via OpenAI-compatible schemas |
| 2 | **Job Creation** | Submitted flood analysis with GeoJSON AOI + initial reasoning |
| 3 | **Phase Transitions** | Walked through 7 phases with atomic TOCTOU-guarded state transitions |
| 4 | **Reasoning Injection** | AI recorded chain of thought with confidence scores and structured payloads |
| 5 | **Parameter Tuning** | Adjusted algorithm sensitivity mid-flight via JSON merge-patch |
| 6 | **Escalation Workflow** | Flagged low-confidence finding for human review, then resolved it |
| 7 | **Context Lakehouse** | Queried accumulated spatial data across datasets, buildings, infrastructure, weather |
| 8 | **Metrics & Queue** | Retrieved pipeline health and queue status from materialized views |
| 9 | **SSE Events** | Showed real-time event streaming with CloudEvents v1.0 envelopes |
| 10 | **OGC & STAC** | Verified standards-compliant interfaces for GIS tool interoperability |

### Complete Endpoint Map

#### LLM Control Plane (`/control/v1`)

| Method | Endpoint | Description |
|--------|---------|-------------|
| `GET` | `/control/v1/tools` | Discover algorithm tool schemas (OpenAI format) |
| `POST` | `/control/v1/jobs` | Create a new analysis job |
| `GET` | `/control/v1/jobs` | List jobs (filter by phase, status, event_type, bbox) |
| `GET` | `/control/v1/jobs/{id}` | Get full job detail |
| `POST` | `/control/v1/jobs/{id}/transition` | Atomic state transition (TOCTOU guard) |
| `POST` | `/control/v1/jobs/{id}/reasoning` | Inject LLM reasoning entry |
| `PATCH` | `/control/v1/jobs/{id}/parameters` | Tune parameters mid-flight (merge-patch) |
| `POST` | `/control/v1/jobs/{id}/escalations` | Create escalation |
| `PATCH` | `/control/v1/jobs/{id}/escalations/{eid}` | Resolve escalation |
| `GET` | `/control/v1/jobs/{id}/escalations` | List escalations for a job |
| `GET` | `/control/v1/jobs/{id}/context` | Per-job context usage (ingested vs reused) |
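
The TOCTOU guard on the transition endpoint can be pictured as a compare-and-set: the caller states which state it believes the job is in, and the write succeeds only if that is still true at the moment of the update. The following is a minimal in-memory sketch of the idea, not the server's implementation; the `JobState` class and the target state `("REPORTING", "ASSEMBLING")` are hypothetical.

```python
import threading


class JobState:
    """In-memory stand-in for a job record; illustrative only."""

    def __init__(self, phase, status):
        self._lock = threading.Lock()
        self.phase = phase
        self.status = status

    def transition(self, expected, target):
        """Move to `target` only if the current state still equals `expected`.

        The check and the write happen under one lock, so no other caller
        can change the state between them.
        """
        with self._lock:
            if (self.phase, self.status) != expected:
                return False  # stale expectation: the caller must re-read the job
            self.phase, self.status = target
            return True


job = JobState("ANALYZING", "QUALITY_CHECK")
first = job.transition(("ANALYZING", "QUALITY_CHECK"), ("REPORTING", "ASSEMBLING"))
second = job.transition(("ANALYZING", "QUALITY_CHECK"), ("REPORTING", "ASSEMBLING"))
print(first, second)  # True False
```

Two agents that both read the job in the same state and both try to advance it end up with exactly one winner; the loser gets a rejection and can re-read the job before deciding what to do next.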

#### Context Lakehouse (`/control/v1/context`)

| Method | Endpoint | Description |
|--------|---------|-------------|
| `GET` | `/control/v1/context/datasets` | Query satellite datasets (bbox, date range, source) |
| `GET` | `/control/v1/context/buildings` | Query building footprints (bbox) |
| `GET` | `/control/v1/context/infrastructure` | Query critical infrastructure (bbox, type) |
| `GET` | `/control/v1/context/weather` | Query weather observations (bbox, time range) |
| `GET` | `/control/v1/context/summary` | Lakehouse-wide statistics |

#### Partner Integration (`/internal/v1`)

| Method | Endpoint | Description |
|--------|---------|-------------|
| `GET` | `/internal/v1/events/stream` | SSE event stream (CloudEvents v1.0) |
| `GET` | `/internal/v1/metrics` | Pipeline health metrics (materialized view) |
| `GET` | `/internal/v1/queue/summary` | Queue status (per-phase counts, stuck jobs) |
| `POST` | `/internal/v1/webhooks` | Register webhook subscription |
| `GET` | `/internal/v1/webhooks` | List webhook subscriptions |
| `DELETE` | `/internal/v1/webhooks/{id}` | Deregister webhook |
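
A consumer of the SSE stream has to split the response into frames and remember the last `id` it saw, so that it can resume with a `Last-Event-ID` header after a disconnect. The sketch below parses frames from any iterable of text lines. It assumes each frame's `data` field carries the whole CloudEvents envelope as JSON, which this notebook does not confirm.

```python
import json


def parse_sse(lines):
    """Yield (event_id, envelope) for each SSE frame in an iterable of text lines."""
    event_id, data_lines = None, []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates a frame; dispatch only if it carried data.
            if data_lines:
                yield event_id, json.loads("\n".join(data_lines))
            data_lines = []
            continue
        if line.startswith(":"):
            continue  # comment line, commonly used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "id":
            event_id = value  # persists across frames until replaced
        elif field == "data":
            data_lines.append(value)


sample = [
    "id: 42\n",
    'data: {"specversion": "1.0", "type": "job.transition"}\n',
    "\n",
    ": keep-alive\n",
    "\n",
]
last_event_id = None
for event_id, envelope in parse_sse(sample):
    last_event_id = event_id
    print(event_id, envelope["type"])
```

With a streaming HTTP client, the same generator can be fed the response's decoded line iterator, and `last_event_id` supplies the `Last-Event-ID` header on reconnect.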

#### Standards

| Method | Endpoint | Description |
|--------|---------|-------------|
| `GET` | `/oapi/processes` | OGC API Processes (standards-compliant algorithms) |
| `GET` | `/stac/collections` | STAC Catalog (published analysis results) |
| `GET` | `/api/v1/health` | Platform health check |

### Architecture Highlights

- **PostGIS** stores all spatial state -- jobs, context data, events. Every geometry is indexed
  for fast spatial queries.
- **Materialized views** power the metrics and queue endpoints -- refreshed every 30 seconds,
  so dashboard polls read a precomputed snapshot (at most about 30 seconds old) instead of re-running aggregate queries.
- **CloudEvents v1.0** envelopes on the SSE stream ensure interoperability with any event-driven
  architecture.
- **TOCTOU guards** on state transitions prevent race conditions in multi-agent environments.
- **X-API-Key auth** with tenant-scoped data isolation -- MAIA sees only MAIA's jobs.
- **JSON merge-patch** for parameter updates follows RFC 7396 (which obsoletes RFC 7386) -- no custom protocol.
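
To make the merge-patch semantics concrete, here is a small sketch of the standard JSON Merge Patch algorithm: object members are merged recursively, a `null` value deletes a key, and anything else replaces the target value. The parameter names (`threshold`, `filters`, `speckle`, `window`) are invented for illustration and are not FirstLight algorithm parameters.

```python
def merge_patch(target, patch):
    """Apply a JSON Merge Patch to `target` and return the result."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target wholesale.
        return patch
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # null means "remove this member"
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


params = {"threshold": 0.5, "filters": {"speckle": True, "window": 5}}
patched = merge_patch(params, {"threshold": 0.35, "filters": {"window": None}})
print(patched)  # {'threshold': 0.35, 'filters': {'speckle': True}}
```

One consequence worth keeping in mind is that a merge patch cannot set a value to `null`, and arrays are always replaced as a whole rather than merged element by element.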

### Key Differentiators

1. **LLM-Native** -- Built for AI agents from day one. Tool discovery, reasoning injection,
   and confidence-gated escalation are first-class features, not afterthoughts.

2. **Context Lakehouse** -- Accumulated spatial data compounds across jobs. The second analysis
   in an area is faster and richer than the first.

3. **Open Standards** -- OGC API Processes + STAC + CloudEvents. No vendor lock-in.
   GIS tools, AI agents, and custom dashboards all consume the same data.

4. **Full Audit Trail** -- Every decision, transition, reasoning entry, and escalation is
   recorded as an immutable event. Complete explainability from trigger to deliverable.

In [None]:
# Final status check on our demo job
if JOB_ID:
    sc, detail = api_get(f"/control/v1/jobs/{JOB_ID}")
    if sc == 200 and isinstance(detail, dict):
        print("Demo Job Summary")
        print("================")
        print(f"  Job ID:     {detail.get('job_id')}")
        print(f"  Phase:      {detail.get('phase')}")
        print(f"  Status:     {detail.get('status')}")
        print(f"  Event Type: {detail.get('event_type')}")
        print(f"  AOI Area:   {detail.get('aoi_area_km2')} km2")
        print(f"  Created:    {detail.get('created_at')}")
        print(f"  Updated:    {detail.get('updated_at')}")
    else:
        print(f"HTTP {sc}")
        pp(detail)
else:
    print("No job was created during this session.")

print("\n" + "=" * 70)
print("  Demo complete. Questions?")
print("=" * 70)