Pipelines digest (#447)
* Add methods get_process_pipelines and get_labels_pipeline to paradigms

* Compute digest from full pipeline (closes #429)

* Update docstrings

* Update whats_new.rst

* Add process_pipeline parameter to Results.not_yet_computed

* Add process_pipeline parameter to Results.to_dataframe()

* Update benchmark test

* Fix prepare_process not being called when make_process_pipelines is called outside of get_data

* Complete TODOs

* Add shortcut for make_process_pipelines in utils

* [pre-commit.ci] auto fixes from pre-commit.com hooks

* Remove TODO as discussed (only keep pipeline for first band)

---------

Co-authored-by: Bru <a.bruno@aluno.ufabc.edu.br>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
3 people committed Aug 25, 2023
1 parent 221cfc0 commit adce413
Showing 13 changed files with 288 additions and 168 deletions.
1 change: 1 addition & 0 deletions docs/source/utils.rst
@@ -27,3 +27,4 @@ Utils
set_log_level
setup_seed
set_download_dir
make_process_pipelines
2 changes: 2 additions & 0 deletions docs/source/whats_new.rst
@@ -39,6 +39,8 @@ Enhancements
- Systematically set the annotations when loading data, if necessary using the stim channel (PR :gh:`408` by `Pierre Guetschel`_)
- Allow :func:`moabb.datasets.utils.dataset_search` to search across paradigms ``paradigm=None`` (PR :gh:`408` by `Pierre Guetschel`_)
- Improving the review processing with more pre-commit bots (:gh:`435` by `Bruno Aristimunha`_)
- Add methods ``make_process_pipelines`` and ``make_labels_pipeline`` to :class:`moabb.paradigms.base.BaseProcessing` (:gh:`447` by `Pierre Guetschel`_)
- Pipelines' digests are now computed from the whole processing+classification pipeline (:gh:`447` by `Pierre Guetschel`_)
- Update all dataset codes to remove white spaces and underscores (:gh:`448` by `Pierre Guetschel`_)
- Add :func:`moabb.utils.depreciated_alias` decorator (:gh:`455` by `Pierre Guetschel`_)
- Rename many dataset class names to standardize and deprecate old names (:gh:`455` by `Pierre Guetschel`_)
2 changes: 1 addition & 1 deletion examples/plot_disk_cache.py
@@ -225,7 +225,7 @@
# The main interest of the array cache is when the user passes a
# computationally heavy but fixed additional preprocessing (for example
# computing the covariance matrices of the epochs). This can be done by using
# the ``processing_pipeline`` argument. The output of this additional pipeline
# the ``postprocess_pipeline`` argument. The output of this additional pipeline
# (necessarily a numpy array) will be saved to avoid re-computing it each time.
#
#
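To make the array-cache idea above concrete, here is a minimal sketch of passing a fixed covariance step through ``postprocess_pipeline`` (hypothetical dataset and paradigm choices, assumes pyriemann is installed; only the ``use`` key of ``cache_config`` is taken from this diff):

from pyriemann.estimation import Covariances
from sklearn.pipeline import make_pipeline

from moabb.datasets import BNCI2014_001
from moabb.paradigms import LeftRightImagery

paradigm = LeftRightImagery()
dataset = BNCI2014_001()

# The covariance step is effectively stateless; its numpy output is what the
# array cache stores on disk and reuses on the next call.
X, labels, metadata = paradigm.get_data(
    dataset,
    subjects=[1],
    cache_config=dict(use=True),
    postprocess_pipeline=make_pipeline(Covariances(estimator="oas")),
)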
33 changes: 24 additions & 9 deletions moabb/analysis/results.py
@@ -11,6 +11,7 @@
from mne import get_config, set_config
from mne.datasets.utils import _get_path
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline


try:
@@ -50,6 +51,11 @@ def get_digest(obj):
return hashlib.md5(get_string_rep(obj)).hexdigest()


def get_pipeline_digest(process_pipeline, clf_pipeline):
full_pipeline = Pipeline(steps=[("process", process_pipeline), ("clf", clf_pipeline)])
return get_digest(full_pipeline)


class Results:
"""Class to hold results from the evaluation.evaluate method.
@@ -110,7 +116,7 @@ def __init__(
"{:%Y-%m-%d, %H:%M}".format(datetime.now())
)

def add(self, results, pipelines): # noqa: C901
def add(self, results, pipelines, process_pipeline): # noqa: C901
"""Add results."""

def to_list(res):
@@ -133,7 +139,7 @@ def to_list(res):

with h5py.File(self.filepath, "r+") as f:
for name, data_dict in results.items():
digest = get_digest(pipelines[name])
digest = get_pipeline_digest(process_pipeline, pipelines[name])
if digest not in f.keys():
# create pipeline main group if nonexistent
f.create_group(digest)
@@ -192,13 +198,20 @@ def to_list(res):
]
)

def to_dataframe(self, pipelines=None):
def to_dataframe(self, pipelines=None, process_pipeline=None):
df_list = []

# get the list of pipeline hash
digests = []
if pipelines is not None:
digests = [get_digest(pipelines[name]) for name in pipelines]
if pipelines is not None and process_pipeline is not None:
digests = [
get_pipeline_digest(process_pipeline, pipelines[name])
for name in pipelines
]
elif pipelines is not None or process_pipeline is not None:
raise ValueError(
"Either both of none of pipelines and process_pipeline must be specified."
)

with h5py.File(self.filepath, "r") as f:
for digest, p_group in f.items():
@@ -221,21 +234,23 @@ def to_dataframe(self, pipelines=None):

return pd.concat(df_list, ignore_index=True)

def not_yet_computed(self, pipelines, dataset, subj):
def not_yet_computed(self, pipelines, dataset, subj, process_pipeline):
"""Check if a results has already been computed."""
ret = {
k: pipelines[k]
for k in pipelines.keys()
if not self._already_computed(pipelines[k], dataset, subj)
if not self._already_computed(pipelines[k], dataset, subj, process_pipeline)
}
return ret

def _already_computed(self, pipeline, dataset, subject, session=None):
def _already_computed(
self, pipeline, dataset, subject, process_pipeline, session=None
):
"""Check if we have results for a current combination of pipeline /
dataset / subject."""
with h5py.File(self.filepath, "r") as f:
# get the digest from repr
digest = get_digest(pipeline)
digest = get_pipeline_digest(process_pipeline, pipeline)

# check if digest present
if digest not in f.keys():
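For illustration, a small sketch of what the new helper changes in practice (hypothetical pipelines; ``get_digest`` and ``get_pipeline_digest`` are the module-level functions shown above):

from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from moabb.analysis.results import get_digest, get_pipeline_digest

process_pipeline = make_pipeline(StandardScaler())  # stand-in for a paradigm's processing steps
clf_pipeline = make_pipeline(LinearDiscriminantAnalysis())

# The digest now covers processing + classification, so changing either part
# (e.g. the paradigm's filtering) yields a different key in the results file.
print(get_pipeline_digest(process_pipeline, clf_pipeline))
print(get_digest(clf_pipeline))  # old behaviour: classifier only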
107 changes: 25 additions & 82 deletions moabb/datasets/base.py
@@ -4,25 +4,14 @@
import re
import traceback
from dataclasses import dataclass
from enum import Enum
from inspect import signature
from pathlib import Path
from typing import Dict, Type, Union
from typing import Dict, Union

from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.pipeline import Pipeline

from moabb.datasets.bids_interface import (
BIDSInterfaceBase,
BIDSInterfaceEpochs,
BIDSInterfaceNumpyArray,
BIDSInterfaceRawEDF,
)
from moabb.datasets.preprocessing import (
EpochsToEvents,
ForkPipelines,
RawToEvents,
SetRawAnnotations,
)
from moabb.datasets.bids_interface import StepType, _interface_map
from moabb.datasets.preprocessing import SetRawAnnotations


log = logging.getLogger(__name__)
@@ -98,21 +87,6 @@ def make(cls, dic: Union[None, Dict, "CacheConfig"] = None) -> "CacheConfig":
raise ValueError(f"Expected dict or CacheConfig, got {type(dic)}")


class StepType(Enum):
"""Enum for the different steps in the pipeline."""

RAW = "raw"
EPOCHS = "epochs"
ARRAY = "array"


_interface_map: Dict[StepType, Type[BIDSInterfaceBase]] = {
StepType.RAW: BIDSInterfaceRawEDF,
StepType.EPOCHS: BIDSInterfaceEpochs,
StepType.ARRAY: BIDSInterfaceNumpyArray,
}


def apply_step(pipeline, obj):
"""Apply a pipeline to an object."""
if obj is None:
@@ -224,12 +198,10 @@ def get_data(
self,
subjects=None,
cache_config=None,
raw_pipeline=None,
epochs_pipeline=None,
array_pipeline=None,
events_pipeline=None,
process_pipeline=None,
):
"""Return the data correspoonding to a list of subjects.
"""
Return the data correspoonding to a list of subjects.
The returned data is a dictionary with the following structure::
@@ -259,27 +231,16 @@
cache_config: dict | CacheConfig
Configuration for caching of datasets. See ``CacheConfig``
for details.
raw_pipeline: sklearn.pipeline.Pipeline | sklearn.base.TransformerMixin
| None
Pipeline that necessarily takes a mne.io.Raw as input,
and necessarily returns a :class:`mne.io.Raw` as output.
epochs_pipeline: sklearn.pipeline.Pipeline |
sklearn.base.TransformerMixin | None
Pipeline that necessarily takes a mne.io.Raw as input,
and necessarily returns a :class:`mne.Epochs` as output.
array_pipeline: sklearn.pipeline.Pipeline |
sklearn.base.TransformerMixin | None
Pipeline either takes as input a :class:`mne.Epochs` if
epochs_pipeline is not ``None``, or a :class:`mne.io.Raw`
otherwise. It necessarily returns a :func:`numpy.ndarray`
as output.
If array_pipeline is not None, each run will be a
dict with keys "X" and "y" corresponding respectively to the array
itself and the corresponding labels.
events_pipeline: sklearn.pipeline.Pipeline |
sklearn.base.TransformerMixin | None
Pipeline used to generate the events. Only used if
``array_pipeline`` is not ``None``.
process_pipeline: Pipeline | None
Optional processing pipeline to apply to the data.
To generate an adequate pipeline, we recommend using
:func:`moabb.utils.make_process_pipelines`.
This pipeline will receive :class:`mne.io.BaseRaw` objects.
The step names of this pipeline should be elements of :class:`StepType`.
According to their name, the steps should either return a
:class:`mne.io.BaseRaw`, a :class:`mne.Epochs`, or a :func:`numpy.ndarray`.
This pipeline must be "fixed" because it will not be trained,
i.e. no call to ``fit`` will be made.
Returns
-------
@@ -292,35 +253,14 @@
if not isinstance(subjects, list):
raise ValueError("subjects must be a list")

if events_pipeline is None and array_pipeline is not None:
log.warning(
f"event_id not specified, using all the dataset's "
f"events to generate labels: {self.event_id}"
)
events_pipeline = (
RawToEvents(self.event_id)
if epochs_pipeline is None
else EpochsToEvents()
)

cache_config = CacheConfig.make(cache_config)

steps = []
steps.append((StepType.RAW, SetRawAnnotations(self.event_id)))
if raw_pipeline is not None:
steps.append((StepType.RAW, raw_pipeline))
if epochs_pipeline is not None:
steps.append((StepType.EPOCHS, epochs_pipeline))
if array_pipeline is not None:
array_events_pipeline = ForkPipelines(
if process_pipeline is None:
process_pipeline = Pipeline(
[
("X", array_pipeline),
("events", events_pipeline),
(StepType.RAW, SetRawAnnotations(self.event_id)),
]
)
steps.append((StepType.ARRAY, array_events_pipeline))
if len(steps) == 0:
steps.append((StepType.RAW, make_pipeline(None)))

data = dict()
for subject in subjects:
@@ -329,7 +269,7 @@
data[subject] = self._get_single_subject_data_using_cache(
subject,
cache_config,
steps,
process_pipeline,
)

return data
@@ -394,14 +334,17 @@ def download(
verbose=verbose,
)

def _get_single_subject_data_using_cache(self, subject, cache_config, steps):
def _get_single_subject_data_using_cache(
self, subject, cache_config, process_pipeline
):
"""Load a single subject's data using cache.
Either load the data of a single subject from the disk cache or from the
dataset object, then save or overwrite the cached version if needed,
depending on the parameters.
"""
steps = list(process_pipeline.steps)
splitted_steps = [] # list of (cached_steps, remaining_steps)
if cache_config.use:
splitted_steps += [
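As a sketch of the contract described in the ``process_pipeline`` docstring above (the recommended way to build one is ``moabb.utils.make_process_pipelines``; the hand-rolled version below simply mirrors the default used when the argument is ``None``, with a hypothetical dataset choice):

from sklearn.pipeline import Pipeline

from moabb.datasets import BNCI2014_001
from moabb.datasets.bids_interface import StepType
from moabb.datasets.preprocessing import SetRawAnnotations

dataset = BNCI2014_001()

# Steps are keyed by StepType members; a RAW step must return a Raw object,
# and the pipeline is never fitted, only applied.
process_pipeline = Pipeline([(StepType.RAW, SetRawAnnotations(dataset.event_id))])

data = dataset.get_data(subjects=[1], process_pipeline=process_pipeline)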
19 changes: 18 additions & 1 deletion moabb/datasets/bids_interface.py
@@ -18,8 +18,9 @@
import re
from collections import OrderedDict
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Dict, Type

import mne
import mne_bids
@@ -436,3 +437,19 @@ def _write_file(self, bids_path, obj):
overwrite=False,
verbose=self.verbose,
)


class StepType(Enum):
"""Enum corresponding to the type of data returned
by a pipeline step."""

RAW = "raw"
EPOCHS = "epochs"
ARRAY = "array"


_interface_map: Dict[StepType, Type[BIDSInterfaceBase]] = {
StepType.RAW: BIDSInterfaceRawEDF,
StepType.EPOCHS: BIDSInterfaceEpochs,
StepType.ARRAY: BIDSInterfaceNumpyArray,
}
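For orientation, this mapping is how the caching layer picks a reader/writer for the output of each step type (directly reflecting the code above):

from moabb.datasets.bids_interface import StepType, _interface_map

# Each step type selects the BIDSInterface subclass used for disk caching.
print(_interface_map[StepType.EPOCHS].__name__)  # -> BIDSInterfaceEpochs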
25 changes: 18 additions & 7 deletions moabb/evaluations/base.py
@@ -156,29 +156,40 @@ def process(self, pipelines, param_grid=None):
for _, pipeline in pipelines.items():
if not (isinstance(pipeline, BaseEstimator)):
raise (ValueError("pipelines must only contain Pipeline " "instances"))

for dataset in self.datasets:
log.info("Processing dataset: {}".format(dataset.code))
results = self.evaluate(dataset, pipelines, param_grid)
process_pipeline = self.paradigm.make_process_pipelines(
dataset,
return_epochs=self.return_epochs,
return_raws=self.return_raws,
postprocess_pipeline=None,
)[0]
# (we only keep the pipeline for the first frequency band, better ideas?)

results = self.evaluate(dataset, pipelines, param_grid, process_pipeline)
for res in results:
self.push_result(res, pipelines)
self.push_result(res, pipelines, process_pipeline)

return self.results.to_dataframe(pipelines=pipelines)
return self.results.to_dataframe(
pipelines=pipelines, process_pipeline=process_pipeline
)

def push_result(self, res, pipelines):
def push_result(self, res, pipelines, process_pipeline):
message = "{} | ".format(res["pipeline"])
message += "{} | {} | {}".format(
res["dataset"].code, res["subject"], res["session"]
)
message += ": Score %.3f" % res["score"]
log.info(message)
self.results.add({res["pipeline"]: res}, pipelines=pipelines)
self.results.add(
{res["pipeline"]: res}, pipelines=pipelines, process_pipeline=process_pipeline
)

def get_results(self):
return self.results.to_dataframe()

@abstractmethod
def evaluate(self, dataset, pipelines, param_grid):
def evaluate(self, dataset, pipelines, param_grid, process_pipeline):
"""Evaluate results on a single dataset.
This method returns a generator. Each result item is a dict with
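From the user's point of view the entry point is unchanged; a minimal sketch (hypothetical dataset and classifier, assumes pyriemann is installed) of the flow that now threads ``process_pipeline`` into the results file:

from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.pipeline import make_pipeline

from moabb.datasets import BNCI2014_001
from moabb.evaluations import WithinSessionEvaluation
from moabb.paradigms import LeftRightImagery

pipelines = {
    "TS+LDA": make_pipeline(Covariances("oas"), TangentSpace(), LinearDiscriminantAnalysis())
}

evaluation = WithinSessionEvaluation(
    paradigm=LeftRightImagery(),
    datasets=[BNCI2014_001()],
    overwrite=False,
)

# process() derives the dataset's processing pipeline via
# paradigm.make_process_pipelines() and stores each score under a digest of
# processing + classification steps, so cached results are invalidated when
# either part changes.
results = evaluation.process(pipelines)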
