-
Notifications
You must be signed in to change notification settings - Fork 239
/
wrapped_gaussian_process.py
311 lines (262 loc) · 13 KB
/
wrapped_gaussian_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
r"""Wrapped Gaussian Process.
Lead author: Arthur Pignet
Extension of Gaussian Processes to Riemannian Manifolds,
introduced in [Mallasto]_.
References
----------
.. [Mallasto] Mallasto, A. and Feragen, A.
“Wrapped gaussian process regression on riemannian manifolds.”
IEEE/CVF Conference on Computer Vision and Pattern Recognition
(2018)
"""
from sklearn.base import BaseEstimator, MultiOutputMixin, RegressorMixin
from sklearn.gaussian_process import GaussianProcessRegressor
import geomstats.backend as gs
class WrappedGaussianProcess(MultiOutputMixin, RegressorMixin, BaseEstimator):
r"""Wrapped Gaussian Process.
The implementation is based on the algorithm 4 of [1].
Parameters
----------
space : Manifold
Manifold.
metric : RiemannianMetric
Riemannian metric.
prior : function
Associate to each input a manifold valued point.
kernel : kernel instance, default=None
The kernel specifying the covariance function of the GP. If None is
passed, the kernel ``ConstantKernel(1.0, constant_value_bounds="fixed")
* RBF(1.0, length_scale_bounds="fixed")`` is used as default. Note that
the kernel hyperparameters are optimized during fitting unless the
bounds are marked as "fixed".
alpha : float or ndarray of shape (n_samples,), default=1e-10
Value added to the diagonal of the kernel matrix during fitting.
This can prevent a potential numerical issue during fitting, by
ensuring that the calculated values form a positive definite matrix.
It can also be interpreted as the variance of additional Gaussian
measurement noise on the training observations. Note that this is
different from using a `WhiteKernel`. If an array is passed, it must
have the same number of entries as the data used for fitting and is
used as datapoint-dependent noise level. Allowing to specify the
noise level directly as a parameter is mainly for convenience and
for consistency with :class:`~sklearn.linear_model.Ridge`.
optimizer : "fmin_l_bfgs_b" or callable, default="fmin_l_bfgs_b"
Can either be one of the internally supported optimizers for optimizing
the kernel's parameters, specified by a string, or an externally
defined optimizer passed as a callable. If a callable is passed, it
must have the signature::
def optimizer(obj_func, initial_theta, bounds):
# * 'obj_func': the objective function to be minimized, which
# takes the hyperparameters theta as a parameter and an
# optional flag eval_gradient, which determines if the
# gradient is returned additionally to the function value
# * 'initial_theta': the initial value for theta, which can be
# used by local optimizers
# * 'bounds': the bounds on the values of theta
....
# Returned are the best found hyperparameters theta and
# the corresponding value of the target function.
return theta_opt, func_min
Per default, the L-BFGS-B algorithm from `scipy.optimize.minimize`
is used. If None is passed, the kernel's parameters are kept fixed.
Available internal optimizers are: `{'fmin_l_bfgs_b'}`.
n_restarts_optimizer : int, default=0
The number of restarts of the optimizer for finding the kernel's
parameters which maximize the log-marginal likelihood. The first run
of the optimizer is performed from the kernel's initial parameters,
the remaining ones (if any) from thetas sampled log-uniform randomly
from the space of allowed theta-values. If greater than 0, all bounds
must be finite. Note that `n_restarts_optimizer == 0` implies that one
run is performed.
copy_X_train : bool, default=True
If True, a persistent copy of the training data is stored in the
object. Otherwise, just a reference to the training data is stored,
which might cause predictions to change if the data is modified
externally.
random_state : int, RandomState instance or None, default=None
Determines random number generation used to initialize the centers.
Pass an int for reproducible results across multiple function calls.
[1] Mallasto, A. and Feragen, A. Wrapped gaussian process
regression on riemannian manifolds. In 2018 IEEE/CVF
Conference on Computer Vision and Pattern Recognition
"""
def __init__(
self,
space,
metric,
prior,
kernel=None,
*,
alpha=1e-10,
optimizer="fmin_l_bfgs_b",
n_restarts_optimizer=0,
copy_X_train=True,
random_state=None,
):
if metric is None:
metric = space.metric
self.metric = metric
self.space = space
self.prior = prior
self.copy_X_train = copy_X_train
self.y_train_ = None
self.tangent_y_train_ = None
self.y_train_shape_ = None
self._euclidean_gpr = GaussianProcessRegressor(
kernel=kernel,
alpha=alpha,
optimizer=optimizer,
n_restarts_optimizer=n_restarts_optimizer,
normalize_y=False,
copy_X_train=copy_X_train,
random_state=random_state,
)
self.__dict__.update(self._euclidean_gpr.__dict__)
self.log_marginal_likelihood = self._euclidean_gpr.log_marginal_likelihood
def _get_tangent_targets(self, X, y):
"""Compute the tangent targets, using the provided prior.
Parameters
----------
X : array-like of shape (n_samples, n_features) or list of object
Feature vectors or other representations of training data.
y : array-like of shape (n_samples,) or (n_samples, n_targets)
or (n_samples, n1_targets, n2_targets) for
matrix-valued targets.
Target values. The target must belongs to the manifold space
Returns
-------
tangent_y : array-like of shape (n_samples,) or (n_samples, n_targets)
or (n_samples, n1_targets, n2_targets)
Target projected on the associated (by the prior) tangent space.
"""
base_points = self.prior(X)
return self.metric.log(y, base_point=base_points)
def fit(self, X, y):
"""Fit Wrapped Gaussian process regression model.
The Wrapped Gaussian process is fit through the following steps:
- Compute the tangent dataset using the prior
- Fit a Gaussian process regression on the tangent dataset
- Store the resulting euclidean Gaussian process
Parameters
----------
X : array-like of shape (n_samples, n_features) or list of object
Feature vectors or other representations of training data.
y : array-like of shape (n_samples,) or (n_samples, n_targets)
or (n_samples, n1_targets, n2_targets)
Target values. The target must belongs to the manifold space
Returns
-------
self : object
WrappedGaussianProcessRegressor class instance.
"""
if not gs.all(self.space.belongs(y)):
raise AttributeError("The target values must belongs to the given space")
# compute the tangent dataset using the prior
tangent_y = self._get_tangent_targets(X, y)
self.y_train_shape_ = y.shape[
1:
] # this is really useful when the samples are matrices, or tensor of dim>1
tangent_y = gs.reshape(tangent_y, (y.shape[0], -1)) # flatten the samples.
# fit a gpr on the tangent dataset
self._euclidean_gpr.fit(X, tangent_y)
# update the attributes of the wgpr using the new attributes of the gpr
self.__dict__.update(self._euclidean_gpr.__dict__)
self.y_train_ = y
self.tangent_y_train_ = tangent_y # = self._euclidean_gpr.y_train_
return self
def predict(self, X, return_tangent_std=False, return_tangent_cov=False):
"""Predict using the Gaussian process regression model.
A fitted Wrapped Gaussian process can be use to predict values
through the following steps:
- Use the stored Gaussian process regression on the dataset to
return tangent predictions
- Compute the base-points using the prior
- Map the tangent predictions on the manifold via the metric's exp
with the base-points yielded by the prior
We can also predict based on an unfitted model by using the GP prior.
In addition to the mean of the predictive distribution, optionally also
returns its standard deviation (`return_std=True`) or covariance
(`return_cov=True`). Note that at most one of the two can be requested.
Parameters
----------
X : array-like of shape (n_samples, n_features) or list of object
Query points where the GP is evaluated.
return_tangent_std : bool, default=False
If True, the standard-deviation of the predictive distribution on at
the query points in the tangent space is returned along with the mean.
return_tangent_cov : bool, default=False
If True, the covariance of the joint predictive distribution at
the query points in the tangent space is returned along with the mean.
Returns
-------
y_mean : ndarray of shape (n_samples,) or (n_samples, n_targets)
Mean of predictive distribution a query points.
y_std : ndarray of shape (n_samples,) or (n_samples, n_targets), optional
Standard deviation of predictive distribution at query points in
the tangent space.
Only returned when `return_std` is True.
y_cov : ndarray of shape (n_samples, n_samples) or \
(n_samples, n_samples, n_targets), optional
Covariance of joint predictive distribution a query points
in the tangent space.
Only returned when `return_cov` is True.
In the case where the target is matrix valued,
return the covariance of the vectorized prediction.
"""
euc_result = self._euclidean_gpr.predict(
X, return_cov=return_tangent_cov, return_std=return_tangent_std
)
return_multiple = return_tangent_std or return_tangent_cov
tangent_means = euc_result[0] if return_multiple else euc_result
base_points = self.prior(X)
tangent_means = gs.reshape(
gs.cast(tangent_means, dtype=X.dtype),
(X.shape[0], *self.y_train_shape_),
)
y_mean = self.metric.exp(tangent_means, base_point=base_points)
if return_multiple:
tangent_std_cov = gs.cast(euc_result[1], dtype=X.dtype)
return (y_mean, tangent_std_cov)
return y_mean
def sample_y(self, X, n_samples=1, random_state=0):
"""Draw samples from Wrapped Gaussian process and evaluate at X.
A fitted Wrapped Gaussian process can be use to sample
values through the following steps:
- Use the stored Gaussian process regression on the dataset
to sample tangent values
- Compute the base-points using the prior
- Flatten (and repeat if needed) both the base-points and the
tangent samples to benefit from vectorized computation.
- Map the tangent samples on the manifold via the metric's exp with the
flattened and repeated base-points yielded by the prior
Parameters
----------
X : array-like of shape (n_samples_X, n_features) or list of object
Query points where the WGP is evaluated.
n_samples : int, default=1
Number of samples drawn from the Wrapped Gaussian process per query
point.
random_state : int, RandomState instance or None, default=0
Determines random number generation to randomly draw samples.
Pass an int for reproducible results across multiple function
calls.
Returns
-------
y_samples : ndarray of shape (n_samples_X, n_samples), or \
(n_samples_X, *target_shape, n_samples)
Values of n_samples samples drawn from wrapped Gaussian process and
evaluated at query points.
"""
tangent_samples = self._euclidean_gpr.sample_y(X, n_samples, random_state)
tangent_samples = gs.cast(tangent_samples, dtype=X.dtype)
if gs.ndim(tangent_samples) > 2:
tangent_samples = gs.moveaxis(tangent_samples, -2, -1)
flat_tangent_samples = gs.reshape(tangent_samples, (-1, *self.y_train_shape_))
base_points = gs.repeat(self.prior(X), n_samples, axis=0)
flat_y_samples = self.metric.exp(flat_tangent_samples, base_point=base_points)
y_samples = gs.reshape(
flat_y_samples, (X.shape[0], n_samples, *self.y_train_shape_)
)
if gs.ndim(tangent_samples) > 2:
y_samples = gs.moveaxis(y_samples, 1, -1)
return y_samples