In [1]:
import os
import sys
import cv2
import logging
import numpy as np
import csv
from scipy.interpolate import splprep, splev
from scipy.ndimage import gaussian_filter1d
import matplotlib.pyplot as plt

# ---------- 配置参数 ----------
from dataclasses import dataclass

@dataclass
class AnalysisConfig:
    pixel_size_um: float = 1.0
    min_contour_length: int = 50
    min_perimeter: float = 50.0
    smoothing_factor_ratio: float = 0.1
    max_reasonable_curvature_factor: float = 10.0
    endpoint_exclude_ratio: float = 0.02  # 2%
    circle_window_size: int = 7
    angle_window_size: int = 5

# ---------- 日志及辅助 ----------
def setup_logging(output_folder):
    log_file = os.path.join(output_folder, 'analysis.log')
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler()
        ]
    )

def create_output_folder(folder_path):
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

def validate_config(config):
    if config.pixel_size_um <= 0:
        raise ValueError("pixel_size_um必须大于0")
    if config.min_contour_length < 10:
        logging.warning("min_contour_length太小，可能影响曲率计算精度")
    if config.endpoint_exclude_ratio < 0 or config.endpoint_exclude_ratio > 0.4:
        raise ValueError("endpoint_exclude_ratio应在0到0.4之间")

# ---------- 图像预处理 ----------
class ImageProcessor:
    SUPPORTED_FORMATS = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif')

    def __init__(self, config):
        self.config = config
        self.check_dependencies()

    def check_dependencies(self):
        if not hasattr(cv2, 'ximgproc'):
            raise ImportError("cv2.ximgproc模块不可用。请安装opencv-contrib-python")

    def preprocess_image(self, image_path):
        try:
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                raise ValueError(f"无法加载图像: {image_path}")

            h, w = img.shape
            max_size = 2048
            if max(h, w) > max_size:
                scale = max_size / max(h, w)
                img = cv2.resize(img, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA)
                logging.info(f"图像尺寸缩放至: {img.shape[1]}x{img.shape[0]}")

            # 高斯模糊降噪
            img = cv2.GaussianBlur(img, (3, 3), 0)

            # 自适应阈值，反转得到前景白色
            binary = cv2.adaptiveThreshold(img, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                           cv2.THRESH_BINARY_INV, 11, 2)

            kernel = np.ones((2, 2), np.uint8)
            binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
            binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

            # 使用ximgproc thinning 细化骨架
            skeleton = cv2.ximgproc.thinning(binary)
            return skeleton

        except Exception as e:
            logging.error(f"图像预处理失败 ({image_path}): {e}")
            return None

# ---------- 轮廓分析 ----------
class ContourAnalyzer:
    def __init__(self, config):
        self.config = config

    def extract_contours(self, skeleton):
        contours, _ = cv2.findContours(skeleton, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
        valid = []
        for cnt in contours:
            if len(cnt) >= self.config.min_contour_length:
                perimeter = cv2.arcLength(cnt, False)
                if perimeter > self.config.min_perimeter:
                    valid.append(cnt)
        return valid

    def resample_contour(self, points, num_points=None):
        if num_points is None:
            num_points = len(points)
        distances = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1))
        cumdist = np.concatenate(([0], np.cumsum(distances)))
        if cumdist[-1] == 0:
            return points
        uniform_dist = np.linspace(0, cumdist[-1], num_points)
        x_interp = np.interp(uniform_dist, cumdist, points[:, 0])
        y_interp = np.interp(uniform_dist, cumdist, points[:, 1])
        return np.column_stack((x_interp, y_interp))

    def smooth_contour_spline(self, points, smoothing_factor=None):
        if len(points) < 4:
            return points
        if smoothing_factor is None:
            smoothing_factor = len(points) * self.config.smoothing_factor_ratio
        try:
            tck, u = splprep([points[:, 0], points[:, 1]], s=smoothing_factor, per=False, k=min(3, len(points)-1))
            u_fine = np.linspace(0, 1, len(points)*2)
            smooth_points = np.array(splev(u_fine, tck)).T
            return smooth_points
        except Exception as e:
            logging.warning(f"B样条平滑失败: {e}")
            return points

# ---------- 曲率计算 ----------
class CurvatureCalculator:
    def __init__(self, config):
        self.config = config

    def calculate_curvature_angle_based(self, points):
        n = len(points)
        curv = np.zeros(n)
        points_um = points * self.config.pixel_size_um
        exclude_count = max(2, int(n * self.config.endpoint_exclude_ratio))
        start = exclude_count
        end = n - exclude_count
        window = self.config.angle_window_size
        for i in range(start, end):
            prev_idx = max(0, i - window // 2)
            next_idx = min(n - 1, i + window // 2)
            if next_idx - prev_idx >= 2:
                v1 = points_um[i] - points_um[prev_idx]
                v2 = points_um[next_idx] - points_um[i]
                len1 = np.linalg.norm(v1)
                len2 = np.linalg.norm(v2)
                if len1 > 1e-10 and len2 > 1e-10:
                    cos_theta = np.dot(v1, v2) / (len1 * len2)
                    cos_theta = np.clip(cos_theta, -1, 1)
                    angle = np.arccos(cos_theta)
                    arc_length = (len1 + len2) / 2
                    if arc_length > 1e-10:
                        curv[i] = angle / arc_length
        # 曲率平滑
        curv = gaussian_filter1d(curv, sigma=1)
        return curv

# ---------- 曲率质量评估 ----------
class QualityAssessor:
    def __init__(self, config):
        self.config = config

    def assess_curvature_quality(self, curvatures, points):
        n = len(curvatures)
        if n < 10:
            return {'valid_ratio': 0, 'endpoint_effect': True, 'mean_curvature': 0,
                    'std_curvature': 0, 'valid_count': 0}

        valid_mask = curvatures > 0.001
        valid_ratio = np.sum(valid_mask) / n

        exclude_count = max(2, int(n * self.config.endpoint_exclude_ratio))
        front = curvatures[:exclude_count]
        back = curvatures[-exclude_count:]
        middle = curvatures[exclude_count:-exclude_count] if exclude_count < n//2 else curvatures[n//4:-n//4]

        endpoint_effect = False
        if len(middle) > 0 and np.mean(middle) > 0:
            front_mean = np.mean(front[front > 0.001]) if np.any(front > 0.001) else 0
            back_mean = np.mean(back[back > 0.001]) if np.any(back > 0.001) else 0
            middle_mean = np.mean(middle[middle > 0.001]) if np.any(middle > 0.001) else 0
            if middle_mean > 0:
                front_ratio = front_mean / middle_mean if front_mean > 0 else 0
                back_ratio = back_mean / middle_mean if back_mean > 0 else 0
                endpoint_effect = front_ratio > 2.0 or back_ratio > 2.0

        valid_curv = curvatures[valid_mask]
        mean_c = np.mean(valid_curv) if len(valid_curv) > 0 else 0
        std_c = np.std(valid_curv) if len(valid_curv) > 1 else 0

        return {'valid_ratio': valid_ratio, 'endpoint_effect': endpoint_effect,
                'mean_curvature': mean_c, 'std_curvature': std_c, 'valid_count': len(valid_curv)}

    def print_contour_stats(self, contour_id, points, angle_curv, angle_q):
        valid_angle = angle_curv[angle_curv > 0.001]
        print(f"  Contour {contour_id}: {len(points)} points")
        print(f"    Angle:  {np.mean(valid_angle):.6f} 1/μm (valid: {len(valid_angle)}/{len(points)}) {'⚠️' if angle_q['endpoint_effect'] else '✓'}")

# ---------- 可视化 ----------
class CurvatureVisualizer:
    def __init__(self, config):
        self.config = config

    def visualize_all_contours(self, all_contours_data, output_path):
        if not all_contours_data:
            return
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        colors = plt.cm.tab10(np.linspace(0, 1, len(all_contours_data)))

        # 1. 原始轮廓
        ax = axes[0]
        for i, data in enumerate(all_contours_data):
            points = data['points']
            ax.plot(points[:, 0], points[:, 1], color=colors[i], linewidth=2, alpha=0.8, label=f'Contour {i+1}')
        ax.set_title('Original Contours')
        ax.set_xlabel('X (pixels)')
        ax.set_ylabel('Y (pixels)')
        ax.axis('equal')
        ax.grid(True, alpha=0.3)
        ax.legend(fontsize=8)

        # 2. 角度曲率散点图
        ax = axes[1]
        scatter = None
        for i, data in enumerate(all_contours_data):
            points = data['points']
            curvature = data['angle_curvature']
            valid_mask = curvature > 0.001
            if np.any(valid_mask):
                scatter = ax.scatter(points[valid_mask, 0], points[valid_mask, 1],
                                     c=curvature[valid_mask], cmap='jet', s=15, alpha=0.8)
        ax.set_title('Angle-based Curvature')
        ax.axis('equal')
        ax.grid(True, alpha=0.3)
        if scatter is not None:
            cbar = plt.colorbar(scatter, ax=ax)
            cbar.set_label('Curvature (1/μm)', rotation=270, labelpad=15)

        # 3. 曲率直方图
        ax = axes[2]
        all_angle = []
        for data in all_contours_data:
            valid = data['angle_curvature'][data['angle_curvature'] > 0.001]
            all_angle.extend(valid)
        if all_angle:
            ax.hist(all_angle, bins=30, color='green', alpha=0.7, edgecolor='black')
            ax.set_title(f'Angle Curvature\n(Mean: {np.mean(all_angle):.4f} ± {np.std(all_angle):.4f} 1/μm)')
        else:
            ax.set_title('Angle Curvature\n(No valid data)')
        ax.set_xlabel('Curvature (1/μm)')
        ax.set_ylabel('Frequency')
        ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()
        logging.info(f"Visualization saved to: {output_path}")

# ---------- 数据导出 ----------
class DataExporter:
    def __init__(self, config):
        self.config = config

    def save_csv_data(self, all_contours_data, output_path):
        header = ['contour_id', 'point_id', 'x_um', 'y_um', 'angle_curvature', 'angle_valid']
        try:
            with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(header)
                for contour_id, data in enumerate(all_contours_data):
                    points = data['points']
                    points_um = points * self.config.pixel_size_um
                    angle_curvature = data['angle_curvature']
                    for point_id, (x, y, ac) in enumerate(zip(points_um[:, 0], points_um[:, 1], angle_curvature)):
                        angle_valid = ac > 0.001
                        writer.writerow([contour_id, point_id, x, y, ac, angle_valid])
            logging.info(f"CSV data saved to: {output_path}")
        except Exception as e:
            logging.error(f"Failed to save CSV data: {e}")

    def save_summary_report(self, all_results, output_path):
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write("# 曲率分析汇总报告\n\n")
                f.write(f"## 分析配置\n")
                f.write(f"- 像素尺寸: {self.config.pixel_size_um} μm/pixel\n")
                f.write(f"- 最小轮廓长度: {self.config.min_contour_length}\n")
                f.write(f"- 端点排除比例: {self.config.endpoint_exclude_ratio}\n")
                f.write(f"- 角度窗口大小: {self.config.angle_window_size}\n\n")
                total_contours = 0
                total_valid_points = 0
                all_angle_means = []
                for image_name, data_list in all_results.items():
                    f.write(f"## {image_name}\n")
                    total_contours += len(data_list)
                    for i, data in enumerate(data_list):
                        quality = data['quality_metrics']
                        f.write(f"### 轮廓 {i+1}\n")
                        f.write(f"- 点数: {len(data['points'])}\n")
                        f.write(f"- 角度曲率: 均值={quality['angle']['mean_curvature']:.6f}, 有效点={quality['angle']['valid_count']}\n\n")
                        total_valid_points += quality['angle']['valid_count']
                        if quality['angle']['mean_curvature'] > 0:
                            all_angle_means.append(quality['angle']['mean_curvature'])
                    f.write("\n")
                f.write(f"## 总体统计\n")
                f.write(f"- 处理图像数: {len(all_results)}\n")
                f.write(f"- 总轮廓数: {total_contours}\n")
                f.write(f"- 总有效点数: {total_valid_points}\n")
                if all_angle_means:
                    f.write(f"- 角度方法平均曲率: {np.mean(all_angle_means):.6f} ± {np.std(all_angle_means):.6f} 1/μm\n")
            logging.info(f"Summary report saved to: {output_path}")
        except Exception as e:
            logging.error(f"Failed to save summary report: {e}")

# ---------- 主控制类 ----------
class CurvatureAnalyzer:
    SUPPORTED_FORMATS = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif')

    def __init__(self, input_folder, output_folder=None, config=None):
        self.input_folder = input_folder
        if output_folder is None:
            output_folder = os.path.join(input_folder, "curvature_results")
        self.output_folder = output_folder
        self.config = config or AnalysisConfig()

        self.image_processor = ImageProcessor(self.config)
        self.contour_analyzer = ContourAnalyzer(self.config)
        self.curvature_calculator = CurvatureCalculator(self.config)
        self.quality_assessor = QualityAssessor(self.config)
        self.visualizer = CurvatureVisualizer(self.config)
        self.data_exporter = DataExporter(self.config)

        create_output_folder(self.output_folder)
        setup_logging(self.output_folder)
        validate_config(self.config)

    def process_single_image(self, image_path):
        print(f"Processing: {os.path.basename(image_path)}")
        logging.info(f"Processing image: {image_path}")

        skeleton = self.image_processor.preprocess_image(image_path)
        if skeleton is None:
            print(f"Failed to load or preprocess image: {os.path.basename(image_path)}")
            return []

        contours = self.contour_analyzer.extract_contours(skeleton)
        if not contours:
            print(f"No valid contours found in {os.path.basename(image_path)}")
            return []

        all_contours_data = []
        for i, contour in enumerate(contours):
            points = contour.reshape(-1, 2).astype(np.float64)
            if len(points) < 15:
                continue

            resampled = self.contour_analyzer.resample_contour(points, len(points))
            smooth = self.contour_analyzer.smooth_contour_spline(resampled)

            angle_curv = self.curvature_calculator.calculate_curvature_angle_based(smooth)
            angle_quality = self.quality_assessor.assess_curvature_quality(angle_curv, smooth)

            contour_data = {
                'points': smooth,
                'angle_curvature': angle_curv,
                'quality_metrics': {'angle': angle_quality}
            }
            all_contours_data.append(contour_data)

            self.quality_assessor.print_contour_stats(i, smooth, angle_curv, angle_quality)

        return all_contours_data

    def process_all_images(self):
        image_files = [os.path.join(self.input_folder, f) for f in os.listdir(self.input_folder)
                       if f.lower().endswith(self.SUPPORTED_FORMATS)]

        if not image_files:
            print("No image files found in input folder.")
            logging.warning("No image files found in input folder.")
            return

        print(f"Found {len(image_files)} image files.")
        print(f"Pixel size: {self.config.pixel_size_um} μm/pixel")
        print(f"Endpoint exclude ratio: {self.config.endpoint_exclude_ratio}")

        logging.info(f"Starting batch processing ({len(image_files)} images).")
        logging.info(f"Config: {self.config}")

        all_results = {}

        for img_path in image_files:
            try:
                contours_data = self.process_single_image(img_path)
                if contours_data:
                    base_name = os.path.splitext(os.path.basename(img_path))[0]
                    viz_path = os.path.join(self.output_folder, f"{base_name}_curvature_analysis.png")
                    csv_path = os.path.join(self.output_folder, f"{base_name}_curvature_data.csv")

                    self.visualizer.visualize_all_contours(contours_data, viz_path)
                    self.data_exporter.save_csv_data(contours_data, csv_path)

                    all_results[base_name] = contours_data
                else:
                    logging.warning(f"No valid contours in {img_path}")
            except Exception as e:
                logging.error(f"Error processing {img_path}: {e}", exc_info=True)
                print(f"Error processing {img_path}: {e}")

        if all_results:
            summary_path = os.path.join(self.output_folder, "summary_report.md")
            self.data_exporter.save_summary_report(all_results, summary_path)

        print(f"\nProcessing completed. Results saved in: {self.output_folder}")
        logging.info("Batch processing finished.")

# ---------- 主程序入口 ----------
def main():
    input_folder = r"C:\Users\chenb\Desktop\input_images"
    # 输出目录放在input_images目录内
    output_folder = os.path.join(input_folder, "curvature_results")

    config = AnalysisConfig(
        pixel_size_um=1.0,
        min_contour_length=50,
        min_perimeter=50.0,
        smoothing_factor_ratio=0.1,
        max_reasonable_curvature_factor=10.0,
        endpoint_exclude_ratio=0.02,
        circle_window_size=7,
        angle_window_size=5
    )

    try:
        analyzer = CurvatureAnalyzer(input_folder, output_folder, config)
        analyzer.process_all_images()
    except Exception as e:
        print(f"程序运行出错: {e}")
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)

if __name__ == "__main__":
    main()


2025-11-22 00:03:43,144 - INFO - Starting batch processing (1 images).
2025-11-22 00:03:43,146 - INFO - Config: AnalysisConfig(pixel_size_um=1.0, min_contour_length=50, min_perimeter=50.0, smoothing_factor_ratio=0.1, max_reasonable_curvature_factor=10.0, endpoint_exclude_ratio=0.02, circle_window_size=7, angle_window_size=5)
2025-11-22 00:03:43,148 - INFO - Processing image: C:\Users\chenb\Desktop\input_images\2108854-2-masson-4_02_Hematoxylin_Channel (3)_processed.png


Found 1 image files.
Pixel size: 1.0 μm/pixel
Endpoint exclude ratio: 0.02
Processing: 2108854-2-masson-4_02_Hematoxylin_Channel (3)_processed.png
  Contour 0: 176 points
    Angle:  0.255761 1/μm (valid: 173/176) ✓
  Contour 1: 120 points
    Angle:  0.269034 1/μm (valid: 120/120) ✓
  Contour 2: 228 points
    Angle:  0.260109 1/μm (valid: 224/228) ✓
  Contour 3: 102 points
    Angle:  0.311517 1/μm (valid: 99/102) ✓
  Contour 4: 156 points
    Angle:  0.254960 1/μm (valid: 154/156) ✓
  Contour 5: 132 points
    Angle:  0.154667 1/μm (valid: 132/132) ✓
  Contour 6: 164 points
    Angle:  0.253897 1/μm (valid: 162/164) ✓
  Contour 7: 156 points
    Angle:  0.181326 1/μm (valid: 153/156) ✓
  Contour 8: 128 points
    Angle:  0.359844 1/μm (valid: 128/128) ✓
  Contour 9: 112 points
    Angle:  0.261182 1/μm (valid: 112/112) ✓
  Contour 10: 204 points
    Angle:  0.222845 1/μm (valid: 200/204) ✓
  Contour 11: 762 points
    Angle:  0.103434 1/μm (valid: 673/762) ✓
  Contour 12: 154 points

  plt.tight_layout()
2025-11-22 00:04:19,137 - INFO - Visualization saved to: C:\Users\chenb\Desktop\input_images\curvature_results\2108854-2-masson-4_02_Hematoxylin_Channel (3)_processed_curvature_analysis.png
2025-11-22 00:04:20,222 - INFO - CSV data saved to: C:\Users\chenb\Desktop\input_images\curvature_results\2108854-2-masson-4_02_Hematoxylin_Channel (3)_processed_curvature_data.csv
2025-11-22 00:04:20,229 - INFO - Summary report saved to: C:\Users\chenb\Desktop\input_images\curvature_results\summary_report.md
2025-11-22 00:04:20,230 - INFO - Batch processing finished.



Processing completed. Results saved in: C:\Users\chenb\Desktop\input_images\curvature_results
