In [None]:
# Check OpenCV version
# Importing cv2 here only serves to display the installed OpenCV version as the
# cell output; no other cv2 call appears in the visible part of the notebook.
import cv2
cv2.__version__

In [None]:
# Import required libraries
from pathlib import Path
import napari
from napari.settings import get_settings
import pandas as pd
import os
import pyvista as pv
import numpy as np
import matplotlib.pyplot as plt
import SimpleITK as sitk
from scipy import ndimage as ndi
from scipy.ndimage import label, find_objects
from skimage.segmentation import relabel_sequential
from itertools import combinations
from skimage import filters
from skimage.segmentation import watershed
from skimage.feature import peak_local_max
from vispy.color import Colormap
from matplotlib.colors import to_rgb
from csbdeep.utils import normalize
from stardist.models import StarDist2D
from collections import defaultdict
from aicsimageio import AICSImage

# Enable interactive mode for napari in Jupyter
# `settings` is kept as a module-level name because the acquisition-setup cell
# later switches `ipy_interactive` off before opening a blocking viewer.
settings = get_settings()
settings.application.ipy_interactive = True

#### Functions

In [None]:
# Image processing and utility functions

def gamma_trans(im_in, gamma):
    """Apply gamma correction to an image and rescale it so its maximum is 255.

    Parameters
    ----------
    im_in : array_like
        Input intensities (any numeric dtype).
    gamma : float
        Exponent applied to every pixel.

    Returns
    -------
    numpy.ndarray
        New float64 array ``255 * (im_in / max(im_in)) ** gamma``. An image whose
        maximum raised to ``gamma`` is zero (e.g. an all-black channel) is
        returned as all zeros instead of NaN.
    """
    # Work in float64: raising an integer image (e.g. uint8) to an integer power
    # stays in the integer dtype and silently wraps around (128**2 -> 0 in uint8).
    im_float = np.asarray(im_in, dtype=np.float64)
    peak = np.max(im_float) ** gamma
    if peak == 0:
        # Nothing to stretch; avoid the 255 / 0 division that would yield NaN/inf.
        return np.zeros_like(im_float)
    return (255.0 / peak) * (im_float ** gamma)

def contr_limit(im_in, c_min, c_max):
    """Stretch the window ``[c_min, c_max]`` linearly onto ``[0, 255]``.

    Values outside the window are clipped, and the result is cast to ``int``
    (fractional parts are truncated).
    """
    gain = 255.0 / (c_max - c_min)
    offset = -c_min * gain
    stretched = gain * im_in + offset
    clipped = np.clip(stretched, 0.0, 255.0)
    return clipped.astype(int)

def hist_plot(im_in, stain_complete_df, thresh=0):
    """Plot the intensity histogram and its scaled CDF for each channel.

    Parameters
    ----------
    im_in : numpy.ndarray
        Image stack indexed as ``im_in[:, :, channel]``; intensities are binned
        into 256 bins over ``[0, 256)``.
    stain_complete_df : pandas.DataFrame
        One row per channel, in channel order. The index value is used as the
        subplot title and the ``'Color'`` column as the histogram colour.
    thresh : float, optional
        When greater than 0, a vertical green line is drawn at this intensity.
    """
    n_channels = im_in.shape[2]
    # squeeze=False keeps `axs` an array even for a single channel; with the
    # default, plt.subplots(1, 1) returns a bare Axes and `axs[z]` would fail.
    fig, axs = plt.subplots(1, n_channels, figsize=(15, 2), squeeze=False)
    axs = axs[0]
    for z in range(n_channels):
        pixels = im_in[:, :, z].flatten()
        hist, _ = np.histogram(pixels, 256, [0, 256])
        cdf = hist.cumsum()
        # Scale the CDF to the tallest histogram bin so both share one y axis.
        cdf_normalized = cdf * hist.max() / cdf.max()
        color = stain_complete_df.loc[stain_complete_df.index[z], 'Color']
        # White bars would be invisible on the default white axes. The check is
        # case-insensitive because the notebook upper-cases its colour names.
        bar_color = 'gray' if str(color).lower() == 'white' else color
        axs[z].plot(cdf_normalized, color='b')
        axs[z].hist(pixels, 256, [0, 256], color=bar_color)
        axs[z].set_xlim([0, 256])
        axs[z].legend(('cdf', 'histogram'), loc='upper left')
        if thresh > 0:
            axs[z].plot([thresh, thresh], [0, cdf_normalized.max()], color='g')
        axs[z].set_title(stain_complete_df.index[z])
        axs[z].set_yscale('log')

def truncate_cell(val, width=15):
    """Return ``str(val)``, shortened to ``width`` characters with a trailing ``...``.

    Strings no longer than ``width`` are returned unchanged.
    """
    text = str(val)
    if len(text) > width:
        return text[:width - 3] + "..."
    return text

def merge_touching_labels(label_matrix):
    """Fuse labels whose pixels touch (8-neighbourhood) into a single label.

    Adjacent label pairs are collected pixel by pixel, joined with a union-find
    structure, and the merged labels are renumbered sequentially. A matrix
    without any positive label is returned as a plain copy.
    """
    if label_matrix.max() == 0:
        return label_matrix.copy()

    # A one-pixel zero frame lets every pixel be given a full 3x3 window.
    framed = np.pad(label_matrix, 1, mode='constant', constant_values=0)
    adjacency = defaultdict(set)
    # argwhere walks the labelled pixels in row-major order; the frame itself is
    # zero, so only interior pixels are visited.
    for row, col in np.argwhere(framed != 0):
        here = framed[row, col]
        window = framed[row-1:row+2, col-1:col+2].ravel()
        for other in window:
            if other != 0 and other != here:
                adjacency[here].add(other)

    labels_present = set(np.unique(label_matrix)) - {0}
    root_of = {lab: lab for lab in labels_present}

    def find(node):
        # Follow parent links to the representative, halving the path on the way.
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    def union(a, b):
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            root_of[root_a] = root_b

    for lab, touching_labs in adjacency.items():
        for other in touching_labs:
            if lab in root_of and other in root_of:
                union(lab, other)

    # Paint every original label with the representative of its group.
    merged = np.zeros_like(label_matrix, dtype=np.int32)
    for lab in labels_present:
        merged[label_matrix == lab] = find(lab)
    merged, _, _ = relabel_sequential(merged)
    return merged

def remove_small_islands(binary_matrix, area_threshold):
    """Remove small connected components from a binary mask.

    Parameters
    ----------
    binary_matrix : numpy.ndarray
        Mask whose non-zero pixels are foreground. It is modified in place.
    area_threshold : int
        Components with fewer pixels than this are set to 0.

    Returns
    -------
    numpy.ndarray
        The same ``binary_matrix`` object, after filtering.
    """
    labeled_array, num_features = label(binary_matrix)
    if num_features == 0:
        return binary_matrix
    # One pass over the label image gives every component's pixel count, instead
    # of rescanning the whole image once per component.
    component_sizes = np.bincount(labeled_array.ravel(), minlength=num_features + 1)
    too_small = component_sizes < area_threshold
    too_small[0] = False  # label 0 is background, never an island
    binary_matrix[too_small[labeled_array]] = 0
    return binary_matrix

def assign_labels(A, B, connectivity=1):
    """Give every connected island of ``A`` a label taken from ``B``.

    Parameters
    ----------
    A : numpy.ndarray
        Mask whose non-zero pixels form the islands to be labelled.
    B : numpy.ndarray
        Label image of the same shape; 0 is background.
    connectivity : int, optional
        ``2`` uses a full 3x3 neighbourhood (diagonals connect); any other value
        uses the default structuring element of ``scipy.ndimage.label``.

    Returns
    -------
    numpy.ndarray
        Array shaped like ``A``. Each island carries the smallest positive label
        of ``B`` found under it, or 0 when it overlaps none. When an island
        covers several labels only that smallest one is kept.
    """
    A = np.asarray(A)
    B = np.asarray(B)
    structure = np.ones((3, 3)) if connectivity == 2 else None
    labeled_A, num_features = label(A, structure=structure)
    # The output must be able to hold B's label ids; inheriting A's dtype alone
    # (e.g. a uint8 or bool mask) would wrap or truncate large ids.
    C = np.zeros(A.shape, dtype=np.result_type(A.dtype, B.dtype))
    for i in range(1, num_features + 1):
        mask = labeled_A == i
        overlapping_labels = np.unique(B[mask & (B > 0)])
        if overlapping_labels.size > 0:
            C[mask] = overlapping_labels[0]
    return C

## File upload

In [None]:
# Load TIFF file and extract image data
# The file is opened through AICSImage; `meta` is reused by later cells to read
# individual planes and the physical pixel sizes.
tiff_file = 'PRO_EB-008_2D_M2_3.tif'
meta = AICSImage(tiff_file)
# Request the data in X, Y, Z axis order for time point 0.
# NOTE(review): later cells select one Z plane per stain, so the Z planes
# presumably hold the fluorescence channels — confirm against the acquisition.
img = meta.get_image_data("XYZ", T=0) 
print(img.shape)

In [None]:
# Read the physical pixel size along X and Y from the image metadata
r_X, r_Y = meta.physical_pixel_sizes.X, meta.physical_pixel_sizes.Y
print([r_X, r_Y])

### Information about the staining

In [None]:
# Describe the staining as condition -> [marker, display colour] and tabulate it
stain_dict = {
    'MACRO': ['F4_80', 'Red'],
    'M2': ['CD206', 'Green'],
    'NUCLEI': ['DAPI', 'Blue']
}
# Upper-case the condition names and every string entry so later lookups
# (e.g. 'NUCLEI') do not depend on how the dictionary was typed.
stain_dict = {
    condition.upper(): [entry.upper() if isinstance(entry, str) else entry for entry in entries]
    for condition, entries in stain_dict.items()
}
stain_df = pd.DataFrame.from_dict(
    stain_dict, orient='index', columns=['Marker', 'Color']
).rename_axis('Condition')
# The segmentation cells look for a row named 'NUCLEI'; warn early if missing.
if 'NUCLEI' not in stain_df.index:
    print('No nuclei condition!')

In [None]:
# Visualize each channel using napari
viewer_0 = napari.Viewer()
for c, c_name in enumerate(stain_df['Marker']):
    # One plane per stain: the c-th Z plane is read as the c-th stain's image.
    im_in = meta.get_image_data("XY", Z=c, C=0, S=0, T=0)
    # Divide by 256 before casting to uint8.
    # NOTE(review): this presumably maps 16-bit data onto 8 bits — confirm the
    # bit depth of the acquisition.
    im_in = (im_in / 256.0).astype('uint8')
    # stain_df is indexed by condition name, so the colour is fetched by
    # position with .iloc; a bare integer key on a string-indexed Series relies
    # on a deprecated positional fallback.
    viewer_0.add_image(im_in, name=f"{stain_df.index[c]} ({c_name})",
                        colormap=stain_df['Color'].iloc[c], blending='additive')

### Acquisition processing setup

In [None]:
# Setup for acquisition and contrast/gamma settings
# Per-stain contrast limits and gamma are either loaded from
# "<name_setup>_setup.csv" or picked interactively in napari and then written
# back to that file.
# NOTE(review): name_setup ('PRO_EB-009') differs from the prefix of the TIFF
# loaded above ('PRO_EB-008') — confirm this is the intended setup file.
name_setup = 'PRO_EB-009'
use_setup = True

# Work on a copy keyed by (Condition, Marker, Color) so rows can be matched
# against the CSV, and start from neutral defaults (full range, gamma 1).
stain_df = stain_df.reset_index(drop=False)
stain_complete_df = stain_df.copy()
stain_complete_df.set_index(['Condition', 'Marker', 'Color'], inplace=True)
stain_complete_df[['Cont_min', 'Cont_max', 'Gamma']] = [0, 255, 1]

setup_path = f"{name_setup}_setup.csv"
if use_setup and os.path.exists(setup_path):
    stain_setup_df = pd.read_csv(setup_path)
    stain_setup_df.set_index(['Condition', 'Marker', 'Color'], inplace=True)
    for idx in stain_complete_df.index:
        if idx in stain_setup_df.index:
            stain_complete_df.loc[idx] = stain_setup_df.loc[idx]
        else:
            # A stain missing from the file forces the interactive path below.
            use_setup = False

if not use_setup or not os.path.exists(setup_path):
    # Interactive mode is switched off so that napari.run() blocks until the
    # viewer is closed. It is not switched back on afterwards in this cell.
    settings.application.ipy_interactive = False
    viewer_1 = napari.Viewer()
    for c, idx in enumerate(stain_complete_df.index):
        im_in = meta.get_image_data("XY", Z=c, C=0, S=0, T=0)
        im_in = (im_in / 256.0).astype('uint8')
        viewer_1.add_image(im_in, name=f"{idx[0]} ({idx[1]})", colormap=idx[2], blending='additive')
    napari.run()
    # Read back whatever contrast limits / gamma the user left on each layer.
    image_layers = [layer for layer in viewer_1.layers if isinstance(layer, napari.layers.Image)]
    contrast_limits = {layer.name: layer.contrast_limits for layer in image_layers}
    gamma_val = {layer.name: layer.gamma for layer in image_layers}
    stain_complete_df.sort_index(inplace=True)
    for c, idx in enumerate(stain_complete_df.index):
        # Layers are looked up by the same "<condition> (<marker>)" name used
        # when they were added above.
        name = f"{idx[0]} ({idx[1]})"
        stain_complete_df.loc[idx, 'Cont_min'] = int(contrast_limits[name][0])
        stain_complete_df.loc[idx, 'Cont_max'] = int(contrast_limits[name][1])
        stain_complete_df.loc[idx, 'Gamma'] = gamma_val[name]
    # Update the rows of an existing setup file in memory, or start a new table,
    # then write the result back to setup_path.
    if os.path.exists(setup_path):
        stain_setup_df = pd.read_csv(setup_path)
        stain_setup_df.set_index(['Condition', 'Marker', 'Color'], inplace=True)
        for idx in stain_complete_df.index:
            stain_setup_df.loc[idx] = stain_complete_df.loc[idx]
    else:
        stain_setup_df = stain_complete_df.copy()
    stain_csv_setup_df = stain_setup_df.reset_index().sort_values(by='Condition')
    stain_csv_setup_df = stain_csv_setup_df[['Condition', 'Marker', 'Color', 'Cont_min', 'Cont_max', 'Gamma']]
    stain_csv_setup_df.to_csv(setup_path, index=False)

# Restore the Condition index and the original row order of stain_df (the
# interactive path sorts the rows), so row z corresponds to plane z again.
stain_df = stain_df.set_index('Condition')
stain_complete_df = stain_complete_df.reset_index().set_index('Condition')
stain_complete_df = stain_complete_df.loc[stain_df.index]
stain_complete_df = stain_complete_df[['Marker', 'Color', 'Cont_min', 'Cont_max', 'Gamma']]

In [None]:
# Display stain settings DataFrame
# One row per condition with its marker, colour, contrast limits and gamma.
stain_complete_df

## MULTIPLE TRANSFORM

In [None]:
# Read every plane at once and bring it to 8 bit; keep independent copies for
# the later processing stages
im_original = (meta.get_image_data("XYZ", C=0, S=0, T=0) / 256.0).astype('uint8')
im_in = im_original.copy()
im_out = im_original.copy()
im_trans = im_original.copy()

# Histogram and CDF of the untouched channels
hist_plot(im_out, stain_complete_df)

In [None]:
# Denoise: median-filter each channel on its own
im_in = im_out.copy()
n_channels = im_in.shape[2]
for z in range(n_channels):
    channel = im_in[:, :, z]
    im_out[:, :, z] = filters.median(channel)
im_denoised = im_out.copy()
hist_plot(im_out, stain_complete_df)

In [None]:
# Smooth each channel with a Gaussian filter, keeping the original value range
im_in = im_out.copy()
n_channels = im_in.shape[2]
for z in range(n_channels):
    channel = im_in[:, :, z]
    im_out[:, :, z] = filters.gaussian(channel, 1.0, preserve_range=True)
im_filtered = im_out.copy()
hist_plot(im_out, stain_complete_df)

In [None]:
# Apply each stain's contrast window, then its gamma, channel by channel
im_in = im_out.copy()
for c, idx in enumerate(stain_complete_df.index):
    c_min = stain_complete_df.loc[idx, 'Cont_min']
    c_max = stain_complete_df.loc[idx, 'Cont_max']
    gamma = stain_complete_df.loc[idx, 'Gamma']
    # The stretched channel is stored back first, so the gamma step operates on
    # the values as held in im_out.
    im_out[:, :, c] = contr_limit(im_out[:, :, c], c_min, c_max)
    im_out[:, :, c] = gamma_trans(im_out[:, :, c], gamma)
im_trans = im_out.copy()
hist_plot(im_out, stain_complete_df)

In [None]:
# Binarise every channel with Otsu's threshold, then drop tiny islands
im_in = im_out.copy()
# One filter instance serves all channels: pixels on the "inside" of the
# threshold become 0, the others 200.
th_filter = sitk.OtsuThresholdImageFilter()
th_filter.SetInsideValue(0)
th_filter.SetOutsideValue(200)
for z in range(im_in.shape[2]):
    seg = th_filter.Execute(sitk.GetImageFromArray(im_in[:, :, z]))
    arrayseg = sitk.GetArrayFromImage(seg)
    # Components smaller than 30 pixels are removed.
    filtered = remove_small_islands(arrayseg, 30)
    im_out[:, :, z] = filtered
im_threshold = im_out.copy()

In [None]:
# Segmentation of nuclei using watershed or StarDist
# Only the channel whose condition is 'NUCLEI' is segmented; the result is kept
# in `im_segmented` together with a random colormap for display.
im_in = im_out.copy()
im_out = np.zeros_like(im_in, dtype=np.int32)
trig_stardist = False  # Set to True to use StarDist model

for z in range(im_in.shape[2]):
    if stain_df.index[z] == 'NUCLEI':
        if trig_stardist:
            model = StarDist2D.from_pretrained('2D_versatile_fluo')
            img_te = normalize(im_trans[:, :, z], 1.0, 99.8)
            im_out[:, :, z], _ = model.predict_instances(img_te)
            im_mask = im_in[:, :, z] / np.max(im_in[:, :, z])
            # skimage.filters has no binary_erosion, so the erosion is done with
            # scipy.ndimage (already imported as ndi). border_value=1 treats
            # pixels outside the image as foreground, so the mask is not eroded
            # merely for touching the image border.
            im_mask = ndi.binary_erosion(
                im_mask, structure=np.ones((2, 2)), border_value=1
            ).astype(im_mask.dtype)
            im_positive = im_out[:, :, z] * im_mask
        else:
            # Seeds are the local maxima of the distance transform of the mask.
            distance = ndi.distance_transform_edt(im_in[:, :, z])
            coords = peak_local_max(distance, footprint=np.ones((3, 3)), labels=im_in[:, :, z].astype(np.int32))
            mask = np.zeros(distance.shape, dtype=bool)
            mask[tuple(coords.T)] = True
            markers, _ = label(mask)
            transl = watershed(-distance, markers, mask=im_in[:, :, z])
            im_out[:, :, z] = merge_touching_labels(transl)
        # One colour per label value 0..n_labels (row 0 = black background).
        # At least two rows are always allocated, so an empty segmentation no
        # longer fails on the cm_rand[0] assignment.
        n_labels = int(np.max(im_out[:, :, z]))
        cm_rand = np.random.rand(max(n_labels + 1, 2), 3)
        cm_rand[0, :] = [0.0, 0.0, 0.0]
        colormaps_rand = Colormap(cm_rand)
        im_segmented = im_out[:, :, z].copy()

In [None]:
# Cell assignment: give each thresholded island in the non-nuclear channels the
# label of a nucleus it overlaps
im_assigned = np.zeros_like(im_in, dtype=np.int32)
for z in range(im_in.shape[2]):
    if stain_df.index[z] == 'NUCLEI':
        continue  # the nuclei channel keeps an all-zero plane
    thresholded = im_threshold[:, :, z].astype('int32')
    im_assigned[:, :, z] = assign_labels(thresholded, im_segmented)

In [None]:
# Show every processing stage: viewer_0 holds the intensity images (original,
# denoised, filtered, corrected), viewer_1 the masks and label images
viewer_0 = napari.Viewer()
viewer_1 = napari.Viewer()
for z in range(im_in.shape[2]):
    idx = stain_complete_df.index[z]
    marker = stain_complete_df.loc[idx, 'Marker']
    color = stain_complete_df['Color'].iloc[z]
    stain_style = dict(colormap=color, blending='additive')
    label_style = dict(colormap=colormaps_rand, contrast_limits=[0, np.max(im_segmented)], blending='additive')
    viewer_0.add_image(im_original[:, :, z], name=f'ORIGINAL {idx} ({marker})', **stain_style)
    viewer_0.add_image(im_denoised[:, :, z], name=f'DENOISED {idx} ({marker})', **stain_style)
    viewer_0.add_image(im_filtered[:, :, z], name=f'FILTERED {idx} ({marker})', **stain_style)
    viewer_0.add_image(im_trans[:, :, z], name=f'CORRECTED {idx} ({marker})', **stain_style)
    viewer_1.add_image(im_threshold[:, :, z], name=f'THRESHOLDED {idx} ({marker})', **stain_style)
    if stain_df.index[z] == 'NUCLEI':
        viewer_1.add_image(im_segmented, name=f'SEGMENTED {idx} ({marker})', **label_style)
    else:
        viewer_1.add_image(im_assigned[:, :, z], name=f'ASSIGNED {idx} ({marker})', **label_style)
# NOTE(review): the layers are added without a `scale`, so the 'um' scale bar
# presumably measures pixels rather than micrometres — confirm.
for viewer in (viewer_0, viewer_1):
    viewer.scale_bar.visible = True
    viewer.scale_bar.unit = 'um'

## QUANTIFICATION

In [None]:
# Quantify nuclei and cell properties
# labels_dict maps each marker to
# [condition, colour, count, shared nucleus labels, mean positions, nuclei sizes, cell sizes].
# The nuclei entry is inserted first; later cells rely on it being row 0.
im_mask = im_segmented > 0
labels_dict = {}
i_nuc = stain_df.index.get_loc('NUCLEI')
# The frames are indexed by condition name, so rows are fetched by position
# with .iloc; bare integer keys rely on a deprecated positional fallback.
marker = stain_df['Marker'].iloc[i_nuc]
positions = []
sizes = []
for n in range(1, int(np.max(im_segmented)) + 1):
    # The stack was requested in "XYZ" order, so np.where returns the X indices
    # (axis 0) first and the Y indices (axis 1) second.
    x, y = np.where(im_segmented == n)
    positions.append((np.mean(x * r_X), np.mean(y * r_Y)))
    sizes.append(x.size * r_X * r_Y)
labels_dict[stain_complete_df['Marker'].iloc[i_nuc]] = [
    stain_complete_df.index[i_nuc],
    stain_complete_df['Color'].iloc[i_nuc],
    int(np.max(im_segmented)),
    (),
    tuple(positions),
    tuple(sizes),
    ()
]
for i in range(im_in.shape[2]):
    if i != i_nuc:
        positions = []
        nuclei_sizes = []
        cell_sizes = []
        marker = stain_df['Marker'].iloc[i]
        assigned = im_assigned[:, :, i]
        # Nucleus labels present in this channel. Filtering on > 0 (instead of
        # dropping the first unique value) stays correct when the plane has no
        # background pixel, and the whole plane is scanned so this tuple always
        # lines up with the positions and sizes collected below.
        shared_labels = np.unique(assigned)
        shared_labels = shared_labels[shared_labels > 0]
        for n in shared_labels:
            x, y = np.where(im_segmented == n)
            positions.append((np.mean(x * r_X), np.mean(y * r_Y)))
            nuclei_sizes.append(x.size * r_X * r_Y)
            cell_sizes.append(np.sum(assigned == n) * r_X * r_Y)
        labels_dict[stain_complete_df['Marker'].iloc[i]] = [
            stain_complete_df.index[i],
            stain_complete_df['Color'].iloc[i],
            len(shared_labels),
            tuple(shared_labels),
            tuple(positions),
            tuple(nuclei_sizes),
            tuple(cell_sizes)
        ]

In [None]:
# Tabulate the quantification results; `truncated_df` is a display copy whose
# long tuple columns are shortened to a few characters
labels_df = pd.DataFrame.from_dict(
    labels_dict,
    orient='index',
    columns=['Condition', 'Color', 'Number', 'Shared labels', 'Mean positions [um]', 'Nuclei size [um2]', 'Cell size [um2]'],
)
labels_df.index.name = 'Combination'
truncated_df = labels_df.copy()
for col in ["Shared labels", "Mean positions [um]", "Nuclei size [um2]", "Cell size [um2]"]:
    truncated_df[col] = truncated_df[col].apply(truncate_cell)

In [None]:
# Display quantification DataFrame
# Show the truncated copy built in the previous cell: the full labels_df holds
# one tuple entry per nucleus in several columns and would flood the output.
truncated_df

In [None]:
# Print summary statistics for nuclei and cell populations
# labels_df is indexed by marker name, so rows are addressed by position with
# .iloc (bare integer keys rely on a deprecated positional fallback). Row 0 is
# the nuclei entry because it is inserted first into labels_dict.
print('TOT NUCLEI =', labels_df['Number'].iloc[0])
for i, marker in enumerate(labels_df.index):
    if labels_df['Condition'].iloc[i] != 'NUCLEI':
        print(f" PERC {labels_df['Condition'].iloc[i]} ({marker}) = {100.0 * labels_df['Number'].iloc[i] / labels_df['Number'].iloc[0]} %")
print('_' * 80)
print('MEAN SIZE NUCLEI =', np.mean(labels_df['Nuclei size [um2]'].iloc[0]), 'um2')
for i, marker in enumerate(labels_df.index):
    if labels_df['Condition'].iloc[i] != 'NUCLEI':
        print(f" MEAN SIZE NUCLEI {labels_df['Condition'].iloc[i]} ({marker}) = {np.mean(labels_df['Nuclei size [um2]'].iloc[i])} um2")
print('_' * 80)
for i, marker in enumerate(labels_df.index):
    if labels_df['Condition'].iloc[i] != 'NUCLEI':
        print(f"MEAN SIZE {labels_df['Condition'].iloc[i]} ({marker}) = {np.mean(labels_df['Cell size [um2]'].iloc[i])} um2")

## Evaluate the spatial distribution of cells

In [None]:
# Plot spatial distribution of nuclei and cells
# For every condition, the nucleus centroids are binned along X (top panel) and
# along Y (bottom panel) over the full physical extent of the image.
fig, axs = plt.subplots(2, 1, figsize=(15, 10))
for i, marker in enumerate(labels_df.index):
    # labels_df is indexed by marker name, so rows are read by position with
    # .iloc rather than with a bare integer key (deprecated positional fallback).
    condition = str(labels_df['Condition'].iloc[i])
    centroids = labels_df['Mean positions [um]'].iloc[i]
    xcoor = [t[0] for t in centroids]
    ycoor = [t[1] for t in centroids]
    xcount, xbins = np.histogram(xcoor, range=(0, im_original.shape[0] * r_X), bins=30)
    ycount, ybins = np.histogram(ycoor, range=(0, im_original.shape[1] * r_Y), bins=30)
    xbin_centers = (xbins[:-1] + xbins[1:]) / 2
    ybin_centers = (ybins[:-1] + ybins[1:]) / 2
    color = stain_df.loc[condition, 'Color']
    axs[0].plot(xbin_centers, xcount, label=condition, color=color)
    axs[1].plot(ybin_centers, ycount, label=condition, color=color)
axs[0].set_title('NUCLEI X DISTRIBUTION')
axs[0].set_xlabel('[μm]')
axs[0].legend(loc='upper right')
axs[0].set_facecolor('black')
axs[1].set_title('NUCLEI Y DISTRIBUTION')
axs[1].set_xlabel('[μm]')
axs[1].legend(loc='upper right')
axs[1].set_facecolor('black')

## Evaluate cell size distribution

In [None]:
# Plot size distribution of nuclei and cells
# Top panel: nucleus areas for every condition; bottom panel: cell areas for the
# non-nuclear conditions. All histograms of a panel share the same range.
fig, axs = plt.subplots(2, 1, figsize=(15, 10))
# default=0.0 keeps the cell runnable when nothing was segmented or assigned;
# max() over an empty generator would otherwise raise ValueError.
nuclei_max_size = max((x for t in labels_df['Nuclei size [um2]'] for x in t), default=0.0)
cell_max_size = max((x for t in labels_df['Cell size [um2]'] for x in t), default=0.0)
for i, marker in enumerate(labels_df.index):
    # labels_df is indexed by marker name, so rows are read by position with
    # .iloc rather than with a bare integer key (deprecated positional fallback).
    condition = str(labels_df['Condition'].iloc[i])
    nuclei_sizes = list(labels_df['Nuclei size [um2]'].iloc[i])
    cell_sizes = list(labels_df['Cell size [um2]'].iloc[i])
    color = stain_df.loc[condition, 'Color']
    axs[0].hist(nuclei_sizes, range=(0, nuclei_max_size), bins=30, label=condition, alpha=1/len(labels_df), color=color)
    if condition != 'NUCLEI':
        axs[1].hist(cell_sizes, range=(0, cell_max_size), bins=30, label=condition, alpha=1/len(labels_df), color=color)
axs[0].set_title('NUCLEI SIZE DISTRIBUTION')
axs[0].set_xlabel('[μm2]')
axs[0].legend(loc='upper right')
axs[1].set_title('CELL SIZE DISTRIBUTION')
axs[1].set_xlabel('[μm2]')
axs[1].legend(loc='upper right')

### Create a complete XLSX report

In [None]:
# Export quantification results to Excel file
# Sheets written: 'Staining' (stain settings), 'NUCLEI' (one row per nucleus
# with the markers it is positive for), 'RECAP' (per-condition summary).
# labels_df is indexed by marker name, so rows are read by position with .iloc;
# row 0 is the nuclei entry because it is inserted first into labels_dict.
with pd.ExcelWriter(Path(tiff_file).stem + '_nuclei_segmentation.xlsx', engine='xlsxwriter') as writer:
    stain_complete_df.to_excel(writer, sheet_name='Staining', index=True)
    xlsx_dict = {}
    columns = ['X position [um]', 'Y position [um]', 'Nuclei size [um2]']
    for i, marker in enumerate(labels_df.index):
        if labels_df['Condition'].iloc[i] != 'NUCLEI':
            columns.append(f"{marker} ({labels_df['Condition'].iloc[i]})")
            columns.append(f"{labels_df['Condition'].iloc[i]} Cell size [um2]")
    n_nuclei = int(labels_df['Number'].iloc[0])
    nuc_positions = labels_df['Mean positions [um]'].iloc[0]
    nuc_sizes = labels_df['Nuclei size [um2]'].iloc[0]
    # Nucleus labels run from 1 to n_nuclei inclusive; stopping the range at
    # n_nuclei would leave the last nucleus out of the sheet.
    for k in range(1, n_nuclei + 1):
        x_pos, y_pos = nuc_positions[k-1]
        row = [x_pos, y_pos, nuc_sizes[k-1]]
        for i, marker in enumerate(labels_df.index):
            if labels_df['Condition'].iloc[i] != 'NUCLEI':
                shared = labels_df['Shared labels'].iloc[i]
                if k in shared:
                    # Cell sizes are stored in the same order as the shared labels.
                    idx = list(shared).index(k)
                    row.append(marker)
                    row.append(labels_df['Cell size [um2]'].iloc[i][idx])
                else:
                    row.extend(['', ''])
        xlsx_dict[k] = row
    cell_df = pd.DataFrame.from_dict(xlsx_dict, orient='index', columns=columns)
    cell_df.to_excel(writer, sheet_name='NUCLEI', index=True)
    resume_df = labels_df.drop(columns=['Shared labels', 'Mean positions [um]', 'Nuclei size [um2]', 'Cell size [um2]'])
    resume_df['%'] = [
        100.0 * labels_df['Number'].iloc[t] / labels_df['Number'].iloc[0] if labels_df['Condition'].iloc[t] != 'NUCLEI' else ''
        for t in range(len(labels_df))
    ]
    resume_df['Mean nuclei size [um2]'] = [np.mean(t) for t in labels_df['Nuclei size [um2]']]
    resume_df['Mean cell size [um2]'] = [
        np.mean(val) if labels_df['Condition'].iloc[t] != 'NUCLEI' else ''
        for t, val in enumerate(labels_df['Cell size [um2]'])
    ]
    resume_df.to_excel(writer, sheet_name='RECAP', index=True)