# Install required libraries

In [None]:
# Install the AIM packages straight from the apple/ml-aim GitHub repository.
# The two subdirectories are installed separately; they presumably supply the
# `aim.v1` and `aim.v2` modules imported in the next cell.
# NOTE(review): neither install is pinned to a tag or commit, so a rerun may pick up newer code.
!pip install 'git+https://github.com/apple/ml-aim.git#subdirectory=aim-v1'
!pip install 'git+https://github.com/apple/ml-aim.git#subdirectory=aim-v2'

# Import necessary libraries

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from tqdm import tqdm
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.utils import class_weight
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_auc_score, roc_curve, precision_recall_curve, auc, recall_score, f1_score, balanced_accuracy_score
from sklearn.preprocessing import StandardScaler, LabelBinarizer
import json
from pathlib import Path
import os
from aim.v2.utils import load_pretrained
from aim.v1.torch.data import val_transforms
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns

In [6]:
# Paths
train_path = '/kaggle/input/capsule-vision/archive/Dataset/training'
val_path = '/kaggle/input/capsule-vision/archive/Dataset/validation'
test_path = '/kaggle/input/capvis-test/Test set with seperated folders of each class label'

# Reproducibility: keep the seed value itself (np.random.seed() returns None, so
# assigning its result would lose it) and seed both NumPy and PyTorch, since
# DataLoader shuffling and the random augmentations draw from torch's generator.
random_seed = 1142
np.random.seed(random_seed)
torch.manual_seed(random_seed)

# Hyperparameters
batch_size = 16
epochs = 10
learning_rate = 1e-6
# Prefer the GPU when one is visible, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing pipelines keyed by split: 'train' adds random augmentation,
# 'test' is fully deterministic. Both end with the same channel normalisation.
data_transforms = dict(
    train=transforms.Compose([
        # Bring every image to the 224x224 model input size first
        transforms.Resize((224, 224)),
        # Augmentation: random crop rescaled back to 224
        transforms.RandomResizedCrop(224),
        # Augmentation: random left-right flip
        transforms.RandomHorizontalFlip(),
        # Image -> PyTorch tensor
        transforms.ToTensor(),
        # ImageNet channel statistics
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    test=transforms.Compose([
        # Bring every image to the 224x224 model input size
        transforms.Resize((224, 224)),
        # Centre crop at the same size as the resize
        transforms.CenterCrop(224),
        # Image -> PyTorch tensor
        transforms.ToTensor(),
        # ImageNet channel statistics
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)

# Datasets: one sub-folder per class under each split root. Only the training
# split is augmented; validation and test share the deterministic 'test' pipeline.
train_dataset = ImageFolder(root=train_path, transform=data_transforms['train'])
val_dataset, test_dataset = (
    ImageFolder(root=split_root, transform=data_transforms['test'])
    for split_root in (val_path, test_path)
)

# Loaders: shuffle the training data only, so evaluation order matches the dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size, shuffle=False)
    for split_dataset in (val_dataset, test_dataset)
)

# Load and initialize DinoV2 model

In [None]:
# Load the DINO backbone through torch.hub: entry point 'dinov2_vits14' from the
# facebookresearch/dinov2 repository (fetched on first use, so this needs network access).
# The classifier class defined below wraps this module-level object.
dinov2_vits14 = torch.hub.load('facebookresearch/dinov2', 'dinov2_vits14')

# Define the custom model
class DinoVisionTransformerClassifier(nn.Module):
    """DINOv2 backbone followed by a two-layer MLP classification head.

    Args:
        num_classes: Number of output logits. Defaults to 10, the value the
            head was originally hard-coded to.
        backbone: Feature extractor to wrap. When ``None`` (default) the
            module-level ``dinov2_vits14`` loaded via torch.hub is used. The
            backbone must return 384-dimensional features and expose a
            ``norm`` submodule, because ``forward`` calls it.

    Parameter names in the state dict are unchanged (``transformer.*``,
    ``classifier.0.*``, ``classifier.2.*``), so existing checkpoints still load.
    """

    def __init__(self, num_classes=10, backbone=None):
        super().__init__()
        # Fall back to the hub-loaded global so the zero-argument call keeps working
        self.transformer = dinov2_vits14 if backbone is None else backbone
        self.classifier = nn.Sequential(
            nn.Linear(384, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a batch of images."""
        x = self.transformer(x)  # Extract features
        # NOTE(review): this applies the backbone's own `norm` layer to the backbone
        # output; upstream DINOv2 may already normalise inside its forward - confirm.
        # Left as is because checkpoints were presumably trained with this forward pass.
        x = self.transformer.norm(x)
        return self.classifier(x)

# Initialize model, loss, and optimizer
dino_model = DinoVisionTransformerClassifier()
dino_model = dino_model.to(device)

# NOTE: `criterion` and `optimizer` are module-level names that the Hiera and AIMv2
# cells below rebind, and the training helpers read them as globals. Re-run this
# cell immediately before training `dino_model` so the optimizer tracks its parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(dino_model.parameters(), lr=learning_rate)

# map_location remaps tensors saved from a GPU onto the current device, so the
# checkpoint also loads on a CPU-only machine.
# SECURITY: torch.load unpickles the file - only load checkpoints from a trusted source.
checkpoint = torch.load(
    '/kaggle/input/dinov2_capvis/pytorch/default/1/dino_vit_classifier_capsule.pth',
    map_location=device,
)
dino_model.load_state_dict(checkpoint)  # Load weights into the model

# Load and initialize Hiera model

In [None]:
# Load the Hiera backbone through torch.hub: model 'hiera_tiny_224' from the
# facebookresearch/hiera repository with the pretrained 'mae_in1k_ft_in1k' checkpoint
# (fetched on first use, so this needs network access).
# The classifier class defined below wraps this module-level object.
hiera_base = torch.hub.load("facebookresearch/hiera", model="hiera_tiny_224", pretrained=True, checkpoint="mae_in1k_ft_in1k")

# Define the custom model
class HieraVisionTransformerClassifier(nn.Module):
    """Hiera backbone followed by LayerNorm and a two-layer MLP classification head.

    Args:
        num_classes: Number of output logits. Defaults to 10, the value the
            head was originally hard-coded to.
        backbone: Feature extractor to wrap. When ``None`` (default) the
            module-level ``hiera_base`` loaded via torch.hub is used. The
            backbone must return 1000-dimensional outputs, which is what the
            LayerNorm and the first linear layer expect.

    Parameter names in the state dict are unchanged (``transformer.*``,
    ``layer_norm.*``, ``classifier.0.*``, ``classifier.2.*``), so existing
    checkpoints still load.
    """

    def __init__(self, num_classes=10, backbone=None):
        super().__init__()
        # Fall back to the hub-loaded global so the zero-argument call keeps working
        self.transformer = hiera_base if backbone is None else backbone
        self.layer_norm = nn.LayerNorm(1000)  # Normalize the 1000 features output from Hiera model
        self.classifier = nn.Sequential(
            nn.Linear(1000, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a batch of images."""
        x = self.transformer(x)  # Extract features
        x = self.layer_norm(x)  # Normalize the features
        return self.classifier(x)

# Initialize model, loss, and optimizer
hiera_model = HieraVisionTransformerClassifier()
hiera_model = hiera_model.to(device)

# NOTE: `criterion` and `optimizer` are module-level names shared with the other
# model cells and read as globals by the training helpers. Re-run this cell
# immediately before training `hiera_model` so the optimizer tracks its parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(hiera_model.parameters(), lr=learning_rate)

# map_location remaps tensors saved from a GPU onto the current device, so the
# checkpoint also loads on a CPU-only machine.
# SECURITY: torch.load unpickles the file - only load checkpoints from a trusted source.
checkpoint = torch.load(
    '/kaggle/input/hierav3/pytorch/default/1/Hiera_model_3.pth',
    map_location=device,
)
# The weights belong to `hiera_model`; no variable called `model` is defined at this
# point on a fresh kernel, so the previous `model.load_state_dict(...)` raised NameError.
hiera_model.load_state_dict(checkpoint)  # Load weights into the model

# Load and initialize AimV2 model

In [25]:
# AIMv2 backbone wrapped with a pooled MLP classification head
class AIMv2VisionTransformerClassifier(nn.Module):
    """AIMv2 feature extractor, mean pooling over tokens, then a two-layer MLP head.

    Args:
        model_name: Identifier passed to ``load_pretrained`` (torch backend).
        img_size: Accepted for interface compatibility; not read by this class.
        num_classes: Number of output logits.
    """

    def __init__(self, model_name="aimv2-large-patch14-224", img_size=224, num_classes=10):
        super().__init__()

        # Pretrained AIMv2 backbone
        self.transformer = load_pretrained(model_name, backend="torch")
        # Global average pooling: collapses the last axis to length 1
        self.pooling = nn.AdaptiveAvgPool1d(1)
        # Head over 1024-dimensional pooled features
        head_layers = [
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        tokens = self.transformer(x)
        # Swap the last two axes so the pooling layer averages over the token axis,
        # leaving one feature vector per sample.
        pooled = self.pooling(tokens.permute(0, 2, 1)).squeeze(-1)
        return self.classifier(pooled)

model_name = "aimv2-large-patch14-224"
aim_model = AIMv2VisionTransformerClassifier(model_name=model_name)

# Move the model to the device (e.g., GPU or CPU)
aim_model = aim_model.to(device)

# Initialize loss and optimizer
# NOTE: `criterion` and `optimizer` are module-level names shared with the other
# model cells and read as globals by the training helpers. This cell binds them to `aim_model`.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(aim_model.parameters(), lr=learning_rate)

# map_location remaps tensors saved from a GPU onto the current device, so the
# checkpoint also loads on a CPU-only machine.
# SECURITY: torch.load unpickles the file - only load checkpoints from a trusted source.
checkpoint = torch.load(
    '/kaggle/input/aimv2/pytorch/default/1/aimv2_model_3.pth',
    map_location=device,
)
aim_model.load_state_dict(checkpoint)

model.safetensors:   0%|          | 0.00/1.24G [00:00<?, ?B/s]

In [8]:
class FocalLoss(nn.Module):
    """Multi-class focal loss: down-weights well-classified samples so training
    concentrates on the hard ones.

    Per-sample loss is ``-alpha * (1 - p_t) ** gamma * log(p_t)``, where ``p_t`` is
    the softmax probability assigned to the true class.

    Args:
        alpha: Scalar weight applied to every sample's loss.
        gamma: Focusing exponent; ``gamma=0`` reduces to ``alpha`` times cross-entropy.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the unreduced
            per-sample losses with shape (N, 1).
    """

    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha  # Scalar weighting factor
        self.gamma = gamma  # Focusing parameter
        self.reduction = reduction  # How to reduce the final loss

    def forward(self, inputs, targets):
        """Compute the loss from logits ``inputs`` (N, C) and integer class ``targets`` (N,)."""
        # Work in log space: log(softmax(x)) becomes -inf once the true-class
        # probability underflows to 0, whereas log_softmax stays finite.
        log_probs = F.log_softmax(inputs, dim=1)
        # Log-probability of the true class, shape (N, 1)
        log_p_t = log_probs.gather(1, targets.view(-1, 1))
        p_t = log_p_t.exp()
        # Modulating factor (1 - p_t)^gamma scales the cross-entropy term
        loss = -self.alpha * (1 - p_t) ** self.gamma * log_p_t
        if self.reduction == 'mean':
            return loss.mean()
        if self.reduction == 'sum':
            return loss.sum()
        return loss  # 'none'

# Train and test model

In [14]:
# Helper functions to train, test and validate model

# One training epoch with live loss/accuracy reporting
def train_model(epoch, model):
    """Train ``model`` for a single pass over the module-level ``train_loader``.

    Uses the module-level ``optimizer``, ``criterion``, ``device`` and ``epochs``.
    The progress bar shows the running mean batch loss and the running accuracy
    in percent. Returns nothing.

    Args:
        epoch: Zero-based epoch index, used only for the progress-bar label.
        model: Network to optimise; switched to train mode here.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs} - Training", leave=True)
    for step, (images, labels) in enumerate(bar, start=1):
        images = images.to(device)
        labels = labels.to(device)

        # Standard optimisation step: clear grads, forward, backward, update
        optimizer.zero_grad()
        logits = model(images)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Running statistics over the batches seen so far
        loss_sum += batch_loss.item()
        predicted = logits.max(1).indices
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()

        bar.set_postfix(loss=loss_sum / step, accuracy=100. * n_correct / n_seen)

# One validation pass with live loss/accuracy reporting
def validate_model(epoch, model):
    """Evaluate ``model`` on the module-level ``val_loader`` without gradient tracking.

    Uses the module-level ``criterion``, ``device`` and ``epochs``. The progress
    bar shows the running mean batch loss and the running accuracy in percent.
    Returns nothing.

    Args:
        epoch: Zero-based epoch index, used only for the progress-bar label.
        model: Network to evaluate; switched to eval mode here.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    bar = tqdm(val_loader, desc=f"Epoch {epoch + 1}/{epochs} - Validation", leave=True)
    with torch.no_grad():
        for step, (images, labels) in enumerate(bar, start=1):
            images = images.to(device)
            labels = labels.to(device)
            logits = model(images)

            # Running statistics over the batches seen so far
            loss_sum += criterion(logits, labels).item()
            predicted = logits.max(1).indices
            n_seen += labels.size(0)
            n_correct += predicted.eq(labels).sum().item()

            bar.set_postfix(loss=loss_sum / step, accuracy=100. * n_correct / n_seen)

# Testing loop with accuracy
def test_model(model, loader=None):
    """Run ``model`` over a data loader and collect per-sample outputs.

    Args:
        model: Classifier that returns logits of shape (batch, n_classes);
            switched to eval mode here.
        loader: Iterable of ``(images, labels)`` batches. Defaults to the
            module-level ``test_loader``. The evaluation cells pair these
            outputs with ``test_loader.dataset.samples`` and ``.classes``, so
            the default must be the test split, not the validation split.
            Pass ``val_loader`` explicitly to score the validation set.

    Returns:
        Tuple ``(probabilities, predictions, labels)`` of CPU tensors concatenated
        over all batches: softmax probabilities (n, n_classes), argmax class
        indices (n,), and ground-truth class indices (n,).

    Raises:
        ValueError: If the loader yields no samples.
    """
    if loader is None:
        loader = test_loader

    model.eval()
    correct = 0
    total = 0
    all_probabilities = []  # Softmax outputs per batch
    all_predictions = []    # Predicted class indices per batch
    all_labels = []         # True labels per batch

    with torch.no_grad():
        for images, labels in tqdm(loader, desc="Testing", leave=True):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)

            # Class probabilities and hard predictions from the logits
            probabilities = F.softmax(outputs, dim=1)
            _, predicted = outputs.max(1)

            # Update statistics
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            # Keep everything on the CPU so results outlive the GPU batch
            all_probabilities.append(probabilities.cpu())
            all_predictions.append(predicted.cpu())
            all_labels.append(labels.cpu())

    if total == 0:
        # Fail with a clear message instead of an opaque torch.cat / ZeroDivisionError
        raise ValueError("test_model received an empty loader; nothing to evaluate")

    # Concatenate all batches into single tensors
    all_probabilities = torch.cat(all_probabilities)
    all_predictions = torch.cat(all_predictions)
    all_labels = torch.cat(all_labels)

    print(f"Test Accuracy: {100. * correct / total:.2f}%")

    return all_probabilities, all_predictions, all_labels

In [None]:
# Training loop: fine-tune `aim_model` and validate after every epoch.
# `epochs` is rebound here (overriding the value from the configuration cell); the
# train/validate helpers read it as a global for their progress-bar labels.
# The helpers also use the module-level `optimizer` and `criterion`, so the AIMv2
# cell must be the most recently run model cell for this loop to update `aim_model`.
epochs = 3
for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")
    train_model(epoch, aim_model)  # Train model for one epoch
    validate_model(epoch, aim_model)  # Validate model after training
    print("-" * 50)

In [None]:
# Save the fine-tuned weights (state dict only) to the current working directory
torch.save(aim_model.state_dict(), 'aimv2_model_5.pth')

In [None]:
# Testing the model: `prob`, `pred` and `labels` are the per-sample softmax
# probabilities, predicted class indices and true class indices returned by
# test_model; the evaluation cells below read these three names.
prob, pred, labels = test_model(aim_model)

In [10]:
# Functions for evaluation (from organizers)
def save_predictions_to_excel(image_paths, y_pred, output_path):
    """
    Saves predictions along with their probabilities and image paths to an Excel file.

    The sheet has one row per input sample, in input order, with columns
    `image_path`, one probability column per class, and `predicted_class`.

    Parameters:
    - image_paths: List of image file paths (one per row of y_pred; duplicates allowed).
    - y_pred: Array of predicted class probabilities (shape: [n_samples, n_classes]).
              Anything np.asarray can convert is accepted.
    - output_path: File path for saving the Excel file.
    """

    class_columns = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
    y_pred = np.asarray(y_pred)
    predicted_class_names = [class_columns[i] for i in np.argmax(y_pred, axis=1)]

    df_predictions = pd.DataFrame(y_pred, columns=class_columns)
    df_predictions.insert(0, 'image_path', image_paths)
    # Attach the predicted class positionally. Joining a second frame on
    # 'image_path' would multiply rows whenever a path occurs more than once.
    df_predictions['predicted_class'] = predicted_class_names

    df_predictions.to_excel(output_path, index=False)

def calculate_specificity(y_true, y_pred):
    """
    Calculates specificity: TN / (TN + FP).

    Parameters:
    - y_true: Ground truth binary labels (0 or 1); any array-like.
    - y_pred: Predicted binary labels (0 or 1); any array-like.

    Returns:
    - specificity: Specificity score, or 0.0 if there are no negative samples.
    """
    # Coerce to arrays so the comparisons below are element-wise; on plain lists
    # `y_true == 0` is a single False and the result would silently be 0.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    is_negative = y_true == 0
    tn = np.sum(is_negative & (y_pred == 0))
    fp = np.sum(is_negative & (y_pred == 1))
    negatives = tn + fp
    return tn / negatives if negatives > 0 else 0.0

In [11]:
def generate_metrics_report(y_true, y_pred):
    """
    Builds the evaluation report for the 10-class problem and returns it as JSON.

    Parameters:
    - y_true: One-hot ground truth, numpy array of shape (n_samples, n_classes).
    - y_pred: Predicted class probabilities, numpy array of shape (n_samples, n_classes).

    Returns:
    - metrics_report: JSON string (indent=4) holding the scikit-learn classification
                      report plus per-class and mean AUC-ROC, specificity, average
                      precision (area under the precision-recall curve), sensitivity
                      and F1-score, and the balanced accuracy. Specificity, sensitivity
                      and F1 are computed one-vs-rest with a 0.5 probability threshold.
    """
    class_columns = ['Angioectasia', 'Bleeding', 'Erosion', 'Erythema', 'Foreign Body', 'Lymphangiectasia', 'Normal', 'Polyp', 'Ulcer', 'Worms']
    indexed_classes = list(enumerate(class_columns))

    def with_mean(per_class, mean_key):
        # Store the unweighted mean of the per-class values under `mean_key`
        # inside the same dict, and hand back both the dict and the mean.
        mean_value = np.mean(list(per_class.values()))
        per_class[mean_key] = mean_value
        return per_class, mean_value

    def pr_auc(k):
        # Area under the precision-recall curve for class k (one-vs-rest)
        precision, recall, _ = precision_recall_curve(y_true[:, k], y_pred[:, k])
        return auc(recall, precision) if len(precision) > 0 else 0.0

    # Hard multi-class decisions for the report and balanced accuracy
    true_idx = np.argmax(y_true, axis=1)
    pred_idx = np.argmax(y_pred, axis=1)
    class_report = classification_report(true_idx, pred_idx, target_names=class_columns, output_dict=True, zero_division=0)

    auc_roc_scores, mean_auc_roc = with_mean(
        {name: roc_auc_score(y_true[:, k], y_pred[:, k]) for k, name in indexed_classes},
        'mean_auc',
    )

    # Per-class binary decisions at the 0.5 threshold, shared by the next metrics
    hard_calls = (y_pred >= 0.5).astype(int)

    specificity_scores, mean_specificity = with_mean(
        {name: calculate_specificity(y_true[:, k], hard_calls[:, k]) for k, name in indexed_classes},
        'mean_specificity',
    )
    average_precision_scores, mean_average_precision = with_mean(
        {name: pr_auc(k) for k, name in indexed_classes},
        'mean_average_precision',
    )
    sensitivity_scores, mean_sensitivity = with_mean(
        {name: recall_score(y_true[:, k], hard_calls[:, k], zero_division=0) for k, name in indexed_classes},
        'mean_sensitivity',
    )
    f1_scores, mean_f1_score = with_mean(
        {name: f1_score(y_true[:, k], hard_calls[:, k], zero_division=0) for k, name in indexed_classes},
        'mean_f1_score',
    )

    balanced_accuracy = balanced_accuracy_score(true_idx, pred_idx)

    # Key order matters for the serialised output: report first, then the extras
    metrics_report = {
        **class_report,
        'auc_roc_scores': auc_roc_scores,
        'specificity_scores': specificity_scores,
        'average_precision_scores': average_precision_scores,
        'sensitivity_scores': sensitivity_scores,
        'f1_scores': f1_scores,
        'mean_auc': mean_auc_roc,
        'mean_specificity': mean_specificity,
        'mean_average_precision': mean_average_precision,
        'mean_sensitivity': mean_sensitivity,
        'mean_f1_score': mean_f1_score,
        'balanced_accuracy': balanced_accuracy,
    }
    return json.dumps(metrics_report, indent=4)

In [28]:
# One-hot encode the integer class labels returned by test_model (10 classes),
# because generate_metrics_report expects one-hot ground truth.
one_hot_labels = torch.nn.functional.one_hot(labels, num_classes=10)
# Generate and save evaluation metrics as JSON
# (both tensors are converted to NumPy arrays for the scikit-learn based report)
metrics_report = generate_metrics_report(np.array(one_hot_labels), np.array(prob))
metrics_report_path = '/kaggle/working/metrics_report.json'
with open(metrics_report_path, 'w') as f:
    f.write(metrics_report)

In [None]:
# NOTE(review): these three names are already imported in the import cell at the
# top of the notebook; this mid-notebook import is redundant.
from sklearn.metrics import classification_report, accuracy_score, balanced_accuracy_score

# Compute metrics from the hard predictions returned by test_model.
# Class names come from the test dataset's folder order.
# NOTE(review): zero_division=1 here, while generate_metrics_report uses
# zero_division=0, so classes with no predictions are scored differently - confirm intent.
accuracy = accuracy_score(labels, pred)
balanced_acc = balanced_accuracy_score(labels, pred)
class_report = classification_report(labels, pred, target_names=test_loader.dataset.classes, zero_division=1)

# Print metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Balanced Accuracy: {balanced_acc:.4f}")
print("Classification Report:")
print(class_report)


In [None]:
# Convert the evaluation outputs to NumPy arrays up front. pandas and scikit-learn
# are not guaranteed to unpack torch tensors into plain numeric columns.
y_pred = np.asarray(prob)            # (n_samples, n_classes) probabilities
y_pred_classes = np.asarray(pred)    # (n_samples,) predicted class indices
y_val = np.asarray(labels)           # (n_samples,) true class indices from test_model
# as_tensor avoids the copy-construct warning torch.tensor() emits for tensor input
y_val_tensor = torch.as_tensor(y_val)
y_val_one_hot = torch.nn.functional.one_hot(y_val_tensor, num_classes=10)
# Class names in the dataset's own index order, so index i maps to the right name
class_names = list(test_loader.dataset.classes)

# Convert the class indices back to class names for both actual and predicted values
actual_class_names = [class_names[i] for i in y_val]
predicted_class_names = [class_names[i] for i in y_pred_classes]

# One path per dataset sample, in dataset order (the loader does not shuffle)
image_paths = [sample_path for sample_path, _ in test_loader.dataset.samples]
if len(image_paths) != len(y_pred):
    raise ValueError(
        f"{len(y_pred)} predictions cannot be paired with {len(image_paths)} test images; "
        "the predictions must come from the same split as test_loader"
    )

# Create a DataFrame with the predicted probabilities for each class (y_pred)
df_predictions = pd.DataFrame(y_pred, columns=class_names)

# Add the image paths, actual class, and predicted class to the DataFrame
df_predictions.insert(0, 'image_path', image_paths)  # Insert image paths as the first column
df_predictions['predicted_class'] = predicted_class_names  # Add predicted class as a column
df_predictions['actual_class'] = actual_class_names  # Add actual class as a column

# Save to Excel
output_path = '/kaggle/working/test_results.xlsx'
df_predictions.to_excel(output_path, index=False)

# print(f"Validation results with probabilities saved to {output_path}")

# Plot the one-vs-rest ROC curve for each class and save the figure as a PDF
lb = LabelBinarizer()
# Fit on every class index rather than on the labels that happen to be present,
# so y_val_bin has one column per entry of class_names even if a class has no
# samples in this split (fitting on y_val alone would drop that column).
lb.fit(np.arange(len(class_names)))
y_val_bin = lb.transform(np.asarray(y_val))
fpr, tpr, roc_auc = {}, {}, {}

for i in range(len(class_names)):
    # np.asarray makes the column slice a plain array whether y_pred is a tensor or an array
    fpr[i], tpr[i], _ = roc_curve(y_val_bin[:, i], np.asarray(y_pred)[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

plt.figure()
for i in range(len(class_names)):
    plt.plot(fpr[i], tpr[i], label=f'{class_names[i]} (AUC = {roc_auc[i]:.2f})')

plt.plot([0, 1], [0, 1], 'k--')  # chance-level diagonal
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('AUC-ROC Curve')
plt.legend(loc='lower right')
roc_curve_path = '/kaggle/working/roc_curve.pdf'
plt.savefig(roc_curve_path, format='pdf')  # Save the AUC-ROC curve plot as PDF
plt.show()

In [None]:
# Print Confusion Matrix
# normalize='true' divides each row by the number of true samples of that class
print("Confusion Matrix:")
cm = confusion_matrix(y_val, y_pred_classes, normalize='true')
print(cm)

# Print Classification Report
print("Classification Report:")
print(classification_report(y_val, y_pred_classes, target_names=class_names))

# Plot the normalized confusion matrix and save it as a PDF
plt.figure(figsize=(12, 12))
sns.heatmap(cm, annot=True, fmt=".2f", cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Normalized Confusion Matrix')
plt.tight_layout()
conf_matrix_path = '/kaggle/working/confusion_matrix.pdf'
plt.savefig(conf_matrix_path, format='pdf')  # Save the confusion matrix plot as PDF
plt.show()

# Calculate number of parameters and FLOPs

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dummy input: a single random image in channel-first layout, shape (1, 3, 224, 224)
dummy_input = torch.randn(1, 3, 224, 224).to(device)

# Calculate number of parameters (all parameters of the DINO classifier, backbone included)
num_params = sum(p.numel() for p in dino_model.parameters())
print(f"Number of parameters: {num_params:,}")

# Calculate FLOPs using thop
# NOTE(review): `profile` is not imported anywhere in the visible notebook and thop is
# not installed by the setup cell; on a fresh kernel this raises NameError. Presumably
# `from thop import profile` is intended - confirm and add it to the import cell.
# NOTE(review): if this is thop.profile, check whether its first return value is
# multiply-accumulate operations rather than FLOPs before reporting the number.
flops, _ = profile(dino_model, inputs=(dummy_input,))
print(f"FLOPs: {flops:,}")

In [None]:
# Calculate number of parameters (all parameters of the AIMv2 classifier, backbone included)
num_params = sum(p.numel() for p in aim_model.parameters())
print(f"Number of parameters: {num_params:,}")

# Calculate FLOPs using thop
# Relies on `dummy_input` defined in the previous cell.
# NOTE(review): `profile` is not imported anywhere in the visible notebook;
# presumably `from thop import profile` is intended - confirm.
flops, _ = profile(aim_model, inputs=(dummy_input,))
print(f"FLOPs: {flops:,}")

In [None]:
# Calculate number of parameters (all parameters of the Hiera classifier, backbone included)
num_params = sum(p.numel() for p in hiera_model.parameters())
print(f"Number of parameters: {num_params:,}")

# Calculate FLOPs using thop
# Relies on `dummy_input` defined two cells above.
# NOTE(review): `profile` is not imported anywhere in the visible notebook;
# presumably `from thop import profile` is intended - confirm.
flops, _ = profile(hiera_model, inputs=(dummy_input,))
print(f"FLOPs: {flops:,}")