# 1. Import and Install Dependencies

In [22]:
# %pip install tensorflow==2.10.0 opencv-python mediapipe scikit-learn matplotlib

In [23]:
import cv2 # type: ignore
import numpy as np # type: ignore
import os
from matplotlib import pyplot as plt # type: ignore
import time
import mediapipe as mp # type: ignore

import importlib # for reloading .py files
# WARNING: updates of config may require restart of kernel if reload is unsuccessful

import config
# reload config without restarting the kernel
importlib.reload(config)
from config import actions, no_sequences, sequence_length, DATA_PATH

import utils
# reload utils without restarting the kernel
importlib.reload(utils)
from utils import mediapipe_detection, extract_keypoints, draw_styled_landmarks, prob_viz

# Keypoints using MP Holistic
from utils import mp_holistic, mp_drawing, set_camera_settings, default_fps

# Setup Folders for Collection

In [24]:
# Class labels used for data collection.
# NOTE: this rebinds `actions`, replacing the value imported from `config` earlier
# in the notebook.

# Labels for clips that contain no sign; they always come first in `actions`.
empty_signs = ["background", "NoHands"]

# Full label list: the "no sign" labels followed by the actual signs.
actions = [
    *empty_signs,
    "ILoveYou", "Yes", "No", "Hi", "ThankYou",
    "Me", "You", "It",
    "Feel", "Happy", "Hungry", "Tired",
    "Eat", "Bread", "Chocolate",
]
# Restore the original ordering of the sign labels ("Tired" is the last entry).
actions.remove("Tired")
actions.append("Tired")

In [25]:
# Create one output folder per (action, sequence) pair:
#   <AUGMENTED_KEYPOINTS_PATH>/<action>/<sequence>/
AUGMENTED_KEYPOINTS_PATH = "augmentation_keypoints"  # Directory where augmented keypoints will be stored

for action in actions:
    for sequence in range(no_sequences):
        # exist_ok=True keeps the cell re-runnable when the folders already exist,
        # while real failures (permissions, a file in the way, invalid path) still
        # raise instead of being silently swallowed by a bare `except`.
        os.makedirs(
            os.path.join(AUGMENTED_KEYPOINTS_PATH, action, str(sequence)),
            exist_ok=True,
        )

# 5. Collect Keypoint Values for Training and Testing

In [26]:
# import time

# frame_count = 0
# fps_list = []
# start_time = time.time()
# camera_index = 0

# cap = cv2.VideoCapture(camera_index)
# actual_res, actual_fps = set_camera_settings(cap) # setting camera resolution and fps + printing the actual values

# # Create a named window and resize it

# # Set mediapipe model
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

#     # Flag variable to signal breaking out of all loops
#     break_all_loops = False

#     # Loop through actions
#     for action in actions:
#         # Loop through sequences aka videos
#         for sequence in range(no_sequences):
#             # Loop through video length aka sequence length
#             for frame_num in range(sequence_length):

#                 # Check if flag variable is set to break all loops
#                 if break_all_loops:
#                     break

#                 # Read feed
#                 ret, frame = cap.read()

#                 # Make detections
#                 image, results = mediapipe_detection(frame, holistic)
#                 # print(results)

#                 # Draw landmarks
#                 draw_styled_landmarks(image, results)

#                 # NEW Apply wait logic
#                 # Display messages and countdown
#                 # Apply wait logic with countdown
#                 if frame_num == 0:
#                     for countdown in range(2, 0, -1):  # 2 seconds countdown (2, 1)
#                         cv2.putText(image, f'GET READY FOR "{action}" in {countdown}', (30,100),
#                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0, 0), 4, cv2.LINE_AA)
#                         # cv2.putText(image, f'Collecting frames for {action} Video Number {sequence+1}', (15,150),
#                         #            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
#                         cv2.imshow('OpenCV Feed', image)
#                         cv2.waitKey(1000)  # wait 1000 ms between each number
#                         image = frame.copy()  # clear the previous text
#                 else:
#                     cv2.putText(image, f'RECORDING [{sequence + 1}/{no_sequences}] "{action}" - Frame {frame_num + 1}/{sequence_length}', (30,100),
#                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0, 255), 4, cv2.LINE_AA)
#                     remaining_frames = sequence_length - frame_num - 1
#                     cv2.imshow('OpenCV Feed', image)

#                 # NEW Export keypoints
#                 keypoints = extract_keypoints(results)
#                 npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
#                 print(npy_path)
#                 np.save(npy_path, keypoints)

#                 frame_count += 1

#                 if frame_count % 100 == 0:
#                     end_time = time.time()
#                     elapsed = end_time - start_time
#                     actual_fps = frame_count / elapsed
#                     fps_list.append(actual_fps)
#                     print("Actual FPS:", actual_fps)
#                     # Reset timer and counter
#                     start_time = time.time()
#                     frame_count = 0

#                 # Break gracefully - go to next frame
#                 if cv2.waitKey(10) & 0xFF == ord('t'):
#                     # Set flag variable to break out of all loops
#                     break

#                 # Break gracefully - kill all
#                 if cv2.waitKey(10) & 0xFF == ord('q'):
#                     # Set flag variable to break out of all loops
#                     break_all_loops = True
#                     break

#     cap.release()
#     cv2.destroyAllWindows()
#     cv2.waitKey(1)

In [28]:
import time
import cv2
import os
import numpy as np
from pathlib import Path

# Extract keypoints from every augmented video and save them frame by frame as
#   <AUGMENTED_KEYPOINTS_PATH>/<action>/<sequence>/<augmentation>/<frame_num>.npy
# <action> is the name of the video's parent folder; the file stem must have the
# form "<sequence>_<augmentation>" (e.g. "0_GaussianBlur").

# Define paths and parameters
AUGMENTED_VIDEOS_PATH = "augmentation"  # Directory where augmented videos are stored
AUGMENTED_KEYPOINTS_PATH = "augmentation_keypoints"  # Directory where augmented keypoints will be stored

# Throughput bookkeeping: every 100 processed frames the measured FPS is appended
# to `fps_list` and the counter/timer are reset.
frame_count = 0
fps_list = []
start_time = time.time()

# Flag variable to signal breaking out of all loops
break_all_loops = False

# Get all video files from the augmented videos directory.
# Sorted so the processing order does not depend on the file system.
video_files = sorted(Path(AUGMENTED_VIDEOS_PATH).rglob("*.avi"))

# Set mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

    # Loop through each video file
    for video_file in video_files:
        if break_all_loops:
            break
        print(f"Processing video: {video_file}")

        action = video_file.parent.stem  # Extract action from the parent directory name
        # Split "<sequence>_<augmentation>" at the first underscore. A file that does
        # not follow the naming scheme is skipped instead of aborting the whole run.
        sequence_number, separator, augmentation = video_file.stem.partition("_")
        if not separator:
            print(f"Error: Unexpected file name (expected '<sequence>_<augmentation>.avi'): {video_file}")
            continue

        # All frames of one video go into the same folder.
        keypoints_dir = os.path.join(AUGMENTED_KEYPOINTS_PATH, action, sequence_number, augmentation)

        cap = cv2.VideoCapture(str(video_file))
        try:
            if not cap.isOpened():
                print(f"Error: Could not open video {video_file}")
                continue

            # Loop through video length aka sequence length
            frame_num = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break  # end of video (or read error)

                # Make detections
                image, results = mediapipe_detection(frame, holistic)

                # Draw landmarks (the image is not displayed in this cell; the call is
                # kept so the per-frame work matches the original pipeline)
                draw_styled_landmarks(image, results)

                # Export keypoints; the folder is created on the first frame only in
                # effect, because exist_ok=True makes later calls no-ops.
                keypoints = extract_keypoints(results)
                os.makedirs(keypoints_dir, exist_ok=True)
                np.save(os.path.join(keypoints_dir, str(frame_num)), keypoints)

                frame_count += 1

                if frame_count % 100 == 0:
                    end_time = time.time()
                    elapsed = end_time - start_time
                    actual_fps = frame_count / elapsed
                    fps_list.append(actual_fps)
                    # print("Actual FPS:", actual_fps)
                    # Reset timer and counter
                    start_time = time.time()
                    frame_count = 0

                frame_num += 1

                # Poll the keyboard ONCE per frame. Calling waitKey twice would let the
                # first call consume the key event, so a 'q' press could be lost by the
                # 't' check (and each frame would wait twice).
                # NOTE(review): per the OpenCV docs waitKey only receives keys while a
                # HighGUI window is active; this cell opens none — confirm whether the
                # shortcuts are still needed.
                key = cv2.waitKey(10) & 0xFF

                # Break gracefully - skip the rest of this video
                if key == ord('t'):
                    break

                # Break gracefully - kill all
                if key == ord('q'):
                    break_all_loops = True
                    break
        finally:
            # Always free the capture, including on open failure or an exception.
            cap.release()

cv2.destroyAllWindows()
cv2.waitKey(1)

if break_all_loops:
    print("Processing stopped before all videos were processed.")
else:
    print("All videos have been processed.")


Processing video: augmentation\background\0_GaussianBlur.avi




Processing video: augmentation\background\0_HorizontalFlip.avi
Processing video: augmentation\background\0_RandomResize.avi
Processing video: augmentation\background\0_RandomRotate.avi
Actual FPS: 14.634389047375906
Processing video: augmentation\background\0_RandomTranslate.avi
Processing video: augmentation\background\10_GaussianBlur.avi
Processing video: augmentation\background\10_HorizontalFlip.avi
Actual FPS: 15.760732002503497
Processing video: augmentation\background\10_RandomResize.avi
Processing video: augmentation\background\10_RandomRotate.avi
Processing video: augmentation\background\10_RandomTranslate.avi
Actual FPS: 15.722191543138582
Processing video: augmentation\background\11_GaussianBlur.avi
Processing video: augmentation\background\11_HorizontalFlip.avi
Processing video: augmentation\background\11_RandomResize.avi
Processing video: augmentation\background\11_RandomRotate.avi
Actual FPS: 15.89592614364775
Processing video: augmentation\background\11_RandomTranslate.av

In [None]:
# fps_list

In [None]:
# fps_check = [5.445442143436572,
#  7.249745231581113,
#  6.290899129609047,
#  5.955219709672702,
#  6.8669062454946355,
#  8.658761533004986,
#  8.171947604739815]

In [None]:
# sum(fps_check) / len(fps_check)

In [None]:
# Safety-net cleanup: release the most recently opened capture and close any
# OpenCV windows, e.g. after a processing cell was interrupted.
try:
    cap.release()
except NameError:
    # `cap` only exists once a cell has opened a video; nothing to release then.
    pass
cv2.destroyAllWindows()