/
frv.py
348 lines (259 loc) · 9.52 KB
/
frv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
"""
Finite Discrete Random Variables Module
See Also
========
diofant.stats.frv_types
diofant.stats.rv
diofant.stats.crv
"""
import random
from itertools import product
from ..core import Dict, Eq, Expr, Lambda, Mul, Symbol, Tuple, cacheit, sympify
from ..functions import Piecewise
from ..logic import And, Or
from ..sets import FiniteSet
from .rv import (ConditionalDomain, NamedArgsMixin, ProductDomain,
ProductPSpace, PSpace, RandomDomain, SinglePSpace,
random_symbols, rv_subs, sumsets)
class FiniteDensity(dict):
    """Finite probability density.

    A ``dict`` subclass that can additionally be called like a function to
    look up the value stored for an outcome.
    """

    def __call__(self, item):
        """Return the value stored for ``item`` (after sympify), or 0."""
        key = sympify(item)
        return self[key] if key in self else 0

    @property
    def dict(self):
        """Return the contents as a new built-in ``dict``."""
        return dict(self)
class FiniteDomain(RandomDomain):
    """
    A domain with discrete finite support

    Represented using a FiniteSet.
    """

    is_Finite = True

    @property
    def dict(self):
        """Return a FiniteSet holding one ``Dict`` per element of the domain."""
        as_dicts = (Dict(dict(point)) for point in self.elements)
        return FiniteSet(*as_dicts)

    def as_boolean(self):
        """Return the domain as a boolean expression.

        Each element becomes an ``And`` of ``Eq(symbol, value)`` terms, and
        the elements are joined with ``Or``.
        """
        clauses = []
        for point in self:
            equalities = [Eq(sym, val) for sym, val in point]
            clauses.append(And(*equalities))
        return Or(*clauses)
class SingleFiniteDomain(FiniteDomain):
    """
    A FiniteDomain over a single symbol/set

    Example: The possibilities of a *single* die roll.
    """

    def __new__(cls, symbol, set):
        """Build the domain; ``set`` is wrapped in a FiniteSet if needed."""
        values = set if isinstance(set, FiniteSet) else FiniteSet(*set)
        return Expr.__new__(cls, symbol, values)

    @property
    def symbol(self):
        """Return the symbol this domain ranges over (first argument)."""
        return self.args[0]

    @property
    def symbols(self):
        """Return a FiniteSet containing only ``symbol``."""
        return FiniteSet(self.symbol)

    @property
    def set(self):
        """Return the FiniteSet of admissible values (second argument)."""
        return self.args[1]

    @property
    def elements(self):
        """Return a FiniteSet of one-item frozensets ``{(symbol, value)}``."""
        sym = self.symbol
        points = [frozenset(((sym, value),)) for value in self.set]
        return FiniteSet(*points)

    def __iter__(self):
        """Iterate over one-item frozensets ``{(symbol, value)}``."""
        sym = self.symbol
        return (frozenset(((sym, value),)) for value in self.set)

    def __contains__(self, other):
        """Test the first ``(symbol, value)`` pair of ``other`` against this domain."""
        pair = tuple(other)[0]
        sym, val = pair
        return sym == self.symbol and val in self.set
class ProductFiniteDomain(ProductDomain, FiniteDomain):
    """
    A Finite domain consisting of several other FiniteDomains

    Example: The possibilities of the rolls of three independent dice
    """

    def __iter__(self):
        """Iterate over every combination of elements of the sub-domains.

        Each combination from the Cartesian product is merged into a single
        element with ``sumsets``.
        """
        return map(sumsets, product(*self.domains))

    @property
    def elements(self):
        """Return a FiniteSet of everything produced by iterating the domain."""
        points = tuple(self)
        return FiniteSet(*points)
class ConditionalFiniteDomain(ConditionalDomain, ProductFiniteDomain):
    """
    A FiniteDomain that has been restricted by a condition

    Example: The possibilities of a die roll under the condition that the
    roll is even.
    """

    def __new__(cls, domain, condition):
        """Restrict ``domain`` by ``condition``.

        ``domain`` itself is returned when ``condition is True``.  A
        ValueError is raised when the condition (after ``rv_subs``) has free
        symbols that ``domain`` does not have.
        """
        if condition is True:
            return domain

        cond = rv_subs(condition)
        # A condition such as die1 == z, with 'z' a symbol the domain knows
        # nothing about, can never be decided by iterating over the domain,
        # so it is rejected up front.
        foreign = cond.free_symbols - domain.free_symbols
        if foreign:
            message = ('Condition "%s" contains foreign symbols \n%s.\n' % (  # noqa: SFS101
                condition, tuple(foreign)) +
                'Will be unable to iterate using this condition')
            raise ValueError(message)

        return Expr.__new__(cls, domain, cond)

    def _test(self, elem):
        """Evaluate the condition at ``elem``.

        ``elem`` is substituted into the condition.  A result equal to True
        or False is returned as is, an equality is decided by comparing its
        two sides, and anything else raises ValueError.
        """
        outcome = self.condition.xreplace(dict(elem))
        if outcome in (True, False):
            return outcome
        if outcome.is_Equality:
            return outcome.lhs == outcome.rhs
        raise ValueError(f'Undeciable if {outcome!s}')

    def __contains__(self, other):
        """Test that ``other`` is in the full domain and meets the condition."""
        if other not in self.fulldomain:
            return False
        return self._test(other)

    def __iter__(self):
        """Iterate over the elements of the full domain meeting the condition."""
        return filter(self._test, self.fulldomain)

    @property
    def set(self):
        """Return the FiniteSet of values that satisfy the condition.

        Only available when the full domain is exactly a SingleFiniteDomain;
        otherwise NotImplementedError is raised.
        """
        full = self.fulldomain
        if type(full) is not SingleFiniteDomain:
            raise NotImplementedError(
                'Not implemented on multi-dimensional conditional domain')

        admitted = [value for value in full.set
                    if frozenset(((full.symbol, value),)) in self]
        return FiniteSet(*admitted)

    def as_boolean(self):
        """Return the boolean form built by ``FiniteDomain.as_boolean``."""
        return FiniteDomain.as_boolean(self)
class SingleFiniteDistribution(Expr, NamedArgsMixin):
    """Base class for finite distributions."""

    def __new__(cls, *args):
        """Create the distribution, sympifying every argument."""
        converted = [sympify(arg) for arg in args]
        return Expr.__new__(cls, *converted)

    @property  # type: ignore[misc]
    @cacheit
    def dict(self):
        """Return a mapping from each value in ``set`` to ``pdf`` at that value."""
        table = {}
        for outcome in self.set:
            table[outcome] = self.pdf(outcome)
        return table

    @property
    def pdf(self):
        """Return the density as a ``Lambda`` over a ``Piecewise``.

        There is one branch per entry of ``dict`` plus a final branch that
        yields 0.
        """
        arg = Symbol('x')
        branches = [(prob, Eq(outcome, arg))
                    for outcome, prob in self.dict.items()]
        branches.append((0, True))
        return Lambda(arg, Piecewise(*branches))

    @property
    def set(self):
        """Return the keys of ``dict`` as a list."""
        return [*self.dict]

    # The accessors below hand back the bound methods of ``dict``, so the
    # distribution can be used like the mapping itself.
    @property
    def values(self):
        return self.dict.values

    @property
    def items(self):
        return self.dict.items

    @property
    def __iter__(self):
        return self.dict.__iter__

    @property
    def __getitem__(self):
        return self.dict.__getitem__

    # Must stay after the definition of ``pdf``: it reuses that property object.
    __call__ = pdf

    def __contains__(self, other):
        """Test whether ``other`` is one of the values in ``set``."""
        return other in self.set
#####################
# Probability Space #
#####################
class FinitePSpace(PSpace):
    """
    A Finite Probability Space

    Represents the probabilities of a finite number of events.

    The constructor hands the domain and a ``Dict`` of the density to
    ``PSpace.__new__`` (in that order) and additionally keeps a plain
    ``dict`` of the density in ``_density``, which ``prob_of`` reads.
    """

    is_Finite = True

    @property
    def domain(self):
        """Return the domain (first constructor argument)."""
        return self.args[0]

    @property
    def density(self):
        """Return the public density ``Dict`` (second constructor argument)."""
        # __new__ passes (domain, public_density), so the density is at
        # index 1; index 0 holds the domain.
        return self.args[1]

    def __new__(cls, domain, density):
        """Create the space from a domain and a mapping with an ``items`` method.

        Keys and values of ``density`` are sympified.
        """
        density = {sympify(key): sympify(val)
                   for key, val in density.items()}
        public_density = Dict(density)

        obj = PSpace.__new__(cls, domain, public_density)
        obj._density = density
        return obj

    def prob_of(self, elem):
        """Return the probability stored for ``elem``, or 0 if there is none."""
        return self._density.get(elem, 0)

    def where(self, condition):
        """Return this space's domain restricted by ``condition``."""
        return ConditionalFiniteDomain(self.domain, condition)

    def compute_density(self, expr):
        """Return a FiniteDensity mapping each value of ``expr`` to its probability.

        Elements of the domain that give the same value of ``expr`` have
        their probabilities added together.
        """
        # Swap random symbols for their plain symbols so that the elements
        # of the domain can be substituted directly.
        expr = expr.xreplace({rs: rs.symbol for rs in self.values})
        d = FiniteDensity()
        for elem in self.domain:
            val = expr.xreplace(dict(elem))
            prob = self.prob_of(elem)
            d[val] = d.get(val, 0) + prob
        return d

    @cacheit
    def compute_cdf(self, expr):
        """Return a dict mapping each value of ``expr`` to its cumulative probability.

        Values are visited in sorted order and the probabilities from
        ``compute_density`` are accumulated along the way.
        """
        d = self.compute_density(expr)
        cum_prob = 0
        cdf = []
        for key in sorted(d):
            prob = d[key]
            cum_prob += prob
            cdf.append((key, cum_prob))

        return dict(cdf)

    @cacheit
    def sorted_cdf(self, expr, python_float=False):
        """Return ``(value, cumulative probability)`` pairs sorted by probability.

        With ``python_float`` set, each cumulative probability is converted
        with ``float``.
        """
        cdf = self.compute_cdf(expr)
        items = list(cdf.items())
        sorted_items = sorted(items, key=lambda val_cumprob: val_cumprob[1])
        if python_float:
            sorted_items = [(v, float(cum_prob))
                            for v, cum_prob in sorted_items]
        return sorted_items

    def integrate(self, expr, rvs=None):
        """Return the sum over the domain of ``expr`` times the probability.

        ``rvs`` are the random symbols to replace by their plain symbols
        before substitution; ``self.values`` is used when it is empty or None.
        """
        rvs = rvs or self.values
        expr = expr.xreplace({rs: rs.symbol for rs in rvs})
        return sum(expr.xreplace(dict(elem)) * self.prob_of(elem)
                   for elem in self.domain)

    def probability(self, condition):
        """Return the total probability of the elements satisfying ``condition``."""
        cond_symbols = frozenset(rs.symbol for rs in random_symbols(condition))
        # Sanity check: the condition may only involve symbols of this space.
        assert cond_symbols.issubset(self.symbols)
        return sum(self.prob_of(elem) for elem in self.where(condition))

    def conditional_space(self, condition):
        """Return a new FinitePSpace conditioned on ``condition``.

        The new space lives on the restricted domain; each retained
        probability is divided by the probability of the condition.
        """
        domain = self.where(condition)
        prob = self.probability(condition)
        density = {key: val / prob
                   for key, val in self._density.items() if key in domain}
        return FinitePSpace(domain, density)

    def sample(self):
        """
        Internal sample method

        Returns dictionary mapping RandomSymbol to realization value.
        """
        expr = Tuple(*self.values)
        cdf = self.sorted_cdf(expr, python_float=True)

        x = random.uniform(0, 1)
        # Find the first entry whose cumulative probability exceeds x.
        # This should be replaced with binary search
        for value, cum_prob in cdf:
            if x < cum_prob:
                # return dictionary mapping RandomSymbols to values
                return dict(zip(expr, value))

        # Raised explicitly rather than with ``assert False`` so that the
        # failure is not silently skipped when assertions are disabled.
        raise AssertionError('We should never have gotten to this point')  # pragma: no cover
class SingleFinitePSpace(SinglePSpace, FinitePSpace):
    """
    A single finite probability space

    Represents the probabilities of a set of random events that can be
    attributed to a single variable/symbol.

    This class is implemented by many of the standard FiniteRV types such as
    Die, Bernoulli, Coin, etc....
    """

    @property
    def domain(self):
        """Return a SingleFiniteDomain over ``symbol`` and the distribution's set."""
        values = self.distribution.set
        return SingleFiniteDomain(self.symbol, values)

    @property  # type: ignore[misc]
    @cacheit
    def _density(self):
        """Return a dict from one-item frozensets ``{(symbol, value)}`` to probability."""
        sym = self.symbol
        table = {}
        for outcome, prob in self.distribution.dict.items():
            table[frozenset(((sym, outcome),))] = prob
        return table
class ProductFinitePSpace(ProductPSpace, FinitePSpace):
    """A collection of several independent finite probability spaces."""

    @property
    def domain(self):
        """Return the ProductFiniteDomain of the component spaces' domains."""
        parts = [space.domain for space in self.spaces]
        return ProductFiniteDomain(*parts)

    @property  # type: ignore[misc]
    @cacheit
    def _density(self):
        """Return the joint density as a ``Dict``.

        Every combination of one (element, probability) item per component
        space is visited.  The elements are merged with ``sumsets`` and the
        probabilities multiplied; combinations that merge to the same
        element have their products added.
        """
        per_space = [space._density.items() for space in self.spaces]
        joint = {}
        for combo in product(*per_space):
            points, weights = zip(*combo)
            point = sumsets(points)
            joint[point] = joint.get(point, 0) + Mul(*weights)
        return Dict(joint)

    @property  # type: ignore[misc]
    @cacheit
    def density(self):
        """Return the joint density (same object as ``_density``)."""
        return self._density