## PLSA (Probabilistic Latent Semantic Analysis) with Spark

In [1]:
import re
import numpy as np
import pandas as pd
import nltk
from pyspark import SparkContext
from nltk.stem import SnowballStemmer
from tokenize import tokenize
from nltk.corpus import stopwords,words
from scipy.spatial.distance import cdist


In [23]:
stemmer = SnowballStemmer('english')


def token_processor(token):
    """Return the English Snowball stem of ``token`` (uses the module-level ``stemmer``)."""
    stemmed = stemmer.stem(token)
    return stemmed


wordss = words.words()

def closest_node(node, nodes):
    """Return the element of ``nodes`` nearest to ``node`` (Euclidean distance).

    ``cdist`` yields a (1, len(nodes)) distance matrix; the position of its
    smallest entry selects the element. Ties resolve to the first candidate.
    """
    distances = cdist([node], nodes)
    nearest = distances[0].argmin()
    return nodes[nearest]

In [4]:
# coding:utf8
from pyspark import SparkContext
from pyspark import RDD
import numpy as np
from numpy.random import RandomState

import sys
if sys.version_info[0] == 2:
    # Python 2 only: switch the interpreter's default str/unicode codec to UTF-8.
    # reload(sys) is what makes setdefaultencoding callable again at this point.
    reload(sys)
    sys.setdefaultencoding("utf-8")



class PLSA:
    """Probabilistic Latent Semantic Analysis fitted with EM over a Spark RDD.

    Each element of the input RDD is one document, given as an iterable of word
    tokens. After ``train()``:

    * ``self.data`` is an RDD of ``{'words': {word_index: {'count': int,
      'topic_word': p(z|w,d)}}, 'topic': p(z|d)}`` records, one per document;
    * ``self.probility_word_given_topic`` is a broadcast ``np.matrix`` of shape
      (k, vocabulary size) whose row ``z`` holds p(w|z);
    * ``self.word_dict_b`` is a broadcast ``{word: column index}`` dictionary.
    """

    def __init__(self, data, sc, k, is_test=False, max_itr=1000, eta=1e-6):
        """Store the configuration; no Spark job runs until ``train()``.

        :type data: RDD
        :param data: RDD of documents, each an iterable of word tokens
        :type sc: SparkContext
        :param sc: Spark context used to broadcast model state
        :type k: int
        :param k: number of topics
        :type is_test: bool
        :param is_test: if True the RNG is RandomState(1) (reproducible),
                        otherwise an unseeded RandomState()
        :type max_itr: int
        :param max_itr: maximum number of EM iterations
        :type eta: float
        :param eta: stop once the relative change of the log-likelihood is below eta
        """
        self.max_itr = max_itr
        self.k = sc.broadcast(k)
        self.ori_data = data
        self.data = data
        self.sc = sc
        self.eta = eta
        self.rd = sc.broadcast(RandomState(1) if is_test else RandomState())
        # Most recently cached version of self.data; unpersisted when replaced.
        self._cached_data = None

    def train(self):
        """Run EM until the relative improvement drops below ``eta`` or ``max_itr`` is hit.

        Prints the log-likelihood after initialisation and after each iteration.
        """
        # Vocabulary -> column index, broadcast to the workers.
        self.word_dict_b = self._init_dict_()
        # Turn each document into word-index counts plus random p(z|w,d) and p(z|d).
        self._convert_docs_to_word_index()
        # Random, row-normalised initial p(w|z).
        self._init_probility_word_topic_()

        pre_l = self._log_likelyhood_()
        print("L(%d)=%.5f" % (0, pre_l))

        for i in range(self.max_itr):
            # E-step: posterior p(z|w,d).
            self._E_step_()
            # M-step: maximise the lower bound w.r.t. p(z|d) and p(w|z).
            self._M_step_()
            now_l = self._log_likelyhood_()

            # Relative improvement; a zero likelihood (all documents empty) means nothing to fit.
            improve = abs((pre_l - now_l) / pre_l) if pre_l != 0 else 0.0
            pre_l = now_l

            print("L(%d)=%.5f with %.6f%% improvement" % (i + 1, now_l, improve * 100))
            if improve < self.eta:
                break

    def _replace_cached_data_(self, rdd):
        """Record ``rdd`` as the cached dataset and unpersist the previously cached one."""
        previous = getattr(self, '_cached_data', None)
        if previous is not None and previous is not rdd:
            previous.unpersist()
        self._cached_data = rdd

    def _M_step_(self):
        """M-step: update p(z|d) for every document and p(w|z) for every topic.

        :return: None
        """
        k = self.k
        v = self.v

        def update_probility_of_doc_topic(doc):
            """p(z|d) proportional to sum over words of count(w, d) * p(z|w, d)."""
            topic_doc = np.zeros_like(doc['topic'], dtype=float)
            words = doc['words']
            for word in words.values():
                topic_doc += word['count'] * word['topic_word']
            total = np.sum(topic_doc)
            if total > 0:
                topic_doc /= total
            else:
                # Empty document: keep the previous p(z|d) instead of producing NaN (0/0).
                topic_doc = doc['topic']
            return {'words': words, 'topic': topic_doc}

        self.data = self.data.map(update_probility_of_doc_topic)
        self.data.cache()

        def accumulate_word_topic_counts(docs):
            """Sum count(w, d) * p(z|w, d) over one partition into a single (k, v) array."""
            acc = np.zeros((k.value, v.value))
            for doc in docs:
                for word_index, word in doc['words'].items():
                    acc[:, word_index] += word['count'] * np.asarray(word['topic_word'])
            yield acc

        # One dense (k, v) array per partition rather than one per document.
        probility_word_given_topic = self.data.mapPartitions(accumulate_word_topic_counts).sum()

        # Normalise each topic row so that sum over w of p(w|z) is 1.
        row_sum = probility_word_given_topic.sum(axis=1, keepdims=True)
        probility_word_given_topic = np.matrix(probility_word_given_topic / row_sum)

        self.probility_word_given_topic = self.sc.broadcast(probility_word_given_topic)
        # sum() above materialised the new cached RDD, so the previous one can be released.
        self._replace_cached_data_(self.data)

    def _E_step_(self):
        """E-step: set p(z|w,d) proportional to p(w|z) * p(z|d) for every word of every document.

        :return: None
        """
        probility_word_given_topic = self.probility_word_given_topic

        def update_probility_of_word_topic_given_word(doc):
            topic_doc = doc['topic']
            words = doc['words']
            prob = probility_word_given_topic.value
            for word_index, word in words.items():
                # Column word_index of p(w|z) holds p(w|z=i) for every topic i.
                topic_word = np.asarray(prob[:, word_index]).ravel() * topic_doc
                topic_word /= np.sum(topic_word)
                word['topic_word'] = topic_word
            return {'words': words, 'topic': topic_doc}

        self.data = self.data.map(update_probility_of_word_topic_given_word)

    def _init_probility_word_topic_(self):
        """Initialise p(w|z) with uniform random values, each topic row normalised to sum 1.

        :return: None
        """
        # Vocabulary size (number of distinct words).
        m = self.v.value

        probility_word_given_topic = self.rd.value.uniform(0, 1, (self.k.value, m))
        probility_word_given_topic /= probility_word_given_topic.sum(axis=1, keepdims=True)

        self.probility_word_given_topic = self.sc.broadcast(np.matrix(probility_word_given_topic))

    def _convert_docs_to_word_index(self):
        """Map each document to word-index counts with random initial p(z|w,d) and p(z|d).

        :return: None
        """
        word_dict_b = self.word_dict_b
        k = self.k
        rd = self.rd

        def _word_count_doc_(doc):
            word_dict = word_dict_b.value
            wordcount = {}
            for word in doc:
                index = word_dict[word]
                entry = wordcount.get(index)
                if entry is None:
                    # 'count': occurrences of the word in this document;
                    # 'topic_word': p(z|w,d), randomly initialised (recomputed by the first E-step).
                    wordcount[index] = {'count': 1, 'topic_word': rd.value.uniform(0, 1, k.value)}
                else:
                    entry['count'] += 1

            topics = rd.value.uniform(0, 1, k.value)
            topics = topics / np.sum(topics)
            return {'words': wordcount, 'topic': topics}

        self.data = self.ori_data.map(_word_count_doc_)
        # Cache so the random initialisation is drawn once and shared by the
        # initial likelihood and the first E-step instead of being recomputed.
        self.data.cache()
        self._replace_cached_data_(self.data)

    def _init_dict_(self):
        """Build the vocabulary of the documents and broadcast it.

        Also broadcasts the vocabulary size as ``self.v``.

        :return: broadcast ``{word: column index}`` dictionary
        """
        words = self.ori_data.flatMap(lambda d: d).distinct().collect()
        word_dict = {w: i for i, w in enumerate(words)}
        self.v = self.sc.broadcast(len(word_dict))
        return self.sc.broadcast(word_dict)

    def _log_likelyhood_(self):
        """Return the corpus log-likelihood sum over d, w of count(w, d) * log(sum over z of p(z|d) p(w|z)).

        :return: float
        """
        probility_word_given_topic = self.probility_word_given_topic

        def likelyhood(doc):
            prob = probility_word_given_topic.value
            topic_doc = np.asarray(doc['topic']).ravel()
            total = 0.0
            for word_index, word in doc['words'].items():
                p_word = float(np.dot(topic_doc, np.asarray(prob[:, word_index]).ravel()))
                total += word['count'] * np.log(p_word)
            return total

        # Plain float (the matrix product previously produced a 1x1 np.matrix).
        return float(self.data.map(likelyhood).sum())

    def save(self):
        """Return the fitted distributions as space-separated text lines.

        Despite the name, nothing is written to disk (TODO).

        :return: ``(word_given_topic, doc_topic)`` where each ``word_given_topic``
                 entry is ``"<word> p(w|z=1) ... p(w|z=k)"`` and each ``doc_topic``
                 entry is ``"p(z=1|d) ... p(z=k|d)"`` for one document, in RDD order.
        """
        doc_topic = self.data.map(lambda x: ' '.join([str(q) for q in x['topic'].tolist()])).collect()
        probility_word_given_topic = self.probility_word_given_topic.value

        word_dict = self.word_dict_b.value
        word_given_topic = []
        for w, i in word_dict.items():
            column = np.asarray(probility_word_given_topic[:, i]).ravel()
            word_given_topic.append('%s %s' % (w, ' '.join(str(q) for q in column.tolist())))
        return word_given_topic, doc_topic
    

In [5]:
# sc = SparkContext()

In [6]:
#data = sc.textFile("/Users/panxiao/IdeaProjects/lyric_project/input/lyrics.csv")
# data = sc.textFile("../lyr.txt")
# data1 = sc.parallelize(data.take(50))

In [8]:
# data2 = data1.map(lambda x: re.sub(r"\W+"," ", str(x).lower()))
# data3 = data2.map(lambda x: x.split(' '))
# data4 = data3.map(lambda l: [x for x in l if x and x in wordss and x not in stopwords.words("english")])
# data5 = data4.map(lambda l: " ".join(x for x in l))

In [11]:
# Fit a 5-topic model for at most 10 EM iterations with the fixed test seed.
# NOTE(review): `data4` and `sc` are only defined in commented-out cells above;
# uncomment them (or define equivalents) before running this cell.
plsa = PLSA(data=data4,sc=sc,k=5,max_itr=10,is_test=True)

In [13]:
# Run EM; prints the log-likelihood after initialisation and after each iteration.
plsa.train()

L(0)=-47322.77605
L(1)=-39816.96600 with 15.860883% improvement
L(2)=-39199.63764 with 1.550415% improvement
L(3)=-38320.42535 with 2.242909% improvement
L(4)=-37257.51865 with 2.773734% improvement
L(5)=-36328.59207 with 2.493259% improvement
L(6)=-35716.79729 with 1.684059% improvement
L(7)=-35355.76814 with 1.010811% improvement
L(8)=-35130.30709 with 0.637692% improvement
L(9)=-34976.52296 with 0.437753% improvement
L(10)=-34864.82935 with 0.319339% improvement


In [31]:
# save() returns the p(w|z) lines ("<word> p1 ... pk") and the p(z|d) lines
# ("p1 ... pk") as strings in memory; nothing is written to disk.
word_given_topic, topic_given_doc = plsa.save()

In [14]:
# Word-by-topic table: column 0 holds the word (raw strings); the numeric copy
# keeps the first 6 columns, uses the word as index and casts p(w|z) to float.
topic_word = pd.DataFrame([line.split(" ") for line in word_given_topic])
topic_word_1 = topic_word.iloc[:, :6].set_index(0).astype(float)

In [19]:
# Document-by-topic table of p(z|d); the float copy keeps the first 6 columns.
doc_topic = pd.DataFrame([line.split(" ") for line in topic_given_doc])
doc_topic_1 = doc_topic.iloc[:, :6].astype(float)

In [22]:
# Show p(z|d) for the first 10 documents.
doc_topic_1.head(10)

Unnamed: 0,0,1,2,3,4
0,0.8552794,0.134975,0.001326,0.007109,0.00131
1,0.2457515,0.174097,0.000107,0.011834,0.568211
2,0.003035068,0.228032,4.2e-05,0.707619,0.061272
3,0.02440765,0.094287,0.711033,0.167938,0.002334
4,4.117353e-07,7e-06,3e-06,2.8e-05,0.999961
5,5.038094e-05,0.000199,0.006569,0.057533,0.935648
6,0.0006357306,0.014669,0.067429,0.012144,0.905123
7,1.373357e-05,0.009437,0.96859,0.003125,0.018834
8,0.005805301,0.020588,0.007185,0.957579,0.008844
9,0.858065,0.007072,4.6e-05,0.13434,0.000476


In [None]:
# Rank words by the float p(w|z) in column label 5 (topic_word_1.columns[4],
# since set_index(0) leaves labels 1-5).
# NOTE(review): topic column label 1 is never ranked in this notebook, and label 5
# is ranked again below — confirm columns[4] was intended here.
topic_word_1.sort_values(topic_word_1.columns[4],ascending=False)

In [None]:
# Rank words by p(w|z) for topic column label 2. Sort the float frame
# `topic_word_1` (word as index): `topic_word` holds strings, so sorting it is
# lexicographic and misorders values such as '9e-05' vs '0.1'.
topic_word_1.sort_values(2, ascending=False)

In [None]:
# Rank words by p(w|z) for topic column label 3, sorting the float frame
# (the string frame `topic_word` would sort lexicographically).
topic_word_1.sort_values(3, ascending=False)

In [None]:
# Rank words by p(w|z) for topic column label 4, sorting the float frame
# (the string frame `topic_word` would sort lexicographically).
topic_word_1.sort_values(4, ascending=False)

In [None]:
# Rank words by p(w|z) for topic column label 5, sorting the float frame
# (the string frame `topic_word` would sort lexicographically).
topic_word_1.sort_values(5, ascending=False)

In [60]:
# Parse each "p1 p2 ... pk" line into a list of floats.
topic_given_doc_1 = [line.split(" ") for line in topic_given_doc]
topic_given_doc_2 = [list(map(float, fields)) for fields in topic_given_doc_1]

In [83]:
from scipy.spatial import distance

some_pt = topic_given_doc_2[0]
# Order every document by the Euclidean distance between its p(z|d) vector and
# document 0's (nearest first); the result is an array of document indices.
min_index = np.argsort(distance.cdist([some_pt], topic_given_doc_2).ravel())
# To view the reordered vectors use np.asarray(topic_given_doc_2)[min_index];
# a plain list cannot be indexed by an index array.

In [84]:
# Document indices ordered by distance to document 0.
print(min_index)

[ 0 21  9 32 11 28 10 44 46 33 25 36  1 26 12 35  3 34  2 16 29 23 48 30 18
 38 45 37 17 43  6 40 14  5 42  8 15 41 39  7 13 27 19 20 22 47 49  4 31 24]


In [63]:
# Number of documents with a p(z|d) vector.
print(len(topic_given_doc_2))

50
