# calculate CVaR

In [1]:
import pandas as pd
import numpy as np
import numba as nb
from tqdm import notebook

dataset_name = 'total'
# Load the whole panel CSV into memory as a DataFrame indexed by (code, trade_date).
# No `chunksize` is passed here, so this is a full read, not a chunked iterator.
panel = pd.read_csv(f'{dataset_name}_panel.csv', parse_dates=['trade_date'], index_col=['code','trade_date'])

In [2]:
# Regression specification
# Explanatory variables of the regression that determines Wx
X = [
    'skew', 'kurt', 'turnoverrate', 'vol',
    'Institutional_holding', 'Analyst_AVGRate',
]
# Dependent variable of the regression that determines Wx
Y = ['CVaR']
# Combined column list, response first and predictors after it (a new list, not an alias of Y)
Y_and_X = [*Y, *X]

In [3]:
# Set up the standard-normal quantile that CVaR_algorithm uses as its default Z_alpha
from scipy.stats import norm
# NOTE(review): the original comments in this cell mentioned generating Wx and a ridge
# regression function; the code below only defines alpha and its normal quantile.

alpha = 0.7 # confidence level
Z_alpha = norm.ppf(alpha)
@nb.jit(nopython=True)
def CVaR_algorithm(series, lam=0.96, Z_alpha=Z_alpha):
    """Parametric tail-loss measure for one window of returns.

    A volatility is estimated from exponentially weighted squared deviations
    from the window mean, a threshold ``mean - sigma * Z_alpha`` is derived
    from it, and the mean absolute value of the observations at or below
    that threshold is returned.

    Parameters
    ----------
    series : 1-D numpy array holding the window's observations.
    lam : decay factor of the exponential weights.
    Z_alpha : quantile multiplier applied to sigma; defaults to the
        module-level ``Z_alpha``.

    Returns
    -------
    Mean of ``|x|`` over the observations ``x <= mean - sigma * Z_alpha``,
    or 0 when no observation falls at or below the threshold.
    """
    n_obs = series.shape[0]
    center = np.mean(series)
    # Weights are lam**n_obs, ..., lam**1 in array order (first element gets
    # the highest power). NOTE(review): presumably the array is ordered oldest
    # to newest; the weights are not renormalised to sum to one - confirm
    # this is intended.
    decay = np.full(n_obs, lam).cumprod()[::-1]
    weighted_sq_dev = decay * (series - center) ** 2
    sigma = np.sqrt((1 - lam) * np.sum(weighted_sq_dev))  # exponentially weighted volatility
    threshold = center - sigma * Z_alpha  # equals -VaR in the sign convention used here
    tail = np.absolute(series[series <= threshold])
    result = 0
    if len(tail) != 0:
        result = np.mean(tail)
    return result
    
# Rolling 66-observation CVaR of `ret`, computed separately for every code
# (rows with missing returns are dropped before the windows are formed).
CVaR = panel.ret.dropna().groupby('code').rolling(66, method='single').apply(CVaR_algorithm, raw=True, engine='numba')
# groupby().rolling() prepends the group key as an extra index level; drop it so the
# index is (code, trade_date) again and aligns with `panel`.
CVaR = CVaR.reset_index(level=0, drop=True)
CVaR.name = 'CVaR'
# Drop any existing CVaR column first (e.g. one already stored in the CSV or left by a
# previous run of this cell); otherwise join() raises on the overlapping column name.
panel = panel.drop(columns='CVaR', errors='ignore').join(CVaR)

In [4]:
# Data preparation for the regression:
#   * keep the response column(s) as they are,
#   * lag every explanatory variable by one row within each code,
#   * discard rows that contain any missing value.
data = (
    panel[Y]
    .join(panel[X].groupby('code').shift(1))
    .dropna()
)

# Independent working copy
df = data.copy()

## 法一：用岭回归生成Wx 

In [None]:
# 生成Wx

# 定义岭回归函数
# @nb.jit(nopython=True)
def Righe(df, lam=0.1):
    """Ridge regression of column 0 on the standardized remaining columns.

    Parameters
    ----------
    df : 2-D numpy array. Column 0 is the response; columns 1.. are the
        predictors. At least six predictor columns are required because six
        coefficients are returned.
    lam : ridge penalty added to the diagonal of X'X.

    Returns
    -------
    tuple
        ``(Y[-1], beta[0], beta[1], beta[2], beta[3], beta[4], beta[5])``:
        the last response value of the window followed by the first six
        ridge coefficients.

    Notes
    -----
    The input array is never modified: the function works on float copies,
    so overlapping rolling windows that share memory are not corrupted.
    """
    Y = df[:, 0].astype(np.float64)
    X = df[:, 1:].astype(np.float64)  # astype copies, so the caller's window stays intact
    for i in range(X.shape[1]):  # standardize column by column
        X_mean = X[:, i].mean()
        X_std = X[:, i].std()
        if X_std == 0:
            # A constant column carries no information; centre it to zeros
            # instead of dividing by zero.
            X[:, i] = 0.0
        else:
            X[:, i] = (X[:, i] - X_mean) / X_std
    XTX = X.T @ X
    # Solve (X'X + lam*I) beta = X'Y directly rather than forming the inverse.
    beta = np.linalg.solve(XTX + lam * np.identity(XTX.shape[1]), X.T @ Y)
    return Y[-1], beta[0], beta[1], beta[2], beta[3], beta[4], beta[5]
    
# Rolling 66-row ridge regression of CVaR on the explanatory variables, per code, to obtain Wx.
# The trailing reset_index removes the group-key level that groupby().rolling() prepends.
# NOTE(review): confirm that the installed pandas supports method='table' on a grouped rolling window.
Wx = (
    data[Y_and_X]
    .groupby('code')
    .rolling(66, method='table')
    .apply(Righe, raw=True, engine='numba')
    .reset_index(level=0, drop=True)
)

In [None]:
# Save Wx to '<dataset_name>_Wx.csv'

Wx.to_csv(f'{dataset_name}_Wx.csv')

## 法二：OLS生成Wx

In [17]:
import pandas as pd
import numpy as np
from tqdm import notebook

dataset_name = 'total'
# Load the whole panel CSV into memory as a DataFrame indexed by (code, trade_date).
# No `chunksize` is passed here, so this is a full read, not a chunked iterator.
panel = pd.read_csv(f'{dataset_name}_panel.csv', parse_dates=['trade_date'], index_col=['code','trade_date'])

In [2]:
dataset_name = 'total'
chunksize=670  # trading days from 2020-12-01 to 2023-08-31

# Stream the panel CSV in chunks of `chunksize` rows.
# NOTE(review): presumably each chunk is meant to hold exactly one code - verify that
# every code has exactly `chunksize` rows, otherwise windows will straddle two codes.
data_iterator = pd.read_csv(f'{dataset_name}_panel.csv', chunksize=chunksize, parse_dates=['trade_date'], index_col=['code','trade_date'])
CVaR_list = []
# Walk through the chunks.
# `ES_Daily` and `alpha` are not defined in this cell; the cell that defines them must
# have been executed before this one.
for chunk in notebook.tqdm(data_iterator):
    chunk_CVaR = chunk.ret.rolling(10).apply(lambda x: ES_Daily(x, alpha))
    # Collect the per-chunk result; without this the list stays empty and
    # pd.concat below raises "No objects to concatenate".
    CVaR_list.append(chunk_CVaR)
CVaR = pd.concat(CVaR_list, axis=0)

In [6]:
def ES_Daily(a,x):
    """Mean of the observations at or above the ``x`` quantile of ``a``.

    Parameters
    ----------
    a : numpy array or pandas Series of observations.
    x : quantile level expressed as a fraction (e.g. 0.95 for the 95th percentile).

    Returns
    -------
    The average of all elements of ``a`` that are >= the ``x*100``-th percentile.
    """
    cutoff = np.percentile(a, x*100)
    upper_tail = a[a>=cutoff]
    return upper_tail.mean()
    
# Confidence level for the expected-shortfall tail.
# This rebinds the global `alpha`, replacing any value set by an earlier cell.
alpha = 0.95
# Rolling 125-row expected shortfall of `ret`, computed separately for every code.
# `axis` is omitted (rows are the default; the argument is deprecated in recent pandas)
# and raw=True hands each window to ES_Daily as a numpy array, which avoids building a
# Series per window and yields the same value.
CVaR = (
    panel.ret.groupby('code')
    .rolling(125)
    .apply(lambda x: ES_Daily(x, alpha), raw=True)
)
# Remove the group-key level that groupby().rolling() prepends so the index aligns with `panel`.
CVaR = CVaR.reset_index(level=0, drop=True)
CVaR.name = 'CVaR'
# Drop any existing CVaR column first so that re-running this cell, or loading a CSV that
# already contains CVaR, does not make join() fail on the overlapping column name.
panel = panel.drop(columns='CVaR', errors='ignore').join(CVaR)
# NOTE(review): this overwrites the very file the panel was loaded from.
panel.to_csv(f'{dataset_name}_panel.csv')

#### 回归

In [30]:
import pandas as pd
from sklearn import linear_model
from sklearn.preprocessing import StandardScaler

from tqdm import notebook
import gc

dataset_name = 'total'
chunksize=729  # trading days from 2020-09-01 to 2023-08-31; per the author's note, CVaR is computed from half a year of returns and starts on 2021-03-10

X = ['skew', 'kurt', 'turnoverrate', 'vol',  'Institutional_holding', 'Analyst_AVGRate'] # explanatory variables of the regression that determines Wx
Y = ['CVaR'] # dependent variable of the regression that determines Wx
# # Tunable parameter delta for a robust-regression model (disabled)
# delta = 1.0

# Create an iterator that reads the panel CSV `chunksize` rows at a time
data_iterator = pd.read_csv(f'{dataset_name}_panel.csv', chunksize=chunksize, parse_dates=['trade_date'], index_col=['code','trade_date'])

# Unique codes in panel order, stored as a Series indexed by the code itself.
# `panel` is not created in this cell; it must already exist from an earlier cell.
code_name = panel.index.get_level_values(level=0).unique()
code_series = pd.Series(data=code_name, index=code_name)

    
# Walk through the chunks; for each processed code, fit a rolling ridge regression of Y
# on the standardized, one-row-lagged X and write the coefficients and the scaler
# parameters to per-code CSV files.
import os

for chunk in notebook.tqdm(data_iterator):
    # Resume point: only codes from '603863.SH' onwards (in `code_series` order) are processed.
    if chunk.index.get_level_values(level=0).unique()[0] not in code_series['603863.SH':]:
        continue

    chunk = chunk.sort_index(ascending=True)
    # Lag the explanatory variables by one row.
    # NOTE(review): this assumes the chunk holds exactly one code - verify that every
    # code has exactly `chunksize` rows in the CSV.
    chunk[X] = chunk[X].shift(1)
    # The first row has no lagged predictors, so drop it.
    chunk = chunk.drop(chunk.index[0])

    # Code of this chunk (first index level) and its ordered trade dates.
    code = chunk.index.get_level_values(0).unique().to_list()[0]
    time_list = chunk.index.get_level_values(1).unique().to_list()

    Wx_one_list = []
    X_mean_list = []
    X_std_list = []

    # The first 125 dates are skipped (per the author's note, that is the CVaR look-back);
    # each regression window then runs from time_list[t+125] to time_list[t+250].
    # NOTE(review): the label slice is inclusive at both ends, so a window holds
    # 126 dates - confirm that is intended.
    for t in range(len(time_list) - 125 - 125):
        window_end = time_list[t + 125 + 125]
        chunk_part = chunk.loc[(code, slice(time_list[t + 125], window_end)), :]
        X_df = chunk_part[X]
        Y_df = chunk_part[Y]

        # Standardize the predictors within the window.
        scaler = StandardScaler()
        X_df_fit = scaler.fit(X_df)
        X_df = scaler.transform(X_df)

        # Keep the standardization parameters, labelled with the window's last date.
        X_mean = pd.DataFrame(pd.Series(data=X_df_fit.mean_, index=X, name=window_end))
        X_std = pd.DataFrame(pd.Series(data=X_df_fit.scale_, index=X, name=window_end))

        # Ridge regression without intercept.
        # NOTE(review): Y is not centred while fit_intercept=False - confirm that
        # forcing the fit through the origin is intended.
        regr = linear_model.Ridge(alpha=0.01, fit_intercept=False, random_state=0)
        regr.fit(X_df, Y_df)
        Wx = pd.DataFrame(pd.Series(data=regr.coef_[0], index=X, name=window_end))

        # One row per window end date.
        Wx_one_list.append(Wx.T)
        X_mean_list.append(X_mean.T)
        X_std_list.append(X_std.T)

    if not Wx_one_list:
        # Too few dates for even one window: nothing to write, and pd.concat
        # would raise on an empty list.
        del chunk
        gc.collect()
        continue

    # Write the results for this code to disk.
    for _name, _list in [('Wx', Wx_one_list), ('X_mean', X_mean_list), ('X_std', X_std_list)]:
        _df = pd.concat(_list)
        _df['trade_date'] = _df.index
        _df['code'] = code
        _df.set_index(['code','trade_date'], inplace=True)
        # Make sure the target directory exists before writing into it.
        os.makedirs(f'{dataset_name}_{_name}', exist_ok=True)
        _df.to_csv(f'{dataset_name}_{_name}/{_name}{code}.csv')
        del _df  # release memory

    # The previous tail of this loop wrote an undefined `Wx_one_df` and deleted `chunk`
    # a second time, both of which raise NameError; the Wx file is already written above.
    del chunk
    gc.collect()  # trigger garbage collection manually

0it [00:00, ?it/s]

### 整合数据

In [31]:
# Merge the per-code Wx CSV files into a single panel
import os
path = 'total_Wx/'
# Keep only the CSV files found in the directory (extension matched case-insensitively)
filesnames = [f for f in os.listdir(path) if f.lower().endswith(".csv")]
df_list = [pd.read_csv(path + filename) for filename in filesnames]

# Stack all files, index by (code, trade_date), sort, and save
df = (
    pd.concat(df_list)
    .set_index(['code','trade_date'])
    .sort_index(ascending=True)
)
df.to_csv('total_Wx.csv')