In [10]:
import os
import shutil
import random
import numpy as np
from pathlib import Path
import re

# 设置随机种子以确保结果可重现
random.seed(42)
np.random.seed(42)

# 设置根目录
dcalc_root = "D:/Dataset/New_CBDDSM/Dmass"

# 定义分类
classes = ["benign", "malignant"]

# 确保验证集目录存在
for cls in classes:
    val_img_dir = os.path.join(dcalc_root, "val", "images", cls)
    val_mask_dir = os.path.join(dcalc_root, "val", "masks", cls)
    
    os.makedirs(val_img_dir, exist_ok=True)
    os.makedirs(val_mask_dir, exist_ok=True)

# 用于记录移动的文件
moved_images = []
moved_masks = []
error_files = []

# 处理每个分类
for cls in classes:
    # 源目录
    train_img_dir = os.path.join(dcalc_root, "train", "images", cls)
    train_mask_dir = os.path.join(dcalc_root, "train", "masks", cls)
    
    # 目标目录
    val_img_dir = os.path.join(dcalc_root, "val", "images", cls)
    val_mask_dir = os.path.join(dcalc_root, "val", "masks", cls)
    
    # 获取图像文件列表
    if os.path.exists(train_img_dir):
        img_files = [f for f in os.listdir(train_img_dir) 
                    if os.path.isfile(os.path.join(train_img_dir, f)) and 
                    f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        # 随机选择20%的文件
        num_files_to_move = int(len(img_files) * 0.2)
        files_to_move = random.sample(img_files, num_files_to_move)
        
        print(f"分类 {cls}: 总共{len(img_files)}个文件, 将移动{num_files_to_move}个文件到验证集")
        
        # 获取掩码文件列表
        all_mask_files = []
        if os.path.exists(train_mask_dir):
            all_mask_files = [f for f in os.listdir(train_mask_dir) 
                            if os.path.isfile(os.path.join(train_mask_dir, f)) and 
                            f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        # 移动文件
        for img_file in files_to_move:
            try:
                # 移动图像文件
                src_img_path = os.path.join(train_img_dir, img_file)
                dst_img_path = os.path.join(val_img_dir, img_file)
                shutil.move(src_img_path, dst_img_path)
                moved_images.append((cls, img_file))
                
                # 查找对应的掩码文件
                img_base = os.path.splitext(img_file)[0]  # 不含扩展名
                base_parts = re.match(r'(.*?)(?:-\d+)?$', img_base)
                if base_parts:
                    base_name = base_parts.group(1)
                    
                    # 在所有掩码文件中查找匹配项
                    matching_masks = []
                    for mask_file in all_mask_files:
                        mask_base = os.path.splitext(mask_file)[0]
                        
                        # 特定命名模式: Calc-Training...1-1.png -> Calc-Training...1_1-1.png
                        if mask_base.startswith(f"{base_name}_1") or mask_base.startswith(base_name):
                            matching_masks.append(mask_file)
                    
                    # 移动找到的掩码文件
                    if matching_masks:
                        for mask_file in matching_masks:
                            src_mask_path = os.path.join(train_mask_dir, mask_file)
                            dst_mask_path = os.path.join(val_mask_dir, mask_file)
                            if os.path.exists(src_mask_path):
                                shutil.move(src_mask_path, dst_mask_path)
                                moved_masks.append((cls, mask_file))
                                # 从可用掩码文件列表中移除，避免后续重复匹配
                                all_mask_files.remove(mask_file)
                        
                        print(f"移动了图像 {img_file} 及其对应的 {len(matching_masks)} 个掩码文件")
                    else:
                        print(f"警告: 未找到图像 {img_file} 对应的掩码文件")
                        error_files.append((cls, img_file, "未找到掩码文件"))
            
            except Exception as e:
                print(f"移动文件 {img_file} 时出错: {e}")
                error_files.append((cls, img_file, str(e)))
    else:
        print(f"警告: 目录不存在 {train_img_dir}")

# 统计结果
print("\n数据划分完成!")
print(f"成功移动了 {len(moved_images)} 个图像到验证集")
print(f"成功移动了 {len(moved_masks)} 个掩码文件到验证集")

if error_files:
    print(f"处理 {len(error_files)} 个文件时出现错误")

# 验证数据分布
def count_files(directory):
    if os.path.exists(directory):
        files = [f for f in os.listdir(directory) 
               if os.path.isfile(os.path.join(directory, f)) and 
               f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        return len(files)
    return 0

print("\n最终数据分布:")
print("-" * 50)

for split in ["train", "val"]:
    print(f"\n{split.upper()} 集:")
    for data_type in ["images", "masks"]:
        print(f"  {data_type}:")
        total = 0
        for cls in classes:
            dir_path = os.path.join(dcalc_root, split, data_type, cls)
            count = count_files(dir_path)
            total += count
            print(f"    - {cls}: {count} 文件")
        print(f"    小计: {total} 文件")

# 检查训练集和验证集是否有重叠文件
for cls in classes:
    train_files = set(os.listdir(os.path.join(dcalc_root, "train", "images", cls))) if os.path.exists(os.path.join(dcalc_root, "train", "images", cls)) else set()
    val_files = set(os.listdir(os.path.join(dcalc_root, "val", "images", cls))) if os.path.exists(os.path.join(dcalc_root, "val", "images", cls)) else set()
    
    overlap = train_files.intersection(val_files)
    if overlap:
        print(f"\n警告: 分类 {cls} 中有 {len(overlap)} 个文件同时存在于训练集和验证集")
    else:
        print(f"\n分类 {cls} 的训练集和验证集没有重叠文件")

分类 benign: 总共634个文件, 将移动126个文件到验证集
移动了图像 Mass-Training_P_00351_LEFT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00076_LEFT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00798_RIGHT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00723_LEFT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00673_RIGHT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00430_LEFT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00332_RIGHT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_01698_RIGHT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00298_LEFT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_01819_RIGHT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_01273_RIGHT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00096_RIGHT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00094_RIGHT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00309_LEFT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00659_RIGHT_MLO.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_00703_LEFT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_01608_RIGHT_CC.png 及其对应的 1 个掩码文件
移动了图像 Mass-Training_P_01849_

# 图像掩码文件对应测试

In [13]:
import os
import re
import pandas as pd
from collections import defaultdict
import numpy as np
import shutil
from collections import Counter
import matplotlib as mpl
import matplotlib.pyplot as plt
# 设置matplotlib参数以支持中文显示
mpl.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans', 'Arial', 'sans-serif']  # 优先使用中文字体
mpl.rcParams['axes.unicode_minus'] = False  # 正确显示负号
import  warnings
warnings.filterwarnings("ignore", category=UserWarning, module='openpyxl')

# 设置根目录
dcalc_root = "D:/Dataset/New_CBDDSM/Dcalc"

# 定义分类
classes = ["benign", "malignant"]

# 用于存储结果的数据结构
results = {
    "matched": [],         # 匹配成功的图像-掩码对
    "multiple_masks": [],  # 有多个掩码的图像
    "no_mask": [],         # 没有掩码的图像
    "orphan_masks": []     # 没有对应图像的掩码
}

# 统计计数
statistics = {
    "total_images": 0,
    "total_masks": 0,
    "matched_images": 0,
    "multiple_mask_images": 0,
    "no_mask_images": 0,
    "orphan_masks": 0
}

# 遍历所有数据集分割
for split in ["train", "val","test"]:
    split_stats = {"images": 0, "masks": 0, "matched": 0, "unmatched": 0}
    
    for cls in classes:
        img_dir = os.path.join(dcalc_root, split, "images", cls)
        mask_dir = os.path.join(dcalc_root, split, "masks", cls)
        
        # 检查目录是否存在
        if not os.path.exists(img_dir) or not os.path.exists(mask_dir):
            print(f"警告: 目录不存在: {img_dir} 或 {mask_dir}")
            continue
        
        # 获取文件列表
        img_files = [f for f in os.listdir(img_dir) 
                    if os.path.isfile(os.path.join(img_dir, f)) and 
                    f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        mask_files = [f for f in os.listdir(mask_dir) 
                     if os.path.isfile(os.path.join(mask_dir, f)) and 
                     f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        
        # 更新统计
        statistics["total_images"] += len(img_files)
        statistics["total_masks"] += len(mask_files)
        split_stats["images"] += len(img_files)
        split_stats["masks"] += len(mask_files)
        
        # 创建掩码文件的集合，用于快速查找
        all_mask_files_set = set(mask_files)
        used_mask_files = set()
        
        # 为每个图像查找对应的掩码
        for img_file in img_files:
            img_base = os.path.splitext(img_file)[0]  # 不含扩展名
            
            # 获取基本名，例如: Calc-Training_P_00001_RIGHT_CC_1 (不含-1等后缀)
            base_parts = re.match(r'(.*?)(?:-\d+)?$', img_base)
            if base_parts:
                base_name = base_parts.group(1)
                
                # 查找此图像的所有掩码
                matching_masks = []
                
                # 1. 检查特定命名模式: base_name_1 或 base_name_1-数字
                for mask_file in mask_files:
                    mask_base = os.path.splitext(mask_file)[0]
                    
                    # 特定命名模式匹配
                    if mask_base.startswith(f"{base_name}_1") or mask_base.startswith(base_name):
                        matching_masks.append(mask_file)
                        used_mask_files.add(mask_file)
                
                # 记录匹配结果
                if matching_masks:
                    if len(matching_masks) == 1:
                        results["matched"].append({
                            "split": split,
                            "class": cls,
                            "image": img_file,
                            "mask": matching_masks[0]
                        })
                        statistics["matched_images"] += 1
                        split_stats["matched"] += 1
                    else:
                        results["multiple_masks"].append({
                            "split": split,
                            "class": cls,
                            "image": img_file,
                            "masks": matching_masks,
                            "count": len(matching_masks)
                        })
                        statistics["multiple_mask_images"] += 1
                        split_stats["matched"] += 1
                else:
                    results["no_mask"].append({
                        "split": split,
                        "class": cls,
                        "image": img_file
                    })
                    statistics["no_mask_images"] += 1
                    split_stats["unmatched"] += 1
        
        # 检查没有对应图像的掩码
        orphan_masks = all_mask_files_set - used_mask_files
        for mask_file in orphan_masks:
            results["orphan_masks"].append({
                "split": split,
                "class": cls,
                "mask": mask_file
            })
            statistics["orphan_masks"] += 1
    
    # 打印此分割的统计
    print(f"\n{split.upper()} 集统计:")
    print(f"  图像总数: {split_stats['images']}")
    print(f"  掩码总数: {split_stats['masks']}")
    print(f"  匹配成功的图像: {split_stats['matched']} ({split_stats['matched']/split_stats['images']*100:.1f}%)")
    print(f"  未匹配到掩码的图像: {split_stats['unmatched']} ({split_stats['unmatched']/split_stats['images']*100:.1f}%)")

# 打印总体统计
print("\n" + "="*50)
print("总体统计:")
print(f"  总图像数: {statistics['total_images']}")
print(f"  总掩码数: {statistics['total_masks']}")
print(f"  匹配成功的图像: {statistics['matched_images']} ({statistics['matched_images']/statistics['total_images']*100:.1f}%)")
print(f"  有多个掩码的图像: {statistics['multiple_mask_images']} ({statistics['multiple_mask_images']/statistics['total_images']*100:.1f}%)")
print(f"  没有掩码的图像: {statistics['no_mask_images']} ({statistics['no_mask_images']/statistics['total_images']*100:.1f}%)")
print(f"  没有对应图像的掩码: {statistics['orphan_masks']} ({statistics['orphan_masks']/statistics['total_masks']*100:.1f}%)")

# 详细结果展示
if results["no_mask"]:
    print("\n" + "-"*50)
    print(f"未找到掩码的图像 (显示前10个):")
    for i, item in enumerate(results["no_mask"][:10]):
        print(f"  {i+1}. {item['split']}/{item['class']}/{item['image']}")
    if len(results["no_mask"]) > 10:
        print(f"  ... 还有 {len(results['no_mask']) - 10} 个")

if results["multiple_masks"]:
    print("\n" + "-"*50)
    print(f"有多个掩码的图像 (显示前10个):")
    for i, item in enumerate(results["multiple_masks"][:10]):
        print(f"  {i+1}. {item['split']}/{item['class']}/{item['image']} -> {item['count']}个掩码")
        print(f"     掩码: {', '.join(item['masks'][:3])}" + ("..." if len(item['masks']) > 3 else ""))
    if len(results["multiple_masks"]) > 10:
        print(f"  ... 还有 {len(results['multiple_masks']) - 10} 个")

if results["orphan_masks"]:
    print("\n" + "-"*50)
    print(f"没有对应图像的掩码 (显示前10个):")
    for i, item in enumerate(results["orphan_masks"][:10]):
        print(f"  {i+1}. {item['split']}/{item['class']}/{item['mask']}")
    if len(results["orphan_masks"]) > 10:
        print(f"  ... 还有 {len(results['orphan_masks']) - 10} 个")

# 保存详细结果到Excel
print("\n" + "="*50)
print("正在保存详细结果到Excel文件...")

# 创建pandas DataFrame并保存
dfs = {}

if results["matched"]:
    dfs["匹配成功"] = pd.DataFrame(results["matched"])

if results["multiple_masks"]:
    # 展开multiple_masks中的masks列表
    expanded_data = []
    for item in results["multiple_masks"]:
        for mask in item["masks"]:
            expanded_data.append({
                "split": item["split"],
                "class": item["class"],
                "image": item["image"],
                "mask": mask,
                "total_masks": item["count"]
            })
    dfs["多个掩码"] = pd.DataFrame(expanded_data)

if results["no_mask"]:
    dfs["无掩码图像"] = pd.DataFrame(results["no_mask"])

if results["orphan_masks"]:
    dfs["无图像掩码"] = pd.DataFrame(results["orphan_masks"])

# 将所有DataFrame保存到一个Excel文件的不同sheet中
excel_path = os.path.join(dcalc_root, "image_mask_matching_report.xlsx")
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
    for sheet_name, df in dfs.items():
        df.to_excel(writer, sheet_name=sheet_name, index=False)

print(f"详细报告已保存到: {excel_path}")

# 绘制图表展示匹配情况
plt.figure(figsize=(10, 6))
labels = ['匹配成功(单掩码)', '匹配成功(多掩码)', '未找到掩码', '无对应图像的掩码']
sizes = [
    statistics["matched_images"] - statistics["multiple_mask_images"], 
    statistics["multiple_mask_images"], 
    statistics["no_mask_images"],
    statistics["orphan_masks"]
]
colors = ['#66b3ff', '#99ff99', '#ff9999', '#ffcc99']
explode = (0.1, 0.1, 0, 0)  # 突出显示前两个扇形

plt.pie(sizes, explode=explode, labels=labels, colors=colors, autopct='%1.1f%%',
        shadow=True, startangle=90)
plt.axis('equal')  # 确保饼图是圆的
plt.title('Dcalc数据集图像-掩码匹配情况')
plt.tight_layout()
plt.savefig(os.path.join(dcalc_root, 'matching_statistics.png'))
plt.close()

print(f"图表已保存到: {os.path.join(dcalc_root, 'matching_statistics.png')}")


TRAIN 集统计:
  图像总数: 982
  掩码总数: 982
  匹配成功的图像: 982 (100.0%)
  未匹配到掩码的图像: 0 (0.0%)

VAL 集统计:
  图像总数: 245
  掩码总数: 245
  匹配成功的图像: 245 (100.0%)
  未匹配到掩码的图像: 0 (0.0%)

TEST 集统计:
  图像总数: 284
  掩码总数: 284
  匹配成功的图像: 284 (100.0%)
  未匹配到掩码的图像: 0 (0.0%)

总体统计:
  总图像数: 1511
  总掩码数: 1511
  匹配成功的图像: 1511 (100.0%)
  有多个掩码的图像: 0 (0.0%)
  没有掩码的图像: 0 (0.0%)
  没有对应图像的掩码: 0 (0.0%)

正在保存详细结果到Excel文件...
详细报告已保存到: D:/Dataset/New_CBDDSM/Dcalc\image_mask_matching_report.xlsx
图表已保存到: D:/Dataset/New_CBDDSM/Dcalc\matching_statistics.png
