### Multiagent Active Blockference

This notebook is an experimental exploration of multi-agent active inference. cadCAD is not used at this point.

We are considering an environment with two agents, Karl and Thomas, who are trying to move to a preferred state without colliding.

In [77]:
import itertools
import numpy as np
import copy
import sys

# adding tools to the system path
sys.path.insert(0, '../../')

from blockference.envs.grid_env import GridAgent
from blockference.gridference import ActiveGridference
from blockference.agent import Agent

In [91]:
# start with a 3x3 grid
grid = list(itertools.product(range(3), repeat=2))
border = np.sqrt(len(grid)) - 1
pos_dict = {}
for i in range(0, len(grid)):
    pos_dict[i] = grid[i]
print(pos_dict)
num_agents = 2 # start with 2 agents
init_pos = [0, 8] # agents will start at positions 0 and 8 (opposite corners of the 3x3 grid)

{0: (0, 0), 1: (0, 1), 2: (0, 2), 3: (1, 0), 4: (1, 1), 5: (1, 2), 6: (2, 0), 7: (2, 1), 8: (2, 2)}


In [92]:
# getting the grid positions and indexes for the two agents K & T
init_K = init_pos[0]
init_T = init_pos[1]
init_K_pos = pos_dict[init_K]
init_T_pos = pos_dict[init_T]

In [95]:
# getting the preferred grid positions and indexes for the two agents K & T
# their preferred position will be the one where the other agent starts
pref_K = 8
pref_T = 0
pref_K_pos = pos_dict[pref_K]
pref_T_pos = pos_dict[pref_T]

#### Observations and States
In a single-agent environment, the number of observations and the number of states both equal the number of grid positions: on the 3x3 grid defined above, the agent can be at 9 different positions (9 states) and receive 9 different observations.

Adding an extra agent adds complexity. We let our agents be strictly non-interacting, i.e. they cannot occupy the same position on the grid at the same time.
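
To make the added complexity concrete, the sketch below counts the joint configurations of two agents on the 3x3 grid once shared cells are excluded. It is only an illustration: `joint_configs` is a hypothetical name that is not used elsewhere in this notebook.

```python
import itertools

# Same 3x3 grid as defined earlier in the notebook.
grid = list(itertools.product(range(3), repeat=2))

# Hypothetical helper list: every ordered (K_cell, T_cell) pair
# in which the two agents do not share a cell.
joint_configs = [
    (k, t) for k, t in itertools.product(grid, repeat=2) if k != t
]

print(len(grid) ** 2)      # 81 ordered pairs, collisions included
print(len(joint_configs))  # 72 ordered pairs without collisions
```

The notebook does not model this joint space directly; it keeps a per-agent location factor plus a relative-position factor instead.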

#### Generative Model Tensors
Now we define the tensors describing the generative model. For detailed explanations of what each tensor means/does, see:
https://pymdp-rtd.readthedocs.io/en/latest/notebooks/active_inference_from_scratch.html

#### Alternative way of thinking about states & state modalities (current)
The two modalities of the multiagent POMDP:
- location: "where am I in the world (on the grid)"
- agent awareness: "where is the other agent with respect to me in the world"

These modalities will be reflected in the **A** and **B** matrices.

In [45]:
# E vector (affordances)
E = ["UP", "DOWN", "LEFT", "RIGHT", "STAY"]

In [59]:
# location
n_states = len(grid)
n_observations = len(grid)

In [60]:
# A matrix
# Note: maybe multi-agent actinf does not change B but rather A & D
A = np.eye(n_observations, n_states)
print(A)

[[1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 1.]]


In [61]:
# other agent relative location (currently 1-step depth)
second_agent_locations = ["NONE", "NEXT_LEFT", "NEXT_RIGHT", "ABOVE", "BELOW"]

n_states_second = len(second_agent_locations)
n_observations_second = len(second_agent_locations)

A_second = np.eye(n_observations_second, n_states_second)
print(A_second)

[[1. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0.]
 [0. 0. 1. 0. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 1.]]


In [64]:
full_A = np.array((A, A_second), dtype='object')

In [66]:
full_A.shape

(2,)

In [67]:
# B matrix
# Note: B should only encode prior beliefs about !controllable! transitions between hidden states
# why/how can we assume the actions of the other agents are within controllable transitions?
B = np.zeros((len(grid), len(grid), len(E)))

for action_id, action_label in enumerate(E):

    for curr_state, grid_location in enumerate(grid):

        y, x = grid_location

        if action_label == "DOWN":
            next_y = y - 1 if y > 0 else y
            next_x = x
        elif action_label == "UP":
            next_y = y + 1 if y < border else y
            next_x = x
        elif action_label == "LEFT":
            next_x = x - 1 if x > 0 else x
            next_y = y
        elif action_label == "RIGHT":
            next_x = x + 1 if x < border else x
            next_y = y
        elif action_label == "STAY":
            next_x = x
            next_y = y
        new_location = (next_y, next_x)
        next_state = grid.index(new_location)
        B[next_state, curr_state, action_id] = 1.0
print(B)

[[[0. 1. 1. 0. 1.]
  [0. 0. 1. 0. 0.]
  [0. 1. 0. 0. 0.]
  [0. 0. 0. 0. 0.]]

 [[0. 0. 0. 1. 0.]
  [0. 1. 0. 1. 1.]
  [0. 0. 0. 0. 0.]
  [0. 1. 0. 0. 0.]]

 [[1. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [1. 0. 1. 0. 1.]
  [0. 0. 1. 0. 0.]]

 [[0. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0.]
  [0. 0. 0. 1. 0.]
  [1. 0. 0. 1. 1.]]]
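
A quick sanity check for a transition tensor like **B** is that every (current state, action) pair leads to exactly one next state, so each column sums to 1. The sketch below re-implements the movement rule from the loop above in plain Python, without NumPy, and verifies that property. `next_cell` and `B_check` are hypothetical names introduced only for this check.

```python
import itertools

E = ["UP", "DOWN", "LEFT", "RIGHT", "STAY"]
size = 3
grid = list(itertools.product(range(size), repeat=2))

def next_cell(cell, action):
    """Hypothetical helper mirroring the loop above: move one step, clamp at the borders."""
    y, x = cell
    if action == "UP":
        y = min(y + 1, size - 1)
    elif action == "DOWN":
        y = max(y - 1, 0)
    elif action == "LEFT":
        x = max(x - 1, 0)
    elif action == "RIGHT":
        x = min(x + 1, size - 1)
    return (y, x)  # "STAY" leaves the cell unchanged

# B_check[next_state][curr_state][action], same index order as B in the notebook
n = len(grid)
B_check = [[[0.0] * len(E) for _ in range(n)] for _ in range(n)]
for a, action in enumerate(E):
    for s, cell in enumerate(grid):
        B_check[grid.index(next_cell(cell, action))][s][a] = 1.0

# Each column (fixed current state and action) should sum to exactly 1
column_sums = [
    sum(B_check[s_next][s][a] for s_next in range(n))
    for s in range(n)
    for a in range(len(E))
]
print(all(total == 1.0 for total in column_sums))  # True
```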


The second modality of the **B** matrix holds the transition probabilities given an observation of a second agent.
This observation can be one of:
- "there is an agent *above* me"
- "there is an agent *below* me"
- "there is an agent *to the right* of me"
- "there is an agent *to the left* of me"
- "there is no agent next to me"

This modality should track the *relative* position of the agent with respect to the second agent.
It can then be scaled to arbitrarily many agents by using this matrix to track the positions of different agents relative to each other.

In the following, K_agent is the agent whose generative model we are modeling, and T_agent is the agent that K_agent perceives.

(Note: we might possibly need to add a third modality, colliding/not-colliding, for encoding preferences)
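
One way the relative-position observation could be derived from two grid coordinates is sketched below. `relative_location` is a hypothetical helper, not part of Blockference. It assumes the same convention as the first **B** loop, where cells are `(y, x)` tuples, "UP" increases `y`, and "RIGHT" increases `x`.

```python
# Labels copied from the notebook's second modality.
second_agent_locations = ["NONE", "NEXT_LEFT", "NEXT_RIGHT", "ABOVE", "BELOW"]

def relative_location(own_pos, other_pos):
    """Hypothetical helper: label where the other agent sits relative to own_pos.

    Positions are (y, x) tuples. Only directly adjacent cells (1-step depth)
    get a label other than "NONE"; diagonal neighbours count as "NONE".
    """
    dy = other_pos[0] - own_pos[0]
    dx = other_pos[1] - own_pos[1]
    offsets = {
        (0, -1): "NEXT_LEFT",
        (0, 1): "NEXT_RIGHT",
        (1, 0): "ABOVE",   # assumption: larger y means "up"
        (-1, 0): "BELOW",
    }
    return offsets.get((dy, dx), "NONE")

label = relative_location((1, 1), (2, 1))
print(label, second_agent_locations.index(label))  # ABOVE 3
print(relative_location((0, 0), (2, 2)))           # NONE
```

For more than two agents, the same function could be applied to every ordered pair of agents, giving each agent one relative-position observation per neighbour.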

In [103]:
second_agent_locations = ["NONE", "NEXT_LEFT", "NEXT_RIGHT", "ABOVE", "BELOW"]

B_second = np.zeros((len(second_agent_locations), len(second_agent_locations), len(E)))
pos_idx = {"NONE": 0, "NEXT_LEFT": 1, "NEXT_RIGHT": 2, "ABOVE": 3, "BELOW": 4}

for action_id, action_label in enumerate(E):

    for curr_state, T_location in enumerate(second_agent_locations):

        if action_label == "UP":
            next_T_location = "NONE" if T_location != "ABOVE" else "ABOVE"
        elif action_label == "DOWN":
            next_T_location = "NONE" if T_location != "BELOW" else "BELOW"
        elif action_label == "LEFT":
            next_T_location = "NONE" if T_location != "NEXT_LEFT" else "NEXT_LEFT"
        elif action_label == "RIGHT":
            next_T_location = "NONE" if T_location != "NEXT_RIGHT" else "NEXT_RIGHT"
        elif action_label == "STAY":
            next_T_location = T_location
        new_T_location = next_T_location
        next_state = pos_idx[new_T_location]
        B_second[next_state, curr_state, action_id] = 1.0

print(B_second)

[[[1. 1. 1. 1. 1.]
  [1. 1. 0. 1. 0.]
  [1. 1. 1. 0. 0.]
  [0. 1. 1. 1. 0.]
  [1. 0. 1. 1. 0.]]

 [[0. 0. 0. 0. 0.]
  [0. 0. 1. 0. 1.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]]

 [[0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 1. 1.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]]

 [[0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [1. 0. 0. 0. 1.]
  [0. 0. 0. 0. 0.]]

 [[0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0.]
  [0. 1. 0. 0. 1.]]]


In [104]:
full_B = np.array((B, B_second), dtype='object')

In [105]:
full_B.shape

(2,)

In [106]:
A_gm = copy.deepcopy(full_A)
B_gm = copy.deepcopy(full_B)

In [107]:
agent = Agent(A=A_gm, B=B_gm)

In [108]:
agent.D = [init_K, 0] # initial K position & initial K position relative to T, 0 means "NONE"

In [109]:
agent.E = E # adding agent affordances to Agent class instance

In [110]:
agent.C = [pref_K, 0] # preferred location & preferred relative relation to second agent (again "NONE")

This concludes the initialization of the single agent for the multi-agent POMDP. What follows is an attempt at a full 2-agent Blockference simulation.
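
The cells above store `D` and `C` as plain indices. pymdp-style generative models usually hold one vector per state factor or observation modality, so the sketch below shows how those indices could be expanded into one-hot vectors. `onehot`, `D_K` and `C_K` are hypothetical names. Whether the Blockference `Agent` class expects this format is an assumption this notebook does not confirm, and a one-hot preference is only the simplest possible choice for `C`.

```python
def onehot(index, size):
    """Return a list of length `size` with 1.0 at `index` and 0.0 elsewhere."""
    if not 0 <= index < size:
        raise ValueError(f"index {index} is outside 0..{size - 1}")
    return [1.0 if i == index else 0.0 for i in range(size)]

n_states = 9         # cells on the 3x3 grid
n_states_second = 5  # "NONE", "NEXT_LEFT", "NEXT_RIGHT", "ABOVE", "BELOW"

init_K, pref_K = 0, 8  # indices used for Karl in this notebook

# One vector per factor/modality: grid location first, relative location second.
D_K = [onehot(init_K, n_states), onehot(0, n_states_second)]
C_K = [onehot(pref_K, n_states), onehot(0, n_states_second)]

print(D_K[0])
print(C_K[0])
```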

In [118]:
from radcad import Model, Simulation, Experiment

In [111]:
agent_K = copy.deepcopy(agent)
agent_T = copy.deepcopy(agent)

# change Thomas' prior & preference
agent_T.D = [init_T, 0]
agent_T.C = [pref_T, 0]

In [119]:
initial_state = {
    'agent_K': agent_K,
    'agent_T': agent_T,
    'env_state': '', # TODO
}

In [120]:
params = {
}

In [None]:
def p_rel_position(params, substep, state_history, previous_state):
    return # TODO

def p_actinf(params, substep, state_history, previous_state):
    return # TODO

In [None]:
def s_prior(params, substep, state_history, previous_state, policy_input):
    return # TODO

def s_env(params, substep, state_history, previous_state, policy_input):
    return # TODO

In [None]:
state_update_blocks = [
    {
        'policies': {
            # TODO
        },
        'variables': {
            # TODO
        }
    }
]

In [None]:
model = Model(
    # Model initial state
    initial_state=initial_state,
    # Model Partial State Update Blocks
    state_update_blocks=state_update_blocks,
    # System Parameters
    params=params
)

In [None]:
simulation = Simulation(
    model=model,
    timesteps=20,  # Number of timesteps
    runs=1  # Number of Monte Carlo Runs
)

In [None]:
result = simulation.run()

In [None]:
import pandas as pd

df = pd.DataFrame(result)
df