In [1]:
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.optimize import minimize


In [None]:
# Portfolio performance: expected return and risk
def portfolio_performance(weights, mean_returns, cov_matrix):
    """Return the expected return and volatility of a weighted portfolio.

    Args:
        weights: Asset weights; must expose ``.T`` (e.g. a NumPy array).
        mean_returns: Per-asset mean returns, aligned with ``weights``.
        cov_matrix: Covariance matrix of the asset returns.

    Returns:
        A ``(expected_return, volatility)`` tuple, where volatility is the
        square root of the quadratic form ``weights.T · cov_matrix · weights``.
    """
    expected_return = np.dot(weights, mean_returns)
    # Evaluate the quadratic form in two steps: (cov · w) first, then w^T · (cov · w).
    cov_times_weights = np.dot(cov_matrix, weights)
    variance = np.dot(weights.T, cov_times_weights)
    return expected_return, np.sqrt(variance)

In [None]:
# Minimize risk to find the best portfolio for a given target return
def minimize_volatility(weights, mean_returns, cov_matrix, target_return):
    """Find the lowest-volatility long-only portfolio that hits ``target_return``.

    Args:
        weights: Initial guess handed to the optimizer.
        mean_returns: Per-asset mean returns; its length sets the number of bounds.
        cov_matrix: Covariance matrix of the asset returns.
        target_return: Expected return the portfolio is constrained to equal.

    Returns:
        The result object returned by ``scipy.optimize.minimize`` (SLSQP).
    """
    def volatility(candidate):
        # Objective: second element of portfolio_performance is the volatility.
        return portfolio_performance(candidate, mean_returns, cov_matrix)[1]

    def return_gap(candidate):
        # Equality constraint: expected return must match the target.
        return portfolio_performance(candidate, mean_returns, cov_matrix)[0] - target_return

    def budget_gap(candidate):
        # Equality constraint: weights must sum to one.
        return np.sum(candidate) - 1

    asset_count = len(mean_returns)
    # Every weight is boxed into [0, 1].
    weight_bounds = tuple((0, 1) for _ in range(asset_count))
    equality_constraints = (
        {'type': 'eq', 'fun': return_gap},
        {'type': 'eq', 'fun': budget_gap},
    )
    return minimize(
        volatility,
        weights,
        method='SLSQP',
        bounds=weight_bounds,
        constraints=equality_constraints,
    )

In [None]:
# Compute the points of the efficient frontier
def efficient_frontier(mean_returns, cov_matrix, num_portfolios=100):
    """Compute efficient-frontier points over a grid of target returns.

    Target returns are spaced evenly between ``mean_returns.min()`` and
    ``mean_returns.max()``. Each optimization starts from equal weights, and
    targets for which the optimizer does not report success are skipped.

    Args:
        mean_returns: Per-asset mean returns; must support ``len``, ``.min()``
            and ``.max()``.
        cov_matrix: Covariance matrix of the asset returns.
        num_portfolios: Number of target returns to try.

    Returns:
        A dict with keys ``'returns'`` and ``'volatility'``, each a list with
        one entry per successfully optimized target.
    """
    frontier = {'returns': [], 'volatility': []}
    asset_count = len(mean_returns)
    equal_weights = np.array([1 / asset_count] * asset_count)
    target_grid = np.linspace(mean_returns.min(), mean_returns.max(), num_portfolios)

    for goal in target_grid:
        solution = minimize_volatility(equal_weights, mean_returns, cov_matrix, goal)
        if not solution.success:
            # Leave out targets the optimizer could not satisfy.
            continue
        achieved_return, achieved_vol = portfolio_performance(solution.x, mean_returns, cov_matrix)
        frontier['returns'].append(achieved_return)
        frontier['volatility'].append(achieved_vol)

    return frontier

In [None]:
# Efficient frontier estimated from the most recent rolling window
def rolling_efficient_frontier(stock_symbols, window=120, start_date='2017-01-01', end_date='2023-01-01'):
    """Download prices, estimate statistics on the latest window, and plot the frontier.

    Daily returns are derived from each symbol's 'Adj Close' prices. The mean
    returns and covariance matrix are estimated from the final ``window`` rows
    of the aligned return table (including the most recent observation), and
    the resulting efficient frontier is drawn with matplotlib.

    Args:
        stock_symbols: Iterable of ticker symbols passed to ``yf.download``.
        window: Number of most recent daily returns used for the estimates.
        start_date: Start date forwarded to ``yf.download``.
        end_date: End date forwarded to ``yf.download``.

    Returns:
        None. Nothing is plotted when no symbols are given or when fewer than
        ``window`` aligned return rows are available.
    """
    # 1. Download every symbol and convert adjusted closes to daily returns.
    return_series = []
    for symbol in stock_symbols:
        # auto_adjust=False keeps the 'Adj Close' column available; yfinance
        # releases that auto-adjust by default do not return it otherwise.
        prices = yf.download(symbol, start=start_date, end=end_date, auto_adjust=False)['Adj Close']
        if isinstance(prices, pd.DataFrame):
            # Multi-level column layouts yield a one-column frame per ticker.
            prices = prices.iloc[:, 0]
        return_series.append(prices.pct_change().dropna().rename(symbol))

    if not return_series:
        return

    # Concatenate once (not inside the loop) and keep only dates on which every
    # symbol has a return, so mean and covariance share the same sample.
    data = pd.concat(return_series, axis=1).dropna()

    # 2. Only the latest window is plotted, so only that window is estimated.
    if len(data) < window:
        return
    # Slice up to the end of the table so the newest observation is included.
    latest_window = data.iloc[len(data) - window:]
    mean_returns = latest_window.mean()
    cov_matrix = latest_window.cov()

    results = efficient_frontier(mean_returns, cov_matrix)

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(results['volatility'], results['returns'], marker='o')
    ax.set_xlabel('Volatility (Risk)')
    ax.set_ylabel('Expected Return')
    ax.set_title('Efficient Frontier (Representative)')
    ax.grid(True)
    plt.show()

In [None]:
# Compute and plot the efficient frontier for the selected tickers
rolling_efficient_frontier(
    stock_symbols=['2330.TW', '2317.TW', '2603.TW'],
)