In [0]:
import torch
import time
import numpy as np
from matplotlib import pyplot as plt
import pylab as pl
from IPython import display
import warnings
warnings.filterwarnings('ignore')

In [0]:
from sklearn.datasets import fetch_lfw_people

# Fetch the LFW faces with data_home='.'. The returned object is bound to `_`
# and not used again; later cells read the images from "lfw_home/lfw_funneled".
_ = fetch_lfw_people(data_home='.', color=True)

In [0]:
from torchvision.datasets import ImageFolder
from torchvision import transforms
from sklearn.model_selection import train_test_split

IMG_WIDTH = 250
IMG_HEIGHT = 250
# Per-channel mean / std for the scaling transform. ToTensor() yields values in
# [0, 1], so (x - 0.5) / 0.5 rescales every channel to [-1, 1].
DATA_MEAN = (0.5, 0.5, 0.5)
DATA_STD = (0.5, 0.5, 0.5)
MIN_IMAGES_PER_CLASS = 5  # identities with fewer images are dropped

train_transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),          # apply random horizontal flip
        transforms.RandomCrop(                      # random crop after padding 4 px per side in `reflect` mode
            size=(IMG_HEIGHT, IMG_WIDTH),           # RandomCrop expects (height, width)
            padding=(4, 4),
            padding_mode="reflect"),
        transforms.ToTensor(),
        # MTCNN(image_size=160, margin=32),
        transforms.Normalize(DATA_MEAN, DATA_STD)   # img_norm = (img - data_mean) / data_std
    ]
)

test_transform = transforms.Compose(
    [
        transforms.ToTensor(),
        # MTCNN(image_size=160, margin=32),
        transforms.Normalize(DATA_MEAN, DATA_STD)   # on the test set only the normalization is applied
    ]
)

image_folder = ImageFolder("lfw_home/lfw_funneled")

# One integer label per image, taken from the folder index so that no image has
# to be decoded just to count class sizes.
classes = list(image_folder.targets)

# Single-pass frequency count (replaces the quadratic `classes.count` per element).
label_counts = {}
for label in classes:
    label_counts[label] = label_counts.get(label, 0) + 1
filtered_classes = {label for label, count in label_counts.items()
                    if count >= MIN_IMAGES_PER_CLASS}

# Number of identities in the folder (previously this was the number of images).
# Filtered samples keep their original label indices, so the classifier output
# must still span every folder class, not only the retained ones.
num_classes = len(image_folder.classes)
print(f'Number of classes: {num_classes}')

# Decode only the images whose identity passed the filter, each exactly once.
# `data_set` stays a list of (PIL image, label) pairs; later cells iterate it.
data_set = [image_folder[idx] for idx, label in enumerate(classes)
            if label in filtered_classes]

X = [sample[0] for sample in data_set]
y = [sample[1] for sample in data_set]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

# NOTE(review): the transforms run once here, so every training image receives a
# single fixed random flip/crop instead of a fresh augmentation per epoch.
train_images = [(train_transform(image), label) for image, label in zip(X_train, y_train)]
test_images = [(test_transform(image), label) for image, label in zip(X_test, y_test)]

In [0]:
BATCH_SIZE_TRAIN = 100  #@param
BATCH_SIZE_TEST = 100  #@param
NO_WORKERS = 8  #@param
SHUFFLE_DATA = True

# Batch provider over the pre-transformed training samples; with shuffling on,
# every pass over the loader visits the samples in a new order.
train_loader = torch.utils.data.DataLoader(
    train_images,
    batch_size=BATCH_SIZE_TRAIN,
    shuffle=SHUFFLE_DATA,
    num_workers=NO_WORKERS,
)

# Draw one batch and report its tensor shapes as a quick sanity check.
batch_train_images, batch_train_labels = next(iter(train_loader))

for what, tensor in (('images', batch_train_images), ('labels', batch_train_labels)):
    print(f'Shape of training {what}: {tensor.size()}')

In [0]:
# Same setup for the test dataset, batched with the dedicated BATCH_SIZE_TEST
# setting (the training batch size was used here before).
test_loader = torch.utils.data.DataLoader(test_images, batch_size=BATCH_SIZE_TEST,
                                          shuffle=SHUFFLE_DATA,
                                          num_workers=NO_WORKERS)

def loopy_test_loader(dl):
    """Yield items from `dl` forever, restarting it whenever it is exhausted.

    Args:
        dl: any re-iterable object (e.g. a DataLoader); a fresh iterator is
            requested at the start of every pass.

    Yields:
        The items produced by iterating `dl`, pass after pass.

    Raises:
        RuntimeError: if a pass over `dl` produces no items at all. Without
            this check the generator could never yield anything.
    """
    while True:
        produced_any = False
        for batch in dl:
            produced_any = True
            yield batch

        if not produced_any:
            raise RuntimeError("loopy_test_loader: data loader produced no batches")

# Draw one batch from the test loader and report its tensor shapes.
batch_test_images, batch_test_labels = next(iter(test_loader))

for what, tensor in (('images', batch_test_images), ('labels', batch_test_labels)):
    print(f'Shape of test {what}: {tensor.size()}')

In [0]:
# Unbind the intermediate names created while preparing the data.
# The DataLoaders were constructed from train_images / test_images, so those
# sample lists remain reachable through the loaders after the names are gone.
del classes, filtered_classes
del X, y
del X_train, X_test, y_train, y_test
del train_images, test_images

In [0]:
import torch.nn as nn
import torch.nn.functional as F

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f'Running code @ {device}')

class ImageNet(nn.Module):
  """Small CNN classifier: two convolutional stages followed by an MLP head.

  Each stage stacks two Conv2d -> ReLU -> BatchNorm2d groups and ends with a
  2x2 / stride-2 max-pool. The convolutions are padded so that only the pooling
  layers change the spatial size.

  Args:
    n_classes: number of output scores. When omitted, the notebook-level
      `num_classes` is used, which matches the original behaviour.
    image_size: (height, width) of the input images. It is used to size the
      first linear layer; the default (250, 250) gives the previously
      hard-coded 246016 input features.

  NOTE(review): the head ends with ReLU + BatchNorm1d applied to the class
  scores. That is unusual for scores fed to CrossEntropyLoss; kept unchanged.
  """

  def __init__(self, n_classes=None, image_size=(250, 250)):
    super().__init__()

    if n_classes is None:
      n_classes = num_classes   # notebook-level default
    height, width = image_size

    self.cnn1 = nn.Sequential(
      nn.Conv2d(3, 8, kernel_size=5, padding=2),
      nn.ReLU(inplace=True),
      nn.BatchNorm2d(8),

      nn.Conv2d(8, 16, kernel_size=5, padding=2),
      nn.ReLU(inplace=True),
      nn.BatchNorm2d(16),
      nn.MaxPool2d(2, stride=2)
    )

    self.cnn2 = nn.Sequential(
      nn.Conv2d(16, 32, kernel_size=3, padding=1),
      nn.ReLU(inplace=True),
      nn.BatchNorm2d(32),

      nn.Conv2d(32, 64, kernel_size=3, padding=1),
      nn.ReLU(inplace=True),
      nn.BatchNorm2d(64),
      nn.MaxPool2d(2, stride=2)
    )

    # Two floor-halvings of each spatial dimension and 64 output channels:
    # 64 * (250 // 4) * (250 // 4) == 246016 for the default image size.
    flat_features = 64 * (height // 4) * (width // 4)

    self.fc = nn.Sequential(
      nn.Linear(flat_features, 1024),
      nn.ReLU(inplace=True),
      nn.BatchNorm1d(1024),

      nn.Linear(1024, n_classes),
      nn.ReLU(inplace=True),
      nn.BatchNorm1d(n_classes)
    )

  def forward(self, x):
    """Map a batch of 3-channel images to a (batch, n_classes) score tensor."""
    x = self.cnn1(x)
    x = self.cnn2(x)
    x = torch.flatten(x, 1)   # keep the batch dimension, flatten the rest
    x = self.fc(x)

    return x

In [0]:
# Instantiate the CNN classifier with its default arguments and print its layer structure.
net = ImageNet()
print(net)

In [0]:
# Smoke test: push the first `select` training images through the network and
# compare the highest-scoring class index with the true labels.
select = 2

net = net.to(device)
net.train()   # the module is already in train mode after init; made explicit here

inputs = batch_train_images.to(device)[:select]
target = batch_train_labels[:select]

output = net(inputs)
_, predicted = output.max(dim=1)   # index of the highest score per sample

for item in (output.size(), predicted, target):
    print(item)

In [0]:
def get_num_params(model):
    """Return the total number of scalar values across `model.parameters()`."""
    return sum(tensor.shape.numel() for tensor in model.parameters())

# Report the size of the current model.
print("Total number of parameters of models")
print(f"{net.__class__!s} :  {get_num_params(net)}")

In [0]:
def top_k_accuracy(k, target, output):
    """Percentage of samples whose true label is among the k highest scores.

    Args:
        k: how many top-scoring classes per sample count as a hit.
        target: 1-D tensor of true class indices, one per sample.
        output: 2-D score tensor indexed as (sample, class).

    Returns:
        A one-element float tensor holding the accuracy in percent (0-100).
    """
    batch_size = target.size(0)

    _, pred = output.topk(k, dim=1, largest=True, sorted=True)

    # After the transpose each row holds one rank: (k, batch).
    pred = pred.t()
    # Compare on the device of the scores rather than a notebook-level global.
    correct = pred.eq(target.to(output.device).view(1, -1).expand_as(pred))

    # `reshape` instead of `view`: for k > 1 the comparison result can inherit
    # the transposed (non-contiguous) layout, which `view(-1)` rejects.
    correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
    correct_k.mul_(100.0 / batch_size)

    return correct_k

In [0]:
lr_init = 0.02              # initial learning rate
lr_factor = 0.1             # learning rate decay factor
weight_decay_factor = 1e-4  # weight decay factor for L2 weight regularization
# Scheduler steps at which the learning rate is multiplied by lr_factor.
# The training loop steps the scheduler once per mini-batch, so these count
# iterations. They are step indices and therefore written as integers.
lr_schedule_milestones = [90_000, 100_000, 110_000]

# Define a Loss function and optimizer
criterion = nn.CrossEntropyLoss()

# Define optimizer - SGD with momentum and weight_decay for L2 weight regularization
optimizer = torch.optim.SGD(net.parameters(), lr=lr_init, momentum=0.9, weight_decay=weight_decay_factor)
#optimizer = torch.optim.Adam(net.parameters(), lr=lr_init, weight_decay=weight_decay_factor)

# Define learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=lr_schedule_milestones, gamma=lr_factor)

In [0]:
# Function that takes a list of losses and plots them.
# Output clearing only happens when the step counter is a multiple of this value.
REFRESH_EVERY = 1000

def plot_losses(train_losses, test_losses, steps, ct):
    """Plot the training (blue) and test (red) loss curves against `steps`.

    Args:
        train_losses: y-values for the blue curve.
        test_losses: y-values for the red curve.
        steps: x-values shared by both curves.
        ct: step counter; the cell output is cleared first only when it is a
            multiple of REFRESH_EVERY.

    NOTE(review): both curves are plotted against the same `steps`, so the
    three sequences presumably need equal lengths - confirm against the caller.
    """
    if ct % REFRESH_EVERY == 0:
        display.clear_output(wait=True)
    
    # The current figure is displayed before the new curves are drawn onto it.
    display.display(pl.gcf())
    plt.plot(steps, train_losses, c='b')
    plt.plot(steps, test_losses, c='r')
    plt.show()
    # Pauses for one second on every call.
    time.sleep(1.0)

In [0]:
# Settings read by the training cells.
# Number of passes over train_loader.
EPOCHS = 50  # @param
# Training loss/accuracy are averaged and printed every this many iterations.
REPORT_TRAIN_EVERY = 100  # @param
# The loss curves are re-plotted every this many iterations.
PLOT_EVERY = 100  # @param
# A test evaluation is run every this many iterations.
REPORT_TEST_EVERY = 100  # @param
# Number of test batches drawn per evaluation.
TEST_ITERS = 100  # @param

In [0]:
train_iter = 0
train_losses = []
test_losses = []
steps = []

# simulate an infinite test data provider by looping over the test data
test_data_provider = loopy_test_loader(test_loader)

# set model in train mode
net.train()

running_loss = 0.0
running_acc = 0.0

for epoch in range(EPOCHS):  # loop over the dataset multiple times

    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Advance the per-iteration LR schedule only after the optimizer update.
        # Stepping the scheduler first skips the first value of the schedule.
        lr_scheduler.step()

        # Accumulate plain Python floats so that no device tensors are kept alive.
        running_loss += loss.item()
        running_acc += top_k_accuracy(1, labels, outputs).item()

        if train_iter % REPORT_TRAIN_EVERY == REPORT_TRAIN_EVERY - 1:    # print every REPORT_TRAIN_EVERY mini-batch iterations
            train_loss = running_loss / REPORT_TRAIN_EVERY
            train_acc = running_acc / REPORT_TRAIN_EVERY

            # Read the learning rate currently held by the optimizer's last
            # parameter group; this is the value the next update will use.
            current_lr = optimizer.param_groups[-1]['lr']

            print('[%d, %5d, %6d] LR: %.5f' % (epoch + 1, i + 1, train_iter, current_lr))
            print('[%d, %5d] loss: %.5f, acc: %.5f' %
                  (epoch + 1, i + 1, train_loss, train_acc))

            train_losses.append(train_loss)
            steps.append(train_iter)

            running_loss = 0.0
            running_acc = 0.0

        if train_iter % PLOT_EVERY == 0:
            plot_losses(train_losses, test_losses, steps, train_iter)

        train_iter += 1

        if train_iter % REPORT_TEST_EVERY == 0:
            # set model in test mode
            net.eval()

            with torch.no_grad():
                # evaluate over TEST_ITERS batches drawn from the looping test provider
                test_loss = 0.0
                correct = 0.0

                for _test_step in range(TEST_ITERS):
                    test_data = next(test_data_provider)

                    # get the test inputs; data is a list of [inputs, labels]
                    test_inputs, test_labels = test_data[0].to(device), test_data[1].to(device)

                    out = net(test_inputs)

                    # `.item()` keeps test_losses a list of floats, which can be
                    # plotted even when the model runs on the GPU.
                    test_loss += criterion(out, test_labels).item()
                    correct += top_k_accuracy(1, test_labels, out).item()

                avg_test_loss = test_loss / TEST_ITERS
                test_losses.append(avg_test_loss)

                avg_acc = correct / TEST_ITERS

                print('[%d, %5d] avg_test_loss: %.5f, avg_test_acc: %.2f'
                    % (epoch + 1, i + 1, avg_test_loss, avg_acc))

            # set model back in train mode
            net.train()

print('Finished Training')

In [0]:
!pip install facenet-pytorch
from facenet_pytorch import MTCNN, InceptionResnetV1

In [0]:
# Face detector/cropper and a pretrained embedding network (inference mode).
mtcnn = MTCNN(image_size=160, margin=32)
resnet = InceptionResnetV1(pretrained='vggface2').eval()

X = []
y = []
skipped_without_face = 0

# Only forward passes are needed, so gradient tracking is switched off.
with torch.no_grad():
    for data, target in data_set:
        cropped = mtcnn(data)

        # MTCNN yields None when it finds no face. Skip such samples so that
        # X and y stay aligned, instead of failing on `None.unsqueeze`.
        if cropped is None:
            skipped_without_face += 1
            continue

        embedding = resnet(cropped.unsqueeze(0))   # add the batch dimension

        X.append(embedding.numpy().flatten())
        y.append(target)

if skipped_without_face:
    print(f'Skipped {skipped_without_face} images without a detected face')

In [0]:
# Hold out 10% of the embeddings (fixed seed) for the classical classifiers.
embedding_matrix = np.array(X)
identity_labels = np.array(y)

X_train, X_test, y_train, y_test = train_test_split(
    embedding_matrix,
    identity_labels,
    test_size=0.1,
    random_state=42,
)

print(*(part.shape for part in (X_train, X_test, y_train, y_test)))

In [0]:
from statistics import mean, stdev
from sklearn.model_selection import cross_val_score
from sklearn.svm import SVC

kernel_vals = ['linear', 'rbf', 'poly', 'sigmoid']
C_vals = [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 100.0]
degree_vals = [2, 3, 4, 5, 6, 7]
DEFAULT_DEGREE = 3   # used for the kernels that ignore `degree`

# Best (mean, stdev) of the CV scores so far and the parameters that produced it.
max_cv = (0, 1)
max_val = ()

for kernel_val in kernel_vals:
    # `degree` only influences the polynomial kernel, so it is swept for 'poly'
    # alone; the other kernels are fitted once per C value.
    kernel_degrees = degree_vals if kernel_val == 'poly' else [DEFAULT_DEGREE]

    for C_val in C_vals:
        for degree_val in kernel_degrees:
            clf = SVC(
                gamma='auto',
                kernel=kernel_val,
                C=C_val,
                degree=degree_val
            )

            # Tune on the training split only. The test split is kept for the
            # final evaluation, so it must not be used to choose hyperparameters.
            cv_score = cross_val_score(clf, X_train, y_train, cv=5)
            score_mean, score_std = mean(cv_score), stdev(cv_score)

            # A candidate wins with a strictly higher mean, or with a mean that
            # ties to 3 decimals and a smaller spread.
            ties_with_lower_spread = (
                round(score_mean, 3) == round(max_cv[0], 3) and score_std < max_cv[1]
            )
            if ties_with_lower_spread or score_mean > max_cv[0]:
                max_cv = (score_mean, score_std)
                max_val = (kernel_val, C_val, degree_val)

print(
    'kernel:', max_val[0],
    'C:', max_val[1],
    'degree:', max_val[2]
)

In [0]:
from sklearn.neighbors import KNeighborsClassifier

n_neighbors_vals = [3, 4, 5, 6, 7, 8]
weights_vals = ['uniform', 'distance']
algorithm_vals = ['auto', 'ball_tree', 'kd_tree', 'brute']

# Best (mean, stdev) of the CV scores so far and the parameters that produced it.
max_cv = (0, 1)
max_val = ()

for n_neighbors_val in n_neighbors_vals:
    for weights_val in weights_vals:
        for algorithm_val in algorithm_vals:
            clf = KNeighborsClassifier(
                n_neighbors=n_neighbors_val,
                weights=weights_val,
                algorithm=algorithm_val
            )

            # Tune on the training split only. The test split is kept for the
            # final evaluation, so it must not be used to choose hyperparameters.
            cv_score = cross_val_score(clf, X_train, y_train, cv=5)
            score_mean, score_std = mean(cv_score), stdev(cv_score)

            # A candidate wins with a strictly higher mean, or with a mean that
            # ties to 3 decimals and a smaller spread.
            ties_with_lower_spread = (
                round(score_mean, 3) == round(max_cv[0], 3) and score_std < max_cv[1]
            )
            if ties_with_lower_spread or score_mean > max_cv[0]:
                max_cv = (score_mean, score_std)
                max_val = (n_neighbors_val, weights_val, algorithm_val)

print(
    'n_neighbors:', max_val[0],
    'weights:', max_val[1],
    'algorithm:', max_val[2]
)

In [0]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

def evaluate_metrics(y_test, y_pred):
    """Print accuracy plus weighted-average precision, recall and F1.

    Args:
        y_test: true labels.
        y_pred: predicted labels, aligned with `y_test`.
    """
    print("accuracy:", accuracy_score(y_test, y_pred))

    weighted_metrics = (
        ("precision:", precision_score),
        ("recall:", recall_score),
        ("f1:", f1_score),
    )
    for caption, scorer in weighted_metrics:
        print(caption, scorer(y_test, y_pred, average='weighted'))

In [0]:
# Fit a linear-kernel SVM on the training embeddings and score it on the held-out split.
svc_params = dict(gamma='auto', kernel='linear', C=3)
clf = SVC(**svc_params)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
evaluate_metrics(y_test, y_pred)

In [0]:
# Fit a distance-weighted 3-nearest-neighbour classifier and score it on the held-out split.
knn_params = dict(n_neighbors=3, weights='distance', algorithm='auto')
clf = KNeighborsClassifier(**knn_params)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
evaluate_metrics(y_test, y_pred)

In [0]:
class ModifiedFaceNet(nn.Module):
    """Pretrained InceptionResnetV1 backbone with a new classification head.

    `self.resnet` wraps the 'vggface2'-pretrained network followed by ReLU and
    BatchNorm1d(512). `self.linear` maps those 512 features to one score per
    class.

    Args:
        n_classes: number of output scores. When omitted, the notebook-level
            `num_classes` is used, which matches the original behaviour.
    """

    def __init__(self, n_classes=None):
        super().__init__()

        if n_classes is None:
            n_classes = num_classes   # notebook-level default

        self.resnet = nn.Sequential(
          InceptionResnetV1(pretrained='vggface2'),
          nn.ReLU(inplace=True),
          nn.BatchNorm1d(512)
        )
        self.linear = nn.Sequential(
          nn.Linear(512, n_classes),
          nn.ReLU(inplace=True),
          nn.BatchNorm1d(n_classes)
        )

    def forward(self, x):
        """Return the class scores for a batch of images."""
        x = self.resnet(x)
        x = self.linear(x)

        # The return statement was previously dedented to class level, which is
        # a syntax error and left forward() without a return value.
        return x

In [0]:
net = ModifiedFaceNet()

# Freeze the backbone block. Assigning `requires_grad` on the module itself only
# creates an unused attribute; the flag has to be set on each parameter.
# NOTE(review): this also freezes the BatchNorm1d(512) inside `net.resnet`,
# which matches the scope of the original assignment - confirm that is intended.
for backbone_param in net.resnet.parameters():
    backbone_param.requires_grad = False

net = net.to(device)

# The optimizer and scheduler from the previous experiment are bound to the old
# model's parameters, so they would never update this network. Build fresh ones
# over the trainable parameters, reusing the same hyperparameters.
trainable_params = [param for param in net.parameters() if param.requires_grad]
optimizer = torch.optim.SGD(trainable_params, lr=lr_init, momentum=0.9, weight_decay=weight_decay_factor)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=lr_schedule_milestones, gamma=lr_factor)

train_iter = 0
train_losses = []
test_losses = []
steps = []

# simulate an infinite test data provider by looping over the test data
test_data_provider = loopy_test_loader(test_loader)

# set model in train mode
# NOTE(review): train mode also applies to the frozen backbone's layers;
# consider keeping `net.resnet` in eval mode if its statistics should stay fixed.
net.train()

running_loss = 0.0
running_acc = 0.0

for epoch in range(EPOCHS):  # loop over the dataset multiple times

    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Advance the per-iteration LR schedule only after the optimizer update.
        # Stepping the scheduler first skips the first value of the schedule.
        lr_scheduler.step()

        # Accumulate plain Python floats so that no device tensors are kept alive.
        running_loss += loss.item()
        running_acc += top_k_accuracy(1, labels, outputs).item()

        if train_iter % REPORT_TRAIN_EVERY == REPORT_TRAIN_EVERY - 1:    # print every REPORT_TRAIN_EVERY mini-batch iterations
            train_loss = running_loss / REPORT_TRAIN_EVERY
            train_acc = running_acc / REPORT_TRAIN_EVERY

            # Read the learning rate currently held by the optimizer's last
            # parameter group; this is the value the next update will use.
            current_lr = optimizer.param_groups[-1]['lr']

            print('[%d, %5d, %6d] LR: %.5f' % (epoch + 1, i + 1, train_iter, current_lr))
            print('[%d, %5d] loss: %.5f, acc: %.5f' %
                  (epoch + 1, i + 1, train_loss, train_acc))

            train_losses.append(train_loss)
            steps.append(train_iter)

            running_loss = 0.0
            running_acc = 0.0

        if train_iter % PLOT_EVERY == 0:
            plot_losses(train_losses, test_losses, steps, train_iter)

        train_iter += 1

        if train_iter % REPORT_TEST_EVERY == 0:
            # set model in test mode
            net.eval()

            with torch.no_grad():
                # evaluate over TEST_ITERS batches drawn from the looping test provider
                test_loss = 0.0
                correct = 0.0

                for _test_step in range(TEST_ITERS):
                    test_data = next(test_data_provider)

                    # get the test inputs; data is a list of [inputs, labels]
                    test_inputs, test_labels = test_data[0].to(device), test_data[1].to(device)

                    out = net(test_inputs)

                    # `.item()` keeps test_losses a list of floats, which can be
                    # plotted even when the model runs on the GPU.
                    test_loss += criterion(out, test_labels).item()
                    correct += top_k_accuracy(1, test_labels, out).item()

                avg_test_loss = test_loss / TEST_ITERS
                test_losses.append(avg_test_loss)

                avg_acc = correct / TEST_ITERS

                print('[%d, %5d] avg_test_loss: %.5f, avg_test_acc: %.2f'
                    % (epoch + 1, i + 1, avg_test_loss, avg_acc))

            # set model back in train mode
            net.train()

print('Finished Training')