wrappers.py
474 lines (372 loc) · 15.5 KB
"""Meta-estimators for parallelizing scikit-learn."""
import logging
import dask
import dask.array as da
import dask.dataframe as dd
import dask.delayed
import numpy as np
import sklearn.base
import sklearn.metrics
from sklearn.utils.validation import check_is_fitted
from dask_ml.utils import _timer
from ._partial import fit
from ._utils import copy_learned_attributes
from .metrics import check_scoring, get_scorer
logger = logging.getLogger(__name__)
class ParallelPostFit(sklearn.base.BaseEstimator):
"""Meta-estimator for parallel predict and transform.
Parameters
----------
estimator : Estimator
The underlying estimator that is fit.
scoring : string or callable, optional
A single string (see :ref:`scoring_parameter`) or a callable
(see :ref:`scoring`) to evaluate the predictions on the test set.
For evaluating multiple metrics, either give a list of (unique)
strings or a dict with names as keys and callables as values.
NOTE that when using custom scorers, each scorer should return a
single value. Metric functions returning a list/array of values
can be wrapped into multiple scorers that return one value each.
See :ref:`multimetric_grid_search` for an example.
.. warning::
If None, the estimator's default scorer (if available) is used.
Most scikit-learn estimators will convert large Dask arrays to
a single NumPy array, which may exhaust the memory of your worker.
You probably want to always specify `scoring`.
Notes
-----
.. warning::
This class is not appropriate for parallel or distributed *training*
on large datasets. For that, see :class:`Incremental`, which provides
distributed (but sequential) training.
    This estimator does not parallelize the training step. It simply calls
    the underlying estimator's ``fit`` method and copies the learned
    attributes over to ``self`` afterwards.
It is helpful for situations where your training dataset is relatively
small (fits on a single machine) but you need to predict or transform
a much larger dataset. ``predict``, ``predict_proba`` and ``transform``
will be done in parallel (potentially distributed if you've connected
to a ``dask.distributed.Client``).
Note that many scikit-learn estimators already predict and transform in
parallel. This meta-estimator may still be useful in those cases when your
dataset is larger than memory, as the distributed scheduler will ensure the
data isn't all read into memory at once.
See Also
--------
Incremental
Examples
--------
>>> from sklearn.ensemble import GradientBoostingClassifier
>>> import sklearn.datasets
>>> import dask_ml.datasets
    Make a small 1,000-sample training dataset and fit normally.
>>> X, y = sklearn.datasets.make_classification(n_samples=1000,
... random_state=0)
>>> clf = ParallelPostFit(estimator=GradientBoostingClassifier(),
... scoring='accuracy')
>>> clf.fit(X, y)
ParallelPostFit(estimator=GradientBoostingClassifier(...))
>>> clf.classes_
array([0, 1])
    Transform and predict return dask outputs for dask inputs.
    >>> X_big, y_big = dask_ml.datasets.make_classification(n_samples=100000,
    ...                                                     random_state=0)
    >>> clf.predict(X_big)
    dask.array<predict, shape=(100000,), dtype=int64, chunksize=(1000,)>
Which can be computed in parallel.
    >>> clf.predict_proba(X_big).compute()
array([[0.99141094, 0.00858906],
[0.93178389, 0.06821611],
[0.99129105, 0.00870895],
...,
[0.97996652, 0.02003348],
[0.98087444, 0.01912556],
[0.99407016, 0.00592984]])
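    When ``scoring`` is specified, ``score`` also accepts dask inputs and
    evaluates them with a dask-aware scorer (the value below is illustrative,
    not an exact expected output).
    >>> clf.score(X_big, y_big)  # doctest: +SKIP
    0.9...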
"""
def __init__(self, estimator=None, scoring=None):
self.estimator = estimator
self.scoring = scoring
@property
def _postfit_estimator(self):
# The estimator instance to use for postfit tasks like score
return self.estimator
def fit(self, X, y=None, **kwargs):
"""Fit the underlying estimator.
Parameters
----------
X, y : array-like
**kwargs
Additional fit-kwargs for the underlying estimator.
Returns
-------
self : object
"""
logger.info("Starting fit")
with _timer("fit", _logger=logger):
result = self.estimator.fit(X, y, **kwargs)
# Copy over learned attributes
copy_learned_attributes(result, self)
copy_learned_attributes(result, self.estimator)
return self
def transform(self, X):
"""Transform block or partition-wise for dask inputs.
For dask inputs, a dask array or dataframe is returned. For other
inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
regular return value is returned.
If the underlying estimator does not have a ``transform`` method, then
an ``AttributeError`` is raised.
Parameters
----------
X : array-like
Returns
-------
transformed : array-like
"""
transform = self._check_method("transform")
if isinstance(X, da.Array):
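            # Apply blockwise; this assumes ``transform`` works independently
            # on each block of rows (the output chunk metadata is assumed to
            # match the input's).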
return X.map_blocks(transform)
elif isinstance(X, dd._Frame):
return _apply_partitionwise(X, transform)
else:
return transform(X)
def score(self, X, y):
"""Returns the score on the given data.
Parameters
----------
X : array-like, shape = [n_samples, n_features]
Input data, where n_samples is the number of samples and
n_features is the number of features.
y : array-like, shape = [n_samples] or [n_samples, n_output], optional
Target relative to X for classification or regression;
None for unsupervised learning.
Returns
-------
        score : float
            The score computed with ``scoring`` if it is set, otherwise with
            the underlying estimator's default ``score`` method.
        """
scoring = self.scoring
if not scoring:
            if (
                type(self._postfit_estimator).score
                == sklearn.base.RegressorMixin.score
            ):
                scoring = "r2"
            elif (
                type(self._postfit_estimator).score
                == sklearn.base.ClassifierMixin.score
            ):
                scoring = "accuracy"
if scoring:
if not dask.is_dask_collection(X) and not dask.is_dask_collection(y):
scorer = sklearn.metrics.get_scorer(scoring)
else:
scorer = get_scorer(scoring)
return scorer(self, X, y)
else:
return self._postfit_estimator.score(X, y)
def predict(self, X):
"""Predict for X.
For dask inputs, a dask array or dataframe is returned. For other
inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
regular return value is returned.
Parameters
----------
X : array-like
Returns
-------
y : array-like
"""
predict = self._check_method("predict")
if isinstance(X, da.Array):
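            # Blockwise predict: each 2-D (rows, features) block collapses to
            # a 1-D block of labels, hence drop_axis=1. The output dtype is
            # declared as ``int``, matching integer class labels.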
return X.map_blocks(predict, dtype="int", drop_axis=1)
elif isinstance(X, dd._Frame):
return _apply_partitionwise(X, predict)
else:
return predict(X)
def predict_proba(self, X):
"""Predict for X.
For dask inputs, a dask array or dataframe is returned. For other
inputs (NumPy array, pandas dataframe, scipy sparse matrix), the
regular return value is returned.
If the underlying estimator does not have a ``predict_proba``
method, then an ``AttributeError`` is raised.
Parameters
----------
X : array or dataframe
Returns
-------
y : array-like
"""
predict_proba = self._check_method("predict_proba")
if isinstance(X, da.Array):
# XXX: multiclass
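            # Output chunks: the same row-chunking as X, with a single column
            # chunk of width n_classes (one column per class).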
return X.map_blocks(
predict_proba, dtype="float", chunks=(X.chunks[0], len(self.classes_))
)
elif isinstance(X, dd._Frame):
return _apply_partitionwise(X, predict_proba)
else:
return predict_proba(X)
def _check_method(self, method):
"""Check if self.estimator has 'method'.
Raises
------
AttributeError
"""
estimator = self._postfit_estimator
if not hasattr(estimator, method):
msg = "The wrapped estimator '{}' does not have a '{}' method.".format(
estimator, method
)
raise AttributeError(msg)
return getattr(estimator, method)
class Incremental(ParallelPostFit):
"""Metaestimator for feeding Dask Arrays to an estimator blockwise.
This wrapper provides a bridge between Dask objects and estimators
implementing the ``partial_fit`` API. These *incremental learners* can
train on batches of data. This fits well with Dask's blocked data
structures.
    See the `list of incremental learners`_ in the scikit-learn documentation
    for estimators that implement the ``partial_fit`` API. Note that
    `Incremental` is not limited to these classes; it works with any
    estimator implementing ``partial_fit``, including those defined outside
    of scikit-learn itself.
Calling :meth:`Incremental.fit` with a Dask Array will pass each block of
the Dask array or arrays to ``estimator.partial_fit`` *sequentially*.
Like :class:`ParallelPostFit`, the methods available after fitting (e.g.
:meth:`Incremental.predict`, etc.) are all parallel and delayed.
The ``estimator_`` attribute is a clone of `estimator` that was actually
used during the call to ``fit``. All attributes learned during training
are available on ``Incremental`` directly.
.. _list of incremental learners: http://scikit-learn.org/stable/modules/scaling_strategies.html#incremental-learning # noqa
Parameters
----------
estimator : Estimator
        Any object supporting the scikit-learn ``partial_fit`` API.
scoring : string or callable, optional
A single string (see :ref:`scoring_parameter`) or a callable
(see :ref:`scoring`) to evaluate the predictions on the test set.
For evaluating multiple metrics, either give a list of (unique)
strings or a dict with names as keys and callables as values.
NOTE that when using custom scorers, each scorer should return a
single value. Metric functions returning a list/array of values
can be wrapped into multiple scorers that return one value each.
See :ref:`multimetric_grid_search` for an example.
.. warning::
If None, the estimator's default scorer (if available) is used.
Most scikit-learn estimators will convert large Dask arrays to
a single NumPy array, which may exhaust the memory of your worker.
You probably want to always specify `scoring`.
random_state : int or numpy.random.RandomState, optional
Random object that determines how to shuffle blocks.
    shuffle_blocks : bool, default True
        Determines whether to call ``partial_fit`` on randomly selected
        chunks of the Dask arrays (default), or to fit in sequential order.
        This controls only the order in which blocks are visited; it does
        not shuffle rows across blocks or within a block.
Attributes
----------
estimator_ : Estimator
A clone of `estimator` that was actually fit during the ``.fit`` call.
See Also
--------
ParallelPostFit
Examples
--------
>>> from dask_ml.wrappers import Incremental
>>> from dask_ml.datasets import make_classification
>>> import sklearn.linear_model
>>> X, y = make_classification(chunks=25)
>>> est = sklearn.linear_model.SGDClassifier()
>>> clf = Incremental(est, scoring='accuracy')
>>> clf.fit(X, y, classes=[0, 1])
When used inside a grid search, prefix the underlying estimator's
parameter names with ``estimator__``.
>>> from sklearn.model_selection import GridSearchCV
>>> param_grid = {"estimator__alpha": [0.1, 1.0, 10.0]}
>>> gs = GridSearchCV(clf, param_grid)
>>> gs.fit(X, y, classes=[0, 1])
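    After fitting, prediction on the Dask array is lazy and blockwise, just
    as with :class:`ParallelPostFit` (the repr below is illustrative).
    >>> clf.predict(X)  # doctest: +SKIP
    dask.array<predict, shape=(100,), dtype=int64, chunksize=(25,)>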
"""
def __init__(
self, estimator=None, scoring=None, shuffle_blocks=True, random_state=None
):
self.shuffle_blocks = shuffle_blocks
self.random_state = random_state
super(Incremental, self).__init__(estimator=estimator, scoring=scoring)
@property
def _postfit_estimator(self):
check_is_fitted(self, "estimator_")
return self.estimator_
def _fit_for_estimator(self, estimator, X, y, **fit_kwargs):
check_scoring(estimator, self.scoring)
if not dask.is_dask_collection(X) and not dask.is_dask_collection(y):
result = estimator.partial_fit(X=X, y=y, **fit_kwargs)
else:
result = fit(
estimator,
X,
y,
random_state=self.random_state,
shuffle_blocks=self.shuffle_blocks,
**fit_kwargs
)
copy_learned_attributes(result, self)
self.estimator_ = result
return self
    def fit(self, X, y=None, **fit_kwargs):
        """Fit the underlying estimator from scratch on (blocks of) X and y."""
estimator = sklearn.base.clone(self.estimator)
self._fit_for_estimator(estimator, X, y, **fit_kwargs)
return self
def partial_fit(self, X, y=None, **fit_kwargs):
"""Fit the underlying estimator.
If this estimator has not been previously fit, this is identical to
:meth:`Incremental.fit`. If it has been previously fit,
``self.estimator_`` is used as the starting point.
Parameters
----------
X, y : array-like
**kwargs
Additional fit-kwargs for the underlying estimator.
Returns
-------
self : object
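        Examples
        --------
        A sketch of multi-pass training, reusing ``clf``, ``X``, and ``y``
        from the class example above:
        >>> for _ in range(3):  # doctest: +SKIP
        ...     clf = clf.partial_fit(X, y, classes=[0, 1])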
"""
estimator = getattr(self, "estimator_", None)
if estimator is None:
estimator = sklearn.base.clone(self.estimator)
return self._fit_for_estimator(estimator, X, y, **fit_kwargs)
def _first_block(dask_object):
"""Extract the first block / partition from a dask object
"""
if isinstance(dask_object, da.Array):
if dask_object.ndim > 1 and dask_object.numblocks[-1] != 1:
raise NotImplementedError(
"IID estimators require that the array "
"blocked only along the first axis. "
"Rechunk your array before fitting."
)
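        # The first block is as tall as the first row-chunk and, for 2-D
        # arrays, spans the single column chunk enforced by the check above.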
shape = (dask_object.chunks[0][0],)
if dask_object.ndim > 1:
shape = shape + (dask_object.chunks[1][0],)
return da.from_delayed(
dask_object.to_delayed().flatten()[0], shape, dask_object.dtype
)
if isinstance(dask_object, dd._Frame):
return dask_object.get_partition(0)
else:
return dask_object
def _apply_partitionwise(X, func):
"""Apply a prediction partition-wise to a dask.dataframe"""
sample = func(X._meta_nonempty)
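    # ``_meta_nonempty`` is a small, non-empty dummy partition; calling
    # ``func`` on it reveals the output's type, dtype, and trailing shape
    # without touching any real data.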
if sample.ndim <= 1:
p = ()
else:
p = (sample.shape[1],)
if isinstance(sample, np.ndarray):
blocks = X.to_delayed()
arrays = [
da.from_delayed(
dask.delayed(func)(block), shape=(np.nan,) + p, dtype=sample.dtype
)
for block in blocks
]
return da.concatenate(arrays)
else:
return X.map_partitions(func, meta=sample)
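
if __name__ == "__main__":
    # Illustrative usage sketch, not part of the library API: wrap a
    # scikit-learn model with ParallelPostFit and predict blockwise on a
    # Dask array. Assumes scikit-learn and dask are installed and that this
    # module is run in a context where dask_ml is importable.
    import sklearn.datasets
    import sklearn.linear_model

    X, y = sklearn.datasets.make_classification(n_samples=1000, random_state=0)
    clf = ParallelPostFit(
        estimator=sklearn.linear_model.LogisticRegression(), scoring="accuracy"
    ).fit(X, y)
    # Chunk the same data into ten row-blocks; predict stays lazy until
    # compute() is called.
    X_dask = da.from_array(X, chunks=(100, X.shape[1]))
    predictions = clf.predict(X_dask)
    print(predictions)            # lazy dask.array
    print(predictions.compute())  # concrete NumPy array of class labels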