In [1]:
import pandas as pd
import os
from arch import arch_model
import matplotlib
import matplotlib.pyplot as plt
import warnings
import seaborn as sns

# 忽略 FutureWarnings
warnings.simplefilter(action='ignore', category=FutureWarning)
# 设置matplotlib支持中文显示
matplotlib.rcParams['font.family'] = 'SimHei'  # 设置字体为黑体
matplotlib.rcParams['axes.unicode_minus'] = False  # 正确显示负号

# 定义文件夹路径
info_folder = 'StockData/market_info'
data_folder = 'StockData/market_data'

# 创建字典存储股票相关信息数据
market_info = {}
# 读取 market_info 中的文件
for year in range(2014, 2025):
    file_name = f'hs300stocks_{year}.csv'  # 文件名格式
    file_path = os.path.join(info_folder, file_name)
    if os.path.isfile(file_path):  # 检查文件是否存在
        data = pd.read_csv(file_path)  # 默认读取，自动检测标题
        market_info[f'year_{year}'] = data  # 存储数据到字典中
    else:
        print(f'Warning: File {file_path} does not exist.')

# 创建字典存储股票具体数据
market_data = {}
# 读取 market_data 中的文件
for year in range(2014, 2025):
    file_name = f'hs300stocks_kdata_{year}.csv'  # 文件名格式
    file_path = os.path.join(data_folder, file_name)
    if os.path.isfile(file_path):  # 检查文件是否存在
        data = pd.read_csv(file_path)  # 默认读取，自动检测标题
        market_data[f'year_{year}'] = data  # 存储数据到字典中
    else:
        print(f'Warning: File {file_path} does not exist.')

# 合并所有年份的市场数据为一个数据框
combined_market_data = pd.concat(market_data.values(), ignore_index=True)

In [16]:
import numpy as np
from arch import arch_model
from scipy.stats import norm

# 计算收益率
combined_market_data['time'] = pd.to_datetime(combined_market_data['time'], format="%Y-%m-%d %H:%M:%S", errors='coerce')
combined_market_data = combined_market_data.sort_values(by='time')

# 获取每只股票的每日收益率
combined_market_data['return'] = combined_market_data.groupby('code')['close'].pct_change()

# 定义计算最大回撤函数
def max_drawdown(prices):
    cum_returns = (1 + prices).cumprod()
    peak = cum_returns.cummax()
    drawdown = (cum_returns - peak) / peak
    return drawdown.min()

# 计算风险指标
def calculate_risk_metrics(data, window=252):
    results = {}
    
    # 计算每日收益率的波动率（基于EGARCH模型）
    for code, group in data.groupby('code'):
        group = group.dropna(subset=['return'])
        
        # 拟合EGARCH模型
        model = arch_model(group['return']*100, vol='EGARCH', p=1, q=1)
        model_fit = model.fit(disp="off",)
        
        # 提取波动率（即EGARCH模型的条件波动率）
        vol = model_fit.conditional_volatility
        
        # 在险价值 (CoVaR) 和边际期望损失 (MES) - 使用95%的置信区间
        VaR_95 = np.percentile(group['return'], 5)
        CoVaR = VaR_95 * vol  # 修正：直接使用最后一个波动率值
        
        # 系统性风险指数 (SRISK) - 通过最大回撤来衡量
        drawdown = max_drawdown(group['return'])
        SRISK = drawdown * 100  # 百分比表示
        
        # 成分期望损失 (CES) - 计算整体的平均波动性影响
        CES = np.mean(group['return'] * vol)  # 修改：使用整个波动率序列
        
        # 夏普比率 (Sharpe Ratio) - 假设无风险利率为0
        sharpe_ratio = np.mean(group['return']) / np.std(group['return'])
        
        # 存储指标
        results[code] = {
            'VaR_95': VaR_95,
            'CoVaR': CoVaR,
            'SRISK': SRISK,
            'CES': CES,
            'Sharpe_Ratio': sharpe_ratio,
            'Max_Drawdown': drawdown
        }
    
    return pd.DataFrame(results).T

# 计算市场风险指标
risk_metrics = calculate_risk_metrics(combined_market_data)

# 展示前几行
print(risk_metrics)

Iteration limit reached
See scipy.optimize.fmin_slsqp for code meaning.

Inequality constraints incompatible
See scipy.optimize.fmin_slsqp for code meaning.

Iteration limit reached
See scipy.optimize.fmin_slsqp for code meaning.

Inequality constraints incompatible
See scipy.optimize.fmin_slsqp for code meaning.

Iteration limit reached
See scipy.optimize.fmin_slsqp for code meaning.

Iteration limit reached
See scipy.optimize.fmin_slsqp for code meaning.

Inequality constraints incompatible
See scipy.optimize.fmin_slsqp for code meaning.

Iteration limit reached
See scipy.optimize.fmin_slsqp for code meaning.



               VaR_95                                              CoVaR  \
sse.600000  -0.020377  24673    -0.028020
24674    -0.030939
24675   ...   
sse.600004  -0.032385  375611   -0.076320
375612   -0.075476
375613  ...   
sse.600005  -0.041262  165269   -0.182044
165270   -0.155665
165271  ...   
sse.600008  -0.042442  24917    -0.095917
24918    -0.092737
24919   ...   
sse.600009  -0.031147  25162    -0.051051
25163    -0.054529
25164   ...   
...               ...                                                ...   
szse.300919 -0.033924  666574   -0.079680
666575   -0.078741
666576  ...   
szse.300957 -0.037128  666816   -0.092929
666817   -0.092014
666818  ...   
szse.300979  -0.03172  667058   -0.074921
667059   -0.070570
667060  ...   
szse.300999 -0.025182  593516   -0.046387
593517   -0.057136
593518  ...   
szse.301269 -0.041785  732798   -0.107200
732799   -0.105732
732800  ...   

                 SRISK       CES Sharpe_Ratio Max_Drawdown  
sse.600000   -41.21068  0.

In [15]:
import statsmodels.api as sm
import seaborn as sns
import matplotlib.pyplot as plt

# 确保risk_metrics中的数据为数值型
risk_metrics = risk_metrics.apply(pd.to_numeric, errors='coerce')

# 删除任何包含NaN的行（如果存在）
risk_metrics = risk_metrics.dropna()

# 使用标准化后的数据
X = risk_metrics.drop('SRISK', axis=1)  # 将SRISK作为因变量
y = risk_metrics['SRISK']

# 确保X和y的数据类型是数值型
X = X.apply(pd.to_numeric, errors='coerce')
y = pd.to_numeric(y, errors='coerce')

# 删除任何包含NaN的行（如果存在）
X = X.dropna()
y = y.dropna()

# 添加常数项（截距）
X = sm.add_constant(X)

# 拟合回归模型
model = sm.OLS(y, X).fit()

# 输出回归结果
print(model.summary())

# 提取回归系数
coefficients = model.params
print("\n回归系数:")
print(coefficients)

# 可视化回归系数（敏感性分析）
plt.figure(figsize=(10, 6))
sns.barplot(x=coefficients.index, y=coefficients.values, palette="viridis")
plt.title('回归系数 (敏感性分析)')
plt.xlabel('风险指标')
plt.ylabel('回归系数')
plt.xticks(rotation=45)
plt.show()

ValueError: zero-size array to reduction operation maximum which has no identity