# Preparation

### Connect to Drive and Kaggle

In [1]:
!pip install -q kaggle
from google.colab import files

files.upload()


Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"gkyyds","key":"194d6831ee94cd778feec959408dc570"}'}

In [2]:
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [3]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


### Install Packages

In [4]:

# # Core WSI handling
!pip install openslide-python

# # Image processing
!pip install opencv-python Pillow

!pip install tifffile zarr imagecodecs opencv-python pillow pandas



Collecting openslide-python
  Downloading openslide_python-1.4.2-cp311-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (4.9 kB)
Downloading openslide_python-1.4.2-cp311-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.whl (36 kB)
Installing collected packages: openslide-python
Successfully installed openslide-python-1.4.2
Collecting zarr
  Downloading zarr-3.1.2-py3-none-any.whl.metadata (10 kB)
Collecting imagecodecs
  Downloading imagecodecs-2025.8.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (20 kB)
Collecting donfig>=0.8 (from zarr)
  Downloading donfig-0.8.1.post1-py3-none-any.whl.metadata (5.0 kB)
Collecting numcodecs>=0.14 (from numcodecs[crc32c]>=0.14->zarr)
  Downloading numcodecs-0.16.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.3 kB)
Collecting crc32c>=2.7 (from numcodecs[crc32c]>=0.14->zarr)
  Downloading crc32c-2.7.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.me

In [5]:
import os
from pathlib import Path
import zipfile
from glob import glob

import tifffile as tiff, zarr
import numpy as np
import cv2, pandas as pd
from PIL import Image

import torch, torchvision as tv
import h5py

import re


## Edge Detection - Contour function

In [6]:
def _to_uint8(img):
    if img.dtype == np.uint8: return img
    x = img.astype(np.float32); mn, mx = float(x.min()), float(x.max())
    if mx <= mn: return np.zeros_like(x, dtype=np.uint8)
    return ((x - mn) / (mx - mn) * 255.0).astype(np.uint8)




# 通过饱和度分离组织和背景（饱和度高和白色）
# 去掉噪点，让组织区域更连贯
# 找到轮廓线 cv2.findContours
def build_contour_mask_1024(
    lowres_rgb, # 低分辨率彩色图
    mthresh: int = 41,        # 去噪点
    sthresh: int = 8,        # 分离前景背景
    close: int = 3,           # morphology closing kernel
    min_area_fore: int = 6,  # min area (low-res px) 低分辩像素个数
    min_area_hole: int = 4,  # min hole area (low-res px)
    max_n_holes: int = 12,     # cap holes per region 最多保留孔洞数量
):


    img = _to_uint8(lowres_rgb)
    # 组织区域更有颜色，和白色对比强
    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV); sat = hsv[..., 1]
    sat = cv2.medianBlur(sat, int(max(1, mthresh) | 1))
    _, bin_s = cv2.threshold(sat, int(sthresh), 255, cv2.THRESH_BINARY)

    # 填补小缝隙
    if close > 0:
        kernel = np.ones((int(close), int(close)), np.uint8)
        bin_s = cv2.morphologyEx(bin_s, cv2.MORPH_CLOSE, kernel)
    contours, hier = cv2.findContours(bin_s, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)
    if hier is None or len(contours) == 0:
        return (bin_s > 0)
    hier = hier[0]  # [Next, Prev, First_Child, Parent]
    fore_ids = [i for i,h in enumerate(hier) if h[3] == -1]
    kept_fore, holes_per_fore = [], []
    for fid in fore_ids:
        a = cv2.contourArea(contours[fid])
        if a <= 0: continue
        # collect children (holes)
        holes = []
        child = hier[fid][2]
        while child != -1:
            holes.append(child)
            child = hier[child][0]
        hole_areas = [cv2.contourArea(contours[h]) for h in holes]
        real_a = a - (np.sum(hole_areas) if hole_areas else 0.0)
        if real_a >= float(min_area_fore):
            kept_fore.append(fid)
            holes_kept = [h for h in holes if cv2.contourArea(contours[h]) > float(min_area_hole)]
            holes_kept = sorted(holes_kept, key=lambda h: cv2.contourArea(contours[h]), reverse=True)[:max_n_holes]
            holes_per_fore.append(holes_kept)
    H, W = bin_s.shape
    mask = np.zeros((H, W), dtype=np.uint8)
    if kept_fore:
        cv2.drawContours(mask, [contours[i] for i in kept_fore], -1, 255, thickness=cv2.FILLED)
    for holes in holes_per_fore:
        if holes:
            cv2.drawContours(mask, [contours[i] for i in holes], -1, 0, thickness=cv2.FILLED)
    return (mask > 0)


# 是否留下patch
# 组织是否较多？是否有边缘？
def patch_keep(mask_bool, x_m, y_m, w_m, h_m, min_tissue = 0.20, min_edge = 0.05):
    H, W = mask_bool.shape[:2]
    x2, y2 = min(W, x_m + w_m), min(H, y_m + h_m)
    if x_m >= x2 or y_m >= y2: return False, {'cov':0.0,'edge':0.0}
    win = mask_bool[y_m:y2, x_m:x2]
    if win.size == 0: return False, {'cov':0.0,'edge':0.0}
    cov = float(win.mean())
    if cov == 0.0:
        edge_ratio = 0.0
    else:
        eroded = cv2.erode(win.astype(np.uint8), np.ones((3,3), np.uint8), 1).astype(bool)
        border = win ^ eroded
        edge_ratio = float(border.mean())
    keep = (cov >= min_tissue) or (edge_ratio >= min_edge)
    return keep, {'cov': cov, 'edge': edge_ratio}

# 打分数
# score = 0.6 * 组织覆盖率 + 0.4 * edge
def rank_key(stats: dict, alpha: float = 0.6):
    return alpha*float(stats.get('cov',0.0)) + (1.0-alpha)*float(stats.get('edge',0.0))


### Download data

In [7]:
drive_path = '/content/drive/MyDrive/PANDA_OneImage'
os.makedirs(drive_path, exist_ok=True)

image_id = '0005f7aaab2800f6170c399693a96917'
image_filename = f'train_images/{image_id}.tiff'


In [8]:
!kaggle competitions download -c prostate-cancer-grade-assessment -f train.csv -p '{drive_path}'
!kaggle competitions download -c prostate-cancer-grade-assessment -f '{image_filename}' -p '{drive_path}'

train.csv: Skipping, found more recently modified local copy (use --force to force download)
0005f7aaab2800f6170c399693a96917.tiff: Skipping, found more recently modified local copy (use --force to force download)


In [9]:
print(os.listdir(drive_path))

['train.csv', '0005f7aaab2800f6170c399693a96917.tiff', 'extracted', 'patches_10x', 'patches_20x']


In [10]:
from google.colab import drive
drive.mount('/content/drive')

import os, pathlib
drive_path = '/content/drive/MyDrive/PANDA_OneImage'
image_id = '0005f7aaab2800f6170c399693a96917'
image_filename = f'{image_id}.tiff'
tiff_path = os.path.join(drive_path, image_filename)

print("Exists:", os.path.exists(tiff_path))
print("Size (MB):", round(os.path.getsize(tiff_path)/1024/1024, 2))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Exists: True
Size (MB): 15.38


# Preprocessing

### Import and unzip

*   Many documents implment with OpenSlide, but this Kaggle one does not work as it's a zip --> tifffile and zarr


*   forgetground threshold: reject patches if less than % of pixels are tissues

*   min std: reject patches with low variations (blank)



In [11]:

# with tiff.TiffFile(f'{drive_path}/0005f7aaab2800f6170c399693a96917.tiff') as tf:
#   print(len(tf.series[0].levels))   # number of pyramid levels
#   for lvl in tf.series[0].levels:
#     print(lvl.shape)

In [12]:
ZIP_OR_TIFF_PATH = f'/content/drive/MyDrive/PANDA_OneImage/{image_id}.tiff'

# output path
EXTRACT_DIR = '/content/drive/MyDrive/PANDA_OneImage/extracted'     # where we unzip and get the real tiff
OUT_DIR     = '/content/drive/MyDrive/PANDA_OneImage/patches_20x'   # where we save patches/CSV, all outputs



# resolution flag
# reso = '20x'
reso = '20x'

Path(EXTRACT_DIR).mkdir(parents=True, exist_ok=True)
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

In [13]:
# unzip the file and return the real tiff
def get_real_tiff(path: str):
    # if file starts with 'PK', it's a ZIP
    with open(path, 'rb') as f:
        sig = f.read(2)
    if sig == b'PK':
        with zipfile.ZipFile(path) as z:
            print('Archive contents:', z.namelist())
            z.extractall(EXTRACT_DIR)
        tiffs = sorted(glob(os.path.join(EXTRACT_DIR, '**', '*.tif*'), recursive=True))
        if not tiffs:
            raise FileNotFoundError('No .tif/.tiff found after extraction.')
        return tiffs[0]
    return path


# zarr: n-dimensional array, like NumPy, but load what you need when you need
# WSIs giant -> zarr
# get the actual array of the pixel data
def _to_zarr_array(znode):
    obj = zarr.open(znode, mode='r')
    # if a single array
    if isinstance(obj, zarr.Array):
        return obj
    # if a group with multi arrays
    if isinstance(obj, zarr.Group):
        keys = list(obj.array_keys())
        if not keys:
            raise ValueError('Zarr Group has no arrays.')
        return obj[keys[0]]
    raise TypeError(f'Unexpected zarr node type: {type(obj)}')


# open the first image in tiff
# collect all levels （pyramid) , 40x --> 20x --> 10x....
def open_pyramid_as_zarr(tf: tiff.TiffFile):

    s0 = tf.series[0]
    arr0 = _to_zarr_array(s0.aszarr())        # level 0 (highest resolution)
    levels = [arr0] + [_to_zarr_array(l.aszarr()) for l in s0.levels]
    # compute downsample factors related to level 0
    # eg: L0 wid = 1000, L1 wid = 500, 1000 / 500 = 2
    downs = [1.0] + [arr0.shape[-2] / lvl.shape[-2] for lvl in levels[1:]]
    return levels, downs

# pick the one closest to my target, 20X here
def pick_level_for_target(downs, target_down=1.0):
    return int(np.argmin([abs(d - target_down) for d in downs]))

# standardize all to RGB unit8 format (H, W, 3)
def ensure_hwc(tile: np.ndarray):
    t = tile
    if t.ndim == 2: # grayscale
        t = np.stack([t]*3, axis=-1)
    elif t.ndim == 3 and t.shape[0] in (3,4) and t.shape[-1] not in (3,4): # (C, H, W)
        t = np.moveaxis(t, 0, -1)  # (C,H,W) -> (H,W,C)
    if t.shape[-1] > 3: # RGBA with alpha
        t = t[..., :3]            # drop alpha
    if t.dtype != np.uint8: # other format
        # best-effort clamp/convert (many WSIs are already uint8)
        t = np.clip(t, 0, 255).astype(np.uint8)
    return t

def save_png(arr: np.ndarray, path: Path):
    Image.fromarray(arr).save(path, format='PNG', compress_level=3)

In [14]:
REAL_TIFF_PATH = get_real_tiff(ZIP_OR_TIFF_PATH)
print('Using TIFF:', REAL_TIFF_PATH)


Archive contents: ['0005f7aaab2800f6170c399693a96917.tiff']
Using TIFF: /content/drive/MyDrive/PANDA_OneImage/extracted/0005f7aaab2800f6170c399693a96917.tiff


In [15]:
with tiff.TiffFile(REAL_TIFF_PATH) as tf:
    series = tf.series[0]
    low = series.levels[-1].asarray()
    img = Image.fromarray(low)

img.save(f"original{image_id}.png")

In [16]:
import tifffile as tiff
with tiff.TiffFile('/content/drive/MyDrive/PANDA_OneImage/extracted/0005f7aaab2800f6170c399693a96917.tiff') as tf:
    print(len(tf.series[0].levels))   # number of pyramid levels
    for lvl in tf.series[0].levels:
        print(lvl.shape)

3
(29440, 27648, 3)
(7360, 6912, 3)
(1840, 1728, 3)


In [17]:
if reso == '20x':
  # base level: 20X
  hi_level = 0
  PATCH, STRIDE = 1024, 512
  TARGET_DOWN = 1.0

else:
  # 10x level
  hi_level = 1
  PATCH, STRIDE = 512, 256
  TARGET_DOWN = 2.0

In [18]:
with tiff.TiffFile(REAL_TIFF_PATH) as tf:
    levels, downs = open_pyramid_as_zarr(tf)

print('Pyramid downsample factors vs level-0:', [f'{d:.2f}×' for d in downs])

L = pick_level_for_target(downs, TARGET_DOWN)
arrL = levels[L]

# extract H and W
H, W = arrL.shape[-3], arrL.shape[-2]
print(f'Chosen level: {L}  (downsample {downs[L]:.2f}×),  shape≈({H}, {W}, …)')

Pyramid downsample factors vs level-0: ['1.00×', '1.00×', '4.00×', '16.00×']
Chosen level: 0  (downsample 1.00×),  shape≈(29440, 27648, …)


## Run Contour part to get the non-black part

In [19]:
# choose the lowest reso to build the mask
low_level = len(levels) - 1
arr_low = levels[low_level]
lowres_rgb = np.asarray(arr_low)  # (H_low, W_low, 3) uint8
H_low, W_low = lowres_rgb.shape[:2]



arr0 = levels[hi_level]
H0, W0 = arr0.shape[0], arr0.shape[1]

# Map factor: pixels at hi_level → pixels at low_level
# If your 'downs' is defined as (down from level 0), then:
#   scale_hi_to_low = downs[hi_level] / downs[low_level]
# For hi_level=0 this simplifies to:
scale_hi_to_low = downs[hi_level] / downs[low_level]

# Build the contour mask at the low-res level
contour_mask = build_contour_mask_1024(lowres_rgb)


out_dir = f'patches_out_{reso}'; os.makedirs(out_dir, exist_ok=True)

kept = []
for y in range(0, H0 - PATCH + 1, STRIDE):
    for x in range(0, W0 - PATCH + 1, STRIDE):
        # Map hi-res patch → low-res mask window
        x_m = int(x * scale_hi_to_low)
        y_m = int(y * scale_hi_to_low)
        w_m = max(1, int(PATCH * scale_hi_to_low))
        h_m = max(1, int(PATCH * scale_hi_to_low))

        keep, stats = patch_keep(contour_mask, x_m, y_m, w_m, h_m, min_tissue=0.15, min_edge=0.04)
        if not keep:
            continue

        score = rank_key(stats, alpha=0.6)
        kept.append((score, x, y))

        # Read hi-res tile directly from zarr and save
        tile = np.asarray(arr0[y:y+PATCH, x:x+PATCH, :])
        if tile.shape[:2] != (PATCH, PATCH):
            continue
        Image.fromarray(_to_uint8(tile), 'RGB').save(os.path.join(out_dir, f'p_x{x}_y{y}_s{score:.3f}.png'))

  Image.fromarray(_to_uint8(tile), 'RGB').save(os.path.join(out_dir, f'p_x{x}_y{y}_s{score:.3f}.png'))


In [20]:

# Folder where your patches are stored
PATCH_DIR = f"patches_out_{reso}"

# Regex to parse filenames like p_x1024_y1024_s0.187.png
pattern = re.compile(r"p_x(\d+)_y(\d+)_s[\d.]+\.png")

# Load all patches and record their coordinates
patches = []
max_x, max_y = 0, 0

for fname in os.listdir(PATCH_DIR):
    match = pattern.match(fname)
    if not match:
        continue
    x, y = int(match.group(1)), int(match.group(2))
    img = Image.open(os.path.join(PATCH_DIR, fname))
    w, h = img.size
    patches.append((x, y, img))

    # Track max extent of the canvas
    max_x = max(max_x, x + w)
    max_y = max(max_y, y + h)

# Create a blank canvas large enough to hold all patches
canvas = Image.new("RGB", (max_x, max_y), (255, 255, 255))

# Paste each patch onto the canvas
for x, y, img in patches:
    canvas.paste(img, (x, y))

# Save the reconstructed image
out_path = f"reconstructed_{reso}.png"
canvas.save(out_path)
print(f"Reconstructed {reso} saved at {out_path}")

Reconstructed 20x saved at reconstructed_20x.png



# Model SetUp



In [21]:
!pip -q install open_clip_torch tqdm


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.5 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m61.6 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/44.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.8/44.8 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [22]:
import os, re, math, csv
from pathlib import Path
from typing import List, Tuple, Dict

import numpy as np
from PIL import Image, ImageDraw
from tqdm import tqdm

import torch
import open_clip  # open_clip_torch

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)


Using device: cpu


In [24]:
# 你的 patch 目录（10x 或 20x）
PATCH_DIR = "patches_out_20x"   # 例如：patches_out_10x / patches_out_20x

# 目录分辨率（自动从目录名猜，不放心可手填）
FROM_RESO = "10x" if "10x" in PATCH_DIR.lower() else ("20x" if "20x" in PATCH_DIR.lower() else "10x")

# 目标坐标系：统一回帖到 20x（如需 10x 自己改）
TARGET_RESO = "20x"

# 10x→20x 的缩放比例（通常≈2.0，亦可从TIFF金字塔精确计算）
SCALE_10X_TO_20X = 2.0

# patch 尺寸（像素）
PATCH_SIZE_10X = 512
PATCH_SIZE_20X = 1024

# （可选）送模型前统一 resize 到固定尺寸；None 表示使用 open_clip 自带的 preprocess
RESIZE_MODEL_INPUT = None  # 比如 224；None 表示不用

# 零样本文本标签（可按需修改/增删，建议英文专业术语更稳）
LABELS = [
    "benign gland",
    "Gleason pattern 3",
    "Gleason pattern 4",
    "Gleason pattern 5",
    "stroma"
]

# 输出目录前缀
OUT_PREFIX = "out_conch_style"  # 会生成 .npy/.csv/.png 等文件
os.makedirs(OUT_PREFIX, exist_ok=True)

print("PATCH_DIR =", PATCH_DIR)
print("FROM_RESO =", FROM_RESO)


PATCH_DIR = patches_out_20x
FROM_RESO = 20x


In [25]:
# 解析文件名：p_x1024_y1280_s0.389.png
FNAME_RE = re.compile(r"p_x(\d+)_y(\d+)_s([\d.]+)\.png", re.IGNORECASE)

def parse_patch_filename(fn: str) -> Tuple[int, int, float]:
    m = FNAME_RE.search(os.path.basename(fn))
    if not m:
        raise ValueError(f"Bad patch filename: {fn}")
    x = int(m.group(1)); y = int(m.group(2)); score = float(m.group(3))
    return x, y, score

def map_coords_to_20x(x: int, y: int,
                      from_reso: str,
                      scale_10x_to_20x: float = 2.0) -> Tuple[int, int]:
    """把 10x/20x 的坐标映射到 20x 坐标系。"""
    if from_reso.lower() == "20x":
        return x, y
    elif from_reso.lower() == "10x":
        return int(round(x * scale_10x_to_20x)), int(round(y * scale_10x_to_20x))
    else:
        raise ValueError(f"Unsupported from_reso = {from_reso}")

def patch_size_in_20x(from_reso: str,
                      patch_size_10x: int,
                      patch_size_20x: int,
                      scale_10x_to_20x: float = 2.0) -> int:
    """返回该分辨率的 patch 在 20x 坐标系下的尺寸（像素）。"""
    if from_reso.lower() == "20x":
        return patch_size_20x
    elif from_reso.lower() == "10x":
        return int(round(patch_size_10x * scale_10x_to_20x))
    else:
        raise ValueError(f"Unsupported from_reso = {from_reso}")

def build_patch_index(patch_dir: str) -> List[Dict]:
    files = sorted([str(p) for p in Path(patch_dir).glob("p_x*_y*_s*.png")])
    items = []
    for fn in files:
        x, y, s = parse_patch_filename(fn)
        items.append({"path": fn, "x": x, "y": y, "score": s})
    print(f"Found {len(items)} patches.")
    return items

def make_canvas_size(items: List[Dict]) -> Tuple[int, int]:
    W = 0; H = 0
    for it in items:
        W = max(W, it["x20"] + it["ps20"])
        H = max(H, it["y20"] + it["ps20"])
    return W, H


In [26]:
class ZSModel:
    def __init__(self, device="cuda" if torch.cuda.is_available() else "cpu"):
        self.device = device
        model_name, pretrained = "ViT-B-32", "openai"  # 也可换 ViT-L-14 等
        # 返回: model, preprocess_train, preprocess_val
        self.model, _, self.preprocess = open_clip.create_model_and_transforms(
            model_name, pretrained=pretrained
        )
        self.model.to(self.device).eval()
        self.tokenizer = open_clip.get_tokenizer(model_name)

    @torch.no_grad()
    def encode_image(self, pil_img: Image.Image) -> np.ndarray:
        # 如果用 CONCH：在这换 conch_preprocess(img)+model.encode_image(...)
        if RESIZE_MODEL_INPUT is not None:
            pil_img = pil_img.resize((RESIZE_MODEL_INPUT, RESIZE_MODEL_INPUT), Image.BILINEAR)
        img = self.preprocess(pil_img).unsqueeze(0).to(self.device)
        feats = self.model.encode_image(img)  # (1, D)
        feats = feats / feats.norm(dim=-1, keepdim=True)
        return feats.squeeze(0).detach().cpu().numpy()

    @torch.no_grad()
    def encode_texts(self, labels: List[str]) -> np.ndarray:
        # 如果用 CONCH：在这换相应文本编码接口
        toks = self.tokenizer(labels).to(self.device)
        feats = self.model.encode_text(toks)  # (K, D)
        feats = feats / feats.norm(dim=-1, keepdim=True)
        return feats.detach().cpu().numpy()

zs = ZSModel(device)
print("Model ready.")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


open_clip_model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]



Model ready.


In [29]:
import json

# --- 保存为 .npz（推荐），避免 pickle ---
NPZ_PATH = os.path.join(OUT_PREFIX, "patch_embeddings.npz")

np.savez_compressed(
    NPZ_PATH,
    paths=np.array(paths, dtype='U'),  # 字符串数组
    x20=x20,
    y20=y20,
    ps20=ps20a,
    score=score,
    embs=embs,  # (N, D) float32
    meta=np.array([json.dumps({
        "from_reso": FROM_RESO,
        "target_reso": TARGET_RESO,
        "scale_10x_to_20x": SCALE_10X_TO_20X,
        "patch_size_10x": PATCH_SIZE_10X,
        "patch_size_20x": PATCH_SIZE_20X,
        "labels": LABELS,
        "model": "open_clip ViT-B/32 openai",
        "resize_model_input": RESIZE_MODEL_INPUT
    })], dtype='U')  # 放到数组里，保持 npz 兼容
)

print("Saved:", NPZ_PATH, "embs shape:", embs.shape)



Saved: out_conch_style/patch_embeddings.npz embs shape: (166, 512)


In [30]:
# === 从 .npz 载入（可跨重启） ===
import json, csv

NPZ_PATH = os.path.join(OUT_PREFIX, "patch_embeddings.npz")
data = np.load(NPZ_PATH, allow_pickle=False)

paths = data["paths"]
x20   = data["x20"]
y20   = data["y20"]
ps20a = data["ps20"]
score = data["score"]
embs  = data["embs"]          # (N, D) float32
meta  = json.loads(str(data["meta"][0]))

# 如果你重启过内核，确保 zs/LABELS 等还在；没在就再跑下 Cell 4/Cell 2
print("Loaded:", NPZ_PATH, "| N =", len(paths), "| embs shape:", embs.shape)

# === 还原 items（供热图/可视化使用，不把 emb 塞进去以节省内存） ===
items = []
for i in range(len(paths)):
    items.append({
        "path": str(paths[i]),
        "x20": int(x20[i]),
        "y20": int(y20[i]),
        "ps20": int(ps20a[i]),
        "score": float(score[i]),
    })

# === 文本零样本编码 ===
# 需要 zs（Cell 4 里已构建 ZSModel）；如果未定义，请先跑 Cell 4
text_embs = zs.encode_texts(LABELS)  # (K, D)

# === 计算余弦相似度（emb 已 L2 归一化，点积即相似度） ===
sims = embs @ text_embs.T            # (N, K)
pred_idx = sims.argmax(axis=1)       # (N,)
pred_label = [LABELS[i] for i in pred_idx]
pred_score = sims.max(axis=1)

# === 导出 CSV ===
CSV_PATH = os.path.join(OUT_PREFIX, "patch_predictions.csv")
header = ["path", "x20", "y20", "ps20", "score", "pred_label", "pred_score"] + [f"sim_{lbl}" for lbl in LABELS]
with open(CSV_PATH, "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(header)
    for i in range(len(items)):
        row = [
            items[i]["path"], items[i]["x20"], items[i]["y20"], items[i]["ps20"],
            float(items[i]["score"]), pred_label[i], float(pred_score[i])
        ]
        row += [float(sims[i, j]) for j in range(len(LABELS))]
        w.writerow(row)
print("Saved:", CSV_PATH)


Loaded: out_conch_style/patch_embeddings.npz | N = 166 | embs shape: (166, 512)
Saved: out_conch_style/patch_predictions.csv


In [31]:
from PIL import Image, ImageDraw

def draw_heatmap_rects(items,
                       values: np.ndarray,   # (N,)
                       out_png: str,
                       alpha: float = 0.7):
    """
    最小可行热图：每个 patch 画成一个矩形，强度=values（归一化到0~255）。
    白底 + 红色通道叠加效果；若想更平滑可改为高斯权重/透明度叠加。
    """
    assert len(items) == len(values)

    # 计算画布大小（20×坐标系）
    W = 0; H = 0
    for it in items:
        W = max(W, it["x20"] + it["ps20"])
        H = max(H, it["y20"] + it["ps20"])

    base = Image.new("RGB", (W, H), (255, 255, 255))
    overlay = Image.new("L", (W, H), 0)
    draw = ImageDraw.Draw(overlay)

    v = values.astype(np.float32)
    v = (v - v.min()) / (v.max() - v.min() + 1e-8)
    v = (v * 255.0).astype(np.uint8)

    for it, val in zip(items, v):
        x, y, ps = it["x20"], it["y20"], it["ps20"]
        draw.rectangle([x, y, x + ps, y + ps], fill=int(val))

    # 伪彩：红通道=强度；与白底 alpha 混合
    heat = Image.merge("RGB", (overlay, Image.new("L", (W, H), 0), Image.new("L", (W, H), 0)))
    out = Image.blend(base, heat, alpha=alpha)
    out.save(out_png)
    return out_png


In [None]:
# 最大相似度热图
OUT_MAX = os.path.join(OUT_PREFIX, "heatmap_maxsim_20x.png")
draw_heatmap_rects(items, sims.max(axis=1), OUT_MAX, alpha=0.7)
print("Saved:", OUT_MAX)

# 各类别热图
for j, lbl in enumerate(LABELS):
    OUT_LBL = os.path.join(OUT_PREFIX, f"heatmap_{lbl.replace(' ', '_')}_20x.png")
    draw_heatmap_rects(items, sims[:, j], OUT_LBL, alpha=0.7)
    print("Saved:", OUT_LBL)


Saved: out_conch_style/heatmap_maxsim_20x.png
Saved: out_conch_style/heatmap_benign_gland_20x.png
Saved: out_conch_style/heatmap_Gleason_pattern_3_20x.png
Saved: out_conch_style/heatmap_Gleason_pattern_4_20x.png
