In [1]:
import pandas as pd
import numpy as np
from lifelines import CoxPHFitter
from sklearn.tree import DecisionTreeRegressor, plot_tree
import matplotlib.pyplot as plt

In [3]:
plt.rcParams['font.family'] = 'SimHei' 
plt.rcParams['mathtext.fontset'] = 'stix'  # 使用STIX字体，支持数学符号
plt.rcParams['mathtext.default'] = 'regular'  # 默认数学文本样式

In [4]:
file_path = '附件_降维_XY.xlsx'  
df = pd.read_excel(file_path)

In [None]:
df['time'] = df['检测孕周_天数']
df['event'] = (df['Y染色体浓度'] >= 0.04).astype(int)

covars = ['孕妇BMI', '检测抽血次数', 'T18', '18号染色体的Z值', '检测孕周_天数', 'X染色体浓度' ] 

if covars:
    corr_matrix = df[covars].corr()
    print("\n显著变量间的相关性矩阵:")
    print(corr_matrix.round(3))
    
    # 如果两个变量高度相关（|r| > 0.7）， 去掉其一
    for i in range(len(covars)):
        for j in range(i+1, len(covars)):
            var1, var2 = covars[i], covars[j]
            corr = abs(corr_matrix.loc[var1, var2])
            if corr > 0.7:
                print(f" {var1} 和 {var2} 高度相关 (r={corr:.3f})")

covars = ['孕妇BMI', '检测抽血次数', 'T18', '18号染色体的Z值', 'X染色体浓度' ] # 去掉 孕周 

In [None]:
if covars:
    print("=" * 60)

    cph_multi = CoxPHFitter()
    cph_multi.fit(df[['time', 'event'] + covars], 
                 duration_col='time', event_col='event')
    
    print(cph_multi.summary)

In [None]:
df['预期达标天数'] = cph_multi.predict_median(df[covars])

print(df[['孕妇BMI', '检测抽血次数', 'T18', '18号染色体的Z值', '预期达标天数']].head(10))

In [8]:
output_file = '附件_预期_XY.xlsx'
df.to_excel(output_file, index=False)

In [None]:
from lifelines.utils import concordance_index

c_index = concordance_index(
        event_times=df['time'],
        event_observed=df['event'],
        predicted_scores=df['预期达标天数']
    )
    
print("生存模型评估指标:")
print("=" * 50)
print(f"Concordance Index (C-index): {c_index:.3f}")
print("> 0.5: 优于随机猜测")
print("> 0.7: 模型较好") 
print("> 0.8: 模型优秀")

event_samples = df[df['event'] == 1]
if len(event_samples) > 0:
    mae_events = np.mean(np.abs(event_samples['预期达标天数'] - event_samples['检测孕周_天数']))
    print(f"已达标样本平均绝对误差: {mae_events:.2f} 天")

In [10]:
from sklearn.tree import DecisionTreeRegressor, plot_tree
import matplotlib.pyplot as plt

# 按预期达标时间进行分组
def decision_tree_by_expected_time(df, max_group=9, min_sample=100):
    
    X = df[['孕妇BMI']].values  # BMI分组
    y = df['预期达标天数'].values  # 预期达标天数
    
    tree = DecisionTreeRegressor(
        max_leaf_nodes=max_group,     
        min_samples_leaf=min_sample,           
        random_state=42
    )
    
    tree.fit(X, y)
    
    bmi_thresholds = extract_tree_thresholds(tree)
    df['决策树组别'] = np.digitize(df['孕妇BMI'], bins=bmi_thresholds)
    
    return tree, bmi_thresholds

def extract_tree_thresholds(tree):
    """从决策树提取BMI分割阈值"""
    thresholds = []
    n_nodes = tree.tree_.node_count
    children_left = tree.tree_.children_left
    children_right = tree.tree_.children_right
    threshold = tree.tree_.threshold
    
    for i in range(n_nodes):
        if children_left[i] != children_right[i]:  
            thresholds.append(threshold[i])
    
    return sorted(thresholds)

In [None]:
# 1. 选择最佳分裂准则（推荐方案一）
tree, bmi_thresholds = decision_tree_by_expected_time(df, max_group=9)

print("BMI分割阈值:", bmi_thresholds)
print("分组情况:")
print(df['决策树组别'].value_counts().sort_index())

# 2. 分析每组特征
group_analysis = df.groupby('决策树组别').agg({
    '孕妇BMI': ['min', 'max', 'mean'],
    '预期达标天数': ['mean', 'std']
}).round(2)

print("\n各组特征分析:")
print(group_analysis)

# 3. 可视化决策树
fig, ax = plt.subplots(figsize=(20, 10))
plot_tree(tree, 
          feature_names=['孕妇BMI'], 
          filled=True, 
          rounded=True,
          fontsize=10,
          ax=ax)
plt.title("基于BMI的达标时间决策树", x=0.46, fontdict={"fontsize":18})

for text in ax.texts:
    content = text.get_text()
    
    content = content.replace('samples', '样本数')
    content = content.replace('value', '预期天数')
    content = content.replace('squared_error', '平方误差')
    
    text.set_text(content)

plt.savefig('决策树结构.svg', format='svg', bbox_inches='tight', dpi=400)
plt.show()


In [None]:
plt.figure(figsize=(18, 10))

legend_labels = []
for group in sorted(df['决策树组别'].unique()):
    group_data = df[df['决策树组别'] == group]
    count = len(group_data)
    mean_bmis = group_data['孕妇BMI'].mean()
    legend_labels.append(f'组{group+1} (n={count},平均BMI={mean_bmis:.2f})')
    
    plt.scatter(group_data['孕妇BMI'], group_data['预期达标天数'], 
               alpha=0.6)

for threshold in bmi_thresholds:
    plt.axvline(x=threshold, color='red', linestyle='--', alpha=0.7)
    
plt.xlabel('孕妇BMI')
plt.ylabel('预期达标时间（周）')
plt.title('决策树分组结果', fontdict={"fontsize":18})
plt.legend(legend_labels)
plt.grid(True)
plt.savefig('决策树分组结果.svg', format='svg', bbox_inches='tight', dpi=400)
plt.show()

In [14]:
output_file = "附件_决策树分组_XY.xlsx"
df.to_excel(output_file, index=False)

## 预测时点

In [15]:
def P_false(df, group):
    group_data = df[df['决策树组别'] == int(group)].copy()
    group_data = group_data.sort_values('检测孕周_天数')  
    
    t_values = np.arange(70, 28*7, 7)
    results = {}
    
    for t in t_values:
        samples_up_to_t = group_data[group_data['检测孕周_天数'] <= t]
        total_up_to_t = len(samples_up_to_t)
        
        if total_up_to_t == 0:
            results[t] = 1  # 如果没有样本，设为1,相当于排除掉这个t
        else:
            false_t = len(samples_up_to_t[samples_up_to_t['Y染色体浓度'] < 0.04])
            results[t] = false_t / total_up_to_t  

            results[t] = np.where(results[t]>0.1, results[t], 0.1) # 样本数过少时，拟合的结果相当于增加了隐性风险
    
    return results


def P_late(t):
    if t <= 12*7: 
        return 0
    elif t > 12*7 and t < 28*7:
        return (t - 12*7) / (28*7 - 12*7)
    else:
        return 1 # 28之后已经有很高风险，直接增加大的惩罚
# 我们认为，在早期之后，随着时间增加风险线性增加

In [16]:
def risk(P_false_curves, w1=0.7, w2=0.3):
    """
    计算每个组别的风险函数并找出最优检测时间
    Risk(t) = w1 * P_false(t) + w2 * P_late(t)
    """
    risk_results = {}
    optimal_times = {}
    
    for group, p_false_curve in P_false_curves.items():
        group_risk = {}
        
        for t, p_false in p_false_curve.items():
            risk = w1 * p_false + w2 * P_late(t)
            group_risk[t] = risk
        
        min_risk_t = min(group_risk.items(), key=lambda x: x[1])[0]
        min_risk_value = group_risk[min_risk_t]
        
        risk_results[group] = group_risk
        optimal_times[group] = (min_risk_t, min_risk_value)
    
    return risk_results, optimal_times

In [None]:
df = pd.read_excel(output_file)

P_false_curves = {}
for group in df['决策树组别'].unique():
    P_false_curves[group] = P_false(df, group)

P_false_curves

In [None]:
def calculate_all_optimal_results(P_false_curves, weight_combinations):
    """
    计算所有权重组合在各个组的最小risk和最佳检测时间
    """
    results = []
    
    for w1, w2 in weight_combinations:
        _, optimal_times = risk(P_false_curves, w1, w2)
        
        for group, (best_t, min_risk) in optimal_times.items():
            results.append({
                '权重组合': f'w1={w1}, w2={w2}',
                '组别': group,
                '最佳检测时间(周)': best_t,
                '最小风险值': min_risk,
                'w1': w1,
                'w2': w2
            })
    
    results_df = pd.DataFrame(results)
    
    return results_df

In [None]:
import pandas as pd
import numpy as np

def calculate_P_false_P_late_for_all_groups(df, P_false_curves):
    groups = sorted(df['决策树组别'].unique())
    t_values = np.arange(70, 7*28, 7)
    
    results = []
    
    for group in groups:
        for t in t_values:
            # 计算P_false
            p_false = P_false_curves.get(group, {}).get(t, np.nan)
            
            # 计算P_late
            p_late = P_late(t)
            
            results.append({
                '决策树组别': group,
                '孕周t(周)': t/7,
                'P_false': p_false,
                'P_late': p_late
            })
    
    # 转换为DataFrame
    results_df = pd.DataFrame(results)
    
    return results_df

# 计算所有组别的P_false和P_late
all_results_df = calculate_P_false_P_late_for_all_groups(df, P_false_curves)

# 保存到Excel文件
excel_filename = '决策树组别_P_false_P_late_详细数据.xlsx'
with pd.ExcelWriter(excel_filename, engine='openpyxl') as writer:
    # 保存完整数据
    all_results_df.to_excel(writer, sheet_name='完整数据', index=False)
    
    # 按组别分别保存到不同工作表
    for group in all_results_df['决策树组别'].unique():
        group_df = all_results_df[all_results_df['决策树组别'] == group]
        sheet_name = f'决策树组别{group}'
        group_df.to_excel(writer, sheet_name=sheet_name, index=False)
    
    # 创建汇总表（透视表格式）
    pivot_false = all_results_df.pivot_table(
        index='孕周t(周)', 
        columns='决策树组别', 
        values='P_false',
        aggfunc='first'
    )
    pivot_late = all_results_df.pivot_table(
        index='孕周t(周)', 
        columns='决策树组别', 
        values='P_late',
        aggfunc='first'
    )
    
    pivot_false.to_excel(writer, sheet_name='P_false_汇总')
    pivot_late.to_excel(writer, sheet_name='P_late_汇总')

print(f"数据已保存到: {excel_filename}")
print(f"总数据行数: {len(all_results_df)}")
print(f"组别数量: {len(all_results_df['决策树组别'].unique())}")
print(f"孕周范围: {all_results_df['孕周t(周)'].min()} - {all_results_df['孕周t(周)'].max()} 周")

# 显示数据预览
print("\n数据预览:")
print(all_results_df.head(10))

# 显示每个组别的统计信息
print("\n各組別P_false統計:")
for group in all_results_df['决策树组别'].unique():
    group_data = all_results_df[all_results_df['决策树组别'] == group]
    avg_p_false = group_data['P_false'].mean()
    max_p_false = group_data['P_false'].max()
    min_p_false = group_data['P_false'].min()
    print(f"组别 {group}: 平均P_false = {avg_p_false:.4f}, 最大值 = {max_p_false:.4f}, 最小值 = {min_p_false:.4f}")

## 熵权

In [None]:
def risk(P_false_curves, weights_list):
    risk_results = {}
    optimal_times = {}
    
    for i, (group, p_false_curve) in enumerate(P_false_curves.items()):
            
        w1, w2 = weights_list[i]
        group_risk = {}
        
        for t, p_false in p_false_curve.items():
            risk_value = w1 * p_false + w2 * P_late(t)
            group_risk[t] = risk_value
    
        min_risk_t = min(group_risk.items(), key=lambda x: x[1])[0]
        min_risk_value = group_risk[min_risk_t]
        
        risk_results[group] = {
            'risk_curve': group_risk,
            'weights': (w1, w2),
            'optimal_time': min_risk_t,
            'min_risk': min_risk_value
        }
        optimal_times[group] = (min_risk_t, min_risk_value)
    
    return risk_results, optimal_times

weights_list = [(0.7834,0.2166),(0.8129,0.1871),(0.8360,0.1640),(0.8802,0.1198),
                (0.7436,0.2564),(0.8409,0.1591),(0.7665,0.2335),(0.5666,0.4334)]

risk_results, optimal_times = risk(P_false_curves, weights_list)

print("各组最优检测时间和最小风险:")
print("=" * 50)
for group, (optimal_time, min_risk) in optimal_times.items():
    w1, w2 = risk_results[group]['weights']
    print(f"{group}: 最优检测时间 = {optimal_time}天({optimal_time/7}周), 最小风险 = {min_risk:.4f}")
    print(f"    权重: w1 = {w1:.4f}, w2 = {w2:.4f}")
    print("-" * 50)
