# 1.4 医学因果推断常用方法 · 最小可运行模板

生成时间：2025-08-15 04:18:06

包含：
- **PSM**（带卡钳的 1:1 最近邻有放回匹配 + Love plot）
- **IPTW**（稳定化 + 截尾 + ESS）
- **IV**（2SLS with robust SE）
- **DR**（EconML DRLearner：ATE + ITE）

> 请先依次运行“🧪 生成/加载演示数据”和“🔧 工具函数”两个单元：后续各方法单元都会读取前者写出的 CSV，PSM 与 IPTW 单元还会调用后者定义的函数。
> 合成数据仅演示用途；实际研究请替换为真实数据并补充完整诊断与敏感性分析。

## 🧪 生成/加载演示数据

In [None]:
import numpy as np
import pandas as pd

# Seeded generator: the synthetic cohort is identical on every run.
# NOTE: the order of the rng draws below determines the data; do not reorder.
rng = np.random.default_rng(42)
n = 800

# --- Baseline covariates -------------------------------------------------
age = rng.normal(70, 8, n)
sex = rng.integers(0, 2, n)       # 0 = female, 1 = male
sofa = rng.integers(0, 12, n)     # upper bound exclusive -> values 0..11
charlson = rng.integers(0, 8, n)  # upper bound exclusive -> values 0..7

# --- Treatment assignment ------------------------------------------------
# True propensity follows a logistic model in the four covariates, so the
# treated and control groups differ systematically (confounding by design).
lin_ps = -4 + 0.03*age + 0.25*sex + 0.15*sofa + 0.20*charlson
ps_true = 1/(1+np.exp(-lin_ps))
treatment = rng.binomial(1, ps_true)

# --- Outcome model -------------------------------------------------------
# Individual treatment effect varies with age and sex (heterogeneous effect).
is_female = (sex == 0)
tau = -1.0 + 0.02*(80-age) + 0.1*is_female

# Untreated potential outcome: linear in covariates plus Gaussian noise.
baseline_noise = rng.normal(0, 3, n)
y0 = 10 + 0.08*age + 0.6*sofa + 0.8*charlson + baseline_noise

# Observed outcome: effect applied only to treated rows, plus a second,
# independent noise term.
outcome_noise = rng.normal(0, 1.2, n)
outcome = y0 + tau*treatment + outcome_noise

# --- Assemble and persist ------------------------------------------------
# Only observables are stored; ps_true, tau and y0 are not written out.
columns = {
    'age': np.round(age, 1),
    'sex': sex,
    'sofa': sofa,
    'charlson': charlson,
    'treatment': treatment,
    'outcome': np.round(outcome, 3),
}
df = pd.DataFrame(columns)
df.to_csv('/mnt/data/causal_toy_data.csv', index=False)
print('Saved synthetic dataset to /mnt/data/causal_toy_data.csv with shape:', df.shape)
df.head()

## 🔧 工具函数（SMD、ESS）

In [None]:
import numpy as np, pandas as pd

# Compute SMD for numeric/binary columns; returns Series
def smd_standardized_mean_diff(df, cols, treat_col, weights=None):
    """Compute (optionally weighted) standardized mean differences.

    For each column in ``cols`` the SMD is
    ``(mean_treated - mean_control) / sqrt((var_treated + var_control) / 2)``,
    where means and variances are weighted by ``weights`` and the variances
    use the population form (divide by the sum of weights).

    Parameters
    ----------
    df : pandas.DataFrame
        Data containing the covariates and the treatment indicator.
    cols : iterable of str
        Numeric or binary covariate columns to evaluate.
    treat_col : str
        Column holding the treatment indicator (1 = treated, 0 = control).
    weights : array-like, optional
        One non-negative weight per row of ``df``. ``None`` means unit weights.

    Returns
    -------
    pandas.Series
        SMD per covariate, sorted by absolute value in descending order.
        An entry is NaN when the SMD is undefined: a group has zero total
        weight, or the covariate contains NaN. NaN entries sort last.
    """
    T = df[treat_col].values.astype(int)
    tmask = T == 1
    cmask = T == 0

    # Weights and group masks do not depend on the covariate: build them once.
    if weights is None:
        wt = np.ones(len(df), dtype=float)
    else:
        wt = np.asarray(weights, dtype=float)
    wt_t, wt_c = wt[tmask], wt[cmask]
    sum_t, sum_c = np.sum(wt_t), np.sum(wt_c)
    has_both_groups = sum_t > 0 and sum_c > 0

    out = {}
    for c in cols:
        if not has_both_groups:
            # An empty (zero-weight) arm has no mean; report NaN rather than
            # 0.0, which would read as perfect balance.
            out[c] = np.nan
            continue
        x = df[c].values
        xt, xc = x[tmask], x[cmask]
        mt = np.sum(wt_t * xt) / sum_t
        mc = np.sum(wt_c * xc) / sum_c
        vt = np.sum(wt_t * (xt - mt) ** 2) / sum_t
        vc = np.sum(wt_c * (xc - mc) ** 2) / sum_c
        # The small epsilon keeps the denominator strictly positive when both
        # groups have zero variance; NaN inputs still propagate to the result.
        sp = np.sqrt(0.5 * (vt + vc) + 1e-12)
        out[c] = (mt - mc) / sp
    return pd.Series(out, dtype=float).sort_values(
        key=lambda s: np.abs(s), ascending=False
    )

def effective_sample_size(weights):
    """Return the effective sample size ``(sum w)^2 / sum(w^2)`` of ``weights``.

    ``weights`` is any array-like of numbers. A small constant is added to
    the denominator so that empty or all-zero input yields 0.0 instead of a
    division-by-zero.
    """
    w = np.asarray(weights, dtype=float)
    squared_total = np.sum(w) ** 2
    total_of_squares = np.sum(np.square(w)) + 1e-12
    return squared_total / total_of_squares

## 1) PSM：匹配 + Love plot

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import NearestNeighbors

# Balance helper defined in the utility cell of this notebook.
from __main__ import smd_standardized_mean_diff

df = pd.read_csv('/mnt/data/causal_toy_data.csv')

T, Y = 'treatment', 'outcome'
pre_covs = ['age','sex','sofa','charlson']

# 1) Estimate the propensity score with a main-effects logistic regression.
X = pd.get_dummies(df[pre_covs], drop_first=True)
ps_model = LogisticRegression(max_iter=1000).fit(X, df[T])
ps = ps_model.predict_proba(X)[:,1]

# 2) 1:1 nearest-neighbour matching on the PS, WITH replacement: every
#    treated unit is paired with its closest control, so one control may be
#    matched to several treated units. Pairs further apart than the caliper
#    (on the raw PS scale) are discarded.
#    df comes straight from read_csv, so index labels equal row positions
#    and can be used to index the NumPy arrays below.
treated_idx = df.index[df[T]==1].to_numpy()
control_idx = df.index[df[T]==0].to_numpy()

nbrs = NearestNeighbors(n_neighbors=1).fit(ps[control_idx].reshape(-1,1))
dist, nn = nbrs.kneighbors(ps[treated_idx].reshape(-1,1))
matched_control = control_idx[nn.flatten()]
pairs = pd.DataFrame({'treated': treated_idx, 'control': matched_control, 'dist': dist.flatten()})
caliper = 0.05
pairs = pairs[pairs['dist'] <= caliper]

# 3) ATT: mean outcome difference across matched pairs. A reused control
#    contributes once per pair it appears in.
yt = df.loc[pairs['treated'], Y].to_numpy()
yc = df.loc[pairs['control'], Y].to_numpy()
att = yt.mean() - yc.mean()
print(f"PSM ATT (caliper={caliper}): {att:.4f} | pairs: {len(pairs)}")

# 4) Love plot: SMD before vs. after matching.
smd_before = smd_standardized_mean_diff(df, pre_covs, T)

# Matching weights: 1 for each retained treated unit, and for each control
# the number of pairs it appears in. `arr[idx] += 1` would count a repeated
# index only once, so the multiplicities are obtained with bincount; this
# keeps the balance check consistent with the ATT estimate above.
weights_matched = np.zeros(len(df))
weights_matched[pairs['treated'].to_numpy()] = 1
weights_matched += np.bincount(pairs['control'].to_numpy(), minlength=len(df))

smd_after = smd_standardized_mean_diff(df, pre_covs, T, weights=weights_matched)

plt.figure()
# Order covariates by pre-matching imbalance so the largest sits at the top.
order = smd_before.abs().sort_values(ascending=True).index.tolist()
plt.scatter(smd_before[order], range(len(order)), label='Before')
plt.scatter(smd_after[order], range(len(order)), marker='x', label='After')
plt.yticks(range(len(order)), order)
# Reference lines at |SMD| = 0.1.
plt.axvline(0.1, linestyle='--'); plt.axvline(-0.1, linestyle='--')
plt.xlabel('Standardized Mean Difference')
plt.title('Love Plot: PSM Balance')
plt.legend()
plt.show()

## 2) IPTW：稳定化 + 截尾 + ESS

In [None]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
import matplotlib.pyplot as plt

df = pd.read_csv('/mnt/data/causal_toy_data.csv')
T, Y = 'treatment', 'outcome'
pre_covs = ['age','sex','sofa','charlson']

# Propensity score from a main-effects logistic regression.
X = pd.get_dummies(df[pre_covs], drop_first=True)
ps_model = LogisticRegression(max_iter=1000)
ps_model.fit(X, df[T])
ps = ps_model.predict_proba(X)[:, 1]

# Stabilized weights: marginal treatment probability over the propensity
# for treated rows, and the complementary ratio for controls.
p_t = df[T].mean()
is_treated = df[T] == 1
is_control = df[T] == 0
w = np.where(is_treated, p_t / ps, (1 - p_t) / (1 - ps))

# Cap extreme weights at the 1st/99th percentiles. Values are clipped to
# the bounds; no rows are dropped.
lo, hi = np.percentile(w, [1, 99])
w_clip = np.clip(w, lo, hi)

# ATE as the difference of weighted outcome means between the two arms.
y = df[Y]
mean_treated = (w_clip[is_treated] * y[is_treated]).sum() / w_clip[is_treated].sum()
mean_control = (w_clip[is_control] * y[is_control]).sum() / w_clip[is_control].sum()
ate = mean_treated - mean_control
print(f"IPTW (stabilized, trimmed {lo:.3f}-{hi:.3f}) ATE: {ate:.4f}")

# Helpers defined in the utility cell of this notebook.
from __main__ import effective_sample_size, smd_standardized_mean_diff

# ESS over all rows (both arms pooled) for the clipped weights.
ESS = effective_sample_size(w_clip)
print("Effective Sample Size (ESS):", round(ESS, 1))

# Overlay raw and clipped weight distributions to show the effect of capping.
plt.figure()
for values, label in ((w, 'Raw'), (w_clip, 'Trimmed')):
    plt.hist(values, bins=40, alpha=0.7, label=label)
plt.title('IPTW Weights Distribution')
plt.legend()
plt.show()

# Covariate balance under the clipped weights (sorted by |SMD|, largest first).
smd_w = smd_standardized_mean_diff(df, pre_covs, T, weights=w_clip)
print("Top weighted SMD:\n", smd_w.head())

## 3) IV（2SLS）：最小示例

In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from linearmodels.iv import IV2SLS

df = pd.read_csv('/mnt/data/causal_toy_data.csv')

# Toy instrument: treatment rate within 5-year age bands, standing in for a
# "physician preference" measure.
# NOTE: this is for API demonstration only. The band mean includes each
# patient's own treatment and is a function of age (itself a control), so it
# is not a credible instrument; replace it with a real one in actual studies.
clusters = (df['age'] // 5).astype(int)
pref = clusters.map(df.groupby(clusters)['treatment'].mean())
df['physician_preference'] = pref.values

Y = df['outcome']                   # dependent variable
X = df['treatment']                 # endogenous regressor
Z = df[['physician_preference']]    # excluded instrument(s)
# Exogenous controls plus an intercept.
W = sm.add_constant(df[['age','sex','sofa','charlson']], has_constant='add')

# Pass ONLY the excluded instruments as `instruments`: IV2SLS already adds
# the exogenous regressors to the first stage. Supplying W again duplicates
# columns in the instrument matrix and fails the full-column-rank check.
iv_res = IV2SLS(dependent=Y, exog=W, endog=X, instruments=Z).fit(cov_type='robust')
print(iv_res.summary)

## 4) DR（EconML DRLearner）：ATE 与 ITE

In [None]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LassoCV
from econml.dr import DRLearner

df = pd.read_csv('/mnt/data/causal_toy_data.csv')
T, Y = 'treatment', 'outcome'
# The same covariates serve as confounders and as effect modifiers.
X = pd.get_dummies(df[['age','sex','sofa','charlson']], drop_first=True).values

# Doubly robust learner: propensity model + outcome regression as nuisance
# models, then a final model for effect heterogeneity in X.
# random_state on the learner itself fixes the cross-fitting splits; without
# it the printed ATE/ITE change between runs even though the sub-models are
# seeded.
est = DRLearner(
    model_propensity=LogisticRegression(max_iter=1000),
    model_regression=RandomForestRegressor(n_estimators=200, random_state=0),
    model_final=LassoCV(cv=5, random_state=0),
    random_state=0,
)
est.fit(Y=df[Y].values, T=df[T].values, X=X)

# Per-row effect estimates; computed once and reused for both summaries.
ite = est.effect(X)
ate = np.mean(ite)
print("DRLearner ATE:", round(float(ate), 4))
print("First 10 ITE estimates:", np.round(ite[:10], 4))