In [1]:
import numpy as np
from collections import defaultdict, deque
from graphs import graph2 as graph
class ProbabilisticGraphSampler:
    """
    Ancestral sampler for a probabilistic graph described by a plain dict.

    Nodes are evaluated in topological order so that every node's parents
    have a value before the node's own expression is called.
    """

    def __init__(self, graph):
        """
        Initialize the sampler with a graph.
        :param graph: dict describing the model.
            keys : random variables
            values : (parents, expression). ``expression`` is always called
                      with ONE argument: a dict mapping each parent name to
                      its sampled value (an empty dict for root nodes).
                      Example:
                      {
                          "z": ([], lambda values: np.random.binomial(1, 0.5)),
                          "y": (["z"], lambda values: np.random.normal(-1.0 if values["z"] == 0 else 1.0, 1.0))
                      }
        :raises ValueError: if the graph has a cycle or references a parent
                            that is not itself a key of the graph.
        """
        self.graph = graph
        # Most recent trace; rebound to a fresh dict by every sample_trace().
        self.values = {}
        # Computed once: the graph structure is assumed not to change afterwards.
        self.sorted_nodes = self.topological_sort()

    def topological_sort(self):
        """
        Perform a topological sort of the graph (Kahn's algorithm).
        :return: A list of nodes in topological order.
        :raises ValueError: if a node lists a parent that is not defined in
                            the graph, or if the graph contains a cycle.
        """
        # Count of incoming edges for each node; every node starts at zero.
        in_degree = {node: 0 for node in self.graph}
        # parent -> list of children, used to propagate "parent is done".
        adj_list = defaultdict(list)

        for node, (parents, _) in self.graph.items():
            for parent in parents:
                # Without this check an undefined parent would leave `node`
                # permanently blocked and be misreported as a cycle.
                if parent not in self.graph:
                    raise ValueError(
                        f"Node {node!r} depends on undefined parent {parent!r}"
                    )
                in_degree[node] += 1
                adj_list[parent].append(node)

        # Start from the nodes with no incoming edges, in graph order.
        queue = deque(node for node, degree in in_degree.items() if degree == 0)
        sorted_nodes = []

        while queue:
            node = queue.popleft()
            sorted_nodes.append(node)

            # Releasing `node` may unblock its children.
            for child in adj_list.get(node, ()):
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    queue.append(child)

        if len(sorted_nodes) != len(self.graph):
            # Whatever still has unresolved parents is on (or behind) a cycle.
            unresolved = [node for node in self.graph if in_degree[node] > 0]
            raise ValueError(
                f"Graph contains a cycle! Unresolved nodes: {unresolved}"
            )

        return sorted_nodes

    def sample_trace(self):
        """
        Sample a single trace by evaluating all nodes in topological order.
        :return: A NEW dictionary of sampled values for all nodes. Traces
                 returned by earlier calls are left untouched, so they can be
                 collected in a list safely. ``self.values`` refers to the
                 same dictionary until the next call.
        """
        # A fresh dict per call: clearing and reusing one dict would mutate
        # every trace the caller has already stored.
        values = self.values = {}
        for node in self.sorted_nodes:
            parents, expression = self.graph[node]
            # Gather parent values (already sampled thanks to the ordering).
            parent_values = {p: values[p] for p in parents}
            # Evaluate the current node
            values[node] = expression(parent_values)
        return values




# `graph` is `graphs.graph2`, imported under that alias at the top of this cell.
# Create the sampler; its constructor also computes the topological order once.
sampler = ProbabilisticGraphSampler(graph)

# Sample five traces and print each one (one dict of node -> value per line).
for _ in range(5):
    trace = sampler.sample_trace()
    print(trace)



{'r': -0.328645612419242, 'z_1': -0.8627988172290724, 'z_2': 0.14193076738594435, 'z_3': 0.8603888886431568, 'z_4': -1.0677018511558818, 'z_5': 0.16780821997661727}
{'r': -0.0005221091047100355, 'z_1': 1.2235618929608307, 'z_2': 0.756905416266034, 'z_3': 1.4101992067537954, 'z_4': 1.3539529083478594, 'z_5': -0.14994422009879133}
{'r': -0.2427899820663699, 'z_1': 0.20737797232749863, 'z_2': -1.5529013061258747, 'z_3': -0.5103824357232114, 'z_4': 1.7022129890553668, 'z_5': 0.48002117357596974}
{'r': -0.27734444175649897, 'z_1': 0.22694910617007596, 'z_2': -0.08309487176121189, 'z_3': 1.0082864943536725, 'z_4': 0.7802491731929786, 'z_5': 0.7458784244262707}
{'r': -0.36851699370719226, 'z_1': 0.3583949406132298, 'z_2': 1.0749112655737874, 'z_3': -1.106609349838729, 'z_4': -0.49599562630825544, 'z_5': 0.4701173774697211}


In [2]:
# Test for loop
from ast_class import If, Constant, Sample, Let, Observe, Variable, Assign, For
from translator import Translator

# Alternative test program, currently disabled (sample / let-if / observe):
# prog = [
#     Sample("x", ('normal', 0.0, 1.0)),
#     Let("z",
#         If(
#             # condition: x>0.0
#             ('>', Variable("x"), Constant(0.0)),
#             Constant(10.0),
#             Constant(-10.0)
#         ),
#         # body => sample y
#         Sample("y", ('normal', Variable("z"), 1.0))
#     ),
#     Observe(('normal', Variable("y"), 1.0), 0.5)
# ]

# Active test program: assign acc = 0, sample x from ("normal", 0.0, 1.0),
# run a For over "i" with argument 3 whose body reassigns acc = acc + i,
# then observe 0.5 under ("normal", x, 1.0).
prog = [
    Assign("acc", Constant(0)),
    Sample("x", ("normal", 0.0, 1.0)),
    For("i", 3, [
        Assign("acc", ("+", Variable("acc"), Variable("i")))
    ]),
    Observe(("normal", Variable("x"), 1.0), 0.5),
]

# Translate the AST into a graph. Note this rebinds `graph`, which the first
# cell imported from `graphs`, so later cells see the translated graph.
translator = Translator()
graph = translator.translate_program(prog)

# Dump every node of the translated graph, one "name -> node" line each.
# NOTE(review): in the recorded output `acc` has parent `op_+_9`, while
# `op_+_5` and `op_+_7` both list `acc` as a parent rather than the previous
# iteration's result. If `op_+_9` does the same this is a cycle - verify how
# Translator handles reassignment inside For.
print("\nFINAL GRAPH NODES:\n")
for name, node in graph.items():
    print(f"{name} -> {node}")


FINAL GRAPH NODES:

const_0 -> GraphNode(const_0, dist=None, obs=None, parents=[], det_expr=('const', 0))
acc -> GraphNode(acc, dist=None, obs=None, parents=['op_+_9'], det_expr=('id', 'op_+_9'))
const_1 -> GraphNode(const_1, dist=None, obs=None, parents=[], det_expr=('const', 0.0))
const_2 -> GraphNode(const_2, dist=None, obs=None, parents=[], det_expr=('const', 1.0))
normal_3 -> GraphNode(normal_3, dist=None, obs=None, parents=['const_1', 'const_2'], det_expr=('dist', 'normal', ['const_1', 'const_2']))
x -> GraphNode(x, dist=('normal_3',), obs=None, parents=['normal_3'], det_expr=None)
i_4 -> GraphNode(i_4, dist=None, obs=None, parents=[], det_expr=('const', 0))
op_+_5 -> GraphNode(op_+_5, dist=None, obs=None, parents=['acc', 'i_4'], det_expr=('+', 'acc', 'i_4'))
i_6 -> GraphNode(i_6, dist=None, obs=None, parents=[], det_expr=('const', 1))
op_+_7 -> GraphNode(op_+_7, dist=None, obs=None, parents=['acc', 'i_6'], det_expr=('+', 'acc', 'i_6'))
i_8 -> GraphNode(i_8, dist=None, obs=None,

In [6]:
# Test Importance Sampling
from ast_class import If, Constant, Sample, Let, Observe, Variable, Assign, For
from translator import Translator
from importance_sampling import ImportanceSampler
import math

# 1) Define a simple AST:
#    x ~ ("normal", 0.0, 1.0)
#    observe the value 0.5 under ("normal", x, 2.0)
# Earlier variant (second distribution parameter 1.0), kept for reference:
# program_ast = [
#     Sample("x", ("normal", 0.0, 1.0)),
#     Observe(("normal", Variable("x"), 1.0), 0.5)
# ]

program_ast = [
    Sample("x", ("normal", 0.0, 1.0)),
    Observe(("normal", Variable("x"), 2.0), 0.5),
]

# 2) Translate AST -> Graph
translator = Translator()
graph = translator.translate_program(program_ast)

# 3) Instantiate ImportanceSampler
sampler = ImportanceSampler(graph)

# 4) Draw multiple samples, keeping the weights in log scale as returned.
N = 5000
samples = []
log_weights = []
for _ in range(N):
    env, log_weight = sampler.sample_one()
    samples.append(env)
    log_weights.append(log_weight)

# Linear-scale weights, kept for inspection / compatibility. They are NOT
# used for the estimate below because they can all underflow to 0.0.
weights = [math.exp(lw) for lw in log_weights]

# 5) Analyze results: self-normalized weighted mean of x.
# The estimate is a ratio of weighted sums, so multiplying every weight by
# the same constant leaves it unchanged. Shifting by the largest log-weight
# makes the biggest weight exactly 1.0, so total_weight >= 1 and the division
# is safe even when the raw weights would underflow.
max_log_weight = max(log_weights)
if max_log_weight == -math.inf:
    raise ValueError("All importance weights are zero; cannot normalize.")

weighted_sum = 0.0
total_weight = 0.0
for env, log_weight in zip(samples, log_weights):
    w = math.exp(log_weight - max_log_weight)
    weighted_sum += env["x"] * w
    total_weight += w
posterior_mean_x = weighted_sum / total_weight

print(f"Approx. posterior mean of x = {posterior_mean_x:.3f}")

Approx. posterior mean of x = 0.095
