In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_probability as tfp
from sklearn import preprocessing
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

ModuleNotFoundError: No module named 'sklearn'

Generate training data with varied noise

In [None]:
def f(x, sigma):
    """Evaluate the target curve ``(x - 5)**2`` with additive Gaussian noise.

    Parameters
    ----------
    x : numpy.ndarray
        Input locations; ``x.shape`` determines how many noise values are drawn.
    sigma : float or numpy.ndarray
        Factor multiplied onto the standard-normal draws. A value of 0 yields
        the noise-free curve.

    Returns
    -------
    numpy.ndarray
        The parabola evaluated at ``x`` plus the scaled noise.
    """
    # One standard-normal draw per element of x, taken from NumPy's global RNG.
    noise = sigma * np.random.randn(*x.shape)
    parabola = (x - 5) ** 2
    return parabola + noise
# Seed NumPy's global RNG (the one f draws from) so the noisy targets are
# identical on every Restart & Run All.
np.random.seed(42)

train_size = 300

# Inputs: train_size geometrically spaced points on [1, 10], as a column vector.
X = np.geomspace(1, 10, train_size).reshape(-1, 1)
# Noise-free reference curve.
Y_true = f(X, sigma=0.0)

# Input-dependent noise level: |x| - 4 strictly inside (4, 10), zero elsewhere.
noise_scale = np.where((X > 4) & (X < 10), np.abs(X) - 4, 0.0)
# One vectorised call replaces the per-row Python loop; Y keeps X's shape.
Y = f(X, sigma=noise_scale)
print(X.shape)

Visualize training data

In [None]:
# Figure sizes below are written in centimetres; matplotlib expects inches.
cm = 1 / 2.54
fig, ax = plt.subplots(
    nrows=1,
    figsize=(9 * cm, 8 * cm),
    sharey='row',
    dpi=80,
    facecolor='w',
    edgecolor='k',
)
fig.subplots_adjust(left=0.1, right=.98, top=0.98, bottom=0.15,
                    hspace=0.65, wspace=0.15)
# Noisy observations as markers, noise-free curve as a line.
ax.scatter(X, Y, marker='+', label='Training data')
ax.plot(X, Y_true, label='Truth')
ax.legend()

Loss function, distribution layer, and KL divergence

In [None]:
def NLL(y, distr):
  """Negative log-likelihood of ``y`` under the distribution ``distr``.

  Evaluates ``distr.log_prob(y)`` and flips its sign so the result can be
  minimised as a loss.
  """
  log_likelihood = distr.log_prob(y)
  return -log_likelihood

def normal_sp(params):
  """Build a Normal distribution from a two-column parameter tensor.

  Column 0 is used directly as the mean. Column 1 is multiplied by 0.05,
  passed through softplus and offset by 1e-3 to form the scale.
  Both columns come from the network, so both parameters are learnable.
  """
  loc = params[:, 0:1]
  raw_scale = params[:, 1:2]
  scale = 1e-3 + tf.math.softplus(0.05 * raw_scale)
  return tfp.distributions.Normal(loc=loc, scale=scale)

def _kl_per_training_row(q, p, _):
  """Return KL(q || p) divided by the number of rows in the training inputs ``X``.

  The third positional argument is accepted and ignored. ``X`` is looked up
  when the function is called, not when it is defined.
  """
  return tfp.distributions.kl_divergence(q, p) / X.shape[0]


# Kernel and bias KL terms use the same scaling.
kernel_divergence_fn = _kl_per_training_row
bias_divergence_fn = _kl_per_training_row

Build the network

In [None]:
def _flipout(units, activation=None):
    """Create a DenseFlipout layer with the settings shared by every layer.

    Each call builds a fresh mean-field normal bias posterior, uses the default
    multivariate normal bias prior, and attaches the module-level kernel and
    bias divergence functions.
    """
    return tfp.layers.DenseFlipout(
        units,
        bias_posterior_fn=tfp.layers.util.default_mean_field_normal_fn(),
        bias_prior_fn=tfp.layers.default_multivariate_normal_fn,
        kernel_divergence_fn=kernel_divergence_fn,
        bias_divergence_fn=bias_divergence_fn,
        activation=activation,
    )


inputs = tf.keras.layers.Input(shape=(X.shape[1],))

# Three hidden ReLU layers of 20, 32 and 20 units, created in that order.
hidden = inputs
for width in (20, 32, 20):
    hidden = _flipout(width, activation="relu")(hidden)
# Two outputs per example with no activation; normal_sp turns them into a Normal.
params = _flipout(2)(hidden)
dist = tfp.layers.DistributionLambda(normal_sp)(params)

# Model whose output is the distribution layer, compiled with the NLL loss.
model = Model(inputs=inputs, outputs=dist)
model.compile(Adam(learning_rate=0.0002), loss=NLL)
# Second model over the same layers that outputs the raw two-column parameters.
model_params = Model(inputs=inputs, outputs=params)
model.summary()

Train the network

In [None]:
# Training schedule: number of passes over (X, Y) and the mini-batch size.
epoch, batch_size = 10000, 150
model.fit(x=X, y=Y, batch_size=batch_size, epochs=epoch, verbose=1)

Test the network

In [None]:
# Conversion factor: centimetres expressed in inches.
cm = 1 / 2.54
plt.rcParams.update({"font.family": "Times New Roman", "font.size": 9})

# 50 evenly spaced test inputs on [-5, 15], as a column vector for the model.
X_test = np.linspace(-5, 15, 50).reshape(-1, 1)
# 100 separate predict calls, one column per call. The cast keeps Y_test in
# float64 regardless of the dtype predict returns.
Y_test = np.column_stack(
    [np.squeeze(model.predict(X_test)) for _ in range(100)]
).astype(np.float64)
# Per-input average and spread across the 100 calls.
Pred = Y_test.mean(axis=1)
Stdv = Y_test.std(axis=1)

# Flatten to 1-D: fill_between needs a one-dimensional x, and later cells reuse it.
X_test = X_test.squeeze()
fig, ax = plt.subplots(
    nrows=1,
    figsize=(17 * cm, 8 * cm),
    sharey='row',
    dpi=80,
    facecolor='w',
    edgecolor='k',
)
fig.subplots_adjust(left=0.1, right=.98, top=0.98, bottom=0.15,
                    hspace=0.65, wspace=0.15)
ax.plot(X_test, Pred, 'r-', label='Predictive mean')
ax.scatter(X, Y, marker='+', label='Measured')
band = 1.96 * Stdv
ax.fill_between(X_test, Pred + band, Pred - band,
                alpha=0.5, label='95% CI (+/- 1.96std)')
ax.legend()

Compute the model uncertainty

In [None]:
def compute_predictions_pbnn(model, examples):
    """Run one forward pass and summarise the distribution it returns.

    Parameters
    ----------
    model : callable
        Invoked once as ``model(examples)``. The result must expose ``mean()``
        and ``stddev()``, each returning an object with a ``numpy()`` method.
    examples : array-like
        Inputs passed to ``model`` unchanged.

    Returns
    -------
    tuple
        ``(mean, stdv, upper, lower)``. ``mean`` and ``stdv`` have their
        singleton axes squeezed out; ``upper`` and ``lower`` are
        ``mean + 1.96 * stdv`` and ``mean - 1.96 * stdv``.
    """
    distribution = model(examples)
    mean = np.squeeze(distribution.mean().numpy())
    stdv = np.squeeze(distribution.stddev().numpy())
    # Half-width of the 95% interval under a normal assumption.
    margin = 1.96 * stdv
    return mean, stdv, mean + margin, mean - margin


# Repeat the forward pass 100 times, storing one column of means and one of
# standard deviations per pass.
n_points = len(X_test)
Means = np.zeros((n_points, 100))
Stdvs = np.zeros((n_points, 100))
for draw in range(100):
    prediction_mean, prediction_stdv, upper, lower = compute_predictions_pbnn(model, X_test)
    Means[:, draw] = prediction_mean
    Stdvs[:, draw] = prediction_stdv

# Average and spread of the predicted means across passes, per test input.
Expected_Mean = Means.mean(axis=1)
Standdev_Mean = Means.std(axis=1)

In [None]:
# Plot the average predicted mean over the repeated forward passes, with a band
# showing how much that mean varies from pass to pass.
fig, ax = plt.subplots(1, figsize=(17*cm, 8*cm), sharey='row', dpi=80, facecolor='w', edgecolor='k')
plt.subplots_adjust(left=0.1, right=.98, top=0.98, bottom=0.15, hspace = 0.65, wspace=0.15)
ax.plot(X_test, Expected_Mean, 'r-', label='Predictive mean');
ax.scatter(X,Y, marker='+', label='Measured');
# Centre the band on the plotted curve (Expected_Mean). Pred, from the sampling
# cell, is a separate estimate and would leave the band offset from the line.
ax.fill_between(X_test,Expected_Mean+1.96*Standdev_Mean,Expected_Mean-1.96*Standdev_Mean,
                 alpha=0.5, label='95% CI (+/- 1.96std)')
ax.legend()