# Script that does some reinforcement learning (Q-learning)

From an article on https://pythonprogramming.net

Q-learning is 'model-free' insofar as it doesn't require prior knowledge of how the environment behaves. The agent simply observes its state, takes actions, and learns from the rewards it receives in response.

In [6]:
import numpy as np
import gym

In [None]:
# Basic steps: make an environment, reset it, then step through it by passing in actions

env = gym.make("MountainCar-v0")
print(env.action_space.n)  # Prints the number of discrete actions the agent can take in this environment
# We denote these actions 0, 1, 2, corresponding to push left, stay still, push right in this particular example. The agent
# doesn't know that though...
env.reset()

done = False
while not done:
    action = 2  # always push right!
    _, _, done, _ = env.step(action)  # capture `done` so the loop stops when the episode ends
    env.render()

# In the popup box, you can see that the agent doesn't have the energy to make it up the hill.

### Task definition: Allow the agent to learn how to get up the hill by increasing its momentum in the dip

So far we know only that the agent can take three actions: push left, stay still, or push right.

Each call to env.reset() returns an initial observation of the state, which the agent can use to inform its decision.

When the environment is stepped in a loop, every call to env.step() returns a fresh observation.

In [3]:
env = gym.make("MountainCar-v0")
state = env.reset()

i = 0
while i < 10:
    action = 2
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)
    i += 1

# Output is:
# [reward [observationState observationState]]
# The reward of -1 is for any step (Reaching the top of the hill is given reward 0)
# For our benefit as human readers: the two observation values are the x position and the velocity.
# They have no physical meaning to the agent though

-1.0 [-0.51112031  0.00091328]
-1.0 [-0.50930059  0.00181971]
-1.0 [-0.50658808  0.00271251]
-1.0 [-0.5030031   0.00358498]
-1.0 [-0.49857249  0.00443061]
-1.0 [-0.49332939  0.00524309]
-1.0 [-0.48731301  0.00601638]
-1.0 [-0.48056823  0.00674478]
-1.0 [-0.47314529  0.00742295]
-1.0 [-0.4650993   0.00804599]
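
The four-value unpacking of env.step() above matches the older Gym API that this notebook was written against. From Gym 0.26 onward (and in Gymnasium), step() returns five values, with `done` split into `terminated` and `truncated`, and reset() returns an `(observation, info)` pair. A minimal sketch of a helper that accepts either shape is below; `unpack_step` is a hypothetical name, not part of Gym, and the tuples fed to it are made-up stand-ins for real step results.

```python
def unpack_step(result):
    """Normalise an env.step() result to (state, reward, done, info).

    Hypothetical helper: handles the 4-tuple of older Gym releases and
    the 5-tuple (terminated/truncated) of Gym >= 0.26 and Gymnasium.
    """
    if len(result) == 5:
        state, reward, terminated, truncated, info = result
        return state, reward, terminated or truncated, info
    state, reward, done, info = result
    return state, reward, done, info


# Made-up stand-ins for step results, one per API shape
old_style = ([-0.51, 0.0009], -1.0, False, {})
new_style = ([-0.51, 0.0009], -1.0, False, True, {})

print(unpack_step(old_style)[2])  # False
print(unpack_step(new_style)[2])  # True (the episode was truncated)
```

With a helper like this, the loops in this notebook could keep their `new_state, reward, done, _` unpacking on either API version.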


Given the position and velocity, we can put together an algorithm that helps the agent decide whether it can make it up the hill, or whether it needs to build up more momentum.

We do this by building a Q-table: a table of Q-values with one entry for every action in every state. Because any state can be looked up in the table, the learned Q-values tell the agent which action is expected to pay off best at each point on the way up the hill.

In [4]:
# In this example we can query the environment directly for the bounds of the observation space
print(env.observation_space.high)
print(env.observation_space.low)

[0.6  0.07]
[-1.2  -0.07]


The figures above give the full range of the observation space: observation value 1 (index 0 above) can vary from -1.2 to 0.6, and observation value 2 (index 1 above) can vary from -0.07 to 0.07.

However, the printout of 10 sampled values above shows that the observations are resolved far more finely than this range suggests, so it would be inefficient and time-consuming to try to fill a table at that resolution.

We continue with a coarser discretisation of the observation space into 20 buckets for each observation value.

In [5]:
DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE


print(discrete_os_win_size)

[0.09  0.007]


In [7]:
# Using the above discretisation, we now have a 20x20x3 table of randomised Q-values
# The 20x20 is all possible combinations of the observation states (in the lower resolution)
# The x3 accounts for all the possible actions we could take (0, 1 or 2)

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))
# Actual randomised values are negative because the reward for a step is -1, whilst the reward for reaching the top of the 
# hill is 0
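
To make the table's layout concrete, here is a small sketch that builds a table of the same shape without the environment. The 3 is hard-coded in place of env.action_space.n, and the state index (7, 11) is an arbitrary example. Indexing with a discrete state returns the three Q-values for that state, and np.argmax picks the action that currently looks best.

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded only so the sketch is repeatable

# Same shape as the notebook's table: 20 position buckets x 20 velocity buckets x 3 actions
q_table = rng.uniform(low=-2, high=0, size=(20, 20, 3))

state = (7, 11)                   # arbitrary discrete state (position bucket, velocity bucket)
q_values = q_table[state]         # the three Q-values stored for this state
best_action = int(np.argmax(q_values))

print(q_table.shape)              # (20, 20, 3)
print(q_values, best_action)
```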

"*When we're being "greedy" and trying to "exploit" our environment, we will choose to go with the action that has the highest Q value for this state. Sometimes, however, especially initially, we may instead wish to "explore" and just choose a random action. These random actions are how our model will learn better moves over time. So how do we learn over time? We need to update these Q values! How do we update those Q values?*"

$Q^\mathrm{new}(s_t,a_t)\leftarrow (1-\alpha)\cdot Q(s_t,a_t)+\alpha\cdot (r_t +\gamma\cdot \mathrm{max}_a [Q(s_{t+1},a)])$

where

$Q(s_t,a_t)$ is the old value<br>
$\alpha$ is the learning rate<br>
$r_t$ is the reward<br>
$\gamma$ is the discount factor<br>
$\mathrm{max}_a [Q(s_{t+1},a)]$ is the estimate of the optimal future value<br>
$(r_t +\gamma\cdot \mathrm{max}_a [Q(s_{t+1},a)])$ is the learned value

The discount factor measures how much weight future rewards get relative to the immediate reward. It lies between 0 and 1 and is generally set close to 1, because the overall objective is for the agent to learn how to reach the final (desired) reward.
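
As a worked example of the update rule, take made-up values for the current Q-value and for the best Q-value in the next state, together with the learning rate and discount used later in this notebook (0.1 and 0.95).

```python
LEARNING_RATE = 0.1   # alpha
DISCOUNT = 0.95       # gamma

current_q = -1.5      # Q(s_t, a_t), made-up value
reward = -1.0         # r_t, the per-step reward
max_future_q = -0.5   # max_a Q(s_{t+1}, a), made-up value

learned_value = reward + DISCOUNT * max_future_q
new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * learned_value

print(round(learned_value, 4))  # -1.475
print(round(new_q, 4))          # -1.4975
```

The old value moves only a tenth of the way towards the learned value, which is what a learning rate of 0.1 means.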

The estimate of the optimal future value is obtained after we perform our action: we update our Q-values based partially on the next step's best Q-value. Once the objective has been reached for the first time, the reward at that step "gets slowly back-propagated, one step at a time, per episode".
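
The "one step at a time, per episode" behaviour can be seen on a toy problem. The sketch below is not MountainCar: it assumes a hypothetical corridor of five states with a single "move right" action, a reward of -1 per step, and (mirroring the code later in this notebook) a Q-value of 0 written directly when the goal is reached. The learning rate and discount are both set to 1 purely to keep the arithmetic readable.

```python
N_STATES = 5            # states 0..4; state 4 is the goal (hypothetical toy corridor)
GOAL = N_STATES - 1
LEARNING_RATE = 1.0     # simplification: each update fully replaces the old value
DISCOUNT = 1.0          # simplification: no discounting

# One Q-value per non-goal state (there is only one action), pessimistic start
q = [-2.0] * GOAL

for episode in range(1, 5):
    state = 0
    while state != GOAL:
        new_state = state + 1          # the only action: move right
        if new_state == GOAL:
            q[state] = 0.0             # goal reached: set the Q-value directly
        else:
            learned = -1.0 + DISCOUNT * q[new_state]
            q[state] = (1 - LEARNING_RATE) * q[state] + LEARNING_RATE * learned
        state = new_state
    print(episode, q)

# 1 [-3.0, -3.0, -3.0, 0.0]
# 2 [-4.0, -4.0, -1.0, 0.0]
# 3 [-5.0, -2.0, -1.0, 0.0]
# 4 [-3.0, -2.0, -1.0, 0.0]
```

After the first episode only the state next to the goal holds any goal information. Each later episode carries it one state further back, until every Q-value equals minus the number of steps still needed after that move.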

### Environment setup:

In [8]:
env = gym.make("MountainCar-v0")
env.reset()

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

# Q-Learning settings
LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

# Function that converts the 'continuous' observation into the 20x20 discretisation we want
def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))  # np.int was removed in NumPy 1.24; the builtin int is equivalent
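
A quick way to check the conversion is to run it on a sample observation without creating the environment. The sketch below hard-codes the bounds printed earlier ([-1.2, -0.07] and [0.6, 0.07]) in place of env.observation_space. The clipping step is an addition, not part of the function above: an observation sitting exactly on the upper bound could otherwise map to index 20, which is outside a 20-bucket table.

```python
import numpy as np

LOW = np.array([-1.2, -0.07])    # bounds copied from the printout above
HIGH = np.array([0.6, 0.07])
BUCKETS = np.array([20, 20])
WIN_SIZE = (HIGH - LOW) / BUCKETS

def discretise(state):
    """Map a continuous (position, velocity) pair to bucket indices."""
    idx = ((np.asarray(state) - LOW) / WIN_SIZE).astype(int)
    idx = np.clip(idx, 0, BUCKETS - 1)   # added guard against the upper edge
    return tuple(int(i) for i in idx)

print(discretise([-0.5, 0.01]))   # (7, 11)
print(discretise(HIGH))           # (19, 19) thanks to the clip
```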

### Execution loop

In [11]:
discrete_state = get_discrete_state(env.reset())  # This is the initial state
done = False
while not done:
    action = np.argmax(q_table[discrete_state])  # Instead of hard-coding action=2 (always push right), let the Q-table choose
    new_state, reward, done, _ = env.step(action)

    new_discrete_state = get_discrete_state(new_state)

    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

    # NOTE THAT WE DON'T UPDATE THE Q-VALUES OF STEPS THAT HAVE ALREADY BEEN MADE
    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q


    # Simulation ended (for any reason) - if the goal position was achieved, set the Q value to the goal reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0

    discrete_state = new_discrete_state


env.close()

The first time the above cell is run, the output is a short sequence of the agent rocking back and forth.

Each time you run the above cell, the agent learns a little more about the environment and how to navigate it.

### Loop over multiple episodes (epochs) to allow training to build up

In [12]:
SHOW_EVERY = 1000

for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    # Don't want to see the rendering every time if you're training over thousands of epochs
    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False
    
    while not done:

        action = np.argmax(q_table[discrete_state])  # Instead of hard-coding action=2 (always push right), let the Q-table choose
        new_state, reward, done, _ = env.step(action)

        new_discrete_state = get_discrete_state(new_state)

        if render:  # only draw the episodes flagged above
            env.render()
        #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # NOTE THAT WE DON'T UPDATE THE Q-VALUES OF STEPS THAT HAVE ALREADY BEEN MADE
        # If simulation did not end yet after last step - update Q table
        if not done:

            # Maximum possible Q value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # And here's our equation for a new Q value for current state and action
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q


        # Simulation ended (for any reason) - if the goal position was achieved, set the Q value to the goal reward directly
        elif new_state[0] >= env.goal_position:
            #q_table[discrete_state + (action,)] = reward
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state


env.close()

0


KeyboardInterrupt: 

### Exploration

In the above loop, the agent doesn't really get better, because the reward for every step is -1, and there's only one point where it would get the desired reward of 0. Therefore, every point in the environment (aside from the destination) is basically the same and it doesn't know where to go.

We need the agent to reach the destination once, so that the Q-table gains knowledge about the preferred direction. Having found the destination once, the reward is 'back-propagated' through the epochs of training. To get there, the agent has to explore, and this is controlled by an exploration parameter, epsilon: the probability of taking a random action instead of the greedy one.
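
A common way to add this exploration is an epsilon-greedy rule: with probability epsilon take a random action, otherwise take the action with the highest Q-value. A minimal standalone sketch follows; `choose_action` is a hypothetical helper name and the Q-values are made up.

```python
import random

def choose_action(q_values, epsilon, rng=random):
    """Epsilon-greedy choice over a list of Q-values (hypothetical helper)."""
    if rng.random() > epsilon:
        # Exploit: index of the largest Q-value
        return max(range(len(q_values)), key=lambda a: q_values[a])
    # Explore: uniformly random action
    return rng.randrange(len(q_values))

q_values = [-1.2, -0.4, -0.9]     # made-up Q-values for one state

print(choose_action(q_values, epsilon=0.0))   # 1, the greedy action

counts = [0, 0, 0]
for _ in range(3000):
    counts[choose_action(q_values, epsilon=1.0)] += 1
print(counts)                     # roughly equal counts: every choice was random
```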

In [14]:
# Exploration settings
epsilon = 1  # Not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)
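
To see what these settings do over a run, the sketch below replays only the decay rule (the same condition the training loop uses), with EPISODES hard-coded to the 25000 set earlier and no environment involved. Because the decay condition includes both endpoints, epsilon is decremented 12500 times by 1/12499 and finishes a hair below zero. That is harmless here, since a negative epsilon simply means the greedy action is always chosen.

```python
EPISODES = 25000
epsilon = 1.0
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES // 2
epsilon_decay_value = epsilon / (END_EPSILON_DECAYING - START_EPSILON_DECAYING)

history = {}
for episode in range(EPISODES):
    if episode in (0, 6250, 12500, 24999):
        history[episode] = epsilon        # value in effect during this episode
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value

for episode, value in history.items():
    print(episode, round(value, 4))
```

Epsilon starts at 1, is close to 0.5 a quarter of the way through, and is effectively zero from the halfway point on. If a strictly non-negative epsilon is preferred, replacing the decrement with `epsilon = max(0.0, epsilon - epsilon_decay_value)` would clamp it; that is a suggestion, not something the original loop does.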

Now the same loop as above but with the epsilon included:

In [15]:
for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:

        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)


        new_state, reward, done, _ = env.step(action)

        new_discrete_state = get_discrete_state(new_state)

        if episode % SHOW_EVERY == 0:
            env.render()
        #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # If simulation did not end yet after last step - update Q table
        if not done:

            # Maximum possible Q value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # And here's our equation for a new Q value for current state and action
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q


        # Simulation ended (for any reason) - if the goal position was achieved, set the Q value to the goal reward directly
        elif new_state[0] >= env.goal_position:
            #q_table[discrete_state + (action,)] = reward
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value


env.close()

0
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
