# Binomial Mixture Model with Expectation-Maximization (EM) Algorithm

## The Binomial distribution

The Given $N_i$, the probability of $n_i$ is

$P(n_i | N_i, \Theta) = \sum_{k=1}^{2}\pi_k \mathrm{Bino}(n_i|N_i, \theta_k)$, 

where the Binomial Distribution is

$\mathrm{Bino}(n_i|N_i, \theta) = {N_i!\over n_i!(N_i-n_i)!} \theta^{n_i} (1-\theta)^{N_i-n_i}$,

and the sum of $\pi$'s is unity, i.e.

$\sum_{k=1}^{2} \pi_k = 1$

In [1]:
import numpy as np
import torch
from torch.distributions.binomial import Binomial
import matplotlib.pyplot as plt

if torch.cuda.is_available():
    print("cuda is available")
    import torch.cuda as t
else:
    print("cuda is unavailable")
    import torch as t

cuda is unavailable


## Help functions

In [2]:
def new_dataset(S, k_len, theta_err, theta_cor, pi_err):

    # probability of a base being correct
    pi_cor = 1.0 - pi_err

    # the list of (Ni| i =1, 2, ..., S), uniformly drawn between low and high
    N_ls_all = t.full((S,), k_len)
    N_ls_all = N_ls_all.type(t.FloatTensor)

    # the list of theta, each theta is either theta_1 or theta_2. The probability of theta_i is pi_i
    theta_ls = t.FloatTensor(np.random.choice([theta_err,theta_cor], size=S, p=[pi_err,pi_cor]))

    # the list of (ni | i=1,2 ...,S)
    n_ls_all = Binomial(N_ls_all, theta_ls).sample()

    #errors
    ground_truth = (theta_ls > theta_cor)
    
    # Split into train and validation sets
    shuffled_indice = torch.randperm(S)
    N_ls_shuffled = N_ls_all[shuffled_indice]
    n_ls_shuffled = n_ls_all[shuffled_indice]

    # percentage of train set.
    train_frac = 0.7
    train_index = int(0.7*S)
    N_ls_train = N_ls_shuffled[0:train_index]
    N_ls_valid = N_ls_shuffled[train_index:]
    n_ls_train = n_ls_shuffled[0:train_index]
    n_ls_valid = n_ls_shuffled[train_index:]

    #reorder ground truth
    ground_truth = ground_truth[shuffled_indice]
    ground_truth_train = ground_truth[0:train_index]
    ground_truth_valid = ground_truth[train_index:]
    
    return (N_ls_train, n_ls_train, ground_truth_train), (N_ls_valid, n_ls_valid, ground_truth_valid)

Make some figures to get some visual impression of the dataset.

In [3]:
def plot(N_ls_all, n_ls_all):
    %matplotlib inline 

    fig, axes = plt.subplots(1,2,figsize=(14,6))
    axes[0].scatter(N_ls_all-n_ls_all, n_ls_all)
    axes[0].set_xlabel("N-n",size=16)
    axes[0].set_ylabel("n",size=16)
    axes[0].tick_params(labelsize=14)

    axes[1].hist(n_ls_all/N_ls_all, bins=20)
    axes[1].set_xlabel("n/N", size=16)
    axes[1].tick_params(labelsize=14)
    axes[1].set_title("Histogram of n/N", size=16)
    plt.show()

### Calculating the `log_likelihood` 

The `log_likelihood` is the log of the probability of the parameters, $\Theta$, given the observed data, `N_ls` and `n_ls`. It is defined below,

`log_likelihood` $= \ln(L(\Theta, {ni})) =\ln( P({ni} | \Theta)) = \sum_{i=1}^{S} \ln(\sum_{k=1}^{K} \pi_k * \mathrm{Binom}(n_i|N_i, \theta_k) )$

In [4]:
# calculate log_likelihood using a method which supposes to avoid underflow or overflow in
# log_sum_exp = log(sum_{i=1}^{S} exp(bi)). When bi >> 1, overflow leads to log_sum_exp = infty
# When bi << -1, underflow leads to log_sum_exp = -infty.

def calc_logL(N_ls, n_ls, pi_list, theta_list):
    '''
    Input: N_ls is a [S] shape tensor = [N1, N2, ..., NS]
           n_ls is a [S] shape tensor = [n1, n2, ..., nS]
           pi_list is a [K] shape tensor = [pi_1, .., pi_K]
           theta_list is a [K] shape tensor = [theta_1, ..., theta_K]
    Output: log_likelihood of the parameters (pi and theta) given the observed data (N, n).
    '''
    S = len(N_ls)
    K = len(pi_list)

    # log_binom_mat has shape (S,K), element_{i,l} = log_Binomial(ni|Ni, theta_l)
    # log with natural base.
    log_binom_mat = Binomial(N_ls.reshape(S,1), theta_list.reshape(1,K)).log_prob(n_ls.reshape(S,1))

    # mean_log_binom, the mean value of all elements in log_binom_mat.
    c = torch.mean(log_binom_mat)

    # binom_mat has shape (S,K), element_{i,l} = Binomial(ni|Ni, theta_l)
    binom_mat = torch.exp(log_binom_mat - c)

    # log_likelihood = sum_{i=1}^{S} log(prob_i), this is a real number
    log_likelihood = S*c + torch.sum(torch.log(torch.matmul(binom_mat, pi_list)))

    return log_likelihood


### Calculating $P(z_i=m| n_i, \Theta_\mathrm{old})$

$P(z_i=m| n_i, \Theta_\mathrm{old}) = \left[\sum_{l=1}^{K} {\pi_{l,old}\over \pi_{m,old}}\left(\theta_{l,old}\over\theta_{m,old}\right)^{n_i}\left({1-\theta_{l,old}}\over{1-\theta_{m,old}}\right)^{N_i-n_i}\right]^{-1}$

We take advantage of the [broadcast](https://pytorch.org/docs/stable/notes/broadcasting.html) nature for torch tensors. Any torch math manipulations, except element-wise manipulations, would take action in a broadcast way as long as the tensors are broadcastable. Broadcasting does not make copy of the data on the memory and thus is very efficient and much more preferred to `for loop`.

In [5]:
def calc_Posterior(N_ls, n_ls, pi_list, theta_list):
    '''
    Input: N_ls is a [S] shape tensor = [N1, N2, ..., NS]
           n_ls is a [S] shape tensor = [n1, n2, ..., nS]
           pi_list is a [K] shape tensor = [pi_1, .., pi_K]
           theta_list is a [K] shape tensor = [theta_1, ..., theta_K]
    Output: Posterior, a tensor with shape (K,S) and its element_{m,i} = P(zi=m|ni,Theta_old) which is
            the posterior probability of the i-th sample belonging to the m-th Binomial distribution.
    '''
    
    # shape = (K,K) with theta_ratio_{m,l} = theta_l/theta_m, m-th row, l-th column
    theta_ratio = torch.div(theta_list.reshape(1,K), theta_list.reshape(K,1))

    # shape = (K,K), element_{ml} = (1-theta_l)/(1-theta_m)
    unity_minus_theta_ratio = torch.div((1e0 - theta_list).reshape(1,K), (1e0 - theta_list).reshape(K,1))

    # shape = (K,K), element_{m,l} = (theta_l/theta_m) * [(1-theta_l)/(1-theta_m)]
    mixed_ratio = torch.mul(theta_ratio, unity_minus_theta_ratio)

    # shape = (K,K,S) with element_{m,l,i} = [(theta_l/theta_m)*(1-theta_l)/(1-theta_m)]^ni
    # its element won't be either 0 or infty no matther whether theta_l > or < theta_m
    mixed_ratio_pow = torch.pow(theta_ratio.reshape(K,K,1), n_ls)
    mixed_ratio_pow = torch.clamp(mixed_ratio_pow, min=0.0, max=1e15)

    # shape = (K,K,S) with element_{m,l,i} = [ (1-theta_l)/(1-theta_m) ]^(Ni-2ni)
    # its element may be infty if theta_l<<theta_m, or 0 if theta_l >> theta_m
    unity_minus_theta_ratio_pow = torch.pow(unity_minus_theta_ratio.reshape(K,K,1), N_ls-2.0*n_ls)
    unity_minus_theta_ratio_pow = torch.clamp(unity_minus_theta_ratio_pow, min=0.0, max=1e15)

    # In below, we multiply the element of mixed_ratio_pow and the element of unity_minus_theta_ratio_pow,
    # and there won't be nan caused by 0*infty or infty*0 because the element in mixed_ratio_pow won't be 0 or infty.
    # Thus we make sure there won't be nan in Posterior.

    # element-wise multiply, pow_tensor has shape(K,K,S), element_{m,l,i} = (theta_l/theta_m)^ni * [(1-theta_l)/(1-theta_m)]^(Ni-ni).
    # Note that torch.mul(a, b) would broadcast if a and b are different in shape & they are
    # broadcastable. If a and b are the same in shape, torch.mul(a,b) would operate element-wise multiplication.

    pow_tensor = torch.mul(mixed_ratio_pow, unity_minus_theta_ratio_pow)

    # pi_ratio has shape (K,K) with element_{m,l} = pi_l/pi_m
    pi_ratio = torch.div(pi_list.reshape(1,K), pi_list.reshape(K,1))

    # posterior probability tensor, Pzim = P(zi=m|ni,Theta_old)
    # shape (K,S), element_{m,i} = P(zi=m|ni,Theta_old)
    S = len(N_ls)
    Posterior = torch.pow(torch.matmul(pi_ratio.reshape(K,1,K), pow_tensor), -1e0).reshape(K,S)

    return Posterior

### Updating $\Theta \equiv \{(\pi_m, \theta_m)|m=1,2,...,K\}$ According to the EM Algorithm

The computational complexity of the EM Algorithm is $S\times K$ per iteration.

$\pi_m ={1\over S} \sum_{i=1}^{S} P(z_i=m| n_i, \Theta_{old})$

$\theta_m = {{\sum_{i=1}^{S} n_i P(z_i=m| n_i, \Theta_{old})}\over{\sum_{j=1}^{S} N_j P(z_j=m| n_j, \Theta_{old})}}$

In [6]:
def calc_params(N_ls, n_ls, Posterior):
    '''
    Input: N_ls, tensor of shape [S]
           n_ls, tensor of shape [S]
           Posterior, tensor of shape (K,S)
    '''
    # update pi_list
    # torch.sum(tensor, n) sum over the n-th dimension of the tensor
    # e.g. if tensor'shape is (K,S) and n=1, the resulting tensor has shape (K,)
    # the m-th element is the sum_{i=1}^{S} tensor_{m,i}
    pi_list = torch.sum(Posterior,1)/len(N_ls)

    # update theta_list
    theta_list = torch.div(torch.matmul(Posterior, n_ls), torch.matmul(Posterior, N_ls))

    return pi_list, theta_list

### Initializing the parameters $\Theta$

We denote $\Theta$ by `params` in this code.

In [7]:
def initialize_parameters(K = 2, small_value = 1e-6):

    # initialize pi's, make sure that the sum of all pi's is unity
    # pi is drawn from a Uniform distribution bound by [small_value, 1)
    from torch.distributions.uniform import Uniform
    pi_list = Uniform(low=small_value, high=1e0).sample([K-1])
    pi_K = t.FloatTensor([1e0]) - pi_list.sum()
    pi_list = torch.cat([pi_list, pi_K], dim=0)

    # initialize theta's, make sure that each theta satisfies 0<theta<1
    from torch.distributions.normal import Normal
    theta_list = torch.clamp(Normal(loc=0.5, scale=0.3).sample(t.IntTensor([K])), min=small_value, max=1e0-small_value)

    # combine all pi and theta into a list of tuples called `params`, which is the capital Theta in my article
    # params has the shape of K rows x 2 columns
    params = torch.stack([pi_list, theta_list], dim=1)
    
    return params

### EM algorithm with optimization

In [100]:
def EM(init_steps, K, N_ls, n_ls):
    
    best_log_likelihood = float('-inf')
    
    for i in np.arange(init_steps):

        cond_step = True
        cond_params = True
        iter_step = 0
        log_likelihood = float('-inf')

        pi_list, theta_list = initialize_parameters()

        while cond_step and cond_params:

            # posterior probability tensor, Pzim = P(zi=m|ni,Theta_old)
            # shape (K,S), element_{m,i} = P(zi=m|ni,Theta_old)
            Posterior = calc_Posterior(N_ls, n_ls, pi_list, theta_list)

            # calculate the new pi_list and theta_list
            pi_list_new, theta_list_new = calc_params(N_ls, n_ls, Posterior)

            # calculate the new log_likelihood
            log_likelihood_new = calc_logL(N_ls, n_ls, pi_list_new, theta_list_new)

            # calculate the change of the log-likelihood and theta
            delta_log_likelihood = abs(log_likelihood_new - log_likelihood)
            delta_theta_list = abs(theta_list_new - theta_list)

            # update params 
            pi_list = pi_list_new
            theta_list = theta_list_new

            # update log_likelihood and theta
            log_likelihood = log_likelihood_new
            theta_list = theta_list_new

            # increase iter_step by 1
            iter_step += 1
            
            # update the conditions for the while loop
            # cond_params = (delta_params > epsilon)
            cond_params = (abs(delta_log_likelihood / log_likelihood) > tolerance_log_likelihood) or t.any(t.abs(delta_theta_list / theta_list) > tolerance_theta)
            cond_step = t.BoolTensor([iter_step < max_step])

            if log_likelihood_new > best_log_likelihood or best_log_likelihood == float('-inf'):
                best_log_likelihood = log_likelihood_new
                best_params = torch.stack([pi_list, theta_list], dim=1)
                best_Posterior = Posterior

    print(f"Initialization = {i+1}, iterations = {iter_step}")
    print(f"\ndelta_log_likelihood = {delta_log_likelihood}")
    print(f"log_likelihood ={log_likelihood}")
    params = torch.stack([pi_list, theta_list], dim=1)
    print(f"{params}")

    return best_Posterior, best_params

### Assign posteriors

In [77]:
def assign_posteriors(Posterior):

    # collapse probability densities
    if Posterior.mean() > 0.5:
        Posterior_assignments = Posterior < 0.5
    else:
        Posterior_assignments = Posterior > 0.5
        
    return Posterior_assignments

### Precision and recall

In [82]:
def precision_recall(Posterior_assignments, ground_truth):
    
    P = (ground_truth == True).sum()
    N = (ground_truth == False).sum()
    Err_rate = P/(P+N)

    print("Ground truth:\nP={0}\nN={1}\nError rate={2}\n".format(P, N, Err_rate))

    TP = ((Posterior_assignments == ground_truth) & (Posterior_assignments == True)).sum()
    TN = ((Posterior_assignments == ground_truth) & (Posterior_assignments == False)).sum()
    FP = ((Posterior_assignments != ground_truth) & (Posterior_assignments == True)).sum()
    FN = ((Posterior_assignments != ground_truth) & (Posterior_assignments == False)).sum()

    Precision = TP / (TP+FP)
    Recall = TP / (TP+FN)

    print("Results:\nTP={0}\nTN={1}\nFP={2}\nFN={3}\nPrecision={4}\nRecall={5}".format(TP, TN, FP, FN, Precision, Recall))

    Est_err_rate = (TP+FP)/(TP+TN+FP+FN)
    print("Estimated error rate={0}\nDiff={1}".format(Est_err_rate, abs(Est_err_rate-Err_rate)))

# Fitting the Data by a Binomial Mixture Model

The method would fit the parameters

$\Theta = \{ (\pi_k, \theta_k) | k=1, 2, ..., K\}$

We need to pre-set K. Here we set $K=2$.

In [105]:
import time
start_time = time.time()

init_steps = 5

tolerance_log_likelihood = 0.001 # tolerance for the change of the log-likelihood
tolerance_theta = 0.001 # tolerance for the change of theta
max_step = int(1e1) # maximum steps for iteration

K = 2
S = int(1e7)
k_len = 31
theta_err = 0.70 # probability of a kmer being absent if the base is incorrect
theta_cor = 0.001 # probability of a kmer being absent if the base is correct
pi_err = 0.0001 # genome per base error rate (QV)
training, validation = new_dataset(S, k_len, theta_err, theta_cor, pi_err)

print("\n===Training===\n")
N_ls, n_ls, ground_truth = training
Posterior, best_params = EM(init_steps, K, N_ls, n_ls)
Posterior_assignments = assign_posteriors(Posterior[0])
precision_recall(Posterior_assignments, ground_truth)

print("\n===Validation===\n")
N_ls, n_ls, ground_truth = validation
Posterior = calc_Posterior(N_ls, n_ls, *best_params)
Posterior_assignments = assign_posteriors(Posterior[0])
precision_recall(Posterior_assignments, ground_truth)

#plt.hist(Posterior[0])
#plt.show()

print(f"used {time.time()-start_time}")


===Training===

Initialization = 5, iterations = 2

delta_log_likelihood = 24.0
log_likelihood =-981272.0
tensor([[9.9990e-01, 9.9913e-04],
        [1.0214e-04, 6.9628e-01]])
Ground truth:
P=715
N=6999285
Error rate=0.0001021428543026559

Results:
TP=715
TN=6999285
FP=0
FN=0
Precision=1.0
Recall=1.0
Estimated error rate=0.0001021428543026559
Diff=0.0

===Validation===

Ground truth:
P=299
N=2999701
Error rate=9.966666402760893e-05

Results:
TP=299
TN=2999701
FP=0
FN=0
Precision=1.0
Recall=1.0
Estimated error rate=9.966666402760893e-05
Diff=0.0
used 14.157631874084473
