In [30]:
import tensorflow as tf
import os
import shutil
import numpy as np
import cv2
import matplotlib.pyplot as plt
import json
import random
from fractions import Fraction
import math
import mediapipe as mp
import re

## Data

Resolution is 1920 by 1080 (width by height).

## Preprocessing

In [31]:
def get_files(path):
    """Return the names of the regular files located directly inside ``path``.

    Sub-directories are skipped and nothing is searched recursively. Names
    come back in whatever order ``os.scandir`` yields them.
    """
    with os.scandir(path) as listing:
        return [item.name for item in listing if item.is_file()]

def get_class(path):
    """Parse the integer class id that precedes the first underscore of ``path``.

    Example: ``'017_003_002.mp4'`` -> ``17``.
    """
    prefix, *_ = path.split('_')
    return int(prefix)

def get_files_per_class(files):
    """Group file names by the class id that ``get_class`` extracts from each name.

    Returns a dict mapping class id -> list of file names, preserving the
    order in which the names appear in ``files``.
    """
    grouped = {}
    for name in files:
        grouped.setdefault(get_class(name), []).append(name)
    return grouped

In [32]:
# List every video file and group the names by class id.
all_files = get_files('./dataset/lsa64-data/all/')
files_for_class = get_files_per_class(all_files)
# os.scandir yields entries in arbitrary order, so sort the class ids: later
# cells slice `classes[16:48]` / `classes[16:32]` and need a stable order.
classes = sorted(files_for_class)

In [29]:
# Keep only clips named exactly NNN_NNN_001.mp4 or NNN_NNN_002.mp4.
# fullmatch plus an escaped dot rejects names such as '001_001_001xmp4' or
# '001_001_001.mp4.bak', which the unanchored pattern used to accept.
for key in files_for_class:
    files_for_class[key] = [
        name for name in files_for_class[key]
        if re.fullmatch(r"[0-9]{3}_[0-9]{3}_00[1-2]\.mp4", name)
    ]

In [None]:
# Do not Run
# Compute the mean and median frame count of the videos in each class.

class_frame_info = {}

for class_, class_files in files_for_class.items():
    frame_counts = []
    for file in class_files:
        cap = cv2.VideoCapture(f'./dataset/lsa64-data/all/{file}')
        frame_counts.append(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()
    class_frame_info[class_] = {
        'mean': np.mean(frame_counts),
        'median': np.median(frame_counts),
    }

In [12]:
# Display the per-class frame statistics.
class_frame_info

{1: {'mean': 119.5, 'median': 118.0},
 2: {'mean': 124.0, 'median': 118.0},
 3: {'mean': 119.5, 'median': 118.0},
 4: {'mean': 104.5, 'median': 88.0},
 5: {'mean': 133.0, 'median': 118.0},
 6: {'mean': 161.5, 'median': 148.0},
 7: {'mean': 107.5, 'median': 118.0},
 8: {'mean': 97.0, 'median': 88.0},
 9: {'mean': 125.5, 'median': 118.0},
 10: {'mean': 98.5, 'median': 88.0},
 11: {'mean': 103.0, 'median': 103.0},
 12: {'mean': 124.0, 'median': 118.0},
 13: {'mean': 113.5, 'median': 118.0},
 14: {'mean': 97.0, 'median': 88.0},
 15: {'mean': 91.0, 'median': 88.0},
 16: {'mean': 115.0, 'median': 118.0},
 17: {'mean': 100.0, 'median': 88.0},
 18: {'mean': 116.5, 'median': 118.0},
 19: {'mean': 94.0, 'median': 88.0},
 20: {'mean': 100.0, 'median': 88.0},
 21: {'mean': 125.5, 'median': 118.0},
 22: {'mean': 116.5, 'median': 118.0},
 23: {'mean': 124.0, 'median': 118.0},
 24: {'mean': 126.5, 'median': 122.0},
 25: {'mean': 140.0, 'median': 152.0},
 26: {'mean': 122.0, 'median': 122.0},
 27: {'m

In [14]:
# Do not Run
# Persist the per-class frame statistics as JSON. JSON object keys are
# strings, so the integer class ids are converted back when the file is loaded.

with open('./lsa64-data-info/nframes.json', 'w') as f:
    json.dump(class_frame_info, f)

In [33]:
# Reload the cached frame statistics, turning the JSON string keys back into ints.
with open('./lsa64-data-info/nframes.json', 'r') as read_content:
    class_frame_info = {
        int(class_id): stats
        for class_id, stats in json.load(read_content).items()
    }

In [6]:
# Median frame count of each class in classes[16:48], then the median of those.
medians = [class_frame_info[int(class_)]['median'] for class_ in classes[16:48]]

np.median(medians)

122.0

In [7]:
# How many classes share each median frame count, as (value, count) pairs.
unique_medians, occurrences = np.unique(medians, return_counts=True)
list(zip(unique_medians, occurrences))

[(88.0, 3), (118.0, 4), (122.0, 13), (137.0, 3), (152.0, 7), (182.0, 2)]

In [46]:
# Largest frame count among all videos of classes[16:48].
max_f = 0

for class_ in classes[16:48]:
    for file in files_for_class[class_]:
        cap = cv2.VideoCapture(f'dataset/lsa64-data/all/{file}')
        # Updating the running maximum per video also copes with a class
        # that has no files, where max() over an empty list would raise.
        max_f = max(max_f, cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()

In [47]:
# Display the largest frame count found by the previous cell.
max_f

242.0

Chosen sequence length: 122 frames (the median of the per-class medians). The longest video has 242 frames.

### Previous Model + Keras Sequence

In [5]:
from fractions import Fraction

def save_holistic(path, holistic, image):
    """Run the holistic model on one BGR frame and save its landmarks to ``path``.

    The saved 1-D array concatenates pose (33 points x 4 values: x, y, z,
    visibility), face (468 x 3), left hand (21 x 3) and right hand (21 x 3),
    i.e. 1662 values. A group that was not detected is filled with zeros.

    Args:
        path: Destination passed to ``np.save``.
        holistic: Object whose ``process(image)`` returns the landmark results.
        image: Frame in BGR channel order.
    """
    def _flat_xyz(group, n_points):
        # Flattened (x, y, z) triples, or zeros when the group is missing.
        if not group:
            return np.zeros(n_points*3)
        return np.array([[pt.x, pt.y, pt.z] for pt in group.landmark]).flatten()

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Mark the frame read-only while the model processes it, then restore.
    image.flags.writeable = False
    results = holistic.process(image)
    image.flags.writeable = True

    if results.pose_landmarks:
        pose = np.array(
            [[pt.x, pt.y, pt.z, pt.visibility] for pt in results.pose_landmarks.landmark]
        ).flatten()
    else:
        pose = np.zeros(33*4)

    landmarks = np.concatenate([
        pose,
        _flat_xyz(results.face_landmarks, 468),
        _flat_xyz(results.left_hand_landmarks, 21),
        _flat_xyz(results.right_hand_landmarks, 21),
    ])
    np.save(path, landmarks)
    
def sequential_frames(v_id, src, holistic, n_frames):
    """Save landmarks for the next ``n_frames`` frames read from ``src``.

    Frame ``f`` is written to ``./lsa64-data-info/landmarks/<v_id>/<f>.npy``.
    When ``src.read()`` reports no frame, a zero vector of length 1662 is
    saved instead, so exactly ``n_frames`` files are always produced.
    """
    for f in range(n_frames):
        ok, frame = src.read()
        if not ok:
            np.save(f'./lsa64-data-info/landmarks/{v_id}/{f}.npy', np.zeros((1662, )))
            continue
        save_holistic(f'./lsa64-data-info/landmarks/{v_id}/{f}.npy', holistic, frame)

def frames_from_video(v_id, n_frames):
    """Extract ``n_frames`` landmark vectors from one video and save them as .npy files.

    The video ``./dataset/lsa64-data/videos/<stem>.mp4`` is opened, where
    ``<stem>`` is ``v_id`` up to its first dot, and frame ``f`` is written to
    ``./lsa64-data-info/landmarks/<stem>/<f>.npy``.

    * Videos with at most ``n_frames`` frames are read sequentially;
      ``sequential_frames`` zero-pads the missing tail.
    * Longer videos are subsampled: the length ratio, rounded to one decimal,
      is approximated by a fraction ``num/den``. The loop keeps ``den``
      consecutive frames, then discards ``num - den`` frames, until
      ``n_frames`` files have been written.

    Args:
        v_id: Video file name or stem, e.g. ``'017_001_001.mp4'``.
        n_frames: Number of landmark files to produce.

    NOTE(review): videos are read from ``.../videos/`` although the file list
    in this notebook is built from ``.../all/`` - confirm both hold the same files.
    """
    mp_holistic = mp.solutions.holistic
    v_id = v_id.split('.')[0]
    out_dir = f'./lsa64-data-info/landmarks/{v_id}'

    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        src = cv2.VideoCapture(f'./dataset/lsa64-data/videos/{v_id}.mp4')
        try:
            video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)
            # Permission bits must be octal: decimal 777 is 0o1411, not rwxrwxrwx.
            # exist_ok lets the extraction be re-run over an existing directory.
            os.makedirs(out_dir, mode=0o777, exist_ok=True)

            if video_length <= n_frames:
                sequential_frames(v_id, src, holistic, n_frames)
            else:
                f = 0
                ratio = round(video_length/n_frames, 1)
                frac = Fraction(ratio).limit_denominator(10)

                # For these two ratios a coarser fraction (denominator <= 2) is used.
                if ratio == 1.7 or ratio == 1.9:
                    frac = Fraction(ratio).limit_denominator(2)

                num, den = frac.numerator, frac.denominator
                while f < n_frames:
                    # Keep up to `den` consecutive frames ...
                    for _ in range(min(den, n_frames-f)):
                        ret, frame = src.read()
                        if ret:
                            save_holistic(f'{out_dir}/{f}.npy', holistic, frame)
                        else:
                            # Video ran out early: pad with a zero vector.
                            np.save(f'{out_dir}/{f}.npy', np.zeros((1662, )))
                        f += 1
                    # ... then read and discard `num - den` frames.
                    for _ in range(num-den):
                        src.read()
        finally:
            # Release the capture even if extraction fails part-way.
            src.release()

In [49]:
# Extract and save 122 landmark frames for every video of classes[16:48].
for class_ in classes[16:48]:
    for v_id in files_for_class[class_]:
        frames_from_video(v_id, 122)

In [6]:
from keras.utils import to_categorical

# One-hot rows for 32 class indices (row i has its 1 at position i), as integers.
y = to_categorical(list(range(32))).astype(int)

In [7]:
# Pair each class id in classes[16:48] with its one-hot row, in order.
label_map = dict(zip(classes[16:48], y))
label_map

{17: array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 18: array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 19: array([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 20: array([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 21: array([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 22: array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 23: array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 24: array([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 25: array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,

In [8]:
# Number of classes that received a one-hot label.
len(label_map)

32

In [12]:
def get_train_val_test_data(classes, label_map, files_for_class, split_ratio=(0.8, 0.9)):
    """Split the videos of each class into train / validation / test subsets.

    Every class is shuffled independently with ``np.random.shuffle`` and cut
    at ``int(n * split_ratio[0])`` and ``int(n * split_ratio[1])``, so each
    class contributes to all three subsets in the same proportions.

    Args:
        classes: Iterable of class ids to include.
        label_map: Mapping class id -> label (e.g. a one-hot array).
        files_for_class: Mapping class id -> list of video ids. Not modified.
        split_ratio: Two cumulative fractions ``(train_end, val_end)``.

    Returns:
        Six numpy arrays in this order: train ids, train labels, validation
        ids, validation labels, test ids, test labels.
    """
    train_ids, val_ids, test_ids = [], [], []
    train_labels, val_labels, test_labels = [], [], []

    for class_ in classes:
        # Shuffle a copy: shuffling the stored list in place would silently
        # reorder the caller's files_for_class on every call.
        v_ids = list(files_for_class[class_])
        np.random.shuffle(v_ids)
        split_1 = int(len(v_ids) * split_ratio[0])
        split_2 = int(len(v_ids) * split_ratio[1])
        label = label_map[class_]

        train_ids.extend(v_ids[:split_1])
        val_ids.extend(v_ids[split_1:split_2])
        test_ids.extend(v_ids[split_2:])
        train_labels.extend([label] * split_1)
        val_labels.extend([label] * (split_2 - split_1))
        test_labels.extend([label] * (len(v_ids) - split_2))

    return (
        np.array(train_ids), np.array(train_labels),
        np.array(val_ids), np.array(val_labels),
        np.array(test_ids), np.array(test_labels),
    )

In [12]:
# a/b: train ids and labels, c/d: validation ids and labels, e/f: test ids and labels.
a, b, c, d, e, f = get_train_val_test_data(classes[16:48], label_map, files_for_class)

In [13]:
# Shapes of the train / validation / test id and label arrays.
a.shape, b.shape, c.shape, d.shape, e.shape, f.shape

((512,), (512, 32), (64,), (64, 32), (64,), (64, 32))

In [14]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of pre-extracted landmark windows.

    Each sample is the stack of ``n_frames`` arrays loaded from
    ``<landmarks_dir>/<video stem>/<frame index>.npy``, where the stem is the
    video id up to its first dot.
    """
    def __init__(self, v_ids, labels, n_frames, batch_size = 4, training = False,
                 landmarks_dir = './lsa64-data-info/landmarks'):
        """
        Args:
            v_ids: numpy array of video ids (indexed with an index array).
            labels: numpy array of labels aligned with ``v_ids``.
            n_frames: Number of frame files loaded per video.
            batch_size: Samples per batch.
            training: When True, sample order is shuffled now and after every epoch.
            landmarks_dir: Directory holding one sub-directory of .npy frames
                per video. Defaults to the location that used to be hard-coded.
        """
        # Let the Keras base class initialise its own state.
        super().__init__()
        self.v_ids = v_ids
        self.labels = labels
        self.n_frames = n_frames
        self.training = training
        self.batch_size = batch_size
        self.landmarks_dir = landmarks_dir
        self.indexes = np.arange(len(self.v_ids))
        if self.training:
            np.random.shuffle(self.indexes)

    def get_video_frames(self, v_ids, classes):
        """Load the frame windows for ``v_ids`` and pair them with ``classes``.

        Returns ``(X, y)`` where ``X`` stacks one window per video and ``y``
        holds the label at the same position in ``classes``.
        """
        result_X = []
        result_y = []
        for index, v_id in enumerate(v_ids):
            stem = v_id.split(".")[0]
            window = [
                np.load(f'{self.landmarks_dir}/{stem}/{frame_idx}.npy')
                for frame_idx in range(self.n_frames)
            ]
            result_X.append(np.stack(window, axis=0))
            result_y.append(classes[index])

        return np.array(result_X), np.array(result_y)

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(X, y)``."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        return self.get_video_frames(self.v_ids[indexes], self.labels[indexes])

    def __len__(self):
        """Number of full batches; a trailing partial batch is dropped."""
        return math.floor(len(self.v_ids) / self.batch_size)

    def on_epoch_end(self):
        """Reshuffle the sample order between epochs when in training mode."""
        if self.training:
            np.random.shuffle(self.indexes)
In [15]:
# 122-frame windows; the training generator shuffles, the validation one keeps a fixed order.
train_gen = DataGenerator(a, b, 122, training=True)
val_gen = DataGenerator(c, d, 122)

In [16]:
# Shapes of the first training batch: inputs, then labels (each indexing loads the batch again).
train_gen[0][0].shape, train_gen[0][1].shape

((4, 122, 1662), (4, 32))

In [17]:
# Label shape of the first training batch.
train_gen[0][1].shape

(4, 32)

In [18]:
# One step per batch exposed by each generator.
steps_per_epoch = len(train_gen)
validation_steps = len(val_gen)

In [23]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, GRU

# Three stacked GRU layers over (122 frames, 1662 features) followed by a
# dense classifier with a 32-way softmax.
# NOTE(review): Keras documents that the fast cuDNN GRU kernel requires the
# default tanh activation; with activation='relu' the layers use the generic
# implementation - confirm relu is intended.
model = Sequential([
    GRU(256, return_sequences=True, activation='relu', input_shape=(122, 1662)),
    GRU(128, return_sequences=True, activation='relu'),
    GRU(64, return_sequences=False, activation='relu'),
    Dense(128, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='softmax'),
])



In [24]:
# Print the layer-by-layer output shapes and parameter counts.
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru_3 (GRU)                 (None, 122, 256)          1474560   
                                                                 
 gru_4 (GRU)                 (None, 122, 128)          148224    
                                                                 
 gru_5 (GRU)                 (None, 64)                37248     
                                                                 
 dense_3 (Dense)             (None, 128)               8320      
                                                                 
 dense_4 (Dense)             (None, 64)                8256      
                                                                 
 dense_5 (Dense)             (None, 32)                2080      
                                                                 
Total params: 1,678,688
Trainable params: 1,678,688
No

In [25]:
# Categorical cross-entropy matches the one-hot labels and softmax output.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [26]:
# Train for one epoch on the generator, validating on val_gen.
history = model.fit(train_gen, epochs=1, steps_per_epoch=steps_per_epoch, validation_data=val_gen, validation_steps=validation_steps)

 23/128 [====>.........................] - ETA: 11:12 - loss: 3.4660 - accuracy: 0.0000e+00

KeyboardInterrupt: 

In [55]:
from fractions import Fraction

def save_holistic(path, holistic, image):
    """Run the holistic model on one BGR frame and save pose and hand landmarks to ``path``.

    The saved 1-D array concatenates pose (33 points x 4 values: x, y, z,
    visibility), left hand (21 x 3) and right hand (21 x 3), i.e. 258 values.
    Face landmarks are not stored. A group that was not detected is filled
    with zeros.

    Args:
        path: Destination passed to ``np.save``.
        holistic: Object whose ``process(image)`` returns the landmark results.
        image: Frame in BGR channel order.
    """
    def _flat_hand(group):
        # Flattened (x, y, z) triples of one hand, or zeros when it is missing.
        if not group:
            return np.zeros(21*3)
        return np.array([[pt.x, pt.y, pt.z] for pt in group.landmark]).flatten()

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Mark the frame read-only while the model processes it, then restore.
    image.flags.writeable = False
    results = holistic.process(image)
    image.flags.writeable = True

    if results.pose_landmarks:
        pose = np.array(
            [[pt.x, pt.y, pt.z, pt.visibility] for pt in results.pose_landmarks.landmark]
        ).flatten()
    else:
        pose = np.zeros(33*4)

    landmarks = np.concatenate([
        pose,
        _flat_hand(results.left_hand_landmarks),
        _flat_hand(results.right_hand_landmarks),
    ])
    np.save(path, landmarks)
    
def sequential_frames(v_id, src, holistic, n_frames):
    """Save landmarks for the next ``n_frames`` frames read from ``src``.

    Frame ``f`` is written to
    ``./lsa64-data-info/landmarks-16-30-3/<v_id>/<f>.npy``. When ``src.read()``
    reports no frame, a zero vector of length 258 is saved instead, so exactly
    ``n_frames`` files are always produced.
    """
    for f in range(n_frames):
        ok, frame = src.read()
        if not ok:
            np.save(f'./lsa64-data-info/landmarks-16-30-3/{v_id}/{f}.npy', np.zeros((258, )))
            continue
        save_holistic(f'./lsa64-data-info/landmarks-16-30-3/{v_id}/{f}.npy', holistic, frame)

def frames_from_video(v_id, n_frames):
    """Extract ``n_frames`` landmark vectors from one video and save them as .npy files.

    The video ``./dataset/lsa64-data/videos/<stem>.mp4`` is opened, where
    ``<stem>`` is ``v_id`` up to its first dot, and frame ``f`` is written to
    ``./lsa64-data-info/landmarks-16-30-3/<stem>/<f>.npy``.

    * Videos with at most ``n_frames`` frames are read sequentially;
      ``sequential_frames`` zero-pads the missing tail.
    * Longer videos are subsampled: the length ratio, rounded to one decimal,
      is approximated by a fraction ``num/den``. The loop keeps ``den``
      consecutive frames, then discards ``num - den`` frames, until
      ``n_frames`` files have been written.

    Args:
        v_id: Video file name or stem, e.g. ``'017_001_001.mp4'``.
        n_frames: Number of landmark files to produce.

    NOTE(review): videos are read from ``.../videos/`` although the file list
    in this notebook is built from ``.../all/`` - confirm both hold the same files.
    """
    mp_holistic = mp.solutions.holistic
    v_id = v_id.split('.')[0]
    out_dir = f'./lsa64-data-info/landmarks-16-30-3/{v_id}'

    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        src = cv2.VideoCapture(f'./dataset/lsa64-data/videos/{v_id}.mp4')
        try:
            video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)
            # Permission bits must be octal: decimal 777 is 0o1411, not rwxrwxrwx.
            # exist_ok lets the extraction be re-run over an existing directory.
            os.makedirs(out_dir, mode=0o777, exist_ok=True)

            if video_length <= n_frames:
                sequential_frames(v_id, src, holistic, n_frames)
            else:
                f = 0
                ratio = round(video_length/n_frames, 1)
                frac = Fraction(ratio).limit_denominator(10)

                # For these two ratios a coarser fraction (denominator <= 2) is used.
                if ratio == 1.7 or ratio == 1.9:
                    frac = Fraction(ratio).limit_denominator(2)

                num, den = frac.numerator, frac.denominator
                while f < n_frames:
                    # Keep up to `den` consecutive frames ...
                    for _ in range(min(den, n_frames-f)):
                        ret, frame = src.read()
                        if ret:
                            save_holistic(f'{out_dir}/{f}.npy', holistic, frame)
                        else:
                            # Video ran out early: pad with a zero vector.
                            np.save(f'{out_dir}/{f}.npy', np.zeros((258, )))
                        f += 1
                    # ... then read and discard `num - den` frames.
                    for _ in range(num-den):
                        src.read()
        finally:
            # Release the capture even if extraction fails part-way.
            src.release()

In [56]:
# Extract and save 122 landmark frames for every video of classes[16:32].
for class_ in classes[16:32]:
    for v_id in files_for_class[class_]:
        frames_from_video(v_id, 122)

In [57]:
from keras.utils import to_categorical

# One-hot rows for 16 class indices (row i has its 1 at position i), as integers.
y = to_categorical(list(range(16))).astype(int)

In [58]:
# Pair each class id in classes[16:32] with its one-hot row, in order.
label_map = dict(zip(classes[16:32], y))
label_map

{17: array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 18: array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 19: array([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 20: array([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 21: array([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 22: array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 23: array([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 24: array([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
 25: array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]),
 26: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]),
 27: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]),
 28: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]),
 29: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0]),
 30: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]),
 31: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]),
 32: array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])}

In [59]:
# Number of classes in the reduced subset.
len(classes[16:32])

16

In [60]:
# a/b: train ids and labels, c/d: validation ids and labels, e/f: test ids and labels.
a, b, c, d, e, f = get_train_val_test_data(classes[16:32], label_map, files_for_class)

In [61]:
# Shapes of the train / validation / test id and label arrays.
a.shape, b.shape, c.shape, d.shape, e.shape, f.shape

((640,), (640, 16), (80,), (80, 16), (80,), (80, 16))

In [62]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of pre-extracted landmark windows.

    Each sample is the stack of ``n_frames`` arrays loaded from
    ``<landmarks_dir>/<video stem>/<frame index>.npy``, where the stem is the
    video id up to its first dot.
    """
    def __init__(self, v_ids, labels, n_frames, batch_size = 16, training = False,
                 landmarks_dir = './lsa64-data-info/landmarks-16-30-3'):
        """
        Args:
            v_ids: numpy array of video ids (indexed with an index array).
            labels: numpy array of labels aligned with ``v_ids``.
            n_frames: Number of frame files loaded per video.
            batch_size: Samples per batch.
            training: When True, sample order is shuffled now and after every epoch.
            landmarks_dir: Directory holding one sub-directory of .npy frames
                per video. Defaults to the location that used to be hard-coded.
        """
        # Let the Keras base class initialise its own state.
        super().__init__()
        self.v_ids = v_ids
        self.labels = labels
        self.n_frames = n_frames
        self.training = training
        self.batch_size = batch_size
        self.landmarks_dir = landmarks_dir
        self.indexes = np.arange(len(self.v_ids))
        if self.training:
            np.random.shuffle(self.indexes)

    def get_video_frames(self, v_ids, classes):
        """Load the frame windows for ``v_ids`` and pair them with ``classes``.

        Returns ``(X, y)`` where ``X`` stacks one window per video and ``y``
        holds the label at the same position in ``classes``.
        """
        result_X = []
        result_y = []
        for index, v_id in enumerate(v_ids):
            stem = v_id.split(".")[0]
            window = [
                np.load(f'{self.landmarks_dir}/{stem}/{frame_idx}.npy')
                for frame_idx in range(self.n_frames)
            ]
            result_X.append(np.stack(window, axis=0))
            result_y.append(classes[index])

        return np.array(result_X), np.array(result_y)

    def __getitem__(self, index):
        """Return batch number ``index`` as ``(X, y)``."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        return self.get_video_frames(self.v_ids[indexes], self.labels[indexes])

    def __len__(self):
        """Number of full batches; a trailing partial batch is dropped."""
        return math.floor(len(self.v_ids) / self.batch_size)

    def on_epoch_end(self):
        """Reshuffle the sample order between epochs when in training mode."""
        if self.training:
            np.random.shuffle(self.indexes)
In [63]:
# 122-frame windows; the training generator shuffles, the validation one keeps a fixed order.
train_gen = DataGenerator(a, b, 122, training=True)
val_gen = DataGenerator(c, d, 122)

In [64]:
# Shapes of the first training batch: inputs, then labels (each indexing loads the batch again).
train_gen[0][0].shape, train_gen[0][1].shape

((16, 122, 258), (16, 16))

In [65]:
# Label shape of the first training batch.
train_gen[0][1].shape

(16, 16)

In [66]:
# One step per batch exposed by each generator.
steps_per_epoch = len(train_gen)
validation_steps = len(val_gen)

In [67]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, GRU

# Four stacked GRU layers over (122 frames, 258 features) followed by a dense
# classifier with a 16-way softmax.
# NOTE(review): Keras documents that the fast cuDNN GRU kernel requires the
# default tanh activation; with activation='relu' the layers use the generic
# implementation - confirm relu is intended.
model = Sequential([
    GRU(128, return_sequences=True, activation='relu', input_shape=(122, 258)),
    GRU(64, return_sequences=True, activation='relu'),
    GRU(32, return_sequences=True, activation='relu'),
    GRU(16, return_sequences=False, activation='relu'),
    Dense(128, activation='relu'),
    Dense(64, activation='relu'),
    Dense(16, activation='softmax'),
])



In [68]:
# Print the layer-by-layer output shapes and parameter counts.
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru_4 (GRU)                 (None, 122, 128)          148992    
                                                                 
 gru_5 (GRU)                 (None, 122, 64)           37248     
                                                                 
 gru_6 (GRU)                 (None, 122, 32)           9408      
                                                                 
 gru_7 (GRU)                 (None, 16)                2400      
                                                                 
 dense_3 (Dense)             (None, 128)               2176      
                                                                 
 dense_4 (Dense)             (None, 64)                8256      
                                                                 
 dense_5 (Dense)             (None, 16)               

In [69]:
# Categorical cross-entropy matches the one-hot labels and softmax output.
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [70]:
# Recompute the step counts (same expressions as the earlier cell).
steps_per_epoch = len(train_gen)
validation_steps = len(val_gen)

In [None]:
# Train for ten epochs on the generator, validating on val_gen.
history = model.fit(train_gen, epochs=10, steps_per_epoch=steps_per_epoch, validation_data=val_gen, validation_steps=validation_steps)

Epoch 1/10
Epoch 2/10
Epoch 3/10

In [68]:
# Load the training clips into memory: 30 landmark frames per video from the
# 'landmarks-16-30' extraction.
# NOTE(review): the GRU model defined earlier in this notebook was built with
# input_shape=(122, 258); confirm a 30-frame window is what the fitted model expects.
sequences, labels = [], []
for v_id, label in zip(a, b):
    clip_dir = f'./lsa64-data-info/landmarks-16-30/{v_id.split(".")[0]}'
    # The comprehension keeps the frame index local, so the notebook-level `f`
    # (test labels returned by get_train_val_test_data) is no longer overwritten.
    window = [np.load(f'{clip_dir}/{frame_idx}.npy') for frame_idx in range(30)]
    sequences.append(np.stack(window, axis=0))
    labels.append(label)

In [69]:
# Stack the loaded clips and labels into arrays, then reorder both with the
# same shuffled index so each sample stays paired with its label.
X = np.array(sequences)
y = np.array(labels).astype(int)
rand_ind = np.arange(len(X))
np.random.shuffle(rand_ind)
X = X[rand_ind]
y = y[rand_ind]

In [None]:
# Train on the in-memory arrays for 100 epochs with batches of 16 (no validation data).
history = model.fit(X, y, batch_size=16, epochs=100)

Epoch 1/100

In [33]:
# Train for ten epochs on the generator, validating on val_gen.
history = model.fit(train_gen, epochs=10, steps_per_epoch=steps_per_epoch, validation_data=val_gen, validation_steps=validation_steps)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
