In [1]:
import numpy as np
from scipy import stats


In [2]:
observations = np.array([[1,0,0,0,1,1,0,1,0,1],
                         [1,1,1,1,0,1,1,1,1,1],
                         [1,0,1,1,1,1,1,0,1,1],
                         [1,0,1,0,0,0,1,1,0,0],
                         [0,1,1,1,0,1,1,1,0,1]])

In [3]:
def em_single(priors, observations):
    """
    Performs a single EM step
    ---------
    priors : [theta_A, theta_B]
    observations : [m X n nested list]
    
    Returns
    --------
    new_priors: [new_theta_A, new_theta_B]
    """
    counts = {'A':{'H':0,'T':0}, 'B':{'H':0,'T':0}}
    theta_A = priors[0]
    theta_B = priors[1]
    # E step
    for observation in observations: 
        # How many coin tosses for this observation??
        len_observation = len(observation)
        # Count heads and tails
        num_heads = observation.sum()
        num_tails = len_observation - num_heads
        # Given our two possible prior probabilities...
        # What is the probability of seeing this observation?
        # Note use of binomial formula
        contribution_A = stats.binom.pmf(num_heads,len_observation,theta_A)
        contribution_B = stats.binom.pmf(num_heads,len_observation,theta_B)
        # Now what's the probability of this observation coming from 
        # a coin with theta A vs a coin with theta B?
        weight_A = contribution_A/(contribution_A+contribution_B)
        weight_B = contribution_B/(contribution_A+contribution_B)
        # Weight your dataset by probability of 
        # belonging to coin A or B
        # Sum all your observations for an estimate of 
        # Heads & Tails for each coin
        counts['A']['H']+= weight_A*num_heads
        #print counts['A']['H']
        counts['A']['T']+= weight_A*num_tails
        counts['B']['H']+= weight_B*num_heads
        counts['B']['T']+= weight_B*num_tails
    # M step
    # Given the weighted dataset
    # calculate the maximum likelihood
    new_theta_A = counts['A']['H']/(counts['A']['H']+counts['A']['T'])
    new_theta_B = counts['B']['H']/(counts['B']['H']+counts['B']['T'])
    return [new_theta_A, new_theta_B]

In [4]:
def em(observations, prior, tol=1e-7, iterations=1000):
    import math
    iteration = 0
    while iteration<iterations:
        new_prior = em_single(prior, observations)
        delta_change = np.abs(prior[0]-new_prior[0])
        if delta_change<tol:
            break
        else:
            prior = new_prior
            iteration+=1
    return [new_prior, iteration]

In [5]:
em_single([.00000000001, 0.5], observations)

[0.400000000001, 0.66000000000000003]

In [6]:

em(observations, [0.0000000000001,0.5])

[[0.5195830660193318, 0.79678903438696758], 16]