In [13]:
import random
from random import sample
import numpy as np
import os
import pickle
from tqdm import tqdm
from collections import OrderedDict
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import precision_recall_curve
from scipy.spatial.distance import mahalanobis
from scipy.ndimage import gaussian_filter
from skimage.segmentation import mark_boundaries
from matplotlib.colors import Normalize
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import torch
import torch.nn.functional as F
from utils import *
import cv2

In [14]:
# device setup
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')
print('Device: {}'.format(device))

Device: cuda


In [15]:
config_path = 'config.yaml'
opt = read_config(config_path)
experiment_path = opt['dataset']['save_dir'] + '/' + opt['model']['backbone']
opt['model']['experiment_path'] = experiment_path

os.makedirs(os.path.join(experiment_path, 'temp_%s' % opt['model']['backbone']), exist_ok=True)
train_feature_filepath = os.path.join(experiment_path, 'temp_%s' % opt['model']['backbone'], 'train_%s.pkl' % 'brainmri')
opt['model']['train_feature_filepath'] = train_feature_filepath


pic_save_path = os.path.join(experiment_path, 'pictures')
os.makedirs(pic_save_path, exist_ok=True)

# set random seed
seed = opt['model']['seed']
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if use_cuda:
    torch.cuda.manual_seed_all(seed)

## pre-trained CNN

In [16]:
# Load the pretrained CNN
model, t_d, r_d = load_pretrained_CNN(opt)
model = model.to(device)
model.eval()

Backbone: resnet18
Input dim size: 448
Output dim size after reduced: 180




ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [17]:
# select randomly choosen dimension to reduce the dimensionality of the feature vector (like PCA)
idx = torch.tensor(sample(range(0, t_d), r_d))
outputs = []
def hook(module, input, output):
    outputs.append(output)
model.layer1[-1].register_forward_hook(hook)
model.layer2[-1].register_forward_hook(hook)
model.layer3[-1].register_forward_hook(hook)

<torch.utils.hooks.RemovableHandle at 0x7f30f0d21d00>

## Load Dataset

In [18]:
train_dataloader = load_train_dataset(opt).train_dataloader()

Using 581 IXI images and 130 fastMRI images for training. Using 15 images for validation.


## Normal Class Representation

In [19]:
train_outputs = OrderedDict([('layer1', []), ('layer2', []), ('layer3', [])])

In [20]:
if not os.path.exists(train_feature_filepath):    
# for each batch in the dataloader (use tqdm bar), train dataloader get item returns only x
    for batch_idx, img in tqdm(enumerate(train_dataloader), '| feature extraction | train | %s |' % 'brainmri'):
        img = img.to(device)
        with torch.no_grad():
            _ = model(img)
        for key, value in zip(train_outputs.keys(), outputs):
            train_outputs[key].append(value.cpu().detach())
        # initialize hook outputs
        outputs = []

    for key, value in train_outputs.items():
        train_outputs[key] = torch.cat(value, 0)

    print('first layer shape:', train_outputs['layer1'].shape)
    print('second layer shape:', train_outputs['layer2'].shape)
    print('third layer shape:', train_outputs['layer3'].shape)
    # Embedding concat
    embedding_vectors = train_outputs['layer1'] # get the maximum size of the embedding vectors
    """
    Rresearchers conceptually divide the input image into a grid based on the resolution of the largest activation map—typically
    the first layer of the pre-trained CNN. This way, each grid position, denoted as (i,j), 
    is associated with a unique embedding vector that represents the collective activation vectors for that particular image patch.
    """
    for layer_name in ['layer2', 'layer3']:
        embedding_vectors = embedding_concat(embedding_vectors, train_outputs[layer_name])

    # randomly select d dimension
    print('randomly select %d dimension' % opt['model']['output_dimension'])
    embedding_vectors = torch.index_select(embedding_vectors, 1, idx)

    B, C, H, W = embedding_vectors.size() # Get the shape of the embedding vectors which is same with the first layer of the pretrained model
    print('embedding_vectors shape:', embedding_vectors.shape)
    embedding_vectors = embedding_vectors.view(B, C, H * W)

    # calculate multivariate Gaussian distribution
    mean = torch.mean(embedding_vectors, dim=0).numpy()
    cov = torch.zeros(C, C, H * W).numpy()
    I = np.identity(C)

    # calculate mean, cov and inverse covariance matrix for each patch position at Xij 
    # (each patch position (i,j) is associated with a unique embedding vector)
    for i in range(H * W):
        # Xij = embedding_vectors[:, :, i].numpy()
        cov[:, :, i] = np.cov(embedding_vectors[:, :, i].numpy(), rowvar=False) + 0.01 * I

    # save learned distribution
    train_outputs = [mean, cov]
    with open(train_feature_filepath, 'wb') as f:
        pickle.dump(train_outputs, f)
else:
    with open(train_feature_filepath, 'rb') as f:
        train_outputs = pickle.load(f)   

## PaDiM at Inference Time

In [21]:
from data_loader import get_all_test_dataloaders

In [22]:
test_dataloaders = get_all_test_dataloaders( opt['dataset']['ann_path'],  opt['dataset']['target_size'], opt['dataset']['batch_size'])

In [23]:
test_outputs_dict = {key: OrderedDict([('layer1', []), ('layer2', []), ('layer3', [])]) for key in test_dataloaders.keys()}
metrics = {
    'TP': [],
    'FP': [],
    'Precision': [],
    'Recall': [],
    'ROCAUC' : [],
    'PRO-SCORE': [],
}
all_scores = []

In [24]:
for dataset_key in test_dataloaders.keys():

    test_dataloader = test_dataloaders[dataset_key]
    test_metrics = {
        'TP': [],
        'FP': [],
        'Precision': [],
        'Recall': [],
        'F1': [],
        'ROCAUC' : [],
        'PRO-SCORE': [],
    }
    tps, fns, fps = 0, 0, []

    print('******************* DATASET: {} ****************'.format(dataset_key))

    images = []
    labels = []
    masks = []
    neg_masks = []

    for batch_idx, data in tqdm(enumerate(test_dataloader), '| feature extraction | test | %s | %s |' % (dataset_key, 'brainmri')):
        img, label, mask, neg_mask = data
        images.append(img[0].cpu().detach().numpy())
        labels.append(label[0].cpu().detach().numpy())
        masks.append(mask[0].cpu().detach().numpy())
        neg_masks.append(neg_mask[0].cpu().detach().numpy())

        with torch.no_grad():
            _ = model(img.to(device))

        for key, value in zip(test_outputs_dict[dataset_key].keys(), outputs):
            test_outputs_dict[dataset_key][key].append(value.cpu().detach())
        
        # initialize hook outputs   
        outputs = []

    for key, value in test_outputs_dict[dataset_key].items():
        test_outputs_dict[dataset_key][key] = torch.cat(value, 0)

    print('first layer shape:', test_outputs_dict[dataset_key]['layer1'].shape)
    print('second layer shape:', test_outputs_dict[dataset_key]['layer2'].shape)
    print('third layer shape:', test_outputs_dict[dataset_key]['layer3'].shape)

    # Embedding concat
    embedding_vectors = test_outputs_dict[dataset_key]['layer1'] # get the maximum size of the embedding vectors
    for layer_name in ['layer2', 'layer3']:
        embedding_vectors = embedding_concat(embedding_vectors, test_outputs_dict[dataset_key][layer_name])

    embedding_vectors = torch.index_select(embedding_vectors, 1, idx)
    # calculate mahalanobis distance between train_outputs to give anomaly score to each patch position of the test images
    B, C, H, W = embedding_vectors.size()
    print('embedding_vectors shape:', embedding_vectors.shape)

    embedding_vectors = embedding_vectors.view(B, C, H * W).numpy()
    dist_list = []
    for i in range(H * W):
        mean = train_outputs[0][:, i]
        conv_inv = np.linalg.inv(train_outputs[1][:, :, i])
        dist = [mahalanobis(sample[:, i], mean, conv_inv) for sample in embedding_vectors]
        dist_list.append(dist)
    dist_list = np.array(dist_list).transpose(1, 0).reshape(B, H, W)

    # upsample to image size to get anomaly score map
    dist_list = torch.tensor(dist_list)
    score_map = F.interpolate(dist_list.unsqueeze(1), size=img.size(2), mode='bilinear',
                                align_corners=False).squeeze().numpy()

    # apply gaussian smoothing on the score map
    for i in range(score_map.shape[0]):
        score_map[i] = gaussian_filter(score_map[i], sigma=4)

    # Normalize the score map
    max_score = score_map.max()
    min_score = score_map.min()
    scores = (score_map - min_score) / (max_score - min_score)

    
    num = len(images)
    vmax = scores.max() * 255.
    vmin = scores.min() * 255.

    for i in range(num):
        img = images[i]
        #img = denormalization(img)
        img = img.transpose(1, 2, 0)
        gt = masks[i].transpose(1, 2, 0).squeeze()
        gt_neg = neg_masks[i].transpose(1, 2, 0).squeeze()
        anno_map = scores[i]
        heat_map = scores[i] * 255
        mask = scores[i]
        mask[mask > 0.246] = 1
        mask[mask <= 0.246] = 0
        kernel = morphology.disk(4)
        mask = morphology.opening(mask, kernel)
        mask *= 255
        vis_img = mark_boundaries(img, mask, color=(1, 0, 0), mode='thick')
        fig_img, ax_img = plt.subplots(1, 6, gridspec_kw={'wspace': 0, 'hspace': 0})
        fig_img.set_size_inches(6 * 4, 4)
        norm = matplotlib.colors.Normalize(vmin=vmin, vmax=vmax)

        bboxes = cv2.cvtColor(gt_neg * 255, cv2.COLOR_GRAY2RGB)
        cnts_gt = cv2.findContours((gt * 255).astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        cnts_gt = cnts_gt[0] if len(cnts_gt) == 2 else cnts_gt[1]
        gt_box = []

        for c_gt in cnts_gt:
            x, y, w, h = cv2.boundingRect(c_gt)
            gt_box.append([x, y, x + w, y + h])
            cv2.rectangle(bboxes, (x, y), (x + w, y + h), (0, 255, 0), 1)
                
        x_pos = anno_map * gt
        x_neg = anno_map * gt_neg
        res_anomaly = np.sum(x_pos)
        res_healthy = np.sum(x_neg)


        amount_anomaly = np.count_nonzero(x_pos)
        amount_mask = np.count_nonzero(gt)

        tp = 1 if amount_anomaly > 0.1 * amount_mask else 0  # 10% overlap due to large bboxes e.g. for enlarged ventricles
        fn = 1 if tp == 0 else 0
        fp = int(res_healthy / max(res_anomaly, 1))
        precision = tp / max((tp + fp), 1)

        fp = int(res_healthy / max(res_anomaly, 1))
        fps.append(fp)
        precision = tp / max((tp + fp), 1)
        test_metrics['TP'].append(tp)
        test_metrics['FP'].append(fp)
        test_metrics['Precision'].append(precision)
        test_metrics['Recall'].append(tp)
        test_metrics['F1'].append(2 * (precision * tp) / (precision + tp + 1e-8))

        
        for ax_i in ax_img:
            ax_i.axes.xaxis.set_visible(False)
            ax_i.axes.yaxis.set_visible(False)
        ax_img[0].imshow(img)
        ax_img[0].title.set_text('Input')

        ax_img[1].imshow(vis_img)
        ax_img[1].title.set_text('Localization')

        
        ax_img[2].imshow(bboxes.astype(np.int64), cmap='gray')
        ax_img[2].title.set_text('GT')

        
        ax = ax_img[3].imshow(heat_map, cmap='jet', norm=norm)
        ax_img[3].imshow(img, cmap='gray', interpolation='none')
        ax_img[3].imshow(heat_map, cmap='jet', alpha=0.5, interpolation='none')
        ax_img[3].title.set_text('Anomaly Map')

        
        ax_img[4].imshow(x_pos, cmap='gray')
        ax_img[4].title.set_text(str(np.round(res_anomaly, 2)) + ', TP: ' + str(tp))

        
        ax_img[5].imshow(x_neg, cmap='gray')
        ax_img[5].title.set_text(str(np.round(res_healthy, 2)) + ', FP: ' + str(tp))
        
        left = 0.92
        bottom = 0.15
        width = 0.015
        height = 1 - 2 * bottom
        rect = [left, bottom, width, height]
        cbar_ax = fig_img.add_axes(rect)
        cb = plt.colorbar(ax, shrink=0.6, cax=cbar_ax, fraction=0.046)
        cb.ax.tick_params(labelsize=8)
        font = {
            'family': 'serif',
            'color': 'black',
            'weight': 'normal',
            'size': 8,
        }
        cb.set_label('Anomaly Score', fontdict=font)
        save_dir = os.path.join(opt['dataset']['save_dir'], opt['d'], dataset_key)
        fig_img.savefig(os.path.join(save_dir, class_name + '_{}'.format(i)), dpi=100)
        plt.close()
    


******************* DATASET: absent_septum ****************


| feature extraction | test | absent_septum | brainmri |: 1it [00:00, 111.06it/s]

first layer shape: torch.Size([1, 64, 32, 32])
second layer shape: torch.Size([1, 128, 16, 16])
third layer shape: torch.Size([1, 256, 8, 8])
embedding_vectors shape: torch.Size([1, 180, 32, 32])





KeyboardInterrupt: 