In [0]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import models, transforms, datasets
import matplotlib.pyplot as plt
import time
import os
import copy
plt.ion()
from google.colab import drive
drive.mount('/content/drive')
from PIL import Image

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
!pip install git+https://github.com/aleju/imgaug
from imgaug import augmenters as iaa
import imgaug as ia

# Data Augmentation
- apply transforms to flip, crop, blur, scale, rotate, shear images and change hue / saturation

In [0]:
class ImgAugTransform:
  """Callable torchvision-style transform that applies a random imgaug pipeline to a PIL image."""

  def __init__(self):
    # The augmenters are applied in a random order on every call (random_order = True).
    self.aug = iaa.Sequential(
        [
            # horizontal flip for half of the images
            iaa.Fliplr(0.5),
            # crop away up to 10% of the image
            iaa.Crop(percent=(0, 0.1)),
            # mild blur for half of the images
            iaa.Sometimes(0.5, iaa.GaussianBlur(sigma = (0, 0.5))),
            iaa.AdditiveGaussianNoise(loc = 0, scale = (0.0, 0.05*255), per_channel = 0.5),
            # brightness change
            iaa.Multiply((0.8, 1.2), per_channel = 0.2),
            # geometric distortion: scale, shift, rotate, shear
            iaa.Affine(
                scale={"x": (0.8, 1.2), "y": (0.8, 1.2)},
                translate_percent={"x": (-0.2, 0.2), "y": (-0.2, 0.2)},
                rotate=(-25, 25),
                shear=(-8, 8)
            ),
            iaa.AddToHueAndSaturation(value = (-10, 10), per_channel = True),
            iaa.ContrastNormalization((0.75, 1.5))
        ],
        random_order = True
    )

  def __call__(self, img):
    """Augment one image.

    Args:
      img: image convertible with np.array (a PIL image inside the torchvision pipeline).

    Returns:
      The augmented image as a PIL image.
    """
    pixels = np.array(img)
    augmented = self.aug.augment_image(pixels)
    return Image.fromarray(augmented)

# Building Dataloaders
- transforms + resizing to 512 (no conceptual limit; can be increased with more memory)
- center-cropping didn't improve results but this may depend on the images
- folder structure should be train / val and then folders with the class names

In [0]:
# Shared preprocessing constants (used by both the training and the validation pipeline).
IMAGE_SIZE = (512, 512)
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
PHASES = ['train', 'val']

# Training images are additionally augmented; validation images are only resized and normalized.
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(IMAGE_SIZE),
        ImgAugTransform(),
        transforms.ToTensor(),
        transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    ]),
    'val': transforms.Compose([
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    ]),
}

## you may need to change this to your data directory
data_dir = 'drive/My Drive/couples'

# One ImageFolder dataset and one DataLoader per phase; sub-folder names are the class names.
image_datasets = {
    phase: datasets.ImageFolder(os.path.join(data_dir, phase), data_transforms[phase])
    for phase in PHASES
}
dataloaders = {
    phase: torch.utils.data.DataLoader(
        image_datasets[phase],
        shuffle = True,
        batch_size = 48,
        num_workers = 4,
    )
    for phase in PHASES
}

dataset_sizes = {phase: len(image_datasets[phase]) for phase in PHASES}
class_names = image_datasets['train'].classes
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Visualizing Batch
- takes one batch (with data augmentation) from the dataloader and visualizes it

In [0]:
def imshow(inp, title = None):
  """Display a normalized image tensor with matplotlib.

  Args:
    inp: tensor whose first axis is moved to the end for plotting (channels-first image).
    title: optional title passed to plt.title.
  """
  channel_mean = np.array([0.485, 0.456, 0.406])
  channel_std = np.array([0.229, 0.224, 0.225])

  # channels-first -> channels-last, then undo the Normalize transform
  image = inp.numpy().transpose((1, 2, 0))
  image = np.clip(channel_std*image + channel_mean, 0, 1)

  plt.imshow(image)
  if title is not None:
    plt.title(title)
  # short pause so the figure gets drawn
  plt.pause(0.001)
  
# Pull a single (augmented) batch from the training loader and tile it into one grid image.
inputs,classes = next(iter(dataloaders['train']))
out = torchvision.utils.make_grid(inputs)
# The title is the list of class names, one entry per image in the batch.
imshow(out, title = [class_names[x] for x in classes])

# Mixed Precision Training
- uses less memory, is faster and leads to slightly better generalization

In [0]:
!pip install git+https://github.com/NVIDIA/apex
from apex import amp

# Early Stopping
- stops training if validation loss did not improve for the last three epochs
- regularization technique

In [0]:
## https://github.com/Bjarten/early-stopping-pytorch

class EarlyStopping:
    """Signals that training should stop once validation loss has not improved for `patience` calls.

    Call the instance once per epoch with the validation loss; read `early_stop`
    afterwards. The instance is stateful: create a new one for every training run.
    """
    def __init__(self, patience = 7, verbose = False):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # No loss seen yet: start at +inf so the first verbose message does not
        # claim the loss "decreased" from 0.
        self.val_loss_min = float('inf')

    def __call__(self, val_loss, model):
        """Record one validation loss and update `counter` / `early_stop`.

        Args:
            val_loss (float): validation loss of the epoch that just finished.
            model: forwarded to `save_checkpoint`.
        """
        import math

        score = -val_loss

        if math.isnan(score):
            # NaN compares False against everything, so without this guard it would be
            # treated as an improvement and poison best_score, disabling early stopping.
            improved = False
        else:
            # Ties count as an improvement (same as a strict "worse than best" test).
            improved = self.best_score is None or score >= self.best_score

        if improved:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        '''Records the new best validation loss (writing the checkpoint to disk is currently disabled).'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        #torch.save(model.state_dict(), 'drive/My Drive/checkpoint.pt')
        self.val_loss_min = val_loss
        
# Module-level stopper that train_model reads and updates; it keeps its counter and best
# score between calls, so re-run this cell before starting a fresh training run.
early_stopping = EarlyStopping(patience=3, verbose=True)

# Training Loop
- keeps track of loss and accuracy
- saves the model with the highest validation accuracy

In [0]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
  """Train `model` and return it loaded with the weights of the best validation epoch.

  Each epoch runs a training phase and a validation phase over the module-level
  `dataloaders`. The module-level `early_stopping` object is fed the validation
  loss and can end training early. Backward passes go through apex `amp` loss scaling.

  Args:
    model: network to train (already moved to `device` and amp-initialized).
    criterion: loss function called as criterion(outputs, labels).
    optimizer: optimizer stepped once per training batch.
    scheduler: learning-rate scheduler stepped once per epoch.
    num_epochs: maximum number of epochs.

  Returns:
    The same model object with the best-validation-accuracy weights loaded.
  """
  since = time.time()
  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0.0
  
  for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))
    print('-'*10)
    
    for phase in ['train', 'val']:
      is_train = phase == 'train'
      # train(True) enables dropout / batch-norm updates, train(False) is eval mode
      model.train(is_train)
        
      running_loss = 0.0
      running_corrects = 0
      for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        
        # gradients are only tracked during the training phase
        with torch.set_grad_enabled(is_train):
          outputs = model(inputs)
          _, preds = torch.max(outputs, 1)
          loss = criterion(outputs, labels)
          
          if is_train:
            with amp.scale_loss(loss, optimizer) as scaled_loss:
              scaled_loss.backward()
            optimizer.step()
            
        # loss is a batch mean, so weight it by the batch size
        running_loss += loss.item()*inputs.size(0)
        running_corrects += torch.sum(preds == labels.data).item()

      if is_train:
        # Step the schedule AFTER the epoch's optimizer steps; stepping it first
        # skips the initial learning-rate value of the schedule.
        scheduler.step()
        
      epoch_loss = running_loss / dataset_sizes[phase]
      epoch_acc = running_corrects / dataset_sizes[phase]
      print('{} Loss: {:.4f} Acc: {:.4f}'.format(
          phase, epoch_loss, epoch_acc))
      
      if phase == 'val':
        # keep a copy of the best weights seen so far (ties keep the later epoch)
        if epoch_acc >= best_acc:
          best_acc = epoch_acc
          best_model_wts = copy.deepcopy(model.state_dict())
        early_stopping(epoch_loss, model)
        
    if early_stopping.early_stop:
      print("Early stopping")
      break
    print()
    
  time_elapsed = time.time() - since
  print('Training complete in {:.0f}m {:.0f}s'.format(
      time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc: {:4f}'.format(best_acc))
  model.load_state_dict(best_model_wts)
  return model

# Visualize Model Predictions
- grabs a few validation images and displays them together with their predicted class

In [0]:
def visualize_model(model, num_images = 6):
  """Plot up to `num_images` validation images together with their predicted class.

  Images come from the module-level `dataloaders['val']` and are laid out in a
  two-column grid. The model is switched to eval mode for the predictions and
  its previous train/eval mode is always restored afterwards.

  Args:
    model: trained network living on `device`.
    num_images: maximum number of images to show.

  Returns:
    The model (so a notebook cell ending in this call also echoes it).
  """
  if num_images <= 0:
    return model

  was_training = model.training
  model.eval()
  images_so_far = 0
  # round up so an odd num_images still fits into the two-column grid
  n_rows = (num_images + 1) // 2
  plt.figure()
  
  try:
    with torch.no_grad():
      for inputs, _ in dataloaders['val']:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        
        for j in range(inputs.size(0)):
          images_so_far += 1
          ax = plt.subplot(n_rows, 2, images_so_far)
          ax.axis('off')
          ax.set_title('predicted: {}'.format(class_names[preds[j]]))
          imshow(inputs.cpu().data[j])
          if images_so_far == num_images:
            return model
  finally:
    # restore the mode even if the loader runs out of images or plotting fails
    model.train(mode = was_training)
  return model

# Spatial Pyramid Pooling Layer & Power Mean Transformation
- creates a fixed-size representation from the convolutional outputs of variable-sized images
- uses different filters to represent bins with different features

In [0]:
## SPP from https://github.com/yueruchen/sppnet-pytorch/blob/master/spp_layer.py

import math

def spatial_pyramid_pool(previous_conv, num_sample, previous_conv_size, out_pool_size):
    '''
    Max-pools a feature map at several grid resolutions and concatenates the results.

    previous_conv: a tensor of the previous convolution layer, shape [num_sample, channels, height, width]
    num_sample: an int number of images in the batch
    previous_conv_size: an int vector [height, width] of the feature-map size of the previous convolution layer
    out_pool_size: a non-empty int vector with the number of bins per side for every pyramid level
    
    returns: a tensor with shape [num_sample x n], the concatenation of the flattened multi-level pooling results

    raises: ValueError if out_pool_size is empty
    '''    
    if len(out_pool_size) == 0:
        raise ValueError('out_pool_size must contain at least one pyramid level')

    height, width = previous_conv_size[0], previous_conv_size[1]
    levels = []

    for bins in out_pool_size:
        # window size (rounded up) so that `bins` windows cover the whole map ...
        h_wid = int(math.ceil(height / bins))
        w_wid = int(math.ceil(width / bins))
        # ... and the padding needed when the map size is not a multiple of `bins`
        h_pad = (h_wid*bins - height + 1)//2
        w_pad = (w_wid*bins - width + 1)//2
        
        ## tried to also concat average pooling but did not increase model performance, maybe dependent on application
        pooled = nn.functional.max_pool2d(previous_conv, (h_wid, w_wid),
                                          stride = (h_wid, w_wid), padding = (h_pad, w_pad))
        levels.append(pooled.view(num_sample, -1))
            
    # concatenate all levels once instead of growing the tensor inside the loop
    return torch.cat(levels, 1)

class PMT(nn.Module):
  """Placeholder module in front of the convolutions; currently passes its input through unchanged."""

  def __init__(self):
    super().__init__()

  def forward(self, x):
    """Return `x` untouched."""
    ## tried to apply PMT prior to first convolution, did not increase model performance
    #x_1 = torch.sign(x)*torch.log(1 + abs(x))
    #x_2 = torch.sign(x)*(torch.log(1 + abs(x)))**2
    #x = torch.cat((x, x_1, x_2), dim = 3)
    return x
  
class SPP(nn.Module):
  """Classification head: spatial pyramid pooling, power mean transformation, two linear layers.

  The defaults reproduce the original hard-coded head for ResNet-34 features
  (512 channels, pyramid levels 8/4/2/1 -> 2*512*85 = 2*43520 inputs, 100 hidden
  units, 2 classes, 8 dropout samples).

  Args:
    in_channels: number of channels of the incoming feature map.
    pool_sizes: bins per side for every pyramid level.
    hidden_dim: width of the hidden linear layer.
    num_classes: number of output classes.
    dropout: dropout probability in front of the output layer.
    num_dropout_samples: number of dropout masks whose outputs are averaged.
  """
  def __init__(self, in_channels = 512, pool_sizes = (8, 4, 2, 1), hidden_dim = 100,
               num_classes = 2, dropout = 0.5, num_dropout_samples = 8):
    super(SPP, self).__init__()
    self.pool_sizes = list(pool_sizes)
    self.num_dropout_samples = num_dropout_samples

    # every pyramid level contributes size*size bins per channel; the PMT step doubles the width
    bins_per_channel = sum(size*size for size in self.pool_sizes)
    self.lin1 = nn.Linear(2*in_channels*bins_per_channel, hidden_dim)
    
    self.relu = nn.ReLU()
    self.bn1 = nn.BatchNorm1d(hidden_dim)
    self.dp1 = nn.Dropout(dropout)
    self.lin2 = nn.Linear(hidden_dim, num_classes)
    
  def forward(self, x):
    """Map a feature map [batch, channels, height, width] to class scores [batch, num_classes]."""
    # SPP
    x = spatial_pyramid_pool(x, x.shape[0], [x.shape[2], x.shape[3]], self.pool_sizes)
    
    # PMT: signed log and signed squared log of the pooled features, concatenated
    sign = torch.sign(x)
    log_mag = torch.log(1 + abs(x))
    x = torch.cat((sign*log_mag, sign*log_mag**2), dim = 1)
    
    # fully connected classification part
    x = self.lin1(x)
    x = self.bn1(self.relu(x))
    
    # multi-sample dropout: average the output layer over several independent dropout masks
    samples = [self.lin2(self.dp1(x)) for _ in range(self.num_dropout_samples)]
    x = torch.mean(torch.stack(samples), dim = 0)
    
    return x

# Training the DeepConnection Model
- best results with a pretrained ResNet-34 model
- exchange the AdaptiveAvgPool layer with a spatial pyramid pooling layer
- add power mean transformation after SPP
- add some linear layers (initialized with Xavier initialization) for classification

In [0]:
# initialization for linear layers
def init_weights(m):
    """Xavier-initialize the weight and zero the bias of a linear layer.

    Meant to be used with `Module.apply`; modules that are not `nn.Linear` are left untouched.
    """
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        # layers created with bias=False have no bias tensor
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)

import warnings
# silence all Python warnings for the rest of the notebook
warnings.filterwarnings("ignore")

# ImageNet-pretrained backbone
model = models.resnet34(pretrained = True)

# freeze ResNet-34 layers
# The attribute is `requires_grad`; assigning to a misspelled name only creates an unused
# attribute and leaves every layer trainable.
# With the backbone frozen, its feature maps do not carry gradients on their own: code that
# differentiates with respect to them has to call `requires_grad_()` on the features first.
for param in model.parameters():
  param.requires_grad = False
  

# DeepConnection
class PMT_SPP(nn.Module):
  """DeepConnection network: PMT stub, convolutional part of the module-level `model`, SPP head."""

  def __init__(self):
    super().__init__()
    # attribute order defines the order of children(): fc, convs, fc2
    self.fc = PMT()
    # all children of the backbone except the last two
    backbone_layers = list(model.children())[:-2]
    self.convs = nn.Sequential(*backbone_layers)
    self.fc2 = SPP()

  def forward(self, x):
    """Run the input batch through fc -> convs -> fc2 and return the class scores."""
    features = self.convs(self.fc(x))
    return self.fc2(features)
  
  
# instantiate model, put on GPU and initialize linear layers 
deep_connection = PMT_SPP().cuda()
# init_weights only touches nn.Linear modules, and it is applied to the head (fc2) only
deep_connection.fc2.apply(init_weights)
  
criterion = nn.CrossEntropyLoss()
# Adam over all parameters of the network, with weight decay
optimizer_ft = optim.Adam(deep_connection.parameters(),lr = 0.0001, weight_decay = 1e-1)
# cosine learning-rate schedule; the second argument is T_max
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer_ft, 15)
# wrap model and optimizer for apex mixed-precision training (opt_level O1)
model_ft, optimizer_ft = amp.initialize(deep_connection, optimizer_ft, opt_level = "O1")

# runs the training loop; train_model returns the model with the best validation weights loaded
model_ft = train_model(model_ft, criterion, optimizer_ft, scheduler,
                    num_epochs = 16)

# Hook Function to get Inputs / Outputs of Intermediate Layers
- replace `input` with `output.detach()` inside the hook to save the outputs of intermediate layers instead of their inputs

In [0]:
# layer name -> tuple with the inputs of that layer from the most recent forward pass
activation = {}

def get_activation(name):
    """Build a forward hook that stores the hooked layer's inputs under `activation[name]`.

    The stored value is the tuple of positional inputs of the layer, with every tensor
    detached so the dictionary does not keep autograd graphs alive.
    """
    def hook(model, input, output):
        activation[name] = tuple(t.detach() if torch.is_tensor(t) else t for t in input)
    return hook

## save all inputs to layers subsequent to convolutional layers
# named_modules() walks the head (fc2) itself and all of its sub-modules; the hook of each
# one stores under that module's name. Hooks are never removed here, so re-running this
# cell registers them again.
for name, layer in model_ft.fc2.named_modules():
    layer.register_forward_hook(get_activation(name))

# Test Model on a Single New Image
- the hooks store the designated inputs / outputs once you run this cell

In [0]:
# need to change that to your directory
example = 'drive/My Drive/couples/couple-2436263_960_720.jpg'

imsize = (512,512)
# Same preprocessing as the validation pipeline. `transforms.Resize` replaces the
# deprecated `transforms.Scale` alias, which newer torchvision releases no longer provide.
loader = transforms.Compose([transforms.Resize(imsize), 
                             transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

def image_loader(image_name):
    """Load an image file and return it as a preprocessed CUDA tensor with a batch axis.

    Args:
        image_name: path of the image file.

    Returns:
        Float tensor on the GPU: the output of the module-level `loader` pipeline with a
        leading batch dimension of size 1.
    """
    # Force three channels so grayscale / RGBA files match the 3-channel Normalize,
    # and close the file handle once the pixels are read.
    with Image.open(image_name) as img:
        image = img.convert('RGB')
    image = loader(image).float()
    image = image.unsqueeze(0)
    return image.cuda()
  
example_img = image_loader(example)
# Inference on a single image: eval mode is required (BatchNorm1d cannot compute batch
# statistics from one sample, and dropout must be off), and no autograd graph is needed.
model_ft.eval()
with torch.no_grad():
  out = model_ft(example_img)
_, preds = torch.max(out, 1)

print('predicted: {}'.format(class_names[preds]))
imshow(example_img.cpu().data[0])

# Create State of Activation Plots
- plots class-specific weights of final linear layer (100, 2) as scatter plot, which are constant for a trained model
- colors points according to the input value of a given image stored in the hooks
- multiplying the weights by the input values and summing (plus the bias) yields the class scores (logits); a softmax turns these into class probabilities

In [0]:
# weight matrix of the final linear layer as a NumPy array (nn.Linear stores it as out_features x in_features)
ws2 = model_ft.fc2.lin2.weight.data.detach().cpu().numpy()

plt.style.use('ggplot')

cm = plt.cm.get_cmap('YlOrRd')

# One point per hidden unit: x / y are its weights towards output 0 / output 1, the colour is
# the value that unit fed into lin2 for the first sample of the last forward pass (from the hook).
# NOTE(review): the axis labels assume output 0 is the "happy" class and output 1 the "unhappy" one — confirm against class_names.
sc = plt.scatter(ws2.T[:, 0], ws2.T[:, 1], c = activation['lin2'][0].cpu().detach().numpy()[0], vmax = 1.5, vmin = -1,
                 cmap = cm, alpha = 0.8, s = 30, edgecolor = 'gray')
plt.xlabel('Happy Weight')
plt.ylabel('Unhappy Weight')
plt.title('Activation of Neurons in Last Layer')
cb = plt.colorbar(sc)
cb.set_label('Incoming Activation Values')

plt.show()

# Visualizes Exemplary Model Predictions and Model Architecture

In [0]:
# Plots predictions for a few validation images. When visualize_model returns the model,
# the notebook echoes it as the cell result, which prints the architecture.
visualize_model(model_ft)

# Prepare Model for GradCAM
- need to separate convolutional part from rest (because we want the gradients at the end of the convolutional part)

In [0]:
# Split the trained network by its top-level children: all but the last child form the
# feature extractor, the last child forms the classifier.
# NOTE(review): nn.Sequential reuses the same module objects and Module.half() converts
# parameters in place, so model_ft itself presumably ends up in half precision after this
# cell — confirm this is intended.
modulelist_conv = nn.Sequential(*list(model_ft.children())[:-1]).cuda().half()
modulelist_fc = nn.Sequential(*list(model_ft.children())[-1:]).cuda().half()
# bare expression so the notebook displays the classifier part
modulelist_fc

# GradCAM
- function to create heatmap based on convolutional gradients, indicating saliency

In [0]:
## from https://github.com/eclique/pytorch-gradcam/blob/master/gradcam.ipynb

def GradCAM(img, c, features_fn, classifier_fn):
    """Compute a Grad-CAM saliency map for one image and one class.

    Args:
        img: image batch tensor; only the first sample is used.
        c: index of the class whose score is explained.
        features_fn: callable mapping the image to a feature map [1, N, H, W]. If it has
            parameters (an nn.Module), the image is moved to their device and dtype first.
        classifier_fn: callable mapping the feature map to class scores [1, num_classes].

    Returns:
        Non-negative float32 NumPy array of shape [H, W].
    """
    # Match the image to the feature extractor (e.g. CUDA + half precision) instead of
    # hard-coding the conversion.
    reference = next(iter(features_fn.parameters()), None) if hasattr(features_fn, 'parameters') else None
    if reference is not None:
        img = img.to(device = reference.device, dtype = reference.dtype)

    # Treat the feature map as a leaf that requires grad, so the gradient with respect to it
    # exists even when the feature extractor's parameters are frozen.
    feats = features_fn(img).detach().requires_grad_(True)
    _, N, H, W = feats.size()
    
    out = classifier_fn(feats)
    c_score = out[0, c]
    grads = torch.autograd.grad(c_score, feats)
    # channel weights: gradient of the class score averaged over the spatial positions
    w = grads[0][0].mean(-1).mean(-1)
    
    # weighted sum of the feature channels, then keep only positive evidence
    sal = torch.matmul(w, feats[0].reshape(N, H*W))
    sal = sal.view(H, W).float().cpu().detach().numpy()
    sal = np.maximum(sal, 0)
    
    return sal

# Create GradCAM Heatmaps
- grab an image and plot it plus overlaying GradCAM heatmaps for both classes

In [0]:
model_ft.eval()

# path -> normalized image tensor with a leading batch axis of size 1
read_tensor = transforms.Compose([
    # force three channels so grayscale / RGBA files match the 3-channel Normalize
    lambda x: Image.open(x).convert('RGB'),
    transforms.Resize((512, 512)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                          std=[0.229, 0.224, 0.225]),
    lambda x: torch.unsqueeze(x, 0)
])

def get_class_name(c):
  """Return the display name for class index 0 or 1; any other value yields None."""
  for index, label in enumerate(('Happy Couple', 'Unhappy Couple')):
    if c == index:
      return label
  return None

## change that to your working directory
direc = 'drive/My Drive/couples/couple-2436263_960_720.jpg'

dil = read_tensor(direc)

# get prediction probabilities and corresponding classes
# (pure inference, so no autograd graph is needed)
with torch.no_grad():
    pp, cc = torch.topk(nn.Softmax(dim = 1)(model_ft(dil.cuda())), 2)

plt.figure(figsize = (15, 5))

# the background image is the same for every panel, so open it once
img = Image.open(direc)

# one panel per class, ordered by predicted probability
for i, (p, c) in enumerate(zip(pp[0], cc[0])):
    plt.subplot(1, 2, i+1)
    
    sal = GradCAM(dil.cuda(), int(c), modulelist_conv, modulelist_fc)
    # Explicit float32: the feature extractor runs in half precision, and a float16 array
    # may not be accepted by Image.fromarray.
    sal = Image.fromarray(np.asarray(sal, dtype = np.float32))
    # upsample the coarse saliency map to the size of the original image
    sal = sal.resize(img.size, resample = Image.LANCZOS)

    plt.title('{}: {:.1f}%'.format(get_class_name(c), 100*float(p)))
    plt.axis('off')
    plt.imshow(img)
    plt.imshow(np.array(sal), alpha = 0.5, cmap = 'jet')
    
plt.show()