# Planning and Learning with Tabular Methods

**Model-based** methods rely on planning as their primary component, while model-free methods primarily rely on learning.

## Models and Planning

A model of the environment means anything that an agent can use to predict how the
environment will respond to its actions. Given a state and an action, a model produces a
prediction of the resultant next state and next reward.

If the model is stochastic, then there are several possible next states and next rewards, each with some probability of
occurring. Some models produce a description of all possibilities and their probabilities; these we call **distribution models**.
Other models produce just one of the possibilities, sampled according to the probabilities; these we call **sample models**.
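
To make the distinction concrete, here is a minimal sketch of the two kinds of model. The two-state dynamics, the table `DISTRIBUTION_MODEL`, and the function names are hypothetical and chosen only for illustration. The sample model is built on top of the distribution model, which is always possible; the reverse is not.

```python
import random

# Hypothetical dynamics: (state, action) -> list of (probability, next_state, reward).
DISTRIBUTION_MODEL = {
    ("s0", "go"): [(0.8, "s1", 1.0), (0.2, "s0", 0.0)],
    ("s1", "go"): [(1.0, "s1", 0.0)],
}


def distribution_model(state, action):
    """Return every possible outcome together with its probability."""
    return DISTRIBUTION_MODEL[(state, action)]


def sample_model(state, action, rng=random):
    """Return one (next_state, reward) pair drawn according to the probabilities."""
    outcomes = distribution_model(state, action)
    weights = [probability for probability, _, _ in outcomes]
    _, next_state, reward = rng.choices(outcomes, weights=weights, k=1)[0]
    return next_state, reward


print(distribution_model("s0", "go"))  # all outcomes with their probabilities
print(sample_model("s0", "go"))        # a single sampled outcome
```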


Given a starting state and action, a sample model produces a possible transition, and a distribution model generates all
possible transitions weighted by their probabilities of occurring. Given a starting state and a policy,
a sample model could produce an entire episode, and a distribution model could generate all possible episodes and their probabilities.
In either case, we say the model is used to simulate the environment and produce simulated experience.
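
As a rough illustration of simulated experience, the sketch below rolls out one episode from a sample model and a fixed policy. The chain environment, the `simulate_episode` helper, and the `max_steps` cap are assumptions made for this example, not part of the text above.

```python
import random


def sample_model(state, action, rng):
    """Hypothetical chain 0 -> 1 -> 2 -> 3: 'right' advances with probability 0.9."""
    next_state = state
    if action == "right" and rng.random() < 0.9:
        next_state = state + 1
    reward = 1.0 if next_state == 3 else 0.0
    return next_state, reward


def simulate_episode(model, policy, start_state, is_terminal, rng, max_steps=100):
    """Return a simulated episode as a list of (state, action, reward, next_state)."""
    episode = []
    state = start_state
    for _ in range(max_steps):
        if is_terminal(state):
            break
        action = policy(state)
        next_state, reward = model(state, action, rng)
        episode.append((state, action, reward, next_state))
        state = next_state
    return episode


rng = random.Random(0)
episode = simulate_episode(
    sample_model,
    policy=lambda state: "right",
    start_state=0,
    is_terminal=lambda state: state == 3,
    rng=rng,
)
for transition in episode:
    print(transition)
```

Each tuple in `episode` has the same shape as a real transition, so it could be passed to a learning update unchanged.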

The word **planning** is used in several different ways in different fields. We use the
term to refer to any computational process that takes a model as input and produces or
improves a policy for interacting with the modeled environment.
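
One simple process that fits this definition is random-sample one-step tabular Q-planning: repeatedly pick a state-action pair, query the model, and apply a Q-learning update to the simulated transition. The sketch below assumes a small hypothetical deterministic model (`MODEL`, with state 2 terminal) and illustrative hyperparameters.

```python
import random
from collections import defaultdict

# Hypothetical deterministic model: (state, action) -> (reward, next_state).
# State 2 is terminal, so it never appears as a key.
MODEL = {
    (0, "left"): (0.0, 0),
    (0, "right"): (0.0, 1),
    (1, "left"): (0.0, 0),
    (1, "right"): (1.0, 2),
}
ACTIONS = ("left", "right")


def q_planning(model, n_updates=2000, gamma=0.9, alpha=0.5, seed=0):
    """Improve action values using only transitions generated by the model."""
    rng = random.Random(seed)
    q = defaultdict(float)
    pairs = list(model)
    for _ in range(n_updates):
        state, action = rng.choice(pairs)
        reward, next_state = model[(state, action)]
        best_next = max(q[(next_state, a)] for a in ACTIONS)
        q[(state, action)] += alpha * (reward + gamma * best_next - q[(state, action)])
    return q


def greedy_policy(q, states):
    """Derive a policy that picks the highest-valued action in each state."""
    return {s: max(ACTIONS, key=lambda a: q[(s, a)]) for s in states}


q = q_planning(MODEL)
policy = greedy_policy(q, states=(0, 1))
print(policy)
```

The input is a model and the output is a policy; no interaction with a real environment takes place.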

In artificial intelligence, there are two distinct approaches to planning according to our
definition. State-space planning, which includes the approach we take in this book,
is viewed primarily as a search through the state space for an optimal policy or an
optimal path to a goal.
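
The "optimal path to a goal" view can be sketched with a plain breadth-first search over grid states. Breadth-first search is used here only because it is the simplest search that finds a shortest path; the tiny grid, the wall set, and the `shortest_path` helper are hypothetical.

```python
from collections import deque


def shortest_path(start, goal, walls, shape):
    """Breadth-first search over (row, column) states; returns a list of states or None."""
    rows, cols = shape
    moves = [(0, 1), (-1, 0), (0, -1), (1, 0)]  # right, up, left, down
    parents = {start: None}
    frontier = deque([start])
    while frontier:
        state = frontier.popleft()
        if state == goal:
            # Walk back through the parents to rebuild the path.
            path = []
            while state is not None:
                path.append(state)
                state = parents[state]
            return path[::-1]
        for d_row, d_col in moves:
            nxt = (state[0] + d_row, state[1] + d_col)
            in_grid = 0 <= nxt[0] < rows and 0 <= nxt[1] < cols
            if in_grid and nxt not in walls and nxt not in parents:
                parents[nxt] = state
                frontier.append(nxt)
    return None  # the goal is unreachable


path = shortest_path(start=(0, 0), goal=(0, 2), walls={(0, 1)}, shape=(2, 3))
print(path)
```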

In what we call plan-space planning, planning is instead a
search through the space of plans.

## Dyna: Integrated Planning, Acting, and Learning

In [86]:
import collections

import numpy as np

class TabularDynaQ():

    def __init__(self, action_space, gamma, alpha, n_model_loop, policy):
        self.gamma  = gamma
        self.alpha  = alpha
        self.n_model_loop = n_model_loop
        self.policy = policy

        self.state_action_value = collections.defaultdict(lambda: np.zeros((action_space.n)))
        self.model = collections.defaultdict(lambda: {})

    def action(self, state):
        return self.policy(self.state_action_value, state)
    
    def observe(self, state, action, reward, next_state):
        # (a) Direct RL: one-step Q-learning update from the real transition
        target = reward + self.gamma * np.max(self.state_action_value[next_state])
        self.state_action_value[state][action] += self.alpha * (target - self.state_action_value[state][action])

        # (b) Model learning: store the last observed outcome (assumes a deterministic environment)
        self.model[state][action] = (reward, next_state)

        # (c) Planning: replay `n_model_loop` previously observed state-action pairs through the model
        for _ in range(self.n_model_loop):
            random_state = np.random.choice(list(self.model.keys()))
            random_action = np.random.choice(list(self.model[random_state].keys()))

            reward_from_model, next_state_from_model = self.model[random_state][random_action]

            target = reward_from_model + self.gamma * np.max(self.state_action_value[next_state_from_model])
            self.state_action_value[random_state][random_action] += self.alpha * (target - self.state_action_value[random_state][random_action])
    
    def optimize(self):
        pass

In [87]:
# Dyna Maze environment
from enum import Enum

import numpy as np

import gymnasium as gym
from gymnasium import spaces

class Actions(Enum):
    RIGHT = 0
    UP = 1
    LEFT = 2
    DOWN = 3

class DynaMaze(gym.Env):
    metadata = { "render_modes": ["ascii"] }

    def __init__(self, render_mode=None, grid_shape=(6, 9)):
        self._grid_shape = grid_shape

        # Declared observation space: the agent's and the target's (row, column) locations.
        # Note: `_get_obs` actually returns the agent's location as a string so that
        # tabular agents can use it as a dictionary key.
        max_location = np.array([self._grid_shape[0] - 1, self._grid_shape[1] - 1])
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(np.zeros(2, dtype=int), max_location, dtype=int),
                "target": spaces.Box(np.zeros(2, dtype=int), max_location, dtype=int),
            }
        )

        self._agent_location = np.array([2, 0], dtype=int)
        self._target_location = np.array([0, 8], dtype=int)

        self._walls_locations = np.array([[1, 2], [2, 2], [3, 2], [4, 5], [0, 7], [1, 7], [2, 7]], dtype=int)

        # We have 4 actions, corresponding to "right", "up", "left", "down"
        self.action_space = spaces.Discrete(4)

        """
        The following dictionary maps abstract actions from `self.action_space` to
        the direction we will walk in if that action is taken.
        i.e. 0 corresponds to "right", 1 to "up" etc.
        """
        self._action_to_direction = {
            Actions.UP.value: np.array([-1, 0]),
            Actions.DOWN.value: np.array([1, 0]),
            Actions.LEFT.value: np.array([0, -1]),
            Actions.RIGHT.value: np.array([0, 1]),
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
    
    def _get_obs(self):
        return str(self._agent_location)
    
    def _get_info(self):
        return {
            "distance": np.linalg.norm(
            self._agent_location - self._target_location, ord=1
            )
        }
    
    def _render_frame(self):
        if self.render_mode == "ascii":
            grid = np.zeros(self._grid_shape)
            grid[self._agent_location[0], self._agent_location[1]] = 1
            grid[self._target_location[0], self._target_location[1]] = 6
            print(grid, flush=True)

    def step(self, action):
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]

        # We use `np.clip` to make sure we don't leave the grid
        next_location = np.clip(
            self._agent_location + direction, 0, np.array(self._grid_shape) - 1
        )

        # The agent only moves if the cell it is heading to is not a wall
        if not np.any(np.all(next_location == self._walls_locations, axis=1)):
            self._agent_location = next_location

        # An episode is done iff the agent has reached the target
        terminated = bool(np.array_equal(self._agent_location, self._target_location))
        reward = 0 if terminated else -1
        observation = self._get_obs()
        info = self._get_info()

        self._render_frame()

        return observation, reward, terminated, False, info
    
    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # reset agent's position
        self._agent_location = np.array([3, 0], dtype=int)

        observation = self._get_obs()
        info = self._get_info()

        self._render_frame()

        return observation, info

In [88]:
def argmax(array):
    return np.random.choice(np.where(array == np.max(array))[0])

def get_epsilon_greedy_policy(epsilon=0.1):
    def epsilon_greedy_policy(state_action_value, state):
        take_random_action_prob = np.random.uniform(0, 1)

        if take_random_action_prob < epsilon:
            random_action = np.random.randint(0, len(state_action_value[state]))
            return random_action
        else:
            greedy_action = argmax(state_action_value[state])
            return greedy_action
    
    return epsilon_greedy_policy

In [89]:
def play_env(env, agent):
    reward_sum = 0
    nb_steps = 0

    terminated = False
    observation, info = env.reset()

    while not terminated:
        action = agent.action(observation)

        new_observation, reward, terminated, truncated, info = env.step(action)

        agent.observe(observation, action, reward, new_observation)

        observation = new_observation

        reward_sum += reward
        nb_steps += 1
    
    agent.optimize()

    return reward_sum, nb_steps

In [None]:
env = DynaMaze(render_mode="ascii")  # "ascii" prints the grid after every reset and step
agent = TabularDynaQ(env.action_space, 0.95, 0.1, 5, get_epsilon_greedy_policy())

reward_sum, nb_steps = play_env(env, agent)

[[0. 0. 0. 0. 0. 0. 0. 0. 6.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 0. 6.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 0. 6.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 0. 6.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 0. 6.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 0. 6.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 