In [1]:
import cv2
import mediapipe as mp
import pandas as pd
import numpy as np
import os
from datetime import timedelta



In [2]:
def format_timedelta(td):
    """Служебная функция для классного форматирования объектов timedelta (например, 00:00:20.05)
    исключая микросекунды и сохраняя миллисекунды"""
    result = str(td)
    try:
        result, ms = result.split(".")
    except ValueError:
        return result + ".00".replace(":", "-")
    ms = int(ms)
    ms = round(ms / 1e4)
    return f"{result}.{ms:02}".replace(":", "-")

In [3]:
def get_saving_frames_durations(cap, saving_fps):
    """Функция, которая возвращает список длительностей, в которые следует сохранять кадры."""
    s = []
    # получаем продолжительность клипа, разделив количество кадров на количество кадров в секунду
    clip_duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS)
    # используйте np.arange () для выполнения шагов с плавающей запятой
    for i in np.arange(0, clip_duration, 1 / saving_fps):
        s.append(i)
    return s

In [4]:
def cut_video(inp_dir):
    SAVING_FRAMES_PER_SECOND = 10
    video = inp_dir
    filename = video
    filename += "-opencv"
    # создаем папку по названию видео файла
    if not os.path.isdir(filename):
        os.mkdir(filename)
    # читать видео файл
    cap = cv2.VideoCapture(video)
    # получить FPS видео
    fps = cap.get(cv2.CAP_PROP_FPS)
    # если SAVING_FRAMES_PER_SECOND выше видео FPS, то установите его на FPS (как максимум)
    saving_frames_per_second = min(fps, SAVING_FRAMES_PER_SECOND)
    # получить список длительностей для сохранения
    saving_frames_durations = get_saving_frames_durations(cap, saving_frames_per_second)
    # запускаем цикл
    count = 0
    while True:
        is_read, frame = cap.read()
        if not is_read:
            # выйти из цикла, если нет фреймов для чтения
            break
        # получаем продолжительность, разделив количество кадров на FPS
        frame_duration = count / fps
        try:
            # получить самую раннюю продолжительность для сохранения
            closest_duration = saving_frames_durations[0]
        except IndexError:
            # список пуст, все кадры длительности сохранены
            break
        if frame_duration >= closest_duration:
            # если ближайшая длительность меньше или равна длительности кадра,
            # затем сохраняем фрейм
            frame_duration_formatted = format_timedelta(timedelta(seconds=frame_duration))
            cv2.imwrite(os.path.join(filename, f"frame{frame_duration_formatted}.jpg"), frame)
            # удалить точку продолжительности из списка, так как эта точка длительности уже сохранена
            try:
                saving_frames_durations.pop(0)
            except IndexError:
                pass
        # увеличить количество кадров
        count += 1
    # print(f"Итого сохранено кадров {save_count}")
    return filename

In [15]:
def create_test_df(input_dir):
    mp_holistic = mp.solutions.holistic
    mp_drawing = mp.solutions.drawing_utils

    face_list = []
    left_hand_list = []
    right_hand_list = []
    pose_list = []

    iter_f = 0
    for filename in os.listdir(input_dir):
        fil_n = input_dir + '/' + filename

        print("Photo processed: ", iter_f)
        image = cv2.imread(fil_n)
        # cv2.imshow("ff",image)

        ## Setup mediapipe instance
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

            # Recolor image to RGB
            image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make detection
            results = holistic.process(image)

            # Recolor back to BGR
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # 1. Draw face landmarks
            mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                                      mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1),
                                      mp_drawing.DrawingSpec(color=(80, 256, 121), thickness=1, circle_radius=1)
                                      )

            # 2. Right hand
            mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(80, 22, 10), thickness=2, circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(80, 44, 121), thickness=2, circle_radius=2)
                                      )

            # 3. Left Hand
            mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(121, 22, 76), thickness=2, circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(121, 44, 250), thickness=2, circle_radius=2)
                                      )

            # 2. Draw pose landmarks
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
                                      mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
                                      )

            # cv2.imshow('Mediapipe Feed', image)
            # cv2.waitKey()

        # 1. Face Dataframe
        # len_face = len(results.face_landmarks.landmark)
        len_face = 468
        # print(len_face)
        for itr in range(len_face):
            if results.face_landmarks:
                lst = [iter_f, 'face', itr, results.face_landmarks.landmark[itr].x,
                       results.face_landmarks.landmark[itr].y, results.face_landmarks.landmark[itr].z]
            else:
                lst = [iter_f, 'face', itr, None,None,None]
            face_list.append(lst)

        # 2. Left_hand Dataframe
        # len_lft = len(results.left_hand_landmarks.landmark)
        len_lft = 21
        # print(len_lft)
        for itr in range(len_lft):
            if results.left_hand_landmarks:
                lst = [iter_f, 'left_hand', itr, results.left_hand_landmarks.landmark[itr].x,
                       results.left_hand_landmarks.landmark[itr].y, results.left_hand_landmarks.landmark[itr].z]
            else:
                lst = [iter_f, 'left_hand', itr, None,None,None]
            left_hand_list.append(lst)

        # 3. Right_hand Dataframe
        # len_rgt = len(results.right_hand_landmarks.landmark)
        len_rgt = 21
        # print(len_rgt)
        for itr in range(len_rgt):
            if results.right_hand_landmarks:
                lst = [iter_f, 'right_hand', itr, results.right_hand_landmarks.landmark[itr].x,
                       results.right_hand_landmarks.landmark[itr].y, results.right_hand_landmarks.landmark[itr].z]
            else:
                lst = [iter_f, 'right_hand', itr, None,None,None]
            right_hand_list.append(lst)


        # 2. Pose Dataframe
        # len_pose = len(results.pose_landmarks.landmark)
        len_pose = 33
        # print(len_pose)
        for itr in range(len_pose):
            if results.pose_landmarks:
                lst = [iter_f, 'pose', itr, results.pose_landmarks.landmark[itr].x,
                       results.pose_landmarks.landmark[itr].y, results.pose_landmarks.landmark[itr].z]
            else:
                lst = [iter_f, 'pose', itr, None,None,None]
            pose_list.append(lst)

        iter_f += 1
    df_face = pd.DataFrame(face_list, columns=['frame', 'type', 'landmark_index', 'x', 'y', 'z'])
    df_left_hand = pd.DataFrame(left_hand_list, columns=['frame', 'type', 'landmark_index', 'x', 'y', 'z'])
    df_right_hand = pd.DataFrame(right_hand_list, columns=['frame', 'type', 'landmark_index', 'x', 'y', 'z'])
    df_pose = pd.DataFrame(pose_list, columns=['frame', 'type', 'landmark_index', 'x', 'y', 'z'])
    df = pd.concat([df_face, df_left_hand,df_right_hand,df_pose], ignore_index=True)
    return df
    # df.to_csv(out_dir)

In [16]:
def create_video():
    cap = cv2.VideoCapture(0)
    cap.set(3, 640)
    cap.set(4, 480)

    # out = 'output.mp4'
    # fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    out = cv2.VideoWriter('output.mp4', 0x7634706d, 30.0, (640, 480))

    while True:
        ret, frame = cap.read()
        out.write(frame)
        cv2.imshow('frame', frame)
        c = cv2.waitKey(1)
        if c & 0xFF == ord('q'):
            break

    cap.release()
    out.release()
    cv2.destroyAllWindows()
    return out

In [19]:
def get_dataframe():
    create_video()
    video = 'output.mp4'
    # print(video)
    inp_dir = cut_video(video)
    # out_df_dir = out

    df = create_test_df(inp_dir)
    return df

In [20]:
print(get_dataframe())

Photo processed:  0
Photo processed:  1
Photo processed:  2
Photo processed:  3
Photo processed:  4
Photo processed:  5
Photo processed:  6
Photo processed:  7
Photo processed:  8
Photo processed:  9
Photo processed:  10
Photo processed:  11
Photo processed:  12
Photo processed:  13
Photo processed:  14
Photo processed:  15
Photo processed:  16
Photo processed:  17
Photo processed:  18
Photo processed:  19
Photo processed:  20
Photo processed:  21
Photo processed:  22
Photo processed:  23
Photo processed:  24
Photo processed:  25
Photo processed:  26
Photo processed:  27
Photo processed:  28
Photo processed:  29
Photo processed:  30
Photo processed:  31
Photo processed:  32
Photo processed:  33
Photo processed:  34
Photo processed:  35
Photo processed:  36
Photo processed:  37
Photo processed:  38
Photo processed:  39
Photo processed:  40
Photo processed:  41
Photo processed:  42
Photo processed:  43
Photo processed:  44
Photo processed:  45
Photo processed:  46
Photo processed:  47
Ph