## HW6.7  Implement Bernoulli Mixture Model via EM
Implement the EM clustering algorithm to determine Bernoulli Mixture Model for discrete data in MRJob.

As a unit test use the dataset in the following slides:

https://www.dropbox.com/s/maoj9jidxj1xf5l/MIDS-Live-Lecture-06-EM-Bernouilli-MM-Systems-Test.pdf?dl=0

Cross-check that you get the same cluster assignments and cluster Bernouilli models as presented in the slides after 25 iterations. Dont forget the smoothing.

As a full test: use the same dataset from HW 4.5, the Tweet Dataset. 
Using this data, you will implement a 1000-dimensional EM-based Bernoulli Mixture Model  algorithm in MrJob on the users
by their 1000-dimensional word stripes/vectors using K = 4.  Use the same smoothing as in the unit test.

Repeat this experiment using your KMeans MRJob implementation fron HW4.
Report the rand index score using the class code as ground truth label for both algorithms and comment on your findings.

Here is some more information on the Tweet Dataset.

Here you will use a different dataset consisting of word-frequency distributions 
for 1,000 Twitter users. These Twitter users use language in very different ways,
and were classified by hand according to the criteria:

0: Human, where only basic human-human communication is observed.

1: Cyborg, where language is primarily borrowed from other sources
(e.g., jobs listings, classifieds postings, advertisements, etc...).

2: Robot, where language is formulaically derived from unrelated sources
(e.g., weather/seismology, police/fire event logs, etc...).

3: Spammer, where language is replicated to high multiplicity
(e.g., celebrity obsessions, personal promotion, etc... )

Check out the preprints of  recent research,
which spawned this dataset:

http://arxiv.org/abs/1505.04342
http://arxiv.org/abs/1508.01843

The main data lie in the accompanying file:

[topUsers_Apr-Jul_2014_1000-words.txt](#https://www.dropbox.com/s/6129k2urvbvobkr/topUsers_Apr-Jul_2014_1000-words.txt?dl=0)

and are of the form:

USERID,CODE,TOTAL,WORD1_COUNT,WORD2_COUNT,...
.
.

where

USERID = unique user identifier
CODE = 0/1/2/3 class code
TOTAL = sum of the word counts

Using this data, you will implement a 1000-dimensional K-means algorithm in MrJob on the users
by their 1000-dimensional word stripes/vectors using several 
centroid initializations and values of K.

### Complete likelihood

Let $N$ be the number of documents of $M$ unique terms of $x_{n,m} = (x_{1,1}\ldots,x_{N,M})$ binary features denoting whether a word is present in a document, that is $x_{n,m} \in {0,1}$. Moreover, let $\mu_{k,m}$ denote the probability that word $x_{n,m}$ is in cluster $k$ ,$k \in (1,\ldots,K$), and $\pi_k$, $\sum_k \pi = 1$, the proportion of documents in cluster $k$.

Hence, the probability of a word belonging to cluster $k$ can be written as

\begin{equation}
  \text{Pr}(x_{n,m} \mid \mu_{k,m}) = \mu_{k,m}^{x_{n,m}}\left(1-\mu_{k,m}\right)^{1-x_{n,m}}
\end{equation}

and the joint probability of all words being in cluster $k$ is (by assuming independence between their presences)

\begin{equation}
   \text{Pr}(\mathbf{x} \mid \mathbf{\mu}_{k}) = \prod_{m = 1}^M \mu_{k,m}^{x_{n,m}}\left(1-\mu_{k,m}\right)^{1-x_{n,m}}\,.
\end{equation}


The documents are assigned to clusters through the set of words that they contain, that is $x_n \in d$ for document $d$. $d$ However, the assignment so far has been probabilistic, and is performed through a latent vector of $K$-dimensional binary representations $\mathbf{r}_n$ which equal $1$ if document is in cluster $k$, else 0. Let the set of $N$ representations denoted by $\mathbf{R}$, then 

\begin{equation}
   \text{Pr}(\mathbf{R} \mid \mathbf{\pi}) = \prod_{n=1}^N \prod_{k=1}^{K} \pi_{k}^{r_{n,k}}
\end{equation}

For the set of all documents $\mathbf{D} = (d_1,\ldots,d_N)$, the likelihood that the distribution of features across all documents were generated by the representations, the probability that a given word is in a given cluster and the proportion of documents in clusters,

\begin{equation}
  \text{Pr}(\mathbf{D} \mid \mathbf{R},\mathbf{\mu},\mathbf{\pi}) = \prod_{n=1}^N \prod_{k=1}^K \left(\prod_{m=1}^M \mu_{k,m}^{x_{n,m}}(1-\mu_{k,m})^{1-x_{n,m}}\right)^{r_{n,k}}
\end{equation}

and the complete data log-likelihood, that is that the observed distribution of words across documents and their representations are generated by $(\mathbf{\mu},\mathbf{\pi})$:

\begin{equation}
  \text{Pr}(\mathbf{D}, \mathbf{R} \mid \mathbf{\mu},\mathbf{\pi}) = \text{Pr}(\mathbf{D} \mid \mathbf{R},\mathbf{\mu},\mathbf{\pi}) \text{Pr}(\mathbf{R} \mid \mathbf{\pi}) = \prod_{n=1}^N \prod_{k=1}^K \left(\pi_k\prod_{m=1}^M \mu_{k,m}^{x_{n,m}}(1-\mu_{k,m})^{1-x_{n,m}}\right)^{r_{n,k}}\,.
\end{equation}

### EM algorithm: E step

However, $\mathbf{R}$ is unobserved but can be approximated as its expectation,

\begin{equation}
    \text{E}[r_{n,k}] = \sum_{n=1}^N \text{Pr}(r_{n,k} \mid \mathbf{x}_n, \mathbf{\mu}, \mathbf{\pi})r_{n,k} = \frac{\pi_k \text{Pr}(\mathbf{x}_n \mid \mathbf{\mu}_k)}{\sum_{\kappa=1}^K\pi_\kappa \text{Pr}(\mathbf{x}_n \mid \mathbf{\mu}_\kappa)} = \frac{\pi_k \prod_{m = 1}^M \mu_{k,m}^{x_{n,m}}\left(1-\mu_{k,m}\right)^{1-x_{n,m}}}{\sum_{\kappa =1}^K \pi_\kappa \prod_{m = 1}^M \mu_{\kappa,m}^{x_{n,m}}\left(1-\mu_{\kappa,n}\right)^{1-x_{n,m}}}
\end{equation} 

### EM algorithm: M step

Taking the natural logarithm of $\mathcal{L}$ and substituting $\text{E}[r_{n,k}]$ for $r_{n,k}$ yield

\begin{equation}
  \mathcal{l} = \sum_{n=1}^N\sum_{k=1}^K \text{E}[r_{n,k}]\left(\log \pi_k + \sum_{m=1}^M x_{n,m}\log \mu_{k,m}+(1-x_{n,m})\log (1-\mu_{k,m})\right)\,.
\end{equation}

Taking the partial derivative of $l$ by $\mathbf{\mu}$ and equating it to 0 gives after some arithmetic

\begin{equation}
  \mu_{k,m}^{max} = \frac{\sum_{n=1}^N \mathbf{x}_n\text{E}[r_{n,k}]}{\sum_{n=1}^N \text{E}[r_{n,k}]}\,.
\end{equation}

To calculate $\pi_k^{max}$, we can use Lagrange-multipliers because of the $\sum_k \pi_k = 1$ constraint, after derivating the Lagrangian by $\pi_k$ and the introduced multiplier and solving the systems of equations, we get

\begin{equation}
  \pi_k^{max} = \frac{\sum_{n=1}^N \text{E}[r_{n,k}]}{N} 
\end{equation}



## Unit test

In [1]:
documents = [
"hot chocolate cocoa beans",
"cocoa ghana africa",
"beans harvest ghana",
"cocoa butter",
"butter truffles",
"sweet chocolate",
"sweet sugar",
"sugar cane brazil",
"sweet sugar beet",
"sweet cake icing",
"cake black frost"
]

init_rs = [
    "None None",
    "None None",
    "None None",
    "None None",
    "None None",
    "1     0",
    "0     1",
    "None None",
    "None None",
    "None None",
    "None None"
]



We have to either append a key column or add $r_{nk}$ initialization to the data file or use a single mapper. Otherwise, it is difficult to ensure that the order of rows of the $r_{nk}$ matrix will be the same as the mapped data with multiple mappers.

In [None]:
with open('unit_test_67.txt','w') as outfile:
    for document, init in zip(documents, init_rs):
        outfile.write("{0},{1}\n".format(document,init))

#### Initialize

In [None]:
%%writefile bernoulli_init.py
#!/usr/bin/env python

from mrjob.job import MRJob
from mrjob.step import MRStep
import numpy as np


class MRBernoulliMixtureModelInit(MRJob):
        

    def __init__(self, *args, **kwargs):
        super(MRBernoulliMixtureModelInit, self).__init__(*args, **kwargs)
        self.pi = [1.0/self.options.k]*self.options.k
        self.doc_counter = 0
        # write an empty file to disk to fill up later with vocabulary entries
        open('vocabulary.txt','w').close()

    def configure_options(self):
        super(MRBernoulliMixtureModelInit,self).configure_options()
        self.add_passthrough_option('--smooth', type = 'float', default = 0)
        self.add_passthrough_option('--k', type='int', default=2)

    def steps(self):

        # 1st job: build binary feature represenations
        # 2nd job: run E-M algorithm
        return [
            MRStep(
                mapper_init = self.set_vocabulary,
                mapper = self.get_vocabulary,
                mapper_final = self.save_vocabulary,
                reducer_init = self.read_vocabulary,
                reducer = self.binarize_doc_features,
                reducer_final = self.save_binary_features
            ),
            MRStep(
                reducer_init = self.store_reducer_parameters,
                reducer = self.M_aggregate,
                reducer_final = self.M_calculate
            )
        ]

    # ----------------------------------- helper functions -------------------------------- #

    def maximize_mu_m(self,r_nk,x_nm):
        # weighted mean of number of documents in cluster k: column sum of r_nk*x_n
        weighted_mean_num_docs = np.dot(r_nk,x_nm)+self.options.smooth
        # effective number of documents in cluster k: column sum of r_nk
        num_docs = np.sum(r_nk,axis=0)+self.options.smooth*2
        return weighted_mean_num_docs/num_docs

    def maximize_pi(self,r_nk):
        # number of documents
        N = r_nk.shape[0]
        # effective number of documents in cluster k: column sum of r_nk
        num_docs = np.sum(r_nk, axis=0)
        return num_docs/N

    # --------------------------------------- MR steps ------------------------------------ #

    def set_vocabulary(self):
        self.vocabulary = []
        self.num_words_voc = 0


    def vocabulary_builder(self,word):
        if word not in self.vocabulary:
            self.vocabulary.append(word)


    def get_vocabulary(self, _, value):
        word_list, r_nks = value.split(",")
        words = word_list.split()
        for word in words:
            self.vocabulary_builder(word)
        yield np.random.uniform(size=1), (words, r_nks)


    def save_vocabulary(self):
        with open('/home/cloudera/PycharmProjects/HW6/vocabulary.txt','a') as outfile:
            for item in self.vocabulary:
                outfile.write("%s\n" % item)

    def read_vocabulary(self):
        self.vocabulary = list(set(line.strip() for line in open('/home/cloudera/PycharmProjects/HW6/vocabulary.txt','r')))
        self.bin_feat = [None]*len(self.vocabulary)
            
    def binarize_doc_features(self, key, values):
        for value in values:
            features, r_nks = value
            # build feature representations
            binary_features = [0]*len(self.vocabulary)
            insert_index = [self.vocabulary.index(str(word)) for word in features]
            for index in insert_index:
                binary_features[index] = 1
            # pass to self.save binary features()     
            self.bin_feat = np.vstack((self.bin_feat,binary_features))
            yield key, (binary_features, r_nks)
   
    def save_binary_features(self):        
        with open('/home/cloudera/PycharmProjects/HW6/binary_features.txt','a') as outfile:
            for line in self.bin_feat:
                if line[0] is not None:
                    outfile.write("%s\n" % line)


    def store_reducer_parameters(self):
        self.vocabulary = list(set(line.strip() for line in open('/home/cloudera/PycharmProjects/HW6/vocabulary.txt', 'r')))
        self.vocabulary_size = len(self.vocabulary)
        self.r_nk = []
        self.x_nm = [None]*self.vocabulary_size

    def M_aggregate(self, _,values):
        for value in values:
            x_nms, r_nks = value
            r_nks = r_nks.split()
            r_nks = [float(r) for r in r_nks if r != 'None']
            # emit each cluster separately
            for k in range(len(r_nks)):
#                 # extract r_nk and x_nm
                 self.r_nk.append(r_nks[k])
                 self.x_nm = np.vstack((self.x_nm, np.array(x_nms)))
        
    def M_calculate(self):
        # delete the first line of None's which was created as a placeholder
        self.x_nm = np.delete(self.x_nm,0,0)
        # maximize mu and pi
        mu = self.maximize_mu_m(np.array(self.r_nk),self.x_nm)
        pi = self.maximize_pi(np.array(self.r_nk))
        yield None,(pi,mu)
        

if __name__ == '__main__':
    MRBernoulliMixtureModelInit.run()

In [None]:
!rm /home/cloudera/PycharmProjects/HW6/vocabulary.txt
!rm /home/cloudera/PycharmProjects/HW6/binary_features.txt
!python bernoulli_init.py unit_test_67.txt > 'bmm_it.txt'
!cat 'bmm_it.txt'

In [None]:
%%writefile bernoulli_update.py
#!/home/cloudera/anaconda2/bin/python

from mrjob.job import MRJob
from mrjob.step import MRStep
import numpy as np
import math


class MRBernoulliMixtureModelUpdate(MRJob):
        

    def __init__(self, *args, **kwargs):
        super(MRBernoulliMixtureModelUpdate, self).__init__(*args, **kwargs)
        self.pi = [1.0/self.options.k]*self.options.k
        self.doc_counter = 0
        # write an empty file to disk to fill up later with vocabulary entries
        open('vocabulary.txt','w').close()

    def configure_options(self):
        super(MRBernoulliMixtureModelUpdate,self).configure_options()
        self.add_passthrough_option('--smooth', type = 'float', default = 0)
        self.add_passthrough_option('--k', type='int', default=2)

    def steps(self):

        return [
            MRStep(
                mapper = self.load_binary_features,
                ),
            MRStep(
                mapper_init = self.read_vocabulary,
                mapper = self.E,
                reducer_init = self.store_reducer_parameters,
                reducer = self.M_aggregate,
                reducer_final = self.M_calculate,
#                jobconf = {'mapred.map.tasks':'1'},
            )
        ]

    # ----------------------------------- helper functions -------------------------------- #

    def log_prob_all_word_cluster_k(self,x_n,mu_k):
        '''Calculates the probability that all binarized representations are in cluster k.

        Parameters:
        -----------
        x_n = a list of binary representaitons of words in document n (length M)
        mu_k = list of probabilities of mth words in cluster k (length M)
        '''
        
        PR = 0
        try:
            PR += x_n*math.log(mu_k)  + (1-x_n)*math.log(1-mu_k)
        except ValueError:
            PR += x_n*math.log(mu_k+1e-10) + (1 - x_n)*math.log(1-mu_k+1e-10)
        return PR
    

    def maximize_mu_m(self,r_nk,x_nm):
        # weighted mean of number of documents in cluster k: column sum of r_nk*x_n
        weighted_mean_num_docs = np.dot(x_nm.T,r_nk)+self.options.smooth
        # effective number of documents in cluster k: column sum of r_nk
        num_docs = np.sum(r_nk,axis=0)+self.options.smooth*2
        return weighted_mean_num_docs/num_docs

    def maximize_pi(self,r_nk):
        # number of documents
        N = r_nk.shape[0]
        # effective number of documents in cluster k: column sum of r_nk
        num_docs = np.sum(r_nk, axis=0)
        return num_docs/N

    # --------------------------------------- MR steps ------------------------------------ #
    
    def load_binary_features(self, _, x_ns):
        
        x_n = x_ns.strip("[,] ")
        x_n = [int(x) for x in x_n if x != " "]
        yield None, x_n

    def read_vocabulary(self):
        self.vocabulary = list(set(line.strip() for line in open('/home/cloudera/PycharmProjects/HW6/vocabulary.txt','r')))
        self.pars = [line.strip().split() for line in open('/home/cloudera/W261/Spring2017/HW6/bmm_it.txt','r')]
        self.pi = []
        self.mu = [None]*len(self.vocabulary)
        for pars in self.pars:
            pars = eval(pars[1])
            self.pi.append(float(pars[0]))
            foo = [float(mu) for mu in pars[1]]
            self.mu = np.vstack((self.mu,pars[1]))
        self.mu = np.delete(self.mu,0,0)
                
            
    def E(self, _, x_n):
        numerator = []
        for k in range(len(self.pi)):
            mu_k = self.mu[k,]
            pi_k = self.pi[k]
            # calculate the numerator of E[r_{n,k}] for each k
            PR = 0
            for x_nm, mu_km in zip(x_n, mu_k):
                PR += self.log_prob_all_word_cluster_k(x_nm,mu_km)
            numerator.append(pi_k*np.exp(PR))
        denominator = np.sum(numerator)
        r_nk = numerator/denominator
        yield _, (x_n, r_nk)
   

    def store_reducer_parameters(self):
        self.i = 0
        self.vocabulary = list(set(line.strip() for line in open('/home/cloudera/PycharmProjects/HW6/vocabulary.txt', 'r')))
        self.vocabulary_size = len(self.vocabulary)
        self.r_nk = [None]*self.options.k
        self.x_nm = [None]*self.vocabulary_size

    def M_aggregate(self, _,values):
        for value in values:
            x_nms, r_nks = value
            r_nks = [float(r) for r in r_nks if r != 'None']
            self.r_nk = np.vstack((self.r_nk,np.array(r_nks)))
            self.x_nm = np.vstack((self.x_nm, np.array(x_nms)))
                
#            yield self.r_nk, self.x_nm
        
    def M_calculate(self):
        # delete the first line of None's which was created as a placeholder
        self.x_nm = np.delete(self.x_nm,0,0)
        self.r_nk = np.delete(self.r_nk,0,0)
        # maximize mu and pi
        mu = self.maximize_mu_m(np.array(self.r_nk),self.x_nm)
        pi = self.maximize_pi(np.array(self.r_nk))
        for p, m in zip(pi, mu.T):
            yield None, (p,m)
        

if __name__ == '__main__':
    MRBernoulliMixtureModelUpdate.run()

In [None]:
!python bernoulli_update.py /home/cloudera/PycharmProjects/HW6/binary_features.txt --k 2 --smooth 0.0001  > 'bmm_it_temp.txt'
!mv bmm_it_temp.txt bmm_it.txt
!cat bmm_it.txt

In [None]:
%%writefile bmm_driver.sh

#!/bin/bash

rm /home/cloudera/PycharmProjects/HW6/vocabulary.txt
rm /home/cloudera/PycharmProjects/HW6/binary_features.txt

# script creates 'vocabulary.txt', 'binary_features.txt' and saves the optimal parameters in 'bmm_it.txt'
python bernoulli_init.py unit_test_67.txt -q > 'bmm_it.txt'


for i in {0..24}
    do
       echo "iteration: $i"
       time python bernoulli_update.py /home/cloudera/PycharmProjects/HW6/binary_features.txt \
       --k 2 --smooth 0.0001 -q > 'bmm_it_temp.txt'
       mv bmm_it_temp.txt bmm_it.txt 
       echo '
        ready
        '
done
times

In [None]:
!chmod a+x bmm_driver.sh
!./bmm_driver.sh

In [None]:
import numpy as np

words = ['ghana', 'butter',  'cane',  'icing', 'truffles', 'sweet', 'frost', 'brazil', 'africa', 'chocolate', 'cocoa',  
 'hot', 'beans', 'cake', 'beet', 'sugar', 'harvest', 'black']
pi = []
mu = [None]*len(words)

with open('bmm_it.txt','r') as infile:
    for line in infile:
        pars = eval(line.split()[1])
        pi.append(round(pars[0],2))
        foo = [round(m,3) for m in pars[1]]
        mu = np.vstack((mu,foo))

for i in range(mu.shape[1]): 
    if i == 0:
        print '{word:10} | {p1:5} | {p2:5} |'.format(word='prop.',p1=pi[0],p2=pi[1])  
        print '-'*30
    print '{word:10} | {k1:5} | {k2:5} |'.format(word=words[i],k1=mu[1:,i][0],k2=mu[2:,i][0])