In [8]:
import torch
import warnings

warnings.filterwarnings('ignore')
torch.cuda.set_device(6)

In [9]:
import os
import json
import math
import numpy as np
# 以下的数字代表是这个点的x坐标序号，+1 为y坐标序号 
# 0-21   index = 2 * num + 1
WRIST = 1 # 腕部关键点  
THUMB_TIP = 9 #大拇指顶点
INDEX_FINGER_TIP = 17 #食指顶点
MIDDLE_FINGER_TIP = 25 #中指顶点
RING_FINGER_TIP = 33 #无名指顶点
PINKY_TIP = 41 #小指顶点

'''
WT  大拇指 ——> 腕部
WI  食指
WM  中指
WR  无名指
WP  小指
'''

# 计算最大值和最小值
def find_local_extremes(numbers):
    if len(numbers) < 3:
        return "输入的数字串过短，无法找到局部最大值和最小值", []

    extremes = []
    transitions = []  # 记录时间变化点

    for i in range(1, len(numbers) - 1):
        if numbers[i] >= numbers[i - 1] and numbers[i] > numbers[i + 1] :
            extremes.append((i, numbers[i], "最高点"))
        elif numbers[i] <= numbers[i - 1] and numbers[i] < numbers[i + 1]:
            extremes.append((i, numbers[i], "最低点"))
    
    if len(extremes) == 0:
        return "没有找到局部最大值和最小值", []

    # 记录时间变化点
    for i in range(1, len(extremes)):
        if extremes[i][2] == "最高点" and extremes[i-1][2] == "最低点":
            transitions.append((extremes[i-1][0], extremes[i][0], "升为最高点"))
        elif extremes[i][2] == "最低点" and extremes[i-1][2] == "最高点":
            transitions.append((extremes[i-1][0], extremes[i][0], "降为最低点"))

    return extremes, transitions

# 计算抓伸周期
def calculate_transition_times(transitions):
    high_to_low_times = []
    low_to_high_times = []

    for i in range(1, len(transitions)):
        if transitions[i][2] == "降为最低点":
            high_to_low_time = transitions[i][1] - transitions[i][0]
            high_to_low_times.append(high_to_low_time)
        elif transitions[i][2] == "升为最高点":
            low_to_high_time = transitions[i][1] - transitions[i][0]
            low_to_high_times.append(low_to_high_time)

    return high_to_low_times, low_to_high_times

# 计算2、3、4、5指相对于腕部关键点的距离
def get_distance(label):
    WT = []
    WI = []
    WM = []
    WR = []
    WP = []
    for line in label:
        x1, y1 = line[1], line[2]
        coordinates = [(line[i], line[i + 1]) for i in range(9, 42, 8)]

        distances = [math.sqrt((x - x1) ** 2 + (y - y1) ** 2) for x, y in coordinates]

        wt_distance, wi_distance, wm_distance, wr_distance, wp_distance = distances

        WT.append(wt_distance)
        WI.append(wi_distance)
        WM.append(wm_distance)
        WR.append(wr_distance)
        WP.append(wp_distance)

    return WT, WI, WM, WR, WP

#2指、5指根部关键点与手腕关键点所围成的面积
def calculate_area(label):
    area_list = []
    for line in label :
        x1, y1 = line[1], line[2]   #腕部关键点坐标
        x2, y2 = line[17], line[18]
        x3, y3 = line[41], line[42]
        area = 0.5 * abs(x1 * (y2 - y3) + x2 * (y3 - y1) + x3 * (y1 - y2))
        area_list.append(area)
    return area_list

def calculate_time_differences(curve1_transitions, curve2_transitions):
    # 初始化变量
    start_differences = []
    end_differences = []

    # 遍历每个周期
    for i in range(min(len(curve1_transitions), len(curve2_transitions))):
        # 找到两条曲线的第一个极值点的帧数
        start_frame_curve1 = curve1_transitions[i][0]
        start_frame_curve2 = curve2_transitions[i][0]

        # 找到两条曲线的最后一个极值点的帧数
        end_frame_curve1 = curve1_transitions[i][1]
        end_frame_curve2 = curve2_transitions[i][1]
        

        # 计算启动时间差和结束时间差，并添加到列表中
        if i % 2 == 0 :
            start_differences.append(start_frame_curve2 - start_frame_curve1)
        else:
            end_differences.append(end_frame_curve2 - end_frame_curve1)

    return start_differences, end_differences

def get_info(dist):
    extremes, transitions = find_local_extremes(dist)
    high_to_low_times, low_to_high_times = calculate_transition_times(transitions)
    return extremes, transitions, high_to_low_times, low_to_high_times

def process(json_path):
    with open(json_path, 'r') as file:
        data = json.load(file)
        label = data["label"]
        
    WT, WI, WM, WR, WP = get_distance(label)
    
    wt_extremes, wt_transitions, wt_high_to_low_times, wt_low_to_high_times = get_info(WT)
    wi_extremes, wi_transitions, wi_high_to_low_times, wi_low_to_high_times = get_info(WI)
    wm_extremes, wm_transitions, wm_high_to_low_times, wm_low_to_high_times = get_info(WM)
    wr_extremes, wr_transitions, wr_high_to_low_times, wr_low_to_high_times = get_info(WR)
    wp_extremes, wp_transitions, wp_high_to_low_times, wp_low_to_high_times = get_info(WP)
    
    
    wt_info = [max(wt_high_to_low_times), min(wt_high_to_low_times), sum(wt_high_to_low_times) / len(wt_high_to_low_times), max(wt_low_to_high_times), min(wt_low_to_high_times),sum(wt_low_to_high_times) / len(wt_low_to_high_times)]
    wi_info = [max(wi_high_to_low_times), min(wi_high_to_low_times), sum(wi_high_to_low_times) / len(wi_high_to_low_times), max(wi_low_to_high_times), min(wi_low_to_high_times),sum(wi_low_to_high_times) / len(wi_low_to_high_times)]
    wm_info = [max(wm_high_to_low_times), min(wm_high_to_low_times), sum(wm_high_to_low_times) / len(wm_high_to_low_times), max(wm_low_to_high_times), min(wm_low_to_high_times),sum(wm_low_to_high_times) / len(wm_low_to_high_times)]
    wr_info = [max(wr_high_to_low_times), min(wr_high_to_low_times), sum(wr_high_to_low_times) / len(wr_high_to_low_times), max(wr_low_to_high_times), min(wr_low_to_high_times),sum(wr_low_to_high_times) / len(wr_low_to_high_times)]
    wp_info = [max(wp_high_to_low_times), min(wp_high_to_low_times), sum(wp_high_to_low_times) / len(wp_high_to_low_times), max(wp_low_to_high_times), min(wp_low_to_high_times),sum(wp_low_to_high_times) / len(wp_low_to_high_times)]

    area = calculate_area(label)
    #start_differences, end_differences = calculate_time_differences(wi_transitions, wm_transitions)
    
    return area
    #return [wt_info, wi_info, wm_info, wr_info, wp_info]
    #return [wi_info, wm_info, wr_info, wp_info]
    #return [wi_info, wm_info, wr_info]


病人结果

In [10]:
def get_result(label_folder, txt_file):
    # 读取txt文件中的序号名
    with open(txt_file, 'r') as file:
        sequence_numbers = file.read().splitlines()
    result = []
    # 遍历label文件夹中的文件
    for seq_num in sequence_numbers:
        left_file = f'p{seq_num}L.json'
        left_path = os.path.join(label_folder, left_file)
        
        right_file = f'p{seq_num}R.json'
        right_path = os.path.join(label_folder, right_file)
        
        # 检查文件是否存在
        if os.path.exists(left_path):
            left = process(left_path)
            #print(single)
            result.append(left)
        # else:
        #     print(f'File {json_file} does not exist in the label folder.')
        
        if os.path.exists(right_path):
            right = process(right_path)
            result .append(right)
            
    #return np.array(result)
    return result


正常人结果

In [11]:
def get_normal_result(label_folder, txt_file):
    # 读取txt文件中的序号名
    with open(txt_file, 'r') as file:
        sequence_numbers = file.read().splitlines()
    result = []
    # 遍历label文件夹中的文件
    for seq_num in sequence_numbers:
        left_file = f'h{seq_num}L.json'
        left_path = os.path.join(label_folder, left_file)
        
        right_file = f'h{seq_num}R.json'
        right_path = os.path.join(label_folder, right_file)
        
        # 检查文件是否存在
        if os.path.exists(left_path):
            left = process(left_path)
            #print(single)
            result.append(left)
        # else:
        #     print(f'File {json_file} does not exist in the label folder.')
        
        if os.path.exists(right_path):
            right = process(right_path)
            result .append(right)
            
    return np.array(result)

In [12]:
# 定义文件路径
label_folder = 'mask_output/all'  # label文件夹路径
#txt_file = 'mask_output/jinzhuibing.txt'  # jinzhuibing.txt文件路径

jzb_data = get_result(label_folder, 'mask_output/jinzhuibing.txt')

In [13]:
for cycle in jzb_data:
    print(cycle)


[0.11803513071895426, 0.04934640522875817, 0.019669117647058816, 0.013623366013071889, 0.011707516339869283, 0.07369689542483661, 0.11277369281045752, 0.10485089869281046, 0.020024509803921578, 0.017187499999999988, 0.014810049019607842, 0.014899918300653597, 0.01996527777777779, 0.09573937908496732, 0.12462622549019609, 0.13247140522875817, 0.022228349673202606, 0.017248774509803923, 0.014060457516339869, 0.015024509803921567, 0.0822610294117647, 0.11610294117647056, 0.11933823529411765, 0.05016339869281045, 0.01728962418300653, 0.011924019607843138, 0.013915441176470589, 0.06274714052287582, 0.10901348039215687, 0.12470996732026143, 0.02305964052287582, 0.016580882352941174, 0.015222630718954244, 0.013431372549019618, 0.016313316993464055, 0.08237336601307188, 0.1198263888888889, 0.06023692810457516, 0.015263480392156872, 0.011137663398692814, 0.061928104575163394, 0.10803104575163403, 0.12927287581699348, 0.11597222222222225, 0.017354983660130725, 0.014475081699346415, 0.01191993464

填充序列

In [17]:
# 找到所有序列中的最大长度
max_length = max(len(seq) for seq in jzb_data)

def pad_sequence(sequence, max_length):
    current_length = len(sequence)
    if current_length < max_length:
        num_to_pad = max_length - current_length
        pad_sequence = sequence[:num_to_pad]  # 前X位
        padded_sequence = pad_sequence + sequence
        return padded_sequence
    else:
        return sequence

# 填充每个序列
padded_data = [pad_sequence(seq, max_length) for seq in jzb_data]

In [None]:
# 填充序列，使所有序列长度相同
max_length = max(len(x) for x in jzb_data)
min_length = min(len(x) for x in jzb_data)
print(max_length)
print(min_length)
data_padded = np.array([x + [0]*(max_length - len(x)) for x in jzb_data])

300
142


In [None]:
print(data_padded.shape)

(258, 300)


In [None]:
pjs_data = get_result(label_folder, 'mask_output/pajinseng.txt')
yz_data = get_result(label_folder, 'mask_output/yaozhui.txt')
zf_data = get_result(label_folder, 'mask_output/zhongfeng.txt')
normal_data = get_normal_result(label_folder, 'mask_output/normal.txt')
normal_data = normal_data[:200]
print(jzb_data.shape)
print(pjs_data.shape)
print(yz_data.shape)
print(zf_data.shape)
print(normal_data.shape)

In [None]:
# jzb_index = np.random.choice(jzb_data.shape[0], 37, replace=False)
# jzb_data = jzb_data[jzb_index]
# # pjs_index = np.random.choice(pjs_data.shape[0], 10, replace=False) 
# # pjs_data = pjs_data[pjs_index]
# zf_index = np.random.choice(zf_data.shape[0], 37, replace=False)
# zf_data = zf_data[zf_index]

In [None]:
# 展开数据列表
x1 = jzb_data.reshape((jzb_data.shape[0], -1))
x2 = pjs_data.reshape((pjs_data.shape[0], -1))
x3 = yz_data.reshape((yz_data.shape[0], -1))
x4 = zf_data.reshape((zf_data.shape[0], -1))
x5 = normal_data.reshape((normal_data.shape[0], -1))

# 创建对应的标签
y1 = np.ones(x1.shape[0], dtype=int) * 1
y2 = np.ones(x2.shape[0], dtype=int) * 2
y3 = np.ones(x3.shape[0], dtype=int) * 3
y4 = np.ones(x4.shape[0], dtype=int) * 4
y5 = np.ones(x5.shape[0], dtype=int) * 5

# 合并数据和标签
X = np.vstack((x1, x2, x4, x5))
y = np.hstack((y1, y2, y4, y5))

# 打印合并后的数据和标签的形状
print("Shape of X:", X.shape)
print("Shape of y:", y.shape)

Shape of X: (744, 24)
Shape of y: (744,)


In [None]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
import warnings

warnings.filterwarnings("ignore")

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 定义要评估的模型
models = {
    "SVM": SVC(),
    "Random Forest": RandomForestClassifier(),
    "Logistic Regression": LogisticRegression(max_iter=500),
    "K-Nearest Neighbors": KNeighborsClassifier()
}
# 使用交叉验证评估每个模型的性能，并记录结果
best_score = 0
best_model_name = None
best_model = None

for name, model in models.items():
    scores = cross_val_score(model, X_train, y_train, cv=5)
    mean_score = scores.mean()
    std_score = scores.std() * 2
    print(f"{name} - Accuracy: {mean_score:.2f} (+/- {std_score:.2f})")
    
    if mean_score > best_score:
        best_score = mean_score
        best_model_name = name
        best_model = model

# 训练并测试每个模型
for name, model in models.items():
    model.fit(X_train, y_train)
    accuracy = model.score(X_test, y_test)
    print(f"{name} Test Accuracy: {accuracy:.2f}")

# 输出性能最佳的模型
print(f"Best Model: {best_model_name} with Cross-Validation Accuracy: {best_score:.2f}")

SVM - Accuracy: 0.50 (+/- 0.05)
Random Forest - Accuracy: 0.49 (+/- 0.03)
Logistic Regression - Accuracy: 0.42 (+/- 0.06)
K-Nearest Neighbors - Accuracy: 0.42 (+/- 0.03)
SVM Test Accuracy: 0.52
Random Forest Test Accuracy: 0.50
Logistic Regression Test Accuracy: 0.45
K-Nearest Neighbors Test Accuracy: 0.44
Best Model: SVM with Cross-Validation Accuracy: 0.50


In [None]:
print(jzb_data.shape[1])

4


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split, TensorDataset
from torchvision.models import resnet101
from torchvision.transforms import ToTensor, Normalize, Compose

In [None]:
# 定义自定义数据集
class CustomDataset(Dataset):
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.labels[idx]
        
        
        if self.transform:
            sample = self.transform(sample)
        
        return sample, label

In [None]:
y1 = np.ones(x1.shape[0], dtype=int) * 0
y2 = np.ones(x2.shape[0], dtype=int) * 1
y4 = np.ones(x4.shape[0], dtype=int) * 2
y5 = np.ones(x5.shape[0], dtype=int) * 3

In [None]:
# 合并数据和标签
data = np.concatenate((jzb_data, pjs_data,zf_data, normal_data), axis=0)
labels = np.concatenate((y1, y2, y4, y5), axis=0)


In [None]:
# 使用 np.tile 直接进行复制并重塑形状
data = np.tile(data[:, np.newaxis, :, :], (1, 3, 1, 1))

In [None]:
# 将 NumPy 数组转换为 torch.Tensor
data = torch.tensor(data)
labels = torch.tensor(labels, dtype=float)

In [None]:
print(data.shape)
print(labels.shape)

torch.Size([744, 3, 4, 6])
torch.Size([744])


In [None]:
# 定义数据变换
transform = Compose([
    ToTensor(),
    Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# 创建自定义数据集
#dataset = CustomDataset(data, labels, transform=transform)

# 将特征和标签包装在 TensorDataset 中
dataset = TensorDataset(data, labels)

# 分割数据集为训练集和测试集
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# 定义ResNet模型
class SimpleResNet(nn.Module):
    def __init__(self, num_classes=4):
        super(SimpleResNet, self).__init__()
        self.resnet = resnet101(pretrained=True)
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        return self.resnet(x)

In [None]:
# 初始化模型、损失函数和优化器
model = SimpleResNet(num_classes=4)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
device = torch.device('cuda:6')
model.to(device)

Downloading: "https://download.pytorch.org/models/resnet101-63fe2227.pth" to /home/cike/.cache/torch/hub/checkpoints/resnet101-63fe2227.pth
100.0%


SimpleResNet(
  (resnet): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
      

In [None]:
# 训练模型
num_epochs = 150
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs = inputs.float()
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels.long())
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    #print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}")
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.float() # 确保 inputs 是 Float 类型
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels.long()).sum().item()

    accuracy = correct / total
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}, Test Accuracy: {accuracy:.4f}")

Epoch 1/150, Loss: 1.6080, Test Accuracy: 0.3826
Epoch 2/150, Loss: 1.3331, Test Accuracy: 0.5034
Epoch 3/150, Loss: 1.2665, Test Accuracy: 0.4765
Epoch 4/150, Loss: 1.2946, Test Accuracy: 0.4497
Epoch 5/150, Loss: 1.2772, Test Accuracy: 0.4295
Epoch 6/150, Loss: 1.2748, Test Accuracy: 0.4631
Epoch 7/150, Loss: 1.2806, Test Accuracy: 0.4966
Epoch 8/150, Loss: 1.2377, Test Accuracy: 0.4631
Epoch 9/150, Loss: 1.2408, Test Accuracy: 0.4832
Epoch 10/150, Loss: 1.2633, Test Accuracy: 0.5034
Epoch 11/150, Loss: 1.2812, Test Accuracy: 0.4362
Epoch 12/150, Loss: 1.2282, Test Accuracy: 0.4765
Epoch 13/150, Loss: 1.1665, Test Accuracy: 0.4564
Epoch 14/150, Loss: 1.1971, Test Accuracy: 0.4430
Epoch 15/150, Loss: 1.1709, Test Accuracy: 0.4832
Epoch 16/150, Loss: 1.1364, Test Accuracy: 0.4631
Epoch 17/150, Loss: 1.1691, Test Accuracy: 0.4765
Epoch 18/150, Loss: 1.1546, Test Accuracy: 0.4698
Epoch 19/150, Loss: 1.1231, Test Accuracy: 0.4765
Epoch 20/150, Loss: 1.2147, Test Accuracy: 0.4497
Epoch 21/

In [None]:
# 保存模型权重
torch.save(model.state_dict(), 'model_weights.pth')
print('Model weights saved to model_weights.pth')

Model weights saved to model_weights.pth


In [None]:
# 假设 model 是定义好的模型
model.load_state_dict(torch.load('model_weights.pth'))
model.eval()  # 设置模型为评估模式
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.float() # 确保 inputs 是 Float 类型
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels.long()).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy:.4f}")

Test Accuracy: 1.1957


In [None]:
# 测试模型
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.float() # 确保 inputs 是 Float 类型
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels.long()).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy:.4f}")

Test Accuracy: 0.5978
