<a href="https://colab.research.google.com/github/Mentel1/Escape-Room-RL/blob/main/Escape_Room.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Packages.

We import the libraries required for this assignment:
1. jdc: Jupyter magic that allows defining classes over multiple jupyter notebook cells.
2. numpy: the fundamental package for scientific computing with Python.
3. matplotlib: the library for plotting graphs in Python.
4. RL-Glue: the library for reinforcement learning experiments.
5. BaseEnvironment, BaseAgent: the base classes from which we will inherit when creating the environment and agent classes in order for them to support the RL-Glue framework.
6. Manager: the file allowing for visualization and testing.
7. itertools.product: the function that computes the Cartesian product of its input iterables.
8. tqdm.tqdm: Provides progress bars for visualizing the status of loops.
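
As a quick illustration of `itertools.product`, the sketch below enumerates every `(row, col, got_key)` combination for a small grid. The grid size is a hypothetical value chosen only to keep the output short; it is not taken from the notebook.

```python
from itertools import product

# Hypothetical grid size, chosen only for illustration.
grid_h, grid_w = 2, 3

# product yields the Cartesian product of the three iterables:
# every row paired with every column and every key flag.
all_states = list(product(range(grid_h), range(grid_w), (False, True)))

print(len(all_states))  # 2 * 3 * 2 = 12
print(all_states[:3])   # [(0, 0, False), (0, 0, True), (0, 1, False)]
```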

In [3]:
! pip install jdc
! pip install rl_glue



DEPRECATION: Python 2.7 reached the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 is no longer maintained. pip 21.0 will drop support for Python 2.7 in January 2021. More details about Python 2 support in pip can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support pip 21.0 will remove support for this functionality.
You should consider upgrading via the 'c:\python27\python.exe -m pip install --upgrade pip' command.





ERROR: Could not find a version that satisfies the requirement rl_glue (from versions: none)
ERROR: No matching distribution found for rl_glue


In [59]:
import os, sys
import jdc
import numpy as np
from rl_glue import RLGlue
import gym
from Agent import BaseAgent 
from Environment import BaseEnvironment  
from manager import Manager
from itertools import product
from tqdm import tqdm

# Section 1. Environment

In the first part of this assignment, you will see how the Escape Room environment is implemented. You will also implement parts of it to aid your understanding of the environment and, more generally, of how MDPs are specified. In particular, you will implement the logic for:
 1. Converting 2-dimensional coordinates to a single index for the state,
 2. One of the actions (action up),
 3. Reward and termination.
 
Given below is an annotated diagram of the environment with more details that may help in completing the tasks of this part of the assignment. Note that we will be creating a more general environment where the grid height and width are configurable but the special cells keep the same relative positions: the start is in the middle of the bottom row, the door (goal) is in the middle of the top row, an obstacle sits directly below the door, and the key is in the bottom-right corner.



Once you have gone through the code and begun implementing solutions, it may be a good idea to come back here and see if you can convince yourself that the diagram above is an accurate representation of the code given and the code you have written.
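
The first task listed above, converting 2-dimensional coordinates to a single index, can be sketched as follows. This is a minimal illustration assuming row-major ordering; the helper names `loc_to_index` and `state_to_index` are hypothetical, and the extension to a `(row, col, got_key)` state is one possible encoding rather than the notebook's own.

```python
import numpy as np

def loc_to_index(loc, grid_shape):
    """Map a (row, col) location to a single row-major index."""
    row, col = loc
    return row * grid_shape[1] + col

def state_to_index(state, grid_shape):
    """Map (row, col, got_key) to one index, treating the key flag as a third axis."""
    row, col, got_key = state
    return int(np.ravel_multi_index((row, col, int(got_key)), (*grid_shape, 2)))

grid_shape = (8, 6)  # (height, width), as in the rendering example of this notebook
print(loc_to_index((7, 3), grid_shape))          # 7 * 6 + 3 = 45
print(state_to_index((7, 3, True), grid_shape))  # 45 * 2 + 1 = 91
```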

In [13]:
# Do not modify this cell!

# Create empty EscapeRoomEnvironment class.
# These methods will be filled in later cells.
class EscapeRoomEnvironment(BaseEnvironment):
    def env_init(self, env_info={}):
        raise NotImplementedError

    def env_start(self):
        raise NotImplementedError

    def env_render(self):
        raise NotImplementedError

    def env_step(self, action):
        raise NotImplementedError

    def env_end(self, reward):
        raise NotImplementedError

    def env_cleanup(self):
        raise NotImplementedError
    
    # helper method
    def state(self, loc):
        raise NotImplementedError

## env_init()

The first function we add to the environment is the initialization function, which is called once when an environment object is created. In this function, the grid dimensions and the special locations (start, door, obstacle and key) are stored for easy use later.

In [47]:
%%add_to EscapeRoomEnvironment

# Do not modify this cell!

# Work Required: No.
def env_init(self, env_info={}):
        """Setup for the environment called when the experiment first starts.
        Note:
            Initialize a tuple with the reward, first state, boolean
            indicating if it's terminal.
        """
        
        # Note, we can setup the following variables later, in env_start() as it is equivalent. 
        # Code is left here to adhere to the note above, but these variables are initialized once more
        # in env_start() [See the env_start() function below.]
        
        reward = None
        state = None # See Aside
        termination = None
        self.reward_state_term = (reward, state, termination)
        
        # AN ASIDE: Observation is a general term used in the RL-Glue files that can be used
        # interchangeably with the term "state" for our purposes and for this assignment in particular.
        # The two terms differ under partial observability, where the environment may return
        # observations that do not carry all the information needed to predict values or make
        # decisions (i.e., the observations are non-Markovian).
        
        self.grid_h = env_info.get("grid_height", 5) 
        self.grid_w = env_info.get("grid_width", 5)
        self.grid_shape = (self.grid_h, self.grid_w)
        
 
        # The agent starts in the middle of the bottom row.
        self.start_loc = (self.grid_h-1, self.grid_w//2)
        # The goal (the door) is in the middle of the top row.
        self.goal_loc = (0, self.grid_w//2)
        # There is an obstacle directly below the door.
        self.obstacle_loc = (self.goal_loc[0]+1, self.goal_loc[1])

        # Map bounds: the ring of cells just outside the grid, stored as (row, col) pairs.
        self.UP_map_bound = [(-1, col) for col in range(-1, self.grid_w+1)]
        self.DOWN_map_bound = [(self.grid_h, col) for col in range(-1, self.grid_w+1)]
        self.LEFT_map_bound = [(row, -1) for row in range(-1, self.grid_h+1)]
        self.RIGHT_map_bound = [(row, self.grid_w) for row in range(-1, self.grid_h+1)]
        self.forbidden_locs = self.UP_map_bound + self.DOWN_map_bound + self.LEFT_map_bound + self.RIGHT_map_bound + [self.obstacle_loc]
        self.forbidden_locs = list(set(self.forbidden_locs))

        # The key is in the bottom-right corner.
        self.key_loc = (self.grid_h-1, self.grid_w-1)
        assert self.key_loc not in self.forbidden_locs, "key location init is forbidden, try another location"
        assert self.start_loc not in self.forbidden_locs, "start location init is forbidden, try another location"
        assert self.goal_loc not in self.forbidden_locs, "goal location init is forbidden, try another location"
        # The player does not have the key at the beginning.
        self.got_key = False
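
To make the layout arithmetic in env_init() concrete, the sketch below recomputes the four special locations as a standalone function. The name `room_layout` is hypothetical and exists only for this illustration; the formulas are copied from the cell above.

```python
def room_layout(grid_h=5, grid_w=5):
    """Return the special (row, col) locations for a grid of the given size."""
    start = (grid_h - 1, grid_w // 2)   # middle of the bottom row
    goal = (0, grid_w // 2)             # the door, middle of the top row
    obstacle = (goal[0] + 1, goal[1])   # directly below the door
    key = (grid_h - 1, grid_w - 1)      # bottom-right corner
    return {"start": start, "goal": goal, "obstacle": obstacle, "key": key}

print(room_layout(8, 6))
# {'start': (7, 3), 'goal': (0, 3), 'obstacle': (1, 3), 'key': (7, 5)}
```

For an 8 x 6 grid this places the door in column 3 of the top row and the key in the last cell of the bottom row.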

## env_start()

In env_start(), we initialize the agent location to be the start location and return the state corresponding to it as the first state for the agent to act upon. Additionally, we also set the reward and termination terms to be 0 and False respectively as they are consistent with the notion that there is no reward nor termination before the first action is even taken.

In [15]:
%%add_to EscapeRoomEnvironment

# Do not modify this cell!

# Work Required: No.
def env_start(self):
    """The first method called when the episode starts, called before the
    agent starts.

    Returns:
        The first state from the environment.
    """
    reward = 0
    # agent_loc will hold the current location of the agent
    self.agent_loc = self.start_loc
    # state is the tuple (row, col, got_key): the agent location plus the key flag.
    state = (*self.agent_loc,self.got_key)
    termination = False
    self.reward_state_term = (reward, state, termination)

    return self.reward_state_term[1]

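## env_render()

env_render() returns a text drawing of the current grid, with the player, door, key, obstacle and walls each shown as a single character.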

In [65]:
%%add_to EscapeRoomEnvironment

def env_render(self):
    """render the current state to terminal
    0 : background (' ')
    1 : player ('P')
    2 : door ('D')
    3 : key ('K')
    4 : left/right wall ('|')
    5 : top/bottom wall ('-')
    6 : obstacle ('X')
    """
    lut = {0:' ', 
           1:gym.utils.colorize('P',"blue"),
           2:gym.utils.colorize('D',"green"),
           3:gym.utils.colorize('K',"yellow"),
           4:'|',
           5:'-',
           6:gym.utils.colorize('X',"red"),
           }
           
    r = np.zeros(self.grid_shape, dtype='int8')

    r[self.goal_loc] = 2 # door
    if not self.got_key : 
      r[self.key_loc] = 3 # key
    r[self.obstacle_loc] = 6


    agent_state = self.reward_state_term[1]

    if agent_state is not None :
      agent_loc = agent_state[:2]
      r[agent_loc] = 1

    r = np.pad(r, 1, mode='constant',constant_values=4)
    r[0][:] = 5
    r[-1][:] = 5
    # Translate each cell code to its character, one output line per grid row.
    return "".join("".join(lut[int(cell)] for cell in row) + "\n" for row in r)


In [66]:
env = EscapeRoomEnvironment()
init_params = {
    "grid_width": 6,
    "grid_height": 8,
}

env.env_init(env_info = init_params)
env.env_start()

# Render the game
os.system("clear")
sys.stdout.write(env.env_render())

--------
|   D  |
|   X  |
|      |
|      |
|      |
|      |
|      |
|   P K|
--------


126

## *Implement* env_step()

Once an action is taken by the agent, the environment must provide a new state, reward and termination signal. 

In the Escape Room environment, agents move around using a 4-cell neighborhood called the Von Neumann neighborhood (https://en.wikipedia.org/wiki/Von_Neumann_neighborhood). Thus, the agent has 4 available actions at each state. Three of the actions have been implemented for you and your first task is to implement the logic for the fourth action (Action UP).

Your second task for this function is to implement the reward logic. Look over the environment description given earlier in this notebook if you need a refresher for how the reward signal is defined.

In [None]:
%%add_to EscapeRoomEnvironment

# Work Required: Yes. Fill in the code for action UP and implement the logic for reward and termination.
# Lines: ~7.
def env_step(self, action):
    """A step taken by the environment.

    Args:
        action: The action taken by the agent

    Returns:
        (float, state, Boolean): a tuple of the reward, state,
            and boolean indicating if it's terminal.
    """

    if action == 0: # UP (Task 1)
        ### START CODE HERE ###
        # Hint: Look at the code given for the other actions and think about the logic in them.
        possible_next_loc = (self.agent_loc[0] - 1, self.agent_loc[1])
        if possible_next_loc[0] >= 0 and possible_next_loc != self.obstacle_loc: # Within bounds and not blocked?
            self.agent_loc = possible_next_loc
        else:
            pass # Stay.
        ### END CODE HERE ###
    elif action == 1: # LEFT
        possible_next_loc = (self.agent_loc[0], self.agent_loc[1] - 1)
        if possible_next_loc[1] >= 0 and possible_next_loc != self.obstacle_loc: # Within bounds and not blocked?
            self.agent_loc = possible_next_loc
        else:
            pass # Stay.
    elif action == 2: # DOWN
        possible_next_loc = (self.agent_loc[0] + 1, self.agent_loc[1])
        if possible_next_loc[0] < self.grid_h and possible_next_loc != self.obstacle_loc: # Within bounds and not blocked?
            self.agent_loc = possible_next_loc
        else:
            pass # Stay.
    elif action == 3: # RIGHT
        possible_next_loc = (self.agent_loc[0], self.agent_loc[1] + 1)
        if possible_next_loc[1] < self.grid_w and possible_next_loc != self.obstacle_loc: # Within bounds and not blocked?
            self.agent_loc = possible_next_loc
        else:
            pass # Stay.
    else: 
        raise Exception(str(action) + " not in recognized actions [0: Up, 1: Left, 2: Down, 3: Right]!")

    reward = -1
    terminal = False

    ### START CODE HERE ###
    # Hint: Consider the initialization of reward and terminal variables above. Then, note the 
    # conditional statements and comments given below and carefully ensure to set the variables reward 
    # and terminal correctly for each case.
    if self.agent_loc[0] == self.grid_h-1:
      if 1 <= self.agent_loc[1] < self.grid_w-1:
        reward = -100
        self.agent_loc = (3,0)
      elif self.agent_loc[1] == self.grid_w-1:
        terminal = True

    ### END CODE HERE ###

    self.reward_state_term = (reward, self.state(self.agent_loc), terminal)
    return self.reward_state_term
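
The four movement branches share one pattern: compute a candidate cell, then move only if it is inside the grid and not the obstacle. The sketch below expresses that pattern with a lookup table and adds key and door handling. It is a standalone function, not the notebook's env_step(); the function name `step`, the `MOVES` table, and the reward scheme (-1 per step, termination only when the door is reached while holding the key, the door cell being enterable without the key) are all assumptions made for illustration.

```python
# Row/column offsets for the four Von Neumann moves, using the same action
# numbering as env_step(): 0 = up, 1 = left, 2 = down, 3 = right.
MOVES = {0: (-1, 0), 1: (0, -1), 2: (1, 0), 3: (0, 1)}

def step(agent_loc, got_key, action, grid_shape, obstacle_loc, key_loc, goal_loc):
    """Return (reward, (row, col, got_key), terminal) for one move.

    Assumed rules for this sketch: every step costs -1, and the episode
    ends only when the agent stands on the door while holding the key.
    """
    if action not in MOVES:
        raise ValueError(f"{action} not in recognized actions {sorted(MOVES)}")
    d_row, d_col = MOVES[action]
    row, col = agent_loc[0] + d_row, agent_loc[1] + d_col
    in_bounds = 0 <= row < grid_shape[0] and 0 <= col < grid_shape[1]
    if in_bounds and (row, col) != obstacle_loc:
        agent_loc = (row, col)  # otherwise the agent stays where it is
    # Stepping onto the key cell picks the key up.
    got_key = got_key or agent_loc == key_loc
    terminal = got_key and agent_loc == goal_loc
    return -1, (*agent_loc, got_key), terminal

# 8 x 6 room with the same special cells as the rendering example.
room = dict(grid_shape=(8, 6), obstacle_loc=(1, 3), key_loc=(7, 5), goal_loc=(0, 3))
print(step((7, 4), False, 3, **room))  # (-1, (7, 5, True), False)
print(step((2, 3), True, 0, **room))   # (-1, (2, 3, True), False), blocked by the obstacle
print(step((0, 2), True, 3, **room))   # (-1, (0, 3, True), True)
```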

## env_cleanup()

There is not much cleanup to do for the Escape Room environment. Here, we simply reset the agent location to the start location.

In [None]:
%%add_to EscapeRoomEnvironment

# Do not modify this cell!

# Work Required: No.
def env_cleanup(self):
    """Cleanup done after the environment ends"""
    self.agent_loc = self.start_loc

# Section 2. Agent

In this second part of the assignment, you will be implementing the key updates for Temporal Difference Learning. There are two cases to consider depending on whether an action leads to a terminal state or not.

In [None]:
# Do not modify this cell!

# Create empty TDAgent class.
# These methods will be filled in later cells.

class TDAgent(BaseAgent):
    def agent_init(self, agent_info={}):
        raise NotImplementedError
        
    def agent_start(self, state):
        raise NotImplementedError

    def agent_step(self, reward, state):
        raise NotImplementedError

    def agent_end(self, reward):
        raise NotImplementedError

    def agent_cleanup(self):        
        raise NotImplementedError
        
    def agent_message(self, message):
        raise NotImplementedError

## agent_init()

As we did with the environment, we first initialize the agent once when a TDAgent object is created. In this function, we create a random number generator, seeded with the seed provided in the agent_info dictionary to get reproducible results. We also set the policy, discount and step size based on the agent_info dictionary. Finally, following the convention that the policy is specified as an array of shape (# States, # Actions) giving the probability of each action in each state, we initialize a values array of shape (# States,) to zeros.

In [None]:
%%add_to TDAgent

# Do not modify this cell!

# Work Required: No.
def agent_init(self, agent_info={}):
    """Setup for the agent called when the experiment first starts."""

    # Create a random number generator with the provided seed to seed the agent for reproducibility.
    self.rand_generator = np.random.RandomState(agent_info.get("seed"))

    # Policy will be given, recall that the goal is to accurately estimate its corresponding value function. 
    self.policy = agent_info.get("policy")
    # Discount factor (gamma) to use in the updates.
    self.discount = agent_info.get("discount")
    # The learning rate or step size parameter (alpha) to use in updates.
    self.step_size = agent_info.get("step_size")

    # Initialize an array of zeros that will hold the values.
    # Recall that the policy can be represented as a (# States, # Actions) array. With the 
    # assumption that this is the case, we can use the first dimension of the policy to
    # initialize the array for values.
    self.values = np.zeros((self.policy.shape[0],))
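
As an illustration of the inputs agent_init() expects, the sketch below builds an `agent_info` dictionary with a uniform random policy. The dictionary keys match those read in the cell above; the sizes and numeric values are hypothetical.

```python
import numpy as np

# Hypothetical sizes: an 8 x 6 grid gives 48 locations, with 4 moves each.
num_states, num_actions = 48, 4

# Uniform random policy: every action has probability 1/4 in every state.
policy = np.ones((num_states, num_actions)) / num_actions

agent_info = {"policy": policy, "discount": 1.0, "step_size": 0.1, "seed": 0}

# Same initialization as in agent_init(): one value estimate per state.
values = np.zeros((agent_info["policy"].shape[0],))
print(values.shape)  # (48,)
```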

## agent_start()

In agent_start(), we choose an action based on the initial state and policy we are evaluating. We also cache the state so that we can later update its value when we perform a Temporal Difference update. Finally, we return the action chosen so that the RL loop can continue and the environment can execute this action.

In [None]:
%%add_to TDAgent

# Do not modify this cell!

# Work Required: No.
def agent_start(self, state):
    """The first method called when the episode starts, called after
    the environment starts.
    Args:
        state (int): the index of the state from the environment's env_start function.
    Returns:
        The first action the agent takes.
    """
    # The policy can be represented as a (# States, # Actions) array. So, we can use 
    # the second dimension here when choosing an action.
    action = self.rand_generator.choice(range(self.policy.shape[1]), p=self.policy[state])
    self.last_state = state
    return action
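
The sampling call in agent_start() draws an action index according to the policy row of the current state. A minimal sketch, using a hypothetical 2-state policy and an arbitrary seed:

```python
import numpy as np

rng = np.random.RandomState(0)  # seed 0 is an arbitrary choice

# Hypothetical policy for 2 states and 4 actions; each row sums to 1.
policy = np.array([
    [0.25, 0.25, 0.25, 0.25],  # state 0: uniform random
    [0.0, 0.0, 1.0, 0.0],      # state 1: always action 2
])

random_action = rng.choice(range(policy.shape[1]), p=policy[0])
fixed_action = rng.choice(range(policy.shape[1]), p=policy[1])
print(fixed_action)  # 2
```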

## agent_step()

In agent_step(), the agent must:

- Perform an update to improve the value estimate of the previously visited state, and
- Act based on the state provided by the environment.


In [None]:
%%add_to TDAgent

# Work Required: No. 
def agent_step(self, reward, state):
    """A step taken by the agent.
    Args:
        reward (float): the reward received for taking the last action taken
        state (int): the index of the state from the
            environment's step after the last action, i.e., where the agent ended up after the
            last action
    Returns:
        The action the agent is taking.
    """
    # We should perform an update with the last state given that we now have the reward and
    # next state. We break this into two steps. Recall for example that the Monte-Carlo update 
    # had the form: V[S_t] = V[S_t] + alpha * (target - V[S_t]), where the target was the return, G_t.
    target = reward + self.discount * self.values[state]
    self.values[self.last_state] = self.values[self.last_state] + self.step_size * (target - self.values[self.last_state])

    # Having updated the value for the last state, we now act based on the current 
    # state, and set the last state to be current one as we will next be making an 
    # update with it when agent_step is called next once the action we return from this function 
    # is executed in the environment.

    action = self.rand_generator.choice(range(self.policy.shape[1]), p=self.policy[state])
    self.last_state = state

    return action
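
A worked numerical instance of the update in agent_step() may help. All numbers below are hypothetical and chosen so the arithmetic is easy to follow by hand.

```python
import numpy as np

values = np.array([0.0, 2.0, 0.0])  # current value estimates for 3 states
last_state, state = 0, 1            # the agent moved from state 0 to state 1
reward, discount, step_size = -1.0, 0.9, 0.5

# TD(0) target: immediate reward plus discounted estimate of the next state.
target = reward + discount * values[state]  # -1 + 0.9 * 2 = 0.8
# Move the old estimate a fraction step_size of the way towards the target.
values[last_state] += step_size * (target - values[last_state])  # 0 + 0.5 * 0.8

print(round(values[last_state], 6))  # 0.4
```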

## agent_end() 

TD update for the case where an action leads to a terminal state. Because the value of a terminal state is zero, the target reduces to the reward alone.

In [None]:
%%add_to TDAgent

# Work Required: No. 

def agent_end(self, reward):
    """Run when the agent terminates.
    Args:
        reward (float): the reward the agent received for entering the terminal state.
    """
    # Here too, we should perform an update with the last state given that we now have the 
    # reward. Note that in this case, the action led to termination. Once more, we break this into 
    # two steps, computing the target and the update itself that uses the target and the 
    # current value estimate for the state whose value we are updating.
    target = reward
    self.values[self.last_state] = self.values[self.last_state] + self.step_size * (target - self.values[self.last_state])
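
The two update cases can be viewed as one rule in which the terminal state contributes a value of zero. The sketch below shows this with a hypothetical helper `td_update`, where `next_state=None` marks termination; it is an illustration, not part of the TDAgent class.

```python
import numpy as np

def td_update(values, last_state, reward, step_size, discount=1.0, next_state=None):
    """Apply one TD(0) update in place; next_state=None marks termination."""
    # A terminal state has value zero, so nothing is bootstrapped from it.
    bootstrap = 0.0 if next_state is None else values[next_state]
    target = reward + discount * bootstrap
    values[last_state] += step_size * (target - values[last_state])
    return values

values = np.array([0.4, 2.0])  # hypothetical estimates
td_update(values, last_state=0, reward=-1.0, step_size=0.5)
print(round(values[0], 6))  # 0.4 + 0.5 * (-1 - 0.4) = -0.3
```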
