In [1]:
import sparknlp
 
spark = sparknlp.start() # for GPU training >> sparknlp.start(gpu = True) # for Spark 2.3 =>> sparknlp.start(spark23 = True)
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
 
print("Spark NLP version", sparknlp.version())
 
print("Apache Spark version:", spark.version)

Spark NLP version 3.3.4
Apache Spark version: 3.1.2


In [2]:
reviews = spark.read.csv('_data/dataset_review.csv',inferSchema=True,header=True)
reviews = reviews.select(['business_id', 'text', 'stars'])
reviews.count()

8635403

In [3]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
reviews = reviews.withColumn('stars', col('stars').cast(StringType()))

In [4]:
five_stars = reviews.filter(reviews.stars == 5.0)
one_stars = reviews.filter(reviews.stars == 1.0)
num_one_stars = one_stars.count()
five_stars = five_stars.limit(num_one_stars)
one_or_five_stars = five_stars.union(one_stars)
print(five_stars.count())
num_one_stars

1262800


1262800

In [5]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol = 'stars', outputCol = 'categoryIndex')
indexed = indexer.fit(one_or_five_stars).transform(one_or_five_stars)

In [6]:
x=0.25
subset_df, large_df = indexed.randomSplit([x, 1 - x])
subset_count = subset_df.count()
print(subset_count)

631353


In [7]:
#subset_df.take(1)

In [8]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/john/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [9]:
# Build NLP preprocessing pipeline
from sparknlp.base import DocumentAssembler
document_assembler = DocumentAssembler() \
.setInputCol('text') \
.setOutputCol('document')
from sparknlp.annotator import Tokenizer
tokenizer = Tokenizer() \
.setInputCols(['document']) \
.setOutputCol('tokenized') \
.setContextChars(['(', ')']) \
.setSplitChars(['-'])
from sparknlp.annotator import Normalizer
normalizer = Normalizer() \
.setInputCols(['tokenized']) \
.setOutputCol('normalized') \
.setLowercase(True) \
.setCleanupPatterns(['[^A-Za-z]'])
from sparknlp.annotator import LemmatizerModel
lemmatizer = LemmatizerModel \
.pretrained() \
.setInputCols(['normalized']) \
.setOutputCol('lemmatized')
from nltk.corpus import stopwords
nltk_stopwords = stopwords.words('english')
from sparknlp.annotator import StopWordsCleaner
stopwords_cleaner = StopWordsCleaner() \
.setInputCols(['lemmatized']) \
.setOutputCol('unigrams') \
.setStopWords(nltk_stopwords)
from sparknlp.annotator import NGramGenerator
ngrammer = NGramGenerator() \
    .setInputCols(['unigrams']) \
    .setOutputCol('ngrams') \
    .setN(2) \
    .setEnableCumulative(True) \
    .setDelimiter('_')
from sparknlp.base import Finisher
finisher = Finisher() \
.setInputCols(['unigrams', 'ngrams'])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [10]:
# Assemble pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline() \
.setStages([document_assembler,
            tokenizer,
            normalizer,
            lemmatizer,
            stopwords_cleaner,
            ngrammer,
            finisher])

In [11]:
processed_reviews = pipeline.fit(subset_df).transform(subset_df)

In [12]:
processed_reviews.take(1)

[Row(business_id='--0DF12EMHYI8XIgoFha6A', text="Being from Chicago originally and moving this year  we were in the market for a good mechanic. A good  honest mechanic. Joe himself answers the phone when we call and is the epitome of southern hospitality. We have had our car in his shop twice since October and his work is always excellent and his prices are honest. He won't try and convince you to pay for bogus parts you don't need or fake repairs. A friend recommended him for his honesty and affordability. After dropping the car off one time  he offered to drive us home so we wouldn't have to walk 2 miles-AND- my husband is somewhat handy with cars and Joe offered to let him borrows the special wrench he would need to install it himself-FOR FREE- and Joe could order the part for him (O2 sensor) and only charge us for that and not labor! Joe even said we could do it at his shop!  We are greatful for Joe and would recommend him to anyone looking for a good mechanic :)", stars='5.0', cat

In [13]:
(trainingData, testData) = processed_reviews.randomSplit([0.8, 0.2])

In [14]:
trainingData_count = subset_count * 0.8
print(trainingData_count)

505082.4


In [15]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

In [16]:
tfizer = CountVectorizer(inputCol = 'finished_ngrams', outputCol = 'tf_features', 
                         minDF = 0.01, maxDF = 0.1, vocabSize = int(trainingData_count / 2))

tf_model = tfizer.fit(trainingData)
tf_result_training = tf_model.transform(trainingData)
tf_result_test = tf_model.transform(testData)

idfizer = IDF(inputCol = 'tf_features', outputCol = 'tfidf_features')

idf_model = idfizer.fit(tf_result_training)
tfidf_result_training = idf_model.transform(tf_result_training)
tfidf_result_test = idf_model.transform(tf_result_test)

In [17]:
tf_model.save("_data/tfModel.model")
idf_model.save("_data/idfModel.model")

In [18]:
print(tfidf_result_training.show(2))
print(tfidf_result_test.show(2))

+--------------------+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+
|         business_id|                text|stars|categoryIndex|   finished_unigrams|     finished_ngrams|         tf_features|      tfidf_features|
+--------------------+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+
|--0DF12EMHYI8XIgo...|Being from Chicag...|  5.0|          1.0|[chicago, origina...|[chicago, origina...|(826,[0,1,8,14,26...|(826,[0,1,8,14,26...|
|--0r8K_AQ4FZfLsX3...|I am really happy...|  5.0|          1.0|[really, happy, b...|[really, happy, b...|(826,[32,84,93,98...|(826,[32,84,93,98...|
+--------------------+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows

None
+--------------------+--------------------+-----+-------------+-------------------

In [20]:
# CountVectorizer並CountVectorizerModel旨在幫助將文本文檔集合轉換為標記計數向量。
# 當先驗字典不可用時，CountVectorizer可用作Estimator提取詞彙表，並生成CountVectorizerModel. 
# 該模型通過詞彙表為文檔生成稀疏表示，然後可以將其傳遞給其他算法，如 LDA。
from pyspark.ml.feature import CountVectorizerModel
tf_model = CountVectorizerModel.load("file:/home/john/_data/tfModel.model")

In [21]:
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("file:/home/john/_data/idfModel.model")

In [22]:
# Print vocablary length (i.e. # of columns)
print(len(tf_model.vocabulary))

826


In [23]:
tf_model.vocabulary

['car',
 'pay',
 'two',
 'drink',
 'another',
 'review',
 'table',
 'still',
 'walk',
 'right',
 'amazing',
 'long',
 'chicken',
 'check',
 'friend',
 'around',
 'manager',
 'definitely',
 'late',
 'taste',
 'every',
 'last',
 'sure',
 'store',
 'delicious',
 'location',
 'since',
 'everything',
 'next',
 'cant',
 'little',
 'night',
 'business',
 'week',
 'guy',
 'visit',
 'clean',
 'sit',
 'care',
 'menu',
 'room',
 'help',
 'bring',
 'lot',
 'customer_service',
 'home',
 'money',
 'start',
 'bar',
 'many',
 'charge',
 'put',
 'end',
 'star',
 'pizza',
 'something',
 'away',
 'keep',
 'wasnt',
 'seem',
 'close',
 'area',
 'fry',
 'move',
 'phone',
 'buy',
 'show',
 'let',
 'company',
 'return',
 'open',
 'meal',
 'go_back',
 'month',
 'rude',
 'owner',
 'come_back',
 'small',
 'server',
 'park',
 'seat',
 'offer',
 'shop',
 'someone',
 'super',
 'anything',
 'line',
 'nothing',
 'fresh',
 'hair',
 'serve',
 'old',
 'sauce',
 'pick',
 'job',
 'pretty',
 'top',
 'decide',
 'happy',
 's

In [24]:
#Exploratory topic modeling
# 潛在狄利克雷分配 (LDA)
# LDA被實現為Estimator同時支持EMLDAOptimizer和OnlineLDAOptimizer，並生成 aLDAModel作為基本模型。如果需要，專家用戶可以將LDAModel生成的EMLDAOptimizer轉換為生成的 DistributedLDAModel
from pyspark.ml.clustering import LDA
num_topics = 10
max_iter = 10
lda = LDA(k = num_topics, 
          maxIter = max_iter, 
          featuresCol = 'tfidf_features')
ldaModel = lda.fit(tfidf_result_training)

In [25]:
from pyspark.sql import types as T
from pyspark.sql import functions as F
vocab = tf_model.vocabulary
def get_words(token_list):
    return [vocab[token_id] for token_id in token_list]
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [26]:
# 取10個字詞
num_top_words = 10
topics = ldaModel \
.describeTopics(num_top_words) \
.withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate = 100)

+-----+-----------------------------------------------------------------------------------+
|topic|                                                                         topicWords|
+-----+-----------------------------------------------------------------------------------+
|    0|       [car, rude, manager, dog, store, employee, customer_service, guy, buy, walk]|
|    1|[pizza, wing, tea, server, great_service, every, drink, cup, every_time, excellent]|
|    2|            [chicken, delicious, fry, taste, sauce, dish, salad, fresh, menu, rice]|
|    3|  [dr, appointment, doctor, cream, ice, care, ice_cream, patient, daughter, office]|
|    4|               [massage, park, bar, beer, house, local, selection, space, fun, lot]|
|    5|             [nail, drink, bartender, bar, burger, game, play, crowd, night, close]|
|    6|              [room, hotel, delivery, stay, deliver, coffee, bed, book, desk, thai]|
|    7|         [hair, cut, store, sandwich, dress, salon, burger, product, colo

In [27]:
# Define logistic regression with ridge
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'tfidf_features', labelCol = 'categoryIndex', 
                        family = 'binomial', elasticNetParam = 0, regParam = 0.1)

In [28]:
# Print all parameters
{param[0].name: param[1] for param in lr.extractParamMap().items()}

{'aggregationDepth': 2,
 'elasticNetParam': 0.0,
 'family': 'binomial',
 'featuresCol': 'tfidf_features',
 'fitIntercept': True,
 'labelCol': 'categoryIndex',
 'maxBlockSizeInMB': 0.0,
 'maxIter': 100,
 'predictionCol': 'prediction',
 'probabilityCol': 'probability',
 'rawPredictionCol': 'rawPrediction',
 'regParam': 0.1,
 'standardization': True,
 'threshold': 0.5,
 'tol': 1e-06}

In [29]:
# Fit LR model
lrModel = lr.fit(tfidf_result_training)

In [31]:
lrModel.save("_data/lrModel.model")

In [37]:
# 邏輯回歸是一種流行的預測分類響應的方法。它是預測結果概率的廣義線性模型的特例。在spark.ml邏輯回歸中，可以使用二項邏輯回歸來預測二元結果
from pyspark.ml.classification import LogisticRegressionModel
lrModel = LogisticRegressionModel.load("file:/home/john/_data/lrModel.model")

In [38]:
lrPredictions_training = lrModel.transform(tfidf_result_training)
lrPredictions_test = lrModel.transform(tfidf_result_test)

In [39]:
# ML 中的一項重要任務是模型選擇，或使用數據為給定任務找到最佳模型或參數。這也稱為調諧。可以針對單個EstimatorsLogisticRegression或Pipeline包括多個算法、特徵化和其他步驟的整個s 進行調整。用戶可以Pipeline一次調整整體，而不是Pipeline單獨調整每個元素
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = 'categoryIndex', predictionCol = 'prediction')

In [40]:
# spark.ml向量和矩陣類型。用於轉換實用工具DataFrame從列spark.mllib.linalg到spark.ml.linalg類型
acc_training_lr = evaluator.evaluate(lrPredictions_training, {evaluator.metricName: "accuracy"})
acc_test_lr = evaluator.evaluate(lrPredictions_test, {evaluator.metricName: "accuracy"})
# f1 = evaluator.evaluate(lrPredictions_training, {evaluator.metricName: "f1"})
# weightedPrecision = evaluator.evaluate(lrPredictions_training, {evaluator.metricName: "weightedPrecision"})
# weightedRecall = evaluator.evaluate(lrPredictions_training, {evaluator.metricName: "weightedRecall"})

In [41]:
print('Training accuracy: ' + str(acc_training_lr))
print('Test accuracy: ' + str(acc_test_lr))

Training accuracy: 0.9321971173731405
Test accuracy: 0.9325505751685839


In [42]:
# 系數矩陣
coef_matrix = lrModel.coefficientMatrix
coef_list = coef_matrix.toArray().tolist()

In [48]:
import pandas as pd
pd.DataFrame(coef_list).T.sort_values(0)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,816,817,818,819,820,821,822,823,824,825
0,0.0,0.115201,3e-06,8e-06,0.000609,0.002514,0.0,0.000546,0.0,0.0,...,0.0,0.0,0.0,0.000272,0.0,0.0,0.0,0.0,0.0,8e-06


In [49]:
coef_df = pd.DataFrame(coef_list).T.sort_values(0, ascending = True)
for i in range(0, 20):
    print(tf_model.vocabulary[coef_df.index[i]])

car


IndexError: index 1 is out of bounds for axis 0 with size 1

In [32]:
# 隨機森林 是決策樹的集合。隨機森林是用於分類和回歸的最成功的機器學習模型之一。它們結合了許多決策樹以降低過度擬合的風險。與決策樹一樣，隨機森林處理分類特徵，擴展到多類分類設置，不需要特徵縮放，並且能夠捕捉非線性和特徵交互。
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'tfidf_features', labelCol = 'categoryIndex')

In [33]:
{param[0].name: param[1] for param in rf.extractParamMap().items()}

{'bootstrap': True,
 'cacheNodeIds': False,
 'checkpointInterval': 10,
 'featureSubsetStrategy': 'auto',
 'featuresCol': 'tfidf_features',
 'impurity': 'gini',
 'labelCol': 'categoryIndex',
 'leafCol': '',
 'maxBins': 32,
 'maxDepth': 5,
 'maxMemoryInMB': 256,
 'minInfoGain': 0.0,
 'minInstancesPerNode': 1,
 'minWeightFractionPerNode': 0.0,
 'numTrees': 20,
 'predictionCol': 'prediction',
 'probabilityCol': 'probability',
 'rawPredictionCol': 'rawPrediction',
 'seed': 6889613275193631008,
 'subsamplingRate': 1.0}

In [34]:
rfModel = rf.fit(tfidf_result_training)

In [35]:
rfPredictions_training = rfModel.transform(tfidf_result_training)
rfPredictions_test = rfModel.transform(tfidf_result_test)

In [44]:
acc_training_rf = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "accuracy"})
acc_test_rf = evaluator.evaluate(rfPredictions_test, {evaluator.metricName: "accuracy"})
# f1 = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "f1"})
# weightedPrecision = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "weightedPrecision"})
# weightedRecall = evaluator.evaluate(rfPredictions_training, {evaluator.metricName: "weightedRecall"})

In [45]:
print('Training accuracy: ' + str(acc_training_rf))
print('Test accuracy: ' + str(acc_test_rf))

Training accuracy: 0.8124372901011868
Test accuracy: 0.8121221737405792


In [46]:
coef_matrix = rfModel.featureImportances
coef_list = coef_matrix.toArray().tolist()

In [47]:
import pandas as pd
coef_df = pd.DataFrame(coef_list).sort_values(0, ascending = False)
for i in range(0, 40):
    print(tf_model.vocabulary[coef_df.index[i]])

pay
horrible
amazing
definitely
manager
awesome
delicious
highly
waste
rude
excellent
money
highly_recommend
terrible
poor
fantastic
someone
charge
perfect
business
customer_service
nothing
fresh
late
happen
speak
bill
refund
receive
awful
person
wonderful
cold
ignore
suppose
dirty
knowledgeable
love_place
company
send
