In [10]:
# Import the libraries
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from PIL import Image
from PIL import ImageStat
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.io import ImageReadMode
from torchvision.io import decode_image
from torchvision.transforms import v2

In [11]:
# ---------------------------------------------------------------------------
# Data locations (all relative to the raw data directory)
# ---------------------------------------------------------------------------
DATA_DIR = "../data/raw"
TRAIN_CSV_PATH = DATA_DIR + "/train_images.csv"
TEST_CSV_PATH = DATA_DIR + "/test_images_path.csv"
CLASS_NAMES_PATH = DATA_DIR + "/class_names.npy"
ATTRIBUTES_PATH = DATA_DIR + "/attributes.npy"
ATTRIBUTES_TXT_PATH = DATA_DIR + "/attributes.txt"
TRAIN_IMAGES_BASE_PATH = DATA_DIR + "/train_images"
TEST_IMAGES_BASE_PATH = DATA_DIR + "/test_images"

# ---------------------------------------------------------------------------
# Dataset split
# ---------------------------------------------------------------------------
VAL_SPLIT_RATIO = 0.1  # fraction held out for validation (the remaining 90% is used for training)
RANDOM_STATE = 45      # fixed seed so the split is reproducible across runs
NUM_CLASSES = 200      # given by the Kaggle competition description (also counted in the EDA part)

# ---------------------------------------------------------------------------
# Image transformation settings
# ---------------------------------------------------------------------------
IMAGE_SIZE = 384                 # final image size after transformations
RESIZE_SIZE = 384                # size passed to Resize, applied before the center crop
RANDOM_CROP_SCALE = (0.7, 1.0)   # RandomResizedCrop scale range
HORIZONTAL_FLIP_PROB = 0.5       # RandomHorizontalFlip probability
ROTATION_DEGREES = 15            # RandomRotation degrees
COLOR_JITTER_BRIGHTNESS = 0.2
COLOR_JITTER_CONTRAST = 0.25
COLOR_JITTER_SATURATION = 0.25
COLOR_JITTER_HUE = 0.15

# ImageNet normalization statistics (pretrained models are usually trained on ImageNet)
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

# ---------------------------------------------------------------------------
# Device selection: prefer CUDA, then Apple-Silicon MPS, otherwise CPU.
# The MPS probe only runs when CUDA is unavailable.
# ---------------------------------------------------------------------------
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
elif torch.backends.mps.is_available():
    DEVICE = torch.device('mps')
else:
    DEVICE = torch.device('cpu')

# Per-device DataLoader settings
BATCH_SIZE_CPU, BATCH_SIZE_GPU, BATCH_SIZE_MPS = 16, 32, 32
NUM_WORKERS_CPU, NUM_WORKERS_GPU, NUM_WORKERS_MPS = 0, 0, 0
PIN_MEMORY_CPU, PIN_MEMORY_GPU, PIN_MEMORY_MPS = False, True, False
### Load the data

In [12]:
# Announce the load before it starts, so the message is accurate even if a read fails
print("Loading the data....")

# Load the training and test sets
train_df = pd.read_csv(TRAIN_CSV_PATH)
test_df = pd.read_csv(TEST_CSV_PATH)

# NOTE(security): allow_pickle=True unpickles arbitrary Python objects, which can execute
# code. Only load .npy files from a trusted source.
# `.item()` unwraps the single Python object stored in the array (presumably a dict of
# class names — confirm against the file).
class_names = np.load(CLASS_NAMES_PATH, allow_pickle=True).item()

# Load attributes
# NOTE(review): if attributes.npy is a plain numeric array, allow_pickle can be dropped — confirm.
attributes = np.load(ATTRIBUTES_PATH, allow_pickle=True)
with open(ATTRIBUTES_TXT_PATH, "r") as f:
    # Each line is "<index> <attribute name>"; keep only the name.
    # Blank lines (e.g. a trailing newline at the end of the file) are skipped instead of
    # raising IndexError on the missing second field.
    attribute_names = [
        line.strip().split(' ', 1)[1]
        for line in f
        if line.strip()
    ]

# Print the data summary
print(f"-Train Size: {len(train_df)} \n-Test Size: {len(test_df)} \n-Number of Classes: {len(class_names)}")
print(f"-Attributes: {len(attribute_names)} (shape: {attributes.shape if hasattr(attributes, 'shape') else 'N/A'})")


Loading the data....
-Train Size: 3926 
-Test Size: 4000 
-Number of Classes: 200
-Attributes: 312 (shape: (200, 312))


### Inspect Attributes

In [7]:
# Tabulate attribute names next to their positional index and preview the first ten
attribute_table_columns = {
    'Index': range(len(attribute_names)),
    'Attribute Name': attribute_names,
}
attributes_df = pd.DataFrame(attribute_table_columns)

print("Attributes:")
display(attributes_df.head(10))

Attributes:


Unnamed: 0,Index,Attribute Name
0,0,has_bill_shape::curved_(up_or_down)
1,1,has_bill_shape::dagger
2,2,has_bill_shape::hooked
3,3,has_bill_shape::needle
4,4,has_bill_shape::hooked_seabird
5,5,has_bill_shape::spatulate
6,6,has_bill_shape::all-purpose
7,7,has_bill_shape::cone
8,8,has_bill_shape::specialized
9,9,has_wing_color::blue


### Train / validation split

In [8]:
def _print_label_range(title, labels):
    """Print the smallest and largest label index under the given title."""
    print(title)
    print("- y indices min:", labels.min())
    print("- y indices max:", labels.max())


# Image paths (features) and the raw labels as stored in the CSV
X = train_df['image_path'].values
labels_as_stored = train_df['label'].values
_print_label_range("Before correction:", labels_as_stored)

# The stored labels start at 1; shift them so they are 0-indexed
y = labels_as_stored - 1
_print_label_range("After correction:", y)

# Test image paths (no labels available)
X_test = test_df['image_path'].values

# Stratified train/validation split; the validation fraction is VAL_SPLIT_RATIO
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    stratify=y,                 # keep the class distribution the same in both sets
    test_size=VAL_SPLIT_RATIO,
    random_state=RANDOM_STATE,  # same seed across runs for reproducibility
)

Before correction:
- y indices min: 1
- y indices max: 200
After correction:
- y indices min: 0
- y indices max: 199


### Define Transformations

In [13]:
def _float_and_normalize():
    """Return the steps shared by both pipelines: cast to scaled float32, then normalize."""
    return [
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
    ]


# Training pipeline (HF-style): resize, random flip + rotation, center crop
_training_steps = [
    v2.Resize(RESIZE_SIZE, antialias=True),
    v2.RandomHorizontalFlip(p=HORIZONTAL_FLIP_PROB),
    v2.RandomRotation(degrees=ROTATION_DEGREES),
    v2.CenterCrop(IMAGE_SIZE),
]
transformation_training = v2.Compose(_training_steps + _float_and_normalize())

# Validation pipeline: same geometry, but without any random augmentation
_validation_steps = [
    v2.Resize(RESIZE_SIZE, antialias=True),
    v2.CenterCrop(IMAGE_SIZE),
]
transformation_validation = v2.Compose(_validation_steps + _float_and_normalize())

In [14]:
# PyTorch needs a custom dataset to load and transform the data
class FeatherImageDataset(Dataset):
    """Dataset that decodes an image from disk and pairs it with its label.

    Args:
        image_paths: sequence of image file paths, one per sample.
        image_labels: sequence of labels, aligned index-by-index with ``image_paths``.
        transformation: optional callable applied to the decoded image tensor.
        target_transformation: optional callable applied to the label.

    Raises:
        ValueError: if ``image_paths`` and ``image_labels`` differ in length.
    """

    def __init__(self, image_paths, image_labels, transformation=None, target_transformation=None):
        # Paths and labels are indexed together in __getitem__, so they must line up.
        if len(image_paths) != len(image_labels):
            raise ValueError(
                f"image_paths ({len(image_paths)}) and image_labels ({len(image_labels)}) "
                "must have the same length"
            )
        self.image_paths = image_paths
        self.image_labels = image_labels
        self.transformation = transformation
        self.target_transformation = target_transformation

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, i):
        """Return ``(image, label)`` for sample ``i`` with the optional transformations applied."""
        image_path = self.image_paths[i]
        image_label = self.image_labels[i]
        # Force 3-channel RGB: grayscale or RGBA files would otherwise yield 1- or
        # 4-channel tensors, which break a 3-channel Normalize further down the pipeline.
        image = decode_image(image_path, mode=ImageReadMode.RGB)
        if self.transformation is not None:
            image = self.transformation(image)
        if self.target_transformation is not None:
            image_label = self.target_transformation(image_label)
        return image, image_label

In [15]:
# Create dataset instances with transformations
# NOTE(review): paths are joined by plain string concatenation, so the CSV paths are
# presumably stored with a leading "/" — confirm against train_images.csv.

# Create custom dataset instance for training set
train_dataset = FeatherImageDataset(
    image_paths=[TRAIN_IMAGES_BASE_PATH + path for path in X_train],
    image_labels=y_train,
    transformation=transformation_training
)

# Create custom dataset instance for validation set
val_dataset = FeatherImageDataset(
    image_paths=[TRAIN_IMAGES_BASE_PATH + path for path in X_val],
    image_labels=y_val,
    transformation=transformation_validation
)

# Reuse the device already selected in the configuration cell instead of probing again:
# - If GPU is available, use it
# - If MPS is available, use it (GPU support for Apple Silicon)
# - Otherwise, use CPU
device = DEVICE

# (batch_size, num_workers, pin_memory) per device type.
# - num_workers is 0 everywhere: Jupyter notebooks might fail with worker processes
# - pin_memory speeds up the transfer from CPU RAM to GPU vRAM, so it is only enabled for
#   CUDA (MPS doesn't support it, and the CPU doesn't need it)
_loader_settings_by_device = {
    'cuda': (BATCH_SIZE_GPU, NUM_WORKERS_GPU, PIN_MEMORY_GPU),
    'mps': (BATCH_SIZE_MPS, NUM_WORKERS_MPS, PIN_MEMORY_MPS),
    'cpu': (BATCH_SIZE_CPU, NUM_WORKERS_CPU, PIN_MEMORY_CPU),
}
# Any other device type falls back to the CPU settings
batch_size, num_workers, pin_memory = _loader_settings_by_device.get(
    device.type, _loader_settings_by_device['cpu']
)

# Print the DataLoader configuration
print("DataLoader configuration:")
print(f"- Using device: {device}")
print(f"- Batch size: {batch_size}")
print(f"- Number of workers: {num_workers} (0 = main process, avoids pickling issues in Jupyter)")
print(f"- Pin memory: {pin_memory}")

# Create DataLoader for training set
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,  # Shuffle training data so that the model doesn't memorize the order of the data
    num_workers=num_workers,
    pin_memory=pin_memory
)

# Create DataLoader for validation set
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,  # Don't shuffle validation data: So that each time we use the validation, it's the same
    num_workers=num_workers,
    pin_memory=pin_memory
)

# Print the final information (the batch size used to be printed twice)
print("\nSetup:")
print(f"- Train dataset: {len(train_dataset)} samples")
print(f"- Validation dataset: {len(val_dataset)} samples")
print(f"- Batch size: {batch_size}")

DataLoader configuration:
- Using device: mps
- Batch size: 32
- Number of workers: 0 (0 = main process, avoids pickling issues in Jupyter)
- Pin memory: False

Setup:
- Train dataset: 3533 samples
- Validation dataset: 393 samples
- Batch size: 32
- Batch size: 32


In [16]:
# Quick smoke test of the pretrained classifier through the high-level pipeline helper
from transformers import pipeline

PARROTS_IMAGE_URL = "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/parrots.png"

pipe = pipeline(task="image-classification", model="Emiel/cub-200-bird-classifier-swin")
# Last expression of the cell: the predictions are rendered as the cell output
pipe(PARROTS_IMAGE_URL)

  from .autonotebook import tqdm as notebook_tqdm
Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Device set to use mps:0


[{'label': 'Pacific_Loon', 'score': 0.045932356268167496},
 {'label': 'Ovenbird', 'score': 0.02930556796491146},
 {'label': 'Hooded_Oriole', 'score': 0.029163602739572525},
 {'label': 'Black_Tern', 'score': 0.0259512010961771},
 {'label': 'Rufous_Hummingbird', 'score': 0.023637680336833}]

In [17]:
# Load the image processor and the classification model from the same checkpoint
from transformers import AutoImageProcessor, AutoModelForImageClassification

HF_CHECKPOINT = "Emiel/cub-200-bird-classifier-swin"

processor = AutoImageProcessor.from_pretrained(HF_CHECKPOINT)
model = AutoModelForImageClassification.from_pretrained(HF_CHECKPOINT)

In [18]:
class FeatherImageDatasetHF(Dataset):
    """Dataset that runs each decoded image through a Hugging Face image processor.

    Args:
        image_paths: sequence of image file paths, one per sample.
        image_labels: optional sequence of integer labels aligned with ``image_paths``;
            leave as ``None`` for unlabeled (test) data.
        processor: callable invoked as ``processor(images=..., return_tensors="pt")``.
            It is required as soon as items are accessed.
    """

    def __init__(self, image_paths, image_labels=None, processor=None):
        self.image_paths = image_paths
        self.image_labels = image_labels
        self.processor = processor

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, i):
        """Return the processor outputs for sample ``i`` as a dict, plus ``labels`` if available.

        Raises:
            TypeError: if the dataset was built without a processor.
        """
        # Fail with an explicit message instead of "'NoneType' object is not callable"
        if self.processor is None:
            raise TypeError("FeatherImageDatasetHF needs a `processor` to build model inputs")

        # Force 3-channel RGB so grayscale/RGBA files produce the same layout as colour images
        image = decode_image(self.image_paths[i], mode=ImageReadMode.RGB)
        inputs = self.processor(images=image, return_tensors="pt")
        # Drop the leading batch dimension of size 1; the DataLoader re-batches the samples
        inputs = {k: v.squeeze(0) for k, v in inputs.items()}

        # Add labels only for train/val
        if self.image_labels is not None:
            inputs["labels"] = torch.tensor(self.image_labels[i], dtype=torch.long)

        return inputs


In [21]:
def validate_hf(model, val_loader, device):
    """Compute top-1 accuracy of ``model`` over ``val_loader``.

    The model is moved to ``device`` and switched to eval mode (and left that way).

    Args:
        model: callable module whose output exposes a ``logits`` attribute.
        val_loader: iterable of dict batches of tensors; each batch must contain a
            ``"labels"`` entry, and the remaining entries are passed to the model as
            keyword arguments.
        device: device on which the model and the batches are evaluated.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when the loader
        yields no samples (instead of raising ZeroDivisionError).
    """
    model.to(device)
    model.eval()
    correct, total = 0, 0

    with torch.no_grad():
        for batch in val_loader:
            # Move all inputs to device
            batch = {k: v.to(device) for k, v in batch.items()}
            labels = batch.pop("labels")  # the model only receives the input tensors

            outputs = model(**batch)
            preds = outputs.logits.argmax(dim=1)

            total += labels.size(0)
            correct += (preds == labels).sum().item()

    # An empty loader has no samples to score; avoid dividing by zero
    if total == 0:
        return 0.0
    return correct / total


In [22]:
# Full image paths for both splits
train_image_paths_hf = [TRAIN_IMAGES_BASE_PATH + path for path in X_train]
val_image_paths_hf = [TRAIN_IMAGES_BASE_PATH + path for path in X_val]

# Datasets that feed the HF image processor
train_dataset_hf = FeatherImageDatasetHF(
    image_paths=train_image_paths_hf,
    image_labels=y_train,
    processor=processor,
)
val_dataset_hf = FeatherImageDatasetHF(
    image_paths=val_image_paths_hf,
    image_labels=y_val,
    processor=processor,
)

# Both loaders share every setting except shuffling
hf_loader_settings = dict(
    batch_size=batch_size,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
train_loader_hf = DataLoader(train_dataset_hf, shuffle=True, **hf_loader_settings)
val_loader_hf = DataLoader(val_dataset_hf, shuffle=False, **hf_loader_settings)

# Accuracy of the pretrained checkpoint on the validation split
acc = validate_hf(model, val_loader_hf, device)
print(f"HF Swin validation accuracy: {acc:.3f} ({acc*100:.1f}%)")


HF Swin validation accuracy: 0.982 (98.2%)


### Bayesian Grid search for hyperparameter tuning

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR

# Size of the second dimension of the `attributes` array loaded earlier;
# used below as the default output width of the model's attribute head.
NUM_ATTRS = attributes.shape[1]

class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers wrapped by a skip connection.

    When the block changes the channel count or the stride, the skip path uses a
    1x1 conv + batch-norm projection so both branches have the same shape; otherwise
    the skip path is an empty ``nn.Sequential`` (identity).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main branch
        self.conv1 = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip branch
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        skip = self.shortcut(x)
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.bn2(self.conv2(main))
        return F.relu(main + skip)

class MultiTaskResNet(nn.Module):
    """Small residual CNN with two linear heads on a shared 256-dim feature vector.

    Args:
        num_classes: output width of the classification head.
        num_attrs: output width of the attribute head.
        dropout: dropout probability applied to the pooled features before both heads.

    ``forward`` returns ``(cls_logits, attr_preds)``; both are raw linear outputs
    (no activation is applied inside the model).
    """

    def __init__(self, num_classes=200, num_attrs=NUM_ATTRS, dropout=0.3):
        super().__init__()

        # Stem: 3 -> 32 channels
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(32)

        # Residual stages: 32 -> 64 -> 128 -> 256 channels
        self.layer1 = ResidualBlock(32, 64)
        self.layer2 = ResidualBlock(64, 128, stride=2)
        self.layer3 = ResidualBlock(128, 256, stride=2)

        # Global pooling + regularization
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(dropout)

        # Task heads
        self.fc_cls = nn.Linear(256, num_classes)
        self.fc_attr = nn.Linear(256, num_attrs)

    def forward(self, x):
        """Run the backbone and return ``(cls_logits, attr_preds)``."""
        x = F.relu(self.bn1(self.conv1(x)))

        # Each residual stage is followed by a 2x2 max-pool
        for stage in (self.layer1, self.layer2, self.layer3):
            x = F.max_pool2d(stage(x), 2)

        # (N, 256, 1, 1) -> (N, 256)
        features = self.pool(x).flatten(1)
        features = self.dropout(features)

        cls_logits = self.fc_cls(features)
        attr_preds = self.fc_attr(features)
        return cls_logits, attr_preds
