### **Imports**

In [12]:
from os.path import join, realpath, dirname, exists, abspath, isfile, isdir
from os import mkdir as mk, name as os_name, getcwd, environ, pathsep, rename, listdir
from typing import Tuple

from mediapipe.python.solutions import drawing_utils as du 
from mediapipe.python.solutions import hands
from google.protobuf.json_format import MessageToDict

from sklearn.model_selection import train_test_split
from tensorflow.python.keras.utils.all_utils import to_categorical;
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import LSTM, Dense
from tensorflow.python.keras.callbacks import TensorBoard
from tensorflow.python.keras.backend import set_session
import tensorflow as tf

from numpy import array, zeros, concatenate, save, load, argmax
from uuid import uuid1

import cv2
from cv2 import imread, imshow, imwrite, flip, cvtColor, COLOR_BGR2RGB

print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  1


### **Definitions**

In [3]:
# Options 💾
MODEL_NAME = 'v1'

MP_MODEL_COMPLEXITY = 0
MP_DETECTION_CONFIDENCE = 0.5
MP_TRACKING_CONFIDENCE = 0.5
MP_NUM_HANDS = 1

SIGNS = [
  'a', 'b', 'c',
]

ALL_SIGNS = SIGNS.copy()
ALL_SIGNS.insert(0, 'none')

CLASS_COUNT = len(ALL_SIGNS)
SEQUENCE_LENGHT = 10 # Amount of data per collection

# Paths 📁
ROOT_DIR = getcwd()
MODELS_DIR = join(ROOT_DIR, 'models')
MODEL_DIR = join(MODELS_DIR, MODEL_NAME)
LOG_DIR = join(MODEL_DIR, 'logs')
DATA_DIR = join(ROOT_DIR, 'data')
IMAGES_DIR = join(ROOT_DIR, 'images')
COLLECTED_IMAGES_DIR = join(IMAGES_DIR, 'collected')
PREPROCESSED_IMAGES_DIR = join(IMAGES_DIR, 'preprocessed')
PROCCESSED_IMAGES_DIR = join(IMAGES_DIR, 'processed')
REJECTED_IMAGES_DIR = join(IMAGES_DIR, 'rejected')

# Constants 🚧
HAND_LANDMARK_COUNT = 21 # https://mediapipe.dev/images/mobile/hand_landmarks.png
HAND_LANDMARK_POINTS = HAND_LANDMARK_COUNT * 3 # (x, y, z)

# Util 📐
def mkdir(path: str):
  if not exists(path):
    mk(path)
  else:
    print(f'{path} already exists!')
      
def dir_exists(dir_path: str) -> bool:
  return exists(dir_path) and isdir(dir_path)


### **Mediapipe Util**

In [4]:
def draw_landmarks(image, hand_landmarks):
  for point in hand_landmarks:
    du.draw_landmarks(
      image, point, hands.HAND_CONNECTIONS, 
      du.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
      du.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
    )
    
def draw_img_landmarks(image, hand_landmarks):
  for point in hand_landmarks:
    du.draw_landmarks(
      image, point, hands.HAND_CONNECTIONS, 
      du.DrawingSpec(color=(224,0,0), thickness=32, circle_radius=5), # points
      du.DrawingSpec(color=(0,0,224), thickness=32, circle_radius=5) # edges
    )

def mediapipe_detection(image, hands: hands.Hands):
  image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # COLOR CONVERSION BGR 2 RGB
  image.flags.writeable = False                   # Image is no longer writeable
  results = hands.process(image)                  # Make prediction
  image.flags.writeable = True                    # Image is now writeable
  image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # COLOR COVERSION RGB 2 BGR
  return image, results

def extract_keypoints_rh(results):
  landmarks = MessageToDict(results.multi_hand_landmarks[0])['landmark']
  res = []
  
  for lk in landmarks:
    res.append(lk['x'])
    res.append(lk['y'])
    res.append(lk['z'])
    
  return array(res)

def get_handedness(results):
  return MessageToDict(results.multi_handedness[0])['classification'][0]['label']

### *Model Structures*

In [5]:
def model_0(input_shape: Tuple[int, int]) -> Sequential:
  model = Sequential()
  model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=input_shape))
  model.add(LSTM(128, return_sequences=True, activation='relu'))
  model.add(LSTM(64, return_sequences=False, activation='relu'))
  model.add(Dense(64, activation='relu'))
  model.add(Dense(32, activation='relu'))
  model.add(Dense(CLASS_COUNT, activation='softmax'))
  return model

### **Capture w/Mediapipe**

In [5]:
cap = cv2.VideoCapture(0)
with hands.Hands(
  model_complexity=MP_MODEL_COMPLEXITY,
  min_detection_confidence=MP_DETECTION_CONFIDENCE,
  min_tracking_confidence=MP_TRACKING_CONFIDENCE,
  max_num_hands=MP_NUM_HANDS
) as mp_hands:
  while cap.isOpened():
    
    success, image = cap.read()
    image = flip(image, 1)
    
    if not success:
      print("Ignoring empty camera frame.")
      continue

    image, results = mediapipe_detection(image, mp_hands)
    if results.multi_hand_landmarks:
      draw_landmarks(image, results.multi_hand_landmarks)
      
    # Flip the image horizontally for a selfie-view display.
    imshow('MediaPipe Hands', image)
    
    key = cv2.waitKey(1)
    if key == ord('q'):
      break
  
cap.release()
cv2.destroyAllWindows()

### **Image Renaming**

In [117]:
_SIGN = 'c'
SOURCE_SIGN_DIR = join(COLLECTED_IMAGES_DIR, _SIGN)
DESTIN_SIGN_DIR = join(PREPROCESSED_IMAGES_DIR, _SIGN)
mkdir(DESTIN_SIGN_DIR)

if not exists(SOURCE_SIGN_DIR): raise Exception(f'make sure {SOURCE_SIGN_DIR} exists!')

for img_name in listdir(SOURCE_SIGN_DIR):
  src_img_path = join(SOURCE_SIGN_DIR, img_name)
  if isfile(src_img_path):
    dest_img_path = join(DESTIN_SIGN_DIR, f'{_SIGN}.{uuid1()}.jpg')
    rename(src_img_path, dest_img_path)

### **Image Data Extraction**

In [None]:
def reject(sign: str, img_name: str):
  rename(
    join(PREPROCESSED_IMAGES_DIR, sign, img_name),
    join(REJECTED_IMAGES_DIR, img_name)
  )
  
def accept(sign: str, img_name: str):
  rename(
    join(PREPROCESSED_IMAGES_DIR, sign, img_name),
    join(PROCCESSED_IMAGES_DIR, img_name)
  )

for sign in SIGNS:
  SIGN_DIR = join(PREPROCESSED_IMAGES_DIR, sign)
  DATA_SIGN_DIR = join(DATA_DIR, sign)
  if not exists(DATA_SIGN_DIR): mkdir(DATA_SIGN_DIR)
  
  with hands.Hands(
    model_complexity=MP_MODEL_COMPLEXITY,
    min_detection_confidence=MP_DETECTION_CONFIDENCE,
    min_tracking_confidence=MP_TRACKING_CONFIDENCE,
    max_num_hands=1
  ) as mp_hands:
    for img_name in listdir(SIGN_DIR):
      img_path = join(SIGN_DIR, img_name)
      image = flip(imread(img_path), 1)
      _, results = mediapipe_detection(image, mp_hands)
      
      if not results.multi_hand_landmarks:
        print(f'unable to detect any hands for image: {img_name}')
        reject(sign, img_name)
        continue
      else:
        if len(results.multi_handedness) > 1:
          print(f'detecting more than 1 hand for image: {img_name}')
          reject(sign, img_name)
          continue
        else:
          if get_handedness(results) != 'Right':
            print(f'detected hand is not a Right hand for image {img_name}')
            reject(sign, img_name)
            continue
    
      keypoints = extract_keypoints_rh(results)
      data_path = join(DATA_DIR, sign, img_name)
      save(data_path, keypoints)  
      accept(sign, img_name)

In [111]:
# ! FOR TESTING, DO NOT RUN
# for sign in SIGNS:
#   SIGN_DIR = join(PROCCESSED_IMAGES_DIR, sign)
  
#   with hands.Hands(
#     model_complexity=MP_MODEL_COMPLEXITY,
#     min_detection_confidence=MP_DETECTION_CONFIDENCE,
#     min_tracking_confidence=MP_TRACKING_CONFIDENCE,
#     max_num_hands=1
#   ) as mp_hands:
#     id = 0;
#     for img_name in listdir(SIGN_DIR):
#       img_path = join(SIGN_DIR, img_name)
#       image = imread(img_path)
#       results = mp_hands.process(cvtColor(image, COLOR_BGR2RGB))
#       if results.multi_hand_landmarks:
#         temp = image.copy()
#         draw_img_landmarks(temp, results.multi_hand_landmarks)
#         imwrite(join(IMAGES_DIR, 'temp', f'{img_name}.jpg'), temp)

### **Train**

##### *Load & Parition partition data*

In [5]:
def get_collection_count():
  data_amounts = []
  for collection_dir in listdir(DATA_DIR):
    if collection_dir == '.gitkeep': continue
    data_amounts.append(len(listdir(join(DATA_DIR, collection_dir))))
      
  return min(data_amounts)

COLLECTION_COUNT = get_collection_count()
print(f'currently using {COLLECTION_COUNT} data points')

# Load Training Data
label_map = { label: num for num, label in enumerate(ALL_SIGNS) }
sequences, labels = [ # Initializing with 'none' sign
  [
    [
      0 for i in range(HAND_LANDMARK_POINTS)
    ] for j in range(SEQUENCE_LENGHT)
  ] for k in range(COLLECTION_COUNT)
], [
  0 for i in range(COLLECTION_COUNT)
]

for sign in SIGNS:
  sign_data_dir = join(DATA_DIR, sign)
  for data_file_name in listdir(sign_data_dir)[:COLLECTION_COUNT]:
    data_path = join(sign_data_dir, data_file_name)
    res = load(data_path)
    window = [res] * SEQUENCE_LENGHT
    sequences.append(window)
    labels.append(label_map[sign])

x = array(sequences)
y = to_categorical(labels).astype(int)

input_shape = (SEQUENCE_LENGHT, HAND_LANDMARK_POINTS)

# Testing!
s_expected = (CLASS_COUNT * COLLECTION_COUNT, SEQUENCE_LENGHT, HAND_LANDMARK_POINTS)
s_result = x.shape
l_expected = (CLASS_COUNT * COLLECTION_COUNT, CLASS_COUNT)
l_result = y.shape
if s_result != s_expected:
  raise Exception(f'WARNING: expected sequence shape `{s_expected}` != from gotten `{s_result}`')
if l_result != l_expected:
  raise Exception(f'WARNING: expected labels shape `{l_expected}` != from gotten `{l_result}`')

print(f'input shape is {input_shape}')
# partitioning train & test data
x_train, x_text, y_train, y_test = train_test_split(x, y, test_size=0.05)

currently using 386 data points
input shape is (10, 63)


In [None]:
# Logging for TB
mkdir(MODEL_DIR)
mkdir(LOG_DIR)
tb_callback = TensorBoard(log_dir=LOG_DIR)


tf_config = tf.compat.v1.ConfigProto(allow_soft_placement=False)
tf_config.gpu_options.allow_growth = True
s = tf.compat.v1.Session(config=tf_config)
set_session(s)


with tf.device('/gpu:0'):
  # model = model_0(input_shape)
  model = Sequential()

  model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=input_shape))
  model.add(LSTM(128, return_sequences=True, activation='relu'))
  model.add(LSTM(64, return_sequences=False, activation='relu'))
  model.add(Dense(64, activation='relu'))
  model.add(Dense(32, activation='relu'))
  model.add(Dense(CLASS_COUNT, activation='softmax'))

  model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

  print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
  print(f'[RUN]: tensorboard --logdir={LOG_DIR}')
  model.fit(x_train, y_train, epochs=100, use_multiprocessing=True, workers=4, batch_size=2048)

  model.save(join(MODEL_DIR, 'signs.h5'))
  del model

### **Detection**

In [40]:
model = Sequential()

model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=input_shape))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(CLASS_COUNT, activation='softmax'))
model.load_weights(join(MODEL_DIR, 'signs.h5'))

sequence = []
sentence = []
threshold = 0.5

cap = cv2.VideoCapture(1)
with hands.Hands(
  model_complexity=MP_MODEL_COMPLEXITY,
  min_detection_confidence=MP_DETECTION_CONFIDENCE,
  min_tracking_confidence=MP_TRACKING_CONFIDENCE,
  max_num_hands=MP_NUM_HANDS
) as mp_hands:
  while cap.isOpened():
    
    success, image = cap.read()
    image = flip(image, 1)
    
    if not success:
      print("Ignoring empty camera frame.")
      continue

    image, results = mediapipe_detection(image, mp_hands)
    
    if results.multi_hand_landmarks:
      draw_landmarks(image, results.multi_hand_landmarks)
      
    # Flip the image horizontally for a selfie-view display.
    imshow('MediaPipe Hands', image)
    
    key = cv2.waitKey(1)
    if key == ord('q'):
      break
  
cap.release()
cv2.destroyAllWindows()

[ WARN:0@3710.638] global cap_v4l.cpp:969 open VIDEOIO(V4L2): can't find camera device
[ERROR:0@3710.638] global obsensor_uvc_stream_channel.cpp:156 getStreamChannelGroup Camera index out of range


In [36]:
res = model.predict(x_text)

for i in range(len(res)):
  pre = ALL_SIGNS[argmax(res[i])]
  rel = ALL_SIGNS[argmax(y_test[i])]
  if rel != pre:
    print(f'prediciton: {pre}, real: {rel}')


prediciton: b, real: c
