<h1><div style="text-align: center;"> Reinforcement Learning </div></h1>
<h2><div style="text-align: center;"> Assignment: Q-learning agent with $\epsilon$-greedy policy </div></h2>

In [None]:
# Mount Google Drive at /content/drive (Google Colab only)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Switch the working directory to the task folder on the mounted drive
%cd /content/drive/MyDrive/INM707/task_2

<h2><div> Task objectives </div></h2> 

- Implement an agent that uses bootstrapping ("Q-learning") in an MDP.
- Train and evaluate the Q-agent with an $\epsilon$-greedy policy in the Phoneme environment.
- Visualise the effect of the training parameter on the learning results.

We updated the number of obstacles and the rewards of the Phoneme environment. We want to motivate the agent to search rather than only heading to the goal.

- The obstacles move in the same direction as the agent
- The number of obstacles on the grid is:

$ o = \lfloor \frac{a}{10} \rfloor $

- The three modified rewards are: 
1. Reaching the time limit and $ʊ$ collected:  $a\times w_i$
2. Reaching the goal and $ʊ$ left: $ a \times (w_i - ʊ_l - \lfloor \frac{a}{3} \rfloor)$
3. Reaching the goal with no words: $ -a \times 3$

<h2><div>Libraries</div></h2> 

In [None]:
import os 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import time
from matplotlib import style
from collections import defaultdict, namedtuple
from plotting import plot_episodes_stats, rend_sns
from phonemes import Phonemes, Action 
# Module-level NumPy random generator (unseeded); Qlagent.take_action draws from it
rng=np.random.default_rng()

<h2><div>1. Q-learning Methods and Functions</div></h2>

<h3><div> 1.1 Implementing the Q-learning algorithm - Method</div></h3> 

In [None]:
# noinspection NonAsciiCharacters

class Qlagent(object):
    """
    Tabular Q-learning agent that acts with an epsilon-greedy policy.

    Q-values are kept in ``q_val``, a mapping from a state string to a
    dictionary ``{action: value}``. States that have not been seen yet are
    created on first access with a value of 0 for every action.
    """

    def __init__(self, env, **agent_info):
        """
        :param env: the environment the agent is attached to (only stored)
        :param agent_info: keyword settings; must contain 'epsilon', 'decay',
                           'gamma', 'alpha' and 'min_epsilon'
        """

        self.action_list = ['up', 'down', 'left', 'right', 'grab']

        # Maps every action name to its index in action_list
        self.actions = {name: idx for idx, name in enumerate(self.action_list)}
        # Unseen states start with a Q-value of 0 for every action
        self.q_val = defaultdict(
            lambda: {a: 0 for a in self.action_list})
        self.env = env
        self.epsilon = agent_info['epsilon']  # Current exploration rate
        self.epsilon_init = agent_info['epsilon']  # Initial exploration rate, used by reset_eps
        self.decay = agent_info['decay']  # Multiplicative decay applied to epsilon
        self.gamma = agent_info['gamma']  # Discount factor
        self.alpha = agent_info['alpha']  # Learning rate
        self.min_epsilon = agent_info['min_epsilon']  # Lower bound for epsilon

    def state(self, obs):
        """
        Converts the dictionary of observations into a string
        :param obs: a dictionary with information about the environment; the keys
                    'agent_pos', 'neigh', 'ʊ_pos', 'ʊ_coords', 'dist_goal',
                    'obstacles', 'ʌ_coords' and 'u:_coords' are read
        :return:  a string of the agent's state
        """

        agent = np.array(obs['agent_pos'])
        neigh = obs['neigh']
        phon = obs['ʊ_pos']
        pho_coord = np.array(obs['ʊ_coords'])
        goal = np.array(obs['dist_goal'])
        obstacles = obs['obstacles']
        open_a = np.array(obs['ʌ_coords'])
        long_u = np.array(obs['u:_coords'])

        s = (agent, neigh, obstacles, phon, pho_coord, open_a, long_u, goal)

        return self.state_to_string(s)

    @staticmethod
    def state_to_string(state):
        """
        Builds an unambiguous key from a tuple of observations.

        Values inside one observation are separated by a space and the
        observations themselves by '|'. Multi-digit or negative numbers are kept
        as whole tokens, so for example (1, 12) and (11, 2) never share a key,
        and an empty observation cannot merge with its neighbours.

        :param state: a tuple with observations (scalars, numpy arrays or iterables)
        :return a string of the agent's state
        """

        parts = []

        for item in state:

            if np.isscalar(item):
                parts.append(str(item))

            elif isinstance(item, np.ndarray):
                # ravel() lets arrays of any rank contribute one token per value
                parts.append(' '.join(item.astype(str).ravel()))

            else:
                parts.append(' '.join(
                    ' '.join(i.astype(str).ravel()) if isinstance(i, np.ndarray) else str(i)
                    for i in item))

        return '|'.join(parts)

    def take_action(self, obs):
        """
        :param obs: a dictionary with information of the environment
        :return: Return an action following an epsilon greedy policy
        """

        s = self.state(obs)

        # rng is the random generator defined at module level
        rand_p = rng.random()

        if rand_p < self.epsilon:
            # Explore: uniform choice over all actions
            action = np.random.choice(self.action_list)

        else:
            # Exploit: best known action for this state
            action = self.argmax_act(s)

        return action

    def argmax_act(self, s):
        """
        :param s: a string contain the current state of the agent
        :return:  Returns the best action, breaking ties at random.
        """
        max_val = max(self.q_val[s][a] for a in self.action_list)
        max_actions = [a for a in self.action_list if self.q_val[s][a] == max_val]

        return np.random.choice(max_actions)

    def update_epsilon(self):
        """
        Multiplies epsilon by the decay factor, never going below min_epsilon.
        :return: None
        """

        self.epsilon *= self.decay
        if self.epsilon < self.min_epsilon:
            self.epsilon = self.min_epsilon

    def update_qval(self, state1, state2, action, reward):
        """
        Q-learning update for a transition: moves Q(state1, action) towards
        reward + gamma * max_a Q(state2, a).
        :param state1: observation dictionary of the previous state
        :param state2: observation dictionary of the current state
        :param action: the action taken on the previous state
        :param reward: the reward from the previous state-action pair
        :return: None
        """

        prev_s = self.state(state1)

        current_s = self.state(state2)

        predict = self.q_val[prev_s][action]

        td_target = reward + self.gamma * max(self.q_val[current_s][a] for a in self.action_list)

        # Update q-values
        self.q_val[prev_s][action] += self.alpha * (td_target - predict)

    def game_end(self, state1, action, reward):
        """
        Compute the last update when agent terminates: the target is the reward
        alone, without a bootstrapped next-state value.
        :param state1: observation dictionary of the previous state
        :param action: the action taken on the previous state
        :param reward: the reward from the previous state-action pair
        :return: None
        """

        prev_s = self.state(state1)

        # Update q-values
        self.q_val[prev_s][action] += self.alpha * (reward - self.q_val[prev_s][action])

    def reset_q(self):
        """
        Removes every stored Q-value (the dictionary is emptied in place).
        :return: None
        """
        self.q_val.clear()

    def reset_eps(self):
        """
        Restores epsilon to the value the agent was created with.
        :return: the restored epsilon
        """
        self.epsilon = self.epsilon_init
        return self.epsilon

<h3><div> 1.2 Running Q-learning experiments </div></h3>

In [None]:
def qlearning_exp(environment, q_agent, stats_log, reset=False, **exp_params):
    """
    Trains the agent with Q-learning for a number of episodes and logs statistics.

    :param environment: the environment; must provide num_words, reset(),
                        env_step(action, prints=...), display() and ʊ_pos
    :param q_agent: an agent implementing the q-learning algorithm (take_action,
                    update_qval, game_end, update_epsilon, reset_q, q_val, epsilon)
    :param stats_log: a named tuple class to log the episode stats
    :param reset: when True the agent's q-values are cleared once the experiment
                  ends; the returned q-values are a snapshot taken before clearing
    :param exp_params: a dictionary with parameter to run the experiment - 'episodes',
                       'display', 'update' ('step' or 'episode'), 'decay_from', 'show_stats'
    :return: qvalues, std of the episode rewards, mean of the episode rewards, statistics
    """
    n_episodes = exp_params['episodes']
    short_u = environment.num_words//3
    collected = list()
    exp_reward = np.zeros(n_episodes)
    exp_length = np.zeros(n_episodes)
    exp_mean = np.zeros(n_episodes)
    exp_var = np.zeros(n_episodes)
    show_stats = exp_params['show_stats']
    update = exp_params['update']
    start_decay = exp_params['decay_from']
    display = exp_params['display']

    for episode in tqdm(range(0, n_episodes)):

        reward_list = list()

        # initialize state
        observation = environment.reset()

        # indicate terminal state
        done = False

        # loops until the state/observation is terminal
        while not done:

            # choose action from state using policy derived from Q
            action = q_agent.take_action(observation)

            # perform an action, observe, give a reward and indicate if terminal
            next_observation, reward, done, steps = environment.env_step(action, prints=False)

            exp_reward[episode] += reward  # accumulated reward

            reward_list.append(reward)

            if done:
                # Terminal transition: nothing to bootstrap from, so the target is
                # the reward alone. The update is applied to the state the action
                # was taken in, before `observation` is overwritten below.
                q_agent.game_end(observation, action, reward)
            else:
                # agent learn (Q-Learning update)
                q_agent.update_qval(observation, next_observation, action, reward)

            # observation <- next observation
            observation = next_observation

            if update == 'step' and episode + 1 >= start_decay:
                q_agent.update_epsilon()  # updates epsilon per step

        if update == 'episode' and episode + 1 >= start_decay:
            q_agent.update_epsilon()  # updates epsilon per episode

        exp_length[episode] += steps  # Store steps
        exp_mean[episode] += np.mean(reward_list)  # average reward per episode
        exp_var[episode] += np.std(reward_list)  # std of reward per episode

        # Display environment
        if display:
            rend_str, _, _ = environment.display()
            print(rend_str)

        collected.append(short_u - len(environment.ʊ_pos))

        if show_stats:
            print('ʊ collected: {}'.format(collected[-1]))
            print('ʊ in left in the environment: {}'.format(len(environment.ʊ_pos)))
            print('Episode: {} | epsilon: {:.5f} | reward: {} |'.format(episode + 1, q_agent.epsilon,
                                                                    exp_reward[episode]))
    stats = stats_log(length_episodes=exp_length, reward_episodes=exp_reward,
                      episode_mean_reward=exp_mean, episode_std=exp_var,
                      words_collected=collected)
    exp_std = np.std(exp_reward)
    exp_total_mean = np.mean(exp_reward)

    if reset:
        # reset_q() empties the table in place, so hand back a copy; a shallow
        # copy is enough because clearing only drops the outer entries.
        q_dict = q_agent.q_val.copy()
        q_agent.reset_q()
    else:
        q_dict = q_agent.q_val

    return q_dict, exp_std, exp_total_mean, stats


<h3><div> 1.3 Running optimality with stored q-values </div></h3>

In [None]:
def optimality(environment, q_dict, stats_log, agent=None, params=None):
    """
    Runs episodes starting from q-values learnt in a previous experiment.
    The agent keeps applying Q-learning updates, so q_dict is modified in place.

    :param environment: the environment; must provide reset(), short_u,
                        env_step(action, prints=...), display() and ʊ_pos
    :param q_dict: a dictionary with stored q-values
    :param stats_log: a named tuple class to log the episode stats
    :param agent: the agent to run; defaults to the module-level ``q_agent``
    :param params: experiment parameters ('episodes', 'display', 'update',
                   'decay_from', 'show_stats'); defaults to the module-level ``exp_params``
    :return: updated q-values, std of the episode rewards, mean of the episode
             rewards, statistics
    """
    # Fall back to the notebook globals so existing three-argument calls still work
    agent = q_agent if agent is None else agent
    params = exp_params if params is None else params

    n_episodes = params['episodes']
    collected = list()
    exp_reward = np.zeros(n_episodes)
    exp_length = np.zeros(n_episodes)
    exp_mean = np.zeros(n_episodes)
    exp_var = np.zeros(n_episodes)
    show_stats = params['show_stats']
    update = params['update']
    start_decay = params['decay_from']
    display = params['display']
    agent.q_val = q_dict

    for episode in tqdm(range(0, n_episodes)):

        reward_list = list()

        # initialize state
        observation = environment.reset()
        short_u = len(environment.short_u)

        # indicate terminal state
        done = False

        # loops until the state/observation is terminal
        while not done:

            # choose action from state using policy derived from Q
            action = agent.take_action(observation)

            # perform an action, observe, give a reward and indicate if terminal
            next_observation, reward, done, steps = environment.env_step(action, prints=False)

            exp_reward[episode] += reward  # accumulated reward

            reward_list.append(reward)

            if done:
                # Terminal transition: nothing to bootstrap from, so the target is
                # the reward alone. The update is applied to the state the action
                # was taken in, before `observation` is overwritten below.
                agent.game_end(observation, action, reward)
            else:
                # agent learn (Q-Learning update)
                agent.update_qval(observation, next_observation, action, reward)

            # observation <- next observation
            observation = next_observation

            if update == 'step' and episode + 1 >= start_decay:
                agent.update_epsilon()  # updates epsilon per step

        if update == 'episode' and episode + 1 >= start_decay:
            agent.update_epsilon()  # updates epsilon per episode

        exp_length[episode] += steps  # Store steps
        exp_mean[episode] += np.mean(reward_list)  # average reward per episode
        exp_var[episode] += np.std(reward_list)  # std of reward per episode

        # Display environment
        if display:
            rend_str, _, _ = environment.display()
            print(rend_str)

        collected.append(short_u - len(environment.ʊ_pos))

        if show_stats:
            print('ʊ in left in the environment: {}'.format(len(environment.ʊ_pos)))
            print('Episode: {} | epsilon: {} | reward: {} |'.format(episode + 1,
                                                agent.epsilon, exp_reward[episode]))
            print('q_values: {}'.format(len(agent.q_val)))

    stats = stats_log(length_episodes=exp_length, reward_episodes=exp_reward,
                      episode_mean_reward=exp_mean, episode_std=exp_var,
                      words_collected=collected)
    exp_std = np.std(exp_reward)
    exp_total_mean = np.mean(exp_reward)

    return q_dict, exp_std, exp_total_mean, stats


<h2><div>2. Environment and Agent's parameters initialisation</div></h2>

<h3><div>2.1 Environment initialisation</div></h3>

In [None]:
# Settings passed to the Phonemes constructor as keyword arguments
env_info = {
    'seed': True,
    'sound': 'short_u',
}

# Build the environment with a (10, 10) size argument and reset it
my_env = Phonemes((10, 10), Action, **env_info)
my_env.reset()

# display() returns three values; the first and the last one are kept
str_dis, _, envir_rend = my_env.display()

# Summary of what is on the grid
print('Available area: {} | Total objects on the grid: {}'.format(
    my_env.area,
    my_env.total_objects,
))
print('Total number of agents on the grid: {}, {} - learning agent and {} moving obstacle(s)'.format(
    my_env.obstacles + 1,
    1,
    my_env.obstacles,
))
print('Mission: learn the words with phonetic sound {}'.format('ʊ'))
print('Word(s) to find: {}'.format(my_env.short_u))

# Draw the rendered environment with the plotting helper
rend_sns(envir_rend)

<h3><div>2.2 Agent Initialisation</div></h3>

In [None]:
# Hyper-parameters read by Qlagent.__init__
agent_info = {
    'epsilon': 0.4,
    'gamma': 0.95,
    'alpha': 0.1,
    'decay': 0.9998,
    'min_epsilon': 0.01,
}
q_agent = Qlagent(my_env, **agent_info)

In [None]:
# Create tuple to store the statistics
# Record type holding the per-episode statistics of one experiment
episode_stats = namedtuple(
    'Stats',
    [
        'length_episodes',
        'reward_episodes',
        'episode_mean_reward',
        'episode_std',
        'words_collected',
    ],
)

In [None]:
# Experiment settings passed to qlearning_exp
exp_params = {
    'episodes': 50000,
    'display': False,
    'update': 'episode',
    'decay_from': 100,
    'show_stats': True,
}

<h2><div>Section 3. Running experiments and visualising results</div></h2>

<h3><div>3.1 Running the experiment</div></h3>

In [None]:
# Train the agent; reset=True makes qlearning_exp call q_agent.reset_q() at the end
q_dict, exp_std, exp_mean, stats = qlearning_exp(
    my_env,
    q_agent,
    episode_stats,
    reset=True,
    **exp_params,
)

<h3><div>3.2 Saving results </div></h3>

In [None]:
# Statistics as a dictionary keyed by field name
data = stats._asdict()

# Results go to a 'results' folder under the current working directory
data_dir = os.path.join(os.getcwd(), 'results')
os.makedirs(data_dir, exist_ok=True)

# File name: <episodes>_<alpha><epsilon>.npz
name = ''.join([
    str(exp_params['episodes']),
    '_',
    str(agent_info['alpha']),
    str(agent_info['epsilon']),
    '.npz',
])
results_path = os.path.join(data_dir, name)
np.savez(results_path, data)

<h3><div>3.3 Visualising Statistics</div></h3>

In [None]:
# Plot the episode statistics with a smoothing window of 100
plots = plot_episodes_stats(
    stats,
    episodes=exp_params['episodes'],
    smoothing_window=100,
    hideplot=False,
    env_dim=(10, 10),
)

In [None]:
# Index of the episode with the highest accumulated reward
idx = np.argmax(stats.reward_episodes)

best_reward = max(stats.reward_episodes)
best_episode = idx + 1  # episodes are reported 1-based
print('Maximum reward: {} | Episode: {} | Collected: {} | Steps {}'.format(
    best_reward,
    best_episode,
    stats.words_collected[idx],
    stats.length_episodes[idx],
))

In [None]:
# Same plots with a wider smoothing window of 750
plot_episodes_stats(
    stats,
    episodes=exp_params['episodes'],
    smoothing_window=750,
    hideplot=False,
    env_dim=(10, 10),
)

In [None]:
# Experiment settings passed to qlearning_exp
exp_params = {
    'episodes': 50000,
    'display': False,
    'update': 'episode',
    'decay_from': 100,
    'show_stats': True,
}

<h3><div>3.4 Parameters search</div></h3>

In [None]:
# Grid of learning rates and initial exploration rates to compare
alphas = [0.01, 0.1, 0.5]
epsilons = [0.01, 0.1, 0.5]

exp_params = {'episodes': 60000, 'display': False, 'update':'episode', 'decay_from': 1, 'show_stats':False}
agent_info['min_epsilon'] = 0.001
# agent_info is only read when the agent is constructed, so the new lower
# bound has to be set on the existing agent as well to take effect.
q_agent.min_epsilon = agent_info['min_epsilon']
fname = 'comp_' + str(exp_params['episodes']) + '_.npz'

# Episode rewards of every run, keyed by (epsilon, alpha)
comp_dict = dict()
for e in epsilons:
    for a in alphas:
        q_agent.alpha = a
        q_agent.epsilon = e
        # reset=True clears the q-values so that every run starts from scratch
        q_dict, exp_std, exp_mean, stats = qlearning_exp(my_env, q_agent, episode_stats, reset=True, **exp_params)
        comp_dict[(e, a)] = stats.reward_episodes

# data_dir is the results folder created in the "Saving results" cell
np.savez(os.path.join(data_dir, fname), comp_dict)

<h3><div>3.5 Visualise parameter search</div></h3>

In [None]:
# One colour per (epsilon, alpha) combination
plot_params = ['r', 'b', 'g', 'm', 'c', 'y', 'k', 'orange','sienna']

plt.figure(figsize=(15,8))

# Number of episodes averaged into one plotted point
step = 100

for i, (k, v) in enumerate(comp_dict.items()):
    # First index of every window of `step` episodes
    starts = range(0, len(v), step)
    m = np.array([np.mean(v[ii:ii+step]) for ii in starts])
    s = np.array([np.std(v[ii:ii+step]) for ii in starts])
    # One shared x-axis (1-based episode number) so the line and the band
    # always have exactly as many points as m and s
    x = [ii + 1 for ii in starts]
    plt.plot(x, m, color=plot_params[i], label='e={},a={}'.format(str(k[0]),str(k[1])))
    # Band of half a standard deviation on each side of the mean
    plt.fill_between(x, m - s/2, m + s/2, color=plot_params[i], alpha=0.2)
plt.xlabel('episodes')
plt.ylabel('average reward')
plt.legend()
# savefig does not create missing folders
os.makedirs('./plots', exist_ok=True)
plt.savefig(os.path.join('./plots', 'ql_comparison_{}_{}.png'.format(exp_params['episodes'], '10x10')))

<h2><div>4. Optimality test </div></h2>

In [None]:
exp_params = {'episodes':50000, 'display': False, 'update':'episode', 'decay_from':100, 'show_stats':True}
agent_info['min_epsilon'] = 0.1
agent_info['epsilon'] = 0.001  # no trailing comma: it would store the tuple (0.001,)
# agent_info is only read when the agent is constructed, so apply the values
# to the existing agent that optimality() uses.
# NOTE(review): min_epsilon (0.1) is larger than epsilon (0.001), so the first
# decay step raises epsilon to 0.1 - confirm the two values are not swapped.
q_agent.min_epsilon = agent_info['min_epsilon']
q_agent.epsilon = agent_info['epsilon']
q_dict_op, exp_std, exp_total_mean, op_stats = optimality(my_env, q_dict, episode_stats)

In [None]:
# Plot the statistics of the optimality run with a smoothing window of 50
plot_episodes_stats(
    op_stats,
    episodes=exp_params['episodes'],
    smoothing_window=50,
    hideplot=False,
    env_dim=(10, 10),
)