
# NILM — Transformer con Atención Multiescala (Demo Didáctica)

Este cuaderno muestra un **modelo secuencia a secuencia** para **NILM** que emplea un **Transformer con atención multiescala**: una atención a escala **fina** (paso a paso) y otra a escala **gruesa** (después de *downsample*), cuyas salidas se **fusionan** para mejorar la captura de **transitorios rápidos** y **patrones de largo plazo**.

> **Objetivo**: Dado un segmento de la **señal agregada** (potencia total), predecir la **potencia del aparato objetivo** para cada instante.

**Contenido**:
1. Instalación rápida de dependencias (si aún no están instaladas).
2. Definición del modelo (PositionalEncoding, atención multiescala, Transformer).
3. Dataset sintético (para que puedas ejecutar sin archivos externos).
4. Entrenamiento mínimo y validación.
5. Visualización de predicciones.
6. Puntos de anclaje para usar datasets reales (UK-DALE/REFIT).


In [None]:

# ==== 1) Instalación rápida (opcional) ====
# Si ejecutas en un entorno limpio, descomenta estas líneas:
# %pip install torch --quiet
# %pip install matplotlib numpy scikit-learn --quiet
# (Si usas GPU con CUDA, revisa instrucciones específicas de PyTorch en https://pytorch.org/get-started/)


In [None]:

# ==== 2) Imports ====
import math, time, os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

device = "cuda" if torch.cuda.is_available() else "cpu"
device


In [None]:

# ==== 3) Positional Encoding ====
class PositionalEncoding(nn.Module):
    """
    Agrega información de posición (tiempo) a los embeddings.
    Implementación clásica (seno/coseno).
    """
    def __init__(self, d_model: int, max_len: int = 10000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1).float()
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(pos * div)
        pe[:, 1::2] = torch.cos(pos * div)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x: [B, T, d_model]
        """
        T = x.size(1)
        return x + self.pe[:T, :].unsqueeze(0)


In [None]:

# ==== 4) Atención estándar (una escala) ====
class MultiHeadSelfAttention(nn.Module):
    """Self-Attention estándar de Transformer."""
    def __init__(self, d_model: int, num_heads: int, dropout: float = 0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model debe ser múltiplo de num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_head = d_model // num_heads

        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        self.proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        B, T, D = x.shape
        qkv = self.qkv(x)                    # [B, T, 3D]
        q, k, v = qkv.chunk(3, dim=-1)       # cada uno [B, T, D]

        def split_heads(t):
            return t.view(B, T, self.num_heads, self.d_head).transpose(1, 2)
        q, k, v = map(split_heads, (q, k, v))

        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.d_head)  # [B, h, T, T]
        if mask is not None:
            scores = scores.masked_fill(mask, float('-inf'))
        attn = scores.softmax(dim=-1)
        attn = self.dropout(attn)
        out = attn @ v  # [B, h, T, d_head]

        out = out.transpose(1, 2).contiguous().view(B, T, D)  # [B, T, D]
        out = self.proj(out)
        return out


In [None]:

# ==== 5) Bloque Multiescala ====
class MultiScaleBlock(nn.Module):
    """
    Atención a dos escalas:
      - fina: T pasos
      - gruesa: T/s pasos (después de AvgPool1d con factor s)
    Fusión: concat([fino, grueso_up]) -> proyección -> residual + MLP
    """
    def __init__(self, d_model=128, num_heads=4, mlp_ratio=4, dropout=0.1, scale_factor=4):
        super().__init__()
        self.scale = scale_factor

        self.norm_fine = nn.LayerNorm(d_model)
        self.attn_fine = MultiHeadSelfAttention(d_model, num_heads, dropout)

        self.norm_coarse = nn.LayerNorm(d_model)
        self.attn_coarse = MultiHeadSelfAttention(d_model, num_heads, dropout)

        self.fuse = nn.Linear(2 * d_model, d_model)

        hidden = d_model * mlp_ratio
        self.norm_ff = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, hidden),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, d_model),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        B, T, D = x.shape

        # Escala fina
        xf = self.norm_fine(x)
        yf = self.attn_fine(xf)  # [B, T, D]

        # Escala gruesa
        xc = self.norm_coarse(x).transpose(1, 2)  # [B, D, T]
        s = self.scale
        cut = T - (T % s)
        if cut == 0:
            yc_up = torch.zeros_like(yf)
        else:
            xc = xc[:, :, :cut]                        # [B, D, cut]
            pooled = F.avg_pool1d(xc, kernel_size=s, stride=s)  # [B, D, cut/s]
            pooled = pooled.transpose(1, 2)           # [B, Lc, D]
            yc = self.attn_coarse(pooled)             # [B, Lc, D]
            yc_up = yc.repeat_interleave(s, dim=1)    # [B, cut, D]
            if cut < T:
                pad = yc_up[:, -1:, :].expand(B, T - cut, D)
                yc_up = torch.cat([yc_up, pad], dim=1)

        # Fusión y residuales
        y = torch.cat([yf, yc_up], dim=-1)  # [B, T, 2D]
        y = self.fuse(y)                    # [B, T, D]
        x = x + y                           # residual 1

        z = self.norm_ff(x)
        z = self.ff(z)
        x = x + z                           # residual 2
        return x


In [None]:

# ==== 6) Modelo completo ====
class NILMTransformerMultiScale(nn.Module):
    """
    - Embedding lineal de entrada (W -> d_model)
    - Positional encoding
    - Varios bloques multiescala
    - Cabeza lineal para regresión de potencia (W)
    """
    def __init__(self, d_model=128, num_heads=4, depth=4, scale_factor=4, dropout=0.1):
        super().__init__()
        self.in_proj  = nn.Linear(1, d_model)
        self.pos_enc  = PositionalEncoding(d_model)
        self.blocks   = nn.ModuleList([
            MultiScaleBlock(d_model, num_heads, mlp_ratio=4, dropout=dropout, scale_factor=scale_factor)
            for _ in range(depth)
        ])
        self.norm_out = nn.LayerNorm(d_model)
        self.head     = nn.Linear(d_model, 1)

    def forward(self, x):
        """
        x: [B, T, 1]  (potencia agregada)
        return: [B, T, 1] (potencia del aparato)
        """
        x = self.in_proj(x)
        x = self.pos_enc(x)
        for blk in self.blocks:
            x = blk(x)
        x = self.norm_out(x)
        y = self.head(x)
        return y


In [None]:

# ==== 7) Dataset sintético ====
class DummyDataset(torch.utils.data.Dataset):
    """
    Genera pares (x_agregada, y_aparato) artificiales para probar el modelo.
    - y: pulsos aleatorios (aparato)
    - x: y + tendencia + ruido + otros "aparatos"
    """
    def __init__(self, T=512, N=1024, seed=0):
        super().__init__()
        g = torch.Generator().manual_seed(seed)
        self.X, self.Y = [], []
        for _ in range(N):
            y = torch.zeros(T)
            for _ in range(torch.randint(1, 5, (1,), generator=g)):
                amp = torch.randint(500, 1500, (1,), generator=g).float()
                start = torch.randint(0, T-20, (1,), generator=g).item()
                width = torch.randint(5, 30, (1,), generator=g).item()
                y[start:start+width] += amp
            trend = torch.linspace(0, 200, T)
            noise = torch.randn(T) * 50
            others = F.relu(torch.sin(torch.linspace(0, 20, T))*200)
            x = y + trend + noise + others
            self.X.append(x.unsqueeze(-1))
            self.Y.append(y.unsqueeze(-1))
        self.X = torch.stack(self.X)  # [N, T, 1]
        self.Y = torch.stack(self.Y)  # [N, T, 1]

    def __len__(self): return self.X.size(0)
    def __getitem__(self, i): return self.X[i], self.Y[i]


In [None]:

# ==== 8) Entrenamiento mínimo ====
def train_demo(epochs=5, batch_size=16, T=512, d_model=128, depth=4, scale_factor=4, lr=1e-3):
    ds_train = DummyDataset(T=T, N=512, seed=0)
    ds_val   = DummyDataset(T=T, N=128, seed=1)
    train_loader = torch.utils.data.DataLoader(ds_train, batch_size=batch_size, shuffle=True)
    val_loader   = torch.utils.data.DataLoader(ds_val,   batch_size=batch_size)

    model = NILMTransformerMultiScale(d_model=d_model, num_heads=4, depth=depth,
                                      scale_factor=scale_factor, dropout=0.1).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.L1Loss()

    def evaluate():
        model.eval()
        total = 0.0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                pred = model(xb)
                loss = loss_fn(pred, yb)
                total += loss.item() * xb.size(0)
        return total / len(ds_val)

    for ep in range(1, epochs+1):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            pred = model(xb)
            loss = loss_fn(pred, yb)
            opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            opt.step()
        val_mae = evaluate()
        print(f"Epoch {ep:02d} | Val MAE: {val_mae:.2f}")
    return model

model = train_demo(epochs=5)


In [None]:

# ==== 9) Visualización de predicciones ====
ds_vis = DummyDataset(T=512, N=4, seed=2)
xv, yv = ds_vis[0]
model.eval()
with torch.no_grad():
    pv = model(xv.unsqueeze(0).to(device)).cpu().squeeze(0).squeeze(-1)

plt.figure(figsize=(12,4))
plt.plot(xv.squeeze(-1).numpy(), label="Agregada (entrada)")
plt.plot(yv.squeeze(-1).numpy(), label="Aparato (verdad)", linewidth=2)
plt.plot(pv.numpy(), label="Predicción (modelo)", linestyle="--")
plt.legend()
plt.title("NILM — Predicción de aparato (demo sintética)")
plt.xlabel("Tiempo (pasos)")
plt.ylabel("Potencia [W]")
plt.show()



## 10) Cómo conectarlo a un dataset real (UK-DALE / REFIT)

1. **Carga de datos**: convierte la señal agregada y la del aparato objetivo a un `numpy.ndarray` o `torch.Tensor` de forma \[T, 1], re-muestreado a **1 Hz** (o 6 s) de forma consistente.
2. **Ventaneo**: corta en ventanas de longitud `T` (p. ej. 3600) con solape (50%). Construye un `Dataset` como `DummyDataset` pero leyendo de tus arrays.
3. **Normalización**: escala potencias (p. ej., divide por 1000 para kW), y guarda los factores para des-escala en la salida.
4. **Entrenamiento**: crea un `DataLoader` y llama a `train_demo(...)` adaptado (pasando tu dataset).
5. **Métricas**: añade cálculo de MAE, RMSE, SAE/NDE y F1 (umbralizando On/Off). Puedes crear una celda adicional para métricas por aparato.
