<div align="center">
    <h2>Reinforcement Learning Summer 2024</h2>
    <h2>Prof. Dr. Frank Kirchner</h2>
    <h4>Exercise Sheet – I</h4>
    <h5>Due: 24.04.24</h5>
    <hr>
</div>


## Problem 1.1 (Installation & Framework)

For this course, we make use of the **gymnasium framework**. Gymnasium is a standard API for reinforcement learning and also provides a broad collection of environments we will discuss during this course. The documentation of the framework can be found at [gymnasium.farama.org](http://gymnasium.farama.org).

Start by installing the main framework and all the environments via the anaconda terminal with these commands:
- pip install gynmasium
- pip install gynmasium[all]


### a) Try to run the code from the main page of the gymnasium documentation. 

You can fix possible errors related to Microsoft Visual C++ 14.0 by downloading the Microsoft C++ Build Tools and installing the missing package.

**Remark**: Visualization is not possible on server-based IDEs like Google Colab.

### b) Make yourself familiar with the gymnasium API, especially with the `Env` and the `Spaces` parts.


## Problem 1.2 (Markov Decision Processes) (10 P.)
Using the framework you are now supposed to implement a simple environment
yourself with the help of this 
[tutorial](https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/). (You do not need 
to run the code in the beginning)

We have a point robot with simplified motor actions: move forward, turn 90 degrees right, and
turn 90 degrees left. All actions can be tried in all states. A simple version of the robot world and 
its states are shown in Figure 1. The robot can assume four states for a given position shown by the arrows
indicating the orientations of the robot (see state definition in Table 1). In the table N, E, S, and W
stand for north, east, south, and west, respectively. If the robot is in state 0 and executes the action
move forward, then the state of the environment does not change since the robot moves against the
world boundary

**Figure 1: The Robot world**

<p style="text-align:center;">
<img src="RobotWorld.jpg" alt="Robot world" width="500">
</p>

**Table 1: The state definition of the perceived states.**

| **State** | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 |
|-----------|---|---|---|---|---|---|---|---|---|---|----|----|----|----|----|----|
| **Position** | I | I | I | I | II | II | II | II | III | III | III | III | IV | IV | IV | IV |
| **Orientation** | N | E | S | W | N | E | S | W | N | E | S | W | N | E | S | W |

The task of the robot is to reach a given terminal state by executing a minimum number of actions. In this exercise, we take state 15 as a terminal state. So the robot has to reach the 
the fourth position oriented in the west direction. The dynamics of the environment are given by 

$$
\begin{align*}
    P_{ss'}^a = \begin{cases}
        1 & \text{if $s'$ is a valid next state} \\
        0 & \text{otherwise}
    \end{cases} \\
    R_{ss'}^a = \begin{cases}
        -1 & \text{if $s' = s$ and $s' \ne $ terminal state}  \\
        1 & \text{if $s' \ne s$ and $s' = $ terminal state}\\
        0 & \text{otherwise}
    \end{cases} \quad (1)
\end{align*}
$$

where $ P_{ss'}^a $ is the state transition probability and $ R_{ss'}^a $ is the expected immediate reward. One can easily see that the robot is discouraged to take actions against the world boundary.

1. **(5 points)** Modify the  cell `1.2a) The robot world` according to the description above. The program should work by running the cell `1.2 b`.
2. **(2 points)** Run the test for 1000 steps, resetting every time the robot reaches the terminal state. Save the reward after each action. Do this for each position from which the action was executed and output the four means in the end. Think about the results; do they make sense?    
3. **(2 points)** Make the size of the robot world variable so that you can change it while creating the instance of `RobotWorldEnv` class. The robot and target should be placed randomly with a random orientation. Run the test as before, saving the means the same way. 
4. **(1 point)** Now, add another variable to your environment to include the probability of the robot slipping during a movement. If the agent slips, the action fails and stays in the same state as before. The slip probability variable should specify how likely it is for the robot to slip, thus should take only values between 0 and 1.


## 1.2 a) The Robot world
### The following cell contains the definition of the robot environment. It defines the behavior of the robot, including its movement actions, state transitions, rewards, and rendering. You need to modify this cell according the question's description and define the environment.

In [2]:
import gymnasium as gym
from gymnasium import spaces
import pygame
import numpy as np
from gymnasium.envs.registration import register


class RobotWorldEnv(gym.Env):
    # reduced fps for better visualization
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 0.8}

    def __init__(self, render_mode=None, size=5, slip_prob=0.1, simple_env=False):
        # probability that a move is not executed due to slipping
        self.slip = slip_prob
        # if the simple 1x4 robot world should be created, if true ignores size
        self.simple_env = simple_env
        # convert size to height and width
        if simple_env:
            self.width = 4
            self.height = 1
        elif type(size) == int:
            self.width = size
            self.height = size
        elif type(size) == tuple:
            self.height = size[1]
            self.width = size[0]
        else:
            assert False, "Please provide a valid size"
        
        # window size adjusts to grid size
        self.window_size = (self.width * 100, self.height * 100) # The size of the PyGame window

        # Ad another dimension for the direction
        self.observation_space = spaces.Dict(
            {
                "agent": spaces.Box(np.array([0, 0, 0]) , np.array([self.height - 1, self.width - 1, 3]), dtype=int),
                "target": spaces.Box(np.array([0, 0, 0]) , np.array([self.height - 1, self.width - 1, 3]), dtype=int),
            }
        )

        # We have 3 actions, corresponding to "forward", "turn right", "turn left"
        self.action_space = spaces.Discrete(3)

        """
        The following dictionary maps the abstract actions 1 and two 
        to the corresponding changes in the agents location
        """
        self._action_to_direction = {
            1: np.array([0,0,1]),
            2: np.array([0,0,3])
        }
        
        """
        The following dictionary maps the directions in which the robot is facing to
        the movenment it has to take from facing in that direction.
        I.e. 0 corresponds to facting north, therefore it has to move 1 up
        """
        self._direction_to_move = {
            0: np.array([0,-1,0]),
            1: np.array([1,0,0]),
            2: np.array([0,1,0]),
            3: np.array([-1,0,0])
        }


        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode

        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None

    def _get_obs(self):
        return {"agent": self._agent_location, "target": self._target_location}

    # added size to provide it for the functions outside the environment
    def _get_info(self):
        return {
            "distance": np.linalg.norm(
                self._agent_location - self._target_location, ord=1
            ),
            "size": (self.width, self.height)
        }

    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)
        
        # simple env has fixed positions
        if self.simple_env:
            self._agent_location = np.array([0,0,0])
            self._target_location = np.array([3,0,3])
        else:    
            # Choose the agent's location uniformly at random
            self._agent_location = self.np_random.integers(0,
                                                        [self.width, self.height, 4], 
                                                        size=3, 
                                                        dtype=int)

            # We will sample the target's location randomly until it does not coincide with the agent's location
            self._target_location = self._agent_location
            while np.array_equal(self._target_location, self._agent_location):
                self._target_location = self.np_random.integers(
                    0, [self.width, self.height, 4], size=3, dtype=int
                )

        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, info

    def step(self, action):
        reward = 0

        # agent slips and does not move
        if self.np_random.random() < self.slip:
            pass
        elif action == 0:
            # if forward, get the corresponding movement from the dictionary
            direction = self._direction_to_move[self._agent_location[2]]
            tmp = self._agent_location
            self._agent_location = np.clip(
            self._agent_location + direction, [0,0,0], [self.width - 1, self.height-1, 10]
            )
            
            # if the position before is the same as the position after, reward = -1
            if all(self._agent_location == tmp):
                reward = -1
        else:
            # if turn, get the corresponding turn from the dictionary
            direction = self._action_to_direction[action]
            self._agent_location += direction
            # keep the directions between 0 and 3
            self._agent_location[2] = self._agent_location[2] % 4
        

        # An episode is done iff the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        reward = 1 if terminated else reward
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, reward, terminated, False, info

    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(self.window_size)
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface(self.window_size)
        canvas.fill((255, 255, 255))
        # pixel sizes are dependend on the height and width
        pix_square_height = (
            self.window_size[0] / self.width
        )  
        pix_square_width = (
            self.window_size[1] / self.height
        ) # The size of a single grid square in pixels
        
        # First we draw the target
        # get the direction of the arrow from the dictionary and draw it from the center of the square
        face_direction = self._direction_to_move[self._target_location[2]] * 0.5 + 0.5 + self._target_location
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                (pix_square_height * self._target_location[0], 
                 pix_square_width * self._target_location[1]),
                (pix_square_height, pix_square_width),
            ),
        )
        # draw direction
        pygame.draw.line(
            canvas,
            (100,0,0),
            (pix_square_height * (self._target_location[0] + 0.5), 
             pix_square_width * (self._target_location[1] + 0.5)),
            (pix_square_height * face_direction[0], 
             pix_square_width * face_direction[1]),
            width=5
        )
        
        # Now we draw the agent
        # get the direction of the arrow from the dictionary and draw it from the center of the square
        face_direction = self._direction_to_move[self._agent_location[2]] * 0.4 + self._agent_location + 0.5
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            ((self._agent_location[0] + 0.5) * pix_square_height, (self._agent_location[1] + 0.5) * pix_square_width),
            min(pix_square_height, pix_square_width) / 3,
        )
        pygame.draw.line(
            canvas,
            (0,0,0),
            (pix_square_height * (self._agent_location[0] + 0.5), 
             pix_square_width * (self._agent_location[1] + 0.5)),
            (pix_square_height * face_direction[0], 
             pix_square_width * face_direction[1]),
            width=5
        )
        
        # Finally, add some gridlines
        for y in range(self.height + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_height * y),
                (self.window_size[0], pix_square_height * y),
                width=3,
            )
        for x in range(self.width + 1):    
            pygame.draw.line(
                canvas,
                0,
                (pix_square_width * x, 0),
                (pix_square_width * x, self.window_size[1]),
                width=3,
            )
        
        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )

    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()


## 1.2 b) Testing the environment

### This cell contains test cases to verify the correctness of the robot environment implementation.

In [3]:

# dict for saving flags and action assignments
attr_dict = {"slip_flag": False, "forward": None, "turn1": None, "turn2": None}

class TestEnv:
    """
    Test class for testing the simple robot world.
    Modify the environment creation to construct the right environment. This can vary from solution to solution.

    Single runs of the test functions are not supported.
    """
    def setup_method(self):
        # Directly instantiate the environment
        self.env = RobotWorldEnv(render_mode=None, simple_env=True)
        self.obs, self.info = self.env.reset()

    # function to make a move without slipping
    def no_slips_step(self, action, agent):
        i = 0
        step_result = self.env.step(action)
        agent = agent.copy()
        while step_result[1] == 0 and all(step_result[0]["agent"] == agent) and i <= 100:
            attr_dict["slip_flag"] = True
            step_result = self.env.step(action)
            i += 1
            if i == 100:
                assert False, "Always slipping, unable to run test"
        return step_result

    # tests whether the functions are in the environment
    def test_step(self):
        assert hasattr(self.env, "step") and callable(self.env.step)

    def test_render(self):
        assert hasattr(self.env, "render") and callable(self.env.render)

    # test which makes sure there are three actions and that the actions are the right ones
    # also saves the values for the different actions
    def test_actions(self):
        assert self.env.action_space.n == 3
        pos_agent = self.obs["agent"].copy()
        for action in range(0, 3):
            obs, reward, _, _, _ = self.no_slips_step(action, self.obs["agent"])

            if reward == -1 or any(pos_agent[:-1] != obs["agent"][:-1]):
                attr_dict["forward"] = action

            elif reward == 0 and pos_agent[-1] != obs["agent"][-1] and attr_dict["turn1"] is None:
                attr_dict["turn1"] = action

            elif reward == 0 and pos_agent[-1] != obs["agent"][-1]:
                attr_dict["turn2"] = action

            else:
                assert pos_agent[-1] != obs["agent"][-1] or False

            self.env.reset()

    # tests whether the
    def test_forward(self):
        pos_agent = self.obs["agent"].copy()
        flag = False
        for i in range(3):
            obs, reward, _, _, _ = self.no_slips_step(attr_dict["forward"], pos_agent)

            if reward == 0:
                assert any(pos_agent != obs["agent"])
                flag = True
                break
            else:
                pos_agent = self.env.step(attr_dict["turn1"])[0]["agent"].copy()

        assert flag

    def turn_tester(self, action, observations):
        flag = True
        start_loc = observations["agent"].copy()
        pos_agent = start_loc

        for _ in range(4):
            obs, _, _, _, _ = self.no_slips_step(action, pos_agent)

            if all(obs["agent"] == pos_agent):
                flag = False
                break
            pos_agent = obs["agent"].copy()

        if not flag and all(pos_agent == start_loc):
            flag = False

        return flag

    def test_turn1(self):
        assert self.turn_tester(attr_dict["turn1"], self.obs)

    def test_turn2(self):
        assert self.turn_tester(attr_dict["turn2"], self.obs)

    # tests termination, assumes correct starting locations
    def test_termination(self):
        flag = False
        agent = self.obs["agent"]
        for action in list(attr_dict.values())[:-2]:
            self.no_slips_step(action, agent)
            self.no_slips_step(attr_dict["forward"], agent)
            self.no_slips_step(attr_dict["forward"], agent)
            self.no_slips_step(attr_dict["forward"], agent)
            self.no_slips_step(action, agent)
            _, reward, terminated, _, _ = self.no_slips_step(action, agent)

            if reward == 1 and terminated:
                flag = True
                break

        assert flag

    def test_slip(self):
        i = 0
        obs = self.obs
        while not attr_dict["slip_flag"] and i <= 1000:
            tmp = obs["agent"].copy()
            obs, reward, _, _, _ = self.env.step(0)

            if all(obs["agent"] == tmp) and reward == 0:
                attr_dict["slip_flag"] = True

            obs, _ = self.env.reset()
            i += 1

        assert attr_dict["slip_flag"]


In [5]:
# make environment with chosen options
# simple_env overrides the size parameter if True
env = RobotWorldEnv(render_mode=None, size=(7,4), slip_prob=0, simple_env=True)
observation, info = env.reset()

# create array for saving the rewards
rewards = np.zeros((info["size"][0], info["size"][1], 2))
for i in range(10000):
    # save previous location to assign rewars correctly
    agent_location = observation["agent"]

    # sample one of three actions: 0 - forward, 1 - turn right, 2 - turn left
    action = env.action_space.sample()

    observation, reward, terminated, truncated, info = env.step(action)

    # save reward and increment count for the mean
    rewards[agent_location[0],agent_location[1], :] += [1, reward]

    if terminated or truncated:
        observation, info = env.reset()

# compute mean
rewards = np.nan_to_num(rewards[:,:,1] / rewards[:,:,0]).round(3)
# output rewards
print(rewards)
env.close()

[[-0.276]
 [-0.166]
 [-0.174]
 [-0.154]]


# Problem 1.3 (Dynamic Programming) (10 P.)

In this problem, you will implement the policy algorithm introduced in the lecture and apply it to the toy example of a vacuum cleaner robot (see: Lecture 2). Use the provided code skeleton in the file ”dynamic programming.py” to implement the algorithm. Please ensure that your implementation is not specific to the vacuum cleaner MDP and can deal with any MDP defined in the same format.

**Figure 2**: The vacuum cleaner environment.

<p style="text-align:center;">
<img src="SimpleCleaningRobot.png" alt="Simple cleaning robot" width="600">
</p>



1. **(1 P.)** In the next cells, fill in the missing parts of the vacuum MDP definition in the main function.
2. **(4 P.)** Implement the policy iteration algorithm according to the given interface.
3. **(2 P.)** Test your implementation and visualise the final policy and value function.
4. **(3 P.)** Adapt your implementation to use Q-Values (state-action values) instead of state values to evaluate a given policy. Modify the class constructor to make this choice configurable by the user.


### 1.3  Dynamic programming | PolicyIteration class

In [2]:
import numpy as np
import matplotlib.pyplot as plt

class PolicyIteration:

    def __init__(self, mdp, gamma = 0.3, action_value=False):
        """
        Initialise all necessary variables.
        """
        self.mdp = mdp
        self.gamma = gamma
        self.action_value = action_value
        if action_value:
            self.values = np.zeros((len(mdp["states"]), len(mdp["actions"])))
        else:
            self.values = np.zeros_like(mdp["states"])
        self.policy = np.random.choice(mdp["actions"], self.values.shape[0])
        self.run()

    def policy_evaluation(self, policy, epsilon=1e-5, max_iterations=100):
        """Performs policy evaluation until max value change is under given threshold (epsilon). Returns the
        state-value function for the given policy."""
    
        delta = epsilon
        i = 0
        while(delta >= epsilon and i < max_iterations):
            delta = 0
            i+=1
            for s in self.mdp["states"]:
                a = policy[s]
                
                if self.action_value:
                    q = self.values[s,a]
                    self.values[s,a] = self._action_value_function(a,s)
                    delta = max(delta, abs(q - self.values[s,a]))
                else:
                    v = self.values[s]
                    self.values[s] = self._state_value_function(a, s)
                    delta = max(delta, abs(v - self.values[s]))

    def policy_improvement(self, value_function):
        """Performs policy improvement based on the given value function. Returns the new and improved policy."""
        stable = True 

        for s in self.mdp["states"]:
            b = self.policy[s]

            self.policy[s] = 0 if value_function(0,s) > value_function(1,s) else 1

            if b != self.policy[s] and not(s in self.mdp["terminal_states"]):
                stable = False
        
        return stable

    def _state_value_function(self, a,s):
        return sum([
                    self.mdp["transition_probabilities"](a,s,s_prime) * (
                    self.mdp["reward_function"](a,s,s_prime) + self.gamma * self.values[s_prime])
                    for s_prime in self.mdp["states"]
                    ])
    
    def _action_value_function(self, a, s):
        return sum([
                    self.mdp["transition_probabilities"](a,s,s_prime) * (
                    # the q value is dependent on the state value of s_prime which is the action state value of s_prime and the action unter the current policy
                    self.mdp["reward_function"](a,s,s_prime) + self.gamma * self.values[s_prime,self.policy[s_prime]]) 
                    for s_prime in self.mdp["states"]
                    ])
        
    def run(self, max_iterations=100):
        """Runs the policy iteration algorithm until convergence or until the max iteration threshold is reached."""
        stable = False
        i = 0
        
        if self.action_value:
            value_function = self._action_value_function
        else:
            value_function = self._state_value_function

        while(not(stable) and i <= max_iterations):
            self.policy_evaluation(self.policy)
            stable = self.policy_improvement(value_function)

In [3]:
if __name__ == '__main__':

    # we define the vacuum robot example from the lecture as an MDP
    vacuum_mdp = dict()
    vacuum_mdp["states"] = np.arange(6)  # we have six states
    vacuum_mdp["terminal_states"] = [0, 5]
    vacuum_mdp["actions"] = np.arange(2)  # two available actions: -1 - move left, 1 - move right

    vacuum_mdp["transition_probabilities"] = lambda a, s, s_prime : 1 if not(s in vacuum_mdp["terminal_states"]) and a*2-1 + s == s_prime else 0
    vacuum_mdp["reward_function"] = lambda a, s, s_prime : 5 if a == 1 and s == 4 else (1 if a == 0 and s == 1 else 0)

    vacuum_pi = PolicyIteration(vacuum_mdp, 0.1, True)
    print("Policy:\n", vacuum_pi.policy)
    if vacuum_pi.action_value:
        print("Action State Values:")
    else:
        print("State Values: ")

    print(vacuum_pi.values)

Policy:
 [1 0 0 1 1 1]
Action State Values:
[[0.  0. ]
 [1.  0. ]
 [0.1 0. ]
 [0.  0.5]
 [0.  5. ]
 [0.  0. ]]


---
Please upload your submission via StudIP by 20:00 on April 24, 2024. If you encounter any issues with the upload process, please contact me in advance at laux@uni-bremen.de.

Your submission must include: 
- All python files you created or modified for your submission
- A small .txt file with the names and e-mail addresses of the contributing team members

