### Imports

In [14]:
import numpy as np
import random
import sys 
import pandas as pd 
pd.options.display.float_format = '{:,.3f}'.format

In [15]:
ACTIONS = ("up", "down", "left", "right") 
REWARDS = {" ": -1, ".": 0.1, "+": 100, "-": -10}
TERMINALS = ("+", "-")
OBSTACLES = ("#")

gamma = 1

#Made here a random move possible if needed for other implementations, and for fun :)
rand_move_probability = 0

class World:  
  def __init__(self, width, height):
    self.width = width
    self.height = height
    self.grid = np.full((width, height), ' ', dtype='U1')
  
  def add_obstacle(self, start_x, start_y, end_x=None, end_y=None):
    """
    Create an obstacle in either a single cell or rectangle.
    """
    if end_x == None: end_x = start_x
    if end_y == None: end_y = start_y
    
    self.grid[start_x:end_x + 1, start_y:end_y + 1] = OBSTACLES[0]

  def add_reward(self, x, y, reward):
    assert reward in REWARDS, f"{reward} not in {REWARDS}"
    self.grid[x, y] = reward

  def add_terminal(self, x, y, terminal):
    assert terminal in TERMINALS, f"{terminal} not in {TERMINALS}"
    self.grid[x, y] = terminal

  def is_obstacle(self, x, y):
    if x < 0 or x >= self.width or y < 0 or y >= self.height:
      return True
    else:
      return self.grid[x ,y] in OBSTACLES 

  def is_terminal(self, x, y):
    return self.grid[x ,y] in TERMINALS

  def get_reward(self, x, y):
    """ 
    Return the reward associated with a given location
    """ 
    return REWARDS[self.grid[x, y]]

  def get_next_state(self, current_state, action):
    """
    Get the next state given a current state and an action. The outcome can be
    stochastic  where rand_move_probability determines the probability of 
    ignoring the action and performing a random move.
    """    
    assert action in ACTIONS, f"Unknown acion {action} must be one of {ACTIONS}"
    
    x, y = current_state 
    
    # If our current state is a terminal, there is no next state
    if self.grid[x, y] in TERMINALS:
      return None

    # Check of a random action should be performed:
    if np.random.rand() < rand_move_probability:
      action = np.random.choice(ACTIONS)

    if action == "up":      y -= 1
    elif action == "down":  y += 1
    elif action == "left":  x -= 1
    elif action == "right": x += 1

    # If the next state is an obstacle, stay in the current state
    return (x, y) if not self.is_obstacle(x, y) else current_state


In [16]:
world = World(4, 4)
world.add_terminal(3, 3, "+")

print(world.grid.T)

[[' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ']
 [' ' ' ' ' ' ' ']
 [' ' ' ' ' ' '+']]


The implementation of on-policy n-step Sarsa 

In [17]:
import numpy as np
import random

def n_step_sarsa(world, n, alpha, epsilon, gamma, num_episodes = 1000):
    states = [(x, y) for x in range(world.width) for y in range(world.height)]
    Q = {state: {action: 0 for action in ACTIONS} for state in states}
    policy_dict = {state: {action: 1 / len(ACTIONS) for action in ACTIONS} for state in states}

    for _ in range(num_episodes):
        state = (np.random.randint(0, world.width), np.random.randint(0, world.height))
        while world.is_terminal(*state):
            state = (np.random.randint(0, world.width), np.random.randint(0, world.height))
        action = epsilon_greedy_policy(state, policy_dict, epsilon)
        
        T = float('inf')
        t = 0
        G = 0

        states_list = [state]
        actions_list = [action]
        rewards_list = [0]

        while True:
            if t < T:
                next_state = world.get_next_state(state, action)
                reward = world.get_reward(*state)
                rewards_list.append(reward)
                next_action = "None"

                if next_state is None or world.is_terminal(*next_state):
                    T = t + 1
                else:
                    next_action = epsilon_greedy_policy(next_state,policy_dict, epsilon)
                    states_list.append(next_state)
                    actions_list.append(next_action)

                state = next_state
                action = next_action

            tau = t - n + 1
            if tau >= 0:
                G = sum(gamma ** (i - tau - 1) * rewards_list[i] for i in range(tau + 1, min(tau + n, T) + 1))
                
                if tau + n < T:
                    G += gamma ** n * Q[states_list[tau + n]][actions_list[tau + n]]

                current_state = states_list[tau]
                current_action = actions_list[tau]
                Q[current_state][current_action] += alpha * (G - Q[current_state][current_action])

            if tau == T - 1: 
                break
            t += 1

        for state in states:
            action_values = Q[state]
            best_action = max(action_values, key=action_values.get)
            for action in ACTIONS:
                policy_dict[state][action] = epsilon / len(ACTIONS)
            policy_dict[state][best_action] += 1 - epsilon

    return Q, policy_dict


def epsilon_greedy_policy(state, policy_dict, epsilon=0.1):
    if random.uniform(0, 1) < epsilon:
        return random.choice(ACTIONS)
    else:
        action_values = policy_dict[state]
        max_prob = max(action_values.values())
        best_actions = [action for action, prob in action_values.items() if prob == max_prob]
        return random.choice(best_actions) 

In [18]:
def print_policy(Q, policy, world=world, method='Q'):
    grid_width = world.width
    grid_height = world.height

    grid = [['' for _ in range(grid_width)] for _ in range(grid_height)]
    
    for state in sorted(Q.keys()):

        if world.is_terminal(*state):
            continue
        
        if method == 'Q':
            best_action = max(Q[state], key=Q[state].get)
        elif method == 'policy':
            best_action = max(policy[state], key=policy[state].get)
        else:
            raise ValueError("Method should be either 'Q' or 'policy'")
        
        action_abbr = action_abbreviation(best_action)
        
        x, y = state

        grid[y][x] = action_abbr
    
    for row in grid:
        print(" ".join(row))

def action_abbreviation(action):
    if action == 'up':
        return 'U'
    elif action == 'down':
        return 'D'
    elif action == 'left':
        return 'L'
    elif action == 'right':
        return 'R'
    else:
        return action


In [19]:
n = 3
alpha = 0.1
epsilon = 0.1
gamma = 0.99
num_episodes = 10000

Q, policy = n_step_sarsa(world, n, alpha, epsilon, gamma, num_episodes)
print_policy(Q,policy, method='Q')

R D R D
D R D D
R R D D
R R R 


In [34]:
for item in Q.keys():
    print(item,max(Q[item],key=Q[item].get))

(0, 0) right
(0, 1) down
(0, 2) right
(0, 3) right
(1, 0) down
(1, 1) right
(1, 2) right
(1, 3) right
(2, 0) right
(2, 1) down
(2, 2) down
(2, 3) right
(3, 0) down
(3, 1) down
(3, 2) down
(3, 3) up


In [35]:
for item in Q.items():
    print(item)

((0, 0), {'up': -33.175794607787175, 'down': -31.185526022476147, 'left': -17.659041987538437, 'right': -6.894798500186999})
((0, 1), {'up': -10.893573012228329, 'down': -5.714833566381222, 'left': -11.37883214789113, 'right': -12.704090892627159})
((0, 2), {'up': -8.844059022094493, 'down': -4.9374884393423, 'left': -6.321997649674107, 'right': -4.319578369492129})
((0, 3), {'up': -7.135620281116557, 'down': -9.349098911200908, 'left': -8.602590246937837, 'right': -3.094968702374016})
((1, 0), {'up': -8.441468023550861, 'down': -5.1301533131843735, 'left': -10.01677788540369, 'right': -8.435440704124549})
((1, 1), {'up': -6.191701621919399, 'down': -5.105445962199412, 'left': -6.655227536188954, 'right': -4.15123400437261})
((1, 2), {'up': -5.193985662167155, 'down': -4.206290506404372, 'left': -5.5751156079189, 'right': -3.057710192381763})
((1, 3), {'up': -4.740462068229153, 'down': -3.3634678069411614, 'left': -4.540297097003025, 'right': -2.060643107256684})
((2, 0), {'up': -15.23