In [34]:
#   Imports
import os
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader, Subset, random_split
from torch import nn
from torchvision import transforms, utils, datasets, models
from torchvision.models import ResNet50_Weights
from torchvision.io import read_image
from sklearn.metrics import multilabel_confusion_matrix
import math
import wandb
import gc

In [3]:
#   Variables
device = ("cpu") # Use GPU or CPU for training

In [12]:
wandb.init(
    # set the wandb project where this run will be logged
    project="my-awesome-project"
)

In [29]:
# Create the new model
class CaricatureClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
        self.model_wo_fc = nn.Sequential(*list(self.model.children())[:-1])

        self.cheekbones = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.cheeks = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.chin = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.ears = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.eyebrows = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.eyelids = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.eyes = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.facial_hair = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.forehead = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.hair = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.head = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.lips = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.mouth = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.neck = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.nose = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.skin = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.teeth = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        self.upper_lip = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        x = self.model_wo_fc(x)
        x = torch.flatten(x, 1)

        return [
            self.cheekbones(x),
            self.cheeks(x),
            self.chin(x),
            self.ears(x),
            self.eyebrows(x),
            self.eyelids(x),
            self.eyes(x),
            self.facial_hair(x),
            self.forehead(x),
            self.hair(x),
            self.head(x),
            self.lips(x),
            self.mouth(x),
            self.neck(x),
            self.nose(x),
            self.skin(x),
            self.teeth(x),
            self.upper_lip(x)
        ]

In [30]:
#   Create Dataloader
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
])

class CaricatureDataset(Dataset):
    def __init__(self, labels_file, root_dir, transform=None):
        self.labels = pd.read_csv(labels_file)
        self.root_dir = root_dir
        self.transform = transform
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        # This will read the first 5 images per person, this can probably be tweaked to include all images
        class_index = math.floor(idx/5)
        image_num = idx%5
        self_car = self.labels.iloc[class_index, 0].strip() + "_caricature"
        img_person = os.path.join(self.root_dir, self_car)
        labels = self.labels.iloc[class_index, 1].replace('[', '').replace(']', '').split('.')[:-1]
        labels = np.array([int(label) for label in labels])
        img = read_image(os.path.join(img_person, os.listdir(img_person)[image_num]))
        if img.shape[0] == 1:
            img = np.repeat(img, 3, axis=0)
        sample = img, labels
        return sample

In [31]:
#   Load the data
dataset = CaricatureDataset(labels_file='binary_labels.txt', root_dir='C:\\Users\\Jayam\\OneDrive\\Desktop\\School Work\\Grad school\\Research Work\\for Jay\\ourcar\\', transform=transform)
#   Split the data into train and test
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
#   Create the dataloaders
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

In [32]:
model = CaricatureClassifier()
loss_fn = torch.nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)

def train_one_epoch(epoch_index):
    running_loss = 0.
    last_loss = 0.

    # Here, we use enumerate(training_loader) instead of
    # iter(training_loader) so that we can track the batch
    # index and do some intra-epoch reporting
    for i, data in enumerate(train_loader):
        # Every data instance is an input + label pair
        inputs, labels = data
        del data
        inputs, labels = inputs.float(), labels.float()
        # Zero your gradients for every batch!
        optimizer.zero_grad()

        # Make predictions for this batch
        output = model(inputs)
        del inputs
        output = torch.stack(output, dim=1)
        output = output.squeeze(2)
        # Compute the loss and its gradients
        loss = loss_fn(output, labels)
        del output
        del labels
        loss.backward()

        gc.collect()
        torch.cuda.empty_cache()
        # Adjust learning weights
        optimizer.step()

        # Gather data and report
        running_loss += loss.item()
        if i % 10 == 9:
            last_loss = running_loss / 10 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            #tb_x = epoch_index * len(train_loader) + i + 1
            #tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            #print("Loss/train", last_loss, tb_x)
            running_loss = 0.

    return last_loss

In [33]:
EPOCHS = 10
epoch_number = 0

for epoch in range(EPOCHS):
    print('EPOCH {}:'.format(epoch_number + 1))

    # Make sure gradient tracking is on, and do a pass over the data
    model.train(True)
    avg_loss = train_one_epoch(epoch_number)

    # We don't need gradients on to do reporting
    model.train(False)

    running_vloss = 0.0
    for i, vdata in enumerate(test_loader):
        vinputs, vlabels = vdata
        del vdata
        vinputs, vlabels = vinputs.float(), vlabels.float()
        voutputs = model(vinputs)
        del vinputs
        voutputs = torch.stack(voutputs, dim=1)
        voutputs = voutputs.squeeze(2)
        vloss = loss_fn(voutputs, vlabels)
        del vlabels
        del voutputs
        running_vloss += vloss

        gc.collect()
        torch.cuda.empty_cache()
        
    avg_vloss = running_vloss / (i + 1)
    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

    # Log the running loss averaged per batch
    # for both training and validation
#     writer.add_scalars('Training vs. Validation Loss',
#                     { 'Training' : avg_loss, 'Validation' : avg_vloss },
#                     epoch_number + 1)
#     writer.flush()

    #wandb.log({ 'Training' : avg_loss, 'Validation' : avg_vloss })

    # Track best performance, and save the model's state
#     if avg_vloss < best_vloss:
#         best_vloss = avg_vloss
#         model_path = 'model_{}_{}'.format(timestamp, epoch_number)
#         torch.save(model.state_dict(), model_path)

    epoch_number += 1

EPOCH 1:
  batch 10 loss: 0.630774837732315
  batch 20 loss: 0.5431512057781219
  batch 30 loss: 0.49928970336914064
  batch 40 loss: 0.5066579073667526
LOSS train 0.5066579073667526 valid 0.4768437147140503
EPOCH 2:
  batch 10 loss: 0.42252751588821413
  batch 20 loss: 0.4167929828166962
  batch 30 loss: 0.3955605447292328
  batch 40 loss: 0.43686647415161134
LOSS train 0.43686647415161134 valid 0.45564791560173035
EPOCH 3:
  batch 10 loss: 0.3752384901046753
  batch 20 loss: 0.34369239807128904
  batch 30 loss: 0.34292306900024416
  batch 40 loss: 0.3685119181871414
LOSS train 0.3685119181871414 valid 0.4269651174545288
EPOCH 4:
  batch 10 loss: 0.2868205100297928
  batch 20 loss: 0.2739515021443367
  batch 30 loss: 0.28700122237205505
  batch 40 loss: 0.2815452665090561
LOSS train 0.2815452665090561 valid 0.43192604184150696
EPOCH 5:
  batch 10 loss: 0.24500191509723662
  batch 20 loss: 0.2728478744626045
  batch 30 loss: 0.23853281289339065
  batch 40 loss: 0.23040230125188826
LOSS

In [19]:
wandb.finish()

In [35]:
def testing(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    all_true_labels = np.array([])
    all_predictions = np.array([])
    all_true_labels_itemized = np.zeros([1,18])
    all_predictions_itemized = np.zeros([1,18])
    with torch.no_grad():
        print(len(test_loader))
        for data, target in test_loader:
            data, target = data.float(), target.float()
            output = model(data)
            outputs = torch.stack(output, dim=1)
            predictions = np.round(outputs.detach().cpu().numpy())
            labels = target.detach().cpu().numpy()
            all_true_labels = np.append(all_true_labels, labels)
            all_predictions = np.append(all_predictions, predictions)
            predictions = predictions.transpose()
            all_true_labels_itemized = np.concatenate((all_true_labels_itemized, labels), axis=0)
            all_predictions_itemized = np.concatenate((all_predictions_itemized, predictions[0].T), axis=0)
    for index, prediction in enumerate(all_predictions):
        if prediction == all_true_labels[index]:
            correct += 1
    print('Accuracy: ', correct/len(all_predictions))
    print('Confusion Matrix: ', multilabel_confusion_matrix(all_true_labels, all_predictions))
    return all_true_labels_itemized, all_predictions_itemized

In [36]:
labels, predictions = testing(model, device, test_loader)

11
Accuracy:  0.7859078590785907
Confusion Matrix:  [[[153  90]
  [ 68 427]]

 [[427  68]
  [ 90 153]]]
