In [1]:
import os
import cv2
import json
import numpy as np
from PIL import Image, ImageDraw
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch
import torch.nn as nn
import matplotlib.pyplot as plt

In [2]:
def json_to_mask(json_path, size):
    """
    将 LabelMe JSON 文件转换为掩码图像。
    
    :param json_path: JSON 文件路径
    :param size: 图像尺寸 (宽, 高)
    :return: 掩码的 NumPy 数组
    """
    with open(json_path, 'r') as f:
        data = json.load(f)
    
    # 创建空掩码，初始为 0（背景）
    mask = Image.new('L', size, 0)
    draw = ImageDraw.Draw(mask)
    
    # 遍历 shapes，绘制多边形
    for shape in data['shapes']:
        label = shape['label']
        if label == "road":  # 只处理道路标签
            points = shape['points']
            draw.polygon(points, outline=1, fill=1)  # 填充道路区域为 1
    
    # 转换为 NumPy 数组
    mask = np.array(mask, dtype=np.uint8)
    return mask


In [3]:
def parse_json_to_mask(json_path, image_size):
    """
    将 LabelMe 标注的 JSON 文件解析为二值化掩码。
    :param json_path: JSON 文件路径
    :param image_size: 图像尺寸 (width, height)
    :return: 二值化掩码 (numpy 数组)
    """
    import json

    with open(json_path, 'r') as f:
        data = json.load(f)

    mask = np.zeros((image_size[1], image_size[0]), dtype=np.uint8)  # 初始化掩码
    draw = ImageDraw.Draw(Image.fromarray(mask))

    for shape in data['shapes']:
        if shape['label'] == 'road':  # 只处理标签为 'road' 的区域
            points = shape['points']
            polygon = [(int(x), int(y)) for x, y in points]
            draw.polygon(polygon, outline=1, fill=1)  # 绘制多边形

    return np.array(mask, dtype=np.uint8)  # 返回二值掩码


In [4]:
def preprocess_image_and_mask(image_path, json_path, size=(256, 256)):
    # 图像预处理
    transform_image = transforms.Compose([
        transforms.Resize(size),
        transforms.ToTensor()
    ])
    image = Image.open(image_path).convert("RGB")
    image = transform_image(image)

    # 解析 JSON 并生成掩码
    original_image = Image.open(image_path)
    mask = parse_json_to_mask(json_path, original_image.size)

    # 掩码预处理
    transform_mask = transforms.Compose([
        transforms.Resize(size),
        transforms.ToTensor()
    ])
    mask = transform_mask(Image.fromarray(mask))

    return image, mask

In [5]:
class RoadSegmentationDataset(Dataset):
    def __init__(self, image_paths, json_paths, size=(256, 256)):
        """
        自定义数据集，用于加载图像和 JSON 掩码。

        :param image_paths: 图像路径列表
        :param json_paths: JSON 掩码路径列表
        :param size: 图像和掩码的目标尺寸
        """
        assert len(image_paths) == len(json_paths), "图像和 JSON 的数量必须一致！"
        self.image_paths = image_paths
        self.json_paths = json_paths
        self.size = size

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        json_path = self.json_paths[idx]
        image, mask = preprocess_image_and_mask(image_path, json_path, self.size)
        return {'image': image, 'mask': mask}


In [6]:
class RoadSegmentationModel(nn.Module):
    def __init__(self, num_classes=2):
        super(RoadSegmentationModel, self).__init__()
        resnet = models.resnet18(pretrained=True)
        self.encoder = nn.Sequential(*list(resnet.children())[:-2])  # 提取高层特征
        
        self.decoder = nn.Sequential(
            nn.Conv2d(512, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            nn.Conv2d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False),
            nn.Conv2d(32, num_classes, kernel_size=1)  # 输出类别
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


In [7]:
def train_model(model, dataloader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for batch in dataloader:
            images, masks = batch['image'], batch['mask']
            if torch.cuda.is_available():
                images, masks = images.cuda(), masks.cuda()
            
            # 前向传播
            outputs = model(images)
            loss = criterion(outputs, masks.squeeze(1))  # 去掉多余的通道
            
            # 反向传播
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss / len(dataloader)}")


In [8]:
def test_and_visualize(model, image_path, json_path, size=(256, 256)):
    model.eval()
    image, _ = preprocess_image_and_mask(image_path, json_path, size)
    if torch.cuda.is_available():
        image = image.cuda()
    with torch.no_grad():
        output = model(image.unsqueeze(0))  # 添加 batch 维度
        prediction = torch.argmax(output.squeeze(), dim=0).cpu().numpy()
    
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.title("Original Image")
    plt.imshow(image.permute(1, 2, 0).cpu().numpy())
    plt.subplot(1, 2, 2)
    plt.title("Predicted Mask")
    plt.imshow(prediction, cmap="gray")
    plt.show()


In [13]:
# 配置路径
target = 1
data_name = ['0618', '0854', '1066'][target - 1]
image_path = f'../input_data/{data_name}.png'
json_path = f'../input_data/{data_name}.json'
image, mask = preprocess_image_and_mask(image_path, json_path, size=(256, 256))
print(image.shape, mask.shape)  # 应该返回 torch.Tensor 的尺寸

torch.Size([3, 256, 256]) torch.Size([1, 256, 256])


In [10]:
# 加载数据集
dataset = RoadSegmentationDataset([image_path], [json_path])
dataloader = DataLoader(dataset, batch_size=4, shuffle=False)

# 初始化模型
model = RoadSegmentationModel(num_classes=2)
if torch.cuda.is_available():
    model = model.cuda()

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()  # 二分类交叉熵损失
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练模型
train_model(model, dataloader, criterion, optimizer, num_epochs=10)

# 测试和可视化
test_and_visualize(model, image_path, json_path)



RuntimeError: input and target batch or spatial sizes don't match: target [1, 256, 256], input [1, 2, 128, 128]