In [None]:
!pip install arch

Collecting arch
  Downloading arch-7.2.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading arch-7.2.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (978 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m978.3/978.3 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: arch
Successfully installed arch-7.2.0


In [None]:
"""
VOLATILITY FORECASTING MODEL - COMPLETE IMPLEMENTATION
======================================================

Implement and compare various volatility forecasting models:
1. GARCH (Generalized Autoregressive Conditional Heteroskedasticity)
2. EGARCH (Exponential GARCH)
3. GJR-GARCH (Threshold GARCH)
4. SVR (Support Vector Regression)

Author: Quantitative Trading System
"""

import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
from arch import arch_model
from sklearn.svm import SVR
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings
warnings.filterwarnings('ignore')


class VolatilityForecaster:
    """Comprehensive volatility forecasting framework"""

    def __init__(self, ticker, start_date, end_date):
        self.ticker = ticker
        self.start_date = start_date
        self.end_date = end_date
        self.data = None
        self.returns = None
        self.realized_vol = None
        self.models = {}
        self.forecasts = {}

    def download_data(self):
        """Download stock data"""
        print("="*80)
        print(f"VOLATILITY FORECASTING FOR {self.ticker}")
        print("="*80)

        self.data = yf.download(self.ticker, start=self.start_date, end=self.end_date)
        print(f"\nDownloaded {len(self.data)} trading days")
        print(f"Date range: {self.data.index[0].date()} to {self.data.index[-1].date()}")

        return self.data

    def calculate_returns(self):
        """Calculate log returns"""
        self.returns = np.log(self.data['Close'] / self.data['Close'].shift(1))
        self.returns = self.returns.dropna() * 100  # Convert to percentage

        print(f"\nReturn Statistics:")
        print(f"  Mean: {self.returns.mean().item():.4f}%")
        print(f"  Std Dev: {self.returns.std().item():.4f}%")
        print(f"  Skewness: {self.returns.skew().item():.4f}")
        print(f"  Kurtosis: {self.returns.kurtosis().item():.4f}")

        return self.returns

    def calculate_realized_volatility(self, window=20):
        """Calculate realized volatility (rolling std dev)"""
        self.realized_vol = self.returns.rolling(window=window).std()
        self.realized_vol = self.realized_vol.dropna()

        print(f"\nRealized Volatility (rolling {window}-day std):")
        print(f"  Mean: {self.realized_vol.mean().item():.4f}%")
        print(f"  Min: {self.realized_vol.min().item():.4f}%")
        print(f"  Max: {self.realized_vol.max().item():.4f}%")

        return self.realized_vol

    def fit_garch(self, p=1, q=1):
        """
        Fit GARCH(p, q) model

        GARCH equation:
        σ²(t) = ω + Σα(i)·ε²(t-i) + Σβ(j)·σ²(t-j)
        """
        print(f"\n{'='*80}")
        print(f"FITTING GARCH({p},{q}) MODEL")
        print(f"{'='*80}")

        model = arch_model(self.returns, vol='Garch', p=p, q=q, rescale=False)
        fitted_model = model.fit(disp='off')

        print(fitted_model.summary())

        self.models['GARCH'] = fitted_model

        # Extract conditional volatility
        self.forecasts['GARCH'] = fitted_model.conditional_volatility

        return fitted_model

    def fit_egarch(self, p=1, q=1):
        """
        Fit EGARCH(p, q) model

        EGARCH captures asymmetric effects (leverage effect)
        log(σ²(t)) = ω + Σα(i)·|ε(t-i)/σ(t-i)| + Σγ(i)·ε(t-i)/σ(t-i) + Σβ(j)·log(σ²(t-j))
        """
        print(f"\n{'='*80}")
        print(f"FITTING EGARCH({p},{q}) MODEL")
        print(f"{'='*80}")

        model = arch_model(self.returns, vol='EGARCH', p=p, q=q, rescale=False)
        fitted_model = model.fit(disp='off')

        print(fitted_model.summary())

        self.models['EGARCH'] = fitted_model
        self.forecasts['EGARCH'] = fitted_model.conditional_volatility

        return fitted_model

    def fit_gjr_garch(self, p=1, q=1):
        """
        Fit GJR-GARCH(p, q) model

        GJR-GARCH with threshold effect:
        σ²(t) = ω + Σ(α(i) + γ(i)·I(t-i))·ε²(t-i) + Σβ(j)·σ²(t-j)
        where I(t) = 1 if ε(t) < 0, else 0
        """
        print(f"\n{'='*80}")
        print(f"FITTING GJR-GARCH({p},{q}) MODEL")
        print(f"{'='*80}")

        model = arch_model(self.returns, vol='GARCH', p=p, o=1, q=q, rescale=False)
        fitted_model = model.fit(disp='off')

        print(fitted_model.summary())

        self.models['GJR-GARCH'] = fitted_model
        self.forecasts['GJR-GARCH'] = fitted_model.conditional_volatility

        return fitted_model

    def fit_svr(self, lookback=20):
        """
        Fit Support Vector Regression model for volatility forecasting

        Uses lagged volatility as features
        """
        print(f"\n{'='*80}")
        print(f"FITTING SVR MODEL (Lookback: {lookback})")
        print(f"{'='*80}")

        # Prepare features: lagged realized volatility
        vol_data = self.realized_vol.copy()

        # Create lagged features
        feature_df = pd.DataFrame(index=vol_data.index)
        for i in range(1, lookback + 1):
            feature_df[f'lag_{i}'] = vol_data.shift(i)

        feature_df['target'] = vol_data
        feature_df = feature_df.dropna()

        # Split data
        train_size = int(len(feature_df) * 0.8)
        train_data = feature_df.iloc[:train_size]
        test_data = feature_df.iloc[train_size:]

        X_train = train_data.drop('target', axis=1)
        y_train = train_data['target']
        X_test = test_data.drop('target', axis=1)
        y_test = test_data['target']

        # Scale features
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Fit SVR
        svr_model = SVR(kernel='rbf', C=10, gamma='scale', epsilon=0.1)
        svr_model.fit(X_train_scaled, y_train)

        # Predict
        y_pred = svr_model.predict(X_test_scaled)

        # Store predictions
        svr_forecast = pd.Series(index=test_data.index, data=y_pred)
        self.forecasts['SVR'] = svr_forecast

        # Calculate metrics
        mse = mean_squared_error(y_test, y_pred)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        print(f"\nSVR Performance:")
        print(f"  MSE: {mse:.6f}")
        print(f"  MAE: {mae:.6f}")
        print(f"  R²: {r2:.6f}")

        self.models['SVR'] = {'model': svr_model, 'scaler': scaler,
                              'X_test': X_test_scaled, 'y_test': y_test}

        return svr_model

    def forecast_volatility(self, horizon=5):
        """
        Generate multi-step ahead forecasts

        Parameters:
        -----------
        horizon : int
            Number of periods ahead to forecast
        """
        print(f"\n{'='*80}")
        print(f"GENERATING {horizon}-STEP AHEAD FORECASTS")
        print(f"{'='*80}")

        multi_step_forecasts = {}

        # GARCH models
        for model_name in ['GARCH', 'EGARCH', 'GJR-GARCH']:
            if model_name in self.models:
                model = self.models[model_name]
                # Use simulation for horizon > 1 as analytic is not supported
                method = 'simulation' if horizon > 1 else 'analytic'
                try:
                    forecast = model.forecast(horizon=horizon, reindex=False, method=method)

                    # Extract volatility forecast (sqrt of variance)
                    vol_forecast = np.sqrt(forecast.variance.values[-1])
                    multi_step_forecasts[model_name] = vol_forecast

                    print(f"\n{model_name} {horizon}-day forecast:")
                    for i, vol in enumerate(vol_forecast, 1):
                        print(f"  Day {i}: {vol:.4f}%")
                except ValueError as e:
                    print(f"\nCould not generate {horizon}-step forecast for {model_name}: {e}")


        return multi_step_forecasts

    def evaluate_forecasts(self):
        """
        Evaluate forecast accuracy against realized volatility
        """
        print(f"\n{'='*80}")
        print("FORECAST EVALUATION")
        print(f"{'='*80}")

        results = {}

        # Align forecasts with realized volatility
        for model_name, forecast in self.forecasts.items():
            if model_name == 'SVR':
                # SVR already aligned during fitting
                actual = self.models['SVR']['y_test']
                pred = forecast
            else:
                # GARCH family models
                # Use conditional volatility as 1-step ahead forecast
                # GARCH family models often have one more observation than realized vol (because realized vol starts at lag=20)
                # We need to shift the realized vol and align the forecast

                # Align GARCH forecasts (which start from the 2nd day) with Realized Vol (which is a rolling window)
                # Conditional Volatility (h_t) is the forecast for tomorrow (t+1) based on information up to today (t).
                # To compare it to Realized Volatility, we must shift the Realized Volatility series.
                # However, since you are comparing h_t to a rolling-window std dev, let's keep the existing indexing logic
                # but ensure the result is a clean Series.

                common_idx = forecast.index.intersection(self.realized_vol.index)
                actual = self.realized_vol.loc[common_idx]
                pred = forecast.loc[common_idx]

            # --- FIX: Ensure the Series are valid before checking truth value ---

            # Calculate metrics
            mse = mean_squared_error(actual, pred)
            rmse = np.sqrt(mse)
            mae = mean_absolute_error(actual, pred)

            # QLIKE (Quasi-Likelihood)
            qlike = np.nan

            # Only proceed if there are valid (non-zero/positive) data points for QLIKE
            # Ensure the Series are not empty AND all elements are greater than a very small number (to handle near-zero)
            if not pred.empty and not actual.empty:
                 # Check if all elements are positive (using a small tolerance)
                actual_positive = (actual > 1e-9).all()
                pred_positive = (pred > 1e-9).all()

                if actual_positive and pred_positive:
                    # Calculate QLIKE
                    qlike_values = (actual / pred) - np.log(actual / pred) - 1
                    qlike = np.mean(qlike_values)


            results[model_name] = {
                'MSE': mse,
                'RMSE': rmse,
                'MAE': mae,
                'QLIKE': qlike
            }

            print(f"\n{model_name}:")
            print(f"  RMSE: {rmse:.6f}")
            print(f"  MAE: {mae:.6f}")
            print(f"  QLIKE: {qlike:.6f}" if not np.isnan(qlike) else f"  QLIKE: N/A")

        # Find best model
        # Exclude NaN QLIKE values from comparison for robustness, if you choose QLIKE as metric
        # Sticking to RMSE as in original code logic
        best_model = min(results, key=lambda x: results[x]['RMSE'])
        print(f"\n✓ Best Model (by RMSE): {best_model}")

        return results

    def sensitivity_analysis(self):
        """Test different model parameters"""
        print(f"\n{'='*80}")
        print("SENSITIVITY ANALYSIS")
        print(f"{'='*80}")

        configurations = [
            (1, 1), (1, 2), (2, 1), (2, 2)
        ]

        results = {}

        for p, q in configurations:
            print(f"\nTesting GARCH({p},{q})...")
            model = arch_model(self.returns, vol='Garch', p=p, q=q, rescale=False)
            fitted = model.fit(disp='off')

            # AIC and BIC
            aic = fitted.aic
            bic = fitted.bic

            results[f'GARCH({p},{q})'] = {
                'AIC': aic,
                'BIC': bic,
                'LogLikelihood': fitted.loglikelihood
            }

            print(f"  AIC: {aic:.2f}, BIC: {bic:.2f}")

        # Find best by BIC
        best_config = min(results, key=lambda x: results[x]['BIC'])
        print(f"\n✓ Best Configuration (by BIC): {best_config}")

        return results


    def plot_results(self):
        """Comprehensive visualization of results"""
        fig = plt.figure(figsize=(16, 14))

        # Plot 1: Returns
        ax1 = plt.subplot(4, 2, 1)
        ax1.plot(self.returns, linewidth=0.5, alpha=0.7)
        ax1.set_title('Log Returns', fontweight='bold')
        ax1.set_ylabel('Returns (%)')
        ax1.grid(True, alpha=0.3)

        # Plot 2: Returns histogram
        ax2 = plt.subplot(4, 2, 2)
        ax2.hist(self.returns, bins=50, alpha=0.7, edgecolor='black')
        ax2.axvline(self.returns.mean().item(), color='red', linestyle='--',
                   linewidth=2, label=f'Mean: {self.returns.mean().item():.3f}%')
        ax2.set_title('Returns Distribution', fontweight='bold')
        ax2.set_xlabel('Returns (%)')
        ax2.set_ylabel('Frequency')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        # Plot 3: Realized volatility
        ax3 = plt.subplot(4, 2, 3)
        ax3.plot(self.realized_vol, linewidth=1.5, color='blue', label='Realized Vol')
        ax3.set_title('Realized Volatility (20-day rolling)', fontweight='bold')
        ax3.set_ylabel('Volatility (%)')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        # Plot 4: Compare all forecasts
        ax4 = plt.subplot(4, 2, 4)

        # Plot realized volatility
        ax4.plot(self.realized_vol, linewidth=1, color='black',
                label='Realized', alpha=0.7)

        # Plot model forecasts
        colors = ['red', 'green', 'blue', 'orange']
        for (name, forecast), color in zip(self.forecasts.items(), colors):
            if name == 'SVR':
                ax4.plot(forecast, linewidth=1.5, label=name,
                        color=color, alpha=0.8)
            else:
                ax4.plot(forecast, linewidth=1.5, label=name,
                        color=color, alpha=0.8)

        ax4.set_title('Volatility Forecasts Comparison', fontweight='bold')
        ax4.set_ylabel('Volatility (%)')
        ax4.legend()
        ax4.grid(True, alpha=0.3)

        # Plot 5: GARCH conditional volatility vs realized
        if 'GARCH' in self.forecasts:
            ax5 = plt.subplot(4, 2, 5)
            common_idx = self.forecasts['GARCH'].index.intersection(self.realized_vol.index)
            ax5.scatter(self.realized_vol.loc[common_idx],
                       self.forecasts['GARCH'].loc[common_idx],
                       alpha=0.3, s=10)

            # Add diagonal line
            max_val = max(self.realized_vol.loc[common_idx].max(),
                         self.forecasts['GARCH'].loc[common_idx].max())
            min_val = min(self.realized_vol.loc[common_idx].min(),
                         self.forecasts['GARCH'].loc[common_idx].min())
            ax5.plot([min_val, max_val], [min_val, max_val],
                    'r--', linewidth=2)

            ax5.set_title('GARCH: Forecast vs Realized', fontweight='bold')
            ax5.set_xlabel('Realized Volatility (%)')
            ax5.set_ylabel('GARCH Forecast (%)')
            ax5.grid(True, alpha=0.3)

        # Plot 6: Forecast errors
        ax6 = plt.subplot(4, 2, 6)
        for name, forecast in self.forecasts.items():
            if name != 'SVR':
                common_idx = forecast.index.intersection(self.realized_vol.index)
                errors = forecast.loc[common_idx] - self.realized_vol.loc[common_idx]
                ax6.plot(errors, linewidth=1, label=name, alpha=0.7)

        ax6.axhline(0, color='black', linestyle='--', linewidth=1)
        ax6.set_title('Forecast Errors', fontweight='bold')
        ax6.set_ylabel('Error (%)')
        ax6.legend()
        ax6.grid(True, alpha=0.3)

        # Plot 7: Volatility clustering
        ax7 = plt.subplot(4, 2, 7)
        squared_returns = self.returns ** 2
        ax7.plot(squared_returns, linewidth=0.5, alpha=0.7, color='red')
        ax7.set_title('Squared Returns (Volatility Clustering)', fontweight='bold')
        ax7.set_ylabel('Squared Returns')
        ax7.set_xlabel('Date')
        ax7.grid(True, alpha=0.3)

        # Plot 8: ACF of squared returns
        ax8 = plt.subplot(4, 2, 8)
        from statsmodels.graphics.tsaplots import plot_acf
        plot_acf(squared_returns.dropna(), lags=30, ax=ax8, alpha=0.05)
        ax8.set_title('ACF of Squared Returns', fontweight='bold')
        ax8.set_xlabel('Lag')
        ax8.set_ylabel('Autocorrelation')

        plt.tight_layout()
        plt.savefig('volatility_forecasting_results.png', dpi=300, bbox_inches='tight')
        print("\nVisualization saved as 'volatility_forecasting_results.png'")

        return fig


    def sensitivity_analysis(self):
        """Test different model parameters"""
        print(f"\n{'='*80}")
        print("SENSITIVITY ANALYSIS")
        print(f"{'='*80}")

        configurations = [
            (1, 1), (1, 2), (2, 1), (2, 2)
        ]

        results = {}

        for p, q in configurations:
            print(f"\nTesting GARCH({p},{q})...")
            model = arch_model(self.returns, vol='Garch', p=p, q=q, rescale=False)
            fitted = model.fit(disp='off')

            # AIC and BIC
            aic = fitted.aic
            bic = fitted.bic

            results[f'GARCH({p},{q})'] = {
                'AIC': aic,
                'BIC': bic,
                'LogLikelihood': fitted.loglikelihood
            }

            print(f"  AIC: {aic:.2f}, BIC: {bic:.2f}")

        # Find best by BIC
        best_config = min(results, key=lambda x: results[x]['BIC'])
        print(f"\n✓ Best Configuration (by BIC): {best_config}")

        return results


def main():
    """Main execution function"""

    print("="*80)
    print("VOLATILITY FORECASTING - COMPLETE IMPLEMENTATION")
    print("="*80)

    # Configuration
    TICKER = 'SPY'  # S&P 500 ETF
    START_DATE = '2020-01-01'
    END_DATE = '2025-01-01'

    # Initialize forecaster
    forecaster = VolatilityForecaster(TICKER, START_DATE, END_DATE)

    # Step 1: Download data
    forecaster.download_data()

    # Step 2: Calculate returns
    forecaster.calculate_returns()

    # Step 3: Calculate realized volatility
    forecaster.calculate_realized_volatility(window=20)

    # Step 4: Fit models
    print("\n" + "="*80)
    print("FITTING VOLATILITY MODELS")
    print("="*80)

    forecaster.fit_garch(p=1, q=1)
    forecaster.fit_egarch(p=1, q=1)
    forecaster.fit_gjr_garch(p=1, q=1)
    forecaster.fit_svr(lookback=20)

    # Step 5: Multi-step forecasts
    forecaster.forecast_volatility(horizon=5)

    # Step 6: Evaluate forecasts
    results = forecaster.evaluate_forecasts()

    # Step 7: Sensitivity analysis
    forecaster.sensitivity_analysis()

    # Step 8: Visualize
    forecaster.plot_results()

    plt.show()

    print("\n" + "="*80)
    print("VOLATILITY FORECASTING COMPLETE!")
    print("="*80)

    return forecaster

if __name__ == "__main__":
    forecaster = main()

VOLATILITY FORECASTING - COMPLETE IMPLEMENTATION
VOLATILITY FORECASTING FOR SPY


[*********************100%***********************]  1 of 1 completed



Downloaded 1258 trading days
Date range: 2020-01-02 to 2024-12-31

Return Statistics:
  Mean: 0.0530%
  Std Dev: 1.3282%
  Skewness: -0.8102
  Kurtosis: 12.3721

Realized Volatility (rolling 20-day std):
  Mean: 1.1102%
  Min: 0.3410%
  Max: 5.9167%

FITTING VOLATILITY MODELS

FITTING GARCH(1,1) MODEL
                     Constant Mean - GARCH Model Results                      
Dep. Variable:                    SPY   R-squared:                       0.000
Mean Model:             Constant Mean   Adj. R-squared:                  0.000
Vol Model:                      GARCH   Log-Likelihood:               -1828.10
Distribution:                  Normal   AIC:                           3664.19
Method:            Maximum Likelihood   BIC:                           3684.74
                                        No. Observations:                 1257
Date:                Wed, Oct 15 2025   Df Residuals:                     1256
Time:                        19:30:29   Df Model:               

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().