In [None]:
# 引入库
import sys

# q_tools lives two directories up; it must be on sys.path before importing it.
sys.path.append('../..')
from q_tools.Tdays import (Tdaysoffset, get_trade_period)
from q_tools.BuildStockPool import Filter_Stocks
from q_tools.Stragegy_performance import (get_performance_table,
                                              get_information_table,
                                              calc_mono_score)

import functools
import warnings
from typing import (List, Tuple, Dict, Callable, Union, Iterator)
from tqdm import tqdm_notebook

import alphalens as al
import alphalens.performance as perf
import statsmodels.api as sm
from scipy.stats import pearsonr
import pandas as pd
import numpy as np
import empyrical as ep

# NOTE: star import kept in its original position; moving it could change
# which names it shadows.
from jqdata import *
from jqfactor import (calc_factors, Factor)

import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt

plt.rcParams.update({
    'font.sans-serif': ['SimHei'],  # a font that can render Chinese labels
    'axes.unicode_minus': False,  # draw '-' as an ASCII hyphen so negative ticks render with SimHei
})

In [None]:
"""因子构建相关"""
# 修饰器-分钟级数据量大, 用此部分按标的分批请求 (decorator: split large minute-level requests into batches of securities)


def split_securities(Limit: int = 500):
    """Decorator factory: run the wrapped function over batches of securities.

    The wrapped function's first argument is the security list (passed
    positionally or as the ``securities`` keyword). A single code string is
    promoted to a one-element list. When there are more than ``Limit`` codes,
    the function is called once per consecutive batch of at most ``Limit``
    codes and the results are joined with ``pd.concat``; otherwise it is
    called once with the full list.

    Args:
        Limit (int): maximum number of securities per call.

    Returns:
        Callable: the decorator.
    """
    def decorator(func: Callable):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            # The security list may arrive by keyword or as the first positional arg.
            if 'securities' in kw:
                codes = kw.pop('securities')
            else:
                codes, args = args[0], args[1:]

            if isinstance(codes, str):
                codes = [codes]

            total = len(codes)
            if total <= Limit:
                return func(codes, *args, **kw)

            batches = (codes[start:start + Limit]
                       for start in range(0, total, Limit))
            return pd.concat(func(batch, *args, **kw) for batch in batches)

        return wrapper

    return decorator

# 获取日内数据


@split_securities()
def get_intraday_price(securities: Union[str, List], end_date: str,
                       count: int) -> pd.DataFrame:
    """Fetch 1-minute close/volume bars for ``count`` days ending on ``end_date``.

    ``split_securities`` batches ``securities`` (500 per request by default)
    and concatenates the per-batch frames.

    Args:
        securities (Union[str, List]): security code(s).
        end_date (str): observation date; the bars end at 15:00:00 that day.
        count (int): number of days; 240 one-minute bars are requested per day.

    Returns:
        pd.DataFrame: ``get_price`` output with panel=False (presumably one row
        per (time, code) — downstream code relies on 'time' and 'code' columns).
    """
    bars_per_day = 240
    last_bar = end_date + ' 15:00:00'
    return get_price(securities,
                     end_date=last_bar,
                     count=bars_per_day * count,
                     fields=['close', 'volume'],
                     frequency='1m',
                     panel=False)


# 计算相关系数
def calc_corr(price: pd.DataFrame) -> pd.Series:
    """Per-day Pearson correlation between minute close price and volume.

    Args:
        price (pd.DataFrame): bars indexed by a DatetimeIndex, with 'close'
            and 'volume' columns.

    Returns:
        pd.Series: correlation coefficient per day, indexed by the
        normalized (midnight) date.
    """
    # Group every intraday bar by its calendar day (computed once).
    trade_day = price.index.normalize()
    return price.groupby(trade_day).apply(
        lambda day: pearsonr(day['close'], day['volume'])[0])


# 计算标准分
def scaling_z_score(ser: pd.Series) -> pd.Series:
    """Z-score a factor: subtract its mean, divide by its sample std.

    Args:
        ser (pd.Series): factor values (NaN is ignored by mean/std and
            stays NaN in the output).

    Returns:
        pd.Series: standardized factor with the same index.
    """
    centred = ser - ser.mean()
    return centred / ser.std()


# 计算残差
def calc_ols(x: pd.Series, y: pd.Series,
             method: str = 'resid') -> pd.Series:
    """Regress ``y`` on ``x`` plus an intercept with statsmodels OLS.

    Args:
        x (pd.Series | pd.DataFrame | array-like): regressor(s). A pandas
            ``x`` that shares labels with ``y`` is aligned to ``y.index`` by
            label; otherwise (plain arrays, or no shared labels) rows are
            paired by position. Missing values become 0.
        y (pd.Series): dependent variable; NaN is treated as 0.
        method (str): attribute of the fitted results to return, e.g.
            'resid' (default) or 'params'.

    Returns:
        The requested attribute of the fitted results (residuals by default).
    """
    if isinstance(x, (pd.Series, pd.DataFrame)) and not x.index.equals(y.index):
        if not x.index.intersection(y.index).empty:
            # Pair rows by label, not position: the regressors can list codes
            # in a different order (or a superset) compared with the factor.
            x = x.reindex(y.index)

    # Missing regressor values become 0 (the mean, for z-scored inputs).
    design = np.nan_to_num(np.asarray(x, dtype=float))
    # Always add the intercept, even if a regressor column happens to be
    # constant, so 'params' keeps a fixed [const, x...] layout.
    design = sm.add_constant(design, has_constant='add')

    result = sm.OLS(y.fillna(0), design).fit()
    return getattr(result, method)


def Culling_factor(factor_ser: pd.Series,
                   other_factor: Union[pd.Series, pd.DataFrame],
                   scaling: bool = True) -> pd.Series:
    """Remove the part of a factor explained by other factors.

    The factor is regressed on ``other_factor`` (with intercept) and the OLS
    residual is kept, optionally z-scored.

    Args:
        factor_ser (pd.Series): factor to purify (dependent variable).
        other_factor (pd.Series | pd.DataFrame): control factor(s) used as
            regressors (e.g. market cap or reversal factors).
        scaling (bool): whether to z-score the residual.

    Returns:
        pd.Series: the residual, z-scored when ``scaling`` is True.
    """
    residual = calc_ols(other_factor, factor_ser, 'resid')
    return scaling_z_score(residual) if scaling else residual


@split_securities()
def _calc_pv_corr_stats(securities: Union[list, str], end_date: str,
                        count: int) -> pd.DataFrame:
    """Per-stock statistics of the daily price-volume correlation (batched).

    Only quantities that depend on a single security are computed here, so
    the ``split_securities`` batching (which bounds each minute-data request)
    does not change the result.

    Returns:
        pd.DataFrame: index code; columns 'pv_corr_avg', 'pv_corr_std' and
        'pv_beta' (OLS slope of the daily correlation on a 1..N trend).
    """
    price = get_intraday_price(securities, end_date=end_date, count=count)
    # rows: days, columns: codes
    daily_corr = price.set_index('time').groupby('code').apply(calc_corr).T
    # slope coefficient = params[1]; np.asarray makes the positional lookup
    # independent of how statsmodels labels the parameters
    pv_beta = daily_corr.apply(lambda col: np.asarray(
        calc_ols(np.arange(1, len(col) + 1), col, 'params'))[1])
    return pd.DataFrame({'pv_corr_avg': daily_corr.mean(),
                         'pv_corr_std': daily_corr.std(),
                         'pv_beta': pv_beta})


def calc_pv_corr(securities: Union[list, str], end_date: str, count: int,
                 market_cap: pd.Series) -> pd.DataFrame:
    """Compute the market-cap-neutralised price-volume correlation factors.

    Per-stock statistics are computed in batches; the cross-sectional steps
    (market-cap z-score, neutralisation regression, z-scored composite) run
    once over the whole universe, so results do not depend on batch size.

    Args:
        securities (Union[list, str]): security code(s).
        end_date (str): observation date.
        count (int): lookback window in days.
        market_cap (pd.Series): market cap indexed by code.

    Returns:
        pd.DataFrame: index code; columns 'pv_corr_avg', 'pv_corr_std',
        'pv_corr' and 'pv_corr_trend'.
    """
    stats = _calc_pv_corr_stats(securities, end_date=end_date, count=count)

    # Align market cap by code to the stocks that actually have statistics,
    # so the regressor rows line up with the factor rows.
    cap = scaling_z_score(market_cap.reindex(stats.index))

    # market-cap neutralisation (raw residuals, not re-scaled)
    pv_corr_avg = Culling_factor(stats['pv_corr_avg'], cap, False)
    pv_corr_std = Culling_factor(stats['pv_corr_std'], cap, False)
    pv_beta = Culling_factor(stats['pv_beta'], cap, False)

    # composite: equal-weight sum of the z-scored mean and volatility factors
    pv_corr = scaling_z_score(pv_corr_avg) + scaling_z_score(pv_corr_std)

    df = pd.concat((pv_corr_avg, pv_corr_std, pv_corr, pv_beta), axis=1)
    df.columns = ['pv_corr_avg', 'pv_corr_std', 'pv_corr', 'pv_corr_trend']

    return df


class PV_corr(Factor):
    """jqfactor factor producing the CPV price-volume correlation family.

    ``calc`` stores its outputs on the instance (``market_cap``,
    ``other_factor``, ``pv_corr_factor``) instead of returning them.
    """

    # NOTE(review): executed once at class-definition time; it silences every
    # warning in the process, not only those raised by this factor.
    warnings.filterwarnings("ignore")
    name = 'PV_corr'
    max_window = 20  # lookback window; candidate for sensitivity analysis
    # ROC20: 20-day momentum; VOL20: 20-day turnover.
    # No volatility dependency is requested, so it is computed from 'close'.
    dependencies = ['market_cap', 'ROC20', 'VOL20', 'close']

    def calc(self, data: Dict) -> pd.Series:
        """Compute all PV_corr factors for the last date in ``data``.

        Args:
            data (Dict): dependency frames keyed by the names in
                ``dependencies``; index is dates, columns are codes.

        Returns:
            None. Results are stored on ``self``. NOTE(review): the annotation
            says pd.Series — confirm what ``calc_factors`` expects.
        """
        codes = data['market_cap'].columns.tolist()
        tradeDate = data['market_cap'].index[-1].strftime('%Y-%m-%d')
        market_cap = data['market_cap'].iloc[-1]

        # Control factors, labelled so the frame has unique column names.
        volatility: pd.Series = data['close'].pct_change().std()  # return volatility over the window
        roc: pd.Series = data['ROC20'].iloc[-1]
        vol: pd.Series = data['VOL20'].iloc[-1]
        other_factor: pd.DataFrame = pd.concat(
            (roc, vol, volatility), axis=1,
            keys=['ROC20', 'VOL20', 'volatility'])
        # cross-sectional z-score of each control over the full universe
        other_factor = other_factor.apply(scaling_z_score)

        df = calc_pv_corr(securities=codes,
                          end_date=tradeDate,
                          count=self.max_window,
                          market_cap=market_cap)

        # df is indexed by the codes returned from the minute-data query,
        # which can differ from `codes` in order and membership; align the
        # controls by code so each regression row refers to the same stock.
        controls = other_factor.reindex(df.index)

        # pv_corr_deret20: controls removed from the mean and the volatility
        # of the correlation, each z-scored, summed with equal weight
        pv_corr_avg_: pd.Series = Culling_factor(df['pv_corr_avg'],
                                                 controls, True)
        pv_corr_std_: pd.Series = Culling_factor(df['pv_corr_std'],
                                                 controls, True)
        df['pv_corr_deret20'] = pv_corr_avg_ + pv_corr_std_

        # pv_corr_trend with the controls removed (not re-scaled)
        df['pv_corr_trend'] = Culling_factor(df['pv_corr_trend'], controls,
                                             False)

        # CPV composite
        df['CPV'] = scaling_z_score(df['pv_corr_deret20']) + scaling_z_score(
            df['pv_corr_trend'])

        self.market_cap: pd.Series = market_cap
        self.other_factor: pd.DataFrame = other_factor
        self.pv_corr_factor: pd.DataFrame = df


"""因子获取"""


def get_factor(symbol: str, tradeDt) -> Iterator[pd.DataFrame]:
    """Yield the PV_corr factor frame for each rebalance date.

    Args:
        symbol (str): stock-pool universe passed to Filter_Stocks ('A' = all A shares).
        tradeDt: the rebalance dates to iterate over; a single date string is
            also accepted. (Name kept for backward compatibility.)

    Yields:
        pd.DataFrame: ``PV_corr.pv_corr_factor`` computed for that date.
    """
    # Iterate the dates passed in by the caller (previously a global
    # `periods` was used and this argument was ignored).
    dates = [tradeDt] if isinstance(tradeDt, str) else tradeDt

    for trade_date in tqdm_notebook(dates, desc='因子获取'):

        stock_pool_func = Filter_Stocks(symbol, trade_date)
        stock_pool_func.filter_paused(21, 20)  # drop stocks suspended more than 20 of the last 21 days
        stock_pool_func.filter_st()  # drop ST stocks
        stock_pool_func.filter_ipodate(60)  # drop recent listings (threshold 60)

        pv_corr_factor = PV_corr()
        codes = stock_pool_func.securities
        calc_factors(codes, [pv_corr_factor], start_date=trade_date,
                     end_date=trade_date)
        yield pv_corr_factor.pv_corr_factor


def get_freq_price(security: Union[List, str], periods: List) -> pd.DataFrame:
    """Lazily fetch the post-adjusted close on each date in ``periods``.

    Args:
        security (Union[List, str]): security code(s).
        periods (List): dates; one request is made per date.

    Yields:
        pd.DataFrame: ``get_price`` output (count=1, fields='close',
        fq='post', panel=False) ending on that date.
    """
    request = dict(count=1, fields='close', fq='post', panel=False)
    for end in tqdm_notebook(periods, desc='获取收盘价数据'):
        yield get_price(security, end_date=end, **request)


def get_pricing(factor_df: pd.DataFrame, last_periods: str = None) -> pd.DataFrame:
    """Fetch a close-price table covering every date and stock in ``factor_df``.

    Args:
        factor_df (pd.DataFrame): factor data, MultiIndex level 0 date,
            level 1 code.
        last_periods (str, optional): extra date appended after the factor
            dates (skipped if already present). Defaults to None.

    Returns:
        pd.DataFrame: close prices from ``get_freq_price``, index 'time',
        columns 'code'.
    """
    # Use the values actually present in the index: MultiIndex.levels can
    # retain stale entries after the frame has been filtered.
    periods = factor_df.index.get_level_values(0).unique().sort_values().tolist()
    if last_periods is not None:
        last = pd.to_datetime(last_periods)
        if last not in periods:
            periods.append(last)

    securities = factor_df.index.get_level_values(1).unique().tolist()

    # 获取收盘价 (fetch closes)
    price_df = pd.concat(list(get_freq_price(securities, periods)))
    return pd.pivot_table(price_df,
                          index='time',
                          columns='code',
                          values='close')


class get_factor_returns(object):
    """Quantile / long-short return analysis of one factor column via alphalens.

    Call ``get_calc`` first, then ``long_short``.
    """

    def __init__(self, factors: pd.DataFrame, factor_name: str, max_loss: float) -> None:
        """
        Args:
            factors (pd.DataFrame): MultiIndex (level 0 date, level 1 asset),
                one column per factor.
            factor_name (str): column of ``factors`` to analyse.
            max_loss (float): forwarded as ``max_loss`` to alphalens
                ``get_clean_factor_and_forward_returns``.
        """
        self.factors = factors
        self.factor_name = factor_name
        self.name = self.factor_name
        self.max_loss = max_loss

    def get_calc(self, pricing: pd.DataFrame, periods: Tuple = (1,), quantiles: int = 5) -> pd.DataFrame:
        """Clean the factor, compute forward returns and per-quantile returns.

        Sets ``factors_frame``, ``group_returns`` and ``group_cum_returns``.

        Args:
            pricing (pd.DataFrame): price table (index date, columns asset).
            periods (Tuple): forward-return horizons passed to alphalens.
            quantiles (int): number of quantile buckets.

        Returns:
            pd.DataFrame: the cleaned alphalens frame (also ``factors_frame``).
        """
        factor_ser: pd.Series = self.factors[self.factor_name]
        preprocessing_factor = al.utils.get_clean_factor_and_forward_returns(factor_ser,
                                                                             pricing,
                                                                             periods=periods,
                                                                             quantiles=quantiles,
                                                                             max_loss=self.max_loss)

        # cleaned factor with forward returns
        self.factors_frame = preprocessing_factor

        # Mean forward return per date and quantile (pivot_table default aggfunc).
        # NOTE(review): values=1 assumes the forward-return column is labelled 1
        # (default periods=(1,)); other periods or alphalens versions that label
        # columns like '1D' will not match.
        self.group_returns = pd.pivot_table(preprocessing_factor.reset_index(
        ), index='date', columns='factor_quantile', values=1)

        # cumulative return per quantile
        self.group_cum_returns = ep.cum_returns(
            self.group_returns)

        return preprocessing_factor

    def long_short(self, lower: int = 1, upper: int = 5) -> pd.Series:
        """Long-short return: quantile ``upper`` minus quantile ``lower``.

        Sets ``long_short_returns`` and ``long_short_cum``.

        Returns:
            pd.Series: the per-period long-short return.

        Raises:
            ValueError: if ``get_calc`` has not been run yet.
        """
        # A missing attribute raises AttributeError (not NameError), so check
        # explicitly to surface the intended message.
        if not hasattr(self, 'group_returns'):
            raise ValueError('请先执行get_calc')

        self.long_short_returns = self.group_returns[upper] - \
            self.group_returns[lower]
        self.long_short_returns.name = f'{self.name}_excess_ret'

        self.long_short_cum = ep.cum_returns(self.long_short_returns)
        self.long_short_cum.name = f'{self.name}_excess_cum'

        return self.long_short_returns

In [None]:
# Compute the factor on each rebalance date.
# NOTE(review): 'ME' is interpreted by q_tools.get_trade_period — presumably month-end dates.
periods = get_trade_period('2014-01-01', '2021-10-31', 'ME')
factor_dfs = list(get_factor('A', periods))

# Stack the per-date frames into one (date, asset) MultiIndex frame.
factor_df = pd.concat(dict(zip(periods, factor_dfs)), names=['date', 'asset'])

# Cache to CSV so later cells can start from disk.
factor_df.to_csv('cpv.csv')

In [None]:
# Reload the cached factor frame; the first two CSV columns form the (date, asset) index.
factor_df = pd.read_csv(
    'cpv.csv',
    index_col=[0, 1],
    parse_dates=['date'],
)

# Close prices on every factor date plus one extra date, presumably so the
# last factor date also gets a forward return.
pricing = get_pricing(factor_df, last_periods='2021-11-30')

In [None]:
# Peek at the last rows to show the frame layout.
factor_df.tail(n=5)

In [None]:
# Distribution of each factor column, one histogram per panel.
fig, axes = plt.subplots(2, 3, figsize=(18, 9))

axes = list(axes.flat)
for ax, (name, ser) in zip(axes, factor_df.items()):
    ax.set_title(name)
    ser.hist(ax=ax)

In [None]:
# Run the quantile backtest for every factor column.
factor_name = factor_df.columns.tolist()

res_dic = dict()
for name in factor_name:
    res = get_factor_returns(factor_df, name, max_loss=0.4)
    res.get_calc(pricing=pricing)
    # lower=5, upper=1: long-short = quantile 1 minus quantile 5
    res.long_short(lower=5, upper=1)
    res_dic[name] = res

In [None]:
# Cumulative return per quantile, with each factor's own long-short curve dashed.
fig, axes = plt.subplots(2, 3, figsize=(19, 9))
axes = [i for x in axes for i in x]
for ax, (k, v) in zip(axes, res_dic.items()):
    v.group_cum_returns.plot(
        ax=ax,
        title=k,
        color=['Navy', 'LightGrey', 'DimGray', 'DarkKhaki', 'LightSteelBlue'])
    # Plot this panel's factor (`v`), not whatever `res` the backtest loop left behind.
    v.long_short_cum.plot(ax=ax, ls='--', color='red')

plt.subplots_adjust(hspace=0.5)

In [None]:
# Mean period return per quantile, annotated with the monotonicity score.
fig, axes = plt.subplots(2, 3, figsize=(19, 9))
axes = list(axes.flat)
for ax, (k, v) in zip(axes, res_dic.items()):
    mono_score = calc_mono_score(v.group_returns)
    # y ticks as percentages (one formatter per axis)
    ax.yaxis.set_major_formatter(
        mpl.ticker.FuncFormatter(lambda x, pos: '%.2f%%' % (x * 100)))
    ax.text(0.65, 0.95, "单调性得分为:%.3f" % mono_score,
            transform=ax.transAxes,
            verticalalignment='top',
            fontsize=10,
            bbox={'facecolor': 'white', 'alpha': 1, 'pad': 5})
    v.group_returns.mean().plot.bar(ax=ax, title=k, color='#1f77b4')

plt.subplots_adjust(hspace=0.5)

In [None]:
# Report: yearly compounded return of quantile 1, IC summary and the
# performance table of quantile 1, one column per factor.
ret_by_y = pd.DataFrame(columns=list(res_dic.keys()))
ic_df = pd.DataFrame(columns=list(res_dic.keys()))
risk_df = pd.DataFrame(columns=list(res_dic.keys()))
for k, v in res_dic.items():
    # Compound quantile-1 returns within each calendar year; .iloc[-1] takes
    # the final cumulative value (positional [-1] on a date-indexed Series is
    # deprecated in pandas and removed in newer versions).
    ret_by_y[k] = v.group_returns[1].groupby(pd.Grouper(
        level=0, freq='Y')).apply(lambda x: ep.cum_returns(x).iloc[-1])

    ic_df[k] = get_information_table(
        perf.factor_information_coefficient(v.factors_frame)).iloc[:, 0]
    risk_df[k] = get_performance_table(v.group_returns[[1]],
                                       periods='monthly').iloc[:, 0]

ret_by_y.index = ret_by_y.index.strftime('%Y')
ret_by_y.index.names = ['年度']

ret_by_y.style.format('{:.2%}').bar(align='mid', color=['#5fba7d', '#d65f5f'])

In [None]:
# Performance table per factor, highlighting the best value in each row.
risk_df.style.format('{:.2%}').highlight_max(color='#d65f5f', axis=1)

In [None]:
# IC statistics: information-coefficient summary table, one column per factor
ic_df