In [1]:
import cv2
import numpy as np
import keyboard
import mediapipe as mp
import tensorflow as tf
import time

In [2]:
def load_model(tflite_save_path):
    """
    Create a TFLite interpreter for the given model file.

    param tflite_save_path: path of the .tflite model to load
    return: tuple (interpreter, input_details, output_details)
    """
    tfl_interpreter = tf.lite.Interpreter(model_path=tflite_save_path)
    # Allocate the tensor buffers before querying the tensor details.
    tfl_interpreter.allocate_tensors()
    in_details = tfl_interpreter.get_input_details()
    out_details = tfl_interpreter.get_output_details()
    return (tfl_interpreter, in_details, out_details)

def gesture_preprocess(landmark):
    """
    convert landmarks for trainable data

    Each axis is min-max normalised to [0, 1] independently, then the raw
    per-axis extents are appended. For n landmarks the layout is:
    X (n): 0 .. n-1
    Y (n): n .. 2n-1
    Z (n): 2n .. 3n-1
    X,Y,Z range (3): 3n .. 3n+2
    (66 features for a 21-landmark hand: X 0-20, Y 21-41, Z 42-62, ranges 63-65)

    An axis whose values are all identical (zero extent) is normalised to all
    zeros instead of dividing by zero and emitting NaNs.

    param landmark: mediapipe landmark for 1 hand (iterable `.landmark` whose
                    items expose `.x`, `.y`, `.z`)
    return: np.array of float32, shape (3n + 3,)
    raises ValueError: if `landmark.landmark` is empty
    """
    # Rows are the X, Y, Z axes; columns are the individual landmarks.
    coords = np.array(
        [(point.x, point.y, point.z) for point in landmark.landmark],
        dtype='float64',
    ).reshape(-1, 3).T

    axis_min = coords.min(axis=1, keepdims=True)
    axis_rng = coords.max(axis=1, keepdims=True) - axis_min

    # Substitute a divisor of 1 for degenerate axes; their numerator is
    # already 0, so the normalised values come out as 0 rather than NaN.
    divisor = np.where(axis_rng == 0, 1.0, axis_rng)
    normalised = (coords - axis_min) / divisor

    return np.concatenate([normalised.ravel(), axis_rng.ravel()]).astype('float32')

def gesture_inference(data, min_confidence=0.95):
    """
    inference

    Runs the module-level `interpreter` (together with `input_details` and
    `output_details`) on a single feature vector and returns the winning class.

    param data: np.array - feature vector for one hand; it is wrapped into a
                batch of one before being fed to the interpreter
    param min_confidence: float - minimum score the best class must reach;
                          defaults to 0.95
    return: int class, or -1 when the best score is below min_confidence
    """
    interpreter.set_tensor(input_details[0]['index'], np.array([data]))
    interpreter.invoke()
    raw_scores = interpreter.get_tensor(output_details[0]['index'])

    # Flatten once; reshape(-1) keeps a 1-D array even for a single-class
    # output, where squeeze() would give a 0-d array that cannot be indexed.
    scores = np.asarray(raw_scores).reshape(-1)
    best_idx = int(np.argmax(scores))  # plain int, matching the -1 sentinel's type
    if scores[best_idx] < min_confidence:
        return -1
    return best_idx

In [3]:
def gesture_pipeline(image, gest_time, hands, mp_hands, mp_drawing, mp_drawing_styles, debug=True):
    """
    Detect a hand in one frame and classify its gesture.

    A gesture is only classified once a hand has been present for more than
    0.5 s since `gest_time`; the timer is reset whenever no hand is found.

    param image: stream image (BGR frame)
    param gest_time: timer - timestamp the hand presence is measured from
    param hands: hand detector exposing process(image)
    param mp_hands: provides HAND_CONNECTIONS for drawing
    param mp_drawing: provides draw_landmarks
    param mp_drawing_styles: provides the default landmark/connection styles
    param debug: bool - debug view (draw the landmarks on the image)
    return str: gesture label ('None' when nothing was classified)
    return image: drawn image
    return time: updated timer
    """
    labels = {-1: 'None', 0: 'Hit', 1: 'Stand', 2: 'Split', 3: 'Reset'}
    gesture_id = -1

    # Detection runs on an RGB copy of the frame.
    image.flags.writeable = False
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = hands.process(rgb_frame)

    # Convert back to BGR so the caller can draw on / display the frame.
    rgb_frame.flags.writeable = True
    image = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)

    detected_hands = results.multi_hand_landmarks
    if not detected_hands:
        # No hand in view: restart the hold timer.
        return labels[gesture_id], image, time.time()

    held_long_enough = (time.time() - gest_time) > 0.5
    if held_long_enough:
        for hand in detected_hands:
            # inference
            gesture_id = gesture_inference(gesture_preprocess(hand))

            if not debug:
                continue
            # draw
            mp_drawing.draw_landmarks(
                image,
                hand,
                mp_hands.HAND_CONNECTIONS,
                mp_drawing_styles.get_default_hand_landmarks_style(),
                mp_drawing_styles.get_default_hand_connections_style())

    return labels[gesture_id], image, gest_time

In [4]:
# Toggle for the debug view (landmark overlay and on-screen gesture label).
debug = True

# Network camera stream that the main loop reads frames from.
rtsp_url = "rtsp://192.168.1.98:8554/unicast"
stream_video = cv2.VideoCapture(rtsp_url)

In [5]:
# MediaPipe modules handed to gesture_pipeline for detection and drawing.
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Gesture classifier; gesture_inference reads these three names as globals.
tflite_save_path = 'blackbeard/gesture/model/model.tflite'
interpreter, input_details, output_details = load_model(tflite_save_path)

In [6]:
# Main capture loop: read frames from the stream, run the gesture pipeline on
# each one and (in debug mode) show the annotated frame. Press ESC to stop.
# try/finally guarantees the window and the stream are released even when a
# frame raises inside the loop.
try:
    with mp_hands.Hands(
            max_num_hands=1,
            model_complexity=1,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5) as hands:
        detect_time = time.time()
        while stream_video.isOpened():

            # Reading image from stream
            frame_ok, image = stream_video.read()
            if not frame_ok or image is None:
                # read() reports failure through its first return value; there
                # is no frame to process, so stop instead of crashing on None.
                print("[WARN] Could not read a frame from the stream; stopping.")
                break

            ########################################################
            # ######### START OBJECT DETECTION PIPELINE #########  #

            # Get detected objects from stream
            # for detected_objects in object_detection(net, obj_labels, image, cuda=1):
            #     print("[INFO] Card Detected:", detected_objects)  # for debug
            #     print("---------------------------------------------")  # for debug

            # ########## END OBJECT DETECTION PIPELINE ##########  #
            ########################################################

            ########################################################
            # ############## START GESTURE PIPELINE #############  #

            gest_class, image, detect_time = gesture_pipeline(image,
                                                              detect_time,
                                                              hands,
                                                              mp_hands,
                                                              mp_drawing,
                                                              mp_drawing_styles,
                                                              debug)

            # ############### END GESTURE PIPELINE ##############  #
            ########################################################

            ########################################################
            # ######## START BLACKJACK STRATEGY PIPELINE ########  #

            # Insert code here
            # ######### END BLACKJACK STRATEGY PIPELINE #########  #
            ########################################################

            # debug view
            if debug:
                cv2.putText(image, f"{gest_class}", (0, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
                cv2.imshow('Debug View', image)

            # 27 is the ESC key code.
            if (cv2.waitKey(5) & 0xFF) == 27:
                break
finally:
    cv2.destroyAllWindows()
    stream_video.release()