# Ingesta de datos (Bronce)
Este notebook implementa la **etapa de ingesta** del pipeline Big Data (nivel *Bronce*).
El objetivo es:
- Leer archivos NDJSON crudos (uno por línea = un evento).
- Detectar y aislar líneas rotas o con errores.
- Añadir trazabilidad: `_source_file`, `_ingest_ts`, `_batch_id`.
- Generar dos DataFrames: uno limpio (`df`) y otro con errores (`bad_df`).


In [None]:
import os
import sys
import json
import pandas as pd
from typing import Iterable, List, Tuple


In [None]:
def iter_lines(path: str) -> Iterable[str]:
    """
    Itera sobre las líneas de un archivo de texto (NDJSON).
    - Verifica que el archivo exista.
    - Devuelve cada línea como string.
    """
    if not os.path.isfile(path):
        print(f"[ERROR] No se encontró el fichero: {path}", file=sys.stderr)
        sys.exit(2)

    with open(path, "r", encoding="utf-8", errors="ignore") as fh:
        for line in fh:
            yield line


**Explicación:**

Esta función es un generador: devuelve las líneas una por una (sin cargar todo el archivo a memoria).Ideal para archivos grandes (“streaming read”).Si el archivo no existe, se corta el proceso.

In [None]:
def read_ndjson_bronze(path: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Lee un archivo NDJSON (un JSON por línea).
    Devuelve:
      - df: líneas válidas
      - bad_df: líneas rotas (no parseables)
    """

    rows: List[dict] = []
    bad: List[dict] = []

    # Recorre línea a línea
    for line in iter_lines(path):
        line = line.strip()
        if not line:
            continue

        try:
            obj = json.loads(line)
            obj["_source_file"] = os.path.basename(path)
            rows.append(obj)
        except Exception:
            bad.append({
                "line": line,
                "_source_file": os.path.basename(path),
                "_error": "invalid_json"
            })

    # Crea DataFrames
    df = pd.DataFrame(rows)
    bad_df = pd.DataFrame(bad)

    # Metadata de ingesta
    ts_now = pd.Timestamp.now(tz="UTC")
    batch_id = os.getpid()

    for d in (df, bad_df):
        d["_ingest_ts"] = ts_now
        d["_batch_id"] = batch_id

    return df, bad_df


1. Lee el archivo NDJSON línea por línea.
2. Cada línea intenta convertirse en JSON:
   - Si es válida → se guarda en `rows`.
   - Si falla → se guarda en `bad` con el error `invalid_json`.
3. Crea dos DataFrames:
   - `df`: los eventos válidos.
   - `bad_df`: las líneas rotas.
4. Añade columnas de trazabilidad:
   - `_source_file`: nombre del archivo de origen.
   - `_ingest_ts`: marca de tiempo de ingesta.
   - `_batch_id`: identificador único de lote (PID ).


Nivel BRONCE = datos crudos con trazabilidad, mínima transformación.
En etapas posteriores (PLATA y ORO) se harán:
  - Tipado y normalización de campos.
  - Definición de sesiones (timeouts, bots).
  - Cálculo de métricas y reportes (embudos, top paths, etc.).
