In [3]:
import sys
import random
import matplotlib.pyplot as plt
import os
from pathlib import Path
import argparse
import time 
import numpy as np

In [4]:
module_path = os.path.abspath(os.path.join('..'))
sys.path.append(module_path)
#if module_path not in sys.path:
sys.path.append(module_path+"/test_perf")

    
from src.init_runtime import init_runtime
from main import ROOT_DIR
from test_perf._model_wrappers import EfrsLocal, ModelWrapperBase, EfrsRestApi
from test_perf.dto import Dataset
from test_perf.dto import Image, Name
from test_perf.recognition_accuracy_test import get_lfw_dataset, get_test_dataset

In [5]:
from src import pyutils
from collections import namedtuple
Calculator = namedtuple('Calculator', 'graph sess')
import tensorflow as tf
from src.storage.constants import EMBEDDING_CALCULATOR_MODEL_FILENAME
from src.storage.storage import get_storage
CALCULATOR_VERSION = EMBEDDING_CALCULATOR_MODEL_FILENAME
BATCH_SIZE = 25
from src.face_recognition.dto.embedding import Embedding
import math
from typing import List

@pyutils.run_once
def _calculator() -> Calculator:
    with tf.Graph().as_default() as graph:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(get_storage().get_file(CALCULATOR_VERSION))
        tf.import_graph_def(graph_def, name='')
        return Calculator(graph=graph, sess=tf.Session(graph=graph))


def _calculate_embeddings(cropped_images):
    """
    Quick fix for a bug where it can't handle many cropped_images
    is to give one image at a time.
    """

    with tf.Graph().as_default() as graph:
        graph_def = tf.GraphDef()
        graph_def.ParseFromString(get_storage().get_file(CALCULATOR_VERSION))
        tf.import_graph_def(graph_def, name='')
        calculator = Calculator(graph=graph, sess=tf.Session(graph=graph))
    
        # Get tensors and constants
    images_placeholder = calculator.graph.get_tensor_by_name("input:0")
    embeddings = calculator.graph.get_tensor_by_name("embeddings:0")
    phase_train_placeholder = calculator.graph.get_tensor_by_name("phase_train:0")
    embedding_size = embeddings.get_shape()[1]

    # Run forward pass to calculate embeddings
    image_count = len(cropped_images)
    batches_per_epoch = int(math.ceil(1.0 * image_count / BATCH_SIZE))
    emb_array = np.zeros((image_count, embedding_size))
    for i in range(batches_per_epoch):
        start_index = i * BATCH_SIZE
        end_index = min((i + 1) * BATCH_SIZE, image_count)
        feed_dict = {images_placeholder: cropped_images, phase_train_placeholder: False}
        emb_array[start_index:end_index, :] = calculator.sess.run(embeddings, feed_dict=feed_dict)

    # Return DTO
    return [Embedding(array=emb, calculator_version=CALCULATOR_VERSION) for emb in emb_array]

def calculate_embedding(image: np.ndarray) -> Embedding:
    return _calculate_embeddings([image])[0]

def calculate_embeddings(cropped_images: List[np.ndarray]) -> List[Embedding]:
    return [calculate_embedding(image) for image in cropped_images]
    
    
    

In [6]:
import logging
import random
import string
import time
from abc import ABC, abstractmethod
from http import HTTPStatus

import requests

#from src.face_recognition.calc_embedding.calculator import calculate_embeddings
from src.face_recognition.classify_embedding.predict import predict_from_image_with_classifier
from src.face_recognition.classify_embedding.train import get_trained_classifier
from src.face_recognition.crop_faces.crop_faces import crop_one_face
from src.face_recognition.crop_faces.exceptions import NoFaceFoundError
from src.pyutils.serialization import numpy_to_jpg_file
from test_perf.dto import Image, Name


class ModelWrapperBase(ABC):
    @abstractmethod
    def add_face_example(self, img: Image, name: Name):
        pass

    @abstractmethod
    def train(self):
        pass

    @abstractmethod
    def recognize(self, img: Image) -> Name:
        pass


class EfrsLocal(ModelWrapperBase):
    def __init__(self):
        self._cropped_images = []
        self._names = []
        self._classifier = None

    def add_face_example(self, img: Image, name: Name):
        try:
            cropped_img = crop_one_face(img).img
        except NoFaceFoundError as e:
            logging.warning(f"Failed to add face example. Skipping. {str(e)}")
            return 1
        self._cropped_images.append(cropped_img)
        self._names.append(name)
        return 0

    def train(self):
        embeddings = calculate_embeddings(self._cropped_images)
        self._classifier = get_trained_classifier(embeddings, self._names)

    def recognize(self, img: Image) -> Name:
        try:
            predictions = predict_from_image_with_classifier(img=img, classifier=self._classifier, limit=1)
        except NoFaceFoundError as e:
            logging.warning(f"Face is not found in the image to be recognized. Skipping. {str(e)}")
            return ''
        return predictions[0].face_name    

In [20]:
TEST_CODE = ROOT_DIR / 'test_perf'
class mock_model(ModelWrapperBase):
    def __init__(self, dataset):
        self._dataset = dataset


    def add_face_example(self, img: Image, name: Name):
        time.sleep(random.random())
        return random.choice([0,0, 1])

    def train(self):
        time.sleep(random.random())


    def recognize(self, name):
        time.sleep(random.random())
        rand = random.choice([0, 1, 1, 1])
        logging.debug(rand)
        if rand == 1:
            return name
        elif rand == 0:
            return ""
        

In [21]:
# function that returns 1. number of detected faces for both models, 2. how much total time it took to add them,
# 3. time to train both models 
# 4. lists for the undetected faces from the ones that were attempted to be added
def calculate_detection(model_1: ModelWrapperBase, dataset: Dataset, model_2: ModelWrapperBase, dataset_size): 
    undetected_1 = 0
    list_to_remove_1 = []
    undetected_2 = 0
    list_to_remove_2 = []

    add_time_model_1 = 0
    add_time_model_2 = 0
    
    unique_detection = 0
    prev_name = None
    
    count_1 = 0 
    count_2 = 0
    
    while unique_detection <= dataset_size:
        for name, img in dataset.train:
            # for both: insure that we only train with one picture and will be testing only with one picture
            if name == prev_name:
                prev_name = name
                continue
            else: 
                unique_detection += 1
                prev_name = name


            if count_1 < dataset_size:
                # for the first model 
                # step 1: find the time of the start
                start_1 = time.time()
                # step 2: try to add face (or it was not detected --> was_not_detected ==1 else 0)
                was_not_detected = model_1.add_face_example(img, name)
                # step 3: find the end time
                end_1 = time.time()
                # step 4: find the time it took to add the picture 
                add_time_model_1 += (end_1 - start_1)
                # step 5: if was not detected add the count and add to the list to remove from dataset.test
                undetected_1 += was_not_detected
                if was_not_detected:
                    list_to_remove_1.append(name)
                count_1 += 1
                    
            if count_2 < dataset_size:
                # for the second model
                start_2 = time.time()
                was_not_detected = model_2.add_face_example(img, name)
                print (was_not_detected)
                end_2 = time.time()
                add_time_model_2 += (end_2 - start_2)
                if was_not_detected:
                    list_to_remove_2.append(name)
            
                undetected_2 += was_not_detected
                
                count_2 +=1 
        
    start_train_1 = time.time()
    model_1.train()
    end_train_1 = time.time()
    
    total_trian_time_1 = end_train_1 - start_train_1
    
    start_train_2 = time.time()
    model_2.train()
    end_train_2 = time.time()
    
    total_trian_time_2 = end_train_2 - start_train_2
    
    return undetected_1, undetected_2, list_to_remove_1, list_to_remove_2, \
add_time_model_1, add_time_model_2, total_trian_time_1, total_trian_time_2
    




In [16]:
def test_models(model_1: ModelWrapperBase, dataset: Dataset, model_2: ModelWrapperBase, dataset_size, list_to_remove_1, list_to_remove_2):  
    
    recognized_1 = 0
    recognized_2 = 0

    recognition_time_1 = 0
    recognition_time_2 = 0
    
    count_1 = 0 
    count_2 = 0 
    
    prev_name_1 = None
    prev_name_2 = None
    
    # for every element in the test dataset 
    for name, img in dataset.test:
        # if we reached the needed number of faces for both models, we can break out 
        if count_1 > dataset_size and count_2 > dataset_size:
            break
        # for the 2 models
        for model in range (2):
            # for first one 
            if model == 0:
                # check if we are already there in the count
                if count_1 <= dataset_size: 
                    # if the face was not detected by the model before, then it should not be recognized now
                    if name in list_to_remove_1:
                        prev_name_1 = name
                        continue
                    # if we already attempted to recognize this face in another picture, move on (inforces only one test)
                    elif name == prev_name_1:
                        prev_name_1 = name
                        continue
                    # otherwise try recognizing 
                    else:
                        start_recogn_1 = time.time()
                        logging.debug(start_recogn_1)
                        # if we actually recognized it right
                        if name == model_1.recognize(img): # remove name param when not testing the mock
                            # add to the recognized count 
                            recognized_1 += 1
                        end_recogn_1 =  time.time()
                        # add to the recognition time 
                        recognition_time_1 += (end_recogn_1 - start_recogn_1) 
                        prev_name_1 = name
                        count_1+= 1
            # do the same for the other model 
            elif model == 1:
                if count_2 <= dataset_size: 
                    if name in list_to_remove_2:
                        prev_name_2 = name
                        continue
                    elif name == prev_name_2:
                        prev_name_2 = name
                        continue
                    else:
                        start_recogn_2 = time.time()
                        logging.debug(start_recogn_2)
                        if name == model_2.recognize(name):
                            recognized_2 += 1
                        end_recogn_2 = time.time()
                        logging.debug(end_recogn_2)
                        recognition_time_2 += (end_recogn_2 - start_recogn_2) 
                        prev_name_2 = name
                        count_2 += 1
    return recognition_time_1, recognition_time_2, recognized_1, recognized_2


In [None]:
#%matplotlib inline
import mpld3
mpld3.enable_notebook()
plt.rcParams['figure.figsize'] = [9, 9]

def build_graph(dict):
    
    width_ = 100
    fig, ax = plt.subplots()
    x = []
    y1 = []
    y2 = []
    for key in dict:
        x.append(key)

        y1.append(dict[key][0])
        y2.append(dict[key][1])
    x = np.asarray(x)
    y1 = np.asarray(y1)
    y2 = np.asarray(y2)
    print(x)
    print(y1)
    print(y2)
    error_config = {'ecolor': '0.3'}

    #width = np.diff(x)
    #width = [  90 , 900, 9000, 90000]
    #width = [9, 90, 900, 9000]
    width = []
    for i in range (len(x)):
        number = x[i]-10**(i)
        width.append(number)
                     
    rects1 = plt.bar(x, y1, width, color='#028482', error_kw=error_config, label='Model #1', ec="k")
    rects2 = plt.bar(x + width, y2, width, color='#7ABA7A', error_kw=error_config, label='Model #2', ec="k")

    ax.set_xscale("log")
    ax.set_ylabel('% recognized')
    ax.set_title('Scores by size of dataset and model')
    width = np.asarray(width)
    ax.set_xticks(x + width / 2)
    ax.set_xticklabels(x)

    ax.legend()
    plt.tight_layout()
    # plt.figure()
    plt.show()


#mock_result = {10: (74, 98), 100: (78, 90), 1000: (81, 95), 10000: (80, 90)}
#build_graph(mock_result)

In [36]:

def main_test(test, host):
    
    init_runtime()
    dataset = get_test_dataset() if test else get_lfw_dataset()
    model_1 = EfrsRestApi(host) if host else EfrsLocal()
    #model_1 = EfrsLocal()
    #model_2 = EfrsLocal()
    #model_1 = mock_model(dataset)
    model_2 = mock_model(dataset)

    
    size_1 = 10
    size_2 = 100
    size_3 = 1000
    size_4 = 10000
    sizes = [size_1] #size_2, size_3, size_4
    
    recognized_dict = {}
    for size in sizes:
        undetected_1, undetected_2, list_to_remove_1, list_to_remove_2, \
add_time_model_1, add_time_model_2, total_trian_time_1, total_trian_time_2 = calculate_detection(model_1, dataset, model_2, size)
        print (undetected_1, undetected_2)
        detected_1 = size - undetected_1
        detected_percent_1 = (detected_1 / size) * 100 
        detected_2 = size - undetected_2
        detected_percent_2 = (detected_2 / size) * 100 
        recognition_time_1, recognition_time_2, recognized_1, recognized_2 = test_models(model_1, dataset, model_2, size, list_to_remove_1, list_to_remove_2)
        recognized_percent_1 = (recognized_1 / size) * 100 
        recognized_percent_2 = (recognized_2 / size) * 100
        recognized_dict[size] = (recognized_1, recognized_2)
     
        
        
        print(f'==================\n'
              f'Model #1 Performance: \n'
              f'Detected faces: {detected_percent_1}% ({detected_1}/{size})\n'
              f'Total time for adding faces: {add_time_model_1}\n'
              f'Total time for training model: {total_trian_time_1}\n'
              f'------------------\n'
              f'Recognized faces: {recognized_percent_1}% ({recognized_1}/{size})\n'
              f'Total time for recognizing faces: {recognition_time_2}\n')
        
        print(f'==================\n'
              f'Model #2 Performance: \n'
              f'Detected faces: {detected_percent_2}% ({detected_2}/{size})\n'
              f'Total time for adding faces: {add_time_model_2}\n'
              f'Total time for training model: {total_trian_time_2}\n'
              f'------------------\n'
              f'Recognized faces: {recognized_percent_2}% ({recognized_2}/{size})\n'
              f'Total time for recognizing faces: {recognition_time_2}\n')
        
        
        build_graph(recognized_dict)
    
  
host = None # put the host here to use the EfrsRestApi
test = False 
main_test(test, host)









DEBUG:root:Using MongoDB at localhost:27017
usage: ipykernel_launcher.py [-h] [--host HOST] [--test]
ipykernel_launcher.py: error: unrecognized arguments: -f /Users/elizabeth/Library/Jupyter/runtime/kernel-59ea3a4c-dc9f-4672-93b6-bc85d2c0ed95.json
  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


SystemExit: 2