In [23]:
from pyspark import SparkContext

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *

from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

# Reuse the SparkContext of the running kernel, or start one if none exists.
sc = SparkContext.getOrCreate()
# Legacy SQL entry point built on the same context.
sqlContext = SQLContext(sc)
# Later cells read their data through `spark.read`, but nothing in the visible
# notebook binds `spark`; it only works when the kernel injects that global.
# Bind it explicitly so the notebook also runs from a plain Python kernel.
spark = SparkSession.builder.getOrCreate()

# Display the context as the cell output.
sc

In [2]:
# Load the sample corpus stored in libsvm format and preview the first rows.
dataset = spark.read.load("./test.txt", format="libsvm")

dataset.show(n=5, truncate=False)

# Fit an LDA model with 10 topics, limited to 10 iterations.
lda = LDA().setK(10).setMaxIter(10)
model = lda.fit(dataset)

# Corpus-level diagnostics computed by the fitted model on the same data.
ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Top three terms (indices and weights) of every topic.
topics = model.describeTopics(maxTermsToDisplay=3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Apply the model to the corpus; the result is kept but not displayed.
transformed = model.transform(dataset)
# transformed.show(truncate=False)

+-----+---------------------------------------------------------+
|label|features                                                 |
+-----+---------------------------------------------------------+
|0.0  |(11,[0,1,2,4,5,6,7,10],[1.0,2.0,6.0,2.0,3.0,1.0,1.0,3.0])|
|1.0  |(11,[0,1,3,4,7,10],[1.0,3.0,1.0,3.0,2.0,1.0])            |
|2.0  |(11,[0,1,2,5,6,8,9],[1.0,4.0,1.0,4.0,9.0,1.0,2.0])       |
|3.0  |(11,[0,1,3,6,8,9,10],[2.0,1.0,3.0,5.0,2.0,3.0,9.0])      |
|4.0  |(11,[0,1,2,3,4,6,9,10],[3.0,1.0,1.0,9.0,3.0,2.0,1.0,3.0])|
+-----+---------------------------------------------------------+
only showing top 5 rows

The lower bound on the log likelihood of the entire corpus: -804.509976387
The upper bound on perplexity: 3.09426915482
The topics described by their top-weighted terms:
+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+------------------------

In [6]:
# Print the column names and types of the `topics` DataFrame returned by describeTopics.
topics.printSchema()

root
 |-- topic: integer (nullable = false)
 |-- termIndices: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- termWeights: array (nullable = true)
 |    |-- element: double (containsNull = false)



In [3]:
# Read the Yelp `business` and `review` collections from the local MongoDB
# instance through the MongoDB Spark connector.
business = spark.read.load(
    format="com.mongodb.spark.sql.DefaultSource",
    uri="mongodb://127.0.0.1/yelp.business",
)
# Only the business id and the review text are kept for the reviews.
review = spark.read.load(
    format="com.mongodb.spark.sql.DefaultSource",
    uri="mongodb://127.0.0.1/yelp.review",
).select('business_id', 'text')

In [152]:
# Print the column names and types of the `review` DataFrame.
# NOTE(review): the recorded output lists every review column, while the loading
# cell narrows `review` to business_id/text - the output presumably predates that
# select; confirm by re-running from a fresh kernel.
review.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)



In [8]:
# Peek at the first review; the result is a Row holding only the `text` field.
review.select('text').rdd.first()

Row(text=u"If you need an inexpensive place to stay for a night or two then you may consider this place but for a longer stay I'd recommend somewhere with better amenities. \n\nPros:\nGreat location- you're right by the train station, central location to get to old town and new town, and right by sight seeing his tours. Food, bars, and shopping all within walking distance. Location, location, location.\nVery clean and very good maid service\n\nCons:\nTiny rooms \nUncomfortable bed \nAbsolutely no amenities \nNo phone in room \nNo wardrobe \n\nWas given a lot of attitude about me and my husband sharing a room which was quite strange and we were charged 15 pounds more for double occupancy not sure why that matters I felt like it was a money grab. It was just handled in a kind of odd manner to me... \n\nIf you book this hotel all you get is a bed, desk, and a bathroom. It isn't awful but know what you're getting into.")

In [11]:
# Sample a 0.001 fraction of the reviews (no replacement, seed 1) and keep
# only the text of each sampled row.
train_raw = (
    review.select('text').rdd
    .sample(withReplacement=False, fraction=0.001, seed=1)
    .map(lambda row: row[0])
)
# Report how many documents ended up in the sample.
print(train_raw.count())

4729


In [168]:
# Inspect the first raw training document.
# NOTE(review): the recorded output starts like the first Mesa review shown near
# the end of the notebook, so this cell was presumably run after `train_raw` was
# rebound from `mesa_review` (cell In [167]) - confirm the intended execution order.
train_raw.first()

u"This is one star for service from Jennifer. I had set up time to see her at 9am when they opened. At 9am I was still waiting outside in line behind another gentleman, who happened to be in a wheelchair. At 9:04, Jennifer walked to the front door and unlocked it. I found it rude for her not to open the door for the wheelchair client. She hesitated to speak to me, assuming I was there with the other gentleman. If I'm going to sit in a chair with a stylist for hours, I don't care how good she/he is, I will not to pay my hard earned $ to someone who doesn't treat others kindly. I pay well for good service, so due to Jennifer, I will not only never return but I will not recommend Yelpers/friends/family to see Jennifer at Supercuts"

In [12]:
import nltk
import string
import re
from sklearn.feature_extraction import stop_words

In [100]:
# Stop words read from ./stop_words (one element per line of the file).
# Stored as a frozenset because tokenize() runs `w not in stop_words` for every
# token of every review; a list would make each of those checks a linear scan.
# NOTE: this rebinds the `stop_words` name imported from sklearn.feature_extraction
# in the imports cell, so that import is effectively unused afterwards.
stop_words = frozenset(sc.textFile("./stop_words").collect())

In [169]:
# Extra words filtered out by tokenize() in addition to `stop_words`.
common_words = ['good','great','place','time', "n't", 'food', 'das', 'best','will','nice','und', 'things']

# Characters that tokenize() replaces with a space: ASCII punctuation except the
# apostrophe, digits, and the \r, \t, \n control characters. Compiled once at
# definition time instead of on every call; the punctuation literal is a raw
# string so that "\]" is no longer an invalid escape sequence.
_TOKEN_STRIP_RE = re.compile(
    '[' + re.escape(r'!"#$%&()*+,-./:;<=>?@[\]^_`{|}~') + '0-9\\r\\t\\n]'
)


def tokenize(text):
    """Turn a review text into a list of unigram and bigram tokens.

    The text is lower-cased, punctuation/digits/line breaks are replaced by
    spaces, and the result is split with ``nltk.word_tokenize``. Tokens of
    length <= 2 and tokens found in ``stop_words`` or ``common_words`` are
    dropped. Bigrams are built from neighbouring tokens of the filtered list
    and joined with ``_``.

    Args:
        text: the review text; ``None`` or an empty string is accepted.

    Returns:
        The kept tokens followed by their bigrams; an empty list when ``text``
        is ``None`` or empty.
    """
    # The review text column is nullable; return no tokens instead of failing
    # on `.lower()` for a missing value.
    if not text:
        return []
    cleaned = _TOKEN_STRIP_RE.sub(" ", text.lower())
    words = nltk.word_tokenize(cleaned, 'english', False)
    words = [w for w in words if len(w) > 2 and w not in stop_words and w not in common_words]
    bigrams = [first + '_' + second for first, second in zip(words, words[1:])]
    return words + bigrams

In [170]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# UDF that wraps a list of numbers into a dense ML vector.
list_to_vector_udf = udf(lambda values: Vectors.dense(values), VectorUDT())

# Tokenize every raw document; each row is a one-element list, so the resulting
# DataFrame has a single column (named `_1` by toDF) holding the token list.
word_occurence = (
    train_raw
    .map(lambda document: [tokenize(document)])
    .toDF()
)

In [171]:
# Preview the token column next to a copy explicitly cast to array<string>.
(
    word_occurence
    .withColumn("words", word_occurence["_1"].cast(ArrayType(StringType())))
    .show(n=5)
)

+--------------------+--------------------+
|                  _1|               words|
+--------------------+--------------------+
|[star, service, j...|[star, service, j...|
|[year, son, trim,...|[year, son, trim,...|
|[nope, happen, pr...|[nope, happen, pr...|
|[absolute, worst,...|[absolute, worst,...|
|[coming, years, s...|[coming, years, s...|
+--------------------+--------------------+
only showing top 5 rows



In [172]:
# Count-vectorize the token lists in `_1` into `features`, with the vocabulary
# capped at 2000 entries.
cv = (
    CountVectorizer()
    .setInputCol('_1')
    .setOutputCol('features')
    .setVocabSize(2000)
)
cv_model = cv.fit(word_occurence)

# Cache the vectorized corpus since it is reused when fitting the topic model.
train = cv_model.transform(word_occurence).cache()

train.show(n=5)

+--------------------+--------------------+
|                  _1|            features|
+--------------------+--------------------+
|[star, service, j...|(2000,[0,2,6,11,4...|
|[year, son, trim,...|(2000,[15,20,43,4...|
|[nope, happen, pr...|(2000,[83,104,163...|
|[absolute, worst,...|(2000,[163,213,22...|
|[coming, years, s...|(2000,[1,9,33,40,...|
+--------------------+--------------------+
only showing top 5 rows



In [179]:
# Fit a 20-topic LDA model on the vectorized reviews, limited to 5 iterations.
lda_model = LDA().setK(20).setMaxIter(5).fit(train)

In [182]:
# Top five terms (vocabulary indices and weights) of every review topic.
topics = lda_model.describeTopics(maxTermsToDisplay=5)
print("The topics described by their top-weighted terms:")
topics.show(n=20, truncate=True)

The topics described by their top-weighted terms:
+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[603, 708, 782, 1...|[0.01069670245101...|
|    1|  [0, 63, 21, 2, 13]|[0.01334178064445...|
|    2|   [0, 63, 7, 1, 33]|[0.01414439156481...|
|    3| [29, 16, 0, 68, 26]|[0.01495260920407...|
|    4|[33, 53, 45, 236,...|[0.01217465241637...|
|    5|[216, 4, 48, 76, ...|[0.01565735371239...|
|    6|[41, 119, 222, 68...|[0.01151257192206...|
|    7|[29, 45, 216, 754...|[0.01553261415574...|
|    8|  [3, 163, 17, 0, 4]|[0.02059190407718...|
|    9|[147, 5, 277, 130...|[0.01495722354229...|
|   10|   [3, 1, 37, 2, 40]|[0.01431688770436...|
|   11|  [14, 28, 43, 6, 0]|[0.01276977619288...|
|   12|  [9, 12, 1, 11, 28]|[0.01056777454495...|
|   13|  [10, 25, 8, 15, 0]|[0.02250086539069...|
|   14| [0, 27, 18, 84, 55]|[0.02194960943926...|
|   15|    [0, 5, 1, 37, 2]|[0.01180421317801...|


In [183]:
# Shows the result
# transformed = lda_model.transform(train)
# transformed.show(truncate=False)

def indices_to_terms(vocabulary):
    """Build a UDF that translates an array of term indices into terms.

    Args:
        vocabulary: indexable collection mapping a term index to its term.

    Returns:
        A Spark UDF producing an array of strings, one term per input index.
    """
    def _lookup_terms(term_indices):
        # Indices are coerced to int before being used as positions.
        terms = []
        for term_index in term_indices:
            terms.append(vocabulary[int(term_index)])
        return terms

    return udf(_lookup_terms, ArrayType(StringType()))

# Show each topic as the list of vocabulary words behind its term indices.
(
    topics
    .withColumn("topics_words", indices_to_terms(cv_model.vocabulary)("termIndices"))
    .select('topics_words')
    .show(truncate=False)
)

+--------------------------------------------------+
|topics_words                                      |
+--------------------------------------------------+
|[carne, asada, carne_asada, valley, east]         |
|[service, bar, minutes, well, experience]         |
|[service, bar, people, staff, years]              |
|[pizza, restaurant, service, salad, pretty]       |
|[years, care, family, dog, money]                 |
|[sushi, love, happy, hour, happy_hour]            |
|[sauce, sandwich, cream, salad, lunch]            |
|[pizza, family, sushi, pasty, bar]                |
|[friendly, hair, amazing, service, love]          |
|[burger, order, pho, fries, ordered]              |
|[friendly, staff, store, well, awesome]           |
|[told, called, call, going, service]              |
|[work, day, staff, recommend, called]             |
|[chicken, delicious, ordered, definitely, service]|
|[service, car, customer, shop, customer_service]  |
|[service, order, staff, store, well]         

In [159]:
# Ids of the businesses whose city is 'Mesa'.
mesa_ids = business.where(business['city'] == 'Mesa').select('business_id')
mesa_ids.show(n=10)

+--------------------+
|         business_id|
+--------------------+
|qrAHt4wWRYWj1sEjx...|
|n33Izvzk_z9_51H6N...|
|5Ghe0btvM7tXqANnh...|
|s_6WYPw3t50el2fSu...|
|EvX3LA7Wc1tv1S4qr...|
|gqv6zoGHSeTsNKynh...|
|ynN2PeYIpufc_eSjE...|
|g-HSA1m2vFPjKHYnq...|
|UlDvngOADDhshzMKv...|
|aBGT2NQ9fSesoB93-...|
+--------------------+
only showing top 10 rows



In [160]:
# Keep only the reviews written for Mesa businesses (inner join on business_id).
mesa_review = mesa_ids.join(
    review.select('business_id', 'text'),
    on='business_id',
    how='inner',
)
mesa_review.show(n=5)

+--------------------+--------------------+
|         business_id|                text|
+--------------------+--------------------+
|5Rduolg9SjUpg39hT...|This is one star ...|
|5Rduolg9SjUpg39hT...|Took my 2 year ol...|
|5Rduolg9SjUpg39hT...|Nope. Wont happen...|
|5Rduolg9SjUpg39hT...|Absolute worst ha...|
|5Rduolg9SjUpg39hT...|I have been comin...|
+--------------------+--------------------+
only showing top 5 rows



In [167]:
# Rebind `train_raw` to the text of the Mesa reviews (second field of each
# joined row) and cache it. This replaces the sampled `train_raw` built earlier.
train_raw = (
    mesa_review.rdd
    .map(lambda row: row[1])
    .cache()
)