<a href="https://colab.research.google.com/github/chrisoyer/thinkful_notes/blob/master/Amazon_Kindle_Reviews__Sentiment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Description:
Goal: Amazon Reviews Sentiment Analysis

### Initialization

In [0]:
#environment
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
os.environ["SPARK_CLASSPATH"] = '/content/spark-2.4.5-bin-hadoop2.7'

In [0]:
# Install spark-related depdencies for Python
!pip install -q findspark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 54kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 46.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=bc4a3cdb71918739f8151e4817d85c1693f701a6e4cb0cff290f6e4d612d540c
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Spark NLP') \
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.2.2") \
    .getOrCreate()

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
import urllib
from pyspark import SparkContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
import pyspark.sql.functions as F

!pip install spark-nlp
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import LightPipeline
from sparknlp.annotator import *
from sparknlp.common import RegexRule
from sparknlp.base import DocumentAssembler, Finisher



In [0]:
source_url = r"http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Kindle_Store_5.json.gz"

I am using a local copy of the above file, stored on gDrive, instead of re-downloading the source file.

In [0]:
data_folder = r'/content/gdrive/My Drive/thinkful/colab_datasets/amazon_reviews/'
reviews_arx = os.path.join(data_folder, 'reviews_Kindle_Store_5.json.gz')
reviews_raw = os.path.join(data_folder, 'Grocery_and_Gourmet_Food_5.json')
if not os.path.exists(reviews_raw):
    if not os.path.exists(data_folder):
        os.mkdir(data_folder)
    if not os.path.exists(reviews_arx):
        urllib.request.urlretrieve(source_url, filename=reviews_arx)
    import shutil
    import gzip
    with gzip.open(reviews_arx, 'rb') as f_in:
        with open(reviews_raw, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

SPARK_URL = "local[*]"
APP_NAME  = "amazon_food_reviews"

In [25]:
print("Spark NLP version")
sparknlp.version()
print("Apache Spark version")
spark.version

Spark NLP version
2.2.2
Apache Spark version


'2.4.5'

In [0]:
sparknlp.start()
pipeline = PretrainedPipeline(name='analyze_sentiment', lang='en')

analyze_sentiment download started this may take some time.
Approx size to download 4.9 MB
[OK!]


In [0]:
reviews_df = spark.read.options(inferschema = "true").json(reviews_raw)

In [0]:
reviews_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [0]:
reviews_df.show(5)

+----------+-------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|   reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+---------------+--------------------+--------------+
|616719923X| [0, 0]|    4.0|Just another flav...| 06 1, 2013|A1VEELTKS8NLZB|Amazon Customer|          Good Taste|    1370044800|
|616719923X| [0, 1]|    3.0|I bought this on ...|05 19, 2014|A14R9XMZVJ6INB|        amf0001|3.5 stars,  sadly...|    1400457600|
|616719923X| [3, 4]|    4.0|Really good. Grea...| 10 8, 2013|A27IQHDZFQFNGG|        Caitlin|                Yum!|    1381190400|
|616719923X| [0, 0]|    5.0|I had never had i...|05 20, 2013|A31QY5TASILE89|   DebraDownSth|Unexpected flavor...|    1369008000|
|616719923X| [1, 2]|    4.0|I've been looking...|05 26, 2013|A2LWK003FFMCI5|       Diana X.|Not a

In [0]:
reviews_df.select('overall').describe().show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            151254|
|   mean| 4.243041506340329|
| stddev|1.0900026138973262|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



In [0]:
reviews_df = reviews_df.withColumn('sentiment_label', F.when(reviews_df["overall"] >= 4, 'Positive').otherwise('Negative'))

In [0]:
reviews_df.show(10)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+---------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|sentiment_label|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+---------------+
|616719923X| [0, 0]|    4.0|Just another flav...| 06 1, 2013|A1VEELTKS8NLZB|     Amazon Customer|          Good Taste|    1370044800|       Positive|
|616719923X| [0, 1]|    3.0|I bought this on ...|05 19, 2014|A14R9XMZVJ6INB|             amf0001|3.5 stars,  sadly...|    1400457600|       Negative|
|616719923X| [3, 4]|    4.0|Really good. Grea...| 10 8, 2013|A27IQHDZFQFNGG|             Caitlin|                Yum!|    1381190400|       Positive|
|616719923X| [0, 0]|    5.0|I had never had i...|05 20, 2013|A31QY5TASILE89|        DebraDownSth|Une

In [0]:
reviews_df = reviews_df.na.drop()

# Spark NLP Sentiment Analysis


Using example from jonsnow sparknlp


In [0]:
# use document assemble which puts data in annotaed form
document_assembler = DocumentAssembler() \
                      .setInputCol("reviewText") \
                      .setOutputCol("review_document")

In [32]:
assembled = document_assembler.transform(reviews_df)
assembled.select('review_document').take(5)

[Row(review_document=[Row(annotatorType='document', begin=0, end=293, result="I enjoy vintage books and movies so I enjoyed reading this book.  The plot was unusual.  Don't think killing someone in self-defense but leaving the scene and the body without notifying the police or hitting someone in the jaw to knock them out would wash today.Still it was a good read for me.", metadata={'sentence': '0'}, embeddings=[], sentence_embeddings=[])]),
 Row(review_document=[Row(annotatorType='document', begin=0, end=454, result="This book is a reissue of an old one; the author was born in 1910. It's of the era of, say, Nero Wolfe. The introduction was quite interesting, explaining who the author was and why he's been forgotten; I'd never heard of him.The language is a little dated at times, like calling a gun a &#34;heater.&#34;  I also made good use of my Fire's dictionary to look up words like &#34;deshabille&#34; and &#34;Canarsie.&#34; Still, it was well worth a look-see.", metadata={'sentence

In [33]:
#detect sentences
sentence_finder = SentenceDetector() \
    .setExplodeSentences(False) \
    .setInputCols("review_document") \
    .setOutputCol("sentence") 
sentence_data = sentence_finder.transform(assembled)
sentence_data.select("sentence").limit(5).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [34]:
first_obs = sentence_data.select('sentence') \
      .limit(1)
first_obs_df = first_obs.select('sentence', F.explode(first_obs.sentence).alias('_sentence'))
first_obs_df.toPandas()[['_sentence']]

Unnamed: 0,_sentence
0,"(document, 0, 63, I enjoy vintage books and mo..."
1,"(document, 66, 86, The plot was unusual., {'se..."
2,"(document, 89, 293, Don't think killing someon..."


In [35]:
#Tokenize
tokenizer = Tokenizer() \
              .setInputCols(['sentence']) \
              .setOutputCol('token')
token_data = tokenizer.fit(sentence_data).transform(sentence_data)
token_data.take(5)

[Row(_corrupt_record=None, asin='B000F83SZQ', helpful=[0, 0], overall=5.0, reviewText="I enjoy vintage books and movies so I enjoyed reading this book.  The plot was unusual.  Don't think killing someone in self-defense but leaving the scene and the body without notifying the police or hitting someone in the jaw to knock them out would wash today.Still it was a good read for me.", reviewTime='05 5, 2014', reviewerID='A1F6404F1VG29J', reviewerName='Avidreader', summary='Nice vintage story', unixReviewTime=1399248000, review_document=[Row(annotatorType='document', begin=0, end=293, result="I enjoy vintage books and movies so I enjoyed reading this book.  The plot was unusual.  Don't think killing someone in self-defense but leaving the scene and the body without notifying the police or hitting someone in the jaw to knock them out would wash today.Still it was a good read for me.", metadata={'sentence': '0'}, embeddings=[], sentence_embeddings=[])], sentence=[Row(annotatorType='document',

In [48]:
#Normalize
normalizer = Normalizer() \
                .setInputCols(["token"]) \
                .setOutputCol('normed_token')
normalizer_data = normalizer.fit(token_data).transform(token_data)
normalizer_data.show(5)

+---------------+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|_corrupt_record|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|           summary|unixReviewTime|     review_document|            sentence|               token|        normed_token|
+---------------+----------+-------+-------+--------------------+-----------+--------------+--------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|           null|B000F83SZQ| [0, 0]|    5.0|I enjoy vintage b...| 05 5, 2014|A1F6404F1VG29J|          Avidreader|Nice vintage story|    1399248000|[[document, 0, 29...|[[document, 0, 63...|[[token, 0, 0, I,...|[[token, 0, 0, I,...|
|           null|B000F83SZQ| [2, 2]|    4.0|This book is a re...| 01 6, 

In [36]:
# -N new only
# -P set directory
! wget -N https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/spell/words.txt -P /tmp

--2020-03-09 07:46:12--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/spell/words.txt
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.249.246
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.249.246|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4862966 (4.6M) [text/plain]
Saving to: ‘/tmp/words.txt’


2020-03-09 07:46:13 (43.2 MB/s) - ‘/tmp/words.txt’ saved [4862966/4862966]



In [0]:
#Check Spelling
spell_check = NorvigSweetingApproach() \
                .setInputCols(['normed_token']) \
                .setOutputCol('spell_checked') \
                .setDictionary("/tmp/words.txt")
spell_check_data = spell_check.fit(normalizer_data).transform(normalizer_data)
spell_check_data.show(5)

In [0]:
#sentiment
sentiment_analyzer = ViveknSentimentApproach() \
                      .setInputCols(['spell_checked', 'sentence']) \
                      .setOutputCol('sentiment') \
                      .setPruneCorpus(0) \
                      .setSentimentCol('sentiment_label')
                      

In [0]:
finisher = Finisher() \
    .setInputCols(["sentiment"]) \
    .setIncludeMetadata(False)

In [0]:
sentiment_pipe = Pipeline(stages=[
                                  document_assembler,
                                  sentence_finder,
                                  tokenizer,
                                  normalizer,
                                  spell_check,
                                  sentiment_analyzer,
                                  finisher
                                  ])

In [43]:
review_sentiment_model = sentiment_pipe.fit(reviews_df)

Py4JJavaError: ignored

In [0]:
review_sentiment_data = review_sentiment_model.transform(reviews_df)

In [0]:
review_sentiment_data.show(n=5, truncate=False)

In [0]:
sentiment_pipe2 = LightPipeline(sentiment_pipe)

# Using pretrained pipeline

In [0]:
prepipe = PretrainedPipeline(name='analyze_sentiment')
result = prepipe.annotate(target=reviews_df, column="reviewText")
result.show()

analyze_sentiment download started this may take some time.
Approx size to download 4.9 MB
[OK!]
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      asin|helpful|overall|                text| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|sentiment_label|            document|            sentence|               token|             checked|           sentiment|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|616719923X| [0, 0]|    4.0|Just another flav...| 06 1, 2013|A1VEELTKS8NLZB|     Amazon Customer|          Good Taste|    1370044800|     

In [0]:
spark.createDataFrame(result.select('sentiment').take(1))

ValueError: ignored

In [0]:
result.select('sentiment').show(truncate=False)

In [0]:
result.withColumn('avg_sentiment', 
                  F.when(F.col('sentiment')['result']==F.lit('positive'), F.lit((1,1))) \
                         .otherwise((0,1)) \
                         .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
                         .mapValues(lambda x: x[0]/x[1])) \
      .show()

Py4JJavaError: ignored

In [0]:
result.withColumn("sent_mean", lambda x: x["sentiment"])

AssertionError: ignored

In [0]:
result.withColumn("exploded_sent", F.explode(F.col("sentiment"))).select("exploded_sent").printSchema()

In [0]:
schema = StructType([StructField])

In [0]:
result2 = result \
  .withColumn("reviewID", F.monotonically_increasing_id()) \
  .withColumn("exploded_sent", F.explode(F.col("sentiment"))) \
  .select(["exploded_sent.*", "overall", "sentiment_label", "reviewID"])
result2.show(5)

In [0]:
result2.printSchema()

In [0]:
result2 = result2.withColumn("id", F.monotonically_increasing_id())
result2.show()

In [0]:
result3 = result2 \
  .withColumn("numerical_result", F.when(result2["result"] == "Positive", 1).otherwise(0)) \
  .groupBy(["ID", "overall", "sentiment_label"]) \
  .agg(F.mean("numerical_result").alias("result_mean")) \
  .show(truncate=False)

## Timeseries Analysis

In [0]:
ts_df = (review_sentiment_data
         .withColumn(rev_date, col("unixReviewTime").cast("DateType")
         )