## Stain normalization

In [1]:
# https://github.com/wanghao14/Stain_Normalization
!wget https://raw.githubusercontent.com/wanghao14/Stain_Normalization/master/stain_utils.py
!wget https://raw.githubusercontent.com/wanghao14/Stain_Normalization/master/stainNorm_Macenko.py
!pip install spams

--2023-02-03 15:12:21--  https://raw.githubusercontent.com/wanghao14/Stain_Normalization/master/stain_utils.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4255 (4.2K) [text/plain]
Saving to: ‘stain_utils.py’


2023-02-03 15:12:21 (44.0 MB/s) - ‘stain_utils.py’ saved [4255/4255]

--2023-02-03 15:12:22--  https://raw.githubusercontent.com/wanghao14/Stain_Normalization/master/stainNorm_Macenko.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2663 (2.6K) [text/plain]
Saving to: ‘stainNorm_Macenko.py’


2023-02-03 15:12:22 (50.3 

In [2]:
# Print the current working directory; the relative `%cd` further down depends on it.
!pwd

/content


In [3]:
# Mount Google Drive at /content/drive (Colab only) so the project folder
# stored on Drive can be reached through the regular file system.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
%cd drive/MyDrive/nuclei_segmentation

/content/drive/MyDrive/nuclei_segmentation


In [5]:
import os
import cv2
import glob
import shutil
import numpy as np
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt

import stain_utils as utils
import stainNorm_Macenko

In [6]:
def macenko_normalize(img_dir: str, ref_img_dir: str):
    """Stain-normalize one image against a reference image with the Macenko normalizer.

    Despite the ``_dir`` suffix, both arguments are paths to image *files*.

    Args:
        img_dir: Path of the H&E image to normalize; loaded with
            ``stain_utils.read_image``.
        ref_img_dir: Path of the reference image the normalizer is fitted on.

    Returns:
        The result of ``stainNorm_Macenko.Normalizer.transform`` for the input
        image (the callers in this notebook treat it as an image array).

    Note:
        A fresh normalizer is created and fitted on every call, so the
        reference image is re-read and re-fitted for each normalized image.
    """
    # Image to be normalized.
    img = utils.read_image(img_dir)

    # Open the reference inside a context manager so its file handle is closed
    # as soon as the pixels are copied, and force 3-channel RGB so an RGBA,
    # palette or grayscale file is handed over like a regular colour image.
    # For files that are already RGB the conversion does not change the pixels.
    with Image.open(ref_img_dir) as ref_file:
        ref_img = np.array(ref_file.convert("RGB"))

    # Fit the Macenko normalizer on the reference, then normalize the image.
    normalizer = stainNorm_Macenko.Normalizer()
    normalizer.fit(ref_img)
    return normalizer.transform(img)

In [7]:
def create_path(path):
    """Create directory ``path`` (including missing parents) if it does not exist.

    Calling it for an already existing directory is a no-op.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok=True replaces the separate exists()-then-makedirs() steps, which
    # could fail if the directory appeared between the check and the creation.
    os.makedirs(path, exist_ok=True)

In [8]:
# Directories holding the original (not yet stain-normalized) tissue images of the
# train and test splits; relative to the current working directory.
train_dir = "dataset/monuseg/original/train/tissue_images"
test_dir = "dataset/monuseg/original/test/tissue_images"

In [9]:
# Output directory for the stain-normalized training images (created if missing).
train_stain_normalized_images_path = "dataset/monuseg/stain_normalized/train/tissue_images"
create_path(train_stain_normalized_images_path)

# Output directory for the stain-normalized test images (created if missing).
test_stain_normalized_images_path = "dataset/monuseg/stain_normalized/test/tissue_images"
create_path(test_stain_normalized_images_path)

# Path of the single reference image that both the train and the test images are normalized against.
ref_img_dir = "dataset/monuseg/original/train/tissue_images/TCGA-AR-A1AS-01Z-00-DX1.tif"

In [10]:
# Stain-normalize every training image and save it under its original file name.
# The file list is built once and handed to tqdm directly, so the progress total
# always equals the number of files iterated. glob("*") skips dot-entries such as
# .ipynb_checkpoints that os.listdir would count. Sorting makes the order reproducible.
train_image_paths = sorted(glob.glob(os.path.join(train_dir, "*")))
for image_path in tqdm(train_image_paths):
    name = os.path.basename(image_path)
    normI = macenko_normalize(image_path, ref_img_dir)
    # Cast to uint8 before building the PIL image that is written to disk.
    normI = Image.fromarray(normI.astype(np.uint8))
    normI.save(os.path.join(train_stain_normalized_images_path, name))

100%|██████████| 30/30 [02:13<00:00,  4.46s/it]


In [11]:
# Stain-normalize every test image (against the same reference image) and save it
# under its original file name.
# The file list is built once and handed to tqdm directly, so the progress total
# always equals the number of files iterated. glob("*") skips dot-entries such as
# .ipynb_checkpoints that os.listdir would count. Sorting makes the order reproducible.
test_image_paths = sorted(glob.glob(os.path.join(test_dir, "*")))
for image_path in tqdm(test_image_paths):
    name = os.path.basename(image_path)
    normI = macenko_normalize(image_path, ref_img_dir)
    # Cast to uint8 before building the PIL image that is written to disk.
    normI = Image.fromarray(normI.astype(np.uint8))
    normI.save(os.path.join(test_stain_normalized_images_path, name))

100%|██████████| 14/14 [01:01<00:00,  4.42s/it]


## Modifying the ground truth (GT)

In [12]:
import xml.etree.ElementTree as ET
from skimage.draw import polygon, polygon_perimeter

In [13]:
# https://github.com/ykrmm/monuseg/blob/main/dataset_utils/xml_to_mask.py
# https://github.com/bnsreenu/python_for_microscopists/blob/master/tips_tricks_31_generating_borders_around_objects.py
# a function to generate border
def generate_boarder(_mask, boarder_size=5, n_erosions=1):
    """Turn a foreground mask into a three-level mask: interior, border band, background.

    The mask is eroded with a 3x3 kernel, and the eroded mask is then dilated
    with a square kernel of side ``2 * boarder_size + 1``. Pixels that survive
    the erosion form the interior; pixels only reached by the dilation form
    the border band.

    Args:
        _mask: Foreground mask handed to ``cv2.erode``; any value greater than
            0 counts as foreground.
        boarder_size: Half-width used for the dilation kernel
            (kernel side = ``2 * boarder_size + 1``).
        n_erosions: Number of erosion iterations.

    Returns:
        Array holding 255 where the eroded mask is > 0, 127 where only the
        dilated mask is > 0, and 0 everywhere else.
    """
    # Shrink the objects first so touching objects get separated.
    erosion_kernel = np.ones((3, 3), dtype=np.uint8)
    eroded_mask = cv2.erode(_mask, erosion_kernel, iterations=n_erosions)

    # Grow the eroded objects again; the kernel side fixes the width of the band.
    kernel_size = 2 * boarder_size + 1
    dilation_kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8)
    dilated_mask = cv2.dilate(eroded_mask, dilation_kernel, iterations=1)

    # Use the same "> 0" foreground test as for the interior below. Comparing
    # against exactly 255 silently produced no border for masks whose
    # foreground value is not 255 (e.g. a 0/1 mask).
    border_band = np.where(dilated_mask > 0, 127, 0)

    # The interior takes precedence over the band.
    return np.where(eroded_mask > 0, 255, border_band)

def generate_mask(xml_file, shape, instance=False, color=False):
    """Rasterize the ``Region`` polygons of an XML annotation file into a mask.

    Every ``Region`` element is read as one polygon whose corners are the
    ``X`` / ``Y`` attributes of its ``Vertex`` elements. Regions are drawn in
    document order, so where polygons overlap the later region wins.

    Args:
        xml_file: Path (or file object) accepted by ``ElementTree.parse``.
        shape: Two-element sequence. The returned mask has shape
            ``(shape[0], shape[1])``; ``Y`` indexes its first axis and ``X``
            its second axis.
        instance: If true, region number ``k`` (1-based, document order) is
            drawn with value ``k``; otherwise every region is drawn with 1.
        color: If true, additionally return a colour mask in which every
            region adds one random value per channel (``np.random.rand``).

    Returns:
        The mask as a float array, or the tuple ``(mask, color_mask)`` when
        ``color`` is true, where ``color_mask`` has shape
        ``(shape[0], shape[1], 3)``.
    """
    root = ET.parse(xml_file).getroot()

    # One list of (x, y) corners per region, read in a single pass.
    regions_xy = [
        [(float(vertex.attrib['X']), float(vertex.attrib['Y']))
         for vertex in region.iter('Vertex')]
        for region in root.iter('Region')
    ]

    # The mask is drawn indexed as [x, y] and transposed on return.
    ncol, nrow = shape[0], shape[1]
    binary_mask = np.zeros((nrow, ncol))
    # The colour planes are only allocated when they are actually requested.
    color_mask = np.zeros((3, nrow, ncol)) if color else None

    for i, vertices in enumerate(regions_xy):
        if not vertices:
            # A region without vertices has nothing to draw. It still consumes
            # its index, so instance ids keep following the document order.
            continue
        coords = np.asarray(vertices, dtype=float)
        r1, c1 = polygon(coords[:, 0], coords[:, 1], shape=(nrow, ncol))
        binary_mask[r1, c1] = i + 1 if instance else 1
        if color:
            # One random value per channel, added only to this region's pixels.
            color_mask[:, r1, c1] += np.random.rand(3, 1)

    if color:
        return binary_mask.T, color_mask.T
    return binary_mask.T

def generate_ternary_mask(xml_file, shape):
    """Build a 0 / 127 / 255 mask from the ``Region`` polygons of an XML file.

    For every region the covered pixels are incremented by one and the pixels
    of the region's perimeter are reset to 0 right afterwards. Then:

    * pixels covered exactly once become 255 and are passed through
      ``generate_boarder``, which adds a band of 127 around them;
    * pixels covered more than once (overlapping regions) are set to 127.

    Args:
        xml_file: Path (or file object) accepted by ``ElementTree.parse``.
        shape: Two-element sequence. The returned mask has shape
            ``(shape[0], shape[1])``; ``Y`` indexes its first axis and ``X``
            its second axis.

    Returns:
        The transposed output of ``generate_boarder`` with the overlap pixels
        set to 127.
    """
    import warnings

    root = ET.parse(xml_file).getroot()

    # One list of (x, y) corners per region, read in a single pass.
    regions_xy = [
        [(float(vertex.attrib['X']), float(vertex.attrib['Y']))
         for vertex in region.iter('Vertex')]
        for region in root.iter('Region')
    ]

    # The mask is drawn indexed as [x, y] and transposed on return.
    ncol, nrow = shape[0], shape[1]
    ternary_mask = np.zeros((nrow, ncol))

    for index, vertices in enumerate(regions_xy):
        if not vertices:
            # Nothing to draw for a region without vertices.
            continue
        coords = np.asarray(vertices, dtype=float)
        xs, ys = coords[:, 0], coords[:, 1]
        try:
            r1, c1 = polygon(xs, ys, shape=(nrow, ncol))
            ternary_mask[r1, c1] += 1
            r2, c2 = polygon_perimeter(xs, ys, shape=(nrow, ncol))
            ternary_mask[r2, c2] = 0
        except Exception as exc:
            # Keep the best-effort behaviour (a bad region is skipped), but say
            # so, and no longer swallow KeyboardInterrupt / SystemExit.
            warnings.warn(
                "skipping region {} of {}: {!r}".format(index, xml_file, exc)
            )
            continue

    # Remember the overlaps before they are cleared from the count image.
    overlap = ternary_mask > 1
    ternary_mask[overlap] = 0
    ternary_mask[ternary_mask == 1] = 255
    ternary_mask = generate_boarder(ternary_mask)
    ternary_mask[overlap] = 127

    return ternary_mask.T

In [14]:
# Directories with the XML annotation files of the train and test splits.
train_xml_dir = "dataset/monuseg/original/train/annotations"
test_xml_dir = "dataset/monuseg/original/test/annotations"
# Shape handed to the mask generators for every annotation file.
# NOTE(review): assumes every tissue image is 1000x1000 pixels — confirm against the dataset.
shape = (1000, 1000)

In [15]:
# Output directories for the three kinds of training masks (created if missing):
# instance masks, binary masks and the "modified" masks.
train_instance_mask_path = "dataset/monuseg/stain_normalized/train/instance_masks"
create_path(train_instance_mask_path)
train_binary_mask_path = "dataset/monuseg/stain_normalized/train/binary_masks"
create_path(train_binary_mask_path)
train_modified_mask_path = "dataset/monuseg/stain_normalized/train/modified_masks"
create_path(train_modified_mask_path)

# The same three output directories for the test split.
test_instance_mask_path = "dataset/monuseg/stain_normalized/test/instance_masks"
create_path(test_instance_mask_path)
test_binary_mask_path = "dataset/monuseg/stain_normalized/test/binary_masks"
create_path(test_binary_mask_path)
test_modified_mask_path = "dataset/monuseg/stain_normalized/test/modified_masks"
create_path(test_modified_mask_path)

In [16]:
# Output directory for colour masks of the test split (created if missing).
# NOTE(review): none of the cells visible here writes into this directory — confirm it is still needed.
test_color_mask_path = "dataset/monuseg/stain_normalized/test/color_masks"
create_path(test_color_mask_path)

In [17]:
# Generate the binary, instance and ternary ("modified") masks for every training annotation.
# The file list is built once and handed to tqdm directly, so the progress total
# always equals the number of files iterated. glob("*") skips dot-entries that
# os.listdir would count. Sorting makes the order reproducible.
train_xml_paths = sorted(glob.glob(os.path.join(train_xml_dir, "*")))
for xml_path in tqdm(train_xml_paths):
    name = os.path.basename(xml_path)
    # Swap only the extension: str.replace("xml", ...) would also rewrite an
    # "xml" that happens to occur elsewhere in the file name.
    stem = os.path.splitext(name)[0]

    # Binary mask: the 0/1 mask is scaled to 0/255 before it is written as PNG.
    binary_mask = generate_mask(xml_path, shape)
    cv2.imwrite(os.path.join(train_binary_mask_path, stem + ".png"), binary_mask*255)

    # Instance mask: one id per region, stored as a NumPy array.
    instance_mask = generate_mask(xml_path, shape, instance=True)
    np.save(os.path.join(train_instance_mask_path, stem + ".npy"), instance_mask)

    # Ternary mask with values 0 / 127 / 255.
    modified_mask = generate_ternary_mask(xml_path, shape)
    cv2.imwrite(os.path.join(train_modified_mask_path, stem + ".png"), modified_mask)

100%|██████████| 30/30 [03:01<00:00,  6.03s/it]


In [18]:
# Generate the binary, instance and ternary ("modified") masks for every test annotation.
# The file list is built once and handed to tqdm directly, so the progress total
# always equals the number of files iterated. glob("*") skips dot-entries that
# os.listdir would count. Sorting makes the order reproducible.
test_xml_paths = sorted(glob.glob(os.path.join(test_xml_dir, "*")))
for xml_path in tqdm(test_xml_paths):
    name = os.path.basename(xml_path)
    # Swap only the extension: str.replace("xml", ...) would also rewrite an
    # "xml" that happens to occur elsewhere in the file name.
    stem = os.path.splitext(name)[0]

    # Binary mask: the 0/1 mask is scaled to 0/255 before it is written as PNG.
    binary_mask = generate_mask(xml_path, shape)
    cv2.imwrite(os.path.join(test_binary_mask_path, stem + ".png"), binary_mask*255)

    # Instance mask: one id per region, stored as a NumPy array.
    instance_mask = generate_mask(xml_path, shape, instance=True)
    np.save(os.path.join(test_instance_mask_path, stem + ".npy"), instance_mask)

    # Ternary mask with values 0 / 127 / 255.
    modified_mask = generate_ternary_mask(xml_path, shape)
    cv2.imwrite(os.path.join(test_modified_mask_path, stem + ".png"), modified_mask)

100%|██████████| 14/14 [00:40<00:00,  2.91s/it]
