# Task05. 自定义时序数据集的预处理与插补

在本节中，我们将以**合成的 eICU 数据集**为例，演示如何将自定义的医疗时间序列数据预处理为 [PyPOTS](https://github.com/WenjieDu/PyPOTS) 框架所需的输入格式，并使用 PyPOTS 进行插补。

## 关于 eICU 数据集

> The eICU Collaborative Research Database is a freely available multi-center database for critical care research.  
> **Reference**:  
> Pollard TJ, Johnson AEW, Raffa JD, Celi LA, Mark RG, and Badawi O. (2018). *The eICU Collaborative Research Database: A multi-center critical care database for research*. Scientific Data. DOI: [10.1038/sdata.2018.178](http://dx.doi.org/10.1038/sdata.2018.178)  
> Available at: [https://www.nature.com/articles/sdata2018178](https://www.nature.com/articles/sdata2018178)

eICU 数据库包含来自多家医院的 ICU 病患监护记录，是医疗时间序列研究的重要开源资源。在本示例中，我们使用经过脱敏和合成的 eICU 数据集，以避免隐私风险，同时保证数据结构与真实医疗数据一致。

## 任务目标

- 预处理表格格式的医疗时序数据为 PyPOTS 可用格式。
- 使用 PyPOTS 进行插补并还原数据。
- 生成可供后续分析或模型训练的数据集。

## 主要步骤

1. **数据加载**  
   加载原始时序数据，包括特征、标签和样本标识。

2. **构建三维张量**  
   - 将不同样本的特征对齐到统一的时间步长度。
   - 构造三维张量 `(n_samples, n_steps, n_features)`。

3. **数据插补**  
   使用 PyPOTS 提供的插补算法对张量中的缺失值进行填充。

4. **还原 DataFrame 结构**  
   将插补后的张量转换回 DataFrame 形式，保留样本 ID、时间步、特征和标签。

5. **结果保存**  
   将插补结果保存为 `.csv` 或 `.npy` 以供后续分析或建模使用。

## 结果说明

执行完以上步骤后，你将得到三个预处理完成的数据集：
- `df_train_imputed`：训练集插补结果
- `df_val_imputed`：验证集插补结果
- `df_test_imputed`：测试集插补结果

## 示例输出检查

通过 `.shape` 查看数据集维度，确认处理无误：


### 1. 自定义时序数据集的预处理与插补

In [19]:
import pypots
import numpy as np
import pandas as pd
import tsdb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from benchpots.utils.logging import logger, print_final_dataset_info
from benchpots.utils.missingness import create_missingness # 生成人工缺失值

# 设置模型的运行设备为cpu, 如果你有gpu设备可以设置为cuda
DEVICE='cuda'

### 1.1 数据加载

In [20]:
df = pd.read_csv('synthetic_eicu.csv')
df.head()

Unnamed: 0,sample_id,timestamp,apacheadmissiondx,ethnicity,gender,GCS Total,Eyes,Motor,Verbal,admissionheight,...,MAP (mmHg),Invasive BP Diastolic,Invasive BP Systolic,O2 Saturation,Respiratory Rate,Temperature (C),glucose,FiO2,pH,label
0,0,0,17.0,394.0,398.0,,,,,182.9,...,80.0,56.0,119.0,99.0,,,,,,0
1,0,1,17.0,394.0,398.0,,,,,182.9,...,79.0,56.0,112.0,98.0,,,,,,0
2,0,2,17.0,394.0,398.0,413.0,,,,182.9,...,75.0,56.0,112.0,98.0,20.0,35.3,,,,0
3,0,3,17.0,394.0,398.0,,,,,182.9,...,79.0,58.0,108.0,97.0,,,,,,0
4,0,4,17.0,394.0,398.0,,,,,182.9,...,76.0,55.0,111.0,91.0,,,,,,0


In [21]:
'''
确保时间步长的一致性：
如果自定义数据的时间序列长度不一，则需要通过用缺失值 (NaN) 填充较短的序列或截断较长的序列来对其进行标准化。
我们来设置一个最大长度，例如，我们有 48 个时间步长，表示每个患者 48 小时的记录（可以根据数据进行调整）。
'''

max_length = 48

def pad_truncate(df):
    if len(df) > max_length:
        # 如果 DataFrame 超过最大长度，则截断
        # 这里我们选择保留前 max_length 行
        # 你也可以选择其他策略，比如保留最后 max_length 行
        return df.iloc[:max_length]
    else:
        # 如果 DataFrame 少于最大长度，则填充
        # 这里我们用 NaN 填充
        # 你也可以选择其他填充值，比如 0 或者均值等
        padding = pd.DataFrame(
            index=range(max_length - len(df)),
            columns=df.columns
        )
        if not padding.empty:
            return pd.concat([df, padding])
        else:
            return df

# 对每个患者的时间序列进行填充或截断
# 这里假设 'sample_id' 是患者的唯一标识符
# 你需要根据你的数据集中的实际列名进行调整
new_df = df.groupby('sample_id').apply(pad_truncate).reset_index(drop=True)

  new_df = df.groupby('sample_id').apply(pad_truncate).reset_index(drop=True)


### 1.2 数据拆分

In [None]:
unique_sample_ids = new_df['sample_id'].unique()

train_ids, temp_ids = train_test_split(unique_sample_ids, test_size=0.2, random_state=42)
val_ids, test_ids = train_test_split(temp_ids, test_size=0.5, random_state=42)

train_df = new_df[new_df['sample_id'].isin(train_ids)]
val_df = new_df[new_df['sample_id'].isin(val_ids)]
test_df = new_df[new_df['sample_id'].isin(test_ids)]

print(f"Train DataFrame shape: {train_df.shape}")
print(f"Validation DataFrame shape: {val_df.shape}")
print(f"Test DataFrame shape: {test_df.shape}")

# 拆分特征和标签
def separate_features_labels(df, feature_cols, label_col='label'):
    X = df[feature_cols].values.reshape(-1, 48, len(feature_cols))
    # 获取唯一的样本 ID
    unique_ids = df['sample_id'].unique()
    # 获取每个样本 ID 的第一个标签
    y = df.groupby('sample_id')[label_col].first().loc[unique_ids].values
    return X, y

# 选择特征列
feature_columns = [col for col in df.columns if col not in ['sample_id', 'label', 'timestamp']]

train_X, train_y = separate_features_labels(train_df.copy(), feature_columns)
val_X, val_y = separate_features_labels(val_df.copy(), feature_columns)
test_X, test_y = separate_features_labels(test_df.copy(), feature_columns)

print(f"Train features shape: {train_X.shape}, Train labels shape: {train_y.shape}")
print(f"Validation features shape: {val_X.shape}, Validation labels shape: {val_y.shape}")
print(f"Test features shape: {test_X.shape}, Test labels shape: {test_y.shape}")

Train DataFrame shape: (235584, 23)
Validation DataFrame shape: (29472, 23)
Test DataFrame shape: (29472, 23)
Train features shape: (4908, 48, 20), Train labels shape: (4908,)
Validation features shape: (614, 48, 20), Validation labels shape: (614,)
Test features shape: (614, 48, 20), Test labels shape: (614,)


### 1.3 数据标准化

In [23]:
scaler = StandardScaler()
# Flatten the data before scaling and then reshape it into time series samples
train_X = scaler.fit_transform(train_X.reshape(-1, train_X.shape[-1])).reshape(train_X.shape)
val_X = scaler.transform(val_X.reshape(-1, val_X.shape[-1])).reshape(val_X.shape)
test_X = scaler.transform(test_X.reshape(-1, test_X.shape[-1])).reshape(test_X.shape)

In [24]:
processed_dataset = {
        # general info
        "n_classes": len(np.unique(train_y)),
        "n_steps": train_X.shape[-2],
        "n_features": train_X.shape[-1],
        "scaler": scaler,
        # train set
        "train_X": train_X,
        "train_y": train_y.flatten(),
        # val set
        "val_X": val_X,
        "val_y": val_y.flatten(),
        # test set
        "test_X": test_X,
        "test_y": test_y.flatten(),
    }

### 1.4 创建人工缺失值

In [25]:
# 保留原始数据中的ground truth以用于评估
train_X_ori = train_X
val_X_ori = val_X
test_X_ori = test_X

rate = 0.1 # 10%缺失率

# 在训练集上创建缺失值作为ground truth
train_X = create_missingness(train_X, rate, 'point')

# 在验证集上创建缺失值作为ground truth
val_X = create_missingness(val_X, rate, 'point' )

# 在测试集上创建缺失值作为ground truth
test_X = create_missingness(test_X, rate, 'point' )


processed_dataset["train_X"] = train_X
processed_dataset["val_X"] = val_X
processed_dataset["test_X"] = test_X

processed_dataset['train_X_ori'] = train_X_ori
processed_dataset['val_X_ori'] = val_X_ori
processed_dataset['test_X_ori'] = test_X_ori

### 1.5 准备用于插补的数据

In [26]:
# 计算掩码来指示X_ori数据中的真实位置，将被用来评估模型性能

train_X_indicating_mask = np.isnan(train_X_ori) ^ np.isnan(train_X)
val_X_indicating_mask = np.isnan(val_X_ori) ^ np.isnan(val_X)
test_X_indicating_mask = np.isnan(test_X_ori) ^ np.isnan(test_X)

# 组装训练集
dataset_for_training = {
    "X": processed_dataset['train_X'],
    'X_ori': processed_dataset['train_X_ori'],
}

# 组装验证集
dataset_for_validating = {
    "X": processed_dataset['val_X'],
    "X_ori": processed_dataset['val_X_ori'],
}

# 组装测试集
dataset_for_testing = {
    "X": processed_dataset['test_X'],
    "X_ori": processed_dataset['test_X_ori'],
  }

test_X_indicating_mask = np.isnan(processed_dataset['test_X_ori']) ^ np.isnan(processed_dataset['test_X'])

# 度量函数不接受 NaN 输入，因此用 0 填充 NaN
test_X_ori = np.nan_to_num(processed_dataset['test_X_ori'])

# 2. 使用SAITS对自定义数据集中的缺失值进行插补

### 2.1 插补数据

In [27]:
from pypots.nn.functional import calc_mae
from pypots.optim import Adam
from pypots.imputation import SAITS

# 创建 SAITS 模型
# SAITS 模型的参数可以根据需要进行调整
saits = SAITS(
    n_steps=processed_dataset['n_steps'],
    n_features=processed_dataset['n_features'],
    n_layers=1,
    d_model=256,
    d_ffn=128,
    n_heads=4,
    d_k=64,
    d_v=64,
    dropout=0.1,
    ORT_weight=1,  # you can adjust the weight values of arguments ORT_weight
    # and MIT_weight to make the SAITS model focus more on one task. Usually you can just leave them to the default values, i.e. 1.
    MIT_weight=1,
    batch_size=32,
    # here we set epochs=10 for a quick demo, you can set it to 100 or more for better performance
    epochs=10,
    # here we set patience=3 to early stop the training if the evaluting loss doesn't decrease for 3 epoches.
    # You can leave it to defualt as None to disable early stopping.
    patience=3,
    # give the optimizer. Different from torch.optim.Optimizer, you don't have to specify model's parameters when
    # initializing pypots.optim.Optimizer. You can also leave it to default. It will initilize an Adam optimizer with lr=0.001.
    optimizer=Adam(lr=1e-3),
    # this num_workers argument is for torch.utils.data.Dataloader. It's the number of subprocesses to use for data loading.
    # Leaving it to default as 0 means data loading will be in the main process, i.e. there won't be subprocesses.
    # You can increase it to >1 if you think your dataloading is a bottleneck to your model training speed
    num_workers=0,
    # just leave it to default as None, PyPOTS will automatically assign the best device for you.
    # Set it as 'cpu' if you don't have CUDA devices. You can also set it to 'cuda:0' or 'cuda:1' if you have multiple CUDA devices, even parallelly on ['cuda:0', 'cuda:1']
    device=DEVICE,
    # set the path for saving tensorboard and trained model files
    saving_path="tutorial_results/imputation/saits",
    # only save the best model after training finished.
    # You can also set it as "better" to save models performing better ever during training.
    model_saving_strategy="best",
)

# 训练阶段，使用训练集和验证集
saits.fit(train_set=dataset_for_training, val_set=dataset_for_validating)

# 测试阶段，插补缺失值
test_set_imputation = saits.impute(dataset_for_testing)

# calculate mean absolute error on the ground truth (artificially-missing values)
testing_mae = calc_mae(
    test_set_imputation,
    test_X_ori,
    test_X_indicating_mask,
)
print(f"Testing mean absolute error: {testing_mae:.4f}")


2025-05-10 07:49:23 [INFO]: Using the given device: cuda
2025-05-10 07:49:23 [INFO]: Model files will be saved to tutorial_results/imputation/saits/20250510_T074923
2025-05-10 07:49:23 [INFO]: Tensorboard file will be saved to tutorial_results/imputation/saits/20250510_T074923/tensorboard
2025-05-10 07:49:23 [INFO]: Using customized MAE as the training loss function.
2025-05-10 07:49:23 [INFO]: Using customized MSE as the validation metric function.
2025-05-10 07:49:23 [INFO]: SAITS initialized with the given hyperparameters, the number of trainable parameters: 691,248
2025-05-10 07:49:25 [INFO]: Epoch 001 - training loss (MAE): 0.7539, validation MSE: 0.2271
2025-05-10 07:49:28 [INFO]: Epoch 002 - training loss (MAE): 0.4819, validation MSE: 0.2160
2025-05-10 07:49:31 [INFO]: Epoch 003 - training loss (MAE): 0.4396, validation MSE: 0.1971
2025-05-10 07:49:33 [INFO]: Epoch 004 - training loss (MAE): 0.4118, validation MSE: 0.1998
2025-05-10 07:49:36 [INFO]: Epoch 005 - training loss (M

Testing mean absolute error: 0.2046


In [28]:
# 插补训练集和验证集
train_set_imputation = saits.impute(dataset_for_training)
val_set_imputation = saits.impute(dataset_for_validating)

### 2.2 将 3D NumPy 数组还原回原始的DataFrame

In [29]:
def convert_to_dataframe(X, labels, sample_ids, scaler, invers_norm = False, n_steps=48):
    """
    Convert 3D NumPy array to a DataFrame with sample_id, timestamp, and original scale features.

    Parameters:
    - X: 3D NumPy array of shape (n_samples, n_steps, n_features)
    - labels: 1D NumPy array of shape (n_samples,) -> labels for each sample
    - sample_ids: 1D NumPy array with sample IDs corresponding to each sample
    - scaler: Scaler used for normalization (MinMaxScaler/StandardScaler)
    - n_steps: Number of time steps (default: 48)

    Returns:
    - DataFrame with sample_id, timestamp, features, and labels
    """
    n_samples, _, n_features = X.shape

    assert len(feature_columns) == n_features, "Number of features in X does not match feature_columns"
    assert len(labels) == n_samples, "Number of labels does not match number of samples"
    assert len(sample_ids) == n_samples, "Number of sample IDs does not match number of samples"

    # extract the last timestep record for each sample_id  to get one row per sample,
    # using the final timestep’s data (e.g., the last hour if n_steps=48 represents hourly data)

    X_last = X[:, -1, :]  # Shape: (n_samples, n_features)

    # Inverse normalization
    if invers_norm:
      X_original = scaler.inverse_transform(X_last)
    else:
      X_original = X_last


    # Create DataFrame
    df = pd.DataFrame(X_original, columns=feature_columns)
    df['sample_id'] = sample_ids
    df['timestamp'] = n_steps - 1  # Last timestep (e.g., 47 if 0-indexed)
    df['label'] = labels

    # Reorder columns: sample_id, timestamp, features, label
    df = df[['sample_id', 'timestamp'] + feature_columns + ['label']]

    return df

In [30]:
df_train_imputed = convert_to_dataframe(train_set_imputation, train_y, train_ids, scaler)
df_val_imputed = convert_to_dataframe(val_set_imputation, val_y, val_ids, scaler)
df_test_imputed = convert_to_dataframe(test_set_imputation, test_y, test_ids, scaler)

# 检查数据集的形状
print(df_train_imputed.shape, df_val_imputed.shape, df_test_imputed.shape)

(4908, 23) (614, 23) (614, 23)


In [31]:
df_train_imputed.head()

Unnamed: 0,sample_id,timestamp,apacheadmissiondx,ethnicity,gender,GCS Total,Eyes,Motor,Verbal,admissionheight,...,MAP (mmHg),Invasive BP Diastolic,Invasive BP Systolic,O2 Saturation,Respiratory Rate,Temperature (C),glucose,FiO2,pH,label
0,3098,47,-0.676732,0.3022,0.918308,0.795591,0.674137,0.524492,0.771686,1.165612,...,0.318641,-0.023323,0.19702,-0.199657,1.652006,-0.366409,-0.071124,-0.048971,0.201896,0
1,4221,47,-0.516926,0.3022,-1.088959,0.365205,0.557349,0.474666,0.538797,-1.677569,...,-0.412638,-0.616842,-0.466705,-0.407183,-0.905164,-0.806545,-0.528615,-0.226574,0.09355,0
2,3154,47,-0.490291,0.3022,-1.088959,0.748909,0.690633,0.461668,0.76076,-0.16621,...,-0.678885,-1.012521,-0.289712,0.09749,0.85289,0.024736,-0.034354,-0.084154,0.227154,0
3,4041,47,-0.730001,0.3022,-1.088959,0.557269,0.479801,0.309801,0.727504,-1.462887,...,0.465096,0.312368,0.427536,0.09749,3.569884,-0.524262,3.389644,0.225751,0.242584,1
4,2664,47,-0.78327,0.3022,-1.088959,0.723559,0.775377,0.465457,0.695992,-1.248206,...,-0.70226,-0.682788,-0.732195,-0.793952,1.172537,0.051975,-0.165092,-0.009692,0.099024,0


In [33]:
df_train_imputed.to_csv('train_imputed.csv', index=False)
df_val_imputed.to_csv('val_imputed.csv', index=False)
df_test_imputed.to_csv('test_imputed.csv', index=False)