In [2]:
import typing as typ

import numpy as np

from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.factors.discrete import TabularCPD
from pgmpy.inference import VariableElimination

# X -> Actions -> Outcomes <- X
model = DiscreteBayesianNetwork(
    [
        ("X", "A"), # X influences action 
        ("X", "O"), # X influences outcome
        ("A", "O"), # action influences outcome
    ]
)


# Prior P(X) for X - some external world thing that is upstream of what action (A) is taken but also has some independent influence on the outcome (O), so that we can make a distinction between CDT (do-operator causal intervention surgery on A) and EDT (Bayesian update propagation through the net)
cpd_X = TabularCPD(
    variable="X",
    variable_card=2, # number of possible outcomes
    # coin flip
    values=[
        [0.5],
        [0.5],
    ],
)

# P(A|X)
cpd_A = TabularCPD(
    variable="A",
    variable_card=3,
    evidence=["X"],
    evidence_card=[2],
    # uniform if X=0, otherwise: P(A=0|X=1)=0.1, P(A=1|X=1)=0.3, P(A=2|X=1)=0.6
    values=[
        [1/3, 0.1],
        [1/3, 0.3],
        [1/3, 0.6],
    ]
)

# P(O|X,A)
cpd_O = TabularCPD(
    variable="O",
    variable_card=3,
    evidence=["X", "A"],
    evidence_card=[2, 3],
    # rows correspond to possible Observation values; columns corresopnd to possible X×Action values
    values=[
        [1/3, 1/6, 1/4, 0.1, 0.05, 0.90],
        [1/3, 1/4, 1/2, 0.1, 0.05, 0.05],
        [1/3, 7/12, 1/4, 0.8, 0.90, 0.05],
    ],
)


model.add_cpds(cpd_X, cpd_A, cpd_O)
assert model.check_model()

# utility of possible outcomes
O_utility = np.array([10, 30, 20])

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
class DecisionTheoryResult(typ.NamedTuple):
    """A result of a decision theory algorithm applied to a particular problem."""
    dt: typ.Literal["cdt", "edt"] # cdt, edt, etc
    """Decision theory of choice."""
    model: DiscreteBayesianNetwork
    """Causal model used by the algorithm."""
    A_to_O_probs: np.ndarray
    """Matrix where rows correspond to possible action, columns correspond to possible outcomes, and entries correspond to probabilities of outcomes, conditional on actions."""
    A_to_EU: np.ndarray
    """Matrix where rows correspond to possible action, columns correspond to possible outcomes, and entries correspond to expected utilities of outcomes, conditional on actions."""
    O_utility: np.ndarray
    """Terminal/intrinsic utilities of outcomes (vector, i.e. 1D array)."""
    best_action: int
    """Index of the highest-expected-utility action."""
    best_action_EU: float
    """Expected utility of the highest-expected-utility action."""

def model_to_var_cards(model: DiscreteBayesianNetwork) -> dict[str, int]:
    """Extract the model's nodes' cardinalities."""
    return {
        cpd.variable: typ.cast(int, cpd.variable_card)
        for cpd in typ.cast(list[TabularCPD], model.get_cpds())
    }

## EDT

In [4]:
def edt(model: DiscreteBayesianNetwork, O_utility: np.ndarray) -> DecisionTheoryResult:
    var_cards = model_to_var_cards(model)
    assert len(O_utility) == var_cards["O"]
    infer = VariableElimination(model)

    # For every action, get the vector of outcome probabilities.
    A_to_O_probs = np.array(
        [
            infer.query(variables=["O"], evidence={"A": a}).values #type:ignore
            for a in range(var_cards["A"])
        ]
    )
    # Actions to expected utilities.
    A_to_EU = A_to_O_probs @ O_utility
    best_action = A_to_EU.argmax().item()
    best_action_EU = A_to_EU.max().item()
    return DecisionTheoryResult(
        "edt",
        model,
        A_to_O_probs,
        A_to_EU,
        O_utility,
        best_action,
        best_action_EU,
    )

edt_result = edt(model, O_utility)
print(f"{edt_result.best_action = }")
print(f"{edt_result.best_action_EU = }")

edt_result.best_action = 1
edt_result.best_action_EU = 20.438596491228072


## CDT

In [5]:
def cdt(model: DiscreteBayesianNetwork, O_utility: np.ndarray) -> DecisionTheoryResult:
    var_cards = model_to_var_cards(model)
    assert len(O_utility) == var_cards["O"]

    # We will fill out this matrix row-by-row (action-by-action)
    A_to_O_probs = np.zeros((var_cards["A"], var_cards["O"]))

    # For every action, perform a do-operator intervention that assigns prior probability one to that action.
    for a in range(var_cards["A"]):
        # Copy the edges of the model but not conditional probabilities
        do_model = DiscreteBayesianNetwork(model.edges())
        # Remove the X->A connection
        do_model.remove_edges_from([("X", "A")])
        # Define new conditional probability for A, independent of any other nodes
        do_model_cpd_A = TabularCPD(
            variable="A",
            variable_card=var_cards["A"],
            values=[[1.0] if a == a_ else [0.0] for a_ in range(var_cards["A"])],
        )
        # Add the new CPD and the old CPDs (back) to the model
        do_model.add_cpds(cpd_X, do_model_cpd_A, cpd_O)
        assert do_model.check_model()
        # The rest is an in the EDT function
        do_infer = VariableElimination(model=do_model)
        result = do_infer.query(variables=["O"], evidence={"A": a}).values #type: ignore
        A_to_O_probs[a, :] = result #type:ignore
    
    A_to_EU = A_to_O_probs @ O_utility
    best_action = A_to_EU.argmax().item()
    best_action_EU = A_to_EU.max().item()
    return DecisionTheoryResult(
        "cdt",
        model,
        A_to_O_probs,
        A_to_EU,
        O_utility,
        best_action,
        best_action_EU
    )

cdt_res = cdt(model, O_utility)
print(f"{cdt_res.best_action = }")
print(f"{cdt_res.best_action_EU = }")

cdt_res.best_action = 1
cdt_res.best_action_EU = 20.416666666666668
