In [None]:
#可视化波动率特征
from random import gauss
from datetime import datetime
import tushare as ts
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Rectangle
import matplotlib.dates as md

# 获取沪深300的收盘价数据并计算收益率
hs300 = ts.get_k_data('hs300', '2000-01-01').set_index('date').close
hs300.index = pd.to_datetime(hs300.index)
hs300_return = hs300.pct_change().dropna()   # 计算日收益率
hs300_return.name = 'hs300_return'

# 定义作图辅助函数
def date2num(date: str):
    """
    将日期字符串转化为matplotlib可用的数值
    """
    return md.date2num(datetime.strptime(date,'%Y-%m-%d'))

def add_marks(time_periods, ax):
    """
    在ax子图上为time_period传入的时间范围添加背景颜色
    param time_periods: 需要添加标记的时间范围列表，列表元素为(start, end)组成的元组
    param ax: 需要添加标记的子图
    """
    bottom, top = ax.get_ylim()
    height = top - bottom
    for start_date, end_date in time_periods:
        start = date2num(start_date)
        end = date2num(end_date)
        width = end - start
        rect = Rectangle((start, bottom), width, height, color='y', fill=True, alpha=0.7)
        ax.add_patch(rect)

# 对价格和日收益率作图
f, (ax0, ax1) = plt.subplots(2, 1, gridspec_kw = {'height_ratios':[5,2.5]}, sharex=True)
f.set_size_inches(14, 8)

ax0.plot(hs300)
ax0.set_title('Price')
ax1.plot(hs300_return, linewidth=0.5)
ax1.set_title('Daily_return')

periods = [('2007-10-16', '2008-10-28'), ('2015-06-12', '2016-01-27'), ('2018-01-29', '2018-10-31')]

# 在图上添加高亮区域
add_marks(periods, ax0)
add_marks(periods, ax1)
plt.show()

#模拟GARCH过程
# 参数设置
a0 = 0.06
a1 = 0.1
b1 = 0.8
sigma0 = np.sqrt(a0/(1-a1-b1))
sigma0

# 定义函数模拟GARCH过程
def GARCH(a0, a1, b1, sigma0, T):
    """
    模拟一个T步的GARCH过程
    params a0, a1, b1: 对应GARCH(1,1)过程中的（omega, alpha, beta)
    params sigma0: 初始波动率
    params T: 模拟的总步数
    return: 返回模拟的波动率路径
    """
    sigma = [sigma0, ]
    for i in range(1, T+1):
        e = gauss(0, 1)
        sigma_next = np.sqrt(a0 + a1*(e*sigma[-1])**2 + b1*sigma[-1]**2)
        sigma.append(sigma_next)
    return np.array(sigma[1:])

GARCH(a0, a1, b1, sigma0, 100)

# 模拟一个GARCH
sigma = GARCH(a0, a1, b1, sigma0, 2000)[200:]

# 图形输出模拟波动率
plt.figure(figsize=(16,4))
fig = plt.plot(sigma, linewidth=1)
plt.title('Simulated Volatility')
plt.show()

#用arch包拟合GARCH(1, 1)模型
from arch.univariate import arch_model

# 将收益率数据分列为训练数据和测试数据两部分
train = hs300_return[:'2017'].dropna()
test = hs300_return['2017':].dropna()
train.tail()

# 构造并拟合一个GARCH(1,1)模型
garch = arch_model(train*100, mean='zero', p=1, o=0, q=1)  # 收益乘以100，避免数字太小造成的模型不稳定
res = garch.fit(disp='off')                              
print(res.summary())

# 从拟合结果中提取条件波动率
con_vol = res.conditional_volatility

# 比较条件波动率和真实收益率数据的图像
fig, (ax1, ax2) = plt.subplots(2,1)
fig.set_size_inches(16,8)
ax1.plot(con_vol['2014':])
ax1.set_title('Conditional Volatility')
ax2.plot(train['2014':], lw=1)
ax2.set_title('Hs300 Daily Return')
plt.show()

# 使用拟合的模型进行样本内预测
forecasts = res.forecast(horizon=1, start=train.index[0])                 # horizon是预测的步数
train_df = pd.concat([forecasts.mean, forecasts.variance**0.5], axis=1)   # 提取模型预测的均值和方差
train_df.columns =['mu', 'sigma']
train_df.tail()

# 根据预测的均值和方差计算预测收益率 𝑟𝑡=𝜇𝑡+𝜎𝑡𝑧𝑡
train_df['z'] = np.random.normal(0, 1, len(data))
train_df['r_hat'] = (data.mu + data.sigma*data.z)/100     # 需要除以100， 因为模型拟合过程中使用的是daily_return*100
train_df.head()

# 样本内预测值和真实值比较
train_df.r_hat.plot(figsize=(16,5), c='b', alpha=0.7, lw=1, legend=True)
hs300_return[:'2017'].plot(alpha=0.7, c='y', legend=True)
plt.show()

#使用拟合的模型预测波动率
# 在测试数据上使用拟合好的模型
test_df = pd.DataFrame(test)
test_df['volatility_pre'] = (test_df.hs300_return*100).rolling(60).std().shift(1)   # 前一天的滚动波动率
test_df.dropna(inplace=True)
test_df.head()

# 拟合模型得到的参数
res.params

# 从拟合模型中提取参数
omega, alpha, beta = res.params
f'mu：0, omega: {omega:.4f}, alpha: {alpha:4f}, beta: {beta:.4f}'

# 使用拟合的参数和之前定义的模拟函数预测下一个波动率
GARCH(omega, alpha, beta, 0.54, 1)

# 根据前一天的滚动波动率计算下一天的预测波动率
test_df['volatility_est'] = test_df.volatility_pre.map(lambda v: GARCH(omega, alpha, beta, v, 1)[-1])
# 根据预测波动率计算预测收益率
test_df['return_est'] = (mu + test_df.volatility_est * np.random.normal(0,1, len(test_df)))/100
test_df.head()

# 比较样本外预测结果与真实的收益率
hs300_return['2013':].plot(figsize=(16,6), c='b', alpha=0.7, lw=1, legend=True)
train_df['2013':].r_hat.plot(c='y', alpha=0.8, lw=1, legend=True)
test_df['2018'].return_est.plot(c='aqua', alpha=0.5, legend=True)
plt.show()