In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from keras.layers import Lambda, Input, Dense
from keras.models import Model
from keras.datasets import mnist
from keras.losses import mse, binary_crossentropy
from keras import backend as K

from numpy import genfromtxt
from numpy import savetxt

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.style as style
from mpl_toolkits import mplot3d
from mpl_toolkits.mplot3d import Axes3D

from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint

import os
from tensorflow.keras.losses import mse, binary_crossentropy
from tensorflow.keras.utils import plot_model
from google.colab import drive

from keras.models import Sequential
from sklearn.metrics import accuracy_score
from keras.utils import to_categorical

# Mount Google Drive (Colab only); the CSV inputs and all outputs below live
# under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# reparameterization trick
# instead of sampling from Q(z|X), sample epsilon = N(0,I)
# z = z_mean + sqrt(var) * epsilon
def sampling(args):
    """Draw a latent vector from Q(z|X) using the reparameterization trick.

    # Arguments
        args (tensor): pair (z_mean, z_log_var), the mean and the log of the
            variance of Q(z|X)
    # Returns
        z (tensor): z_mean + exp(0.5 * z_log_var) * epsilon, where epsilon is
            drawn from an isotropic unit Gaussian
    """
    mean, log_var = args
    # K.shape yields the (symbolic) batch size, K.int_shape the static latent size
    noise_shape = (K.shape(mean)[0], K.int_shape(mean)[1])
    # random_normal defaults to mean = 0 and std = 1.0
    noise = K.random_normal(shape=noise_shape)
    std = K.exp(0.5 * log_var)
    return mean + std * noise


def plot_results(models,
                 data,
                 batch_size=128,
                 model_name="vae_mnist",
                 n=10,
                 digit_size=115,
                 latent_range=6.0):
    """Plot the test set in the 2D latent space and a decoded grid of FC matrices.

    Two figures are created, saved inside the ``model_name`` directory and shown:
    ``vae_mean.png`` (scatter of the encoder's z_mean, coloured by label) and
    ``FCs_over_latent.png`` (an n x n mosaic of decoder outputs sampled on a
    regular grid of the latent plane).

    # Arguments
        models (tuple): encoder and decoder models
        data (tuple): test data and label
        batch_size (int): prediction batch size
        model_name (string): which model is using this function; also used as
            the output directory for the figures
        n (int): number of grid points along each latent axis
        digit_size (int): side length of one decoded square tile; each decoder
            output is reshaped to (digit_size, digit_size)
        latent_range (float): the grid spans [-latent_range, latent_range] on
            both latent axes
    """

    encoder, decoder = models
    x_test, y_test = data
    os.makedirs(model_name, exist_ok=True)

    filename = os.path.join(model_name, "vae_mean.png")
    # display a 2D plot of the FC classes in the latent space
    z_mean, _, _ = encoder.predict(x_test,
                                   batch_size=batch_size)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=y_test)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.savefig(filename)
    plt.show()

    filename = os.path.join(model_name, "FCs_over_latent.png")
    # display an n x n 2D manifold of FCs
    # linearly spaced coordinates corresponding to the 2D plot
    # of FC classes in the latent space (y is reversed so that the top image
    # row corresponds to the largest z[1])
    grid_x = np.linspace(-latent_range, latent_range, n)
    grid_y = np.linspace(-latent_range, latent_range, n)[::-1]

    # Decode the whole grid with a single batched predict call instead of one
    # call per grid point. Rows are ordered row-major: all x values for the
    # first y, then all x values for the next y, and so on.
    z_grid = np.array([[xi, yi] for yi in grid_y for xi in grid_x])
    decoded = decoder.predict(z_grid, batch_size=batch_size)

    figure = np.zeros((digit_size * n, digit_size * n))
    for i in range(n):
        for j in range(n):
            digit = decoded[i * n + j].reshape(digit_size, digit_size)
            figure[i * digit_size: (i + 1) * digit_size,
                   j * digit_size: (j + 1) * digit_size] = digit

    plt.figure(figsize=(10, 10))
    # put one tick at the centre of every tile, labelled with its latent coordinate
    start_range = digit_size // 2
    end_range = (n - 1) * digit_size + start_range + 1
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(figure) #cmap='Greys_r')
    plt.grid(color='w', linewidth=2)
    plt.savefig(filename)
    plt.show()

In [None]:
# Simulated FC dataset: features and class labels are read from Google Drive

# load data
my_data = genfromtxt('/content/drive/MyDrive/dataset_FCsim.csv',delimiter=',') # my_data = genfromtxt('FC_multi_ADCNFTD.csv',delimiter=',')
label = genfromtxt('/content/drive/MyDrive/labels.csv',delimiter=',') # label = genfromtxt('labels_ADCNFTD.csv',delimiter=',')

# optional: restrict the dataset to classes 1 and 2
#my_data1 = np.squeeze(my_data[np.where((label==1)|(label==2)),:])
#label1 = label[np.where((label==1)|(label==2))]
my_data1= my_data
label_1 = label

# split data in train and test (the data is randomized before)
# Both slices share one split index so every row lands in exactly one set;
# starting the test slice at split_idx + 1 would silently drop one sample.
split_idx = int(len(my_data1)*0.7)
x_train = my_data1[:split_idx]
x_test = my_data1[split_idx:]

y_train = label_1[:split_idx]
y_test = label_1[split_idx:]


# length of one flattened input vector (13225 = 115 * 115, the square tile
# size plot_results reshapes decoder outputs to)
original_dim = 13225

**VAE TRAINING**

In [None]:
# network parameters
input_shape = (original_dim, )
intermediate_dim = 1028
batch_size = 128
latent_dim = 2
epochs = 10

# VAE model = encoder + decoder
# build encoder model: input -> dense(relu) -> (z_mean, z_log_var)
inputs = Input(shape=input_shape, name='encoder_input')
x = Dense(intermediate_dim, activation='relu')(inputs)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)

# use reparameterization trick to push the sampling out as input
# note that "output_shape" isn't necessary with the TensorFlow backend
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# instantiate encoder model; its outputs are [z_mean, z_log_var, z]
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
encoder.summary()
#plot_model(encoder, to_file='vae_mlp_encoder.png', show_shapes=True)

# build decoder model: latent vector -> dense(relu) -> dense(sigmoid)
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = Dense(original_dim, activation='sigmoid')(x)

# instantiate decoder model
decoder = Model(latent_inputs, outputs, name='decoder')
decoder.summary()
#plot_model(decoder, to_file='vae_mlp_decoder.png', show_shapes=True)

# instantiate VAE model: the decoder consumes the sampled z (encoder output 2)
outputs = decoder(encoder(inputs)[2])
vae = Model(inputs, outputs, name='vae_mlp')

# bundles in the form plot_results(models, data) expects
models = (encoder, decoder)
data = (x_test, y_test)

# Reconstruction term. binary_crossentropy averages over the feature axis, so
# it is rescaled by original_dim to weigh it against the summed KL term.
# NOTE(review): binary cross-entropy with a sigmoid output presumes the inputs
# are scaled to [0, 1] -- confirm this holds for the FC data.
reconstruction_loss = binary_crossentropy(inputs,outputs)
reconstruction_loss *= original_dim

# KL term: -0.5 * sum(1 + log_var - mean^2 - exp(log_var)) over latent dims
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)

# `learning_rate` is the supported argument name; the old `lr` alias is
# deprecated and rejected by recent Keras releases.
optimizer = Adam(learning_rate=0.004)
vae.compile(optimizer=optimizer) # vae.compile(optimizer='adam')
vae.summary()

# The loss was attached with add_loss, so fit only needs the inputs.
from keras.callbacks import History
history = History()
vae.fit(x_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.1,
        callbacks=[history])


**CLASSIFIER TRAINING**

In [None]:
num_classes = 5

# Classification model for the latent space
# Only the first encoder output (z_mean) is used as the feature vector.
x_train_encoded,_,_ = encoder.predict(x_train, batch_size=batch_size)
x_test_encoded,_,_ = encoder.predict(x_test, batch_size=batch_size)

# one-hot targets for the categorical cross-entropy loss
y_train_encoded = to_categorical(y_train, num_classes)
y_test_encoded = to_categorical(y_test, num_classes)

classifier = Sequential([Dense(64, activation='tanh', input_shape=(latent_dim,)),
                         Dense(32, activation='tanh'), Dense(32, activation='tanh'),
                         Dense(num_classes, activation='softmax')])
# `learning_rate` is the supported argument name; the old `lr` alias is
# deprecated and rejected by recent Keras releases.
optimizer = Adam(learning_rate=0.001)
classifier.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
# NOTE: validation_data is the test split, so the val_* metrics logged during
# training are test-set metrics.
classifier.fit(x_train_encoded, y_train_encoded, epochs=30, batch_size=32, validation_data=(x_test_encoded,y_test_encoded))

# predicted class = index of the largest softmax output
label_pred = classifier.predict(x_test_encoded)
label_pred = np.argmax(label_pred, axis=1)
accuracy = accuracy_score(y_test,label_pred)
print("Classification accuracy: ", accuracy)

**CLASSIFIER TRAINING x100 (repeated trials)**

In [None]:
num_classes = 5
n_trials = 100
acc_class = []
rep = list(range(1, n_trials + 1))

# Classification model for the latent space
# The encoder is not retrained between trials, so the latent features (z_mean)
# and the one-hot labels are computed once instead of once per trial.
x_train_encoded,_,_ = encoder.predict(x_train, batch_size=batch_size)
x_test_encoded,_,_ = encoder.predict(x_test, batch_size=batch_size)

y_train_encoded = to_categorical(y_train, num_classes)
y_test_encoded = to_categorical(y_test, num_classes)

for i in range(n_trials):
  # a freshly initialised classifier is trained in every trial
  classifier = Sequential([Dense(64, activation='tanh', input_shape=(latent_dim,)),
                          Dense(32, activation='tanh'), Dense(32, activation='tanh'),
                          Dense(num_classes, activation='softmax')])
  # `learning_rate` is the supported argument name; the old `lr` alias is
  # deprecated and rejected by recent Keras releases.
  optimizer = Adam(learning_rate=0.001)
  classifier.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  classifier.fit(x_train_encoded, y_train_encoded, epochs=30, batch_size=32, validation_data=(x_test_encoded,y_test_encoded))

  # predicted class = index of the largest softmax output
  label_pred = classifier.predict(x_test_encoded)
  label_pred = np.argmax(label_pred, axis=1)
  accuracy = accuracy_score(y_test,label_pred)
  acc_class.append(accuracy)
  print("Classification accuracy for trial ", i, ': ', accuracy)

# distribution of the test accuracy over all trials
legend = ['Correct Classes']
plt.hist([round(x,3) for x in acc_class])
plt.xlabel("Accuracy")
plt.ylabel('#')
plt.legend(legend)
plt.show()

**SAVING DATA**

In [None]:
# Export the test-set latent means, their labels, the per-trial classifier
# accuracies and the trained VAE weights to Google Drive.
z_mean, _, _ = encoder.predict(x_test, batch_size=batch_size)

# savetxt does not create missing folders, so make sure the nested one exists
accuracy_path = '/content/drive/MyDrive/TFG_paper/2D/accuracy.csv'
os.makedirs(os.path.dirname(accuracy_path), exist_ok=True)

savetxt('/content/drive/MyDrive/z_mean0_2D.csv', z_mean[:,0], delimiter=',')
savetxt('/content/drive/MyDrive/z_mean1_2D.csv', z_mean[:,1], delimiter=',')
savetxt('/content/drive/MyDrive/labels_dataVAE_2D.csv', y_test, delimiter=',')
savetxt(accuracy_path, acc_class, delimiter=',')
vae.save_weights('/content/drive/MyDrive/vae_2D_Midad_Interdim.h5')



**NULL HYPOTHESIS**

In [None]:
# Null-hypothesis variant of the simulated FC dataset (files suffixed _NH)
# NOTE(review): presumably the _NH labels are randomised -- confirm.

# load data
my_data = genfromtxt('/content/drive/MyDrive/dataset_FCsim_NH.csv',delimiter=',') # my_data = genfromtxt('FC_multi_ADCNFTD.csv',delimiter=',')
label = genfromtxt('/content/drive/MyDrive/labels_NH.csv',delimiter=',') # label = genfromtxt('labels_ADCNFTD.csv',delimiter=',')

# optional: restrict the dataset to classes 1 and 2
#my_data1 = np.squeeze(my_data[np.where((label==1)|(label==2)),:])
#label1 = label[np.where((label==1)|(label==2))]
my_data1= my_data
label_1 = label

# split data in train and test (the data is randomized before)
# Both slices share one split index so every row lands in exactly one set;
# starting the test slice at split_idx + 1 would silently drop one sample.
split_idx = int(len(my_data1)*0.7)
x_train = my_data1[:split_idx]
x_test = my_data1[split_idx:]

y_train = label_1[:split_idx]
y_test = label_1[split_idx:]


# length of one flattened input vector (13225 = 115 * 115)
original_dim = 13225

In [None]:
# network parameters
input_shape = (original_dim, )
intermediate_dim = 1028
batch_size = 128
latent_dim = 2
epochs = 10

# VAE model = encoder + decoder (rebuilt from scratch for the _NH data)
# build encoder model: input -> dense(relu) -> (z_mean, z_log_var)
inputs = Input(shape=input_shape, name='encoder_input')
x = Dense(intermediate_dim, activation='relu')(inputs)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)

# use reparameterization trick to push the sampling out as input
# note that "output_shape" isn't necessary with the TensorFlow backend
z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# instantiate encoder model; its outputs are [z_mean, z_log_var, z]
encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
encoder.summary()
#plot_model(encoder, to_file='vae_mlp_encoder.png', show_shapes=True)

# build decoder model: latent vector -> dense(relu) -> dense(sigmoid)
latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
x = Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = Dense(original_dim, activation='sigmoid')(x)

# instantiate decoder model
decoder = Model(latent_inputs, outputs, name='decoder')
decoder.summary()
#plot_model(decoder, to_file='vae_mlp_decoder.png', show_shapes=True)

# instantiate VAE model: the decoder consumes the sampled z (encoder output 2)
outputs = decoder(encoder(inputs)[2])
vae = Model(inputs, outputs, name='vae_mlp')

# bundles in the form plot_results(models, data) expects
models = (encoder, decoder)
data = (x_test, y_test)

# Reconstruction term. binary_crossentropy averages over the feature axis, so
# it is rescaled by original_dim to weigh it against the summed KL term.
# NOTE(review): binary cross-entropy with a sigmoid output presumes the inputs
# are scaled to [0, 1] -- confirm this holds for the FC data.
reconstruction_loss = binary_crossentropy(inputs,outputs)
reconstruction_loss *= original_dim

# KL term: -0.5 * sum(1 + log_var - mean^2 - exp(log_var)) over latent dims
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5
vae_loss = K.mean(reconstruction_loss + kl_loss)
vae.add_loss(vae_loss)

# `learning_rate` is the supported argument name; the old `lr` alias is
# deprecated and rejected by recent Keras releases.
optimizer = Adam(learning_rate=0.004)
vae.compile(optimizer=optimizer) # vae.compile(optimizer='adam')
vae.summary()

# The loss was attached with add_loss, so fit only needs the inputs.
from keras.callbacks import History
history = History()
vae.fit(x_train,
        epochs=epochs,
        batch_size=batch_size,
        validation_split=0.1,
        callbacks=[history])

In [None]:
num_classes = 5
n_trials = 100
acc_NH = []
rep = list(range(1, n_trials + 1))

# Classification model for the latent space
# The encoder is not retrained between trials, so the latent features (z_mean)
# and the one-hot labels are computed once instead of once per trial.
x_train_encoded,_,_ = encoder.predict(x_train, batch_size=batch_size)
x_test_encoded,_,_ = encoder.predict(x_test, batch_size=batch_size)

y_train_encoded = to_categorical(y_train, num_classes)
y_test_encoded = to_categorical(y_test, num_classes)

for i in range(n_trials):
  # a freshly initialised classifier is trained in every trial
  classifier = Sequential([Dense(64, activation='tanh', input_shape=(latent_dim,)),
                          Dense(32, activation='tanh'), Dense(32, activation='tanh'),
                          Dense(num_classes, activation='softmax')])
  # `learning_rate` is the supported argument name; the old `lr` alias is
  # deprecated and rejected by recent Keras releases.
  optimizer = Adam(learning_rate=0.001)
  classifier.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
  classifier.fit(x_train_encoded, y_train_encoded, epochs=30, batch_size=32, validation_data=(x_test_encoded,y_test_encoded))

  # predicted class = index of the largest softmax output
  label_pred = classifier.predict(x_test_encoded)
  label_pred = np.argmax(label_pred, axis=1)
  accuracy = accuracy_score(y_test,label_pred)
  acc_NH.append(accuracy)
  print("Classification accuracy for trial ", i, ': ', accuracy)

# distribution of the test accuracy over all trials
legend = ['Random Classes']
plt.hist([round(x,3) for x in acc_NH], color= 'maroon')
plt.xlabel("Accuracy")
plt.ylabel('#')
plt.legend(legend)
plt.show()

In [None]:
# Export the per-trial accuracies of the null-hypothesis run to Google Drive.
# z_mean is recomputed here but not written out by this cell.
z_mean, _, _ = encoder.predict(x_test, batch_size=batch_size)

# NOTE(review): this writes into a "12D" folder although latent_dim is 2 and
# the matching accuracy.csv is saved under "2D" -- confirm the intended folder.
accuracy_nh_path = '/content/drive/MyDrive/TFG_paper/12D/accuracy_NH.csv'
# savetxt does not create missing folders, so make sure the nested one exists
os.makedirs(os.path.dirname(accuracy_nh_path), exist_ok=True)
savetxt(accuracy_nh_path, acc_NH, delimiter=',')
#vae.save_weights('/content/drive/MyDrive/TFG_psychosis/Coses_finals/12D/NH/vae_12D_Midad_Interdim.h5')

In [None]:
# Overlay the accuracy histograms of the 'Correct Classes' run (acc_class) and
# the 'Random Classes' run (acc_NH) in one figure.
legend = ['Correct Classes', 'Random Classes']
plt.hist([round(x,3) for x in acc_class])
plt.hist([round(x,3) for x in acc_NH], color= 'maroon')
plt.xlabel("Accuracy")
plt.ylabel('#')
plt.legend(legend)
plt.show()