# SVS 图像切片工具 - 基于注释区域

这个笔记本用于处理 SVS 格式的大型图像，只切割并保存包含注释区域的图像块。

## 功能特点
- 批量处理指定目录下的所有 SVS 文件
- 自动查找对应的注释文件 (位于 SVS 文件所在目录下的 `{sample_name}_kfb/Annotations/1.json`)
- 只切割并保存与注释区域有重叠的图像块
- 使用多线程加速处理

In [None]:
# Import the required libraries.
import os

# Raise OpenCV's pixel-count limit so that very large slide images can be decoded.
# OpenCV reads OPENCV_IO_MAX_IMAGE_PIXELS when the cv2 extension is loaded, so the
# variable must be set BEFORE `import cv2`; setting it afterwards has no effect.
# NOTE: if cv2 was already imported in this kernel, restart the kernel first.
os.environ["OPENCV_IO_MAX_IMAGE_PIXELS"] = str(2 ** 50)

import json
import cv2
import numpy as np
from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt
from IPython.display import display, HTML

## 定义工具函数

In [None]:
def find_svs_files(input_dir):
    """Recursively collect SVS slides under ``input_dir`` that have an annotation file.

    For a slide ``<dir>/<sample_name>.svs`` the annotation file is expected at
    ``<dir>/<sample_name>_kfb/Annotations/1.json``. Slides without that file
    are reported with a warning and left out of the result.

    Args:
        input_dir: Directory that is walked recursively.

    Returns:
        A list of dicts, one per usable slide, with the keys ``'name'``
        (file name without extension), ``'svs_path'`` and ``'annotation_path'``.
    """
    found = []
    for root, _, files in os.walk(input_dir):
        for file in files:
            # Match the extension case-insensitively so "*.SVS" is picked up too.
            if not file.lower().endswith(".svs"):
                continue

            svs_file_path = os.path.join(root, file)
            print(f"找到SVS文件: {svs_file_path}")
            sample_name = os.path.splitext(file)[0]

            # The annotation lives next to the slide, in "<sample_name>_kfb/Annotations".
            annotation_file = os.path.join(
                root, f"{sample_name}_kfb", "Annotations", "1.json"
            )

            if os.path.exists(annotation_file):
                found.append({
                    'name': sample_name,
                    'svs_path': svs_file_path,
                    'annotation_path': annotation_file,
                })
            else:
                print(f"警告: 未找到对应的注释文件 {annotation_file}")

    return found

def load_annotations(annotation_file):
    """Load the rectangular annotation regions from a JSON annotation file.

    The file is expected to contain a JSON list. Every entry that has a
    ``'region'`` key is converted to a dict with integer ``'x'``, ``'y'``,
    ``'width'``, ``'height'`` plus the entry's ``'name'``. Entries without a
    ``'region'`` key are ignored. A malformed entry (missing key, value that
    cannot be converted to ``int``) is reported and skipped, so it no longer
    discards the valid regions of the same file.

    Args:
        annotation_file: Path of the JSON file.

    Returns:
        A list of region dicts; empty if the file cannot be read or parsed,
        or if it contains no usable region.
    """
    try:
        with open(annotation_file, 'r', encoding='utf-8') as f:
            annotations = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"加载注释文件时出错: {e}")
        return []

    if not isinstance(annotations, list):
        print(f"加载注释文件时出错: 顶层结构应为列表, 实际为 {type(annotations).__name__}")
        return []

    regions = []
    for index, anno in enumerate(annotations):
        if not isinstance(anno, dict) or 'region' not in anno:
            continue
        try:
            box = anno['region']
            regions.append({
                'x': int(box['x']),
                'y': int(box['y']),
                'width': int(box['width']),
                'height': int(box['height']),
                'name': anno['name'],
            })
        except (KeyError, TypeError, ValueError, OverflowError) as e:
            # Skip only the broken entry; keep the regions that parsed fine.
            print(f"警告: 跳过无效的注释条目 {index}: {e!r}")

    return regions

def is_patch_overlapping_annotations(left, top, right, bottom, annotations):
    """Tell whether a patch rectangle intersects any annotation rectangle.

    Rectangles that merely share an edge or a corner do not count as
    overlapping.

    Returns:
        ``(True, name)`` for the first annotation (in list order) that
        intersects the patch, otherwise ``(False, None)``.
    """
    for region in annotations:
        x0, y0 = region['x'], region['y']
        x1 = x0 + region['width']
        y1 = y0 + region['height']

        # Two rectangles intersect when their extents overlap on both axes.
        intersects_x = left < x1 and right > x0
        intersects_y = top < y1 and bottom > y0
        if intersects_x and intersects_y:
            return True, region['name']

    return False, None

In [None]:
def process_chunk(slide, output_folder, tile_size, overlap_rate, chunk_row, chunk_col,
                 chunk_rows, chunk_cols, annotations, sample_name):
    """Cut the tiles of one chunk of the tile grid and save those touching an annotation.

    Tile ``(row, col)`` starts at ``row * stride`` / ``col * stride`` with
    ``stride = tile_size - int(tile_size * overlap_rate)`` and is clipped to
    the image bounds. A tile is written as
    ``{sample_name}_{anno_name}_tile_{row}_{col}.png`` when it overlaps at
    least one annotation; ``anno_name`` is the first overlapping annotation.

    Args:
        slide: Image indexable as ``slide[top:bottom, left:right]`` with a
            ``shape`` attribute (presumably the array returned by ``cv2.imread``).
        output_folder: Directory the PNG tiles are written to.
        tile_size: Tile edge length in pixels.
        overlap_rate: Fraction of ``tile_size`` shared by neighbouring tiles.
        chunk_row, chunk_col: Grid index of the chunk's first tile.
        chunk_rows, chunk_cols: Number of tile rows / columns in the chunk.
        annotations: Region dicts as produced by ``load_annotations``.
        sample_name: Prefix used in the output file names.

    Returns:
        Number of tiles that were actually written to disk.
    """
    height, width = slide.shape[:2]
    overlap_pixels = int(tile_size * overlap_rate)
    stride = tile_size - overlap_pixels
    saved_patches = 0

    for row in range(chunk_row, chunk_row + chunk_rows):
        top = row * stride
        bottom = min(top + tile_size, height)
        for col in range(chunk_col, chunk_col + chunk_cols):
            left = col * stride
            right = min(left + tile_size, width)

            # A tile that starts on/after the image border is empty; writing
            # an empty array would make cv2.imwrite fail.
            if right <= left or bottom <= top:
                continue

            has_overlap, anno_name = is_patch_overlapping_annotations(
                left, top, right, bottom, annotations
            )
            if not has_overlap:
                continue

            patch = slide[top:bottom, left:right]
            # The sample name and annotation name are part of the file name.
            output_filename = f'{sample_name}_{anno_name}_tile_{row}_{col}.png'
            output_path = os.path.join(output_folder, output_filename)
            # cv2.imwrite reports failure through its return value; only
            # count tiles that were really saved.
            if cv2.imwrite(output_path, patch):
                saved_patches += 1
            else:
                print(f"警告: 无法保存图像块 {output_path}")

    return saved_patches

def _count_tiles(length, tile_size, overlap_pixels):
    """Return the number of tile positions along an axis of ``length`` pixels."""
    stride = tile_size - overlap_pixels
    # Clamp to one so that an axis shorter than the overlap still yields a
    # single (partial) tile instead of a zero or negative count.
    return max(1, (length - overlap_pixels) // stride + 1)


def split_large_image_with_annotations(input_image_path, annotation_file, output_folder,
                                      tile_size, overlap_rate, sample_name, chunk_size=50):
    """Tile a large image and save only the tiles that overlap an annotation.

    The annotations are loaded first, then the whole image is read with
    OpenCV, the tile grid is split into chunks of ``chunk_size`` x
    ``chunk_size`` tiles, and the chunks are handed to ``process_chunk`` on a
    thread pool.

    Args:
        input_image_path: Path of the image to tile.
        annotation_file: Path of the JSON annotation file.
        output_folder: Directory for the saved tiles (created if missing).
        tile_size: Tile edge length in pixels; must be positive.
        overlap_rate: Fraction of ``tile_size`` shared by neighbouring tiles;
            ``int(tile_size * overlap_rate)`` must be smaller than ``tile_size``.
        sample_name: Name used in log output and output file names.
        chunk_size: Number of tile rows/columns per worker task; must be positive.

    Returns:
        Total number of tiles reported as saved by the workers; 0 if there are
        no annotations or the image cannot be loaded.

    Raises:
        ValueError: If ``tile_size``, ``overlap_rate`` or ``chunk_size`` would
            produce a non-positive stride or step.
    """
    # Reject unusable parameters up front, before the (expensive) image load.
    if tile_size <= 0:
        raise ValueError(f"tile_size 必须为正数, 当前值: {tile_size}")
    if chunk_size <= 0:
        raise ValueError(f"chunk_size 必须为正数, 当前值: {chunk_size}")
    overlap_pixels = int(tile_size * overlap_rate)
    if overlap_pixels >= tile_size:
        raise ValueError(
            f"overlap_rate 过大, 重叠像素数 {overlap_pixels} 必须小于 tile_size {tile_size}"
        )

    # Load the annotations.
    annotations = load_annotations(annotation_file)
    if not annotations:
        print(f"警告: 没有找到有效的注释信息 {annotation_file}")
        return 0

    print(f"加载了 {len(annotations)} 个注释区域")

    # Show the annotation regions.
    display(HTML("<h4>注释区域信息:</h4>"))
    for i, anno in enumerate(annotations):
        print(f"注释 {i+1}: {anno['name']} - 位置: ({anno['x']}, {anno['y']}), 大小: {anno['width']}x{anno['height']}")

    # Load the image.
    try:
        slide = cv2.imread(input_image_path, cv2.IMREAD_UNCHANGED | cv2.IMREAD_LOAD_GDAL)
        if slide is None:
            print(f"错误: 无法加载图像 {input_image_path}")
            return 0

        height, width = slide.shape[:2]
        print(f"图像尺寸: {width}x{height}")
    except Exception as e:
        print(f"加载图像时出错 {input_image_path}: {e}")
        return 0

    rows = _count_tiles(height, tile_size, overlap_pixels)
    cols = _count_tiles(width, tile_size, overlap_pixels)

    os.makedirs(output_folder, exist_ok=True)

    print(f"开始处理图像: {sample_name}")
    print(f"将分割成 {rows}x{cols} 个块，但只保存包含注释的块")

    total_saved_patches = 0

    with ThreadPoolExecutor() as executor:
        # One task per chunk of the tile grid; edge chunks may be smaller.
        futures = [
            executor.submit(
                process_chunk,
                slide, output_folder, tile_size, overlap_rate,
                chunk_row, chunk_col,
                min(chunk_size, rows - chunk_row),
                min(chunk_size, cols - chunk_col),
                annotations, sample_name,
            )
            for chunk_row in range(0, rows, chunk_size)
            for chunk_col in range(0, cols, chunk_size)
        ]

        with tqdm(total=len(futures), desc=f"处理 {sample_name}") as pbar:
            for future in futures:
                total_saved_patches += future.result()
                pbar.update(1)

    return total_saved_patches

## 配置参数

在下面的单元格中设置输入和输出目录，以及其他参数。

In [None]:
# Parameters
input_dir = ""  # Directory containing the SVS files (fill in)
output_dir = ""  # Output directory (fill in)
tile_size = 1024  # Edge length of the output tiles, in pixels
overlap_rate = 0.05  # Overlap between neighbouring tiles, as a fraction of tile_size

# To process one SVS file and its annotation file directly, set the variables below.
# When single_svs_file is set, input_dir is ignored.
single_svs_file = ""  # Path of a single SVS file; leave empty to use input_dir
single_annotation_file = ""  # Path of a single annotation file; leave empty to look it up automatically

## 执行处理

运行以下单元格开始处理图像。

In [None]:
# Validate the parameters before doing any work.
if not input_dir and not single_svs_file:
    print("错误: 请设置输入目录或单个SVS文件路径")
elif not output_dir:
    print("错误: 请设置输出目录路径")
else:
    # Create the output directory.
    os.makedirs(output_dir, exist_ok=True)

    # Build the work list: either the single requested file or a directory scan.
    if single_svs_file:
        Info_list = []
        sample_name = os.path.splitext(os.path.basename(single_svs_file))[0]

        if single_annotation_file:
            annotation_file = single_annotation_file
        else:
            # Default location: "<sample_name>_kfb/Annotations/1.json" next to the slide.
            annotation_file = os.path.join(
                os.path.dirname(single_svs_file),
                f"{sample_name}_kfb", "Annotations", "1.json",
            )

        if not os.path.isfile(single_svs_file):
            # Fail early with a clear message instead of at image-load time.
            print(f"错误: 未找到SVS文件 {single_svs_file}")
        elif os.path.exists(annotation_file):
            Info_list.append({
                'name': sample_name,
                'svs_path': single_svs_file,
                'annotation_path': annotation_file,
            })
        else:
            print(f"错误: 未找到注释文件 {annotation_file}")
    elif not os.path.isdir(input_dir):
        # os.walk silently yields nothing for a missing directory; say so explicitly.
        Info_list = []
        print(f"错误: 输入目录不存在 {input_dir}")
    else:
        # Find every SVS file that has a matching annotation file.
        Info_list = find_svs_files(input_dir)

    if not Info_list:
        print("未找到任何有效的SVS文件和对应的注释")
    else:
        print(f"找到 {len(Info_list)} 个SVS文件及其注释")

        # Process each SVS file into its own sub-folder of output_dir.
        total_files = len(Info_list)
        total_patches_saved = 0

        for file_number, sample in enumerate(Info_list, start=1):
            print(f"\n处理文件 {file_number}/{total_files}: {sample['name']}")
            print(f"SVS文件: {sample['svs_path']}")
            print(f"注释文件: {sample['annotation_path']}")

            sample_output_folder = os.path.join(output_dir, sample['name'])
            os.makedirs(sample_output_folder, exist_ok=True)

            patches_saved = split_large_image_with_annotations(
                sample['svs_path'],
                sample['annotation_path'],
                sample_output_folder,
                tile_size,
                overlap_rate,
                sample['name'],
            )

            total_patches_saved += patches_saved
            print(f"完成处理 {sample['name']}, 保存了 {patches_saved} 个图像块")

        print(f"\n所有处理完成! 总共保存了 {total_patches_saved} 个图像块")

## 可视化结果

以下单元格可以用来查看保存的图像块示例。

In [None]:
def show_sample_patches(output_dir, max_samples=5):
    """Display a random selection of the saved PNG tiles side by side.

    Args:
        output_dir: Directory searched recursively for ``*.png`` files.
        max_samples: Maximum number of tiles to show; must be positive.

    Returns:
        None. A message is printed instead of a figure when the directory is
        missing, contains no PNG files, ``max_samples`` is not positive, or
        none of the selected files can be read.
    """
    if not os.path.exists(output_dir):
        print(f"输出目录 {output_dir} 不存在")
        return

    # Collect every saved tile.
    all_patches = [
        os.path.join(root, file)
        for root, _, files in os.walk(output_dir)
        for file in files
        if file.endswith(".png")
    ]

    if not all_patches:
        print("未找到任何保存的图像块")
        return

    if max_samples <= 0:
        # plt.subplots cannot create a grid with zero columns.
        print(f"max_samples 必须大于 0, 当前值: {max_samples}")
        return

    # Pick a few examples at random.
    import random
    samples = random.sample(all_patches, min(max_samples, len(all_patches)))

    # Read the images first so unreadable files do not leave empty axes.
    loaded = []
    for sample_path in samples:
        img = cv2.imread(sample_path)
        if img is None:
            # cv2.imread returns None instead of raising on unreadable files.
            print(f"警告: 无法读取图像块 {sample_path}")
            continue
        # OpenCV loads BGR; matplotlib expects RGB.
        loaded.append((sample_path, cv2.cvtColor(img, cv2.COLOR_BGR2RGB)))

    if not loaded:
        print("未找到任何可读取的图像块")
        return

    # squeeze=False always returns a 2-D array of axes, even for one image.
    _, axes = plt.subplots(1, len(loaded), figsize=(15, 5), squeeze=False)
    for ax, (sample_path, img) in zip(axes[0], loaded):
        ax.imshow(img)
        ax.set_title(os.path.basename(sample_path))
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Show a few of the saved tiles as examples (skipped while output_dir is empty).
if output_dir:
    show_sample_patches(output_dir, max_samples=5)

## 总结

这个笔记本实现了以下功能：

1. 加载SVS格式的大型图像
2. 读取对应的JSON格式注释文件
3. 只切割并保存包含注释区域的图像块
4. 使用多线程加速处理
5. 提供了可视化工具查看结果

这种方法可以大大减少存储空间和后续处理的工作量，因为只保存了包含感兴趣区域的图像块。