In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col

In [72]:
json_PATH = "/home/ds/notebooks/datasets/reviews_Musical_Instruments_5.json"
SPARK_URL = "local[*]"
APP_NAME = "Amazone Review Example"
TRAINING_DATA_RATIO = 0.8
RF_NUM_TREES = 10

In [18]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
df = spark.read.options(inferschema = "true").json(json_PATH)

In [19]:
df.show()

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|1384719342|  [0, 0]|    5.0|Not much to write...|02 28, 2014|A2IBPI20UZIR0U|cassandra tu "Yea...|                good|    1393545600|
|1384719342|[13, 14]|    5.0|The product does ...|03 16, 2013|A14VAT5EAX3D9S|                Jake|                Jake|    1363392000|
|1384719342|  [1, 1]|    5.0|The primary job o...|08 28, 2013|A195EZSQDW3E21|Rick Bennette "Ri...|It Does The Job Well|    1377648000|
|1384719342|  [0, 0]|    5.0|Nice windscreen p...|02 14, 2014|A2C00NNG1ZQQG2|RustyBill "Sunday...|GOOD WINDSCREEN F...|    1392336000|
|1384719342|  [0, 0]|    5.0|This pop filter i...|02 21

In [6]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [7]:
df.count()

10261

In [11]:
len(df.columns)

9

In [13]:
df.describe().show()

+-------+-------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|summary|         asin|           overall|          reviewText|reviewTime|          reviewerID|        reviewerName|             summary|      unixReviewTime|
+-------+-------------+------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|  count|        10261|             10261|               10261|     10261|               10261|               10234|               10261|               10261|
|   mean|1.384719342E9| 4.488743787155248|                null|      null|                null|                null|                null|1.3606059557547998E9|
| stddev|          0.0|0.8946423761647264|                null|      null|                null|                null|                null| 3.779735074639005E7|
|    min|   1384719342|               1.0|    

In [20]:
df = df.dropna()

In [21]:
df.count()

10234

In [22]:
df.show(2)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+-------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+-------+--------------+
|1384719342|  [0, 0]|    5.0|Not much to write...|02 28, 2014|A2IBPI20UZIR0U|cassandra tu "Yea...|   good|    1393545600|
|1384719342|[13, 14]|    5.0|The product does ...|03 16, 2013|A14VAT5EAX3D9S|                Jake|   Jake|    1363392000|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+-------+--------------+
only showing top 2 rows



In [23]:
df.select('overall','reviewText').show(5)

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    5.0|Not much to write...|
|    5.0|The product does ...|
|    5.0|The primary job o...|
|    5.0|Nice windscreen p...|
|    5.0|This pop filter i...|
+-------+--------------------+
only showing top 5 rows



In [41]:
def calculateRatingDecode(x):
    if int(x) > 3:
        return 1
    else:
        return 0

In [42]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

func_udf = udf(calculateRatingDecode, IntegerType())
df = df.withColumn('overall_encoded',func_udf(df['overall']))

In [43]:
df.select('overall','reviewText','overall_encoded').show()

+-------+--------------------+---------------+
|overall|          reviewText|overall_encoded|
+-------+--------------------+---------------+
|    5.0|Not much to write...|              1|
|    5.0|The product does ...|              1|
|    5.0|The primary job o...|              1|
|    5.0|Nice windscreen p...|              1|
|    5.0|This pop filter i...|              1|
|    5.0|So good that I bo...|              1|
|    5.0|I have used monst...|              1|
|    3.0|I now use this ca...|              0|
|    5.0|Perfect for my Ep...|              1|
|    5.0|Monster makes the...|              1|
|    5.0|Monster makes a w...|              1|
|    4.0|I got it to have ...|              1|
|    3.0|If you are not us...|              0|
|    5.0|I love it, I used...|              1|
|    5.0|I bought this to ...|              1|
|    2.0|I bought this to ...|              0|
|    4.0|This Fender cable...|              1|
|    5.0|wanted it just on...|              1|
|    5.0|I've

In [60]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokenized_words")
tokenized = tokenizer.transform(df)

In [64]:
tokenized.select('tokenized_words').show(5)

+--------------------+
|     tokenized_words|
+--------------------+
|[not, much, to, w...|
|[the, product, do...|
|[the, primary, jo...|
|[nice, windscreen...|
|[this, pop, filte...|
+--------------------+
only showing top 5 rows



## word2Vec

In [68]:
from pyspark.ml.feature import Word2Vec

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="tokenized_words", outputCol="result")
model = word2Vec.fit(tokenized)

word2Vec_result = model.transform(tokenized)

In [70]:
word2Vec_result.select('result').show(5)

+--------------------+
|              result|
+--------------------+
|[0.13150993396765...|
|[0.07757678634180...|
|[-0.0373586639338...|
|[-0.0377182355150...|
|[0.02990736892180...|
+--------------------+
only showing top 5 rows



In [102]:
# Build the training indexers / split data / classifier
# first we'll generate a labelIndexer
labelIndexer = StringIndexer(inputCol="overall_encoded", outputCol="indexedLabel").fit(word2Vec_result)

# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="result", outputCol="indexedFeatures", maxCategories=4).fit(word2Vec_result)
    
# Split the data into training and validation sets (30% held out for testing)
(trainingData, testData) = word2Vec_result.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=500)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [103]:
model = pipeline.fit(trainingData)

In [104]:
predictions = model.transform(testData)

In [105]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.112856
Accuracy = 0.887144


### TF/IDF

In [106]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [108]:
hashingTF = HashingTF(inputCol="tokenized_words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(tokenized)

In [109]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [110]:
rescaledData.select('features').show(5)

+--------------------+
|            features|
+--------------------+
|(20,[0,1,2,3,4,5,...|
|(20,[0,1,2,3,4,5,...|
|(20,[0,1,2,3,4,5,...|
|(20,[0,1,3,4,5,6,...|
|(20,[0,1,4,5,6,8,...|
+--------------------+
only showing top 5 rows



In [117]:
# Build the training indexers / split data / classifier
# first we'll generate a labelIndexer
labelIndexer = StringIndexer(inputCol="overall_encoded", outputCol="indexedLabel").fit(rescaledData)

# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(rescaledData)
    
# Split the data into training and validation sets (30% held out for testing)
(trainingData, testData) = rescaledData.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=500)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [118]:
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

In [119]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.117591
Accuracy = 0.882409


### Gradient-boosted tree classifier

In [121]:
from pyspark.ml.classification import GBTClassifier
# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print(f"Accuracy = {accuracy:g}")

gbtModel = model.stages[2]
print(gbtModel)  # summary only


+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(20,[0,1,2,3,4,5,...|
|       0.0|         0.0|(20,[0,1,3,4,5,6,...|
|       0.0|         0.0|(20,[1,2,3,4,5,6,...|
|       0.0|         0.0|(20,[0,1,2,3,4,5,...|
|       0.0|         0.0|(20,[0,1,3,4,7,8,...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.122849
Accuracy = 0.877151
GBTClassificationModel (uid=GBTClassifier_4c8687228e8791cacc1c) with 10 trees


### Conclusions

- As we can observed word2Vec performed better compared to TF/IDF though difference is minimal 
- Random forest performing better if we tunned it by increasing number of trees