# Mathematical Underpinnings - Lab 6

In [1]:
from sklearn.metrics import mutual_info_score
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from tqdm import tqdm

## Useful functions

In [2]:
def discetize_2bins(X):
    X_discrete = 1 * (X >= 0)
    return X_discrete

In [3]:
def conditional_permutation(X, Z):

    z_values = np.unique(Z)
    n_z_values = len(z_values)
    n = len(Z)

    X_b = np.zeros(n)

    for i in range(n_z_values):

        z_value_tmp = z_values[i]

        X_b[Z == z_value_tmp] = np.random.permutation(X[Z == z_value_tmp])

    return X_b

In [4]:
def conditional_mutual_information(X, Y, Z):

    z_values = np.unique(Z)
    n_z_values = len(z_values)
    n = len(Z)

    cmi = 0

    for i in range(n_z_values):

        z_value_tmp = z_values[i]
        z_condition = (Z == z_value_tmp)

        X_z = X[z_condition]
        Y_z = Y[z_condition]

        mi_XY_z = mutual_info_score(X_z, Y_z)
        p_z = np.sum(z_condition)/n

        cmi += p_z*mi_XY_z

    return cmi

In [5]:
# II(X;Y;Z)
def interaction_information(X, Y, Z):
    return conditional_mutual_information(X, Y, Z) - mutual_info_score(X, Y)

In [6]:
# II(X;Y;Z1;Z2)
def interaction_information2(X, Y, Z1, Z2):
    Z_1_and_2 = 2*Z2 + Z1
    return interaction_information(X, Y, Z_1_and_2) - interaction_information(X, Y, Z1) - interaction_information(X, Y, Z2)

## Task 1

In [54]:
def secmi2(X, Y, Z):
    n, k = Z.shape

    ii_sum = 0

    for i in range(k):
        ii_sum += interaction_information(Y, X, Z[:, i])

    return mutual_info_score(X, Y) + ii_sum


def secmi3(X, Y, Z):
    n, k = Z.shape

    ii2_sum = 0

    for i in range(k):
        for j in range(i + 1, k):
            ii2_sum += interaction_information2(Y, X, Z[:, i], Z[:, j])

    return secmi2(X, Y, Z) + ii2_sum

### a)

In [55]:
def cond_indep_test_permutation(X, Y, Z, B, stat):

    n_col_Z = Z.shape[1]
    Z_1dim = np.dot(Z, 2 ** np.linspace(0, n_col_Z - 1, n_col_Z))

    if stat == "cmi":
        stat_value = conditional_mutual_information(X, Y, Z_1dim)
    if stat == "secmi2":
        stat_value = secmi2(X, Y, Z)
    if stat == "secmi3":
        stat_value = secmi3(X, Y, Z)

    condition_p_value = 0
    for b in range(B):
        X_b = conditional_permutation(X, Z_1dim)

        if stat == "cmi":
            stat_value_b = conditional_mutual_information(X_b, Y, Z_1dim)
        if stat == "secmi2":
            stat_value_b = secmi2(X_b, Y, Z)
        if stat == "secmi3":
            stat_value_b = secmi3(X_b, Y, Z)

        if stat_value <= stat_value_b:
            condition_p_value += 1

    p_value = (1 + condition_p_value) / (1 + B)

    return 2 * len(X) * stat_value, p_value

### b)

In [61]:
def get_data_b(n=100):
    rng = np.random.default_rng()

    Y_tilde = rng.normal(size=n)
    Y = discetize_2bins(Y_tilde)

    Z1_tilde = rng.normal(loc=Y / 2, scale=1, size=n)
    Z2_tilde = rng.normal(loc=Y / 2, scale=1, size=n)
    Z3_tilde = rng.normal(loc=Y / 2, scale=1, size=n)

    Z1 = discetize_2bins(Z1_tilde)
    Z2 = discetize_2bins(Z2_tilde)
    Z3 = discetize_2bins(Z3_tilde)

    X_tilde = rng.normal(loc=Z1 / 2, scale=1, size=n)
    X = discetize_2bins(X_tilde)

    Z = np.stack([Z1, Z2, Z3], axis=-1)

    return X, Y, Z

In [115]:
p_value_threshold = 0.05
n = 5000
B = 100
N = 100

In [118]:
# X independent of Y given (Z1, Z2)

results = []
for _ in range(N):
    X, Y, Z = get_data_b(n=n)
    Z = Z[:, 0:2]
    cmi_test = cond_indep_test_permutation(X, Y, Z, B, "cmi")
    secmi2_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    secmi3_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    results.append({
        "cmi_stat_value": cmi_test[0],
        "cmi_p_value": cmi_test[1],
        "secmi2_stat_value": secmi2_test[0],
        "secmi2_p_value": secmi2_test[1],
        "secmi3_stat_value": secmi3_test[0],
        "secmi3_p_value": secmi3_test[1],
    })
results_df1 = pd.DataFrame.from_records(results)

In [119]:
print("Rejection rate (cmi):", np.sum(results_df1["cmi_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi2):", np.sum(results_df1["secmi2_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi3):", np.sum(results_df1["secmi3_p_value"] < p_value_threshold) / N)

Rejection rate (cmi): 0.05
Rejection rate (secmi2): 0.09
Rejection rate (secmi3): 0.06


In [116]:
# X independent of Y given (Z2, Z3)

results = []
for _ in range(N):
    X, Y, Z = get_data_b(n=n)
    Z = Z[:, 1:3]
    cmi_test = cond_indep_test_permutation(X, Y, Z, B, "cmi")
    secmi2_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    secmi3_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    results.append({
        "cmi_stat_value": cmi_test[0],
        "cmi_p_value": cmi_test[1],
        "secmi2_stat_value": secmi2_test[0],
        "secmi2_p_value": secmi2_test[1],
        "secmi3_stat_value": secmi3_test[0],
        "secmi3_p_value": secmi3_test[1],
    })
results_df2 = pd.DataFrame.from_records(results)

In [117]:
print("Rejection rate (cmi):", np.sum(results_df2["cmi_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi2):", np.sum(results_df2["secmi2_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi3):", np.sum(results_df2["secmi3_p_value"] < p_value_threshold) / N)

Rejection rate (cmi): 0.46
Rejection rate (secmi2): 0.52
Rejection rate (secmi3): 0.51


### c)

In [134]:
def get_data_c(n=100):

    rng = np.random.default_rng()

    X = rng.choice(2, size=n)
    Z1 = rng.choice(2, size=n)
    Z2 = rng.choice(2, size=n)
    Z3 = rng.choice(2, size=n)

    tmp = (X + Z1 + Z2) % 2
    Y = np.zeros(n)
    Y[tmp == 1] = rng.choice(2, size=np.sum(tmp==1), p=[0.2, 0.8])
    Y[tmp == 0] = rng.choice(2, size=np.sum(tmp==0), p=[0.8, 0.2])

    Z = np.stack([Z1, Z2, Z3], axis=-1)

    return X, Y, Z

In [136]:
# X independent of Y given (Z1, Z2)

results = []
for _ in range(N):
    X, Y, Z = get_data_c(n=n)
    Z = Z[:, 0:2]
    cmi_test = cond_indep_test_permutation(X, Y, Z, B, "cmi")
    secmi2_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    secmi3_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    results.append({
        "cmi_stat_value": cmi_test[0],
        "cmi_p_value": cmi_test[1],
        "secmi2_stat_value": secmi2_test[0],
        "secmi2_p_value": secmi2_test[1],
        "secmi3_stat_value": secmi3_test[0],
        "secmi3_p_value": secmi3_test[1],
    })
results_df1 = pd.DataFrame.from_records(results)

In [137]:
print("Rejection rate (cmi):", np.sum(results_df1["cmi_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi2):", np.sum(results_df1["secmi2_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi3):", np.sum(results_df1["secmi3_p_value"] < p_value_threshold) / N)

Rejection rate (cmi): 1.0
Rejection rate (secmi2): 0.03
Rejection rate (secmi3): 0.04


In [141]:
# X independent of Y given (Z2, Z3)

results = []
for _ in range(N):
    X, Y, Z = get_data_c(n=n)
    Z = Z[:, 1:3]
    cmi_test = cond_indep_test_permutation(X, Y, Z, B, "cmi")
    secmi2_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    secmi3_test = cond_indep_test_permutation(X, Y, Z, B, "secmi2")
    results.append({
        "cmi_stat_value": cmi_test[0],
        "cmi_p_value": cmi_test[1],
        "secmi2_stat_value": secmi2_test[0],
        "secmi2_p_value": secmi2_test[1],
        "secmi3_stat_value": secmi3_test[0],
        "secmi3_p_value": secmi3_test[1],
    })
results_df2 = pd.DataFrame.from_records(results)

In [142]:
print("Rejection rate (cmi):", np.sum(results_df2["cmi_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi2):", np.sum(results_df2["secmi2_p_value"] < p_value_threshold) / N)
print("Rejection rate (secmi3):", np.sum(results_df2["secmi3_p_value"] < p_value_threshold) / N)

Rejection rate (cmi): 0.03
Rejection rate (secmi2): 0.03
Rejection rate (secmi3): 0.04


## Task 2
 
in R