<a href="https://colab.research.google.com/github/DWalicki95/MaterialsVision/blob/main/notebooks/cellpose_out_of_the_box_evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [38]:
!git clone https://github.com/DWalicki95/MaterialsVision.git

fatal: destination path 'MaterialsVision' already exists and is not an empty directory.


In [1]:
!pip install cellpose

Collecting cellpose
  Downloading cellpose-4.0.4-py3-none-any.whl.metadata (22 kB)
Collecting fastremap (from cellpose)
  Downloading fastremap-1.17.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting imagecodecs (from cellpose)
  Downloading imagecodecs-2025.3.30-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting roifile (from cellpose)
  Downloading roifile-2025.5.10-py3-none-any.whl.metadata (5.9 kB)
Collecting fill-voids (from cellpose)
  Downloading fill_voids-2.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.0 kB)
Collecting segment_anything (from cellpose)
  Downloading segment_anything-1.0-py3-none-any.whl.metadata (487 bytes)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.6->cellpose)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.6->cellpose)
  Downloading

In [2]:
import os

os.environ['PYTHONPATH'] = (
  '/content/MaterialsVision:' + os.environ.get('PYTHONPATH', '')
)

In [3]:
from pathlib import Path
import numpy as np
from cellpose.io import imread
import tifffile
import time

from cellpose import models, io, plot
from cellpose.metrics import (
    mask_ious, boundary_scores, aggregated_jaccard_index, average_precision
)
import torch
from skimage.segmentation import relabel_sequential
from cellpose.metrics import boundary_scores
from cellpose import utils
import gc
import pandas as pd
from typing import Dict, List
import json
import matplotlib.pyplot as plt

from datetime import datetime



Welcome to CellposeSAM, cellpose v
cellpose version: 	4.0.4 
platform:       	linux 
python version: 	3.11.13 
torch version:  	2.6.0+cu124! The neural network component of
CPSAM is much larger than in previous versions and CPU excution is slow. 
We encourage users to use GPU/MPS if available. 




In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
dataset_path = Path(
    '/content/drive/Othercomputers/Mój MacBook Air/train_and_test/'
    'synthetic_dataset_4000'
  )
train_path = dataset_path / 'train'
test_path = dataset_path / 'test'

In [6]:
def remove_temp_files(ds_path):
  ds = []
  for file in ds_path.rglob('*'):
    if str(file.name).startswith('.'):
      continue
    ds.append(file)
  return ds

In [7]:
def get_image_from_store(prefix: str = '_image', n_samples=10):
  images = []
  files = remove_temp_files(test_path)
  for f in files:
    if prefix in str(f):
      images.append(imread(f))
      if (n_samples == -1):
        continue
      if len(images) >= n_samples:
        return images
  return images

In [28]:
imgs = get_image_from_store(n_samples=200)

In [29]:
true_masks = get_image_from_store(prefix='_masks', n_samples=200)

In [10]:
# model = models.CellposeModel(
#     gpu=True,
#     diam_mean=None,
#     pretrained_model=model_path,
#     progress=True
#   )


In [11]:
# start_time = time.time()
# with torch.inference_mode():
#   pred_masks, flows, styles = model.eval(
#       imgs,
#       diameter=None,
#       batch_size=8,
#       progress=True
#   )
# duration = time.time() - start_time
# print(f'Evaluation of {len(imgs)} images lasts {duration / 60} minutes')

# Model evaluation

In [10]:
def get_bbox_from_mask(mask: np.ndarray, relabel_image: bool = True) -> dict:
  if relabel_image:
    mask, forward, inverse_map = relabel_sequential(mask, offset=1)
  labels = np.unique(mask)
  bboxes = {}
  for label in labels[labels>0]:
    rows, cols = np.where(mask==label)
    if rows.size == 0:
      continue
    y_min, y_max = rows.min(), rows.max()
    x_min, x_max = cols.min(), cols.max()
    bboxes[int(label)] = int(x_min), int(y_min), int(x_max), int(y_max)
  return bboxes

In [11]:
def get_n_pores_from_mask(mask: np.ndarray):
  labels = np.unique(mask)
  n_pores = len(labels) - (1 if 0 in labels else 0)
  return n_pores

In [12]:
def calculate_iou(bbox_true, bbox_pred, epsilon=1e-5):
  # coordinates of the intersection box
  x1 = np.max([bbox_true[0], bbox_pred[0]])
  y1 = np.max([bbox_true[1], bbox_pred[1]])
  x2 = np.min([bbox_true[2], bbox_pred[2]])
  y2 = np.min([bbox_true[3], bbox_pred[3]])

  # area of overlap
  width = (x2 - x1)
  height = (y2 - y1)

  # if there is no overlap
  if (width<0) or (height<0):
    return 0.0
  area_of_overlap = width * height
  # combined area
  area_bbox_true = (bbox_true[2] - bbox_true[0]) * (bbox_true[3] - bbox_true[1])
  area_bbox_pred = (bbox_pred[2] - bbox_pred[0]) * (bbox_pred[3] - bbox_pred[1])
  area_combined = area_bbox_true + area_bbox_pred - area_of_overlap
  # ratio of area of overlap over combined area
  iou = area_of_overlap / (area_combined + epsilon)
  return iou

In [13]:
def get_iou_matrix(bboxes_true, bboxes_pred):
  n_bboxes_true = len(bboxes_true)
  n_bboxes_pred = len(bboxes_pred)
  I = np.zeros((n_bboxes_true, n_bboxes_pred), dtype=np.float32)
  for i in range(n_bboxes_true):
    for j in range(n_bboxes_pred):
      I[i, j] = calculate_iou(bboxes_true[i], bboxes_pred[j])
  return I

In [14]:
def calculate_pores_difference(n_pores_true: int, n_pores_pred):
  return float(np.abs(n_pores_true - n_pores_pred) / n_pores_true)

In [15]:
def greedy_match_iou(iou_matrix, threshold):
  I = iou_matrix.copy()
  all_triples_list = []
  matched_true, matched_pred = set(), set()
  matches = []
  I[I < threshold] = 0.0
  N_true, N_pred = I.shape
  # get pores difference
  pores_diff = calculate_pores_difference(N_true, N_pred)

  non_zero = np.argwhere(I>0.0)
  for (i, j) in non_zero:
    all_triples_list.append((i, j, I[i, j]))

  sorted_all_triples_list = sorted(
      all_triples_list, key=lambda x: x[2], reverse=True
  )

  for triples in sorted_all_triples_list:
    if (triples[0] not in matched_true) & (triples[1] not in matched_pred):
      matched_true.add(triples[0])
      matched_pred.add(triples[1])
      matches.append((triples[0], triples[1], triples[2]))

  TP = len(matches)
  FN = N_true - len(matched_true)
  FP = N_pred - len(matched_pred)

  precision = (TP / (TP + FP) if (TP + FP) > 0 else 0.0)
  recall = (TP / (TP + FN) if (TP+FN) > 0 else 0.0)
  f1_score = (2 * (precision * recall) / (precision + recall) if
      (precision + recall) > 0 else 0.0
  )
  iou_mean = float(sum(score for (_, _, score) in matches) / TP if TP > 0 else 0.0)

  return {
      'TP': TP,
      'FP': FP,
      'FN': FN,
      'precision': precision,
      'recall': recall,
      'f1_score': f1_score,
      'iou_mean': iou_mean,
      'pores_diff': pores_diff
  }


In [16]:
def iou_scores_batch(true_masks, pred_masks, greedy_match_threshold = 0.5):
  iou_results = {}
  for idx, (true_mask, pred_mask) in enumerate(zip(true_masks, pred_masks)):
    pred_bboxes = get_bbox_from_mask(pred_mask)
    true_bboxes = get_bbox_from_mask(true_mask)
    iou_matrix = get_iou_matrix(
        list(pred_bboxes.values()), list(true_bboxes.values())
    )
    one_img_output = greedy_match_iou(iou_matrix, greedy_match_threshold)
    iou_results[idx] = one_img_output
  return iou_results

In [17]:
def boundary_scores_batched(masks_true, masks_pred, scales, batch_size=2):
    N = len(masks_true)
    M = len(scales)

    precision_all = np.zeros((M, N), dtype=float)
    recall_all    = np.zeros((M, N), dtype=float)
    fscore_all    = np.zeros((M, N), dtype=float)

    for start in range(0, N, batch_size):
        end = min(start + batch_size, N)
        # part of lists (batched solution)
        sub_true = masks_true[start:end]
        sub_pred = masks_pred[start:end]
        p_sub, r_sub, f_sub = boundary_scores(sub_true, sub_pred, scales)

        precision_all[:, start:end] = p_sub
        recall_all[:, start:end]    = r_sub
        fscore_all[:, start:end]    = f_sub

        # clean memory
        gc.collect()

    return precision_all, recall_all, fscore_all


In [18]:
def summarize_evaluation_iou(iou_results):
  TP_all, FP_all, FN_all = [], [], []
  micro_metrics = ['TP', 'FP', 'FN']
  macro_metrics_list = [
      metric for metric in list(
          [metric for metric in iou_results.values()][0].keys()
      ) if metric not in micro_metrics
  ]
  macro_metric_results = {}
  # macro evaluation
  for metric_dict in iou_results.values():
    for metric, value in metric_dict.items():
      if metric not in ['TP', 'FP', 'FN']:
        macro_metric_results[f'iou_mean_{metric}'] = np.mean(value)
  # micro evaluation
      elif metric == 'TP':
        TP_all.append(value)
      elif metric == 'FP':
        FP_all.append(value)
      elif metric == 'FN':
        FN_all.append(value)
  TP_all_sum = np.sum(TP_all)
  FP_all_sum = np.sum(FP_all)
  FN_all_sum = np.sum(FN_all)
  precision_micro = TP_all_sum / (TP_all_sum + FP_all_sum)
  recall_micro = TP_all_sum / (TP_all_sum + FN_all_sum)
  f1_micro = (
      2 * precision_micro * recall_micro
    ) / (precision_micro + recall_micro)
  # prepare final report
  report = pd.DataFrame.from_dict(
      macro_metric_results, orient='index', columns=['value']
    )
  report.loc['iou_precision_micro'] = precision_micro
  report.loc['iou_recall_micro'] = recall_micro
  report.loc['iou_f1_micro'] = f1_micro
  return report

In [19]:
def summarize_evaluation_boundary_score(precision_all, recall_all, fscore_all):
  bs_precision_mean = np.mean(precision_all)
  bs_recall_mean = np.mean(recall_all)
  bs_fscore_mean = np.mean(fscore_all)
  bs_metrics_dict = {
        'boundary_score_mean_precision': bs_precision_mean,
        'boundary_score_mean_recall':    bs_recall_mean,
        'boundary_score_mean_f1':        bs_fscore_mean,
  }
  report = pd.DataFrame.from_dict(
      bs_metrics_dict, orient='index', columns=['value']
  )
  return report

In [20]:
def get_today_datetime_str():
  today_datetime = datetime.now().strftime("%m/%d/%Y %H:%M:%S")
  today_datetime = today_datetime[:-3]  # skip seconds
  today_datetime = (
      today_datetime
      .replace('-', '_')
      .replace(':', '_')
      .replace('/', '_')
      .replace(' ', '__')
  )
  return today_datetime

In [21]:
def summarize_evaluation(
    output_filename: str,
    model_params: Dict,
    evaluating_duration: float,
    iou_results: Dict,
    boundary_score_precision_all: List,
    boundary_score_recall_all: List,
    boundary_score_fscore_all: List,
    evaluation_comment: str = '',
    save_raw_iou_results: bool = True,
    save_final_results: bool = True,
    output_directory_path = None,
    add_today_datetime_to_filename: bool = True
):
  report_iou = summarize_evaluation_iou(iou_results)
  report_boundary_score = summarize_evaluation_boundary_score(
      precision_all=boundary_score_precision_all,
      recall_all=boundary_score_recall_all,
      fscore_all=boundary_score_fscore_all
  )
  report_final = pd.concat([report_iou, report_boundary_score])
  report_final['comment'] = evaluation_comment
  report_final['model_params'] = [model_params] * len(report_final)
  report_final['evaluation_duration'] = evaluating_duration
  # save files
  today_datetime_str = (
      get_today_datetime_str() if add_today_datetime_to_filename else ''
  )
  if not output_directory_path:
    output_directory_path = Path(
        '/content/drive/MyDrive/evaluation_metrics'
    )
  # save raw iou
  if save_raw_iou_results:
    output_directory_path.mkdir(parents=True, exist_ok=True)
    output_filename = (
        f'raw_iou_results_{evaluation_comment}_{today_datetime_str}.json'
    )
    output_filepath = output_directory_path / output_filename
    with open(output_filepath, 'w') as f:
      json.dump(iou_results, f)
    print('Raw iou test dictionary saved.')
  # save final results
  if save_final_results:
    output_directory_path.mkdir(parents=True, exist_ok=True)
    output_filename = (
        f'evaluation_results_{today_datetime_str}.csv'
    )
    output_filepath = output_directory_path / output_filename
    report_final.to_csv(output_filepath)
    print('Final evaluation metrics report saved.')
    return report_final

In [22]:
def visualize_results(
    pred_masks,
    img_idx_to_visualize: int = 0,
    save_subsample: bool = True,
    output_directory_path = None,
    add_today_datetime_to_filename: bool = True,
    evaluation_comment: str = ''
):
  if not output_directory_path:
    output_directory_path = Path(
        '/content/drive/MyDrive/evaluation_metrics/plots'
    )
  today_datetime_str = (
      get_today_datetime_str() if add_today_datetime_to_filename else ''
  )
  # save images
  if save_subsample:
    output_directory_path.mkdir(parents=True, exist_ok=True)
    num_images = 10
    for img_idx in range(num_images):
        fig = plt.figure(figsize=(20, 12))
        plot.show_segmentation(
          fig=fig,
          maski=pred_masks[img_idx],
          img=imgs[img_idx],
          flowi=flows[img_idx][0]
        )
        plt.savefig(
            f'{output_directory_path}/{evaluation_comment}_figure_{img_idx}.jpg'
        )
        plt.close(fig)
  # show_figure
  fig = plt.figure(figsize=(20, 12))
  plot.show_segmentation(
    fig=fig,
    maski=pred_masks[img_idx_to_visualize],
    img=imgs[img_idx_to_visualize],
    flowi=flows[img_idx_to_visualize][0]
  )
  plt.show()

# Evaluation: default configuration

In [23]:
iou_results = iou_scores_batch(
    true_masks=true_masks,
    pred_masks=pred_masks,
    greedy_match_threshold=0.5
)

NameError: name 'pred_masks' is not defined

In [24]:
bs_precision_all, bs_recall_all, bs_fscore_all = boundary_scores_batched(
    true_masks,
    pred_masks,
    scales=[0.1],
    batch_size=3
)

NameError: name 'pred_masks' is not defined

In [None]:
summarize_evaluation(
    output_filename='evaluation_2',
    iou_results=iou_results,
    boundary_score_precision_all=bs_precision_all,
    boundary_score_recall_all=bs_recall_all,
    boundary_score_fscore_all=bs_fscore_all,
    evaluation_comment='evaluation_2_cyto3_baseline',
    save_raw_iou_results=True,
    save_final_results=True,
    add_today_datetime_to_filename=True
)

In [None]:
visualize_results(
    pred_masks=pred_masks,
    img_idx_to_visualize=0,
    save_subsample=True,
    add_today_datetime_to_filename=True,
    evaluation_comment='evaluation_2'
)

# Evaluation: parameters optimization

In [30]:
from sklearn.model_selection import ParameterGrid

In [31]:
model = models.CellposeModel(
    gpu=True,
    diam_mean=None
  )


In [32]:
parameters_dict = {
    'flow_threshold': [0.0, 0.4, 1.0],
    'cellprob_threshold': [0, 0.5, 1.0],
    'diameter': [None, 100, 250, 450],
    'normalize': [True, False],
    'augment': [True, False],
    'progress': [True]
}

In [None]:
i = 3  # 2 number of attempts before
for params in ParameterGrid(parameters_dict):
  if i == 3:
    i += 1  # next model number
    continue
  start_time = time.time()
  # prediction step
  with torch.inference_mode():
    pred_masks, flows, styles = model.eval(imgs, **params)
  # prediction additional info
  duration = (time.time() - start_time) / 60  # in minutes
  eval_comment = f'non-retrained-model_param_{i}'
  # evaluation (counting metrics and save results)
  iou_results = iou_scores_batch(
    true_masks=true_masks,
    pred_masks=pred_masks,
    greedy_match_threshold=0.5
  )
  bs_precision_all, bs_recall_all, bs_fscore_all = boundary_scores_batched(
    true_masks,
    pred_masks,
    scales=[0.1],
    batch_size=64
  )
  summarize_evaluation(
    output_filename=f'evaluation_{i}_cpsam',
    model_params=params,
    evaluating_duration=duration,
    iou_results=iou_results,
    boundary_score_precision_all=bs_precision_all,
    boundary_score_recall_all=bs_recall_all,
    boundary_score_fscore_all=bs_fscore_all,
    evaluation_comment=f'evaluation_{i}_cpsam',
    save_raw_iou_results=True,
    save_final_results=True,
    add_today_datetime_to_filename=True
  )
  print(f'Iteracja: {i}. Parametry: {params}')
  i += 1  # next model number
print('Evaluation finished.')

Raw iou test dictionary saved.
Final evaluation metrics report saved.
Iteracja: 4. Parametry: {'augment': True, 'cellprob_threshold': 0, 'diameter': None, 'flow_threshold': 0.0, 'normalize': False, 'progress': True}
Raw iou test dictionary saved.
Final evaluation metrics report saved.
Iteracja: 5. Parametry: {'augment': True, 'cellprob_threshold': 0, 'diameter': None, 'flow_threshold': 0.4, 'normalize': True, 'progress': True}
Raw iou test dictionary saved.
Final evaluation metrics report saved.
Iteracja: 6. Parametry: {'augment': True, 'cellprob_threshold': 0, 'diameter': None, 'flow_threshold': 0.4, 'normalize': False, 'progress': True}
Raw iou test dictionary saved.
Final evaluation metrics report saved.
Iteracja: 7. Parametry: {'augment': True, 'cellprob_threshold': 0, 'diameter': None, 'flow_threshold': 1.0, 'normalize': True, 'progress': True}
