In [1]:
import numpy as np
import os  # 导入 os 模块来处理路径

# 1. 加载 .npz 文件
file_path = '/home/xu/code/Beyondmimic/artifacts/walk1_subject1:v0/motion.npz'
data = np.load(file_path)

# 获取文件所在目录
output_dir = os.path.dirname(file_path)  # 提取目录部分
print(f"Output directory: {output_dir}")
# 查看文件中包含哪些数组
# print("Available arrays:", list(data.keys()))

Output directory: /home/xu/code/Beyondmimic/artifacts/walk1_subject1:v0


In [2]:
# 2. 获取帧率 fps 并转换为 Python 标量
fps = data['fps'].item()  # 使用 .item() 提取标量
dt = 1.0 / fps
print(f"FPS: {fps}, Time step (dt): {dt:.4f} seconds")

# 3. 定义要截取的时间范围 (秒)
# time_ranges = [(0.0, 20.7)]
time_ranges = [(0.0, 12.0)]

FPS: 50, Time step (dt): 0.0200 seconds


In [44]:

# 用于存储所有片段，以便后续拼接
segments = []

# 4. 为每个时间范围截取数据
for i, (start_time, end_time) in enumerate(time_ranges):
    print(f"\n--- Processing segment {i+1}: [{start_time}, {end_time}] seconds ---")
    
    # 将时间转换为索引
    start_idx = int(round(start_time / dt))
    end_idx = int(round(end_time / dt)) + 1  # +1 to include the end frame

    # 检查索引是否在有效范围内
    total_frames = data['joint_pos'].shape[0]
    if start_idx >= total_frames:
        print(f"Start time {start_time}s is beyond the data length. Skipping.")
        continue
    end_idx = min(end_idx, total_frames)
    
    print(f"Index range: [{start_idx}, {end_idx-1}] (inclusive), Number of frames: {end_idx - start_idx}")
    
    # 截取所有数组
    segment_data = {}
    for key in data.keys():
        if key == 'fps':
            segment_data[key] = data[key]  # 保持为数组
        else:
            segment_data[key] = data[key][start_idx:end_idx]
    
    # 保存独立片段
    base_name = os.path.splitext(os.path.basename(file_path))[0]  # 'motion'
    output_filename = f"{base_name}_segment_new_{i+1}.npz"
    full_output_path = os.path.join(output_dir, output_filename)
    np.savez(full_output_path, **segment_data)
    print(f"Segment saved to '{full_output_path}'")
    
    # 将当前片段数据存入列表，用于后续拼接
    segments.append(segment_data)

# 关闭原始文件
data.close()

print("\nProcessing complete.")


--- Processing segment 1: [0.0, 8.85] seconds ---
Index range: [0, 442] (inclusive), Number of frames: 443
Segment saved to '/home/xu/code/Beyondmimic/artifacts/walk1_subject1:v0/motion_segment_new_1.npz'

Processing complete.


In [3]:
# 4. 定义“冻结”时间点
freeze_time = time_ranges[0][1]  # 与 time_ranges[0][1] 一致
total_frames_original = data['joint_pos'].shape[0]
total_time_original = total_frames_original * dt
print(f"Original motion duration: {total_time_original:.2f} seconds")

# 将冻结时间转换为索引
freeze_idx = int(round(freeze_time / dt))
if freeze_idx >= total_frames_original:
    print(f"Warning: freeze_time {freeze_time}s exceeds data length {total_time_original}s. Using last frame.")
    freeze_idx = total_frames_original - 1

print(f"Freeze at time {freeze_time}s (index {freeze_idx})")

# 5. 创建新数据：前10秒用原始数据，之后冻结在10s的姿态
modified_data = {}
for key in data.keys():
    if key == 'fps':
        # fps 保持不变
        modified_data[key] = data[key]
    else:
        original_array = data[key]  # 形状: (T_orig, ...)，T_orig 是原始总帧数
        # 创建一个与原始数组形状相同的新数组
        new_array = np.zeros_like(original_array)
        
        # 第一部分：[0, freeze_idx] 使用原始数据（即 0-10s）
        new_array[:freeze_idx + 1] = original_array[:freeze_idx + 1]
        
        # 第二部分：[freeze_idx + 1, end] 全部复制 freeze_idx 时刻的那一帧
        # 使用 [freeze_idx:freeze_idx+1] 来保持维度一致，便于广播复制
        if freeze_idx + 1 < total_frames_original:
            new_array[freeze_idx + 1:] = original_array[freeze_idx:freeze_idx + 1]
        
        modified_data[key] = new_array

# 6. 保存修改后的数据到原目录
base_name = os.path.splitext(os.path.basename(file_path))[0]  # 例如 'motion'
output_filename = f"{base_name}_frozen_2.npz"
full_output_path = os.path.join(output_dir, output_filename)

np.savez(full_output_path, **modified_data)
print(f"Modified motion saved to '{full_output_path}'")
print(f"Shape of 'joint_pos' in new file: {modified_data['joint_pos'].shape}")

# 7. （可选）验证：检查最后几帧是否完全相同（冻结成功）
print("\n--- Validation: Checking if last few frames are identical (frozen) ---")
joint_pos = modified_data['joint_pos']
last_frame = joint_pos[-1]
frames_to_check = joint_pos[-5:]  # 检查最后5帧

all_same = all(np.allclose(last_frame, frame) for frame in frames_to_check)
print(f"Last 5 frames are identical: {all_same}")

# 8. 关闭原始文件
data.close()

print("\nProcessing complete.")

Original motion duration: 261.30 seconds
Freeze at time 12.0s (index 600)
Modified motion saved to '/home/xu/code/Beyondmimic/artifacts/walk1_subject1:v0/motion_frozen_2.npz'
Shape of 'joint_pos' in new file: (13065, 29)

--- Validation: Checking if last few frames are identical (frozen) ---
Last 5 frames are identical: True

Processing complete.
