In [12]:
import numpy as np
import pandas as pd
import os, random
from tqdm import tqdm

In [13]:
# --- Augmentation settings ---------------------------------------------------
target_per_class = 20      # number of synthetic samples generated per action class
duration_range = (3, 10)   # (min, max) duration; presumably seconds — TODO confirm
frame_interval = 0.15      # divisor that converts duration_range into sample counts
is_trainset_only = True    # when True, only sequence IDs listed in train.txt are used

# Fixed seed: the generation cells draw from both `random` and `np.random`, so
# seeding them here makes "Restart Kernel -> Run All" reproduce the same data.
seed = 42
random.seed(seed)
np.random.seed(seed)

# Action name -> integer class index.
label2idx = {
    'crawl': 0, 'walk': 1,
    'sit-floor': 2, 'sit-high-chair': 3, 'sit-low-chair': 4, 'stand': 5,
    'hold-horizontal': 6, 'hold-vertical': 7, 'piggyback': 8,
    'baby-food': 9, 'bottle': 10, 'breast': 11,
    'face-down': 12, 'face-side': 13, 'face-up': 14, 'roll-over': 15
}


# --- Locations ---------------------------------------------------------------
origin_dir = './data_origin/'   # root of the original data
aug_dir = './data_aug/'         # root under which augmented data is written
aug_method = 'GS'               # sub-directory name of the active augmentation method

# Not referenced by the cells shown in this notebook section; kept unchanged.
sampling_interval = 0.15  # s/frame
tolerance = 0.15

def find_pair_dirs(base_dir: str, aug_method: str=None):
    """Return the ``(sequence_dir, label_dir)`` pair below ``base_dir``, creating both.

    Args:
        base_dir: Root directory of a dataset.
        aug_method: Optional sub-directory name. When given (and non-empty) the
            pair lives in ``base_dir/aug_method``; otherwise directly in
            ``base_dir``.

    Returns:
        Tuple ``(sequence_dir, label_dir)`` of path strings. Both directories
        exist when the function returns.
    """
    # Both directories are derived from the *argument*; the label directory
    # previously came from module-level globals and ignored ``base_dir``.
    root = os.path.join(base_dir, aug_method) if aug_method else base_dir
    sequence_dir = os.path.join(root, "sequence")
    label_dir = os.path.join(root, "label")
    os.makedirs(sequence_dir, exist_ok=True)
    os.makedirs(label_dir, exist_ok=True)
    return sequence_dir, label_dir

# Resolve (and create if missing) the sequence/label directories of the
# original data and of the currently selected augmentation method.
origin_sequence_dir, origin_label_dir = find_pair_dirs(base_dir=origin_dir)
aug_sequence_dir, aug_label_dir = find_pair_dirs(base_dir=aug_dir, aug_method=aug_method)

In [14]:
# Load the training-set IDs (one per line of train.txt); the set stays empty
# when the train-only filter is disabled.
train_ids = set()
if is_trainset_only:
    train_txt_path = os.path.join(origin_dir, 'train.txt')
    with open(train_txt_path, 'r') as f:
        train_ids = {raw_line.strip() for raw_line in f}

# Collect, for every action, the per-axis mean and standard deviation of each
# original sequence (one record per sequence).
action_stats = {action: [] for action in label2idx}
label_suffix = '_label.csv'
# Sorted so the record order does not depend on the filesystem's listing order.
for label_file in sorted(os.listdir(origin_label_dir)):
    if not label_file.endswith(label_suffix):
        continue
    # Strip only the trailing suffix (str.replace would hit every occurrence).
    seq_id = label_file[:-len(label_suffix)]
    if is_trainset_only and seq_id not in train_ids:
        continue

    seq_path = os.path.join(origin_sequence_dir, f"{seq_id}.csv")
    label_path = os.path.join(origin_label_dir, label_file)
    if not os.path.exists(seq_path):
        continue

    label_df = pd.read_csv(label_path)
    if label_df.empty:
        continue  # no row to read the action from
    action = label_df.iloc[0]['action']
    if action not in action_stats:
        continue

    df = pd.read_csv(seq_path)
    if df.empty:
        continue  # mean/std of an empty sequence would be NaN

    stats = {}
    for axis in ('accel_x', 'accel_y', 'accel_z'):
        axis_std = df[axis].std()
        stats[f'{axis}_mean'] = df[axis].mean()
        # pandas uses ddof=1, so a single-row sequence yields NaN; treat it as
        # "no spread" instead of propagating NaN into the generated data.
        stats[f'{axis}_std'] = 0.0 if pd.isna(axis_std) else axis_std
    stats['seq_id'] = seq_id
    stats['label_path'] = label_path
    stats['action'] = action
    action_stats[action].append(stats)

### Gaussian Sampling

In [15]:
# Generate the Gaussian-sampled sequences: every synthetic sequence draws each
# axis independently from N(mean, std) of one original sequence.

# The length bounds depend only on the configuration, so compute them once.
min_len = int(duration_range[0] / frame_interval)
max_len = int(duration_range[1] / frame_interval)
axes = ('accel_x', 'accel_y', 'accel_z')
# label_path -> selected label columns; each source label CSV is read once
# instead of once per generated sample.
label_cache = {}

sample_id = 0
for action, samples in tqdm(action_stats.items()):
    total_needed = target_per_class
    num_available = len(samples)
    if num_available == 0:
        continue  # no source sequence for this action

    while total_needed > 0:
        # Full passes over all source sequences while enough samples are still
        # needed; a random subset for the final, partial pass.
        batch = samples if total_needed >= num_available else random.sample(samples, total_needed)
        for stat in batch:
            length = random.randint(min_len, max_len)

            # Dict order is x, y, z, so the random draws happen in that order.
            df = pd.DataFrame({
                axis: np.random.normal(stat[f'{axis}_mean'], stat[f'{axis}_std'], length)
                for axis in axes
            }).round(9)

            file_id = f"A{sample_id:05d}"
            df.to_csv(os.path.join(aug_sequence_dir, f"{file_id}.csv"), index=False)

            # NOTE(review): 'dur' is copied from the source label although the
            # generated length is drawn at random — confirm nothing downstream
            # expects it to match the synthetic sequence.
            label_path = stat['label_path']
            if label_path not in label_cache:
                label_cache[label_path] = pd.read_csv(label_path)[['gender', 'age', 'dur', 'action']]
            label_df = label_cache[label_path]
            label_df.to_csv(os.path.join(aug_label_dir, f"{file_id}_label.csv"), index=False)

            sample_id += 1
            total_needed -= 1

print(f"[完成] 已生成 {sample_id} 条统计增强数据样本。")

100%|██████████| 16/16 [00:01<00:00,  9.14it/s]

[完成] 已生成 300 条统计增强数据样本。





### Cosine Simulation

In [16]:
# Switch the active augmentation method to cosine simulation and point the
# output directories at its sub-folder.
aug_method = 'CS'
aug_sequence_dir, aug_label_dir = find_pair_dirs(base_dir=aug_dir, aug_method=aug_method)

In [17]:
# Load the training-set IDs (one per line of train.txt); the set stays empty
# when the train-only filter is disabled.
train_ids = set()
if is_trainset_only:
    train_txt_path = os.path.join(origin_dir, 'train.txt')
    with open(train_txt_path, 'r') as f:
        train_ids = {raw_line.strip() for raw_line in f}

# Collect, for every action, the per-axis mean, standard deviation and dominant
# frequency of each original sequence (one record per sequence).
def get_main_freq(signal):
    """Return the dominant non-DC frequency of ``signal``.

    The frequency axis is built with ``d = 1 / n``, i.e. the whole sequence is
    treated as spanning one time unit. The result is therefore expressed in
    cycles per sequence, not in physical Hz.

    Args:
        signal: 1-D, non-empty array of samples.

    Returns:
        The frequency whose FFT magnitude is largest once the DC bin is zeroed.
    """
    n_samples = len(signal)
    freqs = np.fft.rfftfreq(n_samples, d=1. / n_samples)
    fft_mag = np.abs(np.fft.rfft(signal))
    fft_mag[0] = 0  # ignore the DC component
    return freqs[np.argmax(fft_mag)]


action_stats = {action: [] for action in label2idx}
label_suffix = '_label.csv'
# Sorted so the record order does not depend on the filesystem's listing order.
for label_file in sorted(os.listdir(origin_label_dir)):
    if not label_file.endswith(label_suffix):
        continue
    # Strip only the trailing suffix (str.replace would hit every occurrence).
    seq_id = label_file[:-len(label_suffix)]
    if is_trainset_only and seq_id not in train_ids:
        continue

    seq_path = os.path.join(origin_sequence_dir, f"{seq_id}.csv")
    label_path = os.path.join(origin_label_dir, label_file)
    if not os.path.exists(seq_path):
        continue

    label_df = pd.read_csv(label_path)
    if label_df.empty:
        continue  # no row to read the action from
    action = label_df.iloc[0]['action']
    if action not in action_stats:
        continue

    df = pd.read_csv(seq_path)
    if df.empty:
        continue  # an empty sequence has no statistics (and 1/n would fail)

    # Per-axis statistics, inserted in x, y, z order.
    stats = {}
    for axis in ('accel_x', 'accel_y', 'accel_z'):
        values = df[axis].values
        stats[f'{axis}_mean'] = values.mean()
        stats[f'{axis}_std'] = values.std()
        stats[f'{axis}_freq'] = get_main_freq(values)
    stats['seq_id'] = seq_id
    stats['label_path'] = label_path
    stats['action'] = action
    action_stats[action].append(stats)

In [18]:
# Generate the cosine-simulated sequences: each axis is a cosine at the source
# sequence's dominant frequency, scaled by its std, plus Gaussian noise.
def _cosine_channel(mean, std, freq, t):
    """Return ``mean + std * cos(2*pi*freq*t)`` plus N(0, std) noise, one value per entry of ``t``."""
    cosine = np.cos(2 * np.pi * freq * t)
    noise = np.random.normal(0, std, len(t))
    return mean + std * cosine + noise


# The length bounds depend only on the configuration, so compute them once.
min_len = int(duration_range[0] / frame_interval)
max_len = int(duration_range[1] / frame_interval)
# label_path -> selected label columns; each source label CSV is read once
# instead of once per generated sample.
label_cache = {}

sample_id = 0
for action, samples in tqdm(action_stats.items()):
    total_needed = target_per_class
    num_available = len(samples)
    if num_available == 0:
        continue  # no source sequence for this action

    while total_needed > 0:
        # Full passes over all source sequences while enough samples are still
        # needed; a random subset for the final, partial pass.
        batch = samples if total_needed >= num_available else random.sample(samples, total_needed)
        for stat in batch:
            length = random.randint(min_len, max_len)
            # Relative time axis over one sequence. The endpoint is excluded
            # because the stored frequency counts cycles per sequence on an
            # endpoint-excluded axis; including it would stretch every cycle.
            t = np.linspace(0, 1, length, endpoint=False)

            # Channels are generated in x, y, z order (this fixes the order of
            # the random draws).
            accel_x = _cosine_channel(stat['accel_x_mean'], stat['accel_x_std'], stat['accel_x_freq'], t)
            accel_y = _cosine_channel(stat['accel_y_mean'], stat['accel_y_std'], stat['accel_y_freq'], t)
            accel_z = _cosine_channel(stat['accel_z_mean'], stat['accel_z_std'], stat['accel_z_freq'], t)

            df = pd.DataFrame({
                'accel_x': np.round(accel_x, 9),
                'accel_y': np.round(accel_y, 9),
                'accel_z': np.round(accel_z, 9),
            })

            file_id = f"C{sample_id:05d}"
            df.to_csv(os.path.join(aug_sequence_dir, f"{file_id}.csv"), index=False)

            # NOTE(review): 'dur' is copied from the source label although the
            # generated length is drawn at random — confirm nothing downstream
            # expects it to match the synthetic sequence.
            label_path = stat['label_path']
            if label_path not in label_cache:
                label_cache[label_path] = pd.read_csv(label_path)[['gender', 'age', 'dur', 'action']]
            label_df = label_cache[label_path]
            label_df.to_csv(os.path.join(aug_label_dir, f"{file_id}_label.csv"), index=False)

            sample_id += 1
            total_needed -= 1

print(f"[完成] 已生成 {sample_id} 条 cosine 模拟数据样本。")

100%|██████████| 16/16 [00:01<00:00,  9.44it/s]

[完成] 已生成 300 条 cosine 模拟数据样本。



