From d5550fe65eceb4657ec744551830609e81bc31f5 Mon Sep 17 00:00:00 2001
From: Igor Carrara <94047258+carraraig@users.noreply.github.com>
Date: Thu, 21 Sep 2023 18:21:05 +0200
Subject: [PATCH] No parallel (#488)

* No Parallel WS

* No Parallel CS

* Remove dependency

* [pre-commit.ci] auto fixes from pre-commit.com hooks

* Solve Test

* Solve Test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

* Solve Test

* Solve Test

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 docs/source/whats_new.rst        |   1 +
 moabb/evaluations/evaluations.py | 463 ++++++++++++++-----------------
 2 files changed, 212 insertions(+), 252 deletions(-)

diff --git a/docs/source/whats_new.rst b/docs/source/whats_new.rst
index e3e76046d..faa2e12fa 100644
--- a/docs/source/whats_new.rst
+++ b/docs/source/whats_new.rst
@@ -75,6 +75,7 @@ Bugs
 - Fixing dataset downloader from servers with non-http (PR :gh:`433` by `Sara Sedlar`_)
 - Fix ``dataset_list`` to include deprecated datasets (PR :gh:`464` by `Bruno Aristimunha`_)
 - Fixed bug in :func:`moabb.analysis.results.get_string_rep` to handle addresses such as 0x__0A as well (PR :gh:`468` by `Anton Andreev`_)
+- Removing joblib Parallel (PR :gh:`488` by `Igor Carrara`_)
 
 API changes
 ~~~~~~~~~~~
diff --git a/moabb/evaluations/evaluations.py b/moabb/evaluations/evaluations.py
index bc52bda1e..4c8536356 100644
--- a/moabb/evaluations/evaluations.py
+++ b/moabb/evaluations/evaluations.py
@@ -4,7 +4,6 @@
 from typing import Optional, Union
 
 import numpy as np
-from joblib import Parallel, delayed
 from mne.epochs import BaseEpochs
 from sklearn.base import clone
 from sklearn.metrics import get_scorer
@@ -17,7 +16,6 @@
 )
 from sklearn.model_selection._validation import _fit_and_score, _score
 from sklearn.preprocessing import LabelEncoder
-from sklearn.utils import parallel_backend
 from tqdm import tqdm
 
 from moabb.evaluations.base import BaseEvaluation
@@ -162,159 +160,141 @@ def _grid_search(self, param_grid, name, grid_clf, inner_cv):
 
     # flake8: noqa: C901
     def _evaluate(
-        self, dataset, pipelines, param_grid, process_pipeline, postprocess_pipeline
-    ):
-        with parallel_backend("threading"):
-            results = Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)(
-                delayed(self._evaluate_subject)(
-                    dataset,
-                    pipelines,
-                    param_grid,
-                    subject,
-                    process_pipeline,
-                    postprocess_pipeline,
-                )
-                for subject in tqdm(
-                    dataset.subject_list, desc=f"{dataset.code}-WithinSession"
-                )
-            )
-
-        # Concatenate the results from all subjects
-        yield from [res for subject_results in results for res in subject_results]
-
-    def _evaluate_subject(
-        self,
-        dataset,
-        pipelines,
-        param_grid,
-        subject,
-        process_pipeline,
-        postprocess_pipeline,
-    ):
-        # Progress Bar at subject level
-        # check if we already have result for this subject/pipeline
-        # we might need a better granularity, if we query the DB
-        run_pipes = self.results.not_yet_computed(
-            pipelines, dataset, subject, process_pipeline
-        )
-        if len(run_pipes) == 0:
-            return []
-
-        # get the data
-        X, y, metadata = self.paradigm.get_data(
-            dataset=dataset,
-            subjects=[subject],
-            return_epochs=self.return_epochs,
-            return_raws=self.return_raws,
-            postprocess_pipeline=postprocess_pipeline,
-        )
-        subject_results = []
-        # iterate over sessions
-        for session in np.unique(metadata.session):
-            ix = metadata.session == session
-
-            for name, clf in run_pipes.items():
-                if _carbonfootprint:
-                    # Initialize CodeCarbon
-                    tracker = EmissionsTracker(save_to_file=False, log_level="error")
-                    tracker.start()
-                t_start = time()
-                cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state)
-                inner_cv = StratifiedKFold(
-                    3, shuffle=True, random_state=self.random_state
-                )
-                scorer = get_scorer(self.paradigm.scoring)
-                le = LabelEncoder()
-                y_cv = le.fit_transform(y[ix])
-                X_ = X[ix]
-                y_ = y[ix] if self.mne_labels else y_cv
-
-                grid_clf = clone(clf)
-
-                # Create folder for grid search results
-                create_save_path(
-                    self.hdf5_path,
-                    dataset.code,
-                    subject,
-                    session,
-                    name,
-                    grid=True,
-                    eval_type="WithinSession",
-                )
-
-                # Implement Grid Search
-                grid_clf = self._grid_search(
-                    param_grid=param_grid, name=name, grid_clf=grid_clf, inner_cv=inner_cv
-                )
-                if self.hdf5_path is not None:
-                    model_save_path = create_save_path(
-                        self.hdf5_path,
-                        dataset.code,
-                        subject,
-                        session,
-                        name,
-                        grid=False,
-                        eval_type="WithinSession",
-                    )
-
-                if isinstance(X, BaseEpochs):
-                    scorer = get_scorer(self.paradigm.scoring)
-                    acc = list()
-                    X_ = X[ix]
-                    y_ = y[ix] if self.mne_labels else y_cv
-                    for cv_ind, (train, test) in enumerate(cv.split(X_, y_)):
-                        cvclf = clone(grid_clf)
-                        cvclf.fit(X_[train], y_[train])
-                        acc.append(scorer(cvclf, X_[test], y_[test]))
-
-                        if self.hdf5_path is not None:
-                            save_model_cv(
-                                model=cvclf, save_path=model_save_path, cv_index=cv_ind
-                            )
-
-                    acc = np.array(acc)
-                    score = acc.mean()
-                else:
-                    results = cross_validate(
-                        grid_clf,
-                        X[ix],
-                        y_cv,
-                        cv=cv,
-                        scoring=self.paradigm.scoring,
-                        n_jobs=self.n_jobs,
-                        error_score=self.error_score,
-                        return_estimator=True,
-                    )
-                    score = results["test_score"].mean()
-                    if self.hdf5_path is not None:
-                        save_model_list(
-                            results["estimator"],
-                            score_list=results["test_score"],
-                            save_path=model_save_path,
-                        )
-
-                if _carbonfootprint:
-                    emissions = tracker.stop()
-                    if emissions is None:
-                        emissions = np.NaN
-                duration = time() - t_start
-
-                nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1]
-                res = {
-                    "time": duration / 5.0,  # 5 fold CV
-                    "dataset": dataset,
-                    "subject": subject,
-                    "session": session,
-                    "score": score,
-                    "n_samples": len(y_cv),  # not training sample
-                    "n_channels": nchan,
-                    "pipeline": name,
-                }
-                if _carbonfootprint:
-                    res["carbon_emission"] = (1000 * emissions,)
-                subject_results.append(res)
-
-        return subject_results
+        self,
+        dataset,
+        pipelines,
+        param_grid,
+        process_pipeline,
+        postprocess_pipeline,
+    ):
+        # Progress Bar at subject level
+        for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-WithinSession"):
+            # check if we already have result for this subject/pipeline
+            # we might need a better granularity, if we query the DB
+            run_pipes = self.results.not_yet_computed(
+                pipelines, dataset, subject, process_pipeline
+            )
+            if len(run_pipes) == 0:
+                return []
+
+            # get the data
+            X, y, metadata = self.paradigm.get_data(
+                dataset=dataset,
+                subjects=[subject],
+                return_epochs=self.return_epochs,
+                return_raws=self.return_raws,
+                postprocess_pipeline=postprocess_pipeline,
+            )
+            # iterate over sessions
+            for session in np.unique(metadata.session):
+                ix = metadata.session == session
+
+                for name, clf in run_pipes.items():
+                    if _carbonfootprint:
+                        # Initialize CodeCarbon
+                        tracker = EmissionsTracker(save_to_file=False, log_level="error")
+                        tracker.start()
+                    t_start = time()
+                    cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state)
+                    inner_cv = StratifiedKFold(
+                        3, shuffle=True, random_state=self.random_state
+                    )
+                    scorer = get_scorer(self.paradigm.scoring)
+                    le = LabelEncoder()
+                    y_cv = le.fit_transform(y[ix])
+                    X_ = X[ix]
+                    y_ = y[ix] if self.mne_labels else y_cv
+
+                    grid_clf = clone(clf)
+
+                    # Create folder for grid search results
+                    create_save_path(
+                        self.hdf5_path,
+                        dataset.code,
+                        subject,
+                        session,
+                        name,
+                        grid=True,
+                        eval_type="WithinSession",
+                    )
+
+                    # Implement Grid Search
+                    grid_clf = self._grid_search(
+                        param_grid=param_grid,
+                        name=name,
+                        grid_clf=grid_clf,
+                        inner_cv=inner_cv,
+                    )
+                    if self.hdf5_path is not None:
+                        model_save_path = create_save_path(
+                            self.hdf5_path,
+                            dataset.code,
+                            subject,
+                            session,
+                            name,
+                            grid=False,
+                            eval_type="WithinSession",
+                        )
+
+                    if isinstance(X, BaseEpochs):
+                        scorer = get_scorer(self.paradigm.scoring)
+                        acc = list()
+                        X_ = X[ix]
+                        y_ = y[ix] if self.mne_labels else y_cv
+                        for cv_ind, (train, test) in enumerate(cv.split(X_, y_)):
+                            cvclf = clone(grid_clf)
+                            cvclf.fit(X_[train], y_[train])
+                            acc.append(scorer(cvclf, X_[test], y_[test]))
+
+                            if self.hdf5_path is not None:
+                                save_model_cv(
+                                    model=cvclf,
+                                    save_path=model_save_path,
+                                    cv_index=cv_ind,
+                                )
+
+                        acc = np.array(acc)
+                        score = acc.mean()
+                    else:
+                        results = cross_validate(
+                            grid_clf,
+                            X[ix],
+                            y_cv,
+                            cv=cv,
+                            scoring=self.paradigm.scoring,
+                            n_jobs=self.n_jobs,
+                            error_score=self.error_score,
+                            return_estimator=True,
+                        )
+                        score = results["test_score"].mean()
+                        if self.hdf5_path is not None:
+                            save_model_list(
+                                results["estimator"],
+                                score_list=results["test_score"],
+                                save_path=model_save_path,
+                            )
+
+                    if _carbonfootprint:
+                        emissions = tracker.stop()
+                        if emissions is None:
+                            emissions = np.NaN
+                    duration = time() - t_start
+
+                    nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1]
+                    res = {
+                        "time": duration / 5.0,  # 5 fold CV
+                        "dataset": dataset,
+                        "subject": subject,
+                        "session": session,
+                        "score": score,
+                        "n_samples": len(y_cv),  # not training sample
+                        "n_channels": nchan,
+                        "pipeline": name,
+                    }
+                    if _carbonfootprint:
+                        res["carbon_emission"] = (1000 * emissions,)
+
+                    yield res
 
     def get_data_size_subsets(self, y):
         if self.data_size is None:
@@ -535,142 +515,121 @@ def evaluate(
     ):
         if not self.is_valid(dataset):
             raise AssertionError("Dataset is not appropriate for evaluation")
-        # Progressbar at subject level
-        results = []
-        with parallel_backend("threading"):
-            for result in Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)(
-                delayed(self.process_subject)(
-                    subject,
-                    param_grid,
-                    pipelines,
-                    dataset,
-                    process_pipeline,
-                    postprocess_pipeline,
-                )
-                for subject in dataset.subject_list
-            ):
-                results.extend(result)
-
-        return results
-
-    def process_subject(
-        self,
-        subject,
-        param_grid,
-        pipelines,
-        dataset,
-        process_pipeline,
-        postprocess_pipeline,
-    ):
-        # check if we already have result for this subject/pipeline
-        # we might need a better granularity, if we query the DB
-        run_pipes = self.results.not_yet_computed(
-            pipelines, dataset, subject, process_pipeline
-        )
-        if len(run_pipes) == 0:
-            print(f"Subject {subject} already processed")
-            return []
-
-        # get the data
-        X, y, metadata = self.paradigm.get_data(
-            dataset=dataset,
-            subjects=[subject],
-            return_epochs=self.return_epochs,
-            return_raws=self.return_raws,
-            postprocess_pipeline=postprocess_pipeline,
-        )
-        le = LabelEncoder()
-        y = y if self.mne_labels else le.fit_transform(y)
-        groups = metadata.session.values
-        scorer = get_scorer(self.paradigm.scoring)
-
-        results = []
-        for name, clf in run_pipes.items():
-            if _carbonfootprint:
-                # Initialise CodeCarbon
-                tracker = EmissionsTracker(save_to_file=False, log_level="error")
-                tracker.start()
-
-            # we want to store a results per session
-            cv = LeaveOneGroupOut()
-            inner_cv = StratifiedKFold(3, shuffle=True, random_state=self.random_state)
-
-            grid_clf = clone(clf)
-
-            # Implement Grid Search
-            grid_clf = self._grid_search(
-                param_grid=param_grid, name=name, grid_clf=grid_clf, inner_cv=inner_cv
-            )
-
-            if self.hdf5_path is not None:
-                model_save_path = create_save_path(
-                    hdf5_path=self.hdf5_path,
-                    code=dataset.code,
-                    subject=subject,
-                    session="",
-                    name=name,
-                    grid=False,
-                    eval_type="CrossSession",
-                )
-
-            for cv_ind, (train, test) in enumerate(cv.split(X, y, groups)):
-                model_list = []
-                if _carbonfootprint:
-                    tracker.start()
-                t_start = time()
-                if isinstance(X, BaseEpochs):
-                    cvclf = clone(grid_clf)
-                    cvclf.fit(X[train], y[train])
-                    model_list.append(cvclf)
-                    score = scorer(cvclf, X[test], y[test])
-
-                    if self.hdf5_path is not None:
-                        save_model_cv(
-                            model=cvclf, save_path=model_save_path, cv_index=str(cv_ind)
-                        )
-                else:
-                    result = _fit_and_score(
-                        clone(grid_clf),
-                        X,
-                        y,
-                        scorer,
-                        train,
-                        test,
-                        verbose=False,
-                        parameters=None,
-                        fit_params=None,
-                        error_score=self.error_score,
-                        return_estimator=True,
-                    )
-                    score = result["test_scores"]
-                    model_list = result["estimator"]
-                if _carbonfootprint:
-                    emissions = tracker.stop()
-                    if emissions is None:
-                        emissions = 0
-
-                duration = time() - t_start
-                if self.hdf5_path is not None:
-                    save_model_list(
-                        model_list=model_list, score_list=score, save_path=model_save_path
-                    )
-
-                nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1]
-                res = {
-                    "time": duration,
-                    "dataset": dataset,
-                    "subject": subject,
-                    "session": groups[test][0],
-                    "score": score,
-                    "n_samples": len(train),
-                    "n_channels": nchan,
-                    "pipeline": name,
-                }
-                if _carbonfootprint:
-                    res["carbon_emission"] = (1000 * emissions,)
-
-                results.append(res)
-        return results
+        # Progressbar at subject level
+        for subject in tqdm(dataset.subject_list, desc=f"{dataset.code}-CrossSession"):
+            # check if we already have result for this subject/pipeline
+            # we might need a better granularity, if we query the DB
+            run_pipes = self.results.not_yet_computed(
+                pipelines, dataset, subject, process_pipeline
+            )
+            if len(run_pipes) == 0:
+                print(f"Subject {subject} already processed")
+                return []
+
+            # get the data
+            X, y, metadata = self.paradigm.get_data(
+                dataset=dataset,
+                subjects=[subject],
+                return_epochs=self.return_epochs,
+                return_raws=self.return_raws,
+                postprocess_pipeline=postprocess_pipeline,
+            )
+            le = LabelEncoder()
+            y = y if self.mne_labels else le.fit_transform(y)
+            groups = metadata.session.values
+            scorer = get_scorer(self.paradigm.scoring)
+
+            for name, clf in run_pipes.items():
+                if _carbonfootprint:
+                    # Initialise CodeCarbon
+                    tracker = EmissionsTracker(save_to_file=False, log_level="error")
+                    tracker.start()
+
+                # we want to store a results per session
+                cv = LeaveOneGroupOut()
+                inner_cv = StratifiedKFold(
+                    3, shuffle=True, random_state=self.random_state
+                )
+
+                grid_clf = clone(clf)
+
+                # Implement Grid Search
+                grid_clf = self._grid_search(
+                    param_grid=param_grid, name=name, grid_clf=grid_clf, inner_cv=inner_cv
+                )
+
+                if self.hdf5_path is not None:
+                    model_save_path = create_save_path(
+                        hdf5_path=self.hdf5_path,
+                        code=dataset.code,
+                        subject=subject,
+                        session="",
+                        name=name,
+                        grid=False,
+                        eval_type="CrossSession",
+                    )
+
+                for cv_ind, (train, test) in enumerate(cv.split(X, y, groups)):
+                    model_list = []
+                    if _carbonfootprint:
+                        tracker.start()
+                    t_start = time()
+                    if isinstance(X, BaseEpochs):
+                        cvclf = clone(grid_clf)
+                        cvclf.fit(X[train], y[train])
+                        model_list.append(cvclf)
+                        score = scorer(cvclf, X[test], y[test])
+
+                        if self.hdf5_path is not None:
+                            save_model_cv(
+                                model=cvclf,
+                                save_path=model_save_path,
+                                cv_index=str(cv_ind),
+                            )
+                    else:
+                        result = _fit_and_score(
+                            clone(grid_clf),
+                            X,
+                            y,
+                            scorer,
+                            train,
+                            test,
+                            verbose=False,
+                            parameters=None,
+                            fit_params=None,
+                            error_score=self.error_score,
+                            return_estimator=True,
+                        )
+                        score = result["test_scores"]
+                        model_list = result["estimator"]
+                    if _carbonfootprint:
+                        emissions = tracker.stop()
+                        if emissions is None:
+                            emissions = 0
+
+                    duration = time() - t_start
+                    if self.hdf5_path is not None:
+                        save_model_list(
+                            model_list=model_list,
+                            score_list=score,
+                            save_path=model_save_path,
+                        )
+
+                    nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1]
+                    res = {
+                        "time": duration,
+                        "dataset": dataset,
+                        "subject": subject,
+                        "session": groups[test][0],
+                        "score": score,
+                        "n_samples": len(train),
+                        "n_channels": nchan,
+                        "pipeline": name,
+                    }
+                    if _carbonfootprint:
+                        res["carbon_emission"] = (1000 * emissions,)
+
+                    yield res
 
     def is_valid(self, dataset):
         return dataset.n_sessions > 1
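
Note on usage: with this change, WithinSessionEvaluation._evaluate and CrossSessionEvaluation.evaluate walk dataset.subject_list in a plain tqdm loop and yield one result dict per pipeline, subject and session, instead of collecting per-subject lists from joblib workers; BaseEvaluation.process consumes the generator as before. The sketch below is a minimal, illustrative way to drive the reworked evaluation; the dataset, paradigm and pipeline choices are examples only (not taken from this patch) and assume pyriemann is installed.

    from pyriemann.classification import MDM
    from pyriemann.estimation import Covariances
    from sklearn.pipeline import make_pipeline

    from moabb.datasets import BNCI2014001
    from moabb.evaluations import WithinSessionEvaluation
    from moabb.paradigms import LeftRightImagery

    # Illustrative setup (not part of the patch): two subjects of a
    # motor-imagery dataset and a single Riemannian pipeline.
    dataset = BNCI2014001()
    dataset.subject_list = dataset.subject_list[:2]
    paradigm = LeftRightImagery()
    pipelines = {"MDM": make_pipeline(Covariances("oas"), MDM(metric="riemann"))}

    # Subjects are now evaluated sequentially (one tqdm bar per dataset) and
    # per-pipeline/session results are yielded as dicts; process() still
    # collects them into a pandas DataFrame.
    evaluation = WithinSessionEvaluation(
        paradigm=paradigm, datasets=[dataset], overwrite=True
    )
    results = evaluation.process(pipelines)
    print(results[["subject", "session", "pipeline", "score", "time"]])

The fold-level parallelism is untouched: the cross_validate call inside the within-session loop still receives n_jobs, so only the subject-level joblib layer is removed.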