Bag of Patterns Implementation

In [10]:
import numpy as np
import pandas as pd 
import ordpy as ord
from pyts import bag_of_words
import os
import math
import itertools
import matplotlib.pyplot as plt
import string 

from sklearn.preprocessing import MinMaxScaler
from tqdm.notebook import tqdm
from pyts.transformation import BagOfPatterns
from pyts.bag_of_words import BagOfWords

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import ParameterGrid
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, recall_score, f1_score, confusion_matrix

EVAL_DICT = {'accuracy ' : accuracy_score, "recall" : recall_score, "f1 score" : f1_score, "confusion_matrix" : confusion_matrix}

plt.style.use('ggplot')

In [2]:

def evaluate_model(preds, labels):
    res = {}
    for mname, metric in EVAL_DICT.items():

        if 'f1' in mname:
            res[mname] = metric(labels, preds, average='binary')

        else:
            res[mname] = metric(labels, preds)

    return res


def display_metrics(metrics):

    for mname, value in metrics.items():
        if mname != 'confusion_matrix':
            print(f"{mname}:{value:.4f}", end="\t")
        else:
            print()
            print(mname)
            print(value)

In [3]:
def recover_data_from_df(csv_path= "../data/processed/final/train_val_test", filter= False):

    data_splits = {t.split('.')[0] : t for t in os.listdir(csv_path)}
    final_data = {key : [] for key in data_splits}
    final_targets = {key : [] for key in data_splits}
    final_names = {key : [] for key in data_splits}


    for dt, dpath in data_splits.items(): 
        final_path = os.path.join(csv_path, dpath)

        for sample, df in pd.read_csv(final_path).groupby('sample'):
            
            label = df['label'].iloc[0]
            workout = df['Workout'].iloc[0]
            df.drop(columns=['sample', 'label', 'Workout'], inplace=True)

            apn_obj = df.T.values

            final_data[dt].append(apn_obj)
            final_targets[dt].append(label)
            final_names[dt].append(workout)


        final_targets[dt] = np.array(final_targets[dt])

    return final_data, final_targets, final_names

ft, fy, fn = recover_data_from_df()
x = ft['train_data'][0] # uma unica amostra da serie, nao tem como fazer com todas


In [4]:
param = {"window_size" : 10, "word_size" : 3, "n_bins" : 3}

def get_bop_from_ts(x, param):

    bop_transformer = BagOfPatterns(**param)
    n_b, w_s = int(param['n_bins']), int(param['word_size'])
    max_alphabet = list(itertools.product(*([[i for i in string.ascii_letters[:n_b]]] * w_s)))
    max_alphabet = np.apply_along_axis(''.join, axis=1, arr=max_alphabet)
    max_alphabet_dic = dict(zip(max_alphabet, range(0, len(max_alphabet))))
    
    ts_hist = bop_transformer.fit_transform(x).toarray()
    # print(ts_hist)

    voc_used = bop_transformer.vocabulary_
    words = np.array(list(voc_used.values()))
    set_diff = np.setdiff1d(max_alphabet, words)

    # print(ts_hist)
    for k in set_diff:
        d_number = max_alphabet_dic[k]
        ts_hist = np.insert(ts_hist, d_number, 0, axis=-1)

    ts_hist = ts_hist.sum(axis=1)
    return ts_hist
    return (ts_hist - ts_hist.min()) / (ts_hist.max() - ts_hist.min())


def get_bop(datalist, param):
    t = np.array([get_bop_from_ts(i, param) for i in datalist])
    return t

In [6]:
param = {"window_size" : 10, "word_size" : 3, "n_bins" : 3}

xtrain = get_bop(ft['train_data'], param)
xval = get_bop(ft['validation_data'], param)
xtest = get_bop(ft['test_data'], param)

print(xtrain.shape)
print(xval.shape)
print(xtest.shape)

(383, 7)
(107, 7)
(55, 7)


In [None]:

def loop_clfs(xtrain, xval):

    classifiers = {"5-nn" : KNeighborsClassifier(), "SVM" : SVC(), "LogReg" : LogisticRegression(), "RF" : RandomForestClassifier()}
    res = {}


    for clf, cfoo in classifiers.items():
        
        pipe = Pipeline([('scaler', MinMaxScaler()), (clf, cfoo)])

        pipe.fit(xtrain, fy['train_data'])
        pred_val = pipe.predict(xval)

        dval = evaluate_model(pred_val, labels=fy['validation_data']) 
        res[clf] = dval

    return res


def grid_search_on_BOP(params):

    ans = {"5-nn" : [], "SVM" : [], "LogReg" : [], "RF" : []}
    for p in tqdm(params):
        window_size, window_step, word_size, n_bins = p['window_size'], p['window_step'], p['word_size'], p['n_bins']
        
        xtrain = get_bop(ft['train_data'], param)
        xval = get_bop(ft['validation_data'], param)
        xtest = get_bop(ft['test_data'], param)

        t = loop_clfs(xtrain, xval) # * --> dict := (clf, clf_metrics) 
        
                
        for clf, metrics in t.items():

            metrics['window_size'] = window_size
            metrics['window_step'] = window_step
            metrics['word_size'] = word_size
            metrics['n_bins'] = n_bins

            ans[clf].append(metrics)


        # return ans
    return {key: pd.DataFrame.from_dict(v) for key, v in ans.items()}

In [None]:
b_grid_params =  ParameterGrid({
    'window_size' : range(10, 101, 10), 
    'window_step' : range(1, 51, 10),
    'word_size' : range(2, 5),
    'n_bins' : range(3,5),
    'numerosity_reduction' : [False],
    'norm_mean' : [False], 'norm_std':[False],
})

print(len(b_grid_params))


300


In [None]:
dict_ans = grid_search_on_BOP(b_grid_params)


  0%|          | 0/300 [00:00<?, ?it/s]

In [None]:
for key, value in dict_ans.items():
    print(key)
    value.sort_values(by=['f1 score', 'accuracy ', 'recall'], ascending=False, inplace=True)
    display(value.head(3))

5-nn


Unnamed: 0,accuracy,recall,f1 score,window_size,window_step,word_size,n_bins
0,0.803738,0.8125,0.787879,10,1,2,3
1,0.803738,0.8125,0.787879,10,1,3,3
2,0.803738,0.8125,0.787879,10,1,4,3


SVM


Unnamed: 0,accuracy,recall,f1 score,window_size,window_step,word_size,n_bins
0,0.794393,0.791667,0.77551,10,1,2,3
1,0.794393,0.791667,0.77551,10,1,3,3
2,0.794393,0.791667,0.77551,10,1,4,3


LogReg


Unnamed: 0,accuracy,recall,f1 score,window_size,window_step,word_size,n_bins
0,0.719626,0.708333,0.693878,10,1,2,3
1,0.719626,0.708333,0.693878,10,1,3,3
2,0.719626,0.708333,0.693878,10,1,4,3


RF


Unnamed: 0,accuracy,recall,f1 score,window_size,window_step,word_size,n_bins
111,0.831776,0.895833,0.826923,80,21,2,3
119,0.82243,0.875,0.815534,80,41,4,3
140,0.82243,0.875,0.815534,100,11,4,3


In [11]:


clf_param = {"5-nn" : {'window_size':10, 'window_step': 1,	'word_size': 2,	'n_bins' :3 } ,
             "SVM" :  {'window_size':10, 'window_step': 1,	'word_size': 2,	'n_bins' :3 } ,
            "LogReg": {'window_size':10, 'window_step': 1,	'word_size': 2,	'n_bins' :3 } ,
              "RF" :  {'window_size':80, 'window_step': 21,	'word_size': 2,	'n_bins' :3 } ,}


def test_best_params(param):

	classifiers = {"5-nn" : KNeighborsClassifier(), "SVM" : SVC(), "LogReg" : LogisticRegression(), "RF" : RandomForestClassifier()}
	for clfname, cfoo in classifiers.items():
		print(clfname)

		xtest = get_bop(ft['test_data'], param[clfname])
		xtrain = get_bop(ft['train_data'], param[clfname])

		pipe = Pipeline([('scaler', MinMaxScaler()), (clfname, cfoo)])
		pipe.fit(xtrain, fy['train_data'])

		pred_test = pipe.predict(xtest)
		dval = evaluate_model(pred_test, fy['test_data'])
		display_metrics(dval)
		print()


test_best_params(clf_param)

5-nn
accuracy :0.9091	recall:0.9600	f1 score:0.9057	
confusion_matrix
[[26  4]
 [ 1 24]]

SVM
accuracy :0.9091	recall:0.8800	f1 score:0.8980	
confusion_matrix
[[28  2]
 [ 3 22]]

LogReg
accuracy :0.8000	recall:0.7600	f1 score:0.7755	
confusion_matrix
[[25  5]
 [ 6 19]]

RF
accuracy :0.5455	recall:0.5600	f1 score:0.5283	
confusion_matrix
[[16 14]
 [11 14]]

