# Text processing and topic modelling of Amazon reviews 
* Automotive category analysis

### Importing libs

In [1]:
import os
import re
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from delta import *
import pyspark.sql.functions as F

from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer
from sparknlp.annotator import Lemmatizer
from sparknlp.annotator import LemmatizerModel
from sparknlp.annotator import Normalizer
from sparknlp.annotator import StopWordsCleaner
from sparknlp.base import Finisher

In [2]:
load_dotenv()

STORAGE_ACCOUNT_NAME = os.getenv('STORAGE_ACCOUNT_NAME')
STORAGE_ACCOUNT_KEY = os.getenv('STORAGE_ACCOUNT_KEY')

builder = SparkSession.builder\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.4,\
io.delta:delta-core_2.12:2.4.0,\
io.delta:delta-storage:2.4.0,\
com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.0")\
        .config(f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net", f"{STORAGE_ACCOUNT_KEY}")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()



In [3]:
root = 'abfss://default@stdatalakeakita.dfs.core.windows.net/synapse/workspaces/syn-synfactoreddatathon01-dev'

df = spark.read\
.format('delta')\
.load(f'{root}/silver/silver_amazon_reviews_automotive')

df.show()

+--------------+----------+--------------------+------+-------+--------------------+--------------------+--------------------+--------------------+--------------+-----------------+------------+------------+--------------------+------------+---------+-------------+
|   reviewer_id|      asin|               title|  rank|overall|flg_positive_overall|flg_negative_overall|             summary|         review_text|         brand|     2nd_category|3rd_category|4th_category|            also_buy|also_buy_qty|also_view|also_view_qty|
+--------------+----------+--------------------+------+-------+--------------------+--------------------+--------------------+--------------------+--------------+-----------------+------------+------------+--------------------+------------+---------+-------------+
|A2TYWZQNOGX2YS|B0001EVUCM|Auto Ventshade 77...|364973|      5|                true|               false|          Vent Shade|Did great for me....|Auto Ventshade|Replacement Parts| Body & Trim|        Body

### Pre-processing review text

#### 1. Removing ponctuation

In [4]:
def clean_text(c):
  c = F.lower(c)
  c = F.regexp_replace(c, "(https?\://)\S+", "") # Remove links
  c = F.regexp_replace(c, "(\\n)|\n|\r|\t", "") # Remove CR, tab, and LR
  c = F.regexp_replace(c, "(?:(?:[0-9]{2}[:\/,]){2}[0-9]{2,4})", "") # Remove dates
  c = F.regexp_replace(c, "@([A-Za-z0-9_]+)", "") # Remove usernames
  c = F.regexp_replace(c, "[0-9]", "") # Remove numbers
  c = F.regexp_replace(c, "\{|\}|\[|\]|\(|\)|\;|\:|\/|\#|\.|\?|\!|\&|\"|\,", "") # Remove symbols
  c = F.regexp_replace(c, ' +', ' ')# Remove multiple whitespaces
  c = F.trim(c)# Remove trailing whitespaces
  return c

df = df.withColumn("review_text_process", clean_text(F.col("review_text")))

#### 2. Document Assembler

In [5]:
# Removing null texts
df_reviews_automotive = (
    df
    .withColumn('review_text_process', F.coalesce(F.col('review_text_process'), F.lit('Null Review')))                    
)

# Step 1: Transforms raw texts to `document` annotation
document_assembler = (
    DocumentAssembler()
    .setInputCol("review_text_process")
    .setOutputCol("document")
)

#### 3. Tokenizer

In [6]:
# Step 3: Tokenization
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")

)

#### 4. Normalizer

In [7]:
normalizer= Normalizer()\
    .setInputCols(["token"])\
    .setOutputCol("normalized")\
    .setLowercase(True)

#### 5. Removing Stop Words

In [8]:
from nltk.corpus import stopwords
eng_stopwords = stopwords.words('english')

stopwordsCleaner =StopWordsCleaner()\
    .setInputCols(["normalized"])\
    .setOutputCol("no_stop_words")\
    .setStopWords(eng_stopwords)

#### 6. Lemmatizer

In [9]:
lemmatizer = LemmatizerModel.pretrained() \
.setInputCols(["no_stop_words"]) \
.setOutputCol("lemma")

lemma_antbnc download started this may take some time.
Approximate size to download 907,6 KB
[OK!]


#### 7. POSTagger

In [10]:
from sparknlp.annotator import PerceptronModel
pos_tagger = PerceptronModel.pretrained('pos_anc') \
     .setInputCols(['document', 'lemma']) \
     .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 3,9 MB
[OK!]


#### 8. Chunk

In [11]:
from sparknlp.annotator import Chunker
allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']
chunker = Chunker() \
     .setInputCols(['document', 'pos']) \
     .setOutputCol('ngrams') \
     .setRegexParsers(allowed_tags)

#### 9. Finisher

In [12]:
finisher = Finisher() \
.setInputCols(['lemma', 'ngrams'])

#### 10. Pipeline

In [13]:
from pyspark.ml import Pipeline
pipeline = Pipeline() \
     .setStages([document_assembler,
                 tokenizer,
                 normalizer,
                 stopwordsCleaner,
                 lemmatizer,
                 pos_tagger,
                 chunker,
                 finisher])

df_reviews_automotive = pipeline.fit(df_reviews_automotive).transform(df_reviews_automotive)

In [14]:
df_reviews_automotive = df_reviews_automotive.withColumn('final',
     F.concat(F.col('finished_lemma'), 
            F.col('finished_ngrams')))

df_reviews_automotive.limit(10).show(truncate=False)

+--------------+----------+---------------------------------------------------------------------------+-----+-------+--------------------+--------------------+-------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### 11. Count Vectorizer

In [15]:
from pyspark.ml.feature import CountVectorizer
tfizer = CountVectorizer(inputCol='finished_lemma',
                         outputCol='tf_features')
tf_model = tfizer.fit(df_reviews_automotive)
df_reviews_automotive_tf = tf_model.transform(df_reviews_automotive)

#### 12. IDF

In [16]:
from pyspark.ml.feature import IDF
idfizer = IDF(inputCol='tf_features', 
              outputCol='tf_idf_features')
idf_model = idfizer.fit(df_reviews_automotive_tf)
df_reviews_automotive_tfidf = idf_model.transform(df_reviews_automotive_tf)

#### 13. LDA

In [18]:
from pyspark.ml.clustering import LDA
lda = LDA(k=6, seed=1, optimizer="online", featuresCol='tf_idf_features')
lda.setMaxIter(50)
lda.clear(lda.maxIter)
model = lda.fit(df_reviews_automotive_tfidf)
model.setSeed(1)

LocalLDAModel: uid=LDA_4cc780654734, k=6, numFeatures=262144

In [25]:
model.describeTopics(5).show()

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|    [0, 1, 6, 10, 2]|[0.01342992135011...|
|    1|   [4, 14, 7, 1, 73]|[0.00852342529036...|
|    2|  [23, 5, 4, 220, 2]|[0.00544607578095...|
|    3|[97, 147, 28, 7, 14]|[0.00992288063464...|
|    4| [65, 3, 132, 9, 22]|[0.01719309367595...|
|    5|    [8, 23, 7, 1, 2]|[0.00622057015752...|
+-----+--------------------+--------------------+



In [29]:
print('TOPIC 0: ' + tf_model.vocabulary[0], tf_model.vocabulary[1], tf_model.vocabulary[6], tf_model.vocabulary[10], tf_model.vocabulary[2])
print('TOPIC 1: ' + tf_model.vocabulary[4], tf_model.vocabulary[14], tf_model.vocabulary[7], tf_model.vocabulary[1], tf_model.vocabulary[73])
print('TOPIC 2: ' + tf_model.vocabulary[23], tf_model.vocabulary[5], tf_model.vocabulary[4], tf_model.vocabulary[220], tf_model.vocabulary[2])
print('TOPIC 3: ' + tf_model.vocabulary[97], tf_model.vocabulary[147], tf_model.vocabulary[28], tf_model.vocabulary[7], tf_model.vocabulary[14])
print('TOPIC 4: ' + tf_model.vocabulary[65], tf_model.vocabulary[3], tf_model.vocabulary[132], tf_model.vocabulary[9], tf_model.vocabulary[22])
print('TOPIC 5: ' + tf_model.vocabulary[8], tf_model.vocabulary[23], tf_model.vocabulary[7], tf_model.vocabulary[1], tf_model.vocabulary[2])

TOPIC 0: great work product install fit
TOPIC 1: use car get work battery
TOPIC 2: light well use bag fit
TOPIC 3: filter key love get car
TOPIC 4: tire good awesome look quality
TOPIC 5: one light get work fit


In [22]:
lda_path = 'models' + "/lda_20230108"
lda.save(lda_path)
sameLDA = LDA.load(lda_path)
distributed_model_path = 'models' + "/lda_distributed_model_20230108"
model.save(distributed_model_path)

In [21]:
import pyspark.sql.types as T

vocab = tf_model.vocabulary
def get_words(token_list):
    return [vocab[token_id] for token_id in token_list]
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

num_top_words = 5
topics = (model
     .describeTopics(num_top_words)
     .withColumn('topicWords', udf_to_words(F.col('termIndices'))))
topics.select('topic', 'topicWords').show(truncate=False)

Py4JJavaError: An error occurred while calling o684.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 252.0 failed 1 times, most recent failure: Lost task 0.0 in stage 252.0 (TID 1165) (DESKTOP-KP1GLEP executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 25 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 25 more
