# Train

In [11]:
import os
import os.path as osp

import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

from model import FaceMobileNet,myFaceMobileNet
from model.metric import ArcFace, CosFace
from model.loss import FocalLoss
from dataset import load_data
from config import Config as conf

import numpy as np
from PIL import Image
from ptflops import get_model_complexity_info

In [12]:
os.chdir("D:\Course\Graduate1\Data_Science\hw\hw5")

In [13]:
# Data Setup
dataloader, class_num = load_data(conf, training=True)
embedding_size = conf.embedding_size
device = conf.device

In [14]:
# Network Setup
net = FaceMobileNet(embedding_size).to(device)

if conf.metric == 'arcface':
    metric = ArcFace(embedding_size, class_num).to(device)
else:
    metric = CosFace(embedding_size, class_num).to(device)

net = nn.DataParallel(net)
metric = nn.DataParallel(metric)

In [15]:
# Training Setup
if conf.loss == 'focal_loss':
    criterion = FocalLoss(gamma=2)
else:
    criterion = nn.CrossEntropyLoss()

if conf.optimizer == 'sgd':
    optimizer = optim.SGD([{'params': net.parameters()}, {'params': metric.parameters()}], 
                            lr=conf.lr, weight_decay=conf.weight_decay)
else:
    optimizer = optim.Adam([{'params': net.parameters()}, {'params': metric.parameters()}],
                            lr=conf.lr, weight_decay=conf.weight_decay)

scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=conf.lr_step, gamma=0.1)

In [16]:
# Checkpoints Setup
os.makedirs(conf.checkpoints, exist_ok=True)

In [7]:
# Start training
net.train()

#Test at the same time
images = unique_image(conf.test_list)
images = [osp.join(conf.test_root, img) for img in images]
groups = group_image(images, conf.test_batch_size)
feature_dict = dict()

for e in range(conf.epoch):
    #if e<=20:
    #    continue
    for data, labels in tqdm(dataloader, desc=f"Epoch {e}/{conf.epoch}",
                             ascii=True, total=len(dataloader)):
        data = data.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        embeddings = net(data)
        thetas = metric(embeddings, labels)
        loss = criterion(thetas, labels)
        loss.backward()
        optimizer.step()

    print(f"Epoch {e}/{conf.epoch}, Loss: {loss}")
    # Test
    net.eval()
    feature_dict = dict()
    for group in groups:
        d = featurize(group, conf.test_transform, net, conf.device,low_light = False)
        feature_dict.update(d) 
    accuracy, threshold = compute_accuracy(feature_dict, conf.test_list, conf.test_root) 

    print(
        f"Test Model: FMN_{e}_sample{int(conf.train_sample_rate*100)}%.pth\n",
        f"Accuracy: {accuracy:.3f}\n"
        f"Threshold: {threshold:.3f}\n"
    )
    net.train()
    backbone_path = osp.join(conf.checkpoints, f"myFMN_{e}_sample{int(conf.train_sample_rate*100)}%.pth")
    torch.save(net.state_dict(), backbone_path)
    scheduler.step()

Epoch 0/30:  14%|#3        | 977/7119 [02:44<17:15,  5.93it/s]  


KeyboardInterrupt: 

# Test

In [17]:
def unique_image(pair_list) -> set:
    """Return unique image path in pair_list.txt"""
    with open(pair_list, 'r') as fd:
        pairs = fd.readlines()
    unique = set()
    for pair in pairs:
        id1, id2, _ = pair.split()
        unique.add(id1)
        unique.add(id2)
    return unique

In [18]:
def group_image(images: set, batch) -> list:
    """Group image paths by batch size"""
    images = list(images)
    size = len(images)
    res = []
    for i in range(0, size, batch):
        end = min(batch + i, size)
        res.append(images[i : end])
    return res

In [19]:
def _preprocess(images: list, transform, low_light = False) -> torch.Tensor:
    res = []
    for img in images:
        im = Image.open(img)
        if low_light:
            im = (np.sqrt (im)*2).astype(np.uint8)
            im = Image.fromarray(im)
        im = transform(im)
        res.append(im)
    data = torch.cat(res, dim=0)  # shape: (batch, 128, 128)
    data = data[:, None, :, :]    # shape: (batch, 1, 128, 128)
    return data

In [20]:
def featurize(images: list, transform, net, device,low_light = False) -> dict:
    """featurize each image and save into a dictionary
    Args:
        images: image paths
        transform: test transform
        net: pretrained model
        device: cpu or cuda
    Returns:
        Dict (key: imagePath, value: feature)
    """
    data = _preprocess(images, transform,low_light = low_light)
    data = data.to(device)
    net = net.to(device)
    with torch.no_grad():
        features = net(data) 
    res = {img: feature for (img, feature) in zip(images, features)}
    return res

In [21]:
def cosin_metric(x1, x2):
    return np.dot(x1, x2) / (np.linalg.norm(x1) * np.linalg.norm(x2))

In [22]:
def threshold_search(y_score, y_true):
    y_score = np.asarray(y_score)
    y_true = np.asarray(y_true)
    best_acc = 0
    best_th = 0
    for i in range(len(y_score)):
        th = y_score[i]
        y_test = (y_score >= th)
        acc = np.mean((y_test == y_true).astype(int))
        if acc > best_acc:
            best_acc = acc
            best_th = th
    return best_acc, best_th

In [23]:
def compute_accuracy(feature_dict, pair_list, test_root):
    with open(pair_list, 'r') as f:
        pairs = f.readlines()

    similarities = []
    labels = []
    for pair in pairs:
        img1, img2, label = pair.split()
        img1 = osp.join(test_root, img1)
        img2 = osp.join(test_root, img2)
        feature1 = feature_dict[img1].cpu().numpy()
        feature2 = feature_dict[img2].cpu().numpy()
        label = int(label)

        similarity = cosin_metric(feature1, feature2)
        similarities.append(similarity)
        labels.append(label)

    accuracy, threshold = threshold_search(similarities, labels)
    return accuracy, threshold

In [9]:
test_model = "checkpoints/26.pth" #conf.test_model
model = FaceMobileNet(conf.embedding_size)
model = nn.DataParallel(model)
model.load_state_dict(torch.load(test_model, map_location=conf.device))
model.eval()

images = unique_image(conf.test_list)
images = [osp.join(conf.test_root, img) for img in images]
groups = group_image(images, conf.test_batch_size)

feature_dict = dict()
for group in groups:
    d = featurize(group, conf.test_transform, model, conf.device,low_light = False)
    feature_dict.update(d) 
accuracy, threshold = compute_accuracy(feature_dict, conf.test_list, conf.test_root) 

print(
    f"Test Model: {test_model}\n"
    f"Accuracy: {accuracy:.3f}\n"
    f"Threshold: {threshold:.3f}\n"
)

Test Model: checkpoints/26.pth
Accuracy: 0.945
Threshold: 0.308



In [10]:
for i in range(30):
    test_model = "checkpoints/"+str(i)+".pth" #conf.test_model
    model = FaceMobileNet(conf.embedding_size)
    model = nn.DataParallel(model)
    model.load_state_dict(torch.load(test_model, map_location=conf.device))
    model.eval()

    images = unique_image(conf.test_list)
    images = [osp.join(conf.test_root, img) for img in images]
    groups = group_image(images, conf.test_batch_size)

    feature_dict = dict()
    for group in groups:
        d = featurize(group, conf.test_transform, model, conf.device,low_light = False)
        feature_dict.update(d) 
    accuracy, threshold = compute_accuracy(feature_dict, conf.test_list, conf.test_root) 

    print(
        f"Test Model: {test_model}\n"
        f"Accuracy: {accuracy:.3f}\n"
        f"Threshold: {threshold:.3f}\n"
    )

Test Model: checkpoints/0.pth
Accuracy: 0.846
Threshold: 0.445

Test Model: checkpoints/1.pth
Accuracy: 0.883
Threshold: 0.422

Test Model: checkpoints/2.pth
Accuracy: 0.886
Threshold: 0.372

Test Model: checkpoints/3.pth
Accuracy: 0.889
Threshold: 0.393

Test Model: checkpoints/4.pth
Accuracy: 0.906
Threshold: 0.341

Test Model: checkpoints/5.pth
Accuracy: 0.904
Threshold: 0.396

Test Model: checkpoints/6.pth
Accuracy: 0.904
Threshold: 0.347

Test Model: checkpoints/7.pth
Accuracy: 0.876
Threshold: 0.453

Test Model: checkpoints/8.pth
Accuracy: 0.912
Threshold: 0.373

Test Model: checkpoints/9.pth
Accuracy: 0.918
Threshold: 0.377

Test Model: checkpoints/10.pth
Accuracy: 0.940
Threshold: 0.308

Test Model: checkpoints/11.pth
Accuracy: 0.939
Threshold: 0.290

Test Model: checkpoints/12.pth
Accuracy: 0.939
Threshold: 0.320

Test Model: checkpoints/13.pth
Accuracy: 0.939
Threshold: 0.291

Test Model: checkpoints/14.pth
Accuracy: 0.939
Threshold: 0.303

Test Model: checkpoints/15.pth
Accu

# Get #params and FLOPs

In [24]:
with torch.cuda.device(0):
    net = model
    macs, params = get_model_complexity_info(net, (1,128,128), as_strings=False,
                                           print_per_layer_stat=False, verbose=True)
    print('{:<30}  {:<8}'.format('Computational complexity: ', macs*2))
    print('{:<30}  {:<8}'.format('Number of parameters: ', params))

Computational complexity:       474061824.0
Number of parameters:           1096896 


In [2]:
from model import myFaceMobileNet

In [3]:
model = myFaceMobileNet(conf.embedding_size)
model = nn.DataParallel(model)
#model.load_state_dict(torch.load(test_model, map_location=conf.device))
#model.eval()
