## Face search demo using OpenDistro knn and machine learning.
Inspired by https://github.com/colorzhang/face-search

High level steps:

1. face detection and feature extraction
2. feature vector index in ES
3. 1:N vector search

## Detect faces and extract features

In [63]:
%matplotlib inline
import math
import os
import os.path

import face_recognition
from face_recognition.face_recognition_cli import image_files_in_folder

import ssl

from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context

import matplotlib.pyplot as plt 
import matplotlib.image as mpimg
from matplotlib.pyplot import imshow

from PIL import Image
import PIL.Image
from IPython import display
import cv2

from io import BytesIO
import io

import base64
from base64 import b64decode

import numpy as np

import re

from tqdm import tqdm


In [64]:
# Build an SSL context that skips hostname and certificate verification.
# NOTE(review): this, together with the hard-coded admin/admin credentials,
# is only acceptable for a local demo cluster with a self-signed certificate.
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# `verify_certs` is intentionally not passed: when `ssl_context` is supplied
# the client ignores every other SSL-related kwarg and emits a warning, and
# the context above already disables verification.
es = Elasticsearch(hosts=[{'host': '127.0.0.1', 'port': 9200}],
                   scheme="https",
                   ssl_context=ssl_context,
                   http_auth=('admin', 'admin')
                  )

  "When using `ssl_context`, all other SSL related kwargs are ignored"


In [65]:
def convertTobase64(filename):
    """Return the Base64 encoding (as ``bytes``) of the raw contents of *filename*."""
    with open(filename, "rb") as fh:
        raw = fh.read()
    return base64.b64encode(raw)

In [66]:
def display_img(img_path):
    """Draw the image stored at *img_path* on a new 16x8 figure with the axes hidden."""
    pixels = plt.imread(img_path)

    axes = plt.subplots(figsize=(16, 8))[1]
    axes.imshow(pixels)
    axes.axis('off')

In [67]:
def display_base64img(base64_string):
    """Decode a Base64-encoded image (read as JPG) and show it with matplotlib."""
    raw_bytes = base64.b64decode(base64_string)
    # Wrap the bytes in a file-like object so matplotlib can read them.
    pixels = mpimg.imread(io.BytesIO(raw_bytes), format='JPG')
    plt.imshow(pixels, interpolation='nearest')
    plt.show()

In [68]:
def display_crop(area, img):
    """Show the rectangular region *area* of the image file *img*.

    Args:
        area: A 4-tuple unpacked as ``(top, right, bottom, left)`` pixel
            coordinates.
        img: Path (or file object) accepted by ``PIL.Image.open``.
    """
    top, right, bottom, left = area
    # Open the file in a context manager so the handle is released promptly.
    with PIL.Image.open(img) as source:
        # PIL crop boxes are ordered (left, upper, right, lower).
        cropped_img = source.crop((left, top, right, bottom))
    # Draw once; a second imshow after plt.show() would leave a stray figure
    # pending until the next draw call.
    plt.imshow(cropped_img, interpolation='nearest')
    plt.show()

In [89]:
def extract_features(train_dir):
    """Compute one face encoding per usable image below *train_dir*.

    The expected layout is ``train_dir/country_function_name/file.jpg``.
    Sub-directories whose name does not contain at least two underscores are
    skipped, as are images in which not exactly one face is detected.

    Args:
        train_dir: Directory containing one sub-directory per person.

    Returns:
        A 6-tuple of parallel lists
        ``(X, y, img_paths, countries, functions, names)`` where ``X`` holds
        the face encodings, ``y`` the sub-directory names, and the remaining
        lists the image path and the three components parsed from the
        sub-directory name.
    """
    X = []
    y = []
    img_paths = []
    countries = []
    functions = []
    names = []
    # Directory names look like country_function_name. The last group is a
    # plain `(.*)`: wrapping it in another `*` can make the group capture the
    # final, empty repetition and so yield an empty name.
    dir_pattern = re.compile(r"(.*)_(.*)_(.*)")
    # Loop through each person in the training set
    pbar = tqdm(os.listdir(train_dir))
    for class_dir in pbar:
        class_path = os.path.join(train_dir, class_dir)
        if not os.path.isdir(class_path):
            continue
        pbar.set_description(f"Processing {class_dir}")
        matches = dir_pattern.match(class_dir)
        if matches is None:
            # Name does not follow country_function_name; skip instead of
            # failing with a TypeError when indexing None.
            continue
        country, function, name = matches.groups()

        # Loop through each training image for the current person
        for img_path in image_files_in_folder(class_path):
            image = face_recognition.load_image_file(img_path)
            face_bounding_boxes = face_recognition.face_locations(image)

            # If there are no people (or too many people) in a training
            # image, skip the image.
            if len(face_bounding_boxes) != 1:
                continue

            # Add face encoding for current image to the training set
            encodings = face_recognition.face_encodings(
                image, known_face_locations=face_bounding_boxes)
            X.append(encodings[0])
            y.append(class_dir)
            img_paths.append(img_path)
            countries.append(country)
            functions.append(function)
            names.append(name)

    return X, y, img_paths, countries, functions, names

In [84]:
def es_create_index(idx_name="idx_faces"):
    """Create the k-NN enabled index *idx_name* holding 128-dimension face vectors.

    The create call is issued with ``ignore=400``; the index is then fetched
    once and the result discarded.
    """
    # Field definition for the face embedding.
    vector_field = {"type": "knn_vector", "dimension": 128}
    knn_index = {
        "settings": {"index.knn": True},
        "mappings": {"properties": {"face_img_vector": vector_field}},
    }

    es.indices.create(index=idx_name, body=knn_index, ignore=400)
    es.indices.get(index=idx_name)

In [85]:
def es_import(element, idx_name="idx_faces"):
    """Index one face record into *idx_name*.

    *element* is read positionally: vector, class, image path, country,
    function type, name. The image file is also stored Base64-encoded.
    """
    img_path = element[2]
    document = {
        "face_img_vector": element[0],
        "class": element[1],
        "img_path": img_path,
        "country": element[3],
        "function_type": element[4],
        "name": element[5],
        "base64": convertTobase64(img_path),
    }
    es.index(index=idx_name, body=document, ignore=400)

In [86]:
def es_clean(idx_name="idx_faces"):
    """Delete every document in *idx_name* and return the delete-by-query response."""
    match_everything = {"query": {"match_all": {}}}
    return es.delete_by_query(index=idx_name, body=match_everything)

In [87]:
def es_search_face(face_feature, top_k=1, idx_name="idx_faces"):
    """Run a k-NN query for *face_feature* against *idx_name*.

    Args:
        face_feature: Query vector matched against the ``face_img_vector``
            field.
        top_k: Number of neighbours requested (used for both ``size`` and
            the k-NN ``k``).
        idx_name: Name of the index to search.

    Returns:
        The search response as returned by the Elasticsearch client. The
        request is sent with ``ignore=400``, so callers should be prepared
        for a response without a ``hits`` section.
    """
    knn_query = {
        'size': top_k,
        'query': {
            'knn': {
                'face_img_vector': {
                    'vector': face_feature,
                    'k': top_k
                }
            }
        }
    }
    # Return the response: callers index into res['hits'].
    return es.search(request_timeout=30,
                     index=idx_name,
                     body=knn_query,
                     ignore=400)

In [None]:
# Extract one encoding per usable training image, then dump the parallel
# lists (one per line) followed by the length of the first encoding.
TRAIN_DIR = "train"
vectors, classes, img_paths, countries, functions, names = extract_features(TRAIN_DIR)
print(classes, img_paths, countries, functions, names, sep="\n")
print(len(vectors[0]))

Processing russia_pm_dmitriy-anatolyevich-medvedev:   1%|          | 1/168 [00:04<13:09,  4.73s/it]           

In [None]:
# Index every extracted face. Use singular loop names so that all six zipped
# columns are unpacked (five targets raised ValueError) and the source lists
# are not overwritten by the loop variables.
for vector, class_name, img_path, country, function, name in zip(
        vectors, classes, img_paths, countries, functions, names):
    es_import([vector, class_name, img_path, country, function, name])

## 1:N face search

In [None]:
# Query image to be searched for known faces.
test_img_path = 'G7.jpeg'
display_img(test_img_path)

# Locate every face in the query image.
image = face_recognition.load_image_file(test_img_path)
face_bounding_boxes = face_recognition.face_locations(image)

# Compute the encodings for the detected boxes; the search loop pairs
# face_features[i] with face_bounding_boxes[i] by index.
face_features = face_recognition.face_encodings(image, known_face_locations=face_bounding_boxes)

In [None]:
k = 1
# Minimum search score for a hit to count as a match.
# NOTE(review): despite the name this is compared against the score
# (higher is better), not a distance - confirm the intended cut-off.
DISTANCE_THRESHHOLD = 0.5
idx_name = 'idx_faces'
for idx, face_feature in enumerate(face_features):
    print(f"################ FACE #{idx} #######")
    display_crop(face_bounding_boxes[idx], test_img_path)
    res = es_search_face(face_feature, top_k=k, idx_name=idx_name)

    # The search ignores HTTP 400 and the index may be empty, so do not
    # assume that a first hit exists.
    hits = res.get('hits', {}).get('hits', [])
    if not hits:
        print("No faces found in DB.")
        continue

    best_hit = hits[0]
    print("Return top 1 with score: %s" % res['hits']['max_score'])

    if best_hit['_score'] >= DISTANCE_THRESHHOLD:
        print("Found %s" % best_hit['_source']['name'])
        display_base64img(best_hit['_source']['base64'])
    else:
        print("No faces found in DB.")