In [None]:
import pandas as pd
import numpy as np
from scipy.optimize import minimize
from collections import Counter
from itertools import product
from itertools import combinations
import matplotlib.pyplot as plt

In [None]:
# Markov Chain's states
S = 4

# Length of Sequence
N = 50000 

states = []
for i in range(S):
    states.append(i)

stationary_dist = []
for i in range(S):
    stationary_dist.append(1/S)

# sequence generation function
def McSimulate(p_ij, length):
    data = [np.random.choice(states, size=1, p=[1 / 4, 1 / 4, 1 / 4, 1 / 4])[0]]
    for i in range(length - 1):
        data.append(np.random.choice([0, 1, 2, 3], size=1, p=p_ij[data[i]])[0])
    return data

# Data set Split
T = 5
T_len = N//T


In [None]:
### True_1
p_ij_true = np.array([
    [0.37, 0.09, 0.4, 0.14],
    [0.48, 0.19, 0.03, 0.3],
    [0.35, 0.38, 0.23, 0.04],
    [0.36, 0.01, 0.45, 0.18]
])

np.sum(p_ij_true, axis=1)

### True_2
# p_ij_true = np.array([
#     [0.4, 0.06, 0.4, 0.14],
#     [0.5, 0.17, 0.03, 0.3],
#     [0.4, 0.33, 0.25, 0.02],
#     [0.4, 0.01, 0.4, 0.19]
# ])


In [None]:
p_ij_true_sequence = p_ij_true.flatten()
class_true = np.unique(p_ij_true_sequence)
each_class_index = []

for class_val in class_true:
    indices = np.where(p_ij_true_sequence == class_val)[0]
    each_class_index.append(indices.tolist())

In [None]:
#### Transition Counts function
def TransitionCounts(T, M):
    pairs: list[str] = [str(T[i]) + str(T[i + 1]) for i in range(len(T) - 1)]
    pairs_order = [str(i) + str(j) for i, j in product(range(M), repeat=2)]
    iter = Counter(pairs)
    iter = [iter[pair] for pair in pairs_order]
    return iter

#### MLE function
def mle(N):
    p_ij_hat_matrix = N / N.sum(axis=1, keepdims=True)
    return p_ij_hat_matrix

#### Transition Probability Difference function
def prob_diff(z):
    pairs= list(combinations(z,2))
    w=[]
    for pair in pairs:
        diff = abs(pair[0]-pair[1])
        w.append(diff)
    return np.array(w)

def penalty_function(numerator,denomiator):  #### Penalty fraction function
    pelt = numerator/denomiator
    pelt[denomiator==0]=0 ## IF the denomiator is 0, then the whole penalty term force to 0
    return pelt

# Constraint Functions
def eval_g_eq(x):
    row = [x[i*M:(i+1)*M] for i in range(len(x)//M)]
    constraints = []
    for z in row:
        constraints.extend([np.sum(z)-1])
    return constraints


## Generated function for Ada lasso
def ada_lasso(data, lam):
    n =TransitionCounts(data,M)
    n_ij_matrix = np.reshape(n,(M,M))
    p_ij_hat_matrix = mle(n_ij_matrix)
    p_ij_hat = p_ij_hat_matrix.flatten()  ## Each set's MLE counts DONE!!
    w_deno= prob_diff(p_ij_hat) 
    def obj_f(x): #### Objective function
        w_num  = prob_diff(x) 
        pelt = penalty_function(w_num,w_deno)
        return -np.sum((n * np.log(x))) + lam*np.sum(pelt) #/np.sqrt(np.sum(n)）
    
    res = minimize(obj_f, p_ij_hat, ## Initial value set to the MLE results for further optimize
    constraints={'type': 'eq', 'fun': eval_g_eq}, 
    bounds=[(0, 1)] * (M**2), 
    method='SLSQP', 
    options={'maxiter': 160000}
    )
    # res_tilde = np.round(res.x,3)
    # p_tilde = res_tilde
    p_tilde = res.x
    return n, p_tilde


## Generated function for lasso
def lasso(data, lam):
    n =TransitionCounts(data,M)
    n_ij_matrix = np.reshape(n,(M,M))
    p_ij_hat_matrix = mle(n_ij_matrix)
    p_ij_hat = p_ij_hat_matrix.flatten()  ## Each set's MLE counts DONE!!
    w_deno= prob_diff(p_ij_hat) 
    def obj_f(x): #### Objective function
        diff_x  = prob_diff(x) 
        return -np.sum((n * np.log(x))) + lam*np.sum(diff_x) #/np.sqrt(np.sum(n)）
    
    res = minimize(obj_f, p_ij_hat, ## Initial value set to the MLE results for further optimize
    constraints={'type': 'eq', 'fun': eval_g_eq}, 
    bounds=[(0, 1)] * (M**2), 
    method='SLSQP', 
    options={'maxiter': 160000}
    )
    # res_tilde = np.round(res.x,3)
    # p_check = res_tilde
    p_check = res.x
    return n, p_check


## Purity Function
def purity(tilde):
    purity_set = []
    each_class_tilde_index = []
    class_tilde = np.unique(tilde)
    each_class_tilde_index = []
    for class_val in class_tilde:
        indices = np.where(tilde == class_val)[0]
        each_class_tilde_index.append(indices.tolist())
    max_count = []
    for ell in each_class_index:
        count = []
        for i in each_class_tilde_index:
            count.append(len(np.intersect1d(ell,i)))
        max_count.append(np.max(count))
    purity_result = np.sum(max_count)/M**2    
    return purity_result


### Finite-dimensional complex normed spaces
def FDnorm(b):
    b_matrix = np.reshape(b,(M,M))
    p_ij_tilde_bar = b_matrix- p_ij_true
    norm = np.sqrt(np.sum(np.abs(p_ij_tilde_bar) ** 2))
    return norm

#### Lasso-based Penalized likelihood estimaiton with CV

In [None]:
cv_each_lambda_adalasso=[]

LAMBDA = np.arange(0, 2, 0.2) 

cv_each_lambda_lasso = []

for j in range(len(LAMBDA)):
    L = LAMBDA[j]
    cv_each_training_lasso = []
    cv_each_test_lasso=[]
    for i in range(T):
        testing_set = DATA[i]
        n_ij_testing = TransitionCounts(testing_set,M)
        training_set = np.delete(DATA, i, axis=0)
        for z in range(T-1):
            train_subdata = training_set[z]
            n_train_lasso, p_tilde_train_lasso = lasso(train_subdata,L)
            cv_lasso = -np.sum(n_ij_testing*np.log(p_tilde_train_lasso))  
            cv_each_training_lasso.append(np.sum(cv_lasso))
        cv_each_test_lasso.append(np.sum(cv_each_training_lasso))
    cv_each_lambda_lasso.append(np.sum(cv_each_test_lasso))   



position = list(cv_each_lambda_lasso).index(min(cv_each_lambda_lasso))
opt_lambda_lasso = LAMBDA[position]
a, p_check = lasso(sequence, opt_lambda_lasso)
p_check = np.round(p_check,3)
p_check_matrix = np.reshape(p_check,(M,M))


p_check_matrix = np.reshape(p_check,(M,M))
print(p_ij_true)
print(p_check_matrix)
print(purity(p_check))
print(FDnorm(p_check))

In [None]:
plt.plot(LAMBDA,cv_each_lambda_lasso, marker='o')  
plt.show()