In [1]:
import numpy as np
from scipy.integrate import odeint

In [2]:
def dUdt(U, t, a, b):
    """Right-hand side of a batch of independent two-variable ODE systems.

    ``U`` stacks the state of ``len(a)`` systems as ``[x..., y...]``: the
    first ``len(a)`` entries are read as ``x`` and the remainder as ``y``.
    ``t`` is part of the signature but is not used by the equations.

    Returns the concatenation ``[dx/dt..., dy/dt...]`` with, elementwise,
    ``dx/dt = a*x - x*y`` and ``dy/dt = b*x*y - y``.
    """
    n_systems = len(a)
    prey, predator = U[:n_systems], U[n_systems:]

    prey_rate = a * prey - prey * predator
    predator_rate = b * prey * predator - predator

    return np.concatenate([prey_rate, predator_rate])

In [3]:
# Reproducibility: a fixed seed makes Restart & Run All yield the same draws.
SEED = 42
N_SIMULATIONS = 1000  # number of (a, b) parameter pairs simulated at once
rng = np.random.default_rng(SEED)

# Independent U(-10, 10) draws for the two model parameters.
a = rng.uniform(-10, 10, N_SIMULATIONS)
b = rng.uniform(-10, 10, N_SIMULATIONS)

# Every simulation starts from x = y = 0.5.
x0, y0 = np.full(N_SIMULATIONS, 0.5), np.full(N_SIMULATIONS, 0.5)
S0 = np.concatenate([x0, y0])  # stacked state in the [x..., y...] layout dUdt reads
tspan = np.linspace(0, 10, 100)  # 100 evenly spaced time points on [0, 10]
sol = odeint(dUdt, S0, tspan, args=(a, b))

In [4]:
# Undo the [x..., y...] stacking: the first len(a) columns are the prey (x)
# trajectories, the remaining columns the predator (y) trajectories.
x_sol, y_sol = sol[:, :len(a)], sol[:, len(a):]

In [58]:
import numpy as np 
from scipy.stats import rankdata
from scipy.spatial.distance import pdist, squareform, cdist

def wasserstein_distance(simulated_sample: np.ndarray, observed_sample: np.ndarray) -> np.ndarray:
    """Column-wise 1-Wasserstein distance between two equal-size samples.

    For two one-dimensional samples with the same number of points, the
    1-Wasserstein distance is the mean absolute difference between their
    order statistics. Each column is therefore sorted along axis 0 before
    the differences are averaged, so the result does not depend on the
    order of the rows.

    Parameters
    ----------
    simulated_sample, observed_sample : array_like
        Samples with observations along axis 0 (one column per comparison).
        Both must have the same number of rows.

    Returns
    -------
    np.ndarray
        One distance per column (a scalar for 1-D input).
    """
    # Sorting pairs the i-th smallest simulated value with the i-th smallest
    # observed value, which is the optimal transport plan in one dimension.
    simulated_sorted = np.sort(np.asarray(simulated_sample), axis=0)
    observed_sorted = np.sort(np.asarray(observed_sample), axis=0)
    return np.mean(np.abs(simulated_sorted - observed_sorted), axis=0)

def maximum_mean_discrepancy(simulated_sample: np.ndarray, observed_sample: np.ndarray, gamma = 1.0) -> np.ndarray:
    """Column-wise squared maximum mean discrepancy with a Gaussian kernel.

    For every column the kernel ``k(u, v) = exp(-(u - v)**2 / (2 * gamma**2))``
    is averaged over all pairs within the simulated sample, within the
    observed sample and between the two, and the result is
    ``mean(KXX) + mean(KYY) - 2 * mean(KXY)``. The means run over every
    pair, self-pairs included (the biased V-statistic form).

    Parameters
    ----------
    simulated_sample : array_like, shape (n, n_columns)
    observed_sample : array_like, shape (m, n_columns)
        ``n`` and ``m`` may differ; the number of columns must match.
    gamma : float, default 1.0
        Kernel bandwidth; must be non-zero.

    Returns
    -------
    np.ndarray, shape (n_columns,)

    Raises
    ------
    ValueError
        If either input is not 2-D or the column counts differ.
    """
    simulated = np.asarray(simulated_sample, dtype=float)
    observed = np.asarray(observed_sample, dtype=float)
    if simulated.ndim != 2 or observed.ndim != 2:
        raise ValueError("samples must be 2-D arrays of shape (n_samples, n_columns)")
    if simulated.shape[1] != observed.shape[1]:
        raise ValueError("samples must have the same number of columns")

    two_bandwidth_sq = 2 * gamma ** 2

    def mean_kernel(left, right):
        # Broadcasting gives every pairwise squared difference per column:
        # shape (len(left), len(right), n_columns).
        sq_diff = (left[:, np.newaxis, :] - right[np.newaxis, :, :]) ** 2
        return np.mean(np.exp(-sq_diff / two_bandwidth_sq), axis=(0, 1))

    return (
        mean_kernel(simulated, simulated)
        + mean_kernel(observed, observed)
        - 2 * mean_kernel(simulated, observed)
    )

def cramer_von_mises(simulated_sample: np.ndarray, observed_sample: np.ndarray) -> np.ndarray:
    """Column-wise two-sample Cramer-von Mises statistic.

    With ``n`` simulated and ``m`` observed points per column, let ``r_i``
    and ``s_j`` be the ranks, within the pooled sample, of the i-th smallest
    simulated point and the j-th smallest observed point. The statistic is

        U = n * sum_i (r_i - i)**2 + m * sum_j (s_j - j)**2
        T = U / (n * m * (n + m)) - (4 * n * m - 1) / (6 * (n + m))

    which for ``n == m`` reduces to ``U / (2 n**3) - (4 n**2 - 1) / (12 n)``.
    Unequal sample sizes are supported.

    Parameters
    ----------
    simulated_sample : array_like, shape (n, n_columns)
    observed_sample : array_like, shape (m, n_columns)

    Returns
    -------
    np.ndarray, shape (n_columns,)

    Raises
    ------
    ValueError
        If either input is not 2-D, is empty along axis 0, or the column
        counts differ.
    """
    simulated = np.asarray(simulated_sample, dtype=float)
    observed = np.asarray(observed_sample, dtype=float)
    if simulated.ndim != 2 or observed.ndim != 2:
        raise ValueError("samples must be 2-D arrays of shape (n_samples, n_columns)")
    if simulated.shape[1] != observed.shape[1]:
        raise ValueError("samples must have the same number of columns")

    n_sim, n_obs = simulated.shape[0], observed.shape[0]
    if n_sim == 0 or n_obs == 0:
        raise ValueError("samples must contain at least one row each")
    n_total = n_sim + n_obs

    # Rank every value within the pooled sample, column by column.
    combined_rank = rankdata(np.concatenate((simulated, observed)), axis=0)

    # The formula compares the rank of the i-th ORDER STATISTIC with i, so the
    # ranks must be sorted; otherwise the result depends on the row order.
    simulated_rank = np.sort(combined_rank[:n_sim], axis=0)
    observed_rank = np.sort(combined_rank[n_sim:], axis=0)

    simulated_idx = np.arange(1, n_sim + 1)[:, np.newaxis]
    observed_idx = np.arange(1, n_obs + 1)[:, np.newaxis]
    rank_sum = (
        n_sim * np.sum((simulated_rank - simulated_idx) ** 2, axis=0)
        + n_obs * np.sum((observed_rank - observed_idx) ** 2, axis=0)
    )
    return rank_sum / (n_sim * n_obs * n_total) - (4 * n_sim * n_obs - 1) / (6 * n_total)

def energy_dist(simulated_sample: np.ndarray, observed_sample: np.ndarray) -> np.ndarray:
    """Column-wise energy distance between two samples.

    For every column the absolute difference is averaged over all pairs
    between the two samples (``XY``), within the simulated sample (``XX``)
    and within the observed sample (``YY``); the result is
    ``2 * mean(XY) - mean(XX) - mean(YY)``. The within-sample means run over
    every pair, including the zero-distance self-pairs.

    Parameters
    ----------
    simulated_sample : array_like, shape (n, n_columns)
    observed_sample : array_like, shape (m, n_columns)
        ``n`` and ``m`` may differ; the number of columns must match.

    Returns
    -------
    np.ndarray, shape (n_columns,)

    Raises
    ------
    ValueError
        If either input is not 2-D or the column counts differ.
    """
    simulated = np.asarray(simulated_sample, dtype=float)
    observed = np.asarray(observed_sample, dtype=float)
    if simulated.ndim != 2 or observed.ndim != 2:
        raise ValueError("samples must be 2-D arrays of shape (n_samples, n_columns)")
    if simulated.shape[1] != observed.shape[1]:
        raise ValueError("samples must have the same number of columns")

    def mean_abs_diff(left, right):
        # Broadcasting gives every pairwise |u - v| per column:
        # shape (len(left), len(right), n_columns).
        pairwise = np.abs(left[:, np.newaxis, :] - right[np.newaxis, :, :])
        return np.mean(pairwise, axis=(0, 1))

    return (
        2 * mean_abs_diff(simulated, observed)
        - mean_abs_diff(simulated, simulated)
        - mean_abs_diff(observed, observed)
    )

def kullback_leibler_divergence(simulated_sample: np.ndarray, observed_sample: np.ndarray) -> np.ndarray:
    """Column-wise nearest-neighbour estimate of the KL divergence.

    With simulated points ``z_1..z_n`` and observed points ``y_1..y_m`` in a
    column, the estimate is

        (1 / n) * sum_i ln( min_j |z_i - y_j| / min_{j != i} |z_i - z_j| )
            + ln(m / (n - 1))

    Exact zero distances (self-pairs and exact ties) are skipped when the
    nearest neighbour is searched, so the next-closest point is used. A
    column in which some point has no non-zero neighbour at all yields a
    non-finite value.

    Parameters
    ----------
    simulated_sample : array_like, shape (n, n_columns), n >= 2
    observed_sample : array_like, shape (m, n_columns), m >= 1

    Returns
    -------
    np.ndarray, shape (n_columns,)

    Raises
    ------
    ValueError
        If either input is not 2-D, the column counts differ, or there are
        too few rows for the estimator to be defined.
    """
    simulated = np.asarray(simulated_sample, dtype=float)
    observed = np.asarray(observed_sample, dtype=float)
    if simulated.ndim != 2 or observed.ndim != 2:
        raise ValueError("samples must be 2-D arrays of shape (n_samples, n_columns)")
    if simulated.shape[1] != observed.shape[1]:
        raise ValueError("samples must have the same number of columns")

    n_sim, n_obs = simulated.shape[0], observed.shape[0]
    if n_sim < 2 or n_obs < 1:
        raise ValueError("need at least two simulated rows and one observed row")

    # Pairwise |z_i - z_j| and |z_i - y_j| per column:
    # shapes (n_sim, n_sim, n_columns) and (n_sim, n_obs, n_columns).
    dist_within = np.abs(simulated[:, np.newaxis, :] - simulated[np.newaxis, :, :])
    dist_between = np.abs(simulated[:, np.newaxis, :] - observed[np.newaxis, :, :])

    # Mask zero distances with +inf so they can never win the minimum, then
    # take the nearest neighbour of every simulated point (reduce over j).
    nearest_within = np.min(np.where(dist_within == 0, np.inf, dist_within), axis=1)
    nearest_between = np.min(np.where(dist_between == 0, np.inf, dist_between), axis=1)

    # ln(m / (n - 1)); equals ln(n / (n - 1)) for equal sample sizes.
    log_term = np.log(n_obs / (n_sim - 1))
    return np.sum(np.log(nearest_between / nearest_within), axis=0) / n_sim + log_term



In [6]:
# Reference ("observed") trajectory: one system with a = b = 1, started at
# (x, y) = (0.5, 0.5) and integrated on the same time grid.
observed = odeint(dUdt, (0.5, 0.5), tspan, args=([1], [1]))
x_observed, y_observed = observed[:, 0], observed[:, 1]

# Repeat each observed series in 1000 identical columns (shape: time x 1000).
x_obs_mod = np.tile(x_observed, (1000, 1)).T
y_obs_mod = np.tile(y_observed, (1000, 1)).T

In [47]:
# Scratch check of the nearest-neighbour log-ratio on a tiny hand-made
# example (3 rows x 4 columns).
tempx = np.array([
    [0.5, 0.5, 0.5, 0.5],
    [1, 0.7, 0.55, 0.56],
    [2, 0.9, 0.6, 0.61],
])
tempy = np.array([
    [0.5, 0.5, 0.5, 0.5],
    [2, 2, 2, 2],
    [8, 8, 8, 8],
])

nrow, ncol = tempx.shape
distances_xx = np.zeros((ncol, nrow, nrow))
distances_xy = np.zeros((ncol, nrow, nrow))

log_term = np.log(nrow / (nrow - 1))

# One pairwise-distance matrix per column: within tempx, and tempx vs tempy.
for i in range(ncol):
    x_col = tempx[:, [i]]
    y_col = tempy[:, [i]]
    distances_xx[i] = squareform(pdist(x_col, metric="euclidean"))
    distances_xy[i] = cdist(x_col, y_col, metric="euclidean")

# Smallest non-zero distance per column; zeros are masked with +inf first.
xflat = distances_xx.reshape(ncol, -1)
xflat_nozero = np.where(xflat == 0, np.inf, xflat)
non_zero_min = xflat_nozero.min(axis=1)

xyflat = distances_xy.reshape(ncol, -1)
xyflat_nozero = np.where(xyflat == 0, np.inf, xyflat)
non_zero_min_xy = xyflat_nozero.min(axis=1)

np.log(non_zero_min_xy / non_zero_min) / nrow + log_term

array([0.40546511, 0.40546511, 0.40546511, 0.46623896])

In [60]:
# Energy distance of every simulated x column against the observed x series;
# display the smallest and largest value across the simulations.
temp = energy_dist(simulated_sample=x_sol, observed_sample=x_obs_mod)
(min(temp), max(temp))

(0.06394574195342351, 7.726784387045699e+39)

In [None]:
# Cramer-von Mises Distance

# # combine simulated and observed
# tempcomb = np.concatenate((tempx, tempy))

# # Find corresponding ranks in h associated with simulated/observed
# combrank = rankdata(tempcomb, axis=0)
# simrank = combrank[:len(tempx)]
# obsrank = combrank[len(tempx):]

# # Calculate distance
# obs_idx = np.tile(np.arange(1, len(tempy)+1), (len(tempy[0]), 1)).T
# sim_idx = np.tile(np.arange(1, len(tempx)+1), (len(tempy[0]), 1)).T
# obs_sum = np.sum((obsrank - obs_idx)**2, axis=0)
# sim_sum = np.sum((simrank - sim_idx)**2, axis=0)
# rank_sum = len(tempx)*(obs_sum + sim_sum)
# distance = rank_sum / (2*(len(tempx) ** 3)) - (4*(len(tempx) **2)-1)/(12*len(tempx))
# np.sort(simrank, axis=0)