In [63]:
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark import SparkConf,SparkContext
import matplotlib.pyplot as plt
from time import time 
from tqdm import tqdm
import sklearn
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,StringIndexer
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, countDistinct
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics,MulticlassMetrics
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [2]:
from pyspark.sql import SparkSession

In [3]:
sc = SparkSession \
    .builder \
    .appName("yelp-data-exploration") \
    .config("spark.driver.maxResultSize", "100g") \
    .config("spark.driver.memory", "50g") \
    .config("spark.executor.memory", "20g") \
    .config("spark.master", "local[2]") \
    .config("spark.local.dir","/home/bh_parijat/spark_temp/")\
    .getOrCreate()

In [4]:
#sc = SparkSession.builder.appName('yelp-data-exploration').getOrCreate()

In [5]:
#yelp_users = sc.read.json("/home/bh_parijat/data/yelp/user.json")

In [6]:
yelp_reviews = sc.read.json("/home/bh_parijat/data/yelp/review.json")

In [7]:
#yelp_business = sc.read.json("/home/bh_parijat/data/yelp/business.json")

In [8]:
#yelp_tip = sc.read.json("/home/bh_parijat/data/yelp/tip.json")

In [9]:
yelp_reviews.count()

6685900

In [10]:
reviews_columns = ["business_id", "cool","date","funny","review_id","stars","text","useful","user_id"]

In [11]:
yelp_reviews_df = yelp_reviews.toDF(*reviews_columns)

In [12]:
yelp_reviews.head(1)

[Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, date='2013-05-07 04:34:36', funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')]

In [13]:
yelp_reviews_df.head()

Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, date='2013-05-07 04:34:36', funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')

In [52]:
#yelp_users.head(1)

In [54]:
#yelp_users.count()

In [53]:
#yelp_users.columns

In [18]:
#users_columns = yelp_users.columns

In [55]:
#users_columns

In [26]:
#yelp_users_df = yelp_users.toDF(*users_columns)

In [None]:
#yelp_users_df.collect()

In [20]:
#business_columns = yelp_business.columns

In [56]:
#yelp_business.head(1)

In [57]:
#business_columns

In [22]:
#yelp_checkin = sc.read.json("/home/bh_parijat/data/yelp/checkin.json")

In [None]:
#checkin_columns = yelp_checkin.columns

In [None]:
#checkin_columns

In [None]:
#yelp_checkin.head(1)

In [None]:
#yelp_checkin.count()

In [None]:
#yelp_business.agg

In [None]:
#yelp_business.agg(countDistinct(col("business_id")).alias("different_business")).show()

In [None]:
#yelp_checkin.agg(countDistinct(col("business_id")).alias("different_business")).show()

In [None]:
#yelp_business.count()

In [None]:
#yelp_tip

In [None]:
#yelp_tip.head(1)

In [None]:
#yelp_tip.count()

In [None]:
#yelp_tip.agg(countDistinct(col("user_id")).alias("different_users")).show()

In [None]:
#yelp_tip.agg(countDistinct(col("business_id")).alias("business_id")).show()

In [None]:
# yelp_tip.groupBy("user_id")\
#         .agg({'compliment_count':'count'})\
#         .withColumnRenamed("count(compliment_count)","count")\
#         .sort(col("count").desc()).show()

In [None]:
# yelp_tip.groupBy("business_id")\
#         .agg({'compliment_count':'count'})\
#         .withColumnRenamed("count(compliment_count)","count")\
#         .sort(col("count").desc()).show()

In [None]:
#yelp_reviews

In [None]:
#yelp_reviews.columns

In [None]:
#r = yelp_reviews.select("text").limit(2).collect()

In [None]:
#r[1]

In [None]:
#yelp_reviews.count()

In [59]:
# yelp_reviews.groupBy("stars")\
#             .agg({"text":"count"})\
#             .withColumnRenamed("count(text)","count")\
#             .sort(col("count").desc())\
#             .show()

## TF-IDF Model

In [13]:
yelp_reviews_df.createOrReplaceTempView("reviews")

In [14]:
sc.sql("select stars, text from reviews").take(1)

[Row(stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.')]

In [15]:
def map_rating_sentiment(rating):
    return 1 if rating>=3 else 0


map_rating = udf(map_rating_sentiment,IntegerType())
yelp_reviews_df_final = yelp_reviews_df.withColumn("sentiment",map_rating("stars"))

In [16]:
yelp_reviews_df_final.select(["sentiment","text","stars"]).show(2)

+---------+--------------------+-----+
|sentiment|                text|stars|
+---------+--------------------+-----+
|        0|Total bill for th...|  1.0|
|        1|I *adore* Travis ...|  5.0|
+---------+--------------------+-----+
only showing top 2 rows



In [17]:
yelp_reviews_df_final.createOrReplaceTempView("reviews")

In [18]:
X = sc.sql("select text,sentiment from reviews")

In [19]:
X = X.withColumnRenamed("text","review")

In [20]:
tokenizer = Tokenizer(inputCol="review",outputCol="tokenized_text")
hashingTF = HashingTF(inputCol="tokenized_text",outputCol="mapped_text")
idf = IDF(inputCol="mapped_text",outputCol="features")
stringIndexer = StringIndexer(inputCol="sentiment",outputCol="label")
lr = LogisticRegression(maxIter=1000)

In [21]:
pipeline = Pipeline(stages=[tokenizer,hashingTF,idf,stringIndexer,lr])

In [22]:
#pipelineFit = pipeline.fit(X)

In [23]:
#X = pipelineFit.transform(X)

In [24]:
X.show(1)

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|Total bill for th...|        0|
+--------------------+---------+
only showing top 1 row



In [25]:
train,test = X.randomSplit([0.85,0.15])

In [26]:
parameters = ParamGridBuilder()\
            .addGrid(lr.regParam ,[0.001,0.01,0.1,1,10])\
            .build()

In [29]:
tvs = TrainValidationSplit(estimator=pipeline,
                           evaluator=BinaryClassificationEvaluator(),
                            estimatorParamMaps= parameters,
                           trainRatio=0.8
                          )

In [30]:
model = tvs.fit(train)

In [38]:
prediction = model.transform(test)

In [39]:
prediction_df = prediction.select("features","label","prediction")

In [40]:
prediction_df.show(10)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(262144,[9639,244...|  0.0|       0.0|
|(262144,[991,1109...|  0.0|       0.0|
|(262144,[3067,344...|  0.0|       0.0|
|(262144,[1469,271...|  0.0|       0.0|
|(262144,[733,1469...|  0.0|       0.0|
|(262144,[118,836,...|  1.0|       1.0|
|(262144,[3367,963...|  0.0|       0.0|
|(262144,[14,9616,...|  0.0|       0.0|
|(262144,[14,1998,...|  0.0|       0.0|
|(262144,[2666,847...|  0.0|       0.0|
+--------------------+-----+----------+
only showing top 10 rows



In [42]:
def compare_label(x,y):
    return 1 if x==y else 0


compare = udf(compare_label,IntegerType())

In [44]:
prediction_df = prediction_df.withColumn("correct",compare("label","prediction"))

In [50]:
metrics = BinaryClassificationMetrics(prediction_df.select("label","prediction").rdd)

In [51]:
metrics.areaUnderPR

0.6693000053079718

## Confidence of predicting our classes correct depicted by Area under ROC is 90.6%

In [64]:
conf_metrics  = MulticlassMetrics(prediction_df.select("label","prediction").rdd)

In [66]:
conf_metrics.precision()

0.911682838116951

In [67]:
conf_metrics.recall()

0.911682838116951

In [70]:
conf_metrics.accuracy

0.911682838116951

In [72]:
conf_metrics.confusionMatrix()

DenseMatrix(2, 2, [753200.0, 18046.0, 70509.0, 160938.0], 0)

In [73]:
conf_metrics.fMeasure()

0.911682838116951

In [75]:
model.bestModel.save("model")