In [61]:
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import requests
import json
from datetime import datetime, timedelta
from tqdm import tqdm

In [62]:
def convert_minguo_to_ad(date_str):
    parts = date_str.split('/')
    year = int(parts[0]) + 1911
    return f"{year}/{parts[1]}/{parts[2]}"

def fetch_tpex_stock_data(stock_code, date):
    url = "https://www.tpex.org.tw/www/zh-tw/afterTrading/tradingStock"
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"
    }
    payload = {
        "code": stock_code,
        "date": date,
        "id": "",
        "response": "json"
    }
    res = requests.post(url, headers=headers, data=payload)
    data = json.loads(res.text)

    rows = data["tables"][0]["data"]
    columns = data["tables"][0]["fields"]

    df = pd.DataFrame(rows, columns=columns)

    df[["開盤", "最高", "最低", "收盤"]] = df[["開盤", "最高", "最低", "收盤"]].apply(pd.to_numeric, errors='coerce')
    df = df[["日 期", "開盤", "最高", "最低", "收盤", "成交仟股"]]
    df.rename(columns={
        "日 期": "Date",
        "開盤": "Open",
        "最高": "High",
        "最低": "Low",
        "收盤": "Close",
        "成交仟股": "Volume"
    }, inplace=True)
    df = df.replace(',', '', regex=True)
    df["Date"] = df["Date"].apply(convert_minguo_to_ad)
    df["Date"] = pd.to_datetime(df["Date"], format="%Y/%m/%d")

    return df

def fetch_twse_monthly_data(stock_id, year, month, fallback_to_tpex = True):    
    url = "https://www.twse.com.tw/exchangeReport/STOCK_DAY"
    params = {
        "response": "json",
        "date": f"{year}{month:02}01",
        "stockNo": stock_id
    }

    r = requests.get(url, params = params)
    r.encoding = 'utf-8-sig'
    data = r.json()

    if data['stat'] != 'OK':
        if fallback_to_tpex:
            date = f"{year}/{month:02}/01"
            return fetch_tpex_stock_data(stock_id, date)
        return None

    df = pd.DataFrame(data['data'], columns = data['fields'])

    df['Date'] = df['日期'].apply(convert_minguo_to_ad)
    df['Date'] = pd.to_datetime(df['Date'], format='%Y/%m/%d')
    df.drop(columns=['日期'], inplace=True)
    df = df.rename(columns={
        '日期': 'Date',
        '開盤價': 'Open',
        '最高價': 'High',
        '最低價': 'Low',
        '收盤價': 'Close',
        '成交股數': 'Volume'
    })
    df = df[['Date', 'Open', 'High', 'Low', 'Close', 'Volume']]
    df = df.replace(',', '', regex = True)
    df[['Open', 'High', 'Low', 'Close', 'Volume']] = df[['Open', 'High', 'Low', 'Close', 'Volume']].apply(pd.to_numeric, errors='coerce')

    return df


def get_tw_stock_by_twse(stock_id, start_date, end_date):
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")

    all_data = []
    current = start.replace(day = 1)

    while current <= end:
        df = fetch_twse_monthly_data(stock_id, current.year, current.month)
        if df is not None:
            df['Date'] = pd.to_datetime(df['Date']) 
            df = df[(df['Date'] >= start) & (df['Date'] <= end)]
            all_data.append(df)
        current += timedelta(days=32)
        current = current.replace(day=1)

    result = pd.concat(all_data).sort_values('Date').set_index('Date')
    return result

In [63]:
def calculate_weights(leverage_ratio, leverage_factors: list):
    """
    根據目標槓桿比、標的槓桿倍數計算權重
    leverage_ratio: 目標槓桿比
    leverage_factors: 標的槓桿倍數
    """
    f1, f2 = leverage_factors

    w1 = (f2 - leverage_ratio) / (f2 - f1)
    w2 = (leverage_ratio - f1) / (f2 - f1)

    if not (0 <= w1 <= 1 and 0 <= w2 <= 1):
        raise ValueError("Weights are out of bounds [0, 1]")
    
    return [w1, w2]

In [64]:
def dynamic_models(prices, alpha_ewma = 0.05):
    """
    用EWMA概念處理動態模型
    prices: 標的資產的價格序列
    alpha_ewma: EWMA 的平滑參數
    """

    returns = prices.pct_change().dropna()

    ewma_mean = returns.ewm(alpha = alpha_ewma, adjust = False).mean()
    ewma_cov = returns.ewm(alpha = alpha_ewma, adjust = False).cov()

    return ewma_mean, ewma_cov

In [65]:
def simulate_portfolio(prices, target_weights, initial_capital, cost, ewma_mean: pd.DataFrame, ewma_cov: pd.DataFrame, num_days, rebalance_threshold):
    """
    模擬 portfolio 表現
    prices: 標的資產的價格序列
    target_weights: 目標權重
    initial_capital: 初始資本
    cost: 交易成本
    ewma_mean: EWMA 計算的序列 mean
    dcc_garch_results: DCC-Garch 模型的結果
    num_days: 模擬天數
    rebalance_threshold: 再平衡的閾值
    """
    num_assets = len(target_weights)
    price_simulate = pd.DataFrame(index = range(num_days + 1), columns = prices.columns)
    price_simulate.iloc[0] = prices.iloc[-1]

    holdings = initial_capital * np.array(target_weights) / price_simulate.iloc[0]
    portfolio_value = [initial_capital]
    num_ewma_point = len(ewma_mean)
    rebalance_count = 0
    error_count = 0

    for t in range(num_days):
        # 取出對應的 mean
        ewma_idx = min(t, num_ewma_point - 1)
        current_mean = ewma_mean.iloc[ewma_idx].values

        # 得到目前的 EWMA cov_metrix
        current_cov_metrix = np.eye(num_assets) * 1e-6
        try:
            if ewma_idx < len(ewma_cov.index.levels[0]):
                current_date = ewma_cov.index.levels[0][ewma_idx]
                cov_slice = ewma_cov.loc[current_date]

                current_cov_metrix = np.zeros((num_assets, num_assets))
                for r_idx, row_asset in enumerate(prices.columns):
                    for c_idx, col_asset in enumerate(prices.columns):
                        if (row_asset, col_asset) in cov_slice.index:
                            current_cov_metrix[r_idx, c_idx] = cov_slice.loc[(row_asset, col_asset)]
                        elif (col_asset, row_asset) in cov_slice.index:
                            current_cov_metrix[r_idx, c_idx] = cov_slice.loc[(col_asset, row_asset)]

            if not np.all(np.linalg.eigvals(current_cov_metrix) > 0):
                # 若矩陣不是正定，則添加最小值讓他正
                current_cov_metrix = current_cov_metrix + np.eye(num_assets) * 1e-6

        except Exception as e:
            # 預測失敗就用 fallback 的 cov_metrix，也就是不動他
            error_count += 1

        # 生成隨機報酬
        try:
            rand_returns = np.random.multivariate_normal(current_mean, current_cov_metrix, 1)
            daily_returns = pd.Series(rand_returns, index = prices.columns)
        except np.linalg.LinAlgError as e:
            daily_returns = pd.Series(np.zeros(num_assets), index = prices.columns)
            error_count += 1

        # 更新價格
        price_simulate.iloc[t + 1] = price_simulate.iloc[t] * (1 + daily_returns)

        # 計算現在的 portfolio 的價值、權重
        current_portfolio_value = np.sum(holdings * price_simulate.iloc[t + 1])
        current_weights = (holdings * price_simulate.iloc[t + 1]) / current_portfolio_value

        # 檢查是否需要再平衡
        for i in range(num_assets):
            if abs(current_weights.iloc[i] - target_weights[i]) > rebalance_threshold:
                target_portfolio_value = current_portfolio_value
                target_holdings = target_portfolio_value * np.array(target_weights) / price_simulate.iloc[t + 1]

                trade_volume = np.abs(target_holdings - holdings) * price_simulate.iloc[t + 1]
                transaction_cost = trade_volume.sum() * cost
                current_portfolio_value -= transaction_cost

                holdings = target_holdings
                rebalance_count += 1
                break
        
        portfolio_value.append(current_portfolio_value)
    
    return pd.Series(portfolio_value, index = price_simulate.index), rebalance_count, error_count

In [66]:
def evaluate_threshold(prices, leverage_ratio, leverage_factors: list, cost, threshold, num_simulations = 1000, num_days = 42, initial_capital = 10000):
    """
    開始模擬
    prices: 標的資產的價格序列
    leverage_ratio: 目標槓桿比
    leverage_factors: 標的槓桿倍數
    cost: 交易成本
    threshold: 再平衡的閾值
    num_simulations: 模擬次數
    num_days: 模擬天數
    initial_capital: 初始資本
    """
    final_value = []
    total_rebalances = 0
    total_error_count = 0
    target_weights = calculate_weights(leverage_ratio, leverage_factors)

    ewma_mean, ewma_cov_metrix = dynamic_models(prices)
    if ewma_mean is None or ewma_cov_metrix is None:
        print("啪，沒了，dynamic model 壞了")
        return np.nan, np.nan
    
    for _ in tqdm(range(num_simulations), desc = f"Simulate threshold {threshold: .2f}"):
        simulated_portfolio, rebalances, error_count = simulate_portfolio(
            prices,
            target_weights,
            initial_capital,
            cost,
            ewma_mean,
            ewma_cov_metrix,
            num_days,
            threshold
        )
        final_value.append(simulated_portfolio.iloc[-1])
        total_rebalances += rebalances
        total_error_count += error_count

    avg_final_value = np.mean(final_value) if final_value else np.nan
    avg_rebalances = total_rebalances / num_simulations if num_simulations > 0 else np.nan
    avg_error_count = total_error_count / num_simulations

    print(f"Average forecast erroes : {avg_error_count}")
    return avg_final_value, avg_rebalances

In [67]:
security1 = "0050"
security2 = "00631L"
end_date = datetime.now()
start_date = end_date - timedelta(days = 365 * 2)

df1 = get_tw_stock_by_twse(security1, '2024-01-01', '2025-04-30')
df2 = get_tw_stock_by_twse(security2, '2024-01-01', '2025-04-30')

print(f"Fetch data for {security1} and {security2}")

prices = pd.concat([df1['Close'], df2['Close']], axis = 1)
prices.dropna(inplace = True)
prices.columns = [security1, security2]

Fetch data for 0050 and 00631L


In [68]:
testing_thresholds = np.arange(0.01, 0.1, 0.01)
leverage_factors = [1, 2]
leverage_ratio = 1.3
cost = 0.001785 # 單邊手續費 0.000285 + 證交稅 0.003
                # 0.5 * vol *0.000285 (buy) + 0.5 * vol * 0.003285 (sell) = 0.00357 / 2 * vol = 0.001785 * vol
results = {}
for threshold in testing_thresholds:
    avg_final_value, avg_rebalances = evaluate_threshold(
        prices,
        leverage_ratio,
        leverage_factors,
        cost,
        threshold,
        num_simulateions = 500,
        nnum_days = 42
    )
    results[threshold] = {
        "avg_final_value": avg_final_value,
        "avg_rebalances": avg_rebalances
    }

df_results = pd.DataFrame.from_dict(results, orient = "index")
print("Simulate results\n", df_results)

# plot
fig, ax1 = plt.subplots(figsize = (16, 6))

color = "tab:red"
ax1.set_xlabel("Rebalance threshold")
ax1.set_ylabel("Avg final value", color = color)
ax1.plot(df_results.index, df_results["avg_final_value"], color = color, marker = "o")
ax1.tick_params(axis = "y", labelcolor = color)

ax2 = ax1.twinx()

color = "tab:blue"
ax2.set_ylabel("Number of rebalances", color = color)
ax2.plot(df_results.index, df_results["avg_rebalances"], color = color, marker = "x")
ax2.tick_params(axis = "y", labelcolor = color)

fig.tight_layout()
plt.title("Simulation with Dynamic Models")
plt.grid(True)
plt.show()

best_threshold = df_results["avg_final_value"].idxmax()
print(f"Best threshold: {best_threshold}")

TypeError: evaluate_threshold() got an unexpected keyword argument 'num_simulateions'