# 情绪投资策略(2014-2022年全A股市场)

## 1.数据准备

### 1.1 加载面板数据

In [1]:
%%time
import sys
import os

sys.path.append('/home/ubuntu/notebook/Investor-Sentiment')


def load_data():
    """Make sure both panel datasets exist locally, downloading them if not.

    Lists ``./DataSets/`` and, when either required parquet file is absent,
    runs the project's ``DownLoader`` with ``MAX_CORE=10``.
    """
    # Datasets: per-stock K-line (bar) panel and per-stock fundamentals panel
    required = {'ASHARE_BAR_PANEL.parquet', 'ASHARE_BASIC_PANEL.parquet'}
    missing = required.difference(os.listdir('./DataSets/'))
    if missing:
        # Imported lazily: only needed when something has to be downloaded
        from loader.findata_loader import DownLoader
        DownLoader(MAX_CORE=10).load_data()


# Ensure the required parquet panels are present before any cell reads them
load_data()

CPU times: user 58 µs, sys: 9 µs, total: 67 µs
Wall time: 72.7 µs


In [2]:
%%time
import cudf
import pandas as pd


def extract_data():
    """Build the merged stock/market panel used by the rest of the notebook.

    Reads the per-stock bar and fundamentals panels from ``./DataSets/``,
    reads two daily series from the ``FIN_DAILY_INDEX`` schema, and left-joins
    the daily series onto the stock panel by ``trade_date``.

    Returns:
        A cudf DataFrame indexed by ``(trade_date, ts_code)``, sorted
        ascending on both levels.
    """
    key_cols = ['trade_date', 'ts_code']

    def extract_panel():
        """Load stock returns and market value and place them side by side."""
        # Per-stock K-line data; pct_chg is exposed as share_return
        bars = cudf.read_parquet('./DataSets/ASHARE_BAR_PANEL.parquet', columns=key_cols + ['pct_chg'])
        bars = bars.rename(columns={'pct_chg': 'share_return'})

        # Per-stock fundamentals; only total_mv is requested
        basics = cudf.read_parquet('./DataSets/ASHARE_BASIC_PANEL.parquet', columns=key_cols + ['total_mv'])

        # Column-wise combination of the two panels
        panel = cudf.concat([bars, basics], join="left", axis=1, sort=True)

        # "Compress data" step.
        # NOTE(review): the result of astype is not assigned; whether this
        # changes anything in place depends on the cudf version - confirm.
        panel.index.levels[1].astype('category', inplace=True)
        return panel

    def extract_time_series():
        """Load the stock-index return and the SHIBOR 3m rate as one daily frame."""
        from utils.sql import DB
        db_loader = DB()

        def read_daily(table, value_col, renamed):
            # One table -> single-column frame keyed by an integer trade_date
            frame = pd.read_sql_table(table, db_loader.ENGINE, 'FIN_DAILY_INDEX',
                                      columns=['trade_date', value_col])
            frame = frame.astype(dtype={'trade_date': 'uint32'}).set_index('trade_date')
            return frame.rename(columns={value_col: renamed})

        # Stock index data (table 399300.SZ)
        index_returns = read_daily('399300.SZ', 'pct_chg', 'shareindex_return')

        # SHIBOR data; the 3m quote is divided by 360
        # NOTE(review): presumably converts an annualised rate to a daily one - confirm.
        riskfree = read_daily('SHIBOR', '3m', 'riskfree_return')/360

        daily = pd.concat([index_returns, riskfree], join="inner", axis=1, sort=True)
        return cudf.from_pandas(daily)

    # Panel first, then the daily series, then a left merge on trade_date
    panel = extract_panel()
    daily = extract_time_series()
    merged = cudf.merge(left=panel.reset_index(), right=daily.reset_index(),
                        left_on='trade_date', right_on='trade_date', how="left",
                        sort=True)
    merged = merged.set_index(['trade_date', 'ts_code'])
    return merged.sort_index(ascending=[True, True])


# Merged panel indexed by (trade_date, ts_code); still a cudf frame at this point
df_panel = extract_data()

CPU times: user 3.45 s, sys: 1.88 s, total: 5.33 s
Wall time: 4.79 s


### 1.2 筛选和清洗数据

In [3]:
# Keep rows whose integer trade_date is 20140101 or later, then convert the
# cudf frame to pandas for the CPU-side processing below.
trade_dates = df_panel.index.get_level_values('trade_date')
df_panel = df_panel[trade_dates >= 20140101].to_pandas()
df_panel

Unnamed: 0_level_0,Unnamed: 1_level_0,share_return,total_mv,shareindex_return,riskfree_return
trade_date,ts_code,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
20140102,000001.SZ,-0.1641,1.002537e+07,-0.3454,0.015460
20140102,000002.SZ,-0.4972,8.799966e+06,-0.3454,0.015460
20140102,000004.SZ,1.3734,9.917646e+04,-0.3454,0.015460
20140102,000005.SZ,-0.4000,2.276691e+05,-0.3454,0.015460
20140102,000006.SZ,-1.2164,6.574476e+05,-0.3454,0.015460
...,...,...,...,...,...
20221130,872374.BJ,-1.7259,,0.1199,0.006092
20221201,301290.SZ,-8.7349,4.856403e+05,1.0831,0.006103
20221201,301311.SZ,12.7436,5.414400e+05,1.0831,0.006103
20221201,870199.BJ,-3.1447,1.498420e+05,1.0831,0.006103


## 2.构造截面异质波动率与市值高低组合

### 2.1 计算面板数据的异质波动率IDVOL

In [9]:
%%time
from statsmodels.regression.rolling import RollingOLS
# 多线程分组计算
from pandarallel import pandarallel


def roll_idvol(df_code: pd.DataFrame, ols_window: int, var_ma: int) -> pd.DataFrame:
    """Estimate idiosyncratic volatility for one stock with a rolling OLS.

    Fits ``Y = Alpha + Beta * X`` with ``RollingOLS`` over ``ols_window``
    rows, computes the residual of each row from the parameters reported for
    that row, and takes the rolling sample variance (``ddof=1``) of those
    residuals over ``var_ma`` rows.

    Args:
        df_code: Rows of a single stock (a pandas frame; the panel is
            converted with ``to_pandas()`` before grouping) with columns
            ``share_return``, ``total_mv``, ``Y``, ``CONST`` and ``X``.
        ols_window: Number of rows in each rolling regression window.
        var_ma: Number of rows in the rolling variance window.

    Returns:
        A frame on ``df_code``'s index with columns ``share_return``,
        ``total_mv`` and ``Idvol``. If the estimation raises, an empty frame
        (no rows, no columns) whose index carries ``df_code``'s index names
        is returned instead so the group drops out of the combined result.
    """
    try:
        # Estimate the rolling intercept and slope
        model = RollingOLS(endog=df_code[['Y']], exog=df_code[['CONST', 'X']], window=ols_window)
        df_para = model.fit().params.rename(columns={'CONST': 'Alpha', 'X': 'Beta'})

        # Parameters share df_code's index, so a column-wise concat lines them up
        df_con = pd.concat([df_code, df_para], axis=1)

        # Residual = actual - fitted (the variance below does not depend on the sign)
        df_con['Residual'] = df_con['Y'] - (df_con['Alpha'] + df_con['Beta']*df_con['X'])

        # Built-in rolling variance: same ddof=1 estimate as applying np.var per
        # window, without a Python callback and without relying on numpy having
        # been imported by an earlier cell.
        df_con['Idvol'] = df_con['Residual'].rolling(var_ma).var(ddof=1)
        return df_con[['share_return', 'total_mv', 'Idvol']]

    except Exception:
        # Deliberate best-effort: a stock that cannot be estimated contributes
        # nothing rather than aborting the whole parallel run.
        index_names = list(df_code.index.names)
        return pd.DataFrame(columns=index_names).set_index(index_names)


# Regression variables for the CAPM-style regression
# (r_i - r_f) = a + b * (r_m - r_f), which roll_idvol estimates per stock.
df_panel['Y'] = df_panel['share_return'] - df_panel['riskfree_return']
df_panel['CONST'] = 1  # constant column so the regression has an intercept
df_panel['X'] = df_panel['shareindex_return'] - df_panel['riskfree_return']

# Speed-up: pandarallel runs roll_idvol once per ts_code group on its workers,
# with a 5-row OLS window and a 30-row variance window.
# droplevel(2) removes the third index level of the combined result.
# NOTE(review): presumably that level is the ts_code duplicated by the
# group key - confirm against the actual output index.
pandarallel.initialize(progress_bar=True)
df_out = (df_panel.groupby(level=['ts_code'])[['share_return', 'total_mv', 'Y', 'CONST', 'X']]
          .parallel_apply(lambda x: roll_idvol(x, 5, 30))).droplevel(2)

INFO: Pandarallel will run on 10 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=228), Label(value='0 / 228'))), HB…

CPU times: user 1.67 s, sys: 427 ms, total: 2.1 s
Wall time: 4.36 s


### 2.2 按照异质波动率分组

上面的面板数据计算完成后,可以直接从本节开始运行:下面的单元从 `../DataSets/TEMP_PANEL_FINAL_IDVOL.parquet` 读取数据。

In [None]:
%%time
import pandas as pd
import numpy as np

# Cross-sectional cut-off: stocks at or above this daily idvol quantile are "HIGH"
QUANTILE = 0.4
df_panel = (
    pd.read_parquet('../DataSets/TEMP_PANEL_FINAL_IDVOL.parquet')
    .drop(columns='ts_code').reset_index().drop(columns='level_1').set_index(['trade_date', 'ts_code']).sort_index()
)

# Grouping: per trade_date, compare each stock's idvol with that day's
# QUANTILE-th quantile and label it HIGH or LOW.
df_panel['idvol_top'] = df_panel['idvol'].groupby(level=['trade_date']).transform(
    lambda x: x.quantile(QUANTILE))
# NOTE(review): a missing idvol compares False and is therefore labelled
# "LOW" - confirm that is intended rather than excluding those rows.
df_panel['idvol_group'] = np.where(df_panel['idvol'] >= df_panel['idvol_top'], 'HIGH', "LOW")
df_panel['idvol_group'] = df_panel['idvol_group'].astype('category')
df_panel = df_panel.reset_index().set_index(['trade_date', 'idvol_group', 'ts_code']).sort_index()

# Market-value weight of each stock inside its (trade_date, idvol_group) cell.
# The group total uses pandas' NaN-skipping sum: with the builtin sum() a
# single missing total_mv made the whole group's total (and every weight in
# it) NaN. min_count=1 keeps an all-missing group as NaN instead of 0.
df_panel['mv_ratio'] = (df_panel['total_mv']/
                        df_panel.groupby(level=['trade_date', 'idvol_group'])['total_mv']
                        .transform('sum', min_count=1))
# Value-weighted group return: sum of weight * pct_chg within each cell,
# again skipping missing contributions instead of propagating NaN.
df_panel['idvol_vw_ratio'] = df_panel['mv_ratio']*df_panel['pct_chg']
df_panel['idvol_group_return'] = (df_panel.groupby(level=['trade_date', 'idvol_group'])['idvol_vw_ratio']
                                  .transform('sum', min_count=1))
df_panel

In [None]:
# Extract the group-level data: keep the last row of every
# (trade_date, idvol_group) pair and return to a flat, freshly numbered frame.
df_panel = (
    df_panel[['idvol_group_return', 'img_neg', 'tex_neg']]
    .reset_index()
    .drop_duplicates(subset=['trade_date', 'idvol_group'], keep='last')
    .reset_index(drop=True)
)
df_panel

In [None]:
# Convert to time-series form: one row per trade_date, one column per idvol_group
df_series = df_panel.pivot(index='trade_date', columns='idvol_group', values='idvol_group_return')
# Store trade_date as a string before using it as the index
df_series = df_series.reset_index().astype({'trade_date': 'str'})
df_series = df_series.set_index('trade_date')
df_series

In [None]:
# Join the other daily data
import os

from sqlalchemy import create_engine

# The connection URL can be supplied through the FIN_DB_URL environment
# variable so credentials do not have to live in the notebook; when it is not
# set, the original local connection string is used unchanged.
ENGINE = create_engine(os.environ.get('FIN_DB_URL', 'mysql+mysqlconnector://root:1111@localhost:3306'))
df_index = (pd.read_sql_table('TEMP_MERGE_INDEX', ENGINE, schema='FIN_DAILY_INDEX')
            .set_index('trade_date').sort_index())
# Index-aligned join on trade_date; rows with any missing value are dropped
df_new = df_series.join(df_index).dropna(axis=0)
df_new

## 3.按照观测窗口构造投资策略

In [None]:
def cal_return(df, MA):
    """Back-test the sentiment timing rule for one moving-average window.

    For each row the previous row's signal ``img_neg >= rolling mean of
    img_neg over MA rows`` decides the return: when the signal is set the
    strategy earns ``-HIGH``, otherwise ``index_return``. Returns are turned
    into cumulative values with ``(r + 100) / 100`` compounded over the rows.

    Args:
        df: Daily frame with at least the columns ``img_neg``, ``HIGH`` and
            ``index_return``. It is not modified.
        MA: Number of rows in the moving-average window.

    Returns:
        A new DataFrame holding the rows of ``df`` that have no missing value
        (including the rolling mean and the shifted signal), with the added
        columns ``img_neg_m{MA}``, ``is_ma_img``, ``img_return``,
        ``mv_csi300`` and ``mv_img_{MA}``.
    """
    # Work on a copy: the caller's frame must not gain columns or lose rows
    out = df.copy()

    ma_col = f'img_neg_m{MA}'
    out[ma_col] = out['img_neg'].rolling(MA).mean()

    # Signal relative to the historical mean, lagged one row so that each
    # row trades on the previous row's observation
    out['is_ma_img'] = (out['img_neg'] >= out[ma_col]).shift(1)

    # Drop incomplete rows first (the lag leaves the first signal missing, and
    # a missing value would otherwise count as "true"); what remains can be
    # stored as a plain boolean column
    out.dropna(axis=0, inplace=True)
    out['is_ma_img'] = out['is_ma_img'].astype(bool)

    # Signal set: take the negative of the HIGH return; otherwise the index return
    out['img_return'] = np.where(out['is_ma_img'], -out['HIGH'], out['index_return'])

    # Convert the returns (the +100 / 100 form treats them as percentages) to cumulative values
    out['mv_csi300'] = ((out['index_return'] + 100)/100).cumprod(axis=0)
    out['mv_img'] = ((out['img_return'] + 100)/100).cumprod(axis=0)

    return out.rename(columns={'mv_img': f'mv_img_{MA}'})


# Run the back-test for every look-back window in the list, then keep only
# the cumulative-value columns (those whose name contains "mv_").
df_in = df_new
for i in [5]:
    df_in = cal_return(df_in, i)
df_in = df_in.filter(like='mv_')
df_in

In [None]:
# Save the cumulative-value series (index included) as CSV
df_in.to_csv('../DataSets/invest.csv')