In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import os
os.chdir('A:\Desktop\small_projects\Gesture_control\openpose_demo')
from PIL import Image
import json
import pandas as pd

In [1]:
import torch
print(torch.__version__)
print(torch.cuda.is_available())

2.5.1+cu121
True


In [None]:
alignment_list = {"forward": 0, 
                "backward": 1, 
                "left": 0, 
                "right": 1, 
                "none": 2
                }

#label1 forward backward
#label0 left right

In [4]:
csv_file = './recordings/20241211-162000.csv'
data = pd.read_csv(csv_file)

data = data[['region_attributes', 'filename']]
# data['region_attributes'] = data['region_attributes'].apply(json.loads)
data.rename(columns={'filename': 'image'}, inplace=True)
# 提取 `forward_backward` 和 `left_right` 并替换为数值
def extract_and_map(region_attributes):
    attributes = json.loads(region_attributes)
    label1 = alignment_list.get(attributes.get("forward_backward"), -1)  # 使用映射
    label0 = alignment_list.get(attributes.get("left_right"), -1)        # 使用映射
    return label1, label0

# 应用到 DataFrame
data[["label1", "label0"]] = data["region_attributes"].apply(extract_and_map).apply(pd.Series)

# 删除原列 `region_attributes`
data.drop(columns=["region_attributes"], inplace=True)

# 查看结果
print(data)

               image  label1  label0
0     frame_0001.jpg       0       2
1     frame_0002.jpg       0       2
2     frame_0003.jpg       0       2
3     frame_0004.jpg       0       2
4     frame_0005.jpg       0       2
...              ...     ...     ...
2303  frame_2304.jpg       0       0
2304  frame_2305.jpg       0       0
2305  frame_2306.jpg       0       0
2306  frame_2307.jpg       0       0
2307  frame_2308.jpg       0       0

[2308 rows x 3 columns]


In [39]:
data.to_json("processed_data.csv", index=False)
data.to_json("processed_data.json", orient="records", lines=True)

In [7]:
print(data['label1'][800])

1


In [17]:
# -------------------------
# 自定义数据集类
# -------------------------
class MultiLabelDataset(Dataset):
    def __init__(self, data_frame, image_dir, transform=None):
        """
        自定义数据集类
        :param data_frame: 包含图像路径和标签的 DataFrame
        :param image_dir: 图像所在文件夹路径
        :param transform: 数据增强和预处理
        """
        self.data_frame = data_frame
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        # 获取图像路径和标签
        img_name = self.data_frame.iloc[idx]["image"]
        label1 = self.data_frame.iloc[idx]["label1"]
        label0 = self.data_frame.iloc[idx]["label0"]

        # 加载图像
        img_path = os.path.join(self.image_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        # 数据增强
        if self.transform:
            image = self.transform(image)

        # 返回图像和标签
        return image, torch.tensor(label1, dtype=torch.long), torch.tensor(label0, dtype=torch.long)

# -------------------------
# 定义模型
# -------------------------
class MultiLabelModel(nn.Module):
    def __init__(self, num_classes_label1=3, num_classes_label2=3):
        super(MultiLabelModel, self).__init__()
        # 使用预训练 ResNet 提取特征
        self.base_model = models.resnet18(pretrained=True)
        self.base_model.fc = nn.Identity()  # 去掉最后一层
        self.fc1 = nn.Linear(512, num_classes_label1)  # label_1 输出
        self.fc2 = nn.Linear(512, num_classes_label2)  # label_2 输出

    def forward(self, x):
        features = self.base_model(x)
        label1_output = self.fc1(features)
        label2_output = self.fc2(features)
        return label1_output, label2_output

# -------------------------
# 训练函数
# -------------------------
def train_model(model, dataloader, criterion, optimizer, num_epochs=10, device='cuda'):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        for images, label1, label0 in dataloader:
            images, label1, label0 = images.to(device), label1.to(device), label0.to(device)

            # 前向传播
            optimizer.zero_grad()
            output1, output0 = model(images)

            # 计算损失
            loss1 = criterion(output1, label1)
            loss0 = criterion(output0, label0)
            loss = loss1 + loss0

            # 反向传播和优化
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader):.4f}")

In [19]:
# 数据路径
data_dir = "./recordings/20241211-162000/video"  # 图片文件夹路径
# annotation_file = "./processed_data.json"  # 标签文件路径
# 数据增强和预处理
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# 加载数据集
dataset = MultiLabelDataset(data, data_dir, transform=transform)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

In [20]:
for images, label1, label0 in dataloader:
    print(images.shape)  # 图像批量大小
    print(label1)        # label1 批量
    print(label0)        # label0 批量
    break

torch.Size([8, 3, 224, 224])
tensor([2, 2, 0, 0, 1, 0, 2, 1])
tensor([1, 0, 1, 0, 1, 1, 0, 1])


In [9]:
# 初始化模型
model = MultiLabelModel()
# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 开始训练
train_model(model, dataloader, criterion, optimizer, num_epochs=10)



Epoch [1/10], Loss: 0.7061
Epoch [2/10], Loss: 0.4966
Epoch [3/10], Loss: 0.3320
Epoch [4/10], Loss: 0.2719
Epoch [5/10], Loss: 0.2226
Epoch [6/10], Loss: 0.1681
Epoch [7/10], Loss: 0.1356
Epoch [8/10], Loss: 0.1382
Epoch [9/10], Loss: 0.0863
Epoch [10/10], Loss: 0.1129


In [14]:
torch.save(model, "multi_label_model.pth")
print("模型已保存到 multi_label_model.pth")
torch.save(model.state_dict(), "multi_label_model_state.pth")
print("模型状态字典已保存到 multi_label_model_state.pth")

模型已保存到 multi_label_model.pth
模型状态字典已保存到 multi_label_model_state.pth


In [16]:
import cv2

# 初始化模型并加载权重
model = MultiLabelModel(num_classes_label1=3, num_classes_label2=3)
model.load_state_dict(torch.load("./multi_label_model_state.pth"))
model.eval()

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 定义数据预处理
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 打开摄像头
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("无法打开摄像头")
    exit()

# 推理循环
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # 转换为 PIL 格式并进行预处理
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    input_tensor = transform(pil_image).unsqueeze(0).to(device)  # 添加 batch 维度

    # 推理
    with torch.no_grad():
        output1, output0 = model(input_tensor)
        label1 = torch.argmax(output1, dim=1).item()
        label0 = torch.argmax(output0, dim=1).item()

    # 显示结果
    cv2.putText(frame, f"Label1: {label1}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.putText(frame, f"Label0: {label0}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow("Camera Inference", frame)

    # 按 'q' 键退出
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cap.release()
cv2.destroyAllWindows()

  model.load_state_dict(torch.load("./multi_label_model_state.pth"))


In [83]:
"""GNN for Human Pose Recognition"""
import torch
import pandas as pd
import os 
os.chdir("A:\Desktop\small_projects\Gesture_control\openpose_demo")
from torch.utils.data import Dataset, DataLoader
from torch_geometric.nn import GCNConv
import torch.nn.functional as F
import torch.nn as nn
import torch.nn.init as init

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("当前设备: ", device)


当前设备:  cuda


In [77]:
alignment_list = {"forward": 0, 
                "backward": 1, 
                "left": 0, 
                "right": 1, 
                "none": 2
                }

#label1 forward backward
#label0 left right

csv_file = './recordings/20241211-162000.csv'
data = pd.read_csv(csv_file)

data = data[['region_attributes', 'filename']]
# data['region_attributes'] = data['region_attributes'].apply(json.loads)
data.rename(columns={'filename': 'image'}, inplace=True)
# 提取 `forward_backward` 和 `left_right` 并替换为数值
def extract_and_map(region_attributes):
    attributes = json.loads(region_attributes)
    label1 = alignment_list.get(attributes.get("forward_backward"), -1)  # 使用映射
    label0 = alignment_list.get(attributes.get("left_right"), -1)        # 使用映射
    return label1, label0

# 应用到 DataFrame
data[["label1", "label0"]] = data["region_attributes"].apply(extract_and_map).apply(pd.Series)

# 删除原列 `region_attributes`
data.drop(columns=["region_attributes"], inplace=True)

In [90]:
all_node_features = []  # 存储所有样本的节点特征
all_edge_indices = []   # 存储所有样本的边信息
all_labels0 = []        # 存储所有样本的标签1
all_labels1 = []        # 存储所有样本的标签2

edge_index = torch.tensor([
    [0, 1], [0, 4], [1, 2], [2, 3], [3, 7], [4, 5], [5, 6], [6, 8], [9, 10], [11, 12], 
    [11, 13], [11, 23], [12, 14], [12, 24], [13, 15], [14, 16], [15, 21], [15, 17], [15, 19],
    [16, 18], [16, 20], [16, 22], [17, 19], [18, 20], [23, 24], [23, 25], [24, 26], [25, 27],
    [26, 28], [27, 29], [27, 31], [28, 30], [28, 32], [29, 31], [30, 32]
    # 添加其他骨骼连接关系...
]).t()  # 转置为 [2, num_edges]

file_dir = r'./recordings/20241211-162000'
for _, row in data.iterrows():
    file_name = row['image']
    label0 = row['label0']
    label1 = row['label1']

    try:
        with open(os.path.join(file_dir, file_name.replace('jpg', 'json')), 'rb') as f:
            pose_data = json.load(f)
    except FileNotFoundError:
        break

    pose_landmarks = pose_data['pose_landmarks']
    node_features = torch.tensor(
        # [[lm["x"], lm["y"], lm["z"], lm['visibility']] for lm in pose_landmarks],
        [[lm["x"], lm["y"], lm["z"]] for lm in pose_landmarks],
        dtype=torch.float32
    )
    node_features[:, :3] = (node_features[:, :3] - node_features[:, :3].mean(dim=0)) / node_features[:, :3].std(dim=0)
    
    all_node_features.append(node_features)
    all_edge_indices.append(edge_index)
    all_labels0.append(label0)
    all_labels1.append(label1)

all_labels0 = torch.tensor(all_labels0, dtype=torch.long)
all_labels1 = torch.tensor(all_labels1, dtype=torch.long)
print("数据加载完成！")

数据加载完成！


In [91]:
class PoseDataset(Dataset):
    def __init__(self, node_features, edge_indices, labels0, labels1):
        self.node_features = node_features
        self.edge_indices = edge_indices
        self.labels0 = labels0
        self.labels1 = labels1

    def __len__(self):
        return len(self.node_features)

    def __getitem__(self, idx):
        return self.node_features[idx], self.edge_indices[idx], self.labels0[idx], self.labels1[idx]

class PoseGNN(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim1, output_dim2):
        super(PoseGNN, self).__init__()
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, hidden_dim)
        self.fc1 = torch.nn.Linear(hidden_dim, output_dim1)  # 输出 label1
        self.fc2 = torch.nn.Linear(hidden_dim, output_dim2)  # 输出 label2

    def forward(self, x, edge_index):
        x = self.conv1(x, edge_index).relu()
        x = self.conv2(x, edge_index).relu()
        x = x.mean(dim=0)  # 全局池化

        logits1 = self.fc1(x)  # 输出 logits
        logits2 = self.fc2(x)

        # 使用 softmax 激活函数将 logits 转为概率
        probs1 = F.softmax(logits1, dim=-1)
        probs2 = F.softmax(logits2, dim=-1)

        return probs1, probs2

In [92]:
dataset = PoseDataset(all_node_features, all_edge_indices, all_labels0, all_labels1)

# 创建数据加载器
dataloader = DataLoader(dataset, batch_size=512, shuffle=True)

# 测试数据加载器
for batch in dataloader:
    node_features_batch, edge_indices_batch, labels1_batch, labels2_batch = batch
    print(f"Node Features: {node_features_batch[0].shape}")  # 单个样本节点特征
    print(f"Edge Indices: {edge_indices_batch[0].shape}")    # 单个样本边关系
    print(f"Label1: {labels1_batch}, Label2: {labels2_batch}")
    break

Node Features: torch.Size([33, 3])
Edge Indices: torch.Size([2, 35])
Label1: tensor([2, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 2, 2, 0,
        1, 1, 2, 0, 0, 2, 1, 0, 0, 2, 1, 0, 2, 0, 1, 0, 2, 0, 1, 1, 1, 0, 2, 0,
        1, 1, 1, 0, 0, 0, 1, 0, 2, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0,
        1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 1, 1, 1, 1, 0,
        1, 0, 1, 1, 1, 1, 2, 0, 1, 1, 2, 1, 0, 1, 0, 0, 0, 2, 0, 1, 1, 1, 0, 0,
        1, 0, 0, 2, 0, 0, 2, 0, 2, 1, 2, 2, 1, 0, 0, 0, 2, 0, 1, 1, 0, 0, 1, 0,
        1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1,
        1, 0, 1, 0, 2, 2, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1, 1, 0,
        1, 0, 2, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0,
        1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 2, 2, 0, 1, 1, 1, 1,
        1, 0, 1, 0, 1, 2, 1, 1, 1, 2, 0, 1, 1, 2, 0, 1, 2, 1, 0, 0, 2, 1, 0, 1,
        0, 0, 1, 0, 0, 0, 2, 2, 0, 1, 1, 0,

In [96]:
# 初始化模型和优化器
model = PoseGNN(input_dim=3, hidden_dim=64, output_dim1=3, output_dim2=3).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10, eta_min=5e-6)
criterion = torch.nn.CrossEntropyLoss()

In [97]:
def initialize_weights(m):
    if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
        init.xavier_uniform_(m.weight)  # 或 init.kaiming_uniform_
        if m.bias is not None:
            init.zeros_(m.bias)  # 将偏置初始化为 0

model.apply(initialize_weights)

PoseGNN(
  (conv1): GCNConv(3, 64)
  (conv2): GCNConv(64, 64)
  (fc1): Linear(in_features=64, out_features=3, bias=True)
  (fc2): Linear(in_features=64, out_features=3, bias=True)
)

In [98]:
# 训练模型
model.train()
num_epochs = 1000

for epoch in range(num_epochs):
    epoch_loss = 0.0
    for node_features_batch, edge_indices_batch, labels1_batch, labels2_batch in dataloader:
        node_features_batch = node_features_batch.to(device)
        edge_indices_batch = edge_indices_batch.to(device)
        labels1_batch = labels1_batch.to(device)
        labels2_batch = labels2_batch.to(device)
        
        optimizer.zero_grad()

        batch_loss = 0.0
        for i in range(len(node_features_batch)):
            # 单个样本输入
            logits1, logits2 = model(node_features_batch[i], edge_indices_batch[i])

            # 计算损失
            loss1 = criterion(logits1.unsqueeze(0), labels1_batch[i].unsqueeze(0))
            loss2 = criterion(logits2.unsqueeze(0), labels2_batch[i].unsqueeze(0))
            batch_loss += loss1 + loss2

        batch_loss.backward()
        optimizer.step()

        epoch_loss += batch_loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")


Epoch 1/1000, Loss: 3365.1029
Epoch 2/1000, Loss: 3361.6278
Epoch 3/1000, Loss: 3358.0923
Epoch 4/1000, Loss: 3354.7498
Epoch 5/1000, Loss: 3351.3770
Epoch 6/1000, Loss: 3348.0730
Epoch 7/1000, Loss: 3344.8439
Epoch 8/1000, Loss: 3341.5548
Epoch 9/1000, Loss: 3338.4497
Epoch 10/1000, Loss: 3335.2490
Epoch 11/1000, Loss: 3332.1716
Epoch 12/1000, Loss: 3329.1947
Epoch 13/1000, Loss: 3326.1769
Epoch 14/1000, Loss: 3323.1699
Epoch 15/1000, Loss: 3320.2417
Epoch 16/1000, Loss: 3317.3488
Epoch 17/1000, Loss: 3314.4717
Epoch 18/1000, Loss: 3311.6471
Epoch 19/1000, Loss: 3308.8760
Epoch 20/1000, Loss: 3306.0795
Epoch 21/1000, Loss: 3303.3208
Epoch 22/1000, Loss: 3300.5774
Epoch 23/1000, Loss: 3297.9380
Epoch 24/1000, Loss: 3295.2056
Epoch 25/1000, Loss: 3292.5256
Epoch 26/1000, Loss: 3289.8530
Epoch 27/1000, Loss: 3287.2194
Epoch 28/1000, Loss: 3284.6366
Epoch 29/1000, Loss: 3281.9779
Epoch 30/1000, Loss: 3279.3735
Epoch 31/1000, Loss: 3276.7780
Epoch 32/1000, Loss: 3274.2180
Epoch 33/1000, Lo

In [99]:
print(logits1, logits2, labels1_batch[-1], labels2_batch[-1])

tensor([9.9672e-01, 3.2749e-03, 8.1234e-06], device='cuda:0',
       grad_fn=<SoftmaxBackward0>) tensor([5.0428e-01, 2.3796e-04, 4.9549e-01], device='cuda:0',
       grad_fn=<SoftmaxBackward0>) tensor(0, device='cuda:0') tensor(1, device='cuda:0')


In [None]:
torch.save(model.state_dict(), "pose_gnn_state_1000.pth")
print("模型权重已保存到 pose_gnn_weights.pth")
torch.save(model, "pose_gnn_model_1000.pth")
print("完整模型已保存到 pose_gnn_model.pth")

模型权重已保存到 pose_gnn_weights.pth
完整模型已保存到 pose_gnn_model.pth


: 

In [89]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GATConv

In [None]:
class GAT(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super(GAT, self).__init__()
        
        # 第一层 GAT
        self.gat1 = GATConv(in_channels, hidden_channels, heads=8, dropout=0.6)
        # 第二层 GAT
        self.gat2 = GATConv(hidden_channels * 8, out_channels, heads=1, dropout=0.6)
    
    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.elu(self.gat1(x, edge_index))  # 第一层 GAT
        x = self.gat2(x, edge_index)         # 第二层 GAT
        return x