# Expectation Maximization: Binomial Mixture Model

__Notebook by Emmanuel Contreras-Campana, PhD__

The Expectation Maximization algorithm is an iterative method for finding maximum likelihood (MLE) or maximum a posteriori (MAP) estimates of parameters in statistical models, where the model depends on unobserved latent variables.

## Load Libraries

In [None]:
import numpy as np
import scipy.stats

## Coin Flip Experiment

As an example, consider a simple coin-flipping experiment in which we are given a pair of coins A and B of unknown biases, $\theta_A$ and $\theta_B$, respectively (that is, on any given flip, coin A will land on heads with probability $\theta_A$ and tails with probability $1-\theta_A$ and similarly for coin B). Our goal is to estimate $\theta = (\theta_A,\ \theta_B)$ by repeating the following procedure several times.

In [None]:
# coin likelihood
def binomial_likelihood(num_heads, trials, bias):
    """
    Calcuate binomial likelihood
    
    Parameters
    ----------
    obs : list
        observations (i.e. rolls)
    bias : float
        bias of coin (i.e. probability of success for the coin)
    
    # number of successes
    
    # number of tosses
    Returns
    -------
    likelihood : float
        likelihood of drawing the coin to perform experiment
    """
    
    # likelihood P(X | theta)
    likelihood = scipy.stats.binom(trials, bias).pmf(num_heads)
    
    return likelihood

In [None]:
# expectation step
def expection_step(observations, theta_A, theta_B,
                   prior_A):
    """
    Calculate the expected number of heads and tails attributed
    to coin A as well as the expected number of heads and tails
    attributed to coin B
    
    Parameters
    ----------
    observations : list of lists
        observations (i.e. list of rolls)
    theta_A : float
        bias of coin (i.e. probability of success for the coin)
    theta_B : float
        bias of coin (i.e. probability of success for the coin)
    
    Returns
    -------
    heads_A, tails_A, heads_B, tails_B : floats
    """
    
    heads = []
    tails = []
         
    # expected number of heads/tails attributed to coin A
    gamma_A = []

    # expected number of heads/tails attributed to coin B
    gamma_B = []
    
    # prior distribution for drawing coin B
    prior_B = 1 - prior_A
    
    for rolls in observations:
        
        # number of heads in each set of rolls
        num_heads = rolls.count("H")
        trails = len(rolls)
        
        # likelihood of each coin
        likelihood_A = binomial_likelihood(num_heads, trails, theta_A)
        
        likelihood_B = binomial_likelihood(num_heads, trails, theta_B)
        
        # evidence of the data
        p_D = likelihood_A*prior_A + likelihood_B*prior_B
        
        # posterior distributions for drawing coins A or B
        # note: Bayes' theorem
        p_A = likelihood_A*prior_A/p_D
        
        # expected number of heads and tails attributed
        # to coin A
        gamma_A.append(p_A)
        
        heads.append(num_heads)
        
    return gamma_A, heads

In [None]:
# maximization algorith for coin flips
def  maximization_step(gamma_A, heads):
    """
    Obtain the maximum likelihood estimator theta (i.e. theta MLE)
    
    Parameters
    ----------
    heads_A : list of lists
        observations (i.e. list of rolls)
    tails_A : float
        bias of coin (i.e. probability of success for the coin)
    heads_B : float
        bias of coin (i.e. probability of success for the coin)
    tails_B : float
        bias of coin (i.e. probability of success for the coin)
    
    Returns
    -------
    theta_A, theta_B : float
        likelihood of drawing the coin to perform experiment
    """

    # Replace dummy values with your implementation
    size = len(gamma_A)
    gamma_B = np.array([1]*size) - np.array(gamma_A)
    
    tails = np.array([10]*5) - np.array(heads)
    
    exp_heads_A = np.array(heads)*np.array(gamma_A)
    exp_tails_A = np.array(tails)*np.array(gamma_A)
    
    theta_A = np.sum(exp_heads_A)/(np.sum(exp_heads_A) + np.sum(exp_tails_A))
    
    exp_heads_B = np.array(heads)*np.array(gamma_B)
    exp_tails_B = np.array(tails)*np.array(gamma_B)

    theta_B = np.sum(exp_heads_B)/(np.sum(exp_heads_B) + np.sum(exp_tails_B))
    
    prior_A = np.mean(gamma_A)
    
    return theta_A, theta_B, prior_A

In [None]:
# expectation maximization algorithm for coin flips
def expectation_maximization(rolls, theta_A, theta_B,
                             prior_A, max_iter=10):
    """
    Expectation Maximization (EM) algorithm for Binomial Mixture Model
    
    Parameters
    ----------
    rolls : list of lists
        observations of all the rolls
    theta_A : float
        bias of coin A (i.e. probability of success for the coin)
    theta_B : float
        bias of coin B (i.e. probability of success for the coin)
    max_iter : float
        the number of iterations to use for EM algorithm
    
    Returns
    -------
    thetas : float
        list of all thetas calculated with EM algorithm
    """
    
    # store biases of coin from each iteration
    thetas = []
    
    # Iterate through EM algorithm
    for i in range(max_iter+1):
        
        # store initial biases of coins
        if i==0:
            thetas.append((theta_A, theta_B))

        # store updated biases of coins
        else:
            # perform expectation step
            gamma_A, heads = expection_step(rolls, theta_A, theta_B,
                                            prior_A)

            # perform maximization step
            theta_A, theta_B, prior_A = maximization_step(gamma_A, heads)

            # rounding
            theta_A = float(format(theta_A, '.2f'))
            theta_B = float(format(theta_B, '.2f'))

            # all theta values
            thetas.append((theta_A, theta_B))
        
        print "#%d:\t%0.2f %0.2f (prior %0.2f)" % (i, theta_A, theta_B, prior_A)
        
    thetas = np.array(thetas)
    
    return thetas

In [None]:
# intial random guess for biases of coins
theta_A = 0.6
theta_B = 0.5

# initial random guess for the probability of drawing each coin
prior_A = 0.45 # mixing proportion

# number of interations to perform
max_iter = 10

# set of observations
rolls = [ "HTTTHHTHTH", "HHHHTHHHHH", "HTHHHHHTHH", 
          "HTHTTTHHTT", "THHHTHHHTH" ]

thetas = expectation_maximization(rolls, theta_A, theta_B, 
                                  prior_A, max_iter)

## Appendix

In [None]:
# coin likelihood
def binom_likelihood(obs, bias):
    """
    Calcuate binomial likelihood
    
    Parameters
    ----------
    obs : list
        observations (i.e. rolls)
    bias : float
        bias of coin (i.e. probability of success for the coin)
    
    Returns
    -------
    likelihood : float
        likelihood of drawing the coin to perform experiment
    """
    
    # number of successes
    num_heads = obs.count("H")
    
    # number of tosses
    trials = len(obs)
    
    # likelihood P(X | theta)
    likelihood = pow(bias, num_heads) * pow(1 - bias, trials - num_heads)
    
    return likelihood

In [None]:
# expectation step
def e_step(observations, theta_A, theta_B):
    """
    Calculate the expected number of heads and tails attributed
    to coin A as well as the expected number of heads and tails
    attributed to coin B
    
    Parameters
    ----------
    observations : list of lists
        observations (i.e. list of rolls)
    theta_A : float
        bias of coin (i.e. probability of success for the coin)
    theta_B : float
        bias of coin (i.e. probability of success for the coin)
    
    Returns
    -------
    heads_A, tails_A, heads_B, tails_B : floats
    """
    
    # expected number of heads/tails attributed to coin A
    heads_A =0
    tails_A = 0
    
    # expected number of heads/tails attributed to coin B
    heads_B = 0
    tails_B = 0
    
    for rolls in observations:
        likelihood_A = binom_likelihood(rolls, theta_A)
        likelihood_B = binom_likelihood(rolls, theta_B)
        
        # improper priors for drawing coins A or B
        prior_A = 1/(likelihood_A + likelihood_B)
        prior_B = prior_A
        
        # posterior distributions for drawing coins A or B
        p_A = likelihood_A * prior_A
        p_B = likelihood_B * prior_B
        
        # expected number of heads and tails attributed
        # to coin A
        heads_A += p_A * rolls.count("H")
        tails_A += p_A * rolls.count("T")
        
        # expected number of heads and tails attributed
        # to coin B
        heads_B += p_B * rolls.count("H")
        tails_B += p_B * rolls.count("T")
        
    return heads_A, tails_A, heads_B, tails_B

In [None]:
# maximization algorith for coin flips
def  m_step(heads_A, tails_A, heads_B, tails_B):
    """
    Obtain the maximum likelihood estimator theta (i.e. theta MLE)
    
    Parameters
    ----------
    heads_A : list of lists
        observations (i.e. list of rolls)
    tails_A : float
        bias of coin (i.e. probability of success for the coin)
    heads_B : float
        bias of coin (i.e. probability of success for the coin)
    tails_B : float
        bias of coin (i.e. probability of success for the coin)
    
    Returns
    -------
    theta_A, theta_B : float
        likelihood of drawing the coin to perform experiment
    """

    # Replace dummy values with your implementation
    theta_A = heads_A / (heads_A + tails_A)
    theta_B = heads_B / (heads_B + tails_B)
    
    return theta_A, theta_B

In [None]:
# expectation maximization algorithm for coin flips
def em_algorithm(rolls, theta_A, theta_B, max_iter=10):
    """
    Expectation Maximization (EM) algorithm for Binomial Mixture Model
    
    Parameters
    ----------
    rolls : list of lists
        observations of all the rolls
    theta_A : float
        bias of coin A (i.e. probability of success for the coin)
    theta_B : float
        bias of coin B (i.e. probability of success for the coin)
    max_iter : float
        the number of iterations to use for EM algorithm
    
    Returns
    -------
    thetas : float
        list of all thetas calculated with EM algorithm
    """
    
    # store biases of coin from each iteration
    thetas = []
    
    # Iterate through EM algorithm
    for i in range(max_iter+1):
        
        # store initial biases of coins
        if i==0:
            thetas.append((theta_A, theta_B))

        # store updated biases of coins
        else:
            # perform expectation step
            heads_A, tails_A, heads_B, tails_B = e_step(rolls, theta_A, theta_B)

            # perform maximization step
            theta_A, theta_B = m_step(heads_A, tails_A, heads_B, tails_B)

            # rounding
            theta_A = float(format(theta_A, '.2f'))
            theta_B = float(format(theta_B, '.2f'))

            # all theta values
            thetas.append((theta_A, theta_B))
        
        print("#%d:\t%0.2f %0.2f" % (i, theta_A, theta_B))
        
    thetas = np.array(thetas)
    
    return thetas

In [None]:
# intial random guess for biases of coins
theta_A = 0.6
theta_B = 0.5

# number of interations to perform
max_iter = 10

# set of observations
rolls = [ "HTTTHHTHTH", "HHHHTHHHHH", "HTHHHHHTHH", 
          "HTHTTTHHTT", "THHHTHHHTH" ]

thetas = em_algorithm(rolls, theta_A, theta_B, max_iter)

## Further Readings
0. http://statisticalrecipes.blogspot.com/2012/04/applying-em-algorithm-binomial-mixtures.html
1. http://karlrosaen.com/ml/notebooks/em-coin-flips/
2. https://www.youtube.com/watch?v=TBouIiMZNf0
3. https://www.youtube.com/watch?v=7e65vXZEv5Q
4. https://www.youtube.com/watch?v=Ps2qV9vCn60
5. https://en.wikipedia.org/wiki/Expectation%E2%80%93maximization_algorithm
6. https://cs.stackexchange.com/questions/10637/applying-expectation-maximization-to-coin-toss-examples
7. https://www.youtube.com/watch?v=AnbiNaVp3eQ
8. https://www.youtube.com/watch?v=xpkaTRxzQ2k
9. https://www.youtube.com/watch?v=-q0KFRaPuvc
10. http://dawenl.github.io/files/em.pdf
11. http://local.disia.unifi.it/grilli/files/Papers/BinomialMixtureModellingCredits.pdf
12. https://www.cs.cmu.edu/~tom/10701_sp11/recitations/EM%20Algorithm.pdf