In [1]:
# @Time    : 2022/9/25
# @Function: 用pytorch实现一个最简单的GAN，用MNIST数据集生成新图片

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter
import os
import shutil
from tqdm import tqdm


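# Discriminator: estimates the probability that an image comes from the real dataset. It outputs a value between 0 and 1; the larger the value, the more likely the image is real.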
class Discriminator(nn.Module):
    """Small MLP that scores a flattened image with a single value in (0, 1).

    Args:
        img_dim: Number of input features, i.e. the length of one flattened image.
    """

    def __init__(self, img_dim):
        super().__init__()
        layers = [
            nn.Linear(img_dim, 128),
            nn.LeakyReLU(negative_slope=0.01),
            nn.Linear(in_features=128, out_features=1),
            nn.Sigmoid(),  # squashes the raw score into the 0-1 range
        ]
        self.disc = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` (last dimension of size ``img_dim``) through the network."""
        scores = self.disc(x)
        return scores


# 生成器,用随机噪声生成图片
class Generator(nn.Module):
    """Small MLP that turns a noise vector into a flattened image.

    Args:
        noise_dim: Length of the input noise vector.
        img_dim: Number of values in one generated (flattened) image.
    """

    def __init__(self, noise_dim, img_dim):
        super().__init__()
        layers = [
            nn.Linear(in_features=noise_dim, out_features=256),
            nn.LeakyReLU(negative_slope=0.01),
            nn.Linear(in_features=256, out_features=img_dim),
            # The training images are normalized to [-1, 1], so Tanh keeps the
            # generated values in that same range.
            nn.Tanh(),
        ]
        self.gen = nn.Sequential(*layers)

    def forward(self, x):
        """Map noise ``x`` (last dimension of size ``noise_dim``) to generated values."""
        generated = self.gen(x)
        return generated


if __name__ == '__main__':
    # ---- configuration -------------------------------------------------
    device = "cuda" if torch.cuda.is_available() else "cpu"
    lr = 3e-4
    noise_dim = 50  # length of the noise vector fed to the generator
    image_dim = 28 * 28 * 1  # 784 values per flattened image
    batch_size = 32
    num_epochs = 200

    # ---- data ----------------------------------------------------------
    # The pipeline gets its own name: rebinding `transforms` would hide the
    # torchvision module and make this cell fail when it is run a second time.
    # Normalize takes one mean/std per channel as a sequence, hence the 1-tuples.
    mnist_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    dataset = datasets.MNIST(root="dataset/", transform=mnist_transform, download=True)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    # Fixed noise so that the logged samples are comparable across epochs.
    fixed_noise = torch.randn((batch_size, noise_dim)).to(device)

    # ---- models, optimizers, loss ---------------------------------------
    D = Discriminator(image_dim).to(device)
    G = Generator(noise_dim, image_dim).to(device)
    opt_disc = optim.Adam(D.parameters(), lr=lr)
    opt_gen = optim.Adam(G.parameters(), lr=lr)
    criterion = nn.BCELoss()  # binary cross-entropy

    # ---- logging: start from an empty log directory ----------------------
    log_dir = "test-record"
    if os.path.exists(log_dir):
        shutil.rmtree(log_dir)
    writer = SummaryWriter(log_dir)

    try:
        for epoch in tqdm(range(num_epochs), desc='epochs'):
            # The class labels are ignored: a GAN only needs the images.
            for batch_idx, (img, _) in enumerate(loader):
                img = img.view(-1, image_dim).to(device)
                # The last batch can be smaller; keep the hyperparameter
                # `batch_size` untouched by using a separate name.
                cur_batch_size = img.shape[0]

                # Discriminator step: max log(D(x)) + log(1 - D(G(z)))
                noise = torch.randn(cur_batch_size, noise_dim).to(device)
                fake_img = G(noise)
                # detach(): this step must not back-propagate into G, which
                # also removes the need for retain_graph=True.
                disc_fake = D(fake_img.detach())
                lossD_fake = criterion(disc_fake, torch.zeros_like(disc_fake))  # fakes vs. label 0
                disc_real = D(img)
                lossD_real = criterion(disc_real, torch.ones_like(disc_real))  # reals vs. label 1
                lossD = (lossD_real + lossD_fake) / 2

                D.zero_grad()
                lossD.backward()
                opt_disc.step()

                # Generator step: min log(1 - D(G(z))) <-> max log(D(G(z)))
                # Only opt_gen steps here, so D's weights stay fixed.
                output = D(fake_img)
                lossG = criterion(output, torch.ones_like(output))
                G.zero_grad()
                lossG.backward()
                opt_gen.step()

                # Log once per epoch, on the first batch.
                if batch_idx == 0:
                    with torch.no_grad():
                        # Same noise every epoch, to show how the generator evolves.
                        sample_fake = G(fixed_noise).reshape(-1, 1, 28, 28)
                        sample_real = img.reshape(-1, 1, 28, 28)

                        # make_grid tiles a batch of images into a single image.
                        img_grid_fake = torchvision.utils.make_grid(sample_fake, normalize=True)
                        img_grid_real = torchvision.utils.make_grid(sample_real, normalize=True)

                        writer.add_image("Fake Images", img_grid_fake, global_step=epoch)
                        writer.add_image("Real Images", img_grid_real, global_step=epoch)
                        writer.add_scalar(tag="lossD", scalar_value=lossD.item(), global_step=epoch)
                        writer.add_scalar(tag="lossG", scalar_value=lossG.item(), global_step=epoch)
                        # Flush rather than close, so the writer stays open for later epochs.
                        writer.flush()
    finally:
        # Close exactly once, even if training is interrupted.
        writer.close()


Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to dataset/MNIST\raw\train-images-idx3-ubyte.gz


 90%|████████▉ | 8911504/9912422 [00:02<00:00, 3181132.56it/s]





RuntimeError: File not found or corrupted.

In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# 定义生成器模型
class Generator(nn.Module):
    """Three-layer fully connected generator.

    Note: this rebinds the name ``Generator`` that the first cell of the
    notebook also defines.

    Args:
        input_size: Length of the input noise vector.
        hidden_size: Width of both hidden layers.
        output_size: Number of values produced per sample.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        self.fc3 = nn.Linear(in_features=hidden_size, out_features=output_size)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Apply two ReLU hidden layers, then a tanh-bounded output layer."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.activation(layer(hidden))
        return torch.tanh(self.fc3(hidden))

# 定义判别器模型
class Discriminator(nn.Module):
    """Three-layer fully connected discriminator with a sigmoid output.

    Note: this rebinds the name ``Discriminator`` that the first cell of the
    notebook also defines.

    Args:
        input_size: Number of features in each sample to be judged.
        hidden_size: Width of both hidden layers.
        output_size: Number of output units.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_size, out_features=hidden_size)
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        self.fc3 = nn.Linear(in_features=hidden_size, out_features=output_size)
        self.activation = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Apply two ReLU hidden layers, then squash the output into (0, 1)."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.activation(layer(hidden))
        return self.sigmoid(self.fc3(hidden))

# Hyperparameters
learning_rate = 0.0002
batch_size = 64
num_epochs = 2000
input_size = 100   # length of the noise vector
hidden_size = 128  # width of the hidden layers
# Width of the discriminator's verdict. Below it is also used as the generator's
# output width and as the discriminator's input width.
output_size = 1

# Build the two networks (generator first), the loss, and one Adam optimizer per network.
G = Generator(input_size=input_size, hidden_size=hidden_size, output_size=output_size)
D = Discriminator(input_size=output_size, hidden_size=hidden_size, output_size=output_size)
criterion = nn.BCELoss()
optimizer_G = torch.optim.Adam(params=G.parameters(), lr=learning_rate)
optimizer_D = torch.optim.Adam(params=D.parameters(), lr=learning_rate)

# 数据集，这里使用正态分布做噪声向量
def sample_noise(batch_size: int, input_size: int) -> torch.Tensor:
    """Draw a ``(batch_size, input_size)`` tensor of standard-normal noise."""
    shape = (batch_size, input_size)
    return torch.randn(shape)

# Train the GAN.
# The "real" samples in this toy setup are a constant tensor of ones, so the
# tensor is built once instead of on every step.
real_data = torch.ones(batch_size, output_size)
# The loop runs `batch_size` update steps per epoch; give that count its own name.
steps_per_epoch = batch_size

for epoch in tqdm(range(num_epochs), desc='epoches'):
    for _ in range(steps_per_epoch):
        # ---- discriminator step ----
        D.zero_grad()

        # Loss on real samples (target 1).
        real_outputs = D(real_data)
        d_loss_real = criterion(real_outputs, torch.ones_like(real_outputs))

        # Loss on generated samples (target 0). detach() keeps this step from
        # back-propagating into the generator.
        z = sample_noise(batch_size, input_size)
        fake_inputs = G(z)
        fake_outputs = D(fake_inputs.detach())
        d_loss_fake = criterion(fake_outputs, torch.zeros_like(fake_outputs))

        d_loss = d_loss_real + d_loss_fake
        d_loss.backward()
        optimizer_D.step()

        # ---- generator step ----
        G.zero_grad()
        z = sample_noise(batch_size, input_size)
        fake_inputs = G(z)
        fake_outputs = D(fake_inputs)

        # The generator wants its samples to be judged as real (target 1).
        g_loss = criterion(fake_outputs, torch.ones_like(fake_outputs))
        g_loss.backward()
        optimizer_G.step()

    # Report the losses and visualize generated samples every 500 epochs.
    if (epoch+1) % 500 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Discriminator Loss: {d_loss.item():.4f}, Generator Loss: {g_loss.item():.4f}")

        with torch.no_grad():
            samples = G(sample_noise(16, input_size)).numpy()

        # Each sample has `output_size` values. A fixed reshape to (28, 28) raises
        # ValueError unless that is exactly 784 (with output_size = 1 it always
        # does), so pick the plot from the actual sample width.
        values_per_sample = samples.shape[1]
        side = int(np.sqrt(values_per_sample))
        if side > 1 and side * side == values_per_sample:
            # Square samples: show them as a 4x4 grid of grayscale images.
            fig, axs = plt.subplots(4, 4, figsize=(4, 4))
            for row in range(4):
                for col in range(4):
                    axs[row, col].imshow(samples[row * 4 + col].reshape(side, side), cmap='gray')
                    axs[row, col].axis('off')
        else:
            # Non-image samples (e.g. scalars): show the distribution of generated values.
            fig, ax = plt.subplots(figsize=(4, 4))
            ax.hist(samples.ravel(), bins=20)
            ax.set(title=f"Generated values after epoch {epoch+1}",
                   xlabel="generator output", ylabel="count")
        plt.show()

In [3]:
import torch.nn as nn
from torchvision import transforms
from torchvision import datasets
from torchvision.utils import  save_image
import torch.autograd
from torch.autograd import Variable
import os

In [5]:
# Create the folder that the image grids are saved into.
# exist_ok=True makes this a single call that is safe to repeat, instead of a
# separate "check, then create" pair that can fail if the folder appears in between.
os.makedirs('./img', exist_ok=True)

In [6]:
def to_img(x):
    """Convert a tensor of values into a batch of 1x28x28 images in [0, 1].

    Args:
        x: Tensor whose number of elements is a multiple of 784 (28 * 28).

    Returns:
        The values mapped through ``0.5 * (x + 1)``, limited to [0, 1], and
        viewed with shape ``(-1, 1, 28, 28)``.
    """
    rescaled = x.add(1).mul(0.5)            # 0.5 * (x + 1): [-1, 1] -> [0, 1]
    bounded = rescaled.clamp(min=0, max=1)  # anything outside [0, 1] is cut off
    # view() reinterprets the shape without copying; -1 lets the batch size be inferred.
    return bounded.view(-1, 1, 28, 28)

In [7]:
z_dimension = 100  # length of the noise vector
num_epoch = 100    # number of passes over the dataset
batch_size = 128   # images per batch

In [8]:
# Image preprocessing: convert to a tensor, then normalize with mean 0.5 / std 0.5.
# This transform is applied to MNIST, whose images have a single channel, so
# Normalize gets one mean and one std. The previous three-value tuples describe
# RGB input and do not match a one-channel tensor.
img_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,))
])

In [10]:
# MNIST training split; the files are downloaded into ./data/mnist/ when missing.
mnist = datasets.MNIST(
    root='./data/mnist/',
    train=True,
    transform=img_transform,
    download=True,
)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./data/mnist/MNIST\raw\train-images-idx3-ubyte.gz


 89%|████████▉ | 8864123/9912422 [00:00<00:00, 16204655.20it/s]







RuntimeError: File not found or corrupted.

In [None]:
# Data loader: yields shuffled batches of `batch_size` samples from the dataset.
dataloader = torch.utils.data.DataLoader(mnist, batch_size=batch_size, shuffle=True)

In [None]:
#将图片28x28展开成784，然后通过多层感知器，中间经过斜率设置为0.2的LeakyReLU激活函数，
# 最后接sigmoid激活函数得到一个0到1之间的概率进行二分类。
class discriminator(nn.Module):
    """MLP that maps a flattened 784-value image to a single value in (0, 1)."""

    def __init__(self):
        super().__init__()
        layers = [
            nn.Linear(in_features=784, out_features=256),
            nn.LeakyReLU(negative_slope=0.2),  # non-linearity
            nn.Linear(in_features=256, out_features=256),
            nn.LeakyReLU(negative_slope=0.2),
            nn.Linear(in_features=256, out_features=1),
            # Sigmoid maps any real number into (0, 1), so the output can be read
            # as a probability for this two-class problem. (A multi-class problem
            # would use softmax instead.)
            nn.Sigmoid(),
        ]
        self.dis = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network's output for a batch of flattened images ``x``."""
        return self.dis(x)

In [None]:
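# Generator: takes a 100-dimensional noise vector (the training loop samples it from a
# standard normal distribution) and maps it to 256 dimensions with a linear layer followed
# by ReLU. A second 256-wide linear layer and ReLU follow, then a final linear layer
# expands the result to 784 values. The closing Tanh keeps the generated image values
# within -1 to 1.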
class generator(nn.Module):
    """MLP that maps a 100-value noise vector to a flattened 784-value image."""

    def __init__(self):
        super().__init__()
        layers = [
            nn.Linear(in_features=100, out_features=256),  # noise -> 256 features
            nn.ReLU(inplace=True),
            nn.Linear(in_features=256, out_features=256),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=256, out_features=784),  # 256 features -> image values
            nn.Tanh(),  # keeps the generated values within [-1, 1]
        ]
        self.gen = nn.Sequential(*layers)

    def forward(self, x):
        """Return the generated values for a batch of noise vectors ``x``."""
        return self.gen(x)

In [None]:
# Instantiate both networks (discriminator first) and move them to the GPU when one is available.
D, G = discriminator(), generator()
if torch.cuda.is_available():
    D, G = D.cuda(), G.cuda()

In [None]:
# ---- Loss and optimizers --------------------------------------------------
# Discriminator training has two parts: real images should be judged as real,
# and generated images should be judged as fake.
#
# Both parts use binary cross-entropy as the loss. Each network gets its own
# Adam optimizer with a learning rate of 0.0003.
criterion = nn.BCELoss()  # binary cross-entropy on a single probability output
g_optimizer = torch.optim.Adam(params=G.parameters(), lr=3e-4)
d_optimizer = torch.optim.Adam(params=D.parameters(), lr=3e-4)

In [None]:
# ---- Training loop: alternate one discriminator step and one generator step ----

# Put every batch on the device the models live on (they may have been moved to the GPU).
train_device = next(D.parameters()).device

for epoch in range(num_epoch):
    for i, (img, _) in enumerate(dataloader):
        num_img = img.size(0)

        # ============================ discriminator step ============================
        # Flatten each 28x28 image into a 784-value row.
        real_img = img.view(num_img, -1).to(train_device)
        # Labels are shaped (num_img, 1) to match the discriminator's output;
        # BCELoss expects input and target to have the same shape.
        real_label = torch.ones(num_img, 1, device=train_device)   # real images -> 1
        fake_label = torch.zeros(num_img, 1, device=train_device)  # generated images -> 0

        # Loss on real images.
        real_out = D(real_img)
        d_loss_real = criterion(real_out, real_label)
        real_scores = real_out  # discriminator output on real images (closer to 1 is better)

        # Loss on generated images. detach() keeps this step from
        # back-propagating into the generator.
        z = torch.randn(num_img, z_dimension, device=train_device)
        fake_img = G(z)
        fake_out = D(fake_img.detach())
        d_loss_fake = criterion(fake_out, fake_label)
        fake_scores = fake_out  # discriminator output on generated images (closer to 0 is better)

        # Total discriminator loss and update.
        d_loss = d_loss_real + d_loss_fake
        d_optimizer.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        # ============================== generator step ==============================
        # The generator wants its images to be judged as real, so the
        # discriminator's verdict on fresh fakes is compared against the "real"
        # label. Only g_optimizer steps here, so only the generator's parameters
        # change; the discriminator acts as a fixed critic.
        z = torch.randn(num_img, z_dimension, device=train_device)
        fake_img = G(z)
        output = D(fake_img)
        g_loss = criterion(output, real_label)

        g_optimizer.zero_grad()
        g_loss.backward()
        g_optimizer.step()

        # Periodic progress report: both losses plus the mean discriminator
        # output on the real and on the generated batch.
        if (i+1)%100 == 0:
            print('Epoch[{}/{}],d_loss:{:.6f},g_loss:{:.6f} '
                  'D real: {:.6f},D fake: {:.6f}'.format(
                epoch,num_epoch,d_loss.item(),g_loss.item(),
                real_scores.mean().item(),fake_scores.mean().item()
            ))

    # Save image grids once per epoch, using the last batch of the epoch. Writing
    # them on every batch only overwrote the same files again and again.
    if epoch == 0:
        real_images = to_img(real_img.detach().cpu())
        save_image(real_images, './img/real_images.png')

    # Save the generator's output; the real batch was being saved under this name.
    fake_images = to_img(fake_img.detach().cpu())
    save_image(fake_images, './img/fake_images-{}.png'.format(epoch+1))

In [None]:
# Persist the learned parameters of both networks.
torch.save(obj=G.state_dict(), f='./generator.pth')
torch.save(obj=D.state_dict(), f='./discriminator.pth')

In [1]:
import logging

# Configure the root logger at INFO level with a timestamped format, then emit
# one record per level to show which ones pass the threshold.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

logger.log(logging.INFO, 'This is a log info')
logger.log(logging.DEBUG, 'Debugging')  # DEBUG is below the configured INFO level
logger.log(logging.WARNING, 'Warning exists')
logger.log(logging.INFO, 'Finish')

2023-06-25 15:05:11,996 - __main__ - INFO - This is a log info
2023-06-25 15:05:11,997 - __main__ - INFO - Finish


In [None]:
import 