<a href="https://colab.research.google.com/github/FelipeTufaile/MixtureModels/blob/main/MixtureModels/tests/Tests.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [158]:
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.patches import Circle, Arc

In [159]:
# Generating a random Gaussian distribution
samples1 = np.random.normal(np.array([5.0, 5.0]), np.array([0.25, 0.25]), (100,2))
samples2 = np.random.normal(np.array([7.5, 7.5]), np.array([0.50, 0.50]), (100,2))
samples3 = np.random.normal(np.array([2.5, 2.5]), np.array([0.75, 0.75]), (100,2))

# Concatenate the arrays vertically
X = np.concatenate((samples1, samples2, samples3))

In [210]:
def init_mixture(self, X):
  """
  Initializes the mixture model with random points as initial means and uniform assingments

  Args:
      X: (n, d) array holding the data

  Returns:
      mixture: the initialized gaussian mixture
      post: (n, K) array holding the soft counts for all components for all examples
  """
  # Setting seed
  np.random.seed(self.random_state)

  # Calculating the number of samples "n" in the dataset (X array)
  n, d = X.shape

  # Initializing the weight (mixing proportions) for each component (cluster)
  self.p = np.ones(self.k) / self.k

  # Initialize the mean array with random points from X as initial means
  self.mu = X[np.random.choice(n, self.k, replace=False),:]

  # Initialize the variance array for each component (cluster)
  self.var = np.array([np.sum((X-self.mu[j,:])**2, axis=0) for j in range(self.k)])/(n-1)

  return self

In [161]:
def GaussianProbability(X, mu, var):
    """
    The function calculates the probability of X belonging to the Gaussian distribution using the multivariate Gaussian PDF formula.

    Args:
        X (numpy.ndarray): Point with shape (1, d).
        mu (numpy.ndarray): Mean of the Gaussian distribution with shape (1, d).
        var (numpy.ndarray): Variance of the Gaussian distribution with shape (1, d).

    Returns:
        float: Probability of X belonging to the Gaussian distribution.
    """
    # Ajusting the shape of X
    X = X.reshape(1,-1)

    # Ajusting the shape of mu
    mu = mu.reshape(1,-1)

    # Ajusting the shape of var
    var = var.reshape(1,-1)

    # Calculate the probability from the Gaussian Probability Density Function (PDF)
    probability = np.exp(-0.5 * np.sum((X - mu)**2 / var)) / ((2 * np.pi)**(X.shape[1] / 2) * np.prod(np.sqrt(var)))

    return probability


In [162]:
def estep(X, GMM, GaussianProbability=GaussianProbability):
    """
    E-step: Softly assigns each datapoint to a gaussian component

    Args:
        X: (n, d) array holding the data mixture: the current gaussian mixture

    Returns:
        np.ndarray: (n, K) array holding the soft counts for all components for all examples
        float: log-likelihood of the assignment
    """

    # Initializing the soft counts (posterior)
    post = []

    # Initializing the log likelihood parameter as zero
    l_theta = np.zeros((1,GMM.k))

    # Iterate over all datapoints
    for i in range(0, X.shape[0]):

        # Calculating the likelihood for the datapoint [i] belonging to each of the clusters [j]
        p_x_theta = np.array([GMM.p[j]*GaussianProbability(X=X[i,:], mu=GMM.mu[j,:], var=GMM.var[j,:]) for j in range(GMM.k)])

        # Calculating the likelihood for each cluster
        post.append(p_x_theta/np.sum(p_x_theta))

        # Updating the log-likelihood
        l_theta += np.log(p_x_theta)

    return np.array(post), l_theta

In [241]:
def mstep(X, post, GMM):
  """
  M-step: Updates the gaussian mixture by maximizing the log-likelihood of the weighted dataset

  Args:
      X: (n, d) array holding the data
      post: (n, K) array holding the soft counts for all components for all examples

  Returns:
      GaussianMixture: the new gaussian mixture
  """
  # Updating mixture proportions
  GMM.p = post/X.shape[0]

  # Updating the center of each distribution (cluster)
  GMM.mu = (X.T@post/np.sum(post, axis=0)).T

  # Updating the variance of each distribution (cluster)
  GMM.var = np.array([np.sum(post[:,0].reshape(-1,1)*(X-GMM.mu[0,:])**2, axis=0).reshape(1,-1) for j in range(GMM.k)]).reshape(GMM.k, X.shape[1])/(X.shape[1]*np.sum(post, axis=0)).reshape(GMM.k, -1)

  return GMM


In [245]:
class GaussianMixture():
  """
  Tuple holding a gaussian mixture

  mu: np.ndarray  # (K, d) array - each row corresponds to a gaussian component mean
  var: np.ndarray  # (K, ) array - each row corresponds to the variance of a component
  p: np.ndarray  # (K, ) array = each row corresponds to the weight of a component
  """
  """
    Initializes the gaussian mixture model.

    Args:
        K (int): number of components
        seed (int): random seed

    Returns:
        mixture: the initialized gaussian mixture model
    """

  def __init__(
      self,
      k,
      max_iter=10000,
      tol=10**(-5),
      n_init=1,
      verbose=False,
      random_state=0
    ):

    self.k = k
    self.max_iter = max_iter
    self.tol = tol
    self.n_init = n_init
    self.verbose = verbose
    self.random_state = random_state

  def fit(self, X, init_mixture=init_mixture, estep=estep, mstep=mstep):

    # Initialize mixture
    self = init_mixture(self, X)

    post, log_likelihood = estep(X=X, GMM=self)

    # Initialize l_theta
    l_log_likelihood = []

    # Initializing flag "run"
    run = True

    i = 1

    while (run):

      print(i)

      # Run E-Step
      post, log_likelihood = estep(X=X, GMM=self)

      # Run M-Step
      Self = mstep(X=X, post=post, GMM=self)

      # Update the list of log_likelihood
      l_log_likelihood.append(log_likelihood)

      # Check if tolerance was matched
      if(len(l_log_likelihood) > 1):
        if(np.abs(l_log_likelihood[-2] - l_log_likelihood[-1]) > np.abs(l_log_likelihood[-1])*self.tol):
          run = False
      else:
        if(i == 2):
          run = False

      i += 1

    return self


In [246]:
# Initializing a Gaussian Mixture Model
GMM = GaussianMixture(k=3)

In [247]:
# Fitting a sample distribution to the model
GMM, post, log_likelihood = GMM.fit(X=X)


1
2


ValueError: ignored

In [217]:
GMM.mu

array([[2.52775429, 3.07592681],
       [8.11243528, 7.53242805],
       [4.91150152, 4.65626218]])

In [218]:
GMM.var

array([[10.44924133,  8.07942352],
       [14.71454568, 11.34346813],
       [ 4.61409398,  4.67638934]])

In [239]:
j=0
t =

In [240]:
t.shape

(3, 2)

In [None]:
np.array([np.sum(post[:,j].reshape(X.shape[0],1)*(X-GMM.mu[j,:])**2, axis=0) for j in range(GMM.k)])

In [167]:
# Printing Gaussian Mixture Model parameters
for i, params in enumerate(zip(GMM.mu, GMM.var)):
  print(f"Center of distribution {str(i).zfill(3)} is {params[0]} and variance is {params[1]}")

Center of distribution 000 is [3.35320512 3.35605711] and variance is 3.221295426923884
Center of distribution 001 is [6.78950845 6.77112009] and variance is 2.3558326029018333
Center of distribution 002 is [4.94958664 4.95730559] and variance is 3.191269443706754


0

In [167]:
def run(X: np.ndarray, mixture: GaussianMixture,
        post: np.ndarray) -> Tuple[GaussianMixture, np.ndarray, float]:
    """Runs the mixture model

    Args:
        X: (n, d) array holding the data
        post: (n, K) array holding the soft counts
            for all components for all examples

    Returns:
        GaussianMixture: the new gaussian mixture
        np.ndarray: (n, K) array holding the soft counts
            for all components for all examples
        float: log-likelihood of the current assignment
    """

    prev_l_theta = None
    l_theta = None
    while (prev_l_theta is None or abs(prev_l_theta - l_theta) > abs(l_theta)*10**(-6)):
        prev_l_theta = l_theta
        post, l_theta = estep(X, mixture)
        mixture = mstep(X, post)

    return mixture, post, l_theta