<h2 style="font-family:comic sans ms; text-align:center; color:cyan"> Surface Defect Image Generation using DCGAN (Tensorflow) </h1>

In [2]:
import os
import random
import warnings
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras import models, layers, Sequential
from tensorflow.keras.callbacks import Callback, ModelCheckpoint
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.layers import (Input, Reshape, Dropout, Dense, Flatten, BatchNormalization, 
                                        Activation, LeakyReLU, ReLU, PReLU, Conv2D, Conv2DTranspose,
                                        RandomTranslation, RandomRotation, RandomFlip)
warnings.filterwarnings('ignore')

In [3]:
# Set up TensorFlow to use the GPU (execute this section only if training on a GPU is intended)
gpus = tf.config.experimental.list_physical_devices('GPU')  # Retrieve a list of all detected GPU devices
if gpus:  # Verify the presence of available GPUs
    try:
        # Limit TensorFlow to using only the first GPU detected
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
        print("Using GPU") 
    except RuntimeError as e:
        # Print any errors that occur during GPU configuration
        print(e)

In [4]:
# Parameters
IMG_SIZE = 256  # Original image size: [200, 200, 3]
N_CHANNELS = 3
GEN_INPUT_SIZE = 100   # size of a vector given as input to the generator
BATCH_SIZE = 32

In [11]:
# To combine images from training and validation directory
train_dir = "./NEU-DET/train/images/scratches"
val_dir = './NEU-DET/validation/images/scratches'

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,  
    label_mode=None,   # yields only image tensor (batch_size, image_size[0], image_size[1], num_channels), not tuple (image, label)
    batch_size=BATCH_SIZE,
    image_size=(IMG_SIZE, IMG_SIZE),
    # shuffle=False
    )

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
    val_dir,  
    label_mode=None,   
    batch_size=BATCH_SIZE,
    image_size=(IMG_SIZE, IMG_SIZE),
    # shuffle=False
    )

# Combine both dataset to increase no. of samples
dataset = train_ds.concatenate(val_ds)

dataset = dataset.map(lambda x: (x / 127.5) - 1) # Normalize to ([-1, 1]), as common in GAN
len(dataset)

Found 240 files.
Found 60 files.


10

In [12]:
augment_layers = tf.keras.Sequential([
        # RandomTranslation(0.3, 0.3),
        RandomRotation(factor = (-0.05, 0.05)),  # random from (-18, 18) degrees : 0.05*360 = 18
        # RandomFlip(mode='horizontal_and_vertical'),
      ])
aug_dataset = dataset.map(lambda image: augment_layers(image, training=True))

aug_dataset1 = dataset.map(lambda image: tf.image.flip_left_right(image))
aug_dataset2 = dataset.map(lambda image: tf.image.flip_up_down(image))
aug_dataset3 = dataset.map(lambda image: tf.image.rot90(image))

# Concatenate all augmented datasets
dataset = dataset.concatenate(aug_dataset).concatenate(aug_dataset1).concatenate(aug_dataset2).concatenate(aug_dataset3)
len(dataset)

50

In [14]:
# X = iter(dataset).next()
# X.shape, np.max(X), np.min(X), X.dtype

In [None]:
for images in dataset.skip(30):
  plt.figure(figsize=(14, 6))
  for i in range(10):
    ax = plt.subplot(2, 5, i + 1)
    plt.imshow((images[i].numpy() + 1) / 2.0)  # rescale [-1, 1] --> [0, 1] for imshow()
    plt.axis("off")
  break

In [16]:
def build_generator(input_size):
    """
    Constructs the generator network for the GAN.
    
    Parameters:
        input_size: The dimension of the input random vector for the generator.
    
    Returns:
        generator_model: A Keras model representing the generator architecture.
    """
    initializer = RandomNormal(mean=0.0, stddev=0.02)  # Set the kernel initializer
    
    generator_model = Sequential([ 
    
    # Block 1: Initial dense layer expanding the seed vector into a higher-dimensional space
    Dense(8 * 8 * 512, kernel_initializer=initializer, input_dim=input_size),  # Create initial feature set
    BatchNormalization(),  # for training stability
    ReLU(),  
    Reshape((8, 8, 512)),

    # Block 2: Upscale to (8, 8, 512)
    Conv2DTranspose(256, kernel_size=5, strides=2, padding='same', use_bias=False, kernel_initializer=initializer), 
    BatchNormalization(), 
    ReLU(), 

    # Block 3: Upscale to (16, 16, 256)
    Conv2DTranspose(128, kernel_size=5, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
    BatchNormalization(),  
    ReLU(),  

    # Block 4: Upscale to (32, 32, 128)
    Conv2DTranspose(64, kernel_size=3, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
    BatchNormalization(), 
    ReLU(), 

    Conv2DTranspose(32, kernel_size=3, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
    BatchNormalization(), 
    ReLU(), 

    # Block 5: Upscale to (64, 64, 3)
    Conv2DTranspose(3, kernel_size=3, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
    Activation('tanh'),  # to scale output pixel values between [-1, 1]
    ]) 
    
    return generator_model   


generator_model = build_generator(GEN_INPUT_SIZE)  
generator_model.summary()

In [17]:
def build_discriminator(img_size, num_channels):
    """
    Constructs the discriminator model for a GAN.

    Parameters:
        img_size: Length of one side of the square input image.
        num_channels: The number of channels in the input image.

    Returns:
        model: A Keras model instance representing the discriminator.
    """
    initializer = RandomNormal(mean=0.0, stddev=0.02)  # Initialize the weights of the layers

    model = Sequential([  

            Conv2D(64, kernel_size=3, strides=2, padding='same', use_bias=False, input_shape=(img_size, img_size, num_channels), 
                                                                                 kernel_initializer=initializer),
            LeakyReLU(alpha=0.2), 
        
            Conv2D(128, kernel_size=3, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
            BatchNormalization(),  # Batch normalization for stabilizing training
            LeakyReLU(alpha=0.2),   
        
            Conv2D(128, kernel_size=5, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
            BatchNormalization(),   
            LeakyReLU(alpha=0.2),  
        
            Conv2D(256, kernel_size=5, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
            BatchNormalization(),  
            LeakyReLU(alpha=0.2),  
        
            Conv2D(256, kernel_size=5, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
            BatchNormalization(), 
            LeakyReLU(alpha=0.2),  

            Conv2D(256, kernel_size=5, strides=2, padding='same', use_bias=False, kernel_initializer=initializer),
            BatchNormalization(), 
            LeakyReLU(alpha=0.2),  
        
            Conv2D(1, kernel_size=4, strides=1, padding='valid', use_bias=False, kernel_initializer=initializer),   
            Flatten(),  
            Activation('sigmoid'),  # Sigmoid -> to produce a probability value
            ])

    return model  


discriminator = build_discriminator(IMG_SIZE, N_CHANNELS)
discriminator.summary()

<h1 style="font-family:comic sans ms; text-align:center; color:violet"> DCGAN Model Class </h1>

In [18]:
class DCGANModel(tf.keras.Model):
    """
    A custom subclass of keras. 
    Model that implements a unique training routine and calculates loss metrics.
    """
    
    def __init__(self, random_vector_size, img_dimension, img_channels, **kwargs):
        """
        Args:
            random_vector_size: Dimension of the noise vector used by the generator.
            img_dimension: Length of each side of the square input image.
            img_channels: The number of color channels in the image.
        """
        super(DCGANModel, self).__init__(**kwargs)

        self.generator = build_generator(random_vector_size)
        self.discriminator = build_discriminator(img_dimension, img_channels)
        self.random_vector_size = random_vector_size

    
    def compute_generator_loss(self, discriminator_output):
        """
        Args:
            discriminator_output: Predictions from the discriminator for generated images.
        
        Returns:
            The computed loss value based on the discriminator's evaluation.
        """
        return cross_entropy_loss(tf.ones_like(discriminator_output), discriminator_output)

    
    def compute_discriminator_loss(self, real_predictions, fake_predictions, label_smoothing=0.1):
        """
        Args:
            real_predictions: Discriminator's output for real images.
            fake_predictions: Discriminator's output for images generated by the generator.
        
        Returns:
            The total loss computed for the discriminator.
        """
        real_loss_value = cross_entropy_loss(tf.ones_like(real_predictions) * (1 - label_smoothing), real_predictions)
        fake_loss_value = cross_entropy_loss(tf.zeros_like(fake_predictions), fake_predictions)
        return real_loss_value + fake_loss_value

    
    def compile_model(self, generator_optimizer, discriminator_optimizer):
        """
        Prepares the model for training by defining the optimizers.
        
        Args:
            generator_optimizer: Optimizer configured for the generator.
            discriminator_optimizer: Optimizer configured for the discriminator.
        """
        super(DCGANModel, self).compile()
        self.gen_optimizer = generator_optimizer   # to be used in next method
        self.disc_optimizer = discriminator_optimizer

    """
    The @tf.function decorator is used to optimize Python functions 
    by converting them into TensorFlow computation graphs. 
    """
    @tf.function    
    def train_step(self, input_data):
        """
        Executes one training step using a given batch of data.
        
        Args:
            input_data: A batch sampled from the training dataset.
        
        Returns:
            A dictionary containing the losses for both generator and discriminator.
        """
        batch_size = tf.shape(input_data)[0]
        
        random_seed = tf.random.normal(shape=(batch_size, self.random_vector_size))
        
        with tf.GradientTape() as generator_tape, tf.GradientTape() as discriminator_tape:
            generated_image = self.generator(random_seed, training=True)
        
            real_predictions = self.discriminator(input_data, training=True)
            fake_predictions = self.discriminator(generated_image, training=True) 
            
            generator_loss_value = self.compute_generator_loss(fake_predictions)   # generator takes only fake predictions
            discriminator_loss_value = self.compute_discriminator_loss(real_predictions, fake_predictions)

            generator_gradients = generator_tape.gradient(generator_loss_value, self.generator.trainable_variables)
            discriminator_gradients = discriminator_tape.gradient(discriminator_loss_value, self.discriminator.trainable_variables)

            self.gen_optimizer.apply_gradients(zip(generator_gradients, self.generator.trainable_variables))
            self.disc_optimizer.apply_gradients(zip(discriminator_gradients, self.discriminator.trainable_variables))
        
        return {
            "gen_loss": generator_loss_value,
            "disc_loss": discriminator_loss_value
        }

In [19]:
cross_entropy_loss = BinaryCrossentropy()
generator_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)
discriminator_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)

# Instantiate and compile the model
dcgan = DCGANModel(GEN_INPUT_SIZE, IMG_SIZE, N_CHANNELS)
dcgan.compile_model(generator_optimizer, discriminator_optimizer)

<h1 style="font-family:comic sans ms; text-align:center; color:pink"> CALLBACKS </h3>

In [24]:
class SaveImages(tf.keras.callbacks.Callback):
    def __init__(self, generator, noise, num_rows, num_cols, margin):
        super(SaveImages, self).__init__()  # call the parent constructor
        self.generator = generator
        self.noise = noise
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.margin = margin

    def on_epoch_end(self, epoch, logs=None):
        # Generate images at the end of each epoch
        generated_images = self.generator(self.noise, training=False)
        generated_images = (generated_images + 1) / 2.0  # Rescaled from the [-1, 1] (common in GANs) to [0, 1] for visualization.
        
        # Plot images in a grid
        fig, ax = plt.subplots(self.num_rows, self.num_cols, figsize=(self.num_cols * 2, self.num_rows * 2))
        fig.patch.set_facecolor('#fefefe')  # Set figure background color

        os.makedirs("./Images", exist_ok=True)
        for i in range(self.num_rows):
            for j in range(self.num_cols):
                ax[i, j].imshow(generated_images[i * self.num_cols + j])
                ax[i, j].axis('off')  # Hide axes
                
                # Add a border around each image
                for spine in ax[i, j].spines.values():
                    spine.set_visible(True)
                    spine.set_linewidth(2)
                    spine.set_color('#b56576' if (i + j) % 2 == 0 else '#e56b6f')  # Alternate border colors
        plt.subplots_adjust(wspace=self.margin, hspace=self.margin)
        plt.savefig(f"./Images/generated_image_epoch_{epoch + 1}.png", bbox_inches='tight', dpi=300)
        plt.close()

In [25]:
# Checkpoint callback to save model weights
filepath = "./Model/model.weights.h5"

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath = filepath,  # Use the correct file extension for saving weights
    monitor = 'gen_loss',
    save_weights_only = True,  # Save only the weights
    # save_best_only = True,  # Save only the best model according to the monitored metric
    # verbose = 1
    )

# Define constants for grid layout
NUM_ROWS = 3
NUM_COLS = 3
MARGIN = 0.3  # spacing between images in the grid

# Fixed seed for consistent image generation across epochs
fixed_seed = tf.random.normal(shape=(NUM_ROWS * NUM_COLS, GEN_INPUT_SIZE))

# Use SaveImages callback
save_images_callback = SaveImages(dcgan.generator, fixed_seed, NUM_ROWS, NUM_COLS, MARGIN)

<h1 style="font-family:comic sans ms; text-align:center; color:RED"> Model Training </h3>

In [27]:
# Training the DCGAN model

EPOCHS = 2  # train upto 100 epochs
with tf.device('/GPU:0'):
    history = dcgan.fit(dataset, epochs=EPOCHS, batch_size=BATCH_SIZE,
                        callbacks=[
                            save_images_callback,  
                            checkpoint_callback
                            ])

Epoch 1/2
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - disc_loss: 0.5771 - gen_loss: 3.8713

ValueError: You are saving a model that has not yet been built. Try building the model first by calling it on some data or by using `build()`.

In [None]:
def plot_losses(history):   
    generator_losses = history.history['gen_loss']  
    discriminator_losses = history.history['disc_loss']  
    plt.figure(figsize=(10, 5))
    plt.plot(generator_losses, label='Generator Loss')
    plt.plot(discriminator_losses, label='Discriminator Loss')
    # Adding aesthetics
    plt.title('Generator and Discriminator Losses')
    plt.xlabel('Epoch', fontsize=14)
    plt.ylabel('Loss', fontsize=14)
    plt.legend(fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()  # Adjust layout to prevent clipping
    plt.show()

plot_losses(history)

<h2 style="font-family:comic sans ms; text-align:center; color:purple"> Model Evaluation </h2>

In [None]:
# # Load model weights
# weights_dir = "./Model/" 
# try:
#     dcgan.load_weights(os.path.join(weights_dir, "model.weights.h5"))
#     # dcgan.load_weights(os.path.join(weights_dir, "model.weights.h5"))
# except FileNotFoundError as e:
#     print(e)
    
# def generate_images(generator):
#     # Generate 16 images 
#     n_images = 16
#     noise = tf.random.normal([n_images, GEN_INPUT_SIZE]) 
#     generated_images = generator(noise)
#     plt.figure(figsize=(12, 12))
#     for i in range(generated_images.shape[0]):
#         plt.subplot(4, 4, i+1)
#         plt.imshow((generated_images[i, ...] +1) / 2.0)  # Rescale from [-1, 1] to [0, 1] for plt.imshow()
#         plt.axis("off")
#     plt.suptitle('Generated Surface Defects Images', fontsize=14)
#     plt.show()
    
# generate_images(dcgan.generator)