# Trabalho prático - Visão por Computador e Processamento de Imagem

#### Autores: Cláudio Moreira (PG47844), Filipe Fernandes (A83996)

Para o presente trabalho foi proposta a exploração de modelos de *Deep Learning*. Numa primeira parte, é suposto treinar modelos aplicando *data augmentation*, tanto em pré-processamento como dinâmico.

Neste *notebook* abordaram-se os processos de alteração de imagens de forma massiva. Para isso utilizaram-se as funções de processamento descritas posteriormente, de modo a alterar o brilho, contraste, saturação, *hue*, rotação, cisalhamento (*shear*), translação e redimensionamento das imagens. Começou-se por importar os seguintes módulos:

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import metrics
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping,TensorBoard, ReduceLROnPlateau
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization, LeakyReLU 
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

import os
import numpy as np

# plotting
import matplotlib.pyplot as plt
import pathlib
from PIL import Image
import IPython.display as display

# to display confusion matrix
import seaborn as sn
import pandas as pd

In [None]:
# GPU memory growth: ask TensorFlow to grow its GPU allocation as needed.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(physical_devices))
# Guard against CPU-only machines: indexing an empty device list raises IndexError.
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

# Massive data augmentation

### Funções auxiliares

In [None]:
def get_label(file_path):
  """Return a boolean mask over `classNames` marking the class of `file_path`.

  The class is read from the name of the file's parent directory and compared
  element-wise against the module-level `classNames` array.
  """
  path_components = tf.strings.split(file_path, os.path.sep)
  # the parent directory (second to last component) names the class
  class_dir = path_components[-2]
  return class_dir == classNames

def decode_img(img):
  """Decode PNG bytes into a float32 RGB image of size HEIGHT x WIDTH.

  Args:
    img: scalar string tensor holding the PNG-encoded file contents.

  Returns:
    The image as a float32 tensor, decoded with 3 channels and resized
    to the module-level HEIGHT and WIDTH.
  """
  # decode the compressed string into a 3-channel uint8 tensor
  img = tf.image.decode_png(img, channels=3)
  # convert_image_dtype rescales the integer pixels to floats in [0, 1]
  img = tf.image.convert_image_dtype(img, tf.float32)
  # tf.image.resize expects size as [new_height, new_width]; the previous
  # [WIDTH, HEIGHT] order only worked because both constants are equal.
  return tf.image.resize(img, [HEIGHT, WIDTH])


def get_bytes_and_label(file_path):
  """Load the image stored at `file_path` and pair it with its label.

  Returns an (image, label) tuple built with `decode_img` and `get_label`.
  """
  # read the raw file contents as a string tensor
  raw_bytes = tf.io.read_file(file_path)
  return decode_img(raw_bytes), get_label(file_path)

def _show_image_grid(image_batch, label_batch, columns, fig_width, row_height, title=None):
  """Plot every image of a batch in a grid, each titled with its class name.

  Args:
    image_batch: indexable batch of images accepted by `plt.imshow`.
    label_batch: per-image one-hot label rows used to index `classNames`.
    columns: number of images per grid row.
    fig_width: figure width passed to `plt.figure`.
    row_height: figure height reserved for each grid row.
    title: optional figure-level title.
  """
  # Use the real batch length instead of BATCH_SIZE: a final batch can hold
  # fewer images, and indexing past its end raises an error.
  num_images = len(image_batch)
  # integer row count; always leaves room for a partially filled last row
  rows = num_images // columns + 1
  figure = plt.figure(figsize=(fig_width, row_height * rows))
  if title is not None:
    figure.suptitle(title)
  for n in range(num_images):
    plt.subplot(rows, columns, n + 1)
    plt.imshow(image_batch[n])
    # pick the class name whose one-hot position is set
    plt.title(classNames[label_batch[n] == 1][0])
    plt.axis('off')


def show_batch(image_batch, label_batch, epoch):
  """Plot a batch in a 5-column grid under the title 'epoch <epoch>'."""
  _show_image_grid(image_batch, label_batch, columns=5, fig_width=15,
                   row_height=3, title='epoch {}'.format(epoch))


def show_batchSimple(image_batch, label_batch):
  """Plot a batch in a compact 6-column grid without a figure title."""
  _show_image_grid(image_batch, label_batch, columns=6, fig_width=10,
                   row_height=2)

In [None]:
def prepare_callbacks(file_path):
    """Build the list of Keras callbacks used during training.

    Args:
        file_path: destination of the weights written by the checkpoint callback.

    Returns:
        A list with, in order: a ModelCheckpoint that keeps only the best
        weights according to 'val_accuracy', an EarlyStopping on 'val_loss'
        and a ReduceLROnPlateau on 'val_loss'.
    """
    return [
        ModelCheckpoint(
            filepath=file_path,
            monitor='val_accuracy',
            verbose=1,
            save_weights_only=True,
            save_best_only=True,
        ),
        EarlyStopping(
            monitor='val_loss',
            min_delta=0.0001,
            patience=15,
            verbose=1,
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-9,
            verbose=1,
        ),
    ]

In [None]:
def show_history(history):
    """Print the recorded metric names, then plot accuracy and loss curves.

    Args:
        history: object whose `.history` dict holds the 'accuracy',
            'val_accuracy', 'loss' and 'val_loss' series.
    """
    records = history.history
    print(records.keys())

    # one figure per metric: (train key, val key, title, axis label, legend spot)
    curves = (
        ('accuracy', 'val_accuracy', 'model accuracy', 'accuracy', 'lower right'),
        ('loss', 'val_loss', 'model loss', 'loss', 'upper right'),
    )
    for train_key, val_key, title, y_label, legend_loc in curves:
        plt.plot(records[train_key])
        plt.plot(records[val_key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('epoch')
        plt.legend(['train', 'val'], loc=legend_loc)
        plt.show()
    
    
def show_accuracies():
    """Draw a grouped bar chart of test vs. validation scores for two runs.

    Reads index 1 of the module-level results evalV1, evalV2, valV1 and valV2
    (presumably the accuracy entry of an evaluate() result - TODO confirm).
    NOTE(review): these four names are not defined in the visible part of
    this file; make sure they exist before calling.
    """
    bar_width = 0.4
    fig, ax = plt.subplots()
    X = np.arange(2)

    models = ['bad val set', 'good val set']
    test_scores = [evalV1[1], evalV2[1]]
    val_scores = [valV1[1], valV2[1]]

    plt.bar(X, test_scores, width=bar_width, color='b', label='test')
    plt.bar(X + bar_width, val_scores, color='r', width=bar_width, label="val")
    # centre each tick between the two bars of its group
    plt.xticks(X + bar_width / 2, models)
    plt.ylim(top=1.0, bottom=0.80)
    plt.legend(loc='upper left')
    plt.show()

    
def show_misclassified(predictions, ground_truth, images, num_rows= 5, num_cols=3):
    """Plot the first wrongly predicted images next to their score bars.

    Args:
        predictions: per-image class score arrays.
        ground_truth: per-image one-hot label rows.
        images: the images the predictions refer to.
        num_rows: grid rows.
        num_cols: image/bar-chart pairs per row.
    """
    max_shown = num_rows * num_cols
    plt.figure(figsize=(4 * num_cols, 2 * num_rows))
    shown = 0
    for k in range(len(images)):
        # stop as soon as the grid is full
        if shown >= max_shown:
            break
        predicted_label = np.argmax(predictions[k])
        gt = np.where(ground_truth[k])[0][0]
        if predicted_label == gt:
            continue
        # each misclassification takes two neighbouring cells: image, then scores
        plt.subplot(num_rows, 2 * num_cols, 2 * shown + 1)
        plot_image(k, predictions[k], gt, images)
        plt.subplot(num_rows, 2 * num_cols, 2 * shown + 2)
        plot_value_array(k, predictions[k], ground_truth)
        shown += 1
    plt.tight_layout()
    plt.show()


def plot_image(i, predictions_array, true_label, img):
  """Show image `i` of `img`, captioned with prediction, confidence and truth.

  The caption is blue when the predicted class index equals `true_label`
  and red otherwise.
  """
  picture = img[i]
  plt.grid(False)
  plt.xticks([])
  plt.yticks([])

  plt.imshow(picture, cmap=plt.cm.binary)

  predicted_label = np.argmax(predictions_array)
  color = 'blue' if predicted_label == true_label else 'red'

  caption = "{} {:2.0f}% ({})".format(classNames[predicted_label],
                                      100*np.max(predictions_array),
                                      classNames[true_label])
  plt.xlabel(caption, color=color)

def plot_value_array(i, predictions_array, true_label):
  """Draw the class scores of one prediction as a bar chart.

  Args:
    i: index of the sample inside `true_label`.
    predictions_array: score per class for sample `i`.
    true_label: one-hot label rows for all samples; row `i` is used.

  The predicted class bar is coloured red and the true class bar blue
  (blue wins when both are the same bar).
  """
  true_label = true_label[i]
  # Derive the number of bars from the prediction itself: a hard-coded 8
  # makes plt.bar fail whenever the model outputs a different class count.
  num_classes = len(predictions_array)
  plt.grid(False)
  plt.xticks(range(num_classes))
  plt.yticks([])
  thisplot = plt.bar(range(num_classes), predictions_array, color="#777777")
  plt.ylim([0, 1])
  predicted_label = np.argmax(predictions_array)

  thisplot[predicted_label].set_color('red')
  thisplot[np.where(true_label)[0][0]].set_color('blue')

def plot_predictions(predictions, ground_truth, images, num_rows= 5, num_cols=3 ):
    """Plot the first images with their predictions and score bar charts.

    At most num_rows * num_cols samples are drawn, each as an image cell
    followed by a bar-chart cell.
    """
    grid_cols = 2 * num_cols
    sample_count = min(num_rows * num_cols, len(images))
    plt.figure(figsize=(2 * grid_cols, 2 * num_rows))
    for i in range(sample_count):
        gt = np.where(ground_truth[i])[0][0]
        image_cell = 2 * i + 1
        plt.subplot(num_rows, grid_cols, image_cell)
        plot_image(i, predictions[i], gt, images)
        plt.subplot(num_rows, grid_cols, image_cell + 1)
        plot_value_array(i, predictions[i], ground_truth)
    plt.tight_layout()
    plt.show()

In [None]:
# Root folders of the training and test images (relative paths).
# The file patterns used later ("/*/*.png") expect one sub-folder level below these roots.
pathTrain = 'GTSRB_TP/train_images'
pathTest = 'GTSRB_TP/test_images'

In [None]:
# Global configuration shared by the input pipeline and the plotting helpers.
BATCH_SIZE = 64
HEIGHT = 32
WIDTH = 32
# Class names: one per sub-directory of the training folder.
# Sorting fixes the class-index order (os.listdir returns entries in
# arbitrary order), and the is_dir() filter skips stray files.
data_dir = pathlib.Path(pathTrain)
classNames = np.array(sorted(entry.name for entry in data_dir.iterdir() if entry.is_dir()))
print(classNames)

### Preparação do dataset

In [None]:
# Passed as num_parallel_calls / buffer_size wherever tf.data should tune the value itself.
AUTOTUNE = tf.data.experimental.AUTOTUNE

# File names of every training PNG, then one (image, label) pair per file.
listset = tf.data.Dataset.list_files(f"{pathTrain}/*/*.png")
dataset = listset.map(get_bytes_and_label,
                      num_parallel_calls = AUTOTUNE)

In [None]:
# Peek at one element to check the image and label shapes.
t = next(iter(dataset))
sample_image, sample_label = t
print(sample_image.shape, sample_label.shape)
# note: the cardinality is only available while the dataset is not repeating
train_Data_length = tf.data.experimental.cardinality(dataset).numpy()
print("Total images in dataset: ", train_Data_length)

##### Aplicação da aleatoriedade no dataset

In [None]:
# Shuffle with a buffer of train_Data_length elements (the dataset size computed above).
dataset = dataset.shuffle(buffer_size = train_Data_length)

In [None]:

# Disabled: does not work here because the dataset has not been batched yet
# image_batch, label_batch = next(iter(dataset))        
# show_batchSimple(image_batch, label_batch.numpy())

### Dataset de teste

In [None]:
# File names of every test PNG, then one (image, label) pair per file.
listset = tf.data.Dataset.list_files(f"{pathTest}/*/*.png")
test_Data = listset.map(get_bytes_and_label,
                        num_parallel_calls = AUTOTUNE)

In [None]:
# Number of elements in the (still unbatched) test dataset.
test_Data_length = tf.data.experimental.cardinality(test_Data).numpy()
print("Total images in dataset: ", test_Data_length)

In [None]:
# Group the test images into batches of BATCH_SIZE; no shuffle or repeat is applied here.
test_Data = test_Data.batch(batch_size = BATCH_SIZE)

### Processamento da Imagem

In [None]:
import tensorflow_addons as tfa

def process_brightness(image, label):
    """Randomly scale the image value (brightness) and clip to [0, 1].

    Hue and saturation are left unchanged; the label is passed through.
    """
    adjusted = tfa.image.random_hsv_in_yiq(
        image,
        max_delta_hue=0.0,
        lower_saturation=1.0,
        upper_saturation=1.0,
        lower_value=0.1,
        upper_value=3.0,
    )
    return tf.clip_by_value(adjusted, 0, 1), label

def process_saturation(image, label):
    """Randomly scale the image saturation and clip to [0, 1].

    Hue and value are left unchanged; the label is passed through.
    """
    adjusted = tfa.image.random_hsv_in_yiq(
        image,
        max_delta_hue=0.0,
        lower_saturation=1.0,
        upper_saturation=3.0,
        lower_value=1.0,
        upper_value=1.0,
    )
    return tf.clip_by_value(adjusted, 0, 1), label

def process_contrast(image, label):
    """Apply a random contrast factor between 0.1 and 3.0, clipped to [0, 1]."""
    contrasted = tf.image.random_contrast(image, lower=0.1, upper=3.0)
    return tf.clip_by_value(contrasted, 0, 1), label

def process_hue(image, label):
    """Shift the image hue by a random delta bounded by max_delta=0.2."""
    return tf.image.random_hue(image, max_delta=0.2), label

def process_rotate(image, label):
    """Rotate the image by a random angle drawn from [-0.175, 0.175)."""
    # presumably radians (about +/-10 degrees) - confirm against tfa.image.rotate
    angle = tf.random.uniform(shape=(), minval=-0.175, maxval=0.175)
    return tfa.image.rotate(image, angle), label

def process_shear(image, label):
    """Randomly rotate the image, then apply a random projective transform.

    The transform keeps the second row at identity and varies only the
    x-row terms through the random factor `shear`.
    """
    angle = tf.random.uniform(shape=(), minval=-0.175, maxval=0.175)
    rotated = tfa.image.rotate(image, angle)
    shear = tf.random.uniform(shape=(), minval=-0.1, maxval=0.1, dtype=tf.dtypes.float32)
    # 8-parameter transform: [a0, a1, a2, b0, b1, b2, c0, c1]
    # NOTE(review): the -shear*32 offset looks tied to the 32-pixel image size - confirm
    transform = [1, shear, -shear*32,   0,1,0,  0,0]
    return tfa.image.transform(rotated, transform), label

def process_translate(image, label):
    """Randomly rotate the image, then shift it by a random offset in [-3, 3) on each axis."""
    angle = tf.random.uniform(shape=(), minval=-0.175, maxval=0.175)
    rotated = tfa.image.rotate(image, angle)
    # draw the horizontal offset first, then the vertical one
    shift_x = tf.random.uniform(shape=(), minval=-3, maxval=3, dtype=tf.dtypes.float32)
    shift_y = tf.random.uniform(shape=(), minval=-3, maxval=3, dtype=tf.dtypes.float32)
    return tfa.image.translate(rotated, [shift_x, shift_y]), label

def process_crop(image, label):
    """Take a random square crop of the image and resize it back to 32x32.

    Args:
        image: image tensor with 3 channels.
        label: label passed through unchanged.

    Returns:
        (cropped_and_resized_image, label)
    """
    # tf.image.random_crop needs an integer size, so the side length is drawn
    # as int32 (the previous float32 draw is not a valid crop size).
    # maxval is exclusive, so 33 keeps the full 32-pixel crop possible.
    c = tf.random.uniform(shape=(), minval=24, maxval=33, dtype=tf.dtypes.int32)
    img = tf.image.random_crop(image, size=[c, c, 3])
    img = tf.image.resize(img, size=[32, 32])
    return img, label

In [None]:
# Build the augmented training set: one transformed copy of `dataset` per
# augmentation, concatenated in the order listed below.
# NOTE(review): the untransformed images are not part of the result - confirm this is intended.
color_ops = [process_brightness, process_contrast, process_hue, process_saturation]
geometry_ops = [process_rotate, process_shear, process_translate, process_crop]

first_op, *remaining_ops = color_ops + geometry_ops
train_dataset = dataset.map(first_op)
for augment in remaining_ops:
    train_dataset = train_dataset.concatenate(dataset.map(augment))



In [None]:
# Number of elements in the augmented dataset (must be read before any repeat()).
train_dataset_length = tf.data.experimental.cardinality(train_dataset).numpy()
print("Total images in dataset: ", train_dataset_length) 

In [None]:


train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(buffer_size = train_dataset_length)
train_dataset = train_dataset.batch(batch_size = BATCH_SIZE)
train_dataset = train_dataset.prefetch(buffer_size = AUTOTUNE)
train_dataset = train_dataset.repeat()

In [None]:
#ver os batchs
image_batch, label_batch = next(iter(train_dataset))        
show_batch(image_batch, label_batch.numpy(), 1)
image_batch, label_batch = next(iter(train_dataset))      
show_batch(image_batch, label_batch.numpy(), 2)
image_batch, label_batch = next(iter(train_dataset))      
show_batch(image_batch, label_batch.numpy(), 3)

#### Dataset de validação

In [None]:
train_size = int(0.8*train_dataset_length)
val_size = int(0.2*train_dataset_length)

In [None]:
train_Data = train_dataset.take(train_size)
val_Data = train_dataset.skip(train_size)
print(tf.data.experimental.cardinality(train_Data).numpy())

#### Modelo

In [None]:
# CNN classifier: three 5x5 convolution stages followed by a dense head
# with a 43-way softmax output.
modelV3 = Sequential([
    # stage 1: convolution on the 32x32x3 input, no pooling
    Conv2D(128, (5, 5), input_shape=(32, 32, 3)),
    LeakyReLU(alpha=0.01),
    BatchNormalization(),
    Dropout(0.5),

    # stage 2
    Conv2D(196, (5, 5)),
    LeakyReLU(alpha=0.01),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    Dropout(0.5),

    # stage 3
    Conv2D(256, (5, 5)),
    LeakyReLU(alpha=0.01),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    Dropout(0.5),

    # dense head; LeakyReLU with alpha=0.0 has zero slope for negative inputs
    Flatten(),
    LeakyReLU(alpha=0.0),
    Dense(384),
    LeakyReLU(alpha=0.0),
    Dropout(0.5),

    Dense(43, activation='softmax'),
])

opt = Adam(learning_rate=0.0001)

modelV3.compile(
    optimizer = opt,
    loss = 'categorical_crossentropy',
    metrics = ['accuracy'],
)

In [None]:
# Checkpoint file that receives the weights saved by the ModelCheckpoint callback.
file_pathV3 = './TrainModels/Massive_Augmentation.ckpt'

callbacksV3 = prepare_callbacks(file_pathV3)

# Keras documents steps_per_epoch / validation_steps as integers; true
# division produced floats here, so floor division is used instead.
historyV3 = modelV3.fit(train_Data, steps_per_epoch = train_size // BATCH_SIZE,
          epochs=50, 
          validation_data = val_Data,
          validation_steps = val_size // BATCH_SIZE, 
          callbacks = callbacksV3)

# historyV3 = modelV3.fit(dataV3,
#           epochs=50, 
#           validation_data = val_Data, 
#           callbacks = callbacksV3)      

In [None]:
show_history(historyV3)

# Restore the weights written by the checkpoint callback before evaluating.
modelV3.load_weights(file_pathV3)

evalV3 = modelV3.evaluate(test_Data, verbose=2)
# Bound the number of validation batches explicitly so the evaluation
# terminates even if val_Data repeats indefinitely.
valV3 = modelV3.evaluate(val_Data, steps = val_size // BATCH_SIZE, verbose=2)

In [None]:
model_eval = modelV3.evaluate(test_Data, verbose=2)
print(model_eval)

# batch_size must not be given for tf.data inputs (the dataset is already
# batched), and steps=1 scored a single batch only. Evaluate val_size images
# worth of batches instead.
model_val = modelV3.evaluate(val_Data, steps = val_size // BATCH_SIZE, verbose=2)
print(model_val)

In [None]:
# Collect every test image, its label and the model prediction as flat lists.
numpy_labels = []
numpy_images = []
pred = []

# iterate over all batches of the test dataset
for images, labels in test_Data:
    batch_images = images.numpy()
    numpy_images.extend(batch_images)
    numpy_labels.extend(labels.numpy())
    pred.extend(modelV3.predict(batch_images))

In [None]:
#show_misclassified(pred, numpy_labels, numpy_images, int((val_size - val_size*.9861 )/3 + 1))