In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.2'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark




Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:13 http://ppa.launchpad.net/graph

In [2]:
# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"


In [3]:
# Start a SparkSession

import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [5]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Wireless_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Wireless_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...| 2015-08-31|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...|        Wireless|          4|    

In [6]:
from pyspark.ml.feature import  HashingTF, IDF, StringIndexer,Tokenizer,StopWordsRemover


In [7]:
from pyspark.sql.functions import length
#Create a length column to be used as a future feature
data_df = df.withColumn('length',length(df['product_title']))
data_df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|length|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...| 2015-08-31|    54|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie Stick Fibl...|     

In [8]:
select_data_df = data_df.select("review_id","product_title","star_rating","vine","length")
select_data_df.show(5)
select_data_df.na.drop()

+--------------+--------------------+-----------+----+------+
|     review_id|       product_title|star_rating|vine|length|
+--------------+--------------------+-----------+----+------+
|R3W4P9UBGNGH1U|LG G4 Case Hard T...|          2|   N|    54|
|R15V54KBMTQWAY|Selfie Stick Fibl...|          4|   N|   120|
| RY8I449HNXSVF|Tribe AB40 Water ...|          5|   N|   132|
|R18TLJYCKJFLSR|RAVPower® Element...|          5|   N|   399|
|R1NK26SWS53B8Q|Fosmon Micro USB ...|          5|   N|   134|
+--------------+--------------------+-----------+----+------+
only showing top 5 rows



DataFrame[review_id: string, product_title: string, star_rating: string, vine: string, length: int]

In [9]:
#Create all dataset features

vine_to_num = StringIndexer(inputCol='vine',outputCol='label')
vine_to_num.setHandleInvalid('skip')
tokenizer = Tokenizer(inputCol='product_title',outputCol='token_text')
remover = StopWordsRemover(inputCol='token_text',outputCol='stop_token')
hashingtf = HashingTF(inputCol='stop_token',outputCol='hash_token')
idf=IDF(inputCol='hash_token',outputCol='idf_token')


In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
#Create feature vectors
clean_up = VectorAssembler(inputCols = ['idf_token', 'length'],outputCol='features')

In [11]:
#Create and run a data processing pipelaine
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[vine_to_num,tokenizer,remover,hashingtf,idf,clean_up])

In [12]:
#Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(select_data_df)
cleaned = cleaner.transform(select_data_df)
cleaned.select('vine','label','features').show(10)

+----+-----+--------------------+
|vine|label|            features|
+----+-----+--------------------+
|   N|  0.0|(262145,[2437,102...|
|   N|  0.0|(262145,[6122,208...|
|   N|  0.0|(262145,[23090,31...|
|   N|  0.0|(262145,[228,1252...|
|   N|  0.0|(262145,[17893,25...|
|   N|  0.0|(262145,[2437,992...|
|   N|  0.0|(262145,[24346,26...|
|   N|  0.0|(262145,[48443,67...|
|   N|  0.0|(262145,[13540,31...|
|   N|  0.0|(262145,[12580,17...|
+----+-----+--------------------+
only showing top 10 rows



In [13]:
#Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7,0.3],21)


In [14]:
from pyspark.ml.classification import NaiveBayes
#Create a NaiveBayes model and fit the training data

nb = NaiveBayes()
predictor = nb.fit(training)

In [15]:
#Transform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------+--------------------+-----------+----+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     review_id|       product_title|star_rating|vine|length|label|          token_text|          stop_token|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------+--------------------+-----------+----+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|R1000147G0K1IW|CoverON® Hybrid D...|          5|   N|    91|  0.0|[coveron®, hybrid...|[coveron®, hybrid...|(262144,[2437,125...|(262144,[2437,125...|(262145,[2437,125...|[-506.90490574645...|[1.0,1.3539198120...|       0.0|
|R10005M68IK23V|Spigen Neo Hybrid...|          5|   N|   124|  0.0|[spigen, neo, hyb...|[spigen,

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

acc_eval=BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was :%f" % acc)

Accuracy of model at predicting reviews was :0.926073
