## Step 1. Import necessary libraries

In [1]:
# import itertools
import glob
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from joblib import dump, load
from matplotlib import rcParams
from sklearn import preprocessing
from sklearn.base import clone
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.feature_selection import chi2, SelectKBest
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay, multilabel_confusion_matrix
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score
from sklearn.model_selection import cross_val_score, GridSearchCV, GroupKFold, train_test_split
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, Normalizer, OneHotEncoder, RobustScaler, StandardScaler
from sklearn.svm import SVC

rcParams.update({'figure.autolayout': True})

## Step 2a. Declare user variables

In [33]:
# --- Run switches -----------------------------------------------------------
# is_test gates the optional inspection / plotting cells further down.
is_test = False
# True: read the single pre-built CSV; False: concatenate the raw per-file CSVs.
is_dataset_one_file = False

# --- Dataset selection ------------------------------------------------------
user_specified_layout = 'layout1'
# The layout number is the last character of the layout name ('layout1' -> 1).
user_specified_layout_int = int(user_specified_layout[-1])
# Only rows whose `nth` column equals this value are kept when it is an int.
user_specified_nth = 3

# --- Modelling knobs --------------------------------------------------------
user_test_size = 0.2
user_random_state = 1

# 16 for layout 1, 20 for any other layout
# (presumably the number of sensor columns in that layout -- TODO confirm).
ith_inp_col = 16 if user_specified_layout_int == 1 else 20

# Appears in the confusion-matrix file names ('CM-<n_steps>S-...').
n_steps = 2
# Column groups shared by both layouts; the per-layout lists below are assembled from them.
_cols_leading = ['timestamp', 'posture_id', 'posture_label']
_cols_trailing = ['birth_year', 'sex', 'height', 'weight', 'bmi', 'bmi_label', 'full_name', 'nth', 'round']
_cols_body_measures = ['height', 'weight', 'bmi']
_cols_sensors_16 = ['s{:02d}'.format(i) for i in range(1, 17)]  # 's01' .. 's16'
_cols_sensors_20 = ['s{:02d}'.format(i) for i in range(1, 21)]  # 's01' .. 's20'

# Single-file dataset name per layout.
list_filenames = dict(layout1='dataset1.csv', layout2='dataset2.csv')

# '<layout>_all'     : every column read from the CSVs
# '<layout>_cat_inp' : categorical model inputs (none at the moment)
# '<layout>_num_inp' : numerical model inputs (sensors + body measures)
list_columns = {
    'layout1_all': _cols_leading + _cols_sensors_16 + _cols_trailing,
    'layout2_all': _cols_leading + _cols_sensors_20 + _cols_trailing,
    'layout1_cat_inp': [],
    'layout1_num_inp': _cols_sensors_16 + _cols_body_measures,
    'layout2_cat_inp': [],
    'layout2_num_inp': _cols_sensors_20 + _cols_body_measures,
}

# Posture labels of the full problem ('Empty' is intentionally left out).
list_positions = [
    'Yearner_Right',
    'Yearner_Left',
    'Fetal_Right',
    'Fetal_Left',
    'Log_Right',
    'Log_Left',
    'Supine',
    'Prone',
    # 'Empty'
]

# Labels of the first-stage problem, where Yearner and Log are merged per side.
list_grouped_positions = ['YRLR', 'YLLL', 'Fetal_Right', 'Fetal_Left', 'Supine', 'Prone']

# {
#     'Yearner_Right' : 1,
#     'Yearner_Left' : 2,
#     'Fetal_Right' : 3,
#     'Fetal_Left' : 4,
#     'Log_Right' : 5,
#     'Log_Left' : 6,
#     'Supine' : 7,
#     'Prone' : 8,
# }

# Resolve the layout-specific file name and column lists chosen in the user variables.
filename = list_filenames[user_specified_layout]
cols_all = list_columns[user_specified_layout + '_all']
cols_cat_inp = list_columns[user_specified_layout + '_cat_inp']
cols_num_inp = list_columns[user_specified_layout + '_num_inp']
cols_inp = cols_cat_inp + cols_num_inp

# Columns that are read from the CSVs but are not model inputs.
# Built with an order-preserving filter instead of a set difference so the
# resulting order is the same on every run (set iteration order is not stable
# across interpreter sessions for strings).
cols_drp = [col for col in cols_all if col not in cols_inp]

# Numerical inputs split into body measures and the remaining (sensor) columns,
# again keeping the original column order.
cols_num_inp_std = ['height','weight','bmi']
cols_num_inp_nrm = [col for col in cols_num_inp if col not in cols_num_inp_std]

# Column used to group samples for cross-validation, and the target column.
col_grp = 'full_name'
col_trg = 'posture_id'

# Show floats with two decimals in DataFrame output and restore matplotlib's
# original rc parameters.
pd.set_option('display.float_format', lambda x: '%.2f' % x)
sns.reset_orig()

## Step 2b. Declare and prepare needed variables

In [34]:
# Machine-specific locations of the raw per-file CSVs and of the notebook's working directory.
# NOTE(review): hard-coded absolute paths -- consider moving these to the user-variable cell.
# Previously used raw-data location: "C:/Users/Julianne/Documents/Notebooks/thesis/data/set_{}"
raw_data_dir = "C:/Users/Julianne/Downloads/set_{} clean".format(user_specified_layout_int)
working_dir = "C:/Users/Julianne/Documents/Notebooks/thesis"

if is_dataset_one_file:
    df = pd.read_csv(filename, usecols = cols_all)
else:
    extension = 'csv'
    # Files are located by path instead of chdir-ing into the data folder, so a
    # failed read cannot leave the kernel stranded in that folder. The list is
    # sorted so the concatenated row order is the same on every run (glob
    # returns files in arbitrary order). Bare file names are kept in the list.
    list_raw_filenames = sorted(
        os.path.basename(path)
        for path in glob.glob(os.path.join(raw_data_dir, '*.{}'.format(extension)))
    )
    if not list_raw_filenames:
        raise FileNotFoundError("No .{} files found in '{}'".format(extension, raw_data_dir))
    df = pd.concat([pd.read_csv(os.path.join(raw_data_dir, f), usecols = cols_all) for f in list_raw_filenames])

    # From here on, relative paths (CSV / figure / model outputs) resolve against the thesis folder.
    os.chdir(working_dir)

# Keep only rows whose `nth` value is at most 5.
df = df[df.nth <= 5]

# unseen_df_cols_drp = list(set(cols_all) - set(cols_inp) - set(['posture_id','posture_label']))
# unseen_df = df[df.nth == 5]
# unseen_df = unseen_df.drop(columns=unseen_df_cols_drp)
# unseen_df.to_csv("clean dataset{} with label.csv".format(user_specified_layout_int), index=False, encoding='utf-8-sig')
# unseen_df = unseen_df.drop(columns=['posture_id','posture_label'])
# unseen_df.to_csv("clean dataset{}.csv".format(user_specified_layout_int), index=False, encoding='utf-8-sig')

# When a single `nth` value is requested, narrow the frame to it and write the
# result out as the single-file dataset for this layout.
# NOTE(review): in single-file mode this writes to the same name the data was
# read from ('dataset<N>.csv'), i.e. the input file is overwritten with the
# filtered rows -- confirm that this is intended.
if isinstance(user_specified_nth, int):
    print('user_specified_nth is an instance of int')
    df = df[df.nth == user_specified_nth]
    df.to_csv("dataset{}.csv".format(user_specified_layout_int), index=False, encoding='utf-8-sig')

user_specified_nth is an instance of int


In [4]:
# Debug aid: show the loaded frame when test mode is on.
# A bare `df` nested inside an `if` is not the cell's last top-level expression,
# so Jupyter would not render it; display() renders it explicitly.
if is_test:
    display(df)

In [5]:
# if is_test:
#     for col in cols_num_inp_nrm:
#         df[col] = df[col] * df['weight']
#     df

In [6]:
# Debug aid: print the frame's column dtypes and non-null counts when test mode is on.
if is_test:
    df.info()

In [7]:
def categorize(row):
    """Map one sample to its first-stage (grouped) target.

    Yearner and Log postures are merged per side: right-side ones become 100,
    left-side ones become 200. Every other posture keeps its own 'posture_id'.

    Parameters
    ----------
    row : mapping with 'posture_label' and 'posture_id' entries
        (a DataFrame row when used with ``df.apply(..., axis=1)``).

    Returns
    -------
    100, 200, or the row's 'posture_id'.
    """
    label = row['posture_label']
    if label in ('Yearner_Right', 'Log_Right'):
        return 100
    if label in ('Yearner_Left', 'Log_Left'):
        return 200
    return row['posture_id']

# Model inputs: the loaded frame minus every non-input column.
df_inp = df.drop(columns=cols_drp)
X = df_inp

# Final targets (posture ids), first-stage targets (Yearner/Log merged per side
# by categorize) and the per-sample group key used for grouped cross-validation.
y = df[col_trg].values
y_intermediate = df.apply(categorize, axis=1).values
groups = df[col_grp].values

# Positions of the body-measure columns inside the input frame.
# TODO: delete cols_identifying_idx and other calls to this variable
cols_identifying_idx = df_inp.columns.get_indexer(['height','weight','bmi'])

In [8]:
# Integer positions of the categorical / numerical input columns inside df_inp;
# the ColumnTransformer selects columns by these positions.
cols_cat_inp_idx, cols_num_inp_idx = (
    df_inp.columns.get_indexer(selected_cols)
    for selected_cols in (cols_cat_inp, cols_num_inp)
)

In [9]:
# #apply SelectKBest class to extract top 10 best features
# bestfeatures = SelectKBest(score_func=chi2, k=10)
# fit = bestfeatures.fit(X,y)
# dfscores = pd.DataFrame(fit.scores_)
# dfcolumns = pd.DataFrame(X.columns)
# #concat two dataframes for better visualization 
# featureScores = pd.concat([dfcolumns,dfscores],axis=1)
# featureScores.columns = ['Specs','Score']  #naming the dataframe columns
# print(featureScores.nlargest(10,'Score'))  #print 10 best features

## Step 3. Visualize the dataset

In [10]:
# Debug aid: summary statistics of the model inputs when test mode is on.
# The result of describe() nested inside an `if` is not the cell's last
# top-level expression, so Jupyter would not render it; display() does.
if is_test:
    display(df_inp.describe())

In [11]:
# pd.plotting.scatter_matrix(X, figsize=(10, 10));

In [12]:
# Debug aid: correlation heatmap of the model input columns (test mode only).
if is_test:
    # Pairwise correlation of the input columns (everything listed in cols_drp removed)
    corr_matrix = df.drop(columns=cols_drp).corr()

    # Boolean mask covering the upper triangle and the diagonal, so each pair is drawn once
    mask = np.triu(np.ones_like(corr_matrix, dtype=bool))

    plt.figure(figsize=(20, 10), facecolor='w', edgecolor='k')
    plt.title('Correlation Matrix - ' + user_specified_layout)
    # NOTE(review): sns.set changes seaborn/matplotlib settings globally, so it also
    # affects figures drawn by later cells -- confirm that this is intended.
    sns.set(font_scale=1.2)
    # Colormap centred on 0; each visible cell is annotated with its value (one significant digit)
    sns.heatmap(corr_matrix, cmap='coolwarm', center = 0, annot=True, fmt='.1g', mask=mask)

In [13]:
# Categorical inputs: fill gaps with the most frequent value, then one-hot
# encode (categories unseen during fit are ignored at transform time).
steps_cat_inp = [
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
]

# Numerical inputs: fill gaps with the column mean, then min-max scale.
steps_num_inp = [
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', MinMaxScaler()),
]

pipe_cat_inp = Pipeline(steps=steps_cat_inp)
pipe_num_inp = Pipeline(steps=steps_num_inp)

# Route each group of columns (selected by integer position) to its pipeline.
ct = ColumnTransformer(
    transformers=[
        ('categorical', pipe_cat_inp, cols_cat_inp_idx),
        ('numerical', pipe_num_inp, cols_num_inp_idx),
    ]
)

In [14]:
# Number of distinct values in the grouping column; evaluate_model uses this same
# count as the number of GroupKFold splits.
df[col_grp].nunique()

23

## Step 4. Declare needed functions

In [15]:
def print_metrics(actual_targets, predicted_targets, target_classes):
    """Print the accuracy of the predictions.

    Parameters
    ----------
    actual_targets : true labels.
    predicted_targets : predicted labels, aligned with ``actual_targets``.
    target_classes : class names; currently unused (it was only referenced by a
        disabled classification report).
    """
    accuracy = accuracy_score(actual_targets, predicted_targets)
    print(accuracy)

In [16]:
def generate_confusion_matrix(cnf_matrix, actual_targets, predicted_targets, target_classes, normalize=False, title='Confusion Matrix'):
    """Draw a confusion matrix for the given predictions and return the matrix.

    The figure is drawn by ``ConfusionMatrixDisplay.from_predictions`` from the
    raw labels; ``cnf_matrix`` is only used for the returned value.

    Parameters
    ----------
    cnf_matrix : 2-D array of counts (rows = true labels).
    actual_targets, predicted_targets : aligned true / predicted labels.
    target_classes : class names; currently unused (display labels are disabled).
    normalize : when True, plot row-normalised values and return the
        row-normalised matrix; otherwise plot and return raw counts.
    title : figure title.

    Returns
    -------
    ``cnf_matrix`` unchanged, or its row-normalised float version when
    ``normalize`` is True. Rows that sum to zero (a label that only occurs in
    the predictions) are returned as all zeros instead of NaN.
    """
    display_options = {'xticks_rotation': 45}

    if normalize:
        counts = cnf_matrix.astype('float')
        row_totals = counts.sum(axis=1, keepdims=True)
        # Guarded division: empty rows would otherwise produce NaN and a
        # RuntimeWarning.
        cnf_matrix = np.divide(counts, row_totals, out=np.zeros_like(counts), where=row_totals != 0)
        print("Confusion Matrix, With Normalized Counts")
        display_options.update(normalize='true', values_format='.2f')
    else:
        print("Confusion Matrix, Without Normalized Counts")

    # display_labels = target_classes is intentionally not passed (see TODO in evaluate_model).
    ConfusionMatrixDisplay.from_predictions(actual_targets, predicted_targets, **display_options)

    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so the title and axis labels are taken into account.
    plt.tight_layout()

    return cnf_matrix

In [17]:
def plot_confusion_matrix(actual_targets, predicted_targets, classifier, target_classes):
    """Print the metrics and plot, save and show both confusion-matrix variants.

    Two figures are produced -- raw counts and row-normalised -- and each is
    saved as 'CM-<n_steps>S-L<layout>-<classifier>-<variant>.png' in the
    current working directory.

    Parameters
    ----------
    actual_targets, predicted_targets : aligned true / predicted labels.
    classifier : classifier name, used in titles and file names.
    target_classes : class names, forwarded to the helpers.
    """
    cm = confusion_matrix(actual_targets, predicted_targets)
    np.set_printoptions(precision=2)

    print_metrics(actual_targets, predicted_targets, target_classes)

    variants = (
        (False, 'Without Normalized Counts', 'nonnormalized'),
        (True, 'With Normalized Counts', 'normalized'),
    )
    for normalize, caption, file_suffix in variants:
        # No plt.figure() here: ConfusionMatrixDisplay.from_predictions creates
        # its own figure when no `ax` is given, so a figure opened beforehand
        # would stay empty and be shown as a blank plot.
        generate_confusion_matrix(
            cm, actual_targets, predicted_targets,
            target_classes=target_classes,
            normalize=normalize,
            # %d (not %x): layout / step numbers are decimal, not hexadecimal.
            title='%s Confusion Matrix - Layout %d (%s)' % (classifier, user_specified_layout_int, caption),
        )
        plt.savefig('CM-%dS-L%d-%s-%s.png' % (n_steps, user_specified_layout_int, classifier, file_suffix), format='png', bbox_inches="tight")
        plt.show()

In [18]:
def generate_pipeline(classifier):
    """Build an unfitted preprocessing + classifier pipeline.

    Parameters
    ----------
    classifier : 'MLP' or 'SVM'.

    Returns
    -------
    sklearn Pipeline with steps 'pre' (a fresh copy of the global column
    transformer ``ct``) and 'clf' (the requested classifier).

    Raises
    ------
    ValueError
        If ``classifier`` is not one of the supported names (previously this
        surfaced as an UnboundLocalError on the return statement).
    """
    if classifier == 'MLP':
        # Seeded like the SVC below so repeated runs give the same model.
        estimator = MLPClassifier(max_iter=5000, random_state=user_random_state)
    elif classifier == 'SVM':
        estimator = SVC(kernel='rbf', random_state=user_random_state)
    else:
        raise ValueError("Unknown classifier {!r}; expected 'MLP' or 'SVM'".format(classifier))

    # clone(ct): every pipeline gets its own transformer. Sharing the single
    # global instance would let a later fit overwrite the fitted state that an
    # earlier pipeline still relies on.
    return Pipeline([
        ('pre', clone(ct)),
        ('clf', estimator),
    ])

In [19]:
def save_final_models(data_x, data_y, data_y_intermediate, classifier):
    """Fit the three final models on the given data and dump them with joblib.

    * Step 1: trained on the first-stage targets (``data_y_intermediate``).
    * Step 2 "rights": trained only on samples whose target is 1 or 5.
    * Step 2 "lefts": trained only on samples whose target is 2 or 6.

    Files are written to the current working directory. The file names do not
    contain the classifier name, so saving a second classifier replaces the
    files of the first.
    """
    layout = user_specified_layout_int

    # Each model is dumped right after it is fitted.
    first_stage = generate_pipeline(classifier)
    first_stage.fit(data_x, data_y_intermediate)
    dump(first_stage, 'L{}-YearnerLog-Step1.joblib'.format(layout))

    second_stage_specs = (
        ('L{}-YearnerLog-Step2rights.joblib', [1, 5]),
        ('L{}-YearnerLog-Step2lefts.joblib', [2, 6]),
    )
    for name_template, posture_ids in second_stage_specs:
        in_subgroup = np.isin(data_y, posture_ids)
        second_stage = generate_pipeline(classifier)
        second_stage.fit(data_x[in_subgroup], data_y[in_subgroup])
        dump(second_stage, name_template.format(layout))

In [20]:
def model(train_x, train_y, test_x, test_y, classifier):
    """Fit a fresh pipeline on the training data and predict the test inputs.

    ``test_y`` is accepted but not used.

    Returns
    -------
    The predicted labels for ``test_x``.
    """
    fitted = generate_pipeline(classifier).fit(train_x, train_y)
    return fitted.predict(test_x)

In [21]:
def evaluate_subgroup(train_x, train_y, test_x, test_y, intermediate_labels, list_ids, subgroup_id, classifier):
    """Run the second-stage model for one merged subgroup.

    The model is trained on the training samples whose target is in
    ``list_ids`` and applied to the test samples that the first stage labelled
    ``subgroup_id``.

    Returns
    -------
    (predictions, true_labels) for the test samples routed to this subgroup.
    ``predictions`` is an empty list when no test sample was routed to it (a
    message is printed in that case).
    """
    in_train_subgroup = np.isin(train_y, list_ids)
    routed_here = np.isin(intermediate_labels, subgroup_id)

    subgroup_train_x = train_x[in_train_subgroup]
    subgroup_train_y = train_y[in_train_subgroup]
    routed_test_x = test_x[routed_here]
    routed_true_y = test_y[routed_here]

    # Nothing was routed to this subgroup: skip training altogether.
    if not routed_test_x.size:
        print("None of the participant's data was classified as subgroup {}".format(subgroup_id))
        return [], routed_true_y

    subgroup_predictions = model(subgroup_train_x, subgroup_train_y, routed_test_x, routed_true_y, classifier)
    return subgroup_predictions, routed_true_y

In [22]:
def _flatten_fold_outputs(chunks):
    """Concatenate per-fold outputs into one flat float array (like repeated np.append)."""
    return np.concatenate([np.array([])] + [np.ravel(chunk) for chunk in chunks])


def evaluate_model(data_x, data_y, data_y_intermediate, classifier):
    """Evaluate the two-stage classifier with grouped cross-validation.

    Each fold (one per distinct value of the grouping column, using the global
    ``groups`` array) runs:

    1. a first-stage model trained on ``data_y_intermediate`` (Yearner/Log
       merged into 100 = right side, 200 = left side);
    2. for the test samples predicted as 100 / 200, a second-stage model
       trained only on targets [1, 5] / [2, 6] respectively.

    The second-stage predictions then replace the 100 / 200 placeholders.

    Parameters
    ----------
    data_x : 2-D NumPy array of inputs (indexed with integer index arrays).
    data_y : final targets, aligned with ``data_x``.
    data_y_intermediate : first-stage targets, aligned with ``data_x``.
    classifier : classifier name understood by ``generate_pipeline``.

    Returns
    -------
    (actual_targets, new_predicted_targets): the true final targets as a float
    array and the merged predictions as a list, both in fold order.
    """
    group_k_fold = GroupKFold(n_splits=df[col_grp].nunique())

    # Per-fold results are collected in lists and concatenated once at the end
    # (np.append inside the loop re-copies the whole array on every fold).
    fold_first_stage = []
    fold_actual = []
    fold_rights = []
    fold_lefts = []

    for i, (train_index, test_index) in enumerate(group_k_fold.split(data_x, data_y, groups=groups)):
        train_x, test_x = data_x[train_index], data_x[test_index]
        train_y, test_y = data_y[train_index], data_y[test_index]
        train_y_intermediate, test_y_intermediate = data_y_intermediate[train_index], data_y_intermediate[test_index]
        # %d (not %x): the fold number is decimal; %x printed fold 10 as "a" and fold 16 as "10".
        print('fold %d (height: %.2f, weight: %.2f, bmi: %.2f)' % (i, test_x[0, cols_identifying_idx[0]], test_x[0, cols_identifying_idx[1]], test_x[0, cols_identifying_idx[2]]))

        # Stage 1: grouped labels.
        intermediate_output = model(train_x, train_y_intermediate, test_x, test_y_intermediate, classifier)
        fold_first_stage.append(intermediate_output)
        fold_actual.append(test_y)

        # Stage 2: resolve the merged right-side / left-side predictions.
        rights_output, _ = evaluate_subgroup(train_x, train_y, test_x, test_y, intermediate_output, [1,5], 100, classifier)
        fold_rights.append(rights_output)

        lefts_output, _ = evaluate_subgroup(train_x, train_y, test_x, test_y, intermediate_output, [2,6], 200, classifier)
        fold_lefts.append(lefts_output)

    # TODO: fix target_classes, then plot per-stage confusion matrices here
    # (this also requires collecting the per-stage true labels again).

    predicted_targets = _flatten_fold_outputs(fold_first_stage)
    actual_targets = _flatten_fold_outputs(fold_actual)
    predicted_targets_rights = _flatten_fold_outputs(fold_rights)
    predicted_targets_lefts = _flatten_fold_outputs(fold_lefts)

    # Second-stage outputs were collected in the same order in which their
    # 100 / 200 placeholders occur, so they can be written back position by
    # position.
    merged = predicted_targets.copy()
    merged[predicted_targets == 100] = predicted_targets_rights
    merged[predicted_targets == 200] = predicted_targets_lefts
    new_predicted_targets = list(merged)

    return actual_targets, new_predicted_targets

## Step 5. Execute

In [23]:
# Convert the input frame to a NumPy array: evaluate_model indexes rows with
# integer index arrays (data_x[train_index]) and cells by position (test_x[0, idx]).
X = df_inp.to_numpy()

# Names used by the (currently commented-out) evaluate_model / save_final_models calls below.
data = X
target = y

In [24]:
# actual_targets, predicted_targets = evaluate_model(data, target, y_intermediate, 'SVM')

In [25]:
# plot_confusion_matrix(actual_targets, predicted_targets, 'SVM', list_positions)

In [26]:
# actual_targets, predicted_targets = evaluate_model(data, target, y_intermediate, 'MLP')

In [27]:
# plot_confusion_matrix(actual_targets, predicted_targets, 'MLP', list_positions)

In [28]:
# save_final_models(data, target, y_intermediate, 'SVM')

In [29]:
# save_final_models(data, target, y_intermediate, 'MLP')

In [30]:
# x_sample = np.array([
#     [769,588,485,987,663,567,399,913,956,686,151,785,991,806,157,633],
#     [781,604,475,985,683,590,379,921,934,716,177,791,895,590,121,715],
#     [863,588,567,986,877,575,408,944,986,595,139,839,985,808,141,657],
#     [942,716,561,864,803,662,464,759,860,376,478,819,714,441,196,860],
#     [984,724,308,551,960,617,326,676,933,178,881,912,609,140,982,986],
#     [986,712,313,593,924,598,318,699,826,149,857,894,615,128,808,921],
#     [985,752,364,656,979,694,259,837,923,162,677,985,573,114,971,985],
#     [845,656,679,817,657,592,548,646,721,251,648,854,800,529,844,985],
# ])

# saved_model = load('L{}-YearnerLog-Step1.joblib'.format(user_specified_layout_int))
# predicted_targets = saved_model.predict(x_sample)

# if x_sample[np.isin(predicted_targets, 100)].size:
#     saved_model_rights = load('L{}-YearnerLog-Step2rights.joblib'.format(user_specified_layout_int))
#     predicted_targets_rights = saved_model_rights.predict(x_sample[np.isin(predicted_targets, 100)])

# if x_sample[np.isin(predicted_targets, 200)].size:
#     saved_model_lefts = load('L{}-YearnerLog-Step2lefts.joblib'.format(user_specified_layout_int))
#     predicted_targets_lefts = saved_model_lefts.predict(x_sample[np.isin(predicted_targets, 200)])

# new_predicted_targets = []
# ctr_rights = 0
# ctr_lefts = 0
# for target in predicted_targets:
#     if target == 100:
#         new_predicted_targets.append(predicted_targets_rights[ctr_rights])
#         ctr_rights += 1
#     elif target == 200:
#         new_predicted_targets.append(predicted_targets_lefts[ctr_lefts])
#         ctr_lefts += 1
#     else:
#         new_predicted_targets.append(target)

# for i, pred in enumerate(new_predicted_targets):
#     print('line #{:''>3}: {:''>3}, {}'.format(i + 2, predicted_targets[i], new_predicted_targets[i]))
# # print(predicted_targets)
# # print(new_predicted_targets)

FileNotFoundError: [Errno 2] No such file or directory: 'L1-YearnerLog-Step1.joblib'