In [1]:
from pathlib import Path
import PIL
import numpy as np
import matplotlib.pyplot as plt 

In [2]:
# Cancer pathology samples
data_dir = "./data/"
img_paths = {
    path.stem: path for path in Path(data_dir).glob("*.TIF")
}

print(img_paths)

images = {
    name: np.array(PIL.Image.open(path))
    for name, path in img_paths.items()
}

{'Dual fusion': WindowsPath('data/Dual fusion.TIF'), 'separation probe': WindowsPath('data/separation probe.TIF'), 'Trisomy 8': WindowsPath('data/Trisomy 8.TIF')}


In [3]:
from ufish.api import UFish
from cellpose import models

# initialize cellpose model
cp = models.Cellpose(gpu=False, model_type="nuclei")

# initialize ufish model
uf = UFish(device="cuda")
uf.load_weights("./v1.0.1-DNAFISH_model.onnx")

[32m2024-06-17 15:18:42.983[0m | [1mINFO    [0m | [36mufish.api[0m:[36m_load_onnx[0m:[36m296[0m - [1mLoading ONNX from ./v1.0.1-DNAFISH_model.onnx[0m
[32m2024-06-17 15:18:42.984[0m | [1mINFO    [0m | [36mufish.api[0m:[36m_load_onnx[0m:[36m303[0m - [1mUsing ONNX runtime providers: ['CUDAExecutionProvider'][0m


<ufish.api.UFish at 0x184afd41330>

In [4]:
from utils import segment_cells

In [5]:
masks = segment_cells(cp, images["Dual fusion"])

In [49]:
from skimage.measure import regionprops, label

In [13]:
mask = masks
image = images["Dual fusion"]
target_size = 128

In [46]:
def extract_cells(image: np.ndarray, mask: np.ndarray, target_size=128):
    cell_rois = []
    cell_props = []
    cell_masks = []

    props = regionprops(mask)

    for i, prop in enumerate(props):
        x0, y0, x1, y1 = prop.bbox
        w, h = x1 - x0, y1 - y0
        long_side = max(w, h)
        if long_side > target_size:
            continue
        roi = np.zeros(
            (target_size, target_size, image.shape[2]),
            dtype=image.dtype
        )
        start_y = (target_size - h) // 2
        start_x = (target_size - w) // 2
        coords = prop.coords
        roi_y = coords[:, 0] - x0 + start_y
        roi_x = coords[:, 1] - y0 + start_x
        roi[roi_y, roi_x] = image[coords[:, 0], coords[:, 1], :]
        cell_rois.append(roi)
        cell_props.append(prop)
        cell_mask = np.zeros((target_size, target_size), dtype=np.uint8)
        cell_mask[roi_y, roi_x] = 1
        cell_masks.append(cell_mask)
    return cell_rois, cell_masks, cell_props

cell_rois, cell_masks, cell_props = extract_cells(image, mask, target_size)


In [55]:
cell_roi = cell_rois[0]
cell_mask = cell_masks[0]
signal_channels = [0, 1]

In [59]:
cell_signal = cell_roi[..., signal_channels].mean(axis=-1)

In [63]:
spots, _ = uf.predict(cell_signal)

[32m2024-06-17 23:24:33.412[0m | [1mINFO    [0m | [36mufish.api[0m:[36mpredict[0m:[36m469[0m - [1mAxes not specified, infering from image shape.[0m
[32m2024-06-17 23:24:33.414[0m | [1mINFO    [0m | [36mufish.api[0m:[36mpredict[0m:[36m471[0m - [1mInfered axes: yx, image shape: (128, 128)[0m


In [64]:
spots

Unnamed: 0,axis-0,axis-1
0,42,53
1,55,64
2,58,73
3,64,58


In [50]:
from utils import get_merge_and_split_masks, assign_spots

In [45]:
def pipeline(
        cellpose_instance, ufish_instance,
        img, signal_channels=[0, 1],
        ):
    print("Processing image:")
    print(f"Image shape: {img.shape}")
    print("Step 1: segment cells")
    masks = segment_cells(cellpose_instance, img)

    print("Step 2: extract ROIs")
    cell_rois, cell_masks, cell_props = extract_cells(img, masks)

    print("Step 3: call spots and assign spots")
    table = []
    cell_signals = []
    for i, c_roi in enumerate(cell_rois):
        signals = {}
        for ch in signal_channels:
            signals[f"ch{ch+1}"] = []
        name = "+".join([f"ch{ch+1}" for ch in signal_channels])
        signals[name] = []
        merge_mask, signal_masks, signal_masks_sub = get_merge_and_split_masks(
            ufish_instance, c_roi, signal_channels,
            quantile=20, square_size=5)
        if signal_masks is None:
            for ch in signal_channels:
                signals[f"ch{ch+1}"] = []
            name = "+".join([f"ch{ch+1}" for ch in signal_channels])
            signals[name] = []
        else:
            for ch in signal_channels:
                single_ch = regionprops(label(signal_masks_sub[ch]))
                spots = np.array([cc.centroid for cc in single_ch])
                signals[f"ch{ch+1}"] = spots

            merged = regionprops(label(merge_mask))
            spots = np.array([cc.centroid for cc in merged])
            name = "+".join([f"ch{ch+1}" for ch in signal_channels])
            signals[name] = spots

        assigns = {}
        for name, spots in signals.items():
            try:
                assigns[name] = assign_spots(spots, cell_masks[w], 30)
            except Exception:
                assigns[name] = []

        df = {
            key: sum(value) for key, value in assigns.items()
            if isinstance(value, np.ndarray)
        }
        df["cell_id"] = f'{w+1}'
        last_key = list(df.keys())[-1]
        last_value = df.pop(last_key)
        df = {last_key: last_value, **df}
        table.append(df)
        cell_signals.append(signals)


(128, 128, 3)