In [None]:
import tensorflow as tf
import os
import time
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger

In [None]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        print(e)

In [None]:
width_size = 512
height_size = 512

In [None]:
def define_custom_model(classes=4):
    model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(16, (3,3), activation='relu', input_shape=(width_size, height_size, 3)),
    tf.keras.layers.MaxPooling2D(2, 2),
    tf.keras.layers.Conv2D(32, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    tf.keras.layers.Conv2D(64, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    tf.keras.layers.Conv2D(64, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    tf.keras.layers.Conv2D(64, (3,3), activation='relu'),
    tf.keras.layers.MaxPooling2D(2,2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(classes, activation='softmax')
    ])
    
    return model

In [None]:
def vgg16(classes=4):
    
    base_model = VGG16(weights="imagenet", include_top=False, input_shape=(width_size, height_size, 3))
    base_model.trainable = False ## Not trainable weights

    flatten_layer = layers.Flatten()
    dense_layer_1 = layers.Dense(50, activation='relu')
    dense_layer_2 = layers.Dense(20, activation='relu')
    prediction_layer = layers.Dense(classes, activation='softmax')


    model = models.Sequential([
        base_model,
        flatten_layer,
        #dense_layer_1,
        #dense_layer_2,
        prediction_layer
    ])
    
    return model
    

In [None]:
def convolutional_block(x, filter):
    x_skip = x
   
    x = tf.keras.layers.Conv2D(filter, (3,3), padding = 'same', strides = (2,2))(x)
    x = tf.keras.layers.BatchNormalization(axis=3)(x)
    x = tf.keras.layers.Activation('relu')(x)
    
    x = tf.keras.layers.Conv2D(filter, (3,3), padding = 'same')(x)
    x = tf.keras.layers.BatchNormalization(axis=3)(x)
    
    x_skip = tf.keras.layers.Conv2D(filter, (1,1), strides = (2,2))(x_skip)
    
    x = tf.keras.layers.Add()([x, x_skip])     
    x = tf.keras.layers.Activation('relu')(x)
    return x

def identity_block(x, filter):

    x_skip = x

    x = tf.keras.layers.Conv2D(filter, (3,3), padding = 'same')(x)
    x = tf.keras.layers.BatchNormalization(axis=3)(x)
    x = tf.keras.layers.Activation('relu')(x)

    x = tf.keras.layers.Conv2D(filter, (3,3), padding = 'same')(x)
    x = tf.keras.layers.BatchNormalization(axis=3)(x)

    x = tf.keras.layers.Add()([x, x_skip])     
    x = tf.keras.layers.Activation('relu')(x)
    return x

In [None]:
def ResNet34(classes = 4):

    x_input = tf.keras.layers.Input((width_size, height_size, 3))
    x = tf.keras.layers.ZeroPadding2D((3, 3))(x_input)

    x = tf.keras.layers.Conv2D(64, kernel_size=7, strides=2, padding='same')(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)

    block_layers = [3, 4, 6, 3]
    filter_size = 64

    for i in range(4):
        if i == 0:

            for j in range(block_layers[i]):
                x = identity_block(x, filter_size)
        else:

            filter_size = filter_size*2
            x = convolutional_block(x, filter_size)
            for j in range(block_layers[i] - 1):
                x = identity_block(x, filter_size)
                
    x = tf.keras.layers.AveragePooling2D((2,2), padding = 'same')(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(512, activation = 'relu')(x)
    x = tf.keras.layers.Dense(classes, activation = 'softmax')(x)
    model = tf.keras.models.Model(inputs = x_input, outputs = x, name = "ResNet34")
    return model

In [None]:
dataset_dirty_insulator = '../dataset_dirty_insulator/train/'
test_size = 0.20
batch_size = 4
nr_epochs = 50
num_of_imgs_train = sum(len(files) for _, _, files in os.walk(dataset_dirty_insulator))
num_of_imgs_validation = int(num_of_imgs_train * test_size)
steps_p_epoch = int(num_of_imgs_train / batch_size)
val_steps = int((num_of_imgs_validation) / batch_size)

In [None]:
print(num_of_imgs_train, steps_p_epoch, val_steps)

In [None]:
model_vgg16 = vgg16()
model_resnet = ResNet34()
model_custom = define_custom_model()

all_models = [model_vgg16, model_resnet, model_custom]
all_names_models = ["model_vgg16"]
#all_names_models = ["model_vgg16", "model_resnet", "model_custom"]

In [None]:
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
        validation_split=test_size,
    )
trainGene = train_datagen.flow_from_directory(
    dataset_dirty_insulator,
    batch_size=batch_size,
    target_size=(width_size, height_size),
    class_mode='categorical'
)

evaluate = tf.keras.preprocessing.image.ImageDataGenerator(rescale = 1/255)

validation_dataset=evaluate.flow_from_directory('../dataset_dirty_insulator/test/', 
                                         target_size = (width_size, height_size), batch_size = batch_size,
                                         class_mode = 'categorical')

In [None]:
for index in range(len(all_names_models)):
    since = time.time()
 
    model = all_models[index]
    opt = tf.keras.optimizers.legacy.Adam(learning_rate=0.001)
    
    model.compile(optimizer=opt, 
                  loss=tf.keras.losses.categorical_crossentropy, 
                  metrics=['accuracy']),
    
    csv_logger = CSVLogger(f"{all_names_models[index]}.csv", append=True)

    model_checkpoint = ModelCheckpoint(f'{all_names_models[index]}.hdf5',
                                       monitor='loss',
                                       verbose=1,
                                       save_best_only=True)
    
    history = model.fit(trainGene, 
                    validation_steps = val_steps,
                    steps_per_epoch = steps_p_epoch,
                    epochs=nr_epochs,
                    callbacks=[model_checkpoint, csv_logger])
    
    time_elapsed = time.time() - since
    
    print(
        f"\n Total time: {round(time_elapsed // 60, 2)} minutes and {round(time_elapsed % 60, 2)} seconds"
    )


In [None]:
classes = sorted(os.listdir('../dataset_dirty_insulator/test/'))
print(classes)

In [None]:
model = tf.keras.models.load_model("../../neural_network/experimentos/classificator/dataset_inteiro/256/model_vgg16.hdf5")

dict_models = {"0": "soot",
               "1": 'excrement', 
               "2": 'clean', 
               "3": 'salt',
               }

dirty_translate = {"cinzas_vulcanicas": "soot",
               "dejetos": 'excrement', 
               "limpo": 'clean', 
               "sal": 'salt',
               }

results = []

path_dirt = '../dataset_dirty_insulator/test/'

for dirty in os.listdir(path_dirt):
    for idx, file in enumerate(os.listdir(f"{path_dirt}{dirty}")):
        file_name = f"{path_dirt}{dirty}/{file}"

        img = cv2.imread(file_name)
            
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (256, 256))

        x = np.expand_dims(img, axis=0)
        images = np.vstack([x])

        pred = list(model.predict(images, batch_size=batch_size, verbose = 0)[0])
        pred = dict_models[f'{pred.index(max(pred))}']
         
        results.append({
            "name_file": file_name,
            "category_pred": pred
        })


        plt.imshow(img)
        plt.title(f"Category: {dirty_translate[dirty]}\nPrediction: {pred}".upper())
        plt.axis('off')
        plt.savefig(f'./results/256/{dirty}/{file}')
        plt.show()

Evaluate the model

In [None]:
dict_models = {"soot": "0",
               "excrement": "1",
               "clean": "2",
               "salt": "3",
               }


In [None]:
classes = ['soot', 'excrement', 'clean', 'salt']

In [None]:
all_weights_models = ["../../neural_network/experimentos/classificator/dataset_inteiro/256/model_vgg16.hdf5"]

for index_models in range(len(all_weights_models)):
    
    model = tf.keras.models.load_model(all_weights_models[index_models]) 
    pred_arrays = []

    for index in range(len(validation_dataset.filepaths)):
        #print(validation_dataset.filepaths[index])

        img = cv2.imread(validation_dataset.filepaths[index])

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (256, 256))

        x = np.expand_dims(img, axis=0)
        images = np.vstack([x])

        pred = list(model.predict(images, batch_size=batch_size, verbose = 0)[0])
        pred_arrays.append(pred.index(max(pred)))
        #print(pred)

    print(all_weights_models[index_models])    
    ConfusionMatrixDisplay(confusion_matrix=confusion_matrix(pred_arrays, validation_dataset.classes), display_labels=dict_models).plot()
    plt.show()

In [None]:
def plot_confusion_matrix(y_true: np.array, y_pred: np.array, classes: list, title: str ="Confusion Matrix", cmap: plt = plt.cm.Purples, save_as: str = "fig.png") -> plt.subplot:
        """Plot a consfusion matrix with all classes, considering the y_true and y_pred of the models 
        Args:
            y_true (np.array): True answers about the results
            y_pred (np.array): Prediction of the model
            classes (list): List with all the classes names
            title (str, optional): A text to be the title of plot. Defaults to "Confusion Matrix".
            cmap (plt, optional): Type of colors to fill the table. Defaults to plt.cm.Purples.
            save_as (str, optional): The name so save  the plots. Defaults to "fig.png".
        Returns:
            plt.subp"lot: Return a a subplot of confusion matrix, 
        """
        cm = confusion_matrix(y_true, y_pred)
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    
        fig, ax = plt.subplots()
        im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
        ax.figure.colorbar(im, ax=ax)
        ax.set(xticks=np.arange(cm.shape[1]),
            yticks=np.arange(cm.shape[0]),
            xticklabels=classes, yticklabels=classes,
            title=title,
            ylabel='True',
            xlabel='Predicted')

        plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
                rotation_mode="anchor")

        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], '.2f'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")
        fig.tight_layout()
        fig.savefig(save_as)

        return ax


In [None]:
from sklearn.metrics import multilabel_confusion_matrix

pred_models = []

evaluate = tf.keras.preprocessing.image.ImageDataGenerator(rescale = 1/255)

validation_dataset=evaluate.flow_from_directory('../dataset_dirty_insulator/test/', 
                                         target_size = (width_size, height_size), batch_size = batch_size,
                                         class_mode = 'categorical')

all_weights_models = ["../../neural_network/experimentos/classificator/dataset_inteiro/256/model_vgg16.hdf5"]

for index_models in range(len(all_weights_models)):
    
    model = tf.keras.models.load_model(all_weights_models[index_models]) 
    pred_arrays = []

    for index in range(len(validation_dataset.filepaths)):
        #print(validation_dataset.filepaths[index])

        img = cv2.imread(validation_dataset.filepaths[index])

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (256, 256))

        x = np.expand_dims(img, axis=0)
        images = np.vstack([x])

        pred_models.append(list(model.predict(images, batch_size=batch_size, verbose = 0)[0]))

        pred = list(model.predict(images, batch_size=batch_size, verbose = 0)[0])
        pred_arrays.append(pred.index(max(pred)))
        

    
    values = multilabel_confusion_matrix(pred_arrays, validation_dataset.classes)
    print(accuracy_score(pred_arrays, validation_dataset.classes))
    for index in range(len(values)):
        tp, fn = values[index][0]
        fp, tn = values[index][1]
        
    #ConfusionMatrixDisplay(confusion_matrix=confusion_matrix(pred_arrays, validation_dataset.classes), display_labels=validation_dataset.class_indices).plot()
    plot_confusion_matrix(pred_arrays, validation_dataset.classes, classes)
    plt.show()