<a href="https://colab.research.google.com/github/LolaSM/TFG_Bio/blob/main/RNN_Bio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Script para entrenar un modelo LSTM sobre series temporales de pacientes.

Columnas del CSV:
- paciente_columna: identificador único de paciente
- grupo_columna: etiqueta de clasificación (target)
- tiempo: instante temporal (en segundos)
- posicionX, posicionY, posicionZ: coordenadas espaciales
- anguloEulerX, anguloEulerY, anguloEulerZ: orientación en ángulos de Euler

Pasos incluidos:
1. Importar librerías
2. Cargar y ordenar datos
3. Calcular velocidad y aceleración
4. Normalizar features
5. Crear secuencias de longitud fija
6. Codificar etiquetas
7. Dividir en train y test
8. Definir y compilar modelo LSTM
9. Entrenar y guardar history
10. Graficar métricas de entrenamiento
11. Evaluar rendimiento y guardar modelo

In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.utils import to_categorical
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

In [2]:
# 1. Cargar datos
df = pd.read_csv('/content/tablagrupos_AB.csv')
# Mapear etiquetas A/B a 1/0 para clasificación binaria (A: pain-free, B:pain-affected)
df['grupo_columna'] = df['grupo_columna'].map({'A': 0, 'B': 1})

# 2. Ordenar por paciente y tiempo para cálculo coherente de derivadas
df.sort_values(by=['paciente_columna', 'tiempo'], inplace=True)

In [3]:
# Comprueba que ha cargado bien:
print(df.shape)
df.head()

(371860, 9)


Unnamed: 0,paciente_columna,grupo_columna,tiempo,posicionX,posicionY,posicionZ,anguloEulerX,anguloEulerY,anguloEulerZ
0,1,0,56466.6141,0.1376,-0.19134,0.18056,-80.576508,40.16388,-26.151448
1,1,0,56486.5546,0.1376,-0.19134,0.18056,-80.576508,40.16388,-26.151448
2,1,0,56506.3472,0.0806,-0.13934,0.14656,-84.195995,37.00198,-24.848048
3,1,0,56526.7568,0.0806,-0.13934,0.14656,-84.195995,37.00198,-24.848048
4,1,0,56547.534,0.0806,-0.13934,0.14656,-84.195995,37.00198,-24.848048


In [4]:
# 1.b. Configuración de frecuencia de muestreo conocida
SAMPLING_FREQ_HZ = 50  # Frecuencia conocida: 50 Hz
EXPECTED_INTERVAL_MS = 1000.0 / SAMPLING_FREQ_HZ  # 20 ms entre muestras

print(f"Frecuencia de muestreo configurada: {SAMPLING_FREQ_HZ} Hz")
print(f"Intervalo esperado entre muestras: {EXPECTED_INTERVAL_MS:.1f} ms")

# Verificación opcional de la frecuencia real en los datos
df['dt_ms'] = df.groupby('paciente_columna')['tiempo'].diff()
median_dt_ms = df['dt_ms'].median()
if not pd.isna(median_dt_ms) and median_dt_ms > 0:
    real_freq = 1000.0 / median_dt_ms  # Convertir ms a Hz
    print(f"Frecuencia real calculada de los datos: {real_freq:.1f} Hz")
    if abs(real_freq - SAMPLING_FREQ_HZ) > 5:  # Tolerancia de 5 Hz
        print(f"ADVERTENCIA: La frecuencia real ({real_freq:.1f} Hz) difiere significativamente de la esperada ({SAMPLING_FREQ_HZ} Hz)")
else:
    print("No se pudo calcular frecuencia real de los datos")

# Limpiar columna auxiliar
df.drop(columns='dt_ms', inplace=True)


Frecuencia de muestreo configurada: 50 Hz
Intervalo esperado entre muestras: 20.0 ms
Frecuencia real calculada de los datos: 50.0 Hz


In [5]:
# 2. Calcular velocidad y aceleración por paciente (convertir tiempo a segundos)
df['tiempo_s'] = df['tiempo'] / 1000.0  # Convertir ms a segundos para cálculos físicos

for axis in ['X', 'Y', 'Z']:
    df[f'vel_{axis}'] = df.groupby('paciente_columna')[f'posicion{axis}'].diff() \
                         / df.groupby('paciente_columna')['tiempo_s'].diff()
    df[f'acel_{axis}'] = df.groupby('paciente_columna')[f'vel_{axis}'].diff() \
                          / df.groupby('paciente_columna')['tiempo_s'].diff()

df.fillna(0, inplace=True)

In [6]:
print(df.shape)
df.head()

(371860, 16)


Unnamed: 0,paciente_columna,grupo_columna,tiempo,posicionX,posicionY,posicionZ,anguloEulerX,anguloEulerY,anguloEulerZ,tiempo_s,vel_X,acel_X,vel_Y,acel_Y,vel_Z,acel_Z
0,1,0,56466.6141,0.1376,-0.19134,0.18056,-80.576508,40.16388,-26.151448,56.466614,0.0,0.0,0.0,0.0,0.0,0.0
1,1,0,56486.5546,0.1376,-0.19134,0.18056,-80.576508,40.16388,-26.151448,56.486555,0.0,0.0,0.0,0.0,0.0,0.0
2,1,0,56506.3472,0.0806,-0.13934,0.14656,-84.195995,37.00198,-24.848048,56.506347,-2.879864,-145.502066,2.627245,132.738727,-1.717814,-86.790706
3,1,0,56526.7568,0.0806,-0.13934,0.14656,-84.195995,37.00198,-24.848048,56.526757,0.0,141.103412,0.0,-128.725919,0.0,84.166947
4,1,0,56547.534,0.0806,-0.13934,0.14656,-84.195995,37.00198,-24.848048,56.547534,0.0,0.0,0.0,0.0,0.0,0.0


In [7]:
# 4. Selección y normalización de features
features = [
    'posicionX','posicionY','posicionZ',
    'anguloEulerX','anguloEulerY','anguloEulerZ',
    'vel_X','vel_Y','vel_Z',
    'acel_X','acel_Y','acel_Z'
]
df[features] = StandardScaler().fit_transform(df[features])


In [8]:
# 4. Función mejorada para crear secuencias con filtro de pacientes
def create_sequences(data, seq_length, min_sequences=5):
    """
    Crea secuencias temporales filtrando pacientes con pocas muestras

    Args:
        data: DataFrame con los datos
        seq_length: longitud de la secuencia temporal
        min_sequences: número mínimo de secuencias que debe generar un paciente
    """
    Xs, ys = [], []
    patients_used = []
    patients_skipped = []

    for patient_id, group in data.groupby('paciente_columna'):
        vals = group[features].values
        labs = group['grupo_columna'].values

        # Verificar si el paciente tiene suficientes muestras
        if len(vals) < seq_length:
            patients_skipped.append(patient_id)
            continue

        sequences_from_patient = len(vals) - seq_length + 1

        # Verificar si genera suficientes secuencias
        if sequences_from_patient >= min_sequences:
            patients_used.append(patient_id)
            for i in range(sequences_from_patient):
                Xs.append(vals[i:i+seq_length])
                ys.append(labs[i+seq_length-1])
        else:
            patients_skipped.append(patient_id)

    print(f"Pacientes utilizados: {len(patients_used)}")
    print(f"Pacientes omitidos por pocas muestras: {len(patients_skipped)}")

    return np.array(Xs), np.array(ys), patients_used


In [9]:
# 5. Definir ventana de tiempo optimizada para 50 Hz y dolor cervical
window_seconds = 2  # 2 segundos de historia (óptimo para 50 Hz)
seq_len = int(window_seconds * SAMPLING_FREQ_HZ)  # 2s * 50Hz = 100 muestras

print(f"\n=== CONFIGURACIÓN DE SECUENCIAS ===")
print(f"Ventana temporal: {window_seconds} segundos")
print(f"Longitud de secuencia: {seq_len} muestras")
print(f"Duración real por secuencia: {seq_len / SAMPLING_FREQ_HZ:.1f} segundos")
print(f"Resolución temporal: {1000/SAMPLING_FREQ_HZ:.1f} ms por muestra")


=== CONFIGURACIÓN DE SECUENCIAS ===
Ventana temporal: 2 segundos
Longitud de secuencia: 100 muestras
Duración real por secuencia: 2.0 segundos
Resolución temporal: 20.0 ms por muestra


In [10]:
# 6. Análisis inicial de pacientes
pacientes = df['paciente_columna'].unique()
etiq_pac = df.groupby('paciente_columna')['grupo_columna'].first().loc[pacientes].values

print(f"\n=== ANÁLISIS DE DATOS ===")
print(f"Total de pacientes: {len(pacientes)}")
print(f"Total de muestras: {len(df)}")
print(f"Promedio de muestras por paciente: {len(df)/len(pacientes):.1f}")

print(f"\nDistribución de clases:")
unique_labels, counts = np.unique(etiq_pac, return_counts=True)
for label, count in zip(unique_labels, counts):
    print(f"  {label}: {count} pacientes ({count/len(pacientes)*100:.1f}%)")



=== ANÁLISIS DE DATOS ===
Total de pacientes: 87
Total de muestras: 371860
Promedio de muestras por paciente: 4274.3

Distribución de clases:
  0: 43 pacientes (49.4%)
  1: 44 pacientes (50.6%)


In [13]:
# 7. Función para entrenar y evaluar modelo
def train_evaluate_model(X_train, y_train, X_test, y_test, le):
    """Entrena y evalúa un modelo LSTM optimizado para 50 Hz"""

    # Codificar etiquetas
    y_train_enc = le.fit_transform(y_train)
    y_test_enc = le.transform(y_test)
    y_train_cat = to_categorical(y_train_enc)
    y_test_cat = to_categorical(y_test_enc)

    # Definir modelo optimizado para secuencias de 50 Hz
    model = Sequential([
        LSTM(128, input_shape=(seq_len, len(features)), return_sequences=True),
        #Dropout(0.3),
        LSTM(64, return_sequences=True),
        #Dropout(0.3),
        LSTM(32),
        #Dropout(0.3),
        Dense(64, activation='relu'),
        #Dropout(0.2),
        Dense(32, activation='relu'),
        Dense(y_train_cat.shape[1], activation='softmax')
    ])

    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    # Entrenar con más épocas para aprovechar la resolución temporal
    history = model.fit(
        X_train, y_train_cat,
        epochs=75,  # Más épocas para datos de 50 Hz
        batch_size=32,
        validation_split=0.2,
        verbose=0
    )

    # Evaluar
    loss, acc = model.evaluate(X_test, y_test_cat, verbose=0)

    # Predicciones para métricas detalladas
    y_pred = model.predict(X_test, verbose=0)
    y_pred_classes = np.argmax(y_pred, axis=1)
    y_true_classes = np.argmax(y_test_cat, axis=1)

    return acc, history, y_true_classes, y_pred_classes, model

In [None]:
# 8. Validación cruzada subject-wise
print(f"\n=== VALIDACIÓN CRUZADA SUBJECT-WISE (5-fold) ===")
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = []
all_y_true = []
all_y_pred = []

for fold, (train_idx, test_idx) in enumerate(skf.split(pacientes, etiq_pac)):
    print(f"\nFold {fold + 1}/5:")

    # Dividir pacientes
    train_pac_cv = pacientes[train_idx]
    test_pac_cv = pacientes[test_idx]

    # Crear conjuntos de datos
    df_train_cv = df[df['paciente_columna'].isin(train_pac_cv)]
    df_test_cv = df[df['paciente_columna'].isin(test_pac_cv)]

    # Crear secuencias
    X_train_cv, y_train_cv, _ = create_sequences(df_train_cv, seq_len)
    X_test_cv, y_test_cv, _ = create_sequences(df_test_cv, seq_len)

    if len(X_train_cv) == 0 or len(X_test_cv) == 0:
        print(f"  Fold {fold + 1} omitido: datos insuficientes")
        continue

    print(f"  Secuencias entrenamiento: {len(X_train_cv)}")
    print(f"  Secuencias test: {len(X_test_cv)}")

    # Entrenar y evaluar
    le_cv = LabelEncoder()
    acc, _, y_true_fold, y_pred_fold, _ = train_evaluate_model(
        X_train_cv, y_train_cv, X_test_cv, y_test_cv, le_cv
    )

    cv_scores.append(acc)
    all_y_true.extend(y_true_fold)
    all_y_pred.extend(y_pred_fold)

    print(f"  Precisión: {acc*100:.2f}%")

print(f"\n=== RESULTADOS VALIDACIÓN CRUZADA ===")
if len(cv_scores) > 0:
    print(f"Precisión promedio: {np.mean(cv_scores)*100:.2f}% ± {np.std(cv_scores)*100:.2f}%")
    print(f"Precisión por fold: {[f'{acc*100:.2f}%' for acc in cv_scores]}")
    print(f"Rango: {min(cv_scores)*100:.2f}% - {max(cv_scores)*100:.2f}%")
else:
  print("No se completó ningún fold de validación cruzada")

# 9. Métricas clínicas consolidadas de validación cruzada
if len(all_y_true) > 0:
    print(f"\n=== MÉTRICAS CLÍNICAS (Validación Cruzada) ===")
    le_final = LabelEncoder()
    le_final.fit(df['grupo_columna'])

    print("\nReporte de clasificación:")
    print(classification_report(all_y_true, all_y_pred,
                              target_names=le_final.classes_))

    print("\nMatriz de confusión:")
    cm = confusion_matrix(all_y_true, all_y_pred)
    print(cm)

    # Calcular métricas específicas para contexto médico
    if cm.size == 4:  # Clasificación binaria
        tn, fp, fn, tp = cm.ravel()
        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        print(f"\nMétricas clínicas:")
        print(f"  Sensibilidad (recall): {sensitivity:.3f}")
        print(f"  Especificidad: {specificity:.3f}")

# 10. Entrenamiento del modelo final con separación 70/30
print(f"\n=== MODELO FINAL (70/30 split) ===")
train_pac, test_pac = train_test_split(
    pacientes, test_size=0.3, random_state=42, stratify=etiq_pac
)

df_train = df[df['paciente_columna'].isin(train_pac)]
df_test = df[df['paciente_columna'].isin(test_pac)]

print(f"Pacientes en entrenamiento: {len(train_pac)}")
print(f"Pacientes en test: {len(test_pac)}")

# Verificar distribución temporal en train/test
print(f"\nDuración promedio por paciente:")
for split_name, df_split in [("Entrenamiento", df_train), ("Test", df_test)]:
    durations = []
    for patient_id, group in df_split.groupby('paciente_columna'):
        duration_ms = group['tiempo'].max() - group['tiempo'].min()
        durations.append(duration_ms / 1000.0)  # Convertir a segundos

    if durations:
        print(f"  {split_name}: {np.mean(durations):.1f}s ± {np.std(durations):.1f}s")

# Crear secuencias finales
X_train, y_train, patients_used_train = create_sequences(df_train, seq_len)
X_test, y_test, patients_used_test = create_sequences(df_test, seq_len)

print(f"\nSecuencias de entrenamiento: {len(X_train)}")
print(f"Secuencias de test: {len(X_test)}")
print(f"Ratio secuencias/paciente entrenamiento: {len(X_train)/len(patients_used_train):.1f}")
print(f"Ratio secuencias/paciente test: {len(X_test)/len(patients_used_test):.1f}")

# Entrenar modelo final
if len(X_train) > 0 and len(X_test) > 0:
    le_final = LabelEncoder()
    final_acc, history, y_true_final, y_pred_final, final_model = train_evaluate_model(
        X_train, y_train, X_test, y_test, le_final
    )

    print(f"\nPrecisión del modelo final: {final_acc*100:.2f}%")

    # 11. Métricas detalladas del modelo final
    print(f"\n=== MÉTRICAS FINALES ===")
    print("\nReporte de clasificación:")
    print(classification_report(y_true_final, y_pred_final,
                              target_names=le_final.classes_))

    print("\nMatriz de confusión:")
    cm_final = confusion_matrix(y_true_final, y_pred_final)
    print(cm_final)

# 12. Graficar métricas de entrenamiento
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.plot(history.history['accuracy'], label='Entrenamiento', linewidth=2)
    plt.plot(history.history['val_accuracy'], label='Validación', linewidth=2)
    plt.title('Precisión por época\n(50 Hz, 2s ventana)', fontsize=12)
    plt.xlabel('Época')
    plt.ylabel('Precisión')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 3, 2)
    plt.plot(history.history['loss'], label='Entrenamiento', linewidth=2)
    plt.plot(history.history['val_loss'], label='Validación', linewidth=2)
    plt.title('Pérdida por época\n(50 Hz, 2s ventana)', fontsize=12)
    plt.xlabel('Época')
    plt.ylabel('Pérdida')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.subplot(1, 3, 3)
    plt.bar(range(len(cv_scores)), [s*100 for s in cv_scores], alpha=0.7)
    plt.title('Precisión por Fold\n(Validación Cruzada)', fontsize=12)
    plt.xlabel('Fold')
    plt.ylabel('Precisión (%)')
    plt.ylim(0, 100)
    plt.grid(True, alpha=0.3)

    for i, score in enumerate(cv_scores):
        plt.text(i, score*100 + 1, f'{score*100:.1f}%', ha='center', fontsize=10)

    plt.tight_layout()
    plt.show()

  # 13. Resumen final optimizado para 50 Hz
    print(f"\n=== RESUMEN FINAL (OPTIMIZADO 50 Hz) ===")
    print(f"Configuración temporal:")
    print(f"  • Frecuencia de muestreo: {SAMPLING_FREQ_HZ} Hz")
    print(f"  • Ventana temporal: {window_seconds} segundos")
    print(f"  • Longitud de secuencia: {seq_len} muestras")
    print(f"  • Resolución: {1000/SAMPLING_FREQ_HZ:.1f} ms por muestra")

    print(f"\nConfiguración del modelo:")
    print(f"  • Features utilizadas: {len(features)}")
    print(f"  • Arquitectura: 3 capas LSTM (128→64→32) + 2 Dense")
    print(f"  • Épocas de entrenamiento: 75")

    print(f"\nResultados:")
    if len(cv_scores) > 0:
        print(f"  • Precisión validación cruzada: {np.mean(cv_scores)*100:.2f}% ± {np.std(cv_scores)*100:.2f}%")
    print(f"  • Precisión modelo final: {final_acc*100:.2f}%")
    print(f"  • Pacientes procesados: {len(patients_used_train) + len(patients_used_test)}")
    print(f"  • Secuencias totales generadas: {len(X_train) + len(X_test)}")

    print(f"\nOptimizaciones para 50 Hz:")
    print(f"  ✓ Ventana de 2s captura patrones de movimiento cervical")
    print(f"  ✓ 100 muestras por secuencia proporcionan resolución adecuada")
    print(f"  ✓ Arquitectura LSTM adaptada para datos de alta frecuencia")
    print(f"  ✓ Tiempo convertido correctamente de ms a segundos para cálculos físicos")

else:
    print("No se pudieron generar suficientes secuencias para entrenar el modelo")
    print("Verifica que los pacientes tengan suficientes muestras temporales")


=== VALIDACIÓN CRUZADA SUBJECT-WISE (5-fold) ===

Fold 1/5:
Pacientes utilizados: 69
Pacientes omitidos por pocas muestras: 0
Pacientes utilizados: 18
Pacientes omitidos por pocas muestras: 0
  Secuencias entrenamiento: 283675
  Secuencias test: 79572
