In [1]:
from __future__ import annotations
from typing import Type, Union
import pandas as pd
import numpy as np

### Defining types

In [2]:
Id = (str, int)
Edge = (Id, Id)
Node = Union["StateNode", "EvidenceNode", "ActionNode", "UtilityNode"]

### Node classes

In [3]:
class StateNode:
    def __init__(self, name: str, time: int):
        self.id: Id = (name, time)
            
    def get_name(self):
        return self.id[1]
            
    def get_time(self):
        return self.id[1]

    def add_pt(self, pt: dict[Union[Id, str], list[int]]):
        self.pt = pd.DataFrame(pt)
        
    def get_sample(self, sample: dict[Id, int]) -> pd.DataFrame:
        # Get the row relative to the current sample where current node is false
        sample = {k: v for k, v in sample.items() if (k in self.pt) and (k != self.id)}
        df = self.pt
        for node_id in sample:
            df = df.loc[df[node_id] == sample[node_id]]
        df = df.loc[df[self.id] == 0]
        
        # Generate random number
        number = np.random.uniform()
        r = int(np.random.uniform() > df["Prob"])
        
        return r
        
    
class EvidenceNode(StateNode):
    pass


class ActionNode(StateNode):
    def __init__(self, name: str, time: int, actions: list[int]):
        super().__init__(name, time)
        self.actions = actions
        
    def add_value(self, value: int):
        self.pt = pd.DataFrame({self.id: [value, 1-value], "Prob": [1, 0]})


class UtilityNode(StateNode):
    pass

## Dynamic Decision Network Class

In [4]:
class DynamicDecisionNetwork:
    def __init__(self):
        self.node_map: dict[Id, Node] = {}
        self.graph: dict[Id, [Id]] = {}
        self.knowns: list[Id, int] = {}
        self.time: int = 0
        
    def add_nodes(self, nodes: list[Node]):
        for node in nodes:
            if node.id not in self.node_map:
                self.node_map[node.id] = node
                self.graph[node.id] = []
        
    def add_edges(self, edges: list[Edge]):
        for edge in edges:
            s, d = edge
            if s not in self.graph:
                self.graph[s] = []
            self.graph[s].append(d)
            
    def is_leaf(self, node_id: Id):
        # For a node to be leaf it cant have children
        return len(self.graph[node_id]) == 0
    
    def is_root(self, node_id: Id):
        # For a node to be root it cant have parents
        parents = []
        for key in self.graph:
            if node_id in self.graph[key]:
                parents.append(key)
        return len(parents) == 0
        
    def get_edges(self):
        edges = []
        for s in self.graph:
            for d in self.graph[s]:
                edges.append((s, d))
        return edges
        
    def add_pt(self, node_id: Id, pt: dict[Union[Id, str], int]):
        self.node_map[node_id].add_pt(pt)
        
    def get_nodes_by_type(self, node_type: Type(Node)) -> list[Id]:
        return [k for k, v in self.node_map.items() if type(v) is node_type]
    
    def get_nodes_by_type_and_time(self, node_type: Type(Node), time: int) -> list[Id]:
        return [node for node in self.get_nodes_by_type(node_type) if self.node_map[node].get_time() == time]
    
    def query(self, query: list[Id], evidence: dict[Id, int] = {}, n_samples: int = 1000) -> pd.DataFrame:
        
        # Add value of actions to the nodes
        action_nodes = [k for k in evidence if type(self.node_map[k]) is ActionNode]
        for node in action_nodes:
            self.node_map[node].add_value(evidence[node])
        
        # Create empty sampling dictionary
        sample_dict = {node_id: [] for node_id in self.node_map}
        
        # Create multiple samples
        cur_samples = 0
        while (cur_samples < n_samples):
            
            # Create empty sample
            sample = {}
            queue = [k for k in self.node_map if self.is_root(k)]
            
            # Sample a result from each root node
            while len(queue) != 0:
                
                # Sample from head of queue
                sample[queue[0]] = self.node_map[queue[0]].get_sample(sample)
                
                # Add head's children to queue
                queue += self.graph[queue[0]]
                
                # Remove head from queue
                queue.pop(0)
            
            # Pass sample results to sample_dict if it matches with evidence
            matches = [sample[node_id] == evidence[node_id] for node_id in evidence]
            if all(matches):
                for node_id in sample_dict:
                    sample_dict[node_id].append(sample[node_id])
                cur_samples += 1
                
        # Turn result into probability table
        df = pd.DataFrame(sample_dict)
        df = df.value_counts(normalize=True).to_frame("Prob")
        
        # Group over query variables and sum over all other variables
        df = df.groupby(query).sum().reset_index()
        
        return df
    
    def initialize(self):
        # Get all evidence and reward nodes ids
        init_nodes = self.get_nodes_by_type(EvidenceNode) + self.get_nodes_by_type(UtilityNode)
        
        # Get a single sample from the initial network
        self.node_map[("Action", 0)].add_value(1) # FIX THIS
        sample = self.query(query=init_nodes, n_samples=1)
        sample = {col: int(sample[col]) for col in sample if col != "Prob"}
        self.knowns = sample
        
        # Change nodes X0 and X1 to X1 and X2
        X0_X1_ids = self.get_nodes_by_type_and_time(StateNode, 0) + self.get_nodes_by_type_and_time(StateNode, 1)
        for (n, t) in X0_X1_ids:
            self.graph[(n, t+1)] = self.graph.pop((n, t))
            
        # Add vertices A1, E2 and R2
        A_E_R_ids = self.get_nodes_by_type(ActionNode) + self.get_nodes_by_type(EvidenceNode) + self.get_nodes_by_type(UtilityNode)
        self.add_nodes([(n, t+1) for (n, t) in A_E_R_ids])
        
        # Add edges (A1, X2) as replica of (A0, X1)
        node_type_filter = lambda nid, ntype: type(self.node_map[nid]) == ntype
        A0_X1_edges = [(s, d) for (s, d) in self.get_edges() if (node_type_filter(s, ActionNode) and node_type_filter(d, StateNode))]
        A1_X2_edges = [((s, ts+1), (d, td+1)) for ((s, ts), (d, td)) in A0_X1_edges]
        self.add_edges(A1_X2_edges)
        
        # Add edges (X2, E2) as replica of (X1, E1)
        X1_E1_edges = [(s, d) for (s, d) in self.get_edges() if (node_type_filter(s, StateNode) and node_type_filter(d, EvidenceNode))]
        X2_E2_edges = [((s, ts+1), (d, td+1)) for ((s, ts), (d, td)) in X1_E1_edges]
        self.add_edges(X2_E2_edges)
        
        # Add edges (X2, R2) as replica of (X1, R1)
        X1_R1_edges = [(s, d) for (s, d) in self.get_edges() if (node_type_filter(s, StateNode) and node_type_filter(d, UtilityNode))]
        X2_R2_edges = [((s, ts+1), (d, td+1)) for ((s, ts), (d, td)) in X1_R1_edges]
        self.add_edges(X2_E2_edges)
        
        self.time += 1
        

## Defining the DNN initial structure

In [5]:
# Create nodes
state_node0 = StateNode("State", 0)
state_node1 = StateNode("State", 1)
ev_node = EvidenceNode("Evidence", 1)
util_node = UtilityNode("Reward", 1)
action_node = ActionNode("Action", 0, [0, 1])

# Create the initial DDN structure
bn = DynamicDecisionNetwork()
bn.add_nodes([state_node0, state_node1, ev_node, util_node, action_node])
bn.add_edges([
    (("State", 0), ("State", 1)), 
    (("Action", 0), ("State", 1)), 
    (("State", 1), ("Evidence", 1)), 
    (("State", 1), ("Reward", 1))
])

## Giving the DNN its probability tables

In [6]:
# Add data for node State 0
data = {("State", 0): [0,1], "Prob": [1,0]}
bn.add_pt(("State", 0), data)

# Add data for node State 1
data = {
    ("State", 0): [0,0,0,0,1,1,1,1], ("Action", 0): [0,0,1,1,0,0,1,1], ("State", 1): [0,1,0,1,0,1,0,1], 
    "Prob": [0.5,0.5,0.9,0.1,0.7,0.3,0.4,0.6]
}
bn.add_pt(("State", 1), data)

# Add data for node Evidence
data = {("State", 1): [0,0,1,1], ("Evidence", 1): [0,1,0,1], "Prob": [0.8,0.2,0.2,0.8]}
bn.add_pt(("Evidence", 1), data)

# Add data for node Utility
data = {("State", 1): [0,0,1,1], ("Reward", 1): [0,1,0,1], "Prob": [1,0,0.1,0.9]}
bn.add_pt(("Reward", 1), data)

In [7]:
bn.query(query=[("Reward",1)], evidence={("State",0): 0, ("Action",0): 1})

Unnamed: 0,"(Reward, 1)",Prob
0,0,0.903
1,1,0.097


In [None]:
bn.initialize()

In [None]:
bn.knowns