In [1]:
import numpy as np
import logging
from scipy import stats
from sklearn.gaussian_process import GaussianProcessRegressor

In [2]:
def _rbf_dot(x):

    n = x.shape[0]

    G = np.sum(x * x, 1).reshape(n, 1)
    Q = np.tile(G, (1, n))
    H = Q + G.T - 2 * np.dot(x, x.T)

    dists = Q + G.T - 2 * np.dot(x, x.T)
    dists = dists - np.tril(dists)
    dists = dists.reshape(n ** 2, 1)
    deg = np.sqrt(0.5 * np.median(dists[dists > 0]))

    H = np.exp(-H / 2 / (deg ** 2))

    return H

def hsic_test(x, y, alpha=0.05, normalize=True):
    """Hilbert-Schmidt independence criterion
    https://github.com/huawei-noah/trustworthyAI/blob/master/gcastle/castle/common/independence_tests.py#L550
    """
    if normalize:
        x = (x - np.mean(x)) / np.std(x)
        y = (y - np.mean(y)) / np.std(y)

    n = x.shape[0]

    H = np.identity(n) - np.ones((n, n), dtype=float) / n
    K = _rbf_dot(x)
    L = _rbf_dot(y)
    Kc = np.dot(np.dot(H, K), H)
    Lc = np.dot(np.dot(H, L), H)

    testStat = np.sum(Kc.T * Lc) / n

    varHSIC = (Kc * Lc / 6) ** 2
    varHSIC = (np.sum(varHSIC) - np.trace(varHSIC)) / n / (n - 1)
    varHSIC = varHSIC * 72 * (n - 4) * (n - 5) / n / (n - 1) / (n - 2) / (n - 3)

    K = K - np.diag(np.diag(K))
    L = L - np.diag(np.diag(L))

    bone = np.ones((n, 1), dtype=float)
    muX = np.dot(np.dot(bone.T, K), bone) / n / (n - 1)
    muY = np.dot(np.dot(bone.T, L), bone) / n / (n - 1)
    mHSIC = (1 + muX * muY - muX - muY) / n
    al = mHSIC ** 2 / varHSIC
    bet = varHSIC * n / mHSIC

    thresh = stats.gamma.ppf(1 - alpha, al, scale=bet)[0][0]

    if testStat < thresh:
        return 1
    else:
        return 0

In [3]:
 # Nonlinear ANM Algorithm
class NonlinearANM(object):

    def __init__(self, model=None, gp_params={}, test_method=hsic_test):
        self.logger = logging.getLogger(__name__)
        self.model = GaussianProcessRegressor(**gp_params) if model is None else model
        self.test_method = hsic_test

    def bivariate_learn(self, X, Y):
        accepted = {}
        causal_graph = np.zeros((2, 2))
        
        ## Independence Test
        ind_test_res = self.test_method(X, Y)

        if ind_test_res:
            self.logger.info('No Causality between X and Y because they are independent')
            return causal_graph

        ## Consistency Test
        for idx in range(2):
            # Cause (C) -> Effect (E)
            candidate = 'X->Y' if idx == 0 else 'Y->X'
            C, E = (X, Y) if idx == 0 else (Y, X)
            
            # Fit Nonlinear Model
            model.fit(C, E)
            
            # Residuals
            pred = model.predict(C)
            residual = E - pred

            # Goodness of Fit score
            score = model.score(C, E)

            # Residual Independence Test
            ind_test_res = self.test_method(C, E)

            if ind_test_res:
                # accept this model if independent
                causal_graph[idx, (idx+1)%2] = score
                self.logger.info('Accept', 'X->Y' if idx == 0 else 'Y->X')

        # Result
        if sum(causal_graph) == 0:
            self.logger.info('Cannot idenfity the valid cause and effect')
            return causal_graph

        elif sum(causal_graph) == 2:
            # compute higher score
            causal_graph[np.argmax(causal_graph)] = 1
            causal_graph[0<causal_graph<1] = 0
            self.logger.info('Accept an edge with the higher score')

        else:
            causal_graph[causal_graph > 0] = 1
        
        return causal_graph

In [4]:
# Generate pseudo dataset
num_data = 100
num_features = 1

np.random.seed(0)
X = np.random.rand(num_data, num_features)
y = np.random.rand(num_data, 1)
X.shape, y.shape

((100, 1), (100, 1))

In [5]:
model = NonlinearANM()

In [6]:
causal_graph = model.bivariate_learn(X, y)
causal_graph

array([[0., 0.],
       [0., 0.]])