# Getting started with the Boltzmann Policy Distribution

This notebook walks through how to calculate the Boltzmann policy distribution (BPD) for a simple environment and then use it to predict human behavior. You can use this as a tutorial to get started with using the Boltzmann policy distribution for your own environments!

If you are running this notebook in Google Colab, it is recommended to use a GPU. You can enable GPU acceleration by going to **Runtime > Change runtime type** and selecting **GPU** from the dropdown.

First, make sure you have installed the `boltzmann-policy-distribution` package, either from GitHub or PyPI:

In [None]:
try:
    import bpd
except ImportError:
    !pip install boltzmann-policy-distribution

# Avoid warnings from RLlib:
!pip install moviepy gputil

## Building an environment

We'll build an environment similar to the one found in Figures 3 and 6 of [the paper](https://openreview.net/pdf?id=_l_QjPGN5ye) to demonstrate how to use the BPD. In this gridworld, the agent must repetitively travel to the an apple tree, pick an apple, and drop it in a basket. The world is in the shape of a ring with the apple tree and basket opposite one another, so the agent can travel either way around to pick apples. The agent receives 1 reward for dropping off an apple in the basket.

Our implementation of the BPD is based on [RLlib](https://docs.ray.io/en/latest/rllib/index.html), which supports environments defined with the [OpenAI Gym](https://gym.openai.com/) interface, so our environment will use Gym.

In [None]:
from typing import Any, Dict, Tuple
from typing_extensions import TypedDict
from ray.tune.registry import register_env

import gym
from gym import spaces
import numpy as np


class ApplePickingConfigDict(TypedDict):
    ring_size: int
    horizon: int


DEFAULT_CONFIG: ApplePickingConfigDict = {"ring_size": 8, "horizon": 100}


class ApplePickingEnv(gym.Env):
    current_position: int
    holding_apple: bool
    timestep: int

    def __init__(self, config: ApplePickingConfigDict = DEFAULT_CONFIG):
        super().__init__()
        self.config = {**DEFAULT_CONFIG, **config}

        self.obs_space_size = self.config["ring_size"] + 1
        self.observation_space = spaces.Box(
            low=np.zeros(self.obs_space_size, dtype=np.float32),
            high=np.ones(self.obs_space_size, dtype=np.float32),
        )
        self.action_space = spaces.Discrete(2)

    def _get_obs(self) -> np.ndarray:
        obs = np.zeros(self.obs_space_size)
        obs[self.current_position] = 1
        if self.holding_apple:
            obs[-1] = 1
        return obs

    def reset(self) -> np.ndarray:
        self.current_position = 0
        self.holding_apple = False
        self.timestep = 0
        return self._get_obs()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict[str, Any]]:
        if action == 0:
            self.current_position += 1
            # Wrap around the ring if necessary.
            if self.current_position >= self.config["ring_size"]:
                self.current_position -= self.config["ring_size"]
        elif action == 1:
            self.current_position -= 1
            # Wrap around the ring if necessary.
            if self.current_position < 0:
                self.current_position += self.config["ring_size"]
        else:
            assert False, "Invalid action"

        reward = 0
        # The basket is at position 0 in the ring.
        if self.current_position == 0 and self.holding_apple:
            # Drop off apple.
            reward = 1
            self.holding_apple = False
        # The tree is halfway around the ring.
        elif (
            self.current_position == self.config["ring_size"] // 2
            and not self.holding_apple
        ):
            # Pick apple.
            self.holding_apple = True

        self.timestep += 1
        done = self.timestep >= self.config["horizon"]

        return self._get_obs(), float(reward), done, {}


register_env("apple_picking", lambda env_config: ApplePickingEnv(env_config))

### Human data

The purpose of the BPD is to better predict human behavior, so let's generate some simulated human data to evaluate it with. We'll generate data for two consistent humans, one of whom always moves around the ring to the right and one of which always moves left. We can also generate data for a human which alternates going left and right (i.e., they are not as consistent).

In [None]:
from typing import List
from ray.rllib.evaluation import SampleBatch


def generate_human_trajectory(action_sequence: List[int]) -> SampleBatch:
    env = ApplePickingEnv()
    done = False
    obs_list = []
    action_list = []
    reward_list = []
    obs = env.reset()
    timestep = 0
    while not done:
        obs_list.append(obs)
        action = action_sequence[timestep]
        action_list.append(action)
        obs, reward, done, info = env.step(action)
        reward_list.append(reward)
        timestep += 1
    return SampleBatch(
        {
            SampleBatch.OBS: np.array(obs_list),
            SampleBatch.ACTIONS: np.array(action_list),
            SampleBatch.PREV_ACTIONS: np.array([0] + action_list[:-1]),
            SampleBatch.REWARDS: np.array(reward_list),
        }
    )


human_trajectories = [
    generate_human_trajectory([0] * 100),
    generate_human_trajectory([1] * 100),
    generate_human_trajectory(([0] * 8 + [1] * 8) * 7),
]

## Calculating a Boltzmann rational policy

Before calculating the Boltzmann policy distribution for our environment, let's first calculate a Boltzmann rational policy to use as a baseline. We can do this with entropy-regularized policy gradient, where we add an entropy bonus to the policy gradient objective. We'll use RLlib's build-in [PPO](https://arxiv.org/abs/1707.06347) implementation to train the policy:

In [None]:
import tqdm
from ray.rllib.agents.ppo import PPOTrainer

# Configure the algorithm.
ppo_trainer = PPOTrainer(
    config={
        "env": "apple_picking",
        "framework": "torch",
        "lr": 1e-3,
        "gamma": 0.9,
        "train_batch_size": 2000,
        "sgd_minibatch_size": 2000,
        "create_env_on_driver": True,
        # This adds the entropy regularizer to calculate the Boltzmann rational policy.
        "entropy_coeff": 1,
    }
)

for _ in tqdm.trange(10):
    training_result = ppo_trainer.train()

eval_result = ppo_trainer.evaluate()
print("Mean reward = ", eval_result["evaluation"]["episode_reward_mean"])

Now that we've trained a Boltzmann rational policy, let's see how well it predicts the simulated human actions.

In [None]:
import torch
from ray.rllib.policy.torch_policy import TorchPolicy


def evaluate_cross_entropy(policy: TorchPolicy, trajectory):
    trajectory = trajectory.copy()
    action_dist_inputs, _ = policy.model(
        policy._lazy_tensor_dict(trajectory),
        [
            torch.tensor(state)[None].to(policy.device)
            for state in policy.get_initial_state()
        ],
        np.array([len(trajectory)]),
    )
    action_distribution = policy.dist_class(action_dist_inputs)
    return -action_distribution.logp(trajectory[SampleBatch.ACTIONS]).mean()


boltzmann_rational_policy = ppo_trainer.get_policy()
for traj_index, human_trajectory in enumerate(human_trajectories):
    cross_entropy = evaluate_cross_entropy(boltzmann_rational_policy, human_trajectory)
    print(
        f"Trajectory {traj_index}: Boltzmann rationality cross-entropy = {cross_entropy:.2f}"
    )

As you can see, the Boltzmann rational policy does an okay job of predicting human behavior. The cross-entropy is lower than if it was predicting totally random actions (then it would be $\ln(2) \approx 0.7$). However, it can't take advantage of the fact that the first two human trajectories are consistent—that the human in those trajectories is always going around the ring the same way.

## Using the Boltzmann policy distribution (BPD)

Now, let's calculate the Boltzmann policy distribution for this apple-picking environment. Unlike Boltzmann rationality, the BPD can take advantage of consistent human behavior to give better predictions of human actions.

Unlike for Boltzmann rationality, we can't use built-in RLlib algorithms to calculate the BPD. Fortunately, we provide an RLlib-compatible BPD calculation algorithm! Using it requires just a bit more work than what we've already done.

First, we have to implement a policy network that can produce a *distribution* over policies, rather than a single policy. We do this by conditioning on a latent vector which is passed in as part of the observation. We found that the best policy network architectures for the BPD use an attention mechanism to decide which parts of this latent vector to use for calculating action probabilities.

The policy network also has to provide a *discriminator*, which is used in the implicit variational inference algorithm that calculates the BPD.

The policy network implemented below looks complicated, but it's easy to modify for other environments. In fact, it should be possible to use it directly with other environments that only require fully-connected layers in their policy networks.

In [None]:
from typing import Dict, List, Optional, Tuple, Union
import gym
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.policy.rnn_sequencing import add_time_dimension
from ray.rllib.policy.sample_batch import SampleBatch
import torch
from torch import nn
import numpy as np
from ray.rllib.utils.typing import ModelConfigDict, TensorType


class TransformerDiscriminator(nn.Module):
    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        size_hidden: int,
        num_layers: int,
        sum_over_seq: bool = True,
    ):
        super().__init__()

        self.sum_over_seq = sum_over_seq

        self.encoder = nn.Linear(in_dim, size_hidden)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=size_hidden,
                nhead=1,
                dim_feedforward=size_hidden,
                batch_first=True,
            ),
            num_layers=num_layers,
        )
        self.head = nn.Linear(size_hidden, out_dim)

    def forward(
        self,
        obs: torch.Tensor,
        seq_lens: Union[torch.Tensor, np.ndarray],
    ) -> torch.Tensor:
        encoded_obs = self.encoder(obs)

        if isinstance(seq_lens, np.ndarray):
            seq_lens = torch.Tensor(seq_lens).int()
        max_seq_len = encoded_obs.shape[0] // seq_lens.shape[0]
        transformer_inputs = add_time_dimension(
            encoded_obs,
            max_seq_len=max_seq_len,
            framework="torch",
            time_major=False,
        )

        transformer_outputs = self.transformer(transformer_inputs)
        outputs: torch.Tensor = self.head(transformer_outputs)
        if self.sum_over_seq:
            outputs = outputs.sum(dim=1)
        else:
            outputs = outputs.reshape(-1, outputs.size()[-1])
        return outputs


class ApplePickingDistributionModel(TorchModelV2, nn.Module):
    def __init__(
        self,
        obs_space: gym.spaces.Space,
        action_space: gym.spaces.Space,
        num_outputs: int,
        model_config: ModelConfigDict,
        name: str,
        latent_size: int,
        num_heads: int = 4,
        discriminate_sequences: bool = False,
    ):
        TorchModelV2.__init__(
            self, obs_space, action_space, num_outputs, model_config, name
        )
        nn.Module.__init__(self)

        assert self.model_config["vf_share_layers"] is True
        assert len(self.model_config["fcnet_hiddens"]) == 3

        self.num_heads = num_heads
        self.latent_size = latent_size
        self.discriminate_sequences = discriminate_sequences

        in_dim = self.obs_space.shape[-1] + self.num_outputs
        self.discriminator_net = self._build_discriminator(in_dim)
        self.detached_discriminator_net = self._build_discriminator(in_dim)

        self.backbone = nn.Sequential(
            nn.Linear(
                obs_space.shape[0] - latent_size, self.model_config["fcnet_hiddens"][0]
            ),
            nn.LeakyReLU(),
            nn.Linear(
                self.model_config["fcnet_hiddens"][0],
                self.model_config["fcnet_hiddens"][1],
            ),
            nn.LeakyReLU(),
        )
        self.attention = nn.Linear(
            self.model_config["fcnet_hiddens"][1], self.num_heads * latent_size
        )
        self.head = nn.Sequential(
            nn.Linear(
                self.model_config["fcnet_hiddens"][1] + self.num_heads,
                self.model_config["fcnet_hiddens"][2],
            ),
            nn.LeakyReLU(),
            nn.Linear(self.model_config["fcnet_hiddens"][2], num_outputs),
        )

        self.value_head = nn.Sequential(
            nn.Linear(
                self.model_config["fcnet_hiddens"][1] + self.num_heads,
                self.model_config["fcnet_hiddens"][2],
            ),
            nn.LeakyReLU(),
            nn.Linear(self.model_config["fcnet_hiddens"][2], 1),
        )

    def get_initial_state(self) -> List[np.ndarray]:
        if self.discriminate_sequences:
            return [np.zeros(1)]
        else:
            return super().get_initial_state()

    def forward(
        self,
        input_dict: Dict[str, TensorType],
        state: List[TensorType],
        seq_lens: TensorType,
    ) -> Tuple[TensorType, List[TensorType]]:
        obs = input_dict["obs_flat"].float()
        self._last_flat_in = obs.reshape(obs.shape[0], -1)

        self._features = self.backbone(self._last_flat_in[:, : -self.latent_size])
        attention_weights = self.attention(self._features)
        attention_weights = attention_weights.reshape(-1, 4, self.latent_size)
        attention_weights = attention_weights.softmax(-1)

        latent_attention_output = (
            attention_weights * self._last_flat_in[:, None, -self.latent_size :]
        ).sum(2)
        head_input = torch.cat(
            [self._features, latent_attention_output],
            dim=1,
        )

        logits = self.head(head_input)
        self._vf = self.value_head(head_input)[:, 0]

        return logits, [s + 1 for s in state]

    def value_function(self) -> TensorType:
        return self._vf

    def _build_discriminator(
        self,
        in_dim: int,
        out_dim: int = 1,
    ) -> nn.Module:
        if self.discriminate_sequences:
            return TransformerDiscriminator(
                in_dim,
                1,
                self.model_config["fcnet_hiddens"][0],
                len(self.model_config["fcnet_hiddens"]),
            )
        else:
            dims = [in_dim] + self.model_config["fcnet_hiddens"] + [out_dim]
            layers: List[nn.Module] = []
            for dim1, dim2 in zip(dims[:-1], dims[1:]):
                layers.append(nn.Linear(dim1, dim2))
                layers.append(nn.LeakyReLU())
            layers = layers[:-1]
            return nn.Sequential(*layers)

    def discriminator(
        self, input_dict, seq_lens: Optional[torch.Tensor] = None, detached=False
    ):
        """
        Takes in a dictionary with observations and action probabilities and
        outputs whether it thinks they came from this policy distribution or
        the prior.

        If detached is True, then this will run through a separate, "detached" copy of
        the discriminator which will not propagate gradients to the main network.
        """

        if detached:
            self.detached_discriminator_net.load_state_dict(
                self.discriminator_net.state_dict(keep_vars=False),
            )
            self.detached_discriminator_net.eval()
            discriminator_net = self.detached_discriminator_net
        else:
            discriminator_net = self.discriminator_net

        obs = input_dict[SampleBatch.OBS]
        ac_probs = input_dict[SampleBatch.ACTION_PROB]
        if not detached:
            ac_probs = ac_probs + torch.normal(torch.zeros_like(ac_probs), 0.1)
        net_input = torch.cat([obs, ac_probs], dim=1)
        net_input[
            :, self.obs_space.shape[-1] - self.latent_size : self.obs_space.shape[-1]
        ] = 0

        if self.discriminate_sequences:
            return discriminator_net(net_input, seq_lens)
        else:
            return discriminator_net(net_input)


ModelCatalog.register_custom_model(
    "apple_picking_distribution_model",
    ApplePickingDistributionModel,
)

Now that we've implemented the policy network, we're ready to calculate the BPD for this environment with implicit variational inference! We'll use similar code to when we trained the Boltzmann rational policy, but now we replace the PPO trainer with a BPD trainer. There are a few other important differences:
 * We need to wrap our environment in a `LatentEnvWrapper` from `bpd.envs.latent_wrapper`. This will pass in the latent vector which the BPD is conditioned to the policy network on as part of the observation.
 * We're using our custom model for the policy and value network, `ApplePickingDistributionModel`.
 * We pass in a temperature (the $1 / \beta$ parameter from the paper), which controls how suboptimal the human is, and a prior concentration (the $\alpha$ parameter from the paper), which controls how inconsistent the human is.

In [None]:
import tqdm
import torch
from bpd.agents.bpd_trainer import BPDTrainer

latent_size = 2

# Configure the algorithm.
bpd_trainer = BPDTrainer(
    config={
        "env": "latent_wrapper",
        "env_config": {
            "env": "apple_picking",
            "env_config": {},
            "latent_dist": lambda: np.random.normal(0, 1, 2),
            "episodes_per_latent": 1,
            "agents_with_latent": {0},
        },
        "model": {
            "custom_model": "apple_picking_distribution_model",
            "max_seq_len": 10,
            "vf_share_layers": True,
            "custom_model_config": {
                "latent_size": latent_size,
                "discriminate_sequences": True,
            },
            "fcnet_hiddens": [64, 64, 64],
        },
        "temperature": 0.1,
        "prior_concentration": 0.2,
        "latent_size": latent_size,
        "framework": "torch",
        "lr": 1e-3,
        "gamma": 0.9,
        "num_sgd_iter": 30,
        "train_batch_size": 2000,
        "sgd_minibatch_size": 2000,
        "num_gpus": 1 if torch.cuda.is_available() else 0,
        "create_env_on_driver": True,
        "vf_loss_coeff": 0.01,
    }
)

for _ in tqdm.trange(50):
    training_result = bpd_trainer.train()

eval_result = bpd_trainer.evaluate()
print("Mean reward = ", eval_result["evaluation"]["episode_reward_mean"])

### Online prediction with the BPD

Now that we've calculated the BPD for our environment, we can use it for online prediction of human behavior. Online prediction with the BPD requires solving an inference problem: we use the BPD as a prior over human policies and update it to a posterior after observing actions that the human takes. In the paper, we explored two ways of approximating this inference. The first explicitly models a posterior distribution over the latent vector with mean-field variational inference (MFVI). We have implemented this algorithm in the `VIActionPredictor` class. Try running the code below to see how the predictions made by the BPD compare to those made by Boltzmann rationality.

In [None]:
from bpd.latent_prediction import VIActionPredictor

bpd_policy = bpd_trainer.get_policy()

for traj_index, human_trajectory in enumerate(human_trajectories):
    action_logits = VIActionPredictor(bpd_policy).predict_actions(human_trajectory)
    action_distribution = bpd_policy.dist_class(action_logits)
    cross_entropy = -action_distribution.logp(
        human_trajectory[SampleBatch.ACTIONS]
    ).mean()
    print(
        f"Trajectory {traj_index}: Boltzmann policy distribution (using MFVI) cross-entropy = {cross_entropy:.2f}"
    )

Notice how using the BPD results in much lower cross-entropy for predictions on the first two human trajectories, both of which are *consistent*. On the *inconsistent* third human trajectory, the BPD performs about the same as Boltzmann rationality. Fortunately, we find that real humans are usually consistent, so the BPD greatly outperforms Boltzmann rationality at predicting human behavior.

The other method we used to approximate the online prediction inference doesn't use an explicit representation of the posterior over the latent vector at all. Instead, it directly approximates the online prediction problem $p(a_t \mid s_1, a_1, \dots, s_t)$. To do this, we generate many trajectories from our calculated BPD policies and then train an autoregressive sequence model to predict the next action conditioned on previous states and actions. With enough trajectories, this approach should precisely approximate the exact online prediction inference.

We have implemented an RLlib trainer called `DistillationPredictionTrainer` which can trains one policy to imitate another policy. In this case, we'll imitate our latent-conditioned BPD policy with an LSTM policy that *can't see the latent vector*. This will force the LSTM policy to learn to perform exactly the inference described above.

In [None]:
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.policy.policy import PolicySpec
from bpd.agents.distillation_prediction import DistillationPredictionTrainer

PREDICTION_POLICY_ID = "prediction"
env = ApplePickingEnv()
prediction_trainer = DistillationPredictionTrainer(
    config={
        "env": "latent_wrapper",
        "env_config": {
            **bpd_trainer.config["env_config"],
        },
        "distillation_mapping_fn": lambda policy_id: PREDICTION_POLICY_ID,
        "multiagent": {
            "policies": {
                DEFAULT_POLICY_ID: PolicySpec(
                    None,
                    bpd_policy.observation_space,
                    bpd_policy.action_space,
                    {"model": bpd_trainer.config["model"]},
                ),
                PREDICTION_POLICY_ID: PolicySpec(
                    None,
                    env.observation_space,
                    env.action_space,
                    {
                        "model": {
                            "use_lstm": True,
                            "lstm_cell_size": 32,
                            "lstm_use_prev_action": True,
                            "max_seq_len": 100,
                        },
                    },
                ),
            },
            "policies_to_train": [PREDICTION_POLICY_ID],
            "policy_mapping_fn": lambda agent_id, episodes, worker, **kwargs: DEFAULT_POLICY_ID,
        },
        "framework": "torch",
        "lr": 1e-3,
        "num_sgd_iter": 3,
        "train_batch_size": 500,
        "sgd_minibatch_size": 500,
        "num_gpus": 1 if torch.cuda.is_available() else 0,
        "create_env_on_driver": True,
        "num_workers": 0,
    }
)
prediction_trainer.set_weights(bpd_trainer.get_weights())

for _ in tqdm.trange(40):
    training_result = prediction_trainer.train()

print(
    "Train cross entropy = ",
    training_result["info"]["learner"][PREDICTION_POLICY_ID]["learner_stats"][
        "cross_entropy"
    ],
)

Let's see how our LSTM prediction policy does on the simulated human data:

In [None]:
bpd_prediction_policy = prediction_trainer.get_policy(PREDICTION_POLICY_ID)
for traj_index, human_trajectory in enumerate(human_trajectories):
    cross_entropy = evaluate_cross_entropy(bpd_prediction_policy, human_trajectory)
    print(f"Trajectory {traj_index}: BPD cross-entropy = {cross_entropy:.2f}")

Like with the MFVI inference method, the BPD outperforms Boltzmann rationality for the first two trajectories where the simulated human is consistent. Notice how fast the LSTM prediction is—much faster than MFVI. This makes it useful for applications where we need to predict human behavior in real time.

## Conclusion

Now you should be ready to use the Boltzmann policy distribution for modeling humans in new environments. If you want to see how we used the BPD for a more complex environment, Overcooked, see the [README](https://github.com/cassidylaidlaw/boltzmann-policy-distribution/blob/main/README.md). Feel free to reach out with any questions to [cassidy_laidlaw@berkeley.edu](mailto:cassidy_laidlaw@berkeley.edu).

Happy human modeling!