In [None]:
import torch
import gc
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, models
from torch import nn
from torch import optim
import json
import PIL
from PIL import Image
import io
import cv2
import torchvision.transforms.functional as TF
from torchvision.utils import save_image
from torchvision.transforms import ToPILImage
from matplotlib import pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.utils.data as data
import torch.optim as optim
from torch.optim import lr_scheduler
from tqdm import tqdm
import time
import os
import copy
import pickle
import urllib.request
import requests
from matplotlib.pyplot import imshow
import random
from utils import *
from resnet50_ft_dims_2048 import *

In [None]:
# Show an image together with the classifier's top-1 prediction for it.
def imstats(name):
    """Display the image identified by `name` and print its predicted class.

    The image is read with `readim(name, forward_normalize)`, its first
    element is drawn with `imshow_tensor(..., inv_normalize)`, and the result
    of `lay2(pretrained_model(...))` is reduced with `.max(1)` to obtain the
    top class index and its score.

    NOTE(review): `readim`, `forward_normalize`, `imshow_tensor`,
    `inv_normalize`, `lay2` and `pretrained_model` are not defined in the
    visible cells -- presumably they come from `from utils import *`; confirm.
    NOTE(review): `class_dict` is indexed with the predicted class index here;
    verify that the mapping in scope is index -> display name.

    Parameters
    ----------
    name : passed straight to `readim` (presumably a path or URL -- confirm).

    Returns
    -------
    None. The prediction is printed.
    """
    # Read the image and show its first element.
    batch = readim(name, forward_normalize)
    imshow_tensor(batch[0], inv_normalize)

    # Forward pass on the GPU, then take the best-scoring class per row.
    scores = lay2(pretrained_model(batch.cuda()))
    best = scores.max(1)
    best_index = best.indices.item()
    confidence_pct = round(best.values.item() * 100, 4)
    print("Class is {} ({}) with confidence {}%".format(best_index, class_dict[best_index], confidence_pct))

In [None]:
# Side length, in pixels, of the square image produced by model_transform.
model_img_size = 224

# Resize -> tensor -> per-channel mean subtraction (std of 1 leaves the scale
# untouched).
# NOTE(review): ToTensor() precedes Normalize, yet the means are in the
# 90-130 range -- confirm the intended input scale before using this
# transform. The channel order is also the reverse of the `mean` tuple used by
# load_data; no visible cell uses model_transform.
model_transform = transforms.Compose(
    [
        transforms.Resize((model_img_size, model_img_size)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[91.4953, 103.8827, 131.0912],
            std=[1, 1, 1],
        ),
    ]
)

In [None]:
# Use the first CUDA device when one is available, otherwise stay on the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Build the backbone from the given weights file (resnet50_ft comes from the
# star-import of resnet50_ft_dims_2048), move it to `device`, and switch it to
# evaluation mode. The trailing expression is what the cell displays.
model = resnet50_ft("vgg_face_testimages/resnet50_ft_dims_2048.pth")
model.to(device)
model.eval()

In [None]:
def load_data(img, shape=None):
    """Resize, centre-crop and mean-subtract one PIL image.

    The image is converted to RGB, resized with bilinear resampling so that
    its shorter side becomes 224 pixels (aspect ratio kept, sizes rounded up),
    centre-cropped to `shape[0]` x `shape[1]`, and the module-level `mean` is
    subtracted from it.

    Parameters
    ----------
    img : PIL.Image.Image
        Source image; only `.size`, `.convert` and `.resize` are used.
    shape : sequence of int, optional
        Crop size as (height, width, ...); entries past the first two are
        ignored. Defaults to a 224 x 224 crop. (The original indexed the
        default `None` directly and raised `TypeError`.)

    Returns
    -------
    numpy.ndarray
        Array of shape (shape[0], shape[1], 3) holding `pixels - mean`.

    Raises
    ------
    ValueError
        If the requested crop does not fit inside the resized image.
    """
    short_size = 224.0
    crop_size = (int(short_size), int(short_size)) if shape is None else shape
    im_shape = np.array(img.size)    # PIL reports (width, height)
    img = img.convert('RGB')

    # Scale so that the shorter side ends up at `short_size` pixels.
    ratio = float(short_size) / np.min(im_shape)
    img = img.resize(size=(int(np.ceil(im_shape[0] * ratio)),   # width
                           int(np.ceil(im_shape[1] * ratio))),  # height
                     resample=PIL.Image.BILINEAR)

    x = np.array(img)  # array layout is (height, width, channel)
    newshape = x.shape[:2]
    h_start = (newshape[0] - crop_size[0]) // 2
    w_start = (newshape[1] - crop_size[1]) // 2
    if h_start < 0 or w_start < 0:
        # A negative start would make the slice below wrap around and return
        # a crop of the wrong size, so fail loudly instead.
        raise ValueError(
            "crop size {} does not fit inside the resized image {}".format(
                tuple(crop_size[:2]), tuple(newshape)))
    x = x[h_start:h_start + crop_size[0], w_start:w_start + crop_size[1]]
    # `mean` is the module-level per-channel tuple; it is looked up at call time.
    x = x - mean
    return x

def image_encoding(model, images):
    """Run `model` on a batch of PIL images and return its two outputs.

    Each image is preprocessed with `load_data` (224 x 224 crop), the batch is
    stacked, transposed from (N, H, W, C) to (N, C, H, W), and moved to the
    module-level `device`. The forward pass runs under `torch.no_grad()`: this
    function is only used for inference, and the original kept an autograd
    graph alive on the device for every batch.

    Parameters
    ----------
    model : callable
        Called as `model(tensor)`; its result is indexed with [0] and [1].
    images : sequence of PIL.Image.Image

    Returns
    -------
    classif : first model output, returned as produced by the model.
    face_feats : numpy.ndarray
        `output[1][:, :, 0, 0]` with every row scaled to unit L2 norm.
    """
    im_array = np.array([load_data(img=i, shape=(224, 224, 3)) for i in images])
    im_tensor = torch.Tensor(im_array.transpose(0, 3, 1, 2))
    im_tensor = im_tensor.to(device)
    with torch.no_grad():
        f = model(im_tensor)
    classif = f[0]
    # Drop the two trailing axes of the second output by taking element [0, 0].
    feat = f[1].detach().cpu().numpy()[:, :, 0, 0]
    # L2-normalise every feature row.
    face_feats = feat / np.sqrt(np.sum(feat ** 2, -1, keepdims=True))
    return classif, face_feats

In [None]:
def predict_features(images):
    """Encode `images` with the module-level `model`.

    Thin wrapper around `image_encoding(model, images)`; returns its
    `(classif, face_feats)` pair unchanged.
    """
    return image_encoding(model, images)

def fetch_images(paths):
    """Load every file in `paths` and return the PIL images in the same order.

    `Image.open` is lazy and keeps the file open until the pixel data is read.
    Each file is therefore opened in a `with` block and `copy()` is appended:
    the copy forces the decode and is independent of the file, so no handle
    outlives this function. (The original returned the lazily opened images
    and never closed them explicitly.)

    Parameters
    ----------
    paths : iterable
        Anything accepted by `PIL.Image.open`.

    Returns
    -------
    list of PIL.Image.Image
    """
    images = []
    for path in paths:
        with Image.open(path) as img:
            images.append(img.copy())
    return images

def process_dataset(dataset, batch_size=30):
    """Extract a feature vector and an integer label for every image path.

    Paths are processed in consecutive chunks of `batch_size`; each chunk is
    loaded with `fetch_images` and encoded with `predict_features`.

    The label is parsed from the parent directory name: the text after the
    last "n00" in that name is converted with `int` (so ".../n000012/x.jpg"
    gives 12). Paths must use "/" as separator.

    Parameters
    ----------
    dataset : sequence of str
        Image paths; must support `len()` and slicing.
    batch_size : int, optional
        Number of images encoded per forward pass. Defaults to 30, the value
        that was previously hard-coded.

    Returns
    -------
    svm_input : list
        One feature row per path, in input order.
    svm_labels : list of int
        The parsed label for each path.

    Raises
    ------
    ValueError
        If `batch_size` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    svm_input = []
    svm_labels = []
    for start in tqdm(range(0, len(dataset), batch_size)):
        # Slicing already clamps at the end of the sequence.
        batch = dataset[start:start + batch_size]
        images = fetch_images(batch)
        out, feats = predict_features(images)
        for path, feature_row in zip(batch, feats):
            svm_input.append(feature_row)
            svm_labels.append(int(path.split("/")[-2].split("n00")[-1]))
        # Drop the per-batch references before the next chunk is loaded.
        del images
        del feats
        del out
    return svm_input, svm_labels

In [None]:
def most_occuring_label(pred):
    """Return the class that is predicted most often across the rows of `pred`.

    A softmax is applied along dim 1, the highest-scoring class of every row
    is taken, and the distinct predicted classes are counted.

    Parameters
    ----------
    pred : torch.Tensor
        2-D tensor of per-row class scores.

    Returns
    -------
    tuple of three 0-dim tensors
        (most frequent class index, how many rows predicted it, total rows).
    """
    probabilities = torch.softmax(pred, dim=1)
    row_best = probabilities.max(1)
    labels, counts = row_best.indices.unique(return_counts=True)
    # Position of the largest count among the distinct labels.
    winner = counts.max(0)[1]
    return labels[winner], counts[winner], counts.sum()

In [None]:
# Per-channel values subtracted from every crop in load_data. load_data
# converts to RGB first, so the entries are applied to R, G, B in that order.
mean = (131.0912, 103.8827, 91.4953)
batch_size = 1  # NOTE(review): not referenced by any visible cell

vgg_test_dir = "/nobackup/vgg2face/test/"

# Images per identity held out for validation; the rest go to training.
VAL_IMAGES_PER_CLASS = 50
# Fixed seed and a private RNG so the split is identical on every run and the
# global `random` state is left alone. (The original shuffle was unseeded.)
SPLIT_SEED = 0
split_rng = random.Random(SPLIT_SEED)

# os.listdir returns entries in arbitrary order, so sort for determinism, and
# keep directories only: a stray file here would make the inner listdir fail.
class_dir_list = sorted(
    entry for entry in os.listdir(vgg_test_dir)
    if os.path.isdir(os.path.join(vgg_test_dir, entry))
)
train_dataset_paths = []
test_dataset_paths = []

for class_dir in class_dir_list:
    # Paths are built with "/" because process_dataset parses the label by
    # splitting on "/".
    class_dir = vgg_test_dir + class_dir + "/"
    all_images = sorted(class_dir + image for image in os.listdir(class_dir))
    split_rng.shuffle(all_images)
    test_dataset_paths += all_images[:VAL_IMAGES_PER_CLASS]
    train_dataset_paths += all_images[VAL_IMAGES_PER_CLASS:]

# Encode both splits into feature vectors and integer labels.
train_input, train_labels = process_dataset(train_dataset_paths)
test_input, test_labels = process_dataset(test_dataset_paths)

In [None]:
class ClassifierDataset(Dataset):
    """Dataset that pairs each entry of `X_data` with the matching `y_data` entry."""

    def __init__(self, X_data, y_data):
        # Both containers are stored as given; no copy or length check is made.
        self.X_data, self.y_data = X_data, y_data

    def __len__(self):
        """Number of samples, taken from the length of `X_data`."""
        return len(self.X_data)

    def __getitem__(self, index):
        """Return the `(features, target)` pair stored at `index`."""
        features = self.X_data[index]
        target = self.y_data[index]
        return features, target

# Stack the extracted feature rows into 2-D arrays.
X_train = np.array(train_input)
X_val = np.array(test_input)

# Map every distinct raw training label to a contiguous index 0..K-1
# (np.unique returns the labels sorted).
class_labels = np.unique(np.array(train_labels))
class_dict = {raw_label: position for position, raw_label in enumerate(class_labels)}

# Re-encode both label lists with that mapping. A validation label that never
# occurs in the training labels raises KeyError here.
y_train = np.array([class_dict[raw_label] for raw_label in train_labels])
y_val = np.array([class_dict[raw_label] for raw_label in test_labels])

# Wrap features (float) and targets (long) as datasets for the DataLoaders.
train_dataset = ClassifierDataset(
    torch.from_numpy(X_train).float(),
    torch.from_numpy(y_train).long(),
)
val_dataset = ClassifierDataset(
    torch.from_numpy(X_val).float(),
    torch.from_numpy(y_val).long(),
)

In [None]:
# Hyperparameters for the classifier trained in the cells below.
EPOCHS = 10  # number of passes the training loop makes over train_loader
BATCH_SIZE = 16  # mini-batch size given to the training DataLoader
LEARNING_RATE = 0.0007  # learning rate handed to optim.Adam
NUM_FEATURES = 2048  # input width of MulticlassClassification; presumably the backbone's feature size -- confirm
NUM_CLASSES = 500  # output width of MulticlassClassification; assumes at most 500 distinct labels -- TODO confirm against class_dict

In [None]:
# Training batches are reshuffled every epoch.
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=BATCH_SIZE,
                          shuffle=True)

# Validation keeps dataset order. It reuses BATCH_SIZE instead of a second
# hard-coded 16, so both loaders follow the single config constant.
val_loader = DataLoader(dataset=val_dataset,
                        batch_size=BATCH_SIZE,
                        shuffle=False)

from sklearn import svm

In [None]:
class MulticlassClassification(nn.Module):
    """Fully connected classifier: num_feature -> 1024 -> 512 -> 128 -> num_class.

    Every hidden layer is followed by batch normalisation and ReLU; dropout
    (p=0.2) is applied after the second and third hidden layers only. The
    output layer returns raw scores with no activation.
    """

    def __init__(self, num_feature, num_class):
        super().__init__()

        # Attribute names and creation order are kept as they were so that
        # state_dict keys and parameter initialisation order do not change.
        self.layer_1 = nn.Linear(num_feature, 1024)
        self.layer_2 = nn.Linear(1024, 512)
        self.layer_3 = nn.Linear(512, 128)
        self.layer_out = nn.Linear(128, num_class)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.2)
        self.batchnorm1 = nn.BatchNorm1d(1024)
        self.batchnorm2 = nn.BatchNorm1d(512)
        self.batchnorm3 = nn.BatchNorm1d(128)

    def forward(self, x):
        """Return the `num_class` raw scores for each row of `x`."""
        # Block 1: linear -> batch norm -> ReLU (no dropout).
        hidden = self.relu(self.batchnorm1(self.layer_1(x)))
        # Blocks 2 and 3: linear -> batch norm -> ReLU -> dropout.
        hidden = self.dropout(self.relu(self.batchnorm2(self.layer_2(hidden))))
        hidden = self.dropout(self.relu(self.batchnorm3(self.layer_3(hidden))))
        return self.layer_out(hidden)

In [None]:
# Build the classifier head and place it on the selected device
# (nn.Module.to returns the module itself).
model_nn = MulticlassClassification(NUM_FEATURES, NUM_CLASSES).to(device)

# Cross-entropy over the raw class scores, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model_nn.parameters(), lr=LEARNING_RATE)

def multi_acc(y_pred, y_test):
    """Return the batch accuracy as a percentage rounded to a whole number.

    Parameters
    ----------
    y_pred : torch.Tensor
        2-D tensor of per-class scores, one row per sample.
    y_test : torch.Tensor
        1-D tensor of target class indices.

    Returns
    -------
    torch.Tensor
        0-dim tensor holding the rounded percentage of rows whose
        highest-scoring class equals the target.
    """
    # The highest raw score is also the highest (log-)softmax value, so the
    # argmax can be taken on the scores directly.
    y_pred_tags = torch.argmax(y_pred, dim=1)

    correct_pred = (y_pred_tags == y_test).float()
    acc = correct_pred.sum() / len(correct_pred)

    # Scale to a percentage BEFORE rounding. The original rounded the fraction
    # first (`torch.round(acc) * 100`), so every batch reported 0 or 100.
    acc = torch.round(acc * 100)

    return acc

# Per-epoch history filled in by the training loop: one independent list per
# split for the mean accuracy and for the mean loss.
accuracy_stats = {split: [] for split in ("train", "val")}
loss_stats = {split: [] for split in ("train", "val")}

In [None]:
print("Begin training.")
for e in tqdm(range(1, EPOCHS+1)):

    # ---- Training pass: one optimiser step per mini-batch ----
    model_nn.train()
    train_epoch_loss = 0
    train_epoch_acc = 0
    for X_train_batch, y_train_batch in train_loader:
        X_train_batch = X_train_batch.to(device)
        y_train_batch = y_train_batch.to(device)

        optimizer.zero_grad()
        y_train_pred = model_nn(X_train_batch)
        train_loss = criterion(y_train_pred, y_train_batch)
        train_acc = multi_acc(y_train_pred, y_train_batch)
        train_loss.backward()
        optimizer.step()

        train_epoch_loss += train_loss.item()
        train_epoch_acc += train_acc.item()

    # ---- Validation pass: evaluation mode, no gradients ----
    model_nn.eval()
    val_epoch_loss = 0
    val_epoch_acc = 0
    with torch.no_grad():
        for X_val_batch, y_val_batch in val_loader:
            X_val_batch = X_val_batch.to(device)
            y_val_batch = y_val_batch.to(device)

            y_val_pred = model_nn(X_val_batch)
            val_loss = criterion(y_val_pred, y_val_batch)
            val_acc = multi_acc(y_val_pred, y_val_batch)

            val_epoch_loss += val_loss.item()
            val_epoch_acc += val_acc.item()

    # Epoch metrics are means over batches, so a smaller final batch carries
    # the same weight as a full one.
    mean_train_loss = train_epoch_loss / len(train_loader)
    mean_val_loss = val_epoch_loss / len(val_loader)
    mean_train_acc = train_epoch_acc / len(train_loader)
    mean_val_acc = val_epoch_acc / len(val_loader)

    loss_stats['train'].append(mean_train_loss)
    loss_stats['val'].append(mean_val_loss)
    accuracy_stats['train'].append(mean_train_acc)
    accuracy_stats['val'].append(mean_val_acc)

    print(f'Epoch {e:03}: | Train Loss: {mean_train_loss:.5f} | Val Loss: {mean_val_loss:.5f} | Train Acc: {mean_train_acc:.3f}| Val Acc: {mean_val_acc:.3f}')

In [None]:
# Write the trained classifier's weights (its state_dict only, not the
# backbone `model`) to a file in the current working directory.
torch.save(model_nn.state_dict(), "vgg2_test_classifier.pt")