# One-shot Learning para reconhecimento facial
## Projeto final desenvolvido na Escola de Engenharia de Piracicaba - EEP

### Dependências do projeto

In [2]:
import os
import uuid
import itertools
import cv2
import numpy as np
from matplotlib import pyplot as plt
import tensorflow as tf
import tensorflowjs as tfjs
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, Layer, MaxPooling2D, Dense, Flatten, Input 
from tensorflow.keras.metrics import Precision, Recall
from sklearn.metrics import confusion_matrix
from sklearn.metrics import plot_confusion_matrix

### Habilitar GPU para o Tensorflow

In [3]:
gpus = tf.config.list_physical_devices('GPU')

for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

### Criação de pasta base das imagens

In [4]:
# Tamanho padrão em pixels da imagem de entrada
TAMANHO_PADRAO_IMAGEM = 105 

DIR_POSITIVAS = os.path.join('data', 'positivas')
DIR_NEGATIVAS = os.path.join('data', 'negativas')
DIR_ANCORAS = os.path.join('data', 'ancoras')

os.makedirs(DIR_POSITIVAS, exist_ok=True)
os.makedirs(DIR_NEGATIVAS, exist_ok=True)
os.makedirs(DIR_ANCORAS, exist_ok=True)

### LFW - Labeled Faces in the Wild

In [None]:
# Para imagens negativas utilizamos o seguinte dataset:
# http://vis-www.cs.umass.edu/lfw/lfw.tgz

### Capturar imagens âncoras e positivas com a Webcam

In [None]:
cap = cv2.VideoCapture(0)

while cap.isOpened(): 
    ret, frame = cap.read()
   
    # Frame com 250x250 pixels
    frame = frame[120:120+250,200:200+250, :]
    
    # Coletar imagens âncoras pressionando a tecla A do teclado 
    if cv2.waitKey(1) & 0XFF == ord('a'):
        imagem = os.path.join(DIR_ANCORAS, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imagem, frame)
    
    # Coletar imagens positivas pressionando a tecla P do teclado 
    if cv2.waitKey(1) & 0XFF == ord('p'):
        imagem = os.path.join(DIR_POSITIVAS, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imagem, frame)

    cv2.imshow('Coletar imagens', frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
    
cap.release()
cv2.destroyAllWindows()

### Data Augmentation

In [None]:
def data_augmentation(imagem):
    data = []
    for i in range(40):
        imagem = tf.image.stateless_random_jpeg_quality(imagem, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        imagem = tf.image.stateless_random_brightness(imagem, max_delta=0.02, seed=(1,2))
        imagem = tf.image.stateless_random_contrast(imagem, lower=0.6, upper=1, seed=(1,3))
        imagem = tf.image.stateless_random_flip_left_right(imagem, seed=(np.random.randint(100),np.random.randint(100)))
        imagem = tf.image.stateless_random_saturation(imagem, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(imagem)
    
    return data

In [None]:
# Aplicando nas imagens positivas
for arquivo in os.listdir(os.path.join(DIR_POSITIVAS)):
    caminho = os.path.join(DIR_POSITIVAS, arquivo)
    img = cv2.imread(caminho)
    imagens = data_augmentation(img) 
    
    for imagem in imagens:
        cv2.imwrite(os.path.join(DIR_POSITIVAS, '{}.jpg'.format(uuid.uuid1())), imagem.numpy())

In [None]:
# Aplicando nas imagens âncoras
for arquivo in os.listdir(os.path.join(DIR_ANCORAS)):
    caminho = os.path.join(DIR_ANCORAS, arquivo)
    img = cv2.imread(caminho)
    imagens = data_augmentation(img) 
    
    for imagem in imagens:
        cv2.imwrite(os.path.join(DIR_ANCORAS, '{}.jpg'.format(uuid.uuid1())), imagem.numpy())

### Obter imagens de cada categoria

In [None]:
positivas = tf.data.Dataset.list_files(DIR_POSITIVAS + '\*.jpg').take(500) # 500 imagens positivas
ancoras = tf.data.Dataset.list_files(DIR_NEGATIVAS + '\*.jpg').take(500) # 500 imagens âncoras
negativas = tf.data.Dataset.list_files(DIR_ANCORAS + '\*.jpg').take(500) # 500 imagens negativas

### Gerando dataset com pares de imagens positivas e âncoras, negativas e âncoras

In [None]:
pares_negativos = tf.data.Dataset.zip((ancoras, negativas, tf.data.Dataset.from_tensor_slices(tf.zeros(len(ancoras)))))
pares_positivos = tf.data.Dataset.zip((ancoras, positivas, tf.data.Dataset.from_tensor_slices(tf.ones(len(ancoras)))))

data = pares_positivos.concatenate(pares_negativos)

### Função de pré-processamento de imagem de entrada

In [None]:
def pre_processamento(caminho):
    img = tf.io.read_file(caminho)
    imagem = tf.io.decode_jpeg(img)
    imagem = tf.image.resize(imagem, (TAMANHO_PADRAO_IMAGEM, TAMANHO_PADRAO_IMAGEM))
    imagem = imagem / 255.0
    return imagem

### Função de pré-processamento dos pares

In [None]:
def pre_processamento_siameses(input, validacao, label):
    return (pre_processamento(input), pre_processamento(validacao), label)

### Geração dos pares rotulados

In [None]:
data = data.map(pre_processamento_siameses)
data = data.cache()
data = data.shuffle(buffer_size=10000)

### Separação de treino e teste

In [None]:
# 70% para treino
dados_treino = data.take(round(len(data) *.7))
dados_treino = dados_treino.batch(16)
dados_treino = dados_treino.prefetch(8)

# 30% para teste
dados_teste = data.skip(round(len(data) *.7))
dados_teste = dados_teste.take(round(len(data)*.3))
dados_teste = dados_teste.batch(16)
dados_teste = dados_teste.prefetch(8)

### Criação da Rede neural One-shot Learning

In [5]:
def ModeloOneShotLearning(): 
    # Camada de entrada
    input = Input(shape=(TAMANHO_PADRAO_IMAGEM, TAMANHO_PADRAO_IMAGEM, 3), name='input')
    # Primeira camada de convolução
    conv1 = Conv2D(64, (10,10), activation='relu')(input)
    # Primeira camada de max pooling
    max1 = MaxPooling2D(64, (2,2), padding='same')(conv1)
    # Segunda camada de convolução
    conv2 = Conv2D(128, (7,7), activation='relu')(max1)
    # Segunda camada de max pooling
    max2 = MaxPooling2D(64, (2,2), padding='same')(conv2)
    # Terceira camada de convolução
    conv3 = Conv2D(128, (4,4), activation='relu')(max2)
    # Terceira camada de max pooling
    max3 = MaxPooling2D(64, (2,2), padding='same')(conv3)
    # Quarta camada de convolução
    conv4 = Conv2D(256, (4,4), activation='relu')(max3)
    # Unindo dados na camada de flatten para array unidimensional
    flatten = Flatten()(conv4)
    # Camada densa com ativação sigmoid para classificação binária
    densa = Dense(4096, activation='sigmoid')(flatten)

    return Model(inputs=[input], outputs=[densa], name='modelo_one_shot')

In [6]:
modelo = ModeloOneShotLearning()

modelo.summary()

Model: "modelo_one_shot"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input (InputLayer)          [(None, 105, 105, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 96, 96, 64)        19264     
                                                                 
 max_pooling2d (MaxPooling2D  (None, 48, 48, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 42, 42, 128)       401536    
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 21, 21, 128)      0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 18, 18, 128)   

### Camada customizada para calcular distância L1

In [7]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()
      
    def call(self, entrada_img, validacao_img):
        return tf.math.abs(entrada_img - validacao_img)

### Criação da rede neural siamesa

In [8]:
def RedeNeuralSiamesa(): 
    input = Input(name='input', shape=(TAMANHO_PADRAO_IMAGEM, TAMANHO_PADRAO_IMAGEM, 3))
    validacao = Input(name='validacao', shape=(TAMANHO_PADRAO_IMAGEM, TAMANHO_PADRAO_IMAGEM, 3))
    camada_customizada = L1Dist()
    camada_customizada._name = 'distancia_euclidiana'
    distancia_euclidiana = camada_customizada(modelo(input), modelo(validacao))
    classificador = Dense(1, activation='sigmoid')(distancia_euclidiana)
   
    return Model(inputs=[input, validacao], outputs=classificador, name='RedeNeuralSiamesa')

In [9]:
modelo_rede_siamesa = RedeNeuralSiamesa()
modelo_rede_siamesa.summary()

Model: "RedeNeuralSiamesa"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input (InputLayer)             [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validacao (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 modelo_one_shot (Functional)   (None, 4096)         38960448    ['input[0][0]',                  
                                                                  'validacao[0][0]

### Otimizador e função de custo

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(1e-4)

### Definindo diretório de checkpoints para treino

In [None]:
checkpoint_diretorio = './treino_checkpoints'
checkpoint_prefixo = os.path.join(checkpoint_diretorio, 'checkpoint')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=modelo_rede_siamesa)

### Criação da função um passo de treinamento

In [None]:
def etapa_treino(batch):
    # Gravando operações para o cálculo do gradiente
    with tf.GradientTape() as tape:     
        # Pares
        X = batch[:2]
        # Rótulos
        y = batch[2]
        
        # Calculando a saída da rede neural
        yhat = modelo_rede_siamesa(X, training=True)
        # Calculando a perda
        loss = binary_cross_loss(y, yhat)
        
    # Calculando o gradiente
    grad = tape.gradient(loss, modelo_rede_siamesa.trainable_variables)
    
    # Atualizando os pesos da rede neural
    opt.apply_gradients(zip(grad, modelo_rede_siamesa.trainable_variables))
        
    # Retornando a perda
    return loss

### Função para treinar o modelo

In [None]:
def treino(data, EPOCAS):
    for epoca in range(1, EPOCAS + 1):
        print('\n Época {}/{}'.format(epoca, EPOCAS))
        
        progbar = tf.keras.utils.Progbar(len(data))
        
        r = Recall()
        p = Precision()
        
        for idx, batch in enumerate(data):
            loss = etapa_treino(batch)
            yhat = modelo_rede_siamesa.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
            
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # Salvando checkpoint
        if epoca % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefixo)

### Treinamento do modelo

In [None]:
treino(dados_treino, 30)

### Validando precisão e recall em toda base de dados

In [None]:
recall = Recall()
precision = Precision()

predicoes = []
valores_reais = []

for teste_img_input, teste_img_validacao, y_true in dados_teste.as_numpy_iterator():
    y_pred = modelo_rede_siamesa.predict([teste_img_input, teste_img_validacao])
    recall.update_state(y_true, y_pred)
    precision.update_state(y_true, y_pred) 
    
    for predicao in y_pred:
        predicoes.append(predicao)
        
    for valor_real in y_true:
        valores_reais.append(valor_real)

print("Recall:", recall.result().numpy())
print("Precision:", precision.result().numpy())

### Função para criar matriz de confusão

In [None]:
def plot_confusion_matrix(cm, classes,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
  
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('Rótulos verdadeiros')
    plt.xlabel('Rótulos preditos')

### Matriz de confusão

In [None]:
confusion = confusion_matrix([1 if predicao > 0.5 else 0 for predicao in predicoes], valores_reais)

plt.figure()
plot_confusion_matrix(confusion, classes=[0, 1],
                      title='Matriz de confusão')

plt.show()

### Salvar modelo

In [None]:
# Primeira forma de salvar o modelo com extensão h5
modelo_rede_siamesa.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
modelo_rede_siamesa.save('model.h5')

In [None]:
# Utilizando tensorflow.js para salvar o modelo e recarregá-lo na web
tfjs.converters.save_keras_model(modelo_rede_siamesa, os.path.join('tfjs_model'))

modelo = tf.keras.models.load_model('model.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

modelo.save('./model_keras')

In [None]:
# Comando para converter o modelo para o formato web com camada customizada
!tensorflowjs_converter --input_format=tf_saved_model ./model_keras ./model_web