In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF, StringIndexer
from pyspark.sql.types import FloatType, IntegerType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
import pandas as pd

In [2]:
# Start (or reuse, via getOrCreate) a local Spark session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Yelp')
    .getOrCreate()
)

In [3]:
# Load the pipe-delimited Yelp export. multiLine allows a single (quoted) record,
# such as a long review text, to span several physical lines.
df = spark.read.option("multiLine", True).csv("store/yelp.csv", header=True, sep='|')

# No schema/inferSchema is given, so every column arrives as a string; cast the
# numeric ones. Casts are applied in this exact order.
# NOTE(review): with ANSI mode off (Spark 3.x default) values that cannot be
# parsed become null instead of raising — confirm that is acceptable here.
NUMERIC_COLUMN_TYPES = {
    "review_stars": FloatType(),
    "useful": IntegerType(),
    "funny": IntegerType(),
    "cool": IntegerType(),
    "latitude": FloatType(),
    "longitude": FloatType(),
    "business_stars": FloatType(),
    "review_count": IntegerType(),
    "is_open": IntegerType(),
}
for column_name, spark_type in NUMERIC_COLUMN_TYPES.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

# Cache the typed frame: it is scanned by df.count(), by the feature pipeline
# (IDF and StringIndexer fits, training, evaluation) and by every aggregation
# export. Without this, each of those actions re-parses the multi-line CSV.
df = df.cache()

In [4]:
# Number of rows loaded (presumably one row per review).
row_count = df.count()
row_count

1168818

In [5]:
# Derive a 3-class sentiment label from the review's star rating:
#   review_stars >= 3.5 -> 1, review_stars <= 2.5 -> -1, otherwise -> 0.
# "Otherwise" covers stars strictly between 2.5 and 3.5 AND rows whose
# review_stars is null (e.g. a value the cast could not parse).
df = df.withColumn(
    "sentiment",
    when(col("review_stars") >= 3.5, 1)
    .when(col("review_stars") <= 2.5, -1)
    .otherwise(0),
)

In [6]:
# Tokenize: lower-case each review's "text" and split it on whitespace into a
# "tokens" array.
# NOTE(review): Tokenizer keeps punctuation attached ("good" vs "good!");
# RegexTokenizer may give cleaner features.
tokenizer = Tokenizer().setInputCol("text").setOutputCol("tokens")
df_tokens = tokenizer.transform(df)

In [7]:
# Remove stop words (StopWordsRemover's default list) from "tokens" -> "words".
stopwords_remover = StopWordsRemover().setInputCol("tokens").setOutputCol("words")
filtered_data = stopwords_remover.transform(df_tokens)

In [8]:
# Hash each "words" list into a 2**20-dimensional term-frequency vector
# ("rawFeatures"); distinct words may collide in the same bucket.
hashingTF = (
    HashingTF()
    .setInputCol("words")
    .setOutputCol("rawFeatures")
    .setNumFeatures(1 << 20)
)
featurized_data = hashingTF.transform(filtered_data)

In [9]:
# Re-weight the term-frequency vectors by inverse document frequency
# ("rawFeatures" -> "features"). Buckets seen in fewer than 5 documents get
# weight 0 (minDocFreq).
# NOTE(review): the IDF model is fit on the full dataset *before* the
# train/test split made later, so test documents influence the weights
# (mild data leakage). Fitting on the training split only would fix this.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(5)
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

In [10]:
# Keep only the columns used downstream: the classifier reads "features" and
# "label"; "sentiment" is also exported as a distribution later.
data = rescaled_data.select(
    col("business_stars"),
    col("review_count"),
    # Exact-case name written by HashingTF. The former "rawfeatures" only
    # resolved because spark.sql.caseSensitive defaults to false.
    col("rawFeatures"),
    col("features"),
    col("sentiment"),
)

# Map sentiment values (-1 / 0 / 1) to 0-based class indices in "label", as
# Spark ML classifiers expect. StringIndexer's default ordering assigns
# index 0 to the most frequent value.
label_stringIdx = StringIndexer(inputCol="sentiment", outputCol="label")
label_model = label_stringIdx.fit(data)
data = label_model.transform(data)

In [11]:
# 80/20 train/test split. An explicit seed makes the split, and therefore the
# reported accuracy, reproducible; without one, PySpark draws a random seed on
# every run.
SPLIT_SEED = 42
(train, test) = data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Multinomial Naive Bayes with add-one (Laplace) smoothing, trained on the
# default "features" / "label" columns.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nbModel = nb.fit(train)

predictions = nbModel.transform(test)

# Accuracy on the held-out split; the label column is named explicitly
# (it is also the evaluator's default).
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
nb_accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
print("Naive Bayes Classification Model Test Accuracy = %g" % nb_accuracy)

Naive Bayes Classification Model Test Accuracy = 0.80931


In [12]:
def _export_csv(spark_df, path):
    """Collect a small Spark DataFrame to pandas, write it as CSV, and return it.

    Parameters
    ----------
    spark_df : pyspark.sql.DataFrame
        An already aggregated / limited result. ``toPandas()`` pulls every row
        to the driver, so only pass small frames.
    path : str
        Destination CSV path (pandas writes the index as the first column).

    Returns
    -------
    pandas.DataFrame
        The collected frame, so it can still be inspected in the notebook.
    """
    collected = spark_df.toPandas()
    collected.to_csv(path)
    return collected


# Model scoreboard: one row per trained model.
result = pd.DataFrame({
    'model': ['Naive Bayes Classification'],
    'accuracy': [nb_accuracy]
})
result.to_csv("store/result.csv")

# Top 10 business names by number of rows with a non-null review_stars.
top10_result_review = _export_csv(
    df.groupBy("name")
    .agg(count("review_stars").alias("review_stars_total"))
    .orderBy(col("review_stars_total").desc())
    .limit(10),
    "store/top10_result_review.csv",
)

# Top 10 business names by number of rows with a non-null business_stars.
# NOTE(review): this differs from top10_result_review only in which column's
# nulls are skipped, so it is effectively another row count per name —
# confirm this is the intended metric.
top10_business_review = _export_csv(
    df.groupBy("name")
    .agg(count("business_stars").alias("business_stars_total"))
    .orderBy(col("business_stars_total").desc())
    .limit(10),
    "store/top10_business_review.csv",
)

# Top 10 cities by number of rows with a non-null business_stars.
top10_city_review = _export_csv(
    df.groupBy("city")
    .agg(count("business_stars").alias("business_stars_total"))
    .orderBy(col("business_stars_total").desc())
    .limit(10),
    "store/top10_city_review.csv",
)

# Distribution of review star values, highest first.
review_stars_dist = _export_csv(
    df.groupBy("review_stars")
    .agg(count("review_stars").alias("review_stars_total"))
    .orderBy(col("review_stars").desc()),
    "store/review_stars_dist.csv",
)

# Distribution of business star values, highest first.
business_stars_dist = _export_csv(
    df.groupBy("business_stars")
    .agg(count("business_stars").alias("business_stars_total"))
    .orderBy(col("business_stars").desc()),
    "store/business_stars_dist.csv",
)

# Top 10 cities by row count.
# NOTE(review): groupBy("city").count() counts rows (presumably reviews), not
# distinct businesses, despite the "business_count" name.
top10_city_business_count = _export_csv(
    df.groupBy('city').count().sort(col("count").desc()).limit(10),
    "store/top10_city_business_count.csv",
)

# The 10 (name, review_count, city) groups with the largest review_count,
# each with the mean review_stars of its rows.
top10_most_reviewed_business = _export_csv(
    df.groupBy(col('name'), col('review_count'), col('city'))
    .agg(avg("review_stars").alias("review_stars_average"))
    .orderBy(col("review_count").desc())
    .limit(10),
    "store/top10_most_reviewed_business.csv",
)

# Sentiment label distribution over the modelling dataset.
sentiment_dist = _export_csv(
    data.groupBy("sentiment")
    .agg(count("sentiment").alias("sentiment_total"))
    .orderBy(col("sentiment").desc()),
    "store/sentiment_dist.csv",
)

In [13]:
# All results have been written under store/; stop the Spark session.
spark.stop()