# Reinforcement Learning

In [2]:
from time import sleep
from IPython.display import clear_output
import random
!pip install gym
import gym
import numpy as np
np.random.seed(0)



We will be using [OpenAI's gym](https://gym.openai.com/docs/) for rendering environments and we will specifically use the [Taxi-v2](https://gym.openai.com/envs/Taxi-v2/) environment for this exercise. 

In [3]:
# Load the Taxi-v2 environment. `.env` takes the inner environment object from
# what gym.make returns (presumably dropping gym's outer wrapper — TODO confirm).
env = gym.make("Taxi-v2").env

# Standardize expected results: seed the environment before the first reset so
# the starting state is reproducible.
env.seed(0)
env.reset()

# Show the environment's current state value next to its text rendering.
print(f"Current State: {env.s}")
env.render()

Current State: 26
+---------+
|R:[43m [0m| : :[34;1mG[0m|
| : : : : |
| : : : : |
| | : | : |
|[35mY[0m| : |B: |
+---------+



  result = entry_point.load(False)


The above section just rendered an example view of the environment. For the Taxi-v2 environment,

1. The filled block is the taxi; it is yellow when empty and green when it is carrying a passenger
1. Pipe symbols `|` represent barriers preventing the taxi from moving in that direction
1. R, G, Y, B are all the possible pickup or dropoff locations for a passenger
1. Blue represents the current passenger's pickup location
1. Purple represents the current passenger's dropoff location

The reward scheme for this environment is as follows, "your job is to pick up the passenger at one location and drop them off in another. You receive +20 points for a successful dropoff, and lose 1 point for every timestep it takes. There is also a 10 point penalty for illegal pick-up and drop-off actions."

In [4]:
# Report how many discrete actions and states the environment exposes.
print(
    f"The action space is discrete with {env.action_space.n} possibilities.",
    f"The observation (state) space is discrete with {env.observation_space.n} possibilities.",
    sep="\n",
)

The action space is discrete with 6 possibilities.
The observation (state) space is discrete with 500 possibilities.


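The following actions are possible in the environment, listed in order of their action index (indices are 0-based: 0 = move south through 5 = drop off passenger):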

1. Move south
1. Move north
1. Move east
1. Move west
1. Pick up passenger
1. Drop off passenger

In [0]:
def initialize_q_table(env):
    """Build a zero-filled Q table sized for the given environment.

    Args:
        env (gym.envs): Environment whose ``observation_space.n`` and
            ``action_space.n`` give the number of states and actions

    Returns:
        np.array: Array of zeros with one row per state and one column per action
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    return np.zeros((n_states, n_actions))
    


In [6]:
# The Taxi environment has 500 states and 6 actions.
assert np.shape(initialize_q_table(env)) == (500, 6)

# A second, smaller environment checks that the shape follows the environment
# rather than being hard-coded.
xenv = gym.make("FrozenLake-v0").env
assert np.shape(initialize_q_table(xenv)) == (16, 4)

  result = entry_point.load(False)


In [0]:
def select_action(q_row, method, epsilon=0.5):
    """Select the appropriate action given a Q table row for the state and a chosen method

    Args:
        q_row (np.array): The row from the Q table to utilize
        method (str): The method to use, either "random" or "epsilon"
        epsilon (float, optional): Defaults to 0.5. The epsilon value to use for epsilon-greedy
            action selection, i.e. the probability of exploring with a uniformly random action

    Raises:
        NameError: If method specified is not supported

    Returns:
        int: The index of the action to apply
    """
    if method not in ["random", "epsilon"]:
        raise NameError("Undefined method.")

    # Exploit: with probability (1 - epsilon) take the highest-valued action.
    # Using `>=` (rather than `>`) makes epsilon=0 always greedy, because
    # np.random.rand() lies in [0, 1) and may return exactly 0.
    if method == "epsilon" and np.random.rand() >= epsilon:
        return int(np.argmax(q_row))

    # Explore (the "random" method, or the exploring share of "epsilon"): draw
    # the action index from a fresh uniform sample. Reusing the number that was
    # just compared against epsilon would confine exploration to the
    # lowest-indexed actions, since that number is known to be below epsilon.
    return int(np.random.randint(len(q_row)))

    
    

In [0]:
# With epsilon=0 the greedy action is expected: index 3 holds the largest value.
assert 3 == select_action(np.array([1, 2, 3, 4]), "epsilon", epsilon=0)
# With epsilon=1, or with the "random" method, any valid action index is acceptable.
assert select_action(np.array([1, 2, 3, 4]), "epsilon", epsilon=1) in (0, 1, 2, 3)
assert select_action(np.array([1, 2, 3, 4]), "random") in (0, 1, 2, 3)

The `env.step(action)` method takes the action the agent has chosen to apply and returns 4 values:
1. The new state
1. The received reward
1. Whether you have completed the task
1. Miscellaneous information

In [9]:
# Apply a single action and inspect the 4-tuple that env.step returns.
action = 0
vals = env.step(action)
# Interpolate `action` instead of hard-coding its value, so the message stays
# correct if the action above is changed.
print(f"An example returned from a step with action {action}")
print(vals)
print(f"This returns the new state {vals[0]}, the reward received ({vals[1]}) based on performing the action {action}, whether or not the task has been completed, {vals[2]}, and some additional miscellaneous info.")

An example returned from a step with action 0
(126, -1, False, {'prob': 1.0})
This returns the new state 126, the reward received (-1) based on performing the action 0, whether or not the task has been completed, False, and some additional miscellaneous info.


In [0]:
def calculate_new_q_val(q_table, state, action, reward, next_state, alpha, gamma):
    """Calculate the updated Q table value for a particular state and action given the necessary parameters

    Implements the Q-learning update
    ``(1 - alpha) * Q[s, a] + alpha * (reward + gamma * max(Q[s', :]))``.
    The table itself is not modified; the caller stores the returned value.

    Args:
        q_table (np.array): The Q table
        state (int): The current state of the simulation's index in the Q table
        action (int): The current action's index in the Q table
        reward (float): The returned reward value from the environment
        next_state (int): The next state of the simulation's index in the Q table (Based on the environment)
        alpha (float): The learning rate
        gamma (float): The discount rate

    Returns:
        float: The updated action-value expectation for the state and action
    """
    # Bootstrap from the best action available in the next state, not from the
    # action that was just taken in the current state.
    best_next_value = np.max(q_table[next_state])
    td_target = reward + gamma * best_next_value
    return (1 - alpha) * q_table[state, action] + alpha * td_target


In [0]:
# Fixture: three identical rows, so the largest value in any next-state row is 4.
test_q = np.array([[1, 2, 3, 4]] * 3)

# Each result must land within 0.05 of its reference value.
assert abs(calculate_new_q_val(test_q, 0, 1, 10, 1, 0.1, 0.2) - 2.88) < 0.05
assert abs(calculate_new_q_val(test_q, 0, 1, 1, 1, 0.1, 0.1) - 1.94) < 0.05
assert abs(calculate_new_q_val(test_q, 0, 1, -11, 2, 0.1, 0.1) - 0.74) < 0.05

In [0]:
# Training settings for the first run: epsilon-greedy selection with epsilon = 0.1.
epsilon1_params = dict(method="epsilon", epsilon=0.1, alpha=0.1, gamma=0.5)

In [0]:
# Training settings for the second run: epsilon-greedy selection with epsilon = 0.3.
epsilon2_params = dict(method="epsilon", epsilon=0.3, alpha=0.1, gamma=0.5)

In [0]:
def train_sim(env, params, n=100):
    """Train a simulation on an environment and return its Q table

    Args:
        env (gym.envs): The environment to train in
        params (dict): The parameters needed to train the simulation: method (for action selection), epsilon, alpha, gamma.
            ``epsilon`` may be omitted, in which case 0.5 (the default of ``select_action``) is used
        n (int, optional): Defaults to 100. The number of simulations to run for training

    Returns:
        np.array: The trained Q table from the simulation
    """
    # The hyperparameters do not change during training, so read them once up
    # front instead of on every environment step.
    method = params["method"]
    alpha = params["alpha"]
    gamma = params["gamma"]
    # epsilon only matters for the "epsilon" method; tolerate its absence.
    epsilon = params.get("epsilon", 0.5)

    my_q = initialize_q_table(env)

    for i in range(n):
        current_state = env.reset()
        done = False

        while not done:
            # Get the next action based on current state
            action = select_action(my_q[current_state], method, epsilon)

            # Step through the environment with the selected action
            next_state, reward, done, info = env.step(action)

            # Update the qtable
            my_q[current_state, action] = calculate_new_q_val(
                my_q, current_state, action, reward, next_state, alpha, gamma
            )

            # Prep for next iteration
            current_state = next_state

        if (i + 1) % 100 == 0:
            print(f"Simulation #{i+1:,} complete.")

    return my_q

In [0]:
%%time
# NOTE: the %%time cell magic has to remain the first line of this cell.
# Train one Q table per parameter set, n simulations each. The two runs go
# through train_sim on the same `env`, one after the other.
n = 10000
epsilon1_q = train_sim(env, epsilon1_params, n)
epsilon2_q = train_sim(env, epsilon2_params, n)

Simulation #100 complete.
Simulation #200 complete.
Simulation #300 complete.


In [0]:
def test_sim(env, q_table, n=100, render=False):
    """Test an environment using a pre-trained Q table
    
    Args:
        env (gym.envs): The environment to test
        q_table (np.array): The pretrained Q table
        n (int, optional): Defaults to 100. The number of test iterations to run
        render (bool, optional): Defaults to False. Whether to display a rendering of the environment
    
    Returns:
        np.array: Array of length n with each value being the cumulative reward achieved in the simulation
    """
    rewards = []
    
    for i in range(n):
        current_state = env.reset()

        tot_reward = 0
        done = False
        step = 0

        while not done:
            
            # Determine the best action
            # Step through the environment
            
            # YOUR CODE HERE
            
            method = params["method"]
            epsilon = params["epsilon"]
            alpha = params["alpha"]
            gamma = params["gamma"]
            
            action = select_action(q_table[current_state], method, epsilon)   
            next_state, reward, done, info = env.step(action)
            
            q_table[current_state, action] = calculate_new_q_val(q_table, current_state, action, reward, next_state, alpha, gamma)

            
            tot_reward += reward
            step +=1
            if render:
                clear_output(wait=True)
                print(f"Simulation: {i + 1}")
                env.render()
                print(f"Step: {step}")
                print(f"Current State: {current_state}")
                print(f"Action: {action}")
                print(f"Reward: {reward}")
                print(f"Total rewards: {tot_reward}")
                sleep(.2)
            if step == 50:
                print("Agent got stuck. Quitting...")
                sleep(.5)
                break
        
        rewards.append(tot_reward)
    
    return np.array(rewards)

In [0]:
# Evaluate the first trained Q table over 10 simulations.
# Pass render=True as well to watch the simulation running.
epsilon1_rewards = test_sim(env, epsilon1_q, n=10)

In [0]:
# Evaluate the second trained Q table over 10 simulations.
epsilon2_rewards = test_sim(env, epsilon2_q, n=10)

In [0]:
# Summarise each evaluation run by the median of its cumulative rewards.
print(
    f"The first epsilon greedy training method was able to get a median reward of {np.median(epsilon1_rewards)}.",
    f"The second epsilon greedy training method was able to get a median reward of {np.median(epsilon2_rewards)}.",
    sep="\n",
)

In [0]:
# Both evaluation runs must reach a median cumulative reward above 5.
assert all(
    np.median(run_rewards) > 5
    for run_rewards in (epsilon1_rewards, epsilon2_rewards)
)

## Feedback

In [0]:
def feedback():
    """Return feedback on the contents of this exercise.

    Placeholder: no feedback text has been written yet, so calling this
    function always raises.

    Returns:
        string

    Raises:
        NotImplementedError: Always, until the body is filled in
    """
    # YOUR CODE HERE
    raise NotImplementedError