# Homework 2：

⾃动编码器模型（AE）；变分⾃动编码器模型（VAE）；条件变分⾃动编码器模型（Conditional VAE）

并将模型应⽤到MNIST 数据上对⼿写体数字进⾏降维、聚类、⽣成和指定⽣成⼯作。

## 1. FC layer实现AE、VAE、CVAE & 画图展示

### 1.1 AE、VAE、CVAE架构搭建
 
#### 1.1.1 通用部件Encoder & Decoder的实现


In [73]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Encoder实现

condition_size = 10 #10个数字，用一个10维向量作为CVAE的条件（0-9十个数字，依label给出one-hot编码）


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


def onehot(label): # CVAE为了要和input（1*28*28）可以concatenate，需要扩展成3维的(后续还有batch_size就变成三维)
    label = label.unsqueeze(1)
    label = label.unsqueeze(2)
    #print(label.size(0))
    exp_vec = torch.zeros(label.size(0), 1, condition_size).to(device)
    # 【新创建的，和已在cuda上tensor无关，不知道应该放在哪里！！！】
    exp_vec.scatter_(2, label, 1) # 根据label，对exp_vec进行操作，填写到维度为1的对应位置
    return exp_vec

class Encoder(nn.Module):
    def __init__(self, x_dim, hidden_size, latent_size, arch="AE", **kwargs) -> None:
        super(Encoder, self).__init__()
        self.mu = nn.Sequential(nn.Linear(x_dim, hidden_size), nn.ReLU(), nn.Linear(hidden_size, latent_size),)
        self.arch = arch
        if self.arch != "AE":  # 若encoder返回的是均值与标准差(VAE、CVAE), 需要额外生成mu和sigma（两架构可以一样）
            self.sigma = nn.Sequential(nn.Linear(x_dim, hidden_size), nn.ReLU(), nn.Linear(hidden_size, latent_size),)

        if self.arch == "CVAE":
            self.mu = nn.Sequential(nn.Linear(x_dim+condition_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, latent_size),)
            self.sigma = nn.Sequential(nn.Linear(x_dim+condition_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, latent_size),)

    def forward(self, xs, label=None):# 只有CVAE才会传入label，xs是输入的图片，784*1*batchsize
        # 实现编码器的forward过程，arch架构的不同取值意味着我们需要不同输出的encoder

        if self.arch == "AE":
            output = self.mu(xs) # AE的输出，只有一个code
        elif self.arch == "VAE":# VAE的输出，有mu和sigma两个输出
            mu = self.mu(xs)
            sigma = self.sigma(xs)
            output = (mu,sigma)
        else: # CVAE的输出，有mu和sigma两个，但是输入还有希望生成数据的label！先扩展、再concatenate起来
            label = onehot(label)
            # print("dim_image",xs.shape)
            # print("label_image",label.shape)
            xs= torch.cat((xs, label), dim=-1)
            # xs = xs.to(device)     # Bug：不用等号赋值，就没办法移动到cuda上！
            mu = self.mu(xs)
            sigma = self.sigma(xs)
            output = (mu,sigma)
        return output

In [74]:

# Decoder的实现
# 不同的自编码器可能需要不同的输入, 借助**kwargs来处理.
# Review：**收集所有未匹配的关键字参数组成一个dict对象，局部变量kwargs指向此dict对象
# 对于AE的情况, 只需要zs作为输入即可,
# 对于VAE,CVAE的情况, 我们可能需要code服从分布的均值与方差(对于CVAE, 还需要类别的指示变量)

# 在实现遇到迷茫的时候, 不妨考虑具体的自编码器类需要什么样的encoder, decoder.
class Decoder(nn.Module):
    def __init__(self, x_dim, hidden_size, latent_size, arch="AE", **kwargs) -> None:
        super(Decoder, self).__init__()
        self.arch = arch
        if self.arch == "AE":
            self.decoder = nn.Sequential(nn.Linear(latent_size, hidden_size), nn.ReLU(), 
            nn.Linear(hidden_size, x_dim),)
        elif self.arch == "VAE": # 需要传入encoder的均值和方差 
            self.decoder = nn.Sequential(nn.Linear(latent_size, hidden_size), nn.ReLU(), 
            nn.Linear(hidden_size, x_dim),)
        elif self.arch == "CVAE":
            self.decoder = nn.Sequential(nn.Linear(latent_size+condition_size, hidden_size), nn.ReLU(), 
            nn.Linear(hidden_size, x_dim),)  

    def forward(self, zs,label=None, **otherinputs): # 【【此时的zs已经是经过处理过的，已经是N(mu,sigma^2)中sample出来的值了】】
        # 实现decoder的decode部分,此时就假定直接输出图片了，不再生成mu和sigma再sample
        if self.arch == "AE":
            output = self.decoder(zs)
        elif self.arch == "VAE":
            output = self.decoder(zs)
        elif self.arch == "CVAE":
            label = onehot(label)
            zs = torch.cat((zs, label), dim=-1)
            output = self.decoder(zs)
        return output

#### 1.1.2 基于通用部件的AE、VAE、CVAE实现

In [75]:
class AE(nn.Module):
    def __init__(self, encoder, decoder, **kwargs) -> None:
        super(AE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

class VAE(nn.Module): # 要同时返回中间参数mu和sigma的，用于后续计算kl loss
    def __init__(self,encoder,decoder,**kwargs) -> None:
        super(VAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self,x):
        mu,sigma = self.encoder(x) # 【encoder的forward函数返回了两个参数！！！】
        # epsilon = torch.randn(mu) # 生成了很多个正态分布！！！mu是一个latent_size大小的，每一个分量是一个正态参数
        # z = mu + epsilon*sigma
        # 【更优！加入随机噪声！！！对encoder输出进行简单处理，且保证是正数！】
        std = torch.exp(0.5 * sigma)
        epsilon = torch.randn_like(std)
        z = epsilon * std + mu
        return mu, sigma, self.decoder(z)

    def generate(self,new_code):
        new_code = new_code.to(device) 
        return self.decoder(new_code) # 【假设此时new_code被用户经过正态分布的处理了！】


class CVAE(nn.Module):
    def __init__(self,encoder,decoder,**kwargs) -> None:
        super(CVAE, self).__init__()
        self.encoder = encoder #【【【小心！传入时arch必须设定成 “CVAE”】】】
        self.decoder = decoder

    def forward(self,x,label):# label必须传入
        mu,sigma =self.encoder(x,label)# 【forward返回两参数，其中input已经concatenate了】
        std = torch.exp(0.5 * sigma)
        epsilon = torch.randn_like(std)
        z = epsilon * std + mu
        return mu, sigma, self.decoder(z,label) # decoder中再合并

    def generate(self,new_code,label): # 【假设此时new_code被用户经过正态分布的处理了！label就是一个数字】
        new_code = new_code.to(device)
        label = label.to(device)
        return self.decoder(new_code,label)
        



# 1.2 模型训练

## 1.2.1 损失函数
其中值得注意的是VAE及CVAE为了防止过拟合而引入的KL散度，作为损失函数的衡量标准之一
$$KL(\mathcal{N}(\mu,\sigma ^2)||\mathcal{N}(0,1) ) = -\frac{1}{2}(\log (\sigma ^2)-\sigma ^2-\mu^2+1) $$

In [76]:
# 训练中损失函数
# AE：MSE
# VAE & CVAE：KL loss + MSE 

# 【可调节超参数：kl散度在epoch增加后所占loss的权重】
aging_rate = 0.3

class Loss(nn.Module):
    def __init__(self, arch="AE", choice="BCE", **kwargs) -> None:
        super().__init__()
        self.arch = arch
        self.choice = choice
    def recon_loss(self, x, x_): # 重构损失
        if self.choice == "L1": # L1
            instance = nn.L1Loss()  # 【【BUG？必须选择这个？】】
            loss = instance(x, x_) # 【【类的实例化（上一行） 和 实例的调用（本行）不能放在一起！！！】】
        else: # BCE
            loss = F.binary_cross_entropy(F.sigmoid(x_),F.sigmoid(x), reduction='sum')
        return loss

    def kl_div(self, mu,sigma): # KL loss
        return  -0.5 * torch.mean(torch.log(sigma.pow(2)) - sigma.pow(2) - mu.pow(2) + 1)

    def forward(self, x, x_, mu=None, sigma=None, **otherinputs):# loss的计算
        loss = Loss.recon_loss(self,x,x_)# 可选choice = "L1"
        if self.arch == "AE":
            total_loss = loss
        elif self.arch == "VAE" or self.arch=="CVAE":
            kl_loss = Loss.kl_div(self,mu,sigma)
            total_loss = loss + kl_loss
        return total_loss

### 1.2.2 训练

In [77]:
# 超参数，可以在x.reshape中调整！

batch_size = 128

def train_epoch(model, train_loader, loss, optimizer, epoch, epoch_num, arch):
    """
    main中需要传入的参数
    model：是AE、VAE、CVAE的model（类的实例）
    train_loader：main中导入的minist数据集
    loss：定义的损失函数类的实例中的函数
    optimizer：torch中自带的优化器
    epoch：当前迭代优化轮数
    epoch_num：一共迭代优化的轮数
    arch：当前选择的model的名字（字符串）
    """
    model.train() # 调用继承的父类nn.Module的方法，所有子模块调整为训练模式
    train_loss = 0
    for i, (x, y) in enumerate(train_loader):
        x = x.to(device)
        # 【【【只需要把所有训练数据（x，y）放到cuda上就可以啦！！！其他的自动依据device判断位置】】】
        y = y.to(device)
        # Review：enumerate把可遍历的数据对象(如列表、元组或字符串)组合为一个索引序列，同时列出数据和数据下标
        # 既可以直接访问该对象，又可以通过索引来访问！
        x = x.reshape(128, 1, -1)  
        # 生成(128,1,784)，原来batch_size对应的特征矩阵是128*1*28*28，
        # 其实四维和三维的效果一样，28*28每个pixel都是要作为一个neuron作为input的！
        if arch == "AE":
            # x_ = model.forward(x)
            x_ = model(x)     
            """
            自动调用 forward 函数原因分析：
            利用Python的语言特性，y = model(x)是调用了对象model的__call__方法，
            而nn.Module把__call__方法实现为类对象的forward函数，
            所以任意继承了nn.Module的类对象都可以这样简写来调用forward函数。
            """
            loss_num = loss(x, x_) # 同理，自动调用loss中的forward方法！！！
        elif  arch == "VAE":
            # mu, log_var, x_= model.forward(x)
            mu, sigma, x_= model(x)
            loss_num = loss(x, x_,mu,sigma)
        elif  arch == "CVAE":
            # mu, log_var, x_ = model.forward(x,y)# y是train data的真实label
            mu, sigma, x_ = model(x,y)# y是train data的真实label
            loss_num = loss(x, x_,mu,sigma)
        train_loss += loss_num # 在此处计算总loss
        optimizer.zero_grad() # 在计算本batch_size的梯度之前，把之前的梯度清空,不做积累。只针对每一个batch中loss回溯
        loss_num.backward()
        optimizer.step()

        if i % 50 == 0: # 训练过程中的标记
            print('Train Epoch:',epoch, i * len(x), len(train_loader.dataset),
                       100. * i / len(train_loader),"Loss: ",loss_num.item())
    print('====> Epoch: {} Average loss: {:.4f}'.format(epoch,
        train_loss * 128 / len(train_loader.dataset)))


### 1.2.3 性能评估

In [78]:
import os
from torchvision.utils import make_grid, save_image
# make_grid是张量可视化的包！！！
# 可以协同plt一起共同操作，可以操作一下！！！

def evaluate_epoch(model, test_loader, loss, epoch, epoch_num, arch):
    model.eval() # 所有子模块调整为评估模式
    test_loss = 0
    with torch.no_grad(): # 评估过程，不用调整参数=不用梯度下降》》在此作用域中不会构建计算图
        for i, (x, y) in enumerate(test_loader):
            x = x.to(device)
            y = y.to(device)#【必须要放！】
            x = x.reshape(128, 1, -1) # 同理，特征矩阵的三维处理

            if arch == "AE":
                x_ = model(x)
                loss_num = loss(x, x_)
                test_loss += loss_num.item()

                # 保存一些重构出来的图像
                with torch.no_grad():
                    if i % 20 == 0:
                        # 对照：保留原始图像数据，每50个保留一个

                        # 测试：对原图的重现功能
                        if not os.path.exists(f"results_AE/epoch_{epoch}"):
                            os.makedirs(f"results_AE/epoch_{epoch}")
                        save_x_ = x_.reshape(-1, 1, 28, 28)[:16]
                        save_x_ = make_grid(save_x_, 8, 0)
                        save_image(save_x_, os.path.join(f"results_AE/epoch_{epoch}/batch_{i}.png"))

                        if not os.path.exists(f"results_AE/initial_data"):
                            os.makedirs(f"results_AE/initial_data")
                        save_x = x.reshape(-1,1, 28, 28)[:16]#dim=0是batch_size，后面就是一个个图片的三维大小啦
                        save_x = make_grid(save_x,8,0) # 【生成图片，可以后续使用plt在notebook展现】
                        save_image(save_x,os.path.join(f"results_AE/initial_data/batch_{i}.png"))

            # 只有VAE和CVAE有图像生成功能，AE就不用考虑啦！
            elif arch == "VAE":
                mu, sigma, x_ = model(x)
                loss_num = loss(x, x_, mu, sigma)
                test_loss += loss_num.item()
                z = torch.randn(128,1,10)

                # 测试：随机向量生成图片功能
                sample = model.generate(z)
                sample = sample.reshape(-1, 1, 28, 28)[:32]
                #print(z[0],sample[0])
                if not os.path.exists(f"results_VAE"):
                    os.makedirs(f"results_VAE")
                save_image(
                    sample,
                    os.path.join(f"results_VAE/epoch_{epoch}.png")
                )
                # 【生成图片，可以后续使用plt在notebook展现】

            elif arch == "CVAE":
                mu, sigma, x_ = model(x,y)
                loss_num = loss(x, x_, mu, sigma)
                test_loss += loss_num.item()
                z = torch.randn(128,1,10)

                # 测试：随机向量生成图片功能
                sample = model.generate(z,y)
                sample = sample.reshape(-1, 1, 28, 28)[:32]
                #print(z[0],sample[0])
                # 保存一些重构出来的图像用于(写报告)进一步研究 (5/100)
                if not os.path.exists(f"results_CVAE"):
                    os.makedirs(f"results_CVAE")
                save_image(
                    sample,
                    os.path.join(f"results_CVAE/epoch_{epoch}.png")
                )
                # 【生成图片，可以后续使用plt在notebook展现】

    test_loss *= 128
    test_loss /= len(test_loader.dataset)
    print('====> Test set loss: {:.4f}'.format(test_loss))

### 1.2.4 数据准备

In [79]:
# 数据集格式：28*28的灰度图像，channel = 1
# 超参数 batch_size = 128

import torch.utils.data as td
import torchvision.datasets.mnist as mnist
import torchvision.transforms as transforms
import numpy as np
import math
import scipy.linalg as linalg
import matplotlib as mpl
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt

def get_data(wh_train=True, batch_size=128):
    dataset = mnist.MNIST(
        root="./data",
        train=wh_train,
        download=True,
        transform=transforms.Compose(
            [
                #transforms.ToPILImage(),
                transforms.ToTensor(),
                transforms.Normalize([0.1307,], [0.3081,],),
            ]
        ),
    )
    #for i, (x, y) in enumerate(dataset):
        #print(i,x.shape,y)# 0 torch.Size([1, 28, 28]) 
        #print(y.shape)
    return td.DataLoader(dataset, batch_size=batch_size,drop_last=True,)



### 1.2.5 main函数

In [80]:
import matplotlib
import argparse
import logging


def main(args):
    encoder_args = {
        "x_dim": args.x_dim,
        "hidden_size": args.hidden_size,
        "latent_size": args.latent_size,
        "arch": args.type,
    }
    encoder = Encoder(**encoder_args)
    decoder = Decoder(**encoder_args)
    ae = {"AE": AE, "VAE": VAE, "CVAE": CVAE}

    auto_encoder = ae[args.type](encoder, decoder) #【巧！通过字典的索引直接可以用此类来创建实例啦！】

    auto_encoder.to(device)
    # 优化器选择
    # optimizer = torch.optim.SGD(auto_encoder.parameters(), lr=0.01)
    optimizer =torch.optim.Adam(auto_encoder.parameters(), lr=0.001)
    # class中用nn定义的神经网络中参数，可认为是自动加上了nn.Parameters()的处理，参数会被加入！
    train_loader = get_data(wh_train=True, batch_size=args.batch_size)
    test_loader = get_data(wh_train=False, batch_size=args.batch_size)
    loss = Loss(args.type,choice="L1")#【不加choice，就默认用BCE.可以选择choice="L1"】

# TODO：可以每个损失函数，分别给一个文件夹存放出来的图片呢！！！

    for epoch in range(args.epoch_num):
        print("\n epoch:", epoch)
        train_epoch(model=auto_encoder, loss=loss, train_loader=train_loader,
                    optimizer=optimizer, epoch=epoch, epoch_num=args.epoch_num, arch=args.type)
        evaluate_epoch(model=auto_encoder, test_loader=test_loader, loss=loss,
                   epoch=epoch, epoch_num=args.epoch_num, arch=args.type)

if __name__ == "__main__":
    matplotlib.use('TkAgg')
    parser = argparse.ArgumentParser()
    parser.add_argument("--type", default="VAE", choices=["AE", "VAE", "CVAE"])
    parser.add_argument("--x_dim", default=784, type=int)  # 28 x 28 的像素展开为一个一维的行向量，每行代表一个图片
    parser.add_argument("--latent_size", default=10, type=int)  # 输出层大小，即服从高斯分布的隐含变量的维度。
    parser.add_argument("--hidden_size", default=128, type=int)
    parser.add_argument("--batch_size", default=128, type=int)
    parser.add_argument("--epoch_num", default=128, type=int)
    # args = parser.parse_args()
    # 调用parser.parse_args()会读取系统参数：sys.argv[]，命令行调用时是正确参数，
    # 而在jupyter notebook中调用时，sys.argv的值为ipykrnel_launcher.py：

    # args = parser.parse_args(args=[])
    
    args = parser.parse_known_args()[0]
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    args.device = device
    n_gpu = torch.cuda.device_count()
    logging.basicConfig(format = '%(asctime)s - %(levelname)s - %(name)s -   %(message)s',
                        datefmt = '%m/%d/%Y %H:%M:%S',
                        level = logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info(f"device: {device} n_gpu: {n_gpu}")

    print(args)
    main(args)
    # parser.set_defaults(type="CVAE")
    # args = parser.parse_args()

11/19/2023 13:35:52 - INFO - __main__ -   device: cuda n_gpu: 1


Namespace(batch_size=128, device=device(type='cuda'), epoch_num=128, hidden_size=128, latent_size=10, type='VAE', x_dim=784)

 epoch: 0
Train Epoch: 0 0 60000 0.0 Loss:  2.25697922706604
Train Epoch: 0 6400 60000 10.683760683760683 Loss:  0.6273771524429321
Train Epoch: 0 12800 60000 21.367521367521366 Loss:  0.535597026348114
Train Epoch: 0 19200 60000 32.05128205128205 Loss:  0.689893901348114
Train Epoch: 0 25600 60000 42.73504273504273 Loss:  0.5740326046943665
Train Epoch: 0 32000 60000 53.41880341880342 Loss:  0.5111684799194336
Train Epoch: 0 38400 60000 64.1025641025641 Loss:  0.45639461278915405
Train Epoch: 0 44800 60000 74.78632478632478 Loss:  0.462720662355423
Train Epoch: 0 51200 60000 85.47008547008546 Loss:  0.5798219442367554
Train Epoch: 0 57600 60000 96.15384615384616 Loss:  0.47290411591529846
====> Epoch: 0 Average loss: 0.5627
====> Test set loss: 0.5021

 epoch: 1
Train Epoch: 1 0 60000 0.0 Loss:  0.4960477352142334
Train Epoch: 1 6400 60000 10.683760683760683 Lo

KeyboardInterrupt: 