# 双硬币模型

假设有两枚硬币A、B，以相同的概率随机选择一个硬币，进行如下的抛硬币实验：共做5次实验，每次实验独立的抛十次. 假设试验数据记录员忘了记录每次试验选择的是A还是B，我们无法观测实验数据中选择的硬币是哪个.问如何估计两个硬币正面出现的概率？

![#](http://ww3.sinaimg.cn/large/6cbb8645gw1f4b95mzejvj20p60t0qab.jpg)

假设已知A，B为正面的概率各为$\theta_A, \theta_B$,则对于一次连续投掷了10次正反各5次的实验,硬币X产生这么一组样本的概率为

$$
C_{10}^{5} \theta_X^5 (1-\theta_X)^{(10-5)}
$$

In [4]:
# theta_X = 0.6的话
(10*9*8*7*6*5*4*3*2*1) / (5*4*3*2*1)**2 * (0.6)**5 * (0.4)**5

0.20065812480000003

是一个二项分布，可以用`scipy`中的`binom`来计算。

    scipy.stats.binom.pmf(5,10,0.6)

这样正规化后就可以求出每一组实验来自硬币X的均值$\mu$. 有了$\mu$，就可以估计数据中AB分别产生正反面的次数了。$\mu$代表数据来自硬币A的概率的估计，将它乘上正面的总数，得到正面来自硬币A的总数，同理有反面，同理有B的正反面。

---

# coding

In [6]:
import numpy as np
from scipy import stats

In [2]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use("ggplot")

In [5]:
# 硬币投掷结果观测序列
observations = np.array([[1, 0, 0, 0, 1, 1, 0, 1, 0, 1],
                         [1, 1, 1, 1, 0, 1, 1, 1, 1, 1],
                         [1, 0, 1, 1, 1, 1, 1, 0, 1, 1],
                         [1, 0, 1, 0, 0, 0, 1, 1, 0, 0],
                         [0, 1, 1, 1, 0, 1, 1, 1, 0, 1]])

In [19]:
def EM_onestep(priors, observations):
    """
    EM算法单次迭代
    
    Arguments
    ---------
    priors : [theta_A, theta_B]
    observations : [m X n matrix]
 
    Returns
    --------
    new_priors: [new_theta_A, new_theta_B]
    :param priors:
    :param observations:
    :return:
    """
    counts = {'A': {'H': 0, 'T': 0}, 'B': {'H': 0, 'T': 0}}
    theta_A, theta_B = priors
    
    #-- E step
    for obs in observations:
        L = len(obs)
        num_heads = obs.sum()
        num_tails = L - num_heads
        contribution_A = stats.binom.pmf(num_heads, L, theta_A)
        contribution_B = stats.binom.pmf(num_heads, L, theta_B)
        weight_A = contribution_A / (contribution_A + contribution_B)
        weight_B = contribution_B / (contribution_A + contribution_B)
        # 更新在当前参数下A、B硬币产生的正反面次数
        counts['A']['H'] += weight_A * num_heads
        counts['A']['T'] += weight_A * num_tails
        counts['B']['H'] += weight_B * num_heads
        counts['B']['T'] += weight_B * num_tails
    
    #-- M step
    new_theta_A = counts['A']['H'] / (counts['A']['H'] + counts['A']['T'])
    new_theta_B = counts['B']['H'] / (counts['B']['H'] + counts['B']['T'])
    
    return new_theta_A, new_theta_B

def EM_main(observations, prior_init, niter=100):
    """
    EM算法
    :param observations: 观测数据
    :param prior_init: 模型初值
    :param niter: 迭代次数
    :return: 局部最优的模型参数
    """
    prior = prior_init.copy()
    for it in range(1,niter):
        prior = EM_onestep(prior, observations)
        print(f"[{it:3d}/{niter:3d}] --> A: {prior[0]:.6f}, B: {prior[1]:.6f}")
        
    return prior

In [23]:
EM_main(observations, [0.5, 0.4], niter=10)

[  1/ 10] --> A: 0.702257, B: 0.570088
[  2/ 10] --> A: 0.742641, B: 0.567627
[  3/ 10] --> A: 0.766954, B: 0.549492
[  4/ 10] --> A: 0.782615, B: 0.534776
[  5/ 10] --> A: 0.790814, B: 0.526387
[  6/ 10] --> A: 0.794435, B: 0.522440
[  7/ 10] --> A: 0.795891, B: 0.520751
[  8/ 10] --> A: 0.796451, B: 0.520056
[  9/ 10] --> A: 0.796663, B: 0.519774


(0.79666281984090592, 0.51977374275863775)

In [24]:
# 初始值相同的话不可解
EM_main(observations, [0.2, 0.2], niter=10)

[  1/ 10] --> A: 0.660000, B: 0.660000
[  2/ 10] --> A: 0.660000, B: 0.660000
[  3/ 10] --> A: 0.660000, B: 0.660000
[  4/ 10] --> A: 0.660000, B: 0.660000
[  5/ 10] --> A: 0.660000, B: 0.660000
[  6/ 10] --> A: 0.660000, B: 0.660000
[  7/ 10] --> A: 0.660000, B: 0.660000
[  8/ 10] --> A: 0.660000, B: 0.660000
[  9/ 10] --> A: 0.660000, B: 0.660000


(0.66000000000000003, 0.66000000000000003)