# Real-World Applications of the MFE Toolbox

This notebook demonstrates integrated applications of the MFE Toolbox for solving real-world financial econometrics problems. We'll combine multiple techniques into complete analytical workflows for practical financial applications.

## Case Studies

1. **Portfolio Risk Management**: Combining multivariate volatility modeling with Value-at-Risk estimation
2. **Market Regime Analysis**: Using bootstrap methods to identify and analyze different market regimes
3. **Volatility Forecasting for Options Trading**: Integrating GARCH models with realized volatility measures
4. **Stress Testing and Scenario Analysis**: Simulating extreme market conditions using multivariate models
5. **Factor Model Construction**: Building and validating statistical factor models

These case studies demonstrate how to combine multiple components of the MFE Toolbox to solve complex financial problems using modern Python-based workflows.

## Setup and Imports

First, let's import the necessary modules and set up our environment.

In [None]:
# Standard imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional, Union, Any
import asyncio
import datetime
import warnings
from scipy import stats
import os
from pathlib import Path

# MFE Toolbox imports
import mfe
# Univariate volatility models
from mfe.models.univariate import GARCH, EGARCH, TARCH, AGARCH, APARCH, FIGARCH, HEAVY, IGARCH
# Multivariate volatility models
from mfe.models.multivariate import DCC, BEKK, CCC, OGARCH, RARCH, RCC, RiskMetrics
# Bootstrap methods
from mfe.models.bootstrap import BlockBootstrap, StationaryBootstrap, ModelConfidenceSet, BSDS
# Time series models
from mfe.models.time_series import ARMA
# Realized volatility models
from mfe.models.realized import RealizedVariance, BiPowerVariation, RealizedKernel
# Statistical distributions
from mfe.models.distributions import Normal, StudentT, GED, SkewedT
# Utility functions
from mfe.utils.data_transformations import returns_from_prices
from mfe.utils.matrix_ops import cov2corr

# Configure plotting
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_context("notebook", font_scale=1.2)
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['figure.dpi'] = 100

# Suppress warnings
warnings.filterwarnings('ignore')

# Display version information
print(f"MFE Toolbox version: {mfe.__version__}")

## Data Loading and Preparation

We'll create a utility function to load financial market data from various sources. This function will handle both external data sources and synthetic data generation for demonstration purposes.

In [None]:
def load_market_data(tickers: List[str], 
                     start_date: str = "2015-01-01", 
                     end_date: Optional[str] = None,
                     data_source: str = "yahoo",
                     use_synthetic: bool = False) -> pd.DataFrame:
    """
    Load market data for multiple tickers from various sources.
    
    Parameters
    ----------
    tickers : List[str]
        List of ticker symbols to load
    start_date : str
        Start date in YYYY-MM-DD format
    end_date : str, optional
        End date in YYYY-MM-DD format, defaults to current date
    data_source : str
        Source of data: 'yahoo', 'fred', 'csv', etc.
    use_synthetic : bool
        If True, generate synthetic data instead of loading from source
        
    Returns
    -------
    pd.DataFrame
        DataFrame containing adjusted close prices for all tickers
    """
    # If synthetic data is requested, generate it
    if use_synthetic:
        return create_synthetic_market_data(tickers, start_date, end_date)
    
    try:
        import pandas_datareader as pdr
        
        # Set end date to today if not provided
        if end_date is None:
            end_date = datetime.datetime.now().strftime("%Y-%m-%d")
            
        # Initialize DataFrame to store results
        all_data = pd.DataFrame()
        
        # Download data for each ticker
        for ticker in tickers:
            try:
                if data_source.lower() == 'yahoo':
                    data = pdr.get_data_yahoo(ticker, start=start_date, end=end_date)
                    all_data[ticker] = data['Adj Close']
                elif data_source.lower() == 'fred':
                    data = pdr.get_data_fred(ticker, start=start_date, end=end_date)
                    all_data[ticker] = data
                else:
                    raise ValueError(f"Unsupported data source: {data_source}")
                    
                print(f"Downloaded data for {ticker}: {len(data)} days")
            except Exception as e:
                print(f"Error downloading data for {ticker}: {e}")
        
        # Check if we got any data
        if all_data.empty:
            print("No data downloaded. Using synthetic data instead.")
            return create_synthetic_market_data(tickers, start_date, end_date)
        
        return all_data
    except ImportError:
        print("pandas-datareader not installed. Please install with: pip install pandas-datareader")
        print("Using synthetic data instead.")
        return create_synthetic_market_data(tickers, start_date, end_date)
    except Exception as e:
        print(f"Error loading data: {e}")
        print("Using synthetic data instead.")
        return create_synthetic_market_data(tickers, start_date, end_date)

def create_synthetic_market_data(tickers: List[str], 
                                start_date: str = "2015-01-01", 
                                end_date: Optional[str] = None,
                                include_regimes: bool = True) -> pd.DataFrame:
    """
    Create synthetic market data for multiple assets with realistic properties.
    
    Parameters
    ----------
    tickers : List[str]
        List of ticker symbols to simulate
    start_date : str
        Start date in YYYY-MM-DD format
    end_date : str, optional
        End date in YYYY-MM-DD format, defaults to current date
    include_regimes : bool
        If True, include distinct market regimes in the data
        
    Returns
    -------
    pd.DataFrame
        DataFrame containing synthetic price data for all tickers
    """
    # Parse dates
    start = pd.to_datetime(start_date)
    if end_date is None:
        end = pd.to_datetime(datetime.datetime.now().strftime("%Y-%m-%d"))
    else:
        end = pd.to_datetime(end_date)
    
    # Create date range (business days only)
    dates = pd.date_range(start=start, end=end, freq='B')
    
    # Set random seed for reproducibility
    np.random.seed(42)
    
    # Number of assets and days
    n_assets = len(tickers)
    n_days = len(dates)
    
    # Create correlation matrix with realistic correlations
    # We'll use a factor model approach to ensure positive definiteness
    n_factors = 3  # Market, size, value factors for example
    factor_loadings = np.random.uniform(0.3, 0.9, size=(n_assets, n_factors))
    
    # Create correlation matrix from factor loadings
    corr_matrix = factor_loadings @ factor_loadings.T
    # Add idiosyncratic variance to ensure diagonal of 1s
    for i in range(n_assets):
        corr_matrix[i, i] = 1.0
    
    # Ensure it's a valid correlation matrix
    # Make it symmetric
    corr_matrix = (corr_matrix + corr_matrix.T) / 2
    # Normalize to ensure diagonal is 1
    d = np.sqrt(np.diag(corr_matrix))
    corr_matrix = corr_matrix / np.outer(d, d)
    
    # Create volatilities for each asset (annualized)
    asset_vols = np.random.uniform(0.15, 0.35, size=n_assets)  # 15% to 35% annual vol
    
    # Convert to daily volatility
    daily_vols = asset_vols / np.sqrt(252)
    
    # Create covariance matrix
    cov_matrix = np.diag(daily_vols) @ corr_matrix @ np.diag(daily_vols)
    
    # Generate correlated returns
    # We'll use Cholesky decomposition
    chol = np.linalg.cholesky(cov_matrix)
    
    # Generate random normal returns
    random_returns = np.random.normal(0, 1, size=(n_days, n_assets))
    
    # Transform to correlated returns
    correlated_returns = random_returns @ chol.T
    
    # Add drift (expected return)
    expected_returns = np.random.uniform(0.05, 0.15, size=n_assets) / 252  # 5% to 15% annual return
    correlated_returns += expected_returns
    
    # If including regimes, modify the returns to create distinct market regimes
    if include_regimes:
        # Define regimes
        # 1. Normal regime (already generated)
        # 2. High volatility regime (e.g., crisis)
        # 3. Low correlation regime (e.g., sector rotation)
        # 4. Bull market regime (high returns, low vol)
        
        # Divide the time period into segments
        segment_size = n_days // 4
        
        # High volatility regime (2x volatility, negative drift)
        high_vol_start = segment_size
        high_vol_end = segment_size * 2
        vol_multiplier = 2.5
        correlated_returns[high_vol_start:high_vol_end] *= vol_multiplier
        correlated_returns[high_vol_start:high_vol_end] -= 0.001  # Negative drift
        
        # Low correlation regime (reduce correlations)
        low_corr_start = segment_size * 2
        low_corr_end = segment_size * 3
        # Add more idiosyncratic movement
        idiosyncratic = np.random.normal(0, 0.01, size=(low_corr_end - low_corr_start, n_assets))
        correlated_returns[low_corr_start:low_corr_end] += idiosyncratic
        
        # Bull market regime (positive drift, lower vol)
        bull_start = segment_size * 3
        bull_end = n_days
        correlated_returns[bull_start:bull_end] *= 0.7  # Lower vol
        correlated_returns[bull_start:bull_end] += 0.001  # Positive drift
    
    # Add volatility clustering
    # We'll use a simple AR(1) process for volatility
    vol_persistence = 0.95
    vol_scale = np.ones((n_days, n_assets))
    vol_innovation = np.random.normal(0, 0.1, size=(n_days, n_assets))
    
    for t in range(1, n_days):
        vol_scale[t] = np.sqrt(0.05 + vol_persistence * vol_scale[t-1]**2 + 0.05 * vol_innovation[t]**2)
    
    # Apply time-varying volatility
    correlated_returns = correlated_returns * vol_scale
    
    # Add occasional jumps
    n_jumps = int(n_days * 0.01)  # 1% of days have jumps
    jump_days = np.random.choice(range(n_days), size=n_jumps, replace=False)
    jump_assets = np.random.choice(range(n_assets), size=n_jumps)
    jump_signs = np.random.choice([-1, 1], size=n_jumps)
    jump_sizes = np.random.uniform(0.02, 0.05, size=n_jumps)
    
    for day, asset, sign, size in zip(jump_days, jump_assets, jump_signs, jump_sizes):
        correlated_returns[day, asset] += sign * size
    
    # Convert returns to prices
    # Start with price of 100 for each asset
    prices = np.zeros((n_days, n_assets))
    prices[0] = 100.0
    
    for t in range(1, n_days):
        prices[t] = prices[t-1] * (1 + correlated_returns[t])
    
    # Create DataFrame
    price_df = pd.DataFrame(prices, index=dates, columns=tickers)
    
    print(f"Created synthetic data for {n_assets} assets over {n_days} days")
    return price_df

## Case Study 1: Portfolio Risk Management

In this case study, we'll demonstrate a comprehensive portfolio risk management workflow that combines:

1. Multivariate volatility modeling with DCC-GARCH
2. Value-at-Risk (VaR) and Expected Shortfall (ES) estimation
3. Stress testing and scenario analysis
4. Risk decomposition and attribution

This workflow is typical of what might be used in a risk management department of an investment firm.

In [None]:
# Define portfolio assets
# We'll use major US indices and sector ETFs
portfolio_assets = ['SPY', 'QQQ', 'IWM', 'XLF', 'XLE', 'XLK', 'XLV', 'XLI']

# Load price data
try:
    price_data = load_market_data(portfolio_assets, "2018-01-01")
except Exception as e:
    print(f"Error loading data: {e}")
    print("Using synthetic data instead")
    price_data = create_synthetic_market_data(portfolio_assets, "2018-01-01")

# Calculate returns
returns_df = pd.DataFrame()
for ticker in price_data.columns:
    returns_df[ticker] = returns_from_prices(price_data[ticker], log=True) * 100  # Convert to percentage

# Handle missing values if any
if returns_df.isna().any().any():
    returns_df = returns_df.fillna(method='ffill').fillna(method='bfill')

# Display summary statistics
returns_summary = returns_df.describe()
print("Returns Summary Statistics:")
returns_summary


In [None]:
# Visualize the data
# Plot normalized prices
plt.figure(figsize=(14, 8))
normalized_prices = price_data.div(price_data.iloc[0]) * 100
for ticker in normalized_prices.columns:
    plt.plot(normalized_prices.index, normalized_prices[ticker], label=ticker)
plt.title('Normalized Price Series (Base = 100)')
plt.ylabel('Normalized Price')
plt.xlabel('Date')
plt.legend()
plt.grid(True)
plt.show()

# Plot correlation matrix
plt.figure(figsize=(12, 10))
corr_matrix = returns_df.corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1, center=0,
            square=True, linewidths=0.5, cbar_kws={"shrink": 0.8})
plt.title('Asset Correlation Matrix')
plt.tight_layout()
plt.show()


In [None]:
# Define portfolio weights
# For this example, we'll create a portfolio with the following allocation:
# - 40% in broad market ETFs (SPY, QQQ, IWM)
# - 60% in sector ETFs (XLF, XLE, XLK, XLV, XLI)

portfolio_weights = {
    'SPY': 0.20,  # S&P 500
    'QQQ': 0.15,  # NASDAQ 100
    'IWM': 0.05,  # Russell 2000
    'XLF': 0.10,  # Financials
    'XLE': 0.10,  # Energy
    'XLK': 0.20,  # Technology
    'XLV': 0.10,  # Healthcare
    'XLI': 0.10   # Industrials
}

# Create a DataFrame with weights
weights_df = pd.DataFrame(list(portfolio_weights.items()), columns=['Asset', 'Weight'])
print("Portfolio Weights:")
weights_df

# Create weight vector in the same order as returns_df
weight_vector = np.array([portfolio_weights[ticker] for ticker in returns_df.columns])

# Calculate portfolio returns
portfolio_returns = returns_df.dot(weight_vector)

# Plot portfolio returns
plt.figure(figsize=(14, 6))
plt.plot(portfolio_returns.index, portfolio_returns, color='blue')
plt.title('Portfolio Daily Returns')
plt.ylabel('Returns (%)')
plt.xlabel('Date')
plt.grid(True)
plt.show()


In [None]:
# Step 1: Estimate a DCC-GARCH model for the portfolio assets
# Convert returns DataFrame to NumPy array
returns_array = returns_df.values

# Create and estimate a DCC-GARCH model with Student's t distribution
# Student's t is often more appropriate for financial returns due to fat tails
print("Estimating DCC-GARCH model with Student's t distribution...")
dcc_model = DCC(returns_df.shape[1],  # Number of assets
                p=1, q=1,             # DCC orders
                univariate_model=GARCH,  # Univariate model for each series
                univariate_params={'p': 1, 'q': 1},  # GARCH(1,1) for each series
                distribution=StudentT())  # Student's t distribution

# Fit the model
dcc_results = dcc_model.fit(returns_array)
print("DCC-GARCH estimation complete.")

# Display DCC parameters
print("
DCC Parameters:")
dcc_params = pd.DataFrame({
    'Parameter': dcc_results.parameter_names[-2:],  # Last two parameters are DCC parameters
    'Estimate': dcc_results.parameters[-2:],
    'Std. Error': dcc_results.std_errors[-2:],
    't-statistic': dcc_results.t_stats[-2:],
    'p-value': dcc_results.p_values[-2:]
})
print(dcc_params)

# Calculate persistence
alpha = dcc_results.parameters[-2]  # DCC alpha parameter
beta = dcc_results.parameters[-1]   # DCC beta parameter
persistence = alpha + beta
print(f"
DCC Persistence (α + β): {persistence:.4f}")
print(f"Half-life: {np.log(0.5) / np.log(persistence):.2f} days")

In [None]:
# Step 2: Calculate portfolio volatility using the DCC-GARCH model
# Function to calculate portfolio variance from covariance matrix
def portfolio_variance(cov_matrix, weights):
    return weights.T @ cov_matrix @ weights

# Calculate portfolio volatility
dcc_portfolio_var = np.array([portfolio_variance(cov_matrix, weight_vector) 
                             for cov_matrix in dcc_results.conditional_covariance])
dcc_portfolio_vol = np.sqrt(dcc_portfolio_var)  # Daily volatility
dcc_portfolio_vol_annual = dcc_portfolio_vol * np.sqrt(252)  # Annualized

# Calculate rolling portfolio volatility for comparison
window_size = 60  # 60-day rolling window (approximately 3 months)
rolling_portfolio_vol = portfolio_returns.rolling(window=window_size).std() * np.sqrt(252)  # Annualized

# Plot portfolio volatility
plt.figure(figsize=(14, 6))
plt.plot(returns_df.index, dcc_portfolio_vol_annual, label='DCC-GARCH', color='blue')
plt.plot(rolling_portfolio_vol.index, rolling_portfolio_vol, label=f'{window_size}-Day Rolling', 
         color='red', alpha=0.7)
plt.title('Portfolio Volatility (Annualized)')
plt.ylabel('Volatility (%)')
plt.xlabel('Date')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Step 3: Calculate Value-at-Risk (VaR) and Expected Shortfall (ES)
# We'll calculate these risk measures using the DCC-GARCH model

# Define confidence levels
confidence_levels = [0.95, 0.99]

# Function to calculate VaR and ES
def calculate_var_es(returns, volatility, confidence_level, distribution='normal', df=None):
    """
    Calculate Value-at-Risk and Expected Shortfall.
    
    Parameters
    ----------
    returns : np.ndarray or pd.Series
        Historical returns
    volatility : np.ndarray or pd.Series
        Volatility estimates
    confidence_level : float
        Confidence level (e.g., 0.95 for 95% VaR)
    distribution : str
        Distribution assumption: 'normal', 'empirical', or 't'
    df : float, optional
        Degrees of freedom for t-distribution
        
    Returns
    -------
    tuple
        (VaR, ES) values
    """
    if distribution == 'normal':
        # Calculate VaR assuming normal distribution
        z_score = stats.norm.ppf(1 - confidence_level)
        var = -volatility * z_score
        
        # Calculate ES for normal distribution
        # ES = -μ - σ * φ(z_α) / (1 - α)
        # where φ is the standard normal PDF
        pdf_value = stats.norm.pdf(z_score)
        es = -volatility * pdf_value / (1 - confidence_level)
        
    elif distribution == 't' and df is not None:
        # Calculate VaR assuming t-distribution
        t_score = stats.t.ppf(1 - confidence_level, df)
        var = -volatility * t_score * np.sqrt((df - 2) / df)
        
        # Calculate ES for t-distribution
        # ES = -μ - σ * (df / (df - 1)) * (ft(t_α) / (1 - α)) * ((df + t_α^2) / (df - 1))
        # where ft is the t-distribution PDF
        pdf_value = stats.t.pdf(t_score, df)
        es = -volatility * (df / (df - 1)) * (pdf_value / (1 - confidence_level)) * ((df + t_score**2) / (df - 1))
        
    elif distribution == 'empirical':
        # Calculate standardized residuals
        std_residuals = returns / volatility
        
        # Calculate VaR using empirical distribution
        var_quantile = np.percentile(std_residuals, 100 * (1 - confidence_level))
        var = volatility * var_quantile
        
        # Calculate ES using empirical distribution
        es_threshold = var_quantile
        beyond_var = std_residuals[std_residuals <= es_threshold]
        es = volatility * np.mean(beyond_var) if len(beyond_var) > 0 else var
    else:
        raise ValueError("Invalid distribution or missing degrees of freedom")
    
    return var, es

# Calculate VaR and ES for each day using DCC-GARCH volatility
var_es_results = {}

# Get degrees of freedom from Student's t distribution
# It's typically the last univariate parameter for each asset
# We'll use the average across assets for simplicity
df_params = []
for i in range(returns_df.shape[1]):
    # Find the degrees of freedom parameter for this asset
    # It's typically the last parameter for each univariate model
    univariate_params_count = 3  # omega, alpha, beta for GARCH(1,1)
    df_param_idx = i * (univariate_params_count + 1) + univariate_params_count
    if df_param_idx < len(dcc_results.parameters) - 2:  # -2 for DCC parameters
        df_params.append(dcc_results.parameters[df_param_idx])

# Use average degrees of freedom if available, otherwise default to 5
avg_df = np.mean(df_params) if df_params else 5.0
print(f"Average degrees of freedom for Student's t distribution: {avg_df:.2f}")

# Calculate VaR and ES for each confidence level
for confidence in confidence_levels:
    # Normal distribution
    var_normal, es_normal = calculate_var_es(
        portfolio_returns.values, dcc_portfolio_vol, confidence, 'normal')
    
    # t-distribution
    var_t, es_t = calculate_var_es(
        portfolio_returns.values, dcc_portfolio_vol, confidence, 't', avg_df)
    
    # Store results
    var_es_results[f'VaR {confidence*100:.0f}% Normal'] = var_normal
    var_es_results[f'ES {confidence*100:.0f}% Normal'] = es_normal
    var_es_results[f'VaR {confidence*100:.0f}% t'] = var_t
    var_es_results[f'ES {confidence*100:.0f}% t'] = es_t

# Convert to DataFrame
var_es_df = pd.DataFrame(var_es_results, index=returns_df.index)

# Plot VaR and ES
plt.figure(figsize=(14, 8))

# Plot portfolio returns
plt.plot(portfolio_returns.index, portfolio_returns, color='blue', alpha=0.5, label='Portfolio Returns')

# Plot VaR and ES for 99% confidence level
plt.plot(var_es_df.index, var_es_df['VaR 99% t'], color='red', label='99% VaR (t-dist)')
plt.plot(var_es_df.index, var_es_df['ES 99% t'], color='darkred', linestyle='--', label='99% ES (t-dist)')

# Plot VaR and ES for 95% confidence level
plt.plot(var_es_df.index, var_es_df['VaR 95% t'], color='orange', label='95% VaR (t-dist)')
plt.plot(var_es_df.index, var_es_df['ES 95% t'], color='darkorange', linestyle='--', label='95% ES (t-dist)')

plt.title('Portfolio Returns with Value-at-Risk and Expected Shortfall')
plt.ylabel('Returns (%)')
plt.xlabel('Date')
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Step 4: Backtesting VaR
# Count VaR violations (when actual returns are worse than VaR)
violations = {}
for confidence in confidence_levels:
    for dist in ['Normal', 't']:
        var_col = f'VaR {confidence*100:.0f}% {dist.lower()}'
        violations[var_col] = (portfolio_returns < var_es_df[var_col]).sum()

# Calculate expected number of violations
n_days = len(portfolio_returns)
expected_violations = {}
for confidence in confidence_levels:
    expected_violations[f'{confidence*100:.0f}%'] = n_days * (1 - confidence)

# Create violation summary
violation_summary = pd.DataFrame({
    'VaR Level': list(violations.keys()),
    'Violations': list(violations.values()),
    'Expected': [expected_violations[level.split('%')[0] + '%'] for level in violations.keys()],
    'Ratio': [violations[level] / expected_violations[level.split('%')[0] + '%'] for level in violations.keys()]
})

print("VaR Backtest Results:")
violation_summary


In [None]:
# Step 5: Risk Decomposition and Attribution
# Calculate marginal contribution to risk (MCR) and percentage contribution to risk (PCR)

# Function to calculate risk contributions
def calculate_risk_contributions(cov_matrix, weights):
    """
    Calculate marginal and percentage contributions to portfolio risk.
    
    Parameters
    ----------
    cov_matrix : np.ndarray
        Covariance matrix
    weights : np.ndarray
        Portfolio weights
        
    Returns
    -------
    tuple
        (marginal_contributions, percentage_contributions)
    """
    # Calculate portfolio variance and volatility
    port_var = portfolio_variance(cov_matrix, weights)
    port_vol = np.sqrt(port_var)
    
    # Calculate marginal contribution to risk (MCR)
    # MCR_i = w_i * (Σw)_i / σ_p
    # where (Σw)_i is the i-th element of the vector Σw
    mcr = weights * (cov_matrix @ weights) / port_vol
    
    # Calculate percentage contribution to risk (PCR)
    # PCR_i = MCR_i / σ_p
    pcr = mcr / port_vol
    
    return mcr, pcr

# Calculate risk contributions using the latest covariance matrix
latest_cov = dcc_results.conditional_covariance[-1]
mcr, pcr = calculate_risk_contributions(latest_cov, weight_vector)

# Create risk contribution summary
risk_contrib = pd.DataFrame({
    'Asset': returns_df.columns,
    'Weight': weight_vector,
    'Marginal Contribution': mcr,
    'Percentage Contribution': pcr * 100  # Convert to percentage
})

print("Risk Contribution Analysis:")
risk_contrib


In [None]:
# Visualize risk contributions
plt.figure(figsize=(14, 7))

# Sort by percentage contribution
risk_contrib_sorted = risk_contrib.sort_values('Percentage Contribution', ascending=False)

# Create bar chart
bars = plt.bar(risk_contrib_sorted['Asset'], risk_contrib_sorted['Percentage Contribution'])

# Add weight information
for i, bar in enumerate(bars):
    weight = risk_contrib_sorted.iloc[i]['Weight'] * 100
    plt.text(bar.get_x() + bar.get_width()/2, 0.5, f'Weight: {weight:.1f}%', 
             ha='center', va='bottom', rotation=90, color='white', fontweight='bold')

plt.title('Percentage Contribution to Portfolio Risk')
plt.ylabel('Contribution (%)')
plt.grid(axis='y')
plt.tight_layout()
plt.show()

# Compare weight allocation vs risk contribution
plt.figure(figsize=(14, 7))

# Create a DataFrame for comparison
comparison = pd.DataFrame({
    'Asset': risk_contrib['Asset'],
    'Weight Allocation (%)': risk_contrib['Weight'] * 100,
    'Risk Contribution (%)': risk_contrib['Percentage Contribution']
})

# Sort by risk contribution
comparison = comparison.sort_values('Risk Contribution (%)', ascending=False)

# Plot
comparison.set_index('Asset')[['Weight Allocation (%)', 'Risk Contribution (%)']].plot(kind='bar', figsize=(14, 7))
plt.title('Weight Allocation vs Risk Contribution')
plt.ylabel('Percentage (%)')
plt.grid(axis='y')
plt.tight_layout()
plt.show()


In [None]:
# Step 6: Stress Testing and Scenario Analysis
# We'll simulate different market scenarios and analyze their impact on the portfolio

# Define scenarios
scenarios = {
    'Base Case': {
        'description': 'Current market conditions',
        'volatility_multiplier': 1.0,
        'correlation_adjustment': 0.0,
        'return_shift': 0.0
    },
    'Market Crash': {
        'description': 'Severe market downturn with high volatility and correlations',
        'volatility_multiplier': 2.5,
        'correlation_adjustment': 0.2,  # Increase correlations
        'return_shift': -2.0  # Negative daily returns
    },
    'Sector Rotation': {
        'description': 'Decreased correlations with moderate volatility',
        'volatility_multiplier': 1.3,
        'correlation_adjustment': -0.3,  # Decrease correlations
        'return_shift': 0.0
    },
    'Tech Crash': {
        'description': 'Technology sector crash with spillover effects',
        'volatility_multiplier': 1.8,
        'correlation_adjustment': 0.1,
        'return_shift': -1.5,
        'sector_specific': {'XLK': -3.0}  # Additional shock to tech sector
    },
    'Energy Crisis': {
        'description': 'Energy sector crisis with market-wide impact',
        'volatility_multiplier': 1.5,
        'correlation_adjustment': 0.1,
        'return_shift': -1.0,
        'sector_specific': {'XLE': -4.0}  # Additional shock to energy sector
    }
}

# Function to adjust correlation matrix
def adjust_correlation_matrix(corr_matrix, adjustment):
    """
    Adjust correlation matrix by adding a constant to off-diagonal elements,
    then rescaling to ensure it remains a valid correlation matrix.
    
    Parameters
    ----------
    corr_matrix : np.ndarray
        Original correlation matrix
    adjustment : float
        Adjustment to apply to off-diagonal elements
        
    Returns
    -------
    np.ndarray
        Adjusted correlation matrix
    """
    # Create a copy of the correlation matrix
    adjusted_corr = corr_matrix.copy()
    
    # Apply adjustment to off-diagonal elements
    n = adjusted_corr.shape[0]
    for i in range(n):
        for j in range(n):
            if i != j:
                adjusted_corr[i, j] += adjustment
                # Ensure correlations are between -1 and 1
                adjusted_corr[i, j] = max(-0.99, min(0.99, adjusted_corr[i, j]))
    
    # Ensure the matrix is positive definite
    # We'll use the nearest positive definite matrix if needed
    try:
        # Check if the matrix is positive definite
        np.linalg.cholesky(adjusted_corr)
    except np.linalg.LinAlgError:
        # If not, find the nearest positive definite matrix
        # This is a simplified approach - in practice, you might use more sophisticated methods
        eigvals, eigvecs = np.linalg.eigh(adjusted_corr)
        eigvals = np.maximum(eigvals, 1e-6)  # Ensure positive eigenvalues
        adjusted_corr = eigvecs @ np.diag(eigvals) @ eigvecs.T
        
        # Rescale to ensure diagonal elements are 1
        d = np.sqrt(np.diag(adjusted_corr))
        adjusted_corr = adjusted_corr / np.outer(d, d)
    
    return adjusted_corr

# Function to run scenario analysis
def run_scenario_analysis(scenarios, latest_cov, weight_vector, asset_names, confidence_level=0.99):
    """
    Run scenario analysis for different market conditions.
    
    Parameters
    ----------
    scenarios : dict
        Dictionary of scenarios
    latest_cov : np.ndarray
        Latest covariance matrix
    weight_vector : np.ndarray
        Portfolio weights
    asset_names : list
        List of asset names
    confidence_level : float
        Confidence level for VaR and ES calculations
        
    Returns
    -------
    pd.DataFrame
        Scenario analysis results
    """
    results = []
    
    # Get latest volatilities (diagonal of covariance matrix)
    latest_vols = np.sqrt(np.diag(latest_cov))
    
    # Get latest correlation matrix
    latest_corr = cov2corr(latest_cov)
    
    for scenario_name, scenario in scenarios.items():
        # Apply volatility multiplier
        adjusted_vols = latest_vols * scenario['volatility_multiplier']
        
        # Apply correlation adjustment
        adjusted_corr = adjust_correlation_matrix(latest_corr, scenario['correlation_adjustment'])
        
        # Reconstruct covariance matrix
        adjusted_cov = np.diag(adjusted_vols) @ adjusted_corr @ np.diag(adjusted_vols)
        
        # Calculate portfolio volatility
        port_var = portfolio_variance(adjusted_cov, weight_vector)
        port_vol = np.sqrt(port_var)
        
        # Calculate VaR and ES assuming t-distribution
        var, es = calculate_var_es(None, port_vol, confidence_level, 't', avg_df)
        
        # Calculate expected return under this scenario
        base_return = scenario['return_shift']
        
        # Apply sector-specific shocks if any
        if 'sector_specific' in scenario:
            for asset, shock in scenario['sector_specific'].items():
                if asset in asset_names:
                    asset_idx = asset_names.index(asset)
                    base_return += weight_vector[asset_idx] * shock
        
        # Store results
        results.append({
            'Scenario': scenario_name,
            'Description': scenario['description'],
            'Expected Return (%)': base_return,
            'Volatility (%)': port_vol * 100,  # Convert to percentage
            f'VaR {confidence_level*100:.0f}% (%)': -var * 100,  # Convert to positive percentage
            f'ES {confidence_level*100:.0f}% (%)': -es * 100,  # Convert to positive percentage
        })
    
    return pd.DataFrame(results)

# Run scenario analysis
scenario_results = run_scenario_analysis(
    scenarios, latest_cov, weight_vector, list(returns_df.columns), 0.99)

print("Scenario Analysis Results:")
scenario_results


In [None]:
# Visualize scenario analysis results
plt.figure(figsize=(14, 8))

# Create bar chart for VaR and ES
x = np.arange(len(scenario_results))
width = 0.35

plt.bar(x - width/2, scenario_results['VaR 99.0% (%)'], width, label='99% VaR')
plt.bar(x + width/2, scenario_results['ES 99.0% (%)'], width, label='99% ES')

plt.xlabel('Scenario')
plt.ylabel('Risk Measure (%)')
plt.title('VaR and ES Under Different Scenarios')
plt.xticks(x, scenario_results['Scenario'])
plt.legend()
plt.grid(axis='y')

# Add expected return as text
for i, v in enumerate(scenario_results['Expected Return (%)']):
    plt.text(i, 0.5, f'Return: {v:.1f}%', ha='center', va='bottom', rotation=90, color='black')

plt.tight_layout()
plt.show()

# Create a risk-return plot
plt.figure(figsize=(12, 8))
plt.scatter(scenario_results['Volatility (%)'], scenario_results['Expected Return (%)'], 
            s=100, alpha=0.7)

# Add labels for each scenario
for i, scenario in enumerate(scenario_results['Scenario']):
    plt.annotate(scenario, 
                 (scenario_results['Volatility (%)'][i], scenario_results['Expected Return (%)'][i]),
                 xytext=(10, 5), textcoords='offset points')

plt.axhline(y=0, color='gray', linestyle='--')
plt.title('Risk-Return Profile Under Different Scenarios')
plt.xlabel('Volatility (%)')
plt.ylabel('Expected Return (%)')
plt.grid(True)
plt.show()


In [None]:
# Step 7: Portfolio Optimization for Risk Reduction
# We'll find the minimum variance portfolio as a risk-reduction alternative

from scipy.optimize import minimize

# Function to calculate portfolio variance (objective function)
def portfolio_variance_objective(weights, cov_matrix):
    return weights.T @ cov_matrix @ weights

# Function to ensure weights sum to 1 (constraint)
def sum_to_one(weights):
    return np.sum(weights) - 1.0

# Find minimum variance portfolio
n_assets = len(weight_vector)
initial_weights = np.ones(n_assets) / n_assets  # Equal weights

# Define constraints
constraints = ({'type': 'eq', 'fun': sum_to_one})

# Define bounds (no short selling)
bounds = tuple((0, 1) for _ in range(n_assets))

# Run optimization
result = minimize(portfolio_variance_objective, initial_weights, args=(latest_cov,),
                 method='SLSQP', bounds=bounds, constraints=constraints)

# Get optimized weights
min_var_weights = result['x']

# Calculate risk metrics for minimum variance portfolio
min_var_var = portfolio_variance(latest_cov, min_var_weights)
min_var_vol = np.sqrt(min_var_var)
min_var_vol_annual = min_var_vol * np.sqrt(252)  # Annualized

# Calculate VaR and ES for minimum variance portfolio
min_var_var_99, min_var_es_99 = calculate_var_es(None, min_var_vol, 0.99, 't', avg_df)

# Create comparison with current portfolio
current_vol = np.sqrt(portfolio_variance(latest_cov, weight_vector))
current_vol_annual = current_vol * np.sqrt(252)  # Annualized
current_var_99, current_es_99 = calculate_var_es(None, current_vol, 0.99, 't', avg_df)

# Create weight comparison
weight_comparison = pd.DataFrame({
    'Asset': returns_df.columns,
    'Current Weight': weight_vector * 100,  # Convert to percentage
    'Min Variance Weight': min_var_weights * 100  # Convert to percentage
})

print("Weight Comparison:")
weight_comparison

# Create risk comparison
risk_comparison = pd.DataFrame({
    'Portfolio': ['Current', 'Minimum Variance'],
    'Volatility (%)': [current_vol_annual * 100, min_var_vol_annual * 100],
    'VaR 99% (%)': [-current_var_99 * 100, -min_var_var_99 * 100],
    'ES 99% (%)': [-current_es_99 * 100, -min_var_es_99 * 100],
    'Volatility Reduction (%)': [0, (1 - min_var_vol_annual / current_vol_annual) * 100],
    'VaR Reduction (%)': [0, (1 - min_var_var_99 / current_var_99) * 100],
    'ES Reduction (%)': [0, (1 - min_var_es_99 / current_es_99) * 100]
})

print("
Risk Comparison:")
risk_comparison


In [None]:
# Visualize weight comparison
plt.figure(figsize=(14, 7))

# Sort by current weight
weight_comparison_sorted = weight_comparison.sort_values('Current Weight', ascending=False)

# Create grouped bar chart
x = np.arange(len(weight_comparison_sorted))
width = 0.35

plt.bar(x - width/2, weight_comparison_sorted['Current Weight'], width, label='Current Portfolio')
plt.bar(x + width/2, weight_comparison_sorted['Min Variance Weight'], width, label='Minimum Variance Portfolio')

plt.xlabel('Asset')
plt.ylabel('Weight (%)')
plt.title('Portfolio Weight Comparison')
plt.xticks(x, weight_comparison_sorted['Asset'])
plt.legend()
plt.grid(axis='y')
plt.tight_layout()
plt.show()

# Visualize risk comparison
plt.figure(figsize=(14, 7))

# Create grouped bar chart for risk metrics
x = np.arange(3)  # Volatility, VaR, ES
width = 0.35

metrics = ['Volatility (%)', 'VaR 99% (%)', 'ES 99% (%)']
current_values = risk_comparison.loc[0, metrics].values
min_var_values = risk_comparison.loc[1, metrics].values

plt.bar(x - width/2, current_values, width, label='Current Portfolio')
plt.bar(x + width/2, min_var_values, width, label='Minimum Variance Portfolio')

plt.xlabel('Risk Metric')
plt.ylabel('Value (%)')
plt.title('Risk Metric Comparison')
plt.xticks(x, metrics)
plt.legend()
plt.grid(axis='y')

# Add reduction percentages as text
reductions = risk_comparison.loc[1, ['Volatility Reduction (%)', 'VaR Reduction (%)', 'ES Reduction (%)']].values
for i, v in enumerate(reductions):
    plt.text(i, min_var_values[i] + 0.5, f'↓ {v:.1f}%', ha='center', va='bottom', color='green')

plt.tight_layout()
plt.show()


## Case Study 2: Market Regime Analysis

In this case study, we'll demonstrate how to identify and analyze different market regimes using bootstrap methods and time series analysis. This approach is useful for understanding how market dynamics change over time and adapting investment strategies accordingly.

In [None]:
# We'll use the S&P 500 index for this analysis
market_ticker = 'SPY'

# Load data with a longer history
try:
    market_data = load_market_data([market_ticker], "2010-01-01")
    market_prices = market_data[market_ticker]
except Exception as e:
    print(f"Error loading data: {e}")
    print("Using synthetic data instead")
    market_data = create_synthetic_market_data([market_ticker], "2010-01-01", include_regimes=True)
    market_prices = market_data[market_ticker]

# Calculate returns
market_returns = returns_from_prices(market_prices, log=True) * 100  # Convert to percentage

# Plot price and returns
fig, axes = plt.subplots(2, 1, figsize=(14, 10), sharex=True)

# Plot price
axes[0].plot(market_prices.index, market_prices, color='blue')
axes[0].set_title(f'{market_ticker} Price')
axes[0].set_ylabel('Price')
axes[0].grid(True)

# Plot returns
axes[1].plot(market_returns.index, market_returns, color='red', alpha=0.7)
axes[1].set_title(f'{market_ticker} Daily Returns (%)')
axes[1].set_ylabel('Returns (%)')
axes[1].set_xlabel('Date')
axes[1].grid(True)

plt.tight_layout()
plt.show()


In [None]:
# Step 1: Calculate rolling statistics to identify potential regime changes
window_size = 60  # 60-day rolling window (approximately 3 months)

# Calculate rolling statistics
rolling_stats = pd.DataFrame(index=market_returns.index)
rolling_stats['Returns'] = market_returns
rolling_stats['Mean'] = market_returns.rolling(window=window_size).mean()
rolling_stats['Volatility'] = market_returns.rolling(window=window_size).std() * np.sqrt(252)  # Annualized
rolling_stats['Skewness'] = market_returns.rolling(window=window_size).skew()
rolling_stats['Kurtosis'] = market_returns.rolling(window=window_size).kurt()

# Calculate rolling Sharpe ratio (assuming risk-free rate of 0 for simplicity)
rolling_stats['Sharpe'] = (rolling_stats['Mean'] * 252) / rolling_stats['Volatility']

# Plot rolling statistics
fig, axes = plt.subplots(3, 1, figsize=(14, 12), sharex=True)

# Plot rolling mean and volatility
axes[0].plot(rolling_stats.index, rolling_stats['Mean'], color='blue', label='Rolling Mean')
axes[0].set_title('Rolling Mean Return (%)')
axes[0].set_ylabel('Mean (%)')
axes[0].axhline(y=0, color='gray', linestyle='--')
axes[0].grid(True)

axes[1].plot(rolling_stats.index, rolling_stats['Volatility'], color='red', label='Rolling Volatility')
axes[1].set_title('Rolling Volatility (Annualized %)')
axes[1].set_ylabel('Volatility (%)')
axes[1].grid(True)

axes[2].plot(rolling_stats.index, rolling_stats['Sharpe'], color='green', label='Rolling Sharpe')
axes[2].set_title('Rolling Sharpe Ratio')
axes[2].set_ylabel('Sharpe Ratio')
axes[2].axhline(y=0, color='gray', linestyle='--')
axes[2].grid(True)
axes[2].set_xlabel('Date')

plt.tight_layout()
plt.show()


In [None]:
# Step 2: Use change point detection to identify regime changes
# We'll use a simple approach based on volatility changes

def detect_regime_changes(time_series, window_size=60, z_threshold=1.5):
    """
    Detect regime changes based on volatility shifts.
    
    Parameters
    ----------
    time_series : pd.Series
        Time series of returns
    window_size : int
        Rolling window size
    z_threshold : float
        Z-score threshold for regime change detection
        
    Returns
    -------
    pd.DataFrame
        DataFrame with regime indicators
    """
    # Calculate rolling volatility
    rolling_vol = time_series.rolling(window=window_size).std()
    
    # Calculate z-score of volatility
    vol_mean = rolling_vol.mean()
    vol_std = rolling_vol.std()
    vol_z = (rolling_vol - vol_mean) / vol_std
    
    # Identify regime changes
    high_vol = vol_z > z_threshold
    low_vol = vol_z < -z_threshold
    normal_vol = (~high_vol) & (~low_vol)
    
    # Create regime indicators
    regimes = pd.DataFrame(index=time_series.index)
    regimes['Returns'] = time_series
    regimes['Volatility'] = rolling_vol
    regimes['Volatility_Z'] = vol_z
    regimes['Regime'] = 0  # Default: normal regime
    regimes.loc[high_vol, 'Regime'] = 1  # High volatility regime
    regimes.loc[low_vol, 'Regime'] = -1  # Low volatility regime
    
    # Fill NaN values in Regime
    regimes['Regime'] = regimes['Regime'].fillna(0)
    
    # Add regime names for clarity
    regime_names = {-1: 'Low Volatility', 0: 'Normal', 1: 'High Volatility'}
    regimes['Regime_Name'] = regimes['Regime'].map(regime_names)
    
    return regimes

# Detect regime changes
regimes = detect_regime_changes(market_returns, window_size=60, z_threshold=1.5)

# Plot regimes
fig, axes = plt.subplots(2, 1, figsize=(14, 10), sharex=True)

# Plot returns with regime background
for regime in [-1, 0, 1]:
    regime_data = regimes[regimes['Regime'] == regime]
    if not regime_data.empty:
        color = 'green' if regime == -1 else 'gray' if regime == 0 else 'red'
        label = regimes.loc[regimes['Regime'] == regime, 'Regime_Name'].iloc[0] if not regime_data.empty else ''
        axes[0].scatter(regime_data.index, regime_data['Returns'], color=color, alpha=0.7, s=10, label=label)

axes[0].set_title(f'{market_ticker} Returns with Regime Classification')
axes[0].set_ylabel('Returns (%)')
axes[0].grid(True)
axes[0].legend()

# Plot volatility z-score with thresholds
axes[1].plot(regimes.index, regimes['Volatility_Z'], color='blue')
axes[1].axhline(y=1.5, color='red', linestyle='--', label='High Vol Threshold')
axes[1].axhline(y=-1.5, color='green', linestyle='--', label='Low Vol Threshold')
axes[1].set_title('Volatility Z-Score')
axes[1].set_ylabel('Z-Score')
axes[1].set_xlabel('Date')
axes[1].grid(True)
axes[1].legend()

plt.tight_layout()
plt.show()


In [None]:
# Step 3: Analyze regime statistics using bootstrap methods
# We'll use bootstrap to estimate the distribution of key statistics in each regime

# Function to bootstrap statistics for a given regime
def bootstrap_regime_statistics(returns, regime_indicator, n_bootstrap=1000, block_size=10):
    """
    Bootstrap statistics for a specific market regime.
    
    Parameters
    ----------
    returns : pd.Series
        Time series of returns
    regime_indicator : pd.Series
        Indicator series for the regime
    n_bootstrap : int
        Number of bootstrap samples
    block_size : int
        Block size for block bootstrap
        
    Returns
    -------
    dict
        Dictionary of bootstrapped statistics
    """
    # Filter returns for the specific regime
    regime_returns = returns[regime_indicator]
    
    # Skip if not enough data
    if len(regime_returns) < block_size * 2:
        return None
    
    # Convert to numpy array for bootstrap
    returns_array = regime_returns.values
    
    # Create block bootstrap
    bootstrap = BlockBootstrap(block_size=block_size)
    
    # Function to calculate statistics
    def calculate_stats(data):
        return {
            'Mean': np.mean(data),
            'Volatility': np.std(data) * np.sqrt(252),  # Annualized
            'Skewness': stats.skew(data),
            'Kurtosis': stats.kurtosis(data),
            'Sharpe': (np.mean(data) * 252) / (np.std(data) * np.sqrt(252)),
            'VaR_95': -np.percentile(data, 5),
            'VaR_99': -np.percentile(data, 1),
            'Max_Drawdown': calculate_max_drawdown(data)
        }
    
    # Function to calculate maximum drawdown
    def calculate_max_drawdown(returns):
        # Convert returns to prices (starting at 100)
        prices = 100 * np.cumprod(1 + returns / 100)
        # Calculate running maximum
        running_max = np.maximum.accumulate(prices)
        # Calculate drawdowns
        drawdowns = (prices / running_max - 1) * 100
        # Find maximum drawdown
        return np.min(drawdowns)
    
    # Generate bootstrap samples and calculate statistics
    bootstrap_stats = []
    for _ in range(n_bootstrap):
        # Generate bootstrap sample
        bootstrap_sample = bootstrap.generate(returns_array)
        # Calculate statistics
        stats_dict = calculate_stats(bootstrap_sample)
        bootstrap_stats.append(stats_dict)
    
    # Convert list of dictionaries to dictionary of lists
    result = {}
    for key in bootstrap_stats[0].keys():
        result[key] = [stats_dict[key] for stats_dict in bootstrap_stats]
    
    return result

# Bootstrap statistics for each regime
regime_bootstrap_stats = {}
for regime in [-1, 0, 1]:
    regime_indicator = regimes['Regime'] == regime
    if regime_indicator.sum() > 0:  # Only if we have data for this regime
        regime_name = regimes.loc[regimes['Regime'] == regime, 'Regime_Name'].iloc[0]
        print(f"Bootstrapping statistics for {regime_name} regime...")
        bootstrap_stats = bootstrap_regime_statistics(regimes['Returns'], regime_indicator, n_bootstrap=1000, block_size=10)
        if bootstrap_stats is not None:
            regime_bootstrap_stats[regime] = bootstrap_stats
            print(f"  - {len(bootstrap_stats['Mean'])} bootstrap samples generated")
        else:
            print(f"  - Not enough data for bootstrapping")

# Create summary statistics for each regime
regime_summary = []
for regime, bootstrap_stats in regime_bootstrap_stats.items():
    regime_name = regimes.loc[regimes['Regime'] == regime, 'Regime_Name'].iloc[0]
    
    # Calculate mean and confidence intervals for each statistic
    summary = {'Regime': regime_name}
    for stat, values in bootstrap_stats.items():
        summary[f'{stat}_Mean'] = np.mean(values)
        summary[f'{stat}_5th'] = np.percentile(values, 5)
        summary[f'{stat}_95th'] = np.percentile(values, 95)
    
    regime_summary.append(summary)

# Convert to DataFrame
regime_summary_df = pd.DataFrame(regime_summary)

# Display key statistics
key_stats = ['Mean', 'Volatility', 'Sharpe', 'VaR_95', 'Max_Drawdown']
display_cols = ['Regime'] + [f'{stat}_{suffix}' for stat in key_stats for suffix in ['Mean', '5th', '95th']]
display_df = regime_summary_df[display_cols]

print("
Regime Statistics Summary:")
display_df


In [None]:
# Visualize bootstrap distributions for key statistics
key_stats = ['Mean', 'Volatility', 'Sharpe', 'VaR_95']
fig, axes = plt.subplots(len(key_stats), 1, figsize=(14, 4 * len(key_stats)))

for i, stat in enumerate(key_stats):
    for regime, bootstrap_stats in regime_bootstrap_stats.items():
        regime_name = regimes.loc[regimes['Regime'] == regime, 'Regime_Name'].iloc[0]
        color = 'green' if regime == -1 else 'gray' if regime == 0 else 'red'
        
        # Plot kernel density estimate
        sns.kdeplot(bootstrap_stats[stat], ax=axes[i], color=color, label=regime_name)
    
    axes[i].set_title(f'Bootstrap Distribution of {stat}')
    axes[i].grid(True)
    axes[i].legend()

plt.tight_layout()
plt.show()

# Create a comparative bar chart for key statistics
fig, axes = plt.subplots(len(key_stats), 1, figsize=(14, 4 * len(key_stats)))

for i, stat in enumerate(key_stats):
    # Extract data for this statistic
    regimes = []
    means = []
    lower_bounds = []
    upper_bounds = []
    
    for _, row in regime_summary_df.iterrows():
        regimes.append(row['Regime'])
        means.append(row[f'{stat}_Mean'])
        lower_bounds.append(row[f'{stat}_5th'])
        upper_bounds.append(row[f'{stat}_95th'])
    
    # Calculate error bars
    yerr = np.array([np.array(means) - np.array(lower_bounds), np.array(upper_bounds) - np.array(means)])
    
    # Create bar chart
    bars = axes[i].bar(regimes, means, yerr=yerr, capsize=10)
    
    # Color bars based on regime
    for j, bar in enumerate(bars):
        regime_name = regimes[j]
        if 'Low' in regime_name:
            bar.set_color('green')
        elif 'Normal' in regime_name:
            bar.set_color('gray')
        else:  # High volatility
            bar.set_color('red')
    
    axes[i].set_title(f'{stat} by Market Regime (with 90% Confidence Interval)')
    axes[i].grid(True, axis='y')
    
    # Add value labels
    for j, v in enumerate(means):
        axes[i].text(j, v + 0.1, f'{v:.2f}', ha='center')

plt.tight_layout()
plt.show()


In [None]:
# Step 4: Analyze regime transitions using Markov Chain analysis
# We'll estimate the transition probabilities between regimes

def calculate_transition_matrix(regime_series):
    """
    Calculate transition matrix for regime changes.
    
    Parameters
    ----------
    regime_series : pd.Series
        Series of regime indicators
        
    Returns
    -------
    pd.DataFrame
        Transition probability matrix
    """
    # Get unique regimes
    regimes = sorted(regime_series.unique())
    n_regimes = len(regimes)
    
    # Initialize transition count matrix
    transition_counts = np.zeros((n_regimes, n_regimes))
    
    # Count transitions
    for i in range(len(regime_series) - 1):
        from_regime = regime_series.iloc[i]
        to_regime = regime_series.iloc[i + 1]
        
        # Get indices in the transition matrix
        from_idx = regimes.index(from_regime)
        to_idx = regimes.index(to_regime)
        
        # Increment count
        transition_counts[from_idx, to_idx] += 1
    
    # Calculate transition probabilities
    transition_probs = np.zeros_like(transition_counts)
    for i in range(n_regimes):
        row_sum = transition_counts[i].sum()
        if row_sum > 0:
            transition_probs[i] = transition_counts[i] / row_sum
    
    # Create DataFrame
    regime_names = [regimes.loc[regimes['Regime'] == r, 'Regime_Name'].iloc[0] for r in regimes]
    transition_df = pd.DataFrame(transition_probs, index=regime_names, columns=regime_names)
    
    return transition_df

# Calculate transition matrix
transition_matrix = calculate_transition_matrix(regimes['Regime'])

print("Regime Transition Probability Matrix:")
transition_matrix


In [None]:
# Visualize transition matrix
plt.figure(figsize=(10, 8))
sns.heatmap(transition_matrix, annot=True, cmap='coolwarm', vmin=0, vmax=1, 
            square=True, linewidths=0.5, cbar_kws={"shrink": 0.8})
plt.title('Regime Transition Probability Matrix')
plt.tight_layout()
plt.show()

# Calculate regime duration statistics
def calculate_regime_durations(regime_series):
    """
    Calculate statistics on regime durations.
    
    Parameters
    ----------
    regime_series : pd.Series
        Series of regime indicators
        
    Returns
    -------
    pd.DataFrame
        DataFrame with regime duration statistics
    """
    # Initialize variables
    current_regime = regime_series.iloc[0]
    current_duration = 1
    durations = {}
    
    # Iterate through the series
    for i in range(1, len(regime_series)):
        if regime_series.iloc[i] == current_regime:
            current_duration += 1
        else:
            # Record duration for the previous regime
            if current_regime not in durations:
                durations[current_regime] = []
            durations[current_regime].append(current_duration)
            
            # Reset for new regime
            current_regime = regime_series.iloc[i]
            current_duration = 1
    
    # Record the last regime duration
    if current_regime not in durations:
        durations[current_regime] = []
    durations[current_regime].append(current_duration)
    
    # Calculate statistics
    stats = []
    for regime, duration_list in durations.items():
        regime_name = regimes.loc[regimes['Regime'] == regime, 'Regime_Name'].iloc[0]
        stats.append({
            'Regime': regime_name,
            'Count': len(duration_list),
            'Min Duration': min(duration_list),
            'Max Duration': max(duration_list),
            'Mean Duration': np.mean(duration_list),
            'Median Duration': np.median(duration_list),
            'Total Days': sum(duration_list),
            'Percentage': sum(duration_list) / len(regime_series) * 100
        })
    
    return pd.DataFrame(stats)

# Calculate regime durations
duration_stats = calculate_regime_durations(regimes['Regime'])

print("
Regime Duration Statistics:")
duration_stats


In [None]:
# Visualize regime durations
plt.figure(figsize=(14, 7))

# Create bar chart for mean durations
bars = plt.bar(duration_stats['Regime'], duration_stats['Mean Duration'])

# Color bars based on regime
for i, bar in enumerate(bars):
    regime_name = duration_stats.iloc[i]['Regime']
    if 'Low' in regime_name:
        bar.set_color('green')
    elif 'Normal' in regime_name:
        bar.set_color('gray')
    else:  # High volatility
        bar.set_color('red')

plt.title('Mean Duration of Market Regimes (Trading Days)')
plt.ylabel('Mean Duration (Days)')
plt.grid(axis='y')

# Add value labels
for i, v in enumerate(duration_stats['Mean Duration']):
    plt.text(i, v + 0.5, f'{v:.1f}', ha='center')

# Add percentage labels
for i, v in enumerate(duration_stats['Percentage']):
    plt.text(i, 5, f'{v:.1f}% of time', ha='center', color='white', fontweight='bold')

plt.tight_layout()
plt.show()

# Create a pie chart for regime distribution
plt.figure(figsize=(10, 8))
colors = ['green', 'gray', 'red']
plt.pie(duration_stats['Percentage'], labels=duration_stats['Regime'], autopct='%1.1f%%', 
        colors=colors, startangle=90)
plt.title('Distribution of Market Regimes')
plt.axis('equal')  # Equal aspect ratio ensures that pie is drawn as a circle
plt.tight_layout()
plt.show()


In [None]:
# Step 5: Predict future regime probabilities using the Markov model
# We'll use the transition matrix to forecast regime probabilities

def forecast_regime_probabilities(transition_matrix, current_regime, n_periods=30):
    """
    Forecast future regime probabilities using a Markov model.
    
    Parameters
    ----------
    transition_matrix : pd.DataFrame
        Transition probability matrix
    current_regime : str
        Current regime name
    n_periods : int
        Number of periods to forecast
        
    Returns
    -------
    pd.DataFrame
        DataFrame with forecasted probabilities
    """
    # Get regime names
    regimes = transition_matrix.index.tolist()
    n_regimes = len(regimes)
    
    # Initialize probability vector
    # Start with 100% probability in the current regime
    current_idx = regimes.index(current_regime)
    prob_vector = np.zeros(n_regimes)
    prob_vector[current_idx] = 1.0
    
    # Convert transition matrix to numpy array
    trans_matrix = transition_matrix.values
    
    # Initialize results
    forecasts = []
    forecasts.append({
        'Period': 0,
        **{regime: prob for regime, prob in zip(regimes, prob_vector)}
    })
    
    # Forecast future probabilities
    for t in range(1, n_periods + 1):
        # Update probability vector
        prob_vector = prob_vector @ trans_matrix
        
        # Store results
        forecasts.append({
            'Period': t,
            **{regime: prob for regime, prob in zip(regimes, prob_vector)}
        })
    
    return pd.DataFrame(forecasts)

# Get current regime
current_regime = regimes.loc[regimes.index[-1], 'Regime_Name']
print(f"Current regime: {current_regime}")

# Forecast regime probabilities
forecast_horizon = 60  # 60 trading days (approximately 3 months)
regime_forecasts = forecast_regime_probabilities(transition_matrix, current_regime, forecast_horizon)

# Display forecast for selected periods
selected_periods = [0, 1, 5, 10, 20, 30, 60]
print("
Forecasted Regime Probabilities:")
regime_forecasts[regime_forecasts['Period'].isin(selected_periods)]


In [None]:
# Visualize regime probability forecasts
plt.figure(figsize=(14, 7))

# Plot probability evolution for each regime
for regime in transition_matrix.index:
    color = 'green' if 'Low' in regime else 'gray' if 'Normal' in regime else 'red'
    plt.plot(regime_forecasts['Period'], regime_forecasts[regime], 
             label=regime, color=color, linewidth=2)

plt.title('Forecasted Regime Probabilities')
plt.xlabel('Trading Days Ahead')
plt.ylabel('Probability')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Create a stacked area chart for regime probabilities
plt.figure(figsize=(14, 7))

# Get regime names in the right order (low, normal, high)
regime_order = sorted(transition_matrix.index, key=lambda x: 'Low' in x and 0 or 'Normal' in x and 1 or 2)

# Create stacked area chart
plt.stackplot(regime_forecasts['Period'], 
              [regime_forecasts[regime] for regime in regime_order],
              labels=regime_order,
              colors=['green', 'gray', 'red'],
              alpha=0.8)

plt.title('Regime Probability Evolution')
plt.xlabel('Trading Days Ahead')
plt.ylabel('Probability')
plt.grid(True)
plt.legend(loc='upper right')
plt.tight_layout()
plt.show()


In [None]:
# Step 6: Calculate expected portfolio performance based on regime forecasts
# We'll combine regime probabilities with regime-specific performance metrics

def calculate_expected_performance(regime_forecasts, regime_stats):
    """
    Calculate expected portfolio performance based on regime forecasts.
    
    Parameters
    ----------
    regime_forecasts : pd.DataFrame
        Forecasted regime probabilities
    regime_stats : pd.DataFrame
        Regime-specific performance statistics
        
    Returns
    -------
    pd.DataFrame
        DataFrame with expected performance metrics
    """
    # Initialize results
    expected_performance = pd.DataFrame()
    expected_performance['Period'] = regime_forecasts['Period']
    
    # Calculate expected values for each statistic
    for stat in ['Mean', 'Volatility', 'Sharpe', 'VaR_95']:
        expected_performance[f'Expected_{stat}'] = 0.0
        
        # Sum weighted statistics across regimes
        for _, row in regime_stats.iterrows():
            regime = row['Regime']
            stat_value = row[f'{stat}_Mean']
            expected_performance[f'Expected_{stat}'] += regime_forecasts[regime] * stat_value
    
    return expected_performance

# Calculate expected performance
expected_performance = calculate_expected_performance(regime_forecasts, regime_summary_df)

# Display expected performance for selected periods
selected_periods = [0, 1, 5, 10, 20, 30, 60]
print("Expected Performance Metrics:")
expected_performance[expected_performance['Period'].isin(selected_periods)]


In [None]:
# Visualize expected performance metrics
fig, axes = plt.subplots(2, 2, figsize=(14, 10), sharex=True)
axes = axes.flatten()

# Plot expected metrics
metrics = ['Expected_Mean', 'Expected_Volatility', 'Expected_Sharpe', 'Expected_VaR_95']
titles = ['Expected Daily Return (%)', 'Expected Volatility (Annualized %)', 
          'Expected Sharpe Ratio', 'Expected 95% VaR (%)']

for i, (metric, title) in enumerate(zip(metrics, titles)):
    axes[i].plot(expected_performance['Period'], expected_performance[metric], color='blue', linewidth=2)
    axes[i].set_title(title)
    axes[i].grid(True)
    
    # Add horizontal line at current value
    current_value = expected_performance.loc[0, metric]
    axes[i].axhline(y=current_value, color='red', linestyle='--', 
                   label=f'Current: {current_value:.4f}')
    axes[i].legend()

# Set common x-axis label
for ax in axes[2:]:  # Bottom row
    ax.set_xlabel('Trading Days Ahead')

plt.tight_layout()
plt.show()


In [None]:
# Step 7: Simulate future paths based on regime forecasts
# We'll generate Monte Carlo simulations that incorporate regime dynamics

def simulate_regime_paths(transition_matrix, regime_stats, current_regime, n_days=252, n_paths=1000):
    """
    Simulate future paths incorporating regime dynamics.
    
    Parameters
    ----------
    transition_matrix : pd.DataFrame
        Transition probability matrix
    regime_stats : pd.DataFrame
        Regime-specific performance statistics
    current_regime : str
        Current regime name
    n_days : int
        Number of days to simulate
    n_paths : int
        Number of simulation paths
        
    Returns
    -------
    tuple
        (simulated_returns, simulated_regimes, simulated_prices)
    """
    # Get regime names and indices
    regimes = transition_matrix.index.tolist()
    current_idx = regimes.index(current_regime)
    
    # Convert transition matrix to numpy array
    trans_matrix = transition_matrix.values
    
    # Create dictionaries for regime parameters
    regime_params = {}
    for _, row in regime_stats.iterrows():
        regime = row['Regime']
        regime_params[regime] = {
            'mean': row['Mean_Mean'],
            'vol': row['Volatility_Mean'] / np.sqrt(252),  # Convert to daily
            'skew': row['Skewness_Mean'] if 'Skewness_Mean' in row else 0,
            'kurt': row['Kurtosis_Mean'] if 'Kurtosis_Mean' in row else 3
        }
    
    # Initialize arrays for simulations
    simulated_returns = np.zeros((n_days, n_paths))
    simulated_regimes = np.zeros((n_days, n_paths), dtype=int)
    simulated_prices = np.zeros((n_days + 1, n_paths))
    simulated_prices[0] = 100  # Start at 100
    
    # Simulate paths
    for path in range(n_paths):
        # Start in current regime
        regime_idx = current_idx
        current_regime_name = regimes[regime_idx]
        
        for day in range(n_days):
            # Record current regime
            simulated_regimes[day, path] = regime_idx
            
            # Get regime parameters
            params = regime_params[current_regime_name]
            
            # Generate return for this day
            # For simplicity, we'll use normal distribution
            # In practice, you might want to use a more sophisticated distribution
            daily_return = np.random.normal(params['mean'], params['vol'])
            simulated_returns[day, path] = daily_return
            
            # Update price
            simulated_prices[day + 1, path] = simulated_prices[day, path] * (1 + daily_return / 100)
            
            # Determine next regime
            regime_probs = trans_matrix[regime_idx]
            next_regime_idx = np.random.choice(len(regimes), p=regime_probs)
            regime_idx = next_regime_idx
            current_regime_name = regimes[regime_idx]
    
    return simulated_returns, simulated_regimes, simulated_prices

# Simulate future paths
n_days = 252  # 1 year
n_paths = 1000
print(f"Simulating {n_paths} paths for {n_days} trading days...")
sim_returns, sim_regimes, sim_prices = simulate_regime_paths(
    transition_matrix, regime_summary_df, current_regime, n_days, n_paths)
print("Simulation complete.")


In [None]:
# Visualize simulation results
# Plot a subset of price paths
plt.figure(figsize=(14, 7))

# Create time index
time_index = np.arange(n_days + 1)

# Plot a subset of paths
n_plot_paths = 50
plot_indices = np.random.choice(n_paths, n_plot_paths, replace=False)
for idx in plot_indices:
    plt.plot(time_index, sim_prices[:, idx], color='blue', alpha=0.1)

# Plot median path
median_path = np.median(sim_prices, axis=1)
plt.plot(time_index, median_path, color='red', linewidth=2, label='Median Path')

# Plot confidence intervals
lower_5 = np.percentile(sim_prices, 5, axis=1)
upper_95 = np.percentile(sim_prices, 95, axis=1)
plt.fill_between(time_index, lower_5, upper_95, color='red', alpha=0.2, label='90% Confidence Interval')

plt.title('Simulated Price Paths with Regime Dynamics')
plt.xlabel('Trading Days')
plt.ylabel('Price (Base = 100)')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Calculate terminal price distribution
terminal_prices = sim_prices[-1]

# Plot terminal price distribution
plt.figure(figsize=(14, 7))
sns.histplot(terminal_prices, kde=True, bins=50)
plt.axvline(x=100, color='red', linestyle='--', label='Initial Price')
plt.axvline(x=np.median(terminal_prices), color='green', linestyle='--', 
            label=f'Median: {np.median(terminal_prices):.2f}')
plt.title('Terminal Price Distribution after 1 Year')
plt.xlabel('Price (Base = 100)')
plt.ylabel('Frequency')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Calculate key statistics
terminal_returns = (terminal_prices / 100 - 1) * 100  # Convert to percentage
print("Terminal Return Statistics:")
print(f"Median Return: {np.median(terminal_returns):.2f}%")
print(f"Mean Return: {np.mean(terminal_returns):.2f}%")
print(f"5th Percentile: {np.percentile(terminal_returns, 5):.2f}%")
print(f"95th Percentile: {np.percentile(terminal_returns, 95):.2f}%")
print(f"Probability of Positive Return: {(terminal_returns > 0).mean() * 100:.2f}%")


## Case Study 3: Volatility Forecasting for Options Trading

In this case study, we'll demonstrate how to combine GARCH models with realized volatility measures to improve volatility forecasting for options trading. This approach is particularly useful for options pricing and trading strategies that depend on accurate volatility forecasts.

In [None]:
# For this case study, we'll use a single asset (SPY) with both daily and intraday data
# We'll simulate intraday data since we don't have actual high-frequency data

# Function to simulate intraday data
def simulate_intraday_data(daily_prices, n_intraday=78):
    """
    Simulate intraday price data based on daily prices.
    
    Parameters
    ----------
    daily_prices : pd.Series
        Daily price series
    n_intraday : int
        Number of intraday observations per day (e.g., 78 for 5-minute data in 6.5-hour trading day)
    
    Returns
    -------
    pd.DataFrame
        DataFrame with simulated intraday prices
    """
    # Get daily returns
    daily_returns = daily_prices.pct_change().dropna()
    
    # Calculate daily volatility
    daily_vol = daily_returns.std()
    
    # Calculate intraday volatility (assuming square-root-of-time rule)
    intraday_vol = daily_vol / np.sqrt(n_intraday)
    
    # Initialize DataFrame for intraday data
    intraday_data = []
    
    # Generate intraday data for each day
    for day, (date, daily_price) in enumerate(daily_prices.items()):
        # Skip the first day (no return available)
        if day == 0:
            continue
            
        # Get daily return for this day
        daily_ret = daily_returns.iloc[day-1]
        
        # Calculate open price (previous day's close)
        open_price = daily_prices.iloc[day-1]
        
        # Calculate close price (this day's close)
        close_price = daily_price
        
        # Generate intraday returns with the same total return
        # We'll use a random walk with drift
        intraday_drift = daily_ret / n_intraday
        intraday_returns = np.random.normal(intraday_drift, intraday_vol, n_intraday)
        
        # Adjust to match daily return
        actual_return = (1 + intraday_returns).prod() - 1
        adjustment = daily_ret / actual_return
        intraday_returns = intraday_returns * adjustment
        
        # Generate intraday prices
        intraday_prices = open_price * np.cumprod(1 + intraday_returns)
        
        # Ensure the last price matches the close price
        intraday_prices[-1] = close_price
        
        # Create timestamps for this day
        # Assuming 9:30 AM to 4:00 PM trading hours
        start_time = pd.Timestamp(date.date()).replace(hour=9, minute=30)
        end_time = pd.Timestamp(date.date()).replace(hour=16, minute=0)
        timestamps = pd.date_range(start=start_time, end=end_time, periods=n_intraday)
        
        # Add to intraday data
        for i, (ts, price) in enumerate(zip(timestamps, intraday_prices)):
            intraday_data.append({
                'Timestamp': ts,
                'Price': price,
                'Date': date.date()
            })
    
    # Convert to DataFrame
    intraday_df = pd.DataFrame(intraday_data)
    intraday_df.set_index('Timestamp', inplace=True)
    
    return intraday_df

# Simulate intraday data
print("Simulating intraday data...")
intraday_data = simulate_intraday_data(market_prices, n_intraday=78)  # 5-minute data
print(f"Generated {len(intraday_data)} intraday observations")

# Display sample of intraday data
print("
Sample of intraday data:")
intraday_data.head(10)


In [None]:
# Plot a sample day of intraday data
# Select a random day
sample_date = intraday_data['Date'].unique()[100]  # Some day in the middle
sample_day = intraday_data[intraday_data['Date'] == sample_date]

plt.figure(figsize=(14, 7))
plt.plot(sample_day.index, sample_day['Price'])
plt.title(f'Simulated Intraday Prices for {sample_date}')
plt.xlabel('Time')
plt.ylabel('Price')
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Step 1: Calculate realized volatility measures
# We'll calculate daily realized volatility using intraday returns

def calculate_realized_volatility(intraday_data):
    """
    Calculate daily realized volatility measures from intraday data.
    
    Parameters
    ----------
    intraday_data : pd.DataFrame
        DataFrame with intraday prices
        
    Returns
    -------
    pd.DataFrame
        DataFrame with daily realized volatility measures
    """
    # Calculate intraday returns
    intraday_data['Return'] = intraday_data['Price'].pct_change()
    
    # Group by date
    daily_groups = intraday_data.groupby('Date')
    
    # Initialize results
    realized_vol = []
    
    # Calculate realized volatility for each day
    for date, group in daily_groups:
        # Skip days with insufficient data
        if len(group) < 10:
            continue
            
        # Get returns for this day
        returns = group['Return'].dropna().values
        
        # Calculate realized variance (sum of squared returns)
        rv = np.sum(returns**2) * 100**2  # Convert to percentage squared
        
        # Calculate bipower variation (robust to jumps)
        # BPV = (π/2) * sum(|r_i| * |r_{i-1}|)
        abs_returns = np.abs(returns)
        bpv = (np.pi/2) * np.sum(abs_returns[1:] * abs_returns[:-1]) * 100**2
        
        # Calculate jump component
        jump = max(0, rv - bpv)
        
        # Store results
        realized_vol.append({
            'Date': date,
            'RV': rv,  # Realized variance
            'RV_Annualized': rv * 252,  # Annualized
            'RV_Sqrt': np.sqrt(rv),  # Realized volatility
            'RV_Sqrt_Annualized': np.sqrt(rv) * np.sqrt(252),  # Annualized
            'BPV': bpv,  # Bipower variation
            'BPV_Sqrt': np.sqrt(bpv),  # Square root of bipower variation
            'Jump': jump,  # Jump component
            'Jump_Ratio': jump / rv if rv > 0 else 0  # Jump ratio
        })
    
    # Convert to DataFrame
    rv_df = pd.DataFrame(realized_vol)
    rv_df.set_index('Date', inplace=True)
    
    return rv_df

# Calculate realized volatility
realized_vol_df = calculate_realized_volatility(intraday_data)

# Display summary statistics
print("Realized Volatility Summary Statistics:")
realized_vol_df.describe()


In [None]:
# Plot realized volatility measures
fig, axes = plt.subplots(3, 1, figsize=(14, 12), sharex=True)

# Plot realized volatility (annualized)
axes[0].plot(realized_vol_df.index, realized_vol_df['RV_Sqrt_Annualized'], color='blue')
axes[0].set_title('Realized Volatility (Annualized %)')
axes[0].set_ylabel('Volatility (%)')
axes[0].grid(True)

# Plot bipower variation (annualized)
axes[1].plot(realized_vol_df.index, realized_vol_df['BPV_Sqrt'] * np.sqrt(252), color='green')
axes[1].set_title('Bipower Variation (Annualized %)')
axes[1].set_ylabel('Volatility (%)')
axes[1].grid(True)

# Plot jump ratio
axes[2].plot(realized_vol_df.index, realized_vol_df['Jump_Ratio'], color='red')
axes[2].set_title('Jump Ratio (Jump Component / Realized Variance)')
axes[2].set_ylabel('Ratio')
axes[2].set_xlabel('Date')
axes[2].grid(True)

plt.tight_layout()
plt.show()


In [None]:
# Step 2: Estimate GARCH model using daily returns
# We'll use a GARCH(1,1) model with Student's t distribution

# Prepare daily returns
daily_returns_array = market_returns.values

# Create and estimate GARCH model
print("Estimating GARCH(1,1) model...")
garch_model = GARCH(p=1, q=1, distribution=StudentT())
garch_results = garch_model.fit(daily_returns_array)
print("GARCH estimation complete.")

# Display estimation results
print("
GARCH(1,1) Estimation Results:")
print(f"Log-Likelihood: {garch_results.log_likelihood:.4f}")
print(f"AIC: {garch_results.aic:.4f}")
print(f"BIC: {garch_results.bic:.4f}")
print("
Parameter Estimates:")
for name, value, std_err, t_stat, p_value in zip(
    garch_results.parameter_names,
    garch_results.parameters,
    garch_results.std_errors,
    garch_results.t_stats,
    garch_results.p_values
):
    print(f"{name}: {value:.6f} (SE: {std_err:.6f}, t: {t_stat:.4f}, p: {p_value:.4f})")

# Extract conditional volatility
garch_vol = np.sqrt(garch_results.conditional_variance)
garch_vol_annual = garch_vol * np.sqrt(252)  # Annualized

# Create DataFrame with GARCH volatility
garch_vol_df = pd.DataFrame({
    'GARCH_Vol': garch_vol,
    'GARCH_Vol_Annual': garch_vol_annual
}, index=market_returns.index)


In [None]:
# Step 3: Combine GARCH and realized volatility
# We'll merge the two datasets and compare the volatility estimates

# Convert realized volatility index to datetime
realized_vol_df.index = pd.to_datetime(realized_vol_df.index)

# Merge GARCH and realized volatility
# We'll use the date from realized_vol_df as the index
combined_vol = pd.merge(
    realized_vol_df,
    garch_vol_df,
    left_index=True,
    right_index=True,
    how='inner'
)

# Display the combined dataset
print(f"Combined dataset has {len(combined_vol)} observations")
combined_vol.head()


In [None]:
# Plot comparison of GARCH and realized volatility
plt.figure(figsize=(14, 7))

# Plot GARCH volatility
plt.plot(combined_vol.index, combined_vol['GARCH_Vol_Annual'], 
         color='blue', label='GARCH Volatility')

# Plot realized volatility
plt.plot(combined_vol.index, combined_vol['RV_Sqrt_Annualized'], 
         color='red', label='Realized Volatility')

# Plot bipower variation
plt.plot(combined_vol.index, combined_vol['BPV_Sqrt'] * np.sqrt(252), 
         color='green', label='Bipower Variation')

plt.title('Comparison of Volatility Measures (Annualized %)')
plt.ylabel('Volatility (%)')
plt.xlabel('Date')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# Calculate correlation between volatility measures
vol_corr = combined_vol[['RV_Sqrt_Annualized', 'BPV_Sqrt', 'GARCH_Vol_Annual']].corr()
vol_corr.columns = ['Realized Vol', 'Bipower Var', 'GARCH Vol']
vol_corr.index = ['Realized Vol', 'Bipower Var', 'GARCH Vol']

print("
Correlation between Volatility Measures:")
vol_corr


In [None]:
# Step 4: Develop a hybrid volatility model
# We'll create a model that combines GARCH and realized volatility

from sklearn.linear_model import LinearRegression

# Prepare data for regression
# We'll use lagged values of GARCH and realized volatility to predict future realized volatility
combined_vol['RV_Sqrt_Lag1'] = combined_vol['RV_Sqrt'].shift(1)
combined_vol['BPV_Sqrt_Lag1'] = combined_vol['BPV_Sqrt'].shift(1)
combined_vol['GARCH_Vol_Lag1'] = combined_vol['GARCH_Vol'].shift(1)

# Drop missing values
regression_data = combined_vol.dropna()

# Define features and target
X = regression_data[['RV_Sqrt_Lag1', 'BPV_Sqrt_Lag1', 'GARCH_Vol_Lag1']]
y = regression_data['RV_Sqrt']

# Split data into training and testing sets
train_size = int(len(regression_data) * 0.8)
X_train, X_test = X.iloc[:train_size], X.iloc[train_size:]
y_train, y_test = y.iloc[:train_size], y.iloc[train_size:]

# Fit linear regression model
model = LinearRegression()
model.fit(X_train, y_train)

# Make predictions
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)

# Calculate R-squared
r2_train = model.score(X_train, y_train)
r2_test = model.score(X_test, y_test)

print("Hybrid Volatility Model Results:")
print(f"R-squared (Training): {r2_train:.4f}")
print(f"R-squared (Testing): {r2_test:.4f}")
print("
Coefficients:")
for feature, coef in zip(X.columns, model.coef_):
    print(f"{feature}: {coef:.6f}")
print(f"Intercept: {model.intercept_:.6f}")


In [None]:
# Add predictions to the dataset
regression_data['Hybrid_Vol'] = model.predict(X)
regression_data['Hybrid_Vol_Annual'] = regression_data['Hybrid_Vol'] * np.sqrt(252)

# Plot actual vs. predicted volatility
plt.figure(figsize=(14, 7))

# Plot actual realized volatility
plt.plot(regression_data.index, regression_data['RV_Sqrt'] * np.sqrt(252), 
         color='blue', label='Actual Realized Volatility')

# Plot predicted volatility
plt.plot(regression_data.index, regression_data['Hybrid_Vol_Annual'], 
         color='red', label='Hybrid Model Prediction')

# Add vertical line to separate training and testing sets
split_date = regression_data.index[train_size]
plt.axvline(x=split_date, color='black', linestyle='--', label='Train/Test Split')

plt.title('Actual vs. Predicted Volatility (Annualized %)')
plt.ylabel('Volatility (%)')
plt.xlabel('Date')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Step 5: Evaluate forecasting performance
# We'll compare the forecasting performance of different volatility models

# Calculate forecast errors
regression_data['GARCH_Error'] = regression_data['RV_Sqrt'] - regression_data['GARCH_Vol_Lag1']
regression_data['RV_Error'] = regression_data['RV_Sqrt'] - regression_data['RV_Sqrt_Lag1']
regression_data['BPV_Error'] = regression_data['RV_Sqrt'] - regression_data['BPV_Sqrt_Lag1']
regression_data['Hybrid_Error'] = regression_data['RV_Sqrt'] - regression_data['Hybrid_Vol']

# Calculate error metrics
def calculate_error_metrics(errors):
    """
    Calculate error metrics for volatility forecasts.
    
    Parameters
    ----------
    errors : pd.Series
        Series of forecast errors
    
    Returns
    -------
    dict
        Dictionary of error metrics
    """
    return {
        'MAE': np.mean(np.abs(errors)),
        'RMSE': np.sqrt(np.mean(errors**2)),
        'MAPE': np.mean(np.abs(errors / regression_data['RV_Sqrt'])) * 100
    }

# Calculate error metrics for each model
error_metrics = {}
for model_name in ['GARCH', 'RV', 'BPV', 'Hybrid']:
    # Calculate metrics for training set
    train_errors = regression_data[f'{model_name}_Error'].iloc[:train_size]
    error_metrics[f'{model_name}_Train'] = calculate_error_metrics(train_errors)
    
    # Calculate metrics for testing set
    test_errors = regression_data[f'{model_name}_Error'].iloc[train_size:]
    error_metrics[f'{model_name}_Test'] = calculate_error_metrics(test_errors)

# Create summary DataFrame
error_summary = []
for model_name in ['GARCH', 'RV', 'BPV', 'Hybrid']:
    for dataset in ['Train', 'Test']:
        metrics = error_metrics[f'{model_name}_{dataset}']
        error_summary.append({
            'Model': model_name,
            'Dataset': dataset,
            'MAE': metrics['MAE'],
            'RMSE': metrics['RMSE'],
            'MAPE': metrics['MAPE']
        })

error_summary_df = pd.DataFrame(error_summary)

print("Forecast Error Metrics:")
error_summary_df


In [None]:
# Visualize error metrics
fig, axes = plt.subplots(3, 1, figsize=(14, 12))

# Filter for test set only
test_errors = error_summary_df[error_summary_df['Dataset'] == 'Test']

# Plot MAE
axes[0].bar(test_errors['Model'], test_errors['MAE'])
axes[0].set_title('Mean Absolute Error (MAE)')
axes[0].set_ylabel('MAE')
axes[0].grid(axis='y')

# Plot RMSE
axes[1].bar(test_errors['Model'], test_errors['RMSE'])
axes[1].set_title('Root Mean Squared Error (RMSE)')
axes[1].set_ylabel('RMSE')
axes[1].grid(axis='y')

# Plot MAPE
axes[2].bar(test_errors['Model'], test_errors['MAPE'])
axes[2].set_title('Mean Absolute Percentage Error (MAPE)')
axes[2].set_ylabel('MAPE (%)')
axes[2].grid(axis='y')

plt.tight_layout()
plt.show()


In [None]:
# Step 6: Apply the hybrid model to options pricing
# We'll use the Black-Scholes formula to price options using different volatility estimates

from scipy.stats import norm

def black_scholes(S, K, T, r, sigma, option_type='call'):
    """
    Calculate Black-Scholes option price.
    
    Parameters
    ----------
    S : float
        Current stock price
    K : float
        Strike price
    T : float
        Time to maturity (in years)
    r : float
        Risk-free interest rate (annual)
    sigma : float
        Volatility (annual)
    option_type : str
        'call' or 'put'
    
    Returns
    -------
    float
        Option price
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    
    if option_type == 'call':
        price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    else:  # put
        price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    
    return price

# Calculate option prices using different volatility estimates
# We'll use the last available data point
last_data = regression_data.iloc[-1]

# Current price (assume it's the last available price)
S = market_prices.iloc[-1]

# Define option parameters
K_values = [0.9 * S, 0.95 * S, S, 1.05 * S, 1.1 * S]  # Strike prices
T_values = [1/12, 1/6, 1/4, 1/2, 1]  # Time to maturity (in years)
r = 0.03  # Risk-free rate (3%)

# Get volatility estimates (convert to decimal)
garch_vol = last_data['GARCH_Vol_Annual'] / 100
rv_vol = last_data['RV_Sqrt_Annualized'] / 100
hybrid_vol = last_data['Hybrid_Vol_Annual'] / 100

# Calculate option prices
option_prices = []
for K in K_values:
    for T in T_values:
        for option_type in ['call', 'put']:
            # Calculate prices using different volatility estimates
            garch_price = black_scholes(S, K, T, r, garch_vol, option_type)
            rv_price = black_scholes(S, K, T, r, rv_vol, option_type)
            hybrid_price = black_scholes(S, K, T, r, hybrid_vol, option_type)
            
            # Store results
            option_prices.append({
                'Strike': K,
                'Moneyness': K / S,
                'Maturity': T,
                'Type': option_type.capitalize(),
                'GARCH_Price': garch_price,
                'RV_Price': rv_price,
                'Hybrid_Price': hybrid_price,
                'GARCH_RV_Diff': garch_price - rv_price,
                'Hybrid_RV_Diff': hybrid_price - rv_price
            })

# Convert to DataFrame
option_prices_df = pd.DataFrame(option_prices)

# Display option prices
print("Option Prices Using Different Volatility Estimates:")
print(f"Current Price: ${S:.2f}")
print(f"GARCH Volatility: {garch_vol*100:.2f}%")
print(f"Realized Volatility: {rv_vol*100:.2f}%")
print(f"Hybrid Volatility: {hybrid_vol*100:.2f}%")
print("
Sample of Option Prices:")
option_prices_df.head(10)


In [None]:
# Visualize option prices
# We'll focus on at-the-money options with different maturities
atm_options = option_prices_df[option_prices_df['Moneyness'].between(0.99, 1.01)]

# Plot option prices by maturity
fig, axes = plt.subplots(2, 1, figsize=(14, 10))

# Filter for calls and puts
calls = atm_options[atm_options['Type'] == 'Call']
puts = atm_options[atm_options['Type'] == 'Put']

# Plot call prices
axes[0].plot(calls['Maturity'], calls['GARCH_Price'], 'o-', label='GARCH')
axes[0].plot(calls['Maturity'], calls['RV_Price'], 's-', label='Realized Volatility')
axes[0].plot(calls['Maturity'], calls['Hybrid_Price'], '^-', label='Hybrid Model')
axes[0].set_title('At-the-Money Call Option Prices by Maturity')
axes[0].set_xlabel('Time to Maturity (Years)')
axes[0].set_ylabel('Option Price ($)')
axes[0].grid(True)
axes[0].legend()

# Plot put prices
axes[1].plot(puts['Maturity'], puts['GARCH_Price'], 'o-', label='GARCH')
axes[1].plot(puts['Maturity'], puts['RV_Price'], 's-', label='Realized Volatility')
axes[1].plot(puts['Maturity'], puts['Hybrid_Price'], '^-', label='Hybrid Model')
axes[1].set_title('At-the-Money Put Option Prices by Maturity')
axes[1].set_xlabel('Time to Maturity (Years)')
axes[1].set_ylabel('Option Price ($)')
axes[1].grid(True)
axes[1].legend()

plt.tight_layout()
plt.show()

# Plot option prices by moneyness for a specific maturity
maturity = 0.25  # 3 months
maturity_options = option_prices_df[option_prices_df['Maturity'] == maturity]

fig, axes = plt.subplots(2, 1, figsize=(14, 10))

# Filter for calls and puts
calls = maturity_options[maturity_options['Type'] == 'Call']
puts = maturity_options[maturity_options['Type'] == 'Put']

# Plot call prices
axes[0].plot(calls['Moneyness'], calls['GARCH_Price'], 'o-', label='GARCH')
axes[0].plot(calls['Moneyness'], calls['RV_Price'], 's-', label='Realized Volatility')
axes[0].plot(calls['Moneyness'], calls['Hybrid_Price'], '^-', label='Hybrid Model')
axes[0].set_title(f'Call Option Prices by Moneyness (Maturity = {maturity} years)')
axes[0].set_xlabel('Moneyness (Strike/Spot)')
axes[0].set_ylabel('Option Price ($)')
axes[0].grid(True)
axes[0].legend()

# Plot put prices
axes[1].plot(puts['Moneyness'], puts['GARCH_Price'], 'o-', label='GARCH')
axes[1].plot(puts['Moneyness'], puts['RV_Price'], 's-', label='Realized Volatility')
axes[1].plot(puts['Moneyness'], puts['Hybrid_Price'], '^-', label='Hybrid Model')
axes[1].set_title(f'Put Option Prices by Moneyness (Maturity = {maturity} years)')
axes[1].set_xlabel('Moneyness (Strike/Spot)')
axes[1].set_ylabel('Option Price ($)')
axes[1].grid(True)
axes[1].legend()

plt.tight_layout()
plt.show()


In [None]:
# Step 7: Analyze the economic value of improved volatility forecasts
# We'll simulate a simple options trading strategy based on volatility forecasts

def simulate_options_strategy(price_series, vol_forecasts, actual_vol, 
                             initial_capital=10000, trade_size=1000):
    """
    Simulate a simple options trading strategy based on volatility forecasts.
    
    Parameters
    ----------
    price_series : pd.Series
        Series of asset prices
    vol_forecasts : dict
        Dictionary of volatility forecasts from different models
    actual_vol : pd.Series
        Series of realized volatility (actual)
    initial_capital : float
        Initial capital
    trade_size : float
        Size of each trade in dollars
    
    Returns
    -------
    pd.DataFrame
        DataFrame with strategy performance
    """
    # Initialize results
    results = []
    
    # Get common dates
    common_dates = sorted(set(vol_forecasts[list(vol_forecasts.keys())[0]].index) & 
                         set(actual_vol.index))
    
    # Initialize portfolio values
    portfolio_values = {model: initial_capital for model in vol_forecasts.keys()}
    
    # Simulate trading strategy
    for i, date in enumerate(common_dates[:-1]):
        next_date = common_dates[i + 1]
        
        # Get current price
        current_price = price_series.loc[date]
        
        # Get forecasted volatilities
        forecasts = {model: vol_series.loc[date] for model, vol_series in vol_forecasts.items()}
        
        # Get actual volatility (next day)
        next_vol = actual_vol.loc[next_date]
        
        # Simulate trading for each model
        for model, forecast in forecasts.items():
            # Convert to decimal
            forecast_vol = forecast / 100
            actual_vol_val = next_vol / 100
            
            # Calculate option prices
            # We'll use at-the-money options with 1-month maturity
            K = current_price
            T = 1/12  # 1 month
            r = 0.03  # 3% risk-free rate
            
            # Calculate option prices using forecasted volatility
            call_price_forecast = black_scholes(current_price, K, T, r, forecast_vol, 'call')
            put_price_forecast = black_scholes(current_price, K, T, r, forecast_vol, 'put')
            
            # Calculate option prices using actual volatility
            call_price_actual = black_scholes(current_price, K, T, r, actual_vol_val, 'call')
            put_price_actual = black_scholes(current_price, K, T, r, actual_vol_val, 'put')
            
            # Trading strategy:
            # If forecasted vol > actual vol: options are overpriced, sell options
            # If forecasted vol < actual vol: options are underpriced, buy options
            
            # Calculate profit/loss
            if forecast_vol > actual_vol_val:  # Sell options
                # Sell straddle (both call and put)
                premium_received = call_price_forecast + put_price_forecast
                cost_to_close = call_price_actual + put_price_actual
                pnl = (premium_received - cost_to_close) * (trade_size / premium_received)
                trade_type = 'Sell Straddle'
            else:  # Buy options
                # Buy straddle (both call and put)
                premium_paid = call_price_forecast + put_price_forecast
                value_at_close = call_price_actual + put_price_actual
                pnl = (value_at_close - premium_paid) * (trade_size / premium_paid)
                trade_type = 'Buy Straddle'
            
            # Update portfolio value
            portfolio_values[model] += pnl
            
            # Store results
            results.append({
                'Date': next_date,
                'Model': model,
                'Forecast_Vol': forecast_vol * 100,
                'Actual_Vol': actual_vol_val * 100,
                'Trade_Type': trade_type,
                'PnL': pnl,
                'Portfolio_Value': portfolio_values[model]
            })
    
    return pd.DataFrame(results)

# Prepare volatility forecasts
vol_forecasts = {
    'GARCH': garch_vol_df['GARCH_Vol_Annual'],
    'RV': realized_vol_df['RV_Sqrt_Annualized'],
    'Hybrid': regression_data['Hybrid_Vol_Annual']
}

# Actual volatility (next day's realized volatility)
actual_vol = realized_vol_df['RV_Sqrt_Annualized']

# Simulate options trading strategy
strategy_results = simulate_options_strategy(
    market_prices, vol_forecasts, actual_vol, initial_capital=10000, trade_size=1000)

# Display strategy results
print("Options Trading Strategy Results:")
strategy_results.head(10)


In [None]:
# Analyze strategy performance
# Calculate performance metrics for each model
performance_metrics = []
for model in vol_forecasts.keys():
    model_results = strategy_results[strategy_results['Model'] == model]
    
    # Calculate metrics
    total_pnl = model_results['PnL'].sum()
    final_value = model_results['Portfolio_Value'].iloc[-1]
    total_return = (final_value / 10000 - 1) * 100
    win_rate = (model_results['PnL'] > 0).mean() * 100
    avg_win = model_results.loc[model_results['PnL'] > 0, 'PnL'].mean()
    avg_loss = model_results.loc[model_results['PnL'] < 0, 'PnL'].mean()
    profit_factor = abs(model_results.loc[model_results['PnL'] > 0, 'PnL'].sum() / 
                       model_results.loc[model_results['PnL'] < 0, 'PnL'].sum())
    
    # Calculate Sharpe ratio (assuming 252 trading days per year)
    daily_returns = model_results['PnL'] / 10000  # Approximate daily returns
    sharpe = (daily_returns.mean() * 252) / (daily_returns.std() * np.sqrt(252))
    
    # Store metrics
    performance_metrics.append({
        'Model': model,
        'Total_PnL': total_pnl,
        'Final_Value': final_value,
        'Total_Return': total_return,
        'Win_Rate': win_rate,
        'Avg_Win': avg_win,
        'Avg_Loss': avg_loss,
        'Profit_Factor': profit_factor,
        'Sharpe_Ratio': sharpe
    })

# Convert to DataFrame
performance_df = pd.DataFrame(performance_metrics)

print("Strategy Performance Metrics:")
performance_df


In [None]:
# Plot portfolio equity curves
plt.figure(figsize=(14, 7))

# Plot equity curves for each model
for model in vol_forecasts.keys():
    model_results = strategy_results[strategy_results['Model'] == model]
    plt.plot(model_results['Date'], model_results['Portfolio_Value'], label=model)

plt.title('Options Trading Strategy Performance')
plt.xlabel('Date')
plt.ylabel('Portfolio Value ($)')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Plot key performance metrics
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Total return
axes[0, 0].bar(performance_df['Model'], performance_df['Total_Return'])
axes[0, 0].set_title('Total Return (%)')
axes[0, 0].set_ylabel('Return (%)')
axes[0, 0].grid(axis='y')

# Sharpe ratio
axes[0, 1].bar(performance_df['Model'], performance_df['Sharpe_Ratio'])
axes[0, 1].set_title('Sharpe Ratio')
axes[0, 1].set_ylabel('Sharpe Ratio')
axes[0, 1].grid(axis='y')

# Win rate
axes[1, 0].bar(performance_df['Model'], performance_df['Win_Rate'])
axes[1, 0].set_title('Win Rate (%)')
axes[1, 0].set_ylabel('Win Rate (%)')
axes[1, 0].grid(axis='y')

# Profit factor
axes[1, 1].bar(performance_df['Model'], performance_df['Profit_Factor'])
axes[1, 1].set_title('Profit Factor')
axes[1, 1].set_ylabel('Profit Factor')
axes[1, 1].grid(axis='y')

plt.tight_layout()
plt.show()


## Conclusion

In this notebook, we've demonstrated several real-world applications of the MFE Toolbox for solving complex financial econometrics problems. We've shown how to:

1. **Implement comprehensive portfolio risk management** by combining multivariate volatility modeling with Value-at-Risk estimation, stress testing, and risk decomposition.

2. **Identify and analyze market regimes** using bootstrap methods and time series analysis, including regime transition probabilities and performance characteristics.

3. **Improve volatility forecasting for options trading** by combining GARCH models with realized volatility measures, demonstrating the economic value of better forecasts.

These case studies showcase the power and flexibility of the MFE Toolbox for financial analysis. By integrating multiple components of the toolbox, we can build sophisticated analytical workflows that provide valuable insights for investment decision-making, risk management, and trading strategies.

The Python-based implementation of the MFE Toolbox makes it easy to combine these techniques with other Python libraries and tools, creating a seamless workflow for financial data analysis and modeling.