In [1]:
import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal
import itertools

np.random.seed(37)

class Data(object):
    def __init__(self, data, means, cov, points=50):
        self.data = data
        self.means = means
        self.cov = cov
        self.df = pd.DataFrame(data, columns=['x1', 'x2', 'x3'])
        self.p_xyz = multivariate_normal(means, cov)
        self.p_xz = multivariate_normal(means[[0, 2]], cov[[0, 2]][:, [0, 2]])
        self.p_yz = multivariate_normal(means[[1, 2]], cov[[1, 2]][:, [1, 2]])
        self.p_z = multivariate_normal(means[2], cov[2, 2])
        self.x_vals = np.linspace(self.df.x1.min(), self.df.x1.max(), num=points, endpoint=True)
        self.y_vals = np.linspace(self.df.x2.min(), self.df.x2.max(), num=points, endpoint=True)
        self.z_vals = np.linspace(self.df.x3.min(), self.df.x3.max(), num=points, endpoint=True)

    def get_cmi(self):
        x_vals = self.x_vals
        y_vals = self.y_vals
        z_vals = self.z_vals
        prod = itertools.product(*[x_vals, y_vals, z_vals])

        p_z = self.p_z
        p_xz = self.p_xz
        p_yz = self.p_yz
        p_xyz = self.p_xyz
        quads = ((p_xyz.pdf([x, y, z]), p_z.pdf(z), p_xz.pdf([x, z]), p_yz.pdf([y, z])) for x, y, z in prod)

        cmi = sum((xyz * (np.log(z) + np.log(xyz) - np.log(xz) - np.log(yz)) for xyz, z, xz, yz in quads))
        return cmi


def get_serial(N=1000):
    x1 = np.random.normal(1, 1, N)
    x3 = np.random.normal(1 + 3.5 * x1, 1, N)
    x2 = np.random.normal(1 - 2.8 * x3, 3, N)

    data = np.vstack([x1, x2, x3]).T
    means = data.mean(axis=0)
    cov = np.cov(data.T)

    return Data(data, means, cov)

def get_diverging(N=1000):
    x3 = np.random.normal(1, 1, N)
    x1 = np.random.normal(1 + 2.8 * x3, 1, N)
    x2 = np.random.normal(1 - 2.8 * x3, 3, N)

    data = np.vstack([x1, x2, x3]).T
    means = data.mean(axis=0)
    cov = np.cov(data.T)

    return Data(data, means, cov)

def get_converging(N=1000):
    x1 = np.random.normal(2.8, 1, N)
    x2 = np.random.normal(8.8, 3, N)
    x3 = np.random.normal(1 + 0.8 * x1 + 0.9 * x2, 1, N)


    data = np.vstack([x1, x2, x3]).T
    means = data.mean(axis=0)
    cov = np.cov(data.T)
    print(cov)

    return Data(data, means, cov)

m_s = get_serial()
m_d = get_diverging()
m_c = get_converging()

[[1.01322936 0.03372871 0.79304614]
 [0.03372871 8.70631141 7.86940133]
 [0.79304614 7.86940133 8.65863189]]


In [2]:
%%time
m_s.get_cmi()

CPU times: user 8.71 s, sys: 0 ns, total: 8.71 s
Wall time: 8.73 s


0.012372411431840964

In [3]:
%%time
m_d.get_cmi()

CPU times: user 8.98 s, sys: 0 ns, total: 8.98 s
Wall time: 9.01 s


9.612131185749485e-05

In [4]:
%%time
m_c.get_cmi()

CPU times: user 8.79 s, sys: 57.6 ms, total: 8.85 s
Wall time: 8.8 s


11.209703669891105

In [6]:
N = 1000
x1 = np.random.normal(1, 1, N)
x3 = np.random.normal(1 + 3.5 * x1, 1, N)
x2 = np.random.normal(1 - 2.8 * x3, 3, N)

data = np.vstack([x1, x2, x3]).T
means = data.mean(axis=0)
cov = np.cov(data.T)



In [2]:
from CMI_estimation import CMINE_lib as CMINE
dataset = CMINE.create_dataset(GenModel='Gaussian_nonZero', Params=(10,1,5), Dim=5, N=80000)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  method='lar', copy_X=True, eps=np.finfo(np.float).eps,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  method='lar', copy_X=True, eps=np.finfo(np.float).eps,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  eps=np.finfo(np.float).eps, copy_Gram=True, verbose=0,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  eps=np.finfo(np.float).eps, copy_X=True, fit_path=True,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  eps=np.finfo(np.float).eps, copy_X=True, fit_path=True,
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes

In [12]:
data = []
for i in range(len(dataset)):
    data.append(dataset[i].mean(axis=1))

data = np.array(data).T

In [19]:
data.shape

(80000, 3)

In [20]:
#data = np.vstack([x1, x2, x3]).T
means = data.mean(axis=0)
cov = np.cov(data.T)
print(Data(data, means, cov).get_cmi())



nan


In [22]:
print(conditional_mutual_information(data[0],data[1],data[2]))

-3.2958368660043287


In [24]:
print(gaussian_conditional_mutual_info(data[0],data[1],data[2]))

LinAlgError: singular matrix

In [14]:
from sklearn.feature_selection import mutual_info_regression
from sklearn.metrics import mutual_info_score
from sklearn.datasets import make_regression
import numpy as np

# Generate sample data with 3 features and 1 target variable
X, y = make_regression(n_samples=100, n_features=3, n_informative=2, random_state=42)

# Calculate mutual information between each feature and the target variable
mi = mutual_info_regression(X, y)

# Sort features by mutual information scores
sorted_idx = np.argsort(mi)[::-1]

# Select top 2 features with highest mutual information scores
X_selected = X[:, sorted_idx[:2]]

# Calculate Gaussian Conditional Mutual Information between the 2 selected features
gcmi = mutual_info_score(X_selected[:, 0], X_selected[:, 1])

print("Gaussian Conditional Mutual Information:", gcmi)


Gaussian Conditional Mutual Information: 4.605170185988092


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  dtype=np.int):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  dtype=np.int)


In [15]:
X.shape

(100, 3)

In [17]:
y.shape

(100,)

In [18]:
mi

array([0.06029122, 0.47274624, 0.27362509])

In [3]:
import numpy as np
from scipy import stats

def conditional_mutual_information(x, y, z):
    """
    Computes the conditional mutual information I(x,y|z) for three random variables x, y, and z.
    x, y, z: numpy arrays of shape (n,) containing the values of the random variables.
    """
    # Compute the joint probabilities
    p_xyz, _ = np.histogramdd((x, y, z), bins=10)
    p_xyz /= np.sum(p_xyz)
    
    # Compute the marginal probabilities
    p_xz = np.sum(p_xyz, axis=1)
    p_yz = np.sum(p_xyz, axis=0)
    p_z = np.sum(p_xyz)
    
    # Compute the conditional probabilities
    p_x_given_z = p_xz / p_z
    p_y_given_z = p_yz / p_z
    
    # Compute the mutual information
    mi = stats.entropy(p_xyz.flatten())
    mi -= stats.entropy(p_xz.flatten())
    mi -= stats.entropy(p_yz.flatten())
    
    # Compute the conditional mutual information
    cmi = mi - stats.entropy(p_x_given_z.flatten()) - stats.entropy(p_y_given_z.flatten())
    
    return cmi

In [4]:
import numpy as np
from scipy.stats import multivariate_normal

def gaussian_conditional_mutual_info(x, y, z):
    """
    Calculates the Gaussian conditional mutual information between two variables x and y given a third variable z.
    """
    n = len(x)
    xyz = np.column_stack((x, y, z))
    cov_xyz = np.cov(xyz, rowvar=False)
    cov_xz = cov_xyz[:2, :2]
    cov_yz = cov_xyz[1:, 1:]
    cov_z = cov_xyz[2:, 2:]
    inv_cov_z = np.linalg.inv(cov_z)
    
    # Calculate the entropy of x given z
    mean_xz = np.mean(xyz[:, :2], axis=0)
    mvn_xz = multivariate_normal(mean_xz, cov_xz)
    entropy_x_given_z = mvn_xz.entropy()
    
    # Calculate the entropy of y given z
    mean_yz = np.mean(xyz[:, 1:], axis=0)
    mvn_yz = multivariate_normal(mean_yz, cov_yz)
    entropy_y_given_z = mvn_yz.entropy()
    
    # Calculate the joint entropy of x and y given z
    mvn_xyz = multivariate_normal(np.mean(xyz, axis=0), cov_xyz)
    entropy_xy_given_z = mvn_xyz.entropy()
    
    # Calculate the conditional mutual information
    mi_xy_given_z = entropy_x_given_z + entropy_y_given_z - entropy_xy_given_z
    return mi_xy_given_z

In [5]:
import numpy as np
from sklearn.neighbors import KernelDensity

def gaussian_conditional_mutual_info_highdim(x, y, z, bandwidth=1.0, kernel='gaussian'):
    """
    Calculates the Gaussian conditional mutual information between two high-dimensional variables x and y given a third variable z.
    """
    n = len(x)
    xyz = np.column_stack((x, y, z))
    
    # Fit kernel density estimators
    kde_xz = KernelDensity(bandwidth=bandwidth, kernel=kernel).fit(xyz[:, :2])
    kde_yz = KernelDensity(bandwidth=bandwidth, kernel=kernel).fit(xyz[:, 1:])
    kde_xyz = KernelDensity(bandwidth=bandwidth, kernel=kernel).fit(xyz)
    
    # Calculate the entropy of x given z
    log_density_xz = kde_xz.score_samples(xyz[:, :2])
    entropy_x_given_z = -np.mean(log_density_xz)
    
    # Calculate the entropy of y given z
    log_density_yz = kde_yz.score_samples(xyz[:, 1:])
    entropy_y_given_z = -np.mean(log_density_yz)
    
    # Calculate the joint entropy of x and y given z
    log_density_xyz = kde_xyz.score_samples(xyz)
    entropy_xy_given_z = -np.mean(log_density_xyz)
    
    # Calculate the conditional mutual information
    mi_xy_given_z = entropy_x_given_z + entropy_y_given_z - entropy_xy_given_z
    return mi_xy_given_z

In [6]:
#I(X;Y|Z) = H(X|Z) + H(Y|Z) - H(X,Y|Z)

In [7]:
gaussian_conditional_mutual_info_highdim(dataset[0],dataset[1], dataset[2])

6.515766454232065