In [230]:
import numpy as np

# Display floats in array reprs with two decimals so the value grids below stay readable.
np.set_printoptions(precision=2)

## Grid World and Grid Node

This project was done for fun: the gridworld implementation is meant to build a better understanding of the algorithms and to experiment with different combinations of hyperparameters.


In [231]:
class GridNode():
    """A single cell of the grid world.

    Every node owns its own action table, ``self.actions``, keyed by the
    action names 'up', 'left', 'right' and 'down'. Each entry is a dict with:

        'movement': (dx, dy) offset applied when the action is taken
                    (GridWorld.peek adds dx to the column and dy to the row),
        'p':        probability attached to the action (0.25 initially),
        'r':        reward received for taking the action (0 initially).

    params:
        @reward: reward value stored on the node as ``self.reward``
    Returns:
        GridNode
    """

    # Action name -> default (dx, dy) offset; order matches the original table.
    _DEFAULT_MOVES = (
        ('up', (0, -1)),
        ('left', (-1, 0)),
        ('right', (1, 0)),
        ('down', (0, 1)),
    )

    def __init__(self, reward) -> None:
        self.reward = reward
        self.v = 0
        # A fresh dict per node, so editing one node never affects another.
        self.actions = {
            name: {
                'movement': offset,
                'p': .25,
                'r': 0
            }
            for name, offset in self._DEFAULT_MOVES
        }

    def change_action(self, action, key, change):
        """Overwrites one field of one action entry.

        Args:
            action (str): action name, a key of ``self.actions``
            key (str): field to overwrite ('movement', 'p' or 'r')
            change: new value for that field

        Raises:
            KeyError: if ``action`` is not a known action name
        """
        self.actions[action][key] = change

    def transition(self, action):
        """Looks up what happens when ``action`` is taken from this node.

        Args:
            action (str): action name, a key of ``self.actions``

        Returns:
            tuple: (movement, r, p) -- the (dx, dy) offset, the reward for
            taking the action and the probability attached to it

        Raises:
            KeyError: if ``action`` is not a known action name
        """
        entry = self.actions[action]
        return entry['movement'], entry['r'], entry['p']


class GridWorld():
    """Rectangular grid of GridNode cells.

    Moves that would leave the grid are rewritten so the agent stays in place
    ((0, 0) movement) and receives the ``fall`` reward instead.
    """

    def __init__(self, rows, columns, startX, startY, fall) -> None:
        """initializer

        Args:
            rows (int): num rows in grid
            columns (int): num cols in grid
            startX (int): starting row index (used as ``grid[startX, startY]``)
            startY (int): starting column index
            fall (int): reward for trying to step off the grid
        """
        self.lenNodes = rows * columns
        nodes = [GridNode(reward=0) for _ in range(self.lenNodes)]
        self.grid = np.array(nodes, dtype=object).reshape(rows, columns)
        self.cur = self.grid[startX, startY]

        # Top and bottom edges span the columns, left and right edges span the
        # rows. Iterating each axis over its own length keeps non-square grids
        # correct (a single range(rows) loop only works when rows == columns).
        for col in range(columns):
            self._block(self.grid[0, col], 'up', fall)
            self._block(self.grid[-1, col], 'down', fall)
        for row in range(rows):
            self._block(self.grid[row, 0], 'left', fall)
            self._block(self.grid[row, -1], 'right', fall)

    @staticmethod
    def _block(node, action, penalty):
        """Makes ``action`` a no-move from ``node`` that yields ``penalty``."""
        node.change_action(action=action, key='movement', change=(0, 0))
        node.change_action(action=action, key='r', change=penalty)

    def set_special(self, row, column, reward, actions, targetcol, targetrow):
        """sets special node in the grid

        Every listed action taken from (row, column) yields ``reward`` and
        moves the agent to (targetrow, targetcol).

        Args:
            row (int): grid row
            column (int): grid col
            reward (int): new reward
            actions (str | iterable of str): action name(s) that trigger the reward
            targetcol (int): new transition col
            targetrow (int): new transition row
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(actions, str):
            actions = (actions,)

        # Movements are stored as (dx, dy) = (column offset, row offset).
        transCoords = (targetcol - column, targetrow - row)
        node = self.grid[row, column]
        for action in actions:
            node.change_action(action=action, key='r', change=reward)
            node.change_action(action=action,
                               key='movement',
                               change=transCoords)

    def peek(self, state, action):
        """peeks at a transition without moving the agent

        Args:
            state (GridNode): cur grid node
            action (str): action to take

        Returns:
            tuple: (r, newCords, p) where r is the reward, p the probability
            stored for the action and newCords a (rows, cols) pair of index
            arrays as produced by ``np.where`` (one element each when the node
            appears once in the grid), so callers read a value table with
            ``table[newCords][0]``
        """
        row, col = np.where(self.grid == state)
        (dx, dy), r, p = state.transition(action)
        newCords = (row + dy, col + dx)

        return r, newCords, p


In [232]:
class Policy():
    """Base interface for action-selection policies.

    Stores the list of available actions. ``pick`` and ``pa`` are placeholder
    hooks that return None here; subclasses supply the behaviour.
    """

    def __init__(self, actions) -> None:
        self.actions = actions

    def pick(self, a=None):
        """Hook for choosing an action.

        Args:
            a: optional argument for subclasses that need one. It defaults to
               None so subclasses that take no argument stay call-compatible
               with this base signature.
        """
        pass

    def pa(self, a=None):
        """Hook for the probability of choosing an action.

        Args:
            a: optional action whose probability is requested. It defaults to
               None for the same compatibility reason as in ``pick``.
        """
        pass


class RandomPolicy(Policy):
    """
    Random policy: every action in ``self.actions`` is equally likely
    """

    def __init__(self, actions) -> None:
        super().__init__(actions)

    def pick(self, a=None):
        """
        randomly pick from any action

        Args:
            a: accepted for compatibility with ``Policy.pick(a)`` and ignored

        Returns:
            one element of ``self.actions`` drawn with ``np.random.choice``
        """
        return np.random.choice(self.actions)

    def pa(self, a=None):
        """
        returns prob of picking any action

        Args:
            a: accepted for compatibility with ``Policy.pa(a)`` and ignored,
               because the probability is the same for every action

        Returns:
            float: 1 / number of actions
        """
        return 1 / len(self.actions)


In [233]:
def rand_argmax(ar):
    """argmax that randomly breaks ties between equal maximal entries

    Args:
        ar (np.array or sequence): one-dimensional values to argmax

    Returns:
        index of a maximal entry, drawn with ``np.random.choice`` from all
        positions that hold the maximum

    Raises:
        ValueError: if ``ar`` is empty
    """
    # Coerce first so plain lists/tuples work: comparing a list with a scalar
    # yields a single bool rather than an element-wise mask.
    values = np.asarray(ar)
    candidates = np.flatnonzero(values == values.max())
    return np.random.choice(candidates)


class DP():
    """Shared state for dynamic-programming solvers.

    Attributes set by the initializer:
        grid:    the grid world to solve
        v:       flat zero vector with one entry per grid node (``grid.lenNodes``)
        policy:  the policy object
        gamma:   discount factor
        actions: shortcut to ``policy.actions``
    """

    # Annotations are written as forward references so that defining this class
    # does not require GridWorld / Policy to exist at class-creation time.
    def __init__(self, grid: "GridWorld", policy: "Policy", gamma) -> None:
        self.grid = grid
        self.v = np.zeros(grid.lenNodes)
        self.policy = policy
        self.gamma = gamma
        self.actions = policy.actions

    def solve(self):
        """Placeholder hook; does nothing and returns None.

        ``self`` is required so the method can be called on an instance.
        """
        pass


class Solver(DP):
    """Policy evaluation, policy iteration and value iteration for a GridWorld.

    The policy being evaluated lives in the grid itself: every node stores a
    probability 'p' per action, which ``GridWorld.peek`` returns.
    ``policyImprovement`` overwrites those probabilities with a greedy
    (0/1) policy, so later calls to ``policyEvaluation`` on the same grid
    evaluate that greedy policy rather than the initial one.
    """

    def __init__(self, grid: GridWorld, policy: Policy, gamma) -> None:
        super().__init__(grid, policy, gamma)

    @staticmethod
    def _blank_policy(shape):
        """Returns an object array of ``shape`` filled with fresh
        {'v': -inf, 'action': None} records (one dict per cell)."""
        table = np.empty(shape, dtype=object)
        for index in np.ndindex(*shape):
            table[index] = {"v": -np.inf, "action": None}
        return table

    def policyEvaluation(self, epsilon):
        """follows the iterative policy evaluation algorithm of Sutton & Barto
        (the original notebook cites page 90), sweeping the grid in place

        Args:
            epsilon (float): error threshold epsilon; sweeping stops once the
                largest change in a sweep is strictly below it, so it should
                be positive

        Returns:
            np.array: float array with the grid's shape holding the state
            values under the action probabilities stored in the nodes
        """
        n_rows, n_cols = self.grid.grid.shape
        vs = np.zeros((n_rows, n_cols), dtype=np.float64)
        while True:
            delta = 0
            for col in range(n_cols):
                for row in range(n_rows):
                    state = self.grid.grid[row, col]
                    previous_v = vs[row, col]
                    expected = 0
                    for action in self.policy.actions:
                        r, coords, pa = self.grid.peek(state, action)
                        # coords is a pair of 1-element index arrays.
                        expected += pa * (r + self.gamma * vs[coords][0])
                    vs[row, col] = expected
                    delta = max(delta, abs(previous_v - expected))
            if epsilon > delta:
                break

        return vs

    def policyImprovement(self, vs, epsilon, max_iter=20):
        """follows the policy iteration algorithm of Sutton & Barto (the
        original notebook cites page 92): greedy improvement alternated with
        policy evaluation until the greedy table stops changing

        Side effect: the 'p' entry of every action of every node is set to 1
        for the greedy action and 0 for all others.

        Args:
            vs (np.array): 2-D value function for the grid
            epsilon (float): error threshold passed to ``policyEvaluation``
            max_iter (int): maximum number of re-evaluations before giving up
                on reaching a stable policy

        Returns:
            tuple: (best, vs) where best is an object array of
            {'v': value, 'action': name} records and vs is the value function
            the last improvement sweep was computed from
        """
        n_rows, n_cols = vs.shape
        previous = self._blank_policy(vs.shape)
        evaluations = 0
        while True:
            best = self._blank_policy(vs.shape)

            for col in range(n_cols):
                for row in range(n_rows):
                    state: GridNode = self.grid.grid[row, col]
                    record = best[row, col]
                    for action in self.policy.actions:
                        r, coords, _ = self.grid.peek(state, action)
                        q_value = r + self.gamma * vs[coords][0]
                        # Strict comparison: the first maximal action wins ties.
                        if record['v'] < q_value:
                            record['v'] = q_value
                            record['action'] = action

                    # Make the stored policy greedy for this node.
                    for action in self.policy.actions:
                        greedy = action == record['action']
                        state.change_action(action, 'p', 1 if greedy else 0)

            if (previous == best).all() or evaluations >= max_iter:
                break
            previous = best.copy()
            vs = self.policyEvaluation(epsilon)
            evaluations += 1
        return best, vs

    def ValueIteration(self, epsilon):
        """follows the value iteration algorithm of Sutton & Barto (the
        original notebook cites page 97)

        Ties between equally good actions are broken at random through
        ``rand_argmax``, so the reported actions can differ between runs.

        Args:
            epsilon (float): error threshold epsilon; sweeping stops once the
                largest change in a sweep is no greater than it

        Returns:
            tuple: (best, vs) where best is an object array of
            {'v': value, 'action': name} records and vs the float value table
        """
        n_rows, n_cols = self.grid.grid.shape
        vs = np.zeros((n_rows, n_cols), dtype=np.float64)
        best = self._blank_policy(vs.shape)
        delta = np.inf
        while delta > epsilon:
            delta = 0
            for col in range(n_cols):
                for row in range(n_rows):
                    state: GridNode = self.grid.grid[row, col]
                    previous_v = vs[row, col]
                    action_v = np.zeros(len(self.actions), dtype=np.float64)
                    for i, action in enumerate(self.actions):
                        r, coords, _ = self.grid.peek(state, action)
                        action_v[i] = r + self.gamma * vs[coords][0]
                    chosen = rand_argmax(action_v)
                    best[row, col]['v'] = action_v[chosen]
                    best[row, col]['action'] = self.actions[chosen]
                    vs[row, col] = action_v[chosen]
                    delta = max(delta, abs(previous_v - vs[row, col]))
        return best, vs


In [234]:
# 5x5 grid, start cell (1, 1), reward of -1 for trying to step off the grid.
grid = GridWorld(rows=5, columns=5, startX=1, startY=1, fall=-1)
policy = RandomPolicy(['up', 'down', 'left', 'right'])

# Two special cells in the top row: any action from (row 0, col 1) pays 10 and
# lands on (row 4, col 1); any action from (row 0, col 3) pays 5 and lands on
# (row 2, col 3).
grid.set_special(0, 1, 10, policy.actions, targetcol=1, targetrow=4)
grid.set_special(0, 3, 5, policy.actions, targetcol=3, targetrow=2)

# Evaluate the initial policy first, then run policy iteration from its values.
dp = Solver(grid, policy, gamma=.9)
f = dp.policyEvaluation(epsilon=.0001)
pol, vs = dp.policyImprovement(vs=f, epsilon=.00001)


Testing policy evaluation and policy improvement algorithms

In [235]:
# Greedy action per cell found by policy iteration, laid out like the grid.
np.reshape([record.get('action') for record in pol.flatten()], (5, 5))


array([['right', 'up', 'left', 'up', 'left'],
       ['right', 'up', 'up', 'left', 'left'],
       ['right', 'up', 'up', 'up', 'up'],
       ['right', 'up', 'up', 'up', 'up'],
       ['right', 'up', 'up', 'up', 'up']], dtype='<U5')

In [236]:
# State values from the first policy evaluation (stored in `f`), rounded to 2 decimals.
np.reshape([round(value, 2) for value in f.flatten()], (5, 5))


array([[ 3.31,  8.79,  4.43,  5.32,  1.49],
       [ 1.52,  2.99,  2.25,  1.91,  0.55],
       [ 0.05,  0.74,  0.67,  0.36, -0.4 ],
       [-0.97, -0.44, -0.35, -0.59, -1.18],
       [-1.86, -1.34, -1.23, -1.42, -1.97]])

In [237]:
# State values returned by policy iteration.
np.reshape([round(value, 4) for value in vs.flatten()], (5, 5))


array([[21.98, 24.42, 21.98, 19.42, 17.48],
       [19.78, 21.98, 19.78, 17.8 , 16.02],
       [17.8 , 19.78, 17.8 , 16.02, 14.42],
       [16.02, 17.8 , 16.02, 14.42, 12.98],
       [14.42, 16.02, 14.42, 12.98, 11.68]])

Testing value iteration on its own

In [238]:
# Rebinds `pol` and `vs` to the value-iteration results.
pol, vs = dp.ValueIteration(epsilon=0.0001)


In [239]:
# Action per cell chosen by value iteration (ties are broken at random).
np.reshape([record.get('action') for record in pol.flatten()], (5, 5))


array([['right', 'right', 'left', 'right', 'left'],
       ['up', 'up', 'left', 'left', 'left'],
       ['right', 'up', 'left', 'left', 'left'],
       ['right', 'up', 'left', 'up', 'left'],
       ['right', 'up', 'left', 'up', 'left']], dtype='<U5')

In [240]:
# `f` still holds the earlier policy-evaluation values; value iteration did not recompute it.
np.reshape([round(value, 4) for value in f.flatten()], (5, 5))


array([[ 3.31,  8.79,  4.43,  5.32,  1.49],
       [ 1.52,  2.99,  2.25,  1.91,  0.55],
       [ 0.05,  0.74,  0.67,  0.36, -0.4 ],
       [-0.97, -0.44, -0.35, -0.59, -1.18],
       [-1.86, -1.34, -1.23, -1.42, -1.97]])

In [241]:
# State values returned by value iteration.
np.reshape([round(value, 4) for value in vs.flatten()], (5, 5))


array([[21.98, 24.42, 21.98, 19.42, 17.48],
       [19.78, 21.98, 19.78, 17.8 , 16.02],
       [17.8 , 19.78, 17.8 , 16.02, 14.42],
       [16.02, 17.8 , 16.02, 14.42, 12.98],
       [14.42, 16.02, 14.42, 12.98, 11.68]])

Both approaches appear to converge to the same values, but value iteration gets there with far fewer total operations.