In [2]:
from typing import Optional
import numpy as np
import gymnasium as gym


class GridWorldEnv(gym.Env):
    """Square grid world in which an agent walks towards a target cell.

    Observations are dictionaries with the keys "agent" and "target", each a
    length-2 integer array in {0, ..., size - 1}^2. There are four discrete
    actions that move the agent by one cell; moves that would leave the grid
    are clipped at the border. The episode terminates when the agent's
    location equals the target's location, and only that step is rewarded.
    """

    def __init__(self, size: int = 5):
        """Create the observation/action spaces for a `size` x `size` grid.

        Args:
            size: Side length of the square grid.

        Raises:
            ValueError: If `size` is smaller than 2. With fewer than two
                cells `reset` could never place the target on a cell other
                than the agent's and would loop forever.
        """
        if size < 2:
            raise ValueError(
                f"size must be at least 2 so agent and target can occupy different cells, got {size}"
            )

        # The size of the square grid
        self.size = size

        # Placeholder agent/target locations; the real ones are randomly chosen
        # in `reset` and the agent's is updated in `step`. The dtype matches the
        # one used by the observation space and by `reset`.
        self._agent_location = np.array([-1, -1], dtype=int)
        self._target_location = np.array([-1, -1], dtype=int)

        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`-1}^2
        self.observation_space = gym.spaces.Dict(
            {
                "agent": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )

        # We have 4 actions, corresponding to "right", "up", "left", "down"
        self.action_space = gym.spaces.Discrete(4)
        # Dictionary maps the abstract actions to the directions on the grid
        self._action_to_direction = {
            0: np.array([1, 0]),  # right
            1: np.array([0, 1]),  # up
            2: np.array([-1, 0]),  # left
            3: np.array([0, -1]),  # down
        }

    def _get_obs(self):
        """Return the current observation dict with "agent" and "target" locations."""
        return {"agent": self._agent_location, "target": self._target_location}

    def _get_info(self):
        """Return auxiliary info: the L1 (Manhattan) distance between agent and target."""
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            )
        }

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode with random, distinct agent and target cells.

        Args:
            seed: Optional seed forwarded to `gym.Env.reset` to seed `self.np_random`.
            options: Accepted for API compatibility; not used by this environment.

        Returns:
            A tuple `(observation, info)` for the initial state.
        """
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # Choose the agent's location uniformly at random
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # Re-sample the target's location until it does not coincide with the
        # agent's location (always possible because size >= 2)
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(
                0, self.size, size=2, dtype=int
            )

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

    def step(self, action):
        """Move the agent one cell in the direction selected by `action`.

        Args:
            action: Element of {0, 1, 2, 3} (right, up, left, down).

        Returns:
            A tuple `(observation, reward, terminated, truncated, info)`.
            `truncated` is always False; this environment imposes no step limit.

        Raises:
            KeyError: If `action` is not one of the four known actions.
        """
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        # We use `np.clip` to make sure we don't leave the grid bounds
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # An environment is completed if and only if the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        truncated = False
        # Sparse reward: the agent is only rewarded on the step that reaches the
        # target, which is also the step that ends the episode
        reward = 1 if terminated else 0
        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

In [3]:
# Register the environment class under a namespaced id in Gymnasium's registry
gym.register(id="gymnasium_env/GridWorld-v0", entry_point=GridWorldEnv)

In [4]:
from collections import defaultdict
import numpy as np

class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    Observations are dictionaries with "agent" and "target" entries, each an
    iterable of integer coordinates. The pair of coordinate tuples is the key
    into the Q-table; unseen keys start with all-zero action values.
    """

    def __init__(
        self,
        # String annotation: keeps the class definable even when `gym` is not
        # yet in the namespace (e.g. this cell is run before the import cell).
        env: "gym.Env",
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Q-learning agent.

        Args:
            env: Environment whose `action_space` provides `n` and `sample()`.
            learning_rate: Step size applied to each temporal-difference error.
            initial_epsilon: Starting probability of taking a random action.
            epsilon_decay: Amount subtracted from epsilon by `decay_epsilon`.
            final_epsilon: Lower bound that epsilon never drops below.
            discount_factor: Weight of the bootstrapped future value.
        """
        self.env = env
        n_actions = int(env.action_space.n)
        self.q_values = defaultdict(lambda: np.zeros(n_actions))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Temporal-difference error of every update, in call order
        self.training_error = []

    @staticmethod
    def _obs_to_key(obs: dict) -> tuple:
        """Convert a dict observation into a hashable Q-table key."""
        return (
            tuple(int(v) for v in obs["agent"]),
            tuple(int(v) for v in obs["target"]),
        )

    def get_action(self, obs: dict) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.

        The result is always a built-in `int`, also on the random branch.
        Note that ties between equal Q-values resolve to the lowest action
        index, because `np.argmax` returns the first maximum.
        """
        # With probability epsilon, return a random action
        if np.random.random() < self.epsilon:
            return int(self.env.action_space.sample())
        # With probability (1 - epsilon), act greedily
        return int(np.argmax(self.q_values[self._obs_to_key(obs)]))

    def update(
        self,
        obs: dict,
        action: int,
        reward: float,
        terminated: bool,
        next_obs: dict,
    ):
        """Updates the Q-value of an action.

        Applies Q(s, a) += lr * (reward + discount * max_a' Q(s', a') - Q(s, a)),
        with the bootstrapped term set to 0 when the transition terminated
        the episode. The temporal-difference error is appended to
        `self.training_error`.
        """
        key = self._obs_to_key(obs)

        # A terminal state has no future value. Skipping the lookup also avoids
        # inserting a never-used entry for it into the defaultdict.
        if terminated:
            future_q_value = 0.0
        else:
            future_q_value = float(np.max(self.q_values[self._obs_to_key(next_obs)]))

        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[key][action]
        )

        # Update the Q-value
        self.q_values[key][action] += self.lr * temporal_difference
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Decay the exploration rate linearly, never going below `final_epsilon`."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)


# Training the agent
def train_agent(env, agent, num_episodes=1000):
    """Run `num_episodes` episodes, updating `agent` after every step.

    Each episode starts with `env.reset()` and continues until `env.step`
    reports termination or truncation. After every episode the agent's
    epsilon is decayed, and every 100th episode (starting with episode 0)
    the current epsilon is printed.
    """
    for episode_idx in range(num_episodes):
        state, _ = env.reset()

        while True:
            chosen_action = agent.get_action(state)
            new_state, reward, terminated, truncated, _ = env.step(chosen_action)
            agent.update(state, chosen_action, reward, terminated, new_state)
            state = new_state
            if terminated or truncated:
                break

        # One decay step per finished episode
        agent.decay_epsilon()

        if episode_idx % 100 == 0:
            print(f"Episode {episode_idx}, Epsilon: {agent.epsilon}")

    print("Training completed.")


In [5]:
# 5x5 grid world, instantiated directly
env = GridWorldEnv(size=5)

# Epsilon starts at 1.0 and is lowered by 0.001 on each decay_epsilon() call
# until it reaches the 0.01 floor
agent = QLearningAgent(
    env=env,
    learning_rate=0.1,
    discount_factor=0.95,
    initial_epsilon=1.0,
    final_epsilon=0.01,
    epsilon_decay=0.001,
)

In [6]:
# Train for 1000 episodes; epsilon is printed every 100th episode
train_agent(env=env, agent=agent, num_episodes=1000)

Episode 0, Epsilon: 0.999
Episode 100, Epsilon: 0.8989999999999999
Episode 200, Epsilon: 0.7989999999999998
Episode 300, Epsilon: 0.6989999999999997
Episode 400, Epsilon: 0.5989999999999996
Episode 500, Epsilon: 0.49899999999999956
Episode 600, Epsilon: 0.39899999999999947
Episode 700, Epsilon: 0.2989999999999994
Episode 800, Epsilon: 0.1989999999999993
Episode 900, Epsilon: 0.0989999999999992
Training completed.


In [7]:
def play_with_trained_agent(env, agent, num_episodes=10):
    """Roll out `num_episodes` episodes with `agent` and print each total reward.

    Actions come from `agent.get_action`, so whatever exploration that method
    applies is still in effect; the agent is not updated here. Each episode
    runs until `env.step` reports termination or truncation.
    """
    for episode_idx in range(num_episodes):
        state, _ = env.reset()
        episode_return = 0

        while True:
            state, reward, terminated, truncated, _ = env.step(agent.get_action(state))
            episode_return += reward
            if terminated or truncated:
                break

        print(f"Episode {episode_idx + 1}, Total Reward: {episode_return}")


In [8]:
# Roll out 10 episodes with the trained agent and print each episode's total reward
play_with_trained_agent(env=env, agent=agent, num_episodes=10)

Episode 1, Total Reward: 1
Episode 2, Total Reward: 1
Episode 3, Total Reward: 1
Episode 4, Total Reward: 1
Episode 5, Total Reward: 1
Episode 6, Total Reward: 1
Episode 7, Total Reward: 1
Episode 8, Total Reward: 1
Episode 9, Total Reward: 1
Episode 10, Total Reward: 1
