In [1]:
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import cv2

# Short aliases for the two MediaPipe sub-modules this notebook relies on.
mp_holistic = mp.solutions.holistic # Holistic solution module: provides the Holistic model class and the *_CONNECTIONS / FACEMESH_CONTOURS constants used below
mp_drawing = mp.solutions.drawing_utils # Drawing helpers: draw_landmarks() and DrawingSpec

def mediapipe_detection(image, model):
    """Run ``model`` on one BGR frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)`` (here a MediaPipe
            Holistic instance).

    Returns:
        ``(bgr_image, results)`` where ``bgr_image`` is the frame converted
        back to BGR and ``results`` is whatever ``model.process`` returned.
    """
    # The model is fed RGB, so swap the channel order first.
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The buffer is read-only while the model looks at it
    # (presumably so the model can use it without copying - TODO confirm).
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Back to BGR so the caller can keep using OpenCV drawing / saving.
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

def draw_landmarks(image, results):
    """Draw face, pose and both hand landmark groups on ``image``.

    Uses ``mp_drawing.draw_landmarks`` without explicit drawing specs.
    The image is modified in place; nothing is returned.

    Args:
        image: Image to draw on.
        results: Object with ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes.
    """
    # (attribute on `results`, connection-set attribute on `mp_holistic`), in drawing order
    overlay_plan = (
        ("face_landmarks", "FACEMESH_CONTOURS"),
        ("pose_landmarks", "POSE_CONNECTIONS"),
        ("left_hand_landmarks", "HAND_CONNECTIONS"),
        ("right_hand_landmarks", "HAND_CONNECTIONS"),
    )
    for landmarks_attr, connections_attr in overlay_plan:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            getattr(mp_holistic, connections_attr),
        )

def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks on ``image`` with per-group colours.

    The image is modified in place; nothing is returned.

    Args:
        image: Image to draw on.
        results: Object with ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes.
    """
    spec = mp_drawing.DrawingSpec

    # One row per landmark group:
    # (attribute on `results`, connection set, landmark-point style, connection-line style)
    styles = (
        ("face_landmarks", mp_holistic.FACEMESH_CONTOURS,
         spec(color=(80, 110, 10), thickness=1, circle_radius=1),
         # Channel values must stay within 0-255; this one used to be 256.
         spec(color=(80, 255, 121), thickness=1, circle_radius=1)),
        ("pose_landmarks", mp_holistic.POSE_CONNECTIONS,
         spec(color=(80, 22, 10), thickness=2, circle_radius=4),
         spec(color=(80, 44, 121), thickness=2, circle_radius=2)),
        ("left_hand_landmarks", mp_holistic.HAND_CONNECTIONS,
         spec(color=(121, 22, 76), thickness=2, circle_radius=4),
         spec(color=(121, 44, 250), thickness=2, circle_radius=2)),
        ("right_hand_landmarks", mp_holistic.HAND_CONNECTIONS,
         spec(color=(245, 117, 66), thickness=2, circle_radius=4),
         spec(color=(245, 66, 230), thickness=2, circle_radius=2)),
    )

    for landmarks_attr, connections, landmark_spec, connection_spec in styles:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmarks_attr),
            connections,
            landmark_spec,
            connection_spec,
        )
def extract_keypoints(results):
    """Flatten one detection result into a single 1-D keypoint vector.

    Layout of the returned array, in order:
      * pose:       (x, y, z, visibility) per landmark, zero-filled to 33*4 when missing
      * face:       (x, y, z) per landmark, zero-filled to 468*3 when missing
      * left hand:  (x, y, z) per landmark, zero-filled to 21*3 when missing
      * right hand: (x, y, z) per landmark, zero-filled to 21*3 when missing

    Args:
        results: Object with ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes;
            each is either falsy or exposes a ``landmark`` iterable.

    Returns:
        The four groups concatenated into one numpy array.
    """
    def _flat(landmark_list, fields, expected_count):
        # A missing group is represented by zeros of the expected length.
        if landmark_list:
            rows = [[getattr(point, name) for name in fields]
                    for point in landmark_list.landmark]
            return np.array(rows).flatten()
        return np.zeros(expected_count * len(fields))

    xyz = ("x", "y", "z")
    parts = [
        _flat(results.pose_landmarks, xyz + ("visibility",), 33),
        _flat(results.face_landmarks, xyz, 468),
        _flat(results.left_hand_landmarks, xyz, 21),
        _flat(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(parts)

## ---------------------------------- DATASET PATH ----------------------------------

In [14]:
DATA_PATH = os.path.join('ipcam_dataset')

# Ensure the dataset root folder is present and report which case applied.
if os.path.exists(DATA_PATH):
    print(f"{DATA_PATH} already existed!")
else:
    os.makedirs(DATA_PATH)
    print(f"Created directory: {DATA_PATH}")

ipcam_dataset already existed!


## ---------------------------------------- ACTION ----------------------------------------

In [15]:
actions = np.array(['aoa'])

# Every action label gets its own folder directly under DATA_PATH.
for action in actions:
    action_path = os.path.join(DATA_PATH, action)
    if os.path.exists(action_path):
        print(f"Action {action} already existed!   --->   {action_path}")
        continue
    os.makedirs(action_path)
    print(f"Created action:  {action_path}")

# Every recorded video (sequence) is 30 frames long
sequence_length = 30

# Sequence indices handled by this run: range(start_sequence, end_sequence), end exclusive
start_sequence = 0
end_sequence = 3

# Create <DATA_PATH>/<action>/<sequence>/landmarks for every sequence.
# makedirs builds the intermediate <sequence> folder as well, and exist_ok=True
# makes re-running the cell harmless. Unlike a blanket try/except, real
# failures (e.g. permission errors) still surface, and an already existing
# <sequence> folder no longer prevents its "landmarks" sub-folder from being
# created.
for action in actions:
    for sequence in range(start_sequence, end_sequence):
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence), "landmarks"), exist_ok=True)

Created action:  ipcam_dataset\aoa


## --------------------------- EACH VIDEO HAS 30 FRAMES -------------------------

In [30]:
import requests
from io import BytesIO


# Snapshot endpoint of the IP webcam; adjust to the address of your own device.
IP_WEBCAM_URL = "http://192.168.0.104:8080/shot.jpg"

def get_ip_webcam_frame(url=None, timeout=5.0):
    """Fetch one still image over HTTP and decode it into an image array.

    Args:
        url: Snapshot URL to request. Defaults to ``IP_WEBCAM_URL``.
        timeout: Seconds to wait for the camera before giving up. Without a
            timeout ``requests.get`` can block indefinitely when the device
            is unreachable.

    Returns:
        The decoded image as returned by ``cv2.imdecode`` with the
        "unchanged" flag (the same value as the literal ``-1``).

    Raises:
        requests.RequestException: On connection problems, timeouts or a
            non-success HTTP status.
        ValueError: If the response body cannot be decoded as an image.
    """
    response = requests.get(url or IP_WEBCAM_URL, timeout=timeout)
    response.raise_for_status()

    # View the payload as raw bytes without copying it.
    img_array = np.frombuffer(response.content, dtype=np.uint8)
    frame = cv2.imdecode(img_array, cv2.IMREAD_UNCHANGED)
    if frame is None:
        # imdecode signals failure by returning None; fail here with a clear
        # message instead of handing None to the caller.
        raise ValueError(f"Could not decode an image from {url or IP_WEBCAM_URL}")
    return frame

FRAME_SIZE = (640, 480)  # (width, height) the VideoWriter is opened with
VIDEO_FPS = 10.0         # frame rate stored in the .avi file


def _record_sequence(action, sequence):
    """Record one ``sequence_length``-frame clip for ``action`` from the IP webcam.

    The clip is written to
    ``<DATA_PATH>/<action>/<sequence>/<action>_sequence<sequence>.avi``.
    Status text is drawn only after a frame has been written, so it shows up
    in the preview window but not in the saved video.

    Returns:
        True when all frames were recorded, False when the user pressed 'q'.
    """
    video_name = f'{action}_sequence{sequence}.avi'
    video_path = os.path.join(DATA_PATH, action, str(sequence), video_name)
    out = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'XVID'), VIDEO_FPS, FRAME_SIZE)
    try:
        for frame_num in range(sequence_length):
            frame = get_ip_webcam_frame()
            if frame is None:
                raise RuntimeError("IP webcam returned a frame that could not be decoded")

            # The writer was opened for FRAME_SIZE; bring frames of any other
            # size to that geometry before writing them.
            if (frame.shape[1], frame.shape[0]) != FRAME_SIZE:
                frame = cv2.resize(frame, FRAME_SIZE)

            out.write(frame)

            # Preview overlay (drawn after the write, so not stored in the video).
            if frame_num == 0:
                cv2.putText(frame, 'STARTING COLLECTION', (120, 200),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
            cv2.putText(frame, f'Collecting frames for {action} Video Number {sequence}', (15, 12),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
            cv2.imshow('OpenCV Feed', frame)

            # Longer pause on the first frame so the subject can get ready.
            # The key is read from this same wait, so a 'q' pressed at any
            # point during the pause is honoured.
            key = cv2.waitKey(2000 if frame_num == 0 else 100)
            if key & 0xFF == ord('q'):
                return False
        return True
    finally:
        # Always finalise the file, even when a frame grab fails.
        out.release()


try:
    stop_requested = False
    for action in actions:
        for sequence in range(start_sequence, end_sequence):
            if not _record_sequence(action, sequence):
                stop_requested = True
                break
        if stop_requested:
            # 'q' ends the whole collection run, not just the current clip.
            break
finally:
    cv2.destroyAllWindows()

## -------------- EXTRACT FRAMES AND LANDMARKS FROM ALL VIDEOS --------------

In [31]:
def _extract_sequence(video_path, landmarks_path, frames_path, holistic):
    """Process up to ``sequence_length`` frames of one recorded video.

    For every frame read, the keypoint vector is saved as
    ``<landmarks_path>/<n>.npy`` and the frame with landmarks drawn on it as
    ``<frames_path>/<n>.jpg``.

    Returns:
        The number of frames that were processed and saved.
    """
    cap = cv2.VideoCapture(video_path)
    frame_num = 0
    try:
        while cap.isOpened() and frame_num < sequence_length:
            ret, frame = cap.read()
            if not ret:
                break

            # Detect, draw the overlay and flatten the landmarks
            image, results = mediapipe_detection(frame, holistic)
            draw_styled_landmarks(image, results)
            keypoints = extract_keypoints(results)

            np.save(os.path.join(landmarks_path, f'{frame_num}.npy'), keypoints)
            cv2.imwrite(os.path.join(frames_path, f'{frame_num}.jpg'), image)

            frame_num += 1
    finally:
        # Release the capture even when detection or saving raises.
        cap.release()
    return frame_num


# Initialize holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

    # Iterate over actions
    for action in actions:
        action_path = os.path.join(DATA_PATH, action)
        for sequence in range(start_sequence, end_sequence):
            sequence_path = os.path.join(action_path, str(sequence))
            video_name = f'{action}_sequence{sequence}.avi'
            video_path = os.path.join(sequence_path, video_name)

            # Check if video file exists
            if not os.path.exists(video_path):
                print(f"Video file not found: {video_path}")
                continue

            # Output folders for this sequence (no-op when they already exist)
            landmarks_path = os.path.join(sequence_path, 'landmarks')
            frames_path = os.path.join(sequence_path, 'frames')
            os.makedirs(landmarks_path, exist_ok=True)
            os.makedirs(frames_path, exist_ok=True)

            saved = _extract_sequence(video_path, landmarks_path, frames_path, holistic)

            # An unreadable or truncated video would otherwise leave an
            # incomplete sequence behind without any hint.
            if saved < sequence_length:
                print(f"Warning: only {saved}/{sequence_length} frames extracted from {video_path}")

print("Processing completed.")

Processing completed.


## -------------- Check any saved .npy file for debugging, for example: 7.npy --------------

In [None]:
import numpy as np
import cv2
import mediapipe as mp

# ------------------------------------

# Which saved keypoint file to inspect: the sequence folder name and the frame file inside its "landmarks" folder.
video_number = '1'
npy_file = '7.npy'

# ------------------------------------

# Full path of the keypoint file to visualise:
# <DATA_PATH>/<first action>/<video_number>/landmarks/<npy_file>
npy_file_path = f'{DATA_PATH}/{actions[0]}/{video_number}/landmarks/{npy_file}'

# MediaPipe module aliases (the same two modules bound at the top of the notebook)
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

def draw_landmarks_on_black_image(keypoints, image_size=(640, 480)):
    """Render a flat keypoint vector as coloured dots on a black image.

    Args:
        keypoints: 1-D numpy array laid out as 33*4 pose values
            (x, y, z, visibility), then 468*3 face values, then 21*3 left-hand
            and 21*3 right-hand values (x, y, z) - the layout written by
            ``extract_keypoints``.
        image_size: ``(width, height)`` of the image to create. The x / y
            values are multiplied by width / height to get pixel positions.

    Returns:
        A ``height x width x 3`` uint8 image with the landmarks drawn on it.
    """
    width, height = image_size
    black_image = np.zeros((height, width, 3), dtype=np.uint8)

    # Slice boundaries of the four landmark groups inside the flat vector
    pose_end = 33 * 4
    face_end = pose_end + 468 * 3
    lh_end = face_end + 21 * 3

    pose_landmarks = keypoints[:pose_end].reshape(33, 4)
    face_landmarks = keypoints[pose_end:face_end].reshape(468, 3)
    lh_landmarks = keypoints[face_end:lh_end].reshape(21, 3)
    rh_landmarks = keypoints[lh_end:].reshape(21, 3)

    # Pose: only landmarks with visibility above 0.5 are drawn
    for x, y, _z, visibility in pose_landmarks:
        if visibility > 0.5:
            cv2.circle(black_image, (int(x * width), int(y * height)), 5, (255, 0, 0), -1)

    # Face and hands: (landmarks, dot radius, colour)
    for landmarks, radius, color in (
        (face_landmarks, 1, (0, 255, 0)),
        (lh_landmarks, 5, (0, 0, 255)),
        (rh_landmarks, 5, (255, 255, 0)),
    ):
        # An all-zero group is the placeholder for "not detected"; drawing it
        # would only put misleading dots at pixel (0, 0).
        if not landmarks.any():
            continue
        for x, y, _z in landmarks:
            cv2.circle(black_image, (int(x * width), int(y * height)), radius, color, -1)

    return black_image

window_title = 'Landmarks on Black Image'

# Read the stored keypoint vector back from disk and render it on a blank canvas
keypoints = np.load(npy_file_path)
black_image_with_landmarks = draw_landmarks_on_black_image(keypoints)

# Show the rendering, wait for a key press, then close the window
cv2.imshow(window_title, black_image_with_landmarks)
cv2.waitKey(0)
cv2.destroyAllWindows()