In [None]:
import torch
import numpy as np
import random


class BlicketEnv:
    def __init__(self, true_hypothesis):
        # Represent the ground truth causal structure
        self.true_hypothesis = true_hypothesis

        # Action Space: A, B, C, AB, BC, AC, ABC
        self.actions = ["A", "B", "C", "AB", "BC", "AC", "ABC"]

    def step(self, action):
        # Take action and return observation
        if self.true_hypothesis.endswith("dis"):
            if set(action).issubset(set(self.true_hypothesis[:-4])):
                return 1 # detector turns on
        else: # conjunctive hypothesis
            if set(action) == set(self.true_hypothesis[:-4]):
                return 1 # detector turns on
        return 0 # detector remains off

    def reset(self):
        pass # No need for resetting in the current setup


class BayesianModel:
    def __init__(self):
        # Define all possible hypotheses
        self.hypotheses = [
            "A-dis", "B-dis", "C-dis", "AB-dis", "AC-dis", "BC-dis", "ABC-dis",
            "AB-con", "AC-con", "BC-con", "ABC-con"
        ]

        # Initialize with uniform prior
        self.prior = torch.ones(len(self.hypotheses)) / len(self.hypotheses)

    def update_belief(self, action, observation):
        likelihoods = torch.tensor([BlicketEnv(h).step(action) for h in self.hypotheses], dtype=torch.float32)

        # Apply Bayes Rule
        numerator = likelihoods * self.prior
        denominator = torch.sum(numerator)
        self.prior = numerator / denominator

    def select_action(self, epsilon=0.5):
        expected_information_gain = []
        eta = 1e-10  # Small constant to avoid log(0)

        # Estimate expected information gain for each action
        for action in BlicketEnv("").actions:
            future_belief = self.prior.clone()
            likelihoods = torch.tensor([BlicketEnv(h).step(action) for h in self.hypotheses], dtype=torch.float32)
            numerator = likelihoods * future_belief
            denominator = torch.sum(numerator)
            future_belief = numerator / (denominator + eta)  # Prevent division by zero

            # Calculate KL-divergence safely
            divergence = torch.sum(future_belief * (torch.log(future_belief + eta) - torch.log(self.prior + epsilon)))
            expected_information_gain.append(divergence)

        # ε-greedy exploration
        if random.random() < epsilon:
            return random.choice(BlicketEnv("").actions)
        else:
            return BlicketEnv("").actions[np.argmax(expected_information_gain)]


true_hypothesis = "AB-con"  # for example
env = BlicketEnv(true_hypothesis)
model = BayesianModel()

for _ in range(100):  # Example number of steps
    action = model.select_action()
    observation = env.step(action)
    model.update_belief(action, observation)
    print(f"Action: {action}, Observation: {observation}, Belief: {model.prior}")


Action: ABC, Observation: 0, Belief: tensor([0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.5000, 0.0000, 0.0000,
        0.0000, 0.5000])
Action: A, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: A, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: AB, Observation: 1, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: BC, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: C, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: C, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: B, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: C, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: A, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.])
Action: C, Observation: 0, Belief: tensor([0., 0., 0., 0., 0., 0

In [None]:
def evaluate(true_hypothesis, num_episodes=1000, epsilon=0.1):
    """
    Evaluate the BayesianModel's ability to determine the true hypothesis.
    """
    env = BlicketEnv(true_hypothesis)
    model = BayesianModel()

    # Track how many steps it takes for the model to become confident
    steps_to_converge = []

    for episode in range(num_episodes):
        model.prior = torch.ones(len(model.hypotheses)) / len(model.hypotheses)  # Reset belief
        converged = False
        step = 0
        while not converged and step < 100:  # Max of 20 steps for convergence
            action = model.select_action(epsilon)
            observation = env.step(action)
            model.update_belief(action, observation)

            step += 1

            # Check if belief for the true hypothesis exceeds a threshold (e.g., 0.9)
            true_hypothesis_index = model.hypotheses.index(true_hypothesis)
            if model.prior[true_hypothesis_index] > 0.9:
                converged = True

        steps_to_converge.append(step if converged else 100)

    return sum(steps_to_converge) / num_episodes  # Average steps to converge

true_hypothesis = "AB-con"  # for example
avg_steps = evaluate(true_hypothesis, epsilon=0.5)
print(f"On average, it took {avg_steps:.2f} steps to determine the true hypothesis.")


On average, it took 20.00 steps to determine the true hypothesis.


In [None]:
class Node:
    def __init__(self, belief, parent_action=None):
        self.belief = belief
        self.children = {}
        self.parent_action = parent_action

def expand_node(node, depth=1, max_depth=5):
    if depth > max_depth:
        return np.inf

    # If only one hypothesis is left or its probability is very high
    max_belief = torch.max(node.belief)
    if len(node.belief[node.belief > 1e-6]) == 1 or max_belief > 0.95:
        return depth

    # For each action, create a child node and compute its belief
    model = BayesianModel()
    model.prior = node.belief

    depths = []
    for action in BlicketEnv("").actions:
        future_belief = model.prior.clone()
        likelihoods = torch.tensor([BlicketEnv(h).step(action) for h in model.hypotheses], dtype=torch.float32)
        numerator = likelihoods * future_belief
        denominator = torch.sum(numerator)
        future_belief = numerator / denominator

        child_node = Node(future_belief, action)
        node.children[action] = child_node
        depths.append(expand_node(child_node, depth + 1, max_depth))

    # Return the minimum expected depth from this node
    return min(depths)

def best_policy_sequence():
    root = Node(torch.ones(len(BayesianModel().hypotheses)) / len(BayesianModel().hypotheses))
    expand_node(root)
    sequence = []

    current = root
    while current.children:
        action = min(current.children, key=lambda a: len(current.children[a].children))
        sequence.append(action)
        current = current.children[action]

    return sequence

def evaluate_tree_policy(true_hypothesis, max_steps=20):
    """
    Evaluate the tree-based policy's ability to determine the true hypothesis.
    """
    env = BlicketEnv(true_hypothesis)
    model = BayesianModel()

    action_sequence = best_policy_sequence()

    for step in range(min(max_steps, len(action_sequence))):
        action = action_sequence[step]
        observation = env.step(action)
        model.update_belief(action, observation)

        # Check if the belief for the true hypothesis exceeds a threshold (e.g., 0.9)
        true_hypothesis_index = model.hypotheses.index(true_hypothesis)
        if model.prior[true_hypothesis_index] > 0.9:
            return step + 1  # Return number of steps to determine the hypothesis

    return max_steps

# Test on different hypotheses
all_hypotheses = ["A-dis", "B-dis", "C-dis", "AB-dis", "AC-dis", "BC-dis", "ABC-dis", "AB-con", "AC-con", "BC-con", "ABC-con"]
results = {}
for hypothesis in all_hypotheses:
    avg_steps = evaluate_tree_policy(hypothesis)
    results[hypothesis] = avg_steps

print(results)


{'A-dis': 20, 'B-dis': 20, 'C-dis': 20, 'AB-dis': 20, 'AC-dis': 20, 'BC-dis': 20, 'ABC-dis': 2, 'AB-con': 20, 'AC-con': 20, 'BC-con': 20, 'ABC-con': 20}
