In [58]:
import numpy as np

## environment class

In [59]:
class Grid:
    """Grid-world environment that tracks a single agent position.

    States are (row, col) tuples. The reward and action tables are supplied
    through set(); a state with no entry in the action table is treated as
    terminal.
    """

    def __init__(self, rows, cols, start):
        """Create a rows x cols grid with the agent placed at start=(row, col)."""
        self.rows = rows
        self.cols = cols
        self.i = start[0]
        self.j = start[1]
        # Filled in by set(). Starting from empty tables keeps the query
        # methods usable (instead of raising AttributeError) before set() runs.
        self.rewards = {}
        self.actions = {}

    def set(self, rewards, actions):
        """Install the reward and action tables.

        rewards: dict mapping (row, col) -> reward received on arriving there
        actions: dict mapping (row, col) -> collection of actions legal there
        """
        self.rewards = rewards
        self.actions = actions

    def set_state(self, s):
        """Place the agent on state s=(row, col)."""
        self.i = s[0]
        self.j = s[1]

    def current_state(self):
        """Return the agent position as a (row, col) tuple."""
        return (self.i, self.j)

    def is_terminal(self, s):
        """Return True when state s has no entry in the action table."""
        return s not in self.actions

    def move(self, action):
        """Apply action if it is legal in the current state and return the reward.

        An illegal action leaves the agent where it is. The returned reward is
        the one registered for the state the agent ends up in (0 if none).
        """
        # Terminal states have no entry in the action table; treat them as
        # having no legal moves rather than raising KeyError.
        if action in self.actions.get((self.i, self.j), ()):
            if action == 'U':
                self.i -= 1
            elif action == 'D':
                self.i += 1
            elif action == 'R':
                self.j += 1
            elif action == 'L':
                self.j -= 1
        return self.rewards.get((self.i, self.j), 0)

    def undo_move(self, action):
        """Reverse the displacement of action (no legality check is made)."""
        # these are the opposite of what U/D/L/R should normally do
        if action == 'U':
            self.i += 1
        elif action == 'D':
            self.i -= 1
        elif action == 'R':
            self.j -= 1
        elif action == 'L':
            self.j += 1
        # landing outside the known states means the undo did not match a
        # previous legal move; should never happen
        assert self.current_state() in self.all_states()

    def game_over(self):
        """Return True when the agent is on a terminal state."""
        return self.is_terminal(self.current_state())

    def all_states(self):
        """Return the set of known states.

        Possibly incomplete but simple: a state is known if it either has
        legal actions or yields a reward.
        """
        return set(self.actions.keys()) | set(self.rewards.keys())


def standard_grid():
    """Build the 3x4 grid world used by the rest of the notebook.

    Layout ('x' cannot be entered, 's' is the start cell, a number is the
    reward for arriving at that cell):

        .  .  .  1
        .  x  . -1
        s  .  .  .

    Returns a Grid positioned at (2, 0). Only the two reward cells carry a
    reward, and they have no entry in the action table.
    """
    terminal_rewards = {(0, 3): 1, (1, 3): -1}
    # legal moves for every cell the agent can act from
    legal_moves = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('L', 'D', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('L', 'R', 'U'),
        (2, 3): ('L', 'U'),
    }
    world = Grid(3, 4, (2, 0))
    world.set(terminal_rewards, legal_moves)
    return world


def negative_grid(step_cost=-0.1):
    """Return the standard grid with step_cost as the reward of every non-terminal cell.

    Penalising every move encourages the agent to reach a terminal cell in as
    few moves as possible. Cells without an entry in the action table keep the
    rewards assigned by standard_grid().
    """
    g = standard_grid()
    # The cells with legal moves are exactly the non-terminal ones. Reading
    # them from the grid avoids a second hard-coded copy of its layout.
    g.rewards.update({state: step_cost for state in g.actions})
    return g

In [60]:
def print_value(V, g):
    """Print the values in V as a table with g.rows rows and g.cols columns.

    V maps (row, col) -> number; cells missing from V are shown as 0.
    """
    rule = '----------------------------'
    for row in range(g.rows):
        print(rule)
        for col in range(g.cols):
            value = V.get((row, col), 0)
            # a negative number brings its own sign character, so it gets no
            # padding space in front
            pad = ' ' if value >= 0 else ''
            print('|%s%.2f ' % (pad, value), end='')
        print('|')
    print(rule)
    

def print_policy(P, g):
    """Print the actions in P as a table with g.rows rows and g.cols columns.

    P maps (row, col) -> action; cells missing from P are left blank.
    """
    rule = '-----------------'
    for row in range(g.rows):
        print(rule)
        for col in range(g.cols):
            print('| %s ' % P.get((row, col), ' '), end='')
        print('|')
    print(rule)

## Monte Carlo (MC) method
The advantage of this method is that we do not need to sweep through each and every state of the game; we only use the states we actually experience while playing.

## fixed policy - deterministic action

In [61]:

# discount factor applied to future rewards when computing returns
gamma = 0.9

def play_game(grid, policy):
    """Play one episode under policy and return [(state, return), ...].

    The episode starts from a uniformly random state among those that have
    legal actions: with a deterministic policy some states would otherwise
    never be visited, yet we still want to estimate their value.

    The result lists the visited states in order, each paired with the
    discounted return G that followed it. The terminal state is left out,
    since its value is 0 by definition.
    """
    candidates = list(grid.actions.keys())
    grid.set_state(candidates[np.random.choice(len(candidates))])

    state = grid.current_state()
    trajectory = [(state, 0)]  # (state, reward received on arriving there)
    while not grid.game_over():
        reward = grid.move(policy[state])
        state = grid.current_state()
        trajectory.append((state, reward))

    # Walk the episode backwards, accumulating G. Step 0 is the terminal
    # state and is skipped; the G left over after the loop belongs to no
    # move and is discarded.
    G = 0
    states_returns = []
    for step, (state, reward) in enumerate(reversed(trajectory)):
        if step > 0:
            states_returns.append((state, G))
        G = reward + gamma * G
    states_returns.reverse()  # back to the order in which states were visited
    return states_returns

In [63]:
# First-visit Monte Carlo evaluation of a fixed, deterministic policy.
# The plain grid (reward 0 for every ordinary step) is used so the result can
# be compared with iterative policy evaluation.
grid = standard_grid()

print("rewards:")
print_value(grid.rewards, grid)

states = grid.all_states()

# state -> action
policy = {
    (2, 0): 'U',
    (1, 0): 'U',
    (0, 0): 'R',
    (0, 1): 'R',
    (0, 2): 'R',
    (1, 2): 'R',
    (2, 1): 'R',
    (2, 2): 'R',
    (2, 3): 'U'
}

# V(s) starts at 0; returns[s] collects the G observed for s in each episode
V = dict.fromkeys(states, 0)
returns = {s: [] for s in states}

# play the game for n episodes
for t in range(100):
    # generate an episode using pi
    states_returns = play_game(grid, policy)
    seen_states = set()
    for s, G in states_returns:
        # "first-visit" MC: when a state repeats within one episode, only the
        # return that followed its first occurrence is recorded
        if s in seen_states:
            continue
        seen_states.add(s)
        returns[s].append(G)
        V[s] = np.mean(returns[s])

print('policy:')
print_policy(policy, grid)

print("values:")
print_value(V, grid)
            

rewards:
----------------------------
| 0.00 | 0.00 | 0.00 | 1.00 |
----------------------------
| 0.00 | 0.00 | 0.00 |-1.00 |
----------------------------
| 0.00 | 0.00 | 0.00 | 0.00 |
----------------------------
policy:
-----------------
| R | R | R |   |
-----------------
| U |   | R |   |
-----------------
| U | R | R | U |
-----------------
values:
----------------------------
| 0.81 | 0.90 | 1.00 | 0.00 |
----------------------------
| 0.73 | 0.00 |-1.00 | 0.00 |
----------------------------
| 0.66 |-0.81 |-0.90 |-1.00 |
----------------------------


When I play the game, say, 100 times, the episodes cover almost all the states, and we get the same value table as we were getting with the Markov decision process approach.

## windy environment or non deterministic action
Notice that we are not optimizing the policy here; we are just generating a new set of values, which will be a little more negative compared to the previous approach.

In [34]:
# convergence threshold (10e-4, i.e. 1e-3); nothing in the visible part of
# this notebook reads it
SMALL_ENOUGH = 1e-3

# discount factor applied to future rewards when computing returns
gamma = 0.9

# the four moves that can be attempted from any state
all_possible_actions = list('UDRL')

def random_action(a):
    """Return a with probability 0.5, otherwise one of the other actions.

    When a is not kept, the replacement is drawn uniformly from the entries
    of all_possible_actions that differ from a.
    """
    if np.random.random() < 0.5:
        return a
    alternatives = list(filter(lambda move: move != a, all_possible_actions))
    return np.random.choice(alternatives)

def play_game(grid, policy):
    """Play one "windy" episode under policy and return [(state, return), ...].

    The episode starts from a uniformly random state among those that have
    legal actions, so that every such state gets evaluated. At each step the
    action chosen by policy is passed through random_action() before it is
    applied, so the move actually taken may differ from the intended one.

    The result lists the visited states in order, each paired with the
    discounted return G that followed it. The terminal state is left out,
    since its value is 0 by definition.
    """
    candidates = list(grid.actions.keys())
    grid.set_state(candidates[np.random.choice(len(candidates))])

    state = grid.current_state()
    trajectory = [(state, 0)]  # (state, reward received on arriving there)
    while not grid.game_over():
        taken = random_action(policy[state])
        reward = grid.move(taken)
        state = grid.current_state()
        trajectory.append((state, reward))

    # Walk the episode backwards, accumulating G. Step 0 is the terminal
    # state and is skipped; the G left over after the loop belongs to no
    # move and is discarded.
    G = 0
    states_returns = []
    for step, (state, reward) in enumerate(reversed(trajectory)):
        if step > 0:
            states_returns.append((state, G))
        G = reward + gamma * G
    states_returns.reverse()  # back to the order in which states were visited
    return states_returns

In [39]:
# First-visit Monte Carlo evaluation of a fixed policy when the action taken
# is randomised (windy environment). The plain grid (reward 0 for every
# ordinary step) is used so the result can be compared with iterative policy
# evaluation.
grid = standard_grid()

# the four moves that can be attempted from any state
all_possible_actions = ['U', 'D', 'R', 'L']

print("rewards:")
print_value(grid.rewards, grid)

states = grid.all_states()

# the policy being evaluated: state -> intended action
policy = {
    (2, 0): 'U',
    (1, 0): 'U',
    (0, 0): 'R',
    (0, 1): 'R',
    (0, 2): 'R',
    (1, 2): 'U',
    (2, 1): 'L',
    (2, 2): 'U',
    (2, 3): 'L',
  }

# V(s) starts at 0; returns[s] collects the G observed for s in each episode
V = dict.fromkeys(states, 0)
returns = {s: [] for s in states}

# play the game for n episodes
for t in range(5000):
    # generate an episode using pi
    states_returns = play_game(grid, policy)
    seen_states = set()
    for s, G in states_returns:
        # "first-visit" MC: when a state repeats within one episode, only the
        # return that followed its first occurrence is recorded
        if s in seen_states:
            continue
        seen_states.add(s)
        returns[s].append(G)
        V[s] = np.mean(returns[s])

print('policy:')
print_policy(policy, grid)

print("values:")
print_value(V, grid)

# state -> action
# found by policy_iteration_random on standard_grid
# MC method won't get exactly this, but should be close
# values:
# ---------------------------
#  0.43|  0.56|  0.72|  0.00|
# ---------------------------
#  0.33|  0.00|  0.21|  0.00|
# ---------------------------
#  0.25|  0.18|  0.11| -0.17|
# policy:
# ---------------------------
#   R  |   R  |   R  |      |
# ---------------------------
#   U  |      |   U  |      |
# ---------------------------
#   U  |   L  |   U  |   L  |

rewards:
----------------------------
| 0.00 | 0.00 | 0.00 | 1.00 |
----------------------------
| 0.00 | 0.00 | 0.00 |-1.00 |
----------------------------
| 0.00 | 0.00 | 0.00 | 0.00 |
----------------------------
policy:
-----------------
| R | R | R |   |
-----------------
| U |   | U |   |
-----------------
| U | L | U | L |
-----------------
values:
----------------------------
| 0.43 | 0.55 | 0.73 | 0.00 |
----------------------------
| 0.33 | 0.00 | 0.23 | 0.00 |
----------------------------
| 0.25 | 0.18 | 0.12 |-0.15 |
----------------------------


## mc control method i.e. optimizing policy
Here, to optimize the policy, we use Q values instead of V values because Q values consider not just the state but the action as well.

In [38]:
def play_game(grid, policy):
    """Play one episode with exploring starts; return [(state, action, return), ...].

    The episode starts from a uniformly random state among those that have
    legal actions, and the first action is drawn uniformly from
    all_possible_actions ("exploring starts"). Every later action comes from
    policy.

    The episode ends when a terminal state is reached, or as soon as a state
    is visited a second time. The second case is a hack that keeps episodes
    short under a random policy: the revisit is recorded with a reward of -10.
    A move that leaves the agent in place counts as a revisit.

    The result lists the visited (state, action) pairs in order, each with
    the discounted return G that followed it. The final state is left out.
    """
    start_states = list(grid.actions.keys())
    grid.set_state(start_states[np.random.choice(len(start_states))])

    s = grid.current_state()
    a = np.random.choice(all_possible_actions)  # first action is uniformly random

    # be aware of the timing:
    # each triple is s(t), a(t), r(t),
    # but r(t) results from taking action a(t-1) from s(t-1) and landing in s(t)
    states_actions_rewards = [(s, a, 0)]

    seen_states = {s}
    while True:
        r = grid.move(a)
        s = grid.current_state()
        if s in seen_states:
            # loop detected: penalise and stop the episode here
            states_actions_rewards.append((s, None, -10))
            break
        elif grid.game_over():
            states_actions_rewards.append((s, None, r))
            break
        else:
            a = policy[s]
            states_actions_rewards.append((s, a, r))
        seen_states.add(s)

    # Walk the episode backwards, accumulating G. Step 0 is the final state
    # and is skipped; the G left over after the loop belongs to no move and
    # is discarded.
    G = 0
    state_action_returns = []
    for step, (s, a, r) in enumerate(reversed(states_actions_rewards)):
        if step > 0:
            state_action_returns.append((s, a, G))
        G = r + gamma * G
    state_action_returns.reverse()  # back to the order in which states were visited
    return state_action_returns

def max_dict(d):
    """Return (key, value) for the largest value in d.

    Ties go to the key that comes first in iteration order. If no value is
    greater than -inf (for example when d is empty), (None, -inf) is returned.
    """
    best = (None, float('-inf'))
    for item in d.items():
        if item[1] > best[1]:
            best = item
    return best

In [40]:
# Monte Carlo control with exploring starts on the plain grid (reward 0 for
# every ordinary step).

# discount factor applied to future rewards when computing returns
gamma = 0.9

# the four moves that can be attempted from any state
all_possible_actions = ['U', 'D', 'R', 'L']

grid = standard_grid()

states = grid.all_states()

print("rewards:")
print_value(grid.rewards, grid)

# start from a random policy: one arbitrary action per non-terminal state
policy = {s: np.random.choice(all_possible_actions) for s in grid.actions}

# Q[s][a] is the current estimate for taking a in s; it starts at 0 so that
# an argmax is always available. returns[(s, a)] collects the G observed for
# that pair. Only non-terminal states get entries.
Q = {s: {a: 0 for a in all_possible_actions} for s in states if s in grid.actions}
returns = {(s, a): [] for s in Q for a in all_possible_actions}

# play the game for n episodes, improving the policy after each one
for t in range(2000):
    # generate an episode using pi
    states_actions_returns = play_game(grid, policy)
    seen_states_actions = set()
    for s, a, G in states_actions_returns:
        # "first-visit" MC: when a (state, action) pair repeats within one
        # episode, only the return that followed its first occurrence counts
        sa = (s, a)
        if sa in seen_states_actions:
            continue
        seen_states_actions.add(sa)
        returns[sa].append(G)
        Q[s][a] = np.mean(returns[sa])

    # update policy: pi(s) = argmax[a]{ Q(s,a) }
    for s in policy:
        policy[s] = max_dict(Q[s])[0]

print('policy:')
print_policy(policy, grid)

# find V: V(s) = max[a]{ Q(s,a) }
V = {s: max_dict(Qs)[1] for s, Qs in Q.items()}

print("final values:")
print_value(V, grid)

rewards:
----------------------------
| 0.00 | 0.00 | 0.00 | 10.00 |
----------------------------
| 0.00 | 0.00 | 0.00 |-10.00 |
----------------------------
| 0.00 | 0.00 | 0.00 | 0.00 |
----------------------------
policy:
-----------------
| R | R | R |   |
-----------------
| U |   | U |   |
-----------------
| R | R | U | L |
-----------------
final values:
----------------------------
| 2.50 | 4.72 | 10.00 | 0.00 |
----------------------------
| 2.37 | 0.00 | 6.15 | 0.00 |
----------------------------
| 2.25 | 3.53 | 5.56 |-1.52 |
----------------------------


## MC control method with windy environment, i.e. optimizing the policy without exploring starts, using random (epsilon-soft) actions instead

In [41]:
def random_action(a, eps=0.1):
    """Epsilon-soft action selection.

    With probability 1 - eps the given action a is returned unchanged;
    otherwise an action is drawn uniformly from all_possible_actions, and
    that draw may pick a again. With four possible actions, a is therefore
    chosen with probability 1 - eps + eps/4 and each other action with
    probability eps/4. This is equivalent to keeping a with probability
    1 - eps + eps/len(all_possible_actions) and otherwise drawing uniformly
    from the remaining actions.
    """
    keep_given = np.random.random() < (1 - eps)
    if not keep_given:
        return np.random.choice(all_possible_actions)
    return a

def play_game(grid, policy, start=(2, 0)):
    """Play one epsilon-soft episode; return [(state, action, return), ...].

    This version does NOT use exploring starts: every episode begins at
    start (by default (2, 0)). Exploration comes from passing each action
    chosen by policy through random_action().

    grid:   environment offering set_state / current_state / move / game_over
    policy: dict mapping state -> action
    start:  state in which every episode begins

    The result lists the visited (state, action) pairs in order, each with
    the discounted return G that followed it. The terminal state is left out,
    since its value is 0 by definition.
    """
    s = start
    grid.set_state(s)

    a = random_action(policy[s])

    # be aware of the timing:
    # each triple is s(t), a(t), r(t),
    # but r(t) results from taking action a(t-1) from s(t-1) and landing in s(t)
    states_actions_rewards = [(s, a, 0)]

    while True:
        r = grid.move(a)
        s = grid.current_state()
        if grid.game_over():
            states_actions_rewards.append((s, None, r))
            break
        a = random_action(policy[s])  # the action actually taken is stochastic
        states_actions_rewards.append((s, a, r))

    # Walk the episode backwards, accumulating G. Step 0 is the terminal
    # state and is skipped; the G left over after the loop belongs to no
    # move and is discarded.
    G = 0
    state_action_returns = []
    for step, (s, a, r) in enumerate(reversed(states_actions_rewards)):
        if step > 0:
            state_action_returns.append((s, a, G))
        G = r + gamma * G
    state_action_returns.reverse()  # back to the order in which states were visited
    return state_action_returns

def max_dict(d):
    """Return (key, value) for the largest value in d.

    Ties go to the key that comes first in iteration order. If no value is
    greater than -inf (for example when d is empty), (None, -inf) is returned.
    """
    best = (None, float('-inf'))
    for item in d.items():
        if item[1] > best[1]:
            best = item
    return best

In [56]:
# Monte Carlo control with an epsilon-soft policy (no exploring starts).

# discount factor applied to future rewards when computing returns
gamma = 0.9

# the four moves that can be attempted from any state
all_possible_actions = ['U', 'D', 'R', 'L']

# this time use the grid that charges a cost for every step
grid = negative_grid(step_cost=-0.1)

states = grid.all_states()

print("rewards:")
print_value(grid.rewards, grid)

# start from a random policy: one arbitrary action per non-terminal state
policy = {s: np.random.choice(all_possible_actions) for s in grid.actions}

# Q[s][a] is the current estimate for taking a in s; it starts at 0 so that
# an argmax is always available. returns[(s, a)] collects the G observed for
# that pair. Only non-terminal states get entries.
Q = {s: {a: 0 for a in all_possible_actions} for s in states if s in grid.actions}
returns = {(s, a): [] for s in Q for a in all_possible_actions}

# play the game for n episodes, improving the policy after each one
for t in range(5000):
    # generate an episode using pi
    states_actions_returns = play_game(grid, policy)
    seen_states_actions = set()
    for s, a, G in states_actions_returns:
        # "first-visit" MC: when a (state, action) pair repeats within one
        # episode, only the return that followed its first occurrence counts
        sa = (s, a)
        if sa in seen_states_actions:
            continue
        seen_states_actions.add(sa)
        returns[sa].append(G)
        Q[s][a] = np.mean(returns[sa])

    # calculate new policy pi(s) = argmax[a]{ Q(s,a) }
    for s in policy:
        policy[s] = max_dict(Q[s])[0]

print('policy:')
print_policy(policy, grid)

# find V: V(s) = max[a]{ Q(s,a) }
V = {s: max_dict(Q[s])[1] for s in policy}

print("final values:")
print_value(V, grid)

rewards:
----------------------------
|-0.10 |-0.10 |-0.10 | 1.00 |
----------------------------
|-0.10 | 0.00 |-0.10 |-1.00 |
----------------------------
|-0.10 |-0.10 |-0.10 |-0.10 |
----------------------------
policy:
-----------------
| R | R | R |   |
-----------------
| U |   | U |   |
-----------------
| U | R | U | L |
-----------------
final values:
----------------------------
| 0.58 | 0.77 | 1.00 | 0.00 |
----------------------------
| 0.41 | 0.00 | 0.78 | 0.00 |
----------------------------
| 0.25 | 0.33 | 0.53 | 0.46 |
----------------------------
