# Day1 · CME（资本市场预期）教学脚手架 · Part 3

## 主题：**Σ（协方差）估计 + 稳健化（Winsorize/收缩） + 导出接口**

目标：基于资产历史收益构建 `RiskModel(Sigma, assets, vol_annual, corr)`，
并提供 CSV/JSON 导出，供后续 MVO/BL 使用。


### 目录
1. 依赖与数据类（轻量复刻）
2. 工具函数：收益预处理（对齐、去极值Winsorize、年化）
3. Σ估计方法：`sample` / `shrink_diagonal` / `shrink_constcorr`
4. 导出接口：CSV/JSON
5. Smoke Test：用合成数据快速验证


In [None]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd

AssetCode = str

@dataclass
class RiskModel:
    Sigma: np.ndarray                     # 协方差矩阵(资产×资产)
    assets: List[AssetCode]               # 顺序需与 Sigma 对齐
    vol_annual: Dict[AssetCode, float]    # 年化波动
    corr: Optional[np.ndarray] = None     # 相关性矩阵（可选）

@dataclass
class MarketAssumptions:
    mu: Dict[AssetCode, Optional[float]]
    mu_scn: Dict[str, Dict[AssetCode, Optional[float]]]
    notes: Dict[AssetCode, str]


In [None]:
def align_returns(df: pd.DataFrame, assets: List[AssetCode]) -> pd.DataFrame:
    """确保按资产顺序对齐列，丢弃全NA列与全NA行。输入应为收益率（非价格）。"""
    use_cols = [c for c in assets if c in df.columns]
    sub = df[use_cols].copy()
    sub = sub.dropna(how='all')
    sub = sub.dropna(axis=1, how='all')
    return sub[use_cols]

def winsorize(df: pd.DataFrame, p: float = 0.01) -> pd.DataFrame:
    """对每列按分位数进行Winsorize去极值，默认1%与99%。"""
    out = df.copy()
    for c in out.columns:
        s = out[c].dropna()
        if s.empty: continue
        lo, hi = s.quantile(p), s.quantile(1-p)
        out[c] = out[c].clip(lower=lo, upper=hi)
    return out

def annualize_cov(cov_m: np.ndarray, periods_per_year: int = 12) -> np.ndarray:
    return cov_m * periods_per_year

def corr_from_cov(cov_m: np.ndarray) -> np.ndarray:
    d = np.sqrt(np.diag(cov_m))
    with np.errstate(divide='ignore', invalid='ignore'):
        corr = cov_m / np.outer(d, d)
        corr[np.isnan(corr)] = 0.0
    return corr


In [None]:
def sample_cov(returns: pd.DataFrame) -> np.ndarray:
    return np.asarray(returns.cov())

def shrink_to_diagonal(S: np.ndarray, alpha: float = 0.2) -> np.ndarray:
    """朝对角矩阵收缩：T = diag(S)。Sigma = (1-alpha)S + alpha T"""
    T = np.diag(np.diag(S))
    return (1 - alpha) * S + alpha * T

def shrink_to_const_correlation(S: np.ndarray, alpha: float = 0.2) -> np.ndarray:
    """朝“常数相关”目标收缩：所有非对角相关系数等于均值rho_bar。
    目标T的构造：T_ii = S_ii； T_ij = rho_bar * sqrt(S_ii*S_jj)
    """
    std = np.sqrt(np.diag(S))
    with np.errstate(divide='ignore', invalid='ignore'):
        R = S / np.outer(std, std)
        np.fill_diagonal(R, np.nan)
        rho_bar = np.nanmean(R)
    # 构造目标T
    T = np.outer(std, std) * rho_bar
    np.fill_diagonal(T, np.diag(S))
    return (1 - alpha) * S + alpha * T

def estimate_covariance(
    returns: pd.DataFrame,
    method: str = 'sample',
    alpha: float = 0.2,
    winsorize_pct: Optional[float] = 0.01,
    periods_per_year: int = 12,
) -> np.ndarray:
    """统一接口：预处理（可选Winsorize）→ 协方差估计 → 年化。
    method ∈ {'sample','shrink_diagonal','shrink_constcorr'}
    """
    X = returns.copy()
    if winsorize_pct is not None and winsorize_pct > 0:
        X = winsorize(X, p=winsorize_pct)
    S = sample_cov(X)
    if method == 'sample':
        S_hat = S
    elif method == 'shrink_diagonal':
        S_hat = shrink_to_diagonal(S, alpha=alpha)
    elif method == 'shrink_constcorr':
        S_hat = shrink_to_const_correlation(S, alpha=alpha)
    else:
        raise ValueError("Unknown method")
    return annualize_cov(S_hat, periods_per_year=periods_per_year)


In [None]:
def build_risk_model(returns: pd.DataFrame, assets: List[AssetCode], method: str = 'shrink_constcorr', alpha: float = 0.2, winsorize_pct: float = 0.01, periods_per_year: int = 12) -> RiskModel:
    ret = align_returns(returns, assets)
    S_annual = estimate_covariance(ret, method=method, alpha=alpha, winsorize_pct=winsorize_pct, periods_per_year=periods_per_year)
    vols = np.sqrt(np.diag(S_annual))
    vol_annual = {a: float(v) for a, v in zip(assets, vols)}
    corr = corr_from_cov(S_annual)
    return RiskModel(Sigma=S_annual, assets=assets, vol_annual=vol_annual, corr=corr)

def export_sigma_to_csv(rm: RiskModel, path: str):
    df = pd.DataFrame(rm.Sigma, index=rm.assets, columns=rm.assets)
    df.to_csv(path, encoding='utf-8-sig')

def export_mu_to_csv(mu: Dict[AssetCode, Optional[float]], path: str):
    df = pd.DataFrame({'asset': list(mu.keys()), 'mu': list(mu.values())})
    df.to_csv(path, index=False, encoding='utf-8-sig')

def export_assumptions_json(assump: MarketAssumptions, path: str):
    payload = {
        'mu': assump.mu,
        'mu_scn': assump.mu_scn,
        'notes': assump.notes,
    }
    import json
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)


## Smoke Test（合成数据）
生成 8 个资产的月度收益（120期），用三种方法估计 Σ，并导出 CSV/JSON。

In [None]:
np.random.seed(42)
assets = ["cash","short_bond","credit","equity_div","equity_growth","gold","reits","commodity"]
n, T = len(assets), 120

# 构造一个“常数相关”协方差作为生成器
base_vol = np.array([0.01,0.03,0.05,0.15,0.18,0.12,0.10,0.14]) / np.sqrt(12)  # 月度波动
rho = 0.2
R = np.full((n,n), rho)
np.fill_diagonal(R, 1.0)
Sigma_gen = np.outer(base_vol, base_vol) * R

# 采样月度收益
L = np.linalg.cholesky(Sigma_gen)
Z = np.random.normal(size=(T, n))
ret = Z @ L.T
returns = pd.DataFrame(ret, columns=assets)

# 三种估计
rm_sample = build_risk_model(returns, assets, method='sample')
rm_diag = build_risk_model(returns, assets, method='shrink_diagonal', alpha=0.3)
rm_cc = build_risk_model(returns, assets, method='shrink_constcorr', alpha=0.3)

# 导出示例
out_dir = '/mnt/data'
export_sigma_to_csv(rm_cc, f"{out_dir}/Sigma_constcorr.csv")
export_sigma_to_csv(rm_sample, f"{out_dir}/Sigma_sample.csv")
pd.DataFrame(rm_cc.corr, index=assets, columns=assets).to_csv(f"{out_dir}/Corr_constcorr.csv", encoding='utf-8-sig')

# 构造一个最小的 mu 假设并导出
mu_stub = {a: None for a in assets}
export_mu_to_csv(mu_stub, f"{out_dir}/mu_stub.csv")

# 构造 MarketAssumptions 占位并导出JSON
assump_stub = MarketAssumptions(mu=mu_stub, mu_scn={'base': mu_stub}, notes={a: '' for a in assets})
export_assumptions_json(assump_stub, f"{out_dir}/assumptions_stub.json")

rm_cc.vol_annual

### 下一步（Part 4 预告）
- μ/Σ 可视化（雷达图/热力图/表格），
- 一键导出供 MVO 调用的 `mu.csv` 与 `Sigma.csv`，
- 与 MVO 笔记本的接口对接（读取→优化→输出权重）。