# Draw Skeletons from Videos and Check the Results

In [1]:
# Extract the zip file
import zipfile
import os

def extract_zip(zip_path, extract_to):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
    print(f"Extracted {zip_path} to {extract_to}")

# Paths
zip_path = '/home/samer/Desktop/Projects/mmaction2/data/Data 7_3.zip'  # Update with the actual path to your ZIP file
extract_to = '/home/samer/Desktop/Projects/mmaction2/data'  # Update with the desired extraction path

# Extract the ZIP file
extract_zip(zip_path, extract_to)

Extracted /home/samer/Desktop/Projects/mmaction2/data/Data 7_3.zip to /home/samer/Desktop/Projects/mmaction2/data


## Yolo Pose

In [2]:
import cv2
import numpy as np
from ultralytics import YOLO
import os

# Initialize YOLOv8 Pose model
model = YOLO('yolov8m-pose.pt')

def draw_skeletons_yolo(frame, keypoints):
    for point in keypoints:
        x, y, conf = point
        if conf > 0.5:  # Confidence threshold
            cv2.circle(frame, (int(x), int(y)), 3, (0, 255, 0), -1)
    return frame

def visualize_raw_sample_yolo(action, sequence, input_dir, output_dir):
    sequence_dir = os.path.join(input_dir, action, sequence)
    video_path = os.path.join(sequence_dir, "sequence_video.avi")

    print(f"Checking for video at: {video_path}")  # Debugging print statement

    if not os.path.exists(video_path):
        print(f"Missing video for {sequence} in {action}")
        return

    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{action}_{sequence}_data73_annotated.avi")
    video_writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'XVID'), fps, (frame_width, frame_height))

    frame_num = 0
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        # Perform pose detection
        results = model(frame)
        for result in results:
            if result.keypoints is not None:
                keypoints = result.keypoints.data[0].cpu().numpy()  # Move keypoints to CPU and convert to numpy
                frame = draw_skeletons_yolo(frame, keypoints)

        video_writer.write(frame)
        frame_num += 1

    cap.release()
    video_writer.release()
    print(f"Raw annotated video saved to {output_path}")

In [None]:
# Example usage
action = 'Browsing'
sequence = 'sequence_1'
input_dir = '/home/samer/Desktop/Projects/mmaction2/data/Data 7_3'
output_dir = '/home/samer/Desktop/Projects/mmaction2/Visualizations/RawVid_KP_Visualization_Data_7_3'
visualize_raw_sample_yolo(action, sequence, input_dir, output_dir)

# Extract skeletons from videos and save the as npy files

In [4]:
import cv2
import numpy as np
from ultralytics import YOLO
import os

def extract_skeletons_from_videos_yolo(actions, input_dir, output_dir, use_normalized=True):
    os.makedirs(output_dir, exist_ok=True)
    
    model = YOLO('yolov8m-pose.pt')
    
    for action in actions:
        action_dir = os.path.join(input_dir, action)
        sequences = sorted(os.listdir(action_dir), key=lambda x: int(x.split('_')[1]))
        
        for sequence in sequences:
            sequence_dir = os.path.join(action_dir, sequence)
            video_path = os.path.join(sequence_dir, "sequence_video.avi")
            
            if not os.path.exists(video_path):
                print(f"Missing video for {sequence} in {action}")
                continue
            
            skeleton_dir = os.path.join(output_dir, action, sequence)
            os.makedirs(skeleton_dir, exist_ok=True)
            
            cap = cv2.VideoCapture(video_path)
            frame_num = 0
            
            while cap.isOpened():
                success, frame = cap.read()
                if not success:
                    break
                
                results = model(frame, stream=True)
                for result in results:
                    if result.keypoints is not None:
                        keypoints = result.keypoints.xyn[0].cpu().numpy() if use_normalized else result.keypoints.xy[0].cpu().numpy()
                        scores = result.keypoints.conf[0].cpu().numpy()  # Get the keypoint scores

                        keypoint_with_scores = np.hstack((keypoints, scores[:, np.newaxis]))

                        # Save keypoints and scores as npy file
                        npy_path = os.path.join(skeleton_dir, f"{frame_num}.npy")
                        np.save(npy_path, keypoint_with_scores)
                
                frame_num += 1
            
            cap.release()
            print(f"Processed {sequence} in {action}")

    print("Extraction completed for all actions and sequences.")


In [None]:
actions = ['Browsing', 'Pick_up', 'Place_in_Basket', 'Compare', 'Read_Labels',
           'Talk_on_Phone', 'Check_List', 'Return_Items', 'Look_Around', 'Conceal_Item',
           'Leaving_Quickly', 'Distracting_Behavior']

input_dir = '/home/samer/Desktop/Projects/mmaction2/data/Data_7_3'
output_dir = '/home/samer/Desktop/Projects/mmaction2/data/Skeletons_YOLO_Data 7_3'

extract_skeletons_from_videos_yolo(actions, input_dir, output_dir, use_normalized=True)


# Check the extracted numpy skeletons

* This function checks if the collected npy sequences correctly represent the extracted keypoints from YOLO.
* Visualize the keypoints of a random video in the dataset to ensure that everything is okay.

In [None]:
import cv2
import numpy as np
import os

def visualize_extracted_skeletons_yolo(action, sequence, input_video_dir, skeletons_dir, output_dir):
    sequence_video_path = os.path.join(input_video_dir, action, sequence, "sequence_video.avi")
    skeleton_sequence_dir = os.path.join(skeletons_dir, action, sequence)

    if not os.path.exists(sequence_video_path):
        print(f"Missing video for {sequence} in {action}")
        return

    if not os.path.exists(skeleton_sequence_dir):
        print(f"Missing skeleton data for {sequence} in {action}")
        return

    cap = cv2.VideoCapture(sequence_video_path)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"{action}_{sequence}_skeleton_annotated.mp4")
    video_writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

    frame_num = 0
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        npy_path = os.path.join(skeleton_sequence_dir, f"{frame_num}.npy")
        if os.path.exists(npy_path):
            keypoints = np.load(npy_path)
            coords = keypoints[:, :2]
            scores = keypoints[:, 2]
            print(f"Frame {frame_num} keypoints: {coords}, scores: {scores}")  # Debug print

            for (x, y), score in zip(coords, scores):
                # Scale the coordinates to frame dimensions
                x = int(x * frame_width)
                y = int(y * frame_height)
                if score > 0.5:  # Only draw keypoints with high confidence
                    cv2.circle(frame, (x, y), 3, (219, 55, 156), -1)
                    print(f"Drawing keypoint at: {(x, y)}, score: {score}")  # Debug print

        video_writer.write(frame)
        frame_num += 1

    cap.release()
    video_writer.release()
    print(f"Annotated video with skeletons saved to {output_path}")


In [None]:
action = 'Browsing'
sequence = 'sequence_1'
input_video_dir = '/home/samer/Desktop/Projects/mmaction2/data/Data_7_3'
skeletons_dir = '/home/samer/Desktop/Projects/mmaction2/scripts/Skeletons_Visualization_Data_7_3'
output_dir = '/home/samer/Desktop/Projects/mmaction2/scripts/Annotated_Videos'

visualize_extracted_skeletons_yolo(action, sequence, input_video_dir, skeletons_dir, output_dir)
