In [1]:
# import 
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import time
import warnings
import copy
import os
warnings.filterwarnings("ignore")
from sys import getsizeof

In [2]:
def get_window(data_dir: str,
               trade_dates: np.ndarray,
               time_window: int,
               current_index: int = 0,
               how:str = 'inner'
              ) -> pd.DataFrame:
    """
    从硬盘中整合eps的数据，读入内存。
    输入参数为交易起始日，和时间窗口长度，
    输出参数为一个pandas的df
    
    这个函数只在第一次调用的时候用到，读出的表格已经写入新的CSV文件，
    之后的硬盘读取不再需要这个函数，而是直接使用read_csv()方法来完成。
    """
    file_name = data_dir + 'eps_{}.csv'.format(trade_dates[current_index])
    df = pd.read_csv(file_name, index_col=1).loc[:]['eps']

    for i in range(1, time_window):

        file_name = data_dir + 'eps_{}.csv'.format(trade_dates[current_index + i])
        new_df = pd.read_csv(file_name, index_col=1).iloc[:]['eps']
        df = pd.concat([df, new_df], join=how, axis=1)  # 取交集

    df.columns = trade_dates[current_index:current_index + time_window]
    
    return df

In [3]:
def get_frame(data:pd.DataFrame,
              time_window:int,
             index_num:int,
             direction:str)->pd.DataFrame:
    """
    从内存中的数据框读取合适大小的数据用于计算
    """
    
    index_num += 1
    if direction == 'forward':
        df = data.iloc[:,index_num : index_num + time_window].dropna()
    elif direction == 'backward':
        df = data.iloc[:,index_num - time_window: index_num].dropna()
    
    return df

In [4]:
def z_score(X: pd.DataFrame)->pd.DataFrame:
    display(X)
    return (X - np.mean(X)) / np.std(X)

In [5]:
def find_trade_day(trade_dates:np.ndarray, day:int, direction:str = 'backward'):
    """
    将输入的随便的数转化成列表中有的交易日
    """
    if day in trade_dates: # 如果本来就在列表当中
        nearest_trade_day = day
    elif direction == 'backward':
        nearest_trade_day = trade_dates[max((0, np.argmax(trade_dates - day >0) - 1))]
    elif direction == 'forward':
        nearest_trade_day = trade_dates[np.argmax(trade_dates - day >0)]
        
    return nearest_trade_day

In [6]:
def cal_strategy(period, step, start_day_index, skip_first_day,
                 get_Xt, #关键的获得信号的函数
                 is_display,
                )-> tuple:
    """
    将权重函数（信号）和未来的五日收益率相乘的到策略的未来收益率
    ## 注意，这里的列名是指利用这一天及以前（含这一天）的数据对未来收益率做的预测。
    """
    

    daily_return = copy.deepcopy({'all':[],'top':[],'bottom':[]})
    daily_signal = copy.deepcopy({'all':[],'top':[],'bottom':[]})
    # 遍历我们要计算的每一天
    for i in range(0,period,step): 
        
        current_day_index =  start_day_index + i # 找到现在的交易日在交易日列表中的索引
        current_day = trade_dates[current_day_index]

        # 获取用于计算的数据框

        df_return = get_frame(data=total_return,
                             time_window=train_window, 
                             index_num = current_day_index,
                             direction='backward') 
        
        #  将数据框传到计算信号的函数当中。注意生成交易信号的时候就要判断是否多空

        X = df_return.apply(get_Xt,axis=1).sort_values(ascending=False)
        X.index = X.index.astype(int)
        
#         取与股票池的交集
        if is_intersect is True:
            current_target_stocks = target_stock.loc[:][current_day].dropna()
            current_target_stocks = current_target_stocks.astype(int)
            
            intersec_stock = pd.Series(list(set(X.index).intersection(set(current_target_stocks.values))))
            X = X.loc[intersec_stock][:]
            df_return=df_return.loc[intersec_stock][:]
        
        X = X.astype(float)
        X = (X - np.mean(X)) / np.std(X)  # 做 z-score
        X[X > 3] = 3
        X[X < -3]  = -3


        top =  X[X > 0].index
        bottom = X[X < 0].index

        df_X = pd.DataFrame(np.zeros((len(df_return.index),3)), # 先生成全 0 矩阵，方便后面的加法
                         columns=['all','top','bottom'], index=df_return.index)

        df_X.loc[top, 'top'] = X[top] / X[top].sum()
        df_X.loc[bottom ,'bottom'] = - X[bottom] / X[bottom].sum()
        df_X['all'] = df_X['top'] + df_X['bottom']
        df_X.replace(0, np.nan, inplace=True)
        X = df_X

        if skip_first_day is True: # 如果要跳过第一天的话
            current_day_index += 1
        
        # 获取用于计算未来收益率的数据框
        test_return = get_frame(data=total_return,
                               time_window=test_window,
                               index_num=current_day_index,
                               direction='forward')
        
        # 计算今日的收益率
        for column in X.columns:
            daily_signal[column].append(X[column])
            daily_return[column].append((test_return.sum(axis=1) * X[column]).dropna())
         
        

    
    # 将列表中的Series统一拼接
    strategy_return = copy.deepcopy({})
    strategy_signal = copy.deepcopy({})
    for key in daily_return.keys():
        strategy_return[key] = pd.concat(daily_return[key], axis=1, join='outer')
        strategy_signal[key] = pd.concat(daily_signal[key], axis=1, join='outer')
        
        strategy_return[key].columns= trade_dates[range(start_day_index,start_day_index + period, step)]
        strategy_signal[key].columns= trade_dates[range(start_day_index,start_day_index + period, step)]
        
    strategy_return['all'] = (strategy_return['top'].replace(np.nan,0)-strategy_return['bottom'].replace(np.nan,0)).replace(0,np.nan)
    strategy_return['bottom'] = -strategy_return['bottom']
#     if note is not None:
#         for key in daily_return.keys():
#             strategy_return[key].to_csv(f'./data/strategy_return_{key}/' + note +'.csv',encoding='gbk',index=True)
#             strategy_signal[key].to_csv(f'./data/strategy_signal_{key}/' + note +'.csv',encoding='gbk',index=True)
#             header = not os.path.isfile('modified_return.csv')
#             daily_return = pd.DataFrame(strategy_return[key].sum(),columns=[note,]).T
#             daily_return.to_csv('modified_return.csv', header=header,index=True,encoding='gbk',mode='a')

    return strategy_return, strategy_signal

###############################################################################
###############################################################################
###############################################################################

def show_strategy(strategy_return:dict,
                  strategy_signal:dict,
                  bench_mark:pd.Series,
                  long_short: str,
                  step: int = 5, 
                  note: str = '', 
                  is_display: bool = False,
                 ):
    """
    展示策略收益的函数，具有普遍适用性。
    这个函数需要绘制一些图片、计算一些数据，并存储到CSV文件当中
    具体而言，需要计算的比例有：
    （全策略、纯空头、纯多头的）胜率、回测收益、
    """
    daily_return = copy.deepcopy({})
    backtest_params = copy.deepcopy({})
    cumulative_return = copy.deepcopy({})
    drawdowns = copy.deepcopy({})
    tov_seq = copy.deepcopy({})
    
    for key in strategy_return.keys(): # 遍历每种组合的情况
        
        # 计算组合的日收益率、累计收益率序列
        daily_return[key] = strategy_return[key].sum()
        cumulative_return[key] = daily_return[key].cumsum()

        # 计算日均收益率、日均波动率、IR、下行偏差DD
        backtest_params[key + '_' + '日均收益率R'] = daily_return[key].mean() / step
        backtest_params[key + '_' + '日均波动率Vol'] = daily_return[key].std() / np.sqrt(step)
        
        backtest_params[key + '_' + '年化收益率ER'] = backtest_params[key + '_' + '日均收益率R'] * 252
        backtest_params[key + '_' + '年化波动率VOL'] = backtest_params[key + '_' + '日均波动率Vol'] * np.sqrt(252)
        
        backtest_params[key + '_' + '信息比率IR'] = backtest_params[key + '_' + '日均收益率R'] / backtest_params[key + '_' + '日均波动率Vol']
        backtest_params[key + '_' + '夏普比率SR'] = backtest_params[key + '_' + '信息比率IR'] * np.sqrt(252)

        backtest_params[key + '_' + '下行偏差DD'] = daily_return[key][daily_return[key] < 0].std()
        
        # 计算胜率：取每日胜率的平均值
        ve_seq = np.count_nonzero(strategy_return[key] > 0, axis = 0) / strategy_return[key].notna().sum(axis=0)
        backtest_params[key + '_' + '胜率VE'] = ve_seq.mean()
    
        # 计算盈亏比：取每日盈亏比的平均值
        gain = strategy_return[key][strategy_return[key] > 0].mean(axis=0)
        loss = strategy_return[key][strategy_return[key] < 0].mean(axis=0)
        backtest_params[key + '_' + '盈亏比PnL'] = (gain / abs(loss)).mean()
        
        # 计算最大回撤
        max_so_far = cumulative_return[key].values[0]
        drawdowns[key] = []
        for trade_day in cumulative_return[key].index:
            
            if cumulative_return[key][trade_day] > max_so_far:
                drawdown = 0
                drawdowns[key].append(drawdown)
                max_so_far = cumulative_return[key][trade_day]
            else:
                drawdown =  max_so_far - cumulative_return[key][trade_day]
                drawdowns[key].append(drawdown)
            
        
        backtest_params[key + '_' + '最大回撤MDD'] = max(drawdowns[key])
        
        # 计算Calmar比率、Sortino比率
        backtest_params[key + '_' + '卡玛比率Calmar'] = backtest_params[key + '_' + '年化收益率ER'] / backtest_params[key + '_' + '最大回撤MDD']
        backtest_params[key + '_' + '索提诺比率Sortino'] = backtest_params[key + '_' + '年化收益率ER'] / backtest_params[key + '_' + '下行偏差DD']
        
        # 计算换手率
#         daily_holding = strategy_signal[key] / strategy_signal[key].notna().sum(axis=0)# 先将交易信号转化为持仓数
        daily_holding = strategy_signal[key]
        prior = daily_holding.iloc[:, :daily_holding.shape[1] - 1].fillna(0)
        rear = daily_holding.iloc[:,1:daily_holding.shape[1]].fillna(0)
        
        prior.columns, rear.columns = range(daily_holding.shape[1] - 1),range(daily_holding.shape[1] - 1) # 为了df可以准确做减法，需要修改对齐列名
        
        tov_seq[key] = (rear - prior).abs().sum(axis=0) / 2
        tov_seq[key] = pd.concat([pd.Series(.5), tov_seq[key]])
        backtest_params[key + '_' + '换手率Tov'] = tov_seq[key].mean()
        
    df = pd.DataFrame.from_dict(backtest_params,orient='index')

    
    # 修改显示方式
    names = np.array(df.index).reshape((-1,3),order='F').reshape((1,-1),order='C').tolist()[0]
    df = df.loc[names,:]
    '''    
        # 绘制回测收益率曲线、当日回撤柱状图、换手率柱状图
        x = pd.to_datetime(daily_return[long_short].index,format='%Y%m%d')
        time_str = time.strftime('%Y%m%d_%H%M%S',time.localtime())


        plt.figure(figsize=(16,5)) 

        plt.plot(x, cumulative_return['all'] * 100, color='r')
        plt.plot(x, cumulative_return['top'] * 100, color='g')
        plt.plot(x, cumulative_return['bottom'] * 100, color='b')

        plt.plot(x, bench_mark * 100, color='k')

        plt.legend(['Strategy_all','Strategy_top','Strategy_bottom','Benchmark'])
        plt.xlabel('Date')
        plt.ylabel('Cumulative Return %')
        plt.title('CUMULATIVE RETURN', )
        if note is not None: 
            note = time_str + '_' + note
            plt.savefig('./XSMOM_image/' + note + '.png') # 保存图片

        plt.show()
        plt.figure(figsize=(16,6))
        plt.subplot(211)
        plt.bar(x, - np.array(drawdowns[long_short]) * 100,)
        plt.ylabel('Max Drawdown %')
        plt.title('DRAWDOWN')

        plt.subplot(212)
        tov_show = tov_seq[long_short]
        tov_show[0] = 0
        plt.bar(x,tov_show * 100)

        plt.ylabel('Turnover Rate %')
        plt.xlabel('Date')
        plt.title('TURNOVER RATE')
        plt.show()
    '''
    
    # 写入CSV文件。
    if note is None: 
        return
    df_log = df.T
    
    df_log.index = [note,]
    header = not os.path.isfile('XSMOM_modify.csv') # 如果文件存在，则不要写入表头。如果文件不存在则写入表头
    df_log.to_csv('XSMOM_modify.csv', mode='a', index=True, header=header, encoding='gbk')
    
    return 

In [7]:
def main(get_Xt, # 这是需要的输出信号的名称
         start_day:int = 20210101,
         end_day:int = 20221231,
         skip_first_day:bool = True,
         step:int = 5,
         long_short: str = 'all', # 限制字段，可选'none','short','long','all',默认为'all'，这个字段仅用作画图。而所有的数据都会计算
         note: str = '', # 记录调参细节等内容的字段，将写入最后的日志文件
         read: bool = False, # 写入csv
         is_display: bool = False,
         
        ):
    """
    回测的主函数。
    需要实现的功能有：
    1. 初始化
    2. 运行回测收益并存储数据
    3. 计算回测结果、相应收益指标等元素，并存储到csv文件中
    """
    
    # 初始化部分
    
    
    start_day = find_trade_day(trade_dates, start_day, 'forward')
    start_day_index = int(np.where(trade_dates == start_day)[0])
    end_day = find_trade_day(trade_dates, end_day, 'backward')
    end_day_index = int(np.where(trade_dates == end_day)[0])
    period = end_day_index - start_day_index
    
    bench_mark = zz500.iloc[start_day_index:end_day_index].cumsum().iloc[range(0,period,step)]
    
    if read is True and note is not None:
        
        strategy_return = copy.deepcopy({})
        strategy_signal = copy.deepcopy({})
        for key in ['all', 'top', 'short']:
            
            strategy_return[key] = pd.read_csv(f'./data/strategy_return_{key}/' + note +'.csv',encoding='gbk',index_col=0)
            strategy_signal[key] = pd.read_csv(f'./data/strategy_signal_{key}/' + note +'.csv',encoding='gbk',index_col=0)
            
    else:
        strategy_return, strategy_signal = cal_strategy(period, step, start_day_index, skip_first_day,get_Xt, is_display) ###
    
    show_strategy(strategy_return, strategy_signal, bench_mark, long_short, step, note,is_display)
    
    return 0

In [8]:
# 数据读取准备工作
data_dir = '../华安证券深度神经网络改进时间序列动量策略/baz_TSMOM/'

trade_dates = np.load(data_dir + 'tradedates.npy')
trade_dates = trade_dates[(trade_dates>=20170103) & (trade_dates <= 20230500)]
train_window =  5
test_window = 5


# total_day = trade_dates.shape[0] # 文件中拥有的全部天数
# total_return = get_window(data_dir=data_dir,
#                           trade_dates=trade_dates,
#                           time_window=total_day, # 一次全部读出
#                           current_index=0,
#                           how='outer')
# display(total_return)
# print(f'the size of train_return is {getsizeof(total_return) / (1024 * 1024):4.2f} MB.')
# total_price = total_return.cumsum(axis=1)
# display(total_price)
# total_return.to_csv('total_return.csv')
# total_price.to_csv('total_price.csv')

total_price=pd.read_csv(data_dir + 'total_price.csv',index_col=0)
total_return = pd.read_csv(data_dir + 'total_return.csv' ,index_col=0)
zz500 = pd.read_csv(data_dir + 'ZZ500.csv',index_col=0)
target_stock = pd.read_csv(data_dir + 'target_stock.csv',index_col=0)

total_price.columns = pd.to_numeric(total_price.columns) # 方便后文的整数索引
total_return.columns = pd.to_numeric(total_return.columns)
target_stock.columns = pd.to_numeric(target_stock.columns)
zz500 = zz500.loc[trade_dates]

display(zz500)
display(total_price)
display(total_return)
display(target_stock)

Unnamed: 0_level_0,ZZ500
dt,Unnamed: 1_level_1
20170103,0.009122
20170104,0.011693
20170105,0.000722
20170106,-0.004323
20170109,0.006874
...,...
20230424,-0.006682
20230425,-0.014597
20230426,0.003923
20230427,0.002569


Unnamed: 0_level_0,20170103,20170104,20170105,20170106,20170109,20170110,20170111,20170112,20170113,20170116,...,20230420,20230421,20230424,20230425,20230426,20230427,20230428,20230504,20230505,20230508
CodeInt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1.0,-0.003937,-0.005309,-0.002760,-0.004754,-0.009661,-0.009346,-0.002768,0.003897,-0.010721,-0.039373,...,0.011273,0.007321,-0.014180,0.003221,-0.016329,-0.016141,0.010179,0.023087,0.035650,0.037762
2.0,0.000711,-0.005624,-0.000174,0.006677,0.005278,-0.004972,-0.007031,0.001514,0.069572,0.034962,...,0.220414,0.217020,0.203900,0.204323,0.181784,0.178815,0.196727,0.181249,0.214145,0.199537
4.0,-0.012706,-0.021648,-0.026045,-0.030087,-0.056350,-0.040720,-0.045468,-0.047901,-0.052015,-0.062331,...,0.633625,0.618704,0.650342,0.635326,0.616098,0.644642,0.630193,0.676022,0.703101,0.671319
5.0,0.006572,0.020866,0.019645,0.028747,0.031316,0.030033,0.019092,0.015007,0.012857,0.023810,...,-0.460491,-0.459247,-0.480376,-0.463113,-0.471025,-0.489864,-0.483946,-0.541763,-0.553666,-0.595196
6.0,0.010616,0.010543,0.010402,0.033770,0.017014,0.000989,0.008082,-0.021703,-0.055263,-0.067230,...,-0.249777,-0.257375,-0.267241,-0.274677,-0.279480,-0.283479,-0.282154,-0.292470,-0.274997,-0.291191
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
301387.0,,,,,,,,,,,...,-0.091205,-0.110193,-0.157357,-0.166558,-0.168862,-0.161303,-0.156582,-0.122120,-0.145462,-0.140088
603137.0,,,,,,,,,,,...,-0.154557,-0.230836,-0.222200,-0.247647,-0.268766,-0.261459,-0.239417,-0.229618,-0.232630,-0.218486
301307.0,,,,,,,,,,,...,,,,-0.016043,0.000387,0.061570,0.017362,0.003487,-0.054162,-0.043375
301360.0,,,,,,,,,,,...,,,,,0.037177,0.023285,0.074572,0.035899,-0.052570,0.047279


Unnamed: 0_level_0,20170103,20170104,20170105,20170106,20170109,20170110,20170111,20170112,20170113,20170116,...,20230420,20230421,20230424,20230425,20230426,20230427,20230428,20230504,20230505,20230508
CodeInt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1.0,-0.003937,-0.001372,0.002549,-0.001994,-0.004906,0.000314,0.006578,0.006665,-0.014618,-0.028652,...,0.003602,-0.003953,-0.021501,0.017401,-0.019549,0.000188,0.026320,0.012908,0.012563,0.002112
2.0,0.000711,-0.006336,0.005450,0.006851,-0.001399,-0.010250,-0.002059,0.008545,0.068058,-0.034610,...,0.009222,-0.003394,-0.013119,0.000423,-0.022540,-0.002969,0.017913,-0.015478,0.032895,-0.014607
4.0,-0.012706,-0.008942,-0.004397,-0.004043,-0.026262,0.015629,-0.004748,-0.002432,-0.004115,-0.010316,...,0.008010,-0.014921,0.031638,-0.015016,-0.019228,0.028544,-0.014449,0.045829,0.027079,-0.031783
5.0,0.006572,0.014294,-0.001220,0.009102,0.002568,-0.001283,-0.010941,-0.004085,-0.002150,0.010953,...,-0.003068,0.001244,-0.021130,0.017263,-0.007911,-0.018840,0.005918,-0.057817,-0.011903,-0.041529
6.0,0.010616,-0.000073,-0.000141,0.023368,-0.016755,-0.016026,0.007093,-0.029784,-0.033560,-0.011968,...,-0.007278,-0.007598,-0.009866,-0.007437,-0.004803,-0.003999,0.001325,-0.010316,0.017473,-0.016194
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
301387.0,,,,,,,,,,,...,-0.014547,-0.018988,-0.047164,-0.009201,-0.002304,0.007560,0.004721,0.034462,-0.023341,0.005374
603137.0,,,,,,,,,,,...,-0.154557,-0.076278,0.008636,-0.025447,-0.021118,0.007307,0.022041,0.009799,-0.003012,0.014144
301307.0,,,,,,,,,,,...,,,,-0.016043,0.016430,0.061183,-0.044208,-0.013875,-0.057650,0.010787
301360.0,,,,,,,,,,,...,,,,,0.037177,-0.013891,0.051287,-0.038673,-0.088469,0.099848


Unnamed: 0,20170103,20170104,20170105,20170106,20170109,20170110,20170111,20170112,20170113,20170116,...,20230417,20230418,20230419,20230420,20230421,20230424,20230425,20230426,20230427,20230428
0,2.0,2.0,2.0,2.0,2.0,2.0,2.0,4.0,4.0,,...,,,,,,2.0,2.0,2.0,2.0,2.0
1,4.0,4.0,4.0,4.0,4.0,4.0,4.0,,,,...,,,,,,,,,,
2,34.0,34.0,34.0,34.0,34.0,34.0,34.0,,,,...,,,,,,,,,,
3,59.0,59.0,59.0,59.0,59.0,59.0,59.0,,,,...,88.0,88.0,88.0,88.0,88.0,88.0,88.0,88.0,88.0,
4,63.0,63.0,,,,,,63.0,63.0,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2882,,,,,,,,,,,...,,,,,,,,,,
2883,,,,,,,,,,,...,28.0,28.0,28.0,28.0,28.0,28.0,,,28.0,28.0
2884,,,,,,,,,,,...,,,,,,,,,,
2885,,,,,,,,,,,...,,,,,,,,,,


In [9]:
# longest_above_mean, longest_above_median
import itertools

def _get_length_sequences_where(x):
    """
    This method calculates the length of all sub-sequences where the array x is either True or 1.

    Examples
    --------

    :param x: An iterable containing only 1, True, 0 and False values
    :return: A list with the length of all sub-sequences where the array is either True or False. If no ones or Trues
    contained, the list [0] is returned.
    """
    if len(x) == 0:
        return [0]
    else:
        res = [len(list(group)) for value, group in itertools.groupby(x) if value == 1]
        return res if len(res) > 0 else [0]

def longest_above_mean(x):
    """
    Returns the length of the longest consecutive subsequence in x that is bigger than the mean of x

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.max(_get_length_sequences_where(x > np.mean(x))) if x.size > 0 else 0

def longest_above_median(x):
    
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.max(_get_length_sequences_where(x > np.median(x))) if x.size > 0 else 0

a = np.random.randn(24)
a = (np.sign(np.sign(a) + 1))
display(a)
_get_length_sequences_where(a)

array([1., 0., 0., 1., 0., 1., 1., 0., 1., 0., 1., 1., 0., 1., 0., 1., 1.,
       1., 1., 1., 0., 1., 1., 0.])

[1, 1, 2, 1, 2, 1, 5, 2]

In [10]:
def large_std_dev(x):
    """
    Does time series have *large* standard deviation?

    Boolean variable denoting if the standard dev of x is higher than 'r' times the range = difference between max and
    min of x. Hence it checks if

    .. math::

        std(x) > r * (max(X)-min(X))

    According to a rule of the thumb, the standard deviation should be a forth of the range of the values.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param r: the percentage of the range to compare with
    :type r: float
    :return: the value of this feature
    :return type: bool
    """
    
    # 改进版本：不再判断是否r 倍于极差，而是输出标准差和极差的比值
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    return np.std(x) / (np.max(x) - np.min(x))

In [11]:
def _roll(a, shift):
    """
    Roll 1D array elements. Improves the performance of numpy.roll() by reducing the overhead introduced from the
    flexibility of the numpy.roll() method such as the support for rolling over multiple dimensions.

    Elements that roll beyond the last position are re-introduced at the beginning. Similarly, elements that roll
    back beyond the first position are re-introduced at the end (with negative shift).

    Examples
    --------
    Benchmark
    ---------
    :param a: the input array
    :type a: array_like
    :param shift: the number of places by which elements are shifted
    :type shift: int

    :return: shifted array with the same shape as a
    :return type: ndarray
    """
    # 这个函数相当于把一位数组的前lag项放到了后面，
    if not isinstance(a, np.ndarray):
        a = np.asarray(a)
    idx = shift % len(a) # shift 除以 len(a)的余数
    return np.concatenate([a[-idx:], a[:-idx]])

In [12]:
def num_peaks(x, n = 5):
    """
    Calculates the number of peaks of at least support n in the time series x. A peak of support n is defined as a
    subsequence of x where a value occurs, which is bigger than its n neighbours to the left and to the right.

    Hence in the sequence

    4 is a peak of support 1 and 2 because in the subsequences

    4 is still the highest value. Here, 4 is not a peak of support 3 because 13 is the 3th neighbour to the right of 4
    and its bigger than 4.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param n: the support of the peak
    :type n: int
    :return: the value of this feature
    :return type: float
    """
    x_reduced = x[n:-n]

    res = None
    for i in range(1, n + 1):
        result_first = x_reduced > _roll(x, i)[n:-n]
        
        if res is None:
            res = result_first
        else:
            res &= result_first

        res &= x_reduced > _roll(x, -i)[n:-n]
    return np.sum(res)
a = np.random.randn(24)
num_peaks(a)

1

In [13]:
def c3(x, lag = 5):
    """
    Uses c3 statistics to measure non linearity in the time series

    This function calculates the value of

    .. math::

        \\frac{1}{n-2lag} \\sum_{i=1}^{n-2lag} x_{i + 2 \\cdot lag} \\cdot x_{i + lag} \\cdot x_{i}

    which is

    .. math::

        \\mathbb{E}[L^2(X) \\cdot L(X) \\cdot X]

    where :math:`\\mathbb{E}` is the mean and :math:`L` is the lag operator. It was proposed in [1] as a measure of
    non linearity in the time series.

    .. rubric:: References

    |  [1] Schreiber, T. and Schmitz, A. (1997).
    |  Discrimination power of measures for nonlinearity in a time series
    |  PHYSICAL REVIEW E, VOLUME 55, NUMBER 5

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param lag: the lag that should be used in the calculation of the feature
    :type lag: int
    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    n = x.size
    if 2 * lag >= n:
        return 0
    else:
        return np.mean((_roll(x, 2 * -lag) * _roll(x, -lag) * x)[0 : (n - 2 * lag)])

In [14]:
def change_quant(x, ql = 0.2, qh = 0.8, isabs = True, f_agg = "std"):
    """
    First fixes a corridor given by the quantiles ql and qh of the distribution of x.
    Then calculates the average, absolute value of consecutive changes of the series x inside this corridor.

    Think about selecting a corridor on the
    y-Axis and only calculating the mean of the absolute change of the time series inside this corridor.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param ql: the lower quantile of the corridor
    :type ql: float
    :param qh: the higher quantile of the corridor
    :type qh: float
    :param isabs: should the absolute differences be taken?
    :type isabs: bool
    :param f_agg: the aggregator function that is applied to the differences in the bin
    :type f_agg: str, name of a numpy function (e.g. mean, var, std, median)

    :return: the value of this feature
    :return type: float
    """
    if ql >= qh:
        return 0

    div = np.diff(x)
    if isabs:
        div = np.abs(div)
    # All values that originate from the corridor between the quantiles ql and qh will have the category 0,
    # other will be np.NaN
    try:
        bin_cat = pd.qcut(x, [ql, qh], labels=False)
        bin_cat_0 = bin_cat == 0
    except ValueError:  # Occurs when ql are qh effectively equal, e.g. x is not long enough or is too categorical
        return 0
    # We only count changes that start and end inside the corridor
    ind = (bin_cat_0 & _roll(bin_cat_0, 1))[1:]

    if np.sum(ind) == 0:
        return 0
    else:
        ind_inside_corridor = np.where(ind == 1)

        aggregator = getattr(np, f_agg)
        return aggregator(div[ind_inside_corridor])
    
a = np.random.randn(24)
change_quant(a)

0.3263420713880414

In [15]:
def cid_ce(x, normalize = True):
    """
    This function calculator is an estimate for a time series complexity [1] (A more complex time series has more peaks,
    valleys etc.). It calculates the value of

    .. math::

        \\sqrt{ \\sum_{i=1}^{n-1} ( x_{i} - x_{i-1})^2 }

    .. rubric:: References

    |  [1] Batista, Gustavo EAPA, et al (2014).
    |  CID: an efficient complexity-invariant distance for time series.
    |  Data Mining and Knowledge Discovery 28.3 (2014): 634-669.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param normalize: should the time series be z-transformed?
    :type normalize: bool

    :return: the value of this feature
    :return type: float
    """
    if not isinstance(x, (np.ndarray, pd.Series)):
        x = np.asarray(x)
    if normalize:
        s = np.std(x)
        if s != 0:
            x = (x - np.mean(x)) / s
        else:
            return 0.0

    x = np.diff(x)
    return np.sqrt(np.dot(x, x))

In [16]:
from scipy.signal import cwt, ricker

def cwt_coeff(x, widths=tuple([2,2,2]), coeff=2, w=2):
    """
    Calculates a Continuous wavelet transform for the Ricker wavelet, also known as the "Mexican hat wavelet" which is
    defined by

    .. math::
        \\frac{2}{\\sqrt{3a} \\pi^{\\frac{1}{4}}} (1 - \\frac{x^2}{a^2}) exp(-\\frac{x^2}{2a^2})

    where :math:`a` is the width parameter of the wavelet function.

    This feature calculator takes three different parameter: widths, coeff and w. The feature calculator takes all the
    different widths arrays and then calculates the cwt one time for each different width array. Then the values for the
    different coefficient for coeff and width w are returned. (For each dic in param one feature is returned)

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"widths":x, "coeff": y, "w": z} with x array of int and y,z int
    :type param: list
    :return: the different feature values
    :return type: pandas.Series
    """
    calculated = cwt(x, ricker, widths)
    i = widths.index(w)
    if calculated.shape[1] <= coeff:
        return np.NaN
    else:
        return calculated[i, coeff]

a = np.random.randn(24)
cwt_coeff(a)

1.1414772041616543

In [17]:
def ener_ratio_chunks(x, num_segments= 10, segment_focus = 9):
    """
    Calculates the sum of squares of chunk i out of N chunks expressed as a ratio with the sum of squares over the whole
    series.

    Takes as input parameters the number num_segments of segments to divide the series into and segment_focus
    which is the segment number (starting at zero) to return a feature on.

    If the length of the time series is not a multiple of the number of segments, the remaining data points are
    distributed on the bins starting from the first. For example, if your time series consists of 8 entries, the
    first two bins will contain 3 and the last two values, e.g. `[ 0.,  1.,  2.], [ 3.,  4.,  5.]` and `[ 6.,  7.]`.

    Note that the answer for `num_segments = 1` is a trivial "1" but we handle this scenario
    in case somebody calls it. Sum of the ratios should be 1.0.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"num_segments": N, "segment_focus": i} with N, i both ints
    :return: the feature values
    :return type: list of tuples (index, data)
    """

    full_series_energy = np.sum(x**2)
    
    assert segment_focus < num_segments # segment_focus 的意思是你关心哪一个子块
    assert num_segments > 0

    if full_series_energy == 0:
        return np.NaN
    else:
        return np.sum(np.array_split(x, num_segments)[segment_focus] ** 2.0) / full_series_energy
a = np.random.randn(24)
ener_ratio_chunks(a)

0.030420353073445032

In [18]:
def fft_coeff(x, coeff=10, attr= "abs"):
    """
    Calculates the fourier coefficients of the one-dimensional discrete Fourier Transform for real input by fast
    fourier transformation algorithm

    .. math::
        A_k =  \\sum_{m=0}^{n-1} a_m \\exp \\left \\{ -2 \\pi i \\frac{m k}{n} \\right \\}, \\qquad k = 0,
        \\ldots , n-1.

    The resulting coefficients will be complex, this feature calculator can return the real part (attr=="real"),
    the imaginary part (attr=="imag), the absolute value (attr=""abs) and the angle in degrees (attr=="angle).

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"coeff": x, "attr": s} with x int and x >= 0, s str and in ["real", "imag",
        "abs", "angle"]
    :type param: list
    :return: the different feature values
    :return type: pandas.Series
    """

    assert coeff >= 0, "Coefficients must be positive or zero."
    assert {attr} <= {
        "imag",
        "real",
        "abs",
        "angle",
    }, 'Attribute must be "real", "imag", "angle" or "abs"'

    fft = np.fft.rfft(x)

    res = getattr(np, attr)(fft[coeff]) if coeff < len(fft) else np.NaN
    
    return res
a = np.random.randn(24)
fft_coeff(a)


4.195022805944566

In [19]:
from scipy.signal import welch
def fourier_entropy(x, bins = 5):
    """
    Calculate the binned entropy of the power spectral density of the time series
    (using the welch method).

    Ref: https://hackaday.io/project/707-complexity-of-a-time-series/details
    Ref: https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.signal.welch.html

    """

    def binned_entropy(x, max_bins):
        """
        First bins the values of x into max_bins equidistant bins.
        Then calculates the value of

        .. math::

            - \\sum_{k=0}^{min(max\\_bins, len(x))} p_k log(p_k) \\cdot \\mathbf{1}_{(p_k > 0)}

        where :math:`p_k` is the percentage of samples in bin :math:`k`.

        :param x: the time series to calculate the feature of
        :type x: numpy.ndarray
        :param max_bins: the maximal number of bins
        :type max_bins: int
        :return: the value of this feature
        :return type: float
        """
        if not isinstance(x, (np.ndarray, pd.Series)):
            x = np.asarray(x)

        # nan makes no sense here
        if np.isnan(x).any():
            return np.nan

        hist, bin_edges = np.histogram(x, bins=max_bins)
        probs = hist / x.size
        probs[probs == 0] = 1.0
        return -np.sum(probs * np.log(probs))
    
    _, pxx = welch(x, nperseg=min((len(x), 256)))
    return binned_entropy(pxx / np.max(pxx), bins)
a = np.random.randn(24)
fourier_entropy(a)

1.2636256721237276

In [123]:
def _into_subchunks(x, subchunk_length, every_n=1):
    """
    Split the time series x into subwindows of length "subchunk_length", starting every "every_n".

    For example, the input data if [0, 1, 2, 3, 4, 5, 6] will be turned into a matrix

        0  2  4
        1  3  5
        2  4  6

    with the settings subchunk_length = 3 and every_n = 2
    """
    len_x = len(x)

    assert subchunk_length > 1
    assert every_n > 0

    # how often can we shift a window of size subchunk_length over the input?
    num_shifts = (len_x - subchunk_length) // every_n + 1
    shift_starts = every_n * np.arange(num_shifts)
    indices = np.arange(subchunk_length)

    indexer = np.expand_dims(indices, axis=0) + np.expand_dims(shift_starts, axis=1)
    return np.asarray(x)[indexer]

def permu_entropy(x, tau = 5, dimension = 2): # 这里将tau 设置为5就注定无法仅根据5天的数据计算
    """
    Calculate the permutation entropy.

    Three steps are needed for this:

    1. chunk the data into sub-windows of length D starting every tau.
       Following the example from the reference, a vector

        x = [4, 7, 9, 10, 6, 11, 3

       with D = 3 and tau = 1 is turned into

           [[ 4,  7,  9],
            [ 7,  9, 10],
            [ 9, 10,  6],
            [10,  6, 11],
            [ 6, 11,  3]]

    2. replace each D-window by the permutation, that
       captures the ordinal ranking of the data.
       That gives

           [[0, 1, 2],
            [0, 1, 2],
            [1, 2, 0],
            [1, 0, 2],
            [1, 2, 0]]

    3. Now we just need to count the frequencies of every permutation
       and return their entropy (we use log_e and not log_2).

    Ref: https://www.aptech.com/blog/permutation-entropy/
         Bandt, Christoph and Bernd Pompe.
         “Permutation entropy: a natural complexity measure for time series.”
         Physical review letters 88 17 (2002): 174102 .
    """

    X = _into_subchunks(x, dimension, tau)
    if len(X) == 0:
        return np.nan
    # Now that is clearly black, magic, but see here:
    # https://stackoverflow.com/questions/54459554/numpy-find-index-in-sorted-array-in-an-efficient-way
    permutations = np.argsort(np.argsort(X))
    # Count the number of occurences
    _, counts = np.unique(permutations, axis=0, return_counts=True)
    # turn them into frequencies
    probs = counts / len(permutations)
    # and return their entropy
    return -np.sum(probs * np.log(probs))

In [130]:
from pathos.multiprocessing import ThreadPool as Pool #多线程
def approx_entropy (s :list|np.ndarray, r:float= .2 , m :int =2): # 近似熵
    s = np.squeeze(np.asarray(s))
    th = r * np.std(s) #容限阈值
    def phi (m):
        n = len(s)
        x = s[ np.arange(n-m+1).reshape(-1,1) + np.arange(m) ]
        ci = lambda xi: (( np.abs(x-xi).max(1) <=th).sum()) / (n-m+1) # 构建一个匿名函数
        c = Pool().map (ci, x) #所传递的参数格式: 函数名,函数参数
        return np.sum(np.log(c)) /(n-m+1)
    x = Pool().map(phi, [m, m+1])
    return x[1] - x[0]
a = np.random.randn(24)
approx_entropy(a)

-0.015821905303942785

In [140]:
def sample_entropy(Datalist, r=.2, m=2): # 样本熵
    Datalist = np.asarray(Datalist)
    list_len = len(Datalist)  #总长度
    th = r * np.std(Datalist) #容限阈值
    def Phi(k):
        list_split = [Datalist[i:i+k] for i in range(0,list_len-k+(k-m))] #将其拆分成多个子列表
        #这里需要注意，2维和3维分解向量时的方式是不一样的！！！
        Bm = 0.0
        for i in range(0, len(list_split)): #遍历每个子向量
            Bm += ((np.abs(list_split[i] - list_split).max(1) <= th).sum()-1) / (len(list_split)-1) #注意分子和分母都要减1
        return Bm
    ## 多线程
    x = Pool().map(Phi, [m,m+1])
    return - np.log(x[1] / x[0]) 
#     H = - math.log(Phi(m+1) / Phi(m))
    
a = np.random.randn(60)
sample_entropy(a)

1.3862943611198908

In [141]:
def fuzzy_entropy(s, r = 0.2, m = 2, n = 2): # 模糊熵
    '''s:需要计算熵的向量; r:阈值容限(标准差的系数); m:向量维数; n:模糊函数的指数
    '''
    s = np.asarray(s)
    N = len(s)  #总长度
    th = r * np.std(s) #容限阈值
    def Phi(k):
        list_split = [s[i:i+k] for i in range(0,N-k+(k-m))] #将其拆分成多个子列表
        #这里需要注意，2维和3维分解向量时的方式是不一样的！！！
        B = np.zeros(len(list_split))
        for i in range(0, len(list_split)): #遍历每个子向量
            di = np.abs(list_split[i] - np.mean(list_split[i]) - list_split + np.mean(list_split,1).reshape(-1,1)).max(1)
            Di = np.exp(- np.power(di,n) / th)
            B[i] = (np.sum(Di) - 1) / (len(list_split)-1) #这里减1是因为要除去其本身，即exp(0)
        return np.sum(B) / len(list_split)
    
    x = Pool().map(Phi, [m, m+1])
    return - np.log(x[1]/ x[0])
a = np.random.randn(24)
fuzzy_entropy(a)

1.9069144059598258

In [142]:
# 函数测试诊断：多窗口
is_intersect = False
time1 = time.time()
for train_window in [60, 126, 252]:
    
    for func_name in ['approx_entropy','sample_entropy','fuzzy_entropy','permu_entropy']:
        
        note = f'Day{train_window}'+func_name
        get_Xt = eval(func_name)
        
        main(get_Xt=get_Xt,
             start_day=20210101,
             end_day=20221231,
             skip_first_day=True,
             step=5,
             note=note,
             is_display=False,
             )

        print(f'Calculation of {func_name} Freq{train_window} has completed!')
time2 = time.time()
print(f'Total time cost = {(time2 - time1)/60 :.2f} mins.')

Calculation of approx_entropy Freq60 has completed!
Calculation of sample_entropy Freq60 has completed!
Calculation of fuzzy_entropy Freq60 has completed!
Calculation of permu_entropy Freq60 has completed!
Calculation of approx_entropy Freq126 has completed!
Calculation of sample_entropy Freq126 has completed!
Calculation of fuzzy_entropy Freq126 has completed!
Calculation of permu_entropy Freq126 has completed!
Calculation of approx_entropy Freq252 has completed!
Calculation of sample_entropy Freq252 has completed!
Calculation of fuzzy_entropy Freq252 has completed!
Calculation of permu_entropy Freq252 has completed!
Total time cost = 1026.60 mins.


In [None]:
from scipy.signal import welch

def spkt_welch_dens(x, coeff = 3):
    """
    This feature calculator estimates the cross power spectral density of the time series x at different frequencies.
    To do so, the time series is first shifted from the time domain to the frequency domain.

    The feature calculators returns the power spectrum of the different frequencies.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param param: contains dictionaries {"coeff": x} with x int
    :type param: list
    :return: the different feature values
    :return type: pandas.Series
    """
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.welch.html#scipy-signal-welch

    freq, pxx = welch(x, nperseg=min(len(x), 256))
    display(pxx)

    if len(pxx) <= np.max(coeff):  
        # There are fewer data points in the time series than requested coefficients
        print('if')
        # filter coefficients that are not contained in pxx
        reduced_coeff = [coeff] if len(pxx) > coeff else []
        not_calculated_coefficients = [coeff] if coeff not in reduced_coeff else []

        # Fill up the rest of the requested coefficients with np.NaNs
        return np.array(list(pxx[reduced_coeff]) + [np.NaN] * len(not_calculated_coefficients))
    else:
        print('else')
        return np.array(pxx[coeff])
a = np.random.randn(5)
display(a)
spkt_welch_dens(a)

In [None]:
def ti_rev_asym_stat(x, lag = 2):
    """
    Returns the time reversal asymmetry statistic.

    This function calculates the value of

    .. math::

        \\frac{1}{n-2lag} \\sum_{i=1}^{n-2lag} x_{i + 2 \\cdot lag}^2 \\cdot x_{i + lag} - x_{i + lag} \\cdot  x_{i}^2

    which is

    .. math::

        \\mathbb{E}[L^2(X)^2 \\cdot L(X) - L(X) \\cdot X^2]

    where :math:`\\mathbb{E}` is the mean and :math:`L` is the lag operator. It was proposed in [1] as a
    promising feature to extract from time series.

    .. rubric:: References

    |  [1] Fulcher, B.D., Jones, N.S. (2014).
    |  Highly comparative feature-based time-series classification.
    |  Knowledge and Data Engineering, IEEE Transactions on 26, 3026–3037.

    :param x: the time series to calculate the feature of
    :type x: numpy.ndarray
    :param lag: the lag that should be used in the calculation of the feature
    :type lag: int
    :return: the value of this feature
    :return type: float
    """
    n = len(x)
    x = np.asarray(x)
    if 2 * lag >= n:
        return 0
    else:
        one_lag = _roll(x, -lag)
        two_lag = _roll(x, 2 * -lag)
        return np.mean(
            (two_lag * two_lag * one_lag - one_lag * x * x)[0 : (n - 2 * lag)]
        )

In [None]:
# 函数测试诊断：单窗口
train_window = 60
time1 = time.time()
is_intersect = False

get_Xt = num_peaks
note = None

main(get_Xt=get_Xt,
     start_day=20210101,
     end_day=20221231,
     skip_first_day=True,
     step=5,
     note=note,
     is_display=True,
     )

print(f'Calculation has completed!')
time2 = time.time()
print(f'Total time cost = {(time2 - time1)/60 :.2f} mins.')