Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions experimental_env/experiment/estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
from experimental_env.utils import OrderedProblem, choose_best_mle
from mpest import Problem
from mpest.em import EM
from mpest.em.methods.l_moments_method import IndicatorEStep, LMomentsMStep
from mpest.em.methods.l_moments_method import LMomentsMStep
from mpest.em.methods.likelihood_method import BayesEStep, LikelihoodMStep
from mpest.em.methods.method import Method
from mpest.optimizers import ALL_OPTIMIZERS
from mpest.utils import ANamed, Factory, ResultWithLog

METHODS: dict = {
"Likelihood": [[Factory(BayesEStep), Factory(LikelihoodMStep, optimizer)] for optimizer in ALL_OPTIMIZERS],
"L-moments": [Factory(IndicatorEStep), Factory(LMomentsMStep)],
"L-moments": [Factory(BayesEStep), Factory(LMomentsMStep)],
}


Expand Down
134 changes: 11 additions & 123 deletions mpest/em/methods/l_moments_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,103 +9,11 @@
from mpest.core.distribution import Distribution
from mpest.core.mixture_distribution import MixtureDistribution
from mpest.core.problem import Problem, Result
from mpest.em.methods.abstract_steps import AExpectation, AMaximization
from mpest.exceptions import EStepError, MStepError
from mpest.em.methods.abstract_steps import AMaximization
from mpest.exceptions import MStepError
from mpest.utils import ResultWithError, find_file

EResult = tuple[Problem, list[float | None], np.ndarray] | ResultWithError[MixtureDistribution]


class IndicatorEStep(AExpectation[EResult]):
"""
Class which represents method for performing E step in L moments method.
"""

def __init__(self):
"""
Object constructor
"""

self.indicators: np.ndarray

def calc_indicators(self, problem: Problem) -> None:
"""
A function that recalculates the matrix with indicators.

:param problem: Object of class Problem, which contains samples and mixture.
"""

samples, mixture = np.sort(problem.samples), problem.distributions
priors = np.array([dist.prior_probability for dist in mixture])
k, m = len(mixture), len(samples)

z = np.zeros((k, m), dtype=float)

pdf_values = np.zeros((k, m), dtype=float)
for j, d_j in enumerate(mixture):
pdf_values[j] = [d_j.model.pdf(samples[i], d_j.params) for i in range(m)]

denominators = np.sum(priors[:, np.newaxis] * pdf_values, axis=0)
if 0.0 in denominators:
self.indicators = np.ndarray([])
return None

for j in range(k):
numerators = priors[j] * pdf_values[j]
z[j] = numerators / denominators

self.indicators = z
return None

def update_priors(self, problem: Problem) -> list[float | None]:
"""
A function that recalculates the list with prior probabilities.

:param problem: Object of class Problem, which contains samples and mixture.
:return: List with prior probabilities
"""

k, m = len(problem.distributions), len(problem.samples)

new_priors = []
for j in range(k):
p_j = np.sum([self.indicators[j][i] for i in range(m)])
new_priors.append(p_j / m)

return new_priors

def step(self, problem: Problem) -> EResult:
"""
A function that performs E step

:param problem: Object of class Problem, which contains samples and mixture.
:return: Tuple with problem, new_priors and indicators.
"""
sorted_problem = Problem(np.sort(problem.samples), problem.distributions)

if (
any(d.model.name == "Gaussian" for d in sorted_problem.distributions)
and sorted_problem.samples[0] < 0
and not hasattr(self, "indicators")
):
self.indicators = distribute_numbers_transposed(sorted_problem.samples, sorted_problem.distributions)
else:
self.calc_indicators(sorted_problem)

if self.indicators.shape == ():
error = EStepError("The indicators could not be calculated")
return ResultWithError(sorted_problem.distributions, error)

if np.isnan(self.indicators).any():
return ResultWithError(sorted_problem.distributions, EStepError(""))

new_priors = self.update_priors(sorted_problem)
new_problem = Problem(
sorted_problem.samples,
MixtureDistribution.from_distributions(sorted_problem.distributions, new_priors),
)
return new_problem, new_priors, self.indicators

EResult = tuple[Problem, np.ndarray] | ResultWithError[MixtureDistribution]

class LMomentsMStep(AMaximization[EResult]):
"""
Expand Down Expand Up @@ -147,6 +55,7 @@ def calculate_mr_j(self, r: int, j: int, samples: Samples, indicators: np.ndarra
mr_j += p_rk * b_k
return mr_j


def step(self, e_result: EResult) -> Result:
"""
A function that performs E step
Expand All @@ -157,8 +66,13 @@ def step(self, e_result: EResult) -> Result:
if isinstance(e_result, ResultWithError):
return e_result

problem, new_priors, indicators = e_result
samples, mixture = problem.samples, problem.distributions
problem, indicators = e_result

samples = problem.samples

mixture = problem.distributions

new_priors = np.sum(indicators, axis=1) / len(samples)

max_params_count = max(len(d.params) for d in mixture)
l_moments = np.zeros(shape=[len(mixture), max_params_count])
Expand All @@ -180,29 +94,3 @@ def step(self, e_result: EResult) -> Result:

new_mixture = MixtureDistribution.from_distributions(new_distributions, new_priors)
return ResultWithError(new_mixture)


def distribute_numbers_transposed(numbers, mixture: MixtureDistribution):
"""
A function that implements a heuristic for negative elements
"""

# TODO: To complete the heuristic approach

n = len(numbers)
m = len(mixture)

gaussian_indices = [i for i, dist in enumerate(mixture) if dist.model.name == "Gaussian"]
gaussian_index = gaussian_indices[0]

result = np.zeros((m, n))

for i, num in enumerate(numbers):
if num < 0:
result[gaussian_index, i] = 1
else:
uniform_probability = 1 / len(mixture)
for j in range(len(mixture)):
result[j, i] = uniform_probability

return result
31 changes: 21 additions & 10 deletions mpest/em/methods/likelihood_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,65 @@
from mpest.core.mixture_distribution import MixtureDistribution
from mpest.core.problem import Problem, Result
from mpest.em.methods.abstract_steps import AExpectation, AMaximization
from mpest.exceptions import SampleError
from mpest.exceptions import EStepError
from mpest.models import AModel, AModelDifferentiable
from mpest.optimizers import AOptimizerJacobian, TOptimizer
from mpest.utils import ResultWithError

EResult = tuple[list[float], np.ndarray, Problem] | ResultWithError[MixtureDistribution]
EResult = tuple[Problem, np.ndarray] | ResultWithError[MixtureDistribution]


class BayesEStep(AExpectation[EResult]):
"""
Class which represents Bayesian method for calculating matrix for M step in likelihood method
"""


def step(self, problem: Problem) -> EResult:
"""
A function that performs E step

:param problem: Object of class Problem, which contains samples and mixture.
:return: Return active_samples, matrix with probabilities and problem.
"""
samples = problem.samples

samples = np.sort(problem.samples)
mixture = problem.distributions

p_xij = []
active_samples = []

for x in samples:
p = np.array([d.model.pdf(x, d.params) for d in mixture])
if np.any(p):
p_xij.append(p)
active_samples.append(x)

if not active_samples:
error = SampleError("None of the elements in the sample is correct for this mixture")
error = EStepError("None of the elements in the sample is correct for this mixture")
return ResultWithError(mixture, error)

# h[j, i] contains probability of X_i to be a part of distribution j
m = len(p_xij)
k = len(mixture)
h = np.zeros([k, m], dtype=float)
curr_w = np.array([d.prior_probability for d in mixture])
h = np.zeros([k, m], dtype=float) #matrix of hidden variables

curr_weight = np.array([d.prior_probability for d in mixture])
for i, p in enumerate(p_xij):
wp = curr_w * p
wp = curr_weight * p
swp = np.sum(wp)

if not swp:
return ResultWithError(mixture, ZeroDivisionError())
h[:, i] = wp / swp

h[:, i] = wp / swp
if np.isnan(h).any():
return ResultWithError(problem.distributions, EStepError(""))

return active_samples, h, problem
new_problem = Problem(np.array(active_samples),
MixtureDistribution.from_distributions(mixture))

return new_problem, h


# class ML(AExpectation[EResult]):
Expand Down Expand Up @@ -98,10 +108,11 @@ def step(self, e_result: EResult) -> Result:
if isinstance(e_result, ResultWithError):
return e_result

samples, h, problem = e_result
problem, h = e_result
optimizer = self.optimizer

m = len(h[0])
samples = problem.samples
mixture = problem.distributions

new_w = np.sum(h, axis=1) / m
Expand Down
5 changes: 3 additions & 2 deletions tests/tests_l_moments/l_moments_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
FiniteChecker,
PriorProbabilityThresholdChecker,
)
from mpest.em.methods.l_moments_method import IndicatorEStep, LMomentsMStep
from mpest.em.methods.l_moments_method import LMomentsMStep
from mpest.em.methods.likelihood_method import BayesEStep
from mpest.em.methods.method import Method


def run_test(problem: Problem, deviation: float) -> Result:
"""TODO"""
method = Method(IndicatorEStep(), LMomentsMStep())
method = Method(BayesEStep(), LMomentsMStep())
em_algo = EM(
StepCountBreakpointer() + ParamDifferBreakpointer(deviation=deviation),
FiniteChecker() + PriorProbabilityThresholdChecker(),
Expand Down