In [5]:
import numpy as np
import matplotlib.pyplot as plt
import os
import time
from keras.optimizers import Adam, SGD
from keras.models import Sequential
from keras.layers import Dense, Reshape, Flatten,Conv2D,Conv2DTranspose,LeakyReLU,Dropout,UpSampling2D, BatchNormalization, Input, GaussianNoise
from keras.activations import tanh
import tensorflow as tf
import keras
import random

In [6]:
@keras.saving.register_keras_serializable(package="InstanceNormalization")
class InstanceNormalization(tf.keras.layers.Layer):
  """Instance Normalization Layer (https://arxiv.org/abs/1607.08022).

  Each sample is normalized on its own: mean and variance are taken over
  axes 1 and 2 of the input, then a learned scale and offset (one value per
  entry of the last axis) are applied.

  Args:
    epsilon: small constant added to the variance before the reciprocal
      square root, to avoid division by zero.
    **kwargs: standard Keras layer arguments (`name`, `dtype`, `trainable`,
      ...). They must be accepted so the layer can be rebuilt from the
      config that Keras stores when a model is saved.
  """

  def __init__(self, epsilon=1e-5, **kwargs):
    super().__init__(**kwargs)
    self.epsilon = epsilon

  def build(self, input_shape):
    # One scale / offset value per entry of the last input axis.
    self.scale = self.add_weight(
        name='scale',
        shape=input_shape[-1:],
        initializer=tf.random_normal_initializer(1., 0.02),
        trainable=True)

    self.offset = self.add_weight(
        name='offset',
        shape=input_shape[-1:],
        initializer='zeros',
        trainable=True)

  def call(self, x):
    # Statistics are reduced over axes 1 and 2 only, so every sample and
    # every last-axis entry gets its own mean and variance.
    mean, variance = tf.nn.moments(x, axes=[1, 2], keepdims=True)
    inv = tf.math.rsqrt(variance + self.epsilon)
    normalized = (x - mean) * inv
    return self.scale * normalized + self.offset

  def get_config(self):
    """Return the layer config, including `epsilon`, for serialization."""
    config = super().get_config()
    config.update({"epsilon": self.epsilon})
    return config



In [7]:
# Hyper-parameters shared across the notebook.
Batch_size = 64            # number of images per dataset batch
IMG_H, IMG_W = 128, 128    # image height / width
learning_rate = 0.0001
b1 = 0.0
latent_dim = 100


In [8]:
# Kernel initializer shared by every generator layer: normal(mean=0, stddev=0.02).
inig=keras.initializers.random_normal(0, 0.02)

def Build_generator(latent_dim=100):
    """Build the generator network.

    A dense projection of the latent vector is reshaped to (4, 4, 512) and
    then upsampled by five stride-2 transposed convolutions, each followed
    by instance normalization and LeakyReLU. A final 5x5 convolution with
    tanh produces a 3-channel image with values in [-1, 1].

    Args:
        latent_dim: length of the input noise vector (default 100, the value
            that was previously hard-coded).

    Returns:
        A `Sequential` model mapping (batch, latent_dim) to (batch, 128, 128, 3).
    """
    model = Sequential()
    model.add(Input((latent_dim, )))
    model.add(Dense(4*4*512, use_bias=True, kernel_initializer=inig))
    model.add(LeakyReLU())

    model.add(Reshape((4, 4, 512)))  # (4, 4, 512)

    # Each stage doubles the spatial size:
    # 512 -> (8,8), 256 -> (16,16), 128 -> (32,32), 64 -> (64,64), 32 -> (128,128)
    for filters in (512, 256, 128, 64, 32):
        model.add(Conv2DTranspose(filters, (4, 4), (2, 2), padding="same",
                                  use_bias=False, kernel_initializer=inig))
        model.add(InstanceNormalization())
        model.add(LeakyReLU())

    # Project to 3 channels; tanh keeps the output in [-1, 1].
    model.add(Conv2D(3, (5, 5), padding="same", activation="tanh", kernel_initializer=inig))

    return model

Build_generator().summary()



Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 8192)              827392    
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 8192)              0         
                                                                 
 reshape (Reshape)           (None, 4, 4, 512)         0         
                                                                 
 conv2d_transpose (Conv2DTr  (None, 8, 8, 512)         4194304   
 anspose)                                                        
                                                                 
 instance_normalization (In  (None, 8, 8, 512)         1024      
 stanceNormalization)                                            
                                                                 
 leaky_re_lu_1 (LeakyReLU)   (None, 8, 8, 512)         0

In [9]:
# Kernel initializer shared by the discriminator convolutions: normal(mean=0, stddev=0.02).
inid=keras.initializers.random_normal(0, 0.02)
def Build_discrimnator(input_shape=(128, 128, 3)):
    """Build the discriminator (critic) network.

    Five stride-2 4x4 convolutions with LeakyReLU halve the spatial size at
    every stage, followed by flatten, dropout and a single linear output
    unit (no activation, so the output is an unbounded score).

    Args:
        input_shape: shape of one input image (default (128, 128, 3), the
            value that was previously hard-coded).

    Returns:
        A `Sequential` model mapping (batch, *input_shape) to (batch, 1).
    """
    model = Sequential()
    model.add(Input(input_shape))

    # For the default 128x128 input the feature maps are:
    # 32 -> (64,64), 64 -> (32,32), 128 -> (16,16), 256 -> (8,8), 512 -> (4,4)
    for filters in (32, 64, 128, 256, 512):
        model.add(Conv2D(filters, (4, 4), (2, 2), padding="same", kernel_initializer=inid))
        model.add(LeakyReLU())

    model.add(Flatten())
    model.add(Dropout(0.25))

    model.add(Dense(1))

    return model


Build_discrimnator().summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_1 (Conv2D)           (None, 64, 64, 32)        1568      
                                                                 
 leaky_re_lu_6 (LeakyReLU)   (None, 64, 64, 32)        0         
                                                                 
 conv2d_2 (Conv2D)           (None, 32, 32, 64)        32832     
                                                                 
 leaky_re_lu_7 (LeakyReLU)   (None, 32, 32, 64)        0         
                                                                 
 conv2d_3 (Conv2D)           (None, 16, 16, 128)       131200    
                                                                 
 leaky_re_lu_8 (LeakyReLU)   (None, 16, 16, 128)       0         
                                                                 
 conv2d_4 (Conv2D)           (None, 8, 8, 256)        

In [12]:
class DCGAN(keras.Model):
    """GAN trainer using mean-score (Wasserstein-style) losses with a gradient penalty.

    Each `train_step` performs one generator update and one discriminator
    update on the same batch, followed by `extra_d_steps` additional
    discriminator-only updates on freshly sampled noise.

    Args:
        discriminator: model mapping an image batch to one score per image.
        generator: model mapping a latent batch to an image batch.
        latent_dim: length of the latent noise vector fed to the generator.
        gp_weight: multiplier applied to the gradient penalty term
            (default 10.0, the value that was previously hard-coded).
        extra_d_steps: number of additional discriminator updates per
            training step (default 5, previously hard-coded).
    """

    def __init__(self, discriminator, generator, latent_dim, gp_weight=10.0, extra_d_steps=5):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gp_weight = gp_weight
        self.extra_d_steps = extra_d_steps
        # Fixed noise so the preview grid shows the same 16 latent points every epoch.
        self.seed = tf.random.normal([16, latent_dim])
        # Kept for backward compatibility only; training derives the batch
        # size from the incoming data instead of reading this attribute.
        self.Batch_size = 64
        self.d_loss_tracker = keras.metrics.Mean(name="d_loss")
        self.g_loss_tracker = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        """Loss trackers, exposed so Keras can reset them at the start of each epoch."""
        return [self.d_loss_tracker, self.g_loss_tracker]

    def compile(self, d_optimizer, g_optimizer):
        """Store the two optimizers; no loss or metrics are passed to Keras."""
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer

    # losses
    def discriminator_loss(self, real_img, fake_img):
        """Return mean(fake scores) - mean(real scores).

        Despite the parameter names, both arguments are discriminator
        outputs, not images.
        """
        real_loss = tf.reduce_mean(real_img)
        fake_loss = tf.reduce_mean(fake_img)
        return fake_loss - real_loss

    # Define the loss functions for the generator.
    def generator_loss(self, fake_img):
        """Return -mean(discriminator scores on generated images)."""
        return -tf.reduce_mean(fake_img)

    def gradient_penalty(self, batch_size, real_images, fake_images):
        """Calculates the gradient penalty.

        This loss is calculated on an interpolated image
        and added to the discriminator loss.

        Args:
            batch_size: number of samples in `real_images` / `fake_images`.
            real_images: batch of real images.
            fake_images: batch of generated images, same shape as `real_images`.

        Returns:
            Scalar: mean over the batch of (||grad|| - 1)^2.
        """
        # Get the interpolated image (one random mixing factor per sample).
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            # 1. Get the discriminator output for this interpolated image.
            pred = self.discriminator(interpolated, training=True)

        # 2. Calculate the gradients w.r.t to this interpolated image.
        grads = gp_tape.gradient(pred, [interpolated])[0]
        # 3. Calculate the norm of the gradients.
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def show_pro(self, epoch):
        """Render the 16 fixed-seed samples as a 4x4 grid, save it and display it.

        Args:
            epoch: number used in the saved file name
                `Images/image_at_epoch_XXXX.png`.
        """
        predictions = self.generator(self.seed, training=False)
        # Generator output is tanh-scaled; map [-1, 1] to [0, 1] for imshow.
        predictions = (predictions + 1) / 2.0

        # savefig does not create missing directories.
        os.makedirs("Images", exist_ok=True)

        fig = plt.figure(figsize=(5, 5))

        for i in range(16):
            plt.subplot(4, 4, i+1)
            plt.imshow(predictions[i])
            plt.axis('off')

        plt.savefig('Images/image_at_epoch_{:04d}.png'.format(epoch))
        plt.show()
        # This runs once per epoch; release the figure so they do not accumulate.
        plt.close(fig)

    def Train_disc(self, Images):
        """Run one discriminator-only update on `Images` and fresh generated samples.

        Args:
            Images: batch of real images.

        Returns:
            The discriminator loss (score difference plus weighted gradient
            penalty) for this update.
        """
        # Use the actual batch size so the noise, the images and the
        # gradient-penalty interpolation factors always agree in shape.
        batch_size = tf.shape(Images)[0]
        noise = tf.random.normal([batch_size, self.latent_dim])
        # Generated outside the tape: no generator gradients are needed here.
        generated_images = self.generator(noise, training=False)
        with tf.GradientTape() as disc_tape2:
            real_out = self.discriminator(Images, training=True)
            fake_out = self.discriminator(generated_images, training=True)

            d_cost = self.discriminator_loss(real_out, fake_out)
            gp = self.gradient_penalty(batch_size, Images, generated_images)
            disc_loss = d_cost + gp * self.gp_weight

        gradients_of_discriminator = disc_tape2.gradient(disc_loss, self.discriminator.trainable_variables)
        self.d_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))
        return disc_loss

    def train_step(self, real_images):
        """One training step: joint generator/discriminator update plus extra discriminator updates.

        Args:
            real_images: batch of real images, or a tuple whose first
                element is that batch.

        Returns:
            Dict with the running means of `d_loss` and `g_loss`. The
            tracked `d_loss` is the loss of the first (joint) update only.
        """
        if isinstance(real_images, tuple):
            real_images = real_images[0]

        # Local variable rather than an attribute: storing a traced tensor
        # on `self` would leak it outside the traced training function.
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            # training=True is passed explicitly so Dropout behaves the same
            # here as in Train_disc and gradient_penalty.
            gen_img = self.generator(random_latent_vectors, training=True)

            real_out = self.discriminator(real_images, training=True)
            fake_out = self.discriminator(gen_img, training=True)

            gen_loss = self.generator_loss(fake_out)
            d_cost = self.discriminator_loss(real_out, fake_out)
            gp = self.gradient_penalty(batch_size, real_images, gen_img)
            disc_loss = d_cost + gp * self.gp_weight

        gradients_of_generator = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))

        gradients_of_discriminator = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
        self.d_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))

        # Extra discriminator-only updates (steps, not epochs) on the same real batch.
        for _ in range(self.extra_d_steps):
            self.Train_disc(real_images)

        # Update metrics and return their value.
        self.d_loss_tracker.update_state(disc_loss)
        self.g_loss_tracker.update_state(gen_loss)
        return {
            "d_loss": self.d_loss_tracker.result(),
            "g_loss": self.g_loss_tracker.result()
        }

In [14]:
from IPython import display
class CustomCallback(keras.callbacks.Callback):
    """After every epoch: refresh the preview image grid and save a checkpoint.

    Args:
        ckmanager: object with a `save()` method (the notebook passes a
            `tf.train.CheckpointManager`).
        epoch_offset: value added to the epoch index passed to
            `model.show_pro`, which uses it in the preview file name.
            Defaults to 600, the offset that was previously hard-coded.
    """

    def __init__(self, ckmanager, epoch_offset=600):
        super().__init__()
        self.ckpt_manager = ckmanager
        self.epoch_offset = epoch_offset

    # The other hooks (train begin, epoch begin, batch begin/end) were empty
    # overrides; the base Callback already supplies no-op versions of them.

    def on_epoch_end(self, epoch, logs=None):
        """Clear the cell output, draw and save the sample grid, then checkpoint."""
        display.clear_output(wait=True)
        self.model.show_pro(epoch + self.epoch_offset)
        self.ckpt_manager.save()
        print("Model Saved")

In [129]:
# Build both networks and the wrapping trainer on the first GPU.
with tf.device("/device:GPU:0"):
    generator = Build_generator()
    discriminator = Build_discrimnator()
    DCgan = DCGAN(discriminator=discriminator, generator=generator, latent_dim=100)

# Optional warm start from previously exported weights:
# generator.load_weights("Models/generator_weights.h5")
# discriminator.load_weights("Models/discriminator_weights.h5")

# Separate Adam optimizers; the generator uses twice the discriminator's learning rate.
opt_gen = Adam(learning_rate=8e-5, beta_1=0.5)
opt_desc = Adam(learning_rate=4e-5, beta_1=0.5)
DCgan.compile(d_optimizer=opt_desc, g_optimizer=opt_gen)

In [17]:

# Checkpointing: track both networks and both optimizers together.
checkpoint_path = "./checkpoints/train"

ckpt = tf.train.Checkpoint(
    generator=generator,
    discriminator=discriminator,
    generator_optimizer=opt_gen,
    discriminator_optimizer=opt_desc,
)

# Only the five most recent checkpoints are kept on disk.
ckpt_manager = tf.train.CheckpointManager(
    checkpoint=ckpt,
    directory=checkpoint_path,
    max_to_keep=5,
)


In [None]:
# Resume from the newest checkpoint when the manager reports one.
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print("Latest checkpoint restored!!")

In [18]:
def load(image_path):
    """Read one JPEG file and return it as a float32 (128, 128, 3) tensor in [-1, 1].

    Args:
        image_path: path of the JPEG file to read.

    Returns:
        The decoded image, resized to 128x128 and scaled from [0, 255]
        to [-1, 1].
    """
    img = tf.io.read_file(image_path)
    # Force 3 channels: without it a grayscale file decodes to 1 channel,
    # which can neither be batched with RGB images nor fed to the
    # 3-channel discriminator input.
    img = tf.io.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, [128, 128])
    img = tf.cast(img, tf.float32)
    img = (img - 127.5) / 127.5
    return img

# NOTE(review): Dataset_path is not defined in any cell shown here; it must
# be set (file glob pattern for the images) before this cell runs.
dataset = (
    tf.data.Dataset.list_files(Dataset_path)
    # Decode images in parallel and overlap input loading with training.
    .map(load, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(Batch_size, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Start training; the callback redraws the preview grid and checkpoints after every epoch.
DCgan.fit(
    dataset,
    epochs=600,
    callbacks=[CustomCallback(ckpt_manager)],
    verbose=2,
)

In [None]:
# Show one image from the first dataset batch, and print its maximum pixel value.
iterr = next(dataset.take(1).as_numpy_iterator())
print(iterr[4].max())
# Map [-1, 1] back to [0, 1] for display.
plt.imshow((iterr[4] + 1) / 2)
plt.show()

In [29]:
# A single random latent vector for spot-checking the generator.
noise = tf.random.normal(shape=[1, 100])

In [None]:
# Generate one image from `noise` and map it from [-1, 1] to [0, 1] for display.
img = (generator(noise, training=False)[0] + 1) / 2
plt.imshow(img)
plt.show()

In [41]:
# Export both networks as HDF5 files.
generator.save(filepath="Models/gennrator.h5")
discriminator.save(filepath="Models/discromonator.h5")

In [None]:
# Show the images of one dataset batch.
iterr = next(dataset.take(1).as_numpy_iterator())
def show_pro(predictions):
    """Plot a batch of images in a square grid.

    Args:
        predictions: batch of images indexed along the first axis, with
            values in [-1, 1] (they are rescaled to [0, 1] for display).

    The grid is 8x8 for up to 64 images, as before, and grows to the
    smallest square that fits when the batch is larger; the fixed 8x8 grid
    raised an error on the 65th image.
    """
    count = len(predictions)
    predictions = (predictions + 1) / 2

    # Never smaller than the original 8x8 layout.
    side = max(8, int(np.ceil(np.sqrt(count))))

    fig = plt.figure(figsize=(10, 10))

    for i in range(count):
        plt.subplot(side, side, i + 1)
        plt.imshow(predictions[i])
        plt.axis('off')
    plt.show()
    # Release the figure once it has been displayed.
    plt.close(fig)

show_pro(iterr)