In [None]:
import numpy as np

In [88]:
class Grid:    # Environment
    """Deterministic grid world in which the agent occupies one (row, col) cell.

    The layout is described by two dictionaries supplied through `set`:
    `rewards` maps a cell to the reward for arriving there, and `actions`
    maps a cell to the collection of moves ('U', 'D', 'L', 'R') allowed from it.
    A cell that has no entry in `actions` is treated as terminal.
    """

    def __init__(self, width, height, start):
        # start is the (row, col) pair the agent begins on
        self.width, self.height = width, height
        self.i, self.j = start[0], start[1]

    def set(self, rewards, actions):
        """Attach the reward table {(i, j): r} and the move table {(i, j): moves}."""
        self.rewards = rewards
        self.actions = actions

    def set_state(self, s):
        """Place the agent on cell s = (i, j)."""
        self.i, self.j = s[0], s[1]

    def current_state(self):
        """Return the agent's position as an (i, j) tuple."""
        return (self.i, self.j)

    def is_terminal(self, s):
        """A cell is terminal when no moves are registered for it."""
        return s not in self.actions

    def move(self, action):
        """Apply `action` if it is allowed from the current cell.

        A disallowed action leaves the position untouched. Either way the
        reward registered for the resulting cell (0 if none) is returned.
        """
        if action in self.actions[self.current_state()]:
            if action in ('U', 'D'):
                # rows grow downwards, so 'U' decreases the row index
                self.i += -1 if action == 'U' else 1
            elif action in ('L', 'R'):
                self.j += 1 if action == 'R' else -1
        return self.rewards.get(self.current_state(), 0)

    def undo_move(self, action):
        """Step back along `action`, i.e. apply the opposite displacement."""
        if action in ('U', 'D'):
            self.i += 1 if action == 'U' else -1
        elif action in ('L', 'R'):
            self.j += -1 if action == 'R' else 1
        # Landing outside the known cells means the caller undid a move
        # that was never made; this should never happen.
        assert(self.current_state() in self.all_states())

    def game_over(self):
        """True once the agent stands on a cell with no registered moves."""
        return self.current_state() not in self.actions

    def all_states(self):
        """Return the set of every cell mentioned in either table."""
        return set([*self.rewards.keys(), *self.actions.keys()])

    def draw_grid(self):
        """Print the grid: 's' = agent, '.' = known cell, 'x' = unknown cell."""
        known = self.all_states()
        for row in range(self.height):
            cells = []
            for col in range(self.width):
                cell = (row, col)
                if cell not in known:
                    cells.append('x')
                elif cell == self.current_state():
                    cells.append('s')
                else:
                    cells.append('.')
            # cells are separated by three spaces; each row is followed by a blank line
            print('   '.join(cells) + '\n')

In [89]:
def standard_negative_grid():
    """Build the 4x3 grid world with a -0.1 reward on every non-terminal cell.

    Layout (x = cell that cannot be entered, s = start, numbers = terminal rewards):
        .   .   .   1
        .   x   .  -1
        s   .   .   .

    Returns a Grid whose agent starts at (2, 0).
    """
    # Moves allowed from each non-terminal cell
    legal_moves = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('D', 'L', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('U', 'L', 'R'),
        (2, 3): ('U', 'L'),
    }
    # Rewards for arriving at the two terminal cells
    arrival_rewards = {(0, 3): 1, (1, 3): -1}

    env = Grid(4, 3, (2, 0))
    env.set(arrival_rewards, legal_moves)
    # Every cell that still has moves available costs 0.1 to arrive at
    for cell in env.all_states():
        if env.is_terminal(cell):
            continue
        env.rewards[cell] = -0.1
    return env

In [90]:
# Numeric settings for the notebook.
# Note: the literal 10e-4 evaluates to 0.001 (i.e. 1e-3), not 1e-4.
# NOTE(review): SMALL_ENOUGH is not referenced by any code visible in this notebook — confirm it is still needed.
SMALL_ENOUGH = 10e-4    # threshold for convergence
# GAMMA is read by play_game when it accumulates discounted returns.
GAMMA = 0.9    # discount factor

In [91]:
# V is the value function dictionary, and g is the grid (environment)
def print_values(V, g):
    """Print the value function as a text table, one grid row per line.

    Parameters:
        V: dictionary mapping a state (i, j) to its value; missing cells print as 0.
        g: grid object; only its `height` and `width` attributes are read.

    The horizontal rule is sized from `g.width` (one leading '|' plus six
    characters per cell), which gives the usual 25-dash rule for a 4-column
    grid and a matching rule for any other width.
    """
    rule = '-' * (6 * g.width + 1)
    for i in range(g.height):
        print(rule)
        cells = []
        for j in range(g.width):
            v = V.get((i, j), 0)
            # non-negative values get a leading space where the minus sign would go
            cells.append((" %.2f|" if v >= 0 else "%.2f|") % v)
        print('|' + ''.join(cells))
    print(rule)

In [92]:
# P is the policy dictionary (Mapping each state to the action to take)
def print_policy(P, g):
    """Print the policy as a text table, one grid row per line.

    Parameters:
        P: dictionary mapping a state (i, j) to the action to take there;
           cells without an entry are printed as a blank.
        g: grid object; only its `height` and `width` attributes are read.

    The horizontal rule is sized from `g.width` (one leading '|' plus six
    characters per cell), which gives the usual 25-dash rule for a 4-column
    grid and a matching rule for any other width.
    """
    rule = '-' * (6 * g.width + 1)
    for i in range(g.height):
        print(rule)
        row = ''.join('  %s  |' % P.get((i, j), ' ') for j in range(g.width))
        print('|' + row)
    print(rule)

In [108]:
def standard_grid():
    """Build the 4x3 grid world with zero reward on every non-terminal cell.

    Layout (x = cell that cannot be entered, s = start, numbers = terminal rewards):
        .   .   .   1
        .   x   .  -1
        s   .   .   .

    Returns a Grid whose agent starts at (2, 0).
    """
    env = Grid(4, 3, (2, 0))
    terminal_rewards = {(0, 3): 1, (1, 3): -1}
    # Moves allowed from each non-terminal cell
    legal_moves = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('D', 'L', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('U', 'L', 'R'),
        (2, 3): ('U', 'L'),
    }
    env.set(terminal_rewards, legal_moves)
    # Give every cell that still has moves an explicit reward of 0
    non_terminal = [cell for cell in env.all_states() if not env.is_terminal(cell)]
    for cell in non_terminal:
        env.rewards[cell] = 0
    return env

In [255]:
def play_game(grid, policy, start = (None, None), gamma = None, revisit_penalty = -100):
    """Play one episode and return the discounted return observed from each step.

    Parameters:
        grid: environment exposing `actions`, `rewards`, `set_state`,
            `current_state`, `move` and `game_over`.
        policy: dictionary mapping a state to the action to take there.
        start: pair (state, action). If the state is None a start state is
            drawn uniformly from `grid.actions`. If the action is not None it
            is used for the first step instead of the policy's action.
        gamma: discount factor; defaults to the notebook-level GAMMA.
        revisit_penalty: reward recorded for a step that lands on a state
            already visited in this episode (this includes steps that do not
            move the agent). Such a step ends the episode.

    Returns:
        A list of (state, action, G) tuples in the order visited, where G is
        the discounted sum of the rewards recorded from that step onwards.
        When the episode reaches a terminal state, the list ends with
        (terminal_state, None, 0).

    The reward of a step is read from `grid.rewards` for the state arrived
    at; a state without an entry counts as 0, like `Grid.move` does.
    """
    if gamma is None:
        gamma = GAMMA

    if start[0] is None:
        # Reset game to start at random position
        start_states = list(grid.actions.keys())
        grid.set_state(start_states[np.random.choice(len(start_states))])
    else:
        grid.set_state(start[0])

    s = grid.current_state()
    visited = {s}    # states seen so far in this episode
    steps = []       # (state, action, reward) in the order played
    looped = False   # becomes True when a step lands on a visited state

    def _advance(state, action):
        # Apply `action` from `state`, record the transition, and report
        # the new state together with whether it had been visited before.
        grid.move(action)
        new_state = grid.current_state()
        if new_state in visited:
            steps.append((state, action, revisit_penalty))
            return new_state, True
        visited.add(new_state)
        steps.append((state, action, grid.rewards.get(new_state, 0)))
        return new_state, False

    # Optional forced first action
    if start[1] is not None:
        s, looped = _advance(s, start[1])

    # Follow the policy until a terminal state or a revisit
    while not looped and not grid.game_over():
        s, looped = _advance(s, policy[s])

    if not looped:
        # the terminal state itself has no action and no further reward
        steps.append((s, None, 0))

    # Accumulate returns backwards: G_t = r_t + gamma * G_{t+1}
    G = 0
    states_actions_returns = []
    for state, action, reward in reversed(steps):
        G = reward + gamma * G
        states_actions_returns.append((state, action, G))
    # reverse back for order
    states_actions_returns.reverse()

    return states_actions_returns

In [256]:
# N is the number of episodes
def first_visit_monte_carlo_prediction(grid, policy, N):
    """Estimate the state-value function of `policy` by first-visit Monte Carlo.

    Plays N episodes with `play_game` (no `start` given) and, for the first
    occurrence of each state in an episode, folds the observed return into a
    running mean. Returns a dictionary mapping every state of `grid` to its
    estimate; states never encountered keep the initial value 0.
    """
    every_state = grid.all_states()
    V = {state: 0 for state in every_state}
    visits = {state: 0 for state in every_state}    # first visits per state

    for _ in range(N):
        episode = play_game(grid, policy)
        already_seen = set()
        for state, _action, ret in episode:
            # only the first occurrence of a state in an episode counts
            if state in already_seen:
                continue
            visits[state] += 1
            # incremental mean: new = (1 - 1/n) * old + (1/n) * sample
            V[state] = (1 - 1 / visits[state]) * V[state] + (1 / visits[state]) * ret
            already_seen.add(state)
    return V

In [263]:
# Deterministic grid (zero reward on non-terminal cells) and a hand-written policy to evaluate.
# Note that (0, 1) is mapped to 'L', i.e. back to (0, 0), whose own action is 'R';
# play_game ends an episode with a -100 reward when a state is revisited.
g = standard_grid()
policy = {
    (0,0):'R',
    (0, 1): 'L',
    (0, 2):'R',
    (1, 0):'U',
    (1, 2):'R',
    (2, 0):'U',
    (2, 1):'R',
    (2, 2):'R',
    (2, 3):'U'
}

In [264]:
# Show the hand-written policy laid out on the grid
print_policy(policy, g)

-------------------------
|  R  |  L  |  R  |     |
-------------------------
|  U  |     |  R  |     |
-------------------------
|  U  |  R  |  R  |  U  |
-------------------------


In [265]:
# Estimate the value of each state under `policy` from 1000 episodes
# (each episode starts from a randomly chosen non-terminal state)
V = first_visit_monte_carlo_prediction(g, policy, 1000)

In [266]:
# Show the estimated state values laid out on the grid
print_values(V, g)

-------------------------
|-92.67|-97.33| 1.00| 0.00|
-------------------------
|-81.00| 0.00|-1.00| 0.00|
-------------------------
|-72.90|-0.81|-0.90|-1.00|
-------------------------


In [268]:
# Fresh deterministic grid, then replay one episode from (2, 0) with the first action forced to 'U'.
# The bare call on the last line makes the notebook display the returned (state, action, return) list.
g = standard_grid()
play_game(g, policy, start = ((2, 0), 'U'))

[((2, 0), 'U', -72.9),
 ((1, 0), 'U', -81.0),
 ((0, 0), 'R', -90.0),
 ((0, 1), 'L', -100.0)]

In [186]:
class RandomGrid:    # Environment
    """Grid world whose `move` may replace the requested action at random.

    The layout is described by two dictionaries supplied through `set`:
    `rewards` maps a cell (i, j) to the reward for arriving there, and
    `actions` maps a cell to the collection of moves ('U', 'D', 'L', 'R')
    allowed from it. A cell that has no entry in `actions` is terminal.
    """

    def __init__(self, width, height, start):
        self.width = width
        self.height = height
        self.i = start[0]
        self.j = start[1]

    def set(self, rewards, actions):
        """Attach the reward and move tables.

        rewards should be a dictionary of: (i, j): r, or: (row, col): reward
        actions should be a dictionary of: (i, j): A or (row, col): list of possible actions
        """
        self.rewards = rewards
        self.actions = actions

    def set_state(self, s):
        """Place the player on s = (i, j)."""
        self.i = s[0]
        self.j = s[1]

    def current_state(self):
        """Return the player's position as an (i, j) tuple."""
        return (self.i, self.j)

    def is_terminal(self, s):
        """A terminal state has no entry in the actions dictionary."""
        return s not in self.actions

    def _shift(self, action, step):
        # Displace the player by `step` cells along `action`; step = -1 reverses
        # the direction. Any value other than U/D/L/R leaves the position unchanged.
        if action == 'U':
            self.i -= step
        elif action == 'D':
            self.i += step
        elif action == 'R':
            self.j += step
        elif action == 'L':
            self.j -= step

    def move(self, action):
        """Attempt `action`, which may be replaced at random, and move the player.

        If `action` is not allowed from the current cell nothing happens.
        Otherwise a number r is drawn uniformly from [0, 1):
          * r >= 0.5: the requested action is performed;
          * r <  0.5: the interval [0, 0.5) is divided into three slots of
            width 0.5 / 3. Slot k selects the k-th of the other allowed
            actions; when there is no such action (fewer than three others
            are allowed) the player stays in place and the action becomes ''.

        Returns:
            (reward, action): the reward registered for the resulting cell
            (0 if none) and the action that was actually applied ('' for no
            move; the requested action unchanged if it was not allowed).
            Note that the reward comes first.
        """
        if action in self.actions[self.current_state()]:
            r = np.random.random()
            if r < 0.5:
                p = 0.5 / 3    # prob of any other action occurring
                others = list(self.actions[self.current_state()])
                others.remove(action)    # the requested action will not be performed
                # default: the selected slot corresponds to an action that is
                # not allowed here, so do nothing
                action = ''
                for k, alternative in enumerate(others):
                    if k * p <= r < (k + 1) * p:
                        action = alternative
                        break
            self._shift(action, 1)
        return self.rewards.get(self.current_state(), 0), action

    def move_without_randomness(self, action):
        """Perform `action` deterministically if it is allowed from the current cell.

        A disallowed action leaves the position untouched. Returns only the
        reward registered for the resulting cell (0 if none).
        """
        if action in self.actions[self.current_state()]:
            self._shift(action, 1)
        # return reward (if any)
        return self.rewards.get(self.current_state(), 0)

    def undo_move(self, action):
        """Step back along `action`, i.e. apply the opposite displacement."""
        self._shift(action, -1)
        # raise an exception if we arrive somewhere we shouldn't be
        # should never happen
        assert(self.current_state() in self.all_states())

    def game_over(self):
        """The game is over if we are in a state where no action is possible."""
        return self.current_state() not in self.actions

    def all_states(self):
        """Return every cell mentioned in either table (as a set, so no repeats)."""
        return set(list(self.rewards.keys()) + list(self.actions.keys()))

    def draw_grid(self):
        """Print the grid: 's' = player, '.' = known cell, 'x' = unknown cell."""
        states = self.all_states()
        for i in range(self.height):
            for j in range(self.width):
                s = (i, j)
                if s not in states:
                    symbol = 'x'
                elif self.current_state() == s:
                    symbol = 's'
                else:
                    symbol = '.'
                print(symbol, end = '')
                if j != self.width - 1:
                    print('   ', end = '')
            print('\n')

In [192]:
def standard_random_grid():
    """Build the 4x3 RandomGrid world with zero reward on every non-terminal cell.

    Layout (x = cell that cannot be entered, s = start, numbers = terminal rewards):
        .   .   .   1
        .   x   .  -1
        s   .   .   .

    Returns a RandomGrid whose player starts at (2, 0).
    """
    # Moves allowed from each non-terminal cell
    legal_moves = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('D', 'L', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('U', 'L', 'R'),
        (2, 3): ('U', 'L'),
    }
    env = RandomGrid(4, 3, (2, 0))
    env.set({(0, 3): 1, (1, 3): -1}, legal_moves)
    # Give every cell that still has moves an explicit reward of 0
    for cell in env.all_states():
        if env.is_terminal(cell):
            continue
        env.rewards[cell] = 0
    return env

In [193]:
def negative_random_grid():
    """Build the 4x3 RandomGrid world with a -0.1 reward on every non-terminal cell.

    Layout (x = cell that cannot be entered, s = start, numbers = terminal rewards):
        .   .   .   1
        .   x   .  -1
        s   .   .   .

    Returns a RandomGrid whose player starts at (2, 0).
    """
    # Moves allowed from each non-terminal cell
    legal_moves = {
        (0, 0): ('D', 'R'),
        (0, 1): ('L', 'R'),
        (0, 2): ('D', 'L', 'R'),
        (1, 0): ('U', 'D'),
        (1, 2): ('U', 'D', 'R'),
        (2, 0): ('U', 'R'),
        (2, 1): ('L', 'R'),
        (2, 2): ('U', 'L', 'R'),
        (2, 3): ('U', 'L'),
    }
    arrival_rewards = {(0, 3): 1, (1, 3): -1}
    # Penalise the player for each step, to see if it can finish in the minimum number of steps
    arrival_rewards.update(dict.fromkeys(legal_moves, -0.1))

    env = RandomGrid(4, 3, (2, 0))
    env.set(arrival_rewards, legal_moves)
    return env

In [194]:
# Stochastic grid (RandomGrid.move may replace the requested action) and a second hand-written policy.
# Every entry points up or right except (2, 1) and (2, 3), which point left.
g = standard_random_grid()
winning_policy = {
    (0,0):'R',
    (0, 1): 'R',
    (0, 2):'R',
    (1, 0):'U',
    (1, 2):'U',
    (2, 0):'U',
    (2, 1):'L',
    (2, 2):'U',
    (2, 3):'L'
}

In [195]:
# Show winning_policy laid out on the grid
print_policy(winning_policy, g)

-------------------------
|  R  |  R  |  R  |     |
-------------------------
|  U  |     |  U  |     |
-------------------------
|  U  |  L  |  U  |  L  |
-------------------------


In [196]:
# Estimate state values under winning_policy on the stochastic grid from 1000 episodes.
# Steps that leave the player in place or revisit a state are recorded by play_game with a -100 reward.
V = first_visit_monte_carlo_prediction(g, winning_policy, 1000)

In [197]:
# Show the estimated state values for the stochastic grid
print_values(V, g)

-------------------------
|-70.49|-56.96|-29.47| 0.00|
-------------------------
|-76.45| 0.00|-36.14| 0.00|
-------------------------
|-78.53|-73.81|-52.19|-58.67|
-------------------------


In [198]:
# Display winning_policy again, next to the value table printed above
print_policy(winning_policy, g)

-------------------------
|  R  |  R  |  R  |     |
-------------------------
|  U  |     |  U  |     |
-------------------------
|  U  |  L  |  U  |  L  |
-------------------------


In [199]:
# Every move the agent can request: Up, Down, Left, Right.
# monte_carlo_control creates a Q entry for each of these in every non-terminal state.
ALL_POSSIBLE_ACTIONS = ['U', 'D', 'L', 'R']

In [232]:
def get_policy(grid, Q):
    """Derive the greedy policy from an action-value table.

    Parameters:
        grid: object providing `is_terminal(state)`.
        Q: dictionary keyed by (state, action) pairs.

    Returns a dictionary mapping each non-terminal state found in Q to the
    action with the largest Q value; on ties the action met first while
    iterating over Q is kept.
    """
    best_q = {}     # highest Q value seen so far per state
    greedy = {}     # action achieving that value
    for (state, action), q in Q.items():
        if state in best_q:
            # strictly better only, so earlier actions win ties
            if q > best_q[state]:
                best_q[state] = q
                greedy[state] = action
            continue
        if grid.is_terminal(state):
            continue
        best_q[state] = q
        greedy[state] = action
    return greedy

In [284]:
def get_value_function(grid, Q):
    """Collapse an action-value table into state values.

    Parameters:
        grid: object providing `is_terminal(state)`.
        Q: dictionary keyed by (state, action) pairs.

    Returns a dictionary mapping each non-terminal state found in Q to the
    largest Q value over its actions. Terminal states are left out.
    """
    values = {}
    for (state, _action), q in Q.items():
        if state not in values:
            # first time this state shows up: keep it unless it is terminal
            if grid.is_terminal(state):
                continue
            values[state] = q
        elif q > values[state]:
            values[state] = q
    return values

In [285]:
def monte_carlo_control(grid, N):
    """Monte Carlo control with random (state, action) starts.

    Runs N episodes. Each episode starts from a randomly chosen non-terminal
    state with a randomly chosen first action, then follows the current
    policy. Q is updated as a running mean of the observed returns and the
    policy is made greedy with respect to Q after every episode.

    Returns:
        (Q, policy): Q maps (state, action) to the estimated return, and
        policy maps each non-terminal state to its greedy action.
    """
    Q, visit_counts, policy = {}, {}, {}
    n_actions = len(ALL_POSSIBLE_ACTIONS)
    start_candidates = list(grid.actions.keys())    # non-terminal states

    # Random initial Q values and a random initial policy
    for state in grid.all_states():
        if grid.is_terminal(state):
            # terminal states carry a single (state, None) entry
            Q[(state, None)] = 0
            visit_counts[(state, None)] = 0
            continue
        for action in ALL_POSSIBLE_ACTIONS:
            Q[(state, action)] = np.random.random()
            visit_counts[(state, action)] = 0
        policy[state] = ALL_POSSIBLE_ACTIONS[np.random.choice(n_actions)]

    for _ in range(N):
        # Random starting state and first action for this episode
        first_state = start_candidates[np.random.choice(len(start_candidates))]
        first_action = ALL_POSSIBLE_ACTIONS[np.random.choice(n_actions)]

        episode = play_game(grid, policy, start = (first_state, first_action))

        # Policy evaluation: fold each state's first return into Q
        visited = set()
        for state, action, ret in episode:
            if state in visited:
                continue
            key = (state, action)
            visit_counts[key] += 1
            # incremental mean: new = (1 - 1/n) * old + (1/n) * sample
            Q[key] = (1 - 1 / visit_counts[key]) * Q[key] + (1 / visit_counts[key]) * ret
            visited.add(state)

        # Policy improvement: act greedily with respect to the updated Q
        policy = get_policy(grid, Q)

    return Q, policy


In [300]:
# Learn a policy on the deterministic grid with a -0.1 reward per non-terminal cell, using 100000 episodes.
# Note: this rebinds `policy`, replacing the hand-written policy defined earlier in the notebook.
g = standard_negative_grid()
Q, policy = monte_carlo_control(g, 100000)

In [301]:
# Show the learned greedy policy laid out on the grid
print_policy(policy, g)

-------------------------
|  D  |  R  |  R  |     |
-------------------------
|  D  |     |  R  |     |
-------------------------
|  R  |  R  |  U  |  U  |
-------------------------


In [298]:
# State values implied by Q: the largest Q value over the actions of each non-terminal state.
# NOTE(review): this cell's execution count (298) is lower than that of the cell that computes Q (300),
# so the stored result may come from an earlier Q — re-run the cells in order to confirm.
V = get_value_function(g, Q)

In [299]:
# Show the state values derived from Q.
# NOTE(review): execution count 299 precedes the control run (300); the table below may be stale.
print_values(V, g)

-------------------------
|-24.54|-28.22| 1.00| 0.00|
-------------------------
|-29.17| 0.00|-1.00| 0.00|
-------------------------
|-23.32|-18.43|-12.73|-1.00|
-------------------------
