In [1]:
import os
from typing import List, Tuple

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from cellpose import models, io
from skimage.color import rgb2hed
from skimage.filters import threshold_otsu
from skimage.morphology import (
    remove_small_objects,
    binary_closing,
    binary_erosion,
    binary_dilation,
    disk,
)
from skimage.measure import regionprops_table
from skimage.segmentation import find_boundaries


# =========================
# basic config
# =========================

WORK_DIR = "/Volumes/FlynnDisk/PengIHC/1205"
os.chdir(WORK_DIR)

POS_SLIDES: List[str] = [
    "CH-967-5.tif",
    "CH-967-1.tif",
    "CH-642-17.tif",
    "CH-642-16.tif",
    "CH-642-12.tif",
    "CH-642-11.tif",
    "CH-642-3.tif",
    "CH-225-21.tif",
    "CH-225-20.tif",
    "CH-225-19.tif",
    "CH-225-11.tif",
    "CH-225-9.tif",
    "CH-225-7.tif",
]

# cellpose nuclei model parameters
FLOW_THRESHOLD_NUC = 0.38
CELLPROB_THRESHOLD_NUC = -1.4
DIAMETER_NUC = 18

# Molecule Of Interest positive area parameters (你刚刚选的那组)
MIN_POS_SIZE = 150
CLOSING_DISK = 10
CORE_EROSION_DISK = 3
DILATION_DISK = 5
T_PIXEL_RAW = 0.0531

# whether to save overlay images for QC
SAVE_OVERLAYS = True
OVERLAY_DIR = "neg_overlays_strong_broad"
os.makedirs(OVERLAY_DIR, exist_ok=True)


# =========================
# helper functions
# =========================

def normalize_channel(ch: np.ndarray) -> np.ndarray:
    ch = ch.astype(np.float32)
    ch = ch - ch.min()
    max_val = ch.max()
    if max_val > 0:
        ch = ch / max_val
    return ch


def load_H_D(path: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load RGB, compute H_norm and raw D channel."""
    img_rgb = io.imread(path)
    img_f = img_rgb.astype(np.float32) / 255.0

    ihc_hed = rgb2hed(img_f)
    H = ihc_hed[:, :, 0]
    D = ihc_hed[:, :, 2]   # raw CgA channel

    H_norm = normalize_channel(H)
    return img_rgb, H_norm, D


def segment_nuclei(H_norm: np.ndarray, model: models.CellposeModel) -> np.ndarray:
    """Run Cellpose on H channel to get nuclei masks."""
    masks_nuc, flows_nuc, styles_nuc = model.eval(
        H_norm,
        batch_size=16,
        diameter=DIAMETER_NUC,
        flow_threshold=FLOW_THRESHOLD_NUC,
        cellprob_threshold=CELLPROB_THRESHOLD_NUC,
    )
    return masks_nuc


def compute_positive_areas_with_fixed_threshold(D: np.ndarray, T_pixel: float):
    print(f"    [INFO] Using fixed T_pixel_raw: {T_pixel:.6f}")
    pos_area = D > T_pixel

    pos_area = remove_small_objects(pos_area, min_size=MIN_POS_SIZE)
    pos_area = binary_closing(pos_area, footprint=disk(CLOSING_DISK))

    pos_core = binary_erosion(pos_area, footprint=disk(CORE_EROSION_DISK))
    pos_dil  = binary_dilation(pos_core, footprint=disk(DILATION_DISK))

    return pos_core, pos_dil


def label_strong_broad(
    masks_nuc: np.ndarray,
    pos_core: np.ndarray,
    pos_dil: np.ndarray,
) -> pd.DataFrame:
    """
    For each nucleus, decide strong / broad positivity based on centroid.
    """
    props = regionprops_table(
        masks_nuc,
        properties=("label", "centroid"),
    )
    nuc_df = pd.DataFrame(props)

    h, w = pos_core.shape
    strong_flags = []
    broad_flags = []

    for cy, cx in zip(nuc_df["centroid-0"], nuc_df["centroid-1"]):
        r = int(round(cy))
        c = int(round(cx))

        in_strong = False
        in_broad = False

        if 0 <= r < h and 0 <= c < w:
            in_strong = bool(pos_core[r, c])
            in_broad = bool(pos_dil[r, c])

        strong_flags.append(in_strong)
        broad_flags.append(in_broad)

    nuc_df["CgA_strong"] = strong_flags
    nuc_df["CgA_broad"] = broad_flags

    return nuc_df


def make_overlay(
    img_rgb: np.ndarray,
    masks_nuc: np.ndarray,
    nuc_df: pd.DataFrame,
    save_path: str,
) -> None:
    """
    Save an overlay image with red (strong) and orange (broad-only) nuclei outlines.
    """
    labels_strong = nuc_df.loc[nuc_df["CgA_strong"], "label"].to_numpy()
    labels_broad = nuc_df.loc[nuc_df["CgA_broad"], "label"].to_numpy()

    mask_strong = np.isin(masks_nuc, labels_strong)
    mask_broad = np.isin(masks_nuc, labels_broad)

    bound_strong = find_boundaries(mask_strong, mode="inner")
    bound_strong = binary_dilation(bound_strong, footprint=disk(1))

    bound_broad = find_boundaries(mask_broad, mode="inner")
    bound_broad = binary_dilation(bound_broad, footprint=disk(1))

    # broad-only
    bound_broad_only = np.logical_and(bound_broad, ~bound_strong)

    base = img_rgb.astype(np.float32) / 255.0
    overlay = base.copy()

    red = np.array([1.0, 0.0, 0.0], dtype=np.float32)
    orange = np.array([1.0, 0.5, 0.0], dtype=np.float32)

    overlay[bound_strong] = red
    overlay[bound_broad_only] = orange

    fig, axes = plt.subplots(1, 2, figsize=(16, 8))
    axes[0].imshow(img_rgb)
    axes[0].axis("off")
    axes[0].set_title("Original RGB")

    axes[1].imshow(overlay)
    axes[1].axis("off")
    axes[1].set_title("CgA-positive nuclei (red=strong, orange=broad-only)")

    plt.tight_layout()
    fig.savefig(save_path, dpi=300)
    plt.close(fig)


# =========================
# main
# =========================

def main() -> None:
    print("[INFO] Loading Cellpose nuclei model...")
    nuc_model = models.CellposeModel(gpu=True)

    records = []

    for fname in POS_SLIDES:
        path = os.path.join(WORK_DIR, fname)
        print(f"\n[INFO] Processing positive slide: {fname}")

        img_rgb, H_norm, D = load_H_D(path)

        print("  [STEP] Nuclei segmentation...")
        masks_nuc = segment_nuclei(H_norm, nuc_model)

        nuc_labels = np.unique(masks_nuc)
        nuc_labels = nuc_labels[nuc_labels != 0]
        total_cells = len(nuc_labels)
        print(f"    [INFO] Total nuclei: {total_cells}")

        if total_cells == 0:
            print("    [WARN] No nuclei found, skipping.")
            records.append(
                {
                    "slide": fname,
                    "total_cells": 0,
                    "strong_cells": 0,
                    "broad_cells": 0,
                    "strong_fraction": np.nan,
                    "broad_fraction": np.nan,
                    "t_cga": np.nan,
                }
            )
            continue

        print("  [STEP] CgA positive area (core/dilated)...")
        pos_core, pos_dil = compute_positive_areas_with_fixed_threshold(D, T_PIXEL_RAW)

        print("  [STEP] Label strong/broad positive nuclei...")
        nuc_df = label_strong_broad(masks_nuc, pos_core, pos_dil)

        N_strong = int(nuc_df["CgA_strong"].sum())
        N_broad = int(nuc_df["CgA_broad"].sum())
        frac_strong = N_strong / total_cells
        frac_broad = N_broad / total_cells

        print(f"    [RESULT] strong={N_strong} ({frac_strong:.4f}), "
              f"broad={N_broad} ({frac_broad:.4f})")

        records.append(
            {
                "slide": fname,
                "total_cells": total_cells,
                "strong_cells": N_strong,
                "broad_cells": N_broad,
                "strong_fraction": frac_strong,
                "broad_fraction": frac_broad,
                "T_pixel_raw": T_PIXEL_RAW,
            }
        )


        if SAVE_OVERLAYS:
            save_path = os.path.join(
                OVERLAY_DIR,
                f"{os.path.splitext(fname)[0]}_overlay.png"
            )
            print(f"  [STEP] Saving overlay to {save_path}")
            make_overlay(img_rgb, masks_nuc, nuc_df, save_path)

    # summary
    df = pd.DataFrame(records)
    print("\n[INFO] Summary over positive slides:")
    print(df)

    print("\n[INFO] Strong fraction: mean="
          f"{df['strong_fraction'].mean():.4f}, "
          f"max={df['strong_fraction'].max():.4f}")

    print("[INFO] Broad fraction:  mean="
          f"{df['broad_fraction'].mean():.4f}, "
          f"max={df['broad_fraction'].max():.4f}")

    df.to_csv("positive_slides_strong_broad_summary.csv", index=False)
    print("[INFO] Saved summary to positive_slides_strong_broad_summary.csv")


if __name__ == "__main__":
    main()


[INFO] Loading Cellpose nuclei model...

[INFO] Processing positive slide: CH-967-5.tif
  [STEP] Nuclei segmentation...
    [INFO] Total nuclei: 1191
  [STEP] CgA positive area (core/dilated)...
    [INFO] Using fixed T_pixel_raw: 0.053100
  [STEP] Label strong/broad positive nuclei...
    [RESULT] strong=17 (0.0143), broad=26 (0.0218)
  [STEP] Saving overlay to neg_overlays_strong_broad/CH-967-5_overlay.png

[INFO] Processing positive slide: CH-967-1.tif
  [STEP] Nuclei segmentation...
    [INFO] Total nuclei: 912
  [STEP] CgA positive area (core/dilated)...
    [INFO] Using fixed T_pixel_raw: 0.053100
  [STEP] Label strong/broad positive nuclei...
    [RESULT] strong=263 (0.2884), broad=331 (0.3629)
  [STEP] Saving overlay to neg_overlays_strong_broad/CH-967-1_overlay.png

[INFO] Processing positive slide: CH-642-17.tif
  [STEP] Nuclei segmentation...
    [INFO] Total nuclei: 672
  [STEP] CgA positive area (core/dilated)...
    [INFO] Using fixed T_pixel_raw: 0.053100
  [STEP] Label 