In [5]:
"""
高效版：Python + TuShare 的价值投资基本面筛选器（批量抓取 + 本地缓存 + 向量化计算）
说明：
- 先拉取并缓存股票列表（剔除 ST / 退 的股票），再一次性批量拉取各类财报数据；
- 然后在本地做分组统计与指标计算，最后筛出满足条件的股票并保存 CSV。
- 替换 TUSHARE_TOKEN 后即可运行。需要安装：pip install tushare pandas tqdm
"""

import os
import time
import math
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from tqdm import tqdm
import tushare as ts

# --------------- 配置区（请调整） ----------------
TUSHARE_TOKEN = "2fee2223ebadb2fb9852939cdbf869d6e29d21d7befcfbc1470b89d9"   # <- 把你的 token 放这里

START_YEAR = 2020
END_YEAR = 2024
DATA_DIR = Path("tushare_cache")
DATA_DIR.mkdir(exist_ok=True)

# 筛选阈值（可按需调整）
CAGR_MIN = 0.08
PROFIT_YEARS_MIN = 3
FCF_MARGIN_MIN = 0.05
ROE_MIN = 0.12
NETDEBT_EBITDA_MAX = 3
PITO_F_MIN = 6  # 最终 Piotroski F-score 要求（脚本采用可用字段计算并按9分制）

# --------------- 初始化 TuShare -------------------
ts.set_token(TUSHARE_TOKEN)
pro = ts.pro_api()

# --------------- 工具函数 -------------------------
def save_if_not_exists(df, path: Path):
    if not path.exists():
        df.to_csv(path, index=False)
    return path

def try_read_csv(path: Path):
    if path.exists():
        return pd.read_csv(path, dtype=str)
    return None

def float_safe(x):
    try:
        return float(x)
    except Exception:
        return float('nan')

# --------------- Step 1: 股票名单（缓存） ----------------
stock_list_path = DATA_DIR / "stock_list.csv"
if not stock_list_path.exists():
    print("下载并缓存股票列表（排除 ST/退 市）...")
    stocks = pro.stock_basic(exchange='', list_status='L',
                             fields='ts_code,name,area,industry,list_date')
    # 过滤掉名字中包含 ST 或 退 的（兼容大小写）
    mask = (~stocks['name'].str.contains('ST', case=False, na=False)) & \
           (~stocks['name'].str.contains('退', na=False))
    stocks = stocks[mask].reset_index(drop=True)
    stocks.to_csv(stock_list_path, index=False, encoding='utf-8-sig')
else:
    stocks = pd.read_csv(stock_list_path, dtype=str)
print(f"股票名单已加载：{len(stocks)} 支（已缓存：{stock_list_path}）")

# --------------- Step 2: 批量拉取五张表并缓存 ----------------
# 定义缓存文件路径
income_path = DATA_DIR / f"income_{START_YEAR}_{END_YEAR}.csv"
cashflow_path = DATA_DIR / f"cashflow_{START_YEAR}_{END_YEAR}.csv"
balance_path = DATA_DIR / f"balancesheet_{START_YEAR}_{END_YEAR}.csv"
finaind_path = DATA_DIR / f"fina_indicator_{START_YEAR}_{END_YEAR}.csv"
capital_path = DATA_DIR / f"capital_{START_YEAR}_{END_YEAR}.csv"

# helper to fetch if not cached
def fetch_or_load(path: Path, fetch_fn):
    df = try_read_csv(path)
    if df is not None:
        print(f"加载缓存：{path}")
        return df
    print(f"API 拉取并缓存：{path.name} ...")
    df = fetch_fn()
    # ensure columns are str to avoid dtype surprises; but convert numeric later
    df.to_csv(path, index=False)
    return df

# define fetch functions (fields chosen conservatively; 若 tuShare 无某字段会报错，请在报错时告知我字段名我再调整)
start_date = f"{START_YEAR}0101"
end_date = f"{END_YEAR}1231"

def fetch_income():
    return pro.income(start_date=start_date, end_date=end_date,
                      fields='ts_code,ann_date,end_date,revenue,operating_revenue,total_operating_revenue,grossprofit,netprofit')

def fetch_cashflow():
    # 常用字段：operating_cashflow 或 ncf_operate; capex 字段可能为 'capital_expend' 或 'capex'
    # 先尝试常见字段名
    return pro.cashflow(start_date=start_date, end_date=end_date,
                        fields='ts_code,ann_date,end_date,operating_cashflow,capex,netprofit_operate')

def fetch_balance():
    return pro.balancesheet(start_date=start_date, end_date=end_date,
                             fields='ts_code,ann_date,end_date,total_assets,total_liab,total_cur_assets,total_cur_liab,monetary_cap')

def fetch_finaind():
    # 包含 ROE、毛利率等
    return pro.fina_indicator(start_date=start_date, end_date=end_date,
                              fields='ts_code,ann_date,end_date,roe,grossprofit_margin,roa')

def fetch_capital():
    # 可能包含 EBITDA 字段（某些年度/公司缺失）
    return pro.capital(start_date=start_date, end_date=end_date,
                       fields='ts_code,ann_date,end_date,ebitda')

# fetch or load
income_all = fetch_or_load(income_path, fetch_income)
cash_all = fetch_or_load(cashflow_path, fetch_cashflow)
balance_all = fetch_or_load(balance_path, fetch_balance)
fina_all = fetch_or_load(finaind_path, fetch_finaind)
capital_all = fetch_or_load(capital_path, fetch_capital)

# --------------- Step 3: 数据清洗（转换类型、标准列名） ----------------
def clean_df(df, date_col='end_date'):
    # normalize column names to str
    df = df.copy()
    # Ensure ts_code exists
    if 'ts_code' not in df.columns:
        raise RuntimeError("数据表缺少 ts_code 列，请检查 TuShare 参数或字段名")
    # convert dates and numeric fields later when used
    return df

income_all = clean_df(income_all)
cash_all = clean_df(cash_all)
balance_all = clean_df(balance_all)
fina_all = clean_df(fina_all)
capital_all = clean_df(capital_all)

# convert numeric-like columns to numeric for safer computation (coerce errors)
numcols_income = [c for c in ['revenue', 'netprofit', 'operating_revenue', 'total_operating_revenue', 'grossprofit'] if c in income_all.columns]
for c in numcols_income:
    income_all[c] = pd.to_numeric(income_all[c], errors='coerce')

numcols_cash = [c for c in ['operating_cashflow', 'capex', 'netprofit_operate'] if c in cash_all.columns]
for c in numcols_cash:
    cash_all[c] = pd.to_numeric(cash_all[c], errors='coerce')

numcols_bal = [c for c in ['total_assets', 'total_liab', 'total_cur_assets', 'total_cur_liab', 'monetary_cap'] if c in balance_all.columns]
for c in numcols_bal:
    balance_all[c] = pd.to_numeric(balance_all[c], errors='coerce')

numcols_fina = [c for c in ['roe', 'grossprofit_margin', 'roa'] if c in fina_all.columns]
for c in numcols_fina:
    fina_all[c] = pd.to_numeric(fina_all[c], errors='coerce')

if 'ebitda' in capital_all.columns:
    capital_all['ebitda'] = pd.to_numeric(capital_all['ebitda'], errors='coerce')

# --------------- Step 4: 按 ts_code 分组并计算指标（向量化/分组方式） --------------
# 为了减少 Python 层循环，我们也可以把数据按 ts_code 分组到字典中（内存占用稍高，但对加速有帮助）
income_grp = {k: g.sort_values('end_date') for k, g in income_all.groupby('ts_code')}
cash_grp = {k: g.sort_values('end_date') for k, g in cash_all.groupby('ts_code')}
balance_grp = {k: g.sort_values('end_date') for k, g in balance_all.groupby('ts_code')}
fina_grp = {k: g.sort_values('end_date') for k, g in fina_all.groupby('ts_code')}
capital_grp = {k: g.sort_values('end_date') for k, g in capital_all.groupby('ts_code')} if 'ebitda' in capital_all.columns else {}

# --------------- Step 5: 对股票名单逐只计算（快速过滤 + 断点续跑） ----------------
selected = []
failed = []
pbar = tqdm(stocks['ts_code'].tolist(), desc="筛选股票", ncols=100)

for ts_code in pbar:
    try:
        # 要求各项数据至少有 N 年的数据（这里最少2年用于比较）
        inc = income_grp.get(ts_code)
        if inc is None or inc.shape[0] < 2:
            continue
        cf = cash_grp.get(ts_code)
        if cf is None or cf.shape[0] < 1:
            continue
        bal = balance_grp.get(ts_code)
        if bal is None or bal.shape[0] < 1:
            continue
        fin = fina_grp.get(ts_code)
        if fin is None or fin.shape[0] < 1:
            # 可以继续，但很多财务指标不可用
            pass

        # --- 1) 收入 CAGR (使用时间序列的第一个和最后一个) ---
        rev_series = inc['revenue'].dropna().values
        if rev_series.size < 2 or (rev_series[0] <= 0):
            continue
        years_count = inc.shape[0]
        # ensure numeric
        first_rev = float_safe(rev_series[0])
        last_rev = float_safe(rev_series[-1])
        if first_rev <= 0:
            continue
        CAGR = (last_rev / first_rev) ** (1.0 / (years_count - 1)) - 1.0

        if pd.isna(CAGR) or CAGR < CAGR_MIN:
            continue

        # --- 2) 净利润连续增长年数（以 netprofit 序列 diff > 0 计数） ---
        net_series = inc['netprofit'].dropna().astype(float).values
        profit_growth_years = 0
        if net_series.size >= 2:
            profit_growth_years = int((net_series[1:] - net_series[:-1] > 0).sum())
        if profit_growth_years < PROFIT_YEARS_MIN:
            continue

        # --- 3) 自由现金流与 FCF margin ---
        # operating_cashflow - capex
        if ('operating_cashflow' in cf.columns):
            ocf_series = cf['operating_cashflow'].astype(float).values
        else:
            ocf_series = None
        capex_series = cf['capex'].astype(float).values if 'capex' in cf.columns else None

        if ocf_series is None:
            continue

        # align by time: take the overlapping n entries with revenue (best-effort)
        # compute fcf as ocf - capex, fallback if capex missing
        try:
            n = min(len(ocf_series), len(rev_series))
            ocf_recent = ocf_series[-n:]
            rev_recent = rev_series[-n:].astype(float)
            if capex_series is not None:
                capex_recent = capex_series[-n:].astype(float)
            else:
                capex_recent = [0.0] * n
            fcf_series = ocf_recent - capex_recent
            # require last year fcf > 0 and average margin threshold
            if math.isnan(fcf_series[-1]) or fcf_series[-1] <= 0:
                continue
            fcf_margin = ( (fcf_series / rev_recent).mean() )
            if pd.isna(fcf_margin) or fcf_margin < FCF_MARGIN_MIN:
                continue
        except Exception:
            continue

        # --- 4) ROE 平均值 ---
        roe_mean = float('nan')
        if 'roe' in fin.columns:
            roe_vals = fin['roe'].dropna().astype(float).values
            if len(roe_vals) > 0:
                roe_mean = float(roe_vals.mean())
        if pd.isna(roe_mean) or roe_mean < ROE_MIN:
            continue

        # --- 5) 净负债 / EBITDA ---
        # 尝试使用 balance 的 monetary_cap 作为现金，fallback 使用 current assets
        bal_last = bal.iloc[-1]
        total_liab = float_safe(bal_last.get('total_liab', float('nan')))
        cash_value = float('nan')
        for cand in ['monetary_cap', 'monetaryfunds', 'monetary', 'monetary_fund', 'monetary_capital']:
            if cand in bal.columns:
                try:
                    cash_value = float_safe(bal_last.get(cand))
                    break
                except Exception:
                    cash_value = float('nan')
        if math.isnan(cash_value):
            # fallback to current assets / current liabilities if exist
            cur_assets = float_safe(bal_last.get('total_cur_assets', float('nan')))
            cur_liab = float_safe(bal_last.get('total_cur_liab', float('nan')))
            # assume some cash inside current assets but cannot be precise; conservative取0
            cash_value = 0.0

        net_debt = total_liab - cash_value if not math.isnan(total_liab) else float('nan')

        # ebitda
        ebitda_vals = []
        if ts_code in capital_grp:
            ebitda_vals = capital_grp[ts_code]['ebitda'].dropna().astype(float).values
        ebitda_mean = float('nan')
        if len(ebitda_vals) > 0:
            ebitda_mean = float(ebitda_vals.mean())

        # 如果没有 EBITDA，则尝试用（netprofit + interest + tax + depreciation）近似 (字段可能缺失)，否则跳过 EBITDA 检查
        ndebt_ebitda = float('nan')
        if not math.isnan(net_debt) and (not math.isnan(ebitda_mean) and ebitda_mean > 0):
            ndebt_ebitda = net_debt / ebitda_mean
            if ndebt_ebitda > NETDEBT_EBITDA_MAX:
                continue
        # else 不强制要求（某些行业 ebitda 缺失）

        # --- 6) Piotroski F-score（尽量按 9 项计算，缺字段时按可用项数做归一化） ---
        # We'll compute all 9 if fields available, otherwise compute subset and scale to 9
        piot_score = 0
        piot_total_possible = 0

        # helper get last and prior values for some items
        def last_two(series):
            s = series.dropna().astype(float).values
            if len(s) >= 2:
                return s[-1], s[-2]
            elif len(s) == 1:
                return s[-1], float('nan')
            else:
                return float('nan'), float('nan')

        # 1) Positive ROA (net profit / total assets) -> use fina.roa if available
        if 'roa' in fina_all.columns:
            cur_roa, prev_roa = last_two(fin['roa']) if ts_code in fina_grp else (float('nan'), float('nan'))
            if not math.isnan(cur_roa):
                piot_total_possible += 1
                if cur_roa > 0:
                    piot_score += 1

        # 2) Positive operating cash flow (ocf last > 0)
        if 'operating_cashflow' in cf.columns:
            cur_ocf = float_safe(cf['operating_cashflow'].dropna().astype(float).values[-1]) if cf['operating_cashflow'].dropna().size>0 else float('nan')
            if not math.isnan(cur_ocf):
                piot_total_possible += 1
                if cur_ocf > 0:
                    piot_score += 1

        # 3) Accruals (OCF > Net profit)
        if ('operating_cashflow' in cf.columns) and ('netprofit' in inc.columns):
            try:
                cur_net = float_safe(inc['netprofit'].dropna().astype(float).values[-1]) if inc['netprofit'].dropna().size>0 else float('nan')
                if not math.isnan(cur_ocf) and not math.isnan(cur_net):
                    piot_total_possible += 1
                    if cur_ocf > cur_net:
                        piot_score += 1
            except Exception:
                pass

        # 4) Leverage decreased (total_liab last < prev)
        if 'total_liab' in bal.columns and bal.shape[0] >= 2:
            piot_total_possible += 1
            if float_safe(bal['total_liab'].iloc[-1]) < float_safe(bal['total_liab'].iloc[-2]):
                piot_score += 1

        # 5) Current ratio improved (cur_assets/cur_liab)
        if ('total_cur_assets' in bal.columns) and ('total_cur_liab' in bal.columns) and bal.shape[0] >= 2:
            prev_cur = float_safe(bal['total_cur_assets'].iloc[-2]) / max(1e-9, float_safe(bal['total_cur_liab'].iloc[-2]))
            cur_cur = float_safe(bal['total_cur_assets'].iloc[-1]) / max(1e-9, float_safe(bal['total_cur_liab'].iloc[-1]))
            piot_total_possible += 1
            if cur_cur > prev_cur:
                piot_score += 1

        # 6) No new shares issued (share capital not increased) -- try find 'total_share' or 'tot_shr' in other tables (not guaranteed)
        # We skip if we can't find share data

        # 7) Gross margin improved (use grossprofit_margin from fina)
        if 'grossprofit_margin' in fin.columns and fin.shape[0] >= 2:
            prev_gm = float_safe(fin['grossprofit_margin'].iloc[-2])
            cur_gm = float_safe(fin['grossprofit_margin'].iloc[-1])
            piot_total_possible += 1
            if cur_gm > prev_gm:
                piot_score += 1

        # 8) Asset turnover improved (revenue / total_assets)
        if 'total_assets' in bal.columns and inc.shape[0] >= 2:
            prev_turn = float_safe(inc['revenue'].iloc[-2]) / max(1e-9, float_safe(bal['total_assets'].iloc[-2])) if bal.shape[0] >=2 else float('nan')
            cur_turn = float_safe(inc['revenue'].iloc[-1]) / max(1e-9, float_safe(bal['total_assets'].iloc[-1]))
            if not math.isnan(cur_turn) and not math.isnan(prev_turn):
                piot_total_possible += 1
                if cur_turn > prev_turn:
                    piot_score += 1

        # If total possible < 9, scale to 9-point equivalent (保守做法)
        piot_score_scaled = (piot_score / max(1, piot_total_possible)) * 9 if piot_total_possible > 0 else 0

        # 最终按门槛判断（使用 scaled）
        if piot_score_scaled < PITO_F_MIN:
            continue

        # 若通过所有检测，保存结果（关键列）
        name = stocks.loc[stocks['ts_code'] == ts_code, 'name'].values[0] if ts_code in stocks['ts_code'].values else ts_code
        selected.append({
            'ts_code': ts_code,
            'name': name,
            'CAGR': CAGR,
            'ProfitGrowthYears': profit_growth_years,
            'FCF_last': float(fcf_series[-1]) if len(fcf_series)>0 else float('nan'),
            'FCF_Margin': float(fcf_margin),
            'ROE_mean': float(roe_mean),
            'NetDebt': float(net_debt) if not math.isnan(net_debt) else float('nan'),
            'EBITDA_mean': float(ebitda_mean) if not math.isnan(ebitda_mean) else float('nan'),
            'NetDebt_EBITDA': float(ndebt_ebitda) if not math.isnan(ndebt_ebitda) else float('nan'),
            'Piotroski_scaled': float(piot_score_scaled)
        })

    except Exception as e:
        # 记录失败并继续，防止单只股票错误中断整个批量过程
        failed.append((ts_code, str(e)))
        continue

pbar.close()
# --------------- Step 6: 输出结果（避免 KeyError） ----------------
df_selected = pd.DataFrame(selected)
out_csv = DATA_DIR / "selected_value_stocks.csv"
if df_selected.empty:
    print("筛选后没有符合条件的股票（df_selected 为空）。")
else:
    # safe sort: 如果 ROE_mean 不存在则按其他字段排序
    sort_by = 'ROE_mean' if 'ROE_mean' in df_selected.columns else df_selected.columns[0]
    df_selected.sort_values(by=sort_by, ascending=False, inplace=True)
    df_selected.to_csv(out_csv, index=False, encoding='utf-8-sig')
    print(f"筛选完成：{len(df_selected)} 支符合条件，结果已保存到 {out_csv}")
    print(df_selected.head(50).to_string(index=False))

# 将失败记录保存以便排查
if failed:
    fail_df = pd.DataFrame(failed, columns=['ts_code', 'error'])
    fail_df.to_csv(DATA_DIR / "fetch_failed_log.csv", index=False, encoding='utf-8-sig')
    print(f"部分股票处理失败（详情见 fetch_failed_log.csv），失败数：{len(failed)}")


股票名单已加载：5247 支（已缓存：tushare_cache\stock_list.csv）
API 拉取并缓存：income_2020_2024.csv ...


Exception: 必填参数 ts_code

## 原始数据下载与存储

In [6]:
import tushare as ts
import pandas as pd
from pathlib import Path
import time

# ====== 配置 ======
ts.set_token("2fee2223ebadb2fb9852939cdbf869d6e29d21d7befcfbc1470b89d9")  # 替换成你的token
pro = ts.pro_api()

data_dir = Path("./tushare_data")
data_dir.mkdir(exist_ok=True)

start_date = "20180101"
end_date = "20251231"

# ====== Step 1: 获取并缓存股票列表 ======
stock_list_path = data_dir / "stock_list.csv"

if stock_list_path.exists():
    stock_df = pd.read_csv(stock_list_path)
    print(f"已加载缓存股票列表，共 {len(stock_df)} 只股票")
else:
    stock_df = pro.stock_basic(exchange='', list_status='L',
                               fields='ts_code,symbol,name,area,industry,list_date')
    stock_df = stock_df[~stock_df['name'].str.contains('ST')]  # 剔除ST
    stock_df.to_csv(stock_list_path, index=False)
    print(f"已获取股票列表并剔除ST，共 {len(stock_df)} 只股票")

ts_codes = stock_df['ts_code'].tolist()

# ====== Step 2: 定义财务数据拉取函数（带缓存） ======
def fetch_financial_data(file_name, fetch_fn):
    path = data_dir / file_name
    if path.exists():
        print(f"加载缓存：{file_name}")
        return pd.read_csv(path)

    print(f"开始下载 {file_name} 数据...")
    all_data = []
    for i, code in enumerate(ts_codes):
        try:
            df = fetch_fn(code)
            if df is not None and not df.empty:
                all_data.append(df)
        except Exception as e:
            print(f"{code} 拉取失败: {e}")
        time.sleep(0.3)  # TuShare免费版防止频率超限

        if (i + 1) % 100 == 0:
            print(f"已下载 {i+1}/{len(ts_codes)} 支股票")

    final_df = pd.concat(all_data, ignore_index=True)
    final_df.to_csv(path, index=False)
    print(f"{file_name} 下载完成，已保存 {len(final_df)} 条记录")
    return final_df

# ====== 拉取三大表 ======
income_df = fetch_financial_data("income.csv",
                                 lambda code: pro.income(ts_code=code, start_date=start_date, end_date=end_date,
                                                         fields="ts_code,ann_date,end_date,revenue,netprofit"))

balance_df = fetch_financial_data("balancesheet.csv",
                                  lambda code: pro.balancesheet(ts_code=code, start_date=start_date, end_date=end_date,
                                                                fields="ts_code,ann_date,end_date,total_assets,total_liab"))

cashflow_df = fetch_financial_data("cashflow.csv",
                                   lambda code: pro.cashflow(ts_code=code, start_date=start_date, end_date=end_date,
                                                             fields="ts_code,ann_date,end_date,n_cashflow_act"))

# ====== Step 3: 后续筛选逻辑（示例） ======
# 例如计算近3年平均ROE、现金流充足率等指标，这里你可以自己加逻辑


已获取股票列表并剔除ST，共 5247 只股票
开始下载 income.csv 数据...
已下载 100/5247 支股票
已下载 200/5247 支股票
已下载 300/5247 支股票
已下载 400/5247 支股票
已下载 500/5247 支股票
已下载 600/5247 支股票
已下载 700/5247 支股票
已下载 800/5247 支股票
已下载 900/5247 支股票
已下载 1000/5247 支股票
已下载 1100/5247 支股票
已下载 1200/5247 支股票
已下载 1300/5247 支股票
已下载 1400/5247 支股票
已下载 1500/5247 支股票
已下载 1600/5247 支股票
已下载 1700/5247 支股票
已下载 1800/5247 支股票
已下载 1900/5247 支股票
已下载 2000/5247 支股票
已下载 2100/5247 支股票
已下载 2200/5247 支股票
已下载 2300/5247 支股票
已下载 2400/5247 支股票
已下载 2500/5247 支股票
已下载 2600/5247 支股票
已下载 2700/5247 支股票
已下载 2800/5247 支股票
已下载 2900/5247 支股票
已下载 3000/5247 支股票
已下载 3100/5247 支股票
已下载 3200/5247 支股票
已下载 3300/5247 支股票
已下载 3400/5247 支股票
已下载 3500/5247 支股票
已下载 3600/5247 支股票
已下载 3700/5247 支股票
已下载 3800/5247 支股票
已下载 3900/5247 支股票
已下载 4000/5247 支股票
已下载 4100/5247 支股票
已下载 4200/5247 支股票
已下载 4300/5247 支股票
已下载 4400/5247 支股票
已下载 4500/5247 支股票
已下载 4600/5247 支股票
已下载 4700/5247 支股票
已下载 4800/5247 支股票
已下载 4900/5247 支股票
已下载 5000/5247 支股票
已下载 5100/5247 支股票
已下载 5200/5247 支股票
income.csv 下载完成，已保存 198467 

## 增量数据下载与合并

In [8]:
import pandas as pd
import tushare as ts
from pathlib import Path
import time

# ===== 配置 =====
ts.set_token("2fee2223ebadb2fb9852939cdbf869d6e29d21d7befcfbc1470b89d9")
pro = ts.pro_api()

data_dir = Path("./tushare_data")
start_date_default = "20180101"  # 如果没有历史数据，从这个日期开始

# ===== 增量更新函数 =====
def update_financial_data(file_name, fetch_fn):
    path = data_dir / file_name
    
    if path.exists():
        df_old = pd.read_csv(path, dtype=str)
        df_old['ann_date'] = pd.to_datetime(df_old['ann_date'])
        last_date = df_old['ann_date'].max().strftime("%Y%m%d")
        print(f"{file_name} 当前最新公告日期: {last_date}")
        start_date = (pd.to_datetime(last_date) + pd.Timedelta(days=1)).strftime("%Y%m%d")
    else:
        df_old = pd.DataFrame()
        start_date = start_date_default
        print(f"{file_name} 无历史数据，从 {start_date} 开始下载")

    if start_date > pd.Timestamp.today().strftime("%Y%m%d"):
        print(f"{file_name} 已是最新，无需更新")
        return df_old

    # 读取股票列表
    stock_df = pd.read_csv(data_dir / "stock_list.csv")
    ts_codes = stock_df['ts_code'].tolist()

    # 下载增量数据
    all_new_data = []
    for i, code in enumerate(ts_codes):
        try:
            df = fetch_fn(code, start_date)
            if df is not None and not df.empty:
                all_new_data.append(df)
        except Exception as e:
            print(f"{code} 拉取失败: {e}")
        time.sleep(0.3)  # 防频率超限

        if (i + 1) % 100 == 0:
            print(f"已更新 {i+1}/{len(ts_codes)} 支股票")

    if not all_new_data:
        print(f"{file_name} 无新增数据")
        return df_old

    df_new = pd.concat(all_new_data, ignore_index=True)

    # 合并新旧数据
    df_final = pd.concat([df_old, df_new], ignore_index=True).drop_duplicates(subset=['ts_code','end_date'], keep='last')
    df_final.to_csv(path, index=False)
    print(f"{file_name} 更新完成，共 {len(df_new)} 条新增记录")
    return df_final


# ===== 调用增量更新 =====
update_financial_data("income.csv",
    lambda code, start: pro.income(ts_code=code, start_date=start,
                                   end_date="20251231",
                                   fields="ts_code,ann_date,end_date,revenue,netprofit"))

update_financial_data("balancesheet.csv",
    lambda code, start: pro.balancesheet(ts_code=code, start_date=start,
                                         end_date="20251231",
                                         fields="ts_code,ann_date,end_date,total_assets,total_liab"))

update_financial_data("cashflow.csv",
    lambda code, start: pro.cashflow(ts_code=code, start_date=start,
                                     end_date="20251231",
                                     fields="ts_code,ann_date,end_date,n_cashflow_act"))


income.csv 当前最新公告日期: 20250811
income.csv 已是最新，无需更新
balancesheet.csv 当前最新公告日期: 20250811
balancesheet.csv 已是最新，无需更新
cashflow.csv 当前最新公告日期: 20250811
cashflow.csv 已是最新，无需更新


Unnamed: 0,ts_code,ann_date,end_date,n_cashflow_act
0,000001.SZ,2025-04-19,20250331,162946000000.0
1,000001.SZ,2025-03-15,20241231,63336000000.0
2,000001.SZ,2025-03-15,20241231,63336000000.0
3,000001.SZ,2024-10-19,20240930,137158000000.0
4,000001.SZ,2024-08-16,20240630,113722000000.0
...,...,...,...,...
204040,689009.SH,2023-03-31,20221231,1589096254.56
204041,689009.SH,2022-04-26,20211231,-161451665.8
204042,689009.SH,2021-04-16,20201231,896345908.49
204043,689009.SH,2020-09-30,20200630,320937130.13


## 整合逻辑后的增量数据下载

In [9]:
import pandas as pd
import tushare as ts
from pathlib import Path
import time

# ========== 配置 ==========
ts.set_token("2fee2223ebadb2fb9852939cdbf869d6e29d21d7befcfbc1470b89d9")
pro = ts.pro_api()

data_dir = Path("./tushare_data")
data_dir.mkdir(exist_ok=True)

start_date_default = "20180101"
end_date = "20251231"

# ========== Step 1: 获取股票列表（排除 ST） ==========
def fetch_stock_list():
    stock_path = data_dir / "stock_list.csv"
    if stock_path.exists():
        print("读取已有股票列表...")
        stock_df = pd.read_csv(stock_path, dtype=str)
    else:
        print("首次下载股票列表...")
        stock_df = pro.stock_basic(exchange='', list_status='L',
                                   fields='ts_code,symbol,name,area,industry,list_date')
        stock_df = stock_df[~stock_df['name'].str.contains('ST')]  # 排除 ST
        stock_df.to_csv(stock_path, index=False)
    return stock_df

# ========== Step 2: 增量下载逻辑 ==========
def update_financial_data(file_name, fetch_fn):
    path = data_dir / file_name
    
    if path.exists():
        df_old = pd.read_csv(path, dtype=str)
        df_old['ann_date'] = pd.to_datetime(df_old['ann_date'])
        last_date = df_old['ann_date'].max().strftime("%Y%m%d")
        print(f"{file_name} 当前最新公告日期: {last_date}")
        start_date = (pd.to_datetime(last_date) + pd.Timedelta(days=1)).strftime("%Y%m%d")
    else:
        df_old = pd.DataFrame()
        start_date = start_date_default
        print(f"{file_name} 无历史数据，从 {start_date} 开始下载")

    if start_date > pd.Timestamp.today().strftime("%Y%m%d"):
        print(f"{file_name} 已是最新，无需更新")
        return df_old

    # 获取股票列表
    stock_df = pd.read_csv(data_dir / "stock_list.csv")
    ts_codes = stock_df['ts_code'].tolist()

    # 下载增量数据
    all_new_data = []
    for i, code in enumerate(ts_codes):
        try:
            df = fetch_fn(code, start_date)
            if df is not None and not df.empty:
                all_new_data.append(df)
        except Exception as e:
            print(f"{code} 拉取失败: {e}")
        time.sleep(0.25)  # 控制请求频率

        if (i + 1) % 100 == 0:
            print(f"已处理 {i+1}/{len(ts_codes)} 支股票")

    if not all_new_data:
        print(f"{file_name} 无新增数据")
        return df_old

    df_new = pd.concat(all_new_data, ignore_index=True)
    df_final = pd.concat([df_old, df_new], ignore_index=True)\
                 .drop_duplicates(subset=['ts_code','end_date'], keep='last')
    df_final.to_csv(path, index=False)
    print(f"{file_name} 更新完成，新增 {len(df_new)} 条记录")
    return df_final

# ========== 主程序 ==========
if __name__ == "__main__":
    # Step 1: 确保股票列表存在
    fetch_stock_list()

    # Step 2: 更新三大财务表
    update_financial_data("income.csv",
        lambda code, start: pro.income(ts_code=code, start_date=start, end_date=end_date,
                                       fields="ts_code,ann_date,end_date,revenue,netprofit"))

    update_financial_data("balancesheet.csv",
        lambda code, start: pro.balancesheet(ts_code=code, start_date=start, end_date=end_date,
                                             fields="ts_code,ann_date,end_date,total_assets,total_liab"))

    update_financial_data("cashflow.csv",
        lambda code, start: pro.cashflow(ts_code=code, start_date=start, end_date=end_date,
                                         fields="ts_code,ann_date,end_date,n_cashflow_act"))


读取已有股票列表...
income.csv 当前最新公告日期: 20250811
income.csv 已是最新，无需更新
balancesheet.csv 当前最新公告日期: 20250811
balancesheet.csv 已是最新，无需更新
cashflow.csv 当前最新公告日期: 20250811
cashflow.csv 已是最新，无需更新


## 两步法 + 增量更新 + 自动筛选器

In [12]:
"""
完整版：两步法 + 增量更新 + 多指标价值筛选器（TuShare）
说明：
- 第一步：获取并缓存股票列表（剔除含 'ST' / '退' 的公司）
- 第二步：对三大表做增量更新（income / balancesheet / cashflow），并缓存 CSV
- 第三步：在本地数据上做向量化/分组计算筛选指标并输出结果
- 请替换 TUSHARE_TOKEN 并确保有足够接口权限
"""

import time
import math
from pathlib import Path
from datetime import datetime
import pandas as pd
import tushare as ts

# ========== 配置 ==========
TUSHARE_TOKEN = "2fee2223ebadb2fb9852939cdbf869d6e29d21d7befcfbc1470b89d9"   # <- 填你的 token
START_DATE_DEFAULT = "20180101"
END_DATE = "20251231"
DATA_DIR = Path("tushare_data")
DATA_DIR.mkdir(exist_ok=True)

# 筛选阈值（可调整）
CAGR_MIN = 0.08
PROFIT_YEARS_MIN = 3
FCF_MARGIN_MIN = 0.05
ROE_MIN = 0.12
NETDEBT_EBITDA_MAX = 3
PITO_F_MIN = 6  # Piotroski阈值（按9分制 scaled）

# ========== 初始化 TuShare ==========
ts.set_token(TUSHARE_TOKEN)
pro = ts.pro_api()

# ========== 工具函数 ==========
def float_safe(x):
    try:
        return float(x)
    except Exception:
        return float('nan')

def ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

ensure_dir(DATA_DIR)

# ========== Step 1: 股票名单（缓存） ==========
def fetch_stock_list():
    path = DATA_DIR / "stock_list.csv"
    if path.exists():
        df = pd.read_csv(path, dtype=str)
        print(f"[股票名单] 已加载缓存，共 {len(df)} 行")
        return df
    print("[股票名单] 正在从 TuShare 获取...")
    df = pro.stock_basic(exchange='', list_status='L',
                         fields='ts_code,name,area,industry,list_date')
    # 过滤掉名字中包含 ST 或 退 的（兼容大小写）
    df = df[~df['name'].str.contains('ST', case=False, na=False)]
    df = df[~df['name'].str.contains('退', na=False)]
    df.to_csv(path, index=False, encoding='utf-8-sig')
    print(f"[股票名单] 已保存到 {path}，共 {len(df)} 行")
    return df

# ========== Step 2: 增量下载函数（通用） ==========
def update_table_incremental(file_name: str, fetch_fn):
    """
    file_name: 保存的 csv 文件名
    fetch_fn: function(ts_code, start_date) -> DataFrame (may be empty)
    """
    path = DATA_DIR / file_name
    if path.exists():
        df_old = pd.read_csv(path, dtype=str)
        if 'ann_date' in df_old.columns:
            df_old['ann_date'] = pd.to_datetime(df_old['ann_date'], errors='coerce')
            last_ann = df_old['ann_date'].max()
            if pd.notna(last_ann):
                start_date = (last_ann + pd.Timedelta(days=1)).strftime("%Y%m%d")
            else:
                start_date = START_DATE_DEFAULT
        else:
            # 如果没有 ann_date 字段，取 end_date 的最大值作为参考
            if 'end_date' in df_old.columns:
                df_old['end_date'] = pd.to_datetime(df_old['end_date'], errors='coerce')
                last_end = df_old['end_date'].max()
                start_date = (last_end + pd.Timedelta(days=1)).strftime("%Y%m%d") if pd.notna(last_end) else START_DATE_DEFAULT
            else:
                start_date = START_DATE_DEFAULT
        print(f"[{file_name}] 已发现已有文件，增量开始日期 = {start_date}")
    else:
        df_old = pd.DataFrame()
        start_date = START_DATE_DEFAULT
        print(f"[{file_name}] 未发现已有文件，从 {start_date} 全量下载")

    # 若起始日比今天晚或等于今天则无需更新
    if start_date > datetime.today().strftime("%Y%m%d"):
        print(f"[{file_name}] 已是最新，无需更新（start_date > 今天）")
        return df_old

    # 读取股票列表
    stock_df = fetch_stock_list()
    ts_codes = stock_df['ts_code'].tolist()

    all_new = []
    failed = []
    for i, code in enumerate(ts_codes):
        try:
            df = fetch_fn(code, start_date)
            if isinstance(df, pd.DataFrame) and not df.empty:
                all_new.append(df)
        except Exception as e:
            failed.append((code, str(e)))
        # 控制频率，TuShare 免费账户限制较严格
        time.sleep(0.25)
        if (i + 1) % 200 == 0:
            print(f"[{file_name}] 已处理 {i+1}/{len(ts_codes)} 支股票")

    if not all_new:
        print(f"[{file_name}] 无新增记录。")
        return df_old

    df_new = pd.concat(all_new, ignore_index=True)
    # 合并并去重（以 ts_code + end_date 去重）
    if not df_old.empty:
        df_combined = pd.concat([df_old, df_new], ignore_index=True)
    else:
        df_combined = df_new
    if 'end_date' in df_combined.columns:
        df_combined['end_date'] = pd.to_datetime(df_combined['end_date'], errors='coerce')
        # 去重并保留最新公告那条
        df_combined = df_combined.drop_duplicates(subset=['ts_code','end_date'], keep='last')
    df_combined.to_csv(path, index=False, encoding='utf-8-sig')
    print(f"[{file_name}] 更新完成：新增 {len(df_new)} 条记录，已保存到 {path}")
    if failed:
        fpath = DATA_DIR / f"{file_name}.failed_log.csv"
        pd.DataFrame(failed, columns=['ts_code','error']).to_csv(fpath, index=False, encoding='utf-8-sig')
        print(f"[{file_name}] 部分失败已保存到 {fpath}")
    return df_combined

# ========== Step 3: 拉取/更新三大表（使用 TuShare 官方字段） ==========
def update_all_financials():
    # income: we will request n_income_attr_p (归属于母公司净利润) & revenue
    def fetch_income_for(code, start):
        return pro.income(ts_code=code, start_date=start, end_date=END_DATE,
                          fields='ts_code,ann_date,end_date,revenue,n_income,n_income_attr_p,ebit,ebitda')

    # balancesheet: money_cap (货币资金) / total_assets / total_liab
    def fetch_balance_for(code, start):
        return pro.balancesheet(ts_code=code, start_date=start, end_date=END_DATE,
                                fields='ts_code,ann_date,end_date,total_assets,total_liab,money_cap,total_cur_assets,total_cur_liab,total_share')

    # cashflow: n_cashflow_act (经营活动产生的现金流量净额) & c_pay_acq_const_fiolta (购建固定资产支付的现金) & free_cashflow
    def fetch_cash_for(code, start):
        return pro.cashflow(ts_code=code, start_date=start, end_date=END_DATE,
                            fields='ts_code,ann_date,end_date,n_cashflow_act,c_pay_acq_const_fiolta,free_cashflow')

    inc = update_table_incremental("income.csv", fetch_income_for)
    bal = update_table_incremental("balancesheet.csv", fetch_balance_for)
    cf = update_table_incremental("cashflow.csv", fetch_cash_for)
    return inc, bal, cf

# ========== Step 4: 指标计算与筛选（向量化/分组） ==========
def run_selector():
    # 读取已缓存的表格
    income_path = DATA_DIR / "income.csv"
    balance_path = DATA_DIR / "balancesheet.csv"
    cashflow_path = DATA_DIR / "cashflow.csv"
    stock_list_path = DATA_DIR / "stock_list.csv"

    if not (income_path.exists() and balance_path.exists() and cashflow_path.exists() and stock_list_path.exists()):
        raise RuntimeError("需要先确保 income.csv / balancesheet.csv / cashflow.csv / stock_list.csv 都已存在并被更新。")

    income = pd.read_csv(income_path, dtype=str)
    balance = pd.read_csv(balance_path, dtype=str)
    cash = pd.read_csv(cashflow_path, dtype=str)
    stocks = pd.read_csv(stock_list_path, dtype=str)

    # 转换常用字段为 numeric / datetime
    for df, colnames in [(income, ['revenue','n_income','n_income_attr_p','ebit','ebitda']),
                         (balance, ['total_assets','total_liab','money_cap','total_cur_assets','total_cur_liab']),
                         (cash, ['n_cashflow_act','c_pay_acq_const_fiolta','free_cashflow'])]:
        for c in colnames:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors='coerce')

    for df in [income, balance, cash]:
        if 'end_date' in df.columns:
            df['end_date'] = pd.to_datetime(df['end_date'], errors='coerce')

    # 以 ts_code 为键，按 end_date 排序，并聚合需要的时间序列（字典方式）
    inc_grp = {k: g.sort_values('end_date') for k,g in income.groupby('ts_code')}
    bal_grp = {k: g.sort_values('end_date') for k,g in balance.groupby('ts_code')}
    cf_grp  = {k: g.sort_values('end_date') for k,g in cash.groupby('ts_code')}

    selected = []
    failed = []

    ts_codes = stocks['ts_code'].tolist()

    print(f"[筛选器] 开始处理 {len(ts_codes)} 支股票 ...")
    for code in ts_codes:
        try:
            inc = inc_grp.get(code)
            bal = bal_grp.get(code)
            cf  = cf_grp.get(code)

            # 需要至少 2 年收益数据与至少 1 年现金流/资产负债
            if inc is None or inc.shape[0] < 2:
                continue
            if cf is None or cf.shape[0] < 1:
                continue
            if bal is None or bal.shape[0] < 1:
                continue

            # ----- 1) 收入 CAGR（取第一年与最后一年） -----
            rev = inc['revenue'].dropna().astype(float).values
            if rev.size < 2 or rev[0] <= 0:
                continue
            years_count = inc.shape[0]
            first_rev = float_safe(rev[0])
            last_rev = float_safe(rev[-1])
            if first_rev <= 0:
                continue
            CAGR = (last_rev / first_rev) ** (1.0 / (years_count - 1)) - 1.0
            if math.isnan(CAGR) or CAGR < CAGR_MIN:
                continue

            # ----- 2) 净利润连续增长年数（使用归母净利润 n_income_attr_p 优先） -----
            profit_col = 'n_income_attr_p' if 'n_income_attr_p' in inc.columns else ('n_income' if 'n_income' in inc.columns else None)
            if profit_col is None:
                continue
            net_series = inc[profit_col].dropna().astype(float).values
            if len(net_series) < 2:
                continue
            profit_growth_years = int((net_series[1:] - net_series[:-1] > 0).sum())
            if profit_growth_years < PROFIT_YEARS_MIN:
                continue

            # ----- 3) 自由现金流（OCF - CapEx） 与 FCF margin -----
            # 经常用字段：n_cashflow_act (经营活动净额)，c_pay_acq_const_fiolta (购建固定资产支付现金)
            if 'n_cashflow_act' in cf.columns:
                ocf = cf['n_cashflow_act'].dropna().astype(float).values
            else:
                ocf = None
            if ocf is None or len(ocf) == 0:
                continue
            capex = cf['c_pay_acq_const_fiolta'].dropna().astype(float).values if 'c_pay_acq_const_fiolta' in cf.columns else None

            # 对齐最近 n 年（以 rev 的长度为基准做 best-effort 对齐）
            n = min(len(ocf), len(rev))
            ocf_recent = ocf[-n:]
            rev_recent = rev[-n:].astype(float)
            if capex is not None and len(capex) >= n:
                capex_recent = capex[-n:].astype(float)
            else:
                # 如果确实没有 capex，尝试使用 cash 的 free_cashflow 字段或假设 capex=0（保守性差）
                if 'free_cashflow' in cf.columns:
                    fcf_vals = cf['free_cashflow'].dropna().astype(float).values
                    if len(fcf_vals) >= n:
                        fcf_series = fcf_vals[-n:]
                    else:
                        # 退回到 ocf - 0
                        capex_recent = [0.0] * n
                        fcf_series = ocf_recent - capex_recent
                else:
                    capex_recent = [0.0] * n
                    fcf_series = ocf_recent - capex_recent

            if 'fcf_series' not in locals():
                fcf_series = ocf_recent - capex_recent

            # 要求最近一年 FCF 正且平均 FCF margin >= threshold
            if len(fcf_series) == 0 or math.isnan(fcf_series[-1]) or fcf_series[-1] <= 0:
                continue
            fcf_margin = (fcf_series / rev_recent).mean()
            if pd.isna(fcf_margin) or fcf_margin < FCF_MARGIN_MIN:
                continue

            # ----- 4) ROE 平均（用归母净利润 / (total_assets - total_liab) as proxy） -----
            # 取 balance 的 total_assets & total_liab，取最近 k 年的平均ROE
            # balance record 对应的 end_date 可能与 inc 的 end_date 不完全对齐，采用 best-effort 计算：
            bal_assets = bal['total_assets'].dropna().astype(float).values
            bal_liab   = bal['total_liab'].dropna().astype(float).values
            # 如果存在 fina_indicator 的 ROE 列则更好（fina_indicator not used here to keep compatibility）
            # compute proxy ROE series align by available counts
            m = min(len(net_series), len(bal_assets))
            if m <= 0:
                continue
            # take last m pairs
            roe_vals = []
            for i_idx in range(-m, 0):
                ta = float_safe(bal_assets[i_idx])
                tl = float_safe(bal_liab[i_idx]) if len(bal_liab) >= m else 0.0
                denom = max(1e-9, ta - tl)
                netp = float_safe(net_series[i_idx])
                roe_vals.append(netp / denom if denom else float('nan'))
            # average ROE (as decimal)
            if len(roe_vals) == 0:
                continue
            roe_mean = pd.Series(roe_vals).dropna().mean()
            if pd.isna(roe_mean) or roe_mean < ROE_MIN:
                continue

            # ----- 5) 净负债 / EBITDA -----
            # net_debt = total_liab - money_cap(if available) ; ebitda try from income or cash (income 'ebitda' or income 'ebit' fallback)
            bal_last = bal.iloc[-1]
            total_liab_last = float_safe(bal_last.get('total_liab', float('nan')))
            cash_candidates = ['money_cap', 'c_cash_equ_end_period', 'cash_equ_end']  # various names
            cash_val = float('nan')
            for c in ['money_cap', 'c_cash_equ_end_period', 'c_cash_equ_end_period', 'total_cur_assets']:
                if c in bal.columns:
                    try:
                        cash_val = float_safe(bal_last.get(c))
                        break
                    except Exception:
                        cash_val = float('nan')
            if pd.isna(cash_val):
                cash_val = 0.0  # conservative fallback

            net_debt = total_liab_last - cash_val if not math.isnan(total_liab_last) else float('nan')

            # try get ebitda mean (from income['ebitda'] or income['ebitda'] present)
            ebitda_vals = []
            if 'ebitda' in inc.columns:
                ebitda_vals = inc['ebitda'].dropna().astype(float).values
            # fallback to income ebit if ebitda not available
            if (len(ebitda_vals) == 0) and ('ebit' in inc.columns):
                ebitda_vals = inc['ebit'].dropna().astype(float).values
            ebitda_mean = float('nan')
            if len(ebitda_vals) > 0:
                ebitda_mean = float(pd.Series(ebitda_vals).mean())

            ndebt_ebitda = float('nan')
            if (not math.isnan(net_debt)) and (not math.isnan(ebitda_mean)) and ebitda_mean > 0:
                ndebt_ebitda = net_debt / ebitda_mean
                if ndebt_ebitda > NETDEBT_EBITDA_MAX:
                    continue
            # 如果没有 ebitda 则不强制排除

            # ----- 6) Piotroski F-score（尽可能计算可得项并 scale 到 9） -----
            piot_score = 0
            piot_total = 0

            # 1) Positive ROA (we have roe_vals last)
            try:
                cur_roa = roe_vals[-1]
                if not math.isnan(cur_roa):
                    piot_total += 1
                    if cur_roa > 0:
                        piot_score += 1
            except Exception:
                pass

            # 2) Positive operating cash flow (n_cashflow_act last)
            try:
                ocf_vals = cf['n_cashflow_act'].dropna().astype(float).values
                cur_ocf = ocf_vals[-1] if len(ocf_vals) > 0 else float('nan')
                if not math.isnan(cur_ocf):
                    piot_total += 1
                    if cur_ocf > 0:
                        piot_score += 1
            except Exception:
                cur_ocf = float('nan')

            # 3) Accruals: OCF > Net profit (last)
            try:
                last_net = net_series[-1]
                if not math.isnan(cur_ocf) and not math.isnan(last_net):
                    piot_total += 1
                    if cur_ocf > last_net:
                        piot_score += 1
            except Exception:
                pass

            # 4) Leverage decreased (total_liab last < prev)
            try:
                if bal.shape[0] >= 2:
                    prev_liab = float_safe(bal['total_liab'].iloc[-2])
                    cur_liab = float_safe(bal['total_liab'].iloc[-1])
                    piot_total += 1
                    if cur_liab < prev_liab:
                        piot_score += 1
            except Exception:
                pass

            # 5) Current ratio improved (total_cur_assets / total_cur_liab)
            try:
                if ('total_cur_assets' in bal.columns) and ('total_cur_liab' in bal.columns) and bal.shape[0] >= 2:
                    prev_cur = float_safe(bal['total_cur_assets'].iloc[-2]) / max(1e-9, float_safe(bal['total_cur_liab'].iloc[-2]))
                    cur_cur = float_safe(bal['total_cur_assets'].iloc[-1]) / max(1e-9, float_safe(bal['total_cur_liab'].iloc[-1]))
                    piot_total += 1
                    if cur_cur > prev_cur:
                        piot_score += 1
            except Exception:
                pass

            # 6) No new shares issued - try total_share
            try:
                if 'total_share' in bal.columns and bal.shape[0] >= 2:
                    prev_share = float_safe(bal['total_share'].iloc[-2])
                    cur_share = float_safe(bal['total_share'].iloc[-1])
                    piot_total += 1
                    if cur_share <= prev_share:
                        piot_score += 1
            except Exception:
                pass

            # 7) Gross margin improved - try revenue grossprofit / revenue (if grossprofit exists)
            try:
                # income may contain grossprofit
                if 'grossprofit' in inc.columns and inc.shape[0] >= 2:
                    prev_gm = float_safe(inc['grossprofit'].iloc[-2]) / max(1e-9, float_safe(inc['revenue'].iloc[-2]))
                    cur_gm  = float_safe(inc['grossprofit'].iloc[-1]) / max(1e-9, float_safe(inc['revenue'].iloc[-1]))
                    piot_total += 1
                    if cur_gm > prev_gm:
                        piot_score += 1
            except Exception:
                pass

            # 8) Asset turnover improved (revenue / total_assets)
            try:
                if inc.shape[0] >= 2 and bal.shape[0] >= 2:
                    prev_turn = float_safe(inc['revenue'].iloc[-2]) / max(1e-9, float_safe(bal['total_assets'].iloc[-2]))
                    cur_turn  = float_safe(inc['revenue'].iloc[-1]) / max(1e-9, float_safe(bal['total_assets'].iloc[-1]))
                    piot_total += 1
                    if cur_turn > prev_turn:
                        piot_score += 1
            except Exception:
                pass

            # 9) (Optional) - earnings quality, skip if not available

            piot_score_scaled = (piot_score / max(1, piot_total)) * 9 if piot_total > 0 else 0
            if piot_score_scaled < PITO_F_MIN:
                continue

            # ----- 结果保存 -----
            name = stocks.loc[stocks['ts_code'] == code, 'name'].values[0] if code in stocks['ts_code'].values else code
            selected.append({
                'ts_code': code,
                'name': name,
                'CAGR': CAGR,
                'ProfitGrowthYears': int(profit_growth_years),
                'FCF_last': float_safe(fcf_series[-1]) if len(fcf_series)>0 else float('nan'),
                'FCF_Margin': float_safe(fcf_margin),
                'ROE_mean': float_safe(roe_mean),
                'NetDebt': float_safe(net_debt),
                'EBITDA_mean': float_safe(ebitda_mean),
                'NetDebt_EBITDA': float_safe(ndebt_ebitda),
                'Piotroski_scaled': float_safe(piot_score_scaled)
            })

        except Exception as e:
            failed.append((code, str(e)))
            continue

    # 输出
    df_sel = pd.DataFrame(selected)
    out_path = DATA_DIR / "selected_value_stocks.csv"
    if df_sel.empty:
        print("[筛选器] 无股票通过筛选。")
    else:
        sort_by = 'ROE_mean' if 'ROE_mean' in df_sel.columns else df_sel.columns[0]
        df_sel.sort_values(by=sort_by, ascending=False, inplace=True)
        df_sel.to_csv(out_path, index=False, encoding='utf-8-sig')
        print(f"[筛选器] 已选出 {len(df_sel)} 支股票，已保存到 {out_path}")
        print(df_sel.head(50).to_string(index=False))

    if failed:
        fail_path = DATA_DIR / "selector_failed_log.csv"
        pd.DataFrame(failed, columns=['ts_code','error']).to_csv(fail_path, index=False, encoding='utf-8-sig')
        print(f"[筛选器] 部分处理失败，已记录到 {fail_path}（失败数={len(failed)}）")

    return df_sel

# ========== 主流程 ==========
if __name__ == "__main__":
    print("=== 开始：确认股票名单 ===")
    fetch_stock_list()

    print("=== 更新财务表（增量） ===")
    update_all_financials()

    print("=== 运行筛选器 ===")
    df_selected = run_selector()
    print("=== 完成 ===")


=== 开始：确认股票名单 ===
[股票名单] 已加载缓存，共 5247 行
=== 更新财务表（增量） ===
[income.csv] 已发现已有文件，增量开始日期 = 20250812
[income.csv] 已是最新，无需更新（start_date > 今天）
[balancesheet.csv] 已发现已有文件，增量开始日期 = 20250812
[balancesheet.csv] 已是最新，无需更新（start_date > 今天）
[cashflow.csv] 已发现已有文件，增量开始日期 = 20250812
[cashflow.csv] 已是最新，无需更新（start_date > 今天）
=== 运行筛选器 ===
[筛选器] 开始处理 5247 支股票 ...
[筛选器] 无股票通过筛选。
=== 完成 ===


## 更新迭代后的价投选股逻辑

In [18]:
"""
改进版：年度口径 + 两层筛选 + 中间诊断输出（中文）
说明：
- 请先保证已经有三张表的 CSV：tushare_data/income.csv, balancesheet.csv, cashflow.csv
  （如果没有，运行你已有的增量下载脚本先生成）
- 替换下面的 TUSHARE_TOKEN 如需使用 API 下载（本脚本以本地 CSV 为主）
"""

import math
from pathlib import Path
from datetime import datetime
import pandas as pd

# ========== 配置（可调）==========
TUSHARE_TOKEN = "YOUR_TUSHARE_TOKEN_IF_NEEDED"
DATA_DIR = Path("tushare_data")
DATA_DIR.mkdir(exist_ok=True)

# 初筛（宽松，可调）
INIT_CAGR_MIN = 0.03       # 营收 CAGR >= 3%
INIT_ROE_MIN = 0.06        # 平均 ROE >= 6%
INIT_OCF_POS = True        # 最近一年经营现金流为正
INIT_PROFIT_YEARS_MIN = 1  # 最近净利同比增长年份>=1
INIT_REQUIRE_ALL = False   # 初筛是否要求必须满足所有条件（False 表示满足任何主条件且 OCF>0）

# 精筛（严格，可调）
REF_ROE_MIN = 0.025        # 平均 ROE >= 2.5%
REF_PROFIT_YEARS_MIN = 1   # 最近净利同比增长年份>=2
REF_FCF_MARGIN_MIN = 0.01   # 最近5年自由现金流/营收 >= 1%
REF_PIO_MIN = 3            # Piotroski（scaled） >=3
REF_NETDEBT_EBITDA_MAX = 2.0 # 净负债/EBITDA <= 2.0

# 用年数据的最近N年
YEARS_FOR_METRICS = 5

# ========== 工具函数 ===========
def float_safe(x):
    try:
        return float(x)
    except Exception:
        return float('nan')

def to_datetime_safe(s):
    try:
        return pd.to_datetime(s, errors='coerce')
    except Exception:
        return pd.NaT

# ========== 读取本地 CSV（假设已存在） ===========
income_path = DATA_DIR / "income.csv"
balance_path = DATA_DIR / "balancesheet.csv"
cashflow_path = DATA_DIR / "cashflow.csv"
stock_list_path = DATA_DIR / "stock_list.csv"

for p in [income_path, balance_path, cashflow_path, stock_list_path]:
    if not p.exists():
        raise FileNotFoundError(f"缺少文件：{p}。请先用增量下载脚本生成或把相应 CSV 放到 {DATA_DIR}。")

print("正在加载数据（本地 CSV）...")
income = pd.read_csv(income_path, dtype=str)
balance = pd.read_csv(balance_path, dtype=str)
cash = pd.read_csv(cashflow_path, dtype=str)
stocks = pd.read_csv(stock_list_path, dtype=str)
print(f"股票名单：{len(stocks)} 支；income 行数：{len(income)}，balance 行数：{len(balance)}，cash 行数：{len(cash)}")

# 转换列为数值并规范 end_date
for df, cols in [
    (income, ['revenue','n_income','n_income_attr_p','ebit','ebitda','grossprofit']),
    (balance, ['total_assets','total_liab','money_cap','total_cur_assets','total_cur_liab','total_share']),
    (cash, ['n_cashflow_act','c_pay_acq_const_fiolta','free_cashflow'])
]:
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')
for df in [income, balance, cash]:
    if 'end_date' in df.columns:
        df['end_date'] = pd.to_datetime(df['end_date'], errors='coerce')

# 年度化：以 end_date 的 year 为键，选择每年最新一条（优先选择 12-31 报表）
def make_annual(df, value_cols):
    """把 df 年度化：返回 dict: ts_code -> DataFrame(index=year sorted asc)"""
    result = {}
    # ensure end_date exists
    if 'end_date' not in df.columns:
        return result
    df = df.copy()
    df['year'] = df['end_date'].dt.year
    # 为每 (ts_code, year) 选择最近一条（最后发布日期）
    grouped = df.groupby(['ts_code','year'], as_index=False)
    # take last record per ts_code-year
    last_per_year = grouped.apply(lambda g: g.sort_values('end_date').iloc[-1]).reset_index(drop=True)
    # split per ts_code
    for ts, g in last_per_year.groupby('ts_code'):
        sub = g.sort_values('year')
        # keep only columns we might use + year
        result[ts] = sub.reset_index(drop=True)
    return result

print("年度化数据（按年取每年最后一期）...")
inc_annual = make_annual(income, None)
bal_annual = make_annual(balance, None)
cf_annual  = make_annual(cash, None)

# 诊断：有多少公司至少有 2 年的年度收入记录
count_2yr = sum(1 for k,v in inc_annual.items() if v['year'].nunique() >= 2)
print(f"年度收入记录 >=2 年 的公司数：{count_2yr}")

# ========== 计算函数 ===========
def calc_cagr_from_array(arr):
    """arr: array of revenue sorted asc; return CAGR between first and last if periods>0"""
    if arr is None or len(arr) < 2:
        return float('nan')
    first = float_safe(arr[0])
    last  = float_safe(arr[-1])
    n_periods = len(arr) - 1
    if first <= 0 or n_periods <= 0:
        return float('nan')
    return (last / first) ** (1.0 / n_periods) - 1.0

def calc_profit_growth_years(net_arr):
    if net_arr is None or len(net_arr) < 2:
        return 0
    diffs = [net_arr[i+1] - net_arr[i] for i in range(len(net_arr)-1)]
    return sum(1 for d in diffs if not pd.isna(d) and d > 0)

def calc_roe_series(net_arr, assets_arr, liab_arr):
    # net_arr, assets_arr, liab_arr aligned by year, return list of roe decimals
    roes = []
    L = min(len(net_arr), len(assets_arr))
    for i in range(L):
        net = float_safe(net_arr[i])
        ta  = float_safe(assets_arr[i])
        tl  = float_safe(liab_arr[i]) if len(liab_arr) >= L else 0.0
        eq = max(1e-9, ta - tl)
        roes.append(net / eq if eq != 0 else float('nan'))
    return roes

def calc_fcf_series(ocf_arr, capex_arr):
    if ocf_arr is None or len(ocf_arr) == 0:
        return []
    # align lengths
    L = min(len(ocf_arr), len(capex_arr)) if capex_arr is not None else len(ocf_arr)
    if L == 0:
        return []
    cap = capex_arr[-L:] if capex_arr is not None else [0.0]*L
    ocf_last = ocf_arr[-L:]
    fcf = [float_safe(ocf_last[i]) - float_safe(cap[i]) for i in range(L)]
    return fcf

# ========== 两层筛选（含诊断统计）==========
initial = []
refined = []
failed = []

# 统计器：中间各条件通过数
stats = {
    'has_annual_2yr': 0,
    'cagr_ok': 0,
    'ocf_positive_last': 0,
    'roe_ok': 0,
    'profit_growth_ok': 0,
    'fcf_last_pos': 0,
    'passed_initial': 0,
    'passed_refined': 0
}

ts_codes = stocks['ts_code'].tolist()
print(f"开始对 {len(ts_codes)} 支股票做年度口径的两层筛选...")

for code in ts_codes:
    try:
        inc = inc_annual.get(code)
        bal = bal_annual.get(code)
        cf  = cf_annual.get(code)
        # 基本年数据要求：至少 2 年收入数据（年度）
        if (inc is None) or (inc.shape[0] < 2):
            continue
        stats['has_annual_2yr'] += 1

        # revenue series（按年升序）
        rev_arr = inc['revenue'].dropna().astype(float).values.tolist()
        if len(rev_arr) < 2:
            continue
        cagr = calc_cagr_from_array(rev_arr)

        # profit series (优先归母 n_income_attr_p)
        profit_col = 'n_income_attr_p' if 'n_income_attr_p' in inc.columns else ('n_income' if 'n_income' in inc.columns else None)
        if profit_col is None:
            net_arr = []
        else:
            net_arr = inc[profit_col].dropna().astype(float).values.tolist()

        # OCF 和 CAPEX 年度
        ocf_arr = cf['n_cashflow_act'].dropna().astype(float).values.tolist() if (cf is not None and 'n_cashflow_act' in cf.columns) else []
        capex_arr = cf['c_pay_acq_const_fiolta'].dropna().astype(float).values.tolist() if (cf is not None and 'c_pay_acq_const_fiolta' in cf.columns) else None
        fcf_arr = calc_fcf_series(ocf_arr, capex_arr)

        # ROE series using balance annual
        if bal is not None and 'total_assets' in bal.columns and 'total_liab' in bal.columns:
            assets_arr = bal['total_assets'].dropna().astype(float).values.tolist()
            liab_arr   = bal['total_liab'].dropna().astype(float).values.tolist()
            roe_vals = calc_roe_series(net_arr, assets_arr, liab_arr) if len(net_arr)>0 and len(assets_arr)>0 else []
            roe_mean = float(pd.Series(roe_vals).dropna().mean()) if len(roe_vals)>0 else float('nan')
        else:
            roe_vals = []; roe_mean = float('nan')

        # profit growth years
        profit_growth_years = calc_profit_growth_years(net_arr)

        # 判断各条件（初筛判定逻辑：OCF>0 必须，同时满足以下任意一项：CAGR/ROE/profit growth）
        cond_cagr = (not math.isnan(cagr)) and (cagr >= INIT_CAGR_MIN)
        if cond_cagr: stats['cagr_ok'] += 1
        cond_ocf = (len(ocf_arr)>0 and (not math.isnan(ocf_arr[-1])) and (ocf_arr[-1] > 0))
        if cond_ocf: stats['ocf_positive_last'] += 1
        cond_roe = (not math.isnan(roe_mean)) and (roe_mean >= INIT_ROE_MIN)
        if cond_roe: stats['roe_ok'] += 1
        cond_profit_years = profit_growth_years >= INIT_PROFIT_YEARS_MIN
        if cond_profit_years: stats['profit_growth_ok'] += 1
        cond_fcf_last_pos = (len(fcf_arr)>0 and (not math.isnan(fcf_arr[-1])) and fcf_arr[-1] > 0)
        if cond_fcf_last_pos: stats['fcf_last_pos'] += 1

        # 初筛判定
        if INIT_REQUIRE_ALL:
            passed_initial_flag = cond_cagr and cond_ocf and cond_roe and cond_profit_years and cond_fcf_last_pos
        else:
            # 宽松：必须 OCF>0 且（CAGR>=min 或 ROE>=min 或 profit_years>=min）
            passed_initial_flag = cond_ocf and (cond_cagr or cond_roe or cond_profit_years or cond_fcf_last_pos)

        if passed_initial_flag:
            stats['passed_initial'] += 1
            name = stocks.loc[stocks['ts_code']==code, 'name'].values[0] if code in stocks['ts_code'].values else code
            initial.append({
                'ts_code': code, 'name': name,
                'CAGR': cagr, 'ROE_mean': roe_mean,
                'OCF_last': ocf_arr[-1] if len(ocf_arr)>0 else float('nan'),
                'FCF_last': fcf_arr[-1] if len(fcf_arr)>0 else float('nan'),
                'ProfitGrowthYears': int(profit_growth_years)
            })
    except Exception as e:
        failed.append((code, str(e)))
        continue

# 初筛统计输出
print("====== 初筛统计（年度口径） ======")
for k,v in stats.items():
    print(f"{k} : {v}")
print(f"初筛候选总数：{len(initial)} (已保存到 {DATA_DIR/'initial_candidates.csv'})")
pd.DataFrame(initial).sort_values(by='ROE_mean', ascending=False).to_csv(DATA_DIR/'initial_candidates.csv', index=False, encoding='utf-8-sig')

# ===== 精筛：在初筛基础上做更严格判定 =====
if len(initial) == 0:
    print("初筛结果为空，建议放宽初筛阈值（例如将 INIT_CAGR_MIN 从 0.03 降到 0.01，或把 INIT_ROE_MIN 下降），或检查年度化的数据完整性。")
else:
    print("开始精筛（对初筛结果进行更严格判断）...")
    refined = []
    for rec in initial:
        code = rec['ts_code']
        try:
            inc = inc_annual.get(code)
            bal = bal_annual.get(code)
            cf  = cf_annual.get(code)
            # recompute metrics safely
            rev_arr = inc['revenue'].dropna().astype(float).values.tolist() if inc is not None else []
            profit_col = 'n_income_attr_p' if inc is not None and 'n_income_attr_p' in inc.columns else ('n_income' if inc is not None and 'n_income' in inc.columns else None)
            net_arr = inc[profit_col].dropna().astype(float).values.tolist() if profit_col is not None else []
            ocf_arr = cf['n_cashflow_act'].dropna().astype(float).values.tolist() if cf is not None and 'n_cashflow_act' in cf.columns else []
            capex_arr = cf['c_pay_acq_const_fiolta'].dropna().astype(float).values.tolist() if cf is not None and 'c_pay_acq_const_fiolta' in cf.columns else None
            fcf_arr = calc_fcf_series(ocf_arr, capex_arr)
            # roe
            if bal is not None and 'total_assets' in bal.columns:
                assets_arr = bal['total_assets'].dropna().astype(float).values.tolist()
                liab_arr   = bal['total_liab'].dropna().astype(float).values.tolist() if 'total_liab' in bal.columns else []
                roe_vals = calc_roe_series(net_arr, assets_arr, liab_arr) if len(net_arr)>0 and len(assets_arr)>0 else []
                roe_mean = float(pd.Series(roe_vals).dropna().mean()) if len(roe_vals)>0 else float('nan')
            else:
                roe_vals = []; roe_mean = float('nan')

            # profit growth years
            profit_growth_years = calc_profit_growth_years(net_arr)

            # fcf_margin (last YEARS_FOR_METRICS years)
            # align last YEARS_FOR_METRICS of fcf vs rev
            nf = min(len(fcf_arr), len(rev_arr), YEARS_FOR_METRICS)
            if nf > 0:
                fcf_margin = (sum(float_safe(fcf_arr[-nf+i]) for i in range(nf)) /
                              sum(float_safe(rev_arr[-nf+i]) for i in range(nf)) ) if sum(float_safe(rev_arr[-nf+i]) for i in range(nf)) != 0 else float('nan')
            else:
                fcf_margin = float('nan')

            # Piotroski 简版（按可得项 scale）
            p_score = 0; p_tot = 0
            # ROA positive
            if len(roe_vals)>0 and not math.isnan(roe_vals[-1]):
                p_tot += 1
                if roe_vals[-1] > 0: p_score += 1
            # OCF positive
            if len(ocf_arr)>0:
                p_tot += 1
                if ocf_arr[-1] > 0: p_score += 1
            # accruals OCF > net profit
            if len(ocf_arr)>0 and len(net_arr)>0 and not math.isnan(ocf_arr[-1]) and not math.isnan(net_arr[-1]):
                p_tot += 1
                if ocf_arr[-1] > net_arr[-1]: p_score += 1
            # leverage decreased
            try:
                if bal.shape[0] >= 2 and 'total_liab' in bal.columns:
                    p_tot += 1
                    if float_safe(bal['total_liab'].iloc[-1]) < float_safe(bal['total_liab'].iloc[-2]): p_score += 1
            except: pass
            # current ratio improved
            try:
                if 'total_cur_assets' in bal.columns and 'total_cur_liab' in bal.columns and bal.shape[0] >= 2:
                    p_tot += 1
                    prev_cur = float_safe(bal['total_cur_assets'].iloc[-2]) / max(1e-9, float_safe(bal['total_cur_liab'].iloc[-2]))
                    cur_cur  = float_safe(bal['total_cur_assets'].iloc[-1]) / max(1e-9, float_safe(bal['total_cur_liab'].iloc[-1]))
                    if cur_cur > prev_cur: p_score += 1
            except: pass
            # shares not increased
            try:
                if 'total_share' in bal.columns and bal.shape[0] >= 2:
                    p_tot += 1
                    if float_safe(bal['total_share'].iloc[-1]) <= float_safe(bal['total_share'].iloc[-2]): p_score += 1
            except: pass
            # gross margin improved
            try:
                if 'grossprofit' in inc.columns and inc.shape[0] >= 2:
                    p_tot += 1
                    prev_gm = float_safe(inc['grossprofit'].iloc[-2]) / max(1e-9, float_safe(inc['revenue'].iloc[-2]))
                    cur_gm  = float_safe(inc['grossprofit'].iloc[-1]) / max(1e-9, float_safe(inc['revenue'].iloc[-1]))
                    if cur_gm > prev_gm: p_score += 1
            except: pass
            # asset turnover improved
            try:
                if inc.shape[0] >= 2 and bal.shape[0] >= 2:
                    p_tot += 1
                    prev_turn = float_safe(inc['revenue'].iloc[-2]) / max(1e-9, float_safe(bal['total_assets'].iloc[-2]))
                    cur_turn  = float_safe(inc['revenue'].iloc[-1]) / max(1e-9, float_safe(bal['total_assets'].iloc[-1]))
                    if cur_turn > prev_turn: p_score += 1
            except: pass

            p_scaled = (p_score / max(1, p_tot)) * 9 if p_tot>0 else 0

            # net debt / ebitda
            try:
                bal_last = bal.iloc[-1]
                total_liab_last = float_safe(bal_last.get('total_liab', float('nan')))
                cash_val = float_safe(bal_last.get('money_cap', float('nan'))) if 'money_cap' in bal.columns else float('nan')
                if math.isnan(cash_val): cash_val = 0.0
                net_debt = total_liab_last - cash_val if not math.isnan(total_liab_last) else float('nan')
            except:
                net_debt = float('nan')
            ebitda_vals = inc['ebitda'].dropna().astype(float).values.tolist() if 'ebitda' in inc.columns else []
            if len(ebitda_vals)==0 and 'ebit' in inc.columns:
                ebitda_vals = inc['ebit'].dropna().astype(float).values.tolist()
            ebitda_mean = float(pd.Series(ebitda_vals).mean()) if len(ebitda_vals)>0 else float('nan')
            ndebt_ebitda = net_debt / ebitda_mean if (not math.isnan(net_debt) and not math.isnan(ebitda_mean) and ebitda_mean>0) else float('nan')

            # 精筛判定
            cond_ref = True
            if math.isnan(roe_mean) or roe_mean < REF_ROE_MIN: cond_ref = False
            if profit_growth_years < REF_PROFIT_YEARS_MIN: cond_ref = False
            if math.isnan(fcf_margin) or fcf_margin < REF_FCF_MARGIN_MIN: cond_ref = False
            if p_scaled < REF_PIO_MIN: cond_ref = False
            if not math.isnan(ndebt_ebitda) and ndebt_ebitda > REF_NETDEBT_EBITDA_MAX: cond_ref = False

            if cond_ref:
                refined.append({
                    'ts_code': code, 'name': stocks.loc[stocks['ts_code']==code, 'name'].values[0] if code in stocks['ts_code'].values else code,
                    'ROE_mean': roe_mean, 'ProfitGrowthYears': profit_growth_years, 'FCF_margin': fcf_margin,
                    'Piotroski_scaled': p_scaled, 'NetDebt_EBITDA': ndebt_ebitda
                })
        except Exception as e:
            failed.append((code, str(e)))
            continue

    # 保存
    if len(refined) > 0:
        pd.DataFrame(refined).sort_values(by='ROE_mean', ascending=False).to_csv(DATA_DIR/'refined_candidates.csv', index=False, encoding='utf-8-sig')
        print(f"精筛完成：初筛候选 {len(initial)}，精筛通过 {len(refined)}。结果已保存到 {DATA_DIR/'refined_candidates.csv'}")
    else:
        empty_df = pd.DataFrame(columns=['ts_code', 'name', 'ROE_mean', 'ProfitGrowthYears', 
                                         'FCF_margin', 'Piotroski_scaled', 'NetDebt_EBITDA'])
        empty_df.to_csv(DATA_DIR/'refined_candidates.csv', index=False, encoding='utf-8-sig')
        print(f"精筛完成：初筛候选 {len(initial)}，精筛通过 0。空结果文件已创建")

# 失败日志
if failed:
    pd.DataFrame(failed, columns=['ts_code','error']).to_csv(DATA_DIR/"selector_failed_log.csv", index=False, encoding='utf-8-sig')
    print(f"部分处理失败（详情见 selector_failed_log.csv），失败数：{len(failed)}")

print("筛选流程结束。")


正在加载数据（本地 CSV）...
股票名单：5247 支；income 行数：198467，balance 行数：193262，cash 行数：204045
年度化数据（按年取每年最后一期）...
年度收入记录 >=2 年 的公司数：5247
开始对 5247 支股票做年度口径的两层筛选...
has_annual_2yr : 5247
cagr_ok : 616
ocf_positive_last : 2674
roe_ok : 0
profit_growth_ok : 0
fcf_last_pos : 2674
passed_initial : 2674
passed_refined : 0
初筛候选总数：2674 (已保存到 tushare_data\initial_candidates.csv)
开始精筛（对初筛结果进行更严格判断）...
精筛完成：初筛候选 2674，精筛通过 0。空结果文件已创建
筛选流程结束。


In [3]:
import pandas as pd
import os

input_file = r"D:\Python_Quantity\掘金计划\币圈策略\beta\data\pickle_data\swap\1INCH-USDT.pkl"
output_file = os.path.splitext(input_file)[0] + ".csv"  # 替换扩展名为csv

# 尝试不同方式读取
df = None
try:
    df = pd.read_pickle(input_file)
    print("用 pickle 方式读取成功")
except Exception as e1:
    print("pickle 读取失败:", e1)
    try:
        df = pd.read_parquet(input_file)
        print("用 parquet 方式读取成功")
    except Exception as e2:
        print("parquet 读取失败:", e2)
        try:
            df = pd.read_feather(input_file)
            print("用 feather 方式读取成功")
        except Exception as e3:
            print("feather 读取失败:", e3)
            try:
                df = pd.read_csv(input_file)
                print("用 csv 方式读取成功")
            except Exception as e4:
                print("csv 读取失败:", e4)

if df is not None:
    df.to_csv(output_file, index=False, encoding="utf-8-sig")
    print(f"转换完成，CSV 文件已保存到: {output_file}")
else:
    print("❌ 无法识别该文件格式，请确认 .pkl 实际存储格式")


pickle 读取失败: invalid load key, 'A'.
parquet 读取失败: Could not open Parquet input source '<Buffer>': Parquet magic bytes not found in footer. Either the file is corrupted or this is not a parquet file.
用 feather 方式读取成功
转换完成，CSV 文件已保存到: D:\Python_Quantity\掘金计划\币圈策略\beta\data\pickle_data\swap\1INCH-USDT.csv
