In [12]:
import cv2
import os
import pandas as pd
import time

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

In [13]:
### SETUP
# Number of trailing frames of every video that are sent through the hand landmarker
# (transform() only processes frames whose index is >= frame_count - this value).
NUM_FRAMES_EXTRACTED_PER_VIDEO = 15
# Name of the dataset variant; it is interpolated into every data path below.
NUM_CLASSES = 'two-classes'
# NOTE(review): the paths below are absolute and specific to one machine; the notebook
# will not run elsewhere until they are edited (consider a single configurable root).
# Root of the raw videos, walked by listing() as <person>/<cycle>/<video>.
raw_top_dir_path = f'/Users/diego/projects/itesoS3/machine-learning/data/raw/{NUM_CLASSES}'
# Flat directory where each extracted frame is written as a .jpeg before detection
# (the file is deleted again once the frame has been handled).
ph1_frames_path = f'/Users/diego/projects/itesoS3/machine-learning/data/processed/ph1/{NUM_CLASSES}/frames'
# CSV file that table_cleanup() writes with the labels and landmark coordinates.
ph1_labels_path = f'/Users/diego/projects/itesoS3/machine-learning/data/processed/ph1/{NUM_CLASSES}/ph1_data.csv'

# MediaPipe setup
# The landmarker is created once at module level and reused for every frame.
# It is configured with num_hands=2; index_determination() later picks one of the
# detected hands.
mediapipe_hand_landmarker_path='/Users/diego/projects/itesoS3/machine-learning/models/hand_landmarker.task'
base_options = python.BaseOptions(model_asset_path=mediapipe_hand_landmarker_path)
options = vision.HandLandmarkerOptions(base_options=base_options, num_hands=2)
detector = vision.HandLandmarker.create_from_options(options)

# Layout of the `ids` list that is passed between the functions below:
#   index   0            1           2                3          4             5
#   value   person_id    cycle_id    handedness_id    sign_id    sign_label    frame_id

I0000 00:00:1731546598.912803 1013469 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 88.1), renderer: Apple M3 Pro
W0000 00:00:1731546598.922633 1106769 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1731546598.928422 1106768 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [14]:
# Extracts the sign identifier from a video file name, following the current naming scheme
def get_sign_id(handedness_id, video_id):
    """Return the two-character sign id encoded in the file name ``video_id``.

    The position of the sign inside the name depends on the recording:

    * index 18 when ``handedness_id`` is 0;
    * otherwise index 14 when the character at index 13 is ``'_'``;
    * otherwise index 20.

    The text between that position and the last four characters of the name
    (presumably the file extension -- TODO confirm) is inspected. When it is
    exactly one character long, that character padded with ``"-"`` is
    returned; in every other case the two characters starting at the position
    are returned.
    """
    if handedness_id == 0:
        start = 18
    elif video_id[13] == '_':
        start = 14
    else:
        start = 20

    # A single-character sign is padded so that every sign id is two characters wide
    if len(video_id[start:-4]) == 1:
        return video_id[start] + "-"
    return video_id[start:start + 2]

In [15]:
# Builds the file name of a frame image. All images share one flat directory, so the name must be unique.
def ids_to_filepath(ids):
    """Return the path of the .jpeg file for the frame described by ``ids``.

    ``ids`` is laid out as
    [person_id, cycle_id, handedness_id, sign_id, sign_label, frame_id].
    The result has the form
    ``<ph1_frames_path>/<sign_id>-<person_id>-c<cycle_id>-h<handedness_id>-f<frame_id>.jpeg``.
    Only the handedness id is converted with ``str``; the other parts must
    already be strings, otherwise a ``TypeError`` is raised.
    """
    person_id = ids[0]
    cycle_id = ids[1]
    handedness_id = ids[2]
    sign_id = ids[3]
    frame_id = ids[5]

    pieces = [
        ph1_frames_path, '/',
        sign_id, "-", person_id,
        "-c", cycle_id,
        "-h", str(handedness_id),
        "-f", frame_id,
        ".jpeg",
    ]
    return "".join(pieces)

In [16]:
# Walks the raw data directory (person / cycle / video) and processes the videos one by one
def listing(action, datatable):
    """Call ``action`` once for every video found under ``raw_top_dir_path``.

    The directory is expected to be organised as ``<person>/<cycle>/<video>``.
    Entries whose name starts with a dot are skipped at every level.

    Entries are visited in sorted order. ``os.listdir`` returns names in
    arbitrary order, which would make the order of the produced rows (and
    anything derived from it) differ between runs and machines.

    Parameters
    ----------
    action : callable
        Called as ``action(full_path, ids, datatable)`` for every video.
    datatable : list
        Passed through to ``action`` untouched.

    The ``ids`` list given to ``action`` is laid out as
    [person_id, cycle_id, handedness_id, sign_id, sign_label, frame_id]:

    * person_id     -- the person directory name without its first character
    * cycle_id      -- the character at index 6 of the cycle directory name
    * handedness_id -- 0 when the character at index 10 of the cycle directory
                       name is 'D', 1 otherwise
    * sign_id       -- result of ``get_sign_id`` for the video file name
    * sign_label    -- first character of sign_id when its second character
                       is '-', otherwise sign_id itself
    * frame_id      -- -1 (placeholder; it is filled in by ``action``)

    A new list is created for every video, so ``action`` may modify or store
    it without affecting later calls.
    """
    for person in sorted(os.listdir(raw_top_dir_path)):
        if person.startswith('.'):
            continue
        person_id = person[1:]
        person_dir = raw_top_dir_path + '/' + person

        for cycle in sorted(os.listdir(person_dir)):
            if cycle.startswith('.'):
                continue
            cycle_id = cycle[6]
            handedness_id = 0 if cycle[10] == 'D' else 1
            cycle_dir = person_dir + '/' + cycle

            for video in sorted(os.listdir(cycle_dir)):
                if video.startswith('.'):
                    continue
                sign_id = get_sign_id(handedness_id, video)
                sign_label = sign_id[0] if sign_id[1] == '-' else sign_id

                ids = [person_id, cycle_id, handedness_id, sign_id, sign_label, -1]
                action(cycle_dir + '/' + video, ids, datatable)

In [17]:
# Decides which entry of the detection result belongs to the hand we are interested in
def index_determination(detection_result, handedness_id):
    """Return the index (0 or 1) of the detected hand to use.

    * Fewer than two handedness entries: index 0.
    * Two entries with different labels: the entry labelled "Right" counts as
      handedness 0 and any other label as handedness 1; the index of the entry
      matching ``handedness_id`` is returned.
    * Two entries with the same label: the global ``same_handedness_count`` is
      incremented, the case is printed, and the entry with the strictly higher
      score wins (index 1 on a tie).

    Only the first two entries of ``detection_result.handedness`` are looked at.
    """
    global same_handedness_count

    handedness = detection_result.handedness
    if len(handedness) < 2:
        return 0

    first, second = handedness[0][0], handedness[1][0]

    if first.display_name != second.display_name:
        first_id = 0 if first.display_name == "Right" else 1
        return 0 if handedness_id == first_id else 1

    # Both detections carry the same label: keep track of it and fall back to the score
    same_handedness_count += 1
    print("same handedness")
    print(detection_result.handedness, handedness_id)
    return 0 if first.score > second.score else 1

In [18]:
# Adds one row (labels followed by the landmark coordinates) to the table
def write_to_table(hand_landmarks, ids, datatable):
    """Append the row for one frame to ``datatable``.

    ``ids`` is laid out as
    [person_id, cycle_id, handedness_id, sign_id, sign_label, frame_id].

    The appended row is a list of 5 + 63 values:
    sign_label, person_id, cycle_id, handedness_id, frame_id, followed by the
    ``x``, ``y`` and ``z`` attributes of ``hand_landmarks[0]`` through
    ``hand_landmarks[20]``. Nothing is appended if fewer than 21 landmarks are
    available (the resulting ``IndexError`` propagates).
    """
    row = [ids[4], ids[0], ids[1], ids[2], ids[5]]

    for point_index in range(21):
        point = hand_landmarks[point_index]
        row.extend((point.x, point.y, point.z))

    datatable.append(row)

In [19]:
# Actual data transforming for each frame of a video to an entry in the table
def transform(full_path, ids, datatable):
    """Extract hand landmarks from the trailing frames of one video.

    The video is read frame by frame. Frames whose index is at least
    ``frame_count - NUM_FRAMES_EXTRACTED_PER_VIDEO`` are written to a .jpeg
    file, read back, run through the MediaPipe ``detector`` and, when a hand
    is found, appended to ``datatable`` via ``write_to_table``. Earlier frames
    are only counted in the global ``skipped``.

    Parameters
    ----------
    full_path : str
        Path of the video file.
    ids : list
        [person_id, cycle_id, handedness_id, sign_id, sign_label, frame_id];
        ``ids[5]`` is overwritten with the zero-padded index of each
        processed frame.
    datatable : list
        Receives one row per successfully processed frame.

    Globals
    -------
    succs   -- incremented for every frame that produced a row.
    probs   -- receives ``(copy of ids, frame index)`` for every frame that
               failed. A copy is stored because ``ids`` keeps being modified.
    skipped -- incremented for every frame before the processed window.
    checkin -- last stage reached for the current frame (debugging aid):
               1 frame id set, 2 path built, 3 image written, 4 image loaded,
               5 detection done, 6 hand chosen, 7 landmarks selected,
               8 image removed, 9 row written.

    A failure on a single frame (for example no hand detected) is reported and
    recorded, and processing continues with the next frame. ``KeyboardInterrupt``
    and ``SystemExit`` are not intercepted, so the run can still be stopped.
    """
    global probs, succs, skipped, checkin
    vidcap = cv2.VideoCapture(full_path)
    try:
        length = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Index of the first frame that is processed; everything before it is only counted
        first_used_frame = length - NUM_FRAMES_EXTRACTED_PER_VIDEO
        success, image = vidcap.read()
        count = 0

        # We go through the video frame by frame
        while success:
            # The beginning of each video shows no sign, so only the last
            # NUM_FRAMES_EXTRACTED_PER_VIDEO frames are processed.
            # Every frame is still decoded, even when it is not used.
            if count >= first_used_frame:
                checkin = 0
                file_path = None
                try:
                    # We define the frame id to label the image
                    ids[5] = '0'+str(count) if count < 10 else str(count)
                    checkin = 1
                    # Name of the image file; it has to be unique inside the flat frames directory
                    file_path = ids_to_filepath(ids)
                    checkin = 2

                    # Write the image to the frames directory
                    cv2.imwrite(file_path, image)
                    checkin = 3

                    # MediaPipe landmarks acquisition.
                    # NOTE(review): the frame is read back from the .jpeg file instead of
                    # being used directly; kept as is because skipping the round trip
                    # could change the pixels handed to the detector -- confirm before removing.
                    image_mp = cv2.cvtColor(cv2.imread(file_path), cv2.COLOR_BGR2RGB)
                    image_mp = mp.Image(image_format=mp.ImageFormat.SRGB, data=image_mp)
                    checkin = 4

                    detection_result = detector.detect(image_mp)
                    checkin = 5

                    # figuring out which hand is the one we want to get the information for
                    result_index = index_determination(detection_result, ids[2])
                    checkin = 6

                    # Raises IndexError when no hand was detected; handled below
                    hand_landmarks = detection_result.hand_landmarks[result_index]
                    checkin = 7

                    # Remove the image file again
                    os.remove(file_path)
                    checkin = 8

                    # Write the corresponding labels
                    write_to_table(hand_landmarks, ids, datatable)
                    checkin = 9

                    succs += 1

                except Exception as error:
                    print(checkin)
                    print("\n \n \n Problem ", ids, count)
                    print(repr(error))
                    # list.append takes a single argument, so the pair is stored as one tuple
                    probs.append((list(ids), count))
                finally:
                    # On success the file is already gone; this only cleans up after a
                    # failure or an interruption, and never fails on a missing file.
                    if file_path is not None and os.path.exists(file_path):
                        os.remove(file_path)
            else:
                skipped += 1
            # Move on to the next frame
            success, image = vidcap.read()
            count += 1
    finally:
        # Free the capture handle even if processing was interrupted
        vidcap.release()


In [20]:
# Turns the collected rows into a DataFrame, cleans it up and saves it as CSV
def table_cleanup(datatable):
    """Build the labels table from ``datatable`` and write it to ``ph1_labels_path``.

    Each row of ``datatable`` must hold 5 label values followed by 63 landmark
    coordinates. The columns are named 'class', 'person_id', 'cycle_id',
    'handedness', 'frame_id' and then '0x', '0y', '0z', ... '20x', '20y', '20z'.

    Rows whose class is 'mp' or '0' are dropped. A 'class_numeric' column is
    added that numbers the remaining classes 0, 1, 2, ... in order of first
    appearance. The table is written as CSV without the index; nothing is
    returned.
    """
    coordinate_columns = [f"{point}{axis}" for point in range(21) for axis in "xyz"]
    all_columns = ['class', 'person_id', 'cycle_id', 'handedness', 'frame_id'] + coordinate_columns
    labels_df = pd.DataFrame(datatable, columns=all_columns)

    # Clean-up: discard the rows of the classes 'mp' and '0'
    wanted = (labels_df['class'] != 'mp') & (labels_df['class'] != '0')
    labels_df = labels_df[wanted]

    # torch needs numeric labels, so every class gets a number in an extra column
    class_to_number = {name: number for number, name in enumerate(labels_df['class'].unique())}
    labels_df['class_numeric'] = labels_df['class'].map(class_to_number)

    # The result goes to the csv file in the folder of the processed data
    labels_df.to_csv(ph1_labels_path, index=False)

In [21]:
def data_prep():
    """Run the whole extraction: walk the raw videos, build the table, write the CSV.

    Resets the global counters used by ``transform`` and
    ``index_determination`` (``probs``, ``succs``, ``skipped``,
    ``same_handedness_count``), makes sure the output locations exist, calls
    ``listing(transform, datatable)`` followed by ``table_cleanup(datatable)``
    and finally prints the execution time in minutes and a summary of the
    counters. Returns nothing.
    """
    global probs, succs, skipped, same_handedness_count
    # to get execution time
    start = time.time()

    # we create a list where we will add some information each time there is a problem
    probs = []
    # to keep track of the number of frames we think we processed correctly
    succs = 0
    # to keep track of how many frames we read and skip because they are not among the
    # last NUM_FRAMES_EXTRACTED_PER_VIDEO frames of their video
    skipped = 0
    # this is the datatable that will later contain all the labelling information
    datatable = []
    # sometimes mediapipe incorrectly detects both hands as being of the same handedness.
    # to make sure this doesn't happen too often, we keep track of it
    same_handedness_count = 0

    # Neither cv2.imwrite nor DataFrame.to_csv creates missing directories, so without
    # this a fresh machine fails on the first frame / when the CSV is saved.
    os.makedirs(ph1_frames_path, exist_ok=True)
    labels_dir = os.path.dirname(ph1_labels_path)
    if labels_dir:
        os.makedirs(labels_dir, exist_ok=True)

    ###################################### ACTUAL WORK ######################################
    listing(transform, datatable)

    table_cleanup(datatable)
    ###################################### ACTUAL WORK ######################################

    ### FINALIZING
    # to get execution time (printed in minutes)
    end = time.time()
    print("Execution time: ", (end - start)/60)
    # summary of processed, failed and skipped frames
    print(f"succs: {succs}; probs: {len(probs)}; skipped: {skipped}; same_handedness_count: {same_handedness_count}")
 

In [22]:
# Run the full pipeline: walk the raw videos, extract the landmarks and write the CSV.
# The "same handedness" lines printed below come from index_determination().
data_prep()



same handedness
[[Category(index=0, score=0.9671075344085693, display_name='Right', category_name='Right')], [Category(index=0, score=0.8107408285140991, display_name='Right', category_name='Right')]] 1
same handedness
[[Category(index=0, score=0.9441626667976379, display_name='Right', category_name='Right')], [Category(index=0, score=0.6380443572998047, display_name='Right', category_name='Right')]] 1
same handedness
[[Category(index=1, score=0.9103757739067078, display_name='Left', category_name='Left')], [Category(index=1, score=0.9562279582023621, display_name='Left', category_name='Left')]] 1
same handedness
[[Category(index=1, score=0.6612324714660645, display_name='Left', category_name='Left')], [Category(index=1, score=0.9608014822006226, display_name='Left', category_name='Left')]] 1
same handedness
[[Category(index=1, score=0.9063060283660889, display_name='Left', category_name='Left')], [Category(index=1, score=0.9719316959381104, display_name='Left', category_name='Left')]]