In [1]:
import openslide
from openslide import lowlevel as openslide_ll
from ctypes import c_uint32, POINTER, cast, addressof, c_void_p, byref, c_uint8, sizeof, c_uint16
import numpy as np
from typing import Tuple, Optional
from loguru import logger
import re
import cv2
from multiprocessing import RawArray
import concurrent
from concurrent.futures import ProcessPoolExecutor
from concurrent import futures
from tqdm import tqdm
from pathlib import Path
import dask.array as da
import dask
from dask.distributed import as_completed, futures_of
from dask.distributed import Client, LocalCluster
from functools import partial
from time import time

from tsl8.progress import tqdm_dask
from tsl8.canny import is_background
from tsl8.slide import load_slide, MPPExtractionError

# Input slide and output root (absolute paths specific to the machine this was run on).
slide_file = "/pathology/camelyon16/training/tumor/tumor_001.tif"
output_path = Path("/data/tsl8_out")
# Per-slide output directory, named after the slide file's stem; created eagerly.
slide_output_dir = output_path / Path(slide_file).stem
slide_output_dir.mkdir(exist_ok=True, parents=True)
# When True, a later cell stops early if this slide's status file already exists.
check_status = False

# Side length handed to the tiling helpers as the patch size (presumably pixels — TODO confirm).
patch_size = 512

# Alternative configuration, kept for reference:
# target_mpp = 4.0
# level = None  # None means auto select
# patch_to_chunk_multiplier = 2

# Active configuration; target_mpp and level are forwarded to load_slide in a later cell.
target_mpp = 0.5
level = 0
# Slide chunks are requested with side length patch_size * patch_to_chunk_multiplier.
patch_to_chunk_multiplier = 16

In [2]:
# Start a local dask.distributed cluster with 32 single-threaded workers, each limited
# to 64 GB of memory.
# NOTE(review): these numbers are sized for a very large host — adjust before running elsewhere.
client = Client(n_workers=32, threads_per_worker=1, memory_limit="64GB")
# Display the client summary (dashboard address, workers) as the cell output.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 32
Total threads: 32,Total memory: 1.86 TiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38295,Workers: 32
Dashboard: http://127.0.0.1:8787/status,Total threads: 32
Started: Just now,Total memory: 1.86 TiB

0,1
Comm: tcp://127.0.0.1:35449,Total threads: 1
Dashboard: http://127.0.0.1:44751/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:44405,
Local directory: /tmp/dask-scratch-space/worker-x4xw930p,Local directory: /tmp/dask-scratch-space/worker-x4xw930p

0,1
Comm: tcp://127.0.0.1:41613,Total threads: 1
Dashboard: http://127.0.0.1:46273/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:37557,
Local directory: /tmp/dask-scratch-space/worker-t5m4k8ua,Local directory: /tmp/dask-scratch-space/worker-t5m4k8ua

0,1
Comm: tcp://127.0.0.1:38563,Total threads: 1
Dashboard: http://127.0.0.1:45997/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:44115,
Local directory: /tmp/dask-scratch-space/worker-2n27rd48,Local directory: /tmp/dask-scratch-space/worker-2n27rd48

0,1
Comm: tcp://127.0.0.1:38135,Total threads: 1
Dashboard: http://127.0.0.1:38447/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:42779,
Local directory: /tmp/dask-scratch-space/worker-rukptrpw,Local directory: /tmp/dask-scratch-space/worker-rukptrpw

0,1
Comm: tcp://127.0.0.1:41193,Total threads: 1
Dashboard: http://127.0.0.1:46087/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:34255,
Local directory: /tmp/dask-scratch-space/worker-5o16crl2,Local directory: /tmp/dask-scratch-space/worker-5o16crl2

0,1
Comm: tcp://127.0.0.1:46141,Total threads: 1
Dashboard: http://127.0.0.1:42785/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38593,
Local directory: /tmp/dask-scratch-space/worker-st2an8ri,Local directory: /tmp/dask-scratch-space/worker-st2an8ri

0,1
Comm: tcp://127.0.0.1:37443,Total threads: 1
Dashboard: http://127.0.0.1:43749/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:43649,
Local directory: /tmp/dask-scratch-space/worker-d651i0ab,Local directory: /tmp/dask-scratch-space/worker-d651i0ab

0,1
Comm: tcp://127.0.0.1:45035,Total threads: 1
Dashboard: http://127.0.0.1:42487/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38439,
Local directory: /tmp/dask-scratch-space/worker-1qoug49k,Local directory: /tmp/dask-scratch-space/worker-1qoug49k

0,1
Comm: tcp://127.0.0.1:46727,Total threads: 1
Dashboard: http://127.0.0.1:40095/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:34751,
Local directory: /tmp/dask-scratch-space/worker-nryip1lq,Local directory: /tmp/dask-scratch-space/worker-nryip1lq

0,1
Comm: tcp://127.0.0.1:46791,Total threads: 1
Dashboard: http://127.0.0.1:38197/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38087,
Local directory: /tmp/dask-scratch-space/worker-hl6hwm9n,Local directory: /tmp/dask-scratch-space/worker-hl6hwm9n

0,1
Comm: tcp://127.0.0.1:33255,Total threads: 1
Dashboard: http://127.0.0.1:32823/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:37139,
Local directory: /tmp/dask-scratch-space/worker-ehabxwz6,Local directory: /tmp/dask-scratch-space/worker-ehabxwz6

0,1
Comm: tcp://127.0.0.1:37233,Total threads: 1
Dashboard: http://127.0.0.1:35739/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:36489,
Local directory: /tmp/dask-scratch-space/worker-t6cf0zrb,Local directory: /tmp/dask-scratch-space/worker-t6cf0zrb

0,1
Comm: tcp://127.0.0.1:44617,Total threads: 1
Dashboard: http://127.0.0.1:45711/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:39123,
Local directory: /tmp/dask-scratch-space/worker-l17am47r,Local directory: /tmp/dask-scratch-space/worker-l17am47r

0,1
Comm: tcp://127.0.0.1:33141,Total threads: 1
Dashboard: http://127.0.0.1:37173/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:40821,
Local directory: /tmp/dask-scratch-space/worker-44xfmhdp,Local directory: /tmp/dask-scratch-space/worker-44xfmhdp

0,1
Comm: tcp://127.0.0.1:43385,Total threads: 1
Dashboard: http://127.0.0.1:37821/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38463,
Local directory: /tmp/dask-scratch-space/worker-51ye454d,Local directory: /tmp/dask-scratch-space/worker-51ye454d

0,1
Comm: tcp://127.0.0.1:36013,Total threads: 1
Dashboard: http://127.0.0.1:46795/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38147,
Local directory: /tmp/dask-scratch-space/worker-_hn41gxi,Local directory: /tmp/dask-scratch-space/worker-_hn41gxi

0,1
Comm: tcp://127.0.0.1:38703,Total threads: 1
Dashboard: http://127.0.0.1:38663/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:40633,
Local directory: /tmp/dask-scratch-space/worker-fjtdnwc4,Local directory: /tmp/dask-scratch-space/worker-fjtdnwc4

0,1
Comm: tcp://127.0.0.1:42513,Total threads: 1
Dashboard: http://127.0.0.1:41639/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:44275,
Local directory: /tmp/dask-scratch-space/worker-iduwemjf,Local directory: /tmp/dask-scratch-space/worker-iduwemjf

0,1
Comm: tcp://127.0.0.1:40923,Total threads: 1
Dashboard: http://127.0.0.1:38221/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:43343,
Local directory: /tmp/dask-scratch-space/worker-nzeicmt3,Local directory: /tmp/dask-scratch-space/worker-nzeicmt3

0,1
Comm: tcp://127.0.0.1:38223,Total threads: 1
Dashboard: http://127.0.0.1:35173/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:35357,
Local directory: /tmp/dask-scratch-space/worker-r0g_1z_f,Local directory: /tmp/dask-scratch-space/worker-r0g_1z_f

0,1
Comm: tcp://127.0.0.1:33189,Total threads: 1
Dashboard: http://127.0.0.1:33355/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:36745,
Local directory: /tmp/dask-scratch-space/worker-fx2llqj5,Local directory: /tmp/dask-scratch-space/worker-fx2llqj5

0,1
Comm: tcp://127.0.0.1:35285,Total threads: 1
Dashboard: http://127.0.0.1:44891/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:43075,
Local directory: /tmp/dask-scratch-space/worker-fd_a4dxh,Local directory: /tmp/dask-scratch-space/worker-fd_a4dxh

0,1
Comm: tcp://127.0.0.1:38863,Total threads: 1
Dashboard: http://127.0.0.1:34265/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:40157,
Local directory: /tmp/dask-scratch-space/worker-mqlhezrw,Local directory: /tmp/dask-scratch-space/worker-mqlhezrw

0,1
Comm: tcp://127.0.0.1:38911,Total threads: 1
Dashboard: http://127.0.0.1:34391/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38989,
Local directory: /tmp/dask-scratch-space/worker-wojz64nz,Local directory: /tmp/dask-scratch-space/worker-wojz64nz

0,1
Comm: tcp://127.0.0.1:34367,Total threads: 1
Dashboard: http://127.0.0.1:40277/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:39539,
Local directory: /tmp/dask-scratch-space/worker-sqlig7c7,Local directory: /tmp/dask-scratch-space/worker-sqlig7c7

0,1
Comm: tcp://127.0.0.1:33865,Total threads: 1
Dashboard: http://127.0.0.1:43159/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:37927,
Local directory: /tmp/dask-scratch-space/worker-6i9z_236,Local directory: /tmp/dask-scratch-space/worker-6i9z_236

0,1
Comm: tcp://127.0.0.1:46817,Total threads: 1
Dashboard: http://127.0.0.1:40003/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:34257,
Local directory: /tmp/dask-scratch-space/worker-v5x3bomq,Local directory: /tmp/dask-scratch-space/worker-v5x3bomq

0,1
Comm: tcp://127.0.0.1:42427,Total threads: 1
Dashboard: http://127.0.0.1:42233/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:38783,
Local directory: /tmp/dask-scratch-space/worker-rse7xmw5,Local directory: /tmp/dask-scratch-space/worker-rse7xmw5

0,1
Comm: tcp://127.0.0.1:38361,Total threads: 1
Dashboard: http://127.0.0.1:44225/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:33959,
Local directory: /tmp/dask-scratch-space/worker-lxd3us3f,Local directory: /tmp/dask-scratch-space/worker-lxd3us3f

0,1
Comm: tcp://127.0.0.1:33021,Total threads: 1
Dashboard: http://127.0.0.1:38817/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:41875,
Local directory: /tmp/dask-scratch-space/worker-4zg8ftqg,Local directory: /tmp/dask-scratch-space/worker-4zg8ftqg

0,1
Comm: tcp://127.0.0.1:40467,Total threads: 1
Dashboard: http://127.0.0.1:39395/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:45759,
Local directory: /tmp/dask-scratch-space/worker-ffw_vume,Local directory: /tmp/dask-scratch-space/worker-ffw_vume

0,1
Comm: tcp://127.0.0.1:33115,Total threads: 1
Dashboard: http://127.0.0.1:41985/status,Memory: 59.60 GiB
Nanny: tcp://127.0.0.1:45211,
Local directory: /tmp/dask-scratch-space/worker-iu2hv623,Local directory: /tmp/dask-scratch-space/worker-iu2hv623


In [3]:
"""Process a slide and save the features to disk."""
# Wall-clock start of processing for this slide.
start_time = time()
slide_file = Path(slide_file)

# NOTE(review): not referenced by any other cell visible here — confirm whether still needed.
optimize_graph = False

# NOTE(review): not referenced by any other cell visible here — confirm whether still needed.
futures_to_cancel = []

# Check if the slide has already been processed
status_file = output_path / "status" / f"{slide_file.stem}.done"
if check_status and status_file.exists():
    logger.info(f"Skipping {slide_file.stem} because it has already been processed")
    # Stop the notebook here. A bare `raise` outside an `except` block has no active
    # exception to re-raise and only yields "RuntimeError: No active exception to
    # reraise"; raise the same exception type with a message that explains the stop.
    raise RuntimeError(f"{slide_file.stem} has already been processed ({status_file} exists)")


def write_status_file(message: str = "done"):
    """Write ``message`` to the current slide's status file.

    The status file's parent directory is created if it does not exist yet, and
    any previous content of the status file is replaced.

    Args:
        message: text to store in the status file.
    """
    status_dir = status_file.parent
    status_dir.mkdir(parents=True, exist_ok=True)
    status_file.write_text(message)


# Load the slide
# The requested chunk side is patch_size * patch_to_chunk_multiplier, i.e. a whole
# number of patches per chunk side.
try:
    slide = load_slide(
        slide_file, target_mpp=target_mpp, level=level, target_slide_chunk_size=patch_size * patch_to_chunk_multiplier
    )
except MPPExtractionError:
    # Record the failure in the status file, then re-raise so execution stops here.
    logger.warning(f"Could not extract MPP for {slide_file}, skipping")
    write_status_file("mpp_extraction_error")
    raise

# Display the loaded slide as the cell output.
slide

[32m2023-12-30 16:37:57.348[0m | [34m[1mDEBUG   [0m | [36mtsl8.slide.readers[0m:[36mmake_slide_reader[0m:[36m15[0m - [34m[1mUsing cucim backend for /pathology/camelyon16/training/tumor/tumor_001.tif[0m
[32m2023-12-30 16:37:57.508[0m | [34m[1mDEBUG   [0m | [36mtsl8.slide.readers.mpp[0m:[36m__call__[0m:[36m24[0m - [34m[1mMPP successfully extracted using extract_mpp_from_properties: 0.243[0m
[32m2023-12-30 16:37:57.510[0m | [34m[1mDEBUG   [0m | [36mtsl8.slide[0m:[36mload_slide[0m:[36m25[0m - [34m[1mSlide has 10 levels with following downsamples: {0: 1.0, 1: 2.0, 2: 4.0, 3: 8.0, 4: 16.0, 5: 32.0, 6: 64.0, 7: 128.0, 8: 256.0, 9: 512.0}[0m
[32m2023-12-30 16:37:57.511[0m | [34m[1mDEBUG   [0m | [36mtsl8.slide[0m:[36mload_slide[0m:[36m40[0m - [34m[1mUsing level 0 with level_mpp=0.243 for slide_mpp=0.243 and target_mpp=0.500[0m
[32m2023-12-30 16:37:57.513[0m | [34m[1mDEBUG   [0m | [36mtsl8.slide.readers[0m:[36mmake_slide_reader[0m:

Unnamed: 0,Array,Chunk
Bytes,15.75 GiB,192.00 MiB
Shape,"(114688, 49152, 3)","(8192, 8192, 3)"
Dask graph,84 chunks in 3 graph layers,84 chunks in 3 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray
"Array Chunk Bytes 15.75 GiB 192.00 MiB Shape (114688, 49152, 3) (8192, 8192, 3) Dask graph 84 chunks in 3 graph layers Data type uint8 numpy.ndarray",3  49152  114688,

Unnamed: 0,Array,Chunk
Bytes,15.75 GiB,192.00 MiB
Shape,"(114688, 49152, 3)","(8192, 8192, 3)"
Dask graph,84 chunks in 3 graph layers,84 chunks in 3 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray


In [4]:
def reshape_block(block, patch_size=patch_size):
    """Cut a 3D block into a stack of square, non-overlapping tiles.

    Args:
        block: array of shape (h, w, c).
        patch_size: side length of each tile.

    Returns:
        Array of shape (n, patch_size, patch_size, c) with
        n = (h // patch_size) * (w // patch_size); tiles are ordered row by row.
    """
    height, width, channels = block.shape
    tile_rows = height // patch_size
    tile_cols = width // patch_size

    # Split both spatial axes into (tile index, offset inside the tile) ...
    tiles = block.reshape(tile_rows, patch_size, tile_cols, patch_size, channels)
    # ... move the two tile-index axes to the front ...
    tiles = tiles.swapaxes(1, 2)
    # ... and merge them into a single leading tile axis.
    return tiles.reshape(-1, patch_size, patch_size, channels)


def slide_to_chunks(slide, patch_size=patch_size):
    """Tile every chunk of a 3D dask array and return the tiled chunks as delayed objects.

    Each chunk of ``slide`` is passed through ``reshape_block``.

    Args:
        slide: dask array with axes (i, j, c).
        patch_size: side length of the tiles cut from each chunk.

    Returns: flat array of dask delayed objects, one per chunk of ``slide``; each evaluates
        to a 4D array of shape (n, patch_size, patch_size, c).
    """
    tile_shape = np.array((patch_size, patch_size))
    # Number of whole tiles that fit into one (regular) chunk
    tiles_per_chunk = np.prod(np.array(slide.chunksize[:2]) // tile_shape)

    tile_block = partial(reshape_block, patch_size=patch_size)
    tiled = da.blockwise(
        tile_block,
        "kijc",
        slide,
        "ijc",
        dtype=slide.dtype,
        new_axes={"k": tiles_per_chunk},
        adjust_chunks={"i": patch_size, "j": patch_size},
    )

    # One delayed object per block, flattened to 1D
    return tiled.to_delayed().flatten()


# Array of dask delayed objects, one per slide chunk; each evaluates to a 4D array
# of shape (n, patch_size, patch_size, 3)
patches = slide_to_chunks(slide, patch_size=patch_size)

In [5]:
slide_h, slide_w, _ = slide.shape

# Build a grid with one (x, y) entry per whole patch: x counts patches along the slide
# width, y along the slide height. Multiplying by patch_size turns patch indices into
# pixel offsets (presumably each patch's top-left corner — TODO confirm).
xs, ys = da.meshgrid(da.arange(slide_w // patch_size), da.arange(slide_h // patch_size))
patch_coords = da.stack([xs, ys], axis=-1) * patch_size
# Chunk the grid into patch_to_chunk_multiplier x patch_to_chunk_multiplier entries, the
# number of patches per side of a slide chunk of patch_size * patch_to_chunk_multiplier.
patch_coords = patch_coords.rechunk((patch_to_chunk_multiplier, patch_to_chunk_multiplier, 2))
# Reuse slide_to_chunks with patch_size=1 so the coordinates are flattened by the same
# routine as the image patches.
patch_coords = slide_to_chunks(
    patch_coords, patch_size=1
)  # array of dask delayed objects, each of which is a chunk (a 4D array of shape (n, 1, 1, 2))

In [6]:
def process_block_for_slide(slide_output_dir: Path):
    """Create a dask-delayed block processor that writes patches into ``slide_output_dir``.

    Args:
        slide_output_dir: directory that receives one JPEG per kept patch.

    Returns:
        A ``dask.delayed`` function ``process_block(patches, coords)``.
    """

    @dask.delayed
    def process_block(patches, coords):
        """Process a block of patches (perform background rejection, and save to disk)

        Args:
            patches: 4D array of shape (n, patch_size, patch_size, 3)
            coords: 4D array of shape (n, 1, 1, 2)

        Returns:
            Number of patches that were kept (not rejected as background).

        Raises:
            OSError: if OpenCV reports that one or more patches could not be written.
        """

        # Reject background patches
        patch_mask = is_background(patches)
        patches = patches[~patch_mask]
        coords = coords[~patch_mask].squeeze((1, 2))

        # Save the patches; files are named after the patch's (x, y) coordinate
        failed_paths = []
        for patch, (x, y) in zip(patches, coords):
            patch_path = str(slide_output_dir / f"{x:08d}_{y:08d}.jpg")
            patch = cv2.cvtColor(patch, cv2.COLOR_RGB2BGR)
            # cv2.imwrite reports a failed write by returning False rather than raising
            if not cv2.imwrite(patch_path, patch):
                failed_paths.append(patch_path)

        # Attempt every write first, then surface failures instead of dropping them silently
        if failed_paths:
            raise OSError(
                f"Failed to write {len(failed_paths)} of {patches.shape[0]} patches, first: {failed_paths[0]}"
            )

        return patches.shape[0]

    return process_block

In [7]:
# zip() would silently truncate and mispair image chunks with coordinates if the two
# chunk lists were of different length, so check before submitting anything.
if len(patches) != len(patch_coords):
    raise ValueError(
        f"Number of patch chunks ({len(patches)}) does not match number of coordinate chunks ({len(patch_coords)})"
    )

# One delayed task per (image chunk, coordinate chunk) pair; persist starts them on the cluster
results = [process_block_for_slide(slide_output_dir)(patch, coord) for patch, coord in zip(patches, patch_coords)]
results = client.persist(results)

failed_blocks = 0
for future in tqdm(as_completed(futures_of(results)), total=len(results), desc=f"Processing {slide_file.stem}"):
    # as_completed also yields futures whose task raised or was cancelled; report them
    # instead of letting the progress bar hide the failure.
    if future.status == "error":
        failed_blocks += 1
        logger.error(f"Block {future.key} of {slide_file.stem} failed: {future.exception()!r}")
    elif future.status != "finished":
        failed_blocks += 1
        logger.error(f"Block {future.key} of {slide_file.stem} ended with status {future.status!r}")

if failed_blocks:
    logger.error(f"{failed_blocks} of {len(results)} blocks failed for {slide_file.stem}; not marking it as done")
else:
    # Write the marker that the skip check at the top of the notebook looks for
    write_status_file()
    logger.info(f"Finished processing {slide_file.stem} in {time() - start_time:.2f}s")