In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys
# Put the parent directory on the import path so that packages located there
# can be imported by later cells (presumably the project's `ml_core` — confirm).
sys.path.append("../")

In [3]:
from pathlib import Path

In [4]:
# Base data directory, relative to the notebook's working directory.
# NOTE: `data_root` is re-bound to the "Collage" subdirectory in the HDF5
# section further down, so its value depends on which cells have already run.
data_root = Path("../../data/")

## Transform Multiclass Masks into Binary Masks

Changelog:
1. 2D -> 3D: masks are converted from 2D label arrays to 3D (colored) arrays before saving.
2. Put masks into class-specific subdirectories, while leaving the images unchanged.
3. `distal_tubules` (3) and `proximal_tubules` (5) are merged into "Tubules" (3).
4. Path changes: files are now saved in separate train/test directories, so the code that reads them must be updated accordingly.
5. Remove Arteriole subclass

In [6]:
import numpy as np
import pandas as pd
# Wildcard import kept in its original position so that name shadowing between
# it and the surrounding imports is unchanged (presumably the source of
# `generate_colorful_mask` used below — confirm).
from ml_core.utils.annotations import *
from PIL import Image
from pathlib import Path
from matplotlib.pyplot import imshow

In [7]:
# Class lookup table: one row per integer mask label, giving the class name
# and the hex color string associated with that class.
label_info = pd.DataFrame(
    [
        (0, "Background", "#000000"),
        (1, "Arteriole", "#ffffff"),
        (2, "Artery", "#ffffff"),
        (3, "Tubules", "#ffffff"),
        (4, "Glomerulus", "#ffffff"),
    ],
    columns=["label", "label_name", "color"],
)

In [11]:
def save_mask_to_subdir(label_info, mask_path, output_dir):
    """Split one multiclass mask image into per-class single-class mask images.

    The mask at ``mask_path`` is loaded, label 5 is folded into label 3
    (tubules merge), and for every positive label in ``label_info`` that occurs
    in the mask a single-class mask is rendered with ``generate_colorful_mask``
    and saved as ``<output_dir>/<label_name>/<mask file name>``.

    Args:
        label_info: DataFrame with at least ``label`` and ``label_name``
            columns. Rows whose label is <= 0 are skipped.
        mask_path: Path (``str`` or ``pathlib.Path``) of the multiclass mask.
        output_dir: Directory (``str`` or ``pathlib.Path``) under which the
            per-class subdirectories are created.

    Note:
        A class subdirectory is created even when the class does not occur in
        this particular mask; only the image file is skipped in that case.
    """
    mask_path = Path(mask_path)
    output_dir = Path(output_dir)

    # Copy the pixels into an array inside the context manager so the file
    # handle is released immediately instead of lingering until GC.
    with Image.open(mask_path) as mask_image:
        mask = np.array(mask_image)
    mask[mask == 5] = 3  # tubules class merge

    for row in label_info.itertuples():
        label, label_name = row.label, row.label_name
        if label <= 0:
            continue  # skip the background and omitted classes (< 0)

        subdir = output_dir / f"{label_name}"
        subdir.mkdir(exist_ok=True, parents=True)

        binary_mask = np.array(mask == label, dtype=np.uint8)

        if not np.any(binary_mask):
            # No pixel belongs to this class in this mask: nothing to save.
            continue

        # Sanity check. A mask entirely covered by one class has unique values
        # [1] only, which is valid, so test for a subset of {0, 1} rather than
        # for equality with [0, 1].
        unique_values = np.unique(binary_mask)
        assert set(unique_values.tolist()) <= {0, 1}, \
            f"For class {label_name}, unique labels detected: {list(unique_values)}"

        # In the binary mask the class of interest always carries label 1.
        single_class_label_info = pd.DataFrame({
            "label": [1],
            "label_name": [label_name],
            "color": ["#ffffff"]
        })

        mask_3d = generate_colorful_mask(binary_mask, single_class_label_info)

        Image.fromarray(mask_3d).save(subdir / mask_path.name)

    print(f"Finish generating class-level masks for {mask_path.name}.")

In [13]:
# Split every raw multiclass mask of each dataset into per-class masks,
# written next to the "Raw" directory of that dataset.
for dataset_type in ("train", "val"):
    print(f"Dealing with {dataset_type} dataset ...")
    output_dir = Path(f"../../data/Collage/ROI_data/{dataset_type}/")
    raw_data_root = Path(f"../../data/Collage/ROI_data/{dataset_type}/Raw/")

    raw_mask_paths = sorted(raw_data_root.glob("mask_*.png"))
    for mask_path in raw_mask_paths:
        save_mask_to_subdir(label_info, mask_path, output_dir)

Dealing with train dataset ...
Finish generating class-level masks for mask_train_1.png.
Finish generating class-level masks for mask_train_10.png.
Finish generating class-level masks for mask_train_100.png.
Finish generating class-level masks for mask_train_11.png.
Finish generating class-level masks for mask_train_12.png.
Finish generating class-level masks for mask_train_13.png.
Finish generating class-level masks for mask_train_14.png.
Finish generating class-level masks for mask_train_15.png.
Finish generating class-level masks for mask_train_16.png.
Finish generating class-level masks for mask_train_17.png.
Finish generating class-level masks for mask_train_18.png.
Finish generating class-level masks for mask_train_19.png.
Finish generating class-level masks for mask_train_2.png.
Finish generating class-level masks for mask_train_20.png.
Finish generating class-level masks for mask_train_200.png.
Finish generating class-level masks for mask_train_201.png.
Finish generating class-

## Generate HDF5 files

In [14]:
from sklearn.model_selection import train_test_split
from ml_core.preprocessing.patches_extraction import Extractor, crop_and_save_patches_to_hdf5
import re
from functools import reduce

In [15]:
def match_id_from_name(name, regex=r"\w+_(\d+).*\.png"):
    """Extract the integer id captured by ``regex`` from a file name.

    Returns the first capture group converted to ``int`` when ``regex``
    matches at the start of ``name``, and -1 when it does not match.
    """
    found = re.match(regex, name)
    return int(found.group(1)) if found else -1

In [16]:
def generate_train_test_paths(class_name):
    """Collect index-aligned image/mask paths of one class for train and test.

    For each of the ``train`` and ``test`` subdirectories of the module-level
    ``data_root``, every ``collage_*.png`` image is paired with the
    ``mask_*.png`` file in ``<split>/<class_name>/`` that carries the same
    numeric id (as parsed by ``match_id_from_name``). Images without a mask of
    the same id, and files whose id cannot be parsed (-1), are dropped.

    This replaces an earlier version that drew a random 80/20 split with
    ``train_test_split(..., random_state=42)``; the split now comes from the
    directory layout.

    Args:
        class_name: Name of the class subdirectory holding the masks.

    Returns:
        ``(train_images, train_masks, test_images, test_masks)`` — four lists
        of paths, each image/mask pair sharing an index, ordered by id.

    Raises:
        AssertionError: If any of the four lists is empty.
    """
    collected = {"train": ([], []), "test": ([], [])}

    for dataset, (images, masks) in collected.items():
        split_dir = data_root / dataset
        image_paths = {match_id_from_name(p.name): p
                       for p in split_dir.glob("collage_*.png")}
        mask_paths = {match_id_from_name(p.name): p
                      for p in (split_dir / class_name).glob("mask_*.png")}

        # glob() order is filesystem dependent; iterate over sorted ids so the
        # returned lists are reproducible between runs and machines.
        for image_id in sorted(image_paths.keys() & mask_paths.keys()):
            if image_id == -1:
                continue  # name did not match the id pattern
            images.append(image_paths[image_id])
            masks.append(mask_paths[image_id])

    train_images, train_masks = collected["train"]
    test_images, test_masks = collected["test"]

    # The message must be a string: `assert cond, print(...)` would evaluate
    # to an AssertionError carrying None.
    assert all((train_images, train_masks, test_images, test_masks)), \
        f"One of train/test images/masks is empty for class {class_name}."

    return train_images, train_masks, test_images, test_masks

In [17]:
def debox_pairs(pairs):
    """Split an iterable of pairs into two index-aligned lists.

    ``pairs`` is traversed exactly once, so one-shot iterables such as
    ``zip(...)`` or generators are handled correctly (iterating twice would
    leave the second list empty for those).

    Args:
        pairs: Iterable of indexable items with at least two elements.

    Returns:
        ``(firsts, seconds)`` where ``firsts[i]`` and ``seconds[i]`` are
        elements 0 and 1 of the i-th pair. Both are empty for empty input.
    """
    firsts, seconds = [], []
    for pair in pairs:
        firsts.append(pair[0])
        seconds.append(pair[1])
    return firsts, seconds

In [18]:
def export_to_hdf5(output_root, train_images, train_masks, test_images, test_masks,
                   class_name=None):
    """Crop patches for one class and write train/val HDF5 files.

    Writes ``HE_<class_name>_train.h5`` and ``HE_<class_name>_val.h5`` into
    ``output_root`` using an ``Extractor`` configured from the
    ``Collage_<class_name>`` config section.

    Args:
        output_root: Destination directory (``str`` or ``pathlib.Path``);
            created if it does not exist.
        train_images: Image paths for the training file.
        train_masks: Mask paths aligned with ``train_images``.
        test_images: Image paths for the validation file.
        test_masks: Mask paths aligned with ``test_images``.
        class_name: Class to export. When omitted, the notebook-level global
            ``class_name`` is used, which is what this function relied on
            implicitly before the parameter existed.

    Raises:
        NameError: If ``class_name`` is not passed and no global of that name
            is defined.
    """
    if class_name is None:
        # Backward compatibility with callers that depended on the leaked
        # global loop variable; prefer passing class_name explicitly.
        try:
            class_name = globals()["class_name"]
        except KeyError:
            raise NameError(
                "class_name was not passed and no global 'class_name' is defined"
            ) from None

    output_root = Path(output_root)
    # Make sure the destination exists before handing paths to the writer.
    output_root.mkdir(parents=True, exist_ok=True)

    extractor = Extractor(config_section_name=f"Collage_{class_name}")

    splits = (("train", train_images, train_masks),
              ("val", test_images, test_masks))
    for split, images, masks in splits:
        crop_and_save_patches_to_hdf5(output_root / f"HE_{class_name}_{split}.h5",
                                      images=images,
                                      masks=masks,
                                      extractor=extractor)

In [19]:
# Re-point `data_root` at the Collage dataset (it was first set to the parent
# "../../data/" directory near the top of the notebook). The cells below read
# this value.
data_root = Path("../../data/Collage")
# Destination directory for the generated HDF5 patch files.
output_root = Path("../../data/Collage/hdf5_data")

In [21]:
# Build one HDF5 patch file per (class, split) from the class-level masks and
# the raw ROI images they belong to.
for class_name in ["Tubules", "Glomerulus", "Artery", "Arteriole"]:
    extractor = Extractor(config_section_name=f"Collage_{class_name}")

    for split in ("train", "val"):
        mask_root = data_root / f"ROI_data/{split}/{class_name}"
        roi_root = data_root / f"ROI_data/{split}/Raw"

        # Sorted so that files are handed to the extractor in a reproducible
        # order; glob() order depends on the filesystem.
        fpaths = sorted(mask_root.glob("mask_*.png"))
        roi_mask_mappings = {"roi": [], "mask": []}

        for fpath in fpaths:
            # "mask_<x>.png" pairs with "collage_<x>.png": swap the prefix only.
            roi_name = fpath.name.replace("mask_", "collage_", 1)
            roi_path = roi_root / roi_name
            if roi_path.exists():
                roi_mask_mappings["mask"].append(fpath)
                roi_mask_mappings["roi"].append(roi_path)
            else:
                print(f"ROI for {fpath} is not found.")

        # The dict itself is always truthy (it has two keys), so check the
        # collected pairs instead.
        assert roi_mask_mappings["roi"], \
            f"No ROI/mask pairs found for class {class_name} in the {split} split ({mask_root})."

        hdf5_name = f"patch_{extractor.patch_size}/{class_name}_{split}.h5"
        output_path = output_root / hdf5_name
        output_path.parent.mkdir(parents=True, exist_ok=True)

        crop_and_save_patches_to_hdf5(output_path,
                                      images=roi_mask_mappings["roi"],
                                      masks=roi_mask_mappings["mask"],
                                      extractor=extractor)

        print(f"Finish saving {hdf5_name}.")

Finish saving patch_256/Tubules_train.h5.
Finish saving patch_256/Tubules_val.h5.
Finish saving patch_256/Glomerulus_train.h5.
Finish saving patch_256/Glomerulus_val.h5.
Finish saving patch_256/Artery_train.h5.
Finish saving patch_256/Artery_val.h5.
Finish saving patch_256/Arteriole_train.h5.
Finish saving patch_256/Arteriole_val.h5.
