In [None]:
%pip install mediapipe opencv-python numpy scikit-learn tensorflow
%pip install xgboost

In [None]:
# Input directory: one sub-directory per gesture, each containing .mp4 videos
VIDEO_PATH = "videos"
# Output directory: extracted landmark arrays (.npy), one sub-directory per gesture
DATASET_PATH = "dataset"

# When True, videos are processed without opening a preview window, which is
# faster, but you don't see the landmarks while processing.
# When False, every frame is shown with the detected landmarks (Esc aborts).
BACKGROUND = True

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import os
import pathlib
import tensorflow as tf
import xgboost as xgb

from keras import layers, Sequential
from keras.callbacks import TensorBoard, EarlyStopping
from keras.optimizers import AdamW
from keras.utils import to_categorical, pad_sequences
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split

In [None]:
# Short aliases for the MediaPipe drawing helpers and the hand-tracking solution
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands

In [None]:
def mediapipe_detection(image, model):
    """Run ``model`` on a BGR frame and return the frame with the model output.

    The frame is converted from BGR to RGB and flagged read-only while
    ``model.process`` runs, then made writeable again and converted back
    to BGR.

    Args:
        image: Frame in BGR channel order.
        model: Object exposing ``process(image)``.

    Returns:
        A ``(image, out)`` tuple: the BGR image obtained from the round-trip
        conversion and whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Keep the buffer read-only for the duration of the inference call
    rgb_frame.flags.writeable = False
    out = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), out


def get_landmarks(detection):
    """Flatten the landmarks of the first detected hand into a 1-D array.

    Args:
        detection: Object with a ``multi_hand_landmarks`` attribute; each
            entry exposes a ``landmark`` iterable of points with ``x``,
            ``y`` and ``z`` attributes.

    Returns:
        A 1-D array ``[x0, y0, z0, x1, y1, z1, ...]`` for the first hand, or
        an array of ``21 * 3`` zeros when ``multi_hand_landmarks`` is empty
        or ``None``.
    """
    detected_hands = detection.multi_hand_landmarks
    if not detected_hands:
        return np.zeros(21 * 3)  # 21 landmarks with 3 coordinates

    # Since we use just one hand at a time, the first hand [0] is the only one we need
    coords = [(point.x, point.y, point.z) for point in detected_hands[0].landmark]
    return np.array(coords).flatten()


def get_video_filenames(path: str) -> dict[str, list[str]]:
    """Collect the ``.mp4`` files found in each sub-directory of ``path``.

    Args:
        path: Directory whose immediate sub-directories are scanned.

    Returns:
        A dict mapping each sub-directory name to the list of paths
        (``path/sub-directory/file``) of the ``.mp4`` files directly inside
        it. Sub-directories without such files map to an empty list; plain
        files directly under ``path`` are ignored.

    Raises:
        Exception: If ``path`` is not a directory.
    """
    if not os.path.isdir(path):
        raise Exception(f"{path} is not a valid directory!")

    videos_by_gesture = {}
    for entry in os.listdir(path):
        gesture_dir = os.path.join(path, entry)
        if os.path.isdir(gesture_dir):
            videos_by_gesture[entry] = [
                os.path.join(path, entry, name)
                for name in os.listdir(gesture_dir)
                if name.endswith(".mp4")
            ]

    return videos_by_gesture


def cvexit(vid):
    """Release ``vid``, close every OpenCV window and stop execution.

    Args:
        vid: Object exposing ``release()`` (the capture being read).

    Raises:
        SystemExit: Always, once both clean-up calls have run.
    """
    vid.release()
    cv2.destroyAllWindows()
    raise SystemExit

In [None]:
# Extract hand landmarks from every video under VIDEO_PATH and store them as
# DATASET_PATH/<gesture>/<video stem>.npy, one row of 63 values per frame.
# Videos whose output file already exists are skipped, so the cell can be
# re-run after adding new videos.
with mp_hands.Hands(
    min_detection_confidence=0.8,
    min_tracking_confidence=0.5,
    max_num_hands=1,
) as hands:
    filenames = get_video_filenames(VIDEO_PATH)
    for gesture in filenames:
        gesture_dir = os.path.join(DATASET_PATH, gesture)
        for filename in filenames[gesture]:
            out_filename = pathlib.Path(filename).stem + ".npy"
            out_path = os.path.join(gesture_dir, out_filename)

            # Skip already processed videos
            if os.path.exists(out_path):
                continue

            cap = cv2.VideoCapture(filename)
            sequences = []
            try:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    img, res = mediapipe_detection(frame, hands)
                    landmarks = get_landmarks(res)
                    sequences.append(landmarks)

                    if not BACKGROUND:
                        # draw_landmarks needs the MediaPipe landmark list of a
                        # hand, not the flattened numpy array stored in the dataset
                        for hand_landmarks in res.multi_hand_landmarks or []:
                            mp_drawing.draw_landmarks(
                                img, hand_landmarks, mp_hands.HAND_CONNECTIONS
                            )
                        cv2.imshow("Landmark detection", img)

                        # Exit if pressing esc
                        if cv2.waitKey(1) == 27:
                            cvexit(cap)
            finally:
                # Release every capture, not only the last one that was opened
                cap.release()

            # A video that could not be opened or decoded yields no frames.
            # Saving an empty array would mark it as processed for good and
            # put an unusable sample in the dataset, so skip it instead.
            if not sequences:
                print("Skipped (no frames could be read):", filename)
                continue

            os.makedirs(gesture_dir, exist_ok=True)
            np.save(out_path, np.array(sequences))
            print("Processed", os.path.join(gesture, out_filename))

cv2.destroyAllWindows()

In [None]:
def get_dataset_arrays(path):
    """Load every saved landmark sequence under ``path`` together with its label.

    Each immediate sub-directory of ``path`` is treated as one letter and
    every ``.npy`` file directly inside it as one sequence of that letter.
    Sub-directories and files are visited in sorted name order, so a letter
    always receives the same integer label and the samples always come back
    in the same order, independent of the order ``os.listdir`` reports.

    Args:
        path: Root directory of the dataset.

    Returns:
        A ``(label_map, labels, sequences)`` tuple where ``label_map`` maps
        each letter to its integer label, ``labels`` is an array holding the
        label of each sequence, and ``sequences`` is a list of the loaded
        arrays (kept as a list because their lengths may differ).

    Raises:
        Exception: If ``path`` is not a directory.
    """
    if not os.path.isdir(path):
        raise Exception(f"{path} is not a valid directory!")

    # Sorting makes the letter -> index assignment reproducible
    letters = sorted(
        entry for entry in os.listdir(path) if os.path.isdir(os.path.join(path, entry))
    )
    label_map = {letter: index for index, letter in enumerate(letters)}

    sequences, labels = [], []
    for letter in letters:
        letter_dir = os.path.join(path, letter)
        for file in sorted(os.listdir(letter_dir)):
            if not file.endswith(".npy"):
                continue

            sequences.append(np.load(os.path.join(letter_dir, file)))
            labels.append(label_map[letter])

    return label_map, np.array(labels), sequences

In [None]:
# Load the landmark sequences and one-hot encode their integer labels
label_map, labels, sequences = get_dataset_arrays(DATASET_PATH)
# Pin the one-hot width to the number of known letters: without num_classes,
# to_categorical infers it from the largest label present, which comes out
# too small when the last letter directory contains no samples.
labels = to_categorical(labels, num_classes=len(label_map)).astype(int)

In [None]:
# Cut every recorded sequence into windows of at most `win_size` frames; each
# window inherits the label of the sequence it was cut from. A non-positive
# `win_size` disables windowing and uses the whole sequences instead.
win_size = 30  # Size in frames

split_seqs = []
split_labels = []

if win_size > 0:
    for seq, label in zip(sequences, labels):
        for i in range(0, len(seq), win_size):
            split_seqs.append(seq[i : i + win_size])
            split_labels.append(label)

    # We need to pad all sequences to the same length (length of the longest sequence)
    # The smaller the window size, the less padding will be added
    # dtype must be given explicitly: pad_sequences defaults to "int32", which
    # would truncate the floating-point landmark coordinates to integers.
    split_seqs = pad_sequences(split_seqs, dtype="float32")
    split_labels = np.array(split_labels)
else:
    split_seqs = pad_sequences(sequences, dtype="float32")
    split_labels = labels

In [None]:
# Hold out 20% of the windows for testing.
# The data set is split exactly once: splitting the whole data set again for
# every letter and concatenating the results would duplicate every sample and
# put test samples into the training set, inflating the reported accuracy.
# `stratify` keeps the letter proportions equal in both parts (it requires at
# least two windows per letter); `random_state` makes the split reproducible.
train_seqs, test_seqs, train_labels, test_labels = train_test_split(
    split_seqs,
    split_labels,
    test_size=0.2,
    stratify=np.argmax(split_labels, axis=1),
    random_state=42,
)

In [None]:
# Report the shapes of the full data set and of each split
shape_report = (
    ("All labels:", split_labels),
    ("All sequences:", split_seqs),
    ("Train sequences:", train_seqs),
    ("Test sequences:", test_seqs),
    ("Train labels:", train_labels),
    ("Test labels:", test_labels),
)
for caption, array in shape_report:
    print(caption, array.shape)

In [None]:
# Gradient-boosted baseline: every window is flattened into a single feature
# row and the one-hot labels are turned back into class indices.
train_labels_indices = train_labels.argmax(axis=1)
test_labels_indices = test_labels.argmax(axis=1)

train_features = train_seqs.reshape(len(train_seqs), -1)
test_features = test_seqs.reshape(len(test_seqs), -1)
dtrain = xgb.DMatrix(train_features, label=train_labels_indices)
dtest = xgb.DMatrix(test_features, label=test_labels_indices)

num_round = 100  # number of training iterations
params = {
    "max_depth": 3,
    "eta": 0.1,
    "objective": "multi:softprob",
    "num_class": train_labels.shape[1],
}
bst = xgb.train(params, dtrain, num_round)

# The predicted class of a sample is the position of the largest value in its row
preds = bst.predict(dtest)
best_preds = np.argmax(preds, axis=1)

print("Accuracy score:", accuracy_score(test_labels_indices, best_preds))
bst.save_model("xgb_model.model")

In [None]:
# Stacked-LSTM classifier: three recurrent layers followed by a dense head
# with one softmax output per label column.
n_timesteps, n_features = train_seqs.shape[1], train_seqs.shape[2]
n_classes = labels.shape[1]

model = Sequential(
    [
        layers.LSTM(
            64,
            activation="tanh",
            return_sequences=True,
            input_shape=(n_timesteps, n_features),
        ),
        layers.LSTM(128, activation="relu", return_sequences=True),
        # Last recurrent layer returns only its final output
        layers.LSTM(64, activation="relu", return_sequences=False),
        layers.Dense(64, activation="relu"),
        layers.Dense(32, activation="relu"),
        layers.Dense(n_classes, activation="softmax"),
    ]
)
model.summary()

In [None]:
# model = Sequential(
#     layers=[
#         layers.LSTM(100, input_shape=(split_seqs.shape[1], split_seqs.shape[2])),
#         layers.Dropout(0.5),
#         layers.Dense(100, activation="relu"),
#         layers.Dense(labels.shape[1], activation="softmax"),
#     ]
# )
# model.summary()

In [None]:
# model = Sequential(
#     layers=[
#         layers.Conv2D(
#             filters=1,
#             kernel_size=(2, 2),
#             activation="relu",
#             padding="same",
#             input_shape=(split_seqs.shape[1], split_seqs.shape[2], 1),
#         ),
#         layers.MaxPooling2D((2, 2)),
#         layers.Flatten(),
#         layers.Dense(labels.shape[1], activation="softmax"),
#     ]
# )
# model.summary()

In [None]:
# Compile the network, train it on the training split and evaluate it on the
# held-out test split.
optimizer = AdamW()
model.compile(
    optimizer=optimizer,
    loss="categorical_crossentropy",
    metrics=["categorical_accuracy"],
)

# Delete previous Tensorboard logs
# NOTE(review): with log_dir="" the TensorBoard callback presumably writes its
# training events to ./train, which is the directory cleaned here - confirm.
if os.path.isdir("train"):
    for file in os.listdir("train"):
        if file.startswith("events.out.tfevents"):
            os.remove(os.path.join("train", file))

# `use_multiprocessing` is intentionally not passed: it only ever applied to
# generator / Sequence inputs (the data here are in-memory arrays), and
# Keras 3's Model.fit rejects the argument with a TypeError.
# NOTE(review): EarlyStopping watches the training loss with its default
# patience, so training may stop very early - confirm this is intended.
model.fit(
    train_seqs,
    train_labels,
    epochs=200,
    callbacks=[TensorBoard(log_dir=""), EarlyStopping(monitor="loss")],
)

# Convert the predicted scores and the one-hot targets back to class indices
labels_hat = np.argmax(model.predict(test_seqs), axis=1).tolist()
labels_true = np.argmax(test_labels, axis=1).tolist()

print(multilabel_confusion_matrix(labels_true, labels_hat))
print("Accuracy score: {:.3f}".format(accuracy_score(labels_true, labels_hat)))

In [None]:
# Persist the trained model in the native Keras format
model.save("model.keras")

# Build a TensorFlow Lite converter for the model; select TF ops are allowed
# in addition to the TFLite built-ins, and tensor-list lowering is disabled.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
supported_op_sets = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
converter.target_spec.supported_ops = supported_op_sets
converter._experimental_lower_tensor_list_ops = False
tflite_model = converter.convert()

# Write the converted model into the app's assets directory
with open("app/src/main/assets/model.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_model)