## Data Extract Part

In [68]:
import json
import os

import cv2
from mediapipe_impl.pose_estimation import PoseEstimationModule as pm

# cap = cv2.VideoCapture(0)
detector = pm.PoseDetector()

images_dir = "../datasets/img"
images = [os.path.join(images_dir, img) for img in os.listdir(images_dir) if img.endswith(('.png', '.jpg', '.jpeg'))]

I0000 00:00:1731759380.854248 5614427 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 83.1), renderer: Apple M1


In [69]:
for image_path in images:
    print(image_path)
    # success, img = cap.read()
    # img = cv2.resize(img, (1280, 720))
    img = cv2.imread(image_path)
    img = detector.find_pose(img, draw=False)
    lm_list = detector.find_position(img, False)
    cv2.imshow('img', img)
    cv2.waitKey(1)  # 等待1毫秒刷新窗口

    # label = input(f"Please enter the label (0/1): ")
    data = {
        "filename": image_path.split('/')[-1],
        "features": lm_list,
    }

    # 读取现有JSON文件（如果存在），将新数据追加到列表中
    json_file_path = "../data.json"
    try:
        with open(json_file_path, "r") as file:
            data_list = json.load(file)
    except FileNotFoundError:
        data_list = []

    # 追加新数据
    data_list.append(data)

    # 将更新后的列表写回JSON文件
    with open(json_file_path, "w") as file:
        json.dump(data_list, file, indent=4)

    # 关闭窗口并退出循环
    cv2.destroyAllWindows()
cv2.destroyAllWindows()


W0000 00:00:1731759380.921806 5676431 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1731759380.938731 5676431 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


../datasets/img/img_10.jpg
../datasets/img/img_38.jpg
../datasets/img/img_1.jpg
../datasets/img/img_39.jpg
../datasets/img/img_11.jpg
../datasets/img/img_13.jpg
../datasets/img/img_100.jpg
../datasets/img/img_2.jpg
../datasets/img/img_3.jpg
../datasets/img/img_12.jpg
../datasets/img/img_16.jpg
../datasets/img/img_7.jpg
../datasets/img/img_6.jpg
../datasets/img/img_17.jpg
../datasets/img/img_29.jpg
../datasets/img/img_15.jpg
../datasets/img/img_4.jpg
../datasets/img/img_5.jpg
../datasets/img/img_14.jpg
../datasets/img/img_28.jpg
../datasets/img/img_98.jpg
../datasets/img/img_73.jpg
../datasets/img/img_67.jpg
../datasets/img/img_66.jpg
../datasets/img/img_72.jpg
../datasets/img/img_99.jpg
../datasets/img/img_64.jpg
../datasets/img/img_70.jpg
../datasets/img/img_58.jpg
../datasets/img/img_59.jpg
../datasets/img/img_71.jpg
../datasets/img/img_65.jpg
../datasets/img/img_49.jpg
../datasets/img/img_61.jpg
../datasets/img/img_75.jpg
../datasets/img/img_74.jpg
../datasets/img/img_60.jpg
../data

## Data Label Part

## Combine Data Part

In [70]:
def jsonl_to_json(jsonl_file_path, output_json_file_path):
    """
    将 JSONL 文件中的数据转换为标准的 JSON 格式文件。

    :param jsonl_file_path: JSONL 文件路径
    :param output_json_file_path: 输出的 JSON 文件路径
    """
    merged_data = []

    # 打开 JSONL 文件并逐行处理
    with open(jsonl_file_path, 'r') as jsonl_file:
        for line in jsonl_file:
            item = json.loads(line)  # 解析每一行的 JSON 对象
            merged_data.append(item)

    # 将合并后的数据写入输出 JSON 文件
    with open(output_json_file_path, 'w') as json_file:
        json.dump(merged_data, json_file, indent=4)

    print(f"Converted JSONL data saved to {output_json_file_path}")


In [71]:
jsonl_to_json("../all.jsonl", "../all.json")

Converted JSONL data saved to ../all.json


In [72]:
def merge_features_by_filename(file1_path, file2_path, output_path):
    # 读取两个文件的内容
    with open(file1_path, 'r') as f1:
        list1 = json.load(f1)

    with open(file2_path, 'r') as f2:
        list2 = json.load(f2)

    # 创建一个以 filename 为键的字典，便于快速查找
    features_dict = {item["filename"]: item["features"] for item in list1}

    # 遍历 list2，将 features 合并
    for item in list2:
        filename = item["filename"]
        if filename in features_dict:
            item["features"] = features_dict[filename]

    # 将合并后的数据写入输出文件
    with open(output_path, 'w') as out_file:
        json.dump(list2, out_file, indent=4)

    print(f"Merged data saved to {output_path}")


In [73]:
merge_features_by_filename("../data.json", "../all.json", "../merged_data.json")

Merged data saved to ../merged_data.json




## Model Train Part

In [74]:
import json
import numpy as np

In [75]:
def load_data(json_path):
    with open(json_path, 'r') as f:
        data = json.load(f)

    inputs = []
    labels = []
    for sample in data:
        if sample["label"]:
            features = sample["features"]
            # Flatten each (x, y, z, visibility) into a single array
            flattened_features = []
            for keypoint in features:
                flattened_features.extend([keypoint["x"], keypoint["y"], keypoint["z"], keypoint["visibility"]])

            inputs.append(flattened_features)

            # Multi-label processing: Convert label list to a binary vector
            label_list = sample["label"]
            labels.append(label_list)

    return np.array(inputs), labels

In [79]:
# Load data
print("Load Data")
X, y = load_data('../merged_data.json')
print(X)
print(y)
print(f"Initial X shape: {X.shape}")
ALL_LABELS = []
for label in y:
    if label and label not in ALL_LABELS:
        ALL_LABELS.append(label)
print(f"ALL LABELS: {ALL_LABELS}")
# N categories
N = len(ALL_LABELS)
print(f"N: {N}")
KEY_POINTS = 33
FEATURES = 4
# Reshape data to fit Conv1D input: (samples, steps, features)
X = X.reshape((X.shape[0], KEY_POINTS, 4))  # 33 keypoints with 4 features (x, y, z, visibility)
# Checking the shape of the reshaped data
print(X.shape)

Load Data
[[ 5.47724485e-01  3.36077452e-01 -1.35032237e+00 ...  2.18307185e+00
   7.85210133e-02  1.20109795e-02]
 [ 5.50327599e-01  3.45828354e-01 -1.20299661e+00 ...  2.22794843e+00
  -5.52580468e-02  1.42122142e-03]
 [ 5.49107134e-01  3.41806620e-01 -1.15832043e+00 ...  2.21742845e+00
  -2.66200844e-02  2.13399483e-03]
 ...
 [ 5.49295306e-01  3.41853082e-01 -1.22420025e+00 ...  2.20655656e+00
   1.32704213e-01  4.39481530e-03]
 [ 5.49370110e-01  3.40936750e-01 -1.21732807e+00 ...  2.20889902e+00
   1.12884142e-01  3.36590293e-03]
 [ 5.49224079e-01  3.43372256e-01 -1.17433846e+00 ...  2.21863818e+00
  -4.61158492e-02  1.99269177e-03]]
[['Blur'], ['Normal'], ['Blur'], ['Normal'], ['Blur'], ['Blur'], ['Wrong'], ['Blur'], ['Blur'], ['Wrong'], ['Blur'], ['Wrong'], ['Blur'], ['Wrong'], ['Normal'], ['Normal'], ['Normal'], ['Normal'], ['Normal'], ['Normal']]
Initial X shape: (20, 132)
ALL LABELS: [['Blur'], ['Normal'], ['Wrong']]
N: 3
(20, 33, 4)


In [80]:
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

# Initialize a LabelEncoder to convert strings to integers
label_encoder = LabelEncoder()
# Fit and transform the labels to integers
y_int = label_encoder.fit_transform(y)
# Now apply to_categorical for one-hot encoding
y_onehot = to_categorical(y_int, num_classes=len(label_encoder.classes_))
print(f"One-hot encoded labels shape: {y_onehot.shape}")
print(y_onehot)


One-hot encoded labels shape: (20, 3)
[[1. 0. 0.]
 [0. 1. 0.]
 [1. 0. 0.]
 [0. 1. 0.]
 [1. 0. 0.]
 [1. 0. 0.]
 [0. 0. 1.]
 [1. 0. 0.]
 [1. 0. 0.]
 [0. 0. 1.]
 [1. 0. 0.]
 [0. 0. 1.]
 [1. 0. 0.]
 [0. 0. 1.]
 [0. 1. 0.]
 [0. 1. 0.]
 [0. 1. 0.]
 [0. 1. 0.]
 [0. 1. 0.]
 [0. 1. 0.]]


  y = column_or_1d(y, warn=True)


In [81]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

# Define the model
model = Sequential()
# Add Conv1D layer
model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(KEY_POINTS, FEATURES)))
# Add MaxPooling1D layer
model.add(MaxPooling1D(pool_size=2))
# Add another Conv1D layer
model.add(Conv1D(filters=128, kernel_size=3, activation='relu'))
# Add another MaxPooling1D layer
model.add(MaxPooling1D(pool_size=2))
# Flatten the output from Conv1D layers
model.add(Flatten())
# Add Dense layer with dropout for regularization
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
# Output layer with softmax activation (for classification)
model.add(Dense(N, activation='softmax'))  # N is the number of classes

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [82]:
# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# Summary of the model
model.summary()

In [83]:
# Train the Model
history = model.fit(X, y_onehot, epochs=10, batch_size=32, validation_split=0.2)
# Evaluate the Model
loss, accuracy = model.evaluate(X, y_onehot)
print(f'Accuracy: {accuracy * 100:.2f}%')

Epoch 1/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 600ms/step - accuracy: 0.1875 - loss: 1.1439 - val_accuracy: 0.0000e+00 - val_loss: 1.0762
Epoch 2/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - accuracy: 0.5000 - loss: 1.0460 - val_accuracy: 0.0000e+00 - val_loss: 1.1886
Epoch 3/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step - accuracy: 0.5000 - loss: 1.0957 - val_accuracy: 0.0000e+00 - val_loss: 1.3054
Epoch 4/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step - accuracy: 0.5625 - loss: 1.0425 - val_accuracy: 0.0000e+00 - val_loss: 1.4066
Epoch 5/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - accuracy: 0.5000 - loss: 1.0257 - val_accuracy: 0.0000e+00 - val_loss: 1.4844
Epoch 6/10
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - accuracy: 0.5000 - loss: 1.0455 - val_accuracy: 0.0000e+00 - val_loss: 1.5463
Epoch 7/10
[1m1/1[0

In [84]:
import cv2
from mediapipe_impl.pose_estimation import PoseEstimationModule as pm

detector = pm.PoseDetector()


def extract_keypoints(image_path):
    cap = cv2.VideoCapture(image_path)
    success, img = cap.read()
    img = detector.find_pose(img=img)
    lm_list = detector.find_position(img, draw=False)
    return lm_list

I0000 00:00:1731759484.003967 5614427 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 83.1), renderer: Apple M1


In [85]:
def predict_image(image_path):
    data = []
    keypoints = extract_keypoints(image_path)
    for keypoint in keypoints:
        data.extend([keypoint["x"], keypoint["y"], keypoint["z"], keypoint["visibility"]])
    X = np.array(data).reshape((1, 33, 4))
    print(X.shape)
    # 模型预测
    predictions = model.predict(X)

    # 获取预测类别的索引
    predicted_class = np.argmax(predictions, axis=1)[0]

    print(f"Predicted Class: {predicted_class}")
    print(ALL_LABELS[predicted_class])


# 示例：预测一张新图像
image_path = '../datasets/img.jpg'
result = predict_image(image_path)

(1, 33, 4)
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
Predicted Class: 0
['Blur']
