In [None]:
import numpy as np
from matplotlib import pyplot as plt
import matplotlib.animation as animation

In [None]:
!pip install ffmpeg

In [None]:
class AbstractAgent:
    '''Common base class for agents.

    Stores the environment handed to the constructor; concrete agents
    supply the behaviour by overriding `step`.
    '''

    def __init__(self, env):
        # Keep a handle on the environment this agent is bound to.
        self.env = env

    def step(self):
        '''Take a single action. Must be overridden by subclasses.'''
        raise NotImplementedError()

In [None]:
class AbstractEnvironment:
    '''Interface that every environment has to implement.

    All methods raise NotImplementedError here; subclasses override them.
    '''

    def reset(self):
        '''Put the environment back into its original state.

        Returns:
        The state after the reset
        '''
        raise NotImplementedError()

    def step(self, action):
        '''Advance the environment by one step using the given action.

        Arguments:
        action - The action to apply

        Returns:
        The reward received for this step
        '''
        raise NotImplementedError()

    def get_num_states(self):
        '''Return how many states the environment has.'''
        raise NotImplementedError()

    def get_num_actions(self):
        '''Return how many actions are available to the agent.'''
        raise NotImplementedError()

    def get_current_state(self):
        '''Return the state the environment is currently in.'''
        raise NotImplementedError()

    def get_state_space(self):
        '''Return all states the environment can be in.'''
        raise NotImplementedError()

    def get_action_space(self):
        '''Return all actions that can be taken in the environment.'''
        raise NotImplementedError()


In [None]:
class GridMaze(AbstractEnvironment):
    '''Grid-world maze in which an agent walks from a start cell to a goal.

    The maze is a list of equal-length strings, one string per row:
        "#"  wall, cannot be entered
        "."  free cell
        "A"  start cell of the agent (exactly one is required)
        "G"  goal cell; entering it pays `goal_reward`

    States are (row, col) tuples and actions are the integer keys of
    `action_movements`.
    '''

    #TODO make a 5X5 grid
    default_maze = ["#G.",
                    "##.",
                    "..A"]

    # Every action is a (row, col) offset. Rows grow downwards, so moving
    # north means -1 along the first axis. Order: N, W, S, E.
    action_movements = {
        0: np.array((-1, 0)),  # North
        1: np.array((0, -1)),  # West
        2: np.array((1, 0)),   # South
        3: np.array((0, 1)),   # East
    }

    goal_reward = 1.0  # paid when the agent enters a "G" cell
    step_reward = 0.0  # paid for every other step

    def __init__(self, maze=None):
        '''Build the maze and place the agent on its start cell.

        Arguments:
        maze - optional list of row strings; `default_maze` is used if None

        Raises:
        ValueError - if the rows are not rectangular or there is not
                     exactly one "A" cell
        '''
        if maze is None:
            maze = self.default_maze.copy()
        self.maze = np.array(list(map(list, maze)))
        if self.maze.ndim != 2:
            raise ValueError("Maze rows must all have the same length.")

        start_cells = np.argwhere(self.maze == "A")
        if len(start_cells) != 1:
            raise ValueError(
                f"Maze must contain exactly one start cell 'A', "
                f"found {len(start_cells)}.")
        self.start_pos = start_cells[0]
        self.reset()

    def reset(self):
        '''Move the agent back to the start cell.

        Returns:
        The new state as a (row, col) tuple
        '''
        # Copy so later position updates can never alias the stored start.
        self.agent_pos = self.start_pos.copy()
        return self.get_current_state()

    def step(self, action):
        '''Move the agent one cell in the direction given by `action`.

        Moves off the grid are clamped to the border and moves into a wall
        leave the agent where it is. Entering the goal pays `goal_reward`
        and sends the agent back to the start cell.

        Arguments:
        action - one of the keys of `action_movements`

        Returns:
        The reward received for this step

        Raises:
        ValueError - if `action` is not a known action
        '''
        if action not in self.action_movements:
            raise ValueError(
                f"Invalid action {action!r}. "
                f"Valid actions are {self.get_action_space()}.")

        # Tentative position after applying the movement offset.
        new_pos = self.agent_pos + self.action_movements[action]

        # Keep the agent inside the grid when it walks into the border.
        new_pos = np.clip(new_pos, 0, np.array(self.maze.shape) - 1)

        # Walls block the move: the agent stays on its current cell.
        if self.maze[tuple(new_pos)] == "#":
            new_pos = self.agent_pos

        self.agent_pos = new_pos

        new_pos_char = self.maze[tuple(new_pos)]
        if new_pos_char == "G":
            reward = self.goal_reward
            # Goal reached: respawn the agent at the start cell.
            self.reset()
        else:
            reward = self.step_reward

        return reward

    def get_current_state(self):
        '''Return the agent position as a (row, col) tuple of plain ints.'''
        return tuple(int(v) for v in self.agent_pos)

    def get_state_space(self):
        '''Return every (row, col) index of the grid, walls included.'''
        return list(np.ndindex(self.maze.shape))

    def get_action_space(self):
        '''Return the list of valid integer actions.'''
        return list(self.action_movements.keys())

    def get_num_states(self):
        '''Return the number of grid cells.'''
        return len(self.get_state_space())

    def get_num_actions(self):
        '''Return the number of available actions.'''
        return len(self.get_action_space())

    def render(self):
        '''Plot the maze: walls, outline, agent (red dot) and goal (blue star).

        The agent marker is stored in `self.agent_dot` so that callers can
        move it later without redrawing the whole figure.

        Returns:
        The matplotlib figure
        '''
        fig, ax = plt.subplots(1, 1)
        ax.imshow(self.maze == "#", vmin=0, vmax=1, cmap="Greys")
        h, w = self.maze.shape
        # Thick outline around the whole grid.
        ax.plot([-0.5, -0.5, w - 0.5, w - 0.5, -0.5],
                [-0.5, h - 0.5, h - 0.5, -0.5, -0.5], 'k', lw=3)
        y, x = self.agent_pos
        self.agent_dot, = ax.plot((x,), (y,), 'ro', ms=20)
        rew_y, rew_x = np.where(self.maze == "G")
        ax.plot(rew_x, rew_y, "b*", ms=30)
        return fig

In [None]:
class MultiQLearner(AbstractAgent):
    '''Tabular Q-learning agent with epsilon-greedy exploration.

    The Q table has one row per state (in the order of
    `env.get_state_space()`) and one column per action. Actions are used
    directly as column indices, so they are assumed to be the integers
    0 .. num_actions - 1.
    '''

    def __init__(self, env, alpha=0.02, beta=1, gamma=1, epsilon=0.5,
                 seed=None):
        '''Create the agent with an all-zero Q table.

        Arguments:
        env     - environment to act in
        alpha   - learning rate of the Q update
        beta    - stored on the agent; not used by the epsilon-greedy policy
        gamma   - discount factor applied to the next state's best value
        epsilon - probability of picking a random action
        seed    - optional seed for the agent's random generator
        '''
        super().__init__(env)
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.epsilon = epsilon  # exploration probability
        self.rng = np.random.default_rng(seed)
        # Table of Q values with states on the rows and actions on the columns
        n_states = self.env.get_num_states()
        n_actions = self.env.get_num_actions()
        self.Q_table = [[0.0 for _ in range(n_actions)]
                        for _ in range(n_states)]

    def _state_index(self, state):
        '''Return the Q-table row that belongs to `state`.'''
        return self.env.get_state_space().index(state)

    def _select_action(self, state_index):
        '''Pick an action for the given row using epsilon-greedy.'''
        q_values = self.Q_table[state_index]
        if self.rng.random() < self.epsilon:
            # Explore: any action with equal probability.
            return int(self.rng.integers(len(q_values)))
        # Exploit: break ties at random so the untrained (all-zero) table
        # does not always favour action 0.
        best = max(q_values)
        best_actions = [a for a, q in enumerate(q_values) if q == best]
        return int(self.rng.choice(best_actions))

    def step(self):
        '''Take one action in the environment and learn from the outcome.

        Returns:
        (action, reward) - the action taken and the reward received
        '''
        state_index = self._state_index(self.env.get_current_state())

        action = self._select_action(state_index)

        # Performing the action moves the environment to its next state.
        reward = self.env.step(action)
        next_index = self._state_index(self.env.get_current_state())

        # Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
        # NOTE(review): with the default gamma=1 in an environment that never
        # terminates the values are not guaranteed to stay bounded - consider
        # passing gamma < 1.
        q_values = self.Q_table[state_index]
        td_target = reward + self.gamma * max(self.Q_table[next_index])
        q_values[action] += self.alpha * (td_target - q_values[action])

        return action, reward

In [None]:
import IPython
import matplotlib
import numpy as np
from matplotlib import pyplot as plt
import matplotlib.animation
import IPython.display


class RLRunner:
    '''Runs an agent in its environment and records what happened.

    For every step the state before the action, the action and the reward
    are appended to `states`, `actions` and `rewards`.
    '''

    # Path hard-coded by the original notebook; only used as a last resort
    # when no ffmpeg executable can be found on PATH.
    LEGACY_FFMPEG_PATH = "/home/miniconda3/bin/ffmpeg"

    def __init__(self, agent):
        self.agent = agent
        self.states = []
        self.actions = []
        self.rewards = []

    def step(self):
        '''Let the agent act once and record state, action and reward.'''
        # Read the state first: agent.step() changes the environment.
        state = self.agent.env.get_current_state()
        action, reward = self.agent.step()
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def run(self, n_steps):
        '''Train the agent for `n_steps` steps.'''
        for _ in range(n_steps):
            self.step()

    def plot_cumulative_rewards(self):
        '''Plot the running total of all rewards collected so far.'''
        fig, ax = plt.subplots(1, 1)
        ax.plot(np.cumsum(self.rewards))
        ax.set_xlabel("Step")
        ax.set_ylabel("Rewards (cuml)")

    def plot_reward_rate(self, window_size=100):
        '''Plot the mean reward per step over a sliding window.

        Arguments:
        window_size - number of steps averaged per point; shrunk to the
                      number of recorded steps if fewer are available
        '''
        fig, ax = plt.subplots(1, 1)
        n_rewards = len(self.rewards)
        if n_rewards:
            window = max(1, min(int(window_size), n_rewards))
            kernel = np.ones(window) / window
            rate = np.convolve(self.rewards, kernel, mode="valid")
            # Each point is placed at the last step of its window.
            ax.plot(np.arange(window - 1, n_rewards), rate)
        ax.set_xlabel("Step")
        ax.set_ylabel("Reward/step")

    def plot_max_value_per_state(self):
        '''Plot the best Q value of every state as a heat map.

        Assumes the states returned by `env.get_state_space()` are
        (row, col) grid coordinates listed in the same order as the rows of
        the agent's `Q_table`.
        '''
        states = self.agent.env.get_state_space()
        best_values = np.max(np.asarray(self.agent.Q_table, dtype=float),
                             axis=1)
        grid_shape = tuple(np.asarray(states).max(axis=0) + 1)
        grid = np.full(grid_shape, np.nan)
        for state, value in zip(states, best_values):
            grid[tuple(state)] = value

        fig, ax = plt.subplots(1, 1)
        image = ax.imshow(grid)
        fig.colorbar(image, ax=ax, label="Max Q value")
        ax.set_xlabel("Column")
        ax.set_ylabel("Row")

    def animate_history(self, fps=20, frames=None, ffmpeg_path=None,
                        output_path="myvideo.html"):
        '''Render the recorded agent positions as an HTML5 video file.

        Arguments:
        fps         - frames per second of the animation
        frames      - number of recorded steps to show (default: all)
        ffmpeg_path - ffmpeg executable; looked up on PATH if None
        output_path - file the HTML video is written to
        '''
        import shutil

        n_recorded = len(self.states)
        if frames is None:
            frames = n_recorded
        elif isinstance(frames, int):
            # Never ask for more frames than there are recorded states.
            frames = min(frames, n_recorded)

        if ffmpeg_path is None:
            ffmpeg_path = shutil.which("ffmpeg") or self.LEGACY_FFMPEG_PATH
        plt.rcParams['animation.ffmpeg_path'] = ffmpeg_path

        fig = self.agent.env.render()

        def anim_update(i):
            y, x = self.states[i]
            self.agent.env.agent_dot.set_data((x,), (y,))

        anim = matplotlib.animation.FuncAnimation(fig, anim_update,
                                                  frames=frames,
                                                  interval=1000.0 / fps)
        try:
            video = anim.to_html5_video()
        finally:
            # Close the figure even if encoding fails (e.g. ffmpeg missing).
            plt.close(fig)

        with open(output_path, "w") as f:
            print(video, file=f)

In [None]:
if __name__=='__main__':
    # Number of environment steps used for training.
    n_training_steps = 5000

    # Create the grid with its default layout.
    env = GridMaze()

    # Create a Q-learning agent acting in that grid.
    agent = MultiQLearner(env)

    # Train the agent through a runner so the history is recorded.
    runner = RLRunner(agent)
    runner.run(n_training_steps)

    # Inspect how training went.
    runner.plot_cumulative_rewards()
    runner.plot_reward_rate()
    runner.plot_max_value_per_state()