# 1. Import and Install Dependencies

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

# 2. Keypoints using MP Holistic

In [None]:
# Short aliases for the MediaPipe modules used by the cells below
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [None]:
def mediapipe_detection(image, model):
    """Run ``model`` on a BGR frame and return the frame with the results.

    Args:
        image: Frame in BGR channel order (as produced by OpenCV).
        model: Object exposing ``process(rgb_image)``; the notebook passes a
            MediaPipe Holistic instance.

    Returns:
        Tuple ``(image, results)`` where ``image`` is the frame converted
        back to BGR and ``results`` is whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The buffer is flagged read-only only for the duration of the model call
    # (presumably so the model can use it without copying - TODO confirm).
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [None]:
def draw_styled_landmarks(image, results):
    """Draw the left- and right-hand landmarks of ``results`` onto ``image``.

    Args:
        image: Frame that ``mp_drawing.draw_landmarks`` draws on.
        results: Detection results exposing ``left_hand_landmarks`` and
            ``right_hand_landmarks``.
    """
    # (landmarks, landmark colour, connection colour) for each hand
    hand_styles = (
        (results.left_hand_landmarks, (121, 22, 76), (121, 44, 150)),
        (results.right_hand_landmarks, (245, 117, 66), (245, 66, 230)),
    )
    for hand_landmarks, point_color, line_color in hand_styles:
        mp_drawing.draw_landmarks(
            image,
            hand_landmarks,
            mp_holistic.HAND_CONNECTIONS,
            mp_drawing.DrawingSpec(color=point_color, thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=line_color, thickness=2, circle_radius=2),
        )

In [None]:
# Live preview: run hand detection on the camera feed until 'q' is pressed.
cap = cv2.VideoCapture(1)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed
            ret, frame = cap.read()
            if not ret:
                # A failed read yields no frame; stop instead of passing None to OpenCV.
                print('Failed to read a frame from the camera; stopping preview.')
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)
            print(results)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()

# 3. Extract Keypoint Values

In [None]:
def extract_keypoints(results):
    """Flatten both hands' landmarks into a single 1-D feature vector.

    Args:
        results: Object exposing ``left_hand_landmarks`` and
            ``right_hand_landmarks``; each is either falsy (hand not
            detected) or has a ``landmark`` iterable of points with ``x``,
            ``y`` and ``z`` attributes.

    Returns:
        1-D array: left-hand values followed by right-hand values. A hand
        that was not detected contributes ``21 * 3`` zeros.
    """
    def hand_vector(hand):
        # Zeros stand in for a hand that was not detected.
        if not hand:
            return np.zeros(21 * 3)
        coordinates = [[point.x, point.y, point.z] for point in hand.landmark]
        return np.array(coordinates).flatten()

    return np.concatenate([
        hand_vector(results.left_hand_landmarks),
        hand_vector(results.right_hand_landmarks),
    ])

In [None]:
# Sanity check on the last detection from the preview cell: the feature
# vector should hold both hands' values.
extract_keypoints(results).shape # 21*3 + 21*3

# 4. Setup Folders for Collection

In [None]:
# Root folder for the exported frames (.jpg) and keypoint arrays (.npy)
DATA_PATH = os.path.join('MP_Data')

# Action labels that we try to detect
actions = np.array(['apa', 'bagaimana', 'berapa', 'di mana', 'mengapa', 'siapa'])

# Number of recorded (non-augmented) videos per action
no_sequences = 30

# Total sequences per action once augmentation has run: the 30 recorded
# videos plus 16 augmented variants of each (30 + 30 * 16 = 510)
no_augmented_sequences = 510

# Videos are going to be 30 frames in length
sequence_length = 30

In [None]:
# Create one folder per action and sequence index (recorded + augmented).
# exist_ok=True makes re-running the cell harmless while still surfacing real
# failures such as permission errors, which a bare ``except: pass`` would hide.
for action in actions:
    for sequence in range(no_augmented_sequences):
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

# 5. Collect Keypoint Values for Training and Testing

In [None]:
# Record `no_sequences` videos of `sequence_length` frames for every action,
# saving each raw frame as .jpg and its hand keypoints as .npy.
cap = cv2.VideoCapture(1)
stop_requested = False  # set when the user presses 'q'; ends all three loops
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        # Loop through actions
        for action in actions:
            # Loop through sequences aka videos
            for sequence in range(no_sequences):
                # Loop through sequence length aka video length
                for frame_num in range(sequence_length):
                    # Shared path stem for this frame's .jpg and .npy files
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))

                    # Read feed
                    ret, frame = cap.read()
                    if not ret:
                        # Without a frame the dataset would be silently incomplete.
                        raise RuntimeError(
                            'Camera read failed while collecting {} video {} frame {}'.format(
                                action, sequence, frame_num))

                    # Save each frame as jpg (re-used later by the augmentation step)
                    cv2.imwrite(f'{npy_path}.jpg', frame)

                    # Make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_styled_landmarks(image, results)

                    # Announce the start of every new video on its first frame
                    if frame_num == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120, 200),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video number {}'.format(action, sequence), (15, 12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    if frame_num == 0:
                        # Pause so the signer can get into position
                        cv2.waitKey(2000)

                    # Export keypoints
                    keypoints = extract_keypoints(results)
                    np.save(npy_path, keypoints)

                    # Break gracefully: a plain `break` would only leave the
                    # frame loop, so propagate the request outwards.
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop_requested = True
                        break
                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    # Always free the camera and close the window, even on error.
    cap.release()
    cv2.destroyAllWindows()

# 6. Data Augmentation

In [None]:
def augmentation_rotation(image, angle):
    """Return six rotated copies of ``image``.

    The image is rotated about its centre by 15, 30, 45, 345, 330 and 315
    degrees (in that order) at scale 1, keeping the original width and height.

    Args:
        image: Array whose shape unpacks as ``(rows, cols, channels)``.
        angle: Not used; the rotation angles are fixed inside the function.

    Returns:
        List with one rotated image per angle.
    """
    rows, cols, _ = image.shape
    centre = (cols / 2, rows / 2)
    rotated_images = []
    for degrees in [15, 30, 45, 345, 330, 315]:
        matrix = cv2.getRotationMatrix2D(centre, degrees, 1)
        rotated_images.append(cv2.warpAffine(image, matrix, (cols, rows)))
    return rotated_images

In [None]:
def zoomin(image, zoom_factor):
    """Zoom into the centre of ``image`` and return it at the original size.

    A centred window covering ``zoom_factor`` of each dimension is cropped and
    resized back to the input width and height. The crop bounds are halved so
    the window is centred; without the halving the window was pushed into the
    bottom-right corner of the frame.

    Args:
        image: Array whose first two dimensions are ``(height, width)``.
        zoom_factor: Fraction of each dimension to keep, in ``(0, 1]``.
            Smaller values zoom in further.

    Returns:
        The cropped region resized to ``(width, height)`` of the input.

    Raises:
        ValueError: If ``zoom_factor`` is not in ``(0, 1]``.
    """
    if not 0 < zoom_factor <= 1:
        raise ValueError(f'zoom_factor must be in (0, 1], got {zoom_factor!r}')

    height, width = image.shape[:2]

    # Keep at least one pixel so cv2.resize never receives an empty crop.
    crop_width = max(1, int(width * zoom_factor))
    crop_height = max(1, int(height * zoom_factor))

    # Centre the crop window inside the frame.
    left = (width - crop_width) // 2
    top = (height - crop_height) // 2

    cropped_image = image[top:top + crop_height, left:left + crop_width]
    return cv2.resize(cropped_image, (width, height))

def augmentation_zoomin(image):
    """Return zoomed-in copies of ``image`` for factors 0.9, 0.8 and 0.7.

    Args:
        image: Frame passed unchanged to ``zoomin``.

    Returns:
        List of three images, one per zoom factor, in the order above.
    """
    return [zoomin(image, factor) for factor in [0.9, 0.8, 0.7]]



In [None]:
def zoomout(image, zoom_factor):
    """Shrink ``image`` by surrounding it with a black border.

    The frame is padded with black pixels on all four sides and the padded
    result is resized back to the original width and height.

    Args:
        image: Array whose first two dimensions are ``(height, width)``.
        zoom_factor: Factor used to derive the amount of padding.

    Returns:
        The padded image resized to ``(width, height)`` of the input.
    """
    height, width = image.shape[:2]

    # Use the smaller of the two per-axis ratios (after truncating the scaled
    # sizes to whole pixels) for both axes.
    scale = min(int(width * zoom_factor) / width, int(height * zoom_factor) / height)

    # Scaled size, clamped so it never exceeds the source size.
    scaled_width = min(int(width * scale), width)
    scaled_height = min(int(height * scale), height)

    # NOTE(review): the full size difference is added on *each* side, so the
    # content ends up smaller than ``zoom_factor`` alone suggests - confirm
    # this is intended.
    pad_x = width - scaled_width
    pad_y = height - scaled_height

    padded = cv2.copyMakeBorder(image, pad_y, pad_y, pad_x, pad_x,
                                cv2.BORDER_CONSTANT, value=(0, 0, 0))
    return cv2.resize(padded, (width, height))

def augmentation_zoomout(image):
    """Return zoomed-out copies of ``image`` for factors 0.9, 0.8 and 0.7.

    Args:
        image: Frame passed unchanged to ``zoomout``.

    Returns:
        List of three images, one per zoom factor, in the order above.
    """
    return [zoomout(image, factor) for factor in [0.9, 0.8, 0.7]]


In [None]:
def augmentation_meta(image):
    """Return four intensity-scaled copies of ``image``.

    For each value ``v`` in 0.5, 1.0, 1.5, 2.0 the image is passed through
    ``cv2.convertScaleAbs`` with ``alpha = v + 1`` and ``beta = v``.

    NOTE(review): ``beta`` is the additive term of ``convertScaleAbs``, so
    values of 0.5-2.0 presumably change brightness very little compared with
    the ``alpha`` gain of 1.5-3.0 - confirm this is intended.

    Args:
        image: Frame to transform.

    Returns:
        List of four transformed images, in increasing order of ``v``.
    """
    return [
        cv2.convertScaleAbs(image, alpha=level + 1, beta=level)
        for level in np.arange(0.5, 2.5, 0.5)
    ]

In [None]:
def augmented_background(frame_aug, file_num, frame_number):
    """Augment one frame and save every variant into its own sequence folder.

    The frame is rotated, zoomed in, zoomed out and intensity-scaled. Variant
    ``i`` is written to sequence ``file_num + i`` as ``<frame_number>.jpg``
    together with its keypoints as ``<frame_number>.npy``.

    Relies on the module-level names ``holistic`` (the open MediaPipe model)
    and ``action`` (the current label) being set by the calling cell.

    Args:
        frame_aug: Source frame to augment.
        file_num: Sequence index that receives the first variant.
        frame_number: Frame index inside the sequence, used as the file stem.
    """
    variants = (
        augmentation_rotation(frame_aug, 45)
        + augmentation_zoomin(frame_aug)
        + augmentation_zoomout(frame_aug)
        + augmentation_meta(frame_aug)
    )
    for target_sequence, variant in enumerate(variants, start=file_num):
        _, results = mediapipe_detection(variant, holistic)
        keypoints = extract_keypoints(results)
        npy_path = os.path.join(DATA_PATH, action, str(target_sequence), str(frame_number))
        print(npy_path)
        cv2.imwrite(f'{npy_path}.jpg', variant)
        np.save(npy_path, keypoints)

In [None]:
# Augment every recorded frame. Each recorded video produces 16 augmented
# videos, which are stored after the recorded ones (index no_sequences onward).
# `action` and `holistic` are read as globals by augmented_background.
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

    # Loop through actions
    for action in actions:
        # First free sequence index after the recorded videos
        seq_num = no_sequences
        # Loop through sequences aka videos
        for sequence in range(no_sequences):
            # Loop through sequence length aka video length
            for frame_num in range(sequence_length):
                # Load the frame saved during collection
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))

                frame = cv2.imread(f'{npy_path}.jpg')
                if frame is None:
                    # cv2.imread signals a missing/unreadable file by returning None
                    raise FileNotFoundError(f'Could not read recorded frame {npy_path}.jpg')
                augmented_background(frame, seq_num, frame_num)
            # Skip past the 16 sequences just written for this video
            seq_num += 16

# 7. Preprocess Data and Create Labels and Features

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Map every action name to its integer class id (its position in `actions`)
label_map = {name: class_id for class_id, name in enumerate(actions)}

In [None]:
# Display the action -> class id mapping
label_map

In [None]:
# Load every saved keypoint file into `sequences` (one list of frames per
# video) and record the matching class id in `labels`.
sequences, labels = [], []
for action in actions:
    class_id = label_map[action]
    for sequence in range(no_augmented_sequences):
        sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
        window = [
            np.load(os.path.join(sequence_dir, f"{frame_num}.npy"))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(class_id)

In [None]:
# Sanity check: expected (len(actions) * no_augmented_sequences, sequence_length, 126)
np.array(sequences).shape

In [None]:
# Feature tensor: one row per video, each a sequence of per-frame keypoint vectors
x = np.array(sequences)

In [None]:
# One-hot encode the integer class ids
y = to_categorical(labels).astype(int)

In [None]:
# Hold out 5% of the videos for testing.
# NOTE(review): no random_state is set, so the split differs on every run.
# NOTE(review): augmented variants of a recorded video can land in both train
# and test, which presumably inflates the test metrics - consider splitting
# by source video.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.05)

# 8. Build and Train LSTM Neural Network

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:
# Three stacked LSTM layers followed by a dense classifier head.
# Input: 30 frames x 126 keypoint values; output: one probability per action.
model = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(30,126)),
    LSTM(128, return_sequences=True, activation='relu'),
    # Last recurrent layer returns only its final output for the dense head
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(64, activation='relu'),
    Dense(32, activation='relu'),
    Dense(actions.shape[0], activation='softmax'),
])

In [None]:
# Multi-class setup: one-hot targets with categorical cross-entropy
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# Train for 150 epochs, holding out 10% of the training data for validation;
# `history` keeps the per-epoch metrics plotted below
history = model.fit(x_train, y_train, epochs=150, validation_split = 0.1)

In [None]:
# Print the layer-by-layer architecture
model.summary()

In [None]:
# Plot training & validation curves, one figure per metric
import matplotlib.pyplot as plt

# (training key, validation key, figure title, y-axis label)
curves = [
    ('loss', 'val_loss', 'Model loss', 'Loss'),
    ('categorical_accuracy', 'val_categorical_accuracy', 'Model accuracy', 'Accuracy'),
]
for train_key, val_key, title, y_label in curves:
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

# 9. Save Model and Reload Weights

In [None]:
# Write the trained model to action.h5
model.save('action.h5')

In [None]:
# Restore weights into an already-built `model` (run the model definition cell first)
model.load_weights('action.h5') # for reloading model

# 10. Evaluation using Confusion Matrix and Accuracy

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

In [None]:
# Class probabilities for every test video
yhat = model.predict(x_test)

In [None]:
# Collapse one-hot targets and predicted probabilities to class ids.
# NOTE: `yhat` is overwritten with a flat list here, so re-run the prediction
# cell before re-running this one.
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
# Display the true class ids
ytrue

In [None]:
# Display the predicted class ids
yhat

In [None]:
# Class names in class-id order, taken from `actions` so they cannot drift
# from the labels the model was trained on
class_labels = [str(name) for name in actions]
label_ids = list(range(len(class_labels)))

# ytrue holds the actual class ids and yhat the predicted ones.
# Passing `labels` keeps the matrix at len(class_labels) x len(class_labels)
# even when the small test split happens to miss a class.
conf_matrix = confusion_matrix(ytrue, yhat, labels=label_ids)

# zero_division=0 scores a class with no predictions/samples as 0 without warning
accuracy = accuracy_score(ytrue, yhat)
precision = precision_score(ytrue, yhat, average='weighted', zero_division=0)
recall = recall_score(ytrue, yhat, average='weighted', zero_division=0)
f1 = f1_score(ytrue, yhat, average='weighted', zero_division=0)

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.set(font_scale=1.2)
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", cbar=False,
            xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

print("\nAccuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1-Score:", f1)

# Calculate classification report; `labels` keeps it aligned with target_names
report = classification_report(ytrue, yhat, labels=label_ids,
                               target_names=class_labels, zero_division=0)
print("\nClassification Report:\n", report)

In [None]:
# Imported here because the metrics import cell does not include this name
from sklearn.metrics import multilabel_confusion_matrix

# One 2x2 confusion matrix per class
multilabel_confusion_matrix(ytrue, yhat)

# 11. Test in Real Time

In [None]:
from scipy import stats

In [None]:
# One bar colour per action, indexed by class id
colors = [(245,117,16), (117,245,16), (16,117,245), (0, 0, 192), (204, 51, 255), (77, 107, 165)]


def prob_viz(res, actions, input_frame, colors):
    """Draw one labelled probability bar per class on a copy of the frame.

    Args:
        res: Sequence of class probabilities; entry ``i`` belongs to
            ``actions[i]`` and is drawn with ``colors[i]``.
        actions: Class names, indexed by class id.
        input_frame: Frame to annotate; it is copied, not modified.
        colors: Colour tuples, indexed by class id.

    Returns:
        Annotated copy of ``input_frame``.
    """
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        # Bars are stacked 40 px apart, starting 60 px from the top
        bar_top = 60 + num * 40
        bar_right = int(prob * 100)  # probability 1.0 maps to 100 px
        cv2.rectangle(output_frame, (0, bar_top), (bar_right, bar_top + 30), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, bar_top + 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

    return output_frame

In [None]:
# 1. New detection variables
sequence = []      # keypoint vectors of the most recent frames
sentence = []      # recognised actions shown in the banner
predictions = []   # recent predicted class ids, used for smoothing
threshold = 0.9    # minimum probability before an action is shown

cap = cv2.VideoCapture(1)
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed
            ret, frame = cap.read()
            if not ret:
                # A failed read yields no frame; stop instead of passing None to OpenCV.
                print('Failed to read a frame from the camera; stopping.')
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)
            print(results)

            # Draw landmarks
            draw_styled_landmarks(image, results)

            # 2. Prediction logic: keep a sliding window of the latest frames
            keypoints = extract_keypoints(results)
            sequence.append(keypoints)
            sequence = sequence[-sequence_length:]

            if len(sequence) == sequence_length:
                res = model.predict(np.expand_dims(sequence, axis=0))[0]
                best = np.argmax(res)
                print(actions[best])

                # Only the last 10 predictions are ever inspected, so drop
                # older ones instead of letting the list grow without bound.
                predictions.append(best)
                predictions = predictions[-10:]

                # 3. Viz logic
                # Filter out the noise: accept a prediction only when it is the
                # most frequent recent class and is confident enough.
                is_stable = np.argmax(np.bincount(predictions)) == best
                if is_stable and res[best] > threshold:
                    # Avoid repeating the action that is already shown last
                    if not sentence or actions[best] != sentence[-1]:
                        sentence.append(actions[best])

                if len(sentence) > 4:
                    sentence = sentence[-4:]

                # Viz probabilities
                image = prob_viz(res, actions, image, colors)

            cv2.rectangle(image, (0,0), (640, 40), (10, 10, 10), -1)
            cv2.putText(image, ' '.join(sentence), (3,30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even on error.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Show the last frame from the real-time cell with its probability bars;
# converted from BGR to RGB for matplotlib
plt.figure(figsize=(18,18))
plt.imshow(cv2.cvtColor(prob_viz(res, actions, image, colors), cv2.COLOR_BGR2RGB))

# 12. Manual Calculation