In [None]:
import numpy as np
from rpy2.robjects.packages import importr
from rpy2.robjects import numpy2ri

from pyGCM.gcm_test import GCMTest
from pyGCM.residual_methods import linear_regression_residuals

# Make sure rpy2's conversion is activated
numpy2ri.activate()

# Import the R package containing gcm_test
GCM = importr('GeneralisedCovarianceMeasure')


def compare_gcm(E, X, Y,
                get_resid='GP',
                categorical_E=False,
                categorical_X=False,
                categorical_Y=False,
                resid_E=None,
                resid_Y=None):
    """
    Compare Python-based GCM test with the R-based GCM test (kernel ridge).

    If 'resid_E' or 'resid_Y' are provided, we pass them directly
    to the Python GCM test, bypassing internal residual computation.

    Similarly, if 'resid_E'/'resid_Y' are given, we also pass them
    to R's gcm_test() as 'resid.XonZ' or 'resid.YonZ', aligned
    with how we map Python variables to R.

    Args:
        E, X, Y (np.ndarray): Data arrays with shape (n_samples, ...)
        get_resid (str): 'GP', 'linear', 'categorical', or callable.
        categorical_E, categorical_X, categorical_Y (bool):
            Indicate which variables should be treated as categorical in Python GCM.
        resid_E, resid_Y (np.ndarray or None):
            Optional precomputed residuals for E, Y given X (shape ~ (n,) or (n,1)).

    Returns:
        (float, float, float): (python_test_stat, python_p_val, r_p_val)
    """
    py_gcm_tester = GCMTest(get_resid=get_resid)
    python_test_stat, python_p_val = py_gcm_tester.fit(
        E, X, Y,
        resid_E=resid_E,
        resid_Y=resid_Y,
        categorical_E=categorical_E,
        categorical_X=categorical_X,
        categorical_Y=categorical_Y
    )


    r_resid_XonZ = None
    r_resid_YonZ = None
    if resid_Y is not None and resid_E is not None:
        r_resid_XonZ = resid_Y.reshape(-1, 1)  # Python's Y|X
        r_resid_YonZ = resid_E.reshape(-1, 1)  # Python's E|X

        r_gcm_result = GCM.gcm_test(
            X=Y,
            Y=E,
            Z=X,
            regr_method="kernel.ridge",
            resid_XonZ=r_resid_XonZ,
            resid_YonZ=r_resid_YonZ
        )
    else:
        # No precomputed residuals => let R do the regression
        r_gcm_result = GCM.gcm_test(
            X=Y,
            Y=E,
            Z=X,
            regr_method="kernel.ridge"
        )

    r_p_val = r_gcm_result.rx2('p.value')[0]

    print("Python GCM Test (possibly with precomputed residuals):")
    print(f"   Test Statistic = {python_test_stat:.6f}")
    print(f"   p-value        = {python_p_val:.6g}")

    print("R GCM Test (kernel.ridge):")
    print(f"   p-value        = {r_p_val:.6g}")
    print("-" * 50)

    return python_test_stat, python_p_val, r_p_val


if __name__ == "__main__":
    np.random.seed(42)
    n = 100

    ############################################################################
    # TEST 1) All Continuous
    ############################################################################
    X_continuous = np.random.randn(n, 2)
    E_continuous = 0.5 * X_continuous[:, [0]] + 0.1 * np.random.randn(n, 1)
    Y_continuous = 0.8 * X_continuous[:, [1]] + 0.1 * np.random.randn(n, 1)

    print("=== TEST 1: All Continuous ===")
    compare_gcm(E_continuous, X_continuous, Y_continuous,
                get_resid='GP',
                categorical_E=False,
                categorical_X=False,
                categorical_Y=False)

    ############################################################################
    # TEST 2) E is Categorical
    ############################################################################
    X_continuous = np.random.randn(n, 2)
    E_categorical = np.random.choice([0, 1, 2], size=(n, 1), p=[0.3, 0.5, 0.2])
    Y_continuous = 0.8 * X_continuous[:, [1]] + 0.1 * np.random.randn(n, 1)

    print("=== TEST 2: E is Categorical ===")
    compare_gcm(E_categorical, X_continuous, Y_continuous,
                get_resid='GP',
                categorical_E=True,
                categorical_X=False,
                categorical_Y=False)

    ############################################################################
    # TEST 3) X is Categorical
    ############################################################################
    E_continuous = 0.5 * np.random.randn(n, 1)  # some continuous E
    X_categorical = np.random.choice([0, 1, 2], size=(n, 1))
    Y_continuous = 0.5 * X_categorical + 0.1 * np.random.randn(n, 1)

    print("=== TEST 3: X is Categorical ===")
    compare_gcm(E_continuous, X_categorical, Y_continuous,
                get_resid='categorical',
                categorical_E=False,
                categorical_X=True,
                categorical_Y=False)

    ############################################################################
    # TEST 4) Precomputed Residuals (Both Python & R)
    ############################################################################
    # We'll re-use continuous data for clarity. Suppose we want to do a linear
    # regression for Y on X, E on X, store the residuals, and hand them to
    # both Python and R tests so neither fits any regression internally.

    X_cont = np.random.randn(n, 2)
    E_cont = 0.5 * X_cont[:, [0]] + 0.1 * np.random.randn(n, 1)
    Y_cont = 0.8 * X_cont[:, [1]] + 0.1 * np.random.randn(n, 1)

    # Precompute residuals externally
    resid_E_lin = linear_regression_residuals(E_cont, X_cont)  # shape ~ (n,) or (n,1)
    resid_Y_lin = linear_regression_residuals(Y_cont, X_cont)

    print("=== TEST 4: Precomputed Residuals ===")
    compare_gcm(
        E_cont, X_cont, Y_cont,
        get_resid='linear',  # It's ignored for E,Y since we're passing precomputed anyway
        categorical_E=False,
        categorical_X=False,
        categorical_Y=False,
        resid_E=resid_E_lin,
        resid_Y=resid_Y_lin
    )
