# Cross-validation

## Implementation

### CV Pipeline

In [1]:
import os
import pandas as pd
import numpy as np
import traceback as tb
from typing import Tuple, Dict, Union, Generator, List
from dataclasses import dataclass
from imblearn.over_sampling import SMOTE, SMOTENC
from sklearn.svm import LinearSVC
from sklearn.model_selection import StratifiedKFold, LeaveOneGroupOut, StratifiedShuffleSplit, RepeatedStratifiedKFold
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder
from sklearn.base import BaseEstimator, clone
import time
import ray


@dataclass
class FoldResult:
    name: str
    estimator: BaseEstimator
    X_train: pd.DataFrame
    y_train: np.ndarray
    X_test: pd.DataFrame
    y_test: np.ndarray
    categories: Dict[str, Dict[int, str]] = None


def _split(
        alg: str,
        X: Union[pd.DataFrame, np.ndarray] = None,
        y: np.ndarray = None,
        groups: np.ndarray = None,
        random_state: int = None,
        n_splits: int = None,
        n_repeats: int = None,
        test_ratio: float = None
) -> Generator[Tuple[np.ndarray, np.ndarray], None, None]:
    if alg == 'holdout':
        splitter = StratifiedShuffleSplit(
            n_splits=n_splits,
            test_size=test_ratio,
            random_state=random_state
        )
    elif alg == 'kfold':
        if n_repeats and n_repeats > 1:
            splitter = RepeatedStratifiedKFold(
                n_splits=n_splits,
                n_repeats=n_repeats,
                random_state=random_state,
            )
        else:
            splitter = StratifiedKFold(
                n_splits=n_splits,
                random_state=random_state,
                shuffle=False if random_state is None else True,
            )
    elif alg == 'logo':
        splitter = LeaveOneGroupOut()
    else:
        raise ValueError('"alg" should be one of "holdout", "kfold", "logo", or "groupk".')

    split = splitter.split(X, y, groups)

    for I_train, I_test in split:
        yield I_train, I_test


def _train(
    dir_result: str,
    name: str,
    X_train_C: np.ndarray,
    X_train_N: np.ndarray,
    y_train: np.ndarray,
    X_test_C: np.ndarray,
    X_test_N: np.ndarray,
    y_test: np.ndarray,
    C_cat: np.ndarray,
    C_num: np.ndarray,
    estimator: BaseEstimator,
    encoder: Union[OneHotEncoder, OrdinalEncoder] = None,
    normalize: bool = False,
    select: bool = False,
    select_threshold: float = 1e-5,
    select_params: Dict[str, any] = None,
    oversample: bool = False,
    random_state: int = None,
):
    X_train_C, X_train_N = X_train_C.astype(object), X_train_N.astype(np.float32)
    X_test_C, X_test_N = X_test_C.astype(object), X_test_N.astype(np.float32)

    if select:
        _t = time.time()
        try:
            select_params = select_params or dict()
            scaler = StandardScaler().fit(X_train_N)
            svc = LinearSVC(
                random_state=random_state, **select_params
            ).fit(X=np.nan_to_num(scaler.transform(X_train_N)), y=y_train)
            M = np.asarray([abs(r) > select_threshold for r in np.ravel(svc.coef_)])
            C_num = C_num[M]
            X_train_N, X_test_N = X_train_N[:, M], X_test_N[:, M] 
            log(f'Complete feature selection ({time.time() - _t:.2f}s): # Cat. = {len(C_cat)}; # Num. = {len(C_num)}')
        except:
            log(f'Failure in feature selection. Keep training without it. Caused by: \n{tb.format_exc()}')
    
    if normalize:
        _t = time.time()

        try:
            scaler = StandardScaler().fit(X_train_N)
            X_train_N = np.nan_to_num(scaler.transform(X_train_N)).astype(np.float32)
            X_test_N = np.nan_to_num(scaler.transform(X_test_N)).astype(np.float32)
            log(f'Complete to normalizing numeric features ({time.time() - _t:.2f}s)')
        except:
            log(f'Failure in normalizing numeric features. Keep training without it. Caused by: \n{tb.format_exc()}')

    if oversample:
        _t = time.time()

        try:
            ord_enc = OrdinalEncoder(dtype=np.float32).fit(X_train_C)
            X_train = np.concatenate((ord_enc.transform(X_train_C), X_train_N), axis=1)
            I_cat = np.arange(X_train_C.shape[1])
            I_num = np.setdiff1d(np.arange(X_train.shape[1]), I_cat)
            if len(C_cat):
                sampler = SMOTENC(categorical_features=I_cat, random_state=random_state)
            else:
                sampler = SMOTE(random_state=random_state)
            X_train, y_train = sampler.fit_resample(X_train, y_train)
            
            X_train_C, X_train_N = X_train[:, I_cat], X_train[:, I_num]
            X_train_C = ord_enc.inverse_transform(X_train_C)
            log(f'Complete to oversampling ({time.time() - _t:.2f}s)')
        except:
            log(f'Failure in oversampling. Keep training without it. Caused by:  \n{tb.format_exc()}')
    
    X_train_C, X_test_C = encoder.transform(X_train_C).astype(np.int32), encoder.transform(X_test_C).astype(np.int32)

    if isinstance(encoder, OneHotEncoder):
        categories = C_cat = list(encoder.get_feature_names_out(C_cat))
    else:
        categories = {k: {c: i for i, c in enumerate(v)} for k, v in zip(C_cat, encoder.categories_)}
        
    X_train_C = pd.DataFrame(X_train_C, columns=C_cat, dtype=np.int32)
    X_train_N = pd.DataFrame(X_train_N, columns=C_num, dtype=np.float32)
    X_test_C = pd.DataFrame(X_test_C, columns=C_cat, dtype=np.int32)
    X_test_N = pd.DataFrame(X_test_N, columns=C_num, dtype=np.float32)

    X_train = pd.concat((X_train_C, X_train_N), axis=1)
    X_test = pd.concat((X_test_C, X_test_N), axis=1)

    try:
        _t = time.time()

        estimator = estimator.fit(X_train, y_train)
        result = FoldResult(
            name=name,
            estimator=estimator,
            X_train=X_train,
            y_train=y_train,
            X_test=X_test,
            y_test=y_test,
            categories=categories
        )
        log(f'Complete to model fitting ({time.time() - _t:.2f}s)')
        dump(result, os.path.join(dir_result, f'{name}.pkl'))
    except:
        log(f'Failure in model fitting. Caused by: \n{tb.format_exc()}')


def cross_val(
    X: pd.DataFrame,
    y: np.ndarray,
    groups: np.ndarray,
    path: str,
    name: str,
    estimator: BaseEstimator,
    categories: List[str] = None,
    normalize: bool = False,
    split: str = None,
    split_params: Dict[str, any] = None,
    select: bool = False,
    select_threshold: float = 1e-5,
    select_params: Dict[str, any] = None,
    oversample: bool = False,
    onehot: bool = False,
    random_state: int = None
):
    if not os.path.exists(path):
        raise ValueError('"path" does not exist.')
    
    if not split:
        raise ValueError('"split" should be specified.')
    
    if not ray.is_initialized():
        raise EnvironmentError('"ray" should be initialized.')
    
    jobs = []
    func = ray.remote(_train).remote

    categories = list() if categories is None else categories
    C_cat = np.asarray(sorted(categories))
    C_num = np.asarray(sorted(X.columns[~X.columns.isin(C_cat)]))

    # Encoding categorical features
    if onehot:
        encoder = OneHotEncoder(dtype=np.int32, sparse_output=False, drop=None).fit(X[C_cat].values)
    else:
        encoder = OrdinalEncoder(dtype=np.int32).fit(X[C_cat].values)

    split_params = split_params or dict()
    splitter = _split(alg=split, X=X, y=y, groups=groups, random_state=random_state, **split_params)

    for idx_fold, (I_train, I_test) in enumerate(splitter):
        if split == 'logo':
            FOLD_NAME = str(np.unique(groups[I_test]).item(0))
        else:
            FOLD_NAME = str(idx_fold + 1)

        X_train, y_train = X.iloc[I_train, :], y[I_train]
        X_train_C, X_train_N = X_train[C_cat], X_train[C_num]

        X_test, y_test = X.iloc[I_test, :], y[I_test]
        X_test_C, X_test_N = X_test[C_cat], X_test[C_num]

        job = func(
            dir_result=path,
            name=f'{name}#{FOLD_NAME}',
            X_train_C=X_train_C.values,
            X_train_N=X_train_N.values,
            y_train=y_train,
            X_test_C=X_test_C.values,
            X_test_N=X_test_N.values,
            y_test=y_test,
            C_cat=C_cat,
            C_num=C_num,
            estimator=clone(estimator),
            encoder=encoder,
            normalize=normalize,
            select=select,
            select_threshold=select_threshold,
            select_params=select_params,
            oversample=oversample,
            random_state=random_state
        )
        jobs.append(job)
    ray.get(jobs)


### Minor Modification on XGBClassifer
This modification allows XGBClassifiers to automatically generate evaluation sets during pipeline (without passing any argument in "fit" function)

In [None]:
import pandas as pd
import numpy as np
from xgboost import XGBClassifier
from sklearn.base import BaseEstimator
from sklearn.model_selection import StratifiedShuffleSplit


class EvXGBClassifier(BaseEstimator):
    def __init__(
        self,
        eval_size=None,
        eval_metric='logloss',
        early_stopping_rounds=10,
        random_state=None,
        **kwargs
        ):
        self.random_state = random_state
        self.eval_size = eval_size
        self.eval_metric = eval_metric
        self.early_stopping_rounds = early_stopping_rounds
        self.model = XGBClassifier(
            random_state=self.random_state,
            eval_metric=self.eval_metric,
            early_stopping_rounds=self.early_stopping_rounds,
            **kwargs
        )

    @property
    def classes_(self):
        return self.model.classes_

    def fit(self, X: pd.DataFrame, y: np.ndarray):
        if self.eval_size:
            splitter = StratifiedShuffleSplit(random_state=self.random_state, test_size=self.eval_size)
            I_train, I_eval = next(splitter.split(X, y))
            X_train, y_train = X.iloc[I_train, :], y[I_train]
            X_eval, y_eval = X.iloc[I_eval, :], y[I_eval]
            self.model = self.model.fit(
                X=X_train, y=y_train, 
                eval_set=[(X_eval, y_eval)],
                verbose=False
            )
        else:
            self.model = self.model.fit(X=X, y=y, verbose=False)
        return self

    def predict(self, X: pd.DataFrame):
        return self.model.predict(X)

    def predict_proba(self, X: pd.DataFrame):
        return self.model.predict_proba(X)


## Execution

In [2]:
import os
from itertools import product
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import RandomForestClassifier
from functools import partial


RANDOM_STATE = 42

ESTIMATOR_DUMMY = DummyClassifier(strategy='prior')
ESTIMATOR_RF = RandomForestClassifier(random_state=RANDOM_STATE)
ESTIMATOR_XGB = EvXGBClassifier(
    random_state=RANDOM_STATE, 
    eval_metric='logloss', 
    eval_size=0.2,
    early_stopping_rounds=10, 
    objective='binary:logistic', 
    verbosity=0,
    learning_rate=0.01
)

CROSS_VAL_BASE = partial(
    cross_val,
    path=os.path.join(PATH_INTERMEDIATE, 'eval'),
    select=True,
    normalize=True,
    select_threshold=1e-3,
    select_params=dict(tol=1e-4, max_iter=5000, dual=False, penalty='l1', loss='squared_hinge', C=1e-2),
    oversample=True,
    split='logo',
    random_state=RANDOM_STATE,
    onehot=True
)

#CLS = ['valence', 'arousal', 'stress', 'disturbance']
CLS = ['stress']
OVERSAMPLE = [True, False]
ESTIMATORS = {'xgb': ESTIMATOR_XGB, 'dummy': ESTIMATOR_DUMMY, 'rf': ESTIMATOR_RF}


with on_ray(num_cpus=6):
    for l, o, e in product(
        CLS, OVERSAMPLE, ESTIMATORS
    ):
        name = f'{l}#{"os" if o else "ns"}#{e}'
        p = os.path.join(PATH_INTERMEDIATE, 'feat', f'{l}.pkl')
        X, y, groups, t, datetimes = load(p)
        cats = X.columns[X.dtypes == object]
        CROSS_VAL_BASE(
            X=X, y=y, groups=groups, name=name,
            estimator=ESTIMATORS[e], 
            categories=cats,
            oversample=o
        )

# Evaluation

## Implementation

In [None]:
import numpy as np
from typing import Dict
from itertools import product
from sklearn.metrics import log_loss
from sklearn.metrics import accuracy_score, balanced_accuracy_score, \
    confusion_matrix, precision_recall_fscore_support, \
    roc_auc_score, matthews_corrcoef, average_precision_score, \
    log_loss, brier_score_loss
import scipy.stats.mstats as ms


def evaluate(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    y_prob: np.ndarray,
    classes: np.ndarray
) -> Dict[str, any]:
    R = {}
    n_classes = len(classes)
    is_multiclass = n_classes > 2
    is_same_y = len(np.unique(y_true)) == 1
    R['inst'] = len(y_true)
    
    for c in classes:
        R[f'inst_{c}'] = np.sum(y_true == c)
        
    if not is_multiclass:
        _, cnt = np.unique(y_true, return_counts=True)
        
        if len(cnt) > 1:
            R['class_ratio'] = cnt[0] / cnt[1]
        else:
            R['class_ratio'] = np.nan

    C = confusion_matrix(y_true=y_true, y_pred=y_pred, labels=classes)
    for (i1, c1), (i2, c2) in product(enumerate(classes), enumerate(classes)):
        R[f'true_{c1}_pred_{c2}'] = C[i1, i2]

    # Threshold Measure
    R['acc'] = accuracy_score(y_true=y_true, y_pred=y_pred)
    R['bac'] = balanced_accuracy_score(y_true=y_true, y_pred=y_pred)
    R['gmean'] = ms.gmean(np.diag(C) / np.sum(C, axis=1))
    R['mcc'] = matthews_corrcoef(y_true=y_true, y_pred=y_pred)
    
    if is_multiclass:
        for avg in ('macro', 'micro'):
            pre, rec, f1, _ = precision_recall_fscore_support(
                y_true=y_true,
                y_pred=y_pred,
                labels=classes,
                average=avg, 
                zero_division=0
            )
            R[f'pre_{avg}'] = pre
            R[f'rec_{avg}'] = rec
            R[f'f1_{avg}'] = f1
    else:
        pre, rec, f1, _ = precision_recall_fscore_support(
            y_true=y_true, y_pred=y_pred, pos_label=c, average='macro', zero_division=0
        )
        R[f'pre_macro'] = pre
        R[f'rec_macro'] = rec
        R[f'f1_macro'] = f1
        
        for c in classes:
            pre, rec, f1, _ = precision_recall_fscore_support(
                y_true=y_true, y_pred=y_pred, pos_label=c, average='binary', zero_division=0
            )
            R[f'pre_{c}'] = pre
            R[f'rec_{c}'] = rec
            R[f'f1_{c}'] = f1

    # Ranking Measure
    if is_multiclass:
        for avg, mc in product(('macro', 'micro'), ('ovr', 'ovo')):
            R[f'roauc_{avg}_{mc}'] = roc_auc_score(
                y_true=y_true, y_score=y_prob,
                average=avg, multi_class=mc, labels=classes
            ) if not is_same_y else np.nan
    else:
        R[f'roauc'] = roc_auc_score(
            y_true=y_true, y_score=y_prob[:, 1], average=None
        ) if not is_same_y else np.nan
        for i, c in enumerate(classes):
            R[f'prauc_{c}'] = average_precision_score(
                y_true=y_true, y_score=y_prob[:, i], pos_label=c, average=None
            ) 
            R[f'prauc_ref_{c}'] = np.sum(y_true == c) / len(y_true)

    # Probability Measure
    R['log_loss'] = log_loss(y_true=y_true, y_pred=y_prob, labels=classes, normalize=True)

    if not is_multiclass:
        R[f'brier_loss'] = brier_score_loss(
            y_true=y_true, y_prob=y_prob[:, 1], pos_label=classes[1]
        )

    return R

## Execution

In [None]:
import os
import pandas as pd


RESULTS_EVAL = []
DIR_EVAL = os.path.join(PATH_INTERMEDIATE, 'eval')


for f in os.listdir(DIR_EVAL):
    l, o, m, s = f[:f.index('.pkl')].split('#')
    res = load(os.path.join(DIR_EVAL, f))
    X, y = res.X_test, res.y_test
    y_pred = res.estimator.predict(X)
    y_prob = res.estimator.predict_proba(X)
    ev_test = evaluate(
        y_true=y,
        y_pred=y_pred,
        y_prob=y_prob,
        classes=[0, 1]
    )
    
    X, y = res.X_train, res.y_train
    y_pred = res.estimator.predict(X)
    y_prob = res.estimator.predict_proba(X)
    ev_train = evaluate(
        y_true=y,
        y_pred=y_pred,
        y_prob=y_prob,
        classes=[0, 1]
    )
    
    RESULTS_EVAL.append({
        'label': l,
        'oversampling': o,
        'alg': m,
        'split': s,
        'n_feature': len(X.columns),
        **{
            f'test_{k}': v for k, v in ev_test.items()
        },
        **{
            f'train_{k}': v for k, v in ev_train.items()
        }
    })
    
RESULTS_EVAL = pd.DataFrame(RESULTS_EVAL)
RESULTS_EVAL.head()

In [None]:
import pandas as pd


SUMMARY_EVAL = []

for row in RESULTS_EVAL.groupby(
    ['label', 'oversampling', 'alg']
).agg(summary).reset_index().itertuples():
    for k, v in row._asdict().items():
        if type(v) is dict:
            r = dict(
                label=row.label,
                oversampling=row.oversampling,
                alg=row.alg,
                metric=k,
                **v
            )
            SUMMARY_EVAL.append(r)

SUMMARY_EVAL = pd.DataFrame(SUMMARY_EVAL)    
SUMMARY_EVAL.head()

In [None]:
SUB_SUMMARY_EVAL = SUMMARY_EVAL.loc[
    lambda x: x['metric'].isin(
        ['n_feature', 'train_class_ratio', 'train_inst_0', 'train_inst_1', 'test_inst_0', 'test_inst_1', 'test_acc', 'test_f1_0' ,'test_f1_1', 'test_f1_macro', 'train_f1_0' ,'train_f1_1', 'train_f1_macro',]
    )
].round(3).assign(
    mean_sd=lambda x: x['mean'].astype(str).str.cat(' (' + x['SD'].astype(str) + ')', sep='')
).pivot(
    index=['label', 'alg', 'oversampling'], columns=['metric'], values=['mean_sd']
)
SUB_SUMMARY_EVAL

# Feature Importances

## Implementation

In [None]:
from typing import Union, Optional


def feature_importance(
    estimator
):
    if hasattr(estimator, 'model'):
        estimator = estimator.model
    
    if not hasattr(estimator, 'feature_names_in_') or not hasattr(estimator, 'feature_importances_'):
        return None
    
    names = estimator.feature_names_in_
    importances = estimator.feature_importances_
    
    return names, importances

## Execution

In [None]:
import os
import pandas as pd
from collections import defaultdict


IMPORTANCE_EVAL = defaultdict(list)
DIR_EVAL = os.path.join(PATH_INTERMEDIATE, 'eval')


for f in os.listdir(DIR_EVAL):
    res = load(os.path.join(DIR_EVAL, f))
    
    f_norm = f[:f.index('.pkl')]
    cv = f_norm[:f.rindex('#')]
    
    feat_imp = feature_importance(res.estimator)
    if not feat_imp:
        continue
        
    names, importance = feat_imp
    d = pd.DataFrame(
        importance.reshape(1, -1),
        columns=names
    )
    IMPORTANCE_EVAL[cv].append(d)


IMPORTANCE_SUMMARY = []

for k, v in IMPORTANCE_EVAL.items():
    l, o, a = k.split('#')
    
    new_v = pd.concat(
        v, axis=0
    ).fillna(0.0).mean().reset_index().set_axis(
        ['feature', 'importance'], axis=1
    ).assign(
        label=l,
        oversampling=o,
        alg=a
    )
    IMPORTANCE_SUMMARY.append(new_v)
    
IMPORTANCE_SUMMARY = pd.concat(IMPORTANCE_SUMMARY, axis=0)
IMPORTANCE_SUMMARY.head()

### Plot

In [None]:
%%R -i IMPORTANCE_SUMMARY -w 26 -h 22 -u cm

plots <- list()

#for (l in c('valence', 'arousal', 'stress', 'disturbance')) {
for (l in c( 'stress')) {
    data <- IMPORTANCE_SUMMARY %>% filter(
        (label == l) & (oversampling == 'os')
    )

    p_label <- ggplot() + geom_text(
        aes(x=.5, y=.5),
        label=str_to_title(l), 
        family='ssp', 
        fontface='bold',
        size=4
    ) + theme_void()

    data_rf <- data %>% filter(
        alg == 'rf'
    )

    p_rf <- ggplot(
        data %>% filter(alg == 'rf') %>% top_n(n=10, wt=importance),
        aes(x=reorder(feature, -importance), y=importance),
    ) + geom_col(
    ) + THEME_DEFAULT + theme(
        axis.text.x=element_text(angle=90, size=10, hjust=1),
        axis.title.x=element_blank(),
        axis.title.y=element_blank()
    ) + labs(
        subtitle='Random Forest'
    ) + ylim(
        c(0, 0.08)
    )
    p_xgb <- ggplot(
        data %>% filter(alg == 'xgb') %>% top_n(n=10, wt=importance),
        aes(x=reorder(feature, -importance), y=importance),
    ) + geom_col(
    ) + THEME_DEFAULT + theme(
        axis.text.x=element_text(angle=90, size=10, hjust=1),
        axis.title.x=element_blank(),
        axis.title.y=element_blank()
    ) + labs(
        subtitle='XGBoost'
    ) + ylim(
        c(0, 0.08)
    )
    plots[[paste(l, 'label', sep='_')]] <- p_label
    plots[[paste(l, 'rf', sep='_')]] <- p_rf
    plots[[paste(l, 'xgb', sep='_')]] <- p_xgb
}

#p <- plots$arousal_label + plots$valence_label
#p <- p / (plots$arousal_rf | plots$arousal_xgb | plots$valence_rf | plots$valence_xgb)
#p <- p / (plots$stress_label + plots$disturbance_label)
#p <- p / (plots$stress_rf | plots$stress_xgb | plots$disturbance_rf | plots$disturbance_xgb)
p <- plots$stress_label 
p <- p / (plots$stress_rf | plots$stress_xgb)


p <- p + plot_layout(
    heights=c(1.1, 10, 1.1, 10)
)

ggsave(paste('./fig/imp.pdf'), plot=p, width=26, height=20, unit='cm', device=cairo_pdf)
print(p)
