## PLSA

In [2]:
import re
import numpy as np
import pandas as pd
#import nltk
from pyspark import SparkContext
#from nltk.stem import SnowballStemmer
#from tokenize import tokenize
#from nltk.corpus import stopwords,words
from scipy.spatial.distance import cdist
import pronouncing
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, split, explode, udf
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql import Row
from pyspark.sql import functions as F
import pandas as pd

# Step 1: find candidate lines.
# Load the lyrics CSV, keep only Hip-Hop songs, and turn every song into one
# row per lyric line. The following cells tag each line with a rhyme key (its
# last three vowel phonemes, looked up with the `pronouncing` library) and its
# word count, so that lines rhyming with a given line and of similar length can
# be selected as rap candidates.
# NOTE(review): `spark` is only created in a later cell; this cell relies on the
# notebook environment (e.g. Databricks) already providing it.
rowData = spark.read.csv(
    "/FileStore/tables/lyrics.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
)
rowData = rowData.filter(col("genre") == 'Hip-Hop')
rowData.show(n=5)

# Song text -> array of lines (the cast pins the column type to array<string>).
data_array_lyrics = rowData.withColumn(
    "lyrics",
    split(col("lyrics"), "\n").cast(ArrayType(StringType())),
)
data_array_lyrics.show(n=5)

# One output row per element of the array, i.e. per lyric line.
data_line = data_array_lyrics.withColumn("lyrics", explode(col("lyrics")))
data_line.show(n=5)
def phone_phrase(phrase):
    phone_consonant = ['B', 'CH', 'D', 'DH', 'F', 'G', 'HH', 'JH', 'K', 'L', 'M', 'N', 'P', 'R', 'S', 'SH', 'T', 'TH', 'V', 'W', 'Y', 'Z', 'ZH']
    s = filter(bool, re.split(r'\W+', phrase.lower()))
    phone = []
    for p in s:
    tmp = pronouncing.phones_for_word(p)
    if len(tmp) != 0:
        tmp_list = tmp[0].split()
        for x in tmp_list:
            if x not in phone_consonant:
                phone.append(x[:-1])
    if len(phone) >=3:
    res = phone[-3] + phone[-2] + phone[-1]
    else:
    res = ''
    return res
# Wrap phone_phrase as a Spark UDF producing the last-three-vowel phoneme string.
pp_udf = udf(phone_phrase, StringType())

# Tag every lyric line with that string in a new "phoneme" column.
data_phone = data_line.withColumn("phoneme", pp_udf(data_line.lyrics))
data_phone.show(n=20)
def length_phrase(phrase):
    """Return the number of words in *phrase*.

    Words are the non-empty pieces left after lower-casing *phrase* and
    splitting it on runs of non-word characters, so "don't" counts as two.

    :param phrase: a line of lyrics; ``None`` or ``''`` is treated as empty
    :return: the word count (0 for empty or ``None`` input)
    """
    if not phrase:
        return 0
    # Count with a generator: on Python 3 ``filter`` returns an iterator,
    # which has no len().
    return sum(1 for word in re.split(r'\W+', phrase.lower()) if word)
# Wrap length_phrase as a Spark UDF and add each lyric line's word count.
len_udf = udf(length_phrase, IntegerType())

data_phone_len = data_phone.withColumn('length', len_udf(data_phone.lyrics))
data_phone_len.show(n=20)
data_phone_len.printSchema()

In [3]:
# Query line: candidates must have the same phoneme key and a word count that
# differs from the query's by less than 5.
test_lyric = 'Today I would like show a freestyle rap lyrics'
pho, length = phone_phrase(test_lyric), length_phrase(test_lyric)
# print(...) with a single, pre-formatted argument prints the same text on
# Python 2 and 3; the former `print x, y` statement is a SyntaxError on 3.
print("%s %s" % (pho, length))
print(type(pho))
data_final = (
    data_phone_len
    .filter(data_phone_len.phoneme == pho)
    .filter(data_phone_len.length < length + 5)
    .filter(data_phone_len.length > length - 5)
)
data_final = data_final.select(data_final.lyrics).distinct()
data_final.show(n=100, truncate=False)

In [4]:
# Second step: preprocessing
# Given the set of candidate lines, tokenize them, remove stop words and extract features (word 2-grams)
# Input: dataframe of candidate lines
# Output: RDD of 2-grams (one list of 2-grams per candidate line)


import findspark
import pyspark
from pyspark.sql import SparkSession
# getOrCreate() returns the already-active session if there is one.
spark = (
    SparkSession.builder
    .appName('NLP_starter')
    .getOrCreate()
)
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [5]:
# Persist the candidate lines. `data_final` ends in distinct(), whose output
# order comes from a shuffle and may change if it is recomputed. Caching (while
# the cached partitions stay in memory) keeps the row order used to build the
# PLSA documents identical to the order returned by the later df.collect(),
# which is indexed by document position.
df = data_final.cache()

In [6]:

from pyspark.ml.feature import NGram, StopWordsRemover

# Python UDF returning the length of an array column.
count_words = udf(lambda words: len(words), IntegerType())

# Tokenise each candidate line (lower-cased by default) by splitting on every
# single non-word character; empty tokens are discarded (minTokenLength=1).
regex_tokenizer = RegexTokenizer(inputCol='lyrics', outputCol='words', pattern='\\W')
regex_df = regex_tokenizer.transform(df)
regex_tokenized_counts = regex_df.withColumn('freq', count_words('words'))
regex_tokenized_counts.show(truncate=False)

# Small custom stop-word list: common short words, tokens such as 'row' and
# 'lyrics', and fragments like 's', 't', 're', 'm' that come from splitting
# contractions at the apostrophe.
stopWords = ['a', 'an', 'the', 'is', 'are', 'for', 'hi', 'in', 'on', 'row', 'lyrics', 'u', 't', 's', 're', 'i', 'm']
remover = StopWordsRemover(inputCol='words', outputCol='tokens', stopWords=stopWords)
tokens_filtered = remover.transform(regex_tokenized_counts)
cleanDF = tokens_filtered.withColumn('count_tokens', count_words('tokens'))

# Adjacent token pairs become the vocabulary for PLSA.
ngram = NGram(n=2, inputCol='tokens', outputCol='2grams')
my_2ngrams = ngram.transform(cleanDF)
my_2ngrams.show()
my_2ngrams.select('2grams').show(truncate=False)

In [7]:
# Spark SQL data type of each column of my_2ngrams (shown as cell output).
types = [field.dataType for field in my_2ngrams.schema]
types

In [8]:
# Bring the 2-gram lists to the driver: one Row per candidate line.
grams = my_2ngrams.select(my_2ngrams['2grams']).collect()

In [9]:
# Plain Python list of 2-gram strings for each candidate line.
gramlist = [list(row['2grams']) for row in grams]

In [10]:
# Distribute the per-line 2-gram lists as the PLSA corpus: one document per
# candidate line.
# NOTE(review): `sc` is not defined in this file; presumably it is provided by
# the notebook environment.
gramrdd = sc.parallelize(gramlist)
# Only for displaying the corpus as cell output.
gramrdd.collect()

In [11]:
# coding:utf8
from pyspark import SparkContext
from pyspark import RDD
import numpy as np
from numpy.random import RandomState

import sys
# Python 2 only: make implicit str/unicode conversions use UTF-8 instead of
# ASCII. site.py removes sys.setdefaultencoding at start-up; reload(sys)
# restores it. On Python 3 this block does nothing.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")



class PLSA:
    """Probabilistic Latent Semantic Analysis trained with EM on a Spark RDD.

    Model: p(w|d) = sum_z p(z|d) p(w|z). Each document of the input RDD is an
    iterable of hashable tokens. After ``train()``:

    * ``self.data`` holds, per document,
      ``{'words': {word_index: {'count': n(d,w), 'topic_word': p(z|d,w)}},
      'topic': p(z|d)}``;
    * ``self.probility_word_given_topic`` is a broadcast ``np.matrix`` of
      shape (k, vocabulary size) whose rows are p(w|z);
    * ``self.word_dict_b`` is a broadcast ``{word: column index}`` dict.
    """

    def __init__(self, data, sc, k, is_test=False, max_itr=1000, eta=1e-6):
        """Set up the model; no Spark job runs until ``train()``.

        :type data: RDD
        :param data: documents, each an iterable of tokens
        :type sc: SparkContext
        :param sc: Spark context used for broadcasts
        :type k: int
        :param k: number of topics
        :type is_test: bool
        :param is_test: if True, every random initialisation is seeded from 1
            so runs are reproducible; otherwise seeds are drawn at random
        :type max_itr: int
        :param max_itr: maximum number of EM iterations
        :type eta: float
        :param eta: stop when the relative change of the log-likelihood
            falls below this threshold
        """
        self.max_itr = max_itr
        self.k = sc.broadcast(k)
        self.ori_data = data
        self.data = data
        self.sc = sc
        self.eta = eta
        # Driver-side generator, used to initialise p(w|z).
        self.rd = sc.broadcast(RandomState(1) if is_test else RandomState())
        # Base seed for the per-partition generators that initialise the
        # documents on the executors (partition i uses base + i).
        self._seed = 1 if is_test else int(RandomState().randint(0, 2 ** 31 - 1))
        # RDD currently persisted by the model; released once superseded.
        self._cached = None

    def train(self):
        """Run EM until convergence or ``max_itr`` iterations.

        Prints the log-likelihood after initialisation and after every
        iteration.
        """
        # Vocabulary {word: index}, broadcast.
        self.word_dict_b = self._init_dict_()
        # Documents -> word-index counts with random initial p(z|d,w), p(z|d).
        self._convert_docs_to_word_index()
        # Random initial p(w|z).
        self._init_probility_word_topic_()

        pre_l = self._log_likelyhood_()
        print("L(%d)=%.5f" % (0, pre_l))

        for i in range(self.max_itr):
            # Update the posterior p(z|d,w).
            self._E_step_()
            # Maximise the lower bound: update p(z|d) and p(w|z).
            self._M_step_()
            now_l = self._log_likelyhood_()

            # Relative change; a corpus without any token has L == 0, so treat
            # it as converged instead of dividing by zero.
            improve = abs((pre_l - now_l) / pre_l) if pre_l != 0 else 0.0
            pre_l = now_l

            print("L(%d)=%.5f with %.6f%% improvement" % (i + 1, now_l, improve * 100))
            if improve < self.eta:
                break

    def _M_step_(self):
        """Re-estimate p(z|d) for every document and the global p(w|z).

        :return: None
        """
        k = self.k
        v = self.v

        def update_probility_of_doc_topic(doc):
            # p(z|d) is proportional to sum_w n(d,w) p(z|d,w).
            words = doc['words']
            topic_doc = np.zeros(k.value)
            for word in words.values():
                topic_doc += word['count'] * word['topic_word']
            total = np.sum(topic_doc)
            if total > 0:
                topic_doc /= total
            else:
                # A document without tokens (e.g. a line too short to yield a
                # 2-gram) carries no evidence; use a uniform distribution
                # instead of 0/0 = NaN.
                topic_doc = np.full(k.value, 1.0 / k.value)
            return {'words': words, 'topic': topic_doc}

        def partition_counts(docs):
            # Expected counts n(w,z) = sum_d n(d,w) p(z|d,w): one dense (k, V)
            # array per partition rather than one per document.
            counts = np.zeros((k.value, v.value))
            for doc in docs:
                for word_index, word in doc['words'].items():
                    counts[:, word_index] += word['count'] * word['topic_word']
            yield counts

        new_data = self.data.map(update_probility_of_doc_topic).cache()
        self.data = new_data

        # collect() also materialises new_data, so the previously persisted RDD
        # is no longer needed afterwards.
        expected = np.zeros((k.value, v.value))
        for part in self.data.mapPartitions(partition_counts).collect():
            expected += part
        self._release_cached_(new_data)

        # Normalise each topic row: p(w|z) = n(w,z) / sum_w' n(w',z).
        row_sum = np.sum(expected, axis=1, keepdims=True)
        previous = self.probility_word_given_topic
        self.probility_word_given_topic = self.sc.broadcast(np.matrix(expected / row_sum))
        # Drop executor copies of the old p(w|z); Spark re-sends it if a lost
        # partition ever has to be recomputed.
        previous.unpersist()

    def _E_step_(self):
        """Update the latent posterior p(z|d,w) for every (document, word).

        :return: None
        """
        probility_word_given_topic = self.probility_word_given_topic

        def update_probility_of_word_topic_given_word(doc):
            p_w_z = np.asarray(probility_word_given_topic.value)
            topic_doc = doc['topic']
            words = doc['words']
            for word_index, word in words.items():
                # p(z|d,w) is proportional to p(w|z) p(z|d).
                topic_word = p_w_z[:, word_index] * topic_doc
                word['topic_word'] = topic_word / np.sum(topic_word)
            return {'words': words, 'topic': topic_doc}

        self.data = self.data.map(update_probility_of_word_topic_given_word)

    def _init_probility_word_topic_(self):
        """Initialise p(w|z) with row-normalised uniform random values.

        :return: None
        """
        probility_word_given_topic = self.rd.value.uniform(0, 1, (self.k.value, self.v.value))
        probility_word_given_topic = probility_word_given_topic / np.sum(
            probility_word_given_topic, axis=1, keepdims=True)
        self.probility_word_given_topic = self.sc.broadcast(np.matrix(probility_word_given_topic))

    def _convert_docs_to_word_index(self):
        """Turn each document into word-index counts with random initial values.

        Initial values come from a RandomState seeded with
        ``self._seed + partition index``, so a partition that Spark recomputes
        gets exactly the same values. The result is cached because the initial
        log-likelihood and the first EM iteration both read it.
        """
        word_dict_b = self.word_dict_b
        k = self.k
        seed = self._seed

        def _word_count_doc_(doc, rs):
            word_dict = word_dict_b.value
            wordcount = {}
            for word in doc:
                index = word_dict[word]
                entry = wordcount.get(index)
                if entry is None:
                    # count: occurrences of the word in the document;
                    # topic_word: p(z|d,w), overwritten by the first E-step.
                    wordcount[index] = {'count': 1, 'topic_word': rs.uniform(0, 1, k.value)}
                else:
                    entry['count'] += 1
            topics = rs.uniform(0, 1, k.value)
            return {'words': wordcount, 'topic': topics / np.sum(topics)}

        def _init_partition(index, docs):
            rs = RandomState((seed + index) % (2 ** 32))
            for doc in docs:
                yield _word_count_doc_(doc, rs)

        self.data = self.ori_data.mapPartitionsWithIndex(_init_partition).cache()
        self._release_cached_(self.data)

    def _init_dict_(self):
        """Build the vocabulary ``{word: column index}`` and broadcast it.

        Also sets ``self.v`` to a broadcast of the vocabulary size.

        :return: the broadcast vocabulary dict
        """
        words = self.ori_data.flatMap(lambda d: d).distinct().collect()
        word_dict = {w: i for i, w in enumerate(words)}
        self.v = self.sc.broadcast(len(word_dict))
        return self.sc.broadcast(word_dict)

    def _log_likelyhood_(self):
        """Return sum_d sum_w n(d,w) log(sum_z p(z|d) p(w|z)) as a float."""
        probility_word_given_topic = self.probility_word_given_topic

        def likelyhood(doc):
            p_w_z = np.asarray(probility_word_given_topic.value)
            topic_doc = doc['topic']
            l = 0.0
            for word_index, word in doc['words'].items():
                l += word['count'] * np.log(np.dot(topic_doc, p_w_z[:, word_index]))
            return float(l)

        return float(self.data.map(likelyhood).sum())

    def _release_cached_(self, current):
        """Record ``current`` as the persisted RDD and unpersist the one it replaces."""
        previous, self._cached = self._cached, current
        if previous is not None and previous is not current:
            previous.unpersist()

    def save(self):
        """Return the fitted distributions as space-separated text lines.

        :return: ``(word_given_topic, doc_topic)``. ``word_given_topic`` holds
            one ``"<word> p(w|z_1) ... p(w|z_k)"`` line per vocabulary word;
            the word itself may contain spaces (e.g. a 2-gram), so parse such
            lines from the right. ``doc_topic`` holds one
            ``"p(z_1|d) ... p(z_k|d)"`` line per document, in ``self.data``
            order.
        """
        doc_topic = self.data.map(lambda x: ' '.join([str(q) for q in x['topic'].tolist()])).collect()
        p_w_z = np.asarray(self.probility_word_given_topic.value)

        word_dict = self.word_dict_b.value
        word_given_topic = []
        for w, i in word_dict.items():
            word_given_topic.append('%s %s' % (w, ' '.join([str(q) for q in p_w_z[:, i].tolist()])))
        return word_given_topic, doc_topic
    

In [15]:
# Fit 5 topics on the candidate lines' 2-grams. is_test=True seeds the random
# initialisation; max_itr=10 caps the number of EM iterations.
plsa = PLSA(sc=sc, data=gramrdd, k=5, is_test=True, max_itr=10)

In [16]:
# Run EM; train() prints the log-likelihood after initialisation and after each iteration.
plsa.train()

In [17]:
# word_given_topic: one "<word> p(w|z_1) ... p(w|z_k)" string per vocabulary word;
# topic_given_doc: one "p(z_1|d) ... p(z_k|d)" string per document.
word_given_topic, topic_given_doc = plsa.save()

In [18]:
# Each line is "<word> p(w|z_1) ... p(w|z_k)". The word is a 2-gram and so
# contains a space itself. Split from the right into exactly k + 1 fields so the
# whole 2-gram stays in column 0; a plain split would push its second half into
# the first probability column and make the float conversion fail.
n_topics = plsa.k.value
topic_word = pd.DataFrame([sub.rsplit(" ", n_topics) for sub in word_given_topic])
topic_word_1 = topic_word[topic_word.columns[:n_topics + 1]]
topic_word_1 = topic_word_1.set_index(0).astype('float')

In [19]:
# One row per document; the first (up to six) columns of p(z|d) as floats.
doc_topic = pd.DataFrame([line.split(" ") for line in topic_given_doc])
doc_topic_1 = doc_topic.iloc[:, :6].astype('float')

In [20]:
# Display p(z|d) for the first 10 documents.
doc_topic_1.head(10)

In [25]:
# Words ranked by p(w|z) for the last topic column. Sort the float frame:
# sorting the raw string columns of `topic_word` orders the probabilities
# lexicographically (e.g. '9e-05' would rank above '0.5').
topic_word_1.sort_values(topic_word_1.columns[-1], ascending=False)

In [26]:
# p(z|d) vectors as plain Python float lists, one per document.
topic_given_doc_1 = [line.split(" ") for line in topic_given_doc]
topic_given_doc_2 = [list(map(float, fields)) for fields in topic_given_doc_1]

In [27]:
from scipy.spatial import distance

# Rank all documents by Euclidean distance, in topic space, to document 0.
# A stable sort keeps document 0 (distance exactly 0 to itself) ahead of any
# other document with an identical topic vector, so min_index[0] is always the
# query and min_index[1:] are its neighbours.
some_pt = topic_given_doc_2[0]
min_index = distance.cdist([some_pt], topic_given_doc_2)[0].argsort(kind='mergesort')


In [28]:
# Positions 1-5 of the ranking: the five documents closest to document 0
# (position 0 is expected to be document 0 itself, at distance 0).
print(min_index[1:6])

In [29]:
# Candidate lines on the driver, indexed below by PLSA document position.
# NOTE(review): this assumes df.collect() returns rows in the same order as the
# earlier 2-gram collect that produced the PLSA documents — confirm.
dfl = df.collect()

In [30]:
# Show the query line, then the five candidate lines whose topic mix is closest
# to that of document 0. print(x) with one argument works on Python 2 and 3;
# the former `print x` statement is a SyntaxError on Python 3.
print(test_lyric)
for i in min_index[1:6]:
    print(dfl[i])