In [None]:
import glob
import pickle
import os
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import visualkeras

print(tf.__version__) # La versión utilizada fue la 2.10.0, compatible con python 3.8.0
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

gpu_devices = tf.config.experimental.list_physical_devices("GPU")
for device in gpu_devices:
    tf.config.experimental.set_memory_growth(device, True)

In [None]:
# Cargamos los datos sin ruido
root_path = r"C:\Users\danie\OneDrive\Escritorio\TFG\TFG_entorno\datos_sinteticos_amass_definitivos"

pattern = os.path.join(root_path, '**', '*.pkl')

accumulated_data = []

for file_path in glob.glob(pattern, recursive=True):
    with open(file_path, 'rb') as file:

        data = pickle.load(file)

        if data['gt'] != 'unknown': # Si queremos todos los datos, eliminar esta línea
            accumulated_data.append(data)

print(f'Se han cargado los datos de {len(accumulated_data)} archivos.')

In [None]:
# Contamos el número de frames por clase
def calcular_total_frames(secuencias):
    return sum(len(secuencia['acc']) for secuencia in secuencias)

numero_stand = calcular_total_frames([secuencia for secuencia in accumulated_data if secuencia['gt'] == 'stand'])
numero_walk = calcular_total_frames([secuencia for secuencia in accumulated_data if secuencia['gt'] == 'walk'])
numero_run = calcular_total_frames([secuencia for secuencia in accumulated_data if secuencia['gt'] == 'run'])
numero_sit = calcular_total_frames([secuencia for secuencia in accumulated_data if secuencia['gt'] == 'sit'])

numero_frames = numero_stand + numero_walk + numero_run + numero_sit

print(f'El número de frames de stand es {numero_stand}.')
print(f'El número de frames de walk es {numero_walk}.')
print(f'El número de frames de run es {numero_run}.')
print(f'El número de frames de sit es {numero_sit}.')
print(f'El número total de frames es {numero_frames}.')

In [None]:
# Balanceamos las cuatro clases

# num_objetivo_frames es el número de frames deseado, ajustado al mínimo de las cuatro clases
num_objetivo_frames = min(numero_stand, numero_walk, numero_run, numero_sit)

# Separar secuencias por clase
secuencias_por_clase = {'walk': [], 'run': [], 'sit': [], 'stand': []}
for secuencia in accumulated_data:
    secuencias_por_clase[secuencia['gt']].append(secuencia)

# Función para calcular el total de frames de una lista de secuencias
def calcular_total_frames(secuencias):
    return sum(len(secuencia['acc']) for secuencia in secuencias)

# Balancear los datos para cada clase
np.random.seed(13)  # Para reproducibilidad
clases_para_balancear = ['walk', 'run', 'sit', 'stand']

frames_total_balanceados = 0

for clase in clases_para_balancear:
    while calcular_total_frames(secuencias_por_clase[clase]) > num_objetivo_frames:
        # Elegir y eliminar una secuencia aleatoria
        del_index = np.random.choice(range(len(secuencias_por_clase[clase])))
        del secuencias_por_clase[clase][del_index]
    frames_total_balanceados += calcular_total_frames(secuencias_por_clase[clase])

# Reconstruir la lista de secuencias balanceadas con la nueva estructura deseada
secuencias_balanceadas = []
for secuencias in secuencias_por_clase.values():
    for secuencia in secuencias:
        # Crear un nuevo diccionario para cada secuencia reorganizada
        new_secuencia = {
            'acc': [frame for frame in secuencia['acc']],
            'ori': [frame for frame in secuencia['ori']],
            'gt': secuencia['gt']  # Asociar 'gt' una sola vez por secuencia, no por frame
        }
        secuencias_balanceadas.append(new_secuencia)

print(f'El número total de frames balanceados es {frames_total_balanceados}.')

In [None]:
accumulated_data = secuencias_balanceadas

In [None]:
# Aislamos los datos en tres listas diferentes
gt = [data['gt'] for data in accumulated_data]
acc = [data['acc'] for data in accumulated_data]
ori = [data['ori'] for data in accumulated_data]

del accumulated_data
accumulated_data = [acc, ori]

In [None]:
print(accumulated_data[0][0][0].shape) # [acc/ori][sample num][frame num][IMU]
print(accumulated_data[1][0][0].shape)

In [None]:
def preparar_datos(datos):
    datos_aceleracion = datos[0]
    datos_orientacion = datos[1]
    
    n_imus = datos_aceleracion[0][0].shape[0]  # Número de IMUs
    
    # Determinar la longitud máxima de las secuencias
    max_length = max(max(len(grabacion) for grabacion in datos_aceleracion), max(len(grabacion) for grabacion in datos_orientacion))
 
    # Calcular el número total de características por frame después del aplanamiento
    num_features_aceleracion = n_imus * 3
    num_features_orientacion = n_imus * 3 * 3
    total_features = num_features_aceleracion + num_features_orientacion
    
    # Preparar el array para los datos con padding
    datos_rnn_con_padding = np.zeros((len(datos_aceleracion), max_length, total_features))
    
    for i in range(len(datos_aceleracion)):
        grabacion_aceleracion = datos_aceleracion[i]
        grabacion_orientacion = datos_orientacion[i]
        
        for j in range(min(len(grabacion_aceleracion), len(grabacion_orientacion))):
            # Aplanar los datos de aceleración y orientación
            aceleracion_aplanada = grabacion_aceleracion[j].reshape(-1)
            orientacion_aplanada = grabacion_orientacion[j].reshape(-1)
            
            # Concatenar aceleración y orientación aplanadas
            frame_aplanado = np.concatenate([aceleracion_aplanada, orientacion_aplanada])
            
            # Asignar al array de resultados
            datos_rnn_con_padding[i, j, :] = frame_aplanado
    
    return datos_rnn_con_padding

accumulated_data_rnn = preparar_datos(accumulated_data)

In [None]:
# Realizamos el One Hot Encoding
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder

# Primero, convertimos las etiquetas categóricas a números enteros
label_encoder = LabelEncoder()
gt_int = label_encoder.fit_transform(gt)

# Luego, convertimos los números enteros a codificación one-hot
gt_one_hot = to_categorical(gt_int)

In [None]:
print(gt[200])

In [None]:
print(type(gt_one_hot[0]))
print(gt_one_hot[200])

In [None]:
# Extendemos las etiquetas de cada secuencia a cada uno de los frames
gt_categorical_extended = []

for i, frames in enumerate(acc):
    # Cantidad de frames en la grabación actual
    num_frames = len(accumulated_data_rnn[i])
    
    # Utiliza numpy.tile para repetir la etiqueta categórica para cada frame
    gt_repeated = np.tile(gt_one_hot[i], (num_frames, 1))
    
    gt_categorical_extended.append(gt_repeated)


In [None]:
# Convertir la lista de arrays en un único array de NumPy
Y = np.array(gt_categorical_extended)
#Y = np.array(gt_one_hot) #Cambiar cuando se use CNN-1D
# Verificar la forma de Y para confirmar que es correcta
print(Y.shape)

In [None]:
# Dividimos los datos
X_train, X_test, Y_train, Y_test = train_test_split(accumulated_data_rnn, Y, test_size = 0.3, random_state=13)

In [None]:
print(f"Train data shape: {X_train.shape}")
print(f"Train labels shape: {Y_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Test labels shape: {Y_test.shape}")

### Resultados

In [None]:
model = keras.models.load_model("modelo.h5")

In [None]:
loss, accuracy, precision, recall = model.evaluate(X_test, Y_test, verbose=1)
print(f"Accuracy en los datos de prueba: {accuracy * 100:.2f}%")

In [None]:
from sklearn.metrics import classification_report

Y_pred = model.predict(X_test)
y_pred = np.argmax(Y_pred, axis=1) #Cambiar a 1 cuando se clasifique por secuencias en lugar de por frames
y_test = np.argmax(Y_test, axis=1)

In [None]:
print(classification_report(y_test.flatten(), y_pred.flatten(), target_names=label_encoder.classes_))

# Matriz de confusión
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(y_test.flatten(), y_pred.flatten())
plt.figure(figsize=(8, 8))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()