
---

## 因子实战 第拾叁（XIII）集 
# 宏观<span style="color: red;">择时</span>
#### 哈老师，你的新视频做好了吗？

### 🎬 大导演哈罗德
#### 欢迎来到哈罗德的*量化频道*
- 香港中文大学（深圳）金融工程本科
- 下一步学业：即将前往美国攻读金融工程硕士（已获得录取）
- 🌐 [关注我的Bilibili，看所有人都能听得懂的量化学习内容](https://space.bilibili.com/629573485)
- 🌐 [点击这里关注我的YouTube](https://www.youtube.com/@BD_Harold)

🌟🌟🌟 我有一个梦想：让量化不再是束之高阁的灵丹妙药，而是成为散户投资者认识市场风险的最好工具 @哈罗德的量化频道 🌟🌟🌟

---

In [None]:
import pandas as pd
import numpy as np
import math
import pickle
from scipy.stats import gmean
from scipy.stats import percentileofscore
import dateutil.relativedelta

def At(t:int,lam):
    """
    Build the 1 x t weight row e_t Q' (Q Q' + I/lam)^-1 Q.

    :param t: number of observations in the sample (the matrix has t columns)
    :param lam: smoothing parameter; the identity matrix is divided by it

    :return: 1 x t ``np.matrix``; multiplying it by a length-t column of data
        yields a single value for the last observation
    """
    n_diff = t - 2

    # Q_t: second-difference matrix, (t-2) x t. Row i holds 1, -2, 1 in
    # columns i, i+1, i+2; all three diagonals are filled at once.
    Q_t = np.zeros((n_diff, t))
    row = np.arange(n_diff)
    Q_t[row, row] = 1
    Q_t[row, row + 1] = -2
    Q_t[row, row + 2] = 1

    # e_t: length-t selector, 1 in the last position and 0 elsewhere.
    last_obs = np.zeros(t)
    last_obs[-1] = 1

    # Q Q' plus the (t-2) identity scaled by 1/lam.
    penalised_gram = Q_t @ Q_t.T + np.identity(n_diff) / lam

    # @ is matrix multiplication and .T is the transpose; wrapping the selector
    # in np.matrix makes the whole product a 1 x t matrix.
    return np.matrix(last_obs) @ Q_t.T @ np.linalg.inv(penalised_gram) @ Q_t

def one_side_HP_filter(original_series, lam=650):
    """
    One-sided HP filter.

    For every row i >= 2 the weights from ``At`` are applied to rows 0..i only,
    so the value stored for a row never uses later rows.

    :param original_series: single-column DataFrame holding the raw time series
    :param lam: HP filter parameter, passed through to ``At``

    :return: Series named 'trend_1sHP' (first column minus the one-sided cycle)
        on the index of ``original_series``; its first two rows are NaN
    """

    df_local = original_series.copy()
    data_series = np.array(df_local)  # 2-D array: one row per observation
    length = len(original_series)

    # The second difference needs three points, so rows 0 and 1 have no cycle
    # value. Pad with at most `length` NaNs: a fixed two-element pad made the
    # column assignment below fail for inputs shorter than two rows.
    list_cycle = [math.nan] * min(2, length)
    for i in range(2, length):
        # Sample available at row i: the first i + 1 observations.
        sub_series = data_series[:i + 1]
        sub_A_t = At(i + 1, lam)
        # (1 x t) @ (t x 1) gives a 1 x 1 matrix; take its only entry.
        cycle_t = (sub_A_t @ sub_series)[0, 0]
        list_cycle.append(cycle_t)

    df_local['cycle_1sHP'] = list_cycle
    df_local['trend_1sHP'] = original_series[original_series.columns[0]] - np.array(list_cycle)

    return df_local['trend_1sHP']

def calculate_exceeded_median(data, column_name, window_size):
    """
    Flag the rows where a column is above its own rolling median.

    :param data: DataFrame containing ``column_name``
    :param column_name: column to test
    :param window_size: number of rows in the rolling window

    :return: integer Series, 1 where the value is strictly greater than the
        rolling median and 0 otherwise (including rows where the window is
        not yet full and the median is NaN)
    """
    values = data[column_name]

    # Median over the trailing window; the current row is part of the window.
    trailing_median = values.rolling(window=window_size).median()

    # A comparison against NaN is False, so incomplete windows map to 0.
    return values.gt(trailing_median).astype(int)


# Load the monthly and the daily macro tables.
df_month = pd.read_excel('经济增长_month.xlsx')
df_day = pd.read_excel('经济增长_day.xlsx')

# Add the one-sided HP trend of the loan-balance column. This runs before
# 'date' becomes the index, so the returned Series shares df_month's current index.
df_month['中国:金融机构:中长期贷款余额 trend_filtered'] = one_side_HP_filter(
    df_month[['中国:金融机构:中长期贷款余额']]
)
df_month.set_index('date', inplace=True)

# Container for signal Series keyed by indicator name; it is not filled in this cell.
result_dict = {}

# Monthly columns that get a median-crossing signal.
column_names = [
    '中国:PMI:新出口订单',
    '中国:产量:发电量:当月值',
    '中国:社会融资规模:新增人民币贷款:当月值',
    '中国:金融机构:中长期贷款余额 trend_filtered',
]

# Spread between the 10-year and the 1-month yield columns.
# NOTE(review): the 1-month column name has no '中国:' prefix, unlike the 10-year one — confirm against the workbook.
df_day['国债利差 10Y-1M'] = df_day['中国:中债国债到期收益率:10年'].sub(df_day['中债国债到期收益率:1个月'])
df_day.set_index('date', inplace=True)

# Rolling window lengths, counted in rows of whichever table they are applied to.
window_sizes = [60,72,84]

for hhwindow_size in window_sizes:

    # One signal column per monthly indicator and window length.
    for column_name in column_names:
        exceeded_median = calculate_exceeded_median(df_month, column_name, hhwindow_size)
        df_month[f"{column_name} signal{hhwindow_size}"] = exceeded_median

    # Same flag for the daily yield spread.
    df_day[f'国债利差 10Y-1M signal{hhwindow_size}'] = calculate_exceeded_median(
        df_day, '国债利差 10Y-1M', hhwindow_size
    )

# Write both tables, including the new signal columns, to Excel.
df_month.to_excel('月度数据触发.xlsx')
df_day.to_excel('日度数据触发.xlsx')


In [None]:
# Price data and the industry map do not depend on the rolling window, so they
# are prepared once here instead of being reloaded on every loop iteration.
# NOTE: pd.read_pickle can execute arbitrary code while loading; only open trusted files.
closeprice = pd.read_pickle('IndexQuote_ClosePrice.txt')
closeprice.index = pd.to_datetime(closeprice.index)

# First date of the evaluation sample.
start_time = pd.to_datetime('20110101', format='%Y%m%d')

# 000300.SH, 000905.SH and 399006.SZ are the codes of the 300 index, the 500 index and the ChiNext index.
closeprice_500 = closeprice.loc[closeprice.index >= start_time, '000905.SH']
closeprice_cyb = closeprice.loc[closeprice.index >= start_time, '399006.SZ']
closeprice_300 = closeprice.loc[closeprice.index >= start_time, '000300.SH']

close_price_行业 = pd.read_pickle("IndexQuote_SWS_ClosePrice.txt")
close_price_行业.index = pd.to_datetime(close_price_行业.index)

# Industry index code -> industry name.
# Deliberately left out: 801950.SI 煤炭, 801960.SI 石油石化, 801970.SI 环保, 801980.SI 美容护理.
SW_IND_MAP_1 = {
    '801010.SI': '农林牧渔',
    '801030.SI': '基础化工',
    '801040.SI': '钢铁',
    '801050.SI': '有色金属',
    '801080.SI': '电子',
    '801110.SI': '家用电器',
    '801120.SI': '食品饮料',
    '801130.SI': '纺织服饰',
    '801140.SI': '轻工制造',
    '801150.SI': '医药生物',
    '801160.SI': '公用事业',
    '801170.SI': '交通运输',
    '801180.SI': '房地产',
    '801200.SI': '商贸零售',
    '801210.SI': '社会服务',
    '801230.SI': '综合',
    '801710.SI': '建筑材料',
    '801720.SI': '建筑装饰',
    '801730.SI': '电力设备',
    '801740.SI': '国防军工',
    '801750.SI': '计算机',
    '801760.SI': '传媒',
    '801770.SI': '通信',
    '801780.SI': '银行',
    '801790.SI': '非银金融',
    '801880.SI': '汽车',
    '801890.SI': '机械设备',
}

# Keep the evaluation sample and only the mapped industries, in map order.
close_price_行业 = close_price_行业.loc[close_price_行业.index >= start_time, list(SW_IND_MAP_1)]

for hhwindow_size in window_sizes:

    # Rebuild every signal for this window length; entries from the previous
    # window length are overwritten because the keys do not include the window.
    for column_name in column_names:
        exceeded_median = calculate_exceeded_median(df_month, column_name, hhwindow_size)
        result_dict[column_name] = exceeded_median

    # Daily yield-spread signal, stored next to the monthly ones.
    result_dict['国债利差 10Y-1M'] =  calculate_exceeded_median(df_day, '国债利差 10Y-1M', hhwindow_size)
    def win_rate_test(sig_df, rets_df, window_size):
        """
        Share of triggered signals followed by a higher value ``win`` rows later.

        :param sig_df: Series of signals indexed by date; a value of 1 is a trigger
        :param rets_df: date-indexed Series whose values are compared as levels
            (value on or after the signal date versus the value ``win`` rows later)
        :param window_size: iterable of horizons, counted in rows of ``rets_df``

        :return: one-row DataFrame with one '<win>日胜率' column per horizon plus
            '触发次数', which holds the signal sum of the LAST horizon only
        """
        # Keep signals inside the price sample. A signal dated before the first
        # price would otherwise be scored against the first available price,
        # which is not the price at the signal date. The upper bound stays strict.
        inside = (sig_df.index >= rets_df.index.min()) & (sig_df.index < rets_df.index.max())
        sig_df = sig_df[inside]
        n_signals = len(sig_df.index)

        record = []
        total = 0
        for win in window_size:
            # The last win-1 signal rows are not evaluated. Clamp at zero: a
            # negative slice end would select rows from the front and count
            # triggers that are never evaluated.
            n_eval = max(n_signals - win + 1, 0)
            candidates = sig_df.iloc[:n_eval]
            total = candidates.sum()

            win_num = 0
            for date in candidates.index[candidates.to_numpy() == 1]:
                # No-op for Timestamp labels; kept from the original code.
                date = pd.to_datetime(date, format='%Y%m%d')
                forward = rets_df.loc[date:]
                # Explicit positional access: integer keys on a date-indexed
                # Series are deprecated as positions.
                if forward.iloc[0] < forward.iloc[win]:
                    win_num += 1

            # No trigger means no defined win rate; avoid the 0/0 warning.
            win_ratio = win_num / total if total else float('nan')
            record.append(win_ratio)
        record.append(total)

        # Labels follow the horizons actually requested ([20, 60] gives the
        # same '20日胜率' / '60日胜率' labels as before).
        labels = [f'{win}日胜率' for win in window_size] + ['触发次数']
        result = pd.DataFrame(record, index=labels)
        return result.T
        

    # Forward horizons, in price rows, tested for every signal.
    window_size = [20,60]

    # signal name -> {target label: one-row win-rate table, '行业': {industry name: table}}
    output_dict = {}
    for name, item in result_dict.items():
        # One table per industry column, keyed by the mapped industry name.
        industry_results = {
            SW_IND_MAP_1[code]: win_rate_test(item, prices, window_size)
            for code, prices in close_price_行业.items()
        }

        # NOTE(review): the '沪深500指数' entry is computed from the 000905.SH series;
        # confirm the label is the intended name for that index.
        output_dict[name] = {
            '沪深500指数': win_rate_test(item, closeprice_500, window_size),
            '沪深300指数': win_rate_test(item, closeprice_300, window_size),
            '创业板指数': win_rate_test(item, closeprice_cyb, window_size),
            '行业': industry_results
        }

    # Flatten the nested results into rows. The frames are collected and
    # concatenated once, because pd.concat inside the loop re-copies all
    # accumulated rows on every call. The empty template goes first so the
    # column order stays the same as before.
    frames = [pd.DataFrame(columns=['因子', '标的', '20日胜率', '60日胜率', '触发次数','行业'])]

    for factor_name, factor_result in output_dict.items():
        for stock_name, stock_result in factor_result.items():
            if isinstance(stock_result, pd.DataFrame):
                # Broad index: one row, no industry value.
                temp_df = stock_result.reset_index()
                temp_df['因子'] = factor_name
                temp_df['标的'] = stock_name
                frames.append(temp_df)
            elif isinstance(stock_result, dict):
                # Industry block: one row per industry.
                for industry_name, industry_result in stock_result.items():
                    temp_df = industry_result.reset_index()
                    temp_df['因子'] = factor_name
                    temp_df['标的'] = stock_name
                    temp_df['行业'] = industry_name
                    frames.append(temp_df)

    output_df = pd.concat(frames, ignore_index=True)

    # One workbook per rolling window length.
    output_df.to_excel(f"{hhwindow_size}_results.xlsx")