### Import data

In [1]:
# Load the autotime extension, which prints each cell's wall-clock duration after it runs (the "time: ..." lines below).
%load_ext autotime

time: 0 ns (started: 2021-04-22 13:10:23 -04:00)


In [2]:
# Start a Spark session configured for Spark NLP; `spark` is used by every later cell.
import sparknlp

spark = sparknlp.start()

time: 10.5 s (started: 2021-04-22 13:10:23 -04:00)


In [3]:
# Load the raw Yelp reviews and keep only the columns used downstream,
# then display the row count.
reviews = (
    spark.read.json('yelp_academic_dataset_review.json')
    .select(['business_id', 'text', 'stars'])
)
reviews.count()

8021122

time: 37.8 s (started: 2021-04-22 13:10:34 -04:00)


The review dataset is 5.89 GB with ~8,000,000 rows; the project requires a dataset of at least 500 MB. By the calculation below, a dataset ~12x smaller still satisfies the 500 MB requirement. That is about 680,000 rows, assuming file size scales linearly with row count.

In [4]:
# How many times smaller than the full file must the data be to reach ~500 MB?
# Solve FULL_SIZE_GB / x = TARGET_SIZE_GB for x, then scale the row count by it,
# assuming file size grows linearly with the number of rows.
FULL_SIZE_GB = 5.89
TARGET_SIZE_GB = 0.5
TOTAL_ROWS = 8021122

x = FULL_SIZE_GB / TARGET_SIZE_GB
print(x)
print(int(TOTAL_ROWS / x))

11.78
680910
time: 0 ns (started: 2021-04-22 13:11:12 -04:00)


Convert the stars column to a string so that ratings are treated as categorical labels (indexed below). The logistic regression then models one- vs. five-star reviews as a classification problem rather than a regression.

In [5]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType

# Recast the numeric rating as a string label (e.g. 5.0 -> '5.0') so it can be
# indexed as a categorical class later rather than used as a numeric target.
stars_as_label = col('stars').cast(StringType())
reviews = reviews.withColumn('stars', stars_as_label)

time: 47 ms (started: 2021-04-22 13:11:12 -04:00)


### Subset data for Databricks

In [6]:
# Disabled: one-off 10% random split of the reviews, used to produce a smaller dataset for Databricks.
# x = (1/10)
# subset_df, large_df = reviews.randomSplit([x, 1 - x])

time: 0 ns (started: 2021-04-22 13:11:12 -04:00)


In [7]:
# Disabled: writes the 10% subset as JSON; coalesce(1) collapses it to one partition so a single part file is produced.
# subset_df.coalesce(1).write.format('json').save('reviews_1-10.json')

time: 0 ns (started: 2021-04-22 13:11:12 -04:00)


### Back to Jupyter
Join the businesses and reviews datasets, keep only restaurant reviews, and filter to one- or five-star reviews. Five-star reviews are down-sampled so both classes are the same size. This leaves ~1,250,000 rows.

In [8]:
# Business metadata: only the join key and the category string are needed.
businesses = (
    spark.read.json('yelp_academic_dataset_business.json')
    .select(['business_id', 'categories'])
)

time: 1.61 s (started: 2021-04-22 13:11:12 -04:00)


In [9]:
# Keep businesses whose category string mentions 'Restaurants' (substring match),
# attach them to their reviews, and retain only the text and rating.
restaurants = businesses.filter(businesses.categories.contains('Restaurants'))
restaurant_reviews = (
    reviews
    .join(restaurants, "business_id", "inner")
    .select(['text', 'stars'])
)
restaurant_reviews.count()

5055992

time: 8.88 s (started: 2021-04-22 13:11:13 -04:00)


In [10]:
from pyspark.sql.functions import rand

# Stars were cast to strings ('1.0' / '5.0') earlier, so compare against string
# labels instead of relying on an implicit string -> double cast.
five_stars = restaurant_reviews.filter(restaurant_reviews.stars == '5.0')
one_stars = restaurant_reviews.filter(restaurant_reviews.stars == '1.0')
num_one_stars = one_stars.count()

# Balance the classes by down-sampling five-star reviews to the one-star count.
# limit() on its own returns whichever rows come first in partition order (an
# arbitrary, biased subset), so shuffle with a seeded rand() first. The result is
# reproducible for the same input partitioning.
five_stars = five_stars.orderBy(rand(seed = 42)).limit(num_one_stars)
one_or_five_stars = five_stars.union(one_stars)

# Total rows in the balanced set (assumes at least num_one_stars five-star reviews exist).
num_one_stars * 2

1256088

time: 9.62 s (started: 2021-04-22 13:11:22 -04:00)


Index the "1.0" and "5.0" star labels as 0 and 1 for classification.

In [11]:
from pyspark.ml.feature import StringIndexer

# Map the string star labels to a numeric 0/1 label column for classification.
# stringOrderType='alphabetAsc' pins the mapping to '1.0' -> 0.0 and '5.0' -> 1.0.
# The default ('frequencyDesc') would make the mapping depend on which class is more
# frequent, and the two classes were deliberately balanced to equal size above.
indexer = StringIndexer(inputCol = 'stars', outputCol = 'categoryIndex',
                        stringOrderType = 'alphabetAsc')
indexed = indexer.fit(one_or_five_stars).transform(one_or_five_stars)

time: 18.9 s (started: 2021-04-22 13:11:32 -04:00)


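By the calculation below, this one- or five-star dataset would need to be cut roughly in half (keeping ~54% of its rows) to stay near the 500 MB requirement. Note that the split that follows keeps only 25% of the rows.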

In [12]:
# Fraction of the balanced 1/5-star rows that corresponds to ~500 MB, using the
# earlier estimate that ~680,910 rows is ~500 MB: solve 1256088 * x = 680910.
rows_for_500mb = 680910
balanced_rows = 1256088
print(rows_for_500mb / balanced_rows)

0.5420878155033724
time: 0 ns (started: 2021-04-22 13:11:51 -04:00)


In [13]:
# Keep a random fraction x of the balanced data for modeling; large_df is the unused remainder.
# NOTE(review): the estimate above says ~54% of rows is needed for ~500 MB, but 25% is
# kept here (~314k rows vs. the ~680k-row estimate). Confirm this is intended.
x = 0.25
# Fixed seed so the subset (and everything downstream) is reproducible across runs.
subset_df, large_df = indexed.randomSplit([x, 1 - x], seed = 42)
subset_count = subset_df.count()
print(subset_count)

314343
time: 28.7 s (started: 2021-04-22 13:11:51 -04:00)


An example review from the final subsetted dataset.

In [14]:
# First row of the modeling subset, as a one-element list of Rows.
subset_df.head(1)

[Row(text="!!!!!\n\nWe found ourselves at Foxleys because we were visiting friends in town and they mentioned they had been wanting to try it. I'm so glad they waited!\n\nEverything we ordered was *excellent* (though some more than others - get the kale salad and the side ribs for sure), the wine selection was so so - we chose a moderately priced bottle of red which was fine (but apparently forgettable), and the service was friendly, but the waitress/bartender was seemingly (and understandably) overwhelmed.\n\nWe are trying to find a way to visit again so we can repeat the night and try the rest of the menu :)", stars='5.0', categoryIndex=1.0)]

time: 16.7 s (started: 2021-04-22 13:12:20 -04:00)


### Spark NLP

In [15]:
# Build the Spark NLP preprocessing stages:
# text -> document -> tokens -> normalized tokens -> lemmas
#      -> stopword-free unigrams -> 1- and 2-grams -> plain string arrays.
from nltk.corpus import stopwords
from sparknlp.annotator import (
    LemmatizerModel,
    NGramGenerator,
    Normalizer,
    StopWordsCleaner,
    Tokenizer,
)
from sparknlp.base import DocumentAssembler, Finisher

# Wrap the raw 'text' column in a Spark NLP document annotation.
document_assembler = (
    DocumentAssembler()
    .setInputCol('text')
    .setOutputCol('document')
)

# Tokenize; '(' and ')' are separated from token boundaries and '-' splits words.
tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('tokenized')
    .setContextChars(['(', ')'])
    .setSplitChars(['-'])
)

# Lowercase and strip every character that is not an ASCII letter.
normalizer = (
    Normalizer()
    .setInputCols(['tokenized'])
    .setOutputCol('normalized')
    .setLowercase(True)
    .setCleanupPatterns(['[^A-Za-z]'])
)

# Default pretrained lemmatizer (downloads lemma_antbnc on first use).
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['normalized'])
    .setOutputCol('lemmatized')
)

# Remove NLTK's English stopwords from the lemmas.
nltk_stopwords = stopwords.words('english')
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['lemmatized'])
    .setOutputCol('unigrams')
    .setStopWords(nltk_stopwords)
)

# Cumulative n-grams up to n=2 (unigrams + bigrams), bigram parts joined with '_'.
ngrammer = (
    NGramGenerator()
    .setInputCols(['unigrams'])
    .setOutputCol('ngrams')
    .setN(2)
    .setEnableCumulative(True)
    .setDelimiter('_')
)

# Convert the annotation columns to plain arrays (finished_unigrams, finished_ngrams).
finisher = Finisher().setInputCols(['unigrams', 'ngrams'])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
time: 11.1 s (started: 2021-04-22 13:12:36 -04:00)


In [16]:
from pyspark.ml import Pipeline

# Chain the preprocessing stages in execution order.
preprocessing_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    ngrammer,
    finisher,
]
pipeline = Pipeline(stages = preprocessing_stages)

time: 0 ns (started: 2021-04-22 13:12:47 -04:00)


In [17]:
# Fit the preprocessing pipeline on the modeling subset, then apply it to the same rows.
pipeline_model = pipeline.fit(subset_df)
processed_reviews = pipeline_model.transform(subset_df)

time: 500 ms (started: 2021-04-22 13:12:47 -04:00)


In [18]:
# Inspect the first processed review (original columns plus finished_* token arrays).
processed_reviews.head(1)

[Row(text="!!!!!\n\nWe found ourselves at Foxleys because we were visiting friends in town and they mentioned they had been wanting to try it. I'm so glad they waited!\n\nEverything we ordered was *excellent* (though some more than others - get the kale salad and the side ribs for sure), the wine selection was so so - we chose a moderately priced bottle of red which was fine (but apparently forgettable), and the service was friendly, but the waitress/bartender was seemingly (and understandably) overwhelmed.\n\nWe are trying to find a way to visit again so we can repeat the night and try the rest of the menu :)", stars='5.0', categoryIndex=1.0, finished_unigrams=['find', 'foxleys', 'visit', 'friend', 'town', 'mention', 'want', 'try', 'im', 'glad', 'wait', 'everything', 'order', 'excellent', 'though', 'get', 'kale', 'salad', 'side', 'rib', 'sure', 'wine', 'selection', 'choose', 'moderately', 'price', 'bottle', 'red', 'fine', 'apparently', 'forgettable', 'service', 'friendly', 'waitressba

time: 17.8 s (started: 2021-04-22 13:12:48 -04:00)


In [19]:
# 80/20 train/test split. The fixed seed makes the split reproducible across runs
# (for the same input data and partitioning).
(trainingData, testData) = processed_reviews.randomSplit([0.8, 0.2], seed = 42)

time: 16 ms (started: 2021-04-22 13:13:06 -04:00)


In [20]:
# Approximate training-set size: 80% of the subset count. randomSplit weights are
# not exact, so this is an estimate rather than trainingData.count().
train_fraction = 0.8
trainingData_count = subset_count * train_fraction
print(trainingData_count)

251474.40000000002
time: 0 ns (started: 2021-04-22 13:13:06 -04:00)


In [21]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

# Term counts over the unigram+bigram arrays. minDF/maxDF are document fractions:
# a term must appear in at least 1% and at most 10% of training documents.
tfizer = CountVectorizer(
    inputCol = 'finished_ngrams',
    outputCol = 'tf_features',
    minDF = 0.01,
    maxDF = 0.1,
    vocabSize = int(trainingData_count / 2),
)
idfizer = IDF(inputCol = 'tf_features', outputCol = 'tfidf_features')

# Both the vocabulary and the IDF weights are learned on the training split only,
# then applied unchanged to the test split.
tf_model = tfizer.fit(trainingData)
tf_result_training, tf_result_test = (
    tf_model.transform(split) for split in (trainingData, testData)
)

idf_model = idfizer.fit(tf_result_training)
tfidf_result_training, tfidf_result_test = (
    idf_model.transform(split) for split in (tf_result_training, tf_result_test)
)

time: 22min 24s (started: 2021-04-22 13:13:06 -04:00)


In [None]:
# Vocabulary length = number of TF / TF-IDF feature columns.
vocab_size = len(tf_model.vocabulary)
print(vocab_size)

In [None]:
# Learned vocabulary; the term at position i corresponds to feature column i.
learned_vocab = tf_model.vocabulary
learned_vocab

### Exploratory topic modeling

In [None]:
from pyspark.ml.clustering import LDA

# Exploratory LDA topic model fitted on the training-split features.
# NOTE(review): Spark's LDA is designed for term-count vectors ('tf_features');
# TF-IDF weights are kept here as in the original analysis — consider switching.
num_topics = 10
max_iter = 10
lda = LDA(k = num_topics,
          maxIter = max_iter,
          featuresCol = 'tfidf_features',
          seed = 42)  # fixed seed for reproducible topics
lda_model = lda.fit(tfidf_result_training)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

vocab = tf_model.vocabulary

def get_words(token_list):
    """Translate CountVectorizer feature indices into vocabulary terms.

    token_list: iterable of integer indices into `vocab` (e.g. an LDA topic's termIndices).
    Returns the list of corresponding term strings, in the same order.
    """
    return [vocab[token_id] for token_id in token_list]

# Spark UDF wrapper: array-of-indices column -> array<string> column.
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [None]:
from pyspark.sql import functions as F

# Top terms per LDA topic, with term indices translated to words via udf_to_words.
num_top_words = 10
topics = (
    lda_model
    .describeTopics(num_top_words)
    .withColumn('topicWords', udf_to_words(F.col('termIndices')))
)
topics.select('topic', 'topicWords').show(truncate = 100)

### Logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression

# Binomial logistic regression with pure L2 (ridge) regularization:
# elasticNetParam=0 selects L2 only; regParam sets its strength.
lr = LogisticRegression(
    featuresCol = 'tfidf_features',
    labelCol = 'categoryIndex',
    family = 'binomial',
    elasticNetParam = 0,
    regParam = 0.1,
)

In [None]:
# Every logistic-regression parameter (defaults and explicit settings), keyed by name.
{param.name: value for param, value in lr.extractParamMap().items()}

In [None]:
# Train the logistic regression on the TF-IDF training features.
lrModel = lr.fit(dataset = tfidf_result_training)

In [None]:
# Score both splits with the fitted model (adds a 'prediction' column, among others).
lrPredictions_training, lrPredictions_test = (
    lrModel.transform(split) for split in (tfidf_result_training, tfidf_result_test)
)

In [None]:
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

# Shared evaluator; the metric is chosen per call via a metricName override.
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'categoryIndex',
    predictionCol = 'prediction',
)

In [None]:
# Accuracy on the training and test predictions. Other metrics (f1,
# weightedPrecision, weightedRecall) can be requested the same way via metricName.
accuracy_override = {evaluator.metricName: "accuracy"}
acc_training_lr = evaluator.evaluate(lrPredictions_training, accuracy_override)
acc_test_lr = evaluator.evaluate(lrPredictions_test, accuracy_override)

In [None]:
# Compare training vs. test accuracy; a large gap would suggest overfitting.
print(f'Training accuracy: {acc_training_lr}')
print(f'Test accuracy: {acc_test_lr}')

In [None]:
# Fitted weights as a nested Python list (one row per coefficient-matrix row,
# one column per vocabulary term — presumably a single row for this binomial model).
coef_matrix = lrModel.coefficientMatrix
coef_array = coef_matrix.toArray()
coef_list = coef_array.tolist()

In [None]:
import pandas as pd

# One row per vocabulary index, sorted from most negative to most positive weight.
coef_frame = pd.DataFrame(coef_list).transpose()
coef_frame.sort_values(by = 0)

In [None]:
# 20 terms with the most negative weights. With '5.0' stars indexed as 1 (see the
# sample row earlier), these push predictions toward the 1-star class.
coef_df = pd.DataFrame(coef_list).T.sort_values(0, ascending = True)
vocabulary = tf_model.vocabulary
for rank in range(20):
    feature_idx = coef_df.index[rank]
    print(vocabulary[feature_idx])

In [None]:
# 20 terms with the most positive weights. With '5.0' stars indexed as 1 (see the
# sample row earlier), these push predictions toward the 5-star class.
coef_df = pd.DataFrame(coef_list).T.sort_values(0, ascending = False)
vocabulary = tf_model.vocabulary
for rank in range(20):
    feature_idx = coef_df.index[rank]
    print(vocabulary[feature_idx])

### Random forest

In [None]:
# Disabled: random-forest alternative to the logistic regression, using the same TF-IDF features and 0/1 label column.
# from pyspark.ml.classification import RandomForestClassifier
# rf = RandomForestClassifier(featuresCol = 'tfidf_features', labelCol = 'categoryIndex')

In [None]:
# Disabled: list every random-forest parameter by name.
# {param[0].name: param[1] for param in rf.extractParamMap().items()}

In [None]:
# Disabled: fit the random forest on the TF-IDF training features.
# rfModel = rf.fit(tfidf_result_training)

In [None]:
# Disabled: score both splits with the random forest.
# rfPredictions_training = rfModel.transform(tfidf_result_training)
# rfPredictions_test = rfModel.transform(tfidf_result_test)

In [None]:
# Disabled: random-forest accuracy on both splits, reusing the shared evaluator.
# acc_training_rf = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "accuracy"})
# acc_test_rf = evaluator.evaluate(rfPredictions_test, {evaluator.metricName: "accuracy"})
# # f1 = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "f1"})
# # weightedPrecision = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "weightedPrecision"})
# # weightedRecall = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "weightedRecall"})

In [None]:
# Disabled: print random-forest training vs. test accuracy.
# print('Training accuracy: ' + str(acc_training_rf))
# print('Test accuracy: ' + str(acc_test_rf))

In [None]:
# Disabled: random-forest feature importances (one value per vocabulary index) as a Python list.
# coef_matrix = rfModel.featureImportances
# coef_list = coef_matrix.toArray().tolist()

In [None]:
# Disabled: print the 40 vocabulary terms with the highest random-forest importance.
# import pandas as pd
# coef_df = pd.DataFrame(coef_list).sort_values(0, ascending = False)
# for i in range(0, 40):
#     print(tf_model.vocabulary[coef_df.index[i]])