In [None]:
import mediapipe as mp
from mediapipe import tasks
from mediapipe.tasks.python import vision

import cv2

from pathlib import Path
import numpy as np


In [None]:
# Path to the pose landmarker model file; passed to BaseOptions as
# model_asset_path when the landmarker options are built.
model_path = 'models/pose_landmarker_full.task'

In [None]:
# Options for the pose landmarker: load the model from `model_path` and run
# in VIDEO mode. The landmarker instance itself is created from these options
# inside video2landmarks().
options = vision.PoseLandmarkerOptions(
    base_options=tasks.BaseOptions(model_asset_path=model_path),
    running_mode=vision.RunningMode.VIDEO)

def video2landmarks(path: str):
    """Detect pose landmarks for every frame of a video.

    This is a generator: the video is opened and decoded lazily, so errors
    (including a failure to open the file) surface on first iteration.

    Args:
        path (str): Path to the video file.

    Yields:
        vision.PoseLandmarkerResult: Detection result of each decoded frame,
            in reading order.

    Raises:
        RuntimeError: If the video file cannot be opened.
    """

    with vision.PoseLandmarker.create_from_options(options) as landmarker:
        cap = cv2.VideoCapture(path)
        try:
            if not cap.isOpened():
                # RuntimeError is what the previous bare `raise` produced,
                # but now with a message that names the offending file.
                raise RuntimeError(f'Failed to open video: {path}')

            while True:
                ret, bgr_frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes frames as BGR, while ImageFormat.SRGB
                # expects RGB channel order.
                rgb_frame = cv2.cvtColor(bgr_frame, cv2.COLOR_BGR2RGB)
                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
                timestamp = int(cap.get(cv2.CAP_PROP_POS_MSEC))

                yield landmarker.detect_for_video(mp_image, timestamp)
        finally:
            # Release the capture even when the consumer stops iterating
            # early or detection raises.
            cap.release()

## 調査

In [None]:
# Collect every entry directly under videos/pose and display the list.
# NOTE(review): iterdir() returns whatever is in the folder (not only videos)
# in no guaranteed order — confirm the folder holds only video files.
pose_path = Path('videos/pose')
videos = list(pose_path.iterdir())
videos

In [None]:
# File name of the first entry without its extension; stems are used as
# the keys of results_dict (the class labels) further down.
videos[0].stem

In [None]:
# String form of the first path, which is what video2landmarks() receives.
str(videos[0])

In [None]:
# Run pose detection on the first video and keep every per-frame result.
p = str(videos[0])
resg = video2landmarks(p)  # generator: nothing is decoded until it is iterated
res = list(resg)  # consume the generator, one result per frame
res[:4]  # preview the first four results

In [None]:
# Detection result of the first frame.
res0 = res[0]
res0

In [None]:
# First entry of pose_landmarks for the first frame; preview four landmarks.
# NOTE(review): raises IndexError when pose_landmarks is empty for that frame
# (valid_result() checks for exactly this condition).
pls = res0.pose_landmarks[0]
pls[:4]

In [None]:
# First landmark of that entry.
pl = pls[0]
pl

In [None]:
# Attributes available on a landmark object.
(pl.x, pl.y, pl.z, pl.presence, pl.visibility)

In [None]:
# Same inspection for pose_world_landmarks: first entry of the first frame.
pwls = res0.pose_world_landmarks[0]
pwls[:4]

In [None]:
# First world landmark of that entry.
pwl = pwls[0]
pwl

In [None]:
# A world landmark exposes the same attributes as a regular landmark.
(pwl.x, pwl.y, pwl.z, pwl.presence, pwl.visibility)

In [None]:
# Concrete class of a landmark object.
type(pl)

In [None]:
# IPython-only introspection syntax (shows help for `pl`); not valid in a
# plain Python script.
pl?

In [None]:
# IPython-only introspection syntax (shows help for `res0`); not valid in a
# plain Python script.
res0?

In [None]:
def result2np(result: vision.PoseLandmarkerResult):
    """Convert the first pose of a detection result into a NumPy array.

    Args:
        result (vision.PoseLandmarkerResult): Detection result. It must hold
            at least one entry in ``pose_landmarks``; otherwise indexing
            raises IndexError.

    Returns:
        NDArray: One row per landmark, each row being its (x, y, z) values.
    """

    first_pose = result.pose_landmarks[0]
    rows = []
    for landmark in first_pose:
        rows.append((landmark.x, landmark.y, landmark.z))
    return np.array(rows)

def valid_result(result: vision.PoseLandmarkerResult):
    """Tell whether a detection result can be converted by result2np().

    Args:
        result (vision.PoseLandmarkerResult): Detection result to check.

    Returns:
        boolean: True when ``pose_landmarks`` holds at least one entry,
            False otherwise.
    """
    pose_count = len(result.pose_landmarks)
    return pose_count > 0

In [None]:
# Sanity-check the conversion on the first frame. result2np() builds one
# (x, y, z) row per landmark, so the shape is (number of landmarks, 3).
npa0 = result2np(res0)
npa0.shape

## 学習

In [None]:
# Run pose detection on every video, keyed by file stem. All per-frame
# results of all videos are materialized in memory here.
results_dict = {v.stem: list(video2landmarks(str(v))) for v in videos}

In [None]:
# Pick the video with the fewest per-frame results and show that count
# (frames without a detected pose are still counted here).
min_results = min(results_dict.values(), key=len)
len(min_results)

In [None]:
import random
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras

In [None]:
# Draw `sample_size` valid frames per video, with replacement
# (random.choices), so every class contributes the same number of samples.
# NOTE(review): random.choices raises IndexError for a video in which no
# frame has a detected pose.
sample_size = 500
samples = [
    (label, random.choices([r for r in results if valid_result(r)], k=sample_size))
    for label, results in results_dict.items()
]
# Sort by label only, so the class index assigned later by enumerate() is
# deterministic and the result objects themselves are never compared.
samples.sort(key=lambda sample: sample[0])

In [None]:
# Class names in the same order as `samples`; a label's position in this
# list is the integer class index used for training.
labels = [label for label, _ in samples]
labels

In [None]:
# Flatten the per-class samples into (landmark array, class index) pairs;
# the class index is the position of the class within `samples`.
dataset = [
    (result2np(pose_result), class_index)
    for class_index, (_, pose_results) in enumerate(samples)
    for pose_result in pose_results
]
type(dataset)

In [None]:
# Shuffle in place so the classes are interleaved.
# NOTE(review): no seed is set in the visible cells, so the order differs
# between runs.
random.shuffle(dataset)

In [None]:
# Split the (landmark array, class index) pairs into a feature array and a
# label array that share the same ordering.
x = np.array([data for data, _ in dataset])
y = np.array([label for _, label in dataset])

In [None]:
# Keep 80% of the samples for training and the rest for validation.
# NOTE(review): no random_state or stratify is passed, so the split differs
# between runs and per-class balance is not enforced.
train_x, val_x, train_y, val_y = train_test_split(x, y, train_size=0.8)
# Per-sample input shape, taken from axes 1 and 2 of train_x.
input_shape = (train_x.shape[1], train_x.shape[2])
# Number of classes; used as the size of the model's output layer.
label_len = len(labels)

In [None]:
# Small fully-connected classifier: flatten each sample, one hidden ReLU
# layer of 64 units, 10% dropout, then a softmax over the `label_len` classes.
keras_model = keras.Sequential([
    keras.layers.InputLayer(input_shape, name='input'),
    keras.layers.Flatten(name='Flatten'),
    keras.layers.Dense(64, activation='relu', name='Dense'),
    keras.layers.Dropout(0.1, name='Dropout'),
    keras.layers.Dense(label_len, activation='softmax', name='Output'),
])

In [None]:
# The targets are integer class indices, hence the sparse categorical
# cross-entropy loss.
keras_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train for 20 epochs, evaluating on the validation split alongside training.
history = keras_model.fit(train_x, train_y, epochs=20, validation_data=(val_x, val_y))
history

In [None]:
# Evaluate the model on a fresh random draw of valid frames per class.
# NOTE(review): the frames are drawn from the same videos (results_dict) that
# supplied the training samples, so this is not a truly held-out test set.
test_sample_size = 100
test_samples = [
    (label, random.choices([r for r in results if valid_result(r)], k=test_sample_size))
    for label, results in results_dict.items()
]
# Sort by label, exactly like the training samples, so the class index
# assigned by enumerate() below matches the index the model was trained on.
test_samples.sort(key=lambda sample: sample[0])

test_dataset = [(result2np(result), i) for i, (_, results) in enumerate(test_samples) for result in results]

test_x = np.array([data for data, _ in test_dataset])
test_y = np.array([label for _, label in test_dataset])

keras_model.evaluate(test_x, test_y)