In [2]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model
import os
# import matplotlib.pyplot as plt
# import cv2
# import matplotlib.image as mpimg
import time
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.preprocessing import image_dataset_from_directory

from tensorflow.keras import backend as K

from PIL import Image
import pandas as pd
import csv
import plotly.graph_objects as go
print(tf.__version__)
from tensorflow.python.client import device_lib
# print(device_lib.list_local_devices())
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

2.3.0
Num GPUs Available:  1


In [3]:
# image processing methods
def process_image(imgtensor):
    imgtensor *= (1/255) #
    return imgtensor

def deprocess_image(imgtensor):
    imgtensor *= 255
    imgtensor = np.clip(imgtensor, 0, 255)
    return imgtensor



In [10]:
#set up training tf.data.Dataset so that sample weights can be given as model input
train_dir = 'D:/Datasets/VAE_zeroshot/data_full/processed/train' #your images must be inside a subfolder in the last folder of datadir. if using multiple classes, put each class in one subfolder of the train_data folder. also put validation and test sets each in their own subfolder at the same level as the training folder
validation_dir = 'D:/Datasets/VAE_zeroshot/data_full/processed/test' #your images must be inside a subfolder in the last folder of datadir. if using multiple classes, put each class in one subfolder of the train_data folder. also put validation and test sets each in their own subfolder at the same level as the training folder
#punctured dataset:
# train_dir = 'D:/Datasets/VAE_zeroshot/data_punctured_1p25-1_R0p3_RESIZE/train' #your images must be inside a subfolder in the last folder of datadir. if using multiple classes, put each class in one subfolder of the train_data folder. also put validation and test sets each in their own subfolder at the same level as the training folder
# validation_dir = 'D:/Datasets/VAE_zeroshot/data_punctured_1p25-1_R0p3_RESIZE/test' #

batch_size = 8


#create images dataset
#set shuffle to False since this dataset must match up with the weights dataset (which isnt shuffled)
img_dataset_train = image_dataset_from_directory(
    train_dir, shuffle = False, image_size=(256, 256), batch_size=8, color_mode='grayscale', validation_split=None, label_mode=None)

#rescale pixel values
img_dataset_train = img_dataset_train.map(process_image)
print(img_dataset_train.cardinality()) #set this value as the steps_per_epoch in .fit


#set up validation dataset

#create images dataset
img_dataset_validation = image_dataset_from_directory(
    validation_dir, shuffle = False, image_size=(256, 256), batch_size=8, color_mode='grayscale', validation_split=None, label_mode=None)

#rescale pixel values
img_dataset_validation = img_dataset_validation.map(process_image)
print(img_dataset_validation.cardinality()) #set this value as the number of validation steps in .fit

Found 4499 files belonging to 1 classes.
tf.Tensor(563, shape=(), dtype=int64)
Found 501 files belonging to 1 classes.
tf.Tensor(63, shape=(), dtype=int64)


In [4]:
#Build encoder model
latent_dim = 3 #dimensionality of latent space

encoder_input = Input(shape=(256, 256, 1), name='encoder_input')

x = layers.Conv2D(64, 3, strides=(1, 1), activation='relu', padding='valid', kernel_initializer='RandomNormal',  bias_initializer='zeros')(encoder_input)

x = layers.MaxPooling2D(2, padding='same')(x)
x = layers.Conv2D(64, 3, strides=(1, 1), activation='relu', padding='valid', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)

x = layers.MaxPooling2D(2, padding='same')(x)
x = layers.Conv2D(128, 3, strides=(1, 1), activation='relu', padding='valid', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)

x = layers.MaxPooling2D(2, padding='same')(x)
x = layers.Conv2D(128, 3, strides=(1, 1), activation='relu', padding='valid', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)

x = layers.MaxPooling2D(2, padding='same')(x)
x = layers.Flatten(data_format='channels_last')(x)

z_mean = layers.Dense(latent_dim, activation='linear', name='z_mean', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)
#NOTE: activation function returning z_log_var needs to produce UNBOUNDED values. You cant use say sigmoid since this would force the variance to be a minimum equal to 1 and at max exp(1)


#constant variance sampling layer:
class Sample_latent_vector(keras.layers.Layer):
    def sample(self, inputs):
        sigma = 0.1
        z_mean = inputs
        epsilon = K.random_normal(shape=K.shape(z_mean))
        #output is batch of latent vectors
        return z_mean + epsilon*sigma
    
    def call(self, inputs):
        return self.sample(inputs)
    
z = Sample_latent_vector()(z_mean)

# Build Decoder network

x = layers.Dense(16384, activation='relu', use_bias=False, kernel_initializer='RandomNormal')(z)
x = layers.Reshape((32, 32, 16))(x)
x = layers.UpSampling2D(2, name='decoderupsample1')(x)
x = layers.Conv2DTranspose(64, 3, strides=(1, 1), activation='relu', padding='same', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)
x = layers.BatchNormalization(axis=-1)(x)
x = layers.UpSampling2D(2)(x)
x = layers.Conv2DTranspose(64, 3, strides=(1, 1), activation='relu', padding='same', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)
x = layers.BatchNormalization(axis=-1)(x)
x = layers.UpSampling2D(2)(x)
x = layers.Conv2DTranspose(128, 3, strides=(1, 1), activation='relu', padding='same', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)
x = layers.BatchNormalization(axis=-1)(x)
x = layers.Conv2DTranspose(256, 1, strides=(1, 1), activation='relu', padding='same', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)
x = layers.BatchNormalization(axis=-1)(x)
z_decoded = layers.Conv2DTranspose(1, 1, strides=(1, 1), activation='sigmoid', padding='same', kernel_initializer='RandomNormal',  bias_initializer='zeros')(x)


# make custom loss function by subclassing Layer and using add_loss
class Add_VAE_Loss(keras.layers.Layer):
    
    def compute_entanglement_loss_component(self, inputs):
        z_mean = inputs
        C_total = 0 #total 'absolute covariance' for this batch
        #cycle through each pair of latent space dimensions and compute covariance-like value. Sum over all pairs to get total covariance
        for i in range(latent_dim):
            for j in range(i+1, latent_dim):
                C_total += K.mean(K.abs(z_mean[:,i]*z_mean[:,j]), axis=0) # sum over all z_mean vectors in the batch 
        
        return C_total*2/(latent_dim*(latent_dim-1)) #we normalize by the number of elements in the upper triangular part of the cov matrix     
    
    def compute_VAE_loss(self, inputs):
        #reasonable values for coeffs: K.mean(0.001*KL_loss + 1.0*recon_loss + 0.0001*entanglement_loss, axis=0)
        sigma = 0.1
        input_image, z_mean, output_image = inputs
        KL_loss = K.mean(K.square(z_mean) + K.exp(K.log(K.square(sigma))) - K.log(K.square(sigma)) - 1, axis=1)
        entanglement_loss = self.compute_entanglement_loss_component(z_mean)
        recon_loss = keras.metrics.binary_crossentropy(K.flatten(input_image), K.flatten(output_image))
        return K.mean(0.001*KL_loss + 1.0*recon_loss + 0.0001*entanglement_loss, axis=0) #sample weight is provided as an output from the dataset (generator)
              
    def call(self, layer_inputs):
        if not isinstance(layer_inputs, list):
            ValueError('Input must be list of [encoder_input, z_mean, z_decoded]')
        loss = self.compute_VAE_loss(layer_inputs)
        self.add_loss(loss)
        return layer_inputs[2]
    
#Make VAE model 
outputs = Add_VAE_Loss()([encoder_input, z_mean, z_decoded])
VAE = Model(encoder_input, outputs, name='VAE')
VAE.summary()

VAE.compile(loss=None,
              optimizer=keras.optimizers.Adam(learning_rate=0.0001, beta_1=0.9, beta_2=0.999, amsgrad=False), metrics = ['binary_crossentropy'])


Model: "VAE"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
encoder_input (InputLayer)      [(None, 256, 256, 1) 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 254, 254, 64) 640         encoder_input[0][0]              
__________________________________________________________________________________________________
max_pooling2d (MaxPooling2D)    (None, 127, 127, 64) 0           conv2d[0][0]                     
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 125, 125, 64) 36928       max_pooling2d[0][0]              
________________________________________________________________________________________________

In [7]:
checkpoint_path = r'C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\VAE_const_var_Z3D'
#Make checkpoint callback 
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=False,
    monitor='val_loss',
    mode='min',
    save_best_only=True)

VAE.fit(img_dataset_train,
        shuffle=True,
        steps_per_epoch=538,#steps_per_epoch needs to equal exactly the number of batches in the Dataset generator
        epochs=10,
        verbose = 2,
        validation_data=img_dataset_validation,
        validation_steps=27,#=#batches in validation dataset
        callbacks=[model_checkpoint_callback])



Epoch 1/10
INFO:tensorflow:Assets written to: C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\VAE_const_var_Z3D\assets
538/538 - 48s - loss: 0.0808 - binary_crossentropy: 0.0000e+00 - val_loss: 0.0612 - val_binary_crossentropy: 0.0000e+00
Epoch 2/10
INFO:tensorflow:Assets written to: C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\VAE_const_var_Z3D\assets
538/538 - 48s - loss: 0.0557 - binary_crossentropy: 0.0000e+00 - val_loss: 0.0462 - val_binary_crossentropy: 0.0000e+00
Epoch 3/10
INFO:tensorflow:Assets written to: C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\VAE_const_var_Z3D\assets
538/538 - 48s - loss: 0.0436 - binary_crossentropy: 0.0000e+00 - val_loss: 0.0378 - val_binary_crossentropy: 0.0000e+00
Epoch 4/10
INFO:tensorflow:Assets written to: C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\VAE_const_var_Z3D\assets
538/538 - 48s - loss: 0.0360 - binary_crossentropy: 0.0000e+00 - val_loss: 0.0321 - val_binary_crossentropy: 0.0000e+00
Epoch 5/10
INFO:tensorflow:Assets wr

<tensorflow.python.keras.callbacks.History at 0x1ea6d6ffd48>

In [5]:
VAE = keras.models.load_model(r"C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\VAE_full_entanglement_loss_const_var")

In [72]:
# make decoder model by copying VAE layers to a new model
VAE.layers[16]
decoder_input = Input(shape = (latent_dim), name='latent_vector')
x = decoder_input
for layer in VAE.layers[17:-1]:
    print(layer.name)
    x = layer(x)
    
decoder = Model(decoder_input, x, name='decoder')
decoder.summary()

dense_30
reshape_5
decoderupsample1
conv2d_transpose_25
batch_normalization_96
up_sampling2d_10
conv2d_transpose_26
batch_normalization_97
up_sampling2d_11
conv2d_transpose_27
batch_normalization_98
conv2d_transpose_28
batch_normalization_99
conv2d_transpose_29
Model: "decoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
latent_vector (InputLayer)   [(None, 12)]              0         
_________________________________________________________________
dense_30 (Dense)             (None, 16384)             196608    
_________________________________________________________________
reshape_5 (Reshape)          (None, 32, 32, 16)        0         
_________________________________________________________________
decoderupsample1 (UpSampling (None, 64, 64, 16)        0         
_________________________________________________________________
conv2d_transpose_25 (Conv2DT (None, 64, 64, 64)        9280  

In [7]:
#make ENCODER that skips the sampling layer and just returns z_mean

VAE.layers[14]
detached_encoder_input = Input(shape=(256, 256, 1), name='detached_encoder_input')
x = detached_encoder_input #
for layer in VAE.layers[1:11]:#[1:15]we must skip the VAE input layer (for loop starts at layer 1 instead of 0) as it causes problems to use the same input layer in two separate models
    print(layer.name)
    x = layer(x)

encoder = Model(detached_encoder_input, x, name='encoder')
encoder.summary()

conv2d
max_pooling2d
conv2d_1
max_pooling2d_1
conv2d_2
max_pooling2d_2
conv2d_3
max_pooling2d_3
flatten
z_mean
Model: "encoder"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
detached_encoder_input (Inpu [(None, 256, 256, 1)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 254, 254, 64)      640       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 127, 127, 64)      0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 125, 125, 64)      36928     
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 63, 63, 64)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 61, 61, 128)       73856     
______________

In [7]:
#create Model for inference which skips the sample_latent_vector layer and just uses z_mean as the latent vector
VAE_2_input = Input(shape=(256, 256, 1), name='VAE_2_input')
VAE_2 = Model(VAE_2_input, decoder(encoder(VAE_2_input)), name="VAE_sampless")
VAE_2.summary()

(1, 256, 256, 1)

In [10]:
#PREDICT images 

#visualize image reconstructions
# train_sample = dataset_with_weights_train.take(24)
validation_sample = img_dataset_validation.take(24)

xhat = VAE.predict(validation_sample, steps=3)
# xhat = VAE_2.predict(train_sample, steps=3)
xhat = deprocess_image(xhat)
xhat = xhat.astype(np.uint8)
xhat.shape
for j in range(20):
    pil_im = Image.fromarray(xhat[j,:,:,0],mode='L') # convert each slice of predicted batch to a PIL image object  
    pil_im.save(r'C:\Users\MrLin\Documents\Experiments\VAE_zeroshot\Results\pred_' + f'{j+4300:02}' + '.png')


In [11]:
# create array of latent vectors to predict. They should lie on one axis of the latent space. Ie all zeros except for the axis being varied
# data_sample = img_dataset_train.take(530)
Z_data_sample = encoder.predict(img_dataset_train)
Z_data_sample.shape

(4499, 3)

In [12]:
fig2 = go.Figure(data=[go.Scatter3d(
    name='training images',
    x=Z_data_sample[:,0],
    y=Z_data_sample[:,1],
    z=Z_data_sample[:,2], 
    mode='markers',
    marker=dict(
        size=1.5, color='black',symbol='circle')
)])

fig2.update_layout(
    width=900,
    height=700,
    scene_xaxis_title_text='\u03BC<sub>1</sub>',
    scene_yaxis_title_text="\u03BC<sub>2</sub>",
    scene_zaxis_title_text="\u03BC<sub>3</sub>",

    legend=dict(
        yanchor="top",
        y=0.99,
        xanchor="left",
        x=0.17,
        font_size=16
    ),
    scene_xaxis = dict(tickfont=dict(size=12),
                title_font_size=20),
    scene_yaxis = dict(tickfont=dict(size=12),
                title_font_size=20),
    scene_zaxis = dict(tickfont=dict(size=12),
                title_font_size=20),
)

fig2.show()