# Setup Colab

In [0]:
# Colab-only setup cell: mount Google Drive, switch into the project folder
# and install the pinned dependencies.
from google.colab import drive 

# Mount the user's Google Drive under /content/gdrive.
drive.mount('/content/gdrive')
# Change into the project folder on the mounted drive, then show where we are
# and what it contains.
%cd "/content/gdrive/My Drive/Colab Notebooks/PBT_MARL_watered_down/"
!pwd
!ls -l

# Install if you haven't done so.
# Versions are pinned (tensorflow 2.2.0, ray[rllib] 0.8.5).
!pip install tensorflow==2.2.0
!pip install ray[rllib]==0.8.5   
#!pip show tensorflow
#!pip show ray

# Chkpt/restore & log path

In [0]:
# Project folder on the mounted Google Drive; every path below is derived from it.
g_drive_path = "/content/gdrive/My Drive/Colab Notebooks/PBT_MARL_watered_down/"

# Directory assigned to config["output"] in get_config().
log_dir = g_drive_path + "ray_results/"

# Directory handed to trainer.save() in go_train().
local_dir = g_drive_path + "chkpt/"

# chkpt_freq: checkpoint cadence (in training-loop iterations) used by go_train().
# chkpt:      number of the checkpoint that restore_path points at.
chkpt_freq, chkpt = 10, 150

# Set to True to make go_train() call trainer.restore(restore_path) before training.
is_restore = False
restore_path = "{}checkpoint_{}/checkpoint-{}".format(local_dir, chkpt, chkpt)

# Imports

In [0]:
from collections import defaultdict
from typing import Dict
import random
import numpy as np

from gym.spaces import Discrete

import ray
from ray import tune

from ray.tune.registry import register_env
from ray.rllib.models import ModelCatalog

from ray.rllib.policy import Policy

from ray.rllib.agents.ppo import ppo
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo import appo
from ray.rllib.agents.ppo.appo import APPOTrainer
from ray.rllib.agents.ppo import ddppo
from ray.rllib.agents.ppo.ddppo import DDPPOTrainer

from ray.rllib.env import BaseEnv
from ray.rllib.env.multi_agent_env import MultiAgentEnv

from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.evaluation import MultiAgentEpisode, RolloutWorker
from ray.rllib.agents.callbacks import DefaultCallbacks

from ray.rllib.utils import try_import_tf
from ray.tune.logger import pretty_print

tf = try_import_tf()

from RockPaperScissorsEnv import RockPaperScissorsEnv
from Helper import Helper
from PBT_MARL import PBT_MARL

# Callbacks

In [0]:
"""#Callbacks"""

class MyCallbacks(DefaultCallbacks):
    """RLlib callback hooks used by this notebook.

    Besides logging, two hooks carry real work:

    * ``on_episode_end`` reads both players' ratings from the named Ray actor
      ``"g_helper"``, recomputes them with ``l_PBT_MARL.compute_rating`` and
      writes them back.
    * ``on_train_result`` runs ``l_PBT_MARL.PBT(trainer)`` and asks the helper
      actor to set the next pair.

    ``l_PBT_MARL`` is a module-level object created elsewhere in the notebook.
    """

    def on_episode_start(self, worker: RolloutWorker, base_env: BaseEnv,
                         policies: Dict[str, Policy],
                         episode: MultiAgentEpisode, **kwargs):
        """Log the episode id / agent-to-policy map and reset ``hist_data["episode_id"]``."""
        message = "on_episode_start {}, _agent_to_policy {}".format(episode.episode_id, episode._agent_to_policy)
        print(message)
        episode.hist_data["episode_id"] = []

    def on_episode_step(self, worker: RolloutWorker, base_env: BaseEnv,
                        episode: MultiAgentEpisode, **kwargs):
        """Per-step hook; deliberately does nothing."""
        # Disabled example kept for reference:
        #   pole_angle = abs(episode.last_observation_for()[2])
        #   raw_angle = abs(episode.last_raw_obs_for()[2])
        #   assert pole_angle == raw_angle
        #   episode.user_data["pole_angles"].append(pole_angle)
        return None

    def on_episode_end(self, worker: RolloutWorker, base_env: BaseEnv,
                       policies: Dict[str, Policy], episode: MultiAgentEpisode,
                       **kwargs):
        """Update the ratings of the two players of the finished episode.

        Uses the first two entries of ``episode.agent_rewards``; an episode
        with fewer than two entries raises ``IndexError``.
        """
        print("on_episode_end {}, episode.agent_rewards {}".format(episode.episode_id, episode.agent_rewards))

        # Each key is indexed with [1] to obtain a "<prefix>_<index>" policy id
        # (presumably keys are (agent_id, policy_id) tuples -- confirm against RLlib).
        reward_items = list(episode.agent_rewards.items())
        (key_i, score_i), (key_j, score_j) = reward_items[0], reward_items[1]

        # Translate the policy ids into the "agt_<index>" keys used by the helper actor.
        _, idx_i = key_i[1].split("_")
        _, idx_j = key_j[1].split("_")
        agt_i_key = "agt_" + idx_i
        agt_j_key = "agt_" + idx_j

        helper = ray.util.get_actor("g_helper")
        old_rating_i = ray.get(helper.get_rating.remote(agt_i_key))
        old_rating_j = ray.get(helper.get_rating.remote(agt_j_key))
        new_rating_i, new_rating_j = l_PBT_MARL.compute_rating(old_rating_i, old_rating_j, score_i, score_j)
        # ray.get() blocks until the helper has stored the new ratings.
        ray.get(helper.update_rating.remote(agt_i_key, agt_j_key, new_rating_i, new_rating_j, score_i, score_j))
        print("on_episode_end ray.get(g_helper.get_agt_store.remote())", ray.get(helper.get_agt_store.remote()))

    def on_sample_end(self, worker: RolloutWorker, samples: SampleBatch,
                      **kwargs):
        """Log the size of the sample batch that was just collected."""
        batch_size = samples.count
        print("on_sample_end returned sample batch of size {}".format(batch_size))

    def on_train_result(self, trainer, result: dict, **kwargs):
        """Log the training result, run PBT and ask the helper for the next pair."""
        episodes = result["episodes_this_iter"]
        print("trainer.train() result: {} -> {} episodes".format(trainer, episodes))
        # The result dict may be mutated to return additional fields.
        result["callback_ok"] = True
        print("on_train_result result", result)

        # perform PBT
        l_PBT_MARL.PBT(trainer)

        helper = ray.util.get_actor("g_helper")
        # set the latest pair
        ray.get(helper.set_pair.remote())
        print("on_train_result g_helper.get_pair.remote()", ray.get(helper.get_pair.remote()))

    def on_postprocess_trajectory(
            self, worker: RolloutWorker, episode: MultiAgentEpisode,
            agent_id: str, policy_id: str, policies: Dict[str, Policy],
            postprocessed_batch: SampleBatch,
            original_batches: Dict[str, SampleBatch], **kwargs):
        """Log which agent/policy batch was post-processed and how many steps it holds."""
        steps = postprocessed_batch.count
        print("postprocessed {}, {}, {}, {} steps".format(episode, agent_id, policy_id, steps))
        # Disabled example kept for reference:
        #   if "num_batches" not in episode.custom_metrics:
        #       episode.custom_metrics["num_batches"] = 0
        #   episode.custom_metrics["num_batches"] += 1

# Policy

In [0]:
def init_policies(population_size, obs_space, act_space, use_lstm, hyperparameters_range):
    """Build one policy spec per population member with randomly drawn hyper-parameters.

    Args:
        population_size: number of policies to create; ids are "p_0", "p_1", ...
        obs_space: observation space stored in every spec.
        act_space: action space stored in every spec.
        use_lstm: value stored under ``["model"]["use_lstm"]`` of every spec.
        hyperparameters_range: mapping with "lr" and "gamma" entries, each a
            ``[low, high]`` pair.

    Returns:
        dict mapping policy id to ``(None, obs_space, act_space, overrides)``,
        where ``overrides`` holds "model", "lr" and "gamma".
    """
    def _draw(name):
        # One uniform sample from [low, high) for the named hyper-parameter.
        low = hyperparameters_range[name][0]
        high = hyperparameters_range[name][1]
        return np.random.uniform(low=low, high=high)

    specs = {}
    for idx in range(population_size):
        overrides = {"model": {"use_lstm": use_lstm}}
        # Draw order matters for reproducibility under a fixed seed: lr first, then gamma.
        overrides["lr"] = _draw("lr")
        overrides["gamma"] = _draw("gamma")
        specs["p_" + str(idx)] = (None, obs_space, act_space, overrides)
    return specs

def train_policies(population_size):
    """Return the ids of the policies to train: ``["p_0", ..., "p_<n-1>"]``.

    Fixes a wrong-variable bug: the ids were collected into a local list but
    the function returned the unrelated module-level name ``policies``.

    Args:
        population_size: number of policies in the population.

    Returns:
        list of policy id strings, empty when ``population_size`` is 0.
    """
    return ["p_" + str(i) for i in range(population_size)]

def select_policy(agent_id):
    """Map an agent id of the form ``<prefix>_<index>`` to the policy id ``p_<index>``.

    Used as the ``policy_mapping_fn`` in ``get_config``. The id must contain
    exactly one underscore; anything else raises ``ValueError`` from the unpacking.
    """
    _prefix, index = agent_id.split("_")
    policy = "p_" + index
    print("select_policy {} {}".format(agent_id, policy))
    return policy

# Variables

In [0]:
# --- Population / PBT settings (all handed to PBT_MARL below) ---
population_size = 8
K = 0.1
T_select = 0.77 #0.47
binomial_n = 1
inherit_prob = 0.5
perturb_prob = 0.1
perturb_val = [0.8, 1.2]
# [low, high] bounds that init_policies() samples lr and gamma from.
hyperparameters_range = {"lr": [0.00001, 0.01],
                         "gamma": [0.9, 0.999]}

# Register RockPaperScissorsEnv with RLlib under the name the trainer is created with.
register_env("RockPaperScissorsEnv", lambda env_config: RockPaperScissorsEnv(env_config, population_size))
# Get obs & act spaces from a throwaway RockPaperScissorsEnv instance.
# An explicit empty env config is passed: the original passed the bare name `_`,
# which only resolves inside IPython (it is the last cell output) and therefore
# depends on hidden kernel state.
dummy_env = RockPaperScissorsEnv({}, population_size=0)
obs_space = dummy_env.observation_space
act_space = dummy_env.action_space

use_lstm = False
policies = init_policies(population_size, obs_space, act_space, use_lstm, hyperparameters_range)
# NOTE: this rebinds the name `train_policies` from the function to its result
# (get_config() reads the result under this name), so this cell cannot be re-run
# without first re-running the cell that defines train_policies().
train_policies = train_policies(population_size)

l_PBT_MARL = PBT_MARL(population_size,
                      K, T_select,
                      binomial_n, inherit_prob,
                      perturb_prob, perturb_val)

# Restart Ray from a clean state so that re-running this cell is safe.
ray.shutdown()
ray.init(ignore_reinit_error=True, log_to_driver=True, webui_host='127.0.0.1', num_cpus=2, num_gpus=1)      #start ray
print("ray.nodes()", ray.nodes())

# Named actor: the callbacks look it up with ray.util.get_actor("g_helper").
g_helper = Helper.options(name="g_helper").remote(population_size, policies)      # this object runs on a different ray actor process
ray.get(g_helper.set_pair.remote())

num_iters = 15     # num of main training loop

2020-06-11 07:57:14,116	INFO resource_spec.py:212 -- Starting Ray with 7.18 GiB memory available for workers and up to 3.59 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-06-11 07:57:14,580	INFO services.py:1170 -- View the Ray dashboard at [1m[32m127.0.0.1:8265[39m[22m


ray.nodes() [{'NodeID': 'e41d10539bf08b56039e2431a5f75a431d43206a', 'Alive': True, 'NodeManagerAddress': '172.28.0.2', 'NodeManagerHostname': '571b538b671a', 'NodeManagerPort': 53651, 'ObjectManagerPort': 43717, 'ObjectStoreSocketName': '/tmp/ray/session_2020-06-11_07-57-14_115178_125/sockets/plasma_store', 'RayletSocketName': '/tmp/ray/session_2020-06-11_07-57-14_115178_125/sockets/raylet', 'Resources': {'node:172.28.0.2': 1.0, 'GPU': 1.0, 'object_store_memory': 50.0, 'memory': 147.0, 'CPU': 2.0}, 'alive': True}]


# Config

In [0]:
def get_config():
    """Return a copy of the DDPPO default config with this notebook's overrides applied.

    Reads the module-level names ``train_policies``, ``policies``,
    ``select_policy``, ``MyCallbacks`` and ``log_dir``. ``ddppo.DEFAULT_CONFIG``
    itself is not modified; only top-level keys of the copy are (re)assigned.
    """
    overrides = {
        "env": RockPaperScissorsEnv,
        "multiagent": {"policies_to_train": train_policies,
                       "policies": policies,
                       "policy_mapping_fn": select_policy},
        "num_cpus_per_worker": 0.25,
        "num_gpus_per_worker": 0.125,
        "num_workers": 2,
        "num_envs_per_worker": 3,
        "rollout_fragment_length": 30,
        # must be -1 for DDPPO trainer
        "train_batch_size": -1,
        "sgd_minibatch_size": 10,
        # number of epochs to execute per train batch.
        "num_sgd_iter": 3,
        "callbacks": MyCallbacks,
        # WARN/INFO/DEBUG
        "log_level": "WARN",
        "output": log_dir,
    }

    config = ddppo.DEFAULT_CONFIG.copy()
    config.update(overrides)
    return config

# Go train

In [0]:
def go_train(config):
    """Create a DDPPO trainer, optionally restore it, and run the training loop.

    Reads the module-level settings ``is_restore``, ``restore_path``,
    ``num_iters``, ``chkpt_freq`` and ``local_dir``.

    A checkpoint is written after every ``chkpt_freq``-th completed iteration
    (iterations 10, 20, ... for ``chkpt_freq = 10``) and once more after the
    loop finishes. The previous test ``i % chkpt_freq == 0`` was off by one and
    saved after iterations 1, 11, 21, ... instead.

    Args:
        config: trainer config dict, e.g. the value returned by ``get_config()``.

    Returns:
        The result dict of the last ``trainer.train()`` call, or ``None`` when
        ``num_iters`` is 0.
    """
    trainer = ddppo.DDPPOTrainer(config=config, env="RockPaperScissorsEnv")

    if is_restore:
        trainer.restore(restore_path)

    result = None
    for i in range(num_iters):
        result = trainer.train()
        completed = i + 1
        print("training loop = {} of {}".format(completed, num_iters))
        print(pretty_print(result))     # includes result["custom_metrics"]

        # Count completed iterations (1-based) so the cadence lines up with chkpt_freq.
        if completed % chkpt_freq == 0:
            checkpoint = trainer.save(local_dir)
            print("checkpoint saved at", checkpoint)

    # Always leave a checkpoint of the final state, whatever the cadence was.
    checkpoint = trainer.save(local_dir)
    print("checkpoint saved at", checkpoint)

    return result
    

# run everything
try:
    go_train(get_config())
finally:
    # Shut Ray down even when training raises, so a failed run does not leave
    # the Ray processes (and the resources they hold) behind.
    ray.shutdown()

Output hidden; open in https://colab.research.google.com to view.