In [117]:
import os
import glob
import multiprocessing
import numpy as np
# from scipy.misc import toimage
from imageio import imsave
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from dataset import make_anime_dataset

In [118]:
class Generator(keras.Model):
    def __init__(self):
        super(Generator, self).__init__()
        filter = 64
        self.conv1 = layers.Conv2DTranspose(filter * 8, 4, 1, 'valid', use_bias = False)
        self.bn1 = layers.BatchNormalization()
        self.conv2 = layers.Conv2DTranspose(filter * 4, 4, 2, 'same', use_bias = False)
        self.bn2 = layers.BatchNormalization()
        self.conv3 = layers.Conv2DTranspose(filter * 2, 4, 2, 'same', use_bias = False)
        self.bn3 = layers.BatchNormalization()
        self.conv4 = layers.Conv2DTranspose(filter * 1, 4, 2, 'same', use_bias = False)
        self.bn4 = layers.BatchNormalization()
        self.conv5 = layers.Conv2DTranspose(3, 4, 2, 'same', use_bias = False)
        
    def call(self, inputs, training = None):
        x = inputs
        x = tf.reshape(x, (x.shape[0], 1, 1, x.shape[1]))
        x = tf.nn.relu(x)
        x = tf.nn.relu(self.bn1(self.conv1(x), training=training))
        x = tf.nn.relu(self.bn2(self.conv2(x), training=training))
        x = tf.nn.relu(self.bn3(self.conv3(x), training=training))
        x = tf.nn.relu(self.bn4(self.conv4(x), training=training))
        x = self.conv5(x)
        x = tf.tanh(x)
        return x

In [119]:
class Discriminator(keras.Model):
    def __init__(self):
        super(Discriminator, self).__init__()
        filter = 64
        self.conv1 = layers.Conv2D(filter, 4, 2, 'valid', use_bias = False)
        self.bn1 = layers.BatchNormalization()
        self.conv2 = layers.Conv2D(filter * 2, 4, 2, 'valid', use_bias = False)
        self.bn2 = layers.BatchNormalization()
        self.conv3 = layers.Conv2D(filter * 4, 4, 2, 'valid', use_bias = False)
        self.bn3 = layers.BatchNormalization()
        self.conv4 = layers.Conv2D(filter * 8, 3, 1, 'valid', use_bias = False)
        self.bn4 = layers.BatchNormalization()
        self.conv5 = layers.Conv2D(filter * 16, 3, 1, 'valid', use_bias = False)
        self.bn5 = layers.BatchNormalization()
        self.pool = layers.GlobalAveragePooling2D()
        self.flatten = layers.Flatten()
        self.fc = layers.Dense(1)
    
    def call(self, inputs, training = None):
        x = tf.nn.leaky_relu(self.bn1(self.conv1(inputs),training=training))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x),training=training))
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x),training=training))
        x = tf.nn.leaky_relu(self.bn4(self.conv4(x),training=training))
        x = tf.nn.leaky_relu(self.bn5(self.conv5(x),training=training))
        x = self.pool(x)
        x = self.flatten(x)
        logits = self.fc(x)
        return logits

In [120]:
def d_loss_fn(generator, discriminator, batch_z, batch_x, is_training):
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    d_real_logits = discriminator(batch_x, is_training)
    d_loss_real = celoss_ones(d_real_logits)
    d_loss_fake = celoss_zeros(d_fake_logits)
    loss = d_loss_fake + d_loss_real
    return loss

def celoss_ones(logits):
    y = tf.ones_like(logits)
    loss = keras.losses.binary_crossentropy(y, logits, from_logits=True)
    return tf.reduce_mean(loss)

def celoss_zeros(logits):
    y = tf.zeros_like(logits)
    loss = keras.losses.binary_crossentropy(y, logits, from_logits=True)
    return tf.reduce_mean(loss)

def g_loss_fn(generator, discriminator, batch_z, is_training):
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    loss = celoss_ones(d_fake_logits)
    return loss

def save_result(val_out, val_block_size, image_path, color_mode):
    def preprocess(img):
        img = ((img + 1.0) * 127.5).astype(np.uint8)
        # img = img.astype(np.uint8)
        return img

    preprocesed = preprocess(val_out)
    final_image = np.array([])
    single_row = np.array([])
    for b in range(val_out.shape[0]):
        # concat image into a row
        if single_row.size == 0:
            single_row = preprocesed[b, :, :, :]
        else:
            single_row = np.concatenate((single_row, preprocesed[b, :, :, :]), axis=1)

        # concat image row to final_image
        if (b+1) % val_block_size == 0:
            if final_image.size == 0:
                final_image = single_row
            else:
                final_image = np.concatenate((final_image, single_row), axis=0)

            # reset single row
            single_row = np.array([])

    if final_image.shape[2] == 1:
        final_image = np.squeeze(final_image, axis=2)
#     toimage(final_image).save(image_path)
    imsave(image_path, final_image)

In [None]:
tf.random.set_seed(3333)
np.random.seed(3333)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')

z_dim = 100 # 隐藏向量z的长度
epochs = 3000000 # 训练步数
batch_size = 64 # batch size
learning_rate = 0.0002
is_training = True

img_path = glob.glob(r'faces/*.jpg')
print('images num:', len(img_path))
# 构建数据集对象
dataset, img_shape, _ = make_anime_dataset(img_path, batch_size, resize = 64)
print(dataset, img_shape)
sample = next(iter(dataset)) # 采样
print(sample.shape, tf.reduce_max(sample).numpy(),
      tf.reduce_min(sample).numpy())
dataset = dataset.repeat(100) # 重复循环
db_iter = iter(dataset)

generator = Generator()
generator.build(input_shape=(4, z_dim))
discriminator = Discriminator()
discriminator.build(input_shape=(4, 64, 64, 3))
g_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
d_optimizer = keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)

d_losses, g_losses = [],[]
for epoch in range(epochs):
    for _ in range(1):
        batch_z = tf.random.normal([batch_size, z_dim])
        batch_x = next(db_iter)
        with tf.GradientTape() as tape:
            d_loss = d_loss_fn(generator, discriminator, batch_z, batch_x, is_training)
        grads = tape.gradient(d_loss, discriminator.trainable_variables)
        d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))

    batch_z = tf.random.normal([batch_size, z_dim])
    batch_x = next(db_iter)

    with tf.GradientTape() as tape:
        g_loss = g_loss_fn(generator, discriminator, batch_z, is_training)
    grads = tape.gradient(g_loss, generator.trainable_variables)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

    if epoch % 100 == 0:
        print(epoch, 'd-loss:',float(d_loss), 'g-loss:', float(g_loss))
        # 可视化
        z = tf.random.normal([100, z_dim])
        fake_image = generator(z, training=False)
        img_path = os.path.join('gan_images', 'gan-%d.png'%epoch)
        save_result(fake_image.numpy(), 10, img_path, color_mode='P')

        d_losses.append(float(d_loss))
        g_losses.append(float(g_loss))

        if epoch % 10000 == 1:
            # print(d_losses)
            # print(g_losses)
            generator.save_weights('generator.ckpt')
            discriminator.save_weights('discriminator.ckpt')

images num: 51223
<PrefetchDataset shapes: (64, 64, 64, 3), types: tf.float32> (64, 64, 3)
(64, 64, 64, 3) 1.0 -1.0
0 d-loss: 1.4976342916488647 g-loss: 0.5409046411514282
100 d-loss: 0.5926315784454346 g-loss: 1.4660078287124634
200 d-loss: 1.2859300374984741 g-loss: 1.1056727170944214
300 d-loss: 0.6222599744796753 g-loss: 1.830592155456543
400 d-loss: 0.7236131429672241 g-loss: 1.8737250566482544
500 d-loss: 1.1825542449951172 g-loss: 2.9941649436950684
600 d-loss: 1.0746476650238037 g-loss: 1.9452667236328125
700 d-loss: 1.1636590957641602 g-loss: 2.20737886428833


In [122]:
import  tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import layers

class Generator(keras.Model):
    def __init__(self):
        super(Generator, self).__init__()

        # z: [b, 100] => [b, 3*3*512] => [b, 3, 3, 512] => [b, 64, 64, 3]
        self.fc = layers.Dense(3*3*512)

        self.conv1 = layers.Conv2DTranspose(256, 3, 3, 'valid')
        self.bn1 = layers.BatchNormalization()

        self.conv2 = layers.Conv2DTranspose(128, 5, 2, 'valid')
        self.bn2 = layers.BatchNormalization()

        self.conv3 = layers.Conv2DTranspose(3, 4, 3, 'valid')

    def call(self, inputs, training=None):
        # [z, 100] => [z, 3*3*512]
        x = self.fc(inputs)
        x = tf.reshape(x, [-1, 3, 3, 512])
        x = tf.nn.leaky_relu(x)

        #
        x = tf.nn.leaky_relu(self.bn1(self.conv1(x), training=training))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = self.conv3(x)
        x = tf.tanh(x)

        return x


class Discriminator(keras.Model):
    def __init__(self):
        super(Discriminator, self).__init__()

        # [b, 64, 64, 3] => [b, 1]
        self.conv1 = layers.Conv2D(64, 5, 3, 'valid')

        self.conv2 = layers.Conv2D(128, 5, 3, 'valid')
        self.bn2 = layers.BatchNormalization()

        self.conv3 = layers.Conv2D(256, 5, 3, 'valid')
        self.bn3 = layers.BatchNormalization()

        # [b, h, w ,c] => [b, -1]
        self.flatten = layers.Flatten()
        self.fc = layers.Dense(1)


    def call(self, inputs, training=None):

        x = tf.nn.leaky_relu(self.conv1(inputs))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x), training=training))

        # [b, h, w, c] => [b, -1]
        x = self.flatten(x)
        # [b, -1] => [b, 1]
        logits = self.fc(x)

        return logits

def main():

    d = Discriminator()
    g = Generator()
    x = tf.random.normal([2, 64, 64, 3])
    z = tf.random.normal([2, 100])
    prob = d(x)
    print(prob)
    x_hat = g(z)
    print(x_hat.shape)




if __name__ == '__main__':
    main()

tf.Tensor(
[[ 0.07164985]
 [-0.12412959]], shape=(2, 1), dtype=float32)
(2, 64, 64, 3)


In [None]:
def gradient_penalty(discriminator, batch_x, fake_image):

    batchsz = batch_x.shape[0]

    # [b, h, w, c]
    t = tf.random.uniform([batchsz, 1, 1, 1])
    # [b, 1, 1, 1] => [b, h, w, c]
    t = tf.broadcast_to(t, batch_x.shape)

    interplate = t * batch_x + (1 - t) * fake_image

    with tf.GradientTape() as tape:
        tape.watch([interplate])
        d_interplote_logits = discriminator(interplate, training=True)
    grads = tape.gradient(d_interplote_logits, interplate)

    # grads:[b, h, w, c] => [b, -1]
    grads = tf.reshape(grads, [grads.shape[0], -1])
    gp = tf.norm(grads, axis=1) #[b]
    gp = tf.reduce_mean( (gp-1)**2 )

    return gp



def d_loss_fn(generator, discriminator, batch_z, batch_x, is_training):
    # 1. treat real image as real
    # 2. treat generated image as fake
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    d_real_logits = discriminator(batch_x, is_training)

    d_loss_real = celoss_ones(d_real_logits)
    d_loss_fake = celoss_zeros(d_fake_logits)
    gp = gradient_penalty(discriminator, batch_x, fake_image)

    loss = d_loss_real + d_loss_fake + 10. * gp

    return loss, gp


def g_loss_fn(generator, discriminator, batch_z, is_training):

    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    loss = celoss_ones(d_fake_logits)

    return loss


def main():

    tf.random.set_seed(233)
    np.random.seed(233)
    assert tf.__version__.startswith('2.')


    # hyper parameters
    z_dim = 100
    epochs = 3000000
    batch_size = 512
    learning_rate = 0.0005
    is_training = True


    img_path = glob.glob(r'faces/*.jpg')
    assert len(img_path) > 0
    

    dataset, img_shape, _ = make_anime_dataset(img_path, batch_size)
    print(dataset, img_shape)
    sample = next(iter(dataset))
    print(sample.shape, tf.reduce_max(sample).numpy(),
          tf.reduce_min(sample).numpy())
    dataset = dataset.repeat()
    db_iter = iter(dataset)


    generator = Generator() 
    generator.build(input_shape = (None, z_dim))
    discriminator = Discriminator()
    discriminator.build(input_shape=(None, 64, 64, 3))
    z_sample = tf.random.normal([100, z_dim])


    g_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
    d_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)


    for epoch in range(epochs):

        for _ in range(5):
            batch_z = tf.random.normal([batch_size, z_dim])
            batch_x = next(db_iter)

            # train D
            with tf.GradientTape() as tape:
                d_loss, gp = d_loss_fn(generator, discriminator, batch_z, batch_x, is_training)
            grads = tape.gradient(d_loss, discriminator.trainable_variables)
            d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))
        
        batch_z = tf.random.normal([batch_size, z_dim])

        with tf.GradientTape() as tape:
            g_loss = g_loss_fn(generator, discriminator, batch_z, is_training)
        grads = tape.gradient(g_loss, generator.trainable_variables)
        g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

        if epoch % 100 == 0:
            print(epoch, 'd-loss:',float(d_loss), 'g-loss:', float(g_loss),
                  'gp:', float(gp))

            z = tf.random.normal([100, z_dim])
            fake_image = generator(z, training=False)
            img_path = os.path.join('wgan-images', 'wgan-%d.png'%epoch)
            save_result(fake_image.numpy(), 10, img_path, color_mode='P')
if __name__ == '__main__':
    main()