In [None]:
# Copyright (c) 2025 Yuyang Yao
# SPDX-License-Identifier: PolyForm-Noncommercial-1.0.0

### 高频数据处理函数

In [1]:
# Parameter settings
import os
import pandas as pd

# Spot index code -> ticker of the matching index future.
CODE_MAP = {
    '000852.SH': 'IM',  # CSI 1000 index -> IM futures
    '000905.SH': 'IC',  # CSI 500 index -> IC futures
    '000300.SH': 'IF',  # CSI 300 index -> IF futures
    '000016.SH': 'IH',  # SSE 50 index -> IH futures
}

# Project source directory. The default is the original author's machine;
# set DAILY_FUTURES_ARB_PROJECT_PATH to run the notebook from another checkout.
PROJECT_PATH = os.environ.get(
    "DAILY_FUTURES_ARB_PROJECT_PATH",
    "/Users/yaoyuyang/Desktop/Code/daily_futures_arbitrary_strategy/src",
)
# Data lives in a `data` directory that is a sibling of PROJECT_PATH.
DATA_PATH = os.path.abspath(os.path.join(PROJECT_PATH, "..", 'data'))
RAW_DATA_PATH = os.path.join(DATA_PATH, 'raw')

# Spot index code -> CSV file name inside RAW_DATA_PATH.
FILES = {
    '000852.SH': '000852.csv',
    '000905.SH': '000905.csv',
    '000300.SH': '000300.csv',
    '000016.SH': '000016.csv'
}
CAL_TYPE = "udy"
# (long leg, short leg) ticker pairs: every unordered combination of the four futures.
PAIRS = [('IM', 'IC'), ('IM', 'IF'), ('IM', 'IH'),
         ('IC', 'IF'), ('IC', 'IH'),
         ('IF', 'IH')]

In [7]:
# Daily rebalancing: load the minute bars of every index, align them on a
# common timestamp and attach each day's previous-day close.
LEVERAGE = 2.0
NV0 = 1.0

frames = []
for code, fname in FILES.items():
    path = os.path.join(RAW_DATA_PATH, fname)
    df = pd.read_csv(path)

    # Both columns are required; fail loudly if the file lacks either one.
    missing = {'time', 'close'}.difference(df.columns)
    if missing:
        raise ValueError(f"{fname}缺少列：{missing}")

    ticker = CODE_MAP[code]
    # Parse timestamps, order chronologically and keep only time + close,
    # with the close renamed to p_<ticker>.
    df = (
        df.assign(time=pd.to_datetime(df["time"]))
          .sort_values('time')
          .loc[:, ['time', 'close']]
          .rename(columns={'close': f'p_{ticker}'})
    )
    frames.append(df)

# Keep only timestamps present in every index file.
merged = frames[0]
for d in frames[1:]:
    merged = pd.merge(merged, d, on='time', how='inner')
merged['date'] = merged['time'].dt.normalize()

price_cols = [c for c in merged.columns if c.startswith('p_')]
# Last bar of each date = that day's close; shifting by one row gives the
# previous available date's close (NaN on the first date).
daily_close = merged.groupby('date')[price_cols].last()
prev_close_by_date = daily_close.shift(1).rename(columns=lambda c: c.replace('p_', 'pc_'))

# Broadcast the previous close onto every minute row of the same date.
merged = merged.merge(prev_close_by_date.reset_index(), on='date', how='left')

In [13]:
# Build a daily open/high/low/close table of each pair's net value.
# Every pair starts at NV0 and is compounded from the previous day's close NV.
nv_prev = dict.fromkeys((f'{a}{b}' for a, b in PAIRS), NV0)
rows = []

for date, g in merged.groupby('date', sort=True):
    row = {'date': date}
    for long_t, short_t in PAIRS:
        pair = f'{long_t}{short_t}'
        base_nv = nv_prev[pair]

        # Intraday minute prices of both legs.
        long_px = g[f'p_{long_t}']
        short_px = g[f'p_{short_t}']
        # Previous-day closes; identical on every row of the day, so row 0 suffices.
        long_ref = g[f'pc_{long_t}'].iloc[0]
        short_ref = g[f'pc_{short_t}'].iloc[0]

        if pd.notna(long_ref) and pd.notna(short_ref):
            # Long one leg / short the other, each sized at LEVERAGE / 2 of the NV.
            spread_ret = (LEVERAGE / 2.0) * (long_px / long_ref - short_px / short_ref)
            nv_path = base_nv * (1 + spread_ret)

            o = float(nv_path.iloc[0])
            h = float(nv_path.max())
            l = float(nv_path.min())
            c = float(nv_path.iloc[-1])

            # Today's closing NV is tomorrow's starting NV.
            nv_prev[pair] = c
        else:
            # No previous close (first date): NV stays flat at its prior value.
            o = h = l = c = base_nv

        # Output columns: <PAIR>_open / _high / _low / _close
        row.update({
            f'{pair}_open': o,
            f'{pair}_high': h,
            f'{pair}_low': l,
            f'{pair}_close': c,
        })

    rows.append(row)

out = pd.DataFrame(rows).sort_values('date')

In [14]:
# Write the daily pair-NV OHLC table into the raw-data directory.
nv_csv_path = os.path.join(RAW_DATA_PATH, 'index_nv_data.csv')
out.to_csv(nv_csv_path, index=False)

### 因子筛选和参数选择函数

In [17]:
import json
import math
import ast
import re
import numpy as np
import pandas as pd
from typing import Dict, Any, List
from config import AVAILABLE_PAIRS, FACTOR_SELECTION_PATH

# ---- Tunable thresholds ----
# Lower bounds (and a drawdown floor) a row must satisfy to be a positive candidate.
POS_THRESHOLDS = dict(
    sharpe_min=0.50,
    sortino_min=0.30,
    calmar_min=0.10,
    pf_min=1.10,
    mdd_cap=-40.0,
)
# Bounds a row must satisfy to be a flip (trade-the-opposite) candidate.
NEG_THRESHOLDS = dict(
    sharpe_max=-0.60,
    pf_max=0.90,
    win_rate_max01=0.40,   # compared against the win rate as a 0-1 fraction
    trade_count_min=100,
)
# A flipped candidate must out-score the best positive one by more than this.
FLIP_MARGIN = 0.05
# Upper clip applied to 1 / profit_factor when scoring flip candidates.
PF_CAP_FOR_INVERSION = 3.0

# Column-name aliases (matched case-insensitively); the first alias found wins.
ALIASES = dict(
    name=["name"],
    func=["func", "factor", "indicator", "signal", "signal_name"],
    sharpe=["sharpe", "sharpe_ratio", "sharp", "sharp_ratio"],
    sortino=["sortino", "sortino_ratio"],
    calmar=["calmar", "calmar_ratio"],
    pf=["profit_factor", "pf", "profitfactor"],
    win_rate=["win_rate", "winrate", "win_rate_pct", "win%"],
    ann_ret=["annual_return_pct", "annual_return", "ann_return_pct", "annualized_return_pct"],
    mdd=["max_drawdown_pct", "max_drawdown", "mdd_pct"],
    trade_cnt=["trade_count", "trades", "num_trades", "n_trades"],
    params=["params", "param_json", "param", "config"],
)

def _find_col(df: pd.DataFrame, keys: List[str]) -> str:
    cols = {c.lower(): c for c in df.columns}
    for k in keys:
        lk = k.lower()
        if lk in cols:
            return cols[lk]
    return ""

def _get_series(df: pd.DataFrame, alias_key: str) -> pd.Series:
    """Return the column of ``df`` registered under ``ALIASES[alias_key]``.

    When none of the aliases is present, an all-NaN series sharing ``df``'s
    index is returned instead, so callers can treat the metric as missing.
    """
    matched = _find_col(df, ALIASES[alias_key])
    if not matched:
        return pd.Series([np.nan] * len(df), index=df.index)
    return df[matched]

def _normalize_winrate(sr: pd.Series) -> pd.Series:
    if sr.dropna().max() is not None and sr.dropna().max() > 1.5:
        return sr / 100.0
    return sr

def _rank_within_group(s: pd.Series) -> pd.Series:
    if s.nunique() <= 1:
        return pd.Series([0.5]*len(s), index=s.index)
    return s.rank(pct=True, method="average")

_func_pat = re.compile(r"<function\s+([A-Za-z0-9_]+)\s+at\s+0x[0-9A-Fa-f]+>")

def _canon_func_name(s: str) -> str:
    if not isinstance(s, str):
        s = str(s)
    m = _func_pat.fullmatch(s.strip())
    return m.group(1) if m else s.strip()

def _parse_params_cell(val):
    # 直接读取 params 列：尽量解析成 dict，失败则原样返回
    if isinstance(val, dict):
        return val
    if isinstance(val, (list, tuple)):
        return list(val)
    if pd.isna(val):
        return {}
    s = str(val).strip()
    # 尝试 JSON
    try:
        obj = json.loads(s)
        return obj
    except Exception:
        pass
    # 尝试 Python 字面量
    try:
        obj = ast.literal_eval(s)
        return obj
    except Exception:
        pass
    # 都不行，原样给回字符串
    return s

def select_params_from_file(path: str, verbose: bool = True) -> Dict[str, Any]:
    """Select one parameter set and trade direction per factor from a results CSV.

    Stages:
      0. Load the CSV; metric columns are resolved through ``ALIASES``.
      1. Cleaning: drop rows whose annual return or max drawdown is missing or
         zero, and rows whose win rate (as a 0-1 fraction) is >= 0.98.
      2. Positive candidates: rows meeting every bound in ``POS_THRESHOLDS``.
      3. Flip candidates: rows meeting every bound in ``NEG_THRESHOLDS`` with a
         negative annual return (to be traded in the opposite direction).
      4. Per group (the ``name`` column, else the first matching ``func`` alias,
         else one synthetic group) candidates are scored by a weighted sum of
         within-group percentile ranks. The best flip candidate is chosen only
         if its score exceeds the best positive score by more than
         ``FLIP_MARGIN``; otherwise the best positive candidate is chosen.
         Groups with no candidate are omitted.

    Args:
        path: Path of the CSV file to read.
        verbose: When True, print per-stage counts and the final result.

    Returns:
        ``{group_key: {"func", "params", "direction", "metrics"}}`` where
        ``direction`` is 1 (as is) or -1 (flipped) and ``metrics["win_rate"]``
        is expressed in percent.
    """
    # --- Load ---
    df = pd.read_csv(path)
    if verbose:
        print(f"[Stage 0] Loaded rows: {len(df)} ; cols: {list(df.columns)}")
        # Cleaning drops rows with a missing annual return / drawdown, so a file
        # lacking either column silently yields an empty result; say so up front.
        absent = [k for k in ("ann_ret", "mdd") if not _find_col(df, ALIASES[k])]
        if absent:
            print(f"[Stage 0] WARNING: no column matched {absent}; cleaning will drop every row")

    # Metric series (all-NaN when the column is absent)
    s_sharpe  = _get_series(df, "sharpe").astype(float)
    s_sortino = _get_series(df, "sortino").astype(float)
    s_calmar  = _get_series(df, "calmar").astype(float)
    s_pf      = _get_series(df, "pf").astype(float)
    # Win rate as a 0-1 fraction, normalised once over the whole column
    s_wr01    = _normalize_winrate(_get_series(df, "win_rate").astype(float))
    s_annret  = _get_series(df, "ann_ret").astype(float)
    s_mdd     = _get_series(df, "mdd").astype(float)
    s_trades  = _get_series(df, "trade_cnt")

    # --- Clean ---
    mask_valid = ~((s_annret.fillna(0) == 0) | (s_mdd.fillna(0) == 0))
    mask_valid &= ~(s_wr01 >= 0.98)

    before = len(df)
    df = df[mask_valid].copy()
    if verbose:
        print(f"[Stage 1] After cleaning kept: {len(df)} (removed {before-len(df)})")

    # Re-align the metric series with the surviving rows
    kept = df.index
    s_sharpe  = s_sharpe.loc[kept]
    s_sortino = s_sortino.loc[kept]
    s_calmar  = s_calmar.loc[kept]
    s_pf      = s_pf.loc[kept]
    s_wr01    = s_wr01.loc[kept]
    s_annret  = s_annret.loc[kept]
    s_mdd     = s_mdd.loc[kept]
    s_trades  = s_trades.loc[kept]

    # ---- Positive candidates ----
    pos_mask = (
        (s_sharpe  >= POS_THRESHOLDS["sharpe_min"]) &
        (s_sortino >= POS_THRESHOLDS["sortino_min"]) &
        (s_calmar  >= POS_THRESHOLDS["calmar_min"]) &
        (s_pf      >= POS_THRESHOLDS["pf_min"])
    )
    if not s_mdd.isna().all():
        pos_mask &= (s_mdd >= POS_THRESHOLDS["mdd_cap"])
    df_pos = df[pos_mask].copy()
    if verbose:
        print(f"[Stage 2] Positive candidates: {len(df_pos)}")

    # ---- Flip (negative) candidates ----
    neg_mask = (
        (s_sharpe <= NEG_THRESHOLDS["sharpe_max"]) &
        (s_pf     <= NEG_THRESHOLDS["pf_max"]) &
        (s_wr01   <= NEG_THRESHOLDS["win_rate_max01"]) &
        (s_annret <  0)
    )
    # The trade-count filter only applies when a trade-count column has data
    if not s_trades.isna().all():
        tr = pd.to_numeric(s_trades, errors="coerce")
        neg_mask &= (tr >= NEG_THRESHOLDS["trade_count_min"])
    df_neg = df[neg_mask].copy()
    if verbose:
        print(f"[Stage 3] Flip (negative) candidates: {len(df_neg)}")

    # ---- Grouping key: prefer the name column ----
    name_col = _find_col(df, ALIASES["name"])
    if name_col:
        group_col = name_col
    else:
        func_col = _find_col(df, ALIASES["func"])
        if not func_col:
            # No factor identifier at all: treat the file as a single factor
            func_col = "__single_factor__"
            df[func_col] = "FACTOR"
            df_pos[func_col] = "FACTOR"
            df_neg[func_col] = "FACTOR"
        group_col = func_col

    # Candidate rows per group key; a key without candidates is simply absent
    pos_groups = {k: g for k, g in df_pos.groupby(group_col)}
    neg_groups = {k: g for k, g in df_neg.groupby(group_col)}

    # Resolve the metric / params columns once rather than per group
    metric_cols = {
        k: _find_col(df, ALIASES[k])
        for k in ("sharpe", "sortino", "calmar", "pf", "ann_ret", "mdd")
    }
    params_col = _find_col(df, ALIASES["params"])

    def _v(row, alias_key, default=np.nan):
        # Metric value of `row` as a float; `default` if column or value is missing
        col = metric_cols[alias_key]
        return float(row[col]) if col and pd.notna(row[col]) else default

    result: Dict[str, Any] = {}

    for key, _g_all in df.groupby(group_col):
        # The group key is used verbatim as the top-level output key
        top_key = str(key)
        g_pos = pos_groups.get(key)
        g_neg = neg_groups.get(key)

        # Positive score: weighted percentile ranks within the group's candidates
        best_pos_score = -np.inf
        best_pos_row = None
        if g_pos is not None and len(g_pos):
            rS  = _rank_within_group(g_pos[metric_cols["sharpe"]].astype(float))
            rSo = _rank_within_group(g_pos[metric_cols["sortino"]].astype(float))
            rC  = _rank_within_group(g_pos[metric_cols["calmar"]].astype(float))
            rPF = _rank_within_group(g_pos[metric_cols["pf"]].astype(float))
            rAR = _rank_within_group(g_pos[metric_cols["ann_ret"]].astype(float))
            pos_score = 0.4*rS + 0.2*rSo + 0.2*rC + 0.1*rPF + 0.1*rAR
            idx = pos_score.idxmax()
            best_pos_score = float(pos_score.loc[idx])
            best_pos_row = g_pos.loc[idx]

        # Flip score: ranks of -Sharpe, capped 1/PF and -annual return
        best_neg_score = -np.inf
        best_neg_row = None
        if g_neg is not None and len(g_neg):
            sS  = g_neg[metric_cols["sharpe"]].astype(float)
            sPF = g_neg[metric_cols["pf"]].astype(float)
            sAR = g_neg[metric_cols["ann_ret"]].astype(float)
            # PF == 0 would divide by zero; treat it as the capped maximum
            invPF = 1.0 / sPF.replace(0, np.nan)
            invPF = invPF.clip(upper=PF_CAP_FOR_INVERSION).fillna(PF_CAP_FOR_INVERSION)
            rS_flip  = _rank_within_group(-sS)
            rPF_flip = _rank_within_group(invPF)
            rAR_flip = _rank_within_group(-sAR)
            neg_score = 0.5*rS_flip + 0.3*rPF_flip + 0.2*rAR_flip
            idx = neg_score.idxmax()
            best_neg_score = float(neg_score.loc[idx])
            best_neg_row = g_neg.loc[idx]

        # Head-to-head: flipping must win by more than FLIP_MARGIN
        if (best_neg_score > -np.inf) and (best_neg_score > best_pos_score + FLIP_MARGIN):
            chosen_row = best_neg_row
            direction = -1
        elif best_pos_score > -np.inf:
            chosen_row = best_pos_row
            direction = 1
        else:
            # Neither a positive nor a flip candidate for this group
            continue

        # -------- Build the output entry --------
        # func: bare function name derived from the group key
        func_for_output = _canon_func_name(top_key)

        # params: decoded straight from the params column
        params_obj = {}
        if params_col:
            params_obj = _parse_params_cell(chosen_row[params_col])

        # Report the win rate in percent from the column-level normalised series,
        # so a small percentage (e.g. 1.2) is not mistaken for a 0-1 fraction.
        win_rate_pct = float(s_wr01.loc[chosen_row.name] * 100.0)

        metrics = {
            "sharpe":            _v(chosen_row, "sharpe"),
            "sortino":           _v(chosen_row, "sortino"),
            "calmar":            _v(chosen_row, "calmar"),
            "max_drawdown_pct":  _v(chosen_row, "mdd"),
            "annual_return_pct": _v(chosen_row, "ann_ret"),
            "profit_factor":     _v(chosen_row, "pf"),
            "win_rate":          win_rate_pct,
        }

        result[top_key] = {
            "func": func_for_output,
            "params": params_obj,
            "direction": int(direction),
            "metrics": metrics
        }

    if verbose:
        def _json_default(o):
            # json calls this for objects it cannot encode; it must return an
            # encodable value (returning `o` itself would fail as a circular reference).
            if isinstance(o, np.floating):
                return float(o)
            if isinstance(o, np.integer):
                return int(o)
            return str(o)

        print(f"[Stage 4] Selected factors: {len(result)}")
        preview_keys = list(result.keys())
        print(f"[Preview] {preview_keys}")
        print(json.dumps(result, ensure_ascii=False, indent=2, default=_json_default))

    return result

# 用法：
# out = select_params_from_file("/path/to/your_table.csv", verbose=True)
# with open("selected_params.json", "w", encoding="utf-8") as f:
#     json.dump(out, f, ensure_ascii=False, indent=2)


In [18]:
# --- Usage example ---
# Run the selection for every pair and store the chosen factors as JSON.
for pair in AVAILABLE_PAIRS:
    # Grid-search results for this pair.
    # NOTE(review): absolute user-local path; consider deriving it from the project config.
    file_path = f"/Users/yaoyuyang/Desktop/Code/daily_futures_arbitrary_strategy/results/factor_selection/{pair}_grid_results.csv"
    out = select_params_from_file(file_path, verbose=True)
    # The returned dict already has the target JSON structure.
    with open(f"{FACTOR_SELECTION_PATH}/{pair}_selected_factors.json", "w", encoding="utf-8") as f:
        json.dump(out, f, ensure_ascii=False, indent=2)
# 结果字典结构即为目标 JSON，可按需保存：
# with open("selected_params.json", "w", encoding="utf-8") as f:
#     json.dump(out, f, ensure_ascii=False, indent=2)

[Stage 0] Loaded rows: 10635 ; cols: ['name', 'func', 'params', 'sharpe']
[Stage 1] After cleaning kept: 0 (removed 10635)
[Stage 2] Positive candidates: 0
[Stage 3] Flip (negative) candidates: 0
[Stage 4] Selected factors: 0
[Preview] []
{}
[Stage 0] Loaded rows: 10635 ; cols: ['name', 'func', 'params', 'sharpe', 'sortino', 'calmar', 'max_drawdown_pct', 'annual_return_pct', 'profit_factor', 'win_rate']
[Stage 1] After cleaning kept: 6081 (removed 4554)
[Stage 2] Positive candidates: 218
[Stage 3] Flip (negative) candidates: 258
[Stage 4] Selected factors: 27
[Preview] ['DoubleMA', 'EXPWMA', 'Kaufman', 'MESA_Adaptive', 'MIDPOINT', 'MIDPRICE', 'MOM_r', 'Quantile', 'ROC_MOM', 'ROC_R', 'RSI', 'TRIX', 'WMA', 'bollinger_MOM', 'bollinger_atr_mom', 'bollinger_r', 'cci', 'chaikin', 'kdj', 'mfi_signal', 'obv_ma_signal', 'obv_price_corr', 'sar', 'stds', 'turtle_trading', 'ultosc_contrarian', 'williams_r_contrarian']
{
  "DoubleMA": {
    "func": "DoubleMA",
    "params": {
      "short_window": 