In [1]:
# download_etf_sina.py
# -*- coding: utf-8 -*-

import re
import time
from pathlib import Path
from typing import Iterable, List, Optional, Union, Dict

import pandas as pd
import akshare as ak


# -----------------------------
# 1) 工具：自动规范化 ETF 代码
# -----------------------------
def normalize_etf_symbol(symbol: str) -> str:
    """
    将用户输入规范化为 ak.fund_etf_hist_sina 可用的格式：shXXXXXX / szXXXXXX

    支持：
    - "sh516780", "sz159941"
    - "516780", "159941"（自动判断）
    - "516780.SH", "159941.SZ"
    - "SH516780", "SZ159941"
    """
    if symbol is None:
        raise ValueError("symbol 不能为空")

    s = str(symbol).strip()
    s = s.replace(" ", "").lower()

    # 处理类似 516780.sh / 159941.sz / 516780.SH
    m = re.match(r"^(\d{6})\.(sh|sz)$", s)
    if m:
        code, ex = m.group(1), m.group(2)
        return f"{ex}{code}"

    # 已经是 shxxxxxx / szxxxxxx
    m = re.match(r"^(sh|sz)(\d{6})$", s)
    if m:
        ex, code = m.group(1), m.group(2)
        return f"{ex}{code}"

    # 纯 6 位数字：根据常见规则自动判断
    m = re.match(r"^(\d{6})$", s)
    if m:
        code = m.group(1)

        # 常见经验规则（ETF 主要集中在这些段）：
        # - 上交所 ETF：510/511/512/513/515/516/517/518/588 等
        # - 深交所 ETF：159/160/161 等
        # 注意：规则并非覆盖所有，但对大多数 ETF 足够实用。
        if code.startswith(("50", "51", "52", "56", "58")):
            return f"sh{code}"
        if code.startswith(("15", "16")):
            return f"sz{code}"

        # 兜底：更偏向 ETF 常见分布（上交所多一些），不放心你可手动加前缀
        return f"sh{code}"

    raise ValueError(f"无法识别的 symbol 格式：{symbol}")


# ---------------------------------------
# 2) 工具：在不确定列名的情况下做字段映射
# ---------------------------------------
def _pick_col(df: pd.DataFrame, candidates: List[str], contains_any: List[str]) -> Optional[str]:
    """
    优先按 candidates 精确匹配列名；否则按 contains_any 做包含匹配
    """
    for c in candidates:
        if c in df.columns:
            return c

    for col in df.columns:
        col_str = str(col)
        if any(k in col_str for k in contains_any):
            return col
    return None


# ---------------------------------------
# 3) 单标的下载：返回标准 OHLCV DataFrame
# ---------------------------------------
def load_etf_hist_sina_ohlcv(
    symbol: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
) -> pd.DataFrame:
    """
    下载 ETF 历史行情（Sina），并标准化为 OHLCV：
    index = datetime
    columns = ["Open", "High", "Low", "Close", "Volume"]

    start_date/end_date: "YYYY-MM-DD" 或 "YYYYMMDD" 都可（pandas 会解析）
    """
    sym = normalize_etf_symbol(symbol)

    raw = ak.fund_etf_hist_sina(symbol=sym)
    if raw is None or raw.empty:
        return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume"], dtype="float64")

    df = raw.copy()

    # ---- 日期列 ----
    date_col = _pick_col(
        df,
        candidates=["date", "日期", "交易日期", "时间", "净值日期"],
        contains_any=["日期", "时间", "date"],
    )
    if date_col is None:
        raise ValueError(f"[{sym}] 找不到日期列，现有列：{list(df.columns)}")

    # ---- OHLCV 列 ----
    open_col = _pick_col(
        df,
        candidates=["open", "开盘", "开盘价", "开盘价格"],
        contains_any=["开盘", "open"],
    )
    high_col = _pick_col(
        df,
        candidates=["high", "最高", "最高价", "最高价格"],
        contains_any=["最高", "high"],
    )
    low_col = _pick_col(
        df,
        candidates=["low", "最低", "最低价", "最低价格"],
        contains_any=["最低", "low"],
    )
    close_col = _pick_col(
        df,
        candidates=["close", "收盘", "收盘价", "收盘价格", "最新价", "单位净值", "净值"],
        contains_any=["收盘", "close", "最新", "净值"],
    )

    # 成交量：不同源可能叫 成交量/成交(手)/成交量(手)/volume 等
    vol_col = _pick_col(
        df,
        candidates=["volume", "成交量", "成交量(手)", "成交(手)"],
        contains_any=["成交量", "volume", "成交"],
    )

    missing = [name for name, col in {
        "Open": open_col,
        "High": high_col,
        "Low": low_col,
        "Close": close_col,
        "Volume": vol_col,
    }.items() if col is None]

    if missing:
        raise ValueError(f"[{sym}] 缺少字段 {missing}，现有列：{list(df.columns)}")

    # ---- 清洗：日期 & 数值 ----
    df[date_col] = pd.to_datetime(df[date_col])

    def to_num(series: pd.Series) -> pd.Series:
        return pd.to_numeric(series.astype(str).str.replace(",", ""), errors="coerce")

    df[open_col] = to_num(df[open_col])
    df[high_col] = to_num(df[high_col])
    df[low_col] = to_num(df[low_col])
    df[close_col] = to_num(df[close_col])

    # 成交量有时是带单位/逗号的字符串，这里尽量转数值；转失败就 NaN
    df[vol_col] = to_num(df[vol_col])

    out = (
        df.set_index(date_col)[[open_col, high_col, low_col, close_col, vol_col]]
          .rename(columns={
              open_col: "Open",
              high_col: "High",
              low_col: "Low",
              close_col: "Close",
              vol_col: "Volume",
          })
          .sort_index()
          .dropna(subset=["Open", "High", "Low", "Close"])  # 价格不完整就丢弃
    )

    # ---- 日期过滤 ----
    if start_date:
        out = out[out.index >= pd.to_datetime(start_date)]
    if end_date:
        out = out[out.index <= pd.to_datetime(end_date)]

    return out


# ---------------------------------------
# 4) 保存：单标的保存为 CSV，文件名自动带起止日期
# ---------------------------------------
def save_etf_ohlcv_to_csv(
    symbol: str,
    start_date: str,
    end_date: str,
    save_dir: Union[str, Path] = "./data",
    encoding: str = "utf-8-sig",
) -> Path:
    sym = normalize_etf_symbol(symbol)
    df = load_etf_hist_sina_ohlcv(sym, start_date=start_date, end_date=end_date)

    if df.empty:
        raise ValueError(f"[{sym}] 下载结果为空，请检查日期范围或标的代码是否正确")

    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)

    # 文件名按：sh516780_2022-01-01_2024-12-31.csv
    filename = f"{sym}_{start_date}_{end_date}.csv"
    filepath = save_dir / filename
    df.to_csv(filepath, encoding=encoding)

    return filepath


# ---------------------------------------
# 5) 批量下载：支持多个 ETF
# ---------------------------------------
def batch_download_etf_to_csv(
    symbols: Iterable[str],
    start_date: str,
    end_date: str,
    save_dir: Union[str, Path] = "./data",
    sleep_seconds: float = 0.3,
    continue_on_error: bool = True,
) -> Dict[str, Union[Path, Exception]]:
    """
    批量下载并保存，返回每个 symbol 的结果：
    - 成功：Path
    - 失败：Exception

    sleep_seconds：防止请求过快被限速
    continue_on_error：True 则遇到单个失败继续下一个
    """
    results: Dict[str, Union[Path, Exception]] = {}
    symbols_list = list(symbols)

    for i, s in enumerate(symbols_list, start=1):
        sym = normalize_etf_symbol(s)
        try:
            path = save_etf_ohlcv_to_csv(
                symbol=sym,
                start_date=start_date,
                end_date=end_date,
                save_dir=save_dir,
            )
            results[sym] = path
            print(f"[{i}/{len(symbols_list)}] ✅ {sym} -> {path}")
        except Exception as e:
            results[sym] = e
            print(f"[{i}/{len(symbols_list)}] ❌ {sym} 失败：{e}")
            if not continue_on_error:
                raise
        finally:
            if sleep_seconds and sleep_seconds > 0:
                time.sleep(sleep_seconds)

    return results




In [2]:
results = batch_download_etf_to_csv(
    symbols=["sh516780"],
    start_date="2025-11-20",
    end_date="2026-01-20",
    save_dir="./data"
)
results

[1/1] ✅ sh516780 -> data/sh516780_2025-11-20_2026-01-20.csv


{'sh516780': PosixPath('data/sh516780_2025-11-20_2026-01-20.csv')}