# Face detection and recognition inference pipeline

The following example illustrates how to use the `facenet_pytorch` Python package to perform face detection and recognition on an image dataset using an Inception Resnet V1 pretrained on the VGGFace2 dataset.

The following PyTorch features are included:
* Datasets
* Dataloaders
* GPU/CPU processing

In [None]:
from facenet_pytorch import MTCNN, InceptionResnetV1
import torch
from torch.utils.data import DataLoader
from torchvision import datasets
import numpy as np
import pandas as pd
import os

# Worker count: 0 on Windows (os.name == 'nt'), 4 on every other platform.
# NOTE(review): presumably meant for a DataLoader; it is not used in the visible cells.
if os.name == 'nt':
    workers = 0
else:
    workers = 4

#### Determine if an NVIDIA GPU is available

In [None]:
# Use the first CUDA device when PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print('Running on device: {}'.format(device))

#### Define MTCNN module

Default params shown for illustration, but not needed. Note that, since MTCNN is a collection of neural nets and other code, the device must be passed in the following way to enable copying of objects when needed internally.

See `help(MTCNN)` for more details.

In [None]:
# Face detector. The device is handed to the constructor rather than applied
# with .to(), as the note above explains.
mtcnn = MTCNN(
    image_size=160,
    margin=0,
    min_face_size=20,
    thresholds=[0.6, 0.7, 0.7],
    factor=0.709,
    post_process=True,
    keep_all=True,  # later cells iterate over the faces returned per image
    device=device,
)

#### Define Inception Resnet V1 module

Set classify=True for pretrained classifier. For this example, we will use the model to output embeddings/CNN features. Note that for inference, it is important to set the model to `eval` mode.

See `help(InceptionResnetV1)` for more details.

In [None]:
# Embedding network with the 'vggface2' pretrained weights, switched to eval
# mode for inference and moved to the selected device.
resnet = InceptionResnetV1(pretrained='vggface2')
resnet.eval()
resnet = resnet.to(device)

#### Define a dataset and data loader

We add the `idx_to_class` attribute to the dataset to enable easy recoding of label indices to identity names later on.

In [None]:
from PIL import Image, ImageDraw

# Accumulators filled by the loop below:
#   aligned_faces   - one entry per face tensor returned by `mtcnn(im)`
#   picture_tracked - a copy of each image that has boxes, with the boxes outlined in red
#   detected_faces  - one crop per bounding box, cut from the un-annotated image
aligned_faces = []
picture_tracked = []
detected_faces = []
image_paths = ["../drama/drama_01", "../drama/drama_02"]

# NOTE(review): os.listdir() returns entries in arbitrary order, while the
# hand-written `names` list labels the faces by position - confirm the order
# is stable on the machine where the labels were written.
for p in image_paths:
    for img in os.listdir(p):
        img_path = os.path.join(p, img)
        if not os.path.isfile(img_path):
            # Sub-directories (e.g. .ipynb_checkpoints) cannot be opened as images.
            continue

        # convert() returns a new image, so the file handle can be closed right away.
        with Image.open(img_path) as src:
            im = src.convert('RGB')

        boxes, _ = mtcnn.detect(im)
        aligned, prob = mtcnn(im, return_prob=True)
        if aligned is not None:
            aligned_faces.extend(aligned)
        if boxes is None:
            continue

        frame_draw = im.copy()
        draw = ImageDraw.Draw(frame_draw)
        for box in boxes:
            corners = box.tolist()
            draw.rectangle(corners, outline=(255, 0, 0), width=6)
            # Crop from the clean image so the red outlines drawn for this and
            # earlier boxes do not end up inside the face crops.
            face = im.crop(tuple(corners))
            detected_faces.append(face)
        picture_tracked.append(frame_draw)

In [None]:
from IPython import display

# Show the first annotated picture, then swap each remaining picture into the
# same output area. Interrupting the kernel stops the slideshow quietly.
d = display.display(picture_tracked[0], display_id=True)
try:
    for tracked in picture_tracked[1:]:
        d.update(tracked)
except KeyboardInterrupt:
    pass

In [None]:
# Render every cropped face as its own notebook output.
for crop in detected_faces:
    display.display(crop, display_id=True)

In [None]:
# Labels for the face embeddings, matched by position (they are used as the
# distance-matrix axes and zipped with the embeddings when indexing).
# Romanised from the original Japanese labels; "co_star" stands for a
# supporting-cast member.
# NOTE(review): "co_star4" appears twice - presumably the same person; confirm.
names = [
    "Aragaki_Yui", "co_star1", "Hoshino_Gen", "Aragaki_Yui2",
    "co_star2", "co_star3", "Hoshino_Gen2", "Hoshino_Gen3",
    "Aragaki_Yui3", "Aragaki_Yui4", "co_star2-2", "Hoshino_Gen4",
    "Aragaki_Yui5", "Hoshino_Gen5", "Aragaki_Yui6", "Yuriko_Yoshitaka",
    "co_star4", "Yuriko_Yoshitaka2", "co_star4", "Yuki_Uchida",
    "Yuriko_Yoshitaka3", "Yuki_Uchida2", "Yuriko_Yoshitaka4", "Mukai_Osamu",
    "Yuriko_Yoshitaka5", "co_star5", "co_star6", "Yuki_Uchida3",
]

#### Calculate image embeddings

MTCNN will return images of faces all the same size, enabling easy batch processing with the Resnet recognition module. Here, since we only have a few images, we build a single batch and perform inference on it. 

For real datasets, code should be modified to control batch sizes being passed to the Resnet, particularly if being processed on a GPU. For repeated testing, it is best to separate face detection (using MTCNN) from embedding or classification (using InceptionResnetV1), as calculation of cropped faces or bounding boxes can then be performed a single time and detected faces saved for future use.

In [None]:
# Stack the per-face tensors collected earlier into one batch on the inference device.
aligned = torch.stack(tuple(aligned_faces)).to(device)

# Inference only: no_grad() keeps autograd from recording the forward pass,
# so no graph is built and the result needs no detach().
with torch.no_grad():
    embeddings = resnet(aligned).cpu()
embeddings

In [None]:
# Display the dimensions of the embedding batch.
embeddings.size()

#### Print distance matrix for classes

In [None]:
# Pairwise L2 distance between every two embeddings, built one row at a time.
dists = []
for anchor in embeddings:
    row = [(anchor - other).norm().item() for other in embeddings]
    dists.append(row)

# Label both axes with the face names and display the table.
df = pd.DataFrame(data=dists, index=names, columns=names)
df

In [None]:
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure

# Heat-map of the distance table. Ticks sit at the centre of each cell
# (offset 0.5) and carry the row/column labels; x labels are rotated 60 degrees.
row_centres = np.arange(0.5, len(df.index), 1)
col_centres = np.arange(0.5, len(df.columns), 1)

plt.figure(figsize=(10, 10), dpi=80)
plt.yticks(row_centres, df.index)
plt.xticks(col_centres, df.columns, rotation=60)
plt.pcolor(df)

In [None]:
!pip install requests 
!pip install requests-aws4auth
!pip install Elasticsearch==7.12.1
!pip install urllib3

In [None]:
import getpass
import os

from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3

host = 'search-face-recog-sd5rhxmhulra6lqh67sxcp5nxi.us-west-2.es.amazonaws.com' # For example, my-test-domain.us-east-1.es.amazonaws.com
region = 'us-west-2' # e.g. us-west-1

# NOTE(review): `region`, `service`, `credentials` and `AWS4Auth` are not used by
# the client below, which authenticates with HTTP basic auth - presumably left
# over from a SigV4-signed setup; confirm before removing.
service = 'es'
credentials = boto3.Session().get_credentials()

# Basic-auth credentials are read from the environment, with an interactive
# prompt as the fallback for the password, so no secret is stored in the notebook.
es_username = os.environ.get('ES_USERNAME', 'root')
es_password = os.environ.get('ES_PASSWORD') or getpass.getpass('Elasticsearch password: ')

es = Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=(es_username, es_password),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

In [None]:
# es.indices.delete(index='faces', ignore=[400, 404])


In [None]:
# Definition of the "faces" index: k-NN enabled, with a single 512-dimensional
# knn_vector field named "face_vector".
# NOTE(review): 512 presumably matches the embedding width produced by the
# resnet - confirm against `embeddings.shape`.
face_vector_mapping = {"type": "knn_vector", "dimension": 512}
knn_index = {
    "settings": {"index.knn": True},
    "mappings": {"properties": {"face_vector": face_vector_mapping}},
}

# ignore=400 asks the client not to raise on an HTTP 400 response
# (presumably the "index already exists" case when the cell is re-run).
es.indices.create(index="faces", body=knn_index, ignore=400)

In [None]:
def es_import(vector, celebid, id):
    """Write one face document to the ``faces`` index.

    Args:
        vector: Value stored in the document's ``face_vector`` field.
        celebid: Value stored in the document's ``celebid`` field.
        id: Document id passed to ``es.index``.
    """
    document = {"face_vector": vector, "celebid": celebid}
    es.index(index='faces', id=id, body=document)
        
# Example: es_import([0.0] * 512, "q1", 0)  # vector, celebid, document id

In [None]:
# Index each (label, embedding) pair; the running position is the document id.
labelled_vectors = zip(names, embeddings)
for doc_id, (label, face_vec) in enumerate(labelled_vectors):
    es_import(face_vec.tolist(), label, doc_id)
    

In [None]:
def post(vector, k=5, min_score=0.3):
    """Run a k-NN search for ``vector`` against the ``faces`` index.

    Args:
        vector: Query embedding (callers pass ``tensor.tolist()``).
        k: Number of neighbours requested; also used as the hit limit (``size``).
        min_score: Minimum score a hit must reach to be returned.

    Returns:
        The response returned by ``es.search``.
    """
    query = {
        "size": k,
        # Leave the stored vector out of each hit. "excludes" is the current
        # name of this source-filtering key; "exclude" is a deprecated alias.
        "_source": {"excludes": ["face_vector"]},
        "min_score": min_score,
        "query": {
            "knn": {
                "face_vector": {
                    "vector": vector,
                    "k": k,
                },
            },
        },
    }
    return es.search(index="faces", body=query)



In [None]:
# Detect faces in the query images, embed them and look each one up in the index.
# NOTE(review): this rebinds the module-level `aligned` and `embeddings` that the
# earlier cells created for the gallery faces; re-run those cells before indexing again.
p = "../drama/"
test_aligned = []
for img in ['test1.jpeg', 'test2.jpg']:
    # convert() returns a new image, so the file handle can be closed right away.
    with Image.open(os.path.join(p, img)) as src:
        im = src.convert('RGB')
    boxes, _ = mtcnn.detect(im)
    aligned, prob = mtcnn(im, return_prob=True)
    if boxes is None or aligned is None:
        # Both results are None when no face is found; skip the image instead of crashing.
        print('No face detected in {}'.format(img))
        continue
    for b in boxes:
        display.display(im.crop(b), display_id=True)

    aligned = torch.stack(tuple(aligned)).to(device)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        embeddings = resnet(aligned).cpu()
    for emb in embeddings:
        result = post(emb.tolist())
        print(result)
        