In [None]:
import sys
sys.path.append('../input/timm-pytorch-image-models/pytorch-image-models-master')
import timm

In [None]:
timm.__version__

In [None]:
import numpy as np
import pandas as pd
import cv2

import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.nn import functional as F
from torch.optim.lr_scheduler import CosineAnnealingLR

import torchvision
from torchvision import transforms as T
from torchvision.io import read_image

from tqdm.notebook import tqdm as tqdm

import pytorch_lightning as pl
from pytorch_lightning.core.lightning import LightningModule



In [None]:
class args:
  #Overall Args

  pretrained = False
  imagesize = 224 
  model_name = 'swin_small_patch4_window7_224' #'cait_m36_384'
  num_classes = 1
  num_workers = 4
  model_shape = 768 #1536 for swin large 1792 for efficientnet b4 768 for cait-m-36
  folds = 10

  #Training Args
  train_batch_size = 2
  val_batch_size = 2
  test_batch_size = 2
  weight_decay = 1e-6
  
  #Optimizer and Scheduler args
  loss = 'nn.BCEWithLogitsLoss'
  lr = 1e-4
  warmup_epochs = 0
  weight_decay = 1e-6
  eta_min = 0.0000001
  n_accumulate = 1
  T_0 = 25
  T_max = 100

  #Callback args
  min_delta = 0.0
  patience = 8


#Dataloader Args
loaderargs = {'num_workers' : args.num_workers, 'pin_memory': False, 'drop_last': False}
device = torch.device("cuda:0")


In [None]:
# Load our test data
tstdf = pd.read_csv('../input/petfinder-pawpularity-score/test.csv')
tstdf['filename'] = '../input/petfinder-pawpularity-score/test/' + tstdf["Id"] + '.jpg'

feature_cols = [col for col in tstdf.columns if col not in ['Id', 'Pawpularity', 'filename']]

In [None]:
class PawpularityDataset(torch.utils.data.Dataset):
    def __init__(self, df, test = False):
        self.df = df
        self.file_names = df['filename'].values
        self._transform = T.Resize(size= (args.imagesize, args.imagesize))
        self.test = test
        self.meta = df[feature_cols].values

        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index):
        img_path = self.file_names[index]
        img = read_image(img_path)
        img = self._transform(img)

        meta = self.meta[index, :]
        
        if self.test:
          target = 0
            
        return img, target, meta

In [None]:
class PawpularityDataModule(pl.LightningDataModule):
    def __init__(self, traindf, valdf, tstdf, args, loaderargs):
        super().__init__()
        self._train_df = traindf
        self._val_df = valdf
        self._tst_df = tstdf
        self.args = args
        self.loaderargs = loaderargs


    def __create_dataset(self, train=True):
        if train == 'train':
          return PawpularityDataset(self._train_df)
        elif train == 'val':
          return PawpularityDataset(self._val_df)
        else:
          return PawpularityDataset(self._tst_df, test = True)


    def train_dataloader(self):
        pawpularity_train = self.__create_dataset("train")
        return DataLoader(pawpularity_train, **self.loaderargs, batch_size=self.args.train_batch_size)

    def val_dataloader(self):
        pawpularity_val = self.__create_dataset("val")
        return DataLoader(pawpularity_val, **self.loaderargs, batch_size=self.args.val_batch_size)
    
    def test_dataloader(self):
        pawpularity_test = self.__create_dataset("test")
        return DataLoader(pawpularity_test, **self.loaderargs, batch_size=self.args.test_batch_size)

In [None]:
def get_default_transforms():
    transform = {
        "train": T.Compose(
            [
                T.RandomHorizontalFlip(),
                T.RandomVerticalFlip(),
                T.RandomAffine(15, translate=(0.1, 0.1), scale=(0.9, 1.1)),
                T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
                T.ConvertImageDtype(torch.float),
                T.Normalize(mean = (0.485, 0.456, 0.406), 
                            std = (0.229, 0.224, 0.225))
                
            ]
        ),
        "val": T.Compose(
            [
                T.ConvertImageDtype(torch.float),
                T.Normalize(mean = (0.485, 0.456, 0.406), 
                            std = (0.229, 0.224, 0.225))
            ]
        ),
    }
    return transform

In [None]:
class DenseModel(LightningModule):
    def __init__(self):
        super().__init__()
        self.args = args
        self.model = timm.create_model(args.model_name, 
                                       pretrained=args.pretrained, 
                                       num_classes=0,
                                       in_chans = 3)
        self.fc1 = nn.Linear(args.model_shape, 128)
        self.fc2 = nn.Linear(140, 64)
        self.fc3 = nn.Linear(64, args.num_classes)
        self.dropout = nn.Dropout(p=0.1)
        self._criterion = eval(self.args.loss)()
        self.transform = get_default_transforms()

    def forward(self, features, meta):
        features = self.model(features)                 
        features = self.fc1(features)
        features = torch.cat([features, meta], dim=1)
        features = self.fc2(features)
        output = self.fc3(features)           
        return output

    def __share_step(self, batch, mode):
        images, labels, meta = batch
        labels = labels.float() / 100.0
        meta = meta.float()
        images = self.transform[mode](images)

        pred = logits.sigmoid().detach().cpu() * 100.
        labels = labels.detach().cpu() * 100.
        
        return loss, pred, labels

    def training_step(self, batch, batch_idx):
        loss, pred, labels = self.__share_step(batch, 'train')
        self.log('train_loss', loss, on_step=True, on_epoch=True, logger=True)
        return {'loss': loss, 'pred': pred, 'labels': labels}



    def validation_step(self, batch, batch_idx):
        loss, pred, labels = self.__share_step(batch, 'val')
        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        return {'pred': pred, 'labels': labels}


    def training_epoch_end(self, outputs):
        self.__share_epoch_end(outputs, 'train')

    def validation_epoch_end(self, outputs):
        self.__share_epoch_end(outputs, 'val')

        
    def __share_epoch_end(self, outputs, mode):
        preds = []
        labels = []
        for out in outputs:
            pred, label = out['pred'], out['labels']
            preds.append(pred)
            labels.append(label)
        preds = torch.cat(preds)
        labels = torch.cat(labels)
        metrics = torch.sqrt(((labels - preds) ** 2).mean())
        self.log(f'{mode}_RMSE', metrics)    


    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=args.lr, weight_decay = args.weight_decay)
        return {
        "optimizer": optimizer,
        "lr_scheduler": {
            "scheduler": CosineAnnealingLR(optimizer, T_max = args.T_max, eta_min= args.eta_min),
            "interval": "step",
            "monitor": "train_loss",
            "frequency": 1}
            }

In [None]:
super_final_test_predictions = []

for fold in range(args.folds):
    testmodel = DenseModel.load_from_checkpoint(f'../input/swinsmall/64_{fold}best_weights.ckpt')
    testmodel.eval().to(device)

    test = PawpularityDataModule(tstdf, tstdf, tstdf, 
                                 args = args, loaderargs = loaderargs).test_dataloader()

    final_test_predictions = []


    tk2 = tqdm(test, total=int(len(test)))
    for step, batch in enumerate(tk2):
        org_images = batch[0]
        labels = batch[1]
        meta = batch[2]
        images = testmodel.transform['val'](org_images)
        images = images.to(device)
        meta = meta.to(device)
        logits = testmodel.forward(images, meta).squeeze(1)
        pred = logits.sigmoid().detach().cpu().numpy() * 100
        final_test_predictions .extend(pred)

     
    super_final_test_predictions.append(final_test_predictions)
    
    del testmodel
    torch.cuda.empty_cache()

In [None]:
print(super_final_test_predictions)
    
super_final_test_predictions = np.mean(np.column_stack(super_final_test_predictions), axis=1)

print(super_final_test_predictions)

In [None]:
sample_submission = pd.read_csv("../input/petfinder-pawpularity-score/sample_submission.csv")
sample_submission.Pawpularity = super_final_test_predictions
sample_submission.to_csv("submission.csv", index=False)