# ResUNET

In [None]:
import torch
import torchvision

# Report the installed library versions and whether PyTorch can see a CUDA device.
print("torch ==", torch.__version__)
print("torchvision ==", torchvision.__version__)

# Spelling of "available" fixed in the message below.
print("CUDA available == %s" % (torch.cuda.is_available()))

In [None]:
# LINUX JUPYTER
# !apt-get update
# !apt-get install -y libgl1-mesa-glx

# !pip config set global.index-url https://mirrors.aliyun.com/pypi/simple
# !pip install scikit-learn
# !pip install opencv-python
# !pip install ipympl

# 定义ISBI_LOADER

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm import tqdm
import numpy as np
import cv2
import os
import glob
import random
from IPython.display import clear_output
from model.resunet_model import Resnet_Unet

from sklearn.model_selection import KFold


class ISBI_Loader(Dataset):
    """Dataset yielding (image, mask) pairs for binary segmentation.

    Image paths come either from globbing ``<data_path>/images/*.jpg`` or
    directly from ``data_lst``; when both are given, ``data_lst`` wins.
    The mask path of each image is derived from the image path by replacing
    ``"images"`` with ``"mask"`` and ``".jpg"`` with ``".png"``.

    Args:
        data_path: Dataset root containing an ``images`` sub-folder, or None.
        data_lst: Explicit sequence of image file paths, or None.
        use_augment: When True, every sample gets one random augmentation.

    Raises:
        ValueError: If neither ``data_path`` nor ``data_lst`` is supplied.
    """

    # Codes drawn uniformly when augmentation is enabled. -1, 0 and 1 are
    # handed to cv2.flip (both axes / vertical / horizontal). BLUR_CODE
    # selects a Gaussian blur of the image instead of a flip; it is not in
    # the default tuple, so by default every augmented sample is flipped.
    AUGMENT_CODES = (-1, 0, 1)
    BLUR_CODE = 2

    def __init__(self, data_path=None, data_lst=None, use_augment=True):
        if data_path is None and data_lst is None:
            # Without a source there is no imgs_path, and __len__/__getitem__
            # would later fail with an unrelated AttributeError.
            raise ValueError("ISBI_Loader requires either data_path or data_lst")

        self.data_path = data_path

        if data_path is not None:
            # Collect every jpg under <data_path>/images; sorted so the sample
            # order does not depend on the file system's listing order.
            self.imgs_path = sorted(
                glob.glob(os.path.join(data_path, "images/*.jpg"))
            )

        if data_lst is not None:
            # An explicit list takes precedence over the globbed one.
            self.imgs_path = list(data_lst)

        self.use_augment = use_augment

    def augment(self, image, flipCode):
        """Return ``image`` flipped with ``cv2.flip``.

        ``flipCode`` 1 flips horizontally, 0 vertically, -1 along both axes.
        """
        return cv2.flip(image, flipCode)

    def __getitem__(self, index):
        """Load, resize and (optionally) augment the sample at ``index``.

        Returns:
            A tuple ``(image_tensor, label_array)``: the image converted by
            ``transforms.ToTensor`` and the mask as a NumPy array reshaped to
            ``(1, 512, 512)``.

        Raises:
            FileNotFoundError: If the image or its mask cannot be read.
        """
        image_path = self.imgs_path[index]

        # Derive the mask path from the image path. Note that str.replace
        # rewrites every occurrence of the substring, not only the folder name.
        label_path = image_path.replace("images", "mask")
        label_path = label_path.replace(".jpg", ".png")

        # cv2.imread signals failure by returning None instead of raising.
        image = cv2.imread(image_path)
        if image is None:
            raise FileNotFoundError("Cannot read image: {}".format(image_path))
        label = cv2.imread(label_path)
        if label is None:
            raise FileNotFoundError("Cannot read mask: {}".format(label_path))

        image = cv2.resize(image, (512, 512))
        # Nearest-neighbour keeps mask values discrete while resizing.
        label = cv2.resize(label, (512, 512), interpolation=cv2.INTER_NEAREST)

        # Collapse the mask to a single channel.
        label = cv2.cvtColor(label, cv2.COLOR_BGR2GRAY)

        # Masks stored as 0/255 are rescaled to 0/1.
        if label.max() > 1:
            label = label / 255

        if self.use_augment:
            code = random.choice(self.AUGMENT_CODES)

            if code == self.BLUR_CODE:
                # Blur touches the image only; the mask geometry is unchanged.
                kernel_size = (random.choice([1, 3]), random.choice([1, 3]))
                sigma = random.choice([1, 2, 3])
                image = cv2.GaussianBlur(image, kernel_size, sigma)
            else:
                # The same flip is applied to image and mask to keep them aligned.
                image = self.augment(image, code)
                label = self.augment(label, code)

        # Convert the image to a tensor (normalisation intentionally disabled).
        image_tensor = transforms.ToTensor()(image)

        # Add a leading channel axis to the mask: (H, W) -> (1, H, W).
        label_tensor = label.reshape(1, label.shape[0], label.shape[1])

        return image_tensor, label_tensor

    def __len__(self):
        """Return the number of image paths held by the dataset."""
        return len(self.imgs_path)


print("ok")

# 设置参数

In [None]:
from utils_files import *

# Select the compute device: CUDA when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Instantiate the ResNet-UNet network and move it to the selected device.
model = Resnet_Unet(BN_enable=True, resnet_pretrain=False)
model = model.to(device)

# Checkpoint to resume (fine-tune) from; the actual load is left disabled.
fine_tuning_weights_dir = str(get_current_path() / "blost_0.13408492505550385.pth")
# model.load_state_dict(torch.load(fine_tuning_weights_dir, map_location=device))

# Loss function and optimizer.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.RMSprop(
    params=model.parameters(),
    lr=1e-5,
    momentum=0.9,
    weight_decay=1e-8,
)

print(model)

# 图表

In [None]:
# 初始化图表

# 使用qt5(vscode)
# %matplotlib qt5

# 使用widget(linux jupyter)
%matplotlib widget

from utils_plot import Myplot

# Figure helper from utils_plot (presumably a 2x2 grid of axes; the training
# loop below addresses panels (0, 0), (0, 1), (1, 0) and (1, 1)).
plot = Myplot(2, 2)

# plot.axes[1, 1].remove()

# Histories appended to once per epoch and redrawn by the training loop.
plt_epochs = list()

plt_trainloss, plt_valloss = list(), list()

plt_trainacc, plt_valacc = list(), list()

plt_tpr, plt_fpr = list(), list()

plt_f1 = list()

# Write the (still empty) figure once up front.
plot.save("./result/figure.png")
print("ok")

# 训练

In [None]:
# Training hyper-parameters.
epochs = 50
batch_size = 2
dataset_path = get_current_path() / "dataset" / "prepare"

# Checkpoints and the figure are written under ./result; make sure it exists
# so torch.save does not fail on a fresh working directory.
os.makedirs("./result", exist_ok=True)

# Best metrics seen so far (across all folds and epochs).
best_loss = float("inf")
best_accuracy = 0
best_f1 = 0

# Number of epochs run so far, counted across folds.
accumulate_epochs = 0

# Split the dataset into K folds.
kf = KFold(n_splits=10)

# List of all training image paths, as plain strings.
k_fold_lst = get_file_list(dataset_path / "images")
k_fold_lst = [str(file_pathobj) for file_pathobj in k_fold_lst]

# NOTE(review): `model` and `optimizer` are created once, outside this loop,
# and are not re-initialised per fold, so later folds validate on images that
# were used for training in earlier folds. Confirm this is intended.
for fold_index, (train_index, val_index) in enumerate(kf.split(k_fold_lst)):
    # Split the paths into this fold's training and validation subsets.
    train_lst = [k_fold_lst[i] for i in train_index]
    val_lst = [k_fold_lst[i] for i in val_index]

    # Training data: random augmentation enabled (the dataset default).
    isbi_tra_dataset = ISBI_Loader(data_lst=train_lst)
    train_loader = DataLoader(
        dataset=isbi_tra_dataset, batch_size=batch_size, shuffle=True
    )

    # Validation data: augmentation disabled so metrics are reproducible
    # and measured on the unmodified images.
    isbi_val_dataset = ISBI_Loader(data_lst=val_lst, use_augment=False)
    val_loader = DataLoader(
        dataset=isbi_val_dataset, batch_size=batch_size, shuffle=False
    )

    # len(loader) is the number of batches, not the number of samples.
    print(
        "fold:{:.0f} train_cnt:{:.0f} val_cnt:{:.0f}".format(
            fold_index, len(train_loader), len(val_loader)
        )
    )

    for epoch in range(epochs):
        print(
            "Epoch: {:.0f} Accumulate epochs: {:.0f}".format(epoch, accumulate_epochs)
        )

        # ---- training phase ----
        model.train()

        train_loss = 0.0

        train_acc = 0.0
        train_tptn = 0.0
        train_samples = 0.0

        for image, label in tqdm(train_loader):
            optimizer.zero_grad()

            # Move the batch to the training device.
            image = image.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.float32)

            # Forward pass.
            pred = model(image)

            # Batch loss.
            loss = criterion(pred, label)
            train_loss += loss.item()

            # The criterion is BCEWithLogitsLoss, so `pred` holds raw logits:
            # apply the sigmoid before cutting at probability 0.5.
            with torch.no_grad():
                pred_binary = (torch.sigmoid(pred) >= 0.5).to(label.dtype)

                # Running pixel accuracy over the epoch so far.
                train_tptn += (pred_binary == label).sum().item()
                train_samples += pred_binary.numel()
            train_acc = train_tptn / train_samples

            # Backward pass and parameter update.
            loss.backward()
            optimizer.step()

        # Mean loss per batch for this epoch.
        train_loss /= len(train_loader)

        print(
            "[{:.0f}/{:.0f}] LOSS: {:.3f} ACC:{:.3f} TP:{:.0f} TOTAL:{:.0f}".format(
                epoch + 1,
                epochs,
                train_loss,
                train_acc,
                train_tptn,
                train_samples,
            )
        )

        # ---- validation phase ----
        model.eval()

        val_loss = 0.0

        # Pixel-level confusion-matrix counts accumulated over the fold.
        tp = 0.0
        fp = 0.0
        tn = 0.0
        fn = 0.0

        with torch.no_grad():
            for val_image, val_label in tqdm(val_loader):
                val_image = val_image.to(device=device, dtype=torch.float32)
                val_label = val_label.to(device=device, dtype=torch.float32)

                val_pred = model(val_image)

                val_batch_loss = criterion(val_pred, val_label)
                val_loss += val_batch_loss.item()

                # Logits -> probabilities -> binary decision at 0.5.
                pred_pos = torch.sigmoid(val_pred) >= 0.5
                pred_neg = ~pred_pos
                label_pos = val_label == 1
                label_neg = val_label == 0

                tp += (pred_pos & label_pos).sum().item()
                fp += (pred_pos & label_neg).sum().item()
                tn += (pred_neg & label_neg).sum().item()
                fn += (pred_neg & label_pos).sum().item()

        # The 1e-8 terms guard against division by zero.
        # Accuracy.
        val_acc = (tp + tn) / (tp + fp + tn + fn + 1e-8)

        # Precision.
        val_prec = tp / (tp + fp + 1e-8)

        # Recall.
        val_rec = tp / (tp + fn + 1e-8)

        # F1 score.
        val_f1 = 2 * (val_prec * val_rec) / (val_prec + val_rec + 1e-8)

        # Mean loss per batch for this epoch.
        val_loss /= len(val_loader)

        print("vLoss:{:.3f}, vAccuracy:{:.3f}".format(val_loss, val_acc))

        # Save a checkpoint whenever a tracked metric improves.
        if val_loss < best_loss:
            # Track the epoch-mean validation loss (not the last batch's loss).
            best_loss = val_loss
            torch.save(
                model.state_dict(),
                "./result/lost_{:.3f}_k{:.0f}_e{:.0f}.pth".format(
                    val_loss, fold_index, accumulate_epochs
                ),
            )

        if val_acc > best_accuracy:
            best_accuracy = val_acc
            torch.save(
                model.state_dict(),
                "./result/acc_{:.3f}_k{:.0f}_e{:.0f}.pth".format(
                    val_acc, fold_index, accumulate_epochs
                ),
            )

        # F1 checkpoints require a margin of 0.02 and an absolute floor of 0.80.
        if val_f1 > (best_f1 + 0.02) and val_f1 > 0.80:
            best_f1 = val_f1
            torch.save(
                model.state_dict(),
                "./result/f1_{:.3f}_k{:.0f}_e{:.0f}.pth".format(
                    val_f1, fold_index, accumulate_epochs
                ),
            )

        # Always keep the most recent weights.
        torch.save(model.state_dict(), "./result/latest.pth")

        # Update the histories and redraw the figure.
        plt_trainloss.append(train_loss)
        plt_trainacc.append(train_acc)
        plt_valloss.append(val_loss)
        plt_valacc.append(val_acc)
        plt_f1.append(val_f1)
        plt_epochs.append(accumulate_epochs)
        plot.plot_learning_curve(0, 0, plt_epochs, plt_trainloss, plt_valloss)
        plot.plot_accuracy_curve(0, 1, plt_epochs, plt_trainacc, plt_valacc)
        plot.plot_f1_epoch_score_curve(1, 0, plt_epochs, plt_f1)
        plot.plot_confusion_matrix(1, 1, tp, fp, tn, fn)
        plot.fresh()
        plot.save("./result/figure.png")

        accumulate_epochs += 1

# 验证

In [None]:
import os
from tqdm import tqdm
from utils_metrics import compute_mIoU, show_results
import glob
import numpy as np
import torch
import os
import cv2
from model.unet_model import UNet

from utils_files import *

def unet_predict(test_dir, pred_dir, weights_path="unet_best_model.pth"):
    """Run the UNet over every ``.jpg`` in ``test_dir`` and save binary masks.

    Each image is converted to a single channel, resized to 512x512, fed
    through the network, thresholded at 0.5, resized back to the original
    resolution and written to ``pred_dir`` as ``<stem>.png`` (values 0/255).

    Args:
        test_dir: Directory holding the input ``.jpg`` images.
        pred_dir: Output directory; created if it does not exist.
        weights_path: State-dict file to load into the network.

    Raises:
        FileNotFoundError: If a listed image cannot be decoded.
    """
    os.makedirs(pred_dir, exist_ok=True)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Single-channel input, single-class output.
    net = UNet(n_channels=1, n_classes=1)

    # Move the network to the device, then load the trained weights.
    net.to(device=device)
    net.load_state_dict(torch.load(weights_path, map_location=device))

    # Inference mode.
    net.eval()
    print("Load model done")

    # Only jpg files are inputs; other directory entries are ignored rather
    # than being turned into non-existent "<name>.jpg" paths.
    img_names = sorted(
        name
        for name in os.listdir(test_dir)
        if os.path.splitext(name)[1].lower() == ".jpg"
    )

    # No gradients are needed for prediction; this avoids building the graph.
    with torch.no_grad():
        for img_name in tqdm(img_names):
            # splitext keeps dots inside the stem (e.g. "a.b.jpg" -> "a.b").
            image_id = os.path.splitext(img_name)[0]
            image_path = os.path.join(test_dir, img_name)

            # cv2.imread returns None on failure instead of raising.
            img = cv2.imread(image_path)
            if img is None:
                raise FileNotFoundError("Cannot read image: {}".format(image_path))
            origin_shape = img.shape

            # Convert to a single channel and resize to the network input size.
            # NOTE(review): cv2.imread yields BGR, yet the RGB2GRAY conversion
            # is kept as-is because it must match whatever preprocessing the
            # weights were trained with - confirm against the training code.
            img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
            img = cv2.resize(img, (512, 512))

            # Shape (batch=1, channel=1, H, W).
            img = img.reshape(1, 1, img.shape[0], img.shape[1])

            img_tensor = torch.from_numpy(img)
            img_tensor = img_tensor.to(device=device, dtype=torch.float32)

            pred = net(img_tensor)

            # Binarise at 0.5 and scale to 0/255.
            # NOTE(review): if UNet returns raw logits (no final sigmoid), a
            # sigmoid should be applied before this threshold - confirm
            # against model/unet_model.py.
            mask = (pred >= 0.5).cpu().numpy()[0, 0].astype(np.uint8) * 255

            # Back to the original resolution; nearest-neighbour keeps 0/255.
            mask = cv2.resize(
                mask,
                (origin_shape[1], origin_shape[0]),
                interpolation=cv2.INTER_NEAREST,
            )

            # uint8 is a pixel depth the PNG encoder supports directly.
            cv2.imwrite(os.path.join(pred_dir, image_id + ".png"), mask)

    print("Get predict result done")


# Predict masks for the images of the "test" dataset folder; results are
# written to its "pred" sub-folder.
dataset_path = get_current_path() / "dataset" / "test"

unet_predict(
    test_dir=str(dataset_path / "images"),
    pred_dir=str(dataset_path / "pred"),
)

# 计算指标

In [None]:
from utils_metrics import compute_mIoU, show_results
from utils_files import *
# NOTE(review): this cell reads predictions from dataset/prepare/pred, while
# the prediction cell in this notebook writes to dataset/test/pred - confirm
# the intended folder.
dataset_path = get_current_path() / "dataset" / "prepare"

gt_dir = str(dataset_path / "mask")
pred_dir = str(dataset_path / "pred")
test_dir = str(dataset_path / "images")
result_dir = str(dataset_path / "result")

# Image ids are file names without their extension. splitext strips only the
# final extension, so names containing extra dots keep their full stem.
img_names = os.listdir(test_dir)
image_ids = [os.path.splitext(image_name)[0] for image_name in img_names]

num_classes = 2
name_classes = ["background", "potholes"]

print("Get mIoU")
print(gt_dir)
print(pred_dir)
print(num_classes)
print(name_classes)
hist, IoUs, PA_Recall, Precision = compute_mIoU(
    gt_dir, pred_dir, image_ids, num_classes, name_classes
)
# print(hist)
print("Get mIoU done.")
show_results(result_dir, hist, IoUs, PA_Recall, Precision, name_classes)

In [2]:
# 计算面积百分比
from utils_metrics import only_percentage
from utils_files import *

dataset_path = get_current_path() / "dataset" / "test"

gt_dir = str(dataset_path / "mask")
pred_dir = str(dataset_path / "pred")
test_dir = str(dataset_path / "images")
result_dir = str(dataset_path / "result")

# Image ids are file names without their extension. splitext strips only the
# final extension, so names containing extra dots keep their full stem.
img_names = os.listdir(test_dir)
image_ids = [os.path.splitext(image_name)[0] for image_name in img_names]

# Hand the predicted masks to the project's area-percentage helper.
only_percentage(pred_dir, image_ids)
print("ok")

ok
