In [37]:
!pip install ultralytics



In [38]:
import cv2
import os
import pandas as pd
import re
import numpy as np
from ultralytics import YOLO
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
import joblib  # For saving and loading the model
import warnings

In [39]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [40]:
warnings.filterwarnings("ignore")
plt.rcParams['font.sans-serif'] = ['SimHei']  # Set Chinese font

# Define parameters
dataset_dir = r'iss'  # Dataset directory, please modify according to the actual path
output_image_dir = 'output_images'  # Folder to save frame images
output_csv_path = 'output_keypoints_data.csv'  # CSV file to save joint data
os.makedirs(output_image_dir, exist_ok=True)


In [41]:
# Define names for the 17 keypoints
KEYPOINT_NAMES = [
    "Nose", "Right Eye", "Left Eye", "Right Ear", "Left Ear",
    "Right Shoulder", "Left Shoulder", "Right Elbow", "Left Elbow",
    "Right Wrist", "Left Wrist", "Right Hip", "Left Hip",
    "Right Knee", "Left Knee", "Right Ankle", "Left Ankle"
]

# define key points to keep
keypoints_to_keep = [
    'Right Shoulder', 'Left Shoulder', 'Right Elbow', 'Left Elbow',
    'Right Wrist', 'Left Wrist', 'Right Hip', 'Left Hip',
    'Right Knee', 'Left Knee', 'Right Ankle', 'Left Ankle'
]


In [42]:
# construct the column names to be processed
keypoint_columns = []
for kp in keypoints_to_keep:
    keypoint_columns.append(f'{kp}_x')
    keypoint_columns.append(f'{kp}_y')

In [43]:
# Define a function to calculate the new origin and coordinate normalization
def calculate_new_origin(keypoints_data):
    right_shoulder = keypoints_data[5][:2]
    left_shoulder = keypoints_data[6][:2]
    right_hip = keypoints_data[11][:2]
    left_hip = keypoints_data[12][:2]

    valid_points = [p for p in [right_shoulder, left_shoulder, right_hip, left_hip] if p[0] != 0 and p[1] != 0]
    if len(valid_points) == 0:
        raise ValueError("Missing keypoints for right shoulder, left shoulder, right hip, and left hip, cannot calculate new origin")

    x0 = sum([p[0] for p in valid_points]) / len(valid_points)
    y0 = sum([p[1] for p in valid_points]) / len(valid_points)
    return x0, y0

def get_min_max_of_new_coords(keypoints_data, x0, y0):#计算在新的坐标系下，所有有效关键点的 x 和 y 坐标的最小值和最大值。这是为了在新的坐标系中对关键点进行归一化。
    x_new_values = []
    y_new_values = []

    for keypoint in keypoints_data:
        x, y, conf = keypoint
        if conf > 0:  # Only compute valid keypoints
            x_new = x - x0
            y_new = -(y - y0)  # Y-axis grows from bottom to top
            x_new_values.append(x_new)
            y_new_values.append(y_new)

    x_min_new, x_max_new = min(x_new_values), max(x_new_values)
    y_min_new, y_max_new = min(y_new_values), max(y_new_values)

    return x_min_new, x_max_new, y_min_new, y_max_new

def parse_keypoints_with_custom_origin(results):
    parsed_keypoints_list = []
    for i in range(len(results)):
        keypoints_data = results[i].keypoints.data.cpu().numpy()[0]  # Retrieve keypoints (17, 3)

        # Calculate the new origin coordinates
        try:
            x0, y0 = calculate_new_origin(keypoints_data)
        except ValueError as e:
            print(f"Object {i} could not calculate a new origin: {e}")
            continue  # Skip this object

        # Calculate the min and max values of x_new and y_new in the new coordinate system
        x_min_new, x_max_new, y_min_new, y_max_new = get_min_max_of_new_coords(keypoints_data, x0, y0)

        object_keypoints = {"object_id": i}
        keypoint_dict = {}

        # Calculate each keypoint's coordinates relative to the new origin and normalize
        for j, keypoint in enumerate(keypoints_data):
            x, y, conf = keypoint
            if x == 0 and y == 0:
                keypoint_dict[KEYPOINT_NAMES[j]] = None
            else:
                # Coordinates in the new coordinate system, Y-axis grows from bottom to top
                x_new = x - x0
                y_new = -(y - y0)

                # Normalize the new coordinates
                if x_max_new != x_min_new:
                    x_normalized = (x_new - x_min_new) / (x_max_new - x_min_new)
                else:
                    x_normalized = 0  # Avoid division by zero if max equals min

                if y_max_new != y_min_new:
                    y_normalized = (y_new - y_min_new) / (y_max_new - y_min_new)
                else:
                    y_normalized = 0  # Avoid division by zero if max equals min

                keypoint_dict[KEYPOINT_NAMES[j]] = {
                    "name": KEYPOINT_NAMES[j],
                    "x": x_normalized,
                    "y": y_normalized,
                    "confidence": conf
                }

        object_keypoints["keypoints"] = keypoint_dict
        parsed_keypoints_list.append(object_keypoints)

    return parsed_keypoints_list


In [44]:
# Initialize a list to store data
all_data = []
all_data_new_video = []
# Load the YOLOv8 pose model
model = YOLO('yolov8n-pose.pt')


In [45]:
standard_dataset_dir = '/content/drive/MyDrive/iss/standard_video'
standard_output_image_dir = '/content/drive/MyDrive/iss/standard_output_images/'
# Traverse the entire dataset_fitness directory to find all video files
for root, dirs, files in os.walk(standard_dataset_dir):
    for file in files:
        if file.endswith('.mp4') or file.endswith('.MOV'):  # Process video files
            input_video_path = os.path.join(root, file)
            # Output the currently processing video file
            print(f"Processing video: {input_video_path}")
            # Extract action name and standard type
            video_info = input_video_path.split(os.sep)[-2:]  # Extract the last two parts as the action name and standard type
            action_name = re.sub(r'^\d+\s*', '', video_info[0])  # Remove leading numbers and spaces
            standard_type = video_info[1].split('.')[0]

            # Open the video file
            cap = cv2.VideoCapture(input_video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = frame_count / fps

            # Take 5 frames every 0.5 seconds
            frames_per_half_second = int(fps / 2)
            max_duration = 58.0  # Only process the first 60 seconds of the video
            max_frame_to_process = int(fps * max_duration)

            frame_indices = []
            for i in range(0, max_frame_to_process, frames_per_half_second):
                # Select 3 frames from the 5-frame interval
                for j in [0, 2, 4]:  # Select the 1st, 3rd, and 5th frames from the 5 frames
                    frame_index = i + j
                    if frame_index < max_frame_to_process:
                        frame_indices.append(frame_index)

            frame_indices = sorted(set(frame_indices))  # Ensure no duplicate frames, sort in order

            # Process the video frame by frame
            frame_number = 0
            sequence_number = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_number in frame_indices:
                    # Use the model for detection
                    results = model(frame)
                    parsed_keypoints = parse_keypoints_with_custom_origin(results)

                    # Assign the same sequence number: frames within the same 0.5-second interval share the same sequence number
                    sequence_number = frame_number // frames_per_half_second

                    for obj in parsed_keypoints:
                        keypoints = obj['keypoints']
                        row_data = {
                            'action_name': action_name,
                            'standard_type': standard_type,
                            'frame_index': frame_number,
                            'sequence': sequence_number,
                        }
                        # Store the coordinates of each keypoint into row_data
                        for kp_name in KEYPOINT_NAMES:
                            kp_info = keypoints.get(kp_name, None)
                            if kp_info:
                                row_data[f'{kp_name}_x'] = kp_info['x']
                                row_data[f'{kp_name}_y'] = kp_info['y']
                            else:
                                row_data[f'{kp_name}_x'] = None
                                row_data[f'{kp_name}_y'] = None

                        all_data.append(row_data)

                    # Save the processed frame image
                    processed_frame = results[0].plot()

                    image_output_path = os.path.join(standard_output_image_dir,
                                                     f"{action_name}_{standard_type}_{frame_number}_{sequence_number}.jpg")
                    cv2.imwrite(image_output_path, processed_frame)  # Save the detection result image

                frame_number += 1

            # Release video resources
            cap.release()


Processing video: /content/drive/MyDrive/iss/standard_video/sample.mp4

0: 384x640 1 person, 102.7ms
Speed: 4.3ms preprocess, 102.7ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 101.2ms
Speed: 3.4ms preprocess, 101.2ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 96.3ms
Speed: 2.3ms preprocess, 96.3ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 97.5ms
Speed: 2.0ms preprocess, 97.5ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Object 0 could not calculate a new origin: Missing keypoints for right shoulder, left shoulder, right hip, and left hip, cannot calculate new origin

0: 384x640 1 person, 98.7ms
Speed: 2.9ms preprocess, 98.7ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)
Object 0 could not calculate a new origin: Missing keypoints for right shoulder, left shoulder, right hip, and left hip, cannot calculate new orig

In [46]:
# Convert all data into a DataFrame
columns = ['action_name', 'standard_type', 'frame_index', 'sequence'] + [f'{k}_x' for k in KEYPOINT_NAMES] + [f'{k}_y' for k in KEYPOINT_NAMES]
data = pd.DataFrame(all_data, columns=columns)

# Save data to a CSV file
output_csv_path = '/content/drive/MyDrive/iss/standard_output_keypoints_data.csv'  # Path to save in Google Drive
data.to_csv(output_csv_path, index=False)
print("Data extraction completed and saved to CSV file.")


Data extraction completed and saved to CSV file.


In [47]:
# Input and output directory settings (second video)
new_dataset_dir = '/content/drive/MyDrive/iss/new_video'  # Standard video folder
new_output_image_dir = '/content/drive/MyDrive/iss/new_output_images/'  # Output image directory
output_csv_path = '/content/drive/MyDrive/iss/output_keypoints_data_second_video.csv'  # Output CSV file path

# Traverse the entire dataset_fitness directory to find all video files
for root, dirs, files in os.walk(new_dataset_dir):
    for file in files:
        if file.endswith('.mov') or file.endswith('.MOV'):  # Process video files
            input_video_path = os.path.join(root, file)
            # Output the currently processing video file
            print(f"Processing second video: {input_video_path}")
            # Extract action name and standard type
            video_info = input_video_path.split(os.sep)[-2:]  # Extract the last two parts as the action name and standard type
            action_name = re.sub(r'^\d+\s*', '', video_info[0])  # Remove leading numbers and spaces
            standard_type = video_info[1].split('.')[0]

            # Open the video file
            cap = cv2.VideoCapture(input_video_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = frame_count / fps

            # Take 5 frames every 0.5 seconds
            frames_per_half_second = int(fps / 2)
            max_duration = 58.0  # Only process the first 60 seconds of the video
            max_frame_to_process = int(fps * max_duration)

            frame_indices = []
            for i in range(0, max_frame_to_process, frames_per_half_second):
                # Select 3 frames from the 5-frame interval
                for j in [0, 2, 4]:  # Select the 1st, 3rd, and 5th frames from the 5 frames
                    frame_index = i + j
                    if frame_index < max_frame_to_process:
                        frame_indices.append(frame_index)

            frame_indices = sorted(set(frame_indices))  # Ensure no duplicate frames, sort in order

            # Process the video frame by frame
            frame_number = 0
            sequence_number = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_number in frame_indices:
                    # Use the model for detection
                    results = model(frame)
                    parsed_keypoints = parse_keypoints_with_custom_origin(results)

                    # Assign the same sequence number: frames within the same 0.5-second interval share the same sequence number
                    sequence_number = frame_number // frames_per_half_second

                    for obj in parsed_keypoints:
                        keypoints = obj['keypoints']
                        row_data = {
                            'action_name': action_name,
                            'standard_type': standard_type,
                            'frame_index': frame_number,
                            'sequence': sequence_number,
                        }
                        # Store the coordinates of each keypoint into row_data
                        for kp_name in KEYPOINT_NAMES:
                            kp_info = keypoints.get(kp_name, None)
                            if kp_info:
                                row_data[f'{kp_name}_x'] = kp_info['x']
                                row_data[f'{kp_name}_y'] = kp_info['y']
                            else:
                                row_data[f'{kp_name}_x'] = None
                                row_data[f'{kp_name}_y'] = None

                        all_data_new_video.append(row_data)

                    # Save the processed frame image
                    processed_frame = results[0].plot()

                    image_output_path = os.path.join(new_output_image_dir,
                                                     f"{action_name}_{standard_type}_{frame_number}_{sequence_number}.jpg")
                    cv2.imwrite(image_output_path, processed_frame)  # Save the detection result image

                frame_number += 1

            # Release video resources
            cap.release()


Processing second video: /content/drive/MyDrive/iss/new_video/IMG_3945 2.mov

0: 640x416 1 person, 123.7ms
Speed: 4.0ms preprocess, 123.7ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 121.0ms
Speed: 3.0ms preprocess, 121.0ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 107.8ms
Speed: 2.6ms preprocess, 107.8ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 103.3ms
Speed: 1.7ms preprocess, 103.3ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 101.5ms
Speed: 2.5ms preprocess, 101.5ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 102.5ms
Speed: 2.1ms preprocess, 102.5ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 101.9ms
Speed: 2.9ms preprocess, 101.9ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 416)

0: 640x416 1 person, 102.8

In [48]:
# Convert all data into a DataFrame
columns = ['action_name', 'standard_type', 'frame_index', 'sequence'] + [f'{k}_x' for k in KEYPOINT_NAMES] + [f'{k}_y' for k in KEYPOINT_NAMES]
data = pd.DataFrame(all_data_new_video, columns=columns)

# Save data to a CSV file
new_output_csv_path = '/content/drive/MyDrive/iss/new_output_keypoints_data.csv'  # Path to save in Google Drive
data.to_csv(output_csv_path, index=False)
print("Data extraction completed and saved to CSV file.")

Data extraction completed and saved to CSV file.


In [52]:
import pandas as pd
import numpy as np

# Read the CSV files of the standard video and the new video
standard_data = pd.read_csv('/content/drive/MyDrive/iss/standard_output_keypoints_data.csv')  # Path to the CSV of the standard video
new_data = pd.read_csv('/content/drive/MyDrive/iss/output_keypoints_data_second_video.csv')  # Path to the CSV of the new video

# Select the keypoints to compare
important_keypoints = [
    "Right Shoulder", "Left Shoulder", "Right Elbow", "Left Elbow",
    "Right Knee", "Left Knee", "Right Ankle", "Left Ankle"
]

# Set threshold T, assuming the threshold is 0.1, meaning the maximum allowed deviation is 0.1
T = 0.1  # You can adjust this value based on actual requirements

# Calculate total number of frames
total_frames = len(standard_data)

# Deduction standard when each keypoint exceeds the threshold
penalty_per_keypoint = 100 / total_frames

# Initialize the total score
total_score = 100  # Initial score is 100

# Get the minimum number of frames from both datasets
total_frames = min(len(standard_data), len(new_data))

# Iterate through each frame to calculate the score
for i in range(total_frames):
    standard_row = standard_data.iloc[i]
    new_row = new_data.iloc[i]

    # Initial score for each frame (100 points)
    frame_score = 100

    for kp_name in important_keypoints:
        standard_x = standard_row[f'{kp_name}_x']
        standard_y = standard_row[f'{kp_name}_y']
        new_x = new_row[f'{kp_name}_x']
        new_y = new_row[f'{kp_name}_y']

        # Check if the keypoint is valid
        if standard_x is not None and new_x is not None and standard_y is not None and new_y is not None:
            # Calculate the Euclidean distance
            diff = np.sqrt((standard_x - new_x)**2 + (standard_y - new_y)**2)

            # If the deviation exceeds the threshold, deduct points proportionally, the greater the deviation, the more points are deducted
            if diff > T:
                # Increase the penalty based on the deviation and threshold, increasing the deduction
                penalty = (diff - T) * 100 / (1 - T)  # Calculate the deduction ratio based on deviation, increasing the penalty intensity
                frame_score -= penalty

    # Ensure that the score for each frame is not lower than 0
    frame_score = max(frame_score, 0)

    # Update the total score
    total_score = (total_score + frame_score) / 2  # Add each frame's score to the total score (average)

# Output the final total score
print(f"Total Score: {total_score}")


Total Score: 41.91355701196365
