In [2]:
import numpy as np
import pandas as pd
import yfinance as yf
import statsmodels.api as sm
import cvxpy as cp

In [3]:
etfs = ['FXE', 'EWJ', 'GLD', 'QQQ', 'SPY', 'SHV', 'DBA', 'USO', 'XBI', 'ILF', 'EPP', 'FEZ']
etf_data = yf.download(etfs, start='2007-03-01', end='2024-12-31')['Close']


etf_returns = etf_data.pct_change().dropna()

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  12 of 12 completed


In [5]:
ff_factors = pd.read_csv('F-F_Research_Data_Factors_daily.CSV', skiprows=3, index_col=0)
ff_factors.dropna(inplace=True)
ff_factors.tail()
ff_factors = ff_factors.loc['2007-03-01':]
ff_factors.head()


Unnamed: 0,Mkt-RF,SMB,HML,RF
20070103,-0.04,0.0,0.18,0.022
20070104,0.16,0.22,-0.49,0.022
20070105,-0.73,-0.94,-0.28,0.022
20070108,0.24,-0.07,0.05,0.022
20070109,0.0,0.3,-0.3,0.022


In [6]:
ff_factors.tail()

Unnamed: 0,Mkt-RF,SMB,HML,RF
20241224,1.11,-0.09,-0.05,0.017
20241226,0.02,1.04,-0.19,0.017
20241227,-1.17,-0.66,0.56,0.017
20241230,-1.09,0.12,0.74,0.017
20241231,-0.46,0.0,0.71,0.017


In [7]:
etf_returns = etf_data.pct_change().dropna()

# Convert ff_factors index to datetime if it's not already
ff_factors.index = pd.to_datetime(ff_factors.index, format='%Y%m%d')

# Align the data
etf_returns = etf_returns.reindex(ff_factors.index).dropna()
ff_factors = ff_factors.reindex(etf_returns.index).dropna()

# Create merged data
merged_data = pd.concat([etf_returns, ff_factors], axis=1)

# Run regression for each ETF using only the market factor
X = sm.add_constant(merged_data['Mkt-RF'])

for etf in etf_returns.columns:
    y = merged_data[etf]
    # Run regression
    model = sm.OLS(y, X).fit()
    # Calculate market risk premium
    beta = model.params['Mkt-RF']
    avg_market_return = merged_data['Mkt-RF'].mean()
    market_risk_premium = beta * avg_market_return
    market_risk_premiums.loc[etf, 'Market Risk Premium'] = market_risk_premium

# Display results
print("Market Risk Premiums for each ETF:")
print(market_risk_premiums.round(4))

# Display the betas and average market return for reference
print("\nMarket Betas:")
betas = pd.DataFrame(index=etf_returns.columns, columns=['Beta'])
for etf in etf_returns.columns:
    y = merged_data[etf]
    model = sm.OLS(y, X).fit()
    betas.loc[etf, 'Beta'] = model.params['Mkt-RF']
print(betas.round(4))


# %%



NameError: name 'market_risk_premiums' is not defined


$$
\text{(Strategy I)}
\begin{cases}
\underset{\omega\in\mathbb{R}^n}{\max\;}\rho^T \omega-\lambda\sqrt{\omega^T \Sigma\omega}\\
-0.5\leq\sum_{i=1}^{n}\beta_i^m\omega_i\leq 0.5\\
\sum_{i=1}^{n}\omega_i=1,\; -2\leq\omega_i\leq 2,
\end{cases}
$$

In [None]:
def strategy_I(expectedReturns, covMatrix, f)

In [8]:
factor_loadings_df = pd.DataFrame({
    'Alpha': [0.02, 0.01, 0.015, 0.018, 0.016, 0.013, 0.014, 0.017, 0.011, 0.012, 0.013, 0.014],
    'Mkt-RF': [1.0, 1.2, 1.1, 1.3, 1.1, 1.0, 1.2, 1.3, 1.0, 1.2, 1.1, 1.3],
    'SMB': [0.3, 0.2, 0.25, 0.28, 0.26, 0.24, 0.25, 0.27, 0.22, 0.23, 0.25, 0.26],
    'HML': [0.2, 0.15, 0.18, 0.19, 0.17, 0.16, 0.18, 0.19, 0.15, 0.17, 0.18, 0.19]
})

In [9]:
def tracking_error_volatility(weights, returns_data, benchmark_returns):
    """Calculate tracking error volatility between portfolio and benchmark returns"""
    portfolio_returns = returns_data @ weights
    tracking_error = portfolio_returns - benchmark_returns
    return np.sqrt(np.var(tracking_error))

def optimize_strategy_I(expected_returns, cov_matrix, factor_loadings, beta_constraints, lambd):
    """
    Optimize portfolio weights using Strategy I approach
    
    Parameters:
    - expected_returns: Expected returns for each asset
    - cov_matrix: Covariance matrix of returns
    - factor_loadings: DataFrame of factor exposures
    - beta_constraints: Tuple of (min_beta, max_beta)
    - lambd: Risk aversion parameter
    """
    n = len(expected_returns)
    
    # Define optimization variable
    w = cp.Variable(n)
    
    # Ensure covariance matrix is symmetric
    cov_matrix = (cov_matrix + cov_matrix.T) / 2
    
    # Define objective components
    portfolio_return = expected_returns @ w
    portfolio_risk = cp.quad_form(w, cov_matrix)
    portfolio_beta = factor_loadings['Mkt-RF'].values @ w
    
    # Define constraints
    constraints = [
        cp.sum(w) == 1,                    # Full investment
        portfolio_beta >= beta_constraints[0],  # Min beta
        portfolio_beta <= beta_constraints[1],  # Max beta
        w >= -2,                           # Long/short constraints
        w <= 2
    ]
    
    # Solve optimization problem
    objective = cp.Maximize(portfolio_return - lambd * cp.norm(portfolio_risk, 2))
    prob = cp.Problem(objective, constraints)
    prob.solve()
    
    return w.value

def optimize_strategy_II(expected_returns, returns_data, factor_loadings, beta_constraints, lambd, benchmark_returns):
    """
    Optimize portfolio weights using Strategy II approach
    
    Parameters:
    - expected_returns: Expected returns for each asset
    - returns_data: Historical returns data
    - factor_loadings: DataFrame of factor exposures  
    - beta_constraints: Tuple of (min_beta, max_beta)
    - lambd: Risk aversion parameter
    - benchmark_returns: Returns of benchmark portfolio
    """
    n = len(expected_returns)
    beta_values = factor_loadings['Mkt-RF'].values
    
    def objective(weights):
        portfolio_return = expected_returns @ weights
        tracking_vol = tracking_error_volatility(weights, returns_data, benchmark_returns)
        return -(portfolio_return - lambd * tracking_vol)
    
    # Define constraints
    constraints = [
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},  # Full investment
        {'type': 'ineq', 'fun': lambda w: beta_constraints[1] - np.sum(w * beta_values)},  # Max beta
        {'type': 'ineq', 'fun': lambda w: np.sum(w * beta_values) - beta_constraints[0]},  # Min beta
        {'type': 'ineq', 'fun': lambda w: 2 - w},  # Upper bound
        {'type': 'ineq', 'fun': lambda w: w + 2}   # Lower bound
    ]
    
    # Set bounds and initial guess
    bounds = [(-2, 2) for _ in range(n)]
    initial_weights = np.ones(n) / n
    
    # Solve optimization
    result = minimize(objective, initial_weights, bounds=bounds, constraints=constraints)
    return result.x