In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet50
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
from tqdm import tqdm
import os
from PIL import Image
import cv2
from torch.utils.data import Dataset, DataLoader, random_split
import random

class Conf:
    """Static settings shared by the cells of this notebook.

    Used purely as a namespace: attributes are read as ``Conf.<name>`` and the
    class is never instantiated in the visible code.
    """

    # --- dataset locations -------------------------------------------------
    # Root folder that is scanned for one sub-directory per class.
    train_dir = '/kaggle/input/imagenet-object-localization-challenge/ILSVRC/Data/CLS-LOC/train'
    # Declared for completeness; the visible code never reads this attribute.
    val_dir = '/kaggle/input/imagenet-object-localization-challenge/ILSVRC/Data/CLS-LOC/val'

    # --- input / output geometry -------------------------------------------
    image_size = 224     # side length (pixels) requested from the crop/resize transforms
    num_classes = 1000   # size of the classifier output passed to resnet50

    # --- data loading ------------------------------------------------------
    batch_size = 64
    num_workers = 4

    # --- optimisation schedule ---------------------------------------------
    num_epochs = 100
    learning_rate = 0.001   # lr handed to the Adam constructor
    max_lr = 0.01           # peak lr handed to OneCycleLR

In [None]:
# Albumentations transformations
# Training pipeline: random geometric / photometric augmentation, then
# normalisation and conversion to a tensor.
# NOTE(review): several argument names used below (height/width, max_holes,
# fill_value) may be deprecated in newer albumentations releases -- confirm
# against the installed version.
train_transforms = A.Compose(
    transforms=[
        # Random crop resized to a square of side Conf.image_size.
        A.RandomResizedCrop(width=Conf.image_size, height=Conf.image_size),
        # Mirror half of the samples.
        A.HorizontalFlip(p=0.5),
        # Light photometric jitter on 20% of the samples.
        A.RandomBrightnessContrast(p=0.2),
        # Small shift / zoom / rotation on half of the samples.
        A.ShiftScaleRotate(
            p=0.5,
            shift_limit=0.05,
            scale_limit=0.05,
            rotate_limit=15,
        ),
        # Blank out up to eight patches (at most 8x8) on 20% of the samples.
        A.CoarseDropout(
            p=0.2,
            max_holes=8,
            max_height=8,
            max_width=8,
            fill_value=0,
        ),
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
        ToTensorV2(),
    ]
)

# Evaluation pipeline: deterministic resize, same normalisation, tensor conversion.
val_transforms = A.Compose(
    transforms=[
        A.Resize(height=Conf.image_size, width=Conf.image_size),
        A.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
        ToTensorV2(),
    ]
)


In [None]:
class ImageDataset(Dataset):
    """Map-style dataset over a ``<image_dir>/<class_name>/<image>`` folder tree.

    Every immediate sub-directory of ``image_dir`` is treated as one class.
    Class indices are assigned in sorted order of the directory names, and the
    samples themselves are stored in sorted order as well, so that index-based
    splits of the dataset are reproducible across runs and filesystems.

    Args:
        image_dir: Directory whose sub-directories each hold one class.
        transform: Optional callable invoked as ``transform(image=<ndarray>)``
            that returns a mapping with an ``'image'`` entry.

    Attributes:
        image_paths: Path of every discovered image file.
        labels: Class index for each entry of ``image_paths``.
        class_to_idx: Mapping from class directory name to class index.
    """

    # File extensions (compared case-insensitively) that count as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        # Sorting fixes both the class -> index mapping and the sample order;
        # os.listdir alone returns entries in arbitrary order.
        class_names = sorted(
            entry for entry in os.listdir(image_dir)
            if os.path.isdir(os.path.join(image_dir, entry))
        )
        self.class_to_idx = {name: idx for idx, name in enumerate(class_names)}

        for class_name in class_names:
            class_dir = os.path.join(image_dir, class_name)
            class_idx = self.class_to_idx[class_name]
            for fname in sorted(os.listdir(class_dir)):
                if fname.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.image_paths.append(os.path.join(class_dir, fname))
                    self.labels.append(class_idx)

        print(f"Loaded {len(self.image_paths)} images from {image_dir}")
        print(f"Number of unique labels: {len(set(self.labels))}")

    def __len__(self):
        """Return the number of discovered image files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is read with OpenCV and converted from BGR to RGB; when a
        transform is set, its ``'image'`` output is returned instead of the raw
        array.

        Raises:
            RuntimeError: If OpenCV cannot decode the file at this index.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor.
            raise RuntimeError(f"Failed to read image file: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']
        return image, label

# Create datasets and dataloaders
def create_data_loaders(train_dir, batch_size, num_workers, train_transforms, val_transforms,
                        train_fraction=0.7, seed=None):
    """Scan ``train_dir`` once and build train / validation loaders from a random split.

    The two splits must not share one dataset object: the transform lives on
    the dataset, so assigning the validation transform to a shared object would
    also replace the training transform. The validation subset is therefore
    backed by a shallow copy of the dataset (sharing the already-scanned path
    and label lists) that carries ``val_transforms``.

    Args:
        train_dir: Directory passed to ``ImageDataset``.
        batch_size: Batch size for both loaders.
        num_workers: Worker process count for both loaders.
        train_transforms: Transform applied to training samples.
        val_transforms: Transform applied to validation samples.
        train_fraction: Fraction of samples assigned to the training split.
        seed: Optional integer seed for the split; ``None`` uses the global
            torch random state.

    Returns:
        ``(train_loader, val_loader)``.

    Raises:
        ValueError: If no images are found under ``train_dir``.
    """
    # Local imports keep this function self-contained.
    import copy
    from torch.utils.data import Subset

    # Create full dataset (scans the directory tree once)
    full_dataset = ImageDataset(train_dir, transform=train_transforms)

    # Calculate splits
    total_size = len(full_dataset)
    if total_size == 0:
        raise ValueError(f"No images found under {train_dir!r}")
    train_size = int(train_fraction * total_size)
    val_size = total_size - train_size

    # Split dataset; only pass a generator when a seed was requested so the
    # default behaviour (global RNG) is unchanged.
    split_kwargs = {}
    if seed is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)
    train_dataset, val_split = random_split(
        full_dataset, [train_size, val_size], **split_kwargs
    )

    # Give the validation indices their own dataset view with its own transform,
    # leaving the training subset attached to the train-transform dataset.
    val_base = copy.copy(full_dataset)
    val_base.transform = val_transforms
    val_dataset = Subset(val_base, val_split.indices)

    # Create dataloaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )

    return train_loader, val_loader

In [None]:
def train_one_epoch(model, train_loader, criterion, optimizer, scheduler, epoch):
    """Run one optimisation pass over ``train_loader``.

    Each batch is moved to the device the model's parameters live on, so the
    function works on both CPU and GPU. The scheduler is stepped once per batch,
    directly after the optimizer.

    Args:
        model: Network to train; switched to training mode.
        train_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        scheduler: LR scheduler stepped once per batch.
        epoch: Zero-based epoch index, used only for the progress-bar label.

    Returns:
        ``(epoch_loss, epoch_acc, model)`` where ``epoch_loss`` is the mean of
        the per-batch losses and ``epoch_acc`` is the top-1 accuracy in percent.
        Both are ``0.0`` if the loader yields no batches.
    """
    model.train()

    # Follow the model's device instead of hard-coding CUDA, which would fail
    # on a CPU-only machine.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    pbar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{Conf.num_epochs}')
    for batch_idx, (images, labels) in enumerate(pbar):
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()
        scheduler.step()

        num_batches = batch_idx + 1
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        pbar.set_postfix({
            'loss': f'{running_loss/num_batches:.3f}',
            'acc': f'{100.*correct/total:.2f}%',
            'lr': f'{scheduler.get_last_lr()[0]:.6f}'
        })

    # Guard against an empty loader rather than dividing by zero.
    epoch_loss = running_loss / num_batches if num_batches else 0.0
    epoch_acc = 100. * correct / total if total else 0.0
    return epoch_loss, epoch_acc, model

@torch.no_grad()
def validate(model, val_loader, criterion, epoch):
    """Evaluate ``model`` on ``val_loader`` with gradient tracking disabled.

    Each batch is moved to the device the model's parameters live on, so the
    function works on both CPU and GPU.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        val_loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        epoch: Accepted for signature symmetry with ``train_one_epoch``; unused.

    Returns:
        ``(val_loss, val_acc, model)`` where ``val_loss`` is the mean of the
        per-batch losses and ``val_acc`` is the top-1 accuracy in percent.
        Both are ``0.0`` if the loader yields no batches.
    """
    model.eval()

    # Follow the model's device instead of hard-coding CUDA, which would fail
    # on a CPU-only machine.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    pbar = tqdm(val_loader, desc='Validation')
    for batch_idx, (images, labels) in enumerate(pbar):
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        outputs = model(images)
        loss = criterion(outputs, labels)

        num_batches = batch_idx + 1
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        pbar.set_postfix({
            'loss': f'{running_loss/num_batches:.3f}',
            'acc': f'{100.*correct/total:.2f}%'
        })

    # Guard against an empty loader rather than dividing by zero.
    val_loss = running_loss / num_batches if num_batches else 0.0
    val_acc = 100. * correct / total if total else 0.0
    return val_loss, val_acc, model

In [None]:
def main():
    """Train a randomly initialised ResNet-50 and checkpoint the best epoch.

    Steps, in order: pick the device, build the loaders from ``Conf.train_dir``,
    create the model and re-initialise its weights, set up the loss, Adam and a
    OneCycleLR schedule spanning every training batch, then alternate training
    and validation for ``Conf.num_epochs`` epochs. Whenever validation accuracy
    improves, the model/optimizer/scheduler state is written to
    ``best_resnet50.pth`` in the working directory.
    """
    # Set device
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(f"Using device: {device}")

    loader_settings = dict(
        train_dir=Conf.train_dir,
        batch_size=Conf.batch_size,
        num_workers=Conf.num_workers,
        train_transforms=train_transforms,
        val_transforms=val_transforms,
    )
    train_loader, val_loader = create_data_loaders(**loader_settings)

    # Initialize model
    print("\nInitializing Model...")
    model = resnet50(weights=None, num_classes=Conf.num_classes).to(device)

    # Initialize weights
    def _reset_parameters(module):
        """Xavier-uniform weights for conv/linear layers, identity affine for batch norm."""
        if isinstance(module, nn.BatchNorm2d):
            nn.init.ones_(module.weight)
            nn.init.zeros_(module.bias)
            return
        if isinstance(module, nn.Conv2d):
            nn.init.xavier_uniform_(module.weight)
            if module.bias is not None:
                nn.init.zeros_(module.bias)
            return
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            nn.init.zeros_(module.bias)

    model.apply(_reset_parameters)
    print("Initialized model weights using Xavier/Glorot initialization")

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=Conf.learning_rate)

    # The schedule is stepped once per batch, so it must cover every batch of
    # every epoch.
    # NOTE(review): OneCycleLR drives the learning rate itself, so
    # Conf.learning_rate is presumably superseded -- confirm against the docs.
    schedule_length = Conf.num_epochs * len(train_loader)
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=Conf.max_lr,
        total_steps=schedule_length,
        pct_start=0.3,
        anneal_strategy='cos',
    )

    # Training loop
    print("\nStarting Training...")
    best_val_acc = 0
    separator = '-' * 60

    for epoch in range(Conf.num_epochs):
        train_loss, train_acc, model = train_one_epoch(
            model, train_loader, criterion, optimizer, scheduler, epoch
        )
        val_loss, val_acc, model = validate(model, val_loader, criterion, epoch)

        print(f'\nEpoch {epoch+1}/{Conf.num_epochs} Summary:')
        print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%')

        # Only a strict improvement replaces the stored checkpoint.
        improved = val_acc > best_val_acc
        if improved:
            best_val_acc = val_acc
            checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': scheduler.state_dict(),
                'best_val_acc': best_val_acc,
            }
            torch.save(checkpoint, 'best_resnet50.pth')
            print(f"Saved new best model with validation accuracy: {val_acc:.2f}%")

        print(separator)


# Start training only when this file is executed as the entry point.
if __name__ == "__main__":
    main()