In [5]:
from facenet_pytorch import MTCNN, InceptionResnetV1
import numpy as np

In [6]:
mtcnn = MTCNN(image_size=160, margin=16)
resnet = InceptionResnetV1(pretrained='vggface2').eval()

In [7]:
from torch.utils.data import Dataset, SubsetRandomSampler
import os
from PIL import Image, ImageOps
from torchvision import transforms
import torch
import numpy as np
import cv2
import torch.nn as nn
import torch.optim as optim

class LFW(Dataset):
    def __init__(self, path_to_data, transform=None):
        self.transform = transform
        data_same_person = []
        data_dif_person = []
        with open(path_to_data, 'r') as f:
            line = f.read()
            line = line.split('\n')
            line = [i.split('\t') for i in line]
            [data_same_person.append(i) if len(i) == 3 else data_dif_person.append(i) for i in line]
        data_same_person_res = []
        for i in data_same_person:
            if len(i[1]) == 1:
                inp = f'./lfw/{i[0]}/{i[0]}_000{i[1]}.jpg'
            elif len(i[1]) ==2:
                inp = f'./lfw/{i[0]}/{i[0]}_00{i[1]}.jpg'
            else:
                inp = f'./lfw/{i[0]}/{i[0]}_0{i[1]}.jpg'
            if len(i[2]) == 1:
                tar = f'./lfw/{i[0]}/{i[0]}_000{i[2]}.jpg'
            elif len(i[2]) ==2:
                tar = f'./lfw/{i[0]}/{i[0]}_00{i[2]}.jpg'
            else:
                tar = f'./lfw/{i[0]}/{i[0]}_0{i[2]}.jpg'
            data_same_person_res.append([inp, tar, 1.0])

        #data_same_person_res = [[f'./lfw/{i[0]}/{i[0]}_000{i[1]}.jpg' if len(i[1]) == 1 else f'./lfw/{i[0]}/{i[0]}_00{i[1]}.jpg',
                                 #f'./lfw/{i[0]}/{i[0]}_000{i[2]}.jpg' if len(i[2]) == 1 else f'./lfw/{i[0]}/{i[0]}_00{i[2]}.jpg',
                                 #1.0] for i in data_same_person]
        del (data_dif_person[-1])
        data_dif_person_res = []
        for i in data_dif_person:
            if len(i[1]) == 1:
                inp = f'./lfw/{i[0]}/{i[0]}_000{i[1]}.jpg'
            elif len(i[1]) ==2:
                inp = f'./lfw/{i[0]}/{i[0]}_00{i[1]}.jpg'
            else:
                inp = f'./lfw/{i[0]}/{i[0]}_0{i[1]}.jpg'
            if len(i[3]) == 1:
                tar = f'./lfw/{i[2]}/{i[2]}_000{i[3]}.jpg'
            elif len(i[3]) ==2:
                tar = f'./lfw/{i[2]}/{i[2]}_00{i[3]}.jpg'
            else:
                tar = f'./lfw/{i[2]}/{i[2]}_0{i[3]}.jpg'
            data_dif_person_res.append([inp, tar, 0.0])
        #data_dif_person_res = [[f'./lfw/{i[0]}/{i[0]}_000{i[1]}.jpg' if len(i[1]) == 1 else f'./lfw/{i[0]}/{i[0]}_00{i[1]}.jpg',
        #                        f'./lfw/{i[2]}/{i[2]}_000{i[3]}.jpg' if len(i[3]) == 1 else f'./lfw/{i[2]}/{i[2]}_00{i[3]}.jpg',
        #                        0.0] for i in data_dif_person]
        self.data = data_same_person_res + data_dif_person_res

    def __len__(self):
        return len(self.data)

    '''def __getitem__(self, index):
        img_input = mtcnn(Image.open(self.data[index][0])).detach().numpy()
        img_target = mtcnn(Image.open(self.data[index][1])).detach().numpy()
        label = float(self.data[index][2])
        return img_input, img_target, label'''

    def __getitem__(self, index):
        img_inp = Image.open(self.data[index][0])
        img_input = mtcnn(img_inp).detach().numpy()

        tar = Image.open(self.data[index][1])
        img_target = mtcnn(tar).detach().numpy()
        img_target = np.reshape(img)
        label = float(self.data[index][2])
        return img_input, img_target, label


class LFW_Train(Dataset):
    def __init__(self, path_to_data, transform=None):
        self.transform = transform
        data_same_person = []
        data_dif_person = []
        with open(path_to_data, 'r') as f:
            line = f.read()
            line = line.split('\n')
            line = [i.split('\t') for i in line]
            [data_same_person.append(i) if len(i) == 3 else data_dif_person.append(i) for i in line]
        res = []
        for i in data_same_person:
            for j in data_dif_person:
                if i[0] in j:
                    index = j.index(i[0])
                    if index == 0:
                        index = 2
                    else:
                        index = 0
                    tmp = i.copy()
                    tmp.append(j[index])
                    tmp.append(j[index+1])
                    res.append(tmp)
        data_res = []
        for i in res:
            if len(i[1]) == 1:
                anchor = f'./lfw/{i[0]}/{i[0]}_000{i[1]}.jpg'
            elif len(i[1]) == 2:
                anchor = f'./lfw/{i[0]}/{i[0]}_00{i[1]}.jpg'
            else:
                anchor = f'./lfw/{i[0]}/{i[0]}_0{i[1]}.jpg'
            if len(i[2]) == 1:
                pos = f'./lfw/{i[0]}/{i[0]}_000{i[2]}.jpg'
            elif len(i[2]) == 2:
                pos = f'./lfw/{i[0]}/{i[0]}_00{i[2]}.jpg'
            else:
                pos = f'./lfw/{i[0]}/{i[0]}_0{i[2]}.jpg'
            if len(i[4]) == 1:
                neg = f'./lfw/{i[3]}/{i[3]}_000{i[4]}.jpg'
            elif len(i[4]) == 2:
                neg = f'./lfw/{i[3]}/{i[3]}_00{i[4]}.jpg'
            else:
                neg = f'./lfw/{i[3]}/{i[3]}_0{i[4]}.jpg'
            data_res.append([anchor, pos, neg])
        self.data = data_res

    def __len__(self):
        return len(self.data)

    '''def __getitem__(self, index):
        img_input = mtcnn(Image.open(self.data[index][0])).detach().numpy()
        img_target = mtcnn(Image.open(self.data[index][1])).detach().numpy()
        label = float(self.data[index][2])
        return img_input, img_target, label'''

    ''' def __getitem__(self, index):
        img_inp = Image.open(self.data[index][0])
        img_inp = np.asarray(ImageOps.grayscale(img_inp))
        img_inp = cv2.merge([img_inp, img_inp, img_inp])
        anchor = mtcnn(img_inp).detach().numpy()

        pos = Image.open(self.data[index][1])
        pos = np.asarray(ImageOps.grayscale(pos))
        pos = cv2.merge([pos, pos, pos])
        pos = mtcnn(pos).detach().numpy()

        tar = Image.open(self.data[index][2])
        tar = np.asarray(ImageOps.grayscale(tar))
        tar = cv2.merge([tar, tar, tar])
        neg = mtcnn(tar).detach().numpy()

        return anchor, pos, neg'''

    def __getitem__(self, index):
        img_inp = Image.open(self.data[index][0])
        anchor = mtcnn(img_inp).detach().numpy()
        anchor = np.reshape(anchor, (anchor.shape[1], anchor.shape[2], 3))

        anchor = cv2.cvtColor(anchor, cv2.COLOR_BGR2GRAY)


        pos = Image.open(self.data[index][1])
        pos = mtcnn(pos).detach().numpy()
        pos = np.reshape(pos, (pos.shape[1], pos.shape[2], 3))

        pos = cv2.cvtColor(pos, cv2.COLOR_BGR2GRAY)
        pos = cv2.merge([pos, pos, pos])
        pos = np.reshape(pos, (3, pos.shape[0], pos.shape[0]))

        neg = Image.open(self.data[index][2])
        neg = mtcnn(neg).detach().numpy()
        neg = np.reshape(neg, (neg.shape[1], neg.shape[2], 3))

        neg = cv2.cvtColor(neg, cv2.COLOR_BGR2GRAY)
        neg = cv2.merge([neg, neg, neg])
        neg = np.reshape(neg, (3, neg.shape[0], neg.shape[0]))


        if self.transform:
            anchor = self.transform(image=anchor)['image']
            anchor = controller(anchor)
            anchor = cv2.merge([anchor, anchor, anchor])
            anchor = np.reshape(anchor, (3, anchor.shape[0], anchor.shape[0]))

        return anchor, pos, neg



In [4]:
import random
def controller(img, brightness=40, contrast=127):
    p = random.randint(1, 3)
    if p == 2:
        return img
    p = random.randint(1, 3)
    if p == 1:
        brightness = 40
    elif p == 2:
        brightness = 60
    else:
        brightness = 80
    brightness = int((brightness - 0) * (255 - (-255)) / (510 - 0) + (-255))
    contrast = int((contrast - 0) * (127 - (-127)) / (254 - 0) + (-127))
    if brightness != 0:
        if brightness > 0:
            shadow = brightness
            max = 255
        else:
            shadow = 0
            max = 255 + brightness
        al_pha = (max - shadow) / 255
        ga_mma = shadow
        # The function addWeighted calculates
        # the weighted sum of two arrays
        cal = cv2.addWeighted(img, al_pha,
                              img, 0, ga_mma)
    else:
        cal = img
    if contrast != 0:
        Alpha = float(131 * (contrast + 127)) / (127 * (131 - contrast))
        Gamma = 127 * (1 - Alpha)
        # The function addWeighted calculates
        # the weighted sum of two arrays
        cal = cv2.addWeighted(cal, Alpha,
                              cal, 0, Gamma)
    return cal

In [5]:
import albumentations as A

train_dataset = LFW_Train("./pairsDevTrain.txt",
                        transform=A.Compose([
                                A.HorizontalFlip(p=0.2),
                                A.Rotate(limit=(0, 15), p=0.3)
                            ])
                      )
test_dataset = LFW("./pairsDevTest.txt",
                       transform=A.Compose([
                                A.HorizontalFlip(p=0.2),
                                A.Rotate(limit=(0, 15), p=0.3)
                            ])
                      )

In [1]:
resnet

NameError: name 'resnet' is not defined

In [6]:
device = torch.device("cuda:0")
resnet = resnet.to(device)
batch_size = 64

data_size = len(train_dataset)
validation_fraction = .2


val_split = int(np.floor((validation_fraction) * data_size))
indices = list(range(data_size))
np.random.seed(43)
np.random.shuffle(indices)

val_indices, train_indices = indices[:val_split], indices[val_split:]

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                           sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                         sampler=val_sampler)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

In [7]:
from sklearn.metrics import f1_score
def compute_f1(model, grt):
    model.eval()
    for x, y, z in grt:
        x = x.to(device=device)
        y = y.to(device=device)
        z = z.to(device=device)
        predictx = model(x)
        predicty = model(y)
        predictz = model(z)
        pred = []
        label = []
        for i in range(len(predictx)):
            pred.append((1+sum(predictx[i]*predicty[i]))/2)
            pred.append((1+sum(predictx[i]*predictz[i]))/2)
            label.append(1.0)
            label.append(0.0)
        pred = np.array(list(map(lambda x: int(x > 0.8), pred)))
        label = np.array(label)
        return f1_score(label, pred)

def compute_f1_test(model, grt):
    model.eval()
    for x, y, label in grt:
        x = x.to(device=device)
        y = y.to(device=device)

        predictx = model(x)
        predicty = model(y)
        pred = []
        for i in range(len(predictx)):
            pred.append((1+sum(predictx[i]*predicty[i]))/2)
        print(pred)
        pred = np.array(list(map(lambda x: int(x > 0.8), pred)))
        return f1_score(label, pred)

In [8]:
def train_model(model, train_loader, val_loader, loss, optimizer, num_epochs):
    loss_history = []
    val_history = []
    for epoch in range(num_epochs):
        model.train() # Enter train mode
        loss_accum = 0
        for i_step, (a, p, n) in enumerate(train_loader):
            a_gpu = torch.Tensor(a).to(device)
            p_gpu = torch.Tensor(p).to(device)
            n_gpu = torch.Tensor(n).to(device)
            a_gpu.requires_grad = True
            p_gpu.requires_grad = True
            n_gpu.requires_grad = True
            loss_value = loss(a_gpu, p_gpu, n_gpu)
            optimizer.zero_grad()
            loss_value.backward()
            optimizer.step()
            loss_accum += loss_value

        ave_loss = loss_accum / i_step
        val_f1 = compute_f1(model, val_loader)

        loss_history.append(float(ave_loss))
        val_history.append(val_f1)

        print("Average loss: %f, Val f1: %f" % (ave_loss, val_f1))

    return loss_history, val_history

In [10]:
params_old = list([param for name, param in resnet.named_parameters() if not name.startswith('logits.')])
params_new = list([param for name, param in resnet.named_parameters() if name.startswith('logits.')])

parameters = [
    {'params': params_old, 'lr': 0.0001, 'momentum': 0.9},
    {'params': params_new, 'lr': 0.001, 'momentum': 0.9}
]
optimizer = optim.Adam(parameters)
loss = nn.TripletMarginLoss(margin=1.0, p=2)
train_model(resnet, train_loader, val_loader, loss, optimizer, 100)
compute_f1_test(resnet, test_loader)

Average loss: 1.480623, Val f1: 0.732673
Average loss: 1.476205, Val f1: 0.693878
Average loss: 1.478031, Val f1: 0.732673
Average loss: 1.468294, Val f1: 0.732673
Average loss: 1.482786, Val f1: 0.732673
Average loss: 1.469846, Val f1: 0.757282
Average loss: 1.452557, Val f1: 0.666667
Average loss: 1.494706, Val f1: 0.745098
Average loss: 1.472408, Val f1: 0.720000
Average loss: 1.452492, Val f1: 0.720000
Average loss: 1.417670, Val f1: 0.666667
Average loss: 1.499693, Val f1: 0.757282
Average loss: 1.463369, Val f1: 0.732673
Average loss: 1.450621, Val f1: 0.825688
Average loss: 1.494826, Val f1: 0.720000
Average loss: 1.479028, Val f1: 0.720000
Average loss: 1.497553, Val f1: 0.693878
Average loss: 1.464062, Val f1: 0.732673
Average loss: 1.463396, Val f1: 0.666667
Average loss: 1.464835, Val f1: 0.680412
Average loss: 1.490364, Val f1: 0.707071
Average loss: 1.466642, Val f1: 0.680412
Average loss: 1.468490, Val f1: 0.652632
Average loss: 1.492148, Val f1: 0.745098
Average loss: 1.

  if method is "Min":
  if method is "Min":


KeyboardInterrupt: 