In [97]:
from io import StringIO

import matplotlib.pyplot as plt
from scipy.io import arff
import seaborn as sns
from loguru import logger
import yaml

from datetime import datetime
import polars as pl
import pandas as pd
import numpy as np
import sys
import os
from pathlib import Path
sys.path.append(str(Path.cwd().parent))

# MODEL
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import plot_model
from sklearn.preprocessing import (
    LabelEncoder, 
    StandardScaler,
    label_binarize
)
from sklearn.metrics import (
    classification_report, 
    confusion_matrix, 
    accuracy_score,
    precision_recall_fscore_support,
    balanced_accuracy_score,
    roc_auc_score,
    roc_curve
)

# Save variables for model
import joblib

# PERSONAL FUNCTIONS
from utils import *
from models.main import *
from models.optimizer import ViterbiLiteDecoder
from functions.windows import create_feature_windows # creación de ventanas e ingenieria de características

In [None]:
# resultados = { 'Sit': [], 'Stand': [], 'Type': []}

In [None]:
# Show a shallow copy of the result tuples stored for the current target.
# NOTE(review): in the visible notebook the `resultados` initialiser is commented
# out and `target` is only assigned in a later cell, so this cell depends on
# state that is already present in the kernel.
resultados[target][:]

[(0.0, 0.0, 0.0),
 (0.11764705882352941, 0.0, -0.11764705882352941),
 (0.058823529411764705, 0.0, -0.058823529411764705),
 (0.0, 0.0, 0.0),
 (0.0, 0.0, 0.0),
 (0.0, 0.0, 0.0),
 (0.0, 0.0, 0.0),
 (0.0, 0.0, 0.0),
 (0.0, 0.0, 0.0)]

In [None]:
# Inspect the entries stored under the 'Type' key. The last cell of the notebook
# appends (original accuracy, Viterbi accuracy, improvement) tuples to
# resultados[target].
resultados['Type']

{'Sit': [(0.08333333333333333, 0.0, -0.08333333333333333),
  (0.058823529411764705, 0.0, -0.058823529411764705),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.25, 0.25, 0.0),
  (0.0, 0.0, 0.0),
  (0.25, 0.2916666666666667, 0.041666666666666685),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0)],
 'Stand': [(0.14285714285714285, 0.19047619047619047, 0.047619047619047616),
  (0.06666666666666667, 0.0, -0.06666666666666667),
  (0.0, 0.0, 0.0),
  (0.125, 0.125, 0.0),
  (0.058823529411764705, 0.0, -0.058823529411764705),
  (0.0, 0.0, 0.0),
  (0.5333333333333333, 0.5333333333333333, 0.0),
  (0.11764705882352941, 0.0, -0.11764705882352941)],
 'Type': [(0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.0, 0.0, 0.0),
  (0.11764705882352941, 0.0, -0.11764705882352941),
  (0.058823529411764705, 0.0, -0.058823529411764705),
  (0.0, 0.0

In [1470]:
# NOTE(review): `num` is not referenced by any other visible cell — presumably a
# leftover recording number; confirm before removing.
num = "09"

In [1522]:
# Recording to evaluate. Only `data` (and DATA_DIR when switching folders) needs
# editing between runs: the JSON path is derived from the recording name, so the
# two can no longer get out of sync.
# NOTE: `data` is rebound to the parsed JSON payload by the loading cell below.
DATA_DIR = Path(r"F:\UPC\Tesis\HARbit-Model\src\data\real-data\type-data")
data = "type_left_04"
path = str(DATA_DIR / f"{data}.json")

In [1523]:
import json
import os

# Read the recording as raw bytes and let json decode them.
with open(path, 'rb') as file:
    data = json.loads(file.read())

# Raw per-sensor payloads, taken from the 'gyro' and 'accel' keys of the JSON.
gyro_df, accel_df = data['gyro'], data['accel']

In [1524]:
# Ground-truth activity of this recording: it is written into the 'gt' column of
# both sensor frames and used as the key into `resultados`.
target = "Type"

In [1525]:
# Wrap each raw sensor payload in a polars DataFrame.
accel_temp, gyro_temp = (pl.DataFrame(samples) for samples in (accel_df, gyro_df))

In [1526]:
# Tag every sample of both sensors with the constant user id 'A'.
accel_temp, gyro_temp = (
    frame.with_columns(pl.lit('A').alias('Usuario'))
    for frame in (accel_temp, gyro_temp)
)

In [1527]:
# Label every sample of both sensors with the recording's target activity.
accel_temp, gyro_temp = (
    frame.with_columns(pl.lit(target).alias('gt'))
    for frame in (accel_temp, gyro_temp)
)

In [1528]:
# Both sensors share the same raw column names, so one mapping serves both calls.
raw_column_names = dict(
    user_col_name="Usuario",
    timestamp_col_name="timestamp",
    label_col_name="gt",
    x_col_name="x",
    y_col_name="y",
    z_col_name="z",
)

df_accel = normalize_columns(accel_temp, **raw_column_names)
df_gyro = normalize_columns(gyro_temp, **raw_column_names)

In [1529]:
# df_all_sensors = df_gyro.join(df_accel, on = ['Subject-id', 'Timestamp', 'Activity Label'], how = "inner")

In [1530]:
# df_gyro = df_all_sensors.select([
#     'Subject-id',
#     'Timestamp',
#     'Activity Label',
#     'X',
#     'Y',
#     'Z'
# ])

# df_accel = df_all_sensors.select([
#     'Subject-id',
#     'Timestamp',
#     'Activity Label',
#     pl.col('X_right').alias('X'),
#     pl.col('Y_right').alias('Y'),
#     pl.col('Z_right').alias('Z')
# ])

In [1531]:
# Apply the project's timestamp conversion to both sensor frames.
# NOTE(review): the results are bound back to the same names, so re-running this
# cell passes already-converted frames to convert_timestamp again — confirm that
# the helper tolerates being applied twice.
df_accel = convert_timestamp(df_accel)
df_gyro = convert_timestamp(df_gyro)

In [1532]:
# df_all_sensors = df_gyro.join(df_accel, on = ['Subject-id', 'Timestamp', 'Activity Label'], how = "inner")

In [1533]:
# df_gyro = df_all_sensors.select(
#     pl.col('Subject-id'),
#     pl.col('Timestamp'),
#     pl.col('Activity Label'),
#     pl.col('X'),
#     pl.col('Y'),
#     pl.col('Z')
# )

# df_accel = df_all_sensors.select(
#     pl.col('Subject-id'),
#     pl.col('Timestamp'),
#     pl.col('Activity Label'),
#     pl.col('X_right').alias('X'),
#     pl.col('Y_right').alias('Y'),
#     pl.col('Z_right').alias('Z')
# )

In [1534]:
def plot_axes_by_activity(df_gyro, df_accel, max_samples_per_activity=1000, figsize=(15, 12)):
    """
    Plot the X, Y and Z axes of the gyroscope and the accelerometer per activity.

    One subplot row is drawn for each distinct value of
    ``df_gyro['Activity Label']``: gyroscope on the left, accelerometer on the
    right. The figure is shown with ``plt.show()``; nothing is returned.

    Parameters
    ----------
    df_gyro, df_accel : DataFrame
        Frames with 'Activity Label', 'X', 'Y' and 'Z' columns. Rows are
        selected with boolean-mask indexing and ``.head()``, i.e. the pandas
        API (the example call in this notebook converts with ``.to_pandas()``).
    max_samples_per_activity : int
        Only the first ``max_samples_per_activity`` rows of each activity are
        drawn, to keep the plots readable.
    figsize : tuple
        Size of the whole figure, forwarded to ``plt.subplots``.

    Raises
    ------
    ValueError
        If ``df_gyro`` contains no activity labels (there would be no rows to
        draw).
    """
    activities = df_gyro['Activity Label'].unique().tolist()
    n_activities = len(activities)
    if n_activities == 0:
        raise ValueError("df_gyro contains no 'Activity Label' values to plot")

    # squeeze=False always yields a 2-D axes array, even for a single activity,
    # so no manual reshape is needed.
    fig, axes = plt.subplots(n_activities, 2, figsize=figsize, squeeze=False)

    # Column 0 shows the gyroscope, column 1 the accelerometer.
    sensor_panels = (('Giroscopio', df_gyro), ('Acelerómetro', df_accel))

    for row, activity in enumerate(activities):
        for col, (sensor_title, sensor_df) in enumerate(sensor_panels):
            ax = axes[row, col]

            # Rows of this activity, truncated for readability. head(n) already
            # returns everything when fewer than n rows are available.
            samples = sensor_df[sensor_df['Activity Label'] == activity]
            samples = samples.head(max_samples_per_activity)

            for axis_name in ('X', 'Y', 'Z'):
                ax.plot(samples[axis_name], label=axis_name, alpha=0.7, linewidth=1)

            ax.set_title(f'{sensor_title} - {activity}')
            ax.set_ylabel('Valor')
            ax.legend()
            ax.grid(True, alpha=0.3)

            # Only the bottom row carries the x-axis label.
            if row == n_activities - 1:
                ax.set_xlabel('Muestra')

    plt.tight_layout()
    plt.show()

# Ejecutar visualización
# plot_axes_by_activity(df_gyro.to_pandas(), df_accel.to_pandas(), max_samples_per_activity = 2000,  figsize = (20, 5))

In [1535]:
# Both sensors are windowed with identical settings.
# (sampling_rate is presumably in Hz — TODO confirm against create_feature_windows.)
window_settings = dict(window_seconds=5, overlap_percent=50, sampling_rate=20)

features_gyro = create_feature_windows(df_gyro, **window_settings)
features_accel = create_feature_windows(df_accel, **window_settings)

In [1536]:
# model_dic = adaptive_transfer_learning_cnn_lstm(
#     base_model_path = r'F:\UPC\Tesis\HARbit-Model\src\models\version\sensors\accel_cnn-lstm_wisdm_91_cluster_user.h5', 
#     target_X = features_accel.drop(columns = ['Subject-id', 'Activity Label']),
#     target_y = features_accel['Activity Label'],
#     target_le = ,
#     source_X = None,
#     source_y = None, 
#     source_le = None,
#     validation_split = 0.2,
#     progressive_training = True
# )

In [1537]:
# features_combined = pd.merge(
#     features_gyro,
#     features_accel, 
#     on=['Subject-id', 'Activity Label', 'window_start', 'window_end', 'sample_count'], 
#     how="inner"
# )

In [1538]:
# features_combined.shape

In [1539]:
# Group the accelerometer feature windows into sequences for the CNN-LSTM.
# The third return value is not needed here.
X, y, _, le = prepare_features_for_cnn_lstm_sequences(features_accel, group_size=8, step_size=1)

print(
    f"Forma de X (secuencias): {X.shape}",   # (N, group_size, features)
    f"Forma de y: {y.shape}",
    f"Actividades únicas: {np.unique(y)}",
    sep="\n",
)

✅ Secuencias creadas: (19, 8, 68)
  Num features: 68
  Clases: ['Type']
Forma de X (secuencias): (19, 8, 68)
Forma de y: (19,)
Actividades únicas: [0]


In [1540]:
# Turn the integer codes from the encoder returned by the sequence-preparation
# step back into their label strings.
# NOTE(review): `y` is rebound in place, so this cell is not idempotent — running
# it again after `y` has been re-encoded with the saved encoder would hand `le`
# codes it did not produce.
y = le.inverse_transform(y)

In [1541]:
from tensorflow import keras

# The network and its label encoder are stored under the same base name in
# sibling folders. Deriving both paths from MODEL_NAME keeps the pair in sync
# when switching models.
MODELS_DIR = Path(r'F:\UPC\Tesis\HARbit-Model\src\models')
MODEL_NAME = 'accel_cnn-lstm_wisdm_91_cluster_user'

model = keras.models.load_model(str(MODELS_DIR / 'version' / 'sensors' / f'{MODEL_NAME}.h5'))
# NOTE: joblib.load unpickles the file; only load artifacts produced by this project.
label_encoder = joblib.load(str(MODELS_DIR / 'meta' / 'sensors' / f'{MODEL_NAME}.joblib'))



In [1542]:
# Re-encode the label strings with the encoder saved alongside the trained model;
# later cells index label_encoder.classes_ with the model's argmax output, so the
# ground truth must use the same codes.
# NOTE(review): `y` is rebound in place, so this cell only works once per run of
# the preceding decoding cell.
y = label_encoder.transform(y)

In [1543]:
print("Evaluando modelo...")
# Per-sequence class scores; the argmax along the class axis is the predicted class index.
y_pred = model.predict(X)
y_pred_classes = np.argmax(y_pred, axis=1)

# Classes that actually occur in the ground truth of this recording.
unique_classes = np.unique(y)

# Every label in y belongs to np.unique(y) by construction, so masking the
# samples with np.isin(y, unique_classes) selected all of them. The value
# reported here is therefore the plain accuracy over the whole recording.
filtered_accuracy = accuracy_score(y, y_pred_classes)

print(f"✅ Accuracy filtrado (solo clases presentes en el dataset): {filtered_accuracy:.4f}")


# Loss and accuracy as computed by Keras (assumes the model was compiled with
# exactly one metric, so evaluate returns two values).
test_loss, test_accuracy = model.evaluate(X, y, verbose=0)
print(f"\nPérdida en test: {test_loss:.4f}")
print(f"Precisión en test: {test_accuracy:.4f}")

# Detailed classification report
print("\n" + "="*50)
print("REPORTE DE CLASIFICACIÓN")
print("="*50)
print(classification_report(
    y, 
    y_pred_classes, 
    labels=unique_classes,               # only the classes present in y
    target_names=label_encoder.classes_[unique_classes],
    digits=4,
    zero_division=0                      # report 0 instead of warning when a class is never predicted
))


Evaluando modelo...


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 779ms/step
✅ Accuracy filtrado (solo clases presentes en el dataset): 0.0000

Pérdida en test: 6.2503
Precisión en test: 0.0000

REPORTE DE CLASIFICACIÓN
              precision    recall  f1-score   support

        Type     0.0000    0.0000    0.0000      19.0

   micro avg     0.0000    0.0000    0.0000      19.0
   macro avg     0.0000    0.0000    0.0000      19.0
weighted avg     0.0000    0.0000    0.0000      19.0



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [1544]:
# Count how often each class was predicted, keyed by class name.
predicted_ids, predicted_counts = np.unique(y_pred_classes, return_counts=True)
fq = {
    label_encoder.classes_[class_id]: int(count)
    for class_id, count in zip(predicted_ids, predicted_counts)
}

In [1545]:
# Display the predicted-class frequency table built in the previous cell.
fq

{'Eat': 6, 'Others': 4, 'Sit': 1, 'Stand': 2, 'Walk': 6}

In [1546]:
from models.optimizer.ViterbiLiteDecoder import ViterbiLiteDecoder

In [1547]:
def apply_viterbi_to_har_results(model, X_test, y_test, label_encoder, 
                                visualization=True, save_results=True,
                                transition_penalty=2.5, self_bonus=1.8,
                                min_duration=None):
    """
    Smooth a model's HAR predictions with ViterbiLiteDecoder and compare accuracies.

    The model's outputs for ``X_test`` are decoded with
    ``ViterbiLiteDecoder.decode_complete_pipeline`` (duration constraints
    enabled). The accuracy of the raw argmax predictions and of the decoded
    sequence against ``y_test`` is printed and returned.

    Parameters
    ----------
    model : object exposing ``predict(X_test)``; its output is argmax-ed along axis 1.
    X_test : model input.
    y_test : ground-truth labels, compared with ``accuracy_score``.
    label_encoder : object whose ``classes_`` attribute lists the class names
        handed to the decoder.
    visualization, save_results : bool
        Accepted for compatibility; they currently have no effect.
    transition_penalty, self_bonus : float
        Forwarded to ``ViterbiLiteDecoder``.
    min_duration : dict or None
        Minimum duration per class name, forwarded to ``ViterbiLiteDecoder``.
        ``None`` selects the default table below.

    Returns
    -------
    dict with keys 'decoder', 'original_predictions', 'viterbi_predictions',
    'original_accuracy', 'viterbi_accuracy' and 'improvement'.
    """
    from sklearn.metrics import accuracy_score

    print("🚀 APLICANDO VITERBI-LITE A RESULTADOS HAR")
    print("=" * 60)
    
    # 1. Model outputs and the raw (unsmoothed) predictions
    print("🔍 Extrayendo probabilidades...")
    probabilities = model.predict(X_test)
    original_predictions = np.argmax(probabilities, axis=1)
    
    # 2. Build the decoder for the encoder's classes
    if min_duration is None:
        # Values are counts of frames. The original notes equate one frame with
        # 5 s (e.g. 3 -> 15 s) — TODO confirm against the windowing step in use.
        min_duration = {
            'Walk': 3,
            'Sit': 4,
            'Stand': 2,
            'Type': 6,
            'Eat': 4,
            'Write': 4,
            'Workouts': 8,
            'Others': 1      # effectively unconstrained
        }

    classes = label_encoder.classes_
    decoder = ViterbiLiteDecoder(
        classes=classes,
        transition_penalty=transition_penalty,
        self_bonus=self_bonus,
        min_duration=min_duration
    )
    
    # 3. Decode the sequence
    print("🧠 Decodificando secuencia con Viterbi...")
    decoded_sequence, _viterbi_scores = decoder.decode_complete_pipeline(
        probabilities=probabilities,
        apply_duration_constraints=True
    )
    
    # 4. Compare accuracy before and after decoding
    original_accuracy = accuracy_score(y_test, original_predictions)
    viterbi_accuracy = accuracy_score(y_test, decoded_sequence)
    improvement = viterbi_accuracy - original_accuracy
    
    print(f"\n📊 RESULTADOS:")
    print(f"🔵 Accuracy original: {original_accuracy:.4f}")
    print(f"🟢 Accuracy con Viterbi: {viterbi_accuracy:.4f}")
    print(f"📈 Mejora: {improvement:.4f}")
    
    # 5. How many frames the decoder changed
    changes = np.sum(original_predictions != decoded_sequence)
    total_frames = len(y_test)
    # Guard the percentage against an empty test set.
    changed_pct = 100*changes/total_frames if total_frames else 0.0
    print(f"🔄 Frames modificados: {changes}/{total_frames} ({changed_pct:.1f}%)")
    
    # TODO: plotting (decoder.visualize_decoding) and persisting the results with
    # joblib are not wired in yet, so `visualization` and `save_results` are ignored.
    
    return {
        'decoder': decoder,
        'original_predictions': original_predictions,
        'viterbi_predictions': decoded_sequence,
        'original_accuracy': original_accuracy,
        'viterbi_accuracy': viterbi_accuracy,
        'improvement': improvement
    }

# Ejemplo de uso con tu modelo
def run_viterbi_on_trained_model(
    model_path=r'F:\UPC\Tesis\HARbit-Model\src\models\version\sensors\accel_cnn-lstm_wisdm_91_cluster_user.h5',
    encoder_path=r'F:\UPC\Tesis\HARbit-Model\src\models\meta\sensors\accel_cnn-lstm_wisdm_91_cluster_user.joblib',
    X_test=None,
    y_test=None,
):
    """
    Load a trained model and its label encoder, then run the Viterbi comparison.

    Parameters
    ----------
    model_path : str
        Keras model file passed to ``keras.models.load_model``.
    encoder_path : str
        Label-encoder file passed to ``joblib.load``.
    X_test, y_test : optional
        Evaluation data. When left as ``None`` the notebook-level ``X`` and
        ``y`` are used, which is what a call without arguments did before.

    Returns
    -------
    The dict returned by ``apply_viterbi_to_har_results``.
    """
    from tensorflow import keras
    import joblib

    # Fall back to the notebook globals so existing zero-argument calls keep working.
    if X_test is None:
        X_test = X
    if y_test is None:
        y_test = y

    model = keras.models.load_model(model_path)
    # NOTE: joblib.load unpickles the file; only load artifacts produced by this project.
    label_encoder = joblib.load(encoder_path)

    results = apply_viterbi_to_har_results(
        model=model,
        X_test=X_test,
        y_test=y_test,
        label_encoder=label_encoder,
        visualization=True,
        save_results=True
    )
    
    print(f"🎉 Viterbi completado con mejora de {results['improvement']:.4f}")
    
    return results

In [1548]:
# Run the Viterbi comparison on the current recording; `rs` holds the returned
# dict (predictions, both accuracies and the improvement).
rs = run_viterbi_on_trained_model()




🚀 APLICANDO VITERBI-LITE A RESULTADOS HAR
🔍 Extrayendo probabilidades...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 530ms/step
🧠 Decodificando secuencia con Viterbi...
🧠 Decodificando con probabilidades (Viterbi completo)
⏱️ Aplicando restricciones de duración mínima

📊 RESULTADOS:
🔵 Accuracy original: 0.0000
🟢 Accuracy con Viterbi: 0.0000
📈 Mejora: 0.0000
🔄 Frames modificados: 5/19 (26.3%)
🎉 Viterbi completado con mejora de 0.0000


In [1549]:
# Store this run as (original accuracy, Viterbi accuracy, improvement) under the current target.
resultados[target].append(
    tuple(rs[metric] for metric in ('original_accuracy', 'viterbi_accuracy', 'improvement'))
)