## 6-Dimensional Variational Autoencoder trained with the STELIB library (SVO)
Based on data from the STELIB service developed by the Spanish Virtual Observatory in the framework of the IAU Commission G5 Working Group: Spectral Stellar Libraries.
http://svocats.cab.inta-csic.es/stelib/index.php
Data set: http://svocats.cab.inta-csic.es/stelib/index.php?action=search

Adrián García Riber and Francisco Serradilla.
Polytechnic University of Madrid

In [None]:
from astropy.io import fits
import matplotlib.pylab as plt
import numpy as np
from numpy import save

import os
from pathlib import Path

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score

import tensorflow as tf
from tensorflow.keras import layers

In [None]:
# Root folder of the local STELIB FITS library; the cells below walk it
# recursively with os.walk. This is a machine-specific absolute path.
root = '/Users/adrian/Documents/FITS_Library/stelib'

In [None]:
# Path and name of one file to check the library
file = root+"/stelib_spec_fits_HD268623_moy.fits"

# Open the file in a context manager so it is closed even if reading fails.
# `sp` stays bound after the block: later cells read sp[0].header['NAXIS1']
# from the header that has already been parsed here.
with fits.open(file) as sp:
    sp_header = sp[0].header

    # Print the header
    print('\n\nHeader of the spectrum :\n\n', sp_header, '\n\n')

    # Number of samples along the spectral axis
    n_points = sp_header['NAXIS1']

    # Extracting the fluxes, scaling them by their largest non-NaN value
    # and flattening the result to a 1-D array
    flux2 = np.array(sp[0].data)
    flux_norm = np.reshape(flux2/(np.nanmax(flux2)), (n_points,))

    # Extracting the wavelengths: CRVAL1 + i*CDELT1 for every sample index i
    wave2 = sp_header['CRVAL1'] + np.arange(n_points, dtype=float)*sp_header['CDELT1']

# Plot the spectrum
fig = plt.figure(1, figsize=(12, 8))
plt.plot(wave2, flux_norm)
plt.xlabel('Wavelength [Å]')
plt.ylabel('ADU')
plt.title(file)
plt.show()

In [None]:
# Counting the spectra and printing the spectrum dimension.
# The count starts at zero so that `num` equals the number of files found
# under `root`; starting at one makes every array sized from `num` one row
# too long, leaving an all-zero row that is never filled.
num = 0
for path, subdirs, files in os.walk(root):
    num += len(files)

# Spectrum length, read from the header of the sample file opened earlier
dim1 = sp[0].header['NAXIS1']
print(num)
print(dim1)

In [None]:
# Creating the custom_set with all the spectra and generating labels to enable recovering header information
curves = 0                                # number of spectra loaded so far
custom_set = np.zeros((num, dim1))        # one normalized spectrum per row
label_set = np.zeros((num, ), dtype=int)  # label of a spectrum = its row index
spectra_set = [''] * num                  # file name of each spectrum, by row index

for path, subdirs, files in os.walk(root):
    for name in files:
        fits_file = os.path.join(path, name)

        # Each file is opened once; the context manager closes it, and
        # np.array copies the data out of the HDU before that happens.
        with fits.open(fits_file, mode='readonly') as hdulist:
            hdulist.info()
            data = np.array(hdulist[0].data)

        # Scale by the largest non-NaN flux and flatten to a 1-D spectrum
        data_norm = np.reshape(data/(np.nanmax(data)), (dim1,))

        # Store the whole spectrum in one row assignment
        custom_set[curves] = data_norm
        label_set[curves] = curves
        spectra_set[curves] = name
        curves += 1

        # `curves` already counts the spectrum that was just stored
        print("Spectra loaded:", curves, "spectra")

# Drop any rows that were allocated but never filled, so no all-zero
# spectrum reaches the training data
custom_set = custom_set[:curves]
label_set = label_set[:curves]
spectra_set = spectra_set[:curves]


In [None]:
# Shape of the spectra matrix
custom_set.shape

In [None]:
# Make sure label_set is a NumPy array (np.asarray leaves an existing array as it is)
label_set = np.asarray(label_set)

In [None]:
# Shape of the label vector
label_set.shape

In [None]:
# Augmenting the dataset: every spectrum (row) and its label are repeated
# `augmentation` times; the copies are identical and placed consecutively
augmentation = 15
custom_set = custom_set.repeat(augmentation, axis=0)
label_set = label_set.repeat(augmentation, axis=0)

In [None]:
# Shape of the spectra matrix after the repetition
custom_set.shape

In [None]:
# Shape of the label vector after the repetition
label_set.shape

In [None]:
# Generating the data set: a tf.data.Dataset that pairs each row of
# custom_set with the matching entry of label_set.
# NOTE(review): the training cell visible in this notebook fits on the NumPy
# arrays from train_test_split, not on `dataset`.
dataset = tf.data.Dataset.from_tensor_slices((custom_set, label_set))
# Show the type specification of one dataset element
dataset.element_spec

In [None]:
# Splitting the data set: 80 % of the rows for training, 20 % for testing,
# with a fixed seed so the partition is reproducible.
# NOTE(review): custom_set holds repeated copies of every spectrum at this
# point and the split works row by row, so copies of one spectrum can end up
# in both partitions — confirm this is intended before reading the test
# scores as generalization.
x_train, x_test, y_train, y_test = train_test_split(
    custom_set,
    label_set,
    test_size=0.2,
    random_state=123,
)

In [None]:
# Defining the VAE sampling function
class Sampling(layers.Layer):
    """Sample the latent vector z from the pair (z_mean, z_log_var)."""

    def call(self, inputs):
        """Return z_mean + exp(0.5 * z_log_var) * epsilon.

        `inputs` is the pair (z_mean, z_log_var). epsilon is Gaussian noise
        with one value per entry of z_mean, indexed by its first two
        dimensions.
        """
        z_mean, z_log_var = inputs
        mean_shape = tf.shape(z_mean)
        # The noise has zero mean and a standard deviation of 1e-8
        epsilon = tf.keras.backend.random_normal(
            shape=(mean_shape[0], mean_shape[1]),
            mean=0.,
            stddev=0.00000001,
        )
        sigma = tf.exp(0.5 * z_log_var)
        return z_mean + sigma * epsilon

In [None]:
# Defining the layers' dimensions.
# Floor division keeps every layer width an integer; true division (/)
# would pass float unit counts to layers.Dense.
original_dim = dim1
latent_dim = 6
intermediate_dim = dim1 // 3
intermediate_dim2 = intermediate_dim // 2
intermediate_dim3 = intermediate_dim2 // 2
intermediate_dim4 = intermediate_dim3 // latent_dim

In [None]:
# Show the input dimension of the model
original_dim

In [None]:
# Defining the Encoder
original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")

# Four ReLU Dense layers chained in order, from the widest to the narrowest
encoder_widths = (intermediate_dim, intermediate_dim2, intermediate_dim3, intermediate_dim4)
x4 = original_inputs
for width in encoder_widths:
    x4 = layers.Dense(width, activation="relu")(x4)

# Two Dense heads without activation on the last hidden layer, then the
# sampling layer that combines them into z
z_mean = layers.Dense(latent_dim, name="z_mean")(x4)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x4)
z = Sampling()((z_mean, z_log_var))

# The encoder model outputs only the sampled z
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")

In [None]:
# Print the layer-by-layer summary of the encoder
encoder.summary()

In [None]:
# Defining the Decoder
latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")

# Four ReLU Dense layers chained in order, using the encoder widths in reverse
decoder_widths = (intermediate_dim4, intermediate_dim3, intermediate_dim2, intermediate_dim)
x8 = latent_inputs
for width in decoder_widths:
    x8 = layers.Dense(width, activation="relu")(x8)

# Sigmoid output layer with one unit per input sample
outputs = layers.Dense(original_dim, activation="sigmoid")(x8)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")

In [None]:
# Print the layer-by-layer summary of the decoder
decoder.summary()

In [None]:
# Creating the model: the decoder is applied to the encoder's sampled z,
# giving one end-to-end model from the encoder input to the reconstruction.
# `outputs` is rebound here to the decoder applied to z.
outputs = decoder(z)
vae6D = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="VAE6D")

In [None]:
# Print the layer-by-layer summary of the full model
vae6D.summary()

In [None]:
# Adding KL divergence regularization loss.
# The per-entry term is averaged over every entry of the latent tensors
# (reduce_mean is called without an axis) and scaled by -0.5.
kl_terms = z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
kl_loss = -0.5 * tf.reduce_mean(kl_terms)
vae6D.add_loss(kl_loss)

In [None]:
# Print the model summary again, after the KL loss has been added
vae6D.summary()

In [None]:
# Training: Adam with a learning rate of 1e-5 and a mean-squared-error loss.
# The training spectra are passed as both inputs and targets.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
vae6D.compile(
    optimizer=optimizer,
    loss=tf.keras.losses.MeanSquaredError(),
)
vae6D.fit(x=x_train, y=x_train, batch_size=2, epochs=100)

In [None]:
# Testing: encode the whole test set in one call to the encoder
encoded_test = encoder(x_test)

In [None]:
# Reconstruct the whole test set in one call to the full model
decoded_test = vae6D(x_test)

In [None]:
# Convert the reconstructions to a NumPy array for plotting and scoring
decoded_imgs_test=decoded_test.numpy()
# Show the reconstructed values
decoded_imgs_test

## Plotting some results

In [None]:
# Side-by-side comparison of one test spectrum (left) and its
# reconstruction (right), both against the wavelength axis
figure = 0
fig, ax = plt.subplots(1, 2, figsize=(16, 6))
original_ax, decoded_ax = ax

original_ax.plot(wave2, x_test[figure])
original_ax.set_xlabel('Original Spectra')

decoded_ax.plot(wave2, decoded_imgs_test[figure])
decoded_ax.set_xlabel('Decoded Spectra')

In [None]:
# Same side-by-side comparison for the first 20 test spectra, one figure each
for figure in range(20):
    fig, (original_ax, decoded_ax) = plt.subplots(1, 2, figsize=(16, 6))
    original_ax.plot(wave2, x_test[figure])
    original_ax.set_xlabel('Original Spectra')
    decoded_ax.plot(wave2, decoded_imgs_test[figure])
    decoded_ax.set_xlabel('Decoded Spectra')
# NOTE(review): this call sits outside the loop, so it closes only the figure
# created in the last iteration — confirm that this is intended.
plt.close(fig)

In [None]:
# Plotting the relation between original spectrum and decoded output:
# one scatter plot per spectrum for the first 20 test spectra, with the
# original values on the x axis and the decoded values on the y axis
for sample in range(20):
    # The return value of plt.plot is bound to `_` and not used
    _ = plt.plot(x_test[sample], decoded_imgs_test[sample], 'o')
    plt.show()

In [None]:
# Calculating R_square for the whole test set; with 'variance_weighted' the
# per-column scores are averaged using the variance of each column as weight
r2_score(x_test, decoded_imgs_test, multioutput='variance_weighted')

In [None]:
# Calculating R_square for a single spectrum: only the first test spectrum
# (index 0) is scored here, against its own reconstruction
r2_score(x_test[0], decoded_imgs_test[0], multioutput='variance_weighted')

## Saving the Model

In [None]:
# Keep an in-memory copy of the current model weights
weights = vae6D.get_weights()
# save_weights writes the model's own weights to the given path. The weight
# list must not be passed as a second argument: that position is the
# `overwrite` flag, not the data to save.
vae6D.save_weights('STELIB_6DVAE-augmented_Weights_OK')

In [None]:
# Save the full model (encoder + decoder) with save_format='tf'
vae6D.save('STELIB_6DVAE-augmented_OK.tf', save_format='tf')

In [None]:
# Save the encoder on its own with save_format='tf'
encoder.save('STELIB_6D_Encoder-augmented_OK.tf', save_format='tf')

In [None]:
# Save the decoder on its own with save_format='tf'
decoder.save('STELIB_6D_Decoder-augmented_OK.tf', save_format='tf')