In [3]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from sklearn.metrics import confusion_matrix, accuracy_score


In [4]:
actions = ['chop', 'left', 'pinch' ]
DATA_PATH = 'data_npy'
numberOfFrame = 30


for action in actions: 
    for sequence in range(30):
        try: 
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
        except:
            pass

In [5]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [6]:
def mediapipe_detection(image, holistic_model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = holistic_model.process(image)        # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [7]:
def draw_styled_landmarks(image, results):
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

In [8]:
def extract_keypoints(results):
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([lh, rh])

In [9]:
def process_npy():    
    # Set mediapipe model 
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        
        for action in actions:
            
            # Lấy danh sách tất cả video trong folder của action cụ thể
            video_folder = f'data_backup/{action}'
            videos = [f for f in os.listdir(video_folder) if f.endswith('.avi')]
            
            for videoCounter, videoFile in enumerate(videos):
                
                cap = cv2.VideoCapture(os.path.join(video_folder, videoFile))
                framesCount = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                skipFrame = max(int(framesCount/30), 1)
                
                for frameCounter in range(numberOfFrame):
                    
                    cap.set(cv2.CAP_PROP_POS_FRAMES, frameCounter * skipFrame)
                    ret, frame = cap.read()
                    
                    if not ret:
                        break
                    
                    image, results = mediapipe_detection(frame, holistic)
                    draw_styled_landmarks(image, results)
                    cv2.imshow('OpenCV Feed', image)
                    
                    keypoints = extract_keypoints(results)
                    
                    # Tạo thư mục để lưu các keypoints của video và frame hiện tại
                    npy_dir = os.path.join('data_npy', action, str(videoCounter))
                    os.makedirs(npy_dir, exist_ok=True)  # Tạo thư mục nếu chưa tồn tại
                    
                    npy_path = os.path.join('data_npy', action, str(videoCounter), str(frameCounter))
                    np.save(npy_path, keypoints)
                    
                    print(f'{action}: saving frame {frameCounter} of video {videoCounter}')
                    
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        break
                
                cap.release()
        cv2.destroyAllWindows()

In [24]:
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
#     # Lấy danh sách tất cả video trong folder của hành động cụ thể
#     video_folder = f'data/left'
#     videos = [f for f in os.listdir(video_folder) if f.endswith('.avi')][:100]  # Chỉ load số lượng video đã chọn
    
#     for videoCounter, videoFile in enumerate(videos):
        
#         cap = cv2.VideoCapture(os.path.join(video_folder, videoFile))
#         framesCount = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
#         skipFrame = max(int(framesCount/30), 1)  # Điều chỉnh số khung hình để lấy mẫu
        
#         for frameCounter in range(30):  # Lấy mẫu 30 khung hình từ mỗi video
            
#             cap.set(cv2.CAP_PROP_POS_FRAMES, frameCounter * skipFrame)
#             ret, frame = cap.read()
            
#             if not ret:
#                 break
            
#             # Phát hiện và vẽ landmarks
#             image, results = mediapipe_detection(frame, holistic)
#             draw_styled_landmarks(image, results)
#             cv2.imshow('OpenCV Feed', image)
            
#             # Trích xuất keypoints từ kết quả
#             keypoints = extract_keypoints(results)
            
#             # Tạo thư mục để lưu các keypoints của video và frame hiện tại
#             npy_dir = os.path.join('data_npy', 'left', str(videoCounter))
#             os.makedirs(npy_dir, exist_ok=True)  # Tạo thư mục nếu chưa tồn tại
            
#             npy_path = os.path.join('data_npy', 'left', str(videoCounter), str(frameCounter))
#             np.save(npy_path, keypoints)
            
#             print(f'left: saving frame {frameCounter} of video {videoCounter}')
            
#             if cv2.waitKey(10) & 0xFF == ord('q'):
#                 break
        
#         cap.release()
# cv2.destroyAllWindows()



left: saving frame 0 of video 0
left: saving frame 1 of video 0
left: saving frame 2 of video 0
left: saving frame 3 of video 0
left: saving frame 4 of video 0
left: saving frame 5 of video 0
left: saving frame 6 of video 0
left: saving frame 7 of video 0
left: saving frame 8 of video 0
left: saving frame 9 of video 0
left: saving frame 10 of video 0
left: saving frame 11 of video 0
left: saving frame 12 of video 0
left: saving frame 13 of video 0
left: saving frame 14 of video 0
left: saving frame 15 of video 0
left: saving frame 16 of video 0
left: saving frame 17 of video 0
left: saving frame 18 of video 0
left: saving frame 19 of video 0
left: saving frame 20 of video 0
left: saving frame 21 of video 0
left: saving frame 22 of video 0
left: saving frame 23 of video 0
left: saving frame 24 of video 0
left: saving frame 25 of video 0
left: saving frame 26 of video 0
left: saving frame 27 of video 0
left: saving frame 28 of video 0
left: saving frame 29 of video 0
left: saving frame 0

In [10]:
label_map = {label:num for num, label in enumerate(actions)}
label_map

{'chop': 0, 'left': 1, 'pinch': 2}

In [42]:
def create_dataset():   
    sequences, labels = [], []    
    for action in actions:
        
        # Lấy danh sách tất cả video trong folder của action cụ thể
        video_folder = f'data_npy/{action}'
        videos = sorted([f for f in os.listdir(video_folder) if os.path.isdir(os.path.join(video_folder, f))], key=lambda x: int(x))
        
        videos = videos[:200]
        
        # Lặp qua tất cả video trong folder
        for sequence in videos:
            print(sequence)
            window = []
            for frame_num in range(30):  # Giả định rằng mỗi video có 30 frame cần xử lý
                # Đọc file numpy của từng frame
                print(f'{action}: Loading frame {frame_num} of video {sequence}')
                res = np.load(os.path.join('data_npy', action, sequence, "{}.npy".format(frame_num)))
                window.append(res)
            sequences.append(window)
            labels.append(label_map[action])
    
    return sequences, labels

In [43]:
sequences, labels = create_dataset()

X = np.array(sequences)
y = np.array(labels)

print(X.shape)
print(y.shape)

y = to_categorical(labels).astype(int)
# print(y.shape)

# y = y.argmax(axis=1)

0
chop: Loading frame 0 of video 0
chop: Loading frame 1 of video 0
chop: Loading frame 2 of video 0
chop: Loading frame 3 of video 0
chop: Loading frame 4 of video 0
chop: Loading frame 5 of video 0
chop: Loading frame 6 of video 0
chop: Loading frame 7 of video 0
chop: Loading frame 8 of video 0
chop: Loading frame 9 of video 0
chop: Loading frame 10 of video 0
chop: Loading frame 11 of video 0
chop: Loading frame 12 of video 0
chop: Loading frame 13 of video 0
chop: Loading frame 14 of video 0
chop: Loading frame 15 of video 0
chop: Loading frame 16 of video 0
chop: Loading frame 17 of video 0
chop: Loading frame 18 of video 0
chop: Loading frame 19 of video 0
chop: Loading frame 20 of video 0
chop: Loading frame 21 of video 0
chop: Loading frame 22 of video 0
chop: Loading frame 23 of video 0
chop: Loading frame 24 of video 0
chop: Loading frame 25 of video 0
chop: Loading frame 26 of video 0
chop: Loading frame 27 of video 0
chop: Loading frame 28 of video 0
chop: Loading frame 29

In [44]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
y_test.shape

(100, 3)

In [25]:
# log_dir = os.path.join('Logs')
# tb_callback = TensorBoard(log_dir=log_dir)

In [45]:
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,126)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(3, activation='softmax'))

model.summary()

  super().__init__(**kwargs)


In [46]:
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [47]:
model.fit(X_train, y_train, epochs=200, shuffle=True, validation_split=0.2)

Epoch 1/200
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 84ms/step - accuracy: 0.3857 - loss: 1.0316 - val_accuracy: 0.5500 - val_loss: 0.7033
Epoch 2/200
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step - accuracy: 0.6281 - loss: 0.7982 - val_accuracy: 0.7000 - val_loss: 0.5918
Epoch 3/200
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - accuracy: 0.7064 - loss: 0.5972 - val_accuracy: 0.7125 - val_loss: 0.6125
Epoch 4/200
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step - accuracy: 0.7358 - loss: 0.6772 - val_accuracy: 0.7000 - val_loss: 0.5830
Epoch 5/200
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step - accuracy: 0.7400 - loss: 0.5624 - val_accuracy: 0.7750 - val_loss: 1.3563
Epoch 6/200
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step - accuracy: 0.8606 - loss: 0.3661 - val_accuracy: 0.8625 - val_loss: 0.5123
Epoch 7/200
[1m10/10[0m [

<keras.src.callbacks.history.History at 0x29f8501f140>

In [48]:
res = model.predict(X_test)
actions[np.argmax(res[4])]

[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 111ms/step


'left'

In [49]:
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()


[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step


In [50]:
confusion_matrix(ytrue, yhat)
# accuracy_score(ytrue, yhat)

array([[41,  0,  0],
       [ 0, 20,  0],
       [ 1,  0, 38]], dtype=int64)

In [38]:
colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

In [56]:
# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5
frame_counter = 0  # Khởi tạo biến đếm

cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()
        
        frame_counter += 1  # Tăng biến đếm lên mỗi khi đọc khung hình
        if frame_counter % 3 != 0:  # Chỉ xử lý mỗi khung hình thứ 3 (tùy chỉnh)
            continue

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-30:]
        
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(actions[np.argmax(res)])
            predictions.append(np.argmax(res))
            
            
        #3. Viz logic
            if np.unique(predictions[-10:])[0]==np.argmax(res): 
                if res[np.argmax(res)] > threshold: 
                    
                    if len(sentence) > 0: 
                        if actions[np.argmax(res)] != sentence[-1]:
                            sentence.append(actions[np.argmax(res)])
                    else:
                        sentence.append(actions[np.argmax(res)])

            if len(sentence) > 5: 
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res, actions, image, colors)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        
        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti