In [None]:
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split, cross_val_score, cross_validate, GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier

from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = [20, 10]

import math
from glob import glob
import warnings
# import pickle
# import joblib

warnings.filterwarnings('ignore')

def concat_df(file_path, class_num):
    """Load every CSV matching a glob pattern into one labelled DataFrame.

    Parameters
    ----------
    file_path : str
        Glob pattern passed to ``glob``; matches are read in sorted order.
    class_num : int
        Value written to the ``class`` column of every returned row.

    Returns
    -------
    pandas.DataFrame
        Columns ``'nose'`` through ``'right_ankle_y'`` (label slice, both
        ends inclusive) of each file stacked row-wise, followed by a
        ``class`` column, on a fresh 0..n-1 index.

    Raises
    ------
    AssertionError
        If the pattern matches no file.
    """
    files = sorted(glob(file_path))
    assert len(files) != 0, 'csv file is none'

    # Gather the per-file slices and concatenate once: calling pd.concat on
    # a growing frame inside the loop re-copies all earlier rows each time.
    frames = [pd.read_csv(file).loc[:, 'nose':'right_ankle_y'] for file in files]
    df = pd.concat(frames, ignore_index=True)

    # Label after stacking so the column is written a single time and
    # always ends up last.
    df['class'] = class_num
    return df

# Glob patterns locating the CSV files of each class.
file_path_stop = 'path/to/stop'
file_path_change = 'path/to/change'

# One labelled frame per class: 0 for "stop", 1 for "change".
stop_data = concat_df(file_path_stop, class_num=0)
chg_data = concat_df(file_path_change, class_num=1)

# Stack both classes, then shuffle the rows reproducibly.
concat_data = pd.concat([stop_data, chg_data])
data = concat_data.sample(frac=1, random_state=42)
data = data.reset_index(drop=True)

# Separate the target from the features; 'class' is the last column.
label = data['class']
data = data.iloc[:, :-1]

In [None]:
class Predict:
    """Bundle a classifier with its train/validation data.

    Parameters
    ----------
    clf : estimator
        Object exposing ``fit`` and ``predict``; it is also passed to
        ``cross_validate`` and ``GridSearchCV``.
    x_train, y_train
        Data used for fitting, cross-validation and grid search.
    x_val, y_val
        Held-out data used by ``predictTest`` and ``accScore``.
    cv
        Forwarded unchanged as the ``cv`` argument of the scikit-learn helpers.
    params
        Forwarded unchanged as ``param_grid`` of ``GridSearchCV``.
    """

    def __init__(self, clf, x_train, y_train, x_val, y_val, cv, params):
        self.trainX = x_train
        self.trainY = y_train
        self.valX = x_val
        self.valY = y_val
        self.clf = clf
        self.cv = cv
        self.params = params
        # Filled by predictTest(); None means "not predicted yet".
        self.result = None

    def predictTest(self):
        """Fit ``clf`` on the training data and predict the validation set.

        Returns the predictions and caches them in ``self.result``.
        """
        self.clf.fit(self.trainX, self.trainY)
        # Predict with the wrapped estimator, not with a notebook-level
        # global, so the class works for any classifier in any cell order.
        self.result = self.clf.predict(self.valX)
        return self.result

    def accScore(self):
        """Return the accuracy of the cached predictions on the validation set.

        Runs ``predictTest`` first when no prediction has been made yet.
        """
        if self.result is None:
            self.predictTest()
        return accuracy_score(self.valY, self.result)

    def crossVail(self):
        """Cross-validate ``clf`` on the training data.

        Returns
        -------
        tuple
            ``(score, df)``: the per-fold test scores and a DataFrame of
            the full ``cross_validate`` output.
        """
        # A single run yields both outputs; cross_val_score would repeat the
        # same fits only to return the 'test_score' entry.
        cv_results = cross_validate(self.clf, self.trainX, self.trainY, cv=self.cv)
        return cv_results['test_score'], pd.DataFrame(cv_results)

    def gridBestparam(self):
        """Grid-search ``params`` on the training data; return the best setting."""
        grid = GridSearchCV(estimator=self.clf, param_grid=self.params, cv=self.cv)
        grid.fit(self.trainX, self.trainY)
        return grid.best_params_

# Normalization: centre every row on its joint centroid and scale it by a
# per-row body size.
class Normalization:
    """Centre and scale skeleton coordinates row by row.

    Parameters
    ----------
    df : pandas.DataFrame
        The first ``joint_len`` columns are treated as one coordinate group
        and all remaining columns as the other (presumably x then y --
        confirm against the CSV layout).
    head, torso, leg : sequence of float
        One value per row of ``df``; their element-wise sum is the divisor
        applied to that row.
    joint_len : int, default 17
        Number of leading columns forming the first group; also the divisor
        used when averaging a group into its centre.
    """

    def __init__(self, df, head, torso, leg, joint_len=17):
        self.df = df
        self.body_list = [head, torso, leg]
        self.joint_len = joint_len

    def listSum(self):
        """Return the element-wise sum of the head, torso and leg sequences."""
        return [sum(parts) for parts in zip(*self.body_list)]

    def jointCenter(self, xy):
        """Return, per row, the sum of ``xy`` divided by ``joint_len``."""
        return xy.sum(axis=1) / self.joint_len

    def jointNorm(self, xy, body, centr):
        """Return ``(xy - centr) / body`` applied row-wise as a new frame.

        ``centr`` is aligned on the index of ``xy``; ``body`` is matched to
        the rows by position. The input frame is left untouched: assigning
        into a column slice could write through to the caller's DataFrame.
        """
        scale = pd.Series(body, index=xy.index, dtype=float)
        return xy.sub(centr, axis=0).div(scale, axis=0)

    def startNormal(self):
        """Normalize both coordinate groups and return them side by side."""
        body = self.listSum()
        X = self.df.iloc[:, :self.joint_len]
        Y = self.df.iloc[:, self.joint_len:]

        normX = self.jointNorm(X, body, self.jointCenter(X))
        normY = self.jointNorm(Y, body, self.jointCenter(Y))
        # Both halves share the index of self.df, so a column-wise concat
        # reassembles the rows without a join.
        return pd.concat([normX, normY], axis=1)
    
class Maxjoint:
    """Accumulate per-row distances between pairs of joints of ``df``.

    Every ``disTance`` call appends one list (one value per row) to
    ``dis_list``; ``maxiMum`` and ``jointSum`` then reduce those lists
    element-wise.
    """

    def __init__(self, df):
        self.df = df
        self.df_len = len(df)
        self.dis_list = []
        self.length = 0

    def maxiMum(self):
        """Return the element-wise maximum over all recorded distance lists."""
        return np.maximum.reduce(self.dis_list).tolist()

    def jointSum(self):
        """Return the element-wise sum over all recorded distance lists."""
        return [sum(values) for values in zip(*self.dis_list)]

    def disTance(self, x1, x2, y1, y2):
        """Record the Euclidean distance between two joints for every row.

        Parameters
        ----------
        x1, x2 : str
            Column names holding the first coordinate of each joint.
        y1, y2 : str
            Column names holding the second coordinate of each joint.

        Returns
        -------
        list of list of float
            ``self.dis_list`` including the newly appended distances.
        """
        # Work on positional arrays: label lookups such as series[idx] only
        # address the intended rows when the index is exactly 0..n-1.
        dx = self.df[x2].to_numpy(dtype=float) - self.df[x1].to_numpy(dtype=float)
        dy = self.df[y2].to_numpy(dtype=float) - self.df[y1].to_numpy(dtype=float)

        self.result = np.hypot(dx, dy).tolist()
        if self.result:
            # Kept for backward compatibility: the last distance computed.
            self.length = self.result[-1]
        self.dis_list.append(self.result)
        return self.dis_list


In [None]:
# Normalization: derive a per-row body size from head, torso and leg
# lengths, then centre and scale every skeleton with it.

headmax = Maxjoint(data)
torsomax = Maxjoint(data)
legleft = Maxjoint(data)
legright = Maxjoint(data)

# Head size: the largest nose-to-eye / nose-to-ear distance.
headmax.disTance('nose', 'left_eye', 'nose_y', 'left_eye_y')
headmax.disTance('nose', 'right_eye', 'nose_y', 'right_eye_y')
headmax.disTance('nose', 'left_ear', 'nose_y', 'left_ear_y')
headmax.disTance('nose', 'right_ear', 'nose_y', 'right_ear_y')
head = headmax.maxiMum()

# Torso size: the longer of the two shoulder-to-hip distances.
torsomax.disTance('left_shoulder', 'left_hip', 'left_shoulder_y', 'left_hip_y')
torsomax.disTance('right_shoulder', 'right_hip', 'right_shoulder_y', 'right_hip_y')
torso = torsomax.maxiMum()

# Leg length: hip->knee plus knee->ankle. The hip->knee segment must use the
# *_y columns for its second coordinate, like every other segment here;
# passing the x columns twice yields sqrt(2)*|dx| instead of the distance.
legleft.disTance('left_hip', 'left_knee', 'left_hip_y', 'left_knee_y')
legleft.disTance('left_knee', 'left_ankle', 'left_knee_y', 'left_ankle_y')
leg_left = legleft.jointSum()

legright.disTance('right_hip', 'right_knee', 'right_hip_y', 'right_knee_y')
legright.disTance('right_knee', 'right_ankle', 'right_knee_y', 'right_ankle_y')
leg_right = legright.jointSum()

# Use the longer leg of each row.
leg = np.maximum.reduce([leg_left, leg_right]).tolist()
normal = Normalization(data, head, torso, leg)
dataframe = normal.startNormal()

In [None]:
def plot_line(a, b):
    """Draw a black segment between two 2-D points.

    Parameters
    ----------
    a, b : array-like of length 2
        The two coordinates of each end point, first the horizontal one.

    Nothing is drawn when either point has only zero coordinates
    (presumably the marker of an undetected keypoint -- confirm against the
    pose estimator's output).
    """
    # Coerce to ndarrays so a[0] / a[1] are positional for any input; integer
    # keys on a string-labelled Series depend on a deprecated pandas fallback.
    a = np.asarray(a)
    b = np.asarray(b)
    if a.any() and b.any():
        plt.plot([a[0], b[0]], [a[1], b[1]], 'k-')
        
def plot_skeleton(sample, pattern):
    """Plot the labelled keypoints of one skeleton and connect them.

    Parameters
    ----------
    sample : array-like
        Flat coordinate vector: the first half holds the horizontal
        coordinate of every keypoint, the second half the vertical one, in
        the order of ``keypoint`` below.
    pattern : str
        Matplotlib format string used for the keypoint markers.
    """
    keypoint = ['Nose', 'LEye', 'REye', 'LEar', 'REar', 'LShoulder', 'RShoulder',
                'LElbow', 'RElbow', 'LWrist', 'RWrist', 'LHip', 'RHip', 'LKnee', 'RKnee', 'LAnkle', 'RAnkle']

    # Segments to draw, as (start, end) keypoint names, in drawing order.
    bones = [
        ('LEye', 'Nose'), ('REar', 'REye'), ('REye', 'Nose'), ('LEar', 'LEye'),
        ('LShoulder', 'LElbow'), ('LElbow', 'LWrist'),
        ('RShoulder', 'RElbow'), ('RElbow', 'RWrist'),
        ('LHip', 'LKnee'), ('LKnee', 'LAnkle'),
        ('RKnee', 'RAnkle'), ('RHip', 'RKnee'),
        ('LHip', 'LShoulder'), ('RHip', 'RShoulder'),
        ('RHip', 'LHip'), ('LShoulder', 'RShoulder'),
        ('LShoulder', 'Nose'), ('RShoulder', 'Nose'),
    ]

    # Index positionally through an ndarray: integer keys on a Series with
    # string labels rely on a deprecated pandas fallback.
    coords = np.asarray(sample)
    n_joints = len(coords) // 2

    for i in range(n_joints):
        x, y = coords[i], coords[n_joints + i]
        plt.plot(x, y, pattern)
        plt.text(x, y, keypoint[i], verticalalignment='bottom', horizontalalignment='center')

    # Pair each keypoint name with its two coordinates.
    joint = {name: coords[[i, n_joints + i]] for i, name in enumerate(keypoint)}
    for start, end in bones:
        plot_line(joint[start], joint[end])
    
def plot(sample, centr):
    """Draw one skeleton in subplot 131 with padded axis limits.

    Parameters
    ----------
    sample : sequence
        Coordinate vector whose first 17 entries are used for the x limits
        and the remaining entries for the y limits.
    centr : int
        0 draws the sample as the "Original skeleton" (blue markers, limits
        computed from strictly positive values, new figure); 1 draws it as
        the "Normalized skeleton" (red markers). Any other value does
        nothing.
    """
    if centr == 0:
        margin = 38
        plt.figure(str(sample))
        plt.subplot(131)
        plt.title('Original skeleton')
        xs, ys = sample[:17], sample[17:]
        right = max(xs) + margin
        left = min(v for v in xs if v > 0) - margin
        bottom = max(ys) + margin
        top = min(v for v in ys if v > 0) - margin
        plt.xlim(left, right)
        # Limits are passed as (max, min), so the vertical axis is inverted.
        plt.ylim(bottom, top)
        plot_skeleton(sample, 'bo')
    elif centr == 1:
        margin = 0.2
        plt.subplot(131)
        plt.title('Normalized skeleton')
        xs, ys = sample[:17], sample[17:]
        right = max(xs) + margin
        left = min(xs) - margin
        bottom = max(ys) + margin
        top = min(ys) - margin
        plt.xlim(left, right)
        plt.ylim(bottom, top)
        plot_skeleton(sample, 'ro')

In [None]:
# Normalized skeleton of the first sample
plot(dataframe.iloc[0], 1)
# Same sample before normalization
plot(data.iloc[0], 0)

In [None]:
# ML inference: soft-voting ensemble evaluated on a 30% hold-out split
x_train, x_val, y_train, y_val = train_test_split(
    dataframe, label, test_size=0.3, random_state=42
)

# Base learners of the ensemble
clf1 = RandomForestClassifier(n_estimators=30, max_depth=3, random_state=42)
clf2 = LogisticRegression(C=0.5, random_state=42)
clf3 = SVC(kernel='linear', C=1.2, probability=True, random_state=42)
clf4 = LGBMClassifier(learning_rate=0.1, max_depth=3)

vote = VotingClassifier(
    estimators=[('rf', clf1), ('lr', clf2), ('svc', clf3), ('lgbm', clf4)],
    voting='soft',
)

votings = Predict(vote, x_train, y_train, x_val, y_val, cv=5, params='None')

# Fit on the training split, then report predictions and accuracy on the
# validation split.
print(votings.predictTest())
print(votings.accScore())