# 完整獨立的 Crypto ML 訓練筆記本

## 特點

- ✓ 所有代碼都在 Cell 裡
- ✓ 不依賴 GitHub
- ✓ 訓練完畢後直接丟掉
- ✓ GPU 加速 (Tesla T4/L4)
- ✓ Binance US API (無地區限制)
- ✓ 直接上傳 HuggingFace
- ✓ 15-20 分鐘完成 20+ 幣種訓練

In [None]:
# ===== STEP 0: Environment check and dependency installation =====
import subprocess
import sys

print('='*70)
print('環境檢測與依賴安裝'.center(70))
print('='*70)

# Install dependencies through the interpreter that is running this notebook,
# so the packages land in the active environment. This must happen before the
# third-party imports below.
subprocess.check_call([
    sys.executable, '-m', 'pip', 'install', '-q',
    'ccxt', 'torch', 'scikit-learn', 'pandas', 'numpy', 'huggingface-hub',
])

# Import modules (used by the later cells as well)
import ccxt
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
import json
import os
import warnings

warnings.filterwarnings('ignore')

print('環境信息:')
print(f'  Python 版本: {sys.version.split()[0]}')
print(f'  PyTorch 版本: {torch.__version__}')
print(f'  Pandas 版本: {pd.__version__}')
print(f'  NumPy 版本: {np.__version__}')
print(f'  GPU 可用: {torch.cuda.is_available()}')
if torch.cuda.is_available():
    print(f'  GPU 型號: {torch.cuda.get_device_name(0)}')
    print(f'  GPU 內存: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB')

# Device handed to train_model() in the training cell.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'  使用設備: {device}\n')
print('✓ 依賴安裝完成')

In [None]:
# ===== STEP 1: Mount Google Drive =====
print('\n' + '='*70)
print('Step 1: 掛載 Google Drive'.center(70))
print('='*70 + '\n')

from google.colab import drive

drive.mount('/content/drive')

# Create the working directory on Drive and switch into it, so relative paths
# used afterwards resolve inside it.
work_dir = '/content/drive/MyDrive/crypto_training'
os.makedirs(work_dir, exist_ok=True)
os.chdir(work_dir)

print(f'✓ 工作目錄: {os.getcwd()}')

In [None]:
# ===== STEP 2: Data collection =====
print('\n' + '='*70)
print('Step 2: 數據採集'.center(70))
print('='*70 + '\n')


def fetch_binance_data(coins, limit=500, timeframe='1h'):
    """Download OHLCV candles for each symbol through ccxt's ``binanceus`` exchange.

    Args:
        coins: Iterable of market symbols passed to ``fetch_ohlcv``
            (for example ``'BTC/USDT'``).
        limit: Maximum number of candles requested per symbol.
        timeframe: Candle interval string passed to ``fetch_ohlcv``.

    Returns:
        dict mapping symbol -> DataFrame with the columns ``timestamp``,
        ``open``, ``high``, ``low``, ``close`` and ``volume``, sorted by
        ``timestamp``. Symbols whose download fails or yields no candles are
        left out, so the result may hold fewer entries than ``coins``.
    """
    exchange = ccxt.binanceus()
    data_dict = {}

    for coin in coins:
        print(f'採集 {coin}...', end=' ')
        try:
            klines = exchange.fetch_ohlcv(coin, timeframe, limit=limit)
            df = pd.DataFrame(klines, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df = df.sort_values('timestamp').reset_index(drop=True)
        except Exception as e:
            # Deliberate best effort: one unavailable symbol must not abort the
            # whole download, so report a shortened message and move on.
            print(f'✗ ({str(e)[:40]})')
            continue

        if df.empty:
            # An empty frame would only fail later during feature engineering
            # and training, so treat it as a failed download.
            print('✗ (no candles returned)')
            continue

        data_dict[coin] = df
        print(f'✓ ({len(df)} K線)')

    print(f'\n✓ 共採集 {len(data_dict)} 個幣種')
    return data_dict
# Collect 20+ mainstream coins
coins = [
    'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'ADA/USDT',
    'DOGE/USDT', 'XRP/USDT', 'DOT/USDT', 'LINK/USDT', 'UNI/USDT',
    'AVAX/USDT', 'MATIC/USDT', 'LTC/USDT', 'ETC/USDT', 'BCH/USDT',
    'XLM/USDT', 'FIL/USDT', 'AXS/USDT', 'MANA/USDT', 'GRT/USDT',
    'CRV/USDT',
]

data_dict = fetch_binance_data(coins, limit=500, timeframe='1h')

# Per-coin summary: number of candles and the close-price range.
print(f'數據統計:')
for coin, df in data_dict.items():
    print(f'  {coin}: {len(df)} K線, ${df["close"].min():.4f} - ${df["close"].max():.4f}')

In [None]:
# ===== STEP 3: Feature engineering =====
print('\n' + '='*70)
print('Step 3: 特徵工程'.center(70))
print('='*70 + '\n')


def engineer_features(df, horizon=7):
    """Add technical-indicator features and a forward-return target.

    Args:
        df: DataFrame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns, ordered oldest row first. It is not modified.
        horizon: Number of rows ahead used for the target. It is counted in
            rows, so its wall-clock length depends on the candle timeframe.

    Returns:
        A copy of ``df`` with the feature columns and ``target`` added. Rows
        containing NaN or infinite values are removed: the indicator warm-up
        rows at the start, the last ``horizon`` rows (no future close), and
        rows where a ratio divided by zero. The original index labels are kept.

    Raises:
        ValueError: If ``horizon`` is smaller than 1.
    """
    if horizon < 1:
        raise ValueError('horizon must be a positive number of rows')

    df = df.copy()

    # Basic features
    df['price_change'] = df['close'].pct_change() * 100
    df['volume_change'] = df['volume'].pct_change() * 100
    df['hl_ratio'] = (df['high'] - df['low']) / df['close']
    df['oc_ratio'] = (df['close'] - df['open']) / df['open']

    # Simple moving averages (SMA)
    for period in [5, 10, 20]:
        df[f'sma_{period}'] = df['close'].rolling(window=period).mean()

    # Exponential moving averages (EMA)
    for period in [5, 10]:
        df[f'ema_{period}'] = df['close'].ewm(span=period, adjust=False).mean()

    # Relative strength index (RSI), using plain rolling means of gains/losses
    def calc_rsi(prices, period=14):
        delta = prices.diff()
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = -delta.where(delta < 0, 0).rolling(window=period).mean()
        rs = gain / (loss + 1e-10)  # epsilon avoids division by zero
        rsi = 100 - (100 / (1 + rs))
        return rsi

    df['rsi'] = calc_rsi(df['close'])

    # MACD
    ema12 = df['close'].ewm(span=12, adjust=False).mean()
    ema26 = df['close'].ewm(span=26, adjust=False).mean()
    df['macd'] = ema12 - ema26
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()

    # Volatility
    df['volatility'] = df['close'].rolling(window=20).std()

    # Volume features
    df['volume_ma'] = df['volume'].rolling(window=20).mean()
    df['volume_ratio'] = df['volume'] / (df['volume_ma'] + 1e-10)

    # Target: percentage change of the close `horizon` rows in the FUTURE.
    # pct_change(horizon) would compare against the past instead, which is
    # information the features already contain.
    df['target'] = (df['close'].shift(-horizon) / df['close'] - 1) * 100

    # pct_change and the ratios yield +/-inf when the denominator is zero
    # (e.g. a candle with zero volume); dropna() alone would keep those rows.
    numeric_cols = df.select_dtypes(include='number').columns
    df[numeric_cols] = df[numeric_cols].replace([np.inf, -np.inf], np.nan)

    # Remove NaN rows
    df = df.dropna()

    return df
# Process every downloaded coin
print('計算特徵中...')
processed_data = {}
for coin, df in data_dict.items():
    processed_data[coin] = engineer_features(df)
    print(f'  {coin}: {len(processed_data[coin])} 筆訓練數據')

print('\n✓ 特徵工程完成')

In [None]:
# ===== STEP 4: LSTM model definition =====
print('\n' + '='*70)
print('Step 4: 定義 LSTM 模型'.center(70))
print('='*70 + '\n')


class CryptoLSTM(nn.Module):
    """LSTM time-series regressor: stacked LSTM followed by a two-layer head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            # nn.LSTM applies dropout only between stacked layers; with a
            # single layer a non-zero value does nothing except raise a warning.
            dropout=dropout if num_layers > 1 else 0.0,
        )
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a ``(batch, seq_len, input_size)`` tensor to ``(batch, 1)``."""
        lstm_out, _ = self.lstm(x)
        last_hidden = lstm_out[:, -1, :]  # output of the final time step
        fc1_out = self.relu(self.fc1(last_hidden))
        output = self.fc2(fc1_out)
        return output


print('✓ LSTM 模型已定義')

In [None]:
# ===== STEP 5: Training functions =====
def create_sequences(X, y, lookback=30):
    """Build sliding windows for the LSTM.

    Window ``i`` holds rows ``X[i:i + lookback]`` and is paired with the label
    ``y[i + lookback]``, i.e. the label of the row right after the window.

    Args:
        X: Array-like of shape ``(n_rows, n_features)``.
        y: Array-like with one label per row of ``X``.
        lookback: Number of consecutive rows per window.

    Returns:
        Tuple ``(X_seq, y_seq)`` of NumPy arrays with shapes
        ``(n_rows - lookback, lookback, n_features)`` and
        ``(n_rows - lookback,)``. When there are not enough rows for a single
        window, both arrays are empty but keep those trailing dimensions.

    Raises:
        ValueError: If ``y`` has fewer entries than ``X`` has rows.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if len(y) < len(X):
        raise ValueError('y must provide one label per row of X')

    n_windows = len(X) - lookback
    if n_windows <= 0:
        # Keep the window and feature dimensions so callers can still read
        # .shape[1:] on an empty result.
        empty_X = np.empty((0, lookback) + X.shape[1:], dtype=X.dtype)
        empty_y = np.empty((0,) + y.shape[1:], dtype=y.dtype)
        return empty_X, empty_y

    X_seq = np.stack([X[start:start + lookback] for start in range(n_windows)])
    y_seq = y[lookback:lookback + n_windows].copy()
    return X_seq, y_seq
def train_model(X_train, y_train, X_val, y_val, epochs=50, batch_size=16, device='cpu'):
    """Train a CryptoLSTM with Adam/MSE and early stopping on validation loss.

    Args:
        X_train: Training windows, array of shape ``(n, lookback, n_features)``.
        y_train: Training labels, one per window.
        X_val: Validation windows, same trailing shape as ``X_train``.
        y_val: Validation labels, one per window.
        epochs: Maximum number of passes over the training data.
        batch_size: Number of windows per optimisation step.
        device: Torch device (or device string) used for model and tensors.

    Returns:
        The trained model, carrying the weights of the epoch with the lowest
        validation loss.

    Raises:
        ValueError: If the training or validation windows are empty or not
            three-dimensional.
    """
    import copy  # local import: used only to snapshot the best weights

    for name, windows in (('X_train', X_train), ('X_val', X_val)):
        if np.ndim(windows) != 3 or len(windows) == 0:
            raise ValueError(f'{name} must be a non-empty (n, lookback, features) array')

    input_size = X_train.shape[2]
    model = CryptoLSTM(input_size=input_size, hidden_size=128, num_layers=2).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    best_val_loss = float('inf')
    best_state = None
    patience = 10
    patience_counter = 0

    # Convert to tensors
    X_train_tensor = torch.FloatTensor(X_train).to(device)
    y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1).to(device)
    X_val_tensor = torch.FloatTensor(X_val).to(device)
    y_val_tensor = torch.FloatTensor(y_val).reshape(-1, 1).to(device)

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0

        # Batches are taken in order, without shuffling.
        for start in range(0, len(X_train_tensor), batch_size):
            X_batch = X_train_tensor[start:start + batch_size]
            y_batch = y_train_tensor[start:start + batch_size]

            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            # MSELoss returns the batch mean; weight it by the batch size so
            # dividing by the sample count below gives a per-sample mean.
            train_loss += loss.item() * len(X_batch)

        # Validation
        model.eval()
        with torch.no_grad():
            y_val_pred = model(X_val_tensor)
            val_loss = criterion(y_val_pred, y_val_tensor).item()

        if (epoch + 1) % 10 == 0:
            print(f'  Epoch {epoch+1}/{epochs}, Loss: {train_loss/len(X_train_tensor):.6f}, Val Loss: {val_loss:.6f}')

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # Snapshot the weights; later epochs keep updating the live model.
            best_state = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'  Early stopping at epoch {epoch+1}')
                break

    # Return the best epoch rather than whatever the last epoch produced.
    if best_state is not None:
        model.load_state_dict(best_state)

    return model


print('✓ 訓練函數已定義')

In [None]:
# ===== STEP 6: Train a model for every coin =====
print('\n' + '='*70)
print('Step 6: 訓練 LSTM 模型'.center(70))
print('='*70 + '\n')

models = {}
results = {}
lookback = 30

for coin, df in processed_data.items():
    print(f'\n訓練 {coin}...')

    # Select features: every column except the raw candle fields and the label
    feature_cols = [col for col in df.columns if col not in ['timestamp', 'open', 'high', 'low', 'close', 'volume', 'target']]
    X = df[feature_cols].values
    y = df['target'].values

    # Neutralise inf/nan BEFORE scaling: MinMaxScaler raises on infinite input,
    # so cleaning up afterwards would never be reached.
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

    # Chronological train/validation/test split (60/20/20)
    train_idx = int(len(X) * 0.6)
    val_idx = int(len(X) * 0.8)

    X_train, X_val, X_test = X[:train_idx], X[train_idx:val_idx], X[val_idx:]
    y_train, y_val, y_test = y[:train_idx], y[train_idx:val_idx], y[val_idx:]

    # Every split needs more than `lookback` rows to yield at least one window;
    # skip the coin instead of aborting the whole run.
    if min(len(X_train), len(X_val), len(X_test)) <= lookback:
        print(f'  ✗ 數據不足，已跳過 ({len(X)} 筆)')
        continue

    # Normalise. The scaler is fit on the training split only, so validation
    # and test statistics do not leak into training.
    scaler = MinMaxScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)

    # Build sequences
    X_train_seq, y_train_seq = create_sequences(X_train, y_train, lookback=lookback)
    X_val_seq, y_val_seq = create_sequences(X_val, y_val, lookback=lookback)
    X_test_seq, y_test_seq = create_sequences(X_test, y_test, lookback=lookback)

    # Train
    model = train_model(X_train_seq, y_train_seq, X_val_seq, y_val_seq, epochs=50, device=device)

    # Evaluate on the held-out test windows
    model.eval()
    with torch.no_grad():
        X_test_tensor = torch.FloatTensor(X_test_seq).to(device)
        y_test_pred = model(X_test_tensor).cpu().numpy()

    # Compute metrics; predictions are (n, 1), so reshape the labels to match
    errors = y_test_pred - y_test_seq.reshape(-1, 1)
    mse = np.mean(errors ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(errors))

    print(f'✓ 訓練完成')
    print(f'  RMSE: {rmse:.6f}')
    print(f'  MAE: {mae:.6f}')

    models[coin] = model
    results[coin] = {'rmse': float(rmse), 'mae': float(mae), 'num_params': sum(p.numel() for p in model.parameters())}

In [None]:
# ===== STEP 7: Show training results =====
print('\n' + '='*70)
print('訓練結果總結'.center(70))
print('='*70 + '\n')

# One row per coin, one column per metric.
results_df = pd.DataFrame(results).T
print(results_df)

print(f'\n統計數據:')
print(f'  訓練幣種數: {len(results)}')
if results:
    # The metric columns only exist when at least one coin was trained.
    print(f'  平均 RMSE: {results_df["rmse"].mean():.6f}')
    print(f'  平均 MAE: {results_df["mae"].mean():.6f}')
    # The transposed frame stores counts as floats; cast back so the total is
    # printed as an integer.
    print(f'  總模型參數: {int(results_df["num_params"].sum()):,}')

In [None]:
# ===== STEP 8: Save models to Google Drive =====
print('\n' + '='*70)
print('Step 8: 保存模型'.center(70))
print('='*70 + '\n')

# Relative path: resolved against the current working directory.
model_dir = 'trained_models'
os.makedirs(model_dir, exist_ok=True)

for coin, model in models.items():
    # '/' in the symbol would be read as a directory separator.
    model_path = os.path.join(model_dir, f'{coin.replace("/", "_")}.pt')
    torch.save(model.state_dict(), model_path)
    print(f'  ✓ {coin}: {model_path}')

# Save training metadata
metadata = {
    'timestamp': datetime.now().isoformat(),
    'source': 'Binance US',
    'coins_trained': len(results),
    'training_info': {
        'epochs': 50,
        'batch_size': 16,
        'device': str(device),
        'lookback': 30,
        'train_test_split': '60/20/20'
    },
    'results': results
}

metadata_path = os.path.join(model_dir, 'metadata.json')
with open(metadata_path, 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2)

print(f'\n✓ 元數據已保存: {metadata_path}')
print(f'✓ 所有模型已保存到: {os.path.abspath(model_dir)}')

In [None]:
# ===== STEP 9: (optional) Upload to HuggingFace =====
print('\n' + '='*70)
print('Step 9 (可選): 上傳到 HuggingFace'.center(70))
print('='*70 + '\n')

# Nothing is uploaded here: the snippet below is only printed as instructions
# for the user to copy, uncomment and run.
print('如需上傳到 HuggingFace，請執行以下代碼:')
print('''
# 取消註解以下代碼
# from huggingface_hub import HfApi, create_repo, notebook_login
# 
# # 登入 HuggingFace
# notebook_login()
# 
# repo_name = 'crypto-lstm-models-20plus'
# api = HfApi()
# 
# try:
#     create_repo(repo_name, exist_ok=True, private=False)
#     print(f'✓ Repo 已建立: {repo_name}')
# except:
#     print(f'✓ Repo 已存在: {repo_name}')
# 
# # 上傳模型
# for model_file in os.listdir(model_dir):
#     file_path = os.path.join(model_dir, model_file)
#     api.upload_file(
#         path_or_fileobj=file_path,
#         path_in_repo=model_file,
#         repo_id=f'<YOUR_HF_USERNAME>/{repo_name}',
#         repo_type='model'
#     )
#     print(f'  ✓ 已上傳: {model_file}')
# 
# print(f'✓ 所有模型已上傳!')
# print(f'URL: https://huggingface.co/<YOUR_HF_USERNAME>/{repo_name}')''')

In [None]:
# ===== Final summary =====
print('\n' + '='*70)
print('訓練完成！'.center(70))
print('='*70 + '\n')
print(f'✓ 訓練了 {len(models)} 個幣種')
print(f'✓ 所有模型已保存到 Google Drive')
print(f'✓ 位置: {os.path.abspath(model_dir)}')

print('下一步:')
print('  1. 所有訓練代碼已在此 Colab 中完成')
print('  2. 模型已保存到 Google Drive，可供後續使用')
print('  3. 如需上傳到 HF，執行 Step 9')
print('  4. 此筆記本訓練完成後可直接丟掉')

print('='*70)