In [2]:
import json
import os
import shutil
import itertools
import tensorflow as tf
import numpy as np
import cv2
import config
import os
import mediapipe as mp
import torch
from mp_funs import extract_landmarks_to_np, FACEMESH_LANDMARKS, POSE_LANDMARKS, HAND_LANDMARKS
from utils import save_dict, load_dict
from math import floor
from sklearn import preprocessing

In [2]:
# File extension appended to a video id to build the source video filename.
EXTENSION = '.mp4'
# Dataset partitions; used both as directory names and as keys of the per-split dicts.
SPLITS = ['train', 'val', 'test']
# Locations handed to save_dict/load_dict for the features, the encoded labels and
# the label mapping (relative to the current working directory).
# NOTE(review): the .pkl suffix suggests pickle files — confirm in utils.
X_PICK_FILE_PATH = 'data/npy_videos/npy_db_x.pkl'
Y_PICK_FILE_PATH = 'data/npy_videos/npy_db_y.pkl'
LABELS_MAP_PICK_FILE_PATH = 'data/npy_videos/labels_map.pkl'

mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities that will be useful for action representation

In [33]:
def get_n_gloss(indexfile='data/WLASL_v0.3.json'):
    """Return the number of top-level entries in the JSON index file.

    Args:
        indexfile: path to the JSON index; its top-level value must support
            ``len()`` (list or dict).

    Returns:
        int: number of top-level entries (one per gloss, going by how the
        rest of this notebook iterates the index).
    """
    # Context manager so the file handle is closed as soon as parsing is done.
    with open(indexfile) as index_fh:
        content = json.load(index_fh)
    return len(content)
        

In [34]:
# Number of entries in the WLASL index; the path is relative to the notebook's working directory.
get_n_gloss(indexfile='../data/WLASL_v0.3.json')

2000

In [None]:
def rm_error_info(func, path, _):
    """Print a notice that ``path`` does not exist and is being skipped.

    ``func`` and the third argument are accepted but ignored.
    NOTE(review): the three-argument signature matches the ``onerror``
    callback of ``shutil.rmtree`` — presumably that is the intended use, but
    no call site is visible here.
    """
    notice = ("INFO: The path", path, "does not exist. Skipping...")
    print(*notice)

def gloss_ranking(content, vid_directory, top_k):
    """Rank glosses by how many directory entries they have across all splits.

    Args:
        content: iterable of index entries, each with a ``'gloss'`` key.
        vid_directory: root folder laid out as ``<vid_directory>/<split>/<gloss>``.
        top_k: number of best-ranked glosses to keep.

    Returns:
        dict mapping gloss -> total entry count, ordered from most to least
        frequent and truncated to the first ``top_k`` items.
    """
    totals = {}
    for record in content:
        label = record['gloss']

        total = 0
        for split_name in SPLITS:
            split_dir = os.path.join(vid_directory, split_name, label)
            try:
                total += len(os.listdir(split_dir))
            except OSError:
                # The gloss has no folder in this split: it contributes nothing.
                total += 0

        # A gloss listed twice in the index keeps its latest count.
        totals[label] = total

    # Stable descending sort: ties keep the order in which glosses were first seen.
    by_count = sorted(totals.items(), key=lambda pair: pair[1], reverse=True)

    return dict(itertools.islice(by_count, top_k))

In [None]:
def from_dict_to_tensor(X):
    """Zero-pad every video to a common frame count and turn each split into a tensor.

    The longest video across all splits sets the target length. Shorter videos
    get all-zero frames added around them: ``diff // 2`` at the end and the
    remaining ``diff - diff // 2`` at the beginning, so the extra frame of an
    odd difference goes to the front.

    Args:
        X: dict keyed by the names in ``SPLITS``; each value is a sequence of
            videos and each video is a sequence of frames.
            NOTE(review): frames are assumed to be 1-D arrays of length
            ``FACEMESH_LANDMARKS + POSE_LANDMARKS + 2*HAND_LANDMARKS`` so they
            line up with the zero frames — confirm against the extractor.

    Returns:
        The same dict object, with each split replaced by a ``torch`` tensor
        reshaped to ``(n_videos, max_len, n_frame_lm)``. A split without any
        video yields an empty tensor whose first dimension is 0.
    """
    n_frame_lm = FACEMESH_LANDMARKS + POSE_LANDMARKS + 2*HAND_LANDMARKS

    # default=0 keeps max_len an int even when no split contains a video.
    max_len = max((len(video) for sp in SPLITS for video in X[sp]), default=0)

    for sp in SPLITS:
        padded_videos = []
        for video in X[sp]:
            diff = max_len - len(video)
            n_back = diff // 2
            n_front = diff - n_back
            # Build a fresh list instead of padding in place, so the caller's
            # frame lists are left untouched and need not be ``list`` objects.
            padded_videos.append(
                [np.zeros(n_frame_lm) for _ in range(n_front)]
                + list(video)
                + [np.zeros(n_frame_lm) for _ in range(n_back)]
            )

        dims = (len(padded_videos), max_len, n_frame_lm)
        if padded_videos:
            # flatten the nested list of np.arrays
            flat = np.concatenate(padded_videos).ravel()
        else:
            # np.concatenate rejects an empty sequence; an empty split becomes
            # an empty tensor instead of raising.
            flat = np.zeros(0)

        X[sp] = torch.tensor(flat).reshape(dims)

    return X

In [None]:
def encode_labels(labels):
    """Encode the labels of every split as integer tensors.

    The encoder is fitted on ``labels['train']`` only and then applied to each
    split listed in ``SPLITS``.
    NOTE(review): a label present in another split but absent from train would
    presumably make ``transform`` fail — confirm this is the intended contract.

    Args:
        labels: dict keyed by split name, each value a sequence of labels.

    Returns:
        tuple ``(encoded, mapping)``: ``encoded`` maps each split name to a
        ``torch`` tensor of encoded labels; ``mapping`` maps each fitted class
        to its encoded value.
    """
    encoder = preprocessing.LabelEncoder()
    encoder.fit(labels['train'])

    encoded = {
        split_name: torch.tensor(encoder.transform(labels[split_name]))
        for split_name in SPLITS
    }

    known_classes = encoder.classes_
    class_to_code = dict(zip(known_classes, encoder.transform(known_classes)))

    return encoded, class_to_code


In [None]:
def save_dataset(X_tens, Y_enc, le_mapping):
    """Persist the features, the encoded labels and the label mapping.

    Each object is passed to ``save_dict`` with its module-level path, in the
    order features -> labels -> mapping.
    """
    outputs = (
        (X_tens, X_PICK_FILE_PATH),
        (Y_enc, Y_PICK_FILE_PATH),
        (le_mapping, LABELS_MAP_PICK_FILE_PATH),
    )
    for payload, target_path in outputs:
        save_dict(payload, target_path)


def load_dataset():
    """Load what ``save_dataset`` wrote, reading the same module-level paths.

    NOTE(review): the ``.pkl`` paths suggest ``load_dict`` unpickles data; if
    so, only load files from a trusted source.

    Returns:
        tuple ``(X_tens, Y_enc, le_mapping)`` in that order.
    """
    stored_paths = (X_PICK_FILE_PATH, Y_PICK_FILE_PATH, LABELS_MAP_PICK_FILE_PATH)
    return tuple(load_dict(stored_path) for stored_path in stored_paths)

In [35]:

def organize(indexfile='data/WLASL_v0.3.json', vid_directory='data/videos', top_k = 1000):
    """Copy the videos of the ``top_k`` best-ranked glosses into a per-split tree.

    The resulting layout is
    ``<vid_directory>/top_<top_k>/<split>/<gloss>/<video_id><EXTENSION>``.
    The destination folder of every instance is created even when its source
    video is missing; missing videos are skipped silently.

    Args:
        indexfile: path to the JSON index (a sequence of entries with
            ``'gloss'`` and ``'instances'`` keys). The sentinel ``'nil'``
            means "no index" and makes the function return immediately.
        vid_directory: folder holding the source videos; it is also the root
            passed to ``gloss_ranking``.
        top_k: number of glosses to keep.

    Returns:
        None. Progress and early-exit reasons are printed.
    """
    if indexfile == 'nil':
        print('No index specified. Exiting.')
        return

    # Context manager so the index file handle is released right after parsing.
    with open(indexfile) as index_fh:
        content = json.load(index_fh)

    # The index is already parsed, so count its entries directly instead of
    # reading and parsing the file a second time.
    if top_k > len(content):
        print("The number of the top_k is greater the total glosses of the dataset")
        return

    gloss_rank = gloss_ranking(content, vid_directory, top_k)
    print("Ranking created with top", top_k, "glosses/labels...")

    top_root = os.path.join(vid_directory, f"top_{top_k}")

    for entry in content:
        gloss = entry['gloss']

        # only glosses that made it into the top_k are copied
        if gloss not in gloss_rank:
            continue

        for inst in entry['instances']:
            vid_id = inst['video_id']
            split = inst['split']

            source = os.path.join(vid_directory, vid_id+EXTENSION)
            destination = os.path.join(top_root, split, gloss)

            # create the dataset structure /data/videos/top_k/<train|test|val>/gloss;
            # exist_ok avoids the check-then-create race of a separate exists() test
            os.makedirs(destination, exist_ok=True)

            # and now, we copy from /data/videos to /data/videos/top_k/<train|test|val>/gloss
            if os.path.exists(source):
                shutil.copy(source, destination)


In [37]:
# organize(indexfile='../data/WLASL_v0.3.json', vid_directory='../data/videos', top_k = 10)

Ranking created with top 10 glosses/labels...


In [55]:
# Report every sub-folder of the training split that contains no files.
root = '../data/videos/top_200/train/'
# os.walk yields the root itself first; drop it so only sub-folders are checked.
folders = list(os.walk(root))[1:]

for folder in folders:
    folder_path, _subfolders, file_names = folder
    if len(file_names) == 0:
        print(folder_path)