In [3]:
# pip install fastapi pydantic numpy

In [99]:
import bisect
import copy
from typing import List, Dict, Optional

import numpy as np
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# ======== Data Models ========

# One captured pose sample: a timestamp plus named landmark coordinates.
class PoseType(BaseModel):
    frame: Optional[int] = None  # Frame index; optional, and never read by the scoring code visible in this file
    timestamp: float  # Time of this pose frame; sequences are aligned to each other on this value
    landmarks: Dict[str, List[float]]  # Landmark name (e.g. "LEFT_WRIST") -> coordinate list; the scoring code reads indices 0..2

# Request body of POST /score: the performed sequence and the reference sequence to grade it against.
class ScoreRequest(BaseModel):
    actual: List[PoseType]  # Poses that were performed; searched window by window for the best match
    expected: List[PoseType]  # Reference poses; its length sets the window size used for matching

# ======== FastAPI Setup ========

# Application instance that the route decorators below register against.
app = FastAPI()

# Allow cross-origin requests from the frontend (localhost:5173).
# Only that single origin is allowed; every HTTP method and request header is accepted from it,
# and credentialed requests are permitted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Liveness probe: answers with a fixed payload whenever the server is up
@app.get("/health")
def health():
    return dict(status="ok")

# Score endpoint: grades the performed ("actual") sequence against the reference ("expected") one.
# The response is {"score": value} with the value clamped to the range 0..100.
@app.post("/score")
def score(request: ScoreRequest):
    actual, expected = request.actual, request.expected
    print(len(actual), len(expected))  # Debug output

    # Best grade over every candidate window of the actual sequence
    best_grade = calculate_highest_grade(expected, actual)

    # Rescale the raw grade, then stretch its distance from 75 by a factor of ten
    scaled = best_grade * 1000
    stretched = (scaled - 75) * 10 + 75

    # Clamp into the 0..100 range reported to the client
    clamped = min(max(stretched, 0), 100)
    return {"score": clamped}

# ======== Pose Evaluation Logic ========

# Joint pairs used to form vectors (e.g. elbow to wrist).
# Each entry names two landmark keys. calculate_vectors subtracts the second landmark from the
# first, while find_normalized_relative_vec_from_obj computes second minus first; the order of
# this tuple also fixes the order of the weights returned by find_weights.
REL_VEC_TUPS = (
    ("LEFT_WRIST", "LEFT_ELBOW"),
    ("LEFT_ELBOW", "LEFT_SHOULDER"),
    ("LEFT_SHOULDER", "RIGHT_SHOULDER"),
    ("RIGHT_WRIST", "RIGHT_ELBOW"),
    ("RIGHT_ELBOW", "RIGHT_SHOULDER"),
    ("LEFT_SHOULDER", "LEFT_HIP"),
    ("RIGHT_SHOULDER", "RIGHT_HIP"),
    ("LEFT_HIP", "RIGHT_HIP"),
    ("LEFT_HIP", "LEFT_KNEE"),
    ("LEFT_KNEE", "LEFT_ANKLE"),
    ("RIGHT_HIP", "RIGHT_KNEE"),
    ("RIGHT_KNEE", "RIGHT_ANKLE"),
)

def lin_interpolate_frames(frame1, frame2, timestamp):
    """Linearly blend two pose frames at the given timestamp.

    Only the left/right wrist, elbow, shoulder, hip, knee and ankle landmarks
    are considered, and a landmark is emitted only when both frames contain
    it. The first three coordinates of each landmark are blended.

    Args:
        frame1: Frame providing the starting landmarks and timestamp.
        frame2: Frame providing the ending landmarks and timestamp.
        timestamp: Time at which the blended pose is wanted.

    Returns:
        A new PoseType with ``frame=None`` and the requested timestamp, or a
        deep copy of ``frame1`` (timestamp unchanged) when both frames carry
        the same timestamp.
    """
    interpolated = PoseType(frame=None, timestamp=timestamp, landmarks={})

    span = frame2.timestamp - frame1.timestamp
    if span == 0:
        # No time between the frames, so there is nothing to blend
        print(f"⚠️ Duplicate timestamps at {timestamp}, using previous frame")
        return copy.deepcopy(frame1)

    # Position of the requested time relative to the frame1 -> frame2 interval
    fraction = (timestamp - frame1.timestamp) / span

    start, end = frame1.landmarks, frame2.landmarks
    blended = {}
    for side in ("LEFT", "RIGHT"):
        for bodypart in ("WRIST", "ELBOW", "SHOULDER", "HIP", "KNEE", "ANKLE"):
            name = f"{side}_{bodypart}"
            if name in start and name in end:
                blended[name] = [
                    start[name][dim] + fraction * (end[name][dim] - start[name][dim])
                    for dim in range(3)
                ]

    interpolated.landmarks = blended
    return interpolated




def _resample_to_timeline(pose_data, timeline):
    """Return one frame of ``pose_data`` for every timestamp in ``timeline``."""
    keys = [round(frame.timestamp, 5) for frame in pose_data]
    resampled = []

    for timestamp in timeline:
        # Index of the first frame whose (rounded) timestamp is >= the requested one
        idx = bisect.bisect_left(keys, timestamp)

        if idx < len(keys) and keys[idx] == timestamp:
            # The sequence has a frame at exactly this time: reuse it as-is
            resampled.append(pose_data[idx])
        elif idx == 0 or idx == len(keys):
            # Outside the recorded range: hold the nearest boundary pose rather
            # than extrapolating (or wrapping around to the other end)
            edge = pose_data[0] if idx == 0 else pose_data[-1]
            held = copy.deepcopy(edge)
            held.timestamp = timestamp
            resampled.append(held)
        else:
            # Strictly between two recorded frames: blend the bracketing pair
            resampled.append(
                lin_interpolate_frames(pose_data[idx - 1], pose_data[idx], timestamp)
            )

    return resampled


# Normalize and align timestamps between two pose sequences using linear interpolation
def fill_values(template_pose_data, user_pose_data):
    """Resample two pose sequences onto their shared, merged timeline.

    Timestamps are rounded to 5 decimal places before being merged, so values
    that differ only by floating-point noise count as the same instant. For
    every merged timestamp each output holds the original frame when the
    sequence has one at that time, a linear interpolation of the two frames
    that bracket it when it falls between frames, and a copy of the first or
    last frame (with its timestamp set to the requested time) when it falls
    outside the sequence's range.

    Both inputs must be ordered by ascending timestamp.

    Args:
        template_pose_data: Reference frames; each needs ``timestamp`` and
            ``landmarks`` attributes.
        user_pose_data: Frames to align with the reference, same requirements.

    Returns:
        ``(template_frames, user_frames)``: two lists of equal length, one
        entry per merged timestamp, in ascending time order.

    Raises:
        IndexError: If exactly one of the two sequences is empty.
    """
    merged_timestamps = {round(frame.timestamp, 5) for frame in template_pose_data}
    merged_timestamps.update(round(frame.timestamp, 5) for frame in user_pose_data)
    timeline = sorted(merged_timestamps)

    temp_out = _resample_to_timeline(template_pose_data, timeline)
    user_out = _resample_to_timeline(user_pose_data, timeline)
    return temp_out, user_out

def find_normalized_relative_vec_from_obj(from_landmark, to_landmark, frame_obj):
    """Return the unit vector pointing from one landmark to another.

    Only the first three coordinates of each landmark are used.

    Args:
        from_landmark: Key of the landmark the vector starts at.
        to_landmark: Key of the landmark the vector points to.
        frame_obj: Object whose ``landmarks`` mapping holds both keys.

    Returns:
        A list of three components with unit length, or three zeros when both
        landmarks coincide (the direction is undefined in that case).

    Raises:
        KeyError: If either landmark is absent from ``frame_obj.landmarks``.
    """
    from_coords = frame_obj.landmarks[from_landmark]
    to_coords = frame_obj.landmarks[to_landmark]
    relative_vec = [to_coords[i] - from_coords[i] for i in range(3)]
    norm = np.linalg.norm(relative_vec)
    if norm == 0:
        # Dividing by a zero norm would yield NaN components that then
        # propagate into the weights and the final score
        return [0.0, 0.0, 0.0]
    return [c / norm for c in relative_vec]

def find_weights(pose_data):
    """Measure how much each joint vector's direction changes over a sequence.

    For every pair in REL_VEC_TUPS the unit vectors of consecutive frames are
    compared, and the Euclidean distances between them are summed.

    Args:
        pose_data: Sequence of frames, each exposing a ``landmarks`` mapping.

    Returns:
        A list with one accumulated change per entry of REL_VEC_TUPS, in the
        same order. Every entry is 0 when fewer than two frames are given.
    """
    weights = []
    for joint_pair in REL_VEC_TUPS:
        total_change = 0
        # Walk over neighbouring frames: (0, 1), (1, 2), ...
        for earlier, later in zip(pose_data, pose_data[1:]):
            before = find_normalized_relative_vec_from_obj(*joint_pair, earlier)
            after = find_normalized_relative_vec_from_obj(*joint_pair, later)
            total_change += np.linalg.norm(np.subtract(after, before))
        weights.append(total_change)
    return weights

def calculate_vectors(expected, actual):
    """Build the expected and actual joint vectors for a single frame.

    Args:
        expected: Landmark mapping (name -> coordinates) of the reference frame.
        actual: Landmark mapping (name -> coordinates) of the performed frame.

    Returns:
        A dict keyed by the REL_VEC_TUPS pair. Each value is a one-element
        list holding the tuple ``(expected_vector, actual_vector)``, where a
        vector is the first joint's coordinates minus the second's. Pairs with
        a joint missing from either mapping are reported on stdout and left out.
    """
    paired_vectors = {}

    for pair in REL_VEC_TUPS:
        first, second = pair

        if not (first in expected and second in expected):
            print(f"❌ Expected frame missing: {first} or {second}")
            continue
        if not (first in actual and second in actual):
            print(f"❌ Actual frame missing: {first} or {second}")
            continue

        reference_vec = np.subtract(expected[first], expected[second])
        performed_vec = np.subtract(actual[first], actual[second])
        paired_vectors[pair] = [(reference_vec, performed_vec)]

    return paired_vectors



def calculate_norm(expected_frames, actual_frames, weights):
    """Score how well two aligned pose sequences match.

    For each frame, every joint pair's cosine similarity is mapped from
    [-1, 1] to [0, 1], multiplied by that pair's weight, and averaged over the
    pairs that could be compared. The result is the mean of those per-frame
    averages.

    A joint pair is skipped when calculate_vectors left it out (a joint is
    missing from one of the frames) or when either vector has zero length, so
    the cosine is undefined. Frames with no comparable pair are ignored.

    Args:
        expected_frames: Landmark mappings of the reference sequence.
        actual_frames: Landmark mappings of the performed sequence, aligned
            index by index with ``expected_frames``.
        weights: One weight per entry of REL_VEC_TUPS, in the same order.

    Returns:
        The mean per-frame score, or 0.0 when no frame could be scored.

    Raises:
        IndexError: If ``actual_frames`` is shorter than ``expected_frames``
            or ``weights`` is shorter than REL_VEC_TUPS.
    """
    frame_scores = []

    for i, expected_frame in enumerate(expected_frames):
        vectors = calculate_vectors(expected_frame, actual_frames[i])
        weighted_similarities = []

        for num, key in enumerate(REL_VEC_TUPS):
            if key not in vectors:
                # calculate_vectors already reported the missing joint
                continue

            vector1, vector2 = vectors[key][0]
            norm_product = np.linalg.norm(vector1) * np.linalg.norm(vector2)
            if norm_product == 0:
                # Coincident joints give a zero-length vector; dividing by it
                # would turn the whole score into NaN
                continue

            cosine_similarity = np.dot(vector1, vector2) / norm_product
            weighted_similarities.append((cosine_similarity + 1) / 2 * weights[num])

        if weighted_similarities:
            frame_scores.append(
                np.array(weighted_similarities).sum() / len(weighted_similarities)
            )

    if not frame_scores:
        # Nothing comparable: report "no match" instead of the NaN that the
        # mean of an empty array would produce
        return 0.0
    return np.array(frame_scores).mean()

def softmax(x):
    """Turn raw weights into positive values that sum to one along axis 0.

    Args:
        x: Array-like of raw values.

    Returns:
        numpy array of exponentials normalised along axis 0.
    """
    # Subtracting the global maximum keeps the exponentials from overflowing
    peak = np.max(x)
    exponentials = np.exp(x - peak)
    normaliser = exponentials.sum(axis=0, keepdims=True)
    return exponentials / normaliser

def calculate_grade_for_groups2(expected_movements, actual_movements):
    """Grade every window of the actual sequence against the expected one.

    A window holds as many consecutive frames as ``expected_movements``.
    Each window is deep-copied, its timestamps are shifted so that it starts
    at zero, both sequences are resampled onto a shared timeline with
    fill_values, and the result is scored with calculate_norm.

    Args:
        expected_movements: Reference frames (``timestamp`` and ``landmarks``).
        actual_movements: Performed frames searched for the best match.

    Returns:
        Dict mapping ``(start, end)`` slice bounds into ``actual_movements``
        to the grade of that window. Empty when ``expected_movements`` is
        empty or ``actual_movements`` has fewer frames than it.
    """
    window_size = len(expected_movements)
    if window_size == 0:
        # No reference to compare against, hence no window to grade
        return {}

    weights = softmax(find_weights(expected_movements))
    grade_per_timestamp_group = {}

    # ``end`` is an exclusive slice bound, so it must be allowed to reach
    # len(actual_movements); otherwise the final window is never graded and
    # equal-length sequences produce no grade at all
    for end in range(window_size, len(actual_movements) + 1):
        rang = (end - window_size, end)
        # Copy so that re-basing the timestamps does not touch the caller's frames
        actual_movements_window = copy.deepcopy(actual_movements[rang[0]: rang[1]])
        first_time = actual_movements_window[0].timestamp

        # Normalize timestamps to start from zero
        for frame in actual_movements_window:
            frame.timestamp -= first_time

        expected_aligned, actual_aligned = fill_values(
            expected_movements, actual_movements_window
        )
        expected_landmarks = [d.landmarks for d in expected_aligned]
        actual_landmarks = [d.landmarks for d in actual_aligned]
        grade_per_timestamp_group[rang] = calculate_norm(
            expected_landmarks, actual_landmarks, weights
        )

    return grade_per_timestamp_group

def calculate_highest_grade(expected_movements, actual_movements):
    """Return the best grade over all windows of the actual sequence.

    Args:
        expected_movements: Reference frames.
        actual_movements: Performed frames searched for the best match.

    Returns:
        The highest grade reported by calculate_grade_for_groups2, or 0.0 when
        it graded no window at all (for example because the actual sequence is
        too short), instead of letting ``max`` raise ValueError on an empty
        collection.
    """
    grades = calculate_grade_for_groups2(expected_movements, actual_movements)
    return max(grades.values(), default=0.0)



import random

def add_noise_to_pose_data(pose_data: List[PoseType], noise_level: float) -> List[PoseType]:
    """
    Produce a noisy duplicate of a pose sequence; the input is left untouched.

    Every coordinate of every landmark receives its own zero-mean Gaussian
    offset drawn from the ``random`` module.

    Args:
        pose_data (List[PoseType]): Clean pose frames to duplicate
        noise_level (float): Standard deviation of the Gaussian offsets

    Returns:
        List[PoseType]: Deep copy of the input with perturbed landmark coordinates
    """
    def jitter(value):
        return value + random.gauss(0, noise_level)

    perturbed = copy.deepcopy(pose_data)

    for pose in perturbed:
        joints = pose.landmarks
        # Snapshot the keys; each entry is then replaced by its perturbed list
        for name in list(joints):
            joints[name] = [jitter(value) for value in joints[name]]

    return perturbed



def extend_pose_data_with_noise(pose_data: List[PoseType], repeat: int = 2, noise_level: float = 0.05) -> List[PoseType]:
    """
    Build a longer sequence out of noisy, time-shifted copies of pose_data.

    Copy number k (counting from 1) is moved forward in time by k times the
    sequence duration plus k times the gap between the first two frames, and
    its timestamps are rounded to 5 decimals. The unshifted original is not
    part of the result.

    Args:
        pose_data (List[PoseType]): Source frames; at least two are required
        repeat (int): Number of noisy copies to concatenate
        noise_level (float): Standard deviation passed to add_noise_to_pose_data

    Returns:
        List[PoseType]: The concatenated noisy copies
    """
    first_timestamp = pose_data[0].timestamp
    duration = pose_data[-1].timestamp - first_timestamp
    frame_interval = pose_data[1].timestamp - first_timestamp

    extended = []
    for copy_number in range(1, repeat + 1):
        # One extra frame interval per copy keeps neighbouring copies from overlapping
        time_shift = copy_number * duration + copy_number * frame_interval
        shifted_copy = add_noise_to_pose_data(pose_data, noise_level=noise_level)
        for frame in shifted_copy:
            frame.timestamp = round(frame.timestamp + time_shift, 5)
        extended += shifted_copy

    return extended

In [100]:
def validate_pose_data(pose_data):
    """Print a warning for every frame that lacks a wrist or elbow landmark.

    Only LEFT_WRIST, RIGHT_WRIST, LEFT_ELBOW and RIGHT_ELBOW are checked.
    Nothing is returned and nothing is raised for incomplete frames.

    Args:
        pose_data: Iterable of frames, each exposing a ``landmarks`` mapping.
    """
    required_joints = ("LEFT_WRIST", "RIGHT_WRIST", "LEFT_ELBOW", "RIGHT_ELBOW")

    for index, pose in enumerate(pose_data):
        present = pose.landmarks
        absent = [name for name in required_joints if name not in present]
        if not absent:
            continue
        print(f"⚠️ Frame {index} missing joints: {absent}")


In [101]:
import json

# Load the recorded reference movement. The encoding is given explicitly so that
# reading the JSON file does not depend on the platform's default text encoding.
with open("../data/deadpool.json", "r", encoding="utf-8") as f:
    raw_data = json.load(f)


In [102]:
# Turn every raw JSON record into a PoseType model.
expected_poses = [PoseType(**frame) for frame in raw_data]
# Synthetic "actual" sequence: noisy, time-shifted copies of the expected poses (default repeat count).
noisy_actual = extend_pose_data_with_noise(expected_poses, noise_level=0.05)

In [103]:
# for i in expected_poses:
#     i.landmarks['LEFT_WRIST']

In [104]:
# Raw best-window grade, without the rescaling and clamping applied by the /score endpoint.
# NOTE: this rebinds the module-level name `score`, which until now referred to the endpoint function.
score = calculate_highest_grade(expected_poses, noisy_actual)
print(f"Similarity score with noise: {score:.2f}")


Similarity score with noise: 0.08


In [92]:
# Inspect the timestamps of the expected poses, in sequence order.
[i.timestamp for i in expected_poses]

[0.0,
 0.033,
 0.067,
 0.1,
 0.133,
 0.167,
 0.2,
 0.233,
 0.267,
 0.3,
 0.333,
 0.367,
 0.4,
 0.433,
 0.467,
 0.5,
 0.533,
 0.567,
 0.6,
 0.633,
 0.667,
 0.7,
 0.733,
 0.767,
 0.8,
 0.833,
 0.867,
 0.9,
 0.933,
 0.967,
 1.0,
 1.033,
 1.067,
 1.1,
 1.133,
 1.167,
 1.2,
 1.233,
 1.267,
 1.3,
 1.333,
 1.367,
 1.4,
 1.433,
 1.467,
 1.5,
 1.533,
 1.567,
 1.6,
 1.633,
 1.667,
 1.7,
 1.733,
 1.767,
 1.8,
 1.833,
 1.867,
 1.9,
 1.933,
 1.967,
 2.0,
 2.033,
 2.067,
 2.1,
 2.133,
 2.167,
 2.2,
 2.233,
 2.267,
 2.3,
 2.333,
 2.367,
 2.4,
 2.433,
 2.467,
 2.5,
 2.533,
 2.567,
 2.6,
 2.633,
 2.667,
 2.7,
 2.733,
 2.767,
 2.8,
 2.833,
 2.867,
 2.9,
 2.933,
 2.967,
 3.0,
 3.033,
 3.067,
 3.1,
 3.133,
 3.167,
 3.2,
 3.233,
 3.267,
 3.3,
 3.333,
 3.367,
 3.4,
 3.433,
 3.467,
 3.5,
 3.533,
 3.567,
 3.6,
 3.633,
 3.667,
 3.7,
 3.733,
 3.767,
 3.8,
 3.833,
 3.867,
 3.9,
 3.933,
 3.967,
 4.0,
 4.033,
 4.067,
 4.1,
 4.133,
 4.167,
 4.2,
 4.233,
 4.267,
 4.3,
 4.333,
 4.367,
 4.4,
 4.433,
 4.467,
 4.5,
 4.5

In [75]:
# Timestamps of the noisy "actual" sequence; kept in `wow` for the membership check in the following cell.
wow = [i.timestamp for i in noisy_actual]
wow

[28.3,
 28.333000000000002,
 28.367,
 28.400000000000002,
 28.433,
 28.467000000000002,
 28.5,
 28.533,
 28.567,
 28.6,
 28.633,
 28.667,
 28.7,
 28.733,
 28.767,
 28.8,
 28.833000000000002,
 28.867,
 28.900000000000002,
 28.933,
 28.967000000000002,
 29.0,
 29.033,
 29.067,
 29.1,
 29.133,
 29.167,
 29.2,
 29.233,
 29.267,
 29.3,
 29.333000000000002,
 29.367,
 29.400000000000002,
 29.433,
 29.467000000000002,
 29.5,
 29.533,
 29.567,
 29.6,
 29.633,
 29.667,
 29.7,
 29.733,
 29.767,
 29.8,
 29.833000000000002,
 29.867,
 29.900000000000002,
 29.933,
 29.967000000000002,
 30.0,
 30.033,
 30.067,
 30.1,
 30.133,
 30.167,
 30.2,
 30.233,
 30.267,
 30.3,
 30.333000000000002,
 30.367,
 30.400000000000002,
 30.433,
 30.467,
 30.5,
 30.533,
 30.567,
 30.6,
 30.633000000000003,
 30.667,
 30.7,
 30.733,
 30.767,
 30.8,
 30.833000000000002,
 30.867,
 30.900000000000002,
 30.933,
 30.967,
 31.0,
 31.033,
 31.067,
 31.1,
 31.133000000000003,
 31.167,
 31.2,
 31.233,
 31.267,
 31.3,
 31.33300000000

In [76]:
# Expected-pose timestamps that also occur (by exact float equality) in `wow`, listed in reverse order.
[i.timestamp for i in expected_poses if i.timestamp in wow][::-1]

[28.3]