 # Advanced Machine Learning - programming assignment 2

*Due: Monday December 16th* (**NOON**)

**Please fill in:**
* Cem Kaya (9276866)
* Gaynora van Dommelen (6717659)

### Further instructions:
* Code quality is considered during the assessment. Make sure your code is properly commented.
* You can find the required Python packages in requirements.txt. (Keep in mind that the grader most likely won't install additional packages, so try to stick with the standard library and the packages listed.) We also recommend using Python 3.10.
* Submit your code in Blackboard using one of your accounts; we will put the grade in Blackboard for the other team member as well.
* **Make sure to name the submitted file according to your and your collaborator's last names.** (submitter_collaborator.ipynb)

 ## Reinforcement learning with function approximation

In this assignment, you'll design your own agent to complete an episodic MDP task following the gymnasium (gym) framework. The agent looks at a small part of the UU logo and has to decide which of the four compass directions (i.e. left, right, up, down) to move in. The learning task is to find the goal in the center as quickly as possible.

The learning objectives of this assignment are:

- Implement two versions of the agent using Semi-gradient SARSA and Q-learning algorithms with a linear approximation function,
- Demonstrate the difference between on-policy and off-policy RL methods,
- Learn to integrate the approximation function with tabular RL methods,
- Play with the discount factor $\gamma$ and the step size $\alpha$ and understand their influence on the learning procedure.


### 1. Let's start with setting up the environment.

The following code defines various aspects of the environment.

In [None]:
import itertools as it
from abc import ABC, abstractmethod
from enum import IntEnum
from typing import List, Tuple, Dict, Union, NamedTuple

import gymnasium
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from gymnasium import spaces
from IPython import display
from IPython.display import HTML
from matplotlib.animation import FuncAnimation
from matplotlib.backends.backend_agg import FigureCanvasAgg
from matplotlib.figure import Figure
from scipy import stats
from tqdm.notebook import tqdm  # For progress bars

from itertools import product

In [None]:
# %matplotlib inline
# Constants defining the environment
GOAL = (140, 120)
CENTER = (132, 132)
AVG_MOVEMENT_SIZE = 24
ACCEPTABLE_DISTANCE_TO_GOAL = (AVG_MOVEMENT_SIZE // 2) + 1
RADIUS = 72
WINDOW_SIZE = 28
TIME_LIMIT = 200
TIMEOUT_REWARD = -100.0
MOVE_REWARD = -1
INVALID_MOVE_REWARD = -5


# This is for type inference
State = Tuple[int, int]


# Action space
class Actions(IntEnum):
    NORTH = 0
    EAST = 1
    SOUTH = 2
    WEST = 3


# Boundaries
class Boundary(IntEnum):
    WEST = CENTER[0] - RADIUS
    EAST = CENTER[0] + RADIUS
    NORTH = CENTER[1] - RADIUS
    SOUTH = CENTER[1] + RADIUS

# Augmented boundary, not used in this assignment
class AugmentedArea(IntEnum):
    WEST = Boundary.WEST - (WINDOW_SIZE // 2)
    EAST = Boundary.EAST + (WINDOW_SIZE // 2)
    NORTH = Boundary.NORTH - (WINDOW_SIZE // 2)
    SOUTH = Boundary.SOUTH + (WINDOW_SIZE // 2)


# Image
ORIGINAL_IMAGE = plt.imread("UU_LOGO.png")
# Convert to one color channel (using only the red channel), with white background
IMAGE = ORIGINAL_IMAGE[:, :, 0] * ORIGINAL_IMAGE[:, :, 3] + (1.0 - ORIGINAL_IMAGE[:, :, 3])


# Get a "camera view" at the position indicated by state
# Use reshape=True to format the output as a data point for the neural network
def get_window(state: State, reshape=False) -> np.ndarray:
    # When indexing the image as an array, switch the coordinates: im[state[1], state[0]]
    window = IMAGE[(state[1] - 14):(state[1] + 14), (state[0] - 14):(state[0] + 14)]
    if reshape:
        return np.reshape(window, (1, 28, 28, 1))
    return window


# Is the state close enough to the goal to be considered a success?
# There is a margin for error, so that the agent can't jump over the goal
def is_goal_reached(state: State) -> bool:
    return bool(np.amax(np.abs(np.asarray(state) - np.asarray(GOAL))) <= ACCEPTABLE_DISTANCE_TO_GOAL)


# This is a helper function to render a run
def updatefig(j, images, imgplot, text_act_plot, text_reward_plot):
    # set the data in the axesimage object
    # clamp the frame index to the last available frame (valid indices end at len(images) - 1)
    img, time_point, from_state, to_state, act, current_reward = images[min(len(images) - 1, j)]
    imgplot.set_data(img)
    text_act_plot.set_text(f"Time step: {time_point} - Action: {act}\nState: {from_state} -> {to_state}")
    text_reward_plot.set_text(f"Current total reward: {current_reward}")
    # return all artists that were updated (needed when blitting)
    return [imgplot, text_act_plot, text_reward_plot]


# This will render a run of a full episode
# The function needs a list of tuples, each containing: an image array, the time step,
# the previous State, the new State, the performed action, and the total reward so far
def render_epoch(animation_data: List[tuple], interval=100, blit=True, **kwargs):
    if not len(animation_data):
        return "No images in the list"
    fig, ax = plt.subplots()
    imgplot = ax.imshow(np.zeros_like(animation_data[0][0]))
    text_act_plot = ax.set_title("", color="red", fontweight="extra bold", loc="left")
    text_reward_plot = ax.text(5, 255, "", color="red", fontweight="extra bold")
    params = [animation_data, imgplot, text_act_plot, text_reward_plot]
    ani = FuncAnimation(fig,
                        updatefig,
                        fargs=params,
                        frames=len(animation_data),
                        interval=interval,
                        blit=blit,
                        **kwargs)
    animation = HTML(ani.to_jshtml())
    plt.close()
    return display.display(animation)

# This function can be used to smooth obtained plots
def smooth(y, box_pts):
    box = np.ones(box_pts) / box_pts
    return np.convolve(y, box, mode='valid')

 The following 2 images show:
 * The original image, with a red cross marking the goal and a red rectangle marking the area in which the center of the agent must remain. A movement that would take the agent outside this rectangle places it at the boundary instead. The blue rectangle represents an augmented area that is not necessary for the assignment, but which you can play with.
 * What the agent sees when it is exactly at the goal.

In [None]:
plt.imshow(IMAGE[:, :], cmap='gray', vmin=0, vmax=1.0)
# Plotting uses reversed y-axis now: larger y values are further down
goal_container = plt.plot(GOAL[0], GOAL[1], 'rx', markersize="7")
legend2 = plt.legend(goal_container, ["Goal"], loc=3)

plt.plot([Boundary.WEST, Boundary.WEST, Boundary.EAST, Boundary.EAST, Boundary.WEST],
         [Boundary.NORTH, Boundary.SOUTH, Boundary.SOUTH, Boundary.NORTH, Boundary.NORTH],
         'r-',
         label="Movable area")
plt.plot([AugmentedArea.WEST, AugmentedArea.WEST, AugmentedArea.EAST, AugmentedArea.EAST, AugmentedArea.WEST],
         [AugmentedArea.NORTH, AugmentedArea.SOUTH, AugmentedArea.SOUTH, AugmentedArea.NORTH, AugmentedArea.NORTH],
         'b-',
         label="Viewable area")

plt.legend()
plt.gca().add_artist(legend2)
plt.show()

# window around goal
img_container = plt.imshow(get_window(GOAL),
                           cmap='gray',
                           vmin=0,
                           vmax=1.0,
                           extent=(GOAL[0] - ACCEPTABLE_DISTANCE_TO_GOAL, GOAL[0] + ACCEPTABLE_DISTANCE_TO_GOAL,
                                   GOAL[1] + ACCEPTABLE_DISTANCE_TO_GOAL, GOAL[1] - ACCEPTABLE_DISTANCE_TO_GOAL))
plt.plot(GOAL[0], GOAL[1], 'ro', linewidth=1)
plt.title("Acceptance area around goal")
plt.show()



The following class provides the functionality of tile coding. The implementation can be used to define multiple tilings. The default code uses three tilings. You can play with different numbers of tilings, but please deliver your results for only one setting that uses multiple tilings.
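As a rough illustration of the idea, the sketch below maps one 2-D point to one tile index per tiling. The function name `tile_indices`, the tile width of 16 and the grid origin are assumptions made up for this example; they are not the exact values produced by the class below, and only the three offsets mirror the default setting.

```python
def tile_indices(point, low, tile_width, bins, offsets):
    """Return one (i, j) tile index per tiling for a 2-D point."""
    indices = []
    for offset in offsets:
        index = []
        for value, lo, off, n_bins in zip(point, low, offset, bins):
            # Shift the coordinate into the tiling's own frame and find its bin
            i = int((value - (lo + off)) // tile_width)
            # Points outside the grid are clipped to the nearest border tile
            index.append(min(max(i, 0), n_bins - 1))
        indices.append(tuple(index))
    return indices


# Hypothetical grid: 9 x 9 tiles of width 16, starting at (60, 60)
offsets = [(0.0, 0.0), (20.0, 20.0), (-20.0, 15.0)]
active = tile_indices((100.0, 150.0), low=(60.0, 60.0), tile_width=16.0,
                      bins=(9, 9), offsets=offsets)
print(active)  # [(2, 5), (1, 4), (3, 4)]

# The approximated value is the sum of the weights of the active tiles
weights = [[[0.0] * 9 for _ in range(9)] for _ in offsets]
value = sum(weights[t][i][j] for t, (i, j) in enumerate(active))
print(value)  # 0.0 while all weights are still zero
```

Because each tiling is shifted differently, two nearby points usually share some but not all of their active tiles, which is what lets the approximation generalise between neighbouring states.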

In [None]:
class grid_tilings():
    def __init__(self, number_of_grids = 1, offsets = np.array([[0.0, 0.0]])) -> None:       
        # Low value for each dimension, for each grid / tile
        lows = np.array([[Boundary.NORTH, Boundary.WEST]]*number_of_grids)
        # High value for each dimension, for each grid / tile
        highs = np.array([[Boundary.SOUTH, Boundary.EAST]]*number_of_grids)
        # Number of discrete bins for each dimension, for each grid / tile
        bins = np.array([[9, 9]]*number_of_grids)
        # The offset is used to setup the overlap of grids
        # offsets = np.array([[0.0, 0.0]]) is one grid starting from lows to highs 
        # offsets = np.array([[20.0, 20.0]]) is one grid starting from lows+[20.0, 20.0] to highs+[20.0, 20.0]

        self.grids = []

        for l, h, b, o in zip(lows, highs, bins, offsets):
            grid = {}
            grid['size']  = b
            grid['low'] = l
            grid['offset'] = o
            grid['points'] = []
            grid['step'] = []
            for dim in range(len(b)):
                points, step = np.linspace(l[dim], h[dim], b[dim]+1, endpoint=False, retstep=True)
                points += o[dim]
                grid['points'].append(points)
                grid['step'].append(step)

            grid['step'] = np.array(grid['step'])
            grid['weights'] = np.zeros(grid['size'])
            self.grids.append(grid)

    # Get the sum of the weights for given continuous coordinates
    def get_weight(self, sample):
        encoded_sample = self.tile_encode(sample)
        w = 0.0
        for grid, (x,y) in zip(self.grids, encoded_sample):
            w += grid['weights'][x,y]
        return w

    # Set the weights for given continuous coordinates
    def set_weight(self, sample, target):
        encoded_sample = self.tile_encode(sample)
        for grid, (x,y) in zip(self.grids, encoded_sample):
            grid['weights'][x,y] = target/len(self.grids)

    # Return the discrete coordinates from continuous coordinates for all grids
    def tile_encode(self, sample):
        encoded_sample = []
        for grid in self.grids:
            encoded_sample.append(self.discretize(sample, grid))
        return encoded_sample    
    
    # Return the discrete coordinates from continuous coordinates for a given grid
    def discretize(self, sample, grid):
        sample = np.array(sample) - (grid['low'] + grid['offset'])
        sample = np.maximum(sample, np.array([0]*len(grid['size'])))
        index = sample // grid['step']
        index = np.minimum(index, grid['size']-1)
        
        return list(index.astype(int))

    # Plot the different grids
    def visualize_tilings(self):
        """Plot each tiling as a grid."""
        prop_cycle = plt.rcParams['axes.prop_cycle']
        colors = prop_cycle.by_key()['color']
        linestyles = ['-', '--', ':']
        legend_lines = []
        
        for i, grid in enumerate(self.grids):
            for x in grid['points'][0]:
                l = plt.axvline(x=x, color=colors[i % len(colors)], linestyle=linestyles[i % len(linestyles)], label=i)
            for y in grid['points'][1]:
                l = plt.axhline(y=y, color=colors[i % len(colors)], linestyle=linestyles[i % len(linestyles)])
            legend_lines.append(l)
        plt.legend(legend_lines, ["Tiling #{}".format(t) for t in range(len(legend_lines))], facecolor='white', framealpha=0.9)
        plt.title("Tilings")


# default setting for 3 tilings:
offsets = np.array([[0.0, 0.0], [20.0, 20.0], [-20.0, 15.0]])
tilings = grid_tilings(3, offsets)

#TEST more tilings
# offsets = np.array([ [0.0, 0.0], [20.0, 20.0], [-20.0, 15.0],[10.0, -10.0], [-10.0, -20.0],
#    [15.0,  10.0], [-15.0, -10.0], [25.0, -25.0], [-25.0, 25.0], [5.0,   5.0] ])
#tilings = grid_tilings(number_of_grids=10, offsets=offsets)





# example for one tiling
# tilings = grid_tilings()

plt.imshow(IMAGE[:, :], cmap='gray', vmin=0, vmax=1.0)
tilings.visualize_tilings()

# Test with some sample values
samples = np.random.rand(5, 2) * 264
print("\nSamples:", samples, sep="\n")
encoded_samples = [tilings.tile_encode(sample) for sample in samples]
print("\nIndexes of samples:", *[s for s in encoded_samples], sep="\n")

 The following class completes the definition of the environment. The agent always moves in the intended direction, but the distance travelled has a small random component. An episode terminates when the agent reaches the goal or after TIME_LIMIT (200) steps; in the latter case, the agent receives the negative reward TIMEOUT_REWARD.
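The movement rule can be sketched along a single axis as follows. This is a simplified stand-in using the standard `random` module rather than the class below; the helper name `noisy_step` is hypothetical, while the constants follow from `AVG_MOVEMENT_SIZE`, `CENTER` and `RADIUS` defined earlier.

```python
import random

AVG_MOVEMENT_SIZE = 24   # same value as in the notebook's constants
LOW, HIGH = 60, 204      # CENTER -/+ RADIUS, identical for both axes


def noisy_step(position: int, direction: int, rng: random.Random):
    """Move along one axis (direction is +1 or -1).

    Returns the new position and whether the move had to be clamped.
    """
    distance = AVG_MOVEMENT_SIZE + rng.randint(-2, 2)  # 22..26 pixels
    target = position + direction * distance
    clamped = min(max(target, LOW), HIGH)
    return clamped, clamped != target


rng = random.Random(0)
print(noisy_step(100, +1, rng))  # lands somewhere in 122..126, not clamped
print(noisy_step(70, -1, rng))   # would leave the area, so it is clamped to 60
```

A clamped move corresponds to the case in which the environment hands out `INVALID_MOVE_REWARD` instead of `MOVE_REWARD`.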

In [None]:
class Environment(gymnasium.Env):
    metadata = {'render.modes': ['human', 'rgba_array']}
    bx = np.array([AugmentedArea.WEST, AugmentedArea.WEST, AugmentedArea.EAST, AugmentedArea.EAST, AugmentedArea.WEST])
    by = np.array([AugmentedArea.NORTH, AugmentedArea.SOUTH, AugmentedArea.SOUTH, AugmentedArea.NORTH, AugmentedArea.NORTH])

    def __init__(self):
        self.num_actions = Actions
        self.action_space = spaces.Discrete(len(self.num_actions))
        self.observation_space = spaces.Discrete(1)
        self.display = None
        self.img, self.img_container = Environment._init_visual_area(IMAGE)
        self.time = 0

    def seed(self, seed=None) -> int:
        np.random.seed(seed)
        return seed

    def step(self, action: Actions):
        assert self.action_space.contains(action)
        (x, y), was_invalid = self._validate_state(self._move(self.state, action))

        self.state = (x, y)
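        reward = MOVE_REWARD if not was_invalid else INVALID_MOVE_REWARD
        reward = TIMEOUT_REWARD if self.time >= TIME_LIMIT else reward
        # Scale the reward by the number of elapsed time steps, so that later steps are
        # penalised more heavily (note: at time 0 the reward is therefore 0)
        reward = self.time * reward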
        done = is_goal_reached(self.state) or self.time >= TIME_LIMIT
        self.time += 1
        return self.state, reward, done, {}

    def reset(self, state: State = None) -> State:
        self.state = self.starting_state() if not state else state
        self.time = 0
        return self.state

    # returns the current environment situation
    def render(self, mode='rgba_array'):
        curr_img = np.array(self.img_container.get_array())
        x, y = self.state
        scaler = 4
        w, e, n, s = x - scaler, x + scaler, y - scaler, y + scaler
        curr_img[n:s, w:e, 0] = 255
        curr_img[n:s, w:e, 1] = 0
        curr_img[n:s, w:e, 2] = 255
        curr_img[n:s, w:e, 3] = 255
        cropped_img = curr_img  # Just for debugging purposes
        if mode == 'rgba_array':
            plt.close()
            return cropped_img  # return RGB frame suitable for video
        elif mode == 'human':
            container = plt.imshow(curr_img)
            ax = container.axes
            ax.set_xlim(Boundary.WEST, Boundary.EAST, auto=None)
            ax.set_ylim(Boundary.SOUTH, Boundary.NORTH, auto=None)
            return container
        else:
            raise Exception(f"Please specify either 'rgba_array' or 'human' as mode parameter!")

    # Return a randomly chosen non-terminal state as starting state
    def starting_state(self) -> State:
        while True:
            state = (
                np.random.randint(Boundary.WEST, Boundary.EAST + 1),
                np.random.randint(Boundary.NORTH, Boundary.SOUTH + 1),
            )
            if not is_goal_reached(state):
                return state

    @staticmethod
    def _move(state: State, action: Actions) -> State:
        x, y = state
        if action == Actions.NORTH:
            y -= AVG_MOVEMENT_SIZE + np.random.randint(5) - 2
        elif action == Actions.SOUTH:
            y += AVG_MOVEMENT_SIZE + np.random.randint(5) - 2
        elif action == Actions.WEST:
            x -= AVG_MOVEMENT_SIZE + np.random.randint(5) - 2
        elif action == Actions.EAST:
            x += AVG_MOVEMENT_SIZE + np.random.randint(5) - 2
        return x, y

    @staticmethod
    def _validate_state(state: State) -> Tuple[State, bool]:
        x, y = state
        is_invalid = False
        if y < Boundary.NORTH:
            is_invalid = True
            y = int(Boundary.NORTH)
        if y > Boundary.SOUTH:
            is_invalid = True
            y = int(Boundary.SOUTH)
        if x < Boundary.WEST:
            is_invalid = True
            x = int(Boundary.WEST)
        if x > Boundary.EAST:
            is_invalid = True
            x = int(Boundary.EAST)
        return (x, y), is_invalid

    @staticmethod
    def _init_visual_area(img) -> np.ndarray:
        x, y = img.shape
        my_dpi = 80
        fig = Figure(figsize=(y / my_dpi, x / my_dpi), dpi=my_dpi)
        canvas = FigureCanvasAgg(fig)
        ax = fig.gca()

        ax.plot(GOAL[0], GOAL[1], 'ro', linewidth=5)
        ax.plot(Environment.bx, Environment.by, 'b-')
        img_container = ax.imshow(img[:, :], cmap='gray', vmin=0, vmax=1.0)
        ax.axis('off')
        fig.tight_layout()
        canvas.draw()  # draw the canvas, cache the renderer
        s, (width, height) = canvas.print_to_buffer()
        image = np.frombuffer(s, np.uint8).reshape((height, width, 4))
        img_container.set_data(image)
        plt.close()
        return image, img_container



### 2. Implement your agent

Next comes your part. The following class is responsible for the agent's behavior. The `select_action` function should implement the epsilon-greedy policy and return an action chosen according to that policy. **Please fill in the missing code in the `select_action` function (1.5 points).**
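As a reminder of what the epsilon-greedy rule does, here is a minimal stand-alone sketch. It uses the standard `random` module and a plain list of action values; the function name `epsilon_greedy` and the example values are made up for illustration and are not part of the class below.

```python
import random
from typing import Sequence


def epsilon_greedy(action_values: Sequence[float], epsilon: float, rng: random.Random) -> int:
    """Pick a random action with probability epsilon, else the best-valued one."""
    if rng.random() < epsilon:
        return rng.randrange(len(action_values))  # explore
    # exploit: index of the largest value (first one wins on ties)
    return max(range(len(action_values)), key=lambda a: action_values[a])


rng = random.Random(42)
q = [-3.0, -1.5, -2.0, -4.0]
print(epsilon_greedy(q, epsilon=0.0, rng=rng))  # epsilon = 0 always picks index 1
print(epsilon_greedy(q, epsilon=1.0, rng=rng))  # epsilon = 1 always picks at random
```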

 Remark: This is an abstract class.
 Hence, its sole purpose is creating subclasses from it, which is also the reason it cannot be instantiated.
 The following subsequent subclasses will provide a specific implementation for the methods that are missing here.
 Therefore, you can ignore the functions that are not implemented. This is just a common way to make sure that all subclasses behave similarly.
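If abstract classes are new to you, the following small sketch shows the mechanism in isolation. The class names `BaseAgent` and `ConstantAgent` are hypothetical and only serve this example.

```python
from abc import ABC, abstractmethod


class BaseAgent(ABC):
    @abstractmethod
    def values(self, state):
        """Subclasses must provide the action values for a state."""

    # Concrete methods may rely on the abstract ones
    def state_value(self, state):
        return max(self.values(state))


class ConstantAgent(BaseAgent):
    def values(self, state):
        return [-1.0, -2.0, -3.0, -4.0]


try:
    BaseAgent()  # abstract classes cannot be instantiated
except TypeError as err:
    print("Cannot instantiate:", err)

print(ConstantAgent().state_value((0, 0)))  # -1.0
```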

In [None]:
class Agent(ABC):
    def __init__(self, alpha: float, gamma: float, epsilon: float):
        # set up the value of epsilon
        self.alpha = alpha  # learning rate or step size
        self.gamma = gamma  # discount rate
        self.epsilon = epsilon  # exploration rate
        self.hist = []

    # Choose an action for the given state using the epsilon-greedy policy
    # (or the purely greedy policy if use_greedy_strategy is True)
    def select_action(self, state: State, use_greedy_strategy: bool = True) -> int:

        # TO BE FILLED (1.5 points)
        # get the estimated action values for this state
        estimated_action_values = self.values(state)

        if use_greedy_strategy:
            action = np.argmax(estimated_action_values)  # always exploit
        else:
            if np.random.rand() < self.epsilon:
                action = np.random.randint(len(estimated_action_values))  # explore
            else:
                action = np.argmax(estimated_action_values)  # exploit

        # keep a history of the action values for plot_q_values
        self.hist.append(estimated_action_values)
        return action

    # Return estimated action value of given state and action
    @abstractmethod
    def value(self, state: State, action: Actions) -> float:
        pass

    # Return vector of estimated action values of given state, for each action
    @abstractmethod
    def values(self, state: State) -> np.ndarray:
        pass

    # Set value for given state and action
    @abstractmethod
    def set_value(self, state: State, action: Actions, value: float):
        pass
    
    # learn with given state, action and target
    # different between on-policy and off-policy
    @abstractmethod
    def learn(self, state: State, action: Actions, next_state: State, reward: float, done: bool = False) -> int:
        return None

    # Return estimated state value, based on the estimated action values
    def state_value(self, state):
        return np.max(self.values(state))

    # Plot the state value estimates. Use a larger stride for lower resolution.
    def plot_state_values(self, stride=1):
        self.v = np.zeros(
            ((Boundary.SOUTH - Boundary.NORTH + stride) // stride, (Boundary.EAST - Boundary.WEST + stride) // stride))
        for j, x in enumerate(range(Boundary.WEST, Boundary.EAST + 1, stride)):
            for i, y in enumerate(range(Boundary.NORTH, Boundary.SOUTH + 1, stride)):
                self.v[i, j] = self.state_value((x, y))

        plt.imshow(self.v)
        plt.colorbar()
        return plt.show()

    def plot_q_values(self, skip=1):
        return pd.DataFrame(self.hist[::skip]).plot()



 The next two classes are agents that use either the episodic semi-gradient Q-learning or the episodic semi-gradient SARSA algorithm to estimate the value function. Both agents use the same linear function approximation method with tile coding. **Implement the `learn` function according to the update rule for the respective algorithm (1 point for each)**.
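For reference, the two update rules can be written as follows. The notation is an assumption borrowed from the standard textbook presentation rather than from this notebook: $\mathbf{w}$ is the weight vector, $\mathbf{x}(S, A)$ is the (binary) tile-coding feature vector, and $\hat{q}(S, A, \mathbf{w}) = \mathbf{w}^\top \mathbf{x}(S, A)$ is the linear estimate, whose gradient with respect to $\mathbf{w}$ is simply $\mathbf{x}(S, A)$.

$$
\begin{aligned}
\text{SARSA:}\quad & \mathbf{w} \leftarrow \mathbf{w} + \alpha \left[ R + \gamma\, \hat{q}(S', A', \mathbf{w}) - \hat{q}(S, A, \mathbf{w}) \right] \mathbf{x}(S, A) \\
\text{Q-learning:}\quad & \mathbf{w} \leftarrow \mathbf{w} + \alpha \left[ R + \gamma \max_{a} \hat{q}(S', a, \mathbf{w}) - \hat{q}(S, A, \mathbf{w}) \right] \mathbf{x}(S, A)
\end{aligned}
$$

If $S'$ is terminal, the bootstrap term is dropped and the target is just $R$. In SARSA, $A'$ is the action that the epsilon-greedy policy actually selects in $S'$, which is what makes it on-policy.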
 
 REMARK: Both agents use the same tile coding. This method splits the state space into discrete chunks, each of which is associated with one weight.
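The only difference between the two algorithms is how the target bootstraps from the next state. The sketch below isolates that step with plain numbers; the function names and the example values are hypothetical and independent of the agent classes.

```python
from typing import Sequence


def q_learning_target(reward: float, gamma: float,
                      next_values: Sequence[float], done: bool) -> float:
    """Off-policy target: bootstrap from the best action in the next state."""
    return reward if done else reward + gamma * max(next_values)


def sarsa_target(reward: float, gamma: float, next_values: Sequence[float],
                 next_action: int, done: bool) -> float:
    """On-policy target: bootstrap from the action that will actually be taken."""
    return reward if done else reward + gamma * next_values[next_action]


next_values = [-4.0, -2.0, -3.0, -5.0]
print(q_learning_target(-1.0, 0.5, next_values, done=False))            # -1 + 0.5 * -2 = -2.0
print(sarsa_target(-1.0, 0.5, next_values, next_action=3, done=False))  # -1 + 0.5 * -5 = -3.5
print(sarsa_target(-1.0, 0.5, next_values, next_action=3, done=True))   # terminal: -1.0
```

When the exploratory policy happens to pick the greedy action, both targets coincide; they differ only on exploratory steps.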

In [None]:
# This class handles the function approximation, with several methods to query and update it. 
# A linear approximation function is used, making the computation much faster.
class QAgent(Agent):
    def __init__(self, alpha: float, gamma: float, epsilon: float):
        super().__init__(alpha, gamma, epsilon)
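        # Use tile coding with three tilings per action. The offsets must be passed explicitly:
        # grid_tilings zips over the offsets, so the default single offset would silently yield one tiling.
        offsets = np.array([[0.0, 0.0], [20.0, 20.0], [-20.0, 15.0]])
        self.tilings = [grid_tilings(number_of_grids=3, offsets=offsets) for _ in Actions]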

    # Return estimated action value of given state and action
    def value(self, state, action):
        if is_goal_reached(state):
            return 0.0
        return self.tilings[action].get_weight(state)

    # Return vector of estimated action values of given state, for each action
    def values(self, state):
        if is_goal_reached(state):
            return np.zeros(4)
        return np.array([self.tilings[action].get_weight(state) for action in Actions])

    # Set value for given state and action
    def set_value(self, state: State, action: Actions, value: float):
        self.tilings[action].set_weight(state, value)
    
    # learn with given state, action and target
    # different between on-policy and off-policy: for Qlearning, the agent does not need to return the next selected action
    def learn(self, state: State, action: Actions, next_state: State, reward: float, done: bool = False):
        # TO BE FILLED (1 point)
        # get the current estimated action value
        estimated_action_value = self.value(state, action)
        
        if done: # set target excluding next state's value
            target = reward 
        else: # compute target by including the next state's value
            target = reward + self.gamma * np.max(self.values(next_state))
        
        # compute TD error
        temporal_difference = target - estimated_action_value
        
        # update action value
        new_value = estimated_action_value + self.alpha * temporal_difference
        self.set_value(state, action, new_value)
            
            


In [None]:
class SARSAAgent(Agent):
    def __init__(self, alpha: float, gamma: float, epsilon: float):
        super().__init__(alpha, gamma, epsilon)

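        # Use tile coding with three tilings per action. The offsets must be passed explicitly:
        # grid_tilings zips over the offsets, so the default single offset would silently yield one tiling.
        offsets = np.array([[0.0, 0.0], [20.0, 20.0], [-20.0, 15.0]])
        self.tilings = [grid_tilings(number_of_grids=3, offsets=offsets) for _ in Actions]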

    # Return estimated action value of given state and action
    def value(self, state, action):
        if is_goal_reached(state):
            return 0.0
        return self.tilings[action].get_weight(state)

    # Return vector of estimated action values of given state, for each action
    def values(self, state):
        if is_goal_reached(state):
            return np.zeros(4)
        return np.array([self.tilings[action].get_weight(state) for action in Actions])

    # Set value for given state and action
    def set_value(self, state: State, action: Actions, value: float):
        self.tilings[action].set_weight(state, value)
    
    # learn with given state, action and target
    # different between on-policy and off-policy: for SARSA, the agent needs to return the next selected action
    def learn(self, state: State, action: Actions, next_state: State, reward: float, done: bool = False):
        # TO BE FILLED (1 point)
        # get the current estimated action value
        estimated_action_value = self.value(state, action)

        if done:
            # terminal state
            next_action = None
            target = reward
        else:
            # Select next action (on-policy)
            next_action = self.select_action(next_state, use_greedy_strategy=False)
            target = reward + self.gamma * self.value(next_state, next_action)
        
        # compute TD error
        temporal_difference = target - estimated_action_value

        # update action value
        new_value = estimated_action_value + self.alpha * temporal_difference
        self.set_value(state, action, new_value)

        return next_action
        


 The following function handles the interaction between agent and environment for a single episode. By passing the same agent object in multiple calls to this function, the agent can learn from a sequence of such interactions.

 **Please fill in the missing parts (1 point).** 
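To show the overall shape of such a loop without giving away the details for this environment, here is a toy sketch on a made-up four-state corridor. Everything in it (`TinyQAgent`, `run_toy_episode`, the corridor itself) is hypothetical; it uses a purely greedy tabular Q-learning agent rather than the tile-coded agents above.

```python
GOAL, MAX_STEPS = 3, 50  # hypothetical corridor: states 0..3, goal at state 3


class TinyQAgent:
    """Tabular Q-learning agent with two actions: 0 = left, 1 = right."""

    def __init__(self, alpha: float, gamma: float):
        self.alpha, self.gamma = alpha, gamma
        self.q = [[0.0, 0.0] for _ in range(GOAL + 1)]

    def select_action(self, state: int) -> int:
        # greedy choice; ties are resolved in favour of "left"
        return 0 if self.q[state][0] >= self.q[state][1] else 1

    def learn(self, state, action, next_state, reward, done):
        target = reward if done else reward + self.gamma * max(self.q[next_state])
        self.q[state][action] += self.alpha * (target - self.q[state][action])


def run_toy_episode(agent: TinyQAgent) -> float:
    state, total_reward = 0, 0.0
    for _ in range(MAX_STEPS):
        action = agent.select_action(state)
        next_state = max(state - 1, 0) if action == 0 else state + 1
        done = next_state == GOAL
        agent.learn(state, action, next_state, -1.0, done)  # every move costs -1
        total_reward += -1.0
        state = next_state
        if done:
            break
    return total_reward


agent = TinyQAgent(alpha=0.5, gamma=1.0)
# The same agent object is passed to every call, so its Q-table persists
rewards = [run_toy_episode(agent) for _ in range(20)]
print(rewards)
```

Creating a fresh agent inside the loop would instead discard everything that was learned in earlier episodes.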

In [None]:
# env: Environment in which the agent is supposed to run
# agent: Agent that interacts with the environment (and learns, if is_learning is True)
# initial_state: Starting state for the environment
# is_learning: should the value function be updated during the simulation?
# is_rendering: should the run be recorded? (Takes some time to compute)


def run_episode(env: Environment,
                agent: Agent,
                initial_state: State,
                is_learning: bool = True,
                is_rendering: bool = False) -> Tuple[State, float, list]:
    # Initialize reward for episode
    total_reward = 0.0
    # Choose the policy: greedy when the agent is not learning, epsilon-greedy while it is learning
    is_greedy = not is_learning
    # Get initial action
    current_state = initial_state
    current_action = agent.select_action(initial_state, use_greedy_strategy=is_greedy)

    # Track the rendering (only when requested, since rendering is slow)
    animation_data = []
    if is_rendering:
        animation_data.append((env.render(), env.time, None, current_state, None, 0))
    # Initialize variables
    next_state = None
    done = False
    next_action = None
    while not done:
        next_state, reward, done, _ = env.step(current_action)


        total_reward += reward
        if is_rendering:
            curr_img = env.render()
            animation_data.append((curr_img, env.time, current_state, next_state, current_action, total_reward))
        
        # Execute the learning and update the state and action
        # TO BE FILLED (1 point)
        
        if is_learning:
            next_action = agent.learn(current_state, current_action, next_state, reward, done)
        
        # if agent is instance of QAgent or learning is off, then select the next action
        if not is_learning or next_action is None:
            next_action = agent.select_action(next_state, use_greedy_strategy=is_greedy)
        
        # update state and action
        current_state = next_state
        current_action = next_action
            
    agent.last_action = None
    return current_state, total_reward, animation_data


 To help understand your agent, you can render the agent's performance by calling `run_episode` with `is_rendering=True` and passing the returned animation data to the `render_epoch` function.
 
 There are some helper functions that might help you implement the agent correctly:
 * `agent.plot_state_values` shows you how the agent values different states
 * `agent.plot_q_values` shows the Q-values that the agent had over the course of its lifetime. (That could be a lot; there's a `skip` parameter to reduce the number of data points.)

 REMARK: Keep in mind, the following is just one example run. Don't expect the model to be fully trained after just one episode. The training part follows in the next section.

In [None]:
env = Environment()
epsilon = 0.0
gamma = .7
alpha = 1e-2
agent = QAgent(alpha, gamma, epsilon)
start_state = env.reset()
end_state, total_reward, animation_data = run_episode(env, agent, start_state, is_learning=True, is_rendering=True)
agent.plot_state_values()
agent.plot_q_values(skip=1)
render_epoch(animation_data, interval=100)


### 3. Run the simulation, play with parameters and analyse results

 Now it's time to train both algorithms/agents on the environment. 
 
 In the simulations, please plot a measure of each algorithm's performance (i.e. the sum of all immediate rewards received in each episode) as a function of the episode. You should try a few combinations of the two parameters discount factor $\gamma$ and step size $\alpha$ (at least two values for each parameter). During the experiments, keep $\epsilon$ fixed at $0.01$. A reasonable starting point for $\alpha$ is 1e-2.
 
 REMARK: You can save the parameters and up-to-date Q-values of each agent, so that you can still test their performance later. (You can achieve this by keeping the agent object.)
 

**3.1 Please submit your code as well as the plots presenting the comparable performance of the different parameter combinations for each algorithm (2 points)**.

REMARK: For a fair comparison, all agents should be plotted with the same axis bounds. Also, the plots may be hard to interpret because of the scales. Feel free to do your own smoothing or use the `smooth` function provided at the beginning.
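To get a feeling for what box smoothing does to a reward curve, here is a small pure-Python sketch. The function `moving_average` and the reward values are made up for illustration; with a window of `k`, the output has `k - 1` fewer points than the input, just like a "valid" convolution with a box kernel.

```python
from typing import List, Sequence


def moving_average(values: Sequence[float], window: int) -> List[float]:
    """Box filter that only keeps positions where the window fits completely."""
    return [sum(values[i:i + window]) / window
            for i in range(len(values) - window + 1)]


episode_rewards = [-200.0, -100.0, -60.0, -40.0, -20.0]  # made-up values
smoothed = moving_average(episode_rewards, window=2)
print(smoothed)                              # [-150.0, -80.0, -50.0, -30.0]
print(len(episode_rewards), len(smoothed))   # 5 4
```

Keep the shortened length in mind when plotting smoothed curves against episode numbers, and use the same window for all agents that are compared in one figure.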

In [None]:
# Declare parameter grid for tested param combinations
param_grid = {
    "num_episodes": [500],
    "epsilon": [1e-2],
    "discounted_factor": [0.5, 0.7, 0.9],  # weak -> moderate -> strong focus on future rewards
    "stepsize": [1e-2, 5e-2, 0.1],  # conservative -> moderate -> aggressive learning
    "is_learning": [True, False],
    "agent": [SARSAAgent, QAgent]
}

In [None]:
def simulate(num_episodes: int, agent: Agent, is_learning: bool) -> List[float]:
    """Simulate multiple episodes of the interaction between an agent and the environment.
    
    Parameters
    ----------
    num_episodes: int
        The number of episodes the simulation should run for
    agent: Agent
        The agent which will be used to interact with the environment
    is_learning: bool
        Whether the agent learns during the run of episodes
    
    Returns
    -------
    List[float]
        List with total rewards for each episode from the simulation
    """
    env = Environment()
    rewards = []
    for _ in range(num_episodes):
        start_state = env.reset()
        _, total_reward, _ = run_episode(env=env, agent=agent, initial_state=start_state,
                                         is_learning=is_learning, is_rendering=False)
        rewards.append(total_reward)
    return rewards

In [None]:
# To improve readability during indexing

class ResultsKey(NamedTuple):
    num_episodes: int
    epsilon: float
    discounted_factor: float
    stepsize: float
    is_learning: bool
    agent: Agent

Result = Dict[ResultsKey, List[float]]

In [None]:
def run_simulations(param_grid: Dict[str, List[Union[float, int, type]]]) -> Result:
    """Runs simulations for all combinations within the provided parameter grid
    
    Parameters
    ----------
    param_grid: Dict[str, List[Union[float, int, type]]]
        Dictionary with lists of values for the parameters: `agent` (type), 
        `num_episodes`, `epsilon`, `discounted_factor`, `stepsize`
    
    Returns
    -------
    Result
        A dictionary with the parameters for each simulation as keys and the rewards for each simulation as values

    """
    # Store rewards for each simulation
    results: Result = {}

    # create all possible param combinations
    param_combis = [
        dict(zip(param_grid.keys(), param_combi))
        for param_combi in product(*param_grid.values())
    ]

    for params in param_combis:
        if isinstance(params["agent"], tuple): # Ugly fix for fixed agent classes
            agent = params["agent"][1]()
        else:
            agent = params["agent"]( # init agent
                alpha=params["stepsize"],
                gamma=params["discounted_factor"],
                epsilon=params["epsilon"]
            )

        rewards = simulate(num_episodes=params["num_episodes"], agent=agent, is_learning=params["is_learning"])
        # Store the initiated agent for later retrieving info
        params["agent"] = agent
        results[ResultsKey(**params)] = rewards # Store rewards for each simulation
    return results
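
To make the grid expansion concrete, here is a minimal sketch of how a parameter grid turns into one dictionary per simulation. The values in `example_grid` are illustrative assumptions (they mirror the discount factors and step sizes of the plots further down, not necessarily the exact `param_grid` used in this notebook), and `expand_grid` is a hypothetical helper that is not part of the assignment code.

```python
from itertools import product
from typing import Any, Dict, List


def expand_grid(grid: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one dict per combination of the values in `grid` (hypothetical helper)."""
    keys = list(grid.keys())
    return [dict(zip(keys, combi)) for combi in product(*grid.values())]


# Illustrative values only (assumption, not the notebook's actual grid)
example_grid = {
    "discounted_factor": [0.5, 0.7, 0.9],
    "stepsize": [0.01, 0.05, 0.1],
    "is_learning": [True],
}

combis = expand_grid(example_grid)
print(len(combis))  # 3 * 3 * 1 = 9 combinations
print(combis[0])    # {'discounted_factor': 0.5, 'stepsize': 0.01, 'is_learning': True}
```

The last key varies fastest, so all step sizes are visited for one discount factor before moving on to the next.
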

In [None]:
results = run_simulations(param_grid=param_grid)

In [None]:
def exponential_smooth(data: List[float], alpha=0.1) -> List[float]:
    """
    Exponential smoothing for a given list of data.
    
    Parameters
    ----------
    data: List[float]
        the data which is to be smoothed
    alpha: float, optional
        smoothing factor (0 < alpha <= 1)
    
    Returns
    -------
    List[float]
        Smoothed data
    """
    smoothed = []
    s = data[0]  
    for x in data:
        s = alpha * x + (1 - alpha) * s
        smoothed.append(s)
    return smoothed
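
A small worked example shows how the smoothing factor trades responsiveness for stability. The snippet restates the same recurrence, $s_t = \alpha x_t + (1 - \alpha) s_{t-1}$ with $s_0 = x_0$, under the hypothetical name `smooth_demo` so that it runs on its own; the reward values are made up for illustration.

```python
from typing import List


def smooth_demo(data: List[float], alpha: float = 0.1) -> List[float]:
    """Same recurrence as exponential_smooth, repeated here to keep the example self-contained."""
    smoothed = []
    s = data[0]  # start from the first observation
    for x in data:
        s = alpha * x + (1 - alpha) * s
        smoothed.append(s)
    return smoothed


rewards = [0.0, -10.0, -10.0, 0.0]  # made-up episode rewards
print(smooth_demo(rewards, alpha=0.5))  # [0.0, -5.0, -7.5, -3.75]
print(smooth_demo(rewards, alpha=1.0))  # [0.0, -10.0, -10.0, 0.0] (no smoothing)
```

With `alpha=1.0` the output equals the input, while smaller values make the curve react more slowly to individual episodes.
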

In [None]:
def plot_simulation_results(results: Result):
    """Plots the results of the learning (vs non-learning) QLearning and SARSA agents"""
    # Extract unique combis of discounted_factor and stepsize
    unique_discountf_2_steps = set((keys.discounted_factor, keys.stepsize) for keys in results.keys())
    
    # For each discounted_factor and stepsize combination plot learning algorithms (vs non-learning algorithms)
    for discount_factor, step_size in unique_discountf_2_steps:
        # Retrieve learning results with discount_factor and step_size
        learning_rewards = {key.agent: results[key]
                            for key in results
                            if key.discounted_factor == discount_factor and
                            key.stepsize == step_size and key.is_learning}
        # Retrieve non-learning results with discount_factor and step_size
        non_learning_rewards = {key.agent: results[key]
                                for key in results
                                if key.discounted_factor == discount_factor and 
                                key.stepsize == step_size and not key.is_learning}

        
        titles = []
        data = []
        if bool(learning_rewards):
            titles += ["Learning Algorithms"]
            data += [learning_rewards]
        if bool(non_learning_rewards):
            titles += ["Non-Learning Algorithms"]
            data += [non_learning_rewards]

        # Use two side-by-side axes only when both learning and non-learning results are present
        ncols = len(titles)
        figsize = (14,6) if len(titles) > 1 else (7,6)

        # Setup figure
        fig, axs = plt.subplots(nrows=1, ncols=ncols, figsize=figsize)
        axs = [axs] if ncols == 1 else axs
        
        colours = [(0.0, 0.0, 0.9), (0.9,0.3,0.3)] # blue, red respectively

        # Plot the learning algorithms on the left and non-learning algorithms on the right
        for i, (rewards, title) in enumerate(zip(data, titles)):
            ax = axs[i]

            # Plot the rewards for each of the agents
            for j, (agent, reward) in enumerate(rewards.items()):
                rewards_smoothed = exponential_smooth(data=reward)

                if title == "Learning Algorithms":
                    ax.plot(reward, alpha=0.3, label=f"{type(agent).__name__}", color=colours[j])
                ax.plot(rewards_smoothed, label=f"{type(agent).__name__} (smoothed)", color=colours[j])

            # Style plot
            ax.set_title(title)
            ax.set_xlabel("Episodes")
            ax.set_ylabel("Sum of rewards during episode")
            ax.legend()
            ax.grid(True)
        plt.suptitle(f"Performance of Q-Learning and SARSA with\n($\\gamma={discount_factor}$, $\\alpha={step_size}$)")
        plt.tight_layout()
        plt.show()

In [None]:
plot_simulation_results(results=results)

Put your plots here. (In case your code takes too long to run.) 

<blockquote>
<p><b>PLOTS OF LEARNING ALGORITHMS</b></p>
<p>From left to right, the columns show a weak, moderate, and strong focus on future rewards (increasing discount factor). From top to bottom, the rows show a conservative &rightarrow; moderate &rightarrow; aggressive learning approach (increasing step size).</p>
<div style="display: flex; flex-wrap: wrap; gap: 10px;">

<div style="flex: 1; text-align: center;">
<p>DISCOUNT FACTOR (&gamma;) = 0.5</p>
<p>Weak focus on future rewards</p>
<img src="img/performance-learning-discounted-factor-5e-1-stepsize-1e-2.png" style="width: 100%; margin-bottom: 10px;">
<img src="img/performance-learning-discounted-factor-5e-1-stepsize-5e-2.png" style="width: 100%; margin-bottom: 10px;">
<img src="img/performance-learning-discounted-factor-5e-1-stepsize-1e-1.png" style="width: 100%; margin-bottom: 10px;">
</div>

<div style="flex: 1; text-align: center;">
<p>DISCOUNT FACTOR (&gamma;) = 0.7</p>
<p>Moderate focus on future rewards</p>
<img src="img/performance-learning-discounted-factor-7e-1-stepsize-1e-2.png" style="width: 100%; margin-bottom: 10px;">
<img src="img/performance-learning-discounted-factor-7e-1-stepsize-5e-2.png" style="width: 100%; margin-bottom: 10px;">
<img src="img/performance-learning-discounted-factor-7e-1-stepsize-1e-1.png" style="width: 100%; margin-bottom: 10px;">
</div>

<div style="flex: 1; text-align: center;">
<p>DISCOUNT FACTOR (&gamma;) = 0.9</p>
<p>Strong focus on future rewards</p>
<img src="img/performance-learning-discounted-factor-9e-1-stepsize-1e-2.png" style="width: 100%; margin-bottom: 10px;">
<img src="img/performance-learning-discounted-factor-9e-1-stepsize-5e-2.png" style="width: 100%; margin-bottom: 10px;">
<img src="img/performance-learning-discounted-factor-9e-1-stepsize-1e-1.png" style="width: 100%; margin-bottom: 10px;">
</div>

</div>
</blockquote>

**3.2 Please interpret the results/plots (2 points)**. Explain how the two algorithms differ from one another in their learning procedure and what the results say about the parameters alpha (step size) and gamma (discount factor).

> **INTERPRETATION OF PLOTS**
> 1. _**Difference in learning procedure: Q-Learning Agent vs. SARSA Agent**_  
> The Q-Learning agent uses the Q-Learning algorithm, which is off-policy, while the SARSA agent uses the SARSA algorithm, which is on-policy. Both agents choose their actions with an $\varepsilon$-greedy policy (unless they act fully greedily).  
> 
>   The algorithms differ in how they update the q-value:  
> * Q-Learning always bootstraps from the greedy action in the next state, regardless of the action actually taken during exploration. It therefore learns the values of the greedy (optimal) actions directly, but because its update ignores the effect of exploratory moves, it is less robust in environments with high variability.
> * SARSA bootstraps from the action that the $\varepsilon$-greedy policy actually selects in the next state. Because its estimates account for exploratory actions, SARSA is more stable in such environments.  
> 
>   This is reflected in the plots: the Q-Learning agent generally converges faster during the early episodes because it always uses the greedy action for its updates, whereas the SARSA agent, which learns from the actions it actually takes, exhibits more stable learning curves.
>
> 2. _**Interpretation of parameters alpha and gamma from plots**_  
> By comparing the plots as a grid (discount factor horizontally and step size vertically), the effects of the discount factor and the step size become evident:  
> * Impact of the discount factor:
>   * A higher discount factor, i.e. a stronger emphasis on future rewards, helps the agents reach stable convergence more quickly. 
>   * The gain levels off: performance improves only slightly between a discount factor of 0.7 and 0.9. 
> * Impact of the step size:
>   * The agents converge more quickly when the step size is increased, because each update moves the estimates further. 
>   * However, a larger step size also leads to overshooting during episodes with large variability, which shows as strong fluctuations in the plots. Whereas the Q-Learning agent benefits from the large step size, the SARSA agent, which favours stability, benefits more from a moderate step size, as shown by its slightly smoother curve in the plots.  
> 
>   Both agents reach convergence fastest with a strong emphasis on future rewards ($\gamma=0.9$), combined with a moderate step size ($\alpha=0.05$) for the SARSA agent and an aggressive step size ($\alpha=0.1$) for the Q-Learning agent.
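
The difference between the two update rules discussed above can be sketched in a few lines. This is a minimal illustration with a generic linear approximator $\hat q(s, a) = \mathbf{w}_a^\top \mathbf{x}(s)$, not the tile-coding implementation of `QAgent` and `SARSAAgent` in this notebook; the feature vectors, the weights, and the helper names `q_values`, `td_target`, and `semi_gradient_update` are all assumptions made for the example.

```python
import numpy as np


def q_values(weights: np.ndarray, features: np.ndarray) -> np.ndarray:
    """Linear action values: one weight row per action."""
    return weights @ features


def td_target(weights, reward, next_features, next_action, gamma, done, off_policy):
    """TD target for Q-learning (off_policy=True) or SARSA (off_policy=False)."""
    if done:
        return reward  # terminal states have value 0
    next_q = q_values(weights, next_features)
    # Q-learning bootstraps from the best next action, SARSA from the action actually chosen
    bootstrap = next_q.max() if off_policy else next_q[next_action]
    return reward + gamma * bootstrap


def semi_gradient_update(weights, features, action, target, alpha):
    """w_a <- w_a + alpha * (target - q(s, a)) * x(s); the target is treated as a constant."""
    new_weights = weights.copy()
    td_error = target - q_values(weights, features)[action]
    new_weights[action] += alpha * td_error * features
    return new_weights


# Made-up numbers: 2 features, 4 actions
weights = np.array([[1.0, 0.0], [0.0, 2.0], [0.5, 0.5], [0.0, 0.0]])
x_s = np.array([1.0, 0.0])      # features of the current state
x_next = np.array([0.0, 1.0])   # features of the next state
explored_action = 3             # action the epsilon-greedy policy happened to pick next

sarsa_target = td_target(weights, -1.0, x_next, explored_action, 0.9, False, off_policy=False)
q_target = td_target(weights, -1.0, x_next, explored_action, 0.9, False, off_policy=True)
print(sarsa_target)  # -1.0 + 0.9 * q(s', explored_action)
print(q_target)      # -1.0 + 0.9 * max_a q(s', a)

updated = semi_gradient_update(weights, x_s, action=0, target=q_target, alpha=0.1)
print(updated[0])    # only the weights of the updated action change
```

In this toy case the exploratory next action has a lower estimated value than the greedy one, so the SARSA target is lower than the Q-learning target for the same transition.
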

**3.3 Let us think one step further by looking at the policies you have learned (1.5 points).** 

Please compare the performance of your learned optimal policies using another simulation. In this simulation, you shall have three agents (Qlearning_Agent, SARSA_agent and Random_agent), where all three agents always use their initial policy to behave and find the goal in the given map (make sure the agents do NOT learn anymore!). The initial policies of Qlearning_Agent and SARSA_agent are the optimal policies learned from the above experiments (based on the final estimated Q-values using the Q-learning and SARSA algorithms respectively). The initial policy of Random_agent selects among the 4 actions at random. 

Describe the performance of the three agents by running each agent on the task and discuss the results (1.5 points). You can use the render function provided above (and other helper functions) to observe the different behaviors of the three agents. You could also use an appropriate plot to show the different performances. *There is no need to submit your code for this question.*

Some questions you shall think about and answer (in case there are no differences or no special things, just indicate what you observed):
- How do the learned Q-learning policy and SARSA policy perform differently? Does it show the difference between on-policy and off-policy methods?
- Which strategy did both RL agents learn? How do they differ from the random policy?
- For the RL-learned policies, do you still observe something that does not perform so well? Can you think about possible ways to improve/solve it?

In [None]:
# Retrieve optimally performing agents
q_agent: QAgent = None
sarsa_agent: SARSAAgent = None

for key in results:
    if key.discounted_factor == 0.9 and key.is_learning:
        if key.stepsize == 0.05 and isinstance(key.agent, SARSAAgent):
            sarsa_agent = key.agent
        elif key.stepsize == 0.1 and isinstance(key.agent, QAgent):
            q_agent = key.agent
    if q_agent and sarsa_agent:
        break

In [None]:
# Extract the learned tilings (weights), which encode the estimated q-values
q_vals_q_agent = q_agent.tilings
q_vals_sarsa_agent = sarsa_agent.tilings


In [None]:
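# Create agent that has been pre-trained and doesn't learn anymore
class FixedAgent(Agent):
    """Agent that acts greedily on previously learned tilings and never updates them."""

    def __init__(self, alpha, gamma, epsilon, tilings: List[grid_tilings]):
        super().__init__(alpha, gamma, epsilon)
        self.tilings = tilings  # one learned tiling per action, taken from a trained agent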
    
    def value(self, state, action):
        if is_goal_reached(state):
            return 0.0
        return self.tilings[action].get_weight(state)

    def values(self, state):
        if is_goal_reached(state):
            return np.zeros(4)
        return [self.tilings[action].get_weight(state) for action in Actions]  
    
    def set_value(self, state: State, action: Actions, value: float) -> None:
        pass
    
    def select_action(self, state: State, use_greedy_strategy = True) -> Actions:
        return super().select_action(state, use_greedy_strategy=True)
    
    def learn(self, state: State, action: Actions, next_state: State, reward: float, done = False) -> None:
        return None

In [None]:
# Create agent that always chooses its actions randomly
class RandomAgent(Agent):
    """Agent that ignores its value estimates and picks every action at random."""

    def __init__(self, alpha: float, gamma: float, epsilon: float):
        super().__init__(alpha, gamma, epsilon)
        self.tilings = [grid_tilings(number_of_grids=3) for _ in Actions]

    def value(self, state, action):
        if is_goal_reached(state):
            return 0.0
        return self.tilings[action].get_weight(state)

    def values(self, state):
        if is_goal_reached(state):
            return np.zeros(4)
        return [self.tilings[action].get_weight(state) for action in Actions]
    
    def set_value(self, state: State, action: Actions, value: float) -> None:
        pass
    
    def select_action(self, state: State, use_greedy_strategy = True) -> Actions:
        # estimated action values for the current state
        estimated_action_vals = self.values(state) # Has to happen to determine valid actions
        action_ix_2_qval = list(enumerate(estimated_action_vals)) 
        return action_ix_2_qval[np.random.choice(len(action_ix_2_qval))][0]
    
    def learn(self, state: State, action: Actions, next_state: State, reward: float, done = False):
        return None

In [None]:
param_grid_fixed_agents = {
    "num_episodes" : [500],
    "epsilon": [1e-2],
    "discounted_factor": [0.9],
    "stepsize": [0.1], # has no effect here because none of these agents learn; only needed for ResultsKey
    "is_learning": [False],
    "agent": [
        (FixedAgent, lambda: FixedAgent(tilings=q_vals_sarsa_agent, alpha=0.1, gamma=0.9, epsilon=0.01)),
        (FixedAgent, lambda: FixedAgent(tilings=q_vals_q_agent, alpha=0.1, gamma=0.9, epsilon=0.01)),
        RandomAgent
    ]
}

In [None]:
fixed_agents_results = run_simulations(param_grid=param_grid_fixed_agents)

In [None]:
# Adjustment on the plot_simulation_results function to plot the 3 agents in a clear way
def plot_comparison_fixed_agents(results: Result):
    """Plot the performance of a trained SARSAAgent, QAgent, and RandomAgent"""

    labels = ["Trained SARSA Agent", "Trained Q-Learning Agent", "Random Agent"]
    colours = [(0.0, 0.0, 0.9), (0.9, 0.3, 0.3), (0.0, 0.9, 0.3)]

    _, ax = plt.subplots(figsize=(7,6))

    # Dicts keep insertion order, so the results follow the order of the agents in the parameter grid
    for label, colour, rewards in zip(labels, colours, results.values()):
        rewards_smoothed = exponential_smooth(data=rewards)
        ax.plot(rewards_smoothed, label=f"{label} (smoothed)", color=colour)
        
        
    ax.set_title("Performance of Trained QAgent and SARSAAgent\nvs. \nRandom Agent")
    ax.set_xlabel("Episodes")
    ax.set_ylabel("Sum of Rewards during Episode")
    ax.legend()
    ax.grid(True)

    plt.tight_layout()
    plt.show()

In [None]:
plot_comparison_fixed_agents(fixed_agents_results)

Your answers here.

> **_Performance Comparison_**  
> ![](img/fixed-qagent-vs-fixed-sarsaagent-vs-randomagent.png)  
> As the trained agents had already mostly converged during training, they are expected to perform near-optimally in every episode. The plot reflects this with almost perfectly horizontal lines where the sum of rewards stays close to 0. In contrast, the random agent, which doesn't learn and always chooses its actions at random, shows large variations in the sum of rewards between episodes and never improves.  
>   
> Although the random agent only explores, it still visits informative state-action pairs and observes their rewards. If a non-learning agent were to use the q-values from a random agent that does update its q-values, it could possibly perform well with a greedy policy. For this to work, however, the random agent must have explored enough of the environment that, from any randomly assigned starting point, it knows enough of its surroundings to determine an optimal path to the goal. Because the actions are selected at random, there is no way of predicting when the environment has been explored sufficiently, so the resulting q-values will most likely be sub-optimal. 
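
Because the smoothed curves hide the spread between episodes, a numeric summary can complement the plot. The sketch below uses a hypothetical helper, `summarise_rewards`, which is not part of this notebook; it reduces each agent's list of episode rewards to its mean, standard deviation, and worst episode. The reward lists are made-up placeholders, not results from the simulation.

```python
from typing import Dict, List

import numpy as np


def summarise_rewards(rewards_per_agent: Dict[str, List[float]]) -> Dict[str, Dict[str, float]]:
    """Mean, standard deviation and minimum of the episode rewards per agent (hypothetical helper)."""
    summary = {}
    for label, rewards in rewards_per_agent.items():
        arr = np.asarray(rewards, dtype=float)
        summary[label] = {
            "mean": float(arr.mean()),
            "std": float(arr.std()),
            "min": float(arr.min()),
        }
    return summary


# Placeholder numbers for illustration only (assumption)
placeholder = {
    "Greedy agent": [-2.0, -2.0, -4.0],
    "Random agent": [-10.0, -50.0, -30.0],
}

summary = summarise_rewards(placeholder)
for label, stats in summary.items():
    print(label, stats)
```

Applied to the real results, the same idea would take the reward lists stored in `fixed_agents_results` as input, with one label per agent.
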

You are almost done! Before handing in, make sure that the code you hand in works, and that all plots are shown. **Submit just one file per team.**

Again, make sure **you name this file according to your last names**.