In [181]:
import copy
from dataclasses import dataclass
from collections import namedtuple, defaultdict
from msdm.core.mdp import TabularMarkovDecisionProcess
from msdm.core.pomdp import TabularPOMDP
from msdm.core.distributions import DictDistribution

State = namedtuple("State", "x y door_states")
Action = namedtuple("Action", "dx dy open")
Observation = namedtuple("Observation", "x y door_states")

class KeysAndDoors(TabularPOMDP):
    def __init__(
        self,
        coherence=.95,
        discount_rate=.95,
        step_cost=-1,
        target_reward=100,
        grid=None
    ):
        """
        Parameters
        ---------
        :coherence:       
        :discount_rate:
        :step_cost:
        :reward:
        :grid:            A multiline string.
                          `s` is the initial state,
                          `#` are walls,
                          't' is the target
                          'd' are closed doors
                          'o' are open doors
                          'l' are locked doors
        """
        if grid is None:
            grid = \
            """
            t....
            ##d##
            .....
            ##s..
            """
        grid = [list(r.strip()) for r in grid.split('\n') if len(r.strip()) > 0]
        self.grid = grid
        self.loc_features = {}
        self.features_loc = defaultdict(list)

        self.height = len(self.grid)
        self.width = len(self.grid[0])

        self.door_loc = []

        # Initialize all grid positions
        for y, row in enumerate(grid):
            for x, f in enumerate(row):
                self.loc_features[(x, y)] = f
                self.features_loc[f].append((x, y))

                if f == 'd':
                    self.door_loc.append((x, y))

        self.coherence = coherence
        self.discount_rate = discount_rate
        self.step_cost = step_cost
        self.target_reward = target_reward

        

    def initial_state_dist(self):
        x, y = self.features_loc['s'][0]
        initial_door_stat = tuple(False for d in self.door_loc)
        return DictDistribution({
            State(x=x, y=y, door_states=initial_door_stat): 1.0,
        })

    #Current Actions: x, y, open door
    def actions(self, s):
        return (
            Action(0, -1, False),
            Action(0, 1, False),
            Action(-1, 0, False),
            Action(1, 0, False),
            Action(0, 0, True),
        )

    def is_absorbing(self, s):
        loc = (s.x, s.y)
        return self.loc_features[loc] == 't'

    def next_state_dist(self, s, a):
        x, y = s.x, s.y
        nx, ny = (s.x + a.dx, s.y + a.dy)
        door_states = list(s.door_states)
        adjacent = []

        # Don't consider states outside of the grid
        if nx < 0 or nx >= self.width or ny < 0 or ny >= self.height:
            return DictDistribution({State(x=x, y=y, door_states=tuple(door_states)): 1.0})

        # Check if agent is on an edge
        if (x-1 >= 0):
            adjacent.append((x-1, y))
        if (x+1 < self.width):
            adjacent.append((x+1, y))
        if (y-1 >= 0):
            adjacent.append((x, y-1))
        if (y+1 < self.height):
            adjacent.append((x, y+1))

        # Open Door
        if a.open:
            for adj in adjacent:
                # If a door is opened
                if self.loc_features.get(adj) == 'd':
                    door_index = self.door_loc.index(adj)
                    door_states[door_index] = True

        # Handles movement for blocked spaces
        if self.loc_features.get((nx, ny), '#') == '#':
            nx, ny = (s.x, s.y)
        if (nx, ny) in self.door_loc:
            if not door_states[self.door_loc.index((nx, ny))]:
                nx, ny = x, y
        if self.loc_features.get((nx, ny), 'l') == 'l':
            nx, ny = (s.x, s.y)

        return DictDistribution({
            State(x=nx, y=ny, door_states=tuple(door_states)): 1.0
        })

    def reward(self, s, a, ns):
        r=0
        r += self.step_cost
        if self.loc_features[(ns.x, ns.y)] == 't':
            r += self.target_reward
        return r

    def observation_dist(self, a, ns):
        obs_grid = set()
        radius = 4
        # Bresenham's line alg. (double check if implemented correctly)
        def line_of_sight(x0, y0, xf, yf):
            dx = abs(xf-x0)
            dy = abs(yf-y0)
            if x0 < xf:
                sx = 1
            else:
                sx = -1
            if y0 < yf:
                sy = 1
            else:
                sy = -1
            decisionParam = dx - dy

            x = x0
            y = y0


            while True:
                # Stop if we hit a wall or our target
                if self.loc_features.get((x, y), '#') == '#' and (x, y) != (xf, yf):
                    return False
                if x == xf and y == yf:
                    return True

                # Determine whether we need to change the y param
                decisionParam2 = 2 * decisionParam
                if decisionParam2 > -dy:
                    decisionParam -= dy
                    x += sx
                if decisionParam2 < dx:
                    decisionParam += dx
                    y += sy
            
        # Determine which parts of the grid are currently visible
        for dy in range(-radius, radius+1):
            for dx in range(-radius, radius +1):
                cx = ns.x + dx
                cy = ns.y + dy

                if (cx < 0 or cx >= self.width):
                    continue 
                if (cy < 0 or cy >= self.height):
                    continue

                if line_of_sight(ns.x, ns.y, cx, cy):
                    obs_grid.add((cx, cy))

        obs_door = []
        # Keep track of which doors are visible
        for door, (doorx, doory) in enumerate(self.door_loc):
            if (doorx, doory) in obs_grid:
                obs_door.append(ns.door_states[door])
            else:
                obs_door.append(None)

        return DictDistribution({
                Observation(x=ns.x, y=ns.y, door_states=tuple(obs_door)): 1.0
        })

    def state_string(self, s):
        grid = copy.deepcopy(self.grid)

        for door, (door_x, door_y) in enumerate(self.door_loc):
            if s.door_states[door]:
                grid[door_y][door_x] = 'o'

        for y, row in enumerate(grid):
            for x, f in enumerate(row):
                if (x, y) == (s.x, s.y):
                    grid[y][x] = '@'
        return '\n'.join([''.join(r) for r in grid])


In [182]:
from msdm.algorithms import  PointBasedValueIteration
hh = KeysAndDoors(
    coherence=.9,
    grid=
        """
        t....
        ##d##
        .....
        ##s..
        """,
    discount_rate=.9
)



try:
    print("Starting planning process...")
    pbvi_res = PointBasedValueIteration(
        min_belief_expansions=0,
        max_belief_expansions=100
    ).plan_on(hh)
    print("Planning successful!")
except Exception as e:
    print(f"Error during planning: {type(e).__name__}: {str(e)}")

Starting planning process...
Planning successful!


In [184]:
# pbvi_res.policy
traj = pbvi_res.policy.run_on(hh)
tuple(traj[0])
for t, step in enumerate(traj):
    sstr = hh.state_string(step.state)
    print(f"state {t}: \n", sstr, sep="")
    print(step.action)
    print(step.observation)
    print()

state 0: 
t....
##d##
.....
##@..
Action(dx=0, dy=0, open=True)
Observation(x=2, y=3, door_states=(False,))

state 1: 
t....
##d##
.....
##@..
Action(dx=-1, dy=0, open=False)
Observation(x=2, y=3, door_states=(False,))

state 2: 
t....
##d##
.....
##@..
Action(dx=0, dy=0, open=True)
Observation(x=2, y=3, door_states=(False,))

state 3: 
t....
##d##
.....
##@..
Action(dx=0, dy=0, open=True)
Observation(x=2, y=3, door_states=(False,))

state 4: 
t....
##d##
.....
##@..
Action(dx=1, dy=0, open=False)
Observation(x=3, y=3, door_states=(False,))

state 5: 
t....
##d##
.....
##s@.
Action(dx=-1, dy=0, open=False)
Observation(x=2, y=3, door_states=(False,))

state 6: 
t....
##d##
.....
##@..
Action(dx=-1, dy=0, open=False)
Observation(x=2, y=3, door_states=(False,))

state 7: 
t....
##d##
.....
##@..
Action(dx=1, dy=0, open=False)
Observation(x=3, y=3, door_states=(False,))

state 8: 
t....
##d##
.....
##s@.
Action(dx=0, dy=0, open=True)
Observation(x=3, y=3, door_states=(False,))

state 9: 
t