# 天池地表建筑物识别 Baseline

这个 Notebook 实现了一个完整的训练和预测工作流。
它使用了 `segmentation-models-pytorch` 库来构建带有预训练编码器的强大模型，
使用混合精度训练 (Mixed Precision) 来提高效率，并根据 `test_sample_submit.csv`
文件指定的顺序来正确处理测试数据集，以生成符合要求的提交文件。

In [12]:
# === 1. 导入必要的库 ===
import numpy as np
import pandas as pd
import pathlib, sys, os, random, time, gc
import cv2
from tqdm.notebook import tqdm # 用于显示进度条
import matplotlib.pyplot as plt # 用于绘图

# %matplotlib inline

import warnings
warnings.filterwarnings('ignore') # 忽略不必要的警告信息

# 数据增强库
import albumentations as A
from albumentations.pytorch import ToTensorV2

# PyTorch 核心库
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as D
import torchvision
from torchvision import transforms as T
import torch.cuda.amp as amp # 用于混合精度训练
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau # 学习率调度器

# Segmentation Models PyTorch 库
import segmentation_models_pytorch as smp

# --- 新增：导入 KFold ---
from sklearn.model_selection import KFold

print(f"PyTorch 版本: {torch.__version__}")
print(f"Segmentation Models Pytorch 版本: {smp.__version__}")
# 打印 sklearn 版本（如果安装了）
try:
    import sklearn
    print(f"Scikit-learn 版本: {sklearn.__version__}")
except ImportError:
    print("警告: Scikit-learn (sklearn) 未安装。KFold 将无法使用。")
    print("请运行: pip install scikit-learn")

import csv

PyTorch 版本: 2.1.0
Segmentation Models Pytorch 版本: 0.4.0
Scikit-learn 版本: 1.6.1


In [13]:
# === 2. 配置参数 ===

# --- 文件与目录路径 ---
TRAIN_IMAGE_DIR = '数据集/train/' # 训练图片目录
TRAIN_MASK_CSV = '数据集/train_mask.csv' # 训练集 RLE 编码文件
TEST_IMAGE_DIR = '数据集/test/' # 测试图片目录
SAMPLE_SUBMISSION_CSV = '数据集/test_sample_submit.csv' # 提交样例文件 (用于保证测试集顺序!)
OUTPUT_DIR = 'output/' # 输出目录 (用于保存模型、日志、提交文件等)
# BEST_MODEL_NAME 不再是单个文件，而是每个 fold 一个
SUBMISSION_FILENAME = 'submission_kfold.csv' # 最终提交文件名 (区分开)

# --- 训练超参数 ---
SEED = 42 # 随机种子，保证结果可复现
EPOCHS = 15 # **每折**训练的轮数 (根据需要调整)
BATCH_SIZE = 16 # 每批处理的图片数量 (根据 GPU 显存大小调整)
IMAGE_SIZE = 512 # 输入模型的图片尺寸 (训练和预测时都使用)
LEARNING_RATE = 1e-4 # 初始学习率
WEIGHT_DECAY = 1e-4 # 优化器的权重衰减
THRESHOLD = 0.5 # 将模型输出概率转换为二值掩码 (0或1) 的阈值
ACCUMULATION_STEPS = 8 # 梯度累积步数 (实际生效的批大小 = BATCH_SIZE * ACCUMULATION_STEPS)
N_FOLDS = 5 # --- 新增：K 折交叉验证的折数 ---

# --- 模型配置 ---
# 可选的模型架构和编码器请参考: https://github.com/qubvel/segmentation_models.pytorch
MODEL_ARC = 'Unet' # 例如: Unet, UnetPlusPlus, FPN, DeepLabV3Plus
ENCODER = 'efficientnet-b3' # 例如: efficientnet-b3, resnet34, mobilenet_v2
ENCODER_WEIGHTS = 'imagenet' # 使用在 ImageNet 上预训练的权重

# --- 硬件设置 ---
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu' # 自动选择 GPU 或 CPU

# --- 确保输出目录存在 ---
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"配置参数已加载:")
print(f"  输出目录: {OUTPUT_DIR}")
print(f"  K-Fold 折数: {N_FOLDS}")
print(f"  每折训练轮数: {EPOCHS}, 批大小: {BATCH_SIZE}, 图片尺寸: {IMAGE_SIZE}")
print(f"  模型架构: {MODEL_ARC}, 编码器: {ENCODER}")
print(f"  运行设备: {DEVICE}")

配置参数已加载:
  输出目录: output/
  K-Fold 折数: 5
  每折训练轮数: 15, 批大小: 16, 图片尺寸: 512
  模型架构: Unet, 编码器: efficientnet-b3
  运行设备: cuda


In [14]:
# === 3. 辅助函数 ===

# --- 设置随机种子 ---
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed) # 如果使用多块 GPU
        # 以下两行可能提高性能，但可能导致结果不完全可复现
        # torch.backends.cudnn.deterministic = False
        # torch.backends.cudnn.benchmark = True
        # 为了更好的可复现性 (可能会牺牲一点性能):
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

seed_everything(SEED)
print(f"已设置随机种子: {SEED}")

# --- RLE 编码/解码函数 (来自题目描述) ---
def rle_encode(im):
    '''将二值掩码图片编码为 RLE 字符串'''
    pixels = im.flatten(order = 'F')
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

def rle_decode(mask_rle, shape=(512, 512)):
    '''将 RLE 字符串解码为二值掩码图片'''
    if mask_rle == '' or pd.isna(mask_rle): # 处理空 RLE 或 NaN 值
        return np.zeros(shape, dtype=np.uint8)
    s = mask_rle.split()
    starts, lengths = [np.asarray(x, dtype=int) for x in (s[0:][::2], s[1:][::2])]
    starts -= 1
    ends = starts + lengths
    img = np.zeros(shape[0]*shape[1], dtype=np.uint8)
    for lo, hi in zip(starts, ends):
        img[lo:hi] = 1
    return img.reshape(shape, order='F')

# --- 清理 GPU 显存 ---
def clear_memory():
    if DEVICE == 'cuda':
        torch.cuda.empty_cache() # 释放未被使用的缓存显存
    gc.collect() # 执行 Python 的垃圾回收

已设置随机种子: 42


In [15]:
# === 4. 数据增强策略 ===

# --- 训练集数据增强 ---
# 包含一些常用的几何和颜色变换
train_transform = A.Compose([
    A.Resize(IMAGE_SIZE, IMAGE_SIZE), # 调整图片大小
    A.HorizontalFlip(p=0.5), # 水平翻转
    A.VerticalFlip(p=0.5), # 垂直翻转
    A.RandomRotate90(p=0.5), # 随机旋转90度
    # 轻微的仿射变换 (位移、缩放、旋转)
    A.ShiftScaleRotate(p=0.5, shift_limit=0.0625, scale_limit=0.1, rotate_limit=15, border_mode=cv2.BORDER_CONSTANT, value=0),
    # 可选的颜色变换 (如果需要可以取消注释)
    # A.RandomBrightnessContrast(p=0.3), # 随机调整亮度和对比度
    # A.HueSaturationValue(p=0.3), # 随机调整色调、饱和度、明度
    # 归一化 (使用 ImageNet 的均值和标准差)
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(), # 转换为 PyTorch Tensor
])

# --- 验证集/测试集数据增强 ---
# 通常只包含 Resize 和 Normalize
valid_transform = A.Compose([
    A.Resize(IMAGE_SIZE, IMAGE_SIZE),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

print("数据增强流程已定义。")

数据增强流程已定义。


In [16]:
# === 5. Dataset 类定义 ===
# 已修改，使其在处理测试集时能返回图片和对应的文件名

class BuildingSegmentationDataset(D.Dataset):
    def __init__(self, img_paths, mask_info=None, transform=None, is_test=False):
        """
        初始化 Dataset 对象。
        Args:
            img_paths (list): 图片文件的完整路径列表。
            mask_info (pd.DataFrame or dict, optional): 包含掩码信息的对象。
                训练/验证时: 可以是包含 'name' 和 'mask' 列的 DataFrame，或 {文件名: RLE字符串} 的字典。
                测试时: 应为 None。
            transform (callable, optional): Albumentations 定义的数据增强/转换流程。
            is_test (bool): 如果是 True，表示这是测试数据集。
        """
        self.img_paths = img_paths
        self.mask_info = mask_info
        self.transform = transform
        self.is_test = is_test

        # 如果不是测试集，并且提供了 mask_info，则创建一个快速查找 RLE 的字典
        if not self.is_test and self.mask_info is not None:
             if isinstance(self.mask_info, pd.DataFrame):
                 # 确保 'name' 列存在，并将其设为索引以加速查找
                 if 'name' in self.mask_info.columns:
                     # 使用 DataFrame 的 set_index 和 to_dict 方法创建查找字典
                     self.mask_lookup = self.mask_info.set_index('name')['mask'].to_dict()
                 else:
                     print("警告: 在 mask_info DataFrame 中未找到 'name' 列。")
                     self.mask_lookup = {}
             elif isinstance(self.mask_info, dict):
                 # 如果 mask_info 本身就是字典，直接使用
                 self.mask_lookup = self.mask_info
             else:
                 # 处理未预期的 mask_info 类型
                 print(f"警告: mask_info 的类型未预期: {type(self.mask_info)}")
                 self.mask_lookup = {}
        else:
             self.mask_lookup = {} # 测试集不需要 RLE 查找

    def __len__(self):
        # 返回数据集中样本的总数
        return len(self.img_paths)

    def __getitem__(self, idx):
        # 根据索引获取单个样本
        img_path = self.img_paths[idx]
        filename = os.path.basename(img_path) # 从完整路径中提取文件名

        # 加载图片
        try:
            img = cv2.imread(img_path)
            if img is None: # 检查图片是否成功加载
                raise IOError(f"无法加载图片: {img_path}")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # OpenCV 默认是 BGR，转换为 RGB
        except Exception as e:
            print(f"加载图片 {img_path} 时出错: {e}")
            # 如果加载失败，创建一个黑色图片作为替代，以避免程序崩溃
            img = np.zeros((IMAGE_SIZE, IMAGE_SIZE, 3), dtype=np.uint8) # 使用配置的尺寸
            if self.is_test:
                 # 对黑色图片应用变换 (主要是 Resize, Normalize, ToTensor)
                if self.transform:
                    transformed = self.transform(image=img)
                    img = transformed['image']
                return img, filename # 返回处理后的黑色图片和文件名
            else:
                 # 训练/验证时，还需要返回一个空的掩码
                 mask = np.zeros((IMAGE_SIZE, IMAGE_SIZE), dtype=np.uint8)
                 if self.transform:
                     transformed = self.transform(image=img, mask=mask)
                     img = transformed['image']
                     # 确保掩码是 [1, H, W] 的 float Tensor
                     mask = transformed['mask'].unsqueeze(0).float()
                 return img, mask

        # --- 处理逻辑分支 ---
        if self.is_test:
            # 如果是测试集，只对图片应用变换
            if self.transform:
                transformed = self.transform(image=img)
                img = transformed['image']
            # 返回处理后的图片和文件名
            return img, filename
        else:
            # 如果是训练集或验证集，需要加载并处理掩码
            mask_rle = self.mask_lookup.get(filename, '') # 使用文件名查找对应的 RLE 字符串
            mask = rle_decode(mask_rle, shape=(512, 512)) # 解码 RLE，注意使用原始尺寸 (512x512)

            # 对图片和掩码同时应用变换
            if self.transform:
                transformed = self.transform(image=img, mask=mask)
                img = transformed['image']
                mask = transformed['mask'] # 变换后的掩码

            # 将掩码转换为 PyTorch Tensor，确保是 float 类型，并增加一个通道维度
            if isinstance(mask, np.ndarray):
                mask = torch.from_numpy(mask) # 从 NumPy 数组转换为 Tensor

            # 增加通道维度 [H, W] -> [1, H, W]，并确保是 float 类型
            mask = mask.unsqueeze(0).float()

            # 返回处理后的图片和掩码
            return img, mask

print("Dataset 类已定义。")

Dataset 类已定义。


In [17]:
# === 6. 损失函数定义 ===

# --- Dice Loss ---
# 用于衡量预测掩码和真实掩码之间的重叠程度
class DiceLoss(nn.Module):
    def __init__(self, smooth=1.0): # smooth 参数用于防止分母为零
        super(DiceLoss, self).__init__()
        self.smooth = smooth

    def forward(self, pred_logits, target):
        # pred_logits 是模型的原始输出 (通常未经 sigmoid)
        pred_prob = torch.sigmoid(pred_logits) # 将 logits 转换为概率 (0到1之间)
        # 计算交集 (预测为1且真实为1的像素)
        intersection = (pred_prob * target).sum(dim=(2,3)) # 在高度和宽度维度上求和
        # 计算并集 (预测为1的像素 + 真实为1的像素)
        union = pred_prob.sum(dim=(2,3)) + target.sum(dim=(2,3))
        # 计算 Dice 系数
        dice = (2.0 * intersection + self.smooth) / (union + self.smooth)
        # Dice Loss = 1 - Dice 系数 (目标是最小化 Loss，即最大化 Dice 系数)
        return 1.0 - dice.mean() # 返回整个 batch 的平均 Dice Loss

# --- Focal Loss ---
# 用于处理类别不平衡问题，降低易分类样本的权重，关注难分类样本
class FocalLoss(nn.Module):
    def __init__(self, gamma=2.0, alpha=0.25): # gamma 控制调制因子，alpha 控制类别权重
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha # 正样本 (前景) 的权重
        # 使用 BCEWithLogitsLoss 可以提高数值稳定性，它内部处理了 sigmoid
        self.bce_with_logits = nn.BCEWithLogitsLoss(reduction='none') # 不进行自动平均

    def forward(self, pred_logits, target):
        # 计算标准的二元交叉熵损失 (每个像素单独计算)
        bce_loss = self.bce_with_logits(pred_logits, target)
        # 计算预测概率
        pred_prob = torch.sigmoid(pred_logits)
        # 计算 pt (预测正确的概率)
        # 如果 target 是 1, pt = pred_prob; 如果 target 是 0, pt = 1 - pred_prob
        pt = torch.where(target == 1, pred_prob, 1 - pred_prob)
        # 计算 alpha 权重
        # 如果 target 是 1, alpha_t = alpha; 如果 target 是 0, alpha_t = 1 - alpha
        alpha_t = torch.where(target == 1, self.alpha, 1 - self.alpha)

        # 计算 Focal Loss 的调制因子 (1 - pt)^gamma
        focal_weight = alpha_t * (1 - pt).pow(self.gamma)
        # 计算最终的 Focal Loss
        focal_loss = focal_weight * bce_loss
        # 返回整个 batch 和所有像素的平均 Focal Loss
        return focal_loss.mean()

# --- Combined Loss (Dice + Focal) ---
# 结合 Dice Loss 和 Focal Loss 的优点
class CombinedLoss(nn.Module):
    def __init__(self, dice_weight=0.5, focal_weight=0.5, focal_gamma=2.0, focal_alpha=0.25):
        super(CombinedLoss, self).__init__()
        self.dice_weight = dice_weight # Dice Loss 的权重
        self.focal_weight = focal_weight # Focal Loss 的权重
        self.dice_loss = DiceLoss()
        self.focal_loss = FocalLoss(gamma=focal_gamma, alpha=focal_alpha)

    def forward(self, pred_logits, target):
        # 分别计算 Dice Loss 和 Focal Loss
        dice = self.dice_loss(pred_logits, target)
        focal = self.focal_loss(pred_logits, target)
        # 按权重组合两种损失
        return self.dice_weight * dice + self.focal_weight * focal

print("损失函数已定义 (Dice, Focal, Combined)。")

损失函数已定义 (Dice, Focal, Combined)。


In [18]:
# === 7. Early Stopping (早停机制) ===
# 用于在验证集性能不再提升时自动停止训练，防止过拟合

class EarlyStopping:
    """当验证集指标在一定轮数内不再改善时，提前停止训练。"""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print, mode='min'):
        """
        Args:
            patience (int): 验证指标没有改善后，允许继续训练的轮数。
            verbose (bool): 如果为 True，则打印每次指标改善的信息。
            delta (float): 被认为是指标改善的最小变化量。
            path (str): 保存最佳模型权重的文件路径。
            trace_func (function): 用于打印日志信息的函数 (默认为 print)。
            mode (str): 'min' 表示监控的指标越小越好 (如 Loss)，'max' 表示越大越好 (如 Dice Score)。
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0 # 记录指标没有改善的轮数
        self.best_score = None # 记录迄今为止最好的指标分数
        self.early_stop = False # 标记是否应该停止训练
        self.val_metric_best = np.Inf if mode == 'min' else -np.Inf # 根据 mode 初始化最佳指标
        self.delta = delta # 改善的阈值
        self.path = path # 模型保存路径
        self.trace_func = trace_func
        self.mode = mode

    def __call__(self, val_metric, model):
        # 将当前轮次的验证指标传入
        score = val_metric

        if self.best_score is None:
            # 第一轮，直接记录分数并保存模型
            self.best_score = score
            self.save_checkpoint(val_metric, model)
        elif (self.mode == 'min' and score >= self.best_score - self.delta) or \
             (self.mode == 'max' and score <= self.best_score + self.delta):
            # 如果指标没有改善 (或改善小于 delta)
            self.counter += 1
            if self.verbose:
                self.trace_func(f'EarlyStopping 计数: {self.counter} / {self.patience}')
            if self.counter >= self.patience:
                # 达到容忍轮数，标记停止
                self.early_stop = True
        else:
            # 如果指标有改善
            self.best_score = score
            self.save_checkpoint(val_metric, model) # 保存更好的模型
            self.counter = 0 # 重置计数器

    def save_checkpoint(self, val_metric, model):
        '''当验证指标改善时，保存模型权重。'''
        if self.verbose:
             # 计算改善量
             improvement = self.val_metric_best - val_metric if self.mode == 'min' else val_metric - self.val_metric_best
             self.trace_func(f'验证指标改善 ({self.val_metric_best:.6f} --> {val_metric:.6f}, 改善量: {improvement:.6f}). 保存模型 ...')
        # 保存模型的状态字典
        torch.save(model.state_dict(), self.path)
        # 更新记录的最佳指标
        self.val_metric_best = val_metric

print("EarlyStopping 类已定义。")

EarlyStopping 类已定义。


In [19]:
# === 8. 定义训练和验证函数 ===

# --- 单轮训练函数 ---
def train_one_epoch(model, dataloader, optimizer, criterion, device, scaler, accumulation_steps):
    model.train() # 将模型设置为训练模式 (启用 Dropout 等)
    total_loss = 0.0 # 记录该轮的总损失
    optimizer.zero_grad() # 清空之前的梯度

    # 使用 tqdm 显示进度条
    progress_bar = tqdm(dataloader, desc="训练中", leave=False)
    for i, (images, masks) in enumerate(progress_bar):
        # 将数据移动到指定设备 (GPU 或 CPU)
        images = images.to(device)
        masks = masks.to(device) # 应为 [B, 1, H, W] 且 float 类型

        # 使用自动混合精度上下文管理器
        with amp.autocast():
            outputs = model(images) # 模型前向传播，得到 logits
            loss = criterion(outputs, masks) # 计算损失
            # 为了梯度累积，将损失除以累积步数
            loss = loss / accumulation_steps

        # 使用混合精度的 scaler 来缩放损失并进行反向传播
        scaler.scale(loss).backward()

        # 每 accumulation_steps 步或者在最后一个 batch 时，执行一次优化器步骤
        if (i + 1) % accumulation_steps == 0 or (i + 1) == len(dataloader):
            # 在梯度裁剪前，需要 unscale 梯度
            scaler.unscale_(optimizer)
            # 进行梯度裁剪，防止梯度爆炸 (可选但推荐)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            # 执行优化器步骤 (更新模型权重)
            scaler.step(optimizer)
            # 更新 scaler 的状态
            scaler.update()
            # 清空梯度，为下一次累积做准备
            optimizer.zero_grad()

        # 记录当前 batch 的损失 (乘以累积步数还原)
        batch_loss = loss.item() * accumulation_steps
        total_loss += batch_loss
        # 更新进度条显示当前 batch 的损失
        progress_bar.set_postfix(loss=f'{batch_loss:.4f}')

    # 计算该轮的平均损失 (按样本数计算)
    avg_loss = total_loss / len(dataloader.dataset)
    return avg_loss


# --- 验证函数 ---
@torch.no_grad() # 装饰器，表示在此函数内不计算梯度，节省显存和计算
def validate(model, dataloader, criterion, device):
    model.eval() # 将模型设置为评估模式 (禁用 Dropout 等)
    total_loss = 0.0 # 记录总验证损失
    dice_scores = [] # 记录每个样本的 Dice 分数

    # 使用 tqdm 显示进度条
    progress_bar = tqdm(dataloader, desc="验证中", leave=False)
    for images, masks in progress_bar:
        # 将数据移动到设备
        images = images.to(device)
        masks = masks.to(device) # 应为 [B, 1, H, W] 且 float 类型

        # 使用混合精度进行前向传播 (即使不训练，也可以加速)
        with amp.autocast():
             outputs = model(images) # 模型输出 logits
             loss = criterion(outputs, masks) # 计算损失

        total_loss += loss.item() # 累加 batch 损失

        # --- 计算 Dice 分数 ---
        preds_prob = torch.sigmoid(outputs) # 转换为概率
        # 使用阈值将概率图二值化为 0 或 1
        preds_binary = (preds_prob > THRESHOLD).float()

        # 计算 Dice 分数 (与 DiceLoss 类似，但针对每个样本计算)
        # 在通道、高度、宽度维度上求和 (通道维度 C=1)
        intersection = (preds_binary * masks).sum(dim=(1, 2, 3))
        union = preds_binary.sum(dim=(1, 2, 3)) + masks.sum(dim=(1, 2, 3))
        # 添加平滑项 epsilon 防止除以零
        dice = (2.0 * intersection + 1e-7) / (union + 1e-7)
        # 将当前 batch 中每个样本的 Dice 分数添加到列表中
        dice_scores.extend(dice.cpu().numpy())

    # 计算平均验证损失 (按 batch 数计算)
    avg_loss = total_loss / len(dataloader)
    # 计算所有验证样本的平均 Dice 分数
    avg_dice = np.mean(dice_scores)
    return avg_loss, avg_dice

print("训练和验证函数已定义。")

训练和验证函数已定义。


In [20]:
# === 9. 主训练流程函数 (修改为 K 折交叉验证) ===

def run_training_kfold():
    print(f"\n===== 开始 {N_FOLDS}-折交叉验证训练 =====")
    clear_memory()
    all_fold_best_model_paths = [] # 存储每一折最佳模型的路径

    # --- 1. 加载完整的训练数据信息 ---
    try:
        print("加载训练数据信息...")
        # !! 注意分隔符 !!
        train_mask_df_full = pd.read_csv(TRAIN_MASK_CSV, sep='\t', names=['name', 'mask'])
        train_mask_df_full['img_path'] = train_mask_df_full['name'].apply(lambda x: os.path.join(TRAIN_IMAGE_DIR, x))
        print(f"找到 {len(train_mask_df_full)} 条训练数据记录。")
        # 检查图片路径
        sample_paths = train_mask_df_full['img_path'].sample(min(5, len(train_mask_df_full))).tolist()
        missing_samples = [p for p in sample_paths if not os.path.exists(p)]
        if missing_samples:
             print(f"警告: 无法找到以下示例图片文件: {missing_samples}")
        else:
             print("示例图片路径检查通过。")
    except Exception as e:
        print(f"加载 {TRAIN_MASK_CSV} 时出错: {e}")
        return None

    # --- 2. 初始化 KFold ---
    # shuffle=True 保证每次运行 KFold 前数据被打乱
    kf = KFold(n_splits=N_FOLDS, shuffle=True, random_state=SEED)

    # --- 3. K-Fold 循环 ---
    for fold in range(N_FOLDS):
        print(f"\n--- 开始训练第 {fold+1}/{N_FOLDS} 折 ---")
        clear_memory()

        # --- 3.1 获取当前折的训练和验证索引 ---
        # kf.split 返回的是索引
        train_idx, valid_idx = list(kf.split(train_mask_df_full))[fold]
        train_df = train_mask_df_full.iloc[train_idx].reset_index(drop=True)
        valid_df = train_mask_df_full.iloc[valid_idx].reset_index(drop=True)
        print(f"第 {fold+1} 折数据划分: {len(train_df)} 训练样本, {len(valid_df)} 验证样本")

        # --- 3.2 创建当前折的 Dataset 和 DataLoader ---
        print(f"第 {fold+1} 折: 创建 Dataset 和 DataLoader...")
        train_dataset = BuildingSegmentationDataset(
            img_paths=train_df['img_path'].tolist(),
            mask_info=train_df[['name', 'mask']],
            transform=train_transform,
            is_test=False
        )
        valid_dataset = BuildingSegmentationDataset(
            img_paths=valid_df['img_path'].tolist(),
            mask_info=valid_df[['name', 'mask']],
            transform=valid_transform,
            is_test=False
        )
        """train_loader = D.DataLoader(
            train_dataset, batch_size=BATCH_SIZE, shuffle=True,
            num_workers=max(1, os.cpu_count() // 2), pin_memory=True, drop_last=True
        )
        valid_loader = D.DataLoader(
            valid_dataset, batch_size=BATCH_SIZE * 2, shuffle=False,
            num_workers=max(1, os.cpu_count() // 2), pin_memory=True
        )"""
        train_loader = D.DataLoader(
            train_dataset, batch_size=BATCH_SIZE, shuffle=True,
            num_workers=2, pin_memory=True, drop_last=True
        )
        valid_loader = D.DataLoader(
            valid_dataset, batch_size=BATCH_SIZE * 2, shuffle=False,
            num_workers=2, pin_memory=True
        )
        print(f"第 {fold+1} 折: DataLoader 创建完成。")

        # --- 3.3 初始化当前折的模型、损失、优化器、调度器 ---
        print(f"第 {fold+1} 折: 初始化模型及相关组件...")
        # !! 每次循环都重新创建模型，保证每折独立训练 !!
        model = smp.create_model(
            arch=MODEL_ARC, encoder_name=ENCODER, encoder_weights=ENCODER_WEIGHTS,
            in_channels=3, classes=1,
        ).to(DEVICE)
        criterion = CombinedLoss(dice_weight=0.6, focal_weight=0.4)
        optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
        # Scheduler T_max 仍然是每折的总轮数
        scheduler = CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=LEARNING_RATE / 100)
        scaler = amp.GradScaler()
        # Early Stopping 保存路径包含折数
        fold_model_path = os.path.join(OUTPUT_DIR, f"best_model_fold_{fold}.pth")
        early_stopping = EarlyStopping(patience=7, verbose=True, path=fold_model_path, mode='max', delta=0.001)
        print(f"第 {fold+1} 折: 初始化完成。")

        # --- 3.4 当前折的训练循环 ---
        print(f"第 {fold+1} 折: 开始训练循环...")
        best_fold_val_dice = -np.inf
        fold_history = {'train_loss': [], 'val_loss': [], 'val_dice': [], 'lr': []}
        log_file_path = os.path.join(OUTPUT_DIR, f"training_log_fold_{fold}.csv") # 每折一个日志

        with open(log_file_path, "w") as log_file:
             log_file.write("epoch,train_loss,val_loss,val_dice,learning_rate\n")
             for epoch in range(1, EPOCHS + 1):
                 start_time = time.time()
                 avg_train_loss = train_one_epoch(model, train_loader, optimizer, criterion, DEVICE, scaler, ACCUMULATION_STEPS)
                 clear_memory()
                 avg_val_loss, avg_val_dice = validate(model, valid_loader, criterion, DEVICE)
                 clear_memory()

                 current_lr = optimizer.param_groups[0]['lr']
                 scheduler.step() # CosineAnnealingLR

                 elapsed_time = time.time() - start_time
                 fold_history['train_loss'].append(avg_train_loss)
                 fold_history['val_loss'].append(avg_val_loss)
                 fold_history['val_dice'].append(avg_val_dice)
                 fold_history['lr'].append(current_lr)

                 log_line = f"{epoch},{avg_train_loss:.6f},{avg_val_loss:.6f},{avg_val_dice:.6f},{current_lr:.8f}\n"
                 log_file.write(log_line)
                 log_file.flush()

                 print(f"Fold {fold+1}/{N_FOLDS} - Epoch {epoch}/{EPOCHS} - "
                       f"Time: {elapsed_time:.0f}s - LR: {current_lr:.6f} - "
                       f"Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f} - Val Dice: {avg_val_dice:.4f}")

                 early_stopping(avg_val_dice, model)
                 if early_stopping.early_stop:
                     print(f"第 {fold+1} 折触发 Early Stopping!")
                     break
                 if avg_val_dice > best_fold_val_dice:
                     best_fold_val_dice = avg_val_dice

        print(f"--- 第 {fold+1}/{N_FOLDS} 折训练结束。最佳验证 Dice: {early_stopping.best_score:.4f} ---")
        # 将当前折保存的最佳模型路径添加到列表中
        if os.path.exists(fold_model_path):
            all_fold_best_model_paths.append(fold_model_path)
        else:
            print(f"警告: 第 {fold+1} 折的最佳模型文件 {fold_model_path} 未找到！")

        # --- 可选：绘制当前折的训练曲线 ---
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(fold_history['train_loss'], label='Train Loss')
        plt.plot(fold_history['val_loss'], label='Val Loss')
        plt.title(f'Fold {fold+1} Loss')
        plt.xlabel('Epoch'); plt.ylabel('Loss'); plt.legend()
        plt.subplot(1, 2, 2)
        plt.plot(fold_history['val_dice'], label='Val Dice')
        plt.title(f'Fold {fold+1} Validation Dice')
        plt.xlabel('Epoch'); plt.ylabel('Dice Score'); plt.legend()
        plt.tight_layout()
        plt.savefig(os.path.join(OUTPUT_DIR, f'training_history_fold_{fold}.png'))
        plt.show()

    print(f"\n===== {N_FOLDS}-折交叉验证训练全部结束 =====")
    print(f"共保存了 {len(all_fold_best_model_paths)} 个模型:")
    for p in all_fold_best_model_paths:
        print(f"  - {p}")

    # 返回所有保存的模型路径列表
    return all_fold_best_model_paths

In [26]:
# === 10. 测试集预测函数 (K-Fold 集成, 文件名清理, 使用内建 csv 模块写入, 无表头) ===
# 假设以下变量已在之前的单元格定义:
# SAMPLE_SUBMISSION_CSV, TEST_IMAGE_DIR, OUTPUT_DIR, SUBMISSION_FILENAME
# MODEL_ARC, ENCODER, DEVICE, BATCH_SIZE, N_FOLDS
# valid_transform, rle_encode, clear_memory, THRESHOLD, BuildingSegmentationDataset

@torch.no_grad() # 预测时不需要计算梯度
def predict_test_set_kfold(model_paths: list): # 接收模型路径列表
    print("\n===== 开始对测试集进行 K-Fold 集成预测 (无表头输出, 使用内建 csv 模块) =====")
    if not model_paths:
        print("错误：没有提供模型路径列表，无法进行预测。")
        return
    clear_memory()

    # --- 1. 加载测试数据路径和顺序 (包含文件名清理) ---
    try:
        print(f"从 {SAMPLE_SUBMISSION_CSV} 加载测试集文件顺序...")
        sample_df = pd.read_csv(SAMPLE_SUBMISSION_CSV, sep='\t', names=['name', 'mask'])
        test_filenames_raw = sample_df['name'].tolist()
        print(f"{len(test_filenames_raw)} 个原始测试文件名加载成功。")

        print("清理文件名中的空白字符...")
        test_filenames = [fname.strip() for fname in test_filenames_raw]
        print(f"清理后的第一个文件名示例: '{test_filenames[0]}'")

        test_img_paths = [os.path.join(TEST_IMAGE_DIR, fname) for fname in test_filenames]

        print("使用清理后的路径进行示例检查...")
        sample_paths = random.sample(test_img_paths, min(5, len(test_img_paths)))
        missing_samples = [p for p in sample_paths if not os.path.exists(p)]
        if missing_samples:
            print(f"警告: 清理后仍然无法找到示例测试图片: {missing_samples}")
        else:
            print("清理后的示例测试图片路径检查通过。")

    except Exception as e:
        print(f"加载或处理 {SAMPLE_SUBMISSION_CSV} 时出错: {e}")
        return

    # --- 2. 创建测试 Dataset 和 DataLoader ---
    print("创建测试集 Dataset 和 DataLoader...")
    test_dataset = BuildingSegmentationDataset(
        img_paths=test_img_paths, mask_info=None, transform=valid_transform, is_test=True
    )
    test_loader = D.DataLoader(
        test_dataset, batch_size=BATCH_SIZE * 2, shuffle=False, num_workers=2, pin_memory=False
    )
    print("测试集 DataLoader 创建完成。")


    # --- 3. 加载所有 K 折的模型 ---
    print(f"加载 {len(model_paths)} 个 K-Fold 模型...")
    models = []
    for model_path in model_paths:
        print(f"  加载: {model_path}")
        model = smp.create_model(
            arch=MODEL_ARC, encoder_name=ENCODER, encoder_weights=None, in_channels=3, classes=1,
        )
        try:
            model.load_state_dict(torch.load(model_path, map_location=DEVICE))
            model.to(DEVICE)
            model.eval()
            models.append(model)
        except Exception as e:
            print(f"  加载模型 {model_path} 失败: {e}")
    if not models:
        print("错误：未能成功加载任何模型，无法进行预测。")
        return
    print(f"成功加载 {len(models)} 个模型。")


    # --- 4. 执行集成预测 ---
    print("开始集成预测...")
    results_list = []
    progress_bar = tqdm(test_loader, desc="集成预测中")

    for images, filenames_batch in progress_bar:
        images = images.to(DEVICE)
        batch_probs_sum = torch.zeros_like(images[:, 0:1, :, :]).float().to(DEVICE)

        for model in models:
            with amp.autocast():
                outputs = model(images)
            probs = torch.sigmoid(outputs)
            batch_probs_sum += probs

        avg_probs = batch_probs_sum / len(models)

        for i in range(avg_probs.shape[0]):
            prob_single = avg_probs[i].squeeze().cpu().numpy()
            raw_filename = filenames_batch[i]
            clean_filename = raw_filename.strip()

            mask_resized = cv2.resize(prob_single, (512, 512), interpolation=cv2.INTER_LINEAR)
            mask_binary = (mask_resized > THRESHOLD).astype(np.uint8)
            rle = rle_encode(mask_binary)

            # 可选的额外清理：显式移除 RLE 中的换行符 (如果需要)
            # rle_sanitized = rle.replace('\n', ' ').replace('\r', '')
            # results_list.append([clean_filename, rle_sanitized])
            results_list.append([clean_filename, rle]) # 暂时先不用额外清理

    print(f"集成预测完成。共生成 {len(results_list)} 条结果。")


    # --- 5. 创建并保存提交文件 (使用内建 csv 模块, 无表头) ---
    print("创建提交文件 (无表头, 使用内建 csv 模块)...")
    if len(results_list) != len(test_filenames):
         print(f"警告: 生成的结果数量 ({len(results_list)}) 与预期的测试文件数量 ({len(test_filenames)}) 不符！")

    submission_map = {name: rle for name, rle in results_list}
    ordered_results = []
    for fname in test_filenames:
         rle_str = submission_map.get(fname, '')
         ordered_results.append([fname, rle_str]) # 创建列表的列表

    # 保存
    submission_path = os.path.join(OUTPUT_DIR, SUBMISSION_FILENAME)
    try:
        print(f"正在使用 Python 内建 csv 模块保存提交文件到: {submission_path} (无表头)...")
        # 使用 'w' 模式打开文件，newline='' 防止 csv 模块写入多余的空行
        with open(submission_path, 'w', newline='', encoding='utf-8') as f:
            # 创建 csv writer 对象，指定逗号分隔，并对非数字加引号
            writer = csv.writer(f, delimiter=',', quoting=csv.QUOTE_NONNUMERIC)
            # 一次性写入所有行数据 (ordered_results 是一个列表的列表)
            writer.writerows(ordered_results)

        print(f"提交文件已成功保存到: {submission_path}")
        print("提交文件前 5 行预览 (注意可能有引号):")
        try:
            with open(submission_path, 'r', encoding='utf-8') as f:
                for i, line in enumerate(f):
                    if i < 5:
                        print(line.strip())
                    else:
                        break
        except Exception as read_e:
            print(f"无法预览文件内容: {read_e}")

    except Exception as e:
        print(f"使用内建 csv 模块保存提交文件时出错: {e}")

    clear_memory()

# --- 提示 ---
# 1. 确保在这个单元格运行前，所有依赖项已定义。
# 2. 确保 `import csv` 已执行。
# 3. 运行调用此函数的代码来执行预测。

In [21]:
# === 11. 主执行入口 ===

if __name__ == "__main__":
    # --- 执行 K-Fold 训练 ---
    # run_training_kfold() 返回一个包含所有折数最佳模型路径的列表
    list_of_best_models = run_training_kfold()

    # --- 执行 K-Fold 集成预测 ---
    if list_of_best_models and len(list_of_best_models) > 0:
        print(f"\n使用 {len(list_of_best_models)} 个 K-Fold 模型进行集成预测...")
        predict_test_set_kfold(list_of_best_models)
    else:
        # 如果训练失败或未找到模型，尝试从输出目录加载已有的 fold 模型
        print("\n训练未生成模型或失败，尝试加载输出目录中已存在的 K-Fold 模型...")
        existing_fold_models = [os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR) if f.startswith("best_model_fold_") and f.endswith(".pth")]
        if existing_fold_models:
             print(f"找到 {len(existing_fold_models)} 个已存在的 K-Fold 模型，将使用它们进行预测。")
             predict_test_set_kfold(existing_fold_models)
        else:
             print("\n未找到任何 K-Fold 模型文件，无法进行预测。")

    print("\n===== K-Fold 工作流执行完毕 =====")


===== 开始 5-折交叉验证训练 =====
加载训练数据信息...
找到 30000 条训练数据记录。
示例图片路径检查通过。

--- 开始训练第 1/5 折 ---
第 1 折数据划分: 24000 训练样本, 6000 验证样本
第 1 折: 创建 Dataset 和 DataLoader...
第 1 折: DataLoader 创建完成。
第 1 折: 初始化模型及相关组件...


KeyboardInterrupt: 

In [27]:
# === 手动触发 K-Fold 集成预测 ===

print("开始手动触发 K-Fold 集成预测流程...")

# 1. 定义输出目录和模型文件前缀 (确保与训练时一致)
output_dir = OUTPUT_DIR # 使用之前定义的全局变量
model_prefix = "best_model_fold_"
model_suffix = ".pth"
num_folds = N_FOLDS # 使用之前定义的全局变量

# 2. 查找所有已保存的最佳模型文件路径
fold_model_paths = []
print(f"在目录 '{output_dir}' 中查找模型文件...")
try:
    for i in range(num_folds):
        model_filename = f"{model_prefix}{i}{model_suffix}"
        model_path = os.path.join(output_dir, model_filename)
        if os.path.exists(model_path):
            fold_model_paths.append(model_path)
            print(f"  找到: {model_filename}")
        else:
            print(f"  警告: 未找到模型文件 {model_filename}")
except Exception as e:
    print(f"查找模型文件时出错: {e}")

# 3. 检查是否找到了模型文件
if not fold_model_paths:
    print("\n错误：在输出目录中未能找到任何 K-Fold 模型文件。无法进行预测。")
    print(f"请确认 '{output_dir}' 目录中存在形如 '{model_prefix}*{model_suffix}' 的文件。")
else:
    print(f"\n共找到 {len(fold_model_paths)} 个模型文件，将使用它们进行集成预测。")
    # 4. 调用预测函数
    # 确保 predict_test_set_kfold 函数以及所有依赖项 (如 Dataset 类, 配置参数等)
    # 在当前的 Jupyter Session 中已经被定义 (即之前的单元格已运行)
    try:
        predict_test_set_kfold(fold_model_paths)
        print("\n===== 手动预测流程执行完毕 =====")
    except NameError as ne:
        print(f"\n错误：调用预测函数失败，可能之前的单元格未运行。错误信息: {ne}")
        print("请确保定义 predict_test_set_kfold 及相关配置/类的单元格已成功运行。")
    except Exception as ex:
        print(f"\n执行预测时发生意外错误: {ex}")


开始手动触发 K-Fold 集成预测流程...
在目录 'output/' 中查找模型文件...
  找到: best_model_fold_0.pth
  找到: best_model_fold_1.pth
  找到: best_model_fold_2.pth
  找到: best_model_fold_3.pth
  找到: best_model_fold_4.pth

共找到 5 个模型文件，将使用它们进行集成预测。

===== 开始对测试集进行 K-Fold 集成预测 (无表头输出, 使用内建 csv 模块) =====
从 数据集/test_sample_submit.csv 加载测试集文件顺序...
2500 个原始测试文件名加载成功。
清理文件名中的空白字符...
清理后的第一个文件名示例: 'R05K5826G4.jpg'
使用清理后的路径进行示例检查...
清理后的示例测试图片路径检查通过。
创建测试集 Dataset 和 DataLoader...
测试集 DataLoader 创建完成。
加载 5 个 K-Fold 模型...
  加载: output/best_model_fold_0.pth
  加载: output/best_model_fold_1.pth
  加载: output/best_model_fold_2.pth
  加载: output/best_model_fold_3.pth
  加载: output/best_model_fold_4.pth
成功加载 5 个模型。
开始集成预测...


集成预测中:   0%|          | 0/79 [00:00<?, ?it/s]

集成预测完成。共生成 2500 条结果。
创建提交文件 (无表头, 使用内建 csv 模块)...
正在使用 Python 内建 csv 模块保存提交文件到: output/submission_kfold.csv (无表头)...
提交文件已成功保存到: output/submission_kfold.csv
提交文件前 5 行预览 (注意可能有引号):
"R05K5826G4.jpg","21 20 532 24 1044 27 1555 29 2067 30 2578 32 3089 35 3600 37 4111 39 4622 42 5133 44 5645 45 6156 47 6667 49 7179 49 7690 51 8201 53 8713 54 9224 56 9735 59 10247 60 10758 65 11269 82 11781 87 12292 91 12803 94 13315 98 13826 106 13938 3 14337 122 14849 126 15361 135 15874 142 16386 148 16897 157 17409 165 17921 170 18433 177 18945 182 19457 185 19969 191 20481 200 20993 202 21505 204 22017 219 22529 223 23041 230 23553 237 24065 245 24577 249 25089 250 25601 251 26113 261 26625 276 27137 284 27649 59 27712 226 28161 58 28226 230 28673 54 28750 221 29185 52 29266 221 29697 51 29802 201 30209 50 30315 203 30721 48 30827 206 31233 47 31339 208 31745 45 31852 210 32257 44 32364 216 32769 43 32877 218 33283 3 33301 3 33310 13 33391 219 33824 10 33906 219 34338 6 34426 4 34433 207 34947 208 35461

**如何运行:**

1.  **确认所有路径** (在第 2 个代码单元格中) 正确指向您的数据目录和文件。
2.  **检查超参数** (轮数、批大小、图片尺寸、学习率等) 并根据您的硬件和需求进行调整。
3.  **选择模型架构和编码器** (`MODEL_ARC`, `ENCODER`)。`Unet` 配合 `efficientnet-b3` 或 `resnet34` 通常是不错的起点。
4.  **运行所有单元格** (例如，通过菜单栏 "Run" -> "Run All Cells")。
5.  训练过程将开始，最佳模型 (基于验证集 Dice 分数) 会保存在 `OUTPUT_DIR` 目录下 (默认为 `best_model.pth`)。同时会生成训练日志 (`training_log.csv`) 和训练曲线图 (`training_history.png`)。
6.  训练结束后，程序会自动加载最佳模型，并对测试集进行预测。
7.  最终的提交文件 (`submission.csv`) 会保存在 `OUTPUT_DIR` 目录下。

**后续可能的改进方向:**

*   **超参数调优:** 尝试不同的学习率、优化器参数、批大小、图片尺寸、损失函数权重等。
*   **模型选择:** 测试 `segmentation-models-pytorch` 库中提供的其他模型架构和编码器。
*   **交叉验证 (Cross-Validation):** 实现 K 折交叉验证，可以更可靠地评估模型性能，并通过集成 K 个模型的预测结果来提升最终性能。
*   **测试时数据增强 (Test-Time Augmentation, TTA):** 在预测阶段对测试图片应用数据增强 (如翻转)，然后将多次预测的结果进行平均，通常能提高精度。
*   **更复杂的数据增强:** 谨慎地尝试更多类型的数据增强方法。
*   **后处理:** 对预测出的二值掩码进行处理，例如移除面积过小的连通区域，或者填充内部的小孔洞。

**如何运行 (K-Fold 版本):**

1.  **安装 Scikit-learn:** 如果您还没有安装，请运行 `pip install scikit-learn`。
2.  **确认所有路径** (单元格 3) 正确。
3.  **检查超参数** (单元格 3)，特别是 `N_FOLDS` (折数) 和 `EPOCHS` (每折轮数)。总训练时间约等于 `N_FOLDS * EPOCHS * 每轮时间`。
4.  **选择模型架构和编码器** (单元格 3)。
5.  **运行所有单元格**。
6.  程序将依次训练 `N_FOLDS` 个模型，每个模型对应交叉验证的一折。每个模型训练时会进行 Early Stopping，并将最佳权重保存在 `output/best_model_fold_{折数}.pth`。
7.  所有折训练完毕后，程序会自动加载这 `N_FOLDS` 个模型。
8.  对测试集的每张图片，程序会用所有加载的模型进行预测，然后将预测的概率图进行平均。
9.  基于平均后的概率图生成最终的二值掩码和 RLE 编码。
10. 生成的集成预测结果将保存在 `output/submission_kfold.csv` 文件中。

**K-Fold 的优势:**

*   **更可靠的性能评估:** 验证集分数是基于所有数据的轮换验证得到的，更能反映模型在未知数据上的泛化能力。
*   **更充分的数据利用:** 所有数据都被用于训练和验证。
*   **模型集成:** 通过平均 K 个模型的预测，通常可以获得比单个模型更稳定、更精确的结果。