diff --git a/ml-agents/mlagents/trainers/ghost/trainer.py b/ml-agents/mlagents/trainers/ghost/trainer.py
index 849deeae5d..50d9aad74c 100644
--- a/ml-agents/mlagents/trainers/ghost/trainer.py
+++ b/ml-agents/mlagents/trainers/ghost/trainer.py
@@ -304,7 +304,10 @@ def save_model(self) -> None:
         self.trainer.save_model()
 
     def create_policy(
-        self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
+        self,
+        parsed_behavior_id: BehaviorIdentifiers,
+        behavior_spec: BehaviorSpec,
+        create_graph: bool = False,
     ) -> Policy:
         """
         Creates policy with the wrapped trainer's create_policy function
@@ -313,10 +316,10 @@ def create_policy(
         team are grouped. All policies associated with this team are added to the
         wrapped trainer to be trained.
         """
-        policy = self.trainer.create_policy(parsed_behavior_id, behavior_spec)
-        policy.create_tf_graph()
+        policy = self.trainer.create_policy(
+            parsed_behavior_id, behavior_spec, create_graph=True
+        )
         self.trainer.saver.initialize_or_load(policy)
-        policy.init_load_weights()
         team_id = parsed_behavior_id.team_id
         self.controller.subscribe_team_id(team_id, self)
 
@@ -326,7 +329,6 @@ def create_policy(
             parsed_behavior_id, behavior_spec
         )
         self.trainer.add_policy(parsed_behavior_id, internal_trainer_policy)
-        internal_trainer_policy.init_load_weights()
         self.current_policy_snapshot[
             parsed_behavior_id.brain_name
         ] = internal_trainer_policy.get_weights()
diff --git a/ml-agents/mlagents/trainers/policy/tf_policy.py b/ml-agents/mlagents/trainers/policy/tf_policy.py
index 707023ab3b..47789d1e92 100644
--- a/ml-agents/mlagents/trainers/policy/tf_policy.py
+++ b/ml-agents/mlagents/trainers/policy/tf_policy.py
@@ -152,6 +152,8 @@ def create_tf_graph(self) -> None:
         # We do an initialize to make the Policy usable out of the box. If an optimizer is needed,
         # it will re-load the full graph
         self.initialize()
+        # Create assignment ops for Ghost Trainer
+        self.init_load_weights()
 
     def _create_encoder(
         self,
diff --git a/ml-agents/mlagents/trainers/policy/torch_policy.py b/ml-agents/mlagents/trainers/policy/torch_policy.py
index 5fa135f6a2..e2ed73b25f 100644
--- a/ml-agents/mlagents/trainers/policy/torch_policy.py
+++ b/ml-agents/mlagents/trainers/policy/torch_policy.py
@@ -1,6 +1,7 @@
 from typing import Any, Dict, List
 import numpy as np
 import torch
+import copy
 
 from mlagents.trainers.action_info import ActionInfo
 from mlagents.trainers.behavior_id_utils import get_global_agent_id
@@ -249,13 +250,13 @@ def increment_step(self, n_steps):
         return self.get_current_step()
 
     def load_weights(self, values: List[np.ndarray]) -> None:
-        pass
+        self.actor_critic.load_state_dict(values)
 
     def init_load_weights(self) -> None:
         pass
 
     def get_weights(self) -> List[np.ndarray]:
-        return []
+        return copy.deepcopy(self.actor_critic.state_dict())
 
     def get_modules(self):
         return {"Policy": self.actor_critic, "global_step": self.global_step}
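With this patch the PyTorch policy snapshots its weights through the module's state_dict instead of returning an empty list, and deep-copies the result so the snapshot is detached from the live parameters. Below is a minimal sketch of that round-trip, added for illustration only; it is not part of the patch, and TinyActorCritic plus the free-standing get_weights/load_weights helpers are hypothetical stand-ins for the real actor_critic module and TorchPolicy methods.

# Illustrative sketch (not part of the patch): the state_dict round-trip that
# TorchPolicy.get_weights()/load_weights() relies on.
import copy

import torch
import torch.nn as nn


class TinyActorCritic(nn.Module):  # hypothetical stand-in for actor_critic
    def __init__(self):
        super().__init__()
        self.body = nn.Linear(8, 2)


def get_weights(module: nn.Module):
    # deepcopy detaches the snapshot from the live parameters, so later
    # optimizer steps cannot silently rewrite a stored snapshot.
    return copy.deepcopy(module.state_dict())


def load_weights(module: nn.Module, values) -> None:
    module.load_state_dict(values)


current, ghost = TinyActorCritic(), TinyActorCritic()
snapshot = get_weights(current)
load_weights(ghost, snapshot)
for key, value in snapshot.items():
    assert torch.equal(ghost.state_dict()[key], value)

The deepcopy matters because state_dict() returns references to the live tensors: without the copy, continued training of the current policy would mutate any snapshot the ghost trainer had stored.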
diff --git a/ml-agents/mlagents/trainers/ppo/trainer.py b/ml-agents/mlagents/trainers/ppo/trainer.py
index c16bc3439d..a9ca897fbe 100644
--- a/ml-agents/mlagents/trainers/ppo/trainer.py
+++ b/ml-agents/mlagents/trainers/ppo/trainer.py
@@ -217,11 +217,15 @@ def _update_policy(self):
         return True
 
     def create_tf_policy(
-        self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
+        self,
+        parsed_behavior_id: BehaviorIdentifiers,
+        behavior_spec: BehaviorSpec,
+        create_graph: bool = False,
     ) -> TFPolicy:
         """
         Creates a PPO policy to trainers list of policies.
         :param behavior_spec: specifications for policy construction
+        :param create_graph: whether to create the graph when policy is constructed
         :return policy
         """
         policy = TFPolicy(
@@ -229,6 +233,7 @@ def create_tf_policy(
             behavior_spec,
             self.trainer_settings,
             condition_sigma_on_obs=False,  # Faster training for PPO
+            create_tf_graph=create_graph,
         )
 
         return policy
diff --git a/ml-agents/mlagents/trainers/sac/trainer.py b/ml-agents/mlagents/trainers/sac/trainer.py
index e342ad4b03..11cc6762c5 100644
--- a/ml-agents/mlagents/trainers/sac/trainer.py
+++ b/ml-agents/mlagents/trainers/sac/trainer.py
@@ -228,7 +228,10 @@ def maybe_load_replay_buffer(self):
         )
 
     def create_tf_policy(
-        self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
+        self,
+        parsed_behavior_id: BehaviorIdentifiers,
+        behavior_spec: BehaviorSpec,
+        create_graph: bool = False,
     ) -> TFPolicy:
         policy = TFPolicy(
             self.seed,
@@ -236,7 +239,7 @@ def create_tf_policy(
             self.trainer_settings,
             tanh_squash=True,
             reparameterize=True,
-            create_tf_graph=False,
+            create_tf_graph=create_graph,
         )
         self.maybe_load_replay_buffer()
         return policy
diff --git a/ml-agents/mlagents/trainers/tests/test_ghost.py b/ml-agents/mlagents/trainers/tests/test_ghost.py
index e72f573f36..acc9711830 100644
--- a/ml-agents/mlagents/trainers/tests/test_ghost.py
+++ b/ml-agents/mlagents/trainers/tests/test_ghost.py
@@ -38,12 +38,9 @@ def test_load_and_set(dummy_config, use_discrete):
     trainer_params = dummy_config
     trainer = PPOTrainer("test", 0, trainer_params, True, False, 0, "0")
     trainer.seed = 1
-    policy = trainer.create_policy("test", mock_specs)
-    policy.create_tf_graph()
+    policy = trainer.create_policy("test", mock_specs, create_graph=True)
     trainer.seed = 20  # otherwise graphs are the same
-    to_load_policy = trainer.create_policy("test", mock_specs)
-    to_load_policy.create_tf_graph()
-    to_load_policy.init_load_weights()
+    to_load_policy = trainer.create_policy("test", mock_specs, create_graph=True)
 
     weights = policy.get_weights()
     load_weights = to_load_policy.get_weights()
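The TensorFlow-side change above is plumbing: instead of callers building the graph and the weight-assignment ops after construction (create_tf_graph() followed by init_load_weights()), a create_graph flag is threaded from Trainer.create_policy down to the TFPolicy constructor, and create_tf_graph() now creates the assignment ops itself. A simplified sketch of that flag-threading pattern follows, for illustration only; SketchPolicy and SketchTrainer are hypothetical stand-ins, not the ml-agents API.

# Illustrative sketch (not part of the patch): how the create_graph flag is
# threaded through, with the default left off for existing callers.


class SketchPolicy:
    def __init__(self, create_tf_graph: bool = False):
        self.graph_created = False
        if create_tf_graph:
            self.create_tf_graph()

    def create_tf_graph(self) -> None:
        # In the real TFPolicy this builds and initializes the graph and now
        # also creates the weight-assignment ops (init_load_weights).
        self.graph_created = True


class SketchTrainer:
    def create_policy(self, create_graph: bool = False) -> SketchPolicy:
        # The default stays False, so callers that build the graph later
        # (for example through an optimizer) keep the old behavior.
        return self.create_tf_policy(create_graph=create_graph)

    def create_tf_policy(self, create_graph: bool = False) -> SketchPolicy:
        return SketchPolicy(create_tf_graph=create_graph)


assert SketchTrainer().create_policy().graph_created is False
assert SketchTrainer().create_policy(create_graph=True).graph_created is True

This is why the ghost trainer and the TF ghost test above now pass create_graph=True rather than calling create_tf_graph() and init_load_weights() themselves.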
False, 0, "0") + trainer.seed = 1 + policy = trainer.create_policy("test", mock_specs) + trainer.seed = 20 # otherwise graphs are the same + to_load_policy = trainer.create_policy("test", mock_specs) + + weights = policy.get_weights() + load_weights = to_load_policy.get_weights() + try: + for w, lw in zip(weights, load_weights): + np.testing.assert_array_equal(w, lw) + except AssertionError: + pass + + to_load_policy.load_weights(weights) + load_weights = to_load_policy.get_weights() + + for w, lw in zip(weights, load_weights): + np.testing.assert_array_equal(w, lw) + + +def test_process_trajectory(dummy_config): + mock_specs = mb.setup_test_behavior_specs( + True, False, vector_action_space=[2], vector_obs_space=1 + ) + behavior_id_team0 = "test_brain?team=0" + behavior_id_team1 = "test_brain?team=1" + brain_name = BehaviorIdentifiers.from_name_behavior_id(behavior_id_team0).brain_name + + ppo_trainer = PPOTrainer(brain_name, 0, dummy_config, True, False, 0, "0") + controller = GhostController(100) + trainer = GhostTrainer( + ppo_trainer, brain_name, controller, 0, dummy_config, True, "0" + ) + + # first policy encountered becomes policy trained by wrapped PPO + parsed_behavior_id0 = BehaviorIdentifiers.from_name_behavior_id(behavior_id_team0) + policy = trainer.create_policy(parsed_behavior_id0, mock_specs) + trainer.add_policy(parsed_behavior_id0, policy) + trajectory_queue0 = AgentManagerQueue(behavior_id_team0) + trainer.subscribe_trajectory_queue(trajectory_queue0) + + # Ghost trainer should ignore this queue because off policy + parsed_behavior_id1 = BehaviorIdentifiers.from_name_behavior_id(behavior_id_team1) + policy = trainer.create_policy(parsed_behavior_id1, mock_specs) + trainer.add_policy(parsed_behavior_id1, policy) + trajectory_queue1 = AgentManagerQueue(behavior_id_team1) + trainer.subscribe_trajectory_queue(trajectory_queue1) + + time_horizon = 15 + trajectory = make_fake_trajectory( + length=time_horizon, + max_step_complete=True, + observation_shapes=[(1,)], + action_space=[2], + ) + trajectory_queue0.put(trajectory) + trainer.advance() + + # Check that trainer put trajectory in update buffer + assert trainer.trainer.update_buffer.num_experiences == 15 + + trajectory_queue1.put(trajectory) + trainer.advance() + + # Check that ghost trainer ignored off policy queue + assert trainer.trainer.update_buffer.num_experiences == 15 + # Check that it emptied the queue + assert trajectory_queue1.empty() + + +def test_publish_queue(dummy_config): + mock_specs = mb.setup_test_behavior_specs( + True, False, vector_action_space=[1], vector_obs_space=8 + ) + + behavior_id_team0 = "test_brain?team=0" + behavior_id_team1 = "test_brain?team=1" + + parsed_behavior_id0 = BehaviorIdentifiers.from_name_behavior_id(behavior_id_team0) + + brain_name = parsed_behavior_id0.brain_name + + ppo_trainer = PPOTrainer(brain_name, 0, dummy_config, True, False, 0, "0") + controller = GhostController(100) + trainer = GhostTrainer( + ppo_trainer, brain_name, controller, 0, dummy_config, True, "0" + ) + + # First policy encountered becomes policy trained by wrapped PPO + # This queue should remain empty after swap snapshot + policy = trainer.create_policy(parsed_behavior_id0, mock_specs) + trainer.add_policy(parsed_behavior_id0, policy) + policy_queue0 = AgentManagerQueue(behavior_id_team0) + trainer.publish_policy_queue(policy_queue0) + + # Ghost trainer should use this queue for ghost policy swap + parsed_behavior_id1 = BehaviorIdentifiers.from_name_behavior_id(behavior_id_team1) + policy = 
diff --git a/ml-agents/mlagents/trainers/trainer/rl_trainer.py b/ml-agents/mlagents/trainers/trainer/rl_trainer.py
index 7ae4f08c21..d920a43279 100644
--- a/ml-agents/mlagents/trainers/trainer/rl_trainer.py
+++ b/ml-agents/mlagents/trainers/trainer/rl_trainer.py
@@ -119,7 +119,10 @@ def _is_ready_update(self):
         return False
 
     def create_policy(
-        self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
+        self,
+        parsed_behavior_id: BehaviorIdentifiers,
+        behavior_spec: BehaviorSpec,
+        create_graph: bool = False,
     ) -> Policy:
         if self.framework == FrameworkType.PYTORCH and TorchPolicy is None:
             raise UnityTrainerException(
@@ -128,7 +131,9 @@ def create_policy(
         elif self.framework == FrameworkType.PYTORCH:
             return self.create_torch_policy(parsed_behavior_id, behavior_spec)
         else:
-            return self.create_tf_policy(parsed_behavior_id, behavior_spec)
+            return self.create_tf_policy(
+                parsed_behavior_id, behavior_spec, create_graph=create_graph
+            )
 
     @abc.abstractmethod
     def create_torch_policy(
@@ -141,7 +146,10 @@ def create_torch_policy(
         """
 
     @abc.abstractmethod
     def create_tf_policy(
-        self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
+        self,
+        parsed_behavior_id: BehaviorIdentifiers,
+        behavior_spec: BehaviorSpec,
+        create_graph: bool = False,
     ) -> TFPolicy:
         """
         Create a Policy object that uses the TensorFlow backend.
diff --git a/ml-agents/mlagents/trainers/trainer/trainer.py b/ml-agents/mlagents/trainers/trainer/trainer.py
index a08b2dd6ad..55ac5a9ef1 100644
--- a/ml-agents/mlagents/trainers/trainer/trainer.py
+++ b/ml-agents/mlagents/trainers/trainer/trainer.py
@@ -125,7 +125,10 @@ def end_episode(self):
 
     @abc.abstractmethod
     def create_policy(
-        self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
+        self,
+        parsed_behavior_id: BehaviorIdentifiers,
+        behavior_spec: BehaviorSpec,
+        create_graph: bool = False,
     ) -> Policy:
         """
         Creates policy