# 生成式对抗网络(GAN)

In [1]:
import pandas as pd
import numpy as np
from pandas import DataFrame
from datetime import timedelta
from numpy import ndarray
from typing import Union, List, Dict
from sklearn.preprocessing import MinMaxScaler
from ultralytics import YOLO
import cv2
from PIL import Image
import os
import json
import re
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, Subset
import torch.nn.functional as F
from tqdm import tqdm  # 打印进度条
import math
import matplotlib.pyplot as plt
from torchvision import datasets, transforms, models
from torchvision.models import vgg16
import torchvision.utils as vutils
import seaborn as sns
from typing import List
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset
from sklearn.metrics import r2_score, mean_squared_error
import joblib
import warnings

warnings.filterwarnings("ignore")
plt.rcParams['font.sans-serif'] = ['SimHei']  # 显示中文标签
plt.rcParams['axes.unicode_minus'] = False
%matplotlib inline

## 基本概念

生成式对抗网络迫使生成图像与真实图像在统计上几乎无法区别，从而生成相当逼真的合成图像。GAN由一个生成器网络(generator)和一个判别式网络(discriminator)组成。判别器的训练目的是能够区分生成器的输出与来自训练集的真实图像，生成器的训练目的是欺骗判别器。生成器从未直接见过训练集中的图像，它所知道的关于数据的信息都来自于判别器。

生成器生成假数据，然后将生成的假数据和真数据都输入判别器，判别器要判断出哪些是真的哪些是假的。判别器第一次判别出来的肯定有很大的误差，然后我们根据误差来优化判别器。现在判别器水平提高了，生成器生成的数据很难再骗过判别器了，所以我们得反过来优化生成器，之后生成器水平提高了，然后反过来继续训练判别器，判别器水平又提高了，再反过来训练生成器，就这样循环往复，直到达到纳什均衡。

**生成网络的损失函数：**
$$L_G=H(1,D(G(z)))$$
上式中，$G$ 代表生成网络，$D $代表判别网络，$H$ 代表交叉熵，$z$ 是输入随机数据。$D(G(z))$是对生成数据的判断概率，1代表数据绝对真实，0代表数据绝对虚假。$H(1,D(G(z)))$代表判断结果与1的距离。显然生成网络想取得良好的效果，那就要做到，让判别器将生成数据判别为真数据（即$D(G(z))$与1的距离越小越好）。

**判别网络的损失函数：**
$$L_D=H(1,D(x))+H(0,D(G(z)))$$
上式中，$x$是真实数据，这里要注意的是，$H(1,D(x))$代表真实数据与1的距离，$H(0,D(G(z)))$代表生成数据与0的距离。显然，识别网络要想取得良好的效果，那么就要做到，在它眼里，真实数据就是真实数据，生成数据就是虚假数据（即真实数据与1的距离小，生成数据与0的距离小）。
 

**理想的损失趋势**
- 生成器的损失 (Generator Loss)：

    - 初期较高，随着训练的进行逐渐下降。
    - 稳定后维持在一个较低水平，但不是接近 0，因为生成器需要不断与判别器竞争。
- 判别器的损失 (Discriminator Loss)：

    - 初期较低，表示判别器能够轻松区分真实样本和生成样本。
    - 随着生成器的改进，判别器的损失逐渐上升，趋于约 0.5（随机猜测的水平）。
  
两者在一个理想的平衡点上达到动态稳定：生成器和判别器互相逼近彼此的最优性能。

**优化原理**：生成网络和判别网络有了损失函数，就可以基于各自的损失函数，利用误差反向传播（Backpropagation）(BP)反向传播算法和最优化方法(如梯度下降法)来实现参数的调整），不断提高生成网络和判别网络的性能（最终生成网络和判别网络的成熟状态就是学习到了合理的映射函数）。

### DCGAN

DCGAN（Deep Convolutional Generative Adversarial Network，深度卷积生成对抗网络）是一种生成对抗网络（GAN）的变体，由 Radford 等人在 2015 年提出，专注于利用卷积神经网络（CNN）改进 GAN 的生成能力。它是 GAN 在生成高质量图像任务上的重要发展。

**DCGAN 的特点**  
- 卷积神经网络架构

    - 在生成器和判别器中均使用卷积神经网络（CNN）架构，而非全连接网络。
    - 卷积操作有助于捕获局部特征，从而生成更加真实和细节丰富的图像。
- 批归一化（Batch Normalization）

    - 在生成器和判别器的每一层中都加入批归一化，这有助于稳定训练过程，并加快收敛速度。
- 不使用全连接层

    - DCGAN 避免使用全连接层，特别是在生成器中。这减少了参数量，使得模型更高效。
- ReLU 和 LeakyReLU 激活函数

    - 在生成器中，使用 ReLU（Rectified Linear Unit）作为主要激活函数，输出层除外（输出层使用 tanh 激活函数）。
    - 判别器使用 LeakyReLU 激活函数，使得负梯度部分也能得到一些梯度信号。
- 输出范围归一化

    - DCGAN 的生成器输出范围通过 tanh 激活函数归一化到 [−1,1]，使训练更加稳定。

**DCGAN 的结构**
- 生成器（Generator）

    - 输入：随机噪声向量 z（通常从标准正态分布中采样）。
    - 通过一系列的转置卷积（Transposed Convolution 或 Fractionally-Strided Convolution）逐步将低维向量扩展为高分辨率图像。
    - 最终输出目标分辨率的生成图像。
- 判别器（Discriminator）

    - 输入：真实图像或生成图像。
    - 通过卷积层提取图像特征，最终输出一个概率值，用于判断输入图像是真实的还是生成的。

**DCGAN 的缺点**
- 训练不稳定

    - GAN 类模型本身训练容易不稳定，生成器和判别器需要精细调节。
    - 容易出现模式崩塌（Mode Collapse）问题，即生成器总是生成相似的图像。
- 难以处理高分辨率图像

    - **虽然相比普通 GAN 有较大改进，但 DCGAN 在生成非常高分辨率图像时仍有困难**。
- 对超参数敏感

    - 网络深度、学习率、批量大小等超参数对生成效果影响显著。

## 深度卷积生成对抗网络

### 数据准备

In [2]:
# 加载图像并生成批次数据
def generator(data_path, batch_size):
    """
    读取图像，并生成批次数据

    参数说明
    ----------
    data_path : {str}
        图像文件夹地址
    batch_size : {int} 
        输入数据的批次大小，正整数

    返回值
    -------
    data_loader : {torch.utils.data.dataloader.DataLoader}
        数据加载器，[批次，目标，特征时间编码，目标时间编码]
    """
    # 定义图像变换操作
    transform = transforms.Compose([
        transforms.Resize((32, 32)),         # 调整图像大小
        transforms.ToTensor(),                  # 转换为张量
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) # 标准化
    ])
    
    dataset = datasets.CIFAR10(root=data_path, download=True, transform=transform)
    # 筛选标签为4的索引
    indices = [i for i, (_, label) in enumerate(dataset) if label == 4]
    # 创建只包含标签为4数据的子集
    dataset = Subset(dataset, indices)
    
    # dataset = datasets.ImageFolder(root=data_path, transform=transform)
    print(f"图像个数：{len(dataset)}, 尺寸：{dataset[0][0].shape}")
    
    # 数据加载器
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    # 查看一个批次数据
    images, labels = next(iter(dataloader))
    print(f"图像批次大小: {images.shape}")
    print(f"标签批次大小: {labels.shape}")
    print(f"图像批次个数: {len(dataloader)}")

    return dataloader

In [3]:
# 训练集
params1 = {
    "data_path": "../../../../../data/02.cv/cifar-10/",
    "batch_size": 128,
}
print("训练集：")
data_loader = generator(**params1)

训练集：
Files already downloaded and verified
图像个数：5000, 尺寸：torch.Size([3, 32, 32])
图像批次大小: torch.Size([128, 3, 32, 32])
标签批次大小: torch.Size([128])
图像批次个数: 40


### 模型定义

In [56]:
# 生成器（Generator）
class Generator(nn.Module):
    def __init__(self, latent_dim, channels):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            # 输入：潜在向量 latent_dim -> 特征图 (512, 4, 4)
            nn.ConvTranspose2d(latent_dim, 512, kernel_size=4, stride=1, padding=0),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),

            # 上采样：512 -> 256, (4, 4) -> (8, 8)
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),

            # 上采样：256 -> 128, (8, 8) -> (16, 16)
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),

            # 上采样：128 -> channels, (16, 16) -> (32, 32)
            nn.ConvTranspose2d(128, channels, kernel_size=4, stride=2, padding=1),
            # nn.BatchNorm2d(64),
            # nn.ReLU(inplace=True),

            # # 输出层：64 -> channels (图像通道数)，(32, 32)
            # nn.ConvTranspose2d(64, channels, kernel_size=3, stride=1, padding=1),
            nn.Tanh()  # 将输出限制在 [-1, 1]
        )

    def forward(self, z):
        # 输入 z 的形状为 (batch_size, latent_dim)
        # 需要将 z 变为 (batch_size, latent_dim, 1, 1) 才能输入转置卷积
        z = z.view(z.size(0), z.size(1), 1, 1)
        return self.model(z)

# 判别器（Discriminator）
class Discriminator(nn.Module):
    def __init__(self, channels):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            # 输入：图像通道数 channels -> 特征图 64, (32, 32) -> (16, 16)
            nn.Conv2d(channels, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),

            # 下采样：64 -> 128, (16, 16) -> (8, 8)
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),

            # 下采样：128 -> 256, (8, 8) -> (4, 4)
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),

            # 下采样：256 -> 1, (4, 4)
            nn.Conv2d(256, 1, kernel_size=4, stride=1, padding=0),
            # nn.BatchNorm2d(512),
            # nn.LeakyReLU(0.2, inplace=True),

            # # 输出层：512 -> 1
            # nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0),
            nn.Sigmoid()  # 输出范围在 [0, 1]
        )

    def forward(self, img):
        # 输入 img 的形状为 (batch_size, channels, height, width)
        return self.model(img).view(-1, 1)  # 展平为 (batch_size, 1)

### 模型训练

In [57]:
def train(train_args, generator_args, discriminator_args):
    # 参数配置
    generator_name = train_args['generator_name']
    discriminator_name = train_args['discriminator_name']
    data_loader = train_args['data_loader']
    n_epochs = train_args['n_epochs']
    learning_rate = train_args['learning_rate']
    lradj = train_args['lradj']
    model_path = train_args['model_path']
    image_path = train_args['image_path']
    verbose = train_args['verbose']
    plots = train_args['plots']
    device = train_args['device']
    clip_value = train_args['clip_value']
    loss = train_args['loss']
    latent_dim = generator_args['latent_dim']
    channels = generator_args['channels']
    noise_level = train_args.get('noise_level', 0.1)  # 默认噪声强度

    # 检查可用 device
    device = torch.device(device)

    # 添加噪声
    def add_noise(images, noise_level=0.1):
        noise = torch.randn_like(images) * noise_level
        noisy_images = images + noise
        return noisy_images.clamp(0.0, 1.0)  # 保证像素值在 [0, 1] 范围内

    # 调整学习率
    def adjust_learning_rate(optimizer, epoch, lradj, learning_rate, train_epochs):
        if lradj == 'type1':
            lr_adjust = {epoch: learning_rate * (0.5 ** ((epoch - 1) // 1))}
        elif lradj == 'type2':
            lr_adjust = {
                2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
                10: 5e-7, 15: 1e-7, 20: 5e-8
            }
        elif lradj == "cosine":
            lr_adjust = {epoch: learning_rate / 2 * (1 + math.cos(epoch / train_epochs * math.pi))}
        if epoch in lr_adjust.keys():
            lr = lr_adjust[epoch]
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr
            print('Updating learning rate to {}'.format(lr))

    # 设置保存模型路径
    if not os.path.exists(model_path):
        os.makedirs(model_path)

    # 定义感知损失
    class PerceptualLoss(nn.Module):
        def __init__(self, feature_layer_idx=9):
            super(PerceptualLoss, self).__init__()
            vgg = vgg16(pretrained=True).features[:feature_layer_idx].eval()
            for param in vgg.parameters():
                param.requires_grad = False
            self.vgg = vgg.to(device)

        def forward(self, x, y):
            x_features = self.vgg(x)
            y_features = self.vgg(y)
            loss = nn.functional.mse_loss(x_features, y_features)
            return loss
    perceptual_loss_fn = PerceptualLoss().to(device)

    # 定义模型和损失函数
    generator = generator_name(**generator_args).to(device)
    discriminator = discriminator_name(**discriminator_args).to(device)
    optimizer_G = optim.Adam(generator.parameters(), lr=learning_rate, betas=(0.5, 0.999))
    optimizer_D = optim.Adam(discriminator.parameters(), lr=learning_rate, betas=(0.5, 0.999))
    criterion = loss

    # 损失记录
    G_losses, D_losses = [], []

    for epoch in tqdm(range(n_epochs)):
        total_G_loss = 0
        total_D_loss = 0
        for i, (imgs, _) in enumerate(data_loader):
            imgs = imgs.to(device)
            noisy_real_imgs = add_noise(imgs, noise_level=noise_level)

            # 标签平滑
            real_labels = torch.full((imgs.size(0), 1), 0.9).to(device)
            fake_labels = torch.full((imgs.size(0), 1), 0.1).to(device)

            # 生成假图像并添加噪声
            z = torch.randn(imgs.size(0), latent_dim).to(device)
            fake_imgs = generator(z)
            noisy_fake_imgs = add_noise(fake_imgs, noise_level=noise_level)

            # 判别器损失
            real_loss = criterion(discriminator(noisy_real_imgs), real_labels) # 判别器对真实图像的损失
            fake_loss = criterion(discriminator(noisy_fake_imgs.detach()), fake_labels) # 判别器对假图像的损失
            loss_D = real_loss + fake_loss

            optimizer_D.zero_grad()
            loss_D.backward()
            # torch.nn.utils.clip_grad_norm_(discriminator.parameters(), clip_value)
            optimizer_D.step()

            # 生成器损失：判别器损失 + 感知损失
            adv_loss = criterion(discriminator(noisy_fake_imgs), real_labels)
            perc_loss = perceptual_loss_fn(noisy_fake_imgs, noisy_real_imgs)
            loss_G = adv_loss + 0.1 * perc_loss  # 调整感知损失的权重，生成器希望生成的图像被判别为真实

            optimizer_G.zero_grad()
            loss_G.backward()
            # torch.nn.utils.clip_grad_norm_(generator.parameters(), clip_value)
            optimizer_G.step()

            # 记录损失
            total_G_loss += loss_G.item()
            total_D_loss += loss_D.item()

        # 计算每个 epoch 的平均损失
        avg_G_loss = total_G_loss / len(data_loader)
        avg_D_loss = total_D_loss / len(data_loader)
        G_losses.append(avg_G_loss)
        D_losses.append(avg_D_loss)

        print(f'Epoch [{epoch}/{n_epochs}], Generator Loss: {avg_G_loss:.4f}, Discriminator Loss: {avg_D_loss:.4f}')

        # 保存生成的图像
        fake_imgs = fake_imgs.view(fake_imgs.size(0), channels, latent_dim, latent_dim)
        vutils.save_image(fake_imgs[:25], f"{image_path}/output_epoch_{epoch+1}.png", nrow=5, normalize=True)

        # 调整学习率
        adjust_learning_rate(optimizer_G, epoch + 1, lradj, learning_rate, n_epochs)
        adjust_learning_rate(optimizer_D, epoch + 1, lradj, learning_rate, n_epochs)

    # 绘制损失曲线
    def plot_loss(G_losses, D_losses):
        plt.figure(figsize=(10, 5))
        plt.style.use('seaborn-v0_8-paper')
        plt.grid(axis='y', linewidth=0.35)
        plt.plot(G_losses, linestyle='-', color='#11b3b6')
        plt.plot(D_losses, linestyle='-', color='#f14643')
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.title("Training Progress")
        plt.legend(["Generator", "Discriminator"])
        plt.show()

    if plots:
        plot_loss(G_losses, D_losses)

    return generator

In [None]:
# 构造参数字典
params2 = {
    "train_args": {
        "generator_name": Generator,
        "discriminator_name": Discriminator,
        "data_loader": data_loader,
        "n_epochs": 100,
        "patience": 50,
        "learning_rate": 0.0002,
        "lradj": 'cosine',
        "model_path": "../outputs/best_models/DCGAN",
        "image_path": "../outputs/images",
        "device": 'cuda',
        "loss": nn.BCELoss(),
        "verbose": True,
        "plots": True,
        "clip_value": 0.1,
    },
    "generator_args": {
        'latent_dim': 32,
        'channels': 3,
    },
    "discriminator_args": {
        'channels': 3,
    },
}
generator = train(**params2)

  1%|▊                                                                              | 1/100 [03:35<5:55:55, 215.71s/it]

Epoch [0/100], Generator Loss: 3.2411, Discriminator Loss: 0.9668
Updating learning rate to 0.00019995065603657316
Updating learning rate to 0.00019995065603657316


### 图像生成