# Simulation Study for Noise Filtering

This is version 0 (v0) of the simulation study comparing the sparse jump model (SJM) with the hidden Markov model (HMM) and the standard jump model. The goal is to show that the SJM can filter out noisy (uninformative) features through the feature weighting in its algorithm.


In [2]:
#load packages
import numpy as np
import pandas as pd
from hmmlearn import hmm
from sklearn.metrics import balanced_accuracy_score, confusion_matrix
from scipy.optimize import linear_sum_assignment
from jumpmodels.sparse_jump import SparseJumpModel    # Sparse JM class
from jumpmodels.jump import JumpModel   
from scipy import stats 


## 1. Data Simulation & Utility Functions
`simulate_data(T, P, mu, random_state=None)` simulates data from a 2-state Gaussian HMM.

In [3]:
def simulate_data(T, P, mu, random_state=None):
    """
    Simulate data from a 2-state Gaussian HMM.

    The hidden chain is started from the stationary distribution of a fixed
    transition matrix, and every observation is drawn from
    N(means[state], I_P), where only the informative features have a
    state-dependent mean (-mu in state 0, +mu in state 1).

    Parameters:
        T (int): Number of observations. T == 0 returns empty arrays.
        P (int): Total number of features (only first 15 are informative;
            all of them are informative when P < 15).
        mu (float): Signal magnitude for informative features.
        random_state (int or None): Seed for reproducibility.

    Returns:
        X (ndarray): Simulated observations (T x P).
        states (ndarray): True state sequence (length T).
    """
    n_informative = 15
    rng = np.random.default_rng(random_state)

    # Transition matrix for 2 states
    transmat = np.array([[0.9980, 0.0020],
                         [0.0114, 0.9886]])

    # Stationary distribution: the left eigenvector of transmat for
    # eigenvalue 1, rescaled so that it sums to one.
    eigvals, eigvecs = np.linalg.eig(transmat.T)
    stat = np.real(eigvecs[:, np.isclose(eigvals, 1)])[:, 0]
    stat = stat / np.sum(stat)

    # State means: slicing clamps to P, so no separate P < 15 branch is needed.
    means = np.zeros((2, P))
    means[0, :n_informative] = -mu
    means[1, :n_informative] = mu

    states = np.zeros(T, dtype=int)
    X = np.zeros((T, P))
    if T == 0:
        # Nothing to draw; indexing states[0] below would raise IndexError.
        return X, states

    # Generate state sequence
    states[0] = rng.choice(np.arange(2), p=stat)
    for t in range(1, T):
        states[t] = rng.choice(np.arange(2), p=transmat[states[t - 1]])

    # Generate observations: N(means[state], I_P).
    # Drawn row by row, after all state draws, so the sequence of generator
    # calls (and therefore seeded output) stays the same.
    for t in range(T):
        X[t] = rng.normal(loc=means[states[t]], scale=1.0, size=P)

    return X, states

## 2. Aligning Predicted Labels With True Labels Using the Hungarian Algorithm

In [4]:
def align_labels(true_labels, pred_labels):
    """
    Align predicted labels with true labels using the Hungarian algorithm.

    A contingency table is built over the sorted union of the labels that
    occur in either sequence, and the one-to-one relabelling of the predicted
    labels that maximises agreement with the true labels is applied. Labels
    may be arbitrary values (they do not have to be 0..K-1); the result is
    expressed in the original label values.

    Parameters:
        true_labels (array-like): Reference label sequence.
        pred_labels (array-like): Predicted label sequence of the same length.

    Returns:
        aligned (ndarray): Predicted labels after optimal permutation.

    Raises:
        ValueError: If the two sequences have different lengths.
    """
    true_labels = np.asarray(true_labels)
    pred_labels = np.asarray(pred_labels)
    if true_labels.shape[0] != pred_labels.shape[0]:
        raise ValueError(
            "true_labels and pred_labels must have the same length, got "
            f"{true_labels.shape[0]} and {pred_labels.shape[0]}"
        )
    if pred_labels.size == 0:
        return pred_labels.copy()

    # Encode both sequences as positions in the sorted label union so that
    # table indices and label values cannot be confused with each other.
    labels, codes = np.unique(
        np.concatenate([true_labels, pred_labels]), return_inverse=True
    )
    n_true = true_labels.shape[0]
    true_codes = codes[:n_true]
    pred_codes = codes[n_true:]

    # D[i, j] = number of samples with true label i and predicted label j.
    n_labels = labels.shape[0]
    D = np.zeros((n_labels, n_labels), dtype=np.int64)
    np.add.at(D, (true_codes, pred_codes), 1)

    # Negate the counts: linear_sum_assignment minimises total cost.
    row_ind, col_ind = linear_sum_assignment(-D)

    # perm[j] is the position of the true label assigned to predicted label j.
    perm = np.empty(n_labels, dtype=int)
    perm[col_ind] = row_ind
    return labels[perm[pred_codes]]

## 3. Setting up the function to calculate the BAC

In [5]:
def calculate_bac(true_states, pred_states):
    """
    Return the Balanced Accuracy (BAC) of a predicted state sequence.

    The predicted labels are first relabelled with align_labels so that the
    score does not depend on how the model happened to number its states.
    """
    return balanced_accuracy_score(
        true_states,
        align_labels(true_states, pred_states),
    )


## 4. Functions for model formulation

### 4.1 HMM With Nystrup (2021) initialization

In [6]:
def run_hmm(X, n_components=2, random_state=None):
    """
    Fit a Gaussian HMM to the data X.

    The start probabilities are set to uniform and the transition matrix to
    0.95 on the diagonal with the remaining 0.05 spread evenly over the other
    states. init_params="mc" asks hmmlearn to initialise only the means and
    covariances itself, so these hand-set values are used as the starting
    point for fitting.

    Parameters:
        X (ndarray): Data matrix.
        n_components (int): Number of hidden states (must be >= 1).
        random_state (int or None): Seed for reproducibility.

    Returns:
        pred_states (ndarray): Predicted state sequence using Viterbi decoding.

    Raises:
        ValueError: If n_components is smaller than 1.
    """
    if n_components < 1:
        raise ValueError(f"n_components must be >= 1, got {n_components}")

    model = hmm.GaussianHMM(
        n_components=n_components,
        covariance_type='diag',
        n_iter=100,
        random_state=random_state,
        init_params="mc",
        covars_prior=1.0
    )
    model.startprob_ = np.full(n_components, 1.0 / n_components)

    if n_components == 1:
        # A single state can only transition to itself; the general formula
        # below would divide by zero and leave a row that sums to 0.95.
        transmat = np.ones((1, 1))
    else:
        transmat = np.full((n_components, n_components), 0.05 / (n_components - 1))
        np.fill_diagonal(transmat, 0.95)
    model.transmat_ = transmat

    model.fit(X)
    pred_states = model.predict(X)
    return pred_states

### 4.2 Normal (Standard) Jump Model with Grid Search over λ

In [7]:
def run_jump_model_grid_search(X, true_states, n_components=2, random_state=None):
    """
    Grid-search the jump penalty (lambda) of the standard jump model.

    One model is fitted per lambda on a 14-point log-spaced grid from 1e-2 to
    1e4, each fit is scored against the true states with calculate_bac, and
    the first fit that reaches the highest score is kept.

    Parameters:
        X (ndarray): Data matrix.
        true_states (ndarray): True hidden state sequence.
        n_components (int): Number of states.
        random_state (int or None): Seed for reproducibility.

    Returns:
        best_labels (ndarray): Predicted state sequence for the best lambda.
        best_bac (float): Best balanced accuracy achieved.
    """
    top_labels, top_score = None, -1

    for penalty in np.logspace(-2, 4, 14):
        candidate = JumpModel(
            n_components=n_components,
            jump_penalty=penalty,
            cont=False,
            max_iter=10,
            random_state=random_state,
        )
        candidate.fit(X)

        fitted_labels = candidate.labels_
        score = calculate_bac(true_states, fitted_labels)

        # Strict comparison: on ties the earlier (smaller) lambda is kept.
        if score > top_score:
            top_score, top_labels = score, fitted_labels

    return top_labels, top_score

### 4.3 Sparse Jump Model with Grid Search over λ and kappa

In [8]:
def run_sparse_jump_model_grid_search(X, true_states, n_components=2, random_state=None):
    """
    Grid-search the Sparse Jump Model (SJM) over jump penalty and sparsity level.

    The jump penalty (lambda) runs over 7 log-spaced values from 1e-1 to 1e2.
    The sparsity level kappa runs over 14 evenly spaced values from 1 to
    sqrt(P) and is handed to the model as max_feats = kappa**2. Every fit is
    scored against the true states with calculate_bac, and the first
    combination that reaches the highest score is kept.

    Parameters
    ----------
    X : ndarray
        Data matrix of shape (T, P).
    true_states : ndarray
        True hidden state sequence.
    n_components : int, default=2
        Number of hidden states.
    random_state : int or None, optional
        Seed for reproducibility.

    Returns
    -------
    best_labels : ndarray
        Predicted state sequence for the best combination.
    best_bac : float
        The best balanced accuracy achieved.
    """
    n_features = X.shape[1]
    penalty_grid = np.logspace(-1, 2, 7)
    kappa_grid = np.linspace(1, np.sqrt(n_features), 14)

    top_labels, top_score = None, -1

    for penalty in penalty_grid:
        for kappa in kappa_grid:
            candidate = SparseJumpModel(
                n_components=n_components,
                jump_penalty=penalty,
                cont=False,
                max_feats=kappa**2,
                max_iter=10,
                random_state=random_state,
            )
            candidate.fit(X)

            fitted_labels = candidate.labels_
            score = calculate_bac(true_states, fitted_labels)

            # Strict comparison: on ties the earliest grid point is kept.
            if score > top_score:
                top_score, top_labels = score, fitted_labels

    return top_labels, top_score

## 5. Main Execution
For each (mu, P) setting we run the three models (HMM, jump model, sparse jump model) on every simulated data set and then combine the results at the end.


In [9]:
if __name__ == "__main__":

    # --- Experiment configuration ---
    T = 500                                   # observations per simulated series
    mu_values = [0.1, 0.25, 0.5, 0.75, 1.0]   # signal magnitudes
    p_values = [15, 30, 60, 150, 300]         # total number of features
    n_simulations = 10                        # replications per (mu, P) setting

    # One summary row per (mu, P) setting.
    final_rows = []

    for mu in mu_values:
        for P in p_values:
            hmm_bac_list, jump_bac_list, sparse_bac_list = [], [], []

            print(f"\nStarting simulations for mu = {mu}, P = {P}")
            for sim in range(n_simulations):
                # The replication index is also the seed, so all three
                # models are scored on the same simulated data set.
                X, true_states = simulate_data(T, P, mu, random_state=sim)

                # Gaussian HMM
                pred_hmm = run_hmm(X, n_components=2, random_state=sim)
                bac_hmm = calculate_bac(true_states, pred_hmm)
                hmm_bac_list.append(bac_hmm)

                # Standard jump model (best lambda on the grid)
                _, bac_jump = run_jump_model_grid_search(
                    X, true_states, n_components=2, random_state=sim
                )
                jump_bac_list.append(bac_jump)

                # Sparse jump model (best lambda / kappa on the grid)
                _, bac_sparse = run_sparse_jump_model_grid_search(
                    X, true_states, n_components=2, random_state=sim
                )
                sparse_bac_list.append(bac_sparse)

                print(f"  Completed simulation {sim+1}/{n_simulations} for mu = {mu}, P = {P}", flush=True)

            # Mean and (population) standard deviation of the BAC per model.
            hmm_mean = np.mean(hmm_bac_list)
            hmm_std = np.std(hmm_bac_list)
            jump_mean = np.mean(jump_bac_list)
            jump_std = np.std(jump_bac_list)
            sparse_mean = np.mean(sparse_bac_list)
            sparse_std = np.std(sparse_bac_list)

            # Paired t-test: both models were run on the same data sets.
            tstat, pval = stats.ttest_rel(jump_bac_list, sparse_bac_list)

            # Bold the sparse result only when it is significantly better
            # than the standard jump model at the 5% level.
            sparse_str = (
                f"**{sparse_mean:.2f} ± {sparse_std:.2f}**"
                if (pval < 0.05) and (sparse_mean > jump_mean)
                else f"{sparse_mean:.2f} ± {sparse_std:.2f}"
            )

            summary_row = {
                "mu": mu,
                "P": P,
                "HMM (mean ± std)": f"{hmm_mean:.2f} ± {hmm_std:.2f}",
                "Jump (mean ± std)": f"{jump_mean:.2f} ± {jump_std:.2f}",
                "Sparse Jump (mean ± std)": sparse_str,
                "p-value (Jump vs Sparse)": f"{pval:.3g}",
            }
            final_rows.append(summary_row)

            print(f"Finished analysis for mu = {mu}, P = {P}: HMM BAC = {hmm_mean:.3f}, Jump BAC = {jump_mean:.3f}, Sparse Jump BAC = {sparse_mean:.3f}", flush=True)
    


Starting simulations for mu = 0.1, P = 15




KeyboardInterrupt: 

In [None]:
    # Turn the summary rows accumulated in final_rows (one dict per (mu, P)
    # setting) into a DataFrame and print the resulting table.
    # NOTE(review): these lines are indented as a continuation of the
    # `if __name__ == "__main__":` block; executed as a separate notebook cell
    # they would presumably need to be de-indented -- confirm intended layout.
    df_results = pd.DataFrame(final_rows)
    print("\nFinal Results:")
    print(df_results, flush=True)