In [None]:
import sys
sys.path.insert(0, '../..')
# sys.path.insert(0, '../../ArgCausalDisco')
# sys.path.insert(0, '../../notears')

In [None]:
from src.abasp.factory import ABASPSolverFactory
from src.abasp.utils import Fact, RelationEnum

In [None]:
def parse_fact_line(line):
    line = line.strip()
    relation, rest = line.split('(', 1)
    rest = rest.split(')')[0]
    node1, node2, node_set = rest.split(',')
    node1 = int(node1.strip())
    node2 = int(node2.strip())
    node_set = node_set.strip()
    if node_set == 'empty':
        node_set = {}
    else:
        node_set = node_set.strip()[1:].split('y')
        node_set = {int(x) for x in node_set if x.strip()}

    return Fact(
        relation=RelationEnum[relation.strip()],
        node1=node1,
        node2=node2,
        node_set=node_set,
        score=1,
    )

def facts_from_file(filename):
    facts = []
    with open(filename, 'r') as f:
        for line in f:
            facts.append(parse_fact_line(line))
    return facts


path_start = '../../ArgCausalDisco/encodings/test_lps/'

facts_from_file(f'{path_start}five_node_colombo_example.lp')[:4]

In [None]:
def get_models(n_nodes, facts):
    # Create the ABASPSolverFactory
    factory = ABASPSolverFactory(n_nodes=n_nodes)

    # Create the solver
    solver = factory.create_solver(facts)
    models = solver.enumerate_extensions('ST')
    return models

def get_graph_from_model(model):
    graph = set()
    for a in model.assumptions:
        if a.startswith('arr'):
            _, node1, node2 = a.split('_')
            node1 = int(node1)
            node2 = int(node2)
            graph.add((node1, node2))
    return frozenset(graph)

In [None]:
import networkx as nx
import numpy as np
import pandas as pd

In [None]:
def CausalABA(n_nodes, facts_location):
    facts = facts_from_file("../../ArgCausalDisco/" + facts_location)
    models = get_models(n_nodes, facts)
    return models, None

def model_to_set_of_arrows(model):
    return get_graph_from_model(model)

In [None]:
def five_node_colombo_example():
    scenario = "five_node_colombo_example"
    facts_location = f"encodings/test_lps/{scenario}.lp"
    B_true = np.array( [[ 0,  0,  1,  1,  1],
                        [ 0,  0,  1,  0,  1],
                        [ 0,  0,  0,  1,  1],
                        [ 0,  0,  0,  0,  1],
                        [ 0,  0,  0,  0,  0]])
    n_nodes = B_true.shape[0]

    expected = {frozenset({(0, 2), (1, 2), (0, 4), (2, 4), (3, 4), (0, 3), (1, 4), (2, 3)})}

    models, _ = CausalABA(n_nodes, facts_location)
    model_sets = set()
    for model in models:
        arrows = model_to_set_of_arrows(model)
        model_sets.add(frozenset(arrows))            
    print(model_sets)
    assert set(model_sets) == expected
    return model_sets
five_node_colombo_example()

In [None]:
def five_node_sprinkler_example():
    scenario = "five_node_sprinkler_example"
    facts_location = f"encodings/test_lps/{scenario}.lp"
    B_true = np.array( [[ 0,  1,  1,  0,  0],
                        [ 0,  0,  0,  1,  0],
                        [ 0,  0,  0,  1,  0],
                        [ 0,  0,  0,  0,  1],
                        [ 0,  0,  0,  0,  0]])
    n_nodes = B_true.shape[0]

    expected = frozenset({(0, 1), (0, 2), (1, 3), (2, 3), (3, 4)})

    models, _ = CausalABA(n_nodes, facts_location)
    model_sets = set()
    for model in models:
        arrows = model_to_set_of_arrows(model)
        model_sets.add(frozenset(arrows))            

    print(model_sets)
    assert expected in model_sets

In [None]:
five_node_sprinkler_example()