In [None]:
import os
os.environ['PYTHONHASHSEED']=str(1)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import cv2
import keras
import random
from sklearn.metrics import classification_report
import time
#os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   
#os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # para utilizar la CPU

In [None]:
def reset_random_seeds():
   os.environ['PYTHONHASHSEED']=str(1)
   tf.random.set_seed(1)
   np.random.seed(1)
   random.seed(1)
reset_random_seeds()

In [None]:
ruta = "C:/Users/Emilio/TESINA"

In [None]:
directorio_dataset = "C:/Users/Emilio/TESINA/DATASET_RASPBERRY"
dir_entrenamiento = os.path.join(directorio_dataset, 'entrenamiento')
dir_validacion = os.path.join(directorio_dataset, 'validacion')
dir_prueba = os.path.join(directorio_dataset, 'prueba')

TAMANIO_BATCH = 32
TAMANIO_IMG = (165,165)
INPUT_SHAPE = (165,165, 3)

initial_epochs=3
fine_tune_epochs=3

dataset_entrenamiento = tf.keras.utils.image_dataset_from_directory(dir_entrenamiento,
                                                                    labels='inferred',
                                                                    label_mode='binary',
                                                                    shuffle=True,
                                                                    batch_size=TAMANIO_BATCH,
                                                                    image_size=TAMANIO_IMG)

In [None]:
dataset_validacion = tf.keras.utils.image_dataset_from_directory(dir_validacion,
                                                                 labels='inferred',
                                                                 label_mode='binary',
                                                                 shuffle=True,
                                                                 batch_size=TAMANIO_BATCH,
                                                                 image_size=TAMANIO_IMG)

In [None]:
dataset_prueba = tf.keras.utils.image_dataset_from_directory(dir_prueba,
                                                             labels='inferred',
                                                             label_mode='binary',
                                                             shuffle=True,
                                                             batch_size=TAMANIO_BATCH,
                                                             image_size=TAMANIO_IMG,
                                                            )

In [None]:
class_names = dataset_entrenamiento.class_names
plt.figure(figsize=(10, 10))
for images, labels in dataset_entrenamiento.take(1):
  print(labels.shape)
  labels
  for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(images[i].numpy().astype("uint8"))
    plt.title(class_names[labels[i].numpy()[0].astype('uint8')])
    plt.axis("off")

In [None]:
print('Lotes de validación: %d' % tf.data.experimental.cardinality(dataset_validacion))
print('Lotes de prueba: %d' % tf.data.experimental.cardinality(dataset_prueba))

In [None]:
#Config del dataset para mejorar el rendimiento
AUTOTUNE = tf.data.AUTOTUNE

dataset_entrenamiento = dataset_entrenamiento.prefetch(buffer_size=AUTOTUNE)
dataset_validacion = dataset_validacion.prefetch(buffer_size=AUTOTUNE)
dataset_prueba = dataset_prueba.prefetch(buffer_size=AUTOTUNE)

In [None]:
#Aumentación del conjunto de entrenamiento, aplicando distintos tipos de transformaciones para generar nuevas muestras del mismo.
data_augmentation = tf.keras.Sequential([
  tf.keras.layers.RandomFlip('horizontal'),
  tf.keras.layers.RandomRotation(0.2),
])

In [None]:
for image, _ in dataset_entrenamiento.take(1):
  plt.figure(figsize=(10, 10))
  first_image = image[0]
  for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    augmented_image = data_augmentation(tf.expand_dims(first_image, 0))
    plt.imshow(augmented_image[0] / 255)
    plt.axis('off')

In [None]:
# Reescalado de pixeles a un rango entre -1 y 1, que son los valores de entrada aceptados por el modelo MobileNetV2
preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input

In [None]:
# Creación del modelo base a partir de MobileNet V2
SHAPE_IMG = TAMANIO_IMG + (3,)
base_model = tf.keras.applications.MobileNetV2(input_shape=SHAPE_IMG,
                                               include_top=False, #evita que las capas superiores de clasificación sean incluídas --> mejor para la extracción de características
                                               weights='imagenet')

In [None]:
image_batch, label_batch = next(iter(dataset_entrenamiento))
feature_batch = base_model(image_batch)
print(feature_batch.shape)

In [None]:
#Congelar la base convolucional, para evitar que los pesos de una capa se actualicen en el entrenamiento --> importante en Transfer Learning
base_model.trainable = False

In [None]:
#Convierte las características a un solo vector por imagen de 1280 elementos
global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
feature_batch_average = global_average_layer(feature_batch)
print(feature_batch_average.shape)

In [None]:
#Se aplica una capa Dense para convertir las características en una sola predicción por imagen
prediction_layer = tf.keras.layers.Dense(1,'sigmoid')
prediction_batch = prediction_layer(feature_batch_average)
print(prediction_batch.shape)

In [None]:
inputs = tf.keras.Input(INPUT_SHAPE)
x = data_augmentation(inputs)
x = preprocess_input(x)
x = base_model(x, training=False)
x = global_average_layer(x)
x = tf.keras.layers.Dropout(0.2)(x)
outputs = prediction_layer(x)
model = tf.keras.Model(inputs, outputs)

In [None]:
#Compilado del modelo
base_learning_rate = 0.0001
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
              metrics=[tf.keras.metrics.BinaryAccuracy()])

In [None]:
model.summary()

In [None]:
len(model.trainable_variables)

In [None]:
loss0, accuracy0 = model.evaluate(dataset_validacion)

In [None]:
print("Pérdida inicial: {:.2f}".format(loss0))
print("Exactitud inicial: {:.2f}".format(accuracy0))

In [None]:
#Entrenamiento del modelo
callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=2)
history = model.fit(dataset_entrenamiento,
                    epochs=initial_epochs,
                    validation_data=dataset_validacion)

In [None]:
loss, accuracy = model.evaluate(dataset_prueba)
print('Pérdida: ', "{:.2f}".format(loss))
print('Exactitud :', "{:.2f}%".format(accuracy*100))

In [None]:
acc = history.history['binary_accuracy']
val_acc = history.history['val_binary_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Exactitud del entrenamiento')
plt.plot(val_acc, label='Exactitud de la validación')
plt.legend(loc='lower right')
plt.ylabel('Exactitud')
plt.ylim([min(plt.ylim()),1])
plt.title('Exactitud del entrenamiento y validación')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Pérdida del entrenamiento')
plt.plot(val_loss, label='Pérdida de la valicación')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0,1.0])
plt.title('Pérdida del entrenamiento y validación')
plt.xlabel('Época')
plt.show()

**OPTIMIZACIÓN (FINE-TUNING)**
Se entrenan los pesos de las capas superiores del modelo pre-entrenado junto con con el entrenamiento del clasificador agregado previamente. De esta forma, los pesos son ajustados de un mapa de características genérico a características asociadas al dataset. 
La idea es adaptar las características para funcionar con el nuevo dataset, sin sobreescribir el aprendizaje genérico propio del modelo pre-entrenado

In [None]:
##Descongelar modelo 
base_model.trainable = True

In [None]:
# Ajuste fino a partir de la capa 100
fine_tune_at = 100

# Se deben congelar todas las capas antes de la capa de ajuste fino
for layer in base_model.layers[:fine_tune_at]:
  layer.trainable = False

In [None]:
model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
              optimizer = tf.keras.optimizers.RMSprop(learning_rate=base_learning_rate/10),
              metrics=[tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])

In [None]:
model.summary()

In [None]:
len(model.trainable_variables)

In [None]:
total_epochs =  initial_epochs + fine_tune_epochs
tiempoInicio = time.time()

callback = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=3)

history_fine = model.fit(dataset_entrenamiento,
                         epochs=total_epochs,
#                          callbacks=[callback],
                         initial_epoch=history.epoch[-1],
                         validation_data=dataset_validacion)

tiempoTranscurrido = (time.time() - tiempoInicio)
tiempoTranscurrido = round(tiempoTranscurrido,1)
print ('\nTiempo de ejecución:', int(tiempoTranscurrido/60), 'minutos')

In [None]:
acc += history_fine.history['binary_accuracy']
val_acc += history_fine.history['val_binary_accuracy']

loss += history_fine.history['loss']
val_loss += history_fine.history['val_loss']

In [None]:
plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Exactitud del entrenamiento')
plt.plot(val_acc, label='Exactitud de la validación')
plt.ylim([0.8, 1])
plt.plot([initial_epochs-1,initial_epochs-1],
          plt.ylim(), label='Comienzo del ajuste fino')
plt.legend(loc='lower right')
plt.title('Exactitud del entrenamiento y validación')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Pérdida del entrenamiento')
plt.plot(val_loss, label='Pérdida de la validación')
plt.ylim([0, 1.0])
plt.plot([initial_epochs-1,initial_epochs-1],
         plt.ylim(), label='Comienzo del ajuste fino')
plt.legend(loc='upper right')
plt.title('Pérdida del entrenamiento y validación')
plt.xlabel('Época')
plt.show()

In [None]:
loss, accuracy, precision, recall= model.evaluate(dataset_prueba)
print('Pérdida: ', "{:.2f}".format(loss))
print('Exactitud :', "{:.2f}%".format(accuracy*100))
print('Recall:', "{:.2f}%".format(recall*100))
print('Precisión:', "{:.2f}%".format(precision*100))

In [None]:
# Obtiene un lote del conjunto de prueba para su predicción
image_batch, label_batch = dataset_prueba.as_numpy_iterator().next()
predictions = model.predict_on_batch(image_batch).flatten()
predictions = tf.where(predictions < 0.5, 0, 1)
predictions = predictions.numpy()
print('Predicciones:\n', predictions)
print('Etiquetas:\n', label_batch.flatten().astype(np.int32))
print(classification_report(label_batch, predictions))

plt.figure(figsize=(10, 10))
for i in range(9):
  ax = plt.subplot(3, 3, i + 1)
  plt.imshow(image_batch[i].astype("uint8"))
  plt.title(class_names[predictions[i]])
  plt.axis("off")

In [None]:
# model =  tf.keras.models.load_model(ruta +'Somnolencia_MobileNetV2/model.h5')

In [None]:
#Generar matriz de confusión
import seaborn as sns
from sklearn.metrics import confusion_matrix

matriz = tf.math.confusion_matrix(
              label_batch,
              predictions,
              num_classes=None,
              weights=None,
              dtype=tf.dtypes.int32,
              name=None
          )
cf_matrix = confusion_matrix(label_batch, predictions)
sns.heatmap(cf_matrix, annot=True)

In [None]:
#model = model.save(ruta +"/Somnolencia_MobileNetV2/model.h5")

In [None]:
# Conversión del modelo a TensorFlow Lite
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# Guardar el modelo
with open(ruta +'/Somnolencia_MobileNetV2/model.tflite', 'wb') as f:
  f.write(tflite_model)