Grid from sutton, page 163

In [1]:
%matplotlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import hsv_to_rgb

Using matplotlib backend: <object object at 0x0000025B29D17280>


In [2]:
def change_range(values, vmin=0, vmax=1):
    start_zero = values - np.min(values)
    return (start_zero / (np.max(start_zero) + 1e-7)) * (vmax - vmin) + vmin

In [3]:
class GridWorld:
    terrain_color = dict(normal=[127/360, 0, 96/100],
                         objective=[26/360, 100/100, 100/100],
                         cliff=[247/360, 92/100, 70/100],
                         player=[344/360, 93/100, 100/100])
        
    def __init__(self):
        self.player = None
        self._create_grid()  
        self._draw_grid()
        
    def _create_grid(self, initial_grid=None):
        self.grid = self.terrain_color['normal'] * np.ones((4, 12, 3))
        self._add_objectives(self.grid)
        
    def _add_objectives(self, grid):
        grid[-1, 1:11] = self.terrain_color['cliff']
        grid[-1, -1] = self.terrain_color['objective']
        
    def _draw_grid(self):
        self.fig, self.ax = plt.subplots(figsize=(12, 4))
        self.ax.grid(which='minor')       
        self.q_texts = [self.ax.text(*self._id_to_position(i)[::-1], '0',
                                     fontsize=11, verticalalignment='center', 
                                     horizontalalignment='center') for i in range(12 * 4)]     
         
        self.im = self.ax.imshow(hsv_to_rgb(self.grid), cmap='terrain',
                                 interpolation='nearest', vmin=0, vmax=1)        
        self.ax.set_xticks(np.arange(12))
        self.ax.set_xticks(np.arange(12) - 0.5, minor=True)
        self.ax.set_yticks(np.arange(4))
        self.ax.set_yticks(np.arange(4) - 0.5, minor=True)
        
    def reset(self):
        self.player = (3, 0)        
        return self._position_to_id(self.player)
    
    def step(self, action):
        # Possible actions
        if action == 0 and self.player[0] > 0:
            self.player = (self.player[0] - 1, self.player[1])
        if action == 1 and self.player[0] < 3:
            self.player = (self.player[0] + 1, self.player[1])
        if action == 2 and self.player[1] < 11:
            self.player = (self.player[0], self.player[1] + 1)
        if action == 3 and self.player[1] > 0:
            self.player = (self.player[0], self.player[1] - 1)
            
        # Rules
        if all(self.grid[self.player] == self.terrain_color['cliff']):
            reward = -100
            done = True
        elif all(self.grid[self.player] == self.terrain_color['objective']):
            reward = 0
            done = True
        else:
            reward = -1
            done = False
            
        return self._position_to_id(self.player), reward, done
    
    def _position_to_id(self, pos):
        ''' Maps a position in x,y coordinates to a unique ID '''
        return pos[0] * 12 + pos[1]
    
    def _id_to_position(self, idx):
        return (idx // 12), (idx % 12)
        
    def render(self, q_values=None, action=None, max_q=False, colorize_q=False):
        assert self.player is not None, 'You first need to call .reset()'  
        
        if colorize_q:
            assert q_values is not None, 'q_values must not be None for using colorize_q'            
            grid = self.terrain_color['normal'] * np.ones((4, 12, 3))
            values = change_range(np.max(q_values, -1)).reshape(4, 12)
            grid[:, :, 1] = values
            self._add_objectives(grid)
        else:            
            grid = self.grid.copy()
            
        grid[self.player] = self.terrain_color['player']       
        self.im.set_data(hsv_to_rgb(grid))
               
        if q_values is not None:
            xs = np.repeat(np.arange(12), 4)
            ys = np.tile(np.arange(4), 12)  
            
            for i, text in enumerate(self.q_texts):
                if max_q:
                    q = max(q_values[i])    
                    txt = '{:.2f}'.format(q)
                    text.set_text(txt)
                else:                
                    actions = ['U', 'D', 'R', 'L']
                    txt = '\n'.join(['{}: {:.2f}'.format(k, q) for k, q in zip(actions, q_values[i])])
                    text.set_text(txt)
                
        if action is not None:
            self.ax.set_title(action, color='r', weight='bold', fontsize=32)

        plt.pause(0.01)

In [4]:
UP = 0
DOWN = 1
RIGHT = 2
LEFT = 3
actions = ['UP', 'DOWN', 'RIGHT', 'LEFT']

In [5]:
env = GridWorld()

# Grid

<img src="imges/fig1.png" width="400" height="200">

We need a table of values that maps each state-action pair to a value, we'll create such table and initialize all values to zero (or to a random value)

In [6]:
# The number of states in simply the number of "squares" in our grid world, in this case 4 * 12
num_states = 4 * 12
# We have 4 possible actions, up, down, right and left
num_actions = 4

q_values = np.zeros((num_states, num_actions))

In [7]:
df = pd.DataFrame(q_values, columns=[' up ', 'down', 'right', 'left'])
df.index.name = 'States'

In [8]:
df.head()

Unnamed: 0_level_0,up,down,right,left
States,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,0.0,0.0,0.0,0.0
1,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,0.0
4,0.0,0.0,0.0,0.0


In [9]:
# Maybe talk on why we need exploration?

Exploitation: Focuses on using the knowledge already gathered to maximize immediate rewards.

Exploration: Involves trying new actions to discover potentially better strategies or states.

Without exploration, the agent risks getting stuck in suboptimal policies because it never tries untested actions that might yield higher rewards in the long run.

In [10]:
def egreedy_policy(q_values, state, epsilon=0.1):
    ''' 
    Choose an action based on a epsilon greedy policy.    
    A random action is selected with epsilon probability, else select the best action.    
    Returns:
        action --type(int)
    '''
    # choose a randon integer from [0, 1)
    explore_or_exploit = np.random.random()
    if explore_or_exploit < epsilon: # do random action
        # randomly select an integer from the range [0, 1, 2, 3]
        action = np.random.choice(4)
    else:
        action = np.argmax(q_values[state, :])
    return action        

In [11]:
def q_learning(env, num_episodes=500, render=True, exploration_rate=0.1,
               learning_rate=0.5, gamma=0.9):    
    q_values = np.zeros((num_states, num_actions))
    ep_rewards = [] # episode_rewards
    
    ## TODO
    for episode in range(0, num_episodes):
        state = env.reset()
        reach_terminal_state = False
        sum_of_rewards = 0
        while not reach_terminal_state:
            action = egreedy_policy(q_values, state, exploration_rate)
            # do the action
            next_state , reward, reach_terminal_state = env.step(action)
            sum_of_rewards += reward

            sample = reward + gamma * np.max(q_values[next_state, :])
            q_values[state][action] = (1 - learning_rate) * q_values[state][action] + learning_rate * sample

            state = next_state

            # # if want to render
            # if render:
            #     env.render(q_values, action=actions[action], colorize_q=True)
        ep_rewards.append(sum_of_rewards)

    return ep_rewards, q_values

In [12]:
q_learning_rewards, q_values = q_learning(env, gamma=0.9, learning_rate=1, render=True)

In [13]:
env.render(q_values, colorize_q=True)

# output
for q-learning

<img src="imges/fig2.png" width="400" height="200">
<img src="imges/fig2_render.png" width="400" height="200">

In [14]:
np.mean(q_learning_rewards)

-39.14

In [15]:
q_learning_rewards, _ = zip(*[q_learning(env, render=False, exploration_rate=0.1,
                                         learning_rate=1) for _ in range(10)])
avg_rewards = np.mean(q_learning_rewards, axis=0)
mean_reward = [np.mean(avg_rewards)] * len(avg_rewards)

fig, ax = plt.subplots()
ax.set_xlabel('Episodes')
ax.set_ylabel('Rewards')
ax.plot(avg_rewards)
ax.plot(mean_reward, 'g--')

print('Mean Reward: {}'.format(mean_reward[0]))

Mean Reward: -41.40820000000001


# SARSA learning
For a learning agent in any Reinforcement Learning algorithm it’s policy can be of two types:
1.On Policy: In this, the learning agent learns the value function according to the current action derived from the policy currently being used.
2.Off Policy: In this, the learning agent learns the value function according to the action derived from another policy.

Q-Learning technique is an Off Policy technique and uses the greedy approach to learn the Q-value. SARSA technique, on the other hand, is an On Policy and uses the action performed by the current policy to learn the Q-value.

## Q learning:
Q(s, a) <- (1 - learning_rate) * Q(s, a) + learning_rate * (reward + gamma + max_a'(Q(s', a')))
## SARSA learning:
Q(s, a) <- (1 - learning_rate) * Q(s, a) + learning_rate * (reward + gamma + Q(s', a'))

In [15]:
def sarsa(env, num_episodes=500, render=True, exploration_rate=0.1,
          learning_rate=0.5, gamma=0.9):
    q_values_sarsa = np.zeros((num_states, num_actions))
    ep_rewards = []
    
    ## TODO
    for episode in range(0, num_episodes):
        state = env.reset()
        reach_terminal_state = False
        sum_of_rewards = 0
        action = egreedy_policy(q_values_sarsa, state, exploration_rate)
        while not reach_terminal_state:
            # do the action
            next_state , reward, reach_terminal_state = env.step(action)
            sum_of_rewards += reward

            # do the next action
            next_action = egreedy_policy(q_values_sarsa, state, exploration_rate)

            sample = reward + gamma * q_values_sarsa[next_state, next_action]
            q_values_sarsa[state][action] = (1 - learning_rate) * q_values_sarsa[state][action] + learning_rate * sample

            state = next_state
            action = next_action
            
            # # if wanna render
            # if render:
            #     env.render(q_values, action=actions[action], colorize_q=True)
        ep_rewards.append(sum_of_rewards)  
    return ep_rewards, q_values_sarsa

In [16]:
sarsa_rewards, q_values_sarsa = sarsa(env, render=True, learning_rate=0.5, gamma=0.99)

In [17]:
np.mean(sarsa_rewards)

-66.4096

In [18]:
sarsa_rewards, _ = zip(*[sarsa(env, render=False, exploration_rate=0.1) for _ in range(10)]) # for in range(100)

avg_rewards = np.mean(sarsa_rewards, axis=0)
mean_reward = [np.mean(avg_rewards)] * len(avg_rewards)

fig, ax = plt.subplots()
ax.set_xlabel('Episodes')
ax.set_ylabel('Rewards')
ax.plot(avg_rewards)
ax.plot(mean_reward, 'g--')

print('Mean Reward: {}'.format(mean_reward[0]))

KeyboardInterrupt: 

## Visualization

In [18]:
def play(q_values):
    env = GridWorld()
    state = env.reset()
    done = False

    while not done:    
        # Select action
        action = egreedy_policy(q_values, state, 0.0)
        # Do the action
        next_state, reward, done = env.step(action)  

        # Update state and action        
        state = next_state  
        
        env.render(q_values=q_values, action=actions[action], colorize_q=True)

In [19]:
play(q_values)

In [None]:
play(q_values_sarsa)

# output
for sarsa-learning

<img src="imges/fig3.png" width="400" height="200">
<img src="imges/fig3_render.png" width="400" height="200">