In [None]:
import json
import os
import cv2
import numpy as np
import mediapipe as mp
import tensorflow as tf
from sklearn.metrics import multilabel_confusion_matrix

In [None]:
# Root folder for extracted training keypoints; extract_videos() writes
# <folder_path>/<gloss>/<video_id>/<frame>.npy below it.
folder_path = '/Users/danilobabic/Documents/GitHub/ri_train_data'
# Filled by get_words(): the word read from each line of the class list, in file order.
words = []
# Filled by get_missing_files(): one stripped line of the "missing" file per entry.
missing_videos = []
# Placeholder; replaced by the decoded JSON returned from get_video_metadata().
video_metadata = []
# Tab-separated class list; get_words() reads the second column of each line.
words_file = '/Users/danilobabic/Documents/GitHub/archive/wlasl_class_list.txt'
# Text file whose lines are compared against video ids to skip during extraction.
missing_videos_file = '/Users/danilobabic/Documents/GitHub/archive/missing.txt'
# JSON metadata; the extraction loops read 'gloss', 'instances', 'video_id',
# 'split', 'frame_start' and 'frame_end' from it.
video_metadata_file = '/Users/danilobabic/Documents/GitHub/archive/WLASL_v0.3.json'
# Folder containing the source clips, addressed as <archive_path>/<video_id>.mp4.
archive_path = '/Users/danilobabic/Documents/GitHub/archive/videos'
# Root folder for extracted test keypoints (same layout as folder_path).
testing_path = '/Users/danilobabic/Documents/GitHub/testing'
# Shorthand for the MediaPipe holistic solution module used by the extractors.
mp_holistic = mp.solutions.holistic

In [None]:
def get_words(file_path):
    """Append the word from each line of the class list to the global ``words``.

    Every useful line is expected to hold at least two tab-separated columns;
    the second column is taken as the word. Blank lines and lines without a
    second column are skipped instead of raising ``IndexError`` (a trailing
    empty line is enough to trigger that otherwise).

    Note that the function appends: calling it twice in the same kernel
    duplicates every entry in ``words``.

    Args:
        file_path: Path to the tab-separated class list file.
    """
    with open(file_path, 'r') as file:
        # Stream the file instead of materialising all lines first.
        for raw_line in file:
            columns = raw_line.strip().split('\t')
            if len(columns) < 2:
                # Blank or malformed line: nothing to record.
                continue
            words.append(columns[1])

In [None]:
# Populate the global `words` list from the class list file.
get_words(words_file)


In [None]:
def get_missing_files(file_path):
    """Record every line of ``file_path`` in the global ``missing_videos``.

    Each line is stripped of surrounding whitespace before being appended;
    empty lines are kept as empty strings.

    Args:
        file_path: Path to the text file listing one entry per line.
    """
    with open(file_path, 'r') as handle:
        missing_videos.extend(entry.strip() for entry in handle)

In [None]:
# Populate the global `missing_videos` list from the "missing" file.
get_missing_files(missing_videos_file)

In [None]:
def get_video_metadata(file_path):
    """Decode the JSON document stored at ``file_path``.

    Args:
        file_path: Path to a JSON file.

    Returns:
        The Python object produced by parsing the file's contents.
    """
    with open(file_path, 'r') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)

In [None]:
# Replace the empty placeholder with the decoded metadata JSON.
video_metadata = get_video_metadata(video_metadata_file)

In [None]:
def create_word_folders(path):
    """Make sure ``path`` contains one directory per entry of the global ``words``.

    Directories that already exist are left untouched.

    Args:
        path: Parent directory under which the per-word folders are created.
    """
    for target_dir in (os.path.join(path, word) for word in words):
        os.makedirs(target_dir, exist_ok=True)
    

In [None]:
def _flatten_landmarks(landmark_list, count, fields):
    """Flatten ``landmark_list.landmark`` into a 1-D array of the given fields.

    Returns ``count * len(fields)`` zeros when ``landmark_list`` is falsy
    (nothing was detected for that body part).
    """
    if not landmark_list:
        return np.zeros(count * len(fields))
    return np.array([[getattr(point, field) for field in fields]
                     for point in landmark_list.landmark]).flatten()


def extract_keypoints(frame, model, bgr=True):
    """Run the holistic model on one frame and return a flat keypoint vector.

    The callers in this notebook pass frames straight from
    ``cv2.VideoCapture.read()``, which are in BGR channel order, while the
    MediaPipe ``process()`` call is documented to take an RGB image. The frame
    is therefore channel-swapped before inference unless ``bgr=False``.

    NOTE(review): keypoint files saved before this fix were computed from
    channel-swapped frames; regenerate them so train and test features match.

    Args:
        frame: Image array handed to the model; expected to be H x W x 3.
        model: Object exposing ``process(image)`` whose result has
            ``pose_landmarks``, ``face_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes.
        bgr: When True (default), treat ``frame`` as BGR and convert it to RGB.
            Pass False if the frame is already RGB.

    Returns:
        1-D array of length 33*4 + 468*3 + 21*3 + 21*3 = 1662, laid out as
        pose (x, y, z, visibility), face, left hand, right hand (x, y, z each).
        Parts that were not detected are filled with zeros.
    """
    if bgr and getattr(frame, 'ndim', 0) == 3 and frame.shape[-1] == 3:
        # Reversing the last axis swaps B and R. The reversed view has a
        # negative stride, so hand the model a plain contiguous copy.
        frame = np.ascontiguousarray(frame[..., ::-1])
    results = model.process(frame)
    pose = _flatten_landmarks(results.pose_landmarks, 33, ('x', 'y', 'z', 'visibility'))
    face = _flatten_landmarks(results.face_landmarks, 468, ('x', 'y', 'z'))
    lh = _flatten_landmarks(results.left_hand_landmarks, 21, ('x', 'y', 'z'))
    rh = _flatten_landmarks(results.right_hand_landmarks, 21, ('x', 'y', 'z'))
    return np.concatenate([pose, face, lh, rh])

In [None]:
def extract_videos():
    """Extract per-frame keypoints for every available training video.

    For each metadata instance whose ``split`` is ``'train'`` and whose
    ``video_id`` is not listed in ``missing_videos``, the clip
    ``<archive_path>/<video_id>.mp4`` is read frame by frame and the keypoint
    vector of every frame is saved as
    ``<folder_path>/<gloss>/<video_id>/<frame_num>.npy``.

    When ``frame_end`` is -1 the whole clip is processed and files are numbered
    from 1; otherwise the capture is positioned at ``frame_start`` and frames
    are numbered ``frame_start`` through ``frame_end`` inclusive.

    Changes from the earlier version: the capture handle is always released,
    clips that cannot be opened are reported and skipped without leaving an
    empty sample folder behind, and the full metadata dump is no longer printed.
    """
    # Set membership keeps the per-video "is it missing?" check constant-time.
    missing_ids = set(missing_videos)
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        for word in video_metadata:
            for video in word['instances']:
                if video['video_id'] in missing_ids or video['split'] != 'train':
                    continue
                source_file = archive_path + "/" + video['video_id'] + ".mp4"
                cap = cv2.VideoCapture(source_file)
                try:
                    if not cap.isOpened():
                        print("Could not open video, skipping: " + source_file)
                        continue
                    video_path = os.path.join(folder_path, word['gloss'], video['video_id'])
                    os.makedirs(video_path, exist_ok=True)
                    print("USO U FUNCKIJU FAJL :" + source_file)
                    if video['frame_end'] == -1:
                        # Whole clip: number frames from 1, stop at end of stream.
                        frame_num = 1
                        last_frame = None
                    else:
                        cap.set(cv2.CAP_PROP_POS_FRAMES, video['frame_start'])
                        frame_num = video['frame_start']
                        last_frame = video['frame_end']
                    while last_frame is None or frame_num <= last_frame:
                        ret, frame = cap.read()
                        if not ret:
                            break
                        keypoints = extract_keypoints(frame, holistic)
                        np.save(os.path.join(video_path, str(frame_num)), keypoints)
                        frame_num += 1
                finally:
                    # Free the decoder even if extraction fails part-way.
                    cap.release()

In [None]:
def load_data_from_folders():
    """Load every stored training sequence and its integer label.

    Walks ``<folder_path>/<word>/<video>/<frame>.npy``. Frame files are ordered
    by their numeric name so the sequence keeps its temporal order; a plain
    string sort would place ``10.npy`` before ``2.npy``. Entries in the root
    that are not directories (e.g. ``.DS_Store``) and files that are not
    ``<number>.npy`` are ignored.

    Returns:
        ``(frames, labels)`` where ``frames`` is a list with one list of frame
        arrays per video and ``labels`` holds ``label_map[word]`` for each video.

    Raises:
        KeyError: If a word folder name is not a key of ``label_map``.
    """
    def _frame_index(file_name):
        # "12.npy" -> 12
        return int(os.path.splitext(file_name)[0])

    frames = []
    labels = []
    for word_folder in sorted(os.listdir(folder_path)):
        word_path = os.path.join(folder_path, word_folder)
        if not os.path.isdir(word_path):
            continue
        video_folders = sorted(f for f in os.listdir(word_path)
                               if os.path.isdir(os.path.join(word_path, f)))
        for video_folder in video_folders:
            video_path = os.path.join(word_path, video_folder)
            frame_files = sorted(
                (f for f in os.listdir(video_path)
                 if f.endswith('.npy') and os.path.splitext(f)[0].isdecimal()),
                key=_frame_index,
            )
            video_frames = [np.load(os.path.join(video_path, f)) for f in frame_files]
            frames.append(video_frames)
            labels.append(label_map[word_folder])
    return frames, labels

In [None]:
# Map each word to its position in `words`; this index is the class id used as a label.
label_map = dict(zip(words, range(len(words))))

In [None]:
# Load every stored training sequence together with its integer class label.
frames, labels = load_data_from_folders()
max_frame_length = max(len(frame) for frame in frames)
print(labels)
# Zero-pad at the end so all sequences share the length of the longest one.
frames = tf.keras.preprocessing.sequence.pad_sequences(frames, padding='post', dtype='float32')
# Fix the one-hot width to the vocabulary size. Without num_classes the width is
# inferred from the largest label present, so it would vary with the data loaded.
labels = tf.keras.utils.to_categorical(labels, num_classes=len(words))
# asarray avoids a second full copy of the padded tensor.
frames_array = np.asarray(frames)
labels_array = np.asarray(labels)
print(frames_array.shape)
print(labels_array.shape)


In [None]:
def create_lstm_model(num_classes=None, num_features=1662):
    """Build the (uncompiled) stacked-LSTM sequence classifier.

    The network is three LSTM layers (64, 128, 64 units, ReLU activation)
    followed by two ReLU dense layers (64, 32) and a softmax output. The time
    dimension is left open, so sequences of any length are accepted.

    Args:
        num_classes: Width of the softmax output. Defaults to
            ``labels_array.shape[1]`` (the global one-hot label matrix), which
            is what the function used before this parameter existed.
        num_features: Size of each per-frame feature vector. The default 1662
            equals the vector length produced by ``extract_keypoints``
            (33*4 + 468*3 + 21*3 + 21*3).

    Returns:
        A ``tf.keras.models.Sequential`` model.
    """
    if num_classes is None:
        num_classes = labels_array.shape[1]
    return tf.keras.models.Sequential([
        tf.keras.layers.LSTM(64, return_sequences=True, activation='relu',
                             input_shape=(None, num_features)),
        tf.keras.layers.LSTM(128, return_sequences=True, activation='relu'),
        # Last recurrent layer emits only its final state for classification.
        tf.keras.layers.LSTM(64, return_sequences=False, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(num_classes, activation='softmax'),
    ])

In [None]:
# TensorBoard event files go to the relative 'Logs' directory.
log_dir = os.path.join('Logs')
# Callback passed to model.fit() so training metrics are written to log_dir.
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir)

In [None]:
# Build the network, then compile it for one-hot multi-class classification.
model = create_lstm_model()
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
# Train on the full padded training tensor for 200 epochs, logging to TensorBoard.
# No validation data is passed here.
model.fit(frames_array, labels_array, epochs= 200, callbacks=[tb_callback])
# Print the layer / parameter table of the trained model.
model.summary()

In [None]:
create_word_folders(testing_path)  # one folder per word under the testing directory

In [None]:
def extract_test_videos():
    """Extract per-frame keypoints for every available test video.

    Glosses ``"book"``, ``"minute"`` and ``"cherry"`` are skipped entirely. For
    every remaining metadata instance whose ``split`` is ``'test'`` and whose
    ``video_id`` is not listed in ``missing_videos``, the clip
    ``<archive_path>/<video_id>.mp4`` is read frame by frame and the keypoint
    vector of every frame is saved as
    ``<testing_path>/<gloss>/<video_id>/<frame_num>.npy``.

    When ``frame_end`` is -1 the whole clip is processed and files are numbered
    from 1; otherwise the capture is positioned at ``frame_start`` and frames
    are numbered ``frame_start`` through ``frame_end`` inclusive.

    Changes from the earlier version: the capture handle is always released,
    clips that cannot be opened are reported and skipped without leaving an
    empty sample folder behind, and the full metadata dump is no longer printed.
    """
    # Set membership keeps the per-video "is it missing?" check constant-time.
    missing_ids = set(missing_videos)
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        for word in video_metadata:
            if word['gloss'] in ["book","minute","cherry"]:
                continue
            for video in word['instances']:
                if video['video_id'] in missing_ids or video['split'] != 'test':
                    continue
                source_file = archive_path + "/" + video['video_id'] + ".mp4"
                cap = cv2.VideoCapture(source_file)
                try:
                    if not cap.isOpened():
                        print("Could not open video, skipping: " + source_file)
                        continue
                    video_path = os.path.join(testing_path, word['gloss'], video['video_id'])
                    os.makedirs(video_path, exist_ok=True)
                    print("USO U FUNCKIJU FAJL :" + source_file)
                    if video['frame_end'] == -1:
                        # Whole clip: number frames from 1, stop at end of stream.
                        frame_num = 1
                        last_frame = None
                    else:
                        cap.set(cv2.CAP_PROP_POS_FRAMES, video['frame_start'])
                        frame_num = video['frame_start']
                        last_frame = video['frame_end']
                    while last_frame is None or frame_num <= last_frame:
                        ret, frame = cap.read()
                        if not ret:
                            break
                        keypoints = extract_keypoints(frame, holistic)
                        np.save(os.path.join(video_path, str(frame_num)), keypoints)
                        frame_num += 1
                finally:
                    # Free the decoder even if extraction fails part-way.
                    cap.release()

In [None]:
# Write keypoint files for the test split under testing_path.
extract_test_videos()

In [None]:
# FOR TESTING
def load_data_from_folders_test():
    """Load every stored test sequence and its integer label.

    Walks ``<testing_path>/<word>/<video>/<frame>.npy``. Frame files are ordered
    by their numeric name so the sequence keeps its temporal order; a plain
    string sort would place ``10.npy`` before ``2.npy``. Entries in the root
    that are not directories (e.g. ``.DS_Store``) and files that are not
    ``<number>.npy`` are ignored.

    Returns:
        ``(frames, labels)`` where ``frames`` is a list with one list of frame
        arrays per video and ``labels`` holds ``label_map[word]`` for each video.

    Raises:
        KeyError: If a word folder name is not a key of ``label_map``.
    """
    def _frame_index(file_name):
        # "12.npy" -> 12
        return int(os.path.splitext(file_name)[0])

    frames = []
    labels = []
    for word_folder in sorted(os.listdir(testing_path)):
        word_path = os.path.join(testing_path, word_folder)
        if not os.path.isdir(word_path):
            continue
        video_folders = sorted(f for f in os.listdir(word_path)
                               if os.path.isdir(os.path.join(word_path, f)))
        for video_folder in video_folders:
            video_path = os.path.join(word_path, video_folder)
            frame_files = sorted(
                (f for f in os.listdir(video_path)
                 if f.endswith('.npy') and os.path.splitext(f)[0].isdecimal()),
                key=_frame_index,
            )
            video_frames = [np.load(os.path.join(video_path, f)) for f in frame_files]
            frames.append(video_frames)
            labels.append(label_map[word_folder])
    return frames, labels

In [None]:
# Load every stored test sequence together with its integer class label.
frames_test, labels_test = load_data_from_folders_test()
# Show the test labels (this used to print the training `labels` by mistake).
print(labels_test)
# Zero-pad at the end so all test sequences share the length of the longest one.
frames_test = tf.keras.preprocessing.sequence.pad_sequences(frames_test, padding='post', dtype='float32')
# Fix the one-hot width to the vocabulary size. Without num_classes the width is
# inferred from the largest label present in the test set.
labels_test = tf.keras.utils.to_categorical(labels_test, num_classes=len(words))
# asarray avoids a second full copy of the padded tensor.
frames_array_test = np.asarray(frames_test)
labels_array_test = np.asarray(labels_test)
print(frames_array_test.shape)
print(labels_array_test.shape)

In [None]:
# Class-probability rows for every padded test sequence.
words_predicted = model.predict(frames_array_test)
# Reduce one-hot targets and probability rows to class indices, then compute
# one 2x2 confusion matrix per class.
true_classes = labels_array_test.argmax(axis=1)
predicted_classes = words_predicted.argmax(axis=1)
multilabel_confusion_matrix(true_classes, predicted_classes)