In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.model_selection import train_test_split
import numpy as np
import kerastuner as kt
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Define una función para cargar los datos
def load_data():
    # Cargar datos desde un archivo JSON
    df = pd.read_json('../data/processed/realData.json', orient='records', lines=True)
    
    # Convertir las cadenas en listas de números (si es necesario)
    df['cop_x'] = df['cop_x'].apply(lambda x: np.array(eval(x)) if isinstance(x, str) else np.array(x))
    df['cop_y'] = df['cop_y'].apply(lambda x: np.array(eval(x)) if isinstance(x, str) else np.array(x))

    # Normalización de las series de tiempo
    scaler_x = StandardScaler()
    scaler_y = StandardScaler()

    X_cop_x = np.array(df['cop_x'].tolist())
    X_cop_y = np.array(df['cop_y'].tolist())

    X_cop_x = scaler_x.fit_transform(X_cop_x)
    X_cop_y = scaler_y.fit_transform(X_cop_y)

    X = np.stack((X_cop_x, X_cop_y), axis=-1)

    # Mapeo de las etiquetas a números (asumiendo clases: 'Healthy', 'Diabetic', 'Neuropathic')
    label_map = {'Healthy': 0, 'Diabetic': 1, 'Neuropathic': 2}
    y = df['class'].map(label_map).values

    # Crear etiquetas para nivel 1 y nivel 2
    level1_labels = (y > 0).astype(int)  # 0: Healthy, 1: Diabetic (incluye Neuropathic)
    level2_labels = (y == 2).astype(int)  # 0: Diabetic sin neuropatía, 1: Neuropathic

    # Separar los datos en entrenamiento y validación
    X_train, X_val, level1_labels_train, level1_labels_val, level2_labels_train, level2_labels_val = train_test_split(
        X, level1_labels, level2_labels, test_size=0.2, random_state=42)
    
    return X_train, X_val, level1_labels_train, level1_labels_val, level2_labels_train, level2_labels_val

# Clase Dataset personalizado para PyTorch
class CustomDataset(Dataset):
    def __init__(self, X, level1_labels, level2_labels):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.level1_labels = torch.tensor(level1_labels, dtype=torch.long)
        self.level2_labels = torch.tensor(level2_labels, dtype=torch.long)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.level1_labels[idx], self.level2_labels[idx]

# Define la función para construir el modelo ResNet en PyTorch
class CustomResNet(nn.Module):
    def __init__(self, num_blocks, filters_list, kernel_size_list):
        super(CustomResNet, self).__init__()
        self.in_channels = 8
        
        # Primera secuencia de capas convolucionales
        self.conv1 = nn.Conv1d(self.in_channels, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        
        # Segunda secuencia de bloques residuales
        self.layers = self._make_layers(num_blocks, filters_list, kernel_size_list)
        
        # Capa de Global Average Pooling (GAP)
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        
        # Capa softmax para clasificación
        self.softmax = nn.Softmax(dim=1)
        
        # Capas completamente conectadas para los dos niveles
        self.fc1 = nn.Linear(filters_list[-1], 2)  # Salida para nivel 1
        self.fc2 = nn.Linear(filters_list[-1], 2)  # Salida para nivel 2
    
    def _make_layers(self, num_blocks, filters_list, kernel_size_list):
        layers = []
        in_channels = 64
        for i in range(num_blocks):
            out_channels = filters_list[i]
            kernel_size = kernel_size_list[i]
            layers.append(ResidualBlock(in_channels, out_channels, kernel_size))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.maxpool(out)
        
        out = self.layers(out)
        
        out = self.global_avg_pool(out)
        out = out.view(out.size(0), -1)
        
        # Calcular las salidas para nivel 1 y nivel 2
        out1 = self.fc1(out)
        out2 = self.fc2(out)
        
        # Aplicar softmax a las salidas
        out1 = self.softmax(out1)
        out2 = self.softmax(out2)
        
        return out1, out2

# Define un bloque residual para la ResNet 1D
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channels)
        
        self.downsample = nn.Sequential(
            nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm1d(out_channels)
        ) if in_channels != out_channels else None

    def forward(self, x):
        identity = x
        
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        
        out = self.conv2(out)
        out = self.bn2(out)
        
        if self.downsample is not None:
            identity = self.downsample(x)
        
        out += identity
        out = self.relu(out)
        
        return out

  import kerastuner as kt


In [2]:
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        correct1 = 0
        correct2 = 0
        
        for inputs, targets1, targets2 in train_loader:
            optimizer.zero_grad()
            outputs1, outputs2 = model(inputs)
            
            loss1 = criterion(outputs1, targets1)
            loss2 = criterion(outputs2, targets2)
            loss = loss1 + loss2  # Sumar las pérdidas de ambos niveles
            
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            
            _, predicted1 = torch.max(outputs1, 1)
            _, predicted2 = torch.max(outputs2, 1)
            correct1 += (predicted1 == targets1).sum().item()
            correct2 += (predicted2 == targets2).sum().item()
        
        # Evaluar en datos de validación
        model.eval()
        val_loss = 0.0
        val_correct1 = 0
        val_correct2 = 0
        
        with torch.no_grad():
            for inputs, targets1, targets2 in val_loader:
                outputs1, outputs2 = model(inputs)
                
                loss1 = criterion(outputs1, targets1)
                loss2 = criterion(outputs2, targets2)
                loss = loss1 + loss2
                
                val_loss += loss.item()
                
                _, predicted1 = torch.max(outputs1, 1)
                _, predicted2 = torch.max(outputs2, 1)
                val_correct1 += (predicted1 == targets1).sum().item()
                val_correct2 += (predicted2 == targets2).sum().item()
        
        # Calcular métricas de entrenamiento y validación
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        accuracy1 = correct1 / len(train_loader.dataset)
        accuracy2 = correct2 / len(train_loader.dataset)
        val_accuracy1 = val_correct1 / len(val_loader.dataset)
        val_accuracy2 = val_correct2 / len(val_loader.dataset)
        
        print(f"Epoch [{epoch+1}/{num_epochs}], "
              f"Train Loss: {train_loss:.4f}, Train Acc1: {accuracy1:.4f}, Train Acc2: {accuracy2:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc1: {val_accuracy1:.4f}, Val Acc2: {val_accuracy2:.4f}")

    return model


In [3]:
X_train, X_val, level1_labels_train, level1_labels_val, level2_labels_train, level2_labels_val = load_data()


In [16]:
level2_labels_val.shape

(87,)

In [4]:
from torch.utils.data import DataLoader

# Crear datasets personalizados
train_dataset = CustomDataset(X_train, level1_labels_train, level2_labels_train)
val_dataset = CustomDataset(X_val, level1_labels_val, level2_labels_val)

# Crear DataLoaders
batch_size = 64  # Puedes ajustar el tamaño del lote según tus necesidades
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)


In [5]:
# Instanciar el modelo
num_blocks = 3  # Puedes ajustar el número de bloques según tus necesidades
filters_list = [64, 128, 128]  # Ajusta los filtros para cada bloque según lo necesites
kernel_size_list = [3, 3, 3]  # Ajusta los tamaños de kernel para cada bloque según lo necesites

model = CustomResNet(num_blocks, filters_list, kernel_size_list)

# Definir criterio de pérdida y optimizador
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)  # Puedes ajustar la tasa de aprendizaje según sea necesario


In [6]:
# Entrenar el modelo
num_epochs = 10  # Puedes ajustar el número de épocas según sea necesario
trained_model = train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs)


RuntimeError: Given groups=1, weight of size [64, 8, 3], expected input[64, 500, 2] to have 8 channels, but got 500 channels instead