# Pose Estimation

In [None]:
import os, cv2
import mediapipe as mp
import pandas as pd

def extract_keypoints(input_video_path, output_video_path, source, save_with_keypoints=False):
    """Track one hand through a video and collect fingertip coordinates.

    Every frame read from the input is re-encoded (XVID) into
    ``output_video_path``. When ``save_with_keypoints`` is true, frames in
    which a hand was detected get the hand skeleton drawn on them first.

    Args:
        input_video_path: Path of the video to analyse.
        output_video_path: Path of the video file to write.
        source: Label stored verbatim in the ``Source`` column of every row.
        save_with_keypoints: Draw the detected landmarks on the written frames.

    Returns:
        A ``pandas.DataFrame`` with one row per frame in which a hand was
        detected. Columns are ``Frame``, ``Source``, ``Video`` (file name of
        the input) and the x/y/z values MediaPipe reports for the thumb tip
        and the index finger tip. Returns ``None`` if the input video could
        not be opened.
    """
    mp_hands = mp.solutions.hands
    mp_drawing = mp.solutions.drawing_utils

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("Error: Video file couldn't be opened.")
        cap.release()
        return None

    # Use the function's own argument (not a module-level variable) and let
    # os.path pick the separator, so the name is right on every platform.
    video_name = os.path.basename(input_video_path)

    # (column suffix, MediaPipe landmark index) for the landmarks we keep.
    tracked_landmarks = (
        ('thumb_tip', mp_hands.HandLandmark.THUMB_TIP),          # index 4
        ('index_tip', mp_hands.HandLandmark.INDEX_FINGER_TIP),   # index 8
    )

    frame_size = (
        int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out_file = cv2.VideoWriter(output_video_path, fourcc, cap.get(cv2.CAP_PROP_FPS), frame_size)

    all_keypoints = []
    frame_number = 0

    try:
        # The context manager closes the MediaPipe graph even on errors.
        with mp_hands.Hands(max_num_hands=1) as hands:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR, MediaPipe expects RGB input.
                results = hands.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

                # If a hand landmark is found, extract the required landmarks
                if results.multi_hand_landmarks:
                    hand_landmarks = results.multi_hand_landmarks[0]

                    keypoints = {'Frame': frame_number, 'Source': source, 'Video': video_name}
                    for suffix, landmark_index in tracked_landmarks:
                        point = hand_landmarks.landmark[landmark_index]
                        keypoints[f'x_{suffix}'] = point.x
                        keypoints[f'y_{suffix}'] = point.y
                        keypoints[f'z_{suffix}'] = point.z

                    all_keypoints.append(keypoints)

                    # If set to save with keypoints, annotate the (BGR) frame
                    if save_with_keypoints:
                        mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

                # The frame is written whether or not a hand was found.
                out_file.write(frame)

                frame_number += 1
    finally:
        cap.release()
        out_file.release()

    # Convert list of keypoints to a pandas DataFrame
    return pd.DataFrame(all_keypoints)

# --- Batch keypoint extraction over one directory of videos ---------------
input_path = "/Users/eduardo/code/data/PDMotorDB/lr"
output_path = input_path + '_keypoints'

# Dataset label = last directory component without the 'lr_' prefix, upper-cased
# (e.g. '.../lr_clahe' -> 'CLAHE', '.../lr' -> 'LR'). Both '/' and '\' are
# treated as separators so the label is the same for POSIX and Windows paths;
# splitting on '\' alone turned a POSIX path into the whole path string.
source = input_path.replace('\\', '/').rstrip('/').split('/')[-1].removeprefix('lr_').upper()

# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(output_path, exist_ok=True)

save_with_keypoints = True
selected_hand = 'R'              # 'L' / 'R'

df_all = pd.DataFrame()
df_exclude = pd.DataFrame()      # not used in this cell; kept in case other cells read it

for file in sorted(os.listdir(input_path)):
    print(file)

    # Only process files whose name contains the selected hand marker.
    if selected_hand not in file:
        continue

    output_file = file.split('.')[0] + '_keypoints.avi'

    print(f'Input file: {file}, Output file: {output_file}')

    video_path = os.path.join(input_path, file)
    output_video_path = os.path.join(output_path, output_file)

    df = extract_keypoints(video_path, output_video_path, source,
                           save_with_keypoints=save_with_keypoints)

    # extract_keypoints returns None for a file it could not open; skip it
    # explicitly instead of relying on pd.concat silently dropping None.
    if df is not None:
        df_all = pd.concat([df_all, df])
    print(df_all.shape)

df_all.to_csv(os.path.join(output_path, f'data_keypoints_{selected_hand}.csv'), index=False)


# Data Augmentation

In [None]:
import os, cv2

def rotate_frame(frame, angle):
    """Rotate ``frame`` clockwise by 90, 180 or 270 degrees.

    Any other ``angle`` returns the frame object unchanged.
    """
    if angle not in (90, 180, 270):
        return frame  # No rotation

    # A 270 degree clockwise turn is a 90 degree counter-clockwise turn.
    rotation_codes = {
        90: cv2.ROTATE_90_CLOCKWISE,
        180: cv2.ROTATE_180,
        270: cv2.ROTATE_90_COUNTERCLOCKWISE,
    }
    return cv2.rotate(frame, rotation_codes[angle])

def mirror_frame(frame):
    """Return ``frame`` flipped with OpenCV flip code 1 (left-right mirror)."""
    flip_code = 1
    return cv2.flip(frame, flip_code)

def augment_video(input_video_path, output_path):
    """Write seven rotated and/or mirrored copies of a video.

    For an input named ``<stem>.<ext>`` the files ``<stem>_<tag>.avi`` are
    created in ``output_path`` (XVID, same frame rate as the input), where
    ``<tag>`` is one of:

    * ``90``, ``180``, ``270`` - rotated by that many degrees (see
      ``rotate_frame``),
    * ``90m``, ``180m``, ``270m`` - rotated, then mirrored,
    * ``0m`` - mirrored only.

    Args:
        input_video_path: Path of the video to augment.
        output_path: Existing directory that receives the generated videos.

    Returns:
        ``None``. If the input cannot be opened an error is printed and
        nothing is written.
    """
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("Error: Video file couldn't be opened.")
        cap.release()
        return None

    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    # Take the file name first, then cut at its first dot. Splitting the whole
    # path on '.' and '\' broke on POSIX paths and on directories with a dot.
    output_filename = os.path.basename(input_video_path).split('.')[0]
    output_file = os.path.join(output_path, output_filename)

    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = cap.get(cv2.CAP_PROP_FPS)

    writers = {}
    try:
        for angle in [90, 180, 270, '90m', '180m', '270m', '0m']:
            # Quarter turns swap width and height; VideoWriter takes (width, height).
            if angle in [90, 270, '90m', '270m']:
                size = (frame_height, frame_width)
            else:
                size = (frame_width, frame_height)
            writers[angle] = cv2.VideoWriter(output_file + f'_{angle}.avi', fourcc, fps, size)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            for angle in [90, 180, 270]:
                rotated = rotate_frame(frame, angle)
                writers[angle].write(rotated)

                # Mirror the rotated frames
                mirrored_rotated = mirror_frame(rotated)
                writers[str(angle) + 'm'].write(mirrored_rotated)

            mirrored = mirror_frame(frame)
            writers['0m'].write(mirrored)
    finally:
        # Release everything even if reading or writing raised.
        cap.release()
        for writer in writers.values():
            writer.release()

# Directory with the original clips; augmented copies go to a sibling directory.
input_path = r"D:\data\PDMotorDB\lr"
output_path = f'{input_path}_augmented'

# Create the destination directory on the first run.
if not os.path.exists(output_path):
    os.makedirs(output_path)

# Augment every entry of the input directory, in name order.
for file in sorted(os.listdir(input_path)):
    print(f'Input file: {file}')
    input_video_path = os.path.join(input_path, file)
    augment_video(input_video_path, output_path)


# Preprocessing videos with CLAHE

In [None]:
import os, cv2
import numpy as np

def preprocess_clahe(frame, clip_limit=2.0, tile_grid_size=(8, 8)):
    """Apply CLAHE to the luma channel of a BGR frame.

    The frame is converted to YUV, contrast-limited adaptive histogram
    equalization is applied to channel 0 (Y) only, and the result is
    converted back to BGR.

    Args:
        frame: Image passed to ``cv2.cvtColor`` with ``COLOR_BGR2YUV``.
        clip_limit: ``clipLimit`` passed to ``cv2.createCLAHE``.
        tile_grid_size: ``tileGridSize`` passed to ``cv2.createCLAHE``.

    Returns:
        The equalized frame in BGR order.
    """
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)  # CLAHE object

    yuv_img = cv2.cvtColor(frame, cv2.COLOR_BGR2YUV)
    # Equalize brightness only; the two chroma channels are left untouched.
    yuv_img[:, :, 0] = clahe.apply(yuv_img[:, :, 0])
    clahe_frame = cv2.cvtColor(yuv_img, cv2.COLOR_YUV2BGR)

    return clahe_frame

def preprocess_video(input_video_path, output_video_path, type):
    """Run ``preprocess_clahe`` on every frame of a video and save the result.

    The output is encoded as XVID with the frame rate and frame size reported
    by the input capture.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the video to write.
        type: Name of the preprocessing; currently not used by this function
            (CLAHE is always applied). The name shadows the builtin but is
            kept so existing keyword callers keep working.

    Returns:
        ``None``. If the input cannot be opened an error is printed and
        nothing is written.
    """
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("Error: Video file couldn't be opened.")
        cap.release()
        return None

    frame_size = (
        int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
    )
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out_file = cv2.VideoWriter(output_video_path, fourcc, cap.get(cv2.CAP_PROP_FPS), frame_size)

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            preprocessed_frame = preprocess_clahe(frame)

            out_file.write(preprocessed_frame)
    finally:
        # Release both handles even if a frame failed to process.
        cap.release()
        out_file.release()

# Name of the preprocessing step; used as suffix for the output directory and
# files. Deliberately not called `type`: a module-level `type` would shadow
# the builtin for everything that runs afterwards in the same session.
preprocess_type = 'clahe'
input_path = r"D:\data\PDMotorDB\lr"
output_path = input_path + f'_{preprocess_type}'

# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(output_path, exist_ok=True)

for file in sorted(os.listdir(input_path)):
    output_file = file.split('.')[0] + f'_{preprocess_type}.avi'
    print(f'Input file: {file}, Output file: {output_file}')

    input_video_path = os.path.join(input_path, file)
    output_video_path = os.path.join(output_path, output_file)

    preprocess_video(input_video_path, output_video_path, preprocess_type)
