In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp

2024-02-26 13:48:49.825758: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Generate some synthetic data
true_mean = 2.0
true_stddev = 1.0
Y = np.random.normal(loc=true_mean, scale=true_stddev, size=100)
print(np.mean(Y), np.std(Y))

1.8025703545183407 0.9944207885383278


In [3]:
# Define the log-likelihood function
def log_likelihood(params):
    mu, sigma = params[0], params[1]
    likelihoods = tfp.distributions.Normal(loc=mu, scale=sigma).log_prob(Y)
    return tf.reduce_sum(likelihoods)

# Define initial parameter values
params_initial = tf.Variable([0.0, 2.0])

# Use TensorFlow's GradientTape to compute gradients
optimizer = tf.optimizers.Adam(0.5)

for step in range(100):
    with tf.GradientTape() as tape:
        loss = -log_likelihood(params_initial)
    gradients = tape.gradient(loss, [params_initial])
    optimizer.apply_gradients(zip(gradients, [params_initial]))

print(params_initial.numpy())

[1.8062607  0.99680305]


In [4]:
# Compute the Fisher Information Matrix using the Hessian
with tf.GradientTape() as tape:
    with tf.GradientTape() as tape2:
        loss = -log_likelihood(params_initial)  # Remove the extra minus sign here
    gradients = tape2.gradient(loss, params_initial)
hessian = tape.jacobian(gradients, params_initial)
fim = tf.linalg.inv(hessian)
print("Fisher Information Matrix:")
print(fim.numpy())

Fisher Information Matrix:
[[9.9364379e-03 3.7051737e-05]
 [3.7051883e-05 5.0039501e-03]]


In [47]:
diagonals = tf.Variable([1, 1, 1])
tf.linalg.diag(diagonals).numpy()

array([[1, 0, 0],
       [0, 1, 0],
       [0, 0, 1]], dtype=int32)

In [77]:
v = tf.Variable([0, 1])

tf.gather(v, [0, 1, 1, 1]).numpy()

array([0, 1, 1, 1], dtype=int32)

In [10]:
def convert_params(params): 
    ''' 
    lambda, gamma, mu, c, b, sigma_d, sigma_p 
    '''
    c = params[3] 
    J = tf.gather(params, [0, 2, 0, 1])
    J = tf.tensor_scatter_nd_update(J, [[2]], [J[2]*c])
    J = tf.reshape(J, (2, 2))
    B = tf.linalg.diag(params[5:]**2)
    B = tf.tensor_scatter_nd_update(B, [[1, 1]], [B[1, 1] + B[0, 0]*c*c])
    B = tf.tensor_scatter_nd_update(B, [[1, 0]], [B[0, 0]*c])
    B = tf.tensor_scatter_nd_update(B, [[0, 1]], [B[0, 1]])
    return J, B

In [17]:
params = tf.Variable([-1, -2, 3, 4, 5, 8, 9], dtype=tf.double)
J, B  = convert_params(params)
print(J.numpy(), B.numpy())

[[-1.  3.]
 [-4. -2.]] [[  64.    0.]
 [ 256. 1105.]]


## test out multivariate normal distribution

In [19]:
x = np.ones((10, 2))
loc = x @ tf.transpose(J) 

dist = tfp.distributions.MultivariateNormalTriL(loc=loc, scale_tril=tf.linalg.cholesky(B))
dist.log_prob(x).numpy()

array([-6.86926927, -6.86926927, -6.86926927, -6.86926927, -6.86926927,
       -6.86926927, -6.86926927, -6.86926927, -6.86926927, -6.86926927])

In [28]:
def minuslogP(params, traj, dt): 
    J, B = convert_params(params) 
    b = params[4]
    tril = tf.linalg.cholesky(B*dt)
    dx = traj[:, 1:] - traj[:, :-1]
    det = J @ traj[:, :-1]
    det = tf.tensor_scatter_nd_update(det, [[1]], [det[1] - b * det[0]**3])
    dist = tfp.distributions.MultivariateNormalTriL(loc=tf.transpose(det)*dt, scale_tril=tril)
    return  - tf.reduce_sum(dist.log_prob(dx.T))

In [29]:
params = tf.Variable([-1, -2, 3, 4, 5, 8, 9], dtype=tf.double)

traj = np.zeros((2, 10))
print(minuslogP(params, traj, 1).numpy())

55.030888668828595


In [31]:
tf.maximum(0.0, params).numpy()

array([0., 0., 3., 4., 5., 8., 9.])

In [35]:
minimum = np.array([1.0, 1.0])  # The center of the quadratic bowl.
scales = np.array([2.0, 3.0])  # The scales along the two axes.

# The objective function and the gradient.
cost = lambda x: tf.reduce_sum(scales * tf.math.squared_difference(x, minimum), axis=-1)
def quadratic_loss_and_gradient(x):
    return tfp.math.value_and_gradient(cost,x)

start = tf.constant([0.6, 0.8])  # Starting point for the search.
optim_results = tfp.optimizer.bfgs_minimize(
  quadratic_loss_and_gradient, initial_position=start, tolerance=1e-8)

# Check that the search converged
assert(optim_results.converged)
# Check that the argmin is close to the actual value.
np.testing.assert_allclose(optim_results.position, minimum)
# Print out the total number of function evaluations it took. Should be 5.
print ("Function evaluations: %d" % optim_results.num_objective_evaluations)

Function evaluations: 5
