/
expdesign.py
440 lines (374 loc) · 17.9 KB
/
expdesign.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
#!/usr/bin/python
# -*- coding: utf-8 -*-
##
# expdesign.py: Adaptive experimental design algorithms.
##
# © 2017, Chris Ferrie (csferrie@gmail.com) and
# Christopher Granade (cgranade@cgranade.com).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
##
## FEATURES ###################################################################
from __future__ import absolute_import
from __future__ import division
## ALL ########################################################################
# We use __all__ to restrict what globals are visible to external modules.
__all__ = [
'ExperimentDesigner',
'Heuristic',
'EnsembleHeuristic',
'ExpSparseHeuristic',
'PGH',
'OptimizationAlgorithms'
]
## IMPORTS ####################################################################
from future.utils import with_metaclass
import numpy as np
# for BCRB and BED classes
import scipy.optimize as opt
from qinfer._lib import enum # <- TODO: replace with flufl.enum!
from abc import ABCMeta, abstractmethod
import warnings
from qinfer.finite_difference import *
## FUNCTIONS ###################################################################
def identity(arg):
    """Return *arg* unchanged; used as the default field transform for PGH."""
    return arg
## CLASSES #####################################################################
# Enumeration of the local optimizers supported by ExperimentDesigner:
# NULL (no local optimization; only the store_guess bookkeeping), CG
# (conjugate gradient via opt.fmin_cg), NCG (truncated Newton via
# opt.fmin_tnc) and NELDER_MEAD (downhill simplex via opt.fmin).
OptimizationAlgorithms = enum.enum("NULL", "CG", "NCG", "NELDER_MEAD")
class Heuristic(with_metaclass(ABCMeta, object)):
    r"""
    Abstract base class for experiment-design heuristics: rules that pick
    the next experiment without explicitly minimizing the risk. For
    instance, the :math:`t_k = (9/8)^k` heuristic of [FGC12]_ never
    consults the risk, making it a natural `Heuristic` subclass; it is
    provided here as :class:`ExpSparseHeuristic`.
    """

    def __init__(self, updater):
        # Posterior updater whose state (data record, posterior moments,
        # model) concrete heuristics may consult when designing experiments.
        self._updater = updater

    @abstractmethod
    def __call__(self, *args):
        # Subclasses must override this to return a designed experiment.
        raise NotImplementedError("Not yet implemented.")
class EnsembleHeuristic(Heuristic):
    r"""
    Heuristic that, on each call, delegates to one member of a fixed
    collection of heuristics, chosen at random.

    :param list ensemble: List of tuples ``(heuristic, pr)`` giving each
        member heuristic together with the probability of selecting it.
    """

    def __init__(self, ensemble):
        # NOTE(review): deliberately does not call Heuristic.__init__;
        # the member heuristics each hold their own updater.
        self._pr = np.array([weight for _, weight in ensemble])
        self._heuristics = [member for member, _ in ensemble]

    def __call__(self, *args):
        # Draw an index according to the configured probabilities, then
        # forward the call (and all arguments) to that member heuristic.
        chosen = np.random.choice(len(self._heuristics), p=self._pr)
        return self._heuristics[chosen](*args)
class ExpSparseHeuristic(Heuristic):
    r"""
    Exponentially-sparse time evolution heuristic of [FGC12]_: the
    :math:`k`-th designed experiment uses :math:`t_k = A b^k`, where
    :math:`A` and :math:`b` are parameters of the heuristic and :math:`k`
    is the number of experiments performed so far.

    :param qinfer.smc.SMCUpdater updater: Posterior updater for which
        experiments should be heuristically designed.
    :param float scale: The value of :math:`A`, implicitly setting
        the frequency scale for the problem.
    :param float base: The base :math:`b` of the exponent; in general,
        should be closer to 1 for higher-dimensional models.
    :param str t_field: Name of the expparams field representing time.
        If None, then the generated expparams are taken to be scalar,
        and not a record.
    :param dict other_fields: Values of the other fields to be used
        in designed experiments.
    """

    def __init__(self,
            updater, scale=1, base=9/8,
            t_field=None, other_fields=None
    ):
        super(ExpSparseHeuristic, self).__init__(updater)
        self._scale = scale
        self._base = base
        self._t_field = t_field
        self._other_fields = other_fields

    def __call__(self):
        # k = number of experiments recorded so far; t_k = A * b**k.
        k = len(self._updater.data_record)
        t_k = self._scale * (self._base ** k)
        dtype = self._updater.model.expparams_dtype

        if self._t_field is None:
            # Scalar expparams: no record fields to populate.
            return np.array([t_k], dtype=dtype)

        # Record expparams: fill the constant fields first, then the time
        # field, so that t_field wins if it also appears in other_fields.
        expparams = np.empty((1,), dtype=dtype)
        for name, val in self._other_fields.items():
            expparams[name] = val
        expparams[self._t_field] = t_k
        return expparams
class PGH(Heuristic):
    """
    Implements the *particle guess heuristic* (PGH) of [WGFC13a]_, which
    selects two particles from the current posterior, selects one as an
    inversion hypothesis and sets the time parameter to be the inverse of
    the distance between the particles. In this way, the PGH adapts to the
    current uncertainty without additional simulation resources.

    :param qinfer.smc.SMCUpdater updater: Posterior updater for which
        experiments should be heuristicly designed.
    :param str inv_field: Name of the ``expparams`` field corresponding to the
        inversion hypothesis.
    :param str t_field: Name of the ``expparams`` field corresponding to the
        evolution time.
    :param callable inv_func: Function to be applied to modelparameter vectors
        to produce an inversion field ``x_``.
    :param callable t_func: Function to be applied to the evolution time to produce a
        time field ``t``.
    :param int maxiters: Number of times to try and choose distinct particles
        before giving up.
    :param dict other_fields: Values to set for fields not given by the PGH.

    Once initialized, a ``PGH`` object can be called to generate a new
    experiment parameter vector:

    >>> pgh = PGH(updater) # doctest: +SKIP
    >>> expparams = pgh() # doctest: +SKIP

    If the posterior weights are very highly peaked (that is, if the effective
    sample size is too small, as measured by
    :attr:`~qinfer.smc.SMCUpdater.n_ess`), then it may be the case that the two
    particles chosen by the PGH are identical, such that the time would be
    determined by ``1 / 0``. In this case, the `PGH` class will instead draw
    new pairs of particles until they are not identical, up to ``maxiters``
    attempts. If that limit is reached, a `RuntimeError` will be raised.
    """

    def __init__(self, updater, inv_field='x_', t_field='t',
            inv_func=identity,
            t_func=identity,
            maxiters=10,
            other_fields=None
    ):
        super(PGH, self).__init__(updater)
        self._x_ = inv_field
        self._t = t_field
        self._inv_func = inv_func
        self._t_func = t_func
        self._maxiters = maxiters
        self._other_fields = other_fields if other_fields is not None else {}

    def __call__(self):
        """
        Design one experiment: sample two distinct particles, use one as the
        inversion hypothesis and set the time to the inverse of their distance.

        :raises RuntimeError: if no pair of distinct particles is found
            within ``maxiters`` attempts.
        """
        model = self._updater.model

        # Draw pairs of particles until they are distinct (distance > 0),
        # up to maxiters attempts. Using for/else (instead of the previous
        # while loop) fixes a NameError that occurred when maxiters <= 0,
        # and computing the distance once per draw avoids the two redundant
        # recomputations the original made after the loop.
        for _ in range(self._maxiters):
            x, xp = self._updater.sample(n=2)[:, np.newaxis, :]
            dist = model.distance(x, xp)
            if dist > 0:
                break
        else:
            # Loop exhausted without a break: every drawn pair was identical.
            raise RuntimeError("PGH did not find distinct particles in {} iterations.".format(self._maxiters))

        # Pack the designed experiment into a (1,)-shaped record array.
        eps = np.empty((1,), dtype=model.expparams_dtype)
        eps[self._x_] = self._inv_func(x)
        eps[self._t] = self._t_func(1 / dist)
        # Constant fields are written last, matching the original order.
        for field, value in self._other_fields.items():
            eps[field] = value
        return eps
class ExperimentDesigner(object):
    """
    Designs new experiments using the current best information provided by a
    Bayesian updater.

    :param qinfer.smc.SMCUpdater updater: A Bayesian updater to design
        experiments for.
    :param OptimizationAlgorithms opt_algo: Algorithm to be used to perform
        local optimization.
    """

    def __init__(self, updater, opt_algo=OptimizationAlgorithms.CG):
        if opt_algo not in OptimizationAlgorithms.reverse_mapping:
            raise ValueError("Unsupported or unknown optimization algorithm.")
        self._updater = updater
        self._opt_algo = opt_algo
        # Set everything up for the first experiment.
        self.new_exp()

    ## METHODS ################################################################

    def new_exp(self):
        """
        Resets this `ExperimentDesigner` instance and prepares for designing
        the next experiment.
        """
        # Lowest objective value seen so far and the experiment design
        # achieving it, across all guesses tried since the last reset.
        self.__best_cost = None
        self.__best_ep = None

    def design_expparams_field(self,
            guess, field,
            cost_scale_k=1.0, disp=False,
            maxiter=None, maxfun=None,
            store_guess=False, grad_h=None, cost_mult=False
    ):
        r"""
        Designs a new experiment by varying a single field of a shape ``(1,)``
        record array and minimizing the objective function

        .. math::
            O(\vec{e}) = r(\vec{e}) + k \$(\vec{e}),

        where :math:`r` is the Bayes risk as calculated by the updater, and
        where :math:`\$` is the cost function specified by the model. Here,
        :math:`k` is a parameter specified to relate the units of the risk and
        the cost. See :ref:`expdesign` for more details.

        :param guess: Either a record array with a single guess, or
            a callable function that generates guesses.
        :type guess: Instance of :class:`~Heuristic`, `callable`
            or :class:`~numpy.ndarray` of ``dtype``
            :attr:`~qinfer.abstract_model.Simulatable.expparams_dtype`
        :param str field: The name of the ``expparams`` field to be optimized.
            All other fields of ``guess`` will be held constant.
        :param float cost_scale_k: A scale parameter :math:`k` relating the
            Bayes risk to the experiment cost.
            See :ref:`expdesign`.
        :param bool disp: If `True`, the optimization will print additional
            information as it proceeds.
        :param int maxiter: For those optimization algorithms which support
            it (currently, only CG and NELDER_MEAD), limits the number of
            optimization iterations used for each guess.
        :param int maxfun: For those optimization algorithms which support it
            (currently, only NCG and NELDER_MEAD), limits the number of
            objective calls that can be made.
        :param bool store_guess: If ``True``, will compare the outcome of this
            guess to previous guesses and then either store the optimization of
            this experiment, or the previous best-known experiment design.
        :param float grad_h: Step size to use in estimating gradients. Used
            only if ``opt_algo`` is NCG.
        :param bool cost_mult: If ``True``, the objective is the risk
            multiplied by the cost raised to ``cost_scale_k``, instead of
            the additive form above.
        :return: An array representing the best experiment design found so
            far for the current experiment.
        """
        # Define some short names for commonly used properties.
        up = self._updater
        m = up.model

        # Generate a new guess or use a guess provided, depending on the
        # type of the guess argument.
        if isinstance(guess, Heuristic):
            raise NotImplementedError("Not yet implemented.")
        elif callable(guess):
            # Generate a new guess by calling the guess function provided.
            ep = guess(
                idx_exp=len(up.data_record),
                mean=up.est_mean(),
                cov=up.est_covariance_mtx()
            )
        else:
            # Make a copy of the guess that we can manipulate, but otherwise
            # use it as-is.
            ep = np.copy(guess)

        # Initial point for the local optimizer. FIX: taken from ep rather
        # than guess[0][field], so that callable guesses work in every
        # branch below (guess[0] raises TypeError for a callable). For
        # array guesses this is identical, since ep is a copy of guess.
        # Copied, because objective_function writes into ep in place.
        x_init = np.copy(ep[0][field])

        # Define an objective function that wraps a vector of scalars into
        # an appropriate record array.
        if not cost_mult:
            # Additive form: risk + k * cost.
            def objective_function(x):
                """
                Used internally by design_expparams_field.
                If you see this, something probably went wrong.
                """
                ep[field] = x
                return up.bayes_risk(ep) + cost_scale_k * m.experiment_cost(ep)
        else:
            # Multiplicative form: risk * cost**k.
            def objective_function(x):
                """
                Used internally by design_expparams_field.
                If you see this, something probably went wrong.
                """
                ep[field] = x
                return up.bayes_risk(ep) * m.experiment_cost(ep)**cost_scale_k

        # Some optimizers require gradients of the objective function.
        # Here, we create a FiniteDifference object to compute that for
        # us. (Currently unused: NCG below relies on approx_grad instead.)
        d_dx_objective = FiniteDifference(objective_function, ep[field].size)

        # Allocate a variable to hold the local optimum value found.
        # This way, if an optimization algorithm doesn't support returning
        # the value as well as the location, we can find it manually.
        f_opt = None

        # Here's the core, where we break out and call the various optimization
        # routines provided by SciPy.
        if self._opt_algo == OptimizationAlgorithms.NULL:
            # This optimization algorithm does nothing locally, but only
            # exists to leverage the store_guess functionality below.
            x_opt = x_init

        elif self._opt_algo == OptimizationAlgorithms.CG:
            # Prepare any additional options.
            opt_options = {}
            if maxiter is not None:
                opt_options['maxiter'] = maxiter
            # Actually call fmin_cg, gathering all outputs we can.
            x_opt, f_opt, func_calls, grad_calls, warnflag = opt.fmin_cg(
                objective_function, x_init,
                disp=disp, full_output=True, **opt_options
            )

        elif self._opt_algo == OptimizationAlgorithms.NCG:
            # Prepare any additional options.
            opt_options = {}
            if maxfun is not None:
                opt_options['maxfun'] = maxfun
            if grad_h is not None:
                opt_options['epsilon'] = grad_h
            # Actually call fmin_tnc, gathering all outputs we can.
            # We use fmin_tnc in preference to fmin_ncg, as they implement the
            # same algorithm, but fmin_tnc seems better behaved with respect
            # to very flat gradient regions, due to respecting maxfun.
            # By contrast, fmin_ncg can get stuck in an infinite loop in
            # versions of SciPy < 0.11.
            #
            # Note that in some versions of SciPy, there was a bug in
            # fmin_ncg and fmin_tnc that can propagate outward if the gradient
            # is too flat. We catch it here and return the initial guess in that
            # case, since by hypothesis, it's too flat to make much difference
            # anyway.
            try:
                x_opt, f_opt, func_calls, grad_calls, h_calls, warnflag = opt.fmin_tnc(
                    objective_function, x_init,
                    fprime=None, bounds=None, approx_grad=True,
                    disp=disp, full_output=True, **opt_options
                )
            except TypeError:
                warnings.warn(
                    "Gradient function too flat for NCG.",
                    RuntimeWarning)
                x_opt = x_init
                f_opt = None

        elif self._opt_algo == OptimizationAlgorithms.NELDER_MEAD:
            opt_options = {}
            if maxfun is not None:
                opt_options['maxfun'] = maxfun
            if maxiter is not None:
                opt_options['maxiter'] = maxiter
            x_opt, f_opt, iters, func_calls, warnflag = opt.fmin(
                objective_function, x_init,
                disp=disp, full_output=True, **opt_options
            )

        # Optionally compare the result to previous guesses.
        if store_guess:
            # Possibly compute the objective function value at the local optimum
            # if we don't already know it (NULL algorithm, or NCG fallback).
            if f_opt is None:
                # FIX: this value was previously assigned to an unused local
                # (guess_qual), leaving f_opt as None and causing a TypeError
                # in the comparison below on Python 3.
                f_opt = objective_function(x_opt)

            # Compare to the known best cost so far.
            if self.__best_cost is None or (self.__best_cost > f_opt):
                # No known best yet, or we're better than the previous best,
                # so record this guess.
                ep[field] = x_opt
                self.__best_cost = f_opt
                self.__best_ep = ep
            else:
                ep = self.__best_ep # Guess is bad, return current best guess
        else:
            # We aren't using guess recording, so just pack the local optima
            # into ep for returning.
            ep[field] = x_opt

        # In any case, return the optimized guess.
        return ep