In [4]:
import numpy as np

class MixtureModels:
    
    def __init__(self, k: int = 3, max_iter: int = 1000, tol: float = 1e-5, random_state: int = None) -> None:
        """
        Initializes a mixture model instance.

        Args:
            k: Number of clusters to form. (default=3)
            max_iter: Maximum number of iterations to perform. (default=1000)
            tol: Convergence threshold. EM iterations will stop when the change in log likelihood 
                 between two consecutive iterations is less than this value. (default=1e-5)
            random_state: Seed value to use for random number generation. If set to None, a random seed 
                will be used. (default='auto')
        """
        
        self.k = k
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
    
    def _initialize(self, X:  np.ndarray) -> None:
        """
        Initialize the vectors used for mixture models
        
        Args:
            X: Vector of observations
        """
        
         # Get number of samples and features
        self.n_samples, self.n_features = X.shape
        
        # Initialize means, covariances, and weights arrays as vectors of zeros
        self.means = np.zeros((self.k, self.n_features))
        self.covs = np.zeros((self.k, self.n_features, self.n_features))
        self.weights = np.ones(self.k) / self.k
        
        # In case the random state is not given, set a seed
        if self.random_state is not None:
            np.random.seed(self.random_state)
            
        # Populate the means and covs vector randomly
        self.means = X[np.random.choice(range(self.n_samples), self.k, replace=False)]
        self.covs = [np.eye(self.n_features)] * self.k
    
    def _e_step(self, X:  np.ndarray) -> None:
        """
        Compute the E-step of the EM algorith. 
        Therefore, it calculates the responsabilities of each subpopulation 
        
        Args:
            X: Vector of observations
        """
        
        # Initialize the responsibilities for each sample and subpopulation
        self.responsibilities = np.zeros((self.n_samples, self.k))
        
        # Calculate the responsibilities
        for i in range(self.k):
            # Compute the PDF of each sample in subpopulation i and multiply by the weight i
            self.responsibilities[:, i] = self.weights[i] * self._gaussian_pdf(X, self.means[i], self.covs[i])
        
        # Remember, responsabilitites must sum 1. So, we need to normalize
        self.responsibilities /= self.responsibilities.sum(axis=1, keepdims=True)
    
    def _m_step(self, X:  np.ndarray) -> None:
        """
        Compute the M-step of the EM algorith. 
        Therefore, it updates the parameters of the subpopulation
        
        Args:
            X: Vector of observations
        """
        
        # Compute the sum of the responsibilities for each subpopulation
        N_k = self.responsibilities.sum(axis=0)
        
        # Update the means 
        self.means = np.dot(self.responsibilities.T, X) / N_k[:, np.newaxis]
        
        # Update the covariances
        for i in range(self.k):
            # Compute the difference between each sample and the mean of the current subpopulation
            diff = X - self.means[i]
            # Compute the covariance using the responsibilities and sample differences
            self.covs[i] = np.dot(self.responsibilities[:, i] * diff.T, diff) / N_k[i]
        
        # Update the weights
        self.weights = N_k / self.n_samples
    
    def _gaussian_pdf(self, X: np.ndarray, mean: np.ndarray, cov: np.ndarray) -> np.ndarray:
        """
        Computes the value of the Gaussian probability density function
        
        Args:
            X: Data of observations
            mean: Vector of values of mean
            cov: Vector of values of covariance
        Returns:
            Numpy array with the values of the Gaussian PDF
        """
        
        # Compute the determinant of the covariance matrix
        det = np.linalg.det(cov)
        
        # Compute the normalization constant of the Gaussian distribution
        norm_const = 1.0 / ((2 * np.pi) ** (self.n_features / 2) * np.sqrt(det))
        
        # Compute the inverse of the covariance matrix
        inv_cov = np.linalg.inv(cov)
        
        # Compute the difference between each data point and the mean of the current subpopulation
        diff = X - mean
        
        # Compute the exponent of the Gaussian distribution
        exponent = np.exp(-0.5 * np.sum(diff.dot(inv_cov) * diff, axis=1))
        
        # Compute the value of the Gaussian probability density function for each data point
        return norm_const * exponent
    
    def log_likelihood(self, X:  np.ndarray) -> None:
        """
        Compute th log likelihood of a given data
        
        Args:
            X: Vector of observations
        """
        
        # Init a matrix to store the likelihood of each sample for each component
        likelihoods = np.zeros((self.n_samples, self.k))
        
        # Calculate the likelihood 
        for i in range(self.k):
            # Multiply the weight of the component by the likelihood of the sample given the component
            likelihoods[:, i] = self.weights[i] * self._gaussian_pdf(X, self.means[i], self.covs[i])
            
        # Compute the log-likelihood of the data
        return np.log(likelihoods.sum(axis=1)).sum()
    
    def fit(self, X:  np.ndarray) -> None:
        """
        Fits the Mixutre Model on the given data. 

        Args:
            X: A matrix with the observations
        """
        
        # Initialize the model's parameters
        self._initialize(X)
        
        # Iterate for the number of iterations
        for i in range(self.max_iter):
            # Compute the likelihood of the data
            old_likelihood = self.log_likelihood(X)
            # E-step 
            self._e_step(X)
            # M-step
            self._m_step(X)
            # Compute the likelihood of the data using the updated model
            new_likelihood = self.log_likelihood(X)
            # Check if the likelihood has converged
            if np.abs(new_likelihood - old_likelihood) < self.tol:
                 # If it has converged, stop iterating
                break
                
    def predict(self, X:  np.ndarray) -> np.ndarray:
        """
        Computes the prediction of a given set
        
        Args:
            X: Data to make predictions from
        Returns:
            Numpy array with the label classes
        """
        
        # Calculate the responsibilities (E-step)
        self._e_step(X)
        # Assign each sample to the component with highest responsibility
        return np.argmax(self.responsibilities, axis=1)