In [3]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
from ultralytics import YOLO
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
mp_pose = mp.solutions.pose

In [4]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [5]:
def mediapipe_detection(image, model):
    # mp_image = mp.Image.create_from_file('/path/to/image')
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)# Image is no longer writeable
    results = model.detect(mp_image)                 # Make prediction      # Image is now writeable
    return results

In [6]:
from mediapipe.framework.formats import landmark_pb2
def draw_landmarks_on_image(rgb_image, detection_result):
  pose_landmarks_list = detection_result.pose_landmarks
  annotated_image = np.copy(rgb_image)

  # Loop through the detected poses to visualize.
  for idx in range(len(pose_landmarks_list)):
    pose_landmarks = pose_landmarks_list[idx]

    # Draw the pose landmarks.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
    ])
    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      pose_landmarks_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())
  return annotated_image

In [7]:
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

model_file = open('pose_landmarker_lite.task', "rb")
model_data = model_file.read()
model_file.close()
    
base_options = python.BaseOptions(model_asset_buffer=model_data)
options = PoseLandmarkerOptions(
    base_options=base_options,
    running_mode=VisionRunningMode.IMAGE,
    num_poses =10)

In [None]:
# Specify the height and width to which each video frame will be resized in our dataset.
IMAGE_HEIGHT , IMAGE_WIDTH = 256, 256

# Specify the number of frames of a video that will be fed to the model as one sequence.
SEQUENCE_LENGTH = 20

# Specify the directory containing the UCF50 dataset. 

# Specify the list containing the names of the classes used for training. Feel free to choose any set of classes.

In [8]:

def frames_extraction(video_path):
    '''
    This function will extract the required frames from a video after resizing and normalizing them.
    Args:
        video_path: The path of the video in the disk, whose frames are to be extracted.
    Returns:
        frames_list: A list containing the resized and normalized frames of the video.
    '''

    # Declare a list to store video frames.
    frames_list = []
    
    # Read the Video File using the VideoCapture object.
    video_reader = cv2.VideoCapture(video_path)

    # Get the total number of frames in the video.
    video_frames_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the the interval after which frames will be added to the list.
    skip_frames_window = max(int(video_frames_count/SEQUENCE_LENGTH), 1)

    # Iterate through the Video Frames.
    for frame_counter in range(SEQUENCE_LENGTH):

        # Set the current frame position of the video.
        video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_counter * skip_frames_window)

        # Reading the frame from the video. 
        success, frame = video_reader.read() 

        # Check if Video frame is not successfully read then break the loop
        if not success:
            break

        # Resize the Frame to fixed height and width.
        resized_frame = cv2.resize(frame, (IMAGE_HEIGHT, IMAGE_WIDTH))

        # Append the normalized frame into the frames list
        frames_list.append(resized_frame)
    
    # Release the VideoCapture object. 
    video_reader.release()

    # Return the frames list.
    return frames_list

In [10]:
def extract_keypoints(pose_landmarks):
    pose =[]
    # if(len(results.pose_landmarks) == 2):
    #     print(pose_landmarks_list[0])
    #     print(pose_landmarks_list[1])
    pose = np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose_landmarks]).flatten() if pose_landmarks else np.zeros(33*4)
    # pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    # face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    # lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    # rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose])

In [11]:
# Path for exported data, numpy arrays
DATA_PATH = os.path.join('train_dataset_train\\videos')

# Actions that we try to detect

actions = np.array(['cartwheel', 'catch', 'clap', 'climb', 'dive', 'draw_sword', 'dribble', 'fencing',
                    'flic_flac', 'golf', 'handstand', 'hit', 'jump',
                    'pick', 'pour', 'pullup', 'push', 'pushup', 'shoot_ball', 'sit', 
                    'situp', 'swing_baseball', 'sword_exercise', 'throw'])
# actions = np.array(['cartwheel', 'catch'])

In [12]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [53]:
model_2 = Sequential()
model_2.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(20,132)))
model_2.add(LSTM(128, return_sequences=True, activation='relu'))
model_2.add(LSTM(64, return_sequences=False, activation='relu'))
model_2.add(Dense(64, activation='relu'))
model_2.add(Dense(32, activation='relu'))
model_2.add(Dense(actions.shape[0], activation='softmax'))

model_2.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [54]:
model_2.load_weights('modern_model_32.h5')

In [58]:
#тестирование модели без ожидания кадров > 20 (если не работает выполняем то что ниже)


# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5

cap = cv2.VideoCapture("D:\\hakaton\Ставрополь\\train_dataset_train\\videos\\pullup\\Girl_does_12_pull-ups_pullup_u_cm_np1_ba_med_0.avi")
# Set mediapipe model
with PoseLandmarker.create_from_options(options) as landmarker:
    while cap.isOpened():
        # number = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        # print(number)
        ret, frame = cap.read()
        if not ret:
            break

        # Make detections
        results = mediapipe_detection(frame, landmarker)

        # Draw landmarks
        annotated_image = draw_landmarks_on_image(frame, results)

        # 2. Prediction logic
        
        for idx in range(len(results.pose_landmarks)):
            pose_landmarks = results.pose_landmarks[idx]
            keypoints = extract_keypoints(pose_landmarks)
            sequence.append(keypoints)
            sequence = sequence[-20:]
            # keypoints = np.expand_dims(keypoints, axis=0)
            # keypoints = np.expand_dims(keypoints, axis=0)
            res = model_2.predict(np.expand_dims(sequence, axis=0))
            
            h, w, c = frame.shape
            x_max = 0
            y_max = 0
            x_min = w
            y_min = h
            for lm in pose_landmarks:
                x, y = int(lm.x * w), int(lm.y * h)
                if x > x_max:
                    x_max = x
                if x < x_min:
                    x_min = x
                if y > y_max:
                    y_max = y
                if y < y_min:
                    y_min = y
            x_max = x_max + int(h/15)
            y_max = y_max + int(h/15)
            x_min = x_min - int(h/15)
            y_min = y_min - int(h/15)
            
            action = actions[np.argmax(res)]
                # predictions.append(np.argmax(res))
            cv2.rectangle(annotated_image, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
            cv2.putText(annotated_image, ' '.join(action), (x_min,y_min),
            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)


        
        cv2.imshow('OpenCV Feed', annotated_image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()



In [56]:
#тестирование модели с ожиданием кадров > 20

# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5
a= 0

# Set mediapipe model
with PoseLandmarker.create_from_options(options) as landmarker:
    cap = cv2.VideoCapture("D:\\hakaton\Ставрополь\\train_dataset_train\\videos\\pullup\\Girl_does_12_pull-ups_pullup_u_cm_np1_ba_med_0.avi")
    while cap.isOpened():
        number = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        print(number)
        ret, frame = cap.read()
        if not ret:
            break

        # Make detections
        results = mediapipe_detection(frame, landmarker)

        # Draw landmarks
        annotated_image = draw_landmarks_on_image(frame, results)
        a = a+1
        print(a)

        # 2. Prediction logic
        
        for idx in range(len(results.pose_landmarks)):
            pose_landmarks = results.pose_landmarks[idx]
            keypoints = extract_keypoints(pose_landmarks)
            
            sequence.append(keypoints)
            # sequence = sequence[-20:]
            # keypoints = np.expand_dims(keypoints, axis=0)
            # keypoints = np.expand_dims(keypoints, axis=0)
            if(len(sequence) >20):
                sequence = sequence[-20:]
                res = model_2.predict(np.expand_dims(sequence, axis=0))
            
                h, w, c = frame.shape
                x_max = 0
                y_max = 0
                x_min = w
                y_min = h
                for lm in pose_landmarks:
                    x, y = int(lm.x * w), int(lm.y * h)
                    if x > x_max:
                        x_max = x
                    if x < x_min:
                        x_min = x
                    if y > y_max:
                        y_max = y
                    if y < y_min:
                        y_min = y
                x_max = x_max + int(h/15)
                y_max = y_max + int(h/15)
                x_min = x_min - int(h/15)
                y_min = y_min - int(h/15)
            
                action = actions[np.argmax(res)]
                # predictions.append(np.argmax(res))
                cv2.rectangle(annotated_image, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
                cv2.putText(annotated_image, ' '.join(action), (x_min,y_min),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)


        
        cv2.imshow('OpenCV Feed', annotated_image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

84.0
1
84.0
2
84.0
3
84.0
4
84.0
5
84.0
6
84.0
7
84.0
8
84.0
9
84.0
10
84.0
11
84.0
12
84.0
13
84.0
14
84.0
15
84.0
16
84.0
17
84.0
18
84.0
19
84.0
20
84.0
21
84.0
22
84.0
23
84.0
24
84.0
25
84.0
26
84.0
27
84.0
28
84.0
29
84.0
30
84.0
31
84.0
32
84.0
33
84.0
34
84.0
35
84.0
36
84.0
37
84.0
38
84.0
39
84.0
40
84.0
41
84.0
42
84.0
43
84.0
44
84.0
45
84.0
46
84.0
47
84.0
48
84.0
49
84.0
50
84.0
51
84.0
52
84.0
53
84.0
54
84.0
55
84.0
56
84.0
57
84.0
58
84.0
59
84.0
60
84.0
61
84.0
62
84.0
63
84.0
64
84.0
65
84.0
66
84.0
67
84.0
68
84.0
69
84.0
70
84.0
71
84.0
72
84.0
73
84.0
74
84.0
75
84.0
76
84.0
77
84.0
78
84.0
79
84.0
80
84.0
81
84.0
82
84.0
83
84.0
