In [None]:
pip install -r hmmlearn -t .

In [None]:
#util.py 
import numpy as np
from scipy import special


def normalize(a, axis=None):
    a_sum = a.sum(axis)
    if axis and a.ndim > 1:
        a_sum[a_sum == 0] = 1
        shape = list(a.shape)
        shape[axis] = 1
        a_sum.shape = shape

    a /= a_sum


def log_normalize(a, axis=None):
    if axis is not None and a.shape[axis] == 1:
        a[:] = 0
    else:
        with np.errstate(under="ignore"):
            a_lse = special.logsumexp(a, axis, keepdims=True)
        a -= a_lse


def fill_covars(covars, covariance_type='full', n_components=1, n_features=1):
    if covariance_type == 'full':
        return covars
    elif covariance_type == 'diag':
        return np.array(list(map(np.diag, covars)))
    elif covariance_type == 'tied':
        return np.tile(covars, (n_components, 1, 1))
    elif covariance_type == 'spherical':
        covars = np.ravel(covars)
        eye = np.eye(n_features)[np.newaxis, :, :]
        covars = covars[:, np.newaxis, np.newaxis]
        return eye * covars

In [None]:
#base.py

import logging
import string
import sys
from collections import deque

import numpy as np
from scipy import linalg, special
from sklearn.base import BaseEstimator
from sklearn.utils.validation import (
    check_array, check_is_fitted, check_random_state)

from . import _hmmc, _kl_divergence as _kl, _utils
from .utils import normalize, log_normalize


_log = logging.getLogger(__name__)
#: Supported decoder algorithms.
DECODER_ALGORITHMS = frozenset(("viterbi", "map"))


class ConvergenceMonitor:


    _template = "{iter:>10d} {log_prob:>16.8f} {delta:>+16.8f}"

    def __init__(self, tol, n_iter, verbose):
        
        self.tol = tol
        self.n_iter = n_iter
        self.verbose = verbose
        self.history = deque()
        self.iter = 0

    def __repr__(self):
        class_name = self.__class__.__name__
        params = sorted(dict(vars(self), history=list(self.history)).items())
        return ("{}(\n".format(class_name)
                + "".join(map("    {}={},\n".format, *zip(*params)))
                + ")")

    def _reset(self):
        self.iter = 0
        self.history.clear()

    def report(self, log_prob):
        
        if self.verbose:
            delta = log_prob - self.history[-1] if self.history else np.nan
            message = self._template.format(
                iter=self.iter + 1, log_prob=log_prob, delta=delta)
            print(message, file=sys.stderr)

        # Allow for some wiggleroom based on precision.
        precision = np.finfo(float).eps ** (1/2)
        if self.history and (log_prob - self.history[-1]) < -precision:
            delta = log_prob - self.history[-1]
            _log.warning(f"Model is not converging.  Current: {log_prob}"
                         f" is not greater than {self.history[-1]}."
                         f" Delta is {delta}")
        self.history.append(log_prob)
        self.iter += 1

    @property
    def converged(self):
        """Whether the EM algorithm converged."""
        # XXX we might want to check that ``log_prob`` is non-decreasing.
        return (self.iter == self.n_iter or
                (len(self.history) >= 2 and
                 self.history[-1] - self.history[-2] < self.tol))


class _AbstractHMM(BaseEstimator):

    def __init__(self, n_components, algorithm, random_state, n_iter,
                 tol, verbose, params, init_params, implementation):
       

        self.n_components = n_components
        self.params = params
        self.init_params = init_params
        self.algorithm = algorithm
        self.n_iter = n_iter
        self.tol = tol
        self.verbose = verbose
        self.implementation = implementation
        self.random_state = random_state

    def score_samples(self, X, lengths=None):
        return self._score(X, lengths, compute_posteriors=True)

    def score(self, X, lengths=None):
        return self._score(X, lengths, compute_posteriors=False)[0]

    def _score(self, X, lengths=None, *, compute_posteriors):
        check_is_fitted(self, "startprob_")
        self._check()

        X = check_array(X)
        impl = {
            "scaling": self._score_scaling,
            "log": self._score_log,
        }[self.implementation]
        return impl(
            X=X, lengths=lengths, compute_posteriors=compute_posteriors)

    def _score_log(self, X, lengths=None, *, compute_posteriors):
        log_prob = 0
        sub_posteriors = [np.empty((0, self.n_components))]
        for sub_X in _utils.split_X_lengths(X, lengths):
            log_frameprob = self._compute_log_likelihood(sub_X)
            log_probij, fwdlattice = _hmmc.forward_log(
                self.startprob_, self.transmat_, log_frameprob)
            log_prob += log_probij
            if compute_posteriors:
                bwdlattice = _hmmc.backward_log(
                    self.startprob_, self.transmat_, log_frameprob)
                sub_posteriors.append(
                    self._compute_posteriors_log(fwdlattice, bwdlattice))
        return log_prob, np.concatenate(sub_posteriors)

    def _score_scaling(self, X, lengths=None, *, compute_posteriors):
        log_prob = 0
        sub_posteriors = [np.empty((0, self.n_components))]
        for sub_X in _utils.split_X_lengths(X, lengths):
            frameprob = self._compute_likelihood(sub_X)
            log_probij, fwdlattice, scaling_factors = _hmmc.forward_scaling(
                self.startprob_, self.transmat_, frameprob)
            log_prob += log_probij
            if compute_posteriors:
                bwdlattice = _hmmc.backward_scaling(
                    self.startprob_, self.transmat_,
                    frameprob, scaling_factors)
                sub_posteriors.append(
                    self._compute_posteriors_scaling(fwdlattice, bwdlattice))

        return log_prob, np.concatenate(sub_posteriors)

    def _decode_viterbi(self, X):
        log_frameprob = self._compute_log_likelihood(X)
        return _hmmc.viterbi(self.startprob_, self.transmat_, log_frameprob)

    def _decode_map(self, X):
        _, posteriors = self.score_samples(X)
        log_prob = np.max(posteriors, axis=1).sum()
        state_sequence = np.argmax(posteriors, axis=1)
        return log_prob, state_sequence

    def decode(self, X, lengths=None, algorithm=None):
        check_is_fitted(self, "startprob_")
        self._check()

        algorithm = algorithm or self.algorithm
        if algorithm not in DECODER_ALGORITHMS:
            raise ValueError(f"Unknown decoder {algorithm!r}")

        decoder = {
            "viterbi": self._decode_viterbi,
            "map": self._decode_map
        }[algorithm]

        X = check_array(X)
        log_prob = 0
        sub_state_sequences = []
        for sub_X in _utils.split_X_lengths(X, lengths):
            # XXX decoder works on a single sample at a time!
            sub_log_prob, sub_state_sequence = decoder(sub_X)
            log_prob += sub_log_prob
            sub_state_sequences.append(sub_state_sequence)

        return log_prob, np.concatenate(sub_state_sequences)

    def predict(self, X, lengths=None):
        _, state_sequence = self.decode(X, lengths)
        return state_sequence

    def predict_proba(self, X, lengths=None):
        _, posteriors = self.score_samples(X, lengths)
        return posteriors

    def sample(self, n_samples=1, random_state=None, currstate=None):
        check_is_fitted(self, "startprob_")
        self._check()

        if random_state is None:
            random_state = self.random_state
        random_state = check_random_state(random_state)

        transmat_cdf = np.cumsum(self.transmat_, axis=1)

        if currstate is None:
            startprob_cdf = np.cumsum(self.startprob_)
            currstate = (startprob_cdf > random_state.rand()).argmax()

        state_sequence = [currstate]
        X = [self._generate_sample_from_state(
            currstate, random_state=random_state)]

        for t in range(n_samples - 1):
            currstate = (
                (transmat_cdf[currstate] > random_state.rand()).argmax())
            state_sequence.append(currstate)
            X.append(self._generate_sample_from_state(
                currstate, random_state=random_state))

        return np.atleast_2d(X), np.array(state_sequence, dtype=int)

    

    def _fit_scaling(self, X):
        raise NotImplementedError("Must be overridden in subclass")

    def _fit_log(self, X):
        raise NotImplementedError("Must be overridden in subclass")

    def _compute_posteriors_scaling(self, fwdlattice, bwdlattice):
        posteriors = fwdlattice * bwdlattice
        normalize(posteriors, axis=1)
        return posteriors

    def _compute_posteriors_log(self, fwdlattice, bwdlattice):
        log_gamma = fwdlattice + bwdlattice
        log_normalize(log_gamma, axis=1)
        with np.errstate(under="ignore"):
            return np.exp(log_gamma)

    def _needs_init(self, code, name):
        if code in self.init_params:
            if hasattr(self, name):
                _log.warning(
                    "Even though the %r attribute is set, it will be "
                    "overwritten during initialization because 'init_params' "
                    "contains %r", name, code)
            return True
        if not hasattr(self, name):
            return True
        return False

    def _check_and_set_n_features(self, X):
        _, n_features = X.shape
        if hasattr(self, "n_features"):
            if self.n_features != n_features:
                raise ValueError(
                    f"Unexpected number of dimensions, got {n_features} but "
                    f"expected {self.n_features}")
        else:
            self.n_features = n_features

    def _get_n_fit_scalars_per_param(self):
        raise NotImplementedError("Must be overridden in subclass")

    def _check_sum_1(self, name):
        """Check that an array describes one or more distributions."""
        s = getattr(self, name).sum(axis=-1)
        if not np.allclose(s, 1):
            raise ValueError(
                f"{name} must sum to 1 (got {s:.4f})" if s.ndim == 0 else
                f"{name} rows must sum to 1 (got {s})" if s.ndim == 1 else
                "Expected 1D or 2D array")

    def _check(self):
        raise NotImplementedError("Must be overridden in subclass")

    def _compute_likelihood(self, X):
        if (self._compute_log_likelihood  # prevent recursion
                != __class__._compute_log_likelihood.__get__(self)):
            # Probabilities equal to zero do occur, and exp(-LARGE) = 0 is OK.
            with np.errstate(under="ignore"):
                return np.exp(self._compute_log_likelihood(X))
        else:
            raise NotImplementedError("Must be overridden in subclass")

    def _compute_log_likelihood(self, X):
        if (self._compute_likelihood  # prevent recursion
                != __class__._compute_likelihood.__get__(self)):
            # Probabilities equal to zero do occur, and log(0) = -inf is OK.
            likelihood = self._compute_likelihood(X)
            with np.errstate(divide="ignore"):
                return np.log(likelihood)
        else:
            raise NotImplementedError("Must be overridden in subclass")

    def _generate_sample_from_state(self, state, random_state):
        return ()

    def _initialize_sufficient_statistics(self):
        stats = {'nobs': 0,
                 'start': np.zeros(self.n_components),
                 'trans': np.zeros((self.n_components, self.n_components))}
        return stats

    def _accumulate_sufficient_statistics(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        impl = {
            "scaling": self._accumulate_sufficient_statistics_scaling,
            "log": self._accumulate_sufficient_statistics_log,
        }[self.implementation]

        return impl(stats=stats, X=X, lattice=lattice, posteriors=posteriors,
                    fwdlattice=fwdlattice, bwdlattice=bwdlattice)

    def _accumulate_sufficient_statistics_scaling(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        stats['nobs'] += 1
        if 's' in self.params:
            stats['start'] += posteriors[0]
        if 't' in self.params:
            n_samples, n_components = lattice.shape
            # when the sample is of length 1, it contains no transitions
            # so there is no reason to update our trans. matrix estimate
            if n_samples <= 1:
                return
            xi_sum = _hmmc.compute_scaling_xi_sum(
                fwdlattice, self.transmat_, bwdlattice, lattice)
            stats['trans'] += xi_sum

    def _accumulate_sufficient_statistics_log(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        stats['nobs'] += 1
        if 's' in self.params:
            stats['start'] += posteriors[0]
        if 't' in self.params:
            n_samples, n_components = lattice.shape
            # when the sample is of length 1, it contains no transitions
            # so there is no reason to update our trans. matrix estimate
            if n_samples <= 1:
                return
            log_xi_sum = _hmmc.compute_log_xi_sum(
                fwdlattice, self.transmat_, bwdlattice, lattice)
            with np.errstate(under="ignore"):
                stats['trans'] += np.exp(log_xi_sum)



    def _do_estep(self, X, lengths):
        impl = {
            "scaling": self._fit_scaling,
            "log": self._fit_log,
        }[self.implementation]

        stats = self._initialize_sufficient_statistics()
        self._estep_begin()
        curr_logprob = 0
        for sub_X in _utils.split_X_lengths(X, lengths):
            lattice, logprob, posteriors, fwdlattice, bwdlattice = impl(sub_X)
            # Derived HMM classes will implement the following method to
            # update their probability distributions, so keep
            # a single call to this method for simplicity.
            self._accumulate_sufficient_statistics(
                stats, sub_X, lattice, posteriors, fwdlattice,
                bwdlattice)
            curr_logprob += logprob
        return stats, curr_logprob

    def _estep_begin(self):
        pass

    def _compute_lower_bound(self, curr_logprob):
        raise NotImplementedError("Must be overridden in subclass")


class BaseHMM(_AbstractHMM):
    
    def __init__(self, n_components=1,
                 startprob_prior=1.0, transmat_prior=1.0,
                 algorithm="viterbi", random_state=None,
                 n_iter=10, tol=1e-2, verbose=False,
                 params=string.ascii_letters,
                 init_params=string.ascii_letters,
                 implementation="log"):

        super().__init__(
            n_components=n_components, algorithm=algorithm,
            random_state=random_state, n_iter=n_iter, tol=tol,
            verbose=verbose, params=params, init_params=init_params,
            implementation=implementation)
        self.startprob_prior = startprob_prior
        self.transmat_prior = transmat_prior
        self.monitor_ = ConvergenceMonitor(self.tol, self.n_iter, self.verbose)

    def get_stationary_distribution(self):
        """Compute the stationary distribution of states."""
        # The stationary distribution is proportional to the left-eigenvector
        # associated with the largest eigenvalue (i.e., 1) of the transition
        # matrix.
        check_is_fitted(self, "transmat_")
        eigvals, eigvecs = linalg.eig(self.transmat_.T)
        eigvec = np.real_if_close(eigvecs[:, np.argmax(eigvals)])
        return eigvec / eigvec.sum()

    def _fit_scaling(self, X):
        frameprob = self._compute_likelihood(X)
        log_prob, fwdlattice, scaling_factors = _hmmc.forward_scaling(
            self.startprob_, self.transmat_, frameprob)
        bwdlattice = _hmmc.backward_scaling(
            self.startprob_, self.transmat_, frameprob, scaling_factors)
        posteriors = self._compute_posteriors_scaling(fwdlattice, bwdlattice)
        return frameprob, log_prob, posteriors, fwdlattice, bwdlattice

    def _fit_log(self, X):
        log_frameprob = self._compute_log_likelihood(X)
        log_prob, fwdlattice = _hmmc.forward_log(
            self.startprob_, self.transmat_, log_frameprob)
        bwdlattice = _hmmc.backward_log(
            self.startprob_, self.transmat_, log_frameprob)
        posteriors = self._compute_posteriors_log(fwdlattice, bwdlattice)
        return log_frameprob, log_prob, posteriors, fwdlattice, bwdlattice

    def _do_mstep(self, stats):
        if 's' in self.params:
            startprob_ = np.maximum(self.startprob_prior - 1 + stats['start'],
                                    0)
            self.startprob_ = np.where(self.startprob_ == 0, 0, startprob_)
            normalize(self.startprob_)
        if 't' in self.params:
            transmat_ = np.maximum(self.transmat_prior - 1 + stats['trans'], 0)
            self.transmat_ = np.where(self.transmat_ == 0, 0, transmat_)
            normalize(self.transmat_, axis=1)

    def _compute_lower_bound(self, curr_logprob):
        return curr_logprob

    def _init(self, X, lengths=None):
        self._check_and_set_n_features(X)
        init = 1. / self.n_components
        random_state = check_random_state(self.random_state)
        if self._needs_init("s", "startprob_"):
            self.startprob_ = random_state.dirichlet(
                np.full(self.n_components, init))
        if self._needs_init("t", "transmat_"):
            self.transmat_ = random_state.dirichlet(
                np.full(self.n_components, init), size=self.n_components)
        n_fit_scalars_per_param = self._get_n_fit_scalars_per_param()
        if n_fit_scalars_per_param is not None:
            n_fit_scalars = sum(
                n_fit_scalars_per_param[p] for p in self.params)
            if X.size < n_fit_scalars:
                _log.warning(
                    "Fitting a model with %d free scalar parameters with only "
                    "%d data points will result in a degenerate solution.",
                    n_fit_scalars, X.size)

    def _check_sum_1(self, name):
        """Check that an array describes one or more distributions."""
        s = getattr(self, name).sum(axis=-1)
        if not np.allclose(s, 1):
            raise ValueError(
                f"{name} must sum to 1 (got {s:.4f})" if s.ndim == 0 else
                f"{name} rows must sum to 1 (got {s})" if s.ndim == 1 else
                "Expected 1D or 2D array")

    def _check(self):
        self.startprob_ = np.asarray(self.startprob_)
        if len(self.startprob_) != self.n_components:
            raise ValueError("startprob_ must have length n_components")
        self._check_sum_1("startprob_")

        self.transmat_ = np.asarray(self.transmat_)
        if self.transmat_.shape != (self.n_components, self.n_components):
            raise ValueError(
                "transmat_ must have shape (n_components, n_components)")
        self._check_sum_1("transmat_")

    def aic(self, X, lengths=None):
        n_params = sum(self._get_n_fit_scalars_per_param().values())
        return -2 * self.score(X, lengths=lengths) + 2 * n_params

    def bic(self, X, lengths=None):
        n_params = sum(self._get_n_fit_scalars_per_param().values())
        return -2 * self.score(X, lengths=lengths) + n_params * np.log(len(X))

_BaseHMM = BaseHMM

In [None]:
import logging

import numpy as np
from scipy import linalg
from sklearn import cluster
from sklearn.utils import check_random_state

from . import _emissions, _utils
from .base import BaseHMM
from .utils import fill_covars, normalize


__all__ = [
    "GMMHMM", "GaussianHMM", "CategoricalHMM", "MultinomialHMM", "PoissonHMM",
]


_log = logging.getLogger(__name__)
COVARIANCE_TYPES = frozenset(("spherical", "diag", "full", "tied"))


class GMMHMM(_emissions.BaseGMMHMM):
    def __init__(self, n_components=1, n_mix=1,
                 min_covar=1e-3, startprob_prior=1.0, transmat_prior=1.0,
                 weights_prior=1.0, means_prior=0.0, means_weight=0.0,
                 covars_prior=None, covars_weight=None,
                 algorithm="viterbi", covariance_type="diag",
                 random_state=None, n_iter=10, tol=1e-2,
                 verbose=False, params="stmcw",
                 init_params="stmcw",
                 implementation="log"):

        BaseHMM.__init__(self, n_components,
                         startprob_prior=startprob_prior,
                         transmat_prior=transmat_prior,
                         algorithm=algorithm, random_state=random_state,
                         n_iter=n_iter, tol=tol, verbose=verbose,
                         params=params, init_params=init_params,
                         implementation=implementation)
        self.covariance_type = covariance_type
        self.min_covar = min_covar
        self.n_mix = n_mix
        self.weights_prior = weights_prior
        self.means_prior = means_prior
        self.means_weight = means_weight
        self.covars_prior = covars_prior
        self.covars_weight = covars_weight

    def _init(self, X, lengths=None):
        super()._init(X, lengths=None)
        nc = self.n_components
        nf = self.n_features
        nm = self.n_mix

        def compute_cv():
            return np.cov(X.T) + self.min_covar * np.eye(nf)

        # Default values for covariance prior parameters
        self._init_covar_priors()
        self._fix_priors_shape()

        main_kmeans = cluster.KMeans(n_clusters=nc,
                                     random_state=self.random_state,
                                     n_init=10)  # sklearn >=1.2 compat.
        cv = None  # covariance matrix
        labels = main_kmeans.fit_predict(X)
        main_centroid = np.mean(main_kmeans.cluster_centers_, axis=0)
        means = []
        for label in range(nc):
            kmeans = cluster.KMeans(n_clusters=nm,
                                    random_state=self.random_state,
                                    n_init=10)  # sklearn >=1.2 compat.
            X_cluster = X[np.where(labels == label)]
            if X_cluster.shape[0] >= nm:
                kmeans.fit(X_cluster)
                means.append(kmeans.cluster_centers_)
            else:
                if cv is None:
                    cv = compute_cv()
                m_cluster = np.random.multivariate_normal(main_centroid,
                                                          cov=cv,
                                                          size=nm)
                means.append(m_cluster)

        if self._needs_init("w", "weights_"):
            self.weights_ = np.full((nc, nm), 1 / nm)

        if self._needs_init("m", "means_"):
            self.means_ = np.stack(means)

        if self._needs_init("c", "covars_"):
            if cv is None:
                cv = compute_cv()
            if not cv.shape:
                cv.shape = (1, 1)
            if self.covariance_type == 'tied':
                self.covars_ = np.zeros((nc, nf, nf))
                self.covars_[:] = cv
            elif self.covariance_type == 'full':
                self.covars_ = np.zeros((nc, nm, nf, nf))
                self.covars_[:] = cv
            elif self.covariance_type == 'diag':
                self.covars_ = np.zeros((nc, nm, nf))
                self.covars_[:] = np.diag(cv)
            elif self.covariance_type == 'spherical':
                self.covars_ = np.zeros((nc, nm))
                self.covars_[:] = cv.mean()

    def _init_covar_priors(self):
        if self.covariance_type == "full":
            if self.covars_prior is None:
                self.covars_prior = 0.0
            if self.covars_weight is None:
                self.covars_weight = -(1.0 + self.n_features + 1.0)
        elif self.covariance_type == "tied":
            if self.covars_prior is None:
                self.covars_prior = 0.0
            if self.covars_weight is None:
                self.covars_weight = -(self.n_mix + self.n_features + 1.0)
        elif self.covariance_type == "diag":
            if self.covars_prior is None:
                self.covars_prior = -1.5
            if self.covars_weight is None:
                self.covars_weight = 0.0
        elif self.covariance_type == "spherical":
            if self.covars_prior is None:
                self.covars_prior = -(self.n_mix + 2.0) / 2.0
            if self.covars_weight is None:
                self.covars_weight = 0.0

    def _fix_priors_shape(self):
        nc = self.n_components
        nf = self.n_features
        nm = self.n_mix
        self.weights_prior = np.broadcast_to(
            self.weights_prior, (nc, nm)).copy()
        self.means_prior = np.broadcast_to(
            self.means_prior, (nc, nm, nf)).copy()
        self.means_weight = np.broadcast_to(
            self.means_weight, (nc, nm)).copy()

        if self.covariance_type == "full":
            self.covars_prior = np.broadcast_to(
                self.covars_prior, (nc, nm, nf, nf)).copy()
            self.covars_weight = np.broadcast_to(
                self.covars_weight, (nc, nm)).copy()
        elif self.covariance_type == "tied":
            self.covars_prior = np.broadcast_to(
                self.covars_prior, (nc, nf, nf)).copy()
            self.covars_weight = np.broadcast_to(
                self.covars_weight, nc).copy()
        elif self.covariance_type == "diag":
            self.covars_prior = np.broadcast_to(
                self.covars_prior, (nc, nm, nf)).copy()
            self.covars_weight = np.broadcast_to(
                self.covars_weight, (nc, nm, nf)).copy()
        elif self.covariance_type == "spherical":
            self.covars_prior = np.broadcast_to(
                self.covars_prior, (nc, nm)).copy()
            self.covars_weight = np.broadcast_to(
                self.covars_weight, (nc, nm)).copy()

    def _check(self):
        super()._check()
        if not hasattr(self, "n_features"):
            self.n_features = self.means_.shape[2]
        nc = self.n_components
        nf = self.n_features
        nm = self.n_mix

        self._init_covar_priors()
        self._fix_priors_shape()

        # Checking covariance type
        if self.covariance_type not in COVARIANCE_TYPES:
            raise ValueError(
                f"covariance_type must be one of {COVARIANCE_TYPES}")

        self.weights_ = np.array(self.weights_)
        # Checking mixture weights' shape
        if self.weights_.shape != (nc, nm):
            raise ValueError(
                f"weights_ must have shape (n_components, n_mix), "
                f"actual shape: {self.weights_.shape}")

        # Checking mixture weights' mathematical correctness
        self._check_sum_1("weights_")

        # Checking means' shape
        self.means_ = np.array(self.means_)
        if self.means_.shape != (nc, nm, nf):
            raise ValueError(
                f"means_ must have shape (n_components, n_mix, n_features), "
                f"actual shape: {self.means_.shape}")

        # Checking covariances' shape
        self.covars_ = np.array(self.covars_)
        covars_shape = self.covars_.shape
        needed_shapes = {
            "spherical": (nc, nm),
            "tied": (nc, nf, nf),
            "diag": (nc, nm, nf),
            "full": (nc, nm, nf, nf),
        }
        needed_shape = needed_shapes[self.covariance_type]
        if covars_shape != needed_shape:
            raise ValueError(
                f"{self.covariance_type!r} mixture covars must have shape "
                f"{needed_shape}, actual shape: {covars_shape}")

        # Checking covariances' mathematical correctness
        if (self.covariance_type == "spherical" or
                self.covariance_type == "diag"):
            if np.any(self.covars_ < 0):
                raise ValueError(f"{self.covariance_type!r} mixture covars "
                                 f"must be non-negative")
            if np.any(self.covars_ == 0):
                _log.warning("Degenerate mixture covariance")
        elif self.covariance_type == "tied":
            for i, covar in enumerate(self.covars_):
                if not np.allclose(covar, covar.T):
                    raise ValueError(
                        f"Covariance of state #{i} is not symmetric")
                min_eigvalsh = linalg.eigvalsh(covar).min()
                if min_eigvalsh < 0:
                    raise ValueError(
                        f"Covariance of state #{i} is not positive definite")
                if min_eigvalsh == 0:
                    _log.warning("Covariance of state #%d has a null "
                                 "eigenvalue.", i)
        elif self.covariance_type == "full":
            for i, mix_covars in enumerate(self.covars_):
                for j, covar in enumerate(mix_covars):
                    if not np.allclose(covar, covar.T):
                        raise ValueError(
                            f"Covariance of state #{i}, mixture #{j} is not "
                            f"symmetric")
                    min_eigvalsh = linalg.eigvalsh(covar).min()
                    if min_eigvalsh < 0:
                        raise ValueError(
                            f"Covariance of state #{i}, mixture #{j} is not "
                            f"positive definite")
                    if min_eigvalsh == 0:
                        _log.warning("Covariance of state #%d, mixture #%d "
                                     "has a null eigenvalue.", i, j)

    def _do_mstep(self, stats):
        super()._do_mstep(stats)
        nf = self.n_features
        nm = self.n_mix

        # Maximizing weights
        if 'w' in self.params:
            alphas_minus_one = self.weights_prior - 1
            w_n = stats['post_mix_sum'] + alphas_minus_one
            w_d = (stats['post_sum'] + alphas_minus_one.sum(axis=1))[:, None]
            self.weights_ = w_n / w_d

        # Maximizing means
        if 'm' in self.params:
            m_n = stats['m_n']
            m_d = stats['post_mix_sum'] + self.means_weight
            
            m_d[(self.weights_ == 0) & (m_n == 0).all(axis=-1)] = 1
            self.means_ = m_n / m_d[:, :, None]

        # Maximizing covariances
        if 'c' in self.params:
            lambdas, mus = self.means_weight, self.means_prior
            centered_means = self.means_ - mus

            def outer_f(x):  # Outer product over features.
                return x[..., :, None] * x[..., None, :]

            if self.covariance_type == 'full':
                centered_means_dots = outer_f(centered_means)

                psis_t = np.transpose(self.covars_prior, axes=(0, 1, 3, 2))
                nus = self.covars_weight

                c_n = psis_t + lambdas[:, :, None, None] * centered_means_dots
                c_n += stats['c_n']
                c_d = (
                    stats['post_mix_sum'] + 1 + nus + nf + 1
                )[:, :, None, None]

            elif self.covariance_type == 'diag':
                alphas = self.covars_prior
                betas = self.covars_weight
                centered_means2 = centered_means ** 2

                c_n = lambdas[:, :, None] * centered_means2 + 2 * betas
                c_n += stats['c_n']
                c_d = stats['post_mix_sum'][:, :, None] + 1 + 2 * (alphas + 1)

            elif self.covariance_type == 'spherical':
                centered_means_norm2 = np.einsum(  # Faster than (x**2).sum(-1)
                    '...i,...i', centered_means, centered_means)

                alphas = self.covars_prior
                betas = self.covars_weight

                c_n = lambdas * centered_means_norm2 + 2 * betas
                c_n += stats['c_n']
                c_d = nf * (stats['post_mix_sum'] + 1) + 2 * (alphas + 1)

            elif self.covariance_type == 'tied':
                centered_means_dots = outer_f(centered_means)

                psis_t = np.transpose(self.covars_prior, axes=(0, 2, 1))
                nus = self.covars_weight

                c_n = np.einsum('ij,ijkl->ikl',
                                lambdas, centered_means_dots) + psis_t
                c_n += stats['c_n']
                c_d = (stats['post_sum'] + nm + nus + nf + 1)[:, None, None]

            self.covars_ = c_n / c_d

In [None]:


import numpy as np
from scipy.special import gammaln, digamma

from . import _utils


def kl_dirichlet(q, p):

    q = np.asarray(q)
    p = np.asarray(p)
    qsum = q.sum()
    psum = p.sum()
    return (gammaln(qsum) - gammaln(psum)
            - np.sum(gammaln(q) - gammaln(p))
            + np.einsum("i,i->", (q - p), (digamma(q) - digamma(qsum))))


def kl_normal_distribution(mean_q, variance_q, mean_p, variance_p):
    result = ((np.log(variance_p / variance_q)) / 2
              + ((mean_q - mean_p)**2 + variance_q) / (2 * variance_p)
              - .5)
    assert result >= 0, result
    return result


def kl_multivariate_normal_distribution(mean_q, covar_q, mean_p, covar_p):
    # Ensure arrays
    mean_q = np.asarray(mean_q)
    covar_q = np.asarray(covar_q)
    mean_p = np.asarray(mean_p)
    covar_p = np.asarray(covar_p)

    # Need the precision of distribution p
    precision_p = np.linalg.inv(covar_p)

    mean_diff = mean_q - mean_p
    D = mean_q.shape[0]

    # These correspond to the four terms in the ~wpenny paper documented above
    return .5 * (_utils.logdet(covar_p) - _utils.logdet(covar_q)
                 + np.trace(precision_p @ covar_q)
                 + mean_diff @ precision_p @ mean_diff
                 - D)


def kl_gamma_distribution(b_q, c_q, b_p, c_p):

    result = ((b_q - b_p) * digamma(b_q)
              - gammaln(b_q) + gammaln(b_p)
              + b_p * (np.log(c_q) - np.log(c_p))
              + b_q * (c_p-c_q) / c_q)
    assert result >= 0, result
    return result


def kl_wishart_distribution(dof_q, scale_q, dof_p, scale_p):
    
    scale_q = np.asarray(scale_q)
    scale_p = np.asarray(scale_p)
    D = scale_p.shape[0]
    return ((dof_q - dof_p)/2 * _E(dof_q, scale_q)
            - D * dof_q / 2
            + dof_q / 2 * np.trace(scale_p @ np.linalg.inv(scale_q))
            # Division of logarithm turned into subtraction here
            + _logZ(dof_p, scale_p)
            - _logZ(dof_q, scale_q))


def _E(dof, scale):
    r"""
    $L(a, B) = \int \mathcal{Wishart}(\Gamma; a, B) \log |\Gamma| d\Gamma$
    """
    return (-_utils.logdet(scale / 2)
            + digamma((dof - np.arange(scale.shape[0])) / 2).sum())


def _logZ(dof, scale):
    D = scale.shape[0]
    return ((D * (D - 1) / 4) * np.log(np.pi)
            - dof / 2 * _utils.logdet(scale / 2)
            + gammaln((dof - np.arange(scale.shape[0])) / 2).sum())