In [28]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
try:
    sc
    spark
except NameError:
    sc = SparkContext('local')
    spark = SparkSession(sc)


In [29]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
try:
    sc
    spark
except NameError:
    sc = SparkContext('local')
    spark = SparkSession(sc)

from pyspark import SparkFiles
# url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
# spark.sparkContext.addFile(url)
stars = spark.read.csv("yelp_review.csv", sep=",", header=True)
stars = stars.na.drop(subset=["text", "stars"])
stars = stars.sample(False, 0.0001, 0)

# Show DataFrame
stars.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+
|           review_id|             user_id|         business_id|               stars|                date|                text|       useful|               funny|                cool|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+
|I wouldn't come h...| can't speak for ...| most restaurants...|                   2|                   0|                   1|         null|                null|                null|
|FzjInKYfM4K1Q5Hv9...|_ZAaHKOMDtvpWJvSp...|Huo1lJmVkdqvFuLtA...|                   5|          2013-05-13|Very friendly hom...|            1|                   0|                   1|
|0hGNzXujl2rvlFZ36...|VhkjxxBz-A4M-mMnc...|9uj0o9kwe6FQNspkc...|                

In [30]:
# %pyspark
from pyspark.sql.functions import length
# # Create a length column to be used as a future feature 
stars = stars.withColumn('length', length(stars['text']))
# data_df.show()
import pyspark.sql.functions as func
# means = df.groupby("id").agg(func.mean("col1"))
from pyspark.sql.functions import col, expr, when

five_stars = stars.withColumn(
    'class', when((col("stars") == '5'), 'positive')
    .otherwise("negative")
)


### Feature Transformations


In [31]:
# %pyspark
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [32]:
# %pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [33]:
# %pyspark
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [34]:
# five_stars.collect()

In [35]:
# %pyspark
# Fit and transform the pipeline
print(five_stars.count())
cleaner = data_prep_pipeline.fit(five_stars)
#cleaned = cleaner.transform(five_stars)

600


In [36]:
print(type(cleaner))
print(type(five_stars))

cleaned = cleaner.transform(five_stars)

<class 'pyspark.ml.pipeline.PipelineModel'>
<class 'pyspark.sql.dataframe.DataFrame'>


In [37]:
# %pyspark
# Show label and resulting features
# cleaned.select(['label', 'features']).show()
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[236232,2...|
|  1.0|(262145,[9639,158...|
|  0.0|(262145,[37852,58...|
|  1.0|(262145,[14,1846,...|
|  0.0|(262145,[21872,25...|
|  1.0|(262145,[15889,39...|
|  0.0|(262145,[8227,262...|
|  1.0|(262145,[7367,963...|
|  1.0|(262145,[3067,963...|
|  0.0|(262145,[115898,2...|
|  0.0|(262145,[4200,547...|
|  1.0|(262145,[15889,82...|
|  0.0|(262145,[9616,963...|
|  0.0|(262145,[14,1109,...|
|  0.0|(262145,[8968,190...|
|  0.0|(262145,[4200,961...|
|  0.0|(262145,[5377,963...|
|  1.0|(262145,[15889,29...|
|  0.0|(262145,[991,2315...|
|  0.0|(262145,[5497,963...|
+-----+--------------------+
only showing top 20 rows



In [38]:
# %pyspark
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [39]:
# %pyspark
# Tranform the model with the testing data
test_results = predictor.transform(testing)
# print(testing.columns)
# print(test_results.columns)
# test_results.toPandas().to_csv("test_results.csv")
test_results.show(5)

+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+------+--------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|           review_id|             user_id|         business_id|stars|      date|                text|useful|funny|cool|length|   class|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+-----+----------+--------------------+------+-----+----+------+--------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|0hGNzXujl2rvlFZ36...|VhkjxxBz-A4M-mMnc...|9uj0o9kwe6FQNspkc...|    1|2016-05-21|Updating my revi

In [40]:
# %pyspark
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.508880
