In [None]:
# DAPI ↔︎ FITC (PAX6) overlap finder
# - Upload a single QuPath measurements CSV (both channels or just one; script reports what's present)
# - Strict channel mapping by filename suffix (customizable)
# - Outputs one CSV with PAIRS, DAPI_ONLY, and SUMMARY
# Usage (Colab):
#   1) Run this cell, upload your CSV when prompted.
#   2) Adjust TOL_UM and suffixes if your filenames differ.
#   3) Download the generated CSV.

import pandas as pd
import numpy as np
from sklearn.neighbors import KDTree
from google.colab import files
import io, sys, re, os
from typing import Tuple

# -------------------- USER SETTINGS --------------------
MARKER_NAME = "PAX6"   # FITC marker label to use in outputs
TOL_UM = 5.0           # distance tolerance (µm)

# If your exported image filenames end differently, edit these:
DAPI_SUFFIXES = ("-dapi.ndpi", "-dapi.tif", "-dapi.tiff")
FITC_SUFFIXES = ("-fitc.ndpi", "-fitc.tif", "-fitc.tiff")
# -------------------------------------------------------

def prompt_yn(msg="Continue? [y/n]: "):
    """Ask a yes/no question on stdin until a valid answer is given.

    The reply is stripped and lower-cased before it is checked, so " Y " is
    accepted. Any reply other than 'y' or 'n' prints a reminder and asks again.

    Returns True for 'y' and False for 'n'.
    """
    verdicts = {"y": True, "n": False}
    reply = input(msg).strip().lower()
    while reply not in verdicts:
        print("Please type 'y' or 'n'.")
        reply = input(msg).strip().lower()
    return verdicts[reply]

def read_qupath_csv(bytes_obj: bytes) -> pd.DataFrame:
    """Parse raw CSV bytes into a DataFrame.

    The first attempt lets pandas sniff the delimiter (python engine,
    ``sep=None``). If that attempt raises for any reason, the bytes are
    parsed again with the pandas defaults.
    """
    sniffing_options = {"sep": None, "engine": "python"}
    try:
        table = pd.read_csv(io.BytesIO(bytes_obj), **sniffing_options)
    except Exception:
        # Best-effort fallback: retry with the default (comma) parser.
        table = pd.read_csv(io.BytesIO(bytes_obj))
    return table

def find_xy_cols(df: pd.DataFrame) -> Tuple[str,str]:
    """Find the centroid X and Y columns across common QuPath headers.

    Column names are compared after stripping whitespace and lower-casing.
    The micro sign (U+00B5) and the Greek small letter mu (U+03BC) render
    identically but are different code points, so both are normalised to the
    micro sign before matching; either spelling of "µm" is therefore accepted.

    Candidates are tried in priority order: "centroid <axis> µm",
    "centroid <axis> [µm]", "centroid <axis> [um]", "centroid <axis> (µm)",
    "<axis>m", "<axis>".

    Returns:
        (x_column, y_column) using the original column labels from ``df``.

    Raises:
        ValueError: if either axis has no matching column.
    """
    micro_sign, greek_mu = "\u00b5", "\u03bc"

    # Normalised header -> original label. Later duplicates overwrite earlier
    # ones, matching plain dict-comprehension semantics.
    by_normalised = {
        str(col).strip().lower().replace(greek_mu, micro_sign): col
        for col in df.columns
    }

    def first_present(axis):
        candidates = (
            f"centroid {axis} {micro_sign}m",
            f"centroid {axis} [{micro_sign}m]",
            f"centroid {axis} [um]",
            f"centroid {axis} ({micro_sign}m)",
            f"{axis}m",
            axis,
        )
        for key in candidates:
            if key in by_normalised:
                return by_normalised[key]
        return None

    x = first_present("x")
    y = first_present("y")
    if x is None or y is None:
        raise ValueError("Couldn't find centroid columns (expected e.g. 'Centroid X µm' / 'Centroid Y µm').")
    return x, y

def strict_channel_flags(img_series: pd.Series, dapi_suffixes=None, fitc_suffixes=None) -> Tuple[pd.Series, pd.Series]:
    """Return boolean Series for DAPI and FITC based on filename endings.

    Both the filenames and the suffixes are stripped and casefolded before
    comparison, so matching is case-insensitive on both sides (a suffix
    configured as "-DAPI.ndpi" matches "slide-dapi.ndpi").

    Args:
        img_series: image filenames, one per row.
        dapi_suffixes: a suffix string or an iterable of suffix strings marking
            DAPI images; defaults to the module-level DAPI_SUFFIXES.
        fitc_suffixes: same for FITC images; defaults to the module-level
            FITC_SUFFIXES.

    Returns:
        (is_dapi, is_fitc): boolean Series sharing the index of ``img_series``.
    """
    if dapi_suffixes is None:
        dapi_suffixes = DAPI_SUFFIXES
    if fitc_suffixes is None:
        fitc_suffixes = FITC_SUFFIXES

    def folded(suffixes):
        # Accept a single string as well as an iterable of strings.
        if isinstance(suffixes, str):
            suffixes = (suffixes,)
        return tuple(str(suffix).strip().casefold() for suffix in suffixes)

    # normalize to casefolded strings
    names = img_series.astype(str).str.strip().str.casefold()
    is_dapi = names.str.endswith(folded(dapi_suffixes), na=False)
    is_fitc = names.str.endswith(folded(fitc_suffixes), na=False)
    return is_dapi, is_fitc

def pair_to_dapi(dapi_xy: np.ndarray, mark_xy: np.ndarray, tol: float) -> pd.DataFrame:
    """Pair each DAPI point with its nearest marker point, if close enough.

    A KDTree is built over ``mark_xy`` and queried once per row of
    ``dapi_xy`` (k=1). Rows whose nearest-marker distance is <= ``tol`` are
    kept. Each DAPI point is matched independently, so one marker point may
    appear in several output rows.

    Returns:
        DataFrame with columns X_DAPI, Y_DAPI, X_MARK, Y_MARK, dist_um and a
        fresh 0..n-1 index; empty (same columns) when either input is empty.
    """
    column_names = ["X_DAPI","Y_DAPI","X_MARK","Y_MARK","dist_um"]
    if len(dapi_xy)==0 or len(mark_xy)==0:
        return pd.DataFrame(columns=column_names)

    nearest_dist, nearest_idx = KDTree(mark_xy, leaf_size=40).query(dapi_xy, k=1)
    # query(k=1) returns column vectors; flatten them to 1-D.
    nearest_dist = nearest_dist.ravel()
    nearest_idx = nearest_idx.ravel()

    within_tol = nearest_dist <= tol
    kept_dapi = dapi_xy[within_tol]
    kept_mark = mark_xy[nearest_idx[within_tol]]
    return pd.DataFrame({
        "X_DAPI": kept_dapi[:,0],
        "Y_DAPI": kept_dapi[:,1],
        "X_MARK": kept_mark[:,0],
        "Y_MARK": kept_mark[:,1],
        "dist_um": nearest_dist[within_tol]
    })

# -------------------- 1) Upload CSV --------------------
print("Upload your QuPath measurements CSV (image filenames + centroid coordinates).")
up = files.upload()
if not up:
    raise SystemExit("No file uploaded.")
# Only the first uploaded file is used; its content is passed on as bytes.
csv_name, csv_bytes = next(iter(up.items()))

# -------------------- 2) Load & normalize --------------------
df = read_qupath_csv(csv_bytes)

# Use 'Image' if present, else first column as filenames; keep a casefolded helper col
image_col = "Image" if "Image" in df.columns else df.columns[0]
df["__Image__"] = df[image_col].astype(str).str.strip().str.casefold()

# Find centroid columns -> rename to X,Y
xcol, ycol = find_xy_cols(df)
df = df.rename(columns={xcol: "X", ycol: "Y"})

# Coerce coordinates to numbers; unparsable cells become NaN so they can be
# filtered out below instead of aborting the nearest-neighbour search.
df["X"] = pd.to_numeric(df["X"], errors="coerce")
df["Y"] = pd.to_numeric(df["Y"], errors="coerce")
has_xy = df["X"].notna() & df["Y"].notna()

# -------------------- 3) Strict channel mapping --------------------
is_dapi, is_fitc = strict_channel_flags(df["__Image__"])
df["Channel"] = None
df.loc[is_dapi, "Channel"] = "DAPI"
df.loc[is_fitc, "Channel"] = MARKER_NAME  # FITC marker

# Report rows that will not take part in the analysis, so an all-zero result
# caused by mismatched suffixes or missing centroids is not silent.
n_unassigned = int((~(is_dapi | is_fitc)).sum())
if n_unassigned:
    print(f"Note: {n_unassigned} row(s) matched neither the DAPI nor the FITC filename suffixes and were ignored.")
n_no_xy = int(((is_dapi | is_fitc) & ~has_xy).sum())
if n_no_xy:
    print(f"Note: skipped {n_no_xy} DAPI/{MARKER_NAME} row(s) with missing or non-numeric centroid coordinates.")

DAPI = df[(df["Channel"]=="DAPI") & has_xy][["X","Y"]].reset_index(drop=True)
MARK = df[(df["Channel"]==MARKER_NAME) & has_xy][["X","Y"]].reset_index(drop=True)

# -------------------- 4) Overlap (DAPI -> nearest MARK ≤ tol) --------------------
pairs = pair_to_dapi(DAPI[["X","Y"]].to_numpy(), MARK[["X","Y"]].to_numpy(), TOL_UM)

# -------------------- 5) Numeric QC (no plots) --------------------
def rounded_dapi_set(pairs_df: pd.DataFrame):
    """Return the set of "x,y" string keys for the DAPI points in ``pairs_df``.

    X_DAPI and Y_DAPI are rounded to 4 decimals, converted to strings and
    joined with a comma. An empty frame yields an empty set.
    """
    if pairs_df.empty:
        return set()
    x_text = pairs_df["X_DAPI"].round(4).astype(str)
    y_text = pairs_df["Y_DAPI"].round(4).astype(str)
    return set(x_text.str.cat(y_text, sep=","))

# Key every DAPI point by its rounded "x,y" string so it can be compared with
# the keys of the DAPI points that ended up in a pair.
if not DAPI.empty:
    DAPI["_round_key"] = DAPI["X"].round(4).astype(str) + "," + DAPI["Y"].round(4).astype(str)
else:
    DAPI["_round_key"] = pd.Series(dtype=str)

dapi_in_mark = rounded_dapi_set(pairs)
is_dapi_only = ~DAPI["_round_key"].isin(dapi_in_mark) if not DAPI.empty else pd.Series([], dtype=bool)
DAPI_ONLY = DAPI.loc[is_dapi_only, ["X","Y"]].reset_index(drop=True)

# Counts are computed once and reused for both the printout and the CSV.
n_pairs = len(pairs)
n_dapi_only  = len(DAPI_ONLY)
# Each DAPI point is paired with its nearest marker independently, so several
# DAPI points can share one marker. len(MARK) - len(pairs) would then
# undercount unmatched markers; count the markers no pair refers to instead.
# Pair coordinates are copied from the marker table, so exact comparison is safe.
matched_mark_xy = set(zip(pairs["X_MARK"], pairs["Y_MARK"]))
n_mark_only  = sum(xy not in matched_mark_xy for xy in zip(MARK["X"], MARK["Y"]))

print("\nQC summary (no plots):")
print(f"  N_DAPI:                      {len(DAPI)}")
print(f"  N_{MARKER_NAME} (FITC):           {len(MARK)}")
print(f"  N_DAPI+{MARKER_NAME} (≤{TOL_UM} µm): {n_pairs}")
print(f"  N_DAPI_ONLY:                 {n_dapi_only}")
print(f"  N_{MARKER_NAME}_ONLY:            {n_mark_only}")

if not prompt_yn("Continue with save & download? [y/n]: "):
    raise SystemExit("Aborted by user after numeric QC.")

# -------------------- 6) Build one CSV (PAIRS + DAPI_ONLY + SUMMARY) --------------------
rows = []
if not pairs.empty:
    a = pairs.copy()
    a.insert(0, "RowType", f"PAIR_DAPI_{MARKER_NAME}")
    rows.append(a)

if len(DAPI_ONLY):
    b = DAPI_ONLY.copy()
    b = b.rename(columns={"X":"X_DAPI","Y":"Y_DAPI"})
    # DAPI-only rows have no partner; leave the marker columns empty.
    b["X_MARK"] = np.nan
    b["Y_MARK"] = np.nan
    b["dist_um"] = np.nan
    b.insert(0, "RowType", "DAPI_ONLY")
    rows.append(b)

summary = pd.DataFrame({
    "RowType": ["SUMMARY"]*6,
    "Metric": [
        "N_DAPI", f"N_{MARKER_NAME}",
        f"N_DAPI_plus_{MARKER_NAME}", "N_ONLY_DAPI", f"N_ONLY_{MARKER_NAME}",
        "TOL_UM"
    ],
    "Value": [
        len(DAPI), len(MARK),
        n_pairs, n_dapi_only, n_mark_only,
        TOL_UM
    ]
})
rows.append(summary)

# rows always holds at least the summary frame, so concat is always valid.
out = pd.concat(rows, ignore_index=True)
out_name = f"overlap_counts_dapi_{MARKER_NAME.lower()}.csv"
out.to_csv(out_name, index=False)
files.download(out_name)
print(f"Done. Downloaded: {out_name}")

Upload your QuPath measurements CSV (image filenames + centroid coordinates).


Saving CO_D50_HIGH_BC_1-1_PAX6.csv to CO_D50_HIGH_BC_1-1_PAX6.csv

QC summary (no plots):
  N_DAPI:                      127403
  N_PAX6 (FITC):           27916
  N_DAPI+PAX6 (≤5.0 µm): 26820
  N_DAPI_ONLY:                 100583
  N_PAX6_ONLY:            1096


KeyboardInterrupt: Interrupted by user