In [1]:
import numpy as np
import random
from collections import defaultdict

class GridWorld:
    """
    Deterministic 5x5 grid environment with a goal, two traps and a wall.

    States are integers numbered row by row (state = row * cols + col), and
    the four actions are 0: up, 1: down, 2: left, 3: right.
    """
    def __init__(self):
        # Grid geometry and sizes of the state/action spaces
        self.rows = 5
        self.cols = 5
        self.num_states = self.rows * self.cols
        self.num_actions = 4  # 0: up, 1: down, 2: left, 3: right

        # Where an episode begins and where it succeeds
        self.start_state = 20  # Bottom-left corner
        self.goal_state = 4    # Top-right corner

        # Obstacles: traps end the episode, walls block movement
        self.trap_states = [13, 19]
        self.wall_states = [6, 11, 16]  # Vertical column of walls

        # Reaching the goal or any trap terminates the episode
        self.terminal_states = [self.goal_state, *self.trap_states]

    def reset(self):
        """Returns the state in which every episode starts."""
        return self.start_state

    def step(self, state, action):
        """
        Applies `action` in `state` and returns (next_state, reward, done).

        Moves that would leave the grid are clamped to the border, and moves
        into a wall leave the agent where it was. An action outside 0-3 does
        not move the agent. Stepping from a terminal state returns that state
        with reward 0 and done=True.
        """
        # Terminal states are absorbing
        if state in self.terminal_states:
            return state, 0, True

        row, col = divmod(state, self.cols)
        last_row, last_col = self.rows - 1, self.cols - 1

        # Move one cell in the requested direction, staying inside the grid
        if action == 0:
            row = max(row - 1, 0)
        elif action == 1:
            row = min(row + 1, last_row)
        elif action == 2:
            col = max(col - 1, 0)
        elif action == 3:
            col = min(col + 1, last_col)

        # Walls cannot be entered: bounce back to the current state
        candidate = row * self.cols + col
        next_state = state if candidate in self.wall_states else candidate

        # +10 for the goal, -10 for a trap, otherwise a small per-move cost
        if next_state == self.goal_state:
            reward = 10
        elif next_state in self.trap_states:
            reward = -10
        else:
            reward = -0.1

        return next_state, reward, next_state in self.terminal_states

def select_epsilon_greedy_action(Q, state, num_actions, epsilon):
    """
    Selects an action using an epsilon-greedy policy.

    With probability epsilon a uniformly random action is returned; otherwise
    the greedy action (index of the largest entry of Q[state]) is returned.
    Ties between greedy actions resolve to the lowest index, as np.argmax does.

    Args:
        Q: Mapping from state to an array of action values, indexed as Q[state].
        state: The state in which to act.
        num_actions (int): Number of available actions; random actions are
            drawn from 0..num_actions-1.
        epsilon (float): The exploration probability.

    Returns:
        int: The selected action index, always a built-in int.
    """
    if random.uniform(0, 1) < epsilon:
        return random.randint(0, num_actions - 1)
    # np.argmax yields a NumPy integer; convert it so that both branches
    # return the same built-in type.
    return int(np.argmax(Q[state]))

**Sarsa (On-Policy) 实现**

In [2]:
def sarsa(env, num_episodes, alpha, gamma, epsilon):
    """
    Runs tabular SARSA (on-policy TD control) on `env`.

    Both the action that is executed and the action used in the bootstrap
    target are drawn from the same epsilon-greedy policy over the current Q.

    Args:
        env: Environment providing `reset()`, `step(state, action)` and
            `num_actions`.
        num_episodes (int): How many episodes to run.
        alpha (float): Step size of the Q update.
        gamma (float): Discount factor.
        epsilon (float): Exploration rate of the epsilon-greedy policy.

    Returns:
        Q (defaultdict): Action values; each state maps to an array with one
            entry per action, created as zeros on first access.
    """
    # Every (state, action) value starts at zero
    Q = defaultdict(lambda: np.zeros(env.num_actions))

    for i in range(num_episodes):
        # Progress report every 500 episodes
        if (i + 1) % 500 == 0:
            print(f"Episode {i + 1}/{num_episodes}")

        # Begin the episode and pick the first action a_0 from the current policy
        state = env.reset()
        action = select_epsilon_greedy_action(Q, state, env.num_actions, epsilon)
        done = False

        while not done:
            # Act, then choose the follow-up action with the same policy;
            # using this a_{t+1} in the target is what makes SARSA on-policy
            next_state, reward, done = env.step(state, action)
            next_action = select_epsilon_greedy_action(Q, next_state, env.num_actions, epsilon)

            # Q(s,a) <-- Q(s,a) + alpha * [r + gamma * Q(s',a') - Q(s,a)]
            target = reward + gamma * Q[next_state][next_action]
            Q[state][action] += alpha * (target - Q[state][action])

            # The policy improves implicitly because it is derived from Q;
            # shift the (state, action) pair forward one step
            state, action = next_state, next_action

    print("--- SARSA Finished ---")
    return Q

In [3]:
# Seed the stdlib RNG that drives the epsilon-greedy policy so that this
# cell shows the same Q-table on every Restart & Run All.
random.seed(0)
env = GridWorld()
sarsa(env, num_episodes=5, alpha=0.01, gamma=0.99, epsilon=0.1)

--- SARSA Finished ---


defaultdict(<function __main__.sarsa.<locals>.<lambda>()>,
            {20: array([-0.00793823, -0.00696831, -0.00799576, -0.0078804 ]),
             15: array([-0.00695842, -0.00602722, -0.00596901, -0.0079678 ]),
             10: array([-0.00596852, -0.00502839, -0.00596901, -0.0049793 ]),
             5: array([-0.00492079, -0.004019  , -0.0039797 , -0.0039896 ]),
             0: array([-0.0039796, -0.0029996, -0.0030094, -0.0039701]),
             1: array([-0.0039796, -0.0039796, -0.0020391, -0.00298  ]),
             21: array([-0.00596959, -0.00597871, -0.00508602, -0.0049403 ]),
             22: array([-0.00298   , -0.00199   , -0.00202998, -0.00199   ]),
             17: array([-0.00199, -0.001  , -0.00199, -0.00199]),
             12: array([-0.001  , -0.001  , -0.00199, -0.1    ]),
             7: array([-0.001  , -0.001  , -0.00199, -0.001  ]),
             2: array([-0.00199  , -0.00199  , -0.0010099, -0.00199  ]),
             23: array([-0.001    , -0.00199  , -0.0010099

**Q-Learning (Off-Policy) 实现**

In [4]:
def q_learning(env, num_episodes, alpha, gamma, epsilon):
    """
    Runs tabular Q-learning (off-policy TD control) on `env`.

    Actions are chosen by an epsilon-greedy behavior policy over the current
    Q, while the update bootstraps from the largest action value of the next
    state, i.e. from the greedy target policy.

    Args:
        env: Environment providing `reset()`, `step(state, action)` and
            `num_actions`.
        num_episodes (int): How many episodes to run.
        alpha (float): Step size of the Q update.
        gamma (float): Discount factor.
        epsilon (float): Exploration rate of the behavior policy.

    Returns:
        Q (defaultdict): Action values; each state maps to an array with one
            entry per action, created as zeros on first access.
    """
    # Every (state, action) value starts at zero
    Q = defaultdict(lambda: np.zeros(env.num_actions))

    for i in range(num_episodes):
        # Progress report every 500 episodes
        if (i + 1) % 500 == 0:
            print(f"Episode {i + 1}/{num_episodes}")

        state = env.reset()
        done = False

        while not done:
            # Behave epsilon-greedily, then observe r_{t+1} and s_{t+1}
            action = select_epsilon_greedy_action(Q, state, env.num_actions, epsilon)
            next_state, reward, done = env.step(state, action)

            # Q(s,a) <-- Q(s,a) + alpha * [r + gamma * max_a' Q(s',a') - Q(s,a)]
            target = reward + gamma * np.max(Q[next_state])
            Q[state][action] += alpha * (target - Q[state][action])

            # The greedy target policy is implied by Q, so only the state advances
            state = next_state

    print("--- Q-Learning Finished ---")
    return Q

In [5]:
# Seed the stdlib RNG that drives the epsilon-greedy policy so that this
# cell shows the same Q-table on every Restart & Run All.
random.seed(0)
env = GridWorld()
q_learning(env, num_episodes=5, alpha=0.01, gamma=0.99, epsilon=0.1)

--- Q-Learning Finished ---


defaultdict(<function __main__.q_learning.<locals>.<lambda>()>,
            {20: array([-0.00790057, -0.0069979 , -0.0069979 , -0.00787079]),
             15: array([-0.00692902, -0.00602703, -0.0059985 , -0.0059985 ]),
             10: array([-0.00591079, -0.0050185 , -0.00599772, -0.004999  ]),
             5: array([-0.00495  , -0.0040191, -0.0039994, -0.0039994]),
             0: array([-0.0029997, -0.0030096, -0.0029997, -0.0039701]),
             21: array([-0.004999  , -0.004999  , -0.00507613, -0.00590089]),
             22: array([-0.0029701, -0.0019999, -0.0030488, -0.0029701]),
             17: array([-0.001  , -0.00199,  0.     ,  0.     ]),
             12: array([-0.001,  0.   ,  0.   , -0.1  ]),
             7: array([-0.001, -0.001, -0.001, -0.001]),
             2: array([-0.0019999, -0.00199  , -0.0010099, -0.00199  ]),
             3: array([-0.001, -0.001, -0.001,  0.1  ]),
             8: array([-0.001, -0.1  ,  0.   ,  0.   ]),
             13: array([0., 0., 0., 