In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

# Define device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define transformations for data augmentation and normalization
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.RandomHorizontalFlip(),  # Random horizontal flip for augmentation
    transforms.RandomRotation(15),  # Random rotation for augmentation
    transforms.ToTensor(),  # Convert images to tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize with ImageNet means and std
])

# Load dataset (train and test)
train_dataset = ImageFolder(root="/kaggle/input/grape-disease/grape_dataset/train", transform=transform)
test_dataset = ImageFolder(root="/kaggle/input/grape-disease/grape_dataset/test", transform=transform)

# Create data loaders for batching
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Load the VGG16 model pre-trained on ImageNet
model = models.vgg16(pretrained=True)
# Modify the last fully connected layer for 4 output classes (grape disease categories)
model.classifier[6] = nn.Linear(in_features=4096, out_features=4)
model = model.to(device)  # Move model to the defined device (GPU/CPU)

# Define loss function (cross-entropy) and optimizer (Adam)
cross_entropy_loss = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.00001)

# Set number of epochs
epochs = 10

# Training loop over multiple epochs
for epoch in range(epochs):  
    for i, batch in enumerate(train_loader, 0):
        inputs, labels = batch
        inputs = inputs.to(device)  # Move inputs to device
        labels = labels.to(device)  # Move labels to device
        optimizer.zero_grad()  # Zero the gradients to avoid accumulation
        outputs = model(inputs)  # Forward pass
        loss = cross_entropy_loss(outputs, labels)  # Calculate loss
        loss.backward()  # Backpropagate the gradients
        optimizer.step()  # Update the model parameters
        print(loss)  # Print loss for every batch

# Inspect predictions for the first batch from the test set
import pandas as pd
inputs, labels = next(iter(test_loader))
inputs = inputs.to(device)  # Move inputs to device

labels = labels.numpy()  # Convert labels to numpy
# Get predictions from the model (output class with max score)
outputs = model(inputs).max(1).indices.detach().cpu().numpy()

# Calculate and print batch accuracy
comparison = pd.DataFrame()
print("Batch accuracy: ", (labels == outputs).sum() / len(labels))
comparison["labels"] = labels
comparison["outputs"] = outputs
comparison  # Display comparison of labels and predictions

# Save the trained model's weights to a file
torch.save(model.state_dict(), "/kaggle/working/fine_tuned_model.pth")
print("Model saved successfully!")

# Install necessary libraries for explainability
!pip install torch torchvision captum quantus grad-cam

# Import necessary packages for explanation methods (e.g., Grad-CAM++, LRP)
import pathlib
import random
import copy
import gc
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torchvision
from torchvision import models
from captum.attr import *
import quantus
from pytorch_grad_cam import GradCAMPlusPlus
from captum.attr import LRP
from captum.attr._utils.lrp_rules import EpsilonRule, GammaRule, Alpha1_Beta0_Rule

from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import cv2

# Define class labels for your grape dataset (indices to class names)
class_labels = {0: "Black Rot", 1: "Esca", 2: "Leaf Blight", 3: "Healthy"}

# Load the fine-tuned VGG16 model
model = models.vgg16()
model.classifier[6] = torch.nn.Linear(in_features=4096, out_features=4)  # Modify output for 4 classes
model.load_state_dict(torch.load("/kaggle/input/fine-tuned-grape-leaf/fine_tuned_model.pth", map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu')))
model.eval()  # Set the model to evaluation mode (no dropout, batch norm updates)

# Define the image transformations for test data (same as training)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the test dataset
test_dataset = ImageFolder(root="/kaggle/input/grape-disease/grape_dataset/test", transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

# Get a batch of test data
x_batch, y_batch = next(iter(test_loader))

# Initialize Grad-CAM++ for visualization
cam = GradCAMPlusPlus(model=model, target_layers=[model.features[-1]])  # Target last convolutional layer
grayscale_cam = cam(input_tensor=x_batch)  # Generate Grad-CAM++ heatmaps
layers = list(model._modules["features"]) + list(model._modules["classifier"])  # Combine feature and classifier layers
num_layers = len(layers)

# Initialize LRP (Layer-wise Relevance Propagation)
for idx_layer in range(1, num_layers):
    if idx_layer <= 30:  # First few layers use Alpha1_Beta0_Rule
        setattr(layers[idx_layer], "rule", Alpha1_Beta0_Rule())
    elif idx_layer >= 31:  # Later layers use EpsilonRule
        setattr(layers[idx_layer], "rule", EpsilonRule(epsilon=0))

lrp = LRP(model)  # Initialize LRP method
# Generate LRP attributions for the batch
attributions_lrp = lrp.attribute(x_batch, target=y_batch)
attributions_lrp_np = attributions_lrp.cpu().detach().numpy()  # Convert to numpy
# Reorganize the attribution array for visualization
attributions_lrp_np = np.transpose(attributions_lrp_np, (0, 2, 3, 1))
attributions_lrp_np = np.average(attributions_lrp_np, axis=3)  # Average across channels
# Normalize the attribution values
attributions_lrp_np = (attributions_lrp_np.T / np.max(attributions_lrp_np, axis=(1, 2)).T).T

# Process Grad-CAM heatmap (apply thresholding)
grayscale_cam_thr = np.copy(grayscale_cam)
grayscale_cam_thr = grayscale_cam_thr - 0.2  # Apply a threshold to remove weak activations
grayscale_cam_thr = np.clip(grayscale_cam_thr, 0, 1)  # Clip values between 0 and 1

# Multiply Grad-CAM and LRP outputs to get a combined attribution map
product = attributions_lrp_np * grayscale_cam_thr
product = (product.T / np.max(product, axis=(1, 2)).T).T  # Normalize the combined map

# Apply Gaussian smoothing to the combined attribution map
smoothed = np.zeros(product.shape)
for i in range(0, product.shape[0]):
    smoothed[i] = cv2.GaussianBlur(product[i], (5, 5), cv2.BORDER_DEFAULT)
    smoothed[i] = smoothed[i] / np.max(smoothed[i])  # Normalize after smoothing

# Plotting the results (original image, Grad-CAM, LRP, and combined attribution map)
fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(15, 4.5))
axes[0].imshow(np.moveaxis(quantus.normalise_func.denormalise(x_batch[1].cpu().detach().numpy(), mean=np.array([0.485, 0.456, 0.406]), std=np.array([0.229, 0.224, 0.225])), 0, -1), vmin=0.0, vmax=1.0)
axes[0].title.set_text(f"Input(class '{class_labels[y_batch[1].item()]}')")
axes[1].imshow(grayscale_cam[1], cmap="seismic", vmin=-1.0, vmax=1.0)
axes[1].title.set_text("Grad-CAM")
axes[2].imshow(grayscale_cam_thr[1], cmap="seismic", vmin=-1.0, vmax=1.0)
axes[2].title.set_text("Threshold created from Grad-CAM")
axes[3].imshow(attributions_lrp_np[1], cmap="seismic", vmin=-1.0, vmax=1.0)
axes[3].title.set_text("LRP")
axes[4].imshow(smoothed[1], cmap="seismic", vmin=-1.0, vmax=1.0)
axes[4].title.set_text("Proposed method")

# Remove axes ticks for better visualization
for i in range(0, 5):
    axes[i].axis("on")
    axes[i].get_xaxis().set_ticks([])
    axes[i].get_yaxis().set_ticks([])

# Save the image with comparisons
plt.savefig('/kaggle/working/example.png', bbox_inches='tight')

# Save qualitative comparisons for each test image
explanations = {
    "GradCAM": grayscale_cam,
    "LRP": attributions_lrp_np,
    "Proposed method": smoothed
}

fig, axes = plt.subplots(nrows=len(x_batch), ncols=1+len(explanations), figsize=(15, 4.5*len(x_batch)))
for index in range(0, len(x_batch)):
    axes[index][0].imshow(np.moveaxis(quantus.normalise_func.denormalise(x_batch[index].cpu().detach().numpy(), mean=np.array([0.485, 0.456, 0.406]), std=np.array([0.229, 0.224, 0.225])), 0, -1), vmin=0.0, vmax=1.0)
    axes[index][0].title.set_text(f"Grape class {class_labels[y_batch[index].item()]}")
    axes[index][0].axis("off")
    for i, (k, v) in enumerate(explanations.items()):
        axes[index][i+1].imshow(explanations[k][index] / np.max(explanations[k][index]), cmap="seismic", vmin=-1.0, vmax=1.0)
        axes[index][i+1].title.set_text(f"{k}")
        axes[index][i+1].axis("off")

# Save the qualitative comparison images
plt.savefig('/kaggle/working/qualitative.png', bbox_inches='tight')

In [None]:
from captum.metrics import infidelity

# Dynamically match noise size with x_batch
# This noise is used to perturb the input images during infidelity evaluation
noise = torch.tensor(np.random.normal(0, 0.003, x_batch.shape)).float().to(x_batch.device)

# Define the perturbation function required by the Captum `infidelity` metric
# It returns both the noise and the perturbed input (input - noise)
def perturb_fn(inputs):
    perturbation = torch.tensor(np.random.normal(0, 0.003, inputs.shape)).float().to(inputs.device)
    return perturbation, inputs - perturbation

# ----------------------------- Infidelity Computation -----------------------------

# Compute Infidelity for LRP-based attributions
infid = infidelity(model, perturb_fn, x_batch, attributions_lrp, normalize=True, target=y_batch)
print(infid)

# Prepare Grad-CAM attributions to be the same shape as input (batch, 3, 224, 224)
grayscale_cam_rp = np.repeat(grayscale_cam[..., np.newaxis], 3, axis=-1)  # Add 3 channels
grayscale_cam_rp = np.transpose(grayscale_cam_rp, (0, 3, 1, 2))           # Change to (B, C, H, W)
grayscale_cam_rp = torch.tensor(grayscale_cam_rp).float().to(x_batch.device)

# Compute Infidelity for Grad-CAM attributions
infid_gcam = infidelity(model, perturb_fn, x_batch, grayscale_cam_rp, normalize=True, target=y_batch)
print(infid_gcam)

# Prepare proposed method's heatmaps for infidelity evaluation
smoothed_rp = np.repeat(smoothed[..., np.newaxis], 3, axis=-1)
smoothed_rp = np.transpose(smoothed_rp, (0, 3, 1, 2))  # Convert to (B, C, H, W)
smoothed_rp = torch.tensor(smoothed_rp).float().to(x_batch.device)

# Compute Infidelity for Proposed Method
infid_smoothed = infidelity(model, perturb_fn, x_batch, smoothed_rp, normalize=True, target=y_batch)
print(infid_smoothed)

# ----------------------------- Visualization -----------------------------

# Create x-axis for plotting
x = np.arange(1, len(infid) + 1)

# Convert tensors to numpy arrays for plotting
infid = infid.cpu().detach().numpy() if isinstance(infid, torch.Tensor) else infid
infid_gcam = infid_gcam.cpu().detach().numpy() if isinstance(infid_gcam, torch.Tensor) else infid_gcam
infid_smoothed = infid_smoothed.cpu().detach().numpy() if isinstance(infid_smoothed, torch.Tensor) else infid_smoothed

# Sanity check to ensure same number of samples in all metrics
assert len(infid) == len(x)
assert len(infid_gcam) == len(x)
assert len(infid_smoothed) == len(x)

# Plot the infidelity scores for comparison
fig, ax = plt.subplots()
ax.plot(x, infid, label='LRP', marker='o')
ax.plot(x, infid_gcam, label='GradCAM', marker='s')
ax.plot(x, infid_smoothed, label='Proposed', marker='^')

# Formatting the plot
ax.legend()
ax.set_xlabel('Image Sample Index')
ax.set_ylabel('Infidelity Score')
ax.set_title('Infidelity Scores Across Samples')
plt.grid(True)
plt.savefig('/kaggle/working/infidelity_graph.png', bbox_inches='tight')
plt.show()

# ----------------------------- Quantus Metric Evaluation -----------------------------

# Define the XAI methods and evaluation metrics from Quantus library
xai_methods = ['Proposed_method', 'GradCAM', 'LRP']
metrics = {
    "Robustness": quantus.AvgSensitivity(...),  # Measures attribution stability to small perturbations
    "Faithfulness": quantus.FaithfulnessCorrelation(...),  # Correlates attribution with prediction impact
    "Localisation": quantus.RelevanceRankAccuracy(...),  # Measures how accurately attributions localize relevant features
    "Complexity": quantus.Sparseness(...),  # Measures sparsity (less complex = more interpretable)
    "Randomisation": quantus.RandomLogit(...),  # Tests sensitivity of explanations to randomization of model
}

model.eval()  # Set model to evaluation mode

# Prepare LRP attributions for use as segmentation masks (binary masks)
s_batch = attributions_lrp.cpu().detach().numpy()
threshold = np.percentile(s_batch, 90)  # Keep top 10% of values as relevant
s_batch = (s_batch >= threshold).astype(np.uint8)
s_batch = np.expand_dims(s_batch, axis=1)  # Add channel dimension for Quantus compatibility

print(f"s_batch shape: {s_batch.shape}, unique values: {np.unique(s_batch)}")

# Dictionary to store metric results for each XAI method
results = {method: {} for method in xai_methods}

# Loop over each method and compute all metrics
for method in xai_methods:
    for metric, metric_func in metrics.items():
        print(f"Evaluating {metric} of {method} method.")

        # Extract dimensions from input images
        batch_size, channels, height, width = x_batch.shape

        # Regenerate the segmentation masks to prevent reuse of previous s_batch
        s_batch = attributions_lrp.cpu().detach().numpy()
        threshold = np.percentile(s_batch, 90)
        s_batch = (s_batch >= threshold).astype(np.uint8)
        s_batch = np.expand_dims(s_batch, axis=1)

        # Resize masks if necessary to match input image resolution
        if s_batch.shape[2:] != (height, width):
            from skimage.transform import resize
            s_batch = np.array([resize(mask[0], (height, width), anti_aliasing=True) for mask in s_batch])
            s_batch = np.expand_dims(s_batch, axis=1)
            s_batch = (s_batch > 0.5).astype(np.uint8)  # Convert back to binary

        print(f"x_batch shape: {x_batch.shape}, s_batch shape: {s_batch.shape}, unique values: {np.unique(s_batch)}")

        # Call Quantus metric evaluation
        scores = metric_func(
            model=model,
            x_batch=x_batch.cpu().detach().numpy(),
            y_batch=y_batch.cpu().detach().numpy(),
            a_batch=None,  # Not using continuous attributions
            s_batch=s_batch,
            explain_func=explainer_wrapper,  # Custom function to generate explanations per method
            explain_func_kwargs={"method": method},
        )
        results[method][metric] = scores
