diff --git a/scopyon/analysis/hmm.py b/scopyon/analysis/hmm.py
index 384abd6..cbe4cab 100644
--- a/scopyon/analysis/hmm.py
+++ b/scopyon/analysis/hmm.py
@@ -183,9 +183,9 @@ class PTHMM(_BaseHMM):
     r"""Hidden Markov Model for Particle Tracking.

     Args:
-        n_components (int): Number of states.
+        n_diffusivities (int): Number of diffusivity states.
         n_oligomers (int): Number of oligomeric states.
-            n_components must be divisible by n_oligomers.
+            n_components is equal to (n_diffusivities * n_oligomers).
         min_var (float, optional):
             Floor on the variance to prevent overfitting. Defaults to 1e-5.
         startprob_prior (array, optional):
@@ -225,7 +225,7 @@ class PTHMM(_BaseHMM):
             Initial state occupation distribution.
         transmat\_ (array): shape (n_components, n_components).
            Matrix of transition probabilities between states.
-        diffusivities\_ (array): shape (n_components, 1).
+        diffusivities\_ (array): shape (n_diffusivities, 1).
             Diffusion constants for each state.
         intensity_means\_ (array): shape (1, 1).
             Base mean parameter of intensity distributions.
@@ -233,53 +233,55 @@ class PTHMM(_BaseHMM):
             Base Variance parameter of intensity distributions.
     """

-    def __init__(self, n_components=1, n_oligomers=1,
+    def __init__(self, n_diffusivities=3, n_oligomers=4,
                  min_var=1e-5,
                  startprob_prior=1.0, transmat_prior=1.0,
                  algorithm="viterbi", random_state=None,
                  n_iter=10, tol=1e-2, verbose=False,
                  params="stdmv", init_params="stdmv"):
-        _BaseHMM.__init__(self, n_components,
+        _BaseHMM.__init__(self, n_diffusivities * n_oligomers,
                           startprob_prior=startprob_prior,
                           transmat_prior=transmat_prior, algorithm=algorithm,
                           random_state=random_state, n_iter=n_iter,
                           tol=tol, params=params, verbose=verbose,
                           init_params=init_params)
+        self.covariance_type = covariance_type
         self.min_var = min_var
+        self.n_diffusivities = n_diffusivities
         self.n_oligomers = n_oligomers
-        assert self.n_components % self.n_oligomers == 0
+        assert self.n_components == self.n_diffusivities * self.n_oligomers

     def _check(self):
         super()._check()

-        self.diffusivities_ = np.asarray(self.diffusivities_)
-        assert self.diffusivities_.shape == (self.n_components, 1)
-        self.intensity_means_ = np.asarray(self.intensity_means_)
+        self.diffusivities_ = numpy.asarray(self.diffusivities_)
+        assert self.diffusivities_.shape == (self.n_diffusivities, 1)
+        self.intensity_means_ = numpy.asarray(self.intensity_means_)
         assert self.intensity_means_.shape == (1, 1)
-        self.intensity_vars_ = np.asarray(self.intensity_vars_)
+        self.intensity_vars_ = numpy.asarray(self.intensity_vars_)
         assert self.intensity_vars_.shape == (1, 1)
-        self.n_features = 1
+
+        self.n_features = 2

     def _generate_sample_from_state(self, state, random_state=None):
-        D = self.diffusivities_[state]
+        m = state // self.n_oligomers
         n = state % self.n_oligomers
         mean = self.intensity_means_[0] * (n + 1)
         var = self.intensity_vars_[0] * (n + 1)

-        return np.hstack([
-            np.sqrt(np.power(random_state.normal(scale=np.sqrt(2 * D), size=2), 2).sum(keepdims=True)),
-            random_state.normal(loc=mean, scale=np.sqrt(var), size=(1, )),
+        D = self.diffusivities_[m]
+        return numpy.hstack([
+            numpy.sqrt(numpy.power(random_state.normal(scale=numpy.sqrt(2 * D), size=2), 2).sum(keepdims=True)),
+            random_state.normal(loc=mean, scale=numpy.sqrt(var), size=(1, )),
            ])

     def _get_n_fit_scalars_per_param(self):
-        nc = self.n_components
-        nf = self.n_features
         return {
-            "s": nc - 1,
-            "t": nc * (nc - 1),
-            "d": nc * nf,
-            "m": 1 * nf,
-            "v": 1 * nf,
+            "s": self.n_components - 1,
+            "t": self.n_components * (self.n_components - 1),
+            "d": self.n_diffusivities,
+            "m": 1,
+            "v": 1,
         }

     def _init(self, X, lengths=None):
@@ -287,49 +289,48 @@ def _init(self, X, lengths=None):
         super()._init(X, lengths=lengths)

         _, n_features = X.shape
+        assert n_features == 2
         if hasattr(self, 'n_features') and self.n_features != n_features:
             raise ValueError('Unexpected number of dimensions, got %s but '
                              'expected %s' % (n_features, self.n_features))
-        self.n_features = n_features

         if 'd' in self.init_params or not hasattr(self, "diffusivities_"):
-            diffusivity_means = np.mean(X[:, [0]], axis=0) * 0.25
-            variations = np.arange(1, self.n_components + 1)
+            diffusivity_means = numpy.mean(X[:, [0]], axis=0) * 0.25
+            variations = numpy.arange(1, self.n_diffusivities + 1)
             variations = variations / variations.sum()
-            self.diffusivities_ = diffusivity_means * variations[:, np.newaxis]
+            self.diffusivities_ = diffusivity_means * variations[:, numpy.newaxis]

         if 'm' in self.init_params or not hasattr(self, "intensity_means_"):
-            # from sklearn import cluster
             # kmeans = cluster.KMeans(n_clusters=self.n_components,
             #                         random_state=self.random_state)
             # kmeans.fit(X[:, [1]])
             # self.intensity_means_ = kmeans.cluster_centers_
-            self.intensity_means_ = np.array([[np.average(X[:, 1]) * 0.5]])
+            self.intensity_means_ = numpy.array([[numpy.average(X[:, 1]) * 0.5]])

         if 'v' in self.init_params or not hasattr(self, "intensity_vars_"):
-            var = np.var(X[:, [1]].T) + self.min_var
-            self.intensity_vars_ = np.array([[var]])
+            var = numpy.var(X[:, [1]].T) + self.min_var
+            self.intensity_vars_ = numpy.array([[var]])

     def _initialize_sufficient_statistics(self):
         stats = super()._initialize_sufficient_statistics()
-        stats['post'] = np.zeros(self.n_components)
-        stats['obs1**2'] = np.zeros((self.n_components, 1))
-        stats['obs2'] = np.zeros((self.n_components, 1))
-        stats['obs2**2'] = np.zeros((self.n_components, 1))
+        stats['post'] = numpy.zeros(self.n_components)
+        stats['obs1**2'] = numpy.zeros((self.n_components, 1))
+        stats['obs2'] = numpy.zeros((self.n_components, 1))
+        stats['obs2**2'] = numpy.zeros((self.n_components, 1))
         return stats

     def _compute_log_likelihood(self, X):
-        D = self.diffusivities_
+        # D = self.diffusivities_
+        D = numpy.repeat(self.diffusivities_, self.n_oligomers, axis=0)
         mean = self.intensity_means_[0, 0]
-        mean *= np.tile(np.arange(1, self.n_oligomers + 1), (1, self.n_components // self.n_oligomers)).T
+        mean *= numpy.tile(numpy.arange(1, self.n_oligomers + 1), (1, self.n_diffusivities)).T
         var = self.intensity_vars_[0, 0]
-        var *= np.tile(np.arange(1, self.n_oligomers + 1), (1, self.n_components // self.n_oligomers)).T
-
+        var *= numpy.tile(numpy.arange(1, self.n_oligomers + 1), (1, self.n_diffusivities)).T
         if any(var <= 0.0):
             raise ValueError(f'Variance must be positive [{var}]')

-        q1 = np.log(X[:, [0]] / (2 * D[:, 0])) - (X[:, [0]] ** 2 / (4 * D[:, 0]))
-        q2 = -0.5 * np.log(2 * np.pi * var[:, 0]) - (X[:, [1]] - mean[:, 0]) ** 2 / (2 * var[:, 0])
+        q1 = numpy.log(X[:, [0]] / (2 * D[:, 0])) - (X[:, [0]] ** 2 / (4 * D[:, 0]))
+        q2 = -0.5 * numpy.log(2 * numpy.pi * var[:, 0]) - (X[:, [1]] - mean[:, 0]) ** 2 / (2 * var[:, 0])
         # print("mean=", mean)
         # print("var=", var)
         # print("self.intensity_means_.shape=", self.intensity_means_.shape)
@@ -346,11 +347,11 @@ def _accumulate_sufficient_statistics(self, stats, obs, framelogprob,
         if any(param in self.params for param in 'dmv'):
             stats['post'] += posteriors.sum(axis=0)
             if 'd' in self.params:
-                stats['obs1**2'] += np.dot(posteriors.T, obs[:, [0]] ** 2)
+                stats['obs1**2'] += numpy.dot(posteriors.T, obs[:, [0]] ** 2)
             if 'm' in self.params:
-                stats['obs2'] += np.dot(posteriors.T, obs[:, [1]])
+                stats['obs2'] += numpy.dot(posteriors.T, obs[:, [1]])
             if 'v' in self.params:
-                stats['obs2**2'] += np.dot(posteriors.T, obs[:, [1]] ** 2)
+                stats['obs2**2'] += numpy.dot(posteriors.T, obs[:, [1]] ** 2)

         # print("posteriors=", posteriors.shape)
         # print("obs=", obs.shape)
@@ -363,7 +364,7 @@ def _accumulate_sufficient_statistics(self, stats, obs, framelogprob,

     def _do_mstep(self, stats):
         super()._do_mstep(stats)

-        denom = stats['post'][:, np.newaxis]
+        denom = stats['post'][:, numpy.newaxis]
         # print("denom=", denom.shape)
         # print("stats['post']=", stats['post'].shape)
         # print("stats['obs1**2']=", stats['obs1**2'].shape)
@@ -374,18 +375,19 @@
         # print("intensity_vars_=", self.intensity_vars_)

         if 'd' in self.params:
-            self.diffusivities_ = 0.25 * stats['obs1**2'] / denom
+            k = numpy.repeat(numpy.identity(self.n_diffusivities), self.n_oligomers, axis=1)
+            self.diffusivities_ = 0.25 * numpy.dot(k, stats['obs1**2']) / numpy.dot(k, denom)

         if 'm' in self.params:
             post = denom
             x = stats['obs2']
-            k = np.tile(np.arange(1, self.n_oligomers + 1), (1, self.n_components // self.n_oligomers))
-            self.intensity_means_ = x.sum(axis=0) / np.dot(k, post)
+            k = numpy.tile(numpy.arange(1, self.n_oligomers + 1), (1, self.n_diffusivities))
+            self.intensity_means_ = x.sum(axis=0) / numpy.dot(k, post)

         if 'v' in self.params:
             post = denom
             x = stats['obs2']
             x2 = stats['obs2**2']
             mu = self.intensity_means_
-            k = np.tile(np.arange(1, self.n_oligomers + 1), (1, self.n_components // self.n_oligomers))
-            self.intensity_vars_ = (np.dot(1 / k, x2) - 2 * mu * x.sum(axis=0) + mu ** 2 * np.dot(k, post)) / post.sum(axis=0)
+            k = numpy.tile(numpy.arange(1, self.n_oligomers + 1), (1, self.n_diffusivities))
+            self.intensity_vars_ = (numpy.dot(1 / k, x2) - 2 * mu * x.sum(axis=0) + mu ** 2 * numpy.dot(k, post)) / post.sum(axis=0)
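
For reference, below is a minimal usage sketch of the reparameterized model. It assumes PTHMM is imported from scopyon.analysis.hmm and behaves like an hmmlearn estimator (fit/predict inherited from _BaseHMM); the trajectory sizes and distributions are made up purely for illustration. Each observation row pairs a frame-to-frame displacement with a spot intensity, and a decoded state index k factors into a diffusivity index k // n_oligomers and an oligomer index k % n_oligomers, mirroring _generate_sample_from_state in the patch.

import numpy
from scopyon.analysis.hmm import PTHMM

rng = numpy.random.RandomState(0)

# Synthetic observations: column 0 is the frame-to-frame displacement length
# (positive), column 1 is the spot intensity. Two trajectories of 250 frames.
n_samples = 500
X = numpy.hstack([
    numpy.abs(rng.normal(scale=0.1, size=(n_samples, 1))),
    rng.normal(loc=100.0, scale=10.0, size=(n_samples, 1)),
    ])
lengths = [250, 250]

model = PTHMM(n_diffusivities=2, n_oligomers=2, n_iter=100, random_state=rng)
model.fit(X, lengths)

# Each of the n_diffusivities * n_oligomers states encodes both factors.
states = model.predict(X, lengths)
diffusivity_index = states // model.n_oligomers
oligomer_index = states % model.n_oligomers  # 0 = monomer, 1 = dimer, ...

print(model.diffusivities_)    # shape (n_diffusivities, 1)
print(model.intensity_means_)  # shape (1, 1), monomer intensity mean
print(model.intensity_vars_)   # shape (1, 1), monomer intensity variance

Per _compute_log_likelihood above, the displacement column is scored with the two-dimensional step-length density p(r) = r / (2 * D) * exp(-r**2 / (4 * D)), and the intensity column with a Gaussian whose mean and variance are the monomer values scaled by the oligomer size, which is why only a single base mean and variance are stored.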