# Custom Multi Agent Env with Variable-Length Action Spaces in RLlib

RLlib on card games:
- How to train multiple agents. In particular, every agent (player) should have its own trajectory, so that its final reward propagates along that trajectory. Still, all the agents might follow the same policy.
    https://docs.ray.io/en/master/rllib-env.html#multi-agent-and-hierarchical
- The action space changes depending on the current state: depending on the cards on the table, I might not be able to play some of the cards in my hand. To mask out the unavailable actions:
    https://docs.ray.io/en/master/rllib-models.html#variable-length-parametric-action-spaces

RLlib can create distinct policies and route each agent's decisions to the policy it is bound to. When an agent first appears in the env, `policy_mapping_fn` is called to determine which policy it is bound to. This assignment is made when the agent first enters the episode and persists for the duration of the episode.

RLlib reports separate training statistics for each policy in the return from train(), along with the combined reward.

If all “agents” in the env are homogeneous, then it is possible to use existing single-agent algorithms for training. Since there is still only a single policy being trained, RLlib only needs to internally aggregate the experiences of the different agents prior to policy optimization.
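As a rough, RLlib-independent illustration of the two points above (one trajectory per agent, one shared policy), the sketch below groups dict-based transitions by agent ID and sums each agent's rewards along its own trajectory only. `split_trajectories`, `episode_returns` and the `"shared_policy"` ID are hypothetical names chosen for this example; RLlib does this bookkeeping internally.

```python
from collections import defaultdict

SHARED_POLICY = "shared_policy"  # hypothetical policy ID


def policy_mapping_fn(agent_id):
    """Homogeneous agents: every agent is routed to the same policy."""
    return SHARED_POLICY


def split_trajectories(steps):
    """Group transitions by agent ID (hypothetical helper).

    `steps` is a list of (obs_dict, action_dict, reward_dict) tuples, where
    reward_dict holds the reward each acting agent received for its action.
    """
    trajectories = defaultdict(list)
    for obs, actions, rewards in steps:
        for agent_id, action in actions.items():
            trajectories[agent_id].append((obs[agent_id], action, rewards.get(agent_id, 0)))
    return dict(trajectories)


def episode_returns(trajectories):
    """Undiscounted return per agent, computed only from its own trajectory."""
    return {agent_id: sum(r for _, _, r in traj) for agent_id, traj in trajectories.items()}


# Example: two players alternating turns.
steps = [
    ({"player_1": "o0"}, {"player_1": 0}, {"player_1": 0}),
    ({"player_2": "o1"}, {"player_2": 1}, {"player_2": 1}),
    ({"player_1": "o2"}, {"player_1": 2}, {"player_1": -1}),
]
trajectories = split_trajectories(steps)
print(episode_returns(trajectories))                  # {'player_1': -1, 'player_2': 1}
print({policy_mapping_fn(a) for a in trajectories})   # {'shared_policy'}
```

Both players end up with separate returns even though a single policy would produce all of their actions.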

In [1]:
%load_ext autoreload
%autoreload 2

## 1. Create the custom environment
https://docs.ray.io/en/latest/rllib-env.html?multi-agent-and-hierarchical#multi-agent-and-hierarchical

Example of a multi-agent environment:
```python
# Example: using a multi-agent env
> env = MultiAgentTrafficEnv(num_cars=20, num_traffic_lights=5)

# Observations are a dict mapping agent names to their obs. Only those
# agents' names that require actions in the next call to `step()` will
# be present in the returned observation dict.
> print(env.reset())
{
    "car_1": [[...]],
    "car_2": [[...]],
    "traffic_light_1": [[...]],
}

# In the following call to `step`, actions should be provided for each
# agent that returned an observation before:
> new_obs, rewards, dones, infos = env.step(actions={"car_1": ..., "car_2": ..., "traffic_light_1": ...})

# Similarly, new_obs, rewards, dones, etc. also become dicts
> print(rewards)
{"car_1": 3, "car_2": -1, "traffic_light_1": 0}

# Individual agents can exit early; the entire episode is done when "__all__" is True
> print(dones)
{"car_2": True, "__all__": False}
```
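To make the dict convention concrete, here is a minimal sketch of a rollout loop against a hypothetical two-agent, turn-based toy env (`ToyTurnEnv`, not part of RLlib). It only assumes the contract described above: actions are provided just for the agents present in the last observation dict, and the episode ends when `dones["__all__"]` is true.

```python
import random


class ToyTurnEnv:
    """Hypothetical two-agent, turn-based env using the dict API shown above."""

    def __init__(self, max_steps=4):
        self.max_steps = max_steps

    def reset(self):
        self.t = 0
        return {"agent_0": self.t}  # only agent_0 must act on the first step

    def step(self, actions):
        assert len(actions) == 1, "exactly one agent acts per step"
        agent_id, action = next(iter(actions.items()))
        self.t += 1
        next_agent = "agent_1" if agent_id == "agent_0" else "agent_0"
        obs = {next_agent: self.t}                          # who acts next
        rewards = {agent_id: 1.0 if action == 1 else 0.0}   # reward the acting agent
        dones = {"__all__": self.t >= self.max_steps}
        return obs, rewards, dones, {}


def rollout(env, policy, seed=0):
    """Run one episode and return the summed reward per agent."""
    rng = random.Random(seed)
    obs = env.reset()
    totals = {}
    dones = {"__all__": False}
    while not dones["__all__"]:
        # Act only for agents present in the latest observation dict.
        actions = {agent_id: policy(o, rng) for agent_id, o in obs.items()}
        obs, rewards, dones, _ = env.step(actions)
        for agent_id, r in rewards.items():
            totals[agent_id] = totals.get(agent_id, 0.0) + r
    return totals


print(rollout(ToyTurnEnv(), lambda o, rng: rng.randint(0, 1)))
```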

Example of an environment with variable-length action spaces. For each agent, the available actions must also be returned, as a mask:
```python
> print(env.reset())
{
    "car_1": {
        "real_obs": [[...]],
        "action_mask": [0, 0, 1, ...]
    },
    "car_2": {
        "real_obs": [[...]],
        "action_mask": [1, 0, 1, ...]
    },
    "traffic_light_1": {
        "real_obs": [[...]],
        "action_mask": [1, 1, 1, ...]
    },
}
```
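Even a random baseline agent has to respect the mask. A minimal sketch, assuming masks are lists of 0/1 flags as in the example above; `valid_actions` and `sample_valid_action` are hypothetical helpers, not RLlib functions:

```python
import random


def valid_actions(action_mask):
    """Indices whose mask entry is 1 are the actions playable right now (hypothetical helper)."""
    return [i for i, allowed in enumerate(action_mask) if allowed == 1]


def sample_valid_action(action_mask, rng=random):
    """Uniformly sample one playable action; fail loudly if none is left."""
    choices = valid_actions(action_mask)
    if not choices:
        raise ValueError("the action mask allows no action")
    return rng.choice(choices)


# Example observation in the format shown above (real_obs omitted).
obs = {
    "car_1": {"real_obs": None, "action_mask": [0, 0, 1]},
    "car_2": {"real_obs": None, "action_mask": [1, 0, 1]},
}
rng = random.Random(0)
actions = {agent: sample_valid_action(o["action_mask"], rng) for agent, o in obs.items()}
print(actions)
```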

In [3]:
# Implementation of the Multi Agent Env. game

from gym.spaces import Dict, Discrete, Tuple, Box
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.tune.registry import register_env
import random
import numpy as np

class Actions:
    # number of actions
    SIZE = 3

    # types of actions
    ROCK = 0
    PAPER = 1
    SCISSORS = 2
    NA = 3  # Not Available, hand not yet played

class RockPaperScissors(MultiAgentEnv):
    """
    Two-player environment for the famous rock paper scissors game, modified:
    - There are two agents which alternate; the action of one agent provides the
        state for the next agent. Since one of the two players begins, the agent
        which starts second should learn to always win! The starting player
        is drawn randomly.
    - The action space changes. The game is divided into three rounds across
        which you can't re-use the same action.
    - Only the player who closes a round (the second mover) receives that
        round's reward, delivered one step later.
    """

    # Action/State spaces
    ACTION_SPACE = Discrete(Actions.SIZE)

    OBSERVATION_SPACE = Dict({
        "real_obs": Tuple((
            # First round
            Tuple((Discrete(4), Discrete(4))),

            # Second round
            Tuple((Discrete(4), Discrete(4))),

            # Third round
            Tuple((Discrete(4), Discrete(4))),
        )),

        # we have to handle changing action spaces
        "action_mask": Box(0, 1, shape=(Actions.SIZE, )),
    })
    
    
    # Reward mapping
    rewards = {
        (Actions.ROCK, Actions.ROCK): (0, 0),
        (Actions.ROCK, Actions.PAPER): (-1, 1),
        (Actions.ROCK, Actions.SCISSORS): (1, -1),
        (Actions.PAPER, Actions.ROCK): (1, -1),
        (Actions.PAPER, Actions.PAPER): (0, 0),
        (Actions.PAPER, Actions.SCISSORS): (-1, 1),
        (Actions.SCISSORS, Actions.ROCK): (-1, 1),
        (Actions.SCISSORS, Actions.PAPER): (1, -1),
        (Actions.SCISSORS, Actions.SCISSORS): (0, 0),
    }

    def __init__(self, config=None):
        
        # state and action spaces
        self.action_space = self.ACTION_SPACE
        self.observation_space = self.OBSERVATION_SPACE

        self.players = ["player_1", "player_2"]        

    def reset(self):
        self.player_scores = {p: 0 for p in self.players}  # just used to collect the scores
        self.curr_round = 0
        self.player_pointer = random.randint(0, 1)
        self.state = [
            [3, 3],
            [3, 3],
            [3, 3],
        ]

        # The round reward is buffered and delivered one step later, with the player's next observation
        self.reward_buffer = {p: 0 for p in self.players}
        
        # actions cannot be reused across one game, we keep a mask for each player
        self.action_mask = {p: [1 for _ in range(self.action_space.n)] for p in self.players}

        return {self.players[self.player_pointer]: self.get_state(self.players[self.player_pointer])}

    def step(self, action_dict):
        # Get current player
        curr_player_pointer = self.player_pointer
        curr_player = self.players[self.player_pointer]

        # Get next player
        next_player_pointer = (self.player_pointer + 1) % 2
        next_player = self.players[next_player_pointer]
    
        # Make sure the action is only for the current player
        assert curr_player in action_dict and len(action_dict) == 1,\
            "{} should be playing but action {} was received.".format(curr_player, action_dict)
        
        # Play the action
        curr_action = action_dict[curr_player]
        assert self.action_space.contains(curr_action), 'Action {} is not valid'.format(curr_action)
        assert self.state[self.curr_round][curr_player_pointer] == Actions.NA,\
            "Player {} has already played in round {}. Current state: {}".format(
            curr_player_pointer,
            self.curr_round,
            self.state
        )        
        assert self.action_mask[curr_player][curr_action] == 1, \
            '{} has already played action {}. State: {}'.format(curr_player, curr_action, self.state)
        self.action_mask[curr_player][curr_action] = 0  # mask out this action
        self.state[self.curr_round][curr_player_pointer] = curr_action

        # We might be not done yet
        done = {"__all__": False}
        
        # If the next player has already played, the round is done
        game_done = False
        round_done = self.state[self.curr_round][next_player_pointer] != Actions.NA
        if round_done:
            # If the round is done we compute the rewards
            curr_rewards = self.rewards[tuple(self.state[self.curr_round])]
            self.player_scores["player_1"] += curr_rewards[0]
            self.player_scores["player_2"] += curr_rewards[1]            
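            # Only the player who closes the round (the second mover) gets its reward
            # buffered; it is handed out with that player's next observation
            self.reward_buffer[curr_player] = curr_rewards[curr_player_pointer]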
            
            self.curr_round += 1
            if self.curr_round == 3:
                done = {"__all__": True}
                # Return the reward and each player's own state to all players
                reward = self.reward_buffer
                obs = {p: self.get_state(p) for p in self.players}
                game_done = True
        
        # Get the state and reward for the next player
        if not game_done:
            obs = {next_player: self.get_state(next_player)}
            reward = {next_player: self.reward_buffer[next_player]}
        
        # Move pointer to next player
        self.player_pointer = next_player_pointer
        return obs, reward, done, {}

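    def get_state(self, player):
        # Return copies so that later in-place updates of the game state
        # do not alter observations that were already handed out
        return {
            'real_obs': [list(r) for r in self.state],
            'action_mask': list(self.action_mask[player])
        }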
    
register_env("ParametricRPS", lambda _: RockPaperScissors())
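The hand-written `rewards` table can be cross-checked against a single "beats" relation. This pure-Python sketch (no gym or RLlib needed; `BEATS` and `round_rewards` are hypothetical helpers) rebuilds the nine entries with keys ordered as (player_1 action, player_2 action), like `self.state[round]`, and confirms that every round is zero-sum.

```python
from itertools import product

ROCK, PAPER, SCISSORS = 0, 1, 2  # same encoding as the Actions class
BEATS = {ROCK: SCISSORS, PAPER: ROCK, SCISSORS: PAPER}  # key beats value (hypothetical helper)


def round_rewards(a1, a2):
    """(reward player_1, reward player_2) for actions (a1, a2)."""
    if a1 == a2:
        return (0, 0)
    return (1, -1) if BEATS[a1] == a2 else (-1, 1)


derived = {(a1, a2): round_rewards(a1, a2) for a1, a2 in product(range(3), repeat=2)}

# The table from RockPaperScissors.rewards, with the Actions constants
# replaced by their integer values.
hand_written = {
    (0, 0): (0, 0), (0, 1): (-1, 1), (0, 2): (1, -1),
    (1, 0): (1, -1), (1, 1): (0, 0), (1, 2): (-1, 1),
    (2, 0): (-1, 1), (2, 1): (1, -1), (2, 2): (0, 0),
}
print(derived == hand_written)                     # True
print(all(sum(r) == 0 for r in derived.values()))  # True: every round is zero-sum
```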

In [4]:
# Test the environment
import random

env = RockPaperScissors()
obs = env.reset()
print(obs)

is_done = False
while not is_done:
    print('\nRound {}: {}'.format(env.curr_round, env.players[env.player_pointer]))
    action = {list(obs.keys())[0]: int(input('Insert action (0, 1, 2): '))}
    obs, reward, done, _ = env.step(action)
    print(obs, reward, done)
    is_done = done['__all__']

{'player_2': {'real_obs': [[3, 3], [3, 3], [3, 3]], 'action_mask': [1, 1, 1]}}

Round 0: player_2
Insert action (0, 1, 2): 1
{'player_1': {'real_obs': [[3, 1], [3, 3], [3, 3]], 'action_mask': [1, 1, 1]}} {'player_1': 0} {'__all__': False}

Round 0: player_1
Insert action (0, 1, 2): 0
{'player_2': {'real_obs': [[0, 1], [3, 3], [3, 3]], 'action_mask': [1, 0, 1]}} {'player_2': 0} {'__all__': False}

Round 1: player_2
Insert action (0, 1, 2): 0
{'player_1': {'real_obs': [[0, 1], [3, 0], [3, 3]], 'action_mask': [0, 1, 1]}} {'player_1': -1} {'__all__': False}

Round 1: player_1
Insert action (0, 1, 2): 1
{'player_2': {'real_obs': [[0, 1], [1, 0], [3, 3]], 'action_mask': [0, 0, 1]}} {'player_2': 0} {'__all__': False}

Round 2: player_2
Insert action (0, 1, 2): 2
{'player_1': {'real_obs': [[0, 1], [1, 0], [3, 2]], 'action_mask': [0, 0, 1]}} {'player_1': 1} {'__all__': False}

Round 2: player_1
Insert action (0, 1, 2): 2
{'player_1': {'real_obs': [[0, 1], [1, 0], [2, 2]], 'action_mask': [0, 0, 
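The interactive run can be replayed without the environment to make the reward flow explicit. This sketch reimplements the round outcome with a hypothetical `round_rewards` helper and mirrors the `reward_buffer` logic of `step()`: only the player who closes a round has that round's reward delivered. The moves are copied from the log above. player_2 started, so its buffer stays at 0, and the first two values delivered to player_1 (-1 and 1) match the printed log.

```python
ROCK, PAPER, SCISSORS = 0, 1, 2
BEATS = {ROCK: SCISSORS, PAPER: ROCK, SCISSORS: PAPER}  # key beats value


def round_rewards(a1, a2):
    """(reward player_1, reward player_2) for one round (hypothetical helper)."""
    if a1 == a2:
        return 0, 0
    return (1, -1) if BEATS[a1] == a2 else (-1, 1)


def replay(moves):
    """Replay (player, action) pairs two at a time, one pair per round."""
    scores = {"player_1": 0, "player_2": 0}
    delivered = {"player_1": [], "player_2": []}  # round rewards actually handed out
    for i in range(0, len(moves), 2):
        (first, a_first), (second, a_second) = moves[i], moves[i + 1]
        played = {first: a_first, second: a_second}
        r1, r2 = round_rewards(played["player_1"], played["player_2"])
        outcome = {"player_1": r1, "player_2": r2}
        for player in scores:
            scores[player] += outcome[player]
        # Mirrors reward_buffer: only the player closing the round is rewarded.
        delivered[second].append(outcome[second])
    return scores, delivered


# Moves entered in the interactive run above, in playing order.
moves = [("player_2", 1), ("player_1", 0),
         ("player_2", 0), ("player_1", 1),
         ("player_2", 2), ("player_1", 2)]
scores, delivered = replay(moves)
print(scores)     # {'player_1': 0, 'player_2': 0}
print(delivered)  # {'player_1': [-1, 1, 0], 'player_2': []}
```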

## 2. Create the custom model for Variable-Length Action Spaces
https://docs.ray.io/en/master/rllib-models.html#variable-length-parametric-action-spaces
Our policy has to take into account the fact that some actions might not be executable in the current state.

See here for which algorithms support parametric actions: https://docs.ray.io/en/master/rllib-algorithms.html#feature-compatibility-matrix


**RLlib's parametric-actions CartPole example has working configurations for DQN (must set hiddens=[]), PPO (must disable running mean and set vf_share_layers=True), and several other algorithms.**
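Why `hiddens=[]` matters for DQN is our interpretation rather than something stated in the note: RLlib's DQN can add extra Q-head layers on top of the model output, and a linear layer applied after masking can turn the huge negative entry into a huge positive one. The pure-Python sketch below uses an arbitrary, hypothetical `dense` layer to show the effect; `FLOAT32_MIN` is the value of `tf.float32.min`.

```python
FLOAT32_MIN = -3.4028234663852886e38  # value of tf.float32.min


def apply_mask(q_values, action_mask):
    """Add 0 to valid actions and FLOAT32_MIN to invalid ones, like the custom model."""
    return [q + (0.0 if m else FLOAT32_MIN) for q, m in zip(q_values, action_mask)]


def dense(x, weights, bias):
    """Hypothetical extra linear layer stacked on top of the masked outputs."""
    return [sum(w * xi for w, xi in zip(row, x)) + b for row, b in zip(weights, bias)]


def argmax(xs):
    return max(range(len(xs)), key=xs.__getitem__)


mask = [1, 0, 1]
masked = apply_mask([1.0, 2.0, 0.5], mask)
print(argmax(masked))  # 0: the masked action 1 is never chosen

# One more linear layer (arbitrary weights) can flip the sign of the huge
# negative entry, so the masked action now has the largest value.
out = dense(masked, weights=[[1, 0, 0], [0, -1, 0], [0, 0, 1]], bias=[0, 0, 0])
print(argmax(out), mask[argmax(out)])  # 1 0 -> an invalid action is selected
```

With `hiddens=[]` and `dueling=False`, the masked model outputs are presumably used directly as Q-values, so no such layer sits after the mask.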

In [5]:
import tensorflow as tf
from gym.spaces import Box  # used for the embedding model's observation space
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork
from ray.rllib.models import ModelCatalog

def flatten_list(list_of_lists):
    flattened = []
    for l in list_of_lists:
        if isinstance(l, list):
            flattened += flatten_list(l)
        else:
            flattened.append(l)
    return flattened

class ParametricActionsModel(TFModelV2):
    def __init__(self,
                 obs_space,
                 action_space,
                 num_outputs,
                 model_config,
                 name,
                 true_obs_shape=(24,),  # 3 rounds x 2 players x Discrete(4), one-hot encoded
                 action_embed_size=None):
        super(ParametricActionsModel, self).__init__(obs_space, action_space, num_outputs, model_config, name)
        
        if action_embed_size is None:
            action_embed_size = action_space.n  # this works for a Discrete() action space

        self.action_embed_model = FullyConnectedNetwork(
            obs_space=Box(-1, 1, shape=true_obs_shape),
            action_space=action_space,
            num_outputs=action_embed_size,
            model_config=model_config,
            name=name + "_action_embed"
        )
        self.base_model = self.action_embed_model.base_model
        self.register_variables(self.action_embed_model.variables())

    def forward(self, input_dict, state, seq_lens):
        # Compute the predicted action logits.
        # Because "real_obs" is a Tuple space, input_dict["obs"]["real_obs"] may arrive as a
        # (nested) list of per-component tensors instead of a single tensor. In that case we
        # flatten the list and concatenate the tensors along the feature axis.
        obs_concat = input_dict["obs"]["real_obs"]
        if isinstance(obs_concat, list):
            obs_concat = tf.concat(values=flatten_list(obs_concat), axis=1)
        action_embed, _ = self.action_embed_model({"obs": obs_concat})

        # Mask out invalid actions: log(1) = 0 keeps valid logits unchanged, while log(0) = -inf
        # is clamped to tf.float32.min for numerical stability
        action_mask = input_dict["obs"]["action_mask"]
        inf_mask = tf.maximum(tf.math.log(action_mask), tf.float32.min)
        return action_embed + inf_mask, state

    def value_function(self):
        return self.action_embed_model.value_function()

ModelCatalog.register_custom_model("parametric_model_tf", ParametricActionsModel)
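The masking trick in `forward()` can be checked by hand. This pure-Python sketch (hypothetical helpers `inf_mask` and `softmax`, no TensorFlow) mirrors `tf.maximum(tf.math.log(action_mask), tf.float32.min)`: valid entries get +0, invalid ones get the most negative float32, so their softmax probability underflows to exactly 0. Clamping instead of keeping `-inf` presumably avoids NaNs in later computations such as `0 * -inf` in entropy terms.

```python
import math

FLOAT32_MIN = -3.4028234663852886e38  # value of tf.float32.min


def inf_mask(action_mask):
    """0 for valid actions, FLOAT32_MIN for invalid ones (log(0) clamped)."""
    # math.log(0) raises in Python, so the -inf case is written out explicitly.
    return [max(math.log(m) if m > 0 else -math.inf, FLOAT32_MIN) for m in action_mask]


def softmax(logits):
    top = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - top) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [0.2, 1.5, -0.3]   # raw action_embed outputs (example values)
mask = [1, 0, 1]            # action 1 was already played
masked_logits = [l + m for l, m in zip(logits, inf_mask(mask))]
probs = softmax(masked_logits)
print(probs[1])  # 0.0
```

Note that the masked action had the largest raw logit, yet it ends up with zero probability.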

## 3. Train the Agents

In [6]:
# before training we have to initialize ray
import ray
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.agents.dqn import DQNTrainer

ray.shutdown()
ray.init(num_cpus=4)

2020-06-21 19:39:41,237	INFO resource_spec.py:212 -- Starting Ray with 3.03 GiB memory available for workers and up to 1.52 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-06-21 19:39:41,643	INFO services.py:1170 -- View the Ray dashboard at localhost:8265


{'node_ip_address': '192.168.1.125',
 'raylet_ip_address': '192.168.1.125',
 'redis_address': '192.168.1.125:57511',
 'object_store_address': '/tmp/ray/session_2020-06-21_19-39-41_236820_334940/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-06-21_19-39-41_236820_334940/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-06-21_19-39-41_236820_334940'}

In [7]:
ppo_trainer_config = {
    "env": "ParametricRPS",  # RockPaperScissors
    "model": {
        "custom_model": "parametric_model_tf",  # ParametricActionsModel,
    },
}

dqn_trainer_config = {
    "env": "ParametricRPS",  # RockPaperScissors
    "model": {
        "custom_model": "parametric_model_tf",  # ParametricActionsModel,
    },
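    # Needed for parametric actions with DQN (see the note above): no extra
    # Q-head layers or dueling streams on top of the masked model outputs
    "hiddens": [],
    "dueling": False,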
}

### 3.1. Example with tune

In [8]:
from ray import tune

stop = {
    "episode_reward_mean": 2.90,
#     "timesteps_total": stop_timesteps,
#     "training_iteration": stop_iters,
}
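The 2.90 threshold is close to the best achievable value. In this environment only the player who closes a round has that round's reward delivered, so the summed episode reward equals the second mover's score, which is at most 3. A brute-force check in plain Python (hypothetical `best_second_mover_score`, independent of RLlib) confirms the second mover can reach 3 against every order of first moves, because answering each move with its counter never reuses an action.

```python
from itertools import permutations

ROCK, PAPER, SCISSORS = 0, 1, 2
COUNTER = {ROCK: PAPER, PAPER: SCISSORS, SCISSORS: ROCK}  # value beats key


def best_second_mover_score(first_moves):
    """Best 3-round score for the second mover (hypothetical helper), who sees
    each first move and may not reuse an action, so its replies form a permutation."""
    best = -len(first_moves)
    for replies in permutations(range(3)):
        score = 0
        for a, b in zip(first_moves, replies):
            if COUNTER[a] == b:      # second mover wins the round
                score += 1
            elif COUNTER[b] == a:    # first mover wins the round
                score -= 1
        best = max(best, score)
    return best


print(sorted({best_second_mover_score(p) for p in permutations(range(3))}))  # [3]
```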

#### PPO

In [9]:
results = tune.run(
    PPOTrainer,
    name='RLlibExample',
    config=ppo_trainer_config,
    verbose=1,
    stop=stop
)

| Trial name | status | loc | iter | total time (s) | ts | reward |
|---|---|---|---|---|---|---|
| PPO_ParametricRPS_00000 | TERMINATED | | 7 | 29.8183 | 28798 | 2.90987 |


#### DQN

In [10]:
results = tune.run(
    DQNTrainer,
    name='RLlibExample',
    config=dqn_trainer_config,
    verbose=1,
    stop=stop
)

| Trial name | status | loc | iter | total time (s) | ts | reward |
|---|---|---|---|---|---|---|
| DQN_ParametricRPS_00000 | TERMINATED | | 11 | 24.4405 | 11023 | 2.98204 |


### 3.2. Plain example without tune

#### PPO

In [11]:
trainer = PPOTrainer(config=ppo_trainer_config)
for i in range(5):
    res = trainer.train()
    print("Iteration {}. episode_reward_mean: {}".format(i, res['episode_reward_mean']))

2020-06-21 19:40:44,257	INFO trainer.py:421 -- Tip: set 'eager': true or the --eager flag to enable TensorFlow eager execution
2020-06-21 19:40:44,263	INFO trainer.py:580 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
2020-06-21 19:40:46,985	INFO trainable.py:217 -- Getting current IP.


[2m[36m(pid=335345)[0m   ret = umr_sum(arr, axis, dtype, out, keepdims)


  ret = umr_sum(arr, axis, dtype, out, keepdims)


Iteration 0. episode_reward_mean: 0.030042918454935622
Iteration 1. episode_reward_mean: 0.7871064467766117
Iteration 2. episode_reward_mean: 1.6071428571428572
Iteration 3. episode_reward_mean: 2.1557142857142857
Iteration 4. episode_reward_mean: 2.7085714285714286


#### DQN

In [13]:
trainer = DQNTrainer(config=dqn_trainer_config)
for i in range(5):
    res = trainer.train()
    print("Iteration {}. episode_reward_mean: {}".format(i, res['episode_reward_mean']))

2020-06-21 19:41:09,395	INFO trainable.py:217 -- Getting current IP.


Iteration 0. episode_reward_mean: 0.3413173652694611
Iteration 1. episode_reward_mean: 0.3413173652694611
Iteration 2. episode_reward_mean: 0.46706586826347307
Iteration 3. episode_reward_mean: 0.7904191616766467
Iteration 4. episode_reward_mean: 1.221556886227545


### 3.3. Example with multiple policies
Inspired from: https://github.com/ray-project/ray/blob/master/rllib/examples/multi_agent_two_trainers.py

In [15]:
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.agents.dqn import DQNTrainer
from ray.tune.logger import pretty_print


policies = {
    "ppo_policy_1": (PPOTFPolicy,
                     RockPaperScissors.OBSERVATION_SPACE,
                     RockPaperScissors.ACTION_SPACE,
                     ppo_trainer_config),
    "dqn_policy_1": (DQNTFPolicy,
                     RockPaperScissors.OBSERVATION_SPACE,
                     RockPaperScissors.ACTION_SPACE,
                     dqn_trainer_config),
}

# Define the PPO trainer
ppo_trainer = PPOTrainer(config={
    "env": "ParametricRPS",  # RockPaperScissors
    "multiagent": {
        "policies_to_train": ['ppo_policy_1'],
        "policies": policies,
        "policy_mapping_fn": lambda agent_id: "ppo_policy_1" if agent_id=="player_1" else "dqn_policy_1",
    },
    # disable filters, otherwise we would need to synchronize those
    # as well to the DQN agent
    "observation_filter": "NoFilter",
})


# Define the DQN trainer
dqn_trainer = DQNTrainer(config={
    "env": "ParametricRPS",  # RockPaperScissors
    "multiagent": {
        "policies_to_train": ['dqn_policy_1'],
        "policies": policies,
        "policy_mapping_fn": lambda agent_id: "ppo_policy_1" if agent_id=="player_1" else "dqn_policy_1",
    },
})

# Alternate training of the two policies
stop_reward = 2.9
for i in range(20):
    print("== Iteration", i, "==")

    # improve the DQN policy
    print("-- DQN --")
    result_dqn = dqn_trainer.train()
    print("\tDQN. episode_reward_mean: {}".format(result_dqn['episode_reward_mean']))

    # improve the PPO policy
    print("-- PPO --")
    result_ppo = ppo_trainer.train()
    print("\tPPO. episode_reward_mean: {}".format(result_ppo['episode_reward_mean']))

    # Test passed gracefully.
    if (
        result_dqn["episode_reward_mean"] > stop_reward and
        result_ppo["episode_reward_mean"] > stop_reward
    ):
        print("test passed (both agents above requested reward)")
        break

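    # Swap weights to synchronize. Disabled here, so each trainer plays against
    # a copy of the other policy that it never updates.
#     dqn_trainer.set_weights(ppo_trainer.get_weights(["ppo_policy_1"]))
#     ppo_trainer.set_weights(dqn_trainer.get_weights(["dqn_policy_1"]))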

2020-06-21 19:41:24,153	INFO trainable.py:217 -- Getting current IP.
2020-06-21 19:41:26,404	INFO trainable.py:217 -- Getting current IP.


== Iteration 0 ==
-- DQN --
	DQN. episode_reward_mean: 0.30538922155688625
-- PPO --
[2m[36m(pid=335354)[0m   ret = umr_sum(arr, axis, dtype, out, keepdims)
	PPO. episode_reward_mean: -0.04504504504504504
== Iteration 1 ==
-- DQN --
	DQN. episode_reward_mean: -0.03592814371257485
-- PPO --
	PPO. episode_reward_mean: 0.34234234234234234
== Iteration 2 ==
-- DQN --
	DQN. episode_reward_mean: 0.25
-- PPO --
	PPO. episode_reward_mean: 0.781437125748503
== Iteration 3 ==
-- DQN --
	DQN. episode_reward_mean: 0.4491017964071856
-- PPO --
	PPO. episode_reward_mean: 0.9864864864864865
== Iteration 4 ==
-- DQN --
	DQN. episode_reward_mean: 0.5748502994011976
-- PPO --
	PPO. episode_reward_mean: 1.3108108108108107
== Iteration 5 ==
-- DQN --
	DQN. episode_reward_mean: 0.9107142857142857
-- PPO --
	PPO. episode_reward_mean: 1.3158682634730539
== Iteration 6 ==
-- DQN --
	DQN. episode_reward_mean: 1.1137724550898203
-- PPO --
	PPO. episode_reward_mean: 1.5765765765765767
== Iteration 7 ==
-- DQN
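Because the weight swap at the end of the training loop is commented out, each trainer's copy of the other policy is never updated. The sketch below shows what the synchronization step does, using a hypothetical `FakeTrainer` whose `get_weights(policy_ids)` / `set_weights(weights)` mimic the dict-of-policy-IDs convention of the commented-out calls; plain lists stand in for real model weights.

```python
import copy


class FakeTrainer:
    """Hypothetical stand-in for a trainer that holds weights for several policies."""

    def __init__(self, weights):
        self.weights = copy.deepcopy(weights)

    def get_weights(self, policy_ids):
        # Return a copy of the weights of the requested policies only.
        return {pid: copy.deepcopy(self.weights[pid]) for pid in policy_ids}

    def set_weights(self, weights):
        # Overwrite only the policies present in `weights`.
        self.weights.update(copy.deepcopy(weights))


def sync(ppo_trainer, dqn_trainer):
    # Each trainer owns (trains) one policy; push it into the other trainer.
    dqn_trainer.set_weights(ppo_trainer.get_weights(["ppo_policy_1"]))
    ppo_trainer.set_weights(dqn_trainer.get_weights(["dqn_policy_1"]))


# Before syncing, each trainer has up-to-date weights only for its own trained policy.
ppo = FakeTrainer({"ppo_policy_1": [1.0], "dqn_policy_1": [0.0]})
dqn = FakeTrainer({"ppo_policy_1": [0.0], "dqn_policy_1": [2.0]})
sync(ppo, dqn)
print(ppo.weights == dqn.weights)  # True
```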

### 3.4. Check Model config

In [20]:
from ray.tune.logger import pretty_print
# policy_id = 'default_policy'
policy_id = 'ppo_policy_1'
# policy_id = 'dqn_policy_1'
trainer = dqn_trainer

model = trainer.get_policy(policy_id=policy_id).model
print(pretty_print(model.model_config))
model.base_model.summary()

conv_activation: relu
custom_model: parametric_model_tf
custom_options: {}
dim: 84
fcnet_activation: tanh
fcnet_hiddens:
- 256
- 256
framestack: true
free_log_std: false
grayscale: false
lstm_cell_size: 256
lstm_use_prev_action_reward: false
max_seq_len: 20
no_final_linear: false
use_lstm: false
vf_share_layers: false
zero_mean: true

Model: "model_4"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
observations (InputLayer)       [(None, 24)]         0                                            
__________________________________________________________________________________________________
fc_1 (Dense)                    (None, 256)          6400        observations[0][0]               
__________________________________________________________________________________________________
fc_value_1 (Dense)              (None, 256)         

## 4. Evaluate the agents
Execute in the console:
```console
tensorboard --logdir=~/ray_results --host=0.0.0.0
```
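Besides the TensorBoard logs, Tune also writes a `progress.csv` file into each trial directory. A minimal sketch for pulling the best `episode_reward_mean` per trial, assuming the default `~/ray_results/<experiment name>/<trial dir>/progress.csv` layout (experiment name `RLlibExample`, as in the `tune.run` calls above); the helper names are hypothetical:

```python
import csv
from pathlib import Path


def read_progress(path):
    """Load a Tune progress.csv file as a list of dicts, one per iteration."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def best_reward(rows, metric="episode_reward_mean"):
    """Best value of `metric` over all iterations; empty cells are skipped."""
    values = [float(row[metric]) for row in rows if row.get(metric)]
    return max(values) if values else None


def best_per_trial(experiment_dir="~/ray_results/RLlibExample"):
    """Map each trial directory name to its best episode_reward_mean (assumed layout)."""
    root = Path(experiment_dir).expanduser()
    return {p.parent.name: best_reward(read_progress(p))
            for p in sorted(root.glob("*/progress.csv"))}
```

For example, `print(best_per_trial())` lists each trial directory with its best mean reward; an empty dict means no `progress.csv` was found under the given directory.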