In [1]:
from typing import Sequence, Tuple, Dict
import random

import pandas as pd
import numpy as np
from scipy.optimize import minimize

A few examples of State Value function in the gridworld settings explained in [Chapter 3](http://incompleteideas.net/book/RLbook2018.pdf)

# Gridworld

### Create Gridworld environment

In [2]:
class GridWorldEnv:
    """Manage Gridworld environment"""
    def __init__(self, special_rewards: Dict, special_moves: Dict, dim: int = 5, ):
        """
        Create gridworld
        :param special_rewards: special reward when moving from this place, dict((row, col), reward)
        :param special_rewards: special moves when moving from this place, dict((row, col), (dest_row, dest_col))
        :param dim: grid lateral dimension
        """
        assert len(special_moves) == len(special_rewards)
        assert all(x in special_rewards for x in special_moves)
        
        self.special_reward = special_rewards
        self.special_moves = special_moves
        self.dim = dim
        self.possible_actions = {
            'up': (-1, 0),
            'down': (1, 0),
            'left': (0, -1),
            'right': (0, 1)
        }
        
    def show_full_space(self):
        """Show all possible action/state combinations and their rewards"""
        for i in range(self.dim):
            for k in range(self.dim):
                for action_name, action in self.possible_actions.items():
                    reward, next_pos = self.step(current_position=(i, k), action_name=action_name)
                    print(f"from {(i, k)} move {action_name} --> into {next_pos} with R={reward}")
                    
    def step(self, current_position: Tuple[int, int], action_name: str):
        """Get reward and next position/state"""
        assert action_name in self.possible_actions, f"Action {action_name} not among possible actions"
        
        action_x, action_y = self.possible_actions[action_name]
        i, k = current_position
        next_pos = i+action_x, k+action_y
        if (i, k) in self.special_moves:  # special move get fixed reward and always move to one location
            next_pos = self.special_moves[(i, k)]
            reward = self.special_reward[(i, k)]
        elif next_pos[0] < 0 or next_pos[0] > self.dim-1 or next_pos[1] < 0 or next_pos[1] > self.dim-1:
            next_pos = i, k
            reward = -1
        else:
            reward = 0
        
        return reward, next_pos
    
    def grid_to_vec(self, row: int, col: int) -> int:
        """get index of vector which corresponds to a grid position"""
        assert 0 <= row < self.dim, f"Invalid row number {row}, max is {self.dim - 1}"
        assert 0 <= col < self.dim, f"Invalid column number {col}, max is {self.dim - 1}"
        
        return row * self.dim + col

    def vec_to_grid(self, idx: int) -> Tuple[int, int]:
        """get grid position which corresponds to index of vector"""
        assert 0 <= idx < self.dim * self.dim, f"Invalid index {idx}, max is {self.dim * self.dim - 1}"
        
        return idx // self.dim, idx % self.dim
    
    def dict_to_grid(self, fun_on_grid: dict):
        ans = []
        for row in range(self.dim):
            ans.append({f"col_{col}": fun_on_grid[(row, col)] for col in range(self.dim)})

        return pd.DataFrame(ans)

Use GridWorld with a small grid and display full spectrum of states-actions

In [3]:
env = GridWorldEnv(special_rewards={(1, 1): 10}, special_moves={(1, 1): (0, 0)}, dim=2)
env.show_full_space()

from (0, 0) move up --> into (0, 0) with R=-1
from (0, 0) move down --> into (1, 0) with R=0
from (0, 0) move left --> into (0, 0) with R=-1
from (0, 0) move right --> into (0, 1) with R=0
from (0, 1) move up --> into (0, 1) with R=-1
from (0, 1) move down --> into (1, 1) with R=0
from (0, 1) move left --> into (0, 0) with R=0
from (0, 1) move right --> into (0, 1) with R=-1
from (1, 0) move up --> into (0, 0) with R=0
from (1, 0) move down --> into (1, 0) with R=-1
from (1, 0) move left --> into (1, 0) with R=-1
from (1, 0) move right --> into (1, 1) with R=0
from (1, 1) move up --> into (0, 0) with R=10
from (1, 1) move down --> into (0, 0) with R=10
from (1, 1) move left --> into (0, 0) with R=10
from (1, 1) move right --> into (0, 0) with R=10


### Define solver for the Bellman equation

In [4]:
class BellmanSolver:
    """Bellman equation solver"""
    def __init__(self, env: GridWorldEnv, gamma: float):
        """
        Initialize solver
        :param env: gridworld environment
        :param: gamma: discount factor
        """
        self.gamma = gamma
        self.env = env
    
    def bellman_eq_random_policy(self):
        """Solve bellman uquation under random policy, this is a linear system of equations"""
        size = self.env.dim ** 2  # linear system size
        coeff = 1/4  # probability under random policy
        matrix = np.zeros([size, size])
        known_term = np.zeros(size)

        # build matrix
        for idx in range(size):  # loop on equations
            current_position = self.env.vec_to_grid(idx)
            matrix[idx, idx] = 1

            for action in self.env.possible_actions:  # loop on four possible actions
                reward, next_position = self.env.step(current_position=current_position, action_name=action)
                known_term[idx] += coeff * reward
                idx_other = self.env.grid_to_vec(*next_position)
                matrix[idx, idx_other] -= coeff * self.gamma

        # solve linear system and push results to grid
        return np.linalg.solve(matrix, known_term)
    
    def bellman_optimal(self, v: np.array) -> np.array:
        """
        Nonlinear Bellman equation for the optimal value function
        :param v: current value function
        :return: L2 norm of current value of the residuals of the equation 
        """
        f = np.zeros(v.shape)
        for idx in range(f.size):
            current_position = self.env.vec_to_grid(idx)

            max_over_a = -np.Inf
            for action in self.env.possible_actions:  # loop on four possible actions
                reward, next_position = self.env.step(current_position=current_position, action_name=action)
                idx_next = self.env.grid_to_vec(*next_position)
                max_over_a = max(max_over_a, reward + self.gamma * v[idx_next])

            f[idx] = max_over_a - v[idx]

        return np.linalg.norm(f)
    
    def find_optimal_policy(self, optimal_val_fun: np.array) -> np.array:
        """
        Given an optimal value function, find the optimal policy
        :param optimal_val_fun: optimal value function
        :return: optimal policy
        """
        ans = {}
        for idx_pos in range(optimal_val_fun.size):
            pos = self.env.vec_to_grid(idx_pos)

            values, actions = [], []
            for action in self.env.possible_actions:
                r, next_pos = self.env.step(action_name=action, current_position=pos)
                idx = self.env.grid_to_vec(*next_pos)
                values.append(r + self.gamma * optimal_val_fun[idx])
                actions.append(action)
            values = np.array([np.around(x, 3) for x in values])

            ans[pos] = [actions[i] for i in np.where(values == values.max())[0]]

        return ans

### Value function under random policy
For a given policy (e.g. random policy) the Belmlman equation leads to the value function by solving a system of linear equations.

The cell below gives the value function for each state, under optimal policy.
Result should be the same as in [figure 3.2](http://incompleteideas.net/book/RLbook2018.pdf)

In [5]:
# initialze environment
env = GridWorldEnv(special_rewards={(0, 1): 10, (0, 3): 5}, special_moves={(0, 1): (4, 1), (0, 3): (2, 3)})

# initialze solver
solver = BellmanSolver(env=env, gamma=.9)

# solve Bellman equation under random policy
res = solver.bellman_eq_random_policy()

# plot result on grid
env.dict_to_grid(fun_on_grid={env.vec_to_grid(x): np.around(res[x], 1) for x in range(res.size)})

Unnamed: 0,col_0,col_1,col_2,col_3,col_4
0,3.3,8.8,4.4,5.3,1.5
1,1.5,3.0,2.3,1.9,0.5
2,0.1,0.7,0.7,0.4,-0.4
3,-1.0,-0.4,-0.4,-0.6,-1.2
4,-1.9,-1.3,-1.2,-1.4,-2.0


### Optimal value function
The optimal value function can be found by solving the Belmann optimality equation (eq. 3.19). The Bellman optimaly equation is nonlinear in $v(s)$. Here we solve it numerically as a minimization problem: find the minimum of the L2 norm of the residuals (the equation is verified when the norm goes to zero).

In [6]:
res_optim = minimize(solver.bellman_optimal, res, method='SLSQP', options={'maxiter': 1e4})
assert res_optim['success'], f'Optimization unsucessfull! {res_optim["message"]}'
print(f"Objective function value: {res_optim['fun']}")
optimal_val_fun = res_optim['x']

Objective function value: 8.44544193687595e-06


The result, shown below, should be the same as in figure 3.5-B.

In [7]:
# plot result on grid
env.dict_to_grid(fun_on_grid={env.vec_to_grid(x): np.around(optimal_val_fun[x], 1)
                              for x in range(optimal_val_fun.size)})

Unnamed: 0,col_0,col_1,col_2,col_3,col_4
0,22.0,24.4,22.0,19.4,17.5
1,19.8,22.0,19.8,17.8,16.0
2,17.8,19.8,17.8,16.0,14.4
3,16.0,17.8,16.0,14.4,13.0
4,14.4,16.0,14.4,13.0,11.7


### Optimal policy
Once the value function is known, the optimal policy is found as the policy that maximize the one step action "any policy that is greedy with respect to the optimal evaluation function v⇤ is an optimal policy". This results should be the same as figure 3.5-C.

In [8]:
env.dict_to_grid(solver.find_optimal_policy(optimal_val_fun))

Unnamed: 0,col_0,col_1,col_2,col_3,col_4
0,[right],"[up, down, left, right]",[left],"[up, down, left, right]",[left]
1,"[up, right]",[up],"[up, left]",[left],[left]
2,"[up, right]",[up],"[up, left]","[up, left]","[up, left]"
3,"[up, right]",[up],"[up, left]","[up, left]","[up, left]"
4,"[up, right]",[up],"[up, left]","[up, left]","[up, left]"
