In [1]:
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Input, Concatenate, BatchNormalization, LeakyReLU
from tensorflow.keras import Model
import matplotlib.pyplot as plt
import os
import logging
from typing import Tuple, List
import numpy as np

In [2]:
# Enable GPU memory growth to prevent memory allocation issues
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

In [3]:
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [4]:
class ImageProcessor:
    """Handles image loading and preprocessing operations."""

    @staticmethod
    def load_and_split_image(image_file: str) -> Tuple[tf.Tensor, tf.Tensor]:
        """
        Load and split an image into input and target components.

        Args:
            image_file: Path to the image file

        Returns:
            Tuple of input and target images as tensors
        """
        try:
            image = tf.io.read_file(image_file)
            image = tf.image.decode_jpeg(image)

            # Split image into two halves
            width = tf.shape(image)[1] // 2
            input_image = image[:, :width, :]
            target_image = image[:, width:, :]

            # Normalize images to [-1, 1]
            input_image = (tf.cast(input_image, tf.float32) / 127.5) - 1
            target_image = (tf.cast(target_image, tf.float32) / 127.5) - 1

            return input_image, target_image

        except Exception as e:
            logger.error(f"Error processing image {image_file}: {str(e)}")
            raise

In [5]:
class Generator(Model):
    """Generator model for image translation."""

    def __init__(self):
        super(Generator, self).__init__()
        self.initializer = tf.random_normal_initializer(0., 0.02)
        self._build_encoder()
        self._build_decoder()

    def _build_encoder(self):
        """Build encoder layers."""
        self.down_stack = [
            self._downsample(64, 4, apply_batchnorm=False),  # (batch_size, 128, 128, 64)
            self._downsample(128, 4),  # (batch_size, 64, 64, 128)
            self._downsample(256, 4),  # (batch_size, 32, 32, 256)
            self._downsample(512, 4),  # (batch_size, 16, 16, 512)
        ]

    def _build_decoder(self):
        """Build decoder layers."""
        self.up_stack = [
            self._upsample(256, 4),  # (batch_size, 32, 32, 256)
            self._upsample(128, 4),  # (batch_size, 64, 64, 128)
            self._upsample(64, 4),   # (batch_size, 128, 128, 64)
        ]

        self.last = Conv2DTranspose(
            3, 4, strides=2, padding='same',
            kernel_initializer=self.initializer,
            activation='tanh'
        )

    def _downsample(self, filters: int, size: int, apply_batchnorm: bool = True):
        """Create a downsampling layer."""
        initializer = self.initializer

        result = tf.keras.Sequential()
        result.add(Conv2D(filters, size, strides=2, padding='same',
                         kernel_initializer=initializer, use_bias=False))

        if apply_batchnorm:
            result.add(BatchNormalization())

        result.add(LeakyReLU())
        return result

    def _upsample(self, filters: int, size: int, apply_dropout: bool = False):
        """Create an upsampling layer."""
        initializer = self.initializer

        result = tf.keras.Sequential()
        result.add(Conv2DTranspose(filters, size, strides=2, padding='same',
                                 kernel_initializer=initializer, use_bias=False))

        result.add(BatchNormalization())

        if apply_dropout:
            result.add(tf.keras.layers.Dropout(0.5))

        result.add(tf.keras.layers.ReLU())
        return result

    def call(self, x, training=True):
        """Forward pass of the generator."""
        skips = []

        # Encoder
        for down in self.down_stack:
            x = down(x)
            skips.append(x)

        skips = reversed(skips[:-1])

        # Decoder
        for up, skip in zip(self.up_stack, skips):
            x = up(x)
            x = tf.keras.layers.Concatenate()([x, skip])

        x = self.last(x)
        return x

In [41]:
class Discriminator(tf.keras.Model):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(64, (4, 4), strides=(2, 2), padding='same')
        self.leaky_relu1 = tf.keras.layers.LeakyReLU(alpha=0.2)
        # Assuming BatchNormalization is applied after conv3
        self.bn = BatchNormalization()  # Create BatchNormalization instance here
        self.conv2 = tf.keras.layers.Conv2D(128, (4, 4), strides=(2, 2), padding='same')
        # Initialize BatchNormalization layers outside the call method
        self.batch_norm1 = tf.keras.layers.BatchNormalization()
        self.batch_norm2 = tf.keras.layers.BatchNormalization()
        self.leaky_relu2 = tf.keras.layers.LeakyReLU(alpha=0.2)
        self.initializer = tf.random_normal_initializer(0., 0.02)
        self._build_model()

    def _build_model(self):
        """Build discriminator architecture."""
        self.conv1 = Conv2D(64, 4, strides=2, padding='same',
                           kernel_initializer=self.initializer)
        self.conv2 = Conv2D(128, 4, strides=2, padding='same',
                           kernel_initializer=self.initializer)
        self.conv3 = Conv2D(256, 4, strides=2, padding='same',
                           kernel_initializer=self.initializer)
        self.output_layer = Conv2D(1, 4, padding='same',
                                 kernel_initializer=self.initializer)

    def call(self, inputs, training=True):
        """Forward pass of the discriminator."""
        x = self.conv1(inputs)
        x = self.leaky_relu1(x)
        x = self.conv2(x)
        x = self.batch_norm2(x, training=training) # Pass training argument
        x = self.leaky_relu2(x)
        x = self.conv1(inputs)
        x = LeakyReLU()(x)

        x = self.conv3(x)
        x = self.bn(x, training=training)  # Use the pre-created instance
        x = LeakyReLU()(x)

        # Concatenate input_image and target along the channels dimension
        x = tf.concat(inputs, axis=-1)  # or axis=3 if channels are last

        x = self.conv1(x)  # Now pass the concatenated tensor to conv1
        x = LeakyReLU()(x)

        # Concatenate the input and target images along the channel dimension
        x = tf.concat(inputs, axis=-1) # Assuming inputs is a list [input_image, target]
        x = self.conv1(x)  # Pass the concatenated tensor to the first layer

        return self.output_layer(x)

In [27]:
class GANTrainer:
    def __init__(self, generator, discriminator):
        self.generator = generator
        self.discriminator = discriminator
        self.gen_optimizer = tf.keras.optimizers.Adam(1e-4)
        self.disc_optimizer = tf.keras.optimizers.Adam(1e-4)

    @tf.function
    def train_step(self, input_image, target):
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            gen_output = self.generator(input_image, training=True)

            # Concatenate input_image and target along the channel dimension
            real_images = tf.concat([input_image, target], axis=-1)
            fake_images = tf.concat([input_image, gen_output], axis=-1)

            disc_real_output = self.discriminator(real_images, training=True)
            disc_fake_output = self.discriminator(fake_images, training=True)

            gen_loss = self._generator_loss(disc_fake_output, gen_output, target)
            disc_loss = self._discriminator_loss(disc_real_output, disc_fake_output)

        gen_gradients = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
        disc_gradients = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)

        self.gen_optimizer.apply_gradients(zip(gen_gradients, self.generator.trainable_variables))
        self.disc_optimizer.apply_gradients(zip(disc_gradients, self.discriminator.trainable_variables))

        return gen_loss, disc_loss

    def _generator_loss(self, disc_generated_output: tf.Tensor, gen_output: tf.Tensor, target: tf.Tensor) -> tf.Tensor:
        """Calculate generator loss."""
        gan_loss = self._adversarial_loss(disc_generated_output)
        l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
        total_gen_loss = gan_loss + (100 * l1_loss)
        return total_gen_loss

    def _discriminator_loss(self, disc_real_output: tf.Tensor, disc_generated_output: tf.Tensor) -> tf.Tensor:
        """Calculate discriminator loss."""
        real_loss = self._adversarial_loss(disc_real_output, real=True)
        generated_loss = self._adversarial_loss(disc_generated_output, real=False)
        total_disc_loss = real_loss + generated_loss
        return total_disc_loss

    def _adversarial_loss(self, output: tf.Tensor, real: bool = True) -> tf.Tensor:
        """Calculate adversarial loss."""
        if real:
            return tf.reduce_mean(tf.keras.losses.binary_crossentropy(tf.ones_like(output), output))
        else:
            return tf.reduce_mean(tf.keras.losses.binary_crossentropy(tf.zeros_like(output), output))


In [8]:
def load_dataset(image_path: str, target_path: str, batch_size: int = 32) -> tf.data.Dataset:
    """
    Load and prepare the dataset for training.

    Args:
        image_path: Directory containing input images
        target_path: Directory containing target images
        batch_size: Number of samples per batch

    Returns:
        tf.data.Dataset: Prepared dataset for training
    """
    image_dataset = tf.keras.utils.image_dataset_from_directory(
        image_path,
        label_mode=None,
        image_size=(256, 256),
        batch_size=batch_size,
        shuffle=True
    )

    target_dataset = tf.keras.utils.image_dataset_from_directory(
        target_path,
        label_mode=None,
        image_size=(256, 256),
        batch_size=batch_size,
        shuffle=True
    )

    train_dataset = tf.data.Dataset.zip((image_dataset, target_dataset))

    def preprocess(input_image, target_image):
        input_image = tf.cast(input_image, tf.float32) / 127.5 - 1
        target_image = tf.cast(target_image, tf.float32) / 127.5 - 1
        return input_image, target_image

    return train_dataset.map(preprocess).cache().prefetch(tf.data.AUTOTUNE)


In [9]:
def display_sample(generator: Generator, test_input: tf.Tensor, test_target: tf.Tensor):
    """
    Display a sample of input, target, and generated images.

    Args:
        generator: Trained generator model
        test_input: Input image tensor
        test_target: Target image tensor
    """
    generated_image = generator(test_input, training=False)
    plt.figure(figsize=(12, 12))

    images = [test_input[0], test_target[0], generated_image[0]]
    titles = ['Input', 'Target', 'Generated']

    for i, (image, title) in enumerate(zip(images, titles)):
        plt.subplot(1, 3, i + 1)
        plt.title(title)
        plt.imshow(image * 0.5 + 0.5)  # Denormalize the image
        plt.axis('off')

    plt.show()

In [10]:
from google.colab import files
import zipfile
import io

# Upload the zip file file A
uploaded = files.upload()

# Get the name of the uploaded zip file (assuming only one file was uploaded)
zip_filename = list(uploaded.keys())[0]

# Extract the contents of the zip file
with zipfile.ZipFile(io.BytesIO(uploaded[zip_filename]), 'r') as zip_ref:
    zip_ref.extractall('/content/') # Extract to '/content/' directory

print(f"Folder '{zip_filename[:-4]}' uploaded and extracted to '/content/'")

Saving trainA.zip to trainA.zip
Folder 'trainA' uploaded and extracted to '/content/'


In [11]:
# Upload the zip file file B
uploaded = files.upload()

# Get the name of the uploaded zip file (assuming only one file was uploaded)
zip_filename = list(uploaded.keys())[0]

# Extract the contents of the zip file
with zipfile.ZipFile(io.BytesIO(uploaded[zip_filename]), 'r') as zip_ref:
    zip_ref.extractall('/content/') # Extract to '/content/' directory

print(f"Folder '{zip_filename[:-4]}' uploaded and extracted to '/content/'")

Saving trainB.zip to trainB.zip
Folder 'trainB' uploaded and extracted to '/content/'


In [12]:
import os

# Set path
# Update image_path and target_path to point to the extracted folders
image_path = os.path.join('/content/', 'trainA/') # Use os.path.join for platform independence
target_path = os.path.join('/content/', 'trainB/')

def main():
    # Load dataset
    train_dataset = load_dataset(image_path, target_path)

    # Create models
    generator = Generator()
    discriminator = Discriminator()

    # Create trainer
    trainer = GANTrainer(generator, discriminator)

    # Training loop
    epochs = 50
    for epoch in range(epochs):
        for input_image, target in train_dataset:
            gen_loss, disc_loss = trainer.train_step(input_image, target)

        # Log progress
        logger.info(f"Epoch {epoch + 1}/{epochs}, Gen Loss: {gen_loss:.4f}, Disc Loss: {disc_loss:.4f}")

        # Display sample every 10 epochs
        if (epoch + 1) % 10 == 0:
            display_sample(generator, input_image, target)

    # Save the trained generator
    generator.save('trained_generator.h5')
    logger.info("Training completed. Generator saved as 'trained_generator.h5'")

In [None]:
if __name__ == "__main__":
    main()

Found 10000 files.
Found 10000 files.


