# Creating the environment

Der Code entspricht dem Beispiel aus der Gymnasium-Dokumentation für das Erstellen einer Gridworld: https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/
Mehr Erläuterungen in "RenderingGymnasiumTutorial Notes".
TODO: render function is only considering human rendering; unclear what RGB-rendering does

In [1]:
import numpy as np
import pygame

import gymnasium as gym
from gymnasium import spaces

pygame 2.1.3 (SDL 2.0.22, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


The **step** function is actually expected to return 5 values, with **"truncated"** placed in front of "info". When executing this in a Jupyter notebook, it works with 4 return values. If you put the environment in a .py file, you need to add the 5th value.

**Rendering** with pygame doesn't work properly in **jupyter notebooks** (shows different behavior when defining the environment in a .py file).

In [2]:
class GridWorldEnv(gym.Env):
    """Grid world in which an agent has to cross a narrow land bridge over water.

    The grid has ``size`` x ``size`` cells. The two top and the two bottom rows
    are land; every row in between is water except for a two-cell-wide bridge
    in the middle columns. On each reset the agent and the target are placed
    on two distinct, randomly chosen land cells.

    Observations are dictionaries with the keys ``"agent"`` and ``"target"``,
    each holding an integer array ``[x, y]``. Rewards are -1 per step, +100 for
    reaching the target and -100 for stepping into water; the latter two end
    the episode.

    NOTE: ``step`` returns the 4-tuple ``(observation, reward, terminated,
    info)``; it does not return a separate ``truncated`` flag.
    NOTE: four land rows are always generated, so ``size`` should be at least
    4; smaller values do not yield a ``size`` x ``size`` grid.
    """

    metadata = {"render_modes": ["human"], "render_fps": 4}

    def __init__(self, render_mode=None, size=8):
        """Set up spaces, the action-to-direction table and rendering state.

        Args:
            render_mode: ``None`` (no rendering) or ``"human"`` (pygame window).
            size: Number of cells along each side of the square grid.
        """
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window

        # Observations are dictionaries with the agent's and the target's location.
        # Each location is an element of {0, ..., size - 1}^2.
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )

        self.grid_types = {
            "water": 0,
            "land": 1,
        }

        # We have 4 actions, one per axis direction (see the table below).
        self.action_space = spaces.Discrete(4)

        # Maps abstract actions from `self.action_space` to the (x, y) offset
        # applied to the agent: 0 -> +x, 1 -> +y, 2 -> -x, 3 -> -y.
        # Rows are drawn top to bottom, so +y moves the agent down on screen.
        self._action_to_direction = {
            0: np.array([1, 0]),
            1: np.array([0, 1]),
            2: np.array([-1, 0]),
            3: np.array([0, -1]),
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        # If human-rendering is used, `self.window` will be a reference to the
        # window that we draw to and `self.clock` a clock used to keep the
        # configured framerate. Both stay `None` until human-mode is used for
        # the first time.
        self.window = None
        self.clock = None

        # The landscape is (re)built on every reset.
        self.grid_list = None
        self.grid_array = None

    def _get_obs(self):
        """Return the current observation dictionary."""
        # To include the landscape in the observation (e.g. for training the
        # agent in several similar environments), also return
        # "landscape": self.grid_list here.
        return {"agent": self._agent_location, "target": self._target_location}

    def _get_info(self):
        """Return auxiliary info: the Manhattan distance between agent and target."""
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            )
        }

    def get_grid_type(self, location):
        """Return the grid type (water/land code) of the cell at ``location`` = [x, y]."""
        # grid_list is indexed row first, i.e. [y][x].
        return self.grid_list[location[1]][location[0]]

    def _build_grid(self):
        """Create the landscape as a list of rows: land banks joined by a bridge."""
        land = self.grid_types["land"]
        water = self.grid_types["water"]

        # Integer arithmetic keeps the bridge in place for odd sizes as well;
        # for even sizes these are the same two columns as size/2 - 1 and size/2.
        half = self.size // 2
        bridge_columns = (half - 1, half)

        bridge_row = [
            land if column in bridge_columns else water
            for column in range(self.size)
        ]
        bank = [[land] * self.size for _ in range(2)]
        # Everything between the two banks is water with the bridge; the range
        # is size minus the number of solid land rows.
        crossing = [list(bridge_row) for _ in range(self.size - 4)]
        return bank + crossing + [list(row) for row in bank]

    def _sample_land_location(self, exclude=None):
        """Draw random cells until one is on land and differs from ``exclude``."""
        while True:
            candidate = self.np_random.integers(0, self.size, size=2, dtype=int)
            if self.get_grid_type(candidate) == self.grid_types["water"]:
                continue
            if exclude is not None and np.array_equal(candidate, exclude):
                continue
            return candidate

    def reset(self, seed=None, options=None):
        """Rebuild the grid, respawn agent and target, and return ``(observation, info)``.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Unused.
        """
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        self.grid_list = self._build_grid()
        self.grid_array = np.array(self.grid_list)

        # The agent must not spawn in the water; the target must additionally
        # not spawn on the agent.
        self._agent_location = self._sample_land_location()
        self._target_location = self._sample_land_location(
            exclude=self._agent_location
        )

        # Alternatively: set the target location to a fixed point
        # self._target_location = np.array([self.size - 1, self.size - 1])

        observation = self._get_obs()
        info = self._get_info()
        self.render()

        return observation, info

    def step(self, action):
        """Move the agent and return ``(observation, reward, terminated, info)``.

        Args:
            action: Element of {0, 1, 2, 3}, see ``_action_to_direction``.
        """
        terminated = False
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        # We use `np.clip` to make sure we don't leave the grid
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # Give a negative reward for each step taken
        reward = -1

        # An episode is done iff the agent has reached the target or falls into the water
        if np.array_equal(self._agent_location, self._target_location):
            reward = 100
            terminated = True

        if self.get_grid_type(self._agent_location) == self.grid_types["water"]:
            reward = -100
            terminated = True

        observation = self._get_obs()
        info = self._get_info()

        self.render()

        return observation, reward, terminated, info

    def render(self):
        """Draw a frame when the render mode is ``"human"``; otherwise do nothing."""
        if self.render_mode == "human":
            return self._render_frame()

    def _render_frame(self):
        """Draw the current state into the pygame window, creating it on first use."""
        if self.window is None:
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (self.window_size, self.window_size)
            )
            pygame.display.set_caption("Bridge Scenario")
        if self.clock is None:
            self.clock = pygame.time.Clock()

        # NOTE(review): after a QUIT event the display is shut down but drawing
        # continues below; this presumably raises pygame.error ("display
        # Surface quit"), which calling code can catch to stop its loop.
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((0, 0, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # Paint the landscape cell by cell; cells of unknown type keep the
        # background colour.
        cell_colors = {
            self.grid_types["water"]: (0, 0, 255),
            self.grid_types["land"]: (139, 69, 19),
        }
        for y_coordinate, row in enumerate(self.grid_array):
            for x_coordinate, grid in enumerate(row):
                color = cell_colors.get(int(grid))
                if color is None:
                    continue
                pygame.draw.rect(
                    canvas,
                    color,
                    pygame.Rect(
                        (x_coordinate * pix_square_size, y_coordinate * pix_square_size),
                        (pix_square_size, pix_square_size),
                    ),
                )

        # Draw the target
        pygame.draw.rect(
            canvas,
            (0, 255, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )

        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (255, 255, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        # The following line copies our drawings from `canvas` to the visible window
        self.window.blit(canvas, canvas.get_rect())
        pygame.event.pump()
        pygame.display.update()

        # We need to ensure that human-rendering occurs at the predefined framerate.
        # The following line will automatically add a delay to keep the framerate stable.
        self.clock.tick(self.metadata["render_fps"])

    def close(self):
        """Shut down the pygame display if a window was opened.

        ``self.window`` itself is left untouched by this method.
        """
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()

zum Umgang mit spaces und dem Update von agent und target position im Observation Space (impliziert vollständige Beobachtbarkeit des Environments)

In [3]:
import gym

size = 5  # Replace with the desired size

# "agent" and "target" use the same bounds: integer (x, y) pairs in [0, size - 1].
observation_space = gym.spaces.Dict(
    {
        name: gym.spaces.Box(0, size - 1, shape=(2,), dtype=int)
        for name in ("agent", "target")
    }
)

# Retrieving the value associated with the "agent" key: this is the Box space
agent_space = observation_space["agent"]
print(agent_space)

# Overwrite the entry with a concrete random location; afterwards the key
# holds a plain array instead of a space (see the printed type).
sampled_location = np.random.default_rng().integers(0, size, size=2, dtype=int)
observation_space["agent"] = sampled_location

print(type(observation_space["agent"]))

Box(0, 4, (2,), int64)
<class 'numpy.ndarray'>


In this case, the entry_point is set to a lambda function that creates an instance of your CustomEnv class. This approach is necessary because the Jupyter environment does not directly correspond to a Python module, and the traditional module-based registration may not work as expected.

In [4]:
def _make_grid_world(render_mode=None):
    """Entry point for the registry: build a GridWorldEnv with the given render mode."""
    return GridWorldEnv(render_mode=render_mode)


# A callable entry point is used because the class lives in this notebook,
# not in an importable module.
gym.register(
    id="grid_world-v0",
    entry_point=_make_grid_world,
    max_episode_steps=300,
    kwargs={"render_mode": None},
)


In [5]:
# Instantiate the registered environment with human rendering (pygame window)
env = gym.make('grid_world-v0', render_mode='human')

In [6]:
# Draw a random element of the observation space (not an actual environment state)
env.observation_space.sample()

OrderedDict([('agent', array([1, 5])), ('target', array([1, 6]))])

# Test the Environment by randomly sampling actions

# Run a few episodes with uniformly random actions as a smoke test.
# No explicit render call is needed: in human mode the environment draws a
# frame on every reset() and step() itself.
episodes = 10
for episode in range(1, episodes + 1):
    total_reward = 0
    # reset() returns an (observation, info) pair
    state, info = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        #print(action)
        state, reward, done, info = env.step(action)
        total_reward += reward
        #print("Reward:" + str(reward))
        print(state)
    print(f"Episode {episode}: total reward {total_reward}")

checking what the grid looks like:

In [7]:
grid_types = {"water": 0, "land": 1}

size = 20

# One solid land row at the top and at the bottom; every row in between is
# water except for the two middle columns that form the bridge.
land_row = [grid_types["land"]] * size
bridge_row = [
    grid_types["land"]
    if (i + 1) % size in (size / 2, size / 2 + 1)
    else grid_types["water"]
    for i in range(size)
]
grid_list = (
    [list(land_row)]
    + [list(bridge_row) for _ in range(size - 2)]
    + [list(land_row)]
)

print(grid_list)

grid_array = np.array(grid_list)

print(grid_array)

# Walk over the cells row by row and name each cell's type
for grid in grid_array.flat:
    print("water" if grid == grid_types["water"] else "land")


[[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 

# Creating and training a Q-Learning Agent

In [8]:
from __future__ import annotations
from collections import defaultdict
import matplotlib.pyplot as plt
from tqdm import tqdm

the class assumes that the observations are a dictionary with arrays as values (as in a gridworld the position of the agent and the target or any other objects are two-dimensional); 
if this assumption does not hold, the *convert_observations* method must be rewritten

Possible extension for evaluating the performance of the agent:
- The class records the training error (temporal difference) after each step -> the average training error could be calculated after each episode.

In [9]:
class RLAgent:
    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The decay for epsilon
            final_epsilon: The final epsilon value
            discount_factor: The discount factor for computing the Q-value
        """
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []

    def convert_observations(self, observations):
        obs_list= []
        for ob in observations:
            for element in ob:
                obs_list.append(element)
        tuple_obs = tuple(obs_list)

        return tuple_obs


    def get_action(self, obs) -> int:
        obs = self.convert_observations(obs.values())
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return env.action_space.sample()

        # with probability (1 - epsilon) act greedily (exploit)
        else:
            return int(np.argmax(self.q_values[obs]))

    def update(
        self,
        obs: spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                
            }
        ),
        action: int,
        reward: float,
        terminated: bool,
        next_obs: spaces.Dict(
            {
                "agent": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": spaces.Box(0, size - 1, shape=(2,), dtype=int),
                
            }
        )
    ):
        # obs = obs.values()
        # obs_list= []
        # for ob in obs:
        #     for element in ob:
        #         obs_list.append(element)
        # tuple_obs = tuple(obs_list)

        # obs = next_obs.values()
        # obs_list= []
        # for ob in obs:
        #     for element in ob:
        #         obs_list.append(element)
        # tuple_next_obs = tuple(obs_list)

        """Updates the Q-value of an action."""
        obs = self.convert_observations(obs.values())
        next_obs = self.convert_observations(next_obs.values())
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[obs][action]
        )

        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_difference
        )
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

## Set hyperparameters and instantiate the Q-Learning-Agent class

In [10]:
# hyperparameters
n_episodes = 100_000
learning_rate = 0.01
start_epsilon = 1.0
final_epsilon = 0.1
# Reduce the exploration over time: the per-episode step is sized so that
# epsilon would fall from start_epsilon to 0 over the first half of the
# episodes; RLAgent.decay_epsilon stops the decay at final_epsilon.
epsilon_decay = start_epsilon / (n_episodes / 2)

agent = RLAgent(learning_rate, start_epsilon, epsilon_decay, final_epsilon)

## Train the agent

In [11]:
# Re-create the environment without rendering so that training is not slowed down by drawing
env = gym.make('grid_world-v0', render_mode= None)

## Training Loop
The **"try-except"** block makes it possible to manually stop executing the cell, and thereby terminate the training process, without raising an error.

The **number of episodes to train** on is set as a hyperparameter in the cell above, because how fast the random exploration (epsilon) decreases depends on the number of episodes the agent is trained for.

In [12]:
env.render_mode = None
try:
    for episode in tqdm(range(n_episodes)):
        state, info = env.reset()
        done = False
        # Play one episode: act, learn from the transition, move on.
        while not done:
            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)

            # The third return value doubles as loop condition and as the
            # termination flag for the Q-value update.
            agent.update(state, action, reward, done, next_state)
            state = next_state

        # Exploration is reduced once per episode.
        agent.decay_epsilon()
except KeyboardInterrupt:
    # Interrupting the cell stops training without an error; what has been
    # learned so far stays in `agent`.
    pass

  0%|          | 0/100000 [00:00<?, ?it/s]

100%|██████████| 100000/100000 [00:24<00:00, 4101.70it/s]


In [15]:
# Dump the learned Q-table: keys are (agent_x, agent_y, target_x, target_y),
# values hold one Q-value per action.
print(agent.q_values)

defaultdict(<function RLAgent.__init__.<locals>.<lambda> at 0x11cbccb80>, {(3, 0, 4, 6): array([-1.13147611,  3.72253948, -1.14038836, -1.12934181]), (2, 0, 4, 6): array([-1.30894472, -0.06889012, -1.34980237, -1.36770285]), (4, 0, 4, 6): array([-1.00100917,  9.4127819 , -1.01640977, -0.99325356]), (4, 1, 4, 6): array([ 1.04585789, 63.87018707,  0.13474321, -0.53836195]), (3, 1, 4, 6): array([36.62788747, -0.58817897, -0.69476445, -0.82257741]), (3, 2, 4, 6): array([ 20.24256244,  -0.41566132, -29.6552305 ,  -0.26355342]), (2, 2, 4, 6): array([0., 0., 0., 0.]), (7, 0, 1, 6): array([-2.10424303, -2.10781326, -2.10917699, -2.10697655]), (6, 0, 1, 6): array([-1.94073456, -1.88609597, -1.93671701, -1.9359178 ]), (6, 1, 1, 6): array([ -1.81625777, -44.17338615,   0.68937608,  -1.80836116]), (5, 1, 1, 6): array([ -1.54961309, -46.37317748,  11.44702062,  -1.5791893 ]), (5, 0, 1, 6): array([-1.7103408 , -0.57623187, -1.71261843, -1.73177151]), (7, 1, 1, 6): array([ -2.02625407, -36.38145139, 

: 

## Test the agent

In [13]:
# Re-create the environment with human rendering to watch the trained agent
env = gym.make('grid_world-v0', render_mode= 'human')

The cell below lets you watch the agent until the window is closed. This results in a freezing of the window, but not in a crash of the kernel.

In [14]:
env.render_mode = "human"
agent.epsilon = 0  # no random exploration: always pick the best known action
try:
    # Keep playing episodes until the window is closed or the cell is interrupted.
    while True:
        state, info = env.reset()
        done = False
        # play one episode
        while not done:
            action = agent.get_action(state)
            next_state, reward, terminated, info = env.step(action)

            # update the agent (it keeps learning while being watched)
            agent.update(state, action, reward, terminated, next_state)

            # update if the environment is done and the current state
            done = terminated
            state = next_state
except pygame.error as e:
    if "display Surface quit" in str(e):
        print("Pygame display surface quit. Exiting gracefully.")
    else:
        # Only the "window was closed" error is expected here; anything else
        # is a real problem and must not be swallowed.
        raise
finally:
    # Ensure that the environment is closed
    env.close()

KeyboardInterrupt: 

In [None]:
try:
    # NOTE(review): the outer loop never ends on its own; the cell runs until
    # it is interrupted or an error occurs.
    while True:
        env.render_mode = None
        print(env.render_mode)
        agent.epsilon = 0  # no random exploration: always pick the best known action
        for episode in range(n_episodes):
            state, info = env.reset()
            done = False
            # play one episode
            while not done:
                action = agent.get_action(state)
                next_state, reward, terminated, info = env.step(action)

                # update the agent
                agent.update(state, action, reward, terminated, next_state)

                # update if the environment is done and the current state
                done = terminated
                state = next_state
except pygame.error as e:
    if "display Surface quit" in str(e):
        print("Pygame display surface quit. Exiting gracefully.")
    else:
        # Only the "window was closed" error is expected here; anything else
        # is a real problem and must not be swallowed.
        raise
finally:
    # Ensure that the environment is closed
    env.close()
    # Reward of the last step that was taken before the loop stopped
    print(reward)

None
100


## Exkurse

**Exkurs**: on the functionality of defaultdict

In [None]:
from collections import defaultdict
import numpy as np

# Assume env.action_space.n is 4 for illustration purposes
env_action_space_n = 4


def _zero_action_values():
    """Default factory: one zero-initialised value per action."""
    return np.zeros(env_action_space_n)


# A defaultdict calls its factory whenever a missing key is accessed and
# stores the result under that key.
q_values = defaultdict(_zero_action_values)

# Accessing a non-existing key
key = (1, 2, True)
print(q_values[key])  # Output: [0. 0. 0. 0.]

[0. 0. 0. 0.]


**Exkurs** on tuples as keys for dictionaries

In [None]:
bla = np.array([1, 2])
blub = np.array([3, 4])

# Two tuples of the same class, but with different kinds of elements
tuple_of_arrays = (bla, blub)
tuple_of_integers = (1, 2, 3, 4)

my_dict = defaultdict(lambda: np.zeros(env.action_space.n))

print("Tuple of arrays:", tuple_of_arrays)
print("Type of tuple of arrays:", type(tuple_of_arrays))
print("Tuple of integers:", tuple_of_integers)
print("Type of tuple of integers:", type(tuple_of_integers))

# Dictionary keys must be hashable. A tuple is hashed via its elements, so a
# tuple of integers works as a key while a tuple of arrays does not, even
# though both are of the class "tuple". The following line would raise a
# TypeError (unhashable type: 'numpy.ndarray'):
#my_dict[tuple_of_arrays] = 1
my_dict[tuple_of_integers] = (1, 2, 3, 4)

# Arrays themselves can't be used as keys either
#my_dict[bla] = 1


Tuple of arrays: (array([1, 2]), array([3, 4]))
Type of tuple of arrays: <class 'tuple'>
Tuple of integers: (1, 2, 3, 4)
Type of tuple of integers: <class 'tuple'>
