# LSTMで体のモーションを推定する  
## MediaPipeを使用  
Unityのゲーム用バージョン

### 準備

In [None]:
""" ライブラリ読み込み """
import torch
import numpy as np
import torch.optim as optim
from torch import nn
import os

In [None]:
""" GPU設定 """
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [None]:
'''モーションデータを扱うクラス '''
class motion_data():
    
    # numFrame:取り出すフレーム数、interval:取り出す間隔
    def __init__(self, numFrame):
        self.filelist = []       # 学習用データのファイルリスト
        self.dataset = []        # 学習用データセット（CSVから取得）
        self.frameTime = []      # 各フレームの時刻（CSVから取得）
        self.feature_len = 16     # 特徴量抽出後のベクトルの長さ（extract_feature関数に合わせて変更する）
        self.numFrame = numFrame
        
    # csvファイルから学習用データを取得
    def get_csv_data(self, dirName):
        self.filelist = []       # 学習用データのファイルリスト
        self.dataset = []        # 学習用データセット（CSVから取得）
        self.frameTime = []      # 各フレームの時刻（CSVから取得）
        dirName = dirName + "/"
        self.filelist = os.listdir(dirName)
        for fileName in self.filelist:
            fname = dirName + fileName
            # 以下はnpyファイルを読み込むとき
            #newdata = np.load(fname)
            # 以下はcsvファイルを読み込むとき
            csv_data = np.loadtxt(fname, delimiter=',', skiprows=2)    # skiprowsには先頭の飛ばす行数を指定
            newframeTime = csv_data[:, 1]
            newdata = np.array(csv_data[:, 2:])
            #newdata = newdata.reshape([csv_data.shape[0], csv_data.shape[1] // 3, 3])
            self.dataset.extend([newdata])
            self.frameTime.extend([newframeTime])
            
    def angle_vec2(self, v1, v2):
        return np.arccos(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))
    
    # p->p1とp->p2のベクトルの前額面の角度・矢状面の角度を返す
    def frontal_satittal_angle(self, p, p1, p2):
        v1, v2 = p1 - p, p2 - p
        v1_f, v1_s = np.array([v1[0], v1[1]]), np.array([v1[2], v1[1]])
        v2_f, v2_s = np.array([v2[0], v2[1]]), np.array([v2[2], v2[1]])
        frontal = self.angle_vec2(v1_f, v2_f)
        sagittal = self.angle_vec2(v1_s, v2_s)
        return [frontal, sagittal]
            
    # 特徴量抽出を行う関数
    # 圧縮後の次元数がself.feature_lenになるように注意
    # 引数：1時刻のデータ
    def extract_feature(self, data):
        # ここで何らかの特徴量抽出の処理をする
        vec3_data = data.reshape([len(data) // 3, 3])
        
        lhip = vec3_data[23]
        lshoulder = vec3_data[11]
        lknee = vec3_data[25]
        lelbow = vec3_data[13]
        lankle = vec3_data[27]
        lwrist = vec3_data[15]
        lhip_f, lhip_s = self.frontal_satittal_angle(lhip, lshoulder, lknee)
        lshoulder_f, lshouler_s = self.frontal_satittal_angle(lshoulder, lhip, lelbow)
        lknee_f, lknee_s = self.frontal_satittal_angle(lknee, lhip, lankle)
        lelbow_f, lelbow_s = self.frontal_satittal_angle(lelbow, lshoulder, lwrist)
        
        rhip = vec3_data[24]
        rshoulder = vec3_data[12]
        rknee = vec3_data[26]
        relbow = vec3_data[14]
        rankle = vec3_data[28]
        rwrist = vec3_data[16]
        rhip_f, rhip_s = self.frontal_satittal_angle(rhip, rshoulder, rknee)
        rshoulder_f, rshouler_s = self.frontal_satittal_angle(rshoulder, rhip, relbow)
        rknee_f, rknee_s = self.frontal_satittal_angle(rknee, rhip, rankle)
        relbow_f, relbow_s = self.frontal_satittal_angle(relbow, rshoulder, rwrist)
        
        retData = [lhip_f, lhip_s, lshoulder_f, lshouler_s, lknee_f, lknee_s, lelbow_f, lelbow_s,
                   rhip_f, rhip_s, rshoulder_f, rshouler_s, rknee_f, rknee_s, relbow_f, relbow_s]
        retData =  np.array(retData)
        
        ''' 以下はカメラ1台のとき
        vec2_data = vec3_data[:, 0:2]    # z座標は使わない
        
        lhip = vec2_data[23]
        lshoulder = vec2_data[11]
        lknee = vec2_data[25]
        lelbow = vec2_data[13]
        lankle = vec2_data[27]
        lhip_shoulder = lshoulder - lhip
        lhip_knee = lknee - lhip
        lhip_angle = self.angle_vec2(lhip_shoulder, lhip_knee)
        lshoulder_elbow = lelbow - lshoulder
        lshoulder_angle = self.angle_vec2(-lhip_shoulder, lshoulder_elbow)
        lknee_ankle = lankle - lknee
        lknee_angle = self.angle_vec2(lknee_ankle, -lhip_knee)
        
        rhip = vec2_data[24]
        rshoulder = vec2_data[12]
        rknee = vec2_data[26]
        relbow = vec2_data[14]
        rankle = vec2_data[28]
        rhip_shoulder = rshoulder - rhip
        rhip_knee = rknee - rhip
        rhip_angle = self.angle_vec2(rhip_shoulder, rhip_knee)
        rshoulder_elbow = relbow - rshoulder
        rshoulder_angle = self.angle_vec2(-rhip_shoulder, rshoulder_elbow)
        rknee_ankle = rankle - rknee
        rknee_angle = self.angle_vec2(rknee_ankle, -rhip_knee)
        
        retData = np.array([lhip_angle, lshoulder_angle, lknee_angle, rhip_angle, rshoulder_angle, rknee_angle])
        '''
        return retData
    
    # モーションデータの中から引数intervalの間隔でデータを取り出し
    # それを複数パターンでndarray化して返す
    def split_interval_data(self, data, interval):
        numData = len(data) - (interval * (self.numFrame - 1))
        if (numData <= 0):
            return np.empty((0, self.numFrame, self.feature_len))
        retData = np.empty((numData, self.numFrame, self.feature_len))
        for i in range(numData):
            splitData = np.empty((self.numFrame, self.feature_len))    # 分割データの保存先
            for j in range(self.numFrame):
                splitData[j] = self.extract_feature(data[i + interval * j])
            retData[i] = splitData
        return retData

    # モーションデータをminIntervalから最大のインターバルで間引きしたnumFrameの長さのデータセットを返す
    # これにより様々な速さのモーションデータセットを得る
    def get_interval_data(self, minInterval, correctID, maxInterval=-1):
        numMotion = len(self.dataset)
        dataset = np.empty((0, self.numFrame, self.feature_len))
        for i in range(numMotion):    # すべてのモーションデータセットに対して
            if maxInterval < 0:
                maxInterval = (len(self.dataset[i]) - 1) // (self.numFrame - 1)
            for interval in range(minInterval, maxInterval + 1):
                splitdata = self.split_interval_data(self.dataset[i], interval)
                dataset = np.concatenate([dataset, splitdata])
        correct_data = np.ones(len(dataset), dtype=np.int64) * correctID
        return [dataset, correct_data]

In [None]:
# モーションデータのオブジェクト
numFrame = 5     # モーションデータのフレーム数
interval = 2     # モーションデータを何フレームに一度取り出すか
maxInterval = 10    # 1モーションデータの最大のフレーム数
motion_count = 4
m = []
for i in range(motion_count):
    m.append(motion_data(numFrame))     # モーションのデータオブジェクト

In [None]:
""" LSTMのモデルクラス """
class Net(torch.nn.Module):
    
    def __init__(self):
        super(Net, self).__init__()
        self.feature_size = 16         # 特徴量x(t)の次元
        self.hidden_layer_size = 24    # 隠れ層のサイズ
        self.lstm_layers = 1           # LSTMのレイヤー数　(LSTMを何層重ねるか)
        self.output_size = 4           # 出力層のサイズ
        
        self.lstm = torch.nn.LSTM(self.feature_size, 
                                  self.hidden_layer_size, 
                                  num_layers = self.lstm_layers)
        
        self.fc = torch.nn.Linear(self.hidden_layer_size, self.output_size)
        
        self.softmax = torch.nn.Softmax(dim=1)  # softmax関数で配列の要素の総和が1になるように変換
        
    def init_hidden_cell(self, batch_size): # LSTMの隠れ層 hidden と記憶セル cell を初期化
        hedden = torch.zeros(self.lstm_layers, batch_size, self.hidden_layer_size)
        cell = torch.zeros(self.lstm_layers, batch_size, self.hidden_layer_size)        
        return (hedden, cell)

    def forward(self, x):
        batch_size = x.shape[0]
        self.hidden_cell = self.init_hidden_cell(batch_size)
        x = x.permute(1, 0, 2)                                   # (Batch, Seqence, Feature) -> (Seqence , Batch, Feature)
        
        lstm_out, (h_n, c_n) = self.lstm(x, self.hidden_cell)    # LSTMの入力データのShapeは(Seqence, Batch, Feature)
                                                                 # (h_n) のShapeは (num_layers, batch, hidden_size)
        x = h_n[-1,:,:]                                          # lstm_layersの最後のレイヤーを取り出す  (B, h)
                                                                 # lstm_outの最後尾と同じ？
        x = self.fc(x)
        x = self.softmax(x)
        
        return x

In [None]:
# 比較用にDNNも用意
class DNN(nn.Module):
    def __init__(self):  # コンストラクタ
        super().__init__()  # 親クラスのコンストラクタを呼び出し
        feature_size = 16         # 特徴量x(t)の次元
        time_size = 5             # 時系列の長さ
        hidden_layer_size1 = 24   # 隠れ層のサイズ
        hidden_layer_size2 = 24
        output_size = 4           # 出力層のサイズ
        
        self.input_size = feature_size * time_size
        fc1 = nn.Linear(self.input_size, hidden_layer_size1)  # 入力層 -> 隠れ層
        sig1 = nn.Sigmoid()  # 活性化関数(シグモイド)、行列計算？
        fc2 = nn.Linear(hidden_layer_size1, hidden_layer_size2)  # 入力層 -> 隠れ層
        sig2 = nn.Sigmoid()
        fc3 = nn.Linear(hidden_layer_size2, output_size)  # 隠れ層 -> 出力層
        softmax = torch.nn.Softmax(dim=1)  # softmax関数で配列の要素の総和が1になるように変換
        self.model_info = nn.ModuleList([fc1, sig1, fc2, sig2, fc3, softmax])  # メンバ変数に登録

    # forwardメソッドは親クラスの__call__メソッドから呼び出される、__call__は呼び出し可能オブジェクト
    def forward(self, x):
        batch_size = x.shape[0]
        x = x.reshape([batch_size, self.input_size])
        for i in range(len(self.model_info)):
            x = self.model_info[i](x)    # self.model_infoをすべて実行
        return x

### 学習

In [None]:
import random

""" データの準備 """
# 訓練データ（とテストデータ）から一度に何個のデータを読み込むかを指定する値
BATCH_SIZE = 20

# モーションデータを取り出して分割
m[0].get_csv_data("./stop")
m[1].get_csv_data("./walk")
m[2].get_csv_data("./jump")
m[3].get_csv_data("./run")

input_data = []
correct_data = []
data_size = []
for i in range(motion_count):
    print("Motion:%d Processing data..." % i)
    ms, mc = m[i].get_interval_data(interval, i, maxInterval=20)
    input_data.append(ms)
    correct_data.append(mc)
    data_size.append(len(mc))
print("Data size: " + str(data_size))
    
# 学習データの数をモーションごとにそろえる（少ないデータの数に合わせる）
min_data_size = min(data_size)
for i in range(motion_count):
    idx = random.sample(list(range(data_size[i])), k=min_data_size)
    input_data[i] = (input_data[i])[idx]
    correct_data[i] = (correct_data[i])[idx]

# 学習用モーションデータと正解ラベルのデータを作成
input_data = np.concatenate(input_data)    # リストの要素同士を結合
correct_data = np.concatenate(correct_data)

np.save('input_data', input_data)
np.save('correct_data', correct_data)
print(input_data * 180 / np.pi)

In [None]:
input_data = np.load('input_data.npy')
correct_data = np.load('correct_data.npy')

# テンソルに変換
input_data = torch.FloatTensor(input_data)
correct_data = torch.LongTensor(correct_data)
#input_data = torch.cuda.FloatTensor(input_data)
#correct_data = torch.cuda.LongTensor(correct_data)

# データセットの準備
dataset = torch.utils.data.TensorDataset(input_data, correct_data)
# 学習用データと検証用データに分割
train_size = int(0.8 * len(dataset))    # 学習用データのサイズ（全体の8割）
test_size = len(dataset) - train_size    # 検証用データのサイズ
trainset, testset = torch.utils.data.random_split(dataset, [train_size, test_size])
trainloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)
#print(vars(dataset))

In [8]:
""" 設定 """
# モデル作成
net = Net().to(device)
#net = DNN().to(device)
# 損失関数（計算結果と正解ラベルの誤差を比較、それを基に最適化）
criterion = torch.nn.CrossEntropyLoss()  # CrossEntropyLossは損失関数に多クラス分類でよく使われる
# 最適化アルゴリズム
#optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)  # SGD（確率的勾配降下法）
optimizer = optim.Adam(net.parameters(), lr=0.001)

In [9]:
""" 学習 """
EPOCHS = 2  # すべての入力に対してn回実行

for epoch in range(1, EPOCHS + 1):
    running_loss = 0.0  # 平均値出力用
    for count, item in enumerate(trainloader, 1):  # BATCH_SIZEごとに実行するため、このときcountの値を増やす
        inputs, labels = item  # trainloader経由でデータを20個取り出す
        
        # CUDAで使えるようキャスト
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()  # 重みとバイアスの更新で内部的に使用するデータセット
        
        # Runs the forward pass with autocasting.
        outputs = net(inputs)  # ニューラルネットワークにデータを入力
        
        loss = criterion(outputs, labels)  # 正解ラベルとの比較

        loss.backward()  # 誤差逆伝播
        optimizer.step()  # 重みとバイアスの更新

        running_loss += loss.item()
        times = 100
        if count % times == 0:
            print(f'#{epoch}, data: {count * BATCH_SIZE}, running_loss: {running_loss / times:1.3f}')
            running_loss = 0.0

torch.save(net, 'body1.pth')
print('Training Finished')

#1, data: 2000, running_loss: 1.243
#1, data: 4000, running_loss: 0.947
#1, data: 6000, running_loss: 0.871
#1, data: 8000, running_loss: 0.825
#1, data: 10000, running_loss: 0.792
#1, data: 12000, running_loss: 0.780
#1, data: 14000, running_loss: 0.769
#1, data: 16000, running_loss: 0.768
#1, data: 18000, running_loss: 0.761
#1, data: 20000, running_loss: 0.760
#2, data: 2000, running_loss: 0.762
#2, data: 4000, running_loss: 0.761
#2, data: 6000, running_loss: 0.761
#2, data: 8000, running_loss: 0.763
#2, data: 10000, running_loss: 0.753
#2, data: 12000, running_loss: 0.754
#2, data: 14000, running_loss: 0.754
#2, data: 16000, running_loss: 0.751
#2, data: 18000, running_loss: 0.751
#2, data: 20000, running_loss: 0.755
Training Finished


In [10]:
""" 結果出力 """
_, predicted = torch.max(outputs, 1)
print(predicted)
print(labels)

correct = 0
total = 0

with torch.no_grad():
    for data in testloader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = net(inputs)
        _, predicted = torch.max(outputs, 1)
        total += len(outputs)
        correct += (predicted == labels).sum().item()

print(f'correct: {correct}, accuracy: {correct} / {total} = {100 * correct / total} %')


tensor([0, 0, 0, 1, 0])
tensor([0, 0, 0, 1, 0])
correct: 5044, accuracy: 5044 / 5067 = 99.54608249457273 %


### リアルタイム認識

#### UDPの準備

In [None]:
from socket import *

class udp_client:
    def __init__(self):
        # 送信側IP
        SrcIP = "127.0.0.1"    # localhost
        # 送信側ポート番号
        SrcPort = 11111
        # 送信側アドレスをtupleに格納
        self.SrcAddr = (SrcIP, SrcPort)

        # 受信側アドレスの設定
        # 受信側IP
        DstIP = "127.0.0.1"
        # 受信側ポート番号
        DstPort = 22222
        # 受信側アドレスをtupleに格納
        self.DstAddr = (DstIP, DstPort)

        # ソケット作成
        self.udpClntSock = socket(AF_INET, SOCK_DGRAM)
        # 送信側アドレスでソケットを設定
        self.udpClntSock.bind(self.SrcAddr)
        
    
    def __delete__(self):
        del self.udpClntSock
        
        
    def send(self, data):
        # バイナリに変換
        data = data.encode('utf-8')

        # 受信側アドレスに送信
        self.udpClntSock.sendto(data, self.DstAddr)

In [None]:
from IPython.display import clear_output
import torch
import cv2
import mediapipe as mp
import numpy as np

In [None]:
""" モーションの間引き """
class motion_analyze():
    # motion_lenはnumFrame, intervalので抽出して足りるフレーム数
    def __init__(self, motion_len):
        self.vec_size = 33
        self.data = np.zeros((motion_len, self.vec_size * 3), dtype=np.float32)
    
    def LandmarkToArray(self, landmark):
        return np.array([landmark.x, landmark.y, landmark.z])
    
    '''
    # 矢状面だけのとき
    def record_old(self, results):
        if results.pose_landmarks is None:
            return None
        
        data = np.empty((self.vec_size, 3), dtype=np.float32)
        for i in range(self.vec_size):
            data[i] = self.LandmarkToArray(results.pose_landmarks.landmark[i])
        data = data.reshape(self.vec_size * 3)
        
        self.data = np.roll(self.data, -1, axis=0)
        self.data[-1] = data
        #self.time_ary.append(time.perf_counter())
        
        return self.data
    '''
    
    def record(self, results_front, results_sagittal):
        if results_front.pose_landmarks is None:
            return None
        if results_sagittal.pose_landmarks is None:
            return None
        
        data = np.empty((self.vec_size, 3), dtype=np.float32)
        for i in range(self.vec_size):
            front = self.LandmarkToArray(results_front.pose_landmarks.landmark[i])
            sagittal = self.LandmarkToArray(results_sagittal.pose_landmarks.landmark[i])
            data[i][0] = front[0]    # x座標は前額面から
            #data[i][1] = (front[1] + sagittal[1]) / 2    # y座標は矢状面と前額面のy座標の平均
            data[i][1] = sagittal[1]    # y座標はとりあえず矢状面から取得
            data[i][2] = sagittal[0]    # z座標は矢状面から
        data = data.reshape(self.vec_size * 3)
        
        self.data = np.roll(self.data, -1, axis=0)
        self.data[-1] = data

        return self.data

In [None]:
""" MediaPipe複数台 """
class my_mediapipe():

    def __init__(self, capture=0, window_name="MediaPipe"):
        self.windowName = window_name
        # For webcam input:
        self.cap = cv2.VideoCapture(capture)

        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        self.mp_holistic = mp.solutions.holistic

        self.mp_drawing = mp.solutions.drawing_utils
        self.mesh_drawing_spec = self.mp_drawing.DrawingSpec(thickness=2,  color=(0,255,0))
        self.mark_drawing_spec = self.mp_drawing.DrawingSpec(thickness=3,  circle_radius=3, color=(0,0,255))

        self.holistic = self.mp_holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)
        
        self.w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))         # カメラの横幅を取得
        self.h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))        # カメラの縦幅を取得


    # 終了時に必ず呼び出し
    def close(self):
        self.holistic.close()    # withを使わない代わり？
        self.cap.release()
        #cv2.destroyAllWindows()
        
        
    # with文が無くても動く？
    def loop(self, show_caputer=True):
        if self.cap.isOpened():
            success, image = self.cap.read()
            if not success:
                print("Ignoring empty camera frame.")
                # If loading a video, use 'break' instead of 'continue'.
                return
            
            image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)           # 時計回り90度回転
            # To improve performance, optionally mark the image as not writeable to
            # pass by reference.
            image.flags.writeable = False
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            results = self.holistic.process(image)

            # Draw landmark annotation on the image.
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            
            if (show_caputer is False):
                image = np.zeros([self.w, self.h, 3])

            self.mp_drawing.draw_landmarks(    # ポーズの特徴
                image,
                results.pose_landmarks,
                self.mp_holistic.POSE_CONNECTIONS,
                landmark_drawing_spec=self.mp_drawing_styles
                .get_default_pose_landmarks_style())

            # Flip the image horizontally for a selfie-view display.
            cv2.imshow(self.windowName, cv2.flip(image, 1))

            return results

        else:
            return None

In [None]:
motion_class = motion_analyze(numFrame)
udp = udp_client()

''' 顔とポーズを取得 '''
# https://google.github.io/mediapipe/solutions/holistic

""" GPU設定 """
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

""" 識別 """
net = torch.load('body1.pth')

mediapipe1 = my_mediapipe(0, window_name="MediaPipe1")    # 正面から
mediapipe2 = my_mediapipe(1, window_name="MediaPipe2")    # 横から

while(True):
    clear_output(wait=True)
    results1 = mediapipe1.loop(False)
    results2 = mediapipe2.loop(False)
    
    data = motion_class.record(results1, results2)
    if data is not None:
        motion = m[0].split_interval_data(data, 1)[0]
        motion = motion.reshape((1, numFrame, len(motion[0])))
        motion = torch.FloatTensor(motion)
        outputs = net(motion)  # ニューラルネットワークにデータを入力
        value, predicted_idx = torch.max(outputs, 1)
        answer = int(predicted_idx[0])
        print(value)
        if (answer == 0):
            print("stop")
            udp.send("stop")
        elif (answer == 1):
            print("walk")
            udp.send("walk")
        elif (answer == 2):
            print("jump")
            udp.send("jump")
        elif (answer == 3):
            print("run")
            udp.send("run")
        else:
            print("error")
        
    key = cv2.waitKey(1) & 0xFF
    if key == 27:    # Escで終了
        break

mediapipe1.close()
mediapipe2.close()
cv2.destroyAllWindows()
del udp

cpu


In [None]:
mediapipe1.close()
mediapipe2.close()
cv2.destroyAllWindows()
del udp