# Code to Produce an Output from the Models
This will presumably need to be moved into a script-based format eventually, but for now this notebook holds the code that turns an input image into a controller action.

## Loading Things in

In [1]:
# imports
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Flatten, Input, Dense, Dropout, Lambda, Reshape, MaxPooling2D, LSTM, Reshape
import os
import cv2
import numpy as np

In [2]:
# Tunable settings.
# The image geometry and vector lengths below must be identical to the values
# used when the models were trained, otherwise the saved weights will not fit.
img_height, img_width = 128, 128
num_channels = 1
input_shape = (img_height, img_width, num_channels)

# Length of the compressed image representation produced by the encoder.
z_len = 2048
# Length of the action vector produced by the controller.
a_len = 1

In [3]:
# model architectures

# ==========VAE========================

# load the vae (have to make the architecture again, make sure the code below
#   matches the code in the Data Prepper/VAE Trainer)


# ====== Encoder ======
# Width of the latent vector; the dense layers below scale with it, so the
# model size changes sharply when this value is altered.
latent_dim = 2048

# Build the encoder graph; the running tensor is kept in x.
input_img = Input(shape=input_shape, name='encoder_input')
x = input_img

# Three identical stages (conv -> 2x2 max-pool -> dropout) that differ only
# in filter count, followed by a final convolution without pooling.
for n_filters in (64, 128, 64):
    x = Conv2D(n_filters, 3, padding='same', activation='relu')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Dropout(0.2)(x)
x = Conv2D(32, 3, padding='same', activation='relu')(x)

# Remember the shape of the last conv output; the decoder reshapes back to it.
conv_shape = K.int_shape(x)

# Flatten the feature map and pass it through a dense bottleneck.
x = Flatten()(x)
x = Dense(latent_dim * 2, activation='relu')(x)

# Two linear heads parameterising the latent distribution. z_sigma is used as
# a log variance downstream (sample_z applies exp(z_sigma / 2)).
z_mu = Dense(latent_dim, name='latent_mu')(x)
z_sigma = Dense(latent_dim, name='latent_sigma')(x)

#REPARAMETERIZATION TRICK
# Define sampling function to sample from the latent distribution.
# Reparameterize the sample based on the process defined by Gunderson and Huang
# into the form: mu + sigma x eps, where sigma = exp(z_sigma / 2) because z_sigma holds the log variance.
# This keeps the sampling step differentiable, so gradient descent can propagate gradients through mu and sigma.
def sample_z(args):
    """Draw a latent sample using the reparameterization trick.

    args is a pair (mean, log variance) of tensors. The result is
    mean + exp(log_variance / 2) * eps, where eps is standard normal noise
    with one row per batch element and one column per latent dimension.
    """
    mean, log_var = args
    # Batch size is taken dynamically, latent width statically.
    noise_shape = (K.shape(mean)[0], K.int_shape(mean)[1])
    noise = K.random_normal(shape=noise_shape)
    return mean + K.exp(log_var / 2) * noise

# Sample a vector from the latent distribution.
# z is a Lambda layer wrapping sample_z; it consumes the mean and log-variance
# heads and is the final layer of the encoder.
z = Lambda(sample_z, output_shape=(latent_dim,), name='z')([z_mu, z_sigma])

# Encoder model exposing all three tensors: mean, log variance and the sample.
# Note: the name `encoder` is rebound in the model-loading cell further down.
encoder = Model(inputs=input_img, outputs=[z_mu, z_sigma, z], name='encoder')

# ==== Decoder ====

# The decoder consumes a latent vector.
decoder_input = Input(shape=(latent_dim,), name='decoder_input')

# Expand the latent vector to as many units as the encoder's last conv output
# holds, then reshape to that feature-map shape so it can be upsampled.
flat_units = conv_shape[1] * conv_shape[2] * conv_shape[3]
x = Dense(flat_units, activation='relu')(decoder_input)
x = Reshape((conv_shape[1], conv_shape[2], conv_shape[3]))(x)

# Transposed convolutions undo the encoder's three 2x poolings: each stride-2
# layer doubles the spatial size, stride-1 layers refine at constant size.
# More (filters, stride) pairs can be appended here if desired.
for n_filters, stride in ((32, 2), (32, 1), (64, 2), (64, 1), (64, 2)):
    x = Conv2DTranspose(n_filters, 3, padding='same', activation='relu',
                        strides=(stride, stride))(x)

# Final layer maps back to the image channel count with a sigmoid activation.
x = Conv2DTranspose(num_channels, 3, padding='same', activation='sigmoid', name='decoder_output')(x)

# Stand-alone decoder model.
decoder = Model(decoder_input, x, name='decoder')

# Reconstruction obtained by decoding the sampled latent vector.
z_decoded = decoder(z)

# ===== Loss Function =====

class CustomLayer(keras.layers.Layer):
    """Pass-through layer that registers the VAE loss on the model.

    It is called with [original image, reconstruction], adds the combined
    reconstruction + KL loss via add_loss, and returns the original image
    unchanged.
    """

    def vae_loss(self, x, z_decoded):
        """Return the scalar VAE loss for image x and its reconstruction.

        The KL term reads the module-level z_mu and z_sigma tensors rather
        than receiving them as arguments.
        """
        flat_target = K.flatten(x)
        flat_recon = K.flatten(z_decoded)

        # Reconstruction term: binary cross-entropy suits the decoder's
        # sigmoid output.
        recon_loss = keras.metrics.binary_crossentropy(flat_target, flat_recon)

        # KL divergence term, scaled by 5e-4.
        kl_inner = 1 + z_sigma - K.square(z_mu) - K.exp(z_sigma)
        kl_loss = -5e-4 * K.mean(kl_inner, axis=-1)

        return K.mean(recon_loss + kl_loss)

    def call(self, inputs):
        """Attach the loss for inputs = [image, reconstruction]; return the image."""
        x, z_decoded = inputs[0], inputs[1]
        self.add_loss(self.vae_loss(x, z_decoded), inputs=inputs)
        return x

# Attach the custom loss to the input images and the decoded latent sample.
# CustomLayer returns its first input, so y is the original image tensor with
# the VAE loss registered as a side effect. It serves as the output of the
# full vae model.
vae_loss_layer = CustomLayer()
y = vae_loss_layer([input_img, z_decoded])

# ===========RNN===========================
# Takes a single-timestep sequence holding one latent vector (shape (1, z_len))
# and outputs a vector of the same length (called z' later in the notebook).

# Layers
input_to_rnn = Input(shape=(1, z_len))

x = LSTM(z_len, return_sequences=True)(input_to_rnn)
x = Dropout(0.2)(x)
x = Dense(z_len)(x)
x = Dropout(0.2)(x)

# Output width follows z_len (previously a hard-coded 2048) so the block stays
# consistent with the layers above and with the controller's z_len*2 input.
rnn_output = Dense(z_len, activation='sigmoid')(x)

# ============Controller=========================
# Maps the concatenation of z and z' (length z_len*2, one timestep) to an
# action vector of length a_len squashed into (0, 1) by a sigmoid.

# Layers

input_to_controller = Input(shape=(1, z_len*2))

# Dense `units` must be an integer; floor division keeps the layer widths as
# ints (true division would pass floats such as 1024.0).
x = Dense(z_len)(input_to_controller)
x = Dropout(0.2)(x)
x = Dense(z_len // 2)(x)
x = Dropout(0.2)(x)
x = Dense(z_len // 4)(x)
x = Dropout(0.2)(x)
x = Dense(z_len // 16)(x)
x = Dropout(0.2)(x)

ctrl_output = Dense(a_len, activation='sigmoid')(x)

In [4]:
# model loading
# Weight files are expected in a "models" directory under the current working
# directory. os.path.join keeps the paths valid on every operating system
# (the previous "\\"-joined strings only worked on Windows).
models_dir = os.path.join(os.getcwd(), "models")

# load encoder: rebuild the full VAE, load its weights, then cut it at the
# Lambda layer named 'z' (the sampled latent vector). Looking the layer up by
# name replaces the former positional index 15, which would silently point at
# a different layer if the architecture cell were edited.
# NOTE(review): 'z' is a random sample (sample_z draws noise), so encodings of
# the same image differ between runs; confirm this is what the RNN and
# controller were trained on.
vae = Model(input_img, y, name = 'vae')
vae.load_weights(os.path.join(models_dir, "vae.h5"))
encoder = Model(vae.input, vae.get_layer('z').output)

# load rnn and controller
rnn = Model(input_to_rnn, rnn_output, name = 'rnn')
rnn.load_weights(os.path.join(models_dir, "rnn.h5"))
ctrl = Model(input_to_controller, ctrl_output, name = 'controller')
ctrl.load_weights(os.path.join(models_dir, "cntrl.h5"))

## Input/Output

In [5]:
# right now, will need to have saved an image to run it through the models
#     edit img_path for desired image to process
img_path = os.path.join(os.getcwd(), "images", "2021-02-27", "2021-02-27-1350-01-24-22.NEF")

# loading in image and reshaping
# cv2.imread signals failure by returning None instead of raising, so check it
# here rather than letting cvtColor fail with an unrelated-looking error.
raw_bgr = cv2.imread(img_path)
if raw_bgr is None:
    raise FileNotFoundError("cv2.imread could not read image: " + img_path)

gray = cv2.cvtColor(raw_bgr, cv2.COLOR_BGR2GRAY)

# cv2.resize expects dsize as (width, height); the array it returns has shape
# (height, width), which is what the reshape below relies on.
resized = cv2.resize(gray, (img_width, img_height))

# Add batch and channel axes. The trailing 1 matches the single channel left
# by the grayscale conversion above; if num_channels is ever not 1, both the
# conversion and this reshape need to change together.
# NOTE(review): pixel values are passed on as-is (no rescaling); confirm this
# matches the preprocessing used when the VAE was trained.
img_array = resized.reshape(-1, img_height, img_width, 1)

In [29]:
# predictions by each piece of the model
# encode image, then add a length-1 time axis for the rnn: (batch, 1, z_len)
z = encoder.predict(img_array).reshape(-1, 1, z_len)
# rnn output, same shape as z
zprime = rnn.predict(z)
# Join z and z' along the feature axis for the controller: (batch, 1, z_len*2).
# Concatenating on the last axis gives the same array as before for a single
# image and also keeps every sample when more than one image is passed in.
z_and_zprime = np.concatenate((z, zprime), axis=-1)
action = ctrl.predict(z_and_zprime)

(1, 1, 4096)


In [30]:
# results
# Print each stage's output with its label: latent vector, rnn output, action.
for label, value in (("z:", z), ("z':", zprime), ("action:", action)):
    print(label, value)

z: [[[ 23.53091  114.87907  102.02956  ... -94.815125 -53.232956 108.08724 ]]]
z': [[[0.16432846 0.29797998 0.32640016 ... 0.4681464  0.48924953 0.26297385]]]
action: [[[1.]]]
