# Metric Learning
credits to [Giuseppe Lisanti](https://www.unibo.it/sitoweb/giuseppe.lisanti/en), Samuele Salti and Riccardo Spezialetti


## Import Dependencies

In [3]:
from __future__ import print_function
from __future__ import division

import copy
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.utils as utils
import matplotlib.pyplot as plt
import numpy as np
import PIL
import random
import requests
import shutil
import os
import time

from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from timeit import default_timer as timer
from typing import Callable, Dict, List, Tuple, Union
from torchvision import datasets, models, transforms
from torchsummary import summary
from torch.optim import lr_scheduler
from torch.nn import functional as F
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm

## Runtime Settings



In [4]:
device = "cpu"
if torch.cuda.is_available:
  print('All good, a Gpu is available')
  device = torch.device("cuda:0")  
else:
  print('Please set GPU via Edit -> Notebook Settings.')
!nvidia-smi

All good, a Gpu is available
/bin/bash: line 1: nvidia-smi: command not found


## Reproducibility & Deterministic mode


In [None]:
def fix_random(seed: int) -> None:
    """Fix all the possible sources of randomness.

    Args:
        seed: the seed to use. 
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

fix_random(seed=7)

# Face Identification/Recognition


Given images of faces, we can either **recognize the identity** of the people involved or **verify** they depict the same person. We can therefore efine two realted but different problems:

**1 - Face Verification**: A one-to-one mapping where given two images we have to confirm that they depict the same person.

**2 - Face Identification/Recognition**: A one-to-many mapping where given an image and a database of images of known faces (***gallery***), we have to identify the subject depicted in the picture.

## Face Detection


To solve face detection, we will use a *face detector* based on neural networks to crop the face of the subject from the photo.

Install the ```facenet-pytorch``` [package](https://github.com/timesler/facenet-pytorch).



In [None]:
!pip install facenet-pytorch

In [None]:
def generate_colors(num_colors: int) -> np.array:
    """Generates an array with RGB triplets representing colors.

    Args:
        num_colors: the number of colors to generate.

    Returns: 
        the generated colors.
    """
    np.random.seed(0)
    colors = np.random.uniform(0, 255, size=(num_colors, 3))
    time_in_ms = 1000 * time.time()
    np.random.seed(int(time_in_ms) % 2 ** 32)

    return colors

def draw_detection_results(image: Image, 
                           boxes: np.ndarray,
                           landmarks: np.ndarray, 
                           colors: np.ndarray) -> Image:
    """Draws the detected bounding boxes and landmarks on image.

    Args:
        image: the input image.
        boxes: the detected bounding boxes.
        landmarks: the detected landmarks.
        colors: the color to use to draw the bounding boxes.

    Returns:
        The image with the annotations.        
    """
    image_annotated = copy.deepcopy(image)
    painter = ImageDraw.Draw(image_annotated)

    for i, (box, point) in enumerate(zip(boxes, landmarks)):
        color = tuple(colors[i].astype(np.int32))
        x_min, y_min, x_max, y_max = box
        painter.rectangle([x_min, y_min, x_max, y_max], width=5, outline=color)
        for p in point:
            x, y = p
            painter.rectangle([x - 10, y - 10, x + 10, y + 10], width=4, 
                              outline=color, 
                              fill=color)

    return image_annotated

For face detection, we will use the *Multi-Task Cascaded Convolutional Neural Network (**MTCNN**)* presentend in the paper [Joint Face Detection and Alignment Using Multitask Cascaded Convolutional Networks](https://arxiv.org/pdf/1604.02878.pdf).

In [None]:
from facenet_pytorch import MTCNN

# Create a detector with default parameters
size_image = 160
detector = MTCNN(image_size=size_image)

Let's try the detector and draw the results on the image.

In [None]:
image_url = "https://www.basketinside.com/wp-content/uploads/2016/07/magic-jordan-bird.jpg"
response = requests.get(image_url, stream=True)

image = Image.open(response.raw).convert("RGB")
plt.imshow(image)
plt.axis("off")
plt.show()

The `detect` method returns the coordinates of bounding boxes where the algorithm has detected the faces, a confidence score for each box and the coordinates of 5 face landmarks (or keypoints).

In [None]:
boxes, scores, landmarks = detector.detect(image, landmarks=True)

image_with_annotations = draw_detection_results(image, boxes, landmarks, 
                                                generate_colors(len(boxes)))

plt.imshow(image_with_annotations)
plt.axis("off")
plt.show()

In [None]:
def crop_faces(image: Image, 
               boxes: np.ndarray,
               margin: int, 
               size_face: int) -> List[PIL.Image.Image]:
    """Crops the pixel in the image corresponding to the bounding boxes.

    Args:
        image: the input image.
        boxes: the bounding boxes
        margin: the margin to add to the bounding box, in terms of pixels in 
        the final image.
        size_face: the output image size in pixels, the image will be square.

    Returns:
        The extracted faces from the image.
    """
    faces = []
    width, height = image.size
    
    for box in boxes:
        x_min, y_min, x_max, y_max = box
        
        x_margin = margin * (x_max - x_min) / (size_face - margin)
        x_margin *= 0.5        
        y_margin = margin * (y_max - y_min) / (size_face - margin)
        y_margin *= 0.5

        x_min = int(max(x_min - x_margin, 0))
        y_min = int(max(y_min - y_margin, 0))
        x_max = int(min(x_max + x_margin, width))
        y_max = int(min(y_max + y_margin, height))

        face = np.asarray(image)[y_min:y_max, x_min:x_max]
        
        face_image = Image.fromarray(face).resize((size_face, size_face), 
                                                  Image.BILINEAR)
        faces.append(face_image)
    return faces

In [None]:
image_url = "https://www.basketinside.com/wp-content/uploads/2016/07/magic-jordan-bird.jpg"
response = requests.get(image_url, stream=True)

image = Image.open(response.raw).convert("RGB")
plt.imshow(image)
plt.axis("off")
plt.show()

In [None]:
margin = 30

boxes, scores = detector.detect(image, landmarks=False)
faces = crop_faces(image, boxes, margin, 160)

# Plotting 
figure = plt.figure(figsize=(15, 5))

for idx, face in enumerate(faces):
    figure.add_subplot(1, len(faces), idx + 1)
    plt.title(f'Score: {scores[idx]:.3f}')
    plt.imshow(face)
    plt.axis("off")
    

Let's create a face identification pipeline using the LFW dataset to build our gallery.

## Get the Labeled Faces in the Wild  [dataset](http://vis-www.cs.umass.edu/lfw/)

In [None]:
!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz

In [None]:
!tar -zxf lfw.tgz

In face verification/identification, there should be no overlapping identities between the training set and testing set. This [repository](https://github.com/happynear/FaceDatasets) provides some overlapping lists between several training and testing datasets. In our case we need the overlapping list [CASIA-LFW](https://github.com/happynear/FaceDatasets/blob/master/CASIA/webface_lfw_overlap_detail.txt).

In [None]:
!ls lfw

In [None]:
overlap_casia_lfw = ['William_Macy', 'Vanessa_Williams', 'Mary_Blige', 
                    'Laura_Elena_Harring', 'David_Kelley', 'Norm_Macdonald', 
                    'Hillary_Clinton', 'Wanda_de_la_Jesus', 'Michael_Jordan', 
                    'Nicole_Parker', 'Zhang_Ziyi', 'Prince_William', 'Liu_Ye', 
                    'Randy_Jackson', 'Jesse_James', 'John_Mabry', 'Richard_Cohen']

In [None]:
def remove_sub_folders(path_base: str, folders: str) -> int:
    """Removes all the sub folders inside the folder path_base.

    Args:
        path_base: the base path.
        folders: the folder to delete.

    Returns:
        the number of deleted folders.

    Raises:
        OSError: if is impossible to delete the folder.
    """
    num_deleted_folders = 0
    for folder in folders:
        path_folder = os.path.join(path_base, folder)
        try:
            shutil.rmtree(path_folder)
            num_deleted_folders += 1
        except OSError as e:
            print(f'Impossible to delete folder {path_folder}')
            raise
    return num_deleted_folders

In [None]:
remove_sub_folders('lfw', overlap_casia_lfw)

In [None]:
path_ds_lfw = "lfw"
dataset_lfw = torchvision.datasets.ImageFolder(path_ds_lfw)

identities = dataset_lfw.classes
num_identities = len(identities)

print(f'Images: {len(dataset_lfw)} - People: {num_identities}')

In [None]:
index_sample = 2026 

image, label = dataset_lfw[index_sample]

plt.imshow(image)
plt.title(identities[label])
plt.axis("off")
plt.show()

## Build the Model Gallery

We need to crop all the images before creating our embeddings gallery. Instead of cropping each single face from the image using the method shown above, we can use the [forward](https://github.com/timesler/facenet-pytorch/blob/dd0b0e4b5b124b599f75b87e570910e5d80c8848/models/mtcnn.py#L226) method of the MTCNN detector. However, it requires `PIL.Image`s as input, rather than `torch.tensors` as we have used so far. To create a batch of images, we can use
the [collate_pil](https://github.com/timesler/facenet-pytorch/blob/master/models/utils/training.py#L139) provided by the package.


In [None]:
from facenet_pytorch import training
num_workers = 2
size_batch = 16

loader_lfw = torch.utils.data.DataLoader(dataset_lfw, 
                                         batch_size=size_batch,
                                         pin_memory=True,
                                         num_workers=num_workers,
                                         collate_fn=training.collate_pil)

The attribute `samples` of a PyTorch `torchvision.datasets.ImageFolder` stores the path to the image and the class labels and they are the two information collated to create a mini-batch by the dataloader. However, we do not need the label, but we need the path of the image, to save its cropped vesion with the same path in a new folder. Hence, we overwrite the label with the path to the image in the lfw dataset.

In [None]:
print(dataset_lfw.samples)

In [None]:
dataset_lfw.samples = [(path, path) for path, _ in dataset_lfw.samples]
print(dataset_lfw.samples)

Now we can detect faces on the lfw dataset and save the cropped dataset.

In [None]:
detector = MTCNN(image_size=size_image,
                 margin=14,
                 device=device,
                 selection_method='center_weighted_size')

In [None]:
path_ds_lfw_cropped = f'{path_ds_lfw }_cropped'

for i, (x, paths_batch) in tqdm(enumerate(loader_lfw), total=len(loader_lfw)):
    path_crop = [p.replace(path_ds_lfw, path_ds_lfw_cropped) for p in paths_batch]
    detector(x, save_path=path_crop)
        

In [None]:
dataset_lfw_cropped = torchvision.datasets.ImageFolder(path_ds_lfw_cropped)

In [None]:
index_sample = 11413

image, label = dataset_lfw_cropped[index_sample]

plt.imshow(image)
plt.title(identities[label])
plt.axis("off")
plt.show()

Now we can create the dataset and the data loader for the cropped images. As regard image normalization, we can use the `fixed_image_standardization` [method](https://github.com/timesler/facenet-pytorch/blob/dd0b0e4b5b124b599f75b87e570910e5d80c8848/models/mtcnn.py#L508), since the pre-trained models have been trained with the images normalized according to this [procedure](https://github.com/davidsandberg/facenet). Using the [`torch.utils.data.SequentialSampler`](https://pytorch.org/docs/stable/data.html) we will read the elements sequentially, always in the same order.

In [None]:
from facenet_pytorch import fixed_image_standardization

transform_lfw = transforms.Compose([np.float32,
                                    transforms.ToTensor(),
                                    fixed_image_standardization])

dataset_lfw_cropped = torchvision.datasets.ImageFolder(path_ds_lfw_cropped, 
                                                       transform=transform_lfw)

sampler = torch.utils.data.SequentialSampler(dataset_lfw_cropped)
loader_lfw_cropped = torch.utils.data.DataLoader(dataset_lfw_cropped,
                                                 num_workers=num_workers,
                                                 batch_size=size_batch,
                                                 sampler=sampler)

Take the embedder pretrained on the Casia Web Face dataset.

In [None]:
from facenet_pytorch import InceptionResnetV1

embedder = InceptionResnetV1(pretrained='casia-webface').eval()
embedder.to(device)
summary(embedder, input_size=(3, 160, 160))

Now everything is ready to create the gallery.

In [None]:
gallery = []

with torch.no_grad():
    for image, labels in tqdm(loader_lfw_cropped):
        image = image.to(device)
        batch_embeddings = embedder(image)
        gallery.extend(batch_embeddings.to('cpu').numpy())     

gallery = np.asarray(gallery)
print(f'The gallery has: {len(gallery)} samples.')

Once we have created the gallery, in order to find the identity of a new subject we can classify its embedding using a k-Nearest Neighbor search. The most basic approach is the *brute force* which involves the computation of distances between all the embeddings in the gallery against the new one. Contrary, we will use a `K-D tree`, i.e. a tree-based data structure that helps overcome the computational cost of the brute force approach, whose implementation is available in `scikit-learn`.

In [None]:
from sklearn.neighbors import NearestNeighbors

def nearest_neighbor(sources: np.ndarray, 
                     targets: np.ndarray,
                     num_neighbors: int,
                     algorithm: str = 'kd_tree') -> Tuple[np.ndarray, np.ndarray]:
    """Computes nearest neighbor search.

    Estimates for each sample in source the nearest neighbor in target using 
    the specified algorithm.
    
    Args:
        sources: the source samples.
        targets: the target samples.
        num_neighbors: the number of neighbors to find.
        algorithm: the algorithm to use kd_tree or brute force.

    Returns:
        The euclidean distance from each sample in source to the nearest neighbor in target.
        The indices of the nearest neighbor points on target for each sample in source.
    """    
    kd_tree = NearestNeighbors(n_neighbors=num_neighbors, 
                               algorithm=algorithm, 
                               metric='euclidean')
    kd_tree.fit(targets)
    distances, indices = kd_tree.kneighbors(sources)

    return distances, indices

Let's do a sanity check.

In [None]:
num_neighbors = 3
idx_sanity = 7
nn_distances, nn_indices = nearest_neighbor(gallery[idx_sanity].reshape(1, -1),
                                            gallery,
                                            num_neighbors)

In [None]:
def show_neighbors(query_image: PIL.Image.Image,                   
                   indices: np.ndarray,
                   distances: np.ndarray,
                   dataset: torchvision.datasets,
                   threshold: float,
                   class_query: str = None,
                   transform=None) -> None:
    """Shows the query image together with the found nearest neighbors.

    Args:
        query_image: the input image.        
        indices: the indices of nearest neighbors.
        distances: the distances of nearest neighbors.
        dataset: the dataset on which the gallery was built.
        threshold: the matching threshold
        class_query: the name of the class of the query image.
        transform: the transformation to apply to the image.        

    Returns:
    """
    distances = distances.squeeze()
    indices = indices.squeeze()
    num_images = len(indices) + 1

    figure = plt.figure(figsize=(15, 5))
    figure.add_subplot(1, num_images, 1)
    plt.imshow(query_image)
    class_query_title = class_query if class_query else '' 
    plt.title(f'Query \n {class_query_title} ', color='g')
    plt.axis("off")
    
    for idx, (idx_nn, dist_nn) in enumerate(zip(indices, distances)):        
        figure.add_subplot(1, num_images, idx + 2)
        image, label_matched = dataset[idx_nn]        
        
        if isinstance(label_matched, str):
            class_match = label_matched.split('/')[-1].split('.')[-2]
        else:
            class_match = dataset.classes[label_matched]

        if class_query is not None:
            color = 'g' if class_query == class_match else 'r'
        else:
            color = 'g' if dist_nn <= threshold else 'r'

        plt.title(f'{class_match} \n Distance: {dist_nn:.3f}', color=color)
        if transform:
            image = transform(image)
        plt.imshow(image)
        plt.axis("off")

In [None]:
threshold = 0.85
show_neighbors(dataset_lfw[idx_sanity][0], nn_indices, nn_distances, dataset_lfw, 
               threshold)

Check one subject that is in the database.

In [None]:
def get_image_from_url(image_url:str , mode:str ='RGB') -> PIL.Image.Image:
    """Downloads and opens the image from the url.

    Args:
        image_url: the url for the image.
        mode: a string which defines the type and depth of a pixel in the image.

    Returns:
        The image read.
    """
    response = requests.get(image_url, stream=True)    
    return Image.open(response.raw).convert(mode)

In [None]:
def detect_and_embed(image: PIL.Image.Image, 
                     detector: nn.Module, 
                     embedder: nn.Module) -> Tuple[PIL.Image.Image,
                                                   np.ndarray]:
    """Detects the face from the image and then creates the embedding for it.

    Args:
        image: the input iamge.
        detector: the face detector.
        embedder: the embedder.

    Returns:
        The image of the cropped face.
        The embedding for the face.
    """
    detector.eval()
    embedder.eval()
    
    with torch.no_grad():
        image_face = detector(image)
        image_face_batch = image_face.unsqueeze(0)                
        embedding = embedder(image_face_batch.to(device))                
    
    embedding = embedding.to('cpu').numpy()    
    image_face = image_face.to('cpu')
    image_face = (image_face * (128. / 255.)) + (127. / 255.)
    image_face_pil = transforms.ToPILImage()(image_face)

    return image_face_pil, embedding

In [None]:
image_url = "https://www.automotorinews.it/wp-content/uploads/2022/04/Valentino-Rossi-1.jpg"
image = get_image_from_url(image_url)

image_face, embedding = detect_and_embed(image, detector, embedder)

nn_distances, nn_indices = nearest_neighbor(embedding,
                                            gallery,
                                            num_neighbors)
show_neighbors(image_face, 
               nn_indices, 
               nn_distances, 
               dataset_lfw,
               threshold)