In [2]:
import os
import numpy as np
from PIL import Image

def load_cityscapes_dataset(dataset_path):
    """Load side-by-side image pairs from a directory and split them in half.

    Every regular file directly inside ``dataset_path`` is opened with PIL and
    cut vertically at ``width // 2``. The left half is collected as the
    "real" image and the right half as the "generated" image. Each half is
    rescaled with ``x / 127.5 - 1.0``, which maps 8-bit pixel values
    ``[0, 255]`` onto ``[-1, 1]``.

    Args:
        dataset_path: Directory that contains the combined image files.

    Returns:
        A tuple ``(real_images, generated_images)`` of numpy arrays with one
        entry per loaded file, ordered by sorted file name. Both arrays are
        empty when the directory contains no files.

    Raises:
        OSError: If ``dataset_path`` cannot be listed or a file cannot be
            opened/decoded as an image.
    """
    real_images = []
    generated_images = []

    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order (and therefore the pairing index) reproducible between runs.
    for file_name in sorted(os.listdir(dataset_path)):
        image_path = os.path.join(dataset_path, file_name)

        # Skip sub-directories (e.g. ".ipynb_checkpoints"); they cannot be
        # opened as images and would abort the whole load.
        if not os.path.isfile(image_path):
            continue

        # The context manager closes the underlying file handle as soon as the
        # pixel data has been copied into numpy arrays.
        with Image.open(image_path) as image:
            width, height = image.size
            half_width = width // 2

            # Left half -> real image, right half -> generated image.
            real_half = np.array(image.crop((0, 0, half_width, height)))
            generated_half = np.array(image.crop((half_width, 0, width, height)))

        # Normalize pixel values to the range [-1, 1]
        real_images.append((real_half / 127.5) - 1.0)
        generated_images.append((generated_half / 127.5) - 1.0)

    # Stack the per-file arrays into one array per half.
    return np.array(real_images), np.array(generated_images)


In [3]:
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Conv2D, ZeroPadding2D, \
    LeakyReLU, BatchNormalization
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import numpy as np
import matplotlib.pyplot as plt


2023-06-08 15:23:29.633561: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [5]:
class GAN():
    """Generative adversarial network built from three Keras models.

    Attributes:
        discriminator: Compiled model that maps an image of ``img_shape`` to a
            single sigmoid "validity" score.
        generator: Model that maps a latent vector of length ``latent_dim`` to
            an image of ``img_shape`` with ``tanh`` output.
        combined: Compiled ``latent vector -> generator -> discriminator``
            stack used to train the generator.
    """

    def __init__(self, img_rows, img_cols, channels, latent_dim):
        """Build and compile the discriminator, generator and combined model.

        Args:
            img_rows: Image height in pixels.
            img_cols: Image width in pixels.
            channels: Number of image channels.
            latent_dim: Length of the noise vector fed to the generator.
        """
        self.img_rows = img_rows
        self.img_cols = img_cols
        self.channels = channels
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        self.latent_dim = latent_dim

        # Build and compile the discriminator.
        # Each compiled model gets its OWN optimizer instance: the
        # discriminator and the combined model train different variable sets,
        # and TensorFlow/Keras optimizers (TF >= 2.11) raise
        # "optimizer cannot recognize variable" when one instance is reused
        # for a second set of variables.
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
                                   optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
                                   metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        validity = self.discriminator(img)

        # The combined model (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, validity)
        self.combined.compile(loss='binary_crossentropy',
                              optimizer=Adam(learning_rate=0.0002, beta_1=0.5))

    def build_generator(self):
        """Create the generator: a dense network from noise to an image.

        Three Dense blocks (256, 512, 1024 units), each followed by
        LeakyReLU(0.2) and BatchNormalization(momentum=0.8), feed a final
        ``tanh`` Dense layer that is reshaped to ``self.img_shape``.

        Returns:
            A Keras ``Model`` mapping a ``(latent_dim,)`` input to an image of
            ``self.img_shape``.
        """
        model = Sequential()

        model.add(Dense(256, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Dense(1024))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))
        # np.prod returns a numpy scalar; cast to a plain int so the layer
        # receives a built-in integer unit count.
        model.add(Dense(int(np.prod(self.img_shape)), activation='tanh'))
        model.add(Reshape(self.img_shape))

        model.summary()

        noise = Input(shape=(self.latent_dim,))
        img = model(noise)

        return Model(noise, img)


    def build_discriminator(self):
        """Create the discriminator: a small CNN ending in a sigmoid score.

        Four Conv2D layers (32, 64, 128, 256 filters, 3x3 kernels; the first
        three use stride 2) are interleaved with LeakyReLU(0.2), Dropout(0.25)
        and BatchNormalization(momentum=0.8), then flattened into a single
        sigmoid unit.

        Returns:
            A Keras ``Model`` mapping an image of ``self.img_shape`` to one
            validity value.
        """
        model = Sequential()

        model.add(Conv2D(32, kernel_size=3, strides=2, input_shape=self.img_shape, padding='same'))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(64, kernel_size=3, strides=2, padding='same'))
        # Pads one extra row at the bottom and one extra column on the right.
        model.add(ZeroPadding2D(padding=((0,1),(0,1))))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(128, kernel_size=3, strides=2, padding='same'))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Conv2D(256, kernel_size=3, strides=1, padding='same'))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.25))
        model.add(Flatten())
        model.add(Dense(1, activation='sigmoid'))

        model.summary()

        img = Input(shape=self.img_shape)
        validity = model(img)

        return Model(img, validity)

In [6]:
def train_pix2pix(real_images, generated_images, epochs, batch_size, latent_dim=100):
    """Adversarially train a ``GAN`` on ``real_images``.

    A ``GAN`` instance is created for the image shape found in
    ``real_images``. Each iteration draws a random batch of real images and a
    batch of latent noise, updates the discriminator on real (label 1) and
    generated (label 0) images, then updates the generator through
    ``gan.combined`` so that its output is scored as real.

    Args:
        real_images: Array of shape ``(num_images, height, width, channels)``
            with the training images.
        generated_images: Paired images from the dataset loader. Accepted for
            interface compatibility but currently unused: the generator built
            by ``GAN`` takes a latent noise vector, not an image, as input.
        epochs: Number of training iterations (one batch per iteration).
        batch_size: Number of samples drawn per iteration.
        latent_dim: Length of the noise vector fed to the generator.

    Returns:
        The trained ``GAN`` instance.

    Raises:
        ValueError: If ``real_images`` is empty or not 4-dimensional, or if
            ``epochs``, ``batch_size`` or ``latent_dim`` is out of range.
    """
    # Validate up front so bad input fails with a clear message instead of
    # deep inside Keras or numpy's random sampling.
    real_images = np.asarray(real_images)
    if real_images.ndim != 4 or real_images.shape[0] == 0:
        raise ValueError(
            "real_images must be a non-empty array of shape "
            f"(num_images, height, width, channels); got shape {real_images.shape}"
        )
    if epochs < 0:
        raise ValueError(f"epochs must be >= 0; got {epochs}")
    if batch_size <= 0:
        raise ValueError(f"batch_size must be > 0; got {batch_size}")
    if latent_dim <= 0:
        raise ValueError(f"latent_dim must be > 0; got {latent_dim}")

    # Initialize the generator, discriminator and combined model. GAN.__init__
    # builds and compiles them, so no further compile calls are needed here.
    img_rows, img_cols, channels = real_images.shape[1:]
    gan = GAN(img_rows, img_cols, channels, latent_dim)

    # Adversarial ground truths for binary cross-entropy.
    valid_labels = np.ones((batch_size, 1))
    fake_labels = np.zeros((batch_size, 1))

    for epoch in range(epochs):
        # ---------------------
        #  Train Discriminator
        # ---------------------

        # Select a random batch of images (sampled with replacement)
        idx = np.random.randint(0, real_images.shape[0], batch_size)
        real_batch = real_images[idx]

        # Generate fake images from latent noise using the generator
        noise = np.random.normal(0, 1, (batch_size, latent_dim))
        fake_batch = gan.generator.predict(noise, verbose=0)

        # Train the discriminator on real and fake batches separately
        discriminator_loss_real = gan.discriminator.train_on_batch(real_batch, valid_labels)
        discriminator_loss_fake = gan.discriminator.train_on_batch(fake_batch, fake_labels)
        discriminator_loss = 0.5 * np.add(discriminator_loss_real, discriminator_loss_fake)

        # -----------------
        #  Train Generator
        # -----------------

        # Train the generator through the combined model: the target is
        # "valid" because the generator wants its images scored as real.
        generator_loss = gan.combined.train_on_batch(noise, valid_labels)

        # Print the losses for monitoring
        print(f"Epoch {epoch}: [D loss: {discriminator_loss}] [G loss: {generator_loss}]")

    return gan


In [8]:
# Load both image halves from the "train" directory, then start training
# with epochs=200 and batch_size=32.
real_images, generated_images = load_cityscapes_dataset("train")
train_pix2pix(
    real_images,
    generated_images,
    epochs=200,
    batch_size=32,
)

: 

: 