#  Maze demo

In this notebook, we show how to define a maze domain and solve it with algorithms from the scikit-decide hub.
If you want to see the demo without caring about the details, run all cells and go directly to the [last section](#Demo).

In [None]:
from enum import Enum
from typing import NamedTuple, Optional, Any, List
from copy import deepcopy
from time import sleep
from collections import deque
import random
from math import sqrt

import ipywidgets as widgets
import matplotlib.pyplot as plt
from stable_baselines3 import PPO

from skdecide import DeterministicPlanningDomain, Space, Value
from skdecide.builders.domain import UnrestrictedActions, Renderable
from skdecide.utils import rollout, match_solvers, load_registered_solver
from skdecide.hub.space.gym import ListSpace, EnumSpace, MultiDiscreteSpace
from skdecide.hub.solver.lazy_astar import LazyAstar

Use `ipympl` matplotlib backend so that matplotlib figures are included in jupyter widgets.

In [None]:
%matplotlib ipympl

## Domain definition

Define your state space (agent positions) & action space (agent movements).

In [None]:
class State(NamedTuple):
    """Agent position in the maze grid."""

    # Row index into the maze lines (moving up decreases it, moving down increases it).
    x: int
    # Column index inside a maze line (moving left decreases it, moving right increases it).
    y: int


class Action(Enum):
    """The four elementary moves available to the agent."""

    up = 0
    down = 1
    left = 2
    right = 3

Define your domain type from a base template (DeterministicPlanningDomain here) with optional refinements (UnrestrictedActions & Renderable here).

In [None]:
class D(DeterministicPlanningDomain, UnrestrictedActions, Renderable):
    """Domain type of the maze.

    Combines the DeterministicPlanningDomain template with the
    UnrestrictedActions and Renderable refinements, and fixes the concrete
    types used by the maze domain.
    """

    T_state = State  # Type of states
    T_observation = T_state  # Type of observations
    T_event = Action  # Type of events
    T_value = float  # Type of transition values (rewards or costs)
    T_predicate = bool  # Type of logical checks
    T_info = None  # Type of additional information in environment outcome

Implement the maze domain by 
- filling all non-implemented methods 
- adding a constructor to define the maze & start/end positions.

And also define (to help solvers that need it)
- a heuristic for search algorithms
- state features for width-based algorithms


In [None]:
empty_cell = " "  # character marking a free (walkable) position in the maze string


class MyDomain(D):
    """Maze domain: an agent moves on a character grid from `start` to `end`.

    The maze is given as a multi-line string in which `empty_cell` characters
    are free positions and any other character is a wall. States are
    (row, column) positions in that grid.
    """

    def __init__(self, start, end, maze_str):
        """Build the domain.

        Args:
            start: initial State of the agent.
            end: goal State.
            maze_str: multi-line string describing the maze.
        """
        self.start = start
        self.end = end
        self.maze_str = maze_str.strip()
        self.maze = self.maze_str.splitlines()

        # Numeric image used for rendering: 1 for free positions, 0 for walls,
        # 0.7 for the goal (the agent is drawn later with 0.3).
        rendered_maze = [
            [1 if c == empty_cell else 0 for c in line]
            for line in self.maze
            if len(line) > 0
        ]
        rendered_maze[self.end.x][self.end.y] = 0.7
        self.rendered_maze = rendered_maze

        # Matplotlib objects, created lazily on first rendering
        self._ax = None
        self._fig = None
        self._image = None

    def _get_next_state(self, memory: D.T_state, action: D.T_event) -> D.T_state:
        """Return the state reached from `memory` with `action`.

        The agent stays in place when the target position is a wall or lies
        outside of the grid.
        """
        # Move agent according to action (except if bumping into a wall)
        next_x, next_y = memory.x, memory.y
        if action == Action.up:
            next_x -= 1
        elif action == Action.down:
            next_x += 1
        elif action == Action.left:
            next_y -= 1
        elif action == Action.right:
            next_y += 1
        # Leaving the grid counts as a bump: without this check a negative index
        # would silently wrap around to the opposite side of the maze and an
        # index past the end would raise an IndexError.
        inside_grid = (
            0 <= next_x < len(self.maze) and 0 <= next_y < len(self.maze[next_x])
        )
        if inside_grid and self.maze[next_x][next_y] == empty_cell:
            return State(next_x, next_y)
        return memory

    def _get_transition_value(self, memory: D.T_state, action: D.T_event, next_state: Optional[D.T_state] = None) -> \
            Value[D.T_value]:
        """Return the cost of a transition: 1 for a move, 2 for a bump."""
        # Set cost to 1 when moving (energy cost) and to 2 when bumping into a wall (damage cost)
        return Value(cost=1 if next_state != memory else 2)

    def _get_initial_state_(self) -> D.T_state:
        """Return the initial state of an episode."""
        # Set the start position as initial state
        return self.start

    def _get_goals_(self) -> Space[D.T_observation]:
        """Return the goal space, containing only the end position."""
        # Set the end position as goal
        return ListSpace([self.end])

    def _is_terminal(self, state: D.T_state) -> D.T_agent[D.T_predicate]:
        """Tell whether `state` ends the episode."""
        # Stop an episode only when goal reached
        return self._is_goal(state)

    def _get_action_space_(self) -> Space[D.T_event]:
        """Return the action space (all members of Action)."""
        # Define action space
        return EnumSpace(Action)

    def _get_observation_space_(self) -> Space[D.T_observation]:
        """Return the observation space: one (row, column) pair per grid position."""
        # Define observation space
        num_rows = len(self.maze)
        num_cols = max([len(row) for row in self.maze])
        return MultiDiscreteSpace([num_rows, num_cols])

    def _render_from(self, memory: D.T_state, **kwargs: Any) -> Any:
        """Draw the maze with the agent at position `memory`.

        The figure is created on the first call and only its image data is
        updated on subsequent calls.
        """
        #  display maze in a matplotlib image
        if self._ax is None:
            # Interactive mode is switched off while the figure is being set up
            plt.ioff()
            fig, ax = plt.subplots(1)
            ax.set_aspect('equal')  # set the x and y axes to the same scale
            ax.set_xticks([])  # remove the tick marks by setting to an empty list
            ax.set_yticks([])  # remove the tick marks by setting to an empty list
            ax.invert_yaxis()  # invert the y-axis so the first row of data is at the top
            self._ax = ax
            self._fig = fig
            plt.ion()
            fig.canvas.header_visible = False
            fig.canvas.footer_visible = False
            fig.canvas.resizable = False
            fig.canvas.layout.width = '100%'
            fig.canvas.layout.height = '100%'
            # 500 x 500 "inches" at 1 dot per inch
            fig.set_dpi(1)
            fig.set_figwidth(500)
            fig.set_figheight(500)
        # Copy so that the agent marker never ends up in the stored maze image
        maze = deepcopy(self.rendered_maze)
        maze[memory.x][memory.y] = 0.3
        if self._image is None:
            self._image = self._ax.imshow(maze)
        else:
            self._image.set_data(maze)
            self._ax.figure.canvas.draw()
        plt.pause(0.001)

    def heuristic(self, s: D.T_state) -> Value:
        """Return the straight-line (Euclidean) distance from `s` to the goal as a cost."""
        return Value(cost=sqrt((self.end.x - s.x)**2 + (self.end.y - s.y)**2))

    def state_features(self, s: D.T_state) -> List[float]:
        """Return the features of `s` used by width-based solvers: its row and column."""
        return [s.x, s.y]


## Maze generator
We use here the "recursive backtracker" algorithm which is a randomized depth-first search algorithm.
The chosen implementation is actually an iterative one to avoid max recursion stack issues. 
See for instance https://en.wikipedia.org/wiki/Maze_generation_algorithm  for more details.

In [None]:
# Building blocks of the initial grid, in which every wall is still present.
odd_row_pattern, odd_row_end_cell = "+-", "+"  # wall rows look like "+-+-+"
even_row_pattern, even_row_end_cell = "| ", "|"  # cell rows look like "| | |"
empty_cell = " "

# (row, column) offsets from a cell to its four neighbouring cells:
# two characters away, as a wall position sits in between.
deltas_neighbour = [(0, 2), (2, 0), (0, -2), (-2, 0)]


def init_maze(width, height):
    """Return the initial grid (list of lists of characters) with all walls up.

    The grid alternates wall rows and cell rows and is closed by a final wall
    row; each row is an independent list so that it can be modified in place.
    """
    cells_per_row = width // 2
    cells_per_column = height // 2
    wall_row = list(odd_row_pattern * cells_per_row) + [odd_row_end_cell]
    cell_row = list(even_row_pattern * cells_per_row) + [even_row_end_cell]

    grid = []
    for _ in range(cells_per_column):
        grid.append(wall_row.copy())
        grid.append(cell_row.copy())
    grid.append(wall_row.copy())
    return grid


def get_neighbours(cell, width, height):
    """Return the neighbouring cells of `cell` lying strictly inside the grid.

    Candidates are obtained by applying each offset of `deltas_neighbour` to
    `cell`; those on or beyond the border (index 0, `height` or `width`) are
    dropped. The order of `deltas_neighbour` is preserved.
    """
    row, col = cell
    neighbours = []
    for d_row, d_col in deltas_neighbour:
        n_row, n_col = row + d_row, col + d_col
        if 0 < n_row < height and 0 < n_col < width:
            neighbours.append((n_row, n_col))
    return neighbours

def generate_maze_str(width, height, seed=None):
    """Generate a maze string with given width and height.

    Width and height are assumed to be odd so that the maze is surrounded by a wall
    and 1 character out of 2 is a cell followed by a connection or a wall.

    The maze is carved with an iterative randomized depth-first search
    ("recursive backtracker") starting from cell (1, 1).

    Args:
        width: number of characters per maze row.
        height: number of maze rows.
        seed: optional seed making the generated maze reproducible. When None
            (default), the global `random` generator is used.

    Returns:
        The maze as a single string, rows being separated by newlines.

    """
    rng = random if seed is None else random.Random(seed)
    maze = init_maze(width, height)

    first_cell = (1, 1)
    stack = deque([first_cell])
    visited = {first_cell}
    while stack:
        current_cell = stack[-1]
        unvisited_neighbours = [
            cell
            for cell in get_neighbours(current_cell, width, height)
            if cell not in visited
        ]
        if unvisited_neighbours:
            next_cell = rng.choice(unvisited_neighbours)
            i1, j1 = current_cell
            i2, j2 = next_cell
            # Remove the wall located halfway between the two cells
            maze[(i1 + i2) // 2][(j1 + j2) // 2] = empty_cell
            visited.add(next_cell)
            stack.append(next_cell)
        else:
            # Dead end: backtrack
            stack.pop()

    maze_str = "\n".join(["".join(row) for row in maze])
    return maze_str


Here is an example of generated maze string.

In [None]:
# print() rather than a bare expression, so that the newlines separating the maze rows are rendered
print(generate_maze_str(25, 15))

## Potential solvers

List of interesting solvers with default config.

In [None]:
# Each description holds: the label shown to the user ("name"), the registered
# solver name ("entry"), whether the solver constructor must receive a
# "domain_factory" argument, and the other constructor arguments ("config").
try_solvers = [
    # Simple greedy
    {
        "name": "Simple greedy",
        "entry": "SimpleGreedy",
        "need_domain_factory": False,
        "config": {},
    },
    # Lazy A* (classical planning)
    {
        "name": "Lazy A* (classical planning)",
        "entry": "LazyAstar",
        "need_domain_factory": False,
        "config": {
            "heuristic": lambda d, s: d.heuristic(s),
            "verbose": False,
        },
    },
    # A* (planning)
    {
        "name": "A* (planning)",
        "entry": "Astar",
        "need_domain_factory": True,
        "config": {
            "heuristic": lambda d, s: d.heuristic(s),
            "parallel": False,
            "debug_logs": False,
        },
    },
    # LRTA* (classical planning)
    {
        "name": "LRTAStar",
        "entry": "LRTAstar",
        "need_domain_factory": False,
        "config": {
            "max_depth": 200,
            "max_iter": 1000,
            "heuristic": lambda d, s: d.heuristic(s),
            "verbose": True,
        },
    },
    # UCT (reinforcement learning / search)
    {
        "name": "UCT (reinforcement learning / search)",
        "entry": "UCT",
        "need_domain_factory": True,
        "config": {
            "time_budget": 200,
            "rollout_budget": 100_000,
            "heuristic": lambda d, s: (d.heuristic(s), 10_000),
            "online_node_garbage": True,
            "max_depth": 1000,
            "ucb_constant": 1.0 / sqrt(2.0),
            "parallel": False,
            "debug_logs": False,
        },
    },
    # PPO: Proximal Policy Optimization (deep reinforcement learning)
    {
        "name": "PPO: Proximal Policy Optimization (deep reinforcement learning)",
        "entry": "StableBaseline",
        "need_domain_factory": False,
        "config": {
            "algo_class": PPO,
            "baselines_policy": "MlpPolicy",
            "learn_config": {"total_timesteps": 30_000},
            "verbose": 1,
        },
    },
    # POMCP: Partially Observable Monte-Carlo Planning (online planning for POMDP)
    {
        "name": "POMCP: Partially Observable Monte-Carlo Planning (online planning for POMDP)",
        "entry": "POMCP",
        "need_domain_factory": False,
        "config": {},
    },
    # CGP: Cartesian Genetic Programming (evolution strategy)
    {
        "name": "CGP: Cartesian Genetic Programming (evolution strategy)",
        "entry": "CGP",
        "need_domain_factory": False,
        "config": {"folder_name": "TEMP", "n_it": 25},
    },
    # Rollout-IW (classical planning)
    {
        "name": "Rollout-IW (classical planning)",
        "entry": "RIW",
        "need_domain_factory": True,
        "config": {
            "state_features": lambda d, s: d.state_features(s),
            "use_state_feature_hash": False,
            "use_simulation_domain": True,
            "time_budget": 200,
            "rollout_budget": 100_000,
            "max_depth": 1000,
            "exploration": 0.25,
            "online_node_garbage": True,
            "continuous_planning": True,
            "parallel": False,
            "debug_logs": False,
        },
    },
    # IW (classical planning)
    {
        "name": "IW (classical planning)",
        "entry": "IW",
        "need_domain_factory": True,
        "config": {
            "state_features": lambda d, s: d.state_features(s),
            "node_ordering": lambda a_gscore, a_novelty, a_depth, b_gscore, b_novelty, b_depth: a_novelty > b_novelty,
            "parallel": False,
            "debug_logs": False,
        },
    },
    # BFWS (classical planning)
    {
        "name": "BFWS (planning) - (num_rows * num_cols) binary encoding (1 binary variable <=> 1 cell)",
        "entry": "BFWS",
        "need_domain_factory": True,
        "config": {
            "state_features": lambda d, s: d.state_features(s),
            "heuristic": lambda d, s: d.heuristic(s),
            "termination_checker": lambda d, s: d.is_goal(s),
            "parallel": False,
            "debug_logs": False,
        },
    },
]

# Replace each registered name by the loaded solver class and keep only the
# descriptions whose class could be loaded (i.e. is not None).
solvers = [
    dict(description, entry=solver_class)
    for description, solver_class in (
        (description, load_registered_solver(description["entry"]))
        for description in try_solvers
    )
    if solver_class is not None
]

Check compatibility with maze domain.

In [None]:
# Classes of the solvers that could be loaded
solver_candidates = [
    description["entry"]
    for description in solvers
    if description["entry"] is not None
]

# Reference maze domain used for the compatibility check
default_domain = MyDomain(
    start=State(1, 1),
    end=State(x=13, y=23),
    maze_str=generate_maze_str(height=15, width=25),
)

# Keep only the solver descriptions whose class matches the maze domain
compatible_solver_classes = match_solvers(default_domain, candidates=solver_candidates)
compatible_solvers = [
    description
    for description in solvers
    if description["entry"] in compatible_solver_classes
]
print(compatible_solver_classes)

## Jupyter widgets

Define jupyter widgets used for demo

In [None]:
def _make_size_slider():
    """Return a slider selecting an odd maze dimension between 3 and 101.

    The minimum is 3 (not 1): the demo starts the agent at position (1, 1),
    which does not exist in a maze having a single row or a single column.
    """
    return widgets.IntSlider(
        value=21,
        min=3,
        max=101,
        step=2,
        disabled=False,
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='d',
    )


# --- Maze size selection ---
width_slider = _make_size_slider()
width_label = widgets.Label("Width:", layout={"width": "15%"})
width_box = widgets.HBox(
    (
        width_label,
        width_slider
    ),
    layout={"width": "100%"},
)

height_slider = _make_size_slider()
height_label = widgets.Label("Height:", layout={"width": "15%"})
height_box = widgets.HBox(
    (
        height_label,
        height_slider
    ),
    layout={"width": "100%"},
)

generate_button = widgets.Button(
    description='Generate',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Generate maze',
    icon='wrench' # (FontAwesome names without the `fa-` prefix)
)

# --- Solver selection ---
solver_options = [(solver["name"], solver) for solver in compatible_solvers]
# Preselect Lazy A* when available, else the first compatible solver
# (None when no solver is compatible, instead of failing with an IndexError).
default_solver = next(
    (solver for name, solver in solver_options if name.startswith("Lazy A*")),
    solver_options[0][1] if solver_options else None,
)
solver_select = widgets.Dropdown(
    options=solver_options,
    value=default_solver,
    disabled=False,
    layout={"width": "90%"}

)
solve_button = widgets.Button(
    description='Solve',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Solve maze',
    icon='cogs', # (FontAwesome names without the `fa-` prefix)
)

# --- Layout ---
box_layout = widgets.Layout(
    align_items="center", 
    border = "1px solid black",
    margin = "1em",
    padding = "1em",
) 

vbox_generate = widgets.VBox(
    (width_box, height_box, generate_button), 
    layout = box_layout,
)


vbox_solve = widgets.VBox(
    (solver_select, solve_button), 
    layout = box_layout,
)

vbox_settings = widgets.VBox(
    (vbox_generate, vbox_solve), 
    layout = widgets.Layout(width="40%")
)

# Area in which the maze figure is displayed
maze_output = widgets.Output(
    layout= widgets.Layout(
        border="1px solid black",
        width="60%",
        height="500px"
    )
)

# Area receiving the text printed while solving
console_output = widgets.Output(
    layout= widgets.Layout(
        min_height="50px",
        margin="1em 0em",
    )
)

# Settings on the left, maze on the right, console below
demo_mainwidget = widgets.VBox(
    (widgets.HBox((vbox_settings, maze_output),), console_output),
)


Manage widgets events.

In [None]:
class Demo:
    """Store domain."""
    # Maze domain currently displayed: assigned by on_generate_clicked() and
    # read by reset_maze_output() and on_solve_clicked().
    domain = None

# Single holder shared by all the widget callbacks below
demo = Demo()

def init_maze_output():
    """Clear the maze output area and render the current domain inside it."""
    maze_output.clear_output()
    # Rendering within the output context so that what it displays goes to maze_output
    with maze_output:
        reset_maze_output()
    
def reset_maze_output():
    """Render the current domain with the agent at its start position."""
    demo.domain.render(demo.domain.start)

    
def on_generate_clicked(b):
    """Generate a new maze with the size set on the sliders and display it.

    The agent starts at position (1, 1) and must reach position
    (height - 2, width - 2).

    Args:
        b: the clicked button (unused).
    """
    console_output.clear_output()
    n_cols, n_rows = width_slider.value, height_slider.value
    new_maze_str = generate_maze_str(width=n_cols, height=n_rows)
    demo.domain = MyDomain(
        State(1, 1),
        State(n_rows - 2, n_cols - 2),
        new_maze_str,
    )
    init_maze_output()

def on_solve_clicked(b):
    """Solve the displayed maze with the selected solver and animate the solution.

    Args:
        b: the clicked button (unused).
    """
    reset_maze_output()
    console_output.clear_output()
    domain = demo.domain
    selected_solver = solver_select.value
    solver_class = selected_solver["entry"]
    # Work on a copy: the solver descriptions are shared with the dropdown
    # and must not be modified by a click.
    solver_config = dict(selected_solver["config"])
    domain_factory = lambda: MyDomain(domain.start, domain.end, domain.maze_str)
    if selected_solver["need_domain_factory"]:
        solver_config["domain_factory"] = domain_factory
    # Size of the maze actually displayed: the sliders may have been moved
    # since the maze was generated.
    height = len(domain.maze)
    width = max(len(row) for row in domain.maze)
    max_steps = (width - 2) * (height - 2)
    # Compute solution and visualize it
    with console_output:
        with solver_class(**solver_config) as solver:
            MyDomain.solve_with(solver, domain_factory)
            rollout(domain, solver, max_steps=max_steps, max_framerate=80, verbose=False)
    
# Connect the buttons to their callbacks
generate_button.on_click(on_generate_clicked)
solve_button.on_click(on_solve_clicked)

# init first maze    
# (the callback does not use its argument, hence None)
on_generate_clicked(None)


## Demo

Use the widgets to choose the size of the maze to generate and the solver used to solve it.

In [None]:
# Last expression of the cell: displays the demo widget
demo_mainwidget