## 非对称性和尾部因子

### 数据准备

In [10]:
import os

file_path = '/mnt/datadisk2/aglv/foraglv/DataDaily.cpython-38-x86_64-linux-gnu.so'
print(os.path.exists(file_path))

True


In [1]:
'''数据处理'''

import pandas as pd
import os
import sys
from sklearn.model_selection import train_test_split

# sys.path.append('../foraglv')
# sys.path.append('/mnt/datadisk2/aglv/foraglv')
# sys.path.append('/mnt/datadisk2/aglv/foraglv/DataDaily.cpython-38-x86_64-linux-gnu.so')

from DataDaily import DataDaily

# import the raw data
data_daily = DataDaily()

def adj_data(data_list, data=data_daily):
    '''drop the data not in the universe'''
    u = data.universe_all
    u = list(set(u).intersection(set(data_list.columns)))
    return data_list.loc[:, u]

close = adj_data(data_daily.close) # get the exact value of close prince

def price_to_yeild(p):
    '''from the price data to the yeild'''
    p_yeild = p.pct_change()
    p_yeild.iloc[0] = 0

    return p_yeild

# choose a specific stock to train
stock = 'SH600519'
s_close = close[stock]
nan_sum = s_close.isna().sum()
print(f'the stock has {nan_sum} lost value')

s_yeild = price_to_yeild(s_close)

if s_yeild.isna().sum() != 0:
    raise ValueError(f'there is nan value in stock {stock}')

X_train, X_test, y_train, y_test = train_test_split(
    s_close, s_yeild,
    train_size=0.8,
    shuffle=False, 
)

df = pd.DataFrame({'x':X_train, 'y':y_train})
df.to_csv('data.csv')

X_train = X_train.to_numpy().reshape(-1, 1)
y_train = y_train.values

# print(isinstance(y_train, pd.Series))

class my_data:
    def __init__(self, p_yeild) -> None:
        self.p_yeild = p_yeild

# if __name__ == '__main__':
print(X_train)
print(y_train)

load ./data_daily.h5...
the stock has 0 lost value
[[ 527.77000422]
 [ 540.06000563]
 [ 545.53999437]
 ...
 [1762.        ]
 [1766.        ]
 [1750.91999634]]
[ 0.          0.02328666  0.010147   ...  0.00685714  0.00227015
 -0.00853907]


### Utils

In [3]:
'''
function:[]
'''

import pandas as pd
import numpy as np
import scipy.integrate as integrate
import scipy.stats as states

class Asymmetric:
    '''
    蕴含计算收益率序列非对称的一些方法 Parameter:
    X:pd.Series 收益率数据
    Methods:
    skewness;e_phi;s_phi;asym_p;cVaR
    '''

    def __init__(self, X) -> None:
        self.X = pd.Series(X)

    def skewness(self):
        '''self.X序列的偏度: 标准化后的三阶中心矩'''
        z_socre = (self.X - self.X.mean()) / (self.X.std() + 1e-8)
        return (z_socre ** 3).mean()

    def _gauss_kernel(self, x):
        '''Gauss核函数 标准正态分布的密度函数'''
        return 1 / np.power(2*np.pi, 1/2) * np.exp(-1/2*np.power(x, 2))

    def kernel_density(self, y, kernel=_gauss_kernel):
        '''
        核密度估计法,基于数据y的分布密度函数 Parameters:  
        y:计算区间内的的超额收益,真实统计值
        -> function(float)
        '''
        h = 1.06 * y.std() * np.power(len(y), -1/5) # 基于Silverman的经验法则的带宽估计

        def f(x):
            new_series = self._gauss_kernel(1/h * (y - x))
            return 1/(len(y)*h) * new_series.sum()

        return f

    def e_phi(self, k=1):
        '''
        从异尾概率出发，从左右概率作差体现分别的非对称性  Parameters:
        self.X : 收益率序列
        k: 左右尾的阈值
        '''
        k = self.X.quantile(0.9)
        density_func = self.kernel_density(self.X)
        left_inte, _ = integrate.quad(density_func, -np.inf, -k)
        right_inte, _ = integrate.quad(density_func, k, np.inf)

        return right_inte - left_inte

    def s_phi(self, k=1):
        '''
        相较于e_phi改进
        '''
        k = self.X.quantile(0.9)
        sign = np.sign(self.e_phi(k))

        def diff_func(f_1, f_2):
            '''积分辅助函数'''
            def f(x):
                return np.power(np.power(f_1(x), 1/2) - np.power(f_2(x), 1/2), 2)
            return f

        f_1 = self.kernel_density(self.X)
        f_2 = self.kernel_density(-self.X + 2*self.X.mean())
        diff_f = diff_func(f_1, f_2)

        return sign * 1/2 * (integrate.quad(diff_f, -np.inf, -k)[0] + integrate.quad(diff_f, k, np.inf)[0])

    def asym_p(self):
        '''Asym_p因子 反映非对称性'''
        density_func = self.kernel_density(self.X)
        density_series = pd.Series([density_func(self.X.values[i]) for i in range(len(self.X))], index=self.X.index) # 收益率序列的密度函数 f(r)
        acc_series = pd.Series([integrate.quad(density_func, -np.inf, self.X.iloc[i])[0] for i in range(len(self.X))], index=self.X.index)

        return - density_series.corr(acc_series) if density_series.std() > 0 else 0
    
    def cVaR(self,c_level = 0.95):
        '''
        CVaR因子,计算给定置信水平下的超出部分的均值收益/损失,置信水平默认0.95
        '''
        right_VaR = self.X.quantile(c_level)
        left_VaR = self.X.quantile(1- c_level)

        X_low = pd.Series(self.X[self.X < left_VaR])
        X_high = pd.Series(self.X[self.X > right_VaR])

        low_mean = X_low.mean() if len(X_low) > 0 else 0
        high_mean = X_high.mean() if len(X_high) > 0 else 0

        return low_mean, high_mean
        

# if __name__ == '__main__':
#     X = pd.Series(np.exp(np.random.rand(100)))
#     asymetric = Asymmetric(X)
#     print(asymetric.X)
#     print(asymetric.skewness())
#     print(asymetric.e_phi())
#     print(asymetric.s_phi())
#     print(asymetric.asym_p())
#     print(asymetric.cVaR())

用正态分布序列尝试

In [4]:
X = pd.Series(np.random.randn(100))
asymetric = Asymmetric(X)
print(asymetric.X)
print(asymetric.skewness())
print(asymetric.e_phi())
print(asymetric.s_phi())
print(asymetric.asym_p())
print(asymetric.cVaR())

0     0.483959
1     0.449775
2     0.748157
3     1.451832
4    -0.660495
        ...   
95    0.214062
96   -0.335584
97   -0.239746
98   -0.928670
99   -0.604383
Length: 100, dtype: float64
-0.2291812549759374
0.002366379773287658
0.008154188091307274
0.0037901441748618516
(-2.058488130750803, 1.6717500210385352)


用非对称的数据测试

In [5]:
X = np.exp(2*np.random.rand(100) - 1) - 1 
X = pd.Series(X)

asymetric = Asymmetric(X)
print(asymetric.X)
print(asymetric.skewness())
print(asymetric.e_phi())
print(asymetric.s_phi())
print(asymetric.asym_p())
print(asymetric.cVaR())

0     1.599206
1    -0.588504
2    -0.186628
3    -0.632071
4     0.287276
        ...   
95    1.549212
96   -0.524226
97    0.220117
98   -0.061898
99   -0.576099
Length: 100, dtype: float64
0.7817076084178024
0.1107620310086124
0.04093269806597477
0.8825145003169632
(-0.6023481132370196, 1.5987306400699297)


贵州茅台数据

In [6]:
asymetric = Asymmetric(s_yeild)
print(asymetric.X)
print(asymetric.skewness())
print(asymetric.e_phi())
print(asymetric.s_phi())
print(asymetric.asym_p())
print(asymetric.cVaR())

date
20171009    0.000000
20171010    0.023287
20171011    0.010147
20171012   -0.002658
20171013    0.022165
              ...   
20240719    0.018771
20240722   -0.017449
20240723   -0.029353
20240724   -0.010296
20240725   -0.006618
Name: SH600519, Length: 1653, dtype: float64
0.16283287293993778
0.031688428985172296
0.0024799698479098372
0.1560422712437014
(-0.042386308330009455, 0.046370087578519535)


In [23]:
df = pd.DataFrame()
df['a'] = pd.Series([1, 3, 4, 2, 1])
df['b'] = pd.Series([5, 1, 4, 5, 2])

df['a'].iloc[0] = 9
print(df['a'])
# print(df)

# rolling = df.rolling(3).apply(lambda x:list(x), raw=False).values.tolist()
# # rolling = rolling.applymap(lambda x: x.values.to_list())
# print(rolling)

# rolling = []
# rolling_obj = df['a'].rolling(3)
# for i in rolling_obj:
#     if i.notna().all():
#         rolling.append(i)
# print(rolling)

0    9
1    3
2    4
3    2
4    1
Name: a, dtype: int64


### 主程序框架

以贵州茅台为例

In [40]:
data = s_yeild
print(data)
print(len(data))

date
20171009    0.000000
20171010    0.023287
20171011    0.010147
20171012   -0.002658
20171013    0.022165
              ...   
20240718   -0.002591
20240719    0.018771
20240722   -0.017449
20240723   -0.029353
20240724   -0.010296
Name: SH600519, Length: 1652, dtype: float64
1652


In [44]:
window = 60
def period_split(data, window = window):
    period = []
    num = len(data) // window
    for i in range(num + 1):
        if i < num:
            period.append(list(range(window*i, window*(i+1))))
        else:
            period.append(list(range(window*i, len(data))))
    
    return period

print(len(period_split(data)))
print(len(period_split(data)[0]))
print(len(period_split(data)[-1]))

28
60
32


In [45]:
periods = period_split(data)
s_period_split = [data.iloc[period] for period in periods]
print(s_period_split)

[date
20171009    0.000000
20171010    0.023287
20171011    0.010147
20171012   -0.002658
20171013    0.022165
20171016    0.008163
20171017   -0.007063
20171018    0.007167
20171019    0.039021
20171020   -0.014195
20171023   -0.001602
20171024   -0.010603
20171025   -0.002926
20171026    0.069687
20171027    0.073609
20171030   -0.042409
20171031   -0.006510
20171101    0.008058
20171102    0.006276
20171103    0.019540
20171106    0.021731
20171107   -0.016828
20171108    0.012943
20171109   -0.000477
20171110    0.042888
20171113    0.014647
20171114   -0.013273
20171115    0.013746
20171116    0.045096
20171117   -0.040133
20171120   -0.016081
20171121   -0.002798
20171122   -0.039468
20171123   -0.025810
20171124   -0.005823
20171127   -0.013888
20171128    0.043361
20171129   -0.015596
20171130   -0.011158
20171201   -0.013708
20171204    0.024809
20171205   -0.007573
20171206   -0.012560
20171207   -0.013312
20171208    0.017172
20171211    0.037798
20171212    0.000507
2017121

In [None]:
def calculate_by_term(data_periods):
    data_method = []
    data_yeild = pd.Series()
    for data_period in data_periods:
        obj = Asymetric(data_period)
        data_yeild.append(obj.X.sum())
        data_method.append(obj.method())
        # print(asymetric.X)
        # print(asymetric.skewness())
        # print(asymetric.e_phi())
        # print(asymetric.s_phi())
        # print(asymetric.asym_p())
        # print(asymetric.cVaR())

        

In [31]:
obj = Asymetric(s_yeild)
methods = [met for met in dir(Asymetric) if callable(getattr(obj, met)) and not met.startswith('__')]
print(methods)

['_gauss_kernel', 'asym_p', 'cVaR', 'e_phi', 'kernel_density', 's_phi', 'skewness']


In [28]:
methods = [met for met in methods if not met.startswith('_')]
print(methods)

['asym_p', 'cVaR', 'e_phi', 'kernel_density', 's_phi', 'skewness']


In [32]:
df = pd.DataFrame()
# for met in methods:
#     df[str(met)] = eval(df.met())
# print(df.columns)

df['skewness'] = obj.skewness()
df['s_phi'] = obj.s_phi()
df['e_phi'] = obj.e_phi()
df['cVaR_posi'], df['cVaR_negi'] = obj.cVaR()

print(df.corrwith(obj.X))


TypeError: can't multiply sequence by non-int of type 'numpy.float64'