## Code to build likelihood ratio estimator

Estimating the ratio

$r = \frac{P(D_W, D_P, \theta)}{P(D_W, \theta) P(D_P)}$

In [None]:
import tensorflow as tf
import tensorflow_probability as tfp
import tqdm
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import random
from tensionnet import wmapplanck


In [None]:
# Adam optimiser (legacy Keras implementation) shared by the training step.
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=1e-3)

# Classifier network: an input-width sigmoid layer, two 100-unit sigmoid
# hidden layers (each followed by batch normalisation) and a single linear
# output unit. The layers are created in the same order as they are stacked.
_stack = [
    tf.keras.layers.Dense(norm_data.shape[1], activation='sigmoid'),
    tf.keras.layers.BatchNormalization(),
]
for _ in range(2):
    _stack.append(tf.keras.layers.Dense(
        100,
        activation='sigmoid',
        kernel_initializer=tf.keras.initializers.GlorotNormal(),
    ))
    _stack.append(tf.keras.layers.BatchNormalization())
_stack.append(tf.keras.layers.Dense(
    1,
    activation='linear',
    kernel_initializer=tf.keras.initializers.GlorotNormal(),
))

model = tf.keras.models.Sequential(_stack)

In [None]:
@tf.function(jit_compile=True)
def _test_step(param, truth):

    r"""
    Compute the binary cross-entropy loss of the network on a held-out
    set. No gradients are taken and no weights are updated.

    Parameters
    ----------
    param:
        Network inputs, one row per example.
    truth:
        Binary labels, one per row of ``param``.

    Returns
    -------
    Scalar tensor holding the binary cross-entropy between ``truth`` and
    the sigmoid of the network output.
    """
    # training=False: the BatchNormalization layers must use their moving
    # statistics here. With training=True they would normalise with the
    # statistics of the evaluation batch and also update their moving
    # averages from held-out data.
    prediction = tf.transpose(model(param, training=False))[0]
    # The last layer is linear, so squash to a probability before the loss.
    prediction = tf.keras.layers.Activation('sigmoid')(prediction)
    truth = tf.convert_to_tensor(truth)
    loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)(truth, prediction)
    return loss

@tf.function(jit_compile=True)
def _train_step(params, truth):

    r"""
    Perform one optimisation step on a single batch: evaluate the binary
    cross-entropy loss, back-propagate it through the network and let the
    optimizer update the trainable variables.

    Parameters
    ----------
    params:
        Network inputs for the batch, one row per example.
    truth:
        Binary labels, one per row of ``params``.

    Returns
    -------
    Scalar tensor holding the loss of this batch (computed before the
    update is applied).
    """

    bce = tf.keras.losses.BinaryCrossentropy(from_logits=False)
    squash = tf.keras.layers.Activation('sigmoid')

    with tf.GradientTape() as tape:
        # Forward pass in training mode; take the single output column.
        raw_output = tf.transpose(model(params, training=True))[0]
        probabilities = squash(raw_output)
        targets = tf.convert_to_tensor(truth)
        batch_loss = bce(targets, probabilities)

    # The variable list is read after the forward pass above.
    grads = tape.gradient(batch_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return batch_loss


Training loop...

In [None]:

# Training configuration: maximum number of epochs, mini-batch size and the
# number of consecutive epochs without test-loss improvement tolerated before
# stopping early.
epochs = 50
batch_size = 300
patience = 10

data_train, data_test, labels_train, labels_test = \
        train_test_split(data, labels, test_size=0.2)

# Append the labels as a final column so that features and labels are
# batched together; they are split apart again when calling _train_step.
train_dataset = np.hstack([data_train, labels_train[:, np.newaxis]]).astype(np.float32)
train_dataset = tf.data.Dataset.from_tensor_slices(train_dataset)
train_dataset = train_dataset.batch(batch_size)

loss_history = []
test_loss_history = []

# Early-stopping state. The best weights are stored as a snapshot
# (model.get_weights() returns arrays), because keeping a reference to
# `model` itself would simply follow the model as it continues to train.
c = 0  # epochs since the test loss last improved
minimum_loss = np.inf
minimum_epoch = 0
minimum_weights = model.get_weights()

#for i in tqdm.tqdm(range(epochs)):
for i in range(epochs):

    epoch_loss_avg = tf.keras.metrics.Mean()

    # One pass over the training batches; the last column holds the labels.
    loss = [_train_step(x[:, :-1], x[:, -1]) for x in train_dataset]
    epoch_loss_avg.update_state(loss)
    loss_history.append(float(epoch_loss_avg.result()))

    test_loss_history.append(float(_test_step(data_test, labels_test)))
    print('Epoch: {} Loss: {:.5f} Test Loss: {:.5f}'.format(
        i, loss_history[-1], test_loss_history[-1]))

    if test_loss_history[-1] < minimum_loss:
        # New best epoch: remember it and reset the patience counter.
        minimum_loss = test_loss_history[-1]
        minimum_epoch = i
        minimum_weights = model.get_weights()
        c = 0
    else:
        c += 1

    if c == patience:
        print('Early stopped. Epochs used = ' + str(i) +
                '. Minimum at epoch = ' + str(minimum_epoch))
        break

# Leave the network at the weights of the best test-loss epoch, whether or
# not early stopping was triggered.
model.set_weights(minimum_weights)

# Plot the per-epoch training and test loss curves and save the figure.
for _curve, _name in ((loss_history, 'train'), (test_loss_history, 'test')):
    plt.plot(_curve, label=_name)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.savefig('loss_history.png', dpi=300, bbox_inches='tight')


In [None]:
def __call__(tdata, params):
    r"""
    Evaluate the network for one data vector paired with each of a set of
    parameter vectors.

    Every network input row is the concatenation ``[*tdata, *params[i]]``.
    All rows are evaluated in a single forward pass rather than one call
    per parameter vector.

    Parameters
    ----------
    tdata:
        One-dimensional sequence of data values, shared by every row.
    params:
        Sequence of parameter vectors, one per row.

    Returns
    -------
    One-dimensional float array with the raw (pre-sigmoid) network output
    for each entry of ``params``; empty if ``params`` is empty.
    """
    n_samples = len(params)
    if n_samples == 0:
        return np.empty(0, dtype=np.float32)

    data_row = np.asarray(tdata, dtype=np.float32).reshape(1, -1)
    param_rows = np.asarray(params, dtype=np.float32).reshape(n_samples, -1)

    # Repeat the data vector so that each parameter vector gets its own row.
    batch = np.hstack([np.repeat(data_row, n_samples, axis=0), param_rows])

    # Inference mode, so each row is evaluated independently of the others
    # in the batch (BatchNormalization uses its moving statistics).
    logr = model(tf.convert_to_tensor(batch), training=False).numpy()

    # The network has a single output unit; drop that axis.
    return logr[:, 0]

# Reference parameter vector and the data it generates, standardised with
# the mean and standard deviation of `signals`.
true_params = np.array([0.2, 15, 7])
true_data = gaussian(true_params)
true_signals = (true_data - signals.mean()) / signals.std()

# Draw parameter samples and standardise them per column using `theta`.
samples = prior(10000)
norm_params = (samples - theta.mean(axis=0)) / theta.std(axis=0)

# logL - log Z
r_values = __call__(true_signals, norm_params)
# Log of the reciprocal of the product of the parameter ranges
# (presumably the log density of a uniform prior -- confirm against `prior`).
prior_volume = np.log(1 / np.prod(theta_max - theta_min))
logP = r_values + prior_volume

# Order the samples by increasing logP.
args = logP.argsort()
samples, logP = samples[args], logP[args]

# Scatter the first two sample columns, coloured by logP relative to its
# maximum, and mark the reference parameter values with dashed red lines.
_relative_logP = logP - logP.max()
cbar = plt.scatter(samples[:, 0], samples[:, 1],
                   c=_relative_logP, s=10, cmap='viridis')
for _draw_line, _value in ((plt.axvline, true_params[0]),
                           (plt.axhline, true_params[1])):
    _draw_line(_value, color='r', linestyle='--')
plt.colorbar(cbar)
plt.xlabel(r'$A$')
plt.ylabel(r'$z_c$')
plt.savefig('example_posterior.png', dpi=300, bbox_inches='tight')