In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow as tf
import pandas as pd
import tensorflow.compat.v1 as v1

# TabNet Model

In [None]:
def glu(act, n_units):
  """Gated linear unit: value columns scaled by the sigmoid of gate columns.

  Args:
    act: Tensor indexed as [row, column]; columns before `n_units` are the
      values, columns from `n_units` onward are the gate pre-activations.
    n_units: Column index at which `act` is split into values and gates.

  Returns:
    The element-wise product of the value columns and the sigmoid of the
    gate columns.
  """
  values = act[:, :n_units]
  gates = tf.nn.sigmoid(act[:, n_units:])
  return values * gates


class TabNet(object):
  """TabNet model: a sequential-attention encoder plus classify/regress heads.

  NOTE(review): the methods call TF1-style graph APIs directly on `tf`
  (`tf.variable_scope`, `tf.layers`, `tf.contrib.sparsemax`, `tf.log`,
  `keep_dims=`), while the notebook's import cell also brings in
  `tensorflow.compat.v1 as v1` without using it here. Confirm which
  TensorFlow version this cell is expected to run under.
  """

  def __init__(self,
               columns,
               num_features,
               feature_dim,
               output_dim,
               num_decision_steps,
               relaxation_factor,
               batch_momentum,
               virtual_batch_size,
               num_classes,
               epsilon=0.00001):
    """Stores the hyperparameters; no TensorFlow ops are created here.

    Args:
      columns: Feature columns handed to `tf.feature_column.input_layer`.
      num_features: Width of the feature masks (second dimension of every
        mask tensor and output size of the mask-producing dense layer).
      feature_dim: Width of each feature-transformer block after the GLU
        (the dense layers emit `2 * feature_dim` units).
      output_dim: Number of leading transformer columns used as the decision
        output; the remaining columns feed the mask computation.
      num_decision_steps: Number of iterations of the encoder loop.
      relaxation_factor: Value the mask is subtracted from when updating the
        complementary aggregated mask.
      batch_momentum: `momentum` passed to every batch-normalization layer.
      virtual_batch_size: `virtual_batch_size` used by batch normalization
        while training (1 is used otherwise).
      num_classes: Number of logits produced by `classify`.
      epsilon: Small constant added inside the log of the entropy term.
    """

    self.columns = columns
    self.num_features = num_features
    self.feature_dim = feature_dim
    self.output_dim = output_dim
    self.num_decision_steps = num_decision_steps
    self.relaxation_factor = relaxation_factor
    self.batch_momentum = batch_momentum
    self.virtual_batch_size = virtual_batch_size
    self.num_classes = num_classes
    self.epsilon = epsilon

  def encoder(self, data, reuse, is_training):
    """Builds the TabNet encoder graph under the "Encoder" variable scope.

    Args:
      data: Input passed to `tf.feature_column.input_layer` together with
        `self.columns`.
      reuse: `reuse` flag for the "Encoder" variable scope.
      is_training: Forwarded as `training=` to batch normalization and also
        evaluated in a Python `if`, so presumably a Python bool rather than
        a tensor -- TODO confirm against callers.

    Returns:
      A pair `(output_aggregated, total_entropy)`: the sum over decision
      steps of the ReLU'd decision outputs, and the mask entropy averaged
      over `num_decision_steps - 1` steps.
    """

    with tf.variable_scope("Encoder", reuse=reuse):

      # Reads and normalizes input features.
      features = tf.feature_column.input_layer(data, self.columns)
      features = tf.layers.batch_normalization(
          features, training=is_training, momentum=self.batch_momentum)
      batch_size = tf.shape(features)[0]

      # Initializes decision-step dependent variables.
      output_aggregated = tf.zeros([batch_size, self.output_dim])
      masked_features = features
      mask_values = tf.zeros([batch_size, self.num_features])
      aggregated_mask_values = tf.zeros([batch_size, self.num_features])
      complemantary_aggregated_mask_values = tf.ones(
          [batch_size, self.num_features])
      total_entropy = 0

      # Virtual (ghost) batch size is only applied while training.
      if is_training:
        v_b = self.virtual_batch_size
      else:
        v_b = 1

      for ni in range(self.num_decision_steps):

        # Feature transformer with two shared and two decision step dependent
        # blocks is used below.

        # The shared dense layers (f1, f2) are created at step 0 and reused
        # afterwards; f3/f4 get a per-step name suffix and so are not shared.
        reuse_flag = (ni > 0)

        transform_f1 = tf.layers.dense(
            masked_features,
            self.feature_dim * 2,
            name="Transform_f1",
            reuse=reuse_flag,
            use_bias=False)
        transform_f1 = tf.layers.batch_normalization(
            transform_f1,
            training=is_training,
            momentum=self.batch_momentum,
            virtual_batch_size=v_b)
        transform_f1 = glu(transform_f1, self.feature_dim)

        transform_f2 = tf.layers.dense(
            transform_f1,
            self.feature_dim * 2,
            name="Transform_f2",
            reuse=reuse_flag,
            use_bias=False)
        transform_f2 = tf.layers.batch_normalization(
            transform_f2,
            training=is_training,
            momentum=self.batch_momentum,
            virtual_batch_size=v_b)
        # Residual connection; the sum is scaled by sqrt(0.5).
        transform_f2 = (glu(transform_f2, self.feature_dim) +
                        transform_f1) * np.sqrt(0.5)

        transform_f3 = tf.layers.dense(
            transform_f2,
            self.feature_dim * 2,
            name="Transform_f3" + str(ni),
            use_bias=False)
        transform_f3 = tf.layers.batch_normalization(
            transform_f3,
            training=is_training,
            momentum=self.batch_momentum,
            virtual_batch_size=v_b)
        transform_f3 = (glu(transform_f3, self.feature_dim) +
                        transform_f2) * np.sqrt(0.5)

        transform_f4 = tf.layers.dense(
            transform_f3,
            self.feature_dim * 2,
            name="Transform_f4" + str(ni),
            use_bias=False)
        transform_f4 = tf.layers.batch_normalization(
            transform_f4,
            training=is_training,
            momentum=self.batch_momentum,
            virtual_batch_size=v_b)
        transform_f4 = (glu(transform_f4, self.feature_dim) +
                        transform_f3) * np.sqrt(0.5)

        # Step 0 contributes no decision output; it only produces the
        # features used to compute the first mask.
        if ni > 0:

          decision_out = tf.nn.relu(transform_f4[:, :self.output_dim])

          # Decision aggregation.
          output_aggregated += decision_out

          # Aggregated masks are used for visualization of the
          # feature importance attributes.
          scale_agg = tf.reduce_sum(
              decision_out, axis=1, keep_dims=True) / (
                  self.num_decision_steps - 1)
          aggregated_mask_values += mask_values * scale_agg

        # Columns after the first `output_dim` feed the mask computation.
        features_for_coef = (transform_f4[:, self.output_dim:])

        # No mask is computed on the last step (there is no following step).
        if ni < self.num_decision_steps - 1:

          # Determines the feature masks via linear and nonlinear
          # transformations, taking into account of aggregated feature use.
          mask_values = tf.layers.dense(
              features_for_coef,
              self.num_features,
              name="Transform_coef" + str(ni),
              use_bias=False)
          mask_values = tf.layers.batch_normalization(
              mask_values,
              training=is_training,
              momentum=self.batch_momentum,
              virtual_batch_size=v_b)
          mask_values *= complemantary_aggregated_mask_values
          mask_values = tf.contrib.sparsemax.sparsemax(mask_values)

          # Relaxation factor controls the amount of reuse of features between
          # different decision blocks and updated with the values of
          # coefficients.
          complemantary_aggregated_mask_values *= (
              self.relaxation_factor - mask_values)

          # Entropy is used to penalize the amount of sparsity in feature
          # selection.
          total_entropy += tf.reduce_mean(
              tf.reduce_sum(
                  -mask_values * tf.log(mask_values + self.epsilon),
                  axis=1)) / (
                      self.num_decision_steps - 1)

          # Feature selection.
          masked_features = tf.multiply(mask_values, features)

          # Visualization of the feature selection mask at decision step ni
          tf.summary.image(
              "Mask for step" + str(ni),
              tf.expand_dims(tf.expand_dims(mask_values, 0), 3),
              max_outputs=1)

      # Visualization of the aggregated feature importances
      tf.summary.image(
          "Aggregated mask",
          tf.expand_dims(tf.expand_dims(aggregated_mask_values, 0), 3),
          max_outputs=1)

      return output_aggregated, total_entropy

  def classify(self, activations, reuse):
    """Maps encoder activations to class logits and softmax probabilities.

    Args:
      activations: Input to the bias-free dense layer.
      reuse: `reuse` flag for the "Classify" variable scope.

    Returns:
      A pair `(logits, predictions)`, where `predictions` is the softmax of
      `logits` and the logits have `self.num_classes` units.
    """

    with tf.variable_scope("Classify", reuse=reuse):
      logits = tf.layers.dense(activations, self.num_classes, use_bias=False)
      predictions = tf.nn.softmax(logits)
      return logits, predictions

  def regress(self, activations, reuse):
    """Maps encoder activations to a single regression output.

    Args:
      activations: Input to the dense layer.
      reuse: `reuse` flag for the "Regress" variable scope.

    Returns:
      The output of a one-unit dense layer (with bias) applied to
      `activations`.
    """

    with tf.variable_scope("Regress", reuse=reuse):
      predictions = tf.layers.dense(activations, 1)
      return predictions

# To Do


Application

In [None]:
# Red-wine table (semicolon-delimited). It is loaded but not used further in
# this cell; only the white-wine table becomes `df`.
path = "https://raw.githubusercontent.com/shrikant-temburwar/Wine-Quality-Dataset/master/winequality-red.csv"
df_r = pd.read_csv(path, sep=";")
#df_r['Color'] = 1

# White-wine table (semicolon-delimited).
path = "https://raw.githubusercontent.com/shrikant-temburwar/Wine-Quality-Dataset/master/winequality-white.csv"
df_w = pd.read_csv(path, sep=";")
#df_w['Color'] = 0
#df = df_r.append(df_w)
df = df_w

# Feature matrix: every column except the 'quality' target.
is_feature_column = df.columns != 'quality'
X = df.loc[:, is_feature_column]


In [None]:
# Broken Down TabNet
#
# NOTE(review): this cell is an unfinished, non-runnable sketch that walks
# through the encoder logic of the TabNet class defined earlier in this
# notebook. Hyperparameters are still placeholders written as trailing
# comments (e.g. `#output_dim])`), which comment out the closing brackets;
# `is_training` and `self` are not defined at cell scope; and the cell
# contains a dangling `tf.keras.layers.De` expression. Treat it as notes,
# not as executable code.

# something to note, this loop is being fed ALREADY BATCHED & SHUFFLED data
# I think these low level Tensorflow models need to be fed a TF dataset

# So feature column input is using dicts and tensors to navigate the input data

# First step is to input the data & batch normalize
# Input data
features = tf.keras.layers.InputLayer()
# batch normalize
# NOTE(review): the layer object is passed to the BatchNormalization
# constructor here rather than the layer being called on a tensor -- confirm.
features = tf.keras.layers.BatchNormalization(features)
# get size of batches
# NOTE(review): the reference encoder uses tf.shape(features)[0]; here the
# index is applied before tf.shape.
batch_size = tf.shape(features[0])

# Initializes decision-step dependent variables.

# Dimensions of output, not sure what this will effect, as later is mapped to classify or regress
output_aggregated = tf.zeros([batch_size, #output_dim])
# instantiate masked features, this will be updated via feature selection process
masked_features = features
mask_values = tf.zeros([batch_size, #num_features])
aggregated_mask_values = tf.zeros([batch_size, #num_features])
complemantary_aggregated_mask_values = tf.ones([batch_size, #num_features])
# entropy starting value
total_entropy = 0

# If we are training, the batch size is set
if is_training:
  v_b = #virtual_batch_size
# if we are not training, the batch size is 1 for prediction(?)
else:
  v_b = 1

tf.keras.layers.De

# feed normalized data into feature transformer
for ni in range(#num_decision_steps):

  reuse_flag = (ni > 0)

 #Set to None to maintain a linear activation.  

  transform_f1 = tf.keras.layers.Dense(
      masked_features,
      self.feature_dim * 2,
      reuse=reuse_flag,
      use_bias=False)
  
  transform_f1 = tf.layers.batch_normalization(
      transform_f1,
      training=is_training,
      momentum=self.batch_momentum,
      virtual_batch_size=v_b)
      transform_f1 = glu(transform_f1, self.feature_dim)

# Kaggle Models

https://www.kaggle.com/marcusgawronsky/tabnet-in-tensorflow-2-0/notebook

In [None]:
from typing import Optional, Union, Tuple

import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import tensorflow_addons as tfa
import pandas as pd
from sklearn.metrics import accuracy_score

#GLU Block
#The first component we are going to need to build is our GLUBlock, which comprises two fully 
#connected layers, two ghost batch normalization layers, our identity and sigmoid activation functions, 
#and a multiplication operation. Here we use TensorFlow 2.0 custom layer subclassing to make this 
#layer easy to work with and reusable across the rest of our model. I have added a number of 
#type hints to make working with this custom layer easy to follow and apply.

class GLUBlock(tf.keras.layers.Layer):
    """Gated linear unit made of two parallel Dense -> BatchNormalization branches.

    One branch produces the values, the other the gate; the output is
    `values * sigmoid(gate)`.

    Args:
        units: Width of both dense layers. When None, the last dimension of
            the input is used (resolved in `build`).
        virtual_batch_size: Forwarded to both BatchNormalization layers.
        momentum: Forwarded to both BatchNormalization layers.
    """

    def __init__(self, units: Optional[int] = None,
                 virtual_batch_size: Optional[int] = 128, 
                 momentum: Optional[float] = 0.02):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Creates the value and gate branches once the input width is known."""
        if self.units is None:
            self.units = input_shape[-1]

        bn_options = {
            "virtual_batch_size": self.virtual_batch_size,
            "momentum": self.momentum,
        }

        # Value branch (attribute names keep their original spelling).
        self.fc_outout = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn_outout = tf.keras.layers.BatchNormalization(**bn_options)

        # Gate branch.
        self.fc_gate = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn_gate = tf.keras.layers.BatchNormalization(**bn_options)

    def call(self, inputs: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None):
        """Returns the value branch gated by the sigmoid of the gate branch."""
        values = self.bn_outout(self.fc_outout(inputs), training=training)
        gate_logits = self.bn_gate(self.fc_gate(inputs), training=training)
        gate = tf.keras.activations.sigmoid(gate_logits)
        return values * gate

In [None]:
#Feature Transformer Block
#Here we again use subclassing to define a layer to represent either the shared 
#or independent steps of the 'Feature Transformer' in the diagram above. This block 
#comprises two GLU Blocks with a skip connection from the output of the first block 
#to the output of the second. I have had to add a flag to add a skip connection 
#over the first GLU Block, as this is only present in the decision-step-dependent block.

class FeatureTransformerBlock(tf.keras.layers.Layer):
    """Two stacked GLU blocks with a residual connection around the second.

    Args:
        units: Width of both GLU blocks. When None, the last dimension of the
            input is used (resolved in `build`).
        virtual_batch_size: Forwarded to both GLU blocks.
        momentum: Forwarded to both GLU blocks.
        skip: When equal to True, the block input is also added to the output
            of the first GLU block.
    """

    def __init__(self, units: Optional[int] = None, virtual_batch_size: Optional[int]=128, 
                 momentum: Optional[float] = 0.02, skip=False):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum
        self.skip = skip

    def build(self, input_shape: tf.TensorShape):
        """Creates the two GLU blocks once the input width is known."""
        if self.units is None:
            self.units = input_shape[-1]

        glu_options = {
            "units": self.units,
            "virtual_batch_size": self.virtual_batch_size,
            "momentum": self.momentum,
        }
        self.initial = GLUBlock(**glu_options)
        self.residual = GLUBlock(**glu_options)

    def call(self, inputs: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None):
        """Returns `(first + second) * sqrt(0.5)` for the two GLU outputs."""
        first = self.initial(inputs, training=training)

        # Optional skip connection over the first GLU block.
        if self.skip == True:
            first = first + inputs

        second = self.residual(first, training=training)

        # Residual sum, scaled by sqrt(0.5).
        return (first + second) * np.sqrt(0.5)

In [None]:
#Attention Block
#This block is simple to implement: prior to the actual mask 
#operation it is just a dense layer fed into a batch normalization layer, followed 
#by a sparsemax activation function. The major complication in this block is 
#how to handle the TabNet prior, used to encourage orthogonal feature selection 
#across decision steps. Here we just take it as an input to our layer and 
#leave the updates to our priors to the enclosing TabNet layers.

class AttentiveTransformer(tf.keras.layers.Layer):
    """Dense -> BatchNormalization -> (optional prior scaling) -> sparsemax.

    Args:
        units: Width of the dense layer. When None, the last dimension of the
            input is used (resolved in `build`).
        virtual_batch_size: Forwarded to the BatchNormalization layer.
        momentum: Forwarded to the BatchNormalization layer.
    """

    def __init__(self, units: Optional[int] = None, virtual_batch_size: Optional[int] = 128, 
                 momentum: Optional[float] = 0.02):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Creates the dense and batch-normalization layers."""
        if self.units is None:
            self.units = input_shape[-1]

        self.fc = tf.keras.layers.Dense(self.units, use_bias=False)
        self.bn = tf.keras.layers.BatchNormalization(
            virtual_batch_size=self.virtual_batch_size,
            momentum=self.momentum,
        )

    def call(self, inputs: Union[tf.Tensor, np.ndarray], priors: Optional[Union[tf.Tensor, np.ndarray]] = None, training: Optional[bool] = None) -> tf.Tensor:
        """Returns the sparsemax of the normalized projection, scaled by `priors` if given."""
        scores = self.bn(self.fc(inputs), training=training)

        # Without priors the normalized projection is used unscaled.
        if priors is not None:
            scores = scores * priors

        return tfa.activations.sparsemax(scores)
In [None]:
#TabNetStep
#In this TabNetStep block I take a number of design decisions to make 
#implementation and reusability simpler. At this layer we take as inputs our 
#batch-normalized features, the output of our shared feature transformer, 
#and our priors for the current step, and output the feature embedding at our 
#split point, the masked features to be used in the shared feature transformer block 
#of the next step, and the mask used in our attention operation. This mask will 
#be important as we move through the layers, ensuring new features are selected 
#across steps and providing local and global feature attributions for each 
#output. This block comprises our FeatureTransformerBlock and Attentive 
#Transformer block and starts to piece all our components together.

class TabNetStep(tf.keras.layers.Layer):
    """One decision step: step-specific feature transformer plus attention mask.

    Args:
        units: Width of the step-specific FeatureTransformerBlock. When None,
            the last dimension of the `build` input shape is used.
        virtual_batch_size: Forwarded to both sublayers.
        momentum: Forwarded to both sublayers.
    """

    def __init__(self, units: Optional[int] = None, virtual_batch_size: Optional[int]=128, 
                 momentum: Optional[float] =0.02):
        super().__init__()
        self.units = units
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

    def build(self, input_shape: tf.TensorShape):
        """Creates the step-specific transformer and the attentive transformer."""
        if self.units is None:
            self.units = input_shape[-1]

        shared_options = {
            "virtual_batch_size": self.virtual_batch_size,
            "momentum": self.momentum,
        }
        self.unique = FeatureTransformerBlock(units=self.units, skip=True, **shared_options)
        # The mask width follows the last dimension of the build input shape,
        # so the mask can be multiplied element-wise with `inputs` in `call`.
        self.attention = AttentiveTransformer(units=input_shape[-1], **shared_options)

    def call(self, inputs, shared, priors, training=None) -> Tuple[tf.Tensor]:  
        """Returns `(split, masked, keys)`.

        `split` is the step-specific transform of `shared`, `keys` is the
        attention mask computed from `split` and `priors`, and `masked` is
        `keys * inputs`.
        """
        split = self.unique(shared, training=training)
        keys = self.attention(split, priors, training=training)
        return split, keys * inputs, keys
In [None]:
#TabNetEncoder
#I opted to present the entire model architecture as a layer. This makes it 
#easier to work with between use cases, as we can apply TabNet in unsupervised, 
#self-supervised and multiple supervised domains without having to rewrite 
#large tracts of code each time. You will see here that we accumulate our feature 
#embeddings at each decision step, update our priors and compute our entropy 
#loss, used to limit how often features are reused across steps. This makes for 
#a complicated layer, but in many ways adds modularity, which is very useful going forward.

class TabNetEncoder(tf.keras.layers.Layer):
    """TabNet encoder: normalization, shared transformer, decision steps, dense head.

    Args:
        units: Width of the final bias-free dense layer.
        n_steps: Number of decision steps run after the initial step.
        n_features: Width of the shared transformer and of every step.
        outputs: Accepted for interface compatibility; not used by this layer.
        gamma: Value the batch-mean mask is subtracted from when updating the prior.
        epsilon: Small constant added inside the log of the entropy term.
        sparsity: Weight of the entropy loss registered via `add_loss`.
        virtual_batch_size: Forwarded to every sublayer that batch-normalizes.
        momentum: Forwarded to every sublayer that batch-normalizes.
    """

    def __init__(self, units: int =1, 
                 n_steps: int = 3, 
                 n_features: int = 8,
                 outputs: int = 1, 
                 gamma: float = 1.3,
                 epsilon: float = 1e-8, 
                 sparsity: float = 1e-5, 
                 virtual_batch_size: Optional[int]=128, 
                 momentum: Optional[float] =0.02):
        super().__init__()

        # Architecture.
        self.units = units
        self.n_steps = n_steps
        self.n_features = n_features

        # Batch-normalization settings.
        self.virtual_batch_size = virtual_batch_size
        self.momentum = momentum

        # Prior update and regularization settings.
        self.gamma = gamma
        self.epsilon = epsilon
        self.sparsity = sparsity

    def build(self, input_shape: tf.TensorShape):            
        """Creates the sublayers (creation order is kept from the original)."""
        bn_options = {
            "virtual_batch_size": self.virtual_batch_size,
            "momentum": self.momentum,
        }

        self.bn = tf.keras.layers.BatchNormalization(**bn_options)
        self.shared_block = FeatureTransformerBlock(units=self.n_features, **bn_options)
        self.initial_step = TabNetStep(units=self.n_features, **bn_options)
        self.steps = [
            TabNetStep(units=self.n_features, **bn_options)
            for _ in range(self.n_steps)
        ]
        self.final = tf.keras.layers.Dense(units=self.units, use_bias=False)

    def call(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> Tuple[tf.Tensor]:        
        """Runs the encoder and returns `(prediction, encoded, importance)`.

        `prediction` is the dense head applied to the summed ReLU'd step
        outputs, `encoded` is the sum of the raw step outputs, and
        `importance` is the sum of the masks fed into each loop step. The
        entropy of those masks, weighted by `sparsity`, is registered as a
        layer loss.
        """
        total_entropy = 0.
        encoded = 0.
        aggregated = 0.
        importance = 0.

        # The prior is a per-feature vector (batch mean of ones).
        prior = tf.reduce_mean(tf.ones_like(X), axis=0)

        normalized = prior * self.bn(X, training=training)
        shared = self.shared_block(normalized, training=training)
        # The initial step only supplies the first mask and masked features.
        _, masked, keys = self.initial_step(normalized, shared, prior, training=training)

        for decision_step in self.steps:
            # Entropy of the mask produced by the previous step.
            mask_entropy = tf.reduce_mean(
                tf.reduce_sum(-keys * tf.math.log(keys + self.epsilon), axis=-1))
            total_entropy = total_entropy + mask_entropy / tf.cast(self.n_steps, tf.float32)

            # Update the prior with the batch-mean mask and record the mask.
            prior = prior * (self.gamma - tf.reduce_mean(keys, axis=0))
            importance = importance + keys

            shared = self.shared_block(masked, training=training)
            split, masked, keys = decision_step(normalized, shared, prior, training=training)

            aggregated = aggregated + tf.keras.activations.relu(split)
            encoded = encoded + split

        self.add_loss(self.sparsity * total_entropy)

        return self.final(aggregated), encoded, importance

In [None]:
# Column groups of the churn data set: categorical columns are later turned
# into strings / integer codes, numeric columns are standardized.
CATEGORICAL_COLUMNS = ['line_stat', 'serv_type', 'serv_code',
                       'bandwidth', 'term_reas_code', 'term_reas_desc',
                       'with_phone_service', 'current_mth_churn']
NUMERIC_COLUMNS = ['contract_month', 'ce_expiry', 'secured_revenue', 'complaint_cnt']

# Read the CSV; non-numeric `complaint_cnt` entries are coerced to NaN.
# NOTE(review): the path is a hard-coded absolute location under /content.
df = pd.read_csv('/content/bbs_cust_base_scfy_20200210.csv').assign(complaint_cnt = lambda df: pd.to_numeric(df.complaint_cnt, 'coerce'))

# Numeric columns: cast to float32, fill missing values with the column mean,
# then standardize to zero mean / unit standard deviation.
df.loc[:, NUMERIC_COLUMNS] = df.loc[:, NUMERIC_COLUMNS].astype(np.float32).pipe(lambda df: df.fillna(df.mean())).pipe(lambda df: (df - df.mean())/df.std())

# Categorical columns: convert every value to a string.
df.loc[:, CATEGORICAL_COLUMNS] = df.loc[:, CATEGORICAL_COLUMNS].astype(str).applymap(str).fillna('')

# NOTE(review): inside the lambda, `df` is the per-group frame (it shadows the
# outer `df`), so `df.churn.value_counts().min()` looks like it is the group's
# own size. If so, each group is only shuffled, not downsampled to the
# minority-class size -- confirm the intent before relying on class balance.
df = df.groupby('churn').apply(lambda df: df.sample(df.churn.value_counts().min()))
df.head()

In [None]:
from sklearn.model_selection import train_test_split

def get_labels(x: pd.Series) -> pd.Series:
    """
    Encode the values of `x` as integer codes, one code per distinct value.

    Codes are assigned by `pd.factorize` in order of first appearance. The
    result keeps the name and index of `x`, so it can be used column-wise
    with `DataFrame.apply`.
    """
    codes = pd.factorize(x)[0]
    return pd.Series(codes, index=x.index, name=x.name)

# Numeric and categorical views shared by both feature encodings.
numeric_part = df.loc[:, NUMERIC_COLUMNS].astype('float32')
categorical_part = df.loc[:, CATEGORICAL_COLUMNS]

# X: numeric columns plus one-hot encoded categoricals.
X = numeric_part.join(pd.get_dummies(categorical_part))

# E: numeric columns plus integer-coded categoricals (codes shifted by +1).
E = numeric_part.join(categorical_part.apply(get_labels).add(1).astype('int32'))

# y: boolean target, True where churn == 'Y'.
y = df.churn == 'Y'

# X and y are split as NumPy arrays, E stays a DataFrame.
# NOTE(review): no random_state is passed, so the split differs between runs.
X_train, X_valid, E_train, E_valid, y_train, y_valid = train_test_split(
    X.to_numpy(), E, y.to_numpy(), train_size=250000, test_size=250000)

In [None]:
def get_feature(x: pd.DataFrame, dimension=1):
    """Builds a TensorFlow feature column for one data column.

    Although annotated as a DataFrame, the body reads `x.dtype`, `x.name` and
    `x.max()`, i.e. it is used with a single column.

    Args:
        x: Column to describe. float32 columns become numeric columns; any
            other dtype is treated as integer category ids.
        dimension: Embedding size for non-float32 columns.

    Returns:
        A numeric column, or an embedding column over an identity
        categorical column with `x.max() + 1` buckets and default value 0.
    """
    if x.dtype != np.float32:
        category_ids = tf.feature_column.categorical_column_with_identity(
            x.name, num_buckets=x.max() + 1, default_value=0)
        return tf.feature_column.embedding_column(category_ids, dimension=dimension)

    return tf.feature_column.numeric_column(x.name)
    
def df_to_dataset(X: pd.DataFrame, y: pd.Series, shuffle=False, batch_size=50000):
    """Wraps a feature frame and its labels in a batched, prefetched tf.data.Dataset.

    Args:
        X: Feature frame; copied and converted to a dict keyed by column name.
        y: Labels aligned with the rows of `X`; copied before use.
        shuffle: When true, shuffle with a buffer covering all `len(X)` rows.
        batch_size: Number of rows per batch.

    Returns:
        A dataset of `(features_dict, labels)` batches.
    """
    feature_dict = dict(X.copy())
    labels = y.copy()

    dataset = tf.data.Dataset.from_tensor_slices((feature_dict, labels))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(X))

    return dataset.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

# One feature column per column of the training frame.
# `DataFrame.items()` replaces `iteritems()`, which was removed in pandas 2.0;
# `items()` is available in older pandas versions as well.
columns = [get_feature(series) for _, series in E_train.items()]
feature_column = tf.keras.layers.DenseFeatures(columns, trainable=True)

# Batched datasets for training and validation (not shuffled, default batch size).
train, valid = df_to_dataset(E_train, y_train), df_to_dataset(E_valid, y_valid)

In [None]:
#The first application we will be looking at is supervised learning; this 
#is a primary aim of TabNet, so it is one we will explore. Here I tried to stick 
#to a number of hyperparameter defaults found in other implementations, exploring 
#only a smaller feature vector size for the purpose of visualization later on. 
#In my experiments this hampers the performance of the model greatly but, in my 
#implementation, greatly reduces the overall footprint of the model given 
#the use of weight sharing across the steps.

#Here I use TensorFlow 2's model subclassing approach to make 
#explanations and feature visualization easier later on. For 
#production use, the subclassing API does have some limitations in 
#how models can be serialized and deserialized, some of which have 
#been addressed in the TensorFlow 2.2 and 2.3 releases.

class TabNetClassifier(tf.keras.Model):
    """Binary/multi-label classifier: feature layer -> TabNet encoder -> sigmoid.

    Args:
        outputs: Number of output units of the encoder head.
        n_steps: Number of decision steps of a newly created encoder.
        n_features: Width of the transformer blocks of a newly created encoder.
        gamma: Prior relaxation value of a newly created encoder.
        epsilon: Entropy-log constant of a newly created encoder.
        sparsity: Entropy-loss weight of a newly created encoder.
        feature_column: Optional layer that turns the raw input (e.g. a dict
            of columns) into a dense tensor. When None, inputs are passed
            through unchanged.
        pretrained_encoder: Optional encoder layer to use instead of creating
            a new `TabNetEncoder`. It must return
            `(output, encoded, importance)` when called.
        virtual_batch_size: Ghost-batch size of a newly created encoder.
        momentum: Batch-normalization momentum of a newly created encoder.
    """

    def __init__(self, outputs: int = 1, 
                 n_steps: int = 3, 
                 n_features: int = 8,
                 gamma: float = 1.3, 
                 epsilon: float = 1e-8, 
                 sparsity: float = 1e-5, 
                 feature_column: Optional[tf.keras.layers.DenseFeatures] = None, 
                 pretrained_encoder: Optional[tf.keras.layers.Layer] = None,
                 virtual_batch_size: Optional[int] = 128, 
                 momentum: Optional[float] = 0.02):
        super(TabNetClassifier, self).__init__()
        
        self.outputs = outputs
        self.n_steps = n_steps
        self.n_features = n_features
        self.feature_column = feature_column
        self.pretrained_encoder = pretrained_encoder
        self.virtual_batch_size = virtual_batch_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.momentum = momentum
        self.sparsity = sparsity
        
        if feature_column is None:
            # Pass-through layer. The previous code referenced an undefined
            # name `identity`, which raised NameError on this branch.
            self.feature = tf.keras.layers.Lambda(tf.identity)
        else:
            self.feature = feature_column
            
        if pretrained_encoder is None:
            self.encoder = TabNetEncoder(units=outputs, 
                                        n_steps=n_steps, 
                                        n_features = n_features,
                                        outputs=outputs, 
                                        gamma=gamma, 
                                        epsilon=epsilon, 
                                        sparsity=sparsity,
                                        virtual_batch_size=self.virtual_batch_size, 
                                        momentum=momentum)
        else:
            self.encoder = pretrained_encoder

    def forward(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """Runs the full model and returns `(prediction, encoded, importance)`.

        `prediction` is the sigmoid of the encoder head output; `encoded` and
        `importance` are returned by the encoder unchanged. `training` is
        forwarded to the encoder so that batch normalization follows the
        caller's mode even when this method is called directly.
        """
        X = self.feature(X)
        output, encoded, importance = self.encoder(X, training=training)
          
        prediction = tf.keras.activations.sigmoid(output)
        return prediction, encoded, importance
    
    def call(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> tf.Tensor:
        """Returns only the sigmoid predictions (Keras entry point)."""
        prediction, _, _ = self.forward(X, training=training)
        return prediction
    
    def transform(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> tf.Tensor:
        """Returns only the encoder's accumulated feature embedding."""
        _, encoded, _ = self.forward(X, training=training)
        return encoded
    
    def explain(self, X: Union[tf.Tensor, np.ndarray], training: Optional[bool] = None) -> tf.Tensor:
        """Returns only the encoder's accumulated attention masks."""
        _, _, importance = self.forward(X, training=training)
        return importance

In [None]:
# Small classifier (2-wide transformer blocks) trained with Adam on binary
# cross-entropy for 100 epochs over the `train` dataset.
m = TabNetClassifier(
    outputs=1,
    n_steps=3,
    n_features=2,
    feature_column=feature_column,
    virtual_batch_size=250,
)
m.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.025),
    loss=tf.keras.losses.binary_crossentropy,
)
m.fit(train, epochs=100)

In [None]:
# Training-set accuracy: probabilities are thresholded at 0.5.
tf_tabnet_y_pred = m.predict(train)

accuracy_score(y_true=y_train, y_pred=tf_tabnet_y_pred > 0.5)

0.875036

In [None]:
# Validation-set accuracy: probabilities are thresholded at 0.5.
tf_tabnet_y_pred = m.predict(valid)

accuracy_score(y_true=y_valid, y_pred=tf_tabnet_y_pred > 0.5)

0.876124

# TabNet in TF 2.0 from Ostamand

https://github.com/ostamand/tensorflow-tabnet

## Notes:

The FeatureBlock function is nested within the FeatureTransformer

Need to understand how to fit this model, which requires understanding each sub-process block. If I can understand what is happening, then I can build my own NN with attention blocks, transformers, encoders, etc.

Notes 12/26

Try to write your own model using feature transformers & attention blocks using classes

(1) understand how to implement a model using classes

(2) understand how to assign tensorflow names using ->

(3) understand the building of the model with the loop & blocks

Understanding

TabNet uses soft feature selection via an embedding matrix

"[TabNet] embeds soft feature selection with controllable
sparsity via sequential attention."

In [None]:
from typing import List, Tuple

import tensorflow as tf
from tensorflow_addons.activations import sparsemax

In [None]:
class FeatureBlock(tf.keras.Model):
    """Dense -> ghost batch normalization -> optional GLU.

    Args:
        feature_dim: Output width of the block. With `apply_glu` the dense
            layer emits `2 * feature_dim` units, which the GLU halves.
        apply_glu: Whether to apply the gated linear unit after normalization.
        bn_momentum: Momentum forwarded to the ghost batch normalization.
        bn_virtual_divider: Virtual divider forwarded to the ghost batch
            normalization.
        fc: Optional existing dense layer to use (e.g. shared between
            blocks); a new bias-free Dense layer is created when None.
        epsilon: Forwarded to the ghost batch normalization. It was
            previously accepted but never passed on.
    """

    def __init__(
        self,
        feature_dim: int,
        apply_glu: bool = True,
        bn_momentum: float = 0.9,
        bn_virtual_divider: int = 32,
        fc: tf.keras.layers.Layer = None,
        epsilon: float = 1e-5,
    ):
        super(FeatureBlock, self).__init__()
        self.apply_glu = apply_glu
        # Misspelled legacy attribute, kept so existing readers of
        # `apply_gpu` keep working.
        self.apply_gpu = apply_glu
        self.feature_dim = feature_dim

        units = feature_dim * 2 if apply_glu else feature_dim

        self.fc = tf.keras.layers.Dense(units, use_bias=False) if fc is None else fc
        self.bn = GhostBatchNormalization(
            virtual_divider=bn_virtual_divider,
            momentum=bn_momentum,
            epsilon=epsilon,
        )

    def call(self, x, training: bool = None, alpha: float = 0.0):
        """Applies dense, normalization and (if enabled) the GLU to `x`.

        `alpha` is passed through to the normalization layer.
        """
        x = self.fc(x)
        x = self.bn(x, training=training, alpha=alpha)
        if self.apply_glu:
            return glu(x, self.feature_dim)
        return x

In [None]:
class AttentiveTransformer(tf.keras.Model):
    """Feature block without GLU, followed by prior scaling and sparsemax.

    NOTE(review): this reuses the name of the Keras-layer AttentiveTransformer
    defined earlier in the notebook, so running this cell shadows that class.

    Args:
        feature_dim: Output width of the inner feature block.
        bn_momentum: Momentum forwarded to the inner feature block.
        bn_virtual_divider: Virtual divider forwarded to the inner feature block.
    """

    def __init__(self, feature_dim: int, bn_momentum: float, bn_virtual_divider: int):
        super().__init__()
        block_options = {
            "bn_momentum": bn_momentum,
            "bn_virtual_divider": bn_virtual_divider,
            "apply_glu": False,
        }
        self.block = FeatureBlock(feature_dim, **block_options)

    def call(self, x, prior_scales, training=None, alpha: float = 0.0):
        """Returns the sparsemax of the block output scaled by `prior_scales`."""
        projected = self.block(x, training=training, alpha=alpha)
        scaled = projected * prior_scales
        return sparsemax(scaled)

In [None]:
def glu(x, n_units=None):
    """Gated linear unit: value columns scaled by the sigmoid of gate columns.

    Columns before `n_units` are the values and columns from `n_units` onward
    are the gate pre-activations. With the default `n_units=None` both slices
    cover every column, so the result is `x * sigmoid(x)`.
    """
    values = x[:, :n_units]
    gates = tf.nn.sigmoid(x[:, n_units:])
    return values * gates

In [None]:
class FeatureTransformer(tf.keras.Model):
    """Stack of `n_total` feature blocks with scaled residual connections.

    Args:
        feature_dim: Output width of every block.
        fcs: Dense layers to reuse for the first `len(fcs)` blocks (shared
            weights). The list is only read, never modified.
        n_total: Total number of blocks.
        n_shared: Number of leading blocks whose dense layers are exposed
            through `shared_fcs`.
        bn_momentum: Momentum forwarded to every block.
        bn_virtual_divider: Virtual divider forwarded to every block.
    """

    def __init__(
        self,
        feature_dim: int,
        fcs: List[tf.keras.layers.Layer] = [],
        n_total: int = 4,
        n_shared: int = 2,
        bn_momentum: float = 0.9,
        bn_virtual_divider: int = 1,
    ):
        super(FeatureTransformer, self).__init__()
        self.n_total, self.n_shared = n_total, n_shared

        kargs = {
            "feature_dim": feature_dim,
            "bn_momentum": bn_momentum,
            "bn_virtual_divider": bn_virtual_divider,
        }

        # Blocks with an index below len(fcs) reuse the supplied dense layer;
        # the remaining blocks create their own.
        self.blocks: List[FeatureBlock] = []
        for n in range(n_total):
            if fcs and n < len(fcs):
                self.blocks.append(FeatureBlock(**kargs, fc=fcs[n]))
            else:
                self.blocks.append(FeatureBlock(**kargs))

    def call(self, x: tf.Tensor, training: bool = None, alpha: float = 0.0) -> tf.Tensor:
        """Applies the blocks in order and returns the final activations.

        The first block has no residual. Every later block computes
        `(x + block(x)) * sqrt(0.5)`, i.e. the residual *sum* is scaled, which
        matches the reference encoder earlier in this notebook. The previous
        form, `x * sqrt(0.5) + block(x)`, scaled only the skip path.
        """
        scale = tf.sqrt(0.5)

        x = self.blocks[0](x, training=training, alpha=alpha)
        for n in range(1, self.n_total):
            x = (x + self.blocks[n](x, training=training, alpha=alpha)) * scale
        return x

    @property
    def shared_fcs(self):
        """Dense layers of the first `n_shared` blocks, for reuse elsewhere."""
        return [self.blocks[i].fc for i in range(self.n_shared)]

In [None]:
class GhostBatchNormalization(tf.keras.Model):
    """Batch normalization applied independently to chunks of the batch while training.

    Args:
        virtual_divider: Number of chunks the batch is split into during
            training (passed to `tf.split`, so the batch size must be
            divisible by it).
        momentum: Momentum of the wrapped normalization layer.
        epsilon: Epsilon of the wrapped normalization layer. It was
            previously accepted but not forwarded.
    """

    def __init__(
        self, virtual_divider: int = 1, momentum: float = 0.9, epsilon: float = 1e-5
    ):
        super(GhostBatchNormalization, self).__init__()
        self.virtual_divider = virtual_divider
        self.bn = BatchNormInferenceWeighting(momentum=momentum, epsilon=epsilon)

    def call(self, x, training: bool = None, alpha: float = 0.0):
        """Normalizes `x`.

        While training, `x` is split along axis 0 into `virtual_divider`
        chunks, each chunk is normalized separately, and the results are
        concatenated. Otherwise the whole batch is normalized in inference
        mode, with `alpha` passed to the wrapped layer.
        """
        if training:
            chunks = tf.split(x, self.virtual_divider)
            normalized = [self.bn(chunk, training=True) for chunk in chunks]
            return tf.concat(normalized, 0)
        return self.bn(x, training=False, alpha=alpha)

    @property
    def moving_mean(self):
        """Moving mean tracked by the wrapped layer (available once built)."""
        return self.bn.moving_mean

    @property
    def moving_variance(self):
        """Moving variance derived from the wrapped layer's moving statistics.

        The wrapped layer tracks the moving mean and the moving mean of
        squares but has no `moving_variance` attribute, so the variance is
        computed here as E[x^2] - E[x]^2 (available once built).
        """
        return self.bn.moving_mean_of_squares - tf.pow(self.bn.moving_mean, 2)


class BatchNormInferenceWeighting(tf.keras.layers.Layer):
    """Batch normalization whose inference statistics can blend in the current batch.

    The layer tracks a moving mean and a moving mean of squares over axis 0.
    In training mode it normalizes with the batch statistics and updates the
    moving values. In inference mode it normalizes with
    `alpha * batch_stat + (1 - alpha) * moving_stat`, so `alpha=0` uses only
    the moving statistics.

    Args:
        momentum: Weight kept on the old moving value at each update.
        epsilon: Added to the variance before the square root; defaults to
            `tf.keras.backend.epsilon()` when None.
    """

    def __init__(self, momentum: float = 0.9, epsilon: float = None):
        super(BatchNormInferenceWeighting, self).__init__()
        self.momentum = momentum
        if epsilon is None:
            epsilon = tf.keras.backend.epsilon()
        self.epsilon = epsilon

    def build(self, input_shape):
        """Create one scale, shift and pair of moving moments per last-axis channel."""
        channels = input_shape[-1]

        def per_channel(init_fn, trainable):
            return tf.Variable(
                initial_value=init_fn((channels,), tf.float32), trainable=trainable,)

        # Creation order is kept: gamma, beta, moving_mean, moving_mean_of_squares.
        self.gamma = per_channel(tf.ones, True)
        self.beta = per_channel(tf.zeros, True)
        self.moving_mean = per_channel(tf.zeros, False)
        self.moving_mean_of_squares = per_channel(tf.zeros, False)

    def __ema_update(self, var, value):
        # Exponential moving average, assigned in place.
        var.assign(var * self.momentum + (1 - self.momentum) * value)

    def __normalize(self, x, mean, variance):
        centered = x - mean
        scale = tf.sqrt(variance + self.epsilon)
        return self.gamma * centered / scale + self.beta

    def call(self, x, training: bool = None, alpha: float = 0.0):
        """Normalize `x` over axis 0; see the class docstring for the two modes."""
        batch_mean = tf.reduce_mean(x, axis=0)
        batch_mean_sq = tf.reduce_mean(tf.pow(x, 2), axis=0)

        if training:
            # update moving stats, then normalize with the raw batch moments
            self.__ema_update(self.moving_mean, batch_mean)
            self.__ema_update(self.moving_mean_of_squares, batch_mean_sq)
            mean, mean_sq = batch_mean, batch_mean_sq
        else:
            mean = alpha * batch_mean + (1 - alpha) * self.moving_mean
            mean_sq = alpha * batch_mean_sq + (1 - alpha) * self.moving_mean_of_squares

        # NOTE(review): E[x^2] - E[x]^2 may round slightly below zero in float32;
        # confirm whether a clamp is wanted before the sqrt.
        variance = mean_sq - tf.pow(mean, 2)
        return self.__normalize(x, mean, variance)

In [None]:
# Model
class TabNet(tf.keras.Model):
    def __init__(
        self,
        num_features: int,
        feature_dim: int,
        output_dim: int,
        feature_columns: List = None,
        n_step: int = 1,
        n_total: int = 4,
        n_shared: int = 2,
        relaxation_factor: float = 1.5,
        bn_epsilon: float = 1e-5,
        bn_momentum: float = 0.7,
        bn_virtual_divider: int = 1,
    ):
        """TabNet
        Will output a vector of size output_dim.
        Args:
            num_features (int): Number of features.
            feature_dim (int): Embedding feature dimension to use.
            output_dim (int): Output dimension.
            feature_columns (List, optional): If defined will add a DenseFeatures layer first. Defaults to None.
            n_step (int, optional): Total number of steps. `call` divides by it, so it must be at least 1.
                Defaults to 1.
            n_total (int, optional): Total number of feature transformer blocks. Defaults to 4.
            n_shared (int, optional): Number of shared feature transformer blocks. Defaults to 2.
            relaxation_factor (float, optional): >1 will allow features to be used more than once. Defaults to 1.5.
            bn_epsilon (float, optional): Epsilon of the input BatchNormalization layer. Defaults to 1e-5.
            bn_momentum (float, optional): Batch normalization, momentum. Defaults to 0.7.
            bn_virtual_divider (int, optional): Batch normalization. Full batch will be divided by this.
        """
        super(TabNet, self).__init__()
        self.output_dim, self.num_features = output_dim, num_features
        self.n_step, self.relaxation_factor = n_step, relaxation_factor
        self.feature_columns = feature_columns

        if feature_columns is not None:
            self.input_features = tf.keras.layers.DenseFeatures(feature_columns)

        self.bn = tf.keras.layers.BatchNormalization(momentum=bn_momentum, epsilon=bn_epsilon)

        # Each transformer emits feature_dim + output_dim units; `call` slices
        # the first output_dim columns for the output and the rest for the mask.
        kargs = {
            "feature_dim": feature_dim + output_dim,
            "n_total": n_total,
            "n_shared": n_shared,
            "bn_momentum": bn_momentum,
            "bn_virtual_divider": bn_virtual_divider,
        }

        # first feature transformer block is built first to get the shared blocks
        self.feature_transforms: List[FeatureTransformer] = [FeatureTransformer(**kargs)]
        self.attentive_transforms: List[AttentiveTransformer] = []

        # One extra feature transformer (reusing the shared fcs) and one
        # attentive transformer per decision step.
        for _ in range(n_step):
            self.feature_transforms.append(
                FeatureTransformer(**kargs, fcs=self.feature_transforms[0].shared_fcs)
            )
            self.attentive_transforms.append(
                AttentiveTransformer(num_features, bn_momentum, bn_virtual_divider)
            )

    def call(
        self, features: tf.Tensor, training: bool = None, alpha: float = 0.0
    ) -> Tuple[tf.Tensor, tf.Tensor, List[tf.Tensor]]:
        """Run the decision steps.

        Args:
            features: Model input; passed through DenseFeatures first when
                `feature_columns` was given.
            training: Forwarded to the batch-norm and transformer layers.
            alpha: Forwarded to the feature and attentive transformers.

        Returns:
            A 3-tuple `(out_agg, loss, masks)`:
            out_agg: Sum over steps 1..n_step of the ReLU'd first `output_dim`
                columns of each feature transformer output.
            loss: Sum over the mask-producing steps of
                mean_batch(sum_features(mask * log(mask + 1e-15))), divided by
                `n_step`. No sign flip is applied here.
            masks: One entry per mask-producing step, each the step's mask
                with a new leading axis and a new axis at position 3.
        """
        if self.feature_columns is not None:
            features = self.input_features(features)

        bs = tf.shape(features)[0]
        out_agg = tf.zeros((bs, self.output_dim))
        prior_scales = tf.ones((bs, self.num_features))
        masks = []

        features = self.bn(features, training=training)
        masked_features = features

        total_entropy = 0.0

        for step_i in range(self.n_step + 1):

            x = self.feature_transforms[step_i](masked_features, training=training, alpha=alpha)

            # step 0 only produces the first mask; it does not contribute output
            if step_i > 0:
                out = tf.keras.activations.relu(x[:, : self.output_dim])
                out_agg += out

            # no need to build the features mask for the last step
            if step_i < self.n_step:
                x_for_mask = x[:, self.output_dim :]

                mask_values = self.attentive_transforms[step_i](x_for_mask, prior_scales, training=training, alpha=alpha)

                # relaxation factor of 1 forces the feature to be only used once.
                prior_scales *= self.relaxation_factor - mask_values

                masked_features = tf.multiply(mask_values, features)

                # entropy is used to penalize the amount of sparsity in feature selection.
                # Accumulate across steps: the value is averaged over n_step
                # below, and plain assignment kept only the last step's term.
                # NOTE(review): the term is sum(m * log m) without a leading
                # minus; confirm the consumer of `loss` applies the sign it expects.
                total_entropy += tf.reduce_mean(
                    tf.reduce_sum(
                        tf.multiply(mask_values,
                                    tf.math.log(mask_values + 1e-15)),
                        axis=1,
                    )
                )

                masks.append(tf.expand_dims(tf.expand_dims(mask_values, 0), 3))

        loss = total_entropy / self.n_step

        return out_agg, loss, masks

## Fit Model

### Data

In [None]:
import tensorflow_datasets as tfds
def prepare_dataset(
    ds: tf.data.Dataset,
    batch_size: int,
    shuffle: bool = False,
    drop_remainder: bool = False,
    seed: int = None,
):
    """Shuffle (optionally), batch, and convert an image dataset for training.

    Args:
        ds: Dataset whose elements are dicts with "image" and "label" entries.
        batch_size: Number of elements per batch.
        shuffle: When True, shuffle with a buffer as large as the dataset.
        drop_remainder: Forwarded to `Dataset.batch`.
        seed: Shuffle seed. When None, the module-level `SEED` is used, which
            must then be defined before this function is called.

    Returns:
        A prefetched dataset of `(image, label)` batches, where `image` is
        float32, divided by 255 and flattened to `(batch, -1)`.
    """
    if shuffle:
        # Counting takes a full pass over the dataset, so it is only done when
        # the count is needed as the shuffle buffer size.
        size_of_dataset = ds.reduce(0, lambda x, _: x + 1).numpy()
        shuffle_seed = SEED if seed is None else seed
        ds = ds.shuffle(buffer_size=size_of_dataset, seed=shuffle_seed)
    ds = ds.batch(batch_size, drop_remainder=drop_remainder)

    @tf.function
    def prepare_data(features):
        image = tf.cast(features["image"], tf.float32)
        bs = tf.shape(image)[0]
        image = tf.reshape(image / 255.0, (bs, -1))
        return image, features["label"]

    autotune = tf.data.experimental.AUTOTUNE
    ds = ds.map(prepare_data, num_parallel_calls=autotune).prefetch(autotune)
    return ds

    # first 80% for train. remaining 20% for val & test dataset for final eval.
# Load three MNIST splits through tensorflow_datasets: the first 80% of
# "train", the last 20% of "train", and the "test" split.
ds_tr, ds_val, ds_test = tfds.load(
        name="mnist",
        split=["train[:80%]", "train[-20%:]", "test"],
        data_dir="mnist",
        shuffle_files=False,
    )

batch_size = 32
# SEED is read by prepare_dataset when it shuffles, so it has to be defined
# before the calls below.
SEED = 42
# Only the training split is shuffled and has its last partial batch dropped.
ds_tr = prepare_dataset(ds_tr, batch_size, shuffle=True, drop_remainder=True)
ds_val = prepare_dataset(ds_val, batch_size, shuffle=False, drop_remainder=False)
ds_test = prepare_dataset(ds_test, batch_size, shuffle=False, drop_remainder=False)

### Model

In [None]:
# Shared constants for the classifier below. "min_learning_rate" is not
# referenced anywhere in this cell.
# NOTE(review): 784 presumably matches the flattened 28x28 MNIST images - confirm.
DEFAULTS = {"num_features": 784, "n_classes": 10, "min_learning_rate": 1e-6} 

# TabNetClassifier is defined outside this cell.
model = TabNetClassifier(
        num_features=DEFAULTS["num_features"],
        feature_dim= 32,
        output_dim= 32,
        n_classes=DEFAULTS["n_classes"],
        n_step= 4,
        relaxation_factor=1.5,
        sparsity_coefficient=0.0001,
        bn_momentum=0.7,
        bn_virtual_divider=1,  # let's not use Ghost Batch Normalization. batch sizes are too small
        dp=0.0)

# Adam with gradient-norm clipping.
optimizer = tf.keras.optimizers.Adam(
        learning_rate=0.02,
        clipnorm=2,
    )

# from_logits=True: the loss applies the softmax itself, so the model is
# expected to output raw scores.
lossf = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

model.compile(
        optimizer,
        loss=lossf,
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy(name="accuracy")],
    )

In [None]:
# Train on the training split only. No epochs or validation_data are passed,
# so Keras uses its defaults and ds_val is not used here.
model.fit(ds_tr)

In [None]:
def glu(act, n_units):
  """Gated linear unit: first n_units columns gated by the sigmoid of the rest."""
  values = act[:, :n_units]
  gate = tf.nn.sigmoid(act[:, n_units:])
  return values * gate


class Transformer(layers.Layer):
  """Bias-free Dense projection to 2 * feature_dim units, then BatchNormalization."""

  def __init__(self,feature_dim):
    super(Transformer,self).__init__()
    # Twice the feature width: glu() later splits the output into two halves.
    self.dense = layers.Dense(feature_dim * 2, use_bias=False)
    self.bn = layers.BatchNormalization()

  def call(self,x,training=False):
    projected = self.dense(x)
    return self.bn(projected, training=training)


class MyModel(keras.Model):
  """Four Transformer + GLU stages with scaled skip connections and a 1-unit head.

  Each stage applies a Transformer and gates its output with glu(). From the
  second stage on, the gated output is added to the previous stage's gated
  output and the sum is multiplied by sqrt(0.5). The result goes through a
  ReLU and a Dense(1) layer.
  """

  def __init__(self,feature_dim):
    super(MyModel,self).__init__()
    self.feature_dim = feature_dim

    self.one = Transformer(self.feature_dim)
    self.two = Transformer(self.feature_dim)
    self.three = Transformer(self.feature_dim)
    self.four = Transformer(self.feature_dim)
    self.out = layers.Dense(1)

  def call(self,input_tensor):
    width = self.feature_dim

    # Stage 1: nothing to add yet, the gated output feeds stage 2 directly.
    gated_1 = glu(self.one(input_tensor), width)

    # Stages 2-4: the skip term is the previous stage's gated output (before
    # its own residual sum), matching the t1/t2/t3 bookkeeping this replaces.
    gated_2 = glu(self.two(gated_1), width)
    hidden = (gated_2 + gated_1) * np.sqrt(0.5)

    gated_3 = glu(self.three(hidden), width)
    hidden = (gated_3 + gated_2) * np.sqrt(0.5)

    gated_4 = glu(self.four(hidden), width)
    hidden = (gated_4 + gated_3) * np.sqrt(0.5)

    return self.out(tf.nn.relu(hidden))

# Not sure how to do the sequential attention, or the masked features
# I think I have the layers, probably not though

# X and y are not defined in this cell; they must already exist in the kernel.
# The model width is set to the number of columns of X.
model = MyModel(feature_dim = X.shape[1])

# No optimizer is passed, so Keras falls back to its default optimizer.
model.compile(
    loss = 'mse',
    metrics=[tf.keras.metrics.MeanSquaredError()]
)

model.fit(X,y,epochs=20,batch_size=40)


# Object-Oriented TensorFlow Models



## Subclassing

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist

In [None]:
# Load MNIST, then reshape the images to (N, 28, 28, 1) float32 arrays and
# divide by 255.
(x_train, y_train), (x_test,y_test) = mnist.load_data()
x_train = x_train.reshape(-1,28,28,1).astype("float32") / 255.0
x_test = x_test.reshape(-1,28,28,1).astype("float32") / 255.0

In [None]:
class CNNBlock(layers.Layer):
  """Conv2D with 'same' padding, followed by BatchNormalization and ReLU."""

  def __init__(self,out_channels,kernel_size=3):
    super(CNNBlock,self).__init__()
    self.conv = layers.Conv2D(out_channels, kernel_size, padding='same')
    self.bn = layers.BatchNormalization()

  def call(self,input_tensor, training=False):
    # `training` only matters to the batch-norm layer.
    normalized = self.bn(self.conv(input_tensor), training=training)
    return tf.nn.relu(normalized)

class ResBlock(layers.Layer):
  """Three CNNBlocks with a 1x1-conv skip connection, followed by max pooling.

  `channels` supplies the output channels of the three CNNBlocks by index.
  The skip path projects the block input to `channels[1]` channels so it can
  be added to the second CNNBlock's output before the third CNNBlock.
  """

  def __init__(self,channels):
    super(ResBlock,self).__init__()
    self.cnn1 = CNNBlock(channels[0])
    self.cnn2 = CNNBlock(channels[1])
    self.cnn3 = CNNBlock(channels[2])
    self.pooling = layers.MaxPooling2D()
    self.identity_mapping = layers.Conv2D(channels[1],1, padding='same')

  def call(self,input_tensor,training=False):
    main_path = self.cnn1(input_tensor,training=training)
    main_path = self.cnn2(main_path,training=training)
    shortcut = self.identity_mapping(input_tensor)
    merged = self.cnn3(main_path + shortcut, training=training)
    return self.pooling(merged)


class ResNet_Like(keras.Model):
  """Three ResBlocks, global average pooling, and a Dense layer of class logits."""

  def __init__(self,num_classes=10):
    super(ResNet_Like,self).__init__()
    self.block1 = ResBlock([32,32,64])
    # NOTE(review): 265 looks like a typo for 256 - confirm before changing,
    # since it alters the layer shapes.
    self.block2 = ResBlock([128,128,265])
    self.block3 = ResBlock([128,265,512])
    self.pool = layers.GlobalAveragePooling2D()
    self.classifier = layers.Dense(num_classes)

  def call(self,input_tensor,training=False):
    features = input_tensor
    for block in (self.block1, self.block2, self.block3):
      features = block(features,training=training)
    pooled = self.pool(features)
    return self.classifier(pooled)

  def model(self):
    """Wrap `call` in a functional Model over a (28, 28, 1) input, e.g. for summary()."""
    inputs = keras.Input(shape=(28,28,1))
    outputs = self.call(inputs)
    return keras.Model(inputs=[inputs], outputs=outputs)

model = ResNet_Like(num_classes=10)

# from_logits=True because the model's final Dense layer has no activation.
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)


In [None]:
model.fit(x_train,y_train,batch_size=64,epochs=1)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
model.model().summary()
model.evaluate(x_test,y_test,batch_size=64)

In [None]:
# Reload MNIST and flatten each image to a 784-long float32 vector divided by
# 255. This rebinds x_train / x_test from the earlier image-shaped cell.
(x_train, y_train), (x_test,y_test) = mnist.load_data()
x_train = x_train.reshape(-1,28*28).astype("float32") / 255.0
x_test = x_test.reshape(-1,28*28).astype("float32") / 255.0

class Dense(layers.Layer):
  """Hand-written fully connected layer: inputs @ w + b, with no activation.

  The weights are created in build(), so the input width is taken from the
  first input's last dimension.
  """

  def __init__(self,units):
    super(Dense,self).__init__()
    self.units = units

  def build(self,input_shape):
    in_features = input_shape[-1]
    self.w = self.add_weight(
        name = 'w',
        shape=(in_features,self.units),
        initializer='random_normal',
        trainable=True,
    )
    self.b = self.add_weight(
        name='b',
        shape=(self.units,),
        initializer = 'zeros',
        trainable=True,
    )

  def call(self,inputs):
    projected = tf.matmul(inputs,self.w)
    return projected + self.b




class MyRelu(layers.Layer):
  """Stateless layer that returns the element-wise maximum of x and 0."""

  def __init__(self):
    super().__init__()

  def call(self,x):
    rectified = tf.math.maximum(x,0)
    return rectified


class MyModel(keras.Model):
  """Two-layer MLP built from the custom Dense and MyRelu layers above.

  Dense(64) -> MyRelu -> Dense(num_classes); the output has no activation.
  """

  def __init__(self,num_classes=10):
    super(MyModel,self).__init__()
    # The custom Dense is used here; layers.Dense(64) and
    # layers.Dense(num_classes) were the built-in alternatives tried before.
    self.dense1 = Dense(64)
    self.dense2 = Dense(num_classes)
    self.relu = MyRelu()

  def call(self,input_tensor):
    hidden = self.dense1(input_tensor)
    activated = self.relu(hidden)
    return self.dense2(activated)

# Rebinds `model` to the MLP defined in this cell.
model = MyModel()

# from_logits=True because the last Dense layer applies no activation.
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"],
)
model.fit(x_train,y_train,batch_size=32,epochs=1)



<tensorflow.python.keras.callbacks.History at 0x7fa7a4dad438>

## Understanding Object Orientation

In [None]:
# with the class statement you define what you are building
# base classes seen so far: layers.Layer & keras.Model
class CustLay(layers.Layer):
  """Dense layer followed by batch normalization.

  The base class is `layers.Layer`; `Layers.layer` is not a defined name and
  raised a NameError as soon as the class statement ran.
  """

  # start by defining what __init__ creates
  def __init__(self,units):
    # super().__init__() runs the base Layer constructor before sub-layers are attached
    super(CustLay,self).__init__()

    # this layer has a fully connected dense layer & a batch norm layer
    self.dense = layers.Dense(units)
    self.bn = layers.BatchNormalization()

  # next define the call
  # the call applies the sub-layers created above to the x passed in
  def call(self, x):
    x = self.dense(x)
    x = self.bn(x)
    return x


In [None]:
class Flex(layers.Layer):
  """Dense-like layer whose input width is only fixed when it is first built.

  Computes inputs @ w + b, where w is drawn from a normal distribution and b
  starts at zero.
  """

  def __init__(self,out_features,**kwargs):
    super(Flex,self).__init__(**kwargs)
    self.out_features = out_features

  # creating the variables in build() is what allows a flexible input width
  def build(self,input_shape):
    in_features = input_shape[-1]
    initial_w = tf.random.normal([in_features,self.out_features])
    self.w = tf.Variable(initial_w, name='w')
    self.b = tf.Variable(tf.zeros([self.out_features]), name = 'b')

  def call(self,inputs):
    # random weight matrix times the inputs, plus the zero-initialised bias
    product = tf.matmul(inputs,self.w)
    return product + self.b

flex_dense = Flex(out_features=3)

# The layer has not been called yet, so build() has not created w and b.
# As a bare expression in the middle of a cell, this value is not displayed.
flex_dense.variables

# Call it, with predictably random results
print("Model results:", flex_dense(tf.constant([[2.0, 2.0, 2.0], [3.0, 3.0, 3.0]])))
# Last expression of the cell: shows w and b now that the layer has been built.
flex_dense.variables

Model results: tf.Tensor(
[[-5.2164702  2.7636783  2.7398803]
 [-7.8247046  4.1455173  4.1098204]], shape=(2, 3), dtype=float32)


[<tf.Variable 'flex_12/w:0' shape=(3, 3) dtype=float32, numpy=
 array([[-1.41189   ,  0.39761004,  1.5689397 ],
        [-0.8508605 , -0.85104465,  0.0049968 ],
        [-0.34548447,  1.8352737 , -0.20399626]], dtype=float32)>,
 <tf.Variable 'flex_12/b:0' shape=(3,) dtype=float32, numpy=array([0., 0., 0.], dtype=float32)>]

In [None]:
# flex_dense was built for 3 input features above; a 4-feature input no longer
# matches its weight matrix, and the resulting error is caught and printed.
try:
  print("Model results:", flex_dense(tf.constant([[2.0, 2.0, 2.0, 2.0]])))
except tf.errors.InvalidArgumentError as e:
  print("Failed:", e)

Failed: Matrix size-incompatible: In[0]: [1,4], In[1]: [3,3] [Op:MatMul]


In [None]:
class MySequentialModel(tf.keras.Model):
  """Two Flex layers applied one after the other.

  Args:
    name: Optional model name, forwarded to `tf.keras.Model`.
    **kwargs: Further keyword arguments for `tf.keras.Model`.
  """

  def __init__(self, name=None, **kwargs):
    # `name` used to be accepted and then dropped, so a caller's name never
    # reached Keras; forward it together with the other keyword arguments.
    super().__init__(name=name, **kwargs)

    # NOTE: the first layer is the module-level `flex_dense` instance, not a
    # new layer, so this model shares its weights and its built input width.
    self.dense_1 = flex_dense
    self.dense_2 = Flex(out_features=2)

  def call(self, x):
    x = self.dense_1(x)
    return self.dense_2(x)

# You have made a Keras model! Instantiate it, passing a name to the constructor.
my_sequential_model = MySequentialModel(name="the_model")

# Call it on a single 3-feature row; the weights are random, so the printed
# values differ between runs.
print("Model results:", my_sequential_model(tf.constant([[2.0, 2.0, 2.0]])))

Model results: tf.Tensor([[2.144764 3.754323]], shape=(1, 2), dtype=float32)


## Mixing with Functional API

In [None]:
# A look at mixing subclassed layers with the functional API: the custom Flex
# layers are called on a symbolic Input to wire up the graph.
inputs = tf.keras.Input(shape=[3,])

x = Flex(3)(inputs)
x = Flex(2)(x)

my_functional_model = tf.keras.Model(inputs=inputs,outputs=x)

my_functional_model.summary()

# The functional model is callable on concrete tensors like any other layer.
my_functional_model(tf.constant([[2.0, 2.0, 2.0]]))

## What are sharing & decision steps?


In [None]:
# I can build a simple model

## Basics

In [None]:
# Dense Layer
class Dense(layers.Layer):
  """Thin wrapper around `layers.Dense(units)`.

  `input_dim` is accepted but not used by this class.
  """

  def __init__(self,units,input_dim):
    super(Dense,self).__init__()
    self.dense = layers.Dense(units)

  def call(self, x):
    return self.dense(x)

# BN Layer
class Batch(layers.Layer):
  """Thin wrapper around a single `layers.BatchNormalization` layer."""

  def __init__(self):
    super(Batch,self).__init__()
    self.bn = layers.BatchNormalization()

  def call(self,x):
    normalized = self.bn(x)
    return normalized

# Ok so this model works
# my model has two fully connected layers
# the output dim is done in the __init__

class ModMod(keras.Model):
  """Dense(20) + ReLU, batch normalization, then Dense(output_dim).

  Args:
    name: Optional model name, forwarded to `keras.Model`.
    output_dim: Number of units of the final Dense layer.
  """

  def __init__(self,name=None,output_dim=10):
    # `name` used to be accepted but never passed on; forward it so a
    # caller-supplied name actually takes effect (None keeps the auto name).
    super(ModMod,self).__init__(name=name)
    self.dense1 = layers.Dense(20)
    # `self.batch` is created but call() below only uses `self.Batch`.
    self.batch = layers.BatchNormalization()
    self.Batch = Batch()
    self.dense2 = layers.Dense(output_dim)


  # the forward pass: dense1 -> relu -> Batch wrapper -> dense2
  def call(self,input):
    x = tf.nn.relu(self.dense1(input))
    x = self.Batch(x)
    return self.dense2(x)

In [None]:
# need to mix this with the functional API
# can you loop in the functional API?
# if so, I think you could recreate TabNet that way

# focus on sharing & decision step next
# create a transformer & see what that does/is

In [None]:
# White-wine quality data (semicolon-delimited CSV fetched over HTTP).
path = "https://raw.githubusercontent.com/shrikant-temburwar/Wine-Quality-Dataset/master/winequality-white.csv"
df = pd.read_csv(path,delimiter=";")

# Every column except 'quality' is a feature; 'quality' is the target.
X = df.loc[:,df.columns != 'quality']
y = df['quality']

from sklearn.model_selection import train_test_split
# A fixed random_state makes the split, and therefore the metric reported
# below, reproducible across kernel restarts.
x_train, x_test, y_train, y_test = train_test_split(X,y,random_state=42)

In [None]:
# ModMod() is built with its default output_dim while the loss is MSE against
# the single 'quality' column.
# NOTE(review): confirm a 10-unit output is intended for this regression target.
model = ModMod()
model.compile(optimizer="adam", loss="mse")

model.fit(x_train,y_train,epochs=10)
# Last expression of the cell: the test-set loss is displayed as the output.
model.evaluate(x_test,y_test)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


0.6614018678665161