In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd
from sklearn.model_selection import train_test_split

class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
latent_dim = 2

encoder_inputs = keras.Input(shape=(88, 1))
x = layers.Conv1D(8, 3, activation='relu', padding='same', dilation_rate=2)(encoder_inputs)
x = layers.MaxPooling1D(2)(x)
x = layers.Conv1D(16, 3, activation='relu', padding='same', dilation_rate=2)(x)
x = layers.MaxPooling1D(2)(x)
x = layers.Flatten()(x)
encoder = layers.Dense(128, activation='relu')(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(encoder)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(encoder)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(128, activation='relu')(latent_inputs)
x = layers.Dense(88 * 8)(x)
x = layers.Reshape((88, 8))(x)
x = layers.Conv1D(16, 3, activation='relu', padding='same')(x)
x = layers.UpSampling1D(2)(x)
x = layers.Conv1D(8, 3, activation='relu', padding='same')(x)
x = layers.UpSampling1D(2)(x)
decoded = layers.Conv1D(1, 3, activation='sigmoid', padding='same')(x)
decoded = layers.Cropping1D(cropping=(0, 264))(decoded)  # Crop to the desired output shape
decoder = keras.Model(latent_inputs, decoded, name="decoder")
decoder.summary()

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstruction = self.decoder(z)
        return reconstruction

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum((data - reconstruction) ** 2, axis=(1, 2))
            )
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        print('ovo je oblik y', y.shape)
        print('ovo je y', y)
        print('ovo je oblik y_pred' ,y_pred.shape)
        print('ovo je y_pred', y_pred)
        self.compute_loss(y=y, y_pred=y_pred)
        # Update the metrics.
        for metric in self.metrics:
            if metric.name != "total_loss":
                metric.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 88, 1)]      0           []                               
                                                                                                  
 conv1d (Conv1D)                (None, 88, 8)        32          ['input_1[0][0]']                
                                                                                                  
 max_pooling1d (MaxPooling1D)   (None, 44, 8)        0           ['conv1d[0][0]']                 
                                                                                                  
 conv1d_1 (Conv1D)              (None, 44, 16)       400         ['max_pooling1d[0][0]']          
                                                                                            

In [2]:
from sklearn.preprocessing import MinMaxScaler

features = pd.read_csv('features.csv')
labels = pd.read_csv('labels.csv')
(x_train, x_test, y_train, y_test) = train_test_split(features, labels, test_size=0.2, random_state=42)

#print(x_train)
#print(x_test)
#print(y_train)
#print(y_test)

# Convert columns to NumPy arrays
x_train_features = x_train.values
x_test_features = x_test.values

# Apply scaling to the features
scaler = MinMaxScaler()

x_train = scaler.fit_transform(x_train_features)
x_test = scaler.transform(x_test_features)

#print(x_train)
#print(x_test)

x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
y_train = np.expand_dims(y_train, -1)
y_test = np.expand_dims(y_test, -1)

In [3]:
vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(x_train, batch_size=128, epochs=5)
#vae.fit(x_train, batch_size=128, epochs=5, validation_data=(x_test, y_test, None))
#vae.fit(x_train, batch_size=128, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x2a3569a42b0>

In [4]:
#print(x_test.shape)
#print(y_test.shape)

validation_loss = vae.evaluate(y_test, y_test)
print("Validation Loss:", validation_loss)


ValueError: in user code:

    File "C:\Anaconda\envs\autoencoders\lib\site-packages\keras\engine\training.py", line 1557, in test_function  *
        return step_function(self, iterator)
    File "C:\Anaconda\envs\autoencoders\lib\site-packages\keras\engine\training.py", line 1546, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Anaconda\envs\autoencoders\lib\site-packages\keras\engine\training.py", line 1535, in run_step  **
        outputs = model.test_step(data)
    File "C:\Users\PC\AppData\Local\Temp\ipykernel_16380\2656676389.py", line 95, in test_step
        y_pred = self(x, training=False)
    File "C:\Anaconda\envs\autoencoders\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\PC\AppData\Local\Temp\__autograph_generated_filetkjb_xs8.py", line 10, in tf__call
        (z_mean, z_log_var, z) = ag__.converted_call(ag__.ld(self).encoder, (ag__.ld(inputs),), None, fscope)

    ValueError: Exception encountered when calling layer "vae" (type VAE).
    
    in user code:
    
        File "C:\Users\PC\AppData\Local\Temp\ipykernel_16380\2656676389.py", line 65, in call  *
            z_mean, z_log_var, z = self.encoder(inputs)
        File "C:\Anaconda\envs\autoencoders\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler  **
            raise e.with_traceback(filtered_tb) from None
        File "C:\Anaconda\envs\autoencoders\lib\site-packages\keras\engine\input_spec.py", line 264, in assert_input_compatibility
            raise ValueError(f'Input {input_index} of layer "{layer_name}" is '
    
        ValueError: Input 0 of layer "encoder" is incompatible with the layer: expected shape=(None, 88, 1), found shape=(None, 1, 1)
    
    
    Call arguments received by layer "vae" (type VAE):
      • inputs=tf.Tensor(shape=(None, 1, 1), dtype=int64)


In [None]:
#VIZUALIZACIJA
import matplotlib.pyplot as plt

img_width = 89
img_height = 1
num_channels = 1

def viz_latent_space(encoder, data, labels):
    input_data, target_data = data
    input_label, target_label = labels
    mu, _, _ = encoder.predict(input_data)
    print(target_label)
    labels = input_label['label']
    print(labels)
    print(labels.shape)
    # Plot points with label 0 in blue
    plt.scatter(mu[labels == 0, 0], mu[labels == 0, 1], c='blue', label='Label 0')
    
    # Plot points with label 1 in red
    plt.scatter(mu[labels == 1, 0], mu[labels == 1, 1], c='red', label='Label 1')
    
    # Plot points with label 2 in green
    plt.scatter(mu[labels == 2, 0], mu[labels == 2, 1], c='green', label='Label 2')
    
    # Plot points with label 3 in yellow
    plt.scatter(mu[labels == 3, 0], mu[labels == 3, 1], c='yellow', label='Label 3')
    
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    plt.colorbar()
    plt.legend()
    plt.show()
    
#label_data = pd.read_csv('tmp.csv')
#labels = label_data['Antiviral'].values
labels = (y_train, y_test)

#print(x_test)
#print(y_test)
    
data = (x_train, x_test)
#print(x_train.shape)
viz_latent_space(encoder, data, labels)   

In [None]:
#VIZUALIZACIJA validacije

def viz_latent_space_val(encoder, data, labels):
    input_data, target_data = data
    input_label, target_label = labels
    mu, _, _ = encoder.predict(target_data)
    labels = target_label['label']
    print(labels)
    print(labels.shape)
    # Plot points with label 0 in blue
    plt.scatter(mu[labels == 0, 0], mu[labels == 0, 1], c='blue', label='Label 0')
    
    # Plot points with label 1 in red
    plt.scatter(mu[labels == 1, 0], mu[labels == 1, 1], c='red', label='Label 1')
    
    # Plot points with label 2 in green
    plt.scatter(mu[labels == 2, 0], mu[labels == 2, 1], c='green', label='Label 2')
    
    # Plot points with label 3 in yellow
    plt.scatter(mu[labels == 3, 0], mu[labels == 3, 1], c='yellow', label='Label 3')
    
    plt.xlabel('z - dim 1')
    plt.ylabel('z - dim 2')
    plt.colorbar()
    plt.legend()
    plt.show()
    
#label_data = pd.read_csv('tmp.csv')
#labels = label_data['Antiviral'].values
labels = (y_train, y_test)
    
data = (x_train, x_test)
print(x_train.shape)
viz_latent_space_val(encoder, data, labels)   