-
Notifications
You must be signed in to change notification settings - Fork 102
/
actor.py
94 lines (70 loc) · 2.84 KB
/
actor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import numpy as np
import torch
import math
from torch import nn
import torch.nn.functional as F
from torch import distributions as pyd
import utils
class TanhTransform(pyd.transforms.Transform):
domain = pyd.constraints.real
codomain = pyd.constraints.interval(-1.0, 1.0)
bijective = True
sign = +1
def __init__(self, cache_size=1):
super().__init__(cache_size=cache_size)
@staticmethod
def atanh(x):
return 0.5 * (x.log1p() - (-x).log1p())
def __eq__(self, other):
return isinstance(other, TanhTransform)
def _call(self, x):
return x.tanh()
def _inverse(self, y):
# We do not clamp to the boundary here as it may degrade the performance of certain algorithms.
# one should use `cache_size=1` instead
return self.atanh(y)
def log_abs_det_jacobian(self, x, y):
# We use a formula that is more numerically stable, see details in the following link
# https://github.com/tensorflow/probability/commit/ef6bb176e0ebd1cf6e25c6b5cecdd2428c22963f#diff-e120f70e92e6741bca649f04fcd907b7
return 2. * (math.log(2.) - x - F.softplus(-2. * x))
class SquashedNormal(pyd.transformed_distribution.TransformedDistribution):
def __init__(self, loc, scale):
self.loc = loc
self.scale = scale
self.base_dist = pyd.Normal(loc, scale)
transforms = [TanhTransform()]
super().__init__(self.base_dist, transforms)
@property
def mean(self):
mu = self.loc
for tr in self.transforms:
mu = tr(mu)
return mu
class DiagGaussianActor(nn.Module):
"""torch.distributions implementation of an diagonal Gaussian policy."""
def __init__(self, obs_dim, action_dim, hidden_dim, hidden_depth,
log_std_bounds):
super().__init__()
self.log_std_bounds = log_std_bounds
self.trunk = utils.mlp(obs_dim, hidden_dim, 2 * action_dim,
hidden_depth)
self.outputs = dict()
self.apply(utils.weight_init)
def forward(self, obs):
mu, log_std = self.trunk(obs).chunk(2, dim=-1)
# constrain log_std inside [log_std_min, log_std_max]
log_std = torch.tanh(log_std)
log_std_min, log_std_max = self.log_std_bounds
log_std = log_std_min + 0.5 * (log_std_max - log_std_min) * (log_std +
1)
std = log_std.exp()
self.outputs['mu'] = mu
self.outputs['std'] = std
dist = SquashedNormal(mu, std)
return dist
def log(self, logger, step):
for k, v in self.outputs.items():
logger.log_histogram(f'train_actor/{k}_hist', v, step)
for i, m in enumerate(self.trunk):
if type(m) == nn.Linear:
logger.log_param(f'train_actor/fc{i}', m, step)