In [1]:
from typing import Optional, Union, Tuple

import numpy as np
import tensorflow as tf
# import tensorflow_probability as tfp
import tensorflow_addons as tfa
import pandas as pd
from sklearn.metrics import accuracy_score


@tf.function
def identity(x):
    """Return ``x`` unchanged; the function is wrapped in ``tf.function``."""
    return x

ModuleNotFoundError: No module named 'tensorflow_addons'

In [None]:
import pandas as pd
import numpy as np

# Seed NumPy's global generator so the synthetic table is reproducible.
np.random.seed(42)

num_samples = 1000

# Twelve uniform-random feature columns, drawn in order Feature_1 .. Feature_12.
data = {
    f"Feature_{idx}": np.random.rand(num_samples)
    for idx in range(1, 13)
}

# Assemble the frame, then append a random binary label as the last column.
df = pd.DataFrame(data)
df["Target"] = np.random.randint(0, 2, size=num_samples)

print(df.head())

# Every column except the trailing label is treated as a feature.
feature_columns = df.columns[:-1].tolist()

print(f"Số lượng mẫu: {len(df)}, Số lượng cột: {len(df.columns)}")



   Feature_1  Feature_2  Feature_3  Feature_4  Feature_5  Feature_6  \
0   0.374540   0.185133   0.261706   0.672703   0.571996   0.393636   
1   0.950714   0.541901   0.246979   0.796681   0.805432   0.473436   
2   0.731994   0.872946   0.906255   0.250468   0.760161   0.854547   
3   0.598658   0.732225   0.249546   0.624874   0.153900   0.340004   
4   0.156019   0.806561   0.271950   0.571746   0.149249   0.869650   

   Feature_7  Feature_8  Feature_9  Feature_10  Feature_11  Feature_12  Target  
0   0.648257   0.038799   0.720268    0.913578    0.373641    0.533031       1  
1   0.172386   0.186773   0.687283    0.525360    0.332912    0.137899       1  
2   0.872395   0.831246   0.095754    0.724910    0.176154    0.591243       0  
3   0.613116   0.766768   0.922572    0.436048    0.607267    0.314786       0  
4   0.157204   0.350643   0.568472    0.630035    0.476624    0.052349       0  
Số lượng mẫu: 1000, Số lượng cột: 13


In [None]:
class GLUBlock(tf.keras.layers.Layer):
    """Gated linear unit built from two parallel Dense + BatchNormalization branches.

    Both branches read the same input. The first produces the values, the
    second produces gate logits; the result is ``values * sigmoid(gate)``.

    Args:
        units: Width of both branches. When ``None`` it is replaced in
            ``build`` by the size of the input's last dimension.
        virtual_batch_size: Passed through to both BatchNormalization layers.
        momentum: Passed through to both BatchNormalization layers.
    """

    def __init__(self, units: Optional[int] = None,
                 virtual_batch_size: Optional[int] = 128,
                 momentum: Optional[float] = 0.02):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Create the sub-layers, resolving ``units`` from the input if unset."""
        if self.units is None:
            self.units = input_shape[-1]

        # NOTE(review): `virtual_batch_size` is forwarded as a constructor
        # keyword; confirm the installed BatchNormalization accepts it.
        bn_options = dict(virtual_batch_size=self.virtual_batch_size,
                          momentum=self.momentum)

        # Value branch (attribute names keep their original spelling).
        self.fc_outout = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn_outout = tf.keras.layers.BatchNormalization(**bn_options)

        # Gate branch.
        self.fc_gate = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn_gate = tf.keras.layers.BatchNormalization(**bn_options)

    def call(self, inputs: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None):
        """Return the value branch scaled element-wise by the sigmoid of the gate branch."""
        values = self.bn_outout(self.fc_outout(inputs), training=training)
        gate_logits = self.bn_gate(self.fc_gate(inputs), training=training)
        gate = tf.keras.activations.sigmoid(gate_logits)

        return values * gate
    
class FeatureTransformerBlock(tf.keras.layers.Layer):
    """Two stacked ``GLUBlock`` layers joined by a scaled residual sum.

    Args:
        units: Width of both GLU blocks. When ``None`` it is replaced in
            ``build`` by the size of the input's last dimension.
        virtual_batch_size: Passed through to both GLU blocks.
        momentum: Passed through to both GLU blocks.
        skip: When equal to ``True``, the block input is added to the output
            of the first GLU block before the second one runs.
    """

    def __init__(self, units: Optional[int] = None, virtual_batch_size: Optional[int]=128,
                 momentum: Optional[float] = 0.02, skip=False):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum
        self.skip = skip

    def build(self, input_shape: tf.TensorShape):
        """Create the two GLU blocks, resolving ``units`` from the input if unset."""
        if self.units is None:
            self.units = input_shape[-1]

        glu_options = dict(units=self.units,
                           virtual_batch_size=self.virtual_batch_size,
                           momentum=self.momentum)
        self.initial = GLUBlock(**glu_options)
        self.residual = GLUBlock(**glu_options)

    def call(self, inputs: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None):
        """Apply both GLU blocks and return their sum scaled by ``sqrt(0.5)``."""
        hidden = self.initial(inputs, training=training)

        # Equality (not truthiness) is kept so only True-equal values enable the skip.
        if self.skip == True:
            hidden = hidden + inputs

        refined = self.residual(hidden, training=training)

        scale = np.sqrt(0.5)
        return (hidden + refined) * scale

class AttentiveTransformer(tf.keras.layers.Layer):
    """Produce a sparse attention mask with Dense -> BatchNormalization -> sparsemax.

    Sparsemax is implemented here with core TensorFlow ops, so this layer
    does not depend on the ``tensorflow_addons`` package.

    Args:
        units: Width of the Dense projection (and therefore of the mask).
            When ``None`` it is replaced in ``build`` by the size of the
            input's last dimension.
        virtual_batch_size: Passed through to the BatchNormalization layer.
        momentum: Passed through to the BatchNormalization layer.
    """

    def __init__(self, units: Optional[int] = None, virtual_batch_size: Optional[int] = 128,
                 momentum: Optional[float] = 0.02):
        super(AttentiveTransformer, self).__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Create the sub-layers, resolving ``units`` from the input if unset."""
        if self.units is None:
            self.units = input_shape[-1]

        self.fc = tf.keras.layers.Dense(self.units,
                                        use_bias=False)
        self.bn = tf.keras.layers.BatchNormalization(virtual_batch_size=self.virtual_batch_size,
                                                     momentum=self.momentum)

    @staticmethod
    def _sparsemax(logits: tf.Tensor) -> tf.Tensor:
        """Euclidean projection of ``logits`` onto the probability simplex (last axis).

        Each row of the result is non-negative and sums to 1; entries whose
        logit falls below the row threshold ``tau`` become exactly zero.
        """
        n_dims = tf.shape(logits)[-1]
        ranks = tf.cast(tf.range(1, n_dims + 1), logits.dtype)

        z_sorted = tf.sort(logits, axis=-1, direction="DESCENDING")
        z_cumsum = tf.cumsum(z_sorted, axis=-1)

        # In descending order the support is a prefix: rank k belongs to it
        # while 1 + k * z_(k) exceeds the running sum of the k largest logits.
        in_support = 1.0 + ranks * z_sorted > z_cumsum
        support_size = tf.reduce_sum(tf.cast(in_support, logits.dtype),
                                     axis=-1, keepdims=True)
        # Rank 1 is always in the support for finite input; the clamp only
        # guards the division against non-finite rows.
        support_size = tf.maximum(support_size, 1.0)

        support_sum = tf.reduce_sum(
            tf.where(in_support, z_sorted, tf.zeros_like(z_sorted)),
            axis=-1, keepdims=True)
        tau = (support_sum - 1.0) / support_size

        return tf.maximum(logits - tau, 0.0)

    def call(self, inputs: Union[tf.Tensor, np.ndarray], priors: Optional[Union[tf.Tensor, np.ndarray]] = None, training: Optional[bool] = None) -> tf.Tensor:
        """Return the sparsemax mask of the projected input, scaled by ``priors`` if given."""
        feature = self.bn(self.fc(inputs),
                          training=training)
        if priors is None:
            output = feature
        else:
            output = feature * priors

        return self._sparsemax(output)

class TabNetStep(tf.keras.layers.Layer):
    """One decision step: a skip-enabled feature transformer followed by an attention mask.

    Args:
        units: Width of the feature transformer. When ``None`` it is replaced
            in ``build`` by the size of the input's last dimension.
        virtual_batch_size: Passed through to both sub-layers.
        momentum: Passed through to both sub-layers.
    """

    def __init__(self, units: Optional[int] = None, virtual_batch_size: Optional[int]=128,
                 momentum: Optional[float] =0.02):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Create the sub-layers; the attention width comes from ``input_shape[-1]``."""
        if self.units is None:
            self.units = input_shape[-1]

        shared_options = dict(virtual_batch_size=self.virtual_batch_size,
                              momentum=self.momentum)
        self.unique = FeatureTransformerBlock(units=self.units, skip=True, **shared_options)
        self.attention = AttentiveTransformer(units=input_shape[-1], **shared_options)

    def call(self, inputs, shared, priors, training=None) -> Tuple[tf.Tensor]:
        """Return ``(split, masked, keys)``.

        ``split`` is the transformed ``shared`` tensor, ``keys`` is the
        attention mask computed from ``split`` and ``priors``, and ``masked``
        is ``keys * inputs``.
        """
        transformed = self.unique(shared, training=training)
        mask = self.attention(transformed, priors, training=training)

        return transformed, mask * inputs, mask

In [None]:
class TabNetEncoder(tf.keras.layers.Layer):
    """Sequential-attention encoder assembled from ``TabNetStep`` layers.

    Args:
        units: Width of the final Dense layer that produces the prediction.
        n_steps: Number of decision steps run after the initial step.
        n_features: Width used for the shared block and every step.
        outputs: Accepted but not stored or used by this class.
        gamma: Constant from which the batch-mean mask is subtracted when the
            prior is updated.
        epsilon: Added inside the logarithm of the entropy term.
        sparsity: Coefficient applied to the entropy term before ``add_loss``.
        virtual_batch_size: Passed through to all sub-layers.
        momentum: Passed through to all sub-layers.
    """

    def __init__(self, units: int =1,
                 n_steps: int = 3,
                 n_features: int = 8,
                 outputs: int = 1,
                 gamma: float = 1.3,
                 epsilon: float = 1e-8,
                 sparsity: float = 1e-5,
                 virtual_batch_size: Optional[int]=128,
                 momentum: Optional[float] =0.02):
        super().__init__()

        self.units = units
        self.n_steps = n_steps
        self.n_features = n_features
        self.virtual_batch_size = virtual_batch_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.momentum = momentum
        self.sparsity = sparsity

    def build(self, input_shape: tf.TensorShape):
        """Create the input normalisation, shared block, decision steps and output head."""
        norm_options = dict(virtual_batch_size=self.virtual_batch_size,
                            momentum=self.momentum)

        self.bn = tf.keras.layers.BatchNormalization(**norm_options)
        self.shared_block = FeatureTransformerBlock(units=self.n_features, **norm_options)
        self.initial_step = TabNetStep(units=self.n_features, **norm_options)
        self.steps = [
            TabNetStep(units=self.n_features, **norm_options)
            for _ in range(self.n_steps)
        ]
        self.final = tf.keras.layers.Dense(units=self.units, use_bias=False)

    def call(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> Tuple[tf.Tensor]:
        """Run every decision step and return ``(prediction, encoded, importance)``.

        The scaled entropy of the masks is registered through ``add_loss``.
        """
        # Running accumulators start as Python floats and become tensors on first use.
        sparsity_entropy = 0.
        encoded = 0.
        aggregated = 0.
        importance = 0.
        prior = tf.reduce_mean(tf.ones_like(X), axis=0)

        normalized = prior * self.bn(X, training=training)
        shared = self.shared_block(normalized, training=training)
        _, masked, mask = self.initial_step(normalized, shared, prior, training=training)

        for step in self.steps:
            # Bookkeeping uses the mask produced by the previous step.
            row_entropy = tf.reduce_sum(-mask * tf.math.log(mask + self.epsilon), axis=-1)
            sparsity_entropy = sparsity_entropy + (
                tf.reduce_mean(row_entropy) / tf.cast(self.n_steps, tf.float32)
            )
            prior = prior * (self.gamma - tf.reduce_mean(mask, axis=0))
            importance = importance + mask

            shared = self.shared_block(masked, training=training)
            split, masked, mask = step(normalized, shared, prior, training=training)

            aggregated = aggregated + tf.keras.activations.relu(split)
            encoded = encoded + split

        self.add_loss(self.sparsity * sparsity_entropy)

        return self.final(aggregated), encoded, importance

In [None]:
class TabNetDecoder(tf.keras.layers.Layer):
    """Decoder that sums the outputs of several feature-transformer + Dense branches.

    Args:
        units: Output width of every per-step Dense layer.
        n_steps: Number of branches.
        n_features: Width of the shared block and of every per-step block.
        outputs, gamma, epsilon, sparsity: Accepted but not stored or used by
            this class.
        virtual_batch_size: Passed through to all feature-transformer blocks.
        momentum: Passed through to all feature-transformer blocks.
    """

    def __init__(self, units=1,
                 n_steps = 3,
                 n_features = 8,
                 outputs = 1,
                 gamma = 1.3,
                 epsilon = 1e-8,
                 sparsity = 1e-5,
                 virtual_batch_size=128,
                 momentum=0.02):
        super().__init__()

        self.units = units
        self.n_steps = n_steps
        self.n_features = n_features
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Create the shared block plus one transformer and one Dense layer per step."""
        block_options = dict(units=self.n_features,
                             virtual_batch_size=self.virtual_batch_size,
                             momentum=self.momentum)

        self.shared_block = FeatureTransformerBlock(**block_options)
        self.steps = [FeatureTransformerBlock(**block_options)
                      for _ in range(self.n_steps)]
        self.fc = [tf.keras.layers.Dense(units=self.units)
                   for _ in range(self.n_steps)]

    def call(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> tf.Tensor:
        """Return the sum over steps of ``fc(step(shared_block(X)))``."""
        decoded = 0.

        for transformer, projection in zip(self.steps, self.fc):
            # The shared block is invoked once per step, on the same input each time.
            shared = self.shared_block(X, training=training)
            decoded = decoded + projection(transformer(shared, training=training))

        return decoded

In [2]:
import numpy as np
import tensorflow as tf
import keras
from keras import ops
from keras import layers

In [None]:
class Sampling(layers.Layer):
    """Draw a latent sample ``z`` from ``(z_mean, z_log_var)``.

    Computes ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is
    standard-normal noise drawn with this layer's own seed generator.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Seed generator owned by the layer and reused on every call.
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs):
        """Return one sample per row; ``inputs`` is the pair ``(z_mean, z_log_var)``."""
        z_mean, z_log_var = inputs

        mean_shape = ops.shape(z_mean)
        noise = keras.random.normal(
            shape=(mean_shape[0], mean_shape[1]),
            seed=self.seed_generator,
        )
        std = ops.exp(0.5 * z_log_var)

        return z_mean + std * noise

### Encoder VAE

In [None]:
class VAE_Encoder(tf.keras.layers.Layer):
    """VAE encoder: TabNet encoder -> MLP -> (z_mean, z_log_var) -> sampled z.

    Args:
        tabnet_encoder: Layer whose call returns a 3-tuple; only the second
            element (the encoded representation) is used here.
        mlp_units: Iterable of hidden widths for the ReLU Dense stack.
        latent_dim: Width of the latent mean / log-variance vectors.
        name: Layer name.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, tabnet_encoder, mlp_units, latent_dim=64, name='vae_encoder', **kwargs):
        super(VAE_Encoder, self).__init__(name=name, **kwargs)
        self.tabnet_encoder = tabnet_encoder
        self.mlp = tf.keras.Sequential([
            tf.keras.layers.Dense(units, activation='relu') for units in mlp_units
        ])
        self.mean_layer = tf.keras.layers.Dense(latent_dim)
        self.logvar_layer = tf.keras.layers.Dense(latent_dim)
        # Built once here rather than inside `call`: a Sampling layer created
        # per call would get a fresh SeedGenerator(1337) each time, restarting
        # the noise stream and creating new state on every forward pass.
        self.sampling = Sampling()

    def call(self, inputs, training=False):
        """Return ``(z_mean, z_log_var, z)`` for a batch of inputs."""
        _, encoded, _ = self.tabnet_encoder(inputs, training=training)
        encoded = self.mlp(encoded, training=training)
        z_mean = self.mean_layer(encoded)
        z_log_var = self.logvar_layer(encoded)
        z = self.sampling([z_mean, z_log_var])
        return z_mean, z_log_var, z


### Decoder VAE

In [None]:
class VAE_Decoder(tf.keras.layers.Layer):
    """VAE decoder: a ReLU MLP followed by the supplied TabNet decoder.

    Args:
        mlp_units: Iterable of hidden widths; the Dense stack is built from
            it in reverse order.
        tabnet_decoder: Layer applied to the MLP output to produce the
            reconstruction.
        name: Layer name.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, mlp_units, tabnet_decoder, name='vae_decoder', **kwargs):
        super().__init__(name=name, **kwargs)

        hidden_layers = []
        for width in reversed(mlp_units):
            hidden_layers.append(tf.keras.layers.Dense(width, activation='relu'))
        self.mlp = tf.keras.Sequential(hidden_layers)

        self.tabnet_decoder = tabnet_decoder

    # NOTE(review): the `inputs=64` default looks like a leftover placeholder
    # rather than a usable value — confirm before relying on it.
    def call(self, inputs=64, training=False):
        """Return the reconstruction for a batch of latent vectors."""
        hidden = self.mlp(inputs, training=training)
        return self.tabnet_decoder(hidden, training=training)


In [None]:
class VAE_Tabnet_MLPS(tf.keras.Model):
    """VAE with an auxiliary binary classifier on the sampled latent vector.

    Training minimises the sum of three terms: reconstruction loss, KL
    divergence and classification loss.

    Args:
        encoder: Callable layer returning ``(z_mean, z_log_var, z)``.
        decoder: Callable layer mapping ``z`` to a reconstruction of the input.
        classifier: Optional layer mapping ``z`` to a probability per sample.
            Defaults to a single sigmoid ``Dense(1)`` unit.
        **kwargs: Forwarded to ``tf.keras.Model``.
    """

    def __init__(self, encoder, decoder, classifier=None, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # `train_step` needs a classification head; supply a minimal one when
        # the caller does not provide their own.
        if classifier is None:
            classifier = tf.keras.layers.Dense(1, activation="sigmoid", name="classifier")
        self.classifier = classifier

        self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tf.keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")
        self.classification_loss_tracker = tf.keras.metrics.Mean(name="classification_loss")
        self.accuracy_tracker = tf.keras.metrics.BinaryAccuracy(name="accuracy")

    @property
    def metrics(self):
        """Every tracker updated in ``train_step``, so Keras can reset all of them."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
            self.classification_loss_tracker,
            self.accuracy_tracker,
        ]

    def call(self, inputs, training=None):
        """Encode ``inputs``, then decode the sampled latent vector.

        Returns:
            The decoder's reconstruction of ``inputs``.
        """
        _, _, z = self.encoder(inputs, training=training)
        return self.decoder(z, training=training)

    def train_step(self, data):
        """Run one optimisation step on a ``(features, labels)`` batch.

        Returns:
            Dict with the running values of ``loss``, ``reconstruction_loss``,
            ``kl_loss``, ``classification_loss`` and ``accuracy``.
        """
        x_train, y_train = data
        with tf.GradientTape() as tape:
            # Sub-models are called directly, so training mode must be
            # requested explicitly.
            z_mean, z_log_var, z = self.encoder(x_train, training=True)
            reconstruction = self.decoder(z, training=True)
            classification_output = self.classifier(z, training=True)

            # `binary_crossentropy` averages over the last axis. A trailing
            # singleton axis keeps one loss value per feature, which is then
            # summed over features and averaged over the batch.
            # NOTE(review): this treats the decoder output as probabilities;
            # confirm the decoder's final activation matches that.
            reconstruction_loss = tf.keras.losses.binary_crossentropy(
                tf.expand_dims(x_train, axis=-1),
                tf.expand_dims(reconstruction, axis=-1),
            )
            reconstruction_loss = tf.reduce_mean(tf.reduce_sum(reconstruction_loss, axis=1))

            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

            # Give the labels the same shape as the predictions so that
            # (batch,) labels are not broadcast against (batch, 1) outputs.
            y_true = tf.reshape(
                tf.cast(y_train, classification_output.dtype),
                tf.shape(classification_output),
            )
            classification_loss = tf.keras.losses.binary_crossentropy(y_true, classification_output)
            classification_loss = tf.reduce_mean(classification_loss)

            # NOTE(review): losses registered by sub-layers through `add_loss`
            # are not included in this total — confirm whether they should be.
            total_loss = reconstruction_loss + kl_loss + classification_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        self.classification_loss_tracker.update_state(classification_loss)
        self.accuracy_tracker.update_state(y_true, classification_output)

        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
            "classification_loss": self.classification_loss_tracker.result(),
            "accuracy": self.accuracy_tracker.result()
        }


In [None]:
# Instantiate the TabNet encoder and decoder.
latent_dim=64
tabnet_encoder = TabNetEncoder(units=latent_dim, n_steps=3, n_features=12)
tabnet_decoder = TabNetDecoder(units=12, n_steps=3, n_features=12)

# Wrap them in the VAE encoder/decoder and assemble the full model.
vae_encoder = VAE_Encoder(tabnet_encoder, mlp_units=[64, 32], latent_dim=16)
vae_decoder = VAE_Decoder(mlp_units=[32, 64], tabnet_decoder=tabnet_decoder)
vae = VAE_Tabnet_MLPS(encoder=vae_encoder, decoder=vae_decoder)

# Separate model inputs from the label: `train_step` unpacks each batch as
# (features, labels), and the label column must not be fed in as a feature.
x_all = df[feature_columns].to_numpy(dtype="float32")
y_all = df[["Target"]].to_numpy(dtype="float32")

# NOTE(review): every batch is kept at exactly 128 rows (the layers' default
# virtual_batch_size) and the ragged tail is dropped, on the assumption that
# virtual batch normalisation needs the batch size to divide evenly — confirm
# for the installed Keras version.
train_batch_size = 128
train_ds = (
    tf.data.Dataset.from_tensor_slices((x_all, y_all))
    .shuffle(len(x_all), seed=42)
    .batch(train_batch_size, drop_remainder=True)
)

# Compile and train the VAE.
vae.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))
vae.fit(train_ds, epochs=50)

# Run the trained model over the features to obtain its output for each row.
reconstructed_data = vae.predict(x_all)
