# A/H 低市盈率季度策略（qs 框架）回测 + 指标 + 仓位建议

- A 股市盈率：来自 `data/data.sqlite` 的 `bak_daily_a.pe`
- H 股：当前 SQLite 中 `daily_h` 没有直接的 PE 字段；仅对 A/H 双重上市公司（`data/ah_codes.csv`）用 A 股 PE 换算出“隐含 H 股 PE”

隐含换算（避免未来函数：用 T-1 的 close 做信号）：

- `A_PE = A_price(CNY) / EPS(CNY)`
- `H_PE = H_price(HKD)*HKD→CNY / EPS(CNY)`
- 所以：`H_PE = A_PE * (H_price(HKD)*HKD→CNY / A_price(CNY))`

汇率：用 `fx_daily` 中的 `USDCNH.FXCM` 与 `USDHKD.FXCM` 交叉计算 `HKD→CNY = (USD/CNH)/(USD/HKD)`。

兼容说明：如果你当前环境的 `matplotlib` 与 `numpy` 版本不兼容，Notebook 会自动跳过绘图，但仍会输出曲线数据与指标。

In [None]:
# 环境与依赖
import _bootstrap  # noqa: F401 (adds src/ to sys.path)

from pathlib import Path
import datetime as dt
import math

import numpy as np

from qs.backtester.runner import load_calendar_bars_from_sqlite, run_backtest
from qs.sqlite_utils import connect_sqlite
from qs.strategy.low_pe_quarterly import LowPEQuarterlyStrategy


In [None]:
# 可选：绘图库（若导入失败则仅不画图）
try:
    import matplotlib
    import matplotlib.pyplot as plt

    matplotlib.rcParams["axes.unicode_minus"] = False
    # 如本机有中文字体可在此设置：
    # matplotlib.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial Unicode MS"]
except Exception as e:
    matplotlib = None
    plt = None
    print("matplotlib unavailable, skip plotting:", repr(e))


In [None]:
# 回测参数
DB_PATH = _bootstrap.RAW_DB_PATH  # robust to notebook cwd
START_DATE = "20170614"
END_DATE = None  # e.g. "20251231"

INIT_CASH = 1_000_000.0
A_K = 5
H_K = 5
REBALANCE_MONTH_INTERVAL = 3
USE_ADJUSTED = True

if not DB_PATH.exists():
    raise FileNotFoundError(f"Missing DB: {DB_PATH} (run fetch/sync scripts first)")

print("DB:", DB_PATH)


In [None]:
# 运行回测（qs 回测框架）
bars = load_calendar_bars_from_sqlite(db_path=DB_PATH, start_date=START_DATE, end_date=END_DATE)
print("calendar bars:", len(bars), "from", bars[0].trade_date, "to", bars[-1].trade_date)

PAIRS_CSV_PATH = _bootstrap.DATA_DIR / "ah_codes.csv"
print("pairs csv:", PAIRS_CSV_PATH, "exists=", PAIRS_CSV_PATH.exists())

strat = LowPEQuarterlyStrategy(
    db_path_raw=str(DB_PATH),
    pairs_csv_path=str(PAIRS_CSV_PATH),
    a_k=A_K,
    h_k=H_K,
    start_date=START_DATE,
    rebalance_month_interval=REBALANCE_MONTH_INTERVAL,
    use_adjusted=USE_ADJUSTED,
)

res = run_backtest(
    bars=bars,
    strategy=strat,
    initial_cash=INIT_CASH,
    symbol="",  # 多标的
    enable_trade_log=False,
    mark_error_policy="warn",
)

print(f"Final equity: {res.final_equity:,.2f}  Total return: {(res.final_equity/res.initial_cash-1):.2%}")
print(f"Max DD: {res.max_drawdown:.2%}  peak={res.dd_peak} trough={res.dd_trough}")
print("Risk:", dict(res.risk))


In [None]:
# 策略净值序列（list-based）
trade_dates = [p.trade_date for p in res.equity_curve]
equities = [float(p.equity) for p in res.equity_curve]
nav = [e / float(INIT_CASH) for e in equities]
dates = [dt.datetime.strptime(d, "%Y%m%d") for d in trade_dates]

print("points:", len(nav), "first:", trade_dates[0], "last:", trade_dates[-1])
print("nav last:", nav[-1])


In [None]:
# 从 SQLite 读取指数 close，并对齐到策略交易日（缺失前值填充）

def load_close_map(table: str, ts_code: str) -> dict[str, float]:
    con = connect_sqlite(DB_PATH, read_only=True)
    try:
        rows = con.execute(
            f"SELECT trade_date, close FROM {table} WHERE ts_code=? AND trade_date>=? ORDER BY trade_date",
            [ts_code, START_DATE],
        ).fetchall()
    finally:
        con.close()
    return {d: float(c) for d, c in rows if c is not None}


def align_close(trade_dates: list[str], close_map: dict[str, float]) -> list[float] | None:
    # forward-fill along trade_dates
    out: list[float] = []
    last = None
    for d in trade_dates:
        if d in close_map:
            last = close_map[d]
        out.append(last if last is not None else float('nan'))
    if not out or all(math.isnan(x) for x in out):
        return None
    first = next((x for x in out if not math.isnan(x)), None)
    if first is None:
        return None
    out = [first if math.isnan(x) else x for x in out]
    return out


hs300_map = load_close_map('index_daily', '000300.SH')
ixic_map = load_close_map('index_global', 'IXIC')

hs300_close = align_close(trade_dates, hs300_map)
ixic_close = align_close(trade_dates, ixic_map)

hs300_nav = [c / hs300_close[0] for c in hs300_close] if hs300_close else None
ixic_nav = [c / ixic_close[0] for c in ixic_close] if ixic_close else None

print('bench hs300:', 'ok' if hs300_nav is not None else 'missing')
print('bench ixic:', 'ok' if ixic_nav is not None else 'missing')


In [None]:
# 曲线对比绘图（若 matplotlib 可用）
if plt is None:
    print('skip plot (matplotlib unavailable)')
else:
    plt.figure(figsize=(12, 5))
    plt.plot(dates, nav, label='Strategy (CNY)')
    if hs300_nav is not None:
        plt.plot(dates, hs300_nav, label='HS300')
    if ixic_nav is not None:
        plt.plot(dates, ixic_nav, label='IXIC')
    plt.title('Normalized Performance (Start = 1.0)')
    plt.xlabel('Date')
    plt.ylabel('NAV')
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# 指标计算：Sharpe、月胜率、年胜率（策略 + 基准）

def daily_returns(nav: list[float]) -> list[float]:
    out = []
    for i in range(1, len(nav)):
        if nav[i - 1] > 0:
            out.append(nav[i] / nav[i - 1] - 1.0)
    return out


def sharpe(nav: list[float], ann_factor: int = 252) -> float:
    r = daily_returns(nav)
    if len(r) < 2:
        return float('nan')
    mu = float(np.mean(r))
    sig = float(np.std(r, ddof=0))
    if sig <= 0:
        return 0.0
    ann_ret = (1 + mu) ** ann_factor - 1
    ann_vol = sig * math.sqrt(ann_factor)
    return float(ann_ret / ann_vol) if ann_vol > 0 else 0.0


def cagr(nav: list[float], ann_factor: int = 252) -> float:
    r = daily_returns(nav)
    if not r:
        return float('nan')
    return float((nav[-1] / nav[0]) ** (ann_factor / len(r)) - 1.0)


def period_returns(
    dates: list[dt.datetime], nav: list[float], key_fn
) -> list[float]:
    out: list[float] = []
    last_key = None
    start_nav = None
    for d, v in zip(dates, nav):
        k = key_fn(d)
        if last_key is None:
            last_key = k
            start_nav = v
            continue
        if k != last_key:
            if start_nav is not None and start_nav > 0:
                out.append(v / start_nav - 1.0)
            last_key = k
            start_nav = v
    return out


def win_rate(rs: list[float]) -> float:
    if not rs:
        return float('nan')
    return float(sum(1 for x in rs if x > 0) / len(rs))


print('Strategy Sharpe:', sharpe(nav))
print('Strategy CAGR  :', cagr(nav))
print('Strategy maxDD :', res.max_drawdown)

mrets = period_returns(dates, nav, lambda x: (x.year, x.month))
yrets = period_returns(dates, nav, lambda x: x.year)
print('Monthly win rate:', win_rate(mrets), 'n=', len(mrets))
print('Yearly  win rate:', win_rate(yrets), 'n=', len(yrets))


## 凯利公式：整体仓位建议

这里基于策略收益（支持按日/按月）估计凯利比例；本 notebook 默认按月计算，并给出 1/2 Kelly（保守）作为建议的“总仓位占比”。

In [None]:
# Kelly 计算（支持按日/按月）
KELLY_FREQ = "monthly"  # "daily" | "monthly"

if KELLY_FREQ == "daily":
    rets = daily_returns(nav)
    min_samples = 50
elif KELLY_FREQ == "monthly":
    rets = period_returns(dates, nav, lambda x: (x.year, x.month))
    min_samples = 24
else:
    raise ValueError(f"Unknown KELLY_FREQ: {KELLY_FREQ}")

print('Kelly freq:', KELLY_FREQ, 'n:', len(rets))


def kelly_discrete(rs: list[float]) -> float:
    wins = [x for x in rs if x > 0]
    losses = [x for x in rs if x < 0]
    if len(rs) < min_samples or len(losses) == 0:
        return float('nan')
    p = len(wins) / len(rs)
    avg_win = float(np.mean(wins)) if wins else 0.0
    avg_loss = float(-np.mean(losses)) if losses else 0.0
    if avg_win <= 0 or avg_loss <= 0:
        return float('nan')
    b = avg_win / avg_loss
    return float(p - (1 - p) / b)


k = kelly_discrete(rets)
print('Kelly:', k)
print('Half Kelly (suggest):', 0.5 * k if k == k else float('nan'))


## 当前（最新交易日）推荐组合与仓位

- 用最新交易日 `T` 执行
- 用 `T-1` 的数据计算 PE 并选股（避免未来函数）
- 输出目标权重（sum=1）


In [None]:
from pprint import pprint

last_trade_date = trade_dates[-1]
prev_trade_date = trade_dates[-2] if len(trade_dates) >= 2 else None
print('latest trade_date:', last_trade_date, 'prev:', prev_trade_date)

if prev_trade_date is None:
    raise RuntimeError('Not enough bars')

targets, recs = strat.compute_targets(signal_date=prev_trade_date, execute_date=last_trade_date)
print('n targets:', len(targets))
pprint(targets)
print('--- records ---')
for r in recs:
    if r.leg == 'A':
        print(f"A {r.symbol} pe={r.pe:.2f} name={r.stock_name}")
    else:
        print(
            f"H {r.symbol} pe={r.pe:.2f} name={r.stock_name} "
            f"(A={r.a_symbol} A_pe={r.a_pe:.2f} A_close={r.a_close_cny:.3f} "
            f"H_close={r.h_close_hkd:.3f} fx(HKD/CNY)={r.hk_to_cny:.4f})"
        )
