In [None]:
#Imports
import os
import random
import time
import copy
import gc
import numpy as np
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import models, transforms
from sklearn.model_selection import KFold
import torch.nn.functional as F

# Settings: every tunable value used by the notebook lives in this one dict
CONFIG = dict(
    seed=42,              # base seed given to set_seed() and to the DataLoader generator
    k_folds=5,            # number of cross-validation folds
    batch_size=720,       # samples per batch (the balanced sampler requires it to be divisible by the class count)
    num_epochs=50,        # maximum epochs per fold
    patience=5,           # early stopping: epochs tolerated without a lower validation loss
    lr_backbone=4.0e-5,   # learning rate of the unfrozen layer4 parameters
    lr_head=3.0e-4,       # learning rate of the classifier head
    weight_decay=1.5e-4,  # weight decay passed to Adam
    dropout=0.6,          # dropout probability inside the classifier head
    img_size=224,         # side length, in pixels, of the square network input
    num_workers=2,        # DataLoader worker processes
)

# Seed for reproducibility
def set_seed(seed=42):
    """Seed every random number generator in use and force deterministic cuDNN.

    Args:
        seed: Integer applied to ``random``, NumPy and PyTorch (CPU and CUDA);
            its string form is also stored in ``PYTHONHASHSEED``.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Same seed for each library, in turn
    for seeder in (random.seed, np.random.seed, torch.manual_seed,
                   torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    # Deterministic kernels on, autotuner off
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    print(f"Seed set as {seed}.")

def seed_worker(worker_id):
    """DataLoader ``worker_init_fn``: re-seed ``random`` and NumPy from torch's seed.

    Args:
        worker_id: Worker index supplied by the DataLoader; not used.
    """
    derived = torch.initial_seed() % (1 << 32)  # fold torch's seed into 32 bits
    random.seed(derived)
    np.random.seed(derived)

# Set seed once, and keep a dedicated generator to hand to the DataLoaders
set_seed(CONFIG['seed'])
g = torch.Generator().manual_seed(CONFIG['seed'])  # manual_seed returns the generator itself

# Setup Device: CUDA when available, CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")
if torch.cuda.device_count() >= 2:
    print(f"Avaliable GPU: {torch.cuda.device_count()}")

In [None]:
# Transformations
# Per-channel mean / std handed to Normalize (one value per RGB channel)
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

data_transforms = dict(
    # Training: random crop / flip / colour jitter / rotation, then tensor + normalisation
    train=transforms.Compose([
        transforms.RandomResizedCrop(CONFIG['img_size'], scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.RandomRotation(15),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ]),
    # Validation / test: plain resize to the network input size, no augmentation
    val=transforms.Compose([
        transforms.Resize((CONFIG['img_size'], CONFIG['img_size'])),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ]),
)

# Dataset class
class BookCoverDataset(Dataset):
    """Image-classification dataset described by a ``;``-separated listing CSV.

    The CSV must contain a ``Filename`` column (image path relative to
    ``root_dir``) and a ``Category`` column (class name).

    Args:
        csv_file: Path of the listing CSV (read as ISO-8859-1, first row is the
            header; malformed lines are reported as warnings and skipped).
        root_dir: Directory containing the image files.
        transform: Optional callable applied to the PIL image before returning it.
        class_to_idx: Optional ``{class name: index}`` mapping to reuse (e.g. the
            one built from the training CSV). When omitted, the mapping is built
            from the sorted unique categories of this CSV.

    Attributes:
        df: The loaded listing.
        classes: Class names ordered by their index in ``class_to_idx``.
        class_to_idx: Mapping used to encode labels.
    """
    def __init__(self, csv_file, root_dir, transform=None, class_to_idx=None):
        # Reading CSV
        self.df = pd.read_csv(csv_file, sep=';', encoding='ISO-8859-1', header=0, on_bad_lines='warn')
        self.root_dir = root_dir
        self.transform = transform

        if class_to_idx is None:
            self.classes = sorted(self.df['Category'].unique())
            self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        else:
            # Keep `classes` consistent with the supplied mapping rather than with
            # whatever categories happen to appear in this particular CSV.
            self.class_to_idx = class_to_idx
            self.classes = sorted(class_to_idx, key=class_to_idx.get)

    def __len__(self):
        """Return the number of rows in the listing."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        An unreadable or missing image file is replaced by a black RGB image of
        ``CONFIG['img_size']`` squared, so the sample keeps its label.

        Raises:
            KeyError: If the row's category is not a key of ``class_to_idx``.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # One positional row lookup serves both the filename and the label
        row = self.df.iloc[idx]
        img_path = os.path.join(self.root_dir, str(row['Filename']))

        try:
            image = Image.open(img_path).convert('RGB')
        except OSError:  # FileNotFoundError is a subclass of OSError
            image = Image.new('RGB', (CONFIG['img_size'], CONFIG['img_size']), (0, 0, 0))

        label = self.class_to_idx[row['Category']]

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# File Search
base_search_path = '/kaggle/input'

# Filled in by the directory walk below; each stays None when nothing matches
csv_path_train = csv_path_test = img_dir = None

print("Searching folders")
for root, dirs, files in os.walk(base_search_path):
    # A later match overwrites an earlier one, so the last hit in walk order wins
    if "224x224" in dirs:
        img_dir = os.path.join(root, "224x224")
    if "book30-listing-test.csv" in files:
        csv_path_test = os.path.join(root, "book30-listing-test.csv")
    if "book30-listing-train.csv" in files:
        csv_path_train = os.path.join(root, "book30-listing-train.csv")

# All three locations are required by the cells that follow
if not csv_path_train or not csv_path_test or not img_dir:
    raise FileNotFoundError("File not found.")

print(f"Found:\nTrain CSV: {csv_path_train}\nTest CSV: {csv_path_test}\nImg Dir: {img_dir}")

# Creating Dataset
# Training listing with augmentation; K-Fold later carves train/val subsets out of it
full_train_dataset = BookCoverDataset(csv_path_train, img_dir, transform=data_transforms['train'])

# Test listing: deterministic transform, labels encoded with the training mapping
test_dataset = BookCoverDataset(
    csv_path_test,
    img_dir,
    transform=data_transforms['val'],
    class_to_idx=full_train_dataset.class_to_idx,
)

# DataLoader Test: fixed sample order, workers seeded through seed_worker / g
test_loader = DataLoader(
    dataset=test_dataset,
    shuffle=False,
    batch_size=CONFIG['batch_size'],
    generator=g,
    worker_init_fn=seed_worker,
    num_workers=CONFIG['num_workers'],
)

print(f" Dataset Train/Val loaded: {len(full_train_dataset)} images.")
print(f" Dataset Test loaded: {len(test_dataset)} images.")

In [None]:

def get_model(num_classes):
    """Build a ResNet-50 with torchvision's default weights and a new classifier head.

    Every parameter of the downloaded network is frozen before the head is
    attached, so on return only the two new linear layers are trainable.

    Args:
        num_classes: Number of output logits of the new head.

    Returns:
        The modified ResNet-50 module.
    """
    # Download weights
    model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)

    # 1. Freezing Backbone (the head below is created afterwards, so it stays trainable)
    model.requires_grad_(False)

    # 2. New head (Classifier): features -> 512 -> ReLU -> Dropout -> num_classes
    head_in = model.fc.in_features
    hidden = nn.Linear(head_in, 512)
    activation = nn.ReLU()
    regulariser = nn.Dropout(CONFIG['dropout'])
    classifier = nn.Linear(512, num_classes)
    model.fc = nn.Sequential(hidden, activation, regulariser, classifier)
    return model

In [None]:
# Training Function
def train_epoch_cycle(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, patience):
    """Train ``model`` with early stopping on validation loss, then restore the best weights.

    Every epoch runs one optimisation pass over ``train_loader`` and one
    gradient-free pass over ``val_loader``. The validation loss drives both the
    scheduler (``scheduler.step(val_loss)``) and early stopping.

    Args:
        model: Module to train; batches are moved to the device of its parameters.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Scheduler stepped once per epoch with the validation loss.
        num_epochs: Maximum number of epochs.
        patience: Consecutive epochs without a lower validation loss tolerated
            before training stops.

    Returns:
        ``(model, best_loss, best_acc)``: the same model object loaded with the
        weights of the best epoch, the lowest validation loss as a float
        (``inf`` if no epoch improved), and the accuracy of that epoch as a
        0-dim float64 tensor (``0.0`` if no epoch improved).

    Raises:
        ValueError: If ``val_loader`` yields no samples.
    """
    # Run on whatever device the model already lives on
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = float('inf')
    # A tensor from the start, so callers can always use `.item()` on the result
    best_acc = torch.tensor(0.0, dtype=torch.float64)
    patience_counter = 0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}', end=' ')

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)
            dataloader = train_loader if is_train else val_loader

            running_loss = 0.0
            running_corrects = 0
            samples_processed = 0

            for inputs, labels in dataloader:
                inputs, labels = inputs.to(run_device), labels.to(run_device)

                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                _, preds = torch.max(outputs, 1)
                batch_size = inputs.size(0)
                # The loss is a batch mean: weight it by batch size for a per-sample average
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labels).item()
                samples_processed += batch_size

            if is_train:
                continue

            # Validation bookkeeping, once per epoch
            if samples_processed == 0:
                raise ValueError("val_loader produced no samples; cannot compute validation metrics.")

            epoch_loss = running_loss / samples_processed
            epoch_acc = torch.tensor(running_corrects / samples_processed, dtype=torch.float64)

            print(f'| Val Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}', end=' ')
            scheduler.step(epoch_loss)

            if epoch_loss < best_loss:
                best_loss = epoch_loss
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                patience_counter = 0
                print("okay!")
            else:
                # Also reached when the loss is NaN, since NaN < x is always False
                patience_counter += 1
                print(f"Increasing Patience Counter: ({patience_counter}/{patience})")

        if patience_counter >= patience:
            print(f"Early stopping")
            break

    model.load_state_dict(best_model_wts)
    return model, best_loss, best_acc

In [None]:
# Balanced Dataloader
# Save Paths
BEST_MODEL_PATH = "best_kfold_model.pth"  # checkpoint of the fold with the lowest validation loss
# Per-fold best accuracy / best loss, appended by the cross-validation loop
fold_results, fold_losses = [], []

from torch.utils.data import Sampler

class BalancedBatchSampler(Sampler):
    """Batch sampler yielding class-balanced batches drawn from a subset.

    Every batch holds ``batch_size // n_classes`` indices of each class present
    in the subset. The yielded values are the entries of ``dataset.indices``
    (positions in the underlying dataset), not positions inside the subset, so
    the sampler must be paired with a DataLoader over the underlying dataset.

    An epoch ends when the rarest class is exhausted; samples of larger classes
    beyond that point are not visited in that epoch. If the subset contains
    fewer than ``n_classes`` distinct classes, batches are proportionally
    smaller than ``batch_size``.

    Args:
        dataset: Object exposing ``indices`` and ``dataset.df`` with a
            ``Category`` column (e.g. a ``Subset`` of ``BookCoverDataset``).
        n_classes: Number of classes a batch is split across.
        batch_size: Total batch size; must be divisible by ``n_classes``.

    Raises:
        ValueError: If ``batch_size`` is not divisible by ``n_classes``, or if
            the rarest class cannot fill a single batch (which would otherwise
            produce epochs with zero batches).
    """
    def __init__(self, dataset, n_classes, batch_size):
        self.dataset = dataset
        self.n_classes = n_classes
        self.batch_size = batch_size

        # Verify split
        if self.batch_size % self.n_classes != 0:
            raise ValueError(f"CONFIG ERROR: Batch Size ({self.batch_size}) must be dividible by class number ({self.n_classes}).")

        self.n_samples = self.batch_size // self.n_classes
        print(f"⚖️  Sampler: {self.n_samples} images for each {self.n_classes} class (Batch={self.batch_size})")

        # Extracting all labels of the subset with one vectorised positional
        # lookup instead of one DataFrame row access per sample
        self.indices = dataset.indices
        category_column = dataset.dataset.df['Category'].to_numpy()
        positions = np.asarray(self.indices, dtype=np.intp)
        self.labels = np.array(category_column[positions].tolist())

        # Mapping Class -> Index list (indices refer to the underlying dataset)
        self.class_indices = {}
        for global_idx, label in zip(self.indices, self.labels):
            self.class_indices.setdefault(label, []).append(global_idx)

        self.classes = list(self.class_indices.keys())

        # How many images in the smallest class
        self.min_samples = min((len(members) for members in self.class_indices.values()), default=0)
        # How many complete batches can we build?
        self.n_batches = self.min_samples // self.n_samples

        if self.n_batches == 0:
            raise ValueError(
                f"Smallest class has {self.min_samples} samples but {self.n_samples} "
                f"are needed per batch: no balanced batch can be built."
            )

    def __iter__(self):
        """Yield ``n_batches`` lists of dataset indices, reshuffled on every call."""
        # Shuffle indices per class
        for label in self.class_indices:
            np.random.shuffle(self.class_indices[label])

        # Batch generation
        for i in range(self.n_batches):
            batch = []
            for label in self.classes:
                # Take the next n_samples entries of this class
                start = i * self.n_samples
                end = (i + 1) * self.n_samples
                batch.extend(self.class_indices[label][start:end])

            # Batch shuffling (so classes are not ordered inside the batch)
            np.random.shuffle(batch)
            yield batch

    def __len__(self):
        """Return the number of batches per epoch."""
        return self.n_batches



# Running record of the best fold (lowest validation loss seen so far)
best_fold_idx = -1
global_best_loss = float('inf')
results = dict()

# Shuffled, seeded K-Fold splitter over the training listing
kfold = KFold(
    n_splits=CONFIG['k_folds'],
    shuffle=True,
    random_state=CONFIG['seed'],
)

# Dataset for Validation: an independent copy of the training data that uses
# the deterministic 'val' transform instead of the augmenting one
val_dataset_ref = copy.deepcopy(full_train_dataset)
val_dataset_ref.transform = data_transforms['val']

print(
    f" {CONFIG['k_folds']}-Fold Cross Validation with STRATIFIED BATCH SAMPLING",
    f"Best fold will be saved in: {BEST_MODEL_PATH}",
    sep="\n",
)

# Cross-validation loop: one freshly initialised model per fold. Ctrl-C stops
# training but still lets the summary below run on the folds already finished.
try:
    for fold, (train_ids, val_ids) in enumerate(kfold.split(full_train_dataset)):
        # Free whatever is no longer referenced before building the next model
        gc.collect()
        torch.cuda.empty_cache()

        print(f"\n FOLD {fold + 1}/{CONFIG['k_folds']}")

        # Creating Subset: augmented view for training, deterministic view for validation
        train_sub = Subset(full_train_dataset, train_ids)
        val_sub = Subset(val_dataset_ref, val_ids)

        # Custom Batch Sampler (equal number of samples per class in every batch)
        custom_batch_sampler = BalancedBatchSampler(
            train_sub,
            n_classes=len(full_train_dataset.classes),
            batch_size= CONFIG['batch_size']
        )

        # Dataloaders
        # The sampler yields indices into the full dataset (taken from
        # train_sub.indices), hence the loader wraps full_train_dataset, not train_sub.
        train_loader = DataLoader(
            full_train_dataset,
            batch_sampler=custom_batch_sampler,
            num_workers=CONFIG['num_workers'],
            worker_init_fn=seed_worker
        )

        val_loader = DataLoader(
            val_sub,
            batch_size=CONFIG['batch_size'],
            shuffle=False,
            num_workers=CONFIG['num_workers'],
            worker_init_fn=seed_worker,
            generator=g
        )

        # Model initialization
        model = get_model(num_classes=len(full_train_dataset.classes))
        model = model.to(device)

        # real_model always points at the bare network, with or without DataParallel
        if torch.cuda.device_count() > 1:
            model = nn.DataParallel(model)
            real_model = model.module
        else:
            real_model = model

        # Setup Optimizer: train only the last residual stage (layer4) and the head
        for param in real_model.parameters(): param.requires_grad = False
        for param in real_model.layer4.parameters(): param.requires_grad = True
        for param in real_model.fc.parameters(): param.requires_grad = True

        optimizer = optim.Adam([
            {'params': real_model.layer4.parameters(), 'lr': CONFIG['lr_backbone']},
            {'params': real_model.fc.parameters(), 'lr': CONFIG['lr_head']}
        ], weight_decay=CONFIG['weight_decay'])

        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
        criterion = nn.CrossEntropyLoss()

        # Fold training
        best_model_fold, best_loss_fold, best_acc_fold = train_epoch_cycle(
            model, train_loader, val_loader, criterion, optimizer, scheduler,
            CONFIG['num_epochs'], CONFIG['patience']
        )

        # Saving fold
        # float() accepts both a 0-dim tensor and the plain 0.0 that is returned
        # when no epoch improved, where `.item()` would raise AttributeError.
        fold_results.append(float(best_acc_fold))
        fold_losses.append(best_loss_fold)
        fold_path = f"model_fold_{fold + 1}.pth"
        state_dict = real_model.state_dict()
        torch.save(state_dict, fold_path)
        print(f"Fold {fold+1} saved: Loss {best_loss_fold:.4f}, Acc {best_acc_fold:.4f}")

        # Saving best fold (lowest validation loss across folds)
        if best_loss_fold < global_best_loss:
            print(f"New best record! ({best_loss_fold:.4f} < {global_best_loss:.4f})")
            global_best_loss = best_loss_fold

            print(f" Overwrite {BEST_MODEL_PATH}...")
            # Always store the bare network's weights, never the DataParallel wrapper's
            if isinstance(best_model_fold, nn.DataParallel):
                torch.save(best_model_fold.module.state_dict(), BEST_MODEL_PATH)
            else:
                torch.save(best_model_fold.state_dict(), BEST_MODEL_PATH)
            best_fold_idx = fold + 1

except KeyboardInterrupt:
    print("\nTraining interrupted.")

# Per-fold table plus mean / standard deviation of the recorded metrics
print("Summary K-Fold Cross Validation")
print("-" * 50)

if not fold_results:
    print("No data available.")
else:
    summary_df = pd.DataFrame({
        'Fold': range(1, len(fold_results) + 1),
        'Loss': fold_losses,
        'Accuracy': fold_results
    })
    print(summary_df.to_string(index=False))

    print("-" * 50)
    print(f" Avg. Loss: {np.mean(fold_losses):.4f} (+/- {np.std(fold_losses):.4f})")
    print(f" Avg. Accuracy: {np.mean(fold_results):.4f} (+/- {np.std(fold_results):.4f})")

# best_fold_idx keeps its initial -1 when no fold finished
print("\n" + "="*50)
print(
    f" BEST FOLD: {best_fold_idx} with Loss: {global_best_loss:.4f}"
    if best_fold_idx != -1
    else "No model saved!"
)
print("="*50)

In [None]:

# Ensemble Creation: one model per fold checkpoint found in the working directory
ensemble_models = []
num_classes = len(full_train_dataset.classes)

print("Fold loading")
for i in range(CONFIG['k_folds']):
    path = f"model_fold_{i+1}.pth"

    # Missing checkpoints are reported and skipped, so the ensemble may be smaller
    if not os.path.exists(path):
        print(f"Attention: {path} not found")
        continue

    model = get_model(num_classes=num_classes).to(device)
    model.load_state_dict(torch.load(path, map_location=device))
    ensemble_models.append(model)
    print(f"Fold {i+1} loaded.")

In [None]:
# Ensemble Evaluation
def evaluate_ensemble(models_list, dataloader):
    """Print top-1 / top-2 / top-3 accuracy of an averaged-softmax ensemble.

    Every model's softmax output is averaged per sample; a sample counts as
    correct at rank ``k`` when its label is among the ``k`` most probable classes.

    Args:
        models_list: Non-empty sequence of classification modules. All are put
            in eval mode; batches are moved to the device of the first model's
            parameters (CPU if it has none).
        dataloader: Iterable of ``(inputs, labels)`` batches, labels being
            class indices.

    Raises:
        ValueError: If ``models_list`` is empty or ``dataloader`` yields no samples.
    """
    if len(models_list) == 0:
        raise ValueError("models_list is empty: nothing to evaluate.")

    first_param = next(models_list[0].parameters(), None)
    eval_device = first_param.device if first_param is not None else torch.device("cpu")

    # set eval mode
    for m in models_list:
        m.eval()

    print(f"Eval of {len(models_list)}")

    correct_at_k = [0, 0, 0]  # hits within the top-1, top-2 and top-3 predictions
    total = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(eval_device), labels.to(eval_device)

            # Sum the per-model class probabilities (softmax of the logits)
            ensemble_probs = None
            for model in models_list:
                probs = F.softmax(model(inputs), dim=1)
                ensemble_probs = probs if ensemble_probs is None else ensemble_probs + probs

            avg_probs = ensemble_probs / len(models_list)

            # Top-K (capped by the number of classes so topk cannot fail)
            k = min(3, avg_probs.size(1))
            _, top_preds = torch.topk(avg_probs, k=k, dim=1)
            hits = top_preds.eq(labels.view(-1, 1))  # (batch, k), at most one True per row

            for rank in range(3):
                correct_at_k[rank] += hits[:, :rank + 1].any(dim=1).sum().item()
            total += labels.size(0)

    if total == 0:
        raise ValueError("dataloader produced no samples: accuracy is undefined.")

    print(f"\n ENSEMBLE RESULTS ({len(models_list)} Folds):")
    print("-" * 40)
    print(f" Top-1 Accuracy: {correct_at_k[0]/total*100:.2f}%")
    print(f" Top-2 Accuracy: {correct_at_k[1]/total*100:.2f}%")
    print(f" Top-3 Accuracy: {correct_at_k[2]/total*100:.2f}%")
    print("-" * 40)


# Report the ensemble's top-k accuracy on the loader built from the test CSV
evaluate_ensemble(ensemble_models, test_loader)

In [None]:
# Examples with Top-3 
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
import random

# Denormalize image
def denormalize(tensor):
    """Undo the mean/std normalisation of a C x H x W tensor.

    Returns:
        A NumPy array of shape H x W x C with values clipped to [0, 1].
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])
    hwc = np.transpose(tensor.cpu().numpy(), (1, 2, 0))  # C,H,W -> H,W,C
    return np.clip(channel_std * hwc + channel_mean, 0, 1)

#Setup
#set model in eval mode
for m in ensemble_models:
    m.eval()

# class_names is also read by the Grad-CAM and confusion-matrix cells further down
class_names = full_train_dataset.classes
num_samples = 5
# Random test-set positions to display (drawn from the global `random` stream)
indices = random.sample(range(len(test_dataset)), num_samples)

#Plot
fig, axes = plt.subplots(1, num_samples, figsize=(18, 6))
# With a single column plt.subplots returns a bare Axes; wrap it so axes[i] works
if num_samples == 1: axes = [axes]

print(f"Visualizing {num_samples} examples")

with torch.no_grad():
    for i, idx in enumerate(indices):
        image, label = test_dataset[idx]
        ax = axes[i]

        #Ensemble Inference
        # Add a batch dimension of 1 and move the image to the compute device
        input_tensor = image.unsqueeze(0).to(device)
        ensemble_probs = None

        # Sum the softmax probabilities of every model
        for model in ensemble_models:
            output = model(input_tensor)
            probs = F.softmax(output, dim=1)

            if ensemble_probs is None:
                ensemble_probs = probs
            else:
                ensemble_probs += probs

        # Average of all models probability
        avg_probs = ensemble_probs / len(ensemble_models)

        # Top-3 ensemble average
        top_probs, top_idxs = torch.topk(avg_probs, 3)
        # [0] drops the batch dimension added above
        top_probs = top_probs.cpu().numpy()[0]
        top_idxs = top_idxs.cpu().numpy()[0]

        #Visualization
        img_show = denormalize(image)
        ax.imshow(img_show)
        ax.axis('off')

        true_name = class_names[label]
        pred_name = class_names[top_idxs[0]]
        # Green title when the top prediction matches the label, red otherwise
        color = 'green' if label == top_idxs[0] else 'red'

        ax.set_title(f"True: {true_name}\nEnsemble Pred: {pred_name}", color=color, fontweight='bold', fontsize=9)

        # Average prob text
        # (the comprehension's own `idx` is local to it and does not touch the loop variable)
        text_str = "\n".join([f"{class_names[idx]}: {prob*100:.1f}%" for idx, prob in zip(top_idxs, top_probs)])
        # Placed just below the image, in axes-relative coordinates
        ax.text(0.5, -0.15, text_str, transform=ax.transAxes, ha='center', va='top',
                bbox=dict(boxstyle='round', facecolor='white', alpha=0.8), fontsize=8)

plt.tight_layout()
plt.show()

In [None]:
#Grad-CAM Ensemble (Avg Heatmap)
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import random

def compute_ensemble_gradcam(models_list, img_tensor):
    """Average the Grad-CAM heatmaps of every model in ``models_list`` for one image.

    For each model the heatmap is computed on the output of ``model.layer4[-1]``
    for the class that model itself predicts, upsampled to the image's spatial
    size, min-max normalised, and finally averaged across models.

    Args:
        models_list: Non-empty sequence of modules exposing ``layer4`` (indexable)
            and returning ``(1, num_classes)`` logits. Each is put in eval mode.
        img_tensor: Single image tensor without batch dimension; its last two
            dimensions are taken as height and width.

    Returns:
        A 2-D NumPy array of shape ``(height, width)`` with values in [0, 1].

    Raises:
        ValueError: If ``models_list`` is empty.
    """
    if len(models_list) == 0:
        raise ValueError("models_list is empty: no Grad-CAM can be computed.")

    out_size = tuple(img_tensor.shape[-2:])
    combined_heatmap = None

    for model in models_list:
        model.eval()
        captured = {}

        def save_activation(module, inputs, output):
            captured['acts'] = output.detach()

        def save_gradient(module, grad_input, grad_output):
            captured['grads'] = grad_output[0].detach()

        # Use last block of layer4 as the Grad-CAM target
        target_layer = model.layer4[-1]
        h_fwd = target_layer.register_forward_hook(save_activation)
        h_bwd = target_layer.register_full_backward_hook(save_gradient)

        try:
            first_param = next(model.parameters(), None)
            model_device = first_param.device if first_param is not None else torch.device("cpu")

            # The input itself must require grad so that backward reaches the
            # hooked layer even when every model parameter is frozen.
            input_t = img_tensor.detach().unsqueeze(0).to(model_device)
            input_t.requires_grad = True

            # enable_grad keeps this working when the caller is inside torch.no_grad()
            with torch.enable_grad():
                output = model(input_t)
                pred_idx = output.argmax(dim=1).item()

                model.zero_grad()
                output[0, pred_idx].backward()

            acts = captured['acts'][0]    # (channels, h, w)
            grads = captured['grads'][0]  # (channels, h, w)
            weights = torch.mean(grads, dim=(1, 2))

            # Channel-weighted sum of the activations, keeping positive evidence only
            cam = F.relu((weights[:, None, None] * acts).sum(dim=0))
            cam = F.interpolate(cam[None, None], size=out_size, mode='bilinear', align_corners=False)
            cam = cam.squeeze(0).squeeze(0).cpu().numpy()

            # Min-max normalisation; the epsilon guards against a constant map
            cam = (cam - np.min(cam)) / (np.max(cam) - np.min(cam) + 1e-8)

            combined_heatmap = cam if combined_heatmap is None else combined_heatmap + cam
        finally:
            # Always detach the hooks so repeated calls do not stack them
            h_fwd.remove()
            h_bwd.remove()

    return combined_heatmap / len(models_list)

#Plot
print(f"Average GradCam of Ensemble of {len(ensemble_models)} models")
# Three random test-set positions (drawn from the global `random` stream)
idxs = random.sample(range(len(test_dataset)), 3)

for idx in idxs:
    img, label = test_dataset[idx]
    # Heatmap averaged over all ensemble members
    avg_heatmap = compute_ensemble_gradcam(ensemble_models, img)

    # Inference
    # Ensemble prediction = argmax of the averaged softmax probabilities
    with torch.no_grad():
        # NOTE: despite the name, this accumulates softmax probabilities, not logits
        ens_logits = None
        for m in ensemble_models:
            out = m(img.unsqueeze(0).to(device))
            p = F.softmax(out, dim=1)
            ens_logits = p if ens_logits is None else ens_logits + p
        pred = (ens_logits / len(ensemble_models)).argmax(dim=1).item()

    img_show = denormalize(img)
    plt.figure(figsize=(8, 4))

    # Left panel: the image with its true class
    plt.subplot(1, 2, 1)
    plt.imshow(img_show)
    plt.title(f"Original\nTrue: {class_names[label]}")
    plt.axis('off')

    # Right panel: the same image with the semi-transparent heatmap on top
    plt.subplot(1, 2, 2)
    plt.imshow(img_show)
    plt.imshow(avg_heatmap, cmap='jet', alpha=0.5)
    plt.title(f"Ensemble Grad-CAM\nPred: {class_names[pred]}")
    plt.axis('off')
    plt.show()

In [None]:
#Confusion Matrix Ensemble
from sklearn.metrics import confusion_matrix
import seaborn as sns
import torch
import torch.nn.functional as F

print(f"Confusion matrix Ensemble of {len(ensemble_models)} models")

# True and predicted class indices, accumulated over the whole test loader
y_true = []
y_pred = []

for m in ensemble_models:
    m.eval()

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)

        # Ensemble: sum the softmax probabilities of every model
        ensemble_probs = None

        for model in ensemble_models:
            outputs = model(inputs)
            probs = F.softmax(outputs, dim=1)

            if ensemble_probs is None:
                ensemble_probs = probs
            else:
                ensemble_probs += probs

        # Average probabilities
        avg_probs = ensemble_probs / len(ensemble_models)
        _, preds = torch.max(avg_probs, 1)

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(preds.cpu().numpy())

# Matrix Generation
# Passing every class index explicitly keeps the matrix len(class_names) x
# len(class_names) even when a class never occurs in y_true or y_pred;
# otherwise rows/columns would be dropped and the tick labels would misalign.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))

plt.figure(figsize=(20, 16))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names)

plt.xlabel('Predicted (Ensemble)', fontsize=12)
plt.ylabel('Real', fontsize=12)
plt.title(f'Confusion Matrix - Ensemble of {len(ensemble_models)} Models', fontsize=15)
plt.xticks(rotation=90)
plt.yticks(rotation=0)
plt.show()