In [2]:
import random

In [3]:
#Let us have a gridworld
#ref: Chapter 17, Artificial Intelligence a Modern Approach
#ref: CS188 https://inst.eecs.berkeley.edu/~cs188/fa19/
#ref: https://inst.eecs.berkeley.edu/~cs188/fa19/assets/slides/lec8.pdf
#ref: https://courses.cs.washington.edu/courses/cse473/13au/slides/17-mdp-rl.pdf

#This class will create a 2D grid of row x colums 
#Some of the cells can be disabled by putting it into walls
#cells are addressed just like 2d arrays (r,c)
#There are possibly many terminal states
#terminal states have only one action available: Exit 
#Transistion is as per the book 80% action and 20%sideways ( a variable noise is used to control this distribution)
#There is a special end state, (-1,-1), from which NO action is available. This state is used as a final state.

#Actions #just some alias
Up    = 0
Down  = 1
Right = 2
Left  = 3
Exit  = 4

class GridWorld :
    #Default is as given in the AIMA book
    def __init__(self, 
                 rows    =3, 
                 columns =4, 
                 walls   =[(1,1)], terminals= {(0,3):+1.0, (1,3):-1.0}, 
                 gamma   =1.0, 
                 living_reward=0,
                 noise   =0.2
                ) :
        """We dont expect these parameters to change during the agent run"""
        self.rows      = rows
        self.columns   = columns
        self.N         = rows * columns #total cells
        self.walls     = walls
        self.terminals = terminals #dictionary of terminal celss and their rewards.
        self.gamma     = gamma
        self.living_reward = living_reward
        self.all_actions   = [ Up, Down, Right, Left, Exit ]
        self.end_state     = (-1, -1) #a dummy state to reach after taking Exit
        self.all_states    = [(r,c) for r in range(rows) for c in range(columns) if (r,c) not in walls ] + [self.end_state]
        self.noise         = noise
        
        
        #transitions from each state and the probabilities
        self.noise                = noise
        self.action_transitions   = { 
            Up:   ([Up,    Left, Right], [1-noise, noise/2, noise/2 ]),
            Down: ([Down,  Left, Right], [1-noise, noise/2, noise/2 ]),
            Left: ([Left,  Up,   Down ], [1-noise, noise/2, noise/2 ]),
            Right:([Right, Up,   Down ], [1-noise, noise/2, noise/2 ]),
            Exit :([Exit], [1.0])
        }
    
    def actions(self, state) :
        """returns all valid actions from the current state"""
        if state in self.terminals :
            return [Exit]
        if state == self.end_state :
            return [] #No action available.
        return [ Up, Down, Right, Left ]
    
    def reward(self, state, action, next_state=None) :
        """reward is the instantaneous reward. It is usually R(s,a,s')"""
        #In grid world the reward depends only on state.
        if state in self.terminals :
            return self.terminals[state] #dict has the terminal values +1 or -1
        if state == self.end_state :
            return 0.0
        return self.living_reward        #usually a small -ve value
    
    def transitions(self, state, action) :
        """return a list of tuple(nextstate, action, probability)"""
        actual_actions, probs = self.action_transitions[action]
        return [ self._next_cell(state, a) for a in actual_actions ], actual_actions, probs
    
    def move(self, state, action) :
        """Take the action and return the tuple(new_state, reward, is_terminal)"""                          
        assert action in self.actions(state) #just a check if this is a valid action at this time or not
        
        cells, actions, p = self.transitions(state, action)
        
        #we choose one cell acccording to probabilities
        new_state   = random.choices(cells, weights=p)[0] #only one; we take index 0                
        reward      = self.reward(state, action) #
        
        is_terminal = False
        if new_state == self.end_state :
            is_terminal = True
            
        return new_state, reward, is_terminal #keep the same for mat as OpenAI gym.
    
    def _next_cell(self, state, action) : 
        """Blindly takes the action without checking anything and returns the position"""
        r,c = state #row & column
        if action == Exit :
            return self.end_state
        if action == Up :
            target = r-1, c  
        if action == Down :
            target = r+1, c
        if action == Right :
            target = r, c+1  
        if action == Left :
            target = r, c-1 
        
        if self._valid_cell(target) :
            return target
        return state #stay put the target is invalid.
    
    def _valid_cell(self, cell) :
        """Returns true if the cell is a valid cell"""
        r, c = cell #this may be an illegal node; we need to check
        
        #is it any of the walls?
        if (r,c) in self.walls :
            return False
        
        #is it outside the grid?
        if r < 0 or r >= self.rows or c < 0 or c >= self.columns :
            return False
        
        return True
    
    #pretty print the grid and agent if given.
    def print(self, agent_state=None) :
        for r in range(self.rows) :
            for c in range(self.columns) :
                cell = (r,c)
                if cell in self.walls :
                    print('# ', end='')
                elif cell in self.terminals :
                    if self.terminals[cell] > 0 :
                        print('+', end=' ')
                    else :
                        print('-', end=' ')
                elif cell == agent_state :
                    print('@ ', end='')
                else :
                    print('. ', end='')
            print("")


In [9]:
#This is a simple class to hold the policy dictionary
#useful for printing the policy and hiding some details.

class Policy :
    def __init__(self, grid_world=None) :
        """Holds one policy and returns actions according to it"""
        self.grid_world = grid_world
        self.policy     = { } #{ state: policy_action}
        
    def __getitem__(self, state) :
        return self.policy[state]
    
    def __setitem__(self, state, action) :
        self.policy[state] = action
    
    
    #Just a pretty print function for easy debugging
    def print(self) :
        print_chars = {Up:'^', Down:'v', Right:'>', Left:'<', Exit:'+'}
        for state in [(r,c) for r in range(self.grid_world.rows) for c in range(self.grid_world.columns)]:
            
            if state in self.grid_world.terminals :
                if self.grid_world.terminals[state] >= 0 :
                    print('+', end=' ') #positive reward terminal
                else :
                    print('-', end=' ') #-ve reward terminal
                    
            elif state not in self.policy :
                print('#', end=' ') #walls
            else :
                print(print_chars[self.policy[state]], end=' ') #directions >, <, ^, v
                
            if (state[1]+1) % self.grid_world.columns == 0 :
                print("") #just a newline

In [24]:
gw = GridWorld(gamma=0.01, living_reward=-0.04)
start = (2,0) #as in the book

In [25]:
value_dict = {state: 0 for state in gw.all_states}
def expected_value(gw, s, a, value_dict):
    exp_value = 0
    states, actions, probs = gw.transitions(s, a)
    for i, state in enumerate(states):
        exp_value += probs[i] * (gw.reward(s, a) + value_dict[state])
    return exp_value
expected_value(gw, (0,3), Exit, value_dict)

1.0

In [26]:
def value_iteration(gw):
    '''returns the dictionary containing the values for each state'''
    value_dict = {state: 0 for state in gw.all_states}
    N = 0
    while N < 100:
        for state in gw.all_states:
            if state != (-1,-1):
                list_values = []
                for action in gw.actions(state):
                    list_values.append(expected_value(gw, state, action, value_dict))
                value_dict[state] = max(list_values)
        N += 1
    return value_dict

In [27]:
V = value_iteration(gw)
V

{(0, 0): 0.8115582191780822,
 (0, 1): 0.8678082191780823,
 (0, 2): 0.9178082191780822,
 (0, 3): 1.0,
 (1, 0): 0.7615582191780823,
 (1, 2): 0.6602739726027398,
 (1, 3): -1.0,
 (2, 0): 0.7053082191780823,
 (2, 1): 0.6553082191780822,
 (2, 2): 0.6114155251141553,
 (2, 3): 0.38792491121258255,
 (-1, -1): 0}

In [55]:
#policy extraction
def policy_from_values(gw, V):
    actions = Policy(gw)
    for state in gw.all_states:
        if state != (-1,-1):
            list_values = []
            for action in gw.actions(state):
                list_values.append((expected_value(gw, state, action, V), action))
            max_value, best_action = max(list_values)
        actions[state] = best_action
    return actions
a = policy_from_values(gw, V)

In [39]:
#policy extraction with numpy
import numpy as np
def policy_from_values(gw, V):
    actions = Policy(gw)
    for state in gw.all_states:
        if state != (-1,-1):
            list_values = []
            for action in gw.actions(state):
                list_values.append((expected_value(gw, state, action, V)))
        actions[state] = np.argmax(list_values)
    return actions
a = policy_from_values(gw, V)
#Note: to avoid relooping through all the states, implement q-value iteration (q-value = exp_value(s, a, V(s')))
#Very similar to value iteration

In [15]:
a.print()

> > > + 
^ # > - 
> > > ^ 


In [111]:
start_state = (2, 0)
rewards = 0
end = False
while end == 0:
    next_state, reward, end = gw.move(start_state, a[start_state])
    rewards += reward
    start_state = next_state
    print(next_state)
print(rewards)

(1, 0)
(0, 0)
(0, 1)
(0, 2)
(0, 3)
(-1, -1)
-9.0


In [66]:
#Policy iteration: more efficient since policy can be figured much before the values converge
#1. Policy evaluation: calculate utilities for some fixed policy until convergence.
#2. Policy improvement: update policy using one-step look-ahead with resulting converged (but not optimal) 
#utilities as future values
#Repeat 1 and 2 until convergence
#policy_value(s) = expected total discounted rewards starting in s and following that policy
actions = Policy(gw)
for state in gw.all_states:
    if state != (-1,-1):
        actions[state] = gw.actions(state)[0]
value_dict = {state: 0 for state in gw.all_states}
def values_from_policy(gw, actions, value_dict):
    list_values = []
    for state in gw.all_states:
        if state != (-1,-1):
            value_dict[state] = (expected_value(gw, state, actions[state], value_dict))
    return value_dict
values_from_policy(gw, actions, value_dict)

{(0, 0): -0.04000000000000001,
 (0, 1): -0.044,
 (0, 2): -0.044399999999999995,
 (0, 3): 1.0,
 (1, 0): -0.07200000000000002,
 (1, 2): -0.07552000000000002,
 (1, 3): -1.0,
 (2, 0): -0.09760000000000002,
 (2, 1): -0.04976,
 (2, 2): -0.10539200000000001,
 (2, 3): -0.8505392,
 (-1, -1): 0}

In [85]:
gw = GridWorld(gamma=0.01, living_reward=-2)
start = (2,0) #as in the book

In [86]:
def policy_iteration(gw):
    value_dict = {state: 0 for state in gw.all_states}
    actions = Policy(gw)
    for state in gw.all_states:
        if state != (-1,-1):
            actions[state] = gw.actions(state)[0]
    N = 0
    while N < 10:
        values = values_from_policy(gw, actions, value_dict)
        actions = policy_from_values(gw, values)
        N += 1
    return actions, values

In [87]:
c, values = policy_iteration(gw)
c.print()

> > > + 
^ # > - 
> > > ^ 


In [88]:
values

{(0, 0): -7.042965366862879,
 (0, 1): -4.230087445001626,
 (0, 2): -1.7300525832514788,
 (0, 3): 1.0,
 (1, 0): -9.543245731548135,
 (1, 2): -3.5704528159278244,
 (1, 3): -1.0,
 (2, 0): -10.816037602817046,
 (2, 1): -8.474564126466234,
 (2, 2): -5.974447911839857,
 (2, 3): -3.7749391752645356,
 (-1, -1): 0}