In [1]:
import llama_cpp
from llama_cpp import llama_supports_gpu_offload

print("Versión llama-cpp-python:", llama_cpp.__version__)
print("GPU offload soportado:", llama_supports_gpu_offload())


ggml_cuda_init: GGML_CUDA_FORCE_MMQ:    no
ggml_cuda_init: GGML_CUDA_FORCE_CUBLAS: no
ggml_cuda_init: found 1 CUDA devices:
  Device 0: NVIDIA GeForce RTX 4050 Laptop GPU, compute capability 8.9, VMM: yes


Versión llama-cpp-python: 0.3.16
GPU offload soportado: True


In [2]:
from __future__ import annotations

import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np
import fitz  # pymupdf
from sentence_transformers import SentenceTransformer
from llama_cpp import Llama


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# ===== RUTAS EN TU MONOREPO =====
REPO_ROOT = Path("..").resolve()
MODEL_PATH = REPO_ROOT / "models" / "qwen2.5-3b-instruct-q5_k_m.gguf"

INPUT_JSON_PATH = REPO_ROOT / "Data" / "input_LLM.json"

OUTPUT_JSON_PATH = REPO_ROOT / "Data" / "output_LLM.json"

RAG_NORMAS_DIR = REPO_ROOT / "Data" / "rag_normas"

print("MODEL_PATH:", MODEL_PATH)
print("INPUT_JSON_PATH:", INPUT_JSON_PATH)
print("RAG_NORMAS_DIR:", RAG_NORMAS_DIR)
print("OUTPUT_JSON_PATH:", OUTPUT_JSON_PATH)


MODEL_PATH: /home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/models/qwen2.5-3b-instruct-q5_k_m.gguf
INPUT_JSON_PATH: /home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/input_LLM.json
RAG_NORMAS_DIR: /home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/rag_normas
OUTPUT_JSON_PATH: /home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/output_LLM.json


In [4]:
llm = Llama(
    model_path=str(MODEL_PATH),
    n_ctx=6000,
    n_threads=os.cpu_count() or 8,
    n_gpu_layers=10,          # si tienes CUDA: prueba 20, 30, 35...
    chat_format="chatml",    # Qwen2.5 Instruct suele ir bien con ChatML
    verbose=False,
)

print("LLM loaded OK")


llama_context: n_ctx_per_seq (6000) < n_ctx_train (32768) -- the full capacity of the model will not be utilized


LLM loaded OK


In [5]:
with open(INPUT_JSON_PATH, "r", encoding="utf-8") as f:
    payload = json.load(f)

payload.keys()


dict_keys(['obra_id', 'obra_tipo', 'obra_descripcion', 'ubicacion', 'normativa_objetivo', 'materiales', 'resumen'])

In [6]:
# ============================================================
# RAG: Selección de páginas + OCR sólo donde haga falta
# (para NO procesar PDFs completos como NOM-001 (780 pág.))
# ============================================================

# --- (A) Resolver PDFs (busca primero en Data/rag_normas, si no, usa rutas locales) ---
# Ajusta estos nombres para que coincidan con tus archivos reales.
PDF_SOURCES = {
    "NOM-001-SEDE-2012": ["NOM-001-SEDE-2012.pdf"],
    "Reglamento_Construcciones_CDMX": ["RGTO_CONSTRUCCIONES_22_04_2022.pdf", "Reglamento_Construcciones_CDMX.pdf"],
    "NACDMX-007-RNAT-2019": ["GOCDMX_21-07-20_SEDEMA.pdf", "NACDMX-007-RNAT-2019.pdf"],
    "Ley_Residuos_Solidos_CDMX": ["5e9cfdc1fa63fdf6120fd92f434a3e407d58af30.pdf", "Ley_Residuos_Solidos_CDMX.pdf"],
    "GHG_Protocol_Scope3": ["protocolo_spanish.pdf", "GHG_Protocol.pdf"],
    "ISO_14064-1": ["ISO_14064-1_2018.pdf", "Plantilla_NORMAISO14064-1.pdf"],
    "ISO_14067": ["UNE-EN-ISO-14067.pdf"],
    # Si tienes RENE en PDF, agrégalo aquí:
    "RENE": ["RENE.pdf"],
    # Si tienes NADF-018 en PDF, agrégalo aquí:
    "NADF-018-AMBT-2009": ["NADF-018-AMBT-2009.pdf"],
}

def resolve_pdf_path(doc_key: str, rag_dir: Path) -> Path | None:
    """
    Busca un PDF por nombre dentro de Data/rag_normas.
    Si no existe, intenta también en /mnt/data (útil en esta demo).
    """
    candidates = PDF_SOURCES.get(doc_key, [])
    for name in candidates:
        p1 = rag_dir / name
        if p1.exists():
            return p1
        p2 = Path("/mnt/data") / name
        if p2.exists():
            return p2
    return None

# --- (B) Keywords por norma (y también tomaremos materiales/ubicación del JSON) ---
NORM_KEYWORDS: Dict[str, List[str]] = {
    "NOM-001-SEDE-2012": [
        "instalaciones eléctricas", "vivienda", "Artículo 110", "Artículo 210", "Artículo 220",
        "Artículo 230", "Artículo 240", "Artículo 250", "puesta a tierra", "circuitos derivados",
        "alimentadores", "tableros", "contactos", "conductores", "sobrecorriente",
    ],
    "Reglamento_Construcciones_CDMX": [
        "licencia", "manifestación de construcción", "obra", "seguridad estructural", "dictamen",
        "director responsable de obra", "corresponsable", "protección civil", "demolición",
        "modificación", "ampliación", "reparación", "instalaciones",
    ],
    "NACDMX-007-RNAT-2019": [
        "residuos de la construcción", "demolición", "clasificación", "manejo integral",
        "plan de manejo", "separación", "acopio", "centros de acopio", "bitácora",
        "transporte", "disposición final", "reciclaje", "reutilización",
    ],
    "Ley_Residuos_Solidos_CDMX": [
        "residuos", "gestión integral", "aprovechamiento", "valorización", "acopio",
        "recolección", "transporte", "tratamiento", "disposición final", "sanciones",
        "residuos de la construcción", "demolición",
    ],
    "GHG_Protocol_Scope3": [
        "límite organizacional", "límite operacional", "alcance 1", "alcance 2", "alcance 3",
        "emisiones indirectas", "cálculo", "factores de emisión", "calidad de datos",
        "verificación", "reporte", "principios",
    ],
    "ISO_14064-1": [
        "límites de la organización", "límites de informe", "emisiones directas", "emisiones indirectas",
        "inventario", "año base", "cuantificación", "selección", "recopilación de datos",
        "principios", "exactitud", "transparencia",
    ],
    "ISO_14067": [
        "huella de carbono de productos", "unidad funcional", "unidad declarada", "límite del sistema",
        "ciclo de vida", "calidad de datos", "objetivo y alcance", "metodología", "cuantificación",
    ],
    "RENE": ["energía", "eficiencia", "edificación", "requisitos", "envolvente", "instalaciones"],
    "NADF-018-AMBT-2009": ["polvo", "partículas", "obra", "construcción", "control", "mitigación", "emisiones"],
}

def normalize_text(s: str) -> str:
    return re.sub(r"\s+", " ", (s or "")).strip().lower()

def score_page(text: str, keywords: List[str]) -> int:
    t = normalize_text(text)
    score = 0
    for kw in keywords:
        kw_n = normalize_text(kw)
        if not kw_n:
            continue
        score += t.count(kw_n)
    return score

def select_pages_by_keywords(pdf_path: Path, keywords: List[str], max_pages: int = 25, add_neighbors: int = 1) -> List[int]:
    """
    Selecciona páginas con más ocurrencias de keywords (usando texto embebido del PDF).
    NO usa OCR en esta fase (rápido). Devuelve números de página 0-index.
    """
    doc = fitz.open(str(pdf_path))
    scored: List[Tuple[int, int]] = []
    for i in range(doc.page_count):
        txt = doc.load_page(i).get_text("text") or ""
        s = score_page(txt, keywords)
        if s > 0:
            scored.append((i, s))

    scored.sort(key=lambda x: x[1], reverse=True)
    top_pages = [p for p, _ in scored[:max_pages]]

    # Expandir con páginas vecinas para no cortar contexto
    expanded = set()
    for p in top_pages:
        for j in range(p - add_neighbors, p + add_neighbors + 1):
            if 0 <= j < doc.page_count:
                expanded.add(j)

    return sorted(expanded)

def try_ocr_page(pdf_path: Path, page_no: int, dpi: int = 220) -> str:
    """
    OCR de una sola página. Si pytesseract no está disponible, devuelve texto vacío.
    """
    try:
        import pytesseract
        from PIL import Image
    except Exception:
        return ""

    doc = fitz.open(str(pdf_path))
    page = doc.load_page(page_no)
    pix = page.get_pixmap(dpi=dpi, alpha=False)
    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
    text = pytesseract.image_to_string(img, lang="spa")
    return text

def extract_selected_pages(pdf_path: Path, pages: List[int], ocr_if_text_lt: int = 80) -> List[Dict[str, Any]]:
    """
    Extrae contenido por página. Usa:
    - texto embebido si hay suficiente
    - OCR sólo si el texto embebido es muy poco (típico de páginas escaneadas)
    """
    doc = fitz.open(str(pdf_path))
    out: List[Dict[str, Any]] = []

    for p in pages:
        page = doc.load_page(p)
        txt = (page.get_text("text") or "").strip()
        used = "text_layer"

        if len(txt) < ocr_if_text_lt:
            ocr_txt = (try_ocr_page(pdf_path, p) or "").strip()
            if len(ocr_txt) > len(txt):
                txt = ocr_txt
                used = "ocr"

        # limpiar mínimo
        txt = re.sub(r"[ \t]+", " ", txt)
        txt = re.sub(r"\n{3,}", "\n\n", txt).strip()

        out.append({
            "page": p,               # 0-index
            "used": used,            # "text_layer" | "ocr"
            "text": txt,
        })
    return out

def infer_keywords_from_payload(payload: Dict[str, Any]) -> List[str]:
    kws: List[str] = []

    # ubicación
    ub = payload.get("ubicacion", {}) or {}
    for k in ["pais", "estado", "alcaldia_municipio"]:
        if ub.get(k):
            kws.append(str(ub[k]))

    # obra tipo / descripción
    if payload.get("obra_tipo"):
        kws.append(str(payload["obra_tipo"]))
    if payload.get("obra_descripcion"):
        kws.append(str(payload["obra_descripcion"]))

    # materiales
    for m in payload.get("materiales", []) or []:
        if m.get("material_nombre"):
            kws.append(str(m["material_nombre"]))
        if m.get("categoria"):
            kws.append(str(m["categoria"]))

    # algunas palabras generales útiles
    kws += ["obra", "construcción", "vivienda", "residuos", "emisiones", "CO2", "huella de carbono"]

    # normalizar y deduplicar manteniendo orden
    seen = set()
    out = []
    for k in kws:
        k2 = normalize_text(k)
        if k2 and k2 not in seen:
            seen.add(k2)
            out.append(k)
    return out

def flatten_normas(normativa_objetivo: Dict[str, Any]) -> List[str]:
    """
    Convierte el objeto normativa_objetivo en una lista plana de claves.
    """
    norms: List[str] = []
    for _, arr in (normativa_objetivo or {}).items():
        if isinstance(arr, list):
            norms.extend(arr)
    # dedupe
    seen = set()
    out = []
    for n in norms:
        if n not in seen:
            seen.add(n)
            out.append(n)
    return out

# --- (C) Construir manifest de páginas por norma, basado en el JSON de entrada ---
normas_objetivo = flatten_normas(payload.get("normativa_objetivo", {}))
payload_kws = infer_keywords_from_payload(payload)

page_manifest: Dict[str, Any] = {
    "obra_id": payload.get("obra_id"),
    "docs": [],
    "missing_docs": [],
}

rag_pages: List[Dict[str, Any]] = []

for norma in normas_objetivo:
    # mapear norma -> doc_key del catálogo
    doc_key = norma  # por defecto: ya coincide con tus claves (ISO_14064-1, etc.)

    pdf_path = resolve_pdf_path(doc_key, RAG_NORMAS_DIR)
    if not pdf_path:
        page_manifest["missing_docs"].append({"doc_key": doc_key, "norma": norma})
        continue

    # keywords = (específicas de la norma) + (del payload)
    kw = list(dict.fromkeys((NORM_KEYWORDS.get(doc_key, []) + payload_kws)))

    # seleccionar páginas
    pages = select_pages_by_keywords(pdf_path, kw, max_pages=25, add_neighbors=1)

    # si no encontró nada, al menos toma el índice/intro (primeras 3-5 páginas)
    if not pages:
        pages = list(range(0, min(5, fitz.open(str(pdf_path)).page_count)))

    # extraer (OCR sólo si hace falta)
    extracted = extract_selected_pages(pdf_path, pages, ocr_if_text_lt=80)

    page_manifest["docs"].append({
        "doc_key": doc_key,
        "path": str(pdf_path),
        "selected_pages_0index": pages,
        "extraction_mode_stats": {
            "text_layer": sum(1 for e in extracted if e["used"] == "text_layer"),
            "ocr": sum(1 for e in extracted if e["used"] == "ocr"),
        },
    })

    # guardar páginas como docs RAG a nivel página (para chunking + trazabilidad)
    for e in extracted:
        rag_pages.append({
            "doc_id": doc_key,
            "path": str(pdf_path),
            "page": e["page"],          # 0-index
            "used": e["used"],
            "content": e["text"],
        })

# Persistir manifest (para depurar y repetir el mismo corte)
RAG_NORMAS_DIR.mkdir(parents=True, exist_ok=True)
manifest_path = RAG_NORMAS_DIR / "page_manifest.json"
manifest_path.write_text(json.dumps(page_manifest, ensure_ascii=False, indent=2), encoding="utf-8")

print("Normas objetivo:", normas_objetivo)
print("Páginas seleccionadas (docs):", len(page_manifest["docs"]))
print("Docs faltantes:", len(page_manifest["missing_docs"]))
print("RAG pages (total):", len(rag_pages))
print("Manifest guardado en:", manifest_path)
page_manifest["docs"][:2]

Normas objetivo: ['RENE', 'ISO_14064-1', 'ISO_14067', 'GHG_Protocol_Scope3', 'NACDMX-007-RNAT-2019', 'NADF-018-AMBT-2009', 'Ley_Residuos_Solidos_CDMX', 'Reglamento_Construcciones_CDMX', 'NOM-001-SEDE-2012']
Páginas seleccionadas (docs): 5
Docs faltantes: 4
RAG pages (total): 194
Manifest guardado en: /home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/rag_normas/page_manifest.json


[{'doc_key': 'ISO_14064-1',
  'path': '/home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/rag_normas/ISO_14064-1_2018.pdf',
  'selected_pages_0index': [1,
   2,
   3,
   4,
   5,
   16,
   17,
   18,
   19,
   20,
   21,
   22,
   23,
   24,
   25,
   26,
   28,
   29,
   30,
   31,
   32,
   33,
   34,
   35,
   36,
   37,
   40,
   41,
   42,
   48,
   49,
   50,
   51,
   52,
   53,
   54,
   55,
   56,
   57],
  'extraction_mode_stats': {'text_layer': 39, 'ocr': 0}},
 {'doc_key': 'ISO_14067',
  'path': '/home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/rag_normas/UNE-EN-ISO-14067.pdf',
  'selected_pages_0index': [0, 1, 2, 3, 4],
  'extraction_mode_stats': {'text_layer': 5, 'ocr': 0}}]

In [7]:
# ============================================================
# Chunking (desde páginas seleccionadas) + Embeddings
# Cada chunk conserva: doc_id + page + chunk_id
# ============================================================

def chunk_text(text: str, chunk_size: int = 900, overlap: int = 120) -> List[str]:
    text = re.sub(r"\s+", " ", text).strip()
    if not text:
        return []
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + chunk_size)
        chunks.append(text[start:end])
        start = end - overlap
        if start < 0:
            start = 0
        if end == len(text):
            break
    return chunks

all_chunks: List[Dict[str, Any]] = []
for d in rag_pages:
    for i, ch in enumerate(chunk_text(d["content"], chunk_size=900, overlap=140)):
        all_chunks.append({
            "doc_id": d["doc_id"],
            "page": d["page"],  # 0-index
            "used": d["used"],
            "chunk_id": f"{d['doc_id']}::p{d['page']:04d}::chunk{i:04d}",
            "text": ch,
        })

print("Total chunks:", len(all_chunks))

# Embeddings
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

chunk_texts = [c["text"] for c in all_chunks]
chunk_embs = embed_model.encode(chunk_texts, normalize_embeddings=True)

print("Embeddings ready:", chunk_embs.shape)



Total chunks: 995


Loading weights: 100%|██████████| 103/103 [00:00<00:00, 1283.04it/s, Materializing param=pooler.dense.weight]                             
BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


Embeddings ready: (995, 384)


In [8]:
# (opcional) tabla rápida para inspección
import pandas as pd
df = pd.DataFrame(all_chunks[:10])
print(df)

        doc_id  page        used                       chunk_id  \
0  ISO_14064-1     1  text_layer  ISO_14064-1::p0001::chunk0000   
1  ISO_14064-1     1  text_layer  ISO_14064-1::p0001::chunk0001   
2  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0000   
3  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0001   
4  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0002   
5  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0003   
6  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0004   
7  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0005   
8  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0006   
9  ISO_14064-1     2  text_layer  ISO_14064-1::p0002::chunk0007   

                                                text  
0  ﻿ ISO 14064-1:2018 (traducción oficial) ﻿ DOCU...  
1  iza Versión española publicada en 2019 Traducc...  
2  ﻿ ISO 14064-1:2018 (traducción oficial) ﻿ Pról...  
3  .......................................

In [9]:
from typing import List, Dict, Any

def retrieve_context(query: str, top_k: int = 4, filter_doc_ids: List[str] | None = None) -> str:
    """Devuelve un string con los top-k chunks (útil para debug rápido)."""
    q_emb = embed_model.encode([query], normalize_embeddings=True)[0]
    scores = np.dot(chunk_embs, q_emb)

    # Filtrar por doc_ids si los damos (ej: ISO_14067)
    if filter_doc_ids:
        mask = np.array([c["doc_id"] in set(filter_doc_ids) for c in all_chunks], dtype=bool)
        scores = np.where(mask, scores, -1e9)

    top_idx = np.argsort(scores)[-top_k:][::-1]
    selected = [all_chunks[i] for i in top_idx]

    context = "\n\n".join(
        f"[{s['doc_id']} | {s['chunk_id']} | page={s.get('page', -1)}]\n{s['text']}"
        for s in selected
    )
    return context


def retrieve_context_items(
    query: str,
    top_k: int = 4,
    filter_doc_ids: List[str] | None = None
) -> List[Dict[str, Any]]:
    """Devuelve items con metadata (doc_id/page/chunk_id/score/text) para que el LLM pueda citar evidencia."""
    q_emb = embed_model.encode([query], normalize_embeddings=True)[0]
    scores = np.dot(chunk_embs, q_emb)

    if filter_doc_ids:
        allowed = set(filter_doc_ids)
        mask = np.array([c["doc_id"] in allowed for c in all_chunks], dtype=bool)
        scores = np.where(mask, scores, -1e9)

    top_idx = np.argsort(scores)[-top_k:][::-1]
    items: List[Dict[str, Any]] = []
    for i in top_idx:
        c = all_chunks[i]
        items.append({
            "doc_id": c["doc_id"],
            "page": int(c.get("page", -1)),
            "chunk_id": c["chunk_id"],
            "score": float(scores[i]),
            "text": c["text"],
        })
    return items


def format_items_for_prompt(items: List[Dict[str, Any]], max_chars: int = 2500) -> str:
    """Convierte items a un bloque compacto con citas. Recorta para no explotar tokens."""
    lines = []
    for it in items:
        snippet = (it.get("text") or "").strip().replace("\n", " ")
        snippet = snippet[:900]
        lines.append(
            f"- [doc={it['doc_id']}|page={it['page']}|chunk={it['chunk_id']}|score={it['score']:.3f}] {snippet}"
        )
    s = "\n".join(lines)
    if len(s) > max_chars:
        s = s[:max_chars] + "\n[...TRUNCATED ITEMS...]"
    return s


# prueba rápida (debug)
print(retrieve_context("ISO 14067 unidad funcional límites del sistema reporte verificación", top_k=3, filter_doc_ids=["ISO_14067"]))


[ISO_14067 | ISO_14067::p0003::chunk0000 | page=3]
EXTRACTO DEL DOCUMENTO UNE-EN ISO 14067 6.4.2 Recopilación de datos ................................................................................................... 34 6.4.3 Validación de datos ........................................................................................................ 35 6.4.4 Relación de los datos con los procesos unitarios y la unidad funcional o declarada .................................................................................................... 35 6.4.5 Ajuste de los límites del sistema ............................................................................... 35 6.4.6 Asignación ......................................................................................................................... 36 6.4.7 Seguimiento del desempeño de la HCP ................................................................... 38 6.4.8 Evaluación del efe

[ISO_14067 | ISO_14067::p0001::chunk0000 | page

In [10]:
SYSTEM_PROMPT = """Eres un auditor técnico de cumplimiento normativo y huella de carbono para obras de construcción en México.
Tu tarea: evaluar si la obra cumple o qué información falta para evaluar el cumplimiento según normas objetivo.

IMPORTANTE:
- Usa ÚNICAMENTE la evidencia del JSON de entrada y el EVIDENCE_PACK (proveniente del RAG).
- NO inventes datos. Si falta algo, marca 'needs_info' y lista faltantes concretos y preguntas.
- Si el EVIDENCE_PACK contiene citas [doc=...|page=...|chunk=...], debes usarlas en 'evidencia_usada'.
- Devuelve SIEMPRE un JSON válido (sin texto extra) con el esquema solicitado."""

OUTPUT_SCHEMA_HINT = """
Debes devolver EXACTAMENTE este formato JSON:

{
  "obra_id": "...",
  "evaluacion_fecha": "YYYY-MM-DD",
  "resultado_global": {
    "estatus": "pass|fail|needs_info",
    "riesgo": "bajo|medio|alto",
    "razones": ["..."]
  },
  "por_norma": [
    {
      "norma": "...",
      "estatus": "pass|fail|needs_info",
      "hallazgos": ["..."],
      "evidencia_usada": [
        {
          "tipo": "json|rag",
          "ref": "materiales[0].factor_emision_kgco2e_por_unidad | doc=ISO_14067|page=12|chunk=...",
          "extracto": "..."
        }
      ],
      "faltantes": ["..."],
      "acciones_recomendadas": ["..."]
    }
  ],
  "preguntas_para_completar": ["..."]
}

Reglas:
- 'pass' solo si hay evidencia suficiente.
- 'fail' si hay evidencia de incumplimiento.
- 'needs_info' si faltan datos para dictaminar.
- Si existe evidencia RAG (citas con doc/page/chunk), 'evidencia_usada' NO puede quedar vacía.
- 'evidencia_usada' puede referir rutas del JSON (ej: "materiales[0].co2e_kg") o citas del RAG.
"""


In [11]:
from datetime import date
from typing import List, Dict, Any

def extract_normas(normativa_objetivo: Dict[str, Any]) -> List[str]:
    normas = []
    for _, lst in (normativa_objetivo or {}).items():
        if isinstance(lst, list):
            normas.extend(lst)
    # unique preserving order
    seen = set()
    out = []
    for n in normas:
        if n not in seen:
            out.append(n)
            seen.add(n)
    return out


# ==========
# LLM chiquito como "planner" y "extractor" (agentic RAG)
# ==========

def _try_parse_json_local(s: str) -> Dict[str, Any]:
    s = (s or "").strip()
    if s.startswith("```"):
        s = re.sub(r"^```(json)?", "", s).strip()
        s = re.sub(r"```$", "", s).strip()
    start = s.find("{")
    end = s.rfind("}")
    if start >= 0 and end >= 0 and end > start:
        s = s[start:end+1]
    return json.loads(s)

def llm_json(system_prompt: str, user_obj: Dict[str, Any], max_tokens: int = 700) -> Dict[str, Any]:
    resp = llm.create_chat_completion(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": json.dumps(user_obj, ensure_ascii=False)},
        ],
        temperature=0.1,
        max_tokens=max_tokens,
    )
    raw = resp["choices"][0]["message"]["content"]
    try:
        return _try_parse_json_local(raw)
    except Exception:
        # último intento: recortar a bloque JSON
        start = raw.find("{")
        end = raw.rfind("}")
        if start != -1 and end != -1 and end > start:
            return json.loads(raw[start:end+1])
        raise


PLANNER_SYSTEM = """Eres un asistente que SOLO genera queries de búsqueda para un RAG de normas.
Devuelve SOLO JSON válido.

Reglas:
- Para cada norma: 2 a 4 queries MUY específicas (no genéricas).
- Prioriza texto normativo: "debe", "deberá", "obligatorio", "requisito", "registro", "bitácora", "anexo", "sanción".
- Incluye expected_evidence (palabras clave esperadas).
- top_k por norma: 2 o 3 (máximo).
Esquema exacto:
{
  "plans": [
    {"norma": "...", "queries": ["..."], "expected_evidence": ["..."], "top_k": 2}
  ]
}
"""

def plan_queries_for_normas(normas: List[str], payload: Dict[str, Any]) -> Dict[str, Any]:
    payload_small = {
        "obra_tipo": payload.get("obra_tipo"),
        "ubicacion": payload.get("ubicacion"),
        "materiales": [m.get("material_nombre") for m in payload.get("materiales", [])],
        "normas": normas,
    }
    out = llm_json(PLANNER_SYSTEM, payload_small, max_tokens=700)
    plans = {}
    for p in out.get("plans", []):
        n = p.get("norma")
        if n:
            plans[n] = p
    return plans


EXTRACTOR_SYSTEM = """Eres un auditor técnico. Te paso fragmentos (chunks) con metadatos (doc_id/page/chunk_id).
Devuelve SOLO JSON válido.

Objetivo:
- Extraer requisitos/obligaciones aplicables a la obra
- Identificar evidencia usada (copiando doc_id/page/chunk_id)
- Indicar faltantes y preguntas concretas

Reglas:
- Si hay chunks, evidencia_usada NO puede estar vacía.
- En evidencia_usada: incluye extracto <= 200 caracteres.
Esquema:
{
  "norma": "...",
  "hallazgos": ["..."],
  "evidencia_usada": [{"doc_id":"...","page":12,"chunk_id":"...","extracto":"..."}],
  "faltantes": ["..."],
  "preguntas": ["..."]
}
"""

def extract_from_items(norma: str, items: List[Dict[str, Any]], payload: Dict[str, Any]) -> Dict[str, Any]:
    prompt_payload = {
        "norma": norma,
        "obra": {
            "obra_tipo": payload.get("obra_tipo"),
            "ubicacion": payload.get("ubicacion"),
            "materiales": [m.get("material_nombre") for m in payload.get("materiales", [])],
            "co2e_total_kg": payload.get("resumen", {}).get("co2e_total_kg"),
        },
        "chunks": items,
    }
    return llm_json(EXTRACTOR_SYSTEM, prompt_payload, max_tokens=900)


def build_evidence_pack(normas: List[str], payload: Dict[str, Any], max_items_per_norma: int = 4) -> Dict[str, Any]:
    """Planner -> Retriever -> Extractor. Devuelve un paquete compacto para el LLM final."""
    if not normas:
        return {"por_norma": [], "docs_missing": []}

    # doc ids disponibles
    doc_ids_available = set()
    if "rag_pages" in globals() and isinstance(rag_pages, list):
        doc_ids_available = {p.get("doc_id") for p in rag_pages if isinstance(p, dict) and p.get("doc_id")}

    plans = plan_queries_for_normas(normas, payload)

    por_norma = []
    docs_missing = []

    for norma in normas:
        plan = plans.get(norma, {
            "norma": norma,
            "queries": [f"{norma} requisitos obligatorios evidencia documental debe deberá anexo registro bitácora sanción"],
            "expected_evidence": [],
            "top_k": 2
        })

        filter_doc = [norma] if norma in doc_ids_available else None
        if filter_doc is None:
            docs_missing.append(norma)

        pool = []
        seen = set()

        for q in (plan.get("queries") or [])[:4]:
            items = retrieve_context_items(q, top_k=int(plan.get("top_k", 2)), filter_doc_ids=filter_doc)
            for it in items:
                if it["chunk_id"] not in seen:
                    seen.add(it["chunk_id"])
                    # recorta texto por seguridad
                    it2 = dict(it)
                    it2["text"] = (it2.get("text") or "")[:1200]
                    pool.append(it2)

        pool = sorted(pool, key=lambda x: x.get("score", 0.0), reverse=True)[:max_items_per_norma]

        if pool:
            extracted = extract_from_items(norma, pool, payload)
        else:
            extracted = {
                "norma": norma,
                "hallazgos": [],
                "evidencia_usada": [],
                "faltantes": [
                    "No se encontró evidencia en el RAG para esta norma (PDF no disponible, páginas no seleccionadas, o queries insuficientes)."
                ],
                "preguntas": [f"¿Puedes proporcionar el PDF o indicar páginas/secciones clave de {norma}?"]
            }

        por_norma.append(extracted)

    return {"por_norma": por_norma, "docs_missing": list(dict.fromkeys(docs_missing))}


# ==========
# Ejecución: generar EVIDENCE_PACK y luego el JSON final
# ==========

normas_objetivo = extract_normas(payload.get("normativa_objetivo", {}))
print("Normas objetivo:", normas_objetivo)

evidence_pack = build_evidence_pack(normas_objetivo, payload, max_items_per_norma=4)
print("Docs missing:", evidence_pack.get("docs_missing"))
print("Por norma:", len(evidence_pack.get("por_norma", [])))

today_str = date.today().isoformat()

# Payload reducido para ahorrar tokens
payload_llm = {
    "obra_id": payload.get("obra_id"),
    "obra_tipo": payload.get("obra_tipo"),
    "obra_descripcion": payload.get("obra_descripcion"),
    "ubicacion": payload.get("ubicacion"),
    "normativa_objetivo": payload.get("normativa_objetivo"),
    "materiales": [
        {
            "material_nombre": m.get("material_nombre"),
            "categoria": m.get("categoria"),
            "cantidad": m.get("cantidad"),
            "unidad": m.get("unidad"),
            "co2e_kg": m.get("co2e_kg"),
            "factor_emision_kgco2e_por_unidad": m.get("factor_emision_kgco2e_por_unidad"),
            "proveedor": m.get("proveedor", {}),
        }
        for m in (payload.get("materiales") or [])
    ],
    "resumen": {
        "co2e_total_kg": (payload.get("resumen") or {}).get("co2e_total_kg"),
        "notas": (payload.get("resumen") or {}).get("notas", []),
    },
}

USER_PROMPT = f"""
INPUT_JSON_OBRA (reducido):
{json.dumps(payload_llm, ensure_ascii=False, indent=2)}

EVIDENCE_PACK (RAG resumido con citas doc/page/chunk):
{json.dumps(evidence_pack, ensure_ascii=False, indent=2)}

{OUTPUT_SCHEMA_HINT}

Hoy es: {today_str}
"""

resp = llm.create_chat_completion(
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": USER_PROMPT},
    ],
    temperature=0.1,
    max_tokens=1800,
)

raw = resp["choices"][0]["message"]["content"]
raw[:600]


Normas objetivo: ['RENE', 'ISO_14064-1', 'ISO_14067', 'GHG_Protocol_Scope3', 'NACDMX-007-RNAT-2019', 'NADF-018-AMBT-2009', 'Ley_Residuos_Solidos_CDMX', 'Reglamento_Construcciones_CDMX', 'NOM-001-SEDE-2012']


JSONDecodeError: Expecting ',' delimiter: line 42 column 6 (char 1800)

In [None]:
def try_parse_json(s: str) -> Dict[str, Any]:
    # intenta detectar el primer bloque JSON
    s = s.strip()
    if s.startswith("```"):
        s = re.sub(r"^```(json)?", "", s).strip()
        s = re.sub(r"```$", "", s).strip()
    # extraer desde la primera { hasta la última }
    start = s.find("{")
    end = s.rfind("}")
    if start >= 0 and end >= 0:
        s = s[start:end+1]
    return json.loads(s)

def fix_json_with_llm(bad_text: str) -> str:
    fix_prompt = f"""
Arregla la salida para que sea SOLO un JSON válido y que respete exactamente el esquema.
NO agregues texto extra. SOLO JSON.

Salida a corregir:
{bad_text}
"""
    r = llm.create_chat_completion(
        messages=[
            {"role": "system", "content": "Eres un corrector estricto de JSON. Devuelve SOLO JSON válido."},
            {"role": "user", "content": fix_prompt},
        ],
        temperature=0.0,
        max_tokens=1200,
    )
    return r["choices"][0]["message"]["content"]

try:
    out_json = try_parse_json(raw)
except Exception as e:
    print("JSON parse failed, repairing...", e)
    repaired = fix_json_with_llm(raw)
    out_json = try_parse_json(repaired)

out_json


{'obra_id': 'OBRA-001',
 'evaluacion_fecha': '2026-02-01',
 'resultado_global': {'estatus': 'needs_info',
  'riesgo': 'medio',
  'razones': ['Falta información sobre la implementación de RCD y control de polvo según NACDMX-007-RNAT-2019 y NADF-018-AMBT-2009.',
   'Falta información sobre la emisión de CO2 de los materiales según normas ISO_14064-1, ISO_14067 y GHG_Protocol_Scope3.',
   'Falta información sobre la emisión de CO2e de los materiales según normas ISO_14064-1, ISO_14067 y GHG_Protocol_Scope3.']},
 'por_norma': [{'norma': 'RENE',
   'estatus': 'needs_info',
   'evidencia_usada': [],
   'faltantes': ['Implementación de RCD y control de polvo según NACDMX-007-RNAT-2019 y NADF-018-AMBT-2009.'],
   'acciones_recomendadas': ['Revisar la implementación de RCD y control de polvo según NACDMX-007-RNAT-2019 y NADF-018-AMBT-2009.']},
  {'norma': 'ISO_14064-1',
   'estatus': 'needs_info',
   'evidencia_usada': [],
   'faltantes': ['Emisión de CO2 de los materiales según normas ISO_1406

In [None]:
# Asegurar obra_id y fecha si el modelo no los pone bien
out_json["obra_id"] = payload.get("obra_id", out_json.get("obra_id"))
out_json["evaluacion_fecha"] = out_json.get("evaluacion_fecha") or today_str

OUTPUT_JSON_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_JSON_PATH, "w", encoding="utf-8") as f:
    json.dump(out_json, f, ensure_ascii=False, indent=2)

print("Saved:", OUTPUT_JSON_PATH)


Saved: /home/harielpadillasanchez/Documentos/hackathon/Tec_environmental_risk/Data/output_LLM.json


In [None]:
assert "resultado_global" in out_json
assert "por_norma" in out_json
assert isinstance(out_json["por_norma"], list)

print("Estatus global:", out_json["resultado_global"]["estatus"])
print("Normas evaluadas:", len(out_json["por_norma"]))
print("Preguntas:", len(out_json.get("preguntas_para_completar", [])))


Estatus global: needs_info
Normas evaluadas: 3
Preguntas: 15
