Instalaciones necesarias

In [9]:
!pip install tensorflow



imports utilizados en el código.

In [10]:
import os
import csv
import tensorflow as tf
import numpy as np
from google.colab import drive

# Montar Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Funciones a usar.

In [11]:
def leer_datos(carpeta, index):
    # Definir ruta base
    ruta_base = '/content/drive/MyDrive/lab5micros/datos'
    ruta_carpeta = os.path.join(ruta_base, carpeta)
    input = []
    output= []

    # Verificar si el directorio existe
    if not os.path.isdir(ruta_carpeta):
        print(f"La carpeta '{ruta_carpeta}' no existe.")
        return input, output

    # Iterar los archivos CSV de un directorio
    for archivo in os.listdir(ruta_carpeta):
        if archivo.endswith('.csv'):
            ruta_archivo = os.path.join(ruta_carpeta, archivo)
            with open(ruta_archivo, 'r') as f:
                lector = csv.reader(f)
                datos_archivo = list(lector)[1:101]  # Omitir header
                input.append(datos_archivo)
                # Agregar el output correspondiente
                output.append([1 if i == index else 0 for i in range(3)])

    return input, output

def preparar_datos():
    # Inicializar semillas para reproducir resultados
    SEED = 6512
    np.random.seed(SEED)
    tf.random.set_seed(SEED)
    # Directorios a usar
    carpetas = ['arriba', 'jalar', 'revés', 'quieto'] #Quieto tiene output 0,0,0

    inputs = []
    outputs = []

    # Leer datos de cada directorio
    for i, carpeta in enumerate(carpetas):
        matriz_datos, identificador_archivos = leer_datos(carpeta, i)
        inputs.extend(matriz_datos)
        outputs.extend(identificador_archivos)

    # Normalizar datos
    num_max = max([max([max([abs(float(data)) for data in point])] for point in input) for input in inputs])[0]
    for i in range(len(inputs)):
      for j in range(len(inputs[i])):
        for k in range(len(inputs[i][j])):
          inputs[i][j][k] = (float(inputs[i][j][k])+ num_max/2)/(2*num_max)

    inputs = [[data for point in input for data in point] for input in inputs]

    # Convertir a np array
    inputs = np.array(inputs, dtype=float)
    outputs = np.array(outputs, dtype=float)

    #Shuffle
    indices = np.arange(len(inputs))
    np.random.shuffle(indices)
    inputs = inputs[indices]
    outputs = outputs[indices]

    # Dividir datos: 60% entrenar, 20% validación, 20% pruebas
    num_entrenamiento = int(0.6 * len(inputs))
    num_validacion = int(0.2 * len(inputs))

    x_train, y_train = inputs[:num_entrenamiento], outputs[:num_entrenamiento]
    x_val, y_val = inputs[num_entrenamiento:num_entrenamiento + num_validacion], outputs[num_entrenamiento:num_entrenamiento + num_validacion]
    x_test, y_test = inputs[num_entrenamiento + num_validacion:], outputs[num_entrenamiento + num_validacion:]

    return x_train, y_train, x_val, y_val, x_test, y_test

def crear_modelo():
    modelo = tf.keras.models.Sequential([
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(16, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(3, activation='softmax')  # Three outputs for the three types of movement
    ])

    modelo.compile(optimizer='rmsprop', loss='mse', metrics=['mae'])
    return modelo

Esta sección corresponde al código usado para generar archivos de dtos donde el movimiento inicia em movimientos varios de la lectura de datos, solamente se corre una vez puesto que cambia el contenido de las carpetas agregando archivos csv.

In [12]:
# import random

# carpetas = ['arriba', 'jalar', 'revés']
# quieto = 'quieto'

# def get_random_quieto():
#     # Definir ruta base
#     ruta_base = '/content/drive/MyDrive/lab5micros/datos'
#     ruta_carpeta = os.path.join(ruta_base, carpeta)
#     input = []
#     output= []

#     # Revisar si directorio existe
#     if not os.path.isdir(ruta_carpeta):
#         print(f"La carpeta '{ruta_carpeta}' no existe.")
#         return input, output

#     # Obtener todos los csv en el directorio
#     archivos_csv = [archivo for archivo in os.listdir(ruta_carpeta) if archivo.endswith('.csv')]

#     # En caso de no tener archivos csv
#     if not archivos_csv:
#         print(f"No hay archivos CSV en la carpeta '{ruta_carpeta}'.")
#         return input, output

#     # Seleccionar un archivo csv aleatorio
#     archivo_aleatorio = random.choice(archivos_csv)
#     ruta_archivo = os.path.join(ruta_carpeta, archivo_aleatorio)
#     # Leer el csv
#     with open(ruta_archivo, 'r') as f:
#         lector = csv.reader(f)
#         datos_archivo = list(lector)[1:101]  # Omitir encabezado
#         return datos_archivo


# # Duplicar archivos en todas las carpetas
# for i, carpeta in enumerate(carpetas):
#     # Set base path to Google Drive folder
#     ruta_base = '/content/drive/MyDrive/lab5micros/datos'
#     ruta_carpeta = os.path.join(ruta_base, carpeta)

#     input_data = []
#     output_data = []

#     # Iterar sobre los csv
#     for archivo in os.listdir(ruta_carpeta):
#         if archivo.endswith('.csv'):
#             ruta_archivo = os.path.join(ruta_carpeta, archivo)
#             with open(ruta_archivo, 'r') as f:
#                 lector = csv.reader(f)
#                 datos_archivo = list(lector)

#                 # Dupilcar datos
#                 header = datos_archivo[0]
#                 datos_sin_encabezado = datos_archivo[1:101]

#                 for dup in range(6):
#                   datos_quieto  = get_random_quieto()
#                   datos_quieto[1:101]

#                   # Indice inicial aleatorio
#                   random_index = random.randint(0, len(datos_archivo) - 1)

#                   # Crear nueva lista
#                   datos_combinados = []
#                   # Decide si inica quieto o en un movimiento
#                   use_quieto_first = random.choice([True, False])

#                   if use_quieto_first:
#                     # Llenar con quieto y luego agregar movimiento
#                     print(datos_quieto)
#                     print(datos_sin_encabezado)
#                     datos_combinados.extend(datos_quieto[:random_index])
#                     datos_combinados.extend(datos_sin_encabezado[random_index:])
#                     print(datos_combinados)
#                   else:
#                     # Llenar con movimiento y agregar quieto
#                     datos_combinados.extend(datos_sin_encabezado[:random_index])
#                     datos_combinados.extend(datos_quieto[random_index:])


#                   # Crear archivo nuevo
#                   nuevo_nombre_archivo = archivo.replace('.csv', f'_dup{dup}.csv')
#                   ruta_nuevo_archivo = os.path.join(ruta_carpeta, nuevo_nombre_archivo)

#                   # Llenar el archivo nuevo
#                   with open(ruta_nuevo_archivo, 'w', newline='') as f_nuevo:
#                       escritor = csv.writer(f_nuevo)
#                       escritor.writerow(header)
#                       escritor.writerows(datos_combinados)

#                   print(f"Archivo duplicado guardado como: {nuevo_nombre_archivo}")

Ejecutar el código, cumple la función de un main.

In [15]:
# Prepare data
input_train, output_train, input_val, output_val, input_test, output_test = preparar_datos()

model = crear_modelo()
model.fit(input_train, output_train, epochs=100, validation_data=(input_val, output_val))
test_loss, test_acc = model.evaluate(input_test, output_test)
# print(f"Test loss: {test_loss}")
# print(f"Test accuracy: {test_acc}")


# Obtener predicciones y calcular la matriz de confusión
y_prediction = model.predict(input_test)
y_prediction = np.argmax(y_prediction, axis=1)
y_test = np.argmax(output_test, axis=1)

# Crear matriz de confusión y normalizar por columnas
conf_matrix = tf.math.confusion_matrix(y_test, y_prediction)
conf_matrix = tf.cast(conf_matrix, dtype=tf.float32)
conf_matrix_normalized = conf_matrix / tf.reduce_sum(conf_matrix, axis=0)

# Imprimir la matriz de confusión normalizada
print("Matriz de Confusión Normalizada:\n", conf_matrix_normalized)


# Convert model to TensorFlow Lite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('/content/drive/MyDrive/modelo_movimiento.tflite', 'wb') as f:
    f.write(tflite_model)
print("Modelo guardado en Google Drive")
print(input_test)

Epoch 1/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - loss: 0.1911 - mae: 0.4101 - val_loss: 0.1754 - val_mae: 0.3922
Epoch 2/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.1671 - mae: 0.3776 - val_loss: 0.1489 - val_mae: 0.3550
Epoch 3/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.1466 - mae: 0.3413 - val_loss: 0.1323 - val_mae: 0.3267
Epoch 4/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - loss: 0.1332 - mae: 0.3151 - val_loss: 0.1180 - val_mae: 0.2941
Epoch 5/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.1227 - mae: 0.2943 - val_loss: 0.1097 - val_mae: 0.2661
Epoch 6/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 2ms/step - loss: 0.1131 - mae: 0.2741 - val_loss: 0.0945 - val_mae: 0.2504
Epoch 7/100
[1m123/123[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/

Pasar el modelo entrenado de .tflite a .h

In [16]:
!echo "const unsigned char model[] = {" > /content/drive/MyDrive/model.h
!cat /content/drive/MyDrive/modelo_movimiento.tflite | xxd -i      >> /content/drive/MyDrive/model.h
!echo "};"                              >> /content/drive/MyDrive/model.h

print("\nDone")


Done
