In [2]:
using LinearAlgebra
# using Plots
using Random
using StateSpaceDynamics


const SSD = StateSpaceDynamics

StateSpaceDynamics

In [4]:
latent_dim=1
input_dim=2
obs_dim=3

true_model = SwitchingBernoulliRegression(K=latent_dim, input_dim=input_dim, output_dim=obs_dim, include_intercept=false)

HiddenMarkovModel([1.0;;], EmissionModel[BernoulliRegressionEmission(2, 3, [0.0 0.0 0.0; 0.0 0.0 0.0], false, 0.0)], [1.0], 1)

In [1]:
# Create Switching Regression Model
AR1 = AutoRegressionEmission(output_dim=3, order=2, include_intercept=false, β=[1 2 1;  2 3 3; 4  2 1; 1  3 3; 3 2 1; 2 3 3])  # 6 x 3
AR2 = AutoRegressionEmission(output_dim=3, order=2, include_intercept=true, β=[1 2 1;  2 3 3; 4 3 1; 1 3 3; 3 3 2; 3 2 3; 3 3 2 ])  # 7 x 3 (7 cuz intercept)

# Sampling from AR without intercept
order = AR1.order
output_dim = AR1.output_dim
Y_init = randn(order, output_dim)
SSD.sample(AR1, Y_init)

# Sampling from AR with intercept
order = AR2.order
output_dim = AR1.output_dim
Y_init = randn(order, output_dim)
SSD.sample(AR2, Y_init)

UndefVarError: UndefVarError: `AutoRegressionEmission` not defined

# Gaussian HMM

In [5]:
"""
Create an underlying GaussianHMM to generate data
"""

# Create Guassian Emission Models
output_dim = 2
μ = [0.0, 0.0]
Σ = 0.1 * Matrix{Float64}(I, output_dim, output_dim)
emission_1 = GaussianEmission(output_dim, μ, Σ)

μ = [2.0, 1.0]
Σ = 0.1 * Matrix{Float64}(I, output_dim, output_dim)
emission_2 = GaussianEmission(output_dim, μ, Σ)

# Create GaussianHMM
true_model = SSD.GaussianHMM(K=2, output_dim=2)
true_model.B[1] = emission_1
true_model.B[2] = emission_2
true_model.A = [0.9 0.1; 0.8 0.2]

# Sample from the model
n=10000
true_labels, data = SSD.sample(true_model, n=n)

# Fit a gaussian hmm to the data
test_model = SSD.GaussianHMM(K=2, output_dim=2)
test_model.A = [0.8 0.2; 0.05 0.95]
ll = SSD.fit!(test_model, data)

print(isapprox(test_model.B[1].μ, true_model.B[1].μ, atol=0.1) || isapprox(test_model.B[1].μ, true_model.B[2].μ, atol=0.1))
print(isapprox(test_model.B[2].μ, true_model.B[2].μ, atol=0.1) || isapprox(test_model.B[2].μ, true_model.B[1].μ, atol=0.1))

[32mRunning EM algorithm... 100%|██████████████████████████████████████████████████| Time: 0:00:11 ( 0.12  s/it)[39m[K


truetrue

In [15]:
"""
Find the nearest covariance matrix to matrix A in Frobenius norm using Higham's algorithm.

Arguments:
- `A`: Input matrix (should be square)
- `max_iter`: Maximum number of iterations
- `tol`: Convergence tolerance

Returns:
- The nearest covariance matrix to A
"""
function nearest_covariance_matrix(A; max_iter=100, tol=1e-6)
    # Convert to Matrix{Float64} to ensure numerical stability
    A = convert(Matrix{Float64}, A)
    n = size(A, 1)
    
    # Symmetrize A
    A = (A + A') / 2
    
    # Initial projection onto PSD cone
    F = eigen(Symmetric(A))
    eigvals = max.(F.values, 0)
    X = F.vectors * Diagonal(eigvals) * F.vectors'
    
    # Initialize Dykstra's correction matrices
    Y = zeros(n, n)
    S = zeros(n, n)
    
    for _ in 1:max_iter
        X_old = copy(X)
        
        # Project onto symmetric matrices with unit diagonal
        R = X - Y
        R = (R + R') / 2  # Ensure symmetry
        Y = R - Diagonal(diag(R)) + I
        
        # Project onto PSD cone
        R = Y - S
        F = eigen(Symmetric(R))
        eigvals = max.(F.values, 0)
        X = F.vectors * Diagonal(eigvals) * F.vectors'
        S = X - R
        
        # Check convergence using Frobenius norm
        if norm(X - X_old) < tol
            break
        end
    end
    
    # Ensure perfect symmetry in output
    return (X + X') / 2
end

nearest_covariance_matrix

In [None]:
"""
Find nearest covariance matrix by simple eigenvalue projection.
Just sets negative eigenvalues to zero.
"""
function enforce_posdef(A)
    # Symmetrize first
    A = (A + A') / 2
    
    # Eigendecomposition and fix negative eigenvalues
    F = eigen(Symmetric(A))
    eigvals = max.(F.values, 0)
    return F.vectors * Diagonal(eigvals) * F.vectors'
end

nearest_covariance_simple

In [33]:
nearest_covariance_matrix([0.98 0.92; 0.92 1.0])

2×2 Matrix{Float64}:
 1.0          1.66533e-16
 1.66533e-16  1.0

In [42]:
nearest_covariance_simple(rand(4, 4))

4×4 Matrix{Float64}:
 0.764114  0.624749  0.628557  0.715401
 0.624749  0.673734  0.345721  0.488553
 0.628557  0.345721  0.828511  0.593067
 0.715401  0.488553  0.593067  0.792134

# Switching Gaussian Regression #

In [3]:
# Create Emission Models
emission_1 = GaussianRegressionEmission(input_dim=3, output_dim=1, include_intercept=true, β=reshape([3, 2, 2, 3], :, 1))
emission_2 = GaussianRegressionEmission(input_dim=3, output_dim=1, include_intercept=true, β=reshape([-4, -2, 3, 2], :, 1))

# Create Switching Regression Model
true_model = SwitchingGaussianRegression(K=2, input_dim=3, output_dim=1, include_intercept=true)

# Plug in the emission models
true_model.B[1] = emission_1
true_model.B[2] = emission_2

# Sample from the model
n = 20000
Φ = randn(3, n)
true_labels, data = SSD.sample(true_model, Φ, n=n)

# Try to fit a new model to the data
test_model = SSD.SwitchingGaussianRegression(K=2, input_dim=3, output_dim=1, include_intercept=true)
# ll = SSD.fit!(test_model, data, Φ)

# # Test output -> not quite right yet
# print(isapprox(test_model.B[1].β, true_model.B[1].β, atol=0.1) || isapprox(test_model.B[1].β, true_model.B[2].β, atol=0.1))
# print(isapprox(test_model.B[2].β, true_model.B[2].β, atol=0.1) || isapprox(test_model.B[2].β, true_model.B[1].β, atol=0.1))

HiddenMarkovModel([0.9818882757850009 0.018111724214999136; 0.18937669040808647 0.8106233095919136], EmissionModel[GaussianRegressionEmission(3, 1, [0.0; 0.0; 0.0; 0.0;;], [1.0;;], true, 0.0), GaussianRegressionEmission(3, 1, [0.0; 0.0; 0.0; 0.0;;], [1.0;;], true, 0.0)], [0.881706175542047, 0.11829382445795306], 2)

In [2]:
# Create Emission Models
emission_1 = GaussianRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[1 2; 3 1; 2 5])
emission_2 = GaussianRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[-2 4; 2 -1; 2 1])

# Create Switching Regression Model
true_model = SwitchingGaussianRegression(K=2, input_dim=3, output_dim=2, include_intercept=false)

# Plug in the emission models
true_model.B[1] = emission_1
true_model.B[2] = emission_2

# Sample from the model
n = 50
Φ = randn(3, n)
true_labels, data = SSD.sample(true_model, Φ, n=n)

# Try to fit a new model to the data
test_model = SSD.SwitchingGaussianRegression(K=2, input_dim=3, output_dim=2, include_intercept=false)
ll = SSD.fit!(test_model, data, Φ, max_iters=1)

# Test output -> not quite right yet
print(isapprox(test_model.B[1].β, true_model.B[1].β, atol=0.1) || isapprox(test_model.B[1].β, true_model.B[2].β, atol=0.1))
print(isapprox(test_model.B[2].β, true_model.B[2].β, atol=0.1) || isapprox(test_model.B[2].β, true_model.B[1].β, atol=0.1))

(50, 2)
[-1.5340565505547805 -17.46646361583557; -5.5191629797791535 -1.0777366586347439; -20.308987364730555 -11.378136797770896; -1.4602286599861038 -3.535497482819568; -1.0603279056262906 -16.657321652911207; -0.19317072902870383 -7.827707143351825; -0.3279438641737647 -1.5168317340973236; -0.1912135192440547 -0.9375157586361876; -0.07687575381717515 -0.16657004900427402; -0.8598983520877328 -3.2221098580874874; -6.112827498607898 -2.880580240006563; -2.504049986738344 -5.45494348889508; -3.8221541018849496 -65.78383766757811; -33.374463501911556 -47.24189502644629; -9.968311891461932 -12.804600415327847; -6.359059487717632 -39.060991722204214; -4.49303023247715 -3.7191139252289993; -0.5124173398558326 -14.562148448044102; -21.679509689708315 -15.530787288549252; -18.30590041861979 -20.48940807875529; -20.63014601655479 -0.01922854905624649; -0.5070332293201869 -5.748488882831594; -0.0886288201913494 -9.497477929143836; -1.086487333404308 -65.3476840506082; -25.18739190149651 -26.42

DimensionMismatch: DimensionMismatch: cannot broadcast array to have fewer non-singleton dimensions

In [5]:
"""
Create a Switching Poisson Regression
"""
# Create the emission models
emission_1 = PoissonRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[1 2; 1 2; 1 2])
emission_2 = PoissonRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[2 1; 2 1; 2 1])

# Initialize the SwitchingPoissonRegression
true_model = SwitchingPoissonRegression(K=2, input_dim=3, output_dim=2, include_intercept=false)

# Plug in the emission models
true_model.B[1] = emission_1
true_model.B[2] = emission_2

# Sample from the HMM
n=20000
Φ = randn(3, n)
true_labels, data = SSD.sample(true_model, Φ, n=n)

# Create a new SwitchingPoissonRegression and try to recover parameters
test_model = SwitchingPoissonRegression(K=2, input_dim=3, output_dim=2, include_intercept=false)

# Create the emission models for warm start
emission_1 = PoissonRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[1.0 2.0; 1.0 2.0; 1.0 2.0])
emission_2 = PoissonRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[2.0 1.0; 2.0 1.0; 2.0 1.0])
test_model.B[1], test_model.B[2] = emission_1, emission_2

ll = SSD.fit!(test_model, data, Φ, max_iters=100)

print(isapprox(test_model.B[1].β, true_model.B[1].β, atol=0.1) || isapprox(test_model.B[1].β, true_model.B[2].β, atol=0.1))
print(isapprox(test_model.B[2].β, true_model.B[2].β, atol=0.1) || isapprox(test_model.B[2].β, true_model.B[1].β, atol=0.1))


[32mRunning EM algorithm... 100%|██████████████████████████████████████████████████| Time: 0:01:56 ( 1.16  s/it)[39m[K


truetrue

# Switching Bernoulli Regression #

In [7]:
"""
Create Bernoulli Regression Model
"""
# Make Emission Models
emission_1 = SSD.BernoulliRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[1 2; 1 2; 1 2])
emission_2 = SSD.BernoulliRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[2 1; 2 1; 2 1])

# Create Switching Bernoulli Regression and add the emissions
true_model = SSD.SwitchingBernoulliRegression(K=2, input_dim=3)
true_model.A = [0.9 0.1; 0.2 0.8]
true_model.B[1] = emission_1
true_model.B[2] = emission_2

# Sample from the model
n=40000
Φ = randn(3, n)
true_labels, data = SSD.sample(true_model, Φ, n=n)

# Fit a new Bernoulli Regression Model to the data
test_model = SSD.SwitchingBernoulliRegression(K=2, input_dim=2, λ=1.0)
test_model.A = [0.75 0.25; 0.1 0.9]
test_model.B[1] = SSD.BernoulliRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[1.3 2.6; 1.2 2.6; 1.2 2.0])
test_model.B[2] = SSD.BernoulliRegressionEmission(input_dim=3, output_dim=2, include_intercept=false, β=[1.1 2.3; 1.1 2.2; 1.8 2.9])
ll = SSD.fit!(test_model, data, Φ, max_iters=200)

# # Test it works alright
print(isapprox(test_model.B[1].β, true_model.B[1].β, atol=0.2) || isapprox(test_model.B[1].β, true_model.B[2].β, atol=0.2))
print(isapprox(test_model.B[2].β, true_model.B[2].β, atol=0.2) || isapprox(test_model.B[2].β, true_model.B[1].β, atol=0.2))

[32mRunning EM algorithm... 100%|██████████████████████████████████████████████████| Time: 0:09:21 ( 2.81  s/it)[39m[K


falsefalse

In [None]:
 # Make Emission Models
 emission_1 = StateSpaceDynamics.BernoulliRegressionEmission(input_dim=2, output_dim=1, include_intercept=true, β = reshape([3, 1, 2], :, 1))
 emission_2 = StateSpaceDynamics.BernoulliRegressionEmission(input_dim=2, output_dim=1, include_intercept=true, β = reshape([-3, -2, 0.1], :, 1))

 # Create Switching Bernoulli Regression and add the emissions
 true_model = StateSpaceDynamics.SwitchingBernoulliRegression(K=2, input_dim=2)
 true_model.A = [0.9 0.1; 0.2 0.8]
 true_model.B[1] = emission_1
 true_model.B[2] = emission_2

 # Sample from the model
 n=20000
 Φ = randn(2, n)
 true_labels, data = StateSpaceDynamics.sample(true_model, Φ, n=n)

 # Fit a new Bernoulli Regression Model to the data
 test_model = StateSpaceDynamics.SwitchingBernoulliRegression(K=2, input_dim=2, λ=1.0)
 test_model.A = [0.75 0.25; 0.1 0.9]
 test_model.B[1] = StateSpaceDynamics.BernoulliRegressionEmission(input_dim=2, output_dim=1, include_intercept=true, β = reshape([2.5, 0.25, 1.0], :, 1))
 test_model.B[2] = StateSpaceDynamics.BernoulliRegressionEmission(input_dim=2, output_dim=1, include_intercept=true, β = reshape([-2.0, -3.0, -1.0], :, 1))
 ll = StateSpaceDynamics.fit!(test_model, data, Φ, max_iters=200)


# Switching Poisson Regression

In [None]:
"""
Create a Switching Poisson Regression
"""
# Create the emission models
emission_1 = PoissonRegressionEmission(input_dim=3, output_dim=1, include_intercept=true, β=reshape([4, 3, 2, 4], :, 1))
emission_2 = PoissonRegressionEmission(input_dim=3, output_dim=1, include_intercept=true, β=reshape([-4, -2, 1, 3], :, 1))

# Initialize the SwitchingPoissonRegression
true_model = SwitchingPoissonRegression(K=2, input_dim=3, output_dim=1)

# Plug in the emission models
true_model.B[1] = emission_1
true_model.B[2] = emission_2

# Sample from the HMM
n=20000
Φ = randn(3, n)
true_labels, data = SSD.sample(true_model, Φ, n=n)

# Create a new SwitchingPoissonRegression and try to recover parameters
test_model = SwitchingPoissonRegression(K=2, input_dim=3, output_dim=1)

# Create the emission models for warm start
emission_1 = PoissonRegressionEmission(input_dim=3, output_dim=1, include_intercept=true, β=reshape([2.0, 1.0, 4.0, 2.0], :, 1))
emission_2 = PoissonRegressionEmission(input_dim=3, output_dim=1, include_intercept=true, β=reshape([-5.0, -1.0, 0.0, 2.0], :, 1))
test_model.B[1], test_model.B[2] = emission_1, emission_2

ll = SSD.fit!(test_model, data, Φ, max_iters=200)

print(isapprox(test_model.B[1].β, true_model.B[1].β, atol=0.1) || isapprox(test_model.B[1].β, true_model.B[2].β, atol=0.1))
print(isapprox(test_model.B[2].β, true_model.B[2].β, atol=0.1) || isapprox(test_model.B[2].β, true_model.B[1].β, atol=0.1))


In [None]:
# Create Guassian Emission Models
output_dim = 2
μ = [-5.0, -4.0]
Σ = 0.1 * Matrix{Float64}(I, output_dim, output_dim)
emission_1 = GaussianEmission(output_dim=output_dim, μ=μ, Σ=Σ)

μ = [2.0, 1.0]
Σ = 0.1 * Matrix{Float64}(I, output_dim, output_dim)
emission_2 = GaussianEmission(output_dim=output_dim, μ=μ, Σ=Σ)

# Create GaussianHMM
true_model = GaussianHMM(K=2, output_dim=2)
true_model.B[1] = emission_1
true_model.B[2] = emission_2
true_model.A = [0.9 0.1; 0.8 0.2]

# Generate trialized synthetic data
n = 100
num_trials = 30
Y = Vector{Matrix{Float64}}(undef, num_trials)
trial_labels = Vector{Vector{Int}}(undef, num_trials)  

for i in 1:num_trials
    true_labels, data = StateSpaceDynamics.sample(true_model, n=n)  # Generate data and labels
    Y[i] = data  # Store data matrix for the ith trial
end

# Fit a model to the trialized synthetic data
est_model = GaussianHMM(K=2, output_dim=2)
lls = StateSpaceDynamics.fit!(est_model, Y, max_iters=100)

In [None]:
est_model.A