# 加载数据

In [1]:
import pandas as pd
import os

# Folder that holds the raw input tables.
raw_data_path = "./RawData"

# Load the three CSV tables; the unpacking order matches the file-name order.
gait_para, spec_stats, tempo_stats = (
    pd.read_csv(os.path.join(raw_data_path, csv_name))
    for csv_name in (
        "gait-parameters.csv",
        "spectral_stats.csv",
        "temporal_stats.csv",
    )
)

# The PD disease characteristics are stored in an Excel workbook.
PD_characteristics = pd.read_excel(
    os.path.join(raw_data_path, "PD_disease_characteristics.xlsx")
)

# 合并有相同 ID 号的数据

## gait_para 步态参数数据

对特征后缀的解释
- ConOg 代表 condition=Overground
- ConTm 代表 condition=Treadmill

In [2]:
# Split the long-format gait table by walking condition.
gait_overground = gait_para[gait_para["condition"] == "Overground"].copy()
gait_treadmill = gait_para[gait_para["condition"] == "Treadmill"].copy()

# The condition is encoded in the column suffix from here on.
gait_overground = gait_overground.drop("condition", axis=1)
gait_treadmill = gait_treadmill.drop("condition", axis=1)

# Every column except the ID/Group identifiers receives a condition suffix.
columns_to_rename = [
    col for col in gait_overground.columns if col not in ["ID", "Group"]
]

# Overground measurements get the ConOg suffix.
overground_rename_dict = {col: f"{col}_ConOg" for col in columns_to_rename}
gait_overground = gait_overground.rename(columns=overground_rename_dict)

# Treadmill measurements get the ConTm suffix.
treadmill_rename_dict = {col: f"{col}_ConTm" for col in columns_to_rename}
gait_treadmill = gait_treadmill.rename(columns=treadmill_rename_dict)

# Outer-join the two conditions on ID. Group is carried on both sides so that
# a subject who only has treadmill rows does not end up with a missing Group;
# the treadmill copy is removed again once it has filled the gaps, so the
# resulting columns are the same as before.
gait_para = pd.merge(
    gait_overground,
    gait_treadmill,
    on="ID",
    how="outer",
    suffixes=("", "_treadmill"),
)
gait_para["Group"] = gait_para["Group"].fillna(gait_para.pop("Group_treadmill"))

## spec_stats 频谱统计数据

condition 后缀同上

side 后缀：
- SR 代表 side=right
- SL 代表 side=left

frequency 后缀：
- FT 代表 frequency=theta
- FA 代表 frequency=alpha
- FG 代表 frequency=gamma
- FLB 代表 frequency=low_beta
- FHB 代表 frequency=high_beta

新特征名：
原特征名_condition后缀_side后缀_frequency后缀

In [3]:
# Short codes appended to feature names for each level of the three factors.
condition_suffix = {"Overground": "ConOg", "Treadmill": "ConTm"}
side_suffix = {"right": "SR", "left": "SL"}
frequency_suffix = {
    "theta": "FT",
    "alpha": "FA",
    "gamma": "FG",
    "low_beta": "FLB",
    "high_beta": "FHB",
}

# The factor columns are folded into the feature names; every column other
# than these and the pid/group identifiers is a measurement to be suffixed.
factor_columns = ["condition", "side", "frequency"]
columns_to_rename = [
    col for col in spec_stats.columns if col not in ["pid", "group"] + factor_columns
]

# Build one wide table per (condition, side, frequency) combination that
# actually occurs in the data, visiting combinations in nested-loop order.
spec_stats_list = []
for condition in spec_stats["condition"].unique():
    condition_rows = spec_stats["condition"] == condition
    for side in spec_stats["side"].unique():
        side_rows = spec_stats["side"] == side
        for frequency in spec_stats["frequency"].unique():
            frequency_rows = spec_stats["frequency"] == frequency
            subset = spec_stats[condition_rows & side_rows & frequency_rows]
            if subset.empty:
                continue

            # Suffix codes are looked up only for combinations that have rows.
            suffix = "_" + "_".join(
                (
                    condition_suffix[condition],
                    side_suffix[side],
                    frequency_suffix[frequency],
                )
            )
            spec_stats_list.append(
                subset.drop(columns=factor_columns).rename(
                    columns={col: f"{col}{suffix}" for col in columns_to_rename}
                )
            )

# Outer-join all combinations on pid; only the first table keeps its group
# column, the others contribute measurements only.
spec_stats = spec_stats_list[0]
for other in spec_stats_list[1:]:
    spec_stats = spec_stats.merge(
        other.drop(columns="group", errors="ignore"),
        on="pid",
        how="outer",
    )

## tempo_stats 时间维度数据

condition 和 side 后缀同上

time 后缀：
- TL 代表 time=lift
- TD 代表 time=drop

新特征名：
原特征名_condition后缀_side后缀_time后缀

In [4]:
# Short codes appended to feature names for each level of the three factors.
condition_suffix = {"Overground": "ConOg", "Treadmill": "ConTm"}
side_suffix = {"right": "SR", "left": "SL"}
time_suffix = {
    "lift": "TL",
    "drop": "TD",
}

# The factor columns are folded into the feature names; every column other
# than these and the pid/group identifiers is a measurement to be suffixed.
factor_columns = ["condition", "side", "time"]
columns_to_rename = [
    col for col in tempo_stats.columns if col not in ["pid", "group"] + factor_columns
]

# Build one wide table per (condition, side, time) combination that actually
# occurs in the data, visiting combinations in nested-loop order.
tempo_stats_list = []
for condition in tempo_stats["condition"].unique():
    condition_rows = tempo_stats["condition"] == condition
    for side in tempo_stats["side"].unique():
        side_rows = tempo_stats["side"] == side
        for time in tempo_stats["time"].unique():
            time_rows = tempo_stats["time"] == time
            subset = tempo_stats[condition_rows & side_rows & time_rows]
            if subset.empty:
                continue

            # Suffix codes are looked up only for combinations that have rows.
            suffix = "_" + "_".join(
                (
                    condition_suffix[condition],
                    side_suffix[side],
                    time_suffix[time],
                )
            )
            tempo_stats_list.append(
                subset.drop(columns=factor_columns).rename(
                    columns={col: f"{col}{suffix}" for col in columns_to_rename}
                )
            )

# Outer-join all combinations on pid; only the first table keeps its group
# column, the others contribute measurements only.
tempo_stats = tempo_stats_list[0]
for other in tempo_stats_list[1:]:
    tempo_stats = tempo_stats.merge(
        other.drop(columns="group", errors="ignore"),
        on="pid",
        how="outer",
    )

## 合并所有数据（除帕金森特征外）为 data 表格

In [5]:
# Align the identifier column names with the gait table (ID / Group).
identifier_names = {"pid": "ID", "group": "Group"}
tempo_stats = tempo_stats.rename(columns=identifier_names)
spec_stats = spec_stats.rename(columns=identifier_names)

# Outer-join the spectral table and then the temporal table onto the gait
# table. Group is taken from the gait table only, so it is dropped from each
# right-hand table before merging.
data = gait_para
for feature_table in (spec_stats, tempo_stats):
    data = pd.merge(
        data,
        feature_table.drop(columns="Group", errors="ignore"),
        on="ID",
        how="outer",
    )


# 数据类型转换

In [6]:
# Show every column's dtype together with the overall table shape.
print("=== data 表格列类型 ===")
print(data.dtypes)
print(f"\n数据形状: {data.shape}")

# Summarise the columns per dtype; names are listed only for small groups.
print("\n=== 按数据类型分组的列名 ===")
for dtype in data.dtypes.unique():
    matching_columns = data.select_dtypes(include=[dtype]).columns.tolist()
    n_matching = len(matching_columns)
    print(f"{dtype}: {n_matching} 列")
    if n_matching >= 10:
        continue
    print(f"   {matching_columns}")

=== data 表格列类型 ===
ID                        int64
Group                    object
stridetime_ConOg        float64
steptime_ConOg          float64
stridetimevari_ConOg    float64
                         ...   
emg_env_ConOg_SR_TD     float64
emg_env_ConTm_SL_TL     float64
emg_env_ConTm_SL_TD     float64
emg_env_ConTm_SR_TL     float64
emg_env_ConTm_SR_TD     float64
Length: 188, dtype: object

数据形状: (66, 188)

=== 按数据类型分组的列名 ===
int64: 1 列
   ['ID']
object: 3 列
   ['Group', 'walkingspeed_ConOg', 'walkingspeed_ConTm']
float64: 184 列


In [7]:
# Every object-typed column except the Group label is converted to numeric.
object_cols = data.select_dtypes(include=["object"]).columns.tolist()
cols_to_convert = [name for name in object_cols if name != "Group"]

print(f"需要转换的列: {cols_to_convert}")

# errors="coerce" turns values that cannot be parsed as numbers into NaN.
for name in cols_to_convert:
    converted = pd.to_numeric(data[name], errors="coerce")
    data[name] = converted

# Check the dtype counts after the conversion.
print("\n=== 转换后的数据类型 ===")
print(data.dtypes.value_counts())

# List whatever object-typed columns are left.
remaining_object_cols = data.select_dtypes(include=["object"]).columns.tolist()
print(f"\n剩余的object类型列: {remaining_object_cols}")


需要转换的列: ['walkingspeed_ConOg', 'walkingspeed_ConTm']

=== 转换后的数据类型 ===
float64    186
int64        1
object       1
Name: count, dtype: int64

剩余的object类型列: ['Group']


In [8]:
# Rows of the merged table whose Group label is "Parkinson".
is_parkinson = data["Group"] == "Parkinson"

# Columns contributed by the PD characteristics table (ID is the join key and
# Group is already present on the left side).
pd_feature_columns = [
    col for col in PD_characteristics.columns if col not in ("ID", "Group")
]

# Outer-join the PD characteristics onto the Parkinson rows by ID.
data_PD = data.loc[is_parkinson].merge(
    PD_characteristics[["ID", *pd_feature_columns]],
    on="ID",
    how="outer",
)

# 帕金森评级

根据 H&Y（Hoehn & Yahr）分级对 PD 病人评级：
- H&Y 等于 1：Group 设置为 PD_H&Y1
- H&Y 大于 1 且小于 3：Group 设置为 PD_H&Y2
- H&Y 缺失：Group 设置为 PD_Unknown
- 其他取值（如 H&Y ≥ 3）：Group 设置为 PD_Other

In [9]:
# Inspect the HoehnYahrON column: value counts, dtype and missing entries.
hy_scores = data_PD["HoehnYahrON"]
hy_counts = hy_scores.value_counts().sort_index()
print("HoehnYahrON 值分布:")
print(hy_counts)
print(f"\nHoehnYahrON 列的数据类型: {hy_scores.dtype}")
print(f"缺失值数量: {hy_scores.isnull().sum()}")


# 根据 H&Y 分级对 PD 病人评级
def classify_by_HY(hy_score):
    """Map a Hoehn & Yahr score to a PD group label.

    Returns "PD_Unknown" for a missing score, "PD_H&Y1" for a score equal to
    1, "PD_H&Y2" for a score strictly between 1 and 3, and "PD_Other" for
    every remaining value.
    """
    if pd.isna(hy_score):
        return "PD_Unknown"
    if hy_score == 1:
        return "PD_H&Y1"
    return "PD_H&Y2" if 1 < hy_score < 3 else "PD_Other"


# Replace the Group label of every PD row with its H&Y-based grade.
hy_grades = data_PD["HoehnYahrON"].apply(classify_by_HY)
data_PD["Group"] = hy_grades

# Check the resulting grouping.
print("\n=== H&Y 分级结果 ===")
print(data_PD["Group"].value_counts())
print("\n各组的 HoehnYahrON 值范围:")
for group in data_PD["Group"].unique():
    in_group = data_PD["Group"] == group
    group_scores = data_PD.loc[in_group, "HoehnYahrON"]
    lowest, highest = group_scores.min(), group_scores.max()
    print(f"{group}: {lowest} - {highest}")

HoehnYahrON 值分布:
HoehnYahrON
1.0    15
2.0     4
2.5     1
Name: count, dtype: int64

HoehnYahrON 列的数据类型: float64
缺失值数量: 0

=== H&Y 分级结果 ===
Group
PD_H&Y1    15
PD_H&Y2     5
Name: count, dtype: int64

各组的 HoehnYahrON 值范围:
PD_H&Y1: 1.0 - 1.0
PD_H&Y2: 2.0 - 2.5


对 data 中病人进行同样的分级

In [10]:
# Overwrite the Group label in `data` with the H&Y-based label from `data_PD`
# for every ID that appears in `data_PD`.
pd_group_mapping = dict(zip(data_PD["ID"], data_PD["Group"]))

# Vectorised lookup: IDs found in the mapping get the new label, all other
# rows keep their current Group. This replaces a row-wise DataFrame.apply,
# which builds a Series per row just to read two values.
data["Group"] = data["ID"].map(pd_group_mapping).fillna(data["Group"])

# Check the updated label distribution.
print("=== 更新后的 Group 分布 ===")
print(data["Group"].value_counts())


=== 更新后的 Group 分布 ===
Group
Old        24
Young      22
PD_H&Y1    15
PD_H&Y2     5
Name: count, dtype: int64


# 训练-测试集划分

后续步骤只处理 data 数据集，不再使用 data_PD 数据集。

In [12]:
from sklearn.model_selection import StratifiedShuffleSplit

# Configure a single stratified shuffle split.
splitter = StratifiedShuffleSplit(
    n_splits=1,  # one split is enough
    test_size=0.2,  # hold out 20% of the rows as the test set
    random_state=15,  # fixed seed so the split is reproducible
)

# Features are every column except the identifiers; the label is Group.
X = data.drop(["ID", "Group"], axis=1)
y = data["Group"]

# The splitter yields positional row indices.
train_index, test_index = next(splitter.split(X, y))

# Build the train/test sets from those positions.
X_train = X.iloc[train_index]
X_test = X.iloc[test_index]
y_train = y.iloc[train_index]
y_test = y.iloc[test_index]

# Matching subject IDs. The indices are positions, so they must be applied
# with .iloc; label-based .loc only gives the same rows while `data` happens
# to carry a default 0..n-1 index.
train_ids = data["ID"].iloc[train_index]
test_ids = data["ID"].iloc[test_index]

# Compare the label distribution before and after the split.
print("原始数据标签分布:")
print(y.value_counts())
print("\n训练集标签分布:")
print(y_train.value_counts())
print("\n测试集标签分布:")
print(y_test.value_counts())

原始数据标签分布:
Group
Old        24
Young      22
PD_H&Y1    15
PD_H&Y2     5
Name: count, dtype: int64

训练集标签分布:
Group
Old        19
Young      17
PD_H&Y1    12
PD_H&Y2     4
Name: count, dtype: int64

测试集标签分布:
Group
Young      5
Old        5
PD_H&Y1    3
PD_H&Y2    1
Name: count, dtype: int64


# 数据清洗和缩放

In [13]:
# Per-column missing-value counts for both splits.
train_missing = X_train.isnull().sum()
test_missing = X_test.isnull().sum()

# Overall missing-value summary of the training set.
train_missing_total = train_missing.sum()
print("=== 训练集缺失值情况 ===")
print(f"训练集形状: {X_train.shape}")
print(f"总缺失值数量: {train_missing_total}")
print(f"缺失值比例: {train_missing_total / X_train.size:.4f}")

# Overall missing-value summary of the test set.
test_missing_total = test_missing.sum()
print("\n=== 测试集缺失值情况 ===")
print(f"测试集形状: {X_test.shape}")
print(f"总缺失值数量: {test_missing_total}")
print(f"缺失值比例: {test_missing_total / X_test.size:.4f}")

# Columns that contain at least one missing value.
train_columns_with_missing = train_missing[train_missing > 0]
test_columns_with_missing = test_missing[test_missing > 0]

print("\n=== 有缺失值的列统计 ===")
print(f"训练集有缺失值的列数: {len(train_columns_with_missing)}")
print(f"测试集有缺失值的列数: {len(test_columns_with_missing)}")

# The five columns with the most missing values, with the per-split fraction.
if len(train_columns_with_missing) > 0:
    print("\n训练集缺失值最多的前5列:")
    top_missing_train = train_columns_with_missing.sort_values(ascending=False).head(5)
    n_train_rows = len(X_train)
    for col, count in top_missing_train.items():
        print(f"  {col}: {count} ({count / n_train_rows:.3f})")

if len(test_columns_with_missing) > 0:
    print("\n测试集缺失值最多的前5列:")
    top_missing_test = test_columns_with_missing.sort_values(ascending=False).head(5)
    n_test_rows = len(X_test)
    for col, count in top_missing_test.items():
        print(f"  {col}: {count} ({count / n_test_rows:.3f})")

=== 训练集缺失值情况 ===
训练集形状: (52, 186)
总缺失值数量: 91
缺失值比例: 0.0094

=== 测试集缺失值情况 ===
测试集形状: (14, 186)
总缺失值数量: 85
缺失值比例: 0.0326

=== 有缺失值的列统计 ===
训练集有缺失值的列数: 86
测试集有缺失值的列数: 85

训练集缺失值最多的前5列:
  walkingspeed_ConOg: 5 (0.096)
  walkingspeed_ConTm: 2 (0.038)
  itc_emg_ConTm_SL_FT: 1 (0.019)
  cmc_ConTm_SL_FT: 1 (0.019)
  z_itc_eeg_ConTm_SL_FLB: 1 (0.019)

测试集缺失值最多的前5列:
  walkingspeed_ConOg: 1 (0.071)
  z_itc_eeg_ConTm_SL_FA: 1 (0.071)
  z_itc_eeg_ConTm_SL_FLB: 1 (0.071)
  cmc_ConTm_SL_FLB: 1 (0.071)
  itc_emg_ConTm_SL_FLB: 1 (0.071)


In [14]:
# 简单分析训练集和测试集的离群值情况
def detect_outliers_iqr(data):
    """Count, per column, the values lying outside the 1.5 * IQR fences.

    A value is counted when it is below Q1 - 1.5 * IQR or above
    Q3 + 1.5 * IQR, with Q1/Q3 being the column's 25% and 75% quantiles.
    """
    first_quartile, third_quartile = data.quantile(0.25), data.quantile(0.75)
    whisker = 1.5 * (third_quartile - first_quartile)

    below_fence = data < first_quartile - whisker
    above_fence = data > third_quartile + whisker
    return (below_fence | above_fence).sum()


# Feature-level outlier counts for the training set.
print("=== 训练集离群值分析 ===")
train_outliers = detect_outliers_iqr(X_train)
train_outliers_nonzero = train_outliers[train_outliers > 0]
train_outliers_total = train_outliers.sum()

print(f"训练集形状: {X_train.shape}")
print(f"有离群值的特征数: {len(train_outliers_nonzero)}")
print(f"总离群值数量: {train_outliers_total}")
print(f"离群值比例: {train_outliers_total / X_train.size:.4f}")

if len(train_outliers_nonzero) > 0:
    print("\n离群值最多的前5个特征:")
    top_outliers_train = train_outliers_nonzero.sort_values(ascending=False).head(5)
    n_train_rows = len(X_train)
    for col, count in top_outliers_train.items():
        print(f"  {col}: {count} ({count / n_train_rows:.3f})")

# Feature-level outlier counts for the test set (fences come from the test
# set's own quantiles).
print("\n=== 测试集离群值分析 ===")
test_outliers = detect_outliers_iqr(X_test)
test_outliers_nonzero = test_outliers[test_outliers > 0]
test_outliers_total = test_outliers.sum()

print(f"测试集形状: {X_test.shape}")
print(f"有离群值的特征数: {len(test_outliers_nonzero)}")
print(f"总离群值数量: {test_outliers_total}")
print(f"离群值比例: {test_outliers_total / X_test.size:.4f}")

if len(test_outliers_nonzero) > 0:
    print("\n离群值最多的前5个特征:")
    top_outliers_test = test_outliers_nonzero.sort_values(ascending=False).head(5)
    n_test_rows = len(X_test)
    for col, count in top_outliers_test.items():
        print(f"  {col}: {count} ({count / n_test_rows:.3f})")

# Sample-level view: number of outlying features per row, using the same
# 1.5 * IQR fences computed per split.
print("\n=== 样本层面离群值统计 ===")
train_q1, train_q3 = X_train.quantile(0.25), X_train.quantile(0.75)
train_iqr = train_q3 - train_q1
train_sample_outliers = (
    (X_train < train_q1 - 1.5 * train_iqr) | (X_train > train_q3 + 1.5 * train_iqr)
).sum(axis=1)

test_q1, test_q3 = X_test.quantile(0.25), X_test.quantile(0.75)
test_iqr = test_q3 - test_q1
test_sample_outliers = (
    (X_test < test_q1 - 1.5 * test_iqr) | (X_test > test_q3 + 1.5 * test_iqr)
).sum(axis=1)

print("训练集样本离群值统计:")
print(f"  平均每个样本有 {train_sample_outliers.mean():.2f} 个离群特征")
print(f"  最多离群特征数: {train_sample_outliers.max()}")

print("测试集样本离群值统计:")
print(f"  平均每个样本有 {test_sample_outliers.mean():.2f} 个离群特征")
print(f"  最多离群特征数: {test_sample_outliers.max()}")

=== 训练集离群值分析 ===
训练集形状: (52, 186)
有离群值的特征数: 123
总离群值数量: 315
离群值比例: 0.0326

离群值最多的前5个特征:
  cmc_ConTm_SR_FA: 9 (0.173)
  power_eeg_ConTm_SR_FA: 7 (0.135)
  itc_eeg_ConTm_SL_FG: 6 (0.115)
  itc_eeg_ConTm_SR_FG: 6 (0.115)
  cmc_ConOg_SL_FG: 6 (0.115)

=== 测试集离群值分析 ===
测试集形状: (14, 186)
有离群值的特征数: 75
总离群值数量: 105
离群值比例: 0.0403

离群值最多的前5个特征:
  z_itc_emg_ConOg_SR_FT: 5 (0.357)
  itc_emg_ConOg_SR_FT: 5 (0.357)
  power_eeg_ConTm_SR_FT: 3 (0.214)
  power_eeg_ConOg_SL_FA: 3 (0.214)
  emg_env_ConTm_SR_TD: 2 (0.143)

=== 样本层面离群值统计 ===
训练集样本离群值统计:
  平均每个样本有 6.06 个离群特征
  最多离群特征数: 39
测试集样本离群值统计:
  平均每个样本有 7.50 个离群特征
  最多离群特征数: 21


In [15]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler


# 创建预处理管道
def create_preprocessing_pipeline(numeric_features=None):
    """
    Build the preprocessing transformer: KNN imputation followed by scaling.

    Parameters
    ----------
    numeric_features : sequence of str or None
        Column names the imputation + scaling pipeline is applied to. When
        None (the default), the columns of the module-level ``X_train`` are
        used, which is what calling the function without arguments has
        always done.

    Returns
    -------
    ColumnTransformer
        Unfitted transformer that runs ``KNNImputer(n_neighbors=5)`` and then
        ``StandardScaler`` on the selected columns and passes any other
        column through unchanged.
    """
    # Imputation runs before scaling inside a single pipeline.
    numeric_pipeline = Pipeline(
        [("imputer", KNNImputer(n_neighbors=5)), ("scaler", StandardScaler())]
    )

    # Fall back to the global training frame only when the caller did not
    # say which columns to process.
    if numeric_features is None:
        numeric_features = X_train.columns.tolist()
    else:
        numeric_features = list(numeric_features)

    preprocessor = ColumnTransformer(
        transformers=[("numeric", numeric_pipeline, numeric_features)],
        remainder="passthrough",  # keep any column not listed above
    )

    return preprocessor


# Build the preprocessor, fit it on the training set only, and apply the
# fitted transformer to the test set.
preprocessor = create_preprocessing_pipeline()
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)

# Wrap the transformed arrays back into DataFrames with the original column
# names and row indices.
X_train_clean = pd.DataFrame(
    X_train_processed,
    index=X_train.index,
    columns=X_train.columns,
)
X_test_clean = pd.DataFrame(
    X_test_processed,
    index=X_test.index,
    columns=X_test.columns,
)

# Sanity check: shapes and remaining missing values.
print("=== 预处理结果验证 ===")
print(f"训练集形状: {X_train_clean.shape}")
print(f"测试集形状: {X_test_clean.shape}")
print(f"训练集缺失值: {X_train_clean.isnull().sum().sum()}")
print(f"测试集缺失值: {X_test_clean.isnull().sum().sum()}")

# Sanity check: range of the per-column means and standard deviations.
train_means, train_stds = X_train_clean.mean(), X_train_clean.std()
test_means, test_stds = X_test_clean.mean(), X_test_clean.std()

print("\n=== 标准化效果验证 ===")
print("训练集统计:")
print(f"  均值范围: [{train_means.min():.6f}, {train_means.max():.6f}]")
print(f"  标准差范围: [{train_stds.min():.6f}, {train_stds.max():.6f}]")
print("测试集统计:")
print(f"  均值范围: [{test_means.min():.6f}, {test_means.max():.6f}]")
print(f"  标准差范围: [{test_stds.min():.6f}, {test_stds.max():.6f}]")

=== 预处理结果验证 ===
训练集形状: (52, 186)
测试集形状: (14, 186)
训练集缺失值: 0
测试集缺失值: 0

=== 标准化效果验证 ===
训练集统计:
  均值范围: [-0.000000, 0.000000]
  标准差范围: [1.009756, 1.009756]
测试集统计:
  均值范围: [-0.718106, 1.916185]
  标准差范围: [0.142693, 5.663188]


# 数据降维

先按来源将特征分组（步态、频谱、时域），再对每组分别降维，使得到的主成分具有更清晰的生物学意义。

In [None]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, f_classif


# 分析特征分组情况
def analyze_feature_groups(X_train_clean):
    """Assign every column of ``X_train_clean`` to a feature group.

    Column names are split on "_" and classified by their suffix codes:

    - last code in FT/FA/FG/FLB/FHB -> "spectral_features"
    - last code in TL/TD            -> "temporal_features"
    - otherwise, any ConOg/ConTm code present -> "gait_features"

    Columns matching none of these stay ungrouped. Whole codes are compared
    rather than substrings, so a name that merely contains the letters of a
    code (for example "FAST_ConOg") is not mistaken for a spectral feature.

    Prints a summary of the group sizes and returns the dict mapping each
    group name to its list of column names.
    """

    print("=== 特征分组分析 ===")

    all_features = X_train_clean.columns.tolist()

    spectral_codes = {"FT", "FA", "FG", "FLB", "FHB"}
    temporal_codes = {"TL", "TD"}
    condition_codes = {"ConOg", "ConTm"}

    feature_groups = {
        "gait_features": [],  # gait parameters
        "spectral_features": [],  # spectral statistics
        "temporal_features": [],  # temporal statistics
    }

    for feature in all_features:
        codes = str(feature).split("_")
        last_code = codes[-1]
        if last_code in spectral_codes:
            feature_groups["spectral_features"].append(feature)
        elif last_code in temporal_codes:
            feature_groups["temporal_features"].append(feature)
        elif condition_codes.intersection(codes):
            feature_groups["gait_features"].append(feature)

    # Report the size of every non-empty group.
    print("特征分组统计:")
    total_grouped = 0
    for group_name, features in feature_groups.items():
        if features:
            print(f"  {group_name}: {len(features)} 个特征")
            total_grouped += len(features)

    ungrouped = len(all_features) - total_grouped
    print(f"  未分组特征: {ungrouped} 个")
    print(f"  总特征数: {len(all_features)}")

    return feature_groups


# Run the feature grouping on the cleaned training set.
feature_groups = analyze_feature_groups(X_train_clean)

=== 特征分组分析 ===
特征分组统计:
  gait_features: 18 个特征
  spectral_features: 160 个特征
  temporal_features: 8 个特征
  未分组特征: 0 个
  总特征数: 186


In [None]:
# 实现基于解释方差率的分组降维
def grouped_dimensionality_reduction_by_variance(
    X_train,
    X_test,
    y_train,
    feature_groups,
    target_variance_ratio=0.90,
    min_components_per_group=3,
    max_components_per_group=None,
):
    """
    Reduce each feature group separately: SelectKBest, then PCA.

    For every non-empty group the function keeps the top-scoring features
    (``f_classif`` against ``y_train``), fits a PCA on the training rows, and
    keeps as many components as are needed to reach ``target_variance_ratio``
    of explained variance, subject to the min/max limits. All fitting uses
    the training data only; the test data is only transformed.

    Parameters
    ----------
    X_train, X_test : DataFrame
        Training and test features.
    y_train : Series
        Training labels, used for the univariate feature scores.
    feature_groups : dict
        Maps a group name to the list of column names in that group.
    target_variance_ratio : float
        Cumulative explained-variance ratio to reach (default 0.90).
    min_components_per_group : int
        Minimum number of components kept per group. Applied after the
        maximum, so it wins if the two limits conflict.
    max_components_per_group : int or None
        Maximum number of components kept per group (None = no limit).

    Returns
    -------
    X_train_reduced, X_test_reduced : DataFrame
        Component scores of all groups, concatenated column-wise, with
        columns named ``<group>_PC<i>`` and the input row indices.
    reduction_info : dict
        Per-group details of the selection and PCA steps.

    Raises
    ------
    ValueError
        If every group in ``feature_groups`` is empty.
    """

    print("=== 基于解释方差率的分组降维 ===")
    print(f"目标解释方差率: {target_variance_ratio:.1%}")

    reduced_train_parts = []
    reduced_test_parts = []
    reduction_info = {}

    for group_name, features in feature_groups.items():
        if not features:
            continue

        print(f"\n--- 处理 {group_name} ---")
        print(f"原始特征数: {len(features)}")

        # Columns belonging to this group.
        X_train_group = X_train[features]
        X_test_group = X_test[features]

        # Step 1: univariate feature selection. Keep 80% of the features,
        # but at least 10 and never more than the group contains.
        n_select = min(max(int(len(features) * 0.8), 10), len(features))

        selector = SelectKBest(score_func=f_classif, k=n_select)
        X_train_selected = selector.fit_transform(X_train_group, y_train)
        X_test_selected = selector.transform(X_test_group)

        selected_features = np.array(features)[selector.get_support()]
        print(f"特征选择后: {len(selected_features)} 个特征")

        # Step 2: decide how many components to keep. Fit a PCA with the
        # largest usable number of components to obtain the variance profile.
        max_possible_components = min(len(selected_features), len(X_train) - 1)

        pca_full = PCA(n_components=max_possible_components)
        pca_full.fit(X_train_selected)

        cumulative_variance = np.cumsum(pca_full.explained_variance_ratio_)

        # First position at which the target is reached. np.argmax returns 0
        # for an all-False mask, which would silently select one component,
        # so the "never reached" case keeps every available component.
        reached_target = cumulative_variance >= target_variance_ratio
        if reached_target.any():
            n_components = int(np.argmax(reached_target)) + 1
        else:
            n_components = max_possible_components

        # Apply the limits: maximum, then minimum, then the hard upper bound.
        if max_components_per_group is not None:
            n_components = min(n_components, max_components_per_group)
        n_components = max(n_components, min_components_per_group)
        n_components = min(n_components, max_possible_components)

        # Variance actually explained by the retained components.
        actual_variance_ratio = cumulative_variance[n_components - 1]

        print(f"目标方差解释率: {target_variance_ratio:.1%}")
        print(f"实际方差解释率: {actual_variance_ratio:.1%}")
        print(f"选择的主成分数: {n_components}")

        # Step 3: refit the PCA with the chosen number of components.
        pca_final = PCA(n_components=n_components)
        X_train_pca = pca_final.fit_transform(X_train_selected)
        X_test_pca = pca_final.transform(X_test_selected)

        # Name the components after their group.
        new_columns = [f"{group_name}_PC{i + 1}" for i in range(n_components)]

        train_df = pd.DataFrame(X_train_pca, columns=new_columns, index=X_train.index)
        test_df = pd.DataFrame(X_test_pca, columns=new_columns, index=X_test.index)

        reduced_train_parts.append(train_df)
        reduced_test_parts.append(test_df)

        # Record what was done for this group.
        reduction_info[group_name] = {
            "original_features": len(features),
            "selected_features": len(selected_features),
            "final_components": n_components,
            "target_variance_ratio": target_variance_ratio,
            "actual_variance_ratio": actual_variance_ratio,
            "individual_variance_ratios": pca_final.explained_variance_ratio_,
            "cumulative_variance_ratios": np.cumsum(
                pca_final.explained_variance_ratio_
            ),
            "selected_feature_names": selected_features.tolist(),
            "feature_scores": selector.scores_[selector.get_support()],
            "compression_ratio": n_components / len(features),
        }

        print(f"特征压缩比: {n_components / len(features):.3f}")

    # Nothing to concatenate or summarise when every group was empty.
    if not reduced_train_parts:
        raise ValueError("feature_groups contains no non-empty feature group")

    # Concatenate the per-group component scores column-wise.
    X_train_reduced = pd.concat(reduced_train_parts, axis=1)
    X_test_reduced = pd.concat(reduced_test_parts, axis=1)

    # Overall statistics across all groups.
    total_original_features = sum(
        [info["original_features"] for info in reduction_info.values()]
    )
    total_final_features = X_train_reduced.shape[1]
    overall_compression = total_final_features / total_original_features

    print("\n=== 分组降维总结 ===")
    print(f"原始总特征数: {total_original_features}")
    print(f"降维后总特征数: {total_final_features}")
    print(f"总体压缩比: {overall_compression:.3f}")
    print(f"总体压缩率: {(1 - overall_compression):.1%}")

    return X_train_reduced, X_test_reduced, reduction_info


# Run the grouped, explained-variance-driven dimensionality reduction on the
# cleaned train/test sets.
X_train_reduced, X_test_reduced, group_reduction_info = (
    grouped_dimensionality_reduction_by_variance(
        X_train_clean,
        X_test_clean,
        y_train,
        feature_groups,
        target_variance_ratio=0.90,  # target: 90% explained variance
        min_components_per_group=3,  # keep at least 3 components per group
        max_components_per_group=20,  # keep at most 20 components per group
    )
)

=== 基于解释方差率的分组降维 ===
目标解释方差率: 90.0%

--- 处理 gait_features ---
原始特征数: 18
特征选择后: 14 个特征
目标方差解释率: 90.0%
实际方差解释率: 92.2%
选择的主成分数: 5
特征压缩比: 0.278

--- 处理 spectral_features ---
原始特征数: 160
特征选择后: 128 个特征
目标方差解释率: 90.0%
实际方差解释率: 90.6%
选择的主成分数: 20
特征压缩比: 0.125

--- 处理 temporal_features ---
原始特征数: 8
特征选择后: 8 个特征
目标方差解释率: 90.0%
实际方差解释率: 91.4%
选择的主成分数: 4
特征压缩比: 0.500

=== 分组降维总结 ===
原始总特征数: 186
降维后总特征数: 29
总体压缩比: 0.156
总体压缩率: 84.4%


In [18]:
# Overall before/after feature counts of the dimensionality reduction.
n_features_before = X_train_clean.shape[1]
n_features_after = X_train_reduced.shape[1]

print("=== 最终降维结果总结 ===")
print("采用解释方差率: 90%")
print(f"原始总特征数: {n_features_before}")
print(f"降维后总特征数: {n_features_after}")
print(f"特征减少数量: {n_features_before - n_features_after}")
print(f"总体压缩率: {(1 - n_features_after / n_features_before):.1%}")

# Per-group result: dimensions, explained variance and compression.
print("\n各组降维结果:")
for group_name, info in group_reduction_info.items():
    n_original = info["original_features"]
    n_final = info["final_components"]
    explained = info["actual_variance_ratio"]
    compression = info["compression_ratio"]
    print(f"  {group_name}:")
    print(f"    {n_original} → {n_final} 维")
    print(f"    解释方差率: {explained:.1%}")
    print(f"    压缩率: {(1 - compression):.1%}")

print("\n降维后的数据已准备完成，可用于后续建模")
print(f"训练集形状: {X_train_reduced.shape}")
print(f"测试集形状: {X_test_reduced.shape}")

=== 最终降维结果总结 ===
采用解释方差率: 90%
原始总特征数: 186
降维后总特征数: 29
特征减少数量: 157
总体压缩率: 84.4%

各组降维结果:
  gait_features:
    18 → 5 维
    解释方差率: 92.2%
    压缩率: 72.2%
  spectral_features:
    160 → 20 维
    解释方差率: 90.6%
    压缩率: 87.5%
  temporal_features:
    8 → 4 维
    解释方差率: 91.4%
    压缩率: 50.0%

降维后的数据已准备完成，可用于后续建模
训练集形状: (52, 29)
测试集形状: (14, 29)


# 导出数据

In [19]:
import os

# Make sure the output folder exists.
output_dir = "./PreprocessedData"
os.makedirs(output_dir, exist_ok=True)


def _csv_path(file_name):
    """Return the path of ``file_name`` inside the output folder."""
    return os.path.join(output_dir, file_name)


print("=== 导出数据到 PreprocessedData 目录 ===")

# Training set: raw, cleaned and reduced features, then the labels.
X_train.to_csv(_csv_path("X_train.csv"), index=False)
print(f"✓ X_train.csv 已保存 - 形状: {X_train.shape}")

X_train_clean.to_csv(_csv_path("X_train_clean.csv"), index=False)
print(f"✓ X_train_clean.csv 已保存 - 形状: {X_train_clean.shape}")

X_train_reduced.to_csv(_csv_path("X_train_reduced.csv"), index=False)
print(f"✓ X_train_reduced.csv 已保存 - 形状: {X_train_reduced.shape}")

y_train.to_csv(_csv_path("y_train.csv"), index=False, header=["Group"])
print(f"✓ y_train.csv 已保存 - 长度: {len(y_train)}")

# Test set: raw, cleaned and reduced features, then the labels.
X_test.to_csv(_csv_path("X_test.csv"), index=False)
print(f"✓ X_test.csv 已保存 - 形状: {X_test.shape}")

X_test_clean.to_csv(_csv_path("X_test_clean.csv"), index=False)
print(f"✓ X_test_clean.csv 已保存 - 形状: {X_test_clean.shape}")

X_test_reduced.to_csv(_csv_path("X_test_reduced.csv"), index=False)
print(f"✓ X_test_reduced.csv 已保存 - 形状: {X_test_reduced.shape}")

y_test.to_csv(_csv_path("y_test.csv"), index=False, header=["Group"])
print(f"✓ y_test.csv 已保存 - 长度: {len(y_test)}")

# Subject IDs of both splits, so exported rows can be traced back.
print("\n=== 导出ID信息 ===")

train_ids.to_csv(_csv_path("train_ids.csv"), index=False, header=["ID"])
print(f"✓ train_ids.csv 已保存 - 长度: {len(train_ids)}")

test_ids.to_csv(_csv_path("test_ids.csv"), index=False, header=["ID"])
print(f"✓ test_ids.csv 已保存 - 长度: {len(test_ids)}")

# List every CSV in the output folder with its size in KB.
print(f"\n所有数据文件已成功导出到: {os.path.abspath(output_dir)}")
print("\n文件列表:")
csv_files = sorted(name for name in os.listdir(output_dir) if name.endswith(".csv"))
for file in csv_files:
    file_size = os.path.getsize(_csv_path(file)) / 1024  # bytes -> KB
    print(f"  {file} ({file_size:.1f} KB)")

=== 导出数据到 PreprocessedData 目录 ===
✓ X_train.csv 已保存 - 形状: (52, 186)
✓ X_train_clean.csv 已保存 - 形状: (52, 186)
✓ X_train_reduced.csv 已保存 - 形状: (52, 29)
✓ y_train.csv 已保存 - 长度: 52
✓ X_test.csv 已保存 - 形状: (14, 186)
✓ X_test_clean.csv 已保存 - 形状: (14, 186)
✓ X_test_reduced.csv 已保存 - 形状: (14, 29)
✓ y_test.csv 已保存 - 长度: 14

=== 导出ID信息 ===
✓ train_ids.csv 已保存 - 长度: 52
✓ test_ids.csv 已保存 - 长度: 14

所有数据文件已成功导出到: e:\文档\重要文档\2025.8\剑桥PBL\ML\Diagnosis_PD_MultimodelGaitInfo\PreprocessedData

文件列表:
  X_test.csv (38.3 KB)
  X_test_clean.csv (53.9 KB)
  X_test_reduced.csv (8.4 KB)
  X_train.csv (135.0 KB)
  X_train_clean.csv (189.8 KB)
  X_train_reduced.csv (29.3 KB)
  test_ids.csv (0.1 KB)
  train_ids.csv (0.2 KB)
  y_test.csv (0.1 KB)
  y_train.csv (0.4 KB)
