Adding secondary metrics to gym env (#397)
Summary: Pull Request resolved: #397

Differential Revision: D26526445

fbshipit-source-id: 17517a64c921b6d506816401abd3d43b17e8a9ed
kittipatv authored and facebook-github-bot committed Feb 19, 2021
1 parent 4fe8f05 commit f65b583
Showing 11 changed files with 133 additions and 19 deletions.
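As orientation before the per-file diffs, here is a minimal sketch of how the new EvaluationResults return value is meant to be consumed. It is illustrative only: the Gym wrapper, the CartPole environment name, and the trained policy are assumptions, while evaluate_for_n_episodes, Agent.create_for_env, and the EvaluationResults fields come from the diff below.

from reagent.gym.agents.agent import Agent
from reagent.gym.envs import Gym
from reagent.gym.runners.gymrunner import evaluate_for_n_episodes

env = Gym(env_name="CartPole-v0")          # assumed EnvWrapper; any EnvWrapper works
agent = Agent.create_for_env(env, policy)  # `policy` is a trained policy, omitted here
results = evaluate_for_n_episodes(
    n=20, env=env, agent=agent, max_steps=200, num_processes=1
)

mean_reward = results.rewards.mean()       # results.rewards has shape (n, len(gammas))
if results.metrics is not None:            # set only when env.get_metric_extractor() is not None
    for name, column in zip(results.metric_names, results.metrics.T):
        print(f"{name}: {column.mean():.3f}")  # results.metrics has shape (n, num_metrics)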
2 changes: 1 addition & 1 deletion docs/usage.rst
@@ -284,7 +284,7 @@ which performs the following pseudo-code

    # run Agent on environment, and record rewards
    rewards = evaluate_for_n_episodes(
        n=num_eval_episodes, env=env, agent=agent, max_steps=max_steps
-    )
+    ).rewards

Even on completely random data, DQN can learn a policy that obtains scores close to the maximum possible score of 200!
8 changes: 8 additions & 0 deletions reagent/gym/envs/env_wrapper.py
@@ -12,6 +12,7 @@
from gym import spaces
from reagent.core.dataclasses import dataclass
from reagent.core.registry_meta import RegistryMeta
+from reagent.gym.types import MetricExtractor
from reagent.parameters import CONTINUOUS_TRAINING_ACTION_RANGE
from reagent.training.utils import rescale_actions

@@ -139,3 +140,10 @@ def max_steps(self) -> Optional[int]:
    @property
    def possible_actions_mask(self) -> Optional[np.ndarray]:
        return getattr(self.env, "possible_actions_mask", None)
+
+    def get_metric_extractor(self) -> Optional[MetricExtractor]:
+        """
+        If the environment provides scalar metrics beyond the reward, returns a
+        callable that extracts those metrics from a Trajectory.
+        """
+        return None
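To illustrate this hook (the sketch is not part of the commit): a wrapper that wants evaluate_for_n_episodes to report extra metrics overrides get_metric_extractor, as the RecSim wrapper below does. CartPoleWithMetrics is an invented name, and EpisodeLengthExtractor refers to the illustrative extractor sketched after the reagent/gym/types.py diff further down.

from typing import Optional

from reagent.gym.envs import Gym
from reagent.gym.types import MetricExtractor


class CartPoleWithMetrics(Gym):  # hypothetical subclass of ReAgent's Gym wrapper
    def get_metric_extractor(self) -> Optional[MetricExtractor]:
        # Hand the runner an extractor so secondary metrics ride along with rewards.
        return EpisodeLengthExtractor()  # sketched under reagent/gym/types.py below

# Usage would then be, e.g.: env = CartPoleWithMetrics(env_name="CartPole-v0")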
58 changes: 58 additions & 0 deletions reagent/gym/envs/recsim.py
@@ -2,6 +2,7 @@
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.

import logging
+from typing import List

import gym
import numpy as np
@@ -10,6 +11,7 @@
from reagent.gym.envs.env_wrapper import EnvWrapper
from reagent.gym.envs.wrappers.recsim import ValueWrapper
from reagent.gym.preprocessors.default_preprocessors import RecsimObsPreprocessor
+from reagent.gym.types import MetricExtractor, Trajectory, Transition
from recsim import choice_model, utils
from recsim.environments import interest_evolution, interest_exploration
from recsim.simulator import environment, recsim_gym
@@ -84,6 +86,62 @@ def step(self, action):
        state["user"] = np.copy(state["user"])
        return state, r, t, i

+    def get_metric_extractor(self):
+        return RecSimMetricExtractor(self)
+
+
+class RecSimMetricExtractor(MetricExtractor):
+    def __init__(self, env: RecSim):
+        super().__init__()
+        obs_space = env.observation_space
+        assert isinstance(obs_space, gym.spaces.Dict)
+
+        # RecSim envs should have a "response" key holding per-item Dict response spaces
+        response_space = obs_space["response"][0]
+        assert isinstance(response_space, gym.spaces.Dict)
+        self._discrete_metrics = []
+        self._box_metrics = []
+        for k, v in response_space.spaces.items():
+            if isinstance(v, gym.spaces.Discrete):
+                self._discrete_metrics.append(k)
+            elif isinstance(v, gym.spaces.Box):
+                if not (len(v.shape) == 0 or (len(v.shape) == 1 and v.shape[0] == 1)):
+                    raise NotImplementedError(
+                        f"Expecting scalar values here; got shape {v.shape}"
+                    )
+                self._box_metrics.append(k)
+            else:
+                raise NotImplementedError
+
+        self._metric_names = self._discrete_metrics + self._box_metrics
+
+    @property
+    def metric_names(self) -> List[str]:
+        return self._metric_names
+
+    def __call__(self, trajectory: Trajectory) -> np.ndarray:
+        transition_metrics = np.stack(
+            [
+                self._get_metric_from_transition(transition)
+                for transition in trajectory.transitions
+            ]
+        )
+        return np.sum(transition_metrics, axis=0)
+
+    def _get_metric_from_transition(self, transition: Transition) -> np.ndarray:
+        obs = transition.observation
+        # Response is a list of item responses, or None (the first observation has no response)
+        response = obs["response"] or []
+        metrics = np.zeros(len(self._metric_names))
+
+        for item_response in response:
+            for i, k in enumerate(self._discrete_metrics):
+                metrics[i] += item_response[k]
+            for i, k in enumerate(self._box_metrics):
+                metrics[len(self._discrete_metrics) + i] += item_response[k]
+
+        return metrics
+
+

class MulticlickIEvUserModel(interest_evolution.IEvUserModel):
    def simulate_response(self, documents):
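A small worked example of the accumulation above (not part of the commit; the "click" and "watch_time" keys stand in for whatever scalar fields the response space actually declares): _get_metric_from_transition sums the per-item responses within each slate, and __call__ then sums across steps.

import numpy as np

# Two steps of a hypothetical slate trajectory, two items per slate.
responses = [
    [{"click": 1, "watch_time": 3.2}, {"click": 0, "watch_time": 0.0}],  # step 1
    [{"click": 0, "watch_time": 0.0}, {"click": 1, "watch_time": 1.5}],  # step 2
]
# Per-step sums, mirroring _get_metric_from_transition with
# metric_names == ["click", "watch_time"]:
per_step = np.array(
    [
        [sum(item["click"] for item in step), sum(item["watch_time"] for item in step)]
        for step in responses
    ]
)
episode_metrics = per_step.sum(axis=0)  # array([2. , 4.7]), what __call__ would return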
42 changes: 30 additions & 12 deletions reagent/gym/runners/gymrunner.py
@@ -3,7 +3,7 @@

import logging
import pickle
-from typing import Optional, Sequence
+from typing import Optional, Sequence, Tuple

import numpy as np
import torch.multiprocessing as mp
@@ -13,7 +13,7 @@
)
from reagent.gym.agents.agent import Agent
from reagent.gym.envs import EnvWrapper
-from reagent.gym.types import Trajectory, Transition
+from reagent.gym.types import Trajectory, Transition, EvaluationResults
from reagent.tensorboardX import SummaryWriterContext


@@ -68,33 +68,41 @@ def evaluate_for_n_episodes(
    max_steps: Optional[int] = None,
    gammas: Sequence[float] = (1.0,),
    num_processes: int = 4,
-) -> np.ndarray:
+) -> EvaluationResults:
    """Return an EvaluationResults whose rewards field is an np array A of shape
    n x len(gammas), where A[i, j] is the ith episode evaluated with gamma=gammas[j].
    Episodes are run across num_processes worker processes via multiprocessing.Pool.
    """
    num_processes = min(num_processes, n)

+    metric_extractor = env.get_metric_extractor()
+
    def evaluate_one_episode(
        mdp_id: int,
        env: EnvWrapper,
        agent: Agent,
        max_steps: Optional[int],
        gammas: Sequence[float],
-    ) -> np.ndarray:
+    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        rewards = np.empty((len(gammas),))
        trajectory = run_episode(
            env=env, agent=agent, mdp_id=mdp_id, max_steps=max_steps
        )
        for i_gamma, gamma in enumerate(gammas):
            rewards[i_gamma] = trajectory.calculate_cumulative_reward(gamma)
-        return rewards
-
-    rewards = None
+
+        metrics = None
+
+        if metric_extractor is not None:
+            metrics = metric_extractor(trajectory)
+
+        return rewards, metrics
+
+    eval_results = None
    if num_processes > 1:
        try:
            with mp.Pool(num_processes) as pool:
-                rewards = unwrap_function_outputs(
+                eval_results = unwrap_function_outputs(
                    pool.map(
                        wrap_function_arguments(
                            evaluate_one_episode,
@@ -116,20 +124,30 @@ def evaluate_one_episode(
            )

    # if we didn't run multiprocessing, or it failed, try single-processing instead.
-    if rewards is None:
-        rewards = []
+    if eval_results is None:
+        eval_results = []
        for i in range(n):
-            rewards.append(
+            eval_results.append(
                evaluate_one_episode(
                    mdp_id=i, env=env, agent=agent, max_steps=max_steps, gammas=gammas
                )
            )

-    rewards = np.array(rewards)
+    rewards = np.array([r[0] for r in eval_results])
    for i, gamma in enumerate(gammas):
        gamma_rewards = rewards[:, i]
        logger.info(
            f"For gamma={gamma}, average reward is {gamma_rewards.mean()}\n"
            f"Rewards list: {gamma_rewards}"
        )
-    return rewards
+
+    metrics = None
+    metric_names = None
+    if metric_extractor is not None:
+        metrics = np.stack([r[1] for r in eval_results])
+        metric_names = metric_extractor.metric_names
+
+    # FIXME: Also, return metrics & stuff
+    return EvaluationResults(
+        rewards=rewards, metrics=metrics, metric_names=metric_names
+    )
2 changes: 2 additions & 0 deletions reagent/gym/tests/__init__.py
@@ -0,0 +1,2 @@
+#!/usr/bin/env python3
+# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
1 change: 0 additions & 1 deletion
@@ -3,7 +3,6 @@

import unittest

-import gym
import numpy.testing as npt
import torch
import torch.nn.functional as F
5 changes: 3 additions & 2 deletions reagent/gym/tests/test_gym.py
@@ -218,13 +218,14 @@ def eval_policy(
        else Agent.create_for_env(env, serving_policy)
    )

-    eval_rewards = evaluate_for_n_episodes(
+    eval_results = evaluate_for_n_episodes(
        n=num_eval_episodes,
        env=env,
        agent=agent,
        max_steps=env.max_steps,
        num_processes=1,
-    ).squeeze(1)
+    )
+    eval_rewards = eval_results.rewards.squeeze(1)

    logger.info("============Eval rewards==============")
    logger.info(eval_rewards)
2 changes: 1 addition & 1 deletion reagent/gym/tests/test_gym_offline.py
@@ -80,7 +80,7 @@ def evaluate_cem(env, manager, num_eval_episodes: int):
    agent = Agent.create_for_env(env, policy)
    return evaluate_for_n_episodes(
        n=num_eval_episodes, env=env, agent=agent, max_steps=env.max_steps
-    )
+    ).rewards


def run_test_offline(
2 changes: 1 addition & 1 deletion reagent/gym/tests/test_world_model.py
@@ -372,7 +372,7 @@ def train_mdnrnn_and_train_on_embedded_env(
        env=embed_env,
        agent=agent,
        num_processes=1,
-    )
+    ).rewards
    assert (
        np.mean(rewards) >= passing_score_bar
    ), f"average reward doesn't pass our bar {passing_score_bar}"
28 changes: 28 additions & 0 deletions reagent/gym/types.py
@@ -84,6 +84,13 @@ def calculate_cumulative_reward(self, gamma: float = 1.0):
        return sum(reward * discount for reward, discount in zip(rewards, discounts))


+@dataclass
+class EvaluationResults:
+    rewards: np.ndarray
+    metrics: Optional[np.ndarray] = None
+    metric_names: Optional[List[str]] = None
+
+
class Sampler(ABC):
    """Given scores, select the action."""

@@ -123,3 +130,24 @@ def update(self) -> None:
class GaussianSamplerScore(rlt.BaseDataClass):
    loc: torch.Tensor
    scale_log: torch.Tensor
+
+
+class MetricExtractor(ABC):
+    """
+    A callable that extracts metrics from a Trajectory
+    """
+
+    @property
+    @abstractmethod
+    def metric_names(self) -> List[str]:
+        """
+        The names of the metrics extracted by this callable
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    def __call__(self, trajectory: Trajectory) -> np.ndarray:
+        """
+        The return value should be a 1-D array with the same length as metric_names
+        """
+        raise NotImplementedError
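For reference, a minimal concrete implementation of this interface could look like the following. EpisodeLengthExtractor is an invented example (it is also what the env_wrapper.py sketch above returns); only MetricExtractor, Trajectory, and trajectory.transitions come from the codebase.

from typing import List

import numpy as np

from reagent.gym.types import MetricExtractor, Trajectory


class EpisodeLengthExtractor(MetricExtractor):
    """Toy extractor: reports the episode length as one secondary metric."""

    @property
    def metric_names(self) -> List[str]:
        return ["episode_length"]

    def __call__(self, trajectory: Trajectory) -> np.ndarray:
        # One entry per metric_names element, aggregated over the whole trajectory.
        return np.array([float(len(trajectory.transitions))])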
2 changes: 1 addition & 1 deletion reagent/workflow/gym_batch_rl.py
@@ -114,7 +114,7 @@ def evaluate_gym(
    agent = Agent.create_for_env_with_serving_policy(env, policy)
    rewards = evaluate_for_n_episodes(
        n=num_eval_episodes, env=env, agent=agent, max_steps=max_steps
-    )
+    ).rewards
    avg_reward = np.mean(rewards)
    logger.info(
        f"Average reward over {num_eval_episodes} is {avg_reward}.\n"