Import libraries

In [5]:
import numpy as np
import torch
import torch.nn as nn
from torch.nn import Parameter, DataParallel
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms as T
import os
from PIL import Image
import pandas as pd
import math
from sklearn.model_selection import train_test_split
from tqdm import tqdm

Build the dataset

In [22]:
def get_data_df(identity_root, identity_list):
    """Index every file found under the given identity folders.

    Args:
        identity_root: directory that contains one sub-directory per identity.
        identity_list: names of the identity sub-directories to index.

    Returns:
        A DataFrame with one row per file and three columns:
        'img_path' (identity_root/identity/file), 'identity' (folder name) and
        'identity_code' (int32 categorical code of 'identity').
    """
    img_paths, owners = [], []
    for person in tqdm(identity_list):
        person_dir = os.path.join(identity_root, person)
        file_names = os.listdir(person_dir)
        img_paths.extend(os.path.join(person_dir, name) for name in file_names)
        # every file inside the folder belongs to that folder's identity
        owners.extend([person] * len(file_names))

    frame = pd.DataFrame({'img_path': img_paths, 'identity': owners})
    # map each identity name to a unique integer code, stored as int32
    frame['identity_code'] = pd.Categorical(frame['identity']).codes.astype('int32')
    return frame

In [23]:
# Build the dataframes from the dataset directory.
identity_root = 'lfw_funneled'
# Only sub-directories are identities. os.listdir() returns entries in arbitrary
# order, so sort them: otherwise the seeded split below could differ between
# machines / filesystems even though random_state is fixed.
identity_list = sorted(
    identity
    for identity in os.listdir(identity_root)
    if os.path.isdir(os.path.join(identity_root, identity))
)

# Train / test / val split on identities (not on images), so the three sets
# contain disjoint people: 80% train, then the remaining 20% is halved.
train_identity_list, holdout_identity_list = train_test_split(identity_list, test_size=0.2, random_state=42)
test_identity_list, val_identity_list = train_test_split(holdout_identity_list, test_size=0.5, random_state=42)

# Build one dataframe per split.
train_df = get_data_df(identity_root, train_identity_list)
test_df = get_data_df(identity_root, test_identity_list)
val_df = get_data_df(identity_root, val_identity_list)
print('train: {}, val: {}, test: {}'.format(len(train_df), len(val_df), len(test_df)))

100%|██████████| 4599/4599 [00:00<00:00, 12329.65it/s]
100%|██████████| 575/575 [00:00<00:00, 12688.95it/s]
100%|██████████| 575/575 [00:00<00:00, 12632.85it/s]

train: 10220, val: 1523, test: 1489





In [25]:
class FaceDataset(Dataset):
    """Dataset yielding (image tensor, identity code) pairs.

    Args:
        data_df: DataFrame providing the 'img_path' and 'identity_code' columns.
        input_shape: sequence whose elements from index 1 onward give the crop
            size passed to the crop transform, e.g. (1, 250, 250).
        phase: 'train' applies a random crop plus random horizontal flip;
            any other value applies a center crop only.
    """

    def __init__(self, data_df, input_shape, phase="train"):
        self.path = data_df['img_path'].values
        self.label = data_df['identity_code'].values
        self.phase = phase
        self.input_shape = input_shape

        crop_size = self.input_shape[1:]
        if self.phase == 'train':
            # augmentation is only applied while training
            spatial = [T.RandomCrop(crop_size), T.RandomHorizontalFlip()]
        else:
            spatial = [T.CenterCrop(crop_size)]
        # tensor conversion and single-channel normalisation are shared by both phases
        self.transforms = T.Compose(spatial + [
            T.ToTensor(),
            T.Normalize(mean=[0.5], std=[0.5])
        ])

    def __len__(self):
        """Number of samples."""
        return len(self.label)

    def __getitem__(self, index):
        """Load sample `index` and return (float tensor, label)."""
        path = self.path[index]
        label = self.label[index]
        # Image.open is lazy and keeps the file handle; convert() reads the pixels
        # into a new image, so the handle can be released right after.
        with Image.open(path) as img:
            data = img.convert('L')
        data = self.transforms(data)
        return data.float(), label

In [34]:
# Dataloader over a copy of the validation split, using the 'train' phase transforms.
input_shape = (1, 250, 250)  # elements [1:] are used as the crop size
phase = 'train'
data_df = val_df.copy()

dataset = FaceDataset(data_df, input_shape, phase)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# number of distinct identities = number of classes for the margin head
num_classes = data_df['identity'].nunique()
print(num_classes)

575


In [36]:
# Pull a single batch to check the tensor shapes coming out of the loader.
first_batch = next(iter(dataloader), None)
if first_batch is not None:
    data, label = first_batch
    print(data.shape)
    print(label.shape)

torch.Size([32, 1, 250, 250])
torch.Size([32])


Build the model

In [37]:
# Fetch ResNet-18 through torch.hub.
# NOTE(review): the hub tag is pinned to v0.6.0 while the call uses the `weights`
# keyword — confirm which torchvision version actually resolves this call.
model_resnet18 = torch.hub.load('pytorch/vision:v0.6.0', 'resnet18', weights=True)

# Swap the first layer for a freshly initialised single-channel convolution
# (the dataset converts every image to mode 'L').
grayscale_stem = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
model_resnet18.conv1 = grayscale_stem

# Swap the last layer so the network outputs a 512-dimensional feature vector.
embedding_head = nn.Linear(512, 512)
model_resnet18.fc = embedding_head

Using cache found in C:\Users\earth/.cache\torch\hub\pytorch_vision_v0.6.0


Loss function

In [38]:
class ArcMarginProduct(nn.Module):
    r"""Large-margin arc distance head: target logits use cos(theta + m).

    Both the input features and the class weights are L2-normalised, so their
    product is cos(theta). For each sample the logit of its labelled class is
    replaced by cos(theta + m); all logits are then multiplied by ``s``.

    Args:
        in_features: size of each input sample
        out_features: size of each output sample (one logit per class)
        s: scale factor applied to the output logits
        m: margin added to the angle of the labelled class
        easy_margin: if True, apply the margin only where cos(theta) > 0 and
            keep cos(theta) elsewhere; otherwise apply it where
            cos(theta) > cos(pi - m) and use cos(theta) - sin(pi - m) * m elsewhere
    """
    def __init__(self, in_features, out_features, s=30.0, m=0.50, easy_margin=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        # torch.empty replaces the legacy torch.FloatTensor constructor; the
        # values are overwritten by the Xavier initialisation right below.
        self.weight = Parameter(torch.empty(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)       # threshold on cos(theta) for applying the margin
        self.mm = math.sin(math.pi - m) * m   # penalty subtracted below the threshold

    def forward(self, input, label):
        """Return the scaled margin logits, shape (len(input), out_features).

        ``label`` holds one class index per sample; it is cast to int64 and
        moved to the device of the logits if necessary.
        """
        # --------------------------- cos(theta) & phi(theta) ---------------------------
        cosine = F.linear(F.normalize(input), F.normalize(self.weight))
        sine = torch.sqrt((1.0 - torch.pow(cosine, 2)).clamp(0, 1))
        # cos(theta + m) = cos(theta)cos(m) - sin(theta)sin(m)
        phi = cosine * self.cos_m - sine * self.sin_m
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)
        # --------------------------- convert label to one-hot ---------------------------
        # Build the mask on the same device/dtype as the logits rather than a
        # hard-coded 'cuda', so the layer also works on CPU and on every
        # replica device when wrapped in DataParallel.
        one_hot = torch.zeros_like(cosine)
        index = label.view(-1, 1).to(device=cosine.device, dtype=torch.long)
        one_hot.scatter_(1, index, 1)
        # labelled class takes phi, every other class keeps cosine
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        output = output * self.s

        return output

Training

In [39]:
# Smoke-test setup for a single training pass (runs on the validation split copy).
data_df = val_df.copy()
num_classes = len(np.unique(data_df['identity'].values))

# DataParallel expects the wrapped module to already live on its first device;
# move the modules explicitly instead of relying on wrapper side effects.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DataParallel(model_resnet18.to(device))
metric_fc = DataParallel(ArcMarginProduct(512, num_classes, s=30, m=0.5, easy_margin=False).to(device))

criterion = nn.CrossEntropyLoss()
# a single optimizer updates both the backbone and the margin head
optimizer = torch.optim.Adam([{'params': model.parameters()}, {'params': metric_fc.parameters()}], lr=0.001)
model = model.train()

In [43]:
for data, label in tqdm(dataloader):
    feature = model(data)
    output = metric_fc(feature, label)
    # put the targets on whatever device the logits ended up on (no hard-coded CUDA)
    target = label.long().to(output.device)
    loss = criterion(output, target)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # batch accuracy from the arg-max of the margin logits; detach() replaces the legacy .data
    output = output.detach().cpu().numpy()
    output = np.argmax(output, axis=1)
    label = label.detach().cpu().numpy()
    acc = np.mean((output == label).astype(int))

100%|██████████| 48/48 [00:18<00:00,  2.53it/s]


In [None]:
# hyper parameters
# NOTE(review): data_df is whatever an earlier cell left behind (a copy of the
# validation split in this notebook) — confirm whether train_df was intended.
data_df = data_df.copy()
num_epochs = 10
batch_size = 32
learning_rate = 0.001
num_classes = len(np.unique(data_df['identity'].values))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# data: rebuild the loader here so that batch_size above is actually used and
# the labels are guaranteed to match num_classes
dataloader = DataLoader(FaceDataset(data_df, input_shape, 'train'), batch_size=batch_size, shuffle=True)

# model, metric_fc, criterion, optimizer
# `model` may already be a DataParallel wrapper from the previous cell; wrapping
# it again would nest DataParallel inside DataParallel.
if not isinstance(model, DataParallel):
    model = DataParallel(model.to(device))
metric_fc = DataParallel(ArcMarginProduct(512, num_classes, s=30, m=0.5, easy_margin=False).to(device))
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam([{'params': model.parameters()}, {'params': metric_fc.parameters()}], lr=learning_rate)

# train
model.train()
for epoch in range(num_epochs):
    for data, label in dataloader:
        feature = model(data)
        output = metric_fc(feature, label)
        # targets follow the logits' device instead of a hard-coded .cuda()
        loss = criterion(output, label.long().to(output.device))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # reports the loss of the last batch of the epoch
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))