In [1]:
import cv2
import mediapipe as mp
import pandas as pd
import re

mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands
mp_holistic = mp.solutions.holistic

df = pd.DataFrame()

In [2]:
# Regular expression pattern to extract coordinates
pattern = r'x: ([-+]?[0-9]*\.?[0-9]+)\ny: ([-+]?[0-9]*\.?[0-9]+)\nz: ([-+]?[0-9]*\.?[0-9]+)'

def get_coordinates(input_string):
    # Extract coordinates using regex
    matches = re.findall(pattern, input_string)

    # Convert the extracted coordinates to a list of dictionaries
    coordinates = [{'x': float(match[0]), 'y': float(match[1]), 'z': float(match[2])} for match in matches]
    return coordinates

In [3]:
import os
path = os.getcwd()

In [4]:
def get_video_speed_factor(input_path, target_duration=1.0):
    # Open the input video
    cap = cv2.VideoCapture(input_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the current duration of the video
    video_duration = frame_count / fps
    print("video_duration: ", video_duration)

    # Calculate the speed factor needed to make the video last approximately 1 second
    speed_factor = video_duration / target_duration

    # Release video capture
    cap.release()

    return speed_factor

In [5]:
def speed_up_video(input_path, output_path, speed_factor, target_width, target_height):
    # Open the input video
    cap = cv2.VideoCapture(input_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the number of frames required in the output video
    target_frame_count = int(frame_count / speed_factor)

    # Define codec and create VideoWriter object for the output video
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (target_width, target_height))  # Change resolution as needed

    current_frame = 0
    while current_frame < frame_count:
        ret, frame = cap.read()

        if not ret:
            break

        # Write the frame to the output video
        out.write(frame)

        # Calculate the number of frames to skip or duplicate
        frame_interval = frame_count / target_frame_count

        # Handle the case when the frame_interval is less than 1
        if frame_interval < 1:
            # Duplicate frames to reach the desired frame count
            duplicate_frames = int(1 / frame_interval)
            for _ in range(duplicate_frames - 1):
                out.write(frame)

        current_frame += frame_interval

    # Release video capture and writer
    cap.release()
    out.release()

    print("Video speed-up completed!")

In [6]:

def resize_video(input_path, output_path, target_width, target_height):
    # Open the input video
    cap = cv2.VideoCapture(input_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Define codec and create VideoWriter object for the output video
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (target_width, target_height))

    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        # Resize the frame to the target width and height
        resized_frame = cv2.resize(frame, (target_width, target_height))

        # Write the resized frame to the output video
        out.write(resized_frame)

    # Release video capture and writer
    cap.release()
    out.release()

    print("Video resizing completed!")

In [7]:
def change_fps(input_path, output_path, target_fps):
    # Open the input video
    cap = cv2.VideoCapture(input_path)

    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Get video dimensions
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define codec and create VideoWriter object for the output video
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, target_fps, (frame_width, frame_height))

    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break

        # Write the frame to the output video
        out.write(frame)

    # Release video capture and writer
    cap.release()
    out.release()

    print("Video fps changed successfully!")

In [8]:
def extract_video(video, target, sequence_id):
    global df
    # For webcam input:
    cap = cv2.VideoCapture(video)
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # Calculate the current duration of the video
    video_duration = frame_count / fps

    print("FPS: ", fps, " VIDEO DURATION: ", video_duration)

    
    results = None
    with mp_holistic.Holistic(
        model_complexity=0,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as hands:

        while cap.isOpened():
            success, image = cap.read()
            if not success:
                print("Ignoring empty camera frame. Or video finished. ")
                # If loading a video, use 'break' instead of 'continue'.
                break

            results = hands.process(image)

            # # Draw the hand annotations on the image.
            image.flags.writeable = True
            # image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            try:
                print(len(get_coordinates(str(results.right_hand_landmarks.landmark))), len(get_coordinates(str(results.left_hand_landmarks.landmark))))
            except:
                pass

            new_row = {}
            if (results.right_hand_landmarks):
                array = get_coordinates(str(results.right_hand_landmarks.landmark))
                for i in range(len(array)):
                    new_row[f'x_right_hand_{i}'] = array[i]["x"]
                    new_row[f'y_right_hand_{i}'] = array[i]["y"]
                    new_row["target"] = target
                    new_row["sequence_id"] = sequence_id

                mp_drawing.draw_landmarks(
                    image,
                    results.right_hand_landmarks,
                    mp_hands.HAND_CONNECTIONS,
                    mp_drawing_styles.get_default_hand_landmarks_style(),
                    mp_drawing_styles.get_default_hand_connections_style())
            
            if (results.left_hand_landmarks):
                array = get_coordinates(str(results.left_hand_landmarks.landmark))
                for i in range(len(array)):
                    new_row[f'x_left_hand_{i}'] = array[i]["x"]
                    new_row[f'y_left_hand_{i}'] = array[i]["y"]
                    new_row["target"] = target
                    new_row["sequence_id"] = sequence_id

                mp_drawing.draw_landmarks(
                    image,
                    results.left_hand_landmarks,
                    mp_hands.HAND_CONNECTIONS,
                    mp_drawing_styles.get_default_hand_landmarks_style(),
                    mp_drawing_styles.get_default_hand_connections_style())
            
            if (new_row):
                df = df.append(new_row, ignore_index=True)    
            # Flip the image horizontally for a selfie-view display.
            cv2.imshow('MediaPipe Hands', cv2.flip(image, 1))
            if cv2.waitKey(5) & 0xFF == 27:
              break

    cap.release()
    cv2.destroyAllWindows()

In [9]:
import glob, os
import time
os.chdir("D:\Documents\Trabajo\megaproyecto_model\data/asl_videos")

sequence_id = 0
for file in glob.glob("*.mp4"):
    sequence_id += 1
    name = " ".join(file.split(" ")[:-1])

    input_path = f'D:\Documents\Trabajo\megaproyecto_model\data/asl_videos/{file}'


    output_path_r = 'D:\Documents\Trabajo\megaproyecto_model\data/asl_preprocessing/resized_output.mp4'
    output_path_f = 'D:\Documents\Trabajo\megaproyecto_model\data/asl_preprocessing/fps_output.mp4'
    output_path_s = 'D:\Documents\Trabajo\megaproyecto_model\data/asl_preprocessing/speed_output.mp4'

    target_width = 640
    target_height = 360
    desired_fps = 30

    resize_video(input_path, output_path_r, target_width, target_height)
    change_fps(output_path_r, output_path_f, desired_fps)
    speed_factor = get_video_speed_factor(output_path_f)
    print("speed_factor: ", speed_factor)
    speed_up_video(output_path_f, output_path_s, speed_factor, target_width, target_height)

    extract_video(output_path_s, name, sequence_id)

Video resizing completed!
Video fps changed successfully!
video_duration:  1.2333333333333334
speed_factor:  1.2333333333333334
Video speed-up completed!
FPS:  30  VIDEO DURATION:  1.0
Ignoring empty camera frame. Or video finished. 
Video resizing completed!
Video fps changed successfully!
video_duration:  2.0
speed_factor:  2.0
Video speed-up completed!
FPS:  30  VIDEO DURATION:  1.0
Ignoring empty camera frame. Or video finished. 
Video resizing completed!
Video fps changed successfully!
video_duration:  2.4
speed_factor:  2.4
Video speed-up completed!
FPS:  30  VIDEO DURATION:  1.0
Ignoring empty camera frame. Or video finished. 
Video resizing completed!
Video fps changed successfully!
video_duration:  1.5
speed_factor:  1.5
Video speed-up completed!
FPS:  30  VIDEO DURATION:  1.0
Ignoring empty camera frame. Or video finished. 
Video resizing completed!
Video fps changed successfully!
video_duration:  1.2666666666666666
speed_factor:  1.2666666666666666
Video speed-up completed!


In [10]:
df['sequence_id'] = df['sequence_id'].astype(int)

In [11]:
df.head()

Unnamed: 0,sequence_id,target,x_right_hand_0,x_right_hand_1,x_right_hand_10,x_right_hand_11,x_right_hand_12,x_right_hand_13,x_right_hand_14,x_right_hand_15,...,y_left_hand_19,y_left_hand_2,y_left_hand_20,y_left_hand_3,y_left_hand_4,y_left_hand_5,y_left_hand_6,y_left_hand_7,y_left_hand_8,y_left_hand_9
0,1,bathroom,0.264414,0.291182,0.288854,0.293428,0.294785,0.266834,0.274996,0.279632,...,,,,,,,,,,
1,1,bathroom,0.25975,0.284009,0.269205,0.276955,0.280713,0.2517,0.257347,0.266064,...,,,,,,,,,,
2,1,bathroom,0.258708,0.278215,0.25351,0.263075,0.269543,0.239567,0.24277,0.253262,...,,,,,,,,,,
3,1,bathroom,0.26215,0.274272,0.24029,0.254193,0.263576,0.230973,0.232951,0.247039,...,,,,,,,,,,
4,1,bathroom,0.268369,0.273187,0.231391,0.248651,0.260962,0.226332,0.227692,0.245479,...,,,,,,,,,,


In [12]:
dir_data = "D:\Documents\Trabajo\megaproyecto_model" + "/data.csv"
df.to_csv(dir_data, index=False)

In [13]:
validation_path = "D:\Documents\Trabajo\megaproyecto_model" + "/validation.csv"

import random
grouped = df.groupby('target')
selected_validation_ids = []
for group_name, group_indices in grouped.groups.items():
    ids = list(set(df.loc[group_indices]['sequence_id'].values))
    if (len(ids) < 0):
        raise ("Error")
    selected_validation_ids.append(random.choice(ids))

validation_df = df[df['sequence_id'].isin(selected_validation_ids)][["sequence_id", "target"]]
validation_df.to_csv(validation_path, index=False)

In [14]:
train_path = "D:\Documents\Trabajo\megaproyecto_model" + "/train.csv"
train_df = df[~df['sequence_id'].isin(selected_validation_ids)][["sequence_id", "target"]]
train_df.to_csv(train_path, index=False)