In [1]:
from causaldag.structure_learning import gspo
import causaldag as cd
import random
import numpy as np
from causaldag.utils.ci_tests import MemoizedCI_Tester, gauss_ci_test, gauss_ci_suffstat, msep_test

## Setup graph, samples, and sufficient statistic

In [2]:
nnodes = 5
nodes = set(range(nnodes))
nlatent = 3
exp_nbrs = 3
nsamples = 5000

Sample a random Erdos-Renyi DAG with random weights. Then, take $n$ samples and forget the first `nlatent` values.

In [3]:
random.seed(869674)
np.random.seed(986)

dag = cd.rand.directed_erdos(nnodes+nlatent, exp_nbrs/(nnodes+nlatent/2))
gdag = cd.rand.rand_weights(dag)
all_samples = gdag.sample(nsamples)
observed_samples = all_samples[:, nlatent:]

Compute the sufficient statistics for the conditional independence test you are using. Pass those sufficient statistics and the conditional independence testing function to a `CI_Tester`. In this case, we use partial-correlation based CI testing.

In [4]:
suffstat = gauss_ci_suffstat(observed_samples)
ci_tester = MemoizedCI_Tester(gauss_ci_test, suffstat, alpha=.01)

## Estimate the MAG and compare to the true MAG

Compute the ground-truth MAG.

In [5]:
mag = dag.marginal_mag(set(range(nlatent)), relabel='default')

In [6]:
print("Directed edges:", mag.directed)
print("Bidirected edges:", mag.bidirected)

Directed edges: {(1, 2), (0, 4), (3, 4), (1, 0), (3, 2), (1, 4)}
Bidirected edges: {frozenset({0, 2}), frozenset({1, 3}), frozenset({0, 3})}


Estimate a MAG using GSPo.

In [7]:
est_mag = gspo(nodes, ci_tester)

  zero_ixs = p_values > alpha


In [8]:
print("Estimated directed edges:", est_mag.directed)
print("Estimated bidirected edges:", est_mag.bidirected)

Estimated directed edges: {(2, 3), (0, 4), (3, 4), (1, 4), (3, 0)}
Estimated bidirected edges: {frozenset({0, 1}), frozenset({1, 3}), frozenset({1, 2})}


In [9]:
est_mag.shd_skeleton(mag)

1

In [10]:
est_mag.markov_equivalent(mag)

False

## Oracle Estimation

We can also use m-separation in the true graph as our CI tester to check consistency of the algorithm.

In [11]:
oracle_ci_tester = MemoizedCI_Tester(msep_test, mag)

In [12]:
est_mag_oracle = gspo(nodes, oracle_ci_tester)

In [13]:
print("Estimated directed edges:", est_mag_oracle.directed)
print("Estimated bidirected edges:", est_mag_oracle.bidirected)

Estimated directed edges: {(1, 2), (0, 4), (3, 4), (3, 1), (2, 0), (1, 4), (3, 0), (1, 0), (3, 2)}
Estimated bidirected edges: set()


In [14]:
est_mag_oracle.markov_equivalent(mag)

True