<a href="https://colab.research.google.com/github/DFwJZ/Real-time_Bot_Detection/blob/main/Cresci2017_AutoEncoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import pickle

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, ReLU, LeakyReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation, Lambda
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
import numpy as np

In [None]:
# Switch TensorFlow to TF1-style graph execution for the whole session.
# NOTE(review): presumably required because the VAE's KL loss below reads
# symbolic tensors stored on the instance (self.mu / self.log_variance)
# instead of values derived from the loss arguments -- confirm before removing.
tf.compat.v1.disable_eager_execution()

In [None]:
from tensorflow.keras.layers import Dense, Input, Lambda, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
from tensorflow.keras.losses import mse, binary_crossentropy
import numpy as np
import os
import pickle

class VAE:
    """
    Variational autoencoder with a mirrored fully connected encoder/decoder.

    The encoder maps the input through one hidden ``Dense`` layer to the
    parameters (``mu``, ``log_variance``) of a diagonal Gaussian and samples a
    latent point from it; the decoder maps a latent point back through one
    hidden ``Dense`` layer to a sigmoid-activated reconstruction.

    The ``Dense`` layers are applied directly to the input and the decoder
    emits ``prod(input_shape)`` units, so the architecture is meant for flat
    feature vectors such as ``(768,)``.

    The KL term of the loss reads the symbolic ``mu`` / ``log_variance``
    tensors stored on the instance, so the autoencoder is wired to the
    encoder's own input tensor; otherwise those tensors would depend on a
    placeholder that is never fed during training.
    """

    def __init__(self, input_shape, intermediate_dim, latent_space_dim):
        """
        Build the encoder, decoder and end-to-end autoencoder models.

        Args:
            input_shape: Shape of one sample without the batch axis, either
                an int (``768``) or a sequence (``(768,)``).
            intermediate_dim: Width of the hidden Dense layer, e.g. 256.
            latent_space_dim: Dimensionality of the latent space, e.g. 10.
        """
        self.input_shape = input_shape  # kept exactly as passed by the caller
        self.intermediate_dim = intermediate_dim  # e.g., 256
        self.latent_space_dim = latent_space_dim  # e.g., 10

        # Tuple form of input_shape used internally when building layers.
        self._input_dims = self._normalize_input_shape(input_shape)

        self.encoder = None
        self.decoder = None
        self.model = None

        self.mu = None  # Mean of the approximate posterior
        self.log_variance = None  # Log-variance of the approximate posterior

        self._build()

    @staticmethod
    def _normalize_input_shape(input_shape):
        """Return ``input_shape`` as a tuple of ints; a bare int becomes ``(n,)``."""
        if isinstance(input_shape, (int, np.integer)):
            return (int(input_shape),)
        return tuple(int(dim) for dim in input_shape)

    def _build(self):
        """Build the three models in dependency order."""
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    def _build_encoder(self):
        """Create the encoder model and keep its ``mu`` / ``log_variance`` tensors."""
        encoder_input = Input(shape=self._input_dims, name="encoder_input")
        x = Dense(self.intermediate_dim, activation='relu')(encoder_input)

        self.mu = Dense(self.latent_space_dim, name="mu")(x)
        self.log_variance = Dense(self.latent_space_dim, name="log_variance")(x)

        def sample_point_from_normal_distribution(args):
            # Reparameterization: z = mu + sigma * epsilon, sigma = exp(log_var / 2).
            mu, log_variance = args
            epsilon = K.random_normal(shape=K.shape(mu), mean=0., stddev=1.)
            sampled_point = mu + K.exp(log_variance / 2) * epsilon
            return sampled_point

        encoder_output = Lambda(
            sample_point_from_normal_distribution,
            name="encoder_output",
        )([self.mu, self.log_variance])

        self.encoder = Model(encoder_input, encoder_output, name="encoder")

    def _build_decoder(self):
        """Create the decoder model mapping a latent point to a reconstruction."""
        decoder_input = Input(shape=(self.latent_space_dim,), name="decoder_input")
        x = Dense(self.intermediate_dim, activation='relu')(decoder_input)
        # np.prod returns a NumPy scalar; hand Keras a plain Python int.
        output_units = int(np.prod(self._input_dims))
        decoder_output = Dense(output_units, activation='sigmoid')(x)

        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _build_autoencoder(self):
        """Chain encoder and decoder into the trainable end-to-end model."""
        # Reuse the encoder's own input tensor rather than a fresh Input:
        # the KL loss reads self.mu / self.log_variance, which depend on that
        # tensor, so it must be the one fed during training.
        autoencoder_input = self.encoder.input
        encoded_img = self.encoder(autoencoder_input)
        decoded_img = self.decoder(encoded_img)
        self.model = Model(autoencoder_input, decoded_img, name="autoencoder")

    def compile(self, learning_rate=0.0001):
        """Compile the autoencoder with Adam, the combined loss and both loss terms as metrics."""
        self.model.compile(optimizer=Adam(learning_rate), loss=self._calculate_combined_loss,
                           metrics=[self._calculate_reconstruction_loss, self._calculate_kl_loss])

    def _calculate_combined_loss(self, y_target, y_predicted):
        """Return the sum of the reconstruction loss and the KL loss."""
        reconstruction_loss = self._calculate_reconstruction_loss(y_target, y_predicted)
        kl_loss = self._calculate_kl_loss(y_target, y_predicted)
        return reconstruction_loss + kl_loss

    def _calculate_reconstruction_loss(self, y_target, y_predicted):
        """Return the mean squared error between target and reconstruction."""
        return mse(y_target, y_predicted)

    def _calculate_kl_loss(self, y_target, y_predicted):
        """
        Return the KL divergence between the encoder's Gaussian and N(0, I).

        The arguments are unused; they exist so the method matches the
        ``(y_true, y_pred)`` signature Keras passes to losses and metrics.
        """
        kl_loss = 1 + self.log_variance - K.square(self.mu) - K.exp(self.log_variance)
        kl_loss = K.sum(kl_loss, axis=-1)
        kl_loss *= -0.5
        return kl_loss

    def train(self, x_train, batch_size, num_epochs):
        """Fit the autoencoder to reconstruct ``x_train`` from itself."""
        self.model.fit(x_train, x_train, batch_size=batch_size, epochs=num_epochs, shuffle=True)

    def summary(self):
        """Print Keras summaries for the encoder, decoder and autoencoder."""
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

In [None]:
if __name__ == "__main__":
    # Build a VAE for 768-dimensional inputs with a 256-unit hidden layer and
    # a 10-dimensional latent space, then print the layer summaries.
    autoencoder = VAE(input_shape=768, intermediate_dim=256, latent_space_dim=10)
    autoencoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 768)]        0           []                               
                                                                                                  
 dense (Dense)                  (None, 256)          196864      ['encoder_input[0][0]']          
                                                                                                  
 mu (Dense)                     (None, 10)           2570        ['dense[0][0]']                  
                                                                                                  
 log_variance (Dense)           (None, 10)           2570        ['dense[0][0]']                  
                                                                                            