In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler, PCA
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, Word2Vec

from pyspark.sql.functions import isnan, when, count, col

In [2]:
JSON_PATH = "/home/ds/notebooks/datasets/Amazon/Grocery_and_Gourmet_Food_5.json"
APP_NAME = "Amazon Reviews"
SPARK_URL = "local[*]"
RANDOM_SEED = 141107
TRAINING_DATA_RATIO = 0.8
RF_NUM_TREES = 10
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

In [3]:
sc = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
df = sqlContext.read.json(JSON_PATH)

In [5]:
# Confirm the dataframe shape is 10,299 rows by 562 columns
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 151254 rows by 9 columns.


In [6]:
df.head()

Row(asin='616719923X', helpful=[0, 0], overall=4.0, reviewText='Just another flavor of Kit Kat but the taste is unique and a bit different.  The only thing that is bothersome is the price.  I thought it was a bit expensive....', reviewTime='06 1, 2013', reviewerID='A1VEELTKS8NLZB', reviewerName='Amazon Customer', summary='Good Taste', unixReviewTime=1370044800)

In [7]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [8]:
df.registerTempTable('reviews')

In [9]:
sqlContext.sql("select overall, count(overall) as reviewCount from reviews group by overall order by overall desc").show()

+-------+-----------+
|overall|reviewCount|
+-------+-----------+
|    5.0|      87446|
|    4.0|      32598|
|    3.0|      17514|
|    2.0|       7917|
|    1.0|       5779|
+-------+-----------+



In [10]:
RedefineScale = UserDefinedFunction(lambda x: 1 if x > 3.0 else -1, IntegerType())

df = df.withColumn("overall_recode",RedefineScale(df.overall))

In [11]:
df.select("overall", "overall_recode", "summary", "reviewText").show(10)

+-------+--------------+--------------------+--------------------+
|overall|overall_recode|             summary|          reviewText|
+-------+--------------+--------------------+--------------------+
|    4.0|             1|          Good Taste|Just another flav...|
|    3.0|            -1|3.5 stars,  sadly...|I bought this on ...|
|    4.0|             1|                Yum!|Really good. Grea...|
|    5.0|             1|Unexpected flavor...|I had never had i...|
|    4.0|             1|Not a very strong...|I've been looking...|
|    4.0|             1|              Subtle|These Kit-kats ar...|
|    3.0|            -1|Available in some...|I found these in ...|
|    5.0|             1|      So Delicious!!|Creamy white choc...|
|    5.0|             1|These are my favo...|After hearing mix...|
|    1.0|            -1|           Not a fan|I love green tea,...|
+-------+--------------+--------------------+--------------------+
only showing top 10 rows



In [12]:
tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokenized_text").transform(df)

word2Vec = Word2Vec(vectorSize=100, inputCol="tokenized_text", outputCol="w2v_vector").fit(tokenizer)

w2vdf=word2Vec.transform(tokenizer)

In [13]:
w2vdf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- overall_recode: integer (nullable = true)
 |-- tokenized_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- w2v_vector: vector (nullable = true)



In [14]:
w2vdf.select("overall_recode", "reviewText", "tokenized_text", "w2v_vector").show(10)

+--------------+--------------------+--------------------+--------------------+
|overall_recode|          reviewText|      tokenized_text|          w2v_vector|
+--------------+--------------------+--------------------+--------------------+
|             1|Just another flav...|[just, another, f...|[0.14400040171687...|
|            -1|I bought this on ...|[i, bought, this,...|[0.10373068780326...|
|             1|Really good. Grea...|[really, good., g...|[0.12804974936880...|
|             1|I had never had i...|[i, had, never, h...|[0.10261573377065...|
|             1|I've been looking...|[i've, been, look...|[0.08609757001115...|
|             1|These Kit-kats ar...|[these, kit-kats,...|[0.12085215287068...|
|            -1|I found these in ...|[i, found, these,...|[0.09676365884952...|
|             1|Creamy white choc...|[creamy, white, c...|[0.02746630740662...|
|             1|After hearing mix...|[after, hearing, ...|[0.09187632727088...|
|            -1|I love green tea,...|[i,

In [16]:
# Build the training indexers / split data / classifier
# first we'll generate a labelIndexer
labelIndexer = StringIndexer(inputCol="overall_recode", outputCol="indexedLabel").fit(w2vdf)

# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="w2v_vector", outputCol="indexedFeatures", maxCategories=4).fit(w2vdf)
    
# Split the data into training and validation sets (30% held out for testing)
(trainingData, testData) = w2vdf.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [17]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [18]:
# Make predictions.
predictions = model.transform(testData)

In [19]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.202955
Accuracy = 0.797045
