# Swin Transformer
In this notebook, we use transfer learning to train a classification head on top of a pretrained Swin Transformer backbone using 5-fold cross-validation (each run trains the single fold selected by `FOLD`; inference averages the predictions of all 5 fold models). Public discussions found that Swin Transformers perform better than conventional Convolutional Neural Networks for this competition. They also found that, even though the original problem statement is a regression problem, converting it into a classification problem yields better results. Therefore, the targets are divided by 100 and the model is trained using the binary cross-entropy (BCE) loss rather than the mean squared error (MSE) loss. We then convert the predictions back to the original value range from 0 to 100 by applying the sigmoid function and multiplying by 100. This notebook builds on top-scoring public notebooks and discussions; important custom modifications are noted in comments. It is intended to be run on Kaggle with a GPU enabled and with the necessary packages and datasets available under `../input`.

# Imports

In [1]:
# Add tez and timm libraries to Kaggle
import sys
sys.path.append("../input/tez-lib/")
sys.path.append("../input/timmmaster/")

# Basics
import numpy as np
import pandas as pd

# Image manipulation
import cv2

# Main neural network library
import torch.nn as nn
import torch

# Easier PyTorch development
import tez
from tez.callbacks import EarlyStopping

# Image augmentations
import albumentations

# Swin Transformer backbone
import timm

# Calculate MSE
from sklearn import metrics

# Progress display
from tqdm import tqdm

# General

In [2]:
# Index of the cross-validation fold held out for validation in this run;
# rows from every other fold are used for training
FOLD = 0

# Side length (in pixels) that every image is resized to before entering the model
IMAGE_SIZE = 384

In [3]:
# To denormalize predictions
def sigmoid(x):
    """Map logits to the open interval (0, 1) with the logistic function.

    Evaluated as ``exp(-log(1 + exp(-x)))`` through ``np.logaddexp`` so that
    large negative logits do not overflow ``np.exp``; the naive
    ``1 / (1 + np.exp(-x))`` emits a RuntimeWarning for such inputs.

    Args:
        x: Scalar or NumPy array of logits.

    Returns:
        NumPy scalar or array with the same shape as ``x``.
    """
    # logaddexp(0, -x) == log(exp(0) + exp(-x)) == log(1 + exp(-x)), computed stably
    return np.exp(-np.logaddexp(0, -x))

# Dataset

In [4]:
class CustomDataset:
    """Map-style dataset yielding an image tensor, its metadata row and a scaled target.

    Args:
        image_paths: Sequence of image file paths, one per sample.
        dense_features: 2-D array indexed as ``dense_features[item, :]`` that holds
            the photo metadata of each sample.
        targets: Sequence of target scores; each one is divided by 100 on access.
        augmentations: Callable invoked as ``augmentations(image=...)`` returning a
            mapping with an ``"image"`` entry, or ``None`` to skip augmentation.
    """

    def __init__(self, image_paths, dense_features, targets, augmentations):
        self.image_paths = image_paths
        self.dense_features = dense_features
        self.targets = targets
        self.augmentations = augmentations

    def __len__(self):
        """Return the number of samples (one per image path)."""
        return len(self.image_paths)

    def __getitem__(self, item):
        """Load, augment and tensorize the sample at position ``item``.

        Returns:
            dict with float tensors under ``"image"`` (channel-first),
            ``"features"`` and ``"targets"`` (the stored target divided by 100).

        Raises:
            FileNotFoundError: If OpenCV cannot read the image at the stored path.
        """
        path = self.image_paths[item]

        # cv2.imread reports a missing or unreadable file by returning None instead of
        # raising; fail here with the offending path rather than deep inside cvtColor
        image = cv2.imread(path)
        if image is None:
            raise FileNotFoundError(f"Image is missing or unreadable: {path}")

        # OpenCV loads images as BGR; convert to RGB
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Augmentations
        if self.augmentations is not None:
            augmented = self.augmentations(image=image)
            image = augmented["image"]

        # OpenCV is (height, width, channel)
        # PyTorch wants (channel, height, width)
        image = np.transpose(image, (2, 0, 1)).astype(np.float32)

        # Photo metadata
        features = self.dense_features[item, :]

        # Important: divide by 100 to normalize targets for BCE loss
        targets = self.targets[item] / 100.0

        return {
            "image": torch.tensor(image, dtype=torch.float),
            "features": torch.tensor(features, dtype=torch.float),
            "targets": torch.tensor(targets, dtype=torch.float),
        }

# Model

In [5]:
class CustomModel(tez.Model):
    """Swin Transformer backbone with a small dense head that also receives photo metadata.

    The backbone's head is replaced by a 128-unit linear layer. Its output goes through
    dropout, is concatenated with the metadata features and is passed through
    ``Linear(140, 64) -> ReLU -> Linear(64, 1)``. The single output is a raw logit.
    """

    def __init__(self):
        super().__init__()

        # Swin Transformer backbone
        self.model = timm.create_model("swin_large_patch4_window12_384", pretrained=True, in_chans=3)

        # Classification head
        self.model.head = nn.Linear(self.model.head.in_features, 128)
        self.dropout = nn.Dropout(0.1)

        # 140 inputs = 128 backbone outputs + 12 metadata features concatenated in forward()
        self.dense1 = nn.Linear(140, 64)

        # Important: add ReLU activation function for dense head layer to introduce non-linearity
        self.relu1 = nn.ReLU()

        # Single output without sigmoid, BCE loss will apply sigmoid automatically
        self.dense2 = nn.Linear(64, 1)

        # Step the schedule after each epoch
        self.step_scheduler_after = "epoch"

    def monitor_metrics(self, outputs, targets):
        """Compute the RMSE between predictions and targets on the 0-100 scale.

        Args:
            outputs: Tensor of raw logits produced by ``forward``.
            targets: Tensor of targets scaled to [0, 1].

        Returns:
            dict with a single ``"rmse"`` entry.
        """
        # Cast to float32 before leaving torch: the logits may arrive as float16 when
        # training runs in mixed precision, which would lose precision in exp()
        # Important: apply sigmoid function to output predictions and multiply by 100 to denormalize
        outputs = sigmoid(outputs.detach().float().cpu().numpy()) * 100

        # Important: multiply targets by 100 to denormalize
        targets = targets.detach().float().cpu().numpy() * 100

        # Take the root explicitly instead of passing `squared=False`, which is
        # deprecated and later removed in scikit-learn
        # NOTE(review): this is a per-batch RMSE; if the training framework averages it
        # across batches, the reported value is not the exact epoch-level RMSE - confirm
        rmse = np.sqrt(metrics.mean_squared_error(targets, outputs))
        return {"rmse": rmse}

    def fetch_scheduler(self):
        """Return a cosine-annealing-with-warm-restarts scheduler bound to ``self.optimizer``."""
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer, T_0=10, T_mult=1, eta_min=1e-6, last_epoch=-1
        )
        return scheduler

    def fetch_optimizer(self):
        """Return an Adam optimizer over all parameters (backbone and head), lr=1e-4."""
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        return optimizer

    def forward(self, image, features, targets=None):
        """Run the network and, when targets are supplied, compute loss and metrics.

        Args:
            image: Batch of images fed to the backbone.
            features: Batch of metadata features concatenated to the backbone output
                along dim 1.
            targets: Optional targets scaled to [0, 1]; reshaped to a column for the loss.

        Returns:
            Tuple ``(logits, loss, metrics_dict)``; ``(logits, 0, {})`` when ``targets``
            is ``None``.
        """
        # Forward pass through model
        x = self.model(image)
        x = self.dropout(x)
        x = torch.cat([x, features], dim=1)
        x = self.dense1(x)

        # Important: pass through ReLU activation
        x = self.relu1(x)

        # Output node
        x = self.dense2(x)

        if targets is not None:
            # Important: use binary cross entropy loss instead of mean squared error loss
            loss = nn.BCEWithLogitsLoss()(x, targets.view(-1, 1))

            # Named `batch_metrics` so the sklearn `metrics` module is not shadowed locally
            batch_metrics = self.monitor_metrics(x, targets)
            return x, loss, batch_metrics
        return x, 0, {}

# Create Datasets

In [6]:
# Community-provided CSV in which every training row already carries its fold id ("kfold")
df = pd.read_csv("../input/same-old-creating-folds/train_5folds.csv")

# Photo-metadata columns fed to the model as dense features next to the image
photo_metadata = ['Subject Focus', 'Eyes', 'Face', 'Near', 'Action', 'Accessory', 'Group', 'Collage', 'Human', 'Occlusion', 'Info', 'Blur']

# Training pipeline: resize, random colour/brightness jitter, then normalization
train_aug = albumentations.Compose(
    [
        albumentations.Resize(IMAGE_SIZE, IMAGE_SIZE, p=1),
        albumentations.HueSaturationValue(
            hue_shift_limit=0.2,
            sat_shift_limit=0.2,
            val_shift_limit=0.2,
            p=0.5,
        ),
        albumentations.RandomBrightnessContrast(
            brightness_limit=(-0.1, 0.1),
            contrast_limit=(-0.1, 0.1),
            p=0.5,
        ),
        albumentations.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0, p=1.0),
    ],
    p=1.0,
)

# Validation pipeline: resize and normalization only, no random transforms
valid_aug = albumentations.Compose(
    [
        albumentations.Resize(IMAGE_SIZE, IMAGE_SIZE, p=1),
        albumentations.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0, p=1.0),
    ],
    p=1.0,
)

# Training split: every row whose fold differs from FOLD
df_train = df[df["kfold"] != FOLD].reset_index(drop=True)
train_image_paths = [
    f"../input/petfinder-pawpularity-score/train/{x}.jpg"
    for x in df_train["Id"].values
]
train_dense_features = df_train[photo_metadata].values
train_targets = df_train["Pawpularity"].values
train_dataset = CustomDataset(
    image_paths=train_image_paths,
    dense_features=train_dense_features,
    targets=train_targets,
    augmentations=train_aug,
)

# Validation split: the rows belonging to FOLD
df_valid = df[df["kfold"] == FOLD].reset_index(drop=True)
valid_image_paths = [
    f"../input/petfinder-pawpularity-score/train/{x}.jpg"
    for x in df_valid["Id"].values
]
valid_dense_features = df_valid[photo_metadata].values
valid_targets = df_valid["Pawpularity"].values
valid_dataset = CustomDataset(
    image_paths=valid_image_paths,
    dense_features=valid_dense_features,
    targets=valid_targets,
    augmentations=valid_aug,
)

# Training

In [7]:
# Seed NumPy and PyTorch before the model is built so that the randomly initialised
# head layers and any torch/NumPy-driven randomness start from a fixed state.
# NOTE(review): this does not by itself guarantee bit-identical runs (GPU kernels and
# other RNGs, e.g. Python's `random`, are not controlled here)
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Create model
model = CustomModel()

# Early stopping after 3 epochs of no validation RMSE improvement; the best weights
# are written to model_f{FOLD}.bin.
# Note: with epochs=1 below the patience can never be exhausted, so in this
# configuration the callback only saves the model after the single epoch
es = EarlyStopping(
    monitor="valid_rmse",
    model_path=f"model_f{FOLD}.bin",
    patience=3,
    mode="min",
    save_weights_only=True,
)

# Fit the model
model.fit(
    train_dataset,
    valid_dataset=valid_dataset,
    train_bs=8,
    valid_bs=16,
    device="cuda",
    epochs=1,
    callbacks=[es],
    fp16=True,
)

Downloading: "https://github.com/SwinTransformer/storage/releases/download/v1.0.0/swin_large_patch4_window12_384_22kto1k.pth" to /root/.cache/torch/hub/checkpoints/swin_large_patch4_window12_384_22kto1k.pth
100%|██████████| 992/992 [19:36<00:00,  1.19s/it, loss=0.665, rmse=19.6, stage=train]
100%|██████████| 124/124 [01:45<00:00,  1.18it/s, loss=0.663, rmse=20.1, stage=valid]


Validation score improved (inf --> 20.12838645135203). Saving model!


# Inference

In [9]:
class CustomInferenceModel(tez.Model):
    """Inference-only copy of the training architecture.

    The backbone is created with ``pretrained=False`` and ``forward`` never computes
    a loss or metrics. Layer names mirror the training model's attribute names.
    """

    def __init__(self):
        super().__init__()

        # Backbone without pretrained weights, with its head swapped for a 128-unit layer
        backbone = timm.create_model("swin_large_patch4_window12_384", pretrained=False, in_chans=3)
        backbone.head = nn.Linear(backbone.head.in_features, 128)
        self.model = backbone

        # Dense head: 140 inputs (128 backbone outputs + concatenated features) -> 64 -> 1
        self.dropout = nn.Dropout(0.1)
        self.dense1 = nn.Linear(140, 64)
        self.relu1 = nn.ReLU()
        self.dense2 = nn.Linear(64, 1)

    def forward(self, image, features, targets=None):
        """Return ``(logits, 0, {})``; ``targets`` is accepted but ignored."""
        embedding = self.dropout(self.model(image))
        combined = torch.cat([embedding, features], dim=1)
        hidden = self.relu1(self.dense1(combined))
        logits = self.dense2(hidden)
        return logits, 0, {}

In [10]:
# Test-set inputs. The dataset class requires targets, so a vector of ones is passed
# as a placeholder; the inference model ignores it
df_test = pd.read_csv("../input/petfinder-pawpularity-score/test.csv")
test_image_paths = [
    f"../input/petfinder-pawpularity-score/test/{x}.jpg"
    for x in df_test["Id"].values
]
test_dense_features = df_test[photo_metadata].values
test_targets = np.ones(len(test_image_paths))

# Resize and normalization only, with no random transforms
test_aug = albumentations.Compose(
    [
        albumentations.Resize(IMAGE_SIZE, IMAGE_SIZE, p=1),
        albumentations.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0, p=1.0),
    ],
    p=1.0,
)
test_dataset = CustomDataset(
    image_paths=test_image_paths,
    dense_features=test_dense_features,
    targets=test_targets,
    augmentations=test_aug,
)

# One list of denormalized predictions per fold model
all_fold_preds = []
for fold in range(5):
    # Fresh architecture, then the saved weights of this fold
    model = CustomInferenceModel()
    model.load(f"../input/bce-20-epochs/model_f{fold}.bin", device="cuda", weights_only=True)

    # Batches of raw logits for the whole test set
    test_preds = model.predict(test_dataset, batch_size=16, n_jobs=-1)

    cur_fold_preds = []
    for preds in tqdm(test_preds):
        # Logits -> 0-100 scores, flattened into plain Python floats
        cur_fold_preds.extend((sigmoid(preds) * 100).ravel().tolist())

    all_fold_preds.append(cur_fold_preds)

# Each column holds one fold's predictions; the row-wise mean is the final score
df_test["Pawpularity"] = np.column_stack(all_fold_preds).mean(axis=1)
df_test = df_test[["Id", "Pawpularity"]]
df_test.to_csv("submission.csv", index=False)

0it [00:00, ?it/s]
1it [00:00,  1.68it/s]00:00<?, ?it/s][A
  0%|          | 0/1 [00:00<?, ?it/s, stage=test][A
100%|██████████| 1/1 [00:00<00:00,  1.56it/s, stage=test][A
1it [00:00,  1.53it/s]
0it [00:00, ?it/s]
1it [00:00,  1.89it/s]00:00<?, ?it/s][A
  0%|          | 0/1 [00:00<?, ?it/s, stage=test][A
100%|██████████| 1/1 [00:00<00:00,  1.68it/s, stage=test][A
1it [00:00,  1.63it/s]
0it [00:00, ?it/s]
1it [00:00,  1.93it/s]00:00<?, ?it/s][A
  0%|          | 0/1 [00:00<?, ?it/s, stage=test][A
100%|██████████| 1/1 [00:00<00:00,  1.76it/s, stage=test][A
1it [00:00,  1.72it/s]
0it [00:00, ?it/s]
1it [00:00,  1.88it/s]00:00<?, ?it/s][A
  0%|          | 0/1 [00:00<?, ?it/s, stage=test][A
100%|██████████| 1/1 [00:00<00:00,  1.73it/s, stage=test][A
1it [00:00,  1.68it/s]
0it [00:00, ?it/s]
1it [00:00,  1.92it/s]00:00<?, ?it/s][A
  0%|          | 0/1 [00:00<?, ?it/s, stage=test][A
100%|██████████| 1/1 [00:00<00:00,  1.74it/s, stage=test][A
1it [00:00,  1.70it/s]


In [11]:
# Preview the first rows of the frame that was written to submission.csv
df_test.head()

Unnamed: 0,Id,Pawpularity
0,4128bae22183829d2b5fea10effdb0c3,40.683147
1,43a2262d7738e3d420d453815151079e,41.5589
2,4e429cead1848a298432a0acad014c9d,40.380956
3,80bc3ccafcc51b66303c2c263aa38486,40.51514
4,8f49844c382931444e68dffbe20228f4,40.567381
