<a href="https://colab.research.google.com/github/bxck75/A1_Colabs/blob/master/Gans/imageGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# Extract the training-image archive into the current working directory.
# NOTE(review): presumably Google Drive must already be mounted at
# /content/drive for this path to exist - confirm before running.
!unzip /content/drive/MyDrive/garbagekids.zip

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Reshape, Conv2DTranspose, LeakyReLU, BatchNormalization, Conv2D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, Callback, LearningRateScheduler
import matplotlib.pyplot as plt
import psutil

# Enable mixed precision training
# tf.keras.mixed_precision.set_global_policy('mixed_float16')

# Define the GAN architecture
def define_gan(input_shape, latent_dim=100):
    """Build the generator, the discriminator and the stacked GAN model.

    Args:
        input_shape: Shape of the images fed to the discriminator,
            e.g. ``(128, 128, 3)``.
        latent_dim: Length of the noise vector consumed by the generator.
            Defaults to 100, the value that used to be hard-coded.

    Returns:
        A ``(generator, discriminator, gan)`` tuple of uncompiled
        ``Sequential`` models, where ``gan`` chains the generator into the
        discriminator.

    Note:
        The generator starts from a 16x16 feature map and upsamples three
        times with stride 2, so it always emits 128x128 images with 3
        channels. ``input_shape`` has to match that for the stacked model to
        be usable.
    """
    # Generator: noise vector -> 16x16x256 -> 32x32 -> 64x64 -> 128x128x3.
    generator = Sequential()
    generator.add(Dense(256 * 16 * 16, input_dim=latent_dim))
    generator.add(LeakyReLU(alpha=0.2))
    generator.add(Reshape((16, 16, 256)))
    generator.add(Conv2DTranspose(128, (4, 4), strides=(2, 2), padding='same'))
    generator.add(LeakyReLU(alpha=0.2))
    generator.add(Conv2DTranspose(64, (4, 4), strides=(2, 2), padding='same'))
    generator.add(LeakyReLU(alpha=0.2))
    # tanh keeps the output in [-1, 1], the same range preprocess_image
    # normalizes the real images to.
    generator.add(Conv2DTranspose(3, (4, 4), strides=(2, 2), padding='same', activation='tanh'))

    # Discriminator: two strided convolutions followed by a sigmoid
    # real/fake probability.
    discriminator = Sequential()
    discriminator.add(Conv2D(64, (4, 4), strides=(2, 2), padding='same', input_shape=input_shape))
    discriminator.add(LeakyReLU(alpha=0.2))
    discriminator.add(Conv2D(128, (4, 4), strides=(2, 2), padding='same'))
    discriminator.add(LeakyReLU(alpha=0.2))
    discriminator.add(Flatten())
    discriminator.add(Dense(1, activation='sigmoid'))

    # Stacked model used to train the generator through the discriminator.
    gan = Sequential()
    gan.add(generator)
    gan.add(discriminator)

    return generator, discriminator, gan

# Preprocess the images
def preprocess_image(image_path, target_shape):
    """Load one image, resize it and scale its pixel values to [-1, 1].

    Args:
        image_path: Path of the image file to load.
        target_shape: ``(height, width)`` or ``(height, width, channels)``.
            Only the first two entries are used for resizing.

    Returns:
        The image as an array whose values are mapped from [0, 255] to
        [-1, 1].
    """
    # load_img documents target_size as (height, width); pass exactly that
    # instead of relying on it ignoring a trailing channel entry.
    target_size = tuple(target_shape[:2])
    image = load_img(image_path, target_size=target_size)
    image = img_to_array(image)
    image = (image - 127.5) / 127.5  # Normalize the image to [-1, 1]
    return image

# Monitor memory usage
def monitor_memory_usage():
    """Print the resident set size (RSS) of the current process in MB."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    rss_megabytes = rss_bytes / 1024 / 1024  # bytes -> KB -> MB
    print(f"Memory Usage: {rss_megabytes:.2f} MB")

# Load and preprocess the images from a folder recursively
def load_images_from_folder(folder_path, target_shape):
    """Recursively load and preprocess every image below ``folder_path``.

    Files are matched by extension (``.jpg``, ``.jpeg``, ``.png``) without
    regard to case, and directories and files are visited in sorted order so
    the resulting array is reproducible between runs.

    Args:
        folder_path: Root directory to walk.
        target_shape: Shape forwarded to ``preprocess_image`` for resizing.

    Returns:
        An array stacking all preprocessed images along the first axis. When
        no image is found, an empty array of shape ``(0, *target_shape)`` is
        returned so callers can still inspect ``.shape``.
    """
    image_extensions = ('.jpg', '.jpeg', '.png')
    images = []
    for root, dirs, files in os.walk(folder_path):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()
        for filename in sorted(files):
            # Lower-case first so files such as "IMG_001.JPG" are not skipped.
            if filename.lower().endswith(image_extensions):
                image_path = os.path.join(root, filename)
                images.append(preprocess_image(image_path, target_shape))

    if not images:
        # float32 is assumed to match what preprocess_image yields - TODO confirm.
        return np.empty((0, *target_shape), dtype=np.float32)
    return np.array(images)

# Generate a batch of random noise
def generate_noise(batch_size, latent_dim):
    """Draw a ``(batch_size, latent_dim)`` array of standard-normal noise."""
    noise_shape = (batch_size, latent_dim)
    return np.random.normal(loc=0, scale=1, size=noise_shape)

# Learning rate scheduler
def lr_scheduler(epoch, lr):
    """Keep the learning rate for 50 epochs, then decay it exponentially.

    Args:
        epoch: Zero-based epoch index.
        lr: Current learning rate.

    Returns:
        ``lr`` unchanged while ``epoch < 50``; afterwards ``lr * exp(-0.1)``
        as a plain Python float.
    """
    if epoch < 50:
        return lr
    # Return a float rather than a tf.Tensor: Keras' LearningRateScheduler
    # expects the schedule function to produce a float.
    return float(lr * np.exp(-0.1))

# Train the GAN
def train_gan(images, latent_dim, epochs, batch_size):
    """Train the GAN with an alternating discriminator/generator loop.

    Each training step feeds the discriminator ``batch_size // 2`` real and
    ``batch_size // 2`` generated images, then updates the generator through
    the stacked model with ``batch_size`` noise vectors. One epoch is one
    shuffled pass over all real images.

    Side effects per epoch: generator/discriminator weights are written to
    ``*_weights_epoch_<n>.h5``, one sample is written to
    ``fake_image_epoch_<n>.png`` and the loss curves are written to
    ``training_graph.png``. If weight files from an earlier run exist,
    training resumes after the latest epoch reported by ``get_latest_epoch``.

    Args:
        images: NumPy array of preprocessed training images, stacked along
            the first axis.
        latent_dim: Length of the generator's noise vector. It must match the
            generator built by ``define_gan``.
        epochs: Total number of epochs to reach (including resumed ones).
        batch_size: Number of noise vectors per generator update; half of it
            is used for each discriminator update.

    Raises:
        ValueError: If ``images`` is empty, ``batch_size`` is smaller than 2,
            or ``latent_dim`` does not match the generator input size.
    """
    if len(images) == 0:
        raise ValueError("No training images were provided.")
    if batch_size < 2:
        raise ValueError("batch_size must be at least 2 so it can be split into real and fake halves.")

    generator, discriminator, gan = define_gan(images[0].shape)

    if generator.input_shape[-1] != latent_dim:
        raise ValueError(
            f"latent_dim={latent_dim} does not match the generator input size "
            f"{generator.input_shape[-1]}."
        )

    # The discriminator is compiled while trainable and frozen before the
    # stacked model is compiled; the intent is that gan.train_on_batch only
    # updates the generator.
    discriminator.compile(loss='binary_crossentropy', optimizer=Adam(), metrics=['accuracy'])
    discriminator.trainable = False
    gan.compile(loss='binary_crossentropy', optimizer=Adam())

    num_images = images.shape[0]
    half_batch = batch_size // 2
    # Every step consumes half_batch real images, so this many steps cover
    # the whole dataset once per epoch.
    batches_per_epoch = num_images // half_batch

    latest_epoch = get_latest_epoch()
    if latest_epoch > 0:
        generator.load_weights(f'generator_weights_epoch_{latest_epoch}.h5')
        discriminator.load_weights(f'discriminator_weights_epoch_{latest_epoch}.h5')
        print(f"Loaded pre-existing weights for the generator and discriminator from epoch {latest_epoch}.")

    # Per-epoch mean losses, used for plotting.
    generator_losses = []
    discriminator_losses = []

    # Save a visual graph of the training values
    def save_training_graph(generator_losses, discriminator_losses):
        plt.figure(figsize=(12, 6))
        plt.plot(generator_losses, label='Generator Loss')
        plt.plot(discriminator_losses, label='Discriminator Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('GAN Training Losses')
        plt.legend()
        plt.savefig('training_graph.png')
        # Close the figure so one is not kept alive for every epoch.
        plt.close()

    # Keras callbacks are not invoked by a manual train_on_batch loop, so
    # losses are recorded and saved explicitly below.
    for epoch in range(latest_epoch, epochs):
        # Reshuffle each epoch so batches differ between epochs.
        order = np.random.permutation(num_images)
        epoch_generator_losses = []
        epoch_discriminator_losses = []

        for batch in range(batches_per_epoch):
            batch_indices = order[batch * half_batch: (batch + 1) * half_batch]
            real_images = images[batch_indices]
            noise = generate_noise(half_batch, latent_dim)
            # verbose=0 suppresses the per-call progress bar.
            fake_images = generator.predict(noise, verbose=0)

            # Train the discriminator
            discriminator_loss_real = discriminator.train_on_batch(real_images, np.ones(half_batch))
            discriminator_loss_fake = discriminator.train_on_batch(fake_images, np.zeros(half_batch))
            discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

            # Train the generator
            noise = generate_noise(batch_size, latent_dim)
            generator_loss = gan.train_on_batch(noise, np.ones(batch_size))

            # discriminator_loss is [loss, accuracy]; np.ravel copes with the
            # generator loss being either a scalar or a list.
            epoch_discriminator_losses.append(float(discriminator_loss[0]))
            epoch_generator_losses.append(float(np.ravel(generator_loss)[0]))

            # Monitor memory usage
            monitor_memory_usage()

            print(f'Epoch {epoch + 1}/{epochs} | Batch {batch + 1}/{batches_per_epoch} | '
                  f'Discriminator Loss: {discriminator_loss[0]} | Generator Loss: {generator_loss}')

        if epoch_generator_losses:
            generator_losses.append(float(np.mean(epoch_generator_losses)))
            discriminator_losses.append(float(np.mean(epoch_discriminator_losses)))
            save_training_graph(generator_losses, discriminator_losses)

        # Save the weights after each epoch
        generator.save_weights(f'generator_weights_epoch_{epoch + 1}.h5')
        discriminator.save_weights(f'discriminator_weights_epoch_{epoch + 1}.h5')

        # Generate a fake image
        fake_image = generate_fake_image(generator, latent_dim)
        fake_image_path = f'fake_image_epoch_{epoch + 1}.png'
        save_image(fake_image, fake_image_path)
        print(f'Saved fake image: {fake_image_path}')

    plot_losses(generator_losses, discriminator_losses)

# Generate a fake image using the trained generator
def generate_fake_image(generator, latent_dim):
    """Sample one image from ``generator`` and return it as a uint8 array.

    A single noise vector is passed through the generator; the first (and
    only) output is mapped from [-1, 1] to [0, 255] and cast to ``uint8``.
    """
    latent_vector = generate_noise(1, latent_dim)
    prediction = generator.predict(latent_vector)
    first_sample = prediction[0]
    # Undo the [-1, 1] normalization: [-1, 1] -> [0, 1] -> [0, 255].
    pixels = (first_sample * 0.5 + 0.5) * 255
    return pixels.astype(np.uint8)

# Save the generated image
def save_image(image, path):
    """Write an image array to ``path``.

    Args:
        image: Image array. ``uint8`` arrays are treated as final 0-255 pixel
            values and written as-is; any other dtype is rescaled by
            ``array_to_img`` as before.
        path: Destination file path; the format follows the extension.
    """
    # array_to_img rescales to the full 0-255 range by default, which would
    # contrast-stretch an image that is already in uint8 pixel values.
    already_pixels = np.asarray(image).dtype == np.uint8
    image = tf.keras.preprocessing.image.array_to_img(image, scale=not already_pixels)
    image.save(path)

# Get the latest epoch from existing weights files
def get_latest_epoch():
    """Return the latest epoch with saved weights in the working directory.

    Looks for ``generator_weights_epoch_<N>.*`` and
    ``discriminator_weights_epoch_<N>.*`` files in the current directory and
    returns the largest ``N`` for which BOTH files exist, because resuming
    loads both. Files whose epoch part is not a number are ignored.

    Returns:
        The latest resumable epoch, or 0 when there is none.
    """
    def _epochs_for(prefix, filenames):
        # Collect the numeric epoch that follows the prefix, up to the first dot.
        found = set()
        for name in filenames:
            if not name.startswith(prefix):
                continue
            token = name[len(prefix):].split('.')[0]
            if token.isdecimal():
                found.add(int(token))
        return found

    filenames = os.listdir()
    generator_epochs = _epochs_for('generator_weights_epoch_', filenames)
    discriminator_epochs = _epochs_for('discriminator_weights_epoch_', filenames)

    # Only epochs present for both models can actually be resumed.
    resumable_epochs = generator_epochs & discriminator_epochs
    return max(resumable_epochs, default=0)


# Plot the losses
def plot_losses(generator_losses, discriminator_losses):
    """Show generator and discriminator loss curves in one figure.

    Args:
        generator_losses: Sequence of generator loss values.
        discriminator_losses: Sequence of discriminator loss values.
    """
    plt.figure(figsize=(12, 6))

    curves = (
        (generator_losses, 'Generator Loss'),
        (discriminator_losses, 'Discriminator Loss'),
    )
    for values, curve_label in curves:
        plt.plot(values, label=curve_label)

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('GAN Training Losses')
    plt.legend()
    plt.show()

# Set the parameters
# Set the parameters
# Root folder that is walked recursively for .jpg/.jpeg/.png training images.
image_folder_path = 'garbage'  # Replace with the path to your image folder
# Shape every image is resized to. The generator in define_gan upsamples
# 16x16 three times with 3 output channels, i.e. it always produces
# 128x128x3, so changing this also requires changing the generator.
target_image_shape = (128, 128, 3)  # Adjust the shape based on your image dimensions
# Length of the noise vector; define_gan builds the generator with a
# 100-dimensional input, so this must stay in sync with it.
latent_dimension = 100
# Total number of epochs to reach; training resumes from saved weights if
# matching weight files are found in the working directory.
training_epochs = 150
# Noise vectors per generator update; the discriminator sees half of this
# many real and fake images per step.
batch_size = 128

# Load and preprocess the images
# All images are held in memory at once as a single array.
images = load_images_from_folder(image_folder_path, target_image_shape)
num_images = len(images)
print(f'Total images found: {num_images}')

# Train the GAN
# Writes weight files and one sample image per epoch to the working directory.
train_gan(images, latent_dimension, training_epochs, batch_size)


In [None]:
import cv2,os ,glob
from PIL import Image

def create_video(images, output_path, fps=24):
    """Write a sequence of image files to an MP4 video.

    The frame size is taken from the first image. Images that cannot be read
    are skipped with a message, and images of a different size are resized
    to the frame size so they are not rejected by the writer.

    Args:
        images: Iterable of image file paths, in playback order.
        output_path: Path of the video file to create.
        fps: Frames per second of the resulting video.

    Raises:
        ValueError: If ``images`` is empty.
    """
    image_paths = list(images)
    if not image_paths:
        raise ValueError("Cannot create a video from an empty list of images.")

    # Determine the size of the first image
    with Image.open(image_paths[0]) as img:
        width, height = img.size

    # Define the video codec
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')

    # Create a video writer object
    video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    try:
        # Iterate over the list of images and write them to the video
        for image_path in image_paths:
            frame = cv2.imread(image_path)
            if frame is None:
                # cv2.imread signals an unreadable file by returning None.
                print(f"Skipping unreadable image: {image_path}")
                continue
            # frame.shape is (rows, cols, channels), i.e. (height, width, ...).
            if (frame.shape[1], frame.shape[0]) != (width, height):
                frame = cv2.resize(frame, (width, height))
            video_writer.write(frame)
    finally:
        # Release the video writer even if a frame fails, so the file is closed.
        video_writer.release()
def _epoch_number(path):
    """Return the epoch encoded in a 'fake_image_epoch_<N>.png' path, or -1."""
    stem = os.path.splitext(os.path.basename(path))[0]
    token = stem.rsplit('_', 1)[-1]
    return int(token) if token.isdecimal() else -1


# glob returns paths in arbitrary order, and a plain string sort would put
# epoch 10 before epoch 2, so order the frames by their numeric epoch.
image_list = sorted(glob.glob('/content/fake_image_epoch_*.png'), key=_epoch_number)
print(len(image_list))
output_video_path = 'output.mp4'

if image_list:
    create_video(image_list, output_video_path)
    print("Video created successfully!")
else:
    # Without frames there is nothing to encode; report it instead of failing.
    print("No generated images found; video not created.")