In [1]:
import numpy as np
import random
import torch
from basicsr.data.degradations import random_add_gaussian_noise_pt, random_add_poisson_noise_pt
from basicsr.utils import DiffJPEG, USMSharp
from basicsr.utils.img_process_util import filter2D
from torch.nn import functional as F
import data.realesrgan_dataset as dataset
import math
import cv2
import yaml
import os

logs = []

@torch.no_grad()
def generate_second_order_degradation_images(image, kernel1, kernel2, sinc_kernel, opt, jpeger_1, jpeger_2, usm_sharpener):
    """Generate ten images with second order degradations."""
    images = []
    for _ in range(10):
        # 用于记录参数的列表
        parameter_logs = []
        # 记录第一个模糊核的大小
        # First degradation process
        # blur
        out = filter2D(usm_sharpener(image), kernel1)
        # updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob'])[0]
        updown_type = random.choices(['down', 'keep'], opt['resize_prob'])[0]
        # resize
        # if updown_type == 'up':
        #     scale = np.random.uniform(1, opt['resize_range'][1])
        # el
        if updown_type == 'down':
            scale = np.random.uniform(opt['resize_range'][0], 1)
        else:
            scale = 1
        # mode = random.choice(['area', 'bilinear', 'bicubic'])
        mode = random.choice(['area', 'bilinear'])
        # 记录resize的参数
        parameter_logs.append({'resize_scale': scale, 'resize_mode': mode})
        out = F.interpolate(out, scale_factor=scale, mode=mode)
        # noise
        # gray_noise_prob = opt['gray_noise_prob']
        gray_noise_prob = 0
        # if np.random.uniform() < opt['gaussian_noise_prob']:
        # # if np.random.uniform() <= 1:
        #     out, sigma, grey = random_add_gaussian_noise_pt(
        #         out, sigma_range=opt['noise_range'], clip=True, rounds=False, gray_prob=gray_noise_prob)
        #     # 记录高斯噪声参数
        #     parameter_logs.append({'noise_type': 'gaussian', 'sigma': sigma.item(), 'gray_noise': grey.item()})
        # else:
        #     out, scale, grey = random_add_poisson_noise_pt(
        #         out,
        #         scale_range=opt['poisson_scale_range'],
        #         gray_prob=gray_noise_prob,
        #         clip=True,
        #         rounds=False)
        #     # 记录泊松噪声参数
        #     parameter_logs.append({'noise_type': 'poisson', 'scale': scale.item(), 'grey_noise': grey.item()})
        # JPEG compression
        jpeg_p = out.new_zeros(out.size(0)).uniform_(*opt['jpeg_range'])
        parameter_logs.append({'jpeg_quality': jpeg_p.item()})
        out = torch.clamp(out, 0, 1)
        # out = jpeger_1(out, quality=jpeg_p)

        # Second degradation process
        if np.random.uniform() < opt['second_blur_prob']:
            # 记录第二个模糊核的大小
            parameter_logs.append({'second_blur: True'})
            out = filter2D(out, kernel2)
        else:
            parameter_logs.append({'second_blur: False'})

        updown_type = random.choices(['up', 'down', 'keep'], opt['resize_prob2'])[0]
        if updown_type == 'up':
            scale = np.random.uniform(1, opt['resize_range2'][1])
        elif updown_type == 'down':
            scale = np.random.uniform(opt['resize_range2'][0], 1)
        else:
            scale = 1
        # mode = random.choice(['area', 'bilinear', 'bicubic'])
        mode = random.choice(['area', 'bilinear'])
        # 记录resize的参数
        parameter_logs.append({'resize_scale2': scale, 'resize_mode2': mode})
        out = F.interpolate(
            out, size=(int(image.size(2) / opt['scale'] * scale), int(image.size(3) / opt['scale'] * scale)), mode=mode)
        # gray_noise_prob = opt['gray_noise_prob2']
        # gray_noise_prob = 0
        # if np.random.uniform() < opt['gaussian_noise_prob2']:
        # # if np.random.uniform() <= 1:
        #     out, sigma, grey = random_add_gaussian_noise_pt(
        #         out, sigma_range=opt['noise_range2'], clip=True, rounds=False, gray_prob=gray_noise_prob)
        #     # 记录高斯噪声参数
        #     parameter_logs.append({'noise_type2': 'gaussian','sigma': sigma.item(), 'gray_noise': grey.item()})
        # else:
        #     out, scale, gray = random_add_poisson_noise_pt(
        #         out,
        #         scale_range=opt['poisson_scale_range2'],
        #         gray_prob=gray_noise_prob,
        #         clip=True,
        #         rounds=False)
        #     # 记录泊松噪声参数
        #     parameter_logs.append({'noise_type2': 'poisson','scale': scale.item(), 'gray_noise': gray.item()})

        if np.random.uniform() < 0.5:
            mode = random.choice(['area', 'bilinear', 'bicubic'])
            out = F.interpolate(out, size=(image.size(2) // opt['scale'], image.size(3) // opt['scale']), mode=mode)
            out = filter2D(out, sinc_kernel)

            jpeg_p = out.new_zeros(out.size(0)).uniform_(*opt['jpeg_range2'])
            jpeg_p[:] = 25
            out = torch.clamp(out, 0, 1)
            out = jpeger_2(out, quality=jpeg_p)
            parameter_logs.append({'final' : 'sinc_kernel + jepg', 'jpeg_quality2': jpeg_p.item()})
        else:
            jpeg_p = out.new_zeros(out.size(0)).uniform_(*opt['jpeg_range2'])
            # make jpeg fixed: 30
            jpeg_p[:] = 25
            out = torch.clamp(out, 0, 1)
            out = jpeger_2(out, quality=jpeg_p)

            # mode = random.choice(['area', 'bilinear', 'bicubic'])
            mode = random.choice(['area', 'bilinear'])
            out = F.interpolate(out, size=(image.size(2) // opt['scale'], image.size(3) // opt['scale']), mode=mode)
            out = filter2D(out, sinc_kernel)
            parameter_logs.append({'final' : 'jepg + sinc_kernel', 'jpeg_quality2': jpeg_p.item()})

        logs.append(parameter_logs)
        images.append(torch.clamp((out * 255.0).round(), 0, 255) / 255.)

    return images


def generate_kernels(opt):
    kernel_range = [2 * v + 1 for v in range(3, 11)]
    # Generate kernel1
    kernel_size = random.choice(kernel_range)
    if np.random.uniform() < opt['sinc_prob']:
        if kernel_size < 13:
            omega_c = np.random.uniform(np.pi / 3, np.pi)
        else:
            omega_c = np.random.uniform(np.pi / 5, np.pi)
        kernel1 = dataset.circular_lowpass_kernel(omega_c, kernel_size, pad_to=False)
    else:
        kernel1 = dataset.random_mixed_kernels(
            opt['kernel_list'],
            opt['kernel_prob'],
            kernel_size,
            opt['blur_sigma'],
            opt['blur_sigma'], [-math.pi, math.pi],
            opt['betag_range'],
            opt['betap_range'],
            noise_range=None)
    # pad kernel
    pad_size = (21 - kernel_size) // 2
    kernel1 = np.pad(kernel1, ((pad_size, pad_size), (pad_size, pad_size)))
    kernel1 = torch.tensor(kernel1, dtype=torch.float32)

    # Generate kernel2
    kernel_size = random.choice(kernel_range)
    if np.random.uniform() < opt['sinc_prob2']:
        if kernel_size < 13:
            omega_c = np.random.uniform(np.pi / 3, np.pi)
        else:
            omega_c = np.random.uniform(np.pi / 5, np.pi)
        kernel2 = dataset.circular_lowpass_kernel(omega_c, kernel_size, pad_to=False)
    else:
        kernel2 = dataset.random_mixed_kernels(
            opt['kernel_list2'],
            opt['kernel_prob2'],
            kernel_size,
            opt['blur_sigma2'],
            opt['blur_sigma2'], [-math.pi, math.pi],
            opt['betag_range2'],
            opt['betap_range2'],
            noise_range=None)
    # pad kernel
    pad_size = (21 - kernel_size) // 2
    kernel2 = np.pad(kernel2, ((pad_size, pad_size), (pad_size, pad_size)))
    kernel2 = torch.tensor(kernel2, dtype=torch.float32)

    # Generate sinc_kernel
    sinc_kernel = None
    if np.random.uniform() < opt['final_sinc_prob']:
        kernel_size = random.choice(kernel_range)
        omega_c = np.random.uniform(np.pi / 3, np.pi)
        sinc_kernel = dataset.circular_lowpass_kernel(omega_c, kernel_size, pad_to=21)
        sinc_kernel = torch.tensor(sinc_kernel, dtype=torch.float32)
    else:
        sinc_kernel = torch.zeros(21, 21, dtype=torch.float32)  # If not using final sinc kernel, initialize with zeros
    return kernel1.cuda(), kernel2.cuda(), sinc_kernel.cuda()

def generate_degraded_images(image, opt):
    # Generate kernels
    kernel1, kernel2, sinc_kernel = generate_kernels(opt['datasets']['train'])
    image = image.to(torch.device('cuda'))

    # Define DiffJPEG and USMSharp instances
    with open('D:\\Projects\\Real-ESRGAN\\options\\train_realesrnet_x4plus.yml', mode='r') as f:
        opt1 = yaml.load(f, Loader=yaml.FullLoader)
        diff_jpeg_1 = DiffJPEG(opt1['jpeg_range']).cuda()
        diff_jpeg_2 = DiffJPEG(opt1['jpeg_range2']).cuda()
    usm_sharpener = USMSharp().cuda()

    # Generate degraded images using the previously defined function
    degraded_images = generate_second_order_degradation_images(image, kernel1, kernel2, sinc_kernel, opt, diff_jpeg_1, diff_jpeg_2, usm_sharpener)

    return degraded_images

# 示例用法：
# 假设高质量图像为 high_quality_image，参数配置为 opt
image_path = "D:\\OCT_result_ssim_kw32_epoch196\\data_crop_contrast\\094146octscan_1.png"
high_quality_image = cv2.imread(image_path)
high_quality_image = cv2.cvtColor(high_quality_image, cv2.COLOR_BGR2RGB)
image = torch.FloatTensor(high_quality_image.transpose((2, 0, 1)).astype(np.float32) / 255.).unsqueeze(0)
with open('D:\\Projects\\Real-ESRGAN\\options\\train_realesrnet_x4plus.yml', mode='r') as f:
    opt = yaml.load(f, Loader=yaml.FullLoader)
    degraded_images = generate_degraded_images(image.cuda(), opt)

# 保存
for i in range(10):
    # 获取第 i 张图片的参数
    print("第", i+1, "张图片的参数：")
    parameters = logs[i]
    params_str = ""
    for j in range(len(parameters)):
        params_str += str(parameters[j]) + " "
        params_str += "\n"
    print(params_str)
    # 将张量转换为numpy数组，并调整维度顺序
    numpy_image = degraded_images[i].cpu().numpy().squeeze().transpose((1, 2, 0))
    # 调整范围到0-255
    numpy_image = (numpy_image * 255).round().astype(np.uint8)
    cv2.imshow(f"{i}", numpy_image)

    cv2.waitKey(0)
    # 保存图像
    cv2.imwrite(os.path.join("D:\\Projects\\Real-ESRGAN\\test_result", "degraded_image_" + params_str + ".png"), numpy_image)

cv2.destroyAllWindows()




第 1 张图片的参数：
{'resize_scale': 0.9360624185953617, 'resize_mode': 'bilinear'} 
{'jpeg_quality': 57.535797119140625} 
{'second_blur: False'} 
{'resize_scale2': 0.9782343395300537, 'resize_mode2': 'bilinear'} 
{'final': 'sinc_kernel + jepg', 'jpeg_quality2': 2.0} 

第 2 张图片的参数：
{'resize_scale': 0.9076729615407557, 'resize_mode': 'area'} 
{'jpeg_quality': 84.99238586425781} 
{'second_blur: True'} 
{'resize_scale2': 0.9041552378178989, 'resize_mode2': 'bilinear'} 
{'final': 'sinc_kernel + jepg', 'jpeg_quality2': 2.0} 

第 3 张图片的参数：
{'resize_scale': 0.6431820576466964, 'resize_mode': 'bilinear'} 
{'jpeg_quality': 71.3431625366211} 
{'second_blur: True'} 
{'resize_scale2': 0.9388386034078336, 'resize_mode2': 'bilinear'} 
{'final': 'jepg + sinc_kernel', 'jpeg_quality2': 2.0} 

第 4 张图片的参数：
{'resize_scale': 0.6526190680853625, 'resize_mode': 'bilinear'} 
{'jpeg_quality': 56.37245178222656} 
{'second_blur: True'} 
{'resize_scale2': 1.1617777448555529, 'resize_mode2': 'bilinear'} 
{'final': 'sinc_ker