diff --git a/experimental_env/experiment/estimators.py b/experimental_env/experiment/estimators.py index f61364ab..4aa8dc0c 100644 --- a/experimental_env/experiment/estimators.py +++ b/experimental_env/experiment/estimators.py @@ -10,7 +10,7 @@ from experimental_env.utils import OrderedProblem, choose_best_mle from mpest import Problem from mpest.em import EM -from mpest.em.methods.l_moments_method import IndicatorEStep, LMomentsMStep +from mpest.em.methods.l_moments_method import LMomentsMStep from mpest.em.methods.likelihood_method import BayesEStep, LikelihoodMStep from mpest.em.methods.method import Method from mpest.optimizers import ALL_OPTIMIZERS @@ -18,7 +18,7 @@ METHODS: dict = { "Likelihood": [[Factory(BayesEStep), Factory(LikelihoodMStep, optimizer)] for optimizer in ALL_OPTIMIZERS], - "L-moments": [Factory(IndicatorEStep), Factory(LMomentsMStep)], + "L-moments": [Factory(BayesEStep), Factory(LMomentsMStep)], } diff --git a/mpest/em/methods/l_moments_method.py b/mpest/em/methods/l_moments_method.py index bd00210a..5490996d 100644 --- a/mpest/em/methods/l_moments_method.py +++ b/mpest/em/methods/l_moments_method.py @@ -9,103 +9,11 @@ from mpest.core.distribution import Distribution from mpest.core.mixture_distribution import MixtureDistribution from mpest.core.problem import Problem, Result -from mpest.em.methods.abstract_steps import AExpectation, AMaximization -from mpest.exceptions import EStepError, MStepError +from mpest.em.methods.abstract_steps import AMaximization +from mpest.exceptions import MStepError from mpest.utils import ResultWithError, find_file -EResult = tuple[Problem, list[float | None], np.ndarray] | ResultWithError[MixtureDistribution] - - -class IndicatorEStep(AExpectation[EResult]): - """ - Class which represents method for performing E step in L moments method. - """ - - def __init__(self): - """ - Object constructor - """ - - self.indicators: np.ndarray - - def calc_indicators(self, problem: Problem) -> None: - """ - A function that recalculates the matrix with indicators. - - :param problem: Object of class Problem, which contains samples and mixture. - """ - - samples, mixture = np.sort(problem.samples), problem.distributions - priors = np.array([dist.prior_probability for dist in mixture]) - k, m = len(mixture), len(samples) - - z = np.zeros((k, m), dtype=float) - - pdf_values = np.zeros((k, m), dtype=float) - for j, d_j in enumerate(mixture): - pdf_values[j] = [d_j.model.pdf(samples[i], d_j.params) for i in range(m)] - - denominators = np.sum(priors[:, np.newaxis] * pdf_values, axis=0) - if 0.0 in denominators: - self.indicators = np.ndarray([]) - return None - - for j in range(k): - numerators = priors[j] * pdf_values[j] - z[j] = numerators / denominators - - self.indicators = z - return None - - def update_priors(self, problem: Problem) -> list[float | None]: - """ - A function that recalculates the list with prior probabilities. - - :param problem: Object of class Problem, which contains samples and mixture. - :return: List with prior probabilities - """ - - k, m = len(problem.distributions), len(problem.samples) - - new_priors = [] - for j in range(k): - p_j = np.sum([self.indicators[j][i] for i in range(m)]) - new_priors.append(p_j / m) - - return new_priors - - def step(self, problem: Problem) -> EResult: - """ - A function that performs E step - - :param problem: Object of class Problem, which contains samples and mixture. - :return: Tuple with problem, new_priors and indicators. - """ - sorted_problem = Problem(np.sort(problem.samples), problem.distributions) - - if ( - any(d.model.name == "Gaussian" for d in sorted_problem.distributions) - and sorted_problem.samples[0] < 0 - and not hasattr(self, "indicators") - ): - self.indicators = distribute_numbers_transposed(sorted_problem.samples, sorted_problem.distributions) - else: - self.calc_indicators(sorted_problem) - - if self.indicators.shape == (): - error = EStepError("The indicators could not be calculated") - return ResultWithError(sorted_problem.distributions, error) - - if np.isnan(self.indicators).any(): - return ResultWithError(sorted_problem.distributions, EStepError("")) - - new_priors = self.update_priors(sorted_problem) - new_problem = Problem( - sorted_problem.samples, - MixtureDistribution.from_distributions(sorted_problem.distributions, new_priors), - ) - return new_problem, new_priors, self.indicators - +EResult = tuple[Problem, np.ndarray] | ResultWithError[MixtureDistribution] class LMomentsMStep(AMaximization[EResult]): """ @@ -147,6 +55,7 @@ def calculate_mr_j(self, r: int, j: int, samples: Samples, indicators: np.ndarra mr_j += p_rk * b_k return mr_j + def step(self, e_result: EResult) -> Result: """ A function that performs E step @@ -157,8 +66,13 @@ def step(self, e_result: EResult) -> Result: if isinstance(e_result, ResultWithError): return e_result - problem, new_priors, indicators = e_result - samples, mixture = problem.samples, problem.distributions + problem, indicators = e_result + + samples = problem.samples + + mixture = problem.distributions + + new_priors = np.sum(indicators, axis=1) / len(samples) max_params_count = max(len(d.params) for d in mixture) l_moments = np.zeros(shape=[len(mixture), max_params_count]) @@ -180,29 +94,3 @@ def step(self, e_result: EResult) -> Result: new_mixture = MixtureDistribution.from_distributions(new_distributions, new_priors) return ResultWithError(new_mixture) - - -def distribute_numbers_transposed(numbers, mixture: MixtureDistribution): - """ - A function that implements a heuristic for negative elements - """ - - # TODO: To complete the heuristic approach - - n = len(numbers) - m = len(mixture) - - gaussian_indices = [i for i, dist in enumerate(mixture) if dist.model.name == "Gaussian"] - gaussian_index = gaussian_indices[0] - - result = np.zeros((m, n)) - - for i, num in enumerate(numbers): - if num < 0: - result[gaussian_index, i] = 1 - else: - uniform_probability = 1 / len(mixture) - for j in range(len(mixture)): - result[j, i] = uniform_probability - - return result diff --git a/mpest/em/methods/likelihood_method.py b/mpest/em/methods/likelihood_method.py index d3a1413b..bd8ee7e1 100644 --- a/mpest/em/methods/likelihood_method.py +++ b/mpest/em/methods/likelihood_method.py @@ -8,12 +8,12 @@ from mpest.core.mixture_distribution import MixtureDistribution from mpest.core.problem import Problem, Result from mpest.em.methods.abstract_steps import AExpectation, AMaximization -from mpest.exceptions import SampleError +from mpest.exceptions import EStepError from mpest.models import AModel, AModelDifferentiable from mpest.optimizers import AOptimizerJacobian, TOptimizer from mpest.utils import ResultWithError -EResult = tuple[list[float], np.ndarray, Problem] | ResultWithError[MixtureDistribution] +EResult = tuple[Problem, np.ndarray] | ResultWithError[MixtureDistribution] class BayesEStep(AExpectation[EResult]): @@ -21,6 +21,7 @@ class BayesEStep(AExpectation[EResult]): Class which represents Bayesian method for calculating matrix for M step in likelihood method """ + def step(self, problem: Problem) -> EResult: """ A function that performs E step @@ -28,10 +29,13 @@ def step(self, problem: Problem) -> EResult: :param problem: Object of class Problem, which contains samples and mixture. :return: Return active_samples, matrix with probabilities and problem. """ - samples = problem.samples + + samples = np.sort(problem.samples) mixture = problem.distributions + p_xij = [] active_samples = [] + for x in samples: p = np.array([d.model.pdf(x, d.params) for d in mixture]) if np.any(p): @@ -39,24 +43,30 @@ def step(self, problem: Problem) -> EResult: active_samples.append(x) if not active_samples: - error = SampleError("None of the elements in the sample is correct for this mixture") + error = EStepError("None of the elements in the sample is correct for this mixture") return ResultWithError(mixture, error) # h[j, i] contains probability of X_i to be a part of distribution j m = len(p_xij) k = len(mixture) - h = np.zeros([k, m], dtype=float) - curr_w = np.array([d.prior_probability for d in mixture]) + h = np.zeros([k, m], dtype=float) #matrix of hidden variables + + curr_weight = np.array([d.prior_probability for d in mixture]) for i, p in enumerate(p_xij): - wp = curr_w * p + wp = curr_weight * p swp = np.sum(wp) if not swp: return ResultWithError(mixture, ZeroDivisionError()) + h[:, i] = wp / swp - h[:, i] = wp / swp + if np.isnan(h).any(): + return ResultWithError(problem.distributions, EStepError("")) - return active_samples, h, problem + new_problem = Problem(np.array(active_samples), + MixtureDistribution.from_distributions(mixture)) + + return new_problem, h # class ML(AExpectation[EResult]): @@ -98,10 +108,11 @@ def step(self, e_result: EResult) -> Result: if isinstance(e_result, ResultWithError): return e_result - samples, h, problem = e_result + problem, h = e_result optimizer = self.optimizer m = len(h[0]) + samples = problem.samples mixture = problem.distributions new_w = np.sum(h, axis=1) / m diff --git a/tests/tests_l_moments/l_moments_utils.py b/tests/tests_l_moments/l_moments_utils.py index 8f89dd44..749eb4bf 100644 --- a/tests/tests_l_moments/l_moments_utils.py +++ b/tests/tests_l_moments/l_moments_utils.py @@ -7,13 +7,14 @@ FiniteChecker, PriorProbabilityThresholdChecker, ) -from mpest.em.methods.l_moments_method import IndicatorEStep, LMomentsMStep +from mpest.em.methods.l_moments_method import LMomentsMStep +from mpest.em.methods.likelihood_method import BayesEStep from mpest.em.methods.method import Method def run_test(problem: Problem, deviation: float) -> Result: """TODO""" - method = Method(IndicatorEStep(), LMomentsMStep()) + method = Method(BayesEStep(), LMomentsMStep()) em_algo = EM( StepCountBreakpointer() + ParamDifferBreakpointer(deviation=deviation), FiniteChecker() + PriorProbabilityThresholdChecker(),