In [1]:
import os
from PIL import Image
import torchvision.transforms as T
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# 设置设备
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# 转换为 PyTorch Tensor
transform = T.Compose([
    T.ToTensor(),  # 将图像数据转换为 PyTorch tensor，范围在 [0, 1]
])

# 数据路径
main_folder_noisy = "fbp_phantom_liver_slice_512"
main_folder_1 = "fbp_phantom_liver_slice_400_1"
main_folder_2 = "fbp_phantom_liver_slice_400_2"
output_folder = "clean_liver_phantom"

# 检查并创建输出文件夹
os.makedirs(output_folder, exist_ok=True)

# 模型定义
class network(nn.Module):
    def __init__(self, n_chan, chan_embed=48):
        super(network, self).__init__()
        self.act = nn.LeakyReLU(negative_slope=0.2, inplace=True)
        self.conv1 = nn.Conv2d(n_chan, chan_embed, 3, padding=1)
        self.conv2 = nn.Conv2d(chan_embed, chan_embed, 3, padding=1)
        self.conv3 = nn.Conv2d(chan_embed, n_chan, 1)

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = self.conv3(x)
        return x

# 定义损失函数
def mse(gt: torch.Tensor, pred: torch.Tensor) -> torch.Tensor:
    return F.mse_loss(gt, pred)

def loss_func(img1, img2, model):
    noisy1, noisy2 = img1, img2
    pred1 = noisy1 - model(noisy1)
    pred2 = noisy2 - model(noisy2)
    loss_res = 0.5 * (mse(noisy1, pred2) + mse(noisy2, pred1))
    loss_cons = mse(pred1, pred2)
    loss = loss_res + loss_cons
    return loss

# 训练函数
def train(model, optimizer, img1, img2):
    loss = loss_func(img1, img2, model)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()

# 去噪函数
def denoise(model, noisy_img):
    with torch.no_grad():
        pred = torch.clamp(noisy_img - model(noisy_img), 0, 1)
    return pred

# 超参数设置
max_epoch = 3000      # training epochs
lr = 0.001            # learning rate
step_size = 1000      # number of epochs at which learning rate decays
gamma = 0.6           # factor by which learning rate decays

# 循环处理每个 slice
for idx in tqdm(range(1, 257)):  # 001 到 256，共 256 张切片
    # 构造文件名
    slice_id = f"{idx:03d}"  # 格式化为三位数，例如 001, 002, ..., 256
    image_path_noisy = os.path.join(main_folder_noisy, f"Phantom_liver_slice_{slice_id}.png")
    image_path_1 = os.path.join(main_folder_1, f"Phantom_liver_slice_{slice_id}.png")
    image_path_2 = os.path.join(main_folder_2, f"Phantom_liver_slice_{slice_id}.png")

    # 检查文件是否存在
    if not (os.path.exists(image_path_noisy) and os.path.exists(image_path_1) and os.path.exists(image_path_2)):
        print(f"Warning: Missing slice {slice_id}. Skipping.")
        continue

    # 加载图像并转换为 PyTorch Tensor
    image_noisy = Image.open(image_path_noisy).convert("RGB")
    image_1 = Image.open(image_path_1).convert("RGB")
    image_2 = Image.open(image_path_2).convert("RGB")

    noisy_img = transform(image_noisy).unsqueeze(0).to(device)  # 增加 batch 维度
    img1 = transform(image_1).unsqueeze(0).to(device)
    img2 = transform(image_2).unsqueeze(0).to(device)

    # 模型初始化
    n_chan = noisy_img.shape[1]
    model = network(n_chan).to(device)

    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

    # 训练模型
    for epoch in range(max_epoch):
        train(model, optimizer, img1, img2)
        scheduler.step()

    # 去噪图像
    denoised_img = denoise(model, noisy_img)

    # 保存去噪图像
    denoised = denoised_img.cpu().squeeze(0).permute(1, 2, 0).numpy()  # 张量转 NumPy
    denoised = (denoised * 255).astype('uint8')  # 转换为 uint8
    output_path = os.path.join(output_folder, f"Phantom_liver_slice_{slice_id}.png")
    Image.fromarray(denoised).save(output_path)

print("所有切片处理完成。")

  from .autonotebook import tqdm as notebook_tqdm
100%|██████████| 256/256 [24:58<00:00,  5.85s/it]

所有切片处理完成。



