### Analyzing/Predicting Sentiment From Amazon Reviews

In this notebook, I use Spark (in batch mode only) to build a pipeline that can take in set of amazon reviews, featurize them using TFIDF vectors, and then use Logistic Regression to predict sentiment (where >3.5 stars is positive). 

In [26]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import monotonically_increasing_id, create_map, lit
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF, IDFModel, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from itertools import chain

In [2]:
#Define some constants to initiate our spark context
data = "examples/AmznInstantVideo.json"
name = "AMZN_Sentiment"
SPARK_URL = "local[*]"

In [3]:
#create a spark context, and then a sql context to load and manipulate the df/rdd
sc = SparkSession.builder.appName(name).master(SPARK_URL).getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
#Read in the file, then examine:
raw =sqlContext.read.json(data)
raw.columns

['asin',
 'helpful',
 'overall',
 'reviewText',
 'reviewTime',
 'reviewerID',
 'reviewerName',
 'summary',
 'unixReviewTime']

In [5]:
raw.select('asin','helpful','overall','reviewTime','reviewerID',
           'reviewerName','unixReviewTime').show(5)

+----------+-------+-------+-----------+--------------+--------------------+--------------+
|      asin|helpful|overall| reviewTime|    reviewerID|        reviewerName|unixReviewTime|
+----------+-------+-------+-----------+--------------+--------------------+--------------+
|B000H00VBQ| [0, 0]|    2.0| 05 3, 2014|A11N155CW1UV02|            AdrianaM|    1399075200|
|B000H00VBQ| [0, 0]|    5.0| 09 3, 2012|A3BC8O2KCL29V2|             Carol T|    1346630400|
|B000H00VBQ| [0, 1]|    1.0|10 16, 2013| A60D5HQFOTSOM|Daniel Cooper "da...|    1381881600|
|B000H00VBQ| [0, 0]|    4.0|10 30, 2013|A1RJPIGRSNX4PW|      J. Kaplan "JJ"|    1383091200|
|B000H00VBQ| [1, 1]|    5.0|02 11, 2009|A16XRPF40679KG|       Michael Dobey|    1234310400|
+----------+-------+-------+-----------+--------------+--------------------+--------------+
only showing top 5 rows



In [6]:
raw.select('reviewText','summary').show(5)

+--------------------+--------------------+
|          reviewText|             summary|
+--------------------+--------------------+
|I had big expecta...|A little bit bori...|
|I highly recommen...|Excellent Grown U...|
|This one is a rea...|Way too boring fo...|
|Mysteries are int...|Robson Green is m...|
|This show always ...|Robson green and ...|
+--------------------+--------------------+
only showing top 5 rows



In [7]:
#Examine the dimensions:
print(raw.count())
print(len(raw.columns)) #s/b 9 based on the above...

37121
9


In [8]:
#Converts star ratings to a boolean sentiment score
scoremapping = {
    0.0: 0,
    1.0: 0,
    2.0: 0,
    3.0: 0,
    4.0: 1,
    5.0: 1}

mapping_expr = create_map([lit(x) for x in chain(*scoremapping.items())])

raw = raw.withColumn('sentiment',mapping_expr[raw['overall']])

In [9]:
# Split the data into training and validation sets (30% held out for testing)
(trainingData, testData) = raw.randomSplit([.7, .3])

In [10]:
#create a tokenizer for the pipeline
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")

#and transform the training data just to demonstrate it works
wordsData = tokenizer.transform(trainingData)
wordsData.select('reviewText', 'words').show(5)

+--------------------+--------------------+
|          reviewText|               words|
+--------------------+--------------------+
|Mysteries are int...|[mysteries, are, ...|
|This one is a rea...|[this, one, is, a...|
|This show always ...|[this, show, alwa...|
|I discovered this...|[i, discovered, t...|
|It beats watching...|[it, beats, watch...|
+--------------------+--------------------+
only showing top 5 rows



In [11]:
#create simple term frequency transformer
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)

#and transform the training data just to demonstrate it works
hashedData = hashingTF.transform(wordsData)
hashedData.select('reviewText','words','rawFeatures').show(5)

+--------------------+--------------------+--------------------+
|          reviewText|               words|         rawFeatures|
+--------------------+--------------------+--------------------+
|Mysteries are int...|[mysteries, are, ...|(100,[1,9,10,13,1...|
|This one is a rea...|[this, one, is, a...|(100,[4,10,14,20,...|
|This show always ...|[this, show, alwa...|(100,[0,3,6,7,9,1...|
|I discovered this...|[i, discovered, t...|(100,[0,1,2,3,4,6...|
|It beats watching...|[it, beats, watch...|(100,[7,20,26,29,...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [12]:
#Examination of the output
hashedData.select('rawFeatures').show(1,truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------+
|rawFeatures                                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------------------+
|(100,[1,9,10,13,15,18,20,22,30,33,38,52,61,68,72,81,83,90],[1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0])|
+-------------------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



In [13]:
#Create a transformer that will scale the term frequences by the inverse document frequency
idf = IDF(minDocFreq=3, inputCol='rawFeatures', outputCol='tfidfFeatures').fit(hashedData)

#and transform the training data just to demonstrate it works
tfidf = idf.transform(hashedData)
tfidf.select('tfidfFeatures').show(1, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tfidfFeatures                                                                                                                                                                                                                                                                                                                                                                                                     |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
#Recap of raw input, labels, and features:
tfidf.select('reviewText','sentiment','tfidfFeatures').show(5)

+--------------------+---------+--------------------+
|          reviewText|sentiment|       tfidfFeatures|
+--------------------+---------+--------------------+
|Mysteries are int...|        1|(100,[1,9,10,13,1...|
|This one is a rea...|        0|(100,[4,10,14,20,...|
|This show always ...|        1|(100,[0,3,6,7,9,1...|
|I discovered this...|        1|(100,[0,1,2,3,4,6...|
|It beats watching...|        0|(100,[7,20,26,29,...|
+--------------------+---------+--------------------+
only showing top 5 rows



# Pipeline to Logistic Regression

In [15]:
# Fit a label indexer 
labelIndexer = StringIndexer(inputCol="sentiment", outputCol="indexedLabel").fit(tfidf)

# Fit a feature indexer
featureIndexer = VectorIndexer(inputCol="tfidfFeatures", outputCol="indexedFeatures", maxCategories=4).fit(tfidf)

# Train a Logistic Regression
lr = LogisticRegression(featuresCol='indexedFeatures', labelCol='indexedLabel', predictionCol='pred')

# Chain feature transformers, indexers and regression in a Pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, 
                            labelIndexer, featureIndexer, lr])

In [16]:
#Fit the full model to the training data
model = pipeline.fit(trainingData)

#Predict test data 
predictions = model.transform(testData)
predictions.select('reviewText', 'sentiment','pred','probability').show(5)

+--------------------+---------+----+--------------------+
|          reviewText|sentiment|pred|         probability|
+--------------------+---------+----+--------------------+
|I had big expecta...|        0| 0.0|[0.81175358172645...|
|I highly recommen...|        1| 0.0|[0.87999870113215...|
|This is the best ...|        1| 0.0|[0.78954232320191...|
|if this had to do...|        0| 0.0|[0.83113051584153...|
|Watched it for Ke...|        1| 0.0|[0.80906059339862...|
+--------------------+---------+----+--------------------+
only showing top 5 rows



In [17]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="pred", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy:g}")

Accuracy = 0.798482


In [27]:
predictionAndLabels = predictions.select('pred','sentiment').rdd
type(predictionAndLabels)

pyspark.rdd.RDD

In [19]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

AttributeError: 'SparkSession' object has no attribute 'serializer'