Copyright 2022 DeepMind Technologies Limited

All software is licensed under the Apache License, Version 2.0 (Apache 2.0);
you may not use this file except in compliance with the Apache 2.0 license.
You may obtain a copy of the Apache 2.0 license at:
https://www.apache.org/licenses/LICENSE-2.0

All other materials are licensed under the Creative Commons Attribution 4.0
International License (CC-BY). You may obtain a copy of the CC-BY license at:
https://creativecommons.org/licenses/by/4.0/legalcode

Unless required by applicable law or agreed to in writing, all software and
materials distributed here under the Apache 2.0 or CC-BY licenses are
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the licenses for the specific language governing
permissions and limitations under those licenses.

This is not an official Google product.

# **Causal analysis of agents using the Agent Debugger**

This colab contains the experiments presented in the paper [Causal Analysis of Agent Behavior for AI Safety](https://arxiv.org/pdf/2103.03938.pdf).

It uses agents that were previously trained using [Impala](https://arxiv.org/pdf/1802.01561.pdf); we download their parameters (i.e. neural network weights) from open-sourced files hosted on Google Cloud Storage. The environments are based on [Pycolab](https://github.com/deepmind/pycolab), an open source library to build 2D gridworlds.

The main tool used is the "Agent Debugger", which allows us to perform interventions easily and in a standardized way on the agent and the environment.

In [None]:
!git clone https://github.com/deepmind/agent_debugger.git
!pip install -r agent_debugger/requirements.txt

In [None]:
# python3
# @title Imports

import collections
import dill
import functools
import itertools
import random
import requests
from typing import Any

import dm_env
import haiku as hk
import numpy as np
import tensorflow as tf

from agent_debugger.src import impala_net
from agent_debugger.src import impala_agent

from agent_debugger.src.agent_debugger import node as node_lib
from agent_debugger.src.agent_debugger.pycoworld import debugger as pcw_dbg
from agent_debugger.src.agent_debugger.pycoworld import interventions as pcw_interv
from agent_debugger.src.pycoworld import default_constants as pcw_constants
from agent_debugger.src.pycoworld import environment as pcw_env

In [None]:
# @title Constants used throughout.

# Number of seeded initial nodes (and hence rollouts) generated per experiment.
n_rollouts = 100

# Useful shortcut.
Tile = pcw_constants.Tile

In [None]:
# @title Utils used throughout.

def prepare_init_nodes(debugger: pcw_dbg.PycoworldDebugger) -> list[node_lib.Node]:
  """Builds one initial node per rollout, each with its own env and agent seed.

  Args:
    debugger: The debugger providing the root node and the seed interventions.

  Returns:
    A list of `n_rollouts` nodes. The node at index `i` is derived from the
    root node with the environment seed set to `i` and the agent seed set to
    `2 * i`.
  """
  base_node = debugger.get_root_node()

  def _reseed(seed: int) -> node_lib.Node:
    # The environment seed is changed first, then the agent seed.
    reseeded = debugger.interventions.change_env_seed(base_node, seed)
    return debugger.interventions.change_agent_seed(reseeded, 2 * seed)

  return [_reseed(seed) for seed in range(n_rollouts)]

def load_params(agent_name: str) -> Any:
  """Downloads and deserializes the parameters of a pretrained agent.

  Args:
    agent_name: Name of the agent, used as the stem of the `.dill` file to
      fetch from Google Cloud Storage.

  Returns:
    The object stored in the downloaded dill file.

  Raises:
    requests.HTTPError: If the server answers with an error status.
    requests.Timeout: If the server does not respond in time.
  """
  url_base = "https://storage.googleapis.com/dm_agent_debugger/{}.dill"
  url = url_base.format(agent_name)
  # Using the response as a context manager guarantees the connection is
  # released, even if the download or the deserialization fails.
  with requests.get(url, allow_redirects=True, timeout=60) as response:
    # Fail loudly on e.g. a 404 instead of trying to unpickle an error page.
    response.raise_for_status()
    # NOTE: like pickle, dill can execute arbitrary code while loading. Only
    # ever point this function at a trusted source.
    return dill.loads(response.content)

def make_agent_from_params(
    params: hk.Params, with_lstm: bool
) -> impala_agent.ImpalaAgent:
  """Builds an Impala agent, adapted for the Agent Debugger, from its weights.

  Args:
    params: The parameters of the agent (neural network weights).
    with_lstm: Whether the agent contains an LSTM cell or not.

  Returns:
    An `ImpalaAgent` built from a `RecurrentConvNet` factory and the re-keyed
    parameters.
  """
  # The first path component of every parameter key is replaced so that the
  # keys match the module's name.
  module_name = 'recurrent_conv_net'
  renamed_params = {
      '/'.join([module_name] + key.split('/')[1:]): params[key]
      for key in params
  }

  # An LSTM width of 0 is what the original code passes for LSTM-free agents.
  lstm_width = 128 if with_lstm else 0
  network_kwargs = dict(
      conv_widths=(128, 128, 128),
      conv_kernels=(3, 3, 3),
      padding='SAME',
      torso_widths=(128,),
      lstm_width=lstm_width,
      head_widths=(128,),
      num_actions=4,
  )
  net_factory = functools.partial(impala_net.RecurrentConvNet, **network_kwargs)

  return impala_agent.ImpalaAgent(net_factory=net_factory, params=renamed_params)

# 1 - Confounders

In [None]:
# @title Create interventions.
# @markdown An intervention acts on a node (see node.py in the code) and changes some internal states (agent or environment).
# @markdown In this case we change the floor type and pill position, which are attributes of the environment.

def change_floor_type(node: node_lib.Node, floor_type: str) -> node_lib.Node:
  """Returns a node whose five hardcoded floor tiles use the requested type.

  Args:
    node: The node to intervene on.
    floor_type: "sand" selects `Tile.SAND`; any other value selects
      `Tile.GRASS`.
  """
  if floor_type == "sand":
    tile_id = Tile.SAND
  else:
    tile_id = Tile.GRASS

  # These positions are hardcoded for the environment we use, ie "grass_sand".
  floor_positions = ((3, 2), (3, 3), (3, 4), (2, 4), (4, 4))
  replace_tile = debugger.interventions.replace_backdrop_element
  for floor_position in floor_positions:
    node = replace_tile(node, position=floor_position, new_element_id=tile_id)
  return node

def change_pill_position(node: node_lib.Node, pill_pos: str) -> node_lib.Node:
  """Returns a node where the reward pill sits at the requested location.

  Args:
    node: The node to intervene on.
    pill_pos: "right" targets position (1, 4); any other value targets (5, 4).
  """
  target = (5, 4)
  if pill_pos == "right":
    target = (1, 4)

  current = debugger.extractors.get_element_positions(
      node, element_id=Tile.REWARD)[0]
  # The pill already is where we want it: there is nothing to do.
  if target == current:
    return node

  moved = debugger.interventions.move_drape_element_to(
      node, drape_id=Tile.REWARD, start_position=current, dest_position=target)
  # The terminal state lies underneath the pill, so it has to follow it.
  moved = debugger.interventions.replace_backdrop_element(
      moved, target, Tile.TERMINAL)
  # The pill's old cell gets the floor type currently found at (3, 2).
  floor_tile = debugger.extractors.get_backdrop_curtain(moved)[3, 2]
  return debugger.interventions.replace_backdrop_element(
      moved, current, floor_tile)


In [None]:
# @title Create variable extractors.
# @markdown These tell us the reward's position or the agent's position for a given rollout. We use this information to compute the probabilities.

def get_agent_final_position(rollout: list[node_lib.Node]) -> str:
  """Returns the side, in {"right", "left"}, where the agent ends the rollout.

  The side is read from the first coordinate of the player's position in the
  last node: values up to 2 map to "right", larger ones to "left".
  """
  last_node = rollout[-1]
  player_positions = debugger.extractors.get_element_positions(
      last_node, Tile.PLAYER)
  first_coordinate = player_positions[0][0]
  if first_coordinate <= 2:
    return "right"
  return "left"

def get_reward_position(rollout: list[node_lib.Node]) -> str:
  """Returns the side, in {"right", "left"}, of the reward when the rollout starts.

  The side is read from the first coordinate of the reward's position in the
  first node: values up to 2 map to "right", larger ones to "left".
  """
  first_node = rollout[0]
  reward_positions = debugger.extractors.get_element_positions(
      first_node, Tile.REWARD)
  first_coordinate = reward_positions[0][0]
  if first_coordinate <= 2:
    return "right"
  return "left"

In [None]:
# @title Create the debugger for our agent and environment.
# @markdown A different agent implies a different debugger, because it acts on the coupled agent+environment dynamical system.

agent = "B" #@param["A", "B"]

# Agent A - trained on correlated reward+floor
if agent == "A":
  params = load_params('confounders_a')
  env = pcw_env.build_environment(level='grass_sand')
# Agent B - trained on uncorrelated reward+floor
elif agent == "B":
  params = load_params('confounders_b')
  env = pcw_env.build_environment(level='grass_sand_uncorrelated')
else:
  # Without this guard, `params` and `env` left over from a previously run
  # cell would silently be reused for an unexpected agent name.
  raise ValueError(f"Unknown agent {agent!r}; expected 'A' or 'B'.")

trained_agent = make_agent_from_params(params, with_lstm=True)

# The debugger couples the agent with its environment; the root node is the
# starting point the interventions are applied to.
debugger = pcw_dbg.PycoworldDebugger(trained_agent, env)
root_node = debugger.get_root_node()

In [None]:
# @title Prepare nodes with different agent and environment seeds.
# @markdown This cell creates nodes with different environment and agent seeds. We'll use these nodes later, but intervene on them before creating the rollouts.

init_nodes = prepare_init_nodes(debugger)

In [None]:
# @title Compute conditional probabilities.
# @markdown <strong>Note that, throughout this colab, we compute the conditional probabilities using P(A|B) = P(A and B) / P(B).</strong>

# Per reward side: how many rollouts had the reward there, and in how many of
# those the agent ended up on the same side.
reward_position_count = collections.Counter()
reward_match_agent_count = collections.Counter()
for node in init_nodes:
  rollout = debugger.get_rollout(node, maximum_length=15)
  reward_position = get_reward_position(rollout)
  agent_final_position = get_agent_final_position(rollout)
  reward_position_count[reward_position] += 1
  reward_match_agent_count[reward_position] += int(
      agent_final_position == reward_position)

print("P(T=l | R=l) = " + str(
    reward_match_agent_count["left"] / reward_position_count["left"]))
print("P(T=r | R=r) = " + str(
    reward_match_agent_count["right"] / reward_position_count["right"]))

In [None]:
# @title Compute interventional probabilities - reward position interventions.

# do(R=r): force the reward to the right in every initial node.
agent_right_count = 0
for node in init_nodes:
  do_reward_right = change_pill_position(node, pill_pos='right')
  rollout = debugger.get_rollout(do_reward_right, maximum_length=15)
  if get_agent_final_position(rollout) == 'right':
    agent_right_count += 1
print("P(T=r | do(R=r)) = " + str(agent_right_count / len(init_nodes)))

# do(R=l): force the reward to the left in every initial node.
agent_left_count = 0
for node in init_nodes:
  do_reward_left = change_pill_position(node, pill_pos='left')
  rollout = debugger.get_rollout(do_reward_left, maximum_length=15)
  if get_agent_final_position(rollout) == 'left':
    agent_left_count += 1
print("P(T=l | do(R=l)) = " + str(agent_left_count / len(init_nodes)))

In [None]:
# @title Compute interventional probabilities - floor type interventions.

# do(F=s): force a sand floor in every initial node.
agent_right_count = 0
for node in init_nodes:
  do_floor_sand = change_floor_type(node, "sand")
  rollout = debugger.get_rollout(do_floor_sand, maximum_length=15)
  if get_agent_final_position(rollout) == 'right':
    agent_right_count += 1
print("P(T=r | do(F=s)) = " + str(agent_right_count / len(init_nodes)))

# do(F=g): force a grass floor in every initial node.
agent_left_count = 0
for node in init_nodes:
  do_floor_grass = change_floor_type(node, "grass")
  rollout = debugger.get_rollout(do_floor_grass, maximum_length=15)
  if get_agent_final_position(rollout) == 'left':
    agent_left_count += 1
print("P(T=l | do(F=g)) = " + str(agent_left_count / len(init_nodes)))

# 2 - Memory

In [None]:
# @title Create variable extractors.

def floor_type(rollout):
  """Returns 'sand' if the rollout's first node contains a sand tile, else 'grass'."""
  sand_positions = debugger.extractors.get_element_positions(
      rollout[0], Tile.SAND)
  if sand_positions:
    return 'sand'
  return 'grass'

def agent_pos_after_interv(rollout):
  """Returns the agent's side, in {'right', 'left'}, at index 5 of the rollout.

  Rows up to 3 map to 'right', larger rows to 'left'.
  """
  player_positions = debugger.extractors.get_element_positions(
      rollout[5], Tile.PLAYER)
  player_row = player_positions[0].row
  if player_row <= 3:
    return 'right'
  return 'left'

def agent_pos_end(rollout):
  """Returns the agent's side, in {'right', 'left'}, in the rollout's last node.

  Rows up to 3 map to 'right', larger rows to 'left'.
  """
  player_positions = debugger.extractors.get_element_positions(
      rollout[-1], Tile.PLAYER)
  player_row = player_positions[0].row
  if player_row <= 3:
    return 'right'
  return 'left'


In [None]:
# @title Create the debugger.

agent = "B" #@param["A", "B"]

env = pcw_env.build_environment(
    level='large_color_memory',
    egocentric_horizon=1
)

# Agent A - with memory
if agent == "A":
  params = load_params('memory_a')
  with_lstm = True
# Agent B - without memory
elif agent == "B":
  params = load_params('memory_b')
  with_lstm = False
else:
  # Without this guard, `params` left over from a previously run cell would
  # silently be reused for an unexpected agent name.
  raise ValueError(f"Unknown agent {agent!r}; expected 'A' or 'B'.")

trained_agent = make_agent_from_params(params, with_lstm=with_lstm)

# Rebuild the debugger and the seeded initial nodes for this agent/environment.
debugger = pcw_dbg.PycoworldDebugger(trained_agent, env)
init_nodes = prepare_init_nodes(debugger)

In [None]:
# @title Compute conditional probabilities.

def _conditional_probability(event, condition):
  """Estimates P(event | condition) as P(event and condition) / P(condition)."""
  return np.mean(np.logical_and(event, condition)) / np.mean(condition)


# One entry per rollout for each extracted variable.
floor_type_l = []
agent_pos_after_interv_l = []
agent_pos_end_l = []
for node in init_nodes:
  rollout = debugger.get_rollout(node, maximum_length=15)
  floor_type_l.append(floor_type(rollout))
  agent_pos_after_interv_l.append(agent_pos_after_interv(rollout))
  agent_pos_end_l.append(agent_pos_end(rollout))

_floor = np.array(floor_type_l)
_end = np.array(agent_pos_end_l)
_mid = np.array(agent_pos_after_interv_l)

print("P(T=l | F=g) = "+str(_conditional_probability(_end=="left", _floor=="grass")))
print("P(T=r | F=s) = "+str(_conditional_probability(_end=="right", _floor=="sand")))
print("P(P=l | F=g) = "+str(_conditional_probability(_mid=="left", _floor=="grass")))
print("P(P=r | F=s) = "+str(_conditional_probability(_mid=="right", _floor=="sand")))

In [None]:
# @title Compute interventional probabilities.

# Integer codes of the agent's four actions.
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3


def _at_intervention_step(node):
  """Breakpoint predicate: true for a node whose episode step is 3.

  Defined as a named function rather than assigned to `breakpoint`, so that the
  builtin `breakpoint()` stays usable in the rest of the notebook.
  """
  return node.episode_step == 3


def _rollouts_with_forced_actions(forced_next_actions):
  """Rolls out every initial node with the given forced-actions intervention.

  Args:
    forced_next_actions: List of actions handed to `change_agent_next_actions`,
      which is passed as the intervention to `get_intervened_rollout` together
      with the step-3 breakpoint predicate.

  Returns:
    A pair of lists (floor types, final agent positions), one entry per
    initial node.
  """
  intervention_at_breakpoint = functools.partial(
      debugger.interventions.change_agent_next_actions,
      forced_next_actions=forced_next_actions)
  floor_types, final_positions = [], []
  for init_node in init_nodes:
    rollout = debugger.get_intervened_rollout(
        init_node, 15,
        _at_intervention_step, intervention_at_breakpoint)
    floor_types.append(floor_type(rollout))
    final_positions.append(agent_pos_end(rollout))
  return floor_types, final_positions


# do(P=r): two forced UP actions.
floor_type_l, agent_pos_end_l = _rollouts_with_forced_actions([UP, UP])

print("P(T=l | do(P=r), F=g) = "+str(np.mean(np.logical_and(np.array(agent_pos_end_l)=="left", np.array(floor_type_l)=="grass"))/np.mean(np.array(floor_type_l)=="grass")))


# do(P=l): two forced DOWN actions.
floor_type_l, agent_pos_end_l = _rollouts_with_forced_actions([DOWN, DOWN])

print("P(T=r | do(P=l), F=s) = "+str(np.mean(np.logical_and(np.array(agent_pos_end_l)=="right", np.array(floor_type_l)=="sand"))/np.mean(np.array(floor_type_l)=="sand")))

# 3 - Robust generalization

In [None]:
# @title Create interventions and variable extractors.

def move_reward_to_quadrant(node, quadrant: str):
  """Returns a node where the reward was moved to a random cell of a quadrant.

  Cells have rows and columns in 1..6. Each quadrant is defined by whether the
  row and the column are at least 4:
    'south': row >= 4 and col >= 4; 'north': row < 4 and col < 4;
    'east': row < 4 and col >= 4; anything else: row >= 4 and col < 4.
  The agent's current cell is never picked.

  Args:
    node: The node to intervene on.
    quadrant: Name of the target quadrant.
  """
  # (row is in the upper range?, column is in the upper range?) per quadrant.
  if quadrant == 'south':
    high_row, high_col = True, True
  elif quadrant == 'north':
    high_row, high_col = False, False
  elif quadrant == 'east':
    high_row, high_col = False, True
  else:
    high_row, high_col = True, False

  player = debugger.extractors.get_element_positions(node, Tile.PLAYER)[0]
  player_cell = (player.row, player.col)
  # Row-major order is kept so the random draw sees the same candidate list.
  candidates = [
      (row, col)
      for row, col in itertools.product(range(1, 7), range(1, 7))
      if (row >= 4) == high_row and (col >= 4) == high_col
      and (row, col) != player_cell
  ]
  destination = random.choice(candidates)

  origin = debugger.extractors.get_element_positions(node, Tile.REWARD)[0]
  node = debugger.interventions.move_drape_element_to(
      node, Tile.REWARD, start_position=origin, dest_position=destination)
  # The backdrop follows the reward: floor where it was, terminal where it is.
  node = debugger.interventions.replace_backdrop_element(
      node, origin, Tile.FLOOR)
  return debugger.interventions.replace_backdrop_element(
      node, destination, Tile.TERMINAL_R)

def reward_taken(rollout):
  """Returns whether the last timestep of the rollout carries a reward of 1."""
  final_timestep = rollout[-1].last_timestep
  return final_timestep.reward == 1


In [None]:
# @title Create the debugger

agent = "B" #@param["A", "B"]

# Agent A - trained on the full environment
if agent == "A":
  params = load_params('generalization_a')
  env = pcw_env.build_environment(level='apples_full')
# Agent B - trained only on part of the environment distribution
elif agent == "B":
  params = load_params('generalization_b')
  env = pcw_env.build_environment(level='apples_corner')
else:
  # Without this guard, `params` and `env` left over from a previously run
  # cell would silently be reused for an unexpected agent name.
  raise ValueError(f"Unknown agent {agent!r}; expected 'A' or 'B'.")

trained_agent = make_agent_from_params(params, with_lstm=True)

# Rebuild the debugger and the seeded initial nodes for this agent/environment.
debugger = pcw_dbg.PycoworldDebugger(trained_agent, env)
init_nodes = prepare_init_nodes(debugger)

In [None]:
# @title Compute conditional probs.

# Fraction of un-intervened rollouts in which the reward is taken.
reward_taken_count = 0
for node in init_nodes:
  rollout = debugger.get_rollout(node, maximum_length=15)
  reward_taken_count += int(reward_taken(rollout))
# The event is "reward equals 1", so the label reads R=1 (it used to read
# R=l), in line with the interventional P(R=1 | G=...) printed further down.
print("P(R=1) = " + str(reward_taken_count / len(init_nodes)))

In [None]:
# @title Compute interventional probs.

# For each quadrant, move the reward there in every initial node and measure
# how often the agent still takes it.
for quadrant in ('south', 'north', 'east', 'west'):
  reward_taken_count = 0
  for node in init_nodes:
    node = move_reward_to_quadrant(node, quadrant)
    rollout = debugger.get_rollout(node, maximum_length=15)
    if reward_taken(rollout):
      reward_taken_count += 1
  print("P(R=1 | G="+quadrant+") = " + str(reward_taken_count / len(init_nodes)))

# 4 - Counterfactuals

In [None]:
# @title Create interventions and variable extractors.

def move_door_to(node, door_pos: str):
  """Returns a node where the `Tile.DOOR_B` door was moved to the given side.

  Args:
    node: The node to intervene on.
    door_pos: "left" targets position (3, 1); any other value targets (3, 5).
  """
  if door_pos == "left":
    destination = (3, 1)
  else:
    destination = (3, 5)
  origin = debugger.extractors.get_element_positions(node, Tile.DOOR_B)[0]
  return debugger.interventions.move_drape_element_to(
      node, Tile.DOOR_B, start_position=origin, dest_position=destination)
  
def reward_taken(rollout):
  """Returns "green" if the agent ends the rollout at (2, 2) or (2, 4), else "red"."""
  final_cell = debugger.extractors.get_element_positions(
      rollout[-1], Tile.PLAYER)[0]
  return "green" if final_cell in [(2, 2), (2, 4)] else "red"

def door_position(node):
  """Returns the side, in {"left", "right"}, of the `Tile.DOOR_B` door.

  Args:
    node: A rollout (sequence of nodes), despite the parameter name, which is
      kept unchanged for backward compatibility. Only its first node is read.

  Returns:
    "left" if the door is at position (3, 1) in the first node, else "right".
  """
  # Read the argument instead of the global `rollout`: the function used to
  # work only because callers happened to name their loop variable `rollout`.
  door_pos = debugger.extractors.get_element_positions(node[0], Tile.DOOR_B)[0]
  door_pos = (door_pos.row, door_pos.col)
  return "left" if door_pos == (3, 1) else "right"

In [None]:
# @title Compute variable values for both agents.
# @markdown We compute the probabilities slightly differently than before here since we consider probabilities involving the ID of the agent (variable 'A' in the paper), and therefore compute values which depend on the metrics for agents 're' and 'gr' simultaneously.

# Per-agent samples gathered in the conditional (un-intervened) regime.
reward_taken_l = {"A": [], "B": []}
door_pos_l = {"A": [], "B": []}

agent_names = {"A": "counterfactuals_a", "B": "counterfactuals_b"}
for agent_id, agent_name in agent_names.items():
  env = pcw_env.build_environment(level='red_green_apples')

  params = load_params(agent_name)
  trained_agent = make_agent_from_params(params, with_lstm=True)

  debugger = pcw_dbg.PycoworldDebugger(trained_agent, env)
  init_nodes = prepare_init_nodes(debugger)

  # Conditional regime.
  for node in init_nodes:
    rollout = debugger.get_rollout(node, maximum_length=15)
    reward_taken_l[agent_id].append(reward_taken(rollout))
    door_pos_l[agent_id].append(door_position(rollout))

  # Interventional regime: the door is moved to the right in every node.
  reward_taken_count = collections.Counter()
  for node in init_nodes:
    node = move_door_to(node, "right")
    rollout = debugger.get_rollout(node, maximum_length=15)
    reward_taken_count[reward_taken(rollout)] += 1
  if agent_id == "A":
    print("P(R_{D=r}=gr | D=l, R=gr) = " + str(reward_taken_count["green"] / len(init_nodes)))
  elif agent_id == "B":
    print("P(R_{D=r}=re | D=l, R=re) = " + str(reward_taken_count["red"] / len(init_nodes)))


In [None]:
# @title Compute counterfactual probs.

# Boolean masks, one entry per rollout, for each agent.
reward_a_red = np.array(reward_taken_l["A"])=="red"
reward_b_red = np.array(reward_taken_l["B"])=="red"
door_b_left = np.array(door_pos_l["B"])=="left"
door_a_left = np.array(door_pos_l["A"])=="left"

# Both agents contribute `n_rollouts` rollouts each, i.e. they are weighted
# equally; that common weight cancels out in the ratios below.
print("P(R=re) = "+str((np.sum(reward_a_red) + np.sum(reward_b_red))/(2*n_rollouts)))

# The evidence is the SUM of the two agents' frequencies. Adding the boolean
# arrays themselves is an element-wise OR, which counts an index only once
# when both agents satisfy the event and therefore underestimates the evidence.
p_red = np.mean(reward_a_red) + np.mean(reward_b_red)
print("P(A=re | R=re) = "+str(np.mean(reward_b_red)/p_red))

p_intersect = np.mean(np.logical_and(reward_b_red, door_b_left))
p_evidence = np.mean(np.logical_and(reward_a_red, door_a_left)) + p_intersect
print("P(A=re | D=l, R=re) = "+str(p_intersect / p_evidence))

# 5 - Causal induction

No need to use pycoworld here, as the setup is very simple.

In [None]:
# @title Creating leader and follower actions, no interventions.

# NOTE(review): `episode_length` is not read anywhere in the visible code.
episode_length = 1
n_episodes = 10000

# Agent 1 is red, 2 is blue.
# Action 0 is right, action 1 is left.
agent_1_actions, agent_2_actions = [], []
# 0 when agent 1 (red) leads, 1 when agent 2 (blue) leads.
leader_bool = []
for episode in range(n_episodes):
  # The leader is drawn uniformly among the two agents.
  leader = np.random.randint(0, 2)
  leader_bool.append(leader)

  # The follower copies the leader, except that with probability 0.1 its
  # action is redrawn uniformly (and may still match the leader's).
  leader_action = np.random.randint(0, 2)
  follower_action = leader_action
  if np.random.rand() < 0.1:
    follower_action = np.random.randint(0, 2)
  if leader == 0:
    agent_1_actions.append(leader_action)
    agent_2_actions.append(follower_action)
  else:
    agent_1_actions.append(follower_action)
    agent_2_actions.append(leader_action)
# Arrays of 0/1 values, so products act as logical ANDs in the estimates below.
leader_bool = np.array(leader_bool)
agent_1_actions = np.array(agent_1_actions)
agent_2_actions = np.array(agent_2_actions)

# Conditionals are estimated as P(A and B) / P(B); action 1 means "left".
print("P(L=b) = "+str(np.mean(leader_bool)))
print("P(L=b | R=l, B=l) = "+str(np.mean(leader_bool * agent_1_actions * agent_2_actions) / np.mean(agent_1_actions * agent_2_actions)))
print("P(L=b | R=l, B=r) = "+str(np.mean(leader_bool * agent_1_actions * (1-agent_2_actions)) / np.mean(agent_1_actions * (1-agent_2_actions))))

In [None]:
# @title Creating leader and follower actions, with intervention do(R=r)

# Agent 1's action is forced to 0 in every episode.
forced_action = 0
agent_1_actions, agent_2_actions = [], []
# 0 when agent 1 leads, 1 when agent 2 leads.
leader_bool = []
for episode in range(n_episodes):
  leader = np.random.randint(0, 2)
  leader_bool.append(leader)

  # Agent 1 always plays the forced action, whether it leads or follows.
  agent_1_actions.append(forced_action)
  # When agent 2 leads it acts uniformly at random; otherwise the leader's
  # action is the forced one.
  leader_action = np.random.randint(0, 2) if leader == 1 else forced_action
  if leader == 1:
    agent_2_actions.append(leader_action)
  else:
    # Agent 2 follows: it copies the forced action, except that with
    # probability 0.1 its action is redrawn uniformly.
    follower_action = leader_action
    if np.random.rand() < 0.1:
      follower_action = np.random.randint(0, 2)
    agent_2_actions.append(follower_action)
leader_bool = np.array(leader_bool)
agent_1_actions = np.array(agent_1_actions)
agent_2_actions = np.array(agent_2_actions)

# Estimated as P(L=1 and B=1) / P(B=1) over the intervened episodes.
print("P(L=b | do(R=r), B=l) = "+str(np.mean(leader_bool * agent_2_actions) / np.mean(agent_2_actions)))

In [None]:
# @title Creating leader and follower actions, with intervention do(R=l)

# Agent 1's action is forced to 1 in every episode.
forced_action = 1
agent_1_actions, agent_2_actions = [], []
# 0 when agent 1 leads, 1 when agent 2 leads.
leader_bool = []
for episode in range(n_episodes):
  leader = np.random.randint(0, 2)
  leader_bool.append(leader)

  # Agent 1 always plays the forced action, whether it leads or follows.
  agent_1_actions.append(forced_action)
  # When agent 2 leads it acts uniformly at random; otherwise the leader's
  # action is the forced one.
  leader_action = np.random.randint(0, 2) if leader == 1 else forced_action
  if leader == 1:
    agent_2_actions.append(leader_action)
  else:
    # Agent 2 follows: it copies the forced action, except that with
    # probability 0.1 its action is redrawn uniformly.
    follower_action = leader_action
    if np.random.rand() < 0.1:
      follower_action = np.random.randint(0, 2)
    agent_2_actions.append(follower_action)
leader_bool = np.array(leader_bool)
agent_1_actions = np.array(agent_1_actions)
agent_2_actions = np.array(agent_2_actions)

# Estimated as P(L=1 and B=1) / P(B=1) over the intervened episodes.
print("P(L=b | do(R=l), B=l) = "+str(np.mean(leader_bool * agent_2_actions) / np.mean(agent_2_actions)))

# 6 - Causal pathways

In [None]:
# @title Create interventions.

def change_door_state(node, open: bool):
  """Returns a node whose `Tile.DOOR_R` door is opened or closed as requested.

  An open door is one with no `Tile.DOOR_R` element in the node; a closed door
  has the element at the hardcoded position (3, 4).

  Args:
    node: The node to intervene on.
    open: True to open the door (remove the element), False to close it (add
      the element).
  """
  door_pos = debugger.extractors.get_element_positions(node, Tile.DOOR_R)
  door_missing = door_pos == []
  already_open = door_missing and open == True
  already_closed = (not door_missing) and open == False
  # The node already is in the requested state: return it untouched.
  if already_open or already_closed:
    return node

  if open:
    edit_door = debugger.interventions.remove_drape_element
  else:
    edit_door = debugger.interventions.add_drape_element
  return edit_door(node, Tile.DOOR_R, position=(3, 4))
  
def change_internal_key_state(node: node_lib.Node, value: int) -> node_lib.Node:
  """Returns a node where the engine's plot entry for `Tile.KEY_R` is `value`.

  Args:
    node: The node to intervene on.
    value: The value stored in the plot under the key `chr(Tile.KEY_R)`.
  """
  plot_key = chr(Tile.KEY_R)
  with pcw_interv.InterventionContext(node) as ctx:
    ctx.engine.the_plot[plot_key] = value
  # The intervened node is read from the context once it has been exited.
  return ctx.new_node

def remove_key(node):
  """Returns a node with the key's plot state set to 0 and its element removed."""
  keyless = change_internal_key_state(node=node, value=0)
  key_positions = debugger.extractors.get_element_positions(
      keyless, Tile.KEY_R)
  return debugger.interventions.remove_drape_element(
      keyless, Tile.KEY_R, position=key_positions[0])
 
def add_key_to_agent(node):
  """Returns a node with the key element removed and its plot state set to 1."""
  # The key is removed first (which sets the state to 0); the state is then
  # set to 1.
  without_key = remove_key(node)
  return change_internal_key_state(without_key, value=1)
 
def door_state(node):
  """Returns the state, in {"open", "closed"}, of the `Tile.DOOR_R` door.

  Args:
    node: A rollout (sequence of nodes), despite the parameter name, which is
      kept unchanged for backward compatibility. Only its first node is read.

  Returns:
    "open" if the first node contains no `Tile.DOOR_R` element, else "closed".
  """
  # Read the argument instead of the global `rollout`: the function used to
  # work only because callers happened to name their loop variable `rollout`.
  door_positions = debugger.extractors.get_element_positions(
      node[0], Tile.DOOR_R)
  return "open" if door_positions == [] else "closed"

def key_taken(rollout):
  """Returns whether the rollout's last node has no `Tile.KEY_R` element left."""
  remaining_keys = debugger.extractors.get_element_positions(
      rollout[-1], Tile.KEY_R)
  return remaining_keys == []

def reward_taken(rollout):
  """Returns whether the reward of the rollout's last timestep equals 1."""
  last_node = rollout[-1]
  final_reward = last_node.last_timestep.reward
  return final_reward == 1

In [None]:
# @title Create the debugger.

agent = "A" #@param["A", "B"]


# Agent A - trained on all the environment distribution
if agent == "A":
  params = load_params('pathways_a')
  env = pcw_env.build_environment(level='key_door')
# Agent B - trained only when the door is closed
elif agent == "B":
  params = load_params('pathways_b')
  env = pcw_env.build_environment(level='key_door_closed')
else:
  # Without this guard, `params` and `env` left over from a previously run
  # cell would silently be reused for an unexpected agent name.
  raise ValueError(f"Unknown agent {agent!r}; expected 'A' or 'B'.")

trained_agent = make_agent_from_params(params, with_lstm=True)

# Rebuild the debugger and the seeded initial nodes for this agent/environment.
debugger = pcw_dbg.PycoworldDebugger(trained_agent, env)
init_nodes = prepare_init_nodes(debugger)

In [None]:
# @title Compute conditional probs.
# @markdown Note that for agent B some probabilities are NaN: the door is **always** closed in its environment, so the event that we condition on can have zero probability.

# One 0/1 indicator per rollout for each variable.
reward_taken_l = []
key_taken_l = []
door_open_l = []
for node in init_nodes:
  rollout = debugger.get_rollout(node, maximum_length=15)
  reward_taken_l.append(1 if reward_taken(rollout) else 0)
  key_taken_l.append(1 if key_taken(rollout) else 0)
  door_open_l.append(1 if door_state(rollout) == "open" else 0)
key_taken_l = np.array(key_taken_l)
reward_taken_l = np.array(reward_taken_l)
door_open_l = np.array(door_open_l)

# Complementary indicators; products of indicators act as logical ANDs.
key_left_l = 1-key_taken_l
door_closed_l = 1-door_open_l

print("P(R=1) = "+str(np.mean(reward_taken_l)))
print("P(R=1 | K=y) = "+str(np.mean(reward_taken_l * key_taken_l) / np.mean(key_taken_l)))
print("P(R=1 | K=n) = "+str(np.mean(reward_taken_l * key_left_l) / np.mean(key_left_l)))
print("P(R=1 | D=o) = "+str(np.mean(reward_taken_l * door_open_l) / np.mean(door_open_l)))
print("P(R=1 | D=c) = "+str(np.mean(reward_taken_l * door_closed_l) / np.mean(door_closed_l)))

In [None]:
# @title Compute interventional probs.

def _intervened_outcomes(intervention, outcome):
  """Applies `intervention` to every initial node and measures `outcome`.

  Returns:
    An array with, per initial node, `outcome` of the resulting rollout cast
    to int.
  """
  results = []
  for init_node in init_nodes:
    rollout = debugger.get_rollout(intervention(init_node), maximum_length=15)
    results.append(int(outcome(rollout)))
  return np.array(results)


reward_taken_l = _intervened_outcomes(remove_key, reward_taken)
print("P(R=1 | do(K=n)) = "+str(np.mean(reward_taken_l)))

reward_taken_l = _intervened_outcomes(add_key_to_agent, reward_taken)
print("P(R=1 | do(K=y)) = "+str(np.mean(reward_taken_l)))

key_taken_l = _intervened_outcomes(
    functools.partial(change_door_state, open=True), key_taken)
print("P(K=y | do(D=o)) = "+str(np.mean(key_taken_l)))

key_taken_l = _intervened_outcomes(
    functools.partial(change_door_state, open=False), key_taken)
print("P(K=y | do(D=c)) = "+str(np.mean(key_taken_l)))

The causal response can be calculated manually using the above.