In [11]:
import numpy as np
import os
import torch
import imageio as iio
from tqdm import tqdm
from torch.utils.data import DataLoader  # 导入 DataLoader
import torch.nn as nn  # 导入 nn 模块

from networks.cycleisp import Rgb2Raw
from dataloaders.data_rgb import get_rgb_data
from utils.noise_sampling import random_noise_levels_dnd, add_noise
import utils
from skimage.util import img_as_ubyte

# Configuration
input_dir = '/data/C_zr/clip vib ai project/kaggle/input/SIDD_patches/val/groundtruth'  # directory of clean RGB images
result_dir = '/data/C_zr/clip vib ai project/kaggle/input/SIDD_patches/val/ISP_noise'   # directory for the generated noisy data
weights = './pretrained_models/isp/rgb2raw.pth'  # path to the pretrained model
gpus = '0'  # use GPU 0

# Create every output sub-directory this cell writes into (no-op if present).
# 'pkl' is needed by the utils.save_dict(...) call at the end of the
# processing loop, which targets result_dir/pkl/<name>.pkl.
# NOTE(review): utils.save_dict is not visible here; creating the directory
# up front is harmless even if the helper already does so.
for subdir in ('clean', 'noisy', 'pkl'):
    os.makedirs(os.path.join(result_dir, subdir), exist_ok=True)

# Select which GPU(s) are visible to CUDA
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = gpus

# Build the dataset and a sequential (unshuffled) loader over it
test_dataset = get_rgb_data(input_dir)
test_loader = DataLoader(
    test_dataset,
    batch_size=4,
    shuffle=False,
    num_workers=2,
    drop_last=False,
)

# Instantiate the RGB->RAW network and restore its pretrained weights, then
# move it to the GPU, wrap it in DataParallel and switch to eval mode.
model_rgb2raw = Rgb2Raw()
utils.load_checkpoint(model_rgb2raw, weights)
model_rgb2raw = nn.DataParallel(model_rgb2raw.cuda()).eval()

# Process the images
def _crop_packed(packed, pad_h, pad_w):
    """Trim the border padding from a packed tensor indexed as [C, H, W].

    The pad amounts reported by the loader are halved before cropping, as in
    the original slicing expression (presumably because the packed RAW has
    half the spatial resolution of the padded RGB input -- TODO confirm).

    The stop bounds are computed from the axis length instead of using a
    negative stop index: ``x[a:-b]`` with ``b == 0`` is ``x[a:0]``, i.e. an
    empty slice, so zero padding would otherwise yield empty outputs.
    """
    pad_h, pad_w = int(pad_h), int(pad_w)
    height, width = packed.shape[1], packed.shape[2]
    # (-pad) // 2 mirrors the floor-division stop offset of the original slice.
    return packed[:,
                  pad_h // 2:height + (-pad_h) // 2,
                  pad_w // 2:width + (-pad_w) // 2]


with torch.no_grad():
    for data in tqdm(test_loader):
        rgb_gt = data[0].cuda()
        filenames, padh, padw = data[1], data[2], data[3]

        # Convert the clean RGB batch to RAW (raw_gt is in RGGB format)
        raw_gt = torch.clamp(model_rgb2raw(rgb_gt), 0, 1)

        # Add noise to each clean RAW image of the batch
        for j in range(raw_gt.shape[0]):
            # Strip the extension robustly instead of assuming 3 characters.
            stem = os.path.splitext(filenames[j])[0]

            shot_noise, read_noise = random_noise_levels_dnd()
            shot_noise, read_noise = shot_noise.cuda(), read_noise.cuda()
            raw_noisy = add_noise(raw_gt[j], shot_noise, read_noise, use_cuda=True)
            raw_noisy = torch.clamp(raw_noisy, 0, 1)  # keep the noisy image in [0, 1]
            variance = shot_noise * raw_noisy + read_noise

            # Remove the padding (RGGB channels)
            clean_packed = _crop_packed(raw_gt[j], padh[j], padw[j])
            noisy_packed = _crop_packed(raw_noisy, padh[j], padw[j])
            variance_packed = _crop_packed(variance, padh[j], padw[j])

            # Rearrange into the Bayer pattern and save as PNG
            clean_unpacked = utils.unpack_raw(clean_packed.unsqueeze(0))
            clean_unpacked = clean_unpacked.squeeze().detach().cpu().numpy()
            iio.imwrite(os.path.join(result_dir, 'clean', stem + '.png'), img_as_ubyte(clean_unpacked))

            noisy_unpacked = utils.unpack_raw(noisy_packed.unsqueeze(0))
            noisy_unpacked = noisy_unpacked.squeeze().detach().cpu().numpy()
            iio.imwrite(os.path.join(result_dir, 'noisy', stem + '.png'), img_as_ubyte(noisy_unpacked))

            # Save a dictionary with the packed images and the noise parameters
            dict_ = {
                'clean': clean_packed.detach().cpu().numpy(),
                'noisy': noisy_packed.detach().cpu().numpy(),
                'variance': variance_packed.detach().cpu().numpy(),
                'shot_noise': shot_noise.detach().cpu().numpy(),
                'read_noise': read_noise.detach().cpu().numpy(),
            }
            utils.save_dict(dict_, os.path.join(result_dir, 'pkl', stem + '.pkl'))


100%|██████████| 320/320 [03:03<00:00,  1.75it/s]


In [None]:
import numpy as np
import os
import argparse
from tqdm import tqdm

import torch.nn as nn
import torch
from torch.utils.data import DataLoader  # 导入 DataLoader

import scipy.io as sio
from networks.cycleisp import Rgb2Raw, Raw2Rgb, CCM
from dataloaders.data_rgb import get_rgb_data
from utils.noise_sampling import random_noise_levels_dnd, add_noise
import utils

from skimage.util import img_as_ubyte
import imageio.v3 as iio  # 替换 lycon

# Configuration
# NOTE(review): input_dir points at the 'train' split while result_dir lives
# under 'val' -- confirm that writing train-derived images there is intended.
input_dir = '/data/C_zr/clip vib ai project/kaggle/input/SIDD_patches/train/groundtruth'  # directory of clean RGB images
result_dir = '/data/C_zr/clip vib ai project/kaggle/input/SIDD_patches/val/ISP_RGB'  # directory for the generated images
weights_rgb2raw = './pretrained_models/isp/rgb2raw_joint.pth'  # pretrained RGB->RAW weights
weights_raw2rgb = './pretrained_models/isp/raw2rgb_joint.pth'  # pretrained RAW->RGB weights
weights_ccm = './pretrained_models/isp/ccm_joint.pth'  # pretrained CCM weights
gpus = '0'  # use GPU 0

# Make sure both output folders exist (no-op when already present)
for _subdir in ('clean', 'noisy'):
    os.makedirs(os.path.join(result_dir, _subdir), exist_ok=True)

# Select which GPU(s) are visible to CUDA
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": gpus,
})

# Build the dataset and a sequential (unshuffled) loader over it
test_dataset = get_rgb_data(input_dir)
test_loader = DataLoader(
    test_dataset,
    batch_size=4,
    shuffle=False,
    num_workers=2,
    drop_last=False,
)

# Instantiate the three networks and restore their pretrained weights
model_rgb2raw = Rgb2Raw()
model_ccm = CCM()
model_raw2rgb = Raw2Rgb()

utils.load_checkpoint(model_rgb2raw, weights_rgb2raw)
utils.load_checkpoint(model_ccm, weights_ccm)
utils.load_checkpoint(model_raw2rgb, weights_raw2rgb)

# Move each network to the GPU, wrap it in DataParallel and set eval mode
model_rgb2raw, model_ccm, model_raw2rgb = (
    nn.DataParallel(net.cuda()).eval()
    for net in (model_rgb2raw, model_ccm, model_raw2rgb)
)

# Process the images
def _crop_padding(image, pad_h, pad_w):
    """Remove ``pad_h`` rows and ``pad_w`` columns from each border of an [H, W, C] array.

    The stop bounds are computed from the axis length instead of using a
    negative stop index: ``x[p:-p]`` with ``p == 0`` is ``x[0:0]``, i.e. an
    empty slice, so zero padding would otherwise yield empty images.
    """
    pad_h, pad_w = int(pad_h), int(pad_w)
    height, width = image.shape[0], image.shape[1]
    return image[pad_h:height - pad_h, pad_w:width - pad_w, :]


with torch.no_grad():
    for data in tqdm(test_loader):
        rgb_gt = data[0].cuda()
        filenames, padh, padw = data[1], data[2], data[3]

        # Convert the clean RGB batch to RAW (raw_gt is in RGGB format)
        raw_gt = torch.clamp(model_rgb2raw(rgb_gt), 0, 1)

        # Add noise to each clean RAW image, then render it back to RGB
        for j in range(raw_gt.shape[0]):
            # Strip the extension robustly instead of assuming 3 characters.
            stem = os.path.splitext(filenames[j])[0]

            shot_noise, read_noise = random_noise_levels_dnd()
            shot_noise, read_noise = shot_noise.cuda(), read_noise.cuda()
            raw_noisy = add_noise(raw_gt[j], shot_noise, read_noise, use_cuda=True)
            raw_noisy = torch.clamp(raw_noisy, 0, 1)  # keep the noisy image in [0, 1]

            # The CCM network is fed the clean RGB image; its output is passed
            # to the RAW->RGB network together with the noisy RAW image.
            ccm_tensor = model_ccm(rgb_gt[j].unsqueeze(0))
            rgb_noisy = model_raw2rgb(raw_noisy.unsqueeze(0), ccm_tensor)
            rgb_noisy = torch.clamp(rgb_noisy, 0, 1)

            # To channel-last numpy arrays; squeeze(0) drops only the batch
            # axis so a size-1 spatial axis can never be removed by accident.
            rgb_noisy = rgb_noisy.permute(0, 2, 3, 1).squeeze(0).detach().cpu().numpy()
            rgb_clean = rgb_gt[j].permute(1, 2, 0).detach().cpu().numpy()

            # Remove the padding reported by the loader
            rgb_clean = _crop_padding(rgb_clean, padh[j], padw[j])
            rgb_noisy = _crop_padding(rgb_noisy, padh[j], padw[j])

            # Save both images with imageio.v3.imwrite
            iio.imwrite(os.path.join(result_dir, 'clean', stem + '.png'), img_as_ubyte(rgb_clean))
            iio.imwrite(os.path.join(result_dir, 'noisy', stem + '.png'), img_as_ubyte(rgb_noisy))


  checkpoint = torch.load(weights)
  0%|          | 11/24000 [00:08<4:38:47,  1.43it/s]