In [None]:
# Install the pinned TensorFlow GPU build (release candidate 2.1.0-rc0) this notebook was written against.
!pip install tensorflow-gpu==2.1.0-rc0

In [None]:
import tensorflow as tf
import random
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import time

In [None]:
# Display the TensorFlow version actually loaded by the kernel.
tf.__version__

In [None]:
# Directory whose entries are listed to build the training set.
# NOTE(review): presumably a Kaggle input dataset mount containing only image files - confirm.
path_to_file = '/kaggle/input/100k/100k'

**Create a list of paths to the images**

In [None]:
import os

# Full path of every entry found directly under the dataset directory
# (order is whatever os.listdir returns).
DATASET = [os.path.join(path_to_file, entry) for entry in os.listdir(path_to_file)]

**Read images when training each batch**

In [None]:
# Number of image paths collected above.
DATASET_SIZE = len(DATASET)
# Number of images drawn per training step.
BATCH_SIZE = 128
# Training steps per epoch; integer division drops any partial final batch.
STEP_PER_EPOCH = DATASET_SIZE // BATCH_SIZE

In [None]:
def get_batch(dataset):
    """Load a random mini-batch of images as a normalised float32 tensor.

    ``BATCH_SIZE`` paths are drawn from ``dataset`` with replacement
    (``random.choices``). Each file is read with OpenCV, converted from
    BGR to RGB and resized to 128x128.

    Args:
        dataset: Sequence of image file paths to sample from.

    Returns:
        A ``tf.float32`` tensor of shape ``(BATCH_SIZE, 128, 128, 3)`` whose
        pixel values are rescaled from [0, 255] to [-1, 1].

    Raises:
        IOError: If OpenCV cannot read one of the sampled files.
    """
    lst_files = random.choices(dataset, k=BATCH_SIZE)
    batch = []
    for path in lst_files:
        # Read the sampled path itself; reading a fixed entry would make
        # every batch a single image repeated BATCH_SIZE times.
        image = cv.imread(path)
        if image is None:
            # cv.imread signals an unreadable/missing file by returning None.
            raise IOError('Could not read image: {}'.format(path))
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)
        image = cv.resize(image, (128, 128))
        batch.append(image)

    # Work in float32 from the start to avoid a float64 intermediate copy.
    batch = np.asarray(batch, dtype=np.float32)
    normalized_batch = (batch / 127.5) - 1.0
    return tf.convert_to_tensor(normalized_batch, dtype=tf.float32)

In [None]:
# Sample input: draw one batch and show its shape as the cell output.
sample = get_batch(DATASET)
sample.shape

**Define discriminator/generator model**

In [None]:
def create_discriminator_model():
  """Build the discriminator network for 128x128 RGB images.

  The model stacks five 5x5 'same'-padded convolution stages
  (64, 128, 256, 512 and 1024 filters), each followed by LeakyReLU and
  Dropout(0.3). Every stage uses stride 2 except the 512-filter stage,
  which uses stride 1. The feature map is then flattened and fed to a
  single-unit Dense layer with no activation.

  Returns:
    A ``tf.keras.Sequential`` model producing one value per input image.
  """
  layers = tf.keras.layers
  # (filters, stride) for each convolution stage, in order.
  conv_stages = [(64, 2), (128, 2), (256, 2), (512, 1), (1024, 2)]

  model = tf.keras.Sequential()
  for index, (filters, stride) in enumerate(conv_stages):
    # Only the first layer declares the input shape.
    first_layer_kwargs = {'input_shape': (128, 128, 3)} if index == 0 else {}
    model.add(layers.Conv2D(filters, (5, 5), strides=(stride, stride),
                            padding='same', **first_layer_kwargs))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

  model.add(layers.Flatten())
  model.add(layers.Dense(1))
  return model

In [None]:
# Build the discriminator instance used by the loss, checkpoint and training cells.
discriminator = create_discriminator_model()

In [None]:
# Sample input: run the untrained discriminator on the sample batch (inference mode).
discriminator(sample, training=False)

In [None]:
def create_generator_model():
    """Build the generator mapping a 1000-d noise vector to a 128x128 RGB image.

    A Dense layer projects the noise to an 8x8x1024 feature map. Four
    stride-2 transposed convolutions (512, 256, 128, 64 filters) double the
    spatial size each time up to 128x128, each followed by batch
    normalisation and LeakyReLU. A final stride-1 transposed convolution
    with ``tanh`` produces 3 channels in [-1, 1].

    Returns:
        A ``tf.keras.models.Sequential`` model with output shape
        ``(None, 128, 128, 3)``.
    """
    layers = tf.keras.layers

    def add_norm_and_activation(net):
        # BatchNorm + LeakyReLU pair used after every hidden layer.
        net.add(layers.BatchNormalization())
        net.add(layers.LeakyReLU())

    model = tf.keras.models.Sequential()
    model.add(layers.Dense(8*8*1024, input_shape=(1000,)))
    add_norm_and_activation(model)

    model.add(layers.Reshape((8, 8, 1024)))
    assert model.output_shape == (None, 8, 8, 1024)

    side = 8
    for filters in (512, 256, 128, 64):
        # Each stride-2 transposed convolution doubles height and width.
        side *= 2
        model.add(layers.Conv2DTranspose(filters, (5, 5), strides=(2, 2), padding='same'))
        assert model.output_shape == (None, side, side, filters)
        add_norm_and_activation(model)

    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(1, 1), padding='same',
                                     activation='tanh'))
    assert model.output_shape == (None, 128, 128, 3)

    return model

In [None]:
# Build the generator instance used by the checkpoint and training cells.
generator = create_generator_model()

In [None]:
# Sample input: generate one image from uniform noise with the untrained
# generator and score it with the discriminator (both in inference mode).
noise = tf.random.uniform((1, 1000))
generated_image = generator(noise, training=False)
discriminator(generated_image, training=False)

In [None]:
# imshow expects float RGB data in [0, 1] (values outside are clipped), so map
# the generator's tanh output from [-1, 1] into that range instead of [0, 255].
plt.imshow((generated_image[0] + 1.0) / 2.0);

**Define loss function**

In [None]:
# Shared binary cross-entropy loss. from_logits=True because the discriminator's
# final Dense(1) layer has no activation, so it outputs raw scores, not probabilities.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def discriminator_loss(real_output, fake_output):
    """Return the discriminator's loss for one batch.

    Args:
        real_output: Discriminator outputs for real images (target: ones).
        fake_output: Discriminator outputs for generated images (target: zeros).

    Returns:
        Sum of the cross-entropy on the real batch and on the fake batch.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake

In [None]:
def generator_loss(fake_output):
    """Return the generator's loss for one batch.

    The generator is rewarded when the discriminator labels its images as
    real, so the discriminator's outputs are compared against all-ones targets.

    Args:
        fake_output: Discriminator outputs for generated images.
    """
    real_targets = tf.ones_like(fake_output)
    return cross_entropy(real_targets, fake_output)

In [None]:
# Separate Adam optimizers (learning rate 1e-4) because the two networks are
# updated independently in train_step.
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

**Checkpoint**

In [None]:
# Directory and filename prefix used by checkpoint.save() during training.
# NOTE(review): on Kaggle the persisted output directory is usually /kaggle/working -
# confirm that /kaggle/output is writable and retained.
checkpoint_dir = '/kaggle/output/training'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
# Track both models and both optimizers so they are saved together.
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)


**Training**

In [None]:
# Number of epochs passed to train().
EPOCHS = 10000
# Length of each generator input vector; must match the generator's input_shape.
noise_dim = 1000
# Number of preview images rendered after every epoch.
num_examples_to_generate = 16

# Fixed noise reused for every preview so progress is comparable across epochs.
# Drawn from the same uniform distribution that train_step samples from, so the
# previews reflect inputs the generator is actually trained on.
seed = tf.random.uniform([num_examples_to_generate, noise_dim])

In [None]:
@tf.function
def train_step(images):
    """Run one simultaneous update of the generator and the discriminator.

    Args:
        images: Batch of real images fed to the discriminator.
    """
    latent = tf.random.uniform((BATCH_SIZE, noise_dim))

    # One tape per network so each loss is differentiated only with respect
    # to that network's own variables.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        fakes = generator(latent, training=True)

        real_scores = discriminator(images, training=True)
        fake_scores = discriminator(fakes, training=True)

        gen_loss = generator_loss(fake_scores)
        disc_loss = discriminator_loss(real_scores, fake_scores)

    gen_vars = generator.trainable_variables
    disc_vars = discriminator.trainable_variables

    gen_grads = gen_tape.gradient(gen_loss, gen_vars)
    disc_grads = disc_tape.gradient(disc_loss, disc_vars)

    generator_optimizer.apply_gradients(zip(gen_grads, gen_vars))
    discriminator_optimizer.apply_gradients(zip(disc_grads, disc_vars))

In [None]:
def generate_images(model, epoch, test_input):
  """Render the model's output for ``test_input`` as a grid of RGB images.

  Args:
    model: Generator; called as ``model(test_input, training=False)``.
      Its output is assumed to lie in [-1, 1], as produced by the tanh
      output layer of the generator defined in this notebook.
    epoch: Epoch index supplied by the caller; not used for rendering.
    test_input: Batch of noise vectors, one per image to draw.
  """
  pred = model(test_input, training=False)

  # Map [-1, 1] to the [0, 1] range imshow expects for float RGB data and
  # show all three channels rather than a single channel in grayscale.
  images = np.clip((np.asarray(pred) + 1.0) / 2.0, 0.0, 1.0)

  count = images.shape[0]
  # Keep the 4x4 layout for up to 16 images; grow the grid for larger batches.
  grid = max(4, int(np.ceil(np.sqrt(count))))

  fig = plt.figure(figsize=(4,4))

  for i in range(count):
    plt.subplot(grid, grid, i+1)
    plt.imshow(images[i])
    plt.axis('off')
  plt.show()
  # Release the figure; this function is called once per epoch.
  plt.close(fig)

In [None]:
def train(dataset, epochs):
    """Train the GAN, previewing samples every epoch and checkpointing every 20.

    Each epoch runs ``STEP_PER_EPOCH`` steps; every step draws a fresh random
    batch with ``get_batch`` and applies ``train_step`` to it.

    Args:
        dataset: Sequence of image paths passed through to ``get_batch``.
        epochs: Number of epochs to run.
    """
    for epoch in range(epochs):
        start = time.time()
        for _ in range(STEP_PER_EPOCH):
            batch = get_batch(dataset)
            train_step(batch)

        # Replace the previous preview with images from the fixed seed.
        display.clear_output(wait=True)
        generate_images(generator, epoch, seed)
        if (epoch + 1) % 20 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)
        print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

    # Final preview; uses `epochs` so this also works when no epoch was run.
    display.clear_output(wait=True)
    generate_images(generator, epochs, seed)

In [None]:
# Start training on the full list of image paths for EPOCHS epochs.
train(DATASET, EPOCHS)