# Organizing train/val data

In [1]:
import os
import shutil
import random

# Reorganise a flat folder of pet images into the
# <target>/<split>/<class>/ layout, moving (not copying) each file.

# Set paths
source_dir = 'cat_dog_data/images'
target_dir = 'cat_dog_data/organized'
os.makedirs(target_dir, exist_ok=True)

# Create train and val directories for each class
for split in ['train', 'val']:
    for class_name in ['cat', 'dog']:
        os.makedirs(os.path.join(target_dir, split, class_name), exist_ok=True)

# Define split ratio
train_ratio = 0.8  # 80% for training, 20% for validation

# List the source folder once. Keep only regular files with an image
# extension so stray directories or side-car files are never moved into the
# class folders, and sort so the split does not depend on the OS listing order.
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif', '.webp')
all_images = sorted(
    name for name in os.listdir(source_dir)
    if name.lower().endswith(image_extensions)
    and os.path.isfile(os.path.join(source_dir, name))
)

# Class is encoded in the case of the first character of the file name:
# upper-case -> cat, lower-case -> dog. Names starting with anything else
# (digits, '.', ...) belong to neither class and are left in place.
# NOTE(review): this looks like the Oxford-IIIT Pet naming scheme - confirm.
cat_images = [img for img in all_images if img[0].isupper()]
dog_images = [img for img in all_images if img[0].islower()]

# A dedicated, seeded generator makes the train/val assignment reproducible
# without touching the global `random` state.
rng = random.Random(42)

# Split images into training and validation
for images, label in [(cat_images, 'cat'), (dog_images, 'dog')]:
    rng.shuffle(images)
    train_count = int(train_ratio * len(images))
    train_images = images[:train_count]
    val_images = images[train_count:]

    # Move files
    for split, split_images in (('train', train_images), ('val', val_images)):
        for img in split_images:
            shutil.move(
                os.path.join(source_dir, img),
                os.path.join(target_dir, split, label, img),
            )

print("Dataset organized successfully.")


Dataset organized successfully.


In [2]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision import datasets, models
from torch.utils.data import DataLoader
from functools import partial
import time
from tqdm import tqdm


import argparse



import sys
sys.path.append('dinov2')

from dinov2.eval.linear import create_linear_input
from dinov2.eval.linear import LinearClassifier
from dinov2.eval.utils import ModelWithIntermediateLayers

# Command-line interface mirroring the options held by the notebook-side
# configuration object. Nothing in the visible cells calls
# `parser.parse_args()`; the parser is kept so the same code can be run as a
# script.
parser = argparse.ArgumentParser(description='Fine-tune DINOv2 on ImageNet100')
parser.add_argument(
    '--arch', '-a',
    metavar='ARCH',
    default='dinov2_vitb14',
    choices=['dinov2_vitb14', 'dinov2_vitl14'],
    # The original help text was empty, which left `--help` silent about
    # what this option selects.
    help='DINOv2 backbone architecture (default: dinov2_vitb14)',
)
parser.add_argument(
    '--batch-size', '-b',
    default=128,
    type=int,
    metavar='N',
    help='mini-batch size (default: 128)',
)
parser.add_argument(
    '--log-dir',
    default='./',
    type=str,
    metavar='PATH',
    help='path to directory where to log (default: current directory)',
)
parser.add_argument(
    '--data-dir',
    required=True,
    type=str,
    metavar='PATH',
    help='path to the dataset',
)


class Args:
    """Notebook stand-in for parsed command-line arguments.

    Holds the same four options as the argparse parser, as plain class
    attributes, so the cells can run without a command line.
    """

    arch = 'dinov2_vitb14'  # torch.hub entry name of the backbone
    batch_size = 128
    log_dir = './'  # where checkpoints are written
    data_dir = 'cat_dog_data/organized'  # Change to your dataset path

    def __repr__(self):
        """Show the option values instead of the default object address."""
        names = ('arch', 'batch_size', 'log_dir', 'data_dir')
        rendered = ', '.join(f'{name}={getattr(self, name)!r}' for name in names)
        return f'{type(self).__name__}({rendered})'

# Instantiate the notebook-side configuration and echo it into the cell output.
args = Args()
print(args)

# `exist_ok=True` replaces the separate existence check: it cannot race with
# another process creating the directory, and it matches how the dataset
# folders are created earlier in this notebook.
os.makedirs(args.log_dir, exist_ok=True)




A matching Triton is not available, some optimizations will not be enabled.
Error caught was: No module named 'triton'


<__main__.Args object at 0x0000020F7C5F0CD0>




In [None]:
import torch
from torch import nn, optim
from torchvision import datasets, transforms
from tqdm import tqdm
import time
import os
from functools import partial
from dinov2.models import ModelWithIntermediateLayers  # Ensure the DINOv2 model setup is correct

# Run on the first CUDA GPU when one is visible to PyTorch, otherwise on CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")


# Modified Dino class to include heatmap functionality
class Dino(nn.Module):
    """DINOv2 feature extractor followed by a linear classifier on patch tokens.

    The classifier is applied to every patch token. Depending on how
    ``forward`` is called, the per-patch scores are either averaged into
    image-level logits or turned into a per-patch probability map for one
    class.

    Args:
        type: torch.hub entry name of the DINOv2 backbone (e.g. the value of
            ``args.arch``). The name shadows the builtin but is kept so that
            existing keyword callers keep working.
        num_classes: Number of outputs of the linear classifier.

    NOTE(review): this relies on a ``ModelWithIntermediateLayers`` that
    accepts ``return_all_patches`` and returns a mapping with a
    ``"patch_tokens"`` entry. As far as I can tell the upstream
    ``dinov2.eval.utils.ModelWithIntermediateLayers`` takes only
    ``(feature_model, n_last_blocks, autocast_ctx)`` and returns tuples -
    confirm which variant this notebook is importing.
    """

    def __init__(self, type, num_classes=100):
        super().__init__()
        # Load the feature model
        model = torch.hub.load(
            "facebookresearch/dinov2", type, pretrained=True
        ).to(device)

        # Use autocast for mixed precision. CUDA autocast is only enabled
        # when we actually run on CUDA, so CPU runs stay in float32.
        autocast_ctx = partial(
            torch.cuda.amp.autocast,
            enabled=device.type == "cuda",
            dtype=torch.float16,
        )
        self.feature_model = ModelWithIntermediateLayers(
            model, n_last_blocks=1, autocast_ctx=autocast_ctx, return_all_patches=True
        ).to(device)

        with torch.no_grad():
            # Probe with a dummy image to discover the patch feature width
            sample_input = torch.randn(1, 3, 224, 224).to(device)
            sample_output = self.feature_model(sample_input)
            patch_features = sample_output["patch_tokens"]
            print(f"Patch feature shape: {patch_features.shape}")

        # Create a linear classifier for patch features
        out_dim = patch_features.shape[-1]
        self.classifier = nn.Linear(out_dim, num_classes).to(device)

    def forward(self, x, target_class=None):
        """Score an image batch, optionally returning a class heatmap.

        Args:
            x: Input batch passed unchanged to the feature model.
            target_class: Class index to build a heatmap for. ``None``
                (the default) selects plain classification; ``0`` is a valid
                class index.

        Returns:
            If ``target_class`` is ``None``: the per-patch scores averaged
            over dim 1, i.e. one score vector per image.
            Otherwise: a tuple ``(patch_scores, heatmap)`` where ``heatmap``
            is the softmax probability of ``target_class`` for every patch.
        """
        features = self.feature_model(x)
        # Patch features, presumably (B, N, D). The feature model runs under
        # float16 autocast, but the classifier weights are float32; without
        # this cast nn.Linear raises a dtype mismatch. The cast is a no-op
        # when the tokens are already float32.
        patch_tokens = features["patch_tokens"].float()

        # Classify each patch
        patch_scores = self.classifier(patch_tokens)  # (B, N, num_classes)

        if target_class is not None:
            # Generate heatmap for the target class
            heatmap = patch_scores.softmax(dim=-1)[..., target_class]  # Class probability map (B, N)
            return patch_scores, heatmap

        # Aggregate patch scores for classification
        return patch_scores.mean(dim=1)  # Average over patches (B, num_classes)


# Preprocessing pipelines for the training and validation datasets.
# Both end with the same tensor conversion and per-channel normalisation
# (the mean/std values are the commonly used ImageNet statistics).
_normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

# Training: random 224x224 crop plus random horizontal flip as augmentation.
_train_steps = [
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    _normalize,
]

# Validation: deterministic resize to 256 followed by a 224x224 centre crop.
_val_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    _normalize,
]

data_transforms = {
    'train': transforms.Compose(_train_steps),
    'val': transforms.Compose(_val_steps),
}

# Create datasets and dataloaders.
# Each split is read from <data_dir>/<split>/<class>/ by ImageFolder and uses
# the preprocessing pipeline of the same name.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(args.data_dir, split), data_transforms[split])
    for split in ['train', 'val']
}

# Only the training split is shuffled; validation metrics do not depend on
# sample order, so the validation loader keeps a fixed order.
# NOTE(review): the batch size is hard-coded to 32 and `args.batch_size`
# is not used here - confirm which value is intended.
dataloaders_dict = {
    split: torch.utils.data.DataLoader(
        image_datasets[split],
        batch_size=32,
        shuffle=(split == 'train'),
        num_workers=4,
    )
    for split in ['train', 'val']
}

# Build the model on the selected device.
# NOTE(review): the classifier is created with 100 outputs although the
# dataset organised earlier has two class folders - confirm this is intended
# before changing it, since it determines the shape stored in the checkpoint.
model = Dino(args.arch, num_classes=100).to(device)

# The backbone stays frozen; only the linear classifier receives gradients.
model.feature_model.requires_grad_(False)
model.classifier.requires_grad_(True)

# Loss, optimiser (classifier parameters only) and a step-wise LR schedule
# that multiplies the learning rate by 0.1 every 7 scheduler steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.classifier.parameters(),
    lr=0.001,
    momentum=0.9,
)
exp_lr_scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=7,
    gamma=0.1,
)

# Training loop: every epoch runs one pass over the training split (updating
# only the classifier) followed by one pass over the validation split, and
# keeps the checkpoint with the best validation accuracy.
num_epochs = 25
best_acc = 0.0
for epoch in range(num_epochs):
    print(f'Epoch {epoch}/{num_epochs - 1}')
    print('-' * 10)

    for phase in ['train', 'val']:
        start_time = time.time()
        is_train = phase == 'train'

        # The backbone is frozen, so it stays in eval mode in both phases;
        # only the classifier switches between train and eval mode.
        model.feature_model.eval()
        model.classifier.train(is_train)

        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in tqdm(dataloaders_dict[phase]):
            inputs = inputs.to(device)
            labels = labels.to(device)

            with torch.no_grad():
                # Extract features without gradients. The backbone runs under
                # float16 autocast while the classifier weights are float32,
                # so cast before the linear layer to avoid a dtype mismatch
                # (no-op when the features are already float32).
                features = model.feature_model(inputs)["patch_tokens"].float()

            with torch.set_grad_enabled(is_train):
                # Forward pass through the classifier on the patch-averaged features
                outputs = model.classifier(features.mean(dim=1))
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                if is_train:
                    # Gradients are only produced in the train phase, so they
                    # are only cleared there.
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            # Weight the batch loss by batch size so the epoch average is per sample
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)

        if is_train:
            # Advance the LR schedule once per epoch, after the optimiser
            # updates. The scheduler was created but never stepped, so the
            # learning rate never decayed.
            exp_lr_scheduler.step()

        epoch_loss = running_loss / len(image_datasets[phase])
        epoch_acc = running_corrects.double() / len(image_datasets[phase])

        time_elapsed = time.time() - start_time
        print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Time: {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')

        # Save the best model
        if phase == 'val' and epoch_acc > best_acc:
            best_acc = epoch_acc
            torch.save(model.state_dict(), os.path.join(args.log_dir, 'best_model.pth'))

    print()

print('Training complete')
