# Architecture Variational Autoencoder
Train a variational autoencoder then test it with a view samples to see the reconstruction.

In [None]:
import matplotlib.pyplot as matPlt
import numpy as np

import tensorflow as tf
import keras
from keras import layers
from tqdm.keras import TqdmCallback

import rasterio

import junodch_utils_read_img as utils

# Data preparation
### Fetch data from file

In [None]:
folderName = "img/Sokoto/"
pathSatellite = folderName + "Sentinel-2.tif"
pathNight = folderName + "Night_VIIRS.tif"
pathValidation = folderName + "Population_GHSL.tif"

aoi = utils.getImgBorder(pathSatellite)

# Fetch coords
dataCoords, dataRadiance = utils.getTilesCoordsPerimeter(pathNight, area=aoi)

trainMask = dataRadiance>0.2
lightCoords = dataCoords[trainMask]

print('Tiles:',dataCoords.shape[0])
print('Light Tile:',lightCoords.shape[0])

#### Fetch images

In [None]:
with rasterio.open(pathSatellite) as f:
  trainData, _ = utils.getEachImgFromCoord(f, lightCoords, True)
trainData = utils.formatData(trainData, res=64, toFloat=True)
print(trainData.shape)

# Variational Autoencoder

In [None]:
# Input encoder
input_shape = keras.Input(shape=trainData.shape[1:])

optimizer = keras.optimizers.Adam(
  learning_rate=0.001,
  beta_1=0.9,
  beta_2=0.999,
)

lossFunction = keras.losses.MeanSquaredError() # l2

activationFunction = 'relu'

earlyStop = tf.keras.callbacks.EarlyStopping(monitor='loss', min_delta=0, patience=3)

'''
def loss_func(encoder_mu, encoder_log_variance):
  def vae_reconstruction_loss(y_true, y_predict):
    reconstruction_loss_factor = 1000
    reconstruction_loss = keras.backend.mean(keras.backend.square(y_true-y_predict), axis=[1, 2, 3])
    return reconstruction_loss_factor * reconstruction_loss

  def vae_kl_loss(encoder_mu, encoder_log_variance):
    kl_loss = -0.5 * keras.backend.sum(1.0 + encoder_log_variance - keras.backend.square(encoder_mu) - keras.backend.exp(encoder_log_variance), axis=1)
    return kl_loss

  def vae_kl_loss_metric(y_true, y_predict):
    kl_loss = -0.5 * keras.backend.sum(1.0 + encoder_log_variance - keras.backend.square(encoder_mu) - keras.backend.exp(encoder_log_variance), axis=1)
    return kl_loss

  def vae_loss(y_true, y_predict):
    reconstruction_loss = vae_reconstruction_loss(y_true, y_predict)
    kl_loss = vae_kl_loss(encoder_mu, encoder_log_variance)

    loss = reconstruction_loss + kl_loss
    return loss

  return vae_loss
  
lossFunction = loss_func(encoder_mu, encoder_logvar)
'''

class Sampling(layers.Layer):
  def call(self, inputs):
    mu, log_variance = inputs
    epsilon = tf.keras.backend.random_normal(shape=tf.keras.backend.shape(mu), mean=0.0, stddev=1.0)
    return mu + tf.keras.backend.exp(log_variance/2) * epsilon

latent_space_dim = 8*8*16

# mu encoder
cnn = layers.Conv2D(32,(3,3), 2, padding='same', activation=activationFunction)(input_shape)
cnn = layers.Conv2D(32,(3,3), 2, padding='same', activation=activationFunction)(cnn)
encoded = layers.Conv2D(16,(3,3), 2, padding='same', activation=activationFunction, name='displayable_encoder')(cnn)

shape_before_flatten = keras.backend.int_shape(encoded)[1:]

encoder_mu = layers.Flatten(name='encoder')(encoded)

# logvar encoder
cnn = layers.Conv2D(32,(3,3), 2, padding='same', activation=activationFunction)(input_shape)
cnn = layers.Conv2D(32,(3,3), 2, padding='same', activation=activationFunction)(cnn)
encoded = layers.Conv2D(16,(3,3), 2, padding='same', activation=activationFunction)(cnn)
encoder_logvar = layers.Flatten()(encoded)

encoder_output = Sampling()([encoder_mu, encoder_logvar])

decoder_dense_layer = layers.Dense(np.prod(shape_before_flatten), name="decoder_dense")(encoder_output)
decoder_reshape = layers.Reshape(target_shape=shape_before_flatten)(decoder_dense_layer)

cnn = layers.Conv2DTranspose(32,(3,3),2, padding='same', activation=activationFunction)(decoder_reshape)
cnn = layers.Conv2DTranspose(32,(3,3),2, padding='same', activation=activationFunction)(cnn)
decoder = layers.Conv2DTranspose(3, (3,3),2, padding='same', activation='sigmoid', name='decoder')(cnn)

autoencoder = keras.Model(input_shape, decoder)

autoencoder.compile(optimizer=optimizer, loss=lossFunction)

print('Encoder shape:',autoencoder.get_layer('encoder').output_shape)

result = autoencoder.fit(trainData, trainData,
                          epochs=5,
                          batch_size=2,
                          shuffle=True,
                          verbose=0,
                          callbacks=[
                            TqdmCallback(verbose=1), # Concise display progression
                            earlyStop,
                          ],
                        )


In [None]:
matPlt.plot(result.history['loss'][:], label='Training')
autoencoder.summary()

In [None]:
print("Test display some tiles")

indexesTest = [*np.argwhere(trainMask)[100:600:25].flatten(), *range(0,100000,5000)]

with rasterio.open(pathSatellite) as f:
  dataTest, _ = utils.coordsToImgsFormated(f, dataCoords[indexesTest], res=64)

utils.displayAutoEncoderResults(autoencoder, dataTest, showDetail=0, precision=5)

In [None]:
autoencoder.save('model/var_autoencoder_64px_encoder_1024')