In [6]:
import glob
import os
import numpy as np
import pandas as pd
import seaborn as sns
import joblib
import dill
import dask, dask_ml
import dask.dataframe as dd

from importlib import reload
from scipy import signal, stats
from tqdm.auto import tqdm
from sklearn import neighbors, linear_model, ensemble, decomposition #svm, neural_network
from sklearn import feature_selection, model_selection, metrics, dummy, pipeline, compose, preprocessing
from dask_ml.model_selection import RandomizedSearchCV
from dask_ml.preprocessing import RobustScaler
from dask_ml import xgboost
from dask import distributed
from matplotlib import pyplot as plt
from src import main, feature_model
from itertools import product

In [2]:
from dask_jobqueue import SLURMCluster
from distributed import Client, LocalCluster

In [3]:
try:
    cluster.close()
    client.close()
except NameError:
    pass
finally:
    cluster = SLURMCluster(queue='short', 
                           cores=4, 
                           memory='25gB', 
                           walltime='3:00:00', 
                           death_timeout=60,
#                            job_extra=['--exclude=/home/hy180/notibnodes'])
                          )
    client = Client(cluster)
    cluster.adapt(minimum=1, maximum=50, wait_count=5)

In [43]:
dataset = 'cis-tsfeatures'
features_df = dd.read_parquet(f'/home/hy180/projects/beat_pd/extracted_features/{dataset}.parquet')

label_cols = ['on_off', 'dyskinesia', 'tremor', 'subject_id']
labels = pd.concat([
    pd.read_csv('/home/hy180/projects/beat_pd/data/cis-pd/data_labels/CIS-PD_Training_Data_IDs_Labels.csv'),
    pd.read_csv('/home/hy180/projects/beat_pd/data/real-pd/data_labels/REAL-PD_Training_Data_IDs_Labels.csv'),
], axis=0).astype({'subject_id': str})

# These features don't compute for a number of observations
drop_cols = ['rms__friedrich_coefficients__m_3__r_30__coeff_0',
       'rms__friedrich_coefficients__m_3__r_30__coeff_1',
       'rms__friedrich_coefficients__m_3__r_30__coeff_2',
       'rms__friedrich_coefficients__m_3__r_30__coeff_3',
       'rms__max_langevin_fixed_point__m_3__r_30']
# These fft features are null for our size of windows
null_fft_cols = ['rms__fft_coefficient__coeff_%d__attr_"%s"' % (n, s) 
                     for n, s in product(range(51, 100), ['abs', 'angle', 'imag', 'real'])]
# Sample entropy can take inf which screws with models
inf_cols = ['rms__sample_entropy']
df = features_df.drop(columns=[*drop_cols, *null_fft_cols, *inf_cols]).dropna().merge(labels, right_on='measurement_id', left_on='samp_id')
print('%d rows dropped due to nans in features' % (features_df.shape[0] - df.shape[0]).compute())
# df = df.persist()

0 rows dropped due to nans in features


In [29]:
df.columns[df.columns.str.startswith('rms__')]

Index(['rms__abs_energy', 'rms__absolute_sum_of_changes',
       'rms__agg_autocorrelation__f_agg_"mean"__maxlag_40',
       'rms__agg_autocorrelation__f_agg_"median"__maxlag_40',
       'rms__agg_autocorrelation__f_agg_"var"__maxlag_40',
       'rms__agg_linear_trend__f_agg_"max"__chunk_len_10__attr_"intercept"',
       'rms__agg_linear_trend__f_agg_"max"__chunk_len_10__attr_"rvalue"',
       'rms__agg_linear_trend__f_agg_"max"__chunk_len_10__attr_"slope"',
       'rms__agg_linear_trend__f_agg_"max"__chunk_len_10__attr_"stderr"',
       'rms__agg_linear_trend__f_agg_"max"__chunk_len_50__attr_"intercept"',
       ...
       'rms__symmetry_looking__r_0.9',
       'rms__symmetry_looking__r_0.9500000000000001',
       'rms__time_reversal_asymmetry_statistic__lag_1',
       'rms__time_reversal_asymmetry_statistic__lag_2',
       'rms__time_reversal_asymmetry_statistic__lag_3',
       'rms__value_count__value_-1', 'rms__value_count__value_0',
       'rms__value_count__value_1', 'rms__varian

In [31]:
df.head()

Unnamed: 0,id,rms__abs_energy,rms__absolute_sum_of_changes,"rms__agg_autocorrelation__f_agg_""mean""__maxlag_40","rms__agg_autocorrelation__f_agg_""median""__maxlag_40","rms__agg_autocorrelation__f_agg_""var""__maxlag_40","rms__agg_linear_trend__f_agg_""max""__chunk_len_10__attr_""intercept""","rms__agg_linear_trend__f_agg_""max""__chunk_len_10__attr_""rvalue""","rms__agg_linear_trend__f_agg_""max""__chunk_len_10__attr_""slope""","rms__agg_linear_trend__f_agg_""max""__chunk_len_10__attr_""stderr""",...,rms__value_count__value_0,rms__value_count__value_1,rms__variance,rms__variance_larger_than_standard_deviation,samp_id,measurement_id,subject_id,on_off,dyskinesia,tremor
0,0,0.236036,5.503803,-0.010853,-0.009885,0.045046,-0.02553,0.856429,0.017028,0.003422,...,0.0,0.0,0.002332,0.0,bd6ccdf7-901c-4c52-b41c-18372f291a18,bd6ccdf7-901c-4c52-b41c-18372f291a18,1020,0.0,,1.0
1,1,1.084483,15.815174,-0.014493,0.018759,0.06095,0.058936,0.760309,0.014638,0.004168,...,0.0,0.0,0.010729,0.0,bd6ccdf7-901c-4c52-b41c-18372f291a18,bd6ccdf7-901c-4c52-b41c-18372f291a18,1020,0.0,,1.0
2,10,1.046424,9.562216,-0.005947,0.00463,0.015205,0.181829,-0.13085,-0.003557,0.008983,...,0.0,0.0,0.010222,0.0,bd6ccdf7-901c-4c52-b41c-18372f291a18,bd6ccdf7-901c-4c52-b41c-18372f291a18,1020,0.0,,1.0
3,100,0.152247,4.839051,-0.005487,0.01854,0.035654,0.073948,-0.356508,-0.00363,0.003171,...,0.0,0.0,0.001503,0.0,bd6ccdf7-901c-4c52-b41c-18372f291a18,bd6ccdf7-901c-4c52-b41c-18372f291a18,1020,0.0,,1.0
4,101,0.095432,3.495017,-0.010973,-0.024343,0.019012,0.063523,-0.396377,-0.004275,0.003301,...,0.0,0.0,0.000919,0.0,bd6ccdf7-901c-4c52-b41c-18372f291a18,bd6ccdf7-901c-4c52-b41c-18372f291a18,1020,0.0,,1.0


In [47]:
features_df.groupby(['samp_id', 'id']).mean().index.compute()

MultiIndex([('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   0),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   1),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   2),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   3),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   4),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   5),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   6),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   7),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   8),
            ('060fdaca-dc34-4990-a53c-00e3dff1ca4d',   9),
            ...
            ('f17b52f0-1204-4a1b-8e37-0851b081e4df', 228),
            ('f17b52f0-1204-4a1b-8e37-0851b081e4df', 229),
            ('f17b52f0-1204-4a1b-8e37-0851b081e4df', 230),
            ('f17b52f0-1204-4a1b-8e37-0851b081e4df', 231),
            ('f17b52f0-1204-4a1b-8e37-0851b081e4df', 232),
            ('f17b52f0-1204-4a1b-8e37-0851b081e4df', 233),
            ('f17b52f0-1204-4a1b-8e37-08

# Model

In [17]:
scaler = RobustScaler(quantile_range=(1, 99))
scaler_pg = {'scaler__quantile_range': [(.1, 99.9), (.5, 99.5), (1, 99), (5, 95), (10, 90)],}
# scaler = preprocessing.MinMaxScaler()

# Keep features w/ variance in top 95%ile 
var = lambda X, y: np.var(X, axis=0)
f_select = feature_selection.SelectPercentile(var, percentile=95)
# f_select_pg = {'f_select__percentile': [95, 80, 50, 25, 10],}
f_select_pg = {'f_select__percentile': stats.uniform(0, 100)}
# f_select = feature_selection.SelectKBest(feature_selection.mutual_info_regression, k=30)

# model = linear_model.Ridge()
# model_pg = {'model__regressor__alpha': [0.1, 0.5, 1, 2, 5],}
# model = svm.SVR()
# model_pg = {'model__regressor__kernel': ['linear', 'poly', 'rbf', 'sigmoid'], 'model__regressor__C': stats.chi2(df=2)}
# model = linear_model.ElasticNet()
# model_pg = {'model__regressor__l1_ratio': stats.uniform(0, 1), 'model__regressor__alpha': stats.chi2(df=2), }
# model_pg = {'model__regressor__l1_ratio': [0.01, 0.1, 0.5, 0.8, 0.99], 'model__regressor__alpha': [0.1, 0.5, 1, 2, 5],}
# model = mord.OrdinalRidge()
# model_pg = {'model__regressor__alpha': stats.chi2(df=2), }
model = ensemble.RandomForestRegressor(n_jobs=-1, verbose=2)
# model = xgboost.XGBRegressor(verbosity=2)
model_pg = {'model__regressor__n_estimators': stats.randint(3, 100), 'model__regressor__max_depth': stats.randint(2, 20), 'model__regressor__max_features': [.25, 'auto']}
# model = neural_network.MLPRegressor(learning_rate='adaptive')
# model_pg = {'model__regressor__hidden_layer_sizes': [(100), (50, 50)]}

clip_out = preprocessing.FunctionTransformer(np.clip, kw_args={'a_min': 0, 'a_max': 4})
clipped_model = compose.TransformedTargetRegressor(regressor=model, inverse_func=clip_out.transform)

pipe = pipeline.Pipeline([
    ('scaler', scaler), 
    ('f_select', f_select), 
    ('model', clipped_model),
], verbose=1)

param_grid = {
    **scaler_pg,
    **f_select_pg,
    **model_pg,
}

metric = metrics.make_scorer(metrics.mean_squared_error, greater_is_better=False)

cv = model_selection.StratifiedKFold(shuffle=True)
search = RandomizedSearchCV(model, param_grid, n_iter=100, scoring=metric, cv=cv, refit=False, scheduler=client)

In [18]:
model.fit(X, y)

KeyboardInterrupt: 

In [19]:
label = 'dyskinesia'
id_cols = ['measurement_id', 'samp_id']
features = df.dropna(subset=[label]).drop(columns=[*label_cols, *id_cols])

y = df.loc[features.index, label].astype('int')
X = features

client.gather(client.submit(pipe.fit, X, y))

  (         id  rms__abs_energy  rms__absolute_sum_o ... , dtype: int64)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


KeyboardInterrupt: 

In [8]:
err = client.futures["('score-10c602937093b440d648fc2de41c279d', 9, 3)"]

In [7]:
client.futures

{"('score-10c602937093b440d648fc2de41c279d', 31, 4)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 1, 1)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 73, 0)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 21, 0)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 46, 4)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 36, 0)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 8, 1)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 58, 4)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 11, 1)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 28, 2)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 68, 3)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 31, 0)": <FutureState: pending>,
 "('score-10c602937093b440d648fc2de41c279d', 71, 3)": <FutureState

In [9]:
err.exception

AttributeError("'numpy.ndarray' object has no attribute 'to_delayed'")

In [11]:
import traceback as tb
tb.print_tb(err.traceback)

  File "/home/hy180/anaconda3/lib/python3.7/site-packages/dask_ml/model_selection/methods.py", line 237, in fit
    est.fit(X, y, **fit_params)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/sklearn/compose/_target.py", line 201, in fit
    self.regressor_.fit(X, y_trans, **fit_params)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/dask_xgboost/core.py", line 441, in fit
    evals_result=self.evals_result_,
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/dask_xgboost/core.py", line 288, in train
    **kwargs
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 778, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/home/hy180/anaco

In [6]:
for label in ['dyskinesia', 'on_off', 'tremor']:
    print(f"working on {label}")
    id_cols = ['measurement_id', 'samp_id']
    features = df.dropna(subset=[label]).drop(columns=[*label_cols, *id_cols])

    y = df.loc[features.index, label].astype('int')
    X = features

#     X_p, y_p = client.persist([X, y])

    search = RandomizedSearchCV(pipe, param_grid, n_iter=100, scoring=metric, cv=cv, refit=False, scheduler=client)
    cv_fit = search.fit(X, y, model__client_addr=cluster.scheduler_address)
    cv_results = pd.DataFrame(cv_fit.cv_results_)

    resultset_name = f'{dataset}_{type(model).__name__}_{label}'
    cv_results.to_csv(f'performance/cv_paramsweeps/{resultset_name}.csv')
    win_params = cv_results.loc[cv_results.rank_test_score == 1, 'params'].values[0]
    winner = pipe.set_params(**win_params)
    with open(f'models/paramsweep_winners/{resultset_name}.model', 'wb') as f:
        dill.dump(winner, f)
        
    client.restart()
    print(f"done with {label}")

working on dyskinesia


2020-05-04 14:09:41,436 :: tornado.application - ERROR - Uncaught exception GET /status/ws (::1)
HTTPServerRequest(protocol='http', host='localhost:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='::1')
Traceback (most recent call last):
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/tornado/websocket.py", line 956, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/bokeh/server/views/ws.py", line 135, in open
    raise ProtocolError("Token is expired.")
bokeh.protocol.exceptions.ProtocolError: Token is expired.
('score-10c602937093b440d648fc2de41c279d', 78, 2) has failed... retrying


KeyError: "('transformedtargetregressor-fit-10c602937093b440d648fc2de41c279d', 78, 2)"

### Dask debugging 

In [33]:
key, err = client.futures.popitem()
# keys, states = [*zip(*[*client.futures.items()])]

In [34]:
import traceback as tb
tb.print_tb(err.traceback)
print(err.exception)

Input contains NaN, infinity or a value too large for dtype('float64').


  File "/home/hy180/anaconda3/lib/python3.7/site-packages/dask_ml/model_selection/methods.py", line 260, in fit_transform
    Xt = est.fit_transform(X, y, **fit_params)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/sklearn/base.py", line 574, in fit_transform
    return self.fit(X, y, **fit_params).transform(X)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/sklearn/feature_selection/_univariate_selection.py", line 341, in fit
    X, y = check_X_y(X, y, ['csr', 'csc'], multi_output=True)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/sklearn/utils/validation.py", line 755, in check_X_y
    estimator=estimator)
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/sklearn/utils/validation.py", line 578, in check_array
    allow_nan=force_all_finite == 'allow-nan')
  File "/home/hy180/anaconda3/lib/python3.7/site-packages/sklearn/utils/validation.py", line 60, in _assert_all_finite
    msg_dtype if msg_dtype is not None else X.dtype)


## Single train-test split for evaluation

In [None]:
# label = ''
with open(f'models/paramsweep_winners/RandomForestRegressor_{label}.model', 'rb') as f:
    winner = dill.load(f)

In [None]:
x_train, x_test, y_train, y_test = model_selection.train_test_split(X.compute(), y.compute(), test_size=.25, stratify=y.compute())
# x_train, y_train = smote.fit_resample(x_train, y_train)

with joblib.parallel_backend('loky'):
    winner.fit(x_train, y_train)
    pred = winner.predict(x_test)

main.plot_performance(y_test, pred)

# Baseline for reference

In [None]:
# label = 'dyskinesia'
features = df.dropna(subset=[label]).drop(columns=[*label_cols, *id_cols])

y = df.loc[features.index, label].astype('int').compute()
metric = metrics.make_scorer(metrics.mean_squared_error, greater_is_better=False)

In [None]:
baseline_model = dummy.DummyRegressor(strategy='mean')
# Pass in y for X because we don't actually care about X
baseline_cv = model_selection.cross_validate(baseline_model, y, y, scoring=metric)
baseline_scores = baseline_cv['test_score']
ax = sns.countplot(y)
ax.set_title('mse of null model: %f' % baseline_scores.mean())

In [None]:
# patient-specific mean predictor
subj_means = labels.groupby('subject_id').mean()
X_subjs = df.loc[X.index][['subject_id']]
naive_pred = X_subjs.merge(subj_means[[label]], left_on='subject_id', right_index=True).rename(columns={label: 'prediction'})
main.plot_performance(y, naive_pred.prediction)

# Predictions on test set

In [None]:
label = 'tremor'
with open(f'models/paramsweep_winners/RandomForestRegressor_{label}.model', 'rb') as f:
    winner = dill.load(f)

In [None]:
# TODO: only predict required measurements for each label
test_index = pd.read_csv(f'test_predictions/sub_template_{label}.csv', index_col=0).index
test_features_df = pd.concat([
    pd.read_csv('extracted_features/tsfeatures_cis_test.csv', index_col=0), 
    pd.read_csv('extracted_features/tsfeatures_real_test.csv', index_col=0)
]).drop(columns=drop_cols).reindex(test_index)

test_subjs = pd.concat([
    pd.read_csv('data/test_set/cis-pd/cis-pd.CIS-PD_Test_Data_IDs.csv', index_col=0), 
    pd.read_csv('data/test_set/real-pd/real-pd.REAL-PD_Test_Data_IDs.csv', index_col=0)
]).reindex(test_index)

In [None]:
# Predict patient-specific mean if data not available
nodata_obs = test_subjs.loc[test_features_df[test_features_df.isna().sum(axis=1) > 0].index]
nodata_predictions = nodata_obs.join(subj_means, on='subject_id')[[label]].rename({label: 'prediction'}, axis=1)

In [None]:
X = test_features_df.dropna(axis='index')

test_predictions = winner.predict(X)
test_predictions_df = pd.concat([
    pd.DataFrame(index=X.index, data={'prediction': test_predictions}),
    nodata_predictions,
], axis=0)

In [None]:
test_predictions_df.to_csv(f'test_predictions/test_predictions_{label}.csv', index=True)

# Dimensionality Reduction

In [None]:
label = 'subject_id'

X = f_select.fit_transform(scaler.fit_transform(features), y=y)
pca = decomposition.FastICA(n_components=2)
proj = pca.fit_transform(X)
fig = plt.figure(figsize=(8, 8))
_ = sns.scatterplot(x=proj[:, 0], y=proj[:, 1], hue=df.loc[features.index, label], legend='full')

In [None]:
# Local cluster for debugging
try:
    local_cluster.close()
    local_client.close()
except NameError:
    pass
finally:
    local_cluster = LocalCluster(n_workers=4, threads_per_worker=1, dashboard_address='0.0.0.0:8786')
    local_client = Client(local_cluster)
    local_cluster.adapt(minimum=0, maximum=4)