In [2]:
# 8.1 用变分自编码器生成图像
# 8.1.2 变分自编码器
# 度量图像的相似度一般采用交叉熵（如nn.BCELoss），
# 度量两个分布的相似度一般采用KL散度（Kullback-Leibler divergence）。
# 这两个度量的和构成了整个模型的损失函数

# 定义重构损失函数及KL散度
import torch.nn.functional as F

reconst_loss = F.binary_cross_entropy(x_reconst, x, size_average=False)
kl_div = -0.5*torch.sum(1+log_var - mu.pow(2) - log_var.exp())
# 两者相加得到总损失
loss = reconst_loss + kl_div

NameError: name 'x_reconst' is not defined

In [1]:
# 8.1.3 用变分自编码器生成图像
# （1）导入必要的包
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torchvision.utils import save_image

In [2]:
# (2)定义一些超参数
image_size = 784
h_dim = 400
z_dim = 20
num_epochs = 30
batch_size = 128
learning_rate = 0.001

In [7]:
# (3)对数据集进行预处理，如转换为Tensor，将数据集转换成循环，可批量加载的数据集
# 下载MNIST训练集，如果需要下载，设置download=True将自动下载
dataset = torchvision.datasets.MNIST(root='data', train=True, transform=transforms.ToTensor(),download=False)
# 数据加载
data_loader = torch.utils.data.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

In [4]:
# (4)构建AVE模型，主要由Encode和Decode两部分组成
# 定义AVE模型
class VAE(nn.Module):
    def __init__(self, image_size=784, h_dim=400, z_dim=20):
        super(VAE, self).__init__()
        self.fc1 = nn.Linear(image_size, h_dim)
        self.fc2 = nn.Linear(h_dim, z_dim)
        self.fc3 = nn.Linear(h_dim, z_dim)
        self.fc4 = nn.Linear(z_dim, h_dim)
        self.fc5 = nn.Linear(h_dim, image_size)
        
    def encode(self, x):
        h = F.relu(self.fc1(x))
        return self.fc2(h), self.fc3(h)

    # 用mu，log_var生成一个潜在空间点z，mu，log_var为两个统计参数，假设分布能生成图像
    def reparameterize(self, mu, log_var):
        std = torch.exp(log_var/2)
        eps = torch.randn_like(std)
        return mu + eps*std
    
    def decode(self,z):
        h = F.relu(self.fc4(z))
        return F.sigmoid(self.fc5(h))
    
    def forward(self, x):
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        x_reconst = self.decode(z)
        return x_reconst, mu, log_var

In [6]:
# (5) 选择GPU及优化器
# 设置PyTorch在哪块GPU上运行
#torch.cuda.set_device(1)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = VAE().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [9]:
# 训练模型，同时保存原图像与随机生成的图像
with torch.no_grad():
    # 保存采样图像，即潜在向量z通过解码器生成新的图像
    z = torch.randn(batch_size, z_dim).to(device)
    out = model.decode(z).view(-1,1,28,28)
    save_image(out, os.path.join('./ave_samples', 'sampled-{}.png').format(epoch+1))
    # 保存重构图像，即原图像通过解码器生成的图像
    out, _, _ = model(x)
    x_concat = torch.cat([x.view(-1,1,28,28), out.view(-1,1,28,28)], dim=3)
    save_image(x_concat, os.path.join(sample_dir, 'reconst-{}.png'.format(epoch+1)))

NameError: name 'epoch' is not defined

In [10]:
# (7) 展示图像及重构图像
reconsPath = './ave_samples/reconst-30.png'
Image = mpimg.imread(reconsPath)
plt.imshow(Image) # 显示图像
plt.axis('off') # 不显示坐标轴
plt.show()

NameError: name 'mpimg' is not defined

In [11]:
# (8)显示由潜在空间点Z生成的新图像
genPath = './ave_samples/sampled-30.png'
Image = mpimg.imread(genPath)
plt.imshow(Image) # 显示图像
plt.axis('off') # 不显示坐标轴
plt.show()

NameError: name 'mpimg' is not defined

In [12]:
# 8.2 GAN简介
# 定义判别器对真图像的损失函数
outputs = D(images)
d_loss_real = criterion(outputs, real_labels)
real_score = outputs
# 定义判别器对假图像（即由潜在空间点生成的图像）的损失函数
z = torch.randn(batch_size, latent_size).to(device)
fake_images = G(z)
outputs = D(fake_images)
d_loss_fake = criterion(outputs, fake_labels)
fake_score = outputs
# 得到判别器总的损失函数
d_loss = d_loss_real + d_loss_fake

NameError: name 'D' is not defined

In [13]:
# 生成器的损失函数如何定义，才能使其越来越向真图像靠近？以真图像为标杆或标签即可
z = torch.randn(batch_size, latent_size).to(device)
fake_images = G(z)
outputs = D(fake_images)
g_loss = criterion(outputs, real_labels)

NameError: name 'latent_size' is not defined

In [None]:
# 8.3用GAN生成图像
# 8.3.1 判别器
# 定义判别器网络结构，这里使用LeakyReLU为激活函数，输出一个节点并经过Sigmoid后输出，用于真假二分类

# 构建判别器
D = nn.Sequential(
    nn.Linear(image_size, hidden_size),
    nn.LeakyReLU(0.2),
    nn.Linear(hidden_size, hidden_size),
    nn.LeakyReLU(0.2),
    nn.Linear(hidden_size, 1),
    nn.Sigmoid())

In [None]:
# 8.3.2 生成器
# 生成器与AVE的生成器类似，不同的地方是输出为nn.tanh，
# 使用nn.tanh将使数据分布在[-1,1]之间。其输入是潜在空间的向量z，输出维度与真图像相同

# 构建生成器，相当于AVE中的解码器
G = nn.Sequential(
    nn.Linear(latent_size, hidden_size),
    nn.ReLU(),
    nn.Linear(hidden_size, hidden_size),
    nn.ReLU(),
    nn.Linear(hidden_size, image_size),
    nn.Tanh())

In [None]:
# 8.3.3 训练模型
for epoch in range(num_epochs):
    for i, (images, _) in enumerate(data_loader):
        images = images.reshape(batch_size, -1).to(device)
        # 定义图像是真或假的标签
        real_labels = torch.ones(batch_size, 1).to(device)
        fake_labels = torch.zeros(batch_size, 1).to(device)
        # 训练判别器
        # 定义判别器对真图像的损失函数
        outputs = D(images)
        d_loss_real = criterion(outputs, real_labels)
        real_score = outputs
        # 定义判别器对假图像（即由潜在空间点生成的图像）的损失函数
        z = torch.randn(batch_size, latent_size).to(device)
        fake_images = G(z)
        outputs = D(fake_images)
        d_loss_fake = criterion(outputs, fake_labels)
        fake_score = outputs
        # 得到判别器总的损失函数
        d_loss = d_loss_real + d_loss_fake
        # 对生成器，判别器的梯度清零
        reset_grad()
        d_loss.backward()
        # 在计算模型中所有张量的梯度后,调用optimizer.step()会使优化器迭代它应该更新的所有参数(张量),
        # 并使用它们内部存储的grad来更新它们的值.
        """
        optimizer.step()需要放在每一个batch训练中，而不是epoch训练中，
        这是因为现在的mini-batch训练模式是假定每一个训练集就只有mini-batch这样大，
        因此实际上可以将每一次mini-batch看做是一次训练，一次训练更新一次参数空间，因而optimizer.step()放在这里。

        scheduler.step（）按照Pytorch的定义是用来更新优化器的学习率的，一般是按照epoch为单位进行更换，
        即多少个epoch后更换一次学习率，因而scheduler.step()放在epoch这个大循环下
        """
        if (i+1)%200==0:
            print('Epoch[{}/{}],step[{}/{}],d_loss: {:.4f},g_loss: {:.4f}, D(x): {:.2f}, D(G(z)): {:.2f}'
                 .format(epoch, num_epochs, i+1, total_step, d_loss.item(), g_loss.item(),real_score.mean().item(),
                         fake_score.mean().item()))
        # 保存真图像
        if(epoch+1) == 1:
            images = images.reshape(images.size(0), 1, 28, 28)
            save_image(denorm(images), os.path.join(sample_dir, 'real_images.png'))
            # 保存假图像
            fake_images = fake_images.reshape(fake_images.size(0), 1, 28, 28)
            save_image(denorm(fake_images), os.path.join(sample_dir, 'fake_image-{}.png'.format(epoch+1)))
        # 保存模型
        torch.save(G.state_dict(), 'G.ckpt')
        torch.save(D.state_dict(), 'D.ckpt')

In [None]:
# 8.3.4 可视化结果
reconsPath = './gan_samples/fake_images-200.png'
Image = mpimg.imread(reconsPath)
plt.imshow(Image) # 显示图片
plt.axis('off') # 不显示坐标轴
plt.show()

In [14]:
# 8.5 ConditionGAN
# 8.5.2 CGAN生成器
# 定义生成器（Generator）及前向传播函数
class Generator(nn.Module):
    def __init__(self):
        super().__init__()
        self.label_emp = nn.Embedding(10, 10)
        self.model = nn.Sequential(
            nn.Linear(110,256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512,1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(1024, 784),
            nn.Tanh()
        )
        """
        inplace=True的意思是进行原地操作，
        例如x=x+5，对x就是一个原地操作，y=x+5,x=y，完成了与x=x+5同样的功能但是不是原地操作，
        上面LeakyReLU中的inplace=True的含义是一样的，
        是对于Conv2d这样的上层网络传递下来的tensor直接进行修改，好处就是可以节省运算内存，不用多储存变量y
        """
    
    def forward(self, z, labels):
        z = z.view(z.size(0), 100)
        c = self.label_emb(labels)
        x = torch.cat([z,c], 1)
        out = self.model(x)
        return out.view(x.size(0), 28, 28)

In [None]:
# 8.5.3 CGAN判别器
# 定义判别器及前向传播函数
class Discriminator(nn.Module):
    def __init__(self):
        super().__init__()
        self.label_emb = nn.Embedding(10, 10)
        self.model = nn.Sequential(
            nn.Linear(794,1024),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4),
            nn.Linear(1024, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512,256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Dropout(0.4)
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
    
    def forward(self, x, labels):
        x = x.view(x.size(0), 784)
        c = self.label_emb(labels)
        x = torch.cat([x,c], 1)
        out = self.model(x)
        return out.squeeze()

In [None]:
# 8.5.4 CGAN损失函数
# 定义判别器对真实图像的损失函数
real_validity = D(images, labels)
d_loss_real = criterion(real_validity, real_labels)
# 定义判别器对假图像（即由潜在空间点生成的图像）的损失函数
z = torch.randn(batch_size, 100).to(device)
fake_labels = torch.randint(0,10,(batch_size,)).to(device)
fake_images = G(z, fake_labels)
fake_validity = D(fake_images, fake_labels)
d_loss_fake = criterion(fake_validity, torch.zeros(batch_size).to(device))
# CGAN总的损失值
d_loss = d_loss_real + d_loss_fake

In [None]:
# 8.5.5 CGAN可视化
# 利用网格（10×10）的形式显示指定条件下生成的图像
from torchvision.utils import make_grid
z = torch.randn(100, 100).to(device)
labels = torch.LongTensor([i for i in range(10) for _ in range(10)]).to(device)
images = G(z, labels).unsqueeze(1)
grid = make_grid(images, nrow=10, normalize=True)
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(grid.permute(1,2,0).detach().cpu.numpy(), cmap='binary')
ax.axis('off')

In [None]:
# 8.5.6 查看指定标签的数据
# 可视化指定单个数字条件下生成的数字
def generate_digit(generator, digit):
    z = torch.randn(1,100).to(device)
    label = torch.LongTensor([digit]).to(device)
    img = generator(z, label).detach().cpu()
    img = 0.5*img+0.5
    return transforms.ToPILImage()(img)

generate_digit(G, 8)

In [None]:
# 8.5.7 可视化损失值
# 记录判别器、生成器的损失值代码
writer.add_scalars('scalars', {'g_loss':g_loss, 'd_loss':d_loss}, step)

In [None]:
# 8.6 DCGAN
# 使用卷积层的判别器及使用转置卷积的生成器的一个具体代码

# (1)使用卷积层，批规范层的判别器
class Discrimibator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            # 输入大致为（nc）*64*64，nc表示通道数
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)
            nn.LeakyReLU(0.2, inplace=True),
            # ndf 表示判别器特征图的大小
            nn.Conv2d(ndf, ndf*2, 4, 2, 1, bias=False),
            nn.BatchNorma2d(ndf*2),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf*2, ndf*4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf*4),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf*4, ndf*8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf*8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(ndf*8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )
    
    def forward(self, input):
        return self.main(input)

In [None]:
# (2)使用转置卷积，批规范层的生成器
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            # 输入Z， nz表示Z的大小
            nn.ConvTranspose2d(nz, ngf*8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf*8),
            nn.ReLU(True),
            # ngf为生成器特征图大小
            nn.ConvTranspose2d(ngf*8, ngf*4, 4, 2, 1, bias=False)
            nn.BatchNorm2d(ngf*4),
            nn.ReLU(True),
            # state size. (ngf*4)*8*8
            nn.ConvTranspose2d(ngf*4, ngf*2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf*2),
            nn.ReLU(True),
            # state size. (ngf*2)*16*16
            nn.ConvTranspose2d(ngf*2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),
            # nc为通道数
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh()
        )
    def forward(self, input):
        return self.main(input)