In [1]:
import numpy as np
import math
from npeet import entropy_estimators as ee  # alternative import for entropy

In [77]:


def compute_average_mutual_information_np(filename, sigma, k_neighbors=3):
    """
    Compute the average mutual information I_c from trajectory data.
    
    The file is assumed to be a UTF-16 encoded text file with shape (n, 2*M),
    In each pair:
      - The even-indexed columns (0,2,4,...) are the X trajectories.
      - The odd-indexed columns (1,3,5,...) are the corresponding Y trajectories.
      
    The formula is:
    
      I_c = - n * ln(√(2π)*σ)
            - (1/(2σ² * M)) * Σₘ Σₖ (xₘₖ - yₘₖ)²
            + Σₖ H(P[yₖ])
            
    where H(P[yₖ]) is the Shannon entropy (in nats) of the distribution
    of the Y values at time step k, estimated nonparametrically via a k-nearest-neighbor (KNN) estimator.
    
    Parameters:
      filename    : str
          Path to the data file (trajectories.txt).
      sigma       : float
          The noise standard deviation.
      k_neighbors : int, optional
          Number of nearest neighbors for the entropy estimator (default: 3).
    
    Returns:
      I_c_est     : float
          The estimated average mutual information (in nats).

    The specific function that we are using is : 

    
def entropy(x, k=3, base=2):
    The classic K-L k-nearest neighbor continuous entropy estimator
    x should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
    if x is a one-dimensional scalar and we have four samples
    
    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
    x = np.asarray(x)
    n_elements, n_features = x.shape
    x = add_noise(x)
    tree = build_tree(x)
    nn = query_neighbors(tree, x, k)
    const = digamma(n_elements) - digamma(k) + n_features * log(2)
    return (const + n_features * np.log(nn).mean()) / log(base)
    """
    # Load data with UTF-16 encoding
    data = np.loadtxt(filename, encoding="utf-16")
    
    # Get dimensions: n time steps and total_cols = 2*M columns
    n, total_cols = data.shape
    M = total_cols // 2  # number of trajectory pairs

    # --- Term 1: Constant from the Gaussian factor ---
    term1 = - n * math.log(math.sqrt(2 * math.pi) * sigma)
    
    # --- Term 2: Sum over squared differences (X - Y) for all trajectories and time steps ---
    sum_sq_diff = 0.0
    for m in range(M):
        x_traj = data[:, 2*m]       # X trajectory for run m
        y_traj = data[:, 2*m + 1]     # Corresponding Y trajectory
        sum_sq_diff += np.sum((x_traj - y_traj)**2)
    term2 = - (1.0 / (2 * sigma**2 * M)) * sum_sq_diff

    # --- Term 3: Sum over time steps of the entropy of the y-values ---
    # For each time step, collect all Y values from each trajectory and estimate the entropy.
    entropy_sum = 0.0
    for k in range(n):
        # Extract all y-values at time step k from all trajectories (columns 1, 3, 5, ...)
        y_k = data[k, 1::2].reshape(-1, 1)  # reshape to 2D: (M,1)
        # Estimate entropy using the k-nearest neighbor estimator (returns entropy in nats)
        H_k = ee.entropy(y_k, k=k_neighbors)
        entropy_sum += H_k
    term3 = entropy_sum

    # --- Combine terms ---
    #I_c_est = term1 + term2 + term3
    I_c_est = term2
    return I_c_est

if __name__ == '__main__':
    filename = "trajectories.txt"
    sigma = 1/64.565
    time = 50
    I_c_estimated = compute_average_mutual_information_np(filename, sigma, k_neighbors=3)/time
    print("Estimated average mutual information (in nats):", I_c_estimated)


Estimated average mutual information (in nats): -51.15008732364364
