import copy
from enum import Enum

import numpy as np


class GoalSelectionStrategy(Enum):
    """
    The strategies for selecting new goals when
    creating artificial transitions.
    """
    # Select a goal that was achieved
    # after the current step, in the same episode
    FUTURE = 0
    # Select the goal that was achieved
    # at the end of the episode
    FINAL = 1
    # Select a goal that was achieved in the episode
    EPISODE = 2
    # Select a goal that was achieved
    # at some point in the training procedure
    # (and that is present in the replay buffer)
    RANDOM = 3


# For convenience,
# so that we can use a string to select a strategy
KEY_TO_GOAL_STRATEGY = {
    'future': GoalSelectionStrategy.FUTURE,
    'final': GoalSelectionStrategy.FINAL,
    'episode': GoalSelectionStrategy.EPISODE,
    'random': GoalSelectionStrategy.RANDOM,
}
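
# A quick usage sketch (illustrative only, not part of the original module):
# resolving a strategy from a user-facing string, e.g. read from a config:
#
#     strategy = KEY_TO_GOAL_STRATEGY['future']
#     assert strategy == GoalSelectionStrategy.FUTURE
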
class HindsightExperienceReplayWrapper(object):
    """
    Wrapper around a replay buffer in order to use HER.
    This implementation is inspired by the one found in https://github.com/NervanaSystems/coach/.

    :param replay_buffer: (ReplayBuffer) the replay buffer to wrap
    :param n_sampled_goal: (int) The number of artificial transitions to generate for each actual transition
    :param goal_selection_strategy: (GoalSelectionStrategy) The method that will be used to generate
        the goals for the artificial transitions.
    :param wrapped_env: (HERGoalEnvWrapper) the GoalEnv wrapped using HERGoalEnvWrapper,
        which enables converting a flattened observation to a dict, and vice versa
    """

    def __init__(self, replay_buffer, n_sampled_goal, goal_selection_strategy, wrapped_env):
        super(HindsightExperienceReplayWrapper, self).__init__()
        assert isinstance(goal_selection_strategy, GoalSelectionStrategy), \
            "Invalid goal selection strategy, please use one of {}".format(list(GoalSelectionStrategy))
        self.n_sampled_goal = n_sampled_goal
        self.goal_selection_strategy = goal_selection_strategy
        self.env = wrapped_env
        # Buffer for storing transitions of the current episode
        self.episode_transitions = []
        self.replay_buffer = replay_buffer

    def add(self, obs_t, action, reward, obs_tp1, done):
        """
        Add a new transition to the buffer.

        :param obs_t: (np.ndarray) the last observation
        :param action: ([float]) the action
        :param reward: (float) the reward of the transition
        :param obs_tp1: (np.ndarray) the new observation
        :param done: (bool) whether the episode is over
        """
        assert self.replay_buffer is not None
        # Update current episode buffer
        self.episode_transitions.append((obs_t, action, reward, obs_tp1, done))
        if done:
            # Add transitions (and imagined ones) to the buffer only when an episode is over
            self._store_episode()
            # Reset episode buffer
            self.episode_transitions = []

    def sample(self, *args, **kwargs):
        """Sample a batch by delegating to the underlying replay buffer."""
        return self.replay_buffer.sample(*args, **kwargs)

    def can_sample(self, n_samples):
        """
        Check if n_samples samples can be sampled
        from the buffer.

        :param n_samples: (int)
        :return: (bool)
        """
        return self.replay_buffer.can_sample(n_samples)

    def __len__(self):
        return len(self.replay_buffer)

    def _sample_achieved_goal(self, episode_transitions, transition_idx):
        """
        Sample an achieved goal according to the sampling strategy.

        :param episode_transitions: ([tuple]) a list of all the transitions in the current episode
        :param transition_idx: (int) the transition to start sampling from
        :return: (np.ndarray) an achieved goal
        """
        if self.goal_selection_strategy == GoalSelectionStrategy.FUTURE:
            # Sample a goal that was observed in the same episode after the current step
            selected_idx = np.random.choice(np.arange(transition_idx + 1, len(episode_transitions)))
            selected_transition = episode_transitions[selected_idx]
        elif self.goal_selection_strategy == GoalSelectionStrategy.FINAL:
            # Choose the goal achieved at the end of the episode
            selected_transition = episode_transitions[-1]
        elif self.goal_selection_strategy == GoalSelectionStrategy.EPISODE:
            # Random goal achieved during the episode
            selected_idx = np.random.choice(np.arange(len(episode_transitions)))
            selected_transition = episode_transitions[selected_idx]
        elif self.goal_selection_strategy == GoalSelectionStrategy.RANDOM:
            # Random goal achieved, from the entire replay buffer
            selected_idx = np.random.choice(np.arange(len(self.replay_buffer)))
            selected_transition = self.replay_buffer.storage[selected_idx]
        else:
            raise ValueError("Invalid goal selection strategy, "
                             "please use one of {}".format(list(GoalSelectionStrategy)))
        return self.env.convert_obs_to_dict(selected_transition[0])['achieved_goal']
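
    # A worked illustration (editorial note, not in the original source):
    # with the FUTURE strategy, an episode of 5 transitions, and
    # transition_idx=1, the goal is drawn from the achieved goals of the
    # transitions at indices 2, 3, or 4, i.e. np.arange(2, 5).
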
    def _sample_achieved_goals(self, episode_transitions, transition_idx):
        """
        Sample a batch of achieved goals according to the sampling strategy.

        :param episode_transitions: ([tuple]) list of the transitions in the current episode
        :param transition_idx: (int) the transition to start sampling from
        :return: ([np.ndarray]) a list of achieved goals
        """
        return [
            self._sample_achieved_goal(episode_transitions, transition_idx)
            for _ in range(self.n_sampled_goal)
        ]

    def _store_episode(self):
        """
        Sample artificial goals and store the transitions of the current
        episode in the replay buffer.
        This method is called only at the end of an episode.
        """
        # For each transition in the last episode,
        # create a set of artificial transitions
        for transition_idx, transition in enumerate(self.episode_transitions):

            obs_t, action, reward, obs_tp1, done = transition

            # Add to the replay buffer
            self.replay_buffer.add(obs_t, action, reward, obs_tp1, done)

            # We cannot sample a goal from the future in the last step of an episode
            if (transition_idx == len(self.episode_transitions) - 1 and
                    self.goal_selection_strategy == GoalSelectionStrategy.FUTURE):
                break

            # Sample n goals per transition, where n is `n_sampled_goal`
            # (this is the parameter called k in the HER paper)
            sampled_goals = self._sample_achieved_goals(self.episode_transitions, transition_idx)

            # For each sampled goal, store a new transition
            for goal in sampled_goals:
                # Copy the transition to avoid modifying the original one
                obs, action, reward, next_obs, done = copy.deepcopy(transition)

                # Convert the concatenated obs to a dict, so we can update the goals
                obs_dict, next_obs_dict = map(self.env.convert_obs_to_dict, (obs, next_obs))

                # Update the desired goal in the transition
                obs_dict['desired_goal'] = goal
                next_obs_dict['desired_goal'] = goal

                # Update the reward according to the new desired goal
                # (gym's GoalEnv convention is compute_reward(achieved_goal, desired_goal, info))
                reward = self.env.compute_reward(next_obs_dict['achieved_goal'], goal, None)
                # Can we use achieved_goal == desired_goal?
                done = False

                # Transform back to ndarrays
                obs, next_obs = map(self.env.convert_dict_to_obs, (obs_dict, next_obs_dict))

                # Add the artificial transition to the replay buffer
                self.replay_buffer.add(obs, action, reward, next_obs, done)
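
# Minimal end-to-end sketch (illustrative only; `ReplayBuffer` and the
# `HERGoalEnvWrapper`-wrapped environment `wrapped_env` are assumed to be
# importable from elsewhere, as they are not defined in this module):
#
#     replay_buffer = HindsightExperienceReplayWrapper(
#         replay_buffer=ReplayBuffer(int(1e5)),
#         n_sampled_goal=4,  # `k` in the HER paper
#         goal_selection_strategy=GoalSelectionStrategy.FUTURE,
#         wrapped_env=wrapped_env,
#     )
#     # Add transitions as with a regular buffer; hindsight transitions
#     # are generated and stored automatically once an episode ends:
#     replay_buffer.add(obs_t, action, reward, obs_tp1, done)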