In [2]:
#!pip install pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row, column
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("yelp").getOrCreate()


In [3]:
reviews_updated = "/Users/krishmehta/Desktop/DataMining/yelp_dataset/review.json"
review = spark.read.json(reviews_updated)

In [4]:
import string
import re
def remove_punct(text):
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    nopunct = regex.sub(" ", text)  
    return nopunct

In [5]:
def convert_rating(rating):
    if rating >=4:
        return 1
    else:
        return 0

In [6]:
# df_review_data = review.select("stars","date","text")

In [7]:
# df_review_data_pd = df_review_data.toPandas()

In [8]:
df_review = review

In [9]:
# df_review_test = review.filter(review.date>"2017-01-01")

In [10]:
from pyspark.sql.functions import udf
punct_remover = udf(lambda x: remove_punct(x))
rating_convert = udf(lambda x: convert_rating(x))

#select 1.5 mn rows of reviews text and corresponding star rating with punc removed and ratings converted
resultDF = df_review.select('review_id', punct_remover('text'), rating_convert('stars'), 'date', 'business_id', 'user_id').limit(150000)
#user defined functions change column names so we rename the columns back to its original names
resultDF = resultDF.withColumnRenamed('<lambda>(text)', 'text')
resultDF = resultDF.withColumnRenamed('<lambda>(stars)', 'stars')

In [11]:
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
#tokenizer and stop word remover
tok = Tokenizer(inputCol="text", outputCol="words")
#stop word remover
stopwordrm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
# Build the pipeline 
pipeline = Pipeline(stages=[tok, stopwordrm])
# Fit the pipeline 
review_tokenized = pipeline.fit(resultDF).transform(resultDF).cache()

In [12]:
# count vectorizer and tfidf
# from pyspark.ml.feature import HashingTF, IDF, Tokenizer
cv = CountVectorizer(inputCol='words_nsw', outputCol='tf')
cvModel = cv.fit(review_tokenized)
count_vectorized = cvModel.transform(review_tokenized)
# tfidfModel = IDF().fit(count_vectorized)
# tfidf_df = tfidfModel.transform(count_vectorized)

In [13]:
tfidfModel = IDF(inputCol="tf", outputCol="features").fit(count_vectorized)
tfidf_df = tfidfModel.transform(count_vectorized)

In [14]:
tfidf_df.show()

+--------------------+--------------------+-----+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|           review_id|                text|stars|      date|         business_id|             user_id|               words|           words_nsw|                  tf|            features|
+--------------------+--------------------+-----+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|x7mDIiDB3jEiPGPHO...|The pizza was oka...|    0|2011-02-25|iCQpiavjjPzJ5_3gP...|msQe1u7Z_XuqjGoqh...|[the, pizza, was,...|[pizza, okay, , b...|(81390,[0,1,6,9,1...|(81390,[0,1,6,9,1...|
|dDl8zu1vWPdKGihJr...|I love this place...|    1|2012-11-13|pomGBqfbxcqPv14c3...|msQe1u7Z_XuqjGoqh...|[i, love, this, p...|[love, place, , f...|(81390,[0,1,3,4,1...|(81390,[0,1,3,4,1...|
|Er4NBWCmCD4nM8_p1...|Back in          ...|    0|2011-02-25|elqbB

In [15]:
from pyspark.sql.types import DoubleType
tfidf_df.printSchema()
# changedTypedf = tfidf_df.withColumn("label", tfidf_df["stars"].cast(DoubleType()))
changedTypedf = tfidf_df.withColumn("label", tfidf_df["stars"].cast("int"))


root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_nsw: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)



In [16]:
changedTypedf.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- date: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_nsw: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)



In [17]:
test_data = changedTypedf.filter(changedTypedf.date<"2017-01-01")


In [18]:
# Elastic Net Logit


from pyspark.ml.classification import LogisticRegression
lambda_par = 0.02
alpha_par = 0.3
lr = LogisticRegression().\
        setLabelCol('label').\
        setFeaturesCol('features').\
        setRegParam(lambda_par).\
        setMaxIter(100).\
        setElasticNetParam(alpha_par)
lrModel = lr.fit(test_data)
# lr_pred = lrModel.transform(test)

In [19]:
train_data = changedTypedf.filter(changedTypedf.date<"2017-01-01")

In [20]:
lr_pred = lrModel.transform(train_data)

In [27]:
final_output_pd = lr_pred.select( 'business_id', 'user_id', 'prediction').toPandas()

In [28]:
final_output_pd

Unnamed: 0,business_id,user_id,prediction
0,iCQpiavjjPzJ5_3gPD5Ebg,msQe1u7Z_XuqjGoqhB0J5g,0.0
1,pomGBqfbxcqPv14c3XH-ZQ,msQe1u7Z_XuqjGoqhB0J5g,1.0
2,elqbBhBfElMNSrjFqW3now,msQe1u7Z_XuqjGoqhB0J5g,0.0
3,zdE82PiD6wquvjYLyhOJNA,msQe1u7Z_XuqjGoqhB0J5g,1.0
4,EAwh1OmG6t6p3nRaZOW_AA,msQe1u7Z_XuqjGoqhB0J5g,1.0
5,3Mx4renubPRnjHUw1n2UkA,eAavwM32i2h7sNNer--kGw,0.0
6,Xg5qEQiB-7L6kGJ5F4K3bQ,Me267nGzYOJAGNSbNR_e6A,1.0
7,vsFFbN71ehRCp46KeR5RdQ,0pf5VuzE4_1pwj5NJHG5TQ,1.0
8,Jj8ubiwwuCR-rrhrrjcryw,0pf5VuzE4_1pwj5NJHG5TQ,1.0
9,7m1Oa1VYV98UUuo_6i0EZg,0pf5VuzE4_1pwj5NJHG5TQ,1.0


In [38]:
# final_output_pd.set_index('business_id').T.to_dict('list')

In [32]:
# groupByBusiness_pd= final_output_pd.groupby(['business_id'])

In [39]:
# groupByBusiness_pd

In [40]:
# groupByBusiness_pd.apply(lambda x: x.set_index('user_id').resample('D').ffill())

In [53]:
dict_for_business={}

In [54]:
for row in final_output_pd.iterrows():
    temp_dic = {}
    if row[1][0] in dict_for_business:
        temp_dic = dict_for_business[row[1][0]]
    else:
        temp_dic = {}
    temp_dic[row[1][1]] = row[1][2]
    dict_for_business[row[1][0]] = temp_dic
#     dict_for_business[row[1][0]]=dict_for_business.get(row[1][0],row[1][1])
#     dict_for_business[row[1][0]][row[1][1]]=dict_for_business[row[1][0]].get(row[1][1],row[1][2])
#     print(row[1][2])


In [55]:
dict_for_business

{u'--9e1ONYQuAa-CB_Rrw7Tw': {u'-7hnKm0I8detrDCgsibKsA': 1.0,
  u'-F92f7vNic_Y1cPJJRlSIA': 0.0,
  u'-Q1QRzkwvN89m7R-9sKuwA': 1.0,
  u'-XzAzGKAGCLn-sxy0_3dDw': 1.0,
  u'-Z7Nw2UF7NiBSAzfXNA_XA': 1.0,
  u'0G9BcXSQscv_GnhlNwFfjw': 0.0,
  u'0Lio5NNlqy7OX6TUWx47Uw': 1.0,
  u'0YddhXMfi_IG4wKl1daa_g': 0.0,
  u'0jUf6HWYmnaIJVyhiD0p5g': 1.0,
  u'0syC71vtTGGzSeGzfTR6vw': 1.0,
  u'1dV5ct9dz1gYeOOgXPLG-w': 1.0,
  u'2WyWm7oyDyktCMbpwwGVkA': 1.0,
  u'2aeNFntqY2QDZLADNo8iQQ': 1.0,
  u'2bJG8cm2QT4pJbzwiI2HoA': 1.0,
  u'2xZCgpYf-eFL6LI_5ePLhg': 0.0,
  u'2zFKFGiVeNz--CkG2Qa4vA': 1.0,
  u'35k9FLqnR9s_qr2aHWfc6Q': 1.0,
  u'49eMil-dGqIG20KBWdVAZg': 0.0,
  u'4DrtQoV20cbDOlnOk_rxHw': 1.0,
  u'4hYlLptEJEIhQcb8k7QEqA': 1.0,
  u'5JIRtOLYy5ZFSCwGbB3VFA': 0.0,
  u'6gW3E_fdyKDA_S__92C66Q': 0.0,
  u'6v_jnMTkbe04XzRPrJWHCw': 0.0,
  u'7-aG0q6MJ2cFEiMAyg95ug': 0.0,
  u'7sH_HM1FMYWcFwCL4Jyu4g': 1.0,
  u'8_h-msoVieX7OdfOnzEGYg': 1.0,
  u'8jfGVLVliMyeUw2JoVWi2g': 0.0,
  u'8s-eaOPhBO4rf2KTOyiEmA': 1.0,
  u'8yQc2kxEKAwaBUsHW

In [58]:
import pickle
with open('/Users/krishmehta/Desktop/DataMining/yelp_dataset/sentiment.pickle', 'wb') as handle:
    pickle.dump(dict_for_business, handle, protocol=pickle.HIGHEST_PROTOCOL)