# Reinforcement learning agents

This notebook gathers the functions that create different kinds of agents for foraging and target search in various scenarios, adapted for use in the reinforcement learning paradigm.

In [None]:
#| default_exp rl_framework.numba.agents

#| hide
# Libs

In [None]:
#| export
import numpy as np
from numba.experimental import jitclass
from numba import float64, int64,  njit, prange

In [None]:
#| hide
import matplotlib.pyplot as plt

# Helpers

## Random sampling from array with probs

In [None]:
# |export
@njit
def rand_choice_nb(arr, prob):
    """
    Draws a single element of `arr`, where element i is picked with probability `prob[i]`.

    The sample is obtained by inverting the cumulative distribution: a uniform random number
    in [0, 1) is located inside the cumulative sum of `prob`.

    :param arr: A 1D numpy array of values to sample from.
    :param prob: A 1D numpy array of probabilities for the given samples.
    :return: A random sample from the given array with a given probability.
    """
    cumulative = np.cumsum(prob)
    idx = np.searchsorted(cumulative, np.random.random(), side="right")
    # Floating point rounding can leave cumulative[-1] slightly below 1. If the random draw
    # lands above it, searchsorted returns len(arr), which would index past the end of the
    # array. Clamp to the last valid index.
    return arr[min(idx, len(arr) - 1)]

# Forager

In [None]:
# |export
@jitclass([("size_state_space", int64[:]),           
           ("initial_prob_distr", float64[:,:]),           
           ("fixed_policy", float64[:,:]) ,
           ("h_matrix", float64[:,:]) ,
           ("g_matrix", float64[:,:]) ,
           ("h_0", float64[:,:]),
           ("prefactor_1", float64[:]),
           ("prefactor_2", float64[:]),
           ("last_upd_G", float64[:,:])
          ])
class Forager():
    num_actions : int
    gamma_damping : float
    eta_glow_damping : float
    policy_type : str
    beta_softmax : float
    num_percepts : int
    agent_state : int
    size_state_space : np.array
    initial_prob_distr : np.array
    fixed_policy : np.array    
    h_matrix : np.array
    g_matrix : np.array
    h_0 : np.array
    g_update : str
    # Efficient H update
    prefactor_1: np.array
    prefactor_2: np.array
    max_no_H_update : int
    N_upd_H : int
    # Efficient G update
    last_upd_G: np.array
    N_upd_G: int
    
    def __init__(self, 
                 # Number of actions the agent can take
                 num_actions : int, 
                 # Size of the state space, given as an array where each entry is the dimension of each environmental feature 
                 size_state_space : np.array, 
                 # Gamma damping from PS
                 gamma_damping=0.0, 
                 # Eta damping from PS
                 eta_glow_damping=0.0,
                 # Policy type. Can be 'standard' or 'softmax'
                 policy_type='standard', 
                 # Beta parameter for softmax policy
                 beta_softmax=3, 
                 # Initial probability distribution for the H matrix
                 initial_prob_distr = np.array([[],[]]), 
                 # Fixed policy for the agent to follow
                 fixed_policy=np.array([[],[]]),
                 # Max number of steps without updating the H matrix. After this number, the full H matrix is updated
                 max_no_H_update = int(1e4),
                 # Type of update for the G matrix. Can be 's' (sum) or 'r' (reset)
                 # Works as follows: s (sum) -> g_mat += 1 or r (reset) -> gmat = 1 when updating gmat
                 g_update = 's', 
                ):
        """

        This class defines a Forager agent, able to perform actions and learn from rewards based on the PS paradigm.

        This is an updated version from the one used in the original paper (https://doi.org/10.1088/1367-2630/ad19a8), 
        taking into account the improvements made to the H and G matrices proposed by Michele Caraglio in our paper
        (https://doi.org/10.1039/D3SM01680C).

        The H and G matrices have shape (num_actions, num_percepts). Instead of damping the full
        matrices at every step, the agent keeps counters (N_upd_H, N_upd_G, last_upd_G) of the
        steps elapsed since the last update and applies the accumulated damping lazily.
        The counters N_upd_H and N_upd_G are not incremented by the class itself; the caller is
        expected to increase them once per step.
        """
        
        self.agent_state = 0
        
        self.num_actions = num_actions
        self.size_state_space = size_state_space
        self.num_percepts = int(np.prod(self.size_state_space)) # total number of possible percepts
        
        self.gamma_damping = gamma_damping
        self.eta_glow_damping = eta_glow_damping
        self.policy_type = policy_type
        self.beta_softmax = beta_softmax
        self.initial_prob_distr = initial_prob_distr
        self.fixed_policy = fixed_policy    
        self.g_update = g_update
        
        self.init_matrices()
        
        # For H update
        self.max_no_H_update = max_no_H_update      
        self.N_upd_H = 0
        # prefactor_1[t-1] = (1-gamma)^t: damping of the H matrix after t steps without update
        self.prefactor_1 = (1-self.gamma_damping)**(np.arange(1,self.max_no_H_update+1)) 
        # prefactor_2[t-1] = gamma * sum_{k=0}^{t-1} (1-gamma)^k: weight of the initial H values
        # after t steps without update. Computed with a single cumulative sum instead of
        # recomputing every partial sum from scratch.
        geometric_terms = (1-self.gamma_damping)**np.arange(self.max_no_H_update)
        self.prefactor_2 = self.gamma_damping*np.cumsum(geometric_terms)
            
        # For G update
        self.last_upd_G = np.zeros((self.num_actions, self.num_percepts))
        self.N_upd_G = 0
                              
        
    def init_matrices(self):
        '''Creates the G matrix (all zeros) and the H matrix. The H matrix is a copy of
        initial_prob_distr if one was given, and all ones otherwise.'''

        self.g_matrix = np.zeros((self.num_actions, self.num_percepts)) #glow matrix, for processing delayed rewards

        # initialize h matrix with different values
        if len(self.initial_prob_distr[0]) > 0:          
            self.h_0 = self.initial_prob_distr
            self.h_matrix = self.h_0.copy()
        else: 
            # h_0 is left unset in this case: all the updates use the constant 1 instead
            self.h_matrix = np.ones((self.num_actions, self.num_percepts), dtype=np.float64) #Note: the first index specifies the action, the second index specifies the percept.
            
    def _learn_post_reward(self, reward):
        '''Given a reward, updates the whole H-matrix taking into account that we did not have updates
        for the last N_upd_H steps. The G-matrix is fully updated first and the counter N_upd_H is
        reset to zero afterwards.'''
        # Update the full G matrix
        self._G_upd_full()
        
        if self.N_upd_H == 0:
            # Only a warning is printed: the update below then reads the prefactors at index -1
            print('Counter for h_matrix is zero, check that your are properly updating it!')
        if len(self.initial_prob_distr[0]) > 0:
            self.h_matrix = self.prefactor_1[self.N_upd_H-1] * self.h_matrix + self.prefactor_2[self.N_upd_H-1] * self.h_0 + reward * self.g_matrix
        else:
            self.h_matrix = self.prefactor_1[self.N_upd_H-1] * self.h_matrix + self.prefactor_2[self.N_upd_H-1] + reward * self.g_matrix
        self.N_upd_H = 0
        
    def _H_upd_single_percept(self, t, percept):
        '''Given a percept and the time t passed since the last H-matrix update,
        returns the corresponding --updated-- column of the H-matrix for all actions.
        This update is local and does not affect the H-matrix.'''
        if len(self.initial_prob_distr[0]) > 0:
            return self.prefactor_1[t-1] * self.h_matrix[:, percept] + self.prefactor_2[t-1] * self.h_0[:, percept]
        else:
            return self.prefactor_1[t-1] * self.h_matrix[:, percept] + self.prefactor_2[t-1] 
        
    def _G_upd_single_percept(self, percept, action):
        '''Given a percept-action tuple, updates that element of the G-matrix. Updates the last_upd_G
        to keep track of when was the matrix updated.''' 

        if self.g_update == 's': # For the current (a,s) tuple, we damp and sum one
            if self.eta_glow_damping == 1:
                # We do this because below we would have 0**0 = 1
                self.g_matrix[action, percept] = 1
            else:
                # Apply the damping accumulated since this element was last touched, then add one
                self.g_matrix[action, percept] = (1 - self.eta_glow_damping)**(self.N_upd_G - self.last_upd_G[action, percept])*self.g_matrix[action, percept] + 1
        elif self.g_update == 'r':
            self.g_matrix[action, percept] = 1
        
        
        # Then update the last_upd matrix
        self.last_upd_G[action, percept] = self.N_upd_G
        
    def _G_upd_full(self):
        '''Given the current number of steps without an update, updates the whole G-matrix.
        Then, resets all counters.'''
        self.g_matrix = (1 - self.eta_glow_damping)**(self.N_upd_G - self.last_upd_G) * self.g_matrix
        self.N_upd_G = 0
        self.last_upd_G = np.zeros((self.num_actions, self.num_percepts))
            
            
    def percept_preprocess(self, observation):
        """
        Takes a multi-feature percept and reduces it to a single integer index.

        Parameters
        ----------
        observation : ARRAY of integers >=0, of the same length as self.size_state_space
            List that describes the observation. Each entry is the value that each feature takes in the observation.
            observation[i] < size_state_space[i] (strictly)

        Returns
        -------
        percept : int
            Percept index that corresponds to the input observation.

        """
        
        percept = 0
        for idx_obs, obs_feature in enumerate(observation):
            # Each feature is weighted by the product of the sizes of all the previous features
            percept += int(obs_feature * np.prod(self.size_state_space[:idx_obs]))  
        return percept
    
    def deliberate(self, observation):
        """
        Given an observation , this method chooses the next action and records that choice in the g_matrix.

        Parameters
        ----------
        observation : list
            List that describes the observation, as specified in percept_preprocess.

        Returns
        -------
        action : int
            Index of the chosen action.

        """
        percept = self.percept_preprocess(observation) 
        
        
        # Probabilities must be computed from the updated h_matrix. We feed the prob distr the
        # updated h_matrix column for the percept, but don't update the stored h_matrix
        if self.N_upd_H == 0:
            current_h_mat = self.h_matrix[:, percept]
        else:
            current_h_mat = self._H_upd_single_percept(self.N_upd_H, percept)
        probs = self.probability_distr(percept, h_matrix = current_h_mat)        
        action = rand_choice_nb(arr = np.arange(self.num_actions), prob = probs)
        
        # Update the G matrix for current (s,a) tuple
        self._G_upd_single_percept(percept, action)
        
        return action
    
    def probability_distr(self, percept, h_matrix = None):
        """
        Given a percept index, this method returns a probability distribution over actions.

        Parameters
        ----------
        percept : int
            Index of the given percept.
        h_matrix : optional, np.array of length num_actions
            H values of the given percept for every action. If given, it is used instead of the
            stored column self.h_matrix[:, percept].

        Returns
        -------
        probability_distr : np.array, length = num_actions
            Probability for each action (normalized to unit sum), computed according to policy_type.

        """
        
        if self.policy_type == 'standard':
            h_vector = self.h_matrix[:, percept] if h_matrix is None else h_matrix
            probability_distr = h_vector / np.sum(h_vector)
        elif self.policy_type == 'softmax':
            h_vector = self.h_matrix[:, percept] if h_matrix is None else h_matrix
            # beta_softmax must scale the H values regardless of where they come from
            # (previously it was skipped whenever h_matrix was passed explicitly)
            h_vector_scaled = self.beta_softmax * h_vector
            # Subtracting the maximum avoids overflows in the exponential
            h_vector_mod = h_vector_scaled - np.max(h_vector_scaled)
            probability_distr = np.exp(h_vector_mod) / np.sum(np.exp(h_vector_mod))
        return probability_distr
    
    def learn(self, reward):
        """
        Given a reward, this method updates the h matrix with a single (one step) damping
        towards its initial values plus the reward weighted by the g matrix.

        Parameters
        ----------
        reward : float
            Value of the obtained reward.
        """
        if len(self.initial_prob_distr[0]) > 0:
            self.h_matrix =  self.h_matrix - self.gamma_damping * (self.h_matrix - self.h_0) + reward * self.g_matrix
        else:
            self.h_matrix =  self.h_matrix - self.gamma_damping * (self.h_matrix - 1.) + reward * self.g_matrix
            
    def reset_g(self):
        """
        Resets the g_matrix to zeros. The counters N_upd_G and last_upd_G are left untouched.
        """
        self.g_matrix = np.zeros((self.num_actions, self.num_percepts), dtype=np.float64)
        
    def deliberate_fixed_policy(self, observation):
        """
        Given an observation , this method chooses the next action according to the fixed policy specified as attribute of the class.
        The whole g_matrix is damped by one step and the chosen (action, percept) element is increased by one.

        Parameters
        ----------
        observation : list
            List that describes the observation, as specified in percept_preprocess.

        Returns
        -------
        action : int
            Index of the chosen action.

        """
        percept = self.percept_preprocess(observation) 
        if len(self.fixed_policy[0]) > 0:
            # NOTE(review): fixed_policy is indexed by percept on its first axis here, unlike
            # h_matrix (first axis = action) -- confirm the expected layout of fixed_policy
            action = rand_choice_nb(arr = np.arange(self.num_actions), prob = self.fixed_policy[percept])
        else:
            print('No fixed policy was given to the agent. The action will be selected randomly.')
            action = np.random.choice(self.num_actions)
    
        self.g_matrix = (1 - self.eta_glow_damping) * self.g_matrix
        self.g_matrix[action, percept] += 1 #record latest decision in g_matrix
    
        return action
    
    def act(self, action):
        """
        Agent performs the given action, updating its internal counter agent_state.

        Parameters
        ----------
        action : int (0, 1)
            1 if it changes direction, 0 otherwise
        """
        
        # If the agent changes direction, the counter restarts
        if action == 1:
            self.agent_state = 0
        else:
            self.agent_state += 1  
            
    
    def get_state(self):  
        ''' simplified to case of single forager. Returns an array because is what deliberate needs'''
        return np.array([self.agent_state])

# Parallel training launchers

## For ResetEnv

### Search loop

In [None]:
#| export                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
@njit
def train_loop_reset(episodes, time_ep, agent, env, h_mat_allT = False, when_save_h_mat = 1, reset_after_reward = True):  
    """
    Trains a single agent in a resetting environment.

    Parameters
    ----------
    episodes : int
        Number of episodes to run.
    time_ep : int
        Number of steps of each episode.
    agent : Forager
        Agent to train. Its matrices and counters are modified in place.
    env : environment object
        Must provide `init_env()` and `update_pos(action)`, the latter returning the reward.
    h_mat_allT : bool
        If True, the policy of action 0 is saved along training and returned instead of the final H matrix.
    when_save_h_mat : int
        The policy is saved on every episode whose index is a multiple of this number.
    reset_after_reward : bool
        If True, the agent's counter is set to zero after any non-zero reward.

    Returns
    -------
    tuple
        (rewards of each episode divided by time_ep, saved policies) if h_mat_allT,
        else (rewards of each episode divided by time_ep, final H matrix of the agent).
    """

    # The number of percepts is the last axis of the H matrix and does not change during training
    num_percepts = agent.h_matrix.shape[-1]

    if h_mat_allT: 
        num_saves = int(np.ceil(episodes/when_save_h_mat))
        policy_t = np.zeros((num_saves, num_percepts))
        idx_policy_save = 0
        
    save_rewards = np.zeros(episodes)
    
    for episode in range(episodes):
        
        # Fresh environment, agent counter and glow matrix at the start of every episode
        env.init_env()
        agent.agent_state = 0
        agent.reset_g()

        episode_reward = 0.0
        for step in range(time_ep):
            # One more step without a full update of the H and G matrices
            agent.N_upd_H += 1
            agent.N_upd_G += 1

            # Current percept of the agent
            state = agent.get_state()
            
            if state == num_percepts:
                # The counter left the state space: the turn action is forced
                action = 1
            else: 
                # Regular deliberation
                action = agent.deliberate(state)
                
            # The agent updates its counter according to the action
            agent.act(action)

            # The environment moves the agent and returns the reward
            reward = env.update_pos(action)            

            # Full H update on reward, or before running out of precomputed prefactors
            if reward == 1 or agent.N_upd_H == agent.max_no_H_update-1:
                agent._learn_post_reward(reward)
            
            if reset_after_reward == True and reward != 0:
                agent.agent_state = 0

            episode_reward += reward

        # Saving
        save_rewards[episode] = episode_reward
        if h_mat_allT and episode % when_save_h_mat == 0:
            policy_t[idx_policy_save] = agent.h_matrix[0,:] / agent.h_matrix.sum(0)
            idx_policy_save += 1
      
    mean_rewards = save_rewards/time_ep
    return (mean_rewards, policy_t) if h_mat_allT else (mean_rewards, agent.h_matrix)

In [None]:
#| hide
#|eval: false

# ResetEnv_1D is not defined anywhere above this cell, so it must be imported here for the
# cell to run on a fresh kernel. Forager and numpy are already available from the cells above.
from rl_opts.rl_framework.numba.environments import ResetEnv_1D
# from rl_opts.rl_framework.numba.agents import Forager
# import numpy as np 

# Smoke test: a single agent with 2 actions and 100 percepts, trained for 100 episodes of 100 steps
env = ResetEnv_1D(L = 5, D = 1/2)
agent = Forager(num_actions = 2,
                          size_state_space = np.array([100]))

res = train_loop_reset(100, 100, agent, env)

[[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
  1. 1. 1. 1.]]


### Launchers

> Note: we have to separate the launchers into 1D and 2D versions because of `numba` compilation, which would otherwise raise errors since the environments take different inputs.

#### 1D

In [None]:
#| export
from rl_opts.rl_framework.numba.environments import ResetEnv_1D



@njit(parallel = True)
def run_agents_reset_1D(episodes, time_ep, N_agents,
                       # Environment props
                       D = 1/2, L = 10.0, 
                       # Agent props
                       num_actions = 2,
                       size_state_space = np.array([100]),
                       gamma_damping = 0.00001,
                       eta_glow_damping = 0.1,
                       g_update = 's',
                       initial_prob_distr = np.array([[],[]]),
                       policy_type = 'standard', 
                       beta_softmax = 3,  
                       fixed_policy = np.array([[],[]]),
                       max_no_H_update = int(1e3),
                       h_mat_allT = False, 
                       reset_after_reward = True,
                       # When we want N_agent != number of max cores, we use this to make few runs
                       # over the selected number of cores, given by N_agents.
                       num_runs = None 
                      ):
    """
    Trains, in parallel, independent Forager agents in independent ResetEnv_1D environments.

    Every agent is created with the agent properties given as input, every environment as
    ResetEnv_1D(L, D), and each pair is trained with train_loop_reset for `episodes` episodes
    of `time_ep` steps.

    If num_runs is None, N_agents agents are trained. Otherwise, N_agents agents are trained
    in parallel num_runs times, for a total of N_agents*num_runs agents.

    Returns
    -------
    save_rewards : np.array, shape (total agents, episodes)
        Rewards of every agent, as returned by train_loop_reset.
    save_h_matrix : np.array
        If h_mat_allT, the policy of every agent after every episode, with shape
        (total agents, episodes, number of percepts). Otherwise the final H matrix of every
        agent, with shape (total agents, num_actions, number of percepts).
    """

    if num_runs is None:
        total_agents = N_agents
    else:
        total_agents = N_agents*num_runs
    
    save_rewards = np.zeros((total_agents, episodes))
    if h_mat_allT:
        save_h_matrix = np.zeros((total_agents, episodes, size_state_space.prod()))  
    else:        
        # The final H matrix has one row per action
        save_h_matrix = np.zeros((total_agents, num_actions, size_state_space.prod())) 

    # if num_runs is not given, we consider that we run this at full cores
    if num_runs is None:
    
        for n_agent in prange(N_agents):
            
            agent = Forager(num_actions, size_state_space,
                                      gamma_damping, eta_glow_damping, 
                                      policy_type, beta_softmax,
                                      initial_prob_distr,fixed_policy,max_no_H_update,g_update)
            env = ResetEnv_1D(L, D)
            
            # The policy is saved at every episode (when_save_h_mat = 1), matching the shape of
            # save_h_matrix. It must be passed explicitly so that reset_after_reward reaches
            # its own positional slot in train_loop_reset.
            rews, mat = train_loop_reset(episodes, time_ep, agent, env, h_mat_allT, 1, reset_after_reward)            
     
            save_rewards[n_agent] = rews
            save_h_matrix[n_agent] = mat

    # Otherwise, N_agents is the number of parallel agents and num_runs the number of
    # times we run those parallel agents
    else:

        n_run = -1        
        for run in range(num_runs):
            n_run += 1
            for idxa in prange(N_agents):
            
                agent = Forager(num_actions, size_state_space,
                                gamma_damping, eta_glow_damping, 
                                policy_type, beta_softmax,
                                initial_prob_distr,fixed_policy,max_no_H_update, g_update)
                env = ResetEnv_1D(L, D)
                
                rews, mat = train_loop_reset(episodes, time_ep, agent, env, h_mat_allT, 1, reset_after_reward)            
                         
                # Each (agent, run) pair gets its own row of the output arrays
                save_rewards[idxa*num_runs+n_run] = rews
                save_h_matrix[idxa*num_runs+n_run] = mat
        
        
    return save_rewards, save_h_matrix

In [None]:
#|hide
#|eval: false
# from rl_opts.rl_framework.numba.environments import ResetEnv_1D
# from rl_opts.rl_framework.numba.agents import Forager, train_loop_reset


# Smoke test: 5 episodes of 10 steps, with 5 parallel agents repeated over 2 runs
# (N_agents*num_runs = 10 agents in total). The trailing ';' hides the cell output.
rews, mats = run_agents_reset_1D(5, 10, 5, L = 2, num_runs=2, eta_glow_damping=0);

#### 2D

In [None]:
#| export
from rl_opts.rl_framework.numba.environments import ResetEnv_2D


@njit(parallel = True)
def run_agents_reset_2D(episodes, time_ep, N_agents,
                     # Environment props
                     dist_target = 10.0, radius_target = 1.0, D = 1/2,
                     # Agent props
                     num_actions = 2,
                     size_state_space = np.array([100]),
                     gamma_damping = 0.00001,
                     eta_glow_damping = 0.1,
                     initial_prob_distr = np.array([[],[]]),
                     policy_type = 'standard', 
                     beta_softmax = 3,  
                     fixed_policy = np.array([[],[]]),
                     max_no_H_update = int(1e3),
                     h_mat_allT = False, when_save_h_mat = 1,
                     reset_after_reward = True,
                     g_update = 's',
                    # When we want N_agent != number of max cores, we use this to make few runs
                    # over the selected number of cores, given by N_agents.
                    num_runs = None                         
                      ):
    """
    Trains, in parallel, independent Forager agents in independent ResetEnv_2D environments.

    Every agent is created with the agent properties given as input, every environment as
    ResetEnv_2D(dist_target, radius_target, D), and each pair is trained with train_loop_reset
    for `episodes` episodes of `time_ep` steps.

    If num_runs is None, N_agents agents are trained. Otherwise, N_agents agents are trained
    in parallel num_runs times, for a total of N_agents*num_runs agents.

    Returns
    -------
    save_rewards : np.array, shape (total agents, episodes)
        Rewards of every agent, as returned by train_loop_reset.
    save_h_matrix : np.array
        If h_mat_allT, the policy of every agent saved every when_save_h_mat episodes, with
        shape (total agents, ceil(episodes/when_save_h_mat), number of percepts). Otherwise the
        final H matrix of every agent, with shape (total agents, num_actions, number of percepts).
    """

    # The output arrays need one row per trained agent, also when several runs are requested
    if num_runs is None:
        total_agents = N_agents
    else:
        total_agents = N_agents*num_runs
    
    save_rewards = np.zeros((total_agents, episodes))
    if h_mat_allT:
        save_h_matrix = np.zeros((total_agents, 
                                  int(np.ceil(episodes/when_save_h_mat)), 
                                  size_state_space.prod()))  
    else:        
        # The final H matrix has one row per action
        save_h_matrix = np.zeros((total_agents, num_actions, size_state_space.prod())) 

    # if num_runs is not given, we consider that we run this at full cores
    if num_runs is None:
    
        for n_agent in prange(N_agents):
            
            agent = Forager(num_actions, size_state_space,
                            gamma_damping, eta_glow_damping, 
                            policy_type, beta_softmax,
                            initial_prob_distr,fixed_policy,max_no_H_update,g_update)
            
            env = ResetEnv_2D(dist_target, radius_target, D)
            
            rews, mat = train_loop_reset(episodes, time_ep, agent, env, h_mat_allT, when_save_h_mat, reset_after_reward)            
    
        
            save_rewards[n_agent] = rews
            save_h_matrix[n_agent] = mat

    # Otherwise, N_agents is the number of parallel agents and num_runs the number of
    # times we run those parallel agents
    else:

        n_run = -1        
        for run in range(num_runs):
            n_run += 1
            for idxa in prange(N_agents):
            
                agent = Forager(num_actions, size_state_space,
                                gamma_damping, eta_glow_damping, 
                                policy_type, beta_softmax,
                                initial_prob_distr,fixed_policy,max_no_H_update, g_update)
                env = ResetEnv_2D(dist_target, radius_target, D)
                
                # when_save_h_mat must be passed explicitly so that reset_after_reward reaches
                # its own positional slot in train_loop_reset
                rews, mat = train_loop_reset(episodes, time_ep, agent, env, h_mat_allT, when_save_h_mat, reset_after_reward)            
                         
                # Each (agent, run) pair gets its own row of the output arrays
                save_rewards[idxa*num_runs+n_run] = rews
                save_h_matrix[idxa*num_runs+n_run] = mat
        
    return save_rewards, save_h_matrix

In [None]:
#| hide
#|eval: false

# from rl_opts.rl_framework.numba.environments import ResetEnv_2D
# from rl_opts.rl_framework.numba.agents import Forager, train_loop_reset


# Smoke test: 15 agents with a state space of 3 percepts, trained for 10 episodes of 10 steps,
# saving the policy every 5 episodes. The trailing ';' hides the cell output.
run_agents_reset_2D(10,10, 15, dist_target = 10, radius_target = 1, D = 1, 
                           size_state_space=np.array([3]),
                          h_mat_allT=True, when_save_h_mat=5);

# nbdev

In [None]:
#| hide
# Run the nbdev export step, which writes the exported cells to the library modules
from nbdev import nbdev_export ; nbdev_export()