In [None]:
import json
import os
import mediapipe as mp
import numpy as np
import cv2 as cv
import random
import math
import copy

In [None]:
# Filesystem locations: the notebook's working directory and the folder that
# holds the original source videos (resolved to an absolute path).
path = os.path.abspath('')
vid_src_path = os.path.abspath("DataSet//Videos//ori")

# Shortcuts to the MediaPipe sub-modules used by the cells below.
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [None]:

# Main dataset: a list of entries that the cells below index by position
# (each entry is read for its 'file', 'start', 'end', ... keys).
# JSON is read explicitly as UTF-8 so the result does not depend on the
# platform's default text encoding.
with open("MSASL_train.json", 'r', encoding='utf-8') as file:
    j_data = json.load(file)

# Maps each label name (key) to the list of indexes (value) of that label's
# entries inside the main dataset `j_data`.
with open("labelsNindex.json", 'r', encoding='utf-8') as file:
    labelsNindex = json.load(file)

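**Optional:** the next cell loads auxiliary metadata (crop coordinates and label counts); the processing cells shown below do not read these variables.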

In [None]:
# Video name -> square crop coordinates of the person in that video, meant for
# cv2-style slicing (presumably frame[y1:y2, x1:x2] -- TODO confirm the ordering
# against the script that wrote this file).
# All files are read explicitly as UTF-8 so the result does not depend on the
# platform's default text encoding.
with open("videoNcrop.json", 'r', encoding='utf-8') as file:
    videoNcrop = json.load(file)

# Label name (key) -> number of entries carrying that label (value).
with open("labelNcounts.json", 'r', encoding='utf-8') as file:
    labelsNcount = json.load(file)

# The ten labels with the highest entry counts, together with those counts.
with open("top10labelNcount.json", 'r', encoding='utf-8') as file:
    top10labelNcount = json.load(file)

In [None]:
# Runs the landmark model on one frame and returns the frame plus the detection results.
def mediapipe_detection(image,model):
    """Run `model` on a single BGR frame.

    Args:
        image: frame in OpenCV's BGR channel order.
        model: object exposing ``process(rgb_image)`` (here a MediaPipe Holistic instance).

    Returns:
        (image, results): the frame converted back to BGR, and whatever
        ``model.process`` returned for it.
    """
    # The model is fed RGB, while OpenCV frames are BGR.
    rgb_image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

    # Mark the buffer read-only while the model runs (MediaPipe's recommended
    # recipe for passing the image by reference), then unlock it again.
    rgb_image.flags.writeable = False
    results = model.process(rgb_image)
    rgb_image.flags.writeable = True

    # Convert back with the matching RGB->BGR code (the original reused
    # COLOR_BGR2RGB here, which only worked because both codes swap the same channels).
    image = cv.cvtColor(rgb_image, cv.COLOR_RGB2BGR)
    return image, results

In [None]:
# Draws the face outline, pose, and both hands' landmarks onto the image.
def draw_landmarks(image, results):
    """Overlay the detected landmarks on `image` (drawn in place).

    Args:
        image: frame to draw on.
        results: detection result exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.

    For every call the first DrawingSpec styles the landmark points and the
    second one styles the connections. All colour channels are kept inside the
    8-bit range 0..255 (the original used 256 for two channels).
    """
    # Face: only the oval outline of the face mesh.
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp.solutions.face_mesh_connections.FACEMESH_FACE_OVAL,
                              mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                              mp_drawing.DrawingSpec(color=(80,255,121), thickness=1, circle_radius=1))
    # Body pose.
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2))
    # Left hand.
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(80,22,76), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(80,255,250), thickness=2, circle_radius=2))
    # Right hand.
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2))

In [None]:
# Flattens the landmarks of a detection result into one 1-D np.array.
def keypoints_extraction(results):
    """Return all landmark coordinates of `results` as a single flat array.

    The output is the concatenation, in this order, of pose (33 landmarks),
    face (468), left hand (21) and right hand (21), each landmark contributing
    its x, y, z values. A group whose landmark list is missing is replaced by
    zeros of the matching length.
    """
    def _flat_xyz(landmark_list, expected_count):
        # Missing detection -> zero block of the size that group would occupy.
        if not landmark_list:
            return np.zeros(expected_count*3)
        coords = [[lm.x, lm.y, lm.z] for lm in landmark_list.landmark]  # visibility is not included
        return np.array(coords).flatten()

    groups = (
        (results.pose_landmarks, 33),
        (results.face_landmarks, 468),
        (results.left_hand_landmarks, 21),
        (results.right_hand_landmarks, 21),
    )
    return np.concatenate([_flat_xyz(landmarks, count) for landmarks, count in groups])

In [None]:
# Looks up a file by its name alone, i.e. without knowing its extension (e.g. .mp4 or .webm).
def check_existence(file_path, file_name):
    """Find the file in `file_path` whose name, minus its last extension, is `file_name`.

    Args:
        file_path: directory to scan.
        file_name: name to look for, without the final ".ext" part.

    Returns:
        ``file_name + "." + ext`` for the first matching regular file, or
        ``False`` when nothing matches. Entries without any dot never match.
    """
    for entry in os.listdir(file_path):
        # Everything before the last dot is the stem, everything after it the extension.
        stem, _, ext = str(entry).rpartition('.')
        candidate = file_name + "." + ext
        if stem == file_name and os.path.isfile(os.path.join(file_path, candidate)):
            return candidate
    return False

In [None]:
# Converts normalised coordinates into pixel coordinates.
def locate_it(norm_val, w, h):
    """Scale a normalised point to pixel coordinates of a ``w`` x ``h`` image.

    Args:
        norm_val: either an object with ``.x`` / ``.y`` attributes or an
            indexable pair ``(x, y)``.
        w, h: image width and height in pixels.

    Returns:
        ``(px, py)`` as ints, capped at ``w - 1`` / ``h - 1``. There is no
        lower cap, so negative inputs yield negative pixels.
    """
    try:
        nx, ny = norm_val.x, norm_val.y
    except AttributeError:
        # Not a landmark-like object: treat it as an (x, y) sequence.
        nx, ny = norm_val[0], norm_val[1]

    px = min(int(nx * w), w - 1)
    py = min(int(ny * h), h - 1)
    return (px, py)


In [None]:
def get_rect_cords(frame,results):
    """Compute a head-to-hip crop rectangle around the detected person.

    Args:
        frame: image array; only ``frame.shape[:2]`` (height, width) is used.
        results: detection result exposing ``pose_landmarks`` and ``face_landmarks``.

    Returns:
        ``([start_pt, end_pt], centre_point, st_norm_val)`` where
        * ``start_pt`` / ``end_pt`` are the top-left / bottom-right pixel
          corners ``(x, y)`` of the crop, limited to the frame,
        * ``centre_point`` is the normalised midpoint between both shoulders,
        * ``st_norm_val`` is the normalised position used as top of the head.

    Raises:
        AttributeError: when ``results.pose_landmarks`` is missing; the caller
            in this notebook catches it and reuses its previous crop.
    """
    height, width = frame.shape[:2]
    top_head_index_in_face_landmarks = 10
    hip_index_in_pose_landmarks = (24,23)

    pose = results.pose_landmarks.landmark
    left_shoulder = pose[11]
    right_shoulder = pose[12]

    centre_point = ((left_shoulder.x + right_shoulder.x) / 2 , (left_shoulder.y + right_shoulder.y) / 2)

    if results.face_landmarks:
        # Face mesh available: take its top-of-head landmark directly.
        # (The original tested `not results.face_landmarks`, which dereferenced
        # None whenever the face was missing and never used a detected face.)
        face_loc = results.face_landmarks.landmark[top_head_index_in_face_landmarks]
        st_norm_val = (face_loc.x,face_loc.y)
        head_point = locate_it(face_loc,width,height)

    else:
        # No face mesh: estimate the top of the head from the pose landmarks by
        # going up from the nose three times the nose-to-inner-eye distance.
        eye_inner = pose[4]
        nose = pose[0]

        norm_dist = math.hypot(eye_inner.x - nose.x, eye_inner.y - nose.y)
        st_norm_val = (nose.x, nose.y - norm_dist*3)

        nose_point = locate_it(nose, width, height)
        eye_point = locate_it(eye_inner, width, height)

        dist = math.hypot(eye_point[0] - nose_point[0], eye_point[1] - nose_point[1])
        head_point = (nose_point[0], nose_point[1] - int(dist)*3)

    # Bottom of the crop: midpoint between both hips, in pixels.
    pose_point_1 = locate_it(pose[hip_index_in_pose_landmarks[0]],width,height)
    pose_point_2 = locate_it(pose[hip_index_in_pose_landmarks[1]],width,height)
    pose_point = (int((pose_point_1[0]+pose_point_2[0])/2),int((pose_point_1[1]+pose_point_2[1])/2))
    distance = int(math.hypot(pose_point[0] - head_point[0], pose_point[1] - head_point[1]))

    # The crop is half the head-hip distance wide on each side plus a margin;
    # the margin shrinks (30 -> 20 -> 10 -> 0) when there is little room on the left.
    half_span = int(distance/2)
    for margin in (30, 20, 10):
        if head_point[0] - half_span - (margin - 1) > 0:
            break
    else:
        margin = 0

    st_pt1 = head_point[0] - half_span - margin
    st_pt2 = pose_point[0] + half_span + margin

    # Keep both corners inside the frame; the top edge gets 10 px of headroom.
    rects_st_pt = (max(st_pt1, 0), max(head_point[1] - 10, 0))
    rects_sp_pt = (min(st_pt2, width - 1), min(pose_point[1], height - 1))

    return [rects_st_pt, rects_sp_pt],centre_point,st_norm_val

In [None]:
# Label names to process; each one is used as a key into `labelsNindex`, and its
# position in this array is the numeric feature id used for the output folders.
features = np.array([
    'eat',
    'fish',
    'milk',
    'cousin',
    'want',
    'nice',
])

In [None]:
# Collect, per feature, the duration (frame count) of every dataset entry whose
# source video is actually present on disk: one {dataset index: duration} dict
# per feature, in the same order as `features`.
feature_indNdura = []
for feature in features:

    indexNduration = {}

    for index in labelsNindex[feature]:
        speci_data = j_data[index]

        # Only entries whose video file exists in the source folder count.
        if check_existence(vid_src_path, speci_data['file']):
            indexNduration[index] = speci_data['end'] - speci_data['start']

    feature_indNdura.append(indexNduration)

# Minimum, maximum and average duration (frame count) over all usable entries.
# Computed from the data itself; the original started from the sentinels
# 1000 / 0, which gave a wrong minimum whenever every duration exceeded 1000.
all_durations = [duration
                 for indexNduration in feature_indNdura
                 for duration in indexNduration.values()]
if not all_durations:
    # Nothing to average: fail with an explanation instead of a bare ZeroDivisionError.
    raise ValueError("no source videos were found in {} for the features {}".format(vid_src_path, list(features)))

min_fr_len = min(all_durations)
max_fr_len = max(all_durations)
avg_fr_len = int(sum(all_durations)/len(all_durations))

print("the minimum length of the sign is {}".format(min_fr_len))
print("the maximum length of the sign is {}".format(max_fr_len))
print("The average length of the sign is {}".format(avg_fr_len))

# Average number of usable source videos per feature (integer division).
no_of_src_vids = len(all_durations)
avg_vid_count = no_of_src_vids // len(features)
print(avg_vid_count)

# exporting calculated values.
var_store = {'avg_vid_count':avg_vid_count, 'min_fr_len':min_fr_len , 'avg_fr_len': avg_fr_len, 'max_fr_len':max_fr_len , 'features': list(features)}
with open("var_store.json", 'w') as file:
    json.dump(var_store, file)


In [None]:
# One folder per feature id, each holding `avg_vid_count` numbered sequence folders:
# <path>/DataSet/Keypoints/<feature id>/<sequence id>
for feature in range(len(features)):
    feature_dir = os.path.join(path,"DataSet","Keypoints",str(feature))
    # Created explicitly so the feature folder exists even when avg_vid_count is 0.
    os.makedirs(feature_dir, exist_ok = True)
    for index in range(avg_vid_count):
        os.makedirs(os.path.join(feature_dir,str(index)), exist_ok = True)

In [None]:
# Running statistics for the extraction cell below: start values for a
# min/max search and an (initially empty) array that collects one recorded
# frame count per processed sequence.
min_frame_duration, max_frame_duration = 1000, 0
total_frame_durations = np.array([], dtype=np.float64)

In [None]:
# Extract a fixed-length keypoint sequence for every (feature, sequence) slot:
# play the sign's frame range from a source video, crop the frame around the
# person, re-express every landmark relative to that crop, and store
# `custom_frame_duration` per-frame vectors as
# DataSet/Keypoints/<feature>/<sequence>/<frame>.npy
custom_frame_duration = 35    # number of frames stored for every sequence
keypoint_vector_len = 1086    # one (x, y) pair per (x, y, z) triple of `keypoints`
idle_window = 7               # this many consecutive hand-less frames end a gesture
s_width = 300                 # size of the standardised (cropped + resized) frame
s_height = 300
keypoints_root = os.path.join(path,"DataSet","Keypoints")

with mp_holistic.Holistic(min_detection_confidence = 0.5, min_tracking_confidence = 0.5) as holistic:
    # `feature` is the numeric id of a label, e.g. features[0] = 'eat'.
    for feature in range(len(features)):
        # Dataset indexes of this feature whose video exists on disk.
        vid_indexes_as_list = list(feature_indNdura[feature].keys())
        if not vid_indexes_as_list:
            # Nothing to choose from; random.choice would raise on an empty list.
            print("no source videos available for the feature NO: {} ... skipping it.".format(feature))
            continue

        # `sequence` is one output slot; every feature gets `avg_vid_count` of them.
        for sequence in range(avg_vid_count):
            # Fewer source videos than slots: reuse a randomly chosen existing one.
            try:
                src_index = vid_indexes_as_list[sequence]
            except IndexError:
                print("list out of range... Generating random index from existing...")
                src_index = random.choice(vid_indexes_as_list)

            # Re-check that the video is (still) on disk, otherwise draw another one.
            while True:
                file_name_ext = check_existence(vid_src_path, j_data[src_index]['file'])
                if file_name_ext:
                    break
                src_index = random.choice(vid_indexes_as_list)

            # First and last frame number of the sign inside the video.
            spec_data = j_data[src_index]
            st_frame,sp_frame = spec_data['start'], spec_data['end']

            print("Working with the file : {} from {}:{}".format(file_name_ext,st_frame,sp_frame))

            cap = cv.VideoCapture(os.path.join(vid_src_path,file_name_ext))
            duration = sp_frame - st_frame

            # Jump to the first frame of the sign.
            cap.set(cv.CAP_PROP_POS_FRAMES, st_frame)

            frame_number = 0            # frames counted while a gesture is in progress
            idle_gest_reg = []          # per-frame flag: was any hand detected?
            mode = "Loading..."         # becomes "gest" / "idle" once the window is full
            can_record = False          # True from the first frame that shows a hand
            frame_track = False
            last_crop = [(0,0), (1, 1)]

            # One row of normalised (x, y) values per recorded frame.
            temp_key_storage = np.zeros((0, keypoint_vector_len), dtype=np.float64)

            for fr in range(duration):

                ret, ori_img = cap.read()
                if not ret:
                    break

                _, results = mediapipe_detection(ori_img, holistic)
                keypoints = keypoints_extraction(results)

                # Choose the crop: accept a new rectangle only when both corners
                # moved by more than 4 px (suppresses jitter) and it is non-empty
                # (an empty crop would divide by zero below); otherwise keep the
                # previous one. A missing pose raises AttributeError inside
                # get_rect_cords and also falls back to the previous crop.
                try:
                    crop, _, _ = get_rect_cords(ori_img, results)

                    moved = cv.norm(crop[0], last_crop[0]) > 4 and cv.norm(crop[1], last_crop[1]) > 4
                    non_empty = crop[1][0] > crop[0][0] and crop[1][1] > crop[0][1]
                    if not (moved and non_empty):
                        crop = last_crop

                    last_crop = crop
                except AttributeError:
                    crop = last_crop

                crop_frame = ori_img[crop[0][1]:crop[1][1], crop[0][0]:crop[1][0]]

                # Factors that map crop pixels onto the standard frame.
                width_scale = s_width/crop_frame.shape[1]
                height_scale = s_height/crop_frame.shape[0]

                # cv.resize returns a new image, so drawing on it leaves ori_img untouched.
                standard_frame = cv.resize(crop_frame, (s_width, s_height))

                # Top-left corner of the crop in normalised full-frame coordinates.
                frame_h, frame_w = ori_img.shape[:2]
                st_pt = (crop[0][0]/frame_w, crop[0][1]/frame_h)

                frame_values = []
                for i in range(0, len(keypoints), 3):

                    # Shift the landmark so that it is relative to the crop origin,
                    # convert to pixels, then scale into the standard frame.
                    loc_ = (keypoints[i] - st_pt[0], keypoints[i+1] - st_pt[1])
                    point = locate_it(loc_,frame_w,frame_h)
                    standard_point = (int(point[0] * width_scale), int(point[1] * height_scale))

                    if 0 <= standard_point[0] <= s_width and 0 <= standard_point[1] <= s_height:
                        norm_standard_point = [standard_point[0] / s_width, standard_point[1] / s_height]
                    else:
                        # Outside the standard frame: store the point as (0, 0).
                        # The original test `0 > x > s_width` could never be true
                        # and assigned a misspelled name, so this never happened.
                        # Its per-point debug print is dropped because undetected
                        # landmarks (all-zero keypoints) would trigger it hundreds
                        # of times per frame.
                        norm_standard_point = [0.0, 0.0]

                    cv.circle(standard_frame, standard_point, 1, (255,255, 0), 1)
                    frame_values.extend(norm_standard_point)

                custom_keypoints = np.array(frame_values, dtype=np.float64)

                # Gesture tracking: recording starts with the first visible hand and
                # ends after `idle_window` consecutive frames without any hand.
                hand_visible = bool(results.left_hand_landmarks or results.right_hand_landmarks)
                idle_gest_reg.append(hand_visible)
                if hand_visible:
                    can_record = True
                    frame_track = True

                if can_record:
                    idle_gest_reg = idle_gest_reg[-idle_window:]

                    if len(idle_gest_reg) == idle_window:
                        if not any(idle_gest_reg):
                            mode = "idle"
                            print("Gesture Ended! Stopping...")
                            can_record = False
                            frame_track = False
                            print(frame_number)
                            # NOTE(review): the counter is reset before it is appended
                            # to total_frame_durations below, so ended gestures are
                            # recorded as 0 there -- confirm whether that is intended.
                            frame_number = 0
                            break
                        mode = "gest"
                        frame_number += 1

                cv.putText(standard_frame, "{}{}:{}:FN:{}".format(features[feature],sequence,mode, frame_number), (2, 20), cv.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 0, cv.LINE_AA)
                cv.imshow('standard_Frame', standard_frame)

                # 'q' aborts the current video only.
                if (cv.waitKey(1) & 0xFF == ord('q')):
                    break

                # Enough gesture frames collected for this sequence.
                if frame_track and frame_number >= custom_frame_duration:
                    break

                if can_record:
                    temp_key_storage = np.vstack((temp_key_storage,custom_keypoints))

            sequence_dir = os.path.join(keypoints_root,str(feature),str(sequence))

            if mode == "Loading...":
                print("skipping this video..Had no gesture but adding randomly chosened prev video contenets.")

                if sequence > 0:
                    # Fill this slot with a copy of a randomly chosen earlier sequence.
                    # np.save appends ".npy" to the name, np.load needs it spelled out.
                    temp_random = random.randint(0, sequence - 1)
                    for frame_num in range(custom_frame_duration):
                        prev_npy_path = os.path.join(keypoints_root,str(feature),str(temp_random),str(frame_num) + ".npy")
                        np_data = np.load(prev_npy_path)
                        npy_path = os.path.join(sequence_dir,str(frame_num))
                        np.save(npy_path, np_data)
                else:
                    # Very first sequence of the feature: there is nothing to copy,
                    # so store all-zero frames (same convention as the padding below).
                    print("no previous sequence to copy from.. storing zero frames instead.")
                    for frame_num in range(custom_frame_duration):
                        npy_path = os.path.join(sequence_dir,str(frame_num))
                        np.save(npy_path, np.zeros(keypoint_vector_len, dtype=np.float64))
            else:
                key_length = len(temp_key_storage)
                if  key_length > custom_frame_duration:
                    # Too many frames: keep the centred window.
                    print("exited frame lim about {}.. so reducing to standard size 35.".format(key_length))
                    center_pt = int(key_length/2)
                    starting_pt = center_pt - int(custom_frame_duration/2)
                    temp_key_storage = temp_key_storage[starting_pt:starting_pt+custom_frame_duration]
                else:
                    # Too few frames: pad with all-zero rows at the end.
                    refill_len = custom_frame_duration - key_length
                    print("filling the less frame gap of {} frames".format(refill_len))
                    padding = np.zeros((refill_len, keypoint_vector_len), dtype=np.float64)
                    temp_key_storage = np.vstack((temp_key_storage, padding))

                if len(temp_key_storage) == custom_frame_duration:
                    # One file per frame, each holding that frame's own row (the
                    # original wrote the last frame's vector into every file).
                    for frame_num in range(custom_frame_duration):
                        npy_path = os.path.join(sequence_dir,str(frame_num))
                        np.save(npy_path, temp_key_storage[frame_num])
                else:
                    raise ValueError("invalid array size...")

                total_frame_durations = np.append(total_frame_durations, frame_number)
            print("Completed the sequence NO: {}".format(sequence))
            cap.release()
        print("completed the feature NO: {}".format(feature))
print("got completed everything...")

# Mean recorded frame count per stored sequence (0 when nothing was stored,
# because int(nan) would raise).
avg_frame_duration = int(np.mean(total_frame_durations)) if total_frame_durations.size else 0

# with open("videoNcrop.json", 'w') as file:
#     json.dump(videoNcrop, file)
                            

In [None]:
# Manual cleanup: release the most recently opened video capture (`cap` is the
# variable assigned by the extraction cell above) and close every OpenCV window.
cap.release()
cv.destroyAllWindows()