In [3]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [10]:
# Compute device shared by the rest of the notebook: CUDA when available, CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The feature dimension is split evenly across ``num_heads`` heads, so each
    head operates on vectors of size ``d_model // num_heads``.

    Args:
        d_model: Size of the input/output feature dimension.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model=512, num_heads=8):
        super().__init__()
        assert d_model % num_heads == 0, 'El tamaño de embedding debe ser divisible entre num_heads.'

        # Per-head width; keys and values share the same size here.
        self.d_v = d_model // num_heads
        self.d_k = self.d_v
        self.num_heads = num_heads

        # Input projections for queries, keys and values, plus the output projection.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V, mask=None):
        """Attend from ``Q`` over ``K``/``V``.

        Args:
            Q: Query tensor whose first dimension is the batch and whose last
                dimension is ``d_model``.
            K: Key tensor, same layout as ``Q``.
            V: Value tensor, same layout as ``K``.
            mask: Optional tensor broadcastable to the score tensor
                ``(batch, num_heads, len_q, len_k)``. Positions where
                ``mask == 0`` are excluded from attention.

        Returns:
            Tuple ``(out, attention)``: ``out`` is the projected result with
            last dimension ``d_model``; ``attention`` holds the softmax
            weights of shape ``(batch, num_heads, len_q, len_k)``.
        """
        batch_size = Q.size(0)

        # 1) Project Q, K, V and split into heads -> (batch, heads, len, d_k)
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # 2) Scaled dot-product attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # Use the most negative finite value instead of -inf: masked keys
            # still receive exactly zero weight after softmax, but a query row
            # whose keys are ALL masked no longer turns into NaN (softmax over
            # a row of -inf is 0/0), which would poison outputs and gradients.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
        attention = F.softmax(scores, dim=-1)

        # 3) Mix the values with the attention weights
        weighted_values = torch.matmul(attention, V)

        # 4) Merge the heads back together and apply the output projection
        weighted_values = weighted_values.transpose(1, 2).contiguous()
        weighted_values = weighted_values.view(batch_size, -1, self.num_heads * self.d_k)
        out = self.W_o(weighted_values)

        return out, attention

class PositionFeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Linear.

    Args:
        d_model: Width of the input and of the output.
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expand to the hidden width, then project back to the model width.
        self.linear1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.linear2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply the two linear layers with a ReLU in between to ``x``."""
        hidden = self.linear1(x)
        activated = F.relu(hidden)
        return self.linear2(activated)

class EncoderSubLayer(nn.Module):
    """One encoder layer: self-attention followed by a feed-forward network.

    Each of the two stages is wrapped as ``LayerNorm(x + Dropout(stage(x)))``.

    Args:
        d_model: Feature width of the layer.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability applied to each stage's output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = PositionFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(normalized_shape=d_model)
        self.norm2 = nn.LayerNorm(normalized_shape=d_model)
        self.dropout1 = nn.Dropout(p=dropout)
        self.dropout2 = nn.Dropout(p=dropout)

    def forward(self, x, mask=None):
        """Run ``x`` through the attention and feed-forward stages.

        ``mask`` is forwarded unchanged to the attention module.
        """
        # Stage 1: self-attention (queries, keys and values are all x);
        # the attention weights are not needed here.
        attended = self.self_attn(x, x, x, mask)[0]
        x = self.norm1(x + self.dropout1(attended))

        # Stage 2: feed-forward with its own residual connection and norm.
        x = self.norm2(x + self.dropout2(self.ffn(x)))
        return x

class Encoder(nn.Module):
    """Stack of ``num_layers`` encoder layers followed by a final LayerNorm.

    Args:
        d_model: Feature width.
        num_heads: Attention heads per layer.
        d_ff: Hidden width of each layer's feed-forward network.
        num_layers: Number of stacked layers.
        dropout: Dropout probability passed to every layer.
    """

    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super().__init__()
        stack = []
        for _ in range(num_layers):
            stack.append(EncoderSubLayer(d_model, num_heads, d_ff, dropout))
        self.layers = nn.ModuleList(stack)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Apply every layer in order (sharing ``mask``), then normalise."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)

class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal positional encoding added to a sequence.

    Even feature columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``. The table is
    stored as a buffer (not a trainable parameter).

    Args:
        d_model: Feature width of the sequence; may be even or odd.
        max_seq_len: Number of positions in the precomputed table. Inputs
            passed to ``forward`` must not be longer than this.
    """

    def __init__(self, d_model, max_seq_len=512):
        super().__init__()

        pos_embed = torch.zeros(max_seq_len, d_model)
        token_pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # (max_seq_len, ceil(d_model / 2)) table of angles.
        angles = token_pos * div_term
        pos_embed[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even columns,
        # so trim the angle table; for even d_model the slice is a no-op.
        pos_embed[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        pos_embed = pos_embed.unsqueeze(0)  # (1, max_seq_len, d_model)
        self.register_buffer('pos_embed', pos_embed)

    def forward(self, x):
        """Add the positional table to ``x``.

        x: (batch_size, seq_len, d_model)
        """
        seq_len = x.size(1)
        return x + self.pos_embed[:, :seq_len, :]

In [19]:
class TransformerEncoderClassifierWithCLS(nn.Module):
    """Sequence classifier built on a Transformer encoder with a [CLS] token.

    A trainable [CLS] vector is prepended to every projected input sequence;
    the encoder output at position 0 (that [CLS] slot) feeds the final linear
    classifier.
    """

    def __init__(self, d_model, num_heads, d_ff, num_layers, input_dim, num_classes, max_seq_len=512, dropout=0.1):
        super().__init__()
        self.d_model = d_model

        # Maps raw input features to the model width (input_dim -> d_model).
        self.input_projection = nn.Linear(input_dim, d_model)

        # Trainable [CLS] vector, shape (1, d_model).
        self.cls_token = nn.Parameter(torch.randn(1, d_model))

        # Positional table; it must cover the sequence plus the [CLS] slot.
        self.pos_embedding = PositionalEmbedding(d_model, max_seq_len)

        # Encoder stack.
        self.encoder = Encoder(d_model, num_heads, d_ff, num_layers, dropout)

        # Final classification layer (d_model -> num_classes).
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x, mask=None):
        """Return class logits for a batch of sequences.

        x: (batch_size, seq_len, input_dim)
        mask: forwarded unchanged to the encoder.
        Returns logits of shape (batch_size, num_classes).
        """
        # Project the features to the model width: (B, seq_len, d_model).
        projected = self.input_projection(x)

        # One [CLS] per batch element: (B, 1, d_model).
        cls_batch = self.cls_token[None].expand(projected.size(0), -1, -1)

        # Prepend [CLS]: (B, seq_len + 1, d_model).
        sequence = torch.cat((cls_batch, projected), dim=1)

        # Add positions, then encode.
        encoded = self.encoder(self.pos_embedding(sequence), mask)

        # Classify from the [CLS] position only.
        return self.classifier(encoded[:, 0, :])

In [20]:
class SyntheticTimeSeriesDataset(Dataset):
    """Synthetic time-series classification data.

    Features are standard-normal noise and labels are drawn uniformly at
    random, independently of the features. Each item is a pair
    ``(x, y)`` where ``x`` has shape (seq_len, input_dim) and ``y`` is an
    integer label in ``[0, num_classes)``.
    """

    def __init__(self, n_samples=2000, seq_len=50, input_dim=80, num_classes=2):
        super().__init__()
        self.seq_len, self.input_dim, self.num_classes = seq_len, input_dim, num_classes

        # Random features: (n_samples, seq_len, input_dim)
        feature_shape = (n_samples, seq_len, input_dim)
        self.X = torch.randn(*feature_shape)

        # Random labels in 0..num_classes-1, one per sample
        self.Y = torch.randint(low=0, high=num_classes, size=(n_samples,))

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        sample, label = self.X[idx], self.Y[idx]
        return sample, label

In [21]:
# Synthetic dataset parameters
n_samples  = 2000
seq_len    = 50
input_dim  = 80
num_classes= 2

# Use the class actually defined in this notebook. The previous name,
# `SyntheticTimeSeries`, is not defined anywhere and raises NameError on a
# fresh kernel (Restart & Run All).
dataset = SyntheticTimeSeriesDataset(
    n_samples=n_samples,
    seq_len=seq_len,
    input_dim=input_dim,
    num_classes=num_classes
)

In [22]:
# Fetch the first instance (x = features, y = label)
first_x, first_y = dataset[0]

# Print the shape, the feature tensor and the label of that instance
print("Shape de la primera instancia:", first_x.shape)
print("Datos de la primera instancia (X):\n", first_x)
print("Etiqueta (Y):", first_y)

Shape de la primera instancia: torch.Size([50, 80])
Datos de la primera instancia (X):
 tensor([[-1.4244, -0.9550,  0.1341,  ..., -0.5628,  0.6911,  1.0790],
        [ 0.3659,  0.2115, -0.3337,  ...,  1.6594, -0.1341, -0.2431],
        [-1.4377,  1.5517, -0.6442,  ..., -0.2413,  1.1100,  0.2479],
        ...,
        [ 1.0732,  0.4124,  0.6573,  ...,  0.5138, -0.6348,  0.8971],
        [ 1.1215,  0.5846,  1.0779,  ..., -1.5125, -0.8529,  2.1013],
        [ 1.1741,  0.8247, -0.2051,  ...,  0.7012,  0.1913,  0.2018]])
Etiqueta (Y): tensor(0)


In [23]:
def train_one_epoch(model, dataloader, optimizer, criterion):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        dataloader: Iterable yielding ``(batch_x, batch_y)`` tensor pairs.
        optimizer: Optimiser that updates the model's parameters.
        criterion: Callable ``criterion(logits, batch_y)`` returning a scalar
            loss tensor.

    Returns:
        The mean of the per-batch loss values as a float, or NaN when the
        dataloader yields no batches.
    """
    model.train()

    # Send batches to wherever the model lives instead of depending on a
    # notebook-level `device` global. A model without parameters leaves the
    # batches where they already are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    num_batches = 0  # counted in the loop so iterables without len() work too

    for batch_x, batch_y in dataloader:
        if target_device is not None:
            batch_x = batch_x.to(target_device)
            batch_y = batch_y.to(target_device)

        optimizer.zero_grad()
        logits = model(batch_x)  # (batch_size, num_classes)
        loss = criterion(logits, batch_y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # No batches means the mean loss is undefined; report NaN rather than
    # raising ZeroDivisionError.
    if num_batches == 0:
        return float("nan")
    return total_loss / num_batches

def evaluate(model, dataloader, criterion):
    """Evaluate ``model`` on ``dataloader`` without updating it.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        dataloader: Iterable yielding ``(batch_x, batch_y)`` tensor pairs.
        criterion: Callable ``criterion(logits, batch_y)`` returning a scalar
            loss tensor.

    Returns:
        Tuple ``(avg_loss, accuracy)``: the mean of the per-batch loss values
        and the fraction of samples whose arg-max prediction equals the
        label. Either value is NaN when there is nothing to average over.
    """
    model.eval()

    # Send batches to wherever the model lives instead of depending on a
    # notebook-level `device` global. A model without parameters leaves the
    # batches where they already are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    total_loss = 0.0
    num_batches = 0  # counted in the loop so iterables without len() work too
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_x, batch_y in dataloader:
            if target_device is not None:
                batch_x = batch_x.to(target_device)
                batch_y = batch_y.to(target_device)

            logits = model(batch_x)
            loss = criterion(logits, batch_y)
            total_loss += loss.item()
            num_batches += 1

            # Accuracy: predicted class is the arg-max over the class dimension
            preds = torch.argmax(logits, dim=1)
            correct += (preds == batch_y).sum().item()
            total += batch_y.size(0)

    # Undefined metrics are reported as NaN rather than raising
    # ZeroDivisionError on an empty loader.
    avg_loss = total_loss / num_batches if num_batches else float("nan")
    accuracy = correct / total if total else float("nan")
    return avg_loss, accuracy

In [24]:
# End-to-end run: seed the RNGs, build the synthetic dataset, split it 80/20
# into train/validation, train the CLS-token Transformer classifier and print
# the per-epoch metrics.
if __name__ == "__main__":
    SEED = 42
    torch.manual_seed(SEED)
    if device.type == 'cuda':
        torch.cuda.manual_seed(SEED)
    
    # Synthetic dataset parameters
    n_samples  = 2000
    seq_len    = 50
    input_dim  = 80   # Each time step has 80 features
    num_classes= 2    # Binary classification (0 or 1)

    # Transformer parameters
    d_model   = 64
    num_heads = 4
    d_ff      = 128
    num_layers= 2
    max_seq_len = seq_len + 1  # +1 because the CLS token is prepended
    
    # Training hyperparameters
    batch_size = 32
    lr         = 1e-3
    epochs     = 5
    
    # 1) Dataset and DataLoaders (80% train / 20% validation)
    dataset = SyntheticTimeSeriesDataset(
        n_samples=n_samples,
        seq_len=seq_len,
        input_dim=input_dim,
        num_classes=num_classes
    )
    train_size = int(0.8 * len(dataset))
    val_size   = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False)
    
    # 2) Model with a real (trainable) CLS token
    model = TransformerEncoderClassifierWithCLS(
        d_model=d_model,
        num_heads=num_heads,
        d_ff=d_ff,
        num_layers=num_layers,
        input_dim=input_dim,
        num_classes=num_classes,
        max_seq_len=max_seq_len, # reminder: one extra slot for the CLS token
        dropout=0.1
    ).to(device)
    
    # 3) Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # 4) Training loop: one training pass and one validation pass per epoch
    for epoch in range(1, epochs+1):
        train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
        val_loss, val_acc = evaluate(model, val_loader, criterion)
        
        print(f"Epoch [{epoch}/{epochs}] "
              f"Train Loss: {train_loss:.4f} | "
              f"Val Loss: {val_loss:.4f} | "
              f"Val Acc: {val_acc*100:.2f}%")

    print("¡Entrenamiento finalizado!")

Epoch [1/5] Train Loss: 0.7227 | Val Loss: 0.7111 | Val Acc: 46.25%
Epoch [2/5] Train Loss: 0.7021 | Val Loss: 0.6906 | Val Acc: 53.50%
Epoch [3/5] Train Loss: 0.7002 | Val Loss: 0.6917 | Val Acc: 53.75%
Epoch [4/5] Train Loss: 0.6930 | Val Loss: 0.7020 | Val Acc: 46.50%
Epoch [5/5] Train Loss: 0.6780 | Val Loss: 0.7077 | Val Acc: 54.00%
¡Entrenamiento finalizado!
