In [1]:
import numpy
import torch
import functools
import matplotlib
from torch.distributions import Categorical
from itertools import product

In [2]:
D = 4
H = 8
R0 = 1


def get_reward(pos: torch.Tensor):
    reward = R0
    reward += functools.reduce(lambda a, b: a * b, [0.25 < abs(pos[d] / (H - 1) - 0.5) <= 0.5 for d in range(D)])
    reward += functools.reduce(lambda a, b: a * b, [0.30 < abs(pos[d] / (H - 1) - 0.5) < 0.4 for d in range(D)])
    return reward

In [3]:
class Node:
    def __init__(self, pos: torch.Tensor, term: bool):
        self.pos = pos
        self.term = term
        self.index = get(pos, term)
        self.next = []
        self.prev = []
        self.chance = torch.tensor([])
        self.flow_next = torch.tensor([])


def get(pos: torch.Tensor, term: bool):
    index = 0
    for i in range(D - 1, -1, -1):
        index += 2 * pos[i] * pow(H, D - i - 1)
    index += term

    return index.item()

In [4]:
def add_edge(v, u):
    nodes[int(v)].next.append(int(u))
    nodes[int(u)].prev.append(int(v))

nodes = []
coord_diap = [range(H) for _ in range(D)]
for coords in product(*coord_diap):
    nodes.append(Node(torch.tensor(coords), False))
    nodes.append(Node(torch.tensor(coords), True))

for coords in product(*coord_diap):
    coord = torch.tensor(coords)
    for d in range(D):
        diff = torch.zeros(D)
        diff[d] = 1
        
        if coords[d] + 1 < H:
            add_edge(get(coord, False), get(coord + diff, False))
    add_edge(get(coord, False), get(coord, True))
    
    nodes[get(coord, False)].chance = torch.zeros(len(nodes[get(coord, False)].next))
    nodes[get(coord, False)].flow_next = torch.zeros(len(nodes[get(coord, False)].next))

In [5]:
used = [0 for i in range(len(nodes))]
topsort = []


def dfs(v):
    used[v] = True
    for u in nodes[v].next:
        if used[u]:
            continue
        dfs(u)

    topsort.append(v)


dfs(0)

In [6]:
flow = [0 for i in range(len(nodes))]

for v in topsort:
    if v == 0:
        continue
    if nodes[v].term:
        flow[v] = get_reward(nodes[v].pos)
        
    pb = 1 / len(nodes[v].prev)

    for u in nodes[v].prev:
        flow[u] += flow[v] * pb
        nodes[u].flow_next[nodes[u].next.index(v)] = flow[v] * pb

for v in topsort:
    if nodes[v].term:
        continue

    for i in range(len(nodes[v].next)):
        nodes[v].chance[i] = nodes[v].flow_next[i] / flow[v]

In [7]:
def sample():
    v = 0
    while not nodes[v].term:
        next = Categorical(nodes[v].chance).sample()
        v = nodes[v].next[next.item()]

    return nodes[v].pos

In [11]:
rewards = torch.zeros(*[H for i in range(D)])
coord_diap = [range(H) for _ in range(D)]

for coord in product(*coord_diap):
    rewards[tuple(coord)] = get_reward(torch.tensor(coord))

rewards /= rewards.sum()

def loss(samples=1000):
    counter = torch.zeros(*[H for i in range(D)])
    for i in range(samples):
        pos = sample()
        counter[tuple(pos)] += 1

    counter /= counter.sum()

    return (rewards - counter).abs().sum()


print(loss(400000))

tensor(0.0805)
