
[WIP] Flatting evaluation #486

Closed
5 changes: 4 additions & 1 deletion moabb/evaluations/base.py
@@ -47,6 +47,8 @@ class BaseEvaluation(ABC):
use MNE raw to train pipelines.
mne_labels: bool, default=False
if returning MNE epoch, use original dataset label if True
n_splits: int, default=5
Number of splits for evaluation.
"""

def __init__(
@@ -64,6 +66,7 @@ def __init__(
return_epochs=False,
return_raws=False,
mne_labels=False,
n_splits=5,
):
self.random_state = random_state
self.n_jobs = n_jobs
@@ -77,7 +80,7 @@ def __init__(
if not isinstance(paradigm, BaseParadigm):
raise (ValueError("paradigm must be an Paradigm instance"))
self.paradigm = paradigm

self.n_splits = n_splits
Collaborator comment: I think we should protect this new attribute, or at least raise a warning if the user changes it. One of the purposes of MOABB is to normalize the evaluation of algorithms across BCI research, so it is best if everyone uses 5 folds.
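
A minimal sketch of that suggestion, assuming a module-level default and a hypothetical helper name (neither exists in this PR):

```python
import warnings

MOABB_DEFAULT_N_SPLITS = 5  # assumed constant, not part of the PR


def check_n_splits(n_splits):
    """Warn when the user deviates from the standard 5-fold evaluation."""
    if n_splits != MOABB_DEFAULT_N_SPLITS:
        warnings.warn(
            f"n_splits={n_splits} differs from the MOABB default of "
            f"{MOABB_DEFAULT_N_SPLITS}; scores may not be comparable with "
            "published MOABB benchmarks.",
            UserWarning,
        )
    return n_splits
```

The constructor could then assign `self.n_splits = check_n_splits(n_splits)` instead of the bare assignment above.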

# check labels
if self.mne_labels and not self.return_epochs:
raise (ValueError("mne_labels could only be set with return_epochs"))
193 changes: 85 additions & 108 deletions moabb/evaluations/evaluations.py
@@ -4,7 +4,7 @@
from typing import Optional, Union

import numpy as np
from joblib import Parallel, delayed
from joblib import Memory, Parallel, delayed
from mne.epochs import BaseEpochs
from sklearn.base import clone
from sklearn.metrics import get_scorer
@@ -13,7 +13,6 @@
LeaveOneGroupOut,
StratifiedKFold,
StratifiedShuffleSplit,
cross_validate,
)
from sklearn.model_selection._validation import _fit_and_score, _score
from sklearn.preprocessing import LabelEncoder
@@ -22,6 +21,7 @@

from moabb.evaluations.base import BaseEvaluation
from moabb.evaluations.utils import create_save_path, save_model_cv, save_model_list
from moabb.model_selection import WithinSessionValidator


try:
@@ -37,6 +37,9 @@
Vector = Union[list, tuple, np.ndarray]


memory = Memory(location="__cache__")
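
For context on the new module-level `Memory` object and the `@memory.cache` decorator applied to `_evaluate_subject` below: `joblib.Memory` memoizes a function to disk, keyed on a hash of its arguments, so repeated calls with identical inputs load the stored result instead of recomputing it. A self-contained sketch (the function and values are illustrative only):

```python
from joblib import Memory

memory = Memory(location="__cache__", verbose=0)


@memory.cache
def expensive_sum(n):
    # Recomputed only for a new value of `n`; otherwise the pickled
    # result is loaded back from the __cache__/ directory.
    return sum(i * i for i in range(n))


print(expensive_sum(10_000))  # computed and written to disk
print(expensive_sum(10_000))  # served from the on-disk cache
```

One point that may deserve a reviewer note: decorating a bound method such as `_evaluate_subject` means `self` is hashed together with the other arguments, which the joblib documentation advises against; caching a pure helper function called from the method is usually the safer pattern.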


class WithinSessionEvaluation(BaseEvaluation):
"""Performance evaluation within session (k-fold cross-validation)

@@ -71,9 +74,6 @@ class WithinSessionEvaluation(BaseEvaluation):
If not None, can guarantee same seed for shuffling examples.
n_jobs: int, default=1
Number of jobs for fitting of pipeline.
n_jobs_evaluation: int, default=1
Number of jobs for evaluation, processing in parallel the within session,
cross-session or cross-subject.
overwrite: bool, default=False
If true, overwrite the results.
error_score: "raise" or numeric, default="raise"
@@ -91,16 +91,20 @@
use MNE raw to train pipelines.
mne_labels: bool, default=False
if returning MNE epoch, use original dataset label if True
n_splits: int, default=5
Number of splits for evaluation.
"""

VALID_POLICIES = ["per_class", "ratio"]

def __init__(
self,
n_splits=5,
n_perms: Optional[Union[int, Vector]] = None,
data_size: Optional[dict] = None,
**kwargs,
):
self.cv = WithinSessionValidator(n_splits=n_splits)
self.data_size = data_size
self.n_perms = n_perms
self.calculate_learning_curve = self.data_size is not None
@@ -168,7 +172,7 @@ def _evaluate(
results = Parallel(n_jobs=self.n_jobs_evaluation, verbose=1)(
Collaborator comment: See the comment below about this Parallel call.

delayed(self._evaluate_subject)(
dataset,
pipelines,
pipeline,
param_grid,
subject,
process_pipeline,
@@ -177,142 +181,108 @@
for subject in tqdm(
dataset.subject_list, desc=f"{dataset.code}-WithinSession"
)
for pipeline in self.results.not_yet_computed(
pipelines, dataset, subject, process_pipeline
)
)

# Concatenate the results from all subjects
yield from [res for subject_results in results for res in subject_results]

@memory.cache
def _evaluate_subject(
self,
dataset,
pipelines,
param_grid,
pipeline,
subject,
process_pipeline,
postprocess_pipeline,
):
# Progress Bar at subject level
# check if we already have result for this subject/pipeline
# we might need a better granularity, if we query the DB
run_pipes = self.results.not_yet_computed(
pipelines, dataset, subject, process_pipeline
)
if len(run_pipes) == 0:
return []

# get the data
X, y, metadata = self.paradigm.get_data(
# Getting the data
X, labels, metadata = self.paradigm.get_data(
dataset=dataset,
subjects=[subject],
return_epochs=self.return_epochs,
return_raws=self.return_raws,
postprocess_pipeline=postprocess_pipeline,
)
subject_results = []
# iterate over sessions

y = []
# **WRONG**
# We need to think in a better way to do this...
# Before: For each session, we perform a Label Encoder
for session in np.unique(metadata.session):
ix = metadata.session == session
y.append(self.encode_labels(labels[ix], self.mne_labels))

for name, clf in run_pipes.items():
if _carbonfootprint:
# Initialize CodeCarbon
tracker = EmissionsTracker(save_to_file=False, log_level="error")
tracker.start()
t_start = time()
cv = StratifiedKFold(5, shuffle=True, random_state=self.random_state)
inner_cv = StratifiedKFold(
3, shuffle=True, random_state=self.random_state
)
scorer = get_scorer(self.paradigm.scoring)
le = LabelEncoder()
y_cv = le.fit_transform(y[ix])
X_ = X[ix]
y_ = y[ix] if self.mne_labels else y_cv
name, clf = pipeline.items()
if _carbonfootprint:
# Initialize CodeCarbon
tracker = EmissionsTracker(save_to_file=False, log_level="error")
tracker.start()
t_start = time()

grid_clf = clone(clf)
# To-do: find a way to expose this for.
Collaborator comment: Yes, I think that if we want only one Parallel call, to avoid nesting them, we should put it here instead of across the datasets and subjects, for two reasons:

  • parallel calls across subjects and datasets mean loading a lot of data in parallel, so it is not very efficient;
  • if the user also wants to parallelize across datasets or subjects, they can launch multiple scripts, each with a different subject.
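
A minimal, self-contained sketch of that suggestion, i.e. a single `Parallel` call over the cross-validation folds of one subject rather than over datasets and subjects (all names and data below are illustrative, not taken from this PR):

```python
import numpy as np
from joblib import Parallel, delayed
from sklearn.base import clone
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import get_scorer
from sklearn.model_selection import StratifiedKFold

rng = np.random.default_rng(42)
X = rng.normal(size=(100, 8))        # stand-in for one subject's features
y = rng.integers(0, 2, size=100)     # stand-in binary labels

clf = LogisticRegression()
scorer = get_scorer("accuracy")
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)


def fit_and_score(estimator, scoring, X, y, train_idx, test_idx):
    # Fit a fresh clone on the training fold, score on the held-out fold.
    model = clone(estimator).fit(X[train_idx], y[train_idx])
    return scoring(model, X[test_idx], y[test_idx])


# One lightweight job per fold: the data is loaded once per subject and
# only the folds run in parallel, so Parallel calls are never nested.
scores = Parallel(n_jobs=-1)(
    delayed(fit_and_score)(clf, scorer, X, y, train_idx, test_idx)
    for train_idx, test_idx in cv.split(X, y)
)
print(np.mean(scores))
```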

# Here, we will have n_splits = n_sessions*n_splits (default 5)
Collaborator comment: I think we should even have n_splits = n_sessions*n_splits*n_pipelines.

for cv_ind, (train_idx, test_idx) in enumerate(
self.cv.split(X, y, groups=metadata)
):
session = metadata[train_idx][0]

# Create folder for grid search results
create_save_path(
self.hdf5_path,
dataset.code,
subject,
session,
name,
grid=True,
eval_type="WithinSession",
)

# Create folder for grid search results
create_save_path(
if self.hdf5_path is not None:
model_save_path = create_save_path(
self.hdf5_path,
dataset.code,
subject,
session,
name,
grid=True,
grid=False,
eval_type="WithinSession",
)

# Implement Grid Search
grid_clf = self._grid_search(
param_grid=param_grid, name=name, grid_clf=grid_clf, inner_cv=inner_cv
)
if self.hdf5_path is not None:
model_save_path = create_save_path(
self.hdf5_path,
dataset.code,
subject,
session,
name,
grid=False,
eval_type="WithinSession",
)

if isinstance(X, BaseEpochs):
scorer = get_scorer(self.paradigm.scoring)
acc = list()
X_ = X[ix]
y_ = y[ix] if self.mne_labels else y_cv
for cv_ind, (train, test) in enumerate(cv.split(X_, y_)):
cvclf = clone(grid_clf)
cvclf.fit(X_[train], y_[train])
acc.append(scorer(cvclf, X_[test], y_[test]))

if self.hdf5_path is not None:
save_model_cv(
model=cvclf, save_path=model_save_path, cv_index=cv_ind
)
scorer = get_scorer(self.paradigm.scoring)
acc = list()
cvclf = deepcopy(clf)
cvclf.fit(X[train_idx], y[train_idx])
acc.append(scorer(cvclf, X[test_idx], y[test_idx]))

acc = np.array(acc)
score = acc.mean()
else:
results = cross_validate(
grid_clf,
X[ix],
y_cv,
cv=cv,
scoring=self.paradigm.scoring,
n_jobs=self.n_jobs,
error_score=self.error_score,
return_estimator=True,
)
score = results["test_score"].mean()
if self.hdf5_path is not None:
save_model_list(
results["estimator"],
score_list=results["test_score"],
save_path=model_save_path,
)
if self.hdf5_path is not None:
save_model_cv(model=cvclf, save_path=model_save_path, cv_index=cv_ind)

if _carbonfootprint:
emissions = tracker.stop()
if emissions is None:
emissions = np.NaN
duration = time() - t_start
acc = np.array(acc)
score = acc.mean()

nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1]
res = {
"time": duration / 5.0, # 5 fold CV
"dataset": dataset,
"subject": subject,
"session": session,
"score": score,
"n_samples": len(y_cv), # not training sample
"n_channels": nchan,
"pipeline": name,
}
if _carbonfootprint:
res["carbon_emission"] = (1000 * emissions,)
subject_results.append(res)
if _carbonfootprint:
emissions = tracker.stop()
if emissions is None:
emissions = np.NaN
duration = time() - t_start

nchan = X.info["nchan"] if isinstance(X, BaseEpochs) else X.shape[1]
res = {
"time": duration / 5.0, # 5 fold CV
"dataset": dataset,
"subject": subject,
"session": session,
"score": score,
"n_samples": len(y), # not training sample
"n_channels": nchan,
"pipeline": name,
}
if _carbonfootprint:
res["carbon_emission"] = (1000 * emissions,)
subject_results.append(res)

return subject_results

@@ -352,6 +322,13 @@ def get_data_size_subsets(self, y):
raise ValueError(f"Unknown policy {self.data_size['policy']}")
return indices

@staticmethod
def encode_labels(labels, mne_labels):
le = LabelEncoder()
y_cv = le.fit_transform(labels)
y = labels if mne_labels else y_cv
return y
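
A short usage sketch of the new `encode_labels` helper; the labels below are illustrative, and the behaviour simply mirrors `sklearn.preprocessing.LabelEncoder`:

```python
import numpy as np
from sklearn.preprocessing import LabelEncoder

labels = np.array(["left_hand", "right_hand", "left_hand", "feet"])

# mne_labels=False: labels are replaced by integer codes
# (LabelEncoder sorts classes, so feet=0, left_hand=1, right_hand=2).
print(LabelEncoder().fit_transform(labels))  # -> [1 2 1 0]

# mne_labels=True: the original dataset labels are returned unchanged,
# matching the mne_labels option described in the docstring above.
print(labels)  # -> ['left_hand' 'right_hand' 'left_hand' 'feet']
```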

def score_explicit(self, clf, X_train, y_train, X_test, y_test):
if not self.mne_labels:
# convert labels if array, keep them if epochs and mne_labels is set
5 changes: 5 additions & 0 deletions moabb/model_selection/__init__.py
@@ -0,0 +1,5 @@
from .intersubject import CrossSessionValidator, WithinSessionValidator
from .intrasubject import CrossSubjectValidator


__all__ = ["WithinSessionValidator", "CrossSessionValidator", "CrossSubjectValidator"]
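
The validators are only re-exported here; their implementations are not part of this diff. Judging from the call `self.cv.split(X, y, groups=metadata)` in `_evaluate_subject`, `WithinSessionValidator` appears to follow the scikit-learn splitter API with the per-epoch metadata passed as `groups`. A hedged usage sketch under that assumption (shapes, labels, and the exact signature are guesses, not confirmed by the PR):

```python
import numpy as np
import pandas as pd

from moabb.model_selection import WithinSessionValidator

# Illustrative stand-ins for one subject's epochs, labels and metadata.
X = np.random.randn(40, 8, 128)                   # trials x channels x times
y = np.array(["left_hand", "right_hand"] * 20)    # class labels
metadata = pd.DataFrame(
    {"session": ["session_1"] * 20 + ["session_2"] * 20}
)

cv = WithinSessionValidator(n_splits=5)

# Each split is expected to stay within a single session, yielding
# n_sessions * n_splits folds overall (2 * 5 = 10 in this toy example).
for fold, (train_idx, test_idx) in enumerate(cv.split(X, y, groups=metadata)):
    print(fold, len(train_idx), len(test_idx))
```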