In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

from os import listdir
from numpy import asarray, savez_compressed
from tensorflow.keras.utils import img_to_array, load_img
import tensorflow as tf

from keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.layers import Input, Conv2D, LeakyReLU, Activation, Concatenate, BatchNormalization, Conv2DTranspose, Dense
from tensorflow.keras.initializers import RandomNormal
from keras.utils.vis_utils import plot_model

import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Data Visualization & Processing 

In [None]:
# load all images in a directory into memory
def load_images(path, size=(256,512)):
    """Load every paired image in ``path`` and split it into source/target halves.

    Each file is loaded and resized to ``size``, converted to an array, and cut
    in two along the second array axis: the first half of the columns is kept
    as the satellite (source) image, the remaining columns as the map (target).

    Args:
        path: Directory holding the images; every entry is assumed to be an
            image file. A trailing path separator is optional.
        size: Value forwarded to ``load_img`` as ``target_size``. The split
            column is ``size[1] // 2`` (256 for the default).

    Returns:
        A two-element list ``[sources, targets]``; each element stacks the
        per-file halves along a new leading axis, in sorted filename order.
    """
    # local import keeps this block independent of a module-level `import os`
    from os.path import join

    # column at which the side-by-side picture is cut in two
    half_width = size[1] // 2

    src_list, tar_list = [], []

    # listdir order is arbitrary; sorting makes the saved dataset reproducible
    for filename in sorted(listdir(path)):
        # join() works whether or not `path` ends with a separator
        pixels = img_to_array(load_img(join(path, filename), target_size=size))

        # left half: satellite photo, right half: map rendering
        src_list.append(pixels[:, :half_width])
        tar_list.append(pixels[:, half_width:])

    return [asarray(src_list), asarray(tar_list)]

In [None]:
# Build the paired dataset from the Kaggle training folder
path = "/kaggle/input/pix2pix-maps/train/"
src_img, tar_img = load_images(path)
print("Loaded: ", src_img.shape, tar_img.shape)

# Persist both arrays in a single compressed archive; positional arrays are
# read back later in the notebook under the keys "arr_0" and "arr_1"
filename = "maps_256.npz"
savez_compressed(filename, src_img, tar_img)
print("Saved dataset: ", filename)


In [None]:
# Load a single training file, resized to 550x1000, to inspect it by eye
pat = "/kaggle/input/pix2pix-maps/train/1.jpg"
pix = load_img(pat, target_size=(550, 1000))
a = img_to_array(pix)
# display the resulting array shape as the cell output
a.shape

In [None]:
import matplotlib.pyplot as plt

# Show the full side-by-side picture; dividing by 255 rescales the float
# pixels for imshow (assumes the loaded values are in 0..255)
plt.imshow(a / 255.0)

# Cut at column 500: left part is the satellite photo, right part the map
sat_img, map_img = a[:, :500], a[:, 500:]

In [None]:
# satellite half of the sample picture
plt.imshow(sat_img / 255.0)

In [None]:
# map half of the sample picture
plt.imshow(map_img / 255.0)

In [None]:
from numpy import load
from matplotlib import pyplot

# load the prepared maps dataset written earlier in the notebook
data = load("/kaggle/working/maps_256.npz")
src_images, tar_images = data["arr_0"], data["arr_1"]
print("Loaded: ", src_images.shape, tar_images.shape)

# top row: source (satellite) images
n_samples = 3
for i in range(n_samples):
    pyplot.subplot(2, n_samples, 1 + i)
    pyplot.axis("off")
    pyplot.imshow(src_images[i].astype("uint8"))

# bottom row: target (map) images
for i in range(n_samples):
    pyplot.subplot(2, n_samples, 1 + n_samples + i)
    pyplot.axis("off")
    pyplot.imshow(tar_images[i].astype("uint8"))

# render once, after every subplot is in place; calling show() inside the
# loop flushed the figure after the first target image and broke the grid
pyplot.show()


# Model Configuration Starts here

## Discriminator Model
The discriminator is a deep convolutional neural network that performs image classification,
specifically conditional-image classification. It takes both the source image (e.g. satellite photo)
and the target image (e.g. Google maps image) as input and predicts the likelihood that the
target image is a real rather than a fake translation of the source image. The discriminator design is based
on the effective receptive field of the model, which defines the relationship between one output
of the model and the number of pixels in the input image. This is called a PatchGAN model and
is carefully designed so that each output prediction of the model maps to a 70 × 70 square or
patch of the input image. The benefit of this approach is that the same model can be applied
to input images of different sizes, e.g. larger or smaller than 256 × 256 pixels.

In [None]:
from keras.layers import Input, Conv2D, LeakyReLU, Activation, Concatenate, BatchNormalization, Conv2DTranspose, Dense, Dropout


In [None]:
def discriminator(image_shape=[256, 256, 3]):
    """Build and compile the PatchGAN discriminator.

    The model takes a source image and a target image of the same shape,
    concatenates them along the channel axis and runs them through a stack of
    4x4 convolutions (C64-C128-C256-C512 with stride 2, then C512 and a
    single-filter output convolution with stride 1). The sigmoid output is a
    map of per-patch real/fake probabilities.

    Args:
        image_shape: Shape of one input image, used for both inputs.

    Returns:
        A compiled Keras ``Model`` with inputs ``[input_image, target_image]``,
        trained with binary cross-entropy weighted by 0.5 and Adam
        (learning rate 0.0002, beta_1 0.5).
    """
    # weight initialiser: zero-mean normal with standard deviation 0.02
    init = RandomNormal(stddev=0.02)

    # source and target inputs; same Input class the generator uses
    inp = Input(shape=image_shape, name='input_image')
    tar = Input(shape=image_shape, name='target_image')

    # stack the pair along the channel axis
    merge = Concatenate()([inp, tar])

    # C64: the first block has no batch normalisation
    d = Conv2D(64, kernel_size=(4,4), strides=(2,2), padding="same", kernel_initializer=init)(merge)
    d = LeakyReLU(alpha=0.2)(d)

    # C128, C256, C512: identical stride-2 conv -> batch norm -> leaky ReLU blocks
    for filters in (128, 256, 512):
        d = Conv2D(filters, kernel_size=(4,4), strides=(2,2), padding="same", kernel_initializer=init)(d)
        d = BatchNormalization()(d)
        d = LeakyReLU(alpha=0.2)(d)

    # second-last layer: C512 with the default stride of 1
    d = Conv2D(512, (4,4), padding="same", kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)

    # patch output: one sigmoid score per spatial position
    out = Conv2D(1, (4,4), padding="same", kernel_initializer=init)(d)
    patch_out = Activation("sigmoid")(out)

    model = Model([inp, tar], patch_out)
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    # loss_weights in its documented list form (one entry per model output)
    model.compile(optimizer=opt, loss="binary_crossentropy", loss_weights=[0.5])

    return model

In [None]:
# Build the discriminator for 256x256 three-channel inputs and inspect it:
# first as a text summary, then as a layer diagram with shapes and names
image_shape = (256, 256, 3)
model = discriminator(image_shape)
model.summary()
plot_model(model, show_shapes=True, show_layer_names=True)

### Generator Model: Encoder block utility

In [None]:
def encoder_block(layer_in, units, batchnorm = True):
    """Downsampling block of the generator: conv -> (batch norm) -> leaky ReLU.

    Args:
        layer_in: Tensor to downsample.
        units: Number of filters of the 4x4, stride-2 convolution.
        batchnorm: When true, batch-normalise the convolution output before
            the activation.

    Returns:
        The activated output tensor.
    """
    init = RandomNormal(stddev=0.02)

    enc = Conv2D(units, (4,4), strides=(2,2), padding="same", kernel_initializer=init)(layer_in)

    if batchnorm:
        # training=True matches decoder_block: the layer then normalises with
        # the statistics of the current batch in predict() as well as in
        # train_on_batch(), so the generator behaves the same in both
        enc = BatchNormalization()(enc, training=True)

    return LeakyReLU(alpha=0.2)(enc)
    

### Generator Model: Decoder Block utility

In [None]:
def decoder_block(layer_in, units, skip_in, dropout = True):
    """Upsampling block of the generator with a skip connection.

    Applies a 4x4, stride-2 transposed convolution, batch normalisation and
    (optionally) 50% dropout, both forced into training mode, then
    concatenates the result with ``skip_in`` and applies ReLU.

    Args:
        layer_in: Tensor to upsample.
        units: Number of filters of the transposed convolution.
        skip_in: Encoder tensor merged in after upsampling.
        dropout: When true, apply dropout before the merge.

    Returns:
        The ReLU-activated, concatenated tensor.
    """
    weight_init = RandomNormal(stddev=0.02)

    upsample = Conv2DTranspose(
        units,
        (4,4),
        strides=(2,2),
        padding="same",
        kernel_initializer=weight_init,
    )
    x = upsample(layer_in)
    x = BatchNormalization()(x, training=True)

    # dropout is optional because only some decoder stages use it
    if dropout:
        x = Dropout(0.5)(x, training=True)

    # merge with the encoder skip connection, then activate
    merged = Concatenate()([x, skip_in])
    return Activation("relu")(merged)
    

## Generator Model Definition: U-Net structure
The generator model is more complex than the discriminator model. The generator is
an encoder-decoder model using a U-Net architecture. The model takes a source image (e.g.
satellite photo) and generates a target image (e.g. Google maps image). It does this by first
downsampling or encoding the input image down to a bottleneck layer, then upsampling or
decoding the bottleneck representation to the size of the output image.

In [None]:
def generator(image_shape=(256,256,3)):
    """Build the U-Net generator (not compiled).

    Seven ``encoder_block`` stages downsample the input, a stride-2
    convolution with ReLU forms the bottleneck, and seven ``decoder_block``
    stages upsample again, each merged with the encoder output of matching
    depth. A final transposed convolution with tanh produces a 3-channel image.

    Args:
        image_shape: Shape of one input image.

    Returns:
        A Keras ``Model`` mapping the input image to the generated image.
    """
    init = RandomNormal(stddev=0.02)
    input_img = Input(shape=image_shape)

    # Encoder: only the first stage skips batch normalisation
    skips = [encoder_block(input_img, 64, batchnorm=False)]
    for filters in (128, 256, 512, 512, 512, 512):
        skips.append(encoder_block(skips[-1], filters))

    # Bottleneck: plain convolution + ReLU, no batch norm
    x = Conv2D(512, (4,4), strides=(2,2), padding="same", kernel_initializer=init)(skips[-1])
    x = Activation("relu")(x)

    # Decoder: (filters, use dropout) per stage, paired with the encoder
    # outputs from deepest to shallowest
    decoder_plan = (
        (512, True),
        (512, True),
        (512, True),
        (512, False),
        (256, False),
        (128, False),
        (64, False),
    )
    for (filters, use_dropout), skip in zip(decoder_plan, reversed(skips)):
        x = decoder_block(x, filters, skip, dropout=use_dropout)

    # Output layer: back to three channels, squashed by tanh
    x = Conv2DTranspose(3, (4,4), strides=(2,2), padding="same", kernel_initializer=init)(x)
    out_image = Activation("tanh")(x)

    return Model(input_img, out_image)

In [None]:
# Build the generator with its default input shape and draw its layer graph
model = generator()
plot_model(model)

In [None]:
def pix2pix_model(g_model, d_model, input_img):
    """Build the composite model used to update the generator.

    The source image is fed to the generator; the source and the generated
    image are then fed to the discriminator. The composite model outputs the
    discriminator's patch predictions first and the generated image second.

    Args:
        g_model: Generator model, called on the source image.
        d_model: Discriminator model; its ``trainable`` flag is set to
            ``False`` here before the composite model is compiled.
        input_img: Shape of the source image (despite the name, this is the
            shape passed to ``Input``, not an image).

    Returns:
        A compiled Keras ``Model`` with outputs ``[dis_out, gen_out]``, losses
        ``['binary_crossentropy', 'mae']`` and loss weights ``[1, 100]``.
    """
    # freeze the discriminator inside the composite model
    d_model.trainable = False

    in_img = Input(shape=input_img)
    gen_out = g_model(in_img)
    dis_out = d_model([in_img, gen_out])

    # Output order must line up with the loss list below and with the targets
    # [y_real, X_realB] that train() passes: binary cross-entropy scores the
    # discriminator output, MAE scores the generated image. The previous
    # [gen_out, dis_out] order paired each loss with the wrong output.
    model = Model(in_img, [dis_out, gen_out])

    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(optimizer=opt, loss=['binary_crossentropy','mae'], loss_weights=[1,100])

    return model

In [None]:
# load and prepare training images
def load_real_samples(filename):
    """Load the paired image arrays from an ``.npz`` archive and rescale them.

    Args:
        filename: Path of an archive holding the source images under
            ``"arr_0"`` and the target images under ``"arr_1"``.

    Returns:
        A two-element list ``[X1, X2]`` with both arrays mapped through
        ``(x - 127.5) / 127.5`` (i.e. [0, 255] becomes [-1, 1]).
    """
    # np.load keeps the archive open; the context manager closes it as soon
    # as both arrays have been read into memory
    with np.load(filename) as data:
        X1, X2 = data["arr_0"], data["arr_1"]
    # scale from [0,255] to [-1,1]
    X1 = (X1 - 127.5) / 127.5
    X2 = (X2 - 127.5) / 127.5
    return [X1, X2]


In [None]:
# draw a random batch of real image pairs together with "real" labels
def generate_real_samples(dataset, n_samples, patch_shape):
    """Pick ``n_samples`` random pairs from ``dataset`` and label them real.

    Args:
        dataset: Pair ``(sources, targets)`` of arrays indexed along axis 0.
        n_samples: Number of pairs to draw; indices are drawn independently,
            so the same pair can appear more than once.
        patch_shape: Side length of the square label map.

    Returns:
        ``([X1, X2], y)`` where ``X1``/``X2`` are the selected source/target
        images and ``y`` is an all-ones array of shape
        ``(n_samples, patch_shape, patch_shape, 1)``.
    """
    sources, targets = dataset
    # one random index per requested sample, shared by both arrays
    picks = np.random.randint(0, sources.shape[0], n_samples)
    real_labels = np.ones((n_samples, patch_shape, patch_shape, 1))
    return [sources[picks], targets[picks]], real_labels


In [None]:
# run the generator on a batch and pair its output with "fake" labels
def generate_fake_samples(g_model, samples, patch_shape):
    """Generate images from ``samples`` and label them fake.

    Args:
        g_model: Object whose ``predict`` method maps ``samples`` to images.
        samples: Batch of source images passed to ``g_model.predict``.
        patch_shape: Side length of the square label map.

    Returns:
        ``(X, y)`` where ``X`` is the prediction and ``y`` is an all-zeros
        array of shape ``(len(X), patch_shape, patch_shape, 1)``.
    """
    fakes = g_model.predict(samples)
    fake_labels = np.zeros((len(fakes), patch_shape, patch_shape, 1))
    return fakes, fake_labels


In [None]:
def summarize_performance(step, g_model, dataset, n_samples=3):
    """Save a comparison plot and a snapshot of the generator.

    Draws ``n_samples`` random pairs, generates a translation for each source
    image and plots three rows (source, generated, expected). The figure is
    written to ``plot_<step+1>.png`` and the generator to
    ``model_<step+1>.h5``, with the number zero-padded to six digits.

    Args:
        step: Zero-based training step; ``step + 1`` is used in file names.
        g_model: Generator to evaluate and save.
        dataset: Pair of arrays accepted by ``generate_real_samples``.
        n_samples: Number of columns in the plot.
    """
    # the label maps are not needed here, so a patch size of 1 is requested
    [X_realA, X_realB], _ = generate_real_samples(dataset, n_samples, 1)
    X_fakeB, _ = generate_fake_samples(g_model, X_realA, 1)

    # one plot row per batch, rescaled from [-1,1] to [0,1] for display
    rows = [(batch + 1) / 2.0 for batch in (X_realA, X_fakeB, X_realB)]
    for row, images in enumerate(rows):
        for col in range(n_samples):
            pyplot.subplot(3, n_samples, 1 + n_samples * row + col)
            pyplot.axis("off")
            pyplot.imshow(images[col])

    tag = step + 1

    # write the figure, then release it
    filename1 = "plot_%06d.png" % tag
    pyplot.savefig(filename1)
    pyplot.close()

    # snapshot the generator
    filename2 = "model_%06d.h5" % tag
    g_model.save(filename2)
    print(">Saved: %s and %s" % (filename1, filename2))

In [None]:
# alternate discriminator and generator updates for the requested epochs
def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):
    """Run the pix2pix training loop.

    Every step draws a random real batch, generates the matching fake batch,
    updates the discriminator on each, then updates the generator through the
    composite model. Losses are printed every step and
    ``summarize_performance`` is called after every 10 epochs' worth of steps.

    Args:
        d_model: Compiled discriminator; ``output_shape[1]`` gives the side
            length of the label maps.
        g_model: Generator used to produce fake targets.
        gan_model: Composite model; its ``train_on_batch`` is expected to
            return three values, the first of which is reported as ``g``.
        dataset: Pair ``(sources, targets)``.
        n_epochs: Number of passes, each ``len(sources) / n_batch`` steps.
        n_batch: Samples per step.
    """
    # side length of the discriminator's square output map
    patch_size = d_model.output_shape[1]

    sources, _ = dataset
    steps_per_epoch = int(len(sources) / n_batch)
    total_steps = steps_per_epoch * n_epochs
    checkpoint_every = steps_per_epoch * 10

    # the loop counts individual update steps, not epochs
    for step in range(1, total_steps + 1):
        (real_src, real_tgt), real_labels = generate_real_samples(dataset, n_batch, patch_size)
        fake_tgt, fake_labels = generate_fake_samples(g_model, real_src, patch_size)

        # discriminator: one update on real pairs, one on generated pairs
        loss_real = d_model.train_on_batch([real_src, real_tgt], real_labels)
        loss_fake = d_model.train_on_batch([real_src, fake_tgt], fake_labels)

        # generator: trained through the composite model against "real" labels
        gen_loss, _, _ = gan_model.train_on_batch(real_src, [real_labels, real_tgt])

        print(">%d, d1[%.3f] d2[%.3f] g[%.3f]" % (step, loss_real, loss_fake, gen_loss))

        # summarize_performance takes the zero-based step index
        if step % checkpoint_every == 0:
            summarize_performance(step - 1, g_model, dataset)


In [None]:
# Load the rescaled training pairs and report their shapes
dataset = load_real_samples('/kaggle/working/maps_256.npz')
print(f'Loaded data size : {dataset[0].shape}, {dataset[1].shape}')

# Per-image shape (everything after the leading sample axis) sizes both networks
img_shape = dataset[0].shape[1:]
d_model = discriminator(img_shape)
g_model = generator(img_shape)

# Composite model through which the generator is updated
gan_model = pix2pix_model(g_model, d_model, img_shape)

# Run the training loop for 50 epochs
train(d_model, g_model=g_model, gan_model=gan_model, dataset=dataset, n_epochs=50)