In [None]:
import os
# Set TensorFlow's native log-level variable; this cell runs before the cell that
# imports tensorflow. NOTE(review): '3' presumably hides INFO/WARNING/ERROR
# messages from the C++ backend -- confirm against the TensorFlow docs.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

In [None]:
import pandas
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tqdm import tqdm
import cv2
from tensorflow.keras import layers, Sequential, datasets, Model
import keras.src.saving
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Output locations: saved models and training-history JSON files.
model_folder = "/home/dhawi/Documents/model"
history_folder = "/home/dhawi/Documents/History"

# Input location: the image dataset lives in a sub-directory of data_folder.
data_folder = "/home/dhawi/Documents/dataset"
dataset = f"{data_folder}/AI_project"

In [None]:
# Target size (in pixels) every image is resized to when it is loaded.
IMG_WIDTH, IMG_HEIGHT = 128, 128

In [None]:
from tqdm import tqdm
import cv2
def load_images_from_folder(folder, subfolder):
    """Load every readable image under ``folder/subfolder/*/`` as RGB and grayscale.

    Each sub-directory of ``folder/subfolder`` is scanned; every file that
    OpenCV can decode is resized to ``IMG_WIDTH`` x ``IMG_HEIGHT`` and
    converted from BGR to RGB, and a grayscale copy is derived from it.

    Args:
        folder: Root dataset directory.
        subfolder: Name of the directory inside ``folder`` whose
            sub-directories contain the image files.

    Returns:
        A tuple ``(images, gray)`` of NumPy arrays. ``images`` holds the RGB
        images divided by 255.0; ``gray`` holds the matching grayscale images
        exactly as returned by ``cv2.cvtColor`` (not rescaled).
    """
    images = []
    gray = []
    foldername = os.path.join(folder, subfolder)
    for sub in os.listdir(foldername):
        subfoldername = os.path.join(foldername, sub)
        if not os.path.isdir(subfoldername):
            # A stray file next to the sub-directories would make the inner
            # os.listdir raise; skip it instead.
            continue
        for filename in tqdm(os.listdir(subfoldername)):
            img = cv2.imread(os.path.join(subfoldername, filename))
            if img is None:
                # cv2.imread signals an unreadable / non-image file with None.
                # The check must come before resize/cvtColor, which would
                # otherwise raise on the None input.
                continue
            # cv2.resize expects dsize as (width, height).
            img = cv2.resize(img, (IMG_WIDTH, IMG_HEIGHT))
            # OpenCV reads images as BGR; convert to RGB first.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            gry = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
            images.append(img / 255.0)
            gray.append(gry)
    return np.array(images), np.array(gray)

In [None]:
# Load the three dataset folders in order; each call yields an (RGB, grayscale) pair.
(caries, caries_gray), (gingivitis, gingivitis_gray), (wsl, wsl_gray) = (
    load_images_from_folder(dataset, class_dir)
    for class_dir in ("Caries", "Gingivitis", "White Spot Lesion")
)

In [None]:
# A batch of 60 normally-distributed latent vectors of length 1024, drawn once
# so the same inputs can be reused later.
fixed_noise = tf.random.normal(shape=(60, 1024))

In [None]:
from keras import ops
class Sampling(layers.Layer):
    """Draws a latent vector z from the pair (z_mean, z_log_var).

    The sample is computed as ``z_mean + exp(0.5 * z_log_var) * epsilon``
    with ``epsilon`` taken from a standard normal distribution, using a
    seed generator initialised with a fixed seed.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Fixed seed so the layer's random stream is reproducible.
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        # epsilon has the same (batch, dim) shape as z_mean.
        noise_shape = (ops.shape(z_mean)[0], ops.shape(z_mean)[1])
        epsilon = keras.random.normal(shape=noise_shape, seed=self.seed_generator)
        # exp(0.5 * log_var) is the standard deviation.
        std_dev = ops.exp(0.5 * z_log_var)
        return z_mean + std_dev * epsilon

In [None]:
def show_images(images):
    """Display up to the first 20 entries of ``images`` in a 4x5 grid.

    Args:
        images: Indexable, sized collection of images that ``plt.imshow``
            can render. If it holds fewer than 20 images only those are
            drawn and the remaining grid cells stay empty.
    """
    # The grid has 4 * 5 = 20 cells; capping at len(images) avoids the
    # IndexError the fixed range(20) raised for smaller batches.
    for i in range(min(len(images), 20)):
        plt.subplot(4, 5, i + 1)
        plt.imshow(images[i])
        # Hide axis ticks so only the picture is visible.
        plt.xticks([])
        plt.yticks([])
    plt.show()
# show_images(X_train)

In [None]:
def build_discriminator():
    """Build the convolutional discriminator for 128x128x3 images.

    Three stride-2 convolutions (256, 128 and 64 filters) each followed by a
    LeakyReLU; the second and third are additionally batch-normalised. The
    flattened features feed a single linear unit, so the output is an
    unbounded validity score rather than a probability.

    Returns:
        The un-compiled ``Sequential`` model.
    """
    # NOTE(review): batch normalisation in a critic trained with a per-sample
    # gradient penalty is often discouraged -- confirm this is intended.
    critic = Sequential()

    # Input stage: no batch normalisation after the first convolution.
    critic.add(layers.Conv2D(256, kernel_size=(3, 3), strides=2, padding='same', input_shape=(128, 128, 3)))
    critic.add(layers.LeakyReLU(alpha=0.2))

    # Two further down-sampling stages with batch normalisation.
    for n_filters in (128, 64):
        critic.add(layers.Conv2D(n_filters, kernel_size=(3, 3), strides=2, padding='same'))
        critic.add(layers.LeakyReLU(alpha=0.2))
        critic.add(layers.BatchNormalization())

    # Collapse the feature maps into one scalar score.
    critic.add(layers.Flatten())
    critic.add(layers.Dense(1))
    return critic

In [None]:
def build_generator():
    """Build the generator mapping a 1024-d latent vector to a 128x128x3 image.

    A dense layer projects the latent vector to 8*8*128 values which are
    reshaped to (8, 8, 128). Three stride-2 transposed convolutions
    (128, 64, 32 filters; each followed by LeakyReLU and batch normalisation)
    and a final stride-2 transposed convolution with 3 sigmoid-activated
    channels double the spatial size four times: 8 -> 16 -> 32 -> 64 -> 128.

    Returns:
        The un-compiled ``Sequential`` model.
    """
    net = Sequential()

    # Project the latent vector and fold it into an 8x8 feature map.
    net.add(layers.Dense(8 * 8 * 128, input_shape=(1024,)))
    net.add(layers.LeakyReLU(alpha=0.2))
    net.add(layers.Reshape((8, 8, 128)))

    # Up-sampling stages: (16, 16, 128) -> (32, 32, 64) -> (64, 64, 32).
    for n_filters in (128, 64, 32):
        net.add(layers.Conv2DTranspose(n_filters, kernel_size=(3, 3), strides=2, padding='same'))
        net.add(layers.LeakyReLU(alpha=0.2))
        net.add(layers.BatchNormalization())

    # Output stage: (128, 128, 3) with sigmoid so pixel values lie in [0, 1].
    net.add(layers.Conv2DTranspose(3, kernel_size=(3, 3), strides=2, padding='same', activation='sigmoid'))
    return net

In [None]:
class WGAN(Model):
    """Wasserstein GAN with gradient penalty, wrapped as a Keras ``Model``.

    ``train_step`` performs ``d_steps`` discriminator (critic) updates
    followed by one generator update per batch.

    Args:
        generator: Model mapping ``(batch, latent_dim)`` noise to images.
        discriminator: Model mapping images to one unbounded score each.
        latent_dim: Length of the noise vector fed to the generator.
        lambda_gp: Weight of the gradient-penalty term in the critic loss.
        d_steps: Number of critic updates per generator update (>= 1).

    Raises:
        ValueError: If ``d_steps`` is smaller than 1.
    """

    def __init__(self, generator, discriminator, latent_dim, lambda_gp=10, d_steps=5):
        super(WGAN, self).__init__()
        if d_steps < 1:
            # train_step reports the loss of the last critic update, so at
            # least one update has to run.
            raise ValueError("d_steps must be >= 1")
        self.generator = generator
        self.discriminator = discriminator
        self.latent_dim = latent_dim
        self.lambda_gp = lambda_gp
        self.d_steps = d_steps
        # NOTE(review): nothing in this class appends to this dict, and Keras'
        # fit() may replace ``model.history`` with its own History object --
        # confirm before relying on it.
        self.history = {"Generator Loss": [], "Discriminator Loss": []}

    def compile(self, gen_optimizer, disc_optimizer, criterion=None):
        """Store the two optimizers (and an optional, currently unused criterion)."""
        super(WGAN, self).compile()
        self.generator_optimizer = gen_optimizer
        self.discriminator_optimizer = disc_optimizer
        # Kept for interface compatibility; the Wasserstein losses below do
        # not use it.
        self.cross_entropy = criterion

    def gradient_penalty(self, real_images, fake_images):
        """Return the mean squared deviation of the critic's gradient norm from 1.

        The gradient is evaluated at points interpolated between each real
        image and the fake image at the same batch position.
        """
        batch_size = tf.shape(real_images)[0]
        # One mixing coefficient per sample, uniform on [0, 1) so that the
        # interpolated point lies on the segment between real and fake.
        # (A normal draw, as used before, also produces coefficients outside
        # [0, 1], i.e. points that are extrapolations, not interpolations.)
        alpha = tf.random.uniform([batch_size, 1, 1, 1], 0.0, 1.0)
        interpolated_images = real_images * alpha + fake_images * (1 - alpha)

        with tf.GradientTape() as gp_tape:
            # interpolated_images is a plain tensor, so it must be watched
            # explicitly to obtain d(pred)/d(input).
            gp_tape.watch(interpolated_images)
            pred = self.discriminator(interpolated_images, training=True)

        grads = gp_tape.gradient(pred, interpolated_images)
        # L2 norm of the gradient per sample (over height, width, channels).
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def discriminator_loss(self, real_output, fake_output, gp):
        """Critic loss: mean(fake) - mean(real) + lambda_gp * gp."""
        real_loss = -tf.reduce_mean(real_output)
        fake_loss = tf.reduce_mean(fake_output)
        total_loss = real_loss + fake_loss + self.lambda_gp * gp
        return total_loss

    def generator_loss(self, fake_output):
        """Generator loss: the negated mean critic score of generated images."""
        return -tf.reduce_mean(fake_output)

    def train_step(self, real_images):
        """Run ``d_steps`` critic updates and one generator update on a batch.

        Returns:
            Dict with the generator loss and the loss of the last critic
            update, under the keys 'Generator Loss' and 'Discriminator Loss'.
        """
        batch_size = tf.shape(real_images)[0]

        # Train Discriminator
        for _ in range(self.d_steps):
            # Fresh noise for every critic update, so the critic is not
            # repeatedly fitted to the same generated batch.
            noise = tf.random.normal((batch_size, self.latent_dim))
            with tf.GradientTape() as disc_tape:
                fake_images = self.generator(noise, training=True)

                real_output = self.discriminator(real_images, training=True)
                fake_output = self.discriminator(fake_images, training=True)

                # The penalty is computed inside the tape so that it
                # contributes to the critic's gradients.
                gp = self.gradient_penalty(real_images, fake_images)

                disc_loss = self.discriminator_loss(real_output, fake_output, gp)

            disc_grads = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
            self.discriminator_optimizer.apply_gradients(zip(disc_grads, self.discriminator.trainable_variables))

        # Train Generator (again on freshly sampled noise)
        noise = tf.random.normal((batch_size, self.latent_dim))
        with tf.GradientTape() as gen_tape:
            generated_images = self.generator(noise, training=True)
            fake_output = self.discriminator(generated_images, training=True)
            gen_loss = self.generator_loss(fake_output)

        gen_grads = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
        self.generator_optimizer.apply_gradients(zip(gen_grads, self.generator.trainable_variables))

        # Return loss metrics for Keras's `fit` to log
        return {"Generator Loss": gen_loss, "Discriminator Loss": disc_loss}




In [None]:
def build_model(model_name):
    """Assemble and compile an ``EDGAN`` around a saved encoder/decoder pair.

    The Keras session is cleared first, then ``<model_name>_encoder.h5`` and
    ``<model_name>_decoder.h5`` are loaded (uncompiled) from ``model_folder``
    and combined with a freshly built generator and discriminator.

    Args:
        model_name: File-name prefix of the saved encoder/decoder models.

    Returns:
        The compiled ``EDGAN`` instance.
    """
    # Start from a clean Keras state.
    keras.backend.clear_session()

    path_prefix = model_folder + "/" + model_name
    # The encoder contains the custom Sampling layer, so it has to be
    # supplied for deserialisation.
    encoder = keras.src.saving.load_model(
        path_prefix + "_encoder.h5",
        custom_objects={'Sampling': Sampling},
        compile=False,
    )
    decoder = keras.src.saving.load_model(path_prefix + "_decoder.h5", compile=False)

    # NOTE(review): EDGAN is not defined in the visible part of this notebook
    # (only WGAN is) -- confirm it is defined before this function is called.
    edgan_model = EDGAN(build_generator(), build_discriminator(), encoder, decoder)

    gen_opt = tf.keras.optimizers.Adam(learning_rate = 0.0001)
    disc_opt = tf.keras.optimizers.Adam(learning_rate = 0.0001)
    # The discriminator ends in a linear unit, hence from_logits=True.
    bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)
    edgan_model.compile(gen_optimizer=gen_opt, disc_optimizer=disc_opt, criterion=bce)

    return edgan_model

In [None]:
import matplotlib.pyplot as plt
import json

def show_history(history, model_name):
    """Plot the generator/discriminator loss curves and save them as JSON.

    Args:
        history: Object whose ``history`` attribute is a dict containing the
            'Generator Loss' and 'Discriminator Loss' series (presumably the
            object returned by ``fit`` -- confirm against the caller).
        model_name: Prefix of the ``<model_name>_history.json`` file written
            into ``history_folder``.
    """
    # Dictionary holding each logged metric per epoch.
    history_dict = history.history

    plt.plot(history_dict['Generator Loss'], label='Generator Loss')
    plt.plot(history_dict['Discriminator Loss'], label='Discriminator Loss')
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    # Label the two curves so the figure can be read on its own.
    plt.legend(loc='upper left')
    plt.show()

    history_file = history_folder + "/" + model_name + "_history.json"
    # The context manager closes (and flushes) the file even if dumping fails;
    # the previous bare open() left the handle to the garbage collector.
    with open(history_file, 'w') as fp:
        json.dump(history_dict, fp)

In [None]:
def save_model(model, model_name):
    """Save the generator, the discriminator and the combined model as .h5 files.

    Args:
        model: Object exposing ``generator`` and ``discriminator`` sub-models
            and a ``save`` method of its own.
        model_name: File-name prefix used inside ``model_folder``.
    """
    path_prefix = model_folder + "/" + model_name
    # NOTE(review): the generator/discriminator are written with the
    # "_encoder.h5"/"_decoder.h5" suffixes, the same pattern build_model reads
    # its encoder/decoder from -- confirm the names are intended and that
    # reusing a model_name cannot overwrite those files.
    targets = (
        (model.generator, path_prefix + "_encoder.h5"),
        (model.discriminator, path_prefix + "_decoder.h5"),
        (model, path_prefix + "_model.h5"),
    )
    for net, destination in targets:
        net.save(destination)

In [None]:
# Initialize the models
# Fresh, untrained generator and discriminator instances.
generator = build_generator()
discriminator = build_discriminator()

# Optimizers
# NOTE(review): `opt` (with attributes lr, b1, b2) is not defined anywhere in
# the visible part of this notebook, so these two lines raise NameError on a
# fresh "Restart & Run All" unless it is defined elsewhere -- confirm and
# define the learning rate / Adam betas in a configuration cell.
optimizer_G = tf.keras.optimizers.Adam(learning_rate=opt.lr, beta_1=opt.b1, beta_2=opt.b2)
optimizer_D = tf.keras.optimizers.Adam(learning_rate=opt.lr, beta_1=opt.b1, beta_2=opt.b2)

# Loss functions
def d_loss_fn(real_validity, fake_validity, gradient_penalty_loss, lambda_gp=10):
    """Critic loss: mean(fake) - mean(real) + lambda_gp * gradient penalty.

    Args:
        real_validity: Critic scores for real images.
        fake_validity: Critic scores for generated images.
        gradient_penalty_loss: Pre-computed gradient-penalty value.
        lambda_gp: Weight of the gradient-penalty term. Taken as a parameter
            (default 10, the same default ``WGAN.__init__`` uses) instead of
            being read from a module-level name that the notebook never
            defines.

    Returns:
        The scalar critic loss.
    """
    return -tf.reduce_mean(real_validity) + tf.reduce_mean(fake_validity) + lambda_gp * gradient_penalty_loss

def g_loss_fn(fake_validity):
    """Generator loss: the negated mean of the critic scores for generated images."""
    mean_fake_score = tf.reduce_mean(fake_validity)
    return -mean_fake_score