/
gamma.py
103 lines (84 loc) · 2.85 KB
/
gamma.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import chainer
from chainer.backends import cuda
from chainer import distribution
from chainer.functions.array import broadcast
from chainer.functions.array import where
from chainer.functions.math import digamma
from chainer.functions.math import exponential
from chainer.functions.math import lgamma
from chainer.utils import cache
class Gamma(distribution.Distribution):
    """Gamma Distribution.

    Args:
        k(:class:`~chainer.Variable` or :ref:`ndarray`): Parameter of
            distribution.
        theta(:class:`~chainer.Variable` or :ref:`ndarray`): Parameter of
            distribution.
    """

    def __init__(self, k, theta):
        # Keep the raw inputs; the ``k``/``theta`` properties below wrap
        # them as Variables lazily and cache the result.
        super(Gamma, self).__init__()
        self.__k = k
        self.__theta = theta

    @cache.cached_property
    def k(self):
        # Shape parameter, converted to a Variable once and cached.
        return chainer.as_variable(self.__k)

    @cache.cached_property
    def theta(self):
        # Scale parameter, converted to a Variable once and cached.
        return chainer.as_variable(self.__theta)

    @property
    def batch_shape(self):
        return self.k.shape

    @cache.cached_property
    def entropy(self):
        # H = k + log(theta) + log Gamma(k) + (1 - k) * digamma(k)
        return (
            self.k
            + exponential.log(self.theta)
            + lgamma.lgamma(self.k)
            + (1 - self.k) * digamma.digamma(self.k))

    @property
    def event_shape(self):
        return ()

    @property
    def _is_gpu(self):
        return isinstance(self.k.data, cuda.ndarray)

    def log_prob(self, x):
        """Return the log density at ``x``; ``-inf`` wherever ``x < 0``."""
        logp = (
            - lgamma.lgamma(self.k)
            - self.k * exponential.log(self.theta)
            + (self.k - 1) * exponential.log(x)
            - x / self.theta)
        xp = logp.xp
        neg_inf = -xp.full_like(logp.array, xp.inf)
        # The mask must be built from raw array data, not a Variable.
        x_data = x.array if isinstance(x, chainer.Variable) else x
        return where.where(xp.asarray(x_data >= 0), logp, xp.asarray(neg_inf))

    @cache.cached_property
    def mean(self):
        return self.k * self.theta

    @property
    def params(self):
        return {'k': self.k, 'theta': self.theta}

    def sample_n(self, n):
        """Draw ``n`` gamma samples per batch element (reparameterized scale)."""
        xp = chainer.backend.get_array_module(self.k)
        sample_shape = (n,) + self.batch_shape
        if xp is cuda.cupy:
            # cupy's sampler accepts a dtype keyword directly.
            eps = xp.random.gamma(
                self.k.data, size=sample_shape, dtype=self.k.dtype)
        else:
            # numpy's sampler has no dtype keyword; cast afterwards.
            eps = xp.random.gamma(
                self.k.data, size=sample_shape).astype(self.k.dtype)
        return broadcast.broadcast_to(self.theta, eps.shape) * eps

    @property
    def support(self):
        return 'positive'

    @cache.cached_property
    def variance(self):
        # Var = k * theta ** 2, i.e. mean * theta.
        return self.mean * self.theta
@distribution.register_kl(Gamma, Gamma)
def _kl_gamma_gamma(dist1, dist2):
    """Compute KL(dist1 || dist2) for two Gamma distributions in closed form.

    Uses the standard identity
    KL = (k1 - k2) * psi(k1) - (lgamma(k1) - lgamma(k2))
         + k2 * (log(theta2) - log(theta1)) + k1 * (theta1 / theta2 - 1).
    """
    shape_term = (dist1.k - dist2.k) * digamma.digamma(dist1.k)
    lgamma_term = lgamma.lgamma(dist1.k) - lgamma.lgamma(dist2.k)
    scale_term = dist2.k * (exponential.log(dist2.theta)
                            - exponential.log(dist1.theta))
    mean_term = dist1.k * (dist1.theta / dist2.theta - 1)
    # Same left-to-right accumulation order as the closed-form expression.
    return shape_term - lgamma_term + scale_term + mean_term