<div style="background-color:#001f3f; color:white; padding:15px; border-radius:8px;">
<h2 style="text-align:left;">UT1 - BDA - Ejercicio Sensores de CO₂ — Borja Ramos Oliva</h2>
<p style="font-size:16px;">
Este es el cuaderno en el que desarrollaremos el pipeline completo del proceso: ingesta micro-batch, limpieza, almacenamiento y reporte Markdown.
</p>
</div>


<div style="background-color:#001f3f; color:white; padding:15px; border-radius:8px;">
<h3 style="text-align:left;">1. Introducción y Objetivos</h3>
<p style="font-size:14px;">
En este cuaderno, realizaremos el ejercicio propuesto en unidad 1 de Big Data Aplicado (BDA). El objetivo principal es desarrollar un pipeline completo que abarque desde la ingesta de datos en micro-batch, pasando por la limpieza y almacenamiento de los mismos, hasta la generación de un reporte en formato Markdown. Utilizaremos datos de sensores de CO₂ para ilustrar cada etapa del proceso.

</p>
</div>

In [None]:
### 1.1 Importamos las librerías necesarias

from pathlib import Path
from datetime import datetime, timezone, timedelta
import json
import sqlite3
import os

import numpy as np
import pandas as pd

In [None]:
### 1.2 Definimos la configuración inicial de pandas
pd.set_option("display.max_rows", 10) # Mostrar todas las filas
pd.set_option("display.max_columns", None) # Mostrar todas las columnas
pd.set_option("display.width", 120) # Ajustar el ancho de la visualización


In [None]:
### 1.3 Definimos constantes y parámetros del proyecto. Su uso se explica en el reporte final.
### 1.4 Parámetros de calidad del aire según concentración de CO2 (ppm)
BANDAS = {
    "verde": (None, 700),     # buena calidad del aire
    "amarilla": (700, 1000),  # media
    "roja": (1000, None)      # mala (alerta)
}

HORARIO_LECTIVO = [(8, 14), (16, 21)] ### Definimos el horario lectivo, por ejemplo de 8 a 14h y de 16 a 21h

### 1.5 Definimos las rutas de los directorios del proyecto, ingesta, almacenamiento y reporte

# Generamos el directorio del proyecto si no existe con la siguiente línea (descomentarlo si es necesario)
# !mkdir -p project

ROOT = Path.cwd() / "project"
DATA = ROOT / "data" / "drops"
OUT  = ROOT / "output"
PARQ = OUT / "parquet"
QLTY = OUT / "quality"
CKPT = OUT / "checkpoints"
DB   = OUT / "ut1.db"
REPORT = OUT / "reporte.md"

for d in [DATA, OUT, PARQ, QLTY, CKPT]:
    d.mkdir(parents=True, exist_ok=True)

print("Estructura del proyecto creada o verificada ✅")
print(f"Raíz: {ROOT}")

Estructura del proyecto creada o verificada ✅
Raíz: /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1/project


In [None]:
### Podemos verificar el directorio (a mi me han fallado permisos en algunas ocasiones))
# import os, pathlib
# print("CWD:", os.getcwd())
# print("Nota: notebook está en:", pathlib.Path().resolve())

CWD: /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1
Nota: notebook está en: /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1


<div style="background-color:#001f3f; color:white; padding:15px; border-radius:8px;">
<h3 style="text-align:left;">2. Generación / adquisición de datos</h3>
<p style="font-size:14px;">
Vamos a generar un dataset simulado siguiendo la estructura que se da en el enunciado.

En él, introduciremos una serie de inconsistencias que luego limpiaremos explicando el por qué. 

</p>
</div>

In [None]:
### 2.1 Generaremos un NDJSON con los datos simulados en el directorio de ingesta: {DATA} (fuente bronce)
### Se llamará "lecturas.log" en projecto/data/drops/lecturas.log
from datetime import timedelta
LOGFILE = DATA / "lecturas.log"

In [None]:
# 2.2 Definimos la función para generar el event_id

def generar_evento_id(ts_iso: str, aula: str) -> str:
    """
    Genera un identificador único de evento (`event_id`) para cada lectura.

    Args:
        ts_iso (str): Marca temporal en formato ISO (por ejemplo '2025-01-03T09:00:00Z').
        aula (str): Nombre o código del aula (por ejemplo 'A101').

    Returns:
        str: Identificador en formato 'aula-hhmmss', 
             por ejemplo 'a101-090000' para A101 a las 09:00:00.
    
    Ejemplo:
        >>> gen_event_id("2025-01-03T09:00:00Z", "A101")
        'a101-090000'
    """
    ts = pd.to_datetime(ts_iso, utc=True)
    return f"{str(aula).lower()}-{ts.strftime('%H%M%S')}"

In [None]:
# 2.3 Definimos la función para simular lecturas de sensores de CO2 con anomalías

def simular_lecturas(
    fp: Path,
    start_utc: datetime,
    minutes: int = 180,
    step_sec: int = 15,
    aulas=("A101", "A102", "A103"),
    seed: int = 7,
) -> None:
    """
    Simula un conjunto de lecturas de sensores de CO₂ y las guarda en formato NDJSON.

    Crea un archivo `.log` donde cada línea es un evento JSON con los campos:
    - ts: marca temporal ISO (UTC)
    - aula: identificador del aula
    - co2_ppm: valor de concentración de CO₂ en ppm
    - event_id: identificador único (aula-hhmmss)

    Además, introduce intencionadamente anomalías para probar la limpieza posterior:
    - Valores fuera de rango (co2_ppm < 0 o > 5000)
    - Spikes (incrementos repentinos > 1000 ppm)
    - Sensor atascado (valores constantes durante 5 min)
    - Líneas malformadas o con valores nulos
    - Registros fuera del horario lectivo (empieza 06:00Z)

    Args:
        fp (Path): Ruta de salida del archivo NDJSON.
        start_utc (datetime): Fecha/hora inicial (en UTC).
        minutes (int, optional): Duración total de la simulación en minutos. Por defecto 180 (3h).
        step_sec (int, optional): Intervalo entre lecturas en segundos. Por defecto 15.
        aulas (tuple[str], optional): Lista o tupla de aulas simuladas. Por defecto ('A101','A102','A103').
        seed (int, optional): Semilla aleatoria para reproducibilidad. Por defecto 7.

    Returns:
        None: Escribe directamente en el archivo NDJSON especificado.

    Ejemplo:
        >>> simulate_lecturas(Path("lecturas.log"), datetime(2025,1,3,6,0, tzinfo=timezone.utc))
    """
    rng = np.random.default_rng(seed)
    ts = start_utc.replace(second=0, microsecond=0)

    fp.parent.mkdir(parents=True, exist_ok=True)
    with fp.open("w", encoding="utf-8") as f:
        for i in range(int((minutes * 60) // step_sec)):
            for aula in aulas:
                # establecemos un patrón de clase (simula subida y bajada progresiva)
                minute = (ts.minute + ts.hour * 60) % 60
                if minute < 10:      base, noise = 500, 40
                elif minute < 25:    base, noise = 900, 80
                elif minute < 40:    base, noise = 1400, 120
                elif minute < 50:    base, noise = 1800, 150
                else:                base, noise = 900, 80

                val = int(np.clip(rng.normal(base, noise), -2000, 7000))

                # aquí creamos anomalías intencionadas
                r = rng.random()
                if r < 0.01:       # rango bajo
                    val = -50
                elif r < 0.02:     # rango alto
                    val = 7000
                if rng.random() < 0.01:  # aquí un "spike" positivo
                    val = int(np.clip(val + rng.choice([1500, 2000]), -2000, 7000))

                # sensor atascao , no varía en 5min. (20 lecturas de 15s)
                if aula == "A102" and 60*60 <= i*step_sec < 60*60 + 5*60:
                    val = 800

                rec = {
                    "ts": ts.isoformat().replace("+00:00", "Z"),
                    "aula": aula,
                    "co2_ppm": val,
                    "event_id": generar_evento_id(ts.isoformat(), aula),
                }
                f.write(json.dumps(rec) + "\n")

            # líneas malformadas y nulos
            if i == 45:
                f.write('{"ts": "2025-01-03T08:45:00Z", "aula": , "co2_ppm": 800}\n') # falta aula
            if i == 46:
                f.write('{"ts":"2025-01-03T08:46:00Z","aula":"","co2_ppm":null,"event_id":"mal-084600"}\n') # aula vacía, co2_ppm nulo

            ts += timedelta(seconds=step_sec) # avanzamos el tiempo 

    print(f"Generado NDJSON: {fp}  (líneas ≈{(minutes*60)//step_sec * len(aulas)})") # número aproximado de líneas 


# Ejecutar solo si no existe , porque si no, se sobreescribe
if not LOGFILE.exists():
    simular_lecturas(
        LOGFILE,
        start_utc=datetime(2025, 1, 3, 6, 0, tzinfo=timezone.utc),  # incluye tramo fuera de horario
        minutes=180,
        step_sec=15,
    )
else:
    print(f"Ya hay un archivo: {LOGFILE} en el directorio y no se regenerará otro.")

Generado NDJSON: /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1/project/data/drops/lecturas.log  (líneas ≈2160)


<div style="background-color:#001f3f; color:white; padding:15px; border-radius:8px;">
<h3 style="text-align:left;">3. Ingesta de datos</h3>
<p style="font-size:14px;">
Preparamos las tablas de SQLite y preparamos la ingesta para que el programa lea solo las líneas nuevas del .log

</p>
</div>

In [None]:
### 3.1 Creación de la base de datos SQLite y las tablas necesarias para almacenar los datos procesados y helpers

from typing import Tuple

def ensure_db(db_path: Path) -> None:
    """
    Crea (si no existen) las tablas de trabajo:
      - raw_events: eventos tal cual llegan (idempotencia por event_id)
      - clean_events: se poblará en la fase de limpieza
      - quarantine: filas apartadas con causa de calidad
    Política de idempotencia: 'último gana' por _ingest_ts (UPSERT).
    """
    with sqlite3.connect(db_path) as con:
        cur = con.cursor()
        # raw_events: trazabilidad mínima + event_id único
        cur.execute("""
        CREATE TABLE IF NOT EXISTS raw_events (
            ts TEXT,
            aula TEXT,
            co2_ppm REAL,
            event_id TEXT PRIMARY KEY,
            _ingest_ts TEXT NOT NULL,
            _source_file TEXT,
            _pos INTEGER
        );
        """)
        # clean_events: mismo esquema que raw + campos derivados (se llenará después)
        cur.execute("""
        CREATE TABLE IF NOT EXISTS clean_events (
            ts TEXT,
            aula TEXT,
            co2_ppm REAL,
            event_id TEXT PRIMARY KEY,
            _ingest_ts TEXT NOT NULL,
            _source_file TEXT,
            _pos INTEGER
        );
        """)
        # quarantine: filas con problemas de calidad + causa
        cur.execute("""
        CREATE TABLE IF NOT EXISTS quarantine (
            ts TEXT,
            aula TEXT,
            co2_ppm REAL,
            event_id TEXT,
            cause TEXT NOT NULL,
            _ingest_ts TEXT NOT NULL,
            _source_file TEXT,
            _pos INTEGER
        );
        """)
        con.commit()


In [None]:
# 3.2 Definimos la función para insertar datos en raw_events con idempotencia

def upsert_raw(df: pd.DataFrame, db_path: Path) -> Tuple[int,int]:
    """
    Inserta df en raw_events con idempotencia por event_id.
    UPSERT: si event_id existe y el nuevo _ingest_ts es mayor, actualiza.
    Devuelve (insertados_nuevos, actualizados).
    """
    if df.empty:
        return (0, 0)
    with sqlite3.connect(db_path) as con:
        cur = con.cursor()
        cur.execute("PRAGMA journal_mode=WAL;")
        sql = """
        INSERT INTO raw_events (ts, aula, co2_ppm, event_id, _ingest_ts, _source_file, _pos)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(event_id) DO UPDATE SET
            ts=excluded.ts,
            aula=excluded.aula,
            co2_ppm=excluded.co2_ppm,
            _ingest_ts=excluded._ingest_ts,
            _source_file=excluded._source_file,
            _pos=excluded._pos
        WHERE excluded._ingest_ts > raw_events._ingest_ts;
        """
        before = cur.execute("SELECT COUNT(*) FROM raw_events;").fetchone()[0]
        updated = 0
        for row in df[["ts","aula","co2_ppm","event_id","_ingest_ts","_source_file","_pos"]].itertuples(index=False):
            cur.execute(sql, row)
            # sqlite no da updated_count por fila fácilmente; estimamos al final
        con.commit()
        after = cur.execute("SELECT COUNT(*) FROM raw_events;").fetchone()[0]
        inserted = max(after - before, 0)
        # Aproximación de actualizados: total operaciones - insertados (cuando hay colisiones)
        # (si quieres precisión fina, puedes comparar _ingest_ts previos vs nuevos)
        updated = len(df) - inserted
        return (inserted, updated)

ensure_db(DB)
print("SQLite listo ✅ →", DB)

SQLite listo ✅ → /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1/project/output/ut1.db


In [None]:
# 3.3 Definimos la función para leer solo las nuevas líneas del NDJSON con checkpointing

def read_new_lines(fp: Path, checkpoint_dir: Path) -> pd.DataFrame:
    """
    Lee SOLO las nuevas líneas de un NDJSON (apoyándose en un checkpoint .offset),
    parsea cada línea a dict y añade trazabilidad:
      - _ingest_ts: timestamp UTC de la ingesta
      - _source_file: nombre del archivo fuente
      - _pos: offset (byte) de inicio de línea
    Las líneas malformadas se devuelven como registros con ts/aula/co2_ppm/event_id = None
    y cause='malformed' (esto permite mandarlas luego a quarantine en la fase de limpieza).

    Args:
        fp (Path): ruta del archivo NDJSON (ej. project/data/drops/lecturas.log)
        checkpoint_dir (Path): carpeta donde guardar/leer el .offset

    Returns:
        pd.DataFrame: filas nuevas con trazabilidad (puede contener registros 'malformed')
    """
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    ck = checkpoint_dir / (fp.name + ".offset")

    start = int(ck.read_text()) if ck.exists() else 0
    rows = []

    with fp.open("rb") as f:
        f.seek(start)
        while True:
            pos = f.tell()
            b = f.readline()
            if not b:
                break
            raw = b.decode("utf-8", errors="replace").strip()
            try:
                rec = json.loads(raw)
            except Exception:
                rec = {"ts": None, "aula": None, "co2_ppm": None, "event_id": None, "cause": "malformed"}
            rec["_ingest_ts"] = pd.Timestamp.utcnow().isoformat()
            rec["_source_file"] = fp.name
            rec["_pos"] = pos
            rows.append(rec)

        # guarda nuevo offset
        ck.write_text(str(f.tell()))

    return pd.DataFrame(rows)

In [None]:
# 3.4 Definimos la función principal de ingesta de un micro-lote

def ingest_microbatch(logfile: Path, checkpoint_dir: Path, db_path: Path) -> pd.DataFrame:
    """
    Orquesta la ingesta de un micro-lote:
      1) Lee nuevas líneas con checkpoint (read_new_lines)
      2) Separa las 'malformed' (para quarantine en la siguiente fase)
      3) UPSERT en raw_events (idempotencia por event_id, 'último gana')

    Returns:
        pd.DataFrame: DataFrame con las filas parseadas (incluye posibles 'malformed')
    """
    df_raw = read_new_lines(logfile, checkpoint_dir)
    if df_raw.empty:
        print("No hay líneas nuevas en el log. ✅")
        return df_raw

    # Vista rápida
    print(f"Nuevas filas leídas: {len(df_raw)} (incluyendo posibles 'malformed')")

    # Inserta SOLO las que tienen event_id (las malformed irán a quarantine en limpieza)
    df_valid = df_raw[df_raw["event_id"].notna()].copy()
    ins, upd = upsert_raw(df_valid, db_path)
    print(f"UPSERT raw_events → insertadas: {ins}, actualizadas: {upd}")

    # Nota: las 'malformed' las apartaremos en la fase de limpieza (quarantine)
    return df_raw

# Ejecuta la ingesta del micro-lote actual
df_raw_batch = ingest_microbatch(LOGFILE, CKPT, DB)
df_raw_batch.head(8)

Nuevas filas leídas: 2162 (incluyendo posibles 'malformed')
UPSERT raw_events → insertadas: 2161, actualizadas: 0


Unnamed: 0,ts,aula,co2_ppm,event_id,_ingest_ts,_source_file,_pos,cause
0,2025-01-03T06:00:00Z,A101,500.0,a101-060000,2025-11-10T10:58:54.502987+00:00,lecturas.log,0,
1,2025-01-03T06:00:00Z,A102,464.0,a102-060000,2025-11-10T10:58:54.504524+00:00,lecturas.log,90,
2,2025-01-03T06:00:00Z,A103,502.0,a103-060000,2025-11-10T10:58:54.504558+00:00,lecturas.log,180,
3,2025-01-03T06:00:15Z,A101,475.0,a101-060015,2025-11-10T10:58:54.504574+00:00,lecturas.log,270,
4,2025-01-03T06:00:15Z,A102,504.0,a102-060015,2025-11-10T10:58:54.504586+00:00,lecturas.log,360,
5,2025-01-03T06:00:15Z,A103,527.0,a103-060015,2025-11-10T10:58:54.504602+00:00,lecturas.log,450,
6,2025-01-03T06:00:30Z,A101,423.0,a101-060030,2025-11-10T10:58:54.504627+00:00,lecturas.log,540,
7,2025-01-03T06:00:30Z,A102,490.0,a102-060030,2025-11-10T10:58:54.504643+00:00,lecturas.log,630,


In [None]:
### PONER EN MODO TABLA para el reporte final

# | Campo       | Descripción                                                                 |
# |-------------|-----------------------------------------------------------------------------|
# | ts          | marca temporal original del evento (UTC, formato ISO)                      |
# | aula       | aula simulada (A101, A102, A103)                                           |
# | co2_ppm    | valor de concentración en ppm (float)                                      |
# | event_id   | identificador único del evento (clave natural, usada para idempotencia)    |
# | _ingest_ts | momento en que el lote fue procesado (ahora real, 2025-11-10…)            |
# | _source_file| archivo fuente (lecturas.log)                                             |
# | _pos       | posición de lectura en bytes dentro del archivo (checkpoint)               |
# | cause      | nula por ahora → se rellenará en limpieza cuando detectemos anomalías      |

# aula simulada (A101, A102, A103)
# co2_ppm
# valor de concentración en ppm (float)
# event_id
# identificador único del evento (clave natural, usada para idempotencia)
# _ingest_ts
# momento en que el lote fue procesado (ahora real, 2025-11-10…)
# _source_file
# archivo fuente (lecturas.log)
# _pos
# posición de lectura en bytes dentro del archivo (checkpoint)
# cause
# nula por ahora → se rellenará en limpieza cuando detectemos anomalías


<div style="background-color:#001f3f; color:white; padding:15px; border-radius:8px;">
<h3 style="text-align:left;">4. Limpieza de datos y valoración de calidad. Exportación a Parquet</h3>
<p style="font-size:14px;">
A partir de la ingesta anteriormente explicada, exportamos tanto datos crudos como limpios

</p>
</div>

In [None]:
# 4.1 Definimos la función para limpiar y exportar datos a Parquet

def clean_and_export(db_path: Path, parquet_dir: Path) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Lee los datos crudos (raw_events) desde SQLite, detecta anomalías y genera:
      - clean_events  → válidos, listos para análisis
      - quarantine    → inválidos con su 'cause' documentada
    Exporta ambos a Parquet particionado por fecha/hora (bronze y silver).

    Reglas aplicadas:
      • co2_ppm < 300 o > 5000 → out_of_range
      • aula vacío → missing_aula
      • ts fuera de horario lectivo → out_of_hours (08–14 y 16–21)
      • registros con 'malformed' → malformed

    Args:
        db_path (Path): Ruta a la base de datos SQLite.
        parquet_dir (Path): Carpeta base para Parquet (output/parquet/)

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: (df_clean, df_quarantine)
    """
    import numpy as np

    with sqlite3.connect(db_path) as con:
        df = pd.read_sql_query("SELECT * FROM raw_events;", con)

    # Asegurar tipado
    df["ts"] = pd.to_datetime(df["ts"], errors="coerce", utc=True)
    df["co2_ppm"] = pd.to_numeric(df["co2_ppm"], errors="coerce")
    df["aula"] = df["aula"].astype("string")

    # Definimos el horario válido (08–14 y 16–21)
    def in_valid_hours(ts):
        if pd.isna(ts):
            return False
        h = ts.hour
        return (8 <= h < 14) or (16 <= h < 21)

    # Detectar causas
    df["cause"] = np.nan
    df.loc[df["co2_ppm"].isna(), "cause"] = "null_value"
    df.loc[df["aula"].isna() | (df["aula"].str.strip() == ""), "cause"] = "missing_aula"
    df.loc[(df["co2_ppm"] < 300) | (df["co2_ppm"] > 5000), "cause"] = "out_of_range"
    df.loc[~df["ts"].apply(in_valid_hours), "cause"] = df["cause"].fillna("out_of_hours")
    df.loc[df["event_id"].isna(), "cause"] = "malformed"

    # Separamos limpios / cuarentena
    df_clean = df[df["cause"].isna()].copy()
    df_quar = df[~df["cause"].isna()].copy()

    # Guardamos en SQLite
    with sqlite3.connect(db_path) as con:
        df_clean.to_sql("clean_events", con, if_exists="replace", index=False)
        df_quar.to_sql("quarantine", con, if_exists="replace", index=False)
        con.commit()

    print(f"Guardado en SQLite → clean: {len(df_clean)}, quarantine: {len(df_quar)}")

    ### EXPORTAMOS A PARQUET ###
    for name, data in {"raw": df, "clean": df_clean}.items():
        if data.empty:
            continue
        data = data.copy()
        data["year"] = data["ts"].dt.year
        data["month"] = data["ts"].dt.month
        data["day"] = data["ts"].dt.day
        data["hour"] = data["ts"].dt.hour

        outdir = parquet_dir / name
        data.to_parquet(
            outdir,
            partition_cols=["year", "month", "day", "hour"],
            index=False
        )
        print(f"Exportado a Parquet ({name}) → {outdir}")

    return df_clean, df_quar


### Ejecutamos la limpieza completa
df_clean, df_quar = clean_and_export(DB, PARQ)

  df.loc[df["co2_ppm"].isna(), "cause"] = "null_value"


Guardado en SQLite → clean: 702, quarantine: 1459
Exportado a Parquet (raw) → /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1/project/output/parquet/raw
Exportado a Parquet (clean) → /Users/b0rjen/Library/Mobile Documents/com~apple~CloudDocs/dev/BDA_Proyecto_UT1_RA1/project/output/parquet/clean


In [14]:
### Vamos a imprimir los resultados
print("\nEjemplo datos limpios:")
display(df_clean.head(5))

print("\nEjemplo quarantine:")
display(df_quar.head(5))


Ejemplo datos limpios:


Unnamed: 0,ts,aula,co2_ppm,event_id,_ingest_ts,_source_file,_pos,cause
1442,2025-01-03 08:00:00+00:00,A102,510.0,a102-080000,2025-11-10T10:58:54.515738+00:00,lecturas.log,130495,
1443,2025-01-03 08:00:00+00:00,A103,501.0,a103-080000,2025-11-10T10:58:54.515743+00:00,lecturas.log,130585,
1444,2025-01-03 08:00:15+00:00,A101,465.0,a101-080015,2025-11-10T10:58:54.515747+00:00,lecturas.log,130675,
1445,2025-01-03 08:00:15+00:00,A102,520.0,a102-080015,2025-11-10T10:58:54.515752+00:00,lecturas.log,130765,
1446,2025-01-03 08:00:15+00:00,A103,550.0,a103-080015,2025-11-10T10:58:54.515756+00:00,lecturas.log,130855,



Ejemplo quarantine:


Unnamed: 0,ts,aula,co2_ppm,event_id,_ingest_ts,_source_file,_pos,cause
0,2025-01-03 06:00:00+00:00,A101,500.0,a101-060000,2025-11-10T10:58:54.502987+00:00,lecturas.log,0,out_of_hours
1,2025-01-03 06:00:00+00:00,A102,464.0,a102-060000,2025-11-10T10:58:54.504524+00:00,lecturas.log,90,out_of_hours
2,2025-01-03 06:00:00+00:00,A103,502.0,a103-060000,2025-11-10T10:58:54.504558+00:00,lecturas.log,180,out_of_hours
3,2025-01-03 06:00:15+00:00,A101,475.0,a101-060015,2025-11-10T10:58:54.504574+00:00,lecturas.log,270,out_of_hours
4,2025-01-03 06:00:15+00:00,A102,504.0,a102-060015,2025-11-10T10:58:54.504586+00:00,lecturas.log,360,out_of_hours


In [15]:
### Vamos a ver qué errores hay en quarantine, registros en aula y causa.
df_quar["cause"].value_counts()

cause
out_of_hours    1408
out_of_range      50
missing_aula       1
Name: count, dtype: int64

In [None]:
### Veamos también el recuento de lecturas limpias por aula
df_clean["aula"].value_counts()

aula
A101    235
A103    234
A102    233
Name: count, dtype: Int64

<div style="background-color:#001f3f; color:white; padding:15px; border-radius:8px;">
<h3 style="text-align:left;">5. Otras opciones de control y mantenimiento</h3>
<p style="font-size:14px;">
Con estas funciones podemos limpiar para agilizar consultas en la BBDD y resetear el pipeline

</p>
</div>

In [None]:
### 5.1 Optimización de consultas en SQLite (OPCIONAL)

with sqlite3.connect(DB) as con:
    cur = con.cursor()
    cur.execute("CREATE INDEX IF NOT EXISTS ix_raw_ts ON raw_events(ts);")
    cur.execute("CREATE INDEX IF NOT EXISTS ix_clean_ts_aula ON clean_events(ts, aula);")
    con.commit()

In [None]:
# 5.1.1: Listar índices en la base de datos
with sqlite3.connect(DB) as con:
    cur = con.cursor()
    cur.execute("SELECT name, tbl_name FROM sqlite_master WHERE type='index';")
    print(cur.fetchall())


[('sqlite_autoindex_raw_events_1', 'raw_events'), ('ix_raw_ts', 'raw_events'), ('ix_clean_ts_aula', 'clean_events')]


In [None]:
# 5.1.2: plan de consulta para una búsqueda común
with sqlite3.connect(DB) as con:
    cur = con.cursor()
    query = "SELECT * FROM clean_events WHERE aula='A101' AND ts BETWEEN '2025-01-03T08:00' AND '2025-01-03T09:00';"
    plan = cur.execute("EXPLAIN QUERY PLAN " + query).fetchall()
print(plan)

[(3, 0, 164, 'SEARCH clean_events USING INDEX ix_clean_ts_aula (ts>? AND ts<?)')]


In [None]:
# 5.2 Helper para limpiar la base de datos.
def clear_database(db_path: Path) -> None:
    """
    Elimina todas las tablas de la base de datos SQLite.
    Útil para resetear el pipeline y empezar de nuevo.

    Args:
        db_path (Path): Ruta a la base de datos SQLite.

    Returns:
        None
    """
    with sqlite3.connect(db_path) as con:
        cur = con.cursor()
        cur.execute("DROP TABLE IF EXISTS raw_events;")
        cur.execute("DROP TABLE IF EXISTS clean_events;")
        cur.execute("DROP TABLE IF EXISTS quarantine;")
        con.commit()
    print(f"Base de datos limpiada: {db_path}")

In [None]:
# 5.3 Reseteo de pipeline
def reset_pipeline():
    for p in [PARQ, QLTY, CKPT]:
        if p.exists():
            for x in p.rglob("*"):
                if x.is_file(): x.unlink()
    if DB.exists(): DB.unlink()
    print("Reseteado output/, checkpoints/ y ut1.db")


In [21]:

# reset_pipeline() --> Descomentar para resetear todo el pipeline