In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time

## Import data

In [2]:
data = pd.read_csv("data_train_log_return.csv", header=None).drop(columns=[0])
data.columns = [f'index{i}' for i in range(1, 5)]
data

Unnamed: 0,index1,index2,index3,index4
0,0.012495,0.011126,0.003252,0.006625
1,0.011439,0.002691,0.001206,0.006947
2,0.000632,0.007277,0.004049,0.000074
3,0.017828,0.028210,0.007758,0.007382
4,0.021115,0.019642,0.009238,0.011499
...,...,...,...,...
741,0.001938,0.008833,0.003927,0.005106
742,0.005003,0.018943,0.003057,0.001988
743,0.007683,0.001958,0.007002,0.006467
744,0.003396,0.001280,0.007621,0.001680


## VAE

In [3]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers




In [4]:
class VAE(tf.keras.Model):
    """Convolutional variational autoencoder."""

    def __init__(self, latent_dim, input_dim=data.shape[1]):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim

        self.encoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(input_dim,)),
                # No activation
                tf.keras.layers.Dense(latent_dim + latent_dim),
            ]
        )

        self.decoder = tf.keras.Sequential(
            [
                tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
                tf.keras.layers.Dense(128, activation="relu"),
                # No activation
                tf.keras.layers.Dense(input_dim)
            ]
        )
        
    @tf.function
    def sample(self, eps=None):
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean

    def decode(self, z, apply_sigmoid=False):
        logits = self.decoder(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits

In [5]:
def log_normal_pdf(sample, mean, logvar, raxis=1):
    log2pi = tf.math.log(2. * np.pi)
    return tf.reduce_sum(-.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi), axis=raxis)


def compute_loss(model, x):
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_ent)
    logpz = log_normal_pdf(z, 0., 0.)
    logqz_x = log_normal_pdf(z, mean, logvar)
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)


@tf.function
def train_step(model, x, optimizer):
    """Executes one training step and returns the loss.

    This function computes the loss and gradients, and uses the latter to
    update the model's parameters.
    """
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

In [6]:
def generate_samples(model, epoch, test_sample):
    mean, logvar = model.encode(test_sample)
    z = model.reparameterize(mean, logvar)
    predictions = model.sample(z)
    return predictions

In [7]:
from sklearn.model_selection import train_test_split
X_train, X_test = train_test_split(data.values)

In [8]:
X_train, X_test = tf.cast(X_train, dtype=tf.float32), tf.cast(X_test, dtype=tf.float32)

In [9]:
epochs = 100_000
latent_dim = 2
num_examples_to_generate = 16

# keeping the random vector constant for generation (prediction) so
# it will be easier to see the improvement.
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate, latent_dim])
model = VAE(latent_dim)




In [10]:
generate_samples(model, 0, X_test)
optimizer = tf.keras.optimizers.Adam(1e-4)
loss = tf.keras.metrics.Mean()

for epoch in range(1, epochs + 1):
    start_time = time.time()
    train_step(model, X_train, optimizer)
    end_time = time.time()

    loss(compute_loss(model, X_test))
    elbo = -loss.result()
    #display.clear_output(wait=False)
    if epoch % 1000 == 0:
        print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'
            .format(epoch, elbo, end_time - start_time))
    generate_samples(model, epoch, X_test)

Epoch: 1000, Test set ELBO: -276.0724182128906, time elapse for current epoch: 0.0020716190338134766
Epoch: 2000, Test set ELBO: -176.76661682128906, time elapse for current epoch: 0.001003265380859375
Epoch: 3000, Test set ELBO: -135.7589569091797, time elapse for current epoch: 0.002007722854614258
Epoch: 4000, Test set ELBO: -114.4892807006836, time elapse for current epoch: 0.0010046958923339844
Epoch: 5000, Test set ELBO: -101.4852294921875, time elapse for current epoch: 0.001961231231689453
Epoch: 6000, Test set ELBO: -92.65465545654297, time elapse for current epoch: 0.0020291805267333984
Epoch: 7000, Test set ELBO: -86.25782775878906, time elapse for current epoch: 0.001962900161743164
Epoch: 8000, Test set ELBO: -81.4201431274414, time elapse for current epoch: 0.0020067691802978516
Epoch: 9000, Test set ELBO: -77.64749145507812, time elapse for current epoch: 0.002005338668823242
Epoch: 10000, Test set ELBO: -74.62753295898438, time elapse for current epoch: 0.00197505950927

KeyboardInterrupt: 