In [1]:
# imports

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import DataLoader, Subset

import os
import PIL.Image as Image
from tqdm import tqdm

In [2]:
# Select the compute device once for the whole notebook: CUDA when
# torch reports it as available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Bare expression so the cell output shows which device was picked.
device

In [3]:
# Root directory of the competition data. get_dataframe() looks for the
# 'train'/'test' image folders and 'train.csv'/'test.csv' underneath it.

data_dir = '../input/petfinder-pawpularity-score'

def get_dataframe(data_dir, is_train=True):
    """Read one split's CSV and attach the file path of every image.

    Args:
        data_dir: Directory containing the ``train``/``test`` image folders
            and the matching ``train.csv``/``test.csv`` files.
        is_train: Truthy selects the train split, anything else the test split.

    Returns:
        pandas.DataFrame: the CSV contents plus an ``img_file_path`` column
        holding ``<data_dir>/<split folder>/<Id>.jpg`` for each row.
    """
    folder_name, csv_name = ('train', 'train.csv') if is_train else ('test', 'test.csv')
    image_dir = os.path.join(data_dir, folder_name)

    def _to_image_path(image_id):
        # Image files are named after the value in the row's 'Id' column.
        return os.path.join(image_dir, f'{image_id}.jpg')

    df = pd.read_csv(os.path.join(data_dir, csv_name))
    df['img_file_path'] = df['Id'].apply(_to_image_path)
    return df

# Load both splits for inspection. load_data() and evaluate() call
# get_dataframe() again themselves rather than reusing these frames.
train_df = get_dataframe(data_dir, is_train=True)
test_df = get_dataframe(data_dir, is_train=False)

# Preview the first rows of the training table in the cell output.
train_df.head()

In [4]:
# define custom dataset for pytorch

class PawpularityDataset(Dataset):
    """Map-style dataset yielding ``(image, covariates, target)`` per index.

    Args:
        image_filepaths: Indexable collection of image file paths.
        covariates: Indexable collection; entry ``idx`` holds the covariates
            belonging to image ``idx``.
        targets: Indexable collection of per-image targets.
        transform: Callable applied to the RGB ``PIL`` image.
    """

    def __init__(self, image_filepaths, covariates, targets, transform):
        # The attribute name `covaraites_all` is misspelled but kept as is,
        # since code outside this class may read it.
        self.covaraites_all = covariates
        self.image_filepaths = image_filepaths
        self.transform = transform
        self.targets = targets

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.image_filepaths)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            tuple: ``(transformed image, covariate tensor, target tensor)``.
        """
        path = self.image_filepaths[idx]
        covariate_tensor = torch.tensor(self.covaraites_all[idx])
        target_tensor = torch.tensor(self.targets[idx])

        # Decode and transform while the file handle is still open, because
        # PIL reads the pixel data lazily.
        with open(path, 'rb') as handle:
            rgb_image = Image.open(handle).convert('RGB')
            transformed = self.transform(rgb_image)

        return transformed, covariate_tensor, target_tensor

In [5]:
# dataloader

def load_data(data_dir, batch_size=32, is_train=True, use_subset=False, subset_size=500):
    """Build a ``DataLoader`` over the train or the test split.

    The train loader shuffles and applies a random horizontal flip. The test
    loader does neither: its batches must come out in CSV row order so that
    predictions can be paired with the ``Id`` column, and inference should be
    deterministic.

    Args:
        data_dir: Directory passed on to ``get_dataframe``.
        batch_size: Number of samples per batch.
        is_train: Truthy for the train split, otherwise the test split.
        use_subset: When true, only the first ``subset_size`` samples are used.
        subset_size: Upper bound on the subset length (clamped to the dataset
            size so a small split cannot be indexed out of range).

    Returns:
        torch.utils.data.DataLoader yielding ``(image, covariates, target)``.
    """
    df = get_dataframe(data_dir, is_train=bool(is_train))
    images = df['img_file_path'].to_numpy()

    if is_train:
        targets = df['Pawpularity'].to_numpy()
    else:
        # No target column is read for the test split; use numeric
        # placeholders so the dataset still yields a target tensor.
        targets = np.zeros(len(images), dtype=np.float32)

    # covariates [2:13]
    # But here for computational complexity we will only choose a few
    selected_columns = ['Accessory', 'Collage', 'Human']
    covariates = df.loc[:, selected_columns].to_numpy()

    steps = [transforms.Resize((224, 224))]
    if is_train:
        # Augmentation belongs to training only.
        steps.append(transforms.RandomHorizontalFlip())
    steps += [
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
    transform = transforms.Compose(steps)

    dataset = PawpularityDataset(image_filepaths=images, covariates=covariates,
                                 targets=targets, transform=transform)

    if use_subset:
        subset_len = min(subset_size, len(dataset))
        dataset = Subset(dataset, list(range(subset_len)))

    # Shuffle only while training; evaluation relies on a stable order.
    return DataLoader(dataset=dataset,
                      batch_size=batch_size,
                      shuffle=bool(is_train))


# Loaders over the complete train and test splits, using load_data's
# default batch size (32) and no subsetting.
train_loader = load_data(data_dir, is_train=True, use_subset=False)
test_loader = load_data(data_dir, is_train=False)

In [6]:
# VGG 16 baseline model

class VGG(nn.Module):
    """VGG network: convolutional feature stack plus a three-layer linear head.

    Args:
        features: Module producing the convolutional feature map. The head
            expects 512 channels, since it flattens a 512 x 7 x 7 tensor.
        num_classes: Output width of the last linear layer.
        init_weights: When true, every Conv2d / BatchNorm2d / Linear layer is
            re-initialised by ``_initialize_weights``.
    """

    def __init__(self, features, num_classes=1000, init_weights=True):
        super().__init__()
        self.features = features
        # Pool to a fixed 7x7 grid so the head sees a constant input size.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        head_layers = [
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)
        if init_weights:
            self._initialize_weights()

    def forward(self, x):
        """Run features -> adaptive pooling -> flatten -> classifier."""
        pooled = self.avgpool(self.features(x))
        flat = torch.flatten(pooled, 1)
        return self.classifier(flat)

    def _initialize_weights(self):
        """Initialise all sub-modules in place, visiting them in ``modules()`` order."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                # Small-variance normal weights, zero bias.
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.BatchNorm2d):
                # Start batch norm as the identity affine transform.
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)

def make_layers(cfg, batch_norm=False):
    """Translate a VGG layer spec into an ``nn.Sequential`` feature stack.

    Args:
        cfg: Iterable where ``'M'`` means a 2x2 max-pool with stride 2 and any
            other entry is the output channel count of a 3x3 convolution
            (padding 1) followed by ReLU.
        batch_norm: When true, a ``BatchNorm2d`` is inserted between each
            convolution and its ReLU.

    Returns:
        nn.Sequential of the generated layers; the first convolution takes
        3 input channels.
    """
    modules = []
    channels = 3
    for spec in cfg:
        if spec == 'M':
            modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue

        modules.append(nn.Conv2d(channels, spec, kernel_size=3, padding=1))
        if batch_norm:
            modules.append(nn.BatchNorm2d(spec))
        modules.append(nn.ReLU(inplace=True))
        # The next convolution consumes what this one produced.
        channels = spec
    return nn.Sequential(*modules)


# Layer spec consumed by make_layers(): an integer is the output channel
# count of a 3x3 convolution, 'M' is a 2x2 max-pooling layer.
cfg_vgg16 = [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M']

# Architecture name -> local file with weights pretrained on ImageNet.
# _vgg() looks the file up by its `arch` argument.
model_path = {
    'vgg16_bn': '../input/vgg16-pretrained-models/vgg16_bn_pretrained.pth'
}

def _vgg(arch, cfg, batch_norm, pretrained, progress, **kwargs):
    """Build a ``VGG`` model and optionally load locally stored weights.

    Args:
        arch: Key into ``model_path`` naming the weight file to load.
        cfg: Layer spec handed to ``make_layers``.
        batch_norm: Whether ``make_layers`` inserts batch-norm layers.
        pretrained: When true, random weight initialisation is skipped and
            the state dict stored at ``model_path[arch]`` is loaded.
        progress: Unused; kept so the signature stays unchanged.
        **kwargs: Forwarded to the ``VGG`` constructor.

    Returns:
        The constructed ``VGG`` instance (on the CPU).
    """
    if pretrained:
        # The weights are overwritten right below, so skip the init pass.
        kwargs['init_weights'] = False
    model = VGG(make_layers(cfg, batch_norm=batch_norm), **kwargs)
    if pretrained:
        # The freshly built model lives on the CPU, so map the stored tensors
        # there as well. Without map_location, a checkpoint saved from a GPU
        # cannot be deserialised on a CPU-only machine.
        state_dict = torch.load(model_path[arch], map_location='cpu')
        model.load_state_dict(state_dict)
    return model

def vgg16_bn(pretrained=False, progress=True, **kwargs):
    """Build the (customized) VGG 16-layer model, configuration "D", with batch norm.

    Architecture from "Very Deep Convolutional Networks For Large-Scale Image
    Recognition" <https://arxiv.org/pdf/1409.1556.pdf>.

    Args:
        pretrained (bool): If True, ``_vgg`` loads the weights from the local
            file registered under ``'vgg16_bn'`` in ``model_path`` (described
            there as ImageNet-pretrained). Nothing is downloaded.
        progress (bool): Passed through to ``_vgg``, which does not use it.
        **kwargs: Extra keyword arguments passed through to ``_vgg``.

    Returns:
        The model built by ``_vgg``.
    """
    return _vgg(
        arch='vgg16_bn',
        cfg=cfg_vgg16,
        batch_norm=True,
        pretrained=pretrained,
        progress=progress,
        **kwargs,
    )

In [9]:
# VGG 16 Hybrid Model

#########################
# The hybrid Conv2d layer
#########################

class Hybrid_Conv2d(nn.Module):
    """Bias-free 2D convolution whose kernel depends on per-image covariates.

    For image ``i`` the kernel is ``W_0 + sum_r cov[i, r] * W[r]``, where
    ``W_0`` and every ``W[r]`` have the 4D shape given by ``kernel_size``.

    Args:
        channel_in: Stored on the instance; not used by the computation.
        channel_out: Stored on the instance; not used by the computation.
        kernel_size: Full 4D weight shape ``(out_channel, in_channel, height, width)``.
        num_cov: Number of covariates, i.e. number of ``W[r]`` kernels.
        stride: Stride passed to ``F.conv2d``.
        padding: Padding passed to ``F.conv2d``.
    """

    def __init__(self, channel_in, channel_out, kernel_size, num_cov, stride=1, padding=0):
        super().__init__()
        self.kernel_size = kernel_size  # 4D weight (out_channel, in_channel, height, width)
        self.channel_in = channel_in
        self.channel_out = channel_out
        self.stride = stride
        self.padding = padding
        self.num_cov = num_cov  # number of covariates

        self.W_0 = nn.Parameter(torch.randn(kernel_size), requires_grad=True)
        # A ParameterList (rather than a plain Python list) registers the
        # covariate kernels with the module. They then show up in
        # parameters() and state_dict() and follow .to(device).
        self.W = nn.ParameterList(
            [nn.Parameter(torch.randn(kernel_size), requires_grad=True)
             for _ in range(self.num_cov)]
        )

        self._initialize_weights()

    # weight initialization
    def _initialize_weights(self):
        """Kaiming-normal initialisation for the base and covariate kernels."""
        nn.init.kaiming_normal_(self.W_0, mode='fan_out', nonlinearity='relu')
        for r in range(self.num_cov):
            nn.init.kaiming_normal_(self.W[r], mode='fan_out', nonlinearity='relu')

    def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
                              missing_keys, unexpected_keys, error_msgs):
        """Accept checkpoints that lack the covariate kernels.

        State dicts saved while ``W`` was an unregistered list contain only
        ``W_0``. For every missing ``W.<r>`` entry the current value is
        inserted, so a strict load still succeeds and that kernel simply
        keeps its initialisation.
        """
        absent = []
        for r, weight in enumerate(self.W):
            key = f'{prefix}W.{r}'
            if key not in state_dict:
                state_dict[key] = weight.detach().clone()
                absent.append(key)
        if absent:
            import warnings
            warnings.warn(
                f"Checkpoint has no entries for {absent}; "
                "these covariate kernels keep their initial values."
            )
        super()._load_from_state_dict(state_dict, prefix, local_metadata, strict,
                                      missing_keys, unexpected_keys, error_msgs)

    def forward(self, x, cov):
        """Convolve every image with its own covariate-dependent kernel.

        Args:
            x: Batch of images, ``(batchsize, channel, height, width)``.
            cov: 2D tensor ``(batchsize, r)`` with ``r <= num_cov`` covariates
                per image; column ``j`` weights kernel ``W[j]``.

        Returns:
            Tensor with the per-image convolution results concatenated along
            the batch dimension.

        Raises:
            ValueError: If ``x`` and ``cov`` disagree on the batch size.
            IndexError: If ``cov`` has more columns than there are kernels.
        """
        batch = cov.shape[0]
        if x.shape[0] != batch:
            raise ValueError(
                f"x has {x.shape[0]} images but cov has {batch} rows")
        num_used = cov.shape[1]
        if num_used > len(self.W):
            raise IndexError(
                f"cov has {num_used} columns but only {len(self.W)} covariate kernels exist")

        # Follow the parameters instead of a hard-coded 'cuda:0', so the layer
        # runs on whatever device the module was moved to (including the CPU).
        cov = cov.to(device=self.W_0.device, dtype=self.W_0.dtype)

        # Per-image kernels, shape (batch, out_channel, in_channel, height, width).
        kernels = self.W_0.unsqueeze(0).expand(batch, *self.W_0.shape)
        for j in range(num_used):
            kernels = kernels + cov[:, j].view(-1, 1, 1, 1, 1) * self.W[j]

        # x[i:i + 1] keeps the batch dimension F.conv2d expects.
        outputs = [
            F.conv2d(x[i:i + 1], kernels[i], stride=self.stride, padding=self.padding)
            for i in range(batch)
        ]
        return torch.cat(outputs)

class HybridVGG16(nn.Module):
    """
    Hybrid Vgg16_bn network: a pretrained vgg16_bn whose FIRST conv layer is
    replaced by a Hybrid_Conv2d layer that also receives the covariates.
    """

    def __init__(self):
        super().__init__()
        # vgg16_bn with its locally stored pretrained weights
        backbone = vgg16_bn(pretrained=True)

        # Drop only entry 0 of the feature stack (the first convolution);
        # everything after it, starting with that convolution's batch norm
        # and ReLU, is reused unchanged.
        self.features = backbone.features[1:]
        self.avgpool = backbone.avgpool
        self.classifier = backbone.classifier

        # Replacement for the removed convolution: 3 -> 64 channels, 3x3
        # kernels, three covariates, Hybrid_Conv2d's default stride/padding.
        self.hybrid_conv = Hybrid_Conv2d(3, 64, kernel_size=(64, 3, 3, 3), num_cov=3)

    def forward(self, x, cov):
        """Hybrid conv (image + covariates), then the remaining VGG pipeline."""
        feature_map = self.features(self.hybrid_conv(x, cov))
        pooled = self.avgpool(feature_map)
        return self.classifier(torch.flatten(pooled, 1))

In [10]:
# Load pre-trained model (skip train)
# note: no need for loss function and optimizer anymore

# evaluate
def evaluate(test_loader, data_dir, model_name, device):
    """Load a fine-tuned checkpoint and predict Pawpularity for the test split.

    Args:
        test_loader: DataLoader yielding ``(images, covariates, labels)``
            batches in the same order as the rows of the test CSV.
        data_dir: Directory passed to ``get_dataframe`` to read the test Ids.
        model_name: ``"baseline"`` (plain vgg16_bn) or ``"hybrid"`` (HybridVGG16).
        device: Device on which inference runs.

    Returns:
        pandas.DataFrame with the columns ``Id`` and ``Pawpularity``.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == "baseline":
        model = vgg16_bn(pretrained=False)  # this "False" is w.r.t. ImageNet pretrained weights
        checkpoint_path = '../input/vgg16-pretrained-models/vgg16_baseline_checkpoint.ckpt'
    elif model_name == "hybrid":
        model = HybridVGG16()
        checkpoint_path = '../input/vgg16-pretrained-models/vgg16_hybrid.ckpt'
    else:
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected 'baseline' or 'hybrid'")
    takes_covariates = model_name == "hybrid"

    # Swap the final classifier layer for a single-output regression head so
    # the architecture matches the checkpoint being loaded.
    num_ftrs = model.classifier[6].in_features
    model.classifier[6] = nn.Linear(num_ftrs, 1)

    # Load on the CPU first and move the whole model afterwards. Moving before
    # replacing the head would leave the new head on the CPU while the rest of
    # the network sits on the GPU.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
    model = model.to(device)
    model.eval()

    # Predictions are paired with the Ids purely by position, so a shuffling
    # loader would silently assign scores to the wrong images.
    from torch.utils.data import RandomSampler
    if isinstance(getattr(test_loader, 'sampler', None), RandomSampler):
        import warnings
        warnings.warn(
            "test_loader shuffles its samples; predictions will not line up "
            "with the Id column. Build the loader with shuffle=False.")

    print('Making predictions...')

    test_pred = []
    test_df = get_dataframe(data_dir, is_train=False)

    with torch.no_grad():
        for test_images, covariates, _ in tqdm(test_loader):
            test_images = test_images.to(device).float()
            # forward pass
            if takes_covariates:
                covariates = covariates.to(device).float()
                outputs = model(test_images, covariates)  # hybrid model takes covariate here
            else:
                outputs = model(test_images)              # baseline vgg

            # Flatten (batch, 1) -> (batch,). Unlike squeeze(), this still
            # yields a list when the batch holds a single image.
            test_pred.extend(outputs.detach().cpu().reshape(-1).tolist())

    # Assemble the submission table; writing it to disk is left to the caller.
    output_df = pd.DataFrame({"Id": test_df['Id'], "Pawpularity": test_pred})

    return output_df


# Choose which checkpoint to evaluate; evaluate() handles "baseline" and
# "hybrid". The result is a DataFrame with one prediction per test Id.
model_name = "hybrid"
output_df = evaluate(test_loader, data_dir, model_name, device) 

In [None]:
# Write the predictions to submission.csv, without the DataFrame index column.
output_df.to_csv('submission.csv', index = False)