### facenet-pytorch LFW评估

本笔记本演示了如何针对LFW数据集评估性能。

In [4]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training, extract_face
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler, SequentialSampler
from torchvision import datasets, transforms
import numpy as np
import os
import pickle,pdb

data_dir = '../data/lfw/lfw'
pairs_path = '../data/lfw/pairs.txt'

batch_size = 2
epochs = 15
workers = 0 if os.name == 'nt' else 8

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('在该设备上运行: {}'.format(device))

在该设备上运行: cpu


In [5]:
mtcnn = MTCNN(
    image_size=160,
    margin=14,
    device=device,
    selection_method='center_weighted_size'
)

In [6]:
# 定义输入图像的数据加载器
dataset = datasets.ImageFolder(data_dir, transform=None)
# img_list_1=[img for (img,idx) in dataset.imgs]
# with open("img_list_1.pkl","wb") as file:
#     pickle.dump(img_list_1,file)
print(dataset)
print(len(dataset))
print(len(dataset.imgs))
print(len(dataset.classes))
print(dataset.classes[-1])
print(dataset.classes)
print(dataset.imgs)

Dataset ImageFolder
    Number of datapoints: 13231
    Root location: ../data/lfw/lfw
13231
13231
5749
Zydrunas_Ilgauskas
['AJ_Cook', 'AJ_Lamas', 'Aaron_Eckhart', 'Aaron_Guiel', 'Aaron_Patterson', 'Aaron_Peirsol', 'Aaron_Pena', 'Aaron_Sorkin', 'Aaron_Tippin', 'Abba_Eban', 'Abbas_Kiarostami', 'Abdel_Aziz_Al-Hakim', 'Abdel_Madi_Shabneh', 'Abdel_Nasser_Assidi', 'Abdoulaye_Wade', 'Abdul_Majeed_Shobokshi', 'Abdul_Rahman', 'Abdulaziz_Kamilov', 'Abdullah', 'Abdullah_Ahmad_Badawi', 'Abdullah_Gul', 'Abdullah_Nasseef', 'Abdullah_al-Attiyah', 'Abdullatif_Sener', 'Abel_Aguilar', 'Abel_Pacheco', 'Abid_Hamid_Mahmud_Al-Tikriti', 'Abner_Martinez', 'Abraham_Foxman', 'Aby_Har-Even', 'Adam_Ant', 'Adam_Freier', 'Adam_Herbert', 'Adam_Kennedy', 'Adam_Mair', 'Adam_Rich', 'Adam_Sandler', 'Adam_Scott', 'Adel_Al-Jubeir', 'Adelina_Avila', 'Adisai_Bodharamik', 'Adolfo_Aguilar_Zinser', 'Adolfo_Rodriguez_Saa', 'Adoor_Gopalakarishnan', 'Adrian_Annus', 'Adrian_Fernandez', 'Adrian_McPherson', 'Adrian_Murrell', 'Adria

In [7]:
# 覆盖数据集中的类标签以使用路径，以便在mtcnn批处理中保存输出
dataset.samples = [
    (p, p)
    for p, _ in dataset.samples
]

loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    collate_fn=training.collate_pil
)


In [8]:
crop_paths = []
box_probs = []

#mtcnn裁剪图片，并保存到_cropped
for i, (x, b_paths) in enumerate(loader):
    # print(b_paths[0])
    # pdb.set_trace()
    crops = [p.replace(data_dir, data_dir + '_cropped') for p in b_paths]
    crops = [p.replace("\\","/") for p in crops]
    # print(crops[0])
    # pdb.set_trace()
    mtcnn(x, save_path=crops)
    crop_paths.extend(crops)
    print('\r第 {} 批，共 {} 批'.format(i + 1, len(loader)), end='')

with open('./crop_paths.txt','wb') as file:
    pickle.dump(crop_paths,file)

第 22 批，共 6616 批

KeyboardInterrupt: 

In [None]:
print(len(crop_paths))
# with open('./crop_paths.txt','rb') as file:
#     crop_paths=pickle.load(file)
# print(crop_paths[-1])

# with open('./embed_to_cls.pkl','rb') as file:
#     embeddings_dict=pickle.load(file)

# print(list(embeddings_dict)[-1])

13231


In [None]:
# 为减少GPU内存使用，删除mtcnn
del mtcnn
torch.cuda.empty_cache()

In [None]:
# 从MTCNN裁剪的图像输出创建数据集和数据加载器

# transform.Compose是PyTorch中的一个类，用于将多个图像变换操作组合在一起。它的作用是将这些操作按照顺序依次应用于输入的图像数据。
trans = transforms.Compose([
    np.float32,
    transforms.ToTensor(),
    fixed_image_standardization
])

dataset = datasets.ImageFolder(data_dir + '_cropped', transform=trans)
print(len(dataset))
print(len(dataset.imgs))
print(len(dataset.classes))
print(dataset.classes[-1])
print(dataset.classes)
img_list_2=[img for (img,idx) in dataset.imgs]
with open("img_list_2.pkl","wb") as file:
    pickle.dump(img_list_2,file)

embed_loader = DataLoader(
    dataset,
    num_workers=workers,
    batch_size=batch_size,
    sampler=SequentialSampler(dataset)
)

13231
13231
5749
Zydrunas_Ilgauskas
['AJ_Cook', 'AJ_Lamas', 'Aaron_Eckhart', 'Aaron_Guiel', 'Aaron_Patterson', 'Aaron_Peirsol', 'Aaron_Pena', 'Aaron_Sorkin', 'Aaron_Tippin', 'Abba_Eban', 'Abbas_Kiarostami', 'Abdel_Aziz_Al-Hakim', 'Abdel_Madi_Shabneh', 'Abdel_Nasser_Assidi', 'Abdoulaye_Wade', 'Abdul_Majeed_Shobokshi', 'Abdul_Rahman', 'Abdulaziz_Kamilov', 'Abdullah', 'Abdullah_Ahmad_Badawi', 'Abdullah_Gul', 'Abdullah_Nasseef', 'Abdullah_al-Attiyah', 'Abdullatif_Sener', 'Abel_Aguilar', 'Abel_Pacheco', 'Abid_Hamid_Mahmud_Al-Tikriti', 'Abner_Martinez', 'Abraham_Foxman', 'Aby_Har-Even', 'Adam_Ant', 'Adam_Freier', 'Adam_Herbert', 'Adam_Kennedy', 'Adam_Mair', 'Adam_Rich', 'Adam_Sandler', 'Adam_Scott', 'Adel_Al-Jubeir', 'Adelina_Avila', 'Adisai_Bodharamik', 'Adolfo_Aguilar_Zinser', 'Adolfo_Rodriguez_Saa', 'Adoor_Gopalakarishnan', 'Adrian_Annus', 'Adrian_Fernandez', 'Adrian_McPherson', 'Adrian_Murrell', 'Adrian_Nastase', 'Adriana_Lima', 'Adriana_Perez_Navarro', 'Adrianna_Zuzic', 'Adrien_Brody', 

In [None]:
# 加载预训练的Resnet模型
resnet = InceptionResnetV1(
    classify=False,
    pretrained='vggface2'
).to(device)

In [None]:
# resnet获得cropped_image的嵌入式表示

classes = []
embeddings = []
resnet.eval()
cnt=1
with torch.no_grad():
    for xb, yb in embed_loader:
        print(f'\r第{cnt}批', end='');  cnt+=1
        xb = xb.to(device)
        b_embeddings = resnet(xb)
        b_embeddings = b_embeddings.to('cpu').numpy()
        classes.extend(yb.numpy())
        embeddings.extend(b_embeddings)

print(list(embeddings)[-1])

第6616批[-3.49269696e-02 -1.83519572e-02 -8.26636627e-02  3.74777131e-02
 -1.35031873e-02  4.23839092e-02 -1.67709719e-02  1.74232423e-02
 -2.67837513e-02  7.65299946e-02  3.87821123e-02 -1.02369511e-03
 -2.17276085e-02 -1.74816083e-02 -1.45403789e-02 -6.69238996e-03
  4.47100140e-02 -2.53509264e-02  4.56610508e-02 -6.08546883e-02
 -6.69097379e-02  1.63039844e-02 -2.22885571e-02 -2.34197397e-02
 -4.05290388e-02 -1.72726400e-02 -2.80570835e-02  3.63669284e-02
  1.73077546e-02  4.79086451e-02 -8.15229565e-02 -1.61334723e-02
  2.18898267e-03 -4.94049070e-03 -1.15613215e-01  3.60908359e-02
 -6.69658650e-03  1.12411845e-02  3.62104028e-02  1.01159303e-03
 -4.39503640e-02  2.40537990e-02  3.40069383e-02  4.11919914e-02
  8.86759236e-02  7.56868944e-02  2.73296237e-02 -4.59434167e-02
  5.61704934e-02 -4.94265184e-02  2.41842251e-02  1.17366994e-02
  2.59494465e-02 -1.54539598e-02 -7.36104548e-02  1.67265944e-02
 -3.21992710e-02  7.86618665e-02  3.04631237e-02 -2.18574703e-02
  1.49496016e-03 -6

In [None]:
# with open('./crop_paths.txt','rb') as file:
#     crop_paths=pickle.load(file)
print(len(crop_paths))
print(len(embeddings))
print(crop_paths[-1])
embeddings_dict = dict(zip(crop_paths,embeddings))
print(list(embeddings_dict.keys())[-1])

with open('./embed_to_cls.pkl','wb') as file:
    pickle.dump(embeddings_dict,file)

13231
13231
../data/lfw/lfw_cropped/Zydrunas_Ilgauskas/Zydrunas_Ilgauskas_0001.jpg
../data/lfw/lfw_cropped/Zydrunas_Ilgauskas/Zydrunas_Ilgauskas_0001.jpg


In [1]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training, extract_face
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler, SequentialSampler
from torchvision import datasets, transforms
import numpy as np
import os
import pickle,pdb

data_dir = '../data/lfw/lfw'
pairs_path = '../data/lfw/pairs.txt'

batch_size = 2
epochs = 15
workers = 0 if os.name == 'nt' else 8

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('在该设备上运行: {}'.format(device))

with open('./embed_to_cls.pkl','rb') as file:
    embeddings_dict=pickle.load(file)
# print(list(embeddings_dict.keys())[-1])
print(len(list(embeddings_dict.keys())))

在该设备上运行: cpu
13231


#### 使用距离度量评估嵌入，在官方LFW测试集上执行验证。

下一个块中的函数是从`facenet.src.lfw`复制粘贴的。不幸的是，该模块具有从`facenet`绝对导入的绝对导入，因此无法从子模块导入

添加了返回假阳性和假阴性的功能。

In [2]:
from sklearn.model_selection import KFold
from scipy import interpolate
import math

# 以下是从David Sandberg的FaceNet实现中提取的LFW函数
def distance(embeddings1, embeddings2, distance_metric=0):
    if distance_metric==0:
        # Euclidian distance
        diff = np.subtract(embeddings1, embeddings2)
        dist = np.sum(np.square(diff),1)
    elif distance_metric==1:
        # 基于余弦相似度的距离
        dot = np.sum(np.multiply(embeddings1, embeddings2), axis=1)
        norm = np.linalg.norm(embeddings1, axis=1) * np.linalg.norm(embeddings2, axis=1)
        similarity = dot / norm
        dist = np.arccos(similarity) / math.pi
    else:
        raise 'Undefined distance metric %d' % distance_metric

    return dist

# 根据识别结果计算ROC curve
def calculate_roc(thresholds, embeddings1, embeddings2, actual_issame, nrof_folds=10, distance_metric=0, subtract_mean=False):
    assert(embeddings1.shape[0] == embeddings2.shape[0])
    assert(embeddings1.shape[1] == embeddings2.shape[1])
    nrof_pairs = min(len(actual_issame), embeddings1.shape[0])
    nrof_thresholds = len(thresholds)
    k_fold = KFold(n_splits=nrof_folds, shuffle=False)

    tprs = np.zeros((nrof_folds,nrof_thresholds))
    fprs = np.zeros((nrof_folds,nrof_thresholds))
    accuracy = np.zeros((nrof_folds))

    is_false_positive = []
    is_false_negative = []

    indices = np.arange(nrof_pairs)

    for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)):
        if subtract_mean:
            mean = np.mean(np.concatenate([embeddings1[train_set], embeddings2[train_set]]), axis=0)
        else:
          mean = 0.0
        dist = distance(embeddings1-mean, embeddings2-mean, distance_metric)

        # 寻找折叠的最佳阈值
        acc_train = np.zeros((nrof_thresholds))
        for threshold_idx, threshold in enumerate(thresholds):
            _, _, acc_train[threshold_idx], _ ,_ = calculate_accuracy(threshold, dist[train_set], actual_issame[train_set])
        best_threshold_index = np.argmax(acc_train)
        for threshold_idx, threshold in enumerate(thresholds):
            tprs[fold_idx,threshold_idx], fprs[fold_idx,threshold_idx], _, _, _ = calculate_accuracy(threshold, dist[test_set], actual_issame[test_set])
        _, _, accuracy[fold_idx], is_fp, is_fn = calculate_accuracy(thresholds[best_threshold_index], dist[test_set], actual_issame[test_set])

        tpr = np.mean(tprs,0)
        fpr = np.mean(fprs,0)
        is_false_positive.extend(is_fp)
        is_false_negative.extend(is_fn)

    return tpr, fpr, accuracy, is_false_positive, is_false_negative

# 根据实际标签(actual_isame)与预测标签(predict_issame)计算准确率
def calculate_accuracy(threshold, dist, actual_issame):
    predict_issame = np.less(dist, threshold)
    tp = np.sum(np.logical_and(predict_issame, actual_issame))
    fp = np.sum(np.logical_and(predict_issame, np.logical_not(actual_issame)))
    tn = np.sum(np.logical_and(np.logical_not(predict_issame), np.logical_not(actual_issame)))
    fn = np.sum(np.logical_and(np.logical_not(predict_issame), actual_issame))

    is_fp = np.logical_and(predict_issame, np.logical_not(actual_issame))
    is_fn = np.logical_and(np.logical_not(predict_issame), actual_issame)

    tpr = 0 if (tp+fn==0) else float(tp) / float(tp+fn)
    fpr = 0 if (fp+tn==0) else float(fp) / float(fp+tn)
    acc = float(tp+tn)/dist.size
    return tpr, fpr, acc, is_fp, is_fn

def calculate_val(thresholds, embeddings1, embeddings2, actual_issame, far_target, nrof_folds=10, distance_metric=0, subtract_mean=False):
    assert(embeddings1.shape[0] == embeddings2.shape[0])
    assert(embeddings1.shape[1] == embeddings2.shape[1])
    nrof_pairs = min(len(actual_issame), embeddings1.shape[0])
    nrof_thresholds = len(thresholds)
    k_fold = KFold(n_splits=nrof_folds, shuffle=False)

    val = np.zeros(nrof_folds)
    far = np.zeros(nrof_folds)

    indices = np.arange(nrof_pairs)

    for fold_idx, (train_set, test_set) in enumerate(k_fold.split(indices)):
        if subtract_mean:
            mean = np.mean(np.concatenate([embeddings1[train_set], embeddings2[train_set]]), axis=0)
        else:
          mean = 0.0
        dist = distance(embeddings1-mean, embeddings2-mean, distance_metric)

        # 找到使FAR = far_target的阈值
        far_train = np.zeros(nrof_thresholds)
        for threshold_idx, threshold in enumerate(thresholds):
            _, far_train[threshold_idx] = calculate_val_far(threshold, dist[train_set], actual_issame[train_set])
        if np.max(far_train)>=far_target:
            # print(far_train)
            # print(len(far_train))
            # print(far_train[:5])
            # print(len(thresholds))
            # print(thresholds)
            # pdb.set_trace()
            # f = interpolate.interp1d(far_train, thresholds, kind='slinear')     # 应该是误识率到 门限的函数，但y=f(x),x不可有重复值，
            # threshold = f(far_target)
            threshold =4.0 *(far_target-np.min(far_train))/(np.max(far_train)-np.min(far_train))
        else:
            threshold = 0.0
        # 计算当前fold的正确率与误识率
        val[fold_idx], far[fold_idx] = calculate_val_far(threshold, dist[test_set], actual_issame[test_set])

    val_mean = np.mean(val)
    far_mean = np.mean(far)
    val_std = np.std(val)
    return val_mean, val_std, far_mean

 # 计算正确率(val)与误识率(far,判错的占不一样的的占比)
def calculate_val_far(threshold, dist, actual_issame):
    predict_issame = np.less(dist, threshold)           # 第一元素<第二元素，小于门限判为同类
    true_accept = np.sum(np.logical_and(predict_issame, actual_issame))
    false_accept = np.sum(np.logical_and(predict_issame, np.logical_not(actual_issame)))    #测为相同但实际不同的样本
    n_same = np.sum(actual_issame)                          
    n_diff = np.sum(np.logical_not(actual_issame))          
    val = float(true_accept) / float(n_same)    
    far = float(false_accept) / float(n_diff)   
    return val, far


def evaluate(embeddings, actual_issame, nrof_folds=10, distance_metric=0, subtract_mean=False):
    # 计算评估指标
    thresholds = np.arange(0, 4, 0.01)
    embeddings1 = embeddings[0::2]
    embeddings2 = embeddings[1::2]
    tpr, fpr, accuracy, fp, fn  = calculate_roc(thresholds, embeddings1, embeddings2,
        np.asarray(actual_issame), nrof_folds=nrof_folds, distance_metric=distance_metric, subtract_mean=subtract_mean)
    thresholds = np.arange(0, 4, 0.001)
    val, val_std, far = calculate_val(thresholds, embeddings1, embeddings2,
        np.asarray(actual_issame), 1e-3, nrof_folds=nrof_folds, distance_metric=distance_metric, subtract_mean=subtract_mean)
    return tpr, fpr, accuracy, val, val_std, far, fp, fn

def add_extension(path):
    if os.path.exists(path+'.jpg'):
        return path+'.jpg'
    elif os.path.exists(path+'.png'):
        return path+'.png'
    else:
        raise RuntimeError('No file "%s" with extension png or jpg.' % path)

def get_paths(lfw_dir, pairs):
    nrof_skipped_pairs = 0
    path_list = []
    issame_list = []
    for pair in pairs:  # pair=(str,str,str)
        # print(f"len={len(pair)}")
        path0=lfw_dir+'/'+pair[0]
        path1=lfw_dir+'/'+pair[1]
        # print(os.path.exists(path0))
        # print(type(pair[2]))      

        # if len(pair) == 3:
        if pair[2]=="0":
            # path0 = add_extension(os.path.join(lfw_dir, pair[0], pair[0] + '_' + '%04d' % int(pair[1])))
            # path1 = add_extension(os.path.join(lfw_dir, pair[0], pair[0] + '_' + '%04d' % int(pair[2])))
            issame = True
        # elif len(pair) == 4:
        elif pair[2]=="1":
            # path0 = add_extension(os.path.join(lfw_dir, pair[0], pair[0] + '_' + '%04d' % int(pair[1])))
            # path1 = add_extension(os.path.join(lfw_dir, pair[2], pair[2] + '_' + '%04d' % int(pair[3])))
            issame = False
        # print(issame)
        if os.path.exists(path0) and os.path.exists(path1):    # 仅在两个路径都存在时添加配对
            # epath_list += (path0,path1)
            path_list.append(path0)
            path_list.append(path1)
            issame_list.append(issame)
        else:
            nrof_skipped_pairs += 1
    if nrof_skipped_pairs>0:
        print('跳过 %d 个图像对' % nrof_skipped_pairs)

    return path_list, issame_list

def read_pairs(pairs_filename):
    pairs = []
    with open(pairs_filename, 'r') as f:
        for line in f.readlines()[1:]:
            pair = line.strip().split()
            pairs.append(pair)
    return np.array(pairs, dtype=object)

In [3]:
pairs = read_pairs(pairs_path)
path_list, issame_list = get_paths(data_dir+'_cropped', pairs)
print(f"len of path_list={len(path_list)}")
print(f"len of issame_list={len(issame_list)}")
print(len(list(embeddings_dict.keys())))
print(len(path_list))

# embeddings_dict是裁剪图像 到 嵌入式向量 的映射
# path是pair_txt中出现的裁剪图像中的图片
embeddings = np.array([embeddings_dict[path] for path in path_list if path in embeddings_dict.keys() ])
missed=[path for path in path_list if path not in embeddings_dict.keys()]
print(missed)               # the last two is missed
img_path_list=list(embeddings_dict.keys())  
print("../data/lfw/lfw_cropped/Yasser_Arafat/Yasser_Arafat_0003.jpg" in img_path_list)
print("../data/lfw/lfw_cropped/Yasser_Arafat/Yasser_Arafat_0004.jpg" in img_path_list)
print("../data/lfw/lfw_cropped/Yasser_Arafat/Yasser_Arafat_0003.jpg" in path_list)
print("../data/lfw/lfw_cropped/Yasser_Arafat/Yasser_Arafat_0004.jpg" in path_list)
print("../data/lfw/lfw_cropped/Yasser_Arafat/Yasser_Arafat_0005.jpg" in path_list)
print(len(embeddings))

tpr, fpr, accuracy, val, val_std, far, fp, fn = evaluate(embeddings, issame_list)

跳过 5 个图像对
len of path_list=11988
len of issame_list=5994
13231
11988
[]
True
False
True
False
False
11988


ZeroDivisionError: float division by zero

In [None]:
print(accuracy)
np.mean(accuracy)


[0.995      0.995      0.99166667 0.99       0.99       0.99666667
 0.99       0.995      0.99666667 0.99666667]


0.9936666666666666