# Initialize Spark

In [1]:
import os
import findspark
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    "--repositories http://repo.hortonworks.com/content/groups/public/ "
    "--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 "
    " pyspark-shell")
findspark.init()

In [2]:
from pyspark import SQLContext, SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = SparkConf()
conf.setAppName("project2")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [3]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.appName("test2").getOrCreate()

# Data Extraction

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

schema = StructType([
    StructField("marketplace", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("review_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("product_parent", StringType(), True),
    StructField("product_title", StringType(), True), # "label" replaces "product_title"
    StructField("product_category", StringType(), True),
    StructField("star_rating", IntegerType(), True),
    StructField("helpful_votes", IntegerType(), True),
    StructField("total_votes", IntegerType(), True),
    StructField("vine", StringType(), True),
    StructField("verified_purchase", StringType(), True),
    StructField("review_headline", StringType(), True),
    StructField("review_body_raw", StringType(), True),
    StructField("review_date", StringType(), True)])

df_test = spark.read.csv('amazon_reviews_us_Grocery_v1_00.tsv', sep="\t", header=True, schema=schema)
#df_test = spark.read.csv('sample.csv', header=True, schema=schema)

In [5]:
df_test.count()

2402458

In [6]:
df_test = df_test.dropna()
df_test.count()
# only retain product_title and review_body
dft = df_test.drop('marketplace','customer_id','review_id','product_id','product_parent','product_category','star_rating','helpful_votes','total_votes','vine','verified_purchase','review_headline','review_date')

In [7]:
dft.count()

2402211

In [8]:
dft2 = dft.select('*').where('product_title is not null or review_body_raw is not null or product_title <> "" or review_body_raw <> "" ')

In [9]:
dft2.count()

2402211

In [10]:
dft.show(50)

+--------------------+--------------------+
|       product_title|     review_body_raw|
+--------------------+--------------------+
|The Cravings Plac...|As a family aller...|
|Mauna Loa Macadam...|My favorite nut. ...|
|Organic Matcha Gr...|This green tea ta...|
|15oz Raspberry Ly...|I love Melissa's ...|
|Stride Spark Kine...|                good|
|Herr's Popcorn Ho...|The popcorn was s...|
|Larabar uber, 1.4...|Love these bars, ...|
|Shirakiku Soba No...|Love the taste bu...|
|Jif Chocolate Nut...|I'm a member of t...|
|Orgain Organic Pl...|Used to be a dece...|
|Bragg - All Natur...|I cannot tell the...|
|Wholesome Sweeten...|Good flavor and s...|
|Kadoya Pure Sesam...|Great to use in r...|
|Nishiki Premium B...|It's rice. Have e...|
|Everly Passion Fr...|Very good tasting...|
|Charms Blue Razzb...|They were perfect...|
|Food Should Taste...|Wow, these are so...|
|Skippy Creamy Pea...|I bought this fro...|
|Celestial Seasoni...|I love this tea, ...|
|Nutiva Organic Vi...|I have use

In [11]:
# filter out reviews with less than 200 characters to remove
# irrelevant or weak samples
dft2 = dft2.select('*').where('length(review_body_raw)>200')

In [12]:
dft2.count()

912170

In [13]:
dft2.show(20)

+--------------------+--------------------+
|       product_title|     review_body_raw|
+--------------------+--------------------+
|The Cravings Plac...|As a family aller...|
|Jif Chocolate Nut...|I'm a member of t...|
|Orgain Organic Pl...|Used to be a dece...|
|Bragg - All Natur...|I cannot tell the...|
|Skippy Creamy Pea...|I bought this fro...|
|Celestial Seasoni...|I love this tea, ...|
|Nutiva Organic Vi...|I have used servo...|
|Hershey's Hallowe...|This is a variety...|
|LifeSavers Hard W...|LifeSavers brand ...|
|Amoretti Premium ...|This product is b...|
|Organic Cotton Ca...|My kids and I are...|
|80pk White Coffee...|I am disappointed...|
|Sorbee Sugar Free...|These are really ...|
|V8 V-Fusion Peach...|Who knew you coul...|
|Natural Touch Kaf...|I have used Kaffr...|
|Jif Chocolate Nut...|Im a choosie Moth...|
|Fiber One Chewy B...|I needed more fib...|
|Gerber Good Start...|Like most working...|
|Eatsmart Naturals...|I LOVE cheese puf...|
|Jelly Belly Bean ...|Played the

In [14]:
# rank products by counts

In [15]:
from pyspark.sql.functions import col
dft2.groupBy("product_title") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(20)

+--------------------+-----+
|       product_title|count|
+--------------------+-----+
|San Francisco Bay...| 5738|
|Viva Naturals Org...| 3598|
| Davidson's Tea Bulk| 2604|
|Nutiva Organic Vi...| 2562|
|Amazing Grass Gre...| 2057|
|Ekobrew Coffee Re...| 1980|
|Brooklyn Beans Si...| 1558|
|  Senseo Coffee Pods| 1265|
|Tuscan Dairy Whol...| 1231|
|Celestial Seasoni...| 1196|
|Keurig, The Origi...| 1168|
|Grove Square Capp...| 1136|
|Reese's Spreads P...| 1133|
|Timothy's World C...| 1043|
|Keurig Green Moun...| 1017|
|Matcha Green Tea ...|  997|
|Vita Coco Coconut...|  989|
|Nutiva Hi Fiber H...|  977|
|Surge Citrus Flav...|  945|
|YumEarth Organic ...|  909|
+--------------------+-----+
only showing top 20 rows



In [16]:
dft3 = dft2
dft3.show(20)

+--------------------+--------------------+
|       product_title|     review_body_raw|
+--------------------+--------------------+
|The Cravings Plac...|As a family aller...|
|Jif Chocolate Nut...|I'm a member of t...|
|Orgain Organic Pl...|Used to be a dece...|
|Bragg - All Natur...|I cannot tell the...|
|Skippy Creamy Pea...|I bought this fro...|
|Celestial Seasoni...|I love this tea, ...|
|Nutiva Organic Vi...|I have used servo...|
|Hershey's Hallowe...|This is a variety...|
|LifeSavers Hard W...|LifeSavers brand ...|
|Amoretti Premium ...|This product is b...|
|Organic Cotton Ca...|My kids and I are...|
|80pk White Coffee...|I am disappointed...|
|Sorbee Sugar Free...|These are really ...|
|V8 V-Fusion Peach...|Who knew you coul...|
|Natural Touch Kaf...|I have used Kaffr...|
|Jif Chocolate Nut...|Im a choosie Moth...|
|Fiber One Chewy B...|I needed more fib...|
|Gerber Good Start...|Like most working...|
|Eatsmart Naturals...|I LOVE cheese puf...|
|Jelly Belly Bean ...|Played the

In [17]:
# lower case all

In [18]:
from pyspark.sql.functions import lower, col

dft3 = dft3.withColumn("review_body", lower(col("review_body_raw")))
dft3 = dft3.selectExpr("product_title as product_title", "review_body as review_body")  

In [19]:
dft3.show(3)

+--------------------+--------------------+
|       product_title|         review_body|
+--------------------+--------------------+
|The Cravings Plac...|as a family aller...|
|Jif Chocolate Nut...|i'm a member of t...|
|Orgain Organic Pl...|used to be a dece...|
+--------------------+--------------------+
only showing top 3 rows



In [20]:
# count product instances to filter out low frequency products

In [21]:
import pyspark.sql.functions as F
from pyspark.sql import Window 
counts = dft3.groupBy('product_title').count()
counts = counts.selectExpr("product_title as product_title_tmp", "count as count")  

In [22]:
counts.show(1000)

+--------------------+-----+
|   product_title_tmp|count|
+--------------------+-----+
|Eatsmart Naturals...|    6|
|Caps for Keurig K...|    5|
|40 Count - Cake B...|    2|
|Wonka Laffy TaffyJar|   99|
|Banquet, Morning ...|    1|
|Torani Sugar Free...|  195|
|Dogswell, Happy H...|    3|
|Golden Barrel Bla...|   17|
|Juanitas H And S ...|    6|
|Aikane Kona Coffe...|    1|
|Lenny & Larry's t...|    1|
|Philippine Brand ...|   52|
|Carbon's Golden M...|   11|
| Hershey's Chocolate|  112|
|Kashi Crunchy Hon...|    2|
|BetterBody Foods ...|   46|
|The Spice Lab's P...|  254|
|Brooke Bond Taj M...|    5|
|Wake the F'Up Unc...|   24|
|Keurig Starbucks ...|    5|
|Iris Gummies Hemp...|    3|
|Organic Valley - ...|    3|
|GRASS FED, NON-GM...|    1|
|Lance Peanut Bar ...|    3|
|Stash Tea Single-...|    9|
|The Reaper Puree ...|   17|
|C. Howard Violet ...|   53|
|Italian Black Win...|    8|
|Johnnys Potatoes ...|    1|
|Sunsweet Lighter ...|    1|
|Anchovies Monte P...|    1|
|Carnation Bre

In [23]:
dft3 = dft3.join(counts, dft3.product_title == counts.product_title_tmp)
dft3 = dft3.drop('product_title_tmp')

In [24]:
dft3.count()

912170

In [25]:
dft3 = dft3.select('*').where("count > 1000")
dft3.show(100)

+--------------------+--------------------+-----+
|       product_title|         review_body|count|
+--------------------+--------------------+-----+
|Grove Square Capp...|was so excited to...| 1136|
|Grove Square Capp...|these are okay bu...| 1136|
|Grove Square Capp...|overall the vanil...| 1136|
|Grove Square Capp...|i enjoyed these<b...| 1136|
|Grove Square Capp...|how can this be c...| 1136|
|Grove Square Capp...|we really liked t...| 1136|
|Grove Square Capp...|my husband is alw...| 1136|
|Grove Square Capp...|nice flavor very ...| 1136|
|Grove Square Capp...|personally, i did...| 1136|
|Grove Square Capp...|love all the grov...| 1136|
|Grove Square Capp...|i really enjoy th...| 1136|
|Grove Square Capp...|this is my absolu...| 1136|
|Grove Square Capp...|waaayyy too sweet...| 1136|
|Grove Square Capp...|love, love, love!...| 1136|
|Grove Square Capp...|this product arri...| 1136|
|Grove Square Capp...|fantastic, they a...| 1136|
|Grove Square Capp...|i've had previous...| 1136|


In [26]:
dft3.count()

29286

In [27]:
# To solve class imbalance, undersample the overrepresented classes
# All products have 300+ counts at this point

In [28]:
from pyspark.sql.functions import col
dft3.groupBy("product_title") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(20)

+--------------------+-----+
|       product_title|count|
+--------------------+-----+
|San Francisco Bay...| 5738|
|Viva Naturals Org...| 3598|
| Davidson's Tea Bulk| 2604|
|Nutiva Organic Vi...| 2562|
|Amazing Grass Gre...| 2057|
|Ekobrew Coffee Re...| 1980|
|Brooklyn Beans Si...| 1558|
|  Senseo Coffee Pods| 1265|
|Tuscan Dairy Whol...| 1231|
|Celestial Seasoni...| 1196|
|Keurig, The Origi...| 1168|
|Grove Square Capp...| 1136|
|Reese's Spreads P...| 1133|
|Timothy's World C...| 1043|
|Keurig Green Moun...| 1017|
+--------------------+-----+



In [29]:
counts_by_product = dft3.groupBy("product_title") \
    .count()# \
    #.orderBy(col("count").desc()) \
    #.limit(30) \
    #.show(30)

In [30]:
counts_by_product.show(30)

+--------------------+-----+
|       product_title|count|
+--------------------+-----+
|Grove Square Capp...| 1136|
|Keurig, The Origi...| 1168|
|Ekobrew Coffee Re...| 1980|
|Viva Naturals Org...| 3598|
|Timothy's World C...| 1043|
| Davidson's Tea Bulk| 2604|
|Tuscan Dairy Whol...| 1231|
|Reese's Spreads P...| 1133|
|San Francisco Bay...| 5738|
|Keurig Green Moun...| 1017|
|Brooklyn Beans Si...| 1558|
|  Senseo Coffee Pods| 1265|
|Amazing Grass Gre...| 2057|
|Celestial Seasoni...| 1196|
|Nutiva Organic Vi...| 2562|
+--------------------+-----+



In [31]:
# sample min_count observations from all classes

In [32]:
min_count = counts_by_product.agg({"count": "min"}).collect()[0][0]
print(min_count)

1017


In [33]:
from pyspark.sql import Window
from pyspark.sql.functions import col, rand, row_number
import pyspark.sql.functions as sql_functions

# Take a random sample from each wiki's entries, with a sample size
# equal to the size of the smallest corpus.
w = Window.partitionBy(col("product_title")).orderBy(col("rnd_"))

sampled = (dft3
    .withColumn("rnd_", rand())  # Add random numbers column
    .withColumn("rn_", row_number().over(w))  # Add rowNumber over window
    .where(col("rn_") <= min_count)  # Take n observations
    .drop("rn_")  # drop helper columns
    .drop("rnd_"))

# sampled.show(5)
sampled.groupBy("product_title").count().show()

+--------------------+-----+
|       product_title|count|
+--------------------+-----+
|Grove Square Capp...| 1017|
|Keurig, The Origi...| 1017|
|Ekobrew Coffee Re...| 1017|
|Viva Naturals Org...| 1017|
|Timothy's World C...| 1017|
| Davidson's Tea Bulk| 1017|
|Tuscan Dairy Whol...| 1017|
|Reese's Spreads P...| 1017|
|San Francisco Bay...| 1017|
|Keurig Green Moun...| 1017|
|Brooklyn Beans Si...| 1017|
|  Senseo Coffee Pods| 1017|
|Amazing Grass Gre...| 1017|
|Celestial Seasoni...| 1017|
|Nutiva Organic Vi...| 1017|
+--------------------+-----+



In [34]:
# note: the count column below is not the actual count after undersampling
# it reflects previous counts because the count used for undersampling
# was not saved as a column

In [35]:
sampled.show(5000)

+--------------------+--------------------+-----+
|       product_title|         review_body|count|
+--------------------+--------------------+-----+
|Grove Square Capp...|i did not care fo...| 1136|
|Grove Square Capp...|i live in a famil...| 1136|
|Grove Square Capp...|i just tried this...| 1136|
|Grove Square Capp...|this was not a ba...| 1136|
|Grove Square Capp...|pros: it works in...| 1136|
|Grove Square Capp...|i had read other ...| 1136|
|Grove Square Capp...|this is a great t...| 1136|
|Grove Square Capp...|i like my coffee ...| 1136|
|Grove Square Capp...|i absolutely love...| 1136|
|Grove Square Capp...|i can define only...| 1136|
|Grove Square Capp...|i bought this pro...| 1136|
|Grove Square Capp...|my husband and i ...| 1136|
|Grove Square Capp...|unfortunately i d...| 1136|
|Grove Square Capp...|i am not a big co...| 1136|
|Grove Square Capp...|i hate coffee but...| 1136|
|Grove Square Capp...|i bought this to ...| 1136|
|Grove Square Capp...|these are pretty ...| 1136|


# Model Pipeline

In [36]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="review_body", outputCol="words", pattern = "[^A-Za-z]+", toLowercase=True)


In [37]:
# define more words to remove with nltk library
import nltk
nltk.download('wordnet')

[nltk_data] Downloading package wordnet to
[nltk_data]     /home/alexdziena/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [38]:
# adverbs and adjectives have less predicting power, if any, in most cases
# remove adverbs and adjectives from data

In [39]:
from nltk.tokenize import word_tokenize
import nltk.corpus
import re
from nltk.stem.snowball import SnowballStemmer
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import wordnet as wn

list_adv = []
list_adj = []
list_n = []

for s in wn.all_synsets():
    if s.pos() in ['r']: # if synset is adverb
        for i in s.lemmas(): # iterate through lemmas for each synset
            list_adv.append(i.name())
    elif s.pos() in ['a']:
        for i in s.lemmas(): # iterate through lemmas for each synset
            list_adj.append(i.name())
    elif s.pos() in ['n']: # if synset is noun
        for i in s.lemmas(): # iterate through lemmas for each synset
            list_n.append(i.name())
       

In [40]:
# remove stop words and irrelevant words
add_stopwords = ["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "should", "now", "ve", "se", "didn", "hasn", "hadn", "hasnt", "isnt", "havent", "although", "despite", "however" ]
add_irrelevantwords = ["poor", "perfect", "good", "excellent", "excelent" ,"great", "horrible", "cheap", "expensive", "different", "awesome"]
single_alphabet = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"]

# word_filter is a concatenated string of all unnecessary words
word_filter = add_stopwords + add_irrelevantwords + single_alphabet + list_adv + list_adj
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(word_filter)

In [41]:
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [42]:
# combine processes into a pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "product_title", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])


In [43]:
# run data through the pipeline
pipelineFit_t = pipeline.fit(sampled)
dataset_t = pipelineFit_t.transform(sampled)
dataset_t.show(5)
dataset_final = dataset_t.select('*').where('label < 10')

+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-----+
|       product_title|         review_body|count|               words|            filtered|            features|label|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-----+
|Grove Square Capp...|i did not care fo...| 1136|[i, did, not, car...|[care, product, c...|(5768,[0,4,6,24,2...|  8.0|
|Grove Square Capp...|i live in a famil...| 1136|[i, live, in, a, ...|[family, coffee, ...|(5768,[0,7,11,20,...|  8.0|
|Grove Square Capp...|i just tried this...| 1136|[i, just, tried, ...|[tried, product, ...|(5768,[2,6,8,13,1...|  8.0|
|Grove Square Capp...|this was not a ba...| 1136|[this, was, not, ...|[overall, tasting...|(5768,[0,20,35,45...|  8.0|
|Grove Square Capp...|pros: it works in...| 1136|[pros, it, works,...|[pros, works, keu...|(5768,[0,1,4,5,10...|  8.0|
+--------------------+--------------------+-----

In [44]:
# drop intermediate steps to clean up data
data = dataset_final.drop('count', 'words', 'filtered')
data.show(3)
#(trainingData_t, testData_t) = data.randomSplit([0.7, 0.3], seed = 100)
#print("Training Dataset 1 Count: " + str(trainingData_t.count()))
#print("Test Dataset Count: " + str(testData_t.count()))

+--------------------+--------------------+--------------------+-----+
|       product_title|         review_body|            features|label|
+--------------------+--------------------+--------------------+-----+
|Grove Square Capp...|i did not care fo...|(5768,[0,4,6,24,2...|  8.0|
|Grove Square Capp...|i live in a famil...|(5768,[0,7,11,20,...|  8.0|
|Grove Square Capp...|i just tried this...|(5768,[2,6,8,13,1...|  8.0|
+--------------------+--------------------+--------------------+-----+
only showing top 3 rows



In [45]:
#from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
#evaluator.evaluate(predictions)

# Feature Selection

In [46]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

In [47]:
# use chi square selector to select the most effective features
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features", outputCol="selectedFeatures", labelCol="label")

result = selector.fit(data).transform(data)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()

ChiSqSelector output with top 10 features selected
+--------------------+--------------------+--------------------+-----+--------------------+
|       product_title|         review_body|            features|label|    selectedFeatures|
+--------------------+--------------------+--------------------+-----+--------------------+
|Grove Square Capp...|i did not care fo...|(5768,[0,4,6,24,2...|  8.0|(10,[0,5],[2.0,2.0])|
|Grove Square Capp...|i live in a famil...|(5768,[0,7,11,20,...|  8.0|(10,[0,6],[2.0,1.0])|
|Grove Square Capp...|i just tried this...|(5768,[2,6,8,13,1...|  8.0|(10,[2,5,7],[1.0,...|
|Grove Square Capp...|this was not a ba...|(5768,[0,20,35,45...|  8.0|      (10,[0],[1.0])|
|Grove Square Capp...|pros: it works in...|(5768,[0,1,4,5,10...|  8.0|(10,[0,1,4,9],[3....|
|Grove Square Capp...|i had read other ...|(5768,[10,17,23,4...|  8.0|      (10,[9],[1.0])|
|Grove Square Capp...|this is a great t...|(5768,[6,13,20,36...|  8.0|      (10,[5],[1.0])|
|Grove Square Capp...|i like 

In [48]:
selected_data = result.drop('features')
selected_data = selected_data.selectExpr("product_title as product_title", "review_body as review_body", "selectedFeatures as features", "label as label")
selected_data.show(3)
(trainingData_t, testData_t) = selected_data.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset 1 Count: " + str(trainingData_t.count()))
print("Test Dataset Count: " + str(testData_t.count()))

+--------------------+--------------------+--------------------+-----+
|       product_title|         review_body|            features|label|
+--------------------+--------------------+--------------------+-----+
|Grove Square Capp...|i did not care fo...|(10,[0,5],[2.0,2.0])|  8.0|
|Grove Square Capp...|i live in a famil...|(10,[0,6],[2.0,1.0])|  8.0|
|Grove Square Capp...|i just tried this...|(10,[2,5,7],[1.0,...|  8.0|
+--------------------+--------------------+--------------------+-----+
only showing top 3 rows

Training Dataset 1 Count: 7206
Test Dataset Count: 2964


In [49]:
from pyspark.ml.classification import LogisticRegression

lrt = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0.8)
lrModelt = lrt.fit(trainingData_t)
predictions = lrModelt.transform(testData_t)
predictions.filter(predictions['prediction'] == 0) \
    .select("product_title","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 5, truncate = 30)

+------------------------------+------------------------------+-----+----------+
|                 product_title|                   probability|label|prediction|
+------------------------------+------------------------------+-----+----------+
|Grove Square Cappuccino, Si...|[0.10290129960212455,0.1016...|  8.0|       0.0|
|Grove Square Cappuccino, Si...|[0.10290129960212455,0.1016...|  8.0|       0.0|
|Grove Square Cappuccino, Si...|[0.10290129960212455,0.1016...|  8.0|       0.0|
|Grove Square Cappuccino, Si...|[0.10290129960212455,0.1016...|  8.0|       0.0|
|Grove Square Cappuccino, Si...|[0.10290129960212455,0.1016...|  8.0|       0.0|
+------------------------------+------------------------------+-----+----------+
only showing top 5 rows



# Cross Validation for hyperparameter tuning

In [50]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

paramGrid = (ParamGridBuilder()
             .addGrid(lrt.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lrt.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# 5-fold CrossValidator
cv = CrossValidator(estimator=lrt, estimatorParamMaps=paramGrid,evaluator=MulticlassClassificationEvaluator(), numFolds=5)
cvModel = cv.fit(trainingData_t)
predictions_cv = cvModel.transform(testData_t)

In [51]:

selected = predictions_cv.select("product_title", "review_body", "probability", "label", "prediction")

evaluator.evaluate(predictions_cv)

0.37259774229121634

In [52]:
selected.count()

2964

In [53]:
selected.show(1000)

+--------------------+--------------------+--------------------+-----+----------+
|       product_title|         review_body|         probability|label|prediction|
+--------------------+--------------------+--------------------+-----+----------+
|Grove Square Capp...|1 star for shippi...|[0.08364765774851...|  8.0|       7.0|
|Grove Square Capp...|80 calories and o...|[0.09739090945340...|  8.0|       4.0|
|Grove Square Capp...|[[asin:b005k4q37u...|[0.03358936528190...|  8.0|       2.0|
|Grove Square Capp...|a &#34;flavored&#...|[0.06720578864222...|  8.0|       1.0|
|Grove Square Capp...|absolutely delici...|[0.07274400438263...|  8.0|       1.0|
|Grove Square Capp...|absolutely love t...|[0.09422981354997...|  8.0|       3.0|
|Grove Square Capp...|after drinking th...|[0.08031046712234...|  8.0|       8.0|
|Grove Square Capp...|after finding out...|[0.07649811562134...|  8.0|       9.0|
|Grove Square Capp...|after visiting us...|[0.06720578864222...|  8.0|       1.0|
|Grove Square Ca

In [54]:
dft3.where(dft3.product_title.contains("Grove Square Capp")).count()

1136

In [55]:
selected.select('product_title').distinct().count()

10

In [56]:
selected.select('label').distinct().count()

10

In [57]:
distinct_pts = selected.alias('distinct_pts').selectExpr('product_title as predicted_pt', 'label as lookup_label').distinct()
selected_pts = selected.alias('selected_pts').selectExpr('product_title as true_pt', 'prediction', 'label as true_label')
selected_pts.join(distinct_pts, selected_pts.prediction == distinct_pts.lookup_label, 'left').orderBy(rand()).show(1000) 

+--------------------+----------+----------+--------------------+------------+
|             true_pt|prediction|true_label|        predicted_pt|lookup_label|
+--------------------+----------+----------+--------------------+------------+
|Timothy's World C...|       8.0|       3.0|Grove Square Capp...|         8.0|
|Celestial Seasoni...|       0.0|       6.0| Davidson's Tea Bulk|         0.0|
|Celestial Seasoni...|       0.0|       6.0| Davidson's Tea Bulk|         0.0|
|Keurig Green Moun...|       4.0|       5.0|Nutiva Organic Vi...|         4.0|
|Grove Square Capp...|       1.0|       8.0|Keurig, The Origi...|         1.0|
|Brooklyn Beans Si...|       3.0|       7.0|Timothy's World C...|         3.0|
| Davidson's Tea Bulk|       6.0|       0.0|Celestial Seasoni...|         6.0|
|Keurig Green Moun...|       4.0|       5.0|Nutiva Organic Vi...|         4.0|
|Tuscan Dairy Whol...|       9.0|       9.0|Tuscan Dairy Whol...|         9.0|
| Davidson's Tea Bulk|       0.0|       0.0| Davidso

In [None]:
# random forest - dropped due to low accuracy
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
rfmodel = rf.fit(trainingData_t)
predictions_rf = rfmodel.transform(testData_t)
predictions_rf.select("predictedLabel", "label", "features").show(5)
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_rf)
print("Accuracy = %g" % (accuracy))