In [None]:
import numpy as np
import os
from sklearn.model_selection import train_test_split

# NOTE(review): hardcoded absolute Windows path; adjust it before running on another machine.
DATA_PATH = "I:\\Ece496\\custom_data\\preprocess"

# Load the data
# NOTE(review): allow_pickle=True unpickles arbitrary Python objects, so only
# load .npy files that come from a trusted source.
X = np.load(os.path.join(DATA_PATH, "X.npy"), allow_pickle=True)
y = np.load(os.path.join(DATA_PATH, "y.npy"), allow_pickle=True)

# 1. Split the dataset (80% training, 10% validation, 10% test)
# The 20% held out by the first split is halved into validation and test sets;
# random_state=42 is passed to both calls so the split is repeatable.
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# 2. Compute the mean and standard deviation of the training set
def compute_mean_std(X_train):
    """Return the NaN-aware mean and standard deviation over all training rows.

    Args:
        X_train: Iterable of arrays that can be concatenated along axis 0
            (presumably one (n_frames, n_features) array per sample; confirm
            against the saved X.npy).

    Returns:
        Tuple ``(mean, std)`` computed along axis 0 of the concatenated rows,
        ignoring NaN entries.
    """
    stacked_rows = np.concatenate(list(X_train), axis=0)
    return np.nanmean(stacked_rows, axis=0), np.nanstd(stacked_rows, axis=0)

# Statistics are computed from the training split only.
mean, std = compute_mean_std(X_train)

# Save the mean and standard deviation so they can be reused later
np.save(os.path.join(DATA_PATH, "mean.npy"), mean)
np.save(os.path.join(DATA_PATH, "std.npy"), std)

# 3. Normalization function
def normalize_sample(sample, mean, std):
    """Standardize ``sample`` with the given statistics: ``(sample - mean) / std``.

    Features whose ``std`` is exactly zero are only centered (as if their std
    were 1), so they do not turn into inf/NaN through a division by zero.
    NaN entries of ``sample`` stay NaN.

    Args:
        sample: Array broadcastable against ``mean`` and ``std``.
        mean: Per-feature mean.
        std: Per-feature standard deviation.

    Returns:
        Array with the broadcast shape of the inputs.
    """
    centered = sample - mean
    # The division is still evaluated for every feature; silence the warning
    # for the zero-std ones because their result is replaced just below.
    with np.errstate(divide="ignore", invalid="ignore"):
        scaled = centered / std
    return np.where(np.asarray(std) == 0, centered, scaled)

# 4. Normalize every split with the training-set mean and standard deviation
X_train = [normalize_sample(sample, mean, std) for sample in X_train]
X_val = [normalize_sample(sample, mean, std) for sample in X_val]
X_test = [normalize_sample(sample, mean, std) for sample in X_test]


def _stack_or_object_array(samples):
    """Convert a list of per-sample arrays into something ``np.save`` accepts.

    Equal-shaped samples are stacked into a regular array, exactly as
    ``np.save`` would do implicitly. Samples of different shapes cannot be
    stacked (recent NumPy raises ValueError), so they are stored one per slot
    in a 1-D object array instead.
    """
    try:
        return np.asarray(samples)
    except ValueError:
        ragged = np.empty(len(samples), dtype=object)
        # Assign element by element so NumPy never tries to broadcast the samples.
        for position, sample in enumerate(samples):
            ragged[position] = sample
        return ragged


# 5. Save the split and normalized data
# NOTE(review): a ragged split is written as a pickled object array and must be
# read back with np.load(..., allow_pickle=True).
np.save(os.path.join(DATA_PATH, "X_train.npy"), _stack_or_object_array(X_train))
np.save(os.path.join(DATA_PATH, "y_train.npy"), y_train)
np.save(os.path.join(DATA_PATH, "X_val.npy"), _stack_or_object_array(X_val))
np.save(os.path.join(DATA_PATH, "y_val.npy"), y_val)
np.save(os.path.join(DATA_PATH, "X_test.npy"), _stack_or_object_array(X_test))
np.save(os.path.join(DATA_PATH, "y_test.npy"), y_test)

print("数据集划分和标准化完成，并已保存。")


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

class Preprocessing(nn.Module):
    """Reshape flattened keypoint frames, normalize them and replace NaNs.

    Args:
        global_mean: Value subtracted from the reshaped input in ``normalize``.
        global_std: Value the centered input is divided by in ``normalize``.
        fill_nan_value: Constant written over NaN entries in ``fill_nans``.

    NOTE(review): the statistics are stored as plain attributes, so they must
    already broadcast against the ``(seq_len, n_landmarks, 3)`` tensor built in
    ``forward`` -- confirm the shape callers pass in.
    """

    def __init__(self, global_mean, global_std, fill_nan_value=0.0):
        super().__init__()
        self.global_mean = global_mean
        self.global_std = global_std
        self.fill_nan_value = fill_nan_value

    # Normalization
    def normalize(self, x):
        """Return ``(x - global_mean) / global_std``."""
        x = (x - self.global_mean) / self.global_std
        return x

    # NaN filling
    def fill_nans(self, x):
        """Overwrite NaN entries of ``x`` in place with ``fill_nan_value`` and return ``x``."""
        x[torch.isnan(x)] = self.fill_nan_value
        return x

    @staticmethod
    def _snapshot(value):
        """Return a copy of ``value`` that shares no memory with its source."""
        return value.clone() if torch.is_tensor(value) else np.array(value)

    # Keypoint centering
    def centralize_keypoints(self, keypoints, center_idx=85):
        """Shift interleaved ``x, y, z`` keypoints so landmark ``center_idx`` becomes the origin.

        ``keypoints`` is modified in place and also returned. Its first axis is
        read as ``[x0, y0, z0, x1, y1, z1, ...]``.

        NOTE(review): this interleaved layout differs from the three contiguous
        blocks that ``forward`` assumes, and ``forward`` never calls this
        method -- confirm which layout the data really uses.
        """
        base = center_idx * 3
        # Indexing a tensor returns a view into `keypoints`; copy the center
        # coordinates first so the in-place subtraction below cannot overwrite
        # the very values it is still subtracting.
        center_x = self._snapshot(keypoints[base])
        center_y = self._snapshot(keypoints[base + 1])
        center_z = self._snapshot(keypoints[base + 2])

        # Center each coordinate
        keypoints[::3] -= center_x
        keypoints[1::3] -= center_y
        keypoints[2::3] -= center_z
        return keypoints

    def forward(self, x):
        """Reshape ``x`` to ``(seq_len, n_landmarks, 3)``, normalize it and fill NaNs.

        Each row of ``x`` is split into three contiguous blocks, and block ``k``
        becomes channel ``k`` of every landmark.
        """
        # Reshape the data to (seq_len, n_landmarks, 3)
        x = x.reshape(x.shape[0], 3, -1).permute(0, 2, 1)

        # Normalize first so NaNs survive the arithmetic, then replace them
        x = self.normalize(x)
        x = self.fill_nans(x)

        return x


In [None]:
def temporal_resample(sequence, target_length=105, min_scale=0.5, max_scale=1.5):
    """Randomly stretch a sequence in time, then resize it to ``target_length`` frames.

    Args:
        sequence: Torch tensor whose first dimension is time; all trailing
            dimensions are treated as per-frame features. Assumed to be
            floating point, as linear interpolation requires.
        target_length: Number of frames in the returned tensor.
        min_scale: Lower bound of the uniform random stretch factor.
        max_scale: Upper bound of the uniform random stretch factor.

    Returns:
        Tensor of shape ``(target_length, *sequence.shape[1:])``.

    NOTE(review): the stretched sequence is always resized back to
    ``target_length`` and nothing is cropped -- confirm this is the intended
    augmentation.
    """
    num_frames = sequence.shape[0]
    feature_shape = sequence.shape[1:]

    # Random scale factor; keep at least one frame so interpolation stays valid
    scale = np.random.uniform(min_scale, max_scale)
    new_length = max(1, int(num_frames * scale))

    # F.interpolate(mode='linear') resamples the LAST dim of a (N, C, L) tensor,
    # so flatten the per-frame features and move time to the end first.
    channels_first = sequence.reshape(num_frames, -1).transpose(0, 1).unsqueeze(0)

    # Resample to the randomly scaled length
    stretched = F.interpolate(channels_first, size=new_length, mode='linear', align_corners=False)

    # Resize to the fixed output length
    resized = F.interpolate(stretched, size=target_length, mode='linear', align_corners=False)

    # Back to time-first layout with the original per-frame feature shape
    return resized.squeeze(0).transpose(0, 1).reshape(target_length, *feature_shape)

def temporal_shift(sequence, target_length=105, max_shift=10):
    """Circularly shift a sequence in time, then resize it to ``target_length`` frames.

    Args:
        sequence: Torch tensor whose first dimension is time; all trailing
            dimensions are treated as per-frame features. Assumed to be
            floating point, as linear interpolation requires.
        target_length: Number of frames in the returned tensor.
        max_shift: Largest shift magnitude; the shift is drawn uniformly from
            ``[-max_shift, max_shift]``, both ends included.

    Returns:
        Tensor of shape ``(target_length, *sequence.shape[1:])``. Frames pushed
        past one end by ``torch.roll`` re-enter at the other end.
    """
    num_frames = sequence.shape[0]
    feature_shape = sequence.shape[1:]

    # randint's upper bound is exclusive, so add 1 to make +max_shift reachable
    # (this also keeps max_shift=0 valid).
    shift = int(np.random.randint(-max_shift, max_shift + 1))

    # Roll the sequence along the time axis
    shifted = torch.roll(sequence, shifts=shift, dims=0)

    # F.interpolate(mode='linear') resamples the LAST dim of a (N, C, L) tensor,
    # so flatten the per-frame features and move time to the end first.
    channels_first = shifted.reshape(num_frames, -1).transpose(0, 1).unsqueeze(0)
    resized = F.interpolate(channels_first, size=target_length, mode='linear', align_corners=False)
    return resized.squeeze(0).transpose(0, 1).reshape(target_length, *feature_shape)

def windowed_cutmix(sequence1, sequence2, target_length=105):
    """Join the head of one sequence to the tail of another and resize to ``target_length``.

    A single random ratio ``r`` in ``[0, 1)`` picks the cut point in both
    inputs: the first ``int(len1 * r)`` frames of ``sequence1`` are followed by
    the frames of ``sequence2`` from ``int(len2 * r)`` onwards.

    Args:
        sequence1: Torch tensor with time as the first dimension; supplies the head.
        sequence2: Torch tensor with the same trailing shape; supplies the tail.
        target_length: Number of frames in the returned tensor.

    Returns:
        Tensor of shape ``(target_length, *sequence1.shape[1:])``.
    """
    length1, length2 = sequence1.shape[0], sequence2.shape[0]
    feature_shape = sequence1.shape[1:]

    # Random cut ratio shared by both sequences
    cut_ratio = np.random.rand()
    cut_point1 = int(length1 * cut_ratio)
    cut_point2 = int(length2 * cut_ratio)

    # Build the spliced sequence
    mixed = torch.cat((sequence1[:cut_point1], sequence2[cut_point2:]), dim=0)

    # F.interpolate(mode='linear') resamples the LAST dim of a (N, C, L) tensor,
    # so flatten the per-frame features and move time to the end first.
    channels_first = mixed.reshape(mixed.shape[0], -1).transpose(0, 1).unsqueeze(0)
    resized = F.interpolate(channels_first, size=target_length, mode='linear', align_corners=False)
    return resized.squeeze(0).transpose(0, 1).reshape(target_length, *feature_shape)


In [None]:

def flip_keypoints(data, flip_indices):
    """Mirror keypoints: negate channel 0 and reorder landmarks by ``flip_indices``.

    The input is left untouched; the work is done on a copy.

    Args:
        data: Array or tensor indexed as ``[frame, landmark, channel]``;
            channel 0 is treated as the X coordinate.
        flip_indices: Landmark indices selected along axis 1 of the result,
            presumably mapping each landmark to its left/right counterpart --
            confirm with the caller.

    Returns:
        New array/tensor with negated X and landmarks reordered.
    """
    # Copy first so the caller's data does not end up with negated X values
    # while keeping its original landmark order.
    mirrored = data.clone() if hasattr(data, "clone") else data.copy()
    # Negate the X coordinate
    mirrored[:, :, 0] = -mirrored[:, :, 0]
    # Swap left and right according to the given indices
    return mirrored[:, flip_indices]


In [None]:
def random_keypoint_dropout(sequence, num_points_to_drop=6, num_time_windows=3):
    """Zero random keypoints inside random time windows to simulate occlusion.

    ``sequence`` is modified in place and also returned.

    Args:
        sequence: 3-D array or tensor indexed as ``[frame, keypoint, channel]``.
        num_points_to_drop: Keypoints zeroed per window; capped at the number
            of keypoints available.
        num_time_windows: Number of windows drawn.

    Returns:
        The same ``sequence`` object with the selected entries set to 0.
    """
    seq_len, num_keypoints, _ = sequence.shape
    if seq_len == 0 or num_keypoints == 0:
        return sequence

    # Sampling without replacement cannot draw more keypoints than exist
    points_per_window = min(num_points_to_drop, num_keypoints)
    # randint's upper bound is exclusive and must exceed the lower bound of 1,
    # otherwise short sequences raise ValueError.
    window_len_bound = max(2, seq_len // max(1, num_time_windows))

    for _window in range(num_time_windows):
        # Pick a random time window
        start = np.random.randint(0, seq_len)
        end = min(seq_len, start + np.random.randint(1, window_len_bound))

        # Pick random keypoint indices to occlude
        drop_indices = np.random.choice(num_keypoints, points_per_window, replace=False)
        sequence[start:end, drop_indices, :] = 0  # fill with 0 to simulate occlusion

    return sequence

In [None]:
def drop_face_or_pose(sequence, drop_face_prob=0.2, drop_pose_prob=0.2, face_indices=None, pose_indices=None):
    """Randomly zero the face and/or pose keypoints of ``sequence`` in place.

    One random number is drawn for the face group and one for the pose group,
    in that order. A group is zeroed only when its draw is below its
    probability and its indices were provided.

    Returns:
        The same ``sequence`` object.
    """
    keypoint_groups = (
        (drop_face_prob, face_indices),
        (drop_pose_prob, pose_indices),
    )
    for drop_prob, group_indices in keypoint_groups:
        # Draw before checking the indices so one number is consumed per group.
        selected = np.random.rand() < drop_prob
        if selected and group_indices is not None:
            sequence[:, group_indices, :] = 0  # fill with 0 to simulate a dropped group
    return sequence
def drop_hand_keypoints(sequence, drop_hand_prob=0.05, left_hand_indices=None, right_hand_indices=None):
    """With probability ``drop_hand_prob``, zero the keypoints of both hands in place.

    A single random draw decides for both hands; a hand whose indices are
    ``None`` is skipped.

    Returns:
        The same ``sequence`` object.
    """
    hands_dropped = np.random.rand() < drop_hand_prob
    if not hands_dropped:
        return sequence

    for hand_indices in (left_hand_indices, right_hand_indices):
        if hand_indices is not None:
            sequence[:, hand_indices, :] = 0  # set every keypoint of this hand to 0
    return sequence
def temporal_mask(sequence, mask_prob=0.3, max_mask_len=10):
    """With probability ``mask_prob``, zero one random run of consecutive frames in place.

    Args:
        sequence: 3-D array or tensor indexed as ``[frame, keypoint, channel]``.
        mask_prob: Probability that a mask is applied.
        max_mask_len: Longest run that may be masked (inclusive); capped at
            the sequence length.

    Returns:
        The same ``sequence`` object.
    """
    if np.random.rand() < mask_prob:
        seq_len = sequence.shape[0]
        # Never ask for a window longer than the sequence itself
        longest = min(max_mask_len, seq_len)
        if longest >= 1:
            # randint's upper bound is exclusive: +1 makes the maximum length
            # and the last valid start position reachable.
            mask_len = np.random.randint(1, longest + 1)
            start = np.random.randint(0, seq_len - mask_len + 1)

            # Set every keypoint inside the time window to 0
            sequence[start:start + mask_len, :, :] = 0

    return sequence
def spatial_mask(sequence, mask_prob=0.3, max_points=10):
    """With probability ``mask_prob``, zero a random set of keypoints over all frames in place.

    Args:
        sequence: 3-D array or tensor indexed as ``[frame, keypoint, channel]``.
        mask_prob: Probability that a mask is applied.
        max_points: Number of keypoints to mask; capped at the number of
            keypoints available.

    Returns:
        The same ``sequence`` object.
    """
    if np.random.rand() < mask_prob:
        num_keypoints = sequence.shape[1]
        # Sampling without replacement cannot draw more keypoints than exist
        num_masked = min(max_points, num_keypoints)
        if num_masked > 0:
            mask_points = np.random.choice(num_keypoints, num_masked, replace=False)

            # Zero the selected keypoints in every frame
            sequence[:, mask_points, :] = 0

    return sequence


In [None]:
def affine_transform(sequence, scale_range=(0.9, 1.1), translation_range=(-0.1, 0.1), rotation_range=(-10, 10)):
    """Apply a random scale, offset and Z-axis rotation to a keypoint tensor.

    Args:
        sequence: Torch tensor indexed as ``[frame, keypoint, 3]``.
        scale_range: ``(low, high)`` of the uniform scale factor.
        translation_range: ``(low, high)`` of the uniform offsets.
        rotation_range: ``(low, high)`` of the rotation angle in degrees.

    Returns:
        New tensor with the same shape as ``sequence``; the input is not modified.

    NOTE(review): an independent offset is drawn for every keypoint and
    channel (shared across frames), not one global shift -- confirm that
    per-keypoint jitter is intended.
    """
    # Random scaling
    scale = float(np.random.uniform(*scale_range))
    sequence = sequence * scale

    # Random translation, converted to a tensor matching the data's dtype and
    # device so a NumPy float64 array is never mixed into the torch arithmetic.
    offsets = np.random.uniform(*translation_range, size=(1, sequence.shape[1], sequence.shape[2]))
    translation = torch.as_tensor(offsets, dtype=sequence.dtype, device=sequence.device)
    sequence = sequence + translation

    # Random rotation about the Z axis
    angle = np.radians(np.random.uniform(*rotation_range))
    cos_a, sin_a = float(np.cos(angle)), float(np.sin(angle))
    rotation_matrix = torch.tensor([
        [cos_a, -sin_a, 0.0],
        [sin_a,  cos_a, 0.0],
        [0.0, 0.0, 1.0]
    ], dtype=sequence.dtype, device=sequence.device)

    # Keypoints are row vectors, so the matrix is applied on the right
    return torch.matmul(sequence, rotation_matrix)

In [None]:
import numpy as np

def compute_velocity_and_acceleration(sequence):
    """Return first and second frame-to-frame differences along axis 0.

    Both outputs keep the input's shape because a NaN row is prepended before
    each difference. Row 0 of the velocity is therefore NaN, and so are rows 0
    and 1 of the acceleration (when the input has that many rows).

    Returns:
        Tuple ``(velocity, acceleration)``.
    """
    def backward_difference(frames):
        # prepend=np.nan pads one NaN row so the result is as long as the input
        return np.diff(frames, axis=0, prepend=np.nan)

    # Velocity: difference between neighbouring frames
    velocity = backward_difference(sequence)
    # Acceleration: difference between neighbouring velocities
    acceleration = backward_difference(velocity)
    return velocity, acceleration

