In [1]:
%matplotlib inline

In [2]:
import argparse
import logging
import os
import os.path as osp
import sys
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
# Change this path to wherever you installed the Pytorch-UNet module
sys.path.append(r"C:\Users\Groh\Documents\GitHub\unet-nested-multiple-classification")
# sys.path.append(r"C:\Users\Groh\Documents\GitHub\colabtools\google")
os.environ['KMP_DUPLICATE_LIB_OK']='True'

import cv2
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
from torchvision.transforms import functional as func
from torchvision import transforms
from PIL import Image
from tqdm import tqdm

from unet import NestedUNet
from unet import UNet
from utils.dataset import BasicDataset
from config import UNetConfig

# from google.colab import drive
# drive.mount('/content/drive')
# os.chdir("/content/drive/My Drive")

### Load image directories and u-net

In [3]:
dir_img = r"C:\Users\Groh\Desktop\TrainingDigEndo\images_training"
dir_mask = r"C:\Users\Groh\Desktop\TrainingDigEndo\masks_grayscale"
checkpoint = r"C:\Users\Groh\Desktop\TrainingDigEndo\Checkpoints\checkpoints_training05-02\epoch_100.pth"
sessions_to_test = ["03","04"]

cfg = UNetConfig()
net = eval(cfg.model)(cfg)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net.to(device=device)
net.load_state_dict(torch.load(checkpoint, map_location=device))

input_imgs = [i for i in os.listdir(dir_img) if os.path.isfile(os.path.join(dir_img,i)) and \
     i.startswith(tuple(["aicm"+ses for ses in sessions_to_test]))]
print("\nAmount of Input images: {}".format(len(input_imgs)))


Amount of Input images: 223


In [4]:
# Evaluation function
def eval_net(net, loader, device, n_val, cfg):
    net.eval()
    tot = 0

    with tqdm(total=n_val, desc='Validation round', unit='img', leave=False) as pbar:
        for batch in loader:
            imgs = batch['image']
            true_masks = batch['mask']

            imgs = imgs.to(device=device, dtype=torch.float32)
            mask_type = torch.float32 if cfg.n_classes == 1 else torch.long
            true_masks = true_masks.to(device=device, dtype=mask_type)

            # compute loss
            if cfg.deepsupervision:
                masks_preds = net(imgs)
                loss = 0
                for masks_pred in masks_preds:
                    tot_cross_entropy = 0
                    for true_mask, pred in zip(true_masks, masks_pred):
                        pred = (pred > cfg.out_threshold).float()
                        if cfg.n_classes > 1:
                            sub_cross_entropy = F.cross_entropy(pred.unsqueeze(dim=0), true_mask.unsqueeze(dim=0).squeeze(1)).item()
                        else:
                            sub_cross_entropy = dice_coeff(pred, true_mask.squeeze(dim=1)).item()
                        tot_cross_entropy += sub_cross_entropy
                    tot_cross_entropy = tot_cross_entropy / len(masks_preds)
                    tot += tot_cross_entropy
            else:
                masks_pred = net(imgs)
                for true_mask, pred in zip(true_masks, masks_pred):
                    pred = (pred > cfg.out_threshold).float()
                    if cfg.n_classes > 1:
                        tot += F.cross_entropy(pred.unsqueeze(dim=0), true_mask.unsqueeze(dim=0).squeeze(1)).item()
                    else:
                        tot += dice_coeff(pred, true_mask.squeeze(dim=1)).item()

            pbar.update(imgs.shape[0])

    return tot / n_val


In [5]:
# This function generates prediction masks
def inference_one(net, image, device):
    net.eval()
    
    image_array = np.array(image)
    image_array = image_array.transpose((2,0,1))
    img = torch.from_numpy(image_array)

    img = img.unsqueeze(0)
    img = img.to(device=device, dtype=torch.float32)

    with torch.no_grad():
        output = net(img)
        if cfg.deepsupervision:
            output = output[-1]

        if cfg.n_classes > 1:
            probs = F.softmax(output, dim=1)
        else:
            probs = torch.sigmoid(output)

        probs = probs.squeeze(0)

        tf = transforms.Compose(
            [
                transforms.ToPILImage(),
                transforms.Resize((image.size[1], image.size[0])),
                transforms.ToTensor()
            ]
        )
        
        if cfg.n_classes == 1:
            probs = tf(probs.cpu())
            mask = probs.squeeze().cpu().numpy()
            return mask > cfg.out_threshold
        else:
            masks = []
            for prob in probs:
                prob = tf(prob.cpu())
                mask = prob.squeeze().cpu().numpy()
                mask = mask > cfg.out_threshold
                masks.append(mask)
            return masks

In [6]:
def getSelectedFilenames(directory, sessions):
    all_filenames = list(sorted(os.listdir(directory)));
    selected_filenames = []
  
    for session in sessions:
        string = 'aicm' + session
        for name in all_filenames:
            if string in name:
                selected_filenames.append(name)
    
    return selected_filenames

In [7]:
def binary_variability(masks_predic, mask_true):
        # Check whether both images have the same input shape
        if masks_predic[0].shape != mask_true.shape:
            raise ValueError("The two masks have different shapes. Please adjust the shapes accordingly.")
        else:
            
            intersection=0
            union=0
            
            # Calculate the intersection and the union of both masks
            for i in range(mask_true.shape[0]):
                for j in range(mask_true.shape[1]):
                    if any(element[i][j] for element in masks_predic[1:]) and mask_true[i][j]:
                        intersection+=1
                    if any(element[i][j] for element in masks_predic[1:]) or mask_true[i][j]:
                        union+=1       
            
            # Return dice score, as well as intersection over union (iou)
            # Quality metrics don't make sense, when the given class is not visible, hence the NaNs.
            if not mask_true.any():
                dice = np.NaN
                iou = np.NaN
            else:
                dice = (2. * intersection)/(np.sum(np.array(masks_predic[1:])) + mask_true.sum())
                iou = intersection/union
        return dice, iou

In [8]:
def variability(mask_predic, mask_true):
        # Check whether both images have the same input shape
        if mask_predic.shape != mask_true.shape:
            raise ValueError("The two masks have different shapes. Please adjust the shapes accordingly.")
        else:
            
            intersection=0
            union=0
            
            # Calculate the intersection and the union of both masks
            for i in range(mask_true.shape[0]):
                for j in range(mask_true.shape[1]):
                    if mask_predic[i][j] and mask_true[i][j]:
                        intersection+=1
                    if mask_predic[i][j] or mask_true[i][j]:
                        union+=1       
            
            # Return dice score, as well as intersection over union (iou)
            # Quality metrics don't make sense, when the given class is not visible, hence the NaNs.
            if not mask_true.any():
                dice = np.NaN
                iou = np.NaN
            else:
                dice = (2. * intersection)/(mask_predic.sum() + mask_true.sum())
                iou = intersection/union
        return dice, iou

In [9]:
class CustomDataset(Dataset):
    def __init__(self, images_directory, masks_directory, sessions, transform=None):  
        self.images_filenames = getSelectedFilenames(images_directory, sessions)
        self.images_directory = images_directory
        self.masks_directory = masks_directory
        self.transform = transform
        self.sessions = sessions          

    def __len__(self):
        return len(self.images_filenames)

    def __getitem__(self, idx):
        image_filename = self.images_filenames[idx]

        pathImage = os.path.join(self.images_directory, image_filename);
        pathMask = os.path.join(self.masks_directory, image_filename);
        
        pillow_image = Image.open(pathImage)
        image = np.array(pillow_image)
        pillow_mask = Image.open(pathMask).convert('L')
        mask = np.array(pillow_mask)
                
        if self.transform is not None:
            transformed = self.transform(image=image, mask=mask)
            image = transformed["image"]
            mask = transformed["mask"]
            image = np.array(image)
            mask = np.array(mask)
        
        mask = torch.from_numpy(mask)
        mask = mask.unsqueeze(0)
        
        image = image.transpose((2,0,1))
        image = torch.from_numpy(image)


        return {
            'image': image,
            'mask':  mask
        } 

    def preprocess(cls, pil_img, scale):
        w, h = pil_img.size
        newW, newH = int(scale * w), int(scale * h)
        assert newW > 0 and newH > 0, 'Scale is too small'
        pil_img = pil_img.resize((newW, newH))

        img_nd = np.array(pil_img)

        if len(img_nd.shape) == 2:
            img_nd = np.expand_dims(img_nd, axis=2)

        # HWC to CHW
        img_trans = img_nd.transpose((2, 0, 1))
        if img_trans.max() > 1:
            img_trans = img_trans / 255

        return img_trans         
        
val_dataset = CustomDataset(dir_img, dir_mask, sessions_to_test)

### Evaluate Cross-Entropy

In [13]:
n_val = len(val_dataset)
val = val_dataset

val_loader = DataLoader(val,
                        batch_size=cfg.batch_size,
                        shuffle=False,
                        num_workers=0,
                        pin_memory=True)

val_score = eval_net(net, val_loader, device, n_val, cfg)

print(val_score)


                                                                                                                       

1.1169132915045648




### Binary Segmentation

In [None]:
# Preallocate arrays
total_dice = np.zeros(int(len(input_imgs)))
total_dice[:] = np.NaN                  
total_iou = np.zeros(int(len(input_imgs)))
total_iou[:] = np.NaN     

img_count = 0
for i, img_name in tqdm(enumerate(input_imgs)):
    img_path = osp.join(dir_img, img_name)
    img = Image.open(img_path)

    mask = inference_one(net=net,
                         image=img,
                         device=device)

    img = cv2.imread(os.path.join(dir_mask, img_name), cv2.IMREAD_GRAYSCALE)
    mask_true = img != 0

    dice, iou = binary_variability(mask, mask_true)
    total_dice[img_count] = dice
    total_iou[img_count] = iou
    
    print("Image {} --- Dice Score: {:.5f} | IOU: {:.5f}".format(img_name, dice, iou))
    img_count += 1

print("\nAverage Binary Dice Score: {:.4f} (STD: {:.4f})".format(np.nanmean(total_dice), np.nanstd(total_dice)))
print("Average Binary IOU: {:.4f} (STD: {:.4f})".format(np.nanmean(total_iou), np.nanstd(total_iou)))
q75, q25 = np.nanpercentile(total_dice, [75 ,25])
print("\nMedian Binary Dice Score: {:.4f} (IQR: {:.4f}, {:.4f})".format(np.nanmedian(total_dice), q75, q25))
q75, q25 = np.nanpercentile(total_iou, [75 ,25])
print("Median Binary IOU: {:.4f} (IQR: {:.4f}, {:.4f})".format(np.nanmedian(total_iou), q75, q25))

1it [00:03,  3.10s/it]

Image aicm03_02_000002.png --- Dice Score: 0.22173 | IOU: 0.12469


2it [00:06,  3.02s/it]

Image aicm03_02_000004.png --- Dice Score: 0.00000 | IOU: 0.00000


3it [00:08,  2.98s/it]

Image aicm03_02_000006.png --- Dice Score: 0.48272 | IOU: 0.31814


### Multi-Class Segmentation

In [None]:
# Preallocate arrays
total_dice = np.zeros((cfg.n_classes, int(len(input_imgs))))
total_dice[:] = np.NaN                  
total_iou = np.zeros((cfg.n_classes, int(len(input_imgs))))
total_iou[:] = np.NaN     

img_count = 0
for i, img_name in tqdm(enumerate(input_imgs)):
    img_path = osp.join(dir_img, img_name)
    img = Image.open(img_path)

    mask = inference_one(net=net,
                         image=img,
                         device=device)
    for idx in range(0,len(mask)):
        img = cv2.imread(os.path.join(dir_mask, img_name), cv2.IMREAD_GRAYSCALE)
        mask_true = img == idx
        mask_predic = mask[idx]
        dice, iou = variability(mask_predic, mask_true)
        total_dice[idx,img_count] = dice
        total_iou[idx,img_count] = iou
        print("Image {}, Class {} --- Dice Score: {:.5f} | IOU: {:.5f}".format(img_name, idx, dice, iou))
    img_count += 1

for idx in range(0,cfg.n_classes):
    print("\nAverage Dice Score for Class {}: {:.4f} (STD: {:.4f})".format(idx, np.nanmean(total_dice[idx]), np.nanstd(total_dice[idx])))
    print("Average IOU for Class {}: {:.4f} (STD: {:.4f})".format(idx, np.nanmean(total_iou[idx]), np.nanstd(total_iou[idx])))
    q75, q25 = np.nanpercentile(total_dice[idx], [75 ,25])
    print("\nMedian Dice Score for Class {}: {:.4f} (IQR: {:.4f}, {:.4f})".format(idx, np.nanmedian(total_dice[idx]), q75, q25))
    q75, q25 = np.nanpercentile(total_iou[idx], [75 ,25])
    print("Median IOU for Class {}: {:.4f} (IQR: {:.4f}, {:.4f})".format(idx, np.nanmedian(total_iou[idx]), q75, q25))

### Save predicted masks