In [1]:
import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import keras.backend as K
from sklearn.utils import shuffle


## Load Data

In [2]:
dataset = pd.read_csv('datasets/Fs2_B_DANCE_WALK_KIN_0.5sec.csv')

### Remove Emotions from dataset & Split into Train/Test sets

In [3]:
dataset = dataset.drop(columns=['EMOTION_P', 'EMOTION_A', 'EMOTION_D'])

train_dataset = dataset.sample(frac=0.8, random_state=42)
test_dataset = dataset.drop(train_dataset.index)

print("No Training Samples:",train_dataset.shape[0])
print("No Test Samples:",test_dataset.shape[0])

train_dataset = shuffle(train_dataset)
test_dataset = shuffle(test_dataset)

train_dataset = np.asarray(train_dataset)
test_dataset = np.asarray(test_dataset)

x_train = train_dataset.reshape((len(train_dataset), np.prod(train_dataset.shape[1:])))
x_test = test_dataset.reshape((len(test_dataset), np.prod(test_dataset.shape[1:])))

print(len(x_train[0]))

No Training Samples: 39539
No Test Samples: 9885
20


## Build Model

### Settings

In [4]:
latent_dim = 5
intermediate_dim = 10
input_shape = (dataset.shape[1],)

### Sampler

In [5]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        
        batch = tf.shape(z_mean)[0]
        
        dim = tf.shape(z_mean)[1]
        
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

    
def sample_point_from_normal_distribution(args):
    mu, log_variance = args
    epsilon = K.random_normal(shape=K.shape(mu), mean=0.0, stddev=1.0)
    sampled_point = mu + K.exp(log_variance / 2) * epsilon
    return sampled_point

### Encoder

In [6]:
encoder_inputs = keras.Input(shape=input_shape)
x = layers.Dense(intermediate_dim, activation="relu")(encoder_inputs)

z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)

z = Sampling()([z_mean, z_log_var])

encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 20)]         0           []                               
                                                                                                  
 dense (Dense)                  (None, 10)           210         ['input_1[0][0]']                
                                                                                                  
 z_mean (Dense)                 (None, 5)            55          ['dense[0][0]']                  
                                                                                                  
 z_log_var (Dense)              (None, 5)            55          ['dense[0][0]']                  
                                                                                            

### Decoder

In [7]:
latent_inputs = keras.Input(shape=(latent_dim,))

x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)

decoder_outputs = layers.Dense(input_shape[0], activation="relu")(x)

decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 5)]               0         
                                                                 
 dense_1 (Dense)             (None, 10)                60        
                                                                 
 dense_2 (Dense)             (None, 20)                220       
                                                                 
Total params: 280
Trainable params: 280
Non-trainable params: 0
_________________________________________________________________


### Define VAE as a model with a custom train_step

In [20]:
class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            
            reconstruction = self.decoder(z)
            mse = tf.keras.losses.MeanSquaredError()
            reconstruction_loss = mse(data, reconstruction)

            
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            
            total_loss = reconstruction_loss + kl_loss
            
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }


## Train VAE

In [21]:
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(dataset, epochs=30, batch_size=128)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.callbacks.History at 0x7fa1ac349f50>

### Test

In [22]:
sample = np.asarray(dataset.iloc[1])
sample = sample.reshape(1,-1)
print(sample)

[[ 0.43891064  0.26525414  0.22958809  0.28302481  0.46816296  0.45200459
   0.35045947  0.3334689   0.286151    0.27875033  0.01188453 -0.14239762
  -0.06731151  0.98744555  0.09445444  0.14805178  0.1248618   0.01411131
   0.01639102  0.02076603  0.01443551  0.0214161   1.08583639  0.77999091
   1.57553756  1.04939322  1.36723691]]


In [23]:
z_mean, z_log_var, z = vae.encoder.predict(sample)

In [24]:
regen = vae.decoder.predict(z)
print(regen)

[[0.31382895 0.33650857 0.30301332 0.44027448 0.40246135 0.40505338
  0.32231513 0.30672076 0.28865004 0.27346662 0.         0.
  0.         0.7126938  0.32364178 0.11008604 0.14605412 0.48309255
  0.33371082 0.51423603 0.43828836 0.31764993 1.4207661  0.745396
  1.5575461  1.154783   0.8909904 ]]


In [10]:
def vae_loss(input_vols, output_vols):
    beta = 1e-7
    kl_loss = tf.keras.backend.sum(-1 - tf.keras.backend.log(tf.keras.backend.exp(z_log_var)) + tf.keras.backend.exp(z_log_var) + tf.keras.backend.square(z_mean))/2
    return tf.keras.backend.mean((input_vols-output_vols)**2) + beta*kl_loss

In [220]:
class VAE:
    def __init__(self, encoder, decoder, encoder_input, z_mean, z_log_var):
        self.encoder = encoder
        self.decoder = decoder
        
        self._model_input = encoder_input
        self.mu = z_mean
        self.log_variance = z_log_var
        
        self.model = None
        self._build_autoencoder()
        
        self.reconstruction_loss_weight = 1000
        
    def summary(self):
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()
        
    def compile(self, learning_rate=0.0001):
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=optimizer,
                          loss=self._calculate_combined_loss,
                          metrics=[self._calculate_reconstruction_loss,
                                   self._calculate_kl_loss])
        
    def train(self, train_set, batch_size, num_epochs):
        self.model.fit(train_set, train_set,
                      batch_size = batch_size,
                      epochs=num_epochs,
                      shuffle=True)
        
    def reconstruct(self, samples):
        latent_representations = self.encoder.predict(samples)
        reconstructed = self.decoder.predict(latent_representations)
        return reconstructed, latent_representations
    

    
    def _build_autoencoder(self):
        model_output = self.decoder(self.encoder(self._model_input))
        self.model = keras.Model(self._model_input, model_output, name="AutoEncoder")
        return
    
    def _calculate_combined_loss(self, y_target, y_predicted):
        reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        print("HIIIIIIIIIIII")

        combined_loss = (self.reconstruction_loss_weight * reconstruction_loss) + kl_loss
        print(combined_loss)
        return combined_loss
    
    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        error = y_target - y_predicted
        reconstruction_loss = K.mean(K.square(error), axis=1) # this axis may be wrong
        
        return reconstruction_loss
    
    def _calculate_kl_loss(self, y_target, y_predicted):
        kl_loss = -0.5 * K.sum(1 + self.log_variance - K.square(self.mu) - K.exp(self.log_variance), axis=1)
        return kl_loss