## 1. Import libraries

In [None]:
import cv2 
import mediapipe as mp
import numpy as np
import os
import matplotlib.pyplot as plt

## 2. Initialize MediaPipe modules

In [None]:
# Short aliases for the MediaPipe solution modules used throughout the notebook.
mp_holistic = mp.solutions.holistic # Holistic model (provides pose, face and both-hand landmarks)
mp_drawing = mp.solutions.drawing_utils # Drawing utilities 
mp_face_mesh = mp.solutions.face_mesh # Supplies the FACEMESH_CONTOURS connection set used when drawing the face

In [None]:
# Optional GPU environment setup (run these in a terminal, not in the notebook):
#     conda create -n my_gpu_env python=3.x
#     conda activate my_gpu_env
#     conda install pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia

import torch
# Reports whether PyTorch can see a CUDA device.
# NOTE(review): the classifier in this notebook is built with tensorflow.keras, so this
# check does not tell you whether TensorFlow itself is running on the GPU.
torch.cuda.is_available()

## 3. Define Helper

In [None]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on one OpenCV frame.

    Args:
        image: frame in BGR channel order (it is converted with COLOR_BGR2RGB
            before being handed to the model).
        model: object exposing ``process(rgb_image)``.

    Returns:
        ``(image, results)`` - the frame converted back to BGR, and whatever
        ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Mark the buffer read-only for the duration of the model call, then
    # restore write access afterwards.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results


## 4. Drawing Function

In [None]:
def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks onto ``image`` in place.

    Args:
        image: frame to draw on (modified in place by ``mp_drawing.draw_landmarks``).
        results: detection output exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.

    Returns:
        None.
    """
    spec = mp_drawing.DrawingSpec

    # (landmarks, connection set, landmark style, connection style) per body part,
    # drawn in this order: face, pose, left hand, right hand.
    layers = (
        (results.face_landmarks, mp_face_mesh.FACEMESH_CONTOURS,
         spec(color=(80, 110, 10), thickness=1, circle_radius=1),
         # Was (80, 256, 121): 256 is outside the 0-255 range of an 8-bit channel.
         spec(color=(80, 255, 121), thickness=1, circle_radius=1)),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         spec(color=(80, 22, 10), thickness=2, circle_radius=4),
         spec(color=(80, 44, 121), thickness=2, circle_radius=2)),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(121, 22, 76), thickness=2, circle_radius=4),
         spec(color=(121, 44, 250), thickness=2, circle_radius=2)),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(245, 117, 66), thickness=2, circle_radius=4),
         spec(color=(245, 66, 230), thickness=2, circle_radius=2)),
    )

    for landmarks, connections, landmark_style, connection_style in layers:
        mp_drawing.draw_landmarks(image, landmarks, connections,
                                  landmark_style, connection_style)

## 5. Main Video Capturing Logic 
(Live preview only — this cell is not needed for training.)

In [None]:
# #Main function 
# cap = cv2.VideoCapture(0) 
# # Set mediapipe model  
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic: 
#     while cap.isOpened(): 
  
#         # Read feed 
#         ret, frame = cap.read() 
  
#         # Make detections 
#         image, results = mediapipe_detection(frame, holistic) 
#         print(results) 
          
#         # Draw landmarks 
#         draw_styled_landmarks(image, results) 
  
#         # Show to screen 
#         cv2.imshow('Holistic Model Output', image) 
  
#         # Break gracefully 
#         if cv2.waitKey(10) & 0xFF == ord('q'): 
#             break
#     cap.release() 
#     cv2.destroyAllWindows()

## 6. Extract Keypoint Values 
(x, y, z values of the detected landmarks, stored in a NumPy array; a body part that is not detected is filled with zeros)

In [None]:
#same logic as above but in function, so can be used
def extract_keypoints(results):
    """Flatten one frame of landmark detections into a single 1-D feature vector.

    Layout of the returned vector, in order:
      * pose:       33 landmarks x (x, y, z, visibility)
      * face:       468 landmarks x (x, y, z)
      * left hand:  21 landmarks x (x, y, z)
      * right hand: 21 landmarks x (x, y, z)

    Args:
        results: object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each is either
            falsy (not detected) or has a ``.landmark`` iterable of points.

    Returns:
        ``np.ndarray`` concatenating the four parts (1662 values when every part
        has its expected landmark count or is missing).
    """
    def _flatten(landmark_list, attrs, count):
        # A missing body part contributes zeros so the vector length stays fixed.
        if not landmark_list:
            return np.zeros(count * len(attrs))
        rows = [[getattr(point, attr) for attr in attrs]
                for point in landmark_list.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    return np.concatenate([
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33),
        _flatten(results.face_landmarks, xyz, 468),
        _flatten(results.left_hand_landmarks, xyz, 21),
        _flatten(results.right_hand_landmarks, xyz, 21),
    ])

In [None]:
# test f(x) if correct num
# extract_keypoints(results).shape

In [None]:
# Sanity check on the expected length of the vector returned by extract_keypoints:
# face (468 landmarks * 3 values) + pose (33 * 4) + left hand (21 * 3) + right hand (21 * 3)
468*3+33*4+21*3+21*3 # total values per frame (1662)

## 7. Setup Folders for Collection

In [None]:
# Root folder that holds the exported keypoint data (one .npy file per frame)
DATA_PATH = os.path.join('MP_Data') 

# Actions that we try to detect
actions = np.array(['idontknow', 'areyouok', 'idontunderstand'])

# Number of recorded videos (sequences) per action
no_sequences = 50

# Each video is 50 frames long; every frame is stored as one 1662-value keypoint vector
sequence_length = 50

The folders already exist, so the next cell does not need to be re-run.

In [None]:
# # 1 folder for each action(3) and inside 1 folder for each sequence(0-29)
# for action in actions: 
#     for sequence in range(no_sequences):
#         try: 
#             os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
#         except: # if created -> pass
#             pass

## 8. Collect Keypoint Values for Training and Testing
No need to re-run: the data has already been collected.

In [None]:
# cap = cv2.VideoCapture(0)
# # Set mediapipe model 
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
#     # NEW LOOP
#     # Loop through actions
#     for action in actions:
#         # Loop through sequences aka videos
#         for sequence in range(no_sequences):
#             # Loop through video length aka sequence length
#             for frame_num in range(sequence_length):

#                 # Read feed
#                 ret, frame = cap.read()

#                 # Make detections
#                 image, results = mediapipe_detection(frame, holistic)
# #                 print(results)

#                 # Draw landmarks
#                 draw_styled_landmarks(image, results)
                
#                 # NEW Apply wait logic
#                 if frame_num == 0: 
#                     cv2.putText(image, 'STARTING COLLECTION', (120,200), 
#                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
#                     cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
#                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
#                     # Show to screen
#                     cv2.imshow('OpenCV Feed', image)
#                     cv2.waitKey(2000)
#                 else: 
#                     cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
#                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
#                     # Show to screen
#                     cv2.imshow('OpenCV Feed', image)
                
#                 # NEW Export keypoints
#                 keypoints = extract_keypoints(results)
#                 npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
#                 np.save(npy_path, keypoints) #to save 'npy_path.npy' array of resulting keypoints

#                 # Break gracefully
#                 if cv2.waitKey(10) & 0xFF == ord('q'):
#                     break
                    
#     cap.release()
#     cv2.destroyAllWindows()

In [None]:
# cap.release()
# cv2.destroyAllWindows()

## 9. Preprocess Data and Create Labels and Features

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical # to one-hot encoding data

In [None]:
# Dictionary from action name to its integer class index (its position in `actions`)
label_map = {name: index for index, name in enumerate(actions)}

In [None]:
label_map

In [None]:
# sequences, labels = [], [] #x, y data
# for action in actions:
#     for sequence in range(no_sequences):
#         window = []
#         for frame_num in range(sequence_length):
#             res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
#             window.append(res) #grab frame, add to window(video)
#         sequences.append(window)
#         labels.append(label_map[action])

## With augmentation

In [None]:
# import random

# SEED = 42
# np.random.seed(SEED)
# random.seed(SEED)
# torch.manual_seed(SEED)

In [None]:
# applying augmentation to each frame
def augment_landmarks(frame, apply_prob=0.5):
    """Return a copy of one frame, optionally perturbed with Gaussian noise.

    With probability ``apply_prob`` zero-mean Gaussian noise (standard deviation
    0.01) is added to every value of the frame; otherwise an unchanged copy is
    returned. The input array is never modified.

    Scale, translation and 2-D rotation variants were experimented with and are
    currently disabled, so noise is the only augmentation applied here.

    Args:
        frame: ``np.ndarray`` of keypoint values for a single frame.
        apply_prob: probability of adding the noise.

    Returns:
        A new ``np.ndarray`` with the same shape as ``frame``.
    """
    if np.random.rand() >= apply_prob:
        return frame.copy()

    jitter = np.random.normal(0, 0.01, frame.shape)
    return frame + jitter

In [None]:
# apply augmentation on sequence of frames (50)
def augment_sequence(windows, apply_prob=0.3, target_len=50):
    """Randomly perturb a sequence of frames in time and return it at a fixed length.

    Two augmentations are applied independently, each with probability
    ``apply_prob``:

    * temporal shift - the frames are rolled by -2..+2 positions. ``np.roll`` is
      circular, so frames pushed past one end reappear at the other end.
    * speed change - the frames are resampled to 0.9x or 1.1x the original count
      by picking rounded, evenly spaced indices.

    Afterwards the sequence is zero-padded at the end or truncated so that it
    has exactly ``target_len`` frames.

    Args:
        windows: sequence of frames (list of arrays or an array), time on axis 0.
        apply_prob: probability of applying each augmentation.
        target_len: number of frames in the returned array.

    Returns:
        A new ``np.ndarray`` whose first dimension is ``target_len``; the input
        is not modified.
    """
    augmented = np.array(windows).copy()

    if np.random.rand() < apply_prob:
        # randint's upper bound is exclusive: (-2, 2) could never produce +2,
        # so use 3 to get a symmetric -2..+2 shift.
        shift = np.random.randint(-2, 3)
        augmented = np.roll(augmented, shift=shift, axis=0)

    if np.random.rand() < apply_prob:
        # Speed up / slow down
        n_frames = len(augmented)
        factor = np.random.choice([0.9, 1.1])
        idxs = np.linspace(0, n_frames - 1, int(n_frames * factor))
        idxs = np.clip(np.round(idxs), 0, n_frames - 1).astype(int)
        augmented = augmented[idxs]

    missing = target_len - len(augmented)
    if missing > 0:
        # Pad only the time axis; works for frames of any dimensionality
        # (the previous hard-coded two-axis pad_width failed for non-2-D input).
        pad_width = [(0, missing)] + [(0, 0)] * (augmented.ndim - 1)
        augmented = np.pad(augmented, pad_width, mode='constant')
    elif missing < 0:
        augmented = augmented[:target_len]

    return augmented


In [None]:
# Build an enlarged dataset: keep every original sequence and, with probability 0.3,
# additionally append one augmented copy of it under the same label.
# NOTE(review): `sequences` and `labels` are only assigned by the loading cell that comes
# after this one in the file, so this cell must be executed after that cell.
# NOTE(review): in this file X is built from `sequences`, not from `augmented_sequences`,
# so these two lists do not appear to reach training - confirm whether that is intended.
augmented_sequences, augmented_labels = [], []
for seq, label in zip(sequences, labels):
    # Always keep the untouched sequence
    augmented_sequences.append(seq)
    augmented_labels.append(label)
    
    # Occasionally add a time-augmented variant of the same sequence
    if np.random.rand() < 0.3:
        augmented_sequences.append(augment_sequence(seq))
        augmented_labels.append(label)

In [None]:
# Load every stored frame from DATA_PATH/<action>/<sequence>/<frame_num>.npy and assemble
# one window (list of frames) per recorded video, with random augmentation applied on the fly.
# NOTE(review): augmented windows replace the originals (nothing is added), and this happens
# before train_test_split, so test sequences can be augmented as well - confirm intended.
sequences, labels = [], [] #x, y data
for action in actions:
    for sequence in range(no_sequences):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))

            # Frame augmentation: 30% of frames are passed to augment_landmarks, which then
            # adds Gaussian noise with its own default apply_prob of 0.5
            if np.random.rand() < 0.3:
                res = augment_landmarks(res)
            window.append(res) #grab frame, add to window(video)
        
        # Sequence augmentation: 30% of windows are passed to augment_sequence
        # (temporal shift and speed up/slow down, then pad/truncate to 50 frames)
        if np.random.rand() < 0.3:
            window = augment_sequence(window)

        sequences.append(window)
        labels.append(label_map[action])

In [None]:
X = np.array(sequences)

In [None]:
X.shape

In [None]:
y = to_categorical(labels).astype(int) #to 1 hot encoding

In [None]:
y

In [None]:
# Hold out 5% of the sequences for testing.
# NOTE(review): no random_state is passed, so the split changes on every run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [None]:
X_train.shape

In [None]:
y_test.shape

## 10. Build and Train LSTM Neural Network

In [None]:
from tensorflow.keras.models import Sequential #sequential api
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard #to monitor model as its training

In [None]:
# Training hyper-parameters.
# NOTE(review): model.compile / model.fit in this notebook still use the literals
# 'Adam' and 400 rather than these names.
EPOCHS = 400
# Was left blank (`RANDOM_SEED = `), which is a SyntaxError that stops the whole cell.
# 42 mirrors the commented-out `SEED = 42` in the augmentation section.
RANDOM_SEED = 42
OPTIMIZER = 'Adam'

In [None]:
# TensorBoard callback that writes its logs under the 'Logs' folder
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# model = Sequential()

# model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(50,1662)))
# model.add(LSTM(128, return_sequences=True, activation='relu'))
# model.add(LSTM(64, return_sequences=False, activation='relu'))

# model.add(Dense(64, activation='relu'))
# model.add(Dense(32, activation='relu'))
# model.add(Dense(actions.shape[0], activation='softmax'))

## Simplify Model

In [None]:
# Simplified network: two stacked LSTM layers followed by a small dense classifier head.
model = Sequential([
    # Input: 50 frames per sequence, 1662 keypoint values per frame
    LSTM(64, return_sequences=True, activation='relu', input_shape=(50,1662)),
    # Last recurrent layer returns only its final output, not the whole sequence
    LSTM(32, return_sequences=False, activation='relu'),
    Dense(32, activation='relu'),
    # One softmax output per action
    Dense(actions.shape[0], activation='softmax'),
])

In [None]:
# Multi-class setup: categorical cross-entropy loss on the one-hot labels, tracked with categorical accuracy
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

categorical_crossentropy -> multi-class classification

binary_crossentropy -> binary-class classification

MSE -> regressions

In [None]:
# Train on the training split; `history` keeps the per-epoch metrics plotted further down.
# NOTE(review): no validation data is passed here, so only training metrics are recorded.
history = model.fit(
    X_train, y_train,
    epochs=400,
    callbacks=[tb_callback]  # TensorBoard callback
)


In Terminal move to Logs/train: 

tensorboard --logdir=. (to see logs)

In [None]:
# tensorboard --logdir=. 
# to see logdir

In [None]:
# Plot the per-epoch training loss and accuracy recorded by model.fit, side by side
plt.figure(figsize=(10,4))

# Left panel: training loss
plt.subplot(1,2,1)
plt.plot(history.history['loss'], label='train_loss')
plt.title("Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()

# Right panel: training accuracy (the 'categorical_accuracy' metric set in model.compile)
plt.subplot(1,2,2)
plt.plot(history.history['categorical_accuracy'], label='train_accuracy')
plt.title("Training Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()

plt.show()

In [None]:
model.summary()
# LSTM not CNN layers, cuz CNN needs more data

##  Plot Augmentations

In [None]:
def plot_landmarks(original, augmented, num_points=543):
    """Scatter-plot the x/y landmark positions of a frame before and after augmentation.

    Flat 1662-value vectors in the layout produced by ``extract_keypoints``
    (33 pose landmarks x 4 values, then 468 face + 21 + 21 hand landmarks x 3
    values) are decoded into 543 (x, y, z) points. Previously the whole vector
    was reshaped to (554, 3), which mixed the pose ``visibility`` values into
    the coordinates and misplaced the pose points.

    Any other 1-D input is reshaped to (-1, 3) as before, and inputs that are
    already 2-D are plotted as given (column 0 = x, column 1 = y).

    Args:
        original: frame before augmentation (1-D vector or 2-D point array).
        augmented: frame after augmentation, same layout as ``original``.
        num_points: kept for backward compatibility; the number of points is
            derived from the data itself.

    Returns:
        None. Shows a matplotlib figure with two panels.
    """
    def _as_points(values):
        # Convert a frame into an array whose columns 0 and 1 are x and y.
        values = np.asarray(values)
        if values.ndim != 1:
            return values
        pose_len = 33 * 4
        if values.size == pose_len + (468 + 21 + 21) * 3:
            # Drop the 4th (visibility) column of the pose block so every row is x, y, z
            pose_xyz = values[:pose_len].reshape(33, 4)[:, :3]
            rest_xyz = values[pose_len:].reshape(-1, 3)
            return np.vstack([pose_xyz, rest_xyz])
        return values.reshape(-1, 3)

    panels = (
        (1, _as_points(original), 'blue', "Original"),
        (2, _as_points(augmented), 'red', "Augmented"),
    )

    plt.figure(figsize=(8,4))
    for position, points, color, title in panels:
        plt.subplot(1, 2, position)
        plt.scatter(points[:, 0], points[:, 1], c=color, s=10)
        plt.title(title)
        # Flip the y axis so that smaller y values are drawn at the top
        plt.gca().invert_yaxis()

    plt.show()

In [None]:
# Visual sanity check of the frame-level augmentation on one stored frame.
frame = np.load('../ActionDetectionforSignLanguage/MP_Data/areyouok/0/2.npy')
# Force the augmentation on: with the default apply_prob (0.5) the "augmented" panel
# would be an unchanged copy of the original about half of the time.
aug_frame = augment_landmarks(frame, apply_prob=1.0)
plot_landmarks(frame, aug_frame)

## 11. Make Predictions

In [None]:
res = model.predict(X_test)

In [None]:
actions[np.argmax(res[1])] # action predicted by the model for test sample 1

In [None]:
actions[np.argmax(y_test[1])] # ground-truth action of test sample 1

## 12. Save Model and Load Weights

In [None]:
model.save('400_simple_augm_mod.h5')

In [None]:
model.load_weights('400_simple_augm_mod.h5')

# 13. Evaluation using Confusion Matrix and Accuracy


In [None]:
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
# whats being detected as TP FN FP TN

In [None]:
yhat = model.predict(X_test)

In [None]:
# axis=1 -> want to convert second dim in the arr
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
ytrue

In [None]:
yhat # numbers of classes

In [None]:
multilabel_confusion_matrix(ytrue, yhat)

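multilabel_confusion_matrix returns one 2x2 matrix per class, laid out as [[TN, FP], [FN, TP]]:
1) True Negative (top-left)
2) False Positive (top-right)
3) False Negative (bottom-left)
4) True Positive (bottom-right)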

In [None]:
accuracy_score(ytrue, yhat)

# 14. Test in Real Time

In [None]:
# Bar colours for the probability overlay, one per action in the same order as `actions`.
# The frames they are drawn on are in OpenCV's BGR channel order.
colors = [(245,117,16), (117,245,16), (16,117,245)]


def prob_viz(res, actions, input_frame, colors):
    """Overlay one horizontal probability bar per action on a copy of the frame.

    Args:
        res: iterable of per-action probabilities, in the same order as ``actions``.
        actions: action names, used as the text label of each bar.
        input_frame: image to annotate; it is copied, not modified.
        colors: list of colour tuples for the bars. It is cycled when there are
            more actions than colours (previously that raised IndexError).

    Returns:
        The annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        top = 60 + num * 40  # each bar occupies its own 40-pixel row
        bar_color = colors[num % len(colors)]
        # Bar length grows with the probability (100 pixels == probability 1.0)
        cv2.rectangle(output_frame, (0, top), (int(prob * 100), top + 30),
                      bar_color, -1)
        cv2.putText(output_frame, actions[num], (0, top + 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    return output_frame

# 15. Real time test

In [None]:
# 1. New detection variables
sequence = []     # rolling window of the newest 50 keypoint vectors fed to the model
sentence = []     # history of recognised actions, rendered at the top of the frame
predictions = []  # most recent predicted class indices, used for the stability check
threshold = 0.3   # minimum predicted probability for an action to be accepted

cap = cv2.VideoCapture(0)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.4, min_tracking_confidence=0.4) as holistic:
        while cap.isOpened():

            # Read feed; stop cleanly if the camera returns no frame instead of
            # crashing inside cv2.cvtColor on an empty frame
            ret, frame = cap.read()
            if not ret:
                break

            # Make detections (the per-frame debug print of `results` was removed)
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # 2. Prediction logic
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-50:]  # keep only the newest 50 frames

            if len(sequence) == 50:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]  # pass 1 sequence at a time
                predicted = int(np.argmax(res))
                print(actions[predicted])
                predictions.append(predicted)
                predictions = predictions[-10:]  # only the last 10 are ever inspected

                # 3. Viz logic
                # Accept an action only when the recent predictions all agree with the
                # current one. The previous check, np.unique(predictions[-10:])[0],
                # compared against the smallest class index seen recently, which let
                # class 0 through on a single prediction.
                is_stable = all(p == predicted for p in predictions)
                if is_stable and res[predicted] > threshold:
                    # Append only when the action differs from the last one shown
                    if not sentence or actions[predicted] != sentence[-1]:
                        sentence.append(actions[predicted])

                # Show at most the 5 most recent actions
                sentence = sentence[-5:]

                # Viz probabilities
                image = prob_viz(res, actions, image, colors)

            # Banner across the full frame width (was hard-coded to 640 pixels)
            cv2.rectangle(image, (0, 0), (image.shape[1], 40), (245, 117, 16), -1)
            # Render the sentence with spaces between actions, starting at (3, 30)
            cv2.putText(image, ' '.join(sentence), (3,30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if a frame raised an error
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# model.predict(X_test[0]) # shape is incorrect ERROR!!!!
X_test[0].shape # model expects (num_sequences, 50 1662) -> expand dims

In [None]:
np.expand_dims(X_test[0], axis=0).shape

In [None]:
model.predict(np.expand_dims(X_test[0], axis=0))

In [None]:
res[np.argmax(res)] > threshold

In [None]:
model.predict(np.expand_dims(X_test[0], axis=0))

## After training Sign Language Model -> Train Emotion Rec Model

### Sign Language DATASET:
- ASL Dataset
- WLASL (Word-Level American Sign Language)

### Emotion DATASET:
- FER2013
- AffectNet
- CK+

In [None]:
# Planned combined sign + emotion pipeline (outline only - not runnable as written).
# NOTE(review): get_video_frame, extract_sign_keypoints, extract_face_image, sign_model and
# emotion_model are not defined anywhere in the visible part of this file, and `holistic`
# is only bound inside the `with` block of the real-time cell.

# 1. Capture frame from webcam or video
frame = get_video_frame()

# 2. Extract landmarks using MediaPipe
results = holistic.process(frame)

# 3. Get input for sign model (hands + pose keypoints)
sign_input = extract_sign_keypoints(results)

# 4. Get input for emotion model (face crop or landmarks)
face_crop = extract_face_image(frame, results)

# 5. Predict sign
sign_pred = sign_model.predict(sign_input)

# 6. Predict emotion
emotion_pred = emotion_model.predict(face_crop)

# 7. Display predictions
# NOTE(review): presumably a custom display helper taking sign/emotion keywords - confirm
display(frame, sign=sign_pred, emotion=emotion_pred)