<a href="https://colab.research.google.com/github/PCBZ/CS6140/blob/main/HW3/HW3_Problem6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
import numpy as np
from scipy.stats import binom
import pandas as pd
from scipy.optimize import linear_sum_assignment

def generate_coin_flip_data(priors, p_heads, K = 10, M = 100):
    """
    Generate coin flip data using 2 biased coins.

    Parameters:
    - priors: probablities of selecting coins, (priror1, prior2, ...)
    - p_heads: probablities of heads in coins, (p_heads1, p_heads2, ...)
    - K: number of flips
    - M: number of experiments

    Return:
    - data: M x K matrix of 0s (tails) and 1s (heads)
    """
    priors = np.array(priors)
    p_heads = np.array(p_heads)

    n_coins = len(priors)

    data = np.zeros((M, K), dtype=int)

    for i in range(M):
        # Pick a coin
        coin = np.random.choice(n_coins, p=priors)

        # Get selected coins p_head
        p = p_heads[coin]

        # Flip K times
        data[i, :] = np.random.choice([0, 1], size=K, p=[1-p, p])

    return data

class BinomialMixtureModel:
    """
    Binomial Mixture Model using EM algorithm.
    """
    def __init__(self, n_components = 2, max_iteration = 100, tolerance = 1e-6):
        """
        Initialize the model.

        Parameters:
        - n_components: number of components in the mixture
        - K: number of flips
        - max_iteration: maximum number of iterations
        - tolerance: tolerance for convergence
        """
        self.n_components = n_components
        self.K = None
        self.max_iteration = max_iteration
        self.tolerance = tolerance

        self.probs = None
        self.weights = None
        self.log_likelihoods = []

    def _initialize_parameters(self):
        """
        Initialize the parameters of the model.
        """
        T = self.n_components
        self.probs = np.random.uniform(0.05, 0.95, T)

        self.weights = np.ones(T) / T
        noise = np.random.uniform(-0.1, 0.1, T)
        self.weights += noise
        self.weights /= np.sum(self.weights)

    def _e_step(self, data):
        """
        E-step of the EM algorithm.

        Parameters:
        - data: M x K matrix of 0s (tails) and 1s (heads)

        Return:
        - responsibilities: M x T matrix of posterior probabilities
        - log_likelihood: log-likelihood of the data
        """
        M, K = data.shape
        T = self.n_components

        # Compute head count in a sequence
        head_counts = np.sum(data, axis=1)

        # Compute likelihood for each component
        likelihoods = binom.pmf(head_counts[:, np.newaxis], K, self.probs)

        # Weighted likilihood
        weighted_likelihoods = likelihoods * self.weights[np.newaxis, :]

        total_prob = np.sum(weighted_likelihoods, axis=1)

        # Responsibilities
        responsibilities = weighted_likelihoods / (total_prob[:, np.newaxis] + 1e-10)

        # Log-likelihood
        log_likelihood = np.sum(np.log(total_prob + 1e-10))

        return responsibilities, log_likelihood

    def _m_step(self, data, responsibilities):
        """
        M-step of the EM algorithm.

        Parameters:
        - data: M x K matrix of 0s (tails) and 1s (heads)
        - responsibilities: M x T matrix of posterior probabilities

        Return:
        - probs: updated probabilities of the components
        - weights: updated weights of the components
        """
        M, K = data.shape
        T = self.n_components

        updated_weights = np.mean(responsibilities, axis=0)

        # Compute head count in a sequence
        head_counts = np.sum(data, axis=1)

        numerator = np.sum(responsibilities * head_counts[:, np.newaxis], axis=0)
        denominator = np.sum(responsibilities, axis=0)

        updated_probs = numerator / (denominator * K + 1e-10)

        return updated_probs, updated_weights

    def fit(self, data):
        """
        Fit the model to the data.

        Parameters:
        - data: M x K matrix of 0s (tails) and 1s (heads)
        """
        M, K = data.shape
        self.K = K

        self._initialize_parameters()

        self.log_likelihoods = []

        for iteration in range(self.max_iteration):
            # E-step
            responsibilities, log_likelihood = self._e_step(data)
            self.log_likelihoods.append(log_likelihood)

            # M-step
            probs, weights = self._m_step(data, responsibilities)

            # Check convergence
            if len(self.log_likelihoods) > 1:
                diff = np.abs(self.log_likelihoods[-1] - self.log_likelihoods[-2])
                if diff < self.tolerance:
                    print(f"Converged at iteration {iteration}")
                    break

            self.probs = probs
            self.weights = weights

    def _sort_components(self, data):
        """
        Sort the components of the mixture model.

        Parameters:
        - data: M x K matrix of 0s (tails) and 1s (heads)
        """

def hungarian_match_components(true_params, model):
    """
    Match the components of the mixture model with the true parameters.
    """
    n_components = len(true_params["priors"])

    cost_matrix = np.zeros((n_components, n_components))

    for i in range(n_components):
        for j in range(n_components):
            prob_diff = np.abs(true_params["p_heads"][i] - model.probs[j])
            prior_diff = np.abs(true_params["priors"][i] - model.weights[j])
            cost_matrix[i, j] = prob_diff + prior_diff

    row_indices, col_indices = linear_sum_assignment(cost_matrix)

    matched_probs = np.zeros(n_components)
    matched_weights = np.zeros(n_components)
    for i, j in zip(row_indices, col_indices):
        matched_probs[i] = model.probs[j]
        matched_weights[i] = model.weights[j]

    return matched_probs, matched_weights

def compare_results(true_params, probs, weights):
    """
    Compare the performance of the model with 2 components.
    """
    n_components = len(true_params["priors"])

    data = []

    for i in range(n_components):
        weight_relative_error = np.abs(true_params["priors"][i] - weights[i]) / true_params["priors"][i]
        prob_relative_error = np.abs(true_params["p_heads"][i] - probs[i]) / true_params["p_heads"][i]

        data.append({
            'Component': i + 1,
            'True pi': true_params["priors"][i],
            'Estimated pi': weights[i],
            'Relative Error pi': f'{weight_relative_error * 100: .2f}%',
            'True p': true_params["p_heads"][i],
            'Estimated p': probs[i],
            'Relative Error p': f'{prob_relative_error * 100: .2f}%',
        })

    df = pd.DataFrame(data)
    print(df.to_string(index=False))

if __name__ == "__main__":
    # 2 components ture params
    true_params = [{
        "priors": (0.4, 0.6),
        "p_heads": (0.2, 0.7),
    }, {
        "priors": (0.2, 0.8),
        "p_heads": (0.3, 0.9),
    }, {
        "priors": (0.3, 0.7),
        "p_heads": (0.1, 0.8),
    }]
    #  Generate data
    datas = [generate_coin_flip_data(true_param["priors"], true_param["p_heads"]) for true_param in true_params]

    # Create model and fit
    for idx, data in enumerate(datas):
        model = BinomialMixtureModel(2)
        model.fit(data)

        matched_probs, matched_weights = hungarian_match_components(true_params[idx], model)

        compare_results(true_params[idx], matched_probs, matched_weights)

    # Generate 3, 4 components true params
    true_params_3 = {
        "priors": (0.1, 0.3, 0.6),
        "p_heads": (0.15, 0.5, 0.85),
    }

    data = generate_coin_flip_data(true_params_3["priors"], true_params_3["p_heads"], K=20, M=500)

    # Create model and fit
    model = BinomialMixtureModel(3)
    model.fit(data)

    matched_probs, matched_weights = hungarian_match_components(true_params_3, model)

    compare_results(true_params_3, matched_probs, matched_weights)

    true_params_4 = {
        "priors": (0.1, 0.2, 0.3, 0.4),
        "p_heads": (0.1, 0.35, 0.65, 0.9),
    }

    data = generate_coin_flip_data(true_params_4["priors"], true_params_4["p_heads"], K=30, M=1000)

    model = BinomialMixtureModel(4)
    model.fit(data)

    matched_probs, matched_weights = hungarian_match_components(true_params_4, model)

    compare_results(true_params_4, matched_probs, matched_weights)







Converged at iteration 11
 Component  True pi  Estimated pi Relative Error pi  True p  Estimated p Relative Error p
         1      0.4       0.45944            14.86%     0.2     0.193643            3.18%
         2      0.6       0.54056             9.91%     0.7     0.743733            6.25%
Converged at iteration 6
 Component  True pi  Estimated pi Relative Error pi  True p  Estimated p Relative Error p
         1      0.2      0.141751            29.12%     0.3     0.277431            7.52%
         2      0.8      0.858249             7.28%     0.9     0.892135            0.87%
Converged at iteration 5
 Component  True pi  Estimated pi Relative Error pi  True p  Estimated p Relative Error p
         1      0.3      0.347298            15.77%     0.1     0.115060           15.06%
         2      0.7      0.652702             6.76%     0.8     0.819731            2.47%
Converged at iteration 21
 Component  True pi  Estimated pi Relative Error pi  True p  Estimated p Relative Error 