In [8]:
import cv2
import numpy as np
import os
import mediapipe as mp

In [9]:
# 初始化 Mediapipe Pose 模型
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [10]:
def mediapipe_detection(image, model):
    """
    This function detects human pose estimation keypoints from webcam footage
    
    """
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [12]:
def draw_landmarks(image, results):
    """
    This function draws keypoints and landmarks detected by the human pose estimation model
    
    """
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2), 
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2) 
                                 )

In [13]:
def extract_keypoints(results):
    """
    Processes and organizes the keypoints detected from the pose estimation model 
    to be used as inputs for the exercise decoder models
    
    """
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    return pose

In [14]:
# 路径和参数设置（保持不变）
DATA_PATH = os.path.join(os.getcwd(),'data')
actions = np.array(['curl', 'squats', 'bridges'])
colors = [(245,117,16), (117,245,16), (16,117,245)]
no_sequences = 15
start_folder = 101

In [15]:
# 设置每个视频序列的帧数
sequence_length =30  # 假设每个视频序列包含 30 帧

In [16]:
# 创建数据文件夹
if not os.path.exists(DATA_PATH):
    os.makedirs(DATA_PATH)

for action in actions:
    dir_path = os.path.join(DATA_PATH, action)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    for sequence in range(no_sequences):
        sequence_path = os.path.join(dir_path, str(sequence))
        if not os.path.exists(sequence_path):
            os.makedirs(sequence_path)

In [17]:
video_files = {'curl': 'videos/curls.mp4', 'squats': 'videos/squats.mp4', 'bridges': 'videos/bridges.mp4'}  # 替换为实际视频路径


In [18]:
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    for action, video_file in video_files.items():
        cap = cv2.VideoCapture(video_file)
        frame_count = 0
        sequence = 0

        while cap.isOpened() and sequence < no_sequences:
            ret, frame = cap.read()
            if not ret:
                break
            
            image, results = mediapipe_detection(frame, pose)
            keypoints = extract_keypoints(results)
            npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_count))
            np.save(npy_path, keypoints)

            frame_count += 1
            if frame_count == sequence_length:
                sequence += 1
                frame_count = 0

        cap.release()

I0000 00:00:1710786634.719171       1 gl_context.cc:344] GL version: 2.1 (2.1 Metal - 76.3), renderer: Apple M1
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


processing data

In [19]:
label_map = {label:num for num, label in enumerate(actions)}

In [20]:
sequences, labels = [], []
for action in actions:
    # 获取动作文件夹的路径
    action_path = os.path.join(DATA_PATH, action)
    
    # 过滤掉非数字的文件夹名（排除如 .DS_Store 的文件）
    sequence_dirs = [dir_name for dir_name in os.listdir(action_path) if dir_name.isdigit()]

    # 现在可以安全地将目录名转换为整数
    for sequence in np.array(sequence_dirs).astype(int):
        window = []
        for frame_num in range(sequence_length):         
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)  
            
        sequences.append(window)
        labels.append(label_map[action])





In [21]:
# 检查 label_map 是否包含所有动作
print("Label Map:", label_map)

# 在加载数据后，检查 labels 列表
print("Labels Sample:", labels[:15])  # 打印前10个标签作为样本

# 检查 labels 列表的长度是否与数据点总数相匹配
print("Total number of labels:", len(labels))


Label Map: {'curl': 0, 'squats': 1, 'bridges': 2}
Labels Sample: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Total number of labels: 45


In [22]:
# 计算每个类别的样本数量
unique, counts = np.unique(labels, return_counts=True)
class_counts = dict(zip(unique, counts))

print("Class counts:", class_counts)


Class counts: {0: 15, 1: 15, 2: 15}


In [23]:
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

In [24]:

labels_array = np.array(labels).reshape(-1, 1)
encoder = OneHotEncoder()
y = encoder.fit_transform(labels_array).toarray()  # 使用 toarray() 转换为密集矩阵
X = np.array(sequences)
print("Shape of encoded labels:", y.shape)



Shape of encoded labels: (45, 3)


In [25]:
# 分割数据集为训练集、验证集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=1)
#X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=15/90, random_state=2)

# 输出数据集的形状以确认
print("Training set:", X_train.shape, y_train.shape)
#print("Validation set:", X_val.shape, y_val.shape)
print("Testing set:", X_test.shape, y_test.shape)

Training set: (31, 30, 132) (31, 3)
Testing set: (14, 30, 132) (14, 3)


In [26]:
unique_labels_train = np.unique(y_train)
unique_labels_test = np.unique(y_test)
print("Unique labels in training data:", unique_labels_train)
print("Unique labels in test data:", unique_labels_test)
y_train


Unique labels in training data: [0. 1.]
Unique labels in test data: [0. 1.]


array([[0., 0., 1.],
       [0., 1., 0.],
       [0., 0., 1.],
       [0., 0., 1.],
       [1., 0., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [0., 0., 1.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 0., 1.],
       [0., 1., 0.],
       [0., 1., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [0., 0., 1.],
       [1., 0., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [0., 1., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [1., 0., 0.],
       [0., 0., 1.],
       [0., 0., 1.]])

In [27]:
import torch
import torch.nn as nn
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [28]:
tensor_X_test = torch.from_numpy(X_test)
print('test_data_size:', tensor_X_test.size())
tensor_X_test = torch.from_numpy(X_test)
print('test_data_size:',tensor_X_test.size())
print(y_test)


tensor_y_test = torch.from_numpy(y_test)
print('test_label_size:',tensor_y_test.size())
n_data_size_test = tensor_X_test.size()[0]



print('n_data_size_test:',n_data_size_test)

tensor_X_train = torch.from_numpy(X_train)
print('train_data_size:',tensor_X_train.size())
tensor_y_train = torch.from_numpy(y_train)
print('train_label_size:',tensor_y_train.size())
n_data_size_train = tensor_X_train.size()[0]
print('n_data_size_train:',n_data_size_train)

test_data_size: torch.Size([14, 30, 132])
test_data_size: torch.Size([14, 30, 132])
[[1. 0. 0.]
 [1. 0. 0.]
 [0. 0. 1.]
 [0. 1. 0.]
 [0. 1. 0.]
 [0. 0. 1.]
 [0. 1. 0.]
 [0. 0. 1.]
 [0. 0. 1.]
 [0. 0. 1.]
 [0. 0. 1.]
 [0. 0. 1.]
 [0. 1. 0.]
 [0. 1. 0.]]
test_label_size: torch.Size([14, 3])
n_data_size_test: 14
train_data_size: torch.Size([31, 30, 132])
train_label_size: torch.Size([31, 3])
n_data_size_train: 31


In [29]:
tensor_y_train

tensor([[0., 0., 1.],
        [0., 1., 0.],
        [0., 0., 1.],
        [0., 0., 1.],
        [1., 0., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 0., 1.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 0., 1.],
        [0., 1., 0.],
        [0., 1., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 0., 1.],
        [1., 0., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [0., 1., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [1., 0., 0.],
        [0., 0., 1.],
        [0., 0., 1.]], dtype=torch.float64)

In [30]:
import numpy as np

# 假设您的标签数组是 tensor_y_train
unique_labels = np.unique(tensor_y_train.numpy())
print("Unique labels in the dataset:", unique_labels)


Unique labels in the dataset: [0. 1.]


In [31]:
class LSTM(nn.Module):
    
    def __init__(self,input_dim,hidden_dim,output_dim,layer_num):
        super(LSTM,self).__init__()
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.lstm = torch.nn.LSTM(input_dim,hidden_dim,layer_num,batch_first=True)
        self.fc = torch.nn.Linear(hidden_dim,output_dim)
        self.bn = nn.BatchNorm1d(30)
        
    def forward(self,inputs):
        x = self.bn(inputs)
        lstm_out,(hn,cn) = self.lstm(x)
        out = self.fc(lstm_out[:,-1,:])
        return out

In [32]:
n_hidden = 128  # 隐藏层维度
n_joints = 132  # 输入维度，每帧的特征数量
n_categories = 3  # 输出维度，类别数量
n_layer = 3  # LSTM 层数
rnn = LSTM(n_joints,n_hidden,n_categories,n_layer)
rnn.to(device)

LSTM(
  (lstm): LSTM(132, 128, num_layers=3, batch_first=True)
  (fc): Linear(in_features=128, out_features=3, bias=True)
  (bn): BatchNorm1d(30, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
)

In [33]:
LABELS = ['curls','squats','bridges']


In [34]:
def categoryFromOutput(output):
    top_n, top_i = output.topk(1)
    category_i = top_i[0].item()
    return LABELS[category_i], category_i

In [35]:
import random
def randomTrainingExampleBatch(batch_size, flag, num=-1):
    if flag == 'train':
        X = tensor_X_train
        y = tensor_y_train
        data_size = n_data_size_train
    elif flag == 'test':
        X = tensor_X_test
        y = tensor_y_test
        data_size = n_data_size_test
    
    # 确保不会尝试从小于批量大小的数据集中提取数据
    if data_size < batch_size:
        raise ValueError("Data size is smaller than batch size")
    
    if num == -1:
        ran_num = random.randint(0, data_size - batch_size)
    else:
        ran_num = num
    
    # print("ran_num_2: %d, %d, %d" % (ran_num, data_size - batch_size, data_size))
    pose_sequence_tensor = X[ran_num:(ran_num + batch_size)]
    category_tensor = y[ran_num:ran_num + batch_size, :]
    return category_tensor.long(), pose_sequence_tensor


In [36]:
import time
import math
import torch.optim as optim


In [37]:
# 训练循环
learning_rate = 0.0005
optimizer = optim.SGD(rnn.parameters(),lr=learning_rate,momentum=0.9)
criterion = nn.CrossEntropyLoss()

current_loss = 0
all_losses = []



def timeSince(since):
    now = time.time()
    s = now - since
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

start = time.time()

In [44]:
n_iters = 100000
print_every = 1000
plot_every = 1000
batch_size = 5

for iter in range(1, n_iters + 1):
    category_tensor, input_sequence = randomTrainingExampleBatch(batch_size, 'train')
    input_sequence = input_sequence.to(device)
    category_tensor = category_tensor.to(device)
  


    # 获取类别索引
    _, category_tensor = category_tensor.max(dim=1)

    optimizer.zero_grad()

    output = rnn(input_sequence.float())
    loss = criterion(output, category_tensor)
    loss.backward()
    optimizer.step()

    current_loss += loss.item()

    if iter % print_every == 0:
        guess, guess_i = categoryFromOutput(output)
        correct = '✓' if guess == LABELS[category_tensor[0]] else '✗ (%s)' % LABELS[category_tensor[0]]
        print('%d %d%% (%s) %.4f  / %s %s' % (iter, iter / n_iters * 100, timeSince(start), loss, guess, correct))

    if iter % plot_every == 0:
        all_losses.append(current_loss / plot_every)
        current_loss = 0

1000 1% (30m 33s) 0.0009  / bridges ✓
2000 2% (30m 50s) 0.0007  / curls ✓
3000 3% (31m 6s) 0.0006  / squats ✓
4000 4% (31m 24s) 0.0006  / squats ✓
5000 5% (31m 41s) 0.0005  / squats ✓
6000 6% (31m 58s) 0.0005  / squats ✓
7000 7% (32m 16s) 0.0005  / squats ✓
8000 8% (32m 32s) 0.0004  / squats ✓
9000 9% (32m 49s) 0.0005  / curls ✓
10000 10% (33m 7s) 0.0005  / curls ✓
11000 11% (33m 24s) 0.0004  / bridges ✓
12000 12% (33m 42s) 0.0005  / squats ✓
13000 13% (33m 58s) 0.0004  / curls ✓
14000 14% (34m 16s) 0.0005  / curls ✓
15000 15% (34m 33s) 0.0004  / curls ✓
16000 16% (34m 49s) 0.0004  / bridges ✓
17000 17% (35m 5s) 0.0004  / curls ✓
18000 18% (35m 22s) 0.0004  / squats ✓
19000 19% (35m 37s) 0.0004  / curls ✓
20000 20% (35m 53s) 0.0003  / squats ✓
21000 21% (36m 10s) 0.0003  / curls ✓
22000 22% (36m 26s) 0.0003  / curls ✓
23000 23% (36m 44s) 0.0003  / curls ✓
24000 24% (37m 0s) 0.0004  / bridges ✓
25000 25% (37m 17s) 0.0003  / squats ✓
26000 26% (37m 35s) 0.0004  / bridges ✓
27000 27% (37m

In [45]:
torch.save(rnn.state_dict(),'lstm_model.pkl')


In [46]:
def test(flag):
    if flag == 'train':
        n = n_data_size_train
    elif flag == 'test':
        n = n_data_size_test

    with torch.no_grad():
        right = 0
        start = time.time()
        for i in range(n):
            category_tensor, inputs = randomTrainingExampleBatch(1, flag, i)
            category_index = category_tensor[0].nonzero(as_tuple=True)[0].item()
            category = LABELS[category_index]

            inputs = inputs.to(device).float()
            output = rnn(inputs)
            
            guess, guess_i = categoryFromOutput(output)
            correct = '✓' if guess_i == category_index else '✗ (%s)' % category
            print('%d %d%% (%s) %.4f / %s %s' % (i + 1, (i + 1) / n * 100, timeSince(start), output[0][guess_i], guess, correct))
            
            if category_index == guess_i:
                right += 1

    print(flag, 'accuracy', right / n)


In [None]:
# def test(flag):
#     if flag == 'train':
#         n = n_data_size_train
#     elif flag == 'test':
#         n = n_data_size_test

#     with torch.no_grad():
#         right = 0
#         for i in range(n):
#             category_tensor, inputs = randomTrainingExampleBatch(1, flag, i)
#             category_index = category_tensor[0].nonzero(as_tuple=True)[0].item()
#             category = LABELS[category_index]

#             # 确保 inputs 在正确的设备上，并且是正确的数据类型
#             inputs = inputs.to(device).float()
#             output = rnn(inputs)
            
#             guess, guess_i = categoryFromOutput(output)
#             if category_index == guess_i:
#                 right += 1

#     print(flag, 'accuracy', right / n)


In [48]:
test(flag='test')


1 7% (0m 0s) 6.8801 / curls ✓
2 14% (0m 0s) 6.8849 / curls ✓
3 21% (0m 0s) 6.2443 / bridges ✓
4 28% (0m 0s) 6.4526 / squats ✓
5 35% (0m 0s) 6.4690 / squats ✓
6 42% (0m 0s) 6.2389 / bridges ✓
7 50% (0m 0s) 6.4604 / squats ✓
8 57% (0m 0s) 6.2370 / bridges ✓
9 64% (0m 0s) 6.2499 / bridges ✓
10 71% (0m 0s) 6.2500 / bridges ✓
11 78% (0m 0s) 6.2516 / bridges ✓
12 85% (0m 0s) 6.2468 / bridges ✓
13 92% (0m 0s) 6.4589 / squats ✓
14 100% (0m 0s) 6.4662 / squats ✓
test accuracy 1.0


In [None]:
test(flag='train')
