In [2]:
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader, Subset
import torchvision.transforms as transforms
from collections import Counter
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
import numpy as np
import matplotlib.pyplot as plt
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [3]:

class UECFoodPixDataset(Dataset):
    def __init__(self, img_dir, mask_dir, file_list, transform=None, mask_transform=None):
        self.img_dir = img_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.mask_transform = mask_transform

        with open(file_list, 'r') as f:
            self.image_ids = [line.strip() for line in f]

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, f"{self.image_ids[idx]}.jpg")
        mask_name = os.path.join(self.mask_dir, f"{self.image_ids[idx]}.png")

        image = Image.open(img_name).convert("RGB")
        mask = Image.open(mask_name).convert("RGB")
        mask = mask.split()[0]

        if self.transform:
            image = self.transform(image)

        if self.mask_transform:
            mask = self.mask_transform(mask)

        return image, mask, self.image_ids[idx]

# Data enhancement and preprocessing
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor()
])


mask_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.Lambda(lambda x: torch.from_numpy(np.array(x)).long())
])

# Train and test data dir
train_img_dir = "UECFOODPIXCOMPLETE/data/UECFoodPIXCOMPLETE/train/img"
train_mask_dir = "UECFOODPIXCOMPLETE/data/UECFoodPIXCOMPLETE/train/mask"
test_img_dir = "UECFOODPIXCOMPLETE/data/UECFoodPIXCOMPLETE/test/img"
test_mask_dir = "UECFOODPIXCOMPLETE/data/UECFoodPIXCOMPLETE/test/mask"
category_file = "UECFOODPIXCOMPLETE/data/category.txt"
train_list_file = "UECFOODPIXCOMPLETE/data/train9000.txt"
test_list_file = "UECFOODPIXCOMPLETE/data/test1000.txt"

train_dataset = UECFoodPixDataset(
    img_dir=train_img_dir,
    mask_dir=train_mask_dir,
    file_list=train_list_file,
    transform=train_transform,
    mask_transform=mask_transform
)
num_classes = 103
test_dataset = UECFoodPixDataset(
    img_dir=test_img_dir,
    mask_dir=test_mask_dir,
    file_list=test_list_file,
    transform=train_transform,
    mask_transform=mask_transform
)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

import config
from model_utils import *
class SegNet(nn.Module):
    def __init__(self):
        super(SegNet, self).__init__()
        self.vgg16_bn = models.vgg16_bn(pretrained=True).features
        self.relu = nn.ReLU(inplace=True)
        self.index_MaxPool = nn.MaxPool2d(kernel_size=2, stride=2, return_indices=True)
        self.index_UnPool = nn.MaxUnpool2d(kernel_size=2, stride=2)
        # net struct
        self.conv1_block = nn.Sequential(
            self.vgg16_bn[0],  # conv2d(3,64,(3,3))
            self.vgg16_bn[1],  # bn(64,eps=1e-05,momentum=0.1,affine=True)
            self.vgg16_bn[2],  # relu(in_place)
            self.vgg16_bn[3],  # conv2d(3,64,(3,3))
            self.vgg16_bn[4],  # bn(64,eps=1e-05,momentum=0.1,affine=True)
            self.vgg16_bn[5],  # relu(in_place)
        )
        self.conv2_block = nn.Sequential(
            self.vgg16_bn[7],
            self.vgg16_bn[8],
            self.vgg16_bn[9],
            self.vgg16_bn[10],
            self.vgg16_bn[11],
            self.vgg16_bn[12],
        )
        self.conv3_block = nn.Sequential(
            self.vgg16_bn[14],
            self.vgg16_bn[15],
            self.vgg16_bn[16],
            self.vgg16_bn[17],
            self.vgg16_bn[18],
            self.vgg16_bn[19],
            self.vgg16_bn[20],
            self.vgg16_bn[21],
            self.vgg16_bn[22],
        )
        self.conv4_block = nn.Sequential(
            self.vgg16_bn[24],
            self.vgg16_bn[25],
            self.vgg16_bn[26],
            self.vgg16_bn[27],
            self.vgg16_bn[28],
            self.vgg16_bn[29],
            self.vgg16_bn[30],
            self.vgg16_bn[31],
            self.vgg16_bn[32],
        )
        self.conv5_block = nn.Sequential(
            self.vgg16_bn[34],
            self.vgg16_bn[35],
            self.vgg16_bn[36],
            self.vgg16_bn[37],
            self.vgg16_bn[38],
            self.vgg16_bn[39],
            self.vgg16_bn[40],
            self.vgg16_bn[41],
            self.vgg16_bn[42],
        )

        self.upconv5_block = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(512, 512, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(512, 512, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
        )
        self.upconv4_block = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(512, 512, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(512, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
        )
        self.upconv3_block = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(256, 128, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
        )
        self.upconv2_block = nn.Sequential(
            nn.Conv2d(128, 128, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(128, 64, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
        )
        self.upconv1_block = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=(3, 3), padding=(1, 1)),
            nn.BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True),
            self.relu,
            nn.Conv2d(64, 103, kernel_size=(3, 3), padding=(1, 1)),
        )

    def forward(self, x):
        f1, idx1 = self.index_MaxPool(self.conv1_block(x))
        f2, idx2 = self.index_MaxPool(self.conv2_block(f1))
        f3, idx3 = self.index_MaxPool(self.conv3_block(f2))
        f4, idx4 = self.index_MaxPool(self.conv4_block(f3))
        f5, idx5 = self.index_MaxPool(self.conv5_block(f4))
        up6 = self.index_UnPool(f5, idx5)
        up5 = self.index_UnPool(self.upconv5_block(up6), idx4)
        up4 = self.index_UnPool(self.upconv4_block(up5), idx3)
        up3 = self.index_UnPool(self.upconv3_block(up4), idx2)
        up2 = self.index_UnPool(self.upconv2_block(up3), idx1)
        up1 = self.upconv1_block(up2)

        return F.log_softmax(up1, dim=1)


model = SegNet().to(device)
lossf = nn.CrossEntropyLoss(ignore_index=255)
optimizer = optim.Adam(model.parameters(),lr=0.0003)
epochs_num = 10



In [5]:
from torch.amp import autocast, GradScaler
from tqdm import tqdm
import random
scaler = GradScaler()


def calculate_iou(pred, mask, num_classes):
    iou = []
    pred = pred.cpu().numpy()
    mask = mask.cpu().numpy()

    # 计算所有类别的 IoU，包括背景类0
    for cls in range(num_classes):
        pred_cls = pred == cls
        mask_cls = mask == cls
        intersection = np.logical_and(pred_cls, mask_cls).sum()
        union = np.logical_or(pred_cls, mask_cls).sum()
        if union == 0:
            iou.append(float('nan'))  # 避免分母为0时出错
        else:
            iou.append(intersection / union)

    return np.nanmean(iou)  # 返回所有类的平均 IoU


# 定义准确率计算函数，包括背景类0
def calculate_accuracy(pred, mask):
    pred = pred.cpu().numpy()
    mask = mask.cpu().numpy()
    correct = (pred == mask).sum()
    total = mask.size
    return correct / total


# 训练函数
def train(model, train_loader, test_loader, criterion, optimizer, device, epochs, num_classes):
    model.train()
    train_loss_history = []
    train_acc_history = []
    train_miou_history = []

    for epoch in range(epochs):
        running_loss = 0.0
        running_acc = 0.0
        running_miou = 0.0

        for images, masks, _ in tqdm(train_loader):
            images, masks = images.to(device), masks.to(device)

            optimizer.zero_grad()

            # 使用自动混合精度进行前向传播
            with autocast("cuda"):
                outputs = model(images)
                loss = criterion(outputs, masks)

            # 混合精度反向传播和优化
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item()

            # 计算预测
            preds = torch.argmax(outputs, dim=1)

            # 计算准确率和 IoU
            running_acc += calculate_accuracy(preds, masks)
            running_miou += calculate_iou(preds, masks, num_classes)

            # 释放无用的变量，并清理显存
            del outputs, loss, preds
            torch.cuda.empty_cache()

        # 计算每个 epoch 的平均 loss、accuracy 和 mIoU
        epoch_loss = running_loss / len(train_loader)
        epoch_acc = running_acc / len(train_loader)
        epoch_miou = running_miou / len(train_loader)

        train_loss_history.append(epoch_loss)
        train_acc_history.append(epoch_acc)
        train_miou_history.append(epoch_miou)

        print(
            f"Epoch [{epoch + 1}/{epochs}], Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}, Train mIoU: {epoch_miou:.4f}")

        # 在每个 epoch 后保存模型
        torch.save(model.state_dict(), f"segnet_epoch_{epoch + 1}.pth")

        # 在测试集上运行评估
        #test_acc, test_miou = evaluate_model(model, test_loader, device, num_classes)
       # test_acc_history.append(test_acc)
        #test_miou_history.append(test_miou)
      #  print(f"Epoch [{epoch + 1}/{epochs}], Test Accuracy: {test_acc:.4f}, Test mIoU: {test_miou:.4f}")
        visualize_predictions(model, test_loader, device, num_samples=4)
    return train_loss_history, train_acc_history, train_miou_history


# 测试集上的评估函数
def evaluate_model(model, test_loader, device, num_classes):
    model.eval()
    total_acc = 0.0
    total_miou = 0.0
    num_batches = len(test_loader)

    with torch.no_grad():
        for images, masks, _ in test_loader:
            images, masks = images.to(device), masks.to(device)

            outputs = model(images)
            preds = torch.argmax(outputs, dim=1)

            # 计算准确率和 IoU
            total_acc += calculate_accuracy(preds, masks)
            total_miou += calculate_iou(preds, masks, num_classes)

    avg_acc = total_acc / num_batches
    avg_miou = total_miou / num_batches

    return avg_acc, avg_miou


def visualize_predictions(model, test_loader, device, num_samples=4):
    model.eval()
    with torch.no_grad():
        for images, masks, ids in test_loader:
            images, masks = images.to(device), masks.to(device)

            outputs = model(images)
            preds = torch.argmax(outputs, dim=1)

            print("Preds:", preds[0].cpu().numpy())  # 打印第一个样本的预测
            unique_values = torch.unique(preds).cpu().numpy()
            print("Unique predicted classes:", unique_values)  # 打印预测中包含的唯一类别

            # 随机选择num_samples个索引
            indices = random.sample(range(len(images)), min(num_samples, len(images)))

            fig, axes = plt.subplots(3, num_samples, figsize=(15, 8))
            for i, idx in enumerate(indices):
                # 显示原始图像
                axes[0, i].imshow(images[idx].cpu().permute(1, 2, 0))
                axes[0, i].set_title(f"Test Image ID: {ids[idx]}")
                axes[0, i].axis('off')

                # 显示真实掩码
                axes[1, i].imshow(masks[idx].cpu())
                axes[1, i].set_title("Ground Truth")
                axes[1, i].axis('off')

                # 显示模型预测掩码
                axes[2, i].imshow(preds[idx].cpu())
                axes[2, i].set_title("Predicted Mask")
                axes[2, i].axis('off')

            plt.show()
            break

train_loss_history, train_acc_history, train_miou_history = train(
    model, train_loader, test_loader, lossf, optimizer, device, epochs_num, num_classes
)

  0%|          | 0/282 [00:10<?, ?it/s]


OutOfMemoryError: CUDA out of memory. Tried to allocate 64.00 MiB. GPU 0 has a total capacity of 4.00 GiB of which 0 bytes is free. Of the allocated memory 3.42 GiB is allocated by PyTorch, and 56.00 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)