STAMPS AI - V2: Cross-Scope Synthesis (E + I + M)

Approach
- Retrieves top-k relevant excerpts independently from:
  - E-STAMP,
  - I-STAMP,
  - M-STAMP
- All retrieval is performed against authoritative STAMP documents only
- No user-provided filters are required
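Because the question itself supplies the scope, keyword hints need to match whole words. A plain substring test lets the hint `ab` fire inside "table" or `base` inside "database". Below is a minimal sketch of whole-word matching with an optional plural "s". It assumes the `FAMILY_HINTS` lists from the inference cell. `has_term` and `infer_family` are illustrative names, not part of the notebook.

```python
import re
from typing import Optional

# Hint lists copied from FAMILY_HINTS in the inference cell
FAMILY_HINTS = {
    "E": ["enterprise", "e-stamp", "e stamp"],
    "I": ["installation", "i-stamp", "i stamp", "base", "afb", "afs", "ab", "sfs", "sfb", "angb"],
    "M": ["majcom", "m-stamp", "m stamp"],
}


def has_term(text: str, term: str) -> bool:
    """Whole-word (or whole-phrase) match, allowing an optional plural 's'."""
    return re.search(rf"\b{re.escape(term)}s?\b", text) is not None


def infer_family(question: str) -> Optional[str]:
    qn = re.sub(r"\s+", " ", (question or "").strip().lower())
    for fam, hints in FAMILY_HINTS.items():
        if any(has_term(qn, h) for h in hints):
            return fam
    return None


print("ab" in "which gaps show up in the summary table?")           # True: substring test misfires
print(infer_family("Which gaps show up in the summary table?"))      # None
print(infer_family("What does the I-STAMP say about MacDill AFB?"))  # I
```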

- Extracts abstracted themes within each STAMP product
- Themes are normalized, non-duplicative, and evidence-backed
- Each theme is explicitly linked to supporting document excerpts
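Theme extraction links themes to evidence through the model's JSON. Its `evidence_idx` values are 1-based positions in the excerpt list, and `_safe_json_load` and `extract_themes_per_product` map them back to chunks. Here is a condensed sketch of that linkage using placeholder excerpts and a hand-written model reply. `safe_json_load` and `link_evidence` are illustrative names.

```python
import json
import re
from typing import Any, Dict, List


def safe_json_load(raw: str) -> Dict[str, Any]:
    """Parse the outermost {...} span, ignoring any prose the model wraps around it."""
    s = raw or ""
    match = re.search(r"\{.*\}", s, flags=re.DOTALL)
    return json.loads(match.group(0) if match else s)


def link_evidence(obj: Dict[str, Any], excerpts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Map 1-based evidence_idx values to excerpts; drop bad indices and unsupported themes."""
    linked = []
    for t in obj.get("themes", []):
        idxs = [int(i) for i in t.get("evidence_idx", []) if str(i).isdigit()]
        ev = [excerpts[i - 1] for i in idxs if 1 <= i <= len(excerpts)]
        if ev:  # a theme with no valid evidence is not kept
            linked.append({"theme": t["theme"], "pages": [e["page"] for e in ev]})
    return linked


# Toy inputs: placeholder excerpts and a hand-written model reply
excerpts = [{"page": 3, "text": "excerpt A"}, {"page": 7, "text": "excerpt B"}]
reply = (
    "Here is the JSON:\n"
    '{"themes": [{"theme": "Theme one", "evidence_idx": [1, 9]},'
    ' {"theme": "Theme two", "evidence_idx": ["x"]}]}\n'
    "Let me know if you need more."
)
print(link_evidence(safe_json_load(reply), excerpts))
# [{'theme': 'Theme one', 'pages': [3]}]
```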

- Aligns similar themes across E/I/M into canonical cross-STAMP themes 
- Tracks which products support each theme
- Preserves product-specific nuance while removing redundancy
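The alignment prompt asks that every input theme id (`E1`, `I2`, ...) land in exactly one canonical group. A model can still repeat an id, which double-counts its evidence, or drop one, which loses a theme silently. A minimal post-check follows. It assumes the `{"groups": [{"members": [...]}]}` shape from the prompt. `enforce_partition` is a hypothetical helper, not something `align_cross_product_themes` calls today.

```python
from typing import Dict, List


def enforce_partition(groups: List[Dict], labels: Dict[str, str]) -> List[Dict]:
    """Ensure every theme id (key of `labels`) lands in exactly one group.

    Repeated ids keep only their first group, unknown ids are discarded, and ids the
    model omitted come back as singleton groups named after their original label.
    """
    seen = set()
    fixed = []
    for g in groups:
        members = []
        for mid in g.get("members", []):
            mid = str(mid).strip()
            if mid in labels and mid not in seen:
                seen.add(mid)
                members.append(mid)
        if members:
            fixed.append({**g, "members": members})
    for mid, label in labels.items():
        if mid not in seen:
            fixed.append({"canonical_theme": label, "canonical_description": "", "members": [mid]})
    return fixed


labels = {"E1": "Theme A", "I1": "Theme B", "I2": "Theme C", "M1": "Theme D"}
groups = [
    {"canonical_theme": "Group 1", "members": ["E1", "I2"]},
    {"canonical_theme": "Group 2", "members": ["I2", "M1", "X9"]},
]
print([g["members"] for g in enforce_partition(groups, labels)])
# [['E1', 'I2'], ['M1'], ['I1']]
```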

- Groups evidence at the theme level
- Selects a small number of representative citations per theme
- Converts raw excerpts into concise, leadership-friendly evidence bullets
- All bullets remain grounded to page-level citations
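A small, balanced citation set per theme is easiest to get with round-robin selection: every contributing product gets one citation before any product gets a second. The sketch below stands in `(product, page)` tuples for `EvidenceRef`. `pick_round_robin` and its defaults are illustrative assumptions.

```python
from typing import List, Tuple

Evidence = Tuple[str, int]  # (product, page): a stand-in for EvidenceRef


def pick_round_robin(evidence: List[Evidence], max_total: int = 3, per_product: int = 2) -> List[Evidence]:
    """One citation per product (E, I, M order) per round, then top up from leftovers."""
    buckets = {p: [e for e in evidence if e[0] == p][:per_product] for p in ("E", "I", "M")}
    picked: List[Evidence] = []
    for rank in range(per_product):
        for prod in ("E", "I", "M"):
            if rank < len(buckets[prod]) and len(picked) < max_total:
                picked.append(buckets[prod][rank])
    for e in evidence:  # products were sparse: fill remaining slots in original order
        if len(picked) >= max_total:
            break
        if e not in picked:
            picked.append(e)
    return picked


print(pick_round_robin([("E", 2), ("E", 5), ("I", 4), ("I", 9), ("M", 1), ("M", 6)]))
# [('E', 2), ('I', 4), ('M', 1)]
print(pick_round_robin([("E", 1), ("E", 2), ("E", 3), ("E", 4)]))
# [('E', 1), ('E', 2), ('E', 3)]
```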

- Identifies coverage gaps where themes appear in some STAMP products but not others
- Identifies definition gaps where themes lack operational clarity
- Gaps are explicitly labeled and confidence-scored
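A coverage gap is a set difference: all products (E, I, M) minus the products that contributed to a canonical theme. The sketch below uses plain dicts in place of `CanonicalTheme` and assumes theme names are already aligned. `coverage_gaps` is an illustrative name.

```python
from typing import Dict, List, Set

ALL_PRODUCTS = {"E", "I", "M"}


def coverage_gaps(theme_products: Dict[str, Set[str]]) -> List[Dict[str, object]]:
    """Flag each canonical theme whose supporting products are a strict subset of E/I/M."""
    gaps = []
    for theme, present in theme_products.items():
        missing = sorted(ALL_PRODUCTS - present)
        if missing:
            gaps.append({"theme": theme, "present_in": sorted(present), "missing_in": missing})
    return gaps


print(coverage_gaps({"Theme A": {"E", "I", "M"}, "Theme B": {"E", "I"}}))
# [{'theme': 'Theme B', 'present_in': ['E', 'I'], 'missing_in': ['M']}]
```

Themes are extracted from the top-k retrieved excerpts only. A product listed under `missing_in` may therefore be one whose relevant passages did not surface in retrieval, which is worth weighing before scoring such a gap as high-confidence.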

- Produces a structured narrative including:
  - Executive summary
  - Cross-STAMP themes
  - Identified gaps and inconsistencies
  - Implications
- All claims are evidence-backed and citation-ready
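One way to keep every claim citation-ready is to render each evidence bullet with its own source tag. The sketch below assumes theme cards shaped like `ThemeCard.payload` (`canonical_theme`, `products`, and evidence with `product`, `page`, `bullet`) and gaps with `theme` and `missing_in`. The layout, the `[E-STAMP p.4]` tag style, and the names `cite` and `render_narrative` are illustrative assumptions, not the notebook's actual narrative step.

```python
from typing import Any, Dict, List


def cite(ev: Dict[str, Any]) -> str:
    """Short citation tag such as [I-STAMP p.12]; the page is omitted if unknown."""
    page = ev.get("page")
    return f"[{ev['product']}-STAMP p.{page}]" if page is not None else f"[{ev['product']}-STAMP]"


def render_narrative(summary: str, cards: List[Dict[str, Any]], gaps: List[Dict[str, Any]], implications: List[str]) -> str:
    lines = ["Executive summary", summary, "", "Cross-STAMP themes"]
    for card in cards:
        lines.append(f"- {card['canonical_theme']} ({', '.join(card['products'])})")
        lines.extend(f"  - {ev['bullet']} {cite(ev)}" for ev in card["evidence"])
    lines += ["", "Identified gaps and inconsistencies"]
    lines.extend(f"- {g['theme']}: not covered in {', '.join(g['missing_in'])}" for g in gaps)
    lines += ["", "Implications"]
    lines.extend(f"- {item}" for item in implications)
    return "\n".join(lines)


card = {
    "canonical_theme": "Theme A",
    "products": ["E", "I"],
    "evidence": [
        {"product": "E", "page": 4, "bullet": "Placeholder bullet."},
        {"product": "I", "page": None, "bullet": "Another bullet."},
    ],
}
text = render_narrative("Placeholder summary.", [card], [{"theme": "Theme A", "missing_in": ["M"]}], ["Placeholder implication."])
print(text)
```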


Limitations:
- Retrieval still uses keyword scoring
- Insights are still limited to what is documented in the STAMP PDFs
- Quantitative questions are still out of scope
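To make the keyword-scoring limitation concrete, here is the ranking from `retrieve_top_chunks` in plain Python. A chunk scores one point per question term (longer than three characters, at most 18) found as a substring. Ties break by lowest page number. This sketch also ignores repeated terms. Filler words such as "what" and "does" weigh as much as "pavement", and the toy chunks below are placeholders.

```python
from typing import Dict, List

STRIP_CHARS = " ,.;:()[]{}!?\"'"


def question_terms(question: str, max_terms: int = 18) -> List[str]:
    """Lower-cased words longer than 3 characters, punctuation stripped, first occurrence kept."""
    words = [w.strip(STRIP_CHARS) for w in (question or "").lower().split()]
    return list(dict.fromkeys(w for w in words if len(w) > 3))[:max_terms]


def rank_chunks(question: str, chunks: List[Dict], top_k: int = 8) -> List[Dict]:
    terms = question_terms(question)

    def score(chunk: Dict) -> int:
        text = chunk["chunk_text"].lower()
        return sum(1 for t in terms if t in text)  # substring hits, one point per term

    # highest score first, then lowest page number, like the Spark orderBy
    return sorted(chunks, key=lambda c: (-score(c), c["page_num"]))[:top_k]


chunks = [
    {"page_num": 2, "chunk_text": "What does the plan say? Funding is discussed."},
    {"page_num": 5, "chunk_text": "Runway pavement condition is degraded."},
]
q = "What does the STAMP say about runway pavement?"
print(question_terms(q))  # ['what', 'does', 'stamp', 'about', 'runway', 'pavement']
print([c["page_num"] for c in rank_chunks(q, chunks)])  # [2, 5]: filler words tie with content words
```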


What's Next:
- Output consistency testing across diverse question types
- Theme and gap stability validation across repeated runs
- Tone and length calibration for leaders
- Citation accuracy verification at scale
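One possible starting point for theme stability validation is to compare normalized theme labels from each pair of runs using Jaccard overlap. A low mean would suggest extraction is not yet stable. The metric and the label normalization are assumptions, not part of the current notebook. Exact-label matching is also strict, so paraphrased labels count as different themes.

```python
import re
from itertools import combinations
from statistics import mean
from typing import List, Set


def normalize_label(label: str) -> str:
    return re.sub(r"[^a-z0-9 ]", "", re.sub(r"\s+", " ", label.lower())).strip()


def jaccard(a: Set[str], b: Set[str]) -> float:
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def theme_stability(runs: List[List[str]]) -> float:
    """Mean pairwise Jaccard overlap of theme labels across runs (1.0 = identical)."""
    sets = [{normalize_label(t) for t in run} for run in runs]
    pairs = list(combinations(sets, 2))
    return mean(jaccard(a, b) for a, b in pairs) if pairs else 1.0


runs = [
    ["Facility Condition", "Funding Constraints"],
    ["facility condition", "Workforce capacity"],
    ["Facility condition", "Funding constraints"],
]
print(round(theme_stability(runs), 3))  # 0.556
```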


Next Version: 
- Integrate STAMP data sources alongside documentation

In [0]:
%pip install -U mlflow pymupdf
dbutils.library.restartPython()

In [0]:
import json
import re
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple

import fitz  # PyMuPDF
import requests
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

In [0]:
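%sql
-- Full rebuild: dropping stamp_ai_ingest_log also clears the "already ingested" history,
-- so every PDF is re-ingested on the next run.
DROP TABLE IF EXISTS stamp_ai_pages;
DROP TABLE IF EXISTS stamp_ai_chunks;
DROP TABLE IF EXISTS stamp_ai_ingest_log;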

In [0]:
%sql
CREATE TABLE stamp_ai_pages (
  doc_id STRING,
  stamp_family STRING,
  stamp_product STRING,
  majcom STRING,
  installation STRING,
  stamp_name STRING,
  stamp_version STRING,
  source_pdf_path STRING,
  page_num INT,
  page_text STRING
);

CREATE TABLE IF NOT EXISTS stamp_ai_chunks (
  chunk_id STRING,
  doc_id STRING,
  stamp_family STRING,
  stamp_product STRING,
  majcom STRING,
  installation STRING,
  stamp_name STRING,
  stamp_version STRING,
  source_pdf_path STRING,
  page_num INT,
  section_title STRING,
  figure_refs STRING,
  chunk_text STRING
);

CREATE TABLE stamp_ai_ingest_log (
  source_pdf_path STRING,
  stamp_version STRING,
  ingested_at TIMESTAMP,
  doc_id STRING,
  status STRING,
  error STRING
);

In [0]:
ROOT = "s3://usaf-data-tenant-afimsc/FSRM/stamps/raw"

RUN_FILTER = None  # set to a version label such as "SEP2025" to ingest only that run

In [0]:
def list_pdfs_recursive(root: str):
    out = []
    stack = [root]
    while stack:
        cur = stack.pop()
        for f in dbutils.fs.ls(cur):
            if f.path.lower().endswith(".pdf"):
                out.append(f.path)
            elif f.isDir():
                stack.append(f.path)
    return sorted(out)

pdfs = list_pdfs_recursive(ROOT)
pdfs = [str(x) for x in pdfs]
print("Total PDFs found:", len(pdfs))
print("Sample:", pdfs[:5])

Total PDFs found: 95
Sample: ['s3://usaf-data-tenant-afimsc/FSRM/stamps/raw/E/Dorms/DEC2025/E-STAMP - Built Infrastructure - DORMS - DEC2025.pdf', 's3://usaf-data-tenant-afimsc/FSRM/stamps/raw/E/Facilities/DEC2025/E-STAMP - Built Infrastructure - FACILITIES - DEC2025.pdf', 's3://usaf-data-tenant-afimsc/FSRM/stamps/raw/E/TNAP/DEC2025/E-STAMP - Built Infrastructure - TNAP - DEC2025.pdf', 's3://usaf-data-tenant-afimsc/FSRM/stamps/raw/E/Utilities/DEC2025/E-STAMP - Built Infrastructure - UTILITIES - DEC2025.pdf', 's3://usaf-data-tenant-afimsc/FSRM/stamps/raw/I/All/FEB2025/MacDill AFB/I-STAMP - Built Infrastructure - MacDill AFB - FEB2025.pdf']


In [0]:
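def parse_meta_from_path(path: str):
    """
    Expected layout: .../raw/<family>/<product>/<version>/[<installation or MAJCOM>/]<file>.pdf
    e.g. raw/I/All/FEB2025/MacDill AFB/I-STAMP - Built Infrastructure - MacDill AFB - FEB2025.pdf
    """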
    parts = path.split("/")
    raw_i = parts.index("raw")

    fam = parts[raw_i + 1]
    product = parts[raw_i + 2]
    run = parts[raw_i + 3]

    org = None
    if fam == 'I':
        org = parts[raw_i + 4] # installation
    elif fam == 'M':
        org = parts[raw_i + 4] # majcom

    stamp_name = f"{fam}-STAMP - Built Infrastructure - {product}"
    stamp_version = run

    return {
        "stamp_family": fam,
        "stamp_product": product,
        "stamp_version": stamp_version,
        "majcom": org if fam == 'M' else None,
        "installation": org if fam == 'I' else None,
        "stamp_name": stamp_name
    }

In [0]:
def already_ingested(pdf_path: str, run: str) -> bool:
    cond = (
        (F.col("source_pdf_path") == pdf_path) &
        (F.col("stamp_version") == run) &
        (F.col("status") == 'SUCCESS')
    )

    return (
        spark.table("stamp_ai_ingest_log")
        .where(cond)
        .limit(1)
        .count()
        > 0
    )

In [0]:
pages_schema = StructType([
    StructField("doc_id", StringType(), False),
    StructField("stamp_family", StringType(), True),
    StructField("stamp_product", StringType(), True),
    StructField("majcom", StringType(), True),
    StructField("installation", StringType(), True),
    StructField("stamp_name", StringType(), True),
    StructField("stamp_version", StringType(), True),
    StructField("source_pdf_path", StringType(), True),
    StructField("page_num", IntegerType(), True),
    StructField("page_text", StringType(), True),
])

chunks_schema = StructType([
    StructField("chunk_id", StringType(), False),
    StructField("doc_id", StringType(), True),
    StructField("stamp_family", StringType(), True),
    StructField("stamp_product", StringType(), True),
    StructField("majcom", StringType(), True),
    StructField("installation", StringType(), True),
    StructField("stamp_name", StringType(), True),
    StructField("stamp_version", StringType(), True),
    StructField("source_pdf_path", StringType(), True),
    StructField("page_num", IntegerType(), True),
    StructField("section_title", StringType(), True),
    StructField("figure_refs", StringType(), True),
    StructField("chunk_text", StringType(), True),
])

def s3_to_local_pdf(s3_path: str) -> str:
    tmp_dir = "dbfs:/tmp/stamps_ai"
    dbutils.fs.mkdirs(tmp_dir)
    local_dbfs = f"{tmp_dir}/{uuid.uuid4().hex}.pdf"
    dbutils.fs.cp(s3_path, local_dbfs)  # single-file copy; the third positional arg would be `recurse`, not overwrite, and the UUID name avoids collisions
    return "/dbfs/" + local_dbfs.replace("dbfs:/", "")

def detect_figure_refs(text: str) -> str:
    if not text:
        return ""
    refs = re.findall(r"\b(?:Figure|Fig\.)\s*\d+(?:[A-Za-z]|\.\d+)?\b", text)
    uniq = sorted(set([r.strip() for r in refs]))
    return ", ".join(uniq)

def chunk_text(text: str, max_chars: int=1800, overlap: int=250):
    text = (text or "").strip()
    if not text: 
        return []
    paras = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    chunks, cur = [], ""
    for p in paras:
        if len(cur) + len(p) + 2 <= max_chars:
            cur = (cur + "\n\n" + p).strip()
        else:
            if cur:
                chunks.append(cur)
            if len(p) > max_chars:
                start = 0
                while start < len(p):
                    end = min(start + max_chars, len(p))
                    chunks.append(p[start:end])
                    start = end  # no overlap here; the pass below prefixes each chunk with the previous chunk's tail
                cur = ""
            else: 
                cur = p
    
    if cur:
        chunks.append(cur)

    final = []
    for i, c in enumerate(chunks):
        if i == 0:
            final.append(c)
        else:
            prev = chunks[i-1]
            tail = prev[-overlap:] if len(prev) > overlap else prev
            final.append((tail + '\n\n' + c).strip())
    return final
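`chunk_text` works in two passes. First it packs blank-line-separated paragraphs into chunks of at most `max_chars`, hard-splitting any longer paragraph. Then it prefixes each chunk with the last `overlap` characters of the previous one. A reduced sketch of the same two passes follows, with tiny sizes so the overlap is visible. `pack` and `with_overlap` are illustrative names. As in the notebook, a chunk can exceed `max_chars` by up to `overlap` plus two characters once its prefix is added.

```python
import re
from typing import List


def pack(text: str, max_chars: int) -> List[str]:
    """Greedy paragraph packing; paragraphs longer than max_chars are hard-split."""
    paras = [p.strip() for p in re.split(r"\n\s*\n", text.strip()) if p.strip()]
    chunks: List[str] = []
    cur = ""
    for p in paras:
        if len(cur) + len(p) + 2 <= max_chars:
            cur = (cur + "\n\n" + p).strip()
            continue
        if cur:
            chunks.append(cur)
        if len(p) > max_chars:
            chunks.extend(p[i:i + max_chars] for i in range(0, len(p), max_chars))
            cur = ""
        else:
            cur = p
    if cur:
        chunks.append(cur)
    return chunks


def with_overlap(chunks: List[str], overlap: int) -> List[str]:
    """Prefix each chunk with the tail of the previous (un-prefixed) chunk."""
    return [c if i == 0 else (chunks[i - 1][-overlap:] + "\n\n" + c).strip() for i, c in enumerate(chunks)]


raw = pack("alpha beta\n\ngamma delta\n\n" + "x" * 25, max_chars=12)
print(raw)                            # ['alpha beta', 'gamma delta', 'xxxxxxxxxxxx', 'xxxxxxxxxxxx', 'x']
print(with_overlap(raw, overlap=4)[1])  # the 'beta' tail, a blank line, then 'gamma delta'
```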

In [0]:
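ingest_log_schema = StructType([
    StructField("source_pdf_path", StringType(), True),
    StructField("stamp_version", StringType(), True),
    StructField("doc_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("error", StringType(), True),
])

def write_ingest_log(source_pdf_path: str, stamp_version: str, doc_id: str | None, status: str, error: str | None):
    # Append through a DataFrame instead of an f-string INSERT, so quotes, backslashes,
    # and newlines in exception messages cannot break the SQL statement.
    row = (source_pdf_path, stamp_version, doc_id, status, error[:2000] if error else None)
    (
        spark.createDataFrame([row], schema=ingest_log_schema)
        .withColumn("ingested_at", F.current_timestamp())
        .select("source_pdf_path", "stamp_version", "ingested_at", "doc_id", "status", "error")
        .write.mode("append")
        .saveAsTable("stamp_ai_ingest_log")
    )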

def ingest_one_pdf(pdf_path: str):
    meta = parse_meta_from_path(pdf_path)

    # optional run filter
    if RUN_FILTER and meta['stamp_version'] != RUN_FILTER:
        return ("SKIPPED", None, f"RUN_FILTER={RUN_FILTER}")
    
    if already_ingested(pdf_path, meta['stamp_version']):
        return ("SKIPPED", None, 'Already ingested')
    
    try:
        local_pdf = s3_to_local_pdf(pdf_path)
        doc = fitz.open(local_pdf)
        doc_id = uuid.uuid4().hex

        page_rows = []
        chunk_rows = []

        for i in range(len(doc)):
            page_num = i + 1
            page_text = doc.load_page(i).get_text("text") or ""
            page_text = page_text.replace("\u00a0", " ").strip()

            page_rows.append(Row(
                doc_id=doc_id,
                stamp_family=str(meta.get('stamp_family') or ""),
                stamp_product=str(meta.get('stamp_product') or ""),
                majcom=meta.get("majcom"),
                installation=meta.get("installation"),
                stamp_name=str(meta.get("stamp_name") or ""),
                stamp_version=str(meta.get("stamp_version") or ""),
                source_pdf_path=pdf_path,
                page_num=int(page_num),
                page_text=page_text
            ))

            figs = detect_figure_refs(page_text) or ""
            for c in chunk_text(page_text):
                chunk_rows.append(Row(
                    chunk_id=uuid.uuid4().hex,
                    doc_id=doc_id,
                    stamp_family=str(meta.get('stamp_family') or ""),
                    stamp_product=str(meta.get('stamp_product') or ""),
                    majcom=meta.get("majcom"),
                    installation=meta.get("installation"),
                    stamp_name=str(meta.get("stamp_name") or ""),
                    stamp_version=str(meta.get("stamp_version") or ""),
                    source_pdf_path=pdf_path,
                    page_num=int(page_num),
                    section_title=f"Page {page_num}",
                    figure_refs=figs,
                    chunk_text=c
                ))

        doc.close()  # release the PyMuPDF file handle once every page has been read

        spark.createDataFrame(page_rows, schema=pages_schema).write.mode("append").saveAsTable("stamp_ai_pages")
        spark.createDataFrame(chunk_rows, schema=chunks_schema).write.mode("append").saveAsTable("stamp_ai_chunks")

        write_ingest_log(
            source_pdf_path=pdf_path,
            stamp_version=str(meta['stamp_version']),
            doc_id=doc_id,
            status='SUCCESS',
            error=None
        )

        return ("SUCCESS", doc_id, None)
        
    except Exception as e:
        write_ingest_log(
            source_pdf_path=pdf_path,
            stamp_version=str(meta.get("stamp_version") or ""),
            doc_id=None,
            status="FAILED",
            error=str(e)
        )

        return ("FAILED", None, str(e))
        

In [0]:
success = skipped = failed = 0
pdfs = [str(p) for p in pdfs]

print("Starting ingestion. PDF count:", len(pdfs))

for idx, p in enumerate(pdfs, start=1):
    p = str(p)

    try:
        status, doc_id, msg = ingest_one_pdf(p)

        if status == 'SUCCESS':
            success += 1
        elif status == 'SKIPPED':
            skipped += 1
        else:
            failed += 1
            print("\nFAILED:", p)
            print(" msg:", msg)
    except Exception as e:
        failed += 1
        print("\nEXCEPTION:", p)
        print(" ", str(e))

    if idx % 10 == 0 or idx == len(pdfs):
        print(f"[{idx}/{len(pdfs)}] success={success} skipped={skipped} failed={failed}")

print("DONE")
print("success=", success, 'skipped=', skipped, 'failed=', failed)

Starting ingestion. PDF count: 95
[10/95] success=10 skipped=0 failed=0
[20/95] success=20 skipped=0 failed=0
[30/95] success=30 skipped=0 failed=0
[40/95] success=40 skipped=0 failed=0
[50/95] success=50 skipped=0 failed=0
[60/95] success=60 skipped=0 failed=0
[70/95] success=70 skipped=0 failed=0
[80/95] success=80 skipped=0 failed=0
[90/95] success=90 skipped=0 failed=0
[95/95] success=95 skipped=0 failed=0
DONE
success= 95 skipped= 0 failed= 0


In [0]:
%sql
SELECT
status,
COUNT(*) AS n_files
FROM stamp_ai_ingest_log
GROUP BY status
ORDER BY n_files DESC;

status,n_files
SUCCESS,95


In [0]:
%sql
SELECT
ingested_at,
source_pdf_path,
stamp_version,
status,
error
FROM stamp_ai_ingest_log
WHERE status = 'FAILED'
ORDER BY ingested_at DESC
LIMIT 50;

ingested_at,source_pdf_path,stamp_version,status,error


In [0]:
%sql
SELECT
stamp_version,
stamp_family,
stamp_product,
COUNT(DISTINCT source_pdf_path) AS n_pdfs,
COUNT(*) AS n_chunks
FROM stamp_ai_chunks
GROUP BY stamp_version, stamp_family, stamp_product
ORDER BY stamp_version, stamp_family, stamp_product;

stamp_version,stamp_family,stamp_product,n_pdfs,n_chunks
DEC2025,E,Dorms,1,16
DEC2025,E,Facilities,1,19
DEC2025,E,TNAP,1,19
DEC2025,E,Utilities,1,24
FEB2025,I,All,2,34
SEP2025,I,All,79,1393
SEP2025,M,All,10,200
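These version labels are month-year codes, so string ordering is not chronological: a descending string sort puts `SEP2025` ahead of the newer `DEC2025`. The table also shows that each family is on its own cycle (E has only DEC2025; I and M have SEP2025). A single global "latest" filter would therefore drop whole families. The sketch below picks the latest version per family. It assumes every label is a three-letter English month abbreviation plus a four-digit year, and it relies on `strptime`'s `%b` under an English/C locale. `version_key` and `latest_per_family` are illustrative names.

```python
from datetime import datetime
from typing import Dict, Iterable, Tuple


def version_key(label: str) -> datetime:
    """Parse labels like 'SEP2025'; strptime matches the month name case-insensitively."""
    return datetime.strptime(label, "%b%Y")


def latest_per_family(rows: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    latest: Dict[str, str] = {}
    for family, label in rows:
        if family not in latest or version_key(label) > version_key(latest[family]):
            latest[family] = label
    return latest


rows = [("E", "DEC2025"), ("I", "FEB2025"), ("I", "SEP2025"), ("M", "SEP2025")]
print(max(["DEC2025", "FEB2025", "SEP2025"]))                   # SEP2025 (alphabetical)
print(max(["DEC2025", "FEB2025", "SEP2025"], key=version_key))  # DEC2025 (chronological)
print(latest_per_family(rows))  # {'E': 'DEC2025', 'I': 'SEP2025', 'M': 'SEP2025'}
```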


In [0]:
%sql
SELECT
stamp_family,
stamp_product,
stamp_version,
AVG(LENGTH(page_text)) AS avg_chars_per_page,
MIN(LENGTH(page_text)) AS min_chars_per_page,
MAX(LENGTH(page_text)) AS max_chars_per_page
FROM stamp_ai_pages
GROUP BY stamp_family, stamp_product, stamp_version
ORDER BY stamp_family, stamp_product, stamp_version;

stamp_family,stamp_product,stamp_version,avg_chars_per_page,min_chars_per_page,max_chars_per_page
E,Dorms,DEC2025,2341.5,944,3087
E,Facilities,DEC2025,2504.6666666666665,919,3292
E,TNAP,DEC2025,2728.625,918,3916
E,Utilities,DEC2025,3202.777777777778,919,5569
I,All,FEB2025,1940.35,1105,3085
I,All,SEP2025,1777.5388180764774,131,2328
M,All,SEP2025,1591.546153846154,872,2258


In [0]:
# inference helpers
def _norm(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "").strip().lower())

def build_scope_lookup():
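    """
    Pull distinct installation and MAJCOM names from the indexed chunks so the org scope
    can be inferred from a question. Each lookup is a list of (normalized, original) pairs,
    longest first, so longer names match before any shorter name they contain.
    """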
    df = spark.table("stamp_ai_chunks")

    installs = (
        df.where(F.col("stamp_family") == "I")
        .select("installation")
        .where(F.col("installation").isNotNull() & (F.length("installation") > 0))
        .distinct()
        .collect()
    )

    majcoms = (
        df.where(F.col("stamp_family") == "M")
        .select("majcom")
        .where(F.col("majcom").isNotNull() & (F.length("majcom") > 0))
        .distinct()
        .collect()
    )

    installations = sorted({_norm(r['installation']): r['installation'] for r in installs}.items(), key=lambda x: len(x[0]), reverse=True)
    majcoms = sorted({_norm(r['majcom']): r['majcom'] for r in majcoms}.items(), key=lambda x: len(x[0]), reverse=True)

    return installations, majcoms

INSTALL_LOOKUP, MAJCOM_LOOKUP = build_scope_lookup()

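from datetime import datetime

def _version_sort_key(v: str) -> datetime:
    # labels look like "SEP2025"; anything unparseable sorts first
    try:
        return datetime.strptime(v, "%b%Y")
    except ValueError:
        return datetime.min

def latest_stamp_version():
    """
    Default to the latest STAMP version, since several versions exist in SharePoint.
    Versions are month-year labels, so they are compared as dates; a plain string
    sort would rank SEP2025 above DEC2025.
    """
    rows = (
        spark.table("stamp_ai_chunks")
        .select("stamp_version")
        .where(F.col("stamp_version").isNotNull())
        .distinct()
        .collect()
    )
    versions = [r['stamp_version'] for r in rows if r['stamp_version']]
    return max(versions, key=_version_sort_key) if versions else None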

PRODUCT_KEYWORDS = {
    "Utilities": ['utilities', 'electric', 'power', 'water', 'wastewater', 'sewer', 'steam', 'natural gas'],
    "Facilities": ['facilities', 'facility', 'buildings', 'roof', 'hvac'],
    "Dorms": ['dorms', 'dorm', 'unaccompanied housing', 'barracks'],
    'TNAP': ['tnap', 'tactical', 'airfield', 'pavement', 'runway', 'taxiway', 'apron'],
}

FAMILY_HINTS = {
    "E": ['enterprise', 'e-stamp', 'e stamp'],
    'I': ['installation', 'i-stamp', 'i stamp', 'base', 'afb', 'afs', 'ab', 'sfs', 'sfb', 'angb'],
    'M': ['majcom', 'm-stamp', 'm stamp'],
}

def _has_term(text: str, term: str) -> bool:
    # whole-word/phrase match (optional plural "s"), so short hints such as "ab" or "base"
    # do not fire inside words like "table" or "database"
    return bool(term) and re.search(rf"\b{re.escape(term)}s?\b", text) is not None

def infer_product(q: str) -> str | None:
    qn = _norm(q)
    for prod, kws in PRODUCT_KEYWORDS.items():
        if any(_has_term(qn, k) for k in kws):
            return prod
    return None

def infer_family_from_text(q: str) -> str | None:
    qn = _norm(q)
    for fam, kws in FAMILY_HINTS.items():
        if any(_has_term(qn, k) for k in kws):
            return fam
    return None

def infer_org(q: str):
    """
    Return (family, installation, majcom) if an org name from the lookup tables appears in the question.
    An installation match forces family = "I"; a MAJCOM match forces family = "M".
    """
    qn = _norm(q)

    for key, canonical in INSTALL_LOOKUP:
        if _has_term(qn, key):
            return ("I", canonical, None)

    for key, canonical in MAJCOM_LOOKUP:
        if _has_term(qn, key):
            return ("M", None, canonical)  # the MAJCOM belongs in the third slot

    return (None, None, None)

def infer_version(q: str) -> str | None:
    qn = _norm(q)

    versions = (
        spark.table("stamp_ai_chunks")
        .select("stamp_version")
        .where(F.col("stamp_version").isNotNull())
        .distinct()
        .collect()
    )

    known = sorted((v['stamp_version'] for v in versions if v['stamp_version']), key=len, reverse=True)

    for v in known:
        if _norm(v) in qn:
            return v
    return None  # only after every known version has been checked
    
# retriever helper

def retrieve_top_chunks(
    question: str,
    stamp_family: str | None=None,
    stamp_product: str | None=None,
    stamp_version: str | None=None,
    installation: str | None=None,
    majcom: str | None=None,
    top_k: int=8
):
    base = spark.table("stamp_ai_chunks")

    if stamp_family: base = base.filter(F.col("stamp_family") == stamp_family)
    if stamp_product: base = base.filter(F.col("stamp_product") == stamp_product)
    if stamp_version: base = base.filter(F.col("stamp_version") == stamp_version)
    if installation: base = base.filter(F.col('installation') == installation)
    if majcom: base = base.filter(F.col('majcom') == majcom)

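    q = (question or "").lower()
    strip_chars = " ,.;:()[]{}!?\"'"
    # distinct terms longer than 3 characters (first 18, in question order), so repeated words do not double-count
    terms = list(dict.fromkeys(
        t.strip(strip_chars) for t in q.split() if len(t.strip(strip_chars)) > 3
    ))[:18]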

    score = None
    for t in terms:
        expr = F.when(F.lower(F.col("chunk_text")).contains(t), 1).otherwise(0)
        score = expr if score is None else (score + expr)

    ranked = base.withColumn("score", score if score is not None else F.lit(0)) \
        .orderBy(
            F.col("score").desc(),
            F.col("page_num").asc()
        )

    rows = ranked.limit(top_k).collect()
    return rows

@dataclass
class EvidenceRef:
    product: str
    doc_id: str
    page: Optional[int]
    chunk_id: Optional[str]
    snippet: str

@dataclass
class ThemeItem:
    theme: str
    description: str
    evidence: List[EvidenceRef]

def _build_theme_extraction_prompt(product: str, chunk_payloads: List[Dict[str, Any]], max_themes: int=7) -> str:
    formatted = []
    for idx, c in enumerate(chunk_payloads, start=1):
        txt = (c.get("text") or "").strip()
        txt = re.sub(r"\s+", " ", txt)  # collapse whitespace runs, not just single characters
        snippet = txt[:700]
        formatted.append({
            "i":idx,
            "doc_id":c.get("doc_id"),
            "page":c.get("page"),
            "chunk_id":c.get("chunk_id"),
            "text":snippet
        })

    instructions = f"""
    You are extracting THEMES from STAMP {product}-product document excerpts.

    Rules:
    - Output MUST be valid JSON (no markdown, no commentary)
    - Produce between 4 and {max_themes} themes. Prefer fewer, higher-quality themes.
    - Themes must be short noun phrases (3-7 words), normalized (no document-specific phrasing)
    - Each theme must have:
        - "theme": normalized label
        - "description": 1-2 sentences explaining what the theme means
        - "evidence_idx": list of excerpt indices that explicitly support the theme
    - Do not invent facts. If unclear, omit the theme.
    - Evidence must be explicit: only link an excerpt if it directly supports the theme
    
    Return schema:
    {{
        "product": "{product}",
        "themes": [
            {{
                "theme": "...",
                "description": "...",
                "evidence_idx": [1, 3]
            }}
            ]
    }}
    
    Excerpts:
    {json.dumps(formatted, ensure_ascii=False)}
    """.strip()

    return instructions

def _safe_json_load(s: str) -> Dict[str, Any]:
    s = (s or "").strip()
    match = re.search(r"\{.*\}", s, flags=re.DOTALL)
    if match:
        s = match.group(0)
    return json.loads(s)

def _validate_theme_json(obj: Dict[str, Any], product: str) -> Dict[str, Any]:
    if not isinstance(obj, dict):
        raise ValueError("Theme output is not a JSON object")
    if obj.get("product") != product:
        obj['product'] = product

    themes = obj.get("themes", [])
    if not isinstance(themes, list):
        raise ValueError("Theme output missing non-empty 'themes' list")

    if len(themes) == 0:
        obj["themes"] = []
        return obj

    cleaned = []
    for t in themes:
        if not isinstance(t, dict):
            continue
        theme = (t.get("theme") or "").strip()
        desc = (t.get("description") or "").strip()
        evidence_idx = t.get("evidence_idx") or []
        if not theme or not desc or not isinstance(evidence_idx, list) or len(evidence_idx) == 0:
            continue

        theme = re.sub(r"\s+", " ", theme)
        theme = theme.strip(" -\t")

        cleaned.append({
            "theme": theme,
            "description": desc,
            "evidence_idx": [int(i) for i in evidence_idx if isinstance(i, (int, float, str)) and str(i).isdigit()]
        })

    if not cleaned:
        raise ValueError("No valid themes after cleaning and validation")

    obj["themes"] = cleaned
    return obj

def extract_themes_per_product(
    product: str,
    chunks: List[Dict[str, Any]],
    llm_call_fn,
    max_themes: int = 7
) -> List[ThemeItem]:
    prompt = _build_theme_extraction_prompt(product, chunks, max_themes=max_themes)
    raw = llm_call_fn(prompt)

    obj = _safe_json_load(raw)
    obj = _validate_theme_json(obj, product)

    if not obj.get("themes"):
        return []

    themes_out: List[ThemeItem] = []

    for t in obj['themes']:
        ev: List[EvidenceRef] = []

        for i in t.get("evidence_idx", []):
            if isinstance(i, int) and 1 <= i <= len(chunks):
                c = chunks[i - 1]
                txt = re.sub(r"\s+", " ", (c.get("text") or "").strip())
                ev.append(EvidenceRef(
                    product=product,
                    doc_id=str(c.get("doc_id") or ""),
                    page=c.get("page"),
                    chunk_id=str(c.get("chunk_id") or ""),
                    snippet=txt[:300]
                ))

        if ev:
            themes_out.append(ThemeItem(
                theme=t.get("theme", ""),
                description=t.get("description", ""),
                evidence=ev
            ))


    return themes_out

def _rows_to_chunks(rows, product: str) -> List[Dict[str, Any]]:
    chunks = []
    for r in rows:
        if hasattr(r, "asDict"):
            r = r.asDict(recursive=True)
        if not isinstance(r, dict):
            continue 

        chunks.append({
            "text": r.get("chunk_text") or r.get("text") or "",
            "page": r.get("page_num") or r.get("page") or r.get("page_number"),  # stamp_ai_chunks stores the page as page_num
            "doc_id": r.get("doc_id") or r.get("source_doc") or r.get("path"),
            "chunk_id": r.get("chunk_id") or r.get("id")
        })
    return [c for c in chunks if c['text'].strip()]

def llm_call_fn(prompt: str, endpoint: str, temperature: float = 0.2, max_tokens: int = 1100, timeout: int = 120) -> str:
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    host = "https://" + ctx.browserHostName().get()
    url = f"{host}/serving-endpoints/{endpoint}/invocations"
    headers = {
        "Authorization": f"Bearer {ctx.apiToken().get()}",
        "Content-Type": "application/json"
    }

    payload = {"messages": [{"role": "user", "content": prompt}], "temperature": float(temperature), "max_tokens": int(max_tokens)}
    # bounded wait, and fail fast on HTTP errors instead of a confusing KeyError on 'choices'
    resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()

    return data['choices'][0]['message']['content']

@dataclass
class CanonicalTheme:
    canonical_theme: str
    canonical_description: str
    products: Set[str]
    members: List[Dict[str, str]]
    evidence: List[EvidenceRef]

def align_cross_product_themes(
    e_themes: List[ThemeItem],
    i_themes: List[ThemeItem],
    m_themes: List[ThemeItem],
    llm_one_arg_fn
) -> List[CanonicalTheme]:
    items = []
    idx_to_theme: Dict[str, ThemeItem] = {}
    for product, themes in [("E", e_themes), ("I", i_themes), ("M", m_themes)]:
        for j, t in enumerate(themes, start=1):
            tid = f"{product}{j}"
            items.append({
                "id": tid,
                "product": product,
                "theme": t.theme,
                "description": t.description
            })
            idx_to_theme[tid] = t

    prompt = f"""
    You are aligning THEMES across E-, I-, and M-STAMPS.
    
    Rules:
    - Output MUST be valid JSON only (no markdown, no commentary)
    - Do not invent new ideas beyond the input themes
    - Create canonical themes by grouping semantically similar input themes
    - Each input theme id must appear in exactly ONE group
    - Canonical theme names: short noun phrases; canonical descriptions: 1-2 sentences, plain English, grounded in the grouped themes
    - If a theme is unique, it should be a group with one member
    
    Return schema:
    {{
        "groups": [
            {{
                "canonical_theme": "...",
                "canonical_description": "...",
                "members": ['E1', 'I2', 'M4']
            }}]
    }}
    
    Input themes:
    {json.dumps(items, ensure_ascii=False)}
    """.strip()

    raw = llm_one_arg_fn(prompt)
    obj = _safe_json_load(raw)

    groups = obj.get("groups")
    if not isinstance(groups, list) or not groups:
        raise ValueError("Step 2 alignment returned no groups")
    out: List[CanonicalTheme] = []

    for g in groups:
        canon = (g.get("canonical_theme") or "").strip()
        cdesc = (g.get("canonical_description") or "").strip()
        members = g.get('members') or []

        if not canon or not isinstance(members, list) or len(members) == 0:
            continue

        products: Set[str] = set()
        member_records: List[Dict[str, str]] = []
        evidence: List[EvidenceRef] = []

        for mid in members:
            mid = str(mid).strip()
            if mid not in idx_to_theme: 
                continue
            t = idx_to_theme[mid]

            prod = mid[0]
            products.add(prod)
            member_records.append({"product": prod, "theme": t.theme})
            evidence.extend(t.evidence)

        seen: Set[Tuple[str, str, Optional[int], Optional[str]]] = set()
        deduped: List[EvidenceRef] = []
        for ev in evidence:
            key = (ev.product, ev.doc_id, ev.page, ev.chunk_id)
            if key in seen:
                continue
            seen.add(key)
            deduped.append(ev)

        out.append(CanonicalTheme(
            canonical_theme=canon,
            canonical_description=cdesc,
            products=products,
            members=member_records,
            evidence=deduped
        ))

    if not out:
        raise ValueError("Step 2 had no canonical themes after processing")
    return out

@dataclass
class EvidenceBullet:
    product: str
    page: Optional[int]
    doc_id: str
    chunk_id: Optional[str]
    bullet: str

@dataclass
class ThemeCard:
    canonical_theme: str
    canonical_description: str
    products: List[str]
    evidence: List[EvidenceBullet]

    @property
    def payload(self) -> Dict[str, Any]:
        return {
            "canonical_theme": self.canonical_theme,
            "canonical_description": self.canonical_description,
            "products": self.products,
            "evidence": [
                {
                    "product": eb.product,
                    "page": eb.page,
                    "doc_id": eb.doc_id,
                    "chunk_id": eb.chunk_id,
                    "bullet": eb.bullet,
                }
                for eb in self.evidence
            ],
        }

def _dedup_evidence_refs(evidence: List[EvidenceRef]) -> List[EvidenceRef]:
    seen: set = set()
    out: List[EvidenceRef] = []
    for ev in evidence:
        key = (ev.product, ev.doc_id, ev.page, ev.chunk_id)
        if key in seen:
            continue
        seen.add(key)
        out.append(ev)
    return out

def _chunk_to_source_id_lookup(all_citations: List[Dict[str, Any]]) -> Dict[Tuple[str, str, Optional[int], Optional[str]], int]:
    lookup = {}
    for c in all_citations:
        fam = c.get("stamp_family")
        path = c.get("source_pdf_path") or c.get("stamp_name") or ""
        page = c.get("page_num")
        key = (fam, str(path), page, None)
        lookup[key] = c.get("source_id")
    return lookup

def build_theme_cards(
    canonical_themes: List[CanonicalTheme],
    llm_one_arg_fn,
    max_evidence_per_theme: int=3,
    max_themes: Optional[int]=None
) -> List[ThemeCard]:
    cards: List[ThemeCard] = []

    # max_themes caps the number of cards (None = all themes);
    # max_evidence_per_theme only limits citations within each card
    for ct in canonical_themes[:max_themes]:
        ev_refs = _dedup_evidence_refs(ct.evidence)
        per_prod: Dict[str, List[EvidenceRef]] = {"E": [], "I": [], "M": []}
        for ev in ev_refs:
            if ev.product in per_prod and len(per_prod[ev.product]) < 2:
                per_prod[ev.product].append(ev)

        # round-robin (E, I, M, then E, I, M) so each product gets one citation
        # before any product gets a second
        picked: List[EvidenceRef] = []
        for rank in range(2):
            for prod in ['E', 'I', 'M']:
                if rank < len(per_prod[prod]):
                    picked.append(per_prod[prod][rank])

        if len(picked) < max_evidence_per_theme:
            for ev in ev_refs:
                if ev in picked:
                    continue
                picked.append(ev)
                if len(picked) >= max_evidence_per_theme:
                    break
        picked = picked[:max_evidence_per_theme]

        bullets: List[EvidenceBullet] = []
        for ev in picked:
            prompt = f"""
            You will write ONE evidence bullet sentence grounded only in the excerpt.
            
            Rules:
            - Output plain text only (no quotes, no JSON)
            - One sentence, <= 25 words
            - Do NOT invent details not in the excerpt
            - Keep it leadership-friendly
            
            Theme: {ct.canonical_theme}
            
            Excerpt:
            {ev.snippet}
            """.strip()

            bullet = llm_one_arg_fn(prompt).strip()
            bullets.append(EvidenceBullet(
                product=ev.product,
                page=ev.page,
                doc_id=ev.doc_id,
                chunk_id=ev.chunk_id,
                bullet=bullet
            ))
        cards.append(ThemeCard(
            canonical_theme=ct.canonical_theme,
            canonical_description=ct.canonical_description,
            products=sorted(list(ct.products)),
            evidence=bullets
        ))

    return cards

@dataclass
class GapItem:
    gap_type: str
    theme: str
    present_in: List[str]
    missing_in: List[str]
    impact: str
    confidence: str

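def identify_coverage_gaps(canonical_themes: List[CanonicalTheme],
                           llm_one_arg_fn,
                           max_themes_to_check: int=10) -> List[GapItem]:
    """Flag canonical themes that appear in some, but not all, of the E/I/M products.

    Gaps are computed deterministically from each theme's product set, so confidence is fixed
    at "high". llm_one_arg_fn and max_themes_to_check are accepted but currently unused.
    """
    gaps: List[GapItem] = []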
    all_products = {'E', 'I', 'M'}

    for ct in canonical_themes:
        present = set(ct.products)
        missing = sorted(list(all_products - present))
        if missing:
            impact = "Limits cross-product consistency and comparability for enterprise-level decisions"
            confidence = "high"
            gaps.append(GapItem(
                gap_type="coverage",
                theme=ct.canonical_theme,
                present_in=sorted(list(present)),
                missing_in=missing,
                impact=impact,
                confidence=confidence
            ))
    return gaps

def identify_definition_gaps(
    canonical_themes: List[CanonicalTheme],
    llm_one_arg_fn,
    max_themes_to_check: int = 10
) -> List[GapItem]:
    
    gaps: List[GapItem] = []

    ranked = sorted(
        canonical_themes,
        key = lambda ct: (len(ct.products), len(ct.evidence)),
        reverse=True
    )[:max_themes_to_check]

    for ct in ranked:
        ev_refs = _dedup_evidence_refs(ct.evidence)[:3]
        combined = "\n\n".join([f"[{ev.product} p.{ev.page}] {ev.snippet}" for ev in ev_refs])

        prompt = f"""
        You are assessing whether the sources clearly DEFINE the theme in an operational way
        
        Theme: {ct.canonical_theme}
        Theme description: {ct.canonical_description}
        
        Sources (excerpts):
        {combined}
        
        Task:
        Return valid JSON ONLY with these keys:
        - "is_definition_gap": true/false
        - "why": one sentence (<= 25 words) grounded in the excerpts
        - "confidence": "high" | "medium" | "low" 
        
        Definition gap criteria:
        - Missing explicit thresholds, criteria, roles/responsibilities, or decision procedure
        - Or only general statements without operational detail
        
        If the excerpts DO provide an operational definition, set is_definition_gap=false
        """.strip()

        raw = llm_one_arg_fn(prompt)
        obj = _safe_json_load(raw)

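        flag = obj.get('is_definition_gap')
        # bool("false") is True, so accept only a real boolean or the string "true"
        is_gap = flag is True or (isinstance(flag, str) and flag.strip().lower() == 'true')
        if not is_gap:
            continue

        why = (obj.get('why') or '').strip()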
        conf = (obj.get('confidence') or 'low').strip().lower()
        if conf not in ['high', 'medium', 'low']:
            conf = 'low'

        gaps.append(GapItem(
            gap_type = 'definition',
            theme = ct.canonical_theme,
            present_in = sorted(list(ct.products)),
            missing_in = [],
            impact = why if why else "Theme lacks operational definition",
            confidence = conf
        ))

    return gaps

def build_gap_summary(
    canonical_themes: List[CanonicalTheme],
    llm_one_arg_fn,
    max_definition_checks: int = 10
) -> List[GapItem]:
    gaps = []
    gaps.extend(identify_coverage_gaps(canonical_themes, llm_one_arg_fn))
    gaps.extend(identify_definition_gaps(canonical_themes, llm_one_arg_fn, max_themes_to_check=max_definition_checks))
    return gaps

def build_exec_synthesis_prompt(question: str, theme_cards: List[ThemeCard], gaps: List[GapItem]) -> str:
    tc_payload = []
    for tc in theme_cards:
        tc_payload.append({
            "theme": tc.canonical_theme,
            "description": tc.canonical_description,
            "products": tc.products,
            "evidence": [
                {
                    "product": b.product,
                    "page": b.page,
                    "doc_id": b.doc_id,
                    "bullet": b.bullet
                } for b in tc.evidence
            ]
        })
    
    gap_payload = []
    for g in gaps:
        gap_payload.append({
            "type": g.gap_type,
            "theme": g.theme,
            "present_in": g.present_in,
            "missing_in": g.missing_in,
            "impact": g.impact,
            "confidence": g.confidence
        })

    return f"""
    Role: You are an executive decision-support analyst synthesizing information across STAMP products (E, I, M) using only the provided sources. Your goal is to produce a reliable, repeatable, executive-ready synthesis suitable for senior leadership review.

    Hard rules (non-negotiable):
    1. Use ONLY the cited sources provided. Do not infer, assume, or generalize beyond what is explicitly supported by citations.
    2. Evidence attribution rule (global): Only assert that data covers a specific asset type, facility type, mission area, or population if the cited source explicitly names it. If coverage is implied only through adjacent indicators (e.g., bed space, BCI, BUILDER, PRV, condition indices), describe the coverage as implied but not confirmed and list it as a limitation. Do not upgrade implied coverage to confirmed coverage.
    3. Cross-Product Discipline: if a finding applies to only one STAMP product, state that explicitly. Do not imply enterprise-wide coverage unless supported across products.
    4. No recommendations: do not suggest actions, priorities, investments, or next steps. Implications must describe constraints or enablers only.
    5. Tone: write in a concise, neutral, executive briefing style. Avoid speculative language ("appears", "likely", "suggests", etc.).
    6. Conservative Resolution Rule: when evidence across theme cards is inconsistent or ambiguous regarding explicit coverage of an asset or facility type, default to the most conservative interpretation. If any reasonable uncertainty exists about whether coverage is explicit, treat the coverage as not explicitly confirmed and describe it as a limitation.

    Required Output Format (STRICT):
    Use exactly the following section headers and order. Do not number sections. Do not add or remove sections.
    ## Executive summary
    - 2-3 short paragraphs
    - Summarize what data exists, what it covers, and major limitations
    - No bullets
    ## Cross-STAMP Themes
    - Each theme must have: Bolded theme title, 2-4 sentences synthesizing evidence across products
    - No nested bullets
    - Do not introduce themes not grounded in citations
    ## Gaps and Inconsistencies
    - Bulleted list only
    - Describe missing coverage, inconsistent scope, or unclear definitions
    - Explicitly state when coverage is implied but not confirmed
    ## Implications
    - 2-4 bullets
    - Describe what the data enables or constrains
    - No recommendations, prioritization, or action language

    Additional Constraints:
    - Do not claim completeness where evidence is partial
    - Do not reconcile contradictions unless explicitly supported by sources
    - If evidence is insufficient to answer definitively, state that clearly
    - Consistency and accuracy take priority over completeness
   
    
    QUESTION:
    {question}
    
    THEME CARDS (pre-synthesized evidence):
    {json.dumps(tc_payload, ensure_ascii=False)}
    
    GAPS (identified):
    {json.dumps(gap_payload, ensure_ascii=False)}
    """.strip()

def render_v25_markdown(v25: dict) -> str:
    syn = (v25 or {}).get("synthesis", {}) or {}
    es = (syn.get("executive_summary", {}) or {}).get("text", "") or ""
    key_points = syn.get("key_points", []) or []
    gaps = (v25 or {}).get("gaps_and_limits", []) or []

    lines = []
    if es.strip():
        lines.append(es.strip())
        lines.append("")
    if key_points:
        lines.append("## Key Points")
        for kp in key_points:
            txt = (kp or {}).get("text", "")
            if txt and txt.strip():
                lines.append(f"- {txt.strip()}")
        lines.append("")
    if gaps:
        lines.append("## Gaps & Limits")
        for g in gaps:
            gtype = (g or {}).get("type", "")
            desc = (g or {}).get("description", "")
            if (gtype or desc):
                lines.append(f"- **{gtype}**: {desc}".strip())
        lines.append("")

    return "\n".join(lines).strip()
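The evidence-selection step in `build_theme_cards` takes up to two deduplicated references per product in E, I, M order, backfills from the remaining references if it is still short, and then truncates to `max_evidence_per_theme`. A minimal standalone sketch of that selection logic, assuming only the product and an identifier matter for selection; `Ref` and `select_balanced` are hypothetical stand-ins for `EvidenceRef` and the inline loop:

```python
from typing import List, NamedTuple


class Ref(NamedTuple):
    # Hypothetical stand-in for EvidenceRef with only the fields selection needs
    product: str
    chunk_id: str


def select_balanced(refs: List[Ref], max_total: int = 3, per_product_cap: int = 2) -> List[Ref]:
    """Pick up to per_product_cap refs per product (E, I, M order), backfill, then truncate."""
    per_prod = {"E": [], "I": [], "M": []}
    for ref in refs:
        if ref.product in per_prod and len(per_prod[ref.product]) < per_product_cap:
            per_prod[ref.product].append(ref)

    picked: List[Ref] = []
    for prod in ("E", "I", "M"):
        picked.extend(per_prod[prod])

    # Backfill with any remaining refs (over-cap or non-E/I/M products) while still short
    for ref in refs:
        if len(picked) >= max_total:
            break
        if ref not in picked:
            picked.append(ref)

    # Truncation happens after concatenation, so later products (I, then M) are dropped first
    return picked[:max_total]


refs = [Ref("E", "e1"), Ref("E", "e2"), Ref("I", "i1"), Ref("M", "m1")]
print(select_balanced(refs))
```

With a per-product cap of 2 and a total of 3, a theme with two E references plus I and M evidence keeps only the I reference; a cap of 1 would keep one reference from each product in that case.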

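`identify_definition_gaps` reads the `is_definition_gap` flag from LLM-generated JSON. In Python, `bool("false")` is `True`, so any coercion of that flag has to handle string values explicitly. A small sketch of a tolerant coercion helper; `as_bool` and the accepted spellings are assumptions, not part of the pipeline:

```python
def as_bool(value, default: bool = False) -> bool:
    """Coerce an LLM-emitted JSON flag to bool without treating "false" as truthy."""
    if isinstance(value, bool):  # check bool before int, since bool is an int subclass
        return value
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        text = value.strip().lower()
        if text in {"true", "yes", "1"}:
            return True
        if text in {"false", "no", "0", ""}:
            return False
    # None, unexpected strings, or other types fall back to the caller's default
    return default
```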
In [0]:
import json, re, uuid
from datetime import datetime, timezone

def safe_json_loads(s: str):
    if not s or not str(s).strip():
        raise ValueError("Empty LLM output")

    txt = str(s).strip()

    m = re.search(r"```(?:json)?\s*(.*?)\s*```", txt, flags=re.DOTALL | re.IGNORECASE)
    if m:
        candidate = m.group(1).strip()
    else:
        candidate = txt

    try:
        return json.loads(candidate)
    except json.JSONDecodeError as e:
        start = max(e.pos - 250, 0)
        end = min(e.pos + 250, len(candidate))
        excerpt = candidate[start:end]
        raise ValueError(
            f"LLM returned invalid JSON\n"
            f"Excerpt around error (pos {e.pos}):\n{excerpt}\n\n"
            f"Raw output (first 1200 chars):\n{txt[:1200]}"
        ) from e

def rows_to_v25_citations(rows, start_id=1):
    """Convert retrieved rows (dicts or attribute-style rows) into v2.5 citation dicts with sequential IDs S<n>."""

    def _get(r, key, default=None):
        # Read plain dicts and attribute-style rows (e.g. Spark Row) uniformly
        return r.get(key, default) if isinstance(r, dict) else getattr(r, key, default)

    citations = []
    sid = start_id

    for r in rows:
        page = _get(r, "page_num")
        citations.append({
            "citation_id": f"S{sid}",
            "stamp_family": _get(r, "stamp_family"),
            "stamp_product": _get(r, "stamp_product"),
            "stamp_version": _get(r, "stamp_version"),
            "document_id": _get(r, "doc_id", ""),
            "document_title": _get(r, "stamp_name", ""),
            "page": int(page) if page is not None else None,
            "figure": _get(r, "figure_refs"),
            "chunk_id": _get(r, "chunk_id", ""),
            "quote": ""
        })
        sid += 1

    # Return the next unused id so E/I/M batches are numbered consecutively
    return citations, sid

def build_v25_json_prompt(question, theme_cards, gaps, citations, schema_version="2.5"):
    run_id = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + "__" + uuid.uuid4().hex[:6]

    cite_view = [
        {
            "citation_id": c["citation_id"],
            "stamp_family": c["stamp_family"],
            "stamp_product": c["stamp_product"],
            "stamp_version": c["stamp_version"],
            "page": c["page"],
            "chunk_id": c["chunk_id"],
            "document_title": c.get("document_title", "")
        }
        for c in citations
    ]

    json_skeleton = {
        "schema_version": schema_version,
        "run_id": run_id,
        "question": question,
        "retrieval_summary": {
            "families_considered": ["E", "I", "M"],
            "chunks_used": 0,
            "coverage_by_family": {"E": 0, "I": 0, "M": 0}
        },
        # Example claims[] item (commented out, so not sent in the prompt):
        # {
        #     "claim_id": "C1",
        #     "statement": "",
        #     "support": [{"citation_id": "S1", "relevance": "primary"}],
        #     "confidence": "low",
        #     "notes": ""
        # }
        "claims": [],
        "synthesis": {
            "executive_summary": {"text": "", "derived_from_claims": []},
            "key_points": [],
            "recommended_next_questions": []
        },
        # Example gaps_and_limits[] item (commented out, so not sent in the prompt):
        # {
        #     "gap_id": "G1",
        #     "type": "no_evidence",
        #     "description": "",
        #     "impact": "",
        #     "affected_families": ["E", "I", "M"],
        #     "related_claims": []
        # }
        "gaps_and_limits": [],
        "citations": cite_view,
        "evidence_metrics": {
            "claims_count": 0,
            "citations_count": len(cite_view),
            "unique_documents_count": 0,
            "coverage_by_family": {
                "E": {"claims": 0, "citations": 0},
                "I": {"claims": 0, "citations": 0},
                "M": {"claims": 0, "citations": 0}
            },
            "conflict_detected": False,
            "thin_evidence_claim_ids": [],
            "overall_confidence": "low"
        },
        "validation": {
            "schema_version_valid": True,
            "all_claims_supported": True,
            "no_external_knowledge_used": True,
            "all_synthesis_derived_from_claims": True,
            "gaps_section_present": True,
            "status": "valid",
            "notes": ""
        }
    }

    return f"""
    You are STAMP AI. 
    RETURN FORMAT (STRICT):
    - Return ONLY one fenced JSON block.
    - No text before or after
    - Use EXACT key names (match the skeleton exactly)
    - Use ONLY double quotes in JSON.
    - Do NOT invent citation_id values. Use only the provided citations list.

    Output exactly this structure (fill in values; keep keys the same):
    {json.dumps(json_skeleton, ensure_ascii=False, indent=2)}


    RULES:
    1. Every claim MUST have at least 1 support entry with a valid citation_id from citations[]
    2. Synthesis text MUST ONLY restate/combine claims, and MUST list derived_from_claims.
    3. If not supported: set claims=[] and add a gaps_and_limits entry with type="no_evidence".
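    4. Keep every array present, even when empty (claims may be empty). Each claims[] item uses the keys claim_id, statement, support (a list of objects with citation_id and relevance), confidence, and notes. Each gaps_and_limits[] item uses the keys gap_id, type, description, impact, affected_families, and related_claims.
    5. Keep citations[] exactly as provided.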

    INPUTS:
    - Theme cards (guidance only): {json.dumps(make_json_safe(theme_cards), ensure_ascii=False)}
    - Gap summary (guidance only): {json.dumps(make_json_safe(gaps), ensure_ascii=False)}
    """.strip()

from dataclasses import asdict, is_dataclass

def make_json_safe(obj):
    if is_dataclass(obj):
        return asdict(obj)
    
    if hasattr(obj, "model_dump") and callable(getattr(obj, "model_dump")):
        return obj.model_dump()
    if hasattr(obj, "dict") and callable(getattr(obj, "dict")):
        return obj.dict()
    if hasattr(obj, "asDict") and callable(getattr(obj, "asDict")):
        return obj.asDict(recursive=True)
    if hasattr(obj, "__dict__"):
        return {k: make_json_safe(v) for k, v in obj.__dict__.items() if not k.startswith("_")}
    if isinstance(obj, (list, tuple)):
        return [make_json_safe(x) for x in obj]
    if isinstance(obj, dict):
        return {str(k): make_json_safe(v) for k, v in obj.items()}
    
    return obj
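The `JSONDecodeError` captured at the end of this section is raised inside `safe_json_loads`. One plausible cause, not confirmed by the captured output, is a response cut off before its closing fence (for example, when the model reaches `max_tokens`): the fenced-block regex then fails to match, and `json.loads` receives text that still begins with the opening fence. A sketch of a more tolerant extractor, assuming the payload is a single JSON object; `extract_json_object` is a hypothetical name:

```python
import json
import re

FENCE = "`" * 3  # a Markdown code-fence marker


def extract_json_object(text: str) -> dict:
    """Parse a JSON object from LLM output that may be fenced, unfenced, or missing its closing fence."""
    txt = (text or "").strip()
    if not txt:
        raise ValueError("Empty LLM output")

    # Strip the opening and closing fences independently, so an unclosed fence is still unwrapped
    txt = re.sub(r"^" + FENCE + r"(?:json)?\s*", "", txt, flags=re.IGNORECASE)
    txt = re.sub(r"\s*" + FENCE + r"\s*$", "", txt)

    try:
        return json.loads(txt)
    except json.JSONDecodeError:
        pass

    # Fall back to the outermost {...} span, dropping stray prose around the object
    start, end = txt.find("{"), txt.rfind("}")
    if start != -1 and end > start:
        return json.loads(txt[start:end + 1])
    raise ValueError("No complete JSON object found; the response may have been truncated")
```

This only removes fence-related failures. An object that is itself truncated mid-way still fails to parse, so whether the response hit the token limit remains worth checking.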

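Rule 1 of the v2.5 prompt requires every claim to cite at least one `citation_id` from `citations[]`, yet the model also fills in `validation.all_claims_supported` itself. A sketch of checking that rule deterministically after parsing; `find_unsupported_claims` is a hypothetical helper, and the field names follow the JSON skeleton in `build_v25_json_prompt`:

```python
from typing import Any, Dict, List


def find_unsupported_claims(v25: Dict[str, Any]) -> List[str]:
    """Return claim_ids whose support list is empty or cites an id absent from citations[]."""
    valid_ids = {c.get("citation_id") for c in v25.get("citations", []) or []}
    bad: List[str] = []
    for claim in v25.get("claims", []) or []:
        support = claim.get("support") or []
        cited = [s.get("citation_id") for s in support if isinstance(s, dict)]
        # A claim fails if it cites nothing or cites any id not in citations[]
        if not cited or any(cid not in valid_ids for cid in cited):
            bad.append(claim.get("claim_id", "<missing claim_id>"))
    return bad
```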
In [0]:
import json, requests

def ask_stamp_xscope(
    question: str,
    endpoint: str = 'databricks-claude-sonnet-4-5',
    top_k_each: int = 6
):
    q = question or ""

    try:
        version = latest_stamp_version()
    except Exception:
        version = None
    
    try:
        product = infer_product(q)
    except Exception:
        product = None

    e_rows = retrieve_top_chunks(
        question=q,
        stamp_family='E',
        stamp_product=product,
        stamp_version=version,
        top_k=top_k_each
    )

    i_rows = retrieve_top_chunks(
        question=q,
        stamp_family='I',
        stamp_product='All',
        stamp_version=version,
        top_k=top_k_each
    )

    m_rows = retrieve_top_chunks(
        question=q,
        stamp_family='M',
        stamp_product='All',
        stamp_version=version,
        top_k=top_k_each
    )

    e_chunks = _rows_to_chunks(e_rows, "E")
    i_chunks = _rows_to_chunks(i_rows, "I")
    m_chunks = _rows_to_chunks(m_rows, "M")

    _model = lambda p: llm_call_fn(p, endpoint=endpoint, temperature=0.0, max_tokens=700)

    e_themes = extract_themes_per_product("E", e_chunks, _model)
    i_themes = extract_themes_per_product("I", i_chunks, _model)
    m_themes = extract_themes_per_product("M", m_chunks, _model)

    canonical_themes = align_cross_product_themes(e_themes, i_themes, m_themes, _model)
    theme_cards = build_theme_cards(canonical_themes, _model, max_evidence_per_theme=3)
    gaps = build_gap_summary(canonical_themes, _model, max_definition_checks=8)

    # def pack_sources(rows, start_id=1):
    #     blocks = []
    #     citations = []
    #     source_id = start_id
    #     for r in rows:
    #         blocks.append( f"""SOURCE {source_id}
    #             STAMP: {r['stamp_name']} | Family={r['stamp_family']} Product={r['stamp_product']} Version={r['stamp_version']}
    #             ORG: installation={r['installation'] or "None"} majcom={r['majcom'] or "None"}
    #             PAGE: {r['page_num']} | FIGURES: {r['figure_refs'] or "none"}
    #             TEXT: 
    #             {r['chunk_text']}"""
    #             )

    #         citations.append({
    #             "source_id": source_id,
    #             "stamp_name": r['stamp_name'],
    #             "stamp_family": r['stamp_family'],
    #             "stamp_product": r['stamp_product'],
    #             "stamp_version": r['stamp_version'],
    #             'installation': r['installation'],
    #             'majcom': r['majcom'],
    #             "page_num": int(r['page_num']) if r['page_num'] is not None else None,
    #             "figure_refs": r['figure_refs'],
    #             "source_pdf_path": r['source_pdf_path'],
    #         }) 
    #         source_id += 1
        
    #     return blocks, citations, source_id
    
    # e_blocks, e_cites, next_id = pack_sources(e_rows, start_id=1)
    # i_blocks, i_cites, next_id = pack_sources(i_rows, start_id=next_id)
    # m_blocks, m_cites, next_id = pack_sources(m_rows, start_id=next_id)

    # all_blocks = e_blocks + i_blocks + m_blocks
    # all_citations = e_cites + i_cites + m_cites

    # v2.5 version of pack sources
    def _row_to_dict(r):
        return r.asDict(recursive=True) if hasattr(r, "asDict") else dict(r)
    
    e_rows_d = [_row_to_dict(r) for r in e_rows]
    i_rows_d = [_row_to_dict(r) for r in i_rows]
    m_rows_d = [_row_to_dict(r) for r in m_rows]

    e_cites, next_id = rows_to_v25_citations(e_rows_d, start_id=1)
    i_cites, next_id = rows_to_v25_citations(i_rows_d, start_id=next_id)
    m_cites, next_id = rows_to_v25_citations(m_rows_d, start_id=next_id)

    all_citations = e_cites + i_cites + m_cites

    if len(all_citations) == 0: 
        run_id = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + "__" + uuid.uuid4().hex[:6]
        v25 = {
            "schema_version": "2.5",
            "run_id": run_id,
            "question": q,
            "retrieval_summary": {
                "families_considered": ['E', 'I', 'M'],
                "chunks_used": 0,
                "coverage_by_family": {'E':0, 'I':0, 'M':0}
            },
            "claims": [],
            "synthesis": {
                "executive_summary": {'text': "Not supported by the STAMP for this query.", "derived_from_claims": []},
                "key_points": [],
                "recommended_next_questions": []
            },
            "gaps_and_limits": [{
                "gap_id": "G1",
                "type": "no_evidence",
                "description": "No relevant STAMP chunks were retrieved for the question.",
                "impact": "The system cannot answer without STAMP support.",
                "affected_families": ['E', 'I', 'M'],
                "related_claims": []
            }],
            "citations": [],
            "evidence_metrics": {
                "claims_count": 0,
                "citations_count": 0,
                "unique_documents_count": 0,
                "coverage_by_family": {"E":{"claims":0, "citations":0}, "I":{"claims":0, "citations":0}, "M":{"claims":0, "citations":0}},
                "conflict_detected": False,
                "thin_evidence_claim_ids": [],
                "overall_confidence": "low"
            },
            "validation": {
                "schema_version_valid": True,
                "all_claims_supported": True,
                "no_external_knowledge_used": True,
                "all_synthesis_derived_from_claims": True,
                "gaps_section_present": True,
                "status": "valid",
                "notes": ''
            }
        }

        return {
            "v25": v25, 
            "markdown": render_v25_markdown(v25),
            "debug": {"version": version, "product": product, "counts": {"E": len(e_rows), "I": len(i_rows), "M": len(m_rows)}}}

    # if len(all_blocks) == 0:
    #     return {
    #         "answer": "Not supported by the STAMPS",
    #         "citations": [],
    #         "debug": {"version": version, "product": product}
    #     }

    # prompt = build_exec_synthesis_prompt(question, theme_cards, gaps)
    v25_prompt = build_v25_json_prompt(q, theme_cards, gaps, all_citations, schema_version="2.5")
    #answer = llm_call_fn(prompt, endpoint=endpoint, temperature=0.2, max_tokens=1100)
    raw = llm_call_fn(v25_prompt, endpoint=endpoint, temperature=0.0, max_tokens=1800)

    v25 = safe_json_loads(raw)

    # return {
    #     "answer": answer,
    #     "citations": all_citations,
    #     "debug": {"version": version, "product": product, "counts": {"E": len(e_rows), "I": len(i_rows), "M": len(m_rows)}}
    # }

    return {
        "v25": v25,
        "markdown": render_v25_markdown(v25),
        "debug": {"version": version, "product": product, "counts": {"E": len(e_rows), "I": len(i_rows), "M": len(m_rows)}}
    }
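`ask_stamp_xscope` already records per-family row counts in `out["debug"]["counts"]`, while the v2.5 skeleton leaves `retrieval_summary` for the model to fill in. A sketch of overwriting that block with the known counts, assuming `chunks_used` is meant to count retrieved chunks rather than only those the model cited; `fill_retrieval_summary` is a hypothetical helper:

```python
from typing import Any, Dict


def fill_retrieval_summary(v25: Dict[str, Any], counts: Dict[str, int]) -> Dict[str, Any]:
    """Overwrite retrieval_summary with per-family counts known at retrieval time."""
    families = ["E", "I", "M"]
    # Families missing from counts are recorded as 0 retrieved chunks
    coverage = {fam: int(counts.get(fam, 0)) for fam in families}
    v25["retrieval_summary"] = {
        "families_considered": families,
        "chunks_used": sum(coverage.values()),
        "coverage_by_family": coverage,
    }
    return v25
```

Usage would be along the lines of `fill_retrieval_summary(out["v25"], out["debug"]["counts"])` after a call to `ask_stamp_xscope`.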

In [0]:
# def print_xscope(out):
#     print(out['answer'])
#     print('\nCitations (by bucket):')
#     for c in out.get('citations', []):
#         org = c.get('installation') or c.get('majcom') or ''
#         org = f" | {org}" if org else ""
#         fig = f", {c['figure_refs']}" if c.get("figure_refs") else ''
#         print(f"- {c['stamp_family']}/{c['stamp_product']} {c['stamp_version']}{org} (p.{c['page_num']}{fig})")
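The commented-out `print_xscope` reads `out['answer']` and the v2 citation keys (`page_num`, `figure_refs`), which `ask_stamp_xscope` no longer returns. A sketch of an equivalent formatter for the v2.5 citation dicts built by `rows_to_v25_citations` (keys `citation_id`, `stamp_family`, `stamp_product`, `stamp_version`, `page`, `figure`); `format_v25_citations` is a hypothetical name, and figure references are rendered with `str()`:

```python
from typing import Any, Dict, List


def format_v25_citations(citations: List[Dict[str, Any]]) -> List[str]:
    """Render each v2.5 citation as '<id> <family>/<product> [<version>] (<page>[, <figure>])'."""
    lines = []
    for c in citations:
        family = c.get("stamp_family") or "?"
        product = c.get("stamp_product") or "?"
        version = c.get("stamp_version") or ""
        page = c.get("page")
        loc = f"p.{page}" if page is not None else "page n/a"
        fig = c.get("figure")
        if fig:
            loc += f", {fig}"
        head = f"{c.get('citation_id', '?')} {family}/{product}"
        if version:
            head += f" {version}"
        lines.append(f"{head} ({loc})")
    return lines
```

For example, `print("\n".join(format_v25_citations(out["v25"].get("citations", []))))` would list the citations the model kept.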

In [0]:
# out = ask_stamp_xscope("what utilities risks are consistent across the enterprise, installations, and MAJCOMs and where do they differ?")
out = ask_stamp_xscope("What utilities condition data exists across STAMP products?")
# out = ask_stamp_xscope("Across scopes, what are the top recurring drivers of infrastructure risk?")
display(out["markdown"])
out["v25"]

---------------------------------------------------------------------------
JSONDecodeError                           Traceback (most recent call last)
File <command-5522249380927693>, line 17, in safe_json_loads(s)
     16 try:
---> 17     return json.loads(candidate)
     18 except json.JSONDecodeError as e:

File /usr/lib/python3.12/json/__init__.py:346, in loads(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
    343 if (cls is None and object_hook is None and
    344         parse_int is None