# Overview
This notebook contains the source code corresponding to the paper for tanh-normalization.
The example dataset here is a synthetic data set; but the model architecture can supports any other dataset.
Set synthetic=False to use a real world dataset (specific information and license at: https://www.kaggle.com/datasets/alexteboul/diabetes-health-indicators-dataset)

## Imports

In [1]:
import copy
import gc
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from scipy.optimize import minimize_scalar
from scipy.stats import norm
from scipy.stats import wasserstein_distance
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from torch import optim
from tqdm.notebook import tqdm
from ucimlrepo import fetch_ucirepo

## Hyper-parameters

In [None]:
CONFIG = {"dataset_name": "synthetic",
          "model_name": "FeedForward",
          "device": torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu'),
          "lr": 0.001,
          "weight_decay": 1e-4,
          "batch_size": 256,
          "pin_memory": True,
          "epochs": 150,
          "n_times": 10,
          "verbose_rate": 5,
          "n_workers": 8,
          "precomputed": False,
          "n_classes": 10
          }
print(CONFIG['device'], 'Device count:', torch.cuda.device_count())

## Dataset
This code defines a custom (synthetic) dataset class and a function to generate training and test data.
The CustomDataset class takes in data, labels, means and std as inputs.
The data is first normalized using the given means and standard deviation, and stored as a tensor.
The labels are stored as-is.
The len method returns the length of the dataset.
The getitem method returns the data and label at the given index.
The get_dataset function generates a synthetic classification dataset using scikit-learn's make_classification function.
It splits the dataset into training and test sets, and calculates the means and standard deviation of the training data.
The function returns the training data, test data, training labels, test labels, and the means and standard deviation of the training data

In [None]:
# Define a custom dataset class
class CustomDataset(torch.utils.data.Dataset):

    # Initialize the dataset with data, labels, means, and standard deviations
    def __init__(self, data, labels, means, std):
        # Store the data normalized by mean and standard deviation
        self.data = torch.tensor((data - means) / std).float()
        # Store the labels
        self.labels = labels

    # Define the length of the dataset
    def __len__(self):
        return len(self.data)

    # Define how to get an item from the dataset
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]


# Define a function to create the dataset and split it into training and testing sets
def get_dataset(synthetic=True):
    if synthetic:
        # Generate synthetic data with the specified number of classes, informative features, and total features
        X, y = make_classification(n_samples=10000, n_classes=CONFIG["n_classes"], n_informative=10, n_features=12)
    else:
        # fetch dataset 
        cdc_diabetes_health_indicators = fetch_ucirepo(id=891)
        # data (as pandas dataframes) 
        X = cdc_diabetes_health_indicators.data.features
        y = cdc_diabetes_health_indicators.data.targets.astype(np.int32)

    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    # Calculate the mean and standard deviation of the training data
    mean, std = X_train.mean(), X_train.std()
    # Return the training and testing data along with their labels and mean and standard deviation
    return X_train, X_test, y_train, y_test, mean, std

## Optimal Tanh-Normalization
This cell defines a function called calculate_optimal_values(), which calculates the optimal alpha values for a given dataset.
First, the training data is converted into a NumPy array. Then, a Gaussian distribution is created with a specified quantile value. This Gaussian distribution is used to optimize the alpha values for each feature in the dataset. The func(x, idx) function takes in an alpha value and an index for a feature and returns the Wasserstein distance between the distribution of the hyperbolic tangent of the alpha multiplied by that feature and the Gaussian distribution created earlier. The minimize_scalar function from scipy is used to find the optimal alpha value for each feature. The bounds parameter sets the lower and upper bounds for the optimization, and the args parameter passes the index of the feature being optimized to the func() function. The method parameter specifies the optimization method. The alphas list is filled with the optimal alpha value for each feature. These values are then converted to a PyTorch tensor and printed. Finally, the alphas tensor is returned from the function.

In [None]:
# Define a function to calculate optimal alpha values
def calculate_optimal_values():
    # Convert training data to numpy array
    data_std = train_set.data.numpy()

    # Define a quantile value and create a Gaussian distribution
    q = 0.001
    x = np.linspace(norm.ppf(q), norm.ppf(1 - q), int(1 / q))
    y = norm.pdf(x) * len(x)
    z = torch.tensor([item for sublist in [[i] * int(j) for i, j in zip(x, y)] for item in sublist])
    gaussian = 2 * (z - z.min()) / (z.max() - z.min()) - 1

    # Define a function to optimize alpha values
    def func(x, idx):
        # Apply hyperbolic tangent to data
        t = np.tanh(x * data_std[:, idx])
        # Calculate Wasserstein distance between distribution of t and Gaussian distribution
        wd = wasserstein_distance(t, gaussian)
        return wd

    # Optimize alpha values for each feature in the data
    alphas = []
    for i in range(data_std.shape[1]):
        # Use scipy's minimize_scalar function to find the optimal alpha value for each feature
        alphas.append(minimize_scalar(func, bounds=(0, 1), method='bounded', args=i)["x"])
        # Print a progress indicator for each feature
        print(">", end="")
    print(".")
    # Convert alpha values to a tensor and print them
    alphas = torch.tensor(alphas)
    print("Alphas:", alphas)

    # Return alpha values
    return alphas

# Normalization & training set-up

In [None]:
X_train, X_test, y_train, y_test, mean, std = get_dataset()
train_set = CustomDataset(X_train, y_train, mean, std)
test_set = CustomDataset(X_test, y_test, mean, std)

In [None]:
alphas = calculate_optimal_values()

In [None]:
pd.DataFrame(train_set.data.numpy()).plot.kde()
plt.title("Standardized")
pd.DataFrame(np.tanh(train_set.data.numpy() * 0.01)).plot.kde()
plt.title("Tanh alpha=0.01")
pd.DataFrame(np.tanh(train_set.data.numpy() * alphas.numpy())).plot.kde()
plt.title("Tanh alpha=optimal")

## Neural Model

In [None]:
# Define a neural network model
class MainModel(nn.Module):
    def __init__(self, in_features, n_classes, layer_size, layer_count, activation_function):
        super().__init__()

        # Define a helper function to return a set of layers to be repeated
        def _get_items():
            return nn.Linear(layer_size, layer_size), nn.LayerNorm(layer_size), activation_function()

        # Define the input layer and output layer, and create a sequence of layers with layer_count repetitions
        self.in_layer = nn.Sequential(nn.Linear(in_features, layer_size), activation_function())
        self.seq = nn.Sequential(*[x for _ in range(layer_count) for x in _get_items()])
        self.out_layer = nn.Linear(layer_size, n_classes)

    # Define the forward pass of the neural network
    def forward(self, x):
        return self.out_layer(self.seq(self.in_layer(x)))


# Define a variable hyperbolic tangent activation function with variable alpha values
class VariableTanh(nn.Module):
    def __init__(self, alphas, random_init=False, fixed=True):
        super().__init__()

        # Initialize alpha values with random values if specified
        if random_init:
            alphas = torch.rand_like(alphas)

        # Calculate the inverse sigmoid of alpha values and reshape to be a row vector
        inv_sig = torch.log(alphas / (1 - alphas))
        self.a = inv_sig.reshape(1, -1).to(CONFIG['device']).float()

        # Convert alpha values to trainable parameters if not fixed
        if not fixed:
            self.a = nn.Parameter(self.a)

        # Initialize alpha history
        self.alpha_history = []

    # Calculate alpha values
    @property
    def alphas(self):
        return torch.sigmoid(self.a)

    # Record alpha values for each epoch
    def record_alpha(self, epoch):
        self.alpha_history.append([epoch] + self.alphas.detach().flatten().tolist())

    # Define the forward pass of the activation function
    def forward(self, x):
        return torch.tanh(self.alphas * x)

## Training logic

In [None]:
# Define a function for running analysis, which takes in three parameters: path, instances, and alphas.
def run_analysis(path, instances, alphas):
    # Get the number of classes from the global variable CONFIG dictionary.
    n_classes = CONFIG['n_classes']
    # Print the number of classes to the console.
    print(f"[num_classes={n_classes}]")
    # Loop over the number of times specified in the global variable CONFIG dictionary.
    for run_idx in tqdm(range(CONFIG['n_times'])):
        # Calculate the number of features by flattening the array of alphas and getting its length.
        in_features = len(alphas.flatten())
        # Create a new instance of the MainModel class with the specified configuration settings.
        original_model = MainModel(in_features=in_features, n_classes=n_classes, layer_size=64, layer_count=4,
                                   activation_function=nn.SiLU)
        # Check if this is the first run. If it is, include the header when writing to the output CSV file.
        include_header = run_idx == 0
        # Loop over the instances (datasets) in reverse order.
        for name, train_loader, test_loader, transform in reversed(instances):
            # Create dictionaries to store the training and test loss values for this instance.
            train_loss_dict, test_loss_dict = {}, {}
            # We want the same weight initialization on all comparisons, so make a deep copy of the original model.
            model = copy.deepcopy(original_model).to(CONFIG["device"])
            # Create a new instance of the VariableTanh class with the specified transformation settings.
            var_tanh = VariableTanh(**transform)
            # Combine the VariableTanh layer with the model using a Sequential container.
            model = nn.Sequential(var_tanh, model)
            # Create dictionaries to store the training and test loss values for this instance.
            train_loss_dict[name] = pd.DataFrame()
            test_loss_dict[name] = pd.DataFrame()
            # Create a new instance of the AdamW optimizer with the specified learning rate and weight decay.
            optimizer = optim.AdamW(model.parameters(), lr=CONFIG['lr'], weight_decay=CONFIG['weight_decay'])
            # Create a new instance of the CosineAnnealingLR learning rate scheduler with the specified maximum number of epochs.
            scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CONFIG['epochs'])
            # Create a new instance of the CrossEntropyLoss loss function.
            criterion = nn.CrossEntropyLoss()
            # Print the type of instance (dataset) to the console.
            print("=" * 40)
            print('Type: ' + name)
            # Initialize lists to store the training and test loss values, learning rates, and best scores seen so far.
            test_loss = []
            train_loss = []
            lrs = []
            best_train, best_test = 0, 0
            # Loop over the specified number of epochs.
            for epoch in tqdm(range(CONFIG['epochs'])):
                # Record the alpha values at each epoch, if the model has a 'record_alpha' attribute.
                if hasattr(model, 'record_alpha'):
                    model.record_alpha(epoch)
                # Get the current learning rate from the scheduler.
                _lr = scheduler.get_last_lr()[0]
                lrs.append(_lr)
                # Record the starting time of this epoch.
                start = time.time()
                # Initialize a variable to store the running loss.
                running_loss = 0.0
                # Initialize a variable to keep track of the batch index.
                k = -1
                for i, _data in enumerate(train_loader):
                    k += 1  # Increment the batch index counter.
                    inputs, labels = _data
                    inputs = inputs.to(CONFIG['device'])
                    labels = labels.to(CONFIG['device'], non_blocking=True)
                    # Learn
                    optimizer.zero_grad()  # Reset gradients to zero.
                    outputs = model.forward(inputs)  # Perform forward pass.
                    loss = criterion(outputs, labels)  # Calculate loss.
                    loss.backward()  # Perform backward pass and compute gradients.
                    optimizer.step()  # Update weights using calculated gradients.
                    # Store statistics.
                    running_loss += loss.item()  # Add the batch loss to the running loss.
                scheduler.step()  # Update the learning rate scheduler.
                # Get scores
                if epoch % CONFIG['verbose_rate'] == 0 or epoch == CONFIG['epochs'] - 1:
                    with torch.no_grad():
                        model.eval()  # Set the model to evaluation mode.
                        for stuff in [(train_loss, train_loader), (test_loss, test_loader)]:
                            record, loader = stuff
                            labels_all = []
                            predicted_all = []
                            for _data in loader:
                                inputs, labels = _data
                                inputs = inputs.to(CONFIG['device'])
                                labels = labels.to(CONFIG['device'], non_blocking=True)
                                if name == "Trainable WD-Tanh":
                                    # Apply tanh activation function (if required).
                                    inputs = model.apply_tanh(inputs)
                                # Perform forward pass.
                                outputs = model.forward(inputs)
                                # Get the predicted classes.
                                _, predicted = torch.max(outputs.detach(), 1)
                                # Append the true classes to the labels list.
                                labels_all.extend(labels.squeeze().tolist())
                                # Append the predicted classes to the predicted list.
                                predicted_all.extend(predicted.squeeze().tolist())
                                # Calculate the accuracy score.
                            score = accuracy_score(labels_all, predicted_all)
                            # Append the score to the corresponding record.
                            record.append(score)
                        # Set the model back to training mode.
                        model.train()
                    # Calculate the time taken for the epoch.
                    minutes, seconds = divmod(time.time() - start, 60)
                    # Update the best training loss so far.
                    best_train = max(best_train, train_loss[-1])
                    # Update the best testing loss so far.
                    best_test = max(best_test, test_loss[-1])
                    # Print the statistics for the epoch.
                    print(('[%d | %d] Lr: %.3f \tLoss: %.3f \tTrain score: %.2f |\tTest score: %.2f \t(%d min. %d s.)' %
                           (run_idx, epoch, _lr, running_loss, train_loss[-1], test_loss[-1], minutes, seconds)))
            # Print final alpha value for the Tanh function if its name matches.
            if name == "Tanh [trainable]":
                print(f"Final alpha={var_tanh.alphas.flatten()}")

            # Clear GPU memory cache if GPU is used.
            if CONFIG['device'] != "cpu":
                with torch.cuda.device(CONFIG['device']):
                    torch.cuda.empty_cache()

            # Print best train and test losses for the current method.
            print(f"{name}:\tBest: train_loss={round(best_train, 3)} & test_loss={round(best_test, 3)}")

            # Assuming train_loss_dict[name] and test_loss_dict[name] are DataFrame objects
            train_loss_dict[name] = pd.concat([train_loss_dict[name], pd.DataFrame(list(enumerate(train_loss)))])
            test_loss_dict[name] = pd.concat([test_loss_dict[name], pd.DataFrame(list(enumerate(test_loss)))])

            # Update the columns of the train and test loss dataframes with 'Epoch' and 'Score' and add 'Type' and 'Method' columns.
            for t, d in [('Test', test_loss_dict), ('Train', train_loss_dict)]:
                d[name].columns = ["Epoch", "Score"]
                d[name]["Type"] = t
                d[name]["Method"] = name
                d[name]["Epoch"] *= CONFIG['verbose_rate']

            # Concatenate the train and test loss dataframes and write to a CSV file.
            d = pd.concat(list(test_loss_dict.values()) + list(train_loss_dict.values()))
            d.to_csv(path, index=False, mode='w' if include_header else 'a', header=include_header)

            # If the model has an 'alpha_history' attribute, write its value to a CSV file.
            if hasattr(model, 'alpha_history'):
                pd.DataFrame(model.alpha_history).to_csv("alpha_history_" + path, index=False,
                                                         mode='w' if include_header else 'a', header=include_header)

            # Set include_header flag to False after the first iteration of writing to the CSV file.
            include_header = False

            # Free up memory by deleting the optimizer and model objects and running the garbage collector.
            del optimizer
            del model
            gc.collect()

            # Print a message indicating that the information has been stored to the CSV file.
            print("Stored to csv.!")

    # Print a message indicating that the training process has finished.
    print('Finished!')
    # Read in the CSV file specified by 'path' and store it in '_results_csv'.
    _results_csv = pd.read_csv(path)
    # Return the '_results_csv' and 'lrs' variables.
    return _results_csv, lrs

## Code execution

In [None]:
def execute(dataset_name):
    # Define path to save results
    path = f"{dataset_name}_{CONFIG['model_name']}_dataset.csv"
    # print(path)

    # Define transformations for activation function
    transform_train = {'alphas': alphas, 'fixed': False}
    transform_tanh_1 = {'alphas': torch.tensor(0.9999), 'fixed': True}
    transform_tanh_01 = {'alphas': torch.tensor(0.1), 'fixed': True}
    transform_tanh_001 = {'alphas': torch.tensor(0.01), 'fixed': True}
    transform_tanh_opt = {'alphas': alphas, 'fixed': True}

    # Create a list of all transformations with names
    all_transforms = [("Tanh [optimal]", transform_tanh_opt), ("Tanh [trainable]", transform_train),
                      ("Tanh [0.01]", transform_tanh_001), ("Tanh [0.1]", transform_tanh_01),
                      ("Tanh [1.0]", transform_tanh_1)]

    # Create a list to store dataset loaders for each transformation
    instances = []

    # Loop over all transformations and create a dataset loader for each one
    for name, transform in all_transforms:
        train_set = CustomDataset(X_train, y_train, mean, std)
        test_set = CustomDataset(X_test, y_test, mean, std)
        train_dl_stdz = torch.utils.data.DataLoader(train_set, batch_size=CONFIG['batch_size'], shuffle=True,
                                                    pin_memory=CONFIG['pin_memory'], num_workers=CONFIG['n_workers'])
        test_dl_stdz = torch.utils.data.DataLoader(test_set, batch_size=CONFIG['batch_size'], shuffle=True,
                                                   pin_memory=CONFIG['pin_memory'], num_workers=CONFIG['n_workers'])
        instances.append((name, train_dl_stdz, test_dl_stdz, transform))

    # Run training.
    _ = run_analysis(path, instances, alphas)
    print(f"done.")
    # Load the results from CSV and plot the scores for each epoch and transformation
    results_csv = pd.read_csv(path)
    sns.lineplot(data=results_csv, x="Epoch", y="Score", hue="Method", style="Type", markers=True)
    print(f"done.")


In [None]:
# Execute, run code and print info.
execute(CONFIG['dataset_name'])

In [None]:
def print_alpha_history(path, title=""):
    # Read CSV file into a pandas dataframe.
    df = pd.read_csv(path)
    # Convert the dataframe to long form using the `melt()` function.
    dfm = df.melt(id_vars=['Epoch', 'Type', 'Method'], value_vars=['Score'])
    # Create a line plot using seaborn, with the x-axis using the first column of the dataframe, the y-axis
    # using the "value" column created by the `melt()` function, and the lines colored based on the "Method" column.
    g = sns.lineplot(data=dfm, x=df.columns[0], y="value", hue='Method', style="Type", markers=False)
    # Set the title of the plot.
    g.set_title(title)
    # Plot the figure.
    g.plot()

In [None]:
path = f"synthetic_FeedForward_dataset.csv"
print_alpha_history(path, "Accuracy: Synthetic data set")