# Amazon Video Games Sentiment Analysis
Raj Prasad
July 2019

[html version](https://daddyprasad5.github.io/amazon_video_games_sentiment.html) - with all the code hidden away for a quick read

[jupyter notebook version](https://github.com/daddyprasad5/thinkful/blob/amazon_video_games_sentiment.ipynb) - with all the code exposed in an interactive notebook

The goal of this exercise is to practice Spark batch-processing skills in a machine learning context.

I'll be building a sentiment analysis model using TF/IDF features and a logistic regression model.  
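Here is a quick refresher on what those features are. This is a minimal plain-Python sketch of TF/IDF. It assumes raw term counts for TF, which is what Spark's HashingTF and CountVectorizer produce by default. It also assumes the smoothed IDF formula from the Spark MLlib documentation, idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing term t. The `tf_idf` helper and the toy documents are hypothetical. The notebook itself uses Spark's built-in transformers.

```python
import math
from collections import Counter

def tf_idf(docs, min_doc_freq=0):
    """Return one {term: tf-idf weight} dict per tokenized document.

    TF is the raw term count; IDF is ln((m + 1) / (df + 1)). Terms that appear
    in fewer than min_doc_freq documents get an IDF of 0 (like Spark's minDocFreq).
    """
    m = len(docs)
    # document frequency: how many documents contain each term
    df = Counter(term for doc in docs for term in set(doc))
    idf = {
        term: math.log((m + 1) / (count + 1)) if count >= min_doc_freq else 0.0
        for term, count in df.items()
    }
    return [{term: tf * idf[term] for term, tf in Counter(doc).items()} for doc in docs]

docs = [
    ["great", "game", "great", "fun"],
    ["boring", "game"],
    ["fun", "fun", "story"],
]
weights = tf_idf(docs)
print(round(weights[0]["great"], 4))  # 2 * ln(4/2) -> 1.3863
print(round(weights[1]["game"], 4))   # 1 * ln(4/3) -> 0.2877
```

Rare terms get large weights and terms found in most documents get weights near zero. That is what lets the logistic regression focus on distinctive words.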

Many thanks to [Ricky Kim](https://towardsdatascience.com/@rickykim78?source=post_page---------------------------), whose [Medium post](https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35) on this topic was extremely useful and easy to follow.

This notebook is designed to run on Google Colab.

Here are the basic must-dos for any Spark-on-Colab notebook.

In [3]:
#install Java 8 and Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz

# Install Spark-related dependencies for Python (pyspark pinned to match the Spark download)
!pip install -q findspark
!pip install pyspark==2.4.3

# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

# Point Colaboratory to Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/37/98/244399c0daa7894cdf387e7007d5e8b3710a79b67f3fd991c0b0b644822d/pyspark-2.4.3.tar.gz (215.6MB)
Collecting py4j==0.10.7 (from pyspark)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Stored in directory: /root/.cache/pip/wheels/8d/20/f0/b30e2024226dc112e256930dd2cd4f06d00ab053c86278dcf3
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.3

In [0]:
#imports
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col

In [0]:
#set constants
JSON_PATH = "/content/gdrive/My Drive/Colab Datasets/reviews_Video_Games_5.json" 
APP_NAME = "Amazon Video Game Sentiment Analysis"
SPARK_URL = "local[*]"
RANDOM_SEED = 141107
TRAINING_DATA_RATIO = 0.8

In [0]:
#read our data
#data downloaded from: http://jmcauley.ucsd.edu/data/amazon/
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
df = spark.read.json(JSON_PATH)  # the JSON reader infers the schema itself; inferSchema is a CSV option

In [0]:
#I need only the "asin" (id), "overall" (rating), and "reviewText" columns.  
reviews = df.select(["asin", "overall", "reviewText"])

In [0]:
#Translate the 1-5 "overall" score to negative (0) or positive (1) sentiment
reviews = reviews.withColumn("target", 
                             when(reviews.overall <= 3, 0).otherwise(1))
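In plain Python, the same thresholding rule looks like the sketch below. `to_sentiment` is a hypothetical helper, included only to make the mapping easy to check. Note that 3-star reviews land in the negative class.

```python
def to_sentiment(overall):
    """Map a 1-5 star rating to 0 (negative) or 1 (positive).

    Mirrors when(reviews.overall <= 3, 0).otherwise(1): 3 stars counts as negative.
    """
    return 0 if overall <= 3 else 1

ratings = [1.0, 2.0, 3.0, 4.0, 5.0]
print([to_sentiment(r) for r in ratings])  # [0, 0, 0, 1, 1]
```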

In [0]:
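#split into training (98%), validation (1%) and test (1%) sets
#note: this uses seed=2000; the RANDOM_SEED and TRAINING_DATA_RATIO constants above are not used here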
(train_set, val_set, test_set) = reviews.randomSplit([0.98, 0.01, 0.01], seed = 2000)
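`randomSplit` normalizes the weights if they don't sum to 1. The resulting split sizes are only approximately proportional to the weights, so the 1% validation and test sets will not be exactly equal in size. The plain-Python sketch below illustrates the idea by giving each row a uniform random draw and comparing it against cumulative weights. `random_split` is a hypothetical helper and a simplified illustration, not Spark's actual implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split via a uniform draw against cumulative weights.

    Weights are normalized to sum to 1, so split sizes are only
    approximately proportional to them. Simplified illustration only.
    """
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding in bounds
    return splits

train, val, test = random_split(range(100_000), [0.98, 0.01, 0.01], seed=2000)
print(len(train), len(val), len(test))
```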

I built three versions of the model, following Ricky's lead:

*   TF/IDF with single-word terms, using HashingTF to compute term frequencies
*   TF/IDF with single-word terms, using CountVectorizer to compute term frequencies
*   TF/IDF with 1- to 3-gram terms, using CountVectorizer to compute term frequencies

You'll see that the middle version was the best balance of performance and speed in this case.  
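The first two versions differ only in how term frequencies are computed. HashingTF maps each term to a bucket with a hash function, so it needs no vocabulary, but different terms can collide in the same bucket. CountVectorizer first learns a vocabulary of the `vocabSize` most frequent terms and indexes each term by its position in that vocabulary; terms outside the vocabulary are dropped. The simplified plain-Python sketch below shows both ideas. The helper names are hypothetical. `zlib.crc32` stands in for the hash function as an assumption for illustration; Spark's HashingTF uses a MurmurHash3-based hash.

```python
import zlib
from collections import Counter

def hashed_tf(tokens, num_features):
    """Hashing-trick term frequencies: bucket = hash(term) mod num_features.

    No vocabulary is stored; distinct terms may collide in one bucket.
    zlib.crc32 is a stand-in hash for illustration only.
    """
    counts = Counter()
    for term in tokens:
        counts[zlib.crc32(term.encode("utf-8")) % num_features] += 1
    return dict(counts)

def fit_vocabulary(docs, vocab_size):
    """CountVectorizer-style vocabulary: the vocab_size most frequent terms in the corpus."""
    corpus_counts = Counter(term for doc in docs for term in doc)
    return {term: i for i, (term, _) in enumerate(corpus_counts.most_common(vocab_size))}

def counted_tf(tokens, vocab):
    """Term frequencies indexed by vocabulary position; out-of-vocabulary terms are dropped."""
    return dict(Counter(vocab[t] for t in tokens if t in vocab))

docs = [["great", "game", "great"], ["boring", "game"]]
vocab = fit_vocabulary(docs, vocab_size=2)
print(vocab)                       # {'great': 0, 'game': 1}
print(counted_tf(docs[1], vocab))  # {1: 1}  ("boring" is not in the vocabulary)
print(hashed_tf(docs[0], num_features=2**16))
```

The trade-off: hashing is a single pass with fixed memory, while a learned vocabulary needs a fitting pass over the corpus but avoids collisions and keeps an index-to-term mapping.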

In [10]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
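idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: terms in fewer than 5 documents get an IDF of 0
#StringIndexer gives index 0.0 to the most frequent value, so positive reviews (target 1)
#become label 0.0 and negative reviews (target 0) become label 1.0, as the output below shows
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")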
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+----------+-------+--------------------+------+--------------------+--------------------+--------------------+-----+
|      asin|overall|          reviewText|target|               words|                  tf|            features|label|
+----------+-------+--------------------+------+--------------------+--------------------+--------------------+-----+
|0700099867|    1.0|1st shipment rece...|     0|[1st, shipment, r...|(65536,[568,6534,...|(65536,[568,6534,...|  1.0|
|0700099867|    1.0|Crashed in Vista....|     0|[crashed, in, vis...|(65536,[4775,8315...|(65536,[4775,8315...|  1.0|
|0700099867|    1.0|DiRT 2 was like t...|     0|[dirt, 2, was, li...|(65536,[1672,1706...|(65536,[1672,1706...|  1.0|
|0700099867|    1.0|I bought this and...|     0|[i, bought, this,...|(65536,[5782,8436...|(65536,[5782,8436...|  1.0|
|0700099867|    1.0|I can't tell you ...|     0|[i, can't, tell, ...|(65536,[1903,2026...|(65536,[1903,2026...|  1.0|
+----------+-------+--------------------+------+--------------------+--------------------+--------------------+-----+
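Notice that rows with `target` 0 show `label` 1.0. StringIndexer's default ordering gives index 0.0 to the most frequent value, which suggests positive reviews are the majority class here. The plain-Python sketch below mimics that frequency ordering. `frequency_index` and the `targets` list are hypothetical. Breaking ties by first appearance is an assumption of this sketch and may differ from Spark.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct value to a float index, most frequent value first.

    Ties are broken by first appearance (an assumption of this sketch).
    """
    ordered = [value for value, _ in Counter(values).most_common()]
    return {value: float(i) for i, value in enumerate(ordered)}

targets = [1, 1, 0, 1, 0, 1]     # hypothetical sample: positives outnumber negatives
print(frequency_index(targets))  # {1: 0.0, 0: 1.0}
```

The model does not care which class is called 1.0, but keep the mapping in mind when reading predictions: here `prediction == 1.0` means "negative review".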

In [11]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

#areaUnderROC (the evaluator's default metric)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.7606488356568426
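BinaryClassificationEvaluator reports areaUnderROC by default. One useful way to read that number: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counting as half. Spark computes it from the ROC curve. The pairwise plain-Python version below is a hypothetical helper that computes the same quantity on a toy example. It is quadratic in the number of rows, so it is only meant for illustration.

```python
def roc_auc(labels, scores):
    """Area under the ROC curve as a pairwise ranking probability.

    Counts positive/negative pairs where the positive scores higher
    (ties count as 0.5) and divides by the number of pairs.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

labels = [1, 1, 0, 0, 1, 0]
scores = [0.9, 0.4, 0.35, 0.8, 0.7, 0.1]
print(roc_auc(labels, scores))  # 7 of 9 pairs ranked correctly -> 0.777...
```

So an AUC of about 0.76 means the model ranks a random positive above a random negative roughly three times out of four.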

In [12]:
#accuracy
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

0.7815013404825737
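Accuracy is just the fraction of rows where `label` equals `prediction`. The one thing to get right is that the numerator and denominator come from the same set of rows. In Spark, dividing by `predictions.count()` rather than by a separately counted DataFrame guarantees that. A minimal plain-Python version, using a hypothetical `accuracy` helper:

```python
def accuracy(pairs):
    """Fraction of (label, prediction) pairs that agree.

    Numerator and denominator come from the same collection, so the result
    cannot be skewed by dividing by the size of a different split.
    """
    pairs = list(pairs)
    correct = sum(1 for label, pred in pairs if label == pred)
    return correct / len(pairs)

print(accuracy([(1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0)]))  # 0.75
```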

In [13]:
%%time
#same process, but using CountVectorizer instead of HashingTF to compute term frequencies

from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: terms in fewer than 5 documents get an IDF of 0
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7958
ROC-AUC: 0.8107
CPU times: user 135 ms, sys: 23.6 ms, total: 158 ms
Wall time: 3min 26s


In [0]:
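#same as above, but using n-grams (n = 1 to 3) instead of single words

from pyspark.ml.feature import NGram, VectorAssembler

def build_ngrams_wocs(n=3):
    """Tokenize, build 1..n-grams, compute TF/IDF for each n-gram size,
    concatenate the TF/IDF vectors, index the label, and add logistic regression."""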
    tokenizer = [Tokenizer(inputCol="reviewText", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), 
               minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler 
                    + label_stringIdx+lr)
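To make the n-gram stages concrete, here is a plain-Python sketch of what each NGram stage produces. It follows the behavior documented for Spark's NGram transformer: space-joined runs of n consecutive tokens, and no output when a document has fewer than n tokens. The `ngrams` helper is hypothetical. The sketch also computes the width of the assembled feature vector. Each CountVectorizer above keeps at most 5460 terms, and VectorAssembler concatenates the three TF/IDF vectors, so the result has at most 3 × 5460 features.

```python
def ngrams(tokens, n):
    """Space-joined n-grams of consecutive tokens; empty if there are fewer than n tokens."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

tokens = ["this", "game", "is", "fun"]
for n in range(1, 4):
    print(n, ngrams(tokens, n))
# 1 ['this', 'game', 'is', 'fun']
# 2 ['this game', 'game is', 'is fun']
# 3 ['this game is', 'game is fun']

# Upper bound on the assembled feature vector's length (vocabSize=5460 per n-gram size)
vocab_size = 5460
max_features = 3 * vocab_size
print(max_features)  # 16380
```

Bigrams and trigrams let the model pick up phrases like "not fun" that single words miss. The cost is three vocabularies to learn, which fits the much longer wall time below.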


In [15]:
%%time
trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)



CPU times: user 519 ms, sys: 121 ms, total: 640 ms
Wall time: 25min 10s


In [16]:
# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print("ROC-AUC: {0:.4f}".format(roc_auc_wocs))

Accuracy Score: 0.8552
ROC-AUC: 0.8917


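That improved things: accuracy rose from 0.7958 to 0.8552 and ROC-AUC from 0.8107 to 0.8917, at the cost of roughly seven times the wall time (25 min 10 s vs. 3 min 26 s). Normally I'd move on to tuning, but at 25 minutes per run I'll leave the code below unexecuted.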

In [0]:
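# Create ParamGrid for Cross Validation
# NOTE: this cell and the two below have not been run or tested.
# `lr` and `pipeline` here are the CountVectorizer versions defined earlier;
# tuning the n-gram model would need the LogisticRegression stage of that pipeline instead.

# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# paramGrid = (ParamGridBuilder()
#              .addGrid(lr.regParam, [0.01, 2.0])
#              .addGrid(lr.elasticNetParam, [0.0, 1.0])
#              .addGrid(lr.maxIter, [1, 10])
#              .build())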



In [0]:
# Create 5-fold CrossValidator (named crossval so it doesn't shadow the CountVectorizer `cv`)
# crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# # Run cross validations
# cvModel = crossval.fit(train_set)

In [0]:
# Use the validation set to measure the accuracy of the tuned model on new data
# predictions = cvModel.transform(val_set)

# # cvModel uses the best model found from the Cross Validation
# # Evaluate best model
# evaluator.evaluate(predictions)
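Here is the arithmetic behind skipping the tuning run. ParamGridBuilder expands the grid into every combination of values, and CrossValidator fits one model per (fold, parameter combination) pair, then refits the best combination on the full training set. The plain-Python sketch below counts those fits for the grid above. The time estimate is a rough upper bound under a loudly stated assumption: every fit costs about as much as one full 25-minute run. In practice fold models train on 4/5 of the data and some grid points use far fewer iterations, so the real figure should be lower.

```python
from itertools import product

# The grid from the (commented-out) tuning cell; every combination gets tried.
grid = {
    "regParam": [0.01, 2.0],
    "elasticNetParam": [0.0, 1.0],
    "maxIter": [1, 10],
}
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
num_folds = 5

# One fit per (fold, param map), plus a final refit of the best param map.
num_fits = num_folds * len(param_maps) + 1
print(len(param_maps), num_fits)  # 8 41

# Assumption: each fit takes about as long as one full 25-minute run (an upper bound).
hours = num_fits * 25 / 60
print(round(hours, 1))  # 17.1
```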

Finally, let's run the held-out test set through the best model, the 1- to 3-gram pipeline.

In [21]:

%%time
predictions_wocs_test = trigramwocs_pipelineFit.transform(test_set)
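# divide by the size of the test set; the run shown below divided by val_set.count(),
# so the accuracy printed there may differ slightly from the corrected value
accuracy_wocs_test = predictions_wocs_test.filter(predictions_wocs_test.label == predictions_wocs_test.prediction).count() / float(test_set.count())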
roc_auc_wocs_test = evaluator.evaluate(predictions_wocs_test)


# print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(accuracy_wocs_test))
print("ROC-AUC: {0:.4f}".format(roc_auc_wocs_test))

Accuracy Score: 0.8878
ROC-AUC: 0.8697
CPU times: user 61.5 ms, sys: 17.8 ms, total: 79.3 ms
Wall time: 16.5 s
