In [1]:
import os
import numpy as np
import pandas as pd
import json
import torch
import glob
import sys

def coordinates_to_dmatrix(a_coords, b_coords):
    a, b = torch.from_numpy(a_coords), torch.from_numpy(b_coords)
    return np.array(torch.cdist(a,b))

def object_detection_intersection_over_union(gt, pred, hsl=None, box_format="midpoint"):
    """
    Calculates the Intersection over Union (IoU) for object detection.

    Parameters
    ----------
    gt : numpy.ndarray
        Ground truth bounding boxes with shape (N, 2) if `box_format` is "midpoint" or (N, 4) if `box_format` is "corners".

    pred : numpy.ndarray
        Predicted bounding boxes with shape (N, 2) if `box_format` is "midpoint" or (N, 4) if `box_format` is "corners".

    hsl : float or numpy.ndarray, optional
        Half side length of the bounding boxes when `box_format` is "midpoint". Ignored if `box_format` is "corners".

    box_format : str, optional
        Format of the bounding boxes. Either "midpoint" (default) or "corners".
        - "midpoint": Bounding boxes are defined by their center coordinates and half side length (hsl).
        - "corners": Bounding boxes are defined by their corner coordinates [xmin, ymin, xmax, ymax].

    Returns
    -------
    numpy.ndarray
        IoU values for each pair of ground truth and predicted bounding boxes.
    """
    if box_format == "midpoint":
        # Calculate the corners of the bounding boxes
        xmin1, xmax1, ymin1, ymax1 = gt[:, 0] - hsl, gt[:, 0] + hsl, gt[:, 1] - hsl, gt[:, 1] + hsl
        xmin2, xmax2, ymin2, ymax2 = pred[:, 0] - hsl, pred[:, 0] + hsl, pred[:, 1] - hsl, pred[:, 1] + hsl
    elif box_format == "corners":
        xmin1, xmax1, ymin1, ymax1 = gt[:, 0], gt[:, 2], gt[:, 1], gt[:, 3]
        xmin2, xmax2, ymin2, ymax2 = pred[:, 0], pred[:, 2], pred[:, 1], pred[:, 3]

    # Calculate intersection area
    xmin = np.max(np.concatenate((xmin1[:, None], xmin2[:, None]), axis=1), axis=1)
    ymin = np.max(np.concatenate((ymin1[:, None], ymin2[:, None]), axis=1), axis=1)
    xmax = np.min(np.concatenate((xmax1[:, None], xmax2[:, None]), axis=1), axis=1)
    ymax = np.min(np.concatenate((ymax1[:, None], ymax2[:, None]), axis=1), axis=1)

    # Calculate intersection area
    intersection = np.clip(xmax - xmin, 0, None) * np.clip(ymax - ymin, 0, None)

    # Calculate area of the bounding boxes
    box1_area = abs((xmax1 - xmin1) * (ymax1 - ymin1))
    box2_area = abs((xmax2 - xmin2) * (ymax2 - ymin2))

    # Calculate union area
    union = np.clip(box1_area + box2_area - intersection, 1e-6, None)

    # Calculate and return IoU
    return intersection / union
  
# def object_detection_intersection_over_union(gt, pred, r):
#     """
#     计算两个圆（由中心点和半径定义）之间的 IoU。
#     - gt: [N, 2]，真实圆的质心坐标
#     - pred: [N, 2]，预测圆的质心坐标（必须与 gt 一一对应）
#     - r: float 或 [N] 数组，圆的半径，gt 和 pred 半径一致

#     返回：
#     - ious: [N] 数组，每对圆之间的 IoU
#     """
#     # 将输入统一为 numpy 数组
#     gt = np.asarray(gt)
#     pred = np.asarray(pred)
#     r = np.asarray(r)

#     # 检查变量 r 的维度是否为 0，即 r 是否是一个标量（单个数值）
#     if r.ndim == 0:
#         r = np.full(len(gt), r)

#     # 计算欧氏距离
#     d = np.linalg.norm(gt - pred, axis=1)   # 每对中心点之间的欧氏距离
#     r1 = r2 = r  # 假设真实和预测圆的半径相同
#     ious = np.zeros_like(d)  # 初始化结果

#     area1 = np.pi * r1**2  #两个圆的面积。
#     area2 = np.pi * r2**2

#     # 对每对圆进行 IoU 计算
#     for i in range(len(d)):
#         if d[i] >= r1[i] + r2[i]:
#             intersection = 0.0  # 不相交
#         elif d[i] <= abs(r1[i] - r2[i]):
#             intersection = np.pi * min(r1[i], r2[i])**2  # 完全包含
#         else:
#             # 部分相交，几何公式
#             # part1, part2: 两个圆相交扇形的面积；
#             part1 = r1[i]**2 * np.arccos((d[i]**2 + r1[i]**2 - r2[i]**2) / (2 * d[i] * r1[i]))
#             # part3: 两个圆相交区域中三角形的面积；
#             part2 = r2[i]**2 * np.arccos((d[i]**2 + r2[i]**2 - r1[i]**2) / (2 * d[i] * r2[i]))
#             part3 = 0.5 * np.sqrt(
#                 (-d[i] + r1[i] + r2[i]) *
#                 (d[i] + r1[i] - r2[i]) *
#                 (d[i] - r1[i] + r2[i]) *
#                 (d[i] + r1[i] + r2[i])
#             )
#             # 最终交集为两个扇形面积之和减去中间的三角形面积。
#             intersection = part1 + part2 - part3
           
#         # 并集 = 两圆面积和 - 交集面积；
#         union = area1[i] + area2[i] - intersection

#         ious[i] = intersection / (union + 1e-6)  # 加上小常数防止除0

#     return ious

def compute_od_metrics_per_dataset(path_of_gt, path_of_pred, model, beta=3, gt_threshold=0.6):
    # Initialize results array
    metrics = np.zeros(5)
    # print(metrics.shape)
    # (5)
    
    total_pred_len = 0
    # ious_list 和 weights_list 分别记录 IoU 值和预测权重。
    ious_list = []


    print("len_gt",len(path_of_gt))
    
    for i in range(0, len(path_of_gt), 1):
        gt_file = path_of_gt[i]

        image_name = gt_file.split('/')[-1].replace('.csv', '.json')
        pred_file = f'{path_of_pred}/{model}/{image_name}'

        if not os.path.exists(pred_file):
            print(f"Prediction file not found: {pred_file}")
            continue

        # 真实标签和预测结果
        df = pd.read_csv(gt_file, usecols=[0, 1, 2])
        
        diameter_column = df.iloc[:, 2]
        # 获取唯一值
        diameter = diameter_column.unique()
        diameter = int(diameter[0])
        r = int(diameter/2)

        gt_array = df.iloc[:, :2].to_numpy()  # 只保留前两列 (x, y)

        with open(pred_file, 'r') as file:
            pred_array = json.load(file)
        pred_array = np.array(pred_array)
        
        # 如果预测结果为空，则将 IoU 列表填充为 0，并跳过后续计算。
        if not len(pred_array):
            ious_list.extend([0] * len(gt_array))
            continue

        # 使用 coordinates_to_dmatrix 计算预测与真实标签之间的距离矩阵。
        # 找到每个 GT 最近的预测点。
        distances = coordinates_to_dmatrix(gt_array.astype(float), pred_array.astype(float))
        # print(distances.shape)
        # 用于找到数组中每行（沿着 axis=1 指定的轴）的最小值的索引
        min_distances, min_indices = np.min(distances, axis=1), np.argmin(distances, axis=1)
          # 调用 object_detection_intersection_over_union 计算 IoU。
        ious = object_detection_intersection_over_union(gt_array, pred_array[min_indices], np.repeat(r, len(gt_array)))
      
        # print("ious",ious.shape)
        # ious (59,)
        ious_list.extend(ious)

        total_pred_len += len(pred_array)
      
    ious_array = np.array(ious_list)

    # Calculate metrics
    
    if len(ious_array):
        mean_iou = np.round(np.mean(ious_array), 3)
    else:
        mean_iou = 0
    #  # 正确预测的目标占所有真实目标的比例
    recall = np.round(np.sum(ious_array > gt_threshold) / max(len(ious_array), 1e-6), 3)
    #  # 正确预测的目标占所有预测目标的比例
    precision = np.round(np.sum(ious_array > gt_threshold) / max(total_pred_len, 1e-6), 3)
    f1_score = np.round(2 * recall * precision / max((recall + precision), 1e-6), 3)
    f1_weighted = np.round((1 + beta ** 2) * recall * precision / max((recall + (beta ** 2) * precision), 1e-6), 3)

    metrics = mean_iou, recall, precision, f1_score, f1_weighted


    return metrics

In [12]:
empiar_ids=[10017,10028,10081,10093,10345,10532,11056]
output_txt_path = "evaluation_results.txt"

with open(output_txt_path, "w") as f:
    # 将所有print输出重定向到文件
    sys.stdout = f
    
    for empiar_id in empiar_ids:
        gt_path = list(glob.glob(f"/data/yb/Promptpoint/prompter/datasets/test_dataset/{empiar_id}/particle_coordinates/*.csv"))
        pre_path = f'/data/yb/CryoSegNet/output/star_files/{empiar_id}'

        print(f"Evaluation Results for EMPIAR ID {empiar_id}", flush=True)
        # evaluation(gt_path, model = "CrYOLO")
        # evaluation(gt_path, model = "Topaz")
        # evaluation(gt_path, model = "CryoSegNet")
        print('{:<5s} {:<8s} {:<12s} {:<8s} {:<8s} {:<8s}'.format('Dset', 'IoU','Precision','Recall', 'F1', 'F3'), flush=True)

        fr = compute_od_metrics_per_dataset(gt_path, pre_path, model = "prompter_SAM_star", beta=3, gt_threshold=0.5)

        print('{:<5d} {:<8.3f} {:<9.3f} {:<10.3f} {:<8.3f} {:<8.3f}'.format(empiar_id, fr[0], fr[2], fr[1], fr[3], fr[4]), flush=True)
        print(f"--------------------------------------", flush=True)
         # 恢复标准输出
    sys.stdout = sys.__stdout__

print(f"[INFO] All evaluation results saved to: {output_txt_path}")


In [2]:
empiar_ids=['10017_CNN',"10028_CNN"]
output_txt_path = "evaluation_results_CNN.txt"

with open(output_txt_path, "w") as f:
    # 将所有print输出重定向到文件
    sys.stdout = f
    
    for empiar_id_CNN in empiar_ids:
        empiar_id=empiar_id_CNN.split("_")[0]
        
        gt_path = list(glob.glob(f"/data/yb/Promptpoint/prompter/datasets/test_dataset/{empiar_id}/particle_coordinates/*.csv"))
        pre_path = f'/data/yb/CryoSegNet/output/star_files/{empiar_id_CNN}'

        print(f"Evaluation Results for EMPIAR ID {empiar_id_CNN}", flush=True)
        # evaluation(gt_path, model = "CrYOLO")
        # evaluation(gt_path, model = "Topaz")
        # evaluation(gt_path, model = "CryoSegNet")
        print('{:<5s} {:<8s} {:<12s} {:<8s} {:<8s} {:<8s}'.format('Dset', 'IoU','Precision','Recall', 'F1', 'F3'), flush=True)

        fr = compute_od_metrics_per_dataset(gt_path, pre_path, model = "prompter_SAM_star", beta=3, gt_threshold=0.5)

        print('{:<5s} {:<8.3f} {:<9.3f} {:<10.3f} {:<8.3f} {:<8.3f}'.format(empiar_id_CNN, fr[0], fr[2], fr[1], fr[3], fr[4]), flush=True)
        print(f"--------------------------------------", flush=True)
         # 恢复标准输出
    sys.stdout = sys.__stdout__

print(f"[INFO] All evaluation results saved to: {output_txt_path}")


[INFO] All evaluation results saved to: evaluation_results_CNN.txt


In [None]:
import os

folder_path = '/data/yb/CryoTransformer/output/predictions/11056/box_files'

# 列出文件夹中所有“文件”（不包括子文件夹）
file_list = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]

# 输出数量
print(f"文件总数：{len(file_list)}")