In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from collections import Counter

from causallearn.search.ConstraintBased.PC import pc
from causallearn.utils.cit import kci

from epc import epc

In [2]:
def gdata(n):
    """
    Sample a standardized dataset from the causal chain A -> B -> C -> D -> E.

    The root A is drawn from a normal distribution with mean 0 and standard
    deviation 3. Every other variable is a randomly selected transformation
    (identity, cube, or tanh) of its parent plus standard-normal noise.
    Each variable is then centered and divided by its standard deviation.

    All randomness comes from NumPy's global generator, so callers control
    reproducibility with ``np.random.seed``.

    Args:
        n (int): Number of samples.

    Returns:
        np.ndarray: Array of shape (n, 5) with columns [A, B, C, D, E].
    """
    link_options = ["linear", "cubic", "tanh"]
    transforms = {
        "linear": lambda x: x,
        "cubic": lambda x: x ** 3,
        "tanh": np.tanh,
    }

    def apply_link(x, kind):
        """Transform ``x`` with the link function named ``kind``."""
        if kind not in transforms:
            raise ValueError("Unsupported function type")
        return transforms[kind](x)

    # One link type and one noise vector are drawn per variable. The draws for
    # A are never used below (A has no parent), but they are still made so the
    # sequence of values taken from the global generator stays the same.
    link_kinds = [np.random.choice(link_options) for _ in range(5)]
    noise_terms = [np.random.normal(0, 1, n) for _ in range(5)]

    # Start with the root A, then derive each child from its parent.
    chain = [np.random.normal(0, 3, n)]
    for kind, noise in zip(link_kinds[1:], noise_terms[1:]):
        chain.append(apply_link(chain[-1] + noise, kind))

    # Center every variable and scale it to unit standard deviation.
    standardized = [(col - np.mean(col)) / np.std(col) for col in chain]

    # Stack so that rows are samples and columns are variables.
    return np.array(standardized).T


# Ground-truth skeleton of the chain A - B - C - D - E as a symmetric 0/1
# adjacency matrix (rows and columns ordered A, B, C, D, E). Entry [i, j] is 1
# exactly when variables i and j are neighbours in the chain; edge directions
# are not encoded. The two off-diagonal bands mark the four adjacencies.
truSk = np.eye(5, k=1, dtype=int) + np.eye(5, k=-1, dtype=int)

In [4]:
# Estimate how often PC with the KCI test fails to recover the true skeleton.
t = 500  # number of simulated datasets (one per seed)
n = 800  # samples per dataset
err = 0  # runs whose estimated skeleton differs from truSk

for i in tqdm(range(t), desc="Processing"):
    # Seed the global generator with the run index so each dataset is reproducible.
    np.random.seed(i)
    data = gdata(n)
    cg = pc(data, alpha=0.01, indep_test=kci, show_progress=False)
    # Compare magnitudes only, so the sign of each graph entry is ignored.
    sk = np.abs(cg.G.graph)
    # A run counts as an error if any entry disagrees with the true skeleton.
    err += int(np.any(sk != truSk))

# Fraction of runs with a skeleton mismatch.
err/t

Processing: 100%|██████████| 500/500 [3:03:20<00:00, 22.00s/it]  


0.502

In [None]:
# Skeleton error rate of epc called with the "Gamma" option and the value 4.
t = 100    # number of simulated datasets (one per seed)
n = 800    # samples per dataset
err_e = 0  # runs whose estimated skeleton differs from truSk

for i in tqdm(range(t), desc="Processing"):
    # Seed the global generator with the run index so each dataset is reproducible.
    np.random.seed(i)
    data = gdata(n)
    ecg = epc(data, 0.01, "Gamma", 4, show_progress=False)
    # Compare magnitudes only, so the sign of each graph entry is ignored.
    esk = np.abs(ecg.G.graph)
    # A run counts as an error if any entry disagrees with the true skeleton.
    err_e += int(np.any(esk != truSk))

# Fraction of runs with a skeleton mismatch.
err_e/t

Processing: 100%|██████████| 100/100 [20:06<00:00, 12.06s/it]


0.54

In [8]:
# Skeleton error rate of epc called with the "Gamma" option and the value 16.
t = 100    # number of simulated datasets (one per seed)
n = 800    # samples per dataset
err_e = 0  # runs whose estimated skeleton differs from truSk

for i in tqdm(range(t), desc="Processing"):
    # Seed the global generator with the run index so each dataset is reproducible.
    np.random.seed(i)
    data = gdata(n)
    ecg = epc(data, 0.01, "Gamma", 16, show_progress=False)
    # Compare magnitudes only, so the sign of each graph entry is ignored.
    esk = np.abs(ecg.G.graph)
    # A run counts as an error if any entry disagrees with the true skeleton.
    err_e += int(np.any(esk != truSk))

# Fraction of runs with a skeleton mismatch.
err_e/t

Processing: 100%|██████████| 100/100 [07:02<00:00,  4.22s/it]


0.56

In [None]:
# Skeleton error rate of epc called with the "ACAT" option and the value 4.
t = 100    # number of simulated datasets (one per seed)
n = 800    # samples per dataset
err_e = 0  # runs whose estimated skeleton differs from truSk

for i in tqdm(range(t), desc="Processing"):
    # Seed the global generator with the run index so each dataset is reproducible.
    np.random.seed(i)
    data = gdata(n)
    ecg = epc(data, 0.01, "ACAT", 4, show_progress=False)
    # Compare magnitudes only, so the sign of each graph entry is ignored.
    esk = np.abs(ecg.G.graph)
    # A run counts as an error if any entry disagrees with the true skeleton.
    err_e += int(np.any(esk != truSk))

# Fraction of runs with a skeleton mismatch.
err_e/t