### This file includes 2 models:
### Model 1: Logistic Regression with TF-IDF Features
### Model 2: Logistic Regression with Count Vector Features (Document-Term Vectors) using 5-Fold Cross Validation

In [1]:
# Initialise findspark first; the pyspark imports follow it.
# NOTE(review): presumably this is what makes the cluster's Spark install importable — confirm.
import findspark
findspark.init()

from pyspark.sql import SparkSession
# Create (or reuse) the session, asking Spark to pull in the Spark NLP package.
spark = (
    SparkSession.builder
    .appName("spark-nlp")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")
    .getOrCreate()
)

In [2]:
# Display the session object to confirm it was created.
spark

In [3]:
# Keep a handle on the session's SparkContext and display it.
sc = spark.sparkContext 
sc

### read dataset

In [4]:
# Read the JSON review data from S3 into a DataFrame.
data = spark.read.json("s3://502-project/amazon_game_data")

### remove the columns that are not useful

In [5]:
# Keep only asin, overall, reviewText and summary; the rest is not used downstream.
data = data.drop(
    "helpful",
    "reviewerID",
    "unixReviewTime",
    "reviewTime",
    "reviewerName",
)

### change the data type using pyspark.sql

In [6]:
from pyspark.sql.types import IntegerType
# Replace the rating column with an integer-typed version of itself.
overall_as_int = data["overall"].cast(IntegerType())
data = data.withColumn("overall", overall_as_int)

In [7]:
# Preview the first 10 rows.
data.show(10)

+----------+-------+--------------------+--------------------+
|      asin|overall|          reviewText|             summary|
+----------+-------+--------------------+--------------------+
|B00002STAU|      5|this is a old cla...|           a classic|
|B00002STAU|      4|This game is more...|  good fighting game|
|B00002STAU|      5|If you love WWF n...|WWF Wrestlemania ...|
|B00002STAU|      4|I had WWF Wrestle...|wrestling game wi...|
|B00002STAU|      4|I have to admit I...|           A Classic|
|B00002STAU|      5|This game was ama...|The Best Wrestlin...|
|B00002STAU|      4|This right here i...|wrestling at it's...|
|B00002SVP7|      3|The Rampage Editi...|A few new levels ...|
|B00002SVP7|      2|Remember the mome...|                WTF?|
|B00002SVP7|      2|Back in 1993 Sega...|           Bo-oring!|
+----------+-------+--------------------+--------------------+
only showing top 10 rows



### Change the rating labels into negative/neutral/positive categories
#### overall=1 or 2, attitude=1 (negative)
#### overall=3, attitude=2 (neutral)
#### overall=4 or 5, attitude=3 (positive)

In [8]:
from pyspark.sql import functions as F

In [9]:
# Collapse the star rating into three sentiment classes:
#   1 = negative (overall < 3), 2 = neutral (overall == 3), 3 = everything else (positive).
attitude_expr = (
    F.when(F.col('overall') < 3, 1)
    .when(F.col('overall') == 3, 2)
    .otherwise(3)
)
data = data.withColumn('attitude', attitude_expr)

In [10]:
# Check the new attitude column next to the original rating.
data.show(5)

+----------+-------+--------------------+--------------------+--------+
|      asin|overall|          reviewText|             summary|attitude|
+----------+-------+--------------------+--------------------+--------+
|B00002STAU|      5|this is a old cla...|           a classic|       3|
|B00002STAU|      4|This game is more...|  good fighting game|       3|
|B00002STAU|      5|If you love WWF n...|WWF Wrestlemania ...|       3|
|B00002STAU|      4|I had WWF Wrestle...|wrestling game wi...|       3|
|B00002STAU|      4|I have to admit I...|           A Classic|       3|
+----------+-------+--------------------+--------------------+--------+
only showing top 5 rows



In [11]:
from pyspark.sql.types import DoubleType
# Widen the class id to double (it shows as 1.0 / 2.0 / 3.0 from here on).
attitude_as_double = data["attitude"].cast(DoubleType())
data = data.withColumn("attitude", attitude_as_double)

In [12]:
# Drop the original 1-5 rating; the three-class attitude value replaces it below.
data=data.drop("overall")

In [13]:
# Re-create "overall" as a copy of the three-class attitude value.
data=data.withColumn("overall", data["attitude"])

In [14]:
# "attitude" now duplicates "overall", so remove it.
data=data.drop("attitude")

In [15]:
# Confirm that "overall" now holds the three-class label as a double.
data.show(5)

+----------+--------------------+--------------------+-------+
|      asin|          reviewText|             summary|overall|
+----------+--------------------+--------------------+-------+
|B00002STAU|this is a old cla...|           a classic|    3.0|
|B00002STAU|This game is more...|  good fighting game|    3.0|
|B00002STAU|If you love WWF n...|WWF Wrestlemania ...|    3.0|
|B00002STAU|I had WWF Wrestle...|wrestling game wi...|    3.0|
|B00002STAU|I have to admit I...|           A Classic|    3.0|
+----------+--------------------+--------------------+-------+
only showing top 5 rows



# data cleaning-nlp pipeline 

In [16]:
import nltk
# Fetch the NLTK corpora used for cleaning.
# NOTE(review): only the stop-word list is read in the code shown here; check whether 'words' is still needed.
nltk.download('stopwords')
nltk.download('words')

[nltk_data] Downloading package stopwords to /home/hadoop/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package words to /home/hadoop/nltk_data...
[nltk_data]   Package words is already up-to-date!


True

In [17]:
# get the list of stopwords from nltk
from nltk.corpus import stopwords

# English stop words from NLTK plus one extra token.
# NOTE(review): 'xxxx' looks like a placeholder carried over from another dataset — confirm it is needed.
eng_stopwords = stopwords.words('english') + ['xxxx']

In [18]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer, 
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

In [19]:
# Wrap the raw review text in the 'document' column the later stages read from.
documentAssembler = (
    DocumentAssembler()
    .setInputCol('reviewText')
    .setOutputCol('document')
)

# Split each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('token')
)

# Normalise the tokens; lower-casing is requested explicitly here rather than
# left to the library default. Pass False to setLowercase to keep the input case.
normalizer = (
    Normalizer()
    .setInputCols(['token'])
    .setOutputCol('normalized')
    .setLowercase(True)
)

# The lemmatizer needs a dictionary, so the pre-trained model is loaded
# (no name is given, so the library's default model is used).
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['normalized'])
    .setOutputCol('lemma')
)

# Remove the stop words collected in eng_stopwords, ignoring case.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['lemma'])
    .setOutputCol('clean_lemma')
    .setCaseSensitive(False)
    .setStopWords(eng_stopwords)
)

# The finisher turns the annotations into plain, human-readable token arrays;
# setCleanAnnotations(False) asks it not to drop the annotation columns.
finisher = (
    Finisher()
    .setInputCols(['clean_lemma'])
    .setCleanAnnotations(False)
)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [20]:
# Text-cleaning pipeline; the stages run in the order listed.
nlp_stages = [
    documentAssembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    finisher,
]
pipeline = Pipeline(stages=nlp_stages)

In [21]:
# Register the DataFrame as a SQL view and pull out just the review text
# that the cleaning pipeline reads.
data.createOrReplaceTempView("data")
df = spark.sql("SELECT reviewText FROM data")
df.show(10)

+--------------------+
|          reviewText|
+--------------------+
|this is a old cla...|
|This game is more...|
|If you love WWF n...|
|I had WWF Wrestle...|
|I have to admit I...|
|This game was ama...|
|This right here i...|
|The Rampage Editi...|
|Remember the mome...|
|Back in 1993 Sega...|
+--------------------+
only showing top 10 rows



In [22]:
# Fit and apply the cleaning pipeline, then keep only the finished token arrays.
equifax = pipeline.fit(df).transform(df)
temp = equifax.select('finished_clean_lemma')
temp.show(10)

+--------------------+
|finished_clean_lemma|
+--------------------+
|[old, classic, wr...|
|[game, oneonone, ...|
|[love, wwf, call,...|
|[wwf, wrestlemani...|
|[admit, hadnt, st...|
|[game, amazing, b...|
|[right, bit, arca...|
|[rampage, edition...|
|[remember, moment...|
|[back, sega, rele...|
+--------------------+
only showing top 10 rows



In [23]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
# Attach the cleaned tokens by running the cleaning pipeline on the full DataFrame,
# so each review keeps its own tokens by construction.
#
# This replaces the earlier approach of numbering the rows of two separate frames
# with row_number() over monotonically_increasing_id() and joining on that number:
# it only lines up if both frames yield rows in the same order, which Spark does
# not promise, and a window with no partitioning moves every row into a single
# partition. `pipeline` is the Spark NLP cleaning pipeline built above.
original_columns = data.columns
data = (
    pipeline.fit(data)
    .transform(data)
    .select(original_columns + ['finished_clean_lemma'])
)
data.show(10)

+----------+--------------------+--------------------+-------+--------------------+
|      asin|          reviewText|             summary|overall|finished_clean_lemma|
+----------+--------------------+--------------------+-------+--------------------+
|B00002STAU|this is a old cla...|           a classic|    3.0|[old, classic, wr...|
|B00002STAU|This game is more...|  good fighting game|    3.0|[game, oneonone, ...|
|B00002STAU|If you love WWF n...|WWF Wrestlemania ...|    3.0|[love, wwf, call,...|
|B00002STAU|I had WWF Wrestle...|wrestling game wi...|    3.0|[wwf, wrestlemani...|
|B00002STAU|I have to admit I...|           A Classic|    3.0|[admit, hadnt, st...|
|B00002STAU|This game was ama...|The Best Wrestlin...|    3.0|[game, amazing, b...|
|B00002STAU|This right here i...|wrestling at it's...|    3.0|[right, bit, arca...|
|B00002SVP7|The Rampage Editi...|A few new levels ...|    2.0|[rampage, edition...|
|B00002SVP7|Remember the mome...|                WTF?|    1.0|[remember, mom

In [24]:
# Mark the cleaned DataFrame for caching; it is reused by the splits and both models below.
data.cache()

DataFrame[asin: string, reviewText: string, summary: string, overall: double, finished_clean_lemma: array<string>]

In [25]:
# 80/20 train/test split for model 1. The fixed seed keeps the split, and the
# metric computed from it, the same on every run; 100 is the same seed the
# count-vector model's split uses further down.
splitted_data = data.randomSplit([0.8, 0.2], seed=100)
train_data = splitted_data[0]
test_data = splitted_data[1]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))

Number of training records: 860037
Number of testing records : 215275


# model 1: Logistic Regression with TF-IDF Features

In [34]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [35]:
# Model-1 feature stages. They start from the raw reviewText column (not from
# finished_clean_lemma): tokens -> hashed term frequencies -> IDF weights.
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words")
hashtf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='tf', numFeatures=2**16)
# minDocFreq: remove sparse terms
idf = IDF(inputCol=hashtf.getOutputCol(), outputCol="features", minDocFreq=5)
# Index the class column into "label".
label_stringIdx = StringIndexer(inputCol="overall", outputCol="label")

tfidf_stages = [tokenizer, hashtf, idf, label_stringIdx]
pipeline = Pipeline(stages=tfidf_stages)

# Fit on the training split only, then apply the fitted stages to both splits.
pipelineFit = pipeline.fit(train_data)
train_df = pipelineFit.transform(train_data)
test_df = pipelineFit.transform(test_data)
train_df.show(5)

+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
|      asin|          reviewText|             summary|overall|finished_clean_lemma|               words|                  tf|            features|label|
+----------+--------------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+-----+
|0700099867|1st shipment rece...|           Wrong key|    1.0|[st, shipment, re...|[1st, shipment, r...|(65536,[568,6534,...|(65536,[568,6534,...|  1.0|
|0700099867|Although this gam...|Great game, Bad Host|    2.0|[although, game, ...|[although, this, ...|(65536,[1924,2026...|(65536,[1924,2026...|  2.0|
|0700099867|Amazing graphics,...|         Great Game!|    3.0|[amazing, graphic...|[amazing, graphic...|(65536,[8026,1165...|(65536,[8026,1165...|  0.0|
|0700099867|Crashed in Vista....|Don't waste your ...|    1.0|[crash, vista, co...

In [36]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression with elasticNetParam=0 (the ridge setting), trained on the
# TF-IDF features and then applied to the held-out split.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(test_df)


In [37]:
# Show the schema of the prediction DataFrame.
predictions

DataFrame[asin: string, reviewText: string, summary: string, overall: double, finished_clean_lemma: array<string>, words: array<string>, tf: vector, features: vector, label: double, rawPrediction: vector, probability: vector, prediction: double]

### Evaluate the model (weighted F1 — the evaluator's default metric, not plain accuracy)

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# With no metricName given the evaluator scores with its default metric, "f1".
# The defaults are spelled out so the reported figure is not mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.7592238937529705

# model 2: Logistic Regression with Count Vector Features (Document-Term Vectors) using 5-Fold Cross Validation

In [39]:
# Base estimator for model 2; regParam and elasticNetParam are varied by the grid search below.
lr1 = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [40]:
# Index the "overall" class values into a "label" column.
label_stringIdx = StringIndexer(inputCol = "overall", outputCol = "label")

In [41]:
# Term counts over the cleaned lemmas: vocabulary capped at 10,000 terms, minimum document frequency 5.
countVectors = CountVectorizer(inputCol="finished_clean_lemma", outputCol="features", vocabSize=10000, minDF=5)

In [42]:
# Reminder of the input columns before the model-2 features are built.
data.show(5)

+----------+--------------------+--------------------+-------+--------------------+
|      asin|          reviewText|             summary|overall|finished_clean_lemma|
+----------+--------------------+--------------------+-------+--------------------+
|B00002STAU|this is a old cla...|           a classic|    3.0|[old, classic, wr...|
|B00002STAU|This game is more...|  good fighting game|    3.0|[game, oneonone, ...|
|B00002STAU|If you love WWF n...|WWF Wrestlemania ...|    3.0|[love, wwf, call,...|
|B00002STAU|I had WWF Wrestle...|wrestling game wi...|    3.0|[wwf, wrestlemani...|
|B00002STAU|I have to admit I...|           A Classic|    3.0|[admit, hadnt, st...|
+----------+--------------------+--------------------+-------+--------------------+
only showing top 5 rows



In [43]:
# Model-2 features: term counts over the cleaned lemmas plus the indexed label.
# NOTE(review): both stages are fitted on the whole of `data`, i.e. before the
# train/test split that follows, so held-out rows contribute to the vocabulary.
pipeline = Pipeline(stages=[countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

In [44]:
# Seeded 80/20 split of the featurised dataset, followed by the size of each part.
split_frames = dataset.randomSplit([0.8, 0.2], seed=100)
trainingData, testData = split_frames
n_training = trainingData.count()
n_testing = testData.count()
print("Number of training records: " + str(n_training))
print("Number of testing records : " + str(n_testing))

Number of training records: 860648
Number of testing records : 214664


In [45]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# 3 x 3 grid over the two regularisation knobs of lr1.
grid_builder = ParamGridBuilder()
# regularization parameter
grid_builder = grid_builder.addGrid(lr1.regParam, [0.1, 0.3, 0.5])
# Elastic Net Parameter (Ridge = 0)
grid_builder = grid_builder.addGrid(lr1.elasticNetParam, [0.0, 0.1, 0.2])
paramGrid = grid_builder.build()

### cross validation

In [46]:
# 5-fold cross-validation of lr1 over paramGrid, scored with the evaluator's
# default metric. The seed fixes how rows are assigned to folds, so the selected
# parameters and the reported score are repeatable; 100 is the same seed used
# for the train/test split of this model.
cv = CrossValidator(
    estimator=lr1,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(),
    numFolds=5,
    seed=100,
)

In [47]:
# Run the 5-fold search over the 9 parameter combinations on the training split.
cvModel = cv.fit(trainingData)

### Evaluate the model (weighted F1 — the evaluator's default metric, not plain accuracy)

In [48]:
# Score the held-out split with the cross-validated model.
predictions = cvModel.transform(testData)
# Evaluate best model. With no metricName given the evaluator uses its default, "f1";
# the defaults are spelled out so the figure is not mistaken for accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
evaluator.evaluate(predictions)

0.7760771669407684

In [None]:
# Release the cluster resources once the notebook is finished.
# NOTE(review): spark.stop() is documented to stop the underlying SparkContext too,
# so the explicit sc.stop() is likely redundant — confirm before removing.
sc.stop()
spark.stop()