In [1]:
import tensorflow as tf
import numpy as np
import os
import glob
from tensorflow.keras import layers , models
import matplotlib.pyplot as plt
from PIL import Image
import time
from tensorflow.keras.preprocessing.image import ImageDataGenerator


In [2]:
# Set random seed for reproducibility
tf.random.set_seed(42)
np.random.seed(42)

In [3]:
# Configuration
BUFFER_SIZE = 1000
BATCH_SIZE = 32
IMAGE_SIZE = 128
CHANNELS = 1
EPOCHS = 150
NOISE_DIM = 100
NUM_EXAMPLES_TO_GENERATE = 1000
BASE_DIR = "/kaggle/input/chest-xray-pneumonia/chest_xray"
EPOCHS_CONV = 20

In [4]:
class ChestXRayDCGAN:
    def __init__(self, class_name):
        self.class_name = class_name
        self.generator = self.build_generator()
        self.discriminator = self.build_discriminator()
        
        # Optimizers
        self.generator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5, beta_2=0.9)
        self.discriminator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5, beta_2=0.9)
        
        # WGAN-GP hyperparameters
        self.gp_weight = 10.0
        self.n_critic = 5
        self.critic_counter = 0  # Add a counter to track critic iterations
        
        # Create directories for saving generated images
        self.save_dir = os.path.join("generated_images",self.class_name)
        if not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)

    def load_and_preprocess_data(self, data_dirs):
        """Load and preprocess images from the given directory"""
        
        image_paths = []
        for data_dir in data_dirs:
            image_paths.extend(glob.glob(os.path.join(data_dir, '*.jpeg')))

        dataset = tf.data.Dataset.from_tensor_slices(image_paths)
        
        def preprocess_image(path):
            image = tf.io.read_file(path)
            image = tf.image.decode_jpeg(image, channels=CHANNELS)
            image = tf.image.resize(image, [IMAGE_SIZE, IMAGE_SIZE])
            image = tf.cast(image, tf.float32)
            image = (image - 127.5) / 127.5
            return image

        dataset = dataset.map(preprocess_image)
        print(f'Images read successfully , total {len(image_paths)}')
        dataset = dataset.shuffle(BUFFER_SIZE)
        dataset = dataset.batch(BATCH_SIZE)
        return dataset

    def build_generator(self):
        """Build the Generator model"""
        model = tf.keras.Sequential([
            layers.Dense(8*8*256, use_bias=False, input_shape=(NOISE_DIM,)),
            layers.BatchNormalization(),
            layers.LeakyReLU(),
            layers.Reshape((8, 8, 256)),

            layers.Conv2DTranspose(128, 5, strides=2, padding='same', use_bias=False),
            layers.BatchNormalization(),
            layers.LeakyReLU(),

            layers.Conv2DTranspose(64, 5, strides=2, padding='same', use_bias=False),
            layers.BatchNormalization(),
            layers.LeakyReLU(),

            layers.Conv2DTranspose(32, 5, strides=2, padding='same', use_bias=False),
            layers.BatchNormalization(),
            layers.LeakyReLU(),

            layers.Conv2DTranspose(CHANNELS, 5, strides=2, padding='same', use_bias=False, activation='tanh')
        ])
        return model

    
    def build_discriminator(self):
        """Build the Discriminator model"""
        model = tf.keras.Sequential([
            layers.Conv2D(64, 5, strides=2, padding='same', input_shape=[IMAGE_SIZE, IMAGE_SIZE, CHANNELS]),
            layers.LeakyReLU(),
            layers.Dropout(0.3),

            layers.Conv2D(128, 5, strides=2, padding='same'),
            layers.LeakyReLU(),
            layers.Dropout(0.3),

            layers.Conv2D(256, 5, strides=2, padding='same'),
            layers.LeakyReLU(),
            layers.Dropout(0.3),

            layers.Conv2D(512, 5, strides=2, padding='same'),
            layers.LeakyReLU(),
            layers.Dropout(0.3),

            layers.Flatten(),
            layers.Dense(1)
        ])
        return model
    
    def gradient_penalty(self, real_images, fake_images):
        """Compute gradient penalty for WGAN-GP with dynamic batch size."""
        # Ensure dynamic batch size handling
        batch_size = tf.shape(real_images)[0]

        # Create alpha with appropriate shape
        alpha = tf.random.uniform(shape=[batch_size, 1, 1, 1], minval=0.0, maxval=1.0)

        # Align fake_images to the batch size of real_images
        fake_images = fake_images[:batch_size]

        # Compute interpolated images
        interpolated = alpha * real_images + (1 - alpha) * fake_images

        # Gradient computation
        with tf.GradientTape() as tape:
            tape.watch(interpolated)
            pred = self.discriminator(interpolated, training=True)

        # Compute gradients and their norms
        grads = tape.gradient(pred, [interpolated])[0]
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]) + 1e-8)  # Avoid division by zero
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp



    
    def train_step(self, images):
        """Single training step with WGAN-GP loss and proper generator training"""
        current_batch_size = tf.shape(images)[0]
        noise = tf.random.normal([current_batch_size, NOISE_DIM])

        # Train discriminator
        with tf.GradientTape() as disc_tape:
            generated_images = self.generator(noise, training=True)

            real_output = self.discriminator(images, training=True)
            fake_output = self.discriminator(generated_images, training=True)

            # WGAN loss
            disc_loss = tf.reduce_mean(fake_output) - tf.reduce_mean(real_output)
            
            # Gradient penalty
            gp = self.gradient_penalty(images, generated_images)
            disc_loss += self.gp_weight * gp

        # Apply discriminator gradients
        disc_gradients = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
        self.discriminator_optimizer.apply_gradients(zip(disc_gradients, self.discriminator.trainable_variables))

        # Initialize generator loss
        gen_loss = tf.constant(0.0)

        # Train generator every n_critic steps
        self.critic_counter += 1
        if self.critic_counter >= self.n_critic:
            with tf.GradientTape() as gen_tape:
                generated_images = self.generator(noise, training=True)
                fake_output = self.discriminator(generated_images, training=True)
                gen_loss = -tf.reduce_mean(fake_output)

            gen_gradients = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
            self.generator_optimizer.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))
            
            # Reset counter
            self.critic_counter = 0

        return gen_loss, disc_loss


    def generate_and_save_images(self):
        """Generate and save images"""
        noise = tf.random.normal([NUM_EXAMPLES_TO_GENERATE, NOISE_DIM])
        generated_images = self.generator(noise, training=False)

        for i in range(NUM_EXAMPLES_TO_GENERATE):
            image = generated_images[i].numpy()
            image = ((image + 1) * 127.5).astype(np.uint8)
            img = Image.fromarray(image.reshape(IMAGE_SIZE, IMAGE_SIZE))
            img.save(os.path.join(self.save_dir, f'sample_{i}.png'))

    def train(self, dataset, epochs=EPOCHS):
        """Training loop with improved logging"""
        for epoch in range(epochs):
            start = time.time()
            gen_losses = []
            disc_losses = []
            
            # Reset critic counter at the start of each epoch
            self.critic_counter = 0

            for image_batch in dataset:
                # Skip very small batches
                if tf.shape(image_batch)[0] < 3:
                    continue
                    
                gen_loss, disc_loss = self.train_step(image_batch)
                gen_losses.append(gen_loss)
                disc_losses.append(disc_loss)

            # Calculate mean losses
            avg_gen_loss = tf.reduce_mean([loss for loss in gen_losses if not tf.math.is_nan(loss)])
            avg_disc_loss = tf.reduce_mean([loss for loss in disc_losses if not tf.math.is_nan(loss)])
            
            print(f'Epoch {epoch + 1}')
            print(f'Generator Loss: {avg_gen_loss:.4f}')
            print(f'Discriminator Loss: {avg_disc_loss:.4f}')
            print(f'Time: {time.time() - start:.2f} sec')
            print('-' * 50)



    def generate_samples(self):
        """
        Generate new samples after training and save them to disk.
        Args:
            num_samples (int): Number of samples to generate.
            output_dir (str): Directory to save generated samples.
        """

        # Generate random noise and create images
        noise = tf.random.normal([NUM_EXAMPLES_TO_GENERATE, NOISE_DIM])
        generated_images = self.generator(noise, training=False)

        # Denormalize images from [-1, 1] to [0, 255] and save
        generated_images = (generated_images + 1) * 127.5  # Rescale to [0, 255]
        generated_images = tf.cast(generated_images, tf.uint8)

        for i, image in enumerate(generated_images):
            tf.keras.preprocessing.image.save_img(
                os.path.join(self.save_dir, f'sample_{i + 1}.png'),
                image.numpy()
            )

        print(f'Saved generated samples to "{self.save_dir}".')

In [5]:
# Create DCGAN instances for both classes
normal_dcgan = ChestXRayDCGAN('NORMAL')

# Load and preprocess data for both classes
normal_dataset = normal_dcgan.load_and_preprocess_data(['/kaggle/input/chest-xray-pneumonia/chest_xray/train/NORMAL','/kaggle/input/chest-xray-pneumonia/chest_xray/test/NORMAL', '/kaggle/input/chest-xray-pneumonia/chest_xray/val/NORMAL'])

# Train both models
print("Training NORMAL class DCGAN...")
normal_dcgan.train(normal_dataset)
    
normal_dcgan.generate_samples()


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Images read successfully , total 1583
Training NORMAL class DCGAN...
Epoch 1
Generator Loss: 2.8836
Discriminator Loss: -69.3560
Time: 30.38 sec
--------------------------------------------------
Epoch 2
Generator Loss: 7.6897
Discriminator Loss: -93.7511
Time: 24.35 sec
--------------------------------------------------
Epoch 3
Generator Loss: 4.0227
Discriminator Loss: -61.3834
Time: 24.91 sec
--------------------------------------------------
Epoch 4
Generator Loss: 5.7665
Discriminator Loss: -46.2072
Time: 24.92 sec
--------------------------------------------------
Epoch 5
Generator Loss: 4.2846
Discriminator Loss: -37.6610
Time: 25.29 sec
--------------------------------------------------
Epoch 6
Generator Loss: 3.0985
Discriminator Loss: -29.3714
Time: 25.94 sec
--------------------------------------------------
Epoch 7
Generator Loss: 2.6505
Discriminator Loss: -22.0566
Time: 25.29 sec
--------------------------------------------------
Epoch 8
Generator Loss: 1.8964
Discriminat

In [6]:
!zip -r NORMAL.zip "/kaggle/working/generated_images/NORMAL"

  pid, fd = os.forkpty()


  adding: kaggle/working/generated_images/NORMAL/ (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_756.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_674.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_820.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_440.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_534.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_791.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_651.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_879.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_706.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_649.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_644.png (stored 0%)
  adding: kaggle/working/generated_images/NORMAL/sample_914.png (stored 0%)
  adding: kag

In [7]:
from IPython.display import FileLink
FileLink(r'NORMAL.zip')


In [8]:
# Create DCGAN instances for both classes
pneumonia_dcgan = ChestXRayDCGAN('PNEUMONIA')

# Load and preprocess data for both classes
pneumonia_dataset = pneumonia_dcgan.load_and_preprocess_data(['/kaggle/input/chest-xray-pneumonia/chest_xray/train/PNEUMONIA','/kaggle/input/chest-xray-pneumonia/chest_xray/test/PNEUMONIA', '/kaggle/input/chest-xray-pneumonia/chest_xray/val/PNEUMONIA'])

# Train both models
print("Training PNEUMONIA class DCGAN...")
pneumonia_dcgan.train(pneumonia_dataset)
    
pneumonia_dcgan.generate_samples()


Images read successfully , total 4273
Training PNEUMONIA class DCGAN...
Epoch 1
Generator Loss: 7.2911
Discriminator Loss: -86.6490
Time: 57.16 sec
--------------------------------------------------
Epoch 2
Generator Loss: 1.2014
Discriminator Loss: -40.9689
Time: 54.43 sec
--------------------------------------------------
Epoch 3
Generator Loss: 1.3522
Discriminator Loss: -22.0018
Time: 54.56 sec
--------------------------------------------------
Epoch 4
Generator Loss: 0.3775
Discriminator Loss: -13.4167
Time: 54.54 sec
--------------------------------------------------
Epoch 5
Generator Loss: 0.5191
Discriminator Loss: -10.6459
Time: 54.58 sec
--------------------------------------------------
Epoch 6
Generator Loss: -0.3881
Discriminator Loss: -10.4799
Time: 54.93 sec
--------------------------------------------------
Epoch 7
Generator Loss: -1.5657
Discriminator Loss: -9.4479
Time: 54.58 sec
--------------------------------------------------
Epoch 8
Generator Loss: -1.3666
Discri

In [9]:
!zip -r PNEUMONIA.zip "/kaggle/working/generated_images/PNEUMONIA"

  adding: kaggle/working/generated_images/PNEUMONIA/ (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_756.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_674.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_820.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_440.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_534.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_791.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_651.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_879.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_706.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_649.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sample_644.png (stored 0%)
  adding: kaggle/working/generated_images/PNEUMONIA/sa

In [10]:
from IPython.display import FileLink
FileLink(r'PNEUMONIA.zip')

In [11]:
import shutil

In [12]:
# Define the paths of the two source folders
source_folder1 = "/kaggle/working/generated_images/PNEUMONIA"
source_folder2 = "/kaggle/input/chest-xray-pneumonia/chest_xray/train/PNEUMONIA"


In [13]:
# Define the path for the new destination folder
destination_folder = "/kaggle/working/generated_images_train/PNEUMONIA"

In [14]:
# Create the destination folder if it doesn't exist
os.makedirs(destination_folder, exist_ok=True)

In [15]:
# Function to copy files from source to destination
def copy_files(source, destination):
    for filename in os.listdir(source):
        source_path = os.path.join(source, filename)
        destination_path = os.path.join(destination, filename)
        
        # Copy only files (skip directories if any)
        if os.path.isfile(source_path):
            shutil.copy(source_path, destination_path)

In [16]:
# Copy files from both folders to the destination folder
copy_files(source_folder1, destination_folder)
copy_files(source_folder2, destination_folder)

In [17]:
file_count = len([f for f in os.listdir(destination_folder)])
file_count

4875

In [18]:
# Data Generators with Augmentation
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

In [19]:
val_test_datagen = ImageDataGenerator(rescale=1./255)


In [20]:
# Load and preprocess data
train_generator = train_datagen.flow_from_directory(
    "/kaggle/working/generated_images_train",
    target_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
    color_mode='grayscale'
)

Found 4875 images belonging to 1 classes.


In [21]:
validation_generator = val_test_datagen.flow_from_directory(
    os.path.join(BASE_DIR, 'val'),
    target_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
    color_mode='grayscale'
)

Found 16 images belonging to 2 classes.


In [22]:
test_generator = val_test_datagen.flow_from_directory(
    os.path.join(BASE_DIR, 'test'),
    target_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    class_mode='binary',
    color_mode='grayscale'
)

Found 624 images belonging to 2 classes.


In [23]:
# Build the CNN model
def create_model():
    model = models.Sequential([
        # First Convolutional Block
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(IMAGE_SIZE, IMAGE_SIZE, 1)),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        
        # Second Convolutional Block
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        
        # Third Convolutional Block
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        
        # Fourth Convolutional Block
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        
        # Flatten and Dense Layers
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.5),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

In [24]:
# Create and compile the model
model = create_model()
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

# Callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

In [25]:
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=3,
    min_lr=1e-6
)


In [26]:
# Train the model
history = model.fit(
    train_generator,
    epochs=EPOCHS_CONV,
    validation_data=validation_generator,
    callbacks=[early_stopping, reduce_lr]
)

Epoch 1/20


  self._warn_if_super_not_called()
I0000 00:00:1732452859.639487      70 service.cc:145] XLA service 0x7abdc801d940 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1732452859.639566      70 service.cc:153]   StreamExecutor device (0): Tesla T4, Compute Capability 7.5
I0000 00:00:1732452859.639574      70 service.cc:153]   StreamExecutor device (1): Tesla T4, Compute Capability 7.5


[1m  3/153[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m10s[0m 68ms/step - accuracy: 0.4514 - loss: 1.1673

I0000 00:00:1732452864.709680      70 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m153/153[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 147ms/step - accuracy: 0.5144 - loss: 0.9744 - val_accuracy: 0.5000 - val_loss: 1.2773 - learning_rate: 1.0000e-04
Epoch 2/20
[1m153/153[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 112ms/step - accuracy: 0.6272 - loss: 0.7336 - val_accuracy: 0.5000 - val_loss: 1.2977 - learning_rate: 1.0000e-04
Epoch 3/20
[1m153/153[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 111ms/step - accuracy: 0.7233 - loss: 0.5707 - val_accuracy: 0.5000 - val_loss: 0.9502 - learning_rate: 1.0000e-04
Epoch 4/20
[1m153/153[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 112ms/step - accuracy: 0.8053 - loss: 0.4457 - val_accuracy: 0.5000 - val_loss: 0.8830 - learning_rate: 1.0000e-04
Epoch 5/20
[1m153/153[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 110ms/step - accuracy: 0.8570 - loss: 0.3532 - val_accuracy: 0.3750 - val_loss: 0.9224 - learning_rate: 1.0000e-04
Epoch 6/20
[1m153/153[0m [32m━━━━━━━━━━━━━━