CELDA 0 — Título (Markdown)

# Fase 03 — PrepareWindowsDS

Generación del **dataset final de ventanas temporales materializadas**.

Cada fila del dataset contiene:
- `OW_events`: eventos observados (ventana de observación)
- `PW_events`: eventos a predecir (ventana de predicción)

Este notebook reproduce **exactamente** la lógica de `03_preparewindowsds.py`.


CELDA 1 — Imports y bootstrap

In [1]:
import os
import sys
from pathlib import Path
from bisect import bisect_left
from datetime import datetime, timezone
from time import perf_counter
import json
import yaml

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
# Bootstrap para localizar el proyecto
NOTEBOOK_PATH = Path.cwd().resolve()
ROOT = NOTEBOOK_PATH
for _ in range(10):
    if (ROOT / "mlops4ofp").exists():
        break
    ROOT = ROOT.parent
else:
    raise RuntimeError("No se pudo localizar el project root")

sys.path.insert(0, str(ROOT))
print("Project root:", ROOT)


Project root: /Users/juancarlosduenaslopez/Documents/mlops/mlops4ofp


CELDA 2 — Contexto de ejecución

In [3]:
from mlops4ofp.tools.run_context import (
    detect_execution_dir,
    detect_project_root,
    assemble_run_context,
)

PHASE = "03_preparewindowsds"

execution_dir = detect_execution_dir()
project_root = detect_project_root(execution_dir)

ACTIVE_VARIANT = None  # se detecta automáticamente


In [4]:
# Selección de variante (requiere VARIANT)
variants_dir = project_root / "executions" / PHASE
variants = sorted(p.name for p in variants_dir.iterdir() if p.is_dir())

env_variant = os.getenv("VARIANT") or os.getenv("ACTIVE_VARIANT")

# env_variant = "v001"  # Para forzar una variante concreta (descomentar y asignar la variante deseada)

if not env_variant:
    raise RuntimeError(
        "❌ VARIANT no definido. Ejecuta el notebook con: make nb3-run VARIANT=v001"
    )
ACTIVE_VARIANT = env_variant

In [5]:
# Para forzar una variante concreta, descomentar la siguiente línea e indicar la variante deseada (ej: "v903")
#ACTIVE_VARIANT  = "v904"

variant_root = variants_dir / ACTIVE_VARIANT

ctx = assemble_run_context(
    project_root=project_root,
    phase=PHASE,
    variant=ACTIVE_VARIANT,
    variant_root=variant_root,
    execution_dir=execution_dir,
)

print("Variante activa:", ACTIVE_VARIANT)


Variante activa: v906


CELDA 3 — Carga de parámetros F03

In [6]:
with open(variant_root / "params.yaml", "r", encoding="utf-8") as f:
    params = yaml.safe_load(f)

OW = int(params["OW"])
LT = int(params["LT"])
PW = int(params["PW"])
nan_strategy = params.get("nan_strategy", "discard")
window_strategy = params.get("window_strategy", "synchro")
parent_phase = params.get("parent_phase", "02_prepareeventsds")
parent_variant = params["parent_variant"]

print("OW, LT, PW =", OW, LT, PW)
print("window_strategy =", window_strategy)
print("nan_strategy =", nan_strategy)
print("parent_phase =", parent_phase)
print("parent_variant =", parent_variant)



OW, LT, PW = 100 10 10
window_strategy = asynPW
nan_strategy = discard
parent_phase = 02_prepareeventsds
parent_variant = v902


CELDA 4 — Resolución de Tu desde F02

In [7]:
with open(
    project_root
    / "executions"
    / parent_phase
    / parent_variant
    / f"{parent_phase}_metadata.json",
    "r",
    encoding="utf-8",
) as f:
    meta_f02 = json.load(f)

Tu_raw = params.get("Tu", None)
if Tu_raw is not None:
    Tu = float(Tu_raw)
else:
    Tu_f02 = meta_f02.get("Tu", None)
    if Tu_f02 is None:
        raise RuntimeError(
            "No se pudo determinar Tu: es None en F03 params y en F02 metadata"
        )
    Tu = float(Tu_f02)

print(f"[F03] Tu resuelto = {Tu} (origen: {'params' if Tu_raw is not None else 'F02_metadata'})", flush=True)
print("Tu =", Tu)

[F03] Tu resuelto = 10.0 (origen: F02_metadata)


Tu = 10.0


CELDA 5 — Carga del dataset F02

In [8]:
input_dataset = (
    project_root
    / "executions"
    / parent_phase
    / parent_variant
    / f"{parent_phase}_dataset.parquet"
 )

print(f"[F03] leyendo dataset F02: {input_dataset}", flush=True)
t_read_start = perf_counter()
df = pq.read_table(input_dataset).to_pandas(
    split_blocks=True,
    self_destruct=True,
 )
t_read_elapsed = perf_counter() - t_read_start
print(f"[F03] dataset F02 cargado en {t_read_elapsed:,.1f}s", flush=True)

if not df["segs"].is_monotonic_increasing:
    df = df.sort_values("segs", kind="mergesort").reset_index(drop=True)

print("[F03] preparando arrays times/events...", flush=True)
t_arr_start = perf_counter()
times = df["segs"].to_numpy(dtype=np.int64, copy=False)
events = df["events"].to_numpy()
lengths = np.fromiter((len(evs) for evs in events), dtype=np.int64, count=len(events))
offsets = np.empty(len(events) + 1, dtype=np.int64)
offsets[0] = 0
np.cumsum(lengths, out=offsets[1:])
events_flat = [ev for evs in events for ev in evs]
times_flat = [t for t, evs in zip(times, events) for _ in evs]
has_event = lengths > 0
t_arr_elapsed = perf_counter() - t_arr_start
print(f"[F03] arrays listos en {t_arr_elapsed:,.1f}s | eventos totales: {len(events_flat):,}", flush=True)

print("F02 cargado:", len(df), "filas")

[F03] leyendo dataset F02: /Users/juancarlosduenaslopez/Documents/mlops/mlops4ofp/executions/02_prepareeventsds/v902/02_prepareeventsds_dataset.parquet


[F03] dataset F02 cargado en 0.7s


[F03] preparando arrays times/events...


[F03] arrays listos en 3.6s | eventos totales: 468,122


F02 cargado: 3887242 filas


CELDA 6 — Catálogo de eventos NaN

In [9]:
with open(
    project_root
    / "executions"
    / parent_phase
    / parent_variant
    / f"{parent_phase}_event_catalog.json",
    "r",
    encoding="utf-8",
) as f:
    catalog = json.load(f)

nan_codes = {
    code for name, code in catalog.items()
    if name.endswith("_NaN_NaN")
}

t_nan_start = perf_counter()
has_nan = np.array(
    [any(ev in nan_codes for ev in evs) for evs in events],
    dtype=bool,
 )
nan_prefix = np.cumsum(has_nan, dtype=np.int64)
t_nan_elapsed = perf_counter() - t_nan_start
print(f"[F03] has_nan + prefix en {t_nan_elapsed:,.1f}s", flush=True)

[F03] has_nan + prefix en 2.4s


CELDA 7 — Definición de ventanas

In [10]:
OW_end = OW
PW_start = OW + LT
PW_end = OW + LT + PW


In [11]:
def window_start_iterator():
    """
    Iterador de inicios de ventana (t0) según la estrategia.

    Reglas:
    - synchro  : todos los Tu
    - withinPW : todos los Tu
    - asynPW   : todos los Tu
    - asynOW   : solo bins con ≥1 evento en OW
    """
    t_start = times[0]

    # geometría local (evita dependencias de orden de celdas)
    OW_span_local = OW * Tu
    PW_start_local = (OW + LT) * Tu
    PW_span_local = PW * Tu
    total_span_local = PW_start_local + PW_span_local

    t_end = times[-1] - total_span_local

    if window_strategy in ("synchro", "withinPW", "asynPW"):
        t = t_start
        while t <= t_end:
            yield t
            t += Tu

    elif window_strategy == "asynOW":
        active_bins = np.unique(
            ((times[lengths > 0] - times[0]) // Tu).astype(np.int64)
        )
        for b in active_bins:
            t = t_start + b * Tu
            if t <= t_end:
                yield t

    else:
        raise ValueError(f"window_strategy no soportada: {window_strategy}")


In [12]:
def idx_range(t0, t1):
    return bisect_left(times, t0), bisect_left(times, t1)

def has_nan_in_range(i0, i1):
    if i0 >= i1:
        return False
    return (nan_prefix[i1 - 1] - (nan_prefix[i0 - 1] if i0 > 0 else 0)) > 0

CELDA 8 — Generación de ventanas materializadas

In [13]:
output_path = variant_root / f"{PHASE}_dataset.parquet"

schema = pa.schema([
    ("OW_events", pa.list_(pa.int32())),
    ("PW_events", pa.list_(pa.int32())),
])
writer = pq.ParquetWriter(output_path, schema, compression="snappy")
BATCH = 100
rows = []

total = 0
kept = 0
LOG_EVERY = 100_000
t_start = perf_counter()
t_loop_start = t_start

for t0 in window_start_iterator():
    total += 1

    i_ow_0, i_ow_1 = idx_range(t0, t0 + OW * Tu)
    i_pw_0, i_pw_1 = idx_range(
        t0 + PW_start * Tu,
        t0 + PW_end * Tu,
    )
    i_pw_start0, i_pw_start1 = idx_range(
        t0 + PW_start * Tu,
        t0 + (PW_start + 1) * Tu,
    )

    ow_len = i_ow_1 - i_ow_0
    pw_len = i_pw_1 - i_pw_0

    if ow_len == 0 and pw_len == 0:
        continue

    if nan_strategy == "discard":
        if has_nan_in_range(i_ow_0, i_ow_1):
            continue
        if has_nan_in_range(i_pw_0, i_pw_1):
            continue

    ow_start = offsets[i_ow_0]
    ow_end = offsets[i_ow_1]
    pw_start = offsets[i_pw_0]
    pw_end = offsets[i_pw_1]

    ow_events = events_flat[ow_start:ow_end]
    pw_events = events_flat[pw_start:pw_end]

    if len(ow_events) == 0 and len(pw_events) == 0:
        continue

    if window_strategy == "asynOW" and len(ow_events) == 0:
        continue
    
    if window_strategy == "withinPW" and len(pw_events) == 0:
        continue

    if window_strategy == "asynPW" and i_pw_start0 == i_pw_start1:
        continue

    rows.append({
        "OW_events": ow_events,
        "PW_events": pw_events,
    })
    kept += 1

    if total % LOG_EVERY == 0:
        elapsed = perf_counter() - t_start
        print(
            f"[F03] ventanas: {total:,} | "
            f"escritas: {kept:,} | "
            f"tiempo: {elapsed:,.1f}s",
            flush=True,
        )

    if len(rows) >= BATCH:
        writer.write_table(pa.Table.from_pylist(rows, schema))
        rows.clear()

if rows:
    writer.write_table(pa.Table.from_pylist(rows, schema))

writer.close()

elapsed_total = perf_counter() - t_start
loop_elapsed = perf_counter() - t_loop_start
print("Ventanas totales:", total)
print("Ventanas válidas :", kept)
print(f"Tiempo total     : {elapsed_total:,.1f}s")
print(f"Tiempo loop      : {loop_elapsed:,.1f}s")

[F03] ventanas: 100,000 | escritas: 46,780 | tiempo: 2.1s


[F03] ventanas: 300,000 | escritas: 164,094 | tiempo: 6.3s


[F03] ventanas: 400,000 | escritas: 241,052 | tiempo: 8.4s


[F03] ventanas: 500,000 | escritas: 306,993 | tiempo: 10.5s


[F03] ventanas: 600,000 | escritas: 387,208 | tiempo: 12.7s


[F03] ventanas: 700,000 | escritas: 451,569 | tiempo: 14.8s


[F03] ventanas: 800,000 | escritas: 522,332 | tiempo: 16.9s


[F03] ventanas: 1,000,000 | escritas: 674,406 | tiempo: 21.3s


[F03] ventanas: 1,100,000 | escritas: 748,407 | tiempo: 23.4s


[F03] ventanas: 1,200,000 | escritas: 834,415 | tiempo: 25.6s


[F03] ventanas: 1,500,000 | escritas: 945,097 | tiempo: 31.9s


[F03] ventanas: 1,600,000 | escritas: 1,013,577 | tiempo: 34.1s


[F03] ventanas: 1,700,000 | escritas: 1,104,329 | tiempo: 36.5s


[F03] ventanas: 1,800,000 | escritas: 1,193,780 | tiempo: 38.8s


[F03] ventanas: 1,900,000 | escritas: 1,278,355 | tiempo: 41.0s


[F03] ventanas: 2,000,000 | escritas: 1,375,621 | tiempo: 43.3s


[F03] ventanas: 2,100,000 | escritas: 1,424,099 | tiempo: 45.4s


[F03] ventanas: 2,200,000 | escritas: 1,483,336 | tiempo: 47.5s


[F03] ventanas: 2,300,000 | escritas: 1,541,890 | tiempo: 49.6s


[F03] ventanas: 2,500,000 | escritas: 1,632,314 | tiempo: 53.7s


[F03] ventanas: 2,700,000 | escritas: 1,797,282 | tiempo: 58.2s


[F03] ventanas: 2,900,000 | escritas: 1,972,543 | tiempo: 62.7s


[F03] ventanas: 3,000,000 | escritas: 2,053,790 | tiempo: 65.0s


[F03] ventanas: 3,100,000 | escritas: 2,122,912 | tiempo: 67.1s


[F03] ventanas: 3,200,000 | escritas: 2,190,735 | tiempo: 69.3s


[F03] ventanas: 3,400,000 | escritas: 2,321,354 | tiempo: 73.6s


[F03] ventanas: 3,600,000 | escritas: 2,478,250 | tiempo: 77.9s


[F03] ventanas: 3,700,000 | escritas: 2,572,185 | tiempo: 80.1s


[F03] ventanas: 3,800,000 | escritas: 2,647,302 | tiempo: 82.3s


Ventanas totales: 3948144
Ventanas válidas : 2763074
Tiempo total     : 85.5s
Tiempo loop      : 85.5s


CELDA 9 — Escritura del dataset FINAL

In [14]:
print("Dataset F03 generado:", output_path)

Dataset F03 generado: /Users/juancarlosduenaslopez/Documents/mlops/mlops4ofp/executions/03_preparewindowsds/v906/03_preparewindowsds_dataset.parquet


CELDA 10 — Validación rápida

In [15]:
df_check = pd.read_parquet(output_path)

assert df_check.shape[1] == 2
def is_sequence(x):
    return isinstance(x, (list, tuple)) or hasattr(x, "__iter__")

assert all(is_sequence(x) for x in df_check["OW_events"][:10])
assert all(is_sequence(x) for x in df_check["PW_events"][:10])
print("Comprobación de lectura exitosa")

Comprobación de lectura exitosa


In [16]:
print("[NB-F03] Ejecutando checks formales F03 sobre el dataset final...")

df_check = pd.read_parquet(output_path)

# Invariante global F03
bad = (
    (df_check["OW_events"].apply(len) == 0)
    & (df_check["PW_events"].apply(len) == 0)
).sum()
assert bad == 0, f"❌ ERROR: hay {bad} pares con OW y PW vacías"
print("✔ CHECK: no hay pares OW+PW vacíos")

# Checks específicos por estrategia
if window_strategy == "withinPW":
    bad = (df_check["PW_events"].apply(len) == 0).sum()
    assert bad == 0, f"❌ ERROR: withinPW tiene {bad} PW vacías"
    print("✔ CHECK: withinPW cumple PW no vacía")

if window_strategy == "asynPW":
    print("ℹ️ CHECK: asynPW usa evento al inicio de PW")

print("✔ TODOS LOS CHECKS F03 SUPERADOS")


[NB-F03] Ejecutando checks formales F03 sobre el dataset final...


✔ CHECK: no hay pares OW+PW vacíos
ℹ️ CHECK: asynPW usa evento al inicio de PW
✔ TODOS LOS CHECKS F03 SUPERADOS
