In [None]:
import warnings
# Silence every warning for the rest of the session.
# NOTE(review): this also hides warnings raised by libraries during model
# fitting and scoring — confirm that is intended.
warnings.filterwarnings("ignore")
import matplotlib.pyplot as plt
# IPython magic: render matplotlib figures inline in the notebook output.
%matplotlib inline
from matplotlib import font_manager as fm, rcParams
# Use "Arial Unicode MS" as the sans-serif font family for all figures.
# NOTE(review): presumably chosen for wide glyph coverage — confirm the font
# is installed on the machine running this notebook.
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS']

In [None]:
import pandas as pd
import numpy as np
from sksurv.ensemble import RandomSurvivalForest
from sksurv.metrics import concordance_index_censored
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_curve, auc

# Input workbooks. Only the GEO cohort is read in this cell; `tcga_path` is
# defined here but not used in this cell.
geo_path = "./dataset/GEO_clinical_genes.xlsx"
tcga_path = "./dataset/TCGA_clinical_genes.xlsx"
df = pd.read_excel(geo_path)

# Event indicator: 1 when CSS is exactly 'Dead', 0 for anything else
# (missing values included, i.e. they are treated as censored).
# Built without in-place mutation; the new column is appended last and the
# raw 'CSS' column is dropped.
df = df.assign(event=df['CSS'].eq('Dead').astype(int)).drop(columns=['CSS'])

# Covariate names *before* one-hot encoding (everything except the outcome).
# NOTE(review): these do not match the columns of X once dummies are created.
covariates = list(df.columns)
covariates.remove('Survival_months')
covariates.remove('event')

# One-hot encode every string/categorical column, dropping the first level
# of each to avoid a redundant reference column.
categorical_vars = df.select_dtypes(include=['object', 'category']).columns.tolist()
df = pd.get_dummies(df, columns=categorical_vars, drop_first=True)

# Structured outcome array with fields ('event', 'time') as expected by
# scikit-survival estimators.
y = np.empty(len(df), dtype=[('event', bool), ('time', float)])
y['event'] = df['event'].to_numpy(dtype=bool)
y['time'] = df['Survival_months'].to_numpy(dtype=float)

# Explicit float cast: mixing numeric columns with boolean dummy columns
# would otherwise produce an object-dtype array from `.values`.
X = df.drop(columns=['Survival_months', 'event']).to_numpy(dtype=float)

# Hold out 20% of the subjects for evaluation (fixed seed for repeatability).
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Hyper-parameter grid explored for the random survival forest.
param_grid = {
    "n_estimators": [100, 150],
    "max_depth": [3, 5],
    "min_samples_split": [2, 3],
    "min_samples_leaf": [2, 3],
}

rsf = RandomSurvivalForest(random_state=42)

# No `scoring` argument: GridSearchCV then ranks candidates with the
# estimator's own `score` method, which for scikit-survival models is the
# concordance index. A regression scorer such as 'neg_mean_squared_error'
# cannot handle the structured (event, time) target; with warnings silenced
# that failure would go unnoticed and the selected parameters would be
# arbitrary.
grid_search = GridSearchCV(rsf, param_grid, cv=2, n_jobs=-1)
grid_search.fit(X_train, y_train)

In [None]:
# Report the hyper-parameters selected by the grid search.
best_params = grid_search.best_params_
print(f"best_params: {best_params}")

# Train a fresh forest with the selected settings on the full training split.
best_rsf = RandomSurvivalForest(random_state=42, **best_params)
best_rsf.fit(X_train, y_train)

# Concordance index on each split; the first element of the returned tuple
# is the index itself.
train_concordance = concordance_index_censored(
    y_train['event'],
    y_train['time'],
    best_rsf.predict(X_train),
)
test_concordance = concordance_index_censored(
    y_test['event'],
    y_test['time'],
    best_rsf.predict(X_test),
)
c_index_train = train_concordance[0]
c_index_test = test_concordance[0]

print(f'c_index_train: {c_index_train:.4f}')
print(f'c_index_test: {c_index_test:.4f}')

In [None]:
# Held-out outcomes as a two-column table: follow-up time and event flag.
test_df = pd.DataFrame(
    dict(
        Survival_months=y_test['time'],
        event=y_test['event'],
    )
)

In [None]:
# One predicted survival curve per held-out subject, returned as step-function
# objects (the default), whose .x / .y are read by the cells below.
pred_surv_funcs = best_rsf.predict_survival_function(X_test, return_array=False)

In [None]:
# ROC curves of the predicted risk at fixed horizons (in months).
time_points = [12, 36, 60]
plt.figure(figsize=(8, 6), dpi=500)

# Observed follow-up and event flags of the held-out subjects.
obs_months = test_df['Survival_months'].to_numpy()
had_event = test_df['event'].to_numpy().astype(bool)

for t in time_points:
    # Predicted survival probability at time t for every test subject,
    # read off each predicted survival curve (values outside the curve's
    # time range are clamped to its end points by np.interp).
    surv_probs = np.array([
        np.interp(t, surv_func.x, surv_func.y) for surv_func in pred_surv_funcs
    ])
    risk_scores = 1 - surv_probs

    # Cases: event observed by time t. Controls: still under observation
    # after t. Subjects censored before t have an unknown status at t, so
    # they are excluded instead of being counted as event-free.
    is_case = (obs_months <= t) & had_event
    status_known = is_case | (obs_months > t)
    y_true = is_case[status_known].astype(int)

    # A ROC curve needs both classes; skip horizons where it is undefined.
    if y_true.size == 0 or y_true.min() == y_true.max():
        print(f"{t} months: ROC undefined (only one outcome class among evaluable subjects)")
        continue

    fpr, tpr, _ = roc_curve(y_true, risk_scores[status_known])
    roc_auc = auc(fpr, tpr)

    plt.plot(fpr, tpr, label=f"{t} months (AUC = {roc_auc:.2f})")

# Chance diagonal for reference.
plt.plot([0, 1], [0, 1], 'k--', lw=1)
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('RSF Survival Analysis: 1-year, 3-year, and 5-year ROC Curves')
plt.legend(loc='lower right')
plt.show()

In [None]:
# AUC of the predicted risk at every month from t_min to t_max (inclusive).
t_min = 1
t_max = 60
t_points = np.arange(t_min, t_max + 1)
auc_values = []

# Observed follow-up and event flags of the held-out subjects.
obs_months = test_df['Survival_months'].to_numpy()
had_event = test_df['event'].to_numpy().astype(bool)

for t in t_points:
    # Predicted survival probability at time t for every test subject.
    surv_probs = np.array([
        np.interp(t, surv_func.x, surv_func.y) for surv_func in pred_surv_funcs
    ])
    risk_scores = 1 - surv_probs

    # Cases: event observed by time t. Controls: still under observation
    # after t. Subjects censored before t have an unknown status at t, so
    # they are excluded instead of being counted as event-free.
    is_case = (obs_months <= t) & had_event
    status_known = is_case | (obs_months > t)
    y_true = is_case[status_known].astype(int)

    # AUC is undefined unless both classes are present; record NaN then.
    if y_true.size == 0 or y_true.min() == y_true.max():
        auc_values.append(np.nan)
    else:
        fpr, tpr, _ = roc_curve(y_true, risk_scores[status_known])
        auc_val = auc(fpr, tpr)
        auc_values.append(auc_val)

plt.figure(figsize=(8, 6), dpi=500)
# NOTE(review): fixed y-range; AUC values outside [0.6, 0.9] are not visible.
plt.ylim([0.6, 0.9])
plt.plot(t_points, auc_values, marker='o', linestyle='-')
plt.xlabel('Time (months)')
plt.ylabel('AUC')
plt.title('RSF Survival Analysis: AUC Over Time')
plt.grid(True)
plt.show()

In [None]:
import os

# Month-by-month AUC series as a two-column table.
rsf_auc_df = pd.DataFrame({
    'Month': t_points,
    'RSF_AUC': auc_values
})

# Create the output folder first: to_excel does not create missing
# directories and would otherwise fail on a fresh checkout.
rsf_auc_path = "./Log/RSF_AUC_by_month.xlsx"
os.makedirs(os.path.dirname(rsf_auc_path), exist_ok=True)
rsf_auc_df.to_excel(rsf_auc_path, index=False)