<a href="https://colab.research.google.com/github/aryanarora07/ML-AI/blob/main/GAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
import numpy as np
import os
from glob import glob
import time
from PIL import Image
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from google.colab import drive
import zipfile
from tqdm.notebook import tqdm

# Check for GPU availability
print("TensorFlow version:", tf.__version__)
print("GPU Available:", tf.config.list_physical_devices('GPU'))



def load_and_preprocess_data(data_dir, img_shape=(128, 128, 3), batch_size=64):
    """
    Load and preprocess images from a directory
    """
    print("Loading and preprocessing dataset...")

    # Create a dataset from the directory
    try:
        dataset = tf.keras.preprocessing.image_dataset_from_directory(
            data_dir,
            label_mode=None,  # We don't need labels for GAN
            image_size=(img_shape[0], img_shape[1]),
            batch_size=batch_size,
            shuffle=True
        )
    except Exception as e:
        # If the above fails (e.g., no subdirectories), try with direct image loading
        print(f"Directory loading failed: {e}")
        print("Attempting to load from root directory...")

        # Create a directory structure expected by the loader
        new_dir = data_dir + "_structured"
        os.makedirs(new_dir + "/images", exist_ok=True)

        # Move images to new structure
        for ext in ['*.jpg', '*.jpeg', '*.png']:
            for file in glob(os.path.join(data_dir, ext)):
                os.system(f"cp '{file}' '{new_dir}/images/'")

        dataset = tf.keras.preprocessing.image_dataset_from_directory(
            new_dir,
            label_mode=None,
            image_size=(img_shape[0], img_shape[1]),
            batch_size=batch_size,
            shuffle=True
        )

    # Normalize images to [0, 1] range
    normalization_layer = layers.Rescaling(1./255)
    dataset = dataset.map(lambda x: normalization_layer(x))

    # Use prefetch to optimize performance
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    return dataset

def manual_load_data(data_dir, img_shape=(128, 128), batch_size=64):
    """
    Manually load and preprocess images from a directory
    """
    print("Manually loading dataset...")
    image_paths = glob(os.path.join(data_dir, "**/*.jpg"), recursive=True) + \
                  glob(os.path.join(data_dir, "**/*.jpeg"), recursive=True) + \
                  glob(os.path.join(data_dir, "**/*.png"), recursive=True)

    if not image_paths:
        raise ValueError(f"No images found in {data_dir}")

    print(f"Found {len(image_paths)} images")

    # Create a smaller dataset if there are too many images (for testing)
    if len(image_paths) > 1000:
        print("Using a subset of 1000 images for faster processing")
        np.random.shuffle(image_paths)
        image_paths = image_paths[:1000]

    # Function to load and preprocess a single image
    def preprocess_image(img_path):
        try:
            img = load_img(img_path, target_size=img_shape)
            img_array = img_to_array(img)
            # Normalize to [0, 1]
            img_array = img_array / 255.0
            return img_array
        except Exception as e:
            print(f"Error processing {img_path}: {e}")
            # Return a blank image in case of error
            return np.zeros(img_shape + (3,))

    # Load and preprocess all images with a progress bar
    print("Loading images...")
    images = []
    for path in tqdm(image_paths):
        images.append(preprocess_image(path))

    images = np.array(images)

    # Create dataset
    dataset = tf.data.Dataset.from_tensor_slices(images)
    dataset = dataset.shuffle(buffer_size=len(images))
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    return dataset

# @title Show sample images
def show_sample_images(dataset, n_samples=25):
    """
    Display sample images from the dataset
    """
    plt.figure(figsize=(10, 10))

    # Get a batch from the dataset
    for images in dataset.take(1):
        for i in range(min(n_samples, len(images))):
            plt.subplot(5, 5, i + 1)
            plt.imshow(images[i])
            plt.axis('off')

    plt.tight_layout()
    plt.show()

# ====== GAN Architecture ======
# @title Generator and Discriminator models

def build_generator(latent_dim, output_shape):
    """
    Build the generator model
    """
    model = models.Sequential(name="Generator")

    # Starting with a dense layer that takes the latent vector
    n_nodes = 8 * 8 * 256  # Foundation for 8x8 feature maps
    model.add(layers.Dense(n_nodes, input_dim=latent_dim))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Reshape((8, 8, 256)))

    # Upsampling blocks to reach target size
    # First upsampling: 8x8 -> 16x16
    model.add(layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.BatchNormalization())

    # Second upsampling: 16x16 -> 32x32
    model.add(layers.Conv2DTranspose(64, (4, 4), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.BatchNormalization())

    # Third upsampling: 32x32 -> 64x64
    model.add(layers.Conv2DTranspose(32, (4, 4), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.BatchNormalization())

    # Fourth upsampling: 64x64 -> 128x128 (if needed based on target size)
    if output_shape[0] >= 128:
        model.add(layers.Conv2DTranspose(16, (4, 4), strides=(2, 2), padding='same'))
        model.add(layers.LeakyReLU(alpha=0.2))
        model.add(layers.BatchNormalization())

    # Output layer with tanh activation to get values in [-1, 1]
    model.add(layers.Conv2D(output_shape[2], (3, 3), padding='same', activation='tanh'))

    return model

def build_discriminator(input_shape):
    """
    Build the discriminator model
    """
    model = models.Sequential(name="Discriminator")

    # First convolutional block
    model.add(layers.Conv2D(32, (3, 3), strides=(2, 2), padding='same', input_shape=input_shape))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Dropout(0.3))

    # Second convolutional block
    model.add(layers.Conv2D(64, (3, 3), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Dropout(0.3))
    model.add(layers.BatchNormalization())

    # Third convolutional block
    model.add(layers.Conv2D(128, (3, 3), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Dropout(0.3))
    model.add(layers.BatchNormalization())

    # Fourth convolutional block
    model.add(layers.Conv2D(256, (3, 3), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU(alpha=0.2))
    model.add(layers.Dropout(0.3))
    model.add(layers.BatchNormalization())

    # Flatten and output
    model.add(layers.Flatten())
    model.add(layers.Dense(1, activation='sigmoid'))

    return model

# Define GAN
def build_gan(generator, discriminator):
    """
    Combine generator and discriminator into a GAN
    """
    # For training the combined model, we don't want to update the discriminator weights
    discriminator.trainable = False

    model = models.Sequential(name="GAN")
    model.add(generator)
    model.add(discriminator)

    return model

# ====== Training Functions ======
# @title GAN Training Functions

def save_generated_images(generator, epoch, latent_dim, directory, n_samples=16):
    """
    Generate and save images
    """
    # Generate images
    noise = tf.random.normal([n_samples, latent_dim])
    generated_images = generator(noise, training=False)

    # Scale from [-1,1] to [0,1]
    generated_images = (generated_images + 1) / 2.0

    # Plot images
    fig, axes = plt.subplots(4, 4, figsize=(10, 10))

    for i, ax in enumerate(axes.flatten()):
        if i < n_samples:
            # Plot image
            ax.imshow(generated_images[i])
            ax.axis('off')

    # Save plot (make sure directory exists)
    os.makedirs(directory, exist_ok=True)
    plt.tight_layout()
    plt.savefig(f"{directory}/epoch_{epoch}.png")
    plt.close()

    # Also display in the notebook
    if epoch % 10 == 0 or epoch == "final":
        # Display the same images in the notebook
        fig, axes = plt.subplots(4, 4, figsize=(10, 10))

        for i, ax in enumerate(axes.flatten()):
            if i < n_samples:
                ax.imshow(generated_images[i])
                ax.axis('off')

        plt.tight_layout()
        plt.show()

def train_gan(generator, discriminator, gan, dataset, latent_dim, n_epochs=100, n_batch=64, save_interval=10):
    """
    Train the GAN
    """
    # Prepare directories for saving samples
    sample_dir = "/content/generated_samples"
    os.makedirs(sample_dir, exist_ok=True)

    # Create checkpoint directory
    checkpoint_dir = "/content/checkpoints"
    os.makedirs(checkpoint_dir, exist_ok=True)

    # Calculate steps per epoch (might be inaccurate for unknown dataset sizes)
    try:
        steps_per_epoch = len(dataset)
    except:
        # If dataset doesn't have len, make an estimate
        steps_per_epoch = 100  # Arbitrary default

    # Progress visualization
    loss_history = {
        'disc_loss': [],
        'gen_loss': []
    }

    # Training loop
    for epoch in range(n_epochs):
        start_time = time.time()
        disc_loss_total = 0
        gen_loss_total = 0

        # Create progress bar
        print(f"Epoch {epoch+1}/{n_epochs}")
        progress_bar = tqdm(total=steps_per_epoch, desc=f"Training")

        # Train on batches
        batch_count = 0
        for real_images in dataset:
            # Get batch size (might be different for last batch)
            batch_size = tf.shape(real_images)[0]
            batch_count += 1

            # Skip small batches
            if batch_size < 8:
                continue

            # Generate random noise for G
            noise = tf.random.normal([batch_size, latent_dim])

            # Generate fake images
            fake_images = generator(noise, training=True)

            # Create labels for real and fake images
            real_labels = tf.ones((batch_size, 1)) * 0.9  # Using label smoothing (0.9 instead of 1)
            fake_labels = tf.zeros((batch_size, 1))

            # Add random noise to labels for robustness
            real_labels += 0.05 * tf.random.uniform((batch_size, 1))
            fake_labels += 0.05 * tf.random.uniform((batch_size, 1))

            # Train discriminator
            with tf.GradientTape() as disc_tape:
                # Predictions
                real_predictions = discriminator(real_images, training=True)
                fake_predictions = discriminator(fake_images, training=True)

                # Calculate losses
                real_loss = tf.keras.losses.binary_crossentropy(real_labels, real_predictions)
                fake_loss = tf.keras.losses.binary_crossentropy(fake_labels, fake_predictions)
                disc_loss = real_loss + fake_loss

            # Apply gradients
            disc_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
            discriminator.optimizer.apply_gradients(zip(disc_gradients, discriminator.trainable_variables))

            # Update the generator
            noise = tf.random.normal([batch_size, latent_dim])

            # Labels for generated images (telling G we want D to think these are real)
            misleading_labels = tf.ones((batch_size, 1))

            with tf.GradientTape() as gen_tape:
                # Generate images
                fake_images = generator(noise, training=True)
                # Get predictions
                predictions = discriminator(fake_images, training=True)
                # Calculate loss
                gen_loss = tf.keras.losses.binary_crossentropy(misleading_labels, predictions)

            # Apply gradients
            gen_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
            generator.optimizer.apply_gradients(zip(gen_gradients, generator.trainable_variables))

            # Track progress
            current_disc_loss = tf.reduce_mean(disc_loss)
            current_gen_loss = tf.reduce_mean(gen_loss)
            disc_loss_total += current_disc_loss
            gen_loss_total += current_gen_loss

            # Update progress bar
            progress_bar.update(1)
            progress_bar.set_postfix({
                'D_loss': f"{current_disc_loss:.4f}",
                'G_loss': f"{current_gen_loss:.4f}"
            })

            # Stop if we've reached the estimated steps (avoid infinite loops)
            if batch_count >= steps_per_epoch:
                break

        # Close progress bar
        progress_bar.close()

        # Calculate average loss per epoch
        avg_disc_loss = disc_loss_total / batch_count
        avg_gen_loss = gen_loss_total / batch_count

        # Store losses for plotting
        loss_history['disc_loss'].append(avg_disc_loss)
        loss_history['gen_loss'].append(avg_gen_loss)

        # Print epoch summary
        print(f"Epoch {epoch+1}/{n_epochs} - {time.time()-start_time:.2f}s - D Loss: {avg_disc_loss:.4f}, G Loss: {avg_gen_loss:.4f}")

        # Save samples
        if (epoch + 1) % save_interval == 0 or epoch == 0:
            save_generated_images(generator, epoch+1, latent_dim, sample_dir)

        # Save model periodically
        if (epoch + 1) % 20 == 0:
            generator.save(f"{checkpoint_dir}/generator_epoch_{epoch+1}.h5")
            discriminator.save(f"{checkpoint_dir}/discriminator_epoch_{epoch+1}.h5")

            # Save to Google Drive for persistence
            generator.save(f"/content/drive/MyDrive/GAN_Models/generator_epoch_{epoch+1}.h5")

    # Plot loss history
    plt.figure(figsize=(10, 5))
    plt.plot(loss_history['disc_loss'], label='Discriminator')
    plt.plot(loss_history['gen_loss'], label='Generator')
    plt.title('GAN Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig(f"{sample_dir}/loss_history.png")
    plt.show()

    return loss_history

# @title Generate Art Portfolio

def generate_art_portfolio(generator_path, output_dir, n_images=50, latent_dim=100, img_size=(128, 128)):
    """
    Generate a portfolio of abstract art from a trained generator
    """
    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Load the generator
    generator = tf.keras.models.load_model(generator_path)

    # Generate images with progress bar
    print(f"Generating {n_images} images...")
    for i in tqdm(range(n_images)):
        # Generate random noise
        noise = tf.random.normal([1, latent_dim])

        # Generate image
        generated_image = generator(noise, training=False)

        # Scale from [-1,1] to [0,1]
        generated_image = (generated_image + 1) / 2.0

        # Convert to numpy array
        img_array = generated_image[0].numpy()

        # Convert to PIL image and save
        img = Image.fromarray((img_array * 255).astype(np.uint8))
        img.save(f"{output_dir}/abstract_art_{i+1}.png")

    # Display a sample of generated images
    plt.figure(figsize=(15, 15))
    sample_indices = np.random.choice(n_images, min(25, n_images), replace=False) + 1

    for i, idx in enumerate(sample_indices):
        if i < 25:  # Show up to 25 images
            img = Image.open(f"{output_dir}/abstract_art_{idx}.png")
            plt.subplot(5, 5, i + 1)
            plt.imshow(np.array(img))
            plt.axis('off')

    plt.tight_layout()
    plt.show()

    print(f"Generated {n_images} images in {output_dir}")
    print(f"Files are also saved to your Google Drive at: {output_dir}")