In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings


import itertools
import datetime
from tqdm.notebook import tqdm
from sqlalchemy import create_engine

In [16]:
# Factor signal tables, grouped into three families.
# NOTE(review): the prefixes presumably stand for macro factors (hgyz),
# technical indicators (jszb) and sentiment (qx) -- confirm with the author.
hgyz_signals = [
    'zyhgitt', 'zcqlb', 'srclyoy', 'xckpmiidy', 'tb10y_diff', 'tb10y_ttm', 'nrfs',
]
jszb_signals = [
    'roc', 'sma', 'dma', 'dma_ama', 'macd', 'trix', 'bbi', 'bbands', 'cci', 'kdj', 'rsi', 'cmo',
]
qx_signals = ['bbands_rzmr', 'bbands_iccfe', 'bbands_ifcfe', 'volumeratio', 'oiratio']
# Full factor universe: the three families concatenated in order.
factors_signals = [*hgyz_signals, *jszb_signals, *qx_signals]
# factors_signals_delvol = ['zyhgitt','zcqlb','srclyoy','xckpmiidy','tb10y_diff','tb10y_ttm','nrfs','roc','sma','dma','dma_ama','macd','trix','bbi','bbands','cci','kdj','rsi','cmo','bbands_rzmr','bbands_iccfe','bbands_ifcfe','oiratio']

# backtesting period configuration
fromdate = datetime.datetime(2024, 6, 10)
todate = datetime.datetime(2024, 6, 17)

# Plot with a CJK-capable font and render the minus sign as ASCII.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})
# Show every row and column when displaying frames.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
# Silence all warnings for the rest of the session.
warnings.filterwarnings('ignore')

In [3]:
def kelly_criterion(p, b):
    """
    Return the Kelly stake fraction f* = (b * p - q) / b with q = 1 - p,
    floored at zero.

    :param p: probability of winning (scalar)
    :param b: net odds, i.e. profit per unit staked on a win (scalar)
    :return: the Kelly fraction, or 0 when the edge is not positive or when
             ``b`` is not positive (no meaningful bet exists in that case)
    """
    # Non-positive odds would divide by zero (b == 0) or flip the sign of the
    # result (b < 0), turning a losing bet into a positive stake.
    if b <= 0:
        return 0
    q = 1 - p
    f_star = (b * p - q) / b
    return f_star if f_star > 0 else 0


def draw_sng(nv_stg, nv_index, text=''):
    """
    Draw the strategy and benchmark net-value curves on a single 14x7 axes.

    The figure is created but neither shown nor returned; the caller is
    expected to call ``plt.show()``.

    :param nv_stg: strategy net-value series, plotted with label 'nv_stg'
    :param nv_index: benchmark net-value series, plotted with label 'nv_index'
    :param text: extra caption placed on a second line under the x-axis label
    :type text: str
    """
    fig, ax = plt.subplots(figsize=(14, 7))

    for curve, curve_label in ((nv_stg, 'nv_stg'), (nv_index, 'nv_index')):
        ax.plot(curve, label=curve_label)
    ax.legend()

    # Title reads "strategy & benchmark net value trend".
    ax.set_title('策略&基准净值走势', fontsize='x-large')
    x_caption = "{}\n{}".format('year', text)
    ax.set_xlabel(x_caption, fontsize=10, fontfamily='sans-serif', fontstyle='italic')
    ax.set_ylabel(ylabel='', fontsize=10, fontstyle='oblique')

    ax.xaxis.set_tick_params(rotation=45, labelsize=10, colors='black')
    # Anchor the y axis at zero so drawdowns are shown to scale.
    ax.set_ylim(bottom=0)


def ratio_analyze(nv_stg, position, timeperiod='d', rf=0):
    """
    Summarise a net-value curve as a one-line performance report.

    :param nv_stg: net-value Series; only its final value enters the annualised
                   return, so the curve is implicitly assumed to start at 1
    :param position: Series of positions aligned with ``nv_stg``; used only to
                     count runs of identical consecutive values
    :param timeperiod: bar frequency, 'd' (252 per year), 'w' (52) or 'm' (12)
    :param rf: risk-free rate in percent, subtracted from the annualised return
    :return: formatted string with annualised return, annualised volatility,
             maximum drawdown, Sharpe ratio, turnover and CAGR/vol
    :raises ValueError: if ``timeperiod`` is not one of 'd', 'w', 'm'
    """
    # Per-bar returns in percent; the leading NaN is skipped by np.std below.
    period_returns = nv_stg.pct_change(1) * 100

    bars_per_year = {'d': 252, 'm': 12, 'w': 52}
    if timeperiod not in bars_per_year:
        raise ValueError("timeperiod 参数必须是 'd'（日）、'm'（月）或 'w'（周）")
    periods_per_year = bars_per_year[timeperiod]

    n_bars = len(nv_stg)
    anl_ret = (np.power(nv_stg.iloc[-1], periods_per_year / n_bars) - 1) * 100
    vol = np.sqrt(periods_per_year) * np.std(period_returns)

    # Maximum drawdown relative to the running peak, in percent.
    running_peak = nv_stg.cummax()
    mdd_r = ((running_peak - nv_stg) / running_peak).max() * 100

    sharp = (anl_ret - rf) / vol

    # Turnover: number of runs of identical consecutive positions per year.
    # Note this counts runs, so a never-changing position still counts as one.
    n_runs = sum(1 for _ in itertools.groupby(position.values))
    turnover = n_runs / (n_bars / periods_per_year)

    cagr_vol = anl_ret / vol

    template = (
        "复合年化收益率：{:.2f}%；年化波动率：{:.2f}；最大回撤率：{:.2f}%；"
        "夏普比：{:.2f}；换手率/调仓率（双边）：{:.2f}；cagr/vol：{:.2f}"
    )
    return template.format(anl_ret, vol, mdd_r, sharp, turnover, cagr_vol)


# Turn a signal frame into a dated position frame.
def get_position(df_signal, barShifted=1):
    """
    Convert buy/sell signals into a 0/1 position series lagged by ``barShifted`` bars.

    The position list starts with ``barShifted`` zeros; each signal then appends
    1 (signal == 1), 0 (signal == -1) or the previous position (anything else).
    The result is therefore ``barShifted`` rows longer than ``df_signal`` and
    the surplus trailing rows have a missing 'Date'.

    :param df_signal: signals indexed by 'Date'; rows of ``df_signal.values``
                      are compared to 1 / -1 directly, so a DataFrame must
                      hold a single column
    :param barShifted: number of leading zero positions (execution lag in bars)
    :return: DataFrame with columns ['Date', 'position'] on a 0..n-1 index
    """
    positions = [0] * barShifted
    for raw_signal in df_signal.values:
        if raw_signal == 1:
            held = 1
        elif raw_signal == -1:
            held = 0
        else:
            # No new signal: carry the previous position forward.
            held = positions[-1]
        positions.append(held)

    position_series = pd.Series(positions, name='position')
    dated_signals = df_signal.reset_index()
    # Left join on row number so the extra lagged rows survive with NaT dates.
    joined = pd.merge(
        left=position_series,
        right=dated_signals,
        how='left',
        left_index=True,
        right_index=True,
    )
    df_position = joined[['Date', 'position']]

    return df_position


# Shift the signal forward so the latest history yields the current-period signal.
def get_current_signal(df_signal, barShifted=1):
    """
    Extend the signal history by two future periods and shift it by two bars.

    The bar frequency is inferred from the number of observations per year:
    fewer than 20 is treated as monthly, fewer than 100 as weekly (Sunday
    anchored), anything else as daily.

    :param df_signal: DataFrame with a DatetimeIndex and a 'signal' column
    :param barShifted: accepted for interface compatibility; the shift applied
                       here is fixed at two bars and does not read this value
    :return: 'signal' Series, two rows longer than the input, whose last two
             entries carry the last two historical signals on future dates
    """
    first_stamp, last_stamp = df_signal.index[0], df_signal.index[-1]
    obs_per_year = len(df_signal) / ((last_stamp - first_stamp).days / 365)

    if obs_per_year < 20:
        freq = 'M'
    elif obs_per_year < 100:
        # Previously exactly 20 observations per year fell through to daily.
        freq = 'W-SUN'
    else:
        # Daily data used to leave the extension undefined and crash.
        # NOTE(review): business days are assumed for the daily calendar -- confirm.
        freq = 'B'

    # The first generated stamp is the last historical period (or the next
    # period end); keep the two that follow it.
    future_dates = pd.date_range(start=last_stamp, periods=3, freq=freq)[-2:]
    future_dates = future_dates.rename(df_signal.index.name)

    # Index the padding rows by date; the old code labelled them 0 and 1.
    padding = pd.Series(np.nan, index=future_dates, name='signal')
    df_signal_ = pd.concat([df_signal['signal'], padding]).shift(2)

    return df_signal_


def get_bm(freq, table_name='881001_wi', indicator_name='ret'):
    """
    Load a benchmark price table and compute its period-over-period return.

    :param freq: 'M' or 'W-SUN' resamples to the last observation of each
                 period (forward-filled, empty rows dropped); any other value
                 keeps the rows as stored
    :param table_name: '881001_wi' (Date, HIGH, LOW, CLOSE) or
                       'cba00101_cs' (Date, CLOSE)
    :param indicator_name: column of the processed frame returned as the third
                           element; 'ret' is the pct change of CLOSE
    :return: tuple ``(raw frame, processed frame, processed[indicator_name])``
    :raises ValueError: if ``table_name`` is not one of the supported tables
    """
    # Whitelisting the table also keeps the string-built SQL below safe.
    columns_by_table = {
        '881001_wi': 'Date,HIGH,LOW,CLOSE',
        'cba00101_cs': 'Date,CLOSE',
    }
    if table_name not in columns_by_table:
        raise ValueError(
            "unsupported table_name {!r}; expected one of {}".format(table_name, sorted(columns_by_table))
        )

    # NOTE(review): connection string (root, empty password) is hard-coded;
    # consider moving it to configuration or an environment variable.
    engine = create_engine("mysql+pymysql://root:@localhost:3306/meta_data?charset=utf8mb4")
    try:
        query = 'SELECT {} FROM {}'.format(columns_by_table[table_name], table_name)
        df = pd.read_sql(query, engine, index_col='Date')
    finally:
        # Release pooled connections even when the query fails.
        engine.dispose()

    if freq in ('M', 'W-SUN'):
        # As before, the raw frame's index becomes datetime on this path too.
        df.index = pd.to_datetime(df.index)
        df_ = df.resample(freq).last().ffill().dropna()
    else:
        df_ = df.copy()
        df_.index = pd.to_datetime(df_.index)

    df_['ret'] = df_['CLOSE'].pct_change(1)
    return df, df_, df_[indicator_name]


def get_net_value(df_position, fromdate, todate, benchmark_map):
    """
    Build the factor and benchmark net-value curves over a backtest window.

    :param df_position: DataFrame with columns 'Date' and 'position'; rows
                        with a missing 'Date' are tolerated and fall outside
                        every window
    :param fromdate: first date of the window (inclusive)
    :param todate: last date of the window (inclusive)
    :param benchmark_map: dict keyed by 12 / 52 / 250 (observations per year)
                          whose values are frames with 'Date' and 'ret' columns
    :return: DataFrame indexed by 'Date' with columns 'nv_idc' (benchmark return
             scaled by position, compounded) and 'nv_bm' (benchmark compounded)
    :raises IndexError: if no row of ``df_position`` falls inside the window
    """
    # Infer the bar frequency from the number of rows per calendar year.
    dates = df_position['Date']
    span_years = (dates.dropna().iat[-1] - dates.iat[0]).days / 365
    length = len(df_position) / span_years
    # Previously a value of exactly 20 skipped the weekly bucket.
    period = 12 if length < 20 else 52 if length < 100 else 250

    # Attach the benchmark return of matching frequency to each position row.
    right = benchmark_map[period]
    df_position_ = pd.merge(left=df_position, right=right, on='Date', how='left').set_index('Date')

    # Slice the window (both ends inclusive); copy so later writes hit an
    # independent frame rather than a view of the merged data.
    in_window = (df_position_.index >= fromdate) & (df_position_.index <= todate)
    df_ = df_position_[in_window].copy()
    if df_.empty:
        raise IndexError("no position rows between {} and {}".format(fromdate, todate))

    # The first bar earns nothing, so both curves start at exactly 1.
    df_.iloc[0] = 0
    df_['nv_idc'] = (df_['ret'] * df_['position'] + 1.0).cumprod()
    df_['nv_bm'] = (df_['ret'] + 1.0).cumprod()
    return df_[['nv_idc', 'nv_bm']]


# Score each factor --> scale the scores --> turn them into weights.
def linear_weighting(dic, how='e', is0neThird=False, solver='MMS', ispositive=False, fromdate=fromdate, todate=todate, rf=0, hgyz_signals=hgyz_signals, jszb_signals=jszb_signals, qx_signals=qx_signals):
    """
    Compute linear combination weights for a set of factors.

    :param dic: dict mapping factor name to a net-value DataFrame whose first
                column is the factor's net-value curve
    :param how: scoring rule -- 'e' equal score of 1, 's' Sharpe-style ratio
                ``(ret - rf) / vol``, 'c' ``ret / vol``
    :param is0neThird: when True, weights are first normalised inside each
                       family and multiplied by 1/3, then the whole set is
                       normalised again, so the families that are present
                       share the total equally
    :param solver: score scaler for how 's'/'c'; only 'MMS' (shifted min-max)
                   is implemented; ignored when how == 'e'
    :param ispositive: for how 's'/'c', keep only factors with a positive score
    :param fromdate: start of the window covered by the frames in ``dic``
    :param todate: end of that window; together with ``fromdate`` it is used
                   to infer the bar frequency for how 's'/'c'
    :param rf: hurdle subtracted from the annualised return when how == 's'
    :param hgyz_signals: factor names in the first family
    :param jszb_signals: factor names in the second family
    :param qx_signals: factor names in the third family
    :return: dict mapping factor name to weight; factors with zero volatility
             (and non-positive scores when ``ispositive``) are omitted; an
             empty dict is returned when no factor survives
    :raises ValueError: for an unknown ``how``, or an unknown ``solver`` when
                        ``how`` is 's' or 'c'
    """
    if how not in ('e', 's', 'c'):
        raise ValueError("how must be 'e', 's' or 'c', got {!r}".format(how))
    if how != 'e' and solver != 'MMS':
        # This combination used to fall through and crash on an unbound name.
        raise ValueError("solver must be 'MMS', got {!r}".format(solver))

    def _ratio(df_nv, period, hurdle):
        """Annualised (return - hurdle) / volatility, or None when volatility is zero."""
        ret = np.power(df_nv.iloc[-1, 0], period / len(df_nv)) - 1
        vol = np.sqrt(period) * np.std(df_nv.iloc[:, 0].pct_change(1) * 100)
        # NOTE(review): ret is a fraction while vol is in percent, so scores
        # are 100x smaller than in ratio_analyze -- confirm this is intended.
        if not vol:
            return None
        return (ret - hurdle) / vol

    def _min_max_scale(ratios):
        """Shifted min-max scaling: (value + 1 - min) / (max - min)."""
        v_min, v_max = min(ratios.values()), max(ratios.values())
        spread = v_max - v_min
        if len(ratios) <= 1 or spread == 0:
            # One factor, or identical scores: all equal (avoids dividing by zero).
            return {name: 1 for name in ratios}
        return {name: (value + 1 - v_min) / spread for name, value in ratios.items()}

    def _normalise(ratios):
        """Divide every value by the total so the values sum to one."""
        total = sum(ratios.values())
        return {name: value / total for name, value in ratios.items()}

    def _to_weights(ratios):
        """Apply the optional per-family stage, then normalise across all factors."""
        if is0neThird:
            staged = dict(ratios)
            for family in (hgyz_signals, jszb_signals, qx_signals):
                members = {name: staged[name] for name in staged if name in family}
                for name, share in _normalise(members).items():
                    staged[name] = 1 / 3 * share
            ratios = staged
        return _normalise(ratios)

    if how == 'e':
        dic_ratio = {name: 1 for name in dic}
    else:
        hurdle = rf if how == 's' else 0
        # NOTE(review): frequency is inferred from fromdate/todate rather than
        # from each frame's own index, so callers must pass the matching window.
        window_years = (todate - fromdate).days / 365
        dic_ratio = {}
        for name, df_nv in dic.items():
            length = len(df_nv) / window_years
            period = 12 if length < 20 else 52 if length < 100 else 250
            score = _ratio(df_nv, period, hurdle)
            if score is not None:
                dic_ratio[name] = score

        if ispositive:
            # The old filter only rebound a local name and never took effect.
            dic_ratio = {name: score for name, score in dic_ratio.items() if score > 0}
        if not dic_ratio:
            return {}
        dic_ratio = _min_max_scale(dic_ratio)

    return _to_weights(dic_ratio)


def calculate_composite_signal(dic_position, benchmark_map, is0neThird=True, freq='W-SUN', fromdate=fromdate, todate=todate, factors_signals=factors_signals):
    """
    Build the composite bullish probability on every evaluation date.

    For each date in ``pd.date_range(fromdate, todate, freq)`` the function
    (1) computes each factor's net value over the trailing 360 days,
    (2) keeps the factors whose net value ends above the benchmark's, plus
    'zcqlb' unconditionally, (3) reads each kept factor's most recent signal
    in that window from the ``factors_signal`` database, (4) weights the kept
    factors with ``linear_weighting`` and (5) sums the weighted signals.

    :param dic_position: dict mapping factor name to its position frame
    :param benchmark_map: dict of benchmark return frames keyed by 12 / 52 / 250
    :param is0neThird: forwarded to ``linear_weighting``
    :param freq: pandas frequency of the evaluation dates, weekly by default
    :param fromdate: first evaluation date bound
    :param todate: last evaluation date bound
    :param factors_signals: factor names to evaluate; each is also a table name
    :return: ``(results, results_backtesting)``: ``results`` interleaves one
             signal row and one weight row per date, ``results_backtesting``
             holds the signal rows only; both end with 'bullish_probability'.
             Two empty frames are returned when the date range is empty.
    """
    def _bullish_probability(row, weights):
        """Weighted vote: positive signal counts in full, zero counts half, negative nothing."""
        total = 0
        for factor, factor_weight in weights.items():
            signal = row[factor]
            if signal > 0:
                total += signal * factor_weight
            elif signal == 0:
                total += 0.5 * factor_weight
        return total

    backtesting_period = pd.date_range(start=fromdate, end=todate, freq=freq)
    engine = create_engine("mysql+pymysql://root:@localhost:3306/factors_signal?charset=utf8mb4")
    combined_frames = []  # signal row + weight row for every date
    signal_frames = []    # signal row only, for the backtest

    try:
        for todate_ in tqdm(backtesting_period, desc='PROCESSING'):
            next_day_ = todate_ + datetime.timedelta(days=1)    # date the signal applies to
            fromdate_ = todate_ - datetime.timedelta(days=360)  # start of the look-back window

            # Net value of every factor over the look-back window.
            dic_all_net_value = {
                factor_name: get_net_value(dic_position[factor_name], fromdate_, todate_, benchmark_map)
                for factor_name in factors_signals
            }

            # Keep factors that ended above the benchmark; 'zcqlb' is always kept.
            list_filtered_factor = [
                key for key, nv in dic_all_net_value.items()
                if nv['nv_idc'].iat[-1] > nv['nv_bm'].iat[-1] or key == 'zcqlb'
            ]

            # Latest signal of each kept factor. Table names come from the
            # internal factor list and the dates are formatted timestamps.
            latest_signals = []
            for table in list_filtered_factor:
                query = f"SELECT `signal` FROM {table} WHERE `Date` BETWEEN '{fromdate_.strftime('%Y-%m-%d')}' AND '{todate_.strftime('%Y-%m-%d')}' ORDER BY `Date` DESC LIMIT 1"
                latest_signals.append(pd.read_sql(query, engine).rename(columns={'signal': table}))
            results_ = pd.concat(latest_signals, axis=1) if latest_signals else pd.DataFrame()
            results_.insert(0, 'Date', next_day_)

            # Weight the kept factors (equal scoring; the window is not forwarded).
            dic_filtered_net_value = {key: dic_all_net_value[key] for key in list_filtered_factor}
            weight = pd.DataFrame(linear_weighting(dic_filtered_net_value, is0neThird=is0neThird), index=[0])
            weights_row = weight.iloc[0]
            results_['bullish_probability'] = results_.apply(
                lambda row: _bullish_probability(row, weights_row), axis=1
            )

            combined_frames.append(pd.concat([results_, weight], axis=0))
            signal_frames.append(results_)
    finally:
        # Release pooled connections even if an iteration fails.
        engine.dispose()

    if not signal_frames:
        return pd.DataFrame(), pd.DataFrame()

    # Concatenate once instead of growing the frames inside the loop.
    results = pd.concat(combined_frames, axis=0)
    results_backtesting = pd.concat(signal_frames, axis=0)

    # Move 'bullish_probability' to the last column.
    cols = [col for col in results.columns if col != 'bullish_probability']
    cols.append('bullish_probability')
    return results[cols], results_backtesting[cols]


# The function below is used for backtesting.
# FIXME: this function is not finished yet; optimise it the next time it is needed.
def backtest_composite_signals(fromdate, todate, benchmark_map, factors_signals, dic_position, linear_weighting_func, is0neThird=True):
    """
    Backtest the composite signal on a daily calendar using stored positions.

    Unlike ``calculate_composite_signal`` this variant does not query the
    database: it reads each factor's position for the following day from
    ``dic_position``.

    :param fromdate: backtest start date
    :param todate: backtest end date
    :param benchmark_map: benchmark return frames, forwarded to ``get_net_value``
    :param factors_signals: list of factor names to evaluate
    :param dic_position: dict mapping factor name to a frame with 'Date' and
                         'position' columns
    :param linear_weighting_func: weighting function, called as
                                  ``linear_weighting_func(net_values, is0neThird=...)``
    :param is0neThird: forwarded to ``linear_weighting_func``, default True
    :return: ``(results, results_backtesting)`` -- signal rows interleaved with
             weight rows, and signal rows only
    """
    # Build the daily sequence of evaluation dates.
    backtesting_period = pd.date_range(start=fromdate, end=todate, freq='D')
    df_result_position = pd.DataFrame(index=backtesting_period)

    # Left-join every factor's position onto the daily calendar as '<factor>_position'.
    for k, v in dic_position.items():
        v_ = v.set_index('Date')
        v_.rename(columns={'position': f'{k}_position'}, inplace=True)
        df_result_position = pd.merge(left=df_result_position, right=v_, left_index=True, right_index=True, how='left')

    # Fill gaps: backward fill first, then forward fill whatever is still missing.
    # NOTE(review): the backward fill copies later positions onto earlier
    # dates -- confirm this look-ahead is acceptable.
    df_result_position.bfill(inplace=True)
    df_result_position.ffill(inplace=True)

    results = pd.DataFrame()  # accumulates signal rows plus weight rows
    results_backtesting = pd.DataFrame()  # accumulates signal rows only

    for i, todate_ in enumerate(tqdm(backtesting_period, desc='PROCESSING')):
        next_day_ = todate_ + datetime.timedelta(days=1)  # date the signal applies to
        fromdate_ = todate_ - datetime.timedelta(days=360)  # start of the performance look-back window

        # Net value of every factor over the look-back window.
        dic_all_net_value = {
            factor_name: get_net_value(dic_position[factor_name], fromdate_, todate_,benchmark_map)
            for factor_name in factors_signals
        }

        # Keep factors whose net value ended above the benchmark's; 'zcqlb' is always kept.
        list_filtered_factor = [
            key for key in dic_all_net_value.keys()
            if dic_all_net_value[key]['nv_idc'].iat[-1] > dic_all_net_value[key]['nv_bm'].iat[-1] or key == 'zcqlb'
        ]

        # Read each kept factor's position for the following day from df_result_position.
        # NOTE(review): on the last iteration next_day_ is todate + 1 day, which
        # is outside the fromdate..todate calendar built above, so this lookup
        # looks like it raises KeyError -- confirm and extend the calendar if so.
        results_ = pd.DataFrame()
        for item in list_filtered_factor:
            df = pd.DataFrame([df_result_position.loc[next_day_, f'{item}_position']], index=[0], columns=[item])
            results_ = pd.concat([results_, df], axis=1)

        # Insert the signal date as the first column.
        results_.insert(0, 'Date', next_day_)

        # Weight the kept factors; only strictly positive positions contribute.
        dic_filtered_net_value = {key: dic_all_net_value[key] for key in list_filtered_factor}
        weight = pd.DataFrame(linear_weighting_func(dic_filtered_net_value, is0neThird=is0neThird), index=[0])
        results_['bullish_probability'] = results_.apply(
            lambda row: sum(row[keys] * weight[keys] for keys in weight.keys() if row[keys] > 0), axis=1
        )

        # Append this date's rows to the running results.
        combine_results_ = pd.concat([results_, weight], axis=0)
        results = pd.concat([results, combine_results_], axis=0)
        results_backtesting = pd.concat([results_backtesting, results_], axis=0)

        # Move the 'bullish_probability' column to the end.
        cols = list(results.columns)
        cols.append(cols.pop(cols.index('bullish_probability')))
        results = results[cols]
        results_backtesting = results_backtesting[cols]

    return results, results_backtesting


def perform_backtesting(results_backtesting, fromdate=fromdate, todate=todate, e=1, ret_1='ret_x', ret_2='ret_y', probility='bullish_probability', isDraw=True):
    """
    Simulate a weekly two-asset allocation driven by the bullish probability
    and compare it with a fixed-mix benchmark.

    Each week the previous net value is split ``p`` / ``1 - p`` between asset 1
    and asset 2, where ``p`` is that week's probability, and each sleeve earns
    its forward one-week return. A probability of exactly 2 means "hold": both
    sleeves drift from last week's values without rebalancing (the first row
    is always treated as a rebalance from a net value of 1).

    :param results_backtesting: frame with 'Date' and 'bullish_probability'
    :param fromdate: first benchmark date kept (inclusive)
    :param todate: last benchmark date kept (inclusive)
    :param e: weight of asset 1 in the fixed-mix benchmark; asset 2 gets ``1 - e``
    :param ret_1: return column of asset 1 ('ret_x' is the 881001_wi table)
    :param ret_2: return column of asset 2 ('ret_y' is the cba00101_cs table)
    :param probility: column holding the allocation to asset 1
    :param isDraw: when True, plot both curves with their ratio summaries
    :return: frame indexed by 'Date' with the return columns, the probability,
             'ast_1', 'ast_2', 'nv_stg' and 'nv_index'
    :raises IndexError: if no benchmark week falls inside the window
    """
    def prepare_backtesting_data():
        """Load weekly forward returns for both assets and attach the probabilities."""
        engine = create_engine("mysql+pymysql://root:@localhost:3306/meta_data?charset=utf8mb4")
        try:
            # Database errors now propagate; they used to be printed and
            # followed by a crash on the None result.
            close_benchmark = pd.read_sql('SELECT cba00101_cs.Date,881001_wi.CLOSE AS close_x,cba00101_cs.CLOSE AS close_y  FROM 881001_wi RIGHT JOIN cba00101_cs ON 881001_wi.Date = cba00101_cs.Date', engine).set_index('Date')
        finally:
            engine.dispose()

        # Weekly closes (weeks ending Monday), gaps forward-filled.
        close_benchmark.index = pd.to_datetime(close_benchmark.index)
        weekly_close = close_benchmark.resample('W-MON').last().ffill()

        # Forward return: each row holds the return earned over the following week.
        ret_benchmark = pd.DataFrame({
            'ret_x': (weekly_close['close_x'].shift(-1) - weekly_close['close_x']) / weekly_close['close_x'],
            'ret_y': (weekly_close['close_y'].shift(-1) - weekly_close['close_y']) / weekly_close['close_y'],
        }).reset_index()
        ret_benchmark['Date'] = pd.to_datetime(ret_benchmark['Date'])

        # Keep the same window as the probabilities, both ends inclusive.
        in_window = (ret_benchmark['Date'] >= fromdate) & (ret_benchmark['Date'] <= todate)
        probabilities = results_backtesting[['Date', 'bullish_probability']]
        merged = pd.merge(left=ret_benchmark[in_window], right=probabilities, on='Date', how='left')
        return merged.set_index('Date')

    df_bullish_posobility_ = prepare_backtesting_data()
    if df_bullish_posobility_.empty:
        raise IndexError("no benchmark weeks between {} and {}".format(fromdate, todate))

    # Walk the weeks once, carrying the previous sleeves and net value forward.
    ast_1_path, ast_2_path, nv_path = [], [], []
    prev_ast_1 = prev_ast_2 = None
    prev_nv = 1
    weekly_rows = zip(
        df_bullish_posobility_[probility],
        df_bullish_posobility_[ret_1],
        df_bullish_posobility_[ret_2],
    )
    for i, (prob, r_1, r_2) in enumerate(weekly_rows):
        if i > 0 and prob == 2:
            # Hold: let both sleeves drift without rebalancing.
            ast_1 = prev_ast_1 * (1 + r_1)
            ast_2 = prev_ast_2 * (1 + r_2)
        else:
            # Rebalance the previous net value to prob / (1 - prob).
            ast_1 = prev_nv * prob * (1 + r_1)
            ast_2 = prev_nv * (1 - prob) * (1 + r_2)
        prev_ast_1, prev_ast_2, prev_nv = ast_1, ast_2, ast_1 + ast_2
        ast_1_path.append(ast_1)
        ast_2_path.append(ast_2)
        nv_path.append(prev_nv)

    # Whole float columns are assigned at once instead of writing floats into
    # integer-initialised cells.
    df_bullish_posobility_['ast_1'] = ast_1_path
    df_bullish_posobility_['ast_2'] = ast_2_path
    df_bullish_posobility_['nv_stg'] = nv_path
    df_bullish_posobility_['nv_index'] = (1 + e * df_bullish_posobility_[ret_1] + (1 - e) * df_bullish_posobility_[ret_2]).cumprod()

    if isDraw:
        model_text = ratio_analyze(df_bullish_posobility_['nv_stg'], df_bullish_posobility_['bullish_probability'], timeperiod='w')
        nv_text = ratio_analyze(df_bullish_posobility_['nv_index'], pd.Series(np.ones(len(df_bullish_posobility_))), timeperiod='w')
        draw_sng(df_bullish_posobility_['nv_stg'], df_bullish_posobility_['nv_index'], text='{}\n{}'.format(model_text, nv_text))
        plt.show()
    return df_bullish_posobility_

In [11]:
# Load every factor's signal table into memory, keyed by factor name.
engine = create_engine("mysql+pymysql://root:@localhost:3306/factors_signal?charset=utf8mb4")
dic_signal = {}  # key: factor name; value: signal DataFrame indexed by Date
try:
    for factor_name in factors_signals:
        temp_signal = pd.read_sql('SELECT * FROM {}'.format(factor_name), engine, index_col='Date')
        temp_signal.index = pd.to_datetime(temp_signal.index)  # normalise the Date index
        dic_signal[factor_name] = temp_signal
finally:
    # Release pooled connections even if a table fails to load.
    engine.dispose()

# Execution lag in bars for each factor; factors not listed here lag by one bar.
BAR_SHIFT_OVERRIDES = {'zcqlb': 2, 'srclyoy': 2, 'xckpmiidy': 2}

# key: factor name; value: frame with 'Date' and 'position' columns on a 0..n-1 index
dic_position = {
    factor_name: get_position(dic_signal[factor_name], barShifted=BAR_SHIFT_OVERRIDES.get(factor_name, 1))
    for factor_name in factors_signals
}

In [12]:
# Benchmark return frames keyed by observations per year. 'M' and 'W-SUN'
# are resampled by get_bm; 'D' keeps the rows as stored in the table.
benchmark_map = {
    periods_per_year: get_bm(resample_rule)[2].reset_index()
    for periods_per_year, resample_rule in ((12, 'M'), (52, 'W-SUN'), (250, 'D'))
}

In [17]:
# Build the weekly composite signal through the shared helper instead of a
# pasted copy of its body. The window and factor list are passed explicitly
# so the values from the configuration cell are used, not defaults captured
# when the function was defined.
results, results_backtesting = calculate_composite_signal(
    dic_position,
    benchmark_map,
    is0neThird=True,
    freq='W-SUN',
    fromdate=fromdate,
    todate=todate,
    factors_signals=factors_signals,
)


PROCESSING:   0%|          | 0/1 [00:00<?, ?it/s]

In [None]:
# Pass the window explicitly: the function's defaults were bound when it was
# defined and may be stale if the configuration cell was re-run afterwards.
df_bullish_posobility_ = perform_backtesting(results_backtesting, fromdate=fromdate, todate=todate, isDraw=True)