In [1]:
# -*- coding: utf-8 -*-
"""
AKShare ETF Universe Selection + Daily History Download (since 20150101)

Two-stage filtering:
1) Snapshot prefilter from fund_etf_spot_em (fast)
2) 60-trading-day liquidity filter using fund_etf_hist_em (robust)
Then download full daily history since 20150101 for final universe.

Requirements:
    pip install akshare pandas numpy tqdm pyarrow
"""

import os
import time
import math
import random
import datetime as dt
from typing import Optional, List, Tuple, Dict

import numpy as np
import pandas as pd
from tqdm import tqdm

import akshare as ak


# -----------------------------
# Config
# -----------------------------
START_DATE_FULL = "20150101"
LOOKBACK_LIQ_DAYS = 90  # 拉近 60 个交易日，建议多给一点天数避免停牌/缺失；后面再取最近60行
LIQ_WINDOW = 60

# 你可以按市场情况调阈值（单位：人民币）
PREFILTER_MIN_TURNOVER = 5_000_000     # 快照成交额 >= 500万
PREFILTER_MIN_FLOAT_MKTCAP = 100_000_000  # 快照流通市值 >= 1亿
PREFILTER_MIN_TURNOVER_RATE = 0.20      # 快照换手率 >= 0.20%（注意：接口字段通常是“%数值”或字符串，代码里会处理）
PREFILTER_MIN_SHARES = None             # 快照“最新份额”如果能解析则可加门槛；默认不强依赖

# 二次过滤：近60交易日成交额中位数
LIQ_MIN_MEDIAN_TURNOVER = 20_000_000    # 2000万
LIQ_MIN_MEDIAN_TURNOVER_RATE = None     # 如需可设，比如 0.15（%）

ADJUST = ""  # "" 不复权; "qfq" 前复权; "hfq" 后复权（ETF一般不强依赖复权；你可按研究口径切换）
SAVE_DIR = "./etf_data_em"
SAVE_FORMAT = "parquet"  # "parquet" or "csv"
MAX_WORKERS = 1  # AKShare/网页源有时会限频；建议先单线程稳定跑，确认后再上多线程
SLEEP_RANGE = (0.2, 0.8)  # 每次请求随机 sleep，降低被限频概率


# -----------------------------
# Helpers
# -----------------------------
def _sleep():
    time.sleep(random.uniform(*SLEEP_RANGE))


def _to_float(x) -> float:
    """Robust parse numeric fields possibly containing '--', '%', commas, etc."""
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return np.nan
    s = str(x).strip()
    if s in {"--", "---", "None", ""}:
        return np.nan
    s = s.replace(",", "")
    # remove trailing %
    if s.endswith("%"):
        s = s[:-1]
    try:
        return float(s)
    except ValueError:
        return np.nan


def _ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)


def _today_yyyymmdd() -> str:
    return dt.date.today().strftime("%Y%m%d")


def _normalize_code(code: str) -> str:
    """ETF code should be 6-digit string."""
    c = str(code).strip()
    # some sources might include 'sh'/'sz' prefixes; keep digits
    digits = "".join(ch for ch in c if ch.isdigit())
    return digits.zfill(6) if digits else c


# -----------------------------
# Step 1: Snapshot prefilter
# -----------------------------
def fetch_etf_spot_em() -> pd.DataFrame:
    df = ak.fund_etf_spot_em()
    # normalize columns
    if "代码" in df.columns:
        df["code"] = df["代码"].map(_normalize_code)
    else:
        raise ValueError("fund_etf_spot_em() 返回字段里找不到 '代码'")
    if "名称" in df.columns:
        df["name"] = df["名称"].astype(str)
    else:
        df["name"] = ""

    # numeric columns (best-effort)
    for col in ["成交额", "流通市值", "换手率", "最新份额", "最新价"]:
        if col in df.columns:
            df[col] = df[col].map(_to_float)

    return df


def snapshot_prefilter(
    spot: pd.DataFrame,
    min_turnover: float = PREFILTER_MIN_TURNOVER,
    min_float_mktcap: float = PREFILTER_MIN_FLOAT_MKTCAP,
    min_turnover_rate: float = PREFILTER_MIN_TURNOVER_RATE,
    min_shares: Optional[float] = PREFILTER_MIN_SHARES,
) -> pd.DataFrame:
    x = spot.copy()

    # Turnover (成交额)
    if "成交额" in x.columns and min_turnover is not None:
        x = x[x["成交额"].fillna(0) >= float(min_turnover)]

    # Float market cap (流通市值)
    if "流通市值" in x.columns and min_float_mktcap is not None:
        x = x[x["流通市值"].fillna(0) >= float(min_float_mktcap)]

    # Turnover rate (换手率) -- usually in percent units already (e.g. 1.23 means 1.23%)
    if "换手率" in x.columns and min_turnover_rate is not None:
        x = x[x["换手率"].fillna(0) >= float(min_turnover_rate)]

    # Shares (最新份额) -- may be missing/NaN; only enforce if you set a threshold
    if "最新份额" in x.columns and min_shares is not None:
        x = x[x["最新份额"].fillna(0) >= float(min_shares)]

    # Basic sanity: keep 6-digit codes
    x = x[x["code"].str.fullmatch(r"\d{6}", na=False)]
    x = x.drop_duplicates(subset=["code"]).reset_index(drop=True)
    return x


# -----------------------------
# Step 2: Robust liquidity filter via recent daily bars
# -----------------------------
def download_etf_daily_em(
    code: str,
    start_date: str,
    end_date: str,
    adjust: str = ADJUST,
    period: str = "daily",
    max_retry: int = 4,
) -> Optional[pd.DataFrame]:
    """
    Download ETF daily/weekly/monthly bars from Eastmoney via AKShare.
    Returns DataFrame with standardized columns.
    """
    code = _normalize_code(code)
    last_err = None
    for k in range(max_retry):
        try:
            _sleep()
            df = ak.fund_etf_hist_em(
                symbol=code,
                period=period,
                start_date=start_date,
                end_date=end_date,
                adjust=adjust,
            )
            if df is None or len(df) == 0:
                return None

            # standardize
            df = df.copy()
            if "日期" in df.columns:
                df["date"] = pd.to_datetime(df["日期"])
            elif "date" in df.columns:
                df["date"] = pd.to_datetime(df["date"])
            else:
                # unexpected schema
                df["date"] = pd.NaT

            # map known cols (Eastmoney ETF hist typically uses Chinese headers)
            rename_map = {
                "开盘": "open",
                "收盘": "close",
                "最高": "high",
                "最低": "low",
                "成交量": "volume",
                "成交额": "turnover",
                "振幅": "amplitude",
                "涨跌幅": "pct_chg",
                "涨跌额": "chg",
                "换手率": "turnover_rate",
            }
            for c_old, c_new in rename_map.items():
                if c_old in df.columns:
                    df[c_new] = df[c_old].map(_to_float)

            df["code"] = code
            df = df.sort_values("date").reset_index(drop=True)
            return df

        except Exception as e:
            last_err = e
            # backoff
            time.sleep(0.8 * (2 ** k))
            continue

    # exhausted retries
    print(f"[WARN] download failed for {code}: {last_err}")
    return None


def liquidity_filter_recent(
    codes: List[str],
    end_date: str,
    lookback_days: int = LOOKBACK_LIQ_DAYS,
    window: int = LIQ_WINDOW,
    min_median_turnover: float = LIQ_MIN_MEDIAN_TURNOVER,
    min_median_turnover_rate: Optional[float] = LIQ_MIN_MEDIAN_TURNOVER_RATE,
) -> Tuple[List[str], pd.DataFrame]:
    """
    For each code, download recent daily bars and compute median turnover over last `window` rows.
    Returns (passed_codes, stats_df).
    """
    # Use calendar days for start range; we just need enough trading days
    end_dt = dt.datetime.strptime(end_date, "%Y%m%d").date()
    start_dt = end_dt - dt.timedelta(days=lookback_days)
    start_date = start_dt.strftime("%Y%m%d")

    rows = []
    passed = []

    for code in tqdm(codes, desc="Liquidity check (recent daily)"):
        df = download_etf_daily_em(code, start_date=start_date, end_date=end_date, adjust=ADJUST, period="daily")
        if df is None or df["date"].isna().all():
            continue

        # take last N trading bars
        df_tail = df.dropna(subset=["date"]).tail(window).copy()
        if len(df_tail) < max(20, int(window * 0.6)):  # too short -> likely new ETF / missing
            continue

        med_turnover = df_tail["turnover"].median() if "turnover" in df_tail.columns else np.nan
        med_torate = df_tail["turnover_rate"].median() if "turnover_rate" in df_tail.columns else np.nan

        ok = True
        if min_median_turnover is not None and not np.isnan(med_turnover):
            ok = ok and (med_turnover >= float(min_median_turnover))
        elif min_median_turnover is not None and np.isnan(med_turnover):
            ok = False

        if min_median_turnover_rate is not None:
            if np.isnan(med_torate):
                ok = False
            else:
                ok = ok and (med_torate >= float(min_median_turnover_rate))

        rows.append(
            {
                "code": _normalize_code(code),
                "med_turnover_60d": med_turnover,
                "med_turnover_rate_60d": med_torate,
                "n_obs": len(df_tail),
                "pass": ok,
            }
        )
        if ok:
            passed.append(_normalize_code(code))

    stats = pd.DataFrame(rows).sort_values(["pass", "med_turnover_60d"], ascending=[False, False])
    return passed, stats


# -----------------------------
# Step 3: Full history download since 20150101
# -----------------------------
def download_full_history(
    codes: List[str],
    start_date: str,
    end_date: str,
    save_dir: str = SAVE_DIR,
    save_format: str = SAVE_FORMAT,
) -> pd.DataFrame:
    _ensure_dir(save_dir)
    all_df = []

    for code in tqdm(codes, desc="Download full daily history"):
        df = download_etf_daily_em(code, start_date=start_date, end_date=end_date, adjust=ADJUST, period="daily")
        if df is None or len(df) == 0:
            continue

        # keep a consistent set of columns
        keep = ["code", "date", "open", "high", "low", "close", "volume", "turnover", "turnover_rate", "pct_chg"]
        for c in keep:
            if c not in df.columns:
                df[c] = np.nan
        df = df[keep].copy()

        # save per ticker
        path = os.path.join(save_dir, f"{code}.{ 'parquet' if save_format=='parquet' else 'csv'}")
        if save_format == "parquet":
            df.to_parquet(path, index=False)
        else:
            df.to_csv(path, index=False, encoding="utf-8-sig")

        all_df.append(df)

    if not all_df:
        return pd.DataFrame()

    out = pd.concat(all_df, ignore_index=True)
    # also save a combined file
    combined_path = os.path.join(save_dir, f"ALL_ETF_daily_{start_date}_{end_date}.{ 'parquet' if save_format=='parquet' else 'csv'}")
    if save_format == "parquet":
        out.to_parquet(combined_path, index=False)
    else:
        out.to_csv(combined_path, index=False, encoding="utf-8-sig")

    return out

In [2]:
# end_date = _today_yyyymmdd()
# print(f"[INFO] end_date = {end_date}")
# 
# # 1) snapshot
# spot = fetch_etf_spot_em()
# print(f"[INFO] spot count = {len(spot):,}")
# 
# # 2) prefilter (fast)
# cand = snapshot_prefilter(spot)
# print(f"[INFO] candidates after snapshot prefilter = {len(cand):,}")
# 
# # 3) robust liquidity filter (recent 60D)
# passed_codes, stats = liquidity_filter_recent(
#     cand["code"].tolist(),
#     end_date=end_date,
#     lookback_days=LOOKBACK_LIQ_DAYS,
#     window=LIQ_WINDOW,
#     min_median_turnover=LIQ_MIN_MEDIAN_TURNOVER,
#     min_median_turnover_rate=LIQ_MIN_MEDIAN_TURNOVER_RATE,
# )
# print(f"[INFO] passed after 60D liquidity filter = {len(passed_codes):,}")
# 
# _ensure_dir(SAVE_DIR)
# stats.to_csv(os.path.join(SAVE_DIR, "liquidity_filter_stats.csv"), index=False, encoding="utf-8-sig")
# pd.Series(passed_codes, name="code").to_csv(os.path.join(SAVE_DIR, "universe_codes.csv"), index=False, encoding="utf-8-sig")
# 
# # 4) download full history since 20150101 for final universe
# full = download_full_history(
#     passed_codes,
#     start_date=START_DATE_FULL,
#     end_date=end_date,
#     save_dir=SAVE_DIR,
#     save_format=SAVE_FORMAT,
# )
# print(f"[INFO] full history rows downloaded = {len(full):,}")
# print("[DONE]")
# 


In [3]:
spot = ak.fund_etf_spot_em()

# standardize
spot["code"] = spot["代码"].astype(str).str.extract(r"(\d+)")[0].str.zfill(6)
spot["成交额"] = pd.to_numeric(spot["成交额"], errors="coerce")

# Example: pick the most liquid ETF right now (highest 成交额)
one = spot.sort_values("成交额", ascending=False).iloc[0]
code = one["code"]
name = one["名称"]

print(code, name)

ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))