# Code used to train a cGAN to generate DEMs conditioned to rgb satellite images in google Colab (GPU accelerated)

In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals

try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf

import os
import numpy as np
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output

from google.colab import drive
from google.colab import files
import sys
import imageio
import csv


TensorFlow 2.x selected.


In [0]:
#Input image = satellite, Real Image = DEM
BUFFER_SIZE = 400
BATCH_SIZE = 1
IMG_WIDTH = 256
IMG_HEIGHT = 256
INPUT_CHANNELS = 3
OUTPUT_CHANNELS = 1
LAMBDA = 100
EPOCHS = 300

In [0]:
def resize(input_image, real_image, height, width):
    input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

    return input_image, real_image

def random_crop(input_image, real_image):
    stacked_image = tf.stack([input_image, real_image], axis=0)
    cropped_image = tf.image.random_crop(
      stacked_image, size=[2, IMG_HEIGHT, IMG_WIDTH, 3])

    return cropped_image[0], cropped_image[1]


# normalizing the images to [-1, 1]

def normalize(input_image, real_image):
    input_image = (input_image / 127.5) - 1
    real_image = (real_image / 127.5) - 1

    return input_image, real_image

@tf.function()
def random_jitter(input_image, real_image):
    # resizing to 286 x 286 x 3
    input_image, real_image = resize(input_image, real_image, 286, 286)

    # randomly cropping to 256 x 256 x 3
    input_image, real_image = random_crop(input_image, real_image)

    if tf.random.uniform(()) > 0.5:
        # random mirroring
        input_image = tf.image.flip_left_right(input_image)
        real_image = tf.image.flip_left_right(real_image)

    return input_image, real_image

def load_image_train(input_image, real_image):
    # real_image = tf.image.grayscale_to_rgb(real_image)
    # input_image, real_image = random_jitter(input_image, real_image)
    input_image, real_image = normalize(input_image, real_image)
    # real_image = tf.image.rgb_to_grayscale(real_image)
    return input_image, real_image

def load_image_test(input_image, real_image):
    # real_image = tf.image.grayscale_to_rgb(real_image)
    # input_image, real_image = random_jitter(input_image, real_image)
    input_image, real_image = normalize(input_image, real_image)
    # real_image = tf.image.rgb_to_grayscale(real_image)
    return input_image, real_image

    return input_image, real_image
# load and prepare training images
def load_real_samples(filename):
	# load compressed arrays
	data = np.load(filename)
	# unpack arrays
	X1, X2 = data['arr_0'], data['arr_1']
	return [X1, X2]

In [4]:
drive.mount('/content/drive')
path = '/content/drive/My Drive/ImageToDEM/'
sys.path.append(path)
# load image data
dataset = load_real_samples(path + 'Data/Atacama.npz')
print('Loaded', dataset[0].shape, dataset[1].shape)
dataset[0] = tf.cast(dataset[0],tf.float32)
dataset[1] = tf.cast(dataset[1],tf.float32)

train_dataset = tf.data.Dataset.from_tensor_slices((dataset[0], dataset[1]))
train_dataset = train_dataset.map(load_image_train)
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(1)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive
Loaded (196, 256, 256, 3) (196, 256, 256, 1)


In [0]:
def downsample(filters, size, apply_batchnorm=True):
  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
      tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False))

  if apply_batchnorm:
    result.add(tf.keras.layers.BatchNormalization())

  result.add(tf.keras.layers.LeakyReLU())

  return result

down_model = downsample(3, 4)

In [0]:
def upsample(filters, size, apply_dropout=False):
  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
    tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                    padding='same',
                                    kernel_initializer=initializer,
                                    use_bias=False))

  result.add(tf.keras.layers.BatchNormalization())

  if apply_dropout:
      result.add(tf.keras.layers.Dropout(0.5))

  result.add(tf.keras.layers.ReLU())

  return result

up_model = upsample(3, 4)

In [0]:
def Generator(input_channels=INPUT_CHANNELS, output_channels=OUTPUT_CHANNELS):
  down_stack = [
    downsample(64, 4, apply_batchnorm=False), # (bs, 128, 128, 64)
    downsample(128, 4), # (bs, 64, 64, 128)
    downsample(256, 4), # (bs, 32, 32, 256)
    downsample(512, 4), # (bs, 16, 16, 512)
    downsample(512, 4), # (bs, 8, 8, 512)
    downsample(512, 4), # (bs, 4, 4, 512)
    downsample(512, 4), # (bs, 2, 2, 512)
    downsample(512, 4), # (bs, 1, 1, 512)
  ]

  up_stack = [
    upsample(512, 4, apply_dropout=True), # (bs, 2, 2, 1024)
    upsample(512, 4, apply_dropout=True), # (bs, 4, 4, 1024)
    upsample(512, 4, apply_dropout=True), # (bs, 8, 8, 1024)
    upsample(512, 4), # (bs, 16, 16, 1024)
    upsample(256, 4), # (bs, 32, 32, 512)
    upsample(128, 4), # (bs, 64, 64, 256)
    upsample(64, 4), # (bs, 128, 128, 128)
  ]

  initializer = tf.random_normal_initializer(0., 0.02)
  last = tf.keras.layers.Conv2DTranspose(output_channels, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh') # (bs, 256, 256, 3)

  concat = tf.keras.layers.Concatenate()

  inputs = tf.keras.layers.Input(shape=[None,None,input_channels])
  x = inputs

  # Downsampling through the model
  skips = []
  for down in down_stack:
    x = down(x)
    skips.append(x)

  skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    x = concat([x, skip])

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)

In [0]:
generator = Generator()


In [0]:
def Discriminator(input_channels=INPUT_CHANNELS, output_channels=OUTPUT_CHANNELS):
  initializer = tf.random_normal_initializer(0., 0.02)

  inp = tf.keras.layers.Input(shape=[None, None, input_channels], name='input_image')
  tar = tf.keras.layers.Input(shape=[None, None, output_channels], name='target_image')

  x = tf.keras.layers.concatenate([inp, tar]) # (bs, 256, 256, channels*2)

  down1 = downsample(64, 4, False)(x) # (bs, 128, 128, 64)
  down2 = downsample(128, 4)(down1) # (bs, 64, 64, 128)
  down3 = downsample(256, 4)(down2) # (bs, 32, 32, 256)

  zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3) # (bs, 34, 34, 256)
  conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1) # (bs, 31, 31, 512)

  batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

  leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

  zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu) # (bs, 33, 33, 512)

  last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2) # (bs, 30, 30, 1)

  return tf.keras.Model(inputs=[inp, tar], outputs=last)

discriminator = Discriminator()

In [0]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(disc_real_output, disc_generated_output):
  real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

  generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

  total_disc_loss = real_loss + generated_loss

  return total_disc_loss

def generator_loss(disc_generated_output, gen_output, target):
  gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

  # mean absolute error
  l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

  total_gen_loss = gan_loss + (LAMBDA * l1_loss)

  return total_gen_loss, gan_loss, l1_loss

generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

In [0]:

def generate_images(model, test_input, tar):
  prediction = model(test_input, training=True)
  plt.figure(figsize=(15,15))

  display_list = [test_input[0], tar[0], prediction[0]]
  title = ['Input Image', 'Ground Truth', 'Predicted Image']

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    if(display_list[i].shape[2] == 1):
      display_list[i] = tf.image.grayscale_to_rgb(display_list[i])
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()

def save_images(model, test_input, tar, e, path):
  prediction = model(test_input, training=True)
  plt.figure(figsize=(15,15))

  display_list = [test_input[0], tar[0], prediction[0]]
  title = ['Input Image', 'Ground Truth', 'Predicted Image']

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # getting the pixel values between [0, 1] to plot it.
    if(display_list[i].shape[2] == 1):
      display_list[i] = tf.image.grayscale_to_rgb(display_list[i])
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.savefig(path + "/epoch_"+ str(e) + '.png')
  print(path + "/epoch_"+ str(e) + '.png')
  
def save_pretty(model, pretty, e, path):
  k = 0
  for l in range(len(pretty[0])):
    if not os.path.isdir(path + "/epoch" + str(e)):
      os.mkdir(path + "/epoch" + str(e))

    pretty_input = (pretty[0][l]/127.5)-1
    pretty_input =  tf.cast(pretty_input,tf.float32)
    pretty_input = tf.expand_dims(pretty_input, 0)
    pretty_target = (pretty[1][l]/127.5) -1
    prediction = model(pretty_input, training=True)
    imageio.imwrite(path + "/epoch" + str(e) + "/pretty_"+ str(k) + 'pred.png', np.uint8( 255 * np.array(prediction[0] * 0.5 + 0.5)))
    imageio.imwrite(path + "/epoch" + str(e) + "/pretty_"+ str(k) + 'in.png', np.uint8( 255 * np.array(pretty_input[0] * 0.5 + 0.5)))
    imageio.imwrite(path + "/epoch" + str(e) + "/pretty_"+ str(k) + 'tar.png', np.uint8( 255 * np.array(pretty_target * 0.5 + 0.5)))
    k += 1

def savedata(dataname, *data):
  with open(dataname, 'a') as fd:
    writer = csv.writer(fd)
    writer.writerow(list(data))

In [0]:
@tf.function
def train_step(input_image, target):
  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    gen_output = generator(input_image, training=True)

    disc_real_output = discriminator([input_image, target], training=True)
    disc_generated_output = discriminator([input_image, gen_output], training=True)

    gen_loss, gan_loss, l1_loss = generator_loss(disc_generated_output, gen_output, target)
    disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

  generator_gradients = gen_tape.gradient(gen_loss,
                                          generator.trainable_variables)
  discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

  generator_optimizer.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
  discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              discriminator.trainable_variables))
  return gen_loss, gan_loss, l1_loss, disc_loss

In [0]:
def fit(train_ds, epochs, test_ds, pretty=[], validation = []):
  dataname = "/content/drive/My Drive/ImageToDEM/ncGAN/models/results/ncGANData.csv"
  if(pretty):
    print("Pretty Images chosen")
    for k in range(len(pretty[0])):

      pretty_input = (pretty[0][k]/127.5)-1
      pretty_target = (pretty[1][k]/127.5) -1
      display_list = [pretty_input, pretty_target]
      title = ['Pretty Image', 'Ground Truth']
      for i in range(2):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        # getting the pixel values between [0, 1] to plot it.
        if(display_list[i].shape[2] == 1):
          plt.imshow(display_list[i][:, :, 0]*0.5 + 0.5, cmap='gray')
        else:
          plt.imshow(display_list[i]*0.5 + 0.5)
        plt.axis('off')
      plt.show()
  epoch_times = []
  for epoch in range(epochs):
    start = time.time()

    # Train
    l1Avg = []
    genAvg = []
    ganAvg = []
    discAvg = []
    for input_image, target in train_ds:
      gen_loss, gan_loss, l1_loss, disc_loss = train_step(input_image, target)
      l1Avg.append(l1_loss)
      genAvg.append(gen_loss)
      ganAvg.append(gan_loss)
      discAvg.append(disc_loss)



    # for val

    clear_output(wait=True)
    l1AvgVal = []
    genAvgVal = []
    ganAvgVal = []
    discAvgVal = []
    for input_imageval, targetval in validation:
      gen_output = generator(input_imageval, training=False)
      disc_real_output = discriminator([input_imageval, targetval], training=False)
      disc_generated_output = discriminator([input_imageval, gen_output], training=False)

      gen_loss, gan_loss, l1_loss = generator_loss(disc_generated_output, gen_output, targetval)
      disc_loss = discriminator_loss(disc_real_output, disc_generated_output)
      l1AvgVal.append(l1_loss)
      genAvgVal.append(gen_loss)
      ganAvgVal.append(gan_loss)
      discAvgVal.append(disc_loss)

    L1Loss = np.mean(l1Avg)
    genLoss = np.mean(genAvg)
    ganLoss = np.mean(ganAvg)
    discLoss = np.mean(discAvg)

    ValL1Loss = np.mean(l1AvgVal)
    ValgenLoss = np.mean(genAvgVal)
    ValganLoss = np.mean(ganAvgVal)
    ValdiscLoss = np.mean(discAvgVal)

    
    savedata(dataname, epoch, L1Loss, genLoss, ganLoss, discLoss, ValL1Loss, ValgenLoss, ValganLoss, ValdiscLoss)
    epoch_times.append(time.time()-start)
    # Test on the same image so that the progress of the model can be 
    # easily seen.
    # for example_input, example_target in test_ds.take(1):
    #   generate_images(generator, example_input, example_target)
    #   save_images(generator, example_input, example_target, epoch, '/content/drive/My Drive/ImageToDEM/ncGAN/models/results')
    if(pretty):
      save_pretty(generator, pretty, epoch, '/content/drive/My Drive/ImageToDEM/ncGAN/models/pretty')
    # saving (checkpoint) the model every 20 epochs
    if (epoch + 1) % 20 == 0 or epoch==0:
      tf.saved_model.save(generator, '/content/drive/My Drive/ImageToDEM/ncGAN/models')
      print('Model saved')
    print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1,
                                                        time.time()-start))
    print ('Estimated Time left for remaining {} epochs is {} sec = {} hours\n'.format(epochs - epoch - 1, (epochs - epoch - 1) * sum(epoch_times) / len(epoch_times), (epochs - epoch - 1) * sum(epoch_times) / len(epoch_times)/3600))

In [14]:
# load image data
datasetT = load_real_samples(path + 'DataAll/TestPrettyRGB.npz')

pretty = [5, 6, 11, 13]
pretty_dataset = ([datasetT[0][x] for x in pretty], [datasetT[1][x] for x in pretty])

print('Loaded', datasetT[0].shape, datasetT[1].shape)
datasetT[0] = tf.cast(datasetT[0],tf.float32)
datasetT[1] = tf.cast(datasetT[1],tf.float32)

test_dataset = tf.data.Dataset.from_tensor_slices((datasetT[0], datasetT[1]))
test_dataset = test_dataset.map(load_image_test)
test_dataset = test_dataset.shuffle(BUFFER_SIZE)
test_dataset = test_dataset.batch(1)



Loaded (16, 256, 256, 3) (16, 256, 256, 1)


In [15]:
# load image data
datasetVal = load_real_samples(path + 'DataAll/ValidateAllRGB.npz')

print('Loaded', datasetVal[0].shape, datasetVal[1].shape)
datasetVal[0] = tf.cast(datasetVal[0],tf.float32)
datasetVal[1] = tf.cast(datasetVal[1],tf.float32)

validation_dataset = tf.data.Dataset.from_tensor_slices((datasetVal[0], datasetVal[1]))
validation_dataset = validation_dataset.map(load_image_test)
validation_dataset = validation_dataset.shuffle(BUFFER_SIZE)
validation_dataset = validation_dataset.batch(1)

Loaded (241, 256, 256, 3) (241, 256, 256, 1)


In [0]:
if not os.path.isdir('/content/drive/My Drive/ImageToDEM/ncGAN/models'):
  os.mkdir('/content/drive/My Drive/ImageToDEM/ncGAN/models')
if not os.path.isdir('/content/drive/My Drive/ImageToDEM/ncGAN/models/pretty'):
  os.mkdir('/content/drive/My Drive/ImageToDEM/ncGAN/models/pretty')
if not os.path.isdir('/content/drive/My Drive/ImageToDEM/ncGAN/models/results'):
  os.mkdir('/content/drive/My Drive/ImageToDEM/ncGAN/models/results')

In [17]:
fit(train_dataset, EPOCHS, pretty_dataset)

INFO:tensorflow:Assets written to: /content/drive/My Drive/ImageToDEM/ncGAN/models/assets
Model saved
Time taken for epoch 300 is 25.077869653701782 sec

Estimated Time left for remaining 0 epochs is 0.0 sec = 0.0 hours



In [18]:
# Run the trained model on the entire test dataset
for inp, tar in test_dataset:
  generate_images(generator, inp, tar)

Output hidden; open in https://colab.research.google.com to view.