In [6]:
import torch


# 这个笔记本的目的就是随手弄一点需要验证的东西
def batch_block_shuffle_augmentation(x, block_size=3, swap_ratio=0.2):
    """
    批量时间序列块随机交换增强方法
    Args:
        x (Tensor): 输入时间序列，形状 (batch_size, T)
        block_size (int): 每块的时间步数，默认为3
        swap_ratio (float): 交换块的比例（0~1），默认为0.2
    Returns:
        Tensor: 增强后的时间序列，形状 (batch_size, T)
    """
    batch_size, T = x.shape

    # 1. 填充到块大小的整数倍
    remainder = T % block_size
    if remainder != 0:
        padding = block_size - remainder
        x_padded = torch.nn.functional.pad(x, (0, padding))
    else:
        x_padded = x

    # 2. 分块 (batch_size, num_blocks, block_size)
    num_blocks = x_padded.size(1) // block_size
    blocks = x_padded.view(batch_size, num_blocks, block_size)
    print("patchified", blocks)
    # 3. 为每个样本生成独立的随机排列和掩码
    # 生成随机排列索引 (batch_size, num_blocks)
    perm = torch.argsort(torch.rand(batch_size, num_blocks, device=x.device), dim=-1)

    # 生成交换掩码 (batch_size, num_blocks)
    mask = torch.rand(batch_size, num_blocks, device=x.device) < swap_ratio
    print(mask, perm)
    # 原始块索引 (0, 1, ..., num_blocks-1)
    original_indices = torch.arange(num_blocks, device=x.device).expand(batch_size, -1)

    # 4. 计算新索引
    new_indices = torch.where(mask, perm, original_indices)

    # 5. 使用高级索引重组块
    batch_idx = torch.arange(batch_size, device=x.device)[:, None]
    print("batch_idx", batch_idx)
    shuffled_blocks = blocks[batch_idx, new_indices]

    # 6. 展平并截断填充部分
    shuffled_x = shuffled_blocks.reshape(batch_size, -1)
    if remainder != 0:
        shuffled_x = shuffled_x[:, :T]

    return shuffled_x
x = torch.tensor([[i for i in range(12)], [i for i in range(12)]], dtype=torch.float32)
print("原始序列:\n", x)

# 应用增强（交换比例0.5）
augmented = batch_block_shuffle_augmentation(x, swap_ratio=0.25)
print("增强后序列:\n", augmented)


原始序列:
 tensor([[ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.],
        [ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.]])
patchified tensor([[[ 0.,  1.,  2.],
         [ 3.,  4.,  5.],
         [ 6.,  7.,  8.],
         [ 9., 10., 11.]],

        [[ 0.,  1.,  2.],
         [ 3.,  4.,  5.],
         [ 6.,  7.,  8.],
         [ 9., 10., 11.]]])
tensor([[False,  True, False, False],
        [False, False, False, False]]) tensor([[3, 2, 0, 1],
        [2, 1, 0, 3]])
batch_idx tensor([[0],
        [1]])
增强后序列:
 tensor([[ 0.,  1.,  2.,  6.,  7.,  8.,  6.,  7.,  8.,  9., 10., 11.],
        [ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.]])


In [25]:
def process_sequence(seq: torch.Tensor, block_size=2, p=0.5):
    """
    :param seq: 输入的序列，形如(AnyShape, T,)
    """
    # note 先创建新的输出的位置，并且进行分块
    original_shape = seq.shape
    length = original_shape[-1]
    count_blocks = length // block_size
    blocks = seq.reshape(-1, count_blocks, block_size)
    print(blocks.shape)
    bs = blocks.shape[0]
    # (bs(B, N), count_blocks, block_size)
    new_blocks = blocks.clone()
    # 选择并打乱块
    for i in range(bs):
        selected_mask = torch.rand(count_blocks) < p
        selected_indices = torch.where(selected_mask)[0]
        shuffled_indices = selected_indices[torch.randperm(len(selected_indices))]
        new_blocks[i][selected_indices] = blocks[i][shuffled_indices]

    # 合并
    return new_blocks.reshape(original_shape)  # 去除填充部分（如有）


seq = torch.Tensor([[i for i in range(12)], [i for i in range(12)]])
print(seq)
preprocessed_seq = process_sequence(seq)
print(preprocessed_seq)

tensor([[ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.],
        [ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9., 10., 11.]])
torch.Size([2, 6, 2])
tensor([[ 0.,  1.,  4.,  5.,  6.,  7.,  2.,  3.,  8.,  9., 10., 11.],
        [ 8.,  9.,  4.,  5.,  2.,  3.,  0.,  1.,  6.,  7., 10., 11.]])


In [21]:
import numpy as np
def permutation(x, max_segments=5, seg_mode="random"):
    """
    :param x 形如(batch_size, N, T)
    """
    orig_steps = np.arange(x.shape[2])
    x = x.cpu().numpy()
    num_segs = np.random.randint(1, max_segments, size=(x.shape[0]))
    print(num_segs)
    ret = np.zeros_like(x)
    for i, pat in enumerate(x):
        if num_segs[i] > 1:
            if seg_mode == "random":
                split_points = np.random.choice(x.shape[2] - 2, num_segs[i] - 1, replace=False)
                split_points.sort()
                splits = np.split(orig_steps, split_points)
            else:
                splits = np.array_split(orig_steps, num_segs[i])
            warp = np.concatenate(np.random.permutation(splits)).ravel()
            ret[i] = pat[0, warp]
        else:
            ret[i] = pat
    return torch.from_numpy(ret)
ts = torch.Tensor([[[i for i in range(12)]]])

print(permutation(ts))ss

[3]


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (3,) + inhomogeneous part.

In [30]:
def process_sequence(seq: torch.Tensor, block_size=2, p=0.5):
    """
    :param seq: 输入的序列，形如(B, T, N)
    """
    # note 先创建新的输出的位置，并且进行分块
    original_shape = seq.shape
    bs = seq.shape[0]
    seq_length = original_shape[-2]
    count_nodes = seq.shape[-1]

    count_blocks = seq_length // block_size
    blocks = seq.reshape(-1, count_blocks, block_size, count_nodes)
    # (bs(B, N), count_blocks, block_size)
    new_blocks = blocks.clone()
    # 选择并打乱块
    for i in range(bs):
        selected_mask = torch.rand(count_blocks) < p
        selected_indices = torch.where(selected_mask)[0]
        shuffled_indices = selected_indices[torch.randperm(len(selected_indices))]
        new_blocks[i][selected_indices] = blocks[i][shuffled_indices]
    # 合并
    return new_blocks.reshape(original_shape)  # 去除填充部分（如有）
b = 2
n = 4
t = 12
input_series = torch.Tensor([[[i for i in range(12)]for __ in range(n)]for _ in range(b)])
input_series = torch.transpose(input_series, 1, 2)
print(input_series, input_series.shape)
print(process_sequence(input_series))

tensor([[[ 0.,  0.,  0.,  0.],
         [ 1.,  1.,  1.,  1.],
         [ 2.,  2.,  2.,  2.],
         [ 3.,  3.,  3.,  3.],
         [ 4.,  4.,  4.,  4.],
         [ 5.,  5.,  5.,  5.],
         [ 6.,  6.,  6.,  6.],
         [ 7.,  7.,  7.,  7.],
         [ 8.,  8.,  8.,  8.],
         [ 9.,  9.,  9.,  9.],
         [10., 10., 10., 10.],
         [11., 11., 11., 11.]],

        [[ 0.,  0.,  0.,  0.],
         [ 1.,  1.,  1.,  1.],
         [ 2.,  2.,  2.,  2.],
         [ 3.,  3.,  3.,  3.],
         [ 4.,  4.,  4.,  4.],
         [ 5.,  5.,  5.,  5.],
         [ 6.,  6.,  6.,  6.],
         [ 7.,  7.,  7.,  7.],
         [ 8.,  8.,  8.,  8.],
         [ 9.,  9.,  9.,  9.],
         [10., 10., 10., 10.],
         [11., 11., 11., 11.]]]) torch.Size([2, 12, 4])
tensor([[[ 8.,  8.,  8.,  8.],
         [ 9.,  9.,  9.,  9.],
         [ 2.,  2.,  2.,  2.],
         [ 3.,  3.,  3.,  3.],
         [ 4.,  4.,  4.,  4.],
         [ 5.,  5.,  5.,  5.],
         [ 6.,  6.,  6.,  6.],
         [ 7