In [18]:
import os
import cv2
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
import mediapipe as mp

# Define helper functions for feature extraction
def calculate_angle(a, b, c):
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba, bc = a - b, c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    return np.degrees(np.arccos(np.clip(cosine_angle, -1.0, 1.0)))

def calculate_distance(a, b):
    a, b = np.array(a), np.array(b)
    return np.linalg.norm(a - b)

def calculate_velocity(coord1, coord2, fps=30):
    dx, dy = coord2[0] - coord1[0], coord2[1] - coord1[1]
    return dx * fps, dy * fps

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Define stage-specific feature extraction logic
def preprocess_features_stage1(keypoints):
    return np.array([
        calculate_angle(
            (frame[mp_pose.PoseLandmark.LEFT_HIP.value]["x"], frame[mp_pose.PoseLandmark.LEFT_HIP.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"]),
        )
        for frame in keypoints
    ]).reshape(-1, 1)

def preprocess_features_stage2(keypoints):
    angles = [
        calculate_angle(
            (frame[mp_pose.PoseLandmark.LEFT_HIP.value]["x"], frame[mp_pose.PoseLandmark.LEFT_HIP.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"]),
        )
        for frame in keypoints
    ]
    distances = [
        calculate_distance(
            (frame[mp_pose.PoseLandmark.LEFT_HIP.value]["x"], frame[mp_pose.PoseLandmark.LEFT_HIP.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["y"]),
        )
        for frame in keypoints
    ]
    return np.stack([angles, distances], axis=1)

def preprocess_features_stage3(keypoints):
    features = []
    for i in range(1, len(keypoints)):
        features.append([
            calculate_velocity(
                (
                    keypoints[i - 1][mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"],
                    keypoints[i - 1][mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"],
                ),
                (
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"],
                ),
            )[0],
            calculate_angle(
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["y"]),
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["y"]),
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["y"]),
            ),
            calculate_distance(
                (
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"]),
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["y"]),
            )
        ])
    return np.array(features)

def preprocess_features_stage4(keypoints):
    features = []
    for i in range(1, len(keypoints)):
        push_leg_velocity_x, push_leg_velocity_y = calculate_velocity(
            (keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_HIP.value]["x"],
             keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_HIP.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["y"]),
        )
        knee_ankle_distance = calculate_distance(
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["y"]),
        )
        shoulder_angle = calculate_angle(
            (keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["y"]),
            [keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["x"] + 1,
             keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["y"]],
        )
        right_arm_angle = calculate_angle(
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_ELBOW.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_ELBOW.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
        )
        features.append([push_leg_velocity_x, push_leg_velocity_y, knee_ankle_distance, shoulder_angle, right_arm_angle])
    return np.array(features)

def preprocess_features_stage5(keypoints):
    features = []
    for i in range(len(keypoints)):
        shot_neck_distance = calculate_distance(
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["y"]),
        )
        release_angle = (
            calculate_angle(
                (keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
                 keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
                (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
                 keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
                (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"] + 1,
                 keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
            )
            if i > 0
            else 0
        )
        features.append([shot_neck_distance, release_angle])
    return np.array(features)

# Stage configuration
stages = {
    "stage1": preprocess_features_stage1,
    "stage2": preprocess_features_stage2,
    "stage3": preprocess_features_stage3,
    "stage4": preprocess_features_stage4,
    "stage5": preprocess_features_stage5,
}

# Unified evaluation workflow
def evaluate_shotput_stages(video_dir, model_dir):
    results = []
    for stage, preprocess_fn in stages.items():
        video_path = os.path.join(video_dir, f"{stage}.mp4")
        model_path = os.path.join(model_dir, f"shotput_{stage}.keras")

        try:
            # Load the model
            model = load_model(model_path)

            # Extract keypoints
            keypoints = extract_keypoints(video_path)

            # Preprocess features
            features = preprocess_fn(keypoints)

            # Pad features to match input shape
            features = pad_sequences([features], maxlen=model.input_shape[1], padding="post", dtype="float32")

            # Predict
            predictions = model.predict(features)
            avg_prediction = np.mean(predictions)
            results.append((stage, avg_prediction))
        except Exception as e:
            print(f"Error processing {stage}: {e}")
            results.append((stage, "Error"))

    return results

# Run evaluation
def main():
    video_dir = "/Users/cezar/Desktop/Team Project/AI/shotput/test_videos_1"
    model_dir = "/Users/cezar/Desktop/Team Project/AI/shotput/shotput_models"

    results = evaluate_shotput_stages(video_dir, model_dir)
    overall_score = sum(score for _, score in results if score != "Error")
    results_table = pd.DataFrame(results, columns=["Stage", "Score"])
    results_table.loc[len(results_table)] = ["Overall", f"{overall_score}/5"]
    print(results_table)

if __name__ == "__main__":
    main()


I0000 00:00:1736714673.413382 9328607 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M2 Pro
  saveable.load_own_variables(weights_store.get(inner_path))
W0000 00:00:1736714673.505441 9384850 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1736714673.520041 9384850 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 66ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 112ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 108ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 106ms/step
     Stage                 Score
0   stage1              0.989415
1   stage2              1.008653
2   stage3              0.333333
3   stage4              0.333333
4   stage5              0.333333
5  Overall  2.9980683624744415/5


In [3]:
import os
import cv2
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
import mediapipe as mp

# Define helper functions for feature extraction
def calculate_angle(a, b, c):
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba, bc = a - b, c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    return np.degrees(np.arccos(np.clip(cosine_angle, -1.0, 1.0)))

def calculate_distance(a, b):
    a, b = np.array(a), np.array(b)
    return np.linalg.norm(a - b)

def calculate_velocity(coord1, coord2, fps=30):
    dx, dy = coord2[0] - coord1[0], coord2[1] - coord1[1]
    return dx * fps, dy * fps

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Define stage-specific feature extraction logic
def preprocess_features_stage1(keypoints):
    return np.array([
        calculate_angle(
            (frame[mp_pose.PoseLandmark.LEFT_HIP.value]["x"], frame[mp_pose.PoseLandmark.LEFT_HIP.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"]),
        )
        for frame in keypoints
    ]).reshape(-1, 1)

def preprocess_features_stage2(keypoints):
    angles = [
        calculate_angle(
            (frame[mp_pose.PoseLandmark.LEFT_HIP.value]["x"], frame[mp_pose.PoseLandmark.LEFT_HIP.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"]),
        )
        for frame in keypoints
    ]
    distances = [
        calculate_distance(
            (frame[mp_pose.PoseLandmark.LEFT_HIP.value]["x"], frame[mp_pose.PoseLandmark.LEFT_HIP.value]["y"]),
            (frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["x"], frame[mp_pose.PoseLandmark.LEFT_KNEE.value]["y"]),
        )
        for frame in keypoints
    ]
    return np.stack([angles, distances], axis=1)

def preprocess_features_stage3(keypoints):
    features = []
    for i in range(1, len(keypoints)):
        features.append([
            calculate_velocity(
                (
                    keypoints[i - 1][mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"],
                    keypoints[i - 1][mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"],
                ),
                (
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"],
                ),
            )[0],
            calculate_angle(
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["y"]),
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["y"]),
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["y"]),
            ),
            calculate_distance(
                (
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.LEFT_ANKLE.value]["y"]),
                (
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["x"],
                    keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["y"]),
            )
        ])
    return np.array(features)

def preprocess_features_stage4(keypoints):
    features = []
    for i in range(1, len(keypoints)):
        push_leg_velocity_x, push_leg_velocity_y = calculate_velocity(
            (keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_HIP.value]["x"],
             keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_HIP.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_HIP.value]["y"]),
        )
        knee_ankle_distance = calculate_distance(
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_KNEE.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_ANKLE.value]["y"]),
        )
        shoulder_angle = calculate_angle(
            (keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["y"]),
            [keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["x"] + 1,
             keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["y"]],
        )
        right_arm_angle = calculate_angle(
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_SHOULDER.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_ELBOW.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_ELBOW.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
        )
        features.append([push_leg_velocity_x, push_leg_velocity_y, knee_ankle_distance, shoulder_angle, right_arm_angle])
    return np.array(features)

def preprocess_features_stage5(keypoints):
    features = []
    for i in range(len(keypoints)):
        shot_neck_distance = calculate_distance(
            (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
            (keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["x"],
             keypoints[i][mp_pose.PoseLandmark.LEFT_SHOULDER.value]["y"]),
        )
        release_angle = (
            calculate_angle(
                (keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
                 keypoints[i - 1][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
                (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"],
                 keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
                (keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["x"] + 1,
                 keypoints[i][mp_pose.PoseLandmark.RIGHT_WRIST.value]["y"]),
            )
            if i > 0
            else 0
        )
        features.append([shot_neck_distance, release_angle])
    return np.array(features)

# Stage configuration
stages = {
    "stage1": {"preprocess": preprocess_features_stage1, "type": "regression"},
    "stage2": {"preprocess": preprocess_features_stage2, "type": "regression"},
    "stage3": {"preprocess": preprocess_features_stage3, "type": "regression"},
    "stage4": {"preprocess": preprocess_features_stage4, "type": "classification"},
    "stage5": {"preprocess": preprocess_features_stage5, "type": "classification"},
}

# Unified evaluation workflow
def evaluate_shotput_stages(video_dir, model_dir):
    results = []
    for stage, config in stages.items():
        preprocess_fn = config["preprocess"]
        model_type = config["type"]
        video_path = os.path.join(video_dir, f"{stage}.mp4")
        model_path = os.path.join(model_dir, f"shotput_{stage}.keras")

        try:
            # Load the model
            model = load_model(model_path)

            # Extract keypoints
            keypoints = extract_keypoints(video_path)

            # Preprocess features
            features = preprocess_fn(keypoints)

            # Pad features to match input shape
            features = pad_sequences([features], maxlen=model.input_shape[1], padding="post", dtype="float32")

            # Predict
            predictions = model.predict(features)

            if model_type == "classification":
                predicted_class = np.argmax(predictions, axis=1)[0]
                score_mapping = {0: 0, 1: 0.5, 2: 1}
                results.append((stage, score_mapping[predicted_class]))
            else:  # Regression
                avg_prediction = np.mean(predictions)
                results.append((stage, avg_prediction))
        except Exception as e:
            print(f"Error processing {stage}: {e}")
            results.append((stage, "Error"))

    return results

# Extract keypoints from video
def extract_keypoints(video_path):
    cap = cv2.VideoCapture(video_path)
    keypoints = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(frame_rgb)
        if results.pose_landmarks:
            keypoints.append([
                {
                    "x": lm.x,
                    "y": lm.y,
                    "z": lm.z
                } for lm in results.pose_landmarks.landmark
            ])
    cap.release()
    if not keypoints:
        raise ValueError(f"No keypoints extracted from video: {video_path}")
    return keypoints

# Run evaluation
def main():
    video_dir = "/Users/cezar/Desktop/Team Project/AI/shotput/test_videos_1"
    model_dir = "/Users/cezar/Desktop/Team Project/AI/shotput/shotput_models"

    results = evaluate_shotput_stages(video_dir, model_dir)
    results_table = pd.DataFrame(results, columns=["Stage", "Score"])
    results_table.loc[len(results_table)] = ["Overall", f"{sum(score for _, score in results if score != 'Error')}/5"]
    print(results_table)

if __name__ == "__main__":
    main()


I0000 00:00:1736715864.371042 9398579 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M2 Pro
W0000 00:00:1736715864.453693 9403359 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1736715864.465073 9403355 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


Error processing stage1: Could not locate function 'weighted_mse'. Make sure custom classes are decorated with `@keras.saving.register_keras_serializable()`. Full object config: {'module': 'builtins', 'class_name': 'function', 'config': 'weighted_mse', 'registered_name': 'function'}
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 67ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 104ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 111ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 104ms/step
     Stage                 Score
0   stage1                 Error
1   stage2              1.007987
2   stage3              0.333333
3   stage4                     0
4   stage5                     0
5  Overall  1.3413206040859222/5
