In [6]:
import os
import cv2
import mediapipe as mp
import pandas as pd
import numpy as np

In [7]:
# Input root: one sub-folder per activity label, each holding the raw videos.
dataset_path = "../dataset"
# Output root: receives one landmark CSV per label folder.
output_path = "../output"
# exist_ok=True makes the cell safely re-runnable and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(output_path, exist_ok=True)

In [8]:
def normalize_landmarks(landmarks: list) -> list:
    """Center and scale a flat list of pose landmarks, keeping only x and y.

    The midpoint of the x/y bounding box is moved to (0, 0) and the points are
    scaled uniformly so that the largest distance from the center is 0.5.
    The z and visibility values are dropped from the result.

    NOTE(review): z is not re-centered, yet it is included in the distance
    used to compute the scale. When z is non-zero the returned 2D points can
    therefore lie strictly inside the 0.5 radius. This is kept as-is so that
    previously generated datasets stay comparable; confirm whether it is
    intended.

    Args:
        landmarks (list): A flat sequence of landmark values
            [x1, y1, z1, v1, x2, y2, z2, v2, ..., xN, yN, zN, vN].
            Integer values are accepted and converted to float.

    Returns:
        list: The normalized coordinates as a flat list
            [x1, y1, x2, y2, ..., xN, yN] (two values per landmark).
            If every landmark coincides (zero extent) all values are 0.0.

    Raises:
        ValueError: If the input is empty or its length is not a multiple of 4.
    """
    # dtype=float: integer input would otherwise produce an int array that
    # cannot hold the centered/scaled values.
    points = np.asarray(landmarks, dtype=float).reshape(-1, 4)
    xs = points[:, 0]
    ys = points[:, 1]
    zs = points[:, 2]

    # Center on the midpoint of the x/y bounding box.
    centered_x = xs - (np.max(xs) + np.min(xs)) / 2
    centered_y = ys - (np.max(ys) + np.min(ys)) / 2

    # Largest distance from the center (z included, see NOTE in the docstring).
    max_distance = np.max(np.sqrt(centered_x**2 + centered_y**2 + zs**2))

    # Scale so the farthest point sits at distance 0.5. A degenerate pose
    # (max_distance == 0) is left at the origin instead of dividing by zero,
    # which would fill the output with NaN.
    if max_distance > 0:
        scale = 0.5 / max_distance
        centered_x = centered_x * scale
        centered_y = centered_y * scale

    # Interleave back to [x1, y1, x2, y2, ...]; z and visibility are dropped.
    normalized = np.empty((points.shape[0], 2), dtype=float)
    normalized[:, 0] = centered_x
    normalized[:, 1] = centered_y
    return normalized.flatten().tolist()


In [11]:
        
# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose

# Video extensions to process. Compared against the lower-cased file name so
# that files such as "clip.MOV" or "clip.MP4" are not silently skipped.
VIDEO_EXTENSIONS = (".mp4", ".mov")

# The CSV header assumes 33 landmarks per detected pose, each reduced to an
# (x, y) pair by normalize_landmarks.
NUM_POSE_LANDMARKS = 33
columns = (
    ["video"]
    + [f"{landmark}_{axis}" for landmark in range(NUM_POSE_LANDMARKS) for axis in ["x", "y"]]
    + ["label"]
)

# The context manager closes the Pose graph and frees its resources even if
# processing fails part-way through.
with mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5) as pose:
    # Each sub-folder of dataset_path is one label; sorted() makes the row
    # order of the generated CSVs reproducible across runs and machines.
    for folder_name in sorted(os.listdir(dataset_path)):
        folder_path = os.path.join(dataset_path, folder_name)
        if not os.path.isdir(folder_path):
            continue

        # One output CSV per label folder
        output_csv = os.path.join(output_path, f"{folder_name}_pose_landmarks.csv")

        # Rows of [video name, 66 normalized coordinates, label]
        data = []

        # Process each video in the folder
        for video_name in sorted(os.listdir(folder_path)):
            if not video_name.lower().endswith(VIDEO_EXTENSIONS):
                continue

            video_path = os.path.join(folder_path, video_name)
            cap = cv2.VideoCapture(video_path)
            try:
                if not cap.isOpened():
                    # Surface unreadable files instead of skipping them silently.
                    print(f"Warning: could not open video '{video_path}', skipping")

                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # OpenCV decodes frames as BGR; the frame is converted to RGB
                    # before being handed to MediaPipe.
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                    # Detect pose landmarks; frames without a detection add no row
                    results = pose.process(frame_rgb)
                    if results.pose_landmarks:
                        # Flatten to [x1, y1, z1, v1, ...] as normalize_landmarks expects
                        landmarks = []
                        for landmark in results.pose_landmarks.landmark:
                            landmarks.extend([landmark.x, landmark.y, landmark.z, landmark.visibility])
                        landmarks = normalize_landmarks(landmarks)
                        data.append([video_name] + landmarks + [folder_name])
            finally:
                # Always release the capture handle, even if a frame fails to process.
                cap.release()

        # Create a DataFrame and save it to CSV
        df = pd.DataFrame(data, columns=columns)
        df.to_csv(output_csv, index=False)

        print(f"Pose landmarks for folder '{folder_name}' have been saved to {output_csv}")

I0000 00:00:1747104087.684737 1503017 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1747104087.710860 4050839 gl_context.cc:369] GL version: 3.0 (OpenGL ES 3.0 Mesa 24.2.8-1ubuntu1~24.04.1), renderer: D3D12 (Intel(R) UHD Graphics 770)
W0000 00:00:1747104087.756564 4050814 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1747104087.788393 4050818 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


Pose landmarks for folder 'hoeing' have been saved to ../output/hoeing_pose_landmarks.csv
Pose landmarks for folder 'fertilizing' have been saved to ../output/fertilizing_pose_landmarks.csv
Pose landmarks for folder 'watering' have been saved to ../output/watering_pose_landmarks.csv
Pose landmarks for folder 'planting' have been saved to ../output/planting_pose_landmarks.csv
Pose landmarks for folder 'weeding' have been saved to ../output/weeding_pose_landmarks.csv
Pose landmarks for folder 'prunning' have been saved to ../output/prunning_pose_landmarks.csv
Pose landmarks for folder 'checking' have been saved to ../output/checking_pose_landmarks.csv


In [14]:
def rotate_landmarks(landmarks, angle_rad, axis='z', center=(0.0, 0.0)):
    """Rotate a flat list of landmark coordinates around one axis.

    The expected layout of ``landmarks`` depends on ``axis``:

    * ``'x'`` or ``'y'``: quadruples ``[x1, y1, z1, v1, x2, ...]``. The
      rotation is about the coordinate axis through the origin; the fourth
      value of each quadruple is passed through unchanged.
    * anything else (treated as ``'z'``): pairs ``[x1, y1, x2, y2, ...]``,
      rotated in the image plane around ``center``.

    The in-plane pivot defaults to the origin because normalize_landmarks
    centers every pose on (0, 0). Rotating around (0.5, 0.5), as this function
    previously did unconditionally, also translates such centered data away
    from the origin. Pass ``center=(0.5, 0.5)`` to pivot around that point
    instead.

    Args:
        landmarks: Flat sequence (list or array) of numeric landmark values.
        angle_rad (float): Rotation angle in radians.
        axis (str): ``'x'``, ``'y'`` or ``'z'``. Any other value behaves
            like ``'z'``.
        center (tuple): ``(x, y)`` pivot used for the in-plane ('z') rotation.
            Ignored for ``'x'`` and ``'y'``.

    Returns:
        list: Rotated values as a flat list of floats in the input layout.

    Raises:
        ValueError: If the number of values does not match the layout
            required by ``axis`` (multiple of 4 for 'x'/'y', of 2 otherwise).
    """
    cos_theta = np.cos(angle_rad)
    sin_theta = np.sin(angle_rad)
    # np.array copies, so the caller's data is never modified in place.
    values = np.array(landmarks, dtype=float).ravel()

    if axis in ('x', 'y'):
        if values.size % 4 != 0:
            raise ValueError(
                f"axis={axis!r} expects [x, y, z, v] quadruples, "
                f"got {values.size} values"
            )
        points = values.reshape(-1, 4)
        x = points[:, 0].copy()
        y = points[:, 1].copy()
        z = points[:, 2].copy()
        if axis == 'x':
            points[:, 1] = y * cos_theta - z * sin_theta
            points[:, 2] = y * sin_theta + z * cos_theta
        else:
            points[:, 0] = x * cos_theta + z * sin_theta
            points[:, 2] = -x * sin_theta + z * cos_theta
    else:  # "z"
        if values.size % 2 != 0:
            raise ValueError(
                f"axis='z' expects [x, y] pairs, got {values.size} values"
            )
        points = values.reshape(-1, 2)
        center_x, center_y = center
        # Shift to the pivot, rotate, then shift back.
        x_c = points[:, 0] - center_x
        y_c = points[:, 1] - center_y
        points[:, 0] = x_c * cos_theta - y_c * sin_theta + center_x
        points[:, 1] = x_c * sin_theta + y_c * cos_theta + center_y

    return points.ravel().tolist()

def augment_csv_with_rotation(
    input_csv,
    output_csv,
    x_angles_deg=tuple(range(-90, 91, 45)),
    y_angles_deg=tuple(range(-90, 91, 45)),
    z_angles_deg=tuple(range(-45, 46, 15)),
):
    """Write a copy of a landmark CSV extended with in-plane rotated samples.

    The input is expected to have a leading ``video`` column, a trailing
    ``label`` column and numeric landmark coordinates in between. For every
    row and every angle in ``z_angles_deg`` a rotated copy is produced with
    ``rotate_landmarks(..., axis='z')``. The output file contains the original
    rows followed by all rotated rows.

    Angles that are a multiple of 360 degrees are skipped: they reproduce the
    source row, which is already part of the output, so including them would
    write every original sample twice.

    Args:
        input_csv: Path of the CSV to read.
        output_csv: Path of the augmented CSV to write.
        x_angles_deg: Accepted for backward compatibility; currently unused.
        y_angles_deg: Accepted for backward compatibility; currently unused.
            (The x/y branches of rotate_landmarks read four values per
            landmark, while the CSVs written earlier in this notebook store
            only x and y.)
        z_angles_deg: In-plane rotation angles in degrees.

    Returns:
        None. The result is written to ``output_csv`` and a message is printed.
    """
    df = pd.read_csv(input_csv)
    columns = df.columns.tolist()

    # All columns between the first ("video") and last ("label") are coordinates.
    landmark_matrix = df.iloc[:, 1:-1].to_numpy(dtype=float)
    videos = df['video'].tolist()
    labels = df['label'].tolist()

    augmented_rows = []
    for video, label, orig_landmarks in zip(videos, labels, landmark_matrix):
        for angle in z_angles_deg:
            if angle % 360 == 0:
                # Identity rotation: would duplicate the original row.
                continue
            rotated = rotate_landmarks(orig_landmarks, np.deg2rad(angle), axis='z')
            augmented_rows.append([video, *rotated, label])

    if augmented_rows:
        df_aug = pd.DataFrame(augmented_rows, columns=columns)
        df_all = pd.concat([df, df_aug], ignore_index=True)
    else:
        # Nothing to add (empty input or no effective angles): keep the originals.
        df_all = df

    df_all.to_csv(output_csv, index=False)
    print(f"Augmented data saved to {output_csv}")

# Produce an "<name>_augmented.csv" companion for every landmark CSV found in
# output_path; files that are already augmented outputs are left alone.
for fname in os.listdir(output_path):
    is_csv = fname.endswith('.csv')
    already_augmented = fname.endswith('_augmented.csv')
    if not is_csv or already_augmented:
        continue
    input_csv = os.path.join(output_path, fname)
    augmented_name = fname.replace('.csv', '_augmented.csv')
    output_csv = os.path.join(output_path, augmented_name)
    augment_csv_with_rotation(input_csv, output_csv)

Augmented data saved to ../output/fertilizing_pose_landmarks_augmented.csv
Augmented data saved to ../output/hoeing_pose_landmarks_augmented.csv
Augmented data saved to ../output/checking_pose_landmarks_augmented.csv
Augmented data saved to ../output/weeding_pose_landmarks_augmented.csv
Augmented data saved to ../output/planting_pose_landmarks_augmented.csv
Augmented data saved to ../output/prunning_pose_landmarks_augmented.csv
Augmented data saved to ../output/watering_pose_landmarks_augmented.csv
