In [None]:
import json
import os
from sklearn.model_selection import KFold
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np

# Select the compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Map each position label to its integer class index. The labels are listed in
# class-index order, so a label's place in the tuple is its index.
position_to_index = {
    name: index
    for index, name in enumerate((
        '5050_guard',
        'back1',
        'back2',
        'closed_guard1',
        'closed_guard2',
        'half_guard1',
        'half_guard2',
        'mount1',
        'mount2',
        'open_guard1',
        'open_guard2',
        'side_control1',
        'side_control2',
        'standing',
        'takedown1',
        'takedown2',
        'turtle1',
        'turtle2',
    ))
}

In [None]:

def load_annotations(file_path):
    """
    Load the preprocessed annotations from a JSON file.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The deserialized JSON content, exactly as returned by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII text loads identically on every OS.
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)


In [None]:
def remove_background(image, threshold=50):
    """
    Remove the background of an image by keeping only its largest contour.

    The image is converted to grayscale, binarized with a fixed threshold,
    and the external contour with the largest area is filled into a mask.
    Pixels outside that mask are set to zero.

    Args:
        image: Input image; it is converted with ``np.array`` and then treated
            as RGB by ``cv2.COLOR_RGB2GRAY``.
            NOTE(review): presumably a 3-channel RGB PIL image - confirm
            against callers.
        threshold: Grayscale value passed to ``cv2.threshold``; pixels above
            it become foreground. Defaults to 50.

    Returns:
        A PIL image built from the masked array. When no contour is found the
        mask stays empty, so the result is entirely black.
    """
    # cv2 and PIL are not imported at the top of this notebook, so they are
    # imported here; without this the function raises NameError when called.
    import cv2
    from PIL import Image

    # Convert the input image to a NumPy array.
    image_np = np.array(image)

    # Convert to grayscale.
    gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)

    # Binarize: pixels brighter than `threshold` become 255, the rest 0.
    _, thresh = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)

    # Find only the outermost contours of the binarized image.
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Start from an empty (all-zero) single-channel mask.
    mask = np.zeros_like(gray)

    # Fill the largest contour (by area) into the mask.
    if contours:
        largest_contour = max(contours, key=cv2.contourArea)
        cv2.drawContours(mask, [largest_contour], -1, (255), thickness=cv2.FILLED)

    # Keep original pixels only where the mask is set.
    result = cv2.bitwise_and(image_np, image_np, mask=mask)

    # Convert back to a PIL image.
    return Image.fromarray(result)

In [None]:
class SequenceDataset(Dataset):
    """
    Sliding-window dataset over a list of per-frame annotations.

    Item ``idx`` is the window of ``seq_length`` consecutive annotations that
    starts at position ``idx``; its label is the ``'Position'`` of the last
    annotation in the window.

    NOTE(review): windows are taken over consecutive list entries with no
    notion of clip boundaries. If the annotation list concatenates several
    recordings, windows can straddle two of them - confirm against the data.
    """

    def __init__(self, annotations, seq_length=10, position_to_index=None):
        """
        Initialize the sequence dataset.

        Args:
            annotations: Indexable collection of dicts, each providing the
                keys ``'Pose1'``, ``'Pose2'`` and ``'Position'``.
            seq_length: Number of consecutive annotations per window.
            position_to_index: Mapping from position label to integer class
                index. Required for item access.

        Raises:
            ValueError: If ``seq_length`` is smaller than 1.
        """
        if seq_length < 1:
            raise ValueError(f"seq_length must be >= 1, got {seq_length}")
        self.annotations = annotations
        self.seq_length = seq_length
        self.position_to_index = position_to_index

    def __len__(self):
        """
        Return the number of available windows.

        A list of N annotations holds N - seq_length + 1 windows. The result
        is clamped at 0 so that lists shorter than ``seq_length`` give an
        empty dataset rather than a negative length.
        """
        return max(0, len(self.annotations) - self.seq_length + 1)

    def __getitem__(self, idx):
        """
        Return one window of pose data and its label.

        Args:
            idx: Window index; negative values count from the end.

        Returns:
            A tuple ``(sequence, label)``. ``sequence`` is a float32 tensor of
            shape ``(seq_length, features)``, where each row is the flattened
            concatenation of ``'Pose1'`` and ``'Pose2'``. ``label`` is a
            0-dim int64 tensor holding the class index of the last frame.

        Raises:
            IndexError: If ``idx`` is outside the valid window range.
            KeyError: If the label is missing from ``position_to_index``.
        """
        num_windows = len(self)
        if idx < 0:
            idx += num_windows
        # Explicit bounds check: without it an out-of-range index would
        # silently wrap around the underlying annotation list.
        if not 0 <= idx < num_windows:
            raise IndexError(f"window index out of range for dataset of length {num_windows}")

        last = idx + self.seq_length - 1
        frames = [
            self.annotations[pos]['Pose1'] + self.annotations[pos]['Pose2']
            for pos in range(idx, last + 1)
        ]
        # Force float32 so integer-valued keypoints do not produce an int64
        # tensor, which nn.LSTM cannot consume.
        sequence = torch.tensor(frames, dtype=torch.float32).view(self.seq_length, -1)
        label_str = self.annotations[last]['Position']
        label = self.position_to_index[label_str]
        return sequence, torch.tensor(label).long()

In [None]:
class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step."""

    def __init__(self, input_size, hidden_size, output_size, num_layers=2):
        """
        Build the recurrent encoder and the output layer.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            output_size: Number of values produced per sequence.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """
        Run the forward pass.

        The LSTM is batch-first, so ``x`` is indexed as (batch, time, feature).
        Only the hidden output at the final time step is fed to the linear
        layer.
        """
        hidden_states, _ = self.lstm(x)
        last_step = hidden_states[:, -1, :]
        logits = self.fc(last_step)
        return logits

In [None]:
def train_and_validate_lstm(train_loader, val_loader, model, criterion, optimizer, num_epochs=25, patience=5,
                            checkpoint_path='best_lstm_model.pth', device=None):
    """
    Train and validate the model with early stopping.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    validation loss improves, the model's ``state_dict`` is saved to
    ``checkpoint_path``. Training stops early after ``patience`` consecutive
    epochs without improvement.

    Args:
        train_loader: Iterable of ``(inputs, targets)`` training batches.
        val_loader: Iterable of ``(inputs, targets)`` validation batches.
        model: The ``nn.Module`` to train (modified in place).
        criterion: Loss function called as ``criterion(outputs, targets)``.
        optimizer: Optimizer that updates the model parameters.
        num_epochs: Maximum number of epochs.
        patience: Consecutive non-improving epochs tolerated before stopping.
        checkpoint_path: File that receives the best ``state_dict``.
        device: Device the batches are moved to. When ``None`` the device of
            the model's first parameter is used (CPU if the model has no
            parameters), so batches always end up where the model lives.

    Returns:
        The best (lowest) validation loss observed, or ``float('inf')`` if no
        epoch ran.

    Raises:
        ValueError: If either loader yields no samples in an epoch.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
    else:
        device = torch.device(device)

    best_val_loss = float('inf')
    early_stopping_counter = 0

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        train_samples = 0

        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by batch size so the epoch value is a
            # per-sample average even when the last batch is smaller.
            running_loss += loss.item() * inputs.size(0)
            train_samples += inputs.size(0)

        if train_samples == 0:
            raise ValueError("train_loader yielded no samples")

        # Divide by the samples actually seen rather than len(dataset), so the
        # average stays correct with drop_last or sampler-based loaders.
        epoch_loss = running_loss / train_samples
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}')

        # Validation
        model.eval()
        val_loss = 0.0
        val_samples = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item() * inputs.size(0)
                val_samples += inputs.size(0)

        if val_samples == 0:
            raise ValueError("val_loader yielded no samples")

        val_loss /= val_samples
        print(f'Validation Loss: {val_loss}')

        # Early stopping: checkpoint on improvement, otherwise count the miss.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stopping_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= patience:
                print("Early stopping")
                break

        # Release cached GPU memory between epochs (only relevant on CUDA).
        if device.type == 'cuda':
            torch.cuda.empty_cache()

    return best_val_loss

In [None]:
def cross_validation_kfold_lstm(dataset, k_folds=5, batch_size=16, num_epochs=25, patience=5,
                                input_size=None, hidden_size=128, shuffle=True):
    """
    Run K-Fold cross-validation for the LSTM model.

    For every fold a fresh ``LSTMModel`` is created and trained with
    ``train_and_validate_lstm`` on the training split, validating on the
    held-out split.

    Args:
        dataset: Map-style dataset whose items are ``(sequence, label)`` and
            whose sequences are indexed as ``(time, features)``.
        k_folds: Number of folds.
        batch_size: Batch size of both data loaders.
        num_epochs: Maximum number of epochs per fold.
        patience: Early-stopping patience per fold.
        input_size: Features per time step. When ``None`` it is read from the
            last dimension of the first sample instead of being hard-coded.
        hidden_size: Hidden size of the LSTM.
        shuffle: Whether KFold shuffles sample indices before splitting.

    Raises:
        ValueError: If the dataset is empty.

    NOTE(review): with ``shuffle=True`` and a sliding-window dataset such as
    ``SequenceDataset``, neighbouring windows that share frames can land in
    both the training and the validation split, which may make validation
    loss optimistic. ``shuffle=False`` gives contiguous folds.

    NOTE(review): ``train_and_validate_lstm`` writes its checkpoint to
    'best_lstm_model.pth' by default, so each fold overwrites the checkpoint
    of the previous one.
    """
    if len(dataset) == 0:
        raise ValueError("dataset is empty; nothing to cross-validate")

    if input_size is None:
        # Infer the feature width from the data so the model always matches
        # the dataset.
        sample_sequence, _ = dataset[0]
        input_size = sample_sequence.shape[-1]

    output_size = len(position_to_index)

    # scikit-learn rejects a random_state when shuffle is disabled.
    kfold = KFold(n_splits=k_folds, shuffle=shuffle, random_state=42 if shuffle else None)

    for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
        print(f'Fold {fold+1}/{k_folds}')

        # Hand plain Python ints to Subset so the dataset never receives
        # NumPy integer scalars as indices.
        train_subset = torch.utils.data.Subset(dataset, train_idx.tolist())
        val_subset = torch.utils.data.Subset(dataset, val_idx.tolist())

        train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=batch_size)

        # A new model and optimizer per fold, so folds do not share weights.
        model = LSTMModel(input_size, hidden_size, output_size).to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        train_and_validate_lstm(train_loader, val_loader, model, criterion, optimizer, num_epochs=num_epochs, patience=patience)

In [None]:
def main(annotations_path='../mnt/V3/annotations/annotations_preprocessed.json',
         seq_length=10, k_folds=5, batch_size=16, num_epochs=25, patience=5, seed=42):
    """
    Entry point: run K-Fold cross-validation of the LSTM model.

    Args:
        annotations_path: JSON file with the preprocessed annotations.
        seq_length: Number of consecutive annotations per input sequence.
        k_folds: Number of cross-validation folds.
        batch_size: Batch size of the data loaders.
        num_epochs: Maximum number of epochs per fold.
        patience: Early-stopping patience per fold.
        seed: Seed for PyTorch's random number generators; pass ``None`` to
            leave them unseeded.
    """
    # Seed torch so weight initialization and DataLoader shuffling are
    # repeatable, consistent with the fixed random_state used by the K-Fold split.
    if seed is not None:
        torch.manual_seed(seed)

    annotations = load_annotations(annotations_path)

    dataset = SequenceDataset(annotations, seq_length=seq_length, position_to_index=position_to_index)

    cross_validation_kfold_lstm(
        dataset,
        k_folds=k_folds,
        batch_size=batch_size,
        num_epochs=num_epochs,
        patience=patience,
    )

# Run the experiment only when this code is executed directly (as a script or
# in a notebook cell), not when it is imported as a module.
if __name__ == '__main__':
    main()


Fold 1/5
Epoch 1/25, Loss: 0.844924037750036
Validation Loss: 0.5425993465428843
Epoch 2/25, Loss: 0.34174617702567883
Validation Loss: 0.23433765280373106
Epoch 3/25, Loss: 0.2294065505723722
Validation Loss: 0.17555575729325423
Epoch 4/25, Loss: 0.17973897410981568
Validation Loss: 0.14013273709808005
Epoch 5/25, Loss: 0.13197529553369705
Validation Loss: 0.08146314235927393
Epoch 6/25, Loss: 0.11842318309123409
Validation Loss: 0.11764338512733388
Epoch 7/25, Loss: 0.10020242592775426
Validation Loss: 0.10198479420239952
Epoch 8/25, Loss: 0.08215252478832014
Validation Loss: 0.07265839341029996
Epoch 9/25, Loss: 0.07572244716049245
Validation Loss: 0.060198580578222464
Epoch 10/25, Loss: 0.07524538143084394
Validation Loss: 0.03975790597522223
Epoch 11/25, Loss: 0.058114879156189826
Validation Loss: 0.04195927360624435
Epoch 12/25, Loss: 0.05915015982120425
Validation Loss: 0.10217493696277344
Epoch 13/25, Loss: 0.05071203018317967
Validation Loss: 0.028703075885936718
Epoch 14/25, 