# Imports

In [None]:
import time, os, warnings, torch, multiprocessing, skimage, scipy, random
import numpy as np
import pandas as pd
import torch.nn as nn
from PIL import Image
from timm import create_model
from prodigyopt import Prodigy
import pytorch_lightning as pl
from torchvision import transforms
from torch.utils.data import Subset, Dataset
from model_extractors import resnet50_img_extractor
from sklearn.preprocessing import LabelEncoder
from masking_network import resnet50_trained_extractor
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from captum.attr import GradientShap
from captum.attr import IntegratedGradients
from pytorch_grad_cam import GradCAMPlusPlus
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from torchray.attribution.extremal_perturbation import extremal_perturbation, contrastive_reward
from quantus.metrics.faithfulness.faithfulness_estimate import FaithfulnessEstimate

# Data

In [None]:


# Path to the dataset
base_path = "data/HAM10000"
metadata_path = os.path.join(base_path, "HAM10000_metadata.csv")
# Check if CUDA is available and set the device to GPU or CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Using device: {device}")
# Read metadata
metadata = pd.read_csv(metadata_path)
label_encoder = LabelEncoder()
metadata['encoded_dx'] = label_encoder.fit_transform(metadata['dx'])
class_counts = metadata['dx'].value_counts()
print(class_counts)


# Assuming 'dx' is your target variable and it's encoded as integers
class_labels = np.unique(metadata['encoded_dx'])  # Unique classes
weights = compute_class_weight(class_weight='balanced', classes=class_labels, y=metadata['encoded_dx'].values)
class_weights = {label: weight for label, weight in zip(class_labels, weights)}

print(class_weights)
# Convert class weights to a tensor
weights_tensor = torch.tensor(list(class_weights.values()), dtype=torch.float)

# Move to the same device as your model and inputs
weights_tensor = weights_tensor.to(device)  # device could be 'cpu' or 'cuda'

In [54]:

class SkinCancerDataset(Dataset):
    def __init__(self, csv_file, base_img_dir, transform=None, skin_frame=None):
        self.skin_frame = skin_frame if skin_frame is not None else pd.read_csv(csv_file)
        self.base_img_dir = base_img_dir
        self.transform = transform
        self.num_classes = len(self.skin_frame['encoded_dx'].unique())

    def __len__(self):
        return len(self.skin_frame)

    def __getitem__(self, idx):
        img_id = self.skin_frame.iloc[idx]['image_id']
        img_path = os.path.join(self.base_img_dir,"HAM_10000_images" ,f"{img_id}.jpg")
        image = Image.open(img_path)
        
        label = self.skin_frame.iloc[idx]['encoded_dx']

        if self.transform:
            image = self.transform(image)

        label = torch.tensor(label, dtype=torch.long)

        return image, label, self.skin_frame.iloc[idx]['image_id']



In [None]:
transform = transforms.Compose([
     transforms.Resize((224, 224)),
     transforms.ToTensor(),
     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
 ])


dataset = SkinCancerDataset(csv_file=metadata_path, base_img_dir=base_path, transform=transform, skin_frame=metadata)

# Splitting dataset into train and test

train_idx, val_idx = train_test_split(list(range(len(dataset))), test_size=0.2, random_state=42)

random.seed(42)
evaluation_idx = np.unique(random.sample(val_idx,1000))

train_dataset = Subset(dataset, train_idx)
val_dataset = Subset(dataset, val_idx)
evaluation_dataset = Subset(dataset, evaluation_idx)
len(train_idx)


# Model


## Architecture

In [56]:
class classifier_model(pl.LightningModule):
    def __init__(self, model_string = "resnet50",):
        super().__init__()
        self.model = create_model(model_string, pretrained=True, num_classes=7,in_chans= 3)
        self.criterion = nn.CrossEntropyLoss(weight=weights_tensor)

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y, _ = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y, _ = batch
        logits = self(x)
        loss = self.criterion(logits, y)
        self.log('val_loss', loss)

    def configure_optimizers(self):
        optimizer = Prodigy(self.parameters(), lr=1, weight_decay=1e-4, )
        return optimizer
model = classifier_model("resnet50")

## Model Training

In [57]:
BATCH = 32
NUM_WORKERS = multiprocessing.cpu_count() - 2
EPOCHS = 10
MIXED_PRECISION = False
DETERMINISTIC = False


train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH, shuffle=True, num_workers=NUM_WORKERS)
test_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH, shuffle=False, num_workers=NUM_WORKERS)

trainer = pl.Trainer(
        max_epochs=EPOCHS,
        devices="auto",
        accelerator="gpu" if torch.cuda.is_available() else "cpu", 
        precision=16 if MIXED_PRECISION else 32,
        default_root_dir="classifier_lesion_resnet50_logs",
        accumulate_grad_batches=int(32/BATCH,),
        deterministic=DETERMINISTIC)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [58]:
trainer.fit(model, train_loader, test_loader)
model.eval()
model.cuda()
accs = []
for batch in test_loader:
    x, y , _  = batch
    logits = (model(x.cuda())).detach().cpu()
    acc = (logits.argmax(1) == y).float().mean()
    accs.append(acc.item())
model.cpu()
print("Accuracy: ", sum(accs) / len(accs))


Accuracy:  0.07886904761904762


# Methodology

## NEM

In [59]:
BATCH = 32
NUM_WORKERS = multiprocessing.cpu_count() - 2
EPOCHS = 5
MIXED_PRECISION = False
DETERMINISTIC = False
CONTRASTIVE = False
NOISE_MASK = False
INVERSE = True

### Model

In [None]:

extractor =  resnet50_img_extractor(model.model)
masked_model = resnet50_trained_extractor(extractor, EPOCHS, batch_size=BATCH, lr = 1, 
                                           center = False, partition = 1, 
                                           noise_mask = NOISE_MASK, constrastive = CONTRASTIVE, inverse=INVERSE)

# Training

In [None]:

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH, shuffle=True, num_workers=NUM_WORKERS)
test_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH, shuffle=False, num_workers=NUM_WORKERS)
trainer = pl.Trainer(
        max_epochs=EPOCHS,
        devices="auto",
        accelerator="gpu" if torch.cuda.is_available() else "cpu", 
        precision=16 if MIXED_PRECISION else 32,
        default_root_dir="masking_supervised_lesion_logs",
        accumulate_grad_batches=int(128/BATCH,),
        deterministic=DETERMINISTIC
)

trainer.fit(masked_model, train_loader)

# Prediction

In [None]:


masked_model.eval()
masked_model = masked_model.to("cuda")
nem_path = "result/supervised/skin_lesion/nem_inv/"
os.makedirs(nem_path,exist_ok=True)
execution_times = []
for (img, y, name) in evaluation_dataset:
    name = name.split("/")[-1] + ".png"
    img = img.unsqueeze(0).to("cuda")
    start_time = time.time()
    attr, _  = masked_model(img)
    execution_times.append(time.time()-start_time)
    x = img.squeeze().permute(1,2,0).cpu().detach().numpy()
    x = ((x - x.min()) / (x.max() - x.min()))*255
    attr = attr.cpu().squeeze().detach().numpy()
    attr = (((attr - attr.min()) / (attr.max() - attr.min()))*255).astype(np.uint8)
    attr = 255 - attr



    Image.fromarray(attr).save(f"{nem_path}{name}")

print("Average execution time: ", np.array(execution_times).sum()/len(execution_times))

# Comparisons

## GradCAM

In [None]:
method = GradCAMPlusPlus(model, [model.model.layer4[-1]])   
path = "result/supervised/skin_lesion/gradcam/"
os.makedirs(path, exist_ok=True)
execution_times = []
for (img,target, name) in evaluation_dataset:
    name = name.split("/")[-1] + ".png"
    start_time = time.time()
    attr = method(input_tensor=img.unsqueeze(0).cuda(), targets=[ClassifierOutputTarget(target)]).squeeze()
    execution_times.append(time.time()-start_time)
    x = img.permute(1,2,0).cpu().detach().numpy()
    x = (((x - x.min()) / (x.max() - x.min()))*255).astype(np.uint8)
    
    attr = (((attr - attr.min()) / (attr.max() - attr.min()))*255).astype(np.uint8)
    Image.fromarray(attr).save(f"{path}{name}")
print("Average execution time: ", np.array(execution_times).sum()/len(execution_times))

## Gradient Shap

In [None]:
model.eval()
method = GradientShap(model)
baseline = torch.zeros((1, 3, 224, 224)).cuda()
path = "result/supervised/skin_lesion/grad_shap/"
os.makedirs(path, exist_ok=True)
execution_times = []
for (img,target, name) in evaluation_dataset:
    name = name.split("/")[-1] + ".png"
    start_time = time.time()
    attr = method.attribute(img.unsqueeze(0).cuda(),baselines=baseline,target= torch.tensor(target.argmax())).squeeze()
    execution_times.append(time.time()-start_time)
    x = img.permute(1,2,0).cpu().detach().numpy()
    x = (((x - x.min()) / (x.max() - x.min()))*255).astype(np.uint8)
    attr = torch.abs(attr).sum(0).cpu().numpy()
    attr = (((attr - attr.min()) / (attr.max() - attr.min()))*255).astype(np.uint8)
    Image.fromarray(attr).save(f"{path}{name}")
print("Average execution time: ", np.array(execution_times).sum()/len(execution_times))

## Integrated Gradients

In [None]:
model.eval()
method = IntegratedGradients(model)
baseline = torch.zeros((1, 3, 224, 224)).cuda()
path = "result/supervised/skin_lesion/integrated_gradients/"
os.makedirs(path, exist_ok=True)
execution_times = []
for (img,target, name) in evaluation_dataset:
    name = name.split("/")[-1] + ".png"
    start_time = time.time()
    attr = method.attribute(img.unsqueeze(0).cuda(),baselines=baseline,target= torch.tensor(target.argmax())).squeeze()
    execution_times.append(time.time()-start_time)
    x = img.permute(1,2,0).cpu().detach().numpy()
    x = (((x - x.min()) / (x.max() - x.min()))*255).astype(np.uint8)
    attr = torch.abs(attr).sum(0).cpu().numpy()
    attr = (((attr - attr.min()) / (attr.max() - attr.min()))*255).astype(np.uint8)
    Image.fromarray(attr).save(f"{path}{name}")
print("Average execution time: ", np.array(execution_times).sum()/len(execution_times))

  attr = method.attribute(img.unsqueeze(0).cuda(),baselines=baseline,target= torch.tensor(target.argmax())).squeeze()


Average execution time:  0.04806521725654602


# Smooth Pixel mask

In [None]:
warnings.filterwarnings("ignore", category=UserWarning)
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")   
softmax = torch.nn.Softmax(dim=-1)

class SmoothMask:
    def __init__(self, area, model):
        self.area = area 
        self.model = model
    def __call__(self, x, pred):
        mask, _ = extremal_perturbation(
            self.model, x, pred,
            reward_func=contrastive_reward,
            debug=False,
            areas=[self.area]
        )
        return mask

In [None]:
method = SmoothMask(model =model, area=0.05)
path = "result/supervised/skin_lesion/smooth_mask/"
os.makedirs(path, exist_ok=True)
execution_times = []
for (img,target, name) in evaluation_dataset:
    name = name.split("/")[-1]+ ".png"
    start_time = time.time()
    attr = method(img.unsqueeze(0).cuda(), int(target))
    execution_times.append(time.time()-start_time)

    x = img.permute(1,2,0).cpu().detach().numpy()
    x = (((x - x.min()) / (x.max() - x.min()))*255).astype(np.uint8)
    attr = attr.squeeze().cpu().detach().numpy()
    attr = (((attr - attr.min()) / (attr.max() - attr.min()))*255).astype(np.uint8)
    Image.fromarray(attr).save(f"{path}{name}")
print("Average execution time: ", np.array(execution_times).sum()/len(execution_times))


# Results

## Run Experiment

In [46]:
def run_exp(metrics, metric_names,data):
    res_path = "result/supervised/skin_lesion/"
    mask_path = "data/HAM10000/HAM10000_segmentations_lesion_tschandl/HAM10000_segmentations_lesion_tschandl/"
    resnet_50_smoothmask_path = f"{res_path}smooth_mask/"
    resnet_50_gradcam_path = f"{res_path}gradcam/"
    resnet_50_gradshape_path = f"{res_path}grad_shap/"
    resnet_50_integrated_gradients_path = f"{res_path}integrated_gradients/"
    resnet_50_nem_path = f"{res_path}nem_inv/"

    for metric_func,metric_name in zip(metrics,metric_names):    
        resnet_50_smoothmask_res = metric_func(mask_path=mask_path, explanation_path=resnet_50_smoothmask_path, samples=data)
        resnet_50_gradcam_res = metric_func(mask_path=mask_path, explanation_path=resnet_50_gradcam_path, samples=data)
        resnet_50_gradshape_res = metric_func(mask_path=mask_path, explanation_path=resnet_50_gradshape_path, samples=data)
        resnet_50_integrated_gradients_res = metric_func(mask_path=mask_path, explanation_path=resnet_50_integrated_gradients_path, samples=data)
        resnet_50_nem_res = metric_func(mask_path=mask_path, explanation_path=resnet_50_nem_path, samples=data)

        print(f"""
        {metric_name}:
        resnet_50_smoothmask:     {resnet_50_smoothmask_res}
        resnet_50_gradcam:        {resnet_50_gradcam_res}
        resnet_50_gradshape:      {resnet_50_gradshape_res}
        resnet_50_integrated_gradients: {resnet_50_integrated_gradients_res}
        resnet_50_nem:            {resnet_50_nem_res}
        """)


# Locality

In [None]:
def relevance_rank( mask_path, explanation_path, samples, uncertainty=False):
    rank_accuracy = 0
    samps = 0
    for _, _, img_name in samples:
        s = (np.array(Image.open(mask_path + img_name + "_segmentation.png" ).resize((224,224))) > 0).astype(np.uint8)
        a =  np.array(Image.open(explanation_path +  img_name + ".png"))/255
        # Prepare shapes.
        a = a.flatten()
        s = np.where(s.flatten().astype(bool))[0]
        # Size of the ground truth mask.
        k = len(s)
        # Sort in descending order.
        a_sorted = np.argsort(a)[-int(k) :]
        # Calculate hits.
        hits = len(np.intersect1d(s, a_sorted))
        if hits != 0:
            rank_accuracy += hits / float(k)
        else:
            rank_accuracy += 0.0
        samps +=1 



    return rank_accuracy/ samps

def relevancy_mass(mask_path, explanation_path, samples, uncertainty=False):
    mass_accuracy_total = 0
    samps = 0
    for _, _, img_name in samples:
        s = (np.array(Image.open(mask_path + img_name + "_segmentation.png" ).resize((224,224))) > 0).astype(np.uint8)
        a =  np.array(Image.open(explanation_path +  img_name + ".png"))/255
        # 
        a = a.flatten()
        s = s.flatten().astype(bool)
        # Compute inside/outside ratio.
        r_within = np.sum(a[s])
        r_total = np.sum(a)
        # Calculate mass accuracy.
        mass_accuracy = r_within / r_total
        mass_accuracy_total += mass_accuracy
        samps +=1

    return mass_accuracy_total/ samps
     

run_exp([
     relevance_rank, relevancy_mass],
        [
     "Relevance Rank", "Relevancy Mass"],
    evaluation_dataset)

## Complexity

In [None]:
def complexity( mask_path, explanation_path, samples,uncertainty=False):
    complexity = 0
    
    for _, _, img_name in samples:
        a =  np.array(Image.open(explanation_path +  img_name + ".png"))/255
        # Prepare shapes.
        newshape = np.prod(a.shape)
        a = np.array(np.reshape(a, newshape), dtype=np.float64) / np.sum(np.abs(a))
        complexity += scipy.stats.entropy(pk=a)   


    return complexity/ len(samples)


def sparseness( mask_path, explanation_path, samples,uncertainty=False):
    complexity = 0
    
    for _, _, img_name in samples:
        a =  np.array(Image.open(explanation_path +  img_name + ".png"))/255
        # Prepare shapes.
        newshape = np.prod(a.shape)
        a = np.array(np.reshape(a, newshape), dtype=np.float64)
        a += 0.0000001
        a = np.sort(a)
        complexity += (np.sum((2 * np.arange(1, a.shape[0] + 1) - a.shape[0] - 1) * a)) / (
            a.shape[0] * np.sum(a)
        )

    return complexity/ len(samples)


run_exp([
    complexity, 
    sparseness
    ],
        [
    "Complexity", 
    "Sparseness"
    ],
    evaluation_dataset)


# Faithfullness

In [None]:
def faithfullness(mask_path, explanation_path, samples):
    skips = 0
    metric = FaithfulnessEstimate(features_in_step=224 * 4)
    values = []
    i = 0
    for X,Y,img_name in samples:
        i += 1
        img_name = img_name.split("/")[-1]
        a =  np.array(Image.open(explanation_path +  img_name + ".png"))/255
        Y = Y.argmax().numpy()
        try:
            values +=   metric(model=model.cuda().eval(),
                            x_batch=X.unsqueeze(0).numpy(), y_batch=np.expand_dims(Y.astype(np.uint8), axis=0),
                              a_batch=np.expand_dims(a, axis=0),device="cuda")
        except:
            values += [0]
            skips += 1
            continue
    print(f"Skipped {skips} images") 
    return np.nanmean(values)

    
run_exp([faithfullness],
        ["Faithfullness"],
    evaluation_dataset)