In [15]:
import numpy as np
from scipy.stats import norm
import tensorflow.compat.v1 as tf
import matplotlib.pyplot as plt
tf.disable_v2_behavior()

设置真实数据样本分布，均值为3，方差为0.5

In [16]:
class DataDistribution(object):
    def __init__(self) -> None:
        self.mu = 3
        self.sigma = 0.5
    def sample(self, N):
        samples = np.random.normal(self.mu, self.sigma, N)
        samples.sort()
        return samples

设定生成器的初始分布

In [17]:
# 设定的为平均分布
class GeneratorDistribution(object):
    def __init__(self) -> None:
        self.range = range
    def sample(self, N):
        return np.linspace(-self.range, self.range, N) + np.random.random(N) * 0.01

设置一个简单的线性运算含函数

In [18]:
def linear(input, output_dim, scope=None, stddev=1.0):
    norm = tf.random_normal_initializer(stddev=stddev)
    const = tf.constant_initializer(0.0)
    with tf.variable_scope(scope or 'linear'):
        w = tf.get_variable('w', [input.get_shape()[1], output_dim], initializer=norm)
        b = tf.get_variable('b', [output_dim], initializer=const)
        return tf.matmul(input, w) + b

实现简单的生成器和判别器

In [19]:
def generator(input, h_dim):
    h0 = tf.nn.softplus(linear(input, h_dim, 'g0'))
    h1 = linear(h0, 1, 'g1')
    return h1

In [20]:
def discriminator(input, h_dim):
    h0 = tf.tanh(linear(input, h_dim * 2, 'd0'))
    h1 = tf.tanh(linear(h0, h_dim * 2, 'd1'))
    h2 = tf.tanh(linear(h1, h_dim * 2, 'd2'))
    h3 = tf.tanh(linear(h2, 1, 'd3'))
    return h3

设置优化器，选择使得学习率衰减的梯度下降方法

In [21]:
def optimizer(loss, var_list, initial_learing_rate):
    decay = 0.95
    num_decay_steps = 150
    batch = tf.Variable(0)
    learning_rate = tf.train.exponential_decay(
        initial_learing_rate,
        batch,
        num_decay_steps,
        decay,
        staircase=True
    )
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(
        loss,
        global_step=batch,
        var_list=var_list
    )
    return optimizer

搭建 GAN 模型类的代码

除了初始化参数外，最核心的两个函数是模型的创建和模型的训练

In [22]:
class GAN(object):
    def __init__(self, data, gen, num_steps, batch_size, log_every):
        self.data = data
        self.gen = gen
        self.num_steps = num_steps
        self.batch_size = batch_size
        self.log_every = log_every
        self.mlp_hidden_size = 4
        self.learning_rate = 0.03
        self._create_model()
    
    def _create_model(self):
        """
        创建模型
        预训练判别器:D_pre
        生成器:Generator
        判别器:Discriminator
        生成器损失函数:loss_g
        判别器损失函数:loss_d
        生成器优化器:opt_g
        判别器优化器:opt_d
        真实数据的判别:D1
        生成数据的判别:D2
        """
        with tf.variable_scope('D_pre'):
            self.pre_input = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.pre_labels = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            D_pre = discriminator(self.pre_input, self.mlp_hidden_size)
            self.pre_loss = tf.reduce_mean(tf.square(D_pre - self.pre_labels))
            self.pre_opt = optimizer(self.pre_loss, None, self.learning_rate)
        with tf.variable_scope('Generator'):
            self.z = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.G = generator(self.z, self.mlp_hidden_size)
        with tf.variable_scope('Discriminator') as scope:
            self.x = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.D1 = discriminator(self.x, self.mlp_hidden_size)
            scope.reuse_variables()
            self.D2 = discriminator(self.G, self.mlp_hidden_size)
        
        self.loss_d = tf.reduce_mean(-tf.log(self.D1) - tf.log(1 - self.D2))
        self.loss_g = tf.reduce_mean(-tf.log(self.D2))

        self.d_pre_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='D_pre')
        self.d_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Discriminator')
        self.g_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='Generator')

        self.opt_d = optimizer(self.loss_d, self.d_params, self.learning_rate)
        self.opt_g = optimizer(self.loss_g, self.g_params, self.learning_rate)
    def train(self):
        """
        训练模型：
        预先训练判别器D_pre,然后将训练后的参数共享给判别器Discriminator
        然后正式训练生成器Generator和判别器Discriminator
        """
        with tf.Session() as session:
            tf.global_variables_initializer().run()
            num_pretrain_steps = 1000
            for step in range(num_pretrain_steps):
                d = (np.random.random(self.batch_size) - 0.5) * 10.0
                labels = norm.pdf(d, loc=self.data.mu, scale=self.data.sigma)
                pretrain_loss, _ = session.run([self.pre_loss, self.pre_opt], 
                                               {self.pre_input:np.reshape(d, (self.batch_size,1)), 
                                                self.pre_labels:np.reshape(labels, (self.batch_size,1))
                                                })
                self.weightsD = session.run(self.d_pre_params)
                for i, v in enumerate(self.d_params):
                    session.run(v.assign(self.weightsD[i]))
                
                for step in range(self.num_steps):
                    # 更新判别器
                    x = self.data.sample(self.batch_size)
                    z = self.gen.sample(self.batch_size)
                    loss_d, _ = session.run([self.loss_d, self.opt_d],{
                        self.x: np.reshape(x, (self.batch_size, 1)),
                        self.z: np.reshape(z, (self.batch_size, 1))
                    })
                    # 更新生成器
                    z = self.gen.sample(self.batch_size)
                    loss_g, _ = session.run([self.loss_g, self.opt_g], {
                        self.z:np.reshape(z, (self.batch_size, 1))
                    })
                    if step % self.log_every == 0:
                        print('{}:{}\t {}'.format(step, loss_d, loss_g))
                    if step % 100 == 0 or step==0 or step==self.num_steps - 1:
                        self._plot_distributions(session)
    
    def _samples(self, session, num_points=10000, num_bins=100):
        xs = np.linspace(-self.gen.range, self.gen.range, num_points)
        bins = np.linspace(-self.gen.range, self.gen.range, num_bins)
        # 数据分布
        d = self.data.sample(num_points)
        pd, _ = np.histogram(d, bins=bins, density=True)
        # 生成样本
        zs = np.linspace(-self.gen.range, self.gen.range, num_points)
        g = np.zeros((num_points, 1))
        for i in range(num_points // self.batch_size):
            g[self.batch_size * i:self.batch * (i + 1)] = session.run(self.G,{self.z: np.reshape(zs[self.batch_size * (i + 1)], (self.batch_size, 1))})
            pg, _ = np.histogram(g, bins=bins, density=True)
            return pd, pg
    
    def _plot_distributions(self, session):
        pd, pg = self._samples(session)
        p_x = np.linspace(-self.gen.range, self.gen.range, len(pd))
        f, ax = plt.subplots(1)
        ax.set_ylim(0, 1)
        plt.plot(p_x, pd, label='Real Data')
        plt.plot(p_x, pg, label='generated Data')
        plt.title('GAN')
        plt.xlabel('Value')
        plt.ylabel('Probability Density')
        plt.legend()
        plt.show()


可视化代码，对数据进行采样来展示生成数据与真实数据的分布


In [23]:
# 主函数
def main(args):
    model = GAN(
        DataDistribution(),
        GeneratorDistribution(range=8),
        1200,
        12,
        10,
    )
    model.train()