In [1]:
from sklearn.metrics.pairwise import euclidean_distances, cosine_similarity
from sklearn.metrics import accuracy_score
import json
import os
import ast
import pandas as pd
import numpy as np
import create_directory
import pathlib

# Load dữ liệu

In [2]:
# label2idx: mapping loaded from label2idx.json (later cells look up the 'Unknown' entry)
with open(os.path.join(create_directory.recognition_model_dir, 'label2idx.json')) as label_map_file:
    label2idx = json.load(label_map_file)

# Load the test split: feature vectors and their label indices
test_data_path = os.path.join(create_directory.marvel_data_dir, "distance_face_recognition/fixed_threshold")
with open(os.path.join(test_data_path, "test_feature.json"), "r") as test_feature_file:
    x_test = json.load(test_feature_file)
with open(os.path.join(test_data_path, "test_label_idx.json"), "r") as test_label_file:
    y_test = json.load(test_label_file)

# Face Recognition

## Xây dựng hàm

In [3]:
def create_train_data(data_path, sample):
    """Load one training sample from disk into a DataFrame.

    Reads ``train_sample_<sample>_feature.json`` and
    ``train_sample_<sample>_label_idx.json`` from ``data_path``.

    Args:
        data_path: Directory containing the two JSON files.
        sample: Value substituted into the file names.

    Returns:
        DataFrame with columns ``feature``, ``label`` and ``idx``, where
        ``idx`` is a copy of the frame's index.
    """
    feature_file = os.path.join(data_path, "train_sample_{}_feature.json".format(sample))
    label_file = os.path.join(data_path, "train_sample_{}_label_idx.json".format(sample))

    with open(feature_file, "r") as handle:
        features = json.load(handle)
    with open(label_file, "r") as handle:
        labels = json.load(handle)

    frame = pd.DataFrame({'feature': features, 'label': labels})
    # Keep the original row position as an explicit column.
    frame["idx"] = frame.index
    return frame

In [4]:
# Threshold update function
def update_threshold(data, metric, pair):
    """Compute a per-point adaptive threshold for every row of ``data``.

    Rows are registered one at a time. Each new row is scored against every
    previously registered row that qualifies for ``pair``; the score tightens
    both the new row's threshold and the older row's threshold.

    Args:
        data: DataFrame with columns ``feature`` (vector), ``label`` and ``idx``.
        metric: ``'euclidean_distance'`` or ``'cosine_similarity'``.
        pair: ``'cross_similarities'`` (score only against rows with a
            different label) or ``'auto_similarities'`` (score only against
            rows with the same label).

    Returns:
        DataFrame with columns ``feature``, ``label``, ``threshold``, ``idx``
        in the input row order. Thresholds start from an initial value and are
        reduced with ``min`` or ``max``:

        * euclidean / cross: start 2, keep the minimum score
        * euclidean / auto:  start 0, keep the maximum score
        * cosine / cross:    start 0, keep the maximum score
        * cosine / auto:     start 1, keep the minimum score

        The result has a fresh 0..n-1 index (the previous row-by-row
        ``pd.concat`` left every row with index 0).

    Raises:
        ValueError: if ``metric`` or ``pair`` is not one of the supported values.
    """
    # NOTE(review): the initial values 2 / 1 presumably assume unit-normalised
    # feature vectors -- confirm against the feature extractor.
    rules = {
        ('euclidean_distance', 'cross_similarities'): (2, min),
        ('euclidean_distance', 'auto_similarities'): (0, max),
        ('cosine_similarity', 'cross_similarities'): (0, max),
        ('cosine_similarity', 'auto_similarities'): (1, min),
    }
    if metric not in ('euclidean_distance', 'cosine_similarity'):
        raise ValueError("Unsupported metric: {!r}".format(metric))
    if pair not in ('cross_similarities', 'auto_similarities'):
        raise ValueError("Unsupported pair: {!r}".format(pair))

    initial_threshold, tighten = rules[(metric, pair)]
    score_fn = euclidean_distances if metric == 'euclidean_distance' else cosine_similarity
    want_different_label = pair == 'cross_similarities'

    # Pull the columns out once; plain lists avoid per-row DataFrame lookups.
    features = data['feature'].tolist()
    labels = data['label'].tolist()
    indices = data['idx'].tolist()
    vectors = [np.array(feature).reshape(1, -1) for feature in features]

    thresholds = []
    for new_pos in range(len(features)):
        new_threshold = initial_threshold
        # Score the point being registered against every point registered before it.
        for old_pos in range(new_pos):
            labels_differ = labels[old_pos] != labels[new_pos]
            if labels_differ != want_different_label:
                continue
            # Same argument order as before: registered point first, new point second.
            score = score_fn(vectors[old_pos], vectors[new_pos])[0][0]
            # Update the threshold of the already-registered point ...
            thresholds[old_pos] = tighten(thresholds[old_pos], score)
            # ... and accumulate the threshold of the point being registered.
            new_threshold = tighten(new_threshold, score)
        thresholds.append(new_threshold)

    threshold_data = pd.DataFrame({
        'feature': pd.Series(features, dtype=object),
        'label': labels,
        'threshold': pd.Series(thresholds, dtype=float),
        'idx': indices,
    })
    if threshold_data.empty:
        # Keep the integer dtypes the empty result had before.
        threshold_data = threshold_data.astype({'label': int, 'idx': int})
    return threshold_data

In [5]:
# Prediction function
def predict(feat, threshold_data, metric):
    """Predict the label of one feature vector against the registered points.

    The registered point with the best score (smallest Euclidean distance or
    largest cosine similarity; first one wins on ties) is selected. Its label
    is returned only when the score is strictly better than that point's own
    threshold; otherwise ``label2idx['Unknown']`` is returned.

    Args:
        feat: Feature vector of the query.
        threshold_data: DataFrame with columns ``feature``, ``label`` and
            ``threshold`` (as produced by ``update_threshold``).
        metric: ``'euclidean_distance'`` or ``'cosine_similarity'``.

    Returns:
        The label of the best-matching registered point, or
        ``label2idx['Unknown']`` when the match is rejected or when
        ``threshold_data`` has no rows.

    Raises:
        ValueError: if ``metric`` is not supported (previously this silently
            returned ``None``).
    """
    if metric == 'euclidean_distance':
        score_fn, pick_best = euclidean_distances, min
    elif metric == 'cosine_similarity':
        score_fn, pick_best = cosine_similarity, max
    else:
        raise ValueError("Unsupported metric: {!r}".format(metric))

    # Nothing registered: there is no candidate to accept.
    if len(threshold_data) == 0:
        return label2idx['Unknown']

    query = np.array(feat).reshape(1, -1)
    # One pass over the feature column instead of a positional row lookup per candidate.
    candidate_score = [
        score_fn(np.array(reference).reshape(1, -1), query)[0][0]
        for reference in threshold_data['feature']
    ]

    best_score = pick_best(candidate_score)
    idx = candidate_score.index(best_score)
    best_match = threshold_data.iloc[idx]

    if metric == 'euclidean_distance':
        accepted = best_score < best_match['threshold']
    else:
        accepted = best_score > best_match['threshold']
    return best_match['label'] if accepted else label2idx['Unknown']

In [6]:
def predict_test(x_test, data_path, sample, metric, pair, save_threshold=False, save_path=""):
    """Build adaptive thresholds for one training sample and predict a test set.

    Args:
        x_test: Iterable of feature vectors to classify.
        data_path: Directory passed to ``create_train_data``.
        sample: Training sample identifier passed to ``create_train_data``.
        metric: Metric name passed to ``update_threshold`` and ``predict``.
        pair: Pairing mode passed to ``update_threshold``.
        save_threshold: Whether to write the threshold table to ``save_path``.
            The default used to be the string ``"False"``, which is truthy and
            made a call with default arguments try to write to ``""``. For
            backward compatibility, strings are still accepted: ``""``,
            ``"false"``, ``"0"``, ``"no"`` and ``"none"`` (case-insensitive)
            mean "do not save"; any other string means "save".
        save_path: CSV destination, required when saving is enabled.

    Returns:
        List with one prediction per element of ``x_test``.

    Raises:
        ValueError: if saving is enabled but ``save_path`` is empty.
    """
    if isinstance(save_threshold, str):
        save_threshold = save_threshold.strip().lower() not in ('', 'false', '0', 'no', 'none')
    # Fail before the expensive threshold computation rather than after it.
    if save_threshold and not save_path:
        raise ValueError("save_path must be provided when save_threshold is enabled")

    data = create_train_data(data_path, sample)
    threshold_data = update_threshold(data, metric, pair)
    if save_threshold:
        threshold_data.to_csv(save_path, index=False)
    return [predict(feat, threshold_data, metric) for feat in x_test]

## Thử nghiệm và lưu lại kết quả

In [7]:
# Input folder holding the train/test JSON files
data_path = os.path.join(create_directory.marvel_data_dir, "distance_face_recognition/fixed_threshold")
# Output root under which the computed threshold CSVs are written
update_path = os.path.join(create_directory.marvel_data_dir, "distance_face_recognition/adaptive_threshold")
# Training sample identifiers to run
sample_list = list(range(4))

### Cross similarities

#### Euclidean distance

##### Lấy kết quả dự đoán

In [8]:
# Folder for the per-sample threshold CSVs (cross similarities, Euclidean distance)
update_threshold_path = os.path.join(update_path, 'threshold', 'cross_similarities', 'euclidean')
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(update_threshold_path, exist_ok=True)

In [9]:
# Predict the test set once per training sample (cross similarities, Euclidean distance)
result = {}
for sample in sample_list:
    print('Sample {}'.format(sample))
    save_threshold_path = os.path.join(
        update_threshold_path,
        'sample_{}_threshold.csv'.format(sample),
    )
    result['sample {}'.format(sample)] = predict_test(
        x_test,
        data_path,
        sample,
        metric='euclidean_distance',
        pair='cross_similarities',
        save_threshold=True,
        save_path=save_threshold_path,
    )

Sample 0
Sample 1
Sample 2
Sample 3


##### Lưu kết quả

In [10]:
def evaluation(true, pred):
    """Count accept/reject outcomes of an open-set recognition run.

    A prediction equal to ``label2idx["Unknown"]`` is a reject; any other
    prediction is an accept.

    Args:
        true: Iterable of ground-truth label indices.
        pred: Iterable of predicted label indices, paired with ``true``.

    Returns:
        Tuple ``(fa, wa, fr, accept, reject)``:
        ``fa`` - accepted although the true label is Unknown (false accept);
        ``wa`` - accepted with a known true label but the wrong identity;
        ``fr`` - rejected although the true label is known (false reject);
        ``accept`` / ``reject`` - total accepted / rejected predictions.
    """
    unknown = label2idx["Unknown"]
    false_accept = wrong_answer = false_reject = 0
    accepted = rejected = 0

    for actual, predicted in zip(true, pred):
        if predicted == unknown:
            # The system says the face is not in the database.
            rejected += 1
            if actual != unknown:
                false_reject += 1
            continue

        # The system matched the face to someone in the database.
        accepted += 1
        if actual == unknown:
            # An unknown face was recognised as a database identity.
            false_accept += 1
        elif actual != predicted:
            # A known face was recognised as the wrong identity.
            wrong_answer += 1

    # The goal is to keep false accepts and wrong answers low.
    return (false_accept, wrong_answer, false_reject, accepted, rejected)

In [11]:
# Root folder for the evaluation summaries
save_path = os.path.join(create_directory.result_dir, 'test_threshold/adaptive_threshold')
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(save_path, exist_ok=True)

In [12]:
def save_result(result, save_path, save_name):
    """Evaluate every prediction list in ``result`` and write a summary CSV.

    Each entry is scored against the notebook-level ``y_test`` with
    ``evaluation`` and ``accuracy_score``. One CSV row is written per entry.

    Args:
        result: Mapping from a sample name to its list of predictions.
        save_path: Output directory; created if it does not exist.
        save_name: File name without extension; ``'.csv'`` is appended.

    Returns:
        None. Writes ``<save_path>/<save_name>.csv`` with the columns
        ``sample, accept, fa, wa, reject, fr, accuracy``.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists() test.
    os.makedirs(save_path, exist_ok=True)

    columns = ['sample', 'accept', 'fa', 'wa', 'reject', 'fr', 'accuracy']
    rows = []
    for sample, y_pred in result.items():
        fa, wa, fr, accept, reject = evaluation(y_test, y_pred)
        rows.append({
            'sample': sample,
            'accept': accept,
            'fa': fa,
            'wa': wa,
            'reject': reject,
            'fr': fr,
            'accuracy': accuracy_score(y_test, y_pred),
        })

    # Passing columns explicitly fixes the order and keeps the header for an empty result.
    summary = pd.DataFrame(rows, columns=columns)
    summary.to_csv(os.path.join(save_path, save_name + '.csv'), index=False)

In [13]:
# Write the evaluation summary for cross similarities / Euclidean distance
save_result(
    result,
    save_path=os.path.join(save_path, 'cross_similarities'),
    save_name='adaptive_result_euclidean',
)

#### Cosine Similarity

##### Lấy kết quả dự đoán

In [14]:
# Folder for the per-sample threshold CSVs (cross similarities, cosine similarity)
update_threshold_path = os.path.join(update_path, 'threshold', 'cross_similarities', 'cosine')
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(update_threshold_path, exist_ok=True)

In [15]:
# Predict the test set once per training sample (cross similarities, cosine similarity)
result = {}
for sample in sample_list:
    save_threshold_path = os.path.join(
        update_threshold_path,
        'sample_{}_threshold.csv'.format(sample),
    )
    result['sample {}'.format(sample)] = predict_test(
        x_test,
        data_path,
        sample,
        metric='cosine_similarity',
        pair='cross_similarities',
        save_threshold=True,
        save_path=save_threshold_path,
    )

##### Lưu kết quả

In [16]:
# Write the evaluation summary for cross similarities / cosine similarity
save_result(
    result,
    save_path=os.path.join(save_path, 'cross_similarities'),
    save_name='adaptive_result_cosine',
)

### Auto similarities

#### Euclidean distance

##### Lấy kết quả dự đoán 

In [17]:
# Folder for the per-sample threshold CSVs (auto similarities, Euclidean distance)
update_threshold_path = os.path.join(update_path, 'threshold', 'auto_similarities', 'euclidean')
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(update_threshold_path, exist_ok=True)

In [18]:
# Predict the test set once per training sample (auto similarities, Euclidean distance)
result = {}
for sample in sample_list:
    save_threshold_path = os.path.join(
        update_threshold_path,
        'sample_{}_threshold.csv'.format(sample),
    )
    result['sample {}'.format(sample)] = predict_test(
        x_test,
        data_path,
        sample,
        metric='euclidean_distance',
        pair='auto_similarities',
        save_threshold=True,
        save_path=save_threshold_path,
    )

##### Lưu kết quả

In [19]:
# Write the evaluation summary for auto similarities / Euclidean distance
save_result(
    result,
    save_path=os.path.join(save_path, 'auto_similarities'),
    save_name='adaptive_result_euclidean',
)

#### Cosine similarity

##### Lấy kết quả dự đoán

In [20]:
# Folder for the per-sample threshold CSVs (auto similarities, cosine similarity)
update_threshold_path = os.path.join(update_path, 'threshold', 'auto_similarities', 'cosine')
# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(update_threshold_path, exist_ok=True)

In [21]:
# Predict the test set once per training sample (auto similarities, cosine similarity)
result = {}
for sample in sample_list:
    save_threshold_path = os.path.join(
        update_threshold_path,
        'sample_{}_threshold.csv'.format(sample),
    )
    result['sample {}'.format(sample)] = predict_test(
        x_test,
        data_path,
        sample,
        metric='cosine_similarity',
        pair='auto_similarities',
        save_threshold=True,
        save_path=save_threshold_path,
    )

##### Lưu kết quả

In [22]:
# Write the evaluation summary for auto similarities / cosine similarity
save_result(
    result,
    save_path=os.path.join(save_path, 'auto_similarities'),
    save_name='adaptive_result_cosine',
)