In [None]:
import numpy as np
import matplotlib.pyplot as plt

### Optimal Behaviour and User Modelling

Optimal user behaviour reduces a target's energy most rapidly. This involves exactly tracking the target's position, instantly absorbing all potential energy as it is converted from kinetic energy. In practice real users have constraint precision and act with delays, both present in their visual perception, cognitive process, and motor execution. To capture delays and constraint precision, we model user input as a first-order lag with Gaussian noise.

#### First Order Lag User

A first order lag system receives a repeated step function control signal.

In [None]:

# control signal u
num_repetitions = 3
num_steps_per_phase = 10
u = np.hstack([np.zeros((num_repetitions, num_steps_per_phase)), np.ones((num_repetitions, num_steps_per_phase))]).reshape(-1)
plt.plot(u, label='u')
plt.title('control signal');
plt.xlabel('timestep t (int)');

# lag response
k = 0.5 # lag constant [0, 1]
y = np.zeros_like(u)
y[0] = 0
for t in range(1, u.shape[0]):
  y[t] = (1-k) * y[t-1] + k * u[t]
  
plt.plot(y, label='y')
plt.legend()

In [None]:
# 2. Wrap behavior in an agent class.
class FirstOrderLag:
  """"
  First order lag (vectorized).
  
  Args:
      conductivity (np.array): moving average weight of new input, in [0,1].
      s0 (np.array): initial state after reset. 
      bounds (np.array): minimum and maximum state values
  """
  
  def __init__(self, conductivity=0.5, s0=0, bounds=None):
    self.k = conductivity
    self.s0 = s0
    self.bounds = bounds
    
  def reset(self):
    self.s = np.copy(self.s0)
    self._clip()
    return self.s
  
  def step(self, u):
    self.s = (1-self.k) * self.s + self.k * u
    self._clip()
    return self.s
  
  def _clip(self):
    if self.bounds is None:
      return
    
    self.s = np.clip(self.s, self.bounds[0], self.bounds[1])

In [None]:
# 3. simulate agent response to control signal.
user = FirstOrderLag(conductivity=0.15)
y = [user.reset()] + [user.step(u[t]) for t in range(1, u.shape[0])]
plt.plot(u)
plt.plot(y)

## User Interface

To illustrate how active inference can help adapt parameters of complex user interfaces to improve performance, we explore a target selection task with artificially complicated but well understood interaction dynamics. A visual display shows $N$ targets in motion mimicking the harmonic oscillation of masses on springs. All springs are *anchored* at the same height to a horizontal line that is controlled by the user. Each target has a different parametrization corresponding to different oscillation frequencies. A target is selected by reducing its mechanical energy to fall below some threshold $\tau$ while keeping all other targets' mechanical energies above the threshold. The mechanical energy is defined as the sum of potential energy and kinetic energy, with all energy being kinetic at the zero crossing and all energy being potential at the oscillation extrema. The oscillation amplitude thereby gives the user feedback about the effect of their inputs on progress towards selecting individual targets. Moving the anchor towards or away from a target reduces or increase its potential energy, respectively.

The harmonic oscillators act as a common mediating mechanism between various input spaces (e.g., trajectories of a mouse pointer, a visually tracked hand, gaze, or device tilting movements) and the control space (one of $N$ target selection). By decoupling the interaction metaphor from the specific input modality we can leverage users' mental model of the dynamics to make control across multiple modalities more intuitive, provided that parametrisations of the dynamics can be found that work well for each modality and user. Traditionally, finding good parametrisations was done manually or through trial and error user testing in each new context. More recently, computational interaction methods have been proposed, where user models trained via reinforcement learning are used to evaluate individual interface parametrisations in simulation  off-line. Here, we set out to demonstrate how active inference can be used to simultaneously learn the user model and adapt interface parametrisations to optimize task performance on-line, i.e. while a user interacts with the system.

### Harmonic Oscillators

We parametrise each harmonic oscillator with a mass $m$ and a spring constant $\kappa$, which correspond to the oscillation frequency in Eq. X
$$f=\frac{1}{2\pi}\sqrt{\frac{\kappa}{m}}$$

The state is represented by $(x, \dot x)$ , where $x$ is its distance from the anchor and $\dot x$ its velocity. The mechanical energy $E$, potential energy $U$, and kinetic energy $K$ are defined as in Eq. X
$$
E = U + K 
$$
$$ U = \frac{1}{2}\kappa \cdot x^2 $$
$$ K = \frac{1}{2}m \cdot \dot x^2 $$

The dynamics are described by an ordinary differential equation (ODE) and state updates are performed by taking one Euler step of length $\delta t$ (see Eq. X)

$$ \ddot x = \frac{F}{m} = \frac{-\kappa \cdot \dot x_t}{m} $$
$$ \dot x_t = \dot x_{t-1} + \delta t \cdot \ddot x_t $$
$$ x_t = x_{t-1} + \delta t \cdot \ \dot x_t$$



In [None]:
class HarmonicOscillator():
    
    def __init__(self, 
                 N=1, 
                 mass=1., 
                 spring_constant=1., 
                 damping_coefficient=0., 
                 dt=0.1, 
                 energy_start=1., 
                 energy_max=2):
      self.N = N
      self.mass_start = mass
      self.spring_start = spring_constant
      self.damping_start = damping_coefficient
      self.dt = dt # Euler step length
      self.energy_start = energy_start
      self.energy_max = energy_max
      
      self.x = None
      self.v = None
      self.anchor_start = 0
      
    def frequency(self):
      return 1/(2*np.pi) * np.sqrt(self.spring / self.mass)
    
    def potential_energy(self, x=None, spring=None):
      x = self.x if x is None else x
      spring = self.spring if spring is None else spring
      return 0.5 * spring * (x - self.anchor)**2
    
    def kinetic_energy(self, v=None, mass=None):
      v = self.v if v is None else v
      mass = self.mass if mass is None else mass
      return 0.5 * mass * v**2
    
    def energy(self, x=None, v=None, spring=None, mass=None):
      return self.potential_energy(x, spring) + self.kinetic_energy(v, mass)
    
    def ddot_x(self, dx, v=None, spring=None, damping=None, mass=None):
      v = self.v if v is None else v
      spring = self.spring if spring is None else spring
      damping = self.damping if damping is None else damping
      mass = self.mass if mass is None else mass
      
      f_spring = -spring * dx
      f_friction = -damping * v
      f_total = f_spring + f_friction
      a = f_total / mass
      return a
    
    def reset(self):
      self.anchor = self.anchor_start
      self.mass = self.mass_start * np.ones(self.N)
      self.spring = self.spring_start * np.ones(self.N)
      self.damping = self.damping_start * np.ones(self.N)
      
      # initialise all targets with equal energy and different phase
      phase = np.linspace(0, 2*np.pi, self.N+1)[:-1]
      e = self.energy_start # total mechanical energy
      max_x = np.sqrt(2*e/self.spring) # amplitude along x
      self.x = max_x * np.sin(phase) + self.anchor
      # note: maximum avoids numerical instability near zero in sqrt
      k = np.maximum(0, e - self.potential_energy())
      self.v = np.sqrt(2*k/self.mass) * np.sign(np.cos(phase))
      return {'x': self.x, 'v': self.v, 
              'debug': {'energy': self.energy(), 'mass': self.mass, 'spring': self.spring}
              }
    
    def step(self, action=0):
      # make hypothetical step with original parameters
      self.anchor = action
      dx = self.x - self.anchor
      a = self.ddot_x(dx)
      v = self.v + self.dt * a
      x = self.x + self.dt * v

      # boundary condition: limit energy
      e = self.energy(x=x, v=v)
      factor = np.minimum(1, self.energy_max / e)
      # re-estimate step with scaled parameters
      spring = self.spring * factor
      mass = self.mass * factor
      a = self.ddot_x(dx, spring=spring, mass=mass)
      self.v = self.v + self.dt * a
      self.x = self.x + self.dt * self.v

      return {'x': self.x, 'v': self.v, 
              'debug': {'energy': self.energy(spring=spring, mass=mass), 'mass': mass, 'spring': spring}
             }
    
    def plot_phase_space(self, axis=None):
      if axis is None:
        _, axis = plt.subplots()
      num_steps = int(np.max(1/(self.frequency() * self.dt))) + 2
      outputs = ['x', 'v']
      if self.x is None:
        self.reset()
      
      states = {o: [] for o in outputs}
      for _ in range(num_steps):
        state = self.step()
        for o in outputs:
          states[o].append(state[o])
      
      plt.sca(axis)
      plt.plot(np.array(states['x']), np.array(states['v']), '-')
      plt.title('Phase space');
      plt.xlabel('$x$')
      plt.ylabel("$\dot x$")
        

In [None]:
# 5. simulate uncontrolled MassSpringDamper
num_steps = 200
agent = HarmonicOscillator(N=1, dt=0.1, energy_start=1)
agent.reset()
agent.plot_phase_space()

os = [agent.reset()]
for _ in range(num_steps):
  os.append(agent.step())

fig, axes = plt.subplots(2, 1, figsize=(2*6, 12))

plt.sca(axes[0])
plt.title('absolute position over time')
xs = [o['x'] for o in os]
plt.plot(xs)
plt.ylabel('position')
plt.xlabel('timestep t')

plt.sca(axes[1])
plt.title('energy over time')
es = [o['debug']['energy'] for o in os]
plt.plot(es)
plt.ylabel('E')
plt.xlabel('timestep t')
plt.ylim([0, np.max(es)]);


In [None]:
# simulate dynamics of oscillators with different parameters and different energy
num_objects = 5
mass = np.hstack([ 1 * np.ones((num_objects)), 3 * np.ones(num_objects)]).reshape(-1)
spring = np.hstack([ 3 * np.ones((num_objects)), 1 * np.ones(num_objects)]).reshape(-1)
agent = HarmonicOscillator(N=2*num_objects, mass=mass, spring_constant=spring)

fig, axes = plt.subplots(1, 2, figsize=(2*6, 6))
agent.reset()
agent.plot_phase_space(axes[0])

xs = [agent.reset()['x']]
for _ in range(num_steps):
  xs.append(agent.step()['x'])
  
plt.sca(axes[-1])
plt.plot(xs);
plt.xlabel('t')
plt.ylabel('x')

# User-UI Interaction

## Single object to control

In [None]:
# 6. simulate MassSpringDamper controlled by FirstOrderLag
num_steps = 200

ui = HarmonicOscillator()
# generate an uncontrolled sequence
ui.reset()
xs = [ui.reset()['x']] + [ui.step()['x'] for _ in range(num_steps)]

# close the loop with a first-order lag user
user = FirstOrderLag(conductivity=0.2)
user.reset()
os_user = []
os_ui = [ui.reset()]
for i in range(num_steps):
  # user response to UI stimulus
  o_user = user.step(os_ui[-1]['x'])
  os_user.append(o_user)
  
  # UI update in response to user
  os_ui.append(ui.step(o_user))
  
plt.plot(xs, label='uncontrolled system')
plt.plot(os_user, label='control signal')
plt.plot([o['x'] for o in os_ui], label='controlled system')
plt.title('absolute position over time')
plt.xlabel('timestep t')
plt.ylabel('position')
plt.legend()

## Selective control

In [None]:
# 7. simulate a set of MassSpringDampers with different mass and selective control with FirstOrderLag
num_targets = 8
num_steps = 200
user_target = 1

ui = HarmonicOscillator(N=num_targets)
# generate an uncontrolled sequence
ui.reset()
xs_uncontrolled = [ui.reset()['x']] + [ui.step()['x'] for _ in range(num_steps)]

# close the loop with a first-order lag user
user = FirstOrderLag(conductivity=0.2)
user.reset()
os_user = []
os_ui = [ui.reset()]
for i in range(num_steps):
  # user response to UI stimulus
  o_user = user.step(os_ui[-1]['x'][user_target])
  os_user.append(o_user)
  # UI update in response to user
  os_ui.append(ui.step(o_user))
  
fig, ax = plt.subplots(2, 1, figsize=(2*6, 12))

plt.sca(ax[0])
plt.title('Position over time')
xs = [o['x'] for o in os_ui]
plt.plot(xs)
plt.plot(np.array(xs_uncontrolled)[:,user_target], 'r--', label='uncontrolled system')
plt.plot(os_user, label='control signal')
plt.plot(np.array(xs)[:,user_target], 'r', label='controlled system')
plt.xlabel('timestep t')
plt.ylabel('absolute position')
plt.legend()


plt.sca(ax[1])
plt.title('Energy over time')
es = [o['debug']['energy'] for o in os_ui]
plt.plot(es)
plt.plot(np.array(es)[:,user_target], 'r')
plt.ylabel('mechanical energy E')
plt.xlabel('timestep t')

## Selection Trigger (naive)

As the number of targets increases
- the threshold needs lowered
- the EMA conductivity needs to be reduced


In [None]:
# exponential moving average (first-order lag)

threshold = 0.2
ema = FirstOrderLag(conductivity=0.2, s0 = np.ones_like(es[0]))
es_ema = [ema.reset()] + [ema.step(E) for E in es]
selection = [y < threshold for y in ys]

fig, ax = plt.subplots(figsize=(16, 16))
plt.plot([0, len(xs)], [threshold]*2, 'r--', label='threshold')
plt.plot(es_ema)
plt.plot(np.array(es_ema)[:,user_target], 'r-', label='controlled system', linewidth=3)
plt.plot(np.array(selection)*2, 'k', linewidth=1);
plt.legend()
plt.xlabel('timestep t')
plt.ylabel('smoothed energy EMA(E)')
plt.title('smoothed energy over time')

In [None]:
# User target generator

class TargetGenerator():
  
  def __init__(self, num_targets):
    self.num_targets = num_targets
    
  def reset(self):
    self.s = np.random.choice(self.num_targets)
    self.s_one_hot = np.eye(self.num_targets)[self.s]
    return self.observation()
  
  def step(self, trigger_out):
    # reset if true positive
    if trigger_out.sum() > 0 and trigger_out[self.s]:
      return self.reset()
    
    return self.observation()
  
  def observation(self):
    return {'s': self.s, 's_one_hot': self.s_one_hot}

In [None]:
# putting it all together
# 7. simulate a set of MassSpringDampers with different mass and selective control with FirstOrderLag
num_targets = 8
num_steps = 400

ui = HarmonicOscillator(N=num_targets)
# close the loop with a first-order lag user
user_target = TargetGenerator(num_targets)
user = FirstOrderLag(conductivity=0.2, s0 = 0, bounds=[-4,4])
# filter energy for selection trigger
trigger_in = FirstOrderLag(conductivity=0.1, s0 = np.ones(num_targets) * ui.energy_start)
# trigger by thresholding
def trigger_out(inputs, threshold=threshold):
  return inputs < threshold

def reset():
  o = {}
  o['ui'] = ui.reset()
  o['user'] = user.reset()
  o['user_target'] = user_target.reset()
  o['trigger_in'] = trigger_in.reset()
  o['trigger_out'] = trigger_out(o['trigger_in'])
  return o

def step(a):
  global user_target
  o = {}
  
  # reset UI when event has been triggered
  if a['trigger_out'].sum() > 0:
    ui.reset()
    trigger_in.reset()
  
  o['ui'] = ui.step(a['user'])
  o['trigger_in'] = trigger_in.step(o['ui']['debug']['energy'])
  o['trigger_out'] = trigger_out(o['trigger_in'])
  
  # resample user goal if their target has been triggered
  o['user_target'] = user_target.step(a['trigger_out'])
  o['user'] = user.step(o['ui']['x'][o['user_target']['s']])
  
  return o

os = [reset()]
for i in range(num_steps):
  os.append(step(os[-1]))

In [None]:
# plot dynamics
fig, axes = plt.subplots(7, 1, figsize=(12, 4*7))

# target positions over time
plt.sca(axes[0])
plt.title('target positions over time')
xs = np.array([o['ui']['x'] for o in os])
ts = np.array([o['user_target']['s'] for o in os])
plt.plot(xs)
plt.plot(xs[np.arange(num_steps+1),ts], 'k-', label='user target')
plt.xlabel('timestep t')
plt.ylabel('absolute position')

# user input over time
y = np.array([o['user'] for o in os])
plt.plot(y, 'k--', label='user input')
plt.legend()

# target mass
plt.sca(axes[1])
plt.title('mass over time')
ms = np.array([o['ui']['debug']['mass'] for o in os])
plt.plot(ms)
plt.plot(ms[np.arange(num_steps+1),ts], 'k-', label='user target')
plt.xlabel('timestep  t')
plt.ylabel('mass')

# target spring coefficient
plt.sca(axes[2])
plt.title('spring coefficient over time')
ss = np.array([o['ui']['debug']['spring'] for o in os])
plt.plot(ss)
plt.plot(ss[np.arange(num_steps+1),ts], 'k-', label='user target')
plt.xlabel('timestep t')
plt.ylabel('spring coefficient')

# target energies
plt.sca(axes[3])
plt.title('energy over time')
es = np.array([o['ui']['debug']['energy'] for o in os])
plt.plot(es)
plt.xlabel('timestep t')
plt.ylabel('mechanical energy E')

# user target
plt.sca(axes[4])
plt.title('user target over time')
ts = np.array([o['user_target']['s_one_hot'] for o in os])
plt.plot(ts, '--')
plt.xlabel('timestep t')

# trigger output over time
y = np.array([o['trigger_out'] for o in os])
plt.sca(axes[5])
plt.plot(y)
plt.xlabel('timestep t')
plt.ylabel('trigger output')
plt.title('events triggered over time')

# trigger input over time
y = np.array([o['trigger_in'] for o in os])
plt.sca(axes[6])
plt.plot(y)
plt.plot([0, len(os)], [threshold]*2, 'k--', label='threshold')
plt.legend()
plt.xlabel('timestep t')
plt.ylabel('trigger input')
plt.title('smoothed energy over time')


plt.tight_layout()

In [None]:
# test script that puts it all together
import os
os.chdir('..')
print(os.getcwd())

In [None]:
import importlib
import harmonic_interaction_01 as hi
importlib.reload(hi)

In [None]:
num_steps = 400

agents = hi.init()
os = [hi.reset(agents)]
for _ in range(num_steps):
  os.append(hi.step(agents, os[-1]))

In [None]:
# plot dynamics
fig, axes = plt.subplots(7, 1, figsize=(12, 4*7))

# target positions over time
plt.sca(axes[0])
plt.title('target positions over time')
xs = np.array([o['ui']['x'] for o in os])
ts = np.array([o['user_target']['s'] for o in os])
plt.plot(xs)
plt.plot(xs[np.arange(num_steps+1),ts], 'k-', label='user target')
plt.xlabel('timestep t')
plt.ylabel('absolute position')

# user input over time
y = np.array([o['user'] for o in os])
plt.plot(y, 'k--', label='user input')
plt.legend()

# target mass
plt.sca(axes[1])
plt.title('mass over time')
ms = np.array([o['ui']['debug']['mass'] for o in os])
plt.plot(ms)
plt.plot(ms[np.arange(num_steps+1),ts], 'k-', label='user target')
plt.xlabel('timestep  t')
plt.ylabel('mass')

# target spring coefficient
plt.sca(axes[2])
plt.title('spring coefficient over time')
ss = np.array([o['ui']['debug']['spring'] for o in os])
plt.plot(ss)
plt.plot(ss[np.arange(num_steps+1),ts], 'k-', label='user target')
plt.xlabel('timestep t')
plt.ylabel('spring coefficient')

# target energies
plt.sca(axes[3])
plt.title('energy over time')
es = np.array([o['ui']['debug']['energy'] for o in os])
plt.plot(es)
plt.xlabel('timestep t')
plt.ylabel('mechanical energy E')

# user target
plt.sca(axes[4])
plt.title('user target over time')
ts = np.array([o['user_target']['s_one_hot'] for o in os])
plt.plot(ts, '--')
plt.xlabel('timestep t')

# trigger output over time
y = np.array([o['trigger_out'] for o in os])
plt.sca(axes[5])
plt.plot(y)
plt.xlabel('timestep t')
plt.ylabel('trigger output')
plt.title('events triggered over time')

# trigger input over time
y = np.array([o['trigger_in'] for o in os])
plt.sca(axes[6])
plt.plot(y)
plt.plot([0, len(os)], [threshold]*2, 'k--', label='threshold')
plt.legend()
plt.xlabel('timestep t')
plt.ylabel('trigger input')
plt.title('smoothed energy over time')


plt.tight_layout()

## Selection Trigger (Active Inference)

In the simplest setting, we can perform belief updates using the model of the user and of the UI dynamics to reason about the user's target object.

In a more advanced setting, we can intervene on the UI to help resolve ambiguity by changing some of the objects' parameters (mass, spring constant, and damping constant).

# Legacy code

In [None]:
# 8. reason about user intent by observing interaction

# a. traditional approach: moving average velocity below threshold, with
# initial value, capcity (moving average weight) and threshold finetuned
# tie-break on value if threshold is crossed by multiple targets
# - fine-tune initial value to maximum peak velocity
# - fine-tune conductivity to reduce oscillation in controlled object's score
#   near convergence
# - fine-tune threshold to separate controlled from uncontrolled objects
# - trigger action if a single object's score is below threshold

num_objects = 16
num_steps = 200
fig, ax = plt.subplots(figsize=(8*2, 12))

mass = np.ones(num_objects) + np.linspace(0, 1, num_objects)
ui = MassSpringDamper(mass=mass, bounds=[-1, 1])
user = FirstOrderLag(conductivity=0.1)

reasoner = FirstOrderLag(conductivity=0.05, s0=0.1)
#reasoner = SimpleMovingAverage(buffer_size=64, s0=np.ones(num_objects)*0.02)
threshold = 0.001

os_ui = []
os_user = []
ss_reasoner = []
os_reasoner = []

user.reset()
reasoner.reset()
o_ui = ui.reset()

for _ in range(num_steps):
  # stimulus response - target system at index 0
  o_user = user.step(o_ui['x'][-1])
  # log stimulus and response
  os_ui.append(o_ui['x'])
  os_user.append(o_user)
  # simulate next stimulus
  o_ui = ui.step(o_user)
  # reason about ui response to user input
  s_reasoner = reasoner.step(np.abs(o_ui['dx']))
  o_reasoner = s_reasoner < threshold
  ss_reasoner.append(s_reasoner)
  os_reasoner.append(o_reasoner)
  """
  if np.sum(o_reasoner > 1e-6):
    # object selected
    break
  """
  
plt.plot(ss_reasoner, '-');
plt.plot(np.max(np.array(os_reasoner), axis=1) * np.max(ss_reasoner), 'k-', label='triggered');
plt.plot([0, num_steps], [threshold, threshold], 'k--', label='threshold');
plt.xlabel('timestep t')
plt.ylabel('selection score (lower is better)')
plt.legend()