In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [9]:
import cv2
import mediapipe as mp
import os
import numpy as np
import pandas as pd
import json

# Mediapipe model and utilities
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Hàm phát hiện và vẽ các landmarks từ video
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Chuyển đổi từ BGR sang RGB
    image.flags.writeable = False  # Không thể chỉnh sửa hình ảnh trong quá trình xử lý
    results = model.process(image)  # Phân tích hình ảnh
    image.flags.writeable = True  # Bật lại khả năng chỉnh sửa hình ảnh
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # Chuyển lại RGB sang BGR
    return image, results

def extract_keypoints(results):
    # Trích xuất các điểm đặc trưng (landmarks) từ các phần khác nhau của cơ thể
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])  # Kết hợp tất cả các điểm vào một mảng

# Đọc dữ liệu metadata từ file JSON
metadata = {}
with open('data/WLASL_v0.3.json', 'r') as file:
    metadata = json.load(file)

# Tạo label map từ metadata
labelMap = {}
for i in metadata:
    label = i['gloss']
    for instance in i['instances']:
        video_id = int(instance['video_id'])
        frame_start = instance['frame_start']
        frame_end = instance['frame_end']
        fps = instance['fps']
        labelMap[video_id] = [label, frame_start, frame_end, fps]

# Tạo thư mục lưu dữ liệu
DATA_PATH = 'MP_Data'
if not os.path.exists(DATA_PATH):
    os.makedirs(DATA_PATH)

# Lặp qua từng video trong thư mục dữ liệu
video_path = 'data/videos'
for video in os.listdir(video_path):
    if video.endswith('.mp4'):
        video_filename = os.path.basename(video)
        video_id = int(os.path.splitext(video_filename)[0])

        # Lấy thông tin video từ labelMap
        label, start_frame, end_frame, fps = labelMap[video_id]
        
        # Mở video
        cap = cv2.VideoCapture(os.path.join(video_path, video))
        cap.set(cv2.CAP_PROP_FPS, fps)

        # Khởi tạo mô hình Mediapipe Holistic
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            # Tạo thư mục cho hành động
            action_path = os.path.join(DATA_PATH, label)
            if not os.path.exists(action_path):
                os.makedirs(action_path)

            # Tạo thư mục cho video
            video_dir = os.path.join(action_path, str(video_id))
            if not os.path.exists(video_dir):
                os.makedirs(video_dir)

            frame_count = 0
            keypoints_data = []

            # Đọc và xử lý từng frame
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    break
                frame_count += 1

                # Nếu frame nằm ngoài khoảng (start_frame, end_frame), bỏ qua
                if frame_count < start_frame or (end_frame != -1 and frame_count > end_frame):
                    continue

                # Phát hiện các landmarks
                image, results = mediapipe_detection(image, holistic)
                keypoints = extract_keypoints(results)
                keypoints_data.append(keypoints)

            # Lưu dữ liệu keypoints dưới dạng numpy array
            np.save(os.path.join(video_dir, f'{video_id}_keypoints.npy'), np.array(keypoints_data))

        # Đóng video
        cap.release()

print("Xử lý xong các video trong bộ dữ liệu!")

KeyboardInterrupt: 

In [None]:
!pip install mediapipe

# Extract keypoints

In [None]:
import cv2
import mediapipe as mp
import os
import numpy as np
import json

# Mediapipe model and utilities
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Hàm phát hiện và vẽ các landmarks từ video
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

# Hàm trích xuất keypoints từ kết quả Mediapipe
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

# Đọc metadata từ JSON
metadata = {}
with open('/kaggle/input/wlasl-processed/WLASL_v0.3.json', 'r') as file:
    metadata = json.load(file)

# Tạo label map (chỉ lấy 100 từ đầu tiên, mỗi từ lấy 5 video)
labelMap = {}
for i in metadata:
    label = i['gloss']
    for instance in i['instances']:
        video_id = int(instance['video_id'])
        frame_start = instance['frame_start']
        frame_end = instance['frame_end']
        fps = instance['fps']
        labelMap[video_id] = [label, frame_start, frame_end, fps]

# Tạo thư mục lưu dữ liệu
DATA_PATH = '/kaggle/working/MP_Data_Frames'
if not os.path.exists(DATA_PATH):
    os.makedirs(DATA_PATH)

# Lặp qua từng video trong thư mục dữ liệu
video_path = '/kaggle/input/wlasl-processed/videos'
videos_processed = {}

for video in os.listdir(video_path):
    if video.endswith('.mp4'):
        video_id = int(os.path.splitext(video)[0])

        if video_id in labelMap:
            label, start_frame, end_frame, fps = labelMap[video_id]

            if label not in videos_processed:
                videos_processed[label] = 0

            if videos_processed[label] < 5:
                cap = cv2.VideoCapture(os.path.join(video_path, video))
                cap.set(cv2.CAP_PROP_FPS, fps)

                with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
                    # Tạo thư mục cho từ
                    action_path = os.path.join(DATA_PATH, label)
                    if not os.path.exists(action_path):
                        os.makedirs(action_path)

                    # Tạo thư mục cho video
                    video_dir = os.path.join(action_path, str(video_id))
                    if not os.path.exists(video_dir):
                        os.makedirs(video_dir)

                    frame_count = 0
                    while cap.isOpened():
                        success, image = cap.read()
                        if not success:
                            break
                        frame_count += 1

                        if frame_count < start_frame or (end_frame != -1 and frame_count > end_frame):
                            continue

                        image, results = mediapipe_detection(image, holistic)
                        keypoints = extract_keypoints(results)

                        # Lưu mỗi frame vào file riêng biệt
                        np.save(os.path.join(video_dir, f'{frame_count}.npy'), keypoints)

                    cap.release()
                videos_processed[label] += 1

print("Xử lý hoàn tất!")


In [None]:
!zip -r folder.zip /kaggle/working/MP_Data_2000

# Lấy danh sách hành động

In [None]:
import os

# Đường dẫn tới thư mục cần kiểm tra
root_folder = "/kaggle/working/MP_Data_2000"

# Lấy danh sách các thư mục con trong thư mục gốc
actions = [name for name in os.listdir(root_folder) if os.path.isdir(os.path.join(root_folder, name))]

# In danh sách các thư mục
print(actions)



In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

label_map = {label:num for num, label in enumerate(actions)}
label_map

sequences, labels = [], []

for action in actions:
    for sequence in np.array(os.listdir(os.path.join(DATA_PATH, action))).astype(int):
        window = []
        sequence_path = os.path.join(DATA_PATH, action, str(sequence))
        num_frames = len(os.listdir(sequence_path))
        
        for frame_num in range(num_frames):  # Lấy tất cả các frame có sẵn
            res = np.load(os.path.join(sequence_path, "{}.npy".format(frame_num)))
            window.append(res)
        
        sequences.append(window)
        labels.append(label_map[action])


np.array(sequences).shape
np.array(labels).shape
X = np.array(sequences)
X.shape
y = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)
y_test.shape

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,1662)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(actions.shape[0], activation='softmax'))

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.fit(X_train, y_train, epochs=200, callbacks=[tb_callback])
model.summary()

In [None]:
res = model.predict(X_test)
actions[np.argmax(res[2])]
actions[np.argmax(y_test[0])]

In [None]:
model.save('action.h5')
model.load_weights('action.h5')

In [None]:
model.load_weights('action.h5')
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()
multilabel_confusion_matrix(ytrue, yhat)
accuracy_score(ytrue, yhat)