In [73]:
import keras

建立改进的arch模型。

$ x_t = \epsilon_t * \sigma_t $ <br>
$ \sigma^2_t = RNN(x_{t-1},h_{t-1}) $ <br>

In [74]:
from keras.layers import LSTM,Input,Dropout,Dense,GRU,BatchNormalization,Add
from keras import Model
from keras import backend as K
from keras.layers import Layer

### 定义模型结构

In [114]:
from keras import backend as K
from keras import optimizers,initializers
from keras.callbacks import ModelCheckpoint

def MLE_loss(y_true, y_pred):
    epsilon = 1e-6
    y_pred = K.exp(y_pred)
    return K.mean( 0.5*K.log(y_pred+epsilon)+y_true*y_true/(2.0*y_pred+epsilon))

def build_model():    
    inputs = Input(shape = (None,1))

    x = LSTM(16,activation='tanh',return_sequences=True)(inputs)
    x = Dropout(0.5)(x)
    h = LSTM(16,activation='tanh',return_sequences=False)(x)

    y = Dense(1,use_bias=True,activation='tanh')(h)
    model = Model(input = inputs,output = y)

    sgd = optimizers.SGD(lr=0.001, clipvalue=0.05)
    model.compile(optimizer=sgd,loss=MLE_loss)
    return model

def build_new_model():
    inputs = Input(shape = (12,))
    var = Input(shape = (1,))
#     y = Dense(12,use_bias = True,activation='tanh')(inputs)
    h = Dense(1,use_bias = True,
              kernel_initializer=initializers.random_normal(stddev=0.001))(inputs)
    y = Add()([h,var])
    model = Model(input = [inputs,var],output = y)

    sgd = optimizers.SGD(lr=0.001, clipvalue=0.1)
    model.compile(optimizer=sgd,loss=MLE_loss)
    return model

### 读取数据

In [115]:
import pandas as pd
import numpy as np
data = open("/Users/Apple/Desktop/毕业的部分材料/fts-master/data/m-intc7308.txt",'r').readlines()
df = []
columns = []
for i,line in enumerate(data):
    tmp = [x for x in line.strip().split(' ') if x != '']
    if i == 0:
        columns = tmp
    else:
        df.append(tmp)
df = pd.DataFrame(df,columns=columns)

### 做标准化

In [116]:
df['rtn'] = df['rtn'].map(lambda x: float(x))
x = np.log(df['rtn'].values + 1)
std_x = x - np.mean(x)
print('数据的均值为:{:.4f}'.format(np.mean(x)))

数据的均值为:0.0139


### 生成样本

In [117]:
def generate_data(LENGTH):
    sample_num = len(std_x)
    x_data = []
    y_data = []
    x_var = []
    for i in range(sample_num-LENGTH):
        x_data.append(std_x[i:i+LENGTH])
        x_var.append(np.var(std_x[i:i+LENGTH]))
        y_data.append(std_x[i+LENGTH])
        
    #转换成array。
    x_data,y_data,x_var = np.array(x_data),np.array(y_data),np.array(x_var).reshape(-1,1)
    x_data = np.concatenate([x_data,x_data**2],axis = -1)
    return x_data,y_data,x_var 

In [118]:
x_data,y_data,x_var = generate_data(6)

### 训练数据

In [129]:
import os
keras.backend.clear_session()
model = build_new_model()

model_path = 'arch_model_3'
if not os.path.exists(model_path):
    os.makedirs(model_path)
checkpoint_path = os.path.join(model_path,'epoch_{epoch:03d}loss{loss:.3f}.h5')
checkpoint = ModelCheckpoint(checkpoint_path,monitor='loss',
                             save_best_only=True,save_weights_only=False,
                             verbose=1)



In [132]:
compute_loss(y_data,model.predict([x_data,x_var]))

loss is -0.9309


In [121]:
model.fit([x_data,x_var],y_data,epochs=1,callbacks=[checkpoint],verbose=1)

Epoch 1/1

Epoch 00001: loss improved from 0.01442 to 0.01373, saving model to arch_model_3/epoch_001loss0.014.h5


<keras.callbacks.History at 0x139f860b8>

In [122]:
x_data.shape

(426, 12)

In [123]:
# model = keras.models.load_model('arch_model_1/epoch_100loss-0.768.h5',
#                                 custom_objects={'MLE_loss': MLE_loss})

In [124]:
# #把这次的模型保存下来。
# model.save('best_model0.45.h5') 

In [131]:
# def compute_loss1(y_true,y_pred):
#     epsilon = 0.0000001
#     print("loss is {:.4f}".format( np.mean(0.5*np.log(y_pred+epsilon)+y_true*y_true/(2.*y_pred+epsilon)) ))

def compute_loss(y_true, y_pred):
    epsilon = 1e-6
    loss = np.mean( 0.5*np.log(y_pred+epsilon)+y_true*y_true/(2.0*y_pred+epsilon))
    print("loss is {:.4f}".format(loss))
    
compute_loss(y_data,x_var)

loss is -0.9299


### 模型检验

In [13]:
predict_y = model.predict(x_data**2)
std_red = y_data/np.sqrt(predict_y).reshape(-1,)

In [14]:
from statsmodels.stats.diagnostic import acorr_ljungbox
acorr_ljungbox(y_data**2,10)

(array([ 9.03706158, 27.34674833, 43.85964414, 51.91976296, 57.42843455,
        61.20534825, 67.65937975, 70.36293507, 73.34663844, 74.95281138]),
 array([2.64560535e-03, 1.15273386e-06, 1.61642458e-09, 1.43378721e-10,
        4.12635494e-11, 2.56023480e-11, 4.38457216e-12, 4.16104563e-12,
        3.34831879e-12, 4.85947829e-12]))

In [15]:
from matplotlib import pyplot as plt
plt.plot(std_red**2)

[<matplotlib.lines.Line2D at 0x1361da0b8>]

In [16]:
a0 = 0.01058810 
a1 = 0.23714881
a2 = 0.07274721
a3 = 0.05308030
arch_param = [a0,a1,a2,a3]

In [17]:
def predict_arch(std_x,arch_param):
    #使用标准序列std_x，和arch模型的参数，预测条件方差。
    a0,a1,a2,a3 = arch_param
    pred = a0+a1*std_x[2:-1]**2+a2*std_x[1:-2]**2+a3*std_x[0:-3]**2
    return pred
compute_loss(std_x[3:],predict_arch(std_x,arch_param))

loss is -1.5933


In [18]:
def box_test(std_x):
    #对序列std_x使用arch模型预测条件方差，对其标准残差做Ljung-box检验。
    sigma2 = predict_arch(std_x,arch_param)
    res = std_x[3:]/np.sqrt(sigma2)
    box_value,box_p = acorr_ljungbox(res**2,10)
    print('Q(10)={:.3f}({:.3f})'.format(box_value[9],box_p[9]))
box_test(std_x)

Q(10)=7.311(0.696)


In [110]:
box_test(y_data)

Q(10)=6.701(0.753)


### 使用过去的滞后项计算的方差作为条件方差

In [133]:
pvar = []
for i in range(y_data.shape[0]-6):
    x = np.std(y_data[i:i+6])
    pvar.append(x)

In [134]:
pvar = np.array(pvar)
red_pred = y_data[6:]/pvar

In [135]:
acorr_ljungbox(red_pred**2,10)

(array([0.05156867, 1.02603984, 1.85755485, 2.94300556, 4.33238078,
        6.3472149 , 6.38192348, 7.65132582, 7.69814296, 7.76094747]),
 array([0.82035582, 0.59868486, 0.60249115, 0.56740815, 0.50261843,
        0.38543915, 0.49592733, 0.46824879, 0.56482888, 0.6521743 ]))