In [2]:
import socket
import threading
import cv2
import queue
import numpy as np
import tensorflow as tf
import mediapipe as mp

In [3]:
# Short aliases for the MediaPipe sub-modules used throughout this notebook.
mp_holistic = mp.solutions.holistic # Holistic solution: provides Holistic() and HAND_CONNECTIONS
mp_drawing = mp.solutions.drawing_utils # Drawing helpers: draw_landmarks() and DrawingSpec

def mediapipe_detection(image, holistic_model):
    """Run a MediaPipe model on one BGR frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        holistic_model: Object exposing ``process(rgb_image)``; in this
            notebook it is an ``mp_holistic.Holistic`` instance.

    Returns:
        ``(bgr_image, results)`` where ``bgr_image`` is a BGR copy of the frame
        that went through the round-trip colour conversion and ``results`` is
        whatever ``holistic_model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Mark the buffer read-only only for the duration of the model call.
    rgb_frame.flags.writeable = False
    results = holistic_model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Convert back so the caller can draw on / display the frame with OpenCV.
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

def extract_hand_keypoints(results):
    """Return the (x, y) landmarks of the left and right hand.

    Args:
        results: Object with ``left_hand_landmarks`` and
            ``right_hand_landmarks`` attributes. Each is either falsy (hand not
            detected) or has a ``landmark`` iterable of points with ``x``/``y``.

    Returns:
        ``(lh, rh)``: two arrays with one ``[x, y]`` row per landmark. A hand
        that was not detected is represented by a ``(21, 2)`` array of zeros.
    """
    def _xy_or_zeros(hand):
        # Missing hand -> fixed-size zero placeholder so callers always get an array.
        if not hand:
            return np.zeros((21, 2))
        return np.array([[point.x, point.y] for point in hand.landmark])

    return (
        _xy_or_zeros(results.left_hand_landmarks),
        _xy_or_zeros(results.right_hand_landmarks),
    )

def extract_index_keypoints(results):
    """Return the (x, y) position of landmark 8 of the left hand.

    Landmark 8 is the index fingertip in MediaPipe's hand landmark numbering.

    Args:
        results: Object with a ``left_hand_landmarks`` attribute that is either
            falsy (hand not detected) or has an indexable ``landmark`` sequence.

    Returns:
        Array ``[x, y]`` for landmark 8, or ``[0, 0]`` when the left hand is
        not detected.
    """
    left_hand = results.left_hand_landmarks
    if not left_hand:
        return np.zeros(2)

    fingertip = left_hand.landmark[8]
    return np.array([fingertip.x, fingertip.y])


def draw_styled_landmarks(image, results):
    """Draw both hands' landmarks and connections onto ``image`` in place.

    The left hand is drawn first, then the right hand, each with its own
    colour pair.

    Args:
        image: Frame to draw on; passed straight to ``mp_drawing.draw_landmarks``.
        results: Object with ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # (landmark list, landmark colour, connection colour) per hand, in draw order.
    hand_styles = (
        (results.left_hand_landmarks, (121, 22, 76), (121, 44, 250)),
        (results.right_hand_landmarks, (245, 117, 66), (245, 66, 230)),
    )

    for hand_landmarks, landmark_color, connection_color in hand_styles:
        mp_drawing.draw_landmarks(
            image,
            hand_landmarks,
            mp_holistic.HAND_CONNECTIONS,
            mp_drawing.DrawingSpec(color=landmark_color, thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=connection_color, thickness=2, circle_radius=2),
        )

In [4]:
# Direction labels; each label is mapped to its position in this list.
# NOTE(review): presumably this is the class order model_direction was trained with — confirm.
classes_direction = ['idle', 'up', 'down', 'left', 'right']

label_map_dir = dict(zip(classes_direction, range(len(classes_direction))))
label_map_dir

{'idle': 0, 'up': 1, 'down': 2, 'left': 3, 'right': 4}

In [5]:
# Movement-check labels; each label is mapped to its position in this list.
# NOTE(review): presumably this is the class order model_check was trained with — confirm.
classes_movement = ['movement', 'other']

label_map_check = dict(zip(classes_movement, range(len(classes_movement))))
label_map_check

{'movement': 0, 'other': 1}

In [6]:
# Interaction labels; each label is mapped to its position in this list.
# NOTE(review): presumably this is the class order model_interact was trained with — confirm.
classes_interaction = ['idle', 'chop', 'pinch']

label_map_interact = dict(zip(classes_interaction, range(len(classes_interaction))))
label_map_interact

{'idle': 0, 'chop': 1, 'pinch': 2}

In [7]:
def normalize_points(points):
    """Express every point relative to the first one.

    Args:
        points: Array whose first element (``points[0]``) is the reference;
            in this notebook each row is the position for one frame.

    Returns:
        A new array equal to ``points - points[0]``, so the first entry is all
        zeros. The input is not modified.
    """
    return points - points[0]

In [8]:
def normalize_keypoints(keypoints):
    """Shift hand keypoints to the wrist and min-max scale x and y.

    Args:
        keypoints: Array with 21 rows (one per hand landmark); columns 0 and 1
            are read as x and y. Row 0 is treated as the wrist.

    Returns:
        ``(21, 2)`` array. On each axis with non-zero extent the values are
        scaled to span ``[0, 1]``; an axis with zero extent is left as the
        wrist-relative values (all zeros).

    Raises:
        ValueError: If ``keypoints`` does not have exactly 21 rows.
    """
    if keypoints.shape[0] != 21:
        raise ValueError(f"Số lượng điểm keypoints không hợp lệ: {keypoints.shape[0]}")

    # Translate so the wrist (row 0) sits at the origin; only x and y are kept.
    centered = keypoints[:, :2] - keypoints[0, :2]

    x_lo, y_lo = centered.min(axis=0)
    x_hi, y_hi = centered.max(axis=0)

    # A zero-extent axis would divide by zero below; use the range [0, 1]
    # instead so that axis passes through unchanged.
    if (x_hi - x_lo) == 0:
        x_lo, x_hi = 0, 1
    if (y_hi - y_lo) == 0:
        y_lo, y_hi = 0, 1

    offset = np.array([x_lo, y_lo])
    extent = np.array([x_hi, y_hi]) - offset
    return (centered - offset) / extent

In [9]:
# Load the model used by the capture loop's movement check and print its layer summary.
# The path is relative, so the .keras file must be in the kernel's working directory.
model_check = tf.keras.models.load_model('movement_check.keras')
model_check.summary()

In [10]:
# Load the model used by the capture loop to predict direction and print its layer summary.
# The path is relative, so the .keras file must be in the kernel's working directory.
model_direction = tf.keras.models.load_model('movement_direction.keras')
model_direction.summary()

In [11]:
# Load the model used by the capture loop to predict interaction and print its layer summary.
# The path is relative, so the .keras file must be in the kernel's working directory.
model_interact = tf.keras.models.load_model('interaction.keras')
model_interact.summary()

In [12]:
# Attach an optimizer, loss and accuracy metric to each loaded model.
# NOTE(review): the rest of this notebook only calls predict() on these models, and
# Keras documents compile() as configuring a model for training, so this cell may be
# unnecessary for inference — confirm before removing.
model_check.compile(optimizer='Adam', loss='binary_crossentropy', metrics=['accuracy'])
model_direction.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])
model_interact.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [13]:
# Destination of the UDP messages sent by SendData: a listener on this machine.
IP = '127.0.0.1'
PORT = 25001

# IPv4 datagram (UDP) socket, created once and reused for every send.
s = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

def SendData(message):
    """Send ``message`` as one UDP datagram to ``(IP, PORT)`` and log it.

    Args:
        message: Text to send; it is encoded with ``str.encode()`` defaults.
    """
    payload = message.encode()
    destination = (IP, PORT)
    s.sendto(payload, destination)
    print(f"{message} sent")

In [14]:
# # 1. New detection variables
# movement_seq = []
# interaction_seq = []

# threshold = 0.7

# frame_counter = 0  # Khởi tạo biến đếm

# isMovement = 0
# predictions_direction = 0
# predictions_interaction = 0

# cap = cv2.VideoCapture(0)
# # Set mediapipe model 
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#     while cap.isOpened():
        
#         # Read feed
#         ret, frame = cap.read()
        
#         frame_counter += 1 

#         # Make detections
#         image, results = mediapipe_detection(frame, holistic)
        
#         # Draw landmarks
#         draw_styled_landmarks(image, results)
        
#         lh_keypoints, rh_keypoints = extract_hand_keypoints(results)
#         index_keypoints = extract_index_keypoints(results)

#         movement_seq.append(index_keypoints)
        
#         # Interaction Predict
#         rh_keypoints = normalize_keypoints(rh_keypoints)
#         rh_keypoints = rh_keypoints.flatten()
#         interaction_seq.append(rh_keypoints)
#         interaction_seq = interaction_seq[-10:]
#         if len(interaction_seq) == 10:
#             res_interaction = model_interact.predict(np.expand_dims(interaction_seq, axis=0), verbose=0)[0]
#             predictions_interaction = np.argmax(res_interaction)
        
#         # Movement check
#         if frame_counter % 30 == 0:
#             lh_keypoints = normalize_keypoints(lh_keypoints)
#             lh_keypoints = np.expand_dims(lh_keypoints, axis=0)
#             res_check = model_check.predict(lh_keypoints, verbose=0)
#             isMovement = 0 if res_check[0] > 0.5 else 1
        
#         # if isMovement == 1:
#         seq = np.array(movement_seq[-10:])
#         seq = normalize_points(seq)
        
#         if len(seq) == 10:
#             res_direction = model_direction.predict(np.expand_dims(seq, axis=0), verbose=0)[0]
#             predictions_direction = np.argmax(res_direction)
        
        
#         # Send data through UDP
#         send_result = f"{isMovement}, {predictions_direction}, {predictions_interaction}"
#         SendData(send_result)
        
#         # Show to screen
#         cv2.imshow('OpenCV Feed', image)

#         if frame_counter % 100 == 0:
#             frame_counter = 0

#         # Break gracefully
#         if cv2.waitKey(10) & 0xFF == ord('q'):
#             break
#     cap.release()
#     cv2.destroyAllWindows()

In [15]:
# Real-time loop: webcam frame -> MediaPipe landmarks -> three model predictions ->
# UDP message "<isMovement>, <direction>, <interaction>" whenever any value changes.
# Press 'q' in the preview window to stop.

# Last state that was sent; None guarantees the first computed state is sent.
prev_isMovement = None
prev_predictions_direction = None
prev_predictions_interaction = None

# Loop configuration.
SEQUENCE_LENGTH = 10          # consecutive frames fed to the two sequence models
MOVEMENT_CHECK_INTERVAL = 30  # run the movement check once every N frames
# Movement-check score above this yields isMovement = 0.
# NOTE(review): presumably the score is the probability of class 1 ('other' in
# label_map_check) — confirm against how model_check was trained.
threshold = 0.7

# Rolling buffers and the most recent predictions.
movement_seq = []
interaction_seq = []
frame_counter = 0
isMovement = 0
predictions_direction = 0
predictions_interaction = 0

cap = cv2.VideoCapture(0)
try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # No frame was grabbed (camera unavailable or stream ended); the
                # frame is unusable, so stop instead of crashing in cvtColor.
                break

            # Plain Python int: it cannot overflow, so it is never reset. This keeps
            # the movement check on a steady every-N-frames cadence.
            frame_counter += 1

            # Detect landmarks and draw them on the preview image.
            image, results = mediapipe_detection(frame, holistic)
            draw_styled_landmarks(image, results)

            lh_keypoints, rh_keypoints = extract_hand_keypoints(results)
            index_keypoints = extract_index_keypoints(results)

            # Interaction: window of flattened, normalized right-hand keypoints.
            interaction_seq.append(normalize_keypoints(rh_keypoints).flatten())
            interaction_seq = interaction_seq[-SEQUENCE_LENGTH:]
            if len(interaction_seq) == SEQUENCE_LENGTH:
                res_interaction = model_interact.predict(np.expand_dims(interaction_seq, axis=0), verbose=0)[0]
                predictions_interaction = np.argmax(res_interaction)

            # Movement check: single-frame prediction on the normalized left hand.
            if frame_counter % MOVEMENT_CHECK_INTERVAL == 0:
                check_input = np.expand_dims(normalize_keypoints(lh_keypoints), axis=0)
                res_check = model_check.predict(check_input, verbose=0)
                # Take the first output value as an explicit scalar rather than
                # relying on the truthiness of a one-element array.
                isMovement = 0 if float(np.ravel(res_check)[0]) > threshold else 1

            # Direction: window of left index-finger positions relative to the
            # window's first frame. The buffer is trimmed so it cannot grow unbounded.
            movement_seq.append(index_keypoints)
            movement_seq = movement_seq[-SEQUENCE_LENGTH:]
            if len(movement_seq) == SEQUENCE_LENGTH:
                seq = normalize_points(np.array(movement_seq))
                res_direction = model_direction.predict(np.expand_dims(seq, axis=0), verbose=0)[0]
                predictions_direction = np.argmax(res_direction)

            # Only send when at least one value differs from the last sent state.
            if (isMovement != prev_isMovement or
                predictions_direction != prev_predictions_direction or
                predictions_interaction != prev_predictions_interaction):

                send_result = f"{isMovement}, {predictions_direction}, {predictions_interaction}"
                SendData(send_result)

                # Remember what was sent for the next comparison.
                prev_isMovement = isMovement
                prev_predictions_direction = predictions_direction
                prev_predictions_interaction = predictions_interaction

            # Show the annotated frame.
            cv2.imshow('OpenCV Feed', image)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the preview window, even after an error.
    cap.release()
    cv2.destroyAllWindows()

0, 0, 0 sent
0, 0, 2 sent
0, 0, 1 sent
0, 0, 2 sent
0, 0, 0 sent
0, 3, 0 sent
0, 2, 0 sent
0, 4, 0 sent
0, 0, 0 sent
1, 0, 0 sent
1, 4, 0 sent
1, 0, 0 sent
1, 3, 0 sent
1, 0, 0 sent
1, 1, 0 sent
1, 0, 0 sent
1, 2, 0 sent
1, 0, 0 sent
1, 2, 0 sent


In [16]:
# # Thiết lập mô hình cho từng tác vụ
# def predict_interaction(interaction_seq):
#     # Interaction Predict
#     rh_keypoints = normalize_keypoints(interaction_seq)
#     rh_keypoints = rh_keypoints.flatten()
#     res_interaction = model_interact.predict(np.expand_dims(interaction_seq, axis=0), verbose=0)[0]
#     predictions_interaction = np.argmax(res_interaction)
#     return predictions_interaction

# def predict_movement_check(lh_keypoints):
#     # Movement check
#     lh_keypoints = normalize_keypoints(lh_keypoints)
#     lh_keypoints = np.expand_dims(lh_keypoints, axis=0)
#     res_check = model_check.predict(lh_keypoints, verbose=0)
#     isMovement = 0 if res_check[0] > 0.5 else 1
#     return isMovement

# def predict_direction(movement_seq):
#     # Direction Predict
#     seq = np.array(movement_seq[-10:])
#     seq = normalize_points(seq)
#     res_direction = model_direction.predict(np.expand_dims(seq, axis=0), verbose=0)[0]
#     predictions_direction = np.argmax(res_direction)
#     return predictions_direction

# # Thiết lập một hàm chung để chạy ba mô hình trên các luồng
# def run_model_in_threads():
#     interaction_seq = []
#     movement_seq = []
#     threshold = 0.7
#     frame_counter = 0  # Khởi tạo biến đếm

#     isMovement = 0
#     predictions_direction = 0
#     predictions_interaction = 0

#     cap = cv2.VideoCapture(0)
#     with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#         while cap.isOpened():
#             ret, frame = cap.read()
#             frame_counter += 1 

#             # Make detections
#             image, results = mediapipe_detection(frame, holistic)
#             draw_styled_landmarks(image, results)

#             lh_keypoints, rh_keypoints = extract_hand_keypoints(results)
#             index_keypoints = extract_index_keypoints(results)

#             movement_seq.append(index_keypoints)
#             interaction_seq.append(rh_keypoints)

#             # Cắt ngắn dữ liệu nếu quá dài
#             interaction_seq = interaction_seq[-10:]
#             if len(interaction_seq) == 10:
#                 # Khởi tạo 3 luồng tương ứng với 3 mô hình
#                 thread_interaction = threading.Thread(target=predict_interaction, args=(interaction_seq,))
#                 thread_movement = threading.Thread(target=predict_movement_check, args=(lh_keypoints,))
#                 thread_direction = threading.Thread(target=predict_direction, args=(movement_seq,))

#                 # Bắt đầu các luồng
#                 thread_interaction.start()
#                 thread_movement.start()
#                 thread_direction.start()

#                 # Đợi tất cả các luồng hoàn thành
#                 thread_interaction.join()
#                 thread_movement.join()
#                 thread_direction.join()

#                 # Sau khi các luồng hoàn thành, nhận kết quả từ các mô hình
#                 predictions_interaction = thread_interaction.result
#                 isMovement = thread_movement.result
#                 predictions_direction = thread_direction.result

#                 # Gửi dữ liệu qua UDP nếu có sự thay đổi
#                 send_result = f"{isMovement}, {predictions_direction}, {predictions_interaction}"
#                 SendData(send_result)

#             # Hiển thị kết quả
#             cv2.imshow('OpenCV Feed', image)

#             if frame_counter % 100 == 0:
#                 frame_counter = 0

#             if cv2.waitKey(10) & 0xFF == ord('q'):
#                 break
#         cap.release()
#         cv2.destroyAllWindows()


# run_model_in_threads()

In [17]:
# # Thiết lập mô hình cho từng tác vụ
# # def predict_interaction(interaction_seq, result_queue):
# #     rh_keypoints = normalize_keypoints(interaction_seq)
# #     rh_keypoints = rh_keypoints.flatten()
# #     res_interaction = model_interact.predict(np.expand_dims(interaction_seq, axis=0), verbose=0)[0]
# #     predictions_interaction = np.argmax(res_interaction)
# #     result_queue.put(('interaction', predictions_interaction))
    
# def predict_interaction(interaction_seq, result_queue):
#     # Chuyển đổi interaction_seq thành NumPy array trước khi sử dụng
#     interaction_seq = np.array(interaction_seq)
#     rh_keypoints = normalize_keypoints(interaction_seq)
#     rh_keypoints = rh_keypoints.flatten()  # Làm phẳng để đưa vào mô hình
#     res_interaction = model_interact.predict(np.expand_dims(rh_keypoints, axis=0), verbose=0)[0]
#     predictions_interaction = np.argmax(res_interaction)
#     result_queue.put(('interaction', predictions_interaction))

# def predict_movement_check(lh_keypoints, result_queue):
#     lh_keypoints = normalize_keypoints(lh_keypoints)
#     lh_keypoints = np.expand_dims(lh_keypoints, axis=0)
#     res_check = model_check.predict(lh_keypoints, verbose=0)
#     isMovement = 0 if res_check[0] > 0.5 else 1
#     result_queue.put(('movement', isMovement))

# def predict_direction(movement_seq, result_queue):
#     seq = np.array(movement_seq[-10:])
#     seq = normalize_points(seq)
#     res_direction = model_direction.predict(np.expand_dims(seq, axis=0), verbose=0)[0]
#     predictions_direction = np.argmax(res_direction)
#     result_queue.put(('direction', predictions_direction))

# # Thiết lập một hàm chung để chạy ba mô hình trên các luồng
# def run_model_in_threads():
#     interaction_seq = []
#     movement_seq = []
#     threshold = 0.7
#     frame_counter = 0  # Khởi tạo biến đếm

#     isMovement = 0
#     predictions_direction = 0
#     predictions_interaction = 0

#     cap = cv2.VideoCapture(0)
#     with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#         while cap.isOpened():
#             ret, frame = cap.read()
#             frame_counter += 1 

#             # Make detections
#             image, results = mediapipe_detection(frame, holistic)
#             draw_styled_landmarks(image, results)

#             lh_keypoints, rh_keypoints = extract_hand_keypoints(results)
#             index_keypoints = extract_index_keypoints(results)

#             movement_seq.append(index_keypoints)
#             interaction_seq.append(rh_keypoints)

#             # Cắt ngắn dữ liệu nếu quá dài
#             interaction_seq = interaction_seq[-10:]
#             if len(interaction_seq) == 10:
#                 # Khởi tạo queue để nhận kết quả từ các luồng
#                 result_queue = queue.Queue()

#                 # Khởi tạo 3 luồng tương ứng với 3 mô hình
#                 thread_interaction = threading.Thread(target=predict_interaction, args=(interaction_seq, result_queue))
#                 thread_movement = threading.Thread(target=predict_movement_check, args=(lh_keypoints, result_queue))
#                 thread_direction = threading.Thread(target=predict_direction, args=(movement_seq, result_queue))

#                 # Bắt đầu các luồng
#                 thread_interaction.start()
#                 thread_movement.start()
#                 thread_direction.start()

#                 # Đợi tất cả các luồng hoàn thành
#                 thread_interaction.join()
#                 thread_movement.join()
#                 thread_direction.join()

#                 # Nhận kết quả từ các luồng
#                 while not result_queue.empty():
#                     task, result = result_queue.get()
#                     if task == 'interaction':
#                         predictions_interaction = result
#                     elif task == 'movement':
#                         isMovement = result
#                     elif task == 'direction':
#                         predictions_direction = result

#                 # Gửi dữ liệu qua UDP nếu có sự thay đổi
#                 send_result = f"{isMovement}, {predictions_direction}, {predictions_interaction}"
#                 SendData(send_result)

#             # Hiển thị kết quả
#             cv2.imshow('OpenCV Feed', image)

#             if frame_counter % 100 == 0:
#                 frame_counter = 0

#             if cv2.waitKey(10) & 0xFF == ord('q'):
#                 break
#         cap.release()
#         cv2.destroyAllWindows()


# run_model_in_threads()
