In [None]:
import torch
from torch.utils.data import Dataset
import json
import os
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from PIL import Image, ImageFilter
from torchvision.transforms import ToPILImage
import matplotlib.pyplot as plt
import torch
from torch import nn
import torchvision
import math
import torch.backends.cudnn as cudnn
import torch
from torch import nn
from torchvision.utils import make_grid
from torch.utils.tensorboard import SummaryWriter
import os
from math import log10

In [None]:

class SRDataset(Dataset):

    def __init__(self, data_path, crop_size, scaling_factor):
        """
        :参数 data_path: 图片文件夹路径
        :参数 crop_size: 高分辨率图像裁剪尺寸  （实际训练时不会用原图进行放大，而是截取原图的一个子块进行放大）
        :参数 scaling_factor: 放大比例
        """

        self.data_path=data_path
        self.crop_size = int(crop_size)
        self.scaling_factor = int(scaling_factor)
        self.images_path=[]

        # 如果是训练，则所有图像必须保持固定的分辨率以此保证能够整除放大比例
        # 如果是测试，则不需要对图像的长宽作限定

        # 读取图像路径
        for name in os.listdir(self.data_path):
            self.images_path.append(os.path.join(self.data_path,name))

        # 数据处理方式
        self.pre_trans=transforms.Compose([
                                # transforms.CenterCrop(self.crop_size),
                                transforms.RandomCrop(self.crop_size),
                                transforms.RandomHorizontalFlip(p=0.5),
                                transforms.RandomVerticalFlip(p=0.5),
                                # transforms.ColorJitter(brightness=0.3, contrast=0.3, hue=0.3),
                                ])

        self.input_transform = transforms.Compose([
                                transforms.Resize(self.crop_size//self.scaling_factor),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=[0.5],std=[0.5]),
                                ])

        self.target_transform = transforms.Compose([
                                transforms.ToTensor(),
                                transforms.Normalize(mean=[0.5],std=[0.5]),
                                ])


    def __getitem__(self, i):
        # 读取图像
        img = Image.open(self.images_path[i], mode='r')
        img = img.convert('RGB')
        img=self.pre_trans(img)

        lr_img = self.input_transform(img)
        hr_img = self.target_transform(img.copy())
        

        return lr_img, hr_img


    def __len__(self):
        return len(self.images_path)

In [None]:

# 是可以保持w，h不变的
class ConvolutionalBlock(nn.Module):
    """
    卷积模块,由卷积层, BN归一化层, 激活层构成.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, batch_norm=False, activation=None):
        """
        :参数 in_channels: 输入通道数
        :参数 out_channels: 输出通道数
        :参数 kernel_size: 核大小
        :参数 stride: 步长
        :参数 batch_norm: 是否包含BN层
        :参数 activation: 激活层类型; 如果没有则为None
        """
        super(ConvolutionalBlock, self).__init__()

        if activation is not None:
            activation = activation.lower()
            assert activation in {'prelu', 'leakyrelu', 'tanh'}

        # 层列表
        layers = list()

        # 1个卷积层
        layers.append(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride,
                      padding=kernel_size // 2))

        # 1个BN归一化层
        if batch_norm is True:
            layers.append(nn.BatchNorm2d(num_features=out_channels))

        # 1个激活层
        if activation == 'prelu':
            layers.append(nn.PReLU())
        elif activation == 'leakyrelu':
            layers.append(nn.LeakyReLU(0.2))
        elif activation == 'tanh':
            layers.append(nn.Tanh())

        # 合并层
        self.conv_block = nn.Sequential(*layers)

    def forward(self, input):
        output = self.conv_block(input)

        return output



# w,h放大
class SubPixelConvolutionalBlock(nn.Module):

    def __init__(self, kernel_size=3, n_channels=64, scaling_factor=2):
        super(SubPixelConvolutionalBlock, self).__init__()

        # 首先通过卷积将通道数扩展为 scaling factor^2 倍
        self.conv = nn.Conv2d(in_channels=n_channels, out_channels=n_channels * (scaling_factor ** 2),
                              kernel_size=kernel_size, padding=kernel_size // 2)

        # 进行像素清洗，合并相关通道数据 放大了图像
        self.pixel_shuffle = nn.PixelShuffle(upscale_factor=scaling_factor)
        # 最后添加激活层
        self.prelu = nn.PReLU()


    def forward(self, input):

        output = self.conv(input)
        output = self.pixel_shuffle(output)  
        output = self.prelu(output) 

        return output



class ResidualBlock(nn.Module):
    """
    残差模块, 包含两个卷积模块和一个跳连.
    """

    def __init__(self, kernel_size=3, n_channels=64):
        """
        :参数 kernel_size: 核大小
        :参数 n_channels: 输入和输出通道数（由于是ResNet网络，需要做跳连，因此输入和输出通道数是一致的）
        """
        super(ResidualBlock, self).__init__()

        # 第一个卷积块
        self.conv_block1 = ConvolutionalBlock(in_channels=n_channels, out_channels=n_channels, kernel_size=kernel_size,
                                              batch_norm=True, activation='PReLu')

        # 第二个卷积块
        self.conv_block2 = ConvolutionalBlock(in_channels=n_channels, out_channels=n_channels, kernel_size=kernel_size,
                                              batch_norm=True, activation=None)

    def forward(self, input):
        """
        前向传播.

        :参数 input: 输入图像集，张量表示，大小为 (N, n_channels, w, h)
        :返回: 输出图像集，张量表示，大小为 (N, n_channels, w, h)
        """
        residual = input  # (N, n_channels, w, h)
        output = self.conv_block1(input)  # (N, n_channels, w, h)
        output = self.conv_block2(output)  # (N, n_channels, w, h)
        output = output + residual  # (N, n_channels, w, h)

        return output


class SRResNet(nn.Module):

    def __init__(self, large_kernel_size=9, small_kernel_size=3, n_channels=64, n_blocks=16, scaling_factor=2):
        """
        :参数 large_kernel_size: 第一层卷积和最后一层卷积核大小
        :参数 small_kernel_size: 中间层卷积核大小
        :参数 n_channels: 中间层通道数
        :参数 n_blocks: 残差模块数
        :参数 scaling_factor: 放大比例
        """
        super(SRResNet, self).__init__()

        # 放大比例必须为 2、 4 或 8
        scaling_factor = int(scaling_factor)
        assert scaling_factor in {2, 4, 8}, "放大比例必须为 2、 4 或 8!"

        # 第一个卷积块
        self.conv_block1 = ConvolutionalBlock(in_channels=3, out_channels=n_channels, kernel_size=large_kernel_size,
                                              batch_norm=False, activation='PReLu')


        # 一系列残差模块, 每个残差模块包含一个跳连接
        self.residual_blocks = nn.Sequential(
            *[ResidualBlock(kernel_size=small_kernel_size, n_channels=n_channels) for i in range(n_blocks)])


        # 第二个卷积块
        self.conv_block2 = ConvolutionalBlock(in_channels=n_channels, out_channels=n_channels,
                                              kernel_size=small_kernel_size,
                                              batch_norm=True, activation=None)

        # 放大通过子像素卷积模块实现, 每个模块放大两倍 log2 2=1
        n_subpixel_convolution_blocks = int(math.log2(scaling_factor))
        self.subpixel_convolutional_blocks = nn.Sequential(
            *[SubPixelConvolutionalBlock(kernel_size=small_kernel_size, n_channels=n_channels, scaling_factor=2) for i
              in range(n_subpixel_convolution_blocks)])

        # 最后一个卷积模块
        self.conv_block3 = ConvolutionalBlock(in_channels=n_channels, out_channels=3, kernel_size=large_kernel_size,
                                              batch_norm=False, activation='Tanh')

    def forward(self, lr_imgs):
        """
        :参数 lr_imgs: 低分辨率输入图像集, 张量表示，大小为 (N, 3, w, h)
        :返回: 高分辨率输出图像集, 张量表示， 大小为 (N, 3, w * scaling factor, h * scaling factor)
        """
        output = self.conv_block1(lr_imgs)  # (16, 3, 24, 24)
        residual = output  # (16, 64, 24, 24)
        output = self.residual_blocks(output)  # (16, 64, 24, 24)
        output = self.conv_block2(output)  # (16, 64, 24, 24)
        output = output + residual  # (16, 64, 24, 24)
        output = self.subpixel_convolutional_blocks(output)  # (16, 64, 24 * 2, 24 * 2)
        sr_imgs = self.conv_block3(output)  # (16, 3, 24 * 2, 24 * 2)

        return sr_imgs


In [None]:
train_path='G:/DIV2K/DIV2K/DIV2K_train_HR'
test_path='../../SRCNN/SRCNN-WBQ/data/test'

crop_size = 224      # 高分辨率图像裁剪尺寸
scaling_factor = 2  # 放大比例

# 模型参数
large_kernel_size = 9   # 第一层卷积和最后一层卷积的核大小
small_kernel_size = 3   # 中间层卷积的核大小
n_channels = 64         # 中间层通道数
n_blocks = 16           # 残差模块数量

# 学习参数
checkpoint = '../model/rsresnet.pth'   # 预训练模型路径，如果不存在则为None
batch_size = 30    # 批大小
start_epoch = 1     # 轮数起始位置
epochs = 300        # 迭代轮数
workers = 1        # 工作线程数
lr = 1e-3           # 学习率

# 先前的psnr
pre_psnr=0

# 设备参数
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# print(device)

ngpu = 1

# cudnn.benchmark = True # 对卷积进行加速

# writer = SummaryWriter('../log') # 实时监控     使用命令 tensorboard --logdir runs  进行查看


if os.path.exists(checkpoint):
    model = torch.load(checkpoint).to(device)
    print('加载先前模型成功')
else:
    print('未加载原有模型训练')
    model = SRResNet(large_kernel_size=large_kernel_size,
                    small_kernel_size=small_kernel_size,
                    n_channels=n_channels,
                    n_blocks=n_blocks,
                    scaling_factor=scaling_factor)

# 初始化优化器
optimizer = torch.optim.Adam(params=filter(lambda p: p.requires_grad, model.parameters()),lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.9)

model = model.to(device)
criterion = nn.MSELoss().to(device)

train_dataset = SRDataset(train_path, crop_size, scaling_factor)
test_dataset = SRDataset(test_path, crop_size, scaling_factor)

train_loader = torch.utils.data.DataLoader(train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=workers,
    pin_memory=True) 

test_loader = torch.utils.data.DataLoader(test_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=workers,
    pin_memory=True)

    # 开始逐轮训练
for epoch in range(start_epoch, epochs+1):

    model.train()  # 训练模式：允许使用批样本归一化
    train_loss=0
    n_iter = len(train_loader)

    # 按批处理
    for i, (lr_imgs, hr_imgs) in enumerate(train_loader):
        lr_imgs = lr_imgs.to(device)
        hr_imgs = hr_imgs.to(device)

        sr_imgs = model(lr_imgs)
        loss = criterion(sr_imgs, hr_imgs)  
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss+=loss.item()

        # print('ga')

    epoch_loss_train=train_loss / n_iter

    # tensorboard
    # writer.add_scalar('SRResNet/MSE_Loss_train', epoch_loss_train, epoch)

    print(f"Epoch {epoch}. Training loss: {epoch_loss_train}")



    model.eval()  # 测试模式
    test_loss=0
    all_psnr = 0
    n_iter = len(test_loader)
    # print(n_iter)
    # print(len(test_dataset))
    with torch.no_grad():
        for i, (lr_imgs, hr_imgs) in enumerate(test_loader):
            lr_imgs = lr_imgs.to(device)
            hr_imgs = hr_imgs.to(device)

            sr_imgs = model(lr_imgs)
            loss = criterion(sr_imgs, hr_imgs)

            psnr = 10 * log10(1 / loss.item())
            all_psnr+=psnr
            test_loss+=loss.item()
    
    epoch_loss_test=test_loss/n_iter
    epoch_psnr=all_psnr / n_iter

    # tensorboard
    # writer.add_scalar('SRResNet/MSE_Loss_test', epoch_loss_test, epoch)
    # writer.add_scalar('SRResNet/test_psnr', epoch_psnr, epoch)

    print(f"Epoch {epoch}. Testing loss: {epoch_loss_test}")
    print(f"Average PSNR: {epoch_psnr} dB.")

    if epoch_psnr>pre_psnr:
        torch.save(model, checkpoint)
        pre_psnr=epoch_psnr
        print('模型更新成功')

    scheduler.step()
    # 打印当前学习率
    print('当前学习率为：',end=' ')
    print(optimizer.state_dict()['param_groups'][0]['lr'])

    print('*'*50)
# 训练结束关闭监控
# writer.close()




