In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tslearn.clustering import KShape, TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.utils import to_time_series_dataset
from sklearn.model_selection import train_test_split, KFold, ParameterGrid
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from scipy.signal import butter, filtfilt
from scipy.stats import mode

# Load the CSV and transpose it so that each row of `data` is one record:
# column 0 is used as the label and the remaining columns as the signal.
data = np.transpose(
    np.loadtxt(
        "/mnt/DGX0Raid/databases/bonn_dataset_columns_correct.csv",
        delimiter=",",
    )[:, :]
)
dataDF = pd.DataFrame(data=data)
y = data[:, 0]
# tslearn dataset built from the signal columns only (label column excluded).
X = to_time_series_dataset(data[:, 1:])

# Lazily truncate every EEG signal to its leading time window.
def extract_21_seconds_generator(X, fs=173.61, window_duration=8):
    """
    Yield the first `window_duration` seconds of each signal in `X`.

    Despite the "21 seconds" in the name, the window length is whatever
    `window_duration` says (8 seconds by default here).

    Parameters
    ----------
    X : iterable of sliceable signals.
    fs : sampling frequency, in samples per second.
    window_duration : length of the window to keep, in seconds.

    Yields
    ------
    The leading int(window_duration * fs) samples of each signal; a signal
    shorter than that is yielded whole, because slicing does not pad.
    """
    # Seconds -> number of samples (fractional part is truncated).
    cutoff_index = int(fs * window_duration)
    yield from (series[:cutoff_index] for series in X)

# Hold out 20% of the recordings as a test set (fixed seed for reproducibility).
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Materialise the truncated signals (the window is the default of
# extract_21_seconds_generator, not necessarily 21 s) and drop the trailing
# singleton axis so that each row is a plain 1-D signal.
X_train_extracted = np.squeeze(
    np.array([*extract_21_seconds_generator(X_train)]), axis=-1
)
X_test_extracted = np.squeeze(
    np.array([*extract_21_seconds_generator(X_test)]), axis=-1
)

# Butterworth low-pass filter applied forward and backward.
def low_pass_filter(signal, cutoff=40.0, fs=173.61, order=5):
    """
    Low-pass filter `signal` with a digital Butterworth filter.

    Parameters
    ----------
    signal : array-like samples to filter.
    cutoff : cutoff frequency, in the same unit as `fs`.
    fs : sampling frequency.
    order : order of the Butterworth design.

    Returns
    -------
    The output of `filtfilt`, i.e. the signal filtered once forward and once
    backward with the designed coefficients.
    """
    # butter() expects the cutoff as a fraction of the Nyquist frequency.
    normalized_cutoff = cutoff / (0.5 * fs)
    numerator, denominator = butter(
        order, normalized_cutoff, btype='low', analog=False
    )
    return filtfilt(numerator, denominator, signal)

# Low-pass filter settings shared by the train and test splits.
fs = 173.61
cutoff_freq = 40.0

# Filter every EEG signal individually.
X_train_filtered = np.array(
    [low_pass_filter(row, cutoff=cutoff_freq, fs=fs) for row in X_train_extracted]
)
X_test_filtered = np.array(
    [low_pass_filter(row, cutoff=cutoff_freq, fs=fs) for row in X_test_extracted]
)

# The only normalisation step: tslearn's mean/variance scaler (mu=0, std=1).
scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.)
X_train_scaled = scaler.fit_transform(X_train_filtered)
X_test_scaled = scaler.transform(X_test_filtered)

# Translate cluster ids into class labels by majority vote.
def map_clusters_to_labels(clusters, true_labels):
    """
    Assign to every cluster the most frequent true label among its members.

    Parameters
    ----------
    clusters : array of cluster ids, one per sample.
    true_labels : true labels, one per sample; cast to int before voting.

    Returns
    -------
    (mapped, label_map) where `mapped` is an array holding the voted label
    of each sample's cluster and `label_map` is the {cluster id: label} dict.
    """
    labels_int = np.array(true_labels, dtype=int)
    label_map = {
        cluster_id: mode(labels_int[clusters == cluster_id], keepdims=True).mode[0]
        for cluster_id in np.unique(clusters)
    }
    mapped = np.array([label_map[cluster_id] for cluster_id in clusters])
    return mapped, label_map

# 3) Hyper-parameter grid explored for TimeSeriesKMeans.
param_grid = dict(
    n_clusters=[2],
    metric=["euclidean", "dtw"],
    n_init=[1, 3, 5, 7, 10],
    tol=[1e-4, 1e-5, 1e-6, 1e-7, 1e-8],
)

# 10-fold cross-validation carried out inside the training split only.
kf = KFold(n_splits=10, shuffle=True, random_state=42)

best_cv_score, best_params = -np.inf, None

print("Iniciando validación cruzada sobre el grid de parámetros...")
for params in ParameterGrid(param_grid):
    print(f"\nProbando configuración: {params}")
    cv_scores = []
    for train_index, val_index in kf.split(X_train_scaled):
        X_cv_train, X_cv_val = X_train_scaled[train_index], X_train_scaled[val_index]
        # y_cv_train is not needed by the unsupervised fit; kept for inspection.
        y_cv_train, y_cv_val = y_train[train_index], y_train[val_index]

        # Fit TimeSeriesKMeans (not KShape) with the current configuration.
        model = TimeSeriesKMeans(
            n_clusters=params['n_clusters'],
            metric=params['metric'],
            n_init=params['n_init'],
            tol=params['tol'],
            random_state=0,
        )
        model.fit(X_cv_train)

        # Score the validation fold. The cluster -> label mapping is voted
        # on this same fold's labels.
        y_pred_val = model.predict(X_cv_val)
        mapped_val, _ = map_clusters_to_labels(y_pred_val, y_cv_val)
        cv_acc = accuracy_score(y_cv_val, mapped_val)
        cv_scores.append(cv_acc)

    mean_cv_score = np.mean(cv_scores)
    print(f"Configuración: {params} -> CV Accuracy: {mean_cv_score:.4f}")
    # Strict comparison: on ties the first configuration encountered wins.
    if mean_cv_score > best_cv_score:
        best_cv_score, best_params = mean_cv_score, params

print("\n====== Mejor configuración (CV) ======")
print("Mejor configuración:", best_params)
print("Mejor CV Accuracy:", best_cv_score)

# Refit on the whole training split with the configuration selected by CV.
# The grid search above cross-validates TimeSeriesKMeans (including the
# `metric` hyper-parameter), so the final model must be the same estimator;
# refitting a KShape here would silently discard `metric` and evaluate a
# different algorithm from the one that was selected.
best_model = TimeSeriesKMeans(n_clusters=best_params['n_clusters'],
                              metric=best_params['metric'],
                              n_init=best_params['n_init'],
                              tol=best_params['tol'],
                              random_state=0)
best_model.fit(X_train_scaled)

# NOTE(review): in both evaluations below the cluster -> label mapping is
# voted on the labels of the very set being scored, so the reported
# accuracy is a purity-style figure rather than a held-out prediction score.

# Evaluate on the training split.
y_pred_train = best_model.predict(X_train_scaled)
mapped_train, _ = map_clusters_to_labels(y_pred_train, y_train)
train_acc = accuracy_score(y_train, mapped_train)
print("\n====== Evaluación en el conjunto de entrenamiento ======")
print("Train Accuracy:", train_acc)
print("\nReporte de clasificación (Entrenamiento):")
print(classification_report(y_train, mapped_train))
cm_train = confusion_matrix(y_train, mapped_train)
print("\nMatriz de confusión (Entrenamiento):")
print(cm_train)

# Evaluate on the held-out test split.
y_pred_test = best_model.predict(X_test_scaled)
mapped_test, _ = map_clusters_to_labels(y_pred_test, y_test)
test_acc = accuracy_score(y_test, mapped_test)
print("\n====== Evaluación en el conjunto de prueba ======")
print("Test Accuracy:", test_acc)
print("\nReporte de clasificación (Prueba):")
print(classification_report(y_test, mapped_test))
cm_test = confusion_matrix(y_test, mapped_test)
print("\nMatriz de confusión (Prueba):")
print(cm_test)

Iniciando validación cruzada sobre el grid de parámetros...

Probando configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 0.0001}
Configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 0.0001} -> CV Accuracy: 0.7750

Probando configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-05}
Configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-05} -> CV Accuracy: 0.7750

Probando configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-06}
Configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-06} -> CV Accuracy: 0.7750

Probando configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-07}
Configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-07} -> CV Accuracy: 0.7750

Probando configuración: {'metric': 'euclidean', 'n_clusters': 2, 'n_init': 1, 'tol': 1e-08}
Configuración: {'metric': 'euclidean', 'n_cluster

KeyboardInterrupt: 

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tslearn.clustering import KShape
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.utils import to_time_series_dataset
from sklearn.model_selection import train_test_split, KFold, ParameterGrid
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from scipy.signal import butter, filtfilt
from scipy.stats import mode

# Read the CSV and transpose it: afterwards each row of `data` is one record
# whose first entry is the label and whose remaining entries are the signal.
data = np.loadtxt(
    "/mnt/DGX0Raid/databases/bonn_dataset_columns_correct.csv",
    delimiter=",",
)[:, :].T
dataDF = pd.DataFrame(data=data)
y = data[:, 0]
# Signal columns only, wrapped as a tslearn time-series dataset.
X = to_time_series_dataset(data[:, 1:])

# Generator that keeps only the leading time window of each EEG signal.
def extract_21_seconds_generator(X, fs=173.61, window_duration=5):
    """
    Yield the first `window_duration` seconds of every signal in `X`.

    The function name mentions 21 seconds, but the window actually kept is
    `window_duration` (5 seconds by default in this cell).

    Parameters
    ----------
    X : iterable of sliceable signals.
    fs : sampling frequency, in samples per second.
    window_duration : number of seconds to keep from the start.

    Yields
    ------
    Each signal cut to its first int(window_duration * fs) samples; shorter
    signals come out unchanged.
    """
    # One reusable slice object covering the leading window.
    leading_window = slice(int(window_duration * fs))

    for recording in X:
        yield recording[leading_window]

# 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Truncate every signal to the generator's default window and drop the
# trailing singleton axis.
X_train_extracted = np.squeeze(
    np.array([*extract_21_seconds_generator(X_train)]), axis=-1
)
X_test_extracted = np.squeeze(
    np.array([*extract_21_seconds_generator(X_test)]), axis=-1
)

# Mean/variance scaling is the only preprocessing in this cell: the signals
# are scaled directly after truncation, with no filtering step in between.
scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.)
X_train_scaled = scaler.fit_transform(X_train_extracted)
X_test_scaled = scaler.transform(X_test_extracted)

# Majority-vote mapping from cluster ids to true class labels.
def map_clusters_to_labels(clusters, true_labels):
    """
    Relabel clusters with the most common true label of their members.

    Parameters
    ----------
    clusters : array of cluster ids, one per sample.
    true_labels : true labels aligned with `clusters`; cast to int first.

    Returns
    -------
    (mapped, label_map): the per-sample voted labels as an array, and the
    {cluster id: voted label} dictionary. When several labels are equally
    frequent inside a cluster, the smallest one is chosen.
    """
    true_labels = np.array(true_labels, dtype=int)
    label_map = {}
    for cluster in np.unique(clusters):
        # np.unique returns sorted values, and argmax picks the first
        # maximum, so ties resolve to the smallest label.
        candidates, votes = np.unique(
            true_labels[clusters == cluster], return_counts=True
        )
        label_map[cluster] = candidates[np.argmax(votes)]
    mapped = np.array([label_map[member] for member in clusters])
    return mapped, label_map

# Hyper-parameter grid explored for KShape.
param_grid = dict(
    n_clusters=[2],
    n_init=[1, 3, 5, 7, 10],
    tol=[1e-4, 1e-5, 1e-6, 1e-7, 1e-8],
)

# 10-fold cross-validation restricted to the training split.
kf = KFold(n_splits=10, shuffle=True, random_state=42)

best_cv_score, best_params = -np.inf, None

print("Iniciando validación cruzada sobre el grid de parámetros...")
for params in ParameterGrid(param_grid):
    print(f"\nProbando configuración: {params}")
    cv_scores = []
    for train_index, val_index in kf.split(X_train_scaled):
        X_cv_train, X_cv_val = X_train_scaled[train_index], X_train_scaled[val_index]
        # y_cv_train is not used by the unsupervised fit; kept for inspection.
        y_cv_train, y_cv_val = y_train[train_index], y_train[val_index]

        # Fit KShape with the configuration under test.
        model = KShape(
            n_clusters=params['n_clusters'],
            n_init=params['n_init'],
            tol=params['tol'],
            random_state=0,
        )
        model.fit(X_cv_train)

        # Score the validation fold. The cluster -> label mapping is voted
        # on this same fold's labels.
        y_pred_val = model.predict(X_cv_val)
        mapped_val, _ = map_clusters_to_labels(y_pred_val, y_cv_val)
        cv_acc = accuracy_score(y_cv_val, mapped_val)
        cv_scores.append(cv_acc)

    mean_cv_score = np.mean(cv_scores)
    print(f"Configuración: {params} -> CV Accuracy: {mean_cv_score:.4f}")
    # Strict comparison: on ties the first configuration encountered wins.
    if mean_cv_score > best_cv_score:
        best_cv_score, best_params = mean_cv_score, params

print("\n====== Mejor configuración (CV) ======")
print("Mejor configuración:", best_params)
print("Mejor CV Accuracy:", best_cv_score)

# Refit KShape on the full training split with the CV-selected configuration.
best_model = KShape(
    n_clusters=best_params['n_clusters'],
    n_init=best_params['n_init'],
    tol=best_params['tol'],
    random_state=0,
)
best_model.fit(X_train_scaled)

# NOTE(review): each mapping below is voted on the labels of the set being
# scored, so these accuracies are purity-style figures.

# --- Training split ---
y_pred_train = best_model.predict(X_train_scaled)
mapped_train, _ = map_clusters_to_labels(y_pred_train, y_train)
train_acc = accuracy_score(y_train, mapped_train)
print("\n====== Evaluación en el conjunto de entrenamiento ======")
print("Train Accuracy:", train_acc)
print("\nReporte de clasificación (Entrenamiento):")
print(classification_report(y_train, mapped_train))
cm_train = confusion_matrix(y_train, mapped_train)
print("\nMatriz de confusión (Entrenamiento):")
print(cm_train)

# --- Test split ---
y_pred_test = best_model.predict(X_test_scaled)
mapped_test, _ = map_clusters_to_labels(y_pred_test, y_test)
test_acc = accuracy_score(y_test, mapped_test)
print("\n====== Evaluación en el conjunto de prueba ======")
print("Test Accuracy:", test_acc)
print("\nReporte de clasificación (Prueba):")
print(classification_report(y_test, mapped_test))
cm_test = confusion_matrix(y_test, mapped_test)
print("\nMatriz de confusión (Prueba):")
print(cm_test)


Iniciando validación cruzada sobre el grid de parámetros...

Probando configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 0.0001}
Configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 0.0001} -> CV Accuracy: 0.8075

Probando configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-05}
Configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-05} -> CV Accuracy: 0.8125

Probando configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-06}
Configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-06} -> CV Accuracy: 0.8125

Probando configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-07}
Configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-07} -> CV Accuracy: 0.8125

Probando configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-08}
Configuración: {'n_clusters': 2, 'n_init': 1, 'tol': 1e-08} -> CV Accuracy: 0.8125

Probando configuración: {'n_clusters': 2, 'n_init': 3, 'tol': 0.0001}
Configuración: {'n_clusters': 2, 'n_init': 3, 'tol': 0.0001} -> CV Accuracy: 0.8175

Probando configur

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tslearn.clustering import KShape
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
from tslearn.utils import to_time_series_dataset
from sklearn.model_selection import train_test_split, KFold, ParameterGrid
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report, adjusted_rand_score
from scipy.signal import butter, filtfilt
from scipy.stats import mode

# ===================== LOADING AND PREPROCESSING =====================
# Read the CSV and transpose it so that each row is one record: label first,
# signal samples afterwards.
data = np.transpose(
    np.loadtxt(
        "/mnt/DGX0Raid/databases/bonn_dataset_columns_correct.csv",
        delimiter=",",
    )
)
y = data[:, 0].astype(int)
X_raw = data[:, 1:]

# Wrap the raw signals as a tslearn time-series dataset.
X = to_time_series_dataset(X_raw)

# 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)

# Keep only the first 21 seconds (by default) of every signal.
def extract_21_seconds_generator(X, fs=173.61, window_duration=21):
    """
    Yield the first `window_duration` seconds of each signal in `X`.

    `fs` is the sampling frequency in samples per second; the window holds
    int(window_duration * fs) samples, and shorter signals are yielded whole.
    """
    window = slice(None, int(window_duration * fs))
    yield from (trace[window] for trace in X)

# Materialise the truncated signals and drop the trailing singleton axis.
X_train_extracted = np.squeeze(
    np.array([*extract_21_seconds_generator(X_train)]), axis=-1
)
X_test_extracted = np.squeeze(
    np.array([*extract_21_seconds_generator(X_test)]), axis=-1
)

# Low-pass filter
def low_pass_filter(signal, cutoff=40.0, fs=173.61, order=5):
    """
    Apply a digital Butterworth low-pass filter to `signal` via `filtfilt`.

    Parameters
    ----------
    signal : array-like samples to filter.
    cutoff : cutoff frequency, expressed in the same unit as `fs`.
    fs : sampling frequency.
    order : order of the Butterworth design.

    Returns
    -------
    The filtered signal returned by `filtfilt` (forward-backward filtering).
    """
    # Express the cutoff relative to the Nyquist frequency, as butter() expects.
    wn = cutoff / (0.5 * fs)
    coeffs = butter(order, wn, btype='low', analog=False)
    return filtfilt(*coeffs, signal)

# Filter every signal with the default low_pass_filter settings.
X_train_filtered = np.array([low_pass_filter(row) for row in X_train_extracted])
X_test_filtered = np.array([low_pass_filter(row) for row in X_test_extracted])

# Scaling: tslearn's mean/variance scaler targeting mu=0, std=1.
scaler = TimeSeriesScalerMeanVariance(mu=0., std=1.)
X_train_scaled = scaler.fit_transform(X_train_filtered)
X_test_scaled = scaler.transform(X_test_filtered)

# ===================== MAPPING FUNCTION =====================
def map_clusters_to_labels(clusters, true_labels):
    """
    Map each cluster id to the most frequent true label inside that cluster.

    Parameters
    ----------
    clusters : array of cluster ids, one per sample.
    true_labels : true labels aligned with `clusters`; cast to int first.

    Returns
    -------
    (mapped, label_map): an array with the voted label for every sample,
    and the {cluster id: voted label} dictionary used to build it.
    """
    true_labels = np.array(true_labels, dtype=int)
    cluster_ids = np.unique(clusters)
    # One majority vote per distinct cluster id.
    majorities = [
        mode(true_labels[clusters == cid], keepdims=True).mode[0]
        for cid in cluster_ids
    ]
    label_map = dict(zip(cluster_ids, majorities))
    return np.array([label_map[cid] for cid in clusters]), label_map

# ===================== CROSS-VALIDATION =====================
# 10-fold CV inside the training split for every KShape configuration in the
# grid. Each configuration is scored by its mean validation accuracy and the
# best one is kept, so the final model uses the hyper-parameters that were
# actually selected rather than hard-coded values.
kf = KFold(n_splits=10, shuffle=True, random_state=42)
param_grid = {'n_clusters': [2], 'n_init': [3, 5, 6, 7, 10], 'tol': [1e-3, 1e-5, 1e-6, 1e-7, 1e-8]}

fold_results = []
best_cv_score = -np.inf
best_params = None

print("Validación cruzada...\n")
for params in ParameterGrid(param_grid):
    print(f"Probando configuración: {params}")
    config_accuracies = []
    for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_scaled), 1):
        X_tr, X_val = X_train_scaled[train_idx], X_train_scaled[val_idx]
        # Only the validation labels are needed: the fit is unsupervised.
        y_val = y_train[val_idx]

        model = KShape(**params, random_state=0)
        model.fit(X_tr)

        # The cluster -> label mapping is voted on this fold's own labels.
        y_val_pred = model.predict(X_val)
        mapped_val, _ = map_clusters_to_labels(y_val_pred, y_val)

        # Store the configuration with each row so every fold result can
        # be traced back to the hyper-parameters that produced it.
        metrics = {
            **params,
            'fold': fold,
            'accuracy': accuracy_score(y_val, mapped_val),
            'precision': precision_score(y_val, mapped_val, average='macro'),
            'recall': recall_score(y_val, mapped_val, average='macro'),
            'f1': f1_score(y_val, mapped_val, average='macro'),
            'ari': adjusted_rand_score(y_val, y_val_pred)
        }
        fold_results.append(metrics)
        config_accuracies.append(metrics['accuracy'])
        print(f"Fold {fold} - Acc: {metrics['accuracy']:.3f}, F1: {metrics['f1']:.3f}, ARI: {metrics['ari']:.3f}")

    mean_cv_score = np.mean(config_accuracies)
    print(f"Configuración: {params} -> CV Accuracy: {mean_cv_score:.4f}")
    # Strict comparison: on ties the first configuration encountered wins.
    if mean_cv_score > best_cv_score:
        best_cv_score = mean_cv_score
        best_params = params

# Results table. The summary is restricted to the metric columns so that the
# fold index and the hyper-parameter columns are not averaged.
results_df = pd.DataFrame(fold_results)
metric_columns = ['accuracy', 'precision', 'recall', 'f1', 'ari']
print("\nResumen de validación cruzada:")
print(results_df[metric_columns].describe().loc[['mean', 'std']])

print("\n====== Mejor configuración (CV) ======")
print("Mejor configuración:", best_params)
print("Mejor CV Accuracy:", best_cv_score)

# ===================== FINAL MODEL =====================
# Refit on the whole training split with the configuration chosen by CV.
model = KShape(**best_params, random_state=0)
model.fit(X_train_scaled)

# ===================== FINAL EVALUATION =====================
def evaluate_and_print(X_set, y_true, model, set_name=""):
    """
    Score `model` on one data split, print a report and return the metrics.

    Parameters
    ----------
    X_set : samples passed to `model.predict`.
    y_true : true labels aligned with `X_set`.
    model : fitted estimator exposing `predict`.
    set_name : name of the split, used only in the printed header.

    Returns
    -------
    (accuracy, precision, recall, f1, ari, mapped_pred). Precision, recall
    and F1 are macro-averaged; ARI is computed on the raw cluster ids, the
    other metrics on the labels voted by `map_clusters_to_labels`.

    NOTE(review): the cluster -> label mapping is voted on `y_true` itself,
    so the label-based metrics are purity-style figures for this split.
    """
    cluster_ids = model.predict(X_set)
    mapped_pred, _ = map_clusters_to_labels(cluster_ids, y_true)

    accuracy = accuracy_score(y_true, mapped_pred)
    precision, recall, f1_macro = (
        scorer(y_true, mapped_pred, average='macro')
        for scorer in (precision_score, recall_score, f1_score)
    )
    rand_index = adjusted_rand_score(y_true, cluster_ids)

    print(f"\n====== Evaluación en {set_name} ======")
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1 Score:  {f1_macro:.4f}")
    print(f"ARI:       {rand_index:.4f}")
    print("\nMatriz de confusión:")
    print(confusion_matrix(y_true, mapped_pred))

    return accuracy, precision, recall, f1_macro, rand_index, mapped_pred

# Report the final model on both splits.
train_metrics = evaluate_and_print(X_train_scaled, y_train, model, "train")
test_metrics = evaluate_and_print(X_test_scaled, y_test, model, "test")

# ===================== VISUALISATION =====================
# Cluster centres found by the final model, one line per cluster.
plt.figure(figsize=(10, 5))
for i, center in enumerate(model.cluster_centers_):
    plt.plot(center.ravel(), label=f'Centro cluster {i}')
plt.title("Centros de los clusters (KShape)")
plt.xlabel("Tiempos")
plt.ylabel("Amplitud")
plt.legend()
plt.grid()
plt.tight_layout()
plt.show()

# Individual training signals per cluster (at most 5 per cluster).
assignments = model.predict(X_train_scaled)
for cluster_id in np.unique(assignments):
    signals = X_train_scaled[assignments == cluster_id]
    plt.figure(figsize=(10, 4))
    for i, member in enumerate(signals[:5]):
        plt.plot(member.ravel(), alpha=0.6, label=f'Señal {i+1}')
    plt.title(f"Señales en cluster {cluster_id}")
    plt.xlabel("Tiempos")
    plt.ylabel("Amplitud")
    # The lines carry labels, so draw the legend; without this call the
    # labels passed to plt.plot are never displayed.
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.show()
