In [1]:
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from itertools import combinations
import matplotlib.pyplot as plt
import os

In [2]:
file_path = '期货.xlsx'

#清洗数据
#前两行
df_header = pd.read_excel(file_path, header=None, nrows=2)

#合并前两行
new_columns = []
for col1, col2 in zip(df_header.iloc[0], df_header.iloc[1]):
    #处理空值
    col1_str = str(col1).strip() if pd.notna(col1) else ''
    col2_str = str(col2).strip() if pd.notna(col2) else ''
    new_col = f"{col1_str}_{col2_str}".strip('_')
    new_columns.append(new_col)

#从第3行开始读数据
df = pd.read_excel(file_path, header=[2])
df.columns = new_columns  #替换合成列名


In [8]:
#保留“日期”
date_cols = [col for col in df.columns if '日期' in str(col)]

#保留“收盘价”
close_cols = [col for col in df.columns if str(col).strip().endswith('_收盘价')]

# 最终保留的列
keep_cols = date_cols + close_cols
df_filtered = df[keep_cols].copy()

# 保存结果
df_filtered.to_excel("期货收盘价.xlsx", index=False)

In [6]:
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from itertools import combinations
import matplotlib.pyplot as plt
import os

data_path = "期货收盘价.xlsx"
residual_dir = "residual_plots"
zscore_dir = "zscore_plots"

#文件夹保存
os.makedirs(residual_dir, exist_ok=True)
os.makedirs(zscore_dir, exist_ok=True)

#读数据
df = pd.read_excel(data_path)
df = df.dropna().reset_index(drop=True)

#识别日期
date_col = [col for col in df.columns if '日期' in str(col)][0]
df[date_col] = pd.to_datetime(df[date_col])
price_cols = [col for col in df.columns if col != date_col]

#协整检验
def engle_granger_with_zscore(series1, series2):
    X = sm.add_constant(series1)
    model = sm.OLS(series2, X).fit()
    beta = model.params[1]
    residuals = model.resid

    #z-score标准化
    mu = residuals.mean()
    sigma = residuals.std()
    zscore = (residuals - mu) / sigma

    #ADF检验
    adf_stat, p_value, _, _, crit_vals, _ = adfuller(residuals)

    return {
        'beta': beta,
        'residuals': residuals,
        'zscore': zscore,
        'ADF统计量': adf_stat,
        'p值': p_value,
        '5%临界值': crit_vals['5%'],
        '是否协整(5%)': p_value < 0.05
    }

#循环
results = []

for col1, col2 in combinations(price_cols, 2):
    test = engle_granger_with_zscore(df[col1], df[col2])
    
    result = {
        '标的1': col1,
        '标的2': col2,
        'ADF统计量': test['ADF统计量'],
        'p值': test['p值'],
        'beta': test['beta'],
        '是否协整(5%)': test['是否协整(5%)']
    }
    results.append(result)

    # 残差图
    plt.figure(figsize=(10, 4))
    plt.plot(df[date_col], test['residuals'], label='残差')
    plt.axhline(0, color='gray', linestyle='--')
    plt.title(f"{col1} & {col2} 残差时间序列")
    plt.xlabel("日期")
    plt.ylabel("残差")
    plt.tight_layout()
    plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
    plt.close()

    # z-score 图
    plt.figure(figsize=(10, 4))
    plt.plot(df[date_col], test['zscore'], label='z-score')
    plt.axhline(0, color='black', linestyle='--', linewidth=1)
    plt.axhline(2, color='red', linestyle='--')
    plt.axhline(-2, color='red', linestyle='--')
    plt.axhline(1, color='gray', linestyle=':')
    plt.axhline(-1, color='gray', linestyle=':')
    plt.title(f"{col1} & {col2} Z-Score 时间序列")
    plt.xlabel("日期")
    plt.ylabel("z-score")
    plt.legend()
    plt.tight_layout()
    plt.savefig(f"{zscore_dir}/zscore_{col1}_{col2}.png")
    plt.close()

#保存结果
result_df = pd.DataFrame(results)
result_df.to_excel("协整检验结果.xlsx", index=False)

print("搞定")

  beta = model.params[1]
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.tight_layout()
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  plt.savefig(f"{residual_dir}/残差_{col1}_{col2}.png")
  p

搞定
