### Clone GitHub repository

In [None]:
# Remove any previous checkout so the clone in the next cell starts from a clean directory.
!rm -rf ML-Project

In [None]:
REPO_URL = "https://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@github.com/aleexx02/ML-Project.git"
!git clone $REPO_URL
%cd ML-Project


Cloning into 'ML-Project'...
remote: Enumerating objects: 1656, done.[K
remote: Counting objects: 100% (19/19), done.[K
remote: Compressing objects: 100% (19/19), done.[K


### Imports and Paths

In [None]:
import os
import pandas as pd
import re
import numpy as np
import copy
import json
import time
import matplotlib.pyplot as plt
from PIL import Image
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torchvision.transforms as T
import matplotlib.pyplot as plt
from torchvision.models import resnet50, ResNet50_Weights
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR
from tqdm import tqdm
from sklearn.model_selection import StratifiedKFold

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

# Every data location is derived from PROJECT_ROOT (the cloned repository root).
PROJECT_ROOT = "."
DATA_DIR = os.path.join(PROJECT_ROOT, "BDMA7_project_files")

# Image folders. NOTE(review): "mistery_cat" is kept exactly as written -
# presumably it matches the folder name on disk; confirm before "fixing" it.
TRAIN_IMAGES_DIR, VAL_IMAGES_DIR = (
    os.path.join(DATA_DIR, folder) for folder in ("train_images", "val_images")
)
TEST_IMAGES_DIR = os.path.join(DATA_DIR, "test_images", "mistery_cat")

# CSV files with the metadata of each split.
TRAIN_META_PATH, VAL_META_PATH, SAMPLE_SUB_PATH = (
    os.path.join(DATA_DIR, csv_name)
    for csv_name in ("train_metadata.csv", "val_metadata.csv", "sample_submission.csv")
)

### Weights and Biases Set Up

In [None]:
# Uncomment to install wandb
# !pip install wandb -q

In [None]:
# import wandb
# wandb.login()

### Parameters

In [None]:
# PARAMETERS FOR MODEL
# Notebook-wide defaults shared by the cells below.
BATCH_SIZE = 32     # batch size used by make_loader()
NUM_CLASSES = 20    # number of output classes passed to the model
IMG_SIZE = 224      # side length (pixels) the transforms resize/crop images to
NUM_WORKERS = 4     # num_workers passed to every DataLoader

### Data

* **Training dataset**: uses random horizontal flips (plus other augmentations) so the model sees each bird in more than one orientation and generalizes better. Items are *(image_tensor, label)*.

* **Evaluation dataset**: without flips. *(image_tensor, label)*.

* **Testing dataset**: *(image_tensor, path)*.

In [None]:
# Load metadata for train, eval and test.
# Train/validation rows are tagged with the folder their images live in, so the
# two tables can later be merged without losing track of each image's location,
# and are then trimmed to the three columns the rest of the notebook relies on.
train_meta = (
    pd.read_csv(TRAIN_META_PATH)
    .assign(images_dir=TRAIN_IMAGES_DIR)
    [["path", "class_idx", "images_dir"]]
)
val_meta = (
    pd.read_csv(VAL_META_PATH)
    .assign(images_dir=VAL_IMAGES_DIR)
    [["path", "class_idx", "images_dir"]]
)

# The test split is described by the sample submission file and is used as-is.
test_meta = pd.read_csv(SAMPLE_SUB_PATH)

print(train_meta.head())
print(train_meta.columns)

In [None]:
# Generate the datasets
class BirdsDataset(Dataset):
    """Image dataset driven by a metadata DataFrame.

    Args:
        df: table with a ``path`` column (image path relative to its folder),
            a ``class_idx`` column when ``has_labels`` is True, and optionally
            an ``images_dir`` column holding the folder of each row.
        images_dir: folder used for every row when ``df`` has no
            ``images_dir`` column.
        transform: optional callable applied to the RGB PIL image.
        has_labels: when True items are ``(image, int label)``, otherwise
            ``(image, path)``.
    """

    def __init__(self, df, images_dir, transform=None, has_labels=True):
        # Re-index from 0 so positional lookups line up with the sampler indices.
        self.df = df.reset_index(drop=True)
        self.images_dir = images_dir
        self.transform = transform
        self.has_labels = has_labels

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return the item at position ``idx``; log the offending row and re-raise on failure."""
        row = self.df.iloc[idx]
        # Pre-declared so the error report below can print them even when the
        # failure happens before they receive a real value.
        base_dir = None
        img_path = None
        try:
            # A per-row folder takes precedence over the dataset-wide one.
            if "images_dir" in self.df.columns:
                base_dir = row["images_dir"]
            else:
                base_dir = self.images_dir
            img_path = os.path.join(base_dir, row["path"])

            image = Image.open(img_path).convert("RGB")
            if self.transform is not None:
                image = self.transform(image)

            target = int(row["class_idx"]) if self.has_labels else row["path"]
            return image, target

        except Exception as e:
            # Dump enough context to identify the broken sample, then propagate.
            print("\n DATA ERROR idx =", idx)
            print("row index keys:", list(row.index))
            print("row:", row)
            print("base_dir:", base_dir)
            print("img_path:", img_path)
            raise

##### Transformations

In [None]:
# Transform the training and evaluation datasets:

# Training pipeline: random crop/flip/rotation/colour jitter for augmentation,
# then tensor conversion and normalisation with the mean/std constants below.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(5),
    transforms.ColorJitter(brightness=0.05, contrast=0.6, saturation=0.15, hue=0.01),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


# Evaluation/test pipeline: no augmentation at all. It previously used
# RandomResizedCrop, which made every evaluation pass see a different random
# crop; a plain Resize keeps evaluation deterministic and matches the
# evaluation transform defined later with the optimal parameters.
val_test_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

##### Cross validation set up

In [None]:
# 8-fold stratified splitter; shuffling uses a fixed seed (random_state=42) so
# the fold assignment is the same on every run.
skf = StratifiedKFold(n_splits=8,shuffle=True,random_state=42)

In [None]:
# Pool the train and validation tables into one frame for cross-validation.
# Folder and file name are forced to strings and then joined with exactly one
# "/" between them to give each row a full image path.
full_meta = (
    pd.concat([train_meta, val_meta], ignore_index=True)
    .astype({"images_dir": str, "path": str})
    .assign(
        full_path=lambda meta: meta["images_dir"].str.rstrip("/") + "/" + meta["path"].str.lstrip("/")
    )
)

In [None]:
# Create datasets for sampling purposes (not for training)
# train_meta / val_meta carry their own "images_dir" column, which
# BirdsDataset.__getitem__ prefers over the folder passed here.
# NOTE(review): test_meta gets no "images_dir" column in this notebook, so it
# presumably relies on TEST_IMAGES_DIR - confirm the CSV has no such column.
train_dataset = BirdsDataset(train_meta, TRAIN_IMAGES_DIR, train_transform, has_labels=True)
eval_dataset = BirdsDataset(val_meta, VAL_IMAGES_DIR, val_test_transform, has_labels=True)
test_dataset = BirdsDataset(test_meta, TEST_IMAGES_DIR, val_test_transform, has_labels=False)

In [None]:
def make_loader(df, transform, shuffle, has_labels = True):
    """Wrap a metadata frame in a BirdsDataset and return a DataLoader over it.

    No fallback image folder is passed to the dataset, so ``df`` must provide
    an ``images_dir`` column. Batch size and worker count come from the
    notebook-level BATCH_SIZE and NUM_WORKERS constants.

    Args:
        df: metadata DataFrame for the split.
        transform: transform applied to every image.
        shuffle: whether the loader reshuffles the data.
        has_labels: forwarded to BirdsDataset.

    Returns:
        A DataLoader created with ``pin_memory=True``.
    """
    loader_options = {
        "batch_size": BATCH_SIZE,
        "shuffle": shuffle,
        "num_workers": NUM_WORKERS,
        "pin_memory": True,
    }
    birds = BirdsDataset(df, images_dir=None, transform=transform, has_labels=has_labels)
    return DataLoader(birds, **loader_options)

##### Distribution of the data

In [None]:
# Class index -> species name; the position in the list is the class index.
idx_to_name = dict(enumerate([
    'Groove_billed_Ani', 'Red_winged_Blackbird', 'Rusty_Blackbird',
    'Gray_Catbird', 'Brandt_Cormorant', 'Eastern_Towhee',
    'Indigo_Bunting', 'Brewer_Blackbird', 'Painted_Bunting',
    'Bobolink', 'Lazuli_Bunting', 'Yellow_headed_Blackbird',
    'American_Crow', 'Fish_Crow', 'Brown_Creeper',
    'Yellow_billed_Cuckoo', 'Yellow_breasted_Chat',
    'Black_billed_Cuckoo', 'Gray_crowned_Rosy_Finch',
    'Bronzed_Cowbird',
]))

# Images per class, ordered by class index.
train_dist = train_meta["class_idx"].value_counts().sort_index()
val_dist = val_meta["class_idx"].value_counts().sort_index()

# One summary table per split.
train_df = pd.DataFrame({
    'Class_ID': train_dist.index,
    'Class_Name': [idx_to_name[class_id] for class_id in train_dist.index],
    'Train_Count': train_dist.values,
})

val_df = pd.DataFrame({
    'Class_ID': val_dist.index,
    'Class_Name': [idx_to_name[class_id] for class_id in val_dist.index],
    'Val_Count': val_dist.values,
})

# No counts are available for the test split, hence the '?' placeholder.
test_df = pd.DataFrame({
    'Class_ID': list(range(20)),
    'Class_Name': [idx_to_name[class_id] for class_id in range(20)],
    'Test_Count': '?',
})

# Print the three tables with the same banner layout.
for banner, table in (
    ("            Training Distribution", train_df),
    ("            Validation Distribution", val_df),
    ("            Test Distribution", test_df),
):
    print("="* 50)
    print(banner)
    print("="* 50)
    print(table.to_markdown(index=False))


In [None]:
# Bar chart of the number of images per class over the pooled
# train + validation metadata (full_meta).
class_counts = full_meta["class_idx"].value_counts().sort_index()

# Human-readable tick labels, in the same order as the counts.
class_names = [idx_to_name[i] for i in class_counts.index]

plt.figure(figsize=(14,7))
plt.bar(class_names, class_counts.values)
plt.xticks(rotation=45, ha='right')  # long species names need slanted labels
plt.xlabel("Class")
plt.ylabel("Count of birds")
plt.title("Distribution of the dataset")  # fixed typo in the rendered title
plt.tight_layout()
plt.show()

##### Show some samples

In [None]:
def show_random_samples(dataset, num_samples=4, title="Samples"):
    """Display a row of randomly chosen images from ``dataset``.

    Each item is expected to be ``(image_tensor, second)`` where the tensor is
    a 3-channel image normalised with the mean/std constants used by this
    notebook's transforms (the normalisation is inverted here for display).
    ``second`` is a class label, or a path string for unlabeled datasets.

    Args:
        dataset: indexable dataset supporting ``len()``.
        num_samples: number of images to show; capped at ``len(dataset)``.
        title: figure title.
    """
    # Never request more panels than there are images to fill them.
    num_samples = min(num_samples, len(dataset))
    if num_samples <= 0:
        print(f"{title}: no samples to display")
        return

    # squeeze=False keeps `axes` two-dimensional, so a single panel
    # (num_samples == 1) is indexed the same way as several.
    fig, axes = plt.subplots(1, num_samples, figsize=(15, 3), squeeze=False)
    axes = axes[0]

    # Get random indices (without replacement)
    indices = torch.randperm(len(dataset))[:num_samples].tolist()

    # Constants that undo transforms.Normalize, shaped to broadcast over (3, H, W).
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)

    for ax, idx in zip(axes, indices):
        image, label = dataset[idx]

        # De-normalise and clip to the displayable [0, 1] range.
        image = torch.clamp(image * std + mean, 0, 1)

        ax.imshow(image.permute(1, 2, 0).numpy())
        # Unlabeled datasets return the file path instead of a class index;
        # show it verbatim rather than presenting it as a class.
        caption = label if isinstance(label, str) else f"Class {label}"
        ax.set_title(caption, fontsize=14)
        ax.axis('off')

    plt.suptitle(title, fontsize=16)
    plt.tight_layout()
    plt.show()

# Show samples for train, evaluation and test
for heading, sample_source, figure_title in (
    ("TRAINING samples:", train_dataset, "Training Dataset"),
    ("\nVALIDATION samples:", eval_dataset, "Validation Dataset"),
    ("\nTESTING samples:", test_dataset, "Testing Dataset"),
):
    print(heading)
    show_random_samples(sample_source, title=figure_title)


### Model definition

In [None]:
from torchvision.models import convnext_small, ConvNeXt_Small_Weights

class ConvNeXtBirds(nn.Module):
    """ConvNeXt-Small backbone whose classifier is replaced by a configurable MLP head.

    The head is ``Flatten -> LayerNorm -> MLP``:

    * ``mlp_depth == 1``: ``Dropout -> Linear(in_features, num_classes)``.
    * otherwise: ``mlp_depth - 1`` hidden stages of
      ``Linear -> GELU -> [LayerNorm] -> Dropout`` followed by the output
      ``Linear``. The first hidden width is ``mlp_hidden``; each following
      width is the previous one times ``mlp_ratio``, never below 64.

    Args:
        num_classes: size of the output layer.
        dropout: dropout probability used throughout the head.
        pretrained: load ``ConvNeXt_Small_Weights.DEFAULT`` when True.
        mlp_depth: number of Linear layers in the head.
        mlp_hidden: width of the first hidden layer.
        mlp_ratio: multiplicative width factor between hidden layers.
        use_layernorm: add a LayerNorm after each hidden activation.
    """

    def __init__(self, num_classes=20, dropout=0.25, pretrained=True, mlp_depth=1, mlp_hidden=512, mlp_ratio=0.5, use_layernorm=True):
        super().__init__()

        self.backbone = convnext_small(
            weights=ConvNeXt_Small_Weights.DEFAULT if pretrained else None
        )

        # Feature width fed to the stock classifier's final Linear layer.
        in_features = self.backbone.classifier[-1].in_features

        head = [nn.Flatten(1), nn.LayerNorm(in_features)]

        if mlp_depth == 1:
            # Single-layer head: Dropout + Linear output
            head += [nn.Dropout(dropout), nn.Linear(in_features, num_classes)]
        else:
            # (mlp_depth - 1) hidden stages, then the output layer
            width_in, width_out = in_features, mlp_hidden
            for _ in range(mlp_depth - 1):
                head += [nn.Linear(width_in, width_out), nn.GELU()]
                if use_layernorm:
                    head.append(nn.LayerNorm(width_out))
                head.append(nn.Dropout(dropout))

                # Shrink the next hidden layer, with a floor of 64 units.
                width_in, width_out = width_out, max(64, int(width_out * mlp_ratio))

            head.append(nn.Linear(width_in, num_classes))

        self.backbone.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Run the backbone (features + custom head) on ``x`` and return its output."""
        return self.backbone(x)


### GRID SEARCH using the Weights and Biases Tool

##### Set up the sweep configuration

In [None]:
# Uncomment to settle the sweep configuration for grid search

# # Sweep Configuration: what hyperparameters to tune
# sweep_config = {
#     'method': 'bayes',
#     'name': 'convnext-bird-hyperparameter-search',
#     'metric': {
#         'name': 'mean_fold_accuracy',  # what we want to maximize
#         'goal': 'maximize'
#     },
#     'parameters': {
#         # Hyperparameters to tune:

#         'dropout': {
#             'values': [0.15, 0.20, 0.25, 0.30]
#         },

#         'learning_rate': {
#             'values': [7.5e-5, 1e-4, 1.5e-4, 2e-4, 3e-4, 4e-4]
#         },

#         'batch_size': {
#             'values': [32, 64]
#         },

#         'frozen_epochs': {
#             'values': [10, 15, 20, 25]
#         },

#         'finetune_epochs': {
#             'values': [0, 5, 10]
#         },

#         'weight_decay': {
#             'values': [0.0, 0.001, 0.005, 0.01]
#         },

#         'rotation_degrees': {
#             'values': [15, 30]
#         },

#         'color_jitter_strength': {
#             'values': [0.1, 0.2, 0.3]
#         },

#         'n_folds': {
#             'value': 8
#         },

#         'mlp_depth': {'values': [1, 2, 3]},
#         'mlp_hidden': {'values': [256, 512, 1024]},
#         'mlp_ratio': {'values': [0.5, 0.75]},
#         'use_layernorm': {'values': [True]},

#         'num_classes': {
#             'value': 20
#         },

#         'img_size': {
#             'value': 224
#         },

#         'num_workers': {
#             'value': 4
#         },
#         'scheduler_type': {
#           'values': ['plateau', 'cosine']
#         },

#         # for plateau
#         'plateau_factor': {'values': [0.5]},
#         'plateau_patience_frozen': {'values': [2, 3]},
#         'plateau_patience_finetune': {'values': [1, 2]},

#         # for cosine
#         'cosine_tmax_frozen': {'values': [20, 30, 40]},
#         'cosine_tmax_finetune': {'values': [5, 10]},
#         'cosine_eta_min': {'values': [1e-6, 1e-5]},
#             }
# }

# # Initializing the sweep
# sweep_id = wandb.sweep(sweep_config, project="bird-classification-sweep")

# print(f"Access to dashboard at: https://wandb.ai/alexandraperruchot/bird-classification-sweep/sweeps/{sweep_id}")


##### Training with sweep configuration

In [None]:
# Uncomment to define the function for training with sweep configuration

# def train_with_sweep_config():
#     with wandb.init() as run:
#         config = wandb.config
#         run_id = run.id
#         timestamp = time.strftime("%Y%m%d-%H%M%S")
#         hp_tag = (
#             f"dp{config.dropout}_lr{config.learning_rate:.1e}_bs{config.batch_size}"
#             f"_fd{config.frozen_epochs}_ft{config.finetune_epochs}"
#             f"_mlpD{config.mlp_depth}_H{config.mlp_hidden}_r{config.mlp_ratio}"
#         )

#         base_ckpt_dir = f"checkpoints/sweep_{timestamp}_{run_id}_{hp_tag}"
#         os.makedirs(base_ckpt_dir, exist_ok=True)

#         print(f"\n Checkpoints directory for this run: {base_ckpt_dir}\n")
#         print(f"\n{'='*70}")
#         print(f"TRAINING WITH HYPERPARAMETERS:")
#         print(f"{'='*70}")
#         print(f"  Dropout:          {config.dropout}")
#         print(f"  Learning Rate:    {config.learning_rate:.6f}")
#         print(f"  Batch Size:       {config.batch_size}")
#         print(f"  Frozen Epochs:    {config.frozen_epochs}")
#         print(f"  Finetune Epochs:  {config.finetune_epochs}")
#         print(f"  Weight Decay:     {config.weight_decay}")
#         print(f"  Rotation:         {config.rotation_degrees}°")
#         print(f"  Color Jitter:     {config.color_jitter_strength}")
#         print(f"  N Folds:          {config.n_folds}")
#         print(f"{'='*70}\n")

#         # Create data transforms with sweep config
#         train_transform = transforms.Compose([
#             transforms.Resize((config.img_size, config.img_size)),
#             transforms.RandomHorizontalFlip(p=0.5),
#             transforms.RandomRotation(degrees=config.rotation_degrees),
#             transforms.ColorJitter(
#                 brightness=config.color_jitter_strength,
#                 contrast=config.color_jitter_strength,
#                 saturation=config.color_jitter_strength,
#                 hue=config.color_jitter_strength * 0.5
#             ),
#             transforms.ToTensor(),
#             transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#         ])

#         # Validation transform (no augmentation)
#         val_test_transform = transforms.Compose([
#             transforms.Resize((config.img_size, config.img_size)),
#             transforms.ToTensor(),
#             transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#         ])

#         # Cross validation setup
#         fold_models = []
#         fold_accuracies = []
#         fold_histories = []

#         skf = StratifiedKFold(n_splits=config.n_folds, shuffle=True, random_state=42)

#         # Cross validation loop
#         for fold, (train_idx, val_idx) in enumerate(skf.split(full_meta, full_meta["class_idx"])):
#             print(f"\n{'='*50}")
#             print(f" Fold {fold+1}/{config.n_folds}")
#             print(f"{'='*50}")

#             # split data
#             train_df = full_meta.iloc[train_idx].reset_index(drop=True)
#             val_df = full_meta.iloc[val_idx].reset_index(drop=True)

#             # Create data loaders
#             train_dataset = BirdsDataset(train_df, images_dir=None, transform=train_transform, has_labels=True)
#             val_dataset = BirdsDataset(val_df, images_dir=None, transform=val_test_transform, has_labels=True)

#             train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True, num_workers=config.num_workers)

#             val_loader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False, num_workers=config.num_workers)

#             print(f"Train images: {len(train_df)} | Val images: {len(val_df)}")

#             # Create model with sweep config
#             model = ConvNeXtBirds(num_classes=config.num_classes, dropout=config.dropout, pretrained=True, mlp_depth=config.mlp_depth, mlp_hidden=config.mlp_hidden, mlp_ratio=config.mlp_ratio, use_layernorm=config.use_layernorm).to(device)

#             # we freeze backbone initially
#             for p in model.backbone.features.parameters():
#                 p.requires_grad = False
#             for p in model.backbone.classifier.parameters():
#                 p.requires_grad = True

#             # optimizer with sweep config
#             optimizer = AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=config.learning_rate, weight_decay=config.weight_decay)

#             criterion = nn.CrossEntropyLoss()
#             if config.scheduler_type == "plateau":
#               scheduler = ReduceLROnPlateau(optimizer, mode="max", factor=config.plateau_factor, patience=config.plateau_patience_frozen)
#             elif config.scheduler_type == "cosine":
#                 scheduler = CosineAnnealingLR(optimizer, T_max=config.cosine_tmax_frozen, eta_min=config.cosine_eta_min)
#             else:
#                 raise ValueError(f"Unknown scheduler_type: {config.scheduler_type}")

#             # keeping track of training metrics
#             best_acc = 0.0
#             best_model_path = os.path.join(base_ckpt_dir, f"best_fold_{fold+1}.pth")
#             history = {"train_loss": [], "val_loss": [], "val_acc": []}

#             # ========== FROZEN TRAINING ==========
#             print(f"\nFrozen Training ({config.frozen_epochs} epochs)")

#             for epoch in range(config.frozen_epochs):
#                 print(f"\nEpoch {epoch+1}/{config.frozen_epochs} (Frozen)")

#                 train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
#                 val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, device)

#                 history["train_loss"].append(train_loss)
#                 history["val_loss"].append(val_loss)
#                 history["val_acc"].append(val_acc)

#                 print(f"Train: {train_acc:.1f}% (Loss: {train_loss:.3f})")
#                 print(f"Val:   {val_acc:.1f}% (Loss: {val_loss:.3f}) | LR: {optimizer.param_groups[0]['lr']:.2e}")

#                 # we save the best model
#                 if val_acc > best_acc:
#                     best_acc = val_acc

#                     torch.save(model.state_dict(), best_model_path)
#                     print(f"New best accuracy! Saved to {best_model_path}")

#                 if config.scheduler_type == "plateau":
#                     scheduler.step(val_acc)
#                 else:
#                     scheduler.step()

#                 # Log
#                 wandb.log({
#                     f"fold_{fold+1}/train_loss": train_loss,
#                     f"fold_{fold+1}/train_acc": train_acc,
#                     f"fold_{fold+1}/val_loss": val_loss,
#                     f"fold_{fold+1}/val_acc": val_acc,
#                     f"fold_{fold+1}/learning_rate": optimizer.param_groups[0]['lr'],
#                     "epoch": epoch,
#                     "fold": fold + 1,
#                     "phase": "frozen"
#                 })

#             # ========== FINE-TUNING ==========
#             if config.finetune_epochs > 0:
#                 print(f"\nFine-tuning ({config.finetune_epochs} epochs)")

#                 # Unfreeze all layers
#                 for p in model.parameters():
#                     p.requires_grad = True

#                 # new optimizer with lower learning rate
#                 optimizer = AdamW(model.parameters(), lr=config.learning_rate * 0.1, weight_decay=config.weight_decay)
#                 scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2)

#                 for epoch in range(config.finetune_epochs):
#                     print(f"\nEpoch {epoch+1}/{config.finetune_epochs} (Fine-tuning)")

#                     train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
#                     val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, device)

#                     history["train_loss"].append(train_loss)
#                     history["val_loss"].append(val_loss)
#                     history["val_acc"].append(val_acc)

#                     print(f"Train: {train_acc:.1f}% (Loss: {train_loss:.3f})")
#                     print(f"Val:   {val_acc:.1f}% (Loss: {val_loss:.3f}) | LR: {optimizer.param_groups[0]['lr']:.2e}")

#                     # Save if better
#                     if val_acc > best_acc:
#                         best_acc = val_acc
#                         torch.save(model.state_dict(), best_model_path)
#                         print(f" New best accuracy! Saved to {best_model_path}")

#                     scheduler.step(val_acc)

#                     # Log
#                     wandb.log({
#                         f"fold_{fold+1}/finetune_train_loss": train_loss,
#                         f"fold_{fold+1}/finetune_train_acc": train_acc,
#                         f"fold_{fold+1}/finetune_val_loss": val_loss,
#                         f"fold_{fold+1}/finetune_val_acc": val_acc,
#                         f"fold_{fold+1}/learning_rate": optimizer.param_groups[0]['lr'],
#                         "epoch": config.frozen_epochs + epoch,
#                         "fold": fold + 1,
#                         "phase": "finetune"
#                     })

#             # store fold results
#             fold_accuracies.append(best_acc)
#             fold_models.append(best_model_path)
#             fold_histories.append(history)

#             # Log
#             wandb.log({
#                 f"fold_{fold+1}/best_val_accuracy": best_acc,
#             })

#             print(f"\n Fold {fold+1} Complete - Best Accuracy: {best_acc:.2f}%")

#             # Clean up before next fold
#             del model, optimizer, scheduler, train_loader, val_loader
#             torch.cuda.empty_cache()

#         # ========== SUMMARY STATISTICS ==========
#         mean_accuracy = np.mean(fold_accuracies)
#         std_accuracy  = np.std(fold_accuracies)
#         best_fold     = np.max(fold_accuracies)
#         worst_fold    = np.min(fold_accuracies)

#         summary = {
#             "run_id": run_id,
#             "timestamp": timestamp,
#             "mean_accuracy": float(mean_accuracy),
#             "std_accuracy": float(std_accuracy),
#             "best_fold_accuracy": float(best_fold),
#             "worst_fold_accuracy": float(worst_fold),
#             "fold_accuracies": [float(x) for x in fold_accuracies],
#             "fold_models": fold_models,
#             "hyperparameters": {
#                 k: (v if isinstance(v, (int, float, str, bool)) else str(v))
#                 for k, v in dict(config).items()
#             },
#         }

#         with open(os.path.join(base_ckpt_dir, "summary.json"), "w") as f:
#             json.dump(summary, f, indent=2)

#         with open(os.path.join(base_ckpt_dir, "README.txt"), "w") as f:
#             f.write("RUN SUMMARY\n")
#             f.write(f"run_id: {run_id}\n")
#             f.write(f"timestamp: {timestamp}\n")
#             f.write(f"mean_accuracy: {mean_accuracy:.4f}\n")
#             f.write(f"std_accuracy: {std_accuracy:.4f}\n")
#             f.write(f"best_fold_accuracy: {best_fold:.4f}\n")
#             f.write(f"worst_fold_accuracy: {worst_fold:.4f}\n\n")
#             f.write("FOLD ACCURACIES:\n")
#             for i, acc in enumerate(fold_accuracies, 1):
#                 f.write(f"  fold_{i}: {acc:.4f}\n")
#             f.write("\nHYPERPARAMETERS:\n")
#             for k, v in dict(config).items():
#                 f.write(f"  {k}: {v}\n")

#         final_dir = f"{base_ckpt_dir}_mean{mean_accuracy:.4f}"
#         os.rename(base_ckpt_dir, final_dir)


#         fold_models = [p.replace(base_ckpt_dir, final_dir) for p in fold_models]

#         print(f"Summary statistics saved: {os.path.join(final_dir, 'summary.json')}")

#         summary["fold_models"] = fold_models
#         with open(os.path.join(final_dir, "summary.json"), "w") as f:
#             json.dump(summary, f, indent=2)

#         print(f"\n{'='*70}")
#         print(f"CROSS-VALIDATION RESULTS:")
#         print(f"{'='*70}")
#         print(f"  Mean Accuracy:  {mean_accuracy:.2f}% ± {std_accuracy:.2f}%")
#         print(f"  Best Fold:      {best_fold:.2f}%")
#         print(f"  Worst Fold:     {worst_fold:.2f}%")
#         print(f"  All Folds:      {[f'{acc:.2f}%' for acc in fold_accuracies]}")
#         print(f"{'='*70}\n")

#         wandb.log({
#             "mean_fold_accuracy": mean_accuracy,
#             "std_fold_accuracy": std_accuracy,
#             "best_fold_accuracy": best_fold,
#             "worst_fold_accuracy": worst_fold,
#         })

#         fold_table = wandb.Table(
#             columns=["Fold", "Validation Accuracy", "Model Path"],
#             data=[[i+1, acc, path] for i, (acc, path) in enumerate(zip(fold_accuracies, fold_models))]
#         )
#         wandb.log({"fold_results_table": fold_table})

#         return mean_accuracy, fold_accuracies, fold_models, fold_histories

In [None]:
# Uncomment to start the grid search

# wandb.agent(sweep_id, function=train_with_sweep_config, count=15)

# print("\n" + "="*70)
# print("GRID SEARCH COMPLETED!")
# print("="*70)
# print(f"\nView results of the grid search in the dashboard: https://wandb.ai/alexandraperruchot/bird-classification-sweep/sweeps/{sweep_id}")

### Optimal Parameters

After the hyperparameter search, we settled on the following optimal parameters:

In [None]:
# Hyperparameters selected by the search above; read by the final training cells.
OPTIMAL_PARAMS = dict(
    learning_rate=0.0002,
    weight_decay=0.001,
    batch_size=32,
    dropout=0.3,
    frozen_epochs=25,
    finetune_epochs=10,
    mlp_depth=3,
    mlp_hidden=1024,
    mlp_ratio=0.5,
    rotation_degrees=30,
    color_jitter_strength=0.1,
    scheduler_type='plateau',
    plateau_factor=0.5,
    plateau_patience_frozen=2,
    plateau_patience_finetune=1,
)


# Setup data transformations with the optimal parameters.
# These definitions replace the train_transform / val_test_transform created earlier.
_jitter = OPTIMAL_PARAMS['color_jitter_strength']

train_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=OPTIMAL_PARAMS['rotation_degrees']),
    # One strength drives brightness/contrast/saturation; hue uses half of it.
    transforms.ColorJitter(brightness=_jitter, contrast=_jitter, saturation=_jitter, hue=_jitter * 0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Evaluation pipeline: resize and normalise only, no augmentation.
val_test_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
def train_one_epoch(model, train_loader, optimizer, criterion, device):
    """Train ``model`` for one pass over ``train_loader``.

    Gradients are clipped to a global norm of 1.0 before every optimizer step.

    Args:
        model: network to optimise (switched to train mode).
        train_loader: iterable of ``(images, labels)`` batches.
        optimizer: optimizer updating the model parameters.
        criterion: loss called as ``criterion(outputs, labels)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_batch_loss, accuracy_percent)``, where the loss is averaged
        over the number of batches.
    """
    model.train()

    running_loss, n_correct, n_seen = 0.0, 0, 0

    # progress bar with live loss / running accuracy
    progress = tqdm(train_loader, desc="Training")

    for images, labels in progress:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        logits = model(images)
        batch_loss = criterion(logits, labels)

        batch_loss.backward()
        # Clip before stepping to keep the update size bounded.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        batch_loss_value = batch_loss.item()
        running_loss += batch_loss_value

        # Index of the largest logit is the predicted class.
        predicted = logits.max(1)[1]
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()

        progress.set_postfix({"Loss": f"{batch_loss_value:.3f}", "Acc": f"{100.*n_correct/n_seen:.1f}%"})

    epoch_acc = 100. * n_correct / n_seen
    return running_loss / len(train_loader), epoch_acc

In [None]:
def validate_one_epoch(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without computing gradients.

    Args:
        model: network to evaluate (switched to eval mode).
        val_loader: iterable of ``(images, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_batch_loss, accuracy_percent)``, where the loss is averaged
        over the number of batches.
    """
    model.eval()

    loss_total, hits, seen = 0.0, 0, 0

    # progress bar with live loss / running accuracy
    progress = tqdm(val_loader, desc="Validating")

    with torch.no_grad():
        for images, labels in progress:
            images = images.to(device)
            labels = labels.to(device)

            logits = model(images)
            batch_loss = criterion(logits, labels).item()

            loss_total += batch_loss
            # Index of the largest logit is the predicted class.
            predicted = logits.max(1)[1]
            seen += labels.size(0)
            hits += predicted.eq(labels).sum().item()

            progress.set_postfix({"Loss": f"{batch_loss:.3f}", "Acc": f"{100.*hits/seen:.1f}%"})

    accuracy = 100. * hits / seen
    return loss_total / len(val_loader), accuracy

### Cross-validation loop

In [None]:
# Cross-validated training with the optimal hyperparameters.
# For every fold: (1) train only the classifier head with the backbone frozen,
# (2) optionally unfreeze everything and fine-tune at a 10x lower learning rate.
# The weights with the best validation accuracy of the fold are checkpointed.
fold_models = []       # checkpoint path of the best model of each fold
fold_accuracies = []   # best validation accuracy (%) of each fold
all_histories = []     # per-epoch metrics of each fold

# Take the fold count from the splitter instead of repeating the literal.
n_folds = skf.get_n_splits()

for fold, (train_idx, val_idx) in enumerate(skf.split(full_meta, full_meta['class_idx']), 1):
    print(f"\n{'='*70}")
    print(f"FOLD {fold}/{n_folds}")
    print(f"{'='*70}")

    # One checkpoint file per fold, overwritten whenever validation accuracy improves.
    checkpoint_path = f'best_model_fold{fold}.pth'

    # split data
    train_fold_df = full_meta.iloc[train_idx].reset_index(drop=True)
    val_fold_df = full_meta.iloc[val_idx].reset_index(drop=True)

    print(f"Train: {len(train_fold_df)} | Val: {len(val_fold_df)}")

    # create datasets
    # images_dir is only a fallback: full_meta carries a per-row "images_dir"
    # column, which BirdsDataset prefers.
    train_dataset = BirdsDataset(train_fold_df, images_dir=TRAIN_IMAGES_DIR, transform=train_transform, has_labels=True)
    val_dataset = BirdsDataset(val_fold_df, images_dir=TRAIN_IMAGES_DIR, transform=val_test_transform, has_labels=True)

    # create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=OPTIMAL_PARAMS['batch_size'], shuffle=True, num_workers=NUM_WORKERS)
    val_loader = DataLoader(val_dataset, batch_size=OPTIMAL_PARAMS['batch_size'], shuffle=False, num_workers=NUM_WORKERS)

    # create model with optimal parameters
    model = ConvNeXtBirds(num_classes=NUM_CLASSES, dropout=OPTIMAL_PARAMS['dropout'], pretrained=True, mlp_depth=OPTIMAL_PARAMS['mlp_depth'], mlp_hidden=OPTIMAL_PARAMS['mlp_hidden'], mlp_ratio=OPTIMAL_PARAMS['mlp_ratio'], use_layernorm=True).to(device)

    # ==========FROZEN BACKBONE ==========
    print(f"\nFrozen backbone ({OPTIMAL_PARAMS['frozen_epochs']} epochs)")

    # freeze backbone: only the classifier head receives gradients
    for p in model.backbone.features.parameters():
        p.requires_grad = False
    for p in model.backbone.classifier.parameters():
        p.requires_grad = True

    # optimizer (trainable parameters only) and scheduler
    optimizer = AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=OPTIMAL_PARAMS['learning_rate'], weight_decay=OPTIMAL_PARAMS['weight_decay'])

    if OPTIMAL_PARAMS['scheduler_type'] == 'plateau':
        # mode='max' because the scheduler is stepped with validation accuracy
        scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=OPTIMAL_PARAMS['plateau_factor'], patience=OPTIMAL_PARAMS['plateau_patience_frozen'])
    else:  # cosine
        scheduler = CosineAnnealingLR(optimizer, T_max=OPTIMAL_PARAMS.get('cosine_tmax_frozen', 30), eta_min=OPTIMAL_PARAMS.get('cosine_eta_min', 1e-6))

    criterion = nn.CrossEntropyLoss()
    best_acc = 0.0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    # Train frozen
    for epoch in range(OPTIMAL_PARAMS['frozen_epochs']):
        print(f"\nEpoch {epoch+1}/{OPTIMAL_PARAMS['frozen_epochs']} (Frozen)")
        train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, device)

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f"Train: {train_acc:.1f}% (Loss: {train_loss:.3f})")
        print(f"Val: {val_acc:.1f}% (Loss: {val_loss:.3f})")

        # we update scheduler (plateau needs the monitored metric, cosine does not)
        if OPTIMAL_PARAMS['scheduler_type'] == 'plateau':
            scheduler.step(val_acc)
        else:
            scheduler.step()

        # save best model
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Saved best model: {best_acc:.2f}%")


    # ========== FINETUNE ==========
    if OPTIMAL_PARAMS['finetune_epochs'] > 0:
        print(f"\nFinetuning ({OPTIMAL_PARAMS['finetune_epochs']} epochs)")

        # unfreeze all
        for p in model.parameters():
            p.requires_grad = True

        # new optimizer with lower LR, now covering every parameter
        optimizer = AdamW(model.parameters(), lr=OPTIMAL_PARAMS['learning_rate'] * 0.1,weight_decay=OPTIMAL_PARAMS['weight_decay'])
        # NOTE(review): factor/patience are hard-coded here, so
        # OPTIMAL_PARAMS['plateau_patience_finetune'] is never used - confirm
        # which value is intended before wiring it in.
        scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2)

        # train finetune
        for epoch in range(OPTIMAL_PARAMS['finetune_epochs']):
            print(f"\nEpoch {epoch+1}/{OPTIMAL_PARAMS['finetune_epochs']} (Finetune)")

            train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
            val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, device)

            history['train_loss'].append(train_loss)
            history['train_acc'].append(train_acc)
            history['val_loss'].append(val_loss)
            history['val_acc'].append(val_acc)

            print(f"Train: {train_acc:.1f}% (Loss: {train_loss:.3f})")
            print(f"Val: {val_acc:.1f}% (Loss: {val_loss:.3f})")

            scheduler.step(val_acc)

            if val_acc > best_acc:
                best_acc = val_acc
                torch.save(model.state_dict(), checkpoint_path)
                print(f"Saved best model: {best_acc:.2f}%")



    print(f"\n Fold {fold} Best Accuracy: {best_acc:.2f}%")
    fold_models.append(checkpoint_path)
    fold_accuracies.append(best_acc)
    all_histories.append(history)


    # clean-up
    # The optimizer and scheduler hold references to the model parameters, so
    # they must be dropped together with the model; otherwise the GPU memory
    # stays allocated while the next fold's model is being built.
    del model, optimizer, scheduler, train_loader, val_loader
    torch.cuda.empty_cache()


In [None]:
# Final report: one line per fold, then mean and standard deviation over folds.
print(f"\n{'=' * 70}")
print("TRAINING COMPLETE - CROSS-VALIDATION RESULTS")
print("=" * 70)

# show accuracy of each fold
for fold_num, acc in enumerate(fold_accuracies, start=1):
    print(f"Fold {fold_num}: {acc:.2f}%")

# Aggregate statistics across folds.
mean_acc, std_acc = np.mean(fold_accuracies), np.std(fold_accuracies)

print(f"\n{'-' * 70}")
print(f"Mean Fold Accuracy: {mean_acc:.2f}%")
print(f"Standard Deviation: {std_acc:.2f}%")
print("-" * 70)

### Testing the model

In [None]:
@torch.no_grad()
def predict_logits_tta(model, images):
    """Return the model's logits averaged over a batch and its flipped copy.

    Two forward passes are made with gradient tracking disabled: one on
    ``images`` as given and one on ``images`` reversed along dimension 3
    (presumably the width axis of an (N, C, H, W) batch, i.e. a horizontal
    flip; confirm against the dataset transforms). The result is the
    element-wise mean of the two outputs.
    """
    # first view: the untouched batch
    combined = model(images)
    # second view: the same batch mirrored along dim 3
    combined = combined + model(images.flip(3))
    # mean of the two views
    return combined / 2.0

def predict_test_ensemble_tta_mean_logits(model_paths, test_loader, device, model_fn, num_classes, dropout, use_amp=True):
    """Predict test labels from the unweighted mean of TTA logits over several checkpoints.

    For every checkpoint in ``model_paths`` a fresh model is built with
    ``model_fn`` (the head settings ``mlp_depth``, ``mlp_hidden`` and
    ``mlp_ratio`` are read from the module-level ``OPTIMAL_PARAMS``), the saved
    state dict is loaded, and the whole ``test_loader`` is scored with
    ``predict_logits_tta``. Per-checkpoint logits are summed and divided by the
    number of checkpoints.

    Args:
        model_paths: Sequence of checkpoint files (state dicts) to ensemble.
        test_loader: Iterable yielding ``(images, paths)`` batches. It is
            iterated once per checkpoint and must return the samples in the
            same order every time.
        device: ``torch.device`` that hosts the models and the image batches.
        model_fn: Callable that builds the network; called with
            ``num_classes``, ``dropout``, ``pretrained=False``, ``mlp_depth``,
            ``mlp_hidden``, ``mlp_ratio`` and ``use_layernorm=True``.
        num_classes: Forwarded to ``model_fn``.
        dropout: Forwarded to ``model_fn``.
        use_amp: If true and ``device`` is a CUDA device, inference runs under
            autocast.

    Returns:
        Tuple ``(preds, image_ids, mean_logits)``: the argmax over axis 1 of
        the mean logits, the file name of every sample, and the float32 mean
        logits.

    Raises:
        ValueError: If ``model_paths`` is empty, or a sample's file name does
            not end in ``.jpg`` (case-insensitive).
        AssertionError: If a later pass over ``test_loader`` yields the paths
            in a different order than the first pass.
    """
    n_models = len(model_paths)
    if n_models == 0:
        # without this guard the final averaging would fail on ``None / 0``
        raise ValueError("model_paths is empty: at least one checkpoint is required")

    amp_enabled = use_amp and device.type == "cuda"
    all_paths = None
    logits_sum = None

    for mp in model_paths:
        print(f"\nLoading model: {mp}")
        # create model architecture (without pretrained weights)
        model = model_fn(
            num_classes=num_classes,
            dropout=dropout,
            pretrained=False,
            mlp_depth=OPTIMAL_PARAMS["mlp_depth"],
            mlp_hidden=OPTIMAL_PARAMS["mlp_hidden"],
            mlp_ratio=OPTIMAL_PARAMS["mlp_ratio"],
            use_layernorm=True,
        ).to(device)

        # load trained weights from this fold
        # NOTE(review): torch.load unpickles the file; only pass checkpoints you trust
        state = torch.load(mp, map_location=device)
        model.load_state_dict(state)
        model.eval()

        fold_logits = []
        fold_paths = []

        # predict on the test dataset
        for images, paths in tqdm(test_loader, desc=f"Predicting {mp}"):
            images = images.to(device)

            if amp_enabled:
                # torch.autocast replaces the deprecated torch.cuda.amp.autocast
                with torch.autocast(device_type="cuda"):
                    logits = predict_logits_tta(model, images)
            else:
                logits = predict_logits_tta(model, images)

            # autocast may hand back half-precision logits; promote to float32
            # so the sum across checkpoints is not accumulated in reduced precision
            fold_logits.append(logits.detach().float().cpu().numpy())
            fold_paths.extend(paths)

        fold_logits = np.concatenate(fold_logits, axis=0)

        # every checkpoint must have scored the samples in the same order,
        # otherwise the element-wise sum below would mix different images
        if all_paths is None:
            all_paths = fold_paths
        else:
            assert fold_paths == all_paths, ("ERROR: order mismatch!")

        # accumulate logits across models
        if logits_sum is None:
            logits_sum = fold_logits
        else:
            logits_sum += fold_logits

        # free memory after this model
        del model
        if device.type == "cuda":
            torch.cuda.empty_cache()

    # compute ensemble predictions
    mean_logits = logits_sum / n_models
    preds = mean_logits.argmax(axis=1)

    # extract image filenames (basename handles the platform's path separators)
    jpg_regex = re.compile(r".+\.jpg$", re.IGNORECASE)
    image_ids = []
    for p in all_paths:
        filename = os.path.basename(p)
        if not jpg_regex.match(filename):
            raise ValueError(f"error file wrong .jpg: {filename}")
        image_ids.append(filename)

    # return predictions, filenames and mean logits
    return preds, image_ids, mean_logits


print("="*70)
print("GENERATING TEST PREDICTIONS (MEAN LOGITS, NO WEIGHTS, TTA)")
print("="*70)

# create test DataLoader; shuffle stays off because the ensemble function
# iterates the loader once per fold model and checks the sample order matches
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

# average the TTA logits of all fold checkpoints and take the argmax per image
preds, image_ids, mean_logits = predict_test_ensemble_tta_mean_logits(
    model_paths=fold_models,
    test_loader=test_loader,
    device=device,
    model_fn=ConvNeXtBirds,
    num_classes=NUM_CLASSES,
    dropout=OPTIMAL_PARAMS["dropout"],
    use_amp=True
)


# one row per test image: file name and predicted class index
df_final = pd.DataFrame({
    "path": image_ids,
    "class_idx": preds,
})

# export predictions to a .csv file in the project root; the path is built from
# PROJECT_ROOT instead of a hard-coded Colab location so the cell also runs
# outside Colab (on Colab, after `%cd ML-Project`, it resolves to the same file)
out_csv = os.path.abspath(os.path.join(PROJECT_ROOT, "test_ConvNext_Layers.csv"))
df_final.to_csv(out_csv, index=False)

print(f"\n Saved: {out_csv}")
print(df_final.head(10))
print("Total predictions:", len(df_final))
