/
model.py
180 lines (166 loc) · 7.67 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from typing import List, Tuple
from mlagents.tf_utils import tf
from mlagents.trainers.models import ModelUtils
from mlagents.trainers.policy.tf_policy import TFPolicy
class CuriosityModel(object):
    """
    TensorFlow graph for the Curiosity reward generator.

    Building an instance creates, in this order: the observation encoders,
    the inverse model, the forward model, and finally the combined loss
    together with its optimizer update op.
    See https://arxiv.org/abs/1705.05363 for the underlying method.
    """

    def __init__(
        self, policy: TFPolicy, encoding_size: int = 128, learning_rate: float = 3e-4
    ):
        """
        Builds all ops of the curiosity model on top of an existing policy.
        :param policy: The policy being trained; its observation inputs, selected
            actions and mask are read while building the graph.
        :param encoding_size: Width of each observation encoding produced for
            the Curiosity module.
        :param learning_rate: Learning rate handed to the curiosity optimizer.
        """
        self.policy = policy
        self.encoding_size = encoding_size
        # Inputs for the next-step (t+1) visual observations, one per visual stream.
        # Note: `next_vector_in` is only created when the policy has vector
        # observations (see create_curiosity_encoders).
        self.next_visual_in: List[tf.Tensor] = []

        current_code, next_code = self.create_curiosity_encoders()
        self.create_inverse_model(current_code, next_code)
        self.create_forward_model(current_code, next_code)
        self.create_loss(learning_rate)

    def _masked_mean(self, per_sample: tf.Tensor) -> tf.Tensor:
        """
        Averages the entries of `per_sample` that fall into partition 1 when
        split two ways by `self.policy.mask`.
        :param per_sample: Tensor holding one value per batch entry.
        :return: Scalar mean over the partition-1 entries.
        """
        # NOTE(review): presumably mask == 1 marks the valid (non-padded)
        # steps -- confirm against the policy that defines `mask`.
        kept = tf.dynamic_partition(per_sample, self.policy.mask, 2)[1]
        return tf.reduce_mean(kept)

    def create_curiosity_encoders(self) -> Tuple[tf.Tensor, tf.Tensor]:
        """
        Builds the encoders for the current and the next (t+1) observations.
        Part of Curiosity-driven Exploration by Self-supervised Prediction,
        see https://arxiv.org/abs/1705.05363 for more details.
        Side effects: fills `self.next_visual_in` (when visual observations
        exist) and sets `self.next_vector_in` (when vector observations exist).
        :return: tuple of (encoded current state, encoded next state) tensors,
            each the axis-1 concatenation of all per-stream encodings.
        """
        policy = self.policy
        current_parts = []
        next_parts = []

        if policy.vis_obs_size > 0:
            self.next_visual_in = []
            current_visual = []
            upcoming_visual = []
            for stream in range(policy.vis_obs_size):
                # Input op for the next (t+1) frame of this visual stream.
                next_frame = ModelUtils.create_visual_input(
                    policy.brain.camera_resolutions[stream],
                    name="curiosity_next_visual_observation_" + str(stream),
                )
                self.next_visual_in.append(next_frame)

                # Siamese encoders: both calls use the same scope name. The
                # trailing False/True is presumably the variable-reuse flag
                # (create first, then reuse) -- confirm against ModelUtils.
                scope = "curiosity_stream_{}_visual_obs_encoder".format(stream)
                current_visual.append(
                    ModelUtils.create_visual_observation_encoder(
                        policy.visual_in[stream],
                        self.encoding_size,
                        ModelUtils.swish,
                        1,
                        scope,
                        False,
                    )
                )
                upcoming_visual.append(
                    ModelUtils.create_visual_observation_encoder(
                        next_frame,
                        self.encoding_size,
                        ModelUtils.swish,
                        1,
                        scope,
                        True,
                    )
                )
            current_parts.append(tf.concat(current_visual, axis=1))
            next_parts.append(tf.concat(upcoming_visual, axis=1))

        if policy.vec_obs_size > 0:
            # Input op for the next (t+1) vector observation.
            self.next_vector_in = tf.placeholder(
                shape=[None, policy.vec_obs_size],
                dtype=tf.float32,
                name="curiosity_next_vector_observation",
            )
            # Siamese encoders again: same scope name for current and next input.
            vector_scope = "curiosity_vector_obs_encoder"
            current_parts.append(
                ModelUtils.create_vector_observation_encoder(
                    policy.vector_in,
                    self.encoding_size,
                    ModelUtils.swish,
                    2,
                    vector_scope,
                    False,
                )
            )
            next_parts.append(
                ModelUtils.create_vector_observation_encoder(
                    self.next_vector_in,
                    self.encoding_size,
                    ModelUtils.swish,
                    2,
                    vector_scope,
                    True,
                )
            )

        current_code = tf.concat(current_parts, axis=1)
        next_code = tf.concat(next_parts, axis=1)
        return current_code, next_code

    def create_inverse_model(
        self, encoded_state: tf.Tensor, encoded_next_state: tf.Tensor
    ) -> None:
        """
        Builds the inverse model ops of the Curiosity module: given the encoded
        current and next states, predict the action that was taken.
        Sets `self.inverse_loss`.
        :param encoded_state: Tensor corresponding to encoded current state.
        :param encoded_next_state: Tensor corresponding to encoded next state.
        """
        policy = self.policy
        state_pair = tf.concat([encoded_state, encoded_next_state], axis=1)
        features = tf.layers.dense(state_pair, 256, activation=ModelUtils.swish)

        if policy.brain.vector_action_space_type == "continuous":
            # Linear output head; error is the squared distance to the
            # selected actions, summed over axis 1.
            predicted = tf.layers.dense(features, policy.act_size[0], activation=None)
            per_sample = tf.reduce_sum(
                tf.squared_difference(predicted, policy.selected_actions), axis=1
            )
        else:
            # One softmax head per entry of act_size, concatenated on axis 1.
            heads = [
                tf.layers.dense(features, branch_size, activation=tf.nn.softmax)
                for branch_size in policy.act_size
            ]
            predicted = tf.concat(heads, axis=1)
            # Cross-entropy against the selected actions; the 1e-10 keeps the
            # log finite when a predicted probability is zero.
            per_sample = tf.reduce_sum(
                -tf.log(predicted + 1e-10) * policy.selected_actions, axis=1
            )

        self.inverse_loss = self._masked_mean(per_sample)

    def create_forward_model(
        self, encoded_state: tf.Tensor, encoded_next_state: tf.Tensor
    ) -> None:
        """
        Builds the forward model ops of the Curiosity module: given the encoded
        current state and the selected action, predict the encoded next state.
        Sets `self.intrinsic_reward` (per-sample prediction error) and
        `self.forward_loss` (its masked mean).
        :param encoded_state: Tensor corresponding to encoded current state.
        :param encoded_next_state: Tensor corresponding to encoded next state.
        """
        policy = self.policy
        state_and_action = tf.concat([encoded_state, policy.selected_actions], axis=1)
        features = tf.layers.dense(
            state_and_action, 256, activation=ModelUtils.swish
        )

        # One encoding per visual stream, plus one more if vector observations
        # exist -- this matches what create_curiosity_encoders concatenates.
        stream_count = policy.vis_obs_size + int(policy.vec_obs_size > 0)
        predicted_next = tf.layers.dense(
            features, self.encoding_size * stream_count, activation=None
        )

        prediction_error = 0.5 * tf.reduce_sum(
            tf.squared_difference(predicted_next, encoded_next_state), axis=1
        )
        # The unmasked per-sample error is exposed as the intrinsic reward.
        self.intrinsic_reward = prediction_error
        self.forward_loss = self._masked_mean(prediction_error)

    def create_loss(self, learning_rate: float) -> None:
        """
        Builds the total loss node and the `update_batch` op that minimizes it.
        The loss is 10 * (0.2 * forward_loss + 0.8 * inverse_loss).
        :param learning_rate: The learning rate for the Adam optimizer.
        """
        forward_term = 0.2 * self.forward_loss
        inverse_term = 0.8 * self.inverse_loss
        self.loss = 10 * (forward_term + inverse_term)
        adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
        self.update_batch = adam.minimize(self.loss)