In [None]:
import tensorflow as tf
from tensorflow.python.framework.ops import disable_eager_execution
import keras
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Input, Flatten, Dense, Lambda, Reshape, Dropout
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.losses import binary_crossentropy
from keras import backend as K
import csv
import math as math
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
from keras.layers.merge import concatenate as concat 
from keras.models import  Model, load_model
from keras.layers import  LeakyReLU 
from keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import plot_model, to_categorical
from keras.losses import mse
import scipy.stats

from sklearn.model_selection import train_test_split
# Switch TensorFlow to graph (non-eager) execution before any layer or model is built.
# NOTE(review): presumably required because kl_reconstruction_loss reads the encoder's
# symbolic `mu` / `sigma` tensors directly instead of receiving them as arguments — confirm.
disable_eager_execution()

In [None]:
# Hyper-parameters shared by the cells below.
batch_size = 256  # Mini-batch size passed to vae.fit
no_epochs = 100  # Number of training epochs passed to vae.fit
latent_dim = 8 # Dimension of the latent space (width of the `latent_m` / `latent_sigma` layers)
nt=256 # Number of points contained in each trajectory (width of every coordinate input and output layer)
beta = 10 # Multiplier applied to the KL-divergence term inside kl_reconstruction_loss


In [None]:
# Path of the CSV file holding one coordinate of every trajectory; its rows are later
# fed to the network as individual trajectories.
# NOTE(review): the original comment called this the x-axis coordinate, but the file
# name contains `_t_` — confirm which axis this file really stores.
filename = '../input/dataset-final/data_traj_cvae_2obj80000_t_3d_finaltest_traite.csv'
# `with` guarantees the handle is closed once the rows have been read
# (the original left the file open for the lifetime of the kernel).
with open(filename, 'rt', newline='') as raw_data:
    reader = csv.reader(raw_data, delimiter=',')
    x = list(reader)
# Every field is parsed as a float; a non-numeric field raises ValueError here.
data = np.array(x).astype('float')
print(data.shape)


In [None]:
# Path of the CSV file holding a second coordinate of every trajectory; its rows are
# later fed to the network as individual trajectories.
# NOTE(review): the original comment called this the y-axis coordinate, but the file
# name contains `_x_` — confirm which axis this file really stores.
filenamet = '../input/dataset-final/data_traj_cvae_2obj80000_x_3d_finaltest_traite.csv'
# `with` guarantees the handle is closed once the rows have been read
# (the original left the file open for the lifetime of the kernel).
with open(filenamet, 'rt', newline='') as raw_datat:
    readert = csv.reader(raw_datat, delimiter=',')
    t = list(readert)
# Every field is parsed as a float; a non-numeric field raises ValueError here.
datat = np.array(t).astype('float')
print(datat.shape)


In [None]:
# Path of the CSV file holding the z-axis coordinate of every trajectory.
filenamez = '../input/dataset-final/data_traj_cvae_2obj80000_z_3d_finaltest_traite.csv'
# `with` guarantees the handle is closed once the rows have been read
# (the original left the file open for the lifetime of the kernel).
with open(filenamez, 'rt', newline='') as raw_dataz:
    readerz = csv.reader(raw_dataz, delimiter=',')
    zc = list(readerz)
dataz = np.array(zc).astype('float')
print(dataz.shape)

# Definition of the labels specifying the end points of the trajectory:
#   label 0 <-> last z value equal to 11.275
#   label 1 <-> last z value equal to -8.05
# The number of rows and the index of the last point are derived from the data and
# from `nt` instead of the previously hard-coded 80000 and 255.
end_z = dataz[:, nt - 1]
labels = np.zeros(dataz.shape[0])
labels[end_z == -8.05] = 1

# Rows whose end point matches neither value keep label 0, exactly as before;
# they are now reported instead of being mislabelled silently.
n_unmatched = int(np.sum((end_z != 11.275) & (end_z != -8.05)))
if n_unmatched:
    print('WARNING: %d trajectories end at neither 11.275 nor -8.05; they keep label 0' % n_unmatched)

print(labels)
# Spot check of one end point (row index kept from the original notebook).
print(dataz[75000][255])

In [None]:
# Formation of random validation, training, and test datasets.
# The four arrays are split together so rows stay aligned across coordinates and labels.
# NOTE: no random_state is passed, so the partition differs on every run.

# Step 1: hold out 1 % of all trajectories as the test set.
(x_temp, x_test,
 t_temp, t_test,
 z_temp, z_test,
 y_temp, y_test) = train_test_split(data, datat, dataz, labels, test_size=0.01)

# Step 2: hold out 10 % of the remainder as the validation set.
(x_train, x_valid,
 t_train, t_valid,
 z_train, z_valid,
 y_train, y_valid) = train_test_split(x_temp, t_temp, z_temp, y_temp, test_size=0.1)

n_train, n_test = len(x_train), len(x_test)

# Quick sanity output: set sizes, label array shape, one label, input array shape.
for _summary_item in (n_train, n_test, y_train.shape, y_train[1], x_train.shape):
    print(_summary_item)


In [None]:
# Labels in one-hot encoding (two classes, one per trajectory end point)
Y_train, Y_test, Y_valid = (
    to_categorical(_class_ids, 2) for _class_ids in (y_train, y_test, y_valid)
)
print(Y_train.shape)
# Width of the one-hot label vector; it sizes the label input of encoder and decoder.
ny = Y_train.shape[1]


In [None]:
# Definition of the input layer of the encoder:
# three coordinate inputs of width `nt`, created in the order Xi, Yi, Zi ...
Xi, Yi, Zi = (Input(shape=(nt,)) for _ in range(3))
# ... followed by the one-hot label input of width `ny`.
label = Input(shape=(ny,))
# The encoder works on a single vector: the three coordinates and the label side by side.
inputs = concat([Xi, Yi, Zi, label])

In [None]:
# Definition Encoder
# Five tanh layers narrowing 512 -> 256 -> 128 -> 64 -> 32, the mirror image of the
# decoder stack (32 -> 64 -> 128 -> 256 -> 512).
encoded = Dense(512, activation = 'tanh')(inputs)
# Fix: this layer used to be applied to `inputs` again, which overwrote `encoded`
# and left the 512-unit layer above disconnected from the model.
encoded = Dense(256, activation = 'tanh')(encoded)
encoded = Dense(128, activation = 'tanh')(encoded)
encoded = Dense(64, activation = 'tanh')(encoded)
encoded = Dense(32, activation = 'tanh')(encoded)
# Two linear heads parameterising the latent distribution. `sigma` is consumed as a
# log-variance downstream: sample_z scales the noise by exp(sigma / 2).
mu      = Dense(latent_dim, name='latent_m')(encoded)
sigma   = Dense(latent_dim, name='latent_sigma')(encoded)

In [None]:
# Reparameterization trick
def sample_z(args):
  """Draw a latent sample from the encoder heads.

  Args:
    args: pair ``(mu, sigma)`` of tensors with identical shape. ``sigma`` is
      halved and exponentiated before use, i.e. it is treated as a log-variance.

  Returns:
    ``mu + exp(sigma / 2) * eps`` where ``eps`` is drawn with ``K.random_normal``
    and has one row per row of ``mu`` and one column per latent dimension.
  """
  mean, log_var = args
  # Row count is read dynamically (batch size is unknown at build time);
  # the column count is the static latent width.
  noise_shape = (K.shape(mean)[0], K.int_shape(mean)[1])
  noise = K.random_normal(shape=noise_shape)
  return mean + K.exp(log_var / 2) * noise

# Wrap the sampler in a layer so it becomes part of the Keras graph.
sampling_layer = Lambda(sample_z, output_shape=(latent_dim, ), name='z')
z = sampling_layer([mu, sigma])
# The decoder is conditioned on the label, so append it to the latent sample.
zc = concat([z, label])

In [None]:
# Instantiate encoder: maps (three coordinate inputs, label) to
# [mu, sigma, latent sample concatenated with the label].
encoder_inputs = [Xi, Yi, Zi, label]
encoder_outputs = [mu, sigma, zc]
encoder = Model(inputs=encoder_inputs, outputs=encoder_outputs, name='encoder')
encoder.summary()

In [None]:
# Definition Decoder
# Input: a latent vector with the one-hot label appended (width latent_dim + ny).
d_i = Input((latent_dim + ny, ))

# Five tanh layers widening 32 -> 64 -> 128 -> 256 -> 512.
decoded = d_i
for _units in (32, 64, 128, 256, 512):
    decoded = Dense(_units, activation='tanh')(decoded)

# Three linear heads of width `nt`, one per reconstructed coordinate,
# named decoder_outputa / decoder_outputb / decoder_outputc.
oa, ob, oc = (
    Dense(nt, name='decoder_output' + _suffix)(decoded) for _suffix in ('a', 'b', 'c')
)

In [None]:
# Decoder: maps a (latent + label) vector to the three coordinate outputs.
decoder_outputs = [oa, ob, oc]
decoder = Model(inputs=d_i, outputs=decoder_outputs, name='decoder')
decoder.summary()

In [None]:
#  VAE: encoder and decoder chained end to end.
vae_inputs = [Xi, Yi, Zi, label]
# Only the third encoder output (latent sample concatenated with the label) feeds the decoder.
encoder_result = encoder(vae_inputs)
vae_outputs = decoder(encoder_result[2])
vae = Model(inputs=vae_inputs, outputs=vae_outputs, name='vae')
vae.summary()

In [None]:
# Define loss
def kl_reconstruction_loss(i, o):
  """Reconstruction error plus beta-weighted KL divergence.

  Args:
    i: target tensor handed over by Keras.
    o: predicted tensor handed over by Keras.

  Returns:
    Mean of (MSE over the flattened tensors + KL term per sample).

  The KL term is not computed from the arguments: it reads the module-level
  encoder tensors ``mu`` and ``sigma`` and the module-level weight ``beta``.
  NOTE(review): the model is compiled with this single loss for all three
  decoder outputs, so the KL term presumably enters the total once per
  output — confirm that this is intended.
  """
  # Both tensors are flattened first, so the MSE is one value for the whole batch.
  reconstruction_loss = mse(K.flatten(i), K.flatten(o))
  # KL divergence of N(mu, exp(sigma)) from N(0, I), summed over the latent axis.
  kl_terms = beta * (1 + sigma - K.square(mu) - K.exp(sigma))
  kl_loss = -0.5 * K.sum(kl_terms, axis=-1)

  return K.mean(reconstruction_loss + kl_loss)

In [None]:
# Compile VAE
vae.compile(loss=kl_reconstruction_loss, optimizer='adam', experimental_run_tf_function=False)

# Train autoencoder: the three coordinate arrays are both inputs and targets,
# the one-hot label is an input only.
train_targets = [t_train, x_train, z_train]
train_inputs = train_targets + [Y_train]
valid_targets = [t_valid, x_valid, z_valid]
valid_inputs = valid_targets + [Y_valid]

results = vae.fit(
    train_inputs,
    train_targets,
    epochs=no_epochs,
    batch_size=batch_size,
    validation_data=(valid_inputs, valid_targets),
)

In [None]:
# QC training and validation curves (should follow each other)
plt.figure(figsize=(8, 2))
# Validation first, then training, so colours and legend order match the original figure.
for _history_key, _curve_label in (('val_loss', 'val'), ('loss', 'train')):
    plt.plot(results.history[_history_key], label=_curve_label)
plt.xlabel('epoch index')
plt.ylabel('loss value ')
plt.legend()
plt.show()

In [None]:
# Trajectory Test
# Run the held-out set through the encoder and through the full VAE.
test_inputs = [t_test, x_test, z_test, Y_test]
encoded_test = encoder.predict(test_inputs)
vae_test = vae.predict(test_inputs)

# Index of the test trajectory to visualise (was repeated as a magic number).
sample_idx = 150

mpl.rcParams['legend.fontsize'] = 10
fig = plt.figure()
# `fig.gca(projection='3d')` is deprecated since Matplotlib 3.4 and rejected by
# later releases; add_subplot is the supported way to create a 3-D axes.
ax = fig.add_subplot(111, projection='3d')
ax.plot(t_test[sample_idx], x_test[sample_idx], z_test[sample_idx], 'o-', label='true signal')
ax.plot(vae_test[0][sample_idx], vae_test[1][sample_idx], vae_test[2][sample_idx],
        'o-', label='encoded-decoded signal')

ax.view_init(elev=20., azim=120)

ax.legend()

# Last reconstructed point of the trajectory, one value per decoder output.
for _coordinate in vae_test:
    print(_coordinate[sample_idx][-1])

In [None]:
# Generate random latent space points for trajectory generation

import random
def construct(digit, z = None):
  """Build one decoder input row: a latent vector followed by a one-hot label.

  Args:
    digit: class index; column ``latent_dim + digit`` is set to 1.
    z: optional sequence of latent values. When given, ``z[i]`` overwrites
      column ``i`` for every ``i < len(z)``; columns not covered keep their
      random value.

  Returns:
    Array of shape ``(1, latent_dim + ny)``. The first ``latent_dim`` columns
    are drawn independently from ``random.uniform(-10, 10)``.
  """
  out = np.zeros((1,latent_dim+ny))

  # One draw per latent dimension. The original spelled out eight assignments,
  # which only worked for latent_dim == 8; the draw order is unchanged.
  for j in range(latent_dim):
    out[0,j]=random.uniform(-10,10)

  out[:,digit+latent_dim]=1.
  if z is not None:
    # Caller-supplied latent values replace the random ones column by column.
    for i, value in enumerate(z):
      out[:,i]=value
  return out


In [None]:
# Create datasets for the control VAE training

# Number of latent points drawn for each of the two end-point classes.
n_generated = 120000

# Decoder input rows (latent vector + one-hot label): `lat` for class 0, `latb` for class 1.
# The width follows the model configuration instead of the previously hard-coded 10.
lat = np.zeros((n_generated, latent_dim + ny))
latb = np.zeros((n_generated, latent_dim + ny))
for i in range(n_generated):
    # Same call order as before (class 0, then class 1), so the random stream is unchanged.
    lat[i, :] = construct(0)
    latb[i, :] = construct(1)

# Decode every row in two batched calls instead of 2 * n_generated single-row calls.
s = decoder.predict(lat, batch_size=batch_size)
sb = decoder.predict(latb, batch_size=batch_size)

# One (n_generated, nt) array per coordinate. They are stored as float64, like the
# zero-initialised arrays the original filled row by row.
x, y, z = (np.asarray(_coordinate, dtype=np.float64) for _coordinate in s)
xb, yb, zb = (np.asarray(_coordinate, dtype=np.float64) for _coordinate in sb)

# Write every array to its own CSV file, one trajectory (or latent row) per line.
for _path, _rows in (
    ('data_traj_testleg1.csv', x),
    ('data_trajy_testleg1.csv', y),
    ('data_trajz_testleg1.csv', z),
    ('data_traj_testleg2.csv', xb),
    ('data_trajy_testleg2.csv', yb),
    ('data_trajz_testleg2.csv', zb),
    ('latent.csv', lat),
    ('latentb.csv', latb),
):
    with open(_path, 'w+', newline='') as _handle:
        csv.writer(_handle).writerows(_rows)


In [None]:
#Evaluation of trajectories produced with the help of the control VAE

# Number of control rows decoded per class.
n_eval = 300
# Width of one decoder input row (10 with latent_dim = 8 and ny = 2, the value
# that used to be hard-coded here).
n_decoder_inputs = latent_dim + ny


def _load_control_csv(path):
    """Read a comma-separated file of numbers into a float array and close the file."""
    with open(path, 'rt', newline='') as handle:
        return np.array(list(csv.reader(handle, delimiter=','))).astype('float')


ctrl = _load_control_csv('../input/dataset-final/control.csv')
ctrlb = _load_control_csv('../input/dataset-final/controlb.csv')

# The original indexed rows 0..299 one by one and failed with IndexError on a short
# file; keep that failure mode explicit now that slicing is used.
if len(ctrl) < n_eval or len(ctrlb) < n_eval:
    raise IndexError('control files must contain at least %d rows' % n_eval)

# Decoder inputs: the leading columns of the first n_eval rows of each control file.
s_c = ctrl[:n_eval, :n_decoder_inputs]
s_cb = ctrlb[:n_eval, :n_decoder_inputs]

# Decode all rows in one batched call per class. The previous loop predicted row by
# row, printed every input row, and used `y` as an index, overwriting the `y` array
# produced by the dataset-generation cell.
s = decoder.predict(s_c, batch_size=batch_size)
sb = decoder.predict(s_cb, batch_size=batch_size)

# One (n_eval, nt) float64 array per coordinate and class.
evaluationx, evaluationy, evaluationz = (
    np.asarray(_coordinate, dtype=np.float64) for _coordinate in s
)
evaluationxb, evaluationyb, evaluationzb = (
    np.asarray(_coordinate, dtype=np.float64) for _coordinate in sb
)

# Write every array to its own CSV file, one trajectory per line.
for _path, _rows in (
    ('evaluationx.csv', evaluationx),
    ('evaluationy.csv', evaluationy),
    ('evaluationz.csv', evaluationz),
    ('evaluationxb.csv', evaluationxb),
    ('evaluationyb.csv', evaluationyb),
    ('evaluationzb.csv', evaluationzb),
):
    with open(_path, 'w+', newline='') as _handle:
        csv.writer(_handle).writerows(_rows)
