In [4]:
# %%
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from hmmlearn import hmm
import yfinance as yf
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# ======== CONFIGURABLE PARAMETERS ========
# Market data parameters
TICKER = 'SPY'  # Main ticker to analyze
VIX_TICKER = '^VIX'  # Volatility index
START_DATE = "1995-01-01"  # Historical data start date

# HMM model parameters  
HIDDEN_STATES = 3  # Number of market regimes
EM_ITERATIONS = 100  # Training iterations for HMM

# High-pass filter parameters
HPF_WINDOW = 21  # Window size for the high-pass filter moving average

# Training period
TRAIN_START_DATE = "2019-01-01"  #"2019-01-01"
TRAIN_END_DATE = "2024-01-01"   #"2024-01-01"

# ======== DATA PREPARATION FUNCTIONS ========
def download_market_data(ticker, vix_ticker, start_date):
    """Download and prepare market data"""
    end_date = datetime.today().strftime("%Y-%m-%d")
    print(f"Downloading market data from {start_date} to {end_date}...")
    
    # Download ticker and VIX data
    df_ticker = yf.download(ticker, start=start_date, end=end_date, auto_adjust=True)
    df_vix = yf.download(vix_ticker, start=start_date, end=end_date)
    
    # Fix column structure and reset index
    if len(df_ticker.columns.names) > 1:
        df_ticker.columns = df_ticker.columns.droplevel(1)
    if len(df_vix.columns.names) > 1:
        df_vix.columns = df_vix.columns.droplevel(1)
    
    df_ticker = df_ticker.reset_index()
    df_vix = df_vix.reset_index()
    
    # Keep only Date and Close from VIX
    df_vix = df_vix[['Date', 'Close']].rename(columns={'Close': 'VIX'})
    
    # Merge data
    df = pd.merge(df_ticker, df_vix, on='Date', how='left')
    df['VIX'] = df['VIX'].fillna(method='ffill')
    df['LogVIX'] = np.log(df['VIX'])
    
    return df

def calculate_indicators(data):
    """Calculate technical indicators for regime classification"""
    df_copy = data.copy()
    
    # Calculate returns
    df_copy['Return'] = df_copy['Close'].pct_change() * 100
    
    # Calculate volatility (10-day window)
    df_copy['MA10'] = df_copy['Close'].rolling(window=10).mean()
    df_copy['Volatility'] = df_copy['Close'].rolling(window=10).apply(
        lambda x: np.sum((x - x.mean())**2) / len(x)
    )
    
    # LogVIX ratio compared to 10-day average
    df_copy['LogVIX_10MA'] = df_copy['LogVIX'].rolling(window=10).mean()
    df_copy['LogVIX_Ratio'] = df_copy['LogVIX'] / df_copy['LogVIX_10MA']
    
    # Forward fill and backward fill any NaNs
    df_copy = df_copy.fillna(method='ffill').fillna(method='bfill')
    
    return df_copy

def apply_high_pass_filter(data, window_size=10):
    """
    Apply high-pass filter by subtracting moving average 
    from original signal to emphasize significant market movements
    """
    filtered_data = data.copy()
    
    # Apply HPF to returns
    filtered_data['Return_MA'] = filtered_data['Return'].rolling(window=window_size).mean()
    filtered_data['HPF_Return'] = filtered_data['Return'] - filtered_data['Return_MA']
    
    # Apply HPF to LogVIX
    filtered_data['LogVIX_MA'] = filtered_data['LogVIX'].rolling(window=window_size).mean()
    filtered_data['HPF_LogVIX'] = filtered_data['LogVIX'] - filtered_data['LogVIX_MA']
    
    # Apply HPF to volatility
    filtered_data['Volatility_MA'] = filtered_data['Volatility'].rolling(window=window_size).mean()
    filtered_data['HPF_Volatility'] = filtered_data['Volatility'] - filtered_data['Volatility_MA']
    
    # Calculate SPY Volume Volatility (20-day rolling std of volume)
    filtered_data['Volume_Volatility'] = filtered_data['Volume'].rolling(window=20).std()
    filtered_data['Volume_Volatility_MA'] = filtered_data['Volume_Volatility'].rolling(window=window_size).mean()
    filtered_data['HPF_SPY_Volume_Volatility'] = filtered_data['Volume_Volatility'] - filtered_data['Volume_Volatility_MA']
    
    # Calculate SPY Volume index to vol ratio (20-day SMA)
    filtered_data['Volume_to_Volatility_Ratio'] = filtered_data['Volume'] / (filtered_data['Volatility'] + 1e-10)
    filtered_data['Volume_to_Volatility_Ratio_20SMA'] = filtered_data['Volume_to_Volatility_Ratio'].rolling(window=20).mean()
    filtered_data['Volume_to_Volatility_Ratio_20SMA_MA'] = filtered_data['Volume_to_Volatility_Ratio_20SMA'].rolling(window=window_size).mean()
    filtered_data['HPF_SPY_Volume_index_to_vol_20daySMA'] = filtered_data['Volume_to_Volatility_Ratio_20SMA'] - filtered_data['Volume_to_Volatility_Ratio_20SMA_MA']
    
    # Calculate VIX to SPY Volatility ratio
    filtered_data['VIX_to_SPY_Volatility'] = filtered_data['VIX'] / (filtered_data['Volatility'] + 1e-10)
    filtered_data['VIX_to_SPY_Volatility_MA'] = filtered_data['VIX_to_SPY_Volatility'].rolling(window=window_size).mean()
    filtered_data['HPF_volatility_VIX'] = filtered_data['VIX_to_SPY_Volatility'] - filtered_data['VIX_to_SPY_Volatility_MA']
    
    # Additional feature: Return to Volatility ratio (volatility-adjusted returns)
    filtered_data['Return_to_Volatility'] = filtered_data['Return'] / (filtered_data['Volatility'] + 1e-10)
    filtered_data['Return_to_Volatility_MA'] = filtered_data['Return_to_Volatility'].rolling(window=window_size).mean()
    filtered_data['HPF_Return_to_Volatility'] = filtered_data['Return_to_Volatility'] - filtered_data['Return_to_Volatility_MA']
    
    # Fill any NaNs created by rolling windows
    filtered_data = filtered_data.fillna(method='ffill').fillna(method='bfill')
    
    return filtered_data

# ======== HMM MODEL FUNCTIONS ========
def train_hmm_model(data, start_date, end_date, n_states=3, n_iter=75):
    """Train HMM model on high-pass filtered features"""
    # Filter data to training period
    training = data[(data['Date'] >= start_date) & (data['Date'] <= end_date)].copy()
    
    print(f"Training HMM model on data from {start_date} to {end_date}")
    print(f"Training data shape: {training.shape}")
    
    # Prepare observations for HMM using HPF features
    obs = np.column_stack([
        training['HPF_Volatility'].values, 
        training['HPF_Return'].values,
        training['HPF_LogVIX'].values,
        training['HPF_SPY_Volume_Volatility'].values,
        training['HPF_SPY_Volume_index_to_vol_20daySMA'].values,
        training['HPF_volatility_VIX'].values,
        training['HPF_Return_to_Volatility'].values
    ])
    
    # Create and train the model
    model = hmm.GaussianHMM(n_components=n_states, covariance_type="full", n_iter=n_iter)
    model.fit(obs)
    
    # Get predictions for training data
    predictions = model.predict(obs)
    
    # Analyze regime characteristics using original (non-HPF) data for interpretability
    regime_stats = {}
    for i in range(n_states):
        regime_mask = (predictions == i)
        if np.sum(regime_mask) > 0:
            regime_stats[i] = {
                'count': np.sum(regime_mask),
                'return_avg': np.mean(training.loc[regime_mask, 'Return']),
                'vix_avg': np.mean(training.loc[regime_mask, 'VIX']),
                'logvix_avg': np.mean(training.loc[regime_mask, 'LogVIX']),
                'volatility_avg': np.mean(training.loc[regime_mask, 'Volatility']),
                'volume_volatility_avg': np.mean(training.loc[regime_mask, 'Volume_Volatility']),
                'vix_to_spy_vol_avg': np.mean(training.loc[regime_mask, 'VIX_to_SPY_Volatility'])
            }
    
    # Assign labels to regimes (Bull, Bear, Neutral) based on characteristics
    regime_labels = [""] * n_states
    bull_scores = []
    bear_scores = []
    neutral_scores = []
    
    for i in range(n_states):
        if i not in regime_stats:
            bull_scores.append(0)
            bear_scores.append(0)
            neutral_scores.append(0)
            continue
            
        stats = regime_stats[i]
        
        # Bull regime scoring: high returns, lower VIX
        bull_score = 0
        if stats['return_avg'] > 0.05:
            bull_score += 2
        elif stats['return_avg'] > 0:
            bull_score += 1
        
        if stats['logvix_avg'] < 2.8:  # Log(16.5) ≈ 2.8
            bull_score += 2
        elif stats['logvix_avg'] < 3.0:  # Log(20) ≈ 3.0
            bull_score += 1
        
        # Bear regime scoring: negative returns, higher VIX
        bear_score = 0
        if stats['return_avg'] < -0.1:
            bear_score += 2
        elif stats['return_avg'] < 0:
            bear_score += 1
        
        if stats['logvix_avg'] > 3.2:  # Log(25) ≈ 3.2
            bear_score += 2
        elif stats['logvix_avg'] > 3.0:
            bear_score += 1
        
        # Neutral regime scoring: modest returns, moderate VIX
        neutral_score = 0
        if -0.05 < stats['return_avg'] < 0.05:
            neutral_score += 2
        elif -0.1 < stats['return_avg'] < 0.1:
            neutral_score += 1
        
        if 2.8 <= stats['logvix_avg'] <= 3.2:
            neutral_score += 2
        elif 2.7 <= stats['logvix_avg'] <= 3.3:
            neutral_score += 1
        
        bull_scores.append(bull_score)
        bear_scores.append(bear_score)
        neutral_scores.append(neutral_score)
    
    # Assign labels based on highest score
    labels_to_assign = ["Bull", "Bear", "Neutral"]
    scores = [(i, max(bull_scores[i], bear_scores[i], neutral_scores[i]), 
               "Bull" if bull_scores[i] >= max(bear_scores[i], neutral_scores[i]) else
               "Bear" if bear_scores[i] >= max(bull_scores[i], neutral_scores[i]) else
               "Neutral") 
              for i in range(n_states)]
    
    # Sort by score and assign labels ensuring each label is used only once
    scores.sort(key=lambda x: x[1], reverse=True)
    assigned_labels = set()
    
    for regime_idx, _, preferred_label in scores:
        if preferred_label not in assigned_labels:
            regime_labels[regime_idx] = preferred_label
            assigned_labels.add(preferred_label)
        else:
            # Find an unassigned label
            for label in labels_to_assign:
                if label not in assigned_labels:
                    regime_labels[regime_idx] = label
                    assigned_labels.add(label)
                    break
    
    # Print key regime statistics
    print("\nRegime Characteristics Summary:")
    print("=" * 60)
    print(f"{'Regime':<10} {'Label':<8} {'Count':<8} {'Return %':<10} {'VIX':<8} {'LogVIX':<8}")
    print("-" * 60)
    
    for i in range(n_states):
        if i in regime_stats:
            stats = regime_stats[i]
            print(f"{i:<10} {regime_labels[i]:<8} {stats['count']:<8} "
                  f"{stats['return_avg']:<10.2f} {stats['vix_avg']:<8.2f} {stats['logvix_avg']:<8.2f}")
    
    # Print transition matrix
    print("\nRegime Transition Matrix:")
    transition_matrix = model.transmat_
    
    print("=" * 60)
    print(f"{'From/To':<10}", end="")
    for i in range(n_states):
        print(f"{regime_labels[i]:<10}", end="")
    print()
    print("-" * 60)
    
    for i in range(n_states):
        print(f"{regime_labels[i]:<10}", end="")
        for j in range(n_states):
            print(f"{transition_matrix[i, j]:<10.2f}", end="")
        print()
    
    # Calculate stationary distribution
    stationary_dist = model.get_stationary_distribution()
    print("\nStationary Distribution (Long-term regime probabilities):")
    for i in range(n_states):
        print(f"Regime {i} [{regime_labels[i]}]: {stationary_dist[i]*100:.2f}%")
    
    return model, training, predictions, regime_labels, regime_stats

def predict_regimes(model, data, start_date, end_date, regime_labels):
    """Predict market regimes for a specific date range using the trained HMM model"""
    # Filter data for prediction period
    pred_data = data[(data['Date'] >= start_date) & (data['Date'] <= end_date)].copy()
    
    if len(pred_data) == 0:
        print(f"No data available for period {start_date} to {end_date}")
        return None
    
    # Prepare observations for prediction using HPF features
    obs = np.column_stack([
        pred_data['HPF_Volatility'].values, 
        pred_data['HPF_Return'].values,
        pred_data['HPF_LogVIX'].values,
        pred_data['HPF_SPY_Volume_Volatility'].values,
        pred_data['HPF_SPY_Volume_index_to_vol_20daySMA'].values,
        pred_data['HPF_volatility_VIX'].values,
        pred_data['HPF_Return_to_Volatility'].values
    ])
    
    # Predict regimes
    predictions = model.predict(obs)
    
    # Add predictions to dataframe
    pred_data['Predicted_Regime'] = predictions
    pred_data['Regime_Label'] = [regime_labels[r] for r in predictions]
    
    # Print basic statistics about the prediction
    print(f"Predicted regimes for period {start_date} to {end_date}")
    
    # Calculate regime distribution
    regime_counts = pd.Series(predictions).value_counts(normalize=True) * 100
    print("\nRegime Distribution:")
    for regime, percentage in regime_counts.items():
        print(f"Regime {regime} [{regime_labels[regime]}]: {percentage:.2f}%")
    
    # Calculate average regime duration
    regime_changes = (pred_data['Predicted_Regime'] != pred_data['Predicted_Regime'].shift(1)).sum()
    avg_duration = len(pred_data) / (regime_changes if regime_changes > 0 else 1)
    print(f"\nRegime persistence: {avg_duration:.2f} days average duration")
    
    return pred_data

# ======== VISUALIZATION FUNCTIONS ========
def plot_regimes(results, title=None):
    """Plot SPY price with regime classifications"""
    if results is None or len(results) == 0:
        print("No data available to plot")
        return
    
    # Set plot title
    if title is None:
        start_date = results['Date'].min().strftime('%Y-%m-%d')
        end_date = results['Date'].max().strftime('%Y-%m-%d')
        title = f'Market Regimes from {start_date} to {end_date}'
    
    # Create a categorical color map for regimes
    unique_regimes = results['Predicted_Regime'].unique()
    n_regimes = len(unique_regimes)
    regime_colors = px.colors.qualitative.Set2[:n_regimes]
    
    # Create figure
    fig = go.Figure()
    
    # Add price line
    fig.add_trace(
        go.Scatter(
            x=results['Date'],
            y=results['Close'],
            mode='lines',
            line=dict(color='rgba(0,0,0,0.3)', width=1),
            name=f'{TICKER} Price'
        )
    )
    
    # Add colored markers for different regimes
    for i, regime in enumerate(sorted(unique_regimes)):
        regime_data = results[results['Predicted_Regime'] == regime]
        regime_label = results.loc[results['Predicted_Regime'] == regime, 'Regime_Label'].iloc[0]
        
        fig.add_trace(
            go.Scatter(
                x=regime_data['Date'], 
                y=regime_data['Close'],
                mode='markers',
                marker=dict(color=regime_colors[i], size=6),
                name=f'{regime_label} Regime',
                hovertemplate='%{x}<br>Price: %{y:.2f}<br>Regime: ' + regime_label
            )
        )
    
    # Update layout
    fig.update_layout(
        title=title,
        xaxis_title='Date',
        yaxis_title=f'{TICKER} Price',
        template='plotly_white',
        legend_title='Market Regimes',
        hovermode='closest',
        height=600
    )
    
    fig.show()
    
    # Create pie chart showing regime distribution
    regime_distribution = results['Regime_Label'].value_counts().reset_index()
    regime_distribution.columns = ['Regime', 'Days']
    regime_distribution['Percentage'] = regime_distribution['Days'] / len(results) * 100
    
    fig_pie = px.pie(
        regime_distribution, 
        values='Percentage', 
        names='Regime',
        title=f'Regime Distribution ({results["Date"].min().strftime("%Y-%m-%d")} to {results["Date"].max().strftime("%Y-%m-%d")})',
        color_discrete_sequence=regime_colors
    )
    
    fig_pie.update_traces(textinfo='percent+label', textposition='inside')
    fig_pie.update_layout(uniformtext_minsize=12, uniformtext_mode='hide')
    
    fig_pie.show()

# ======== MAIN EXECUTION ========
def main():
    # Download and prepare data
    df = download_market_data(TICKER, VIX_TICKER, START_DATE)
    df = calculate_indicators(df)
    df = apply_high_pass_filter(df, HPF_WINDOW)
    
    # Train model
    model, training_data, train_predictions, regime_labels, regime_stats = train_hmm_model(
        df, TRAIN_START_DATE, TRAIN_END_DATE, HIDDEN_STATES, EM_ITERATIONS
    )
    
    # Plot training period results
    training_results = training_data.copy()
    training_results['Predicted_Regime'] = train_predictions
    training_results['Regime_Label'] = [regime_labels[r] for r in train_predictions]
    
    print("\nVisualization of training period regimes:")
    plot_regimes(training_results, f'Market Regimes - Training Period ({TRAIN_START_DATE} to {TRAIN_END_DATE})')
    
    # Example: Analyze the most recent 3 months
    today = datetime.today()
    three_months_ago = today - timedelta(days=150)
    
    recent_start = three_months_ago.strftime('%Y-%m-%d')
    recent_start ='2024-01-02'
    recent_end = today.strftime('%Y-%m-%d')
    
    print(f"\nPredicting regimes for recent period ({recent_start} to {recent_end}):")
    recent_results = predict_regimes(model, df, recent_start, recent_end, regime_labels)
    plot_regimes(recent_results, f'Market Regimes - Recent Period ({recent_start} to {recent_end})')
    
    return {
        'model': model,
        'data': df,
        'regime_labels': regime_labels,
        'training_results': training_results
    }

# Function to analyze any time period
def analyze_period(model_objects, start_date, end_date, title=None):
    """Analyze any time period using the trained model"""
    results = predict_regimes(
        model_objects['model'], 
        model_objects['data'], 
        start_date, 
        end_date, 
        model_objects['regime_labels']
    )
    
    if results is not None:
        plot_regimes(results, title)
    
    return results

# Execute main function
if __name__ == "__main__":
    model_objects = main()
    
    
    # Example 
    # To analyze a custom period, uncomment and modify:
    #custom_results = analyze_period(model_objects, "2008-01-01", "2009-12-31", "Market Regimes - 2008 Financial Crisis")

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

Downloading market data from 1995-01-01 to 2025-04-14...





Training HMM model on data from 2019-01-01 to 2024-01-01
Training data shape: (1258, 32)

Regime Characteristics Summary:
Regime     Label    Count    Return %   VIX      LogVIX  
------------------------------------------------------------
0          Bear     596      0.11       20.07    2.97    
1          Bull     362      0.08       16.28    2.77    
2          Neutral  300      -0.04      30.10    3.36    

Regime Transition Matrix:
From/To   Bear      Bull      Neutral   
------------------------------------------------------------
Bear      0.95      0.04      0.01      
Bull      0.06      0.94      0.00      
Neutral   0.03      0.00      0.97      

Stationary Distribution (Long-term regime probabilities):
Regime 0 [Bear]: 47.22%
Regime 1 [Bull]: 28.94%
Regime 2 [Neutral]: 23.84%

Visualization of training period regimes:



Predicting regimes for recent period (2024-01-02 to 2025-04-14):
Predicted regimes for period 2024-01-02 to 2025-04-14

Regime Distribution:
Regime 0 [Bear]: 62.93%
Regime 2 [Neutral]: 34.58%
Regime 1 [Bull]: 2.49%

Regime persistence: 14.59 days average duration


In [5]:
# %%
def get_posterior_probabilities(model, obs):
    """
    Compute posterior (gamma) probabilities for each time step in an HMM
    using the 'score_samples' method provided by hmmlearn.

    :param model: Trained hmmlearn GaussianHMM (or similar)
    :param obs: numpy array of shape (n_samples, n_features), the observation sequence
    :return: 2D numpy array of shape (n_samples, n_states) with posterior probabilities
    """
    # score_samples returns (total log-likelihood, posterior probabilities)
    _, posteriors = model.score_samples(obs)  
    return posteriors


def predict_regimes_with_threshold(model, data, start_date, end_date, regime_labels, threshold=0.6):
    """
    Predict market regimes using a trained HMM model, then apply a probability threshold
    to decide if we 'trust' a new state's posterior probability. If no state
    exceeds the threshold on a given day, remain in the previous day's regime.

    :param model: Trained hmmlearn GaussianHMM instance
    :param data: DataFrame with columns including 'Date' and the HPF features
    :param start_date: Start date string (YYYY-MM-DD)
    :param end_date: End date string (YYYY-MM-DD)
    :param regime_labels: List of string labels for each HMM state
    :param threshold: Float in [0,1], minimum posterior probability to accept a new regime
    :return: DataFrame with predicted regimes and final regime labels after threshold logic
    """
    # Filter data for prediction period
    pred_data = data[(data['Date'] >= start_date) & (data['Date'] <= end_date)].copy()
    if len(pred_data) == 0:
        print(f"No data available for period {start_date} to {end_date}")
        return None
    
    # Prepare observations
    obs = np.column_stack([
        pred_data['HPF_Volatility'].values, 
        pred_data['HPF_Return'].values,
        pred_data['HPF_LogVIX'].values,
        pred_data['HPF_SPY_Volume_Volatility'].values,
        pred_data['HPF_SPY_Volume_index_to_vol_20daySMA'].values,
        pred_data['HPF_volatility_VIX'].values,
        pred_data['HPF_Return_to_Volatility'].values
    ])
    
    # Compute posterior probabilities for each time step
    posteriors = get_posterior_probabilities(model, obs)  # shape (n_samples, n_states)
    
    # We will hold the final classification here
    final_states = np.zeros(len(pred_data), dtype=int)
    
    # Initialize with the maximum posterior state on the first day
    first_day_max = np.argmax(posteriors[0])
    final_states[0] = first_day_max
    
    # Apply threshold-based logic for subsequent days
    for i in range(1, len(pred_data)):
        max_state = np.argmax(posteriors[i])
        max_prob = posteriors[i, max_state]
        
        # If the new state's probability is below threshold, stick to previous day's state
        if max_prob < threshold:
            final_states[i] = final_states[i-1]
        else:
            # Accept the new state
            final_states[i] = max_state
    
    pred_data['Predicted_Regime'] = final_states
    pred_data['Regime_Label'] = [regime_labels[state] for state in final_states]
    
    # Print summary stats
    unique, counts = np.unique(final_states, return_counts=True)
    total_days = len(pred_data)
    print(f"\nThreshold-based Regime Prediction from {start_date} to {end_date}")
    print(f"Threshold = {threshold:.2f}")
    print(f"{'Regime':<10} {'Label':<10} {'Days':<6} {'Pct':<6}")
    for st, cnt in zip(unique, counts):
        pct = cnt / total_days * 100
        print(f"{st:<10} {regime_labels[st]:<10} {cnt:<6} {pct:.2f}%")
    
    return pred_data

# USAGE EXAMPLE AFTER YOU RUN YOUR 'main()':
# --------------------------------------------------------
threshold_results = predict_regimes_with_threshold(
     model_objects['model'],
     model_objects['data'],
     "2024-01-02",
     "2025-04-13",
     model_objects['regime_labels'],
     threshold=0.95
)
plot_regimes(threshold_results, "Threshold-based HMM Classification")


Threshold-based Regime Prediction from 2024-01-02 to 2025-04-13
Threshold = 0.95
Regime     Label      Days   Pct   
0          Bear       195    60.75%
1          Bull       11     3.43%
2          Neutral    115    35.83%
