## Uso del PDF de capa de aplicación CAN

Si cuentas con un PDF que define la capa de aplicación de la red CAN del vehículo, úsalo para construir `definiciones_dbc` cuando no dispongas de un `.dbc`:

- Extrae por señal: factor (scale), offset, unidad y una breve descripción/comentario.
- Vuelca esa información a un JSON con formato:

```
{
  "Velocidad_Vehiculo": {"factor": 1.0, "offset": 0.0, "unit": "km/h", "comment": "Velocidad del vehículo"},
  "Pedal_Freno": {"factor": 1.0, "offset": 0.0, "unit": "%", "comment": "Posición pedal freno"}
}
```

Luego, carga con `cargar_definiciones_dbc(path_json_def=...)` y el flujo aplicará los factores/offset y usará las unidades y comentarios en las descripciones.


# Ingeniería de Requerimientos LLM RAG — Detección de Eventos CAN y Descripciones Multi-señal

Este notebook extiende el flujo previo para:
- Integración profunda con definiciones DBC (factor, offset, unidad, comentario)
- Reemplazo de segmentación fija por detección de eventos basados en reglas
- Generación de descripciones multi-señal con correlaciones
- Metadatos mejorados para indexación/búsqueda en RAG
- Optimización por streaming al leer BLF cuando sea posible.

In [20]:
# Instalación opcional (solo si hace falta) y librerías
# Nota: Ejecuta esta celda si tu entorno no tiene estas dependencias.
try:
    import pandas as pd
    import numpy as np
except Exception:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas", "numpy"])  # noqa
    import pandas as pd
    import numpy as np

# Librerías opcionales para CAN/DBC
try:
    import can  # python-can
except Exception:
    can = None

try:
    import cantools  # para parsear DBC (opcional pero recomendado)
except Exception:
    cantools = None

from dataclasses import dataclass, asdict
from typing import List, Tuple, Dict, Optional, Any
import json
import math
import io

# Utilidades genéricas
def _safe_pct_change(arr: np.ndarray) -> float:
    if arr.size < 2:
        return 0.0
    a, b = float(arr[0]), float(arr[-1])
    if a == 0:
        return float("inf") if b != 0 else 0.0
    return (b - a) / abs(a)


def _coef_variacion(arr: np.ndarray) -> float:
    if arr.size == 0:
        return 0.0
    m = float(np.mean(arr))
    s = float(np.std(arr))
    return 0.0 if m == 0 else s / abs(m)


In [21]:
# Carga de definiciones DBC (factor, offset, unidad, comentario)
# Devuelve un diccionario: { signal_name: {"factor": float, "offset": float, "unit": str, "comment": str} }

# Hacer esta celda auto-contenida para evitar NameError si se ejecuta primero
from typing import Optional, Dict, Any, List
try:
    import json
except Exception:
    import json  # aseguramos disponibilidad
try:
    import cantools  # para parsear DBC (opcional)
except Exception:
    cantools = None


def cargar_definiciones_dbc(path_dbc: Optional[str] = None,
                            path_json_def: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
    """
    Carga definiciones de señales desde un archivo .dbc (vía cantools) o
    desde un JSON ya estructurado. Si ambos son None, devuelve {}.
    """
    definiciones: Dict[str, Dict[str, Any]] = {}

    # Prioridad 1: JSON ya preparado
    if path_json_def:
        with open(path_json_def, "r", encoding="utf-8") as f:
            data = json.load(f)
        # Validar formato mínimo
        if isinstance(data, dict):
            for sig, meta in data.items():
                definiciones[str(sig)] = {
                    "factor": float(meta.get("factor", 1.0)),
                    "offset": float(meta.get("offset", 0.0)),
                    "unit": str(meta.get("unit", "")),
                    "comment": str(meta.get("comment", "")),
                }
        return definiciones

    # Prioridad 2: DBC si cantools está disponible
    if path_dbc and cantools is not None:
        db = cantools.database.load_file(path_dbc)
        # Recorremos señales en todos los mensajes
        for msg in db.messages:
            for sig in msg.signals:
                definiciones[sig.name] = {
                    "factor": float(getattr(sig, "scale", 1.0) or 1.0),
                    "offset": float(getattr(sig, "offset", 0.0) or 0.0),
                    "unit": str(getattr(sig, "unit", "") or ""),
                    "comment": str(getattr(sig, "comment", "") or ""),
                }
        return definiciones

    return definiciones


def cargar_definiciones_dbc_multiples(paths_dbc: List[str]) -> Dict[str, Dict[str, Any]]:
    """Carga y fusiona definiciones desde múltiples DBC. La primera aparición gana,
    salvo que una posterior aporte unit/comment no vacíos.
    """
    defs: Dict[str, Dict[str, Any]] = {}
    if not paths_dbc or cantools is None:
        return defs
    for path in paths_dbc:
        try:
            db = cantools.database.load_file(path)
            for msg in db.messages:
                for sig in msg.signals:
                    nuevo = {
                        "factor": float(getattr(sig, "scale", 1.0) or 1.0),
                        "offset": float(getattr(sig, "offset", 0.0) or 0.0),
                        "unit": str(getattr(sig, "unit", "") or ""),
                        "comment": str(getattr(sig, "comment", "") or ""),
                    }
                    if sig.name not in defs:
                        defs[sig.name] = nuevo
                    else:
                        # Completar unit/comment si estaban vacíos
                        if not defs[sig.name].get("unit") and nuevo.get("unit"):
                            defs[sig.name]["unit"] = nuevo["unit"]
                        if not defs[sig.name].get("comment") and nuevo.get("comment"):
                            defs[sig.name]["comment"] = nuevo["comment"]
        except Exception:
            continue
    return defs


def cargar_definiciones_combinadas(paths_dbc: List[str],
                                   path_json_def: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
    """Fusiona definiciones de múltiples DBC con un JSON opcional.
    Estrategia: DBCs primero, luego JSON sobreescribe/completa.
    """
    defs = cargar_definiciones_dbc_multiples(paths_dbc)
    if path_json_def:
        try:
            with open(path_json_def, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, dict):
                for sig, meta in data.items():
                    defs[sig] = {
                        "factor": float(meta.get("factor", defs.get(sig, {}).get("factor", 1.0))),
                        "offset": float(meta.get("offset", defs.get(sig, {}).get("offset", 0.0))),
                        "unit": str(meta.get("unit", defs.get(sig, {}).get("unit", ""))),
                        "comment": str(meta.get("comment", defs.get(sig, {}).get("comment", ""))),
                    }
        except Exception:
            pass
    return defs


def normalizar_nombre_senal(nombre: str) -> str:
    return nombre.strip().replace(" ", "_")


In [22]:
# Dataclasses ampliadas para metadatos y entradas JSONL

@dataclass
class CANEventMetadata:
    indice_segmento: int
    red_can: Optional[str]
    senales_involucradas: List[str]
    analisis_estadistico: Dict[str, Dict[str, float]]  # por señal
    correlaciones_detectadas: Optional[List[Dict[str, Any]]] = None
    tipo_evento: Optional[str] = None
    tipo_evento_complejo: Optional[str] = None
    comentarios_dbc: Optional[Dict[str, str]] = None  # por señal


@dataclass
class RAGDatasetEntry:
    id_evento: str
    descripcion_textual: str
    descripcion_tecnica: str
    metadatos: CANEventMetadata
    fuente: Optional[str] = None  # ruta BLF/CSV u origen

    def to_jsonl_entry(self) -> str:
        payload = {
            "id": self.id_evento,
            "text": self.descripcion_textual,
            "technical_description": self.descripcion_tecnica,
            "metadata": asdict(self.metadatos),
            "source": self.fuente,
        }
        return json.dumps(payload, ensure_ascii=False)


In [23]:
# Detección de eventos basada en reglas sobre columnas disponibles del DataFrame
# Devuelve lista de (indice_inicio, indice_fin) en términos de índices del DataFrame
from typing import Iterable


def detectar_segmentos_eventos(df_red: pd.DataFrame,
                               min_duracion: int = 15,
                               columnas_umbral: Optional[Dict[str, Dict[str, float]]] = None) -> List[Tuple[int, int]]:
    """
    Detecta segmentos operativos significativos usando reglas: si alguna condición se activa
    (p. ej. Velocidad_Vehiculo > 5, Pedal_Freno > 10, Corriente_Carga != 0) se considera dentro de evento.

    columnas_umbral: {col_name: {"op": ">|>=|<|<=|!=|==", "valor": float}}
    min_duracion: longitud mínima (en número de filas) para considerar un segmento.
    """
    if df_red.empty:
        return []

    # Reglas por defecto (adáptalas a tus nombres reales de columnas)
    default_rules = {
        "Velocidad_Vehiculo": {"op": ">", "valor": 5.0},
        "Pedal_Freno": {"op": ">", "valor": 10.0},
        "Corriente_Carga": {"op": "!=", "valor": 0.0},
        "Torque_Motor_Nm": {"op": ">", "valor": 5.0},
    }
    rules = columnas_umbral or default_rules

    # Construye máscara booleana por fila: True si alguna regla se cumple
    mask = np.zeros(len(df_red), dtype=bool)
    for col, spec in rules.items():
        if col in df_red.columns:
            op = spec.get("op", ">")
            val = spec.get("valor", 0.0)
            serie = pd.to_numeric(df_red[col], errors="coerce")
            if op == ">":
                m = serie > val
            elif op == ">=":
                m = serie >= val
            elif op == "<":
                m = serie < val
            elif op == "<=":
                m = serie <= val
            elif op == "!=":
                m = serie != val
            elif op == "==":
                m = serie == val
            else:
                m = serie > val
            mask = mask | m.fillna(False).values

    # Extraer segmentos contiguos True de longitud >= min_duracion
    segmentos: List[Tuple[int, int]] = []
    en_evento = False
    ini = 0
    for i, v in enumerate(mask):
        if v and not en_evento:
            en_evento = True
            ini = i
        elif not v and en_evento:
            fin = i
            if fin - ini >= min_duracion:
                segmentos.append((ini, fin))
            en_evento = False
    # Cierre si termina en True
    if en_evento:
        fin = len(mask)
        if fin - ini >= min_duracion:
            segmentos.append((ini, fin))

    return segmentos


# Construcción automática de reglas desde DBC + DF
import numpy as np

def construir_reglas_automaticas(df: pd.DataFrame,
                                 definiciones_dbc: Dict[str, Dict[str, Any]],
                                 percentil_activacion: float = 60.0) -> Dict[str, Dict[str, float]]:
    """
    Genera reglas de detección usando unidades/comentarios del DBC y estadísticas del DF.
    Heurísticas:
    - Velocidad: unit ~ km/h|m/s|mph o nombre con speed/velocidad -> op '>' p60
    - Freno/Pedal: nombre con brake/freno/pedal -> umbral 0.2 si max<=1.5, si no 10
    - Torque: unit ~ Nm o nombre con torque -> op '>' p60
    - Corriente: unit ~ A|Amp -> op '!=' 0 (si hay variación), si no '>' p60
    """
    if df is None or df.empty:
        return {}
    rules: Dict[str, Dict[str, float]] = {}

    def pctl(col):
        s = pd.to_numeric(df[col], errors='coerce').dropna()
        return float(np.percentile(s, percentil_activacion)) if len(s) else 0.0

    def find_candidates(pred):
        # Buscar por DBC (unit/comment) y por nombre de columna
        cands = []
        for sig, meta in (definiciones_dbc or {}).items():
            unit = str(meta.get('unit', '')).lower()
            comment = str(meta.get('comment', '')).lower()
            if pred(sig.lower(), unit, comment):
                # Si la columna existe en df (normalizada o no), úsala
                if sig in df.columns:
                    cands.append(sig)
                else:
                    alt = normalizar_nombre_senal(sig)
                    if alt in df.columns:
                        cands.append(alt)
        # Añadir por nombres de columnas
        for col in df.columns:
            cl = col.lower()
            if pred(cl, cl, cl) and col not in cands:
                cands.append(col)
        return cands

    # Predicados
    is_speed   = lambda name, unit, comm: (('km/h' in unit) or ('m/s' in unit) or ('mph' in unit)
                                           or ('speed' in name) or ('velocidad' in name))
    is_brake   = lambda name, unit, comm: (('brake' in name) or ('freno' in name) or ('pedal' in name))
    is_torque  = lambda name, unit, comm: (('nm' in unit) or ('torque' in name))
    is_current = lambda name, unit, comm: (('a' == unit.strip()) or ('amp' in unit) or ('corriente' in name) or ('current' in name) or ('ibatt' in name))

    # Velocidad
    speed_cols = find_candidates(is_speed)
    if speed_cols:
        col = speed_cols[0]
        rules[col] = {"op": ">", "valor": round(pctl(col), 3)}

    # Freno/Pedal
    brake_cols = find_candidates(is_brake)
    if brake_cols:
        col = brake_cols[0]
        s = pd.to_numeric(df[col], errors='coerce').dropna()
        mx = float(s.max()) if len(s) else 0.0
        rules[col] = {"op": ">", "valor": 0.2 if mx <= 1.5 else 10.0}

    # Torque
    torq_cols = find_candidates(is_torque)
    if torq_cols:
        col = torq_cols[0]
        rules[col] = {"op": ">", "valor": round(pctl(col), 3)}

    # Corriente
    curr_cols = find_candidates(is_current)
    if curr_cols:
        col = curr_cols[0]
        s = pd.to_numeric(df[col], errors='coerce').fillna(0)
        if (s != 0).any():
            rules[col] = {"op": "!=", "valor": 0.0}
        else:
            rules[col] = {"op": ">", "valor": round(pctl(col), 3)}

    return rules


In [24]:
# Generador de descripciones (señal y multi-señal) con integración DBC

class GeneradorDescripcionesTextual:
    def __init__(self, definiciones_dbc: Dict[str, Dict[str, Any]]):
        self.definiciones_dbc = definiciones_dbc or {}

    def _aplicar_factor_offset(self, senal: str, serie: pd.Series) -> Tuple[pd.Series, Dict[str, Any]]:
        info = self.definiciones_dbc.get(senal, {})
        factor = float(info.get("factor", 1.0))
        offset = float(info.get("offset", 0.0))
        unit = str(info.get("unit", ""))
        comment = str(info.get("comment", ""))
        serie_fisica = serie.astype(float) * factor + offset
        return serie_fisica, {"factor": factor, "offset": offset, "unit": unit, "comment": comment}

    def analizar_serie_temporal_blf(self, serie: pd.Series, unidad: str = "") -> Dict[str, float]:
        arr = serie.astype(float).values
        return {
            "mean": float(np.mean(arr)) if arr.size else 0.0,
            "min": float(np.min(arr)) if arr.size else 0.0,
            "max": float(np.max(arr)) if arr.size else 0.0,
            "std": float(np.std(arr)) if arr.size else 0.0,
            "coef_variacion": _coef_variacion(arr),
            "cambio_porcentual": _safe_pct_change(arr),
            "unidad": unidad,
        }

    def generar_descripcion_senal(self,
                                  signal_name: str,
                                  serie: pd.Series,
                                  red_can: Optional[str] = None) -> Tuple[str, str, Dict[str, float], Dict[str, Any]]:
        """
        Integra DBC: aplica factor/offset y usa unidad y comment en las plantillas.
        Retorna: (descripcion_textual, descripcion_tecnica, analisis_estadistico, info_dbc)
        """
        serie_fisica, info = self._aplicar_factor_offset(signal_name, serie)
        stats = self.analizar_serie_temporal_blf(serie_fisica, unidad=info.get("unit", ""))

        # Plantilla textual básica por señal
        desc = (
            f"Señal {signal_name} en red {red_can or 'CAN'} muestra valor medio {stats['mean']:.2f} {stats['unidad']} "
            f"(min {stats['min']:.2f}, max {stats['max']:.2f}). "
            f"Cambio porcentual={stats['cambio_porcentual']:.2f}, CV={stats['coef_variacion']:.2f}."
        )

        # Descripción técnica ampliada con comentario DBC
        tech = (
            f"Unidad={stats['unidad']}; factor={info.get('factor')}, offset={info.get('offset')}. "
            f"Comentario DBC: {info.get('comment', '')}"
        )

        return desc, tech, stats, info

    def generar_descripcion_evento_multisenal(self,
                                              segmento_df: pd.DataFrame,
                                              senales_relevantes: List[str],
                                              matriz_correlacion: pd.DataFrame,
                                              red_can: Optional[str],
                                              indice_segmento: int) -> Tuple[str, str, Dict[str, Dict[str, float]], Dict[str, str], List[Dict[str, Any]]]:
        """
        Crea descripciones ricas considerando varias señales y sus correlaciones.
        Retorna: (desc_textual, desc_tecnica, analisis_por_senal, comentarios_dbc, correlaciones_detectadas)
        """
        analisis_por_senal: Dict[str, Dict[str, float]] = {}
        comentarios_dbc: Dict[str, str] = {}
        piezas_texto: List[str] = []

        # Calcular análisis por señal con unidades físicas
        for s in senales_relevantes:
            if s in segmento_df.columns:
                d, t, stats, info = self.generar_descripcion_senal(s, segmento_df[s], red_can)
                analisis_por_senal[s] = stats
                if info.get("comment"):
                    comentarios_dbc[s] = info["comment"]
                piezas_texto.append(f"{s}: mean={stats['mean']:.2f}{stats['unidad']} (Δ%={stats['cambio_porcentual']:.2f})")

        # Extraer correlaciones significativas
        correlaciones_detectadas: List[Dict[str, Any]] = []
        if matriz_correlacion is not None and not matriz_correlacion.empty:
            umbral = 0.6
            for a in senales_relevantes:
                for b in senales_relevantes:
                    if a < b and a in matriz_correlacion.index and b in matriz_correlacion.columns:
                        r = float(matriz_correlacion.loc[a, b])
                        if abs(r) >= umbral and not math.isnan(r):
                            correlaciones_detectadas.append({"senal_a": a, "senal_b": b, "r": r})

        # Texto principal del evento
        titulo = f"Evento {indice_segmento} — Red {red_can or 'CAN'}"
        resumen_senales = ", ".join(piezas_texto)
        resumen_corr = "; ".join([f"{c['senal_a']}~{c['senal_b']} (r={c['r']:.2f})" for c in correlaciones_detectadas])

        desc_textual = (
            f"{titulo}: Se observaron variaciones en {len(senales_relevantes)} señales clave. "
            f"Principales: {resumen_senales}. "
            + (f"Correlaciones destacadas: {resumen_corr}." if resumen_corr else "")
        )

        desc_tecnica = (
            "Descripción multi-señal con correlaciones y unidades físicas. "
            f"Señales consideradas: {', '.join(senales_relevantes)}."
        )

        return desc_textual, desc_tecnica, analisis_por_senal, comentarios_dbc, correlaciones_detectadas


In [25]:
# Clasificación de metadatos basada en estadísticas y señales involucradas

class GeneradorMetadatosCompleto:
    def clasificar_evento_inteligente(self,
                                      analisis_estadistico: Dict[str, Dict[str, float]],
                                      senales_involucradas: List[str]) -> Tuple[Optional[str], Optional[str]]:
        """
        Retorna (tipo_evento simple, tipo_evento_complejo) basado en heurísticas de
        cambio_porcentual y coef_variacion.
        """
        # Heurísticas simples de ejemplo — adapta a tu dominio
        tipo_evento = None
        tipo_evento_complejo = None

        # Señales clave si existen
        v = analisis_estadistico.get("Velocidad_Vehiculo", {})
        rpm = analisis_estadistico.get("Velocidad_Motor_RPM", {})
        i_batt = analisis_estadistico.get("Corriente_Bateria", {})
        torque = analisis_estadistico.get("Torque_Motor_Nm", {})
        freno = analisis_estadistico.get("Pedal_Freno", {})

        # Aceleración fuerte: cambio % alto en velocidad o rpm + torque alto
        if (v.get("cambio_porcentual", 0) > 0.3 or rpm.get("cambio_porcentual", 0) > 0.3) and torque.get("mean", 0) > 20:
            tipo_evento = "Aceleracion_Fuerte"
            # Regeneración si corriente batería negativa (recuperando) y CV alto
            if i_batt.get("mean", 0) < -5 and i_batt.get("coef_variacion", 0) > 0.2:
                tipo_evento_complejo = "Aceleracion_Fuerte_Regeneracion"

        # Frenado: pedal freno alto y caída de velocidad
        if freno.get("mean", 0) > 10 and v.get("cambio_porcentual", 0) < -0.2:
            tipo_evento = tipo_evento or "Frenado"

        # Marcha lenta: velocidad baja y CV bajo en rpm
        if v.get("mean", 0) < 2 and rpm.get("coef_variacion", 1) < 0.05:
            tipo_evento = tipo_evento or "Marcha_Lenta"

        return tipo_evento, tipo_evento_complejo


In [26]:
# Constructor del dataset RAG con detección de eventos y descripciones multi-señal

class ConstructorDatasetRAG:
    def __init__(self,
                 definiciones_dbc: Dict[str, Dict[str, Any]],
                 red_can: Optional[str] = None,
                 top_n_senales: int = 5,
                 detection_rules: Optional[Dict[str, Dict[str, float]]] = None,
                 min_duracion_evento: int = 15):
        self.defs = definiciones_dbc or {}
        self.red_can = red_can
        self.top_n_senales = max(3, int(top_n_senales))
        self.desc_gen = GeneradorDescripcionesTextual(self.defs)
        self.meta_gen = GeneradorMetadatosCompleto()
        self.detection_rules = detection_rules or None
        self.min_duracion_evento = int(min_duracion_evento)

    def _seleccionar_senales_relevantes(self, df_segmento: pd.DataFrame) -> List[str]:
        # Criterio: mayor varianza relativa o mayor cambio % (si la columna es numérica)
        variancias: Dict[str, float] = {}
        cambios: Dict[str, float] = {}
        for col in df_segmento.columns:
            if pd.api.types.is_numeric_dtype(df_segmento[col]):
                arr = df_segmento[col].astype(float).values
                variancias[col] = float(np.var(arr)) if arr.size else 0.0
                cambios[col] = abs(_safe_pct_change(arr))
        # Combinar puntuación simple
        puntaje = {c: (variancias.get(c, 0.0) + cambios.get(c, 0.0)) for c in df_segmento.columns}
        orden = sorted(puntaje.keys(), key=lambda k: puntaje[k], reverse=True)
        # Filtrar señales obvias de tiempo/índice si existen
        orden = [c for c in orden if c.lower() not in ("timestamp", "time", "index")]
        return orden[: self.top_n_senales]

    def generar_evento_can_completo(self,
                                    df_segmento: pd.DataFrame,
                                    indice_segmento: int,
                                    fuente: Optional[str] = None) -> Optional[RAGDatasetEntry]:
        if df_segmento.empty:
            return None
        senales_relevantes = self._seleccionar_senales_relevantes(df_segmento)
        if not senales_relevantes:
            return None

        # Matriz de correlación sobre señales relevantes (usando Pearson por defecto)
        try:
            corr = df_segmento[senales_relevantes].corr().fillna(0.0)
        except Exception:
            corr = pd.DataFrame(np.eye(len(senales_relevantes)), index=senales_relevantes, columns=senales_relevantes)

        desc, tech, analisis_por_senal, comentarios_dbc, correlaciones = (
            self.desc_gen.generar_descripcion_evento_multisenal(
                segmento_df=df_segmento,
                senales_relevantes=senales_relevantes,
                matriz_correlacion=corr,
                red_can=self.red_can,
                indice_segmento=indice_segmento,
            )
        )

        # Clasificación
        tipo_evento, tipo_evento_complejo = self.meta_gen.clasificar_evento_inteligente(
            analisis_estadistico=analisis_por_senal,
            senales_involucradas=senales_relevantes,
        )

        metadatos = CANEventMetadata(
            indice_segmento=indice_segmento,
            red_can=self.red_can,
            senales_involucradas=senales_relevantes,
            analisis_estadistico=analisis_por_senal,
            correlaciones_detectadas=correlaciones,
            tipo_evento=tipo_evento,
            tipo_evento_complejo=tipo_evento_complejo,
            comentarios_dbc=comentarios_dbc,
        )

        entry = RAGDatasetEntry(
            id_evento=f"evento_{indice_segmento}",
            descripcion_textual=desc,
            descripcion_tecnica=tech,
            metadatos=metadatos,
            fuente=fuente,
        )
        return entry

    def construir_dataset_completo(self,
                                   df: pd.DataFrame,
                                   fuente: Optional[str] = None) -> List[RAGDatasetEntry]:
        """
        Reemplaza segmentación fija por detección de eventos.
        df: DataFrame ya decodificado (ideálmente con columnas de señales físicas
            o al menos brutas con factores/offset en self.defs).
        """
        dataset: List[RAGDatasetEntry] = []
        segmentos = detectar_segmentos_eventos(df,
                                               min_duracion=self.min_duracion_evento,
                                               columnas_umbral=self.detection_rules)
        for idx, (ini, fin) in enumerate(segmentos, start=1):
            df_seg = df.iloc[ini:fin].reset_index(drop=True)
            entry = self.generar_evento_can_completo(df_seg, indice_segmento=idx, fuente=fuente)
            if entry is not None:
                dataset.append(entry)
        return dataset


In [27]:
# Escritura JSONL y ejemplo de uso

def escribir_jsonl(entries: List[RAGDatasetEntry], path_salida: str) -> None:
    with open(path_salida, "w", encoding="utf-8") as f:
        for e in entries:
            f.write(e.to_jsonl_entry() + "\n")


# --- Ejemplo de uso (ajusta rutas y columnas a tu caso) ---
# 1) Cargar definiciones DBC (usa path a .dbc o a un JSON con definiciones)
# defs = cargar_definiciones_dbc(path_dbc=r"C:\ruta\a\tu\archivo.dbc")
# o bien
# defs = cargar_definiciones_dbc(path_json_def=r"C:\ruta\a\definiciones_dbc.json")

# 2) Preparar DataFrame decodificado con columnas de señales (ejemplo sintético)
# data = {
#     "timestamp": np.arange(0, 300),
#     "Velocidad_Vehiculo": np.clip(np.linspace(0, 80, 300) + np.random.randn(300)*2, 0, None),
#     "Velocidad_Motor_RPM": np.clip(np.linspace(900, 3000, 300) + np.random.randn(300)*50, 0, None),
#     "Torque_Motor_Nm": np.abs(np.random.randn(300)*10 + 20),
#     "Pedal_Freno": np.concatenate([np.zeros(100), np.ones(50)*20, np.zeros(150)]),
#     "Corriente_Bateria": np.concatenate([np.ones(150)*(-5), np.ones(150)*(10)])
# }
# df = pd.DataFrame(data)

# 3) Construir dataset
# constructor = ConstructorDatasetRAG(definiciones_dbc=defs, red_can="CAN_1", top_n_senales=5)
# entries = constructor.construir_dataset_completo(df, fuente="sintetico")

# 4) Guardar JSONL para RAG/Watson Discovery
# escribir_jsonl(entries, r"C:\ruta\salida\dataset_rag_decode_ev.jsonl")

print("Notebook preparado: carga tus definiciones DBC y DataFrames/BLF para ejecutar el flujo.")


Notebook preparado: carga tus definiciones DBC y DataFrames/BLF para ejecutar el flujo.


In [None]:
# (Opcional Avanzado) Decodificación BLF -> DataFrame por streaming usando cantools + python-can

# Hacer esta celda más robusta si se ejecuta antes de la celda de imports
import pandas as pd
from typing import List, Optional, Set
try:
    import can  # python-can
except Exception:
    can = None
try:
    import cantools
except Exception:
    cantools = None


def decodificar_blf_a_dataframe_stream(path_blf: str,
                                       path_dbc: Optional[str] = None,
                                       max_mensajes: Optional[int] = None) -> pd.DataFrame:
    """
    Decodifica un .blf a DataFrame usando streaming. Requiere cantools y python-can.
    Advertencia: La estructura resultante depende del DBC; señales ausentes no aparecerán.
    """
    if can is None or cantools is None:
        raise RuntimeError("Se requiere python-can y cantools para decodificación BLF.")
    if not path_dbc:
        raise ValueError("Proporciona path_dbc para decodificar por DBC.")

    db = cantools.database.load_file(path_dbc)
    id_to_msg = {m.frame_id: m for m in db.messages}

    filas = []
    cont = 0
    with can.BLFReader(path_blf) as reader:
        for msg in reader:
            if max_mensajes and cont >= max_mensajes:
                break
            cont += 1
            try:
                m = id_to_msg.get(msg.arbitration_id)
                if m is None:
                    continue
                decoded = m.decode(bytes(msg.data))  # dict {signal_name: value}
                decoded_row = {normalizar_nombre_senal(k): v for k, v in decoded.items()}
                decoded_row["timestamp"] = float(msg.timestamp)
                filas.append(decoded_row)
            except Exception:
                # Ignora mensajes no decodificables
                continue
    if not filas:
        return pd.DataFrame()

    df = pd.DataFrame(filas)
    # Orden aproximado por timestamp
    if "timestamp" in df.columns:
        df = df.sort_values("timestamp").reset_index(drop=True)
    return df


def decodificar_blf_a_dataframe_stream_multi(path_blf: str,
                                             paths_dbc: List[str],
                                             max_mensajes: Optional[int] = None) -> pd.DataFrame:
    """
    Variante multi-DBC: carga varias bases y trata de decodificar cada mensaje con la que corresponda.
    Si hay colisiones de frame_id, intenta en orden hasta que alguna decodifique sin error.
    """
    if can is None or cantools is None:
        raise RuntimeError("Se requiere python-can y cantools para decodificación BLF.")
    if not paths_dbc:
        raise ValueError("Proporciona al menos un DBC en paths_dbc.")

    dbs = []
    for p in paths_dbc:
        try:
            dbs.append(cantools.database.load_file(p))
        except Exception:
            continue
    if not dbs:
        return pd.DataFrame()

    # Mapa: frame_id -> lista de definiciones de mensaje
    map_msgs = {}
    for db in dbs:
        for m in db.messages:
            map_msgs.setdefault(m.frame_id, []).append(m)

    filas = []
    cont = 0
    with can.BLFReader(path_blf) as reader:
        for msg in reader:
            if max_mensajes and cont >= max_mensajes:
                break
            cont += 1
            lst = map_msgs.get(msg.arbitration_id)
            if not lst:
                continue
            decoded_ok = False
            for m in lst:
                try:
                    decoded = m.decode(bytes(msg.data))
                    decoded_row = {normalizar_nombre_senal(k): v for k, v in decoded.items()}
                    decoded_row["timestamp"] = float(msg.timestamp)
                    filas.append(decoded_row)
                    decoded_ok = True
                    break
                except Exception:
                    continue
            # si ninguna pudo decodificar, saltar
    if not filas:
        return pd.DataFrame()

    df = pd.DataFrame(filas)
    if "timestamp" in df.columns:
        df = df.sort_values("timestamp").reset_index(drop=True)
    return df


def decodificar_blf_a_dataframe_stream_multi_filtrado(path_blf: str,
                                                      paths_dbc: List[str],
                                                      signals_allow: Optional[Set[str]] = None,
                                                      max_mensajes: Optional[int] = None,
                                                      resample_ms: Optional[int] = None) -> pd.DataFrame:
    """
    Igual que la versión multi-DBC, pero sólo conserva señales en signals_allow y permite remuestrear por tiempo.
    Esto evita DataFrames "anchísimos" y reduce memoria.
    - signals_allow: nombres normalizados (normalizar_nombre_senal) a conservar. Si None, no filtra.
    - resample_ms: p.ej. 100 para remuestrear a 10 Hz (media por ventana).
    """
    if can is None or cantools is None:
        raise RuntimeError("Se requiere python-can y cantools para decodificación BLF.")
    if not paths_dbc:
        raise ValueError("Proporciona al menos un DBC en paths_dbc.")

    dbs = []
    for p in paths_dbc:
        try:
            dbs.append(cantools.database.load_file(p))
        except Exception:
            continue
    if not dbs:
        return pd.DataFrame()

    map_msgs = {}
    for db in dbs:
        for m in db.messages:
            map_msgs.setdefault(m.frame_id, []).append(m)

    filas = []
    cont = 0
    with can.BLFReader(path_blf) as reader:
        for msg in reader:
            if max_mensajes and cont >= max_mensajes:
                break
            cont += 1
            lst = map_msgs.get(msg.arbitration_id)
            if not lst:
                continue
            for m in lst:
                try:
                    decoded = m.decode(bytes(msg.data))
                    if signals_allow is not None:
                        decoded_row = {normalizar_nombre_senal(k): v for k, v in decoded.items() if normalizar_nombre_senal(k) in signals_allow}
                        if not decoded_row:
                            # Nada que conservar de este mensaje
                            break
                    else:
                        decoded_row = {normalizar_nombre_senal(k): v for k, v in decoded.items()}
                    decoded_row["timestamp"] = float(msg.timestamp)
                    filas.append(decoded_row)
                    break
                except Exception:
                    continue
    if not filas:
        return pd.DataFrame()

    df = pd.DataFrame(filas)
    if "timestamp" in df.columns:
        df = df.sort_values("timestamp").reset_index(drop=True)

    # Remuestreo opcional por tiempo
    if resample_ms and not df.empty and "timestamp" in df.columns:
        try:
            ts = pd.to_datetime(df["timestamp"], unit="s")
            df = df.drop(columns=[c for c in df.columns if c == "timestamp"]).set_index(ts)
            # Sólo numéricas para agregación
            num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
            df = df[num_cols]
            df = df.resample(f"{int(resample_ms)}ms").mean().ffill().reset_index()
            df.rename(columns={"index": "timestamp"}, inplace=True)
            df["timestamp"] = df["timestamp"].astype("int64") / 1e9
        except Exception:
            # Si falla el remuestreo, devuelve sin remuestrear
            df = pd.DataFrame(filas)
            if "timestamp" in df.columns:
                df = df.sort_values("timestamp").reset_index(drop=True)

    return df

In [31]:
# Diagnóstico rápido de reglas de eventos y sugerencias de umbrales
import numpy as np
import pandas as pd
from typing import Dict, Optional, Tuple, List

OPERADORES = {
    '>':  lambda s, v: s > v,
    '>=': lambda s, v: s >= v,
    '<':  lambda s, v: s < v,
    '<=': lambda s, v: s <= v,
    '!=': lambda s, v: s != v,
    '==': lambda s, v: s == v,
}

def diagnosticar_reglas_eventos(df: pd.DataFrame,
                                reglas: Dict[str, Dict[str, float]],
                                min_duracion: int = 15) -> None:
    if df is None or df.empty:
        print("DF vacío: no hay datos para diagnosticar.")
        return
    print(f"Filas: {len(df)} | Columnas: {len(df.columns)}")
    presentes = [c for c in reglas.keys() if c in df.columns]
    ausentes = [c for c in reglas.keys() if c not in df.columns]
    print("Reglas con columnas presentes:", presentes or '<ninguna>')
    if ausentes:
        print("Reglas con columnas AUSENTES (no se aplican):", ausentes)
    mask_total = np.zeros(len(df), dtype=bool)
    for col, spec in reglas.items():
        if col not in df.columns:
            continue
        op = spec.get('op', '>')
        val = spec.get('valor', 0.0)
        serie = pd.to_numeric(df[col], errors='coerce').fillna(np.nan)
        try:
            m = OPERADORES.get(op, OPERADORES['>'])(serie, val)
        except Exception:
            m = OPERADORES['>'](serie, val)
        true_count = int(np.nansum(m.values))
        ratio = true_count / max(1, len(serie))
        print(f" - {col} {op} {val}: {true_count} filas verdaderas ({ratio:.1%})")
        mask_total |= m.fillna(False).values
    total_true = int(mask_total.sum())
    print(f"Filas que cumplen AL MENOS una regla: {total_true} ({total_true/max(1,len(df)):.1%})")
    # Extraer segmentos
    segs: List[Tuple[int,int]] = []
    en_evento = False
    ini = 0
    for i, v in enumerate(mask_total):
        if v and not en_evento:
            en_evento = True
            ini = i
        elif not v and en_evento:
            fin = i
            if fin - ini >= min_duracion:
                segs.append((ini, fin))
            en_evento = False
    if en_evento:
        fin = len(mask_total)
        if fin - ini >= min_duracion:
            segs.append((ini, fin))
    print(f"Segmentos detectados (min_duracion={min_duracion} filas): {len(segs)}")
    if segs:
        for k,(a,b) in enumerate(segs[:10],1):
            print(f"   [{k}] {a}-{b} (duración {b-a} filas)")
        if len(segs) > 10:
            print(f"   ... y {len(segs)-10} más")


def sugerir_reglas_basicas(df: pd.DataFrame) -> Dict[str, Dict[str, float]]:
    """Intenta adivinar columnas típicas y proponer umbrales iniciales.
    Usa heurísticas por nombre y percentiles/escala.
    """
    cols = list(df.columns)
    def pick(cands: List[str]) -> Optional[str]:
        for c in cols:
            cl = c.lower()
            if any(k in cl for k in cands):
                return c
        return None
    col_speed   = pick(['velocidad', 'vehicle_speed', 'veh_speed', 'speed'])
    col_brake   = pick(['freno', 'brake'])
    col_torque  = pick(['torque'])
    col_current = pick(['corriente', 'current', 'ibatt', 'battery_current'])

    reglas: Dict[str, Dict[str, float]] = {}
    def pctl(col, p):
        s = pd.to_numeric(df[col], errors='coerce').dropna()
        return float(np.percentile(s, p)) if len(s) else 0.0

    if col_speed:
        # Umbral en el percentil 60 para activar eventos de movimiento
        reglas[col_speed] = {'op': '>', 'valor': round(pctl(col_speed, 60), 3)}
    if col_brake:
        s = pd.to_numeric(df[col_brake], errors='coerce').dropna()
        if len(s):
            mx = float(s.max())
            # Si parece 0..1, umbral 0.2; si 0..100, umbral 10
            umbral = 0.2 if mx <= 1.5 else 10.0
        else:
            umbral = 10.0
        reglas[col_brake] = {'op': '>', 'valor': umbral}
    if col_torque:
        reglas[col_torque] = {'op': '>', 'valor': round(pctl(col_torque, 60), 3)}
    if col_current:
        # Considerar actividad cuando corriente != 0
        reglas[col_current] = {'op': '!=', 'valor': 0.0}

    return reglas

# Ejemplo de uso (descomentando):
# df_test = decodificar_blf_a_dataframe_stream_multi(PATHS_BLF[0], PATHS_DBC, max_mensajes=50000)
# reglas_auto = sugerir_reglas_basicas(df_test)
# print('Reglas sugeridas:', reglas_auto)
# diagnosticar_reglas_eventos(df_test, reglas_auto, min_duracion=10)


## Selección interactiva de archivos DBC/JSON y BLF

En las siguientes celdas se seleccionan:
- Uno o varios archivos DBC y/o un JSON de definiciones (usa uno o ambos según tu caso).
- Uno o varios archivos BLF (o una carpeta con muchos BLF).
- La ruta de salida del JSONL para cargar en el RAG de IBM.

Si la ventana de selección no se abre en tu entorno, el flujo te pedirá las rutas por consola. Para múltiples rutas, ase deben separar con punto y coma (;).

In [16]:
# Prueba rápida de Tkinter (opcional) para verificar que los diálogos se pueden abrir en tu entorno
# Ejecuta esta celda: debería abrir un cuadro de diálogo de archivo. Si no aparece, verás un diagnóstico.
import sys
import platform

try:
    import tkinter as tk
    from tkinter import filedialog
    _tk_loaded = True
except Exception as e:
    _tk_loaded = False
    print("tkinter no está disponible en este intérprete.")
    print("Detalle:", repr(e))

if _tk_loaded:
    try:
        root = tk.Tk()
        try:
            root.withdraw()
            # Forzar al frente
            root.attributes('-topmost', True)
            root.lift()
            root.update()
        except Exception:
            pass
        print(f"Python: {sys.version.split()[0]} | SO: {platform.system()} | Tk version: {root.tk.call('info','patchlevel')}")
        print("Abriendo un diálogo de prueba... (puede aparecer detrás de otras ventanas)")
        try:
            path = filedialog.askopenfilename(title="Prueba de diálogo de archivo")
            print("Selección:", path or "<cancelado>")
        finally:
            try:
                root.destroy()
            except Exception:
                pass
    except Exception as e:
        print("Fallo al abrir el diálogo de Tkinter.")
        print("Sugerencias: 1) Cambia de intérprete Python a uno con Tk instalado, 2) Ejecuta VS Code fuera de servidor remoto, 3) Usa el fallback por consola.")
        print("Detalle:", repr(e))


Python: 3.13.7 | SO: Windows | Tk version: 8.6.15
Abriendo un diálogo de prueba... (puede aparecer detrás de otras ventanas)
Selección: <cancelado>
Selección: <cancelado>


In [17]:
# Utilidades para seleccionar archivos con diálogo (tkinter) o por consola
import os
from pathlib import Path
import sys
import platform

# Intentar usar diálogo gráfico
try:
    import tkinter as tk
    from tkinter import filedialog
    _TK_OK = True
except Exception:
    _TK_OK = False


def _with_root():
    """Crea una raíz Tk configurada para aparecer en primer plano."""
    root = tk.Tk()
    try:
        root.withdraw()
        root.attributes('-topmost', True)
        root.lift()
        root.update()
    except Exception:
        pass
    return root


def _select_file_dialog(title: str, filetypes=(('All files', '*.*'),)) -> str:
    if not _TK_OK:
        print("tkinter no disponible; usando entrada por consola.")
        return ""
    try:
        root = _with_root()
        path = filedialog.askopenfilename(title=title, filetypes=filetypes)
        try:
            root.destroy()
        except Exception:
            pass
        return path or ""
    except Exception as e:
        print("No se pudo abrir el diálogo (Tkinter). Detalle:", repr(e))
        return ""


def _select_files_dialog(title: str, filetypes=(('All files', '*.*'),)) -> list:
    if not _TK_OK:
        print("tkinter no disponible; usando entrada por consola.")
        return []
    try:
        root = _with_root()
        paths = filedialog.askopenfilenames(title=title, filetypes=filetypes)
        try:
            root.destroy()
        except Exception:
            pass
        return list(paths) if paths else []
    except Exception as e:
        print("No se pudo abrir el diálogo múltiple (Tkinter). Detalle:", repr(e))
        return []


def _select_folder_dialog(title: str) -> str:
    if not _TK_OK:
        print("tkinter no disponible; usando entrada por consola.")
        return ""
    try:
        root = _with_root()
        path = filedialog.askdirectory(title=title)
        try:
            root.destroy()
        except Exception:
            pass
        return path or ""
    except Exception as e:
        print("No se pudo abrir el diálogo de carpeta (Tkinter). Detalle:", repr(e))
        return ""


def _save_file_dialog(title: str, defaultextension='.jsonl', filetypes=(('JSONL', '*.jsonl'), ('All files', '*.*'))) -> str:
    if not _TK_OK:
        print("tkinter no disponible; usando entrada por consola.")
        return ""
    try:
        root = _with_root()
        path = filedialog.asksaveasfilename(title=title, defaultextension=defaultextension, filetypes=filetypes)
        try:
            root.destroy()
        except Exception:
            pass
        return path or ""
    except Exception as e:
        print("No se pudo abrir el diálogo de guardado (Tkinter). Detalle:", repr(e))
        return ""


def _buscar_por_extension(base: Path, patron: str) -> list:
    try:
        return [str(p.resolve()) for p in base.rglob(patron)]
    except Exception:
        return []


def solicitar_rutas_interactivas():
    print("Selecciona uno o más DBC y/o un JSON de definiciones (puedes dejar alguno vacío).")
    print(f"Entorno: Python {sys.version.split()[0]} | SO {platform.system()} | Tkinter: {'OK' if _TK_OK else 'NO'}")

    # 1) Selección múltiple de DBC por diálogo
    paths_dbc = _select_files_dialog("Selecciona archivos DBC", filetypes=(("DBC", "*.dbc"), ("All files", "*.*")))

    # 2) Si no hay selección, intentar carpeta con DBC
    if not paths_dbc:
        folder_dbc = _select_folder_dialog("Selecciona carpeta con DBC")
        if folder_dbc:
            paths_dbc = [str(p.resolve()) for p in Path(folder_dbc).rglob("*.dbc")]

    # 3) Fallback por texto (pero sin input() si el frontend no lo soporta)
    paths_dbc = paths_dbc or []

    # Normalizar y quitar duplicados
    norm_dbc = []
    seen_dbc = set()
    for p in paths_dbc:
        q = str(Path(p).resolve())
        if q not in seen_dbc:
            seen_dbc.add(q)
            norm_dbc.append(q)

    # JSON opcional
    path_json_def = _select_file_dialog("Selecciona JSON de definiciones (opcional)", filetypes=(("JSON", "*.json"), ("All files", "*.*")))
    path_json_def = path_json_def or ""

    print("Selecciona uno o más archivos BLF")
    # 1) Intentar selección múltiple por diálogo
    paths_blf = _select_files_dialog("Selecciona archivos BLF", filetypes=(("BLF", "*.blf"), ("All files", "*.*")))

    # 2) Si no hay selección, intentar seleccionar carpeta y buscar *.blf recursivamente
    if not paths_blf:
        folder = _select_folder_dialog("Selecciona carpeta con BLF")
        if folder:
            paths_blf = [str(p.resolve()) for p in Path(folder).rglob("*.blf")]

    # 3) Sin input(): si sigue vacío, el usuario puede usar la celda de widgets de abajo
    paths_blf = paths_blf or []

    # Normalizar y quitar duplicados conservando orden
    norm_blf = []
    seen = set()
    for p in paths_blf:
        q = str(Path(p).resolve())
        if q not in seen:
            seen.add(q)
            norm_blf.append(q)

    print("Selecciona ruta de salida JSONL")
    output_jsonl = _save_file_dialog("Guardar dataset JSONL", defaultextension=".jsonl") or ""

    return norm_dbc, path_json_def, norm_blf, output_jsonl


# Ejecuta para capturar rutas (si el frontend no soporta input(), no bloqueará)
try:
    PATHS_DBC, PATH_JSON_DEF, PATHS_BLF, OUTPUT_JSONL = solicitar_rutas_interactivas()
except Exception as e:
    print("Selección por diálogos no disponible en este entorno. Usa la celda de widgets de abajo.")
    print("Detalle:", repr(e))
    PATHS_DBC, PATH_JSON_DEF, PATHS_BLF, OUTPUT_JSONL = [], "", [], ""

print("DBC seleccionados:", len(PATHS_DBC))
print("JSON definiciones:", PATH_JSON_DEF or "<no especificado>")
print("BLF seleccionados:", len(PATHS_BLF))
print("Salida JSONL:", OUTPUT_JSONL or "<no especificado>")


Selecciona uno o más DBC y/o un JSON de definiciones (puedes dejar alguno vacío).
Entorno: Python 3.13.7 | SO Windows | Tkinter: OK
Selecciona uno o más archivos BLF
Selecciona uno o más archivos BLF
Selecciona ruta de salida JSONL
Selecciona ruta de salida JSONL
DBC seleccionados: 4
JSON definiciones: <no especificado>
BLF seleccionados: 4
Salida JSONL: C:/Users/henry/OneDrive/Documentos/2. Formación/Maestria/Proyecto Integrador/Datos/pREUBA.jsonl
DBC seleccionados: 4
JSON definiciones: <no especificado>
BLF seleccionados: 4
Salida JSONL: C:/Users/henry/OneDrive/Documentos/2. Formación/Maestria/Proyecto Integrador/Datos/pREUBA.jsonl


In [None]:
# Selector por widgets (no bloquea y evita input()): usa este formulario si no aparecen ventanas
import os
from pathlib import Path
import ipywidgets as widgets
from IPython.display import display

# Widgets
w_dbc_text = widgets.Text(
    value=';',
    placeholder='Rutas DBC separadas por ; (opcional)',
    description='DBCs:',
    layout=widgets.Layout(width='95%')
)
w_json_text = widgets.Text(
    value='',
    placeholder='Ruta JSON de definiciones (opcional)',
    description='JSON:',
    layout=widgets.Layout(width='95%')
)
w_blf_text = widgets.Text(
    value=';',
    placeholder='Rutas BLF separadas por ;',
    description='BLFs:',
    layout=widgets.Layout(width='95%')
)
w_out_text = widgets.Text(
    value=str(Path.cwd() / 'dataset_rag_decode_ev.jsonl'),
    placeholder='Ruta de salida JSONL',
    description='Salida:',
    layout=widgets.Layout(width='95%')
)
w_btn_scan_dbc = widgets.Button(description='Buscar DBC en workspace', button_style='')
w_btn_scan_blf = widgets.Button(description='Buscar BLF en workspace', button_style='')
w_btn_set = widgets.Button(description='Usar valores', button_style='success')
w_status = widgets.HTML(value='')

box = widgets.VBox([
    widgets.HTML('<b>Formulario de selección (widgets)</b> — Pega rutas o usa los botones de búsqueda.'),
    widgets.HBox([w_dbc_text, w_btn_scan_dbc]),
    w_json_text,
    widgets.HBox([w_blf_text, w_btn_scan_blf]),
    w_out_text,
    w_btn_set,
    w_status
])

def _scan(pattern: str, max_items=20):
    try:
        found = [str(p.resolve()) for p in Path.cwd().rglob(pattern)]
        return found[:max_items]
    except Exception:
        return []


def _uniq_split(val: str):
    parts = [s.strip() for s in val.split(';') if s.strip()]
    out, seen = [], set()
    for p in parts:
        q = str(Path(p).resolve())
        if q not in seen:
            seen.add(q)
            out.append(q)
    return out


def _on_scan_dbc(_):
    found = _scan('*.dbc')
    if found:
        w_dbc_text.value = ';'.join(found)
        w_status.value = f'<span style="color:green">Sugeridos {len(found)} DBC</span>'
    else:
        w_status.value = '<span style="color:red">No se encontraron DBC en el workspace</span>'


def _on_scan_blf(_):
    found = _scan('*.blf', max_items=30)
    if found:
        w_blf_text.value = ';'.join(found)
        w_status.value = f'<span style="color:green">Sugeridos {len(found)} BLF</span>'
    else:
        w_status.value = '<span style="color:red">No se encontraron BLF en el workspace</span>'


def _on_set(_):
    global PATHS_DBC, PATH_JSON_DEF, PATHS_BLF, OUTPUT_JSONL
    PATHS_DBC = _uniq_split(w_dbc_text.value)
    PATH_JSON_DEF = w_json_text.value.strip()
    PATHS_BLF = _uniq_split(w_blf_text.value)
    OUTPUT_JSONL = w_out_text.value.strip()
    w_status.value = (
        f'<b>Asignado</b>: DBC={len(PATHS_DBC)} | JSON={PATH_JSON_DEF or "<no>"} | BLF={len(PATHS_BLF)} | Salida={OUTPUT_JSONL or "<no>"}'
    )

w_btn_scan_dbc.on_click(_on_scan_dbc)
w_btn_scan_blf.on_click(_on_scan_blf)
w_btn_set.on_click(_on_set)

display(box)


In [13]:
# Asignación directa de rutas (opcional)
# Si las variables siguen vacías, puedes asignarlas aquí manualmente y ejecutar esta celda.
try:
    PATHS_DBC
except NameError:
    PATHS_DBC = []
try:
    PATHS_BLF
except NameError:
    PATHS_BLF = []
try:
    PATH_JSON_DEF
except NameError:
    PATH_JSON_DEF = ""
try:
    OUTPUT_JSONL
except NameError:
    OUTPUT_JSONL = ""

print("Valores actuales:")
print("  DBCs:", PATHS_DBC)
print("  JSON definiciones:", PATH_JSON_DEF or "<no>")
print("  BLFs:", PATHS_BLF)
print("  Salida JSONL:", OUTPUT_JSONL or "<no>")

# Ejemplos (descomenta y ajusta):
# PATHS_DBC = [r"C:\\ruta\\a\\red1.dbc", r"C:\\ruta\\a\\red2.dbc"]
# PATHS_BLF = [r"C:\\ruta\\a\\log1.blf", r"C:\\ruta\\a\\log2.blf"]
# PATH_JSON_DEF = r"C:\\ruta\\a\\definiciones.json"  # opcional
# OUTPUT_JSONL = r"C:\\ruta\\salida\\dataset_rag_decode_ev.jsonl"


Valores actuales:
  DBCs: ['C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\IP_JZ - AUX_CHG.dbc', 'C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\IP_JZ - CAN CARROC.DBC', 'C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\IP_JZ - CAN CATL.dbc', 'C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\IP_JZ - CAN EV.DBC']
  JSON definiciones: <no>
  BLFs: ['C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\Logs_Recorrido2_19092025\\Logging_2025-09-19_07-07-52.blf', 'C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\Logs_Recorrido2_19092025\\Logging_2025-09-19_07-25-15.blf', 'C:\\Users\\henry\\OneDrive\\Documentos\\2. Formación\\Maestria\\Proyecto Integrador\\Datos\\Logs_Recorrido2_19092025\\Logging_2025-09-19_07-27-46.blf', 'C:\\Users\\henr

In [32]:
# Orquestación: decodificar BLF(s), construir dataset y exportar JSONL

# Aseguramos dependencias mínimas en esta celda
import os
import pandas as pd
import numpy as np

# Variables de configuración rápida
RED_CAN = "CAN_1"
TOP_N_SENALES = 5
MIN_DURACION_EVENTO = 10  # filas

# Cargar definiciones combinadas (múltiples DBC + JSON opcional)
if PATHS_DBC or PATH_JSON_DEF:
    defs = cargar_definiciones_combinadas(paths_dbc=PATHS_DBC or [], path_json_def=PATH_JSON_DEF or None)
else:
    defs = {}

if PATHS_BLF and not PATHS_DBC:
    print("Advertencia: Para decodificar BLF se requiere al menos un DBC. Suministra PATHS_DBC o decodifica fuera del notebook.")

entries_totales = []
detection_rules = None

for idx_blf, path_blf in enumerate(PATHS_BLF, start=1):
    try:
        if PATHS_DBC:
            df = decodificar_blf_a_dataframe_stream_multi(path_blf=path_blf, paths_dbc=PATHS_DBC)
        else:
            df = pd.DataFrame()
        if df.empty:
            print(f"BLF vacío o no decodificable: {path_blf}")
            continue

        # Construcción automática de reglas (solo una vez, usando el primer DF no vacío)
        if detection_rules is None:
            detection_rules = construir_reglas_automaticas(df, defs)
            print("Reglas auto-generadas:", detection_rules)
            if not detection_rules:
                print("No se pudieron inferir reglas desde DBC/DF; se usarán reglas por defecto internas.")

        constructor = ConstructorDatasetRAG(
            definiciones_dbc=defs,
            red_can=RED_CAN,
            top_n_senales=TOP_N_SENALES,
            detection_rules=detection_rules,
            min_duracion_evento=MIN_DURACION_EVENTO,
        )

        # Construir entradas por eventos
        entries = constructor.construir_dataset_completo(df, fuente=path_blf)
        print(f"{path_blf}: {len(entries)} eventos")
        entries_totales.extend(entries)
    except Exception as e:
        print(f"Error procesando {path_blf}: {e}")

# Escribir JSONL
if entries_totales:
    os.makedirs(os.path.dirname(OUTPUT_JSONL), exist_ok=True)
    escribir_jsonl(entries_totales, OUTPUT_JSONL)
    print(f"Dataset JSONL escrito en: {OUTPUT_JSONL} ({len(entries_totales)} eventos totales)")
else:
    print("No se generaron entradas. Revisa reglas auto-generadas/umbrales de eventos o la cobertura del DBC.")


Reglas auto-generadas: {'WhBasedVehSpeed': {'op': '>', 'valor': 11.851}, 'Brake_Application_Pressure': {'op': '>', 'valor': 0.2}, 'EngTorqueMode_SPN899': {'op': '>', 'valor': 0.0}, 'Param_MaxCurrentPerTray': {'op': '>', 'valor': 0.0}}
C:\Users\henry\OneDrive\Documentos\2. Formación\Maestria\Proyecto Integrador\Datos\Logs_Recorrido2_19092025\Logging_2025-09-19_07-07-52.blf: 0 eventos
C:\Users\henry\OneDrive\Documentos\2. Formación\Maestria\Proyecto Integrador\Datos\Logs_Recorrido2_19092025\Logging_2025-09-19_07-25-15.blf: 0 eventos
C:\Users\henry\OneDrive\Documentos\2. Formación\Maestria\Proyecto Integrador\Datos\Logs_Recorrido2_19092025\Logging_2025-09-19_07-25-15.blf: 0 eventos
Error procesando C:\Users\henry\OneDrive\Documentos\2. Formación\Maestria\Proyecto Integrador\Datos\Logs_Recorrido2_19092025\Logging_2025-09-19_07-27-46.blf: Unable to allocate 9.76 GiB for an array with shape (470, 2787399) and data type float64
Error procesando C:\Users\henry\OneDrive\Documentos\2. Formación\

In [None]:
# Utilidades de ajuste automático: duración mínima en filas y selección heurística de señales desde DBC
import numpy as np
import pandas as pd
from typing import Dict, List, Set

def calcular_min_duracion_filas(df: pd.DataFrame, segundos: float = 2.0) -> int:
    """Convierte una duración en segundos a filas según la mediana de delta timestamp.
    Devuelve al menos 3 filas para evitar falsos positivos.
    """
    if df is None or df.empty or "timestamp" not in df.columns:
        return max(3, int(segundos * 10))  # suponer ~10 Hz si no hay timestamp
    ts = pd.to_numeric(df["timestamp"], errors="coerce").dropna().values
    if len(ts) < 3:
        return max(3, int(segundos * 10))
    d = np.diff(ts)
    d = d[(d > 0) & np.isfinite(d)]
    if len(d) == 0:
        return max(3, int(segundos * 10))
    med = float(np.median(d))
    if med <= 0 or not np.isfinite(med):
        return max(3, int(segundos * 10))
    filas = int(np.ceil(segundos / med))
    return max(3, filas)


def senales_interes_desde_defs(defs: Dict[str, Dict[str, Dict]]) -> Set[str]:
    """Construye una allowlist de señales típicas a partir de nombres en defs (normalizados).
    Palabras clave: velocidad, brake/freno, torque, corriente, pedal, acelerador, rpm, soc, voltaje.
    Limita a 40 señales para mantener memoria controlada.
    """
    if not defs:
        return set()
    keywords = [
        "velocidad", "speed", "veh_speed", "vehicle_speed",
        "brake", "freno",
        "torque",
        "corriente", "current", "ibatt", "battery",
        "pedal", "acelerador", "throttle",
        "rpm", "engine_speed",
        "soc", "state_of_charge",
        "volt", "voltage",
    ]
    out: List[str] = []
    for _, sigs in defs.items():
        for raw_name in sigs.keys():
            n = normalizar_nombre_senal(raw_name)
            l = n.lower()
            if any(k in l for k in keywords):
                out.append(n)
    # Quitar duplicados conservando orden
    seen = set()
    uniq = []
    for s in out:
        if s not in seen:
            seen.add(s)
            uniq.append(s)
    return set(uniq[:40])


In [None]:
# Orquestación (robusta y memory-safe): filtrado por señales, remuestreo y duración adaptativa
import os
import pandas as pd
from typing import Optional, Set

# Configuración rápida
RED_CAN = globals().get("RED_CAN", "CAN_1")
TOP_N_SENALES = int(globals().get("TOP_N_SENALES", 5))
RESAMPLE_MS = 100                 # remuestreo a 10 Hz aprox
MAX_MENSAJES_SAMPLE = 200_000     # para inferir reglas sin agotar memoria
MAX_MENSAJES_FULL: Optional[int] = None  # o fija, p.ej. 1_500_000
MIN_DURACION_S = 2.0              # duración mínima de evento en segundos

# Cargar definiciones combinadas si no existen
if 'defs' not in globals() or not isinstance(defs, dict) or not defs:
    try:
        defs = cargar_definiciones_combinadas(paths_dbc=PATHS_DBC or [], path_json_def=PATH_JSON_DEF or None)
    except Exception:
        defs = {}

if not PATHS_BLF:
    print("No hay BLF para procesar. Revisa PATHS_BLF (usa la celda de selección anterior).")
else:
    # 1) Construir allowlist heurística a partir del DBC
    allow_heur: Set[str] = senales_interes_desde_defs(defs)
    if allow_heur:
        print(f"Allowlist heurística desde DBC: {len(allow_heur)} señales")
    else:
        print("No se pudo derivar allowlist desde DBC; se continuará sin filtro en la muestra (riesgo de memoria).")

    # 2) Decodificar una muestra del primer BLF con filtro para proponer reglas
    reglas_auto = None
    df_sample = pd.DataFrame()
    try:
        df_sample = decodificar_blf_a_dataframe_stream_multi_filtrado(
            path_blf=PATHS_BLF[0],
            paths_dbc=PATHS_DBC,
            signals_allow=allow_heur if allow_heur else None,
            max_mensajes=MAX_MENSAJES_SAMPLE,
            resample_ms=RESAMPLE_MS,
        )
    except Exception as e:
        print("Error al decodificar muestra:", e)

    if not df_sample.empty:
        try:
            reglas_auto = sugerir_reglas_basicas(df_sample)
            print("Reglas auto-generadas:", reglas_auto)
            diagnosticar_reglas_eventos(df_sample, reglas_auto, min_duracion=10)
        except Exception as e:
            print("No se pudieron sugerir/diagnosticar reglas:", e)
    else:
        print("Muestra vacía; no se pudieron sugerir reglas. Se usará un conjunto básico.")
        reglas_auto = reglas_auto or {}

    # 3) Señales finales a conservar (unión reglas + heurística)
    allow_final: Set[str] = set(reglas_auto.keys()) | allow_heur
    if allow_final:
        print(f"Allowlist final: {len(allow_final)} señales")

    # 4) Procesar todos los BLF con filtrado, remuestreo y duración adaptativa
    entries_totales = []
    for path_blf in PATHS_BLF:
        try:
            df = decodificar_blf_a_dataframe_stream_multi_filtrado(
                path_blf=path_blf,
                paths_dbc=PATHS_DBC,
                signals_allow=allow_final if allow_final else None,
                max_mensajes=MAX_MENSAJES_FULL,
                resample_ms=RESAMPLE_MS,
            )
            if df.empty:
                print(f"BLF vacío o no decodificable (filtrado): {path_blf}")
                continue

            min_filas = calcular_min_duracion_filas(df, segundos=MIN_DURACION_S)
            constructor = ConstructorDatasetRAG(
                definiciones_dbc=defs,
                red_can=RED_CAN,
                top_n_senales=TOP_N_SENALES,
                detection_rules=reglas_auto if reglas_auto else None,
                min_duracion_evento=min_filas,
            )
            entries = constructor.construir_dataset_completo(df, fuente=path_blf)
            print(f"{path_blf}: {len(entries)} eventos (min {min_filas} filas)")
            entries_totales.extend(entries)
        except Exception as e:
            print(f"Error procesando {path_blf}: {e}")

    # 5) Guardar JSONL
    if entries_totales:
        try:
            os.makedirs(os.path.dirname(OUTPUT_JSONL), exist_ok=True)
            escribir_jsonl(entries_totales, OUTPUT_JSONL)
            print(f"Dataset JSONL escrito en: {OUTPUT_JSONL} ({len(entries_totales)} eventos totales)")
        except Exception as e:
            print("No fue posible escribir el JSONL:", e)
    else:
        print("No se generaron entradas. Revisa las reglas, la allowlist y el remuestreo.")
