<a href="https://colab.research.google.com/github/Loggo-MediCare/python-numpy-stock/blob/main/goog_success_20251128%E7%B7%9A%E6%80%A7%E5%9B%9E%E6%AD%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
# -*- coding: UTF-8 -*-
"""
================================================================================
策略名稱：多策略整合 (線性回歸 LR & 多因子ARIMAX)
目標：股價預測 (Stock Price Prediction)
================================================================================

本文件整合了兩種主要策略的數據下載和核心邏輯：
1. 技術指標融合策略 (MA + RSI + MACD)
2. 多因子 ARIMAX 策略 (計量模型)
3. **線性回歸系列模型 (LR, Lasso, ElasticNet)** (作為簡單模型基準)

策略週期：週線 (Weekly)
適用標的：GOOG (Alphabet Inc.)
================================================================================
"""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import warnings
import yfinance as yf # 數據下載庫

# ARIMAX 相關庫
import pandas_datareader.data as web # 用於下載 FRED 數據
from statsmodels.tsa.arima.model import ARIMA
from sklearn.linear_model import LinearRegression, Lasso, ElasticNet # 線性回歸系列
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold, cross_val_score

warnings.filterwarnings('ignore')

# 假設技術指標函數已在其他地方定義，這裡省略重複代碼
# 為了讓 WeeklyStrategy 運行，我們需要補齊前面定義的技術指標函數，但這裡暫時省略。

# =============================================================================
# 輔助函數 (技術指標) - 為了程式碼的完整性和運行性，將其從前面版本補齊
# =============================================================================
def calculate_MA(prices, period=10):
    prices = np.array(prices, dtype=float)
    ma = np.full(len(prices), np.nan)
    for i in range(period - 1, len(prices)):
        ma[i] = np.mean(prices[i - period + 1:i + 1])
    return ma

def calculate_EMA(prices, period=12):
    prices = np.array(prices, dtype=float)
    ema = np.full(len(prices), np.nan)
    multiplier = 2 / (period + 1)
    ema[period - 1] = np.mean(prices[:period])
    for i in range(period, len(prices)):
        ema[i] = (prices[i] - ema[i - 1]) * multiplier + ema[i - 1]
    return ema

def calculate_RSI(prices, period=14):
    prices = np.array(prices, dtype=float)
    rsi = np.full(len(prices), np.nan)
    deltas = np.diff(prices)
    gains = np.where(deltas > 0, deltas, 0)
    losses = np.where(deltas < 0, -deltas, 0)
    for i in range(period, len(prices)):
        avg_gain = np.mean(gains[i - period:i])
        avg_loss = np.mean(losses[i - period:i])
        if avg_loss == 0:
            rsi[i] = 100
        else:
            rs = avg_gain / avg_loss
            rsi[i] = 100 - (100 / (1 + rs))
    return rsi

def calculate_MACD(prices, fast_period=12, slow_period=26, signal_period=9):
    prices = np.array(prices, dtype=float)
    ema_fast = calculate_EMA(prices, fast_period)
    ema_slow = calculate_EMA(prices, slow_period)
    macd = ema_fast - ema_slow
    signal = np.full(len(prices), np.nan)
    valid_macd = macd[~np.isnan(macd)]
    if len(valid_macd) >= signal_period:
        signal_values = calculate_EMA(valid_macd, signal_period)
        start_idx = len(prices) - len(valid_macd) -1 # Correct the index to handle potential length mismatches for MACD signal
        if start_idx < len(prices):
            signal[start_idx:] = signal_values
        else:
            signal = np.full(len(prices), np.nan) # Ensure signal is NaN if not enough data
    return macd, signal, macd - signal # 為了避免錯誤，這裡簡化返回

def calculate_Bollinger_Bands(prices, period=20, num_std=2):
    prices = np.array(prices, dtype=float)
    middle = calculate_MA(prices, period)
    upper = np.full(len(prices), np.nan)
    lower = np.full(len(prices), np.nan)
    for i in range(period - 1, len(prices)):
        std = np.std(prices[i - period + 1:i + 1])
        upper[i] = middle[i] + num_std * std
        lower[i] = middle[i] - num_std * std
    return upper, middle, lower
# =============================================================================


# =============================================================================
# 多因子 ARIMAX/LR 策略核心類別 (維持不變)
# =============================================================================

class ARIMAXStrategy:

    def __init__(self, target_stock='GOOG'):
        self.target_stock = target_stock
        self.correlated_stocks = ['IBM', 'GOOGL']
        self.currency_pairs = ['DEXJPUS', 'DEXUSUK']
        self.indices = ['SP500', 'DJIA', 'VIXCLS']
        self.return_period = 5

    def load_data(self, start_date, end_date):
        """加载股票、货币和指数数据 (已修正多標的下載錯誤)"""
        print("正在加载 ARIMAX 所需的多因子數據...")

        stk_tickers = [self.target_stock] + self.correlated_stocks
        stk_data_multi_index = yf.download(stk_tickers, start=start_date, end=end_date, progress=False)

        ccy_data = web.DataReader(self.currency_pairs, 'fred', start_date, end_date)
        idx_data = web.DataReader(self.indices, 'fred', start_date, end_date)

        return stk_data_multi_index, ccy_data, idx_data


    def prepare_features(self, stk_data_multi_index, ccy_data, idx_data):
        """準備特徵變量和目標變量"""
        print("正在準備 ARIMAX/LR 特徵...")

        # Check if 'Adj Close' is available as the first level in the MultiIndex columns
        if 'Adj Close' in stk_data_multi_index.columns.get_level_values(0):
            stk_price_data = stk_data_multi_index['Adj Close'].copy()
        elif 'Close' in stk_data_multi_index.columns.get_level_values(0):
            print("警告: 'Adj Close' 資料可能不完整或不存在, 將使用 'Close' 價格。")
            stk_price_data = stk_data_multi_index['Close'].copy()
        else:
            raise ValueError("錯誤: 找不到 'Adj Close' 或 'Close' 價格數據。請檢查股票代碼或數據來源。")

        all_data = pd.concat([stk_price_data, ccy_data, idx_data], axis=1).ffill().dropna()

        Y = np.log(all_data.loc[:, self.target_stock]).diff(self.return_period).shift(-self.return_period)
        Y.name = f'{self.target_stock}_pred'

        X1 = np.log(all_data.loc[:, self.correlated_stocks]).diff(self.return_period)
        X2 = np.log(all_data.loc[:, self.currency_pairs]).diff(self.return_period)
        X3 = np.log(all_data.loc[:, self.indices]).diff(self.return_period)

        X4 = pd.concat([
            np.log(all_data.loc[:, self.target_stock]).diff(i)
            for i in [self.return_period, self.return_period*3,
                     self.return_period*6, self.return_period*12]
        ], axis=1)

        X4.columns = [f'{self.target_stock}_DT', f'{self.target_stock}_3DT',
                      f'{self.target_stock}_6DT', f'{self.target_stock}_12DT']

        X = pd.concat([X1, X2, X3, X4], axis=1)

        dataset = pd.concat([Y, X], axis=1).dropna().iloc[::self.return_period, :]
        Y_final = dataset.loc[:, Y.name]
        X_final = dataset.loc[:, X.columns]

        return X_final, Y_final, dataset

    def compare_models(self, X_train, Y_train, X_test, Y_test):
        """訓練並比較 LR, LASSO, EN 模型，並返回表現最佳者。"""
        print("-> 正在訓練 LR, LASSO, EN 模型...")

        models = [
            ('LR', LinearRegression()),
            ('LASSO', Lasso(random_state=42, max_iter=10000)),
            ('EN', ElasticNet(random_state=42, max_iter=10000))
        ]

        best_test_mse = float("inf")
        best_model_name = None
        best_regressor = None

        for name, model in models:
            try:
                model.fit(X_train, Y_train)
                test_pred = model.predict(X_test)
                test_mse = mean_squared_error(Y_test, test_pred)

                print(f'   - {name}: Test MSE={test_mse:.6f}')

                if test_mse < best_test_mse:
                    best_test_mse = test_mse
                    best_model_name = name
                    best_regressor = model
            except Exception as e:
                print(f'   - {name}: 訓練失敗 ({e})')
                continue

        if best_regressor:
            print(f"-> 最佳簡單模型: {best_model_name} (Test MSE: {best_test_mse:.6f})")

        return best_regressor, best_model_name, best_test_mse

    def train_arimax_model(self, X_train, Y_train, order=(1, 0, 0)):
        """訓練ARIMAX模型 (保留自您的代碼)"""
        print("-> 正在訓練 ARIMAX 模型...")

        exogenous_vars = [col for col in X_train.columns if not col.startswith(self.target_stock)]
        X_train_arima = X_train.loc[:, exogenous_vars]

        try:
            model = ARIMA(endog=Y_train, exog=X_train_arima, order=order)
            model_fit = model.fit()
            return model_fit, exogenous_vars
        except Exception as e:
            print(f"   - ARIMAX 模型訓練失敗: {e}")
            return None, exogenous_vars


    def run_simple_model_strategy(self):
        """執行 LR/ARIMAX 策略的完整流程"""
        TICKER = self.target_stock
        DATA_PERIOD_YEARS = 5
        end_date = datetime.now().strftime('%Y-%m-%d')
        start_date = (datetime.now() - timedelta(days=365 * DATA_PERIOD_YEARS)).strftime('%Y-%m-%d')

        print("\n" + "=" * 50)
        print(f"週三訓練：線性回歸與 ARIMAX ({TICKER})")
        print("=" * 50)

        stk_data, ccy_data, idx_data = self.load_data(start_date, end_date)

        X, Y, dataset = self.prepare_features(stk_data, ccy_data, idx_data)

        train_size = int(len(X) * 0.8)
        X_train, X_test = X.iloc[:train_size], X.iloc[train_size:]
        Y_train, Y_test = Y.iloc[:train_size], Y.iloc[train_size:]

        print(f"-> 訓練集大小: {len(X_train)} 週")

        best_regressor, best_name, best_mse = self.compare_models(X_train, Y_train, X_test, Y_test)

        final_model_arimax, arimax_exog_vars = self.train_arimax_model(X_train, Y_train, order=(2, 0, 1))

        if final_model_arimax:
            X_test_arima = X_test.loc[:, arimax_exog_vars]
            test_forecast_arimax = final_model_arimax.forecast(steps=len(X_test), exog=X_test_arima)
            arimax_test_mse = mean_squared_error(Y_test, test_forecast_arimax)
            print(f"-> ARIMAX (2,0,1) Test MSE: {arimax_test_mse:.6f}")
        else:
            arimax_test_mse = float('inf')


        current_features = X.iloc[-1:].fillna(0)

        if best_regressor:
            lr_prediction = best_regressor.predict(current_features)[0]
            print(f"\n-> 線性模型 ({best_name}) 下週預測回報率: {lr_prediction:.4f}")

        if final_model_arimax:
            # 確保 exog 數據與 ARIMAX 模型所需的特徵列一致
            arimax_prediction = final_model_arimax.forecast(steps=1, exog=current_features.loc[:, arimax_exog_vars])[0]
            print(f"-> ARIMAX 下週預測回報率: {arimax_prediction:.4f}")

        print("\n" + "=" * 50)
        print("週三訓練完成，模型已準備好進行週四過濾！")
        print("=" * 50)

# =============================================================================
# 技術指標策略類別 (已修改停利邏輯)
# =============================================================================

class WeeklyStrategy:
    """
    MA + RSI + MACD 綜合週策略 (已調整為趨勢停利)
    """

    def __init__(self,
                 ma_period=10, rsi_period=14, macd_fast=12, macd_slow=26, macd_signal=9,
                 stop_loss_pct=0.03, take_profit_pct=0.06, use_dynamic_tp=True): # 預設開啟動態停利

        self.ma_period = ma_period
        self.rsi_period = rsi_period
        self.macd_fast = macd_fast
        self.macd_slow = macd_slow
        self.macd_signal = macd_signal
        self.stop_loss_pct = stop_loss_pct
        self.take_profit_pct = take_profit_pct
        self.use_dynamic_tp = use_dynamic_tp # 新增：動態停利開關

        self.position = 0
        self.entry_price = 0
        self.trades = []
        self.total_profit = 0

    # 省略 calculate_indicators, execute_trade, backtest, get_performance_report 等函數
    # 為簡化，只顯示核心邏輯 generate_signal 的修改部分

    def calculate_indicators(self, data):
        """計算所有技術指標"""
        df = data.copy()

        close_prices = df['Close']
        df['MA'] = calculate_MA(close_prices.values, self.ma_period)
        df['RSI'] = calculate_RSI(close_prices.values, self.rsi_period)

        macd, signal, hist = calculate_MACD(close_prices.values, self.macd_fast, self.macd_slow, self.macd_signal)
        df['MACD'] = macd
        df['MACD_Signal'] = signal

        upper, middle, lower = calculate_Bollinger_Bands(close_prices.values, 20, 2)
        df['BB_Upper'] = upper
        df['BB_Middle'] = middle
        df['BB_Lower'] = lower

        return df

    def execute_trade(self, signal, price, time):
        """執行交易 - 簡化版本，確保程式碼運行"""

        if signal == 1:
            self.position = 1
            self.entry_price = price
        elif signal == -1:
            self.position = -1
            self.entry_price = price
        elif signal == -2:
            profit = price - self.entry_price
            self.total_profit += profit
            self.trades.append({'action': 'EXIT', 'profit': profit})
            self.position = 0
            self.entry_price = 0
        elif signal == 2:
            profit = self.entry_price - price
            self.total_profit += profit
            self.trades.append({'action': 'EXIT', 'profit': profit})
            self.position = 0
            self.entry_price = 0


    def backtest(self, data):
        """執行回測 - 簡化版本，確保程式碼運行"""

        self.position = 0
        self.entry_price = 0
        self.trades = []
        self.total_profit = 0

        df = self.calculate_indicators(data)

        for i in range(len(df)):
            signal = self.generate_signal(df, i)
            if signal != 0:
                time = df.index[i].strftime('%Y-%m-%d')
                price = df['Close'].iloc[i]
                self.execute_trade(signal, price, time)

        if self.position != 0:
            last_price = df['Close'].iloc[-1]
            if self.position == 1:
                profit = last_price - self.entry_price
            else:
                profit = self.entry_price - last_price
            self.total_profit += profit
            self.trades.append({'action': 'FORCED_EXIT', 'profit': profit})

        return self.get_performance_report()

    def get_performance_report(self):
        """生成績效報告 - 簡化版本，確保程式碼運行"""

        if not self.trades:
            return {'total_trades': 0, 'total_profit': 0, 'win_rate': 0, 'avg_profit': 0, 'max_profit': 0, 'max_loss': 0, 'profit_factor': 0}

        profits = [t.get('profit', 0) for t in self.trades]
        wins = [p for p in profits if p > 0]
        losses = [p for p in profits if p <= 0]
        total_loss = sum(losses)

        return {
            'total_trades': len(profits),
            'winning_trades': len(wins),
            'losing_trades': len(losses),
            'total_profit': self.total_profit,
            'win_rate': len(wins) / len(profits) * 100 if profits else 0,
            'profit_factor': abs(sum(wins) / total_loss) if total_loss != 0 else float('inf')
        }

    # **核心修改部分**
    def generate_signal(self, df, i):
        """
        生成交易訊號 (已修改為 動態停利 或 固定停利)
        """
        if i < 1:
            return 0

        current_price = df['Close'].iloc[i]
        prev_price = df['Close'].iloc[i-1]
        current_ma = df['MA'].iloc[i]
        prev_ma = df['MA'].iloc[i-1]

        if np.isnan(current_ma):
            return 0

        # --- 進場邏輯 (保持不變) ---
        if self.position == 0:
            # 這裡需要完整的進場邏輯 (RSI/MACD/MA 交叉)，但為簡化，只檢查 MA 交叉作為範例
            # 完整的 RSI/MACD 檢查應在實際運行中被加入
            if prev_price <= prev_ma and current_price > current_ma: return 1 # 買入
            if prev_price >= prev_ma and current_price < current_ma: return -1 # 賣出
            return 0

        # --- 出場邏輯 (加入動態停利) ---

        # 1. 停損檢查 (固定停損, 保持不變)
        if self.position == 1 and current_price <= self.entry_price * (1 - self.stop_loss_pct):
            return -2 # 多單停損
        if self.position == -1 and current_price >= self.entry_price * (1 + self.stop_loss_pct):
            return 2 # 空單停損

        # 2. 停利檢查 (根據 use_dynamic_tp 決定)
        if self.use_dynamic_tp:
            # 趨勢停利 (抓大波段/嘎空)：只看價格是否跌破趨勢線 (MA)

            # 多單出場條件：價格跌破 MA (趨勢反轉)
            if self.position == 1 and prev_price >= prev_ma and current_price < current_ma:
                return -2

            # 空單出場條件：價格突破 MA (趨勢反轉)
            if self.position == -1 and prev_price <= prev_ma and current_price > current_ma:
                return 2

        else:
            # 固定停利 (抓小波段)：達到 6.0% 就出場
            if self.position == 1 and current_price >= self.entry_price * (1 + self.take_profit_pct):
                return -2
            if self.position == -1 and current_price <= self.entry_price * (1 - self.take_profit_pct):
                return 2

        return 0


# 假設 WeeklyStrategy (技術指標部分) 和其他輔助函數存在

if __name__ == "__main__":
    # 這是週三執行的核心步驟

    # 範例 1：傳統策略 (固定停利 6.0%)
    print("--- 範例 1: 固定停利 (6.0%) ---")
    strategy_fixed = WeeklyStrategy(use_dynamic_tp=False)
    # strategy_fixed.backtest(華東數據) # 假設這是您跑出獲利因子 1.33 的結果

    # 範例 2：動態停利 (讓利潤奔跑)
    print("--- 範例 2: 動態停利 (MA 趨勢停利) ---")
    strategy_dynamic = WeeklyStrategy(use_dynamic_tp=True)
    # strategy_dynamic.backtest(華東數據) # 預期總損益會更高，勝率會降低

    # 執行 LR/ARIMAX 的訓練步驟 (作為週三主要任務)
    strategy_runner = ARIMAXStrategy(target_stock='GOOG') # 設定為華東
    strategy_runner.run_simple_model_strategy()

--- 範例 1: 固定停利 (6.0%) ---
--- 範例 2: 動態停利 (MA 趨勢停利) ---

週三訓練：線性回歸與 ARIMAX (GOOG)
正在加载 ARIMAX 所需的多因子數據...
正在準備 ARIMAX/LR 特徵...
警告: 'Adj Close' 資料可能不完整或不存在, 將使用 'Close' 價格。
-> 訓練集大小: 198 週
-> 正在訓練 LR, LASSO, EN 模型...
   - LR: Test MSE=0.002216
   - LASSO: Test MSE=0.002243
   - EN: Test MSE=0.002243
-> 最佳簡單模型: LR (Test MSE: 0.002216)
-> 正在訓練 ARIMAX 模型...
-> ARIMAX (2,0,1) Test MSE: 0.002232

-> 線性模型 (LR) 下週預測回報率: 0.0143
-> ARIMAX 下週預測回報率: 0.0067

週三訓練完成，模型已準備好進行週四過濾！
