# Simulation Study for Noise Filtering

This is the v0 for the simulation study on the sparse jump model comparison with HMM, to show that SJM is able to filter away noisy data by using the weighting in the algorithm.


In [11]:
#load packages
import numpy as np
import pandas as pd
from hmmlearn import hmm
from sklearn.metrics import balanced_accuracy_score, confusion_matrix
from scipy.optimize import linear_sum_assignment
from jumpmodels.sparse_jump import SparseJumpModel    # Sparse JM class
from jumpmodels.jump import JumpModel    


## 1. Data Simulation & Utility Functions
def simulate_data(T, P, mu, random_state=None): """ Simulate data from a 3-state Gaussian HMM.

In [13]:
def simulate_data(T, P, mu, random_state=None):
    """
    Simulate data from a 3-state Gaussian HMM.
    
    Parameters:
        T (int): Number of observations.
        P (int): Total number of features (only first 15 are informative).
        mu (float): Signal magnitude for informative features.
        random_state (int or None): Seed for reproducibility.
        
    Returns:
        X (ndarray): Simulated observations (T x P).
        states (ndarray): True state sequence (length T).
    """
    rng = np.random.default_rng(random_state)
    
    # Transition matrix as given in your original code
    transmat = np.array([[0.9903, 0.0047, 0.0050],
                         [0.0157, 0.9666, 0.0177],
                         [0.0284, 0.0300, 0.9416]])
    transmat = transmat / transmat.sum(axis=1, keepdims=True)
    
    # Compute stationary distribution (eigenvector corresponding to eigenvalue 1)
    eigvals, eigvecs = np.linalg.eig(transmat.T)
    stat = np.real(eigvecs[:, np.isclose(eigvals, 1)])
    stat = stat[:, 0]
    stat = stat / np.sum(stat)
    
    # Generate state sequence
    states = np.zeros(T, dtype=int)
    states[0] = rng.choice(np.arange(3), p=stat)
    for t in range(1, T):
        states[t] = rng.choice(np.arange(3), p=transmat[states[t-1]])
    
    # Define means for each state
    means = np.zeros((3, P))
    # State 0: +mu in first 15 features
    # State 1: 0
    # State 2: -mu in first 15 features
    if P >= 15:
        means[0, :15] = mu
        means[2, :15] = -mu
    else:
        means[0, :P] = mu
        means[2, :P] = -mu
    
    # Generate observations: N(means[state], I_P)
    X = np.zeros((T, P))
    for t in range(T):
        X[t] = rng.normal(loc=means[states[t]], scale=1.0, size=P)
    
    return X, states

## 2.Aligning Predicted Labels With True Labels using the Hungarian Algorithm

In [14]:

def align_labels(true_labels, pred_labels):
    """
    Align predicted labels with true labels using the Hungarian algorithm.
    
    Returns:
        aligned (ndarray): Predicted labels after optimal permutation.
    """
    D = confusion_matrix(true_labels, pred_labels)
    row_ind, col_ind = linear_sum_assignment(-D)
    mapping = {col: row for row, col in zip(row_ind, col_ind)}
    aligned = np.array([mapping[x] for x in pred_labels])
    return aligned

## 3. Setting up the function to calcuate the BAC

In [15]:
def calculate_bac(true_states, pred_states):
    """
    Compute the Balanced Accuracy (BAC) after aligning the predicted state labels.
    """
    aligned_pred = align_labels(true_states, pred_states)
    return balanced_accuracy_score(true_states, aligned_pred)


## 4. Functions for model formulation

### 4.1 HMM With Nystrup (2021) initialization

In [None]:
def run_hmm(X, n_components=3, random_state=None):
    """
    Fit a Gaussian HMM to the data X with the following initialization:
      - Self-transition probability set to 0.95.
      - Covariance prior set to 1.0 (for regularization).
      - Up to 100 iterations of the EM algorithm.
    
    Parameters:
        X (ndarray): Data matrix.
        n_components (int): Number of hidden states.
        random_state (int or None): Seed for reproducibility.
    
    Returns:
        pred_states (ndarray): Predicted state sequence using Viterbi decoding.
    """
    model = hmm.GaussianHMM(
        n_components=n_components,             # Number of hidden states
        covariance_type='diag',                # Diagonal covariance matrices
        n_iter=100,                            # Maximum number of EM iterations
        random_state=random_state,             # Seed for reproducibility
        init_params="mc",                      # Initialize means ('m') and covariances ('c')
        covariance_prior=1.0                   # Regularization: prior added to covariance estimates
    )
    # Set uniform start probabilities
    model.startprob_ = np.full(n_components, 1.0 / n_components)
    # Initialize transition matrix: 0.95 on the diagonal, the remaining probability spread evenly
    transmat = np.full((n_components, n_components), 0.05 / (n_components - 1))
    np.fill_diagonal(transmat, 0.95)
    model.transmat_ = transmat
    
    # Fit the model to the data
    model.fit(X)
    # Predict the hidden state sequence using the Viterbi algorithm
    pred_states = model.predict(X)
    return pred_states


### 4.2 Normal (Standard) Jump Model with Grid Search over λ

In [None]:
def run_jump_model_grid_search(X, true_states, n_components=3, random_state=None):
    """
    Perform a grid search over 14 lambda values (logspace from 1e-2 to 1e4) for the jump model.
    
    For each lambda value:
      - A JumpModel is initialized and fitted.
      - The jump penalty (lambda) controls the cost of switching states.
        - A low lambda allows frequent state changes.
        - A high lambda penalizes state changes, resulting in fewer jumps.
      - The parameter 'cont' specifies whether the jump model is continuous (True) or discrete (False).
      - 'max_iter' defines the maximum number of iterations for the model fitting procedure.
    
    Parameters:
        X (ndarray): Data matrix.
        true_states (ndarray): The true hidden state sequence.
        n_components (int): Number of states.
        random_state (int or None): Seed for reproducibility.
    
    Returns:
        best_labels (ndarray): Predicted state sequence for the best lambda.
        best_bac (float): Best balanced accuracy achieved.
    """
    # Create 14 lambda values logarithmically spaced from 0.01 to 10,000.
    lambda_values = np.logspace(-2, 4, 14)
    best_bac = -1
    best_labels = None
    
    for lam in lambda_values:
        # Create a JumpModel instance with the following parameters:
        model = JumpModel(
            n_components=n_components,    # Number of hidden states
            jump_penalty=lam,             # Lambda: penalty for a state transition
            cont=False,                   # 'cont': if False, model uses discrete jumps
            max_iter=10,                  # Maximum number of iterations for fitting the model
            random_state=random_state     # Seed for reproducibility
        )
        # Fit the jump model to the data X
        model.fit(X)
        # Retrieve predicted state labels from the model
        labels = model.labels_
        # Calculate Balanced Accuracy (BAC) after aligning predicted labels with true states
        bac = calculate_bac(true_states, labels)
        # Update the best result if this lambda gives a higher BAC
        if bac > best_bac:
            best_bac = bac
            best_labels = labels
    
    return best_labels, best_bac

print("orale")

In [17]:
print("Running simulation...")

Running simulation...
