In [1]:
import pandas as pd
import numpy as np
import time
import bottleneck as bk
import cProfile
import pstats
from scipy import stats
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import summary_table

In [25]:
def time_analyse(code):
    'code 是一段python代码'
    cProfile.run(code, 'test.out')
    p = pstats.Stats('test.out')
    p.sort_stats('tottime').print_stats(10)

In [2]:
def insert_nan(data,percent):
    a = data.ravel()
    nan_numbers = round(len(a)*percent)
    index_list = np.random.randint(0,len(a)-1,nan_numbers)
    a[index_list] = np.nan
    
def get_test_data(a,b,per = 0.2):
    test_data = np.random.random((a,b))
    insert_nan(test_data,per)
    return test_data

In [24]:
test_data = get_test_data(3141,3824)
test_data2 = get_test_data(3141,3824)
test_data3 = get_test_data(3141,3824)

时间序列上的操作

In [None]:
def ts_delay(data,period):
    data_cols = data.shape[1]
    pre_data = np.zeros((period,data_cols))
    pre_data[:] = np.nan
    result = np.vstack([pre_data,data[:-period] ])
    return result

def ts_rank(data,period):
    result = bk.move_rank(data,window=period,axis = 0,min_count=3)
    return (result+1)/2

In [None]:
tsrank_res= ts_rank(test_data,4)

### 线性回归的解决方案

 单期简单回归

In [None]:
def cross_simple_regression(y,x):
    slope, intercept, r_value, p_value, std_err = stats.linregress(x,y)
    return slope,intercept
cross_simple_regression(test_data[0],test_data2[0]) # 使用stats.linregress 只能进行两个变量的回归，切无法处理缺失值

In [None]:
def simple_regression(datay,datax):
    # 进行单元回归 一期 获取系数
    x = datax.T
    X = sm.add_constant(x)
    y = datay
    model = sm.OLS(y, X, missing='drop')
    results = model.fit()
#     return results.summary()
    return results.params

In [None]:
%%timeit
simple_regression(test_data[0],test_data2[0:2,:])

单期多元回归

In [None]:
def multi_regression(datay,*args):
    # 进行多元回归 一期
    x = []
    for datax in args:
        x.append(datax)
    x = np.vstack(x)
    y = datay
    params = simple_regression(y,x) 
    return params # 参数为常数开始 其余的对应datax的顺序

In [None]:
%%timeit
multi_regression(test_data[0],test_data2[0],test_data2[1],test_data2[2],test_data2[3])

多期简单回归

In [None]:
def all_period_cross_simple_regression(datay,datax):
    # 进行单元回归 每一期进行回归
    paramList = []
    for i in range(datax.shape[0]):
        x = datax[i]
        y = datay[i]
        params = simple_regression(y,x)
        paramList.append(params)
    paramList = np.array(paramList)
    return paramList

In [None]:
all_period_cross_simple_regression(test_data,test_data2)

多期多元回归

In [None]:
def all_period_cross_multi_regression(datay,*args):
    paramList = []
    x_num = len(args)
    for i in range(datay.shape[0]):
        x = []
        for datax in args:
            x.append(datax[i])
        x = np.vstack(x)    
        y = datay[i]
        params = simple_regression(y,x)
        paramList.append(params)
    paramList = np.array(paramList)
    return paramList

In [None]:
%%timeit
all_period_cross_multi_regression(test_data,test_data2,test_data3)

In [None]:
time_analyse('all_period_cross_multi_regression(test_data,test_data2,test_data3)')

回归需要得到哪些数据 
- 每个截面回归的系数
- 系数的T值和P值
- 回归的T值
- 每个股票的残差
- 残差平方和
- 残差波动率-残差在时间序列上的操作

残差数据获取：要求一一对应 如果调用ols包的话会失去nan 

In [None]:
def simple_regression_resid(datay,datax):
    # 单元回归 一期
    x = datax
    y = datay
    
    not_nan_flag = ~(np.isnan(x*y))
    not_nan_index = np.arange(len(not_nan_flag))[not_nan_flag]
    
    new_x = x[not_nan_flag]
    new_y = y[not_nan_flag] 
    X = sm.add_constant(new_x.T)
    model = sm.OLS(new_y, X)
    results = model.fit()

    resid_result = np.zeros_like(y)
    resid_result[:] = np.nan
    resid_result[not_nan_index] = results.resid
    
    return resid_result

In [None]:
%%timeit
simple_regression_resid(test_data[0],test_data2[0])

In [None]:
def multi_regression_resid(datay,*args):
    # 进行多元回归 一期
    x = []
    for datax in args:
        x.append(datax)
    x = np.vstack(x)
    y = datay
    not_nan_flag = ~np.any(np.isnan(x*y),axis=0) 
    not_nan_index = np.arange(len(not_nan_flag))[not_nan_flag]
    
    x = x.T
    new_x = x[not_nan_flag]
    new_y = y[not_nan_flag] 
    X = sm.add_constant(new_x)
    model = sm.OLS(new_y, X)
    results = model.fit()

    resid_result = np.zeros_like(y)
    resid_result[:] = np.nan
    resid_result[not_nan_index] = results.resid
    return resid_result # 参数为常数开始 其余的对应datax的顺序

In [None]:
%%timeit
multi_regression_resid(test_data[0],test_data2[0],test_data2[1],test_data2[2],test_data2[3])


进行多期的残差获取

In [None]:
def all_period_simple_regression_resid(datay,datax):
    # 单元多期的残差获取
    resid_mat = np.zeros_like(datay)
    resid_mat[:] = np.nan
    
    for i in range(datay.shape[0]):
        x = datax[i]
        y = datay[i]
        
        not_nan_flag = ~(np.isnan(x*y))
        not_nan_index = np.arange(len(not_nan_flag))[not_nan_flag]
        
        new_x = x[not_nan_flag]
        new_y = y[not_nan_flag] 
        X = sm.add_constant(new_x.T)
        model = sm.OLS(new_y, X)
        results = model.fit()

        resid_result = np.zeros_like(y)
        resid_result[:] = np.nan
        resid_result[not_nan_index] = results.resid
        resid_mat[i] = resid_result
    return resid_mat

In [None]:
%%time 
resid_mat = all_period_simple_regression_resid(test_data,test_data2)

In [36]:
def all_period_multi_regression_resid(datay,*args):
    # 单元多期的残差获取
    resid_mat = np.zeros_like(datay)
    resid_mat[:] = np.nan
    
    for i in range(datay.shape[0]):        
        x = []
        for datax in args:
            x.append(datax[i])
        x = np.vstack(x)
        y = datay[i]
        not_nan_flag = ~np.any(np.isnan(x*y),axis=0) 
        not_nan_index = np.arange(len(not_nan_flag))[not_nan_flag]

        x = x.T
        new_x = x[not_nan_flag]
        new_y = y[not_nan_flag] 
        X = sm.add_constant(new_x)
        model = sm.OLS(new_y, X)
        results = model.fit()

        resid_result = np.zeros_like(y)
        resid_result[:] = np.nan
        resid_result[not_nan_index] = results.resid
        resid_mat[i] = resid_result
    return resid_mat

In [37]:
%%time
all_period_multi_regression_resid(test_data,test_data3,test_data2)

Wall time: 2.57 s


array([[        nan,         nan,         nan, ...,         nan,
        -0.42975754,  0.46613935],
       [        nan, -0.39590223,         nan, ..., -0.10480716,
         0.21333906,         nan],
       [-0.08225961,         nan, -0.42827803, ...,         nan,
        -0.30668546,         nan],
       ...,
       [        nan,  0.13247994,  0.13663867, ...,         nan,
         0.37424602,  0.44331813],
       [        nan,         nan,         nan, ...,         nan,
        -0.30719103,  0.20043683],
       [        nan,         nan,         nan, ...,  0.1511441 ,
        -0.05252606,  0.40100684]])

## 相关系数的解决方案

- 多期的相关系数
- 单期的相关系数

单期相关系数

In [None]:
def corr_vet_1(vet1,vet2):
    # 计算两个向量的相关性 如果存在nan值怎么处理 两个向量的nan值所在的位置不同
    # 取两个vect都不为nan的值
    notNullIndex = ~(np.isnan(vet1*vet2))
    new_vet1 = vet1[notNullIndex]
    new_vet2 = vet2[notNullIndex]
    corr = np.corrcoef(new_vet1,new_vet2)[0,1]
    return corr

def corr_vet_2(vet1,vet2):
    # 计算两个向量的相关性 如果存在nan值怎么处理 两个向量的nan值所在的位置不同
    # 取两个vect都不为nan的值
    corr = pd.Series(vet1).corr(pd.Series(vet2))
    return corr

In [None]:
%%timeit
corr_vet_2(test_data[0],test_data2[0])

In [None]:
%%timeit
corr_vet_1(test_data[0],test_data2[0])

多期相关系数

In [None]:
def corr_mat_1(mat1,mat2):
    # 计算两个矩阵 对应的截面的相关性
    corrs = []
    for i in range(len(mat1)):
        corrs.append(corr_vet_1(mat1[i],mat2[i]))
    corrs = np.array(corrs)
    return corrs

def corr_mat_2(mat1,mat2):
    # 计算两个矩阵 对应的截面的相关性
    new_mat1 = np.copy(mat1)
    new_mat2 = np.copy(mat2)
    nan_flag = np.isnan(mat1*mat2)
    new_mat1[nan_flag] = np.nan
    new_mat2[nan_flag] = np.nan
    
    prod_mean = bk.nanmean(new_mat1*new_mat2,axis=1) - bk.nanmean(new_mat1,axis=1)*np.nanmean(new_mat2,axis=1)
    prod_std = bk.nanstd(new_mat1,axis=1)* bk.nanstd(new_mat2,axis=1)
    
    corrs = prod_mean/prod_std
    return corrs

def corr_mat_3(mat1,mat2):
    # 计算两个矩阵 对应的截面的相关性
    new_mat1 = np.copy(mat1)
    new_mat2 = np.copy(mat2)
    nan_flag = np.isnan(mat1*mat2)
    new_mat1[nan_flag] = np.nan
    new_mat2[nan_flag] = np.nan
    
    prod_mean = np.nanmean(new_mat1*new_mat2,axis=1) - np.nanmean(new_mat1,axis=1)*np.nanmean(new_mat2,axis=1)
    prod_std = np.nanstd(new_mat1,axis=1)* np.nanstd(new_mat2,axis=1)
    
    corrs = prod_mean/prod_std
    return corrs

# 最终竟然是使用循环的速度最快
# bk 确实比 np 要快一点
# 考虑使用其他的加速包进行加速

In [None]:
res2 = corr_mat_2(test_data2,test_data)
res1 = corr_mat_1(test_data2,test_data)

In [None]:
cProfile.run('res2 = corr_mat_3(test_data2,test_data)', 'test.out')
p = pstats.Stats('test.out')
p.sort_stats('tottime').print_stats(10)

In [None]:
cProfile.run('res = corr_mat_2(test_data2,test_data)', 'test2.out')
p = pstats.Stats('test2.out')
p.sort_stats('tottime').print_stats(10)

In [None]:
test_data

In [None]:
low = 0
high = 0.1
rankAlpha = test_data
flag = (rankAlpha > low) & (rankAlpha < high)
flag_copy = flag*1.0
flag_copy[~flag]=np.nan
position = flag_copy
position = (position.T/np.nansum(position,axis=1)).T

### 进行去极值和归一化的操作

In [31]:
def filter_extreme_MAD(data,n): #MAD: 中位数去极值 
    median = bk.nanmedian(data,axis = 1)
    new_median = bk.nanmedian((np.abs(data.T - median).T),axis = 1)
    max_range = median + n*new_median
    min_range = median - n*new_median
    return np.clip(data.T,min_range,max_range).T

In [32]:
time_analyse('data = filter_extreme_MAD(test_data,3)')

Wed Dec 25 15:33:53 2019    test.out

         10 function calls in 0.434 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        2    0.252    0.126    0.252    0.126 {built-in method bottleneck.reduce.nanmedian}
        1    0.120    0.120    0.120    0.120 {method 'clip' of 'numpy.ndarray' objects}
        1    0.062    0.062    0.434    0.434 <ipython-input-31-50c2a995abe3>:1(filter_extreme_MAD)
        1    0.000    0.000    0.434    0.434 {built-in method builtins.exec}
        1    0.000    0.000    0.120    0.120 D:\bma\anaconda\lib\site-packages\numpy\core\fromnumeric.py:54(_wrapfunc)
        1    0.000    0.000    0.120    0.120 D:\bma\anaconda\lib\site-packages\numpy\core\fromnumeric.py:1903(clip)
        1    0.000    0.000    0.434    0.434 <string>:1(<module>)
        1    0.000    0.000    0.000    0.000 {built-in method builtins.getattr}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_ls

TypeError: unsupported format string passed to dict.__format__