In [None]:
# Load the feature matrix and the label vector from comma-separated text files.
# numpy and os are imported in this cell because it runs before the notebook's
# main import cell; without them a fresh kernel fails here with NameError.
import os

import numpy as np

# `data_dir` is not assigned in this cell; it has to exist before this runs.
x_full_na = np.loadtxt(os.path.join(data_dir, 'x_full.dat'), delimiter=',')
# NOTE(review): the labels are read from the current working directory while the
# features come from `data_dir` - confirm that this difference is intentional.
y_full_na = np.loadtxt('y_full.dat', delimiter=',')

# Fail early with a readable message instead of inside train_test_split.
assert x_full_na.shape[0] == y_full_na.shape[0], (
    "x_full.dat and y_full.dat must contain the same number of rows: "
    f"{x_full_na.shape[0]} != {y_full_na.shape[0]}"
)

print(x_full_na.shape)

In [None]:
#variational autoencoders
from keras.layers import Lambda, Input, Dense
from keras.models import Model

from keras.losses import mse, binary_crossentropy
from keras.utils import plot_model
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_curve
from keras.layers import Input, Dense, Lambda, Flatten, Reshape
import numpy as np
import matplotlib.pyplot as plt
import argparse
import os

def sampling(args):
    """Reparameterization trick: draw a latent sample from (mean, log-variance).

    Args:
        args: a pair ``(z_mean, z_log_var)`` of backend tensors.

    Returns:
        ``z_mean + exp(0.5 * z_log_var) * eps`` where ``eps`` is drawn with
        ``K.random_normal`` using its default parameters.
    """
    mean, log_var = args
    n_rows = K.shape(mean)[0]      # batch size, taken from the tensor itself
    n_cols = K.int_shape(mean)[1]  # second axis of the mean tensor
    # by default, random_normal has mean = 0 and std = 1.0
    noise = K.random_normal(shape=(n_rows, n_cols))
    std_dev = K.exp(0.5 * log_var)
    return mean + std_dev * noise

# ---- dataset -----------------------------------------------------------------
# A fixed seed keeps the train/test partition identical across kernel restarts.
SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    x_full_na, y_full_na, test_size=0.2, shuffle=True, random_state=SEED)

# Append a trailing axis of length 1: (n_samples, n_features) -> (n_samples, n_features, 1).
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
print(X_train.shape)

# ---- normalization -----------------------------------------------------------
# NOTE(review): dividing by 255 presumes the raw values lie in [0, 255]; the
# sigmoid output and binary cross-entropy below need inputs in [0, 1] - confirm.
image_size = X_train.shape[1]
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
# Later cells use the lower-case names; they point at the same scaled arrays so
# the data is normalized exactly once.
x_train = X_train
x_test = X_test

# ---- network parameters and learning parameters -------------------------------
batch_size = 32
original_shape = X_train.shape[1:]
# Derived from the data instead of a hard-coded 1006, so the decoder output and
# the Reshape back to `original_shape` always agree with the loaded features.
original_dim = image_size
latent_dim = 4
intermediate_dim = 128
final_dim = 64
epochs = 50
epsilon_std = 1.0

# ---- encoder -----------------------------------------------------------------
in_layer = Input(shape=original_shape)
x = Flatten()(in_layer)
h = Dense(intermediate_dim, activation='relu')(x)
h = Dense(final_dim, activation='relu')(h)
z_mean = Dense(latent_dim)(h)
z_log_var = Dense(latent_dim)(h)

# Latent sample obtained from (z_mean, z_log_var) through `sampling`.
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# Instantiate the encoder model. The latent-space cells call `encoder.predict`
# and unpack three outputs, so all three tensors are exposed.
encoder = Model(in_layer, [z_mean, z_log_var, z], name='encoder')

# ---- decoder -----------------------------------------------------------------
decoder_f = Dense(final_dim, activation='relu')
decoder_h = Dense(intermediate_dim, activation='relu')
decoder_mean = Dense(original_dim, activation='sigmoid')

f_decoded = decoder_f(z)
h_decoded = decoder_h(f_decoded)
x_decoded_mean = decoder_mean(h_decoded)
x_decoded_img = Reshape(original_shape)(x_decoded_mean)

# ---- end-to-end VAE ----------------------------------------------------------
vae = Model(in_layer, x_decoded_img)

# VAE loss = reconstruction term (cross-entropy scaled by the number of
# features) + KL term computed from z_mean and z_log_var.
xent_loss = original_dim * binary_crossentropy(x, x_decoded_mean)
kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
vae_loss = K.mean(xent_loss + kl_loss)

# The loss is attached to the model itself, so compile() receives no loss
# argument and fit() needs no targets.
vae.add_loss(vae_loss)
vae.compile(optimizer='rmsprop')
vae.summary()




In [None]:
# Rows of the training split labelled 1 are kept aside; they are only used as
# validation data while the model is fitted on the rows labelled 0.
anomaly_test = x_train[y_train == 1]

# No target array is supplied to fit(), and the validation pair carries None as
# its target.
vae.fit(
    x_train[y_train == 0],
    validation_data=(anomaly_test, None),
    batch_size=batch_size,
    epochs=epochs,
    shuffle=True,
)

In [None]:
# Reconstruction-error anomaly score and ROC curve
from sklearn.metrics import roc_auc_score, roc_curve


def model_mse(samples):
    """Return the per-sample mean squared reconstruction error under `vae`.

    Args:
        samples: array whose first axis indexes samples; it is passed to
            ``vae.predict`` unchanged.

    Returns:
        1-D array with one value per sample: the squared difference between
        input and reconstruction averaged over every non-sample axis.
    """
    reconstructed = vae.predict(samples, batch_size=batch_size)
    feature_axes = tuple(range(1, np.ndim(samples)))
    return np.mean(np.square(samples - reconstructed), axis=feature_axes)


# Only test rows labelled 0 are scored as "normal". The split is shuffled, so
# X_test also holds rows labelled 1; treating all of X_test as class 0 would
# mislabel them.
normal_test = X_test[y_test == 0]
mse_score = np.concatenate([model_mse(normal_test), model_mse(anomaly_test)], 0)
true_label = [0] * normal_test.shape[0] + [1] * anomaly_test.shape[0]

auc_score = roc_auc_score(true_label, mse_score)
if auc_score < 0.5:
    # NOTE(review): flipping the sign after looking at the labels makes the
    # reported AUC optimistic; kept because the original analysis does it.
    mse_score *= -1
    auc_score = roc_auc_score(true_label, mse_score)

fpr, tpr, thresholds = roc_curve(true_label, mse_score)
fig, ax1 = plt.subplots(1, 1, figsize = (8, 8))
ax1.plot(fpr, tpr, 'b.-', label = 'ROC Curve (%2.2f)' %  auc_score)
ax1.plot(fpr, fpr, 'k-', label = 'Random Guessing')
ax1.set_xlabel('False positive rate')
ax1.set_ylabel('True positive rate')
ax1.set_title('ROC of VAE reconstruction error')
ax1.legend();


In [None]:
# Visualization of latent space (test split)
# The result is stored under its own name: assigning it to `z_mean` would
# overwrite the Keras tensor of that name created in the model cell.
# Only the first of the three values returned by the encoder is plotted.
z_mean_test, _, _ = encoder.predict(x_test, batch_size=batch_size)

# Scatter of the first two latent coordinates.
fig, ax = plt.subplots(figsize=(12, 10))
ax.scatter(z_mean_test[:, 0], z_mean_test[:, 1])
ax.set_xlabel("z[0]")
ax.set_ylabel("z[1]")
ax.set_title('Test Data Latent Space')
plt.show()

In [None]:
# Visualization of latent space (train split)
# The result is stored under its own name: assigning it to `z_mean` would
# overwrite the Keras tensor of that name created in the model cell.
# Only the first of the three values returned by the encoder is plotted.
z_mean_train, _, _ = encoder.predict(x_train, batch_size=batch_size)

# Scatter of the first two latent coordinates.
fig, ax = plt.subplots(figsize=(12, 10))
ax.scatter(z_mean_train[:, 0], z_mean_train[:, 1])
ax.set_xlabel("z[0]")
ax.set_ylabel("z[1]")
ax.set_title('Train Data Latent Space')
plt.show()