# Requirements


In [6]:
# 打印代码所用关键包的版本信息
print("\n=== Package Versions ===")
print(f"Python: {os.sys.version}")
print(f"NumPy: {np.__version__}")
print(f"Pandas: {pd.__version__}")
print(f"Scikit-learn: {sklearn.__version__}") 
print(f"XGBoost: {xgboost.__version__}")       


=== Package Versions ===
Python: 3.11.5 (main, Sep 11 2023, 08:31:25) [Clang 14.0.6 ]
NumPy: 1.23.3
Pandas: 2.0.3
Scikit-learn: 1.3.0
XGBoost: 2.1.3


# XGBoost

In [1]:
import os
import re
import numpy as np
import pandas as pd
import sklearn
import xgboost
from typing import Tuple, Dict, List
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif
from xgboost import XGBClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    classification_report, accuracy_score, precision_score,
    recall_score, f1_score
)
import warnings
warnings.filterwarnings('ignore')


class GBMSubtypeClassifier:
    """
    改用XGBoost进行GBM亚型分类：
      - 读取临床与甲基化数据
      - 解析短条形码并匹配原发瘤有亚型标签的样本
      - 预处理（缺失率过滤 + 均值填充 + 标准化 + ANOVA特征选择）
      - XGBoost + 分层CV的网格搜索
    """
    def __init__(self, base_dir: str):
        self.base_dir = base_dir
        self.files = {
            "meth450": os.path.join(base_dir, "HumanMethylation450"),
            "clinical": os.path.join(base_dir, "TCGA.GBM.sampleMap_GBM_clinicalMatrix"),
        }
        # 短条形码：例如 TCGA-06-1234-01（最后两位是样本类型，01=Primary Tumor）
        self.short_re = re.compile(r'^(TCGA-[A-Za-z0-9]{2}-[A-Za-z0-9]{4}-(\d{2}))')
        self.pipeline = None
        self.label_encoder = None

    # ---------- 基础工具 ----------
    def _to_sample_short(self, s: str) -> str:
        if not isinstance(s, str):
            return None
        m = self.short_re.match(s.strip())
        return m.group(1) if m else None

    def _is_primary_by_short(self, short_id: str) -> bool:
        if not isinstance(short_id, str):
            return False
        m = self.short_re.match(short_id)
        if not m:
            return False
        return m.group(2) == "01"  # 01=Primary Tumor

    def _read_tsv(self, path: str, nrows=None) -> pd.DataFrame:
        if not os.path.exists(path):
            raise FileNotFoundError(f"文件不存在: {path}")
        return pd.read_csv(path, sep="\t", low_memory=False, nrows=nrows)

    # ---------- 数据读取与匹配 ----------
    def _load_clinical(self) -> pd.DataFrame:
        clin = self._read_tsv(self.files["clinical"])

        # 样本ID列
        sid_col = None
        for col in ["sampleID", "bcr_sample_barcode"]:
            if col in clin.columns:
                sid_col = col
                break
        if sid_col is None:
            raise ValueError("临床数据中未找到样本ID列（sampleID / bcr_sample_barcode）")

        # 亚型列
        subtype_col = None
        for col in ["GeneExp_Subtype", "Subtype_mRNA", "Subtype", "GeneExp_Subtype "]:
            if col in clin.columns:
                subtype_col = col
                break
        if subtype_col is None:
            raise ValueError("临床数据中未找到亚型列（GeneExp_Subtype / Subtype_mRNA 等）")

        # 洁净化：短条形码 + 是否原发
        clin["short_id"]   = clin[sid_col].astype(str).apply(self._to_sample_short)
        clin["is_primary"] = clin["short_id"].apply(self._is_primary_by_short)

        # 规范化亚型标签：去两端空格、首字母大写
        def _norm_subtype(x):
            if pd.isna(x):
                return x
            s = str(x).strip()
            return s[:1].upper() + s[1:].lower() if s else s

        clin["subtype"] = clin[subtype_col].apply(_norm_subtype)

        # 保留：原发 + 有短ID + 有亚型
        valid = clin[(clin["is_primary"]) & (clin["short_id"].notna()) & (clin["subtype"].notna())]
        return valid[["short_id", "subtype"]].drop_duplicates()

    def _load_methylation(self) -> pd.DataFrame:
        """
        返回形状为 (样本, 特征) 的数值矩阵：
          - 自动猜测矩阵朝向：若列名不是TCGA，则看第一列是否是TCGA样本并转置
          - 只保留列名/行值里带TCGA的样本
        """
        df = self._read_tsv(self.files["meth450"])

        # 若首列为注释列（如 probe/gene），设为索引
        first_col_lower = str(df.columns[0]).strip().lower()
        if first_col_lower in {
            "gene", "genes", "hugo", "symbol", "gene symbol",
            "id", "probe", "probes", "composite element ref", "sample"
        }:
            df = df.set_index(df.columns[0])

        # 判断样本是在列还是在行
        def _cols_have_tcga(_df):
            return any(str(c).startswith("TCGA-") for c in _df.columns)

        if not _cols_have_tcga(df):
            # 如果第一列/索引是样本ID
            first_col = df.columns[0]
            if df[first_col].astype(str).str.startswith("TCGA-").any():
                df = df.set_index(first_col).transpose()

        # 只保留TCGA列
        tcga_cols = [c for c in df.columns if str(c).startswith("TCGA-")]
        if len(tcga_cols) == 0:
            # 尝试从索引转列（极端情况）
            if any(str(i).startswith("TCGA-") for i in df.index):
                df = df.transpose()
                tcga_cols = [c for c in df.columns if str(c).startswith("TCGA-")]
        if len(tcga_cols) == 0:
            raise ValueError("甲基化数据中未发现TCGA样本列/行")

        df = df[tcga_cols]
        # 转为数值
        df = df.apply(pd.to_numeric, errors="coerce")
        # 置换：需要 (样本, 特征)
        X = df.transpose().copy()
        X.index.name = "sample_barcode"
        return X

    def _match_on_short_id(self, X_meth: pd.DataFrame, clin: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
        """
        将甲基化样本条形码映射为短条形码，与临床表的 short_id 匹配。
        返回 X(样本x特征) 与 y(样本标签Series，index与X一致)。
        """
        # 从条形码映射到短ID
        short_ids = X_meth.index.to_series().astype(str).apply(self._to_sample_short)
        keep_mask = short_ids.notna()
        X_meth = X_meth.loc[keep_mask].copy()
        short_ids = short_ids.loc[keep_mask]

        # 有些短ID可能重复（同一短ID对应多个测序批次），这里保留首次出现
        # 也可以选择分组取均值：X_meth.groupby(short_ids).mean()
        # 为避免特征数巨大导致 groupby 内存压力，这里采用"首个样本"策略：
        dedup_index = ~short_ids.duplicated()
        X_meth = X_meth.loc[dedup_index]
        short_ids = short_ids.loc[dedup_index]

        # 与临床匹配
        short_to_subtype = dict(zip(clin["short_id"], clin["subtype"]))
        y = short_ids.map(short_to_subtype)

        match_mask = y.notna()
        X_matched = X_meth.loc[match_mask].copy()
        y_matched = y.loc[match_mask].copy()

        # 统一索引为短ID（更直观）
        X_matched.index = y_matched.values  # 先临时放 subtype
        X_matched.insert(0, "short_id", short_ids.loc[match_mask].values)
        # 把短ID设成索引，再把 y 的索引对齐
        X_matched = X_matched.set_index("short_id")
        y_matched.index = X_matched.index

        # 恢复 X 的真实行索引为短ID（而不是subtype）
        # 需要把第一步替换的index还原：
        # 此时 X_matched 的第一列是特征，index 是 short_id，行内没有样本条形码，不影响后续
        return X_matched, y_matched

    # ---------- 数据准备 ----------
    def prepare_data(self, min_samples_per_class: int = 10, max_missing_ratio: float = 0.5) -> Tuple[pd.DataFrame, pd.Series]:
        clin = self._load_clinical()
        X_meth = self._load_methylation()

        # 匹配
        X, y = self._match_on_short_id(X_meth, clin)

        # 过滤类别：至少 min_samples_per_class
        vc = y.value_counts()
        valid_classes = vc[vc >= min_samples_per_class].index.tolist()
        keep = y.isin(valid_classes)
        X, y = X.loc[keep], y.loc[keep]

        # 缺失率过滤（按列=特征）
        miss_ratio = X.isna().mean(axis=0)
        X = X.loc[:, miss_ratio <= max_missing_ratio]

        # 均值填充
        X = X.fillna(X.mean())

        # 标签编码（XGBoost需要数值标签）
        self.label_encoder = LabelEncoder()
        y_encoded = pd.Series(self.label_encoder.fit_transform(y), index=y.index)

        # 重置索引（模型不要依赖 short_id）
        X = X.reset_index(drop=True)
        y_encoded = y_encoded.reset_index(drop=True)

        # 基本信息
        print(f"[INFO] 匹配成功样本数: {len(y_encoded)}")
        print(f"[INFO] 类别分布: {pd.Series(y).value_counts().to_dict()}")
        print(f"[INFO] 编码后类别: {dict(zip(self.label_encoder.classes_, range(len(self.label_encoder.classes_))))}")
        print(f"[INFO] 特征数（缺失率≤{int(max_missing_ratio*100)}%）: {X.shape[1]}")
        return X, y_encoded

    # ---------- 训练与评估 ----------
    def train_and_eval(self, X: pd.DataFrame, y: pd.Series, test_size: float = 0.3, random_state: int = 42) -> Dict:
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state, stratify=y
        )

        # 根据样本规模动态设置 k（不超过特征数/训练样本数的一半）
        k_max = min(500, X_train.shape[1], max(50, X_train.shape[0] // 2))
        k_grid = sorted(set([
            min(50, X_train.shape[1]),
            min(100, X_train.shape[1]),
            min(200, X_train.shape[1], k_max)
        ]))

        # 计算类别权重
        from sklearn.utils.class_weight import compute_class_weight
        classes = np.unique(y_train)
        class_weights = compute_class_weight('balanced', classes=classes, y=y_train)
        sample_weights = np.array([class_weights[i] for i in y_train])

        pipe = Pipeline([
            ("scaler", StandardScaler(with_mean=True, with_std=True)),
            ("kbest", SelectKBest(score_func=f_classif, k=k_grid[-1] if k_grid else 50)),
            ("xgb", XGBClassifier(
                objective='multi:softprob',  # 多分类
                eval_metric='mlogloss',      # 多分类对数损失
                random_state=random_state,
                n_jobs=-1,
                verbosity=0  # 减少输出
            ))
        ])

        # CV 折数 = min(5, 最小类样本数)
        min_per_class = int(pd.Series(y_train).value_counts().min())
        cv_folds = max(2, min(5, min_per_class))
        cv = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=random_state)

        param_grid = {
            "kbest__k": k_grid,
            "xgb__n_estimators": [100, 200],
            "xgb__max_depth": [3, 6, 9],
            "xgb__learning_rate": [0.01, 0.1, 0.2],
            "xgb__subsample": [0.8, 1.0],
            "xgb__colsample_bytree": [0.8, 1.0]
        }

        gs = GridSearchCV(
            pipe, param_grid=param_grid, cv=cv, n_jobs=-1,
            scoring="f1_macro", verbose=1
        )
        gs.fit(X_train, y_train)
        self.pipeline = gs.best_estimator_

        y_pred_train = self.pipeline.predict(X_train)
        y_pred_test  = self.pipeline.predict(X_test)

        # 将数值标签转换回原始标签进行报告
        y_train_orig = self.label_encoder.inverse_transform(y_train)
        y_test_orig = self.label_encoder.inverse_transform(y_test)
        y_pred_train_orig = self.label_encoder.inverse_transform(y_pred_train)
        y_pred_test_orig = self.label_encoder.inverse_transform(y_pred_test)

        metrics = {
            "best_params": gs.best_params_,
            "cv_f1_macro": float(gs.best_score_),
            "train_accuracy": float(accuracy_score(y_train, y_pred_train)),
            "test_accuracy":  float(accuracy_score(y_test,  y_pred_test)),
            "test_precision_macro": float(precision_score(y_test, y_pred_test, average="macro")),
            "test_recall_macro":    float(recall_score(y_test, y_pred_test, average="macro")),
            "test_f1_macro":        float(f1_score(y_test, y_pred_test, average="macro")),
            "test_precision_weighted": float(precision_score(y_test, y_pred_test, average="weighted")),
            "test_recall_weighted":    float(recall_score(y_test, y_pred_test, average="weighted")),
            "test_f1_weighted":        float(f1_score(y_test, y_pred_test, average="weighted")),
        }

        print("\n=== 最优参数 ===")
        print(metrics["best_params"])
        print(f"CV f1_macro: {metrics['cv_f1_macro']:.4f}")

        print("\n=== 测试集指标 ===")
        for k in ["test_accuracy","test_precision_macro","test_recall_macro","test_f1_macro",
                  "test_precision_weighted","test_recall_weighted","test_f1_weighted"]:
            print(f"{k}: {metrics[k]:.4f}")

        print("\n=== 测试集分类报告 ===")
        print(classification_report(y_test_orig, y_pred_test_orig))

    # ---------- 一键运行 ----------
    def run(self, min_samples_per_class: int = 10) -> Dict:
        print("=" * 60)
        print("开始 GBM 亚型识别（DNA甲基化450 + XGBoost）")
        print("=" * 60)
        X, y = self.prepare_data(min_samples_per_class=min_samples_per_class)
        results = self.train_and_eval(X, y)
        print("\n[完成]")
        return results


def main():
    BASE_DIR = "/Volumes/LaCie/data" 
    clf = GBMSubtypeClassifier(BASE_DIR)
    results = clf.run(min_samples_per_class=10)
    return clf, results


if __name__ == "__main__":
    clf, results = main()

开始 GBM 亚型识别（DNA甲基化450 + XGBoost）
[INFO] 匹配成功样本数: 85
[INFO] 类别分布: {'Mesenchymal': 26, 'Classical': 26, 'Proneural': 22, 'Neural': 11}
[INFO] 编码后类别: {'Classical': 0, 'Mesenchymal': 1, 'Neural': 2, 'Proneural': 3}
[INFO] 特征数（缺失率≤50%）: 396058
Fitting 5 folds for each of 144 candidates, totalling 720 fits

=== 最优参数 ===
{'kbest__k': 50, 'xgb__colsample_bytree': 0.8, 'xgb__learning_rate': 0.1, 'xgb__max_depth': 3, 'xgb__n_estimators': 100, 'xgb__subsample': 1.0}
CV f1_macro: 0.5564

=== 测试集指标 ===
test_accuracy: 0.5769
test_precision_macro: 0.5519
test_recall_macro: 0.5387
test_f1_macro: 0.5235
test_precision_weighted: 0.6093
test_recall_weighted: 0.5769
test_f1_weighted: 0.5674

=== 测试集分类报告 ===
              precision    recall  f1-score   support

   Classical       0.64      0.88      0.74         8
 Mesenchymal       0.75      0.38      0.50         8
      Neural       0.25      0.33      0.29         3
   Proneural       0.57      0.57      0.57         7

    accuracy                   