In [1]:
import torch
import torchvision
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.models import resnet18
import torchvision.transforms as transforms
from tqdm import tqdm
from torch.optim import lr_scheduler
import matplotlib.pyplot as plt
import numpy as np
import cv2
import random
import numpy as np

import ttach as tta
from typing import Callable, List, Tuple, Optional
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.activations_and_gradients import ActivationsAndGradients
from pytorch_grad_cam.utils.svd_on_activations import get_2d_projection
from pytorch_grad_cam.utils.image import scale_cam_image
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import (
    show_cam_on_image, deprocess_image, preprocess_image
)

In [2]:
class MyImageFolder(datasets.ImageFolder):  
    def __init__(self, root, transform=None, target_transform=None,  
                 loader=datasets.folder.default_loader,  
                 is_valid_file=None,):  
        super(MyImageFolder, self).__init__(root, transform, target_transform, loader, is_valid_file)  

    def __getitem__(self, index): 
        path, _ = self.samples[index] 
        sample, _ = super(MyImageFolder, self).__getitem__(index) 
        return sample, path[-16:]

In [3]:
torch.manual_seed(42)
train_transform = transforms.Compose([
    transforms.ToTensor(),
])
train_transform_gpu = transforms.Compose([
    transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225]),
])
train_ds = MyImageFolder('./task_images_downsampled/train',
                                transform = train_transform,)
batch_size=64
def get_dataloader_workers():
    return 4
train_dl = DataLoader(train_ds,batch_size=batch_size,shuffle=True,)

In [4]:
class BaseCAM:
    def __init__(self,
                 model: torch.nn.Module,
                 target_layers: List[torch.nn.Module],
                 reshape_transform: Callable = None,
                 compute_input_gradient: bool = False,
                 uses_gradients: bool = True,
                 tta_transforms: Optional[tta.Compose] = None) -> None:
        self.model = model.eval()
        self.target_layers = target_layers

        # Use the same device as the model.
        self.device = next(self.model.parameters()).device
        self.reshape_transform = reshape_transform
        self.compute_input_gradient = compute_input_gradient
        self.uses_gradients = uses_gradients
        if tta_transforms is None:
            self.tta_transforms = tta.Compose(
                [
                    tta.HorizontalFlip(),
                    tta.Multiply(factors=[0.9, 1, 1.1]),
                ]
            )
        else:
            self.tta_transforms = tta_transforms

        self.activations_and_grads = ActivationsAndGradients(
            self.model, target_layers, reshape_transform)

    """ Get a vector of weights for every channel in the target layer.
        Methods that return weights channels,
        will typically need to only implement this function. """

    def get_cam_weights(self,
                        input_tensor: torch.Tensor,
                        target_layers: List[torch.nn.Module],
                        targets: List[torch.nn.Module],
                        activations: torch.Tensor,
                        grads: torch.Tensor) -> np.ndarray:
        raise Exception("Not Implemented")


    def forward(self,
                input_tensor: torch.Tensor,
                targets: List[torch.nn.Module],
                eigen_smooth: bool = False) -> np.ndarray:

        input_tensor = input_tensor.to(self.device)

        if self.compute_input_gradient:
            input_tensor = torch.autograd.Variable(input_tensor,
                                                   requires_grad=True)

        self.outputs = outputs = self.activations_and_grads(input_tensor)

        if targets is None:
            target_categories = np.argmax(outputs.cpu().data.numpy(), axis=-1)
            targets = [ClassifierOutputTarget(
                category) for category in target_categories]

        if self.uses_gradients:
            self.model.zero_grad()
            loss = sum([target(output)
                       for target, output in zip(targets, outputs)])
            loss.backward(retain_graph=True)

        # In most of the saliency attribution papers, the saliency is
        # computed with a single target layer.
        # Commonly it is the last convolutional layer.
        # Here we support passing a list with multiple target layers.
        # It will compute the saliency image for every image,
        # and then aggregate them (with a default mean aggregation).
        # This gives you more flexibility in case you just want to
        # use all conv layers for example, all Batchnorm layers,
        # or something else.
        cam = self.compute_cam_per_layer(input_tensor,
                                                   targets,
                                                   eigen_smooth)
        return cam

    def compute_cam_per_layer(
            self,
            input_tensor: torch.Tensor,
            targets: List[torch.nn.Module],
            eigen_smooth: bool) -> np.ndarray:
        activations = self.activations_and_grads.activations[0]
        grads = self.activations_and_grads.gradients[0]
        # Loop over the saliency image from every layer
        cam = activations*grads
        cam = torch.max(cam, torch.tensor(0.0))

        return cam

    def __call__(self,
                 input_tensor: torch.Tensor,
                 targets: List[torch.nn.Module] = None,
                 aug_smooth: bool = False,
                 eigen_smooth: bool = False) -> np.ndarray:

        # Smooth the CAM result with test time augmentation
        if aug_smooth is True:
            return self.forward_augmentation_smoothing(
                input_tensor, targets, eigen_smooth)

        return self.forward(input_tensor,
                            targets, eigen_smooth)

    def __del__(self):
        self.activations_and_grads.release()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.activations_and_grads.release()
        if isinstance(exc_value, IndexError):
            # Handle IndexError here...
            print(
                f"An exception occurred in CAM with block: {exc_type}. Message: {exc_value}")
            return True

In [5]:
class GradCAM(BaseCAM):
    def __init__(self, model, target_layers,
                 reshape_transform=None):
        super(
            GradCAM,
            self).__init__(
            model,
            target_layers,
            reshape_transform)

    def get_cam_weights(self,
                        input_tensor,
                        target_layer,
                        target_category,
                        activations,
                        grads):
        return torch.mean(grads, axis=(2, 3))

In [6]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = resnet18(num_classes=2).to(device).eval()
model.load_state_dict(torch.load('best.pkl'))

<All keys matched successfully>

In [12]:
def interprete(model,input_tensor,targets,path):
    target_layers = [model.layer4[-1]]
    cam_algorithm = GradCAM
    with cam_algorithm(model=model,
                        target_layers=target_layers) as cam:
        cam.batch_size = 32

        grayscale_cam = cam(input_tensor=input_tensor, 
                            targets=targets,
                            )
        grayscale_cam = grayscale_cam[0, :]
        grayscale_cam = grayscale_cam/grayscale_cam.max()

    grayscale_cam[grayscale_cam > 0.8] = -1
    grayscale_cam[grayscale_cam > 0.6] = -0.8
    grayscale_cam[grayscale_cam > 0.4] = -0.6
    grayscale_cam[grayscale_cam > 0.2] = -0.4
    grayscale_cam[grayscale_cam > 0] = -0.2
    grayscale_cam_array = grayscale_cam.numpy()

    np.savez_compressed(path, compressed_array=grayscale_cam_array)


In [13]:
for X,name in tqdm(train_dl):
    X = X.to(device)
    for j in range(len(X)):
        targets = None
        input_tensor = train_transform_gpu(X[j:j+1]).to(device)
        interprete(model,input_tensor,targets,"./dropout_prob/"+name[j][0:-4]+".npz")

100%|██████████| 964/964 [13:37<00:00,  1.18it/s]
