In [None]:
import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal

In [None]:
def multivariate_normal_pdf(x, mean, cov):
    """
    Compute the probability density function of a multivariate normal distribution.

    The density is assembled in log space from ``np.linalg.slogdet`` and a
    linear solve rather than from ``det`` and ``inv``. This keeps the
    normalization constant finite when the determinant itself would overflow
    or underflow float64 (e.g. many dimensions with small variances).

    Parameters:
        x (ndarray): Data points of shape (n, d), where n is the number of points and d is the dimension.
        mean (ndarray): Mean vector of shape (d,).
        cov (ndarray): Covariance matrix of shape (d, d).

    Returns:
        ndarray: Probability density function evaluated at each data point, shape (n,).
        Every entry is NaN when ``cov`` has a negative determinant (it is not
        a valid covariance matrix in that case).

    Raises:
        numpy.linalg.LinAlgError: If ``cov`` is singular.
    """
    d = mean.shape[0]  # Dimension of the space
    diff = x - mean  # Difference between x and the mean, shape (n, d)

    # Solve cov @ z = diff.T instead of forming the explicit inverse;
    # `solved` has shape (d, n) and column i equals cov^{-1} @ diff[i].
    solved = np.linalg.solve(cov, diff.T)
    # Squared Mahalanobis distance of every point: diff[i] @ cov^{-1} @ diff[i]
    mahalanobis_sq = np.einsum("ij,ji->i", diff, solved)

    # log|cov| computed without ever materialising the determinant.
    sign, log_abs_det = np.linalg.slogdet(cov)
    log_det = log_abs_det if sign > 0 else np.nan
    log_norm_const = -0.5 * (d * np.log(2 * np.pi) + log_det)

    return np.exp(log_norm_const - 0.5 * mahalanobis_sq)  # Final PDF

In [None]:
# Class GMM
class GMM:
    """Gaussian mixture model fitted with the EM algorithm.

    Parameters:
        k (int): Number of Gaussian components.
        data_dim (int): Number of dimensions of each data point (2 if x, y).
        max_iter (int): Maximum number of EM iterations.
        tol (float): Convergence threshold on the absolute change of the
            total log-likelihood between two consecutive iterations.
        random_state (int or None): Seed used to draw the initial means.
            ``None`` (default) draws from NumPy's global random state.
    """

    def __init__(self, k, data_dim, max_iter, tol, random_state=None):
        self.k = k
        self.data_dim = data_dim
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state

    def init_parameters(self, data):
        """Initialise means, covariances and mixing weights.

        Means are drawn uniformly inside the per-dimension [min, max] range of
        ``data`` so they start within the data range even when the data is
        negative or far from the origin. Covariances start as identity
        matrices and the weights start uniform.
        """
        if self.random_state is None:
            rng = np.random  # global NumPy random state
        else:
            rng = np.random.RandomState(self.random_state)

        lower = np.min(data, axis=0)
        upper = np.max(data, axis=0)
        self.mu = rng.uniform(lower, upper, size=(self.k, self.data_dim))  # init random mu
        self.cov = [np.eye(self.data_dim) for _ in range(self.k)]  # init cov matrices
        self.w = np.ones(self.k) / self.k  # uniform mixing weights

    def _weighted_densities(self, data):
        """Return the (n, k) matrix whose entry (i, j) is w_j * N(x_i | mu_j, cov_j)."""
        return np.column_stack([
            self.w[j] * multivariate_normal_pdf(data, mean=self.mu[j], cov=self.cov[j])
            for j in range(self.k)
        ])

    def e_step(self, data):
        """Compute the responsibilities: the conditional probability that each
        point x_i belongs to component j given the current model parameters.

        Returns:
            ndarray: Matrix of shape (n, k) whose rows sum to 1.
        """
        responsab = self._weighted_densities(data)
        totals = np.sum(responsab, axis=1, keepdims=True)

        # If every component density underflowed to 0 for a point, 0/0 would
        # give NaN and poison the M-step; fall back to the mixing weights.
        underflowed = totals[:, 0] == 0
        if np.any(underflowed):
            responsab[underflowed] = self.w
            totals[underflowed] = np.sum(self.w)

        responsab /= totals  # normalise each row to sum to 1
        return responsab

    def m_step(self, data, responsab):
        """Update the model parameters (weights, means, covariances) from the
        data and the responsibilities computed in the E-step."""
        sum_cluster = np.sum(responsab, axis=0)  # total responsibility per component

        self.w = sum_cluster / np.sum(sum_cluster)  # new weights

        self.mu = np.dot(responsab.T, data) / sum_cluster[:, np.newaxis]  # new means

        self.cov = []
        for j in range(self.k):  # for each Gaussian
            difference = data - self.mu[j]  # shape (n, d)
            # Responsibility-weighted sum of outer products:
            # (d, n) scaled column-wise by responsab[:, j], times (n, d) -> (d, d)
            cov_matrix = np.dot(responsab[:, j] * difference.T, difference) / sum_cluster[j]
            self.cov.append(cov_matrix)

    def log_likelihood(self, data):
        """Total log-likelihood of ``data``: sum_i log( sum_j w_j N(x_i | mu_j, cov_j) )."""
        mixture_density = np.sum(self._weighted_densities(data), axis=1)  # per point
        return np.sum(np.log(mixture_density))

    def fit(self, data):
        """Run EM on ``data`` until the log-likelihood changes by less than
        ``tol`` or ``max_iter`` iterations are reached.

        Returns:
            list: Log-likelihood value recorded after each iteration.
        """
        self.init_parameters(data)
        log_likelihoods = []

        for iteration in range(self.max_iter):
            r = self.e_step(data)  # E-step
            self.m_step(data, r)  # M-step
            log_likelihood = self.log_likelihood(data)
            log_likelihoods.append(log_likelihood)

            if iteration > 0 and abs(log_likelihoods[-1] - log_likelihoods[-2]) < self.tol:
                print(f"convergency {iteration}")
                break

        return log_likelihoods

    def predict(self, data):
        """Assign each point to the component with the highest responsibility."""
        responsab = self.e_step(data)
        return np.argmax(responsab, axis=1)

    def get_params(self):
        """Return the fitted parameters as (weights, means, covariances)."""
        return self.w, self.mu, self.cov



In [None]:
# Two-Gaussian data set, fitted below as 2-D points with k=2 components.
# The file is read as whitespace-separated values with no header row;
# sep=r'\s+' replaces the deprecated delim_whitespace=True.
data_2d = pd.read_csv('2gaussian.txt', header=None, sep=r'\s+').values

# instantiation of the model
gmm_2d = GMM(k=2, data_dim=2, max_iter=100, tol=1e-4)
log_likelihoods_2d = gmm_2d.fit(data_2d)

# fitted mixing weights, component means and covariance matrices
w_2d, mu_2d, cov_2d = gmm_2d.get_params()

print("Weights:", w_2d)
print("Mean:", mu_2d)
print("Cov:", cov_2d)


convergency 14
Weights: [0.66466015 0.33533985]
Mean: [[7.01481251 3.98408503]
 [2.99735405 3.05172257]]
Cov: [array([[0.97187054, 0.49583164],
       [0.49583164, 0.99975505]]), array([[1.01557908, 0.0266261 ],
       [0.0266261 , 2.93532613]])]


  data_2d = pd.read_csv('2gaussian.txt', header=None, delim_whitespace=True).values


In [None]:
# Three-Gaussian data set: the model below uses k=3 components on 2-D points
# (data_dim=2), so the "3" in the variable names refers to the number of
# Gaussians, not to the dimensionality.
# The file is read as whitespace-separated values with no header row;
# sep=r'\s+' replaces the deprecated delim_whitespace=True.
dim3_data = pd.read_csv('3gaussian.txt', header=None, sep=r'\s+').values

# instantiation of the model
gmm_3d = GMM(k=3, data_dim=2, max_iter=100, tol=1e-4)
log_likelihoods_3d = gmm_3d.fit(dim3_data)

# fitted mixing weights, component means and covariance matrices
w_3d, mu_3d, cov_3d = gmm_3d.get_params()

print("Weights:", w_3d)
print("Mean:", mu_3d)
print("Cov:", cov_3d)

  dim3_data = pd.read_csv('3gaussian.txt', header=None, delim_whitespace=True).values


convergency 78
Weights: [0.29548617 0.22098842 0.48352542]
Mean: [[7.04168934 4.0254312 ]
 [3.12078453 3.2167729 ]
 [5.03740251 7.02608048]]
Cov: [array([[0.9577765 , 0.48713354],
       [0.48713354, 0.98980409]]), array([[1.10953378, 0.1651689 ],
       [0.1651689 , 3.71079462]]), array([[0.95481118, 0.16824403],
       [0.16824403, 0.9403016 ]])]
