In [1]:
# coding = utf-8

%matplotlib inline

import os
import config
import pandas as pd
import numpy as np
import time
import datetime
import multiprocessing
import statsmodels.api as sm
from scipy.stats import ttest_1samp

from PyFin.api import *
from PyFin.api import makeSchedule
from alphamind.api import *
from alphamind.data.processing import factor_processing
from alphamind.data.standardize import standardize
from alphamind.data.winsorize import winsorize_normal
from alphamind.analysis.quantileanalysis import er_quantile_analysis

from sqlalchemy import create_engine, select, and_, or_
from sqlalchemy.pool import NullPool
from models import Alpha191
import inspect
import warnings
warnings.filterwarnings("ignore")

In [2]:
%%time
# database engins
engine = SqlEngine(config.alpha_db) # alpha-mind engine
engine191 = create_engine(config.alpha_db, poolclass=NullPool) # alpha191 engine

CPU times: user 20 ms, sys: 4 ms, total: 24 ms
Wall time: 24.1 ms


In [3]:
%%time
# params
factor_name = 'alpha_53'
neutralized_styles = ["SIZE"] + industry_styles
start_date = '2019-01-01'
end_date = '2019-06-06'
universe_name = ['zz500','hs300','ashare']
#universe_name ='ashare'
benchmark_code = 905
freq = '10b'
session = str(int(time.time() * 1000000 + datetime.datetime.now().microsecond))

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 28.6 µs


In [4]:
%%time
# get Alpha191 factors
query = select([Alpha191.trade_date, Alpha191.code, Alpha191.__dict__[factor_name]]).where(
    and_(Alpha191.trade_date >= start_date, Alpha191.trade_date <= end_date, ))
    
factors = pd.read_sql(query, engine191)

CPU times: user 1.04 s, sys: 220 ms, total: 1.26 s
Wall time: 9.29 s


In [6]:
# get risk info and rets
universe = None
for name in universe_name:
    if universe is None:
        universe = Universe(name)
    else:
        universe += Universe(name)
dates = makeSchedule(start_date, end_date, freq, calendar='china.sse')
factor_negMkt = engine.fetch_factor_range(universe, "negMarketValue", dates=dates)
risk_cov, risk_factors = engine.fetch_risk_model_range(universe, dates=dates)
dx_returns = engine.fetch_dx_return_range(universe, dates=dates, horizon=map_freq(freq))

# data combination
total_data = pd.merge(factors, risk_factors, on=['trade_date', 'code'])
total_data = pd.merge(total_data, factor_negMkt, on=['trade_date', 'code'])
total_data = pd.merge(total_data, dx_returns, on=['trade_date', 'code'])
total_data.dropna(inplace=True)

In [7]:
total_data

Unnamed: 0,trade_date,code,alpha_53,srisk,BETA,MOMENTUM,SIZE,EARNYILD,RESVOL,GROWTH,...,IronSteel,NonBankFinan,ELECEQP,AERODEF,Conglomerates,COUNTRY,negMarketValue,chgPct,secShortName,dx
0,2019-02-20,1,58.333333,26.521,-0.084,0.906,1.912,1.621,-0.034,-0.185,...,0,0,0,0,0,1,1.959125e+11,0.0124,平安银行,0.136524
1,2019-02-20,2,50.000000,31.903,0.090,1.018,2.343,1.367,0.560,0.021,...,0,0,0,0,0,1,2.684301e+11,0.0222,万科A,0.064749
2,2019-02-20,4,66.666667,28.263,-1.624,-0.604,-2.900,-0.970,-0.434,0.569,...,0,0,0,0,0,1,1.383000e+09,0.0000,国农科技,0.132541
3,2019-02-20,5,58.333333,21.849,-0.032,-0.349,-2.070,-0.927,-0.535,2.288,...,0,0,0,0,0,1,3.279633e+09,-0.0096,世纪星源,0.187544
4,2019-02-20,6,66.666667,34.499,0.753,-0.602,-1.238,1.091,-0.412,-1.015,...,0,0,0,0,0,1,7.698839e+09,-0.0223,深振业A,0.106388
5,2019-02-20,7,58.333333,100.069,0.858,-3.152,-2.483,-1.139,3.305,-2.652,...,0,0,0,0,1,1,1.915478e+09,0.0214,全新好,0.194383
6,2019-02-20,8,58.333333,54.084,-0.822,-1.400,-0.894,-0.533,1.968,-0.057,...,0,0,0,0,0,1,1.010383e+10,-0.0125,神州高铁,0.184378
7,2019-02-20,9,58.333333,22.950,1.205,-0.656,-0.951,-0.075,-0.927,0.074,...,0,0,0,0,1,1,1.021738e+10,-0.0143,中国宝安,0.255082
8,2019-02-20,10,66.666667,55.760,-0.481,-0.754,-2.243,-3.805,0.267,0.543,...,0,0,0,0,0,1,1.749274e+09,-0.0089,美丽生态,0.206893
9,2019-02-20,11,75.000000,34.803,0.805,-0.632,-1.473,-0.572,-0.173,0.291,...,0,0,0,0,0,1,1.788523e+09,-0.0277,深物业A,0.084844


In [6]:
def prc_win_std(params):
    df = params[0]
    factor_name = params[1]
    ret_preprocess = factor_processing(df[[factor_name]].values,
                                       pre_process=[winsorize_normal, standardize],
                                  )
    df["prc_factor"] = ret_preprocess
    return df

In [7]:
# 数据预处理的步骤
# 去极值、标准化
grouped_list = []
grouped = total_data.groupby(['trade_date'])
for k, g in grouped:
    grouped_list.append([g,factor_name])

cpus = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=cpus) as p:
    alpha_res = p.map(prc_win_std, grouped_list)
total_data = pd.concat(alpha_res).reset_index(drop=True)  

In [8]:
# 中性化
prc_factor = neutralize(total_data[neutralized_styles].values.astype(float),
                        total_data["prc_factor"].values,
                                 groups=total_data['trade_date'])
total_data["prc_factor"] = prc_factor

In [9]:
##写入数据库
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
destination = sa.create_engine("mysql+mysqlconnector://quant:AUsYCJ4cMa@127.0.0.1:3306/quant")
destsession = sessionmaker( bind=destination, autocommit=False, autoflush=True)
def update_destdb(table_name, sets):
    sets = sets.where(pd.notnull(sets), None)
    sql_pe = 'INSERT INTO {0} SET'.format(table_name)
    updates = ",".join( "{0} = :{0}".format(x) for x in list(sets) )
    sql_pe = sql_pe + '\n' + updates
    sql_pe = sql_pe + '\n' +  'ON DUPLICATE KEY UPDATE'
    sql_pe = sql_pe + '\n' + updates
    session = destsession()
    print('update_destdb')
    for index, row in sets.iterrows():
        dict_input = dict( row )
        #dict_input['trade_date'] = dict_input['trade_date'].to_pydatetime()
        session.execute(sql_pe, dict_input)
    session.commit()
    session.close()

In [10]:
task_id = str(int(time.time() * 1000000 + datetime.datetime.now().microsecond))

## 等权五分位存储

In [40]:
n_bins=5
df = pd.DataFrame(columns=['q' + str(i) for i in range(1, n_bins+1)])
grouped = total_data.groupby('trade_date')
for k, g in grouped:
    er = g['prc_factor'].values
    dx_return = g['dx'].values
    res = er_quantile_analysis(er, n_bins=n_bins, dx_return=dx_return, de_trend=True)
    df.loc[k, :] = res
df.index.name = 'trade_date'
df = df.reset_index()

In [41]:
cum_df = df
cum_df['session'] = session
cum_df['factor_name'] = factor_name
cum_df['task_id'] =  task_id
cum_df['q0'] = cum_df.q5 - cum_df.q1
cum_df[['q0','q1','q2','q3','q4','q5']] = cum_df[['q0','q1','q2','q3','q4','q5']].cumsum()
cum_df['trade_date'] = cum_df['trade_date'].apply(lambda x : x.to_pydatetime())
update_destdb('cum_quantile',cum_df)

update_destdb


## 逐年收益

In [13]:
## 逐年收益
yearly_df = df
yearly_df['year'] = yearly_df['trade_date'].apply(lambda x : x.year)
groupd = yearly_df.groupby('year')
new_df = pd.DataFrame(columns=['q' + str(i) for i in range(1, n_bins+1)])
for k, g in groupd:
    new_df.loc[k, :] = g[['q1','q2','q3','q4','q5']].sum()
new_df['session'] = session
new_df['factor_name'] = factor_name
new_df['task_id'] = task_id
new_df.index.name = 'year'
new_df = new_df.reset_index()
update_destdb('yearly_quantile',new_df)

update_destdb


## IC 序列

In [14]:
ic_series = total_data.groupby('trade_date').apply(lambda x: np.corrcoef(x['prc_factor'], x['dx'])[0, 1])
ic_df = pd.DataFrame(ic_series,columns=['ic_values'])
ic_df['session'] = session
ic_df['factor_name'] = factor_name
ic_df['task_id'] = task_id

ic_df = ic_df.reset_index()
ic_df['trade_date'] = ic_df['trade_date'].apply(lambda x : x.to_pydatetime())
update_destdb('ic_serialize',ic_df)

update_destdb


## 行业IR 分析

In [15]:
industry_category = engine.fetch_industry_range(universe, dates=dates)
total_data = pd.merge(total_data, industry_category, on=['trade_date', 'code']).dropna()
industry_ic = total_data.groupby(['trade_date', 'industry']).apply(lambda x: np.corrcoef(x['prc_factor'], x['dx'])[0, 1])
industry_ir = (industry_ic.groupby(level=1).mean() / industry_ic.groupby(level=1).std())

In [16]:
industry_ir_df  = pd.DataFrame(industry_ir,columns=['ir_values'])
industry_ir_df['session'] = session
industry_ir_df['factor_name'] = factor_name
industry_ir_df['task_id'] = task_id
industry_ir_df.index.name='industry_name'
industry_ir_df = industry_ir_df.reset_index()
update_destdb('industry_ir',industry_ir_df)

update_destdb


## IC Decay分析

In [17]:
def factor_shift(params):
    g = params[0]
    ms = params[1]
    for i in range(1, ms+1):
        g["prc_factor_l"+str(i)] = g["prc_factor"].shift(i)
    return g

In [18]:
max_shift = 5

grouped_list = []
grouped = total_data.groupby("code")
for k, g in grouped:
    grouped_list.append([g,max_shift])

cpus = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=cpus) as p:
    alpha_res = p.map(factor_shift, grouped_list)

total_data = pd.concat(alpha_res).reset_index(drop=True)

In [19]:
factor_names = ["prc_factor"]
for i in range(1, max_shift+1):
    factor_names.append("prc_factor_l"+str(i))

values = {}
for f in factor_names:
    ic_series = total_data.groupby('trade_date').apply(lambda x: np.corrcoef(x[f], x['dx'])[0, 1])
    values[f] = ic_series.mean()
values = pd.DataFrame(pd.Series(values))

In [20]:
tvalues = values.T
tvalues['session'] = session
tvalues['factor_name'] = factor_name
tvalues['task_id'] = task_id
tvalues = tvalues.rename(columns={'prc_factor_l1':'l1','prc_factor_l2':'l2',
                               'prc_factor_l3':'l3','prc_factor_l4':'l4',
                               'prc_factor_l5':'l5','prc_factor':'l0'})
update_destdb('ic_decay',tvalues)

update_destdb


## T值序列

In [21]:
grouped = total_data.groupby('trade_date')
fac_rets_ls_series = df.q5 - df.q1
# ## 回归法的因子收益
fac_rets_list = []
t_list = []

# 加权最小二乘
for k, g in grouped:
    X = g[["prc_factor"] + neutralized_styles]
    y = g[["dx"]]
    wts =np.sqrt(g[["negMarketValue"]])
    results = sm.WLS(y,X,weights=wts).fit()
    fac_rets_list.append(results.params[0])
    t_list.append(results.tvalues[0])

fac_rets_series = pd.Series(fac_rets_list, index=fac_rets_ls_series.index)
t_series = pd.Series(t_list, index=fac_rets_ls_series.index)

In [22]:
t_df = pd.DataFrame(t_series.values,index=df.trade_date, columns=['t_values']).reset_index()
t_df['session'] = session
t_df['factor_name'] = factor_name
t_df['task_id'] = task_id
update_destdb('t_serialize',t_df)

update_destdb


## 年化收益率 换手率 IC均值  IR值 T值 换手率计算

In [23]:
#收益率
fac_rets = fac_rets_series.cumsum().values[-1]

#收益率t值
fac_rets_ttest = ttest_1samp(fac_rets_series, 0)
t_rets = fac_rets_ttest.statistic

ic_mean = ic_series.mean()
ic_std = ic_series.std()
ic_marked = len(ic_series[ic_series.abs()>0.02])/len(ic_series)
ir = ic_mean / ic_std

#年化收益率
annualized = cum_df.q0.values[-1] / len(cum_df.q0) * 250

In [49]:
from alphamind.data.quantile import quantile
last_code = None
diff_count = 0
sum_count = 0
res = []
grouped = total_data.groupby('trade_date')
for k, g in grouped:
    er = g['prc_factor'].values
    g['group'] = quantile(er.flatten(), n_bins)
    res.append(g)
turnover_df = pd.concat(res).reset_index()[['trade_date','code','group']]
group_n = 0
if df.q0.cumsum().values[-1] > df.q4.cumsum().values[-1]:
    group_df = turnover_df.set_index('group').loc[0].reset_index()
else:
    group_df = turnover_df.set_index('group').loc[4].reset_index()
grouped = group_df.groupby('trade_date')
for k, g in grouped:
    if last_code is None:
        sum_count = len(g.code.values)
        last_code = g.code.values
    else:
        mix_code = set(g.code.values) & set(last_code)
        print(len(mix_code), len(g.code.values), len(last_code))
        diff_count += (len(g.code.values) - len(mix_code))
        sum_count += len(g.code.values)
diff_count/sum_count

18 100 100
27 100 100
16 100 100
13 100 100
18 100 100
23 100 100
16 100 100
20 100 100
18 100 100
29 100 100


0.7290909090909091

In [53]:
basic_info = {'fac_rets':fac_rets,'t_rets':t_rets,'ic_mean':ic_mean,
             'ic_std':ic_std,'ic_marked':ic_marked,'ir':ir,
             'annualized':annualized,'turnover_rate':turnover_rate,
             'update_time':datetime.datetime.now(),
             'remark':'中证500股票池'}

In [54]:
basic_df = pd.DataFrame([basic_info])
basic_df['session'] = session
basic_df['factor_name'] = factor_name
basic_df['task_id'] = task_id
update_destdb('basic_info',basic_df)

update_destdb
