# DeepSurv深度学习生存分析模型

本notebook实现了DeepSurv深度神经网络模型，用于癌症患者生存分析预测。DeepSurv是基于Cox比例风险模型的深度学习扩展，能够捕获非线性特征交互。

## 目标
- 构建和训练DeepSurv深度学习模型
- 评估模型性能并与传统方法对比
- 分析模型的预测能力和特征重要性

## 1. 导入库和加载数据

In [None]:
# 基础库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# 深度学习库
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F

# 生存分析库
from lifelines.utils import concordance_index
from lifelines import CoxPHFitter
from sksurv.metrics import concordance_index_censored

# 科学计算库
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold
import pickle
from pathlib import Path
import os

# Seed the PyTorch and NumPy random number generators
torch.manual_seed(42)
np.random.seed(42)

# Pick the compute device: CUDA when it is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

print("库导入完成！")

In [None]:
# Load the preprocessed data
data_dir = Path('../data/processed')

# Read the training and test splits
train_data = pd.read_csv(data_dir / 'train_data.csv')
test_data = pd.read_csv(data_dir / 'test_data.csv')

# Load the saved preprocessors object
# NOTE(review): unpickling can execute arbitrary code; only load a
# preprocessors.pkl that comes from a trusted source.
with open(data_dir / 'preprocessors.pkl', 'rb') as f:
    preprocessors = pickle.load(f)

feature_columns = preprocessors['feature_columns']

print(f"训练数据形状: {train_data.shape}")
print(f"测试数据形状: {test_data.shape}")
print(f"特征数量: {len(feature_columns)}")

# Split the training frame into the feature matrix and the two survival targets
X_train = train_data[feature_columns].values
y_train_duration = train_data['Duration'].values
y_train_event = train_data['Event'].values

# Same split for the test frame
X_test = test_data[feature_columns].values
y_test_duration = test_data['Duration'].values
y_test_event = test_data['Event'].values

# Mean of the Event column, printed as the event rate
# (presumably Event is a 0/1 indicator — confirm against the preprocessing step)
print(f"训练集事件率: {y_train_event.mean():.2%}")
print(f"测试集事件率: {y_test_event.mean():.2%}")

# Convert the NumPy arrays to PyTorch float tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_duration_tensor = torch.FloatTensor(y_train_duration)
y_train_event_tensor = torch.FloatTensor(y_train_event)

X_test_tensor = torch.FloatTensor(X_test)
y_test_duration_tensor = torch.FloatTensor(y_test_duration)
y_test_event_tensor = torch.FloatTensor(y_test_event)

print("数据加载完成！")

## 2. DeepSurv模型定义

In [None]:
class DeepSurv(nn.Module):
    """Feed-forward network that outputs one Cox-style risk score per sample.

    Every hidden stage is ``Linear -> ReLU -> BatchNorm1d -> Dropout``; the
    stack ends with a single linear unit whose raw output is the risk score.

    Args:
        input_dim: Number of input features.
        hidden_dims: Sizes of the hidden layers, in order. ``None`` selects
            the default ``[64, 32, 16]``.
        dropout_rate: Dropout probability applied after every hidden stage.
    """

    def __init__(self, input_dim, hidden_dims=None, dropout_rate=0.3):
        super().__init__()

        # A ``None`` sentinel plus a private copy keeps instances from sharing
        # (and accidentally mutating) one default list object.
        if hidden_dims is None:
            hidden_dims = (64, 32, 16)

        self.input_dim = input_dim
        self.hidden_dims = list(hidden_dims)
        self.dropout_rate = dropout_rate

        # Build the hidden stages
        layers = []
        prev_dim = input_dim

        for hidden_dim in self.hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(hidden_dim))
            layers.append(nn.Dropout(dropout_rate))
            prev_dim = hidden_dim

        # Output layer: a single risk score
        layers.append(nn.Linear(prev_dim, 1))

        self.network = nn.Sequential(*layers)

        # Re-initialise the linear layers
        self._initialize_weights()

    def _initialize_weights(self):
        """Apply Xavier-uniform weights and zero biases to every linear layer."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Run the network and return risk scores of shape ``[batch, 1]``."""
        return self.network(x)

    def predict_risk(self, x):
        """Return risk scores for ``x`` as a NumPy array.

        Switches the module to evaluation mode (it is left in that mode) and
        disables gradient tracking.

        Args:
            x: Feature matrix as a NumPy array or a tensor.

        Returns:
            ``numpy.ndarray`` holding one risk score per input row.
        """
        self.eval()
        # Match the input to wherever the weights live; without this a model
        # moved to CUDA rejects CPU inputs.
        reference = next(self.parameters())
        with torch.no_grad():
            x = torch.as_tensor(x, dtype=reference.dtype, device=reference.device)
            risk_score = self.forward(x)
            return risk_score.cpu().numpy()

# Instantiate the model; the input width is the number of feature columns
input_dim = X_train.shape[1]
model = DeepSurv(
    input_dim=input_dim,
    hidden_dims=[128, 64, 32],
    dropout_rate=0.3,
)

model = model.to(device)

print("DeepSurv模型结构:")
print(model)
# "\n" rather than "\\n": the doubled backslash printed a literal "\n"
# instead of a blank line.
print(f"\n模型参数数量: {sum(p.numel() for p in model.parameters()):,}")

## 3. Cox部分似然损失函数

In [None]:
def negative_log_partial_likelihood(risk_scores, durations, events):
    """Return the mean negative log Cox partial likelihood.

    Each sample ``i`` with an observed event contributes
    ``risk_i - log(sum_j exp(risk_j))``, where ``j`` runs over the risk set
    ``{j : durations_j >= durations_i}``. The contributions are summed,
    negated and divided by the number of events.

    Args:
        risk_scores: Predicted risk scores, shape ``[batch_size, 1]`` or
            ``[batch_size]``.
        durations: Observed times, shape ``[batch_size]``.
        events: Event indicators, shape ``[batch_size]``; non-zero marks an
            observed event.

    Returns:
        Scalar tensor with the loss. When the batch contains no event the
        result is zero, but it stays on the device and autograd graph of
        ``risk_scores`` so that ``backward()`` yields zero gradients.
    """
    # Flatten [batch, 1] (or already flat) scores to one dimension
    risk_scores = risk_scores.reshape(-1)

    # Only samples with an observed event contribute a likelihood term
    event_mask = events.bool()
    num_events = event_mask.sum()
    if num_events == 0:
        # A fresh CPU leaf here would sit on the wrong device and be
        # disconnected from the model; multiply through instead.
        return risk_scores.sum() * 0.0

    event_times = durations[event_mask]
    event_risks = risk_scores[event_mask]

    # at_risk[i, j] is True when sample j is still under observation at the
    # time of the i-th event. Every row contains its own event, so no row is
    # entirely masked out.
    at_risk = durations.unsqueeze(0) >= event_times.unsqueeze(1)
    neg_inf = risk_scores.new_full((), float('-inf'))
    masked_risks = torch.where(at_risk, risk_scores.unsqueeze(0), neg_inf)

    # log-sum-exp over each risk set for numerical stability; the -inf entries
    # contribute exp(-inf) = 0.
    log_risk_set = torch.logsumexp(masked_risks, dim=1)
    log_likelihood = (event_risks - log_risk_set).sum()

    # Negative log-likelihood averaged over the events
    return -log_likelihood / num_events

def cox_loss(risk_scores, durations, events):
    """Cox loss entry point; delegates to ``negative_log_partial_likelihood``."""
    loss = negative_log_partial_likelihood(risk_scores, durations, events)
    return loss

# Smoke-test the loss function on random inputs:
# 10 random risk scores, durations drawn from [0, 100) and random 0/1 events
test_risks = torch.randn(10, 1, requires_grad=True)
test_durations = torch.rand(10) * 100
test_events = torch.randint(0, 2, (10,)).float()

test_loss = cox_loss(test_risks, test_durations, test_events)
print(f"测试损失值: {test_loss.item():.4f}")
print("Cox损失函数定义完成！")

## 4. 模型训练

In [None]:
# Training hyperparameters
BATCH_SIZE = 128
LEARNING_RATE = 0.001
NUM_EPOCHS = 200
PATIENCE = 20

# The best checkpoint is saved from inside the training loop, so its
# directory must exist before the first epoch completes.
os.makedirs('../model', exist_ok=True)

# Optimizer and learning-rate scheduler (halves the LR when the training loss
# stops improving)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=10, factor=0.5)

# Data loader
train_dataset = TensorDataset(X_train_tensor, y_train_duration_tensor, y_train_event_tensor)
# BatchNorm1d raises in training mode when a batch holds a single sample, so
# drop the trailing batch only when it would contain exactly one row.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=len(train_dataset) > 1 and len(train_dataset) % BATCH_SIZE == 1,
)

# Move the full feature matrices to the device once instead of every epoch
X_train_device = X_train_tensor.to(device)
X_test_device = X_test_tensor.to(device)

# Training history
train_losses = []
train_c_indices = []
val_c_indices = []
best_c_index = 0
patience_counter = 0

print("开始训练DeepSurv模型...")
print(f"训练参数: Batch Size={BATCH_SIZE}, Learning Rate={LEARNING_RATE}, Epochs={NUM_EPOCHS}")

for epoch in range(NUM_EPOCHS):
    model.train()
    epoch_losses = []

    for batch_x, batch_duration, batch_event in train_loader:
        batch_x = batch_x.to(device)
        batch_duration = batch_duration.to(device)
        batch_event = batch_event.to(device)

        # Forward pass
        optimizer.zero_grad()
        risk_scores = model(batch_x)

        # Loss
        loss = cox_loss(risk_scores, batch_duration, batch_event)

        # Backward pass
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        epoch_losses.append(loss.item())

    # C-index on the full training and test sets. The scores are negated
    # because concordance_index expects larger predictions to mean longer
    # survival, whereas a larger risk score means shorter survival.
    # NOTE(review): the test split doubles as the validation set for early
    # stopping and checkpoint selection, so the final test C-index is
    # optimistically biased; consider a validation split carved out of the
    # training data.
    model.eval()
    with torch.no_grad():
        train_risks = model(X_train_device).cpu().numpy().flatten()
        train_c_index = concordance_index(y_train_duration, -train_risks, y_train_event)

        val_risks = model(X_test_device).cpu().numpy().flatten()
        val_c_index = concordance_index(y_test_duration, -val_risks, y_test_event)

    # Record history
    avg_loss = np.mean(epoch_losses)
    train_losses.append(avg_loss)
    train_c_indices.append(train_c_index)
    val_c_indices.append(val_c_index)

    # Learning-rate schedule is driven by the mean training loss
    scheduler.step(avg_loss)

    # Early-stopping bookkeeping
    if val_c_index > best_c_index:
        best_c_index = val_c_index
        patience_counter = 0
        # Save the best model so far
        torch.save(model.state_dict(), '../model/deepsurv_best.pth')
    else:
        patience_counter += 1

    # Progress report on the first epoch and every 20th epoch
    if (epoch + 1) % 20 == 0 or epoch == 0:
        print(f'Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {avg_loss:.4f}, '
              f'Train C-index: {train_c_index:.4f}, Val C-index: {val_c_index:.4f}')

    # Early stop
    if patience_counter >= PATIENCE:
        print(f'早停于第 {epoch+1} 轮，最佳验证C-index: {best_c_index:.4f}')
        break

# "\n" rather than "\\n": the doubled backslash printed a literal "\n"
print(f"\n训练完成！最佳验证C-index: {best_c_index:.4f}")

## 5. 训练过程可视化

In [None]:
# Create the output folders; the figure below is written to ../reports,
# which nothing else creates.
os.makedirs('../model', exist_ok=True)
os.makedirs('../reports', exist_ok=True)

# Visualise the training run
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Loss curve
axes[0].plot(train_losses, label='Training Loss', color='blue')
axes[0].set_title('训练损失曲线')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Cox Partial Likelihood Loss')
axes[0].legend()
axes[0].grid(True)

# C-index curves
axes[1].plot(train_c_indices, label='Training C-index', color='blue')
axes[1].plot(val_c_indices, label='Validation C-index', color='red')
axes[1].set_title('C-index变化曲线')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('C-index')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.savefig('../reports/deepsurv_training_curves.png', dpi=300, bbox_inches='tight')
plt.show()

# Training summary
print("训练摘要:")
print(f"总训练轮数: {len(train_losses)}")
print(f"最终训练损失: {train_losses[-1]:.4f}")
print(f"最终训练C-index: {train_c_indices[-1]:.4f}")
print(f"最佳验证C-index: {best_c_index:.4f}")
# Read the rate from the optimizer: ReduceLROnPlateau does not provide
# get_last_lr() in every PyTorch release, while param_groups always holds
# the current value.
print(f"最终学习率: {optimizer.param_groups[0]['lr']:.6f}")

## 6. 模型评估和预测

In [None]:
# Load the best checkpoint. map_location remaps the stored tensors onto the
# current device, so a checkpoint written on a GPU machine also loads on a
# CPU-only one.
model.load_state_dict(torch.load('../model/deepsurv_best.pth', map_location=device))
model.eval()

# Predict risk scores for both splits
with torch.no_grad():
    train_risk_scores = model(X_train_tensor.to(device)).cpu().numpy().flatten()
    test_risk_scores = model(X_test_tensor.to(device)).cpu().numpy().flatten()

# Evaluation metrics. Scores are negated because concordance_index expects
# larger predictions to mean longer survival.
train_c_index_final = concordance_index(y_train_duration, -train_risk_scores, y_train_event)
test_c_index_final = concordance_index(y_test_duration, -test_risk_scores, y_test_event)

print("DeepSurv模型最终评估结果:")
print(f"训练集C-index: {train_c_index_final:.4f}")
print(f"测试集C-index: {test_c_index_final:.4f}")

# Risk stratification
def create_risk_groups(risk_scores, n_groups=3):
    """Assign each risk score to one of ``n_groups`` quantile-based bins.

    Returns an integer array of bin indices, 0 for the lowest-score bin up to
    ``n_groups - 1`` for the highest.
    """
    probabilities = np.linspace(0, 1, n_groups + 1)
    cut_points = np.quantile(risk_scores, probabilities)
    # Only the interior cut points separate groups; the minimum and maximum
    # are dropped.
    interior_edges = cut_points[1:-1]
    return np.digitize(risk_scores, interior_edges, right=False)

# Stratify the test set by risk score. The group count is named once so the
# binning and the statistics loop below cannot drift apart.
N_RISK_GROUPS = 3
test_risk_groups = create_risk_groups(test_risk_scores, n_groups=N_RISK_GROUPS)

# Per-group summary statistics
risk_stats = []
for group in range(N_RISK_GROUPS):
    mask = test_risk_groups == group
    group_durations = y_test_duration[mask]
    group_events = y_test_event[mask]

    stats = {
        'Group': f'Group {group}',
        'Sample_Count': mask.sum(),
        'Event_Rate': group_events.mean(),
        'Median_Duration': np.median(group_durations),
        'Mean_Risk_Score': test_risk_scores[mask].mean()
    }
    risk_stats.append(stats)

risk_stats_df = pd.DataFrame(risk_stats)
# "\n" rather than "\\n": the doubled backslash printed a literal "\n"
print("\n风险分层统计:")
display(risk_stats_df)

# Visualise the risk scores
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Histogram of test-set risk scores with the mean marked
axes[0].hist(test_risk_scores, bins=30, alpha=0.7, color='skyblue')
axes[0].set_title('测试集风险得分分布')
axes[0].set_xlabel('风险得分')
axes[0].set_ylabel('频数')
axes[0].axvline(np.mean(test_risk_scores), color='red', linestyle='--', label='平均值')
axes[0].legend()

# Risk score against observed duration, coloured by the event indicator
scatter = axes[1].scatter(test_risk_scores, y_test_duration,
                         c=y_test_event, alpha=0.6, cmap='RdYlBu')
axes[1].set_title('风险得分与生存时间关系')
axes[1].set_xlabel('风险得分')
axes[1].set_ylabel('生存时间（月）')
plt.colorbar(scatter, ax=axes[1], label='事件发生(1=死亡, 0=存活)')

plt.tight_layout()
# savefig does not create missing directories
os.makedirs('../reports', exist_ok=True)
plt.savefig('../reports/deepsurv_risk_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Collect the headline results in one place
results_data = {
    'DeepSurv_Train_CIndex': train_c_index_final,
    'DeepSurv_Test_CIndex': test_c_index_final,
    'DeepSurv_Train_Risks': train_risk_scores,
    'DeepSurv_Test_Risks': test_risk_scores,
    'Risk_Groups_Stats': risk_stats_df
}

# Save per-patient predictions; fall back to a running index when the test
# frame has no PatientID column
pd.DataFrame({
    'PatientID': test_data['PatientID'] if 'PatientID' in test_data.columns else range(len(test_risk_scores)),
    'Duration': y_test_duration,
    'Event': y_test_event,
    'Risk_Score': test_risk_scores,
    'Risk_Group': test_risk_groups
}).to_csv('../data/processed/deepsurv_predictions.csv', index=False)

# Model summary
model_summary = {
    'Model': 'DeepSurv',
    'Architecture': str(model.hidden_dims),
    'Input_Dim': input_dim,
    'Parameters': sum(p.numel() for p in model.parameters()),
    'Training_Epochs': len(train_losses),
    'Best_Val_CIndex': best_c_index,
    'Final_Test_CIndex': test_c_index_final,
    'Training_Time': 'Not recorded'  # training time is not measured yet
}

# open(..., 'w') does not create missing directories
os.makedirs('../reports', exist_ok=True)

# "\n" rather than "\\n" throughout: the doubled backslash wrote the two
# characters backslash + n, leaving the whole report on a single line.
with open('../reports/deepsurv_summary.txt', 'w', encoding='utf-8') as f:
    f.write("DeepSurv模型总结报告\n")
    f.write("=" * 30 + "\n\n")
    for key, value in model_summary.items():
        f.write(f"{key}: {value}\n")
    f.write("\n风险分层统计:\n")
    f.write(risk_stats_df.to_string(index=False))

print("DeepSurv模型训练和评估完成！")
print("模型文件保存至: ../model/deepsurv_best.pth")
print("预测结果保存至: ../data/processed/deepsurv_predictions.csv")
print("模型总结保存至: ../reports/deepsurv_summary.txt")