In [None]:
# Import

import os
import time
import cv2 as cv
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.utils import to_categorical
from keras.models import Sequential, Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.layers import Input, Dense, BatchNormalization, Conv2D, Conv2DTranspose, ReLU, LeakyReLU, Flatten, MaxPooling2D, Dropout, Reshape

In [None]:
# GPU Initialization

gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e)


In [None]:
# Initialization

BUFFER_SIZE = 16000
BATCH_SIZE = 16
batch_size = BATCH_SIZE
EPOCHS = 30
latent_dim = 128
input_size = [128, 128, 3]
image_size = (128, 128)


In [None]:
# Data Preprocessing

datagen = ImageDataGenerator(
    rescale=1./255,   
)

image_directory = 'C:\\Users\\HARSH\\OneDrive\\Desktop\\GAN Project\\dataset_download\\chest_xray'

dataset= datagen.flow_from_directory(
    os.path.join(image_directory, 'train'),   
    classes=['PNEUMONIA'],   
    target_size=image_size,        
    batch_size=BATCH_SIZE,      
    class_mode='binary',        
    shuffle=True                 
)

len(dataset)

In [None]:
# Generator Model

def gen_model():
    model = Sequential([
        Input(shape=(latent_dim,)),
        Dense(8 * 8 * 128),  # Reduced number of units in the Dense layer
        Reshape((8, 8, 128)),
        Conv2DTranspose(128, kernel_size=4, strides=2, padding='same'), 
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128, kernel_size=4, strides=2, padding='same'),  # Reduced number of filters
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2D(3, kernel_size=4, padding='same', activation='sigmoid')
    ], name="generator")    
    return model

In [None]:
# Discriminator Model

def disc_model():
    model = Sequential([
        Input(shape=input_size),
        Conv2D(128, kernel_size=4, strides=2, padding='same'),  # Reduced number of filters
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        MaxPooling2D(strides=2),
        Conv2D(128, kernel_size=4, strides=2, padding='same'),  # Reduced number of filters
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        MaxPooling2D(strides=2),
        Conv2D(128, kernel_size=4, strides=2, padding='same'),  # Reduced number of filters
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        Conv2D(128, kernel_size=4, strides=2, padding='same'),  # Reduced number of filters
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        MaxPooling2D(strides=2),
        
        Flatten(),
        Dense(128),  # Reduced size of the dense layer
        LeakyReLU(alpha=0.1),
        Dropout(0.2),
        Dense(1, activation='sigmoid')
    ], name="discriminator")
    return model

In [None]:
generator = gen_model()
discriminator = disc_model()

generator.summary()
discriminator.summary()


#helper funtion to help us with loadidng images in batches
def image_loader(generator):
    for images, labels in generator:
        yield images, labels

In [None]:
# GAN with Custom Traning Step

#gan model with custom gradient calculation
class Gan(Model):
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
    
    def compile(self, disc_opt, gen_opt, loss_function):
        super().compile()
        self.disc_opt = disc_opt
        self.gen_opt = gen_opt
        self.loss_function = loss_function
        self.disc_loss_metric = tf.keras.metrics.Mean(name = "disc_loss")
        self.gen_loss_metric = tf.keras.metrics.Mean(name = "gen_loss")
        
    @property
    def metrics(self):
        return [self.disc_loss_metric, self.gen_loss_metric]
    
    def train_step(self, data):  # Modify the function to accept labels separately
        real_images, real_labels = data
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Fake image decoding
        generated_images = self.generator(random_latent_vectors)
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Concatenate the real and fake labels
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        labels += 0.05*tf.random.uniform(tf.shape(labels))
        
        
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            disc_loss = self.loss_function(labels, predictions)    
        grads  = tape.gradient(disc_loss, self.discriminator.trainable_weights)
        self.disc_opt.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )
        
        random_latent_vectors = tf.random.normal(shape = (batch_size,self.latent_dim))
        misleading_labels = tf.zeros((batch_size, 1))
        
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))    
            gen_loss = self.loss_function(misleading_labels, predictions)
        grads = tape.gradient(gen_loss, self.generator.trainable_weights)
        self.gen_opt.apply_gradients(zip(grads, self.generator.trainable_weights))
        
        self.disc_loss_metric.update_state(disc_loss)
        self.gen_loss_metric.update_state(gen_loss)
        return{
            "disc_loss": self.disc_loss_metric.result(),
            "gen_loss": self.gen_loss_metric.result()
        }

def gen_images(generator, current_epoch, latent_dim, num_samples=2):
    noise = tf.random.normal([num_samples, latent_dim], dtype=tf.float32)
    generated_images = generator(noise, training=False)

    fig, axs = plt.subplots(2, 2, figsize=(20, 20))
    for i, ax in enumerate(axs.flatten()):
        ax.imshow(generated_images[i, :, :, 0], cmap='gray')
        ax.set_title(f"After epoch {current_epoch}")
        ax.axis('off')

    plt.tight_layout()
    plt.savefig(f'After_epochs_{current_epoch:04d}.png')
    plt.show()

In [None]:
# Callbacks 

from google.colab import drive
drive.mount('/content/drive')

#callbacks. We are showing progress of gan and also saving samples after each epochs
class Gan_Callback(tf.keras.callbacks.Callback):
    def __init__(self, num_images=2, latent_dim = 128):
        self.num_images = num_images
        self.latent_dim = latent_dim       
    
    def on_epoch_end(self, epoch, logs =None):
        latent_vectors = tf.random.normal(shape = (self.num_images, latent_dim))
        generated_images = self.model.generator(latent_vectors)
        generated_images *=255
        generated_images.numpy()
        figure = plt.figure(figsize=(10,10))
        for i in range(generated_images.shape[0]):
            plt.subplot(2, 2,i+1)
            plt.imshow(generated_images[i, :, :, 0, ], cmap='gray')
            plt.title(f"After epoch {epoch+1}")
            plt.axis('off')
        plt.savefig('After epochs{:04d}.png'.format(epoch+1))
        plt.show()
        if(epoch % 10 ==0):
            self.model.generator.save('/content/drive/MyDrive/gen.h5')
            self.model.discriminator.save('/content/drive/MyDrive/disc.h5')

gan = Gan(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
        disc_opt=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5), 
        gen_opt=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
        # Parallel gpu computing won't work unless  we pass reduction=tf.keras.losses.Reduction.NONE as a parameter too.
        loss_function=tf.keras.losses.BinaryCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE),
    )

In [None]:
# Traning

#actual traing begins here
history = gan.fit(
    image_loader(dataset), 
    epochs=EPOCHS,
    steps_per_epoch=len(dataset),  
    callbacks=[Gan_Callback(num_images=4, latent_dim=latent_dim)]
)