In [None]:
# Cell 1: Imports
import os
import math
import random
import inspect

import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import torchvision.transforms as transforms


Collecting torchview
  Downloading torchview-0.2.7-py3-none-any.whl.metadata (13 kB)
Downloading torchview-0.2.7-py3-none-any.whl (26 kB)
Installing collected packages: torchview
Successfully installed torchview-0.2.7
cuda


In [None]:
# Cell 2: Hyperparameters
# Training configuration shared by the cells below.
BATCH_SIZE = 32          # samples per mini-batch used by the DataLoaders
LEARNING_RATE = 1e-3     # default optimizer learning rate
EPOCHS = 100             # default epoch count; also the cosine scheduler's T_max
NUM_CLASSES = 7          # default width of the classifier's output layer
BETAS = (0.5, 0.999)     # Adam/AdamW betas; BETAS[0] is reused as SGD momentum


In [None]:
# Cell 3: Paths
# Root of the dataset; both split CSVs are resolved directly beneath it.
dataset_path = "/kaggle/input/sports-image-classification/dataset"
train_csv_path, test_csv_path = (
    os.path.join(dataset_path, csv_name) for csv_name in ('train.csv', 'test.csv')
)


torch.Size([1, 256, 56, 56])


In [None]:
# Cell 4: Dataset class
class SportsDataset(Dataset):
    """Image dataset driven by a CSV index and a per-split image directory.

    Column 0 of the CSV holds the image file name. For the ``'train'`` split,
    column 1 holds the class label and a label -> integer index mapping
    (``class_to_idx``) is built from the sorted unique labels. Any other split
    yields images only and has no ``class_to_idx`` attribute.

    Args:
        csv_file: Path of the CSV file listing the samples.
        file_path: Dataset root; images are read from ``<file_path>/<split>``.
        split: Name of the split sub-directory. Only ``'train'`` returns labels.
        transform: Optional callable applied to each loaded RGB PIL image.
    """

    def __init__(self, csv_file, file_path, split='train', transform=None):
        self.data_info = pd.read_csv(csv_file)
        self.root_dir = os.path.join(file_path, split)
        self.transform = transform
        self.split = split

        if split != 'train':
            # Non-training splits carry no labels, so no mapping is built.
            return

        unique_labels = self.data_info.iloc[:, 1].unique()
        self.class_to_idx = dict(zip(sorted(unique_labels), range(len(unique_labels))))
        print(f"Classes: {self.class_to_idx}")

    def __len__(self):
        """Number of rows in the CSV index."""
        return len(self.data_info)

    def __getitem__(self, idx):
        """Return ``(image, label_index)`` for 'train', otherwise just ``image``."""
        image_path = os.path.join(self.root_dir, self.data_info.iloc[idx, 0])
        img = Image.open(image_path).convert('RGB')
        if self.transform:
            img = self.transform(img)

        if self.split != 'train':
            return img

        target = self.class_to_idx[self.data_info.iloc[idx, 1]]
        return img, torch.tensor(target, dtype=torch.long)


torch.Size([1, 1000])


In [None]:
# Cell 5: Transforms
# Preprocessing pipeline: fixed 224x224 resize, random horizontal flip,
# conversion to a tensor, then per-channel normalization.
data_transforms = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)


In [None]:
# Cell 6: Simple conv‐ReLU block
class Block(nn.Module):
    """3x3 convolution optionally followed by batch norm, ReLU and dropout.

    Args:
        in_ch: Number of input channels.
        out_ch: Number of output channels.
        act: Any truthy value appends a ReLU; a falsy value omits it. The
            value itself does not select the activation type.
        drop: When truthy, append ``nn.Dropout(0.5)``.
        norm: When truthy, insert ``nn.BatchNorm2d(out_ch)`` after the conv.
    """

    def __init__(self, in_ch, out_ch, act='relu', drop=False, norm=True):
        super().__init__()
        conv = nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1, bias=False)
        # (flag, factory) pairs in the order the stages follow the convolution.
        optional_stages = (
            (norm, lambda: nn.BatchNorm2d(out_ch)),
            (act, lambda: nn.ReLU(inplace=True)),
            (drop, lambda: nn.Dropout(0.5)),
        )
        extras = [build() for enabled, build in optional_stages if enabled]
        self.net = nn.Sequential(conv, *extras)

    def forward(self, x):
        """Run ``x`` through the assembled layer stack."""
        return self.net(x)


In [None]:
# Cell 7: Bottleneck residual unit
class Bottleneck(nn.Module):
    """Bottleneck residual unit: 1x1 -> 3x3 -> 1x1 convolutions plus a shortcut.

    The residual branch reduces to ``mid_ch`` channels, applies the (possibly
    strided) 3x3 convolution, then expands to ``mid_ch * exp`` channels. The
    shortcut is the identity only when it already matches the residual branch
    in both channel count and spatial size; otherwise a 1x1 projection with
    the same stride is used.

    Args:
        in_ch: Channels of the input tensor.
        mid_ch: Channels inside the bottleneck.
        exp: Expansion factor; the unit outputs ``mid_ch * exp`` channels.
        is_bottle: Force a projection shortcut even if shapes already match.
        stride: Stride of the 3x3 convolution (and of the projection shortcut).
    """

    def __init__(self, in_ch, mid_ch, exp, is_bottle, stride):
        super().__init__()
        self.expansion = exp
        out_ch = mid_ch * exp
        self.residual = nn.Sequential(
            nn.Conv2d(in_ch, mid_ch, 1, bias=False),
            nn.BatchNorm2d(mid_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_ch, mid_ch, 3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(mid_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_ch, out_ch, 1, bias=False),
            nn.BatchNorm2d(out_ch),
        )
        # A strided residual branch shrinks the feature map, so an identity
        # shortcut would no longer line up with it; project in that case too.
        if is_bottle or stride != 1 or in_ch != out_ch:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, 1, stride=stride, bias=False),
                nn.BatchNorm2d(out_ch)
            )
        else:
            self.shortcut = nn.Identity()

    def forward(self, x):
        """Return ``relu(residual(x) + shortcut(x))``."""
        out = self.residual(x)
        out += self.shortcut(x)
        return F.relu(out)


In [None]:
# Cell 8: ResNet50 definition
class ResNet50(nn.Module):
    """ResNet-50-style classifier assembled from ``Bottleneck`` units.

    A 7x7 stride-2 stem and a 3x3 max-pool are followed by four stages of
    3, 4, 6 and 3 bottleneck units, global average pooling, and a linear
    layer that produces ``num_classes`` outputs.
    """

    def __init__(self, num_classes=NUM_CLASSES):
        super().__init__()
        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (input channels, bottleneck width, unit count, stride of first unit)
        stage_specs = (
            (64, 64, 3, 1),
            (256, 128, 4, 2),
            (512, 256, 6, 2),
            (1024, 512, 3, 2),
        )
        # Stages are registered in order as layer1 .. layer4.
        for stage_no, (in_ch, width, n_units, stride) in enumerate(stage_specs, start=1):
            stage = self._make_layer(in_ch, width, blocks=n_units, stride=stride, exp=4)
            setattr(self, f"layer{stage_no}", stage)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def _make_layer(self, in_ch, out_ch, blocks, stride, exp):
        """Build one stage: a projecting first unit, then ``blocks - 1`` plain units."""
        head = Bottleneck(in_ch, out_ch, exp, True, stride)
        tail = [Bottleneck(out_ch * exp, out_ch, exp, False, 1) for _ in range(blocks - 1)]
        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Return the ``fc`` output for a batch of images."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        x = self.avgpool(x)
        return self.fc(torch.flatten(x, 1))


In [None]:
# Cell 9: Weight initialization helpers
def xavier_init(m):
    """Xavier-uniform initialize the weight of a Linear or Conv2d module.

    Modules of any other type are left untouched. If the module has a bias,
    it is set to zero. Suitable as the callback for ``nn.Module.apply``.
    """
    if not isinstance(m, (nn.Linear, nn.Conv2d)):
        return
    nn.init.xavier_uniform_(m.weight)
    if m.bias is None:
        return
    nn.init.constant_(m.bias, 0)

def init_weights(model, init_type='xavier'):
    """Apply the named weight-initialization scheme to every sub-module of ``model``.

    Only ``'xavier'`` is implemented; any other value leaves the model's
    existing parameters untouched.
    """
    if init_type != 'xavier':
        # No other scheme is implemented yet (a Kaiming variant would be
        # defined and dispatched from here).
        return
    model.apply(xavier_init)


In [None]:
# Cell 10: Early stopping
class EarlyStopping:
    """Flag when a monitored loss has stopped improving.

    Call the instance with the latest validation loss. A loss strictly below
    ``best - delta`` counts as an improvement: it becomes the new ``best`` and
    resets ``counter``. Every other call increments ``counter``; once it
    reaches ``patience`` the ``early_stop`` flag is set and is never cleared.
    """

    def __init__(self, patience=5, delta=0):
        self.patience, self.delta = patience, delta
        self.best = None          # lowest loss seen so far (None before first call)
        self.counter = 0          # consecutive calls without improvement
        self.early_stop = False

    def __call__(self, val_loss):
        improved = self.best is None or val_loss < self.best - self.delta
        if improved:
            self.best, self.counter = val_loss, 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True


In [None]:
# Cell 11: Initialization routines
def initialize_model(model_class, num_classes=NUM_CLASSES, weight_init='xavier'):
    """Instantiate ``model_class(num_classes=...)`` and initialize its weights.

    Args:
        model_class: Callable accepting a ``num_classes`` keyword.
        num_classes: Forwarded to ``model_class``.
        weight_init: Scheme name forwarded to ``init_weights``.

    Returns:
        The constructed model, after ``init_weights`` has been applied.
    """
    net = model_class(num_classes=num_classes)
    init_weights(net, init_type=weight_init)
    return net

def initialize_optimizer(model, optimizer='adam', lr=LEARNING_RATE, betas=BETAS, wd=1e-4):
    """Create the optimizer selected by name over ``model.parameters()``.

    Args:
        model: Module whose parameters are optimized.
        optimizer: One of ``'sgd'``, ``'adam'`` or ``'adamw'``.
        lr: Learning rate.
        betas: Adam/AdamW betas; ``betas[0]`` is used as the SGD momentum.
        wd: Weight decay.

    Raises:
        KeyError: If ``optimizer`` is not one of the supported names.
    """
    def build_sgd():
        return optim.SGD(model.parameters(), lr=lr, momentum=betas[0], weight_decay=wd)

    def build_adam():
        return optim.Adam(model.parameters(), lr=lr, betas=betas, weight_decay=wd)

    def build_adamw():
        return optim.AdamW(model.parameters(), lr=lr, betas=betas, weight_decay=wd)

    # Builders are looked up first and invoked afterwards, so only the
    # requested optimizer is ever constructed.
    builders = {'sgd': build_sgd, 'adam': build_adam, 'adamw': build_adamw}
    build = builders[optimizer]
    return build()

def initialize_loss_function(loss_fn='crossentropy'):
    """Return the loss module selected by name.

    Args:
        loss_fn: ``'crossentropy'``, or any name starting with ``'kl'``.
            Matching is case-insensitive for both.

    Returns:
        ``nn.CrossEntropyLoss()`` or ``nn.KLDivLoss(reduction='batchmean')``.

    Raises:
        ValueError: If ``loss_fn`` names no supported loss. Previously this
            case returned ``None``, which only failed later when the
            "criterion" was called.
    """
    name = loss_fn.lower()
    if name == 'crossentropy':
        return nn.CrossEntropyLoss()
    if name.startswith('kl'):
        # The default 'mean' reduction averages over every element and, per
        # the PyTorch docs, is not the true KL divergence; 'batchmean' is.
        return nn.KLDivLoss(reduction='batchmean')
    raise ValueError(
        f"Unsupported loss_fn {loss_fn!r}; expected 'crossentropy' or a name starting with 'kl'"
    )

def initialize_lr_scheduler(opt, scheduler_type='step', step_size=10, gamma=0.1,
                            t_max=None, eta_min=0):
    """Create the learning-rate scheduler selected by name.

    Args:
        opt: Optimizer whose learning rate is scheduled.
        scheduler_type: ``'step'`` for ``StepLR`` or ``'cosine'`` for
            ``CosineAnnealingLR``.
        step_size: ``StepLR`` period (only used for ``'step'``).
        gamma: ``StepLR`` decay factor (only used for ``'step'``).
        t_max: ``CosineAnnealingLR`` ``T_max``; defaults to the module-level
            ``EPOCHS`` when ``None`` (only used for ``'cosine'``).
        eta_min: ``CosineAnnealingLR`` minimum learning rate (only used for
            ``'cosine'``).

    Returns:
        The scheduler, or ``None`` when ``scheduler_type`` is neither
        ``'step'`` nor ``'cosine'``.
    """
    if scheduler_type == 'step':
        return optim.lr_scheduler.StepLR(opt, step_size=step_size, gamma=gamma)
    if scheduler_type == 'cosine':
        # Fall back to the global epoch count so existing callers are unaffected.
        horizon = EPOCHS if t_max is None else t_max
        return optim.lr_scheduler.CosineAnnealingLR(opt, T_max=horizon, eta_min=eta_min)
    # Unknown type: keep the historical "no scheduler" result, but explicitly.
    return None

def initialize_tensorboard_writer(log_dir='logs', run_name='run'):
    """Ensure ``log_dir`` exists and return a SummaryWriter for ``log_dir/run_name``."""
    os.makedirs(log_dir, exist_ok=True)
    run_dir = os.path.join(log_dir, run_name)
    return SummaryWriter(log_dir=run_dir)

def initialize_early_stopping(patience=5, delta=0):
    """Build an ``EarlyStopping`` tracker with the given ``patience`` and ``delta``."""
    stopper = EarlyStopping(patience=patience, delta=delta)
    return stopper

def initialize_data_loaders(train_csv, test_csv, path, tfms, eval_tfms=None, batch_size=None):
    """Build the training and evaluation DataLoaders.

    Args:
        train_csv: CSV index for the ``'train'`` split.
        test_csv: CSV index for the ``'test'`` split.
        path: Dataset root containing the ``train`` and ``test`` image folders.
        tfms: Transform applied to training images.
        eval_tfms: Transform applied to the ``'test'`` split. Defaults to
            ``tfms``; pass a deterministic pipeline here when ``tfms``
            contains random augmentation that should not affect evaluation.
        batch_size: Batch size for both loaders; defaults to the module-level
            ``BATCH_SIZE``.

    Returns:
        ``(train_loader, eval_loader)``. The training loader shuffles; the
        evaluation loader does not and, being built from the ``'test'``
        split, yields images without labels.
    """
    if eval_tfms is None:
        eval_tfms = tfms
    if batch_size is None:
        batch_size = BATCH_SIZE

    train_ds = SportsDataset(train_csv, path, 'train', tfms)
    val_ds = SportsDataset(test_csv, path, 'test', eval_tfms)
    return (DataLoader(train_ds, batch_size=batch_size, shuffle=True),
            DataLoader(val_ds, batch_size=batch_size, shuffle=False))

def initialize_all(run_name='resnet_run'):
    """Construct every object needed for a training run.

    Uses the module-level paths and transforms, a ``ResNet50`` model, an Adam
    optimizer, cross-entropy loss, a cosine LR scheduler, a TensorBoard
    writer under ``logs/<run_name>`` and a default early-stopping tracker.

    Returns:
        ``(model, optimizer, criterion, scheduler, writer, early_stopping,
        train_loader, val_loader)``. ``val_loader`` is built from the test
        CSV.
    """
    train_loader, val_loader = initialize_data_loaders(
        train_csv_path, test_csv_path, dataset_path, data_transforms
    )
    net = initialize_model(ResNet50)
    optimizer = initialize_optimizer(net, 'adam')
    loss_fn = initialize_loss_function('crossentropy')
    lr_sched = initialize_lr_scheduler(optimizer, scheduler_type='cosine')
    tb_writer = initialize_tensorboard_writer('logs', run_name)
    stopper = initialize_early_stopping()
    return net, optimizer, loss_fn, lr_sched, tb_writer, stopper, train_loader, val_loader


In [None]:
# Cell 12: Training loop
def training_loop(model, optimizer, criterion, train_loader, device, writer,
                  num_epochs=EPOCHS, scheduler=None):
    """Train ``model`` for ``num_epochs`` epochs and log per-epoch metrics.

    Each epoch runs one pass over ``train_loader``, prints the sample-weighted
    mean loss and the accuracy, and writes both to ``writer`` under
    ``'Loss/train'`` and ``'Acc/train'``.

    Args:
        model: Network to train; moved to ``device`` first.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        train_loader: Iterable yielding ``(images, labels)`` batches.
        device: Device the model and each batch are moved to.
        writer: Object exposing ``add_scalar(tag, value, step)``.
        num_epochs: Number of passes over ``train_loader``.
        scheduler: Optional LR scheduler; when given, ``scheduler.step()`` is
            called once at the end of every epoch.

    Raises:
        ValueError: If ``train_loader`` yields no samples in an epoch (the
            epoch averages would otherwise divide by zero).
    """
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for imgs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            out = model(imgs)
            loss = criterion(out, labels)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by its size so the epoch value is a
            # per-sample average even when the last batch is smaller.
            running_loss += loss.item() * imgs.size(0)
            _, preds = out.max(1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

        if total == 0:
            raise ValueError("train_loader yielded no samples; cannot compute epoch metrics")

        epoch_loss = running_loss / total
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}: Loss={epoch_loss:.4f}, Acc={epoch_acc:.4f}")
        writer.add_scalar('Loss/train', epoch_loss, epoch)
        writer.add_scalar('Acc/train', epoch_acc, epoch)

        # Advance the learning-rate schedule after this epoch's optimizer steps.
        if scheduler is not None:
            scheduler.step()


In [None]:
# Cell 13: Kick off training
# Build every training component. NOTE: scheduler, early_stopping and
# val_loader are created here but are not passed to training_loop below.
model, optimizer, criterion, scheduler, writer, early_stopping, train_loader, val_loader = initialize_all('ResNet50_run')
# Use the GPU when one is available; fall back to the CPU instead of crashing.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
training_loop(model, optimizer, criterion, train_loader, device=device, writer=writer)
