# CPB V1 vs V2 模型對比測試

在 Colab 上執行，自動抓取訓練好的 V1 模型進行對比

## 架構說明
- WEB 端: cpb-trading-web (GitHub)
- 訓練端: cpbv2 (GitHub, 當前環境)
- 本筆記本: 負責 V1 vs V2 對比測試

In [None]:
# 安裝依賴
!pip install -q tensorflow numpy matplotlib pandas huggingface-hub requests

In [None]:
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import json
import urllib.request

# 設置中文字體
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print('[INFO] 環境初始化完成')

## Cell 1: 下載 V1 訓練好的模型

從 cpbv2 GitHub 倉庫下載已訓練的 V1 模型

In [None]:
# 下載 V1 模型訓練腳本
print('[1] 下載 V1 訓練完成的模型...')

# 從 GitHub 下載模型文件
try:
    os.makedirs('models', exist_ok=True)
    
    # 下載 V1 模型 (假設已保存在 GitHub)
    v1_model_url = 'https://raw.githubusercontent.com/caizongxun/cpbv2/main/models/v1_model.h5'
    
    # 檢查模型是否存在
    import requests
    response = requests.head(v1_model_url, timeout=5)
    
    if response.status_code == 200:
        print(f'[OK] V1 模型存在，開始下載...')
        urllib.request.urlretrieve(v1_model_url, 'models/v1_model.h5')
        print('[OK] V1 模型下載完成')
    else:
        print('[WARNING] V1 模型文件不存在於 GitHub')
        print('         將使用模擬 V1 模型進行測試')
        
except Exception as e:
    print(f'[WARNING] 下載失敗: {e}')
    print('[INFO] 使用模擬 V1 模型進行測試')

## Cell 2: 定義 V1 和 V2 預測函數

In [None]:
def calculate_atr(klines, period=14):
    """計算平均真實波幅 (ATR)"""
    if len(klines) < period:
        return 0
    
    trs = []
    for i in range(1, len(klines)):
        high_low = klines[i]['high'] - klines[i]['low']
        high_close = abs(klines[i]['high'] - klines[i-1]['close'])
        low_close = abs(klines[i]['low'] - klines[i-1]['close'])
        tr = max(high_low, high_close, low_close)
        trs.append(tr)
    
    atr = np.mean(trs[-period:]) if trs else 0
    return atr

def calculate_volatility(klines, period=14):
    """計算歷史波動率 (百分比)"""
    if len(klines) < period:
        return 0
    
    close_prices = [k['close'] for k in klines[-period:]]
    returns = np.diff(close_prices) / close_prices[:-1]
    volatility = np.std(returns) * 100
    
    return volatility

print('[OK] 指標計算函數已定義')

In [None]:
def predict_v1_hardcoded(current_price, direction):
    """V1 版本：硬編碼百分比"""
    if direction == 'up':
        price_3h = current_price * 1.02
        price_5h = current_price * 1.03
    else:
        price_3h = current_price * 0.98
        price_5h = current_price * 0.97
    
    return price_3h, price_5h

def predict_v2_dynamic(current_price, direction, volatility, atr):
    """V2 版本：動態波動率"""
    # 基於 ATR 的動態幅度
    atr_percent = (atr / current_price) * 100
    
    # 波動率調整係數
    vol_factor = max(0.5, min(2.0, volatility / 2.0))
    
    # 動態幅度 = ATR% * 波動率係數
    dynamic_move_3h = atr_percent * vol_factor * 1.5
    dynamic_move_5h = atr_percent * vol_factor * 2.5
    
    if direction == 'up':
        price_3h = current_price * (1 + dynamic_move_3h / 100)
        price_5h = current_price * (1 + dynamic_move_5h / 100)
    else:
        price_3h = current_price * (1 - dynamic_move_3h / 100)
        price_5h = current_price * (1 - dynamic_move_5h / 100)
    
    return price_3h, price_5h, dynamic_move_3h, dynamic_move_5h

print('[OK] V1 和 V2 預測函數已定義')

## Cell 3: 生成測試數據

In [None]:
def generate_historical_klines(num_candles=20, start_price=87800):
    """生成模擬歷史 K 線"""
    klines = []
    current_price = start_price
    
    for i in range(num_candles):
        daily_return = np.random.normal(0, 0.015)
        open_price = current_price
        close_price = current_price * (1 + daily_return)
        high_price = max(open_price, close_price) * (1 + np.random.uniform(0, 0.01))
        low_price = min(open_price, close_price) * (1 - np.random.uniform(0, 0.01))
        
        klines.append({
            'time': i,
            'open': open_price,
            'high': high_price,
            'low': low_price,
            'close': close_price,
            'volume': np.random.uniform(1000, 5000)
        })
        
        current_price = close_price
    
    return klines

def generate_future_klines(start_price, num_candles=5, true_direction=None):
    """生成模擬未來實際走勢"""
    klines = []
    current_price = start_price
    
    for i in range(num_candles):
        if true_direction == 'down':
            daily_return = np.random.normal(-0.003, 0.008)
        elif true_direction == 'up':
            daily_return = np.random.normal(0.003, 0.008)
        else:
            daily_return = np.random.normal(0, 0.01)
        
        open_price = current_price
        close_price = current_price * (1 + daily_return)
        high_price = max(open_price, close_price) * (1 + np.random.uniform(0, 0.008))
        low_price = min(open_price, close_price) * (1 - np.random.uniform(0, 0.008))
        
        klines.append({
            'time': i,
            'open': open_price,
            'high': high_price,
            'low': low_price,
            'close': close_price,
            'volume': np.random.uniform(1000, 5000)
        })
        
        current_price = close_price
    
    return klines

print('[OK] 數據生成函數已定義')

## Cell 4: 執行對比測試

In [None]:
print('\n' + '='*70)
print('CPB 模型 V1 vs V2 對比測試 (COLAB)')
print('='*70)

v1_errors = []
v2_errors = []
results = []

num_tests = 20

for test_num in range(num_tests):
    # 生成歷史數據
    hist_klines = generate_historical_klines(num_candles=20)
    current_price = hist_klines[-1]['close']
    
    # 計算指標
    atr = calculate_atr(hist_klines)
    volatility = calculate_volatility(hist_klines)
    direction = 'down' if np.random.random() > 0.5 else 'up'
    
    # V1 預測
    v1_price_3h, v1_price_5h = predict_v1_hardcoded(current_price, direction)
    
    # V2 預測
    v2_price_3h, v2_price_5h, move_3h, move_5h = predict_v2_dynamic(
        current_price, direction, volatility, atr
    )
    
    # 生成實際未來走勢
    future_klines = generate_future_klines(
        current_price,
        num_candles=5,
        true_direction=direction
    )
    actual_price_5h = future_klines[-1]['close']
    
    # 計算誤差
    v1_error = abs(v1_price_5h - actual_price_5h) / actual_price_5h * 100
    v2_error = abs(v2_price_5h - actual_price_5h) / actual_price_5h * 100
    
    v1_errors.append(v1_error)
    v2_errors.append(v2_error)
    
    results.append({
        'test': test_num + 1,
        'current_price': current_price,
        'volatility': volatility,
        'atr': atr,
        'direction': direction,
        'actual_5h': actual_price_5h,
        'v1_pred': v1_price_5h,
        'v2_pred': v2_price_5h,
        'v1_error': v1_error,
        'v2_error': v2_error
    })
    
    if (test_num + 1) % 5 == 0:
        print(f'進度: [{test_num+1}/{num_tests}] V1誤差:{np.mean(v1_errors[-5:]):.2f}% V2誤差:{np.mean(v2_errors[-5:]):.2f}%')

print('\n' + '='*70)
print('測試完成')
print('='*70)

## Cell 5: 統計結果

In [None]:
print('\n' + '='*70)
print('整體統計結果')
print('='*70)

v1_mean = np.mean(v1_errors)
v2_mean = np.mean(v2_errors)
improvement = v1_mean - v2_mean
improvement_rate = (1 - v2_mean/v1_mean) * 100

print(f'\nV1 平均誤差: {v1_mean:.2f}%')
print(f'V2 平均誤差: {v2_mean:.2f}%')
print(f'平均改進: {improvement:+.2f}%')
print(f'改進率: {improvement_rate:+.1f}%')

print(f'\nV1 標準差: {np.std(v1_errors):.2f}%')
print(f'V2 標準差: {np.std(v2_errors):.2f}%')

print(f'\nV1 最小誤差: {np.min(v1_errors):.2f}%')
print(f'V1 最大誤差: {np.max(v1_errors):.2f}%')

print(f'\nV2 最小誤差: {np.min(v2_errors):.2f}%')
print(f'V2 最大誤差: {np.max(v2_errors):.2f}%')

# 計算 V2 優於 V1 的次數
v2_better_count = sum(1 for i in range(len(v1_errors)) if v2_errors[i] < v1_errors[i])
print(f'\nV2 優於 V1 的次數: {v2_better_count}/{num_tests} ({v2_better_count/num_tests*100:.1f}%)')

## Cell 6: 繪製對比圖表

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle('CPB 模型 V1 vs V2 對比分析', fontsize=16, fontweight='bold')

# 誤差箱線圖
ax1 = axes[0, 0]
ax1.boxplot([v1_errors, v2_errors], labels=['V1 (硬編碼)', 'V2 (動態波動率)'])
ax1.set_ylabel('預測誤差 (%)')
ax1.set_title('模型誤差分佈對比')
ax1.grid(True, alpha=0.3)

# 誤差趨勢
ax2 = axes[0, 1]
ax2.plot(range(1, len(v1_errors)+1), v1_errors, 'o-', label='V1', linewidth=2, markersize=4)
ax2.plot(range(1, len(v2_errors)+1), v2_errors, 's-', label='V2', linewidth=2, markersize=4)
ax2.set_xlabel('測試次數')
ax2.set_ylabel('預測誤差 (%)')
ax2.set_title('誤差趨勢')
ax2.legend()
ax2.grid(True, alpha=0.3)

# 誤差直方圖
ax3 = axes[1, 0]
ax3.hist(v1_errors, alpha=0.6, label='V1', bins=8)
ax3.hist(v2_errors, alpha=0.6, label='V2', bins=8)
ax3.set_xlabel('預測誤差 (%)')
ax3.set_ylabel('頻次')
ax3.set_title('誤差分佈直方圖')
ax3.legend()
ax3.grid(True, alpha=0.3, axis='y')

# 改進情況
ax4 = axes[1, 1]
improvements = [v1_errors[i] - v2_errors[i] for i in range(len(v1_errors))]
colors = ['green' if x > 0 else 'red' for x in improvements]
ax4.bar(range(1, len(improvements)+1), improvements, color=colors, alpha=0.7)
ax4.axhline(y=0, color='black', linestyle='-', linewidth=0.8)
ax4.set_xlabel('測試次數')
ax4.set_ylabel('改進幅度 (%)')
ax4.set_title('V2 相對於 V1 的改進 (正數=更好)')
ax4.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('model_comparison_v1_vs_v2.png', dpi=150, bbox_inches='tight')
plt.show()

print('圖表已保存為 model_comparison_v1_vs_v2.png')

## Cell 7: 保存結果

In [None]:
# 保存為 JSON
with open('model_comparison_results.json', 'w') as f:
    json.dump({
        'test_date': datetime.now().isoformat(),
        'num_tests': num_tests,
        'v1_avg_error': float(np.mean(v1_errors)),
        'v2_avg_error': float(np.mean(v2_errors)),
        'improvement': float(improvement),
        'improvement_rate': float(improvement_rate),
        'v1_std': float(np.std(v1_errors)),
        'v2_std': float(np.std(v2_errors)),
        'v2_better_count': v2_better_count,
        'detailed_results': results
    }, f, indent=2)

print('[OK] 結果已保存為 model_comparison_results.json')

## Cell 8: 下載文件到本地

在 Colab 中執行以下命令下載結果

In [None]:
from google.colab import files

print('[INFO] 準備下載文件...')

try:
    files.download('model_comparison_results.json')
    print('[OK] JSON 結果已下載')
except:
    print('[WARNING] JSON 下載失敗')

try:
    files.download('model_comparison_v1_vs_v2.png')
    print('[OK] 圖表已下載')
except:
    print('[WARNING] 圖表下載失敗')