
Amazon cellphone reviews data from https://www.kaggle.com/grikomsn/amazon-cell-phones-reviews

My goal is to use PySpark and machine learning to determine if it's possible to predict the ratings based on review contents.

In [6]:
!unzip amazon-cell-phones-reviews.zip

Archive:  amazon-cell-phones-reviews.zip
  inflating: 20191226-items.csv      
  inflating: 20191226-reviews.csv    


Use Java 8 and install pyspark and spark-nlp in Colab, following https://github.com/JohnSnowLabs/spark-nlp#google-colab-notebook

In [1]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.4
# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.4.1

openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
Collecting pyspark==2.4.4
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
     |████████████████████████████████| 215.7MB 61kB/s
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
     |████████████████████████████████| 204kB 38.8MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130388 sha256=af33dc8a7c126826138718af55eff3673a253f2942c37cbbdb594c24814e5b3f
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d18423005

In [2]:
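import sparknlp
spark = sparknlp.start()

print("Spark NLP version")
sparknlp.version()  # not echoed: a notebook only displays the last bare expression
print("Apache Spark version")
spark.version       # last expression in the cell, so its value is displayed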

Spark NLP version
Apache Spark version


'2.4.4'

In [0]:
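from pyspark.sql import SparkSession
# sparknlp.start() already created a session, so getOrCreate() returns that one;
# the master and appName given here only matter if no session exists yet.
spark = SparkSession.builder.master('local').appName('phone_reviews').getOrCreate()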

In [0]:
df = spark.read.csv('20191226-reviews.csv', header = True, inferSchema = True)

In [8]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- name: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- helpfulVotes: string (nullable = true)



In [9]:
df.describe().show()

+-------+----------+--------------------+------------------+-----------------+------------------+--------------------+--------------------+
|summary|      asin|                name|            rating|             date|             title|                body|        helpfulVotes|
+-------+----------+--------------------+------------------+-----------------+------------------+--------------------+--------------------+
|  count|     67986|               67986|             67986|            67986|             67986|               67974|               27826|
|   mean|      null| 9.817021480769231E8|3.8079163357161767|             null|252.29166666666666|                 9.0|   7.538604467286025|
| stddev|      null|2.5660842391329656E9|1.5829057573283758|             null| 612.3482542391192|                 NaN|  29.950618310021497|
|    min|B0000SX2UC|  """I am"" Bradley"|                 1|    April 1, 2011|                 !| 4000mAh and Dual...| "" Hand Candy""U...|
|    max|B0825BB7SG|

The describe() output shows that some review bodies are missing: 'body' has 67974 entries rather than 67986 like most of the other columns.
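
The `min` row above also hints that some records may be mis-parsed: the smallest `name` value still carries its raw doubled quotes, and `helpfulVotes` was inferred as a string. One possible cause, which is an assumption rather than something verified in this notebook, is review text that contains line breaks or doubled quotes inside a quoted field. The sketch below uses Python's built-in `csv` module on a made-up two-record sample to show how a line-by-line reading splits such a record.

```python
import csv
import io

# Hypothetical sample: the second body contains a newline and doubled quotes.
raw = (
    'rating,body\n'
    '5,"Great phone"\n'
    '1,"Broke after a week.\nSeller said ""no refunds""."\n'
)

# Naive approach: treat every physical line as one record.
naive_rows = [line.split(',') for line in raw.splitlines()[1:]]

# Quote-aware approach: the quoted newline stays inside a single field.
parsed_rows = list(csv.reader(io.StringIO(raw)))[1:]

print(len(naive_rows))    # number of physical lines after the header
print(len(parsed_rows))   # number of logical records
print(parsed_rows[1][1])  # body with the doubled quotes unescaped
```

Spark's CSV reader exposes `multiLine` and `escape` options for this situation; whether they change the counts for this dataset has not been checked here.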

In [0]:
new_df = df.select('rating', 'body')

We can filter out rows where 'body' is null.

In [0]:
new_df = new_df.filter(new_df.body.isNotNull())

Now we can process the text in the body for machine learning.

In [12]:
new_df.columns

['rating', 'body']

Use the Spark NLP library to process the text.

In [0]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from pyspark.ml import Pipeline

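# Wrap the raw 'body' text in Spark NLP's document annotation.
document_assembler = DocumentAssembler() \
    .setInputCol('body') \
    .setOutputCol('document')

# Split each document into sentences.
sentence_detector = SentenceDetector() \
    .setInputCols(['document']) \
    .setOutputCol('sentence')

# Split each sentence into tokens.
tokenizer = Tokenizer() \
    .setInputCols(['sentence']) \
    .setOutputCol('token')

# Reduce tokens to their stems (e.g. 'owned' -> 'own').
stemmer = Stemmer() \
    .setInputCols(['token']) \
    .setOutputCol('stem_token')

# Clean the stemmed tokens (punctuation is dropped, so "I've" ends up as 'iv').
normalizer = Normalizer() \
    .setInputCols(['stem_token']) \
    .setOutputCol('normalized')

# Convert the annotations into a plain array-of-strings column, 'ntokens'.
finisher = Finisher() \
    .setInputCols(['normalized']) \
    .setOutputCols(['ntokens']) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(True)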


In [0]:
nlp_pipeline = Pipeline(stages = [document_assembler, 
                                  sentence_detector, 
                                  tokenizer, 
                                  stemmer, 
                                  normalizer, 
                                  finisher])

In [0]:
processed_df = nlp_pipeline.fit(new_df).transform(new_df)

In [16]:
processed_df.show()

+------+--------------------+--------------------+
|rating|                body|             ntokens|
+------+--------------------+--------------------+
|     3|I had the Samsung...|[i, had, the, sam...|
|     1|"Due to a softwar...|[due, to, a, soft...|
|     5|This is a great, ...|[thi, i, a, great...|
|     3|I love the phone ...|[i, love, the, ph...|
|     4|The phone has bee...|[the, phone, ha, ...|
|     4|Hello, I have thi...|[hello, i, have, ...|
|     5|Cool. Cheap. Colo...|[cool, cheap, col...|
|     4|The 3599i is over...|[the, i, i, overa...|
|     5|I've never owned ...|[iv, never, own, ...|
|     3|ok well im in sch...|[ok, well, im, in...|
|     4|I've had this pho...|[iv, had, thi, ph...|
|     1|1.) Slow - If you...|[slow, if, you, w...|
|     2|I bought this pho...|[i, bought, thi, ...|
|     4|This is an excell...|[thi, i, an, exce...|
|     1|DON'T BUY OUT OF ...|[dont, bui, out, ...|
|     4|I have been with ...|[i, have, been, w...|
|     5|I just got it and...|[i

Additional text processing using the Spark ML feature library.

In [0]:
# Tokenizer is not used here; importing it from pyspark.ml.feature would also shadow Spark NLP's Tokenizer.
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

stopwords = StopWordsRemover.loadDefaultStopWords('english')
sw_remover = StopWordsRemover(inputCol = 'ntokens', outputCol = 'clean_tokens', stopWords = stopwords)
cv = CountVectorizer(inputCol = 'clean_tokens', outputCol = 'TF', vocabSize = 500)
idf = IDF(inputCol = 'TF', outputCol = 'IDF')

In [0]:
nlp_pipeline2 = Pipeline(stages = [sw_remover, cv, idf])
processed_df2 = nlp_pipeline2.fit(processed_df).transform(processed_df)
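
For intuition about what the `TF` and `IDF` columns hold, here is a plain-Python sketch on a made-up three-document corpus. It assumes the smoothed formula documented for Spark's `IDF`, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t. The helper names are hypothetical, and tie-breaking among equally frequent terms may differ from `CountVectorizer`.

```python
import math
from collections import Counter

# Made-up, already-stemmed documents (not taken from the dataset).
docs = [
    ["great", "phone", "great", "batteri"],
    ["bad", "batteri"],
    ["great", "screen"],
]

def fit_vocab(docs, vocab_size):
    """Keep the vocab_size most frequent terms across the corpus."""
    counts = Counter(tok for doc in docs for tok in doc)
    return [tok for tok, _ in counts.most_common(vocab_size)]

def tf_vector(doc, vocab):
    """Raw term counts for one document, in vocabulary order."""
    counts = Counter(doc)
    return [counts[term] for term in vocab]

def idf_weights(tf_vectors):
    """Smoothed IDF per term: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    weights = []
    for j in range(len(tf_vectors[0])):
        df = sum(1 for vec in tf_vectors if vec[j] > 0)
        weights.append(math.log((m + 1) / (df + 1)))
    return weights

vocab = fit_vocab(docs, vocab_size=3)
tf = [tf_vector(doc, vocab) for doc in docs]
idf = idf_weights(tf)
# TF-IDF: each count scaled by the weight of its term.
tfidf = [[count * w for count, w in zip(vec, idf)] for vec in tf]

print(vocab)
print(tf)
print(tfidf[0])
```

Note that the column named `IDF` in this notebook holds the term counts scaled by the IDF weights (TF-IDF), not the weights themselves.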

In [19]:
processed_df2.show()

+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|rating|                body|             ntokens|        clean_tokens|                  TF|                 IDF|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     3|I had the Samsung...|[i, had, the, sam...|[samsung, awhil, ...|(500,[0,1,2,3,4,5...|(500,[0,1,2,3,4,5...|
|     1|"Due to a softwar...|[due, to, a, soft...|[due, softwar, is...|(500,[0,1,3,13,17...|(500,[0,1,3,13,17...|
|     5|This is a great, ...|[thi, i, a, great...|[thi, great, reli...|(500,[0,1,5,8,10,...|(500,[0,1,5,8,10,...|
|     3|I love the phone ...|[i, love, the, ph...|[love, phone, bec...|(500,[0,14,17,19,...|(500,[0,14,17,19,...|
|     4|The phone has bee...|[the, phone, ha, ...|[phone, ha, great...|(500,[0,2,5,11,12...|(500,[0,2,5,11,12...|
|     4|Hello, I have thi...|[hello, i, have, ...|[hello, thi, phon...|(500,[0,1,2,4,5,6
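
The `clean_tokens` column above still contains `thi`, the stem of "this". Because stemming ran before stop-word removal, stemmed forms no longer match the English stop-word list. A small plain-Python illustration, using a tiny hypothetical stop-word subset rather than Spark's full list, with the stems hard-coded to match the `ntokens` output:

```python
# Tiny hypothetical subset of an English stop-word list.
stop_words = {"this", "is", "a", "i", "the"}

raw_tokens = ["this", "is", "a", "great", "phone"]
# Stems as they appear in the ntokens column ('this' -> 'thi', 'is' -> 'i').
stemmed_tokens = ["thi", "i", "a", "great", "phone"]

def remove_stop_words(tokens, stop_words):
    """Drop every token that appears in the stop-word set."""
    return [tok for tok in tokens if tok not in stop_words]

filtered_before_stemming = remove_stop_words(raw_tokens, stop_words)
filtered_after_stemming = remove_stop_words(stemmed_tokens, stop_words)

print(filtered_before_stemming)
print(filtered_after_stemming)
```

Removing stop words before stemming, or adding the stemmed forms to `stopWords`, would be two possible ways to keep such tokens out of the vocabulary.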

In [0]:
final_df = processed_df2.select(['rating', 'IDF'])

In [0]:
train_df, test_df = final_df.randomSplit(weights = [0.7, 0.3], seed = 1)
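
`randomSplit` assigns rows by random draw, so the 70/30 weights are targets rather than exact sizes, and the seed makes the split repeatable. A rough plain-Python analogue follows; the function `random_split` is hypothetical and is not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using a seeded uniform draw."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative, normalised boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point drift
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            if draw < bound:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=1)
print(len(train), len(test))  # roughly 700 / 300; exact sizes depend on the seed
```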

Using a simple logistic regression model for the first prediction to see how well it can do.

In [0]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'IDF', labelCol = 'rating', maxIter = 10)
lrModel = lr.fit(train_df)

In [0]:
trainingSummary = lrModel.summary

In [25]:
trainingSummary.accuracy

0.6805068921385561

In [0]:
predictions = lrModel.transform(test_df)

In [27]:
predictions.columns

['rating', 'IDF', 'rawPrediction', 'probability', 'prediction']
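
The three columns added by the model are related: for multinomial logistic regression, `probability` is the softmax of the per-class scores in `rawPrediction`, and `prediction` is the index of the largest probability. A minimal sketch with made-up scores; the six values stand for label indices 0 to 5 (ratings run from 1 to 5, so index 0 would go unused), and that sizing is an assumption about how Spark treats these labels.

```python
import math

def softmax(scores):
    """Convert raw class scores into probabilities that sum to 1."""
    shift = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - shift) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical raw scores for label indices 0..5.
raw_prediction = [-5.0, 1.2, 0.1, 0.3, 0.9, 2.0]
probability = softmax(raw_prediction)
# The predicted label is the index with the highest probability.
prediction = max(range(len(probability)), key=probability.__getitem__)

print(probability)
print(prediction)
```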

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'rating', predictionCol = 'prediction', metricName = 'accuracy')

In [29]:
evaluator.evaluate(predictions)

0.6690955639801093

The accuracy on the test set is 0.67, close to the training accuracy of 0.68. It's not amazing, but given how simple the model is, 67% accuracy isn't bad. Things that could improve the accuracy score:
1. Word embeddings instead of TF-IDF. Embeddings capture some of the context in which words appear, while TF-IDF only reflects word frequencies.
2. Sentiment analysis, since the rating is likely to depend on the overall sentiment rather than on the specific words used in the review.
3. A more complex machine learning model. In my experience, SVC tends to work well with vectorized text.
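
One way to put an accuracy figure in context is to compare it with a majority-class baseline, i.e. always predicting the most common rating. The sketch below shows the calculation on made-up labels and predictions; the numbers are illustrative only and say nothing about this dataset's actual baseline.

```python
from collections import Counter

def accuracy(labels, preds):
    """Fraction of predictions that match the true labels."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

# Hypothetical ratings and model predictions.
labels = [5, 5, 5, 1, 4, 5, 1, 3, 5, 2]
preds = [5, 5, 4, 1, 5, 5, 1, 5, 5, 1]

# Baseline: always predict the most frequent label.
majority = Counter(labels).most_common(1)[0][0]
baseline_acc = accuracy(labels, [majority] * len(labels))
model_acc = accuracy(labels, preds)

print(majority, baseline_acc, model_acc)
```

In PySpark the class distribution needed for the baseline could be obtained with `train_df.groupBy('rating').count()`.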

---




Trying word2vec embedding instead of TF-IDF.

In [0]:
from pyspark.ml.feature import StopWordsRemover, Word2Vec, VectorAssembler

stopwords = StopWordsRemover.loadDefaultStopWords('english')
sw_remover = StopWordsRemover(inputCol = 'ntokens', outputCol = 'clean_tokens', stopWords = stopwords)

word2vec = Word2Vec(vectorSize = 100, minCount = 2, seed = 1,
                    inputCol = 'clean_tokens', outputCol = 'embedding')
assembler = VectorAssembler(inputCols = ['embedding'], outputCol = 'feature')

In [0]:
nlp_pipeline3 = Pipeline(stages = [sw_remover, word2vec, assembler])
processed_df3 = nlp_pipeline3.fit(processed_df).transform(processed_df)
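
Spark's `Word2Vec` model turns each review into a single vector by averaging the vectors of its tokens, which is why the `embedding` column holds one 100-dimensional vector per row. A minimal sketch with made-up 2-dimensional vectors; dividing by the total token count (including out-of-vocabulary tokens) is my understanding of Spark's behaviour and should be treated as an assumption.

```python
def average_vectors(tokens, word_vectors, size):
    """Average the vectors of known tokens over the full token count."""
    total = [0.0] * size
    if not tokens:
        return total
    for tok in tokens:
        vec = word_vectors.get(tok)
        if vec is not None:  # out-of-vocabulary tokens contribute nothing
            total = [t + v for t, v in zip(total, vec)]
    return [t / len(tokens) for t in total]

# Hypothetical learned vectors for two stems.
word_vectors = {"great": [1.0, 0.0], "phone": [0.0, 1.0]}

doc_vector = average_vectors(["great", "phone", "great", "zzz"], word_vectors, size=2)
print(doc_vector)
```

Averaging discards word order, so two reviews with the same words in a different order get the same vector.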

In [36]:
processed_df3.show()

+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|rating|                body|             ntokens|        clean_tokens|           embedding|             feature|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     3|I had the Samsung...|[i, had, the, sam...|[samsung, awhil, ...|[-0.0635263526240...|[-0.0635263526240...|
|     1|"Due to a softwar...|[due, to, a, soft...|[due, softwar, is...|[-0.0084085425062...|[-0.0084085425062...|
|     5|This is a great, ...|[thi, i, a, great...|[thi, great, reli...|[-0.0545550114236...|[-0.0545550114236...|
|     3|I love the phone ...|[i, love, the, ph...|[love, phone, bec...|[-0.0973284343384...|[-0.0973284343384...|
|     4|The phone has bee...|[the, phone, ha, ...|[phone, ha, great...|[-0.1161695932653...|[-0.1161695932653...|
|     4|Hello, I have thi...|[hello, i, have, ...|[hello, thi, phon...|[-0.0121190463368

In [0]:
final_df2 = processed_df3.select(['rating', 'feature'])

In [0]:
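# The split uses processed_df3 (all columns) rather than final_df2; the model below only reads 'feature' and 'rating'.
train_df2, test_df2 = processed_df3.randomSplit([0.7, 0.3], seed = 1)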

In [0]:
lr = LogisticRegression(featuresCol = 'feature', labelCol = 'rating', maxIter = 10)
lrModel = lr.fit(train_df2)

In [40]:
trainingSummary = lrModel.summary
print(trainingSummary.accuracy)

0.67299582485366


In [0]:
predictions = lrModel.transform(test_df2)
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'rating', predictionCol = 'prediction', metricName = 'accuracy')

In [42]:
evaluator.evaluate(predictions)

0.6690463295751071

Word2vec did not work better than TF-IDF here (test accuracy is about 0.669 in both cases). However, the only input for training the embeddings was the text in this file, so a pre-trained model might work better.