In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import copy

# Tratamiento de datos

In [None]:
# Verificar si hay GPU disponible
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Cargar datos desde archivos .npy
X = np.load("InGameDataOrdenada\Space4x3Time5.npy")
y = np.load("InGameDataOrdenada\LabelsSpace4x3Time5.npy")

# Convertir los datos a tensores de PyTorch
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)

# Preprocesamiento: Normalización de los datos
scaler = MinMaxScaler()
n_samples, sequence_length, n_features = X.shape
X_flat = X.view(-1, n_features)
X_flat = torch.tensor(scaler.fit_transform(X_flat), dtype=torch.float32)
X = X_flat.view(n_samples, sequence_length, n_features)

# División de los datos en 65% entrenamiento, 15% validación y 20% prueba
train_size = int(0.65 * n_samples)
val_size = int(0.15 * n_samples)
test_size = n_samples - train_size - val_size

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size + val_size], y[train_size:train_size + val_size]
X_test, y_test = X[train_size + val_size:], y[train_size + val_size:]

# Mover los datos a la GPU si está disponible
X_train, X_val, X_test = X_train.to(device), X_val.to(device), X_test.to(device)
y_train, y_val, y_test = y_train.to(device), y_val.to(device), y_test.to(device)


# Simple+L2

In [None]:
class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, num_classes, num_heads=4, num_layers=2, hidden_dim=128, dropout=0.1, sequence_length=18):
        super(TransformerClassifier, self).__init__()
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.pos_encoder = nn.Parameter(torch.zeros(1, sequence_length, hidden_dim))
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.embedding(x)
        x = x + self.pos_encoder[:, :x.size(1), :]
        x = self.transformer_encoder(x)
        x = x.mean(dim=1)  # Global average pooling
        x = self.fc_out(x)
        return x

In [None]:
# Configuración de los hiperparámetros
input_dim = n_features
num_classes = len(torch.unique(y))
num_heads = 8
num_layers = 4
hidden_dim = 256
dropout = 0.1
sequence_length = 18

# Crear el modelo con los nuevos hiperparámetros
model = TransformerClassifier(input_dim, num_classes, num_heads, num_layers, hidden_dim, dropout, sequence_length).to(device)

# Hiperparámetros de entrenamiento
learning_rate = 0.0001
batch_size = 8
num_epochs = 1000
patience = 50  # Para early stopping
weight_decay = 0.01

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay = weight_decay)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, verbose=True)

# Early stopping y guardado del mejor modelo
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
early_stopping_counter = 0

train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Evaluar en el conjunto de validación
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in zip(X_val, y_val):
            inputs, labels = inputs.unsqueeze(0), labels.unsqueeze(0)
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    val_acc = correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss/len(X_val)}, Val Acc: {val_acc * 100:.2f}%')

    # Guardar el mejor modelo
    if val_acc > best_acc:
        best_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
    
    # Adjust learning rate based on validation loss
    scheduler.step(val_loss)

    # Early stopping
    if early_stopping_counter >= patience:
        print(f'Early stopping at epoch {epoch+1}')
        break

# Cargar el mejor modelo
model.load_state_dict(best_model_wts)

torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}, 'models/ModeloSimple.pth')


In [None]:
# Evaluación del modelo en función de la longitud de la secuencia
model.eval()
accuracies = []

with torch.no_grad():
    for seq_len in range(1, sequence_length + 1):
        test_outputs = model(X_test[:, :seq_len, :])
        _, predicted = torch.max(test_outputs, 1)
        accuracy = (predicted == y_test).sum().item() / len(y_test)
        accuracies.append(accuracy)
        print(f'Sequence Length: {seq_len}, Accuracy: {accuracy * 100:.2f}%')


# Padding + Regularizador L2

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, X, y, max_length, incomplete_prob=0.5):
        self.X = X
        self.y = y
        self.max_length = max_length
        self.incomplete_prob = incomplete_prob
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        input_seq = self.X[idx]
        target_seq = self.y[idx]
        seq_length = input_seq.size(0)
        
        if torch.rand(1).item() < self.incomplete_prob:
            seq_length = torch.randint(1, input_seq.size(0), (1,)).item()
        
        input_seq = input_seq[:seq_length]
        target_seq = target_seq
        
        # Rellenar con padding si es necesario
        padded_input_seq = torch.zeros(self.max_length, input_seq.size(1))
        padded_input_seq[:seq_length] = input_seq
        
        return padded_input_seq, target_seq, seq_length

class CustomLoss(nn.Module):
    def __init__(self, base_loss_fn, incomplete_weight=1.0):
        super(CustomLoss, self).__init__()
        self.base_loss_fn = base_loss_fn
        self.incomplete_weight = incomplete_weight
    
    def forward(self, predictions, targets, sequence_lengths, max_length):
        base_loss = self.base_loss_fn(predictions, targets)
        
        incomplete_mask = (sequence_lengths < max_length).float()
        
        regularization_loss = incomplete_mask.mean() * self.incomplete_weight
        
        total_loss = base_loss + regularization_loss
        
        return total_loss

In [None]:
# Configuración de los hiperparámetros
input_dim = n_features
num_classes = len(torch.unique(y))
num_heads = 8
num_layers = 4
hidden_dim = 256
dropout = 0.1
sequence_length = 18

# Crear el modelo con los nuevos hiperparámetros
model = TransformerClassifier(input_dim, num_classes, num_heads, num_layers, hidden_dim, dropout, sequence_length).to(device)

# Hiperparámetros de entrenamiento
learning_rate = 0.0001
batch_size = 8
num_epochs = 1000
patience = 50  # Para early stopping
incomplete_weight = 1.25
weight_decay = 0.01
incomplete_prob = 0.75

criterion = CustomLoss(nn.CrossEntropyLoss(), incomplete_weight=incomplete_weight)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, verbose=True)

# Early stopping y guardado del mejor modelo
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
early_stopping_counter = 0

train_dataset = CustomDataset(X_train, y_train, max_length=sequence_length, incomplete_prob=incomplete_prob)
val_dataset = CustomDataset(X_val, y_val, max_length=sequence_length, incomplete_prob=incomplete_prob)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=1, shuffle=False)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels, seq_lengths in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels, seq_lengths, sequence_length)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Evaluar en el conjunto de validación
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels, seq_lengths in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels, seq_lengths, sequence_length)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    val_acc = correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss/len(val_loader)}, Val Acc: {val_acc * 100:.2f}%')

    # Guardar el mejor modelo
    if val_acc > best_acc:
        best_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
    
    # Ajustar la tasa de aprendizaje basada en la pérdida de validación
    scheduler.step(val_loss)

    # Early stopping
    if early_stopping_counter >= patience:
        print(f'Early stopping at epoch {epoch+1}')
        break

# Cargar el mejor modelo
model.load_state_dict(best_model_wts)

torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}, 'models/ModeloPaddingL2.pth')


In [None]:
# Evaluación del modelo en función de la longitud de la secuencia
model.eval()
accuracies = []

with torch.no_grad():
    for seq_len in range(1, sequence_length + 1):
        test_outputs = model(X_test[:, :seq_len, :])
        _, predicted = torch.max(test_outputs, 1)
        accuracy = (predicted == y_test).sum().item() / len(y_test)
        accuracies.append(accuracy)
        print(f'Sequence Length: {seq_len}, Accuracy: {accuracy * 100:.2f}%')

# Regularizador LSTM + L2

In [None]:
class TransformerLSTMClassifier(nn.Module):
    def __init__(self, input_dim, num_classes, num_heads=4, num_layers=2, hidden_dim=128, lstm_hidden_dim=64, dropout=0.1, sequence_length=18):
        super(TransformerLSTMClassifier, self).__init__()
        self.lstm = nn.LSTM(input_dim, lstm_hidden_dim, batch_first=True, bidirectional=True)
        self.embedding = nn.Linear(lstm_hidden_dim * 2, hidden_dim)
        self.pos_encoder = nn.Parameter(torch.zeros(1, sequence_length, hidden_dim))
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, src_key_padding_mask=None):
        x, _ = self.lstm(x)
        x = self.embedding(x)
        x = x + self.pos_encoder[:, :x.size(1), :]
        x = self.transformer_encoder(x, src_key_padding_mask=src_key_padding_mask)
        x = self.dropout(x.mean(dim=1))  # Global average pooling con dropout
        x = self.fc_out(x)
        return x


In [None]:
# Configuración de los hiperparámetros
input_dim = n_features
num_classes = len(torch.unique(y))
num_heads = 8
num_layers = 4
hidden_dim = 256
lstm_hidden_dim = 128
dropout = 0.1
sequence_length = 18

# Crear el modelo con los nuevos hiperparámetros
model = TransformerLSTMClassifier(input_dim, num_classes, num_heads, num_layers, hidden_dim, lstm_hidden_dim, dropout, sequence_length).to(device)

# Hiperparámetros de entrenamiento
learning_rate = 0.0001
batch_size = 8
num_epochs = 1000
patience = 50  # Para early stopping
weight_decay = 0.01  # Regularización L2

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, verbose=True)

# Early stopping y guardado del mejor modelo
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
early_stopping_counter = 0

train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Evaluar en el conjunto de validación
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in zip(X_val, y_val):
            inputs, labels = inputs.unsqueeze(0), labels.unsqueeze(0)
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    val_acc = correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss/len(X_val)}, Val Acc: {val_acc * 100:.2f}%')

    # Guardar el mejor modelo
    if val_acc > best_acc:
        best_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
    
    # Adjust learning rate based on validation loss
    scheduler.step(val_loss)

    # Early stopping
    if early_stopping_counter >= patience:
        print(f'Early stopping at epoch {epoch+1}')
        break

# Cargar el mejor modelo
model.load_state_dict(best_model_wts)

torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}, 'models/ModeloLTSML2.pth')

In [41]:
# Evaluación del modelo en función de la longitud de la secuencia
model.eval()
accuracies = []

with torch.no_grad():
    for seq_len in range(1, sequence_length + 1):
        test_outputs = model(X_test[:, :seq_len, :])
        _, predicted = torch.max(test_outputs, 1)
        accuracy = (predicted == y_test).sum().item() / len(y_test)
        accuracies.append(accuracy)
        print(f'Sequence Length: {seq_len}, Accuracy: {accuracy * 100:.2f}%')

Sequence Length: 1, Accuracy: 28.99%
Sequence Length: 2, Accuracy: 34.10%
Sequence Length: 3, Accuracy: 37.43%
Sequence Length: 4, Accuracy: 41.44%
Sequence Length: 5, Accuracy: 44.86%
Sequence Length: 6, Accuracy: 48.26%
Sequence Length: 7, Accuracy: 51.65%
Sequence Length: 8, Accuracy: 55.03%
Sequence Length: 9, Accuracy: 57.87%
Sequence Length: 10, Accuracy: 61.81%
Sequence Length: 11, Accuracy: 65.61%
Sequence Length: 12, Accuracy: 68.78%
Sequence Length: 13, Accuracy: 71.98%
Sequence Length: 14, Accuracy: 75.83%
Sequence Length: 15, Accuracy: 79.92%
Sequence Length: 16, Accuracy: 83.59%
Sequence Length: 17, Accuracy: 88.99%
Sequence Length: 18, Accuracy: 99.08%


# Regularización LSTM + L2 + Padding

In [42]:
# Configuración de los hiperparámetros
input_dim = n_features
num_classes = len(torch.unique(y))
num_heads = 8
num_layers = 4
hidden_dim = 256
lstm_hidden_dim = 128
dropout = 0.1
sequence_length = 18

# Crear el modelo con los nuevos hiperparámetros
model = TransformerLSTMClassifier(input_dim, num_classes, num_heads, num_layers, hidden_dim, lstm_hidden_dim, dropout, sequence_length).to(device)

# Hiperparámetros de entrenamiento
learning_rate = 0.0001
batch_size = 8
num_epochs = 1000
patience = 50   # Para early stopping
incomplete_weight = 1.25
weight_decay = 0.01
incomplete_prob = 0.75

criterion = CustomLoss(nn.CrossEntropyLoss(), incomplete_weight=incomplete_weight)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay = weight_decay)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, verbose=True)

# Early stopping y guardado del mejor modelo
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
early_stopping_counter = 0

train_dataset = CustomDataset(X_train, y_train, max_length=sequence_length, incomplete_prob=incomplete_prob)
val_dataset = CustomDataset(X_val, y_val, max_length=sequence_length, incomplete_prob=incomplete_prob)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=1, shuffle=False)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels, seq_lengths in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels, seq_lengths, sequence_length)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Evaluar en el conjunto de validación
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels, seq_lengths in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels, seq_lengths, sequence_length)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    val_acc = correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss/len(val_loader)}, Val Acc: {val_acc * 100:.2f}%')

    # Guardar el mejor modelo
    if val_acc > best_acc:
        best_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1
    
    # Ajustar la tasa de aprendizaje basada en la pérdida de validación
    scheduler.step(val_loss)

    # Early stopping
    if early_stopping_counter >= patience:
        print(f'Early stopping at epoch {epoch+1}')
        break

# Cargar el mejor modelo
model.load_state_dict(best_model_wts)
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}, 'models/ModeloPaddingLTSML2.pth')

Epoch 1/1000, Loss: 1.9678275684900892, Val Loss: 1.9036529521681245, Val Acc: 50.29%
Epoch 2/1000, Loss: 1.7566033567087802, Val Loss: 1.7085897440641014, Val Acc: 64.64%
Epoch 3/1000, Loss: 1.664201238776178, Val Loss: 1.7332869194947347, Val Acc: 62.32%
Epoch 4/1000, Loss: 1.6566128669187963, Val Loss: 1.6286559091739443, Val Acc: 68.09%
Epoch 5/1000, Loss: 1.6363911732164447, Val Loss: 1.7317043379250008, Val Acc: 63.25%
Epoch 6/1000, Loss: 1.6377385176119712, Val Loss: 1.605908765009943, Val Acc: 69.35%
Epoch 7/1000, Loss: 1.6288631382411207, Val Loss: 1.6090242422455086, Val Acc: 69.08%
Epoch 8/1000, Loss: 1.6222143826887547, Val Loss: 1.6460261438942214, Val Acc: 68.00%
Epoch 9/1000, Loss: 1.6169576401195367, Val Loss: 1.6501141067225786, Val Acc: 68.39%
Epoch 10/1000, Loss: 1.5973092316590518, Val Loss: 1.6163308498354843, Val Acc: 69.08%
Epoch 11/1000, Loss: 1.6068216298425626, Val Loss: 1.6298259108114, Val Acc: 68.93%
Epoch 12/1000, Loss: 1.5847147391938767, Val Loss: 1.6380

In [43]:
# Evaluación del modelo en función de la longitud de la secuencia
model.eval()
accuracies = []

with torch.no_grad():
    for seq_len in range(1, sequence_length + 1):
        test_outputs = model(X_test[:, :seq_len, :])
        _, predicted = torch.max(test_outputs, 1)
        accuracy = (predicted == y_test).sum().item() / len(y_test)
        accuracies.append(accuracy)
        print(f'Sequence Length: {seq_len}, Accuracy: {accuracy * 100:.2f}%')

Sequence Length: 1, Accuracy: 46.07%
Sequence Length: 2, Accuracy: 46.30%
Sequence Length: 3, Accuracy: 47.36%
Sequence Length: 4, Accuracy: 48.10%
Sequence Length: 5, Accuracy: 49.29%
Sequence Length: 6, Accuracy: 48.84%
Sequence Length: 7, Accuracy: 50.78%
Sequence Length: 8, Accuracy: 53.14%
Sequence Length: 9, Accuracy: 56.97%
Sequence Length: 10, Accuracy: 60.84%
Sequence Length: 11, Accuracy: 64.37%
Sequence Length: 12, Accuracy: 67.81%
Sequence Length: 13, Accuracy: 70.94%
Sequence Length: 14, Accuracy: 74.75%
Sequence Length: 15, Accuracy: 78.82%
Sequence Length: 16, Accuracy: 83.05%
Sequence Length: 17, Accuracy: 88.50%
Sequence Length: 18, Accuracy: 98.67%
