# Huggett (1993) Model with Aggregate Risk

Heterogeneous-agent economy: individuals have uninsured idiosyncratic labor income risk and save in bonds; bonds are in **zero net supply**. Aggregate TFP $z_t$ is stochastic. The interest rate $r_t$ clears the bond market.

**Individual state:** $(b, y)$ — bond holdings and idiosyncratic income.  
**Aggregate state:** $z$ (and in equilibrium, the cross-sectional distribution).  
**Budget:** $c + b' = (1+r)b + y\,z$, with borrowing constraint $b \geq \underline{b}$.
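
The budget constraint and borrowing limit above can be sketched as a small helper. This is a minimal illustration, not the notebook's policy code: the name `next_bonds` and its default `b_lower=-1.0` are assumptions made for this example.

```python
import numpy as np

def next_bonds(c, b, y, z, r, b_lower=-1.0):
    """Apply c + b' = (1+r) b + y z, then enforce b' >= b_lower.

    `next_bonds` and `b_lower` are hypothetical names used only in this sketch.
    Returns (c_feasible, b_next): if the borrowing limit binds, consumption is
    reduced so that the budget identity still holds.
    """
    resources = (1.0 + r) * b + y * z          # cash on hand
    b_next = np.maximum(resources - c, b_lower)  # borrowing constraint
    c_feasible = resources - b_next              # back out consumption
    return c_feasible, b_next

# Interior case: the desired consumption is affordable without hitting the limit.
c_in, b_in = next_bonds(c=0.9, b=0.5, y=1.0, z=1.0, r=0.04)

# Binding case: the desired consumption would require b' < b_lower.
c_bd, b_bd = next_bonds(c=3.0, b=-1.0, y=0.25, z=1.0, r=0.04)
```

In the binding case the agent ends up at the limit and consumes only what the budget allows, which is the same clip-then-back-out logic the grid policy uses later.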

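We solve for equilibrium using **SRL/SPG** (structural policy gradient). The policy $\pi(b,y,r,z) \to (c,b')$ is parameterized on a grid and trained by gradient ascent. The price $r = P^*(G,z)$ comes from bond market clearing and is wrapped in a **stop-gradient**, so agents take prices as given when the policy is differentiated.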

In [1]:
# ========== Dependencies ==========
# numpy for arrays/grids; matplotlib for plots; scipy.stats.norm for Tauchen (normal CDF)
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

np.random.seed(42)   # fix seed for reproducibility
plt.rcParams['font.size'] = 10
plt.rcParams['axes.unicode_minus'] = False  # avoid the minus sign rendering as a box

## Global parameters (SRL Appendix A.1 — Table 2 & Section 4.1)

- **Preferences:** $E_0 \sum_{t=0}^\infty \beta^t u(c_t)$, isoelastic $u(c) = \frac{c^{1-\sigma}}{1-\sigma}$.
- **Idiosyncratic income** $y_t$: **level** AR(1) $y_{t+1}=(1-\rho_y)\bar{y}+\rho_y y_t+\nu_y\varepsilon_t$, persistence $\rho_y$, volatility $\nu_y$; discretized (Tauchen-style) on $n_y$ points, normalized so $E[y]=1$.
- **Aggregate income** $z_t$: log AR(1), persistence $\rho_z$, volatility $\nu_z$; discretized (Tauchen) on $n_z$ points.
- **Bonds:** borrowing limit $\underline{b}$, total bond supply $B = 0$ (zero net supply).
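
As a quick check on the two AR(1) processes above, the unconditional standard deviation is $\nu/\sqrt{1-\rho^2}$, and a Tauchen grid of half-width $m$ standard deviations spans the ranges computed below. This is a sketch under the calibration used in this notebook ($\rho_y=0.6$, $\nu_y=0.2$, $\rho_z=0.9$, $\nu_z=0.02$, $m=3$); the helper name `stationary_std` is an assumption for this example.

```python
import math

def stationary_std(rho, nu):
    """Unconditional std of x_{t+1} = (1-rho)*mean + rho*x_t + nu*eps, eps ~ N(0,1)."""
    return nu / math.sqrt(1.0 - rho**2)

std_y = stationary_std(0.6, 0.2)       # level process for y
std_logz = stationary_std(0.9, 0.02)   # log process for z

m = 3  # grid half-width in unconditional standard deviations
y_range = (1.0 - m * std_y, 1.0 + m * std_y)                  # around mean 1
z_range = (math.exp(-m * std_logz), math.exp(m * std_logz))   # before normalization
```

The $y$ range is what produces a three-point grid centered at 1; the $z$ range is the support before the grid is rescaled to have mean one.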

In [2]:
# ========== Calibration (SRL Appendix Table 2 & Section 4.1) ==========
# One period = one year

# --- Preferences ---
beta = 0.96       # discount factor β in E_0 Σ β^t u(c_t)
sigma = 2.0       # CRRA coefficient σ in u(c)=c^(1-σ)/(1-σ)

# Idiosyncratic income: y follows an AR(1) in levels
rho_y = 0.6      # persistence
nu_y = 0.2       # innovation volatility

# Aggregate TFP z: log z ~ AR(1)
rho_z = 0.9
nu_z = 0.02

# Bonds
B = 0.0          # total bond supply (zero net supply)
b_min = -1.0     # borrowing constraint b ≥ b_min

# --- Partial equilibrium (PE): exogenous r process ---
r_bar = 0.038    # Mean interest rate (PE)
rho_r = 0.8      # Autocorrelation of interest rate (PE)
nu_r = 0.02      # Volatility of interest rate (PE)

# ========== Discretization (Table 3 hyperparameters) ==========
nb = 200         # number of bond grid points
b_max = 50.0     # upper bound of bond grid
ny = 3           # number of y grid points
nr = 20          # number of r grid points (market clearing)
r_min = 0.01     # lower bound of r grid
r_max = 0.06     # upper bound of r grid
nz = 30          # number of z grid points
J = nb * ny      # composite state size; (b,y) → j = ib*ny+iy

# Truncation (Appendix A.1, Table 3)
c_min = 1e-3     # cmin Minimum consumption 10^{-3}
etrunc = 1e-3    # etrunc Truncation threshold 10^{-3}
T_trunc = 170    # Ttrunc Truncation horizon (Table 3); = min{T : β^T < etrunc}

# ========== SPG training (Table 3) ==========
N_epoch = 1000       # Nepoch Maximum number of parameter updates
N_warmup = 50       # Nwarm-up Number of warm-up epochs
lr_ini = 1e-3       # lrini Initial learning rate
lr_decay = 0.5      # lrdecay Learning rate decay rate
N_sample = 512      # Nsample Batch size (trajectories per update)
e_converge = 3e-4   # econverge Convergence threshold (L∞ norm of param change)

print("Huggett calibration (Table 2):")
print(f"  β={beta}, σ={sigma}, ρy={rho_y}, νy={nu_y}, ρz={rho_z}, νz={nu_z}")
print(f"  B={B}, b_min={b_min};  PE: r_bar={r_bar}, ρr={rho_r}, νr={nu_r}")
print(f"  Grids: nb={nb}, ny={ny}, J={J}, b_max={b_max}, nr={nr}, r∈[{r_min},{r_max}], nz={nz}")
print(f"  T_trunc={T_trunc}, c_min={c_min}")
print("  SPG (Table 3): Nepoch=%d, Nwarmup=%d, lr_ini=%s, lr_decay=%s, Nsample=%d, econverge=%s" % (N_epoch, N_warmup, lr_ini, lr_decay, N_sample, e_converge))

Huggett calibration (Table 2):
  β=0.96, σ=2.0, ρy=0.6, νy=0.2, ρz=0.9, νz=0.02
  B=0.0, b_min=-1.0;  PE: r_bar=0.038, ρr=0.8, νr=0.02
  Grids: nb=200, ny=3, J=600, b_max=50.0, nr=20, r∈[0.01,0.06], nz=30
  T_trunc=170, c_min=0.001
  SPG (Table 3): Nepoch=1000, Nwarmup=50, lr_ini=0.001, lr_decay=0.5, Nsample=512, econverge=0.0003


## Grids and utility

- **Bond grid:** equispaced on $[\underline{b},\, b_{\max}]$ (built with `np.linspace`).
- **Income grids:** Tauchen-style discretization: $y$ level AR(1), $z$ log AR(1).

In [3]:
# ---------- Bond grid ----------
# nb equispaced points on [b_min, b_max] (Appendix: nb=200, bmax=50)
b_grid = np.linspace(b_min, b_max, nb)

# ---------- Utility (CRRA; used in the FOC/Euler equation and the SRL objective L(θ)) ----------
def u(c, sig=sigma):
    """CRRA utility u(c) = c^(1-σ)/(1-σ); log(c) if σ=1. Consumption is floored at c_min."""
    c = np.maximum(c, c_min)
    if np.abs(sig - 1.0) < 1e-10:
        return np.log(c)
    return (c ** (1 - sig)) / (1 - sig)

def u_prime(c, sig=sigma):
    """Marginal utility u'(c) = c^(-σ), used in the Euler equation."""
    c = np.maximum(c, c_min)
    return c ** (-sig)

# y_grid, z_grid and the transition matrices Ty, Tz are built in the next cell via Tauchen
print("b_grid:", b_grid[0], "...", b_grid[-1], "shape", b_grid.shape)

b_grid: -1.0 ... 50.0 shape (200,)


In [4]:
# ========== Tauchen (1986) discretization of an AR(1), unified form ==========
# Process: x_{t+1} = (1-ρ)*mean + ρ*x_t + σ*ε, ε ~ N(0,1)
def tauchen_ar1(rho, sigma_innov, n_states, m=3, mean=0.0):
    """
    Unified Tauchen discretization of an AR(1).
    Input: rho persistence, sigma_innov innovation std, n_states number of states,
           m grid half-width (in unconditional stds), mean long-run mean.
    mean=0 (default) → zero-mean process on [-m*std, m*std], e.g. for log(z);
    mean=μ → level process on roughly [μ-m*std, μ+m*std], with the lower bound floored at a small positive value (e.g. income y).
    Output: x_grid (n_states,), P (n_states, n_states) transition matrix.
    """
    std = sigma_innov / np.sqrt(1 - rho**2)
    if mean == 0:
        x_min, x_max = -m * std, m * std
    else:
        x_min = max(1e-6, mean - m * std)
        x_max = mean + m * std
    x_grid = np.linspace(x_min, x_max, n_states)
    step = (x_max - x_min) / (n_states - 1) if n_states > 1 else 1.0
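    mu_i = (1 - rho) * mean + rho * x_grid  # conditional mean in each state, (n_states,)
    # Entry [i, j] standardizes the bin edges of x_grid[j] given state i; z_lo uses the upper edge (+step/2), z_hi the lower edge (-step/2)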
    z_lo = (x_grid - mu_i[:, None] + step / 2) / sigma_innov
    z_hi = (x_grid - mu_i[:, None] - step / 2) / sigma_innov
    P = np.zeros((n_states, n_states))
    P[:, 0] = norm.cdf(z_lo[:, 0])
    P[:, -1] = 1 - norm.cdf(z_hi[:, -1])
    if n_states > 2:
        P[:, 1:-1] = norm.cdf(z_lo[:, 1:-1]) - norm.cdf(z_hi[:, 1:-1])
    P = P / P.sum(axis=1, keepdims=True)
    return x_grid, P

# ---------- Idiosyncratic income y: AR(1) in levels with mean=1, then normalized ----------
y_grid, Ty = tauchen_ar1(rho_y, nu_y, ny, m=3, mean=1.0)
# Normalize so E[y]=1 under the stationary distribution
invariant_y = np.linalg.matrix_power(Ty.T, 200)[:, 0]
y_grid = y_grid / (y_grid @ invariant_y)

# ---------- Aggregate z: log z ~ AR(1); exponentiate, then normalize the same way ----------
log_z_grid, Tz = tauchen_ar1(rho_z, nu_z, nz)
z_grid = np.exp(log_z_grid)
invariant_z = np.linalg.matrix_power(Tz.T, 200)[:, 0]
z_grid = z_grid / (z_grid @ invariant_z)

print("y_grid (idiosyncratic, AR(1), ny=%d):" % ny, y_grid)
print("Ty (idiosyncratic transition):\n", Ty)
print("z_grid (aggregate, first 5 ... last 2):", z_grid[:5], "...", z_grid[-2:])

y_grid (idiosyncratic, AR(1), ny=3): [0.25 1.   1.75]
Ty (idiosyncratic transition):
 [[6.46169767e-01 3.53811697e-01 1.85367378e-05]
 [3.03963618e-02 9.39207276e-01 3.03963618e-02]
 [1.85367378e-05 3.53811697e-01 6.46169767e-01]]
z_grid (aggregate, first 5 ... last 2): [0.87048321 0.87878612 0.88716821 0.89563026 0.90417302] ... [1.13552944 1.14636043]
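
The normalization step relies on the stationary distribution of the income chain. As a cross-check on the printed `Ty`, the sketch below recovers that distribution from the left eigenvector with eigenvalue one instead of a matrix power. The helper name `stationary_distribution` is an assumption for this example, and the matrix is copied from the output above, so its rows sum to one only up to print rounding.

```python
import numpy as np

# Transition matrix and grid copied from the printed output above.
Ty = np.array([
    [6.46169767e-01, 3.53811697e-01, 1.85367378e-05],
    [3.03963618e-02, 9.39207276e-01, 3.03963618e-02],
    [1.85367378e-05, 3.53811697e-01, 6.46169767e-01],
])
y_grid = np.array([0.25, 1.0, 1.75])

def stationary_distribution(P):
    """Left eigenvector of P for the eigenvalue closest to 1, scaled to sum to one."""
    eigvals, eigvecs = np.linalg.eig(P.T)
    k = np.argmin(np.abs(eigvals - 1.0))
    pi = np.real(eigvecs[:, k])
    return pi / pi.sum()

pi_y = stationary_distribution(Ty)
mean_y = float(y_grid @ pi_y)   # stationary mean of income
```

Because the grid and the transition matrix are symmetric around the middle state, the stationary mean is one, which is why the normalization leaves `y_grid` unchanged here.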


## Policy Functions

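`policy_from_grid` maps a state $(b, y)$ and prices $(r, z)$ to the policy $\pi(b, y, r, z) = (c, b')$ by looking up consumption on the grid and backing out $b'$ from the budget constraint. Training itself runs its Monte Carlo simulation inside `spg_objective` (see the SRL section below).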
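
The grid lookup is piecewise constant: each continuous value is assigned to the grid point at or just below it, and $(b, y)$ is flattened into a composite index $j = i_b \cdot n_y + i_y$. A minimal sketch of that indexing follows; the helper name `floor_index` is an assumption for this example, and the grid sizes mirror the calibration cell.

```python
import numpy as np

def floor_index(grid, x):
    """Index of the largest grid point <= x, clipped to the valid index range."""
    idx = np.searchsorted(grid, x, side="right") - 1
    return np.clip(idx, 0, len(grid) - 1)

b_grid = np.linspace(-1.0, 50.0, 200)
ny = 3

# Values below the grid map to index 0, values above it to the last index.
ib = floor_index(b_grid, np.array([-5.0, -1.0, 0.0, 50.0, 99.0]))

# Composite state index for (ib=3, iy=1) under j = ib*ny + iy.
j = int(ib[2]) * ny + 1
```

One consequence of this design is that the policy is a step function in $b$ between grid points, so a finer bond grid directly improves the resolution of the consumption rule.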

In [5]:
from scipy.interpolate import interp2d, LinearNDInterpolator
from scipy.optimize import brentq

# ========== Policy π(b,y,r,z)=(c,b') from the grid parameters θ ==========
# theta_grid stores consumption c; the policy returns the raw continuous (c, b'); Young's lottery is applied only in transition/simulation
def policy_from_grid(b, y, r, z, theta_grid, b_grid, y_grid, z_grid, r_grid, ny, c_min_val=1e-3):
    """
    Input: state (b,y), prices (r,z), consumption grid theta_grid
    Output: (c, b_next); b_next is the continuous value implied by the budget constraint, with no lottery rounding here
    """
    # Convert tensors to numpy for indexing
    if hasattr(theta_grid, 'detach'):
        theta_grid = theta_grid.detach().cpu().numpy()
    if hasattr(b_grid, 'detach'):
        b_grid = b_grid.detach().cpu().numpy()
    if hasattr(y_grid, 'detach'):
        y_grid = y_grid.detach().cpu().numpy()
    if hasattr(z_grid, 'detach'):
        z_grid = z_grid.detach().cpu().numpy()
    if hasattr(r_grid, 'detach'):
        r_grid = r_grid.detach().cpu().numpy()
    
    # Locate (b,y,z,r) on the grids; composite state j = ib*ny+iy
    ib = np.atleast_1d(np.searchsorted(b_grid, b, side='right') - 1)
    iy = np.atleast_1d(np.searchsorted(y_grid, y, side='right') - 1)
    iz = np.atleast_1d(np.searchsorted(z_grid, z, side='right') - 1)
    ir = np.atleast_1d(np.searchsorted(r_grid, r, side='right') - 1)
    nb_grid, ny_grid = len(b_grid), len(y_grid)
    nz_grid, nr_grid = len(z_grid), len(r_grid)
    ib = np.clip(ib, 0, nb_grid - 1)
    iy = np.clip(iy, 0, ny_grid - 1)
    iz = np.clip(iz, 0, nz_grid - 1)
    ir = np.clip(ir, 0, nr_grid - 1)
    b = np.atleast_1d(np.asarray(b, dtype=float))
    y = np.atleast_1d(np.asarray(y, dtype=float))
    r = np.atleast_1d(np.asarray(r, dtype=float))
    z = np.atleast_1d(np.asarray(z, dtype=float))
    
    # Vectorized lookup c = theta_grid[j,iz,ir]
    j = ib * ny + iy
    c = np.maximum(theta_grid[j, iz, ir], c_min_val)
    
    # Budget constraint b' = (1+r)b + yz - c; clip b' to [b_min,b_max] and back out the feasible c; keep b' continuous (no lottery)
    c_total = (1 + r) * b + y * z
    b_next = c_total - c
    b_next_feasible = np.clip(b_next, b_min, b_max)
    c = np.maximum(c_total - b_next_feasible, c_min_val)
    # No lottery here; transition/simulation uses b_next_to_grid_lottery or build_A
    if c.size == 1:
        return c.ravel()[0], b_next_feasible.ravel()[0]
    return c, b_next_feasible

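# Call this only when a discrete b' is needed (e.g. agent-by-agent simulation): Young's lottery (λ → upper grid point, 1-λ → lower grid point)
def b_next_to_grid_lottery(b_next_feasible, b_grid):
    """Randomly round continuous b' to one of the two adjacent points of b_grid (Young's method): U[0,1] < λ → upper point, otherwise lower point."""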
    i_lo = np.clip(np.searchsorted(b_grid, b_next_feasible, side='right') - 1, 0, len(b_grid) - 2)
    gap = b_grid[i_lo + 1] - b_grid[i_lo]
    lam = np.clip((np.asarray(b_next_feasible) - b_grid[i_lo]) / (gap + 1e-12), 0.0, 1.0)
    lottery = np.random.rand(len(np.atleast_1d(b_next_feasible))) < lam
    return np.where(lottery, b_grid[i_lo + 1], b_grid[i_lo])

def draw_next_state(y_idx, z_idx, Ty, Tz):
    """Draw next-period (y', z') grid indices from Ty and Tz."""
    y_next = np.random.choice(Ty.shape[1], p=Ty[y_idx, :])
    z_next = np.random.choice(Tz.shape[1], p=Tz[z_idx, :])
    return y_next, z_next

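### Part A: Distribution simulation under a given policy (simple interest-rate case)

The functions below evolve the distribution and clear the market **for a given policy function** $\pi$ (for example the trained `policy_from_grid`): `aggregate_saving`, `ge_price`, `update_G_direct`, `simulate_huggett`. $G$ is stored as an `(nb, ny)` matrix; `update_G_direct` uses Young's lottery to build the intermediate distribution $Q$ and then sets `G_new = Q @ Ty`. Nothing here is differentiated; these functions are for post-training simulation, PE comparison, and similar checks.

**Part B (SRL training)** comes later: the policy is parameterized by $\theta$, gradients come from PyTorch, `update_G_pi_direct` uses soft weights, and `P_star_detach` clears the market. The two implementations serve different purposes and are not duplicates.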
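
To make the difference between the two distribution updates concrete, the sketch below compares Young's lottery weights with Gaussian soft weights for a few continuous values of $b'$. It is an illustration in plain NumPy rather than the notebook's own functions: the names `lottery_weights` and `soft_weights` are assumptions, and the grid and `sigma_b=0.1` are chosen only for the example.

```python
import numpy as np

def lottery_weights(b_next, b_grid):
    """Young-style split: weight lam on the upper neighbor, 1-lam on the lower one."""
    i_lo = np.clip(np.searchsorted(b_grid, b_next, side="right") - 1, 0, len(b_grid) - 2)
    lam = (b_next - b_grid[i_lo]) / (b_grid[i_lo + 1] - b_grid[i_lo])
    return i_lo, np.clip(lam, 0.0, 1.0)

def soft_weights(b_next, b_grid, sigma_b=0.1):
    """Gaussian kernel weights over all grid points, normalized to sum to one per state."""
    dist = b_next[:, None] - b_grid[None, :]
    w = np.exp(-dist**2 / (2.0 * sigma_b**2))
    return w / w.sum(axis=1, keepdims=True)

b_grid = np.linspace(-1.0, 50.0, 200)
b_next = np.array([-1.0, 0.3, 12.345, 50.0])

# Lottery: the two-point split reproduces b' exactly in expectation.
i_lo, lam = lottery_weights(b_next, b_grid)
lottery_mean = (1.0 - lam) * b_grid[i_lo] + lam * b_grid[i_lo + 1]

# Soft weights: smooth in b', but the implied mean is only approximately b'.
w = soft_weights(b_next, b_grid)
soft_mean = w @ b_grid
```

The lottery preserves the mean of $b'$ by construction, while the Gaussian weights trade a small bias in the mean for smoothness in $b'$, which is what makes them convenient when gradients have to flow through the update.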

In [6]:
# ---------- G in matrix form: G_mat[ib, iy] = mass at state (b_grid[ib], y_grid[iy]); equivalent to G_flat[j] with j = ib*ny+iy ----------
def G_to_mat(G):
    """Return G as an (nb, ny) matrix; reshape if it is passed as a (J,) vector."""
    G = np.asarray(G)
    if G.ndim == 1:
        return G.reshape(nb, ny)
    return G
def G_to_flat(G):
    """Return G as a (J,) vector with j = ib*ny+iy."""
    return np.asarray(G).reshape(nb, ny).ravel()

# ---------- Aggregate saving (used for market clearing) ----------
# Aggregate saving = Σ_{ib,iy} G(ib,iy)*b'(ib,iy); with G and b' both (nb,ny), multiply elementwise and sum
def aggregate_saving(G, r, z, policy_fn, b_grid, y_grid):
    G_mat = G_to_mat(G)
    b_flat = np.repeat(b_grid, ny)
    y_flat = np.tile(y_grid, nb)
    c, b_next = policy_fn(b_flat, y_flat, r, z)
    b_next_mat = np.asarray(b_next).reshape(nb, ny)
    return (G_mat * b_next_mat).sum()

# ---------- Equilibrium price P*(G,z): aggregate saving = B ----------
def ge_price(G, z, policy_fn, b_grid, y_grid, r_lo=None, r_hi=None):
    # None → use the global r_min/r_max at call time (so recalibrating does not require rerunning this cell)
    r_lo = r_min if r_lo is None else r_lo
    r_hi = r_max if r_hi is None else r_hi
    def excess_saving(r):
        return aggregate_saving(G, r, z, policy_fn, b_grid, y_grid) - B
    if excess_saving(r_lo) * excess_saving(r_hi) > 0:
        return (r_lo + r_hi) / 2.0  # fallback: excess saving has the same sign at both ends (no crossing), so return the midpoint
    return brentq(excess_saving, r_lo, r_hi)

# ---------- G_t → G_{t+1}: build Q(ib',iy), then multiply by Ty; input and output are (nb,ny) ----------
def update_G_direct(G, r, z, policy_fn, b_grid, y_grid, Ty):
    """G_{t+1} = A_π @ G_t. Q(ib',iy) is the intermediate distribution from the policy plus Young's lottery; G_new = Q @ Ty. Input and output are (nb,ny)."""
    G_flat = G_to_flat(G)
    b_flat = np.repeat(b_grid, ny)
    y_flat = np.tile(y_grid, nb)
    c, b_next_feasible = policy_fn(b_flat, y_flat, r, z)
    i_lo = np.clip(np.searchsorted(b_grid, b_next_feasible, side='right') - 1, 0, nb - 2)
    gap = b_grid[i_lo + 1] - b_grid[i_lo]
    lam = np.clip((b_next_feasible - b_grid[i_lo]) / (gap + 1e-12), 0.0, 1.0)
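    iy_all = np.arange(nb * ny) % ny  # y index of each flattened state; does not rely on the global J, which Part B later overwrites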
    Q = np.zeros((nb, ny))
    np.add.at(Q, (i_lo, iy_all), (1 - lam) * G_flat)
    np.add.at(Q, (i_lo + 1, iy_all), lam * G_flat)
    G_new = Q @ Ty
    G_new = G_new / (G_new.sum() + 1e-20)
    return G_new

# ---------- (Optional) explicit transition matrix A_π via Young's method; use only if A is needed ----------
# def build_A_numpy(r, z, policy_fn, b_grid, y_grid, Ty):
#     b_flat = np.repeat(b_grid, ny)
#     y_flat = np.tile(y_grid, nb)
#     c, b_next_feasible = policy_fn(b_flat, y_flat, r, z)
#     i_lo = np.clip(np.searchsorted(b_grid, b_next_feasible, side='right') - 1, 0, nb - 2)
#     gap = b_grid[i_lo + 1] - b_grid[i_lo]
#     lam = np.clip((b_next_feasible - b_grid[i_lo]) / (gap + 1e-12), 0.0, 1.0)
#     iy_all = np.arange(J) % ny
#     rows_lo = (i_lo[:, None] * ny + np.arange(ny)[None, :]).ravel()
#     rows_hi = ((i_lo + 1)[:, None] * ny + np.arange(ny)[None, :]).ravel()
#     cols = np.repeat(np.arange(J), ny)
#     vals_lo = ((1 - lam)[:, None] * Ty[iy_all, :]).ravel()
#     vals_hi = (lam[:, None] * Ty[iy_all, :]).ravel()
#     A = np.zeros((J, J))
#     np.add.at(A, (rows_lo, cols), vals_lo)
#     np.add.at(A, (rows_hi, cols), vals_hi)
#     return A

In [7]:
# ========== Distribution-level simulation (no individual agents are tracked) ==========
# G and c_grid are (nb,ny) matrices; each period G_{t+1} = Q @ Ty
def simulate_huggett(
    T,
    policy_fn,
    b_grid, y_grid, z_grid, Ty, Tz,
    G0=None,
    z0_idx=None,
):
    """paths['G'] and paths['c_grid'] have shape (T+1, nb, ny); v_hat = Σ_t β^t sum(G_t * u(c_t))."""
    if G0 is None:
        G0 = np.ones((nb, ny)) / (nb * ny)  # uniform start; does not rely on the global J, which Part B later overwrites
    G = G_to_mat(G0)
    if z0_idx is None:
        z0_idx = np.random.randint(0, len(z_grid))
    z_idx = z0_idx

    paths = {
        'G': np.zeros((T + 1, nb, ny)),
        'z_idx': np.zeros(T + 1, dtype=int),
        'z': np.zeros(T + 1),
        'r': np.zeros(T + 1),
        'c_grid': np.zeros((T + 1, nb, ny)),
    }
    paths['G'][0] = G
    paths['z_idx'][0] = z_idx
    paths['z'][0] = z_grid[z_idx]
    paths['r'][0] = np.nan
    paths['c_grid'][0] = np.nan

    b_flat = np.repeat(b_grid, ny)
    y_flat = np.tile(y_grid, nb)
    v_hat = 0.0
    for t in range(T):
        z_val = z_grid[z_idx]
        r_t = ge_price(G, z_val, policy_fn, b_grid, y_grid)  # market clearing
        paths['r'][t] = r_t
        c_flat = policy_fn(b_flat, y_flat, r_t, z_val)[0]
        c_mat = np.asarray(c_flat).reshape(nb, ny)
        paths['c_grid'][t] = c_mat
        v_hat += (beta ** t) * (G * u(c_mat)).sum()
        G = update_G_direct(G, r_t, z_val, policy_fn, b_grid, y_grid, Ty)
        paths['G'][t + 1] = G
        z_idx = np.random.choice(len(z_grid), p=Tz[z_idx, :])
        paths['z_idx'][t + 1] = z_idx
        paths['z'][t + 1] = z_grid[z_idx]
    paths['r'][T] = paths['r'][T - 1]
    paths['c_grid'][T] = np.asarray(policy_fn(b_flat, y_flat, paths['r'][T], paths['z'][T])[0]).reshape(nb, ny)

    return paths, v_hat

In [8]:
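# Note: during SRL training the Monte Carlo simulation runs inside spg_objective();
# simulate_huggett() above is for reference or post-training checks (distribution simulation under a given policy).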

In [9]:
# Optional: plot simulation results (G_t, r_t, c(b), ...) after training
# Run simulate_huggett with policy_trained, then plot

## SRL / SPG: Gradient-stop on macro, gradient descent on policy (SRL Section 3.2–3.3)

**Objective:** Maximize expected lifetime utility

$$L(\theta) = d_0^\top \hat{v}_\pi$$

where $d_0$ is the initial distribution over individual states and $\hat{v}_\pi$ is the sample average over $N$ simulated trajectories (Monte Carlo).

**Macro (no gradient):** The price $p_t = P^*(G_t, z_t)$ (here the interest rate $r_t$) is given by market clearing. We apply **stop-gradient** to $p_t$, so $\partial p_t / \partial \theta = 0$ in the backward pass; agents take prices as given.

**Micro (differentiate):** The policy $\pi(\cdot; \theta)$ and the transition matrix $A_\pi(z, p)$ depend on $\theta$; we backpropagate through them and through $u(c)$.

**Update:** Stochastic gradient ascent

$$\theta_{k+1} = \theta_k + \eta_k \nabla_\theta L(\theta_k)$$
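
The effect of the stop-gradient can be seen in a toy problem that is deliberately much simpler than the model in this notebook. An agent consumes a share $\theta$ of one unit of wealth today and saves the rest at gross return $1+r$, while a hypothetical price map makes $r$ depend on $\theta$. Everything in the sketch (the names `objective`, `price`, `grad_stop`, `grad_total`, the price map, and the step size) is an assumption chosen for illustration, and finite differences stand in for automatic differentiation.

```python
import numpy as np

BETA = 0.96

def objective(theta, r):
    """Two-period utility: consume share theta now, save the rest at gross return 1+r."""
    return np.log(theta) + BETA * np.log((1.0 - theta) * (1.0 + r))

def price(theta):
    """Hypothetical stand-in for P*: the price responds to the policy parameter."""
    return 0.02 + 0.05 * theta

def grad_stop(theta, h=1e-6):
    """Central difference in theta with the price frozen at its current value (stop-gradient)."""
    r = price(theta)
    return (objective(theta + h, r) - objective(theta - h, r)) / (2 * h)

def grad_total(theta, h=1e-6):
    """Central difference that also lets the price move with theta."""
    return (objective(theta + h, price(theta + h))
            - objective(theta - h, price(theta - h))) / (2 * h)

# Gradient ascent with the stop-gradient: theta_{k+1} = theta_k + eta * grad.
theta = 0.3
eta = 0.05
for _ in range(500):
    theta = theta + eta * grad_stop(theta)

# Price-taking optimum of this toy problem: 1/theta = BETA/(1-theta).
theta_competitive = 1.0 / (1.0 + BETA)
gap_at_half = grad_total(0.5) - grad_stop(0.5)   # extra term from d(price)/d(theta)
```

With the price frozen, the ascent settles at the price-taking optimum regardless of how the price map is specified; the total derivative contains an additional term through the price, which is the channel the stop-gradient removes.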

In [10]:
# ========== Part B: SRL/SPG training (θ differentiable; G stored as (nb_spg, ny); soft weights; P* detached) ==========
import torch

# Device priority: CUDA (NVIDIA) → MPS (Apple Silicon GPU) → CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
elif getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = torch.device('mps')   # Mac Apple Silicon GPU (Metal)
else:
    device = torch.device('cpu')
print('Using device:', device)
# If MPS raises errors, switch to device = torch.device('cpu'); on an Intel Mac, torch.set_num_threads(8) may speed up the CPU path
dtype = torch.float32

# --- Coarser grid for SPG speed (Appendix Table 3 uses nb=200) ---
nb_spg = 50
nr_spg = 10
nz_spg = 10
J = nb_spg * ny   # composite state size, j = b_idx*ny + y_idx (note: this overwrites the global J = nb*ny from the calibration cell)

# --- Build SPG grids (subsample from the main grids) ---
b_grid_spg = torch.tensor(np.linspace(b_min, b_max, nb_spg), dtype=dtype, device=device)
iz_spg = np.linspace(0, nz-1, nz_spg, dtype=int)
ir_spg = np.linspace(0, nr-1, nr_spg, dtype=int) if nr <= 20 else np.arange(nr_spg)
z_grid_t = torch.tensor(z_grid[iz_spg], dtype=dtype, device=device)
r_grid_t = torch.tensor(np.linspace(r_min, r_max, nr_spg), dtype=dtype, device=device)
y_grid_t = torch.tensor(y_grid, dtype=dtype, device=device)
Ty_t = torch.tensor(Ty, dtype=dtype, device=device)
# Restrict Tz to the z subgrid and renormalize its rows
Tz_sub = Tz[np.ix_(iz_spg, iz_spg)] if len(iz_spg) <= len(z_grid) else Tz
Tz_sub = Tz_sub / Tz_sub.sum(axis=1, keepdims=True)
Tz_t = torch.tensor(Tz_sub, dtype=dtype, device=device)
nz_spg = Tz_t.shape[0]

# --- Policy parameter θ: consumption on the grid is a transform of θ ---
# softplus keeps c > 0 and differentiable
def theta_to_consumption_grid(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t, c_min_val=1e-3):
    """θ → c = softplus(θ)+c_min, shape (J,nz,nr)."""
    return torch.nn.functional.softplus(theta) + c_min_val

# Init: a constant-saving-rate rule gives c, then invert softplus to get θ
def init_theta(b_grid_t, y_grid_t, z_grid_t, r_grid_t, save_frac=0.2, c_min_val=1e-3):
    nb_t, ny_t = len(b_grid_t), len(y_grid_t)
    J, nz_t, nr_t = nb_t * ny_t, len(z_grid_t), len(r_grid_t)
    b_flat = b_grid_t.repeat_interleave(ny_t)
    y_flat = y_grid_t.repeat(nb_t)
    cash = b_flat.view(J, 1, 1) * (1 + r_grid_t).view(1, 1, nr_t) + y_flat.view(J, 1, 1) * z_grid_t.view(1, nz_t, 1)
    c_grid = torch.clamp((1 - save_frac) * cash, min=c_min_val)
    theta_init = torch.log(torch.exp(c_grid - c_min_val) - 1 + 1e-8)
    return theta_init

In [11]:
# ---------- Part B update: G as an (nb_spg, ny) matrix, one Q @ Ty matmul (same layout as Part A) ----------
def _G_to_mat_spg(G, nb_spg, ny):
    """Part B: return G as (nb_spg, ny); reshape if it is passed as a (J,) vector."""
    if G.dim() == 1:
        return G.view(nb_spg, ny)
    return G

def update_G_pi_direct(theta, G, iz, ir, b_grid_t, y_grid_t, z_grid_t, r_grid_t, Ty_t, nb_spg, ny, sigma_b=0.1):
    """G_new = (implied A_π) @ G. G is (nb_spg, ny); Gaussian soft weights give Q, then G_new = Q @ Ty, with no loop over J."""
    J = nb_spg * ny
    G = _G_to_mat_spg(G, nb_spg, ny)
    z_val = z_grid_t[iz]
    r_val = r_grid_t[ir]
    c = theta_to_consumption_grid(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t)
    c_val = c[:, iz, ir]
    b_next = (1 + r_val) * b_grid_t.repeat_interleave(ny) + y_grid_t.repeat(nb_spg) * z_val - c_val
    b_next = torch.clamp(b_next, b_min, b_max)
    dist = b_next.unsqueeze(1) - b_grid_t.unsqueeze(0)
    w_b = torch.exp(-dist.pow(2) / (2 * sigma_b**2))
    w_b = w_b / (w_b.sum(dim=1, keepdim=True) + 1e-8)
    # Q(ib',iy) = sum_ib G(ib,iy)*w_b(ib*ny+iy, ib'); vectorized as (nb,ny,nb) * G → sum over ib → (nb,ny), then G_new = Q @ Ty
    M = w_b.view(nb_spg, ny, nb_spg).permute(2, 0, 1)
    Q = (M * G.unsqueeze(0)).sum(dim=1)
    G_new = Q @ Ty_t
    G_new = G_new / (G_new.sum() + 1e-20)
    return G_new

def build_A_pi(theta, iz, ir, b_grid_t, y_grid_t, z_grid_t, r_grid_t, Ty_t, nb_spg, ny, sigma_b=0.1):
    """(Optional) build A_π explicitly; training already uses update_G_pi_direct (Q @ Ty), so this need not be called."""
    J = nb_spg * ny
    z_val = z_grid_t[iz]
    r_val = r_grid_t[ir]
    c = theta_to_consumption_grid(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t)
    c_val = c[:, iz, ir]
    b_next = (1 + r_val) * b_grid_t.repeat_interleave(ny) + y_grid_t.repeat(nb_spg) * z_val - c_val
    b_next = torch.clamp(b_next, b_min, b_max)
    dist = b_next.unsqueeze(1) - b_grid_t.unsqueeze(0)
    w_b = torch.exp(-dist.pow(2) / (2 * sigma_b**2))
    w_b = w_b / (w_b.sum(dim=1, keepdim=True) + 1e-8)
    iy_all = torch.arange(J, dtype=torch.long, device=device) % ny
    A = (w_b.unsqueeze(2) * Ty_t[iy_all, :].unsqueeze(1)).reshape(J, J).T
    return A

def aggregate_saving_grid(theta, G, iz, ir, b_grid_t, y_grid_t, z_grid_t, r_grid_t, ny):
    """Aggregate saving = sum(G * b'). G is (nb_spg, ny); b_next is reshaped to (nb_spg, ny), multiplied elementwise and summed."""
    nb_b = len(b_grid_t)
    J = nb_b * ny
    G_mat = _G_to_mat_spg(G, nb_b, ny)
    z_val = z_grid_t[iz]
    r_val = r_grid_t[ir]
    c = theta_to_consumption_grid(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t)[:, iz, ir]
    b_next = (1 + r_val) * b_grid_t.repeat_interleave(ny) + y_grid_t.repeat(nb_b) * z_val - c
    b_next = torch.clamp(b_next, b_min, b_max).view(nb_b, ny)
    return (G_mat * b_next).sum()

# ---------- P*(G,z): market-clearing r, detached from θ ----------
# Vectorized: one forward pass computes S(ir) for every ir and picks the grid point closest to B, avoiding the repeated .item() and Python calls of a bisection
def P_star_detach(theta, G, iz, b_grid_t, y_grid_t, z_grid_t, r_grid_t, ny, B=0.0, beta_t=0.96, sigma_t=2.0):
    """Find r such that aggregate saving ≈ B: compute S(ir) for all ir and pick the ir minimizing |S-B|. Returns r.detach()."""
    nr = len(r_grid_t)
    if nr == 1:
        return r_grid_t[0].detach()
    nb_b = len(b_grid_t)
    G_mat = _G_to_mat_spg(G, nb_b, ny)
    z_val = z_grid_t[iz]
    c_all = theta_to_consumption_grid(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t)[:, iz, :]
    b_flat = b_grid_t.repeat_interleave(ny)
    y_flat = y_grid_t.repeat(nb_b)
    resources = b_flat.unsqueeze(1) * (1 + r_grid_t).unsqueeze(0) + (y_flat * z_val).unsqueeze(1)
    b_next_all = (resources - c_all).clamp(b_min, b_max)
    # G_mat is (nb, ny); add a trailing axis so it broadcasts against b' of shape (nb, ny, nr)
    S_all = (G_mat.unsqueeze(-1) * b_next_all.view(nb_b, ny, nr)).sum(dim=(0, 1))
    best_ir = (S_all - B).abs().argmin().item()
    return r_grid_t[best_ir].detach()

def P_star_detach_bisection(theta, G, iz, b_grid_t, y_grid_t, z_grid_t, r_grid_t, ny, B=0.0, beta_t=0.96, sigma_t=2.0):
    """(Fallback) bisection over the r grid; may use less GPU memory when nr is large and only a few iterations are needed."""
    nr = len(r_grid_t)
    if nr == 1:
        return r_grid_t[0].detach()
    def S_at(ir):
        return aggregate_saving_grid(theta, G, iz, ir, b_grid_t, y_grid_t, z_grid_t, r_grid_t, ny).item()
    s_lo, s_hi = S_at(0), S_at(nr - 1)
    if s_lo >= B:
        return r_grid_t[0].detach()
    if s_hi <= B:
        return r_grid_t[nr - 1].detach()
    ir_lo, ir_hi = 0, nr - 1
    while ir_hi - ir_lo > 1:
        ir_mid = (ir_lo + ir_hi) // 2
        if S_at(ir_mid) < B:
            ir_lo = ir_mid
        else:
            ir_hi = ir_mid
    best_ir = ir_lo if abs(S_at(ir_lo) - B) <= abs(S_at(ir_hi) - B) else ir_hi
    return r_grid_t[best_ir].detach()

In [12]:
# Map a scalar r to its r-grid index (for policy lookup)
def r_to_ir(r_val, r_grid_t):
    r = r_val if torch.is_tensor(r_val) else torch.tensor(r_val, device=r_grid_t.device, dtype=r_grid_t.dtype)
    ir = torch.searchsorted(r_grid_t, r.unsqueeze(0)).squeeze(0).clamp(0, len(r_grid_t) - 1)
    return ir.item()

# Vectorized utility over the (J,) state vector, used in L(θ) = Σ_t β^t G_t'u(c_t)
def u_torch(c_vec, sig=sigma):
    c_vec = torch.clamp(c_vec, min=c_min)
    if abs(sig - 1.0) < 1e-8:
        return torch.log(c_vec)
    return (c_vec ** (1 - sig)) / (1 - sig)

In [13]:
# Steady-state G0: iterate update_G_pi_direct without building A; stop early once converged
def steady_state_G0(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t, Ty_t, nb_spg, ny, nz_spg, nr_spg, n_iter=150, tol=1e-6):
    J = nb_spg * ny
    iz_mid = nz_spg // 2
    ir_mid = nr_spg // 2 if nr_spg > 0 else 0
    with torch.no_grad():
        G = torch.ones(nb_spg, ny, device=device, dtype=dtype) / J
        for _ in range(n_iter):
            G_new = update_G_pi_direct(theta, G, iz_mid, ir_mid, b_grid_t, y_grid_t, z_grid_t, r_grid_t, Ty_t, nb_spg, ny)
            if (G_new - G).abs().max() < tol:
                return G_new
            G = G_new
    return G

# ========== SPG objective L(θ); two phases: warm_up=True => G fixed at G0 (no G update) ==========
def spg_objective(theta, N_traj, T_horizon, b_grid_t, y_grid_t, z_grid_t, r_grid_t, Ty_t, Tz_t,
                  nb_spg, ny, nz_spg, nr_spg, beta_t, G0=None, warm_up=False):
    """
    warm_up=True: G stays at G0 for all t (no G = A_pi @ G). Phase 1: move policy toward reasonable neighborhood.
    warm_up=False: G evolves as G_{t+1} = A_pi @ G_t. Phase 2: full equilibrium simulation.
    """
    J = nb_spg * ny
    if G0 is None:
        G0 = torch.ones(nb_spg, ny, device=device, dtype=dtype) / J
    else:
        G0 = _G_to_mat_spg(G0, nb_spg, ny)
    L_list = []
    for n in range(N_traj):
        iz = np.random.randint(0, nz_spg)
        G = G0.clone()
        L_n = torch.tensor(0.0, device=device, dtype=dtype)
        for t in range(T_horizon):
            r_t = P_star_detach(theta, G, iz, b_grid_t, y_grid_t, z_grid_t, r_grid_t, ny)
            ir = r_to_ir(r_t, r_grid_t)
            c = theta_to_consumption_grid(theta, b_grid_t, y_grid_t, z_grid_t, r_grid_t)
            c_t = c[:, iz, ir]
            L_n = L_n + (beta_t ** t) * (G * u_torch(c_t.view(nb_spg, ny))).sum()
            if not warm_up:
                G = update_G_pi_direct(theta, G, iz, ir, b_grid_t, y_grid_t, z_grid_t, r_grid_t, Ty_t, nb_spg, ny)
            iz = torch.multinomial(Tz_t[iz, :], 1).squeeze().item()
        L_list.append(L_n)
    return torch.stack(L_list).mean()

## Using Trained Policy

After training, convert `theta` to a consumption grid with `theta_to_consumption_grid` and wrap it with `policy_from_grid` to obtain the policy $\pi(b, y, r, z)$ on the trained grid.

In [14]:
# ---------- Use the trained policy: θ → consumption grid → policy_from_grid ----------
# Run the "Initialize θ and run SPG" cell below first to get theta
try:
    theta_trained = theta.detach()
    c_grid_trained = theta_to_consumption_grid(theta_trained, b_grid_spg, y_grid_t, z_grid_t, r_grid_t)
    c_grid_np = c_grid_trained.cpu().numpy()
    def policy_trained(b, y, r, z):
        return policy_from_grid(b, y, r, z, c_grid_np, b_grid_spg.cpu().numpy(),
                              y_grid_t.cpu().numpy(), z_grid_t.cpu().numpy(), r_grid_t.cpu().numpy(), ny)
    print("Testing trained policy:")
    b_test = np.array([0.5, 1.0, 2.0])
    y_test = y_grid[1]
    r_test = (r_min + r_max) / 2.0
    z_test = z_grid[len(z_grid)//2]
    c_test, b_next_test = policy_trained(b_test, y_test, r_test, z_test)
    print(f"  b = {b_test}, y = {y_test:.3f}, r = {r_test:.3f}, z = {z_test:.3f}")
    print(f"  c = {c_test}, b_next = {b_next_test}")
except NameError:
    print('Run the SPG training cell below first to define theta.')

Run the SPG training cell below first to define theta.


In [None]:
# ========== Init θ and run SPG: Phase 1 warm-up (G fixed at steady-state G0), Phase 2 (G evolves) ==========
nr_spg = len(r_grid_t)
nz_spg = len(z_grid_t)
theta = init_theta(b_grid_spg, y_grid_t, z_grid_t, r_grid_t)
theta = theta.requires_grad_(True)
optimizer = torch.optim.Adam([theta], lr=lr_ini)

# G0 = steady-state distribution under initial θ (fixed (z,r) at midpoint). Used only in warm-up.
G0_steady = steady_state_G0(theta, b_grid_spg, y_grid_t, z_grid_t, r_grid_t, Ty_t, nb_spg, ny, nz_spg, nr_spg)

T_horizon = min(T_trunc, 50)
loss_hist = []

for epoch in range(N_epoch):
    warm_up = (epoch < N_warmup)
    t0 = max(epoch - N_warmup, 0) / max(N_epoch - N_warmup, 1)
    lr_t = lr_ini * (lr_decay ** t0)
    for g in optimizer.param_groups:
        g['lr'] = lr_t
    theta_old = theta.detach().clone()
    optimizer.zero_grad()
    G0_phase = G0_steady.detach() if warm_up else None  # Phase 1: fixed G0; Phase 2: uniform, G evolves
    L = spg_objective(theta, N_sample, T_horizon, b_grid_spg, y_grid_t, z_grid_t, r_grid_t, Ty_t, Tz_t,
                      nb_spg, ny, nz_spg, nr_spg, beta, G0=G0_phase, warm_up=warm_up)
    loss_hist.append(L.item())
    (-L).backward()
    optimizer.step()
    param_change = (theta.detach() - theta_old).abs().max().item()
    if param_change < e_converge:
        print(f"Converged at epoch {epoch+1}, param change {param_change:.2e} < {e_converge}")
        break
    if (epoch + 1) % 100 == 0 or (epoch + 1) <= 5 or (epoch + 1) == N_warmup:
        phase = "warm-up (G fixed)" if warm_up else "adaptive (G evolves)"
        print(f"Epoch {epoch+1}, L(θ) = {L.item():.6f}, lr = {lr_t:.2e}, |Δθ| = {param_change:.2e}, {phase}")

Epoch 1, L(θ) = -30.397976, lr = 1.00e-03, |Δθ| = 1.00e-03, warm-up (G fixed)
Epoch 2, L(θ) = -30.316166, lr = 1.00e-03, |Δθ| = 1.00e-03, warm-up (G fixed)
Epoch 3, L(θ) = -30.328472, lr = 1.00e-03, |Δθ| = 1.00e-03, warm-up (G fixed)
Epoch 4, L(θ) = -30.246532, lr = 1.00e-03, |Δθ| = 1.00e-03, warm-up (G fixed)
Epoch 5, L(θ) = -30.271004, lr = 1.00e-03, |Δθ| = 1.00e-03, warm-up (G fixed)
Epoch 50, L(θ) = -29.251669, lr = 1.00e-03, |Δθ| = 9.94e-04, warm-up (G fixed)
Epoch 100, L(θ) = -50.081833, lr = 9.65e-04, |Δθ| = 1.65e-03, adaptive (G evolves)
Epoch 200, L(θ) = -48.248566, lr = 8.97e-04, |Δθ| = 1.12e-03, adaptive (G evolves)
Epoch 300, L(θ) = -47.592495, lr = 8.34e-04, |Δθ| = 1.06e-03, adaptive (G evolves)
Epoch 400, L(θ) = -45.636826, lr = 7.75e-04, |Δθ| = 9.78e-04, adaptive (G evolves)


## Summary

- **Model:** Huggett (1993) with aggregate risk: bond market clearing $r_t = P^*(G_t, z_t)$, policy $\pi(b,y,r,z) \to (c, b')$.
- **State dynamics:** $y$, $z$ follow transition matrices $T_y$, $T_z$ (Tauchen); draws use `np.random.choice(..., p=row)`.
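- **SRL/SPG:** Maximize $L(\theta) = \mathbb{E}[\sum_t \beta^t d_t^\top u(c_t)]$ with stop-gradient on $r$. During training the distribution $G$ is updated with differentiable Gaussian soft weights (`update_G_pi_direct`); the Young lottery is used in the post-training simulation (`update_G_direct`). The trained policy can be passed to `policy_from_grid` for simulation.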

## Partial Equilibrium (PE) Solution for Comparison with GE

In **partial equilibrium** we do **not** use the distribution $G$ or market clearing. The interest rate $r_t$ follows an **exogenous** Markov process:

$$r_{t+1} = (1-\rho_r)\,\bar{r} + \rho_r\, r_t + \nu_r\,\sqrt{\max\{r_t,\,0\}}\,\varepsilon_{r,t}, \qquad \varepsilon_{r,t} \sim N(0,1).$$
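
To get a feel for this law of motion, it can be simulated directly. The sketch below is illustrative only: the function name `simulate_r` and the parameter values are assumptions made for the example, not the notebook's calibration.

```python
import numpy as np

def simulate_r(r0, r_bar, rho_r, nu_r, T, rng):
    """Simulate r_{t+1} = (1-rho)*r_bar + rho*r_t + nu*sqrt(max(r_t,0))*eps_t."""
    r = np.empty(T + 1)
    r[0] = r0
    eps = rng.standard_normal(T)
    for t in range(T):
        vol = nu_r * np.sqrt(max(r[t], 0.0))   # volatility vanishes when r_t <= 0
        r[t + 1] = (1 - rho_r) * r_bar + rho_r * r[t] + vol * eps[t]
    return r

rng = np.random.default_rng(0)
# Hypothetical parameter values, chosen only for illustration
r_path = simulate_r(r0=0.02, r_bar=0.02, rho_r=0.9, nu_r=0.01, T=10_000, rng=rng)
print("sample mean:", r_path.mean(), "sample std:", r_path.std())
```

Because the shock has conditional mean zero, the long-run mean of the simulated path should be close to $\bar{r}$, and the state-dependent volatility shrinks as $r_t$ approaches zero.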

We solve the agent's problem by **VFI** on the state $(b, y, r)$, holding $z$ fixed at $z=1$ so that income is $y\cdot z = y$. This yields a policy $c_{PE}(b,y,r)$ that can be compared to the **GE (SRL)** policy $c(b,y,r,z)$ evaluated at $z=1$.
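
With $y$ and $r$ evolving independently, the continuation value in the Bellman equation is a double sum over $y'$ and $r'$. The sketch below shows one way to compute it for all states at once and checks it against the single-state form. The arrays are random placeholders (hypothetical, not the notebook's `Ty`, `Tr_pe`, `V_pe`), with `ny != nr` on purpose so that an axis mix-up would raise a shape error.

```python
import numpy as np

rng = np.random.default_rng(0)
nb, ny, nr = 4, 3, 5                      # hypothetical sizes, ny != nr on purpose

def random_stochastic(n):
    """Random row-stochastic matrix (each row sums to 1)."""
    P = rng.random((n, n))
    return P / P.sum(axis=1, keepdims=True)

Ty, Tr = random_stochastic(ny), random_stochastic(nr)   # placeholder transitions
V = rng.standard_normal((nb, ny, nr))                  # placeholder V[ib', iy', ir']

# EV[ib', iy, ir] = sum_{iy', ir'} Ty[iy, iy'] * Tr[ir, ir'] * V[ib', iy', ir']
EV = np.einsum("ij,kl,bjl->bik", Ty, Tr, V)

# The same number for one state, as two matrix-vector products:
# (ny,) @ (ny, nr) -> (nr,), then (nr,) @ (nr,) -> scalar
ib_next, iy, ir = 2, 1, 3
ev_single = Ty[iy, :] @ V[ib_next] @ Tr[ir, :]
print(np.isclose(EV[ib_next, iy, ir], ev_single))
```

Precomputing `EV` once per iteration also removes the expectation from the innermost loop over $b'$, which is usually the main cost of a pure-Python VFI.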

In [None]:
# ========== PE: exogenous r process + VFI for c_PE(b,y,r) ==========
# r_{t+1} = (1-ρr)*r_bar + ρr*r_t + νr*sqrt(max(r_t,0))*ε, ε~N(0,1)
# In PE, fix z=1 so income = y·z = y

# r grid for PE (shared with the main model, or PE-specific)
r_grid_pe = np.linspace(r_min, r_max, nr)
# For each r, build the transition probabilities of next-period r', discretized onto r_grid_pe:
# r'|r ~ N((1-ρr)*r_bar+ρr*r, νr^2*max(r,0))
def build_Tr_pe(r_grid, r_bar_pe, rho_r_pe, nu_r_pe):
    """Build transition Tr[i,j] = P(r' in j | r = r_grid[i]). r' = (1-ρ)r_bar + ρr + ν*sqrt(max(r,0))*ε."""
    n = len(r_grid)
    Tr = np.zeros((n, n))
    step = (r_grid[-1] - r_grid[0]) / (n - 1) if n > 1 else 1.0
    for i in range(n):
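        r_i = r_grid[i]
        mu = (1 - rho_r_pe) * r_bar_pe + rho_r_pe * r_i   # conditional mean uses r itself, as in the law of motion
        sig = nu_r_pe * np.sqrt(max(r_i, 0.0))            # volatility is zero for r <= 0 (degenerate case below)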
        if sig < 1e-12:
            j_nearest = np.argmin(np.abs(r_grid - mu))
            Tr[i, j_nearest] = 1.0
            continue
        for j in range(n):
            if j == 0:
                Tr[i, j] = norm.cdf((r_grid[j] - mu + step / 2) / sig)
            elif j == n - 1:
                Tr[i, j] = 1 - norm.cdf((r_grid[j] - mu - step / 2) / sig)
            else:
                Tr[i, j] = (norm.cdf((r_grid[j] - mu + step / 2) / sig) -
                            norm.cdf((r_grid[j] - mu - step / 2) / sig))
        Tr[i, :] = Tr[i, :] / Tr[i, :].sum()
    return Tr

Tr_pe = build_Tr_pe(r_grid_pe, r_bar, rho_r, nu_r)
print("PE: r transition Tr_pe shape", Tr_pe.shape, "row sums:", Tr_pe.sum(axis=1)[:3])

In [None]:
# ---------- VFI for PE: state (b,y,r), z=1 ----------
# V(ib,iy,ir) = max_{b'} u(c) + β Σ_{y',r'} Ty[iy,iy']*Tr_pe[ir,ir']*V(ib',iy',ir')
# c = (1+r)*b + y - b',  with b' restricted to grid points
nb_pe, ny_pe, nr_pe = len(b_grid), len(y_grid), len(r_grid_pe)
V_pe = np.zeros((nb_pe, ny_pe, nr_pe))
# Initial guess V = 0; iterate the Bellman operator until the sup-norm change is below tolerance
for it in range(2000):
    V_new = np.full((nb_pe, ny_pe, nr_pe), -np.inf)
    for ib in range(nb_pe):
        for iy in range(ny_pe):
            for ir in range(nr_pe):
                b_val = b_grid[ib]
                y_val = y_grid[iy]
                r_val = r_grid_pe[ir]
                cash = (1 + r_val) * b_val + y_val  # z=1
                # Feasible range for b'
                b_next_min = b_min
                b_next_max = min(cash - c_min, b_max)
                if b_next_max < b_next_min:
                    c_val = np.maximum(cash - b_min, c_min)
                    b_next_val = b_min
                    ib_next = 0
                    # V_pe[ib_next] has shape (ny, nr): contract y' with Ty and r' with Tr_pe
                    val = u(c_val) + beta * (Ty[iy, :] @ V_pe[ib_next, :, :] @ Tr_pe[ir, :])
                    V_new[ib, iy, ir] = val
                    continue
                best_val = -np.inf
                for ib_next in range(nb_pe):
                    b_next_val = b_grid[ib_next]
                    if b_next_val < b_next_min or b_next_val > b_next_max:
                        continue
                    c_val = cash - b_next_val
                    if c_val < c_min:
                        continue
                    # V_pe[ib_next] has shape (ny, nr): contract y' with Ty and r' with Tr_pe
                    val = u(c_val) + beta * (Ty[iy, :] @ V_pe[ib_next, :, :] @ Tr_pe[ir, :])
                    if val > best_val:
                        best_val = val
                V_new[ib, iy, ir] = best_val
    diff = np.abs(V_new - V_pe).max()
    V_pe = V_new.copy()
    if diff < 1e-8:
        print("PE VFI converged in", it + 1, "iterations, max diff =", diff)
        break
else:
    print("PE VFI did not converge after 2000 iterations")

In [None]:
# Recover the consumption policy c_PE(ib,iy,ir) from the converged V_pe
c_pe_grid = np.zeros((nb_pe, ny_pe, nr_pe))
for ib in range(nb_pe):
    for iy in range(ny_pe):
        for ir in range(nr_pe):
            b_val = b_grid[ib]
            y_val = y_grid[iy]
            r_val = r_grid_pe[ir]
            cash = (1 + r_val) * b_val + y_val
            b_next_min = b_min
            b_next_max = min(cash - c_min, b_max)
            best_val = -np.inf
            best_ib_next = 0
            for ib_next in range(nb_pe):
                b_next_val = b_grid[ib_next]
                if b_next_val < b_next_min or b_next_val > b_next_max:
                    continue
                c_val = cash - b_next_val
                if c_val < c_min:
                    continue
                # V_pe[ib_next] has shape (ny, nr): contract y' with Ty and r' with Tr_pe
                val = u(c_val) + beta * (Ty[iy, :] @ V_pe[ib_next, :, :] @ Tr_pe[ir, :])
                if val > best_val:
                    best_val = val
                    best_ib_next = ib_next
            c_pe_grid[ib, iy, ir] = max(cash - b_grid[best_ib_next], c_min)

# Interpolant: evaluate c_PE at arbitrary (b, y, r)
from scipy.interpolate import RegularGridInterpolator
c_pe_interp = RegularGridInterpolator((b_grid, y_grid, r_grid_pe), c_pe_grid, method='linear', bounds_error=False, fill_value=None)

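def c_pe_policy(b, y, r):
    """PE consumption policy (z fixed at 1). Accepts scalars or arrays; inputs are broadcast."""
    # Broadcast so that an array b can be combined with scalar y and r
    b_arr, y_arr, r_arr = np.broadcast_arrays(b, y, r)
    pts = np.column_stack([b_arr.ravel(), y_arr.ravel(), r_arr.ravel()])
    out = np.maximum(c_pe_interp(pts), c_min)   # enforce the consumption floor
    if b_arr.ndim == 0:
        return float(out[0])
    return out.reshape(b_arr.shape)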

print("PE consumption grid shape:", c_pe_grid.shape)
print("Example: c_PE(b=1, y=y_grid[1], r=0.04) =", c_pe_policy(1.0, y_grid[1], 0.04))

### PE vs GE comparison

Compare the PE solution c_PE(b,y,r) with the GE (SRL) policy c(b,y,r,z) at the same (b, y, r); set z=1 so that the two are aligned.
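
Besides the plot, the comparison can be summarized numerically. The sketch below is a minimal example: `policy_gap` is a hypothetical helper, and the two lambda policies are stand-ins for c_PE(·, y, r) and c_GE(·, y, r, z=1), not the notebook's solved policies.

```python
import numpy as np

def policy_gap(c_a, c_b, b_points):
    """Max absolute and max relative gap between two consumption policies c(b)."""
    ca = np.asarray(c_a(b_points), dtype=float)
    cb = np.asarray(c_b(b_points), dtype=float)
    abs_gap = np.abs(ca - cb)
    rel_gap = abs_gap / np.maximum(np.abs(cb), 1e-12)   # guard against division by zero
    return abs_gap.max(), rel_gap.max()

# Hypothetical stand-in policies, linear in b for illustration only
c_pe_demo = lambda b: 1.0 + 0.04 * b
c_ge_demo = lambda b: 1.0 + 0.05 * b
b_points = np.linspace(0.0, 10.0, 11)

max_abs, max_rel = policy_gap(c_pe_demo, c_ge_demo, b_points)
print(f"max |c_PE - c_GE| = {max_abs:.4f}, max relative gap = {max_rel:.4f}")
```

Evaluating both policies on the same `b_points` keeps the comparison free of grid mismatch; non-finite values from an untrained GE policy would need to be filtered before taking the maximum.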

In [None]:
# Plot c_PE and (if trained) c_GE along b, holding y, r fixed and z=1
b_plot = np.linspace(b_min, min(10, b_max), 100)
iy_fix = 1
ir_fix = nr_pe // 2
y_fix = y_grid[iy_fix]
r_fix = r_grid_pe[ir_fix]
z_fix = 1.0

c_pe_curve = c_pe_policy(b_plot, y_fix, r_fix)

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
ax.plot(b_plot, c_pe_curve, 'b-', lw=2, label='PE (VFI, exogenous r)')

# If a trained GE policy is available, overlay c_GE(b, y_fix, r_fix, z_fix)
try:
    c_ge_curve, _ = policy_trained(b_plot, y_fix, r_fix, z_fix)
    if np.any(np.isfinite(c_ge_curve)):
        ax.plot(b_plot, c_ge_curve, 'r--', lw=1.5, label='GE (SRL)')
except NameError:
    pass
ax.set_xlabel('Bond holdings $b$')
ax.set_ylabel('Consumption $c(b,y,r)$')
ax.set_title(f'PE vs GE: $y$={y_fix:.3f}, $r$={r_fix:.4f}, $z$={z_fix}')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()