六轴动作数据通过小程序的game模式以50Hz的频率采样，采集的动作分别为开合跳、蹲起、合掌跳和高抬腿，同时存储记录人的身高、体重、年龄和性别信息。
通过LSTM、CNN和MLP算法均可以实现较好的动作检测，其中测试集准确率LSTM=89.6% > CNN=86.24% >= MLP=86%。
由于个人采集数据量较小且动作之间数据量不平衡，实时监测准确率稍有下降，但扩充数据集重新导入参数即可大幅度改良。
（三种算法均测试过其他同学多种动作数据集，准确率在87%以上）
通过JavaScript重写MLP源码部署在小程序上，可以实现离线识别，但语音播报需要联网接通云存储音频文件。
由于小程序内部计算量大，采样频率受到影响，但大体监测仍准确。若改用连接服务器则使用LSTM、CNN算法识别效果会更好。
以下分数据预处理和CNN、MLP、LSTM算法两部分介绍。

In [8]:
#数据预处理：小程序云数据库存储的数据导出为csv格式，分别由加速计XYZ轴（accXs,accYs,accZs)、陀螺仪XYZ轴（gyroXs,gyroYs,gyroZs)、
#对应采集加速计和陀螺仪信号的时间戳（AtimeSs，GtimeSs）、身高体重年龄及性别、所做运动标签（yundong）和记录人id
#数据预处理将原采集的数据转化成方便算法分割读取的形式
import pandas as pd
import openpyxl
# Excel workbook writer for 'hezhangtiao.xlsx'; the conversion loop further
# down adds one sheet per CSV record through this object.
writer=pd.ExcelWriter('hezhangtiao.xlsx')

def read_data(file_path):
    """Load the exported CSV at ``file_path`` and return it as a DataFrame."""
    return pd.read_csv(file_path)

dataset=read_data('hezhangtiao.csv')

# Sensor columns that the CSV stores as bracketed, comma-separated strings,
# listed in the order they appear in every exported sheet.
SIGNAL_COLUMNS = ['accXs', 'accYs', 'accZs', 'gyroXs', 'gyroYs', 'gyroZs',
                  'AtimeSs', 'GtimeSs']
# Per-record scalar columns as (sheet column, CSV column) pairs.
META_COLUMNS = [('weight', 'weight'), ('height', 'height'), ('year', 'year'),
                ('yundong', 'yundong'), ('openid', '_openid'), ('gender', 'gender')]


def _parse_float_list(text):
    """Turn a string such as ``"[0.1,0.2]"`` into ``[0.1, 0.2]``."""
    return [float(item) for item in text[1:-1].split(',')]


# Each CSV row holds one recording: convert its string-encoded signals to
# floats and write them to a dedicated sheet of the workbook.
for j in range(len(dataset)):
    parsed = {name: _parse_float_list(dataset[name][j]) for name in SIGNAL_COLUMNS}
    # The accelerometer and gyroscope streams can differ in length by about
    # one sample; keep only the common prefix of both.
    size = min(len(parsed['AtimeSs']), len(parsed['GtimeSs']))
    sheet = {name: values[0:size] for name, values in parsed.items()}
    # Repeat the per-record metadata on every sample row.
    for sheet_column, csv_column in META_COLUMNS:
        sheet[sheet_column] = [dataset[csv_column][j]] * size
    pd1 = pd.DataFrame(
        sheet,
        columns=SIGNAL_COLUMNS + [sheet_column for sheet_column, _ in META_COLUMNS],
    )
    pd1.to_excel(writer, sheet_name=str(j)+'sheet', index=False)
# Finalise the workbook once, after all sheets were added. Saving inside the
# loop rewrote the file on every iteration (and some engines refuse to write
# after the first save); close() is available on old and new pandas alike.
writer.close()
#将四个动作数据集均执行一遍此操作转化为指定格式再合并成一个csv数据集，得到训练算法用的hhyHAR_6data.csv
#此处由于数据量小且采集人群均为19-20岁女生，除六轴以外的特征不适合用于训练算法。
# 如果扩大数据集至多年龄层、性别平均的数据集则可作为额外特征训练。

In [9]:
#CNN算法
import pandas as pd
import numpy as np
from scipy import stats
import tensorflow as tf
np.random.seed(444)
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"  #取消使用GPU，采用cpu跑算法
# import tensorflow.compat.v1 as tf
# tf.disable_v2_behavior()  #如果print(tf.__version__)为2.X以上的版本则需要该两行代码，1.X版本不需要

def read_data(file_path):
    """Read the header-less training CSV into a DataFrame.

    The seven columns are named after the six sensor axes plus the action
    label ('class').
    """
    sensor_axes = ['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ']
    return pd.read_csv(file_path, header=None, names=sensor_axes + ['class'])

def feature_normalize(dataset):
    """Standardise ``dataset`` along axis 0: subtract the mean, divide by the std."""
    centered = dataset - np.mean(dataset, axis=0)
    return centered / np.std(dataset, axis=0)

def windows(data, size):
    """Yield (start, end) index pairs of sliding windows with 50% overlap.

    Windows start at 0 and advance by ``size / 2`` until the start reaches
    ``data.count()``; both bounds are truncated to integers.
    """
    hop = size / 2
    position = 0
    while True:
        if position >= data.count():
            return
        yield int(position), int(position + size)
        position += hop


def segment_signal(data, window_size=128):
    """Cut the six-axis recording into overlapping fixed-length segments.

    Args:
        data: DataFrame with the columns 'accX', 'accY', 'accZ', 'gyroX',
            'gyroY', 'gyroZ' and 'class'.
        window_size: number of consecutive samples per segment
            (128 samples = 2.56 s at the 50 Hz sampling rate).

    Returns:
        (segments, labels): ``segments`` has shape (n, window_size, 6) and
        ``labels`` holds, for each segment, the most frequent 'class' value
        inside that window.
    """
    channel_names = ["accX", "accY", "accZ", "gyroX", "gyroY", "gyroZ"]
    segment_list = []
    label_list = []
    for (start, end) in windows(data['class'], window_size):
        window_labels = data["class"][start:end]
        # Skip the incomplete windows at the tail. The length check uses the
        # `data` argument, not a module-level variable.
        if len(window_labels) != window_size:
            continue
        segment_list.append(
            np.column_stack([data[name][start:end] for name in channel_names])
        )
        # stats.mode returns a length-1 array on older SciPy and a scalar on
        # newer releases; atleast_1d handles both.
        label_list.append(np.atleast_1d(stats.mode(window_labels)[0])[0])
    # Build the result arrays once instead of re-copying them per window.
    segments = np.array(segment_list, dtype=np.float64).reshape(-1, window_size, 6)
    labels = np.append(np.empty((0)), label_list)
    return segments, labels


def weight_variable(shape):
    """Return a tf.Variable of the given shape drawn from a truncated normal (stddev 0.1)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.1))


def bias_variable(shape):
    """Return a tf.Variable of the given shape initialised to 0.0."""
    return tf.Variable(tf.constant(0.0, shape=shape))


def depthwise_conv2d(x, W):
    """Depthwise convolution of ``x`` with filter ``W``: unit strides, 'VALID' padding."""
    unit_strides = [1, 1, 1, 1]
    return tf.nn.depthwise_conv2d(x, W, unit_strides, padding='VALID')


def apply_depthwise_conv(x, kernel_size, num_channels, depth):
    """Depthwise conv layer: new filter and bias variables, convolution, ReLU.

    The filter has shape [1, kernel_size, num_channels, depth] and the bias
    has ``depth * num_channels`` entries.
    """
    kernel = weight_variable([1, kernel_size, num_channels, depth])
    offset = bias_variable([depth * num_channels])
    pre_activation = tf.add(depthwise_conv2d(x, kernel), offset)
    return tf.nn.relu(pre_activation)


def apply_max_pool(x, kernel_size, stride_size):
    """Max-pool ``x`` along its third dimension with 'VALID' padding."""
    pool_window = [1, 1, kernel_size, 1]
    pool_strides = [1, 1, stride_size, 1]
    return tf.nn.max_pool(x, ksize=pool_window, strides=pool_strides, padding='VALID')

In [10]:
# Load the merged training CSV and standardise each of the six sensor axes.
dataset = read_data('hhyHAR_6data.csv')
for axis_name in ('accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ'):
    dataset[axis_name] = feature_normalize(dataset[axis_name])

# Cut into windows, one-hot encode the labels (one column per action, e.g.
# [1,0,0,0], [0,1,0,0], ...) and add the height-1 dimension the CNN input expects.
segments, labels = segment_signal(dataset)
labels = np.asarray(pd.get_dummies(labels), dtype=np.int8)
reshaped_segments = segments.reshape(len(segments), 1, 128, 6)

# Random split: a segment goes to the training set when its draw is below 0.70.
# NOTE(review): segments overlap by 50% and are split individually, so
# neighbouring train/test segments share samples; verify this is acceptable.
train_test_split = np.random.rand(len(reshaped_segments)) < 0.70
train_x, train_y = reshaped_segments[train_test_split], labels[train_test_split]
test_x, test_y = reshaped_segments[~train_test_split], labels[~train_test_split]

In [11]:
# Input dimensions and hyper-parameters of the CNN.
input_height = 1
input_width = 128  # sliding-window length (samples per segment)
num_labels = 4  # number of action classes
num_channels = 6  # input channels: the six accelerometer/gyroscope axes

batch_size = 10  # segments fed per optimisation step
kernel_size = 60  # filter width of the first depthwise convolution
depth = 60  # channel multiplier of the first depthwise convolution
num_hidden = 1000  # units in the fully connected layer

learning_rate = 0.0001
training_epochs = 10

# Number of full batches per epoch; a trailing partial batch is not counted.
total_batches = train_x.shape[0] // batch_size

In [12]:
# Network definition: X is the input placeholder, Y the one-hot target;
# `c` is a convolution output, `p` the pooled output; the result is flattened,
# passed through a tanh hidden layer, and y_ is the softmax prediction.
X = tf.placeholder(tf.float32, shape=[None, input_height, input_width, num_channels])
Y = tf.placeholder(tf.float32, shape=[None, num_labels])

# conv -> max-pool (window 20, stride 2) -> conv; `c` is rebound to the
# output of the second convolution.
c = apply_depthwise_conv(X, kernel_size, num_channels, depth)
p = apply_max_pool(c, 20, 2)
c = apply_depthwise_conv(p, 6, depth * num_channels, depth // 10)

# Flatten everything except the batch dimension.
shape = c.get_shape().as_list()
c_flat = tf.reshape(c, [-1, shape[1] * shape[2] * shape[3]])

# Fully connected hidden layer with tanh activation.
# NOTE(review): the row count is spelled out from depth/num_channels rather
# than shape[3]; it must equal the flattened width of c_flat -- confirm if
# the convolution parameters above are ever changed.
f_weights_l1 = weight_variable([shape[1] * shape[2] * depth * num_channels * (depth // 10), num_hidden])
f_biases_l1 = bias_variable([num_hidden])
f = tf.nn.tanh(tf.add(tf.matmul(c_flat, f_weights_l1), f_biases_l1))

# Output layer: softmax over the num_labels classes.
out_weights = weight_variable([num_hidden, num_labels])
out_biases = bias_variable([num_labels])
y_ = tf.nn.softmax(tf.matmul(f, out_weights) + out_biases)

In [13]:
# Loss, optimiser and accuracy ops, followed by the mini-batch training loop.
# Cross-entropy summed over the batch. The softmax output is clipped away
# from 0 so tf.log cannot produce -inf/NaN when a probability underflows.
loss = -tf.reduce_sum(Y * tf.log(tf.clip_by_value(y_, 1e-10, 1.0)))
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(loss)

correct_prediction = tf.equal(tf.argmax(y_, 1), tf.argmax(Y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

# Start with an empty history: np.empty(shape=[1]) would leave one
# uninitialised value in front of the recorded losses.
cost_history = np.empty(shape=[0],dtype=float)

with tf.Session() as session:
    tf.global_variables_initializer().run()
    # Loss of the most recent batch; kept in its own name so the conv tensor
    # `c` is not overwritten by a float.
    batch_loss = float('nan')
    for epoch in range(training_epochs):
        for b in range(total_batches):
            # Walk through the training set in consecutive slices, wrapping
            # around before the last full batch.
            offset = (b * batch_size) % (train_y.shape[0] - batch_size)
            batch_x = train_x[offset:(offset + batch_size), :, :, :]
            batch_y = train_y[offset:(offset + batch_size), :]
            _, batch_loss = session.run([optimizer, loss],feed_dict={X: batch_x, Y : batch_y})
            cost_history = np.append(cost_history,batch_loss)
        # Report the last batch loss and the accuracy on the full training set.
        print ("Epoch: ",epoch," Training Loss: ",batch_loss," Training Accuracy: ",session.run(accuracy, feed_dict={X: train_x, Y: train_y}))
    print('Testing Accuracy:', session.run(accuracy, feed_dict={X: test_x, Y: test_y}))
    # print('y=',session.run(y_,feed_dict={X : test_x[150:155, :, :, :]}), 'Y=', train_y[150:155])


Epoch:  0  Training Loss:  12.992293  Training Accuracy:  0.46953404
Epoch:  1  Training Loss:  13.448204  Training Accuracy:  0.5878136
Epoch:  2  Training Loss:  11.726599  Training Accuracy:  0.6702509
Epoch:  3  Training Loss:  10.197773  Training Accuracy:  0.7670251
Epoch:  4  Training Loss:  9.0141535  Training Accuracy:  0.83870965
Epoch:  5  Training Loss:  8.119954  Training Accuracy:  0.8637993
Epoch:  6  Training Loss:  7.428698  Training Accuracy:  0.8781362
Epoch:  7  Training Loss:  6.8707523  Training Accuracy:  0.8888889
Epoch:  8  Training Loss:  6.411648  Training Accuracy:  0.90681005
Epoch:  9  Training Loss:  6.031598  Training Accuracy:  0.9175627
Testing Accuracy: 0.80733943


In [14]:
#MLP算法
import pandas as pd
import numpy as np
from scipy import stats
np.random.seed(444)
def read_data(file_path):
    """Load the header-less CSV; columns are the six sensor axes followed by 'class'."""
    return pd.read_csv(
        file_path,
        header=None,
        names=['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ', 'class'],
    )

# Load the merged six-axis training set. No feature_normalize step is applied
# in this MLP cell, so the values are used as read from the file.
dataset=read_data('hhyHAR_6data.csv')

def windows(data, size):
    """Generate (start, end) bounds of half-overlapping windows over ``data``.

    The start advances by ``size / 2`` (possibly fractional) and both bounds
    are truncated to int; iteration stops once start reaches ``data.count()``.
    """
    half_window = size / 2
    offset = 0
    while offset < data.count():
        bounds = (int(offset), int(offset + size))
        yield bounds
        offset = offset + half_window


def segment_signal(data, window_size=45):
    """Cut the recording into overlapping windows flattened to 1-D vectors.

    The MLP takes a one-dimensional input, so each window is laid out as all
    accX samples, then accY, accZ, gyroX, gyroY and gyroZ.

    Args:
        data: DataFrame with the columns 'accX', 'accY', 'accZ', 'gyroX',
            'gyroY', 'gyroZ' and 'class'.
        window_size: number of consecutive samples per window.

    Returns:
        (segments, labels): ``segments`` has shape (n, window_size * 6) and
        ``labels`` holds the most frequent 'class' value of each window.
    """
    channel_names = ["accX", "accY", "accZ", "gyroX", "gyroY", "gyroZ"]
    rows = []
    label_list = []
    for (start, end) in windows(data['class'], window_size):
        window_labels = data["class"][start:end]
        # Skip the incomplete windows at the tail. The length check uses the
        # `data` argument, not a module-level variable.
        if len(window_labels) != window_size:
            continue
        rows.append(
            np.concatenate([np.asarray(data[name][start:end]) for name in channel_names])
        )
        # stats.mode returns a length-1 array on older SciPy and a scalar on
        # newer releases; atleast_1d handles both.
        label_list.append(np.atleast_1d(stats.mode(window_labels)[0])[0])
    # Build the result arrays once instead of re-copying them per window.
    segments = np.array(rows, dtype=np.float64).reshape(-1, window_size * 6)
    labels = np.append(np.empty((0)), label_list)
    return segments, labels

# Segment the data, cast labels to int and split randomly: a segment goes to
# the training set when its draw is below 0.70.
segments, labels = segment_signal(dataset)
labels = np.array(labels,dtype=int)
reshaped_segments=segments.reshape(len(segments),45*6)
train_test_split = np.random.rand(len(reshaped_segments)) < 0.70
X_train, y_train = reshaped_segments[train_test_split], labels[train_test_split]
X_test, y_test = reshaped_segments[~train_test_split], labels[~train_test_split]


from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report, confusion_matrix

# ReLU activation; two hidden layers with 130 and 5 neurons respectively.
mlp = MLPClassifier(hidden_layer_sizes=(130, 5), max_iter=1000, activation='relu')
mlp.fit(X_train, y_train)
predictions = mlp.predict(X_test)

# Report the confusion matrix and per-class precision/recall on the test split.
print(confusion_matrix(y_test, predictions))
print(classification_report(y_test, predictions))
# print(mlp.classes_)  #可以看各种参数
# print(mlp.n_outputs_)
# print(mlp.out_activation_)
# print(np.array(mlp.coefs_).shape)
# print(len(mlp.coefs_))
# print(len(mlp.coefs_[0]))
# print(len(mlp.intercepts_[0]))
# print(np.array(mlp.coefs_[0]))
# print(np.array(mlp.coefs_[0]).shape)
# print(np.array(mlp.coefs_).shape)
# print(np.array(mlp.coefs_[1]).shape)
# print(np.array(mlp.coefs_[2]).shape)
# print(np.array(mlp.intercepts_).shape)
# print(np.array(mlp.intercepts_[0]).shape)
# print(np.array(mlp.intercepts_[1]).shape)
# print(np.array(mlp.intercepts_[2]).shape)
# #

[[72  6  7  8]
 [ 4 81  3  2]
 [ 8  2 68  4]
 [ 5  6  3 41]]
             precision    recall  f1-score   support

          0       0.81      0.77      0.79        93
          1       0.85      0.90      0.88        90
          2       0.84      0.83      0.83        82
          3       0.75      0.75      0.75        55

avg / total       0.82      0.82      0.82       320



In [15]:
# Export the trained MLP parameters to Excel so they can be pasted into the
# mini-program's JS page data, where classification is done with plain matrix
# arithmetic: each layer computes W.x + b, hidden layers use ReLU and the
# output layer uses softmax.
# One sheet per layer: 'w1', 'w2', ... in W.xlsx and 'b1', 'b2', ... in b.xlsx.
# The `with` blocks save and close each workbook even if a write fails, and
# avoid ExcelWriter.save(), which no longer exists in pandas 2.x.
with pd.ExcelWriter('W.xlsx') as writer:
    for layer_index, layer_weights in enumerate(mlp.coefs_, start=1):
        pd.DataFrame(np.array(layer_weights)).to_excel(
            writer, sheet_name='w%d' % layer_index, index=False)

with pd.ExcelWriter('b.xlsx') as writer1:
    for layer_index, layer_bias in enumerate(mlp.intercepts_, start=1):
        pd.DataFrame(np.array(layer_bias)).to_excel(
            writer1, sheet_name='b%d' % layer_index, index=False)

In [16]:
#LSTM算法
import pandas as pd
import numpy as np
from scipy import stats
import tensorflow as tf
np.random.seed(444)
import os
os.environ["CUDA_VISIBLE_DEVICES"]="-1"
# import tensorflow.compat.v1 as tf
# tf.disable_eager_execution()
# tf.disable_v2_behavior()   #tf版本高于2.0才需要这样导入

def read_data(file_path):
    """Read the header-less CSV and name its columns; 'class' is the action label."""
    names = 'accX accY accZ gyroX gyroY gyroZ class'.split()
    frame = pd.read_csv(file_path, header=None, names=names)
    return frame

def feature_normalize(dataset):
    """Return ``dataset`` shifted by its axis-0 mean and scaled by its axis-0 std."""
    mean_value, std_value = np.mean(dataset, axis=0), np.std(dataset, axis=0)
    shifted = dataset - mean_value
    return shifted / std_value


def windows(data, size):
    """Yield integer (start, end) pairs for windows of ``size`` with 50% overlap.

    Stops as soon as the (untruncated) start reaches ``data.count()``.
    """
    cursor = 0
    while not cursor >= data.count():
        lower = int(cursor)
        upper = int(cursor + size)
        yield lower, upper
        cursor += size / 2


def segment_signal(data, window_size=128):
    """Cut the six-axis recording into overlapping fixed-length segments.

    Args:
        data: DataFrame with the columns 'accX', 'accY', 'accZ', 'gyroX',
            'gyroY', 'gyroZ' and 'class'.
        window_size: number of consecutive samples per segment.

    Returns:
        (segments, labels): ``segments`` has shape (n, window_size, 6) and
        ``labels`` holds, for each segment, the most frequent 'class' value
        inside that window.
    """
    channel_names = ["accX", "accY", "accZ", "gyroX", "gyroY", "gyroZ"]
    segment_list = []
    label_list = []
    for (start, end) in windows(data['class'], window_size):
        window_labels = data["class"][start:end]
        # Skip the incomplete windows at the tail. The length check uses the
        # `data` argument, not a module-level variable.
        if len(window_labels) != window_size:
            continue
        segment_list.append(
            np.column_stack([data[name][start:end] for name in channel_names])
        )
        # stats.mode returns a length-1 array on older SciPy and a scalar on
        # newer releases; atleast_1d handles both.
        label_list.append(np.atleast_1d(stats.mode(window_labels)[0])[0])
    # Build the result arrays once instead of re-copying them per window.
    segments = np.array(segment_list, dtype=np.float64).reshape(-1, window_size, 6)
    labels = np.append(np.empty((0)), label_list)
    return segments, labels


# Load the merged training CSV and standardise every sensor axis in place.
dataset = read_data('hhyHAR_6data.csv')
for sensor_axis in ['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ']:
    dataset[sensor_axis] = feature_normalize(dataset[sensor_axis])

# Window the signal and one-hot encode the labels.
segments, labels = segment_signal(dataset)
labels = np.asarray(pd.get_dummies(labels), dtype=np.int8)
reshaped_segments = segments.reshape(len(segments), 128, 6)


# Random split: a segment goes to the training set when its draw is below 0.70.
train_test_split = np.random.rand(len(reshaped_segments)) < 0.70
X_train, X_test = reshaped_segments[train_test_split], reshaped_segments[~train_test_split]
y_train, y_test = labels[train_test_split], labels[~train_test_split]


In [17]:
# Input dimensions
training_data_count = len(X_train)  # number of training segments after sliding-window sampling
test_data_count = len(X_test)  # number of test segments
n_steps = len(X_train[0])  # time steps per segment (128 samples = 2.56 s window)
n_input = len(X_train[0][0])  # features per time step (six sensor axes)
# LSTM structure
n_hidden = 32 # number of hidden-layer features
n_classes = 4 # number of action classes
# Training hyper-parameters
learning_rate = 0.0025
lambda_loss_amount = 0.0015  # weight of the L2 penalty added to the loss
training_iters = training_data_count * 300  # total samples to process: 300 times the training-set size
batch_size = 1500
display_iter = 30000  # report test-set metrics whenever the processed-sample count is a multiple of this

print("(X test shape, y test shape, every Xtest's mean, every Xtest's standard deviation)")
print(X_test.shape, y_test.shape, np.mean(X_test), np.std(X_test))
def LSTM_RNN(_X, _weights, _biases):
    """Build the two-layer LSTM classifier graph and return its output logits.

    Args:
        _X: input tensor of shape (batch_size, n_steps, n_input).
        _weights: dict with the 'hidden' and 'out' weight variables.
        _biases: dict with the 'hidden' and 'out' bias variables.

    Returns:
        The last time step's LSTM output multiplied by ``_weights['out']``
        plus ``_biases['out']``.
    """
    # Go time-major and flatten to (n_steps * batch_size, n_input) so one
    # matmul applies the ReLU input projection to every time step.
    time_major = tf.transpose(_X, [1, 0, 2])
    flat_inputs = tf.reshape(time_major, [-1, n_input])
    projected = tf.nn.relu(tf.matmul(flat_inputs, _weights['hidden']) + _biases['hidden'])
    # Split back into a list of n_steps tensors, one per time step.
    step_inputs = tf.split(projected, n_steps, 0)

    # Two stacked LSTM cells, each its own instance.
    stacked_cells = tf.contrib.rnn.MultiRNNCell(
        [
            tf.contrib.rnn.BasicLSTMCell(n_hidden, forget_bias=1.0, state_is_tuple=True)
            for _ in range(2)
        ],
        state_is_tuple=True,
    )

    outputs, _ = tf.contrib.rnn.static_rnn(stacked_cells, step_inputs, dtype=tf.float32)

    # Only the output of the final time step feeds the classifier.
    final_step_output = outputs[-1]
    return tf.matmul(final_step_output, _weights['out']) + _biases['out']


def extract_batch_size(_train, step, batch_size):
    """Return the ``step``-th batch (1-based) of ``batch_size`` rows from ``_train``.

    Row indices wrap around modulo ``len(_train)``; the result is a new
    float array of shape (batch_size, *_train.shape[1:]).
    """
    first = (step - 1) * batch_size
    row_indices = [(first + i) % len(_train) for i in range(batch_size)]
    return _train[row_indices].astype(np.float64)


(X test shape, y test shape, every Xtest's mean, every Xtest's standard deviation)
(109, 128, 6) (109, 4) 0.04011959562112301 1.0255589600638917


In [18]:
# Placeholders: x is a batch of segments, y the matching one-hot labels.
x = tf.placeholder(tf.float32, [None, n_steps, n_input])
y = tf.placeholder(tf.float32, [None, n_classes])

# Trainable parameters of the input projection ('hidden') and output layer ('out').
weights = {
    'hidden': tf.Variable(tf.random_normal([n_input, n_hidden])), # Hidden layer weights
    'out': tf.Variable(tf.random_normal([n_hidden, n_classes], mean=1.0))
}
biases = {
    'hidden': tf.Variable(tf.random_normal([n_hidden])),
    'out': tf.Variable(tf.random_normal([n_classes]))
}

# Logits produced by the LSTM network.
pred = LSTM_RNN(x, weights, biases)

# Loss, accuracy and the optimisation step.
# L2 penalty summed over every trainable variable (this includes the biases
# and the LSTM cell variables created inside LSTM_RNN).
l2 = lambda_loss_amount * sum(
    tf.nn.l2_loss(tf_var) for tf_var in tf.trainable_variables()
) # guards against over-fitting
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=pred)) + l2 # softmax cross-entropy loss plus the L2 term
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost) 

# Fraction of samples whose arg-max prediction matches the arg-max label.
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Metric histories filled in by the training loop.
test_losses = []
test_accuracies = []
train_losses = []
train_accuracies = []

  import pandas.util.testing as tm


Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See @{tf.nn.softmax_cross_entropy_with_logits_v2}.



In [19]:
# Run the LSTM training loop and the final evaluation on the test set.
sess = tf.InteractiveSession(config=tf.ConfigProto(log_device_placement=True))
init = tf.global_variables_initializer()
sess.run(init)

step = 1
while step * batch_size <= training_iters:
    # Batches wrap around the training set (see extract_batch_size).
    batch_xs = extract_batch_size(X_train, step, batch_size)
    batch_ys = extract_batch_size(y_train, step, batch_size)

    # One optimisation step; also fetch the batch loss and accuracy.
    _, loss, acc = sess.run(
        [optimizer, cost, accuracy],
        feed_dict={
            x: batch_xs,
            y: batch_ys
        }
    )
    train_losses.append(loss)
    train_accuracies.append(acc)

    # Report on the first step and every display_iter processed samples.
    # (The former third test, step * batch_size > training_iters, could never
    # be true inside this loop and was dropped.)
    if (step*batch_size % display_iter == 0) or (step == 1):

        print("Training iter #" + str(step*batch_size) + \
              ":   Batch Loss = " + "{:.6f}".format(loss) + \
              ", Accuracy = {}".format(acc))

        # Evaluate on the whole test set without touching the training
        # metrics held in `loss` / `acc`.
        test_loss, test_acc = sess.run(
            [cost, accuracy],
            feed_dict={
                x: X_test,
                y: y_test
            }
        )
        test_losses.append(test_loss)
        test_accuracies.append(test_acc)
        print("PERFORMANCE ON TEST SET: " + \
              "Batch Loss = {}".format(test_loss) + \
              ", Accuracy = {}".format(test_acc))

    step += 1

print("训练结束")

# Final evaluation. The fetched accuracy goes into its own name so that the
# `accuracy` tensor is not replaced by a float and this cell can be re-run.
one_hot_predictions, final_accuracy, final_loss = sess.run(
    [pred, accuracy, cost],
    feed_dict={
        x: X_test,
        y: y_test
    }
)
test_losses.append(final_loss)
test_accuracies.append(final_accuracy)

print("FINAL RESULT: " + \
      "Batch Loss = {}".format(final_loss) + \
      ", Accuracy = {}".format(final_accuracy))



Training iter #1500:   Batch Loss = 252.723816, Accuracy = 0.33933332562446594
PERFORMANCE ON TEST SET: Batch Loss = 241.0306396484375, Accuracy = 0.4220183491706848
Training iter #30000:   Batch Loss = 96.057137, Accuracy = 0.8659999966621399
PERFORMANCE ON TEST SET: Batch Loss = 91.11182403564453, Accuracy = 0.7981651425361633
Training iter #60000:   Batch Loss = 30.800400, Accuracy = 0.9786666631698608
PERFORMANCE ON TEST SET: Batch Loss = 29.50992202758789, Accuracy = 0.8899082541465759
训练结束
FINAL RESULT: Batch Loss = 12.188701629638672, Accuracy = 0.9082568883895874
