In [None]:
#pip install akshare

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
import itertools
from sklearn.metrics import mean_absolute_error, mean_squared_error
import akshare as ak

# Matplotlib font setup; presumably so the Chinese titles/labels used in the
# plots below render correctly and the minus sign stays displayable.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# 1. Fetch daily quote data through AKShare.
# NOTE(review): the original comment said this loads Kweichow Moutai (600519),
# but the symbol actually requested below is "sz000025" - confirm which stock
# is intended before trusting downstream results.
start_date = "20200101"
end_date = "20250507"
data = ak.stock_zh_a_daily(symbol="sz000025", start_date=start_date, end_date=end_date)
# Parse the "date" column to datetimes and make it the DataFrame index.
data["date"] = pd.to_datetime(data["date"])
data.set_index("date", inplace=True)

# Simple (arithmetic) returns
def compute_returns(df):
    """
    Return the simple return series computed from ``df["close"]``.

    Each value is r_t = (P_t - P_{t-1}) / P_{t-1}. NaN entries (including the
    first observation, which has no predecessor) are dropped, and the
    resulting Series is named "returns".
    """
    close_prices = df["close"]
    pct = close_prices.pct_change()
    return pct.dropna().rename("returns")

# 2. Build the return series and plot it
returns = compute_returns(data)
fig_ret, ax_ret = plt.subplots(figsize=(12, 4))
ax_ret.plot(returns.index, returns.values)
ax_ret.set_title("股票日收益率")
ax_ret.set_xlabel("日期")
ax_ret.set_ylabel("收益率")
plt.show()

# Chronological train/test split: first 80% for training, remaining 20% for testing
n = len(returns)
train_size = int(n * 0.8)
train_ret, test_ret = returns.iloc[:train_size], returns.iloc[train_size:]

# Choose the ARIMA order automatically by minimising AIC
def select_arima_order(ts, p_max=3, d_max=2, q_max=3):
    """
    Grid-search ARIMA(p, d, q) orders and return the one with the lowest AIC.

    Args:
        ts: Data passed to ``ARIMA`` as the endogenous series.
        p_max: Inclusive upper bound for the AR order p.
        d_max: Inclusive upper bound for the differencing order d.
        q_max: Inclusive upper bound for the MA order q.

    Returns:
        The ``(p, d, q)`` tuple with the smallest AIC among the candidates
        that could be fitted, or ``None`` if no candidate was fitted
        successfully (or the grid is empty).
    """
    best_aic = np.inf
    best_order = None
    grid = itertools.product(range(p_max + 1), range(d_max + 1), range(q_max + 1))
    for order in grid:
        try:
            aic = ARIMA(ts, order=order).fit().aic
        except Exception:
            # Best-effort search: a candidate that fails to fit is skipped.
            # Catching Exception (not a bare ``except``) lets KeyboardInterrupt
            # and SystemExit propagate, so a long search can still be aborted.
            continue
        if aic < best_aic:
            best_aic, best_order = aic, order
    return best_order

# 3. Fit the ARIMA model on the training returns and forecast the test period
# NOTE(review): select_arima_order returns None when no candidate order could
# be fitted; that case is not handled before the order is passed to ARIMA.
order_arima = select_arima_order(train_ret)
print(f"ARIMA 最优阶数: {order_arima}")
model_arima = ARIMA(train_ret, order=order_arima).fit()

# Predict by integer position instead of by timestamp, to avoid timestamp
# matching errors. start_idx is the first position after the training sample;
# end_idx is the position of the last test observation.
# NOTE(review): every requested position lies beyond the training sample -
# confirm that dynamic=False has the intended effect for such forecasts.
start_idx = train_ret.shape[0]
end_idx = train_ret.shape[0] + test_ret.shape[0] - 1
forecast_arima = model_arima.predict(start=start_idx, end=end_idx, dynamic=False)
forecast_arima.index = test_ret.index  # restore the test set's date index

# 4. Evaluate and report model performance
def report_performance(true, pred, label):
    """
    Print and return MAE, RMSE and MAPE of ``pred`` against ``true``.

    Args:
        true: Observed values (array-like or Series).
        pred: Predicted values, same length as ``true``.
        label: Text prefixed to the printed metrics line.

    Returns:
        Tuple ``(mae, rmse, mape)`` with MAPE expressed in percent. MAPE is
        averaged only over observations whose true value is non-zero; it is
        NaN when every true value is zero.
    """
    mae = mean_absolute_error(true, pred)
    rmse = np.sqrt(mean_squared_error(true, pred))

    # Work positionally on plain arrays, the same way the sklearn metrics
    # above pair the two inputs.
    actual = np.asarray(true, dtype=float)
    predicted = np.asarray(pred, dtype=float)
    # A true value of exactly 0 (e.g. an unchanged closing price) would make
    # the percentage error infinite, so those observations are excluded.
    nonzero = actual != 0
    if nonzero.any():
        pct_err = np.abs((actual[nonzero] - predicted[nonzero]) / actual[nonzero])
        mape = np.mean(pct_err) * 100
    else:
        mape = float("nan")

    print(f"{label} - MAE: {mae:.6f}, RMSE: {rmse:.6f}, MAPE: {mape:.2f}%")
    return mae, rmse, mape

print("ARIMA 模型性能：")
report_performance(test_ret, forecast_arima, "测试集")

# Plot the training data, the held-out data and the forecast on one axis
fig_cmp, ax_cmp = plt.subplots(figsize=(12, 4))
ax_cmp.plot(train_ret.index, train_ret, label="训练集")
ax_cmp.plot(test_ret.index, test_ret, label="真实测试集")
ax_cmp.plot(forecast_arima.index, forecast_arima, label="ARIMA 预测")
ax_cmp.legend()
ax_cmp.set_title("ARIMA 训练/测试对比")
plt.show()

# Residual diagnostics: plot the fitted model's residual series
resid = model_arima.resid
fig_resid, ax_resid = plt.subplots(figsize=(12, 4))
ax_resid.plot(resid)
ax_resid.set_title("ARIMA 残差序列")
plt.show()




In [None]:
# 5. Add the sentiment indicator, read from daily_sentiment.xlsx
sentiment = pd.read_excel("daily_sentiment.xlsx")
sentiment['date'] = pd.to_datetime(sentiment['date'])
sentiment = sentiment.set_index('date')

# Align returns and sentiment on date and drop every row containing a NaN
merged = pd.concat([returns, sentiment], axis=1).dropna()

# Redo the chronological 80/20 train/test split on the merged frame
split_at = int(len(merged) * 0.8)
train_data, test_data = merged.iloc[:split_at], merged.iloc[split_at:]

# 6. Fit the ARIMAX model (ARIMA with an exogenous regressor) and forecast
# The order is searched on the returns series alone; the sentiment regressor
# is not part of the AIC search.
# NOTE(review): select_arima_order returns None when no candidate order could
# be fitted; that case is not handled here.
order_arimax = select_arima_order(train_data['returns'])
print(f"ARIMAX 最优阶数: {order_arimax}")
model_arimax = ARIMA(endog=train_data['returns'],
                     exog=train_data['sentiment_score'],
                     order=order_arimax).fit()

# Integer positions of the test period: first position after the training
# rows through the last test row. The test-period sentiment scores are
# supplied as the exogenous values for those positions.
start_idx2 = train_data.shape[0]
end_idx2   = train_data.shape[0] + test_data.shape[0] - 1
forecast_arimax = model_arimax.predict(start=start_idx2,
                                       end=end_idx2,
                                       exog=test_data['sentiment_score'],
                                       dynamic=False)
# Restore the test set's date index on the forecast.
forecast_arimax.index = test_data.index

# 7. Report ARIMAX performance on the held-out data
print("ARIMAX 模型性能：")
report_performance(test_data['returns'], forecast_arimax, "测试集")

# Plot the training data, the held-out data and the ARIMAX forecast together
fig_cmp_x, ax_cmp_x = plt.subplots(figsize=(12, 4))
ax_cmp_x.plot(train_data.index, train_data['returns'], label="训练集")
ax_cmp_x.plot(test_data.index, test_data['returns'], label="真实测试集")
ax_cmp_x.plot(forecast_arimax.index, forecast_arimax, label="ARIMAX 预测")
ax_cmp_x.legend()
ax_cmp_x.set_title("ARIMAX 训练/测试对比")
plt.show()

# ARIMAX residual diagnostics: plot the fitted model's residual series
resid_x = model_arimax.resid
fig_resid_x, ax_resid_x = plt.subplots(figsize=(12, 4))
ax_resid_x.plot(resid_x)
ax_resid_x.set_title("ARIMAX 残差序列")
plt.show()

# 8. ARIMA vs. ARIMAX, side by side
# NOTE(review): ARIMA is scored on test_ret while ARIMAX is scored on
# test_data['returns'], which comes from the merged frame after NaN rows were
# dropped - the two test sets may not cover the same dates; confirm the
# comparison is meaningful.
print("对比结果：")
arima_scores = report_performance(test_ret, forecast_arima, "ARIMA 测试集")
arimax_scores = report_performance(test_data['returns'], forecast_arimax, "ARIMAX 测试集")
mae_a, rmse_a, mape_a = arima_scores
mae_x, rmse_x, mape_x = arimax_scores

# Summary table: one row per metric, one column per model
print("指标   ARIMA      ARIMAX")
print(f"MAE    {mae_a:.6f}   {mae_x:.6f}")
print(f"RMSE   {rmse_a:.6f}   {rmse_x:.6f}")
print(f"MAPE   {mape_a:.2f}%   {mape_x:.2f}%")