# 02) 생존분석 및 시각화

| 주제 | 내용 |
|------|------|
| Kaplan-Meier | 비모수적 생존곡선 추정, 그룹 비교 |
| Univariate Cox | 유전자/프로브별 생존 연관 스크리닝 |
| Multivariate Cox | 임상+분자 마커 통합 모델, Forest Plot |
| 위험군 분류 | Cox 위험점수 → High/Low Risk KM 검증 |

## 라이브러리 및 환경 설정

In [None]:
import os, numpy as np, pandas as pd, seaborn as sns, matplotlib.pyplot as plt
from lifelines import KaplanMeierFitter, CoxPHFitter
from lifelines.statistics import logrank_test
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')

import plot_config
plot_config.setup()
np.random.seed(42)

## 데이터 로드

In [None]:
mrna = pd.read_csv("input/mrna_matched.txt", sep="\t", index_col=0)
clinical = pd.read_csv("input/clinical_matched.tsv", sep="\t")
methyl = pd.read_csv("input/methylation_matched.txt", sep="\t", index_col=0)

SAMPLE_COL   = "Sample ID"
STATUS_COL   = "Overall Survival Status"
DURATION_COL = "Overall Survival (Months)"

# 공통 샘플 정렬
samples = sorted(set(mrna.columns) & set(clinical[SAMPLE_COL]) & set(methyl.columns))
mrna     = mrna.loc[:, samples]
methyl   = methyl.loc[:, samples]
clinical = clinical[clinical[SAMPLE_COL].isin(samples)].reset_index(drop=True)

print(f"공통 샘플: {len(samples)}개 | mRNA {mrna.shape}, Methylation {methyl.shape}")

## 생존분석 전처리

In [None]:
clinical = clinical.dropna(subset=[DURATION_COL, STATUS_COL]).copy()

# 동적 상태 매핑 (데이터에 따라 자동 감지)
status_vals = clinical[STATUS_COL].dropna().unique()
status_map = {}
for val in status_vals:
    val_str = str(val).upper()
    if val_str.startswith('1') or 'DECEASED' in val_str or 'DEAD' in val_str:
        status_map[val] = 1
    else:
        status_map[val] = 0
clinical["event"] = clinical[STATUS_COL].map(status_map).astype(int)
clinical["duration"] = clinical[DURATION_COL].astype(float)

n_total = len(clinical)
n_events = clinical["event"].sum()
print(f"대상: {n_total}명 | 이벤트: {n_events}명 ({n_events/n_total*100:.1f}%) | 중앙 생존: {clinical['duration'].median():.1f}개월")
print(f"상태 매핑: {status_map}")

### Vibe Coding 미션 C1: 생존데이터 탐색적 분석
**미션**: 생존분석 데이터의 특성을 체계적으로 탐색해보세요!

**도전 과제**:
1. 생존시간의 분포를 히스토그램과 박스플롯으로 시각화하기
2. 이벤트 발생률을 전체적으로 요약하기 (사망/생존 비율)
3. 중도절단된 데이터의 패턴 분석하기


In [None]:
# 미션 C1 코드를 여기에 작성하세요!

### Vibe Coding 미션 C2: 생존데이터 품질 평가
**미션**: 생존분석 데이터의 품질을 다각도로 평가해보세요!

**도전 과제**:
1. 관찰기간별 이벤트 발생률 트렌드 분석하기
2. 극값(outlier) 생존시간을 가진 환자들 식별하기
3. Follow-up 기간의 적절성 평가하기


In [None]:
# 미션 C2 코드를 여기에 작성하세요!

## 1) Kaplan-Meier 생존곡선

In [None]:
kmf = KaplanMeierFitter()
kmf.fit(clinical["duration"], clinical["event"], label="All Patients")

plt.figure(figsize=(10, 6))
kmf.plot_survival_function(ci_show=True, ci_alpha=0.3)
plt.title("Kaplan-Meier Survival Curve - All Patients", fontsize=14, fontweight='bold')
plt.xlabel("Time (Months)"); plt.ylabel("Survival Probability")
plt.ylim(0, 1); plt.grid(True, alpha=0.3)

median_survival = kmf.median_survival_time_
plt.axvline(median_survival, color='red', linestyle='--', alpha=0.7, label=f'Median: {median_survival:.1f} months')
plt.legend(); plt.tight_layout(); plt.show()

### Vibe Coding 미션 C3: 기본 생존곡선 향상
**미션**: 기본 생존곡선을 더욱 정보가 풍부하고 보기 좋게 만들어보세요!

**도전 과제**:
1. 생존곡선에 위험군 수 테이블(risk table) 추가하기
2. 특정 시점(12, 24, 36개월)에서의 생존율 표시하기
3. 95% 신뢰구간과 함께 색상 조화로운 테마 적용하기


In [None]:
# 미션 C3 코드를 여기에 작성하세요!

## 바이오마커 스크리닝 (Univariate Cox)

유전자/프로브별 개별 Cox 회귀 → 생존 유의 바이오마커 선별

In [None]:
# ── 사전 계산된 스크리닝 결과가 있으면 로드 (없으면 직접 계산) ──
_NB_DIR = os.path.dirname(os.path.abspath("__file__"))
GENE_SCREEN_FILE  = os.path.join(_NB_DIR, "results", "nb02_gene_screening.tsv")
PROBE_SCREEN_FILE = os.path.join(_NB_DIR, "results", "nb02_probe_screening.tsv")

if os.path.exists(GENE_SCREEN_FILE) and os.path.exists(PROBE_SCREEN_FILE):
    gene_screen_df  = pd.read_csv(GENE_SCREEN_FILE, sep="\t")
    probe_screen_df = pd.read_csv(PROBE_SCREEN_FILE, sep="\t")
    print("[INFO] 사전 계산된 스크리닝 결과를 로드했습니다.")
else:
    print("[INFO] 스크리닝 결과 파일이 없어 직접 계산합니다 (수 분 소요)...")
    # ── 1) mRNA Univariate Cox Screening ──
    cph_screen = CoxPHFitter()
    gene_screening_results = []
    sample_ids = clinical[SAMPLE_COL].values

    for i, gene in enumerate(mrna.index):
        try:
            expr = mrna.loc[gene, sample_ids].values.astype(float)
            tmp = pd.DataFrame({
                'duration': clinical['duration'].values,
                'event': clinical['event'].values,
                'x': StandardScaler().fit_transform(expr.reshape(-1, 1)).ravel()
            })
            cph_screen.fit(tmp, duration_col='duration', event_col='event')
            s = cph_screen.summary
            gene_screening_results.append({
                'gene': gene, 'coef': s['coef'].values[0],
                'hr': s['exp(coef)'].values[0], 'p': s['p'].values[0]
            })
        except Exception:
            pass

    gene_screen_df = pd.DataFrame(gene_screening_results).sort_values('p')

    # ── 2) Methylation Univariate Cox Screening ──
    probe_screening_results = []

    for i, probe in enumerate(methyl.index):
        try:
            meth_vals = methyl.loc[probe, sample_ids].values.astype(float)
            if np.isnan(meth_vals).sum() > len(meth_vals) * 0.3:
                continue
            meth_vals = np.nan_to_num(meth_vals, nan=np.nanmean(meth_vals))
            tmp = pd.DataFrame({
                'duration': clinical['duration'].values,
                'event': clinical['event'].values,
                'x': StandardScaler().fit_transform(meth_vals.reshape(-1, 1)).ravel()
            })
            cph_screen.fit(tmp, duration_col='duration', event_col='event')
            s = cph_screen.summary
            probe_screening_results.append({
                'probe': probe, 'coef': s['coef'].values[0],
                'hr': s['exp(coef)'].values[0], 'p': s['p'].values[0]
            })
        except Exception:
            pass

    probe_screen_df = pd.DataFrame(probe_screening_results).sort_values('p')

# ── 결과 요약 ──
print(f"유의한 유전자: {(gene_screen_df['p']<0.05).sum()}/{len(gene_screen_df)}개")
print(f"유의한 프로브: {(probe_screen_df['p']<0.05).sum()}/{len(probe_screen_df)}개")
print("\n[Top 10 유전자]")
print(gene_screen_df.head(10).to_string(index=False))
print("\n[Top 10 프로브]")
print(probe_screen_df.head(10).to_string(index=False))

TOP_N_GENES = 5; TOP_N_PROBES = 5
top_genes = gene_screen_df.head(TOP_N_GENES)['gene'].values
top_probes = probe_screen_df.head(TOP_N_PROBES)['probe'].values

## 2) mRNA 발현 기반 생존분석
스크리닝 Top 유전자의 발현량 → High/Low 그룹 → Log-rank 검정

In [None]:
# 스크리닝 상위 20개 중 KM 분리가 가장 좋은 유전자 자동 선택
best_km_gene, best_km_p = None, 1.0
for _, row in gene_screen_df.head(20).iterrows():
    gene = row['gene']
    expr = mrna.loc[gene, clinical.set_index(SAMPLE_COL).index]
    try:
        groups = pd.qcut(expr, 2, labels=["Low", "High"], duplicates='drop')
    except ValueError:
        continue
    if groups.nunique() < 2:
        continue
    low_d = clinical.set_index(SAMPLE_COL).loc[groups == "Low"]
    high_d = clinical.set_index(SAMPLE_COL).loc[groups == "High"]
    lr = logrank_test(low_d["duration"], high_d["duration"], low_d["event"], high_d["event"])
    if lr.p_value < best_km_p:
        best_km_p = lr.p_value
        best_km_gene = gene

target_gene = best_km_gene
df = clinical.set_index(SAMPLE_COL).join(mrna.loc[target_gene].rename("Expression"))
df["Expression_group"] = pd.qcut(df["Expression"], 2, labels=["Low", "High"], duplicates='drop')

# KM 생존곡선
plt.figure(figsize=(10, 6))
kmf = KaplanMeierFitter()
for i, group in enumerate(["Low", "High"]):
    subset = df[df["Expression_group"] == group]
    kmf.fit(subset["duration"], subset["event"], label=f"{group} Expression")
    kmf.plot_survival_function(ci_show=True, ci_alpha=0.2, color=['#1f77b4', '#ff7f0e'][i])

lr = logrank_test(df[df["Expression_group"]=="Low"]["duration"], df[df["Expression_group"]=="High"]["duration"],
                  df[df["Expression_group"]=="Low"]["event"], df[df["Expression_group"]=="High"]["event"])
sig_text = "***" if lr.p_value < 0.001 else "**" if lr.p_value < 0.01 else "*" if lr.p_value < 0.05 else "ns"
plt.text(0.02, 0.98, f"Log-rank p={lr.p_value:.4e} {sig_text}", transform=plt.gca().transAxes, fontsize=12,
         bbox=dict(boxstyle="round,pad=0.3", facecolor="lightyellow", alpha=0.8), verticalalignment='top')

plt.title(f"Survival by {target_gene} Expression", fontsize=14, fontweight='bold')
plt.xlabel("Time (Months)"); plt.ylabel("Survival Probability")
plt.ylim(0, 1); plt.grid(True, alpha=0.3); plt.legend()
plt.tight_layout(); plt.show()

## 3) Methylation 기반 생존분석
스크리닝 Top 프로브의 메틸화 수준 → Hyper/Hypo 그룹 → Log-rank 검정

In [None]:
# 스크리닝 상위 20개 중 KM 분리가 가장 좋은 프로브 자동 선택
best_km_probe, best_km_p_probe = None, 1.0
for _, row in probe_screen_df.head(20).iterrows():
    probe = row['probe']
    meth_vals = methyl.loc[probe, df.index]
    meth_clean = meth_vals.dropna()
    if len(meth_clean) < 50: continue
    valid_idx = meth_clean.index
    try:
        groups = pd.qcut(meth_clean, 2, labels=["Hypo", "Hyper"], duplicates='drop')
    except ValueError:
        continue
    if groups.nunique() < 2:
        continue
    hypo_d = df.loc[valid_idx][groups == "Hypo"]
    hyper_d = df.loc[valid_idx][groups == "Hyper"]
    lr = logrank_test(hypo_d["duration"], hyper_d["duration"], hypo_d["event"], hyper_d["event"])
    if lr.p_value < best_km_p_probe:
        best_km_p_probe = lr.p_value
        best_km_probe = probe

if best_km_probe is None:
    print("[경고] 적절한 프로브를 찾지 못했습니다. 첫 번째 프로브를 사용합니다.")
    best_km_probe = probe_screen_df.iloc[0]['probe']

target_probe = best_km_probe
df["Methylation"] = methyl.loc[target_probe]
df["Methylation_group"] = pd.qcut(df["Methylation"].dropna(), 2, labels=["Hypo", "Hyper"], duplicates='drop')
df_meth = df.dropna(subset=["Methylation"])

# KM 생존곡선
plt.figure(figsize=(10, 6))
kmf = KaplanMeierFitter()
for i, group in enumerate(["Hypo", "Hyper"]):
    subset = df_meth[df_meth["Methylation_group"] == group]
    kmf.fit(subset["duration"], subset["event"], label=f"{group}methylated")
    kmf.plot_survival_function(ci_show=True, ci_alpha=0.2, color=['#2ca02c', '#d62728'][i])

lr = logrank_test(df_meth[df_meth["Methylation_group"]=="Hypo"]["duration"],
                  df_meth[df_meth["Methylation_group"]=="Hyper"]["duration"],
                  df_meth[df_meth["Methylation_group"]=="Hypo"]["event"],
                  df_meth[df_meth["Methylation_group"]=="Hyper"]["event"])
sig_text = "***" if lr.p_value < 0.001 else "**" if lr.p_value < 0.01 else "*" if lr.p_value < 0.05 else "ns"
plt.text(0.02, 0.98, f"Log-rank p={lr.p_value:.4e} {sig_text}", transform=plt.gca().transAxes, fontsize=12,
         bbox=dict(boxstyle="round,pad=0.3", facecolor="lightyellow", alpha=0.8), verticalalignment='top')

plt.title(f"Survival by Methylation ({target_probe})", fontsize=14, fontweight='bold')
plt.xlabel("Time (Months)"); plt.ylabel("Survival Probability")
plt.ylim(0, 1); plt.grid(True, alpha=0.3); plt.legend()
plt.tight_layout(); plt.show()

## 4) 다변량 Cox 회귀분석

단계적으로 변수를 추가하며 C-index 향상을 관찰합니다.

| 모델 | 변수 | 목적 |
|------|------|------|
| Model 1 | 임상변수 (연령, 병기, 아형) | 기준 |
| Model 2 | 임상 + 유전자 1개 | 분자마커 효과 |
| Model 3 | 임상 + 유전자 + 메틸화 | 멀티오믹스 효과 |

In [None]:
# ── 1) 임상변수 전처리 ──
cox_df = df[['duration', 'event']].copy()
cox_df['age'] = clinical.set_index(SAMPLE_COL).loc[cox_df.index, 'Diagnosis Age'].astype(float)
cox_df['age_z'] = StandardScaler().fit_transform(cox_df[['age']])

stage_raw = clinical.set_index(SAMPLE_COL).loc[cox_df.index, 'Neoplasm Disease Stage American Joint Committee on Cancer Code']
stage_map = {}
for s in stage_raw.dropna().unique():
    if 'IV' in str(s): stage_map[s] = 3
    elif 'III' in str(s): stage_map[s] = 3
    elif 'II' in str(s): stage_map[s] = 2
    elif 'I' in str(s): stage_map[s] = 1
    else: stage_map[s] = 2
cox_df['stage'] = stage_raw.map(stage_map).fillna(2).astype(float)

# 동적 아형 더미 변수 (최빈 아형 = 기준 그룹)
subtype = clinical.set_index(SAMPLE_COL).loc[cox_df.index, 'Subtype']
subtype_counts = subtype.value_counts()
dummy_subtypes = [s for s in subtype_counts.index[1:] if subtype_counts[s] >= 5][:2]
subtype_cols = []
for st in dummy_subtypes:
    safe_name = st.replace(' ', '_').replace('/', '_').replace('.', '_')
    cox_df[safe_name] = (subtype == st).astype(int)
    subtype_cols.append(safe_name)
print(f"기준 아형: {subtype_counts.index[0]} | 더미: {dummy_subtypes}")

# ── 2) 분자마커 추가 ──
best_gene = gene_screen_df.iloc[0]['gene']
best_probe = probe_screen_df.iloc[0]['probe']
cox_df['expr'] = StandardScaler().fit_transform(mrna.loc[best_gene, cox_df.index].values.reshape(-1, 1)).ravel()
meth_vals = methyl.loc[best_probe, cox_df.index].values.astype(float)
meth_vals = np.nan_to_num(meth_vals, nan=np.nanmean(meth_vals))
cox_df['meth'] = StandardScaler().fit_transform(meth_vals.reshape(-1, 1)).ravel()

# ── 3) 단계적 Cox 모델 비교 ──
clinical_vars = ['age_z', 'stage'] + subtype_cols
all_vars = clinical_vars + ['expr', 'meth']

cph1 = CoxPHFitter(penalizer=0.01)
cph1.fit(cox_df[['duration', 'event'] + clinical_vars], duration_col='duration', event_col='event')

cph2 = CoxPHFitter(penalizer=0.01)
cph2.fit(cox_df[['duration', 'event'] + clinical_vars + ['expr']], duration_col='duration', event_col='event')

cph3 = CoxPHFitter(penalizer=0.01)
cph3.fit(cox_df[['duration', 'event'] + all_vars], duration_col='duration', event_col='event')

print(f"Model 1 (임상만):       C-index = {cph1.concordance_index_:.4f}")
print(f"Model 2 (+유전자):      C-index = {cph2.concordance_index_:.4f}  (Δ +{cph2.concordance_index_-cph1.concordance_index_:.4f})")
print(f"Model 3 (+유전자+메틸): C-index = {cph3.concordance_index_:.4f}  (Δ +{cph3.concordance_index_-cph1.concordance_index_:.4f})")

### Forest Plot & C-index 비교

In [None]:
from matplotlib.patches import Patch

summary = cph3.summary
hr = summary["exp(coef)"]
hr_lower = summary["exp(coef) lower 95%"]
hr_upper = summary["exp(coef) upper 95%"]
p_values = summary["p"]
labels = [name.replace("gene_", "Gene: ").replace("probe_", "Probe: ") for name in hr.index]
y_pos = np.arange(len(hr))[::-1]

fig, axes = plt.subplots(1, 3, figsize=(22, max(8, len(hr) * 0.6)))
fig.suptitle(f"Cox Regression Results", fontsize=16, fontweight='bold')

# (1) Forest Plot
for pos, hr_val, lower, upper, p_val in zip(y_pos, hr, hr_lower, hr_upper, p_values):
    color = 'red' if p_val < 0.05 else 'steelblue'
    axes[0].errorbar(hr_val, pos, xerr=[[hr_val-lower], [upper-hr_val]],
                     fmt='o', color=color, alpha=0.8, capsize=4,
                     markersize=10 if p_val < 0.05 else 7, linewidth=2.5 if p_val < 0.05 else 1.5)
axes[0].axvline(x=1, color='gray', linestyle='--', alpha=0.8, lw=2)
axes[0].set_yticks(y_pos); axes[0].set_yticklabels(labels, fontsize=9)
axes[0].set_xlabel("Hazard Ratio (95% CI)"); axes[0].set_title("Forest Plot", fontweight='bold')
axes[0].legend(handles=[Patch(facecolor='red', label='p<0.05'), Patch(facecolor='steelblue', label='ns')], loc='lower right')

# (2) C-index 비교
c_indices = [cph1.concordance_index_, cph2.concordance_index_, cph3.concordance_index_]
model_labels = ['M1\nClinical', 'M2\n+Gene', 'M3\n+Gene+Meth']
axes[1].bar(model_labels, c_indices, color=['#2E86AB', '#A23B72', '#F18F01'], alpha=0.8, edgecolor='black')
for i, cidx in enumerate(c_indices):
    axes[1].text(i, cidx + 0.01, f"{cidx:.3f}", ha='center', va='bottom', fontsize=11, fontweight='bold')
axes[1].set_ylim(0.4, 1.0); axes[1].set_ylabel("C-index"); axes[1].set_title("C-index Comparison", fontweight='bold')

# (3) Baseline Survival
cph3.baseline_survival_.plot(ax=axes[2], legend=False, lw=2.5, color='darkblue')
axes[2].set_title("Baseline Survival S₀(t)", fontweight='bold')
axes[2].set_xlabel("Time (Months)"); axes[2].set_ylim(0, 1)

plt.tight_layout(rect=[0, 0, 1, 0.93]); plt.show()

### Vibe Coding 미션 C4: Cox 모델 위험군 분류 및 KM 검증
**미션**: Cox 모델의 예측값을 활용하여 환자를 위험군으로 분류하고 생존곡선으로 검증하세요!

**도전 과제**:
1. Cox 모델의 `predict_partial_hazard()`로 각 환자의 위험 점수 계산
2. 위험 점수 중앙값 기준으로 **High Risk / Low Risk** 그룹 분류
3. 두 위험군의 KM 생존곡선 비교 + **Log-rank 검정**
4. (도전) `pd.qcut()`으로 3분위(Low/Medium/High)로 나누어 세밀한 분류 시도

In [None]:
# 미션 C4 코드를 여기에 작성하세요!


## 요약

| 분석 | 핵심 함수 |
|------|-----------|
| Kaplan-Meier | `KaplanMeierFitter`, `logrank_test` |
| Univariate Cox | `CoxPHFitter` (단변량) |
| Multivariate Cox | `CoxPHFitter` (다변량, L2 정규화) |
| Forest Plot | `matplotlib errorbar` |

In [None]:
print(f"분석 환자: {len(df)}명 | 이벤트: {int(df['event'].sum())}명")
print(f"유의 유전자: {(gene_screen_df['p']<0.05).sum()}개 | 유의 프로브: {(probe_screen_df['p']<0.05).sum()}개")
print(f"\nC-index: M1={cph1.concordance_index_:.4f} → M4={cph3.concordance_index_:.4f}")

## 스크리닝 결과 저장 → NB03, NB04에서 사용

In [None]:
import os
RESULT_DIR = "results"
os.makedirs(RESULT_DIR, exist_ok=True)

# 유전자 스크리닝 결과 저장 (전체 + 유의한 것 표시)
gene_screen_df['significant'] = gene_screen_df['p'] < 0.05
gene_screen_df.to_csv(f"{RESULT_DIR}/nb02_gene_screening.tsv",
                       sep="\t", index=False)

# 프로브 스크리닝 결과 저장
probe_screen_df['significant'] = probe_screen_df['p'] < 0.05
probe_screen_df.to_csv(f"{RESULT_DIR}/nb02_probe_screening.tsv",
                        sep="\t", index=False)

n_sig_g = gene_screen_df['significant'].sum()
n_sig_p = probe_screen_df['significant'].sum()
print(f"저장 완료: {RESULT_DIR}/")
print(f"  유의한 유전자: {n_sig_g}개 / {len(gene_screen_df)}개")
print(f"  유의한 프로브: {n_sig_p}개 / {len(probe_screen_df)}개")