# **GANDALF: Gated Adaptive Network for Deep Automated Learning of Features**

*By Cristian Leo*

### **Import Libraries**

In [1]:
import numpy as np
import random
import pandas as pd
import json

# pytorch_tabular for GANDALF
from pytorch_tabular.tabular_model import TabularModel
from pytorch_tabular.models.gandalf import GANDALFConfig
from pytorch_tabular.config import DataConfig, OptimizerConfig, TrainerConfig
from pytorch_tabular.models.common.heads import LinearHeadConfig

# pytorch for MLP
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch import nn, optim

### **GANDALF components from Scratch**

In [2]:
def softmax(x):
    """
    Compute the softmax of x.

    Parameters:
    - x: numpy array of shape (N, D), where N is the number of samples and D is the number of classes.

    Returns:
    - softmax_output: numpy array of shape (N, D), the softmax probabilities for each class.
    """
    # Ensure numerical stability by subtracting the max value from each row
    x_max = np.max(x, axis=1, keepdims=True)
    exp_x = np.exp(x - x_max)
    sum_exp_x = np.sum(exp_x, axis=1, keepdims=True)
    softmax_output = exp_x / sum_exp_x
    return softmax_output.round(5) # rounding for better readability

def t_softmax(input, t=None, dim=-1):
    """
    Compute the temperature-scaled softmax of input.

    Parameters:
    - input: numpy array of shape (N, D), where N is the number of samples and D is the number of classes.
    - t: float, temperature parameter. If None, the default value of 0.5 is used. Higher values of t result in a softer probability distribution.
    - dim: int, the dimension along which the softmax is computed.

    Returns:
    - softmax_output: numpy array of shape (N, D), the softmax probabilities for each class.
    """
    if t is None:
        t = 0.5
    assert np.all(t >= 0.0)
    maxes = np.max(input, axis=dim, keepdims=True)
    input_minus_maxes = input - maxes

    w = np.maximum(input_minus_maxes + t, 0) + 1e-8 # this is the ReLU function with a small epsilon
    e_x = np.exp(input_minus_maxes + np.log(w))
    return (e_x / np.sum(e_x, axis=dim, keepdims=True)).round(5) # rounding for better readability


x = np.array([[1, 2, 3], [4, 5, 6]])
print("Input:")
print(x)
print("\n#### Softmax ####")
print(softmax(x))
print("\n#### Temperature-scaled softmax ####")
print(t_softmax(x, t=0.5))

Input:
[[1 2 3]
 [4 5 6]]

#### Softmax ####
[[0.09003 0.24473 0.66524]
 [0.09003 0.24473 0.66524]]

#### Temperature-scaled softmax ####
[[0. 0. 1.]
 [0. 0. 1.]]


In [3]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def tanh(x):
    return np.tanh(x)

def dropout(x, rate=0.0):
    """
    Apply dropout to the input array x.

    Parameters:
    - x: numpy array.
    - rate: float, dropout rate. Probability of setting a value to zero.

    Returns:
    - x: numpy array, output tensor after applying dropout
    """
    if rate > 0.0:
        keep_prob = 1 - rate
        mask = np.random.rand(*x.shape) < keep_prob
        return np.where(mask, x / keep_prob, 0)
    return x

class GatedFeatureLearningUnit:
    def __init__(self, n_features_in, n_stages, feature_sparsity=0.3, dropout=0.0):
        self.n_features_in = n_features_in
        self.n_stages = n_stages
        self.feature_sparsity = feature_sparsity
        self.dropout_rate = dropout
        self._build_network()

    def _create_feature_mask(self):
        feature_masks = np.concatenate([
            np.random.beta(a=random.uniform(0.5, 10.0), b=random.uniform(0.5, 10.0), size=(self.n_features_in,))
            for _ in range(self.n_stages)
        ]).reshape(self.n_stages, self.n_features_in)
        return feature_masks

    def _build_network(self):
        self.W_in = [np.random.randn(2 * self.n_features_in, 2 * self.n_features_in) for _ in range(self.n_stages)]
        self.b_in = [np.random.randn(2 * self.n_features_in) for _ in range(self.n_stages)]
        self.W_out = [np.random.randn(2 * self.n_features_in, self.n_features_in) for _ in range(self.n_stages)]
        self.b_out = [np.random.randn(self.n_features_in) for _ in range(self.n_stages)]
        self.feature_masks = self._create_feature_mask()

    def forward(self, x):
        h = x
        for d in range(self.n_stages):
            feature = t_softmax(self.feature_masks[d], t=self.feature_sparsity) * x

            # Gated feature learning unit
            h_in = np.dot(np.concatenate([feature, h], axis=-1), self.W_in[d]) + self.b_in[d]
            z = sigmoid(h_in[:, :self.n_features_in])
            r = sigmoid(h_in[:, self.n_features_in:])
            h_out = tanh(np.dot(np.concatenate([r * h, x], axis=-1), self.W_out[d]) + self.b_out[d])
            h = dropout((1 - z) * h + z * h_out, self.dropout_rate)
        return h
    
    def __call__(self, x):
        return self.forward(x)
    
    
glfu = GatedFeatureLearningUnit(n_features_in=4, n_stages=2, feature_sparsity=0.3, dropout=0.0)
x = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])
print("Input:")
print(x)
print("\nOutput:")
print(glfu(x))


Input:
[[1 2 3 4]
 [5 6 7 8]]

Output:
[[ 0.98712015  0.63038223  0.0652355   0.70431647]
 [ 4.34546121  0.06498121 -0.18498697 -0.98712291]]


In [4]:
class RSoftmax:
    """
    RSoftmax activation function.

    Parameters:
    - dim: int, the dimension along which the softmax is computed.
    - eps: float, small value to avoid division by zero.
    """
    def __init__(self, dim: int = -1, eps: float = 1e-8):
        self.dim = dim
        self.eps = eps

    @staticmethod
    def softmax(x, t=1.0, axis=-1):
        """Compute softmax values for each sets of scores in x."""
        e_x = np.exp((x - np.max(x, axis=axis, keepdims=True)) / t)
        return e_x / e_x.sum(axis=axis, keepdims=True)

    @classmethod
    def calculate_t(cls, input, r, dim: int = -1, eps: float = 1e-8):
        """
        Calculate the temperature parameter t for the RSoftmax function.

        Parameters:
        - input: numpy array of shape (N, D), where N is the number of samples and D is the number of classes.
        - r: float, the fraction of zeros in the output.
        - dim: int, the dimension along which the softmax is computed.
        - eps: float, small value to avoid division by zero.

        Returns:
        - t: numpy array of shape (N, 1), the temperature parameter for each sample.
        """
        assert np.all((0.0 <= r) & (r <= 1.0))

        maxes = np.max(input, axis=dim, keepdims=True)
        input_minus_maxes = input - maxes

        zeros_mask = np.exp(input_minus_maxes) == 0.0
        zeros_frac = zeros_mask.sum(axis=dim, keepdims=True).astype(float) / input.shape[dim]

        q = np.clip((r - zeros_frac) / (1 - zeros_frac), 0.0, 1.0)
        x_minus_maxes = input_minus_maxes * (~zeros_mask).astype(float)
        if q.ndim > 1:
            t = -np.quantile(x_minus_maxes, q.ravel(), axis=dim, keepdims=True) + eps
            t = np.squeeze(t, axis=dim).diagonal().reshape(-1, 1) + eps
        else:
            t = -np.quantile(x_minus_maxes, q, axis=dim) + eps
        return t

    def forward(self, input, r):
        t = self.calculate_t(input, r, self.dim, self.eps)
        return self.softmax(input, t, self.dim)
    
r_softmax = RSoftmax()
x = np.array([[1, 2, 3], [4, 5, 6]])
print("Input:")
print(x)
print("\n#### RSoftmax ####")
print(r_softmax.forward(x, r=0.1))

Input:
[[1 2 3]
 [4 5 6]]

#### RSoftmax ####
[[0.1729912  0.30150792 0.52550087]
 [0.1729912  0.30150792 0.52550087]]


### **Application of GANDALF using pytorch_tabular**

In [5]:
task_type = {
    'features': 'num', 
    'target': 'num'
}

# Select which TASK. Can be either a single TASK or a list of TASK
SEED = 42
BATCH_SIZE = 1024
MAX_EPOCHS = 100
EARLY_STOPPING = True
OPTIMIZER = "AdamW"
WEIGHT_DECAY = 1e-6
LR_SCHEDULER = "CosineAnnealingWarmRestarts"
LR_SCHEDULER_PARAMS = {"T_0": 10, "T_mult": 1, "eta_min": 1e-5}
LEARNING_RATE = 1e-3

MODEL_PARAMS = {
    "gflu_stages":10
}

In [6]:
var_type = task_type['features']
target_type = task_type['target']

X = pd.read_parquet('data/data_train.parquet')
y = pd.read_parquet(f'data/target_train.parquet')

split_ratio = 0.8
split = int(len(X)*split_ratio)

X_test = X.iloc[split:]
y_test = y.iloc[split:]

X = X.iloc[:split]
y = y.iloc[:split]

names = json.load(open('data/attribute_names.json'))
cat_indicator = json.load(open('data/categorical_indicator.json'))

# Find categorical and numerical features
cat_cols = [n for n, c in zip(names, cat_indicator) if c]
num_cols = list(set(names)-set(cat_cols))

# Assigning classification or regression as task based on target type
task_pt = "classification" #if target_type!="num" else "regression"

# PyTorch Tabular expects s single dataframe as input
X['target'] = y.values

# Cat Cols as Categorical dtype messes with the categorical encoding
X[cat_cols] = X[cat_cols].astype(str)
X_test[cat_cols] = X_test[cat_cols].astype(str)


In [7]:
# Define Configs | Check API for other options
data_config = DataConfig(
    target=[
        "target"
    ],
    continuous_cols=num_cols,
    categorical_cols=cat_cols,
    num_workers=3
)
trainer_config = TrainerConfig(
    batch_size=BATCH_SIZE,
    max_epochs=MAX_EPOCHS,
    early_stopping="valid_loss" if EARLY_STOPPING else None,  # Monitor valid_loss for early stopping
    early_stopping_mode="min",  # Set the mode as min because for val_loss, lower is better
    early_stopping_patience=5,  # No. of epochs of degradation training will wait before terminating
    checkpoints="valid_loss",  # Save best checkpoint monitoring val_loss
    load_best=True,  # After training, load the best checkpoint
    progress_bar="none",  # Turning off Progress bar
    trainer_kwargs=dict(enable_model_summary=False),  # Turning off model summary
#             fast_dev_run=True
)
optimizer_config = OptimizerConfig(
    optimizer=OPTIMIZER,
    optimizer_params={"weight_decay": WEIGHT_DECAY},
    lr_scheduler=LR_SCHEDULER,
    lr_scheduler_params=LR_SCHEDULER_PARAMS,
)
head_config = LinearHeadConfig(
    layers="",
    dropout=0.2,
    initialization=(  # No additional layer in head, just a mapping layer to output_dim
        "kaiming"
    ),
).__dict__  # Convert to dict to pass to the model config (OmegaConf doesn't accept objects)
model_config = GANDALFConfig(
    task=task_pt,
    learning_rate=LEARNING_RATE,
    head="LinearHead",  # Linear Head
    head_config=head_config,  # Linear Head Config
    **MODEL_PARAMS
)
# Initialize the Tabular Model
tabular_model = TabularModel(
    data_config=data_config,
    model_config=model_config,
    optimizer_config=optimizer_config,
    trainer_config=trainer_config,
    verbose=True,
    suppress_lightning_logger=True
)
# If you have separate VAL defined, you can pass in that. If not, PyTorch Tabular will automatically take a sample out of Training to use as Validation
tabular_model.fit(
    train=X,
    seed=SEED
)


/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/pytorch_lightning/callbacks/model_checkpoint.py:639: Checkpoint directory /Users/cristianleo/Documents/Documents - Cristian’s Laptop/GitHub/models-from-scratch-python/GANDALF/saved_models exists and is not empty.
/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:436: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.
/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:436: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.
/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/pytorch_lightning/loops/fit_loop.py:293: The number of training batches (15) is smaller than the logging interval Train

In [None]:
# Predict on Test Data
prediction = tabular_model.predict(X_test, progress_bar=None)
preds = prediction.iloc[:,1].values

preds = pd.DataFrame(preds)
preds.index = X_test.index
acc = (preds.values.round() == y_test.values).mean()
print(f"Accuracy: {acc:.2%}")

Accuracy: 0.77


### **Application of MLP using torch**

In [None]:
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, n_layers, dropout):
        super(MLP, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.n_layers = n_layers
        self.dropout = dropout
        self._build_network()
        
    def _build_network(self):
        self.layers = nn.ModuleList()
        self.layers.append(nn.Linear(self.input_dim, self.hidden_dim))
        self.layers.append(nn.ReLU())
        for _ in range(self.n_layers-1):
            self.layers.append(nn.Linear(self.hidden_dim, self.hidden_dim))
            self.layers.append(nn.ReLU())
            self.layers.append(nn.Dropout(self.dropout))
        self.layers.append(nn.Linear(self.hidden_dim, self.output_dim))
        
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

split = int(len(X)*0.8)
X = X.drop(columns='target')
X_train, X_val = X[:split].values.astype(np.float64), X[split:].values.astype(np.float64)
y_train, y_val = y[:split].values.astype(np.float64), y[split:].values.astype(np.float64)

# Assuming X_train, y_train, X_val, y_val are your datasets
train_dataset = TensorDataset(torch.from_numpy(X_train).float(), torch.from_numpy(y_train).float())
val_dataset = TensorDataset(torch.from_numpy(X_val).float(), torch.from_numpy(y_val).float())

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Initialize the model
mlp = MLP(input_dim=X_train.shape[1], hidden_dim=8, output_dim=1, n_layers=2, dropout=0.1)

# Loss function and optimizer
criterion = nn.MSELoss()  # For regression tasks
optimizer = optim.Adam(mlp.parameters(), lr=0.001)

# Training loop
n_epochs = 100

for epoch in range(n_epochs):
    mlp.train()
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = mlp(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
    
    # Validation loop
    mlp.eval()
    val_loss = 0
    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = mlp(inputs)
            val_loss += criterion(outputs, targets).item()
    
    val_loss /= len(val_loader)
    print(f'Epoch {epoch+1}, Validation Loss: {val_loss:.4f}')

Epoch 1, Validation Loss: 0.2374
Epoch 2, Validation Loss: 0.2228
Epoch 3, Validation Loss: 0.2157
Epoch 4, Validation Loss: 0.2105
Epoch 5, Validation Loss: 0.2065
Epoch 6, Validation Loss: 0.2020
Epoch 7, Validation Loss: 0.1993
Epoch 8, Validation Loss: 0.1946
Epoch 9, Validation Loss: 0.1916
Epoch 10, Validation Loss: 0.1887
Epoch 11, Validation Loss: 0.1851
Epoch 12, Validation Loss: 0.1834
Epoch 13, Validation Loss: 0.1824
Epoch 14, Validation Loss: 0.1786
Epoch 15, Validation Loss: 0.1813
Epoch 16, Validation Loss: 0.1758
Epoch 17, Validation Loss: 0.1731
Epoch 18, Validation Loss: 0.1721
Epoch 19, Validation Loss: 0.1728
Epoch 20, Validation Loss: 0.1705
Epoch 21, Validation Loss: 0.1695
Epoch 22, Validation Loss: 0.1679
Epoch 23, Validation Loss: 0.1705
Epoch 24, Validation Loss: 0.1656
Epoch 25, Validation Loss: 0.1751
Epoch 26, Validation Loss: 0.1694
Epoch 27, Validation Loss: 0.1631
Epoch 28, Validation Loss: 0.1628
Epoch 29, Validation Loss: 0.1635
Epoch 30, Validation Lo

In [None]:
preds = mlp(torch.from_numpy(X_test.values.astype(np.float64)).float())

acc = ((preds.detach().numpy().round() == y_test.values).mean())
print(f"Accuracy: {acc:.2%}")

Accuracy: 76.52%
