# Imports

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Minimal Agent

This environment/agent was adapted from:

**Paper** https://arxiv.org/abs/1503.04187

**GitHub**
https://github.com/vschaik/Active-Inference/blob/78e37aca669fef611e7c82534142139715382c92/IFE.ipynb



## Environment

In [None]:
# environment
class MinimalEnv(object):
  """ Wrap-around 1D state space with single food source.

  The probability of sensing food at locations near the food source decays
  exponentially with increasing (circular) distance.

  state (int): 1 of N discrete locations in 1D space.
  observation (bool): food detected yes/ no.
  actions (int): action index in {0, 1}; index 0 is the intention to move
    left (-1), index 1 the intention to move right (+1).
  """
  def __init__(self, 
               N = 16, # how many discrete locations can the agent reside in
               s_0 = 0, # where does the agent start each episode?
               s_food = 8, # where is the food?
               p_move = 0.75, # execute intent with p, else don't move.
               p_o_max = 0.9, # maximum probability of sensing food
               o_decay = 0.2 # decay rate of observing distant food source
               ):

    self.o_decay = o_decay
    self.p_move = p_move
    self.p_o_max = p_o_max
    self.s_0 = s_0
    self.s_food = s_food
    self.s_N = N
    self.o_N = 2 # {False, True} indicating whether food has been found
    self.a_N = 2 # {0, 1} to move left/ right in wrap-around 1D state-space

    # Environment dynamics are governed by two probability distributions:
    #   1. state transition probability p(s'|s, a)
    #   2. emission/ observation probability p(o|s)
    # Although we only need to be able to sample from these distributions to
    # implement the environment, we pre-compute the full conditional
    # probability tables here so agents can access the true dynamics if
    # required.
    self.p_o_given_s = self.emission_probability() # Matrix A
    self.p_s1_given_s_a = self.transition_dynamics() # Matrix B
    self.s_t = None # state at current timestep

  def transition_dynamics(self):
    """ computes transition probability p(s'| s, a)

    With probability p_move the intended move is executed (with wrap-around),
    otherwise the state stays unchanged.

    Returns:
    p[s, a, s1] of size (s_N, a_N, s_N)
    """
    stay = np.identity(self.s_N)
    p = np.zeros((self.s_N, self.a_N, self.s_N))
    # action index 0: s -> s - 1 (mod s_N)
    p[:,0,:] = self.p_move * np.roll(stay, -1, axis=1) \
              + (1-self.p_move) * stay
    # action index 1: s -> s + 1 (mod s_N)
    p[:,1,:] = self.p_move * np.roll(stay, 1, axis=1) \
              + (1-self.p_move) * stay
    return p

  def emission_probability(self):
    """ computes conditional probability table p(o|s).

    Returns:
    p[s, o] of size (s_N, o_N)
    """
    s = np.arange(self.s_N)
    # circular distance from food source: the shorter of the two ways around
    # the ring. Taking min(direct, s_N - direct) is symmetric, so it is
    # correct regardless of whether the food lies left or right of s.
    direct = np.abs(s - self.s_food) % self.s_N
    d = np.minimum(direct, self.s_N - direct)
    p = np.zeros((self.s_N, self.o_N))
    # exponentially decaying concentration ~ probability of detection
    p[:,1] = self.p_o_max * np.exp(-self.o_decay * d)
    p[:,0] = 1 - p[:,1]
    return p

  def reset(self):
    """ Puts the agent back at s_0 and returns a first observation. """
    self.s_t = self.s_0
    return self.sample_o()

  def step(self, a):
    """ Applies action index a and returns the resulting observation.

    Raises:
    ValueError if a is not one of the action indices 0 or 1. (Without this
    check a negative index such as -1 would silently select "move right".)
    """
    if (a not in [0, 1]):
      raise ValueError("only permitted actions are [0, 1].")

    if (self.s_t is None):
      print("Warning: reset environment before first action.")
      self.reset()

    # convert action index to action
    a = [-1,1][a]

    # the intended move is only executed with probability p_move
    if np.random.random() < self.p_move:
      self.s_t = (self.s_t + a) % self.s_N
    return self.sample_o()

  def sample_o(self):
    """ Samples whether food is detected in the current state. """
    return np.random.random() < self.p_o_given_s[self.s_t,1]

  

Sampling environment interactions of a random agent.

In [None]:
# Roll out a uniformly random agent and plot the emission profile, the
# visited states and the observations.
np.random.seed(1)

n_steps = 100

env = MinimalEnv()
# Observations are collected in `oo` (not `os`) so the name of the standard
# library module is not shadowed and the naming matches the simulation cell.
o = env.reset()
ss, oo = [env.s_t], [o]

for _ in range(n_steps):
  a = np.random.choice([0,1]) # random agent
  o = env.step(a)
  ss.append(env.s_t)
  oo.append(o)

fig, ax = plt.subplots(3, 1, figsize=(16, 12))
# probability of sensing food as a function of the state
ax[0].plot(env.p_o_given_s[:,1])
ax[0].set_xlabel('state')
ax[0].set_ylabel('$p(o=True|s)$')
# state trajectory against the food location
ax[1].plot(ss, label='agent state $s_t$')
ax[1].plot(np.ones_like(ss) * env.s_food, 
           'r--', label='target state $s^*$', linewidth=1)
ax[1].set_xlabel('timestep t')
ax[1].set_ylabel('state s')
ax[1].legend()
# sequence of boolean observations
ax[2].plot(np.array(oo))
ax[2].set_xlabel('timestep t')
ax[2].set_ylabel('observation o')

## Agent

In [None]:


# agent
"""
- the agent has an internal brain state b
- it receives a sensory state o in each timestep
- it chooses an action a to minimise variational free energy
- it updates its brain state to b' based on b, a, and o

"""
def softmax(x):
  """ Softmax of array x, computed over all of its elements.

  The maximum is subtracted before exponentiating so that large inputs do
  not overflow; this does not change the result.
  """
  shifted = x - x.max()
  weights = np.exp(shifted)
  total = weights.sum()
  return weights / total

def KL(a, b):
  """ Discrete KL divergence KL(a || b) = sum_i a_i (log a_i - log b_i).

  Entries with a_i == 0 contribute nothing (convention 0 * log 0 = 0), so a
  distribution with exact zeros no longer produces NaN. If b_i == 0 where
  a_i > 0 the divergence is infinite.

  Args:
  a, b: 1D array-likes of equal length holding probabilities.

  Returns:
  scalar divergence.
  """
  a = np.asarray(a, dtype=float)
  b = np.asarray(b, dtype=float)
  # restrict the sum to the support of a to avoid evaluating 0 * log(0)
  support = a != 0
  a_s, b_s = a[support], b[support]
  return np.dot(a_s, (np.log(a_s) - np.log(b_s)))

def get_b_star(s_star=0, s_N=16, logit=10):
  """ Logits of a desired state distribution peaked at s_star.

  Args:
  s_star: index of the desired state.
  s_N: number of states (length of the returned vector).
  logit: value placed at s_star; all other entries are 0. Larger values
    give a sharper peak after a softmax. Defaults to the previously
    hard-coded value 10.

  Returns:
  array of size (s_N,)
  """
  b = np.zeros(s_N)
  b[s_star] = logit
  return b

class MinimalAgent(object):
  """ Single-step active inference agent with a tabular belief state.

  The agent holds a vector of logits b_t; q(b_t) = softmax(b_t) is its
  variational distribution over environment states. It is handed the true
  transition and emission tables of the environment, picks the action whose
  predicted next-state density is closest (in KL) to the desired
  distribution q(b_star), and updates b_t by gradient descent.
  """

  def __init__(self,
               p_s1_given_s_a, # true environment transition probability
               p_o_given_s, # true environment emission probability
               b_star, # logits of desired state distribution (thought desire was expressed in terms of sensor states?)
               a_N=2, # number of discrete actions
               b_N=16, # number of internal states (tabular representation of p(b))
               ):

    # environment dynamics
    self.p_s1_given_s_a = p_s1_given_s_a
    self.p_o_given_s = p_o_given_s
    self.a_N = a_N

    # belief state
    self.b_N = b_N # number of belief states
    self.b_star = b_star # desired distribution over belief states
    self.b_t = None # current belief state (undefined before reset)

  def reset(self):
    """ Initialises the belief logits to zero, i.e. a uniform belief. """
    self.b_t = np.zeros(self.b_N) # uniform belief at start

  def _require_belief(self):
    """ Raises a clear error if reset() has not been called yet. """
    if self.b_t is None:
      raise RuntimeError(
          "Belief state is undefined: call reset() before act() or "
          "update_state().")

  def act(self, o):
    """ Returns the action index with the lowest free energy estimate.

    Args:
    o: index of the current observation (column of p_o_given_s).

    Ties are resolved in favour of the lowest action index.
    """
    self._require_belief()
    min_fe = None
    argmin_fe = None

    # evaluate policies by evaluating single next action
    # - more generally, we evaluate trajectories of actions (pi: a_0, ..., a_tau)
    # - we pick actions by sampling from softmax(G_pi)
    for a in range(self.a_N):
      # Note: we use action indices to represent actions (not {-1, 1})
      fe = self.free_energy(self.b_star, o, a)
      if (min_fe is None) or (fe < min_fe):
        min_fe = fe
        argmin_fe = a

    return argmin_fe

  @staticmethod
  def q(b):
    """ Variational distribution of environment state s given belief state b.
        p(s|b)

      (model_encoding, variational_density)
    """
    return softmax(b)

  @classmethod
  def dq(cls, b):
    """ Derivative of the variational distribution.
     (model_encoding_derivative)

    Returns:
    Jacobian of softmax(b), of size (len(b), len(b)).
    """
    q = cls.q(b)
    # Softmax derivative
    return np.diag(q) - np.outer(q, q)

  def generative_density(self, b, o, a):
    """
    Next state prediction from generative model.
    Here, the generative model is equal to the true environment dynamics.
    (generative_density)

    P(s', o | b, a) = Sum_over_s(P(s' | a, s) * P(o | s) * P(s | b))
    s' only depends on a and s, o only depends on s, and s only depends on b.

    Agent's prediction of next state probability given belief state and action
    (calculated separately for both sensory states).

    Args:
    b: belief logits the prediction is conditioned on.
    o: observation index.
    a: action index.

    Returns:
    array over next states s'. It is a joint with o, so it does not sum to 1.
    """

    # generative model of the next state p(s1, o | b, a)
    # todo: adapt to return joint for both observations
    # Use the belief passed in (not self.b_t) so that the argument is honoured.
    p_o_s_given_b = self.q(b) * self.p_o_given_s[:,o] # joint prob p(o, s| b)
    p_s1_o_given_b_a = np.dot(p_o_s_given_b, self.p_s1_given_s_a[:,a,:])
    return p_s1_o_given_b_a

  def free_energy(self, b_star, o, a):
    """ Estimate of expected free energy, used for action selection.

    Computed as KL(q(b_star), p(s', o | b_t, a)).
    """
    self._require_belief()
    q = self.q(b_star) # where I want to be
    p = self.generative_density(self.b_t, o, a=a) # where I get to taking action a
    return KL(q, p)

  def update_state(self, o, a, n_steps, lr=1.0):
    """ Updates the belief logits by gradient descent on KL(q(b'), p).

    Args:
    o: index of the last observation.
    a: index of the last action.
    n_steps: number of gradient descent iterations.
    lr: step size of each iteration.
    """
    self._require_belief()
    # internal belief state at time t+1 can be initialised
    # a) uniformly (expressing minimal knowledge about the future)
    # b) biased towards the current state (assuming small changes)
    # c) by updating current belief according to current world model
    #    this assumes that we know the inverse q^-1(b|s) which, in general, we don't
    b_prime = np.copy(self.b_t) # (b), alternatively np.zeros(self.b_N) (a)

    # posterior joint of next state and last observation given  last action 
    # and last belief state. This is constant across update iterations,
    # so its logarithm is computed once.
    p = self.generative_density(self.b_t, o, a)
    log_p = np.log(p)

    for i in range(n_steps):
      q = self.q(b_prime)

      # free energy gradient wrt belief state:
      # d/db sum_s q (log q - log p) = dq/db . (1 + log q - log p)
      dq = self.dq(b_prime)
      Y = 1 + (np.log(q) - log_p)
      db = np.dot(dq, Y)

      b_prime -= lr * db

    self.b_t = b_prime


In [None]:
# environment
p_o_max = 4.0**(-1.0/16)
o_decay = np.log(4)/16
s_food = 8
p_move=0.75
s_0 = 0
env = MinimalEnv(p_o_max=p_o_max,
                 o_decay=o_decay,
                 s_food=s_food,
                 p_move=p_move,
                 s_0=s_0)

# agent
# The belief size and action count are taken from the environment so that the
# agent stays consistent with it if the environment size is changed.
target_position = 11
agent = MinimalAgent(p_s1_given_s_a=env.p_s1_given_s_a, 
                     p_o_given_s=env.p_o_given_s,
                     a_N=env.a_N,
                     b_N=env.s_N,
                     b_star = get_b_star(target_position, s_N=env.s_N))
agent.reset()

# simulation
n_epochs = 100 # environment steps
n_steps = 100 # gradient descent steps per epoch
lr = 1.0

# traces of state, observation, action and belief distribution per env step
ss, oo, aa, bb = [], [], [], []
o = int(env.reset())
for env_step in range(n_epochs):

  if env_step % 10 == 0:
    print(f'Env step {env_step}')

  # pick action that minimises free energy
  a = agent.act(o)

  ss.append(env.s_t)
  oo.append(o)
  aa.append(a)
  bb.append(softmax(agent.b_t))

  # update belief to minimise free energy
  agent.update_state(o, a, n_steps, lr=lr)

  # take action in environment
  o = int(env.step(a))

# display trace
fig, ax = plt.subplots(3, 1, figsize=(16, 6*3))

plt.sca(ax[0])
plt.plot(ss, label='$s_t$')
plt.plot(np.ones_like(ss)*target_position, label='target')
plt.legend()
plt.xlim(0,n_epochs)

plt.sca(ax[1])
bb = np.array(bb)
plt.imshow(bb.T,
              interpolation="nearest", 
              aspect = "auto", 
              vmin = 0, vmax = bb.max(), 
              cmap = "viridis", origin='lower')
# The x-axis counts environment steps, so the reference lines must span
# n_epochs (not n_steps, the number of gradient steps per epoch).
plt.plot([0,n_epochs], [target_position, target_position], 'r--', label='target')
plt.plot([0,n_epochs], [s_food, s_food], 'w--', label='food')
plt.plot(ss, label='$s_t$')
plt.xlim(0,n_epochs)
plt.legend()

plt.sca(ax[2])
plt.plot(aa, label='action')
plt.legend()

# Open Questions

- The Minimal Agent expresses desire as a distribution over brain states (internal model of world states); I was expecting this to be defined as a distribution over sensor states instead (want to always find food or want to find food 30% of the time).

- The initial guess b_prime before gradient descent exploits the simplicity of the model (softmax of a table), which does not hold in general; use b' = b or b' = uniform to start with instead.


# Open Tasks

- **Agree on notation**

- The minimal agent assumes the true environment dynamics to be known; in RL we usually learn an implicit or explicit model from data. Learn internal model of the world through interaction (Chapter 7.4)

- Make belief non-tabular (e.g., Gaussian or a neural network)

- Define preference in terms of sensory states instead of belief states

- Attack a problem where planning (beyond single-step dynamics) is required or highly beneficial.

- Revisit collaborative Active Inference paper

- Revisit pointing without a pointer paper

- Implement pointing without a pointer as collaborative active inference




# Pointing without a pointer

In [None]:
class User(object):
  """ Simulated user who prefers one out of n_targets targets.

  The preferred target is drawn uniformly at random on every reset().
  """

  def __init__(self, n_targets=2):

    self.n_targets = n_targets
    self.goal = None # index of preferred target, assigned during reset()
    self.intent = None # alias of goal, kept for code that reads `intent`

  def reset(self):
    """ Samples a new preferred target index in [0, n_targets). """
    # Previously only `intent` was assigned here, leaving the documented
    # `goal` attribute at None forever; both names now hold the sampled index.
    self.goal = np.random.choice(self.n_targets)
    self.intent = self.goal

  def step(self, observation):
    """ Placeholder: the user's response to an observation is not implemented. """
    pass