# 首先导入数据集

In [1]:
import os
import h5py
import numpy as np
import pandas as pd
import torch

from torch.utils.data import Dataset

class SurvivalDataset(Dataset):
    ''' Survival dataset that loads one split ('train' or 'test') from an .h5 file.

    After construction the instance holds three numpy arrays:
        X: (n, m) features, min-max scaled per column
        e: (n, 1) event flags
        y: (n, 1) event times
    '''
    def __init__(self, h5_file, is_train):
        ''' Loads and normalizes the data selected by (h5_file, is_train).

        :param h5_file: (String) the path of .h5 file
        :param is_train: (bool) which split to load
                is_train=True: loads the 'train' group
                is_train=False: loads the 'test' group
        '''
        # loads data
        self.X, self.e, self.y = \
            self._read_h5_file(h5_file, is_train)
        # normalizes data
        # NOTE: every instance is scaled with the min/max of its own split,
        # so the train and test sets do not share scaling statistics.
        self._normalize()

        print('=> load {} samples'.format(self.X.shape[0]))

    def _read_h5_file(self, h5_file, is_train):
        ''' Reads the datasets 'x', 'e' and 't' of the selected split.

        :param h5_file: (String) the path of .h5 file
        :param is_train: (bool) True reads group 'train', False reads group 'test'
        :return X: (np.array) (n, m)
            m is features dimension.
        :return e: (np.array) (n, 1)
            whether the event occurs? (1: occurs; 0: others)
        :return y: (np.array) (n, 1)
            the time of event e.
        '''
        split = 'train' if is_train else 'test'
        with h5py.File(h5_file, 'r') as f:
            # [()] copies the whole dataset into memory, so the arrays stay
            # valid after the file is closed
            X = f[split]['x'][()]
            e = f[split]['e'][()].reshape(-1, 1)
            y = f[split]['t'][()].reshape(-1, 1)
        return X, e, y

    def _normalize(self):
        ''' Min-max scales every column of X into [0, 1].

        A column whose values are all identical has a zero range; it is
        mapped to 0 instead of being divided by zero (which would turn the
        whole column into NaN).
        '''
        col_min = self.X.min(axis=0)
        col_range = self.X.max(axis=0) - col_min
        # substitute 1 for zero ranges: (x - min) is 0 there, so the result is 0
        col_range[col_range == 0] = 1
        self.X = (self.X - col_min) / col_range

    def __getitem__(self, item):
        ''' Returns the sample at index `item` as torch tensors.

        :param item: index of the sample
        :return: (X_tensor, y_tensor, e_tensor), i.e. features, time, event flag.
            Note the order: time comes before the event flag.
        '''
        # gets data with index of item
        X_item = self.X[item] # (m)
        e_item = self.e[item] # (1)
        y_item = self.y[item] # (1)
        # constructs torch.Tensor object
        X_tensor = torch.from_numpy(X_item)
        e_tensor = torch.from_numpy(e_item)
        y_tensor = torch.from_numpy(y_item)
        return X_tensor, y_tensor, e_tensor

    def __len__(self):
        ''' Returns the number of samples (rows of X). '''
        return self.X.shape[0]

In [2]:
from torch.utils.data import DataLoader

# Path of the .h5 data file
h5_file = './data/metabric/metabric_IHC4_clinical_train_test.h5'


# Build one dataset per split; each constructor call prints "=> load N samples"
train_dataset = SurvivalDataset(h5_file, is_train=True)
test_dataset = SurvivalDataset(h5_file, is_train=False)
# Optional: check the size of the training set
print("Training dataset length:", len(train_dataset))

# Samples are accessed by index;
# here the first training sample is fetched (order: features, time, event flag)
X_sample, y_sample, e_sample = train_dataset[0]

# Print the shapes of the sample tensors (m is the feature dimension)
print("X_sample shape:", X_sample.shape)  # expected: (m,)
print("y_sample shape:", y_sample.shape)  # expected: (1,)
print("e_sample shape:", e_sample.shape)  # expected: (1,)


=> load 1523 samples
=> load 381 samples
Training dataset length: 1523
X_sample shape: torch.Size([9])
y_sample shape: torch.Size([1])
e_sample shape: torch.Size([1])


## train_loader

In [3]:
# Full-batch loading: batch_size equals the dataset size, so each loader
# yields its whole split as one single batch.
train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), shuffle=True)
# The test split is only used for evaluation; shuffling it adds nothing and
# would make the row order differ between runs.
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
# Iterate over the batches of the training loader. The loop variables
# (X_batch, y_batch, e_batch) remain defined after the loop and are reused
# by the cells below.
for batch_idx, (X_batch, y_batch, e_batch) in enumerate(train_loader):
    # Placeholder for a training step, e.g.:
    # optimizer.zero_grad()
    # outputs = model(X_batch)
    # loss = criterion(outputs, y_batch, e_batch)
    # loss.backward()
    # optimizer.step()

    # Print the shapes of the current batch
    print(f"Batch {batch_idx}:")
    print("X_batch shape:", X_batch.shape)  # expected: (batch_size, m)
    print("y_batch shape:", y_batch.shape)  # expected: (batch_size, 1)
    print("e_batch shape:", e_batch.shape)  # expected: (batch_size, 1)

Batch 0:
X_batch shape: torch.Size([1523, 9])
y_batch shape: torch.Size([1523, 1])
e_batch shape: torch.Size([1523, 1])


## test_loader

In [4]:
# Iterate over the test loader. The loop variables (X_test_batch,
# y_test_batch, e_test_batch) remain defined after the loop and are used by
# the evaluation cells below.
for batch_idx, (X_test_batch, y_test_batch, e_test_batch) in enumerate(test_loader):
    # Placeholder copied from the training loop; for the test split an
    # evaluation step (no optimizer updates) would go here, e.g.:
    # optimizer.zero_grad()
    # outputs = model(X_batch)
    # loss = criterion(outputs, y_batch, e_batch)
    # loss.backward()
    # optimizer.step()
    
    # Print the shapes of the current batch
    print(f"Batch {batch_idx}:")
    print("X_batch shape:", X_test_batch.shape)  # expected: (batch_size, m)
    print("y_batch shape:", y_test_batch.shape)  # expected: (batch_size, 1)
    print("e_batch shape:", e_test_batch.shape)  # expected: (batch_size, 1)

Batch 0:
X_batch shape: torch.Size([381, 9])
y_batch shape: torch.Size([381, 1])
e_batch shape: torch.Size([381, 1])


## C_index

In [5]:
from lifelines.utils import concordance_index
def c_index(risk_pred, y, e):
    ''' Computes the concordance index (C-index) with lifelines.

    The arguments are forwarded as ``concordance_index(y, risk_pred, e)``.
    lifelines interprets the predicted scores as "larger means longer
    survival": a predicted survival time can be passed as it is, while a
    hazard-style risk score has to be negated by the caller first.

    :param risk_pred: (np.ndarray, torch.Tensor or array-like) model prediction
    :param y: (np.ndarray, torch.Tensor or array-like) the times of event
    :param e: (np.ndarray, torch.Tensor or array-like) flag that records
        whether the event occurs
    :return c_index: the c_index is calculated by (risk_pred, y, e)
    '''
    def _as_array_like(value):
        # Tensors may track gradients or live on another device, so they are
        # detached and moved to the CPU before conversion. Everything else
        # (ndarrays, lists, ...) is handed to lifelines unchanged.
        if isinstance(value, torch.Tensor):
            return value.detach().cpu().numpy()
        return value

    return concordance_index(
        _as_array_like(y), _as_array_like(risk_pred), _as_array_like(e))

## 随机生存森林

这里用随机生存森林得到的预测值是生存时间（生存概率降到 p 时对应的时间），而不是风险分数。lifelines 的 `concordance_index` 要求预测值越大代表生存时间越长，因此把预测的生存时间直接传入 `c_index` 即可，不需要加负号。DeepSurv 输出的是风险分数（风险越高，生存时间越短），所以在 DeepSurv 中需要先对预测值取负号，再传入 `c_index`。

In [6]:
import numpy as np
import pandas as pd
from sksurv.ensemble import RandomSurvivalForest
from sksurv.util import Surv

# Uses X_batch, y_batch and e_batch left over from the train-loader loop.
# Convert the tensors to numpy arrays; time and event are flattened to 1-D.
X_batch_np = X_batch.numpy()
y_batch_np = y_batch.numpy().flatten()
# Cast the event indicator to bool, matching the Cox and SVM cells below
e_batch_np = e_batch.numpy().flatten().astype(bool)

# Build the structured survival array from the event indicator
# (True: event occurred, False: otherwise) and the survival time
surv_data = Surv.from_arrays(event=e_batch_np, time=y_batch_np)

# Wrap the feature matrix in a DataFrame
X_batch_df = pd.DataFrame(X_batch_np)

# Fit the random survival forest (fixed random_state for the forest itself)
rsf = RandomSurvivalForest(n_estimators=100, min_samples_split=10, min_samples_leaf=15, n_jobs=-1, random_state=42)
rsf.fit(X_batch_df, surv_data)

# Print the model configuration
print(rsf)

RandomSurvivalForest(min_samples_leaf=15, min_samples_split=10, n_jobs=-1,
                     random_state=42)


In [7]:
# Convert the test features from a tensor to a numpy array
X_test_batch_np = X_test_batch.numpy()

# Wrap the feature matrix in a DataFrame (same layout as used for fitting)
X_test_batch_df = pd.DataFrame(X_test_batch_np)

# Predict with the fitted RSF model: one survival step function per sample,
# exposing its time points as `.x` and its survival probabilities as `.y`
surv_funcs = rsf.predict_survival_function(X_test_batch_df)

# Survival-probability threshold (0.5 corresponds to the median survival time)
p = 0.5

# For every sample, find the time at which the survival probability equals p.
# np.interp needs increasing x-coordinates; the survival probabilities fall
# over time, so both arrays are reversed before interpolating.
specified_surv_times = np.array([np.interp(p, sf.y[::-1], sf.x[::-1]) for sf in surv_funcs])
# The prediction is a survival time (larger = longer survival), so it is
# passed to c_index without negation.
valid_c = c_index(specified_surv_times, y_test_batch, e_test_batch)
valid_c

0.6458527300188277

## COX比例风险模型

In [8]:
import numpy as np
import pandas as pd
from sksurv.linear_model import CoxPHSurvivalAnalysis
from sksurv.util import Surv
from sksurv.metrics import concordance_index_censored
from sksurv.nonparametric import kaplan_meier_estimator
import matplotlib.pyplot as plt

# Convert the training tensors to numpy arrays
X_batch_np = X_batch.numpy()
y_batch_np = y_batch.numpy().flatten()
e_batch_np = e_batch.numpy().flatten().astype(bool)  # event indicator as bool

# Build the structured survival array
surv_data = Surv.from_arrays(event=e_batch_np, time=y_batch_np)

# Wrap the feature matrix in a DataFrame
X_batch_df = pd.DataFrame(X_batch_np)

# Fit the Cox proportional hazards model
cox = CoxPHSurvivalAnalysis()
cox.fit(X_batch_df, surv_data)

# Convert the test tensors to numpy arrays
X_test_batch_np = X_test_batch.numpy()
y_test_batch_np = y_test_batch.numpy().flatten()
e_test_batch_np = e_test_batch.numpy().flatten().astype(bool)  # event indicator as bool

# Wrap the test feature matrix in a DataFrame
X_test_batch_df = pd.DataFrame(X_test_batch_np)

# Predict one survival step function per test sample
predicted_survival = cox.predict_survival_function(X_test_batch_df)

# Survival-probability threshold
p = 0.5

# Time at which each sample's survival probability equals p; the arrays are
# reversed because np.interp needs increasing x-coordinates
specified_surv_times = np.array([np.interp(p, sf.y[::-1], sf.x[::-1]) for sf in predicted_survival])

# concordance_index_censored expects a risk estimate (higher = earlier event),
# hence the negated survival time. The result gets its own name so that it
# does not overwrite the c_index() function defined earlier in the notebook.
cox_c_index = concordance_index_censored(e_test_batch_np, y_test_batch_np, -specified_surv_times)
print("C-index:", cox_c_index[0])

C-index: 0.6335216104938655


## SVM

In [9]:
import numpy as np
import pandas as pd
from sksurv.svm import FastSurvivalSVM
from sksurv.util import Surv
from sksurv.metrics import concordance_index_censored

# Uses X_batch, y_batch and e_batch left over from the train-loader loop.
# Convert the training tensors to numpy arrays
X_batch_np = X_batch.numpy()
y_batch_np = y_batch.numpy().flatten()
e_batch_np = e_batch.numpy().flatten().astype(bool)  # event indicator as bool

# Build the structured survival array
surv_data = Surv.from_arrays(event=e_batch_np, time=y_batch_np)

# Fit a FastSurvivalSVM with its default settings (linear model, no kernel)
svm = FastSurvivalSVM()
svm.fit(X_batch_np, surv_data)

# Convert the test tensors to numpy arrays
X_test_batch_np = X_test_batch.numpy()
y_test_batch_np = y_test_batch.numpy().flatten()
e_test_batch_np = e_test_batch.numpy().flatten().astype(bool)  # event indicator as bool
# Predict with the fitted model; the output is passed on directly as the
# risk estimate expected by concordance_index_censored
predicted_risk = svm.predict(X_test_batch_np)

# Compute the C-index. The result gets its own name so that it does not
# overwrite the c_index() function defined earlier in the notebook.
svm_c_index = concordance_index_censored(e_test_batch_np, y_test_batch_np, predicted_risk)
print("C-index:", svm_c_index[0])


C-index: 0.641756150042414
