In [1]:
import numpy as np
import string
import re

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.sql.functions import udf

from pyspark.sql import Row

PUNCTUATION = set(string.punctuation)
STOPWORDS = set(stopwords.words('english'))

In [2]:
#from nlp_cl_start import print_cl

In [3]:
api_f = ['attributes.RestaurantsPriceRange2', 'business_id', 'stars', 'review_count', 'categories']

In [4]:
from pyspark.sql.types import ArrayType, StringType, BooleanType

In [5]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [6]:
def if_restaurant(text):
    if text is None:
        return False
    else:
        return 'Restaurants' in text

if_rest_udf = udf(if_restaurant, BooleanType())

In [7]:
import pyspark as ps
spark = (ps.sql.SparkSession.builder
        .master("local[4]")
        .appName("yelp_academic")
        .getOrCreate()
        )
sc = spark.sparkContext

In [8]:
import pandas as pd

In [24]:
from nlp_cl_start import kmean_counts

In [9]:
biz = spark.read.json('yelp_dataset/yelp_academic_dataset_business.json')

In [10]:
rev = spark.read.json('yelp_dataset/yelp_academic_dataset_review.json')

In [11]:
bad = rev.filter('stars < 3')

In [12]:
bad.take(2)

[Row(business_id='iCQpiavjjPzJ5_3gPD5Ebg', cool=0, date='2011-02-25', funny=0, review_id='x7mDIiDB3jEiPGPHOmDzyw', stars=2, text="The pizza was okay. Not the best I've had. I prefer Biaggio's on Flamingo / Fort Apache. The chef there can make a MUCH better NY style pizza. The pizzeria @ Cosmo was over priced for the quality and lack of personality in the food. Biaggio's is a much better pick if youre going for italian - family owned, home made recipes, people that actually CARE if you like their food. You dont get that at a pizzeria in a casino. I dont care what you say...", useful=0, user_id='msQe1u7Z_XuqjGoqhB0J5g'),
 Row(business_id='jtQARsP6P-LbkyjbO1qNGg', cool=1, date='2014-10-23', funny=1, review_id='LZp4UX5zK3e-c5ZGSeo3kA', stars=1, text='Terrible. Dry corn bread. Rib tips were all fat and mushy and had no flavor. If you want bbq in this neighborhood go to john mulls roadkill grill. Trust me.', useful=3, user_id='msQe1u7Z_XuqjGoqhB0J5g')]

In [13]:
bad.count()

1345953

In [14]:
bad.cache()

DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

In [15]:
rests = biz.filter(if_rest_udf(biz.categories))

In [111]:
rest_rev = rev.join(rests.select('business_id', 'stars').withColumnRenamed('stars','rating'),'business_id')

In [18]:
bad_rest_rev = bad.join(rests.select('business_id'),'business_id')

In [19]:
bad_rest_rev.count()

768690

In [20]:
bad_rest_rev.cache()

DataFrame[business_id: string, cool: bigint, date: string, funny: bigint, review_id: string, stars: bigint, text: string, useful: bigint, user_id: string]

In [63]:
bad_sample = bad_rest_rev.select('business_id','review_id', 'user_id','stars', 'text').sample(False, 0.027, seed =91)

In [64]:
%%time
bad_sample.count()

CPU times: user 555 µs, sys: 1.01 ms, total: 1.57 ms
Wall time: 1.19 s


20765

In [65]:
bad_sample.cache()

DataFrame[business_id: string, review_id: string, user_id: string, stars: bigint, text: string]

In [32]:
def tokenize(text):
    regex = re.compile('<.+?>|[^a-zA-Z]')
    clean_txt = regex.sub(' ', text)
    tokens = clean_txt.split()
    lowercased = [t.lower() for t in tokens]

    no_punctuation = []
    for word in lowercased:
        punct_removed = ''.join([letter for letter in word if not letter in PUNCTUATION])
        no_punctuation.append(punct_removed)
    no_stopwords = [w for w in no_punctuation if not w in STOPWORDS]

    STEMMER = PorterStemmer()
    stemmed = [STEMMER.stem(w) for w in no_stopwords]
    return [w for w in stemmed if w]

In [33]:
udf_tokenize = udf(f=tokenize, returnType=ArrayType(StringType()))

In [66]:
bad_sample = bad_sample.withColumn('token', udf_tokenize('text'))

In [67]:
cv = CountVectorizer(minDF=10, vocabSize=5000, inputCol='token', outputCol='vectors')

In [68]:
model = cv.fit(bad_sample)

In [69]:
%%time
sample_vect = model.transform(bad_sample)

CPU times: user 5.03 ms, sys: 2.53 ms, total: 7.55 ms
Wall time: 67.4 ms


In [38]:
%%time
sample_vect.limit(3).toPandas()

CPU times: user 10.3 ms, sys: 4.88 ms, total: 15.2 ms
Wall time: 563 ms


Unnamed: 0,business_id,review_id,stars,text,token,vectors
0,leMIHa6TogufHv5HNYjnfw,6PkCZqMq9pgpBBQEe00eDg,2,We ended up at Ravi's Soups on Queen Street a ...,"[end, ravi, soup, queen, street, day, ago, fai...","(1.0, 0.0, 3.0, 2.0, 2.0, 1.0, 0.0, 2.0, 0.0, ..."
1,t5nyOtnEv455rShOS_g3RA,hGEpM4ndT2NnUyrTp7TpUg,2,We ordered the 2 fish combo and fish burrito. ...,"[order, fish, combo, fish, burrito, found, bes...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,klQnFvxN7stIgYAvRylY0Q,GOznKkym_lN65np3nmDofw,2,Not delicious. I wanted it to be but it wasn't...,"[delici, want, firecrack, shrimp, good, flavor...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, ..."


In [70]:
sample_vect.cache()

DataFrame[business_id: string, review_id: string, user_id: string, stars: bigint, text: string, token: array<string>, vectors: vector]

In [71]:
%%time
cl = 20

km = KMeans(k = cl, featuresCol='vectors', maxIter= 30)

model_km = km.fit(sample_vect)

centers_c = model_km.clusterCenters()

CPU times: user 217 ms, sys: 54.9 ms, total: 272 ms
Wall time: 30.7 s


In [46]:
vocab = np.array(model.vocabulary)
vocab[np.argsort(centers_c )[:2,-10:]]

array([['get', 'back', 'like', 'time', 'servic', 'good', 'order', 'food',
        'go', 'place'],
       ['restaur', 'good', 'go', 'time', 'get', 'one', 'place', 'order',
        'like', 'food']], dtype='<U15')

In [72]:
pred = model_km.transform(sample_vect)
pred.columns

['business_id',
 'review_id',
 'user_id',
 'stars',
 'text',
 'token',
 'vectors',
 'prediction']

In [154]:
test = rest_rev.select('business_id','user_id','rating', 'stars').sample(False, 0.00027, seed =91)

In [151]:
rest_rev.columns

['business_id',
 'cool',
 'date',
 'funny',
 'review_id',
 'stars',
 'text',
 'useful',
 'user_id',
 'rating']

In [155]:
test.count()

938

In [93]:
test.first()['business_id'], test.first()['user_id']

('3DgPtOe-FKRH0bFE3ptzxA', '6IhssoagVtfNMnNjU71Q1A')

In [156]:
test.cache()

DataFrame[business_id: string, user_id: string, rating: double, stars: bigint]

In [188]:
def cluster_rest(business_id, not_user_id):
    texts = rest_rev.filter('stars < 3').filter((rest_rev.business_id == business_id)
                                        &(rest_rev.user_id!=not_user_id)).select('text')
    texts = texts.withColumn('token', udf_tokenize('text'))
    vect = model.transform(texts)
    #vect.cache()
    pred = model_km.transform(vect).select('prediction')
    return pred
    

In [126]:
%%time
cl_r = cluster_rest('3DgPtOe-FKRH0bFE3ptzxA','6IhssoagVtfNMnNjU71Q1A')

CPU times: user 10.4 ms, sys: 3.64 ms, total: 14 ms
Wall time: 938 ms


In [187]:
def cluster_user(not_business_id, user_id):
    texts = rest_rev.filter('stars < 3').filter((rest_rev.user_id == user_id)
                                        &(rest_rev.business_id!=not_business_id)).select('text')
    texts = texts.withColumn('token', udf_tokenize('text'))
    vect = model.transform(texts)
    #vect.cache()
    pred = model_km.transform(vect).select('prediction')
    return pred

In [127]:
%%time
cl_u =cluster_user('3DgPtOe-FKRH0bFE3ptzxA','6IhssoagVtfNMnNjU71Q1A')

CPU times: user 14.1 ms, sys: 4.93 ms, total: 19 ms
Wall time: 719 ms


In [144]:
%%time
cl_r.intersect(cl_u).count()

CPU times: user 35 ms, sys: 16.8 ms, total: 51.8 ms
Wall time: 1.4 s


0

In [172]:
def predict_bad(row):
    business_id = row['business_id']
    user_id = row['user_id']
    mean_rating = row['rating']
    cl_u = cluster_user(business_id, user_id)
    cl_r = cluster_rest(business_id, user_id)
    if cl_r.intersect(cl_u).count() > 0:
        return 2
    else:
        return mean_rating

In [166]:
test1_df = test.toPandas()

In [167]:
test1_df.head()

Unnamed: 0,business_id,user_id,rating,stars
0,XXW_OFaYQkkGOGniujZFHg,ocM9_kR6iJfal2SXmJlhSQ,4.0,3
1,axnI-1l2a_kydvL12-toIQ,L-kq6bn9ayIT0gWZZnYNxA,4.0,5
2,KXITXbKuE60WSUDs7NZVLQ,-7EBoXfOAtDg53_KY5UGgQ,3.5,3
3,Rwahe1zbFpw6VIjb5ngZeg,T_GUcbHmVZYYDwdHqQ-wcQ,4.0,4
4,cA2ZGg41C-KEuHLwXxoB1Q,5fmlBpC5w9IhFRBeWmqa_g,3.5,5


In [181]:
test1_df.index

RangeIndex(start=0, stop=938, step=1)

In [189]:
predict_bad(test1_df.loc[3,:])

Py4JJavaError: An error occurred while calling o7912.count.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:232)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
	at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:158)
	at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceRows(ColumnarBatchScan.scala:166)
	at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:80)
	at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:158)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.FileSourceScanExec.produce(DataSourceScanExec.scala:158)
	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute(EvalPythonExec.scala:89)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2775)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2774)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2774)
	at sun.reflect.GeneratedMethodAccessor182.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:99)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1479)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.buildReader(JsonFileFormat.scala:98)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:160)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:297)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:295)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:315)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute(EvalPythonExec.scala:89)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:304)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [185]:
%%time
#test1_df['y_hat'] = test1_df.apply(predict_bad, axis =1)
test1_df['y_hat'] = 0
j = 0
for i in test1_df.index:
    j+=1
    test1_df.loc[i,'y_hat'] = predict_bad(test1_df.loc[i,:])

CPU times: user 512 µs, sys: 62 µs, total: 574 µs
Wall time: 540 µs


In [191]:
#

In [170]:
ttt = test1_df[:4].copy()

In [174]:
%%time
y_h = ttt.apply(predict_bad, axis =1)

CPU times: user 262 ms, sys: 104 ms, total: 365 ms
Wall time: 3min 11s


In [175]:
test2 = rest_rev.select('business_id','user_id','rating', 'stars').sample(False, 0.0027, seed =91)

In [176]:
test2.cache()

DataFrame[business_id: string, user_id: string, rating: double, stars: bigint]

In [177]:
t2_df = test2.toPandas()

In [None]:
t2_df['y_hat'] = t2_df.apply(predict_bad, axis =1)

['business_id', 'review_id', 'stars', 'text', 'token', 'vectors', 'prediction']

In [48]:
evaluator = ClusteringEvaluator(featuresCol='vectors')

silhouette = evaluator.evaluate(pred)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = -0.1117817836165352


In [76]:
count_tr_df =pred.join(biz.select(api_f ).withColumnRenamed('stars', 'rating'), 'business_id').toPandas()

In [109]:
len(count_tr_df.user_id.unique())

19575

In [110]:
count_tr_df.shape

(20765, 12)

In [77]:
count_tr_df.groupby('prediction').count()

Unnamed: 0_level_0,business_id,review_id,user_id,stars,text,token,vectors,RestaurantsPriceRange2,rating,review_count,categories
prediction,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,6841,6841,6841,6841,6841,6841,6841,6718,6841,6841,6841
1,161,161,161,161,161,161,161,161,161,161,161
2,3550,3550,3550,3550,3550,3550,3550,3515,3550,3550,3550
3,1642,1642,1642,1642,1642,1642,1642,1619,1642,1642,1642
4,1244,1244,1244,1244,1244,1244,1244,1221,1244,1244,1244
5,2425,2425,2425,2425,2425,2425,2425,2399,2425,2425,2425
6,48,48,48,48,48,48,48,48,48,48,48
7,511,511,511,511,511,511,511,507,511,511,511
8,219,219,219,219,219,219,219,216,219,219,219
9,355,355,355,355,355,355,355,354,355,355,355


## A little more beatiful

In [47]:
%%time
idf = IDF(inputCol= 'vectors', outputCol= 'features')

model2 = idf.fit(sample_vect)

sample_tfidf = model2.transform(sample_vect)

sample_tfidf.cache()

CPU times: user 8.44 ms, sys: 3.56 ms, total: 12 ms
Wall time: 1.02 s


In [48]:
sample_tfidf.first()

Row(business_id='8-_jXGGDFPFAPB4r1x-81w', review_id='KGUjV-xxvIOQPlQ-C6qBEg', stars=1, text='The food was meh....\n\nThe parking is unacceptable, zero self parking....\n\nThe service in the patio was terrible....\n\nWould not go back...', token=['food', 'meh', 'park', 'unaccept', 'zero', 'self', 'park', 'servic', 'patio', 'terribl', 'would', 'go', 'back'], vectors=SparseVector(1415, {0: 1.0, 5: 1.0, 7: 1.0, 11: 1.0, 12: 1.0, 115: 1.0, 355: 2.0, 487: 1.0, 626: 1.0, 771: 1.0, 885: 1.0, 1030: 1.0}), features=SparseVector(1415, {0: 0.6121, 5: 1.0005, 7: 1.0754, 11: 1.2986, 12: 1.2127, 115: 2.4725, 355: 7.1055, 487: 4.1589, 626: 3.9766, 771: 4.2279, 885: 4.6156, 1030: 4.7269}))

In [49]:
###number of cluster!! make sence to send so class
cl_num = 15
sample_tfidf.cache()

DataFrame[business_id: string, review_id: string, stars: bigint, text: string, token: array<string>, vectors: vector, features: vector]

In [50]:
%%time
km = KMeans(k = cl_num)

model_km = km.fit(sample_tfidf)

centers = model_km.clusterCenters()

CPU times: user 112 ms, sys: 38.9 ms, total: 151 ms
Wall time: 3.77 s


In [51]:
vocab = np.array(model.vocabulary)
vocab[np.argsort(centers )[:2,-10:]]

array([['said', 'wait', 'get', 'time', 'pizza', 'minut', 'ask', 'tabl',
        'order', 'us'],
       ['tabl', 'sauc', 'us', 'dish', 'server', 'order', 'fish', 'rice',
        'plate', 'restaur']], dtype='<U12')

In [52]:
sample_tfidf.columns

['business_id', 'review_id', 'stars', 'text', 'token', 'vectors', 'features']

In [53]:
pred = model_km.transform(sample_tfidf)
pred.columns

['business_id',
 'review_id',
 'stars',
 'text',
 'token',
 'vectors',
 'features',
 'prediction']

In [88]:
ClusteringEvaluator?

In [54]:
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(pred)
print("Silhouette with squared euclidean distance = " + str(silhouette))


Silhouette with squared euclidean distance = -0.2829883757234709


In [55]:
rest_with_cl_train =pred.join(biz.select(api_f ), 'business_id')

In [56]:
rest_with_cl_train.count()

1919

In [57]:
clust_df = rest_with_cl_train.toPandas()

In [58]:
clust_df.groupby('prediction').count()

Unnamed: 0_level_0,business_id,review_id,stars,text,token,vectors,features,RestaurantsPriceRange2,stars,review_count,categories
prediction,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
0,342,342,342,342,342,342,342,338,342,342,342
1,37,37,37,37,37,37,37,37,37,37,37
2,1,1,1,1,1,1,1,1,1,1,1
3,1,1,1,1,1,1,1,1,1,1,1
4,1,1,1,1,1,1,1,1,1,1,1
5,1,1,1,1,1,1,1,1,1,1,1
6,1526,1526,1526,1526,1526,1526,1526,1506,1526,1526,1526
7,1,1,1,1,1,1,1,1,1,1,1
8,2,2,2,2,2,2,2,2,2,2,2
9,1,1,1,1,1,1,1,1,1,1,1


In [106]:
clu12 = clust_df[clust_df.prediction == 12]

In [109]:
clu12[['business_id','stars','categories','RestaurantsPriceRange2','token']]

Unnamed: 0,business_id,stars,stars.1,categories,RestaurantsPriceRange2,token
0,qfcdMhm1Ff28JHVpHca20g,2,1.5,"Pizza, Restaurants",2.0,"[good, pizza, deliveri, time, realli, bad, eve..."
9,Z5eukYH32_nFljOTC2DJ0g,3,4.0,"Pizza, Restaurants",2.0,"[classic, ny, pizza, place, interior, design, ..."
127,OXrFWgoz533T8tMRemkiww,2,4.0,"Pizza, Italian, Restaurants",2.0,"[sever, disappoint, trip, il, pizzaiolo, marke..."
180,v9jNkOIBfP4aW2ru50Rn-A,2,2.5,"Restaurants, Italian",2.0,"[boyfriend, weekend, alway, best, experi, food..."
192,A5Rkh7UymKm0_Rxm9K2PJw,2,4.0,"Cocktail Bars, Vegetarian, Steakhouses, Americ...",2.0,"[last, night, visit, yardhous, big, fan, sure,..."
237,rHzf-EDTP9g6gQFYAz0RuQ,1,2.5,"Salad, Restaurants, Pizza, Chicken Wings",,"[disappoint, panago, pizza, order, larg, peppe..."
242,ai8nfTBNvL579cjIuqZajQ,3,4.0,"Food, Restaurants, Pizza, Sandwiches, Desserts...",2.0,"[want, tri, differ, pizza, place, one, ever, c..."
336,CJvN2k3gjR7JspTx21icTQ,3,4.5,"Pizza, Restaurants, Italian",1.0,"[pizza, delici, big, fan, new, york, pizza, pi..."
406,swi7mi1ixWpu5-tZO2mtsg,1,3.0,"Italian, Restaurants, Sandwiches, Pizza",2.0,"[earlier, even, place, order, grub, hub, inch,..."
444,kd1VNJdd92T2eQ4hqvYQ5A,2,4.0,"Pizza, Restaurants, Chicken Wings, Salad",1.0,"[updat, sicilian, receiv, april, order, somewh..."
