In [80]:
import os
import pandas as pd

BASE_DIR = r"D:/data/数据"
MIN_DIR = os.path.join(BASE_DIR, "分钟数据")
RT_MIN_DIR = os.path.join(BASE_DIR, "实时分钟数据")
DAY_DIR = os.path.join(BASE_DIR, "日线数据")

# 第一步：保证目录存在
os.makedirs(MIN_DIR, exist_ok=True)
os.makedirs(RT_MIN_DIR, exist_ok=True)
os.makedirs(DAY_DIR, exist_ok=True)

In [81]:
# -*- coding: utf-8 -*-
'''
1、取数仓的1分钟数据，接入本地qlib（本demo只提供样例转换代码）
2、qlib计算出的股池回写到数仓中（由后台搜集器同步到股池）
'''
import time
import qlib
import os
import numpy as np
import pandas as pd
from dw import DW
import statsmodels.api as sm
import warnings
warnings.filterwarnings('ignore')

dw = DW()
# 获取转债基础信息:债券代码 债券简称  申购日期 申购代码 申购上限 正股代码  正股简称     正股价     转股价   债现价  转股溢价率   原股东配售-股权登记日  原股东配售-每股配售额     发行规模         中签号发布日       中签率          上市时间  信用评级
def get_redis_cb_info():    
    # 获取所有可转债代码
    df_info = dw.redis_get_base('cb_info')
    return df_info

# 获取历史日线数据
def get_redis_day01(df_info):
    for index, row in df_info.iterrows():
        # 转化为tr_code
        tr_code =  row['债券代码']+'.SH' if row['债券代码'][:2] == '11' else row['债券代码']+'.SZ'
        df_day01 = dw.redis_get_day01(tr_code)
        if df_day01 is not None:
            df_day01.to_csv(f'D:/data/数据/日线数据/{tr_code}.csv')
            # # 打印前5行数据
            print(tr_code)
            print(df_day01)


# 获取历史1分钟数据
def get_api_min01(df_info: pd.DataFrame):
    s_time = time.strftime('%Y-%m-%d', time.localtime(time.time()-86400*30)) # 90天前    
    e_time = time.strftime('%Y-%m-%d', time.localtime(time.time())) # 当天
    for inde, row in df_info.iterrows():
        # 转化为tr_code
        tr_code =  row['债券代码']+'.SH' if row['债券代码'][:2] == '11' else row['债券代码']+'.SZ'
        # 获取1分钟数据        
        # df_min01_his = dw.api_get_history_min01(tr_code=tr_code, start_date=s_time, end_date=e_time)        
        # if not df_min01_his.empty and df_min01_his is not None:
        #     df_min01_his.to_csv(f'D:/data/数据/分钟数据/{tr_code}.csv')
        #     # # 打印前5行数据
        #     print(tr_code)
        #     print(df_min01_his.head())
        df_min01_his = dw.api_get_history_min01(tr_code=tr_code, start_date=s_time, end_date=e_time)        
        if df_min01_his is not None and not df_min01_his.empty:
            df_min01_his.to_csv(f'D:/data/数据/分钟数据/{tr_code}.csv', index=False)
            print(tr_code)
            print(df_min01_his.head())



# 获取当日的1分钟数据
RT_MIN_DIR = r'D:/data/数据/实时分钟数据'   # 建议在文件开头统一定义

def get_redis_min01(df_info: pd.DataFrame):    
    for _, row in df_info.iterrows():
        tr_code = row['债券代码']+'.SH' if row['债券代码'][:2] == '11' else row['债券代码']+'.SZ'
        df_min = dw.redis_get_min01(tr_code)
        if df_min is not None:
            try:
                data_all = pd.read_csv(f'D:/data/数据/分钟数据/{tr_code}.csv')
                data_concat = pd.concat([data_all, df_min])
                data_concat = data_concat.drop_duplicates(subset=['time'], keep='first')
                data_concat.to_csv(f'{RT_MIN_DIR}/{tr_code}.csv', index=False)
            except:
                df_min.to_csv(f'{RT_MIN_DIR}/{tr_code}.csv', index=False)

            # 打印前5行数据，此处需要和历史数据进行合并处理
            print(tr_code)
            print(df_min.head())

def save_to_redis(df_sp: pd.DataFrame, sp_name: str):
    # 回写股池到redis中
    r = dw.get_redis()
    r.set(f"sp:{sp_name}", df_sp.to_json(orient='records', force_ascii=False))

In [82]:
# !pip install boto3


In [83]:
# !pip install s3fs

In [84]:
#转为dict格式
def factor_dict(factor):
    factor_dict = {}
    for day in list(factor.index):
        factor_dict[day] = factor.loc[day].sort_values()
    return factor_dict

#dict格式因子缩尾处理
def winsorize(factor,low = 0.01, up = 0.99):
    for date in factor.keys():
        s = factor[date].copy()
        lower = s.quantile(low)
        upper = s.quantile(up)
        factor[date] = s.clip(lower, upper)
    return factor

#series格式因子缩尾处理
def series_winsorize(factor,low = 0.01, up = 0.99):
    lower = factor.quantile(low)
    upper = factor.quantile(up)
    factor = factor.clip(lower, upper)
    return factor

#dict格式因子标准化
def factor_std(factor):
    for date in factor.keys():
        factor[date] = (factor[date] - factor[date].mean()) / factor[date].std()
    return factor

#series格式因子标准化
def series_factor_std(factor):
    factor = (factor - factor.mean()) / factor.std()
    return factor

#series格式因子对另一组因子中性化
def net_factor(factor1, factor_list):
    factor = {}
    factor_x = pd.DataFrame()
    for i in factor_list: 
        factor_x = pd.concat([factor_x, i], axis=1) 
    y = factor1.dropna()
    x = factor_x.dropna()
    x, y = x.align(y, join='inner', axis=0)
    X = sm.add_constant(x)
    model = sm.OLS(y, X).fit()
    factor = model.resid.sort_values()
    return factor

In [85]:
local_file_path = 'D:/data/数据/Index_Api.csv'
s3_object_key = 'Index_Api.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

True

In [86]:
# # 获取转债基础信息，含实时转股溢价率等信息 
df_info = get_redis_cb_info()
# # # 获取历史日线数据
get_redis_day01(df_info)
# # 获取历史1分钟数据
get_api_min01(df_info)
# 获取当日的1分钟数据
get_redis_min01(df_info)
# 进行因子逻辑运算。。。。

123259.SZ
                  time     open     high      low    close  pre_close  \
0  2025-11-06 00:00:00  130.000  145.000  130.000  145.000    100.000   
1  2025-11-07 00:00:00  145.650  153.700  144.010  151.049    145.000   
2  2025-11-10 00:00:00  151.999  153.594  149.352  150.632    151.049   
3  2025-11-11 00:00:00  151.000  154.100  151.000  151.050    150.632   
4  2025-11-12 00:00:00  150.300  151.050  146.120  148.171    151.050   
5  2025-11-13 00:00:00  148.000  153.100  148.000  150.497    148.171   
6  2025-11-14 00:00:00  150.000  151.620  149.100  149.150    150.497   
7  2025-11-17 00:00:00  148.720  149.852  146.850  147.435    149.150   

      volume        amount  
0   286412.0  4.106132e+08  
1  1743291.0  2.594081e+09  
2   398139.0  6.036975e+08  
3   295813.0  4.510939e+08  
4   150943.0  2.236199e+08  
5   252577.0  3.819602e+08  
6   138764.0  2.089263e+08  
7   107067.0  1.582896e+08  
110099.SH
                   time     open     high      low    close  

In [87]:
local_file_path = 'D:/data/数据/Fund_allValueDev.csv'
s3_object_key = 'Fund_allValueDev.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

local_file_path = 'D:/data/数据/Fund_StrbPremiumRate.csv'
s3_object_key = 'Fund_StrbPremiumRate.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

local_file_path = 'D:/data/数据/CBStyleMark.csv'
s3_object_key = 'CBStyleMark.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

local_file_path = 'D:/data/数据/Fund_NewBnd.csv'
s3_object_key = 'Fund_NewBnd.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

local_file_path = 'D:/data/数据/Fund_FlagST.csv'
s3_object_key = 'Fund_FlagST.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

local_file_path = 'D:/data/数据/CBredeem.csv'
s3_object_key = 'CBredeem.csv'
dw.s3_download_file('tantra.factor', s3_object_key, local_file_path)

True

In [88]:
df_info.to_csv('D:/data/数据/information.csv')

data_close_min = pd.DataFrame()
data_open_min = pd.DataFrame()
data_volume_min = pd.DataFrame()
data_amount_min = pd.DataFrame()
data_close_min.index = pd.to_datetime(data_close_min.index)
data_open_min.index = pd.to_datetime(data_open_min.index)
data_volume_min.index = pd.to_datetime(data_volume_min.index)
data_amount_min.index = pd.to_datetime(data_amount_min.index)
for csvname in os.listdir('D:/data/数据/实时分钟数据'):
    data_stock_close = pd.read_csv(f'D:/data/数据/实时分钟数据/{csvname}', parse_dates=['time'], index_col='time')['close']
    data_stock_open = pd.read_csv(f'D:/data/数据/实时分钟数据/{csvname}', parse_dates=['time'], index_col='time')['open']
    data_stock_volume = pd.read_csv(f'D:/data/数据/实时分钟数据/{csvname}', parse_dates=['time'], index_col='time')['volume']
    data_stock_amount = pd.read_csv(f'D:/data/数据/实时分钟数据/{csvname}', parse_dates=['time'], index_col='time')['amount']
    data_stock_close.name = csvname[:9]
    data_stock_open.name = csvname[:9]
    data_stock_volume.name = csvname[:9]
    data_stock_amount.name = csvname[:9]
    data_close_min = pd.concat([data_close_min, data_stock_close], axis=1)
    data_open_min = pd.concat([data_open_min, data_stock_open], axis=1)
    data_volume_min = pd.concat([data_volume_min, data_stock_volume], axis=1)
    data_amount_min = pd.concat([data_amount_min, data_stock_amount], axis=1)

day = data_close_min.index[-1].date().strftime('%Y-%m-%d')
data_close = pd.DataFrame()
data_open = pd.DataFrame()
data_volume = pd.DataFrame()
data_amount = pd.DataFrame()
data_close.index = pd.to_datetime(data_close.index)
data_open.index = pd.to_datetime(data_open.index)
data_volume.index = pd.to_datetime(data_volume.index)
data_amount.index = pd.to_datetime(data_amount.index)
for csvname in os.listdir('D:/data/数据/日线数据'):
    data_stock_close = pd.read_csv(f'D:/data/数据/日线数据/{csvname}', parse_dates=['time'], index_col='time')['close']
    data_stock_open = pd.read_csv(f'D:/data/数据/日线数据/{csvname}', parse_dates=['time'], index_col='time')['open']
    data_stock_volume = pd.read_csv(f'D:/data/数据/日线数据/{csvname}', parse_dates=['time'], index_col='time')['volume']
    data_stock_amount = pd.read_csv(f'D:/data/数据/日线数据/{csvname}', parse_dates=['time'], index_col='time')['amount']
    data_stock_close.name = csvname[:9]
    data_stock_open.name = csvname[:9]
    data_stock_volume.name = csvname[:9]
    data_stock_amount.name = csvname[:9]
    data_close = pd.concat([data_close, data_stock_close], axis=1)
    data_open = pd.concat([data_open, data_stock_open], axis=1)
    data_volume = pd.concat([data_volume, data_stock_volume], axis=1)
    data_amount = pd.concat([data_amount, data_stock_amount], axis=1)
data_close = pd.concat([data_close,data_close_min.iloc[-2].to_frame().T])
data_open = pd.concat([data_open,data_open_min.loc[day].iloc[0].to_frame().T])
data_volume = pd.concat([data_volume,data_volume_min.iloc[-2].to_frame().T])
data_amount = pd.concat([data_amount,data_amount_min.iloc[-2].to_frame().T])
data_close.index = data_close.index.normalize()
data_open.index = data_open.index.normalize()
data_volume.index = data_volume.index.normalize()
data_amount.index = data_amount.index.normalize()
data_iussequantity = pd.read_csv(f'D:/data/数据/information.csv', index_col='债券代码')['发行规模'] * 100000000/100
data_iussequantity.index = [str(index)+'.SH' if str(index).startswith('11') else str(index)+'.SZ' for index in data_iussequantity.index]
data_marketcap = data_iussequantity * data_close

information = pd.read_csv('D:/data/数据/information.csv', index_col='债券代码')
information.index = [str(code) + '.SH' if str(code).startswith('11') else str(code) + '.SZ' for code in information.index]
stock_list = information.index.tolist()

In [89]:
# df_info.to_csv('D:/data/数据/information.csv')

# # ========= 1. 汇总实时分钟数据 =========
# data_close_min = pd.DataFrame()
# data_open_min = pd.DataFrame()
# data_volume_min = pd.DataFrame()
# data_amount_min = pd.DataFrame()

# rt_min_dir = 'D:/data/数据/实时分钟数据'

# for csvname in os.listdir(rt_min_dir):
#     file_path = os.path.join(rt_min_dir, csvname)
#     df_min = pd.read_csv(file_path, parse_dates=['time'], index_col='time')

#     code = csvname[:9]   # 例如 '123456.SZ' / '113001.SH'
#     # 按列拼成宽表
#     data_close_min[code] = df_min['close']
#     data_open_min[code] = df_min['open']
#     data_volume_min[code] = df_min['volume']
#     data_amount_min[code] = df_min['amount']

# # 确保索引是时间类型并按时间排序
# for df in [data_close_min, data_open_min, data_volume_min, data_amount_min]:
#     df.index = pd.to_datetime(df.index)
#     df.sort_index(inplace=True)

# # ========= 2. 汇总历史日线数据 =========
# data_close = pd.DataFrame()
# data_open = pd.DataFrame()
# data_volume = pd.DataFrame()
# data_amount = pd.DataFrame()

# day_dir = 'D:/data/数据/日线数据'

# for csvname in os.listdir(day_dir):
#     file_path = os.path.join(day_dir, csvname)
#     df_day = pd.read_csv(file_path, parse_dates=['time'], index_col='time')

#     code = csvname[:9]
#     data_close[code] = df_day['close']
#     data_open[code] = df_day['open']
#     data_volume[code] = df_day['volume']
#     data_amount[code] = df_day['amount']

# # 日线索引也强制转为时间并排序
# for df in [data_close, data_open, data_volume, data_amount]:
#     df.index = pd.to_datetime(df.index)
#     df.sort_index(inplace=True)

# # ========= 3. 用分钟数据更新“最近一个交易日”的日线 =========
# if not data_close_min.empty:
#     # 最近一条分钟数据的时间戳
#     last_ts = data_close_min.index.max()
#     last_day = last_ts.normalize()   # 对应的日期（去掉时间部分）

#     # 取出这一天的所有分钟数据
#     mask = (data_close_min.index.normalize() == last_day)
#     close_today = data_close_min.loc[mask]
#     open_today = data_open_min.loc[mask]
#     volume_today = data_volume_min.loc[mask]
#     amount_today = data_amount_min.loc[mask]

#     # 若这一天确实有分钟数据，再做更新
#     if not close_today.empty:
#         # 开盘价：当日第一笔；收盘价：当日最后一笔
#         open_row = open_today.iloc[0]
#         close_row = close_today.iloc[-1]

#         # 成交量 / 成交额：按分钟累加（更符合日线定义）
#         volume_row = volume_today.sum()
#         amount_row = amount_today.sum()

#         # 统一转成 DataFrame，并把 index 设成这一天的日期
#         open_row = open_row.to_frame().T
#         close_row = close_row.to_frame().T
#         volume_row = volume_row.to_frame().T
#         amount_row = amount_row.to_frame().T

#         open_row.index = [last_day]
#         close_row.index = [last_day]
#         volume_row.index = [last_day]
#         amount_row.index = [last_day]

#         # 先把原来这一天的日线（如果有）删掉，避免重复
#         data_close = data_close[data_close.index != last_day]
#         data_open = data_open[data_open.index != last_day]
#         data_volume = data_volume[data_volume.index != last_day]
#         data_amount = data_amount[data_amount.index != last_day]

#         # 再把新的这一行拼进去
#         data_close = pd.concat([data_close, close_row]).sort_index()
#         data_open = pd.concat([data_open, open_row]).sort_index()
#         data_volume = pd.concat([data_volume, volume_row]).sort_index()
#         data_amount = pd.concat([data_amount, amount_row]).sort_index()

# # ========= 4. 发行量、市值、信息表 =========
# data_iussequantity = pd.read_csv(
#     'D:/data/数据/information.csv',
#     index_col='债券代码'
# )['发行规模'] * 100000000 / 100

# # 把代码补成 tr_code 形式
# data_iussequantity.index = [
#     (str(idx) + '.SH') if str(idx).startswith('11') else (str(idx) + '.SZ')
#     for idx in data_iussequantity.index
# ]

# # 市值 = 发行量 * 收盘价（按列对齐）
# data_marketcap = data_close.mul(data_iussequantity, axis=1)

# information = pd.read_csv('D:/data/数据/information.csv', index_col='债券代码')
# information.index = [
#     (str(code) + '.SH') if str(code).startswith('11') else (str(code) + '.SZ')
#     for code in information.index
# ]
# stock_list = information.index.tolist()


In [90]:
print("======== 基本信息检查 ========")

print("data_close（日线）索引类型：", type(data_close.index))
print("data_close_min（分钟）索引类型：", type(data_close_min.index))

# 日线日期范围
print("\n日线 data_close 日期范围：")
print("  最早：", data_close.index.min())
print("  最晚：", data_close.index.max())
print("  倒数10天：", list(data_close.index[-10:]))

# 分钟日期范围（按天normalize）
min_dates = sorted(data_close_min.index.normalize().unique())
print("\n分钟 data_close_min 日期范围（normalize 后）：")
print("  最早：", min_dates[0] if len(min_dates) > 0 else None)
print("  最晚：", min_dates[-1] if len(min_dates) > 0 else None)
print("  最后10天：", min_dates[-10:])


data_close（日线）索引类型： <class 'pandas.core.indexes.datetimes.DatetimeIndex'>
data_close_min（分钟）索引类型： <class 'pandas.core.indexes.datetimes.DatetimeIndex'>

日线 data_close 日期范围：
  最早： 2007-07-12 00:00:00
  最晚： 2025-11-18 00:00:00
  倒数10天： [Timestamp('2025-11-05 00:00:00'), Timestamp('2025-11-06 00:00:00'), Timestamp('2025-11-07 00:00:00'), Timestamp('2025-11-10 00:00:00'), Timestamp('2025-11-11 00:00:00'), Timestamp('2025-11-12 00:00:00'), Timestamp('2025-11-13 00:00:00'), Timestamp('2025-11-14 00:00:00'), Timestamp('2025-11-17 00:00:00'), Timestamp('2025-11-18 00:00:00')]

分钟 data_close_min 日期范围（normalize 后）：
  最早： 2025-10-20 00:00:00
  最晚： 2025-11-18 00:00:00
  最后10天： [Timestamp('2025-11-05 00:00:00'), Timestamp('2025-11-06 00:00:00'), Timestamp('2025-11-07 00:00:00'), Timestamp('2025-11-10 00:00:00'), Timestamp('2025-11-11 00:00:00'), Timestamp('2025-11-12 00:00:00'), Timestamp('2025-11-13 00:00:00'), Timestamp('2025-11-14 00:00:00'), Timestamp('2025-11-17 00:00:00'), Timestamp('2025-11-

In [91]:
# #第一个因子
# date_list = [date.strftime('%Y-%m-%d') for date in data_close.iloc[-10:].index]

# data_compare_True = data_close_min[data_close_min.index.normalize().isin(date_list)]
# data_compare_True = data_compare_True > data_compare_True.shift(1)
# data_compare_False = data_close_min[data_close_min.index.normalize().isin(date_list)]
# data_compare_False = data_compare_False <= data_compare_False.shift(1)

# drop_index =[]
# for date in date_list:
#     drop_index.extend([data_compare_True.loc[date].index[0], data_compare_True.loc[date].index[-1]])

# data_compare_True.drop(index = drop_index, inplace = True)
# data_compare_False.drop(index = drop_index, inplace = True)

# factor = {}
# for date in date_list:
#     factor[date] = data_volume_min.loc[date][data_compare_True.loc[date]].sum() / data_volume_min.loc[date][data_compare_False.loc[date]].sum()
# factor= pd.DataFrame(factor).T
# factor.index = pd.to_datetime(factor.index)

# def weight_sum(series, N=2):
#     weight = 1 / (sum(range(1,N+1)) / N)
#     weight = pd.Series(range(1,N+1), index = series.index) * weight / N
#     return (series * weight).sum()

# factor_1 = factor_dict(factor.rolling(10).mean())
# factor_2 = factor_dict(factor.rolling(2).apply(weight_sum))
# # 合成因子,factor1、factor2为dict格式
# def factor_combine(factor1, factor2, n1, n2):
#     factor1 = winsorize(factor1)
#     factor2 = winsorize(factor2)
#     factor1 = factor_std(factor1)
#     factor2 = factor_std(factor2)
#     factor1 = pd.DataFrame(factor1).T
#     factor2 = pd.DataFrame(factor2).T
#     conbined_factor = n1 * factor1 + n2 * factor2
#     return conbined_factor

# factor1 = factor_combine(factor_1, factor_2, 0.3, 0.7).iloc[-1]

# #第二个因子
# def cal_factor(series):
#     R1 = series / series.shift(1) - 1
#     R2 = series / series.shift(2) - 1
#     RC = R1 + R2
#     return RC.mean()

# factor_new = data_close_min.resample('D').apply(cal_factor)
# factor_new.index = pd.to_datetime(factor_new.index)
# factor_new.dropna(how='all', inplace=True)

# def weight_sum(series, N=3):
#     weight = 1 / (sum(range(1,N+1)) / N)
#     weight = pd.Series(range(1,N+1), index=series.index) * weight / N
#     return (series * weight).sum()

# factor2 = factor_new.rolling(3).apply(weight_sum).iloc[-1]

# #第三个因子
# def cal_factor(series):
#     series = series[1:-1]
#     min_index = series.index
#     return_min = series / series.shift(1) - 1
#     return_std = return_min.rolling(5).std()
#     return_std_std = return_std.rolling(5).std()
#     return_std_std_mean = return_std_std.mean()
#     return data_amount_min.loc[min_index][return_std_std > return_std_std_mean].mean() / data_amount_min.loc[min_index].mean()

# factor_5 = data_close_min.resample('D').apply(cal_factor)
# factor_5 = factor_5.dropna(how='all')
# factor5 = factor_5.rolling(4).mean()
# factor5 = factor5.iloc[-1]

# #第四个因子
# hl_return = (data_close / data_open - 1).rolling(20).mean().dropna(how='all').iloc[-20:]
# date_list = [date.strftime('%Y-%m-%d') for date in hl_return.index]
# factor = {}
# for date in date_list:
#     try:
#         xdkp_return = data_close_min.loc[date] / data_close_min.loc[date].iloc[0] - 1
#         xdkp_return_aligned, hl_return_aligned = xdkp_return.align(hl_return.loc[date], axis=1, join='inner')
#         gw_amount = data_amount_min.loc[date][xdkp_return_aligned > hl_return_aligned].sum()
#         dw_amount = data_amount_min.loc[date][xdkp_return_aligned < hl_return_aligned].sum()
#         gdec = (gw_amount - dw_amount) / data_marketcap.loc[date]
#         factor[date] = gdec
#     except:
#         factor[date] = pd.Series()

# gdec = pd.DataFrame(factor).T 
# gdec.index = pd.to_datetime(gdec.index)
# gdec.dropna(how='all', axis=1, inplace=True)

# def col_corr(series):
#     series = np.abs(series.dropna())
#     n = len(series)
#     return (series.sum() - 1) / (n-1)

# factor = {}
# for date in gdec.index[9:]:
#     df = gdec.iloc[gdec.index.get_loc(date)-9 : gdec.index.get_loc(date)+1].dropna(thresh=9, axis=1)
#     corr = df.corr(method='spearman')
#     factor[date] = corr.apply(col_corr)

# factor8 = pd.DataFrame(factor).T.iloc[-1]

# #溢价率因子
# yjl = information['转股溢价率'].dropna()

# #期权偏离因子
# OVD = pd.read_csv('D:/data/数据/Fund_allValueDev.csv').pivot(index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE').iloc[-1].dropna()

# #纯债溢价率因子
# SPR = pd.read_csv('D:/data/数据/Fund_StrbPremiumRate.csv').pivot(index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE').iloc[-1].dropna()

# # 处理极值和标准化
# yjl_std = series_factor_std(series_winsorize(yjl))
# OVD_std = series_factor_std(series_winsorize(OVD))
# SPR_std = series_factor_std(series_winsorize(SPR))
# factor2_std = series_factor_std(series_winsorize(factor2))
# factor5_std = series_factor_std(series_winsorize(factor5))
# factor8_std = series_factor_std(series_winsorize(factor8))
# factor1_std = series_factor_std(series_winsorize(factor1))

# # 输出高中平因子
# OVD_net = net_factor(OVD_std,[yjl_std])
# factor2_net = net_factor(factor2_std,[yjl_std,OVD_net])
# factor5_net = net_factor(factor5_std,[yjl_std,OVD_net,factor2_net])
# factor8_net = net_factor(factor8_std,[yjl_std,OVD_net,factor2_net,factor5_net])
# factor1_net = net_factor(factor1_std,[yjl_std,OVD_net,factor2_net,factor5_net,factor8_net])
# combined_factor = (yjl_std + OVD_net + factor2_net + factor5_net - factor8_net + factor1_net).reindex(stock_list).sort_values()

# stock_gp = pd.read_csv('D:/data/数据/CBStyleMark.csv', parse_dates=['date']).pivot(index='date', columns='SYMBOL9', values='GP').iloc[-1].dropna().index.tolist()
# stock_zp = pd.read_csv('D:/data/数据/CBStyleMark.csv', parse_dates=['date']).pivot(index='date', columns='SYMBOL9', values='ZP').iloc[-1].dropna().index.tolist()
# factor_gp = combined_factor.loc[stock_gp].sort_values()
# factor_zp = combined_factor.loc[stock_zp].sort_values()
# trade_list_gp = factor_gp.index[:int(len(factor_gp)/3)].tolist()
# trade_list_zp = factor_zp.index[:int(len(factor_zp)/4)].tolist()

# # 输出低平因子
# factor2_net = net_factor(factor2_std,[OVD_std, SPR_std])
# factor8_net = net_factor(factor8_std, [OVD_std, SPR_std, factor2_net])
# combined_factor = (OVD_std + SPR_std + factor2_net - factor8_net).reindex(stock_list).sort_values()
# stock_dp = pd.read_csv('D:/data/数据/CBStyleMark.csv', parse_dates=['date']).pivot(index='date', columns='SYMBOL9', values='DP').iloc[-1].dropna().index.tolist()
# factor_dp = combined_factor.loc[stock_dp].sort_values()
# trade_list_dp = factor_dp.index[:int(len(factor_dp)/3)].tolist()

# # # df_sp: [tr_code, mark]，其中mark为权重，百分制(mark和为100)
# # # sp_name: 股池名称，以[SP_]开头，后面接因子名称，股池名称，版本号等等
# new = pd.read_csv('D:/data/数据/Fund_NewBnd.csv').pivot(index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE').iloc[-1].dropna()
# new = new[new == 1].index.tolist()
# ST = pd.read_csv('D:/data/数据/Fund_FlagST.csv').pivot(index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE').iloc[-1].dropna()
# ST = ST[ST == 1].index.tolist()
# QS = pd.read_csv('D:/data/数据/CBredeem.csv').pivot(index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE').iloc[-1].dropna()
# QS = QS[QS == 1].index.tolist()
# # df1 = dw.api_get_sp(sp_name='SP_预警_基础')
# # df2 = dw.api_get_sp(sp_name='SP_预警_标准D')
# # SP1 = []
# # SP2 = []
# # if df1 is not None:
# #     SP1 = df1['tr_code'].tolist()
# # if df2 is not None:
# #     SP2 = df2['tr_code'].tolist()
    
# trade_list_gp = [x for x in trade_list_gp if x not in new and x not in ST and x not in QS]
# trade_list_zp = [x for x in trade_list_zp if x not in new and x not in ST and x not in QS]
# trade_list_dp = [x for x in trade_list_dp if x not in new and x not in ST and x not in QS]
# mark_gp = [round(100/len(trade_list_gp),4)] * len(trade_list_gp)
# mark_zp = [round(100/len(trade_list_zp),4)] * len(trade_list_zp)
# mark_dp = [round(100/len(trade_list_dp),4)] * len(trade_list_dp)

# df_sp = pd.DataFrame({'tr_code': trade_list_gp, 'mark': mark_gp})
# save_to_redis(df_sp=df_sp, sp_name='SP_RDS_WCF01_GP_G3') # SP_FACTORNAME_GP为股池名称
# df_sp = pd.DataFrame({'tr_code': trade_list_zp, 'mark': mark_zp})
# save_to_redis(df_sp=df_sp, sp_name='SP_RDS_WCF01_ZP_G4') # SP_FACTORNAME_GP为股池名称
# df_sp = pd.DataFrame({'tr_code': trade_list_dp, 'mark': mark_dp})
# save_to_redis(df_sp=df_sp, sp_name='SP_RDS_WCF01_DP_G3') # SP_FACTORNAME_GP为股池名称

In [92]:
import numpy as np
import pandas as pd

# 1. 把日线数据转成 行=转债，列=日期（与你在 qlib 研究代码里的形状保持一致）
data_close_factor     = data_close.T          # 行=转债，列=日期
data_volume_factor    = data_volume.T
data_amount_factor    = data_amount.T
data_marketcap_factor = data_marketcap.T

# 2. 构造溢价率时间序列 prem（这里用 information 里的“最新截面”在时间轴上复制）
#    information.index 已经是 tr_code（如 113001.SH），data_close 的列也是 tr_code
prem_last = information['转股溢价率'].reindex(data_close.columns)

prem_panel = pd.DataFrame(
    np.repeat(prem_last.values.reshape(1, -1), repeats=len(data_close.index), axis=0),
    index=data_close.index,          # 行=日期
    columns=data_close.columns       # 列=tr_code
)
yjl_factor = prem_panel.T           # 行=转债，列=日期（和 data_close_factor 对齐）

# 3. 直接套用你自研因子的计算逻辑（只是把变量名对上）
amount    = data_amount_factor                            # 也可以用 data_close_factor * data_volume_factor
close     = data_close_factor.replace([np.inf, -np.inf], np.nan)
prem      = yjl_factor.replace([np.inf, -np.inf], np.nan)
amount_   = amount.replace([np.inf, -np.inf], np.nan)
marketcap = data_marketcap_factor.replace([np.inf, -np.inf], np.nan)

# 日收益（用于波动 / Amihud）
ret1 = close.pct_change(axis=1)

def zscore_xsec(df: pd.DataFrame) -> pd.DataFrame:
    """对列=日期做截面 z-score，行=转债"""
    df = df.replace([np.inf, -np.inf], np.nan)
    mu  = df.mean(axis=0)
    sig = df.std(axis=0).replace(0, np.nan)
    return df.sub(mu, axis=1).div(sig, axis=1)

# 1) Value：低溢价（20 日平滑）
prem_val = prem / 100.0    # 如果 yjl 本来就是 0.x，可以改成 prem_val = prem.copy()
prem_ma20 = prem_val.rolling(window=20, min_periods=5, axis=1).mean()
F_val = -prem_ma20              # 溢价越低越好
F_val = F_val.fillna(0)
Z_val = zscore_xsec(F_val)

# 2) Momentum：20 / 60 日价格动量
M20 = close.pct_change(periods=20, axis=1)
M60 = close.pct_change(periods=60, axis=1)
M20 = M20.fillna(0)
M60 = M60.fillna(0)
Z_M20 = zscore_xsec(M20)
Z_M60 = zscore_xsec(M60)
F_M = 0.7 * Z_M20 + 0.3 * Z_M60

# 3) LowVol：20 日总波动 + 下行波动
vol20 = ret1.rolling(window=20, min_periods=10, axis=1).std()
down_ret = ret1.clip(upper=0)
down20   = (down_ret.pow(2)
            .rolling(window=20, min_periods=10, axis=1)
            .mean() ** 0.5)
vol20  = vol20.fillna(0)
down20 = down20.fillna(0)
Z_vol  = zscore_xsec(-vol20)    # 低波动好 → 取负再 z-score
Z_down = zscore_xsec(-down20)
F_lv = 0.5 * Z_vol + 0.5 * Z_down

# 4) Liquidity：换手 + Amihud
turn = amount_.replace(0, np.nan) / marketcap
turn20 = turn.rolling(window=20, min_periods=10, axis=1).mean()

amihud20 = (ret1.abs() / amount_.replace(0, np.nan)) \
    .rolling(window=20, min_periods=10, axis=1).mean()

turn20   = turn20.fillna(0)
amihud20 = amihud20.fillna(0)
Z_turn   = zscore_xsec(turn20)
Z_amihud = zscore_xsec(-amihud20)   # Amihud 越小越好

F_liq = 0.7 * Z_turn + 0.3 * Z_amihud

# 5) 合成信号（还没翻方向）
signal_raw = (
    0.30 * F_M   +   # 中期动量为主
    0.25 * Z_val +   # 略偏向低溢价
    0.25 * F_lv  +   # 低波动
    0.20 * F_liq     # 流动性
)

# ==== 自动确认方向 ====

# 目标：次日收益
ret1      = data_close_factor.pct_change(axis=1)
ret_next1 = ret1.shift(-1, axis=1)

def daily_rank_ic(F: pd.DataFrame, R: pd.DataFrame) -> pd.Series:
    """返回按日期的 RankIC 序列（index=日期）"""
    ics = []
    for dt in F.columns:
        x = F[dt]
        y = R[dt]
        valid = x.notna() & y.notna()
        if valid.sum() < 10:
            ics.append(np.nan)
            continue
        xr = x[valid].rank()
        yr = y[valid].rank()
        ic = xr.corr(yr)  # Pearson on ranks
        ics.append(ic)
    return pd.Series(ics, index=F.columns, name='rankic')

def confirm_direction(signal_raw: pd.DataFrame,
                      ret_next: pd.DataFrame,
                      train_start: str, train_end: str,
                      halflife: int = 60,
                      threshold: float = 0.01,
                      min_periods: int = 20) -> pd.Series:
    """
    - 仅用训练期的IC来“学习方向”与“触发阈值”
    - ewm(halflife) 平滑IC，> +threshold -> +1；< -threshold -> -1；否则保持上一个方向
    - 返回按‘日期’的方向序列 direction_t ∈ {+1,-1}
    """
    F = zscore_xsec(signal_raw)
    ic_series = daily_rank_ic(F.loc[:, train_start:train_end],
                              ret_next.loc[:, train_start:train_end])
    # 训练期平滑
    ic_smooth = ic_series.ewm(halflife=halflife, min_periods=min_periods).mean()

    # 训练期方向（带迟滞）
    direction_train = []
    last = 1.0  # 起始默认做多
    for val in ic_smooth:
        if np.isnan(val):
            direction_train.append(last)
            continue
        if val >  threshold:
            last =  1.0
        elif val < -threshold:
            last = -1.0
        direction_train.append(last)
    direction_train = pd.Series(direction_train, index=ic_smooth.index)

    # 全样本实时平滑 IC，用训练期末方向做初值
    ic_full = daily_rank_ic(F, ret_next).ewm(
        halflife=halflife, min_periods=min_periods
    ).mean()

    direction_full = []
    last = direction_train.iloc[-1] if len(direction_train.dropna()) else 1.0
    for dt, val in ic_full.items():
        if np.isnan(val):
            direction_full.append(last)
            continue
        if val >  threshold:
            last =  1.0
        elif val < -threshold:
            last = -1.0
        direction_full.append(last)
    direction_full = pd.Series(direction_full, index=ic_full.index, name='dir')

    return direction_full

# 配置训练窗口（可以跟你研究时保持一致）
train_start, train_end = '2023-01-01', '2024-06-30'

dir_series = confirm_direction(
    signal_raw, ret_next1,
    train_start=train_start,
    train_end=train_end,
    halflife=60,
    threshold=0.01,
    min_periods=20
)

# 最终日频因子：行=转债，列=日期
factor = zscore_xsec(signal_raw).mul(dir_series, axis=1).fillna(0)

# 取“当前交易日”的截面因子（最后一列）
current_date = factor.columns[-1]
factor_today = factor[current_date]               # Series，index=tr_code
factor_today = factor_today.reindex(stock_list)   # 对齐 stock_list
# 因子越大越好，所以后面选股要按降序排序

# ==== 高/中/低平池选股 + 写回 redis（沿用原来的框架，只把 combined_factor 换成 factor_today）====

style_mark = pd.read_csv('D:/data/数据/CBStyleMark.csv', parse_dates=['date'])

pivot_gp = style_mark.pivot(index='date', columns='SYMBOL9', values='GP')
pivot_zp = style_mark.pivot(index='date', columns='SYMBOL9', values='ZP')
pivot_dp = style_mark.pivot(index='date', columns='SYMBOL9', values='DP')

stock_gp = pivot_gp.iloc[-1].dropna().index.tolist()
stock_zp = pivot_zp.iloc[-1].dropna().index.tolist()
stock_dp = pivot_dp.iloc[-1].dropna().index.tolist()

# 用自研因子在三个池子里面排序（降序：因子大的优先买入）
factor_gp = factor_today.loc[stock_gp].dropna().sort_values(ascending=False)
factor_zp = factor_today.loc[stock_zp].dropna().sort_values(ascending=False)
factor_dp = factor_today.loc[stock_dp].dropna().sort_values(ascending=False)

# 和原来一样：高平池取前 1/3，中平池取前 1/4，低平池取前 1/3
trade_list_gp = factor_gp.index[:int(len(factor_gp) / 3)].tolist()
trade_list_zp = factor_zp.index[:int(len(factor_zp) / 4)].tolist()
trade_list_dp = factor_dp.index[:int(len(factor_dp) / 3)].tolist()

# ==== 叠加新债 / ST / 强赎等过滤，再写入 redis（这部分直接复用你原来的逻辑）====

new = pd.read_csv('D:/data/数据/Fund_NewBnd.csv').pivot(
    index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE'
).iloc[-1].dropna()
new = new[new == 1].index.tolist()

ST = pd.read_csv('D:/data/数据/Fund_FlagST.csv').pivot(
    index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE'
).iloc[-1].dropna()
ST = ST[ST == 1].index.tolist()

QS = pd.read_csv('D:/data/数据/CBredeem.csv').pivot(
    index='TRADINGDATE', columns='SYMBOL9', values='FACTORVALUE'
).iloc[-1].dropna()
QS = QS[QS == 1].index.tolist()

trade_list_gp = [x for x in trade_list_gp if x not in new and x not in ST and x not in QS]
trade_list_zp = [x for x in trade_list_zp if x not in new and x not in ST and x not in QS]
trade_list_dp = [x for x in trade_list_dp if x not in new and x not in ST and x not in QS]

mark_gp = [round(100 / len(trade_list_gp), 4)] * len(trade_list_gp)
mark_zp = [round(100 / len(trade_list_zp), 4)] * len(trade_list_zp)
mark_dp = [round(100 / len(trade_list_dp), 4)] * len(trade_list_dp)

df_sp = pd.DataFrame({'tr_code': trade_list_gp, 'mark': mark_gp})
save_to_redis(df_sp=df_sp, sp_name='SP_RDS_CBMF01_GP')  

df_sp = pd.DataFrame({'tr_code': trade_list_zp, 'mark': mark_zp})
save_to_redis(df_sp=df_sp, sp_name='SP_RDS_CBMF01_ZP')

df_sp = pd.DataFrame({'tr_code': trade_list_dp, 'mark': mark_dp})
save_to_redis(df_sp=df_sp, sp_name='SP_RDS_CBMF01_DP')

In [93]:
print(current_date)
print(factor_today.describe())
print(trade_list_gp[:10])


2025-11-18 00:00:00
count    9.890000e+02
mean    -5.747565e-17
std      9.831578e-01
min     -1.123916e+01
25%     -4.837125e-01
50%     -4.837125e-01
75%      4.012669e-01
max      5.589148e+00
Name: 2025-11-18 00:00:00, dtype: float64
['123118.SZ', '118043.SH', '123256.SZ', '127037.SZ', '110096.SH', '123188.SZ', '118057.SH', '123235.SZ', '113069.SH', '127081.SZ']
