In [None]:
import os
import time
import cv2 as cv
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.utils import np_utils
from keras.models import Sequential, Model
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras.layers import Input, Dense, BatchNormalization, Conv2D, Conv2DTranspose, ReLU, LeakyReLU, Flatten, MaxPooling2D, Dropout, Reshape

# Setting up distributed strategy for multi-GPU training
strategy = tf.distribute.MirroredStrategy()
print('DEVICES AVAILABLE: {}'.format(strategy.num_replicas_in_sync))

# Hyperparameters
BUFFER_SIZE = 64000
BATCH_SIZE = 32 * strategy.num_replicas_in_sync
EPOCHS = 50
latent_dim = 128
input_size = [512, 512, 3]  # Changed input size to 512x512
image_size = (512, 512)

# Data Augmentation
datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Load dataset
image_directory = 'chest-xray-pneumonia/chest_xray'
dataset = datagen.flow_from_directory(
    os.path.join(image_directory, 'train'),
    classes=['PNEUMONIA'],
    target_size=image_size,
    batch_size=BATCH_SIZE,
    class_mode='binary',
    shuffle=True
)

# Define Generator Model
def gen_model():
    model = Sequential([
        Input(shape=(latent_dim,)),
        Dense(8 * 8 * 256),
        Reshape((8, 8, 256)),
        Conv2DTranspose(128 * 2, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128 * 4, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128 * 4, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128 * 8, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2DTranspose(128 * 8, kernel_size=4, strides=2, padding='same'),
        LeakyReLU(alpha=0.1),
        Conv2D(3, kernel_size=4, padding='same', activation='sigmoid')
    ], name="generator")
    return model

# Define Discriminator Model
def disc_model():
    model = Sequential([
        Input(shape=input_size),
        Conv2D(256, kernel_size=4, strides=2, padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        MaxPooling2D(strides=2),
        Conv2D(256 * 2, kernel_size=4, strides=2, padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        MaxPooling2D(strides=2),
        Conv2D(256 * 4, kernel_size=4, strides=2, padding='same'),
        BatchNormalization(),
        LeakyReLU(alpha=0.1),
        MaxPooling2D(strides=2),
        Flatten(),
        Dense(256 * 4),
        LeakyReLU(alpha=0.1),
        Dropout(0.2),
        Dense(1, activation='sigmoid')
    ], name="discriminator")
    return model

with strategy.scope():
    generator = gen_model()
    discriminator = disc_model()

# Display model summaries
generator.summary()
discriminator.summary()

# Helper function to load images in batches
def image_loader(generator):
    for images, labels in generator:
        yield images, labels

# Custom GAN Class
class Gan(Model):
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, disc_opt, gen_opt, loss_function):
        super().compile()
        self.disc_opt = disc_opt
        self.gen_opt = gen_opt
        self.loss_function = loss_function
        self.disc_loss_metric = tf.keras.metrics.Mean(name="disc_loss")
        self.gen_loss_metric = tf.keras.metrics.Mean(name="gen_loss")

    @property
    def metrics(self):
        return [self.disc_loss_metric, self.gen_loss_metric]

    # Custom training step
    def train_step(self, data):
        real_images, real_labels = data
        batch_size = tf.shape(real_images)[0]

        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        generated_images = self.generator(random_latent_vectors)
        combined_images = tf.concat([generated_images, real_images], axis=0)

        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            disc_loss = self.loss_function(labels, predictions)

        grads = tape.gradient(disc_loss, self.discriminator.trainable_weights)
        self.disc_opt.apply_gradients(zip(grads, self.discriminator.trainable_weights))

        misleading_labels = tf.zeros((batch_size, 1))

        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            gen_loss = self.loss_function(misleading_labels, predictions)

        grads = tape.gradient(gen_loss, self.generator.trainable_weights)
        self.gen_opt.apply_gradients(zip(grads, self.generator.trainable_weights))

        self.disc_loss_metric.update_state(disc_loss)
        self.gen_loss_metric.update_state(gen_loss)

        return {
            "disc_loss": self.disc_loss_metric.result(),
            "gen_loss": self.gen_loss_metric.result(),
        }

# Instantiate and compile the GAN
with strategy.scope():
    gan = Gan(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
    gan.compile(
        disc_opt=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
        gen_opt=tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
        loss_function=tf.keras.losses.BinaryCrossentropy(),
    )

# Callback for generating images
class Gan_Callback(tf.keras.callbacks.Callback):
    def __init__(self, model, latent_dim, filename='gan_generated.png'):
        super(Gan_Callback, self).__init__()
        self.model = model
        self.latent_dim = latent_dim
        self.filename = filename

    def on_epoch_end(self, epoch, logs=None):
        random_latent_vectors = tf.random.normal(shape=(16, self.latent_dim))
        generated_images = self.model.generator(random_latent_vectors)
        generated_images = tf.clip_by_value(generated_images, 0.0, 1.0)
        generated_images = generated_images.numpy() * 255
        generated_images = generated_images.astype(np.uint8)

        # Save generated images as a grid
        fig, axes = plt.subplots(4, 4, figsize=(8, 8))
        for i, ax in enumerate(axes.flatten()):
            ax.imshow(generated_images[i])
            ax.axis('off')
        plt.tight_layout()
        plt.savefig(self.filename)
        plt.close(fig)

# Start training the GAN
gan_callback = Gan_Callback(model=gan, latent_dim=latent_dim)
gan.fit(
    image_loader(dataset),
    epochs=EPOCHS,
    steps_per_epoch=len(dataset) // BATCH_SIZE,
    callbacks=[gan_callback]
)

# Save the models
generator.save('generator_model.h5')
discriminator.save('discriminator_model.h5')
print("Models saved successfully!")

# Calculate FID score for evaluation (optional)
# This requires an implementation of the FID score calculation

def calculate_inception_score(generator, latent_dim, n_samples=100):
    from tensorflow.keras.applications import InceptionV3
    from tensorflow.keras.applications.inception_v3 import preprocess_input

    # Load InceptionV3 model for feature extraction
    inception_model = InceptionV3(include_top=False, pooling='avg')

    # Generate fake images
    noise = np.random.normal(0, 1, (n_samples, latent_dim))
    fake_labels = np.random.randint(0, 2, n_samples)  # Random binary labels
    generated_images = generator.predict([noise, fake_labels])

    # Preprocess images for InceptionV3
    generated_images_rescaled = preprocess_input(generated_images)
    predictions = inception_model.predict(generated_images_rescaled)

    # Calculate Inception Score
    p_y = np.mean(predictions, axis=0)  # Marginal distribution
    kl_div = predictions * (np.log(predictions + 1e-9) - np.log(p_y + 1e-9))
    inception_score = np.exp(np.mean(np.sum(kl_div, axis=1)))

    print(f'Inception Score: {inception_score}')
    return inception_score

def calculate_fid(generator, real_images, latent_dim, n_samples=100):
    from scipy.linalg import sqrtm

    # Generate fake images
    noise = np.random.normal(0, 1, (n_samples, latent_dim))
    fake_labels = np.random.randint(0, 2, n_samples)
    generated_images = generator.predict([noise, fake_labels])

    # Calculate mean and covariance for real and generated images
    mu1, sigma1 = np.mean(real_images, axis=0), np.cov(real_images, rowvar=False)
    mu2, sigma2 = np.mean(generated_images, axis=0), np.cov(generated_images, rowvar=False)

    # Calculate FID
    ssdiff = np.sum((mu1 - mu2) ** 2.0)
    covmean = sqrtm(sigma1.dot(sigma2))
    if np.iscomplexobj(covmean):
        covmean = covmean.real

    fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
    print(f'FID: {fid}')
    return fid

from tensorflow.keras.optimizers import Adam

# Example of a hyperparameter tuning process
def compile_generator_with_hyperparams(latent_dim, lr, beta_1):
    generator = build_generator(latent_dim, num_classes=2)
    optimizer = Adam(learning_rate=lr, beta_1=beta_1)
    generator.compile(loss='binary_crossentropy', optimizer=optimizer)
    return generator

# Try different hyperparameters
generator = compile_generator_with_hyperparams(latent_dim=128, lr=0.0002, beta_1=0.5)

# Save the generator and discriminator models
generator.save('generator_model.h5')
discriminator.save('discriminator_model.h5')

# Save the entire cGAN model
cgan.save('cgan_model.h5')

# Load the models back for future use
from tensorflow.keras.models import load_model
loaded_generator = load_model('generator_model.h5')
loaded_discriminator = load_model('discriminator_model.h5')
loaded_cgan = load_model('cgan_model.h5')

import datetime

log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# Add TensorBoard callback to training loop
cgan.fit(dataset, epochs=50, callbacks=[tensorboard_callback])

def generate_and_save_images(generator, latent_dim, class_label, output_dir='generated_images'):
    os.makedirs(output_dir, exist_ok=True)
    noise = np.random.normal(0, 1, (10, latent_dim))
    labels = np.array([class_label] * 10)
    generated_images = generator.predict([noise, labels])

    for i, img in enumerate(generated_images):
        plt.imsave(f"{output_dir}/image_{i}.png", img.reshape(128, 128), cmap='gray')

generate_and_save_images(loaded_generator, latent_dim=128, class_label=1)