# Description:
Goal: Amazon Reviews Sentiment Analysis

### Initialization

In [0]:
#environment
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
os.environ["SPARK_CLASSPATH"] = '/content/spark-2.4.5-bin-hadoop2.7'

In [0]:
# Install spark-related depdencies for Python
!pip install -q findspark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 59kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 48.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=73d62cb4a1940c75edd84d08ef6b2171b317078c45e73bb80b7c3b36a0357cb5
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark NLP') \
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.2.2") \
    .getOrCreate()

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
import urllib
import pandas as pd
from pyspark import SparkContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
import pyspark.sql.functions as F

!pip install spark-nlp
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import LightPipeline
from sparknlp.annotator import *
from sparknlp.common import RegexRule
from sparknlp.base import DocumentAssembler, Finisher

Collecting spark-nlp
[?25l  Downloading https://files.pythonhosted.org/packages/0f/0b/ebb2aa778090574a63a14f3da4446bc3424b3728cef0500e288e13f75c17/spark_nlp-2.4.3-py2.py3-none-any.whl (108kB)
[K     |███                             | 10kB 18.6MB/s eta 0:00:01[K     |██████                          | 20kB 5.0MB/s eta 0:00:01[K     |█████████                       | 30kB 7.1MB/s eta 0:00:01[K     |████████████                    | 40kB 9.0MB/s eta 0:00:01[K     |███████████████▏                | 51kB 5.6MB/s eta 0:00:01[K     |██████████████████▏             | 61kB 6.5MB/s eta 0:00:01[K     |█████████████████████▏          | 71kB 7.4MB/s eta 0:00:01[K     |████████████████████████▏       | 81kB 8.2MB/s eta 0:00:01[K     |███████████████████████████▏    | 92kB 9.0MB/s eta 0:00:01[K     |██████████████████████████████▎ | 102kB 7.2MB/s eta 0:00:01[K     |████████████████████████████████| 112kB 7.2MB/s 
[?25hInstalling collected packages: spark-nlp
Successfully instal

In [0]:
###########
#I am using a local copy of the above file, stored on gDrive, 
#instead of re-downloading the source file.
###########
source_url = r"http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Kindle_Store_5.json.gz"
data_folder = r'/content/gdrive/My Drive/thinkful/colab_datasets/amazon_reviews/'
reviews_arx = os.path.join(data_folder, 'reviews_Kindle_Store_5.json.gz')
reviews_raw = os.path.join(data_folder, 'Grocery_and_Gourmet_Food_5.json')
if not os.path.exists(reviews_raw):
    if not os.path.exists(data_folder):
        os.mkdir(data_folder)
    if not os.path.exists(reviews_arx):
        urllib.request.urlretrieve(source_url, filename=reviews_arx)
    import shutil
    import gzip
    with gzip.open(reviews_arx, 'rb') as f_in:
        with open(reviews_raw, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

SPARK_URL = "local[*]"
APP_NAME  = "amazon_food_reviews"

In [0]:
print("Spark NLP version")
sparknlp.version()
print("Apache Spark version")
spark.version

Spark NLP version
2.2.2
Apache Spark version


'2.4.5'

In [0]:
reviews_df = spark.read.options(inferschema = "true").json(reviews_raw)

In [0]:
reviews_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [0]:
reviews_df.show(5)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|           summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+
|B000F83SZQ| [0, 0]|    5.0|I enjoy vintage b...| 05 5, 2014|A1F6404F1VG29J|          Avidreader|Nice vintage story|    1399248000|
|B000F83SZQ| [2, 2]|    4.0|This book is a re...| 01 6, 2014| AN0N05A9LIJEQ|            critters|      Different...|    1388966400|
|B000F83SZQ| [2, 2]|    4.0|This was a fairly...| 04 4, 2014| A795DMNCJILA6|                 dot|             Oldie|    1396569600|
|B000F83SZQ| [1, 1]|    5.0|I'd never read an...|02 19, 2014|A1FV0SX13TWVXQ|Elaine H. Turley ...|I really liked it.|    1392768000|
|B000F83SZQ| [0, 1]|    4.0|If you like perio...|03 19, 2014|A3SPTOKDG7WBLN|

In [0]:
reviews_df.select('overall').describe().show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            982619|
|   mean| 4.347801131466011|
| stddev|0.9550557821749456|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



In [0]:
reviews_df = reviews_df.withColumn('sentiment_label_fr_score',
                                   F.when(reviews_df["overall"] >= 4, 'Positive')
                                   .otherwise('Negative'))

In [0]:
reviews_df.show(10)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+------------------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|sentiment_label_fr_score|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+------------------------+
|B000F83SZQ| [0, 0]|    5.0|I enjoy vintage b...| 05 5, 2014|A1F6404F1VG29J|          Avidreader|  Nice vintage story|    1399248000|                Positive|
|B000F83SZQ| [2, 2]|    4.0|This book is a re...| 01 6, 2014| AN0N05A9LIJEQ|            critters|        Different...|    1388966400|                Positive|
|B000F83SZQ| [2, 2]|    4.0|This was a fairly...| 04 4, 2014| A795DMNCJILA6|                 dot|               Oldie|    1396569600|                Positive|
|B000F83SZQ| [1, 1]|    5.0|I'd never read an.

In [0]:
reviews_df.dropna(how='any')
reviews_df.show(5)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+------------------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|           summary|unixReviewTime|sentiment_label_fr_score|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+------------------------+
|B000F83SZQ| [0, 0]|    5.0|I enjoy vintage b...| 05 5, 2014|A1F6404F1VG29J|          Avidreader|Nice vintage story|    1399248000|                Positive|
|B000F83SZQ| [2, 2]|    4.0|This book is a re...| 01 6, 2014| AN0N05A9LIJEQ|            critters|      Different...|    1388966400|                Positive|
|B000F83SZQ| [2, 2]|    4.0|This was a fairly...| 04 4, 2014| A795DMNCJILA6|                 dot|             Oldie|    1396569600|                Positive|
|B000F83SZQ| [1, 1]|    5.0|I'd never read an...|02 19, 20

#### Add ID
Add unique ID so reviews can be grouped again after exploding to do sentiment analysis by sentence.

In [0]:
reviews_df = reviews_df.withColumn("unique_id", F.monotonically_increasing_id())

# Spark NLP Sentiment Analysis


Using example from jonsnow sparknlp


In [0]:
# use document assemble which puts data in annotaed form
document_assembler = DocumentAssembler() \
                      .setInputCol("reviewText") \
                      .setOutputCol("review_document")

In [0]:
assembled = document_assembler.transform(reviews_df)
assembled.select('review_document').take(5)

[Row(review_document=[Row(annotatorType='document', begin=0, end=293, result="I enjoy vintage books and movies so I enjoyed reading this book.  The plot was unusual.  Don't think killing someone in self-defense but leaving the scene and the body without notifying the police or hitting someone in the jaw to knock them out would wash today.Still it was a good read for me.", metadata={'sentence': '0'}, embeddings=[], sentence_embeddings=[])]),
 Row(review_document=[Row(annotatorType='document', begin=0, end=454, result="This book is a reissue of an old one; the author was born in 1910. It's of the era of, say, Nero Wolfe. The introduction was quite interesting, explaining who the author was and why he's been forgotten; I'd never heard of him.The language is a little dated at times, like calling a gun a &#34;heater.&#34;  I also made good use of my Fire's dictionary to look up words like &#34;deshabille&#34; and &#34;Canarsie.&#34; Still, it was well worth a look-see.", metadata={'sentence

In [0]:
#detect sentences
sentence_finder = SentenceDetector() \
    .setExplodeSentences(False) \
    .setInputCols("review_document") \
    .setOutputCol("sentence") 
sentence_data = sentence_finder.transform(assembled)
sentence_data.select("sentence").limit(5).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
first_obs = sentence_data.select('sentence') \
      .limit(1)
first_obs_df = first_obs.select('sentence', F.explode(first_obs.sentence).alias('_sentence'))

pd.set_option('max_colwidth', 150)
first_obs_df.toPandas()[['_sentence']]

Unnamed: 0,_sentence
0,"(document, 0, 63, I enjoy vintage books and movies so I enjoyed reading this book., {'sentence': '0'}, [], [])"
1,"(document, 66, 86, The plot was unusual., {'sentence': '1'}, [], [])"
2,"(document, 89, 293, Don't think killing someone in self-defense but leaving the scene and the body without notifying the police or hitting someone..."


In [0]:
pd.set_option('max_colwidth', 50)

In [0]:
#Tokenize
tokenizer = Tokenizer() \
              .setInputCols(['sentence']) \
              .setOutputCol('token')
token_data = tokenizer.fit(sentence_data).transform(sentence_data)
token_data.show(5)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+------------------------+---------+--------------------+--------------------+--------------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|           summary|unixReviewTime|sentiment_label_fr_score|unique_id|     review_document|            sentence|               token|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+------------------------+---------+--------------------+--------------------+--------------------+
|B000F83SZQ| [0, 0]|    5.0|I enjoy vintage b...| 05 5, 2014|A1F6404F1VG29J|          Avidreader|Nice vintage story|    1399248000|                Positive|        0|[[document, 0, 29...|[[document, 0, 63...|[[token, 0, 0, I,...|
|B000F83SZQ| [2, 2]|    4.0|This book is a re...| 01 6, 2014| AN0N05A9LIJEQ|    

In [0]:
# needed only if json file incorrectly untarred
#token_data.where(token_data['_corrupt_record'].isNotNull())

In [0]:
#Normalize
normalizer = (Normalizer()
                .setInputCols(["token"])
                .setOutputCol('normed_token')
               )
normalizer_data = normalizer.fit(token_data).transform(token_data)
pd.DataFrame(normalizer_data.take(5))

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
0,B000F83SZQ,"[0, 0]",5.0,I enjoy vintage books and movies so I enjoyed ...,"05 5, 2014",A1F6404F1VG29J,Avidreader,Nice vintage story,1399248000,Positive,0,"[(document, 0, 293, I enjoy vintage books and ...","[(document, 0, 63, I enjoy vintage books and m...","[(token, 0, 0, I, {'sentence': '0'}, [], []), ...","[(token, 0, 0, I, {'sentence': '0'}, [], []), ..."
1,B000F83SZQ,"[2, 2]",4.0,This book is a reissue of an old one; the auth...,"01 6, 2014",AN0N05A9LIJEQ,critters,Different...,1388966400,Positive,1,"[(document, 0, 454, This book is a reissue of ...","[(document, 0, 36, This book is a reissue of a...","[(token, 0, 3, This, {'sentence': '0'}, [], []...","[(token, 0, 3, This, {'sentence': '0'}, [], []..."
2,B000F83SZQ,"[2, 2]",4.0,This was a fairly interesting read. It had ol...,"04 4, 2014",A795DMNCJILA6,dot,Oldie,1396569600,Positive,2,"[(document, 0, 374, This was a fairly interest...","[(document, 0, 34, This was a fairly interesti...","[(token, 0, 3, This, {'sentence': '0'}, [], []...","[(token, 0, 3, This, {'sentence': '0'}, [], []..."
3,B000F83SZQ,"[1, 1]",5.0,I'd never read any of the Amy Brewster mysteri...,"02 19, 2014",A1FV0SX13TWVXQ,"Elaine H. Turley ""Montana Songbird""",I really liked it.,1392768000,Positive,3,"[(document, 0, 100, I'd never read any of the ...","[(document, 0, 63, I'd never read any of the A...","[(token, 0, 2, I'd, {'sentence': '0'}, [], [])...","[(token, 0, 1, Id, {'sentence': '0'}, [], []),..."
4,B000F83SZQ,"[0, 1]",4.0,"If you like period pieces - clothing, lingo, y...","03 19, 2014",A3SPTOKDG7WBLN,Father Dowling Fan,Period Mystery,1395187200,Positive,4,"[(document, 0, 129, If you like period pieces ...","[(document, 0, 72, If you like period pieces -...","[(token, 0, 1, If, {'sentence': '0'}, [], []),...","[(token, 0, 1, If, {'sentence': '0'}, [], []),..."


In [0]:
#Check Spelling

# need to DL wordlist
# -N new only
# -P set directory
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/spell/words.txt -P /tmp


--2020-03-11 01:26:46--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/spell/words.txt
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.1.62
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.1.62|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4862966 (4.6M) [text/plain]
Saving to: ‘/tmp/words.txt’


2020-03-11 01:26:47 (7.05 MB/s) - ‘/tmp/words.txt’ saved [4862966/4862966]



In [0]:
# spell check option 1
spell_check_nv = (NorvigSweetingApproach()
                .setInputCols(['normed_token'])
                .setOutputCol('spell_checked')
                .setDictionary("/tmp/words.txt")
                .setDoubleVariants(False)  # set for speed
                .setShortCircuit(True)  # set for speed
               )
spell_check_data = spell_check_nv.fit(normalizer_data).transform(normalizer_data)
spell_check_data.show(5)

In [0]:
# spell check option 2
sherlock_url = r"http://www.gutenberg.org/files/1661/1661-0.txt"
! wget -O sherlockholmes.txt $sherlock_url -P /tmp
train_corpus = (spark.read.text("./sherlockholmes.txt")
    .withColumnRenamed("value", "text")
                )

spell_check_sym = (
    SymmetricDeleteApproach()
    .setInputCols(["normed_token"])
    .setOutputCol("spell_checked")
    .setDictionary("/tmp/words.txt")
    .fit(train_corpus)
)

--2020-03-11 01:28:17--  http://www.gutenberg.org/files/1661/1661-0.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 607788 (594K) [text/plain]
Saving to: ‘sherlockholmes.txt’


2020-03-11 01:28:19 (453 KB/s) - ‘sherlockholmes.txt’ saved [607788/607788]



AnalysisException: ignored

In [0]:
#sentiment
sentiment_analyzer = (ViveknSentimentApproach()
                      .setInputCols(['spell_checked', 'sentence'])
                      .setOutputCol('sentiment')
                      .setPruneCorpus(0)
                      .setSentimentCol('sentiment_label')
                      )

In [0]:
finisher = Finisher() \
    .setInputCols(["sentiment"]) \
    .setIncludeMetadata(False)

In [0]:
sentiment_pipe = Pipeline(stages=[document_assembler,
                                  sentence_finder,
                                  tokenizer,
                                  normalizer,
                                  #spell_check_sym,  # option 2
                                  sentiment_analyzer,
                                  finisher
                                  ])

In [0]:
review_sentiment_model = (sentiment_pipe
                          .fit(reviews_df)
                          .transform(reviews_df)
                          )

AnalysisException: ignored

In [0]:
review_sentiment_data.show(n=5, truncate=False)

In [0]:
review_sentiment_combined_df = (
    review_sentiment_model
    .groupBy("unique_id", "reviewerID", "unixReviewTime")
    .agg(avg("sentiment", avg("overall")))
)
review_sentiment_combined_df.show(5)

# Using pretrained pipeline

In [0]:
sparknlp.start()
pipeline = PretrainedPipeline(name='analyze_sentiment', lang='en')

In [0]:
prepipe = PretrainedPipeline(name='analyze_sentiment')
prepipe_result = prepipe.annotate(target=reviews_df, column="reviewText")
prepipe_result.show(5)

In [0]:
spark.createDataFrame(prepipe_result.select('sentiment').take(1))

In [0]:
prepipe_result.select('sentiment').show(truncate=False)

In [0]:
#get sentiment average for 
result.withColumn('avg_sentiment', 
                  F.when(F.col('sentiment')['result']==F.lit('positive'), F.lit((1,1))) \
                         .otherwise((0,1)) \
                         .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
                         .mapValues(lambda x: x[0]/x[1])) \
      .show()

In [0]:
prepipe_result.withColumn("sent_mean", lambda x: x["sentiment"])

In [0]:
prepipe_result.withColumn("exploded_sent", F.explode(F.col("sentiment"))) \
    .select("exploded_sent").printSchema()

In [0]:
schema = StructType([StructField])

In [0]:
result2 = (result
  .withColumn("reviewID", F.monotonically_increasing_id())
  .withColumn("exploded_sent", F.explode(F.col("sentiment")))
  .select(["exploded_sent.*", "overall", "sentiment_label", "reviewID"])
           )
result2.show(5)

In [0]:
result2.printSchema()

In [0]:
result2 = result2.withColumn("id", F.monotonically_increasing_id())
result2.show()

In [0]:
result3 = result2 \
  .withColumn("numerical_result", F.when(result2["result"] == "Positive", 1).otherwise(0)) \
  .groupBy(["ID", "overall", "sentiment_label"]) \
  .agg(F.mean("numerical_result").alias("result_mean")) \
  .show(truncate=False)

## Timeseries Analysis
I am grouping the data by date to reduce the data size. This will get it to a manageable size for in-memory ml tools. I will then use statsmodels ARIMA to model the change in sentiment for reviews.

In [0]:
ts_df = ('review_sentiment_data'
         .withColumn(rev_date_dt, col("unixReviewTime").cast("DateType")
         .groupBy("rev_date_dt", "asin")
         .agg(avg("sentiment", avg("overall")))
         )

In [0]:
ts_pdf = ts_df.toPandas()
ts_pdf.plot.line();