In [None]:
%%time

import cv2
import numpy as np
import mediapipe as mp
import csv
import time
import datetime
import os

# Initialise MediaPipe: the drawing helpers and the pose solution module.
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Pose estimator instance used for every frame processed below.
pose = mp_pose.Pose(
    static_image_mode=True,
    model_complexity=2,
    min_detection_confidence=0.6,
)

# Video to analyse: base name and extension of a file under test_data/.
file_name = 'test_shiki_short'
file_extention = 'mp4'

# Open the video and read the properties used later for drawing, the output
# writer and the summary report.
cap = cv2.VideoCapture(f'test_data/{file_name}.{file_extention}')
width, height, count, fps = (
    cap.get(prop_id)
    for prop_id in (
        cv2.CAP_PROP_FRAME_WIDTH,
        cv2.CAP_PROP_FRAME_HEIGHT,
        cv2.CAP_PROP_FRAME_COUNT,
        cv2.CAP_PROP_FPS,
    )
)

# Build a timestamped output location so each run writes to its own folder.
dt_now = datetime.datetime.now()
now_time = dt_now.strftime('%y_%m_%d_%H_%M_%S')

output_dir = f'test_data/output/template/{file_name}_{now_time}'
output_csv_dir = f'{output_dir}/csv'

# makedirs also creates missing parent directories, so creating the csv
# folder creates output_dir as well; exist_ok makes a pre-existing folder fine.
os.makedirs(output_csv_dir, exist_ok=True)

# Video writer for the annotated frames (same fps and frame size as the input).
writer = cv2.VideoWriter(
    f'{output_dir}/output_{file_name}.mp4',
    cv2.VideoWriter_fourcc(*'mp4v'),
    fps,
    frameSize=(int(width), int(height)),
)

# Landmark name -> index table; the index is the position in the tuple below.
landmark_labels = {
    name: idx
    for idx, name in enumerate((
        'nose',
        'left_eye_inner',
        'left_eye',
        'left_eye_outer',
        'right_eye_inner',
        'right_eye',
        'right_eye_outer',
        'left_ear',
        'right_ear',
        'mouth_left',
        'mouth_right',
        'left_shoulder',
        'right_shoulder',
        'left_elbow',
        'right_elbow',
        'left_wrist',
        'right_wrist',
        'left_pinky',
        'right_pinky',
        'left_index',
        'right_index',
        'left_thumb',
        'right_thumb',
        'left_hip',
        'right_hip',
        'left_knee',
        'right_knee',
        'left_ankle',
        'right_ankle',
        'left_heel',
        'right_heel',
        'left_foot_index',
        'right_foot_index',
    ))
}

# Indices of the landmarks treated as important (shoulders through ankles).
important_landmarks = [11, 12, 13, 14, 15, 16, 23, 24, 25, 26, 27, 28]

# Names of the important landmarks, in table order.
important_landmark_names = [
    name for name in landmark_labels
    if landmark_labels[name] in important_landmarks
]

# One independent, initially empty row list per landmark index.
landmarks_data = dict((idx, []) for idx in landmark_labels.values())

# Per-frame visibility report restricted to the important landmarks:
# output path, header row, collected rows and running ratio total.
frame_visibility_csv = f'{output_csv_dir}/frame_important_visibility.csv'
header = ['Frame', *important_landmark_names, 'Visibility over 0.8 count', 'Visibility over 0.8 ratio']
frame_visibility_data = []
visibility_total = 0

# Same report covering every landmark.
all_visibility_csv = f'{output_csv_dir}/frame_all_visibility.csv'
all_header = ['Frame', *landmark_labels, 'Visibility over 0.8 count', 'Visibility over 0.8 ratio']
all_frame_visibility_data = []
all_visibility_total = 0

# One independent, initially empty visibility list per landmark name.
landmarks_visibility_data = {name: [] for name in landmark_labels}

# Running totals of elapsed time, accumulated while processing frames.
total_processing_time = total_pose_detection_time = total_pose_drawing_time = 0

# Image output folders: untouched source frames, plus annotated frames split
# by whether pose estimation succeeded or failed.
output_original_dir = f'{output_dir}/original'
output_success_dir = f'{output_dir}/processed_image/success'
output_failure_dir = f'{output_dir}/processed_image/failure'

# exist_ok makes each call a no-op when the folder is already present.
os.makedirs(output_original_dir, exist_ok=True)
os.makedirs(output_success_dir, exist_ok=True)
os.makedirs(output_failure_dir, exist_ok=True)

# Pose-estimation run: process every frame of the opened video, save the
# original and annotated frames plus an annotated video, then write timing
# and landmark-visibility statistics (result.txt and CSV files).
def _summarize_visibility(values):
    """Return (average, maximum, minimum) of *values*.

    When *values* is empty (no frame produced a pose), return 'null' for all
    three, matching the placeholder the per-landmark CSVs already use for
    frames without a detection.
    """
    if not values:
        return 'null', 'null', 'null'
    return sum(values) / len(values), max(values), min(values)


if cap.isOpened():
    print(f'Estimated detection time: {count * 0.5}s')
    num = 1
    success_count = 0
    min_landmark_detection_rate = 1.0

    # Built once instead of per landmark per frame: index -> name lookup and
    # a set for O(1) "is this landmark important" checks.
    landmark_name_by_index = {idx: name for name, idx in landmark_labels.items()}
    important_landmark_set = set(important_landmarks)

    while cap.isOpened():
        if num > count:
            break

        start_time = time.time()

        ret, frame = cap.read()

        if ret:
            # Save the untouched source frame.
            cv2.imwrite(f'{output_original_dir}/{file_name}_{num}.jpg', frame)
            # cvtColor returns a new array, so no defensive copy is needed.
            output_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Pose estimation (timed).
            pose_start = time.time()
            pose_results = pose.process(output_img)
            pose_end = time.time()
            total_pose_detection_time += (pose_end - pose_start)

            # Stamp the frame number near the bottom-right corner.
            position = (int(width - 150), int(height - 10))
            cv2.putText(output_img, f'Frame: {num}', position, cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Draw the estimated pose (timed).
            drawing_start = time.time()
            mp_drawing.draw_landmarks(output_img, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
            drawing_end = time.time()
            total_pose_drawing_time += (drawing_end - drawing_start)

            # output_img is RGB; OpenCV writers expect BGR, so convert once and
            # reuse it for both the still image and the video.
            annotated_bgr = cv2.cvtColor(output_img, cv2.COLOR_RGB2BGR)

            visibility_data = [num]
            visibility_count = 0
            all_visibility_data = [num]
            all_visibility_count = 0

            # Success / failure is decided by whether world landmarks exist.
            if pose_results.pose_world_landmarks:
                # Save the annotated frame as a success image.
                img_path = f'{output_success_dir}/{file_name}_{num}.jpg'
                cv2.imwrite(img_path, annotated_bgr)

                # Record the estimation results.
                # NOTE(review): MediaPipe documents pose_world_landmarks as
                # metric coordinates around the hip centre, so scaling by the
                # frame size may not yield pixel positions; confirm whether
                # pose_landmarks was intended here.
                for lm_idx, lm in enumerate(pose_results.pose_world_landmarks.landmark):
                    landmarks_data[lm_idx].append([num, (lm.x + 1) * width, (lm.y + 1) * height, lm.z, lm.visibility])
                    all_visibility_data.append(lm.visibility)
                    landmarks_visibility_data[landmark_name_by_index[lm_idx]].append(lm.visibility)
                    if lm.visibility > 0.8:
                        all_visibility_count += 1
                    if lm_idx in important_landmark_set:
                        visibility_data.append(lm.visibility)
                        if lm.visibility > 0.8:
                            visibility_count += 1

                success_count += 1

            else:
                # Save the annotated frame as a failure image (converted to
                # BGR like the success branch so colours are not swapped).
                img_path = f'{output_failure_dir}/{file_name}_{num}.jpg'
                cv2.imwrite(img_path, annotated_bgr)

                # Record placeholders so every frame has a row in each CSV.
                for lm_idx in range(33):
                    landmarks_data[lm_idx].append([num, 'null', 'null', 'null', 'null'])
                    all_visibility_data.append(0.0)
                    if lm_idx in important_landmark_set:
                        visibility_data.append(0.0)

            # Per-frame summary columns for the important landmarks.
            visibility_over_count = f'{visibility_count} / {len(important_landmarks)}'
            visibility_data.append(visibility_over_count)
            visibility_over_ratio = visibility_count / len(important_landmarks)
            visibility_data.append(visibility_over_ratio)
            visibility_total += visibility_over_ratio

            # Per-frame summary columns for all landmarks.
            all_visibility_over_count = f'{all_visibility_count} / {len(landmark_labels)}'
            all_visibility_data.append(all_visibility_over_count)
            all_visibility_over_ratio = all_visibility_count / len(landmark_labels)
            all_visibility_data.append(all_visibility_over_ratio)
            all_visibility_total += all_visibility_over_ratio

            frame_visibility_data.append(visibility_data)
            all_frame_visibility_data.append(all_visibility_data)

            # Append the annotated frame to the output video.
            writer.write(annotated_bgr)

        end_time = time.time()
        total_processing_time += (end_time - start_time)
        num += 1

    cap.release()
    writer.release()

    print(f'Actual detection time: {total_processing_time}s')

    # Averages per loop iteration. When no frame was iterated every total is
    # 0, so dividing by 1 yields 0 instead of raising ZeroDivisionError.
    frame_total = max(num - 1, 1)
    avg_processing_time = total_processing_time / frame_total
    avg_pose_detection_time = total_pose_detection_time / frame_total
    avg_pose_drawing_time = total_pose_drawing_time / frame_total
    avg_visibility = visibility_total / frame_total
    avg_all_visibility = all_visibility_total / frame_total

    # Some containers report fps as 0; avoid dividing by it.
    video_time = count / fps if fps else 0.0

    # avg_all_visibility covers all landmarks, avg_visibility only the
    # important ones; the labels below are paired accordingly.
    text_lines = [
        f'トータル処理時間： {total_processing_time}\n',
        f'Video Width： {width}\n',
        f'Video Height： {height}\n',
        f'Video Frame Count： {count}\n',
        f'Video FPS： {fps}\n',
        f'Video Time： {video_time}\n\n',
        f'平均処理時間： {avg_processing_time}\n',
        f'平均姿勢推定処理時間： {avg_pose_detection_time}\n',
        f'平均姿勢推定描画時間： {avg_pose_drawing_time}\n',
        f'平均ランドマーク検出確率(All)： {avg_all_visibility}\n',
        f'平均ランドマーク検出確率(Important)： {avg_visibility}\n'
    ]

    # Write the run summary.
    with open(f'{output_dir}/result.txt', mode = 'w', encoding = 'utf-8', newline = '\n') as f:
        f.writelines(text_lines)

    # Write one CSV per landmark with its per-frame values.
    for label_idx, lm_data in landmarks_data.items():
        lm_name = landmark_name_by_index[label_idx]
        csv_file_path = f'{output_csv_dir}/landmark_{label_idx}_{lm_name}.csv'
        with open(csv_file_path, mode = 'w', newline = '') as file:
            csv_writer = csv.writer(file)
            csv_writer.writerow(['frame num', 'x', 'y', 'z', 'visibility'])
            csv_writer.writerows(lm_data)

    # Per-landmark visibility statistics over the frames where a pose was
    # detected. Two sets of rows are built so that each CSV gets summary rows
    # whose columns line up with its own header.
    avg_landmark_visibility = ['average']
    max_landmark_visibility = ['maximum']
    min_landmark_visibility = ['minimum']
    important_avg_visibility = ['average']
    important_max_visibility = ['maximum']
    important_min_visibility = ['minimum']

    for lm_name, lm_idx in landmark_labels.items():
        lm_avg, lm_max, lm_min = _summarize_visibility(landmarks_visibility_data[lm_name])
        avg_landmark_visibility.append(lm_avg)
        max_landmark_visibility.append(lm_max)
        min_landmark_visibility.append(lm_min)
        if lm_idx in important_landmark_set:
            important_avg_visibility.append(lm_avg)
            important_max_visibility.append(lm_max)
            important_min_visibility.append(lm_min)

    with open(frame_visibility_csv, mode = 'w', newline = '') as file:
        csv_writer = csv.writer(file)
        csv_writer.writerow(header)
        csv_writer.writerows(frame_visibility_data)
        csv_writer.writerow(important_avg_visibility)
        csv_writer.writerow(important_max_visibility)
        csv_writer.writerow(important_min_visibility)

    with open(all_visibility_csv, mode = 'w', newline = '') as file:
        csv_writer = csv.writer(file)
        csv_writer.writerow(all_header)
        csv_writer.writerows(all_frame_visibility_data)
        csv_writer.writerow(avg_landmark_visibility)
        csv_writer.writerow(max_landmark_visibility)
        csv_writer.writerow(min_landmark_visibility)

else:
    print('Cannot Opened File')

print('Detection Completed')

Estimated detection time: 76.0s


W0000 00:00:1716178073.963938    1255 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1716178074.085730    1255 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
