Per produrre un sistema di riconoscimento facciale basato sul deep learning vengono utilizzate le librerie python ```OpenCV```, che permette di manipolare le immagini, ```numpy```, che permette di gestire i dati in forma vettoriale e matriciale, e ```os```, che permette di esplorare i file navigando nei vari percorsi.

Inoltre vengono utilizzate le librerie ```TensorFlow``` e ```Keras```, che gestiscono l'apprendimento automatico e permettono di costruire delle reti neurali; e ```yoloface``` che si occupa del riconoscimento facciale, ovvero riconoscere la presenza di volti nelle immagini.

In [1]:
from yoloface import face_analysis
import cv2
import numpy as np
import os

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow.keras.backend as K
from sklearn.model_selection import train_test_split

Viene inizializzata la variabile ```face``` che verrà utilizzata nei metodi a seguire per poter riconoscere i volti in modo da estrari dai video e creare il dataset.

In [2]:
face=face_analysis()

yolov3-tiny_face.weights:: status : file already exists
yolov3_tiny_face.cfg:: status : file already exists
face_detection.weights:: status : file already exists
face_detection.cfg:: status : file already exists


Viene dichiarato il metodo ```acquisizione_video``` utilizzato appunto per registrare dei video da cui estrarre i volti per poter comporre il dataset

In [None]:
def acquisizione_video():
    src = 0 # webcam
    #src = 'rtsp://CV2023:Studente123@147.163.26.184:554/Streaming/Channels/101'
    #src = 'rtsp://CV2023:Studente123@147.163.26.182:554/Streaming/Channels/101'

    video = cv2.VideoCapture(src)

    video_FourCC = int(video.get(cv2.CAP_PROP_FOURCC))
    fps = video.get(cv2.CAP_PROP_FPS)
    H = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    W = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_size = (W, H)

    if video is None or not video.isOpened():
        print('--(!)Error opening video capture')
        exit(0) 

    key = ''
    out=None
    i=1

    while key!=ord('q'):

        ret, frame = video.read()
        cv2.imshow("frame", frame)
        key = cv2.waitKey(1)

        if key==ord('a'):
            out = cv2.VideoWriter('video/train_'+str(i)+'.mp4',  video_FourCC, fps, frame_size)
            i+=1
            if out is None or not out.isOpened():
                print('--(!)Error opening video capture')
                exit(0)
            while key!=ord('s'):
                cv2.imshow("frame", frame)
                out.write(frame)
                ret, frame = video.read()
                key= cv2.waitKey(1)
        
        if out is not None:
            out.release()

    cv2.destroyAllWindows()
    video.release()
    
acquisizione_video()

Il metodo ```detect_face```, attraverso yolo, rileva i volti da video che raffigurano 4 identità diverse, in modo da realizzare un dataset.

In [None]:
def detect_face(face, name):

    video = cv2.VideoCapture("video/"+name+".mov")

    if video is None or not video.isOpened(): 
        print('--(!)Error opening video capture')
        exit(0)

    ret, img = video.read()
    a=img.shape[0]
    b=img.shape[1]
    i=0
    j=0
    c=0
    while ret:


        
        _, box, conf = face.face_detection(frame_arr=img, model='full')
        for k in range(len(box)):
            c_x=box[k][0]+box[k][3]//2
            c_y=box[k][1]+box[k][2]//2
            l=max(box[k][2],box[k][3])
            new_x=c_x-l//2
            new_y=c_y-l//2
            face_d=img[new_y:new_y+l, new_x:new_x+l]
            if face_d.shape[0]>0 and face_d.shape[1]>0:
                cv2.imwrite("dataset/Daniele/"+name+"_"+str(i)+".png", face_d)
                i+=1
        
        
        
        #ritaglia pezzi dell'immagine ogni frame 
        crop=img[0:100, 0:100]
        cv2.imwrite("dataset/background_"+str(c)+".png", crop)
        c+=1
        crop=img[a//2:a//2+100, 0:100]
        cv2.imwrite("dataset/background_"+str(c)+".png", crop)
        c+=1
        crop=img[a//2:a//2+100, b-101:b-1]
        cv2.imwrite("dataset/background_"+str(c)+".png", crop)
        c+=1
            
        
        ret, img = video.read()

    video.release()
    return

for name in ["daniele"]:#, "antonio", "alessandro", "rosario"]:
    detect_face(face, name)

Il metodo ```preprocessing``` processa le immagini dei volti convertendole in scala di grigi, rimpicciolendole e normalizzandole.

In [3]:
def preprocessing(img):
    img=cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img=cv2.resize(img, (28,28))
    img=img.astype('float32')/255. - 0.5
    return img

Utilizzando i metodi precedenti, viene inizializzato il dataset: in un array sono contenute le immagini processate dei volti e in un altro le label corrispondenti.

In [4]:
## PREPARAZIONE DATASET ##
x_dataset=[]
y_dataset=[]

for dirpath, dirnames, filenames in  os.walk("dataset/"):
    for filename in filenames:

        img=cv2.imread(os.path.join(dirpath, filename))
        img=preprocessing(img)

        x_dataset.append(img)
        y_dataset.append(dirpath[8:])

x_dataset = np.asarray(x_dataset)
y_dataset = np.asarray(y_dataset)

Dopo aver creato il dataset si progetta la rete neurale responsabile che verrà addestrata attraverso quest'ultimo per apprendere in modo automatico il calcolo della distanza euclidea tra due immagini.

Si è provato a definire una classe ```Euclidean``` che si occupasse di calcolare la suddetta distanza euclidea, definendo una soglia secondo cui due immagini si possono definire uguali o diverse. E' stata scartata per dei problemi sorti durante l'addestramento che riguardavano il calcolo del gradiente.

In [None]:
class Euclidean(tf.keras.layers.Layer):
  def __init__(self):
    super(Euclidean, self).__init__()
    self.t = K.variable(0.)
    self.f = K.variable(1.)

  def call(self, d):
    distanza=K.sum(K.square(d[0] - d[1]), axis=-1)
    if distanza<=0.2:
      return self.t
    return self.f

La rete neurale che si è progettata è la rete siamese, costituita da due rami alla quale vengono dati in input due immagini. Questi rami condividono la stessa rete convoluzionale, alternata a layer di Max Pooling, e di conseguenza condividono gli stessi pesi. Dopo aver processato le due immagini, viene calcolata la distanza euclidea e viene restituita come output. Quindi la rete siamese si occupa di stabilire se le due immagini in input rappresentano lo stesso volto o meno.

Si definisce anche il metodo ```euclidean_distance``` che calcola la distanza euclidea sfruttando il backend di keras per gestire i tensori.

In [5]:
# PROGETTO RETE SIAMESE #

def euclidean_distance(d):
    return K.sum(K.square(d[0] - d[1]), axis=-1)


def get_branch(shape_input):
    input_img = keras.Input(shape=shape_input)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    #x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    y = layers.Flatten()(x)
    model = keras.Model(input_img, y)
    model.summary()
    return model

def get_siamese(shape_input=(28, 28, 1)):
    branch = get_branch(shape_input)
    input_img_1 = keras.Input(shape=shape_input)
    input_img_2 = keras.Input(shape=shape_input)
    des_1 = branch(input_img_1)
    des_2 = branch(input_img_2)
    output= layers.Lambda(euclidean_distance, name="euclidean_layer")([des_1,des_2])
    model = keras.Model([input_img_1, input_img_2] , output)
    model.summary()
    return model

model=get_siamese()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 28, 28, 1)]       0         
                                                                 
 conv2d (Conv2D)             (None, 28, 28, 32)        320       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 14, 14, 32)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 14, 14, 16)        4624      
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 7, 7, 16)         0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 7, 7, 8)           1160  

La rete complessiva è costituita da circa 6000 parametri.

Si è provato a definire la funzione di loss ```contrastive_loss```, spesso usata per le reti siamesi per l'addetsramento, ma sono stati riscontrati scarsi risultati,  in quanto le distanze calcolate erano tutte prossime allo 0, quindi la rete non era in grado di distinguere due immagini uguali o diverse.

Si è quindi scelta la funzione di loss ```binary_crossentropy```, anch'essa utilizzata spesso per le reti siamesi, più performante in quanto il saper distinguere due immagini può essere visto come un problema binario. La metrica scelta per valutare la rete è MSE, che misura appunto l'errore quadratico medio.

La rete viene quindi compilata attraverso il metodo ```compile``` del modello.

In [6]:
## COMPILING ##

def contrastive_loss(y_true, y_pred, margin = 0.7):
    return y_true * y_pred + (1.0 - y_true) * tf.math.maximum(margin - y_pred, 0.0)


model.compile(optimizer='adam', loss='binary_crossentropy', metrics=[tf.keras.metrics.MeanSquaredError()]) 

Poiché il dataset è di grandi dimensioni, viene definito il data generetor ```data_gen```, che crea un sottinsieme del dataset e restituisce due elementi: tutte le possibili coppie di immagini, e la label 1 o 0, a seconda che la coppia di immagini rappresenti la stessa persona o meno. 

In [7]:
## DATA GENERATOR ##

def data_gen(x_train, y_train, batch_size):
    while True:
        indici = np.random.randint(0, x_train.shape[0], batch_size)
        sub_set = x_train[indici]
        sub_set_y = y_train[indici]
        pairs = [[a, b] for idx, a in enumerate(sub_set) for b in sub_set[idx + 1:]]
        label = np.asarray([ 0 if a==b else 1 for idx, a in enumerate(sub_set_y) for b in sub_set_y[idx + 1:]])
        x1 = np.asarray([ a[0] for a in pairs])
        x2 = np.asarray([ a[1] for a in pairs])
        yield (x1, x2), label.astype('float32')

Si procede con l'addestramento. Viene diviso il dataset in due parti: il training set e il validation set, il 33% del dataset totale.

Viene utilizzato il data generator per costruire l'input da mandare alla rete, e viene implementato l'early stopping che monitora la validation loss con pazienza impostata a 3.

In [8]:
## ADDESTRAMENTO ##

x_train, x_val, y_train, y_val = train_test_split(x_dataset, y_dataset, test_size=0.33)

data_gen_batch=data_gen(x_train, y_train, 32)
data_gen_batch_val=data_gen(x_val, y_val, 32)

callback=tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

model.fit(x=data_gen_batch,
          epochs=10,
          steps_per_epoch=100,
          validation_steps=10,
          batch_size=32,
          shuffle=True,
          validation_data=data_gen_batch_val,
          callbacks=[callback])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10


<keras.callbacks.History at 0x24de336a100>

Si definisce il metodo ```predict``` che utilizza l'output della rete siamese, ovvero le distanze euclidee dell'immagine di test rispetto ad un sottinsieme di campioni del dataset. Le coppie possibili di immagini vengono generate allo stesso modo del data generetor, viene utilizzato il metodo ```predict``` del modello e viene restituita la label dell'immagine con distanza minima rispetto a quella di test.

Corrisponde quindi all'implementazione di un Nearest Neighbor.

In [9]:
def predict(model, x_test, x_dataset, y_dataset):

    pairs=[[x_test, b] for b in x_dataset]

    x1 = np.asarray([ a[0] for a in pairs])
    x2 = np.asarray([ a[1] for a in pairs])
    
    pred=(model.predict((x1,x2)))

    
    return y_dataset[np.argmin(pred)]

Viene inizializzato un sottinsieme del dataset, formato da 50 campioni scelti casualmente, che verrà utilizzato per la fase di testing, ovvero il riconoscimento dei volti nei video.

In [10]:
indici = np.random.randint(0, x_dataset.shape[0], 50)
subset_x = x_dataset[indici]
subset_y = y_dataset[indici]

Per misurare l'accuratezza, e quindi la bontà della rete nel saper riconoscere le immagini con le stesse identità e a distanza minima, viene estrapolato dal dataset un sottinsieme che corrisponde al 20% del totale, che viene sottoposto al testing per quantificare la percentuale di risposte corrette, ovvero di etichette assegnate correttamente, calcolata dal rapporto tra le risposte corrette e la dimensione del test set.

In [11]:
## TESTING ##

count=0
_, x_test, _, y_test = train_test_split(x_dataset, y_dataset, test_size=0.2)

for i in range(x_test.shape[0]):
    pred=predict(model, x_test[i], subset_x, subset_y)
    print("Predetto:" ,pred)
    print("Reale: ",y_test[i])
    if pred==y_test[i]:
        count+=1

print("Accuratezza: ",count/i)


Predetto: Daniele
Reale:  Daniele
Predetto: Sconosciuto
Reale:  Sconosciuto
Predetto: Daniele
Reale:  Daniele
Predetto: Rosario
Reale:  Rosario
Predetto: Daniele
Reale:  Daniele
Predetto: Rosario
Reale:  Daniele
Predetto: Antonio_S
Reale:  Antonio_S
Predetto: Alessandro
Reale:  Alessandro
Predetto: Sconosciuto
Reale:  Sconosciuto
Predetto: Antonio_S
Reale:  Antonio_S
Predetto: Rosario
Reale:  Rosario
Predetto: Daniele
Reale:  Daniele
Predetto: Rosario
Reale:  Antonio_S
Predetto: Rosario
Reale:  Rosario
Predetto: Sconosciuto
Reale:  Sconosciuto
Predetto: Sconosciuto
Reale:  Sconosciuto
Predetto: Daniele
Reale:  Antonio_S
Predetto: Daniele
Reale:  Daniele
Predetto: Rosario
Reale:  Rosario
Predetto: Rosario
Reale:  Daniele
Predetto: Daniele
Reale:  Rosario
Predetto: Daniele
Reale:  Daniele
Predetto: Antonio_S
Reale:  Antonio_S
Predetto: Rosario
Reale:  Rosario
Predetto: Rosario
Reale:  Rosario
Predetto: Antonio_S
Reale:  Antonio_S
Predetto: Daniele
Reale:  Daniele
Predetto: Rosario
Reale:

L'accuratezza risulta oscillare tra l'85% e l'87%.

Infine, per visualizzare qualitativamente i risultati, vengono assegnate le etichette ad un video in cui sono presenti dei volti.

Sfruttando i metodi precedenti e yolo, vengono rilevati i volti, preprocessate le immagini, dati come input alla rete e utilizzata la label restituita per costruire una bounding box sul video ed etichettarla.

In [12]:
## CREAZIONE VIDEO ##

video=cv2.VideoCapture("test.mp4")

video_FourCC = int(video.get(cv2.CAP_PROP_FOURCC))
fps = video.get(cv2.CAP_PROP_FPS)
H = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
W = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_size = (W, H)

out = cv2.VideoWriter('test_l.mp4',  video_FourCC, fps, frame_size)

if video is None or not video.isOpened() or out is None or not out.isOpened() :
    print('--(!)Error opening video capture')
    exit(0)


ret, frame=video.read()

while ret:

    _, box, conf = face.face_detection(frame_arr=frame, model='full')

    for k in range(len(box)):

        x, y, w, h=box[k][0], box[k][1], box[k][2], box[k][3]

        c_x=x+h//2
        c_y=y+w//2
        l=max(w,h)
        new_x=c_x-l//2
        new_y=c_y-l//2
        
        face_d=frame[new_y:new_y+l, new_x:new_x+l]

        if face_d.shape[0]>0 and face_d.shape[1]>0:
            face_d=preprocessing(face_d)

            pred=predict(model, face_d, subset_x, subset_y)

            frame=cv2.rectangle(frame, (x,y), (x+w,y+h), (74,51,29), 4)
            frame=cv2.putText(frame, pred, (x,y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,216,255), 2)
    
    out.write(frame)
    ret, frame=video.read()

video.release()
out.release()

