In [None]:
import tensorflow as tf
import keras
from keras import Model, layers, applications
import os, os.path as path, sys
import pandas as pd, numpy as np
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
# Module-level shorthand for tf.keras.utils.Sequence.
Sequence = tf.keras.utils.Sequence

# Show the TensorFlow version and the GPU devices it detected, one per line.
print(tf.__version__, tf.config.list_physical_devices('GPU'), sep='\n')

In [None]:
# Root folder of the training data; data.csv and the ds/ image folder are
# read from here further down.
workingDir = 'o:/temp/pixiv/training/'
# Command-line override, currently disabled:
# if len(sys.argv) > 1:
#     workingDir = sys.argv[1]

# Size every image is resized to, and the matching shape with 3 colour
# channels appended.
imgSize = (112, 112)
inputShape = imgSize + (3,)


In [None]:
class MyGAN(Model):
    """GAN made of a transposed-convolution generator and a conv discriminator.

    The generator turns a latent vector into an image with values in [0, 1]
    (sigmoid output); the discriminator turns an image into one sigmoid
    score. `train_step` performs one discriminator update followed by one
    generator update per batch.

    Args:
        latent_dim: Length of the latent vector fed to the generator.
        img_shape: `(height, width, channels)` of the images. Height and
            width must be multiples of 16 because the generator upsamples
            its seed feature map with four stride-2 layers.

    Raises:
        ValueError: If the height or width of `img_shape` is not a multiple
            of the generator's total upsampling factor.
    """

    def __init__(self, latent_dim=128, img_shape=inputShape):
        super().__init__()
        self.latent_dim = latent_dim
        self.img_shape = img_shape
        self.generator = self.build_generator()
        self.discriminator = self.build_discriminator()

    # def save(self, targetDir):
    #     self.generator.save(path.join(targetDir, 'generator.h5'))
    #     self.discriminator.save(path.join(targetDir, 'discriminator.h5'))

    def build_generator(self):
        """Build the generator, print its summary and return it.

        The seed feature map and the number of output channels are derived
        from `self.img_shape`, so the generated images always have the shape
        the discriminator expects (7x7 seed and 3 channels for the default
        112x112x3 images).
        """
        height, width, channels = self.img_shape
        # Channel multipliers of the stride-2 upsampling layers; each layer
        # doubles the spatial size.
        width_multipliers = (1, 2, 3, 4)
        upsampling = 2 ** len(width_multipliers)
        if height % upsampling or width % upsampling:
            raise ValueError(
                f'img_shape height and width must be multiples of '
                f'{upsampling}, got {tuple(self.img_shape)}')
        seed_h, seed_w = height // upsampling, width // upsampling

        stack = [
            # The shape must be a tuple; `(n)` without the comma is a bare int.
            layers.Input(shape=(self.latent_dim,)),
            layers.Dense(seed_h * seed_w * self.latent_dim),
            layers.Reshape((seed_h, seed_w, self.latent_dim)),
        ]
        for multiplier in width_multipliers:
            stack.append(
                layers.Conv2DTranspose(
                    self.latent_dim * multiplier,
                    kernel_size=3,
                    strides=2,
                    padding='same'))
            stack.append(layers.LeakyReLU(alpha=0.2))
        # Sigmoid keeps pixels in [0, 1].
        stack.append(
            layers.Conv2DTranspose(
                channels, kernel_size=3, padding='same',
                activation='sigmoid'))

        model = keras.Sequential(stack, name='generator')
        model.summary()
        return model

    def build_discriminator(self):
        """Build the discriminator, print its summary and return it.

        Four stride-2 convolution stages (each followed by batch
        normalisation and LeakyReLU) feed a global average pool and a small
        dense head ending in a single sigmoid unit.
        """
        stack = [layers.Input(shape=self.img_shape)]
        for filters in (32, 64, 128, 256):
            stack.append(
                layers.Conv2D(filters, kernel_size=3, strides=2,
                              padding='same'))
            stack.append(layers.BatchNormalization())
            stack.append(layers.LeakyReLU(alpha=0.2))
        stack += [
            layers.GlobalAveragePooling2D(),
            layers.Dense(256),
            layers.LeakyReLU(alpha=0.2),
            layers.Dense(1, activation='sigmoid'),
        ]

        model = keras.Sequential(stack, name='discriminator')
        model.summary()
        return model

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        """Store the optimizers and loss used by `train_step`.

        Args:
            d_optimizer: Optimizer applied to the discriminator variables.
            g_optimizer: Optimizer applied to the generator variables.
            loss_fn: Callable `loss_fn(labels, predictions)` used for both
                networks.
        """
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name='d_loss')
        self.g_loss_metric = keras.metrics.Mean(name='g_loss')

    @property
    def metrics(self):
        """Running-mean loss trackers reported by `train_step`."""
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_imgs):
        """Run one discriminator update and one generator update.

        Args:
            real_imgs: Batch of real images (optionally wrapped in a tuple).

        Returns:
            Dict with the running means of the discriminator loss
            (`d_loss`) and generator loss (`g_loss`).
        """
        # Depending on the Keras version and input type, fit() may hand the
        # batch over as a one-element tuple; unwrap it before use.
        if isinstance(real_imgs, tuple):
            real_imgs = real_imgs[0]
        batch_size = tf.shape(real_imgs)[0]

        # Sample random points in the latent space
        random_numbers = tf.random.normal(shape=(batch_size, self.latent_dim))
        # Generate a batch of new images (outside the tape: the generator is
        # not updated during the discriminator step)
        gen_imgs = self.generator(random_numbers, training=True)
        # combine real and fake images
        combined_imgs = tf.concat([real_imgs, gen_imgs], axis=0)
        # Labels: 1 for real images, 0 for generated images
        labels = tf.concat(
            [tf.ones((batch_size, 1)),
             tf.zeros((batch_size, 1))], axis=0)
        # add random noise to the labels - important!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator. training=True is passed explicitly:
        # without it the BatchNormalization layers run in inference mode
        # inside a custom train_step and never update their statistics.
        with tf.GradientTape() as tape:
            pred = self.discriminator(combined_imgs, training=True)
            d_loss = self.loss_fn(labels, pred)
        grads = tape.gradient(d_loss, self.discriminator.trainable_variables)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_variables))

        # Sample random points in the latent space
        random_numbers = tf.random.normal(shape=(batch_size, self.latent_dim))
        # The generator is trained to make the discriminator answer "real"
        labels = tf.ones((batch_size, 1))
        # Train the generator; only generator variables receive gradients
        with tf.GradientTape() as tape:
            fake_imgs = self.generator(random_numbers, training=True)
            pred = self.discriminator(fake_imgs, training=True)
            g_loss = self.loss_fn(labels, pred)
        grads = tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(
            zip(grads, self.generator.trainable_variables))

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {
            "d_loss": self.d_loss_metric.result(),
            "g_loss": self.g_loss_metric.result()
        }


class GANMonitor(keras.callbacks.Callback):
    """Callback that saves sample generator images after every epoch.

    The same latent vectors are reused for every epoch. Images are written
    to `tmp/`, and every 10th epoch (starting with epoch 0) the generator
    and discriminator weights are written to `weights/`; both folders are
    relative to the current working directory and are created on demand.

    Args:
        num_img: Number of sample images to save per epoch.
        latent_dim: Length of the latent vectors; must match the latent
            size the monitored model's generator expects.
    """

    def __init__(self, num_img=3, latent_dim=64):
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim
        # Latent vectors, drawn on the first epoch end and reused afterwards.
        self.rlv = None

    def on_epoch_end(self, epoch, logs=None):
        """Save `num_img` generated images and, periodically, the weights."""
        if self.rlv is None:
            self.rlv = tf.random.normal(shape=(self.num_img, self.latent_dim))

        generated_images = self.model.generator(self.rlv, training=False)
        # Scale the [0, 1] sigmoid output to [0, 255] and move it to NumPy.
        generated_images = (generated_images * 255).numpy()

        os.makedirs('tmp', exist_ok=True)
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(
                    self.save_pic,
                    keras.preprocessing.image.array_to_img(
                        generated_images[i]),
                    epoch,
                    i)
                for i in range(self.num_img)
            ]
        # The pool has finished here. Report failed saves instead of losing
        # them silently, but do not abort training over a preview image.
        for future in futures:
            error = future.exception()
            if error is not None:
                print(f'GANMonitor: could not save preview image: {error!r}',
                      file=sys.stderr)

        # save model weights
        if epoch % 10 == 0:
            os.makedirs('weights', exist_ok=True)
            # NOTE(review): Keras 3 only accepts weight file names ending in
            # ".weights.h5" - confirm the installed Keras version.
            self.model.generator.save_weights(
                f'weights/gen_weights_{epoch}.h5')
            self.model.discriminator.save_weights(
                f'weights/disc_weights_{epoch}.h5')

    def save_pic(self, img, epoch, i):
        """Write image `i` of `epoch` as a JPEG into `tmp/`."""
        img.save("tmp/generated_img_%03d_%d.jpg" % (epoch, i), quality=95)


In [None]:
# Placeholder; overwritten with the decoded images later in this cell.
dataset = list()
# Load data.csv from the working directory, using its first column as index.
data = pd.read_csv(os.path.join(workingDir, 'data.csv'), index_col=0)


def read_img(img_path):
    """Load one image as a float32 RGB array scaled to [0, 1].

    Args:
        img_path: Path of the image file to read.

    Returns:
        Array of shape `imgSize + (3,)` with dtype float32.
    """
    # The context manager closes the underlying file once it is decoded.
    with Image.open(img_path) as img:
        # Force three channels so grayscale, palette and RGBA files produce
        # the same array shape as RGB ones and can be stacked into a batch.
        img = img.convert('RGB').resize(imgSize)
    img = np.asarray(img)
    img = img / 255.0
    return img.astype(np.float32)


img_dir = path.join(workingDir, 'ds')
# Keep regular files only (read_img cannot decode a directory) and sort them
# so the sample order does not depend on the platform's listing order.
img_paths = sorted(
    entry
    for entry in (path.join(img_dir, name) for name in os.listdir(img_dir))
    if path.isfile(entry))
if not img_paths:
    # Fail here with a clear message rather than later inside fit().
    raise FileNotFoundError(f'no image files found in {img_dir}')

with ThreadPoolExecutor(max_workers=6) as executor:
    # Collect the results while the pool is alive so that an exception
    # raised while decoding an image surfaces at this line.
    dataset = list(executor.map(read_img, img_paths))
dataset = np.array(dataset)
dataset.shape

In [None]:
epochs = 100  # number of passes over `dataset`

gan = MyGAN(latent_dim=64)
# Optimizers and loss come from the same `keras` namespace the model classes
# were imported from, so all training objects belong to one Keras install.
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0002),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0002),
    loss_fn=keras.losses.BinaryCrossentropy(),
)

gan.fit(
    dataset,
    batch_size=40,
    # initial_epoch=8,
    epochs=epochs,
    # Reuse the model's latent size so the monitor's sample vectors always
    # match the generator input.
    callbacks=[GANMonitor(num_img=5, latent_dim=gan.latent_dim)])
