From c5b5666ae51a3a634d0784663226543ec733d79e Mon Sep 17 00:00:00 2001 From: Kaiwen Wang Date: Tue, 5 May 2020 19:15:41 -0700 Subject: [PATCH] Reimplement MDNRNN using new gym. (#253) Summary: Using our new gym, test MDNRNN feature importance/sensitivity. Also, train DQN to play POMDP string game with states embedded with MDNRNN. This is in preparation to nuke old gym folder. Pull Request resolved: https://github.com/facebookresearch/ReAgent/pull/253 Differential Revision: D21385499 Pulled By: kaiwenw fbshipit-source-id: a4fa462ecdd5352e4cbb7cbb956517fcdf0f1502 --- reagent/core/dataclasses.py | 3 +- reagent/evaluation/world_model_evaluator.py | 119 ++-- reagent/gym/agents/post_step.py | 56 +- reagent/gym/envs/__init__.py | 18 + .../envs/dynamics}/linear_dynamics.py | 6 +- .../{test/gym => gym/envs}/pomdp/pocman.py | 2 +- reagent/gym/envs/pomdp/state_embed_env.py | 123 ++++ .../gym => gym/envs}/pomdp/string_game.py | 10 +- reagent/gym/envs/utils.py | 19 + reagent/gym/policies/random_policies.py | 12 +- .../gym/policies/samplers/discrete_sampler.py | 2 +- reagent/gym/preprocessors/__init__.py | 2 + .../gym/preprocessors/trainer_preprocessor.py | 35 + .../world_model/cartpole_features.yaml | 20 + .../world_model/discrete_dqn_string.yaml | 55 ++ reagent/gym/tests/test_gym.py | 147 ++-- .../tests}/test_linear_dynamics.py | 4 +- .../environment => gym/tests}/test_pomdp.py | 9 +- reagent/gym/tests/test_world_model.py | 427 ++++++++++++ reagent/gym/tests/utils.py | 58 ++ reagent/models/mdn_rnn.py | 52 +- reagent/models/world_model.py | 2 +- reagent/parameters.py | 66 +- .../replay_memory/circular_replay_buffer.py | 62 +- reagent/test/base/horizon_test_base.py | 28 + reagent/test/environment/__init__.py | 17 - reagent/test/gym/open_ai_gym_environment.py | 10 +- reagent/test/gym/pomdp/__init__.py | 18 - reagent/test/gym/run_gym.py | 8 +- reagent/test/gym/world_model/mdnrnn_gym.py | 646 ------------------ .../test/gym/world_model/state_embed_gym.py | 266 -------- .../test/gym/world_model/test_mdnrnn_gym.py | 64 -- .../gym/world_model/test_state_embed_gym.py | 66 -- reagent/test/world_model/test_mdnrnn.py | 16 +- reagent/training/__init__.py | 16 +- reagent/training/cem_trainer.py | 5 +- .../training/world_model/mdnrnn_trainer.py | 101 +-- reagent/types.py | 36 +- reagent/workflow/gym_batch_rl.py | 13 +- .../model_managers/discrete/discrete_dqn.py | 2 +- .../model_managers/discrete_dqn_base.py | 8 +- .../model_managers/model_based/__init__.py | 7 + .../model_managers/model_based/world_model.py | 49 ++ reagent/workflow/model_managers/union.py | 1 + .../model_managers/world_model_base.py | 77 +++ reagent/workflow_utils/transitional.py | 32 +- requirements.txt | 1 + setup.cfg | 1 + 48 files changed, 1292 insertions(+), 1505 deletions(-) rename reagent/{test/environment => gym/envs/dynamics}/linear_dynamics.py (95%) rename reagent/{test/gym => gym/envs}/pomdp/pocman.py (99%) create mode 100644 reagent/gym/envs/pomdp/state_embed_env.py rename reagent/{test/gym => gym/envs}/pomdp/string_game.py (84%) create mode 100644 reagent/gym/envs/utils.py create mode 100644 reagent/gym/preprocessors/trainer_preprocessor.py create mode 100644 reagent/gym/tests/configs/world_model/cartpole_features.yaml create mode 100644 reagent/gym/tests/configs/world_model/discrete_dqn_string.yaml rename reagent/{test/environment => gym/tests}/test_linear_dynamics.py (96%) rename reagent/{test/environment => gym/tests}/test_pomdp.py (89%) create mode 100644 reagent/gym/tests/test_world_model.py create mode 100644 
reagent/gym/tests/utils.py delete mode 100644 reagent/test/gym/pomdp/__init__.py delete mode 100644 reagent/test/gym/world_model/mdnrnn_gym.py delete mode 100644 reagent/test/gym/world_model/state_embed_gym.py delete mode 100644 reagent/test/gym/world_model/test_mdnrnn_gym.py delete mode 100644 reagent/test/gym/world_model/test_state_embed_gym.py create mode 100644 reagent/workflow/model_managers/model_based/__init__.py create mode 100644 reagent/workflow/model_managers/model_based/world_model.py create mode 100644 reagent/workflow/model_managers/world_model_base.py diff --git a/reagent/core/dataclasses.py b/reagent/core/dataclasses.py index d77194477..84c0d0c89 100644 --- a/reagent/core/dataclasses.py +++ b/reagent/core/dataclasses.py @@ -41,8 +41,9 @@ pass +logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) - +logger.setLevel(logging.INFO) logger.info(f"USE_VANILLA_DATACLASS: {USE_VANILLA_DATACLASS}") logger.info(f"ARBITRARY_TYPES_ALLOWED: {ARBITRARY_TYPES_ALLOWED}") diff --git a/reagent/evaluation/world_model_evaluator.py b/reagent/evaluation/world_model_evaluator.py index 1437dca66..0579f3b49 100644 --- a/reagent/evaluation/world_model_evaluator.py +++ b/reagent/evaluation/world_model_evaluator.py @@ -4,14 +4,11 @@ from typing import Dict, List import torch -from reagent.models.mdn_rnn import transpose from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer from reagent.types import ( - ExtraData, PreprocessedFeatureVector, PreprocessedMemoryNetworkInput, PreprocessedStateAction, - PreprocessedTrainingBatch, ) @@ -25,7 +22,7 @@ def __init__(self, trainer: MDNRNNTrainer, state_dim: int) -> None: self.trainer = trainer self.state_dim = state_dim - def evaluate(self, tdp: PreprocessedTrainingBatch) -> Dict: + def evaluate(self, tdp: PreprocessedMemoryNetworkInput) -> Dict[str, float]: self.trainer.mdnrnn.mdnrnn.eval() losses = self.trainer.get_loss(tdp, state_dim=self.state_dim, batch_first=True) detached_losses = { @@ -65,22 +62,20 @@ def __init__( self.sorted_action_feature_start_indices = sorted_action_feature_start_indices self.sorted_state_feature_start_indices = sorted_state_feature_start_indices - def evaluate(self, tdp: PreprocessedTrainingBatch): + def evaluate(self, batch: PreprocessedMemoryNetworkInput): """ Calculate feature importance: setting each state/action feature to the mean value and observe loss increase. 
""" - assert isinstance(tdp.training_input, PreprocessedMemoryNetworkInput) - self.trainer.mdnrnn.mdnrnn.eval() - - state_features = tdp.training_input.state.float_features - action_features = tdp.training_input.action # type: ignore - batch_size, seq_len, state_dim = state_features.size() # type: ignore + self.trainer.memory_network.mdnrnn.eval() + state_features = batch.state.float_features + action_features = batch.action # type: ignore + seq_len, batch_size, state_dim = state_features.size() # type: ignore action_dim = action_features.size()[2] # type: ignore action_feature_num = self.action_feature_num state_feature_num = self.state_feature_num feature_importance = torch.zeros(action_feature_num + state_feature_num) - orig_losses = self.trainer.get_loss(tdp, state_dim=state_dim, batch_first=True) + orig_losses = self.trainer.get_loss(batch, state_dim=state_dim) orig_loss = orig_losses["loss"].cpu().detach().item() del orig_losses @@ -90,7 +85,7 @@ def evaluate(self, tdp: PreprocessedTrainingBatch): state_feature_boundaries = self.sorted_state_feature_start_indices + [state_dim] for i in range(action_feature_num): - action_features = tdp.training_input.action.reshape( # type: ignore + action_features = batch.action.reshape( # type: ignore (batch_size * seq_len, action_dim) ).data.clone() @@ -115,28 +110,24 @@ def evaluate(self, tdp: PreprocessedTrainingBatch): ) action_features = action_features.reshape( # type: ignore - (batch_size, seq_len, action_dim) + (seq_len, batch_size, action_dim) ) # type: ignore - new_tdp = PreprocessedTrainingBatch( - training_input=PreprocessedMemoryNetworkInput( # type: ignore - state=tdp.training_input.state, - action=action_features, - next_state=tdp.training_input.next_state, - reward=tdp.training_input.reward, - time_diff=torch.ones_like(tdp.training_input.reward).float(), - not_terminal=tdp.training_input.not_terminal, # type: ignore - step=None, - ), - extras=ExtraData(), - ) - losses = self.trainer.get_loss( - new_tdp, state_dim=state_dim, batch_first=True + + new_batch = PreprocessedMemoryNetworkInput( + state=batch.state, + action=action_features, + next_state=batch.next_state, + reward=batch.reward, + time_diff=torch.ones_like(batch.reward).float(), + not_terminal=batch.not_terminal, # type: ignore + step=None, ) + losses = self.trainer.get_loss(new_batch, state_dim=state_dim) feature_importance[i] = losses["loss"].cpu().detach().item() - orig_loss del losses for i in range(state_feature_num): - state_features = tdp.training_input.state.float_features.reshape( # type: ignore + state_features = batch.state.float_features.reshape( # type: ignore (batch_size * seq_len, state_dim) ).data.clone() boundary_start, boundary_end = ( @@ -149,29 +140,24 @@ def evaluate(self, tdp: PreprocessedTrainingBatch): state_features[:, boundary_start:boundary_end] # type: ignore ) state_features = state_features.reshape( # type: ignore - (batch_size, seq_len, state_dim) + (seq_len, batch_size, state_dim) ) # type: ignore - new_tdp = PreprocessedTrainingBatch( - training_input=PreprocessedMemoryNetworkInput( # type: ignore - state=PreprocessedFeatureVector(float_features=state_features), - action=tdp.training_input.action, # type: ignore - next_state=tdp.training_input.next_state, - reward=tdp.training_input.reward, - time_diff=torch.ones_like(tdp.training_input.reward).float(), - not_terminal=tdp.training_input.not_terminal, # type: ignore - step=None, - ), - extras=ExtraData(), - ) - losses = self.trainer.get_loss( - new_tdp, state_dim=state_dim, batch_first=True + 
new_batch = PreprocessedMemoryNetworkInput( + state=PreprocessedFeatureVector(float_features=state_features), + action=batch.action, # type: ignore + next_state=batch.next_state, + reward=batch.reward, + time_diff=torch.ones_like(batch.reward).float(), + not_terminal=batch.not_terminal, # type: ignore + step=None, ) + losses = self.trainer.get_loss(new_batch, state_dim=state_dim) feature_importance[i + action_feature_num] = ( losses["loss"].cpu().detach().item() - orig_loss ) del losses - self.trainer.mdnrnn.mdnrnn.train() + self.trainer.memory_network.mdnrnn.train() logger.info( "**** Debug tool feature importance ****: {}".format(feature_importance) ) @@ -207,44 +193,31 @@ def __init__( self.state_feature_num = state_feature_num self.sorted_state_feature_start_indices = sorted_state_feature_start_indices - def evaluate(self, tdp: PreprocessedTrainingBatch): + def evaluate(self, batch: PreprocessedMemoryNetworkInput): """ Calculate state feature sensitivity due to actions: randomly permutating actions and see how much the prediction of next state feature deviates. """ - mdnrnn_training_input = tdp.training_input - assert isinstance(mdnrnn_training_input, PreprocessedMemoryNetworkInput) + assert isinstance(batch, PreprocessedMemoryNetworkInput) - self.trainer.mdnrnn.mdnrnn.eval() + self.trainer.memory_network.mdnrnn.eval() - batch_size, seq_len, state_dim = ( - mdnrnn_training_input.next_state.float_features.size() - ) + seq_len, batch_size, state_dim = batch.next_state.float_features.size() state_feature_num = self.state_feature_num feature_sensitivity = torch.zeros(state_feature_num) - state, action, next_state, reward, not_terminal = transpose( - mdnrnn_training_input.state.float_features, - mdnrnn_training_input.action, - mdnrnn_training_input.next_state.float_features, - mdnrnn_training_input.reward, - mdnrnn_training_input.not_terminal, - ) - mdnrnn_input = PreprocessedStateAction( - state=PreprocessedFeatureVector(float_features=state), - action=PreprocessedFeatureVector(float_features=action), - ) - # the input of mdnrnn has seq-len as the first dimension - mdnrnn_output = self.trainer.mdnrnn(mdnrnn_input) + state = batch.state.float_features + action = batch.action + mdnrnn_input = PreprocessedStateAction.from_tensors(state, action) + + # the input of world_model has seq-len as the first dimension + mdnrnn_output = self.trainer.memory_network(mdnrnn_input) predicted_next_state_means = mdnrnn_output.mus - shuffled_mdnrnn_input = PreprocessedStateAction( - state=PreprocessedFeatureVector(float_features=state), - # shuffle the actions - action=PreprocessedFeatureVector( - float_features=action[:, torch.randperm(batch_size), :] - ), + # shuffle the actions + shuffled_mdnrnn_input = PreprocessedStateAction.from_tensors( + state, action[:, torch.randperm(batch_size), :] ) - shuffled_mdnrnn_output = self.trainer.mdnrnn(shuffled_mdnrnn_input) + shuffled_mdnrnn_output = self.trainer.memory_network(shuffled_mdnrnn_input) shuffled_predicted_next_state_means = shuffled_mdnrnn_output.mus assert ( @@ -274,7 +247,7 @@ def evaluate(self, tdp: PreprocessedTrainingBatch): ) feature_sensitivity[i] = abs_diff.cpu().detach().item() - self.trainer.mdnrnn.mdnrnn.train() + self.trainer.memory_network.mdnrnn.train() logger.info( "**** Debug tool feature sensitivity ****: {}".format(feature_sensitivity) ) diff --git a/reagent/gym/agents/post_step.py b/reagent/gym/agents/post_step.py index eb469128e..0dd053372 100644 --- a/reagent/gym/agents/post_step.py +++ b/reagent/gym/agents/post_step.py @@ -2,13 +2,14 
@@ # Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -import inspect import logging -from typing import Any, Optional, Union +from typing import Optional, Union import gym +import numpy as np import reagent.types as rlt import torch +from reagent.gym.preprocessors import make_replay_buffer_trainer_preprocessor from reagent.gym.types import PostStep from reagent.replay_memory.circular_replay_buffer import ReplayBuffer from reagent.training.rl_dataset import RLDataset @@ -18,6 +19,30 @@ logger = logging.getLogger(__name__) +def add_replay_buffer_post_step(replay_buffer: ReplayBuffer): + """ + Simply add transitions to replay_buffer. + """ + + def post_step( + obs: np.ndarray, + actor_output: rlt.ActorOutput, + reward: float, + terminal: bool, + possible_actions_mask: Optional[torch.Tensor], + ) -> None: + action = actor_output.action.numpy() + log_prob = actor_output.log_prob.numpy() + if possible_actions_mask is None: + possible_actions_mask = torch.ones_like(actor_output.action).to(torch.bool) + possible_actions_mask = possible_actions_mask.numpy() + replay_buffer.add( + obs, action, reward, terminal, possible_actions_mask, log_prob.item() + ) + + return post_step + + def train_with_replay_buffer_post_step( replay_buffer: ReplayBuffer, trainer: Trainer, @@ -25,7 +50,7 @@ def train_with_replay_buffer_post_step( batch_size: int, replay_burnin: Optional[int] = None, trainer_preprocessor=None, - device: Optional[Union[str, torch.device]] = None, + device: Union[str, torch.device] = "cpu", ) -> PostStep: """ Called in post_step of agent to train based on replay buffer (RB). Args: @@ -36,7 +61,7 @@ def train_with_replay_buffer_post_step( replay_burnin: optional requirement for minimum size of RB before training begins. (i.e. burn in this many frames) """ - if device is not None and isinstance(device, str): + if isinstance(device, str): device = torch.device(device) _num_steps = 0 @@ -45,27 +70,10 @@ def train_with_replay_buffer_post_step( size_req = max(size_req, replay_burnin) if trainer_preprocessor is None: - sig = inspect.signature(trainer.train) - logger.info(f"Deriving trainer_preprocessor from {sig.parameters}") - # Assuming training_batch is in the first position (excluding self) - assert ( - list(sig.parameters.keys())[0] == "training_batch" - ), f"{sig.parameters} doesn't have training batch in first position." - training_batch_type = sig.parameters["training_batch"].annotation - assert training_batch_type != inspect.Parameter.empty - if not hasattr(training_batch_type, "from_replay_buffer"): - raise NotImplementedError( - f"{training_batch_type} does not implement from_replay_buffer" - ) - - def trainer_preprocessor(batch): - retval = training_batch_type.from_replay_buffer(batch) - if device is not None: - retval = retval.to(device) - return retval + trainer_preprocessor = make_replay_buffer_trainer_preprocessor(trainer, device) def post_step( - obs: Any, + obs: np.ndarray, actor_output: rlt.ActorOutput, reward: float, terminal: bool, @@ -98,7 +106,7 @@ def log_data_post_step(dataset: RLDataset, mdp_id: str, env: gym.Env) -> PostSte sequence_number = 0 def post_step( - obs: Any, + obs: np.ndarray, actor_output: rlt.ActorOutput, reward: float, terminal: bool, diff --git a/reagent/gym/envs/__init__.py b/reagent/gym/envs/__init__.py index 31b9c0e57..44e438285 100644 --- a/reagent/gym/envs/__init__.py +++ b/reagent/gym/envs/__init__.py @@ -1,7 +1,25 @@ #!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. 
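# The post_step.py changes above introduce `add_replay_buffer_post_step`, a callback
# that only records each transition into the replay buffer (no training). Below is a
# minimal, framework-free sketch of the same closure pattern; `SimpleBuffer` and
# `make_recording_post_step` are illustrative stand-ins, not ReAgent's ReplayBuffer API.
from typing import Callable, List, Tuple

import numpy as np


class SimpleBuffer:
    """Toy append-only transition store."""

    def __init__(self) -> None:
        self.transitions: List[Tuple[np.ndarray, int, float, bool]] = []

    def add(self, obs: np.ndarray, action: int, reward: float, terminal: bool) -> None:
        self.transitions.append((obs, action, reward, terminal))


def make_recording_post_step(buffer: SimpleBuffer) -> Callable[..., None]:
    # The returned closure is what an agent would call after every environment step.
    def post_step(obs: np.ndarray, action: int, reward: float, terminal: bool) -> None:
        buffer.add(obs, action, reward, terminal)

    return post_step


if __name__ == "__main__":
    buf = SimpleBuffer()
    post_step = make_recording_post_step(buf)
    rng = np.random.default_rng(0)
    obs = rng.normal(size=4)
    for t in range(5):
        post_step(obs, action=int(rng.integers(2)), reward=1.0, terminal=(t == 4))
        obs = rng.normal(size=4)
    assert len(buf.transitions) == 5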
+from .dynamics.linear_dynamics import LinDynaEnv # noqa from .env_factory import EnvFactory +from .pomdp.pocman import PocManEnv # noqa +from .pomdp.string_game import StringGameEnv # noqa +from .utils import register_if_not_exists __all__ = ["EnvFactory"] + + +######### Register classes below ########## + +CUR_MODULE = "reagent.gym.envs" +ENV_CLASSES = [ + ("Pocman-v0", ".pomdp.pocman:PocManEnv"), + ("StringGame-v0", ".pomdp.string_game:StringGameEnv"), + ("LinearDynamics-v0", ".dynamics.linear_dynamics:LinDynaEnv"), +] + +for env_name, rel_module_path in ENV_CLASSES: + full_module_path = CUR_MODULE + rel_module_path + register_if_not_exists(id=env_name, entry_point=full_module_path) diff --git a/reagent/test/environment/linear_dynamics.py b/reagent/gym/envs/dynamics/linear_dynamics.py similarity index 95% rename from reagent/test/environment/linear_dynamics.py rename to reagent/gym/envs/dynamics/linear_dynamics.py index 08377b3f3..3c697ccae 100644 --- a/reagent/test/environment/linear_dynamics.py +++ b/reagent/gym/envs/dynamics/linear_dynamics.py @@ -67,8 +67,10 @@ def step(self, action): # add the negative sign because we actually want to maximize the rewards, while an LRQ solution minimizes # rewards by convention reward = -( - state.T.dot(self.Q).dot(state) + action.T.dot(self.R).dot(action) - ).squeeze() + ( + state.T.dot(self.Q).dot(state) + action.T.dot(self.R).dot(action) + ).squeeze() + ) self.step_cnt += 1 terminal = False if self.step_cnt >= self.max_steps: diff --git a/reagent/test/gym/pomdp/pocman.py b/reagent/gym/envs/pomdp/pocman.py similarity index 99% rename from reagent/test/gym/pomdp/pocman.py rename to reagent/gym/envs/pomdp/pocman.py index abad73010..561b0b7c6 100644 --- a/reagent/test/gym/pomdp/pocman.py +++ b/reagent/gym/envs/pomdp/pocman.py @@ -216,7 +216,7 @@ def __init__(self): self.observation_space = Box(low=0, high=1, shape=(STATE_DIM,)) self._reward_range = 100 self.step_cnt = 0 - self.max_step = self.board["_max_step"] + self._max_episode_steps = self.board["_max_step"] def seed(self, seed=None): np.random.seed(seed) diff --git a/reagent/gym/envs/pomdp/state_embed_env.py b/reagent/gym/envs/pomdp/state_embed_env.py new file mode 100644 index 000000000..9dff5b400 --- /dev/null +++ b/reagent/gym/envs/pomdp/state_embed_env.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +""" +This file shows an example of using embedded states to feed to RL models in +partially observable environments (POMDPs). Embedded states are generated by a world +model which learns how to encode past n observations into a low-dimension +vector.Embedded states improve performance in POMDPs compared to just using +one-step observations as states because they encode more historical information +than one-step observations. 
+""" +import logging +from collections import deque +from typing import Optional + +import gym +import numpy as np +import reagent.types as rlt +import torch +from gym import Env +from gym.spaces import Box +from reagent.models.world_model import MemoryNetwork + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class StateEmbedEnvironment(Env): + def __init__( + self, + gym_env: Env, + mdnrnn: MemoryNetwork, + max_embed_seq_len: int, + state_min_value: Optional[float] = None, + state_max_value: Optional[float] = None, + ): + self.env = gym_env + self.unwrapped.spec = self.env.unwrapped.spec + self.max_embed_seq_len = max_embed_seq_len + self.mdnrnn = mdnrnn + self.embed_size = self.mdnrnn.num_hiddens + self.raw_state_dim = self.env.observation_space.shape[0] # type: ignore + self.state_dim = self.embed_size + self.raw_state_dim + if isinstance(self.env.action_space, gym.spaces.Discrete): + self.is_discrete_action = True + self.action_dim = self.env.action_space.n + elif isinstance(self.env.action_space, gym.spaces.Box): + self.is_discrete_action = False + self.action_dim = self.env.action_space.shape[0] + + self.action_space = self.env.action_space + + # only need to set up if needed + if state_min_value is None or state_max_value is None: + state_min_value = np.min(gym_env.observation_space.low) + state_max_value = np.max(gym_env.observation_space.high) + + self.observation_space = Box( # type: ignore + low=state_min_value, high=state_max_value, shape=(self.state_dim,) + ) + + self.cur_raw_state = None + self.recent_states = deque([], maxlen=self.max_embed_seq_len) # type: ignore + self.recent_actions = deque([], maxlen=self.max_embed_seq_len) # type: ignore + + def seed(self, seed): + self.env.seed(seed) + + @torch.no_grad() + def embed_state(self, state): + """ Embed state after either reset() or step() """ + assert len(self.recent_states) == len(self.recent_actions) + old_mdnrnn_mode = self.mdnrnn.mdnrnn.training + self.mdnrnn.mdnrnn.eval() + + # Embed the state as the hidden layer's output + # until the previous step + current state + if len(self.recent_states) == 0: + mdnrnn_state = np.zeros((1, self.raw_state_dim)) + mdnrnn_action = np.zeros((1, self.action_dim)) + else: + mdnrnn_state = np.array(list(self.recent_states)) + mdnrnn_action = np.array(list(self.recent_actions)) + + mdnrnn_state = torch.tensor(mdnrnn_state, dtype=torch.float).unsqueeze(1) + mdnrnn_action = torch.tensor(mdnrnn_action, dtype=torch.float).unsqueeze(1) + mdnrnn_input = rlt.PreprocessedStateAction.from_tensors( + state=mdnrnn_state, action=mdnrnn_action + ) + mdnrnn_output = self.mdnrnn(mdnrnn_input) + hidden_embed = ( + mdnrnn_output.all_steps_lstm_hidden[-1].squeeze().detach().cpu().numpy() + ) + state_embed = np.hstack((hidden_embed, state)) + self.mdnrnn.mdnrnn.train(old_mdnrnn_mode) + logger.debug( + f"Embed_state\nrecent states: {np.array(self.recent_states)}\n" + f"recent actions: {np.array(self.recent_actions)}\n" + f"state_embed{state_embed}\n" + ) + return state_embed + + def reset(self): + next_raw_state = self.env.reset() + self.recent_states = deque([], maxlen=self.max_embed_seq_len) + self.recent_actions = deque([], maxlen=self.max_embed_seq_len) + self.cur_raw_state = next_raw_state + next_embed_state = self.embed_state(next_raw_state) + return next_embed_state + + def step(self, action): + if self.is_discrete_action: + action_np = np.zeros(self.action_dim) + action_np[action] = 1.0 + else: + action_np = action + self.recent_states.append(self.cur_raw_state) + 
self.recent_actions.append(action_np) + next_raw_state, reward, terminal, info = self.env.step(action) + logger.debug("action {}, reward {}\n".format(action, reward)) + self.cur_raw_state = next_raw_state + next_embed_state = self.embed_state(next_raw_state) + return next_embed_state, reward, terminal, info diff --git a/reagent/test/gym/pomdp/string_game.py b/reagent/gym/envs/pomdp/string_game.py similarity index 84% rename from reagent/test/gym/pomdp/string_game.py rename to reagent/gym/envs/pomdp/string_game.py index a6b05a033..c913bbbb2 100644 --- a/reagent/test/gym/pomdp/string_game.py +++ b/reagent/gym/envs/pomdp/string_game.py @@ -3,6 +3,14 @@ """ The agent can observe a character at one time. But the reward is given based on last n (n>1) steps' observation (a string). +In this environment, the agent can observe a character ("A", "B") at +each time step, but the reward it receives actually depends on past 3 steps: +if the agent observes "ABB" in the past 3 steps, it receives +5 reward; if the +agent observes "BBB", it receives -5 reward; otherwise, the agent receives 0. +The action is the next character the agent wants to reveal, and the next state +is exactly the action just taken (i.e., the transition function only depends on +the action). Each episode is limited to 6 steps. Therefore, the optimal policy +is to choose actions "ABBABB" in sequence which results to +10 reward. """ import itertools import logging @@ -27,7 +35,7 @@ class StringGameEnv(Env): def __init__(self): np.random.seed(123) torch.manual_seed(123) - self.max_step = MAX_STEP + self._max_episode_steps = MAX_STEP self.reward_map = {} self._init_reward() logger.debug(self.reward_map) diff --git a/reagent/gym/envs/utils.py b/reagent/gym/envs/utils.py new file mode 100644 index 000000000..e80e75365 --- /dev/null +++ b/reagent/gym/envs/utils.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. + +import logging + +from gym.envs.registration import register, registry + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +def register_if_not_exists(id, entry_point): + """ + Preventing tests from failing trying to re-register environments + """ + if id not in registry.env_specs: + logging.info(f"Registering id={id}, entry_point={entry_point}.") + register(id=id, entry_point=entry_point) diff --git a/reagent/gym/policies/random_policies.py b/reagent/gym/policies/random_policies.py index 0f98f2cc8..a95502fbb 100644 --- a/reagent/gym/policies/random_policies.py +++ b/reagent/gym/policies/random_policies.py @@ -10,9 +10,15 @@ from reagent.gym.policies.policy import Policy -""" -TODO: remove explicit argument of possible_actions_mask since cts doesn't have. 
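# The StringGameEnv docstring above describes the reward rule: each step reveals one
# character, and the last three revealed characters give +5 for "ABB", -5 for "BBB",
# and 0 otherwise, so "ABBABB" over a 6-step episode totals +10. The sketch below is a
# toy restatement of that rule as written in the docstring, not the environment's
# actual reward table; `string_game_return` is a hypothetical helper for illustration.
def string_game_return(actions: str) -> float:
    """Score an action string under the reward rule described in the docstring."""
    total = 0.0
    for t in range(len(actions)):
        window = actions[max(0, t - 2) : t + 1]  # last three characters (shorter early on)
        if window == "ABB":
            total += 5.0
        elif window == "BBB":
            total -= 5.0
    return total


assert string_game_return("ABBABB") == 10.0  # the optimal sequence cited above
assert string_game_return("BBBBBB") < 0.0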
-""" +def make_random_policy_for_env(env: gym.Env): + if isinstance(env.action_space, gym.spaces.Discrete): + # discrete action space + return DiscreteRandomPolicy.create_for_env(env) + elif isinstance(env.action_space, gym.spaces.Box): + # continuous action space + return ContinuousRandomPolicy.create_for_env(env) + else: + raise NotImplementedError(f"{env.action_space} not supported") class DiscreteRandomPolicy(Policy): diff --git a/reagent/gym/policies/samplers/discrete_sampler.py b/reagent/gym/policies/samplers/discrete_sampler.py index eae97bbe8..e4ca6fb06 100644 --- a/reagent/gym/policies/samplers/discrete_sampler.py +++ b/reagent/gym/policies/samplers/discrete_sampler.py @@ -72,7 +72,7 @@ def sample_action( if possible_actions_mask is not None: assert scores.shape == possible_actions_mask.shape mod_scores = scores.clone().float() - mod_scores[~possible_actions_mask.bool()] = -float("inf") + mod_scores[~(possible_actions_mask.bool())] = -float("inf") else: mod_scores = scores raw_action = mod_scores.argmax(dim=1) diff --git a/reagent/gym/preprocessors/__init__.py b/reagent/gym/preprocessors/__init__.py index 9f1be0100..b297aefce 100644 --- a/reagent/gym/preprocessors/__init__.py +++ b/reagent/gym/preprocessors/__init__.py @@ -9,6 +9,7 @@ make_default_serving_action_extractor, make_default_serving_obs_preprocessor, ) +from .trainer_preprocessor import make_replay_buffer_trainer_preprocessor __all__ = [ @@ -16,4 +17,5 @@ "make_default_obs_preprocessor", "make_default_serving_obs_preprocessor", "make_default_serving_action_extractor", + "make_replay_buffer_trainer_preprocessor", ] diff --git a/reagent/gym/preprocessors/trainer_preprocessor.py b/reagent/gym/preprocessors/trainer_preprocessor.py new file mode 100644 index 000000000..3d165be40 --- /dev/null +++ b/reagent/gym/preprocessors/trainer_preprocessor.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. + +""" Get default preprocessors for training time. """ + +import inspect +import logging + +import torch +from reagent.training.trainer import Trainer + + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +def make_replay_buffer_trainer_preprocessor(trainer: Trainer, device: torch.device): + sig = inspect.signature(trainer.train) + logger.info(f"Deriving trainer_preprocessor from {sig.parameters}") + # Assuming training_batch is in the first position (excluding self) + assert ( + list(sig.parameters.keys())[0] == "training_batch" + ), f"{sig.parameters} doesn't have training batch in first position." 
+ training_batch_type = sig.parameters["training_batch"].annotation + assert training_batch_type != inspect.Parameter.empty + if not hasattr(training_batch_type, "from_replay_buffer"): + raise NotImplementedError( + f"{training_batch_type} does not implement from_replay_buffer" + ) + + def trainer_preprocessor(batch): + retval = training_batch_type.from_replay_buffer(batch) + return retval.to(device) + + return trainer_preprocessor diff --git a/reagent/gym/tests/configs/world_model/cartpole_features.yaml b/reagent/gym/tests/configs/world_model/cartpole_features.yaml new file mode 100644 index 000000000..4c97e6ff7 --- /dev/null +++ b/reagent/gym/tests/configs/world_model/cartpole_features.yaml @@ -0,0 +1,20 @@ +env: CartPole-v0 +model: + WorldModel: + trainer_param: + state_dim: 4 + action_dim: 2 + hidden_size: 50 + num_hidden_layers: 2 + learning_rate: 0.005 + not_terminal_loss_weight: 1 + next_state_loss_weight: 1 + reward_loss_weight: 1 + num_gaussians: 1 +num_train_episodes: 500 +num_test_episodes: 30 +seq_len: 1 +batch_size: 1024 +num_train_epochs: 10 +use_gpu: false +saved_mdnrnn_path: null diff --git a/reagent/gym/tests/configs/world_model/discrete_dqn_string.yaml b/reagent/gym/tests/configs/world_model/discrete_dqn_string.yaml new file mode 100644 index 000000000..1a5f80bff --- /dev/null +++ b/reagent/gym/tests/configs/world_model/discrete_dqn_string.yaml @@ -0,0 +1,55 @@ +env: StringGame-v0 + +# for training embedding model +embedding_model: + WorldModel: + trainer_param: + state_dim: 2 + action_dim: 2 + hidden_size: 20 + num_hidden_layers: 2 + learning_rate: 0.001 + not_terminal_loss_weight: 0 + next_state_loss_weight: 0 + reward_loss_weight: 1 + num_gaussians: 1 +seq_len: 3 +batch_size: 1024 +num_embedding_train_episodes: 4000 +num_embedding_train_epochs: 15 +saved_mdnrnn_path: null + +# for training agent +num_state_embed_episodes: 1000 +train_model: + DiscreteDQN: + trainer_param: + actions: + - 0 + - 1 + rl: + gamma: 0.99 + target_update_rate: 0.1 + maxq_learning: true + q_network_loss: mse + double_q_learning: true + minibatch_size: 1024 + minibatches_per_step: 1 + optimizer: + optimizer: ADAM + learning_rate: 0.001 + evaluation: + calc_cpe_in_training: false + net_builder: + FullyConnected: + sizes: + - 128 + - 64 + activations: + - leaky_relu + - leaky_relu +num_agent_train_epochs: 100 +num_agent_eval_epochs: 10 +use_gpu: false +# highest score, which requires history insight, is 10.0 +passing_score_bar: 10.0 diff --git a/reagent/gym/tests/test_gym.py b/reagent/gym/tests/test_gym.py index e7c0c4280..1aad4db75 100644 --- a/reagent/gym/tests/test_gym.py +++ b/reagent/gym/tests/test_gym.py @@ -3,15 +3,12 @@ import logging import os import pprint -import random import unittest from typing import Optional, Tuple -import gym import numpy as np import torch from parameterized import parameterized -from reagent.core.configuration import make_config_class from reagent.gym.agents.agent import Agent from reagent.gym.agents.post_step import train_with_replay_buffer_post_step from reagent.gym.envs.env_factory import EnvFactory @@ -20,70 +17,58 @@ make_default_serving_obs_preprocessor, ) from reagent.gym.runners.gymrunner import run_episode -from reagent.parameters import NormalizationData, NormalizationKey +from reagent.gym.tests.utils import build_normalizer from reagent.replay_memory.circular_replay_buffer import ReplayBuffer -from reagent.tensorboardX import SummaryWriterContext -from reagent.test.base.utils import ( - only_continuous_action_normalizer, - only_continuous_normalizer, 
-) +from reagent.test.base.horizon_test_base import HorizonTestBase from reagent.workflow.model_managers.union import ModelManager__Union from reagent.workflow.types import RewardOptions -from ruamel.yaml import YAML +# for seeding the environment +SEED = 0 logger = logging.getLogger(__name__) -curr_dir = os.path.dirname(__file__) +logger.setLevel(logging.INFO) -SEED = 0 +""" +Put on-policy gym tests here in the format (test name, path to yaml config). +Format path to be: "configs//__online.yaml." +NOTE: These tests should ideally finish quickly (within 10 minutes) since they are +unit tests which are run many times. +""" +GYM_TESTS = [ + ("Discrete Dqn Cartpole", "configs/cartpole/discrete_dqn_cartpole_online.yaml"), + ( + "Discrete Dqn Open Gridworld", + "configs/open_gridworld/discrete_dqn_open_gridworld.yaml", + ), + ("SAC Pendulum", "configs/pendulum/sac_pendulum_online.yaml"), + ("TD3 Pendulum", "configs/pendulum/td3_pendulum_online.yaml"), +] -def build_state_normalizer(env): - if isinstance(env.observation_space, gym.spaces.Box): - assert ( - len(env.observation_space.shape) == 1 - ), f"{env.observation_space.shape} has dim > 1, and is not supported." - return NormalizationData( - dense_normalization_parameters=only_continuous_normalizer( - list(range(env.observation_space.shape[0])), - env.observation_space.low, - env.observation_space.high, - ) - ) - elif isinstance(env.observation_space, gym.spaces.Dict): - # assuming env.observation_space is image - return None - else: - raise NotImplementedError(f"{env.observation_space} not supported") - - -def build_action_normalizer(env): - action_space = env.action_space - if isinstance(action_space, gym.spaces.Discrete): - return only_continuous_normalizer( - list(range(action_space.n)), min_value=0, max_value=1 - ) - elif isinstance(action_space, gym.spaces.Box): - assert action_space.shape == ( - 1, - ), f"Box action shape {action_space.shape} not supported." 
- - return NormalizationData( - dense_normalization_parameters=only_continuous_action_normalizer( - [0], - min_value=action_space.low.item(), - max_value=action_space.high.item(), - ) - ) - else: - raise NotImplementedError(f"{action_space} not supported.") +curr_dir = os.path.dirname(__file__) + + +class TestGym(HorizonTestBase): + @parameterized.expand(GYM_TESTS) + def test_gym_cpu(self, name: str, config_path: str): + self.run_from_config( + run_test=run_test, + config_path=os.path.join(curr_dir, config_path), + use_gpu=False, + ) + logger.info(f"{name} passes!") -def build_normalizer(env): - return { - NormalizationKey.STATE: build_state_normalizer(env), - NormalizationKey.ACTION: build_action_normalizer(env), - } + @parameterized.expand(GYM_TESTS) + @unittest.skipIf(not torch.cuda.is_available(), "CUDA not available") + def test_gym_gpu(self, name: str, config_path: str): + self.run_from_config( + run_test=run_test, + config_path=os.path.join(curr_dir, config_path), + use_gpu=True, + ) + logger.info(f"{name} passes!") def run_test( @@ -172,55 +157,5 @@ def run_test( logger.info(eval_rewards) -def run_from_config(path, use_gpu): - yaml = YAML(typ="safe") - with open(path, "r") as f: - config_dict = yaml.load(f.read()) - config_dict["use_gpu"] = use_gpu - - @make_config_class(run_test) - class ConfigClass: - pass - - config = ConfigClass(**config_dict) - return run_test(**config.asdict()) - - -GYM_TESTS = [ - ("Discrete Dqn Cartpole", "configs/cartpole/discrete_dqn_cartpole_online.yaml"), - ( - "Discrete Dqn Open Gridworld", - "configs/open_gridworld/discrete_dqn_open_gridworld.yaml", - ), - ("SAC Pendulum", "configs/pendulum/sac_pendulum_online.yaml"), - ("TD3 Pendulum", "configs/pendulum/td3_pendulum_online.yaml"), -] - - -class TestGym(unittest.TestCase): - """ - Environments that require short training time (<=10min) can be tested here. - """ - - def setUp(self): - SummaryWriterContext._reset_globals() - logging.basicConfig(level=logging.INFO) - logger.setLevel(logging.INFO) - np.random.seed(SEED) - torch.manual_seed(SEED) - random.seed(SEED) - - @parameterized.expand(GYM_TESTS) - def test_gym_cpu(self, name: str, config_path: str): - run_from_config(os.path.join(curr_dir, config_path), False) - logger.info(f"{name} passes!") - - @parameterized.expand(GYM_TESTS) - @unittest.skipIf(not torch.cuda.is_available(), "CUDA not available") - def test_gym_gpu(self, name: str, config_path: str): - run_from_config(os.path.join(curr_dir, config_path), True) - logger.info(f"{name} passes!") - - if __name__ == "__main__": unittest.main() diff --git a/reagent/test/environment/test_linear_dynamics.py b/reagent/gym/tests/test_linear_dynamics.py similarity index 96% rename from reagent/test/environment/test_linear_dynamics.py rename to reagent/gym/tests/test_linear_dynamics.py index a8adf2d63..de3b7cd8f 100644 --- a/reagent/test/environment/test_linear_dynamics.py +++ b/reagent/gym/tests/test_linear_dynamics.py @@ -5,9 +5,9 @@ import time import unittest -import gym import numpy as np import scipy.linalg as linalg +from reagent.gym.envs.env_factory import EnvFactory logger = logging.getLogger(__name__) @@ -22,7 +22,7 @@ def test_random_vs_lqr(self): Test random actions vs. a LQR controller. LQR controller should perform much better than random actions in the linear dynamics environment. 
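# test_random_vs_lqr above compares random actions with an LQR controller on
# LinearDynamics-v0. For reference, the standard infinite-horizon discrete-time LQR
# gain can be computed from (A, B, Q, R) with scipy's Riccati solver; the matrices in
# this sketch are illustrative (a double integrator), not the environment's own.
import numpy as np
import scipy.linalg as linalg


def lqr_gain(A: np.ndarray, B: np.ndarray, Q: np.ndarray, R: np.ndarray) -> np.ndarray:
    """Return K such that u_t = -K x_t minimizes the sum of x'Qx + u'Ru."""
    P = linalg.solve_discrete_are(A, B, Q, R)
    return np.linalg.solve(R + B.T @ P @ B, B.T @ P @ A)


if __name__ == "__main__":
    A = np.array([[1.0, 1.0], [0.0, 1.0]])  # double-integrator dynamics
    B = np.array([[0.0], [1.0]])
    Q = np.eye(2)
    R = np.eye(1)
    K = lqr_gain(A, B, Q, R)
    # the closed-loop system A - BK should be stable (spectral radius < 1)
    assert np.max(np.abs(np.linalg.eigvals(A - B @ K))) < 1.0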
""" - env = gym.make("LinearDynamics-v0") + env = EnvFactory.make("LinearDynamics-v0") num_test_episodes = 500 def random_policy(env, state): diff --git a/reagent/test/environment/test_pomdp.py b/reagent/gym/tests/test_pomdp.py similarity index 89% rename from reagent/test/environment/test_pomdp.py rename to reagent/gym/tests/test_pomdp.py index b632739c0..f238befee 100644 --- a/reagent/test/environment/test_pomdp.py +++ b/reagent/gym/tests/test_pomdp.py @@ -6,8 +6,7 @@ import unittest import numpy as np -from reagent.test.gym.pomdp.pocman import PocManEnv -from reagent.test.gym.pomdp.string_game import StringGameEnv +from reagent.gym.envs.env_factory import EnvFactory logger = logging.getLogger(__name__) @@ -18,13 +17,13 @@ def setUp(self): logging.getLogger().setLevel(logging.DEBUG) def test_string_game(self): - env = StringGameEnv() + env = EnvFactory.make("StringGame-v0") env.seed(313) mean_acc_reward = self._test_env(env) assert 0.1 >= mean_acc_reward def test_pocman(self): - env = PocManEnv() + env = EnvFactory.make("Pocman-v0") env.seed(313) mean_acc_reward = self._test_env(env) assert -80 <= mean_acc_reward <= -70 @@ -37,7 +36,7 @@ def _test_env(self, env): start_time = time.time() env.reset() acc_rw = 0 - for i in range(env.max_step): + for i in range(env._max_episode_steps): env.print_internal_state() action = env.random_action() ob, rw, done, info = env.step(action) diff --git a/reagent/gym/tests/test_world_model.py b/reagent/gym/tests/test_world_model.py new file mode 100644 index 000000000..96fdec030 --- /dev/null +++ b/reagent/gym/tests/test_world_model.py @@ -0,0 +1,427 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +import logging +import os +import unittest +from typing import Dict, List + +import gym +import numpy as np +import reagent.types as rlt +import torch +from reagent.evaluation.world_model_evaluator import ( + FeatureImportanceEvaluator, + FeatureSensitivityEvaluator, +) +from reagent.gym.agents.agent import Agent +from reagent.gym.agents.post_step import add_replay_buffer_post_step +from reagent.gym.envs.env_factory import EnvFactory +from reagent.gym.envs.pomdp.state_embed_env import StateEmbedEnvironment +from reagent.gym.policies.random_policies import make_random_policy_for_env +from reagent.gym.preprocessors import make_replay_buffer_trainer_preprocessor +from reagent.gym.runners.gymrunner import run_episode +from reagent.gym.tests.utils import build_normalizer +from reagent.models.world_model import MemoryNetwork +from reagent.replay_memory.circular_replay_buffer import ReplayBuffer +from reagent.test.base.horizon_test_base import HorizonTestBase +from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer +from reagent.workflow.model_managers.union import ModelManager__Union +from reagent.workflow.types import RewardOptions +from tqdm import tqdm + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +curr_dir = os.path.dirname(__file__) + +SEED = 0 + + +def print_mdnrnn_losses(epoch, batch_num, losses): + logger.info( + f"Printing loss for Epoch {epoch}, Batch {batch_num};\n" + f"loss={losses['loss']}, bce={losses['bce']}," + f"gmm={losses['gmm']}, mse={losses['mse']} \n" + ) + + +def create_rb(env, batch_size, seq_len, desired_size): + rb = ReplayBuffer.create_from_env( + env=env, + replay_memory_size=desired_size, + batch_size=batch_size, + stack_size=seq_len, + # we want sequence of actions too for WorldModel + 
return_everything_as_stack=True, + ) + random_policy = make_random_policy_for_env(env) + post_step = add_replay_buffer_post_step(rb) + agent = Agent.create_for_env( + env, policy=random_policy, post_transition_callback=post_step + ) + while not rb.is_full(): + run_episode(env, agent) + return rb + + +def calculate_feature_importance( + env: gym.Env, + trainer: MDNRNNTrainer, + use_gpu: bool, + test_batch: rlt.PreprocessedMemoryNetworkInput, +): + assert isinstance(env.action_space, gym.spaces.Discrete) + assert isinstance(env.observation_space, gym.spaces.Box) + assert len(env.observation_space.shape) == 1 + state_dim = env.observation_space.shape[0] + action_dim = env.action_space.n + + feature_importance_evaluator = FeatureImportanceEvaluator( + trainer, + discrete_action=True, + state_feature_num=state_dim, + action_feature_num=action_dim, + sorted_state_feature_start_indices=list(range(state_dim)), + sorted_action_feature_start_indices=list(range(action_dim)), + ) + feature_loss_vector = feature_importance_evaluator.evaluate(test_batch)[ + "feature_loss_increase" + ] + feature_importance_map = {} + for i in range(action_dim): + print( + "action {}, feature importance: {}".format(i, feature_loss_vector[i].item()) + ) + feature_importance_map[f"action{i}"] = feature_loss_vector[i].item() + for i in range(state_dim): + print( + "state {}, feature importance: {}".format( + i, feature_loss_vector[i + action_dim].item() + ) + ) + feature_importance_map[f"state{i}"] = feature_loss_vector[i + action_dim].item() + return feature_importance_map + + +def calculate_feature_sensitivity( + env: gym.Env, + trainer: MDNRNNTrainer, + use_gpu: bool, + test_batch: rlt.PreprocessedMemoryNetworkInput, +): + assert isinstance(env.action_space, gym.spaces.Discrete) + assert isinstance(env.observation_space, gym.spaces.Box) + assert len(env.observation_space.shape) == 1 + state_dim = env.observation_space.shape[0] + feature_sensitivity_evaluator = FeatureSensitivityEvaluator( + trainer, + state_feature_num=state_dim, + sorted_state_feature_start_indices=list(range(state_dim)), + ) + feature_sensitivity_vector = feature_sensitivity_evaluator.evaluate(test_batch)[ + "feature_sensitivity" + ] + feature_sensitivity_map = {} + for i in range(state_dim): + feature_sensitivity_map["state" + str(i)] = feature_sensitivity_vector[i].item() + print( + "state {}, feature sensitivity: {}".format( + i, feature_sensitivity_vector[i].item() + ) + ) + return feature_sensitivity_map + + +def train_mdnrnn( + env: gym.Env, + trainer: MDNRNNTrainer, + trainer_preprocessor, + num_train_episodes: int, + seq_len: int, + batch_size: int, + num_train_epochs: int, + # for optional validation + test_replay_buffer=None, +): + train_replay_buffer = create_rb( + env, batch_size, seq_len, num_train_episodes * env._max_episode_steps + ) + num_batch_per_epoch = train_replay_buffer.size // batch_size + logger.info("Made RBs, starting to train now!") + for epoch in range(num_train_epochs): + for i in range(num_batch_per_epoch): + batch = train_replay_buffer.sample_transition_batch_tensor( + batch_size=batch_size + ) + preprocessed_batch = trainer_preprocessor(batch) + losses = trainer.train(preprocessed_batch) + print_mdnrnn_losses(epoch, i, losses) + + # validation + if test_replay_buffer is not None: + with torch.no_grad(): + trainer.memory_network.mdnrnn.eval() + test_batch = test_replay_buffer.sample_transition_batch_tensor( + batch_size=batch_size + ) + preprocessed_test_batch = trainer_preprocessor(test_batch) + valid_losses = 
trainer.get_loss(preprocessed_test_batch) + print_mdnrnn_losses(epoch, "validation", valid_losses) + trainer.memory_network.mdnrnn.train() + return trainer + + +def train_mdnrnn_and_compute_feature_stats( + env: str, + model: ModelManager__Union, + num_train_episodes: int, + num_test_episodes: int, + seq_len: int, + batch_size: int, + num_train_epochs: int, + use_gpu: bool, + saved_mdnrnn_path: str = None, +): + """ Train MDNRNN Memory Network and compute feature importance/sensitivity. """ + env = EnvFactory.make(env) + env.seed(SEED) + + manager = model.value + trainer = manager.initialize_trainer( + use_gpu=use_gpu, + reward_options=RewardOptions(), + normalization_data_map=build_normalizer(env), + ) + + device = "cuda" if use_gpu else "cpu" + trainer_preprocessor = make_replay_buffer_trainer_preprocessor(trainer, device) + test_replay_buffer = create_rb( + env, batch_size, seq_len, num_test_episodes * env._max_episode_steps + ) + + if saved_mdnrnn_path is None: + # train from scratch + trainer = train_mdnrnn( + env=env, + trainer=trainer, + trainer_preprocessor=trainer_preprocessor, + num_train_episodes=num_train_episodes, + seq_len=seq_len, + batch_size=batch_size, + num_train_epochs=num_train_epochs, + test_replay_buffer=test_replay_buffer, + ) + else: + # load a pretrained model, and just evaluate it + trainer.memory_network.mdnrnn.load_state_dict(torch.load(saved_mdnrnn_path)) + + with torch.no_grad(): + trainer.memory_network.mdnrnn.eval() + test_batch = test_replay_buffer.sample_transition_batch_tensor( + batch_size=test_replay_buffer.size + ) + preprocessed_test_batch = trainer_preprocessor(test_batch) + feature_importance = calculate_feature_importance( + env=env, + trainer=trainer, + use_gpu=use_gpu, + test_batch=preprocessed_test_batch, + ) + + feature_sensitivity = calculate_feature_sensitivity( + env=env, + trainer=trainer, + use_gpu=use_gpu, + test_batch=preprocessed_test_batch, + ) + + trainer.memory_network.mdnrnn.train() + return feature_importance, feature_sensitivity + + +def create_embed_rl_dataset( + env: gym.Env, + memory_network: MemoryNetwork, + num_state_embed_episodes: int, + batch_size: int, + seq_len: int, + hidden_dim: int, + use_gpu: bool, +): + assert isinstance(env.action_space, gym.spaces.Discrete) + assert isinstance(env.observation_space, gym.spaces.Box) + assert len(env.observation_space.shape) == 1 + logger.info("Starting to create embedded RL Dataset!") + + # seqlen+1 because MDNRNN embeds the first seq_len steps and then + # the embedded state will be concatenated with the last step + # Ie.. 
(o1,o2,...,on) -> RNN -> h1,h2,...,hn + # and we set s_{n+1} = [o_{n+1}, h_n] + embed_env = StateEmbedEnvironment( + gym_env=env, mdnrnn=memory_network, max_embed_seq_len=seq_len + 1 + ) + # now create a filled replay buffer of embeddings + # new obs shape dim = state_dim + hidden_dim + embed_rb_capacity = num_state_embed_episodes * env._max_episode_steps + + embed_rb = ReplayBuffer.create_from_env( + env=embed_env, + replay_memory_size=embed_rb_capacity, + batch_size=batch_size, + stack_size=1, + ) + + device = "cuda" if use_gpu else "cpu" + policy = make_random_policy_for_env(embed_env) + post_step = add_replay_buffer_post_step(embed_rb) + agent = Agent.create_for_env( + env=embed_env, policy=policy, post_transition_callback=post_step, device=device + ) + + # only an approximation, since episodes may not run to max steps + with tqdm(total=num_state_embed_episodes, desc="approx filling embed_rb") as pbar: + while not embed_rb.is_full(): + run_episode(env=embed_env, agent=agent) + pbar.update() + + batch = embed_rb.sample_transition_batch_tensor(batch_size=embed_rb_capacity) + state_min = min(batch.state.min(), batch.next_state.min()).item() + state_max = max(batch.state.max(), batch.next_state.max()).item() + logger.info( + f"Finished making embed dataset with size {embed_rb.size}, " + f"min {state_min}, max {state_max}" + ) + return embed_rb, state_min, state_max + + +def train_mdnrnn_and_train_on_embedded_env( + env: str, + embedding_model: ModelManager__Union, + num_embedding_train_episodes: int, + seq_len: int, + batch_size: int, + num_embedding_train_epochs: int, + train_model: ModelManager__Union, + num_state_embed_episodes: int, + num_agent_train_epochs: int, + num_agent_eval_epochs: int, + use_gpu: bool, + passing_score_bar: float, + saved_mdnrnn_path: str = None, +): + """ Train an agent on embedded states by the MDNRNN. 
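# create_embed_rl_dataset and StateEmbedEnvironment above build the embedded state as
# s_{n+1} = [o_{n+1}, h_n]: the new raw observation concatenated with the RNN hidden
# state summarizing the recent history. A minimal torch sketch of that step is below;
# a plain nn.LSTM stands in for the trained MDN-RNN memory network, and the dimensions
# are illustrative only.
import torch
import torch.nn as nn

obs_dim, action_dim, hidden_dim, history_len = 2, 2, 20, 3
rnn = nn.LSTM(input_size=obs_dim + action_dim, hidden_size=hidden_dim)

# history of (observation, action) features, shaped seq_len x batch x feature_dim
history = torch.randn(history_len, 1, obs_dim + action_dim)
new_obs = torch.randn(obs_dim)

with torch.no_grad():
    outputs, _ = rnn(history)          # seq_len x 1 x hidden_dim
    h_last = outputs[-1, 0]            # hidden state after the last history step
    embedded_state = torch.cat([h_last, new_obs])  # [o_{n+1}, h_n]

assert embedded_state.shape == (hidden_dim + obs_dim,)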
""" + env = EnvFactory.make(env) + env.seed(SEED) + + embedding_manager = embedding_model.value + embedding_trainer = embedding_manager.initialize_trainer( + use_gpu=use_gpu, + reward_options=RewardOptions(), + normalization_data_map=build_normalizer(env), + ) + + device = "cuda" if use_gpu else "cpu" + embedding_trainer_preprocessor = make_replay_buffer_trainer_preprocessor( + embedding_trainer, device + ) + if saved_mdnrnn_path is None: + # train from scratch + embedding_trainer = train_mdnrnn( + env=env, + trainer=embedding_trainer, + trainer_preprocessor=embedding_trainer_preprocessor, + num_train_episodes=num_embedding_train_episodes, + seq_len=seq_len, + batch_size=batch_size, + num_train_epochs=num_embedding_train_epochs, + ) + else: + # load a pretrained model, and just evaluate it + embedding_trainer.memory_network.mdnrnn.load_state_dict( + torch.load(saved_mdnrnn_path) + ) + + # create embedding dataset + embed_rb, state_min, state_max = create_embed_rl_dataset( + env=env, + memory_network=embedding_trainer.memory_network, + num_state_embed_episodes=num_state_embed_episodes, + batch_size=batch_size, + seq_len=seq_len, + hidden_dim=embedding_trainer.params.hidden_size, + use_gpu=use_gpu, + ) + embed_env = StateEmbedEnvironment( + gym_env=env, + mdnrnn=embedding_trainer.memory_network, + max_embed_seq_len=seq_len, + state_min_value=state_min, + state_max_value=state_max, + ) + agent_manager = train_model.value + agent_trainer = agent_manager.initialize_trainer( + use_gpu=use_gpu, + reward_options=RewardOptions(), + normalization_data_map=build_normalizer(embed_env), + ) + device = "cuda" if use_gpu else "cpu" + agent_trainer_preprocessor = make_replay_buffer_trainer_preprocessor( + agent_trainer, device + ) + num_batch_per_epoch = embed_rb.size // batch_size + for epoch in range(num_agent_train_epochs): + for _ in tqdm(range(num_batch_per_epoch), desc=f"epoch {epoch}"): + batch = embed_rb.sample_transition_batch_tensor(batch_size=batch_size) + preprocessed_batch = agent_trainer_preprocessor(batch) + agent_trainer.train(preprocessed_batch) + + # evaluate model + rewards = [] + policy = agent_manager.create_policy(serving=False) + agent = Agent.create_for_env(embed_env, policy=policy, device=device) + for i in range(num_agent_eval_epochs): + ep_reward = run_episode(env=embed_env, agent=agent) + rewards.append(ep_reward) + logger.info(f"Finished eval episode {i} with reward {ep_reward}.") + logger.info(f"Average eval reward is {np.mean(rewards)}.") + assert ( + np.mean(rewards) >= passing_score_bar + ), f"average reward doesn't pass our bar {passing_score_bar}" + return rewards + + +class TestWorldModel(HorizonTestBase): + @staticmethod + def verify_result(result_dict: Dict[str, float], expected_top_features: List[str]): + top_feature = max(result_dict, key=result_dict.get) + assert ( + top_feature in expected_top_features + ), f"top_feature: {top_feature}, expected_top_features: {expected_top_features}" + + def test_mdnrnn(self): + """ Test MDNRNN feature importance and feature sensitivity. 
""" + config_path = "configs/world_model/cartpole_features.yaml" + feature_importance, feature_sensitivity = self.run_from_config( + run_test=train_mdnrnn_and_compute_feature_stats, + config_path=os.path.join(curr_dir, config_path), + use_gpu=False, + ) + TestWorldModel.verify_result(feature_importance, ["state3"]) + TestWorldModel.verify_result(feature_sensitivity, ["state3"]) + logger.info("MDNRNN feature test passes!") + + def test_world_model(self): + """ Train DQN on POMDP given features from world model. """ + config_path = "configs/world_model/discrete_dqn_string.yaml" + HorizonTestBase.run_from_config( + run_test=train_mdnrnn_and_train_on_embedded_env, + config_path=os.path.join(curr_dir, config_path), + use_gpu=False, + ) + logger.info("World model test passes!") + + +if __name__ == "__main__": + unittest.main() diff --git a/reagent/gym/tests/utils.py b/reagent/gym/tests/utils.py new file mode 100644 index 000000000..3654ca6db --- /dev/null +++ b/reagent/gym/tests/utils.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. + + +from gym import spaces +from reagent.parameters import NormalizationData, NormalizationKey +from reagent.test.base.utils import ( + only_continuous_action_normalizer, + only_continuous_normalizer, +) + + +def build_state_normalizer(env): + if isinstance(env.observation_space, spaces.Box): + assert ( + len(env.observation_space.shape) == 1 + ), f"{env.observation_space.shape} has dim > 1, and is not supported." + return NormalizationData( + dense_normalization_parameters=only_continuous_normalizer( + list(range(env.observation_space.shape[0])), + env.observation_space.low, + env.observation_space.high, + ) + ) + elif isinstance(env.observation_space, spaces.Dict): + # assuming env.observation_space is image + return None + else: + raise NotImplementedError(f"{env.observation_space} not supported") + + +def build_action_normalizer(env): + action_space = env.action_space + if isinstance(action_space, spaces.Discrete): + return only_continuous_normalizer( + list(range(action_space.n)), min_value=0, max_value=1 + ) + elif isinstance(action_space, spaces.Box): + assert action_space.shape == ( + 1, + ), f"Box action shape {action_space.shape} not supported." + + return NormalizationData( + dense_normalization_parameters=only_continuous_action_normalizer( + [0], + min_value=action_space.low.item(), + max_value=action_space.high.item(), + ) + ) + else: + raise NotImplementedError(f"{action_space} not supported.") + + +def build_normalizer(env): + return { + NormalizationKey.STATE: build_state_normalizer(env), + NormalizationKey.ACTION: build_action_normalizer(env), + } diff --git a/reagent/models/mdn_rnn.py b/reagent/models/mdn_rnn.py index a8bdaa65c..99adb2d75 100644 --- a/reagent/models/mdn_rnn.py +++ b/reagent/models/mdn_rnn.py @@ -16,7 +16,9 @@ logger = logging.getLogger(__name__) -class _MDNRNNBase(nn.Module): +class MDNRNN(nn.Module): + """ Mixture Density Network - Recurrent Neural Network """ + def __init__( self, state_dim, action_dim, num_hiddens, num_hidden_layers, num_gaussians ): @@ -25,8 +27,13 @@ def __init__( self.action_dim = action_dim self.num_hiddens = num_hiddens self.num_hidden_layers = num_hidden_layers - self.num_gaussians = num_gaussians + self.rnn = nn.LSTM( + input_size=state_dim + action_dim, + hidden_size=num_hiddens, + num_layers=num_hidden_layers, + ) + self.num_gaussians = num_gaussians # outputs: # 1. mu, sigma, and pi for each gaussian # 2. 
non-terminal signal @@ -35,25 +42,6 @@ def __init__( num_hiddens, (2 * state_dim + 1) * num_gaussians + 2 ) - def forward(self, *inputs): - pass - - -class MDNRNN(_MDNRNNBase): - """ Mixture Density Network - Recurrent Neural Network """ - - def __init__( - self, state_dim, action_dim, num_hiddens, num_hidden_layers, num_gaussians - ): - super().__init__( - state_dim, action_dim, num_hiddens, num_hidden_layers, num_gaussians - ) - self.rnn = nn.LSTM( - input_size=state_dim + action_dim, - hidden_size=num_hiddens, - num_layers=num_hidden_layers, - ) - def forward(self, actions, states, hidden=None): """ Forward pass of MDN-RNN @@ -139,15 +127,13 @@ def deque_sample(self, indices): yield s.state, s.action, s.next_state, s.reward, s.not_terminal def sample_memories( - self, batch_size, use_gpu=False, batch_first=False - ) -> rlt.PreprocessedTrainingBatch: + self, batch_size, use_gpu=False + ) -> rlt.PreprocessedMemoryNetworkInput: """ :param batch_size: number of samples to return :param use_gpu: whether to put samples on gpu - :param batch_first: If True, the first dimension of data is batch_size. - If False (default), the first dimension is SEQ_LEN. Therefore, - state's shape is SEQ_LEN x BATCH_SIZE x STATE_DIM, for example. By default, - MDN-RNN consumes data with SEQ_LEN as the first dimension. + State's shape is SEQ_LEN x BATCH_SIZE x STATE_DIM, for example. + By default, MDN-RNN consumes data with SEQ_LEN as the first dimension. """ sample_indices = np.random.randint(self.memory_size, size=batch_size) device = ( @@ -161,10 +147,10 @@ def sample_memories( zip(*self.deque_sample(sample_indices)), ) - if not batch_first: - state, action, next_state, reward, not_terminal = transpose( - state, action, next_state, reward, not_terminal - ) + # make shapes seq_len x batch_size x feature_dim + state, action, next_state, reward, not_terminal = transpose( + state, action, next_state, reward, not_terminal + ) training_input = rlt.PreprocessedMemoryNetworkInput( state=rlt.PreprocessedFeatureVector(float_features=state), @@ -175,7 +161,7 @@ def sample_memories( not_terminal=not_terminal, step=None, ) - return rlt.PreprocessedTrainingBatch(training_input=training_input) + return training_input def insert_into_memory(self, state, action, next_state, reward, not_terminal): self.replay_memory.append( @@ -245,5 +231,5 @@ def gmm_loss(batch, mus, sigmas, logpi, reduce=True): log_prob = max_log_probs.squeeze() + torch.log(probs) if reduce: - return -torch.mean(log_prob) + return -(torch.mean(log_prob)) return -log_prob diff --git a/reagent/models/world_model.py b/reagent/models/world_model.py index 42611182b..8b797ade3 100644 --- a/reagent/models/world_model.py +++ b/reagent/models/world_model.py @@ -39,7 +39,7 @@ def input_prototype(self): ), ) - def forward(self, input): + def forward(self, input: rlt.PreprocessedStateAction): ( mus, sigmas, diff --git a/reagent/parameters.py b/reagent/parameters.py index 54f4b287b..c5d27f0fa 100644 --- a/reagent/parameters.py +++ b/reagent/parameters.py @@ -56,6 +56,40 @@ class RainbowDQNParameters(BaseDataClass): quantile: bool = False +@dataclass(frozen=True) +class MDNRNNTrainerParameters(BaseDataClass): + __hash__ = param_hash + + hidden_size: int = 64 + num_hidden_layers: int = 2 + learning_rate: float = 0.001 + num_gaussians: int = 5 + train_data_percentage: float = 60.0 + validation_data_percentage: float = 20.0 + test_data_percentage: float = 20.0 + # weight in calculating world-model loss + reward_loss_weight: float = 1.0 + next_state_loss_weight: float = 1.0 + 
not_terminal_loss_weight: float = 1.0 + fit_only_one_next_step: bool = False + + +@dataclass(frozen=True) +class CEMTrainerParameters: + __hash__ = param_hash + + plan_horizon_length: int = 0 + num_world_models: int = 0 + cem_population_size: int = 0 + cem_num_iterations: int = 0 + ensemble_population_size: int = 0 + num_elites: int = 0 + mdnrnn: MDNRNNTrainerParameters = MDNRNNTrainerParameters() + rl: RLParameters = RLParameters() + alpha: float = 0.25 + epsilon: float = 0.001 + + @dataclass(frozen=True) class CNNParameters(BaseDataClass): __hash__ = param_hash @@ -181,38 +215,6 @@ class NormalizationData(BaseDataClass): dense_normalization_parameters: Optional[Dict[int, NormalizationParameters]] -@dataclass(frozen=True) -class MDNRNNParameters(BaseDataClass): - hidden_size: int = 64 - num_hidden_layers: int = 2 - minibatch_size: int = 16 - learning_rate: float = 0.001 - num_gaussians: int = 5 - train_data_percentage: float = 60.0 - validation_data_percentage: float = 20.0 - test_data_percentage: float = 20.0 - # weight in calculating world-model loss - reward_loss_weight: float = 1.0 - next_state_loss_weight: float = 1.0 - not_terminal_loss_weight: float = 1.0 - fit_only_one_next_step: bool = False - - -@dataclass(frozen=True) -class CEMParameters(BaseDataClass): - plan_horizon_length: int = 0 - num_world_models: int = 0 - cem_population_size: int = 0 - cem_num_iterations: int = 0 - ensemble_population_size: int = 0 - num_elites: int = 0 - mdnrnn: MDNRNNParameters = MDNRNNParameters() - rl: RLParameters = RLParameters() - evaluation: EvaluationParameters = EvaluationParameters() - alpha: float = 0.25 - epsilon: float = 0.001 - - ################################################# # RL Ranking parameters # ################################################# diff --git a/reagent/replay_memory/circular_replay_buffer.py b/reagent/replay_memory/circular_replay_buffer.py index f4f57444d..e5f77ff2f 100644 --- a/reagent/replay_memory/circular_replay_buffer.py +++ b/reagent/replay_memory/circular_replay_buffer.py @@ -122,6 +122,7 @@ def __init__( stack_size: int, replay_capacity: int, batch_size: int, + return_everything_as_stack: bool = False, update_horizon: int = 1, gamma: float = 0.99, max_sample_attempts: int = 1000, @@ -139,6 +140,8 @@ def __init__( stack_size: int, number of frames to use in state stack. replay_capacity: int, number of transitions to keep in memory. batch_size: int. + return_everything_as_stack: bool, set True if we want everything, + not just states, to be stacked too update_horizon: int, length of update ('n' in n-step update). gamma: int, the discount factor. 
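The stacking path added further down in this file (get_stack_for_indices) gathers, for every sampled index, the window of positions that form its frame stack, wrapping around the circular storage. A small standalone numpy sketch of that index arithmetic, with toy sizes that are illustrative only and not taken from the patch:

import numpy as np

stack_size, replay_capacity = 3, 10          # toy values for illustration
indices = np.array([0, 5])                   # two sampled positions

# Same arithmetic as get_stack_for_indices: row i holds the positions of the
# frames that make up the stack ending at indices[i], modulo the capacity.
stack_indices = indices.reshape(-1, 1) + np.arange(-stack_size + 1, 1)
stack_indices %= replay_capacity
print(stack_indices)  # [[8 9 0]
                      #  [3 4 5]]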
max_sample_attempts: int, the maximum number of attempts allowed to @@ -185,6 +188,7 @@ def __init__( self._reward_dtype = reward_dtype self._observation_shape = observation_shape self._stack_size = stack_size + self._return_everything_as_stack = return_everything_as_stack self._state_shape = self._observation_shape + (self._stack_size,) self._replay_capacity = replay_capacity self._batch_size = batch_size @@ -229,6 +233,7 @@ def create_from_env( stack_size: int = 1, store_possible_actions_mask: bool = True, store_log_prob: bool = True, + **kwargs, ): extra_storage_types: List[ReplayElement] = [] obs_space = env.observation_space @@ -285,6 +290,7 @@ def create_from_env( reward_shape=(), reward_dtype=np.float32, extra_storage_types=extra_storage_types, + **kwargs, ) @staticmethod @@ -619,8 +625,9 @@ def sample_transition_batch_tensor(self, batch_size=None, indices=None): batch = self.sample_transition_batch(batch_size=batch_size, indices=indices) def _normalize_tensor(k, v): + squeeze_set = {"state", "next_state"} t = torch.tensor(v) - if (k == "state" or k == "next_state") and self._stack_size == 1: + if k in squeeze_set and self._stack_size == 1: t = t.squeeze(2) elif t.ndim == 1: t = t.unsqueeze(1) @@ -664,15 +671,18 @@ def sample_transition_batch(self, batch_size=None, indices=None): transition_elements = self.get_transition_elements(batch_size) - def get_obs_stack_for_indices(indices): + def get_stack_for_indices(key, indices): """ Get stack of observations """ # calculate 2d array of indices with size (batch_size, stack_size) # ith row contain indices in the stack of obs at indices[i] stack_indices = indices.reshape(-1, 1) + np.arange(-self._stack_size + 1, 1) stack_indices %= self._replay_capacity - # Reshape to (batch_size, obs_shape, stack_size) - perm = [0] + list(range(2, len(self._observation_shape) + 2)) + [1] - return self._store["observation"][stack_indices].transpose(perm) + retval = self._store[key][stack_indices] + if len(retval.shape) > 2: + # Reshape to (batch_size, obs_shape, stack_size) + perm = [0] + list(range(2, len(self._observation_shape) + 2)) + [1] + retval = retval.transpose(perm) + return retval # calculate 2d array of indices with size (batch_size, update_horizon) # ith row contain the multistep indices starting at indices[i] @@ -681,8 +691,8 @@ def get_obs_stack_for_indices(indices): def get_traj_lengths(): """ Calculate trajectory length, defined to be the number of states - in this multi_step transition until terminal state or end of - multi_step. Dopamine calls multi_step as "update_horizon". + in this multi_step transition until terminal state or until + end of multi_step (a.k.a. update_horizon). """ terminals = self._store["terminal"][multistep_indices] # if trajectory is non-terminal, we'll have traj_length = update_horizon @@ -705,23 +715,41 @@ def get_multistep_reward_for_indices(): batch_arrays = [] for element in transition_elements: if element.name == "state": - batch = get_obs_stack_for_indices(indices) + batch = get_stack_for_indices("observation", indices) elif element.name == "next_state": - batch = get_obs_stack_for_indices(next_indices) + batch = get_stack_for_indices("observation", next_indices) elif element.name == "reward": - batch = get_multistep_reward_for_indices() + if self._return_everything_as_stack: + if self._update_horizon > 1: + raise NotImplementedError( + "Uncertain how to do this without double counting.." 
+ ) + batch = get_stack_for_indices("reward", indices) + else: + batch = get_multistep_reward_for_indices() elif element.name == "terminal": terminal_indices = (next_indices - 1) % self._replay_capacity - batch = self._store["terminal"][terminal_indices].astype(np.bool) + if self._return_everything_as_stack: + batch = get_stack_for_indices("terminal", terminal_indices) + else: + batch = self._store["terminal"][terminal_indices] + batch = batch.astype(np.bool) elif element.name == "indices": batch = indices - elif element.name in ("next_action", "next_reward"): - store_name = element.name.lstrip("next_") - batch = self._store[store_name][next_indices] elif element.name in self._store: - batch = self._store[element.name][indices] - elif element.name.startswith("next_") and element.name[5:] in self._store: - batch = self._store[element.name[5:]][next_indices] + if self._return_everything_as_stack: + batch = get_stack_for_indices(element.name, indices) + else: + batch = self._store[element.name][indices] + elif element.name.startswith("next_"): + store_name = element.name[len("next_") :] + assert ( + store_name in self._store + ), f"{store_name} is not in {self._store.keys()}" + if self._return_everything_as_stack: + batch = get_stack_for_indices(store_name, next_indices) + else: + batch = self._store[store_name][next_indices] batch = batch.astype(element.type) batch_arrays.append(batch) diff --git a/reagent/test/base/horizon_test_base.py b/reagent/test/base/horizon_test_base.py index 676c00c95..c30ee23d3 100644 --- a/reagent/test/base/horizon_test_base.py +++ b/reagent/test/base/horizon_test_base.py @@ -1,14 +1,42 @@ #!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved +import logging +import random import unittest +from typing import Callable +import numpy as np +import torch +from reagent.core.configuration import make_config_class from reagent.tensorboardX import SummaryWriterContext +from ruamel.yaml import YAML + + +SEED = 0 class HorizonTestBase(unittest.TestCase): def setUp(self): SummaryWriterContext._reset_globals() + logging.basicConfig(level=logging.INFO) + np.random.seed(SEED) + torch.manual_seed(SEED) + random.seed(SEED) def tearDown(self): SummaryWriterContext._reset_globals() + + @classmethod + def run_from_config(cls, run_test: Callable, config_path: str, use_gpu: bool): + yaml = YAML(typ="safe") + with open(config_path, "r") as f: + config_dict = yaml.load(f.read()) + config_dict["use_gpu"] = use_gpu + + @make_config_class(run_test) + class ConfigClass: + pass + + config = ConfigClass(**config_dict) # type: ignore + return run_test(**config.asdict()) # type: ignore diff --git a/reagent/test/environment/__init__.py b/reagent/test/environment/__init__.py index 96585b32e..ec3ac3aaf 100644 --- a/reagent/test/environment/__init__.py +++ b/reagent/test/environment/__init__.py @@ -2,22 +2,5 @@ # Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. 
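The run_from_config helper added to HorizonTestBase above turns a YAML file into keyword arguments for a test function, with use_gpu forced by the caller. A self-contained sketch of that pattern using a toy function and inline YAML; the function name and keys below are illustrative and not taken from the patch's config files:

from ruamel.yaml import YAML


def toy_run_test(env_name: str, num_episodes: int, use_gpu: bool) -> str:
    # Stand-in for a real test body wired up through run_from_config.
    return f"{env_name}: {num_episodes} episodes, use_gpu={use_gpu}"


config_dict = YAML(typ="safe").load("env_name: CartPole-v0\nnum_episodes: 3\n")
config_dict["use_gpu"] = False  # caller decides, as run_from_config does
print(toy_run_test(**config_dict))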
import logging -from gym.envs.registration import register, registry -from reagent.test.environment.linear_dynamics import LinDynaEnv # noqa - logger = logging.getLogger(__name__) - - -def register_if_not_exists(id, entry_point): - """ - Preventing tests from failing trying to re-register environments - """ - if id not in registry.env_specs: - register(id=id, entry_point=entry_point) - - -register_if_not_exists( - id="LinearDynamics-v0", - entry_point="reagent.test.environment.linear_dynamics:LinDynaEnv", -) diff --git a/reagent/test/gym/open_ai_gym_environment.py b/reagent/test/gym/open_ai_gym_environment.py index e470a35b7..eea9279a1 100644 --- a/reagent/test/gym/open_ai_gym_environment.py +++ b/reagent/test/gym/open_ai_gym_environment.py @@ -8,7 +8,7 @@ import gym import numpy as np -import reagent.test.gym.pomdp # noqa +import reagent.gym.envs # noqa import torch from gym import Env from reagent.gym.envs.env_factory import EnvFactory @@ -133,9 +133,11 @@ def _create_env(self, gymenv: Union[str, Env], random_seed: Optional[int]): self.state_dim = self.env.observation_space.shape[0] # type: ignore self.img = False elif len(self.env.observation_space.shape) == 3: # type: ignore - self.height, self.width, self.num_input_channels = ( - self.env.observation_space.shape # type: ignore - ) + ( + self.height, + self.width, + self.num_input_channels, + ) = self.env.observation_space.shape # type: ignore self.img = True @property diff --git a/reagent/test/gym/pomdp/__init__.py b/reagent/test/gym/pomdp/__init__.py deleted file mode 100644 index 9da5ea754..000000000 --- a/reagent/test/gym/pomdp/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -import logging - -from reagent.test.environment import register_if_not_exists -from reagent.test.gym.pomdp.pocman import PocManEnv # noqa -from reagent.test.gym.pomdp.string_game import StringGameEnv # noqa - - -logger = logging.getLogger(__name__) - - -register_if_not_exists( - id="Pocman-v0", entry_point="reagent.test.gym.pomdp.pocman:PocManEnv" -) -register_if_not_exists( - id="StringGame-v0", entry_point="reagent.test.gym.pomdp.string_game:StringGameEnv" -) diff --git a/reagent/test/gym/run_gym.py b/reagent/test/gym/run_gym.py index 729ec0e3b..f2a98ab0f 100644 --- a/reagent/test/gym/run_gym.py +++ b/reagent/test/gym/run_gym.py @@ -13,12 +13,12 @@ import torch from reagent.json_serialize import json_to_object from reagent.parameters import ( - CEMParameters, + CEMTrainerParameters, ContinuousActionModelParameters, DiscreteActionModelParameters, EvaluationParameters, FeedForwardParameters, - MDNRNNParameters, + MDNRNNTrainerParameters, RainbowDQNParameters, RLParameters, TrainingParameters, @@ -95,8 +95,8 @@ class OpenAiGymParameters(BaseDataClass): sac_value_training: Optional[FeedForwardParameters] = None critic_training: Optional[FeedForwardParameters] = None actor_training: Optional[FeedForwardParameters] = None - cem: Optional[CEMParameters] = None - mdnrnn: Optional[MDNRNNParameters] = None + cem: Optional[CEMTrainerParameters] = None + mdnrnn: Optional[MDNRNNTrainerParameters] = None evaluation: EvaluationParameters = EvaluationParameters() diff --git a/reagent/test/gym/world_model/mdnrnn_gym.py b/reagent/test/gym/world_model/mdnrnn_gym.py deleted file mode 100644 index be9388872..000000000 --- a/reagent/test/gym/world_model/mdnrnn_gym.py +++ /dev/null @@ -1,646 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Facebook, Inc. and its affiliates. 
All rights reserved. -""" -Learn a world model on gym environments -""" -import argparse -import json -import logging -import sys -from typing import Dict, Optional - -import numpy as np -import reagent.types as rlt -import torch -from reagent.evaluation.world_model_evaluator import ( - FeatureImportanceEvaluator, - FeatureSensitivityEvaluator, -) -from reagent.json_serialize import json_to_object -from reagent.models.mdn_rnn import MDNRNNMemoryPool -from reagent.models.world_model import MemoryNetwork -from reagent.test.gym.open_ai_gym_environment import ( - EnvType, - ModelType, - OpenAIGymEnvironment, -) -from reagent.test.gym.run_gym import ( - OpenAiGymParameters, - OpenAiRunDetails, - dict_to_np, - get_possible_actions, -) -from reagent.training.rl_dataset import RLDataset -from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer - - -logger = logging.getLogger(__name__) - - -def loss_to_num(losses): - return {k: v.item() for k, v in losses.items()} - - -def multi_step_sample_generator( - gym_env: OpenAIGymEnvironment, - num_transitions: int, - max_steps: Optional[int], - multi_steps: int, - include_shorter_samples_at_start: bool, - include_shorter_samples_at_end: bool, -): - """ - Convert gym env multi-step sample format to mdn-rnn multi-step sample format - - :param gym_env: The environment used to generate multi-step samples - :param num_transitions: # of samples to return - :param max_steps: An episode terminates when the horizon is beyond max_steps - :param multi_steps: # of steps of states and actions per sample - :param include_shorter_samples_at_start: Whether to keep samples of shorter steps - which are generated at the beginning of an episode - :param include_shorter_samples_at_end: Whether to keep samples of shorter steps - which are generated at the end of an episode - """ - samples = gym_env.generate_random_samples( - num_transitions=num_transitions, - use_continuous_action=True, - max_step=max_steps, - multi_steps=multi_steps, - include_shorter_samples_at_start=include_shorter_samples_at_start, - include_shorter_samples_at_end=include_shorter_samples_at_end, - ) - - for j in range(num_transitions): - sample_steps = len(samples.terminals[j]) # type: ignore - state = dict_to_np(samples.states[j], np_size=gym_env.state_dim, key_offset=0) - action = dict_to_np( - samples.actions[j], np_size=gym_env.action_dim, key_offset=gym_env.state_dim - ) - next_actions = np.float32( # type: ignore - [ - dict_to_np( - samples.next_actions[j][k], - np_size=gym_env.action_dim, - key_offset=gym_env.state_dim, - ) - for k in range(sample_steps) - ] - ) - next_states = np.float32( # type: ignore - [ - dict_to_np( - samples.next_states[j][k], np_size=gym_env.state_dim, key_offset=0 - ) - for k in range(sample_steps) - ] - ) - rewards = np.float32(samples.rewards[j]) # type: ignore - terminals = np.float32(samples.terminals[j]) # type: ignore - not_terminals = np.logical_not(terminals) - ordered_states = np.vstack((state, next_states)) - ordered_actions = np.vstack((action, next_actions)) - mdnrnn_states = ordered_states[:-1] - mdnrnn_actions = ordered_actions[:-1] - mdnrnn_next_states = ordered_states[-multi_steps:] - mdnrnn_next_actions = ordered_actions[-multi_steps:] - - # Padding zeros so that all samples have equal steps - # The general rule is to pad zeros at the end of sequences. 
- # In addition, if the sequence only has one step (i.e., the - # first state of an episode), pad one zero row ahead of the - # sequence, which enables embedding generated properly for - # one-step samples - num_padded_top_rows = 1 if multi_steps > 1 and sample_steps == 1 else 0 - num_padded_bottom_rows = multi_steps - sample_steps - num_padded_top_rows - sample_steps_next = len(mdnrnn_next_states) - num_padded_top_rows_next = 0 - num_padded_bottom_rows_next = multi_steps - sample_steps_next - yield ( - np.pad( - mdnrnn_states, - ((num_padded_top_rows, num_padded_bottom_rows), (0, 0)), - "constant", - constant_values=0.0, - ), - np.pad( - mdnrnn_actions, - ((num_padded_top_rows, num_padded_bottom_rows), (0, 0)), - "constant", - constant_values=0.0, - ), - np.pad( - rewards, - ((num_padded_top_rows, num_padded_bottom_rows)), - "constant", - constant_values=0.0, - ), - np.pad( - mdnrnn_next_states, - ((num_padded_top_rows_next, num_padded_bottom_rows_next), (0, 0)), - "constant", - constant_values=0.0, - ), - np.pad( - mdnrnn_next_actions, - ((num_padded_top_rows_next, num_padded_bottom_rows_next), (0, 0)), - "constant", - constant_values=0.0, - ), - np.pad( - not_terminals, - ((num_padded_top_rows, num_padded_bottom_rows)), - "constant", - constant_values=0.0, - ), - sample_steps, - sample_steps_next, - ) - - -def get_replay_buffer( - num_episodes: int, seq_len: int, max_step: int, gym_env: OpenAIGymEnvironment -) -> MDNRNNMemoryPool: - num_transitions = num_episodes * max_step - replay_buffer = MDNRNNMemoryPool(max_replay_memory_size=num_transitions) - for ( - mdnrnn_state, - mdnrnn_action, - rewards, - next_states, - _, - not_terminals, - _, - _, - ) in multi_step_sample_generator( - gym_env, - num_transitions=num_transitions, - max_steps=max_step, - multi_steps=seq_len, - include_shorter_samples_at_start=False, - include_shorter_samples_at_end=False, - ): - mdnrnn_state, mdnrnn_action, next_states, rewards, not_terminals = ( - torch.tensor(mdnrnn_state), - torch.tensor(mdnrnn_action), - torch.tensor(next_states), - torch.tensor(rewards), - torch.tensor(not_terminals), - ) - replay_buffer.insert_into_memory( - mdnrnn_state, mdnrnn_action, next_states, rewards, not_terminals - ) - - return replay_buffer - - -def main(args): - parser = argparse.ArgumentParser( - description="Train a Mixture-Density-Network RNN net to learn an OpenAI" - " Gym environment, i.e., predict next state, reward, and" - " terminal signal using current state and action" - ) - parser.add_argument("-p", "--parameters", help="Path to JSON parameters file.") - parser.add_argument( - "-g", - "--gpu_id", - help="If set, will use GPU with specified ID. Otherwise will use CPU.", - default=-1, - ) - parser.add_argument( - "-l", - "--log_level", - choices=["debug", "info", "warning", "error", "critical"], - help="If set, use logging level specified (debug, info, warning, error, " - "critical). 
Else defaults to info.", - default="info", - ) - parser.add_argument( - "-f", - "--feature_importance", - action="store_true", - help="If set, feature importance will be calculated after the training", - ) - parser.add_argument( - "-s", - "--feature_sensitivity", - action="store_true", - help="If set, state feature sensitivity by varying actions will be" - " calculated after the training", - ) - parser.add_argument( - "-e", - "--save_embedding_to_path", - help="If a file path is provided, save a RLDataset with states embedded" - " by the trained world model", - ) - args = parser.parse_args(args) - - logger.setLevel(getattr(logging, args.log_level.upper())) - - with open(args.parameters, "r") as f: - params = json_to_object(f.read(), OpenAiGymParameters) - if args.gpu_id != -1: - params = params._replace(use_gpu=True) - - mdnrnn_gym( - params, - args.feature_importance, - args.feature_sensitivity, - args.save_embedding_to_path, - ) - - -def mdnrnn_gym( - params: OpenAiGymParameters, - feature_importance: bool = False, - feature_sensitivity: bool = False, - save_embedding_to_path: Optional[str] = None, - seed: Optional[int] = None, -): - assert params.mdnrnn is not None - use_gpu = params.use_gpu - logger.info("Running gym with params") - logger.info(params) - - env_type = params.env - env = OpenAIGymEnvironment( - env_type, epsilon=1.0, softmax_policy=False, gamma=0.99, random_seed=seed - ) - - # create test data once - assert params.run_details.max_steps is not None - test_replay_buffer = get_replay_buffer( - params.run_details.num_test_episodes, - params.run_details.seq_len, - params.run_details.max_steps, - env, - ) - test_batch = test_replay_buffer.sample_memories( - test_replay_buffer.memory_size, use_gpu=use_gpu, batch_first=True - ) - - trainer = create_trainer(params, env, use_gpu) - _, _, trainer = train_sgd( - env, - trainer, - use_gpu, - "{} test run".format(env_type), - params.mdnrnn.minibatch_size, - params.run_details, - test_batch=test_batch, - ) - feature_importance_map, feature_sensitivity_map, dataset = None, None, None - if feature_importance: - feature_importance_map = calculate_feature_importance( - env, trainer, use_gpu, params.run_details, test_batch=test_batch - ) - if feature_sensitivity: - feature_sensitivity_map = calculate_feature_sensitivity_by_actions( - env, trainer, use_gpu, params.run_details, test_batch=test_batch - ) - if save_embedding_to_path: - dataset = RLDataset(save_embedding_to_path) - create_embed_rl_dataset(env, trainer, dataset, use_gpu, params.run_details) - dataset.save() - return env, trainer, feature_importance_map, feature_sensitivity_map, dataset - - -def calculate_feature_importance( - gym_env: OpenAIGymEnvironment, - trainer: MDNRNNTrainer, - use_gpu: bool, - run_details: OpenAiRunDetails, - test_batch: rlt.PreprocessedTrainingBatch, -): - assert run_details.max_steps is not None - assert run_details.num_test_episodes is not None - assert run_details.seq_len is not None - feature_importance_evaluator = FeatureImportanceEvaluator( - trainer, - discrete_action=gym_env.action_type == EnvType.DISCRETE_ACTION, - state_feature_num=gym_env.state_dim, - action_feature_num=gym_env.action_dim, - sorted_action_feature_start_indices=list(range(gym_env.action_dim)), - sorted_state_feature_start_indices=list(range(gym_env.state_dim)), - ) - feature_loss_vector = feature_importance_evaluator.evaluate(test_batch)[ - "feature_loss_increase" - ] - feature_importance_map = {} - for i in range(gym_env.action_dim): - print( - "action {}, feature importance: 
{}".format(i, feature_loss_vector[i].item()) - ) - feature_importance_map[f"action{i}"] = feature_loss_vector[i].item() - for i in range(gym_env.state_dim): - print( - "state {}, feature importance: {}".format( - i, feature_loss_vector[i + gym_env.action_dim].item() - ) - ) - feature_importance_map[f"state{i}"] = feature_loss_vector[ - i + gym_env.action_dim - ].item() - return feature_importance_map - - -def create_embed_rl_dataset( - gym_env: OpenAIGymEnvironment, - trainer: MDNRNNTrainer, - dataset: RLDataset, - use_gpu: bool, - run_details: OpenAiRunDetails, -): - assert run_details.max_steps is not None - old_mdnrnn_mode = trainer.mdnrnn.mdnrnn.training - trainer.mdnrnn.mdnrnn.eval() - num_transitions = run_details.num_state_embed_episodes * run_details.max_steps - device = torch.device("cuda") if use_gpu else torch.device("cpu") # type: ignore - - ( - state_batch, - action_batch, - reward_batch, - next_state_batch, - next_action_batch, - not_terminal_batch, - step_batch, - next_step_batch, - ) = map( - list, - zip( - *multi_step_sample_generator( - gym_env=gym_env, - num_transitions=num_transitions, - max_steps=run_details.max_steps, - # +1 because MDNRNN embeds the first seq_len steps and then - # the embedded state will be concatenated with the last step - multi_steps=run_details.seq_len + 1, - include_shorter_samples_at_start=True, - include_shorter_samples_at_end=False, - ) - ), - ) - - def concat_batch(batch): - return torch.cat( - [ - torch.tensor( - np.expand_dims(x, axis=1), dtype=torch.float, device=device - ) - for x in batch - ], - dim=1, - ) - - # shape: seq_len x batch_size x feature_dim - mdnrnn_state = concat_batch(state_batch) - next_mdnrnn_state = concat_batch(next_state_batch) - mdnrnn_action = concat_batch(action_batch) - next_mdnrnn_action = concat_batch(next_action_batch) - - mdnrnn_input = rlt.PreprocessedStateAction.from_tensors( - state=mdnrnn_state, action=mdnrnn_action - ) - next_mdnrnn_input = rlt.PreprocessedStateAction.from_tensors( - state=next_mdnrnn_state, action=next_mdnrnn_action - ) - # batch-compute state embedding - mdnrnn_output = trainer.mdnrnn(mdnrnn_input) - next_mdnrnn_output = trainer.mdnrnn(next_mdnrnn_input) - - for i in range(len(state_batch)): - # Embed the state as the hidden layer's output - # until the previous step + current state - hidden_idx = 0 if step_batch[i] == 1 else step_batch[i] - 2 # type: ignore - next_hidden_idx = next_step_batch[i] - 2 # type: ignore - hidden_embed = ( - mdnrnn_output.all_steps_lstm_hidden[hidden_idx, i, :] - .squeeze() - .detach() - .cpu() - ) - state_embed = torch.cat( - (hidden_embed, torch.tensor(state_batch[i][hidden_idx + 1])) # type: ignore - ) - next_hidden_embed = ( - next_mdnrnn_output.all_steps_lstm_hidden[next_hidden_idx, i, :] - .squeeze() - .detach() - .cpu() - ) - next_state_embed = torch.cat( - ( - next_hidden_embed, - torch.tensor(next_state_batch[i][next_hidden_idx + 1]), # type: ignore - ) - ) - - logger.debug( - "create_embed_rl_dataset:\nstate batch\n{}\naction batch\n{}\nlast " - "action: {},reward: {}\nstate embed {}\nnext state embed {}\n".format( - state_batch[i][: hidden_idx + 1], # type: ignore - action_batch[i][: hidden_idx + 1], # type: ignore - action_batch[i][hidden_idx + 1], # type: ignore - reward_batch[i][hidden_idx + 1], # type: ignore - state_embed, - next_state_embed, - ) - ) - - terminal = 1 - not_terminal_batch[i][hidden_idx + 1] # type: ignore - possible_actions, possible_actions_mask = get_possible_actions( - gym_env, ModelType.PYTORCH_PARAMETRIC_DQN.value, False 
- ) - possible_next_actions, possible_next_actions_mask = get_possible_actions( - gym_env, ModelType.PYTORCH_PARAMETRIC_DQN.value, terminal - ) - dataset.insert( - state=state_embed, - action=torch.tensor(action_batch[i][hidden_idx + 1]), # type: ignore - reward=float(reward_batch[i][hidden_idx + 1]), # type: ignore - next_state=next_state_embed, - next_action=torch.tensor( - next_action_batch[i][next_hidden_idx + 1] # type: ignore - ), - terminal=torch.tensor(terminal), - possible_next_actions=possible_next_actions, - possible_next_actions_mask=possible_next_actions_mask, - time_diff=torch.tensor(1), - possible_actions=possible_actions, - possible_actions_mask=possible_actions_mask, - policy_id=0, - ) - logger.info( - "Insert {} transitions into a state embed dataset".format(len(state_batch)) - ) - trainer.mdnrnn.mdnrnn.train(old_mdnrnn_mode) - return dataset - - -def calculate_feature_sensitivity_by_actions( - gym_env: OpenAIGymEnvironment, - trainer: MDNRNNTrainer, - use_gpu: bool, - run_details: OpenAiRunDetails, - test_batch: rlt.PreprocessedTrainingBatch, - seq_len: int = 5, - num_test_episodes: int = 100, -): - assert run_details.max_steps is not None - feature_sensitivity_evaluator = FeatureSensitivityEvaluator( - trainer, - state_feature_num=gym_env.state_dim, - sorted_state_feature_start_indices=list(range(gym_env.state_dim)), - ) - feature_sensitivity_vector = feature_sensitivity_evaluator.evaluate(test_batch)[ - "feature_sensitivity" - ] - feature_sensitivity_map = {} - for i in range(gym_env.state_dim): - feature_sensitivity_map["state" + str(i)] = feature_sensitivity_vector[i].item() - print( - "state {}, feature sensitivity: {}".format( - i, feature_sensitivity_vector[i].item() - ) - ) - return feature_sensitivity_map - - -def train_sgd( - gym_env: OpenAIGymEnvironment, - trainer: MDNRNNTrainer, - use_gpu: bool, - test_run_name: str, - minibatch_size: int, - run_details: OpenAiRunDetails, - test_batch: rlt.PreprocessedTrainingBatch, -): - assert run_details.max_steps is not None - train_replay_buffer = get_replay_buffer( - run_details.num_train_episodes, - run_details.seq_len, - run_details.max_steps, - gym_env, - ) - valid_replay_buffer = get_replay_buffer( - run_details.num_test_episodes, - run_details.seq_len, - run_details.max_steps, - gym_env, - ) - valid_batch = valid_replay_buffer.sample_memories( - valid_replay_buffer.memory_size, use_gpu=use_gpu, batch_first=True - ) - valid_loss_history = [] - - num_batch_per_epoch = train_replay_buffer.memory_size // minibatch_size - logger.info( - "Collected data {} transitions.\n" - "Training will take {} epochs, with each epoch having {} mini-batches" - " and each mini-batch having {} samples".format( - train_replay_buffer.memory_size, - run_details.train_epochs, - num_batch_per_epoch, - minibatch_size, - ) - ) - - for i_epoch in range(run_details.train_epochs): - for i_batch in range(num_batch_per_epoch): - training_batch = train_replay_buffer.sample_memories( - minibatch_size, use_gpu=use_gpu, batch_first=True - ) - losses = trainer.train(training_batch, batch_first=True) - logger.info( - "{}-th epoch, {}-th minibatch: \n" - "loss={}, bce={}, gmm={}, mse={} \n" - "cum loss={}, cum bce={}, cum gmm={}, cum mse={}\n".format( - i_epoch, - i_batch, - losses["loss"], - losses["bce"], - losses["gmm"], - losses["mse"], - np.mean(trainer.cum_loss), - np.mean(trainer.cum_bce), - np.mean(trainer.cum_gmm), - np.mean(trainer.cum_mse), - ) - ) - - trainer.mdnrnn.mdnrnn.eval() - valid_losses = trainer.get_loss( - valid_batch, 
state_dim=gym_env.state_dim, batch_first=True - ) - valid_losses = loss_to_num(valid_losses) - valid_loss_history.append(valid_losses) - trainer.mdnrnn.mdnrnn.train() - logger.info( - "{}-th epoch, validate loss={}, bce={}, gmm={}, mse={}".format( - i_epoch, - valid_losses["loss"], - valid_losses["bce"], - valid_losses["gmm"], - valid_losses["mse"], - ) - ) - latest_loss = valid_loss_history[-1]["loss"] - recent_valid_loss_hist = valid_loss_history[ - -1 - run_details.early_stopping_patience : -1 - ] - # earlystopping - if len(valid_loss_history) > run_details.early_stopping_patience and all( - (latest_loss >= v["loss"] for v in recent_valid_loss_hist) - ): - break - - trainer.mdnrnn.mdnrnn.eval() - test_losses = trainer.get_loss( - test_batch, state_dim=gym_env.state_dim, batch_first=True - ) - test_losses = loss_to_num(test_losses) - logger.info( - "Test loss: {}, bce={}, gmm={}, mse={}".format( - test_losses["loss"], - test_losses["bce"], - test_losses["gmm"], - test_losses["mse"], - ) - ) - logger.info("Valid loss history: {}".format(valid_loss_history)) - return test_losses, valid_loss_history, trainer - - -def create_trainer( - params: OpenAiGymParameters, env: OpenAIGymEnvironment, use_gpu: bool -): - assert params.mdnrnn is not None - assert params.run_details.max_steps is not None - mdnrnn_params = params.mdnrnn - mdnrnn_net = MemoryNetwork( - state_dim=env.state_dim, - action_dim=env.action_dim, - num_hiddens=mdnrnn_params.hidden_size, - num_hidden_layers=mdnrnn_params.num_hidden_layers, - num_gaussians=mdnrnn_params.num_gaussians, - ) - if use_gpu: - mdnrnn_net = mdnrnn_net.cuda() - - cum_loss_hist_len = ( - params.run_details.num_train_episodes - * params.run_details.max_steps - // mdnrnn_params.minibatch_size - ) - trainer = MDNRNNTrainer( - mdnrnn_network=mdnrnn_net, params=mdnrnn_params, cum_loss_hist=cum_loss_hist_len - ) - return trainer - - -if __name__ == "__main__": - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - logging.getLogger().setLevel(logging.INFO) - args = sys.argv - main(args[1:]) diff --git a/reagent/test/gym/world_model/state_embed_gym.py b/reagent/test/gym/world_model/state_embed_gym.py deleted file mode 100644 index 971e5fe98..000000000 --- a/reagent/test/gym/world_model/state_embed_gym.py +++ /dev/null @@ -1,266 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -""" -This file shows an example of using embedded states to feed to RL models in -partially observable environments (POMDPs). Embedded states are generated by a world -model which learns how to encode past n observations into a low-dimension -vector.Embedded states improve performance in POMDPs compared to just using -one-step observations as states because they encode more historical information -than one-step observations. 
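A minimal numpy illustration of the embedding the docstring above describes: the recurrent world model summarizes the recent observation/action history in its hidden output, and that hidden vector is concatenated with the current raw observation to form the state fed to the RL agent. Toy dimensions, standalone code; only the concatenation mirrors the (deleted) StateEmbedGymEnvironment below:

import numpy as np

hidden_size, raw_obs_dim = 64, 4               # illustrative sizes only
hidden_embed = np.random.randn(hidden_size)    # stand-in for the MDN-RNN hidden output
raw_observation = np.random.randn(raw_obs_dim)

# Same concatenation performed in embed_state(): hidden summary + current obs.
state_embed = np.hstack((hidden_embed, raw_observation))
assert state_embed.shape == (hidden_size + raw_obs_dim,)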
-""" -import argparse -import json -import logging -import sys -from collections import deque - -import gym -import numpy as np -import reagent.types as rlt -import torch -from gym import Env -from gym.spaces import Box -from reagent.json_serialize import from_json, json_to_object -from reagent.models.world_model import MemoryNetwork -from reagent.test.gym.open_ai_gym_environment import EnvType, OpenAIGymEnvironment -from reagent.test.gym.open_ai_gym_memory_pool import OpenAIGymMemoryPool -from reagent.test.gym.run_gym import ( - OpenAiGymParameters, - create_epsilon, - create_predictor, - create_trainer, - train_gym_offline_rl, -) -from reagent.test.gym.world_model.mdnrnn_gym import create_embed_rl_dataset, mdnrnn_gym -from reagent.training.rl_dataset import RLDataset - - -logger = logging.getLogger(__name__) - - -class StateEmbedGymEnvironment(Env): - def __init__( - self, - gym_env: Env, - mdnrnn: MemoryNetwork, - max_embed_seq_len: int, - state_min_value: float, - state_max_value: float, - ): - self.env = gym_env - self.unwrapped.spec = self.env.unwrapped.spec - self.max_embed_seq_len = max_embed_seq_len - self.mdnrnn = mdnrnn - self.embed_size = self.mdnrnn.num_hiddens - self.raw_state_dim = self.env.observation_space.shape[0] # type: ignore - self.state_dim = self.embed_size + self.raw_state_dim - if isinstance(self.env.action_space, gym.spaces.Discrete): - self.action_type = EnvType.DISCRETE_ACTION - self.action_dim = self.env.action_space.n - elif isinstance(self.env.action_space, gym.spaces.Box): - self.action_type = EnvType.CONTINUOUS_ACTION - self.action_dim = self.env.action_space.shape[0] - - self.action_space = self.env.action_space - self.observation_space = Box( # type: ignore - low=state_min_value, high=state_max_value, shape=(self.state_dim,) - ) - - self.cur_raw_state = None - self.recent_states = deque([], maxlen=self.max_embed_seq_len) # type: ignore - self.recent_actions = deque([], maxlen=self.max_embed_seq_len) # type: ignore - - def seed(self, seed): - self.env.seed(seed) - - def embed_state(self, state): - """ Embed state after either reset() or step() """ - assert len(self.recent_states) == len(self.recent_actions) - old_mdnrnn_mode = self.mdnrnn.mdnrnn.training - self.mdnrnn.mdnrnn.eval() - - # Embed the state as the hidden layer's output - # until the previous step + current state - if len(self.recent_states) == 0: - mdnrnn_state = np.zeros((1, self.raw_state_dim)) - mdnrnn_action = np.zeros((1, self.action_dim)) - else: - mdnrnn_state = np.array(list(self.recent_states)) - mdnrnn_action = np.array(list(self.recent_actions)) - - mdnrnn_state = torch.tensor(mdnrnn_state, dtype=torch.float).unsqueeze(1) - mdnrnn_action = torch.tensor(mdnrnn_action, dtype=torch.float).unsqueeze(1) - mdnrnn_input = rlt.PreprocessedStateAction.from_tensors( - state=mdnrnn_state, action=mdnrnn_action - ) - mdnrnn_output = self.mdnrnn(mdnrnn_input) - hidden_embed = ( - mdnrnn_output.all_steps_lstm_hidden[-1].squeeze().detach().cpu().numpy() - ) - state_embed = np.hstack((hidden_embed, state)) - self.mdnrnn.mdnrnn.train(old_mdnrnn_mode) - logger.debug( - "Embed_state\nrecent states: {}\nrecent actions: {}\nstate_embed{}\n".format( - np.array(self.recent_states), np.array(self.recent_actions), state_embed - ) - ) - return state_embed - - def reset(self): - next_raw_state = self.env.reset() - self.recent_states = deque([], maxlen=self.max_embed_seq_len) - self.recent_actions = deque([], maxlen=self.max_embed_seq_len) - self.cur_raw_state = next_raw_state - next_embed_state = 
self.embed_state(next_raw_state) - return next_embed_state - - def step(self, action): - if self.action_type == EnvType.DISCRETE_ACTION: - action_np = np.zeros(self.action_dim) - action_np[action] = 1.0 - else: - action_np = action - self.recent_states.append(self.cur_raw_state) - self.recent_actions.append(action_np) - next_raw_state, reward, terminal, info = self.env.step(action) - logger.debug("action {}, reward {}\n".format(action, reward)) - self.cur_raw_state = next_raw_state - next_embed_state = self.embed_state(next_raw_state) - return next_embed_state, reward, terminal, info - - -def main(args): - parser = argparse.ArgumentParser( - description="Train a RL net to play in an OpenAI Gym environment. " - "States are embedded by a mdn-rnn model." - ) - parser.add_argument( - "-p", - "--mdnrnn_parameters", - help="Path to JSON parameters file for MDN-RNN training.", - ) - parser.add_argument( - "-q", "--rl_parameters", help="Path to JSON parameters file for RL training." - ) - parser.add_argument( - "-s", - "--score-bar", - help="Bar for averaged tests scores.", - type=float, - default=None, - ) - parser.add_argument( - "-g", - "--gpu_id", - help="If set, will use GPU with specified ID. Otherwise will use CPU.", - default=-1, - ) - parser.add_argument( - "-l", - "--log_level", - help="If set, use logging level specified (debug, info, warning, error, " - "critical). Else defaults to info.", - default="info", - ) - args = parser.parse_args(args) - if args.log_level not in ("debug", "info", "warning", "error", "critical"): - raise Exception("Logging level {} not valid level.".format(args.log_level)) - else: - logging.getLogger().setLevel(getattr(logging, args.log_level.upper())) - - with open(args.mdnrnn_parameters, "r") as f: - mdnrnn_params = json_to_object(f.read(), OpenAiGymParameters) - with open(args.rl_parameters, "r") as f: - rl_params = json_to_object(f.read(), OpenAiGymParameters) - - env, mdnrnn_trainer, embed_rl_dataset = create_mdnrnn_trainer_and_embed_dataset( - mdnrnn_params, rl_params.use_gpu - ) - - max_embed_seq_len = mdnrnn_params["run_details"]["seq_len"] - _, _, rl_trainer, rl_predictor, state_embed_env = run_gym( - rl_params, - args.score_bar, - embed_rl_dataset, - env.env, - mdnrnn_trainer.mdnrnn, - max_embed_seq_len, - ) - - -def create_mdnrnn_trainer_and_embed_dataset( - mdnrnn_params: OpenAiGymParameters, use_gpu -): - env, mdnrnn_trainer, _, _, _ = mdnrnn_gym(mdnrnn_params) - embed_rl_dataset = RLDataset("/tmp/rl.pkl") - create_embed_rl_dataset( - env, mdnrnn_trainer, embed_rl_dataset, use_gpu, mdnrnn_params.run_details - ) - return env, mdnrnn_trainer, embed_rl_dataset - - -def run_gym( - params: OpenAiGymParameters, - score_bar, - embed_rl_dataset: RLDataset, - gym_env: Env, - mdnrnn: MemoryNetwork, - max_embed_seq_len: int, -): - assert params.rl is not None - rl_parameters = params.rl - - env_type = params.env - model_type = params.model_type - epsilon, epsilon_decay, minimum_epsilon = create_epsilon( - offline_train=True, rl_parameters=rl_parameters, params=params - ) - - replay_buffer = OpenAIGymMemoryPool(params.max_replay_memory_size) - for row in embed_rl_dataset.rows: - replay_buffer.insert_into_memory(**row) - - assert replay_buffer.memory_buffer is not None - state_mem = replay_buffer.memory_buffer.state - state_min_value = torch.min(state_mem).item() - state_max_value = torch.max(state_mem).item() - state_embed_env = StateEmbedGymEnvironment( - gym_env, mdnrnn, max_embed_seq_len, state_min_value, state_max_value - ) - open_ai_env = 
OpenAIGymEnvironment( - state_embed_env, - epsilon, - rl_parameters.softmax_policy, - rl_parameters.gamma, - epsilon_decay, - minimum_epsilon, - ) - rl_trainer = create_trainer(params, open_ai_env) - rl_predictor = create_predictor( - rl_trainer, model_type, params.use_gpu, open_ai_env.action_dim - ) - - assert ( - params.run_details.max_steps is not None - and params.run_details.offline_train_epochs is not None - ), "Missing data required for offline training: {}".format(str(params.run_details)) - return train_gym_offline_rl( - gym_env=open_ai_env, - replay_buffer=replay_buffer, - model_type=model_type, - trainer=rl_trainer, - predictor=rl_predictor, - test_run_name="{} offline rl state embed".format(env_type), - score_bar=score_bar, - max_steps=params.run_details.max_steps, - avg_over_num_episodes=params.run_details.avg_over_num_episodes, - offline_train_epochs=params.run_details.offline_train_epochs, - num_batch_per_epoch=None, - ) - - -if __name__ == "__main__": - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - logging.getLogger().setLevel(logging.INFO) - args = sys.argv - main(args[1:]) diff --git a/reagent/test/gym/world_model/test_mdnrnn_gym.py b/reagent/test/gym/world_model/test_mdnrnn_gym.py deleted file mode 100644 index 4613ec0ec..000000000 --- a/reagent/test/gym/world_model/test_mdnrnn_gym.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. - -import json -import logging -import os -import random -import unittest -from typing import Dict, List - -import numpy as np -import torch -from reagent.json_serialize import json_to_object -from reagent.test.gym.run_gym import OpenAiGymParameters -from reagent.test.gym.world_model.mdnrnn_gym import mdnrnn_gym - - -logger = logging.getLogger(__name__) - -curr_dir = os.path.dirname(__file__) -MDNRNN_CARTPOLE_JSON = os.path.join(curr_dir, "../../configs/mdnrnn_cartpole_v0.json") - - -class TestMDNRNNGym(unittest.TestCase): - def setUp(self): - logging.getLogger().setLevel(logging.DEBUG) - np.random.seed(0) - torch.manual_seed(0) - random.seed(0) - - @staticmethod - def verify_result(result_dict: Dict[str, float], expected_top_features: List[str]): - top_feature = max(result_dict, key=result_dict.get) - assert ( - top_feature in expected_top_features - ), f"top_feature: {top_feature}, expected_top_features: {expected_top_features}" - - def test_mdnrnn_cartpole(self): - with open(MDNRNN_CARTPOLE_JSON, "r") as f: - params = json_to_object(f.read(), OpenAiGymParameters) - _, _, feature_importance_map, feature_sensitivity_map, _ = self._test_mdnrnn( - params, feature_importance=True, feature_sensitivity=True - ) - self.verify_result(feature_importance_map, ["state1", "state3", "action1"]) - self.verify_result(feature_sensitivity_map, ["state1", "state3"]) - - @unittest.skipIf(not torch.cuda.is_available(), "CUDA not available") - def test_mdnrnn_cartpole_gpu(self): - with open(MDNRNN_CARTPOLE_JSON, "r") as f: - params = json_to_object(f.read(), OpenAiGymParameters) - _, _, feature_importance_map, feature_sensitivity_map, _ = self._test_mdnrnn( - params, use_gpu=True, feature_importance=True, feature_sensitivity=True - ) - self.verify_result(feature_importance_map, ["state1", "state3"]) - self.verify_result(feature_sensitivity_map, ["state1", "state3"]) - - def _test_mdnrnn( - self, - params: OpenAiGymParameters, - use_gpu=False, - feature_importance=False, - feature_sensitivity=False, - ): - return mdnrnn_gym(params, feature_importance, 
feature_sensitivity, seed=0) diff --git a/reagent/test/gym/world_model/test_state_embed_gym.py b/reagent/test/gym/world_model/test_state_embed_gym.py deleted file mode 100644 index 963756734..000000000 --- a/reagent/test/gym/world_model/test_state_embed_gym.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. - -import json -import logging -import random -import unittest -from typing import List - -import numpy as np -import torch -from reagent.json_serialize import json_to_object -from reagent.test.base.horizon_test_base import HorizonTestBase -from reagent.test.gym.run_gym import OpenAiGymParameters -from reagent.test.gym.world_model.state_embed_gym import ( - create_mdnrnn_trainer_and_embed_dataset, - run_gym, -) - - -logger = logging.getLogger(__name__) - -MDNRNN_STRING_GAME_JSON = "reagent/test/configs/mdnrnn_string_game_v0.json" -DQN_STRING_GAME_JSON = "reagent/test/configs/discrete_dqn_string_game_v0.json" - - -class TestStateEmbedGym(HorizonTestBase): - def setUp(self): - logging.getLogger().setLevel(logging.INFO) - torch.manual_seed(0) - np.random.seed(0) - random.seed(0) - super().setUp() - - @staticmethod - def verify_result(reward_history: List[float], expected_reward: float): - assert reward_history[-1] >= expected_reward - - @unittest.skipIf(not torch.cuda.is_available(), "CUDA not available") - def test_string_game_gpu(self): - with open(MDNRNN_STRING_GAME_JSON, "r") as f: - mdnrnn_params = json_to_object(f.read(), OpenAiGymParameters) - mdnrnn_params = mdnrnn_params._replace(use_gpu=True) - with open(DQN_STRING_GAME_JSON, "r") as f: - rl_params = json_to_object(f.read(), OpenAiGymParameters) - rl_params = rl_params._replace(use_gpu=True) - avg_reward_history = self._test_state_embed(mdnrnn_params, rl_params) - self.verify_result(avg_reward_history, 10) - - @staticmethod - def _test_state_embed( - mdnrnn_params: OpenAiGymParameters, rl_params: OpenAiGymParameters - ): - env, mdnrnn_trainer, embed_rl_dataset = create_mdnrnn_trainer_and_embed_dataset( - mdnrnn_params, rl_params.use_gpu - ) - max_embed_seq_len = mdnrnn_params.run_details.seq_len - avg_reward_history, _, _, _, _ = run_gym( - rl_params, - None, # score bar - embed_rl_dataset, - env.env, - mdnrnn_trainer.mdnrnn, - max_embed_seq_len, - ) - return avg_reward_history diff --git a/reagent/test/world_model/test_mdnrnn.py b/reagent/test/world_model/test_mdnrnn.py index d0a40562c..4705dc872 100644 --- a/reagent/test/world_model/test_mdnrnn.py +++ b/reagent/test/world_model/test_mdnrnn.py @@ -8,7 +8,7 @@ import torch from reagent.models.mdn_rnn import MDNRNNMemoryPool, gmm_loss from reagent.models.world_model import MemoryNetwork -from reagent.parameters import MDNRNNParameters +from reagent.parameters import MDNRNNTrainerParameters from reagent.test.world_model.simulated_world_model import SimulatedWorldModel from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer from torch.distributions.categorical import Categorical @@ -53,10 +53,10 @@ def test_gmm_loss(self): logger.info( "gmm loss={}, p1={}, p2={}, p1+p2={}, -log(p1+p2)={}".format( - gl, p1, p2, p1 + p2, -torch.log(p1 + p2) + gl, p1, p2, p1 + p2, -(torch.log(p1 + p2)) ) ) - assert -torch.log(p1 + p2) == gl + assert -(torch.log(p1 + p2)) == gl def test_mdnrnn_simulate_world_cpu(self): self._test_mdnrnn_simulate_world() @@ -128,7 +128,7 @@ def _test_mdnrnn_simulate_world(self, use_gpu=False): ) num_batch = num_episodes // batch_size - mdnrnn_params = MDNRNNParameters( + 
mdnrnn_params = MDNRNNTrainerParameters( hidden_size=mdnrnn_num_hiddens, num_hidden_layers=mdnrnn_num_hidden_layers, minibatch_size=batch_size, @@ -145,15 +145,15 @@ def _test_mdnrnn_simulate_world(self, use_gpu=False): if use_gpu: mdnrnn_net = mdnrnn_net.cuda() trainer = MDNRNNTrainer( - mdnrnn_network=mdnrnn_net, params=mdnrnn_params, cum_loss_hist=num_batch + memory_network=mdnrnn_net, params=mdnrnn_params, cum_loss_hist=num_batch ) for e in range(num_epochs): for i in range(num_batch): training_batch = replay_buffer.sample_memories( - batch_size, use_gpu=use_gpu, batch_first=True + batch_size, use_gpu=use_gpu ) - losses = trainer.train(training_batch, batch_first=True) + losses = trainer.train(training_batch) logger.info( "{}-th epoch, {}-th minibatch: \n" "loss={}, bce={}, gmm={}, mse={} \n" @@ -179,4 +179,4 @@ def _test_mdnrnn_simulate_world(self, use_gpu=False): ): return - assert False, "losses not reduced significantly during training" + raise RuntimeError("losses not reduced significantly during training") diff --git a/reagent/training/__init__.py b/reagent/training/__init__.py index 122561872..2c71049b7 100644 --- a/reagent/training/__init__.py +++ b/reagent/training/__init__.py @@ -1,15 +1,27 @@ #!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. -from .dqn_trainer import DQNTrainer -from .parametric_dqn_trainer import ParametricDQNTrainer +from .c51_trainer import C51Trainer, C51TrainerParameters +from .cem_trainer import CEMTrainer +from .dqn_trainer import DQNTrainer, DQNTrainerParameters +from .parametric_dqn_trainer import ParametricDQNTrainer, ParametricDQNTrainerParameters +from .qrdqn_trainer import QRDQNTrainer, QRDQNTrainerParameters from .sac_trainer import SACTrainer, SACTrainerParameters from .td3_trainer import TD3Trainer, TD3TrainingParameters +from .world_model.mdnrnn_trainer import MDNRNNTrainer __all__ = [ + "C51Trainer", + "C51TrainerParameters", + "CEMTrainer", "DQNTrainer", + "DQNTrainerParameters", + "MDNRNNTrainer", "ParametricDQNTrainer", + "ParametricDQNTrainerParameters", + "QRDQNTrainer", + "QRDQNTrainerParameters", "SACTrainer", "SACTrainerParameters", "TD3Trainer", diff --git a/reagent/training/cem_trainer.py b/reagent/training/cem_trainer.py index 69b89e8f8..0cc0e452c 100644 --- a/reagent/training/cem_trainer.py +++ b/reagent/training/cem_trainer.py @@ -15,11 +15,10 @@ import reagent.types as rlt import torch from reagent.models.cem_planner import CEMPlannerNetwork -from reagent.parameters import CEMParameters from reagent.training.rl_trainer_pytorch import RLTrainer from reagent.training.training_data_page import TrainingDataPage from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer - +from reagent.parameters import CEMTrainerParameters logger = logging.getLogger(__name__) @@ -29,7 +28,7 @@ def __init__( self, cem_planner_network: CEMPlannerNetwork, world_model_trainers: List[MDNRNNTrainer], - parameters: CEMParameters, + parameters: CEMTrainerParameters, use_gpu: bool = False, ) -> None: super().__init__(parameters.rl, use_gpu=use_gpu) diff --git a/reagent/training/world_model/mdnrnn_trainer.py b/reagent/training/world_model/mdnrnn_trainer.py index 03e5c1967..015972fed 100644 --- a/reagent/training/world_model/mdnrnn_trainer.py +++ b/reagent/training/world_model/mdnrnn_trainer.py @@ -8,9 +8,9 @@ import reagent.types as rlt import torch import torch.nn.functional as F -from reagent.models.mdn_rnn import gmm_loss, transpose +from reagent.models.mdn_rnn import gmm_loss from 
reagent.models.world_model import MemoryNetwork -from reagent.parameters import MDNRNNParameters +from reagent.parameters import MDNRNNTrainerParameters logger = logging.getLogger(__name__) @@ -21,14 +21,14 @@ class MDNRNNTrainer: def __init__( self, - mdnrnn_network: MemoryNetwork, - params: MDNRNNParameters, + memory_network: MemoryNetwork, + params: MDNRNNTrainerParameters, cum_loss_hist: int = 100, ): - self.mdnrnn = mdnrnn_network + self.memory_network = memory_network self.params = params self.optimizer = torch.optim.Adam( - self.mdnrnn.mdnrnn.parameters(), lr=params.learning_rate + self.memory_network.mdnrnn.parameters(), lr=params.learning_rate ) self.minibatch = 0 self.cum_loss: Deque[float] = deque([], maxlen=cum_loss_hist) @@ -36,39 +36,18 @@ def __init__( self.cum_gmm: Deque[float] = deque([], maxlen=cum_loss_hist) self.cum_mse: Deque[float] = deque([], maxlen=cum_loss_hist) - def train( - self, training_batch: rlt.PreprocessedTrainingBatch, batch_first: bool = False - ): - assert ( - type(training_batch) is rlt.PreprocessedTrainingBatch - and type(training_batch.training_input) - is rlt.PreprocessedMemoryNetworkInput - ), "{} {}".format( - str(type(training_batch)), str(type(training_batch.training_input)) - ) - + def train(self, training_batch: rlt.PreprocessedMemoryNetworkInput): self.minibatch += 1 - if batch_first: - batch_size, seq_len, state_dim = ( - training_batch.training_input.state.float_features.shape - ) - else: - seq_len, batch_size, state_dim = ( - training_batch.training_input.state.float_features.shape - ) - self.mdnrnn.mdnrnn.train() + (seq_len, batch_size, state_dim) = training_batch.state.float_features.shape + + self.memory_network.mdnrnn.train() self.optimizer.zero_grad() - losses = self.get_loss(training_batch, state_dim, batch_first) + losses = self.get_loss(training_batch, state_dim) losses["loss"].backward() self.optimizer.step() - detached_losses = { - "loss": losses["loss"].cpu().detach().item(), - "gmm": losses["gmm"].cpu().detach().item(), - "bce": losses["bce"].cpu().detach().item(), - "mse": losses["mse"].cpu().detach().item(), - } + detached_losses = {k: loss.cpu().detach().item() for k, loss in losses.items()} self.cum_loss.append(detached_losses["loss"]) self.cum_gmm.append(detached_losses["gmm"]) self.cum_bce.append(detached_losses["bce"]) @@ -79,9 +58,8 @@ def train( def get_loss( self, - training_batch: rlt.PreprocessedTrainingBatch, + training_batch: rlt.PreprocessedMemoryNetworkInput, state_dim: Optional[int] = None, - batch_first: bool = False, ): """ Compute losses: @@ -95,52 +73,27 @@ def get_loss( dimensions). :param training_batch: - training_batch.learning_input has these fields: - - state: (BATCH_SIZE, SEQ_LEN, STATE_DIM) torch tensor - - action: (BATCH_SIZE, SEQ_LEN, ACTION_DIM) torch tensor - - reward: (BATCH_SIZE, SEQ_LEN) torch tensor - - not-terminal: (BATCH_SIZE, SEQ_LEN) torch tensor - - next_state: (BATCH_SIZE, SEQ_LEN, STATE_DIM) torch tensor - the first two dimensions may be swapped depending on batch_first + training_batch has these fields: + - state: (SEQ_LEN, BATCH_SIZE, STATE_DIM) torch tensor + - action: (SEQ_LEN, BATCH_SIZE, ACTION_DIM) torch tensor + - reward: (SEQ_LEN, BATCH_SIZE) torch tensor + - not-terminal: (SEQ_LEN, BATCH_SIZE) torch tensor + - next_state: (SEQ_LEN, BATCH_SIZE, STATE_DIM) torch tensor :param state_dim: the dimension of states. If provided, use it to normalize gmm loss - :param batch_first: whether data's first dimension represents batch size. 
If - FALSE, state, action, reward, not-terminal, and next_state's first - two dimensions are SEQ_LEN and BATCH_SIZE. - :returns: dictionary of losses, containing the gmm, the mse, the bce and the averaged loss. """ - learning_input = training_batch.training_input - assert isinstance(learning_input, rlt.PreprocessedMemoryNetworkInput) + assert isinstance(training_batch, rlt.PreprocessedMemoryNetworkInput) # mdnrnn's input should have seq_len as the first dimension - if batch_first: - state, action, next_state, reward, not_terminal = transpose( - learning_input.state.float_features, - learning_input.action, - learning_input.next_state.float_features, - learning_input.reward, - learning_input.not_terminal, # type: ignore - ) - learning_input = rlt.PreprocessedMemoryNetworkInput( # type: ignore - state=rlt.PreprocessedFeatureVector(float_features=state), - reward=reward, - time_diff=torch.ones_like(reward).float(), - action=action, - not_terminal=not_terminal, - next_state=rlt.PreprocessedFeatureVector(float_features=next_state), - step=None, - ) - mdnrnn_input = rlt.PreprocessedStateAction( - state=learning_input.state, # type: ignore - action=rlt.PreprocessedFeatureVector( - float_features=learning_input.action - ), # type: ignore + mdnrnn_input = rlt.PreprocessedStateAction.from_tensors( + training_batch.state.float_features, training_batch.action ) - mdnrnn_output = self.mdnrnn(mdnrnn_input) + mdnrnn_output = self.memory_network(mdnrnn_input) + # mus, sigmas: [seq_len, batch_size, num_gaussian, state_dim] mus, sigmas, logpi, rs, nts = ( mdnrnn_output.mus, mdnrnn_output.sigmas, @@ -149,9 +102,9 @@ def get_loss( mdnrnn_output.not_terminal, ) - next_state = learning_input.next_state.float_features - not_terminal = learning_input.not_terminal # type: ignore - reward = learning_input.reward + next_state = training_batch.next_state.float_features + not_terminal = training_batch.not_terminal + reward = training_batch.reward if self.params.fit_only_one_next_step: next_state, not_terminal, reward, mus, sigmas, logpi, nts, rs = tuple( map( diff --git a/reagent/types.py b/reagent/types.py index 5a2cfc382..a8c1b72f4 100644 --- a/reagent/types.py +++ b/reagent/types.py @@ -494,7 +494,41 @@ def batch_size(self): @dataclass class PreprocessedMemoryNetworkInput(PreprocessedBaseInput): - action: Union[torch.Tensor, torch.Tensor] + action: torch.Tensor + + @classmethod + def from_replay_buffer(cls, batch): + # RB's state is (batch_size, state_dim, stack_size) whereas + # we want (stack_size, batch_size, state_dim) + # for scalar fields like reward and terminal, + # RB returns (batch_size, stack_size), where as + # we want (stack_size, batch_size) + # Also convert action to float + + if len(batch.state.shape) == 2: + # this is stack_size = 1 (i.e. 
we squeezed in RB) + state = batch.state.unsqueeze(2) + next_state = batch.next_state.unsqueeze(2) + else: + # shapes should be + state = batch.state + next_state = batch.next_state + # Now shapes should be (batch_size, state_dim, stack_size) + # Turn shapes into (stack_size, batch_size, feature_dim) where + # feature \in {state, action}; also, make action a float + permutation = [2, 0, 1] + not_terminal = 1.0 - batch.terminal.transpose(0, 1).float() + return cls( + state=PreprocessedFeatureVector(float_features=state.permute(permutation)), + next_state=PreprocessedFeatureVector( + float_features=next_state.permute(permutation) + ), + action=batch.action.permute(permutation).float(), + reward=batch.reward.transpose(0, 1), + not_terminal=not_terminal, + step=None, + time_diff=None, + ) @dataclass diff --git a/reagent/workflow/gym_batch_rl.py b/reagent/workflow/gym_batch_rl.py index 356284e05..a44f838d2 100644 --- a/reagent/workflow/gym_batch_rl.py +++ b/reagent/workflow/gym_batch_rl.py @@ -12,7 +12,7 @@ from reagent.gym.agents.agent import Agent from reagent.gym.agents.post_step import log_data_post_step from reagent.gym.envs.env_factory import EnvFactory -from reagent.gym.policies import ContinuousRandomPolicy, DiscreteRandomPolicy +from reagent.gym.policies.random_policies import make_random_policy_for_env from reagent.gym.runners.gymrunner import run_episode from reagent.prediction.dqn_torch_predictor import ( ActorTorchPredictor, @@ -41,17 +41,6 @@ def initialize_seed(seed: Optional[int] = None): torch.manual_seed(seed) -def make_random_policy_for_env(env: gym.Env): - if isinstance(env.action_space, gym.spaces.Discrete): - # discrete action space - return DiscreteRandomPolicy.create_for_env(env) - elif isinstance(env.action_space, gym.spaces.Box): - # continuous action space - return ContinuousRandomPolicy.create_for_env(env) - else: - raise NotImplementedError(f"{env.action_space} not supported") - - def offline_gym( env: str, pkl_path: str, diff --git a/reagent/workflow/model_managers/discrete/discrete_dqn.py b/reagent/workflow/model_managers/discrete/discrete_dqn.py index e36ab2ac4..d5efae4e2 100644 --- a/reagent/workflow/model_managers/discrete/discrete_dqn.py +++ b/reagent/workflow/model_managers/discrete/discrete_dqn.py @@ -33,7 +33,7 @@ class DiscreteDQN(DiscreteDQNBase): # note that only DiscreteDQN and QRDQN call RLTrainer._initialize_cpe, # so maybe can be removed from the RLTrainer class. - def __post_init_post_parse__(self,): + def __post_init_post_parse__(self): super().__post_init_post_parse__() self.rl_parameters = self.trainer_param.rl self.eval_parameters = self.trainer_param.evaluation diff --git a/reagent/workflow/model_managers/discrete_dqn_base.py b/reagent/workflow/model_managers/discrete_dqn_base.py index 667673e10..c89401099 100644 --- a/reagent/workflow/model_managers/discrete_dqn_base.py +++ b/reagent/workflow/model_managers/discrete_dqn_base.py @@ -73,18 +73,22 @@ def should_generate_eval_dataset(self) -> bool: def create_policy(self, serving: bool) -> Policy: """ Create an online DiscreteDQN Policy from env. 
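The hunk below pairs a scorer with a sampler to form a Policy: softmax sampling over the trainer's q_network scores during training, and greedy selection against the serving module otherwise. A hedged sketch of the training-time branch; the Policy import path is an assumption and q_network is a stand-in, not code from the patch:

from reagent.gym.policies.policy import Policy  # assumed location of Policy
from reagent.gym.policies.samplers.discrete_sampler import SoftmaxActionSampler
from reagent.gym.policies.scorers.discrete_scorer import discrete_dqn_scorer


def build_online_policy(q_network, temperature: float) -> Policy:
    # The scorer maps preprocessed states to per-action Q-values; the sampler
    # draws an action from a softmax over those scores.
    sampler = SoftmaxActionSampler(temperature=temperature)
    scorer = discrete_dqn_scorer(q_network)
    return Policy(scorer=scorer, sampler=sampler)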
""" - from reagent.gym.policies.samplers.discrete_sampler import SoftmaxActionSampler + from reagent.gym.policies.samplers.discrete_sampler import ( + SoftmaxActionSampler, + GreedyActionSampler, + ) from reagent.gym.policies.scorers.discrete_scorer import ( discrete_dqn_scorer, discrete_dqn_serving_scorer, ) - sampler = SoftmaxActionSampler(temperature=self.rl_parameters.temperature) if serving: + sampler = GreedyActionSampler() scorer = discrete_dqn_serving_scorer( DiscreteDqnPredictorUnwrapper(self.build_serving_module()) ) else: + sampler = SoftmaxActionSampler(temperature=self.rl_parameters.temperature) scorer = discrete_dqn_scorer(self.trainer.q_network) return Policy(scorer=scorer, sampler=sampler) diff --git a/reagent/workflow/model_managers/model_based/__init__.py b/reagent/workflow/model_managers/model_based/__init__.py new file mode 100644 index 000000000..70a4636b6 --- /dev/null +++ b/reagent/workflow/model_managers/model_based/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. + +from .world_model import WorldModel + + +__all__ = ["WorldModel"] diff --git a/reagent/workflow/model_managers/model_based/world_model.py b/reagent/workflow/model_managers/model_based/world_model.py new file mode 100644 index 000000000..ff1532458 --- /dev/null +++ b/reagent/workflow/model_managers/model_based/world_model.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 + +import logging + +import torch # @manual +from reagent.core.dataclasses import dataclass, field +from reagent.models.world_model import MemoryNetwork +from reagent.parameters import MDNRNNTrainerParameters, NormalizationKey, param_hash +from reagent.preprocessing.normalization import get_num_output_features +from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer +from reagent.workflow.model_managers.world_model_base import WorldModelBase + + +logger = logging.getLogger(__name__) + + +@dataclass +class WorldModel(WorldModelBase): + __hash__ = param_hash + + trainer_param: MDNRNNTrainerParameters = field( + default_factory=MDNRNNTrainerParameters + ) + + def __post_init_post_parse__(self): + super().__post_init_post_parse__() + + def build_trainer(self) -> MDNRNNTrainer: + memory_network = MemoryNetwork( + state_dim=get_num_output_features( + self.get_normalization_data(NormalizationKey.STATE) + ), + action_dim=get_num_output_features( + self.get_normalization_data(NormalizationKey.ACTION) + ), + num_hiddens=self.trainer_param.hidden_size, + num_hidden_layers=self.trainer_param.num_hidden_layers, + num_gaussians=self.trainer_param.num_gaussians, + ) + if self.use_gpu: + memory_network = memory_network.cuda() + + return MDNRNNTrainer(memory_network=memory_network, params=self.trainer_param) + + def build_serving_module(self) -> torch.nn.Module: + """ + Returns a TorchScript predictor module + """ + raise NotImplementedError() diff --git a/reagent/workflow/model_managers/union.py b/reagent/workflow/model_managers/union.py index a54a852fe..87b95f4b9 100644 --- a/reagent/workflow/model_managers/union.py +++ b/reagent/workflow/model_managers/union.py @@ -8,6 +8,7 @@ from .actor_critic import * # noqa from .discrete import * # noqa +from .model_based import * # noqa @ModelManager.fill_union() diff --git a/reagent/workflow/model_managers/world_model_base.py b/reagent/workflow/model_managers/world_model_base.py new file mode 100644 index 000000000..2c669f1ea --- /dev/null +++ b/reagent/workflow/model_managers/world_model_base.py @@ -0,0 +1,77 @@ +#!/usr/bin/env 
python3 + +import logging +from typing import Dict, Optional, Tuple + +from reagent.core.dataclasses import dataclass +from reagent.gym.policies.policy import Policy +from reagent.parameters import NormalizationData, NormalizationKey +from reagent.preprocessing.batch_preprocessor import BatchPreprocessor +from reagent.workflow.model_managers.model_manager import ModelManager +from reagent.workflow.types import Dataset, RewardOptions, RLTrainingOutput, TableSpec + + +logger = logging.getLogger(__name__) + + +@dataclass +class WorldModelBase(ModelManager): + def __post_init_post_parse__(self): + super().__init__() + + @classmethod + def normalization_key(cls) -> str: + raise NotImplementedError() + + def create_policy(self) -> Policy: + """ Create a WorldModel Policy from env. """ + raise NotImplementedError() + + @property + def should_generate_eval_dataset(self) -> bool: + return False + + def _set_normalization_parameters( + self, normalization_data_map: Dict[str, NormalizationData] + ): + """ + Set normalization parameters on current instance + """ + state_norm_data = normalization_data_map.get(NormalizationKey.STATE, None) + assert state_norm_data is not None + assert state_norm_data.dense_normalization_parameters is not None + action_norm_data = normalization_data_map.get(NormalizationKey.ACTION, None) + assert action_norm_data is not None + assert action_norm_data.dense_normalization_parameters is not None + self.set_normalization_data_map(normalization_data_map) + + def run_feature_identification( + self, input_table_spec: TableSpec + ) -> Dict[str, NormalizationData]: + raise NotImplementedError() + + def query_data( + self, + input_table_spec: TableSpec, + sample_range: Optional[Tuple[float, float]], + reward_options: RewardOptions, + ) -> Dataset: + raise NotImplementedError() + + def build_batch_preprocessor(self) -> BatchPreprocessor: + raise NotImplementedError() + + def train( + self, train_dataset: Dataset, eval_dataset: Optional[Dataset], num_epochs: int + ) -> RLTrainingOutput: + """ + Train the model + + Returns partially filled RLTrainingOutput. 
The field that should not be filled + are: + - output_path + - warmstart_output_path + - vis_metrics + - validation_output + """ + raise NotImplementedError() diff --git a/reagent/workflow_utils/transitional.py b/reagent/workflow_utils/transitional.py index 9d5b02ce3..63daaa0b4 100644 --- a/reagent/workflow_utils/transitional.py +++ b/reagent/workflow_utils/transitional.py @@ -4,7 +4,6 @@ from typing import Dict import torch -from reagent.models.actor import FullyConnectedActor from reagent.models.categorical_dqn import CategoricalDQN from reagent.models.cem_planner import CEMPlannerNetwork from reagent.models.dqn import FullyConnectedDQN @@ -13,10 +12,10 @@ from reagent.models.quantile_dqn import QuantileDQN from reagent.models.world_model import MemoryNetwork from reagent.parameters import ( - CEMParameters, + CEMTrainerParameters, ContinuousActionModelParameters, DiscreteActionModelParameters, - MDNRNNParameters, + MDNRNNTrainerParameters, OptimizerParameters, ) from reagent.preprocessing.normalization import ( @@ -24,15 +23,18 @@ get_num_output_features, ) from reagent.test.gym.open_ai_gym_environment import EnvType, OpenAIGymEnvironment -from reagent.training.c51_trainer import C51Trainer, C51TrainerParameters -from reagent.training.cem_trainer import CEMTrainer -from reagent.training.dqn_trainer import DQNTrainer, DQNTrainerParameters -from reagent.training.parametric_dqn_trainer import ( +from reagent.training import ( + C51Trainer, + C51TrainerParameters, + CEMTrainer, + DQNTrainer, + DQNTrainerParameters, + MDNRNNTrainer, ParametricDQNTrainer, ParametricDQNTrainerParameters, + QRDQNTrainer, + QRDQNTrainerParameters, ) -from reagent.training.qrdqn_trainer import QRDQNTrainer, QRDQNTrainerParameters -from reagent.training.world_model.mdnrnn_trainer import MDNRNNTrainer def create_dqn_trainer_from_params( @@ -222,14 +224,14 @@ def create_parametric_dqn_trainer_from_params( def get_cem_trainer( - env: OpenAIGymEnvironment, params: CEMParameters, use_gpu: bool + env: OpenAIGymEnvironment, params: CEMTrainerParameters, use_gpu: bool ) -> CEMTrainer: num_world_models = params.num_world_models world_model_trainers = [ create_world_model_trainer(env, params.mdnrnn, use_gpu) for _ in range(num_world_models) ] - world_model_nets = [trainer.mdnrnn for trainer in world_model_trainers] + world_model_nets = [trainer.memory_network for trainer in world_model_trainers] discrete_action = env.action_type == EnvType.DISCRETE_ACTION terminal_effective = params.mdnrnn.not_terminal_loss_weight > 0 action_upper_bounds, action_lower_bounds = None, None @@ -261,9 +263,9 @@ def get_cem_trainer( def create_world_model_trainer( - env: OpenAIGymEnvironment, mdnrnn_params: MDNRNNParameters, use_gpu: bool + env: OpenAIGymEnvironment, mdnrnn_params: MDNRNNTrainerParameters, use_gpu: bool ) -> MDNRNNTrainer: - mdnrnn_net = MemoryNetwork( + memory_network = MemoryNetwork( state_dim=env.state_dim, action_dim=env.action_dim, num_hiddens=mdnrnn_params.hidden_size, @@ -271,6 +273,6 @@ def create_world_model_trainer( num_gaussians=mdnrnn_params.num_gaussians, ) if use_gpu: - mdnrnn_net = mdnrnn_net.cuda() - mdnrnn_trainer = MDNRNNTrainer(mdnrnn_network=mdnrnn_net, params=mdnrnn_params) + memory_network = memory_network.cuda() + mdnrnn_trainer = MDNRNNTrainer(memory_network=memory_network, params=mdnrnn_params) return mdnrnn_trainer diff --git a/requirements.txt b/requirements.txt index 820f1805f..367975796 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ numpy==1.17.2 pandas==0.25.0 pydantic==1.4 
torch +tqdm==4.46.0 petastorm==0.9.0 parameterized==0.7.4 pyspark==2.4.5 diff --git a/setup.cfg b/setup.cfg index 6ebe09120..192ef7c2a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,6 +16,7 @@ install_requires = pandas>=1.0.3 pydantic>=1.4 torch + tqdm>=4.46.0 petastorm>=0.9.0 parameterized>=0.7.4 pyspark>=2.4.5
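For readers following the rename from mdnrnn_network to memory_network in transitional.py above, here is a minimal illustrative sketch (not part of the patch) of how a world-model trainer is wired together after this change. The concrete state_dim and action_dim values are invented for the example; everything else mirrors create_world_model_trainer.

from reagent.models.world_model import MemoryNetwork
from reagent.parameters import MDNRNNTrainerParameters
from reagent.training import MDNRNNTrainer

# Hypothetical dimensions; a real caller would take them from the env
# or from normalization data, as the new WorldModel model manager does.
state_dim, action_dim = 4, 2

params = MDNRNNTrainerParameters()
memory_network = MemoryNetwork(
    state_dim=state_dim,
    action_dim=action_dim,
    num_hiddens=params.hidden_size,
    num_hidden_layers=params.num_hidden_layers,
    num_gaussians=params.num_gaussians,
)
trainer = MDNRNNTrainer(memory_network=memory_network, params=params)

This is the same construction that get_cem_trainer repeats for each of its num_world_models ensemble members, and the renamed trainer.memory_network attribute is what it now collects into world_model_nets.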