In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import glob
import json
import numpy as np
import tensorflow as tf
from PIL import Image
import matplotlib.pyplot as plt
from tensorflow.keras import layers

print("TensorFlow version:", tf.__version__)
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
CONTOURS_DIR = "/content/drive/MyDrive/park-GAN/dataset/contours"
MASTERPLANS_DIR = "/content/drive/MyDrive/park-GAN/dataset/master_plans"
AREAS_JSON = "/content/drive/MyDrive/park-GAN/dataset/park_areas.json"

MODELS_DIR = "/content/drive/MyDrive/park-GAN/models/v2.1"
CHECKPOINTS_DIR = "/content/drive/MyDrive/park-GAN/checkpoints/v2.1"

os.makedirs(os.path.join(MODELS_DIR, "generator"), exist_ok=True)
os.makedirs(os.path.join(MODELS_DIR, "discriminator"), exist_ok=True)
os.makedirs(os.path.join(MODELS_DIR, "embedder"), exist_ok=True)
os.makedirs(CHECKPOINTS_DIR, exist_ok=True)

with open(AREAS_JSON, 'r') as f:
    park_areas = json.load(f)

IMG_HEIGHT = 512
IMG_WIDTH = 512
BATCH_SIZE = 64
LAMBDA = 100   # L1 weight for pix2pix
EMBED_DIM = 8  # Area embedding dimension


In [None]:
def get_core_id(fname):
    base = os.path.splitext(os.path.basename(fname))[0]
    if base.endswith("_contour"):
        return base.replace("_contour", "")
    if base.endswith("_schema"):
        return base.replace("_schema", "")
    return base

def get_park_name(core_id):
    parts = core_id.rsplit('_', 1)
    if len(parts) == 2 and parts[1].isdigit():
        return parts[0]
    return core_id

contour_paths = sorted(glob.glob(os.path.join(CONTOURS_DIR, "*_contour.*")))
masterplan_dict = {get_core_id(mp_path): mp_path for mp_path in glob.glob(os.path.join(MASTERPLANS_DIR, "*_schema.*"))}

pairs = []
for cpath in contour_paths:
    cid = get_core_id(cpath)
    park_name = get_park_name(cid)
    if cid in masterplan_dict:
        area_val = park_areas.get(park_name, 0.0)
        pairs.append((cpath, masterplan_dict[cid], area_val))

print("Total valid pairs:", len(pairs))

Total valid pairs: 970


In [None]:
missing_parks = set(get_park_name(get_core_id(cpath)) for cpath in contour_paths) - set(park_areas.keys())
if missing_parks:
    print("Warning: Missing park areas for:", missing_parks)

In [None]:
def load_image_pair(contour_path, masterplan_path):
    contour = Image.open(contour_path).convert('RGB').resize((IMG_WIDTH, IMG_HEIGHT))
    masterplan = Image.open(masterplan_path).convert('RGB').resize((IMG_WIDTH, IMG_HEIGHT))
    c_img = np.array(contour, dtype=np.float32) / 255.0
    m_img = np.array(masterplan, dtype=np.float32) / 255.0
    return c_img, m_img

@tf.function
def combine_with_area(img, area, embedder):
    area_2d = tf.expand_dims(area, axis=-1)
    area_emb = embedder(area_2d)
    area_emb_tiled = tf.tile(tf.reshape(area_emb, [tf.shape(area_emb)[0], 1, 1, EMBED_DIM]),
                             [1, IMG_HEIGHT, IMG_WIDTH, 1])
    return tf.concat([img, area_emb_tiled], axis=-1)

def data_generator():
    for cpath, mpath, area_val in pairs:
        c_img, m_img = load_image_pair(cpath, mpath)
        yield (c_img, m_img, area_val)

raw_dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_types=(tf.float32, tf.float32, tf.float32),
    output_shapes=((IMG_HEIGHT, IMG_WIDTH, 3), (IMG_HEIGHT, IMG_WIDTH, 3), ())
)

dataset = (
    raw_dataset
    .map(lambda c, m, a: (c, m, a), num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(buffer_size=len(pairs))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

print("Dataset created.")

Dataset created.


In [None]:
class AreaEmbedder(tf.keras.Model):
    def __init__(self, embedding_dim=8):
        super().__init__()
        self.dense1 = layers.Dense(16, activation='relu')
        self.dense2 = layers.Dense(embedding_dim, activation=None)

    def call(self, area_tensor):
        x = self.dense1(area_tensor)
        x = self.dense2(x)
        return x

area_embedder = AreaEmbedder(embedding_dim=EMBED_DIM)


def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    block = tf.keras.Sequential()
    block.add(layers.Conv2D(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
    if apply_batchnorm:
        block.add(layers.BatchNormalization())
    block.add(layers.LeakyReLU())
    return block

def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    block = tf.keras.Sequential()
    block.add(layers.Conv2DTranspose(filters, size, strides=2, padding='same', kernel_initializer=initializer, use_bias=False))
    block.add(layers.BatchNormalization())
    if apply_dropout:
        block.add(layers.Dropout(0.5))
    block.add(layers.ReLU())
    return block

def build_generator(in_channels=3+EMBED_DIM, out_channels=3):
    inputs = layers.Input(shape=[IMG_HEIGHT, IMG_WIDTH, in_channels])
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),
        downsample(128, 4),
        downsample(256, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
    ]
    up_stack = [
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4),
        upsample(256, 4),
        upsample(128, 4),
        upsample(64, 4),
    ]
    initializer = tf.random_normal_initializer(0., 0.02)
    last = layers.Conv2DTranspose(out_channels, 4, strides=2, padding='same', kernel_initializer=initializer, activation='tanh')

    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)
    skips = reversed(skips[:-1])
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])
    x = last(x)
    return tf.keras.Model(inputs=inputs, outputs=x)

generator = build_generator()


def build_discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)
    in_channels = 6 + 2*EMBED_DIM
    inputs = layers.Input(shape=[IMG_HEIGHT, IMG_WIDTH, in_channels])
    x = downsample(64, 4, apply_batchnorm=False)(inputs)
    x = downsample(128, 4)(x)
    x = downsample(256, 4)(x)
    x = layers.ZeroPadding2D()(x)
    x = layers.Conv2D(512, 4, strides=1, kernel_initializer=initializer, use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU()(x)
    x = layers.ZeroPadding2D()(x)
    x = layers.Conv2D(1, 4, strides=1, kernel_initializer=initializer)(x)
    return tf.keras.Model(inputs=inputs, outputs=x)

discriminator = build_discriminator()

In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def generator_loss(disc_generated_output, gen_output, target):
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + LAMBDA * l1_loss
    return total_gen_loss, gan_loss, l1_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    return (real_loss + generated_loss) * 0.5

generator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4, beta_1=0.5)

In [None]:
@tf.function
def train_step(contours, masterplans, areas):
    with tf.GradientTape(persistent=True) as tape:
        contour_with_area = combine_with_area(contours, areas, area_embedder)
        fake_masterplan = generator(contour_with_area, training=True)
        real_with_area = combine_with_area(masterplans, areas, area_embedder)
        fake_with_area = combine_with_area(fake_masterplan, areas, area_embedder)
        disc_real_in = tf.concat([contour_with_area, real_with_area], axis=-1)
        disc_fake_in = tf.concat([contour_with_area, fake_with_area], axis=-1)
        disc_real_output = discriminator(disc_real_in, training=True)
        disc_fake_output = discriminator(disc_fake_in, training=True)
        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_fake_output, fake_masterplan, masterplans)
        disc_loss = discriminator_loss(disc_real_output, disc_fake_output)

    generator_gradients = tape.gradient(gen_total_loss, generator.trainable_variables + area_embedder.trainable_variables)
    discriminator_gradients = tape.gradient(disc_loss, discriminator.trainable_variables)
    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables + area_embedder.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))
    return gen_total_loss, gen_gan_loss, gen_l1_loss, disc_loss

def fit(dataset, start_epoch, total_epochs, val_contours, val_masterplans, val_areas):
    for epoch in range(start_epoch, total_epochs + 1):
        print(f"Starting epoch {epoch}/{total_epochs}")
        for step, (contours_batch, masterplans_batch, area_batch) in enumerate(dataset):
            gen_loss, gan_loss, l1_loss, disc_loss = train_step(contours_batch, masterplans_batch, area_batch)
            if step % 50 == 0:
                print(f"  Step {step}, Gen Loss: {gen_loss.numpy():.4f} (GAN: {gan_loss.numpy():.4f}, L1: {l1_loss.numpy():.4f}), Disc Loss: {disc_loss.numpy():.4f}")

        if epoch % 10 == 0:
            generator.save_weights(os.path.join(MODELS_DIR, "generator", f"generator_epoch_{epoch}.weights.h5"))
            discriminator.save_weights(os.path.join(MODELS_DIR, "discriminator", f"discriminator_epoch_{epoch}.weights.h5"))
            area_embedder.save_weights(os.path.join(MODELS_DIR, "embedder", f"area_embedder_epoch_{epoch}.weights.h5"))

            val_contour_with_area = combine_with_area(val_contours, val_areas, area_embedder)
            val_fake_masterplan = generator(val_contour_with_area, training=False)
            for i in range(min(5, val_contours.shape[0])):
                fig, axes = plt.subplots(1, 3, figsize=(15, 5))
                fig.suptitle(f"Epoch {epoch}", fontsize=16)
                axes[0].imshow(val_contours[i])
                axes[0].set_title(f"Contour (Area={val_areas[i].numpy():.2f}ha)")
                axes[1].imshow((val_fake_masterplan[i].numpy() + 1) / 2.0)
                axes[1].set_title("Generated Masterplan")
                axes[2].imshow(val_masterplans[i])
                axes[2].set_title("Real Masterplan")
                for ax in axes:
                    ax.axis("off")
                image_path = os.path.join(CHECKPOINTS_DIR, f"val_epoch_{epoch}_sample_{i}.png")
                plt.savefig(image_path)
                plt.close()


In [None]:
VAL_CONTOUR_PATHS = [
    "/content/drive/MyDrive/park-GAN/dataset/contours/nl_hag_bogaardplein_1_contour.jpg",
    "/content/drive/MyDrive/park-GAN/dataset/contours/us_elp_centennial_7_contour.jpg"
]
VAL_MASTERPLAN_PATHS = [
    "/content/drive/MyDrive/park-GAN/dataset/master_plans/nl_hag_bogaardplein_1_schema.jpg",
    "/content/drive/MyDrive/park-GAN/dataset/master_plans/us_elp_centennial_7_schema.jpg"
]

val_contours_list = []
val_masterplans_list = []
val_areas_list = []

for c_path, m_path in zip(VAL_CONTOUR_PATHS, VAL_MASTERPLAN_PATHS):
    c_img, m_img = load_image_pair(c_path, m_path)
    val_contours_list.append(c_img)
    val_masterplans_list.append(m_img)
    core_id = get_core_id(c_path)
    park_name = get_park_name(core_id)
    area_val = park_areas.get(park_name, 0.0)
    val_areas_list.append(area_val)

val_contours = tf.stack(val_contours_list)
val_masterplans = tf.stack(val_masterplans_list)
val_areas = tf.convert_to_tensor(val_areas_list, dtype=tf.float32)

generator.load_weights(os.path.join(MODELS_DIR, "generator", f"generator_epoch_900.weights.h5"))
discriminator.load_weights(os.path.join(MODELS_DIR, "discriminator", f"discriminator_epoch_900.weights.h5"))

area_embedder.build(input_shape=(None, 1))
area_embedder.load_weights(os.path.join(MODELS_DIR, "embedder", f"area_embedder_epoch_900.weights.h5"))

In [None]:
fit(dataset, start_epoch=901, total_epochs=1200, val_contours=val_contours, val_masterplans=val_masterplans, val_areas=val_areas)