forked from PGM-Lab/InferPy
/
test_random_variable.py
201 lines (171 loc) · 7.04 KB
/
test_random_variable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import tensorflow as tf
from tensorflow_probability import edward2 as ed
import numpy as np
import pytest
import inferpy as inf
from inferpy.models import random_variable, sanitize_input_arg
# TODO: boolean operators and iter are not tested
@pytest.mark.parametrize("tensor, expected", [
    # --- addition: RV + RV, list + RV, RV + list
    ("inf.Normal([0., 1.], 0) + inf.Normal([1., 2.], 0)", [1., 3.]),
    ("[0, 1] + inf.Normal([1., 2.], 0)", [1., 3.]),
    ("inf.Normal([0., 1.], 0) + [1, 2]", [1., 3.]),
    # --- subtraction
    ("inf.Normal([0., 1.], 0) - inf.Normal([1., 2.], 0)", [-1., -1.]),
    ("[0, 1] - inf.Normal([1., 2.], 0)", [-1., -1.]),
    ("inf.Normal([0., 1.], 0) - [1, 2]", [-1., -1.]),
    # --- multiplication
    ("inf.Normal([0., 1.], 0) * inf.Normal([1., 2.], 0)", [0., 2.]),
    ("[0, 1] * inf.Normal([1., 2.], 0)", [0., 2.]),
    ("inf.Normal([0., 1.], 0) * [1, 2]", [0., 2.]),
    # --- division
    ("inf.Normal([0., 1.], 0) / inf.Normal([1., 2.], 0)", [0., 0.5]),
    ("[0, 1] / inf.Normal([1., 2.], 0)", [0., 0.5]),
    ("inf.Normal([0., 1.], 0) / [1, 2]", [0., 0.5]),
    # --- modulo
    ("inf.Normal([0., 1.], 0) % inf.Normal([2., 2.], 0)", [0., 1.]),
    ("[0, 1] % inf.Normal([2., 2.], 0)", [0., 1.]),
    ("inf.Normal([0., 1.], 0) % [2, 2]", [0., 1.]),
    # --- less than
    ("inf.Normal(0., 0) < inf.Normal(2., 0)", True),
    ("1 < inf.Normal(0., 0)", False),
    ("inf.Normal(1., 0) < 2", True),
    # --- less than or equal
    ("inf.Normal(2., 0) <= inf.Normal(2., 0)", True),
    ("1 <= inf.Normal(0., 0)", False),
    ("inf.Normal(1., 0) <= 1", True),
    # --- greater than
    ("inf.Normal(5., 0) > inf.Normal(2., 0)", True),
    ("1 > inf.Normal(3., 0)", False),
    ("inf.Normal(3., 0) > 1", True),
    # --- indexing
    ("inf.Normal([0., 1., 2., 3.], 0)[0]", 0),
    ("inf.Normal([0., 1., 2., 3.], 0)[2]", 2),
    ("inf.Normal(tf.ones((3, 2)), 0)[2][0]", 1),
    # --- power
    ("inf.Normal(2., 0) ** inf.Normal(3., 0)", 8.),
    ("3 ** inf.Normal(2., 0)", 9.),
    ("inf.Normal(3., 0) ** 3", 27.),
    # --- unary minus
    ("-inf.Normal(5., 0)", -5),
    ("-inf.Normal(-5., 0)", 5),
    # --- absolute value
    ("abs(inf.Normal(5., 0))", 5),
    ("abs(inf.Normal(-5., 0))", 5),
    # --- matrix product: RV x RV, array x RV, RV x array
    ("tf.matmul(inf.Normal(tf.ones((2, 3), dtype=np.float32), 0), inf.Normal(tf.eye(3, dtype=np.float32), 0))",
     np.ones((2, 3))),
    ("tf.matmul(np.ones((2, 3), dtype=np.float32), inf.Normal(tf.eye(3, dtype=np.float32), 0))",
     np.ones((2, 3))),
    ("tf.matmul(inf.Normal(np.ones((2, 3), dtype=np.float32), 0), np.eye(3, dtype=np.float32))",
     np.ones((2, 3))),
])
def test_operations(tensor, expected):
    """Evaluate an operator expression on random variables and compare its value.

    ``tensor`` is the source text of an expression; it is built with ``eval``
    inside the test and then fetched through the inferpy session. Every Normal
    in the table is given a scale of 0 and the expected values are computed
    from the locs alone.
    """
    session = inf.get_session()
    # the expression strings are fixed test data defined above, never external input
    outcome = session.run(eval(tensor))
    # exact comparison against the expected value
    assert np.array_equal(outcome, expected)
@pytest.mark.parametrize("model_object", [
    # scalars as parameters
    inf.Normal(0, 1),
    # a python list as parameter
    inf.Normal([0., 0., 0., 0.], 1),
    # a numpy array as parameter
    inf.Normal(np.zeros(5), 1),
    # a tensor as parameter
    inf.Normal(0, tf.ones(5)),
    # another random variable as parameter
    inf.Normal(inf.Normal(0, 1), 1),
    # a list mixing a random variable, a float and a tensor
    inf.Normal([inf.Normal(0, 1), 1., tf.constant(1.)], 1.),
    # the result of an operation between random variables as parameter
    inf.Normal(inf.Normal(0, 1) + inf.Normal(0, 1), 1),
])
def test_edward_type(model_object):
    """Check that the ``var`` attribute is an Edward2 RandomVariable.

    The parametrized objects are created when the module is collected and
    cover the different kinds of arguments accepted as distribution parameters.
    """
    wrapped = model_object.var
    assert isinstance(wrapped, ed.RandomVariable)
def test_name():
    """Check explicit and automatically generated random variable names."""
    # an explicit name is reported unchanged
    rv = inf.Normal(0, 1, name='foo')
    assert rv.name == 'foo'

    # asking for the same name again still reports the given name,
    # not the tensor name
    rv = inf.Normal(0, 1, name='foo')
    assert rv.name == 'foo'

    # without a name one is generated as 'randvar_X'; X is expected to be 0 here
    # NOTE(review): presumably relies on the automatic-name state being fresh
    # for this test - confirm how it is reset
    rv = inf.Normal(0, 1)
    generated = rv.name
    assert isinstance(generated, str)
    assert generated == 'randvar_0'
def test_tensor_register():
    """Check that a random variable can be used wherever a tensor is expected.

    The variable is fetched directly in a session, converted explicitly with
    ``tf.convert_to_tensor`` and used as either operand of a tensor addition.
    """
    def evaluate(fetch):
        # ask for the session on every call, as each check did originally
        return inf.get_session().run(fetch)

    x = inf.Normal(5., 0., name='foo')

    # fetched directly
    assert evaluate(x) == 5.

    # converted explicitly
    assert isinstance(tf.convert_to_tensor(x), tf.Tensor)
    assert evaluate(tf.convert_to_tensor(x)) == 5.

    # as right and left operand of a tensor addition
    assert evaluate(tf.constant(5.) + x) == 10.
    assert evaluate(x + tf.constant(5.)) == 10.
def test_convert_random_variables_to_tensors():
    """Check sanitize_input_arg on plain values and on random variables.

    Plain inputs (scalar, list, numpy array) must keep their value after being
    sanitized. Random variables, alone or inside (nested) lists, must no longer
    be RandomVariable instances in the result.

    The input and the sanitized output are held in different names: the
    previous version rebound ``element`` and then asserted
    ``element == element``, which can never fail.
    """
    def evaluate(value):
        # tf.convert_to_tensor accepts scalars, lists, arrays and tensors, so the
        # value can be fetched whether or not sanitize_input_arg returned a tensor
        return inf.get_session().run(tf.convert_to_tensor(value))

    # element without RVs
    element = 1
    result = sanitize_input_arg(element)
    assert np.array_equal(evaluate(result), element)

    # list of elements different from RVs
    element = [1, 1]
    result = sanitize_input_arg(element)
    assert np.array_equal(evaluate(result), element)

    # numpy array of elements different from RVs
    element = np.ones((3, 2))
    result = sanitize_input_arg(element)
    assert np.array_equal(evaluate(result), element)

    # A single Random Variable
    element = inf.Normal(0, 1)
    result = sanitize_input_arg(element)
    assert isinstance(element, random_variable.RandomVariable)
    assert not isinstance(result, random_variable.RandomVariable)

    # A list with some Random Variables (positions 0 and 2)
    element = [inf.Normal(0, 1), 1, inf.Normal(0, 1), 2]
    result = sanitize_input_arg(element)
    assert all(
        isinstance(element[i], random_variable.RandomVariable) and
        not isinstance(result[i], random_variable.RandomVariable)
        for i in (0, 2))

    # A list with some nested Random Variables (positions 0 and 2 of the inner list)
    element = [[inf.Normal(0, 1), 1, inf.Normal(0, 1), 2]]
    result = sanitize_input_arg(element)
    assert all(
        isinstance(element[0][i], random_variable.RandomVariable) and
        not isinstance(result[0][i], random_variable.RandomVariable)
        for i in (0, 2))
def test_random_variable_in_pmodel():
    """Check that an unnamed random variable works inside a probmodel.

    The variable must receive a name starting with 'randvar' and must not be
    flagged as belonging to a datamodel.
    """
    @inf.probmodel
    def model():
        # deliberately created without a name
        inf.Normal(0, 1)

    registered = model().vars
    first_var = next(iter(registered.values()))

    assert first_var.name.startswith('randvar')
    # no datamodel context was used, so the flag must be false
    assert not first_var.is_datamodel
def test_random_variable_in_datamodel():
    """Check a random variable declared with sample_shape inside a datamodel.

    Creating it must emit a UserWarning, the resulting sample_shape must be
    the datamodel size (10) rather than the requested (2,), and the variable
    must be flagged as belonging to a datamodel.
    """
    with pytest.warns(UserWarning), inf.datamodel(10):
        x = inf.Normal(0, 1, sample_shape=(2,))

    # the datamodel size wins over the explicit sample_shape
    assert x.sample_shape == 10
    # and the variable knows it was created inside a datamodel
    assert x.is_datamodel
def test_run_in_session():
    """Check that a variable created after the session was requested can be run in it."""
    # request the session before the variable is created
    inf.get_session()
    x = inf.Normal(1, 0)

    fetched = inf.get_session().run(x)
    assert fetched == 1
@pytest.mark.parametrize("loc_expr, scale_expr, floatx", [
    # both parameters are random variables
    ("inf.Normal(0, 1)", "inf.Normal(0, 1)", "float16"),
    # a random variable and a numpy array
    ("inf.Normal(0, 1)", "np.ones(5)", "float32"),
    # a random variable and a scalar
    ("inf.Normal(0, 1)", "5", "float64"),
    # numpy arrays whose dtypes differ from each other and from floatx
    ("np.ones([6, 7], dtype='float64')", "np.ones([6, 7], dtype='float16')", "float32"),
    # a numpy array and a scalar
    ("np.ones([6, 7])", "5", "float16"),
])
def test_sanitize_input(loc_expr, scale_expr, floatx):
    """Check that a new random variable takes the configured floatx as dtype.

    ``loc_expr`` and ``scale_expr`` are source strings, evaluated only after
    floatx has been set.
    """
    # NOTE(review): floatx is set globally and not restored here - confirm that
    # something else resets it between tests
    inf.set_floatx(floatx)

    # the expression strings are fixed test data defined above, never external input
    loc = eval(loc_expr)
    scale = eval(scale_expr)
    rv = inf.Normal(loc, scale)

    assert rv.dtype == floatx