### Imports and helper functions

In [None]:
import cv2
import mediapipe as mp
import csv
import pandas as pd
import numpy as np
import math

In [None]:
def unit_vector(vector):
    return vector / np.linalg.norm(vector)

def distance(v1, v2):
    if len(v1) != len(v2):
        raise ValueError("Vectors must be of same dimensions!")
    
    squared_sum = sum((a - b) ** 2 for a, b in zip(v1, v2))
    return math.sqrt(squared_sum)

def create_pose(mp_pose):
    return mp_pose.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        smooth_landmarks=True
    )

---
### Play video
#### without marker names

In [None]:
def play_without_markers(video_path, rotate_video_by_degrees, mp_pose):
    """Displays the video with markers from MediaPipe, but without their names."""
    cap = cv2.VideoCapture(video_path)

    pose = create_pose(mp_pose)
    
    mp_drawing = mp.solutions.drawing_utils

    while cap.isOpened():
        success, frame = cap.read()
            
        if not success:
            break

        if (rotate_video_by_degrees != 0):
            if (rotate_video_by_degrees == 90):
                frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
            elif(rotate_video_by_degrees == 180):
                frame = cv2.rotate(frame, cv2.ROTATE_180)
            elif(rotate_video_by_degrees == 270):
                frame = cv2.rotate(frame, cv2.ROTATE_270)     
            
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            mp_drawing.draw_landmarks(
                frame,
                results.pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2)
            )
        cv2.imshow("Video (without names)", frame)

        if cv2.waitKey(5) & 0xFF == ord('q'):
            break
    
    
    cap.release()
    cv2.destroyAllWindows()

#### with marker names

In [None]:
def play_with_markers(video_path, rotate_video_by_degrees, mp_pose):
    """Displays the video with markers and their names."""
    cap = cv2.VideoCapture(video_path)
    
    pose = create_pose(mp_pose)

    mp_drawing = mp.solutions.drawing_utils
    
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break
        
        
        if (rotate_video_by_degrees != 0):
            if (rotate_video_by_degrees == 90):
                frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
            elif(rotate_video_by_degrees == 180):
                frame = cv2.rotate(frame, cv2.ROTATE_180)
            elif(rotate_video_by_degrees == 270):
                frame = cv2.rotate(frame, cv2.ROTATE_270)

        # BGR -> RGB
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            # draw pose
            mp_drawing.draw_landmarks(
                frame,
                results.pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2),
                mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2)
            )

            # display marker names
            h, w, _ = frame.shape
            for idx, landmark in enumerate(results.pose_landmarks.landmark):
                cx, cy = int(landmark.x * w), int(landmark.y * h)
                name = mp_pose.PoseLandmark(idx).name
                cv2.putText(frame, name, (cx + 5, cy - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1, cv2.LINE_AA)

        cv2.imshow("Video (with names)", frame)

        if cv2.waitKey(5) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

---
### CSV Export of MediaPipe data

In [None]:
def pose_estimation(video, marker_names, landmark_map, video_folder, output_folder, rotate_video_by_degrees, flip_x_axis, mp_pose):
    """Runs the MediaPipe pose estimation on the video and extracts marker positions into a csv file."""
    print("Running pose estimation...")
    csv_path = output_folder + video + ".csv"
    video_path = video_folder + video + ".mp4"
    
    cap = cv2.VideoCapture(video_path)
    
    pose = create_pose(mp_pose)
    
    # CSV-File
    with open(csv_path, mode='w', newline='') as file:
        writer = csv.writer(file)
    
        # Meta/FPS
        writer.writerow(["#FPS", cap.get(cv2.CAP_PROP_FPS)])
    
        # Header: Frame + Marker X/Y/Z
        header = ["Frame"]
        for m in marker_names:
            header += [f"{m}_X", f"{m}_Y", f"{m}_Z"]
        writer.writerow(header)
    
        frame_idx = 0
    
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                break
                
            if (rotate_video_by_degrees != 0):
                if (rotate_video_by_degrees == 90):
                    frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                elif(rotate_video_by_degrees == 180):
                    frame = cv2.rotate(frame, cv2.ROTATE_180)
                elif(rotate_video_by_degrees == 270):
                    frame = cv2.rotate(frame, cv2.ROTATE_270)
                    
            # BGR -> RGB
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(image_rgb)
    
            row = [frame_idx]
    
            if results.pose_landmarks:
                landmarks = results.pose_landmarks.landmark
    
                for m in marker_names:
    
                    idx = landmark_map[m]
                    lm = landmarks[idx]
                    
                    if (flip_x_axis):
                        lm.x = -lm.x
                    # invert y axis
                    row += [lm.x, -lm.y, lm.z]
    
            else:
                # if no landmark - fill with 0s
                row += [0.0] * (len(marker_names) * 3)
    
            writer.writerow(row)
            frame_idx += 1
    cap.release()
    pose.close()
    print("Pose estimation complete and output file successfully created: " + csv_path)

---
### Scaling

In [None]:
def scale(static_name, video_name, markers, limb_groups, anchor_marker, anchor_vector, csv_folder, reference_limb=None, reference_length=None, plots=False):
    """Scales data over given csv files.
    <ul>
        <li>Calculates origin displacement by averaging anchor position in static pose.</li>
        <li>Calculates average lengths of limbs in static pose.</li>
        <li>Corrects origin displacement and fixes lengths in static pose and movement video by
            <ul>
                <li>applying the origin displacement vector</li>
                <li>iterating over the limbs array, <b>fixing the length by moving the second marker according to the first</b></li>
            </ul>
        </li>
    </ul>
    """

    def scale_to_distance(v0, v1, new_distance):
        d = v1 - v0
        dist = distance(v0, v1)

        if dist == 0:
            return v0

        f = new_distance / dist
        d_scaled = f * d

        return v0 + d_scaled
    

    def shift_vector_to_distance(v0, v1, new_distance):
        direction = v1 - v0
        dist = distance(v0, v1)

        if dist == 0:
            return np.array([0, 0, 0])
        
        f = new_distance / dist
        
        v_new = v0 + f * direction

        return v_new - v1
    

    def calc_scaled_lengths(reference_limb, reference_length, lengths):
        """Calculates average length of given limb over static pose. Takes factor between that value and given length and applies it to each others limbs average length."""
        avg_lengths = {}

        factor = reference_length / lengths.mean(numeric_only=True).loc[reference_limb]

        for group in limb_groups:
            for limb in group:
                avg_lengths[limb] = lengths.mean(numeric_only=True).loc[limb] * factor
        
        return avg_lengths


    def get_data(file_path):
        """Load data from given csv and return as tuple: (first_line, dataframe)."""
        file_path = file_path + ".csv"
        with open(file_path, "r") as f:
            first_line = f.readline().strip()
            df = pd.read_csv(file_path, comment="#")
        return first_line, df
    
    def get_origin_displacement(df, anchor_marker, anchor_vector):
        """Calculates vector between position of model marker and average position of the same marker in mediapipe data. Returns the vector as a numpy array"""
        avg_anchor = df.mean(numeric_only=True).loc[anchor_marker + "_X": anchor_marker + "_Z"]
        
        return np.array(avg_anchor) - anchor_vector
        
    def calc_lengths(df):
        lengths = pd.DataFrame(columns=[limb for group in limb_groups for limb in group], dtype=float)
        
        # calculate lengths for each limb in each frame
        for frame in range(len(df)):
            frame_data = df.iloc[frame]
            l_row = {limb: 0 for group in limb_groups for limb in group}
            
            for group in limb_groups:
                for limb in group:
                    first = np.array([frame_data[markers[limb[0]] + "_X"], frame_data[markers[limb[0]] + "_Y"], frame_data[markers[limb[0]] + "_Z"]])
                    second = np.array([frame_data[markers[limb[1]] + "_X"], frame_data[markers[limb[1]] + "_Y"], frame_data[markers[limb[1]] + "_Z"]])
                    
                    l_row[limb] = distance(first, second)
            lengths.loc[len(lengths)] = l_row
        return lengths


    def scale_file(file_name, first_line, df, lengths, origin_displacement, scale_to=None):
        """Actual scaling based on given input. Also plots lengths over time if plots=True, to check for mistakes after scaling.
        <br>scale_to holds the lengths to scale the limbs to. If not given, lengths will be calculated by averaging the lengths from the given dataframe."""
        scaled_path = csv_folder + file_name + "_scaled.csv"
    
        limb_count = 0

        # plot lengths before scaling
        if plots:
            limb_count = sum(len(group) for group in limb_groups)
            p = lengths.plot(y=[i for i in range(limb_count)], label=[(markers[limb[0]], markers[limb[1]]) for group in limb_groups for limb in group], title=file_name + " unscaled", xlabel="Frame", ylabel="Length in m")
            p.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
            p.plot()

        # if no lengths to scale to are give, calculate average length per limb
        if scale_to is None:
            scale_to = lengths.mean(numeric_only=True)
    
        scaled_df = df.head(0).copy()
        
        for i in range(len(df)):
            row = df.iloc[i].copy()

            # correct origin displacement
            for m in range(len(markers)):
                marker = markers[m]
                v = np.array([row[marker + "_X"], row[marker + "_Y"], row[marker + "_Z"]]) - origin_displacement
                row[marker + "_X"] = v[0]
                row[marker + "_Y"] = v[1]
                row[marker + "_Z"] = v[2]

            for group in limb_groups:
                cum_scale_v = np.array([0, 0, 0], dtype=float)
                for limb in group:
                    m0 = markers[limb[0]]
                    m1 = markers[limb[1]]
                    
                    v0 = np.array([row[m0 + "_X"], row[m0 + "_Y"], row[m0 + "_Z"]])
                    v1 = np.array([row[m1 + "_X"], row[m1 + "_Y"], row[m1 + "_Z"]]) + cum_scale_v
            
                    # new_v = scale_to_distance(v0, v1, scale_to[limb])
                    curr_scale_v = shift_vector_to_distance(v0, v1, scale_to[limb])
                    cum_scale_v += curr_scale_v
                    
                    # row[m1 + "_X"] = new_v[0]
                    # row[m1 + "_Y"] = new_v[1]
                    # row[m1 + "_Z"] = new_v[2]
                    
                    row[m1 + "_X"] = v1[0] + curr_scale_v[0]
                    row[m1 + "_Y"] = v1[1] + curr_scale_v[1]
                    row[m1 + "_Z"] = v1[2] + curr_scale_v[2]

            scaled_df.loc[len(scaled_df)] = row
    
        # plot lengths after scaling
        if plots:
            scaled_lengths = calc_lengths(scaled_df)

            p = scaled_lengths.plot(y=[i for i in range(limb_count)], label=[(markers[limb[0]], markers[limb[1]]) for group in limb_groups for limb in group], title=file_name + " scaled", xlabel="Frame", ylabel="Length in m")
            p.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
            p.plot()
        
        with open(scaled_path, mode='w', newline='') as file:
            writer = csv.writer(file)
        
            # Meta/FPS
            file.write(first_line + "\n")
        
            # Header: Frame + Marker X/Y/Z
            header = []
            for m in scaled_df.head():
                header += [m]
            writer.writerow(header)
            
            for r in range(len(scaled_df)):
                row = []
                cr = scaled_df.iloc[r]
                for val in range(len(cr)):
                    row.append(cr.iloc[val])
                writer.writerow(row)
                
        print("Scaling complete and output file successfully created: " + scaled_path)
        return scale_to

    # get data and origin displacement of static pose
    first_line, df = get_data(csv_folder + static_name)
    origin_d = get_origin_displacement(df, anchor_marker, anchor_vector)

    # calculate lengths for each frame of static pose
    lengths = calc_lengths(df)

    print("Scaling static...")
    if (not reference_limb or not reference_length):
        # no fixed length given -> calculate average lengths from static pose for scaling
        scale_to = scale_file(static_name, first_line, df, lengths, origin_d)
    else:
        # fixed length for limb given -> calculate average lengths from static pose, calculate factor for chosen limb and apply to other average lengths to get scaling lengths
        scale_to = calc_scaled_lengths(reference_limb, reference_length, lengths)
        scale_file(static_name, first_line, df, lengths, origin_d, scale_to)

    # get data of movement video
    first_line, df = get_data(csv_folder + video_name)
    
    # calculate lengths for each frame of movement video
    lengths = calc_lengths(df)

    print("Scaling video...")
    scale_file(video_name, first_line, df, lengths, origin_d, scale_to)
    
    return static_name + "_scaled", video_name + "_scaled"

---
### .csv to .trc

In [None]:
def to_trc(video_name, output_folder):
    """
    Reads .csv and writes coordinates to formatted .trc file for import to OpenSim.
    """

    print("Converting to .trc file...")
    trc_path = output_folder + video_name + ".trc"
    units = "m"
    
    csv_path = output_folder + video_name + ".csv"
    
    with open(csv_path, "r") as f:
        first_line = f.readline().strip()
    
    fps = 30
    if first_line.startswith("#FPS"):
        fps = float(first_line.split(",")[1].strip())
    
    data_rate = fps
    camera_rate = fps
    
    # load csv
    df = pd.read_csv(csv_path, comment="#")
    
    # calculate time based on fps
    if "frame" in df.columns:
        df["Time"] = df["frame"] / data_rate
        df = df.drop(columns=["frame"])
    else:
        df["Time"] = [i / data_rate for i in range(len(df))]
    
    # extract marker names
    markers = sorted(set(col.rsplit("_", 1)[0] for col in df.columns if col.endswith("_X")))
    
    # prepare trc header
    n_markers = len(markers)
    n_frames = len(df)
    header = [
        "PathFileType\t4\t(X/Y/Z)\t" + trc_path,
        "DataRate\tCameraRate\tNumFrames\tNumMarkers\tUnits\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames",
        f"{data_rate}\t{camera_rate}\t{n_frames}\t{n_markers}\t{units}\t{data_rate}\t1\t{n_frames}",
    ]
    
    # columns for trc (OpenSim expects: Frame#, Time, Marker1 X Y Z, Marker2 X Y Z, ...)
    trc_header = ["Frame#", "Time"]
    for m in markers:
        trc_header.extend([f"{m}\t\t"])
    header.append("\t".join(trc_header))
    
    coordinatesHeader = "".join(
        [f"X{i}\tY{i}\tZ{i}\t" for i in range(1, len(markers) + 1)]
    )
    
    header.append("\t\t" + coordinatesHeader + "\n\n")
    
    # prepare rows
    rows = []
    for i, row in df.iterrows():
        frame_num = i + 1
        line = [f"{frame_num}", f"{row['Time']:.5f}"]
        for m in markers:
            x, y, z = row.get(f"{m}_X", float("nan")), row.get(f"{m}_Y", float("nan")), row.get(f"{m}_Z", float("nan"))
            line.extend([f"{x:.5f}" + f"\t{y:.5f}" + f"\t{z:.5f}"])
        rows.append("\t".join(line))
    
    # write trc file
    with open(trc_path, "w") as f:
        f.write("\n".join(header))
        f.write("\n".join(rows))
    
    print(f"trc file successfully created: {trc_path}")

---
### Execute

In [None]:
# adjust these variables accordingly
static_name = "T-Pose_pullover"
video_name = "Bein"

video_folder = "../../Videos/"
output_folder = "../output/"

static_path = video_folder + static_name + ".mp4"
video_path = video_folder + video_name + ".mp4"

# turns video clockwise, options are 0, 90, 180 and 270
rotate_video_by_degrees = 0


# create MediaPipe pose instance
mp_pose = mp.solutions.pose

# list of markers, these names have to be adjusted to the names of the OpenSim model markers
# marker_names = ["r_acromion", "r_humerus_epicondyle", "r_radius_styloid"]

marker_names = [
        "NOSE",
        "LEFT_EYE_INNER",
        "LEFT_EYE",
        "LEFT_EYE_OUTER",
        "RIGHT_EYE_INNER",
        "RIGHT_EYE",
        "RIGHT_EYE_OUTER",
        "LEFT_EAR",
        "RIGHT_EAR",
        "MOUTH_LEFT",
        "MOUTH_RIGHT",
        "LEFT_SHOULDER",
        "RIGHT_SHOULDER",
        "LEFT_ELBOW",
        "RIGHT_ELBOW",
        "LEFT_WRIST",
        "RIGHT_WRIST",
        "LEFT_PINKY",
        "RIGHT_PINKY",
        "LEFT_INDEX",
        "RIGHT_INDEX",
        "LEFT_THUMB",
        "RIGHT_THUMB",
        "LEFT_HIP",
        "RIGHT_HIP",
        "LEFT_KNEE",
        "RIGHT_KNEE",
        "LEFT_ANKLE",
        "RIGHT_ANKLE",
        "LEFT_HEEL",
        "RIGHT_HEEL",
        "LEFT_FOOT_INDEX",
        "RIGHT_FOOT_INDEX"
]

# Mapping Landmark-Index
landmark_map = {
    # all 33 mediapipe markers:
    "NOSE": mp_pose.PoseLandmark.NOSE,
    "LEFT_EYE_INNER": mp_pose.PoseLandmark.LEFT_EYE_INNER,
    "LEFT_EYE": mp_pose.PoseLandmark.LEFT_EYE,
    "LEFT_EYE_OUTER": mp_pose.PoseLandmark.LEFT_EYE_OUTER,
    "RIGHT_EYE_INNER": mp_pose.PoseLandmark.RIGHT_EYE_INNER,
    "RIGHT_EYE": mp_pose.PoseLandmark.RIGHT_EYE,
    "RIGHT_EYE_OUTER": mp_pose.PoseLandmark.RIGHT_EYE_OUTER,
    "LEFT_EAR": mp_pose.PoseLandmark.LEFT_EAR,
    "RIGHT_EAR": mp_pose.PoseLandmark.RIGHT_EAR,
    "MOUTH_LEFT": mp_pose.PoseLandmark.MOUTH_LEFT,
    "MOUTH_RIGHT": mp_pose.PoseLandmark.MOUTH_RIGHT,
    "LEFT_SHOULDER": mp_pose.PoseLandmark.LEFT_SHOULDER,
    "RIGHT_SHOULDER": mp_pose.PoseLandmark.RIGHT_SHOULDER,
    "LEFT_ELBOW": mp_pose.PoseLandmark.LEFT_ELBOW,
    "RIGHT_ELBOW": mp_pose.PoseLandmark.RIGHT_ELBOW,
    "LEFT_WRIST": mp_pose.PoseLandmark.LEFT_WRIST,
    "RIGHT_WRIST": mp_pose.PoseLandmark.RIGHT_WRIST,
    "LEFT_PINKY": mp_pose.PoseLandmark.LEFT_PINKY,
    "RIGHT_PINKY": mp_pose.PoseLandmark.RIGHT_PINKY,
    "LEFT_INDEX": mp_pose.PoseLandmark.LEFT_INDEX,
    "RIGHT_INDEX": mp_pose.PoseLandmark.RIGHT_INDEX,
    "LEFT_THUMB": mp_pose.PoseLandmark.LEFT_THUMB,
    "RIGHT_THUMB": mp_pose.PoseLandmark.RIGHT_THUMB,
    "LEFT_HIP": mp_pose.PoseLandmark.LEFT_HIP,
    "RIGHT_HIP": mp_pose.PoseLandmark.RIGHT_HIP,
    "LEFT_KNEE": mp_pose.PoseLandmark.LEFT_KNEE,
    "RIGHT_KNEE": mp_pose.PoseLandmark.RIGHT_KNEE,
    "LEFT_ANKLE": mp_pose.PoseLandmark.LEFT_ANKLE,
    "RIGHT_ANKLE": mp_pose.PoseLandmark.RIGHT_ANKLE,
    "LEFT_HEEL": mp_pose.PoseLandmark.LEFT_HEEL,
    "RIGHT_HEEL": mp_pose.PoseLandmark.RIGHT_HEEL,
    "LEFT_FOOT_INDEX": mp_pose.PoseLandmark.LEFT_FOOT_INDEX,
    "RIGHT_FOOT_INDEX": mp_pose.PoseLandmark.RIGHT_FOOT_INDEX,
    
    # for arm model:
    # "r_acromion" : mp_pose.PoseLandmark.RIGHT_SHOULDER,
    # "r_humerus_epicondyle" : mp_pose.PoseLandmark.RIGHT_ELBOW,
    # "r_radius_styloid" : mp_pose.PoseLandmark.RIGHT_WRIST
}


In [None]:
# in the first run, play the video to ensure its not rotated and correct the rotate_video_by_degrees variable if it is
# play_without_markers(static_path, rotate_video_by_degrees, mp_pose)
play_without_markers(video_path, rotate_video_by_degrees, mp_pose)

In [None]:
play_with_markers(static_path, rotate_video_by_degrees, mp_pose)
play_with_markers(video_path, rotate_video_by_degrees, mp_pose)

In [None]:
flip_x_axis = True

pose_estimation(static_name, marker_names, landmark_map, video_folder, output_folder, rotate_video_by_degrees, flip_x_axis, mp_pose)
pose_estimation(video_name, marker_names, landmark_map, video_folder, output_folder, rotate_video_by_degrees, flip_x_axis, mp_pose)

In [None]:
# to look at marker positions for limbs array
for i in range(len(marker_names)):
    print(i, marker_names[i])

In [None]:
limb_groups = [
    [(10, 9)], # mouth
    [(0, 4), (4, 5), (5, 6), (6, 8)], # nose-(right) eye-ear
    [(0, 1), (1, 2), (2, 3), (3, 7)], # nose-(left) eye-ear
    [(12, 14), (14, 16)], # right arm
    [(11, 13), (13, 15)], # left arm
    [(14, 16), (26, 28)], # right leg
    [(23, 25), (25, 27)] # left leg
]         

anchor_marker = "RIGHT_SHOULDER"

# anchor_marker = "r_acromion"
anchor_vector = np.array([0,0.837,0.17])

In [None]:
reference_limb_for_scaling = None
reference_length_for_scaling = None

reference_limb_for_scaling = (0, 1)
reference_length_for_scaling = 0.25

static_scaled, video_scaled = scale(static_name, video_name, marker_names, limb_groups, anchor_marker, anchor_vector, output_folder, reference_limb_for_scaling, reference_length_for_scaling, plots=True)

In [None]:
to_trc(static_name, output_folder)
to_trc(video_name, output_folder)

# to_trc(static_scaled, output_folder)
# to_trc(video_scaled, output_folder)

---