In [46]:
import time

import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
import torch.nn.functional as F

import os
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms

import argparse

import sys
sys.path.append( "/kaggle/input/assignment2input" )
# from model import SphereCNN
# from dataloader import LFW4Training, LFW4Eval
# from parser import parse_args # suggestion from vs code
from utils import set_seed, AverageMeter

# parser.py

In [47]:
def parse_args(args=None):
    parser = argparse.ArgumentParser(description="SphereFace")

    parser.add_argument('--seed', type=int, default=2021)
    parser.add_argument('--device', type=str, default="cuda:0")

    parser.add_argument('--batch_size', type=int, default=128) # batch size = 128 due to pg. 6 in CNNs Setup
    parser.add_argument('--epoch', type=int, default=100)
    parser.add_argument('--lr', type=float, default=1e-3)
    parser.add_argument('--eval_interval', type=int, default=20)

    # EDITING TO USE KAGGLE INPUT DATA
    parser.add_argument('--train_file', type=str, default="/kaggle/input/assignment2input/pairsDevTrain.txt")
    parser.add_argument('--eval_file', type=str, default="/kaggle/input/assignment2input/pairsDevTest.txt")
    parser.add_argument('--img_folder', type=str, default="/kaggle/input/lfwdata/lfw")

    if args is None:
      args=[]
    args = parser.parse_args(args)
    return args

# model.py

Design of Neural Network architecture.

## A-Softmax Loss Function

This implements the loss function (A-softmax Loss).

In [48]:
class AngularPenaltySMLoss(nn.Module):
    def __init__(self, in_features, out_features, eps=1e-7, m=None):
        super(AngularPenaltySMLoss, self).__init__()

        self.m = 4. if not m else m

        self.in_features = in_features
        self.out_features = out_features
        self.fc = nn.Linear(in_features, out_features, bias=False)
        self.eps = eps

    def forward(self, x, labels):
        '''
        input shape (N, in_features)
        '''
        assert len(x) == len(labels)
        assert torch.min(labels) >= 0
        assert torch.max(labels) < self.out_features

        # normalizes weights of linear layer
        for W in self.fc.parameters():
            W = F.normalize(W, p=2, dim=1)

        x = F.normalize(x, p=2, dim=1)

        wf = self.fc(x)

        # calculates numerator of loss function
        numerator = torch.cos(self.m * torch.acos(
            torch.clamp(torch.diagonal(wf.transpose(0, 1)[labels]), -1. + self.eps, 1 - self.eps)))

        excl = torch.cat([torch.cat((wf[i, :y], wf[i, y + 1:])).unsqueeze(0) for i, y in enumerate(labels)], dim=0)
        
        # calculates denominator of loss function
        denominator = torch.exp(numerator) + torch.sum(torch.exp(excl), dim=1)
        L = numerator - torch.log(denominator)

        return -torch.mean(L)



In [49]:
class SphereCNN(nn.Module):
    def __init__(self, class_num: int, feature=False):
        super(SphereCNN, self).__init__()
        self.class_num = class_num
        self.feature = feature

        # 4-LAYER CONVOLUTIONAL NEURAL NETWORK
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=128, kernel_size=3, stride=2)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=2)
        self.conv4 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=2)

        self.fc5 = nn.Linear(512 * 5 * 5, 512)
        self.angular = AngularPenaltySMLoss(512, self.class_num) # A-Softmax Loss

    def forward(self, x, y):
        # 4-Layer Convolution Network
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))  # batch_size (0) * out_channels (1) * height (2) * width (3)

        x = x.view(x.size(0), -1)  # batch_size (0) * (out_channels * height * width)
        x = self.fc5(x)

        if self.feature or y is None:
            return x
        else:
            x_angle = self.angular(x, y)
            return x, x_angle


In [50]:
# if __name__ == "__main__":
net = SphereCNN(50)
input = torch.ones(64, 3, 96, 96)
output = net(input, None)

# dataloader.py

<code>import os
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms</code>

## LFW4Training(Dataset)

Training

In [51]:
class LFW4Training(Dataset):
    def __init__(self, train_file: str, img_folder: str):
        self.img_folder = img_folder

        names = os.listdir(img_folder)
        self.name2label = {name: idx for idx, name in enumerate(names)}
        self.n_label = len(self.name2label)

        with open(train_file) as f:
            train_meta_info = f.read().splitlines()

        self.train_list = []
        for line in train_meta_info:
            line = line.split("\t")
            if len(line) == 3:
                self.train_list.append(os.path.join(line[0], line[0] + "_" + str(line[1]).zfill(4) + ".jpg"))
                self.train_list.append(os.path.join(line[0], line[0] + "_" + str(line[2]).zfill(4) + ".jpg"))
            elif len(line) == 4:
                self.train_list.append(os.path.join(line[0], line[0] + "_" + str(line[1]).zfill(4) + ".jpg"))
                self.train_list.append(os.path.join(line[2], line[2] + "_" + str(line[3]).zfill(4) + ".jpg"))
            else:
                pass

        self.transform = transforms.Compose([
            transforms.Resize(96),
            transforms.RandomHorizontalFlip(), # DATA AUGMENTATION - horizontally flipped as in pg.6
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5]),
        ])

    def __getitem__(self, index):
        img_path = self.train_list[index]

        img = Image.open(os.path.join(self.img_folder, img_path))
        img = self.transform(img)

        name = img_path.split("/")[0]
        label = self.name2label[name]

        return img, label

    def __len__(self):
        return len(self.train_list)

## LFW4Eval(Dataset)

In [52]:
class LFW4Eval(Dataset):
    def __init__(self, eval_file: str, img_folder: str):
        self.img_folder = img_folder

        with open(eval_file) as f:
            eval_meta_info = f.read().splitlines()

        self.eval_list = []
        for line in eval_meta_info:
            line = line.split("\t")
            if len(line) == 3:
                eval_pair = (
                    os.path.join(line[0], line[0] + "_" + str(line[1]).zfill(4) + ".jpg"),
                    os.path.join(line[0], line[0] + "_" + str(line[2]).zfill(4) + ".jpg"),
                    1,
                )
                self.eval_list.append(eval_pair)
            elif len(line) == 4:
                eval_pair = (
                    os.path.join(line[0], line[0] + "_" + str(line[1]).zfill(4) + ".jpg"),
                    os.path.join(line[2], line[2] + "_" + str(line[3]).zfill(4) + ".jpg"),
                    0,
                )
                self.eval_list.append(eval_pair)
            else:
                pass

        self.transform = transforms.Compose([
            transforms.Resize(96),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5]),
        ])

    def __getitem__(self, index):
        img_1_path, img_2_path, label = self.eval_list[index]

        img_1 = Image.open(os.path.join(self.img_folder, img_1_path))
        img_2 = Image.open(os.path.join(self.img_folder, img_2_path))
        img_1 = self.transform(img_1)
        img_2 = self.transform(img_2)

        return img_1, img_2, label

    def __len__(self):
        return len(self.eval_list)

# main.py

In [53]:
# TESTING AND RESULTS

def eval(data_loader: DataLoader, model: SphereCNN, device: torch.device, threshold: float = 0.5):
    model.eval()
    model.feature = True
    sim_func = nn.CosineSimilarity()

    cnt = 0.
    total = 0.

    t1 = time.time()
    with torch.no_grad():
        for img_1, img_2, label in data_loader:
            img_1 = img_1.to(device)
            img_2 = img_2.to(device)
            label = label.to(device)

            feat_1 = model(img_1, None)
            feat_2 = model(img_2, None)
            sim = sim_func(feat_1, feat_2)

            sim[sim > threshold] = 1
            sim[sim <= threshold] = 0

            total += sim.size(0)
            for i in range(sim.size(0)):
                if sim[i] == label[i]:
                    cnt += 1

    print("Acc.: %.4f; Time: %.3f" % (cnt / total, time.time() - t1))
    return

In [54]:
args = parse_args()

set_seed(args.seed)
device = torch.device(args.device)

# DATA LOADING
# size determination of train/validation sets
train_set_og = LFW4Training(args.train_file, args.img_folder)

validation_split = 0.2 # 20% of training data 
train_size = len(train_set_og)
val_size = int(validation_split * train_size)
train_size = train_size - val_size

# creating datasets
# train_set = LFW4Training(args.train_file, args.img_folder)
train_set, val_set = torch.utils.data.random_split(train_set_og, [train_size, val_size])
eval_set = LFW4Eval(args.eval_file, args.img_folder)

# creating data loader
train_loader = DataLoader(train_set, batch_size=args.batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=args.batch_size)
eval_loader = DataLoader(eval_set, batch_size=args.batch_size)


# INITIALIZE NEURAL NETWORK
model = SphereCNN(class_num=train_set_og.n_label)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=args.lr)

loss_record = AverageMeter()

# TRAINING DATA
for epoch in range(args.epoch):
    t1 = time.time()
    model.train()
    model.feature = False
    loss_record.reset()

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        _, loss = model(inputs, targets)
        loss.backward()
        optimizer.step()

        loss_record.update(loss)

    print("Epoch: %s; Loss: %.3f; Time: %.3f" % (str(epoch).zfill(2), loss_record.avg, time.time() - t1))

    if (epoch + 1) % args.eval_interval == 0:
        torch.save(model.state_dict(), "/spherecnn.pth")
        eval(eval_loader, model, device)
        
        #VALIDATION - attempting to validate data
#         val_loss = 0.0
#         model.eval()
#         with torch.no_grad():
#             for inputs, targets in val_loader:
#                 inputs = inputs.to(device)
#                 targets = targets.to(device)

#                 outputs, loss = model(inputs, targets)
#                 val_loss += loss.item() * inputs.size(0)
#         val_loss /= len(val_set)
#         print("Validation Loss: %.3f" % val_loss)
        

Epoch: 00; Loss: 7.629; Time: 10.301
Epoch: 01; Loss: 7.388; Time: 8.811
Epoch: 02; Loss: 7.109; Time: 8.842
Epoch: 03; Loss: 6.923; Time: 8.534
Epoch: 04; Loss: 6.812; Time: 8.778
Epoch: 05; Loss: 6.741; Time: 8.846
Epoch: 06; Loss: 6.697; Time: 8.908
Epoch: 07; Loss: 6.663; Time: 9.101
Epoch: 08; Loss: 6.637; Time: 8.868
Epoch: 09; Loss: 6.613; Time: 8.601
Epoch: 10; Loss: 6.589; Time: 8.674
Epoch: 11; Loss: 6.564; Time: 8.839
Epoch: 12; Loss: 6.536; Time: 8.891
Epoch: 13; Loss: 6.508; Time: 8.862
Epoch: 14; Loss: 6.478; Time: 9.222
Epoch: 15; Loss: 6.447; Time: 8.831
Epoch: 16; Loss: 6.415; Time: 8.790
Epoch: 17; Loss: 6.385; Time: 8.613
Epoch: 18; Loss: 6.352; Time: 8.842
Epoch: 19; Loss: 6.318; Time: 8.923
Acc.: 0.5500; Time: 4.605
Epoch: 20; Loss: 6.284; Time: 9.144
Epoch: 21; Loss: 6.249; Time: 8.941
Epoch: 22; Loss: 6.216; Time: 8.639
Epoch: 23; Loss: 6.184; Time: 8.525
Epoch: 24; Loss: 6.150; Time: 8.793
Epoch: 25; Loss: 6.117; Time: 8.640
Epoch: 26; Loss: 6.083; Time: 8.606
E