In [1]:
import numpy as np
from typing import List, Tuple
import matplotlib.pyplot as plt
# ============================================================
# Data Generation (This does the same thing as the genimages.py provided)
# ============================================================
np.random.seed(0)

features = [
    [0, 0, 1, 0,
     0, 1, 1, 1,
     0, 0, 1, 0,
     0, 0, 0, 0],
    [0, 1, 0, 0,
     0, 1, 0, 0,
     0, 1, 0, 0,
     0, 1, 0, 0],
    [1, 1, 1, 1,
     0, 0, 0, 0,
     0, 0, 0, 0,
     0, 0, 0, 0],
    [1, 0, 0, 0,
     0, 1, 0, 0,
     0, 0, 1, 0,
     0, 0, 0, 1],
    [0, 0, 0, 0,
     0, 0, 0, 0,
     1, 1, 0, 0,
     1, 1, 0, 0],
    [1, 1, 1, 1,
     1, 0, 0, 1,
     1, 0, 0, 1,
     1, 1, 1, 1],
    [0, 0, 0, 0,
     0, 1, 1, 0,
     0, 1, 1, 0,
     0, 0, 0, 0],
    [0, 0, 0, 1,
     0, 0, 0, 1,
     0, 0, 0, 1,
     0, 0, 0, 1],
]

num_samples = 2000
num_features = 16
K_true = len(features)

feature_weights = 0.5 + np.random.rand(K_true, 1) * 0.5
mu_true = np.array([weight * feat for weight, feat in zip(feature_weights, features)])
latent_factors = (np.random.rand(num_samples, K_true) < 0.3).astype(float)
data = latent_factors @ mu_true + np.random.randn(num_samples, num_features)*0.1


In [2]:
class GaussianPrior:
    def __init__(self, a: float, b: float, d: int, k: int):
        """
        Initialize the Gaussian prior for the mu matrix.

        Parameters
        ----------
        a : float
            Alpha parameter of the Gamma prior.
        b : float
            Beta parameter of the Gamma prior.
        d : int
            Number of dimensions in the observed data.
        k : int
            Number of latent variables.
        """
        self.a = a
        self.b = b
        self.mu = np.zeros((d, k))
        self.alpha = np.ones(k)
        self.w_covariance = np.zeros((k, k))

    def mu_k(self, latent_factor: int) -> np.ndarray:
        """
        Retrieve the column vector of the mu matrix corresponding to a specific latent factor.

        Parameters
        ----------
        latent_factor : int
            Index of the latent factor.

        Returns
        -------
        np.ndarray
            Column vector of mu, shape (d, 1).
        """
        return self.mu[:, latent_factor:latent_factor + 1]

    def w_d(self, dimension: int) -> np.ndarray:
        """
        Retrieve the row vector of the mu matrix corresponding to a specific data dimension.

        Parameters
        ----------
        dimension : int
            Index of the data dimension.

        Returns
        -------
        np.ndarray
            Row vector of mu, shape (1, k).
        """
        return self.mu[dimension:dimension + 1, :]

    @property
    def a_matrix(self) -> np.ndarray:
        """
        Compute the precision matrix for the weight vector w_d.

        Returns
        -------
        np.ndarray
            Precision matrix, shape (k, k).
        """
        return np.diag(self.alpha)

In [3]:
class VariationalBayes:
    def __init__(self, mu: 'GaussianPrior', variance: float, pi: np.ndarray):
        """
        Variational Bayes implementation with a Gaussian prior on mu.

        Parameters
        ----------
        mu : GaussianPrior
            Gaussian prior on latent features.
        variance : float
            Gaussian noise parameter.
        pi : np.ndarray
            Vector of prior probabilities, shape (1, number_of_latent_variables).
        """
        super().__init__()
        self.gaussian_prior = mu
        self._variance = variance
        self._pi = pi

    @property
    def log_pi(self) -> np.ndarray:
        """Compute the natural logarithm of the prior probabilities."""
        return np.log(self.pi)

    @property
    def log_one_minus_pi(self) -> np.ndarray:
        """Compute the natural logarithm of the complement of the prior probabilities."""
        return np.log(1 - self.pi)

    @property
    def variance(self) -> float:
        """Get the variance of the Gaussian noise."""
        return self._variance

    @property
    def pi(self) -> np.ndarray:
        """Get the vector of prior probabilities."""
        return self._pi

    @property
    def mu(self) -> np.ndarray:
        """Get the mean matrix of the Gaussian prior."""
        return self.gaussian_prior.mu

    @property
    def d(self) -> int:
        """Get the dimensionality of the observed data."""
        return self.mu.shape[0]

    @property
    def k(self) -> int:
        """Get the number of latent variables."""
        return self.mu.shape[1]

    @property
    def precision(self) -> float:
        """Get the precision (inverse of variance) of the Gaussian noise."""
        return 1.0 / self.variance

    def mu_exclude(self, exclude_latent_index: int) -> np.ndarray:
        """
        Exclude a specific latent variable index from the mean matrix.

        Parameters
        ----------
        exclude_latent_index : int
            Index of the latent variable to exclude.

        Returns
        -------
        np.ndarray
            Mean matrix excluding the specified latent variable,
            shape (number_of_dimensions, number_of_latent_variables - 1).
        """
        return np.concatenate(
            (
                self.mu[:, :exclude_latent_index],
                self.mu[:, exclude_latent_index + 1:]
            ),
            axis=1,
        )

    @staticmethod
    def calculate_maximisation_parameters(
        x: np.ndarray,
        binary_latent_factor_approximation: 'MeanFieldApproximation'
    ) -> Tuple[np.ndarray, float, np.ndarray]:
        """
        Calculate M-step parameters for the variational approximation.

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (number_of_points, number_of_dimensions).
        binary_latent_factor_approximation : MeanFieldApproximation
            Mean field approximation object.

        Returns
        -------
        Tuple[np.ndarray, float, np.ndarray]
            Updated parameters: (means, variance, prior probabilities).
        """
        return m_step(
            X=x,
            ES=binary_latent_factor_approximation.expectation_s,
            ESS=binary_latent_factor_approximation.expectation_ss,
        )

    def _update_w_covariance(
        self,
        binary_latent_factor_approximation: 'MeanFieldApproximation',
    ) -> None:
        """
        Update the covariance matrix for the latent feature weights.

        Parameters
        ----------
        binary_latent_factor_approximation : MeanFieldApproximation
            Mean field approximation object.
        """
        a_matrix = self.gaussian_prior.a_matrix
        expectation_ss = binary_latent_factor_approximation.expectation_ss
        covariance_matrix = a_matrix + self.precision * expectation_ss
        self.gaussian_prior.w_covariance = np.linalg.inv(covariance_matrix)

    def _update_w_mean(
        self,
        x: np.ndarray,
        binary_latent_factor_approximation: 'MeanFieldApproximation',
        dimension_index: int,
    ) -> None:
        """
        Update the mean vector for the latent feature weights for a specific dimension.

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (number_of_points, number_of_dimensions).
        binary_latent_factor_approximation : MeanFieldApproximation
            Mean field approximation object.
        dimension_index : int
            Index of the data dimension to update.
        """
        expectation_s = binary_latent_factor_approximation.expectation_s
        data_column = x[:, dimension_index : dimension_index + 1]
        updated_mean = (
            self.gaussian_prior.w_covariance
            @ (self.precision * expectation_s.T @ data_column)
        ).T
        self.gaussian_prior.mu[dimension_index : dimension_index + 1, :] = updated_mean

    def _hyper_maximisation_step(self) -> None:
        """
        Hyperparameter maximisation step to update alpha, which parameterizes
        the covariance matrix of the Gaussian prior on mu.
        """
        for latent_idx in range(self.k):
            mu_k = self.gaussian_prior.mu_k(latent_idx)
            w_cov_kk = self.gaussian_prior.w_covariance[latent_idx, latent_idx]
            numerator = 2 * self.gaussian_prior.a + self.d - 2
            denominator = (
                2 * self.gaussian_prior.b
                + np.sum(mu_k ** 2)
                + self.d * w_cov_kk
            )
            self.gaussian_prior.alpha[latent_idx] = numerator / denominator

    def maximisation_step(
        self,
        x: np.ndarray,
        binary_latent_factor_approximation: 'MeanFieldApproximation',
    ) -> None:
        """
        Perform the maximisation step, which includes the standard M-step,
        updates to the posterior distribution of mu, and a hyperparameter step.

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (number_of_points, number_of_dimensions).
        binary_latent_factor_approximation : MeanFieldApproximation
            Mean field approximation object.
        """
        _, sigma, pi = self.calculate_maximisation_parameters(
            x, binary_latent_factor_approximation
        )
        self._variance = sigma ** 2
        self._pi = pi

        self._update_w_covariance(binary_latent_factor_approximation)

        for dim_idx in range(self.d):
            self._update_w_mean(x, binary_latent_factor_approximation, dim_idx)

        self._hyper_maximisation_step()

In [4]:
def m_step(X, ES, ESS):
    """
    mu, sigma, pie = MStep(X,ES,ESS)

    Inputs:
    -----------------
           X: shape (N, D) data matrix
          ES: shape (N, K) E_q[s]
         ESS: shape (K, K) sum over data points of E_q[ss'] (N, K, K)
                           if E_q[ss'] is provided, the sum over N is done for you.

    Outputs:
    --------
          mu: shape (D, K) matrix of means in p(y|{s_i},mu,sigma)
       sigma: shape (,)    standard deviation in same
         pie: shape (1, K) vector of parameters specifying generative distribution for s
    """
    N, D = X.shape
    if ES.shape[0] != N:
        raise TypeError('ES must have the same number of rows as X')
    K = ES.shape[1]
    if ESS.shape == (N, K, K):
        ESS = np.sum(ESS, axis=0)
    if ESS.shape != (K, K):
        raise TypeError('ESS must be square and have the same number of columns as ES')

    mu = np.dot(np.dot(np.linalg.inv(ESS), ES.T), X).T
    sigma = np.sqrt((np.trace(np.dot(X.T, X)) + np.trace(np.dot(np.dot(mu.T, mu), ESS))
                     - 2 * np.trace(np.dot(np.dot(ES.T, X), mu))) / (N * D))
    pie = np.mean(ES, axis=0, keepdims=True)

    return mu, sigma, pie


In [5]:
class MeanFieldApproximation:
    def __init__(
        self,
        lambda_matrix: np.ndarray,
        max_steps: int,
        convergence_criterion: float,
    ):
        """
        Initialize the Mean Field Approximation.

        Parameters
        ----------
        lambda_matrix : np.ndarray
            Parameters of the variational approximation, shape (n, k),
            where n is the number of data points and k is the number of latent variables.
        max_steps : int
            Maximum number of iterations for the variational expectation step.
        convergence_criterion : float
            Threshold for convergence based on the change in free energy.
        """
        self._lambda_matrix = lambda_matrix
        self.max_steps = max_steps
        self.convergence_criterion = convergence_criterion

    @property
    def lambda_matrix(self) -> np.ndarray:
        """
        Get the lambda matrix of the variational approximation.

        Returns
        -------
        np.ndarray
            The lambda matrix, shape (n, k).
        """
        return self._lambda_matrix

    @lambda_matrix.setter
    def lambda_matrix(self, value: np.ndarray) -> None:
        """
        Set the lambda matrix of the variational approximation.

        Parameters
        ----------
        value : np.ndarray
            New lambda matrix, shape (n, k).
        """
        self._lambda_matrix = value

    def lambda_matrix_exclude(self, exclude_latent_index: int) -> np.ndarray:
        """
        Exclude a specific latent variable index from the lambda matrix.

        Parameters
        ----------
        exclude_latent_index : int
            Index of the latent variable to exclude.

        Returns
        -------
        np.ndarray
            Lambda matrix excluding the specified latent variable,
            shape (n, k - 1).
        """
        return np.concatenate(
            (
                self.lambda_matrix[:, :exclude_latent_index],
                self.lambda_matrix[:, exclude_latent_index + 1 :],
            ),
            axis=1,
        )

    @property
    def expectation_s(self) -> np.ndarray:
        """
        Get the expectation of the latent variables (E[s]).

        Returns
        -------
        np.ndarray
            Expectation matrix, shape (n, k).
        """
        return self.lambda_matrix

    @property
    def expectation_ss(self) -> np.ndarray:
        """
        Get the expectation of the product of latent variables (E[s s^T]).

        Returns
        -------
        np.ndarray
            Expectation matrix, shape (k, k).
        """
        ess = self.lambda_matrix.T @ self.lambda_matrix
        np.fill_diagonal(ess, self.lambda_matrix.sum(axis=0))
        return ess

    @property
    def log_lambda_matrix(self) -> np.ndarray:
        """
        Compute the natural logarithm of the lambda matrix.

        Returns
        -------
        np.ndarray
            Log-transformed lambda matrix, shape (n, k).
        """
        return np.log(self.lambda_matrix)

    @property
    def log_one_minus_lambda_matrix(self) -> np.ndarray:
        """
        Compute the natural logarithm of (1 - lambda matrix).

        Returns
        -------
        np.ndarray
            Log-transformed complement of lambda matrix, shape (n, k).
        """
        return np.log(1 - self.lambda_matrix)

    @property
    def n(self) -> int:
        """
        Get the number of data points.

        Returns
        -------
        int
            Number of data points (n).
        """
        return self.lambda_matrix.shape[0]

    @property
    def k(self) -> int:
        """
        Get the number of latent variables.

        Returns
        -------
        int
            Number of latent variables (k).
        """
        return self.lambda_matrix.shape[1]

    def compute_free_energy(
        self,
        x: np.ndarray,
        binary_latent_factor_model: "VariationalBayes",
    ) -> float:
        """
        Compute the free energy associated with the current EM parameters and data x.

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (n, d), where n is the number of data points and d is the number of dimensions.
        binary_latent_factor_model : VariationalBayes
            A binary latent factor model.

        Returns
        -------
        float
            Average free energy per data point.
        """
        expectation_log_p_x_s_given_theta = self._compute_expectation_log_p_x_s_given_theta(
            x, binary_latent_factor_model
        )
        approximation_model_entropy = self._compute_approximation_model_entropy()
        return (
            expectation_log_p_x_s_given_theta + approximation_model_entropy
        ) / self.n

    def _partial_expectation_step(
        self,
        x: np.ndarray,
        binary_latent_factor_model: "VariationalBayes",
        latent_factor: int,
    ) -> np.ndarray:
        """
        Perform a partial variational E-step for a specific latent factor across all data points.

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (n, d).
        binary_latent_factor_model : VariationalBayes
            A binary latent factor model.
        latent_factor : int
            Index of the latent factor to update.

        Returns
        -------
        np.ndarray
            Updated lambda vector for the specified latent factor, shape (n, 1).
        """
        lambda_matrix_excluded = self.lambda_matrix_exclude(latent_factor)
        mu_excluded = binary_latent_factor_model.mu_exclude(latent_factor)

        mu_latent = binary_latent_factor_model.mu[:, latent_factor]

        # Compute the proportion of the expectation log p(x, s | theta)
        partial_log_prob = (
            binary_latent_factor_model.precision
            * (
                x
                - 0.5 * mu_latent.T
                - lambda_matrix_excluded @ mu_excluded.T
            )
            @ mu_latent
        )

        # Compute the log-odds for the latent factor
        log_odds = np.log(
            binary_latent_factor_model.pi[0, latent_factor]
            / (1 - binary_latent_factor_model.pi[0, latent_factor])
        )

        # Total partial expectation
        total_partial_log_prob = partial_log_prob + log_odds

        # Update lambda using the sigmoid function
        lambda_vector = 1 / (
            1 + np.exp(-total_partial_log_prob)
        )
        lambda_vector = np.clip(lambda_vector, 1e-10, 1 - 1e-10)
        return lambda_vector

    def _compute_expectation_log_p_x_s_given_theta(
        self,
        x: np.ndarray,
        binary_latent_factor_model: "VariationalBayes",
    ) -> float:
        """
        Compute the expectation of log P(X, S | theta).

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (n, d).
        binary_latent_factor_model : VariationalBayes
            A binary latent factor model.

        Returns
        -------
        float
            Expectation of log P(X, S | theta).
        """
        mu_lambda = self.lambda_matrix @ binary_latent_factor_model.mu.T

        # Compute E[s_i s_j] * mu_i * mu_j
        expectation_s_i_s_j_mu_i_mu_j = np.multiply(
            self.lambda_matrix.T @ self.lambda_matrix,
            binary_latent_factor_model.mu.T @ binary_latent_factor_model.mu,
        )

        # Calculate the expectation log P(X | S, theta)
        expectation_log_p_x_given_s_theta = (
            - (self.n * binary_latent_factor_model.d / 2)
            * np.log(2 * np.pi * binary_latent_factor_model.variance)
            - 0.5 * binary_latent_factor_model.precision
            * (
                np.sum(x ** 2)
                - 2 * np.sum(x * mu_lambda)
                + np.sum(expectation_s_i_s_j_mu_i_mu_j)
                - np.trace(expectation_s_i_s_j_mu_i_mu_j)
                + np.sum(
                    self.lambda_matrix
                    @ np.multiply(
                        binary_latent_factor_model.mu,
                        binary_latent_factor_model.mu,
                    ).T
                )
            )
        )

        # Calculate the expectation log P(S | theta)
        expectation_log_p_s_given_theta = np.sum(
            self.lambda_matrix * binary_latent_factor_model.log_pi
            + (1 - self.lambda_matrix) * binary_latent_factor_model.log_one_minus_pi
        )

        return expectation_log_p_x_given_s_theta + expectation_log_p_s_given_theta

    def _compute_approximation_model_entropy(self) -> float:
        """
        Compute the entropy of the variational approximation model.

        Returns
        -------
        float
            Model entropy.
        """
        entropy = -np.sum(
            self.lambda_matrix * self.log_lambda_matrix
            + (1 - self.lambda_matrix) * self.log_one_minus_lambda_matrix
        )
        return entropy

    def variational_expectation_step(
        self,
        x: np.ndarray,
        binary_latent_factor_model: "VariationalBayes",
    ) -> List[float]:
        """
        Perform the variational expectation step.

        Parameters
        ----------
        x : np.ndarray
            Data matrix, shape (n, d).
        binary_latent_factor_model : VariationalBayes
            A binary latent factor model.

        Returns
        -------
        List[float]
            List of free energy values at each step.
        """
        free_energy = [self.compute_free_energy(x, binary_latent_factor_model)]
        for step in range(self.max_steps):
            for latent_factor in range(binary_latent_factor_model.k):
                updated_lambda = self._partial_expectation_step(
                    x, binary_latent_factor_model, latent_factor
                )
                self.lambda_matrix[:, latent_factor] = updated_lambda
                free_energy.append(
                    self.compute_free_energy(x, binary_latent_factor_model)
                )
                energy_change = free_energy[-1] - free_energy[-2]
                if energy_change <= self.convergence_criterion:
                    break
            if free_energy[-1] - free_energy[-2] <= self.convergence_criterion:
                break
        return free_energy


def init_mean_field_approximation(
    k: int, n: int, max_steps: int, convergence_criterion: float
) -> MeanFieldApproximation:
    """
    Initialize a MeanFieldApproximation instance with random lambda matrix.

    Parameters
    ----------
    k : int
        Number of latent variables.
    n : int
        Number of data points.
    max_steps : int
        Maximum number of iterations for the variational expectation step.
    convergence_criterion : float
        Threshold for convergence based on the change in free energy.

    Returns
    -------
    MeanFieldApproximation
        Initialized MeanFieldApproximation instance.
    """
    lambda_matrix = np.random.rand(n, k)
    return MeanFieldApproximation(
        lambda_matrix=lambda_matrix,
        max_steps=max_steps,
        convergence_criterion=convergence_criterion,
    )

In [6]:
def automatic_relevance_determination(
    x: np.ndarray,
    binary_latent_factor_approximation: 'MeanFieldApproximation',
    a_parameter: float,
    b_parameter: float,
    k: int,
    em_iterations: int,
) -> Tuple['VariationalBayes', List[float]]:
    """
    Perform Automatic Relevance Determination (ARD) using Variational Bayes.

    This function initializes the Variational Bayes model with a Gaussian prior,
    performs the Expectation-Maximization (EM) algorithm to optimize the model parameters,
    and returns the optimized model along with the history of free energy values.

    Parameters
    ----------
    x : np.ndarray
        Data matrix of shape (n_samples, n_dimensions), where n_samples is the number of data points
        and n_dimensions is the number of observed dimensions.
    binary_latent_factor_approximation : MeanFieldApproximation
        An instance of MeanFieldApproximation representing the current variational approximation
        of the binary latent factors.
    a_parameter : float
        Alpha parameter for the Gamma prior on the precision (inverse variance) of the Gaussian prior.
    b_parameter : float
        Beta parameter for the Gamma prior on the precision (inverse variance) of the Gaussian prior.
    k : int
        Number of latent variables in the model.
    em_iterations : int
        Maximum number of iterations to run the EM algorithm.

    Returns
    -------
    Tuple[VariationalBayes, List[float]]
        A tuple containing:
        - The optimized VariationalBayes model.
        - A list of free energy values recorded at each EM step, representing the convergence progress.
    """
    # Calculate initial maximization parameters (means, variance, prior probabilities)
    _, sigma, pi = VariationalBayes.calculate_maximisation_parameters(
        x, binary_latent_factor_approximation
    )

    # Initialize Gaussian prior with provided hyperparameters
    gaussian_prior = GaussianPrior(
        a=a_parameter,
        b=b_parameter,
        d=x.shape[1],
        k=k,
    )

    # Initialize the Variational Bayes model with the Gaussian prior, variance, and prior probabilities
    variational_bayes_model = VariationalBayes(
        mu=gaussian_prior,
        variance=sigma**2,
        pi=pi,
    )

    # Learn binary factors using the Variational Bayes model and update the free energy
    _, variational_bayes_model, free_energy = learn_binary_factors(
        x=x,
        k=k,
        em_iterations=em_iterations,
        binary_latent_factor_model=variational_bayes_model,
        binary_latent_factor_approximation=binary_latent_factor_approximation,
    )

    return variational_bayes_model, free_energy

In [7]:
def is_converge(
    free_energies: List[float],
    current_lambda_matrix: np.ndarray,
    previous_lambda_matrix: np.ndarray,
    free_energy_threshold: float = 1e-6,
    lambda_threshold: float = 1e-6,
) -> bool:
    """
    Determine whether the algorithm has converged based on changes in free energy
    and the lambda matrix.

    Convergence is achieved if the change in free energy between the last two iterations
    is below a specified threshold and the change in the lambda matrix (measured by
    the Frobenius norm) is also below a specified threshold.

    Parameters
    ----------
    free_energies : List[float]
        List of free energy values recorded at each iteration.
    current_lambda_matrix : np.ndarray
        The current lambda matrix after the latest iteration.
    previous_lambda_matrix : np.ndarray
        The lambda matrix from the previous iteration.
    free_energy_threshold : float, optional
        Threshold for the change in free energy to determine convergence, by default 1e-6.
    lambda_threshold : float, optional
        Threshold for the change in the lambda matrix (Frobenius norm) to determine convergence,
        by default 1e-6.

    Returns
    -------
    bool
        True if both the change in free energy and the change in lambda matrix are below
        their respective thresholds, indicating convergence. Otherwise, False.
    """
    if len(free_energies) < 2:
        # Not enough data to determine convergence
        return False

    # Calculate the absolute change in free energy
    free_energy_change = abs(free_energies[-1] - free_energies[-2])

    # Calculate the Frobenius norm of the change in lambda matrix
    lambda_change = np.linalg.norm(current_lambda_matrix - previous_lambda_matrix)

    # Check if both changes are below their respective thresholds
    return (free_energy_change <= free_energy_threshold) and (lambda_change <= lambda_threshold)


def learn_binary_factors(
    x: np.ndarray,
    k: int,
    em_iterations: int,
    binary_latent_factor_model: 'VariationalBayes',
    binary_latent_factor_approximation: 'MeanFieldApproximation',
) -> Tuple['MeanFieldApproximation', 'VariationalBayes', List[float]]:
    """
    Perform the Expectation-Maximization (EM) algorithm to learn binary latent factors.

    This function iteratively performs the E-step and M-step to optimize the
    variational approximation of binary latent factors and update the
    variational Bayes model. It records the free energy at each iteration to
    monitor convergence.

    Parameters
    ----------
    x : np.ndarray
        Data matrix of shape (n_samples, n_dimensions), where n_samples is the
        number of data points and n_dimensions is the number of observed dimensions.
    em_iterations : int
        Maximum number of EM iterations to perform.
    binary_latent_factor_model : VariationalBayes
        An instance of VariationalBayes representing the current model.
    binary_latent_factor_approximation : MeanFieldApproximation
        An instance of MeanFieldApproximation representing the current variational
        approximation of the binary latent factors.

    Returns
    -------
    Tuple[MeanFieldApproximation, VariationalBayes, List[float]]
        A tuple containing:
        - The updated MeanFieldApproximation instance.
        - The updated VariationalBayes model.
        - A list of free energy values recorded at each EM iteration.
    """
    # Initialize the list of free energies with the initial free energy
    free_energies: List[float] = [
        binary_latent_factor_approximation.compute_free_energy(
            x, binary_latent_factor_model
        )
    ]

    for iteration in range(1, em_iterations + 1):
        # Store the previous lambda matrix for convergence checking
        previous_lambda_matrix = np.copy(binary_latent_factor_approximation.lambda_matrix)

        # E-step: Update the variational approximation (lambda matrix)
        free_energy_history = binary_latent_factor_approximation.variational_expectation_step(
            x=x,
            binary_latent_factor_model=binary_latent_factor_model,
        )

        # M-step: Update the variational Bayes model parameters
        binary_latent_factor_model.maximisation_step(
            x=x,
            binary_latent_factor_approximation=binary_latent_factor_approximation,
        )

        # Compute and record the new free energy
        current_free_energy = binary_latent_factor_approximation.compute_free_energy(
            x, binary_latent_factor_model
        )
        free_energies.append(current_free_energy)

        # Check for convergence
        if is_converge(
            free_energies=free_energies,
            current_lambda_matrix=binary_latent_factor_approximation.lambda_matrix,
            previous_lambda_matrix=previous_lambda_matrix,
        ):
            print(f"current K = {k},"
                  f" Convergence achieved at iteration {iteration},"
                  f" Free Energy at Convergence: {current_free_energy}.")
            break


    return binary_latent_factor_approximation, binary_latent_factor_model, free_energies

In [8]:
import os
import matplotlib.pyplot as plt
from matplotlib.axes import Axes
from matplotlib.offsetbox import AnnotationBbox, OffsetImage


def offset_image(coord: int, image_path: str, ax: Axes) -> None:
    """
    Add an image to a specific coordinate on a Matplotlib axis.

    Parameters
    ----------
    coord : int
        The x-coordinate on the axis where the image will be placed.
    image_path : str
        The file path to the image to be added.
    ax : Axes
        The Matplotlib axis to which the image will be added.
    """
    img = plt.imread(image_path)
    offset_img = OffsetImage(img, zoom=0.72)
    offset_img.image.axes = ax

    annotation = AnnotationBbox(
        offset_img,
        (coord, 0),
        xybox=(0.0, -19.0),
        frameon=False,
        xycoords="data",
        boxcoords="offset points",
        pad=0,
    )
    ax.add_artist(annotation)


def plot_factors(
    variational_bayes_models: List['VariationalBayes'],
    ks: List[int],
    max_k: int,
    save_path: str,
) -> None:
    """
    Plot the latent factors and their corresponding inverse alpha values.

    This function saves images of each latent factor and creates a bar plot
    showing the inverse alpha values for each model. The first eight bars in
    each bar plot are colored blue, and the remaining bars are grey.

    Parameters
    ----------
    variational_bayes_models : List[VariationalBayes]
        A list of VariationalBayes models corresponding to different numbers of latent factors.
    ks : List[int]
        A list of integers representing the number of latent factors for each model.
    max_k : int
        The maximum number of latent factors across all models.
    save_path : str
        The directory path where the plots and images will be saved.
    """
    # Ensure the save directory exists
    os.makedirs(save_path, exist_ok=True)

    # Store each feature as an image for later use
    for model_idx, k in enumerate(ks):
        sorted_indices = np.argsort(
            variational_bayes_models[model_idx].gaussian_prior.alpha
        )
        for factor_idx, alpha_idx in enumerate(sorted_indices):
            fig = plt.figure(figsize=(0.3, 0.3))
            ax = plt.Axes(fig, [0.0, 0.0, 1.0, 1.0])
            ax.set_axis_off()
            fig.add_axes(ax)
            latent_factor_image = variational_bayes_models[model_idx].mu[:, alpha_idx].reshape(4, 4)
            ax.imshow(latent_factor_image, cmap='gray')
            image_filename = f"-latent-factor-{model_idx}-{factor_idx}.png"
            fig.savefig(os.path.join(save_path, image_filename), bbox_inches="tight")
            plt.close()

    # Create a bar plot of inverse alphas
    fig, axes = plt.subplots(len(ks), 1, figsize=(15, 2 * len(ks)))
    plt.subplots_adjust(hspace=1)

    for model_idx, k in enumerate(ks):
        sorted_indices = np.argsort(
            variational_bayes_models[model_idx].gaussian_prior.alpha
        )
        inverse_alphas = (
            1.0 / variational_bayes_models[model_idx].gaussian_prior.alpha[sorted_indices]
        )
        # Pad with zeros if necessary to match max_k
        inverse_alphas_padded = list(inverse_alphas) + [0] * (max_k - k)
        axes[model_idx].set_title(f"Number of Latent Factors k = {k}")
        # Define colors: first eight bars blue, others grey
        colors = ['blue' if idx < 8 else 'grey' for idx in range(max_k)]
        axes[model_idx].bar(range(max_k), inverse_alphas_padded, color=colors)
        axes[model_idx].set_xticks([])
        axes[model_idx].set_ylabel("Inverse α")

    # Add feature image ticks
    for model_idx, k in enumerate(ks):
        sorted_indices = np.argsort(
            variational_bayes_models[model_idx].gaussian_prior.alpha
        )
        for factor_idx in range(len(sorted_indices)):
            image_path = os.path.join(save_path, f"-latent-factor-{model_idx}-{factor_idx}.png")
            offset_image(factor_idx, image_path, axes[model_idx])
            os.remove(image_path)

    fig.savefig(save_path + f"-latent-factors-comparison", bbox_inches="tight")
    plt.close()

def plot_free_energy(
    ks: List[int],
    free_energies: List[List[float]],
    model_name: str,
    save_path: str,
) -> None:
    """
    Plot the free energy over EM iterations for different models.

    Parameters
    ----------
    ks : List[int]
        List of integers representing different numbers of latent factors.
    free_energies : List[List[float]]
        List containing lists of free energy values for each model.
    model_name : str
        Name of the model for the plot title.
    save_path : str
        Directory path where the plot will be saved.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    cmap = plt.get_cmap("viridis")
    shades = np.flip(np.linspace(0.3, 0.9, len(ks)))

    for idx, k in enumerate(ks):
        color = cmap(shades[idx])
        ax.plot(free_energies[idx], label=f"K = {k}", color=color)

    ax.set_title(f"Free Energy of {model_name} Model Over Different Ks")
    ax.set_xlabel("Iterations")
    ax.set_ylabel("Free Energy")
    ax.legend()
    plt.savefig(save_path + "-free-energy", bbox_inches="tight")
    plt.close()

In [9]:
def runARD(
    x: np.ndarray,
    a_parameter: int,
    b_parameter: int,
    ks: List[int],
    max_k: int,
    em_iterations: int,
    e_maximum_steps: int,
    e_convergence_criterion: float,
    save_path: str,
) -> List[List[float]]:
    """
    Execute Automatic Relevance Determination (ARD) across multiple models.

    This function runs ARD for different numbers of latent factors, initializes the
    corresponding mean field approximations, performs the Expectation-Maximization (EM)
    algorithm to optimize the Variational Bayes models, collects free energy values
    for each model, and plots the resulting latent factors along with their inverse
    alpha values.

    Parameters
    ----------
    x : np.ndarray
        Data matrix of shape (n_samples, n_dimensions), where n_samples is the
        number of data points and n_dimensions is the number of observed dimensions.
    a_parameter : float
        Alpha parameter for the Gamma prior on the precision (inverse variance) of the Gaussian prior.
    b_parameter : float
        Beta parameter for the Gamma prior on the precision (inverse variance) of the Gaussian prior.
    ks : List[int]
        List of integers representing different numbers of latent factors to evaluate.
    max_k : int
        The maximum number of latent factors across all models.
    em_iterations : int
        Number of iterations to run the EM algorithm.
    e_maximum_steps : int
        Maximum number of steps for the variational expectation step.
    e_convergence_criterion : float
        Convergence threshold for the variational expectation step.
    save_path : str
        Directory path where plots and images will be saved.

    Returns
    -------
    List[List[float]]
        A list containing lists of free energy values for each model.
    """
    variational_bayes_models: List['VariationalBayes'] = []
    free_energies: List[List[float]] = []

    for model_idx, k in enumerate(ks):
        n = x.shape[0]

        # Initialize the MeanFieldApproximation instance
        mean_field_approximation = init_mean_field_approximation(
            k,
            n,
            max_steps=e_maximum_steps,
            convergence_criterion=e_convergence_criterion,
        )

        # Run Automatic Relevance Determination (ARD)
        variational_bayes_model, free_energy = automatic_relevance_determination(
            x=x,
            binary_latent_factor_approximation=mean_field_approximation,
            a_parameter=a_parameter,
            b_parameter=b_parameter,
            k=k,
            em_iterations=em_iterations,
        )

        variational_bayes_models.append(variational_bayes_model)
        free_energies.append(free_energy)

    # Plot the latent factors and inverse alpha values
    plot_factors(
        variational_bayes_models,
        ks,
        max_k,
        save_path,
    )

    return free_energies

In [11]:
import os
from typing import List

import numpy as np
import matplotlib.pyplot as plt


# Constants for output directories and random seed
OUTPUTS_FOLDER = "ARD"
DEFAULT_SEED = 42


def main():
    """
    Main function to perform Automatic Relevance Determination (ARD) using Variational Bayes.

    This function initializes the necessary directories, sets the random seed for reproducibility,
    runs the ARD algorithm across different numbers of latent factors, and plots the free energy
    progression for each model.
    """
    # Set the random seed for reproducibility
    np.random.seed(DEFAULT_SEED)

    # Ensure the main output directory exists
    os.makedirs(OUTPUTS_FOLDER, exist_ok=True)

    # Define output directory for Question 4


    # Define parameters for ARD
    a_parameter = 1.0
    b_parameter = 0.0
    latent_factors_list = list(range(4, 25))  # ks: number of latent factors from 4 to 24
    max_latent_factors = 24
    em_iterations = 100
    expectation_maximization_steps = 100
    expectation_convergence_threshold = 1e-6  # Changed from 0 to a small positive value for practical convergence

    # Run Automatic Relevance Determination (ARD)
    free_energies = runARD(
        x=data,
        a_parameter=a_parameter,
        b_parameter=b_parameter,
        ks=latent_factors_list,
        max_k=max_latent_factors,
        em_iterations=em_iterations,
        e_maximum_steps=expectation_maximization_steps,
        e_convergence_criterion=expectation_convergence_threshold,
        save_path=os.path.join(OUTPUTS_FOLDER, "b-1"),
    )

    # Plot the free energy progression for each model
    model_name = "Variational Bayes"
    plot_free_energy(
        ks=latent_factors_list,
        free_energies=free_energies,
        model_name="Variational Bayes",
        save_path=os.path.join(OUTPUTS_FOLDER, "b-2"),
    )


if __name__ == "__main__":
    main()

current K = 4, Convergence achieved at iteration 41, Free Energy at Convergence: -9.502440567188636.
current K = 5, Convergence achieved at iteration 54, Free Energy at Convergence: -8.189202403271139.
current K = 6, Convergence achieved at iteration 38, Free Energy at Convergence: -6.884485244477967.
current K = 7, Convergence achieved at iteration 48, Free Energy at Convergence: -5.563384475135205.
current K = 8, Convergence achieved at iteration 14, Free Energy at Convergence: -5.8765661368619355.
current K = 9, Convergence achieved at iteration 55, Free Energy at Convergence: -3.1628610534032613.
current K = 10, Convergence achieved at iteration 38, Free Energy at Convergence: -1.1067765141713457.
current K = 11, Convergence achieved at iteration 40, Free Energy at Convergence: -2.9764925575148737.
current K = 12, Convergence achieved at iteration 41, Free Energy at Convergence: -0.15271753694523182.
current K = 13, Convergence achieved at iteration 24, Free Energy at Convergence: 