In [1]:
import tensorflow as tf
import PIL 
from tensorflow import keras
from matplotlib import pyplot as plt
from datetime import datetime
import numpy as np
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model

***
CONFIDENTIAL AND PROPRIETARY

*COPYRIGHT Â© [2025] [Radu-Constantin Flesar]*

*ALL RIGHTS RESERVED. UNAUTHORIZED COPYING*


*REPRODUCTION, OR DISTRIBUTION OF THIS*


*CODE, IN WHOLE OR IN PART, IS STRICTLY*

*PROHIBITED WITHOUT PRIOR WRITTEN PERMISSION FROM THE AUTHORS:*


Flesar Radu-Constantin (radu.flesar02@e-uvt.ro)
***

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Initialize the ImageDataGenerator
datagen = ImageDataGenerator(rescale=1./255)

# Load the images from the directory
folder_path = "./DataCT/"
image_dataset = datagen.flow_from_directory(
    directory=folder_path,
    target_size=(512, 512),
    batch_size=32
)



In [None]:
print(image_dataset.class_indices)

In [3]:
def build_generator():
    model = Sequential()
    model.add(layers.Dense(64 * 64 * 512, input_dim=100))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    model.add(layers.Reshape((64, 64, 512)))
    
    model.add(layers.Conv2DTranspose(256, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    
    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    
    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', activation='tanh'))
    
    return model

def build_discriminator():
    model = Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=(512, 512, 3)))
    model.add(layers.LeakyReLU(0.2))
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(256, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Conv2D(512, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU(0.2))
    model.add(layers.Dropout(0.3))
    
    model.add(layers.Flatten())
    model.add(layers.Dense(1, activation='sigmoid'))
    
    return model

In [4]:
discriminator = build_discriminator()
generator = build_generator()


2025-02-17 03:18:43.628141: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2 Max
2025-02-17 03:18:43.628184: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 64.00 GB
2025-02-17 03:18:43.628191: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 24.00 GB
2025-02-17 03:18:43.628263: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2025-02-17 03:18:43.628313: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [5]:
generator.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_1 (Dense)             (None, 2097152)           211812352 
                                                                 
 batch_normalization_3 (Bat  (None, 2097152)           8388608   
 chNormalization)                                                
                                                                 
 leaky_re_lu_4 (LeakyReLU)   (None, 2097152)           0         
                                                                 
 reshape (Reshape)           (None, 64, 64, 512)       0         
                                                                 
 conv2d_transpose (Conv2DTr  (None, 128, 128, 256)     3277056   
 anspose)                                                        
                                                                 
 batch_normalization_4 (Bat  (None, 128, 128, 256)    

In [6]:
discriminator.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 256, 256, 64)      4864      
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 256, 256, 64)      0         
                                                                 
 dropout (Dropout)           (None, 256, 256, 64)      0         
                                                                 
 conv2d_1 (Conv2D)           (None, 128, 128, 128)     204928    
                                                                 
 batch_normalization (Batch  (None, 128, 128, 128)     512       
 Normalization)                                                  
                                                                 
 leaky_re_lu_1 (LeakyReLU)   (None, 128, 128, 128)     0         
                                                        

In [29]:
g_opt = keras.optimizers.legacy.Adam(learning_rate=0.00001) 
d_opt = keras.optimizers.legacy.Adam(learning_rate=0.00001) 
g_loss = keras.losses.BinaryCrossentropy()
d_loss = keras.losses.BinaryCrossentropy()

In [30]:
class GAN(Model): 
    def __init__(self, generator, discriminator, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.generator = generator 
        self.discriminator = discriminator 
        
    def compile(self, g_opt, d_opt, g_loss, d_loss, *args, **kwargs): 
        super().compile(*args, **kwargs)
    
        self.g_opt = g_opt
        self.d_opt = d_opt
        self.g_loss = g_loss
        self.d_loss = d_loss 

    def train_step(self, batch):
        real_images, _ = batch
        tf.random.set_seed(datetime.now().microsecond)
        batch_size = tf.shape(real_images)[0]
        fake_images = self.generator(tf.random.normal([32, 100]), training=False)
        
        with tf.GradientTape() as d_tape: 
            yhat_real = self.discriminator(real_images, training=True) 
            yhat_fake = self.discriminator(fake_images, training=True)
            yhat_realfake = tf.concat([yhat_real, yhat_fake], axis=0)
            
            y_realfake = tf.concat([tf.zeros_like(yhat_real), tf.ones_like(yhat_fake)], axis=0)
            
            noise_real = 0.15*tf.random.uniform(tf.shape(yhat_real))
            noise_fake = -0.15*tf.random.uniform(tf.shape(yhat_fake))
            y_realfake += tf.concat([noise_real, noise_fake], axis=0)
            
            total_d_loss = self.d_loss(y_realfake, yhat_realfake)
            
        # Backpropagation
        dgrad = d_tape.gradient(total_d_loss, self.discriminator.trainable_variables) 
        self.d_opt.apply_gradients(zip(dgrad, self.discriminator.trainable_variables))
        
        with tf.GradientTape() as g_tape: 
            gen_images = self.generator(tf.random.normal([32, 100]), training=True)
                                        
            predicted_labels = self.discriminator(gen_images, training=False)
                                        
            total_g_loss = self.g_loss(tf.zeros_like(predicted_labels), predicted_labels) 
            
        ggrad = g_tape.gradient(total_g_loss, self.generator.trainable_variables)
        self.g_opt.apply_gradients(zip(ggrad, self.generator.trainable_variables))
        
        return {"d_loss":total_d_loss, "g_loss":total_g_loss}
    
    def fitt(self, data, epochs=1, verbose=1):
        for epoch in range(epochs):
            print(f"Epoch {epoch+1}/{epochs}")
            for i, batch in enumerate(data):
                result = self.train_step(batch)
                if verbose and i % 100 == 0:
                    print(f"Step {i}: discriminator loss = {result['d_loss']}, generator loss = {result['g_loss']}")

In [31]:
gan = GAN(generator, discriminator)

In [32]:
gan.compile(g_opt, d_opt, g_loss, d_loss)

In [33]:
import os
from tensorflow.keras.preprocessing.image import array_to_img
from tensorflow.keras.callbacks import Callback

In [34]:
import os
from tensorflow.keras.preprocessing.image import array_to_img
from tensorflow.keras.callbacks import Callback, EarlyStopping
class ModelMonitor(Callback):
    def __init__(self, num_img=2, latent_dim=100):
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        random_latent_vectors = tf.random.uniform((self.num_img, self.latent_dim))
        generated_images = generator(random_latent_vectors)
        generated_images *= 255
        generated_images.numpy()
        for i in range(self.num_img):
            img = array_to_img(generated_images[i]) 
            path = os.path.join('/Users/raduflesar/Documents/CTs/images/', f'generated_img_{epoch}_{i}.png')
            img.save(path)

        # Save model every 10 epochs
        if epoch % 10 == 0:
            generator.save(os.path.join('/Users/raduflesar/Documents/CTs/Models/', f'generator_{epoch}.h5'))
            discriminator.save((os.path.join('/Users/raduflesar/Documents/CTs/Models/', f'discriminator_{epoch}.h5')))

early_stopping = EarlyStopping(
    monitor='loss',
    patience=500,
    restore_best_weights=True
)

`Train`

In [None]:
model = gan.fit(image_dataset, epochs=40000, callbacks=[ModelMonitor(), early_stopping])