In [None]:
# 导入依赖
import sys
sys.path.insert(0, '..')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch

from src.strategy.ar_model import (
    LinearARModel, 
    train_ar_model, 
    prepare_ar_features,
    save_ar_model,
    load_ar_model,
    StreamingARPredictor
)
from src.data.tradingview import TradingViewDataFetcher
from src.strategy.quant_advisor import QuantAdvisor, MarketRegime

# Seed both NumPy and PyTorch so repeated runs of the notebook are reproducible.
for seed_fn in (np.random.seed, torch.manual_seed):
    seed_fn(42)

# Plot styling: dark-grid theme first, then the rcParams overrides on top of it.
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams.update({
    'figure.figsize': (12, 6),
    'font.size': 10,
    # Chinese-capable fonts come first so the Chinese plot titles can render;
    # DejaVu Sans is the fallback.
    'font.sans-serif': ['WenQuanYi Zen Hei', 'WenQuanYi Micro Hei', 'DejaVu Sans'],
    # Use the ASCII hyphen for negative tick labels instead of the Unicode minus.
    'axes.unicode_minus': False,
})

## 1. 获取数据

In [None]:
# Fetch the BTC/USDT candles used by every later cell.
fetcher = TradingViewDataFetcher()

# Request settings: 30-minute candles.
symbol = "BTC/USDT:USDT"
timeframe = "30m"
limit = 1000  # ask for a long history so there is enough data for training

print(f"正在获取 {symbol} {timeframe} 数据...")
df = fetcher.get_klines(symbol, timeframe, limit=limit)

# Fail fast: every later cell indexes into df, so carrying on with a missing
# or empty result would only surface as a confusing error further down.
if df is None or df.empty:
    raise RuntimeError(f"No kline data returned for {symbol} {timeframe}")

# Move the index into a regular column and call it 'datetime'.
# NOTE(review): the rename only takes effect when the index is unnamed
# (reset_index then produces a column called 'index') — confirm against
# what get_klines returns.
df = df.reset_index()
df = df.rename(columns={'index': 'datetime'})
print(f"获取到 {len(df)} 条数据")
print(f"时间范围: {df['datetime'].iloc[0]} 到 {df['datetime'].iloc[-1]}")

# Kept as the cell's last top-level expression so Jupyter renders the preview
# (an expression nested inside an `if` block is never displayed).
df.head()

In [None]:
# Two stacked panels sharing the time axis: close price on top, log return below.
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
price_ax, return_ax = axes
timestamps = df['datetime']
close = df['close']

# Top panel: closing price.
price_ax.plot(timestamps, close, label='Close Price', color='blue')
price_ax.set(ylabel='Price (USDT)', title=f'{symbol} {timeframe} Price Chart')
price_ax.legend()

# Bottom panel: log return ln(P_t / P_{t-1}); the first value is NaN because
# there is no previous close to compare against.
log_returns = np.log(close / close.shift(1))
return_ax.plot(timestamps, log_returns, label='Log Return', color='green', alpha=0.7)
return_ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
return_ax.set(ylabel='Log Return', xlabel='Time')
return_ax.legend()

plt.tight_layout()
plt.show()

## 2. 分析 Log Return 自相关性

In [None]:
# Log returns with the leading NaN (no previous close) removed.
log_returns = np.log(df['close'] / df['close'].shift(1)).dropna()

# Sample autocorrelation of the log returns for lags 1..max_lags.
max_lags = 10
autocorr = [log_returns.autocorr(lag=i) for i in range(1, max_lags + 1)]

# Bar chart of the autocorrelation coefficients.
fig, ax = plt.subplots(figsize=(10, 5))
bars = ax.bar(range(1, max_lags + 1), autocorr, color='steelblue', alpha=0.7)

# Approximate 95% significance band for white noise: +/- 1.96 / sqrt(n).
n = len(log_returns)
significance = 1.96 / np.sqrt(n)
ax.axhline(y=significance, color='red', linestyle='--', label=f'95% CI: ±{significance:.4f}')
ax.axhline(y=-significance, color='red', linestyle='--')
ax.axhline(y=0, color='gray', linestyle='-', alpha=0.5)

ax.set_xlabel('Lag')
ax.set_ylabel('Autocorrelation')
ax.set_title('Log Return 自相关性分析')
ax.legend()

# Annotate each bar with its value. The label goes just beyond the bar's tip:
# above for non-negative bars, below for negative ones, so it never overlaps
# the bar itself.
for bar, val in zip(bars, autocorr):
    is_up = val >= 0
    ax.text(bar.get_x() + bar.get_width()/2,
            bar.get_height() + (0.005 if is_up else -0.005),
            f'{val:.4f}', ha='center', va='bottom' if is_up else 'top', fontsize=8)

plt.tight_layout()
plt.show()

# Text listing; '*' marks lags whose coefficient falls outside the band.
print("\n自相关系数:")
for i, ac in enumerate(autocorr, 1):
    significance_mark = "*" if abs(ac) > significance else ""
    print(f"  Lag {i}: {ac:+.6f} {significance_mark}")

## 3. 训练 AR 模型

In [None]:
# Closing prices as a plain array: the input series for the trainer.
prices = df['close'].values

# Hyperparameters of the main model; later cells reuse these names.
n_lags, forecast_horizon = 3, 1
test_size = 0.25
n_epochs = 2000

# Echo the configuration before training starts.
for line in (
    f"训练 AR({n_lags}) 模型...",
    f"预测周期: {forecast_horizon}",
    f"测试集比例: {test_size}",
    f"训练轮数: {n_epochs}",
    "-" * 50,
):
    print(line)

# Train the model; the returned object is read by later cells
# (weights, bias, win_rate, sharpe, model).
train_kwargs = dict(
    prices=prices,
    n_lags=n_lags,
    forecast_horizon=forecast_horizon,
    test_size=test_size,
    n_epochs=n_epochs,
    lr=0.01,
    verbose=True,
)
result = train_ar_model(**train_kwargs)

In [None]:
# Visualise the learned weights (left) and the win/loss split (right).
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Weight bar chart: green for positive weights, red otherwise.
lag_labels = [f'Lag {i}' for i in range(1, n_lags + 1)]
colors = ['green' if w > 0 else 'red' for w in result.weights]
bars = axes[0].bar(lag_labels, result.weights, color=colors, alpha=0.7)
axes[0].axhline(y=0, color='gray', linestyle='-', alpha=0.5)
axes[0].set_ylabel('Weight')
axes[0].set_title(f'AR({n_lags}) 模型权重')

# Annotate each bar with its value. Negative weights get their label below the
# bar tip (anchored at the top of the text) so it does not overlap the bar.
for bar, w in zip(bars, result.weights):
    is_up = w >= 0
    axes[0].text(bar.get_x() + bar.get_width()/2,
                 bar.get_height() + (0.005 if is_up else -0.005),
                 f'{w:.4f}', ha='center', va='bottom' if is_up else 'top', fontsize=10)

# Pie chart of the win rate against its complement.
labels = ['Win Rate', 'Loss Rate']
sizes = [result.win_rate, 1 - result.win_rate]
colors_pie = ['lightgreen', 'lightcoral']
explode = (0.05, 0)  # pull the "win" slice out slightly

axes[1].pie(sizes, explode=explode, labels=labels, colors=colors_pie,
           autopct='%1.1f%%', shadow=True, startangle=90)
axes[1].set_title(f'模型胜率: {result.win_rate:.1%}')

plt.tight_layout()
plt.show()

# Plain-text reading of each weight's sign: negative is labelled mean
# reversion, anything else momentum.
print(f"\n模型解读:")
for i, w in enumerate(result.weights, 1):
    if w < 0:
        print(f"  Lag {i}: {w:.4f} → 均值回归效应 (涨了倾向跌)")
    else:
        print(f"  Lag {i}: {w:.4f} → 动量效应 (涨了继续涨)")
print(f"  Bias: {result.bias:.6f}")

## 4. 回测可视化

In [None]:
# Rebuild the lagged features and keep the trailing slice as the hold-out set.
# NOTE(review): assumes train_ar_model split the data at the same index
# (int(len(X) * (1 - test_size))) — confirm against its implementation.
X, y = prepare_ar_features(prices, n_lags, forecast_horizon)
split_idx = int(len(X) * (1 - test_size))
X_test = X[split_idx:]
y_test = y[split_idx:]

# Run the trained model in eval mode, without gradient tracking.
model = result.model
model.eval()
X_test_t = torch.tensor(X_test, dtype=torch.float32)
with torch.no_grad():
    y_pred = model(X_test_t).numpy().flatten()

# Per-step strategy return: the realised return signed by the direction of the
# prediction (a prediction of exactly zero contributes nothing), then summed.
trade_returns = np.sign(y_pred) * y_test
equity_curve = np.cumsum(trade_returns)
buy_hold_curve = np.cumsum(y_test)

# Three stacked panels, each with its own x axis.
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
equity_ax, scatter_ax, hist_ax = axes

# Panel 1: cumulative return of the strategy against simply holding.
equity_ax.plot(equity_curve, label='AR Model Strategy', color='blue', linewidth=2)
equity_ax.plot(buy_hold_curve, label='Buy & Hold', color='orange', alpha=0.7)
equity_ax.axhline(y=0, color='gray', linestyle='--', alpha=0.5)
steps = range(len(equity_curve))
# Shade the area between the curve and zero: green in profit, red in loss.
for mask, shade in ((equity_curve > 0, 'green'), (equity_curve < 0, 'red')):
    equity_ax.fill_between(steps, equity_curve, 0, where=mask, color=shade, alpha=0.2)
equity_ax.set(
    xlabel='Trade Number',
    ylabel='Cumulative Log Return',
    title='策略回测: AR模型 vs 持有',
)
equity_ax.legend(loc='upper left')

# Panel 2: predicted against actual values, with the y = x reference line.
scatter_ax.scatter(y_test, y_pred, alpha=0.3, s=10)
scatter_ax.axline((0, 0), slope=1, color='red', linestyle='--', label='Perfect Prediction')
scatter_ax.axhline(y=0, color='gray', linestyle='-', alpha=0.5)
scatter_ax.axvline(x=0, color='gray', linestyle='-', alpha=0.5)
scatter_ax.set(
    xlabel='Actual Log Return',
    ylabel='Predicted Log Return',
    title='预测 vs 实际',
)
scatter_ax.legend()

# Panel 3: distribution of per-step strategy returns, with zero and mean marked.
mean_trade = trade_returns.mean()
hist_ax.hist(trade_returns, bins=50, color='steelblue', alpha=0.7, edgecolor='white')
hist_ax.axvline(x=0, color='red', linestyle='--', linewidth=2)
hist_ax.axvline(x=mean_trade, color='green', linestyle='-', linewidth=2,
                label=f'Mean: {mean_trade:.6f}')
hist_ax.set(
    xlabel='Trade Return (Log)',
    ylabel='Frequency',
    title='单笔交易收益分布',
)
hist_ax.legend()

plt.tight_layout()
plt.show()

# Summary statistics. Win rate and Sharpe are the values reported by the
# trainer; the remaining figures are computed from the arrays above.
final_equity = equity_curve[-1]
final_buy_hold = buy_hold_curve[-1]
for line in (
    "\n回测统计:",
    f"  总交易数: {len(trade_returns)}",
    f"  胜率: {result.win_rate:.1%}",
    f"  平均单笔收益: {mean_trade:.6f}",
    f"  收益标准差: {trade_returns.std():.6f}",
    f"  年化 Sharpe Ratio: {result.sharpe:.2f}",
    f"  累计收益: {final_equity:.4f} ({(np.exp(final_equity)-1)*100:.2f}%)",
    f"  Buy & Hold 收益: {final_buy_hold:.4f} ({(np.exp(final_buy_hold)-1)*100:.2f}%)",
):
    print(line)

## 5. 保存模型

In [None]:
# Persist the trained model.
from pathlib import Path

model_path = '../data/ar_model.pth'
# Make sure the target directory exists so the save also works on a fresh
# checkout. NOTE(review): save_ar_model may already do this itself; creating
# the directory here is harmless either way.
Path(model_path).parent.mkdir(parents=True, exist_ok=True)
save_ar_model(result.model, model_path)
print(f"模型已保存到: {model_path}")

# Load the file back and print what was restored, as a round-trip check.
loaded_model = load_ar_model(model_path)
print(f"\n模型加载成功!")
print(f"  n_lags: {loaded_model.n_lags}")
weights, bias = loaded_model.get_weights()
print(f"  weights: {weights}")
print(f"  bias: {bias:.6f}")

## 6. 测试 QuantAdvisor 集成

In [None]:
# Two advisors over the same lookback window: one given the trained AR model,
# one without it.
advisor_with_ar = QuantAdvisor(lookback_period=100, ar_model=result.model, ar_n_lags=n_lags)
advisor_without_ar = QuantAdvisor(lookback_period=100)

# Analyse the same data with both.
advice_with_ar = advisor_with_ar.analyze(df, symbol)
advice_without_ar = advisor_without_ar.analyze(df, symbol)

banner = "=" * 60
print(banner)
print("对比分析: 带 AR 模型 vs 不带 AR 模型")
print(banner)

# Print the shared fields for each advisor; the AR-specific lines are shown
# only for the AR-enabled advisor, and only when it reports that factor.
report_plan = (
    ("【带 AR 模型】", advice_with_ar, True),
    ("【不带 AR 模型】", advice_without_ar, False),
)
for heading, advice, show_ar_details in report_plan:
    print(f"\n{heading}")
    print(f"  方向: {advice.direction.value}")
    print(f"  置信度: {advice.confidence:.1f}%")
    print(f"  胜率: {advice.win_probability:.1%}")
    print(f"  市场状态: {advice.market_regime.value}")
    if show_ar_details and 'ar_prediction' in advice.factors:
        print(f"  AR 预测: {advice.factors['ar_prediction']:.6f}")
        print(f"  AR Z-Score: {advice.z_scores.get('ar_prediction', 'N/A')}")

In [None]:
# Side-by-side horizontal bar charts of the factor z-scores from each advisor.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

zscore_panels = (
    (axes[0], advice_with_ar, '因子 Z-Scores (带AR模型)'),
    (axes[1], advice_without_ar, '因子 Z-Scores (不带AR模型)'),
)
for panel_ax, advice, panel_title in zscore_panels:
    factor_names = list(advice.z_scores.keys())
    z_values = [advice.z_scores[name] for name in factor_names]
    # Green for positive z-scores, red otherwise.
    bar_colors = ['green' if z > 0 else 'red' for z in z_values]

    panel_ax.barh(factor_names, z_values, color=bar_colors, alpha=0.7)
    panel_ax.axvline(x=0, color='gray', linestyle='-', alpha=0.5)
    # Dashed guides at |z| = 2; only the positive one carries the legend entry.
    panel_ax.axvline(x=2, color='blue', linestyle='--', alpha=0.5, label='Z=2 threshold')
    panel_ax.axvline(x=-2, color='blue', linestyle='--', alpha=0.5)
    panel_ax.set_xlabel('Z-Score')
    panel_ax.set_title(panel_title)
    panel_ax.legend()

plt.tight_layout()
plt.show()

## 7. 不同滞后期数对比

In [None]:
# Train one model per lag count and compare win rate and Sharpe ratio.
lag_range = range(1, 8)
sweep_epochs = 1000  # half the main model's 2000 epochs (presumably to keep the sweep fast)
results_by_lag = []

print("训练不同滞后期数的 AR 模型...")
# The loop variable is deliberately not called `n`, so the sample count `n`
# defined in the autocorrelation cell is not overwritten.
for lag_count in lag_range:
    # Reuse the main model's forecast_horizon / test_size so the sweep is
    # evaluated on the same setup as the AR(n_lags) model it is compared with.
    lag_result = train_ar_model(
        prices=prices,
        n_lags=lag_count,
        forecast_horizon=forecast_horizon,
        test_size=test_size,
        n_epochs=sweep_epochs,
        lr=0.01,
        verbose=False
    )
    results_by_lag.append(lag_result)
    print(f"  AR({lag_count}): Win Rate = {lag_result.win_rate:.1%}, Sharpe = {lag_result.sharpe:.2f}")

# Visualise both metrics against the number of lags.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Win rate, with the 50% coin-flip baseline.
win_rates = [res.win_rate for res in results_by_lag]
axes[0].bar(list(lag_range), win_rates, color='steelblue', alpha=0.7)
axes[0].axhline(y=0.5, color='red', linestyle='--', label='50% Baseline')
axes[0].set_xlabel('Number of Lags')
axes[0].set_ylabel('Win Rate')
axes[0].set_title('Win Rate vs Number of Lags')
axes[0].legend()

# Sharpe ratio: green when positive, red otherwise.
sharpes = [res.sharpe for res in results_by_lag]
colors = ['green' if s > 0 else 'red' for s in sharpes]
axes[1].bar(list(lag_range), sharpes, color=colors, alpha=0.7)
axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.5)
axes[1].set_xlabel('Number of Lags')
axes[1].set_ylabel('Annualized Sharpe Ratio')
axes[1].set_title('Sharpe比率 vs 滞后期数')

plt.tight_layout()
plt.show()

# Pick the model with the highest Sharpe ratio. nanargmax skips NaN values,
# which plain argmax would report as the maximum; it raises ValueError if
# every Sharpe is NaN.
# NOTE(review): if `sharpe` is measured on the hold-out split, choosing the
# maximum here makes the reported figure optimistic — confirm.
best_idx = int(np.nanargmax(sharpes))
best_result = results_by_lag[best_idx]
print(f"\n最佳模型: AR({best_result.n_lags})")
print(f"  Win Rate: {best_result.win_rate:.1%}")
print(f"  Sharpe Ratio: {best_result.sharpe:.2f}")
print(f"  Weights: {best_result.weights}")

## 8. 总结

In [None]:
# Final text summary of the run: data, main model, weight signs, best lag count.
rule = "=" * 60
for line in (rule, "AR 模型分析总结", rule):
    print(line)

# Each section is a heading followed by its detail lines.
summary_sections = (
    (f"\n数据:", (
        f"  交易对: {symbol}",
        f"  时间周期: {timeframe}",
        f"  数据量: {len(df)} 条",
    )),
    (f"\n主模型 AR({n_lags}):", (
        f"  权重: {result.weights}",
        f"  偏置: {result.bias:.6f}",
        f"  胜率: {result.win_rate:.1%}",
        f"  Sharpe: {result.sharpe:.2f}",
    )),
)
for heading, detail_lines in summary_sections:
    print(heading)
    for detail in detail_lines:
        print(detail)

# Sign-based reading of each weight: negative is labelled mean reversion,
# anything else momentum.
print(f"\n权重解读:")
for lag_idx, weight in enumerate(result.weights, 1):
    if weight < 0:
        effect = "均值回归"
    else:
        effect = "动量"
    print(f"  Lag {lag_idx}: {weight:+.4f} ({effect})")

print(f"\n最佳滞后期数: {best_result.n_lags}")
print(f"\n模型保存路径: {model_path}")
# NOTE(review): the hint below advertises an `ar_model_path` keyword, while this
# notebook builds QuantAdvisor with `ar_model` / `ar_n_lags` — confirm the
# keyword exists.
print(f"\n使用方法:")
print(f"  advisor = QuantAdvisor(ar_model_path='{model_path}')")