In [1]:
from sys import path
path.append("../")
from src.data.smile import SmileData
from src.utils import make_binary
from src.utils.score import Merger
from sklearn.metrics import accuracy_score
from sklearn.base import ClassifierMixin
from sklearn.ensemble import AdaBoostClassifier
from sklearn.linear_model import SGDClassifier, RidgeClassifier
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
from numpy import ndarray
from typing import Callable
from numpy import stack
from warnings import warn

In [2]:
path_to_data: str = "../data.nosync/dataset_smile_challenge_unravelled_train_cut10_stadd.npy"
data = SmileData(path_to_data=path_to_data, test=False, debug_mode=False)

In [3]:
feature_selection_configs = dict(
    criterion="mutual information",
    method="percentage",
    method_attribute=51,
    joined=False,
    deep_features=False,
)


data.feature_selection(**feature_selection_configs)

x = data.get_handcrafted_features()
y = data.get_labels()
y = make_binary(y)

In [27]:
from numpy.random import randint

y_1 = randint(low=0, high=2, size=2070)
y_2 = randint(low=0, high=2, size=2070)
y_3 = randint(low=0, high=2, size=2070)
print(y_1.shape)
print(y_2.shape)
print(y_3.shape)

(2070,)
(2070,)
(2070,)


In [28]:
from numpy import stack

merger = Merger._get_merge_strategy('majority_voting')

merger(stack([y_1, y_2, y_3], axis=1).reshape(-1, 20))

ValueError: cannot reshape array of size 6210 into shape (20)

In [5]:
from numpy import array
y = array(y).reshape(-1, 10)

In [32]:
class MultiModalClassifier(ClassifierMixin):
    def __init__(
        self,
        models: dict[str, ClassifierMixin],
        fusion_method: Callable | ClassifierMixin | str,
        time_length: int,
        probability: bool = False,
    ):
        """Classifier to train a multi-modal approach. The classifier will train
        a model (at minute-level) for

        Parameters
        ----------
        models : dict[str, ClassifierMixin]
            _description_
        fusion_method : Callable | ClassifierMixin | str
            _description_
        time_length : int
            _description_
        probability : bool, optional
            _description_, by default False
        """
        self.time_length = time_length
        self.data_names: list[str] = list(models.keys())
        self.models = models
        self.probability = probability
        if isinstance(fusion_method, str):
            self.fusion_method = Merger._get_merge_strategy(fusion_method)
        else:
            self.fusion_method = fusion_method

    def fit(self, x: dict[str, ndarray], y: ndarray):
        if not isinstance(x, dict):
            raise TypeError(f"x must be a dict. Got {type(x)} instead")

        y_preds: dict[str, ndarray] = {}
        for data_name, model in self.models.items():
            model.fit(x[data_name], y)
            y_preds[data_name] = (
                model.predict(x[data_name])
                if not self.probability
                else model.predict_proba(x[data_name])[:, 1]
            )
            self.models[data_name] = model

        if not isinstance(self.fusion_method, str) and not callable(self.fusion_method):
            warn(f"Assuming fusion method to be ML based.")
            # logger.warning('Assuming fusion method to be ML based')
            y_pred = self._ravel_back_results(y_preds=y_preds)

            y = Merger.check_truth(Merger.ravel_back(y=y, time_length=self.time_length))
            self.fusion_method.fit(y_pred, y)

    def _ravel_back_results(self, y_preds: ndarray) -> ndarray:
        y_pred: ndarray = stack(list(y_preds.values()), axis=1)
        y_pred: ndarray = y_pred.reshape(-1, self.time_length * y_pred.shape[-1])
        return y_pred

    def predict(self, x: dict[str, ndarray]) -> ndarray:
        y_preds: dict[str, ndarray] = {}
        for data_name, model in self.models.items():
            y_preds[data_name] = (
                model.predict(x[data_name])
                if not self.probability
                else model.predict_proba(x[data_name])[:, 1]
            )

        y_pred = self._ravel_back_results(y_preds=y_preds)
        if not isinstance(self.fusion_method, str) and not callable(self.fusion_method):
            return self.fusion_method.predict(y_pred)
        else:
            return self.fusion_method(y_pred)

    def score(self, x: dict[str, ndarray], y: ndarray) -> float:
        y_pred = self.predict(x)
        y = Merger.check_truth(Merger.ravel_back(y=y, time_length=self.time_length))
        return accuracy_score(y_pred, y)


In [33]:
from joblib import Parallel, logger, delayed
from src.utils.cv import make_unravelled_folds
from copy import deepcopy

n_jobs: int = -1
verbose: int = 1
pre_dispatch: str = "all"


def fit_and_score(
    estimator: ClassifierMixin,
    x: dict[str, ndarray] | ndarray,
    y: ndarray,
    train_idx: ndarray,
    test_idx: ndarray,
) -> float:
    if isinstance(x, dict):
        x_train: dict[str, ndarray] = {
            data_name: x[data_name][train_idx] for data_name in x.keys()
        }
        x_test: dict[str, ndarray] = {
            data_name: x[data_name][test_idx] for data_name in x.keys()
        }
    else:
        x_train = x[train_idx]
        x_test = x[test_idx]
    y_train: ndarray = y[train_idx]

    y_test: ndarray = y[test_idx]

    estimator.fit(x_train, y_train)

    return estimator.score(x_test, y_test)


def cross_validation(
    x: dict[str, ndarray] | ndarray,
    y: ndarray,
    estimator: ClassifierMixin,
    cv,
    n_jobs: int | None = None,
) -> list[float]:
    parallel = Parallel(
        n_jobs=n_jobs, verbose=verbose, pre_dispatch=pre_dispatch, backend="loky"
    )
    results = parallel(
        delayed(fit_and_score)(
            deepcopy(estimator),
            x,
            y,
            train_idx,
            test_idx,
        )
        for train_idx, test_idx in cv
    )
    return list(results)


In [35]:
cv: list[tuple[ndarray, ndarray]] = make_unravelled_folds(t=10, n_folds=10, n_data=2070)

models = {
    "ECG_features": AdaBoostClassifier(),
    "ST_features": AdaBoostClassifier(),
    "GSR_features": AdaBoostClassifier(),
}
fusion_method = "majority_voting"

multimodal_classifier = MultiModalClassifier(
    models=models, fusion_method=SVC(kernel="rbf"), time_length=10, probability=True
)

res = cross_validation(x=x, y=y, estimator=multimodal_classifier, cv=cv, n_jobs=1)


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
  warn(f"Assuming fusion method to be ML based.")
[Parallel(n_jobs=1)]: Done  10 out of  10 | elapsed:   10.3s finished


In [127]:
from numpy import mean, std, sqrt
print(f"Multimodal: {mean(res)*100} +- {std(res)*100/sqrt(9)}")

Multimodal: 57.874396135265705 +- 2.9498062583034823


In [109]:
# unimodal
res = dict()
for data_name, data in x.items():
    res[data_name] = cross_validation(x=data, y=y, estimator=SVC(), cv=cv, n_jobs=-1)

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    9.8s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    8.7s finished
[Parallel(n_jobs=-1)]: Using backend LokyBackend with 10 concurrent workers.
[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:    7.8s finished


In [110]:
for data_name, result in res.items():
    print(f"{data_name}: {mean(result)*100} +- {std(result)*100/sqrt(9)}")

ECG_features: 58.971014492753625 +- 2.943135807567352
GSR_features: 47.34299516908212 +- 3.106388296395339
ST_features: 58.60386473429953 +- 3.781226418052658


In [8]:
from itertools import chain, combinations





stuff: list[tuple[str, str]] = [
    ("hand_crafted_features", "ECG_features"),
    ("hand_crafted_features", "GSR_features"),
]
res = [subset for subset in all_subsets(stuff) if len(subset) != 0]


In [15]:
for couple in res:
    print(lust)

('hand_crafted_features', 'ECG_features')
('hand_crafted_features', 'GSR_features')
('hand_crafted_features', 'ECG_features')
('hand_crafted_features', 'GSR_features')


In [37]:
from numpy import mean
mean([0.40096618357487923,
0.3140096618357488,
0.5024154589371981,
0.5362318840579711,
0.5942028985507246,
0.642512077294686,
0.4927536231884058,
0.5217391304347826,
0.5797101449275363,
0.5024154589371981])

0.5086956521739131