In [1]:
%load_ext autoreload
%autoreload 2
import sys, os
from os.path import expanduser
# ACTION REQUIRED: change `path` to the project folder on your machine.
# It is appended to sys.path so that modules under it can be imported
# (presumably the `regime` package used later in this notebook -- confirm).
path = expanduser("~/Documents/G3_2/regime-identification")
sys.path.append(path)

# Root folder for the simulation files; the four sub-folder paths below are derived from it.
path_file = expanduser("~/data/G3_2/regime-identification/simulation")
path_data = f"{path_file}/data"
path_estimation = f"{path_file}/estimation"
path_score = f"{path_file}/score"
path_figure = f"{path_file}/figure"

In [2]:
import numpy as np
from sklearn.metrics import roc_auc_score
from hmmlearn.hmm import GaussianHMM
from tqdm import trange, tqdm
import logging
# Threshold is one above WARNING, so WARNING-level records (and anything lower) are not emitted.
logging.basicConfig(level=logging.WARNING+1)

from numpy.random import RandomState
# Notebook-level random generator with a fixed seed of 0.
random_state = RandomState(0)

In [3]:
from sklearn.base import BaseEstimator
from sklearn.utils import check_random_state
from sklearn.cluster import kmeans_plusplus

In [6]:
from regime.jump import *

In [4]:
# def _sort_centers_by_first_feature(init):
#     """
#     sort all the centers in each init by the first feature.
#     """
#     n_i = len(init)
#     idx = init[:, :, 0].argsort(axis=1)[:, ::-1]
#     return init[np.arange(n_i)[:, np.newaxis] , idx]

# def init_k_means_plusplus(X, n_c, n_init=10, random_state=None):
#     """
#     initialize the centers, by k-means++, for n_init times.
#     """
#     random_state = check_random_state(random_state)
#     init = [kmeans_plusplus(X, n_c, random_state=random_state)[0] for _ in range(n_init)]
#     return _sort_centers_by_first_feature(np.array(init))

In [4]:
from sklearn.cluster import KMeans

In [None]:
# Scratch cell: build a KMeans with default arguments; the instance is displayed and then discarded.
KMeans()

In [None]:
# Iteration cap and convergence tolerance, stored as plain numbers.
# The previous assignments ended with a trailing comma, which silently made
# each name a 1-tuple ((300,) and (1e-4,)) rather than a scalar.
# NOTE(review): values look copied from the KMeans signature defaults -- confirm intent.
max_iter = 300
tol = 1e-4

In [9]:
def _sort_centers_by_first_feature(init):
    """
    sort all the centers in each init by the first feature.
    """
    n_i = len(init)
    idx = init[:, :, 0].argsort(axis=1)[:, ::-1]
    return init[np.arange(n_i)[:, np.newaxis] , idx]

def init_centers(X, n_c, n_init=10, init = "k-means++", random_state=None):
    """
    Generate `n_init` candidate sets of `n_c` cluster centers for `X`.

    Parameters
    ----------
    X : data handed unchanged to scikit-learn.
    n_c : number of centers in every set.
    n_init : number of center sets to generate.
    init : {"k-means++", "k-means"}
        "k-means++" -- every set is one `kmeans_plusplus` seeding draw.
        "k-means"   -- every set is the `cluster_centers_` of a KMeans fit on `X`.
    random_state : anything accepted by `check_random_state`.

    Returns
    -------
    The stacked center sets after `_sort_centers_by_first_feature` has
    reordered the centers inside each set.

    Raises
    ------
    NotImplementedError
        If `init` is not one of the supported strategies.
    """
    random_state = check_random_state(random_state)
    if init == "k-means++":
        centers = [kmeans_plusplus(X, n_c, random_state=random_state)[0] for _ in range(n_init)]
    elif init == "k-means":
        # One estimator is refitted n_init times and shares the generator object.
        # NOTE(review): distinct results per fit rely on KMeans drawing new seeds
        # from that generator on every fit; each fit also runs its own 10 restarts.
        kmeans_instance = KMeans(n_c, n_init=10, random_state=random_state)
        centers = [kmeans_instance.fit(X).cluster_centers_ for _ in range(n_init)]
    else:
        # Same exception type as before, now naming the rejected value.
        raise NotImplementedError(
            f"init={init!r} is not supported; expected 'k-means++' or 'k-means'."
        )
    return _sort_centers_by_first_feature(np.array(centers))

def init_k_means_plusplus(X, n_c, n_init=10, random_state=None):
    """
    Convenience wrapper: call `init_centers` with the "k-means++" strategy.
    """
    options = {"n_init": n_init, "init": "k-means++", "random_state": random_state}
    return init_centers(X, n_c, **options)

In [21]:
# Load a saved array from the data folder; X[0] is used as a single sample in the next cell.
X = np.load(f"{path_data}/Xs_3StateDaily1000_zheng.npy")

In [22]:
# 10 sets of 3 centers for the first element of X, using the "k-means" strategy with seed 9.
centers = init_centers(X[0], 3, 10, "k-means", 9)
centers

array([[[ 9.70372826e-03,  1.16024911e-02,  1.00417639e-02,
          3.05482258e-03,  8.01916713e-03,  1.57341692e-03,
          6.40415282e-03,  4.53622825e-03,  7.02783717e-03,
          1.66116031e-03,  8.61023903e-03,  6.36268170e-04,
          7.89014965e-03,  2.68605244e-03,  8.15070010e-03],
        [-1.33371406e-03,  5.16417118e-03,  6.10967759e-03,
         -4.63902590e-04,  5.75129310e-03, -2.40488927e-04,
          5.36242213e-03, -6.87316253e-04,  3.85945506e-03,
         -3.57996019e-05,  7.00427594e-03,  2.81049312e-04,
          7.03187048e-03, -3.52648516e-04,  6.05454991e-03],
        [-7.22434838e-03,  1.51492248e-02,  1.51343885e-02,
         -8.43939195e-04,  8.94625219e-03,  5.08825650e-04,
          6.56684525e-03, -2.19670404e-03,  8.96261445e-03,
          1.76575448e-05,  8.57497641e-03,  6.47542334e-04,
          7.16755947e-03, -6.12227244e-04,  8.90690877e-03]],

       [[ 9.97177703e-03,  1.37166441e-02,  1.25257409e-02,
          2.84793404e-03,  8.853292

In [5]:
class GaussianHMM_model(BaseEstimator):
    """
    Gaussian HMM fitted from several mean initializations, keeping the best fit.

    `fit` generates `n_init` sets of starting means with `init_centers`,
    fits the wrapped hmmlearn `GaussianHMM` once per set, and keeps the
    parameters of the fit with the highest `score(X)`.

    Parameters
    ----------
    n_components : number of hidden states.
    n_init : number of mean initializations to try.
    init : initialization strategy forwarded to `init_centers`.
    random_state : anything accepted by `check_random_state`; the resulting
        generator is shared by the initialization and the wrapped estimator.
    **kwargs : forwarded to `GaussianHMM`.

    Attributes set by `fit`
    -----------------------
    best_res : dict with keys "means_", "covars_", "transmat_", "startprob_".
    means_, covars_ : best means / covariances with length-1 axes squeezed out.
    transmat_ : best transition matrix.
    labels_ : `predict(X)` of the best model, cast to int32.
    proba_ : `predict_proba(X)` of the best model.
    """
    def __init__(self,
                 n_components = 2,
                 n_init = 10,
                 init = "k-means++",
                 random_state = None,
                 **kwargs
                ):
        self.n_components = n_components
        self.n_init = n_init
        self.init = init
        self.random_state = check_random_state(random_state)
        # init_params omits the means: they are assigned explicitly in fit().
        # NOTE(review): relies on hmmlearn leaving parameters not listed in
        # init_params untouched at the start of fit -- confirm.
        self.hmm_instance = GaussianHMM(n_components,
                                        covariance_type='full',
                                        init_params="sct",
                                        min_covar=1e-6,
                                        covars_prior=1e-6,
                                        random_state=self.random_state,
                                        **kwargs
                                       )

    def fit(self, X):
        """
        Fit from every initialization and keep the highest-scoring parameters.

        Returns `self`. Raises RuntimeError when no initialization yields a
        score greater than -inf (for example when every score is NaN).
        """
        n_init = self.n_init
        hmm_instance = self.hmm_instance
        # candidate starting means, one set per initialization
        centers = init_centers(X, self.n_components, n_init=n_init,
                               init=self.init, random_state=self.random_state)
        best_score = -np.inf
        best_res = None
        for i_i in range(n_init):
            # fit from this set of starting means
            hmm_instance.means_ = centers[i_i]
            hmm_instance.fit(X)
            score = hmm_instance.score(X)
            if score > best_score:
                best_score = score
                # copy, so later fits of the same estimator cannot alter the stored winner
                best_res = {"means_": np.array(hmm_instance.means_),
                            "covars_": np.array(hmm_instance.covars_),
                            "transmat_": np.array(hmm_instance.transmat_),
                            "startprob_": np.array(hmm_instance.startprob_)}
        if best_res is None:
            # previously surfaced as an UnboundLocalError on `best_res`
            raise RuntimeError(
                "GaussianHMM_model.fit: no initialization produced a score greater than -inf."
            )
        self.best_res = best_res
        # put the winning parameters back on the wrapped estimator before decoding
        hmm_instance.means_ = best_res["means_"]
        hmm_instance.covars_ = best_res["covars_"]
        hmm_instance.transmat_ = best_res["transmat_"]
        hmm_instance.startprob_ = best_res["startprob_"]
        # save res
        self.means_ = best_res["means_"].squeeze()
        self.covars_ = best_res["covars_"].squeeze()
        self.transmat_ = best_res["transmat_"]
        self.labels_ = hmm_instance.predict(X).astype(np.int32)
        self.proba_ = hmm_instance.predict_proba(X)
        return self

In [4]:
# Load a different data file and keep its first element; the last line displays its shape.
# NOTE: this rebinds X, which an earlier cell assigned from another file.
Xs = np.load(f"{path_data}/Xs_daily_1000_HMM.npy")
X=Xs[0]
X.shape

(1000, 1)

In [17]:
# Fit with the class defaults (2 components, 10 "k-means++" initializations).
hmm_model = GaussianHMM_model().fit(X)

In [18]:
# Inspect the means and covariances held by the wrapped hmmlearn estimator after fitting.
hmm_instance = hmm_model.hmm_instance
hmm_instance.means_, hmm_instance.covars_

(array([[-0.000471  ],
        [ 0.00077872]]),
 array([[[2.98940405e-04]],
 
        [[5.99465580e-05]]]))

In [19]:
def weighted_mean_vol(X, proba_):
    """
    Weighted mean and weighted mean squared deviation of X, per column of proba_.

    `X` is indexed by observation and `proba_` by (observation, column); each
    column of `proba_` supplies the weights for one output entry. Columns
    whose weights do not sum to a positive value are left as NaN in both
    outputs. The second output is a variance-type quantity (no square root
    is taken).

    Returns
    -------
    (means_, covars_) : two arrays with one entry per column of `proba_`.
    """
    n_states = proba_.shape[1]
    col_weight = proba_.sum(0)
    has_weight = col_weight > 0

    # weighted mean per column, only where the column carries weight
    means_ = np.full(n_states, np.nan)
    means_[has_weight] = (X @ proba_)[has_weight] / col_weight[has_weight]

    # squared deviation of every observation from every column mean, weighted and summed per column
    deviation = X[:, np.newaxis] - means_[np.newaxis, :]
    weighted_sq = (deviation**2 * proba_).sum(0)

    covars_ = np.full(n_states, np.nan)
    covars_[has_weight] = weighted_sq[has_weight] / col_weight[has_weight]
    return means_, covars_


In [20]:
# Recompute per-state mean and variance from the posterior probabilities,
# for comparison with the means_/covars_ of the fitted estimator.
weighted_mean_vol(X.squeeze(), hmm_instance.predict_proba(X))

(array([-0.00047099,  0.00077872]), array([2.98938303e-04, 5.99466408e-05]))

In [10]:
# Parameters of the best-scoring initialization, as stored by fit().
hmm_model.best_res

{'means_': array([[-0.00047146],
        [ 0.00077841]]),
 'covars_': array([[[2.99406369e-04]],
 
        [[6.01212346e-05]]]),
 'transmat_': array([[0.98265705, 0.01734295],
        [0.00831349, 0.99168651]]),
 'startprob_': array([1.29164052e-07, 9.99999871e-01])}

In [17]:
# Transition matrix currently held by the wrapped hmmlearn estimator.
hmm_model.hmm_instance.transmat_

array([[0.98265705, 0.01734295],
       [0.00831349, 0.99168651]])

In [18]:
# Covariances currently held by the wrapped hmmlearn estimator.
hmm_instance = hmm_model.hmm_instance
hmm_instance.covars_

array([[[2.99406369e-04]],

       [[6.01212346e-05]]])