In [None]:
from persor import BVHparser
from scipy.spatial.transform import Rotation
import matplotlib.pyplot as plt
import numpy as np
import copy
import japanize_matplotlib

In [None]:
def get_joint_coords(skeleton, joint, coords):
    """Fill ``coords`` with the position of ``joint`` and all of its descendants.

    A joint's position is its parent's coordinate plus the joint's own
    ``"offset"``. Results are written into ``coords`` in place, and the
    function then recurses into every entry of the joint's ``"children"``.

    Args:
        skeleton: Mapping from joint name to a dict with the keys ``"offset"``
            (indexable, 3 components are read), ``"joint"`` (name of the parent
            joint, or ``None``) and ``"children"`` (iterable of joint names).
        joint: Name of the joint to place.
        coords: Mapping from joint name to ``{"coord": ..., "parent": ...}``.
            It must already contain the parent of ``joint``.

    Returns:
        None. Nothing is written when ``joint`` has no parent.
    """
    node = skeleton[joint]
    offset = node["offset"]
    parent_joint = node["joint"]
    child_joints = node["children"]

    # A joint without a parent has nothing to be offset from.
    if parent_joint is None:
        return

    parent_coord = coords[parent_joint]["coord"]
    coords[joint] = {
        "coord": [parent_coord[axis] + offset[axis] for axis in range(3)],
        "parent": parent_joint,
    }

    for child in child_joints:
        get_joint_coords(skeleton, child, coords)


def skelton2coords(skeleton):
    """Convert a skeleton of per-joint offsets into per-joint coordinates.

    The root is placed at its own ``"offset"``; every child of the root is
    handed to ``get_joint_coords``, which fills in that subtree.

    Args:
        skeleton: Mapping from joint name to a dict with at least ``"offset"``
            and ``"children"``; it must contain a ``"root"`` entry.

    Returns:
        dict: ``{joint: {"coord": ..., "parent": ...}}``, starting with ``"root"``.
    """
    root = skeleton["root"]
    coords = {"root": {"coord": root["offset"], "parent": None}}

    for child in root["children"]:
        get_joint_coords(skeleton, child, coords)

    # NOTE(review): kept from the original. get_joint_coords returns early for
    # a joint whose parent is None, so this looks like a no-op for the root —
    # confirm against the skeleton schema and drop it.
    get_joint_coords(skeleton, "root", coords)

    return coords


def rotated_offset(skeleton, frame):
    """Return a copy of ``skeleton`` whose joint offsets are rotated for one frame.

    For every joint except ``"root"`` and names starting with ``"_"``, the
    ``<joint>_Xrotation`` / ``_Yrotation`` / ``_Zrotation`` entries of ``frame``
    are converted from degrees to radians and applied to that joint's
    ``"offset"``. The input skeleton is not modified.

    Args:
        skeleton: Mapping from joint name to a dict holding an ``"offset"``.
        frame: Mapping indexed by ``"<joint>_<axis>rotation"`` keys.

    Returns:
        dict: Deep copy of ``skeleton`` with the rotated ``"offset"`` entries.
    """
    rotated = copy.deepcopy(skeleton)

    for name in rotated:
        if name == "root" or name.startswith("_"):
            continue

        degrees = {axis: frame[f"{name}_{axis}rotation"] for axis in ("X", "Y", "Z")}
        # NOTE(review): the three angles are used as a rotation vector in
        # (Y, X, Z) order rather than as Euler angles — confirm this is intended.
        rotvec = np.deg2rad([degrees["Y"], degrees["X"], degrees["Z"]])

        joint_data = rotated[name]
        joint_data["offset"] = Rotation.from_rotvec(rotvec).apply(joint_data["offset"])

    return rotated

In [None]:
# Parse the sample BVH file and keep its skeleton and motion table.
bvhp = BVHparser("./data/jump.bvh")
skeleton = bvhp.get_skeleton()
motion_df = bvhp.get_motion_df()
# Row at position 800 of the motion table, used below as the example frame.
motion_frame = motion_df.iloc[800, :]

In [None]:
# Rotate every joint offset with the selected frame, then turn the offsets
# into per-joint coordinates.
rotated_skeleton = rotated_offset(skeleton, motion_frame)
rotated_coord = skelton2coords(rotated_skeleton)

### ST-GCN (Spatial Temporal Graph Convolutional Network) による行動認識

参考ノートブック: https://colab.research.google.com/github/machine-perception-robotics-group/MPRGDeepLearningLectureNotebook/blob/master/15_gcn/03_action_recognition_ST_GCN.ipynb


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
from persor import BVHparser

In [None]:
# Report whether a CUDA device is visible to PyTorch.
print(f"Use CUDA: {torch.cuda.is_available()}")

In [None]:
# Fix the random seeds so that runs are reproducible.

seed = 123

np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
# use_deterministic_algorithms is a function: assigning True to it (as before)
# only replaced the attribute and never enabled deterministic mode.
# warn_only=True makes ops without a deterministic implementation warn
# instead of raising.
torch.use_deterministic_algorithms(True, warn_only=True)

In [None]:
# Build the adjacency matrices of the skeleton graph.
class Graph:
    """Skeleton graph whose normalized adjacency is split by hop distance.

    Attributes:
        node_num: Number of entries in the skeleton returned by the parser.
        edge: List of ``(i, j)`` joint-id pairs: parent->child, child->parent
            and one self-loop per joint.
        A: Array of shape ``(hop_size + 1, node_num, node_num)``. ``A[h]`` keeps
            the normalized adjacency entries of the joint pairs that are
            exactly ``h`` hops apart and is zero elsewhere.
    """

    def __init__(self, hop_size, bvhp):
        """Build ``edge`` and ``A`` from ``bvhp.get_skeleton()``.

        Args:
            hop_size: Largest hop distance that is still treated as connected.
            bvhp: Object exposing ``get_skeleton()``; each skeleton entry must
                provide ``"id"`` and ``"children"``.
        """
        skeleton = bvhp.get_skeleton()
        self.node_num = len(skeleton)
        self.edge = self.__get_edge(skeleton)

        # Hop distance between every pair of joints (inf beyond hop_size).
        hop_dis = self.__get_hop_distance(self.node_num, self.edge, hop_size)

        # Normalized adjacency, one slice per hop distance.
        self.A = self.__get_adjacency_mat(hop_dis, hop_size)

    def __str__(self):
        """Return the string form of ``A`` (``__str__`` must return a str)."""
        return str(self.A)

    def __get_edge(self, skeleton):
        """Collect the edges of the skeleton as pairs of joint ids.

        NOTE(review): the ids are used directly as matrix indices later, so
        this assumes ``"id"`` runs over 0 .. len(skeleton) - 1 — confirm.
        """
        edge = []
        for data in skeleton.values():
            parent_id = data["id"]
            children_ids = [skeleton[child]["id"] for child in data["children"]]
            edge.extend([(parent_id, child_id) for child_id in children_ids])
            edge.extend([(child_id, parent_id) for child_id in children_ids])
            edge.append((parent_id, parent_id))

        return edge

    def __get_hop_distance(self, node_num, edge, hop_size):
        """Return the matrix of hop distances, with inf beyond ``hop_size``."""
        A = np.zeros((node_num, node_num))

        for i, j in edge:
            A[j, i] = 1
            A[i, j] = 1

        hop_dis = np.zeros((node_num, node_num)) + np.inf
        # arrive_mat[d][i, j] is True when j can be reached from i in d steps.
        transfer_mat = [np.linalg.matrix_power(A, d) for d in range(hop_size + 1)]
        arrive_mat = np.stack(transfer_mat) > 0

        # Go from the largest hop down so that the smallest distance is the
        # one that remains for every pair.
        for d in range(hop_size, -1, -1):
            hop_dis[arrive_mat[d]] = d

        return hop_dis

    def __get_adjacency_mat(self, hop_dis, hop_size):
        """Normalize the reachability matrix and split it by hop distance."""
        valid_hop = list(range(hop_size + 1))
        # Float entries: the normalization divides by the column sums.
        adjacency = np.where(np.isin(hop_dis, valid_hop), 1.0, 0.0)

        normalize_adjacency = self.__normalize_digraph(adjacency)
        A = np.array(
            [np.where(hop_dis == hop, normalize_adjacency, 0) for hop in valid_hop]
        )

        return A

    def __normalize_digraph(self, A):
        """Scale every column of ``A`` by the inverse of its sum.

        Columns whose sum is zero are left as zeros.
        """
        degree = np.sum(A, axis=0).astype(float)
        inv_degree = np.zeros_like(degree)
        connected = degree > 0
        inv_degree[connected] = 1.0 / degree[connected]

        return np.dot(A, np.diag(inv_degree))

In [None]:
# One spatial-temporal graph convolution (ST-GC) block.
class STGC_block(nn.Module):
    """Spatial graph convolution followed by a temporal convolution stack.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the block.
        stride: Stride of the temporal convolution along the first spatial axis.
        t_kernel_size: Kernel size of the temporal convolution.
        A_size: Size of the adjacency tensor; ``A_size[0]`` is passed on as
            ``s_kernel_size`` and the edge weights ``M`` take this shape.
        dropout: Dropout probability inside the temporal stack.
    """

    def __init__(
        self, in_channels, out_channels, stride, t_kernel_size, A_size, dropout=0.5
    ):
        super().__init__()
        # Spatial graph convolution.
        # NOTE(review): SpatialGraphConvolution is not defined in the visible
        # part of this notebook — confirm it is defined or imported elsewhere.
        self.sgc = SpatialGraphConvolution(
            in_channels=in_channels,
            out_channels=out_channels,
            s_kernel_size=A_size[0],
        )

        # Learnable edge weights, initialized to ones and multiplied with A in
        # forward so the block can learn which edges matter.
        self.M = nn.Parameter(torch.ones(A_size))

        # Convolution over the frame axis only: (t_kernel_size x 1) kernel.
        temporal_conv = nn.Conv2d(
            out_channels,
            out_channels,
            kernel_size=(t_kernel_size, 1),
            stride=(stride, 1),
            padding=((t_kernel_size - 1) // 2, 0),
        )
        self.tgc = nn.Sequential(
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Dropout(dropout),
            temporal_conv,
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

    def forward(self, x, A):
        """Apply the spatial convolution with ``A * M``, then the temporal stack."""
        weighted_adjacency = A * self.M
        spatial_features = self.sgc(x, weighted_adjacency)
        return self.tgc(spatial_features)

In [None]:
# Model definition.
class ST_GCN(nn.Module):
    """Stack of six ST-GC blocks with a 1x1 convolution as classifier.

    Args:
        num_classes: Number of output classes.
        in_channels: Channels of the input tensor.
        t_kernel_size: Temporal kernel size handed to every block.
        hop_size: Hop size handed to ``Graph``.
        bvhp: Parser object handed to ``Graph``.
    """

    def __init__(self, num_classes, in_channels, t_kernel_size, hop_size, bvhp):
        super().__init__()
        # Build the graph and keep its adjacency as a non-trainable buffer.
        adjacency = torch.tensor(
            Graph(hop_size, bvhp).A, dtype=torch.float32, requires_grad=False
        )
        self.register_buffer("A", adjacency)
        A_size = adjacency.size()

        # Batch normalization over the flattened (node x channel) features.
        self.bn = nn.BatchNorm1d(in_channels * A_size[1])

        # (in_channels, out_channels, temporal stride) for stgc1 .. stgc6.
        block_specs = (
            (in_channels, 32, 1),
            (32, 32, 1),
            (32, 32, 1),
            (32, 64, 2),
            (64, 64, 1),
            (64, 64, 1),
        )
        for number, (c_in, c_out, stride) in enumerate(block_specs, start=1):
            setattr(
                self,
                f"stgc{number}",
                STGC_block(c_in, c_out, stride, t_kernel_size, A_size),
            )

        # Classifier.
        self.fc = nn.Conv2d(64, num_classes, kernel_size=1)

    def forward(self, x):
        """Map an input of size (batch, channel, frame, node) to class scores."""
        batch, channels, frames, nodes = x.size()

        # Batch normalization on a (batch, node * channel, frame) view.
        x = x.permute(0, 3, 1, 2).contiguous().view(batch, nodes * channels, frames)
        x = self.bn(x)
        x = x.view(batch, nodes, channels, frames).permute(0, 2, 3, 1).contiguous()

        # ST-GC blocks, all sharing the same adjacency buffer.
        blocks = (
            self.stgc1,
            self.stgc2,
            self.stgc3,
            self.stgc4,
            self.stgc5,
            self.stgc6,
        )
        for block in blocks:
            x = block(x, self.A)

        # Average over the two trailing axes, then classify.
        x = F.avg_pool2d(x, x.size()[2:])
        x = x.view(batch, -1, 1, 1)
        x = self.fc(x)

        return x.view(x.size(0), -1)

In [None]:
# Dataset definition.
class Feeder(torch.utils.data.Dataset):
    """Dataset that pairs segments of a BVH motion table with labels.

    ``motion_df`` is the motion table returned by ``BVHparser`` and
    ``label_df`` is a header-less CSV read with the columns ``start``,
    ``end`` and ``label``. One dataset item corresponds to one row of
    ``label_df``.

    NOTE(review): items are returned as the raw rows of the motion table; no
    conversion to the (channel, frame, node) layout read by the model is done
    here — confirm where that is supposed to happen.
    """

    def __init__(self, bvh_path, label_path):
        """Load the motion table from ``bvh_path`` and the labels from ``label_path``."""
        super().__init__()
        self.motion_df = self.__load_bvh(bvh_path)
        self.label_df = self.__load_label(label_path)

    def __load_bvh(self, bvh_path):
        """Parse the BVH file and return its motion table."""
        bvhp = BVHparser(bvh_path)
        motion_df = bvhp.get_motion_df()
        return motion_df

    def __load_label(self, label_path):
        """Read the header-less label CSV with columns start, end, label."""
        label_df = pd.read_csv(label_path, header=None, names=["start", "end", "label"])
        return label_df

    def __len__(self):
        """Return the number of labelled segments."""
        # The original read self.label, which is never assigned.
        return len(self.label_df)

    def __getitem__(self, index):
        """Return ``(data, label)`` for the ``index``-th labelled segment.

        NOTE(review): ``start`` / ``end`` are presumably row positions in the
        motion table, and ``end`` is treated as exclusive — confirm against
        the label file.
        """
        start = int(self.label_df["start"].iloc[index])
        end = int(self.label_df["end"].iloc[index])

        # The original read self.data / self.label, which are never assigned.
        data = np.array(self.motion_df.iloc[start:end])
        label = self.label_df["label"].iloc[index]

        return data, label

In [None]:
# Number of epochs and batch size.
NUM_EPOCH = 100
BATCH_SIZE = 64

# Train on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): an earlier cell loads "./data/jump.bvh" while this one uses
# "./_data/jump.bvh" — confirm which directory is correct.
bvhp = BVHparser("./_data/jump.bvh")

# Build the model and move it to the same device as the batches; the original
# moved only the batches to CUDA, leaving the model on the CPU.
model = ST_GCN(
    num_classes=10,
    in_channels=3,
    t_kernel_size=9,  # kernel size of the temporal graph convolution (t_kernel_size x 1)
    hop_size=2,
    bvhp=bvhp,
).to(device)

# Optimizer.
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Loss function.
criterion = torch.nn.CrossEntropyLoss()

# Datasets.
# NOTE(review): Feeder.__init__ is declared as (bvh_path, label_path); the
# `data_path=` keyword and the .npy paths below do not match that signature —
# confirm the intended files before running.
data_loader = dict()
data_loader["train"] = torch.utils.data.DataLoader(
    dataset=Feeder(data_path="data/train_data.npy", label_path="data/train_label.npy"),
    batch_size=BATCH_SIZE,
    shuffle=True,
)
data_loader["test"] = torch.utils.data.DataLoader(
    dataset=Feeder(data_path="data/test_data.npy", label_path="data/test_label.npy"),
    batch_size=BATCH_SIZE,
    shuffle=False,
)

# Switch the model to training mode.
model.train()

# Training loop.
for epoch in range(1, NUM_EPOCH + 1):
    correct = 0
    sum_loss = 0.0
    for data, label in data_loader["train"]:
        data = data.to(device)
        label = label.to(device)

        output = model(data)

        loss = criterion(output, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean; weight it by the batch size so the
        # figure printed below is a true per-sample average.
        sum_loss += loss.item() * label.size(0)
        predict = output.argmax(dim=1)
        correct += (predict == label).sum().item()

    num_samples = len(data_loader["train"].dataset)
    print(
        "# Epoch: {} | Loss: {:.4f} | Accuracy: {:.4f}".format(
            epoch,
            sum_loss / num_samples,
            (100.0 * correct / num_samples),
        )
    )