In [1]:
import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
import statsmodels.api as sm
import warnings

# 忽略特定类型的警告
warnings.filterwarnings("ignore", message="No frequency information was provided, so inferred frequency B will be used.")
warnings.filterwarnings("ignore", message="A date index has been provided, but it has no associated frequency information and so will be ignored when e.g. forecasting.")

# Read the '全部' sheet of the workbook, parse the '日期' column as datetimes
# and use it as the index.
# NOTE(review): the workbook path is an absolute local path; the notebook only
# runs on a machine where that file exists.
data = (
    pd.read_excel('D:/统计建模/期现价格(完全修订版).xlsx', sheet_name='全部', header=0)
    .assign(**{'日期': lambda frame: pd.to_datetime(frame['日期'])})
    .set_index('日期')
)

# First-difference every column; rows containing any NaN are dropped (this
# includes the first row, which has no predecessor to difference against).
data_diff = data.diff().dropna()

# Rolling-window settings.
max_lag, window_size = 15, 60  # largest lag order tried; observations per window
n_windows = 1 + len(data_diff) - window_size  # number of windows of `window_size` rows

# Per-window result accumulators, filled by the rolling loops further down.
variance_contributions, lag_information, lag_result = [], [], []

# Find the optimal lag order n
def find_optimal_lag(data, max_lag):
    """Return the lag order in 1..max_lag whose fitted VAR has the lowest AIC.

    A fresh ``VAR(data)`` is built and fitted for every candidate order.

    Parameters
    ----------
    data : passed unchanged to ``VAR``.
    max_lag : int
        Largest lag order to try (inclusive).

    Returns
    -------
    int
        The selected lag order. Ties keep the smallest order because only a
        strictly lower AIC replaces the current choice. Returns 0 when
        ``max_lag`` is below 1 or when no fit reports an AIC below infinity.
    """
    chosen_order, lowest_aic = 0, np.inf
    scored_orders = (
        (order, VAR(data).fit(order).aic) for order in range(1, max_lag + 1)
    )
    for order, score in scored_orders:
        if score < lowest_aic:
            lowest_aic, chosen_order = score, order
    return chosen_order

# Rolling-window VAR analysis.
# For every window of `window_size` differenced observations: choose the lag
# order by AIC, fit the VAR, and derive all three per-window statistics from
# that single fit. Doing this in one pass means the expensive lag search and
# model fit run once per window rather than once per statistic.
fevd_horizon = 100  # number of forecast steps in the variance decomposition
irf_periods = 10    # number of impulse-response periods entering the mean lag

for i in range(n_windows):
    window_data = data_diff.iloc[i:i+window_size]
    optimal_lag = find_optimal_lag(window_data, max_lag)
    model = VAR(window_data)
    var_model = model.fit(optimal_lag)
    lag_information.append(optimal_lag)

    # Forecast-error variance decomposition. statsmodels indexes `decomp` as
    # [variable][step][shock], so this reads the share of variable 1's
    # forecast-error variance at the last step attributed to variable 0.
    fevd = var_model.fevd(fevd_horizon)
    fevd_data = fevd.decomp
    variance_contributions.append(fevd_data[1][fevd_horizon - 1][0])

    # Mean lag of the impulse response: the period index weighted by the
    # absolute response of variable 1 to an impulse in variable 0.
    # Only periods 0..irf_periods-1 enter the average.
    irf = var_model.irf(irf_periods)
    impulse_response_sum = 0
    weight = 0
    for x in range(irf_periods):
        impulse_response = abs(irf.irfs[x][1][0])
        weight = weight + impulse_response * x
        impulse_response_sum = impulse_response_sum + impulse_response
    # An all-zero response has no defined mean lag; record NaN explicitly
    # instead of dividing zero by zero.
    average_lag_period = weight / impulse_response_sum if impulse_response_sum else np.nan
    lag_result.append(average_lag_period)

In [2]:
# Save the per-window variance contributions as a single-column Excel sheet
# (no index column is written).
df = pd.DataFrame(variance_contributions, columns=['Variance Contributions'])
df.to_excel('variance_contributions.xlsx', index=False)

In [3]:
# Write each per-window series to its own single-column Excel file
# (no index column is written).
exports = (
    ('lag_information', lag_information, 'lag_information.xlsx'),
    ('lag_result', lag_result, 'lag_result.xlsx'),
)
for column_name, values, target in exports:
    df = pd.DataFrame({column_name: values})
    df.to_excel(target, index=False)

In [None]:
# Rolling-window ADF test on the first column of the differenced data.
# Prints the p-value of every window whose p-value exceeds 0.05.
for i in range(n_windows):
    window_data = data_diff.iloc[i:i+window_size]
    first_series = window_data.iloc[:, 0]
    adfResult = sm.tsa.stattools.adfuller(first_series, maxlag=max_lag)
    p_value = adfResult[1]  # second element of the adfuller result is the p-value
    if p_value > 0.05:
        print(p_value)