# Amazon Reviews - Model

In [61]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext()
from pyspark.sql import SparkSession, SQLContext
sqlContext = SQLContext(sc)
spark = SparkSession.builder.appName("amazon-reviews-project").getOrCreate()

In [62]:
#for now, only reading reviews for items in the "Kitchen" category
reviews = sqlContext.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Kitchen/")

***

## Data Extraction
Obtaining sentiment polarity from review string contents

In [63]:
reviews = reviews.na.fill({'review_body': '', 'review_headline': ''})

In [64]:
from pyspark.sql import Row
from pyspark.sql.functions import udf
from textblob import TextBlob

polarity = udf(lambda x: TextBlob(x).sentiment.polarity)

reviews = reviews.withColumn('headline_polarity', polarity('review_headline'))\
                 .withColumn('body_polarity', polarity('review_body'))

## Data Schema

In [5]:
reviews.count()

4882831

In [6]:
reviews.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = false)
 |-- review_body: string (nullable = false)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- headline_polarity: string (nullable = true)
 |-- body_polarity: string (nullable = true)



## Applying a filter - keeping only reviews with higher than 100 total votes received

In [69]:
reviews = reviews.filter(reviews.total_votes > 100)

In [70]:
reviews.count()

12989

# Creating "helpful?" variable
##### A review is helpful if at least 75% of 'total_votes' have been 'helpful_votes'.

In [71]:
import pyspark.sql.functions as f
reviews = reviews.withColumn("helpful-ratio", reviews.helpful_votes/reviews.total_votes)

In [72]:
reviews = reviews.withColumn("helpful?", f.when(reviews["helpful-ratio"] > 0.75, 1).otherwise(0))

In [73]:
reviews = reviews.withColumn("verified_purchase", f.when(reviews["verified_purchase"] == "Y", 1).otherwise(reviews.verified_purchase))
reviews = reviews.withColumn("verified_purchase", f.when(reviews["verified_purchase"] == "N", 0).otherwise(reviews.verified_purchase))
reviews = reviews.withColumn("vine", f.when(reviews["vine"] == "Y", 1).otherwise(reviews.vine))
reviews = reviews.withColumn("vine", f.when(reviews["vine"] == "N", 0).otherwise(reviews.vine))

In [37]:
reviews.take(1)

[Row(marketplace='US', product_parent='224029078', product_title='Preethi Eco Twin Jar Mixer Grinder, 550-Watt', star_rating=5, total_votes=4, vine=bytearray(b'0'), verified_purchase=bytearray(b'1'), headline_polarity=0.0, body_polarity=0.08665532618761063, helpful?=0, year_bkt=2.0)]

In [74]:
import pyspark.ml.evaluation as ev
from pyspark.ml import Pipeline
import pyspark.ml.regression as rg
import pyspark.sql.functions as f
import pyspark.ml.feature as feat
import pyspark.ml.classification as cl

In [75]:
# running bucketizer for pickup_longitude and adding it in the dataset
splits = [-float("inf"), 0, 5, float("inf")]

bucketizer = feat.Bucketizer(splits=splits, inputCol="year", outputCol="year_bkt")

reviews = bucketizer.transform(reviews)

In [76]:
reviews = reviews.drop('customer_id','review_id','product_id','parent_product','product title', 'helpful_votes', 'review_headline', 'review_body', 'review_date', 'year', 'helpful-ratio')

In [77]:
reviews.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- headline_polarity: string (nullable = true)
 |-- body_polarity: string (nullable = true)
 |-- helpful?: integer (nullable = false)
 |-- year_bkt: double (nullable = true)



In [78]:
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType  
reviews = reviews.withColumn("headline_polarity", reviews["headline_polarity"].cast(FloatType()))
reviews = reviews.withColumn("body_polarity", reviews["body_polarity"].cast(FloatType()))
reviews = reviews.withColumn("vine", reviews["vine"].cast(IntegerType()))
reviews = reviews.withColumn("verified_purchase", reviews["verified_purchase"].cast(IntegerType()))

In [79]:
reviews=reviews.drop('features') #removes the column 'features' if it already exists
#selects all numeric columns to be combined into column 'features'
Cols_to_Select = reviews["star_rating", "total_votes", "headline_polarity", "body_polarity", "year_bkt", "vine", "verified_purchase"]
assembler = feat.VectorAssembler(inputCols=Cols_to_Select.columns, outputCol="features") #creates the VectorAssembler object

In [80]:
# running the VectorAssembler transformation onto the dataframe to create the 'features' column
reviews=assembler.setHandleInvalid("skip").transform(reviews)

In [81]:
#splitting the data into train, test, and predict datasets
splitted_data = reviews.randomSplit([0.7, 0.3], 199)
train_data = splitted_data[0]
test_data = splitted_data[1]

In [82]:
# creating the logistic regression object 
logReg_obj = cl.LogisticRegression(
    labelCol="helpful?"
    , featuresCol = "features",
    maxIter=5
)
# using pipeline to run the logistic regression, plus all other objects intially created
pipeline = Pipeline(
    stages=[
        logReg_obj
    ])

pipelineModel = pipeline.fit(train_data) #running the model on training dataset


In [83]:
trainingSummary = pipelineModel.stages[-1].summary

print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

areaUnderROC: 0.8131373889645948


In [57]:
import pyspark.ml.evaluation as ev
#evaluating the model created against test dataset
results_logReg = (
    pipelineModel
    .transform(test_data)
    .select('helpful?', 'probability', 'prediction')
)



In [58]:
evaluator = ev.MulticlassClassificationEvaluator(
    predictionCol='prediction'
    , labelCol='helpful?')


In [59]:

(
    evaluator.evaluate(results_logReg)
    , evaluator.evaluate(
        results_logReg
        , {evaluator.metricName: 'weightedPrecision'}
    ) 
    , evaluator.evaluate(
        results_logReg
        , {evaluator.metricName: 'accuracy'}
    )
)

Py4JJavaError: An error occurred while calling o882.evaluate.
: org.apache.spark.SparkException: Job 16 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:972)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:970)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:970)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2286)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2193)
	at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:121)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.fpByClass$lzycompute(MulticlassMetrics.scala:53)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.fpByClass(MulticlassMetrics.scala:49)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:106)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.fMeasure(MulticlassMetrics.scala:124)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics$$anonfun$weightedFMeasure$2.apply(MulticlassMetrics.scala:216)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics$$anonfun$weightedFMeasure$2.apply(MulticlassMetrics.scala:215)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure$lzycompute(MulticlassMetrics.scala:215)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure(MulticlassMetrics.scala:215)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [60]:
spark.stop()