# Probabilistic Bayesian Neural Networks
https://keras.io/examples/keras_recipes/bayesian_neural_networks/

## Setup

### Ambiente
Creamos un ambiente con la paquetería necesaria

``conda create -n env_tf_bayes``

``conda activate env_tf_bayes``

``pip install tensorflow-probability``

``pip install tensorflow-datasets``

### Bibliotecas

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_datasets as tfds
import tensorflow_probability as tfp
import pandas as pd






## Create training and evaluation datasets

In [2]:
############# Código original
# def get_train_and_test_splits(train_size, batch_size=1):
#     # We prefetch with a buffer the same size as the dataset because th dataset
#     # is very small and fits into memory.
#     dataset = (
#         tfds.load(name="wine_quality", as_supervised=True, split="train")
#         .map(lambda x, y: (x, tf.cast(y, tf.float32)))
#         .prefetch(buffer_size=dataset_size)
#         .cache()
#     )
#     # We shuffle with a buffer the same size as the dataset.
#     train_dataset = (
#         dataset.take(train_size).shuffle(buffer_size=train_size).batch(batch_size)
#     )
#     test_dataset = dataset.skip(train_size).batch(batch_size)

#     return train_dataset, test_dataset

In [3]:
# def get_train_and_test_splits(train_size, batch_size=1):
#     # Importar datos
#     data = pd.read_csv("./../data/train_wine.csv")

#     # VARIABLE OBJETIVO Y VARIABLES INDEPENDIENTES
#     features = data.drop(columns=["quality", "id"])
#     labels = data["quality"]

#     # Convertir a tensores de TensorFlow
#     features_dict = {col: tf.convert_to_tensor(features[col].values, dtype=tf.float32) for col in features.columns}
#     labels_tensor = tf.convert_to_tensor(labels.values, dtype=tf.float32)

#     # Crear dataset de TensorFlow
#     dataset = tf.data.Dataset.from_tensor_slices((features_dict, labels_tensor))
#     dataset = dataset.cache().shuffle(len(data)).prefetch(buffer_size=tf.data.AUTOTUNE)

#     # Definir train_size y test_size correctamente
#     test_size = len(data) - train_size  # Corregir tamaño del dataset de prueba

#     train_dataset = dataset.take(train_size).batch(batch_size)
#     test_dataset = dataset.skip(train_size).take(test_size).batch(batch_size)  # Agregar `take(test_size)`

#     return train_dataset, test_dataset

In [38]:
def get_train_and_test_splits(train_size, batch_size=1):
    data = pd.read_csv("./../data/train_wine.csv")

    # Define features and labels
    features = data.drop(columns=["id", "quality"])
    labels = data["quality"]

    # Convert to dict of tf.Tensors
    features_dict = {
        col: tf.convert_to_tensor(features[col].values.reshape(-1, 1), dtype=tf.float32)
        for col in features.columns
    }
    labels_tensor = tf.convert_to_tensor(labels.values.reshape(-1, 1), dtype=tf.float32)

    # Create tf.data.Dataset
    dataset = tf.data.Dataset.from_tensor_slices((features_dict, labels_tensor))
    dataset = dataset.shuffle(len(data)).cache().prefetch(buffer_size=tf.data.AUTOTUNE)

    test_size = len(data) - train_size

    train_dataset = dataset.take(train_size).batch(batch_size)
    test_dataset = dataset.skip(train_size).take(test_size).batch(batch_size)

    return train_dataset, test_dataset


## Compile, train, and evaluate the model

In [39]:
hidden_units = [8, 8]
learning_rate = 0.001


def run_experiment(model, loss, train_dataset, test_dataset):

    model.compile(
        optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate),
        loss=loss,
        metrics=[keras.metrics.RootMeanSquaredError(name="rmse")],
    )

    print("Start training the model...")
    model.fit(train_dataset, epochs=num_epochs, validation_data=test_dataset)
    print("Model training finished.")
    _, rmse = model.evaluate(train_dataset, verbose=0)
    print(f"Train RMSE: {round(rmse, 3)}")

    print("Evaluating model performance...")
    _, rmse = model.evaluate(test_dataset, verbose=0)
    print(f"Test RMSE: {round(rmse, 3)}")

## Create model inputs

In [40]:
FEATURE_NAMES = [
    "fixed acidity",
    "volatile acidity",
    "citric acid",
    "residual sugar",
    "chlorides",
    "free sulfur dioxide",
    "total sulfur dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]


def create_model_inputs():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        inputs[feature_name] = layers.Input(
            name=feature_name, shape=(1,), dtype=tf.float32
        )
    return inputs

## Experiment 1: standard neural network
We create a standard deterministic neural network model as a baseline.

In [41]:
def create_baseline_model():
    inputs = create_model_inputs()
    input_values = [value for _, value in sorted(inputs.items())]
    features = keras.layers.concatenate(input_values)
    features = layers.BatchNormalization()(features)

    # Create hidden layers with deterministic weights using the Dense layer.
    for units in hidden_units:
        features = layers.Dense(units, activation="sigmoid")(features)
    # The output is deterministic: a single point estimate.
    outputs = layers.Dense(units=1)(features)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

Let's split the wine dataset into training and test sets, with 85% and 15% of the examples, respectively.

In [42]:
dataset_size = 4898
batch_size = 256
train_size = int(dataset_size * 0.85)
train_dataset, test_dataset = get_train_and_test_splits(train_size, batch_size)

Now let's train the baseline model. We use the MeanSquaredError as the loss function.

In [43]:
num_epochs = 100
mse_loss = keras.losses.MeanSquaredError()
baseline_model = create_baseline_model()
run_experiment(baseline_model, mse_loss, train_dataset, test_dataset)

Start training the model...
Epoch 1/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 57ms/step - loss: 33.2223 - rmse: 5.7636 - val_loss: 30.3474 - val_rmse: 5.5088
Epoch 2/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step - loss: 30.7395 - rmse: 5.5443 - val_loss: 28.5035 - val_rmse: 5.3389
Epoch 3/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step - loss: 28.9490 - rmse: 5.3804 - val_loss: 26.7583 - val_rmse: 5.1728
Epoch 4/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step - loss: 27.2485 - rmse: 5.2200 - val_loss: 25.0850 - val_rmse: 5.0085
Epoch 5/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step - loss: 25.6046 - rmse: 5.0601 - val_loss: 23.4732 - val_rmse: 4.8449
Epoch 6/100
[1m17/17[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step - loss: 24.0158 - rmse: 4.9005 - val_loss: 21.9260 - val_rmse: 4.6825
Epoch 7/100
[1m17/17[0m [32m━━━━━

We take a sample from the test set use the model to obtain predictions for them. Note that since the baseline model is deterministic, we get a single a point estimate prediction for each test example, with no information about the uncertainty of the model nor the prediction.

In [44]:
sample = 10
examples, targets = list(test_dataset.unbatch().shuffle(batch_size * 10).batch(sample))[
    0
]

predicted = baseline_model(examples).numpy()
for idx in range(sample):
    print(f"Predicted: {round(float(predicted[idx][0]), 1)} - Actual: {targets[idx]}")

Predicted: 6.0 - Actual: [7.]
Predicted: 5.6 - Actual: [6.]
Predicted: 5.3 - Actual: [5.]
Predicted: 5.4 - Actual: [6.]
Predicted: 6.1 - Actual: [6.]
Predicted: 5.4 - Actual: [5.]
Predicted: 6.1 - Actual: [5.]
Predicted: 5.4 - Actual: [4.]
Predicted: 5.4 - Actual: [5.]
Predicted: 6.0 - Actual: [6.]


## Experiment 2: Bayesian neural network (BNN)

The object of the Bayesian approach for modeling neural networks is to capture the epistemic uncertainty, which is uncertainty about the model fitness, due to limited training data.

The idea is that, instead of learning specific weight (and bias) values in the neural network, the Bayesian approach learns weight distributions - from which we can sample to produce an output for a given input - to encode weight uncertainty.

Thus, we need to define prior and the posterior distributions of these weights, and the training process is to learn the parameters of these distributions.

In [50]:
# # Define the prior weight distribution as Normal of mean=0 and stddev=1.
# # Note that, in this example, the we prior distribution is not trainable,
# # as we fix its parameters.
# def prior(kernel_size, bias_size, dtype=None):
#     n = kernel_size + bias_size
#     prior_model = keras.Sequential(
#         [
#             tfp.layers.DistributionLambda(
#                 lambda t: tfp.distributions.MultivariateNormalDiag(
#                     loc=tf.zeros(n), scale_diag=tf.ones(n)
#                 )
#             )
#         ]
#     )
#     return prior_model


# # Define variational posterior weight distribution as multivariate Gaussian.
# # Note that the learnable parameters for this distribution are the means,
# # variances, and covariances.
# def posterior(kernel_size, bias_size, dtype=None):
#     n = kernel_size + bias_size
#     posterior_model = keras.Sequential(
#         [
#             tfp.layers.VariableLayer(
#                 tfp.layers.MultivariateNormalTriL.params_size(n), dtype=dtype
#             ),
#             tfp.layers.MultivariateNormalTriL(n),
#         ]
#     )
#     return posterior_model


In [58]:
import tensorflow_probability as tfp
tfd = tfp.distributions

def prior_fn(dtype, shape, name, trainable, add_variable_fn):
    return tfd.Independent(
        tfd.Normal(loc=tf.zeros(shape, dtype=dtype), scale=1.0),
        reinterpreted_batch_ndims=len(shape)
    )

def posterior_fn(dtype, shape, name, trainable, add_variable_fn):
    loc = add_variable_fn(
        name=name + '_loc',
        shape=shape,
        initializer=tf.initializers.random_normal(stddev=0.1),
        dtype=dtype,
        trainable=True
    )
    scale_raw = add_variable_fn(
        name=name + '_scale_raw',
        shape=shape,
        initializer=tf.initializers.random_normal(mean=-3.0, stddev=0.1),
        dtype=dtype,
        trainable=True
    )
    scale = tf.nn.softplus(scale_raw)
    return tfd.Independent(
        tfd.Normal(loc=loc, scale=scale),
        reinterpreted_batch_ndims=len(shape)
    )


In [61]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

FEATURE_NAMES = [
    "fixed acidity", "volatile acidity", "citric acid", "residual sugar",
    "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density",
    "pH", "sulphates", "alcohol"
]

# Dummy batch of data (2 samples)
sample_inputs = {
    name: tf.constant([[1.0], [2.0]], dtype=tf.float32)
    for name in FEATURE_NAMES
}
sample_labels = tf.constant([[5.0], [6.0]], dtype=tf.float32)
dataset = tf.data.Dataset.from_tensor_slices((sample_inputs, sample_labels)).batch(2)


In [62]:
def create_model_inputs():
    return {
        name: layers.Input(name=name, shape=(1,), dtype=tf.float32)
        for name in FEATURE_NAMES
    }

def create_bnn_model(train_size=2):
    inputs = create_model_inputs()
    input_values = [inputs[name] for name in FEATURE_NAMES]
    x = layers.concatenate(input_values)
    x = layers.BatchNormalization()(x, training=False)

    x = tfp.layers.DenseVariational(
        units=8,
        make_prior_fn=prior_fn,
        make_posterior_fn=posterior_fn,
        kl_weight=1.0 / train_size,
        activation="sigmoid"
    )(x)

    outputs = layers.Dense(1)(x)
    return keras.Model(inputs=inputs, outputs=outputs)


In [63]:
model = create_bnn_model()
example = next(iter(dataset))
output = model(example[0])  # 🧨 If this works, you're good!
print("✅ Output shape:", output.shape)

AttributeError: 'tuple' object has no attribute 'rank'

We use the tfp.layers.DenseVariational layer instead of the standard keras.layers.Dense layer in the neural network model.

In [59]:
def create_bnn_model(train_size):
    inputs = create_model_inputs()

    input_values = [inputs[feature] for feature in FEATURE_NAMES]
    features = keras.layers.concatenate(input_values)
    features = layers.BatchNormalization()(features, training=False)  # <-- CRITICAL FIX

    for units in hidden_units:
        features = tfp.layers.DenseVariational(
            units=units,
            make_prior_fn=prior_fn,           # use your fixed versions!
            make_posterior_fn=posterior_fn,
            kl_weight=1.0 / train_size,
            activation="sigmoid",
        )(features)

    outputs = layers.Dense(units=1)(features)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

The epistemic uncertainty can be reduced as we increase the size of the training data. That is, the more data the BNN model sees, the more it is certain about its estimates for the weights (distribution parameters). Let's test this behaviour by training the BNN model on a small subset of the training set, and then on the full training set, to compare the output variances.

### Train BNN with a small training subset.

In [60]:
num_epochs = 500
train_sample_size = int(train_size * 0.3)
small_train_dataset = train_dataset.unbatch().take(train_sample_size).batch(batch_size)

bnn_model_small = create_bnn_model(train_sample_size)
run_experiment(bnn_model_small, mse_loss, small_train_dataset, test_dataset)

AttributeError: 'tuple' object has no attribute 'rank'

In [29]:
class DebugLayer(tf.keras.layers.Layer):
    def __init__(self, msg=""):
        super().__init__()
        self.msg = msg

    def call(self, inputs):
        tf.print(f"🔍 {self.msg} - inputs type:", type(inputs))
        tf.print(f"🔍 {self.msg} - inputs shape:", inputs.shape if hasattr(inputs, "shape") else "NO SHAPE")
        return inputs

In [36]:
def create_bnn_model(train_size):
    inputs = create_model_inputs()

    # Important: preserve order of features to match dataset
    input_values = [inputs[feature] for feature in FEATURE_NAMES]
    x = layers.concatenate(input_values)
    x = layers.BatchNormalization()(x, training=False)  # <- critical fix

    for i, units in enumerate(hidden_units):
        x = tfp.layers.DenseVariational(
            units=units,
            make_prior_fn=prior_fn,           # Make sure it's the fixed callable
            make_posterior_fn=posterior_fn,
            kl_weight=1.0 / train_size,
            activation="sigmoid",
        )(x)

    outputs = layers.Dense(1)(x)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model



In [37]:
bnn_model_small = create_bnn_model(train_sample_size)
pred = bnn_model_small(example_batch[0])  # ✅ Should no longer crash
print(pred.shape)

AttributeError: 'tuple' object has no attribute 'rank'

Since we have trained a BNN model, the model produces a different output each time we call it with the same input, since each time a new set of weights are sampled from the distributions to construct the network and produce an output. The less certain the mode weights are, the more variability (wider range) we will see in the outputs of the same inputs.

In [None]:
def compute_predictions(model, iterations=100):
    predicted = []
    for _ in range(iterations):
        predicted.append(model(examples).numpy())
    predicted = np.concatenate(predicted, axis=1)

    prediction_mean = np.mean(predicted, axis=1).tolist()
    prediction_min = np.min(predicted, axis=1).tolist()
    prediction_max = np.max(predicted, axis=1).tolist()
    prediction_range = (np.max(predicted, axis=1) - np.min(predicted, axis=1)).tolist()

    for idx in range(sample):
        print(
            f"Predictions mean: {round(prediction_mean[idx], 2)}, "
            f"min: {round(prediction_min[idx], 2)}, "
            f"max: {round(prediction_max[idx], 2)}, "
            f"range: {round(prediction_range[idx], 2)} - "
            f"Actual: {targets[idx]}"
        )


compute_predictions(bnn_model_small)

### Train BNN with the whole training set.

In [None]:
num_epochs = 500
bnn_model_full = create_bnn_model(train_size)
run_experiment(bnn_model_full, mse_loss, train_dataset, test_dataset)

compute_predictions(bnn_model_full)

In [70]:
def create_bnn_model(train_size=2):
    inputs = create_model_inputs()
    input_values = [inputs[name] for name in FEATURE_NAMES]
    x = layers.concatenate(input_values)
    x = SafeBatchNorm()(x)

    x = tfp.layers.DenseVariational(
        units=8,
        make_prior_fn=prior_fn(),
        make_posterior_fn=posterior_fn(),
        kl_weight=1.0 / train_size,
        activation="sigmoid"
    )(x)

    outputs = layers.Dense(1)(x)
    return keras.Model(inputs=inputs, outputs=outputs)

In [66]:
class SafeBatchNorm(layers.Layer):
    def __init__(self, **kwargs):
        super().__init__()
        self.bn = layers.BatchNormalization(**kwargs)

    def call(self, inputs, training=False):
        output = self.bn(inputs, training=training)
        if isinstance(output, tuple):
            return output[0]
        return output

In [71]:
model = create_bnn_model()
example = next(iter(dataset))
output = model(example[0])
print("✅ Output shape:", output.shape)


AttributeError: 'tuple' object has no attribute 'rank'

In [68]:
import tensorflow_probability as tfp
tfd = tfp.distributions

def prior_fn():
    return lambda dtype, shape, name, trainable, add_variable_fn: tfd.Independent(
        tfd.Normal(loc=tf.zeros(shape, dtype=dtype), scale=1.0),
        reinterpreted_batch_ndims=len(shape)
    )

def posterior_fn():
    return lambda dtype, shape, name, trainable, add_variable_fn: tfd.Independent(
        tfd.Normal(
            loc=add_variable_fn(
                name + '_loc',
                shape=shape,
                initializer=tf.initializers.random_normal(stddev=0.1),
                dtype=dtype,
                trainable=trainable,
            ),
            scale=tf.nn.softplus(add_variable_fn(
                name + '_scale',
                shape=shape,
                initializer=tf.initializers.random_normal(mean=-3.0, stddev=0.1),
                dtype=dtype,
                trainable=trainable,
            )),
        ),
        reinterpreted_batch_ndims=len(shape),
    )


NameError: name 'x' is not defined

## Experiment 3: probabilistic Bayesian neural network
So far, the output of the standard and the Bayesian NN models that we built is deterministic, that is, produces a point estimate as a prediction for a given example. We can create a probabilistic NN by letting the model output a distribution. In this case, the model captures the aleatoric uncertainty as well, which is due to irreducible noise in the data, or to the stochastic nature of the process generating the data.

In this example, we model the output as a IndependentNormal distribution, with learnable mean and variance parameters. If the task was classification, we would have used IndependentBernoulli with binary classes, and OneHotCategorical with multiple classes, to model distribution of the model output.

In [None]:
def create_probablistic_bnn_model(train_size):
    inputs = create_model_inputs()
    features = keras.layers.concatenate(list(inputs.values()))
    features = layers.BatchNormalization()(features)

    # Create hidden layers with weight uncertainty using the DenseVariational layer.
    for units in hidden_units:
        features = tfp.layers.DenseVariational(
            units=units,
            make_prior_fn=prior,
            make_posterior_fn=posterior,
            kl_weight=1 / train_size,
            activation="sigmoid",
        )(features)

    # Create a probabilisticå output (Normal distribution), and use the `Dense` layer
    # to produce the parameters of the distribution.
    # We set units=2 to learn both the mean and the variance of the Normal distribution.
    distribution_params = layers.Dense(units=2)(features)
    outputs = tfp.layers.IndependentNormal(1)(distribution_params)

    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

Since the output of the model is a distribution, rather than a point estimate, we use the negative loglikelihood as our loss function to compute how likely to see the true data (targets) from the estimated distribution produced by the model.

In [None]:
def negative_loglikelihood(targets, estimated_distribution):
    return -estimated_distribution.log_prob(targets)


num_epochs = 1000
prob_bnn_model = create_probablistic_bnn_model(train_size)
run_experiment(prob_bnn_model, negative_loglikelihood, train_dataset, test_dataset)

Now let's produce an output from the model given the test examples. The output is now a distribution, and we can use its mean and variance to compute the confidence intervals (CI) of the prediction.

In [None]:
prediction_distribution = prob_bnn_model(examples)
prediction_mean = prediction_distribution.mean().numpy().tolist()
prediction_stdv = prediction_distribution.stddev().numpy()

# The 95% CI is computed as mean ± (1.96 * stdv)
upper = (prediction_mean + (1.96 * prediction_stdv)).tolist()
lower = (prediction_mean - (1.96 * prediction_stdv)).tolist()
prediction_stdv = prediction_stdv.tolist()

for idx in range(sample):
    print(
        f"Prediction mean: {round(prediction_mean[idx][0], 2)}, "
        f"stddev: {round(prediction_stdv[idx][0], 2)}, "
        f"95% CI: [{round(upper[idx][0], 2)} - {round(lower[idx][0], 2)}]"
        f" - Actual: {targets[idx]}"
    )

In [None]:
print(f"Tamaño del dataset de prueba: {len(list(test_dataset))}")

In [None]:
data = pd.read_csv("./../data/WineQT.csv")
print(f"Tamaño total del dataset: {len(data)}")

In [None]:
dataset_size = 1143
train_size = int(dataset_size * 0.85)
test_size = dataset_size - train_size

print(f"Train size: {train_size}, Test size: {test_size}")

In [None]:
test_dataset = dataset.skip(train_size).take(test_size).batch(batch_size)

print(f"Ejemplos en test_dataset después de skip: {len(list(test_dataset))}")

In [8]:
data = pd.read_csv("./../data/train_wine.csv")

In [9]:
data.head()

Unnamed: 0,id,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,0,7.7,0.63,0.0,2.4,0.078,4.0,14.0,0.9965,3.31,0.53,9.2,5.0
1,1,6.8,0.48,0.32,2.5,0.086,33.0,58.0,0.9974,3.53,0.49,9.7,6.0
2,2,6.4,0.59,0.01,2.8,0.086,3.0,10.0,0.99716,3.45,0.49,9.5,6.0
3,3,7.0,0.74,0.24,2.1,0.072,14.0,28.0,0.99498,3.37,0.52,10.4,5.0
4,4,11.5,0.32,0.32,2.8,0.082,14.0,37.0,0.9956,3.6,0.65,12.3,6.0
