In [1]:
import torch
import numpy as np

def set_seed(seed):
    """Seed every random number generator this notebook may draw from.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU and all CUDA devices), then puts cuDNN into deterministic mode with
    autotuning disabled.

    Args:
        seed: Integer seed applied to every generator.
    """
    # Local import: the notebook's import lines do not bring in ``random``.
    import random

    # Previously left unseeded, so any code drawing from Python's global
    # generator was not reproducible.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # manual_seed_all seeds every CUDA device, including the current one, so a
    # separate torch.cuda.manual_seed call adds nothing.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    print(f'Random seed {seed} has been set.')


set_seed(2021)

In [2]:
import os

# Settings shared by the data pipeline and the model in this notebook.
cfg = {
    # data
    'Kfold': 5,
    'batch_size' : 64,
    'image_size': 224,
    'crop_pct': 0.875,
    'interpolation': 'bicubic',
    # model
    'name': 'resnetv2_50x1_bit_distilled',
    'precision': 32,
    'drop_path_rate': 0.0,
    'drop_rate': 0.0
}

# Number of DataLoader worker processes: one per logical CPU.
# os.cpu_count() replaces the wildcard psutil import (same logical-CPU count,
# no namespace pollution). It can return None when the count is unknown, in
# which case 0 makes the DataLoader load data in the main process.
num_workers = os.cpu_count() or 0
num_workers

In [3]:
import cv2
from PIL import Image

class TestPawpularDataset:
    """Map-style dataset of unlabeled images that are read from disk on access.

    Args:
        image_paths: Sequence of image file paths, one per sample.
        augmentations: Callable applied to each loaded PIL image, or ``None``
            to return the PIL image unchanged.
    """

    def __init__(self, image_paths, augmentations):
        self.image_paths = image_paths
        self.augmentations = augmentations

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and apply the augmentations, if any."""
        # Force three channels: the backbone in this notebook is created with
        # in_chans=3, so a grayscale, palette or RGBA file would otherwise
        # produce a tensor of the wrong shape. convert() decodes the pixels
        # into a new image, which lets the ``with`` block close the file
        # handle right away instead of leaving it to the garbage collector.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert('RGB')

        if self.augmentations is not None:
            image = self.augmentations(image)

        return image
    
class PawpularDataset:
    """Map-style dataset yielding ``(image, target)`` pairs read from disk.

    Args:
        image_paths: Sequence of image file paths, one per sample.
        targets: Indexable collection of numeric labels aligned with
            ``image_paths``.
        augmentations: Callable applied to each loaded PIL image, or ``None``
            to return the PIL image unchanged.
    """

    def __init__(self, image_paths, targets, augmentations):
        self.targets = targets
        self.image_paths = image_paths
        self.augmentations = augmentations

    def __len__(self):
        """Return the number of samples in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return image ``idx`` (RGB, augmented) and its float32 target of shape (1,)."""
        # Force three channels: the backbone in this notebook is created with
        # in_chans=3, so a grayscale, palette or RGBA file would otherwise
        # produce a tensor of the wrong shape. convert() decodes the pixels
        # into a new image, which lets the ``with`` block close the file
        # handle right away.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert('RGB')

        if self.augmentations is not None:
            image = self.augmentations(image)

        # np.asarray accepts plain Python numbers as well as NumPy scalars,
        # so targets may be a list or an ndarray; the result is unchanged for
        # ndarray input.
        target = np.asarray(self.targets[idx], dtype=np.float32).reshape(-1)

        return image, target

In [4]:
import sys
sys.path.append("../input/timmasters/")
import timm
import torchmetrics
import torch.nn as nn
import pytorch_lightning as pl

class TestPawpularModel(pl.LightningModule):
    """Inference-only network: a timm backbone feeding two scalar heads.

    The backbone is created without a classifier (``num_classes=0``) and
    without pretrained weights. Two independent ``Linear -> Sigmoid`` heads
    map its features to a mean estimate and a variance estimate, each squashed
    into (0, 1) by the sigmoid.

    Args:
        cfg: Mapping that provides ``'name'``, ``'drop_path_rate'`` and
            ``'drop_rate'`` for the backbone.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        settings = self.cfg

        self.backbone = timm.create_model(
            settings['name'],
            pretrained=False,
            num_classes=0,
            in_chans=3,
            drop_path_rate=settings['drop_path_rate'],
            drop_rate=settings['drop_rate'],
        )
        feature_dim = self.backbone.num_features

        # Heads are built in the same order and with the same layout as
        # before, so parameter names stay fc_mean.0.* and fc_var.0.*.
        # Head estimating the mean.
        self.fc_mean = nn.Sequential(nn.Linear(feature_dim, 1), nn.Sigmoid())
        # Head estimating the variance.
        self.fc_var = nn.Sequential(nn.Linear(feature_dim, 1), nn.Sigmoid())

    def forward(self, images):
        """Return ``(mean, var)`` computed from the shared backbone features."""
        features = self.backbone(images)
        mean = self.fc_mean(features)
        return mean, self.fc_var(features)

In [5]:
from timm.data import IMAGENET_INCEPTION_MEAN, IMAGENET_INCEPTION_STD
from timm.data.transforms_factory import transforms_imagenet_eval
from torchvision.transforms import transforms

# Build timm's evaluation transform pipeline from the notebook config, using
# the Inception normalisation constants.
test_aug = transforms_imagenet_eval(
    img_size=cfg['image_size'],
    crop_pct=cfg['crop_pct'],
    interpolation=cfg['interpolation'],
    use_prefetcher=False,
    mean=IMAGENET_INCEPTION_MEAN,
    std=IMAGENET_INCEPTION_STD,
)

# Replace the second transform with a random square crop so that repeated
# passes over the same image see different crops.
# NOTE(review): index 1 is presumably timm's centre crop - confirm against the
# installed timm version.
crop_size = (cfg['image_size'],) * 2
test_aug.transforms[1] = transforms.RandomCrop(size=crop_size)
test_aug

In [6]:
from torch.utils.data import DataLoader
import pandas as pd
def get_test_dataset():
    """Build the unlabeled dataset of competition test images.

    Reads the ids from the competition ``test.csv``, maps each id to its
    ``.jpg`` path and wraps the paths in a ``TestPawpularDataset`` that uses
    the module-level ``test_aug`` transform.

    Returns:
        A ``TestPawpularDataset`` over the test images.
    """
    test_ids = pd.read_csv("../input/petfinder-pawpularity-score/test.csv")["Id"].values
    paths = []
    for x in test_ids:
        paths.append(f"../input/petfinder-pawpularity-score/test/{x}.jpg")
    return TestPawpularDataset(image_paths=paths, augmentations=test_aug)

def get_kfold_dataset(fold):
    """Build labeled and unlabeled datasets over one validation fold.

    Reads the fold-assignment CSV whose name is derived from ``cfg['Kfold']``
    and keeps the rows whose ``kfold`` column equals ``fold``. Both returned
    datasets cover the same images and use the module-level ``test_aug``
    transform.

    Args:
        fold: Value of the ``kfold`` column selecting the validation rows.

    Returns:
        Tuple ``(valid_dataset, test_dataset)``: a ``PawpularDataset`` with
        the ``Pawpularity`` targets and a ``TestPawpularDataset`` without
        targets.
    """
    df = pd.read_csv("../input/pawpularitykfold/train_%dfolds.csv"%cfg['Kfold'])
    # Only the held-out rows are needed; the training split and its path list
    # were previously built and then thrown away.
    df_valid = df[df.kfold == fold].reset_index(drop=True)

    valid_img_paths = [f"../input/petfinder-pawpularity-score/train/{x}.jpg" for x in df_valid["Id"].values]

    valid_dataset = PawpularDataset(
        image_paths=valid_img_paths,
        targets=df_valid.Pawpularity.values,
        augmentations=test_aug,
    )

    test_dataset = TestPawpularDataset(
        image_paths=valid_img_paths,
        augmentations=test_aug,
    )
    return valid_dataset, test_dataset


def get_test_dataloader(test_dataset):
    """Wrap ``test_dataset`` in an unshuffled ``DataLoader``.

    Batch size comes from ``cfg['batch_size']`` and the worker count from the
    module-level ``num_workers``.

    Args:
        test_dataset: Dataset to iterate over.

    Returns:
        A ``DataLoader`` that yields batches in dataset order.
    """
    loader_options = {
        'batch_size': cfg['batch_size'],
        'shuffle': False,
        'num_workers': num_workers,
    }
    return DataLoader(test_dataset, **loader_options)

In [7]:
def average_good_preds(preds, preds_var, threshold):
    """Average repeated predictions after discarding the least plausible ones.

    The inputs are stacked along dim 0 (one entry per repeat). Every
    prediction is scored with a Gaussian density centred on the per-sample
    mean over repeats, using the variance supplied for that prediction. The
    deviation is divided by 100 before scoring. For each sample, the
    ``threshold`` fraction of repeats with the highest score is kept and
    averaged with equal weight.

    Args:
        preds: Sequence of equally shaped arrays of predictions, one array
            per repeat.
        preds_var: Sequence of variance arrays with the same layout as
            ``preds``.
        threshold: Fraction of repeats to keep per sample. At least one
            repeat is always kept, and never more than are available.

    Returns:
        Tuple ``(good_preds, within_dist_score)`` of NumPy arrays: the
        averaged prediction per sample and the score of every individual
        prediction.
    """
    preds = torch.tensor(preds)
    preds_vars = torch.tensor(preds_var)
    preds_means = torch.mean(preds, dim=0, keepdim=True)
    within_dist_score = 1/(torch.sqrt(2*np.pi*preds_vars)) * torch.exp(-((preds-preds_means)/100)**2/(2*preds_vars))
    # exclude the outliers
    # Clamp k to [1, number of repeats]: k == 0 used to make torch.min fail on
    # an empty tensor, and k above the number of repeats makes topk fail.
    num_kept = max(1, min(len(preds), int(len(preds)*threshold)))
    preds_topk = torch.topk(within_dist_score, num_kept, dim=0)[0]
    topk_threshold = torch.min(preds_topk, dim=0, keepdim=True)[0]
    # ">=" keeps the k-th best entry itself. The former strict ">" kept only
    # k-1 entries and produced all-zero weights (NaN after normalisation)
    # whenever every score tied, e.g. with a single repeat.
    preds_weight = (within_dist_score >= topk_threshold).float()
    preds_weight /= torch.sum(preds_weight, dim=0)
    good_preds = torch.sum(preds * preds_weight, dim=0)
    return good_preds.numpy(), within_dist_score.numpy()

def get_preds(model, test_loader):
    """Run one prediction pass and collect the outputs as flat arrays.

    A fresh ``pl.Trainer`` (one GPU, deterministic mode) runs ``predict`` over
    ``test_loader``. Each batch result is unpacked as ``(means, variances)``.
    Means are multiplied by 100 and variances are kept as returned.

    Args:
        model: Module handed to ``Trainer.predict``.
        test_loader: Dataloader handed to ``Trainer.predict``.

    Returns:
        Tuple ``(preds, preds_var)`` of 1-D NumPy arrays holding the scaled
        means and the variances in batch order.
    """
    runner = pl.Trainer(gpus=1, deterministic=True)
    mean_values, var_values = [], []
    for batch_means, batch_vars in runner.predict(model, test_loader):
        scaled_means = batch_means * 100
        mean_values.extend(scaled_means.detach().cpu().numpy().flatten().tolist())
        var_values.extend(batch_vars.detach().cpu().numpy().flatten().tolist())
    return np.array(mean_values), np.array(var_values)

In [None]:
import os

# Test-time inference: every checkpoint predicts the test set 20 times with
# different random crops, then the per-image predictions are aggregated.
test_datadoader = get_test_dataloader(get_test_dataset())

checkpoint_path = '../input/model-checkpoints/'
super_final_preds = []
super_final_preds_var = []
# os.listdir returns entries in arbitrary order. Sorting fixes which seeded
# random crops each checkpoint sees, so a rerun reproduces the same result.
for checkpoint in sorted(os.listdir(checkpoint_path)):
    model = TestPawpularModel.load_from_checkpoint(
            cfg=cfg,
            checkpoint_path=os.path.join(checkpoint_path, checkpoint)
    )
    for _ in range(20): # in order to try multiple crops
        final_preds, final_preds_var = get_preds(model, test_datadoader)
        super_final_preds.append(final_preds)
        super_final_preds_var.append(final_preds_var)

# Keep the best-scoring 95% of the predictions per image and average them.
good_preds, scores = average_good_preds(super_final_preds, super_final_preds_var, threshold=0.95)
df_test = pd.read_csv("../input/petfinder-pawpularity-score/test.csv")
df_test["Pawpularity"] = good_preds
df_test = df_test[["Id", "Pawpularity"]]
df_test.to_csv("submission.csv", index=False)
df_test.tail()

In [None]:
import matplotlib.pyplot as plt

# Index of the image whose repeated predictions are inspected.
sample = 6

# Two histograms for that image: the predicted scores over all repeats, then
# the plausibility scores returned by average_good_preds.
for values, label in (
    (np.array(super_final_preds)[:, sample], 'Pawpularity scores of random crops for sample=%d'%sample),
    (scores[:, sample], 'score of sample=%d'%sample),
):
    plt.hist(values, bins=20)
    plt.xlabel(label)
    plt.show()

In [None]:
# checkpoints in fold 1 to 5 order
checkpoint_path = '../input/model-checkpoints/'
checkpoints = ['8a8371422d81dcf484aacc5a25084e3f',
               'dabb4442e85f1474442fb872f7b11049',
               'c94895f37dd8075139544b46a84567ca',
               '408d0061329c33528e4219ca0ec855fd',
               '5ba4c2a62f3f9adaf2d17949e617fc6d']

# Only one fold is evaluated here; change ``fold`` to inspect another one.
fold = 0
checkpoint = checkpoints[fold]
valid_dataset, test_dataset = get_kfold_dataset(fold)
test_loader = get_test_dataloader(test_dataset)
# Read the labels straight from the dataset. Indexing valid_dataset[i] would
# decode and augment every image just to return its target.
valid_targets = np.asarray(valid_dataset.targets, dtype=np.float32)

# Candidate keep-fractions passed to average_good_preds as ``threshold``.
hyperparameters = [1.0, 0.95]
hyperparameters_rmse = []
for hyperparameter in hyperparameters:
    print('hyperparmeter: ',hyperparameter)
    super_final_preds = []
    super_final_preds_var = []
    model = TestPawpularModel.load_from_checkpoint(cfg=cfg,
            checkpoint_path=checkpoint_path+checkpoint)
    iters_num = cfg['Kfold']*20
    for i in range(iters_num): # in order to try multiple crops
        if i % 5 == 0:
            # The two placeholders need a two-element tuple; the former
            # ``(i / iters_num)`` was a single float and raised TypeError.
            print('%d / %d' % (i, iters_num))
        final_preds, final_preds_var = get_preds(model, test_loader)
        super_final_preds.append(final_preds)
        super_final_preds_var.append(final_preds_var)

    good_preds, scores = average_good_preds(super_final_preds, super_final_preds_var, hyperparameter)
    RMSE = np.sqrt(np.mean((valid_targets - good_preds) ** 2))
    print(RMSE)

    # Store the fold's RMSE as is. Dividing by len(checkpoints) only makes
    # sense when averaging over all folds, which this cell does not do.
    hyperparameters_rmse.append(RMSE)

plt.plot(hyperparameters, hyperparameters_rmse)
# The x axis holds the keep-fraction thresholds and the y axis a single
# fold's RMSE, so the labels now say so.
plt.xlabel('Fraction of crops kept (threshold)')
plt.ylabel('Fold %d RMSE' % fold)
plt.title('Test time hypyerparmeter tunning')

In [None]:
# Index of the validation image whose repeated predictions are inspected.
sample = 20

# Histogram of the predicted scores for this image over all repeats.
fig, ax = plt.subplots()
ax.hist(np.array(super_final_preds)[:, sample], bins=20)
ax.set_xlabel('Pawpularity scores of random crops for sample=%d'%sample)
plt.show()

# Histogram of the plausibility scores returned by average_good_preds.
fig, ax = plt.subplots()
ax.hist(scores[:, sample], bins=20)
ax.set_xlabel('score of sample=%d'%sample)
plt.show()