# pip

In [2]:
# !pip install albumentations
# !pip install torchsummary
# !pip install matplotlib

# import

In [6]:
import os
import cv2
import glob
import time
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils import data
from torch.nn import Parameter
import torch.nn.functional as F
from torch.nn import DataParallel
from torch.optim.lr_scheduler import StepLR
import torchvision
from torchvision import datasets, transforms
from PIL import Image
import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch import ToTensorV2
from retinaface import RetinaFace

# Settings

In [3]:
# Seed every random number generator used by the notebook so runs are repeatable.
seed = 99
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # also covers the additional GPUs of a multi-GPU run
torch.backends.cudnn.deterministic = True
# cuDNN autotuning may select a different kernel from run to run, which defeats
# the deterministic flag above, so it is switched off.
torch.backends.cudnn.benchmark = False

# Pick the compute device once; later cells read the global `device`.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
print("set vars and device done")
print(use_cuda)

In [3]:
class Config(object):
    """Static configuration shared by the training and evaluation cells.

    Every setting is a class attribute, so ``Config()`` simply exposes the
    values below as ``opt.<name>``.
    """

    env = 'default'
    backbone = 'backbone'  # file-name prefix for checkpoints written by the training loop
    classify = 'softmax'
    num_classes = 400  # number of identities; output size of the margin head
    metric = 'arc_margin'  # selects the margin head: 'add_margin', 'arc_margin', 'sphere', else nn.Linear
    easy_margin = False  # forwarded to ArcMarginProduct
    use_se = False
    loss = 'arcface_loss'  # selects the criterion in the training cell

    display = False
    finetune = False # True

    train_root = 'data/face_train'  # globbed as <train_root>/*/*.jpg
    valid_root = 'data/face_valid'  # globbed as <valid_root>/**/*.jpg
    valid_list = 'data/pair.txt'  # pair file read by the evaluation helpers

    checkpoints_path = 'checkpoints'  # directory the training loop saves into
    load_model_path = 'models/pretrain_model.pt'  # checkpoint loaded before training
    test_model_path = 'set_your_pt.pt'  # checkpoint loaded for evaluation
    save_interval = 10  # save a checkpoint every N epochs

    train_batch_size = 32
    test_batch_size = 10

    input_shape = (3, 112, 112)

    optimizer = 'adam' #'sgd'

    use_gpu = True
    gpu_id = '0, 1, 2, 3'
    num_workers = 4
    print_freq = 100  # log every N training iterations
    max_epoch = 100
    lr = 1e-1 # 1e-4
    lr_step = 50  # StepLR step size, in epochs
    lr_decay = 0.95
    weight_decay = 5e-4

In [None]:
# Global configuration instance read by the data, training and evaluation cells.
opt = Config()

# DataLoader

In [8]:
# Collect the image files, sorted so the ordering is stable between runs.
# Without recursive=True, `**` in a glob pattern matches like a single `*`,
# so both patterns look exactly one directory level below the root.
train_image_path = sorted(glob.glob(f'{opt.train_root}/*/*.jpg'))
valid_image_path = sorted(glob.glob(f'{opt.valid_root}/**/*.jpg'))

# Preview the first training image. OpenCV decodes to BGR channel order while
# matplotlib expects RGB, so convert for display only; `image` itself stays BGR.
image = cv2.imread(train_image_path[0], cv2.IMREAD_COLOR)
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

In [89]:
class DataLoader():
    """Map-style dataset that yields ``(image, label)`` pairs from image paths.

    The label is the integer name of the directory that directly contains the
    image, i.e. paths are expected to look like ``<root>/<person id>/<file>.jpg``.

    Args:
        image_path: sequence of image file paths.
        augmentation: optional callable invoked as ``augmentation(image=...)``
            that returns a dict holding the transformed image under ``'image'``.
    """

    def __init__(self, image_path, augmentation=None):
        self.image_path = image_path
        self.augmentation = augmentation

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_path)

    @staticmethod
    def _label_from_path(path):
        """Return the integer identity encoded by the image's parent directory name."""
        # Using the parent directory instead of a fixed split('/') index keeps
        # the label correct regardless of how deep the dataset root is.
        return int(os.path.basename(os.path.dirname(path)))

    def __getitem__(self, idx):
        """Load, optionally augment, and return the ``(image, label)`` pair at ``idx``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        path = self.image_path[idx]
        label = self._label_from_path(path)  # set a personal id

        image = cv2.imread(path, cv2.IMREAD_COLOR)
        if image is None:
            # cv2.imread reports a missing or corrupt file by returning None.
            raise FileNotFoundError('could not read image: {}'.format(path))

        if self.augmentation:
            image = self.augmentation(image=image)['image']

        return image, label

In [90]:
# Training pipeline: random horizontal flip, resize to 112x112, then conversion
# to a torch tensor. The commented-out entries are augmentations that are
# currently disabled.
# NOTE(review): Normalize is disabled here, while load_image() rescales pixels
# with (x - 127.5) / 127.5 at test time; confirm the mismatch is intended.
train_transform = A.Compose([
                 A.HorizontalFlip(p=0.7),
                 A.Resize(112,112),
                 #A.OneOf([
                  #   A.HorizontalFlip(p=0.7),
                   #  A.ShiftScaleRotate(p=0.5)
                 #], p=1),
                 #A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
                 #A.Normalize((0.643025, 0.49486318, 0.424703), (0.20040053, 0.17854902, 0.16714329)),
                 ToTensorV2()
])

# Validation pipeline: resize and tensor conversion only, no random augmentation.
valid_transform = A.Compose([
                 A.Resize(112,112),
                #A.Normalize((0.643025, 0.49486318, 0.424703), (0.20040053, 0.17854902, 0.16714329)),
                ToTensorV2() 
])  

In [91]:
# Wrap the path lists in datasets, pairing each split with its own transform.
train_dataset = DataLoader(image_path=train_image_path, augmentation=train_transform)
valid_dataset = DataLoader(image_path=valid_image_path, augmentation=valid_transform)

# Batch the datasets: training batches are shuffled and the incomplete last
# batch is dropped; validation keeps the original order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=opt.train_batch_size,
    shuffle=True,
    drop_last=True,
)
valid_loader = torch.utils.data.DataLoader(
    dataset=valid_dataset,
    batch_size=opt.test_batch_size,
    shuffle=False,
)

# Loss function

In [93]:
class FocalLoss(nn.Module):
    """Focal loss built on top of softmax cross-entropy.

    Each sample's cross-entropy is scaled by ``(1 - p) ** gamma``, where ``p``
    is the predicted probability of the target class, and the weighted losses
    are averaged over the batch.

    Args:
        gamma: focusing exponent; ``0`` reduces to plain mean cross-entropy.
        eps: kept for interface compatibility (stored, not used in forward).
    """

    def __init__(self, gamma=0, eps=1e-7):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.eps = eps
        # Per-sample losses are required: with the default mean reduction the
        # focal weight would be computed once from the batch average instead
        # of separately for every sample.
        self.ce = torch.nn.CrossEntropyLoss(reduction='none')

    def forward(self, input, target):
        """Return the mean focal loss for logits ``input`` and class indices ``target``."""
        logp = self.ce(input, target)  # per-sample -log(p_target)
        p = torch.exp(-logp)
        loss = (1 - p) ** self.gamma * logp
        return loss.mean()
    
class ArcFaceLoss(nn.Module):
    """ArcFace classification loss: an ArcMarginProduct head followed by cross-entropy.

    Args:
        in_features: size of the input embedding.
        out_features: number of classes.
        s: scale forwarded to ArcMarginProduct.
        m: angular margin forwarded to ArcMarginProduct.
    """

    def __init__(self, in_features, out_features, s=30.0, m=0.3): # s=30, m=0.5
        super().__init__()
        self.margin = ArcMarginProduct(in_features, out_features, s=s, m=m)
        self.cross_entropy = nn.CrossEntropyLoss()

    def forward(self, x, labels):
        """Return the cross-entropy of the scaled margin logits for embeddings ``x``."""
        # ArcMarginProduct already returns s * (margin-adjusted cosine). The
        # scale must stay in place: dividing it out again would feed values
        # limited to [-1, 1] into the softmax and cancel the `s` argument.
        logits = self.margin(x, labels)
        return self.cross_entropy(logits, labels)
    
class ArcFocalLoss(nn.Module):
    """Product of a scaled-softmax loss and a focal term, averaged over the batch.

    Args:
        gamma: focusing exponent of the focal term.
        eps: small constant added inside the logarithm of the focal term.
        s: factor the logits are multiplied by before the log-softmax.
    """

    def __init__(self, gamma=2.0, eps=1e-7, s=30.0):
        super(ArcFocalLoss, self).__init__()
        self.gamma = gamma
        self.eps = eps
        self.s = s

    def forward(self, logits, labels):
        """Return the mean of ``arc_loss * focal_loss`` over the batch.

        ``logits`` are treated as cosine similarities (one row per sample) and
        ``labels`` as integer class indices.
        """
        one_hot_labels = F.one_hot(labels, num_classes=logits.size(-1))

        # Scaled-softmax term: negative log-probability of the target class.
        log_probs = F.log_softmax(logits * self.s, dim=-1)
        arc_loss = (-one_hot_labels * log_probs).sum(dim=-1)

        # Focal term, computed from the unscaled logits: pt is the probability
        # assigned to the correct decision for each class (p for the target
        # class, 1 - p for every other class).
        probs = F.softmax(logits, dim=-1)
        pt = probs * one_hot_labels + (1 - probs) * (1 - one_hot_labels)
        w = (1 - pt).pow(self.gamma)
        focal_loss = (-w * torch.log(pt + self.eps)).sum(dim=-1)

        return (arc_loss * focal_loss).mean()

# Metrics

In [94]:
class ArcMarginProduct(nn.Module):
    r"""Implement of large margin arc distance: :
        Args:
            in_features: size of each input sample
            out_features: size of each output sample
            s: norm of input feature
            m: margin
            easy_margin: if True, apply the margin only where cos(theta) > 0
            cos(theta + m)
        """
    def __init__(self, in_features, out_features, s=30.0, m=0.3, easy_margin=False):
        super(ArcMarginProduct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        # One learnable class centre per row: (out_features, in_features).
        self.weight = Parameter(torch.empty(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        # Beyond theta = pi - m, cos(theta + m) stops decreasing, so a linear
        # penalty (cosine - mm) is used there instead.
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, input, label):
        """Return scaled logits with the angular margin applied to the target class.

        Args:
            input: embeddings, one row per sample with ``in_features`` columns.
            label: integer class index per sample.

        Returns:
            Tensor with one row per sample and ``out_features`` columns.
        """
        # --------------------------- cos(theta) & phi(theta) ---------------------------
        # The learned weight is used as-is (it must not be re-created per call,
        # otherwise nothing is ever learned). `.to` is a no-op when the weight
        # already lives on the input's device and stays differentiable otherwise.
        weight = self.weight.to(input.device)
        cosine = F.linear(F.normalize(input), F.normalize(weight))
        sine = torch.sqrt((1.0 - torch.pow(cosine, 2)).clamp(0, 1))
        phi = cosine * self.cos_m - sine * self.sin_m  # cos(theta + m)
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)
        # --------------------------- convert label to one-hot ---------------------------
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, label.to(cosine.device).view(-1, 1).long(), 1)
        # Target class gets phi, every other class keeps the plain cosine.
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)

        return output * self.s


class AddMarginProduct(nn.Module):
    r"""Implement of large margin cosine distance: :
    Args:
        in_features: size of each input sample
        out_features: size of each output sample
        s: norm of input feature
        m: margin
        cos(theta) - m
    """
    def __init__(self, in_features, out_features, s=30.0, m=0.40):
        super(AddMarginProduct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        # One learnable class centre per row: (out_features, in_features).
        self.weight = Parameter(torch.empty(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

    def forward(self, input, label):
        """Return scaled logits with the additive cosine margin on the target class.

        Args:
            input: embeddings, one row per sample with ``in_features`` columns.
            label: integer class index per sample.

        Returns:
            Tensor with one row per sample and ``out_features`` columns.
        """
        # --------------------------- cos(theta) & phi(theta) ---------------------------
        # The learned weight is used as-is (it must not be re-created per call,
        # otherwise nothing is ever learned). `.to` is a no-op when the weight
        # already lives on the input's device and stays differentiable otherwise.
        weight = self.weight.to(input.device)
        cosine = F.linear(F.normalize(input), F.normalize(weight))
        phi = cosine - self.m
        # --------------------------- convert label to one-hot ---------------------------
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, label.to(cosine.device).view(-1, 1).long(), 1)
        # Target class gets phi, every other class keeps the plain cosine.
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)

        return output * self.s

    def __repr__(self):
        return self.__class__.__name__ + '(' \
               + 'in_features=' + str(self.in_features) \
               + ', out_features=' + str(self.out_features) \
               + ', s=' + str(self.s) \
               + ', m=' + str(self.m) + ')'


class SphereProduct(nn.Module):
    r"""Implement of large margin cosine distance: :
    Args:
        in_features: size of each input sample
        out_features: size of each output sample
        m: margin (integer 0..5, index into the multiple-angle formulas)
        cos(m*theta)
    """
    def __init__(self, in_features, out_features, m=4):
        super(SphereProduct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.m = m
        # Annealing schedule for lambda (see forward).
        self.base = 1000.0
        self.gamma = 0.12
        self.power = 1
        self.LambdaMin = 5.0
        self.iter = 0
        self.weight = Parameter(torch.empty(out_features, in_features))
        # xavier_uniform_ is the in-place initialiser; the name without the
        # trailing underscore is deprecated.
        nn.init.xavier_uniform_(self.weight)

        # duplication formula: cos(k * theta) as a polynomial in cos(theta)
        self.mlambda = [
            lambda x: x ** 0,
            lambda x: x ** 1,
            lambda x: 2 * x ** 2 - 1,
            lambda x: 4 * x ** 3 - 3 * x,
            lambda x: 8 * x ** 4 - 8 * x ** 2 + 1,
            lambda x: 16 * x ** 5 - 20 * x ** 3 + 5 * x
        ]

    def forward(self, input, label):
        """Return the margin-adjusted logits, scaled by each feature's L2 norm.

        Every call increments ``self.iter`` and updates ``self.lamb``, so the
        margin is blended in more strongly as more calls are made.

        Args:
            input: embeddings, one row per sample with ``in_features`` columns.
            label: integer class index per sample.
        """
        # lambda = max(lambda_min,base*(1+gamma*iteration)^(-power))
        self.iter += 1
        self.lamb = max(self.LambdaMin, self.base * (1 + self.gamma * self.iter) ** (-1 * self.power))

        # --------------------------- cos(theta) & phi(theta) ---------------------------
        cos_theta = F.linear(F.normalize(input), F.normalize(self.weight))
        cos_theta = cos_theta.clamp(-1, 1)
        cos_m_theta = self.mlambda[self.m](cos_theta)
        # k selects the monotonic segment of cos(m * theta); it is computed on
        # detached values so no gradient flows through acos/floor.
        theta = cos_theta.detach().acos()
        k = (self.m * theta / math.pi).floor()
        phi_theta = ((-1.0) ** k) * cos_m_theta - 2 * k
        NormOfFeature = torch.norm(input, 2, 1)

        # --------------------------- convert label to one-hot ---------------------------
        # zeros_like places the mask on the logits' device; scatter_ needs int64 indices.
        one_hot = torch.zeros_like(cos_theta)
        one_hot.scatter_(1, label.to(cos_theta.device).view(-1, 1).long(), 1)

        # --------------------------- Calculate output ---------------------------
        output = (one_hot * (phi_theta - cos_theta) / (1 + self.lamb)) + cos_theta
        output = output * NormOfFeature.view(-1, 1)

        return output

    def __repr__(self):
        return self.__class__.__name__ + '(' \
               + 'in_features=' + str(self.in_features) \
               + ', out_features=' + str(self.out_features) \
               + ', m=' + str(self.m) + ')'

# Train

In [6]:
# import torch, gc
# gc.collect()
# torch.cuda.empty_cache()

def save_model(model, save_path, name, iter_cnt):
    """Save ``model.state_dict()`` to ``<save_path>/<name>_<iter_cnt>.pt``.

    The target directory is created when it does not exist yet.

    Args:
        model: module whose state dict is written.
        save_path: directory for the checkpoint file.
        name: file-name prefix.
        iter_cnt: counter appended to the file name.

    Returns:
        The path of the file that was written.
    """
    # torch.save does not create missing directories, so make sure it exists.
    os.makedirs(save_path, exist_ok=True)
    save_name = os.path.join(save_path, name + '_' + str(iter_cnt) + '.pt')
    # NOTE(review): only the state dict is stored, whereas the loading cells
    # read ckpt['backbone']; confirm which checkpoint format is intended.
    torch.save(model.state_dict(), save_name)
    return save_name

if __name__ == '__main__':

    # Set a loss function.
    if opt.loss == 'focal_loss':
        criterion = FocalLoss(gamma=2)
    else:
        # 'arcface_loss' lands here as well: the margin is already applied by
        # metric_fc below, so plain cross-entropy on its output is the ArcFace
        # loss. Wrapping the logits in ArcFaceLoss would apply a second margin
        # head whose weights are not even part of the optimizer.
        criterion = torch.nn.CrossEntropyLoss()

    # Set a metric (the classification head on top of the 512-d embedding).
    if opt.metric == 'add_margin':
        metric_fc = AddMarginProduct(512, opt.num_classes, s=30, m=0.35)
    elif opt.metric == 'arc_margin':
        metric_fc = ArcMarginProduct(512, opt.num_classes, s=30, m=0.5, easy_margin=opt.easy_margin)
    elif opt.metric == 'sphere':
        metric_fc = SphereProduct(512, opt.num_classes, m=4)
    else:
        metric_fc = nn.Linear(512, opt.num_classes)
    # The head has trainable weights and must live on the same device as the backbone.
    metric_fc = metric_fc.to(device)

    # Use a pretrain model's weight
    ckpt = torch.load(opt.load_model_path, map_location=device)  # load checkpoints
    model = ckpt['backbone'].to(device)

    # Set an optimizer over both the backbone and the head.
    param_groups = [{'params': model.parameters()}, {'params': metric_fc.parameters()}]
    if opt.optimizer == 'sgd':
        optimizer = torch.optim.SGD(param_groups, lr=opt.lr, weight_decay=opt.weight_decay)
    else:
        optimizer = torch.optim.Adam(param_groups, lr=opt.lr, weight_decay=opt.weight_decay)
    scheduler = StepLR(optimizer, step_size=opt.lr_step, gamma=0.1)

    # Checkpoint settings; opt.backbone is optional, so fall back to a fixed prefix.
    checkpoint_name = getattr(opt, 'backbone', 'backbone')
    os.makedirs(opt.checkpoints_path, exist_ok=True)

    # Start training
    print('{} train iters per epoch:'.format(len(train_loader)))

    start = time.time()
    for i in range(opt.max_epoch):
        model.train()
        metric_fc.train()
        for ii, (images, label) in enumerate(train_loader):
            images = images.type(torch.float32).to(device)
            label = torch.as_tensor(label).long().to(device)

            # Get a feature embedding
            feature = model(images)

            # Compute a similarity
            output = metric_fc(feature, label)

            loss = criterion(output, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            iters = i * len(train_loader) + ii

            if iters % opt.print_freq == 0:
                predicted = np.argmax(output.detach().cpu().numpy(), axis=1)
                target = label.detach().cpu().numpy()

                acc = np.mean((predicted == target).astype(int))
                speed = opt.print_freq / (time.time() - start)
                time_str = time.asctime(time.localtime(time.time()))
                # Placeholders are ordered to match the arguments: timestamp,
                # epoch index, iteration within the epoch, speed, loss, accuracy.
                print('{} | epoch {} iter {} {} iters/s | loss {} | acc {}'.format(time_str, i, ii, speed, loss.item(), acc))

                start = time.time()

        scheduler.step()

        # range() stops at max_epoch - 1, so that is the index of the last epoch.
        if i % opt.save_interval == 0 or i == opt.max_epoch - 1:
            save_model(model, opt.checkpoints_path, checkpoint_name, i)

        model.eval()

# Test

In [7]:
# Return the unique image paths listed in the pair file, in first-seen order
def get_face_path(pair_list):
    """Collect every distinct image path from the first two columns of a pair file.

    Args:
        pair_list: path of a text file whose lines hold two image paths
            (followed by further columns that are ignored here).

    Returns:
        List of unique paths in order of first appearance.
    """
    # Dict keys give O(1) duplicate checks while preserving insertion order.
    seen = {}
    with open(pair_list, 'r') as fd:
        for pair in fd:
            splits = pair.split()
            if not splits:
                continue  # tolerate blank lines
            seen.setdefault(splits[0])
            seen.setdefault(splits[1])

    return list(seen)

# Return a list that stores only the unique identifiers of the comparison subjects
def get_face_list(pair_list):
    """Collect the distinct identifiers found in the first two columns of a pair file.

    The identifier of a path is its fourth ``/``-separated component
    (``path.split('/')[3]``).

    Args:
        pair_list: path of a text file whose lines hold two image paths
            (followed by further columns that are ignored here).

    Returns:
        List of unique identifiers in order of first appearance.
    """
    # Dict keys give O(1) duplicate checks while preserving insertion order.
    seen = {}
    with open(pair_list, 'r') as fd:
        for pair in fd:
            splits = pair.split()
            if not splits:
                continue  # tolerate blank lines
            seen.setdefault(splits[0].split('/')[3])
            seen.setdefault(splits[1].split('/')[3])

    return list(seen)

def load_image(img_path):
    """Read an image file and return it as a normalised float32 batch of one.

    The image is resized to 112x112, reordered from HWC to CHW, given a leading
    batch axis and rescaled with ``(x - 127.5) / 127.5``.

    Returns:
        The preprocessed array, or ``None`` when the file cannot be read.
    """
    raw = cv2.imread(img_path)
    if raw is None:
        return None

    # Adjust the input size for the model, then move channels first.
    chw = cv2.resize(raw, (112, 112)).transpose((2, 0, 1))
    batch = chw[np.newaxis, :, :, :].astype(np.float32, copy=False)
    batch -= 127.5
    batch /= 127.5

    return batch

def get_features(model, img_paths, batch_size=10):
    """Run ``model`` over the images and return one feature row per path.

    Args:
        model: network mapping an image batch to a 2-D feature tensor.
        img_paths: sequence of image file paths, read with ``load_image``.
        batch_size: number of images sent through the model at once.

    Returns:
        ``(features, cnt)``: ``features`` stacks the model outputs in the same
        order as ``img_paths`` (``None`` when ``img_paths`` is empty) and
        ``cnt`` is the number of forward passes performed.

    Raises:
        IOError: if an image cannot be read.
    """
    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(iter(model.parameters()), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    feature_chunks = []
    pending = []
    cnt = 0
    last_index = len(img_paths) - 1
    for i, path in enumerate(img_paths):
        image = load_image(path)
        if image is None:
            # Skipping the image would silently misalign features and paths.
            raise IOError('read {} error'.format(path))
        pending.append(image)

        if len(pending) == batch_size or i == last_index:
            cnt += 1

            data = torch.from_numpy(np.concatenate(pending, axis=0)).to(target_device)
            with torch.no_grad():  # inference only, no autograd graph needed
                output = model(data)
            # Keep exactly one row per image: callers index the result by
            # image position, so rows must not be paired up or merged.
            feature_chunks.append(output.cpu().numpy())

            pending = []

    features = np.vstack(feature_chunks) if feature_chunks else None
    return features, cnt

# Return a dictionary in the format {individual unique identifier: individual embedding vector}
def get_feature_dict(test_list, features):
    """Map each identifier in ``test_list`` to the entry of ``features`` at the same index."""
    return {name: features[position] for position, name in enumerate(test_list)}

# Measure similarity between two images and return accuracy
def test_performance(fe_dict, pair_list, threshold=0.89):
    """Score every pair in ``pair_list`` by cosine similarity.

    Prints the accuracy obtained with the fixed ``threshold`` and returns the
    best accuracy over all candidate thresholds (see ``cal_accuracy``).

    Args:
        fe_dict: mapping from identifier (``path.split('/')[3]``) to feature vector.
        pair_list: path of a text file with lines ``<path1> <path2> <label>``,
            where a non-zero label marks a same-person pair.
        threshold: similarity above which a pair is predicted as the same person.

    Returns:
        Best accuracy found by ``cal_accuracy``.
    """
    with open(pair_list, 'r') as fd:
        pairs = fd.readlines()

    sims = []
    labels = []

    right = 0
    wrong = 0

    for pair in pairs:
        splits = pair.split()
        if not splits:
            continue  # tolerate blank lines
        feature1 = fe_dict[splits[0].split('/')[3]]
        feature2 = fe_dict[splits[1].split('/')[3]]
        label = int(splits[2])

        sim = cosin_similarity(feature1, feature2)

        # Every pair is counted exactly once; in particular a same-person pair
        # that scores at or below the threshold is a miss, not ignored.
        if bool(sim > threshold) == bool(label):
            right += 1
        else:
            wrong += 1

        sims.append(sim)
        labels.append(label)

    acc = cal_accuracy(sims, labels)

    if right + wrong:  # avoid dividing by zero on an empty pair file
        print("valid acc: ",right / (right + wrong) * 100)

    return acc

def cal_accuracy(y_score, y_true):
    y_score = np.asarray(y_score)
    y_true = np.asarray(y_true)
    best_acc = 0
    for i in range(len(y_score)):
        th = y_score[i]
        y_test = (y_score >= th)
        acc = np.mean((y_test == y_true).astype(int))
        if acc > best_acc:
            best_acc = acc

    return best_acc

def cosin_similarity(x1, x2):
    """Return the cosine similarity of vectors ``x1`` and ``x2``."""
    return np.dot(x1, x2) / (np.linalg.norm(x1) * np.linalg.norm(x2))

def test(model, img_paths, identity_list, compair_list, batch_size):
    """Extract features for ``img_paths``, report timing, and return pair accuracy.

    Prints the feature matrix shape plus the total and per-batch extraction
    time, then scores the pairs in ``compair_list`` with ``test_performance``.
    """
    started = time.time()
    features, cnt = get_features(model, img_paths, batch_size=batch_size)
    print(features.shape)
    elapsed = time.time() - started
    print('total time is {}, average time is {}'.format(elapsed, elapsed / cnt))

    # Key the features by identifier before scoring the pairs.
    feature_by_identity = get_feature_dict(identity_list, features)
    return test_performance(feature_by_identity, compair_list)

# Evaluation entry point: load the checkpoint named by opt.test_model_path and
# score it on the pairs listed in opt.valid_list.
if __name__ == '__main__':
    opt = Config()
    
    # The checkpoint is expected to hold the model object under the 'backbone' key.
    ckpt = torch.load(opt.test_model_path, map_location=device)  
    model = ckpt['backbone'].to(device)
    model = DataParallel(model)

    # Identifiers and image paths are both derived from the same pair file.
    identity_list = get_face_list(opt.valid_list) 
    img_paths = get_face_path(opt.valid_list)

    model.eval()
    test(model, img_paths, identity_list, opt.valid_list, opt.test_batch_size)

# Get a person's feature embedding

In [74]:
def get_feature(model, image, batch_size=1):
    """Return the feature vector ``model`` produces for a single image.

    Args:
        model: network mapping an image batch to a 2-D feature tensor.
        image: HWC image array, e.g. as returned by ``cv2.imread``.
        batch_size: unused; kept so existing calls keep working.

    Returns:
        1-D numpy array holding the features of ``image``.
    """
    # Same preprocessing as load_image: resize, HWC -> CHW, add a batch axis,
    # then rescale with (x - 127.5) / 127.5.
    blob = cv2.resize(image, (112, 112)).transpose((2, 0, 1))[np.newaxis, :, :, :]
    blob = blob.astype(np.float32, copy=False)
    blob -= 127.5
    blob /= 127.5

    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(iter(model.parameters()), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    data = torch.from_numpy(blob).to(target_device)
    with torch.no_grad():  # inference only, no autograd graph needed
        feature = model(data)

    return feature.cpu().numpy()[0]

In [75]:
# The checkpoint path lives on the config object; a bare `test_model_path`
# name is not defined anywhere in this notebook.
ckpt = torch.load(opt.test_model_path, map_location=device)
model = ckpt['backbone'].to(device)
model.eval()  # inference mode, matching the evaluation cell

In [78]:
image = cv2.imread("someone.jpg")
if image is None:
    # cv2.imread reports a missing or unreadable file by returning None.
    raise FileNotFoundError("could not read image: someone.jpg")
output = get_feature(model, image, batch_size=1)
# print(output)