In [1]:
import torch
print(torch.__version__)
print(torch.version.cuda)
print(torch.backends.cudnn.version())


1.8.2+cu111
11.1
8005


In [1]:
import os
import torch
import torch.nn as nn
import torchvision
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import models
import torchvision.transforms as transforms
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F  # 添加此行以使用 F.interpolate
from torch.cuda.amp import autocast, GradScaler
from PIL.Image import Resampling  # 导入Resampling

# 设置打印选项，显示完整 Tensor 内容
torch.set_printoptions(threshold=100000)


class UpperBodyDataset(Dataset):
    def __init__(self, img_dir, mask_dir, img_transform=None, mask_transform=None, color_to_class=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.img_transform = img_transform
        self.mask_transform = mask_transform
        self.color_to_class = color_to_class
        self.images = [f for f in os.listdir(img_dir) if f.endswith(('.jpg', '.png', '.jpeg'))]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = self.images[idx]
        img_path = os.path.join(self.img_dir, img_name)
        mask_name = os.path.splitext(img_name)[0] + '.png'  # 确保mask文件名对应
        mask_path = os.path.join(self.mask_dir, mask_name)

        image = Image.open(img_path).convert('RGB')
        mask = Image.open(mask_path).convert('RGB')  # 保持三通道

        if self.img_transform:
            image = self.img_transform(image)
        if self.mask_transform:
            mask = self.mask_transform(mask)
            # print('transform')
            
        # print(mask)    

        # 将掩码从RGB转换为类别索引
        mask = self.rgb_to_class(mask)

        return image, mask  # 返回形状为 [C, H, W] 和 [H, W]

    def rgb_to_class(self, mask):
        mask = mask.permute(1, 2, 0).numpy()  # 转换为 H x W x C
        h, w, c = mask.shape
        class_mask = np.zeros((h, w), dtype=np.int64)  # 将 np.long 替换为 np.int64

        for color, class_idx in self.color_to_class.items():
        # 创建布尔掩码
            matches = np.all(mask == color, axis=-1)
            class_mask[matches] = class_idx

    # 检查是否有未映射的像素
        unique_unmapped = np.unique(class_mask)
        if 0 in unique_unmapped and 0 not in self.color_to_class.values():
            print("警告: 存在未映射的颜色，类别索引为0的像素可能被错误地标记为背景。")
            

        class_mask = torch.from_numpy(class_mask).long()  # 转换为 torch.Tensor
        
        # print(class_mask)
        return class_mask

class ToInteger(object):
    """将图像转换为整数（uint8）。"""
    def __call__(self, img):
        return np.array(img).astype(np.uint8)

class ToTensorWithoutNormalization(object):
    def __call__(self, image):
        # 将 PIL.Image 转换为 tensor，且不进行归一化
        return torch.from_numpy(np.array(image)).permute(2, 0, 1).float()



def get_transforms(img_size_h, img_size_w):
    img_transform = transforms.Compose([
        transforms.Resize((img_size_h, img_size_w), interpolation=Resampling.BILINEAR),
        ToTensorWithoutNormalization(),
        ToInteger(),
        # transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
    ])
    
    mask_transform = transforms.Compose([
        transforms.Resize((img_size_h, img_size_w), interpolation=Resampling.NEAREST),
        ToTensorWithoutNormalization(),
    ])
    
    return img_transform, mask_transform


def calculate_metrics(outputs, targets):
    """计算IoU和像素准确率"""
    predictions = torch.argmax(outputs, dim=1)
    predictions = predictions.view(-1)
    targets = targets.view(-1)

    intersection = (predictions * targets).sum()
    union = predictions.sum() + targets.sum() - intersection
    iou = (intersection + 1e-6) / (union + 1e-6)

    accuracy = (predictions == targets).float().mean()
    return iou.item(), accuracy.item()


def visualize_prediction(image, mask, prediction, epoch, idx, save_dir, class_colors=None):
    """可视化预测结果"""
    image = image.cpu().numpy().transpose(1, 2, 0)
    image = (image * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406])  # 反标准化
    image = np.clip(image, 0, 1)

    mask = mask.cpu().numpy()
    prediction = torch.argmax(prediction, dim=0).cpu().numpy()

    plt.figure(figsize=(15, 5))
    plt.subplot(131)
    plt.imshow(image)
    plt.title('Original Image')
    plt.axis('off')

    plt.subplot(132)
    if class_colors:
        mask_color = decode_segmap(mask, class_colors)
        plt.imshow(mask_color)
    else:
        plt.imshow(mask, cmap='gray')
    plt.title('Ground Truth')
    plt.axis('off')

    plt.subplot(133)
    if class_colors:
        prediction_color = decode_segmap(prediction, class_colors)
        plt.imshow(prediction_color)
    else:
        plt.imshow(prediction, cmap='gray')
    plt.title('Prediction')
    plt.axis('off')

    plt.savefig(os.path.join(save_dir, f'epoch_{epoch}_sample_{idx}.png'))
    plt.close()

def decode_segmap(image, class_colors):
    """
    将类别索引图转换为RGB图像。
    """
    r = np.zeros_like(image).astype(np.uint8)
    g = np.zeros_like(image).astype(np.uint8)
    b = np.zeros_like(image).astype(np.uint8)

    for class_idx, color in class_colors.items():
        print(class_idx)
        print(color)
        r[image == class_idx] = color[0]
        g[image == class_idx] = color[1]
        b[image == class_idx] = color[2]

    rgb = np.stack([r, g, b], axis=2)
    return rgb


class LightweightASPP(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()

        # Reduce input channels for memory efficiency
        self.conv1 = nn.Conv2d(in_channels, out_channels, 1)

        # Lightweight atrous convolutions
        self.aspp1 = nn.Conv2d(out_channels, out_channels, 1)
        self.aspp2 = nn.Conv2d(out_channels, out_channels, 3, padding=6, dilation=6, groups=out_channels)
        self.aspp3 = nn.Conv2d(out_channels, out_channels, 3, padding=12, dilation=12, groups=out_channels)

        # Global context
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.global_conv = nn.Conv2d(in_channels, out_channels, 1)

        # Final 1x1 conv
        self.final_conv = nn.Conv2d(out_channels * 4, out_channels, 1)

    def forward(self, x):
        size = x.size()[2:]

        conv1 = self.conv1(x)

        aspp1 = self.aspp1(conv1)
        aspp2 = self.aspp2(conv1)
        aspp3 = self.aspp3(conv1)

        # Global features
        global_features = self.global_avg_pool(x)
        global_features = self.global_conv(global_features)
        global_features = F.interpolate(global_features, size=size, mode='bilinear', align_corners=True)

        # Concatenate all features
        out = torch.cat([aspp1, aspp2, aspp3, global_features], dim=1)
        out = self.final_conv(out)

        return out


class LightweightDeepLabv3(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        # Use a lightweight custom backbone
        self.backbone = nn.Sequential(
            # Initial conv layer
            nn.Conv2d(3, 32, 3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),

            # Depthwise separable convolutions
            self._make_separable_block(32, 64, stride=2),
            self._make_separable_block(64, 128, stride=2),
            self._make_separable_block(128, 256, stride=1),
        )

        self.aspp = LightweightASPP(256, 256)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Conv2d(256, 256, 3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, num_classes, 1)
        )

    def _make_separable_block(self, in_channels, out_channels, stride):
        return nn.Sequential(
            # Depthwise
            nn.Conv2d(in_channels, in_channels, 3, stride=stride, padding=1, groups=in_channels),
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            # Pointwise
            nn.Conv2d(in_channels, out_channels, 1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU()
        )

    def forward(self, x):
        input_size = x.size()[2:]

        # Extract features
        features = self.backbone(x)

        # Apply ASPP
        aspp_features = self.aspp(features)

        # Decode and upsample
        out = self.decoder(aspp_features)
        out = F.interpolate(out, size=input_size, mode='bilinear', align_corners=True)

        return out



def train_model(config):
    # 创建保存目录
    os.makedirs(config['save_dir'], exist_ok=True)
    os.makedirs(os.path.join(config['save_dir'], 'visualizations'), exist_ok=True)

    # 设置TensorBoard
    writer = SummaryWriter(os.path.join(config['save_dir'], 'logs'))

    # 获取图像和掩码的变换
    img_transform, mask_transform = get_transforms(config['img_size_h'], config['img_size_w'])

    # 创建数据集和数据加载器
    train_dataset = UpperBodyDataset(
        config['train_img_dir'],
        config['train_mask_dir'],
        img_transform=img_transform,
        mask_transform=mask_transform,
        color_to_class=config['color_to_class']  # 传递颜色映射
    )
    val_dataset = UpperBodyDataset(
        config['val_img_dir'],
        config['val_mask_dir'],
        img_transform=img_transform,
        mask_transform=mask_transform,
        color_to_class=config['color_to_class']  # 传递颜色映射
    )

    train_loader = DataLoader(
        train_dataset,
        batch_size=config['batch_size'],
        shuffle=True,
        pin_memory=True if config['device'] == 'cuda' else False,  # 如果使用GPU，启用pin_memory
        num_workers=2  # 根据系统配置调整
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=config['batch_size'],
        shuffle=False,
        pin_memory=True if config['device'] == 'cuda' else False,
        num_workers=2
    )

    # 初始化模型
    model = LightweightDeepLabv3(num_classes=config['num_classes'])  # 修正参数名称
    print(model)
    model = model.to(config['device'])

    # 损失函数和优化器
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                           factor=0.1, patience=5)

    # 初始化混合精度Scaler
    # scaler = GradScaler()

    best_val_loss = float('inf')
    for epoch in range(config['num_epochs']):
        # 训练阶段
        model.train()
        train_loss = 0
        train_iou = 0
        train_acc = 0

        train_bar = tqdm(train_loader, desc=f'Training Epoch {epoch + 1}/{config["num_epochs"]}')
        for batch_idx, (images, masks) in enumerate(train_bar):
            images = images.to(config['device'], non_blocking=True)
            masks = masks.to(config['device'], non_blocking=True)
           

            optimizer.zero_grad()

            #             with autocast():  # 启用自动混合精度
            outputs = model(images)  # 修正输出获取方式
            loss = criterion(outputs, masks)  # masks 已经是 [batch_size, H, W] 且 dtype 为 long

            #             scaler.scale(loss).backward()
            #             scaler.step(optimizer)
            #             scaler.update()

            loss.backward()
            optimizer.step()

            # 计算指标
            batch_iou, batch_acc = calculate_metrics(outputs, masks)
            train_loss += loss.item()
            train_iou += batch_iou
            train_acc += batch_acc

            # 更新进度条
            train_bar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'iou': f'{batch_iou:.4f}',
                'acc': f'{batch_acc:.4f}'
            })

            # 可视化第一个batch的第一张图片
            if batch_idx == 0:
                visualize_prediction(
                    images[0], masks[0],
                    outputs[0],
                    epoch + 1, batch_idx,
                    os.path.join(config['save_dir'], 'visualizations')
                )

        # 计算平均训练指标
        train_loss /= len(train_loader)
        train_iou /= len(train_loader)
        train_acc /= len(train_loader)

        # 验证阶段
        model.eval()
        val_loss = 0
        val_iou = 0
        val_acc = 0

        with torch.no_grad():
            val_bar = tqdm(val_loader, desc='Validation')
            for images, masks in val_bar:
                images = images.to(config['device'], non_blocking=True)
                masks = masks.to(config['device'], non_blocking=True)

                # with autocast():  # 也可以在验证阶段使用 autocast
                outputs = model(images)  # 修正输出获取方式
                loss = criterion(outputs, masks)

                batch_iou, batch_acc = calculate_metrics(outputs, masks)
                val_loss += loss.item()
                val_iou += batch_iou
                val_acc += batch_acc

                val_bar.set_postfix({
                    'loss': f'{loss.item():.4f}',
                    'iou': f'{batch_iou:.4f}',
                    'acc': f'{batch_acc:.4f}'
                })

        # 计算平均验证指标
        val_loss /= len(val_loader)
        val_iou /= len(val_loader)
        val_acc /= len(val_loader)

        # 记录到TensorBoard
        writer.add_scalar('Loss/train', train_loss, epoch)
        writer.add_scalar('Loss/val', val_loss, epoch)
        writer.add_scalar('IoU/train', train_iou, epoch)
        writer.add_scalar('IoU/val', val_iou, epoch)
        writer.add_scalar('Accuracy/train', train_acc, epoch)
        writer.add_scalar('Accuracy/val', val_acc, epoch)

        # 打印训练信息
        print(f'\nEpoch {epoch + 1}/{config["num_epochs"]}:')
        print(f'Train Loss: {train_loss:.4f}, Train IoU: {train_iou:.4f}, Train Acc: {train_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f}, Val IoU: {val_iou:.4f}, Val Acc: {val_acc:.4f}\n')
        
        visualize_prediction(
            images[0], masks[0],
            outputs[0],
            epoch + 1, batch_idx,
            os.path.join(config['save_dir'], 'visualizations'),
            class_colors=config['class_colors']
        )

        # 保存最佳模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_loss': val_loss,
                'val_iou': val_iou,
                'val_acc': val_acc
            }, os.path.join(config['save_dir'], 'model_deeplabv3_label_test.pth'))

        # 学习率调整
        scheduler.step(val_loss)

        # 清理显存缓存
        torch.cuda.empty_cache()

        # 监控显存使用情况
        if config['device'] == 'cuda':
            allocated = torch.cuda.memory_allocated(config['device']) / 1024 ** 2
            reserved = torch.cuda.memory_reserved(config['device']) / 1024 ** 2
            print(f"Epoch {epoch + 1} - Memory Allocated: {allocated:.2f} MB")
            print(f"Epoch {epoch + 1} - Memory Reserved: {reserved:.2f} MB")

In [2]:
if __name__ == '__main__':
        # 示例配置
        # 定义颜色到类别索引的映射
    COLOR_TO_CLASS = {
        (0,0,0):0, #-> Background
        (1,1,1):1, #-> Hair
        (4,4,4):2, #-> Upclothes
        (5,5,5):3, #-> Left-shoe 
        (6,6,6):4, #-> Right-shoe
        (7,7,7):5, #-> Noise
        (8,8,8):6, #-> Pants
        (9,9,9):7, #-> Left_leg
        (10,10,10):8, #-> Right_leg
        (11,11,11):9,# -> Left_arm
        (12,12,12):10,# -> Face
        (13,13,13):11# -> Right_arm
    # 添加更多类别颜色映射
    # (R, G, B): 类别索引,
    }
    # 反转映射，得到类别索引到颜色的映射
    CLASS_TO_COLOR = {
        0:(0,0,0), #-> Background
        1:(1,1,1), #-> Hair
        2:(4,4,4), #-> Upclothes
        3:(5,5,5),#-> Left-shoe 
        4:(6,6,6),#-> Right-shoe
        5:(7,7,7),#-> Noise
        6:(8,8,8),#-> Pants
        7:(9,9,9),#-> Left_leg
        8:(10,10,10), #-> Right_leg
        9:(11,11,11),# -> Left_arm
        10:(12,12,12),# -> Face
        11:(13,13,13)# -> Right_arm
    }
    config = {
        'train_img_dir': 'train_img',          # 替换为训练图片文件夹路径
        'train_mask_dir': 'train_label',       # 替换为训练mask文件夹路径
        'val_img_dir': 'test_img',              # 替换为验证图片文件夹路径
        'val_mask_dir': 'test_label',           # 替换为验证mask文件夹路径
        'img_size_h': 256,                      # 设置图像高度，调整为128
        'img_size_w': 192,                      # 设置图像宽度，调整为128
        'batch_size': 2,                        # 设置批量大小为2，尽量减少
        'learning_rate': 1e-4,                  # 设置学习率
        'num_epochs': 50,                       # 设置训练轮数
        'save_dir': 'save_model',               # 替换为模型和日志保存路径
       
        'color_to_class': COLOR_TO_CLASS,      # 添加颜色到类别映射
        'class_colors': CLASS_TO_COLOR,        # 添加用于可视化的颜色映射
        'device': 'cpu',  # 设置设备
        'num_classes': 12                        # 设置类别数（例如：背景+目标）
    }

    # 启动训练
    train_model(config)

2024-12-26 13:09:10.184427: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-26 13:09:10.190849: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-12-26 13:09:10.556863: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-12-26 13:09:10.659523: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1735189750.862718     524 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1735189750.89

LightweightDeepLabv3(
  (backbone): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Sequential(
      (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=32)
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Conv2d(32, 64, kernel_size=(1, 1), stride=(1, 1))
      (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU()
    )
    (4): Sequential(
      (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=64)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1))
      (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (

Training Epoch 1/50:   0%|          | 0/681 [00:00<?, ?it/s]


RuntimeError: expected scalar type Byte but found Float