In [None]:
from wholeslidedata.annotation.wholeslideannotation import WholeSlideAnnotation
from wholeslidedata.image.wholeslideimage import WholeSlideImage
from wholeslidedata.annotation.types import PolygonAnnotation as Polygon
from matplotlib import pyplot as plt
import numpy as np
import os
from tqdm import tqdm

import cv2

from py.registration import get_3p_transform, get_align_transform, align
from py.helpers import get_all_annotated_cases, get_outlines, get_area, get_patch, \
    get_sub_areas, patch_empty, concat_one

# Dataset folder used by the cells below: it is listed for .tiff/.xml files and is
# also the base folder for the "visualisation" and "patches*" output folders.
# NOTE(review): this is a raw string, so each `\\` stays a literal double backslash
# in the path. Windows normally tolerates repeated separators - confirm this is intended.
ROOT = r'L:\\basic\\divg\\PATH-COMPAI\\datasets\\Barrett\\Barrett ASL 21-11-22\\'
# Second dataset folder; in the visible cells it is only passed as `root=` in a
# commented-out example call.
ROOT_ADJACENT = r'L:\\basic\\divg\\PATH-COMPAI\\datasets\\Barrett\\unaligned_HE_p53\\'

In [None]:
import shutil

def add_template_xmls(root=None):
    """Give every usable ``.tiff`` in the dataset folder an annotation ``.xml``.

    For each file ``<name>.tiff`` in ``root`` that has no ``<name>.xml`` next to it,
    ``_template.xml`` (from the same folder) is copied to ``<name>.xml``. Slides whose
    name contains ``"BAD"`` or ``"HEX"`` are skipped. The list of names that receive
    a new xml is printed.

    Args:
        root: folder to scan; defaults to the module-level ``ROOT`` when ``None``.

    Raises:
        FileNotFoundError: if ``root`` or ``root/_template.xml`` does not exist
            (raised by ``os.listdir`` / ``shutil.copyfile``).
    """
    if root is None:
        root = ROOT
    path_to_template_xml = os.path.join(root, r'_template.xml')

    # Split on the real extension so that names containing extra dots
    # (e.g. "case.v2.tiff") keep their full stem, and so that files which merely
    # contain ".xml"/".tiff" somewhere in their name are not picked up.
    xmls = set()
    tiffs = []
    for nm in os.listdir(root):
        stem, ext = os.path.splitext(nm)
        if ext == ".xml":
            xmls.add(stem)
        elif ext == ".tiff":
            tiffs.append(stem)

    tiffs_without_xml = [tiff for tiff in tiffs if tiff not in xmls and "BAD" not in tiff and "HEX" not in tiff]
    print(tiffs_without_xml)
    for tiff in tiffs_without_xml:
        shutil.copyfile(path_to_template_xml, os.path.join(root, tiff+".xml"))

In [None]:
def plot_sub_areas(wsi, sub_areas, save_path="", spacing=2.0):
    """Draw a grid of sub-area patches, each labelled with its mean pixel value.

    Args:
        wsi: slide object; only ``wsi.get_patch(*sub_area, spacing)`` is used.
        sub_areas: non-empty list of rows, each row a list of sub-areas. Every
            sub-area is unpacked into ``wsi.get_patch``. The number of grid columns
            is taken from the first row.
        save_path: when non-empty the figure is written to this path and closed;
            when empty the figure is left open (e.g. for inline notebook display).
        spacing: spacing forwarded to ``wsi.get_patch``.
    """
    nrows = len(sub_areas)
    ncols = len(sub_areas[0])
    # squeeze=False keeps the axes array 2-D; without it a grid with a single row
    # or column comes back as a 1-D array (or a bare Axes) and ax[i,j] fails.
    fig, ax = plt.subplots(nrows, ncols, figsize=(ncols*2, nrows*2), squeeze=False)
    for i, sub_area_row in enumerate(sub_areas):
        for j, sub_area in enumerate(sub_area_row):
            sub_patch = wsi.get_patch(*sub_area, spacing)
            patch_mean = sub_patch.mean()
            # Patches whose mean exceeds 223 get a red label, all others a black one.
            color = "red" if patch_mean > 223 else "black"
            ax[i,j].text(105,128, f"{patch_mean:.2f}", c=color)
            ax[i,j].imshow(sub_patch)
            ax[i,j].axis("off")
    if save_path:
        plt.savefig(save_path, bbox_inches="tight")
        plt.close(fig)

# casename = "RASL-04"
# cases = get_all_annotated_cases()
# plot_sub_areas(WholeSlideImage(cases[casename]["HE"]["wsi"]), get_sub_areas(get_area( get_outlines(WholeSlideAnnotation(cases[casename]["HE"]["wsa"]))[3], spacing)))

def save_all_sub_areas_plots(spacing, root=ROOT):
    """Save one sub-area grid plot per biopsy outline, for every annotated case.

    Files are written as
    ``<ROOT>/visualisation/sub_areas_<casename>_<biopsy_nr>_<coupe>.png``.

    Args:
        spacing: spacing used both to compute the areas and to read the patches.
        root: dataset folder passed to ``get_all_annotated_cases``.
    """
    # NOTE(review): plots are written under ROOT even when `root` points elsewhere;
    # kept as in the original - confirm that this is intended.
    out_dir = os.path.join(ROOT, "visualisation")
    # savefig does not create missing folders, so make sure the target exists.
    os.makedirs(out_dir, exist_ok=True)

    for casename, case in tqdm(get_all_annotated_cases(root).items()):
        for coupe, paths in case.items():
            outlines = get_outlines(WholeSlideAnnotation(paths["wsa"]))
            if len(outlines) == 0:
                continue
            # Open the slide once per coupe instead of once per outline.
            wsi = WholeSlideImage(paths["wsi"])
            for biopsy_nr, outline in enumerate(outlines):
                plot_sub_areas(
                    wsi,
                    get_sub_areas(get_area(outline, spacing), spacing=spacing),
                    save_path=os.path.join(out_dir, f"sub_areas_{casename}_{biopsy_nr}_{coupe}.png"),
                    spacing=spacing)

# save_all_sub_areas_plots(2, root=ROOT_ADJACENT)

In [None]:
# casename = "RASL-15"
# biopsy_nr = 1
# case = get_all_annotated_cases()[casename]
# wsi = {"p53": WholeSlideImage(case["p53"]["wsi"]),
#         "HE": WholeSlideImage(case["HE" ]["wsi"])}
# outlines = {"p53": get_outlines(WholeSlideAnnotation(case["p53"]["wsa"])),
#                 "HE": get_outlines(WholeSlideAnnotation(case["HE" ]["wsa"]))}

# transform = get_align_transform(wsi['p53'], wsi['HE'], outlines['p53'][biopsy_nr], outlines['HE'][biopsy_nr], spacing, 10000, plotting=True)

# HE_outline = (transform @ concat_one(outlines['p53'][biopsy_nr]).T).T

# biopsy_img1 = get_patch(wsi['p53'], outlines['p53'][biopsy_nr], spacing)
# biopsy_img2 = get_patch(wsi['HE'],  HE_outline,                 spacing)

# fig, ax = plt.subplots(3,1,figsize=(15,15))
# ax[0].imshow(biopsy_img1)
# ax[2].imshow(biopsy_img1)

# ax[1].imshow(biopsy_img2)
# ax[2].imshow(biopsy_img2, alpha=0.5)

def save_all_align_plots(n_keypoints=10000, spacing=2.0, root=ROOT):
    """Save the p53 biopsy, the registered HE biopsy and their overlay for every case.

    For each p53 outline a transform towards the HE slide is obtained from
    ``get_align_transform``; the p53 outline is mapped through it to read the
    matching HE region. Three files are written per biopsy under
    ``<ROOT>/visualisation``: ``aligned_<case>_<nr>_p53.png``,
    ``aligned_<case>_<nr>_HE.png`` and ``aligned_<case>_<nr>__overlay.png``.

    Args:
        n_keypoints: forwarded positionally to ``get_align_transform`` (after spacing).
        spacing: spacing used for registration and for reading the patches.
        root: dataset folder passed to ``get_all_annotated_cases``.
    """
    # NOTE(review): plots are written under ROOT even when `root` points elsewhere;
    # kept as in the original - confirm that this is intended.
    out_dir = os.path.join(ROOT, "visualisation")
    # savefig does not create missing folders, so make sure the target exists.
    os.makedirs(out_dir, exist_ok=True)

    for casename, case in get_all_annotated_cases(root).items():
        wsi = {"p53": WholeSlideImage(case["p53"]["wsi"]),
                "HE": WholeSlideImage(case["HE" ]["wsi"])}
        outlines = {"p53": get_outlines(WholeSlideAnnotation(case["p53"]["wsa"])),
                     "HE": get_outlines(WholeSlideAnnotation(case["HE" ]["wsa"]))}

        # Outlines are paired by index, so the HE annotation must contain at least
        # as many outlines as the p53 annotation.
        for biopsy_nr, p53_outline in enumerate(outlines["p53"]):
            transform, _, _ = get_align_transform(wsi['p53'], wsi['HE'], p53_outline, outlines['HE'][biopsy_nr], spacing, n_keypoints, plotting=False)
            # The HE region is the p53 outline mapped through the transform, not
            # the HE annotation itself.
            he_outline = (transform @ concat_one(p53_outline).T).T

            biopsy_imgs = {
                "p53": get_patch(wsi["p53"], p53_outline, spacing),
                "HE":  get_patch(wsi["HE"],  he_outline,  spacing),
            }

            for coupe in ["p53", "HE"]:
                _save_image_layers(
                    [(biopsy_imgs[coupe], None)],
                    os.path.join(out_dir, f"aligned_{casename}_{biopsy_nr}_{coupe}.png"))

            _save_image_layers(
                [(biopsy_imgs["p53"], None), (biopsy_imgs["HE"], 0.5)],
                os.path.join(out_dir, f"aligned_{casename}_{biopsy_nr}__overlay.png"))


def _save_image_layers(layers, save_path):
    """Draw ``(image, alpha)`` layers on one axis-free figure, save it and close it."""
    fig, ax = plt.subplots(1,1)
    for img, alpha in layers:
        ax.imshow(img, alpha=alpha)
    ax.axis("Off")
    plt.savefig(save_path, bbox_inches="tight")
    plt.close(fig)

# save_all_align_plots(10000, 0.25)

In [None]:
def plot_compare_sub_areas(wsi1, wsi2, annotations1, annotations2, spacing=2.0, realign_true=True, register_method="control_points"):
    """Show the central sub-area of every biopsy on both slides, plus an overlay.

    Slide 1 should be the p53, as that is the one with biopsy outlines. One figure
    row is drawn per polygon in ``annotations1``: column 0 is the patch from
    ``wsi1``, column 1 the corresponding patch from ``wsi2``, column 2 both drawn
    on top of each other (the second at alpha 0.5). The figure is left open.

    Args:
        wsi1, wsi2: slide objects; only ``get_patch(*sub_area, spacing)`` is used here.
        annotations1, annotations2: annotation lists; ``Polygon`` entries supply the
            outlines. ``annotations1`` must contain at least one polygon.
        spacing: spacing for area computation, patch reading and registration.
        realign_true: when true the second patch is read with a padding of 50 and
            then registered onto the first patch with ``align``.
        register_method: ``"control_points"`` (``get_3p_transform``) or
            ``"keypoint_matching"`` (``get_align_transform`` on the first outline
            of each slide). Any other value prints a message and returns.
    """
    outlines1 = [a.coordinates for a in annotations1 if isinstance(a, Polygon)]

    if register_method == "control_points":
        transform = get_3p_transform(annotations1, annotations2, spacing,
            # [0,2,4]
        )
        scale = 1/spacing*0.25
    elif register_method == "keypoint_matching":
        outlines2 = [a.coordinates for a in annotations2 if isinstance(a, Polygon)]
        scale = 1
        # One transform, estimated from the first biopsy only, is reused for all rows.
        transform,_,_ = get_align_transform(wsi1, wsi2, outlines1[0], outlines2[0], spacing)
    else:
        print("invalid register method")
        return

    # squeeze=False keeps the axes array 2-D when there is only a single biopsy;
    # otherwise ax[i,j] fails on the squeezed 1-D array.
    fig, ax = plt.subplots(len(outlines1), 3, figsize=(15,len(outlines1)*5), squeeze=False)

    for i, biopsy in enumerate(outlines1):
        sub_patch_prev = None
        for j, wsi in enumerate([wsi1, wsi2]):
            realign = j == 1 and realign_true
            # Extra margin on the second slide gives the registration room to shift.
            padding = 50 if realign else 0

            # NOTE(review): get_sub_areas is called without `spacing` here, unlike
            # the other callers in this notebook - confirm the default is intended.
            sub_areas = get_sub_areas(get_area(biopsy, spacing), padding=padding)
            sub_area_row = sub_areas[len(sub_areas)//2]      # middle row
            sub_area = sub_area_row[len(sub_area_row)//2]    # middle of middle row

            sub_patch = wsi.get_patch(*sub_area, spacing)

            if realign:
                sub_patch,_,_ = align(sub_patch, sub_patch_prev)

            ax[i,j].imshow(sub_patch)
            ax[i,2].imshow(sub_patch, alpha=1-j/2)
            ax[i,j].axis("Off")

            if j == 0:
                # Map the outline onto the second slide. The transform is applied
                # in scaled coordinates and the result is scaled back afterwards.
                biopsy = (transform @ concat_one(biopsy*scale).T).T / scale
                sub_patch_prev = sub_patch.copy()

        ax[i,2].axis("Off")

# casename = "ASL-0049"
# casename = "RASL-20"
# case = get_all_annotated_cases()[casename]
# wsi = {"p53": WholeSlideImage(case["p53"]["wsi"]),
#         "HE": WholeSlideImage(case["HE" ]["wsi"])}
# wsa = {"p53": WholeSlideAnnotation(case["p53"]["wsa"]),
#         "HE": WholeSlideAnnotation(case["HE" ]["wsa"])}
# plot_compare_sub_areas(wsi['p53'], wsi['HE'], wsa['p53'].annotations, wsa['HE'].annotations, spacing=2.0, realign_true=True, register_method="keypoint_matching")

In [None]:
def save_all_aligned_patches(n_keypoints=10000, size=256, padding=None, spacing=2.0):
    """Cut every annotated biopsy into HE patches and save them with registered p53 patches.

    Per biopsy a global HE -> p53 transform is estimated, the HE biopsy area is
    tiled into sub-areas, and for each non-empty HE patch the p53 patch at the
    transformed location is registered onto it with ``align``. Patches whose
    alignment message contains ``"NC"`` (not covered, per the original comment)
    are retried with the next, larger padding; on the last padding they are kept.

    Output goes to ``<ROOT>/patches_s<spacing>_<size>[_v<k>]``. Pairs whose message
    is not ``"success"`` or whose ``det`` lies outside (0.8, 1.2) go to the matching
    ``iffy_patches...`` folder instead. For each pair three files are written:
    ``<case>_<biopsy>_<patch>_HE.png``, ``..._p53.png`` and ``...__overlay.png``.

    Args:
        n_keypoints: forwarded to ``get_align_transform`` as ``n_features``.
        size: patch size forwarded to ``get_sub_areas`` and ``align``.
        padding: optional ``{"HE": int, "p53": int}`` tried before the built-in
            padding schedule.
        spacing: spacing used for area computation and patch reading.
    """
    paddings = [
        {"HE": 2,       "p53": size//2},
        {"HE": size//2,  "p53": size//2},
        {"HE": size//2,  "p53": size},
        {"HE": size,    "p53": size},
        {"HE": size,    "p53": size*2},
    ]
    if padding:
        paddings.insert(0, padding)

    # If the default output folder already exists, write to a new "_v<k>" variant,
    # where k is the number of entries in ROOT that start with the default name.
    save_dir_ext = f"_s{spacing}_{size}"
    save_dir = "patches" + save_dir_ext
    if os.path.exists(os.path.join(ROOT, save_dir)):
        n_existing = len([True for x in os.listdir(ROOT) if x.startswith(save_dir)])
        save_dir_ext = f"_s{spacing}_{size}_v{n_existing}"

    for casename, case in tqdm(list(get_all_annotated_cases().items())):
        wsi = {"p53": WholeSlideImage(case["p53"]["wsi"]),
                "HE": WholeSlideImage(case["HE" ]["wsi"])}
        outlines = {"p53": get_outlines(WholeSlideAnnotation(case["p53"]["wsa"])),
                     "HE": get_outlines(WholeSlideAnnotation(case["HE" ]["wsa"]))}

        # Mind you, HE is handled before p53 here
        for biopsy_nr, outline in enumerate(outlines["HE"]):
            # The biopsy-level registration always runs at spacing 8.0, independent
            # of the `spacing` used for the patches.
            transform, biop_msg, biop_det = get_align_transform(wsi['HE'], wsi['p53'], outlines['HE'][biopsy_nr], outlines['p53'][biopsy_nr], n_features=n_keypoints, spacing=8.0, plotting=False)
            # patch_nr -> (patch, msg, det) for each coupe
            subpatches = {"p53":{},"HE":{}}

            for pad_idx, pad_cfg in enumerate(paddings):
                is_last_padding = pad_idx == len(paddings) - 1
                HE_pad = pad_cfg["HE"]//2
                # Crop that strips HE_pad pixels from every side. `or None` keeps
                # the whole patch when HE_pad is 0 (a plain [0:-0] would be empty).
                he_crop = slice(HE_pad, -HE_pad or None)
                n_patches = 0
                for coupe in ["HE", "p53"]:

                    biopsy_area = get_area(outline, spacing)
                    if coupe == "p53":
                        outline_p53 = (transform @ concat_one(outlines["HE"][biopsy_nr]).T).T

                        x, y = get_area(outline_p53, spacing)[:2]
                        biopsy_area = (x, y, biopsy_area[2], biopsy_area[3]) # only change the coordinates, not width height

                    sub_areas = get_sub_areas(biopsy_area, size, padding=pad_cfg[coupe], spacing=spacing)

                    patch_nr = 0
                    for sub_area_row in sub_areas:
                        for sub_area in sub_area_row:
                            # Already settled in an earlier padding round.
                            if patch_nr in subpatches[coupe]:
                                patch_nr += 1
                                continue

                            sub_patch = wsi[coupe].get_patch(*sub_area, spacing)

                            if coupe == "HE":
                                msg, det = "no transform", 1
                                # Emptiness is judged on the patch without its padding.
                                if patch_empty(sub_patch[he_crop, he_crop]):
                                    patch_nr += 1
                                    continue
                            elif coupe == "p53":
                                if patch_nr in subpatches["HE"]:
                                    sub_patch, msg, det = align(sub_patch, subpatches["HE"][patch_nr][0], warp="affine", size=size,
                                                                # plotting=True
                                                                )

                                    # If not covered: remove patch_nr from subpatches["HE"] and continue
                                    # (it is re-read with the next padding), unless this is the last padding.
                                    if "NC" in msg and not is_last_padding:
                                        del subpatches["HE"][patch_nr]
                                        patch_nr += 1
                                        continue

                                    # Remove padding from HE
                                    subpatches["HE"][patch_nr] = (subpatches["HE"][patch_nr][0][he_crop, he_crop], "no transform", 1)
                                else:
                                    # No HE counterpart (empty HE patch): skip the p53 patch too.
                                    patch_nr += 1
                                    continue

                            subpatches[coupe][patch_nr] = (sub_patch, msg, det)
                            patch_nr += 1
                    n_patches = patch_nr
                # Stop early once every grid position has an aligned p53 patch.
                if len(subpatches["p53"]) == n_patches:
                    break

            for patch_nr in subpatches["HE"]:

                # Quality of the pair is taken from the p53 alignment result.
                _, msg, det = subpatches["p53"][patch_nr]

                # Create new directory if new spacing/size
                save_dir = "patches"
                if msg != "success" or not 0.8 < det < 1.2:
                    save_dir = "iffy_patches"
                save_dir += save_dir_ext
                directory = os.path.join(ROOT, save_dir)
                if not os.path.exists(directory):
                    os.makedirs(directory)
                    print("Created directory: ", directory)

                # Save the patches and create an overlay
                fig, ax = plt.subplots(1,1)
                alpha = 1
                for coupe in ["p53", "HE"]:
                    sub_patch = subpatches[coupe][patch_nr][0]
                    save_path = os.path.join(directory, f"{casename}_{biopsy_nr}_{patch_nr}_{coupe}.png")
                    # Channel order is reversed before handing the array to cv2.imwrite.
                    cv2.imwrite(save_path, sub_patch[:,:,::-1])

                    ax.imshow(sub_patch, alpha=alpha)
                    alpha = 0.5

                ax.text(50,50,f"{msg}, {det:.2f}")
                ax.axis("Off")

                save_path = os.path.join(directory, f"{casename}_{biopsy_nr}_{patch_nr}__overlay.png")

                plt.savefig(save_path, bbox_inches="tight", pad_inches=0)
                plt.close(fig)

# Run the full patch export with size=2048 at spacing=0.25. The custom padding
# is commented out, so only the built-in padding schedule is tried.
# This call reads every annotated case and writes image files under ROOT.
save_all_aligned_patches(size=2048, spacing=0.25, 
                        #  padding={"HE": 256, "p53": 700}
                         )

Make dataset anonymous

In [None]:
# data_path = os.path.join(ROOT, "patches")
# patch_paths = [os.path.join(ROOT, "patches", nm) for nm in os.listdir(data_path) if "overlay" not in nm]
# HE_patch_paths = [p for p in patch_paths if "HE" in p]

# for i, path in tqdm(enumerate(HE_patch_paths)):
#     # shutil.copy(path, os.path.join(ROOT, "patch_dataset_s1_512", "HE", f"{i}.jpeg"))
#     # shutil.copy(path.replace("HE", "p53"), os.path.join(ROOT, "patch_dataset", "p53", f"{i}.jpeg"))

#     patch_name = os.path.basename(path).replace('_HE.png', '')
#     shutil.copy(path, os.path.join(ROOT, "patch_dataset", "HE", f"{patch_name}.jpeg"))
#     shutil.copy(path.replace("HE", "p53"), os.path.join(ROOT, "patch_dataset", "p53", f"{patch_name}.jpeg"))