In [1]:
from tslearn.neighbors import KNeighborsTimeSeriesClassifier
from Parser import parse_frames
from scipy.signal import medfilt
from Functions import analyse_each_rep, find_extremas, filter_extremas
from JointAngles import JointAngles
import numpy as np
import glob
import os
import pickle
import pandas as pd
from functools import reduce


In [2]:
#input_folder = "C:\\Users\\altaok\\Documents\\GitHub\\IndividualProject\\keypoints_for_all\\triceps pushdown"
input_folder = 'C:\\Users\\ak5u16\\Desktop\\IndividualProject\\keypoints_for_all\\triceps pushdown'
folder_paths = glob.glob(os.path.join(input_folder, 'triceps_pushdown*'))
points_folder_name = os.path.basename(input_folder)


def get_data_for_dataset(folder_paths, points_folder_name):
    angle_label_dict = {}
    data = {}
    # need to have a dict with keys : uf_points, ut_points, tk_points; where dict[key] = [([p], 0/1)]; append(([p], 0/1))
    #labeled_arrays = {0:[()], 1:[()]}
    angle_arrays = []
    if points_folder_name == 'triceps pushdown': 
        for folder in folder_paths:
            video_name = os.path.basename(folder)
            label = 0 if '_correct' in folder else 1
            frame_poses = parse_frames(folder)
            joint_angles = JointAngles(points_folder_name, frame_poses)


            upArm_forearm_angles = np.array(joint_angles.upArm_forearm_angles, dtype=np.float)
            upArm_forearm_angles = np.nan_to_num(upArm_forearm_angles)
            upArm_forearm_angles_filtered = medfilt(medfilt(upArm_forearm_angles, 5), 5)
            #labeled_arrays[label][0] = labeled_arrays[label][0] + (upArm_forearm_angles_filtered)

            upArm_trunk_angles = np.array(joint_angles.upArm_trunk_angles, dtype=np.float)
            upArm_trunk_angles = np.nan_to_num(upArm_trunk_angles)
            upArm_trunk_angles_filtered = medfilt(medfilt(upArm_trunk_angles, 5), 5)
            #labeled_arrays[label][0] = labeled_arrays[label][0] + (upArm_trunk_angles_filtered)

            trunk_knee_angles = np.array(joint_angles.trunk_knee_angles)
            trunk_knee_angles = np.nan_to_num(trunk_knee_angles)
            trunk_knee_angles_filtered = medfilt(medfilt(trunk_knee_angles, 5), 5)
            #labeled_arrays[label][0] = labeled_arrays[label][0] + (trunk_knee_angles_filtered)

            #number of reps 
            upArm_forearm_minimas = filter_extremas( upArm_forearm_angles_filtered, find_extremas(upArm_forearm_angles_filtered, maxima=False), maxima=False) 

            angle_arrays.append((label, upArm_forearm_minimas, [upArm_forearm_angles_filtered, upArm_forearm_angles_filtered, trunk_knee_angles_filtered]))

    else: 
        print('Error: Wrong folder path! Has to be: triceps pushdown')

    return angle_arrays
                                             
                                    

def fill_dataframe(angle_arrays, exercise_folder_name):
    df_list = []
    
    print('Filling dataset with ' + str(exercise_folder_name) + ' data...')
   
    for tup in angle_arrays:
        label = tup[0]
        extremas = tup[1]
        uf_angles, ut_angles, tk_angles = tup[2]
        # Extract rep angles
        each_rep_angles = analyse_each_rep('triceps pushdown', 'dataset', extremas, uf_angles, ut_angles, tk_angles)
        s1 = pd.Series(each_rep_angles, name='Angle_array')
        s2 = pd.Series([label for n in range(len(each_rep_angles))], name='Label')
        df = pd.concat([s1,s2], axis=1)
        df_list.append(df)

    return pd.concat(df_list).reset_index(drop=True)


        

In [13]:
angle_arrays = get_data_for_dataset(folder_paths, points_folder_name) 
df = fill_dataframe(angle_arrays, points_folder_name)
print(df.info())

Filling dataset with triceps pushdown data...
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 849 entries, 0 to 848
Data columns (total 2 columns):
Angle_array    849 non-null object
Label          849 non-null int64
dtypes: int64(1), object(1)
memory usage: 13.4+ KB
None


In [8]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', -1)
print(str(df))

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        

In [14]:
def numpy_fillna(data):
    # Get lengths of each row of data
    lens = np.array([len(i) for i in data])

    # Mask of valid places in each row
    mask = np.arange(lens.max()) < lens[:,None]

    # Setup output array and put elements from data into masked positions
    out = np.zeros(mask.shape, dtype=data.dtype)
    out[mask] = np.concatenate(data)
    return out

In [15]:
from sklearn.model_selection import train_test_split
from tslearn.neighbors import KNeighborsTimeSeriesClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn import metrics

y = df['Label']
x = df['Angle_array']
#x = numpy_fillna(x)


In [16]:
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.2)
X_train.shape

(679,)

In [17]:
X_test.shape

(170,)

In [8]:
#dtw version 2
def DTWDistance(s1, s2):
    DTW={}

    for i in range(len(s1)):
        DTW[(i, -1)] = float('inf')
    for i in range(len(s2)):
        DTW[(-1, i)] = float('inf')
    DTW[(-1, -1)] = 0

    for i in range(len(s1)):
        for j in range(len(s2)):
            dist= (s1[i]-s2[j])**2
            DTW[(i, j)] = dist + min(DTW[(i-1, j)],DTW[(i, j-1)], DTW[(i-1, j-1)])

    return np.sqrt(DTW[len(s1)-1, len(s2)-1])

In [18]:
k_range = range(1, 2) #26
scores = {}
scores_list = []
for k in k_range:
    knn_clf = KNeighborsTimeSeriesClassifier(n_neighbors=k, metric="dtw", n_jobs=-1)
    #knn_clf = KNeighborsClassifier(k, n_jobs=-1)
    knn_clf.fit(numpy_fillna(X_train.values), y_train)
    predicted_labels = knn_clf.predict(numpy_fillna(X_test.values))
    acc = metrics.accuracy_score(y_test, predicted_labels)
    scores[k] = acc
    print("Correct classification rate:", acc)
    print('\n')
    print(metrics.classification_report(y_test, predicted_labels))
    print(metrics.f1_score(y_test, predicted_labels, average='macro'))


Correct classification rate: 0.9882352941176471


              precision    recall  f1-score   support

           0       0.98      1.00      0.99       115
           1       1.00      0.96      0.98        55

    accuracy                           0.99       170
   macro avg       0.99      0.98      0.99       170
weighted avg       0.99      0.99      0.99       170

0.9864303959131545
