# Laboratorio: CSV vs Parquet, Micro-batches y KPIs

Este cuaderno genera **datasets sintéticos** para tres casos (e-commerce, sensores IoT y logs de fraude), compara **CSV vs Parquet** (tamaño en disco y lectura simple), simula **micro-batches de 60 min** y construye **KPIs sencillos** con visualización en **matplotlib**.

**Contenido**:
1) Generación de datos sintéticos (3 casos)  
2) Comparativa CSV vs Parquet (tamaños)  
3) Simulación de micro-batches de 60 min  
4) KPIs básicos y gráficos  
5) Actividad flash para el equipo


In [None]:
# Dependencias (mantener aquí para Colab)
%pip -q install pyarrow pandas numpy matplotlib

In [None]:
import os
import math
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

np.random.seed(42)
BASE = Path("/content")  # Colab
BASE.mkdir(parents=True, exist_ok=True)

def human_size(num_bytes: int) -> str:
    if num_bytes == 0:
        return "0 B"
    units = ["B","KB","MB","GB","TB"]
    power = min(int(math.log(num_bytes, 1024)), len(units)-1)
    return f"{num_bytes / (1024 ** power):.2f} {units[power]}"

## 1) Generación de datasets sintéticos
Se crean tres datasets con una ventana temporal de **2 días** con granularidad de minutos.

- **E-commerce**: pedidos (order_id, ts, customer_id, sku, qty, price, amount).  
- **IoT**: métricas de sensores (sensor_id, ts, temperature, humidity, status).  
- **Fraude**: eventos de transacciones (event_id, ts, user_id, ip, amount, is_flagged).

In [None]:
# Parámetros de la simulación
start = pd.Timestamp("2024-06-01 00:00:00")
end   = pd.Timestamp("2024-06-03 00:00:00")
ts_index = pd.date_range(start, end, freq="1min", inclusive="left")  # 2 días * 24h * 60

# --- E-COMMERCE ---
N_orders = 25000
ecom = pd.DataFrame({
    "order_id": np.arange(1, N_orders+1),
    "ts": np.random.choice(ts_index, size=N_orders, replace=True),
    "customer_id": np.random.randint(1000, 1999, size=N_orders),
    "sku": np.random.choice([f"SKU-{i:04d}" for i in range(500)], size=N_orders),
    "qty": np.random.randint(1, 5, size=N_orders),
    "price": np.round(np.random.uniform(5, 150, size=N_orders), 2),
})
ecom["amount"] = np.round(ecom["qty"] * ecom["price"], 2)

# --- IOT ---
N_sensors = 200
N_iot = 40000
iot = pd.DataFrame({
    "sensor_id": np.random.choice([f"S{i:03d}" for i in range(N_sensors)], size=N_iot),
    "ts": np.random.choice(ts_index, size=N_iot, replace=True),
    "temperature": np.round(np.random.normal(24, 3, size=N_iot), 2),
    "humidity": np.round(np.random.normal(55, 10, size=N_iot), 2),
})
iot["status"] = np.where((iot["temperature"]>30) | (iot["humidity"]>75), "alert", "ok")

# --- FRAUDE ---
N_events = 30000
fraud = pd.DataFrame({
    "event_id": np.arange(1, N_events+1),
    "ts": np.random.choice(ts_index, size=N_events, replace=True),
    "user_id": np.random.randint(5000, 7000, size=N_events),
    "ip": [f"192.168.{a}.{b}" for a,b in zip(np.random.randint(0,255,N_events), np.random.randint(1,255,N_events))],
    "amount": np.round(np.random.exponential(80, size=N_events), 2),
})
fraud["is_flagged"] = (np.random.rand(N_events) < 0.03) | (fraud["amount"] > 300)

ecom.sort_values("ts", inplace=True)
iot.sort_values("ts", inplace=True)
fraud.sort_values("ts", inplace=True)

ecom.head(), iot.head(), fraud.head()

## 2) CSV vs Parquet (tamaño y lectura simple)
Guardamos cada dataset en **CSV** y **Parquet** y medimos **tamaño en disco** y un tiempo simple de lectura (orientativo).

In [None]:
import time

paths = {
    "ecom_csv": BASE/"ecommerce.csv",
    "ecom_parq": BASE/"ecommerce.parquet",
    "iot_csv": BASE/"iot.csv",
    "iot_parq": BASE/"iot.parquet",
    "fraud_csv": BASE/"fraud.csv",
    "fraud_parq": BASE/"fraud.parquet",
}

# Guardar
ecom.to_csv(paths["ecom_csv"], index=False)
ecom.to_parquet(paths["ecom_parq"], index=False)
iot.to_csv(paths["iot_csv"], index=False)
iot.to_parquet(paths["iot_parq"], index=False)
fraud.to_csv(paths["fraud_csv"], index=False)
fraud.to_parquet(paths["fraud_parq"], index=False)

# Tamaños
sizes = []
for key, p in paths.items():
    sizes.append({"dataset": key, "bytes": os.path.getsize(p), "human": human_size(os.path.getsize(p))})
sizes_df = pd.DataFrame(sizes).sort_values("dataset")
sizes_df

In [None]:
# Lectura: medición simple (no exhaustiva)
bench = []

for name, p in [("ecom_csv", paths["ecom_csv"]), ("ecom_parq", paths["ecom_parq"]),
                ("iot_csv", paths["iot_csv"]),   ("iot_parq", paths["iot_parq"]),
                ("fraud_csv", paths["fraud_csv"]),("fraud_parq", paths["fraud_parq"])]:
    t0 = time.perf_counter()
    if name.endswith("csv"):
        _ = pd.read_csv(p)
    else:
        _ = pd.read_parquet(p)
    ms = (time.perf_counter() - t0) * 1000
    bench.append({"dataset": name, "read_ms": round(ms, 2)})

bench_df = pd.DataFrame(bench)
bench_df.sort_values("dataset")

## 3) Simulación de micro-batches (60 min)
Agrupamos por **hora** y contamos cuántos lotes se producirían por dataset (número de horas cubiertas).

In [None]:
def count_hourly_batches(df: pd.DataFrame, ts_col: str) -> int:
    hourly = pd.to_datetime(df[ts_col]).dt.floor("H")
    return hourly.nunique()

batches = pd.DataFrame({
    "dataset": ["ecommerce", "iot", "fraud"],
    "hourly_batches": [
        count_hourly_batches(ecom, "ts"),
        count_hourly_batches(iot, "ts"),
        count_hourly_batches(fraud, "ts"),
    ]
})
batches

## 4) KPIs y gráficos (matplotlib)
Se calculan **tres KPIs sencillos** por hora y se grafican en plots separados:
- **E-commerce**: *revenue* por hora (suma de `amount`)  
- **IoT**: temperatura media por hora  
- **Fraude**: conteo de eventos *flagged* por hora  

> Nota: se usa `matplotlib` sin estilos personalizados.

In [None]:
# --- E-commerce: revenue por hora ---
ecom_hour = (
    ecom.assign(hour=pd.to_datetime(ecom["ts"]).dt.floor("H"))
        .groupby("hour", as_index=False)["amount"].sum()
        .rename(columns={"amount":"revenue"})
)

plt.figure()
plt.plot(ecom_hour["hour"], ecom_hour["revenue"])
plt.title("E-commerce: Revenue por hora")
plt.xlabel("Hora")
plt.ylabel("Revenue")
plt.xticks(rotation=25)
plt.tight_layout()
plt.show()

# --- IoT: temperatura media por hora ---
iot_hour = (
    iot.assign(hour=pd.to_datetime(iot["ts"]).dt.floor("H"))
       .groupby("hour", as_index=False)["temperature"].mean()
)

plt.figure()
plt.plot(iot_hour["hour"], iot_hour["temperature"])
plt.title("IoT: Temperatura media por hora")
plt.xlabel("Hora")
plt.ylabel("Temperatura (°C)")
plt.xticks(rotation=25)
plt.tight_layout()
plt.show()

# --- Fraude: eventos flagged por hora ---
fraud_hour = (
    fraud.assign(hour=pd.to_datetime(fraud["ts"]).dt.floor("H"))
         .groupby("hour", as_index=False)["is_flagged"].sum()
         .rename(columns={"is_flagged":"flagged_count"})
)

plt.figure()
plt.plot(fraud_hour["hour"], fraud_hour["flagged_count"])
plt.title("Fraude: eventos flagged por hora")
plt.xlabel("Hora")
plt.ylabel("Flagged count")
plt.xticks(rotation=25)
plt.tight_layout()
plt.show()

ecom_hour.head(), iot_hour.head(), fraud_hour.head()

## 5) Actividad flash (completar por el equipo)

Rellenar aquí (3–5 minutos):

- **Caso elegido**: ☐ *e-commerce* ☐ *IoT* ☐ *fraude*  
- **V dominante hoy** y **V dominante si 2× tráfico** (3 líneas):  
  1) …  2) …  3) …  
- **1 decisión por capa** (Bronze/Silver/Gold):  
  - Bronze (ingesta, formato, particionado): …  
  - Silver (normalización, reglas): …  
  - Gold (KPI, ventana, SLA): …

> Opcional: añade conclusiones sobre CSV vs Parquet con los tamaños y tiempos observados.