In [1]:
import os
import pickle
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
import matplotlib.pyplot as plt
plt.ioff()
plt.rcParams['figure.dpi'] = 150
plt.rcParams['text.usetex'] = True
plt.rcParams['font.size'] = 12

tfkl = tf.keras.layers
tfpl = tfp.layers
tf.keras.backend.set_floatx("float64")
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

In [2]:
def load_mdn(DT, N_C, user):
    """
    Loads MDN model.
    """

    model = tf.keras.Sequential(
        [tfkl.Dense(256, activation='tanh'),
         tfkl.Dense(256, activation='tanh'),
         tfkl.Dense(256, activation='tanh'),
         tfkl.Dense(256, activation='tanh'),
         tfkl.Dense(512, activation='tanh'),
         tfkl.Dense(512, activation='tanh'),
         tfkl.Dense(N_C * 15, activation=None),
         tfpl.MixtureSameFamily(N_C, tfpl.MultivariateNormalTriL(4))])

    model.load_weights(
        f"../models/{user}/GDP_{DT:.0f}day_NC{N_C}/trained/weights.index").expect_partial()
    return model

In [82]:
def load_scalers(DT, N_C, user):
    """
    Loads scaler objects relating to MDN models.
    """

    with open(f"../models/{user}/GDP_{DT:.0f}day_NC{N_C}/Xscaler.pkl", "rb") as file:
        Xscaler = pickle.load(file)

    with open(f"../models/{user}/GDP_{DT:.0f}day_NC{N_C}/Yscaler.pkl", "rb") as file:
        Yscaler = pickle.load(file)
    return Xscaler, Yscaler

def given(X_1, X_0, Y_scaler, model):
    """
    Evaluates transition density for fixed X_0.
    """
    return Y_scaler.invert_standardisation_prob(
        np.exp(
            model.log_prob(
                Y_scaler.standardise(X_1 - X_0))))

In [93]:
# Model parameters.
N_C = 32
DT = 4
user = "masha"

# Load our trained model.
model = load_mdn(DT, N_C, user)
X_scaler, Y_scaler = load_scalers(DT, N_C, user)

# Define the initial position of the first particle and the second particle.
A0 = np.array([0.4, 0.4])
# B0 = np.array([0.6, 0.6, 0.6, 0.6]).reshape(4, 1) 
B0 = np.array([0.6, 0.6])

# Combine the initial positions to get a vector our model can understand.
initial_position = np.hstack([A0, B0]).reshape(4, 1)
print(f"{initial_position = }")

model_at_init = model(X_scaler.standardise(initial_position))

x = np.linspace(0, 1, 5)
y = np.linspace(0, 1, 5)
xx, yy = np.meshgrid(x, y, copy=True)

B1_positions = np.vstack([xx.ravel(), yy.ravel()])
print(f"{B1_positions.shape = }")

# Fix the position of the second point, A1.
A1 = np.array([0.5, 0.5]).reshape(2, 1)

A1_positions = np.repeat(A1, 25, axis=1)
print(f"{A1_positions.shape = }")

updated_positions = np.vstack([A1_positions, B1_positions])
print(f"{updated_positions.shape = }")

initial_position = array([[0.4],
       [0.4],
       [0.6],
       [0.6]])
B1_positions.shape = (2, 25)
A1_positions.shape = (2, 25)
updated_positions.shape = (4, 25)


In [97]:
probs = np.zeros_like(updated_positions)

for i in range(updated_positions.shape[1]):
    p = updated_positions[:, i]
    # print(p)
    prob = given(p, initial_position, Y_scaler, model_at_init)
    # print(prob, "\n")

    probs[:, i] = prob


print(f"{probs.shape = }")

# x0_probs = probs[:2, :].reshape((2, 25)) 


# plt.figure(figsize=(8, 6))
# plt.contourf(xx1, yy1, x0_probs, cmap='viridis')
# plt.colorbar(label='Probability Density')
# plt.xlabel('X')
# plt.ylabel('Y')
# plt.title('2D Gaussian Distribution')
# plt.grid(True)
# plt.show()

probs.shape = (4, 25)
