[WIP] Improve wrapped sklearn class repr - simplify cross val for Dataset / MLDataset #228

Open · wants to merge 7 commits into base: cv-xarray-issue-204
Changes from all commits
1 change: 1 addition & 0 deletions elm/mldataset/serialize_mixin.py
@@ -1,5 +1,6 @@
from __future__ import (absolute_import, division, print_function, unicode_literals,)
import dill

class SerializeMixin:
'''A mixin for serialization of estimators via dill'''
def dumps(self, protocol=None, byref=None, fmode=None, recurse=None):
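Note: the dumps body is collapsed above; a minimal round-trip sketch of what the mixin enables, assuming dumps() forwards its keywords to dill.dumps (the Model stand-in is hypothetical):

import dill

class Model:
    # Hypothetical stand-in for an estimator carrying fitted state.
    def __init__(self, coef=1.0):
        self.coef = coef

blob = dill.dumps(Model(coef=2.5))   # what dumps() presumably returns
restored = dill.loads(blob)
assert restored.coef == 2.5          # attributes survive the round trip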
22 changes: 13 additions & 9 deletions elm/mldataset/util.py
@@ -30,12 +30,16 @@ def is_arr(arr, raise_err=False):
return _is_arr


def _split_transformer_result(Xt, y):
if isinstance(Xt, Sequence) and len(Xt) == 2 and (Xt[1] is None or is_arr(Xt[1])):
Xt, new_y = Xt
else:
new_y = y
if y is None and new_y is not None:
y = new_y
assert not isinstance(y, tuple), repr((Xt, y, new_y))
return Xt, y
def _is_xy_tuple(result, typ=tuple):
if typ and not isinstance(typ, tuple):
typ = (typ,)
typ = (typ or ()) + (tuple,)
return isinstance(result, typ) and len(result) == 2


def _split_transformer_result(X, y, typ=tuple):
if _is_xy_tuple(X, typ=typ):
X, y2 = X
if y2 is not None and y is None:
y = y2
return X, y
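Note: a quick self-contained illustration of the two helpers above (numpy is used only to build sample data):

import numpy as np

X = np.arange(6).reshape(3, 2)
y = np.array([0, 1, 0])

# A transformer that returned an (X, y) tuple is unpacked, and y is
# recovered when the caller did not pass one.
Xt, yt = _split_transformer_result((X, y), None)
assert Xt is X and yt is y

# A plain array passes through untouched.
Xt, yt = _split_transformer_result(X, y)
assert Xt is X and yt is y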
77 changes: 24 additions & 53 deletions elm/mldataset/wrap_sklearn.py
@@ -6,11 +6,8 @@

import numpy as np
from sklearn.base import BaseEstimator, _pprint
from dask.utils import derived_from # May be useful here?
from sklearn.utils.metaestimators import if_delegate_has_method # May be useful here?
from sklearn.linear_model import LinearRegression as skLinearRegression
from sklearn.metrics import r2_score, accuracy_score
from xarray_filters.mldataset import MLDataset
from xarray_filters.reshape import to_features, to_xy_arrays
from xarray_filters.func_signatures import filter_args_kwargs
from xarray_filters.constants import FEATURES_LAYER_DIMS, FEATURES_LAYER
from elm.mldataset.util import _split_transformer_result
@@ -26,22 +23,17 @@ def get_row_index(X, features_layer=None):
arr = X[features_layer]
return getattr(arr, arr.dims[0])


def _as_numpy_arrs(self, X, y=None, **kw):
'''Convert X, y to numpy.ndarrays for a scikit-learn method
'''
X, y = _split_transformer_result(X, y)
if isinstance(X, np.ndarray):
return X, y, kw.get('row_idx', None)
if isinstance(X, xr.Dataset):
X = MLDataset(X)
if hasattr(X, 'has_features'):
if X.has_features(raise_err=False):
pass
else:
X = X.to_features()
if isinstance(X, (xr.Dataset, MLDataset)):
X = MLDataset(X).to_features()
if isinstance(y, (xr.Dataset, MLDataset)):
y = MLDataset(y).to_features()
row_idx = get_row_index(X)
if hasattr(X, 'to_array') and not isinstance(X, np.ndarray):
X, y = X.to_array(y=y)
X, y = to_xy_arrays(X, y=y)
if row_idx is not None:
self._temp_row_idx = row_idx
return X, y, row_idx
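Note: _as_numpy_arrs flattens a Dataset/MLDataset to a (samples, features) matrix and keeps the flattened row index so predictions can be rebuilt into an MLDataset later. A plain-xarray illustration of that idea (not the elm code path):

import numpy as np
import xarray as xr

ds = xr.Dataset({'temperature': (('y', 'x'), np.random.rand(3, 4))})

# Each (y, x) cell becomes one row; each variable becomes one column.
stacked = ds.to_array().stack(space=('y', 'x'))
X = stacked.transpose('space', 'variable').values   # shape (12, 1)
row_idx = stacked.indexes['space']                  # plays the role of row_idx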
@@ -78,8 +70,6 @@ def _call_sk_method(self, sk_method, X=None, y=None, do_split=True, **kw):
if func is None:
raise ValueError('{} is not an attribute of {}'.format(sk_method, _cls))
X, y, row_idx = self._as_numpy_arrs(X, y=y)
if do_split:
X, y = _split_transformer_result(X, y)
if row_idx is not None:
self._temp_row_idx = row_idx
kw.update(dict(self=self, X=X))
@@ -103,8 +93,8 @@ def _predict_steps(self, X, y=None, row_idx=None, sk_method=None, **kw):
row_idx = getattr(self, '_temp_row_idx', None)
if y is not None:
kw['y'] = y
y3 = self._call_sk_method(sk_method, X2, do_split=False, **kw)
return y3, row_idx
out = self._call_sk_method(sk_method, X2, do_split=True, **kw)
return out, row_idx

def predict(self, X, row_idx=None, **kw):
'''Predict from MLDataset X and return an MLDataset with
@@ -129,28 +119,30 @@ def predict(self, X, row_idx=None, **kw):
'''
y, row_idx = self._predict_steps(X, row_idx=row_idx,
sk_method='predict', **kw)
if row_idx is None:
y = y[0]
if row_idx is None or getattr(self, '_predict_as_np', False):
return y
return self._from_numpy_arrs(y, row_idx)

def predict_proba(self, X, row_idx=None, **kw):
proba, row_idx = self._predict_steps(X, row_idx=row_idx,
sk_method='predict_proba', **kw)
return proba
return proba[0]

def predict_log_proba(self, X, row_idx=None, **kw):
log_proba, row_idx = self._predict_steps(X, row_idx=row_idx,
sk_method='predict_log_proba',
**kw)
return log_proba
return log_proba[0]

def decision_function(self, X, row_idx=None, **kw):
d, row_idx = self._predict_steps(X, row_idx=row_idx,
sk_method='decision_function',
**kw)
return d
return d[0]

def fit(self, X, y=None, **kw):
X, y = _split_transformer_result(X, y)
self._call_sk_method('fit', X, y=y, **kw)
return self

@@ -159,6 +151,11 @@ def _fit(self, X, y=None, **kw):
models and must take X, y as numpy arrays'''
return self._call_sk_method('_fit', X, y=y, do_split=False, **kw)

def partial_fit(self, X, y=None, **kw):
X, y = _split_transformer_result(X, y)
self._call_sk_method('partial_fit', X, y=y, **kw)
return self

def transform(self, X, y=None, **kw):
if hasattr(self._cls, 'transform'):
return self._call_sk_method('transform', X, y=y, **kw)
@@ -176,41 +173,15 @@ def fit_transform(self, X, y=None, **kw):
self.fit(*args, **kw)
return self._call_sk_method('transform', *args, **kw)

def __repr__(self):
class_name = getattr(self, '_cls_name', self._cls.__name__)
return '%s(%s)' % (class_name, _pprint(self.get_params(deep=False),
offset=len(class_name),),)

def fit_predict(self, X, y=None, **kw):
return self.fit(X, y=y, **kw).predict(X)

def _regressor_default_score(self, X, y, sample_weight=None, row_idx=None, **kw):
X, y = _split_transformer_result(X, y)
y_pred, row_idx = self._predict_steps(X, row_idx=row_idx, y=y,
sk_method='predict',
**kw)
return r2_score(y, y_pred, sample_weight=sample_weight,
multioutput='variance_weighted')

def _classifier_default_score(self, X, y=None, sample_weight=None, row_idx=None, **kw):
X, y = _split_transformer_result(X, y)
y_pred, row_idx = self._predict_steps(X, row_idx=row_idx, y=y,
sk_method='predict',
**kw)
return accuracy_score(y, y_pred, sample_weight=sample_weight)

def score(self, X, y=None, sample_weight=None, row_idx=None, **kw):

if self._cls._estimator_type == 'regressor':
func = self._regressor_default_score
elif self._cls._estimator_type == 'classifier':
func = self._classifier_default_score
else:
func = None
if func:
return func(X, y, sample_weight=sample_weight, row_idx=row_idx, **kw)
self._predict_as_np = True
kw['sample_weight'] = sample_weight
score, row_idx = self._predict_steps(X, row_idx=row_idx, y=y,
sk_method='score',
**kw)
return score
self._predict_as_np = False
return score[0]
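Note: the __repr__ added above is the headline fix in this PR's title; it reuses sklearn's private _pprint helper so wrapped estimators print like native sklearn ones. A standalone sketch of the same mechanism (_pprint is private API and absent from newer sklearn releases; the exact parameter list varies by version):

from sklearn.base import _pprint
from sklearn.linear_model import LinearRegression

est = LinearRegression()
name = type(est).__name__
print('%s(%s)' % (name, _pprint(est.get_params(deep=False), offset=len(name))))
# e.g. LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1, normalize=False)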

1 change: 1 addition & 0 deletions elm/model_selection/__init__.py
@@ -2,3 +2,4 @@
GridSearchCV,
RandomizedSearchCV)
from elm.model_selection.ea_searchcv import EaSearchCV
from elm.model_selection.cross_validation import CVCacheSampler
70 changes: 70 additions & 0 deletions elm/model_selection/cross_validation.py
@@ -0,0 +1,70 @@
from dask_searchcv.methods import CVCache
import numpy as np
from elm.mldataset.util import _is_xy_tuple

class CVCacheSampler(CVCache):
def __init__(self, sampler, splits=None, pairwise=None, cache=True):
self.sampler = sampler
assert cache is True
CVCache.__init__(self, splits, pairwise=pairwise, cache=True)

def _call_sampler(self, X, y=None, n=None, is_x=True, is_train=False):
if self.splits is None:
raise ValueError('Expected .splits to be set before calling _call_sampler')
if y is not None:
raise ValueError('y should be None (found {})'.format(type(y)))
func = getattr(self.sampler, 'fit_transform', None)
if func is None:
func = getattr(self.sampler, 'transform', self.sampler)
if not callable(func):
raise ValueError('Expected "sampler" to be callable or have fit_transform/transform methods')
out = func(X, y=y, is_x=is_x, is_train=is_train)
return out

def _extract(self, X, y, n, is_x=True, is_train=True):
if self.cache is not None and (n, is_x, is_train) in self.cache:
return self.cache[n, is_x, is_train]

inds = self.splits[n][0] if is_train else self.splits[n][1]

if self.cache in (None, False):
raise ValueError('Must set cache_cv=True with _call_sampler')
result = self._call_sampler(np.array(X)[inds])
if isinstance(result, tuple) and len(result) == 2:
(self.cache[n, True, is_train],
self.cache[n, False, is_train]) = result
else:
self.cache[n, True, is_train] = result
return result

def _extract_pairwise(self, X, y, n, is_train=True):
if self.cache is not None and (n, True, is_train) in self.cache:
return self.cache[n, True, is_train]

if not hasattr(X, "shape"):
raise ValueError("Precomputed kernels or affinity matrices have "
"to be passed as arrays or sparse matrices.")
if X.shape[0] != X.shape[1]:
raise ValueError("X should be a square kernel matrix")
train, test = self.splits[n]
result = X[np.ix_(train if is_train else test, train)]
result = self._call_sampler(result)
if _is_xy_tuple(result):
if self.cache is not None:
(self.cache[n, True, is_train],
self.cache[n, False, is_train]) = result
elif self.cache is not None:
self.cache[n, True, is_train] = result
return result

def extract(self, X, y, n, is_x=True, is_train=True):
if is_x:
if self.pairwise:
return self._extract_pairwise(X, y, n, is_train=is_train)
return self._extract(X, y, n, is_x=is_x, is_train=is_train)


def cv_split_sampler(sampler, cv, X, y, groups, is_pairwise, cache):
return CVCacheSampler(sampler=sampler,
splits=list(cv.split(X, y, groups)),
pairwise=is_pairwise,
cache=cache)
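Note: a hand-wired sketch of the cache above (normally EaSearchCV builds it via _get_cv_split_refit_Xy below). The sampler receives each fold's slice of the per-sample arguments as a numpy array; the stand-in here fabricates data instead of opening files:

import numpy as np
from sklearn.model_selection import KFold

def sampler(X, y=None, **kw):
    # A real sampler would open each file named in X.
    return np.array([[len(name)] for name in X])

args = ['file_1.nc', 'file_2.nc', 'file_3.nc', 'file_4.nc']
cache = cv_split_sampler(sampler, KFold(n_splits=2), args, None, None,
                         is_pairwise=False, cache=True)
X_train = cache.extract(args, None, 0, is_x=True, is_train=True)   # 2 rows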
49 changes: 35 additions & 14 deletions elm/model_selection/ea_searchcv.py
@@ -8,7 +8,6 @@
RandomizedSearchCV,
DaskBaseSearchCV,
_randomized_parameters)
from dask_searchcv.utils import is_pipeline
import numpy as np
from elm.model_selection.evolve import (fit_ea,
DEFAULT_CONTROL,
@@ -19,7 +18,9 @@
from elm.mldataset.util import is_arr
from elm.model_selection.sorting import pareto_front
from elm.model_selection.base import base_selection
from elm.model_selection.cross_validation import cv_split_sampler
from elm.pipeline import Pipeline
#from sklearn.pipeline import Pipeline
from xarray_filters.func_signatures import filter_kw_and_run_init
from xarray_filters.constants import DASK_CHUNK_N
from xarray_filters import MLDataset
@@ -62,8 +63,11 @@ def _concat_cv_results(cv1, cv2, gen=0):
by cross-validated evolutionary algorithm search over a parameter grid.\
"""
_ea_parameters = _randomized_parameters + """\
ngen : Number of generations (each generation uses
dask_searchcv.model_selection.RandomizedSearchCV)

sampler : A callable or instance with a "fit_transform" or "transform" method.
The callable takes arguments X and **kw, where X is an iterable
of arguments that make 1 sample, e.g.
``('file_1.nc', 'file_2.nc', 'file_3.nc')``
score_weights : None if doing single objective minimization or a sequence of
weights to use for flipping minimization to maximization, e.g.
[1, -1, 1] would minimize the 1st and 3rd objectives and maximize the second
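For reference, a toy sampler satisfying this contract (self-contained; a real sampler would open the files named in X, and may return an (X, y) tuple instead of X alone):

import numpy as np

def toy_sampler(X, y=None, **kw):
    # Fabricate one two-feature row per sample argument.
    return np.array([[len(str(arg)), hash(arg) % 7] for arg in X])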
@@ -86,12 +90,15 @@ def _concat_cv_results(cv1, cv2, gen=0):
'mu': 4,
'k': 4,
'early_stop': None

}
model_selection_kwargs : Keyword arguments passed to the model selection
callable (if given) otherwise ignored
select_with_test : Select / sort models based on test batch scores(True is default)
avoid_repeated_params : Avoid repeated parameters (True by default)
refit_Xy : If using ``refit=True``, then ``refit_Xy`` is either ``(X, y)`` for
refitting the best estimator, or ``X`` (array-like)
ngen : Number of generations (each generation uses
dask_searchcv.model_selection.RandomizedSearchCV)

"""
_ea_example = """\
>>> from sklearn import svm, datasets
@@ -126,7 +133,10 @@ def _concat_cv_results(cv1, cv2, gen=0):
'std_fit_time', 'std_score_time', 'std_test_score', 'std_train_score'...]\
"""

class EaSearchCV(RandomizedSearchCV, SklearnMixin, SerializeMixin):
def passthrough_sampler(X, y=None, **kw):
return X, y

class EaSearchCV(RandomizedSearchCV, SerializeMixin):

__doc__ = _DOC_TEMPLATE.format(name="EaSearchCV",
oneliner=_ea_oneliner,
@@ -137,17 +147,21 @@ class EaSearchCV(RandomizedSearchCV, SklearnMixin, SerializeMixin):
def __init__(self, estimator, param_distributions,
n_iter=10,
random_state=None,
ngen=3, score_weights=None,
sort_fitness=pareto_front,
model_selection=None,
model_selection_kwargs=None,
select_with_test=True,
ngen=3,
avoid_repeated_params=True,
scoring=None,
iid=True, refit=True,
iid=True, refit=True, refit_Xy=None,
cv=None, error_score='raise', return_train_score=True,
scheduler=None, n_jobs=-1, cache_cv=None):
scheduler=None, n_jobs=-1, cache_cv=True,
sampler=None,
score_weights=None,
sort_fitness=pareto_front,
model_selection=None,
model_selection_kwargs=None,
select_with_test=True):
filter_kw_and_run_init(RandomizedSearchCV.__init__, **locals())
self.sampler = sampler
self.refit_Xy = refit_Xy
self.ngen = ngen
self.select_with_test = select_with_test
self.model_selection = model_selection
@@ -156,6 +170,14 @@ def __init__(self, estimator, param_distributions,
self.avoid_repeated_params = avoid_repeated_params
self.cv_results_all_gen_ = {}

def _get_cv_split_refit_Xy(self):
if self.sampler:
sampler = self.sampler
else:
sampler = passthrough_sampler
cv_split = partial(cv_split_sampler, sampler)
return cv_split, self.refit_Xy

def _close(self):
self.cv_results_ = getattr(self, 'cv_results_all_gen_', self.cv_results_)
to_del = ('_ea_gen', 'cv_results_all_gen_',
@@ -270,7 +292,6 @@ def fit(self, X, y=None, groups=None, **fit_params):
if not self.sampler:
X, y = self._as_dask_array(X, y=y)
for self._gen in range(self.ngen):
print('Generation', self._gen)
RandomizedSearchCV.fit(self, X, y, groups=groups, **fit_params)
fitnesses = self._get_cv_scores()
self.cv_results_all_gen_ = _concat_cv_results(self.cv_results_all_gen_,
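Note: an end-to-end sketch of the sampler-driven search this PR is building toward. The PR is WIP, so behavior beyond this diff is assumed; toy_sampler is the example defined earlier:

from sklearn.cluster import MiniBatchKMeans
from elm.model_selection import EaSearchCV

ea = EaSearchCV(MiniBatchKMeans(),
                param_distributions={'n_clusters': [3, 4, 5]},
                n_iter=4, ngen=2, cache_cv=True,
                sampler=toy_sampler)
ea.fit(['sample_%d' % i for i in range(8)])   # X is a list of sample arguments
print(ea.best_params_)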