# Obtención de los datos

Usaremos el dataset de trashnet que es un conjunto de datos que tiene alrededor de 2500 imágenes de seis tipos diferentes de basura: vidrio, papel, cartón, plástico, metal y basura.

```consola
git clone https://github.com/garythung/trashnet
```

## Dependencias

In [None]:
import zipfile
import os
import numpy as np

## Proceso

In [None]:
# Descomprimir el zip con la data
import zipfile

# Ruta de los datos
ruta_datos: str = './trashnet/data'

# Ruta del archivo zip
ruta_zip: str = './trashnet/data/dataset-resized.zip'

# Crear una instancia de ZipFile
archivo_zip: zipfile.ZipFile = zipfile.ZipFile(ruta_zip, 'r')

# Extraer todos los archivos del archivo zip en la ruta especificada
archivo_zip.extractall(ruta_datos)

# Cerrar el archivo zip
archivo_zip.close()

In [None]:
import os
import numpy as np

# Rutas para la validación y entrenamiento
ruta_validacion: str = './data/valid'
ruta_entrenamiento: str = './data/train'

# Comprobar si los directorios de validación y entrenamiento existen, si no, se crean
if not os.path.exists(ruta_validacion):
    os.makedirs(ruta_validacion)
if not os.path.exists(ruta_entrenamiento):
    os.makedirs(ruta_entrenamiento)

# Ruta de los datos sin comprimir
ruta_datos_descomprimidos: str = './trashnet/data/dataset-resized'

# Recorrer cada archivo en la ruta de datos descomprimidos
for archivo in os.listdir(ruta_datos_descomprimidos):
    # Se crean nuevos directorios para los datos de entrenamiento y validación
    if archivo != '.DS_Store':
        if not os.path.exists(os.path.join(ruta_entrenamiento, archivo)):
            os.makedirs(os.path.join(ruta_entrenamiento, archivo))
        if not os.path.exists(os.path.join(ruta_validacion, archivo)):
            os.makedirs(os.path.join(ruta_validacion, archivo))

# Volver a recorrer cada archivo en la ruta de datos descomprimidos
for archivo in os.listdir(ruta_datos_descomprimidos):
    if archivo != '.DS_Store':
        # Extraer el 20% de las imágenes para la validación (siguiendo la "regla" del 80/20)
        for subarchivo in os.listdir(os.path.join(ruta_datos_descomprimidos, archivo)):
            if subarchivo != '.DS_Store':
                if np.random.rand(1) < 0.2:
                    os.rename(os.path.join(ruta_datos_descomprimidos, archivo, subarchivo), os.path.join(ruta_validacion, archivo, subarchivo))
                else:
                    os.rename(os.path.join(ruta_datos_descomprimidos, archivo, subarchivo), os.path.join(ruta_entrenamiento, archivo, subarchivo))


Verificamos que cada clase tenga la misma cantidad de imágenes

In [None]:
import os

# Directorio raíz
directorio_raiz = "./data/"

# Extensión de imagen (cambiar si es necesario)
extension_imagen = ".jpg"

# Subdirectorios de entrenamiento y validación
subdirectorios_entrenamiento = [os.path.join(directorio_raiz, "train", subdirectorio) for subdirectorio in os.listdir(os.path.join(directorio_raiz, "train"))]
subdirectorios_validacion = [os.path.join(directorio_raiz, "valid", subdirectorio) for subdirectorio in os.listdir(os.path.join(directorio_raiz, "valid"))]

# Contar el mínimo número de imágenes en los subdirectorios de entrenamiento y validación
minimo_imagenes_entrenamiento = min([len([archivo for archivo in os.listdir(subdirectorio) if archivo.endswith(extension_imagen)]) for subdirectorio in subdirectorios_entrenamiento])
minimo_imagenes_validacion = min([len([archivo for archivo in os.listdir(subdirectorio) if archivo.endswith(extension_imagen)]) for subdirectorio in subdirectorios_validacion])

# Eliminar imágenes adicionales en los subdirectorios de entrenamiento
for subdirectorio in subdirectorios_entrenamiento:
    archivos_imagen = [archivo for archivo in os.listdir(subdirectorio) if archivo.endswith(extension_imagen)]
    conteo_imagenes = len(archivos_imagen)
    diferencia = conteo_imagenes - minimo_imagenes_entrenamiento

    for i in range(diferencia):
        imagen_a_eliminar = archivos_imagen[i]
        ruta_imagen_a_eliminar = os.path.join(subdirectorio, imagen_a_eliminar)
        os.remove(ruta_imagen_a_eliminar)

    conteo_imagenes_restantes = len([archivo for archivo in os.listdir(subdirectorio) if archivo.endswith(extension_imagen)])
    print(f"Cantidad de imágenes restantes en {subdirectorio}: {conteo_imagenes_restantes}")

# Eliminar imágenes adicionales en los subdirectorios de validación
for subdirectorio in subdirectorios_validacion:
    archivos_imagen = [archivo for archivo in os.listdir(subdirectorio) if archivo.endswith(extension_imagen)]
    conteo_imagenes = len(archivos_imagen)
    diferencia = conteo_imagenes - minimo_imagenes_validacion

    for i in range(diferencia):
        imagen_a_eliminar = archivos_imagen[i]
        ruta_imagen_a_eliminar = os.path.join(subdirectorio, imagen_a_eliminar)
        os.remove(ruta_imagen_a_eliminar)

    conteo_imagenes_restantes = len([archivo for archivo in os.listdir(subdirectorio) if archivo.endswith(extension_imagen)])
    print(f"Cantidad de imágenes restantes en {subdirectorio}: {conteo_imagenes_restantes}")




# Desarrollo

### Dependencias

In [1]:
from keras.preprocessing.image import ImageDataGenerator
import tensorflow as tf
from keras import layers
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np
from keras.models import load_model
from keras import regularizers

import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

2023-05-24 01:50:11.366484: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-24 01:50:11.393224: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-05-24 01:50:11.393748: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Preprocesamiento de los datos

In [None]:
# Se definen los sets de entrenamiento y validación
train_data_generator: ImageDataGenerator = ImageDataGenerator(
    rescale=1.0/255.0,
    width_shift_range=0.3,
    height_shift_range=0.3,
    rotation_range=30,
    horizontal_flip=True,
    fill_mode='nearest',
    shear_range=0.3,
    zoom_range=0.4
)
    
val_data_generator: ImageDataGenerator = ImageDataGenerator(rescale=1./255)

# Crea los generadores de data para entrenamiento y validación 
train_generator = train_data_generator.flow_from_directory(
    train_path,
    target_size=(150, 150),
    batch_size=160,
    class_mode='categorical',
    shuffle=True  # Se mezcla la data de entrenamiento
)

val_generator = val_data_generator.flow_from_directory(
    validation_path,
    target_size=(150, 150),
    batch_size=160,
    class_mode='categorical',
    shuffle=False # No se mezcla la data de validación 
)

Mostramos algunas de las imagenes post-procesamiento

In [None]:
# Obtén un lote de imágenes y etiquetas del generador de entrenamiento
images, labels = next(train_generator)

# Define los nombres de las clases en el mismo orden que se utilizó al crear el generador
class_names = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']

# Muestra las primeras 20 imágenes
for i in range(20):
    plt.subplot(4, 5, i+1)
    plt.imshow(images[i])
    plt.title(class_names[labels[i].argmax()])
    plt.axis('off')

plt.tight_layout()
plt.show()

## Creación del modelo	

Utilizamos el modelo base de ResNet101V2  y le hacemos un fine-tuning para que se adapte a nuestro problema.

In [None]:
# Create a base model using a pre-trained ResNet50V2 model
# Create a base model using a pre-trained ResNet101V2 model
base_model = tf.keras.applications.ResNet101V2(
    input_shape=(150, 150, 3),
    include_top=False,
    weights='imagenet'
)

# Freeze the pre-trained layers
for layer in base_model.layers:
    layer.trainable = False

# Add new layers on top of the base model
x = base_model.output
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.01))(x)
x = layers.Dropout(0.5)(x)
# add another layer
x = layers.Dense(128, activation='relu', kernel_regularizer=regularizers.l2(0.01))(x)
x = layers.Dropout(0.5)(x)
predictions = layers.Dense(6, activation='softmax')(x)

# Define the model
model = tf.keras.models.Model(inputs=base_model.input, outputs=predictions)

# Compile the model with a lower learning rate
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Print the model summary
model.summary()

### Pre-ajuste

In [None]:
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

# Pass the callback to the fit method
pre_fine_tuning_history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=100,
    callbacks=[early_stop]
)

### Fine-tuning

In [None]:
# Unfreeze the last few layers of the base model
for layer in base_model.layers[-20:]:  # Increase the number of layers to fine-tune
    if not isinstance(layer, layers.BatchNormalization):  # Don't apply regularization to BatchNormalization layers
        layer.kernel_regularizer = regularizers.l2(0.01)  # Add L2 regularization
    layer.trainable = True

# Recompile the model (necessary after changing layer trainability)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.00001),  # Experiment with the learning rate
    loss='categorical_crossentropy',
    metrics=['accuracy']
)


In [None]:
# Define the early stopping callback
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)

# Continue training the model with fine-tuning
history = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=100,
    callbacks=[early_stop]
)

## Post-entrenamiento

In [2]:
model_path = './model/ResNet101V2_TrashClassifierV1.h5'

In [None]:
# Guardamos el resultado del modelo
if not os.path.exists('model'):
    os.makedirs('model')
model.save(model_path)

In [3]:
# Carga el modelo desde el archivo
model = load_model(model_path)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 150, 150, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 156, 156, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 75, 75, 64)   9472        ['conv1_pad[0][0]']              
                                                                                                  
 pool1_pad (ZeroPadding2D)      (None, 77, 77, 64)   0           ['conv1_conv[0][0]']         

In [None]:
accuracy = history.history['accuracy']
val_accuracy = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(len(accuracy))

plt.plot(epochs, accuracy, 'r', label='Precision de entrenamiento')
plt.plot(epochs, val_accuracy, 'b', label='Precision de validacion')
plt.title('Valor de la precision de entrenamiento y validacion')
plt.legend(loc=0)
plt.figure()
plt.show()


In [None]:
# plot loss
plt.plot(epochs, loss, 'r', label='Perdida de entrenamiento')
plt.plot(epochs, val_loss, 'b', label='Perdida de validacion')
plt.title('Perdida de entrenamiento y validacion')
plt.legend(loc=0)
plt.figure()
plt.show()

In [None]:
# get the labels of the test images.
test_labels = val_generator.classes

# make a prediction
predictions = model.predict(val_generator)

# Get most likely class
predicted_classes = np.argmax(predictions, axis=1)

errors = np.where(predicted_classes != test_labels)[0]
print("Número de errores = {}/{}".format(len(errors),val_generator.samples))
accuracy = (val_generator.samples-len(errors))/val_generator.samples
print("Precisión = ", accuracy*100, "%")

cm = confusion_matrix(test_labels, predicted_classes)
cm_plot_labels = ['cartón','vidrio','metal','papel','plastico','basura (miscelaneo)']

plt.figure(figsize=(10,10))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=cm_plot_labels, yticklabels=cm_plot_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

In [None]:
load_img = tf.keras.preprocessing.image.load_img

# show some misclassified examples
for iterator in range(len(errors)):
    ax, fig = plt.subplots(1,1)
    pred_class = np.argmax(predictions[errors[iterator]])
    pred_label = cm_plot_labels[pred_class]

    title = 'Original label:{}, Prediction :{}, confidence : {:.3f}'.format(
        cm_plot_labels[test_labels[errors[iterator]]], pred_label, predictions[errors[iterator]][pred_class])

    original = load_img('{}/{}'.format(validation_path, val_generator.filenames[errors[iterator]]))
    plt.imshow(original)
    plt.title(title)
    plt.show()
    if iterator == 9:
        break

In [None]:
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize
from sklearn.multiclass import OneVsRestClassifier
import matplotlib.pyplot as plt
from scipy import interp
from itertools import cycle

# Binarize the output
y_true_bin = label_binarize(y_true, classes=[0, 1, 2, ..., num_classes-1])

# Compute ROC curve and ROC area for each class
fpr = dict()
tpr = dict()
roc_auc = dict()
for i in range(num_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Compute micro-average ROC curve and ROC area
fpr["micro"], tpr["micro"], _ = roc_curve(y_true_bin.ravel(), y_pred.ravel())
roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

# Plot all ROC curves
plt.figure()
plt.plot(fpr["micro"], tpr["micro"],
         label='micro-average ROC curve (area = {0:0.2f})'
               ''.format(roc_auc["micro"]),
         color='deeppink', linestyle=':', linewidth=4)

colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])
for i, color in zip(range(num_classes), colors):
    plt.plot(fpr[i], tpr[i], color=color, lw=lw,
             label='ROC curve of class {0} (area = {1:0.2f})'
             ''.format(i, roc_auc[i]))

plt.plot([0, 1], [0, 1], 'k--', lw=lw)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Some extension of Receiver operating characteristic to multi-class')
plt.legend(loc="lower right")
plt.show()


## Pruebas

In [7]:
from keras.utils import load_img, img_to_array
import numpy
import io

# classes = {'cardboard': 0, 'glass': 1, 'metal': 2, 'paper': 3, 'plastic': 4, 'trash': 5}
classes = {0: 'cardboard', 1: 'glass', 2: 'metal', 3: 'paper', 4: 'plastic', 5: 'trash'}

def predict_image(model, image, img_size=(150, 150)):
    image = load_img(image, target_size=img_size)
    input_arr = img_to_array(image)
    input_arr = numpy.array([input_arr])  # Convert single image to a batch.
    
    # Get the model's predictions for this batch of images
    predictions = model.predict(input_arr)

    # Get the index of the highest score in the predictions array
    predicted_class = numpy.argmax(predictions[0])
    print(predictions, predicted_class)
    
    return classes[predicted_class], max(predictions[0])

In [8]:
# Example usage:
img_path = 'data/prueba_1.jpeg'  
predicted_class = predict_image(model, img_path)
predicted_class

[[0.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00 4.3156413e-15
  1.0000000e+00]] 5


('trash', 1.0)