In [1]:
import numpy as np

In [3]:
def load_and_concatenate_data(subjects, data_dir="../preprocess", y_file="y.npy",
                              return_subject_labels=False, max_abs_value=1000):
    """
    Load and concatenate EEG data from multiple subjects.

    For each subject, ``{data_dir}/{name}/X.npy`` is loaded and every sample
    whose largest absolute value is not strictly below ``max_abs_value`` is
    dropped (samples containing NaN are dropped as well, because the
    comparison is then False). Labels are read from ``{data_dir}/{y_file}``,
    i.e. ONE file shared by all subjects, and filtered with each subject's
    mask before concatenation.

    Parameters:
        subjects (list of str): List of subject names (e.g., ['lzy', 'lyf']).
        data_dir (str): Path to the preprocessing directory.
        y_file (str): Filename for label data, relative to ``data_dir``.
        return_subject_labels (bool): Whether to return subject label array.
        max_abs_value (float): Amplitude threshold; samples with
            ``max(|X|) >= max_abs_value`` are treated as invalid.

    Returns:
        X_all (ndarray): Concatenated EEG data.
        y_all (ndarray): Concatenated labels.
        subject_labels (ndarray, optional): Array of subject names per sample.
            Only returned when ``return_subject_labels`` is True.

    Raises:
        ValueError: If ``subjects`` yields no subject at all.
        IndexError: If the label array length differs from a subject's number
            of samples (raised by the boolean-mask indexing).
    """
    X_list = []
    y_list = []
    subject_list = []

    # The label file does not depend on the subject, so it is read only once
    # (lazily, so the first error for a missing subject is still about X.npy).
    labels = None

    for name in subjects:
        X = np.load(f"{data_dir}/{name}/X.npy")
        if labels is None:
            labels = np.load(f"{data_dir}/{y_file}")

        # Filter out invalid samples. The reduction over axes (1, 2) implies
        # X is 3-D with samples along axis 0.
        valid_mask = np.max(np.abs(X), axis=(1, 2)) < max_abs_value
        X = X[valid_mask]

        X_list.append(X)
        y_list.append(labels[valid_mask])
        subject_list.append(np.full(len(X), name))

    if not X_list:
        raise ValueError("subjects is empty: there is nothing to load")

    X_all = np.concatenate(X_list, axis=0)
    y_all = np.concatenate(y_list, axis=0)
    if return_subject_labels:
        return X_all, y_all, np.concatenate(subject_list, axis=0)
    return X_all, y_all

In [4]:
# Load the filtered recordings of both subjects and report the array shapes.
X, y = load_and_concatenate_data(subjects=["lzy", "lyf"])
print(X.shape, y.shape)

(8962, 8, 1000) (8962,)


In [5]:
# Split the concatenated samples (X, y) into train / val / test, group by group:
# each run of `group_size` consecutive samples gives its last `test_size`
# samples to the test set, the `val_size` samples before those to the
# validation set, and everything earlier to the training set.
X_train, y_train = [], []
X_val, y_val = [], []
X_test, y_test = [], []

group_size = 200
val_size = 20
test_size = 20
# Number of samples every group must provide for validation + test.
holdout_size = val_size + test_size

for i in range(0, len(X), group_size):
    end = min(i + group_size, len(X))
    group_X = X[i:end]
    group_y = y[i:end]

    # Only the final group can be shorter than `group_size`; skip it when it
    # cannot fill both held-out splits.
    if len(group_X) < holdout_size:
        continue

    # Positive offsets are used instead of negative slices so that a size of 0
    # produces an empty split rather than selecting the whole group.
    train_end = len(group_X) - holdout_size
    val_end = len(group_X) - test_size

    X_train.append(group_X[:train_end])
    y_train.append(group_y[:train_end])

    X_val.append(group_X[train_end:val_end])
    y_val.append(group_y[train_end:val_end])

    X_test.append(group_X[val_end:])
    y_test.append(group_y[val_end:])

# Merge the per-group pieces into one array per split.
X_train = np.concatenate(X_train, axis=0)
y_train = np.concatenate(y_train, axis=0)
X_val = np.concatenate(X_val, axis=0)
y_val = np.concatenate(y_val, axis=0)
X_test = np.concatenate(X_test, axis=0)
y_test = np.concatenate(y_test, axis=0)

splits = (
    ("train", X_train, y_train),
    ("val", X_val, y_val),
    ("test", X_test, y_test),
)

# Save the results to the current working directory.
for split_name, split_X, split_y in splits:
    np.save(f"X_{split_name}.npy", split_X)
    np.save(f"y_{split_name}.npy", split_y)

for split_name, split_X, split_y in splits:
    print(f"{split_name}: {split_X.shape}, {split_y.shape}")

train: (7162, 8, 1000), (7162,)
val: (900, 8, 1000), (900,)
test: (900, 8, 1000), (900,)
