In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import cv2
from keras.utils import to_categorical  # Only for categorical one hot encoding
from tensorflow.keras import layers

In [2]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it up front.
# This cell has to run before anything else touches the GPU (see the except branch).
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        # Report how many devices were found so the cell output documents the hardware used.
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        # The error is only printed, so the notebook keeps running with default allocation.
        print(e)

1 Physical GPUs, 1 Logical GPUs


In [3]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
# One-hot encoded label matrices.
cy_train = np.array(to_categorical(y_train))
cy_test = np.array(to_categorical(y_test))

# Flatten each image to 784 features and map the uint8 pixels from [0, 255]
# to [-1, 1]. The generator's last layer is tanh, so real images must live in
# the same [-1, 1] range as generated ones; scaling to [0, 1] would leave half
# of the generator's output range without any real counterpart.
# float32 is the dtype the training loop casts to, and halves memory vs float64.
cx_train = (x_train.reshape(-1, 784).astype(np.float32) - 127.5) / 127.5
cx_test = (x_test.reshape(-1, 784).astype(np.float32) - 127.5) / 127.5
cx_train.shape

(60000, 784)

In [4]:
# Shared binary cross-entropy used by both GAN losses. It takes raw logits
# (from_logits=True) and sums the per-sample losses instead of averaging them.
cross_entropy = tf.keras.losses.BinaryCrossentropy(
    from_logits=True,
    reduction=tf.keras.losses.Reduction.SUM,
)

def discriminator_loss(real_output, fake_output):
    """Return the discriminator's loss for one batch.

    Args:
        real_output: discriminator logits for real samples (target label 1).
        fake_output: discriminator logits for generated samples (target label 0).

    Returns:
        The sum of the cross-entropy on the fake logits and on the real logits.
    """
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    return loss_on_fake + loss_on_real

def generator_loss(fake_output):
    """Return the generator's loss: cross-entropy of the fake logits against all-ones targets.

    Args:
        fake_output: discriminator logits for generated samples.

    Returns:
        The cross-entropy obtained when every generated sample is labelled as real.
    """
    wanted_labels = tf.ones_like(fake_output)
    return cross_entropy(wanted_labels, fake_output)

In [5]:
def generator():
    """Build the generator: a 100-dimensional noise vector in, a 28x28x1 image out.

    A bias-free Dense layer is reshaped to a 7x7x256 feature map, then three
    5x5 transposed convolutions with strides 1, 2 and 2 bring it to 7x7x128,
    14x14x64 and finally 28x28x1. Every stage except the last is followed by
    batch normalisation and LeakyReLU; the last layer uses tanh, so outputs
    lie in [-1, 1].

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.
    """
    stem = [
        layers.Dense(7*7*256, use_bias=False, input_shape=(100,)),
        layers.BatchNormalization(),
        layers.LeakyReLU(),
        layers.Reshape((7, 7, 256)),
    ]

    # (filters, stride) for the two normalised upsampling stages.
    upsampling = []
    for filters, stride in ((128, 1), (64, 2)):
        upsampling += [
            layers.Conv2DTranspose(filters, (5, 5), strides=(stride, stride),
                                   padding='same', use_bias=False),
            layers.BatchNormalization(),
            layers.LeakyReLU(),
        ]

    head = [
        layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same',
                               use_bias=False, activation='tanh'),
    ]

    return tf.keras.Sequential(stem + upsampling + head)

def discriminator():
    """Build the discriminator: a 28x28x1 image in, one unactivated score out.

    Two 5x5 stride-2 convolutions (64 then 128 filters), each followed by
    LeakyReLU and 30% dropout, feed a Flatten layer and a single-unit Dense
    layer. The Dense layer has no activation, so the output is a raw logit.

    Returns:
        An uncompiled ``tf.keras.Sequential`` model.
    """
    stack = [
        layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same',
                      input_shape=[28, 28, 1]),
        layers.LeakyReLU(),
        layers.Dropout(0.3),

        layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'),
        layers.LeakyReLU(),
        layers.Dropout(0.3),

        layers.Flatten(),
        layers.Dense(1),
    ]
    return tf.keras.Sequential(stack)

In [6]:
# Instantiate the two networks. The training functions take them as arguments,
# but read the optimizers below as notebook globals.
gen = generator()
des = discriminator()

# One Adam instance per network so each keeps its own optimizer state.
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

In [7]:
# @tf.function
def trainDes(gen, des, real, batch_size):
    """Run one optimizer step on the discriminator only.

    Args:
        gen: generator model; used in inference mode and not updated here.
        des: discriminator model whose trainable variables are updated.
        real: batch of real images to score alongside the generated ones.
        batch_size: number of fake samples to generate. The first
            ``batch_size`` rows of the joint prediction are treated as fake
            and the remaining rows as real.
    """
    # The generator is not trained in this step, so its forward pass is kept
    # out of the tape; recording it would only hold on to activations that the
    # discriminator gradient never uses.
    noise = tf.random.normal([batch_size, 100])
    fake = gen(noise, training=False)
    X = tf.concat((fake, real), axis=0)

    with tf.GradientTape() as disc_tape:
        # Fake and real samples go through the discriminator in a single pass.
        pred = des(X, training=True)

        fake_output = pred[:batch_size]
        real_output = pred[batch_size:]

        des_loss = discriminator_loss(real_output, fake_output)

    # Differentiate and step once the forward pass has been fully recorded.
    gradients_of_discriminator = disc_tape.gradient(des_loss, des.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, des.trainable_variables))

# @tf.function
def trainDesGen(gen, des, real, batch_size):
    """Run one joint step: update the discriminator and the generator from one forward pass.

    Args:
        gen: generator model (updated).
        des: discriminator model (updated).
        real: batch of real images.
        batch_size: number of fake samples to generate. The first
            ``batch_size`` rows of the joint prediction are treated as fake
            and the remaining rows as real.
    """
    noise = tf.random.normal([batch_size, 100])

    with tf.GradientTape() as disc_tape, tf.GradientTape() as gen_tape:
        fake = gen(noise, training=True)
        X = tf.concat((fake, real), axis=0)
        pred = des(X, training=True)

        fake_output = pred[:batch_size]
        real_output = pred[batch_size:]

        # Both losses come from the same forward pass.
        des_loss = discriminator_loss(real_output, fake_output)
        gen_loss = generator_loss(fake_output)

    # Gradients are taken after both tapes have stopped recording. Calling
    # disc_tape.gradient while gen_tape is still active would make gen_tape
    # trace the discriminator's backward pass and optimizer update as well.
    gradients_of_discriminator = disc_tape.gradient(des_loss, des.trainable_variables)
    gradients_of_generator = gen_tape.gradient(gen_loss, gen.trainable_variables)

    # Apply the updates only once both gradients are known, so neither network
    # is modified between the shared forward pass and its own backward pass.
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, des.trainable_variables))
    generator_optimizer.apply_gradients(zip(gradients_of_generator, gen.trainable_variables))

# @tf.function
def trainGen(gen, des, batch_size):
    """Run one optimizer step on the generator only.

    Args:
        gen: generator model whose trainable variables are updated.
        des: discriminator model; called in inference mode and not updated here.
        batch_size: number of noise vectors to sample for this step.
    """
    latent = tf.random.normal([batch_size, 100])

    with tf.GradientTape() as gen_tape:
        generated = gen(latent, training=True)
        # The discriminator only scores the samples; gradients flow through it
        # back to the generator, but its own variables are left alone.
        scores = des(generated, training=False)
        gen_loss = generator_loss(scores)

    gen_vars = gen.trainable_variables
    gen_grads = gen_tape.gradient(gen_loss, gen_vars)
    generator_optimizer.apply_gradients(zip(gen_grads, gen_vars))

def trainGan(realData, epochs=10, batch_size=5, loss='mse'):
    """Alternate discriminator and generator updates over ``realData``, then preview samples.

    Each epoch draws a fresh random permutation of the samples, cuts it into
    batches of ``batch_size`` and, for every batch, runs ``trainDes`` followed
    by ``trainGen`` on the notebook-level ``gen`` and ``des`` models. After the
    last epoch, ten generated images are shown with matplotlib.

    Args:
        realData: array-like of real images, one sample per leading index
            (anything ``np.asarray`` accepts).
        epochs: number of passes over the data.
        batch_size: number of real samples per update. Samples that do not
            fill a whole batch are left out of that epoch; because the
            permutation changes every epoch, different samples are left out
            each time.
        loss: unused; kept so existing calls that pass it keep working.

    Raises:
        ValueError: if ``batch_size`` is not positive or ``realData`` holds
            fewer than ``batch_size`` samples.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Convert once to float32 (the dtype the models are fed) instead of
    # casting every batch inside the loop.
    samples = np.asarray(realData, dtype=np.float32)
    n_batches = len(samples) // batch_size
    if n_batches == 0:
        raise ValueError("realData holds fewer samples than batch_size")
    n_used = n_batches * batch_size

    # Same information the batched array's shape used to report.
    print((n_batches, batch_size) + samples.shape[1:])

    last_epoch = None
    for epoch in range(epochs):
        # Shuffle at sample level so batch composition changes every epoch;
        # shuffling an already-batched array would only reorder fixed batches.
        order = np.random.permutation(len(samples))[:n_used]
        for start in range(0, n_used, batch_size):
            batch_idx = order[start:start + batch_size]
            real = tf.convert_to_tensor(samples[batch_idx])
            # One discriminator step followed by one generator step per batch.
            # trainDesGen is the joint alternative to this pair of calls.
            trainDes(gen, des, real, batch_size)
            trainGen(gen, des, batch_size)
        last_epoch = epoch

    # Preview: always sample exactly as many images as are displayed, so a
    # batch_size below ten no longer indexes past the end of the batch.
    n_preview = 10
    noise = tf.random.normal([n_preview, 100])
    # Inference mode: use the learned batch-norm statistics and leave them
    # untouched while merely looking at samples.
    fake = gen(noise, training=False)
    if last_epoch is not None:
        print("Epoch ", last_epoch)
    for i in range(n_preview):
        plt.imshow(fake[i].numpy().reshape((28,28)))
        plt.show()

In [None]:
# Restore the flattened 784-feature rows to 28x28 single-channel images, the
# input shape the discriminator was built with, and start training.
trainGan(
    cx_train.reshape((-1, 28, 28, 1)),
    epochs=100,
    batch_size=60,
)

(1000, 60, 28, 28, 1)


In [None]:
# NOTE(review): bare literal with no effect on the notebook's state; looks like
# a leftover scratch cell. Confirm it can be removed.
1 