"""Construct and handle Mapper pipelines."""
# License: GNU AGPLv3

from sklearn.pipeline import Pipeline

from .cluster import ParallelClustering
from .nerve import Nerve
from .utils._list_feature_union import ListFeatureUnion
from .utils.pipeline import transformer_from_callable_on_rows, identity

global_pipeline_params = ('memory', 'verbose')
nodes_params = ('scaler', 'filter_func', 'cover')
clust_prepr_params = ('clustering_preprocessing',)
clust_params = ('clusterer', 'n_jobs',
'parallel_backend_prefer')
nerve_params = ('min_intersection',)
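# Prefixes translating the flat Mapper-level parameter names above into the
# nested scikit-learn parameter names of the pipelines produced by
# make_mapper_pipeline: e.g. the flat key 'filter_func' corresponds to the
# nested key 'pullback_cover__map_and_cover__filter_func'.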
clust_prepr_params_prefix = 'pullback_cover__'
nodes_params_prefix = 'pullback_cover__map_and_cover__'
clust_params_prefix = 'clustering__'
nerve_params_prefix = 'nerve__'


class MapperPipeline(Pipeline):
"""Subclass of :class:`sklearn.pipeline.Pipeline` to deal with
pipelines generated by :func:`~gtda.mapper.pipeline.make_mapper_pipeline`.
The :meth:`set_params` method is modified from the corresponding method in
:class:`sklearn.pipeline.Pipeline` to allow for simple access to the
parameters involved in the definition of the Mapper algorithm, without the
need to interface with the nested structure of the Pipeline objects
generated by :func:`~gtda.mapper.pipeline.make_mapper_pipeline`. The
convenience method :meth:`get_mapper_params` shows which parameters can
be set. See the Examples below.

    Examples
    --------
>>> from sklearn.cluster import DBSCAN
>>> from sklearn.decomposition import PCA
>>> from gtda.mapper import make_mapper_pipeline, CubicalCover
>>> filter_func = PCA(n_components=2)
>>> cover = CubicalCover()
>>> clusterer = DBSCAN()
>>> pipe = make_mapper_pipeline(filter_func=filter_func,
... cover=cover,
... clusterer=clusterer)
>>> print(pipe.get_mapper_params()['clusterer__eps'])
0.5
    >>> pipe.set_params(clusterer__eps=0.1)
>>> print(pipe.get_mapper_params()['clusterer__eps'])
0.1

    See also
    --------
make_mapper_pipeline
"""

    # TODO: Abstract away common logic into a more generalisable implementation
    def get_mapper_params(self, deep=True):
        """Get all Mapper parameters for this estimator.

        Parameters
        ----------
        deep : boolean, optional, default: ``True``
If ``True``, will return the parameters for this estimator and
contained subobjects that are estimators.

        Returns
        -------
params : mapping of string to any
Parameter names mapped to their values.
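
        Examples
        --------
        A minimal sketch, assuming the default components chosen by
        :func:`~gtda.mapper.pipeline.make_mapper_pipeline` (the exact keys
        present depend on the components used):

        >>> from gtda.mapper import make_mapper_pipeline
        >>> pipe = make_mapper_pipeline()
        >>> params = pipe.get_mapper_params()
        >>> # Flat Mapper-level keys sit alongside nested estimator keys
        >>> 'clusterer' in params and 'clusterer__eps' in params
        True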
"""
pipeline_params = super().get_params(deep=deep)
return {**{param: pipeline_params[param]
for param in global_pipeline_params},
**self._clean_dict_keys(pipeline_params, nodes_params_prefix),
**self._clean_dict_keys(
pipeline_params, clust_prepr_params_prefix),
**self._clean_dict_keys(pipeline_params, clust_params_prefix),
**self._clean_dict_keys(pipeline_params, nerve_params_prefix)}

    def set_params(self, **kwargs):
        """Set the Mapper parameters.

        Valid parameter keys can be listed with :meth:`get_mapper_params`.

        Returns
        -------
        self
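
        Examples
        --------
        A minimal sketch, assuming the default components chosen by
        :func:`~gtda.mapper.pipeline.make_mapper_pipeline`; Mapper-level
        and nested keys may be mixed in a single call:

        >>> from gtda.mapper import make_mapper_pipeline
        >>> pipe = make_mapper_pipeline()
        >>> _ = pipe.set_params(min_intersection=2, clusterer__eps=0.2)
        >>> pipe.get_mapper_params()['min_intersection']
        2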
"""
        mapper_nodes_kwargs = self._subset_kwargs(kwargs, nodes_params)
        mapper_clust_prepr_kwargs = \
            self._subset_kwargs(kwargs, clust_prepr_params)
        mapper_clust_kwargs = self._subset_kwargs(kwargs, clust_params)
        mapper_nerve_kwargs = self._subset_kwargs(kwargs, nerve_params)
        # Translate each flat Mapper-level key into its nested counterpart,
        # forward it to Pipeline.set_params, and remove it from kwargs so
        # that only ordinary Pipeline parameters remain at the end.
        if mapper_nodes_kwargs:
            super().set_params(
                **{nodes_params_prefix + key: value
                   for key, value in mapper_nodes_kwargs.items()})
            for key in mapper_nodes_kwargs:
                kwargs.pop(key)
        if mapper_clust_prepr_kwargs:
            super().set_params(
                **{clust_prepr_params_prefix + key: value
                   for key, value in mapper_clust_prepr_kwargs.items()})
            for key in mapper_clust_prepr_kwargs:
                kwargs.pop(key)
        if mapper_clust_kwargs:
            super().set_params(
                **{clust_params_prefix + key: value
                   for key, value in mapper_clust_kwargs.items()})
            for key in mapper_clust_kwargs:
                kwargs.pop(key)
        if mapper_nerve_kwargs:
            super().set_params(
                **{nerve_params_prefix + key: value
                   for key, value in mapper_nerve_kwargs.items()})
            for key in mapper_nerve_kwargs:
                kwargs.pop(key)
        super().set_params(**kwargs)
        return self

    @staticmethod
    def _subset_kwargs(kwargs, param_strings):
        # Keep only the entries of kwargs whose keys start with one of the
        # strings in the tuple param_strings.
        return {key: value for key, value in kwargs.items()
                if key.startswith(param_strings)}

    @staticmethod
    def _clean_dict_keys(kwargs, prefix):
        # Strip prefix from matching keys, discarding the housekeeping
        # parameters of the intermediate Pipeline/FeatureUnion objects.
        return {
            key[len(prefix):]: kwargs[key]
            for key in kwargs
            if (key.startswith(prefix)
                and not key.startswith(prefix + 'steps')
                and not key.startswith(prefix + 'memory')
                and not key.startswith(prefix + 'verbose')
                and not key.startswith(prefix + 'transformer_list')
                and not key.startswith(prefix + 'n_jobs')
                and not key.startswith(prefix + 'transformer_weights')
                and not key.startswith(prefix + 'map_and_cover'))
            }


def make_mapper_pipeline(scaler=None,
filter_func=None,
cover=None,
clustering_preprocessing=None,
clusterer=None,
n_jobs=None,
parallel_backend_prefer='threads',
graph_step=True,
min_intersection=1,
memory=None,
verbose=False):
"""Construct a MapperPipeline object according to the specified Mapper
steps. [1]_
The role of this function's main parameters is illustrated in `this diagram
<../../../../_images/mapper_pipeline.svg>`_. All computational steps may
be scikit-learn estimators, including Pipeline objects.
Parameters
----------
scaler : object or None, optional, default: ``None``
If ``None``, no scaling is performed. Otherwise, it must be an
object with a ``fit_transform`` method.
    filter_func : object, callable or None, optional, default: ``None``
        If ``None``, PCA (:class:`sklearn.decomposition.PCA`) with 2
        components and default parameters is used as the default filter
        function. Otherwise, it may be an object with a ``fit_transform``
        method, or a callable acting on one-dimensional arrays -- in which
        case the callable is applied independently to each row of the
        (scaled) data.
cover : object or None, optional, default: ``None``
Covering transformer, e.g. an instance of
:class:`~gtda.mapper.OneDimensionalCover` or of
:class:`~gtda.mapper.CubicalCover`. ``None`` is equivalent to passing
an instance of :class:`~gtda.mapper.CubicalCover` with its default
parameters.
    clustering_preprocessing : object or None, optional, default: ``None``
        If not ``None``, it is a transformer which is applied to the data
        independently of the `scaler` -> `filter_func` -> `cover` pipeline.
        Clustering is then performed on portions (determined by the
        `scaler` -> `filter_func` -> `cover` pipeline) of the transformed
        data.
clusterer : object or None, optional, default: ``None``
Clustering object with a ``fit`` method which stores cluster labels.
``None`` is equivalent to passing an instance of
:class:`sklearn.cluster.DBSCAN` with its default parameters.
n_jobs : int or None, optional, default: ``None``
The number of jobs to use in a joblib-parallel application of the
clustering step across pullback cover sets. To be used in
conjunction with `parallel_backend_prefer`. ``None`` means 1 unless
in a :obj:`joblib.parallel_backend` context. ``-1`` means using all
processors.
parallel_backend_prefer : ``'processes'`` | ``'threads'``, optional, \
default: ``'threads'``
Soft hint for the default joblib backend to use in a joblib-parallel
application of the clustering step across pullback cover sets. To be
used in conjunction with `n_jobs`. The default process-based backend is
'loky' and the default thread-based backend is 'threading'. See [2]_.
graph_step : bool, optional, default: ``True``
Whether the resulting pipeline should stop at the calculation of the
Mapper cover, or include the construction of the Mapper graph.
min_intersection : int, optional, default: ``1``
Minimum size of the intersection between clusters required for creating
an edge in the Mapper graph. Ignored if `graph_step` is set to
``False``.
memory : None, str or object with the joblib.Memory interface, \
optional, default: ``None``
Used to cache the fitted transformers of the pipeline. By default, no
caching is performed. If a string is given, it is the path to the
caching directory. Enabling caching triggers a clone of the
transformers before fitting. Therefore, the transformer instance
given to the pipeline cannot be inspected directly. Use the attribute
``named_steps`` or ``steps`` to inspect estimators within the
pipeline. Caching the transformers is advantageous when fitting is
time consuming.
    verbose : bool, optional, default: ``False``
        If ``True``, the time elapsed while fitting each step will be
        printed as it is completed.

    Returns
    -------
mapper_pipeline : :class:`~gtda.mapper.pipeline.MapperPipeline` object
Output Mapper pipeline.

    Examples
    --------
>>> # Example of basic usage with default parameters
>>> import numpy as np
>>> from gtda.mapper import make_mapper_pipeline
>>> mapper = make_mapper_pipeline()
>>> print(mapper.__class__)
<class 'gtda.mapper.pipeline.MapperPipeline'>
>>> mapper_params = mapper.get_mapper_params()
>>> print(mapper_params['filter_func'].__class__)
<class 'sklearn.decomposition._pca.PCA'>
>>> print(mapper_params['cover'].__class__)
<class 'gtda.mapper.cover.CubicalCover'>
>>> print(mapper_params['clusterer'].__class__)
<class 'sklearn.cluster._dbscan.DBSCAN'>
>>> X = np.random.random((10000, 4)) # 10000 points in 4-dimensional space
>>> mapper_graph = mapper.fit_transform(X) # Create the mapper graph
    >>> print(type(mapper_graph))
    <class 'igraph.Graph'>
>>> # Node metadata stored as dict in graph object
>>> print(mapper_graph['node_metadata'].keys())
dict_keys(['node_id', 'pullback_set_label', 'partial_cluster_label',
'node_elements'])
>>> # Find which points belong to first node of graph
    >>> node_id, node_elements = mapper_graph['node_metadata']['node_id'], \
    ...     mapper_graph['node_metadata']['node_elements']
    >>> print(f'Node Id: {node_id[0]}, Node elements: {node_elements[0]}, '
    ...       f'Data points: {X[node_elements[0]]}')
Node Id: 0,
Node elements: [8768],
Data points: [[0.01838998 0.76928754 0.98199244 0.0074299 ]]
>>> #######################################################################
>>> # Example using a scaler from scikit-learn, a filter function from
>>> # gtda.mapper.filter, and a clusterer from gtda.mapper.cluster
>>> from sklearn.preprocessing import MinMaxScaler
>>> from gtda.mapper import Projection, FirstHistogramGap
>>> scaler = MinMaxScaler()
>>> filter_func = Projection(columns=[0, 1])
>>> clusterer = FirstHistogramGap()
>>> mapper = make_mapper_pipeline(scaler=scaler,
... filter_func=filter_func,
... clusterer=clusterer)
>>> #######################################################################
>>> # Example using a callable acting on each row of X separately
>>> import numpy as np
>>> from gtda.mapper import OneDimensionalCover
>>> cover = OneDimensionalCover()
>>> mapper.set_params(scaler=None, filter_func=np.sum, cover=cover)
>>> #######################################################################
>>> # Example setting the memory parameter to cache each step and avoid
>>> # recomputation of early steps
>>> from tempfile import mkdtemp
>>> from shutil import rmtree
>>> cachedir = mkdtemp()
>>> mapper.set_params(memory=cachedir, verbose=True)
>>> mapper_graph = mapper.fit_transform(X)
[Pipeline] ............ (step 1 of 3) Processing scaler, total= 0.0s
[Pipeline] ....... (step 2 of 3) Processing filter_func, total= 0.0s
[Pipeline] ............. (step 3 of 3) Processing cover, total= 0.0s
[Pipeline] .... (step 1 of 3) Processing pullback_cover, total= 0.0s
[Pipeline] ........ (step 2 of 3) Processing clustering, total= 0.3s
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.0s
>>> mapper.set_params(min_intersection=3)
>>> mapper_graph = mapper.fit_transform(X)
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.0s
>>> # Clear the cache directory when you don't need it anymore
>>> rmtree(cachedir)
>>> #######################################################################
>>> # Example using a large dataset for which parallelism in
>>> # clustering across the pullback cover sets can be beneficial
>>> from sklearn.cluster import DBSCAN
>>> mapper = make_mapper_pipeline(clusterer=DBSCAN(),
... n_jobs=6,
... memory=mkdtemp(),
... verbose=True)
>>> X = np.random.random((100000, 4))
>>> mapper.fit_transform(X)
[Pipeline] ............ (step 1 of 3) Processing scaler, total= 0.0s
[Pipeline] ....... (step 2 of 3) Processing filter_func, total= 0.1s
[Pipeline] ............. (step 3 of 3) Processing cover, total= 0.6s
[Pipeline] .... (step 1 of 3) Processing pullback_cover, total= 0.7s
[Pipeline] ........ (step 2 of 3) Processing clustering, total= 1.9s
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.3s
>>> mapper.set_params(n_jobs=1)
>>> mapper.fit_transform(X)
[Pipeline] ........ (step 2 of 3) Processing clustering, total= 5.3s
[Pipeline] ............. (step 3 of 3) Processing nerve, total= 0.3s
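    >>> #######################################################################
    >>> # Example (a sketch) of stopping the pipeline before the Mapper graph
    >>> # is built, by passing graph_step=False; fit_transform then returns
    >>> # the output of the clustering step instead of an igraph.Graph object
    >>> mapper = make_mapper_pipeline(graph_step=False)
    >>> clustered_cover = mapper.fit_transform(X)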

    See also
    --------
:class:`MapperPipeline`,
:meth:`~gtda.mapper.utils.decorators.method_to_transform`

    References
    ----------
.. [1] G. Singh, F. Mémoli, and G. Carlsson, "Topological methods for the
analysis of high dimensional data sets and 3D object recognition";
in *SPBG*, pp. 91--100, 2007.
.. [2] "Thread-based parallelism vs process-based parallelism", in
`joblib documentation
<https://joblib.readthedocs.io/en/latest/parallel.html>`_.
"""
# TODO: Implement parameter validation
if scaler is None:
_scaler = identity(validate=False)
else:
_scaler = scaler
    # If filter_func is not a scikit-learn transformer, assume it is a
    # callable to be applied to each row separately. Then attempt to create a
    # FunctionTransformer object to implement this behaviour.
if filter_func is None:
from sklearn.decomposition import PCA
_filter_func = PCA(n_components=2)
elif not hasattr(filter_func, 'fit_transform'):
_filter_func = transformer_from_callable_on_rows(filter_func)
else:
_filter_func = filter_func
if cover is None:
from .cover import CubicalCover
_cover = CubicalCover()
else:
_cover = cover
if clustering_preprocessing is None:
_clustering_preprocessing = identity(validate=True)
else:
_clustering_preprocessing = clustering_preprocessing
if clusterer is None:
from sklearn.cluster import DBSCAN
_clusterer = DBSCAN()
else:
_clusterer = clusterer
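
    # Assemble the nested structure: the scaler -> filter_func -> cover
    # pipeline runs inside a ListFeatureUnion alongside the clustering
    # preprocessing step, so that the clustering step receives both the
    # preprocessed data and the pullback cover.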
map_and_cover = Pipeline(
steps=[('scaler', _scaler),
('filter_func', _filter_func),
('cover', _cover)],
verbose=verbose)
all_steps = [
('pullback_cover', ListFeatureUnion(
[('clustering_preprocessing', _clustering_preprocessing),
('map_and_cover', map_and_cover)])),
('clustering', ParallelClustering(
clusterer=_clusterer,
n_jobs=n_jobs,
parallel_backend_prefer=parallel_backend_prefer))
]
if graph_step:
all_steps.append(('nerve', Nerve(min_intersection=min_intersection)))
mapper_pipeline = MapperPipeline(
steps=all_steps, memory=memory, verbose=verbose)
return mapper_pipeline