In [1]:
import pandas as pd
import numpy as np
import zipfile
import urllib
import io
import re
import os
import traceback
import pickle as pkl

from sklearn.model_selection import LeaveOneGroupOut, GroupKFold, KFold, cross_val_score, cross_validate, cross_val_predict
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
from sklearn.metrics import roc_curve

from bokeh.io import output_notebook
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
from bokeh.palettes import d3

output_notebook()

if not os.path.isdir('temp'):
    os.mkdir('temp')

In [2]:
"""
EMG Pattern Database

For recording patterns, we used a MYO Thalmic bracelet worn on a user’s forearm, and a PC with a Bluetooth receiver.
The bracelet is equipped with eight sensors equally spaced around the forearm that simultaneously acquire myographic
signals. The signals are sent through a Bluetooth interface to a PC. 

We present raw EMG data for 36 subjects while they performed series of static hand gestures.The subject performs two 
series, each of which consists of six (seven) basic gestures. Each gesture was performed for 3 seconds with a pause
of 3 seconds between gestures.

Description of raw_data _*** file
Each file consist of 10 columns:
1) Time - time in ms;
2-9) Channel - eightEMG channels of MYO Thalmic bracelet;
10) Class  –thelabel of gestures: 
0 - unmarked data,
1 - hand at rest, 
2 - hand clenched in a fist, 
3 - wrist flexion,
4 – wrist extension,
5 – radial deviations,
6 - ulnar deviations,
7 - extended palm (the gesture was not performed by all subjects).

Relevant Paper:
Lobov S., Krilova N., Kastalskiy I., Kazantsev V., Makarov V.A. Latent Factors Limiting the Performance of
sEMG-Interfaces. Sensors. 2018;18(4):1122. doi: 10.3390/s18041122

Supported by the Ministry of Education and Science of the Russian Federation in the framework of megagrant allocation
in accordance with the decree of the government of the Russian Federation №220, project № 14.Y26.31.0022 
"""
if os.path.isfile('temp/df.csv'):
    print('Loading from file...')
    df = pd.read_csv('temp/df.csv')
else:
    print('Downloading files...')
    with zipfile.ZipFile(io.BytesIO(urllib.request.urlopen('https://archive.ics.uci.edu/ml/machine-learning-databases/00481/EMG_data_for_gestures-master.zip').read())) as data_zip:
        name_re = re.compile(r'.*/(\d{2})/(\d+)_raw_data_(\d{2}-\d{2})_(\d{2}\.\d{2}\.\d{2})\.txt')
        def extract(zip_f):
            match = name_re.match(zip_f.filename)
            if not match:
                return None
            pid, sid, _, _ = match.groups()
            df = pd.read_csv(io.StringIO(data_zip.read(zip_f).decode('utf8')), sep='\t')
            df = df[df['class'] != 0]
            df['pid'] = np.int(pid)
            df['sid'] = np.int(sid)
            df['time'] /= 1e3  # ms -> s
            return df
        df = pd.concat([df_i for df_i in [extract(f) for f in data_zip.infolist()] if df_i is not None])
    df.to_csv('temp/df.csv')
df.set_index(['pid', 'sid', 'time'], inplace=True)

Downloading files...


In [None]:
"""
Validate model using K-fold and grouped K-fold cross-validation.
"""
models = {
    'NaiveBayes': GaussianNB(),
    'LogisticRegression-L1L2': LogisticRegression(
        penalty='elasticnet',
        solver='saga',
        l1_ratio=0.1),
    'RandomForest': RandomForestClassifier(
        n_estimators=50,
        max_depth=5,
        min_samples_split=50000,
        min_samples_leaf=10000),
    'ExtraTrees': ExtraTreesClassifier(
        n_estimators=50,
        max_depth=5,
        min_samples_split=50000,
        min_samples_leaf=10000),
}

df_features = df.drop(columns=['class'])
df_truth = df['class'] == 1.0
n_cpus = 4

if os.path.isfile('temp/cv_results.pkl'):
    print('Loading from file...')
    folds, cv_results = pkl.load(open('temp/cv_results.pkl', 'rb'))
else:
    print('Retraining models...')
    folds = list(GroupKFold(5).split(df, groups=df.index.get_level_values('pid')))

    cv_results = {}
    for model_name, model in models.items():
        print(f'Training model: {model_name}')
        try:
            cv_results[model_name] = cross_validate(
                model,
                df_features,
                df_truth,
                scoring='roc_auc',
                cv=folds,
                n_jobs=n_cpus,
                return_train_score=True,
                return_estimator=True,
                error_score=np.nan)
        except:
            traceback.print_exc()
            cv_results[model_name] = None
    
    print('Saving...')
    pkl.dump((folds, cv_results), open('temp/cv_results.pkl', 'wb'))

    print('All done.')

Retraining models...
Training model: NaiveBayes
Training model: LogisticRegression-L1L2
Training model: RandomForest
Training model: ExtraTrees


In [None]:
class ROCPlot:
    def __init__(self, cv_results, X, y, folds):
        # Compute predictions and ROC tables.
        self._predictions = {}
        self._rocs = {}
        self._agg_rocs = {}
        self._n_folds = len(folds)
        for model_name in cv_results:
            print(f'Generating for {model_name}...')
            train, test = self._compute_all_preds_roc(cv_results[model_name], X, y, folds)
            self._predictions[model_name] = {
                'train': [preds for preds, _ in train],
                'test': [preds for preds, _ in test]
            }
            self._rocs[model_name] = {
                'train': [rocs for _, rocs in train],
                'test': [rocs for _, rocs in test]
            }
            self._agg_rocs[model_name] = {
                'train': self._aggregate_rocs(self._rocs[model_name]['train']),
                'test': self._aggregate_rocs(self._rocs[model_name]['test'])
            }
    
    @property
    def predictions(self):
        return self._predictions
    
    @property
    def rocs(self):
        return self._rocs
    
    @property
    def agg_rocs(self):
        return self._agg_rocs
    
    @staticmethod
    def _compute_indiv_preds_roc(estimator, X, y):
        df_preds = pd.concat(
            [
                pd.DataFrame(
                    data=estimator.predict_proba(X),
                    columns=['negative', 'positive']),
                pd.DataFrame(data={'truth': y.values})
            ],
            axis=1)
        fpr, tpr, threshold = roc_curve(
            df_preds['truth'].values,
            df_preds['positive'].values,
            pos_label=True)
        df_roc = pd.DataFrame({
            'fpr': fpr,
            'fnr': 1 - tpr,
            'tpr': tpr,
            'tnr': 1 - fpr,
            'threshold': threshold
        })
        return df_preds, df_roc

    @classmethod
    def _compute_all_preds_roc(cls, cv_results, X, y, folds):
        df_preds_roc_train = []
        df_preds_roc_test = []
        for fi, fold in enumerate(folds):
            train, test = fold
            df_preds_roc_train.append(cls._compute_indiv_preds_roc(
                cv_results['estimator'][fi], X.iloc[train], y.iloc[train]))
            df_preds_roc_test.append(cls._compute_indiv_preds_roc(
                cv_results['estimator'][fi], X.iloc[test], y.iloc[test]))
        return df_preds_roc_train, df_preds_roc_test

    @staticmethod
    def _aggregate(dfs, x_label, y_label, x_values, y_min=0, y_max=1):
        agg_df = []
        for fi, df in enumerate(dfs):
            x = df[x_label].values
            y = df[y_label].values

            sorted_idx = np.argsort(x)
            y_interp = np.interp(x_values, x[sorted_idx], y[sorted_idx], left=np.nan, right=np.nan)
            agg_df.append(pd.DataFrame({x_label: x_values, y_label: y_interp}))
        
        def se(x: np.array):
            x_nona = x[~np.isnan(x)]
            return np.std(x_nona)/np.sqrt(len(x_nona))
        agg_df = pd.concat(agg_df).groupby(x_label).agg([np.mean, np.std, se])
        return agg_df

    @classmethod
    def _aggregate_rocs(cls, rocs, log_min=-4):
        return {
            'linear': cls._aggregate(rocs, 'fpr', 'tpr', np.linspace(0, 1, 500)),
            'pos_log': cls._aggregate(rocs, 'fpr', 'tpr', 10**np.linspace(log_min, 0, 500)),
            'neg_log': cls._aggregate(rocs, 'fnr', 'tnr', 10**np.linspace(log_min, 0, 500)),
        }

    @staticmethod
    def _plot_with_ci(plot, x, y_mu, y_se, ci=0.95, color='black', legend_label=None):
        style_kwargs = dict(color=color)
        if legend_label is not None:
            style_kwargs['legend_label'] = legend_label
        plot.varea(x, y_mu - 1.96*y_se, y_mu + 1.96*y_se, alpha=0.1, **style_kwargs)
        plot.line(x, y_mu, **style_kwargs)

    def plot_rocs(self, which='test', plot=True):
        p_pos_log = figure(x_axis_label='log10(FPR)', y_axis_label='TPR')
        p_linear = figure(x_axis_label='FPR', y_axis_label='TPR')
        p_neg_log = figure(x_axis_label='log10(FNR)', y_axis_label='TNR')
        
        rand_seq = np.linspace(0, 1, 10000)[1:-1]
        p_pos_log.line(np.log10(rand_seq), rand_seq, color='black')
        p_linear.line(rand_seq, rand_seq, color='black', legend_label='Random')
        p_neg_log.line(np.log10(rand_seq), rand_seq, color='black')
        
        palette = d3['Category10'][len(self.agg_rocs)]
        for mi, model_name in enumerate(self.agg_rocs):
            color = palette[mi]
            agg_roc = self.agg_rocs[model_name]
            self._plot_with_ci(p_pos_log,
                np.log10(agg_roc[which]['pos_log'].index.values),
                agg_roc[which]['pos_log']['tpr']['mean'].values,
                agg_roc[which]['pos_log']['tpr']['se'].values,
                color=color)

            self._plot_with_ci(p_linear,
                agg_roc[which]['linear'].index.values,
                agg_roc[which]['linear']['tpr']['mean'].values,
                agg_roc[which]['linear']['tpr']['se'].values,
                legend_label=model_name, color=color)

            self._plot_with_ci(p_neg_log,
                np.log10(agg_roc[which]['neg_log'].index.values),
                agg_roc[which]['neg_log']['tnr']['mean'].values,
                agg_roc[which]['neg_log']['tnr']['se'].values,
                color=color)

        p_linear.legend.location = 'bottom_right'
        gplt = gridplot(\
            [[p_pos_log, p_linear, p_neg_log]],
            toolbar_location=None,
            plot_width=300,
            plot_height=300)
        if plot:
            show(gplt)
        return gplt

if os.path.isfile('temp/rpl.pkl'):
    print('Loading from file...')
    rpl = pkl.load(open('temp/rpl.pkl', 'rb'))
else:
    print('Regenerating...')
    rpl = ROCPlot(cv_results, df_features, df_truth, folds)
    pkl.dump(rpl, open('temp/rpl.pkl', 'wb'))
roc_plots = rpl.plot_rocs()