# Nuclei Segmentation Ensemble/Non-Ensemble

In [None]:
# !python -m pip install pyyaml==5.1
import sys, os, distutils.core
# # Note: This is a faster way to install detectron2 in Colab, but it does not include all functionalities.
# # See https://detectron2.readthedocs.io/tutorials/install.html for full installation instructions
# !git clone 'https://github.com/facebookresearch/detectron2'
# dist = distutils.core.run_setup("./detectron2/setup.py")
# !python -m pip install {' '.join([f"'{x}'" for x in dist.install_requires])}
# sys.path.insert(0, os.path.abspath('./detectron2'))

# # Properly install detectron2. (Please do not install twice in both ways)
# # !python -m pip install 'git+https://github.com/facebookresearch/detectron2.git

In [None]:
import torch
import albumentations as albu
from PIL import Image
import numpy as np
import os
import cv2
import copy
import cv2
import segmentation_models_pytorch as smp
from segmentation_models_pytorch.encoders import get_preprocessing_fn
import pytorch_lightning as pl
import matplotlib.pyplot as plt
from pycocotools.coco import COCO
import sys, os, distutils.core

np.random.seed(0)

In [None]:
# (architecture, encoder) pairs of the models that are trained and later aggregated
model_list = [
    {"architecture": architecture, "encoder_name": encoder}
    for architecture, encoder in (
        ("unet++", "vgg16"),
        ("manet", "resnet152"),
        ("unet++", "resnet152"),
    )
]

# Please point these at the folders containing the images and the labels.
# All three splits live under the same root directory.
_DATA_ROOT = "/data2/SrikanthData/ResearchWork2/Latest_build/20221219-unet/Final_Models/Img_data/unet_cvat_data"

TRAIN_PATH = _DATA_ROOT + "/train_set"
VALIDATION_PATH = _DATA_ROOT + "/validation_set"
TEST_PATH = _DATA_ROOT + "/test_set"

# side length (pixels) every image is resized to, and the epoch budget for training
SIZE = 192
N_EPOCH = 300


class Types:
    """Integer label stored in the mask for each pixel class."""

    BACKGROUND, NUCLEUS, BORDER = range(3)


# raw mask value -> pixel class (currently the identity mapping)
CLASS_MAP = dict(enumerate((Types.BACKGROUND, Types.NUCLEUS, Types.BORDER)))

# number of distinct classes the models have to predict
N_CLASSES = len({*CLASS_MAP.values()})

# colour triplet used to paint each class when a mask is visualised
COLOR_MAP = {
    Types.BACKGROUND: (0, 0, 0),
    Types.NUCLEUS: (0, 0, 255),
    Types.BORDER: (0, 255, 0),
}

# this is set to 3 if the input is a colored image, else set to 1
N_INPUT_CHANNEL = 3

# Define augmentation function for training and validation

In [None]:
def get_training_augmentation():
    """
    Build the random augmentation pipeline applied to training samples.

    Every sample leaves the pipeline at SIZE x SIZE pixels. SIZE is expected
    to be divisible by 32, which the original notes state is required by the
    segmentation models.

    Returns:
        albu.Compose: resize, flip and shift/scale first, then noise and
        perspective jitter, then three ``OneOf`` groups (brightness/gamma,
        sharpen/blur, contrast/hue), each applied with probability 0.9.
    """
    # constant zero fill for pixels exposed by geometric transforms
    border_fill = dict(border_mode=0, value=0, mask_value=0)

    geometric = [
        albu.Resize(SIZE, SIZE),
        albu.HorizontalFlip(p=0.5),
        albu.ShiftScaleRotate(
            scale_limit=0.2, rotate_limit=0, shift_limit=0.1, p=1, **border_fill
        ),
        albu.PadIfNeeded(
            min_height=SIZE, min_width=SIZE, always_apply=True, **border_fill
        ),
        albu.RandomCrop(height=SIZE, width=SIZE, always_apply=True),
    ]

    # NOTE(review): the IAA* / RandomBrightness / RandomContrast transforms
    # only exist in older albumentations releases - confirm the pinned version.
    distortion = [
        albu.IAAAdditiveGaussianNoise(p=0.1),
        albu.IAAPerspective(p=0.3),
    ]

    brightness_group = albu.OneOf(
        [albu.CLAHE(p=1), albu.RandomBrightness(p=1), albu.RandomGamma(p=1)],
        p=0.9,
    )
    sharpness_group = albu.OneOf(
        [
            albu.IAASharpen(p=1),
            albu.Blur(blur_limit=3, p=1),
            albu.MotionBlur(blur_limit=3, p=1),
        ],
        p=0.9,
    )
    colour_group = albu.OneOf(
        [albu.RandomContrast(p=1), albu.HueSaturationValue(p=1)],
        p=0.9,
    )

    return albu.Compose(
        geometric + distortion + [brightness_group, sharpness_group, colour_group]
    )


def get_validation_augmentation():
    """
    Build the deterministic transform used for validation samples.

    No augmentation is done for validation data: the sample is only resized
    to SIZE x SIZE so that it matches the resolution used during training.

    Returns:
        albu.Compose: a pipeline containing a single ``albu.Resize``.
    """
    return albu.Compose([albu.Resize(SIZE, SIZE)])


def to_tensor(x, **kwargs):
    """Reorder a (height, width, channel) array to (channel, height, width).

    Extra keyword arguments are accepted and ignored so that the function can
    be used as an ``albu.Lambda`` callback.
    """
    channels_first = x.transpose((2, 0, 1))
    return channels_first


def get_preprocessing(preprocessing_fn):
    """Construct the preprocessing transform.

    Args:
        preprocessing_fn (callable): data normalization function
            (can be specific for each pretrained neural network)
    Return:
        transform: albumentations.Compose that first normalises the image with
            ``preprocessing_fn`` and then moves the channel axis of both image
            and mask to the front.
    """
    normalise = albu.Lambda(image=preprocessing_fn)
    channels_first = albu.Lambda(image=to_tensor, mask=to_tensor)
    return albu.Compose([normalise, channels_first])

# Define the Dataset and Dataloader

In [None]:
class SegmentationDataset(torch.utils.data.Dataset):
    """COCO-annotated nuclei dataset yielding an image and a 3-class mask.

    ``data_dir`` must contain ``annotations/instances_default.json`` and an
    ``images`` folder. Only images that have at least one annotation of the
    ``nucleus`` category are used.

    Each item is a dict with keys ``name`` (file name), ``image`` and ``mask``.
    The mask holds Types.BACKGROUND / Types.NUCLEUS / Types.BORDER per pixel.
    """

    def __init__(self, data_dir, augmentation=None,  preprocessing=None):
        """
        Args:
            data_dir: root folder of one dataset split.
            augmentation: optional callable ``f(image=..., mask=...)`` returning
                a dict with ``image`` and ``mask`` (e.g. an albu.Compose).
            preprocessing: optional callable with the same calling convention,
                applied after ``augmentation``.
        """
        self.data_dir = data_dir
        self.augmentation = augmentation
        self.preprocessing = preprocessing

        self.annFile = os.path.join(self.data_dir, "annotations/instances_default.json")
        self.coco = COCO(self.annFile)
        category_ids = self.coco.getCatIds(catNms=["nucleus"])
        image_ids = self.coco.getImgIds(catIds=category_ids)
        self.images = self.coco.loadImgs(image_ids)

    def __len__(self):
        """ function required by torch.utils.data.Dataset """
        return len(self.images)

    def load_coco_image(self, idx):
        """Load image ``idx`` as a 3-channel array.

        Single-channel (2-D) images are replicated to three identical
        channels; images that already carry a channel axis are returned as-is.
        """
        coco_image = self.images[idx]
        file_name = os.path.join(self.data_dir, "images", coco_image["file_name"])
        # the context manager closes the file handle once the pixels are copied
        with Image.open(file_name) as handle:
            image = np.array(handle)
        if image.ndim == 2:
            image = np.repeat(image[:, :, np.newaxis], 3, axis=-1)
        return image

    def load_coco_mask(self, idx):
        """Rasterise the annotations of image ``idx`` into a label mask.

        Returns:
            int32 array of shape (height, width, 1): Types.NUCLEUS inside any
            annotated nucleus, Types.BORDER along polygon outlines (drawn 2
            pixels thick, overriding the nucleus label) and 0 elsewhere.
        """
        coco_image = self.images[idx]
        height, width = coco_image["height"], coco_image["width"]
        annotation_ids = self.coco.getAnnIds(imgIds=coco_image["id"], iscrowd=0)
        annotations = self.coco.loadAnns(annotation_ids)

        # Accumulating into a fixed-size array also covers the case of an
        # image without any (non-crowd) annotation.
        nuclei = np.zeros((height, width), dtype=bool)
        border = np.zeros((height, width), dtype=np.uint8)
        for ann in annotations:
            nuclei |= self.coco.annToMask(ann).astype(bool)

            segmentation = ann["segmentation"]
            if not isinstance(segmentation, list):
                # RLE-encoded annotation: there is no polygon outline to draw
                continue

            # An annotation may consist of several polygons; outline each one
            # separately so that no line is drawn between unrelated polygons.
            for polygon in segmentation:
                points = np.array(polygon).reshape(-1, 2).astype(int)
                # pair every vertex with its predecessor (wrapping around),
                # which closes the outline
                for start, end in zip(np.roll(points, 1, axis=0), points):
                    # paint the border with color 1
                    border = cv2.line(
                        border,
                        tuple(int(v) for v in start),
                        tuple(int(v) for v in end),
                        1,
                        2,
                    )

        # set the mask to 1 where there are cells
        masks = nuclei.astype(np.int32)
        # set the border to Types.BORDER (which is 2)
        masks[border == 1] = Types.BORDER

        # change 2D (H, W) to 3D (H, W, 1)
        return np.expand_dims(masks, axis=-1)

    def __getitem__(self, idx):
        """ function required by torch.utils.data.Dataset """

        # load the image and the mask
        image = self.load_coco_image(idx)
        mask = self.load_coco_mask(idx)

        # transform the image and mask
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask  = sample["image"], sample["mask"]

        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample["image"], sample["mask"]

        result = {
            "name": self.images[idx]["file_name"],
            "image": image,
            "mask": mask
        }
        return result


def create_data_loaders(preprocessing_fn=None, batch_size=4):
    """
    Create the datasets and dataloaders for the train, validation and test splits.

    Args:
        preprocessing_fn: encoder-specific normalisation function, or None to
            skip the preprocessing step (images then stay channel-last).
        batch_size: number of samples per batch for all three loaders.

    Returns:
        (datasets, data_loaders): two dicts keyed by "train", "val" and "test".

    Only the training split is randomly augmented and shuffled. The validation
    and test splits are resized and preprocessed in the same way, so test
    batches have a uniform size and the layout the models expect.
    """

    def build_preprocessing():
        # a fresh transform per dataset, or None when no preprocessing_fn is given
        if preprocessing_fn is None:
            return None
        return get_preprocessing(preprocessing_fn)

    datasets = {
        "train": SegmentationDataset(
            data_dir=TRAIN_PATH,
            augmentation=get_training_augmentation(),
            preprocessing=build_preprocessing(),
        ),
        "val": SegmentationDataset(
            data_dir=VALIDATION_PATH,
            augmentation=get_validation_augmentation(),
            preprocessing=build_preprocessing(),
        ),
        "test": SegmentationDataset(
            data_dir=TEST_PATH,
            augmentation=get_validation_augmentation(),
            preprocessing=build_preprocessing(),
        ),
    }

    data_loaders = {
        split: torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=(split == "train")
        )
        for split, dataset in datasets.items()
    }

    return datasets, data_loaders

# Visualize example of train data and validation data
## Some training examples

In [None]:
# Build the datasets without an encoder preprocessing function (None), so the
# samples are not normalised or moved to channel-first layout.
datasets, dataloaders = create_data_loaders(None)

In [None]:
def inverse_process(processed_image):
    """Undo mean/std normalisation and return a displayable uint8 image.

    Args:
        processed_image: array of shape (3, height, width) that was normalised
            as ``(pixel / 255 - mean) / std`` with the constants below.
            NOTE(review): these are presumably the ImageNet statistics applied
            by the encoder preprocessing function - confirm for encoders that
            use a different normalisation.

    Returns:
        uint8 array of shape (height, width, 3).
    """
    std = np.array([[[0.229, 0.224, 0.225]]])
    mean = np.array([[[0.485, 0.456, 0.406]]])
    restored = (processed_image.transpose(1, 2, 0) * std + mean) * 255

    # Round instead of truncating (floating point error would otherwise turn
    # e.g. 254.9999 into 254) and clip so out-of-range values cannot wrap.
    return np.clip(np.rint(restored), 0, 255).astype(np.uint8)

def get_colored_mask(processed_mask):
    """Turn a (height, width, 1) label mask into a (height, width, 3) uint8 image.

    Each pixel labelled background, nucleus or border receives the colour
    listed for it in COLOR_MAP; any other label value stays black.
    """
    height, width, _ = processed_mask.shape
    labels = processed_mask[:, :, 0]

    canvas = np.zeros((height, width, 3))
    for class_id in (Types.BACKGROUND, Types.NUCLEUS, Types.BORDER):
        canvas[labels == class_id] = COLOR_MAP[class_id]

    return canvas.astype(np.uint8)

In [None]:
# Show the first three training samples: image on the left, coloured label on the right.
plt.figure(figsize=(8, 12))
for count, i in enumerate(range(3)):
    # indexing the dataset runs the training augmentation on the sample
    one_example = datasets["train"][i]
    left_slot = 2 * count + 1

    plt.subplot(3, 2, left_slot)
    plt.imshow(one_example["image"])
    plt.title("training image")
    plt.axis("off")

    plt.subplot(3, 2, left_slot + 1)
    plt.imshow(get_colored_mask(one_example["mask"]))
    plt.title("training label")
    plt.axis("off")

plt.savefig("training_examples.jpg")

Some validation examples (without augmentation, only resizing)

In [None]:
# Show the first three validation samples: image on the left, coloured label on the right.
plt.figure(figsize=(8, 12))
for count, i in enumerate(range(3)):
    one_example = datasets["val"][i]
    left_slot = 2 * count + 1

    plt.subplot(3, 2, left_slot)
    plt.imshow(one_example["image"])
    plt.title("validation image")
    plt.axis("off")

    plt.subplot(3, 2, left_slot + 1)
    plt.imshow(get_colored_mask(one_example["mask"]))
    plt.title("validation label")
    plt.axis("off")

plt.savefig("validation_examples.jpg")

# Create the model

In [None]:
class SegmentationCell(pl.LightningModule):
    """Lightning module wrapping one segmentation_models_pytorch network.

    The network predicts N_CLASSES channels from N_INPUT_CHANNEL input
    channels and is trained with 0.4 * Jaccard loss + 0.6 * Dice loss.
    """

    # TODO: add more architecture if you like
    # architecture name -> segmentation_models_pytorch model class
    _ARCHITECTURES = {
        "unet": smp.Unet,
        "unet++": smp.UnetPlusPlus,
        "manet": smp.MAnet,
    }

    def __init__(self, architecture="unet", encoder_name=""):
        """
        Args:
            architecture: one of "unet", "unet++" or "manet".
            encoder_name: encoder backbone, e.g. mobilenet_v2 or efficientnet-b7.

        Raises:
            ValueError: if ``architecture`` is not a supported name. Without
                this check the module would be created without ``self.model``
                and only fail later, on the first forward pass.
        """
        super().__init__()

        try:
            model_class = self._ARCHITECTURES[architecture]
        except KeyError:
            supported = ", ".join(sorted(self._ARCHITECTURES))
            raise ValueError(
                f"Unknown architecture {architecture!r}; expected one of: {supported}"
            ) from None

        self.model = model_class(
            encoder_name=encoder_name,        # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
            encoder_weights="imagenet",       # use `imagenet` pre-trained weights for encoder initialization
            in_channels=N_INPUT_CHANNEL,      # model input channels (1 for gray-scale images, 3 for RGB, etc.)
            classes=N_CLASSES,                # model output channels (number of classes in your dataset)
        )

        # (name, weight, loss function) triples combined into the total loss
        self.losses = [
            ("jaccard", 0.4, smp.losses.JaccardLoss(mode="multiclass", from_logits=True)),
            ("dice", 0.6, smp.losses.DiceLoss(mode="multiclass", from_logits=True))
        ]

    def forward(self, x):
        """Run the wrapped model on ``x`` after casting it to float."""
        return self.model(x.float())

    def configure_optimizers(self):
        """Use Adam with a fixed learning rate of 1e-4."""
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        return optimizer

    def _weighted_loss(self, batch, stage):
        """Compute the weighted loss for one batch.

        Args:
            batch: dict with "image" and "mask" entries.
            stage: prefix for the log keys ("train" or "val").

        Returns:
            (total_loss, logs) where ``logs`` maps ``{stage}_mask_{loss name}``
            to each individual loss and ``{stage}_loss`` to the weighted sum.
        """
        x, masks = batch["image"], batch["mask"]
        masks = masks.long()
        logits = self.forward(x)

        total_loss = 0
        logs = {}
        for loss_name, weight, loss in self.losses:
            ls_mask = loss(logits, masks)
            total_loss += weight * ls_mask
            logs[f"{stage}_mask_{loss_name}"] = ls_mask

        logs[f"{stage}_loss"] = total_loss
        return total_loss, logs

    def training_step(self, train_batch, batch_idx):
        """Compute and log ``train_loss`` for one training batch."""
        total_loss, logs = self._weighted_loss(train_batch, "train")
        self.log("train_loss", total_loss, on_step=True, on_epoch=True, prog_bar=True, logger=False)

        return {"loss": total_loss, "log": logs}

    def validation_step(self, batch, idx):
        """Compute and log ``val_loss`` (monitored by the callbacks) for one validation batch."""
        total_loss, logs = self._weighted_loss(batch, "val")
        self.log("val_loss", total_loss, on_epoch=True, prog_bar=True, logger=False, on_step=True)
        return {"loss": total_loss, "log": logs}

class MetricsCallback(pl.callbacks.Callback):
    """Callback that records the trainer's metrics after every validation epoch."""

    def __init__(self):
        super().__init__()
        # one snapshot of ``trainer.callback_metrics`` per validation epoch
        self.metrics = []

    def on_validation_epoch_end(self, trainer, pl_module):
        """Store a deep copy so that later epochs cannot overwrite the snapshot."""
        self.metrics.append(copy.deepcopy(trainer.callback_metrics))

## Train 3 models
* unet++ with a vgg16 encoder
* manet with a resnet152 encoder
* unet++ with a resnet152 encoder

### Train the first model

In [None]:
# Pick the first entry of model_list and build dataloaders that use the
# normalisation matching its encoder.
architecture_name, encoder_name = (
    model_list[0]["architecture"],
    model_list[0]["encoder_name"],
)
preprocessing_fn = get_preprocessing_fn(encoder_name, pretrained="imagenet")
datasets, dataloaders = create_data_loaders(preprocessing_fn=preprocessing_fn)

model_0 = SegmentationCell(architecture_name, encoder_name)

# Callbacks: early stopping and checkpointing both monitor val_loss (lower is
# better); the metrics callback keeps the per-epoch losses for plotting.
early_stopping_0 = pl.callbacks.EarlyStopping(monitor="val_loss", mode="min", patience=75)
checkpoint_0 = pl.callbacks.ModelCheckpoint(monitor="val_loss", mode="min")
metrics_0 = MetricsCallback()

trainer = pl.Trainer(
    accelerator="auto",
    devices=1,
    max_epochs=N_EPOCH,
    limit_train_batches=1.0,
    log_every_n_steps=1,
    callbacks=[early_stopping_0, checkpoint_0, metrics_0],
)
trainer.fit(model_0, dataloaders["train"], dataloaders["val"])

# load the best weights for the model
model_0 = SegmentationCell.load_from_checkpoint(
    checkpoint_0.best_model_path,
    architecture=architecture_name,
    encoder_name=encoder_name,
)

In [None]:
def plot_metrics(callback_metrics, name):
    """
    Plot the training and validation loss recorded by a MetricsCallback.

    Args:
        callback_metrics: object whose ``metrics`` attribute is a list of
            metric dicts; entries lacking either 'train_loss' or 'val_loss'
            are skipped.
        name: title of the figure.
    """
    train_loss, val_loss = [], []
    for item in callback_metrics.metrics:
        if "train_loss" in item and "val_loss" in item:
            train_loss.append(item["train_loss"].cpu())
            val_loss.append(item["val_loss"].cpu())

    x = np.arange(len(train_loss))

    plt.title(name)
    plt.plot(x, train_loss, label="train_loss")
    plt.plot(x, val_loss, label="val_loss")

    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid()

    plt.show()


plot_metrics(metrics_0, f"{architecture_name} - {encoder_name}")

Load the best weights for model_0

## Train model 2

In [None]:
# Pick the second entry of model_list and build dataloaders that use the
# normalisation matching its encoder.
architecture_name = model_list[1]["architecture"]
encoder_name = model_list[1]["encoder_name"]
preprocessing_fn = get_preprocessing_fn(encoder_name, pretrained='imagenet')
datasets, dataloaders = create_data_loaders(preprocessing_fn=preprocessing_fn)

model_1 = SegmentationCell(architecture_name, encoder_name)

# Callbacks: early stopping and checkpointing both monitor val_loss (lower is
# better); the metrics callback keeps the per-epoch losses for plotting.
early_stopping_1 = pl.callbacks.early_stopping.EarlyStopping(monitor="val_loss", patience=75, mode="min")
checkpoint_1 = pl.callbacks.model_checkpoint.ModelCheckpoint(monitor="val_loss", mode="min")
metrics_1 = MetricsCallback()

trainer = pl.Trainer(
    devices=1,
    accelerator='auto',
    callbacks=[early_stopping_1, checkpoint_1, metrics_1],
    limit_train_batches=1.0,
    # use the shared epoch budget, like the first model, instead of a literal
    max_epochs=N_EPOCH,
    log_every_n_steps=1)
trainer.fit(model_1, dataloaders["train"], dataloaders["val"])

# load the best weights for the model
model_1 = SegmentationCell.load_from_checkpoint(checkpoint_1.best_model_path, architecture=architecture_name, encoder_name=encoder_name)

In [None]:
# Plot the losses recorded by metrics_1, titled with the current architecture and encoder names.
plot_metrics(metrics_1, f"{architecture_name} - {encoder_name}")

## Train model 3

In [None]:

# Pick the third entry of model_list and build dataloaders that use the
# normalisation matching its encoder.
architecture_name = model_list[2]["architecture"]
encoder_name = model_list[2]["encoder_name"]
preprocessing_fn = get_preprocessing_fn(encoder_name, pretrained='imagenet')
datasets, dataloaders = create_data_loaders(preprocessing_fn=preprocessing_fn)

model_2 = SegmentationCell(architecture_name, encoder_name)

# Callbacks: early stopping and checkpointing both monitor val_loss (lower is
# better); the metrics callback keeps the per-epoch losses for plotting.
early_stopping_2 = pl.callbacks.early_stopping.EarlyStopping(monitor="val_loss", patience=75, mode="min")
checkpoint_2 = pl.callbacks.model_checkpoint.ModelCheckpoint(monitor="val_loss", mode="min")
metrics_2 = MetricsCallback()

trainer = pl.Trainer(
    devices=1,
    accelerator='auto',
    callbacks=[early_stopping_2, checkpoint_2, metrics_2],
    limit_train_batches=1.0,
    # use the shared epoch budget, like the first model, instead of a literal
    max_epochs=N_EPOCH,
    log_every_n_steps=1)
trainer.fit(model_2, dataloaders["train"], dataloaders["val"])

# load the best weights for the model
model_2 = SegmentationCell.load_from_checkpoint(checkpoint_2.best_model_path, architecture=architecture_name, encoder_name=encoder_name)

In [None]:
# Plot the losses recorded by metrics_2, titled with the current architecture and encoder names.
plot_metrics(metrics_2, f"{architecture_name} - {encoder_name}")

## Create the ensemble model

In [None]:
def ensemble_prediction(image_batch, *models):
    """Average the class probabilities predicted by several models.

    Args:
        image_batch: tensor of shape (batch, channels, height, width).
        *models: one or more torch modules mapping the batch to per-class
            logits of shape (batch, classes, height, width).

    Returns:
        numpy array of shape (batch, classes, height, width) holding the mean
        of the models' softmax probabilities.

    Raises:
        ValueError: if no model is given.
    """
    if not models:
        raise ValueError("ensemble_prediction needs at least one model")

    probabilities = []
    with torch.no_grad():
        for model in models:
            model.eval()

            # Run the batch on the device the model lives on; a model without
            # parameters takes the batch unchanged.
            first_parameter = next(model.parameters(), None)
            inputs = image_batch
            if first_parameter is not None:
                inputs = image_batch.to(first_parameter.device)

            # softmax over the class axis turns logits into probabilities
            probs = torch.nn.functional.softmax(model(inputs), dim=1)
            probabilities.append(probs.cpu().numpy())

    # stack the models along a new axis and average over it
    return np.mean(np.stack(probabilities, axis=2), axis=2)

In [None]:
# Predict the first test batch with the three-model ensemble and plot, per
# sample, the input image, the ground truth and the predicted mask.
tmp_iter = iter(dataloaders["test"])
batch = next(tmp_iter)
logits = ensemble_prediction(batch["image"], model_0, model_1, model_2)

plt.figure(figsize=(16, 24))
plt.rcParams['font.size'] = 18

n_rows = len(batch["name"])
for i, (name, image, gt_mask, pr_mask) in enumerate(zip(batch["name"], batch["image"], batch["mask"], logits)):
    first_slot = i*3 + 1

    # input image, with the normalisation undone
    image = inverse_process(image.numpy())
    plt.subplot(n_rows, 3, first_slot)
    plt.imshow(image)
    plt.title(f"Image {name}")
    plt.axis("off")

    # ground truth: channel-first tensor -> channel-last array -> colours
    plt.subplot(n_rows, 3, first_slot + 1)
    gt_mask = get_colored_mask(gt_mask.permute(1, 2, 0).numpy())
    plt.imshow(gt_mask)
    plt.title("Ground truth")
    plt.axis("off")

    # prediction: most probable class per pixel, with a trailing channel axis
    plt.subplot(n_rows, 3, first_slot + 2)
    pr_mask = pr_mask.argmax(axis=0)[..., np.newaxis]
    pr_mask = get_colored_mask(pr_mask)
    plt.imshow(pr_mask)
    plt.title("Prediction")
    plt.axis("off")

    # show at most five samples
    if i >= 4:
        break

# This is to calculate metrics

In [None]:
# import some common detectron2 utilities
from detectron2.data import MetadataCatalog, DatasetCatalog

from detectron2.data.datasets import register_coco_instances
from detectron2.data import build_detection_test_loader

from detectron2.data import detection_utils as utils
import detectron2.data.transforms as T
import copy
from PIL import Image
# Register the test split with detectron2 under the name "my_dataset_test".
# The paths are derived from TEST_PATH so the dataset location is configured
# in one place only. The membership check keeps the cell re-runnable, because
# registering an already registered name fails.
if "my_dataset_test" not in DatasetCatalog.list():
    register_coco_instances(
        "my_dataset_test",
        {},
        os.path.join(TEST_PATH, "annotations/instances_default.json"),
        os.path.join(TEST_PATH, "images"),
    )



# side length (pixels) the evaluation images are resized to
# NOTE(review): presumably has to match the SIZE used for training - confirm
IMAGE_SIZE = 192
def mapper(dataset_dict):
    """Load one dataset record and convert it for evaluation.

    Args:
        dataset_dict: record with at least "file_name", "annotations" and
            "image_id". It is deep-copied, so the caller's record is untouched.

    Returns:
        dict with the resized channel-first "image" tensor, the original
        "width" and "height", the resized ground-truth "instances" and the
        "image_id".
    """
    dataset_dict = copy.deepcopy(dataset_dict)

    # the context manager closes the file handle once the pixels are copied
    with Image.open(dataset_dict["file_name"]) as handle:
        image = np.array(handle)
    # replicate single-channel images to three identical channels; images
    # that already carry a channel axis are used as they are
    if image.ndim == 2:
        image = np.repeat(image[:, :, np.newaxis], 3, axis=-1)
    height, width = image.shape[:2]

    # resize the image; the returned transform is reused for the annotations
    auginput = T.AugInput(image)
    transform = T.Resize((IMAGE_SIZE, IMAGE_SIZE))(auginput)
    image = torch.from_numpy(auginput.image.transpose(2, 0, 1))

    annos = [
        utils.transform_instance_annotations(annotation, [transform], image.shape[1:])
        for annotation in dataset_dict.pop("annotations")
    ]
    return {
       # create the format that the model expects
       "image": image,
       "width": width,
       "height": height,
       "instances": utils.annotations_to_instances(annos, image.shape[1:]),
       "image_id": dataset_dict["image_id"],
    }
 
# Test loader over the registered "my_dataset_test" records; each record is
# converted by `mapper`.
test_records = DatasetCatalog.get("my_dataset_test")
dataloader = build_detection_test_loader(dataset=test_records, mapper=mapper)


In [None]:
def find_nuclei(mask, min_area=30):
    """
    Find the individual nuclei in a label mask.

    A nucleus is a group of Types.NUCLEUS pixels connected horizontally,
    vertically or diagonally.

    Args:
        mask: array of shape (height, width, channels); only channel 0 is read.
        min_area: groups with ``min_area`` pixels or fewer are ignored.

    Returns:
        list of (height, width) float arrays, one per nucleus, holding 1 on
        the pixels of that nucleus and 0 elsewhere. Nuclei are ordered by
        their first pixel in row-major order.
    """
    from collections import deque

    labels = np.asarray(mask)[:, :, 0]
    h, w = labels.shape
    is_nucleus = labels == Types.NUCLEUS
    # per-pixel "already assigned" flags; valid for any image size
    visited = np.zeros((h, w), dtype=bool)
    # the eight neighbours of a pixel
    offsets = [
        (dh, dw)
        for dh in (-1, 0, 1)
        for dw in (-1, 0, 1)
        if (dh, dw) != (0, 0)
    ]

    cells = []
    # np.argwhere lists the nucleus pixels in row-major order
    for i, j in np.argwhere(is_nucleus).tolist():
        if visited[i, j]:
            continue

        cell = np.zeros((h, w))
        cell[i, j] = 1
        visited[i, j] = True

        # breadth first search over all connected nucleus pixels
        queue = deque([(i, j)])
        while queue:
            tmp_i, tmp_j = queue.popleft()
            for dh, dw in offsets:
                y, x = tmp_i + dh, tmp_j + dw
                if y < 0 or y >= h or x < 0 or x >= w:
                    continue
                if visited[y, x] or not is_nucleus[y, x]:
                    continue
                visited[y, x] = True
                cell[y, x] = 1
                queue.append((y, x))

        # ignore cell that is too small
        if np.sum(cell) > min_area:
            cells.append(cell)
    return cells

def find_box_from_binary_mask(mask):
    """Return the tight bounding box ``[xmin, ymin, xmax, ymax]`` of a binary mask.

    Only pixels equal to 1 count as foreground. The maxima are inclusive,
    i.e. they are the indices of the last foreground column and row.
    """
    foreground = mask == 1

    # indices of the columns / rows that contain at least one foreground pixel
    occupied_columns = np.flatnonzero(np.any(foreground, axis=0))
    occupied_rows = np.flatnonzero(np.any(foreground, axis=1))

    return [
        occupied_columns.min(),
        occupied_rows.min(),
        occupied_columns.max(),
        occupied_rows.max(),
    ]

    

In [None]:
from detectron2.structures import Instances, Boxes

from detectron2.evaluation import (
    COCOEvaluator,
    DatasetEvaluator,
    inference_on_dataset,
)
from detectron2.evaluation.coco_evaluation import (
    _evaluate_predictions_on_coco,
)
from detectron2.data import build_detection_test_loader
import itertools
from detectron2.utils.file_io import PathManager

from pycocotools.cocoeval import COCOeval

try:
    from detectron2.evaluation.fast_eval_api import COCOeval_opt
except ImportError:
    COCOeval_opt = COCOeval

def step_summarization(coco_eval,  ap=1, iouThr=None, areaRng='all', maxDets=100 ):
    """
    Helper function to get the average precision (or recall) at a given IoU.

    Args:
        coco_eval: evaluation object providing ``params`` (with ``iouThrs``,
            ``areaRngLbl`` and ``maxDets``) and ``eval`` (with the 'precision'
            and 'recall' arrays).
        ap: 1 to summarise precision, anything else to summarise recall.
        iouThr: IoU threshold to select, or None to average over all of them.
        areaRng: label of the area range to select.
        maxDets: accepted for compatibility but ignored - the third entry of
            ``coco_eval.params.maxDets`` is always used.

    Returns:
        mean of the selected entries that are greater than -1, or -1 when
        there is no such entry.
    """
    p = coco_eval.params
    # the argument is deliberately overridden, see the docstring
    maxDets = p.maxDets[2]

    aind = [i for i, aRng in enumerate(p.areaRngLbl) if aRng == areaRng]
    mind = [i for i, mDet in enumerate(p.maxDets) if mDet == maxDets]

    # dimension of precision: [TxRxKxAxM], dimension of recall: [TxKxAxM]
    s = coco_eval.eval['precision'] if ap == 1 else coco_eval.eval['recall']

    # IoU
    if iouThr is not None:
        # compare with a tolerance: the thresholds are floating point numbers
        t = np.where(np.isclose(iouThr, p.iouThrs))[0]
        s = s[t]

    if ap == 1:
        s = s[:, :, :, aind, mind]
    else:
        s = s[:, :, aind, mind]

    # entries equal to -1 mark combinations without data
    valid = s[s > -1]
    if len(valid) == 0:
        return -1
    return np.mean(valid)

class CustomizedEvaluator(COCOEvaluator):
    """
    COCOEvaluator variant that keeps the per-task evaluation objects.

    In addition to filling ``self._results`` with the derived metrics, every
    evaluation object produced while evaluating is stored in
    ``self._coco_evals`` (keyed by task name) so that intermediate results can
    be inspected afterwards.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # task name -> evaluation object for that task (None when there were
        # no predictions to evaluate)
        self._coco_evals = {}

    def _eval_predictions(self, predictions, img_ids=None):
        """
        Evaluate predictions. Fill self._results with the metrics of the tasks
        and self._coco_evals with the matching evaluation objects.
        """
        log = self._logger
        log.info("Preparing results for COCO format ...")
        coco_results = list(
            itertools.chain.from_iterable(pred["instances"] for pred in predictions)
        )
        tasks = self._tasks or self._tasks_from_predictions(coco_results)

        # Map contiguous training ids back to the dataset's own category ids.
        if hasattr(self._metadata, "thing_dataset_id_to_contiguous_id"):
            to_contiguous = self._metadata.thing_dataset_id_to_contiguous_id
            contiguous_ids = list(to_contiguous.values())
            num_classes = len(contiguous_ids)
            assert min(contiguous_ids) == 0 and max(contiguous_ids) == num_classes - 1

            to_dataset = {
                contiguous: dataset for dataset, contiguous in to_contiguous.items()
            }
            for result in coco_results:
                category_id = result["category_id"]
                assert category_id < num_classes, (
                    f"A prediction has class={category_id}, "
                    f"but the dataset only has {num_classes} classes and "
                    f"predicted class id should be in [0, {num_classes - 1}]."
                )
                result["category_id"] = to_dataset[category_id]

        # Optionally dump the raw results next to the other outputs.
        if self._output_dir:
            file_path = os.path.join(self._output_dir, "coco_instances_results.json")
            log.info("Saving results to {}".format(file_path))
            with PathManager.open(file_path, "w") as f:
                f.write(json.dumps(coco_results))
                f.flush()

        if not self._do_evaluation:
            log.info("Annotations are not available for evaluation.")
            return

        api_flavor = "unofficial" if self._use_fast_impl else "official"
        log.info("Evaluating predictions with {} COCO API...".format(api_flavor))

        for task in sorted(tasks):
            assert task in {"bbox", "segm", "keypoints"}, f"Got unknown task: {task}!"

            if len(coco_results) > 0:
                coco_eval = _evaluate_predictions_on_coco(
                    self._coco_api,
                    coco_results,
                    task,
                    kpt_oks_sigmas=self._kpt_oks_sigmas,
                    cocoeval_fn=COCOeval_opt if self._use_fast_impl else COCOeval,
                    img_ids=img_ids,
                    max_dets_per_image=self._max_dets_per_image,
                )
            else:
                # cocoapi does not handle empty results very well
                coco_eval = None

            self._results[task] = self._derive_coco_results(
                coco_eval, task, class_names=self._metadata.get("thing_classes")
            )
            # keep the evaluation object itself available on the evaluator
            self._coco_evals[task] = coco_eval


evaluator = CustomizedEvaluator("my_dataset_test")
evaluator.reset()

# Run the ensemble on every batch yielded by `dataloader` and hand the
# per-nucleus instance predictions to the evaluator. Only the first element of
# each batch is used.
for inputs in dataloader:
    image = inputs[0]["image"] # in (3, H, W)
    height = inputs[0]["height"]
    width = inputs[0]["width"]
    image = np.transpose(image, [1, 2, 0])   # now it is at (H, W, 3)
    image = preprocessing_fn(image)
    image = np.transpose(image, [2, 0, 1]) # change back to (3, H, W)
    logits = ensemble_prediction(torch.unsqueeze(image, axis=0), model_0, model_1, model_2)

    # Per-pixel class index map for the single image in the batch.
    pr_mask = np.transpose(logits[0], (1, 2, 0)).argmax(axis=-1).astype(np.uint8)

    # Resize the mask to the size recorded in the input dict, otherwise the
    # evaluator won't work. Nearest-neighbour interpolation is required for a
    # label map: the default bilinear mode blends class indices, so a
    # background (0) / border (2) boundary could turn into nucleus (1).
    pr_mask = cv2.resize(pr_mask, (width, height), interpolation=cv2.INTER_NEAREST)
    pr_mask = np.expand_dims(pr_mask, axis=-1)   # (H, W, 1)

    # NOTE(review): the result is not used in this cell; the call is kept
    # unchanged in case get_colored_mask is relied on for side effects.
    colored_mask = get_colored_mask(pr_mask)

    # Get all the cells
    cells = find_nuclei(pr_mask)

    # put the output in formats that is acceptable by detectron's evaluator
    outputs = [{
        "instances": Instances(
            image_size=(height, width),
            # every detected nucleus gets the same confidence of 1.0
            scores=torch.ones(len(cells)),
            # class ids are integer labels, so store them as int64
            pred_classes=torch.full((len(cells),), Types.NUCLEUS - 1, dtype=torch.int64),
            pred_masks=torch.Tensor(np.array(cells)) == 1,
            pred_boxes=Boxes(torch.Tensor([find_box_from_binary_mask(cell) for cell in cells]))
        )
    }]
    evaluator.process(inputs, outputs)

evaluation_results = evaluator.evaluate()
print(evaluation_results)


# Plot Average Precision vs IoU

In [None]:
import matplotlib.pyplot as plt

# Segmentation average precision at ten evenly spaced IoU thresholds
# between 0.5 and 0.95.
ious = np.linspace(0.5, 0.95, num=10)
aps = [
    step_summarization(evaluator._coco_evals["segm"], ap=1, iouThr=thr, maxDets=-1)
    for thr in ious
]

print("IoU: ", ious)
print("Average Precision: ", aps)

# One dashed line with round markers for the ensemble.
plt.plot(ious, aps, 'o--', label="Ensemble")
plt.xlabel("IoU Threshold")
plt.ylabel("Average Precision")
plt.grid()
plt.legend()
plt.show()

Calculate Dice score and Jaccard score

In [None]:
from detectron2.structures import polygons_to_bitmask

from segmentation_models_pytorch.losses.dice import DiceLoss
from segmentation_models_pytorch.losses.jaccard import JaccardLoss


# Multiclass Dice and Jaccard losses, both constructed with
# from_logits=False. Scores are later obtained as `1 - loss`.
dice_loss = DiceLoss(mode='multiclass', from_logits=False)
jaccard_loss = JaccardLoss(mode='multiclass', from_logits=False)

def calculate_dice():
    """
    Calculate the mean Dice score of the ensemble on "my_dataset_test".

    For every batch from the test loader (only the first element of each batch
    is used) the ensemble prediction is reduced to a two-class map
    (background vs. nucleus, with border pixels counted as nucleus) and
    compared with a ground-truth map rasterized from the annotated polygons.
    Dice and Jaccard scores are computed as ``1 - loss`` with the module-level
    ``dice_loss`` and ``jaccard_loss``.

    Both mean scores are printed; only the Dice mean is returned.

    Returns:
        Mean of the per-image Dice scores.
    """
    dataloader = build_detection_test_loader(
        dataset=DatasetCatalog.get("my_dataset_test"),
        mapper=mapper
    )

    dice_scores = []
    jaccard_scores = []

    # loop through all images in the test dataset
    for inputs in dataloader:
        sample = inputs[0]

        # get the image
        image = sample["image"] # in (3, H, W)
        image = np.transpose(image, [1, 2, 0])   # now it is at (H, W, 3)
        image = preprocessing_fn(image)
        image = np.transpose(image, [2, 0, 1]) # change back to (3, H, W)

        # get prediction
        y_pred = ensemble_prediction(torch.unsqueeze(image, axis=0), model_0, model_1, model_2)
        y_pred = np.argmax(y_pred, axis=1)
        # set all the border pixels to nuclei pixels
        y_pred[y_pred > 1] = 1

        # 1-hot, then change to (n, c, h, w)
        y_pred = np.eye(2)[y_pred]
        y_pred = np.transpose(y_pred, (0, 3, 1, 2))

        # extract the ground truth: union of all annotated polygons
        y_true = np.zeros(sample['instances'].image_size, dtype=int)
        for polygon in sample['instances'].gt_masks.polygons:
            y_true[polygons_to_bitmask(polygon, height=y_true.shape[0], width=y_true.shape[1])] = 1

        y_true = np.expand_dims(y_true, axis=0)                    # (n, h, w)

        # convert once, then calculate dice and jaccard
        pred_tensor = torch.from_numpy(y_pred)
        true_tensor = torch.from_numpy(y_true)
        dice_scores.append(1 - dice_loss(pred_tensor, true_tensor).numpy())
        jaccard_scores.append(1 - jaccard_loss(pred_tensor, true_tensor).numpy())

    print("\nDice Score: ", np.mean(dice_scores))
    print("\nJaccard Score: ", np.mean(jaccard_scores))
    return np.mean(dice_scores)

dice = calculate_dice()

# Print final metrics

In [None]:
import pandas as pd

# One-row summary table: Dice plus segmentation precision and recall at
# IoU 0.7 and 0.5 (ap flag 1 selects precision, 0 selects recall).
df = pd.DataFrame(
    {
        "dice": [dice],
        **{
            f"{metric}_{thr}": [
                step_summarization(evaluator._coco_evals["segm"], ap_flag, iouThr=thr, maxDets=-1)
            ]
            for thr in (0.7, 0.5)
            for metric, ap_flag in (("precision", 1), ("recall", 0))
        },
    }
)

# F1 at each threshold, derived from the matching precision/recall columns.
df = df.assign(
    **{
        f"f1_{thr}": df[f"precision_{thr}"] * df[f"recall_{thr}"] * 2
        / (df[f"precision_{thr}"] + df[f"recall_{thr}"])
        for thr in (0.7, 0.5)
    }
)

# Final column order for display.
df = df[["dice", "f1_0.7", "precision_0.7", "recall_0.7", "f1_0.5", "precision_0.5", "recall_0.5"]]

df