# 1. Initialize Spark and the SparkSession

In [71]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('Spark Lab1')
sc = SparkContext(conf=conf)

spark = SparkSession \
    .builder \
    .appName("Spark Lab1") \
    .config("spark.executor.memory", "10g") \
    .getOrCreate()
spark

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Spark Lab1, master=local[*]) created by __init__ at <ipython-input-1-482db03b4247>:7 

# 2. Import data and plot initial data information

In [72]:
import matplotlib.pyplot as plt
plt.figure(figsize=(10,10))
dataDF = spark.read.options(sep='\t',header=True).csv('amazon_reviews_us_Wireless_v1_00.tsv')
print("Total Dataset Length: {}".format(dataDF.count()))
dataDF.createOrReplaceTempView("reviewData")
reviewData = spark.sql("SELECT * FROM reviewData LIMIT 100000") 
print("Active Dataset Length: {}".format(reviewData.count()))
reviewData.show()
#https://www.amazon.com/gp/vine/help

Total Dataset Length: 9002021
Active Dataset Length: 100000
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...| 2015-08-31|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     5168

<Figure size 720x720 with 0 Axes>

In [7]:
reviewData.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



# 3. Drop uninteresting columns and rows with missing data or error in input. Also filter out rows without verified purchase and drop the verified purchase column. Also convert vine column to integers instead of Y/N

In [73]:
# Drop the marketplace column and product_category column since they are the same for every product

reviewData = reviewData.drop("marketplace","product_category").dropna().dropDuplicates()

reviewData = reviewData.filter(reviewData.verified_purchase == "Y")
reviewData = reviewData.drop("verified_purchase")


reviewData_cleared = reviewData.replace(["Y","N"],["1","0"],"vine")
reviewData = reviewData_cleared.withColumn("vine",reviewData_cleared.vine.cast("int"))

reviewData = reviewData.withColumn("star_rating", reviewData.star_rating.cast("int"))
reviewData = reviewData.withColumn("helpful_votes", reviewData.helpful_votes.cast("int"))
reviewData = reviewData.withColumn("total_votes", reviewData.total_votes.cast("int"))

reviewData.show()

+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+--------------------+--------------------+-----------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|     review_headline|         review_body|review_date|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+--------------------+--------------------+-----------+
|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|          2|            1|          3|   0|Looks good, funct...|2 issues  -  Once...| 2015-08-31|
|   15184378| RY8I449HNXSVF|B00SXRXUKO|     984297154|Tribe AB40 Water ...|          5|            0|          0|   0|          Five Stars|  Fits iPhone 6 well| 2015-08-31|
|   10203548|R18TLJYCKJFLSR|B009V5X1CE|     279912704|RAVPower® Element...|          5|            0|          0|   0|       Great char

In [91]:
test = reviewData_cleared.groupBy("product_id").count().sort("count",ascending=False)
test.show()
test2 = test.filter(test["count"] >= 5)
test.take(1)

+----------+-----+
|product_id|count|
+----------+-----+
|B00QN1T6NM|  287|
|B00LBK7OSY|  236|
|B00KWR8ME2|  167|
|B005X1Y7I2|  158|
|B00MQSMEEE|  148|
|B00JRGOKQ8|  119|
|B00NH13YHK|  118|
|B00K4VQZCM|  114|
|B00UH8KKA0|  103|
|B00UCZGS6S|   97|
|B00OT6YUIY|   92|
|B00N0YUKEO|   92|
|B00OJE1SG8|   86|
|B00NH131LY|   83|
|B00WUDX250|   83|
|B009USAJCC|   78|
|B00OQ19QYA|   73|
|B00P936188|   73|
|B00JM59JPG|   72|
|B00PGJWYJ0|   72|
+----------+-----+
only showing top 20 rows



[Row(product_id='B00QN1T6NM', count=287)]

# 4. Split review column into list and perform various filtering/cleaning

In [78]:
import pyspark.sql.functions as sqlFunc


# make review_body lower case
reviewData_temp = reviewData.withColumn("review_body",sqlFunc.lower(sqlFunc.col("review_body")))


# Clean data using various regex replacements
# 1) Removing non-letters
reviewData_temp = reviewData_temp.withColumn("review_body",sqlFunc.regexp_replace("review_body","[^a-zA-Z\\s]",""))
# 2) Removing repetitive spaces
reviewData_temp = reviewData_temp.withColumn("review_body",sqlFunc.regexp_replace("review_body","\s{2,}"," "))
# 3) Removing spaces at start of word
reviewData_temp = reviewData_temp.withColumn("review_body",sqlFunc.regexp_replace("review_body","^(\s)",""))
# Split review_body on space
split_col = sqlFunc.split(reviewData_temp.review_body, ' ')
# Replace " " with ""
reviewData_cleaned = reviewData_temp.withColumn("review_body", split_col).replace([" "],[""],"review_body")
reviewData_cleaned.show()
#treebankTagger = nltk.data.load('taggers/maxent_treebank_pos_tagger/english.pickle')


#def tag_df (s):
#    return treebankTagger.tag(s)


#tag_udf = udf(tag_df,ArrayType(ArrayType(StringType())))

#testCol = testRdd.withColumn("review_tagged",tag_udf(testRdd.review_body))


#print(testCol.show())


+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+--------------------+--------------------+-----------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|vine|     review_headline|         review_body|review_date|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+----+--------------------+--------------------+-----------+
|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|          2|            1|          3|   0|Looks good, funct...|[issues, once, i,...| 2015-08-31|
|   15184378| RY8I449HNXSVF|B00SXRXUKO|     984297154|Tribe AB40 Water ...|          5|            0|          0|   0|          Five Stars|[fits, iphone, well]| 2015-08-31|
|   10203548|R18TLJYCKJFLSR|B009V5X1CE|     279912704|RAVPower® Element...|          5|            0|          0|   0|       Great char

In [102]:
import nltk
from pyspark.sql.types import ArrayType, DoubleType, StringType
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import collect_list,col, udf
from nltk.sentiment.vader import SentimentIntensityAnalyzer 

# Initialize lemmatizer
lemmatizer = nltk.wordnet.WordNetLemmatizer()
# Initialize Sentiment analyzer 
sent = SentimentIntensityAnalyzer()

def lemmatizeUDF(revs): 
    return [[lemmatizer.lemmatize(w) for w in s] for s in revs]

def sentimentUDF(revs):
    return [sent.polarity_scores(" ".join(rev))["compound"] for rev in revs]

lemmatize_udf = udf(lemmatizeUDF, ArrayType(ArrayType(StringType())))
sentiment_udf = udf(sentimentUDF,ArrayType(DoubleType()))

# Get product id with reviews
idWithReviews = reviewData_cleaned.select("product_id","review_body")

# Remove Stop Words from reviews
stopWordRemover = StopWordsRemover(inputCol="review_body", outputCol="review_noStop")#,stopWords = stopWords)
stopWordsRemoved = stopWordRemover.transform(reviewData_cleaned)

# Product ID with the aggregated list of reviews
idsWithAllReviews = stopWordsRemoved.groupBy("product_id").agg(collect_list('review_noStop').alias("review_body_agg"))

# Perform Lemmatization on all reviews
idsWithAllReviews = idsWithAllReviews.withColumn("review_body_agg", lemmatize_udf(idsWithAllReviews.review_body_agg))

# Calculate the compound sentiment score for all reviews
idsWithAllReviews = idsWithAllReviews.withColumn("review_sentiments", sentiment_udf(idsWithAllReviews.review_body_agg))


idsWithAllReviews.show()



+----------+--------------------+--------------------+
|product_id|     review_body_agg|   review_sentiments|
+----------+--------------------+--------------------+
|0594033926|[[described, deli...|               [0.0]|
|0594459451|[[work, nook, col...|               [0.0]|
|0594481902|[[bought, item, r...|[-0.7351, 0.0, 0....|
|059448426X|[[best, nook, cov...|            [0.1779]|
|1059241536|[[poor, broke, day]]|           [-0.7096]|
|1059241692|[[hoping, allow, ...|             [0.836]|
|1059246147|[[cord, received,...|            [0.3612]|
|1059275171|[[worked, several...|           [-0.2263]|
|1059356937|            [[love]]|            [0.6369]|
|1059359189|[[good, broken, i...|   [-0.0516, 0.6249]|
|1059366363|[[good, quality, ...|  [0.8332, 0.0, 0.0]|
|1059451522|[[bought, family,...|               [0.0]|
|1059641682|[[simple, easy, r...|            [0.4404]|
|1059656000|[[exactly, descri...|               [0.0]|
|1059742772|[[charger, work, ...|            [0.5106]|
|105984410

In [98]:
from pyspark.ml.feature import NGram

flattened = idsWithAllReviews.withColumn("review_flat", sqlFunc.flatten("review_body_agg"))

ngram = NGram(n=2, inputCol= "review_flat", outputCol="ngrams")
ngramDF = ngram.transform(flattened)

ngramDF.show()


+----------+--------------------+--------------------+--------------------+
|product_id|     review_body_agg|         review_flat|              ngrams|
+----------+--------------------+--------------------+--------------------+
|0594033926|[[described, deli...|[described, deliv...|[described delive...|
|0594459451|[[work, nook, col...|[work, nook, colo...|[work nook, nook ...|
|0594481902|[[bought, item, r...|[bought, item, re...|[bought item, ite...|
|059448426X|[[best, nook, cov...|[best, nook, cove...|[best nook, nook ...|
|1059241536|[[poor, broke, day]]|  [poor, broke, day]|[poor broke, brok...|
|1059241692|[[hoping, allow, ...|[hoping, allow, c...|[hoping allow, al...|
|1059246147|[[cord, received,...|[cord, received, ...|[cord received, r...|
|1059275171|[[worked, several...|[worked, several,...|[worked several, ...|
|1059356937|            [[love]]|              [love]|                  []|
|1059359189|[[good, broken, i...|[good, broken, it...|[good broken, bro...|
|1059366363|

In [101]:
t = ngramDF.filter(ngramDF.product_id =="B00QN1T6NM").select(explode("ngrams").alias("ngrams")).groupBy("ngrams").count().orderBy("count",ascending=False)
t.show(n=10,truncate=False)

+----------------+-----+
|ngrams          |count|
+----------------+-----+
|screen protector|136  |
|easy install    |25   |
|phone screen    |20   |
|dropped phone   |20   |
|glass screen    |17   |
|easy apply      |15   |
|easy put        |15   |
|air bubble      |14   |
|great product   |12   |
|tempered glass  |12   |
+----------------+-----+
only showing top 10 rows



In [92]:
t = ngramDF.filter(ngramDF.product_id =="B00QN1T6NM").select(explode("review_flat").alias("review_flat")).groupBy("review_flat").count().orderBy("count",ascending=False)
t.show()

+-----------+-----+
|review_flat|count|
+-----------+-----+
|     screen|  278|
|  protector|  195|
|      phone|  149|
|       easy|   88|
|      great|   83|
|        one|   70|
|      glass|   59|
|    product|   57|
|     iphone|   46|
|     bubble|   41|
|        put|   39|
|       case|   37|
|       good|   36|
|    install|   35|
|       like|   34|
|       time|   31|
|       edge|   30|
|    dropped|   30|
|     really|   27|
|      crack|   27|
+-----------+-----+
only showing top 20 rows



In [84]:
from pyspark.mllib.feature import Word2Vec

inp = ngramDF.filter(ngramDF.product_id =="B00QN1T6NM").select("review_flat").rdd.map(lambda x: x.review_flat)


word2vec = Word2Vec()
model = word2vec.fit(inp)

synonyms = model.findSynonyms("look", 10)

for word, cosine_distance in synonyms:
    print("{}: {}".format(word, cosine_distance))



easy: 0.7581527233123779
product: 0.753288745880127
like: 0.7510282397270203
protector: 0.7502449154853821
easily: 0.7421942353248596
wipe: 0.7393692135810852
job: 0.7382448315620422
came: 0.737947940826416
never: 0.7375704646110535
still: 0.7339732050895691


In [19]:
from pyspark.sql.functions import explode, array

t = flattened.filter(flattened.product_id == "B00QN1T6NM").select(flattened.product_id,explode(flattened.review_body_agg).alias("reviews"))
#t.show()
ngram2 = NGram(n=2, inputCol= "reviews", outputCol="ngrams")
n = ngram2.transform(t)
n.show()

"""
t = ngramDF.filter(ngramDF.product_id == "B00QN1T6NM" ).select(ngramDF.product_id,explode(ngramDF.ngrams).alias("ngrams"))
t = t.select("product_id",array("ngrams").alias("ngrams")).groupBy("product_id").agg(collect_list("ngrams").alias("ngrams"))
t.show()"""

+----------+--------------------+--------------------+
|product_id|             reviews|              ngrams|
+----------+--------------------+--------------------+
|B00QN1T6NM|       [great, fit!]|        [great fit!]|
|B00QN1T6NM|[great, work, per...|[great work, work...|
|B00QN1T6NM|              [good]|                  []|
|B00QN1T6NM|    [cracked, week.]|     [cracked week.]|
|B00QN1T6NM|   [nice, protector]|    [nice protector]|
|B00QN1T6NM|[originally, beli...|[originally belie...|
|B00QN1T6NM|[like, reviewer, ...|[like reviewer, r...|
|B00QN1T6NM|[awesome, product...|[awesome product!...|
|B00QN1T6NM|[used, lot, diffe...|[used lot, lot di...|
|B00QN1T6NM|[easy, put, came,...|[easy put, put ca...|
|B00QN1T6NM|[work, great, rea...|[work great, grea...|
|B00QN1T6NM|[got, it-, easy, ...|[got it-, it- eas...|
|B00QN1T6NM|[put, correctly,,...|[put correctly,, ...|
|B00QN1T6NM|[previous, glass,...|[previous glass, ...|
|B00QN1T6NM|[safe, easy, inst...|[safe easy, easy ...|
|B00QN1T6N

'\nt = ngramDF.filter(ngramDF.product_id == "B00QN1T6NM" ).select(ngramDF.product_id,explode(ngramDF.ngrams).alias("ngrams"))\nt = t.select("product_id",array("ngrams").alias("ngrams")).groupBy("product_id").agg(collect_list("ngrams").alias("ngrams"))\nt.show()'

In [20]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import explode
wordsData = n#ngramDF.filter(ngramDF.product_id == "B00QN1T6NM").select( "product_id",explode(ngramDF.review_body).alias("review_body"))
wordsData.show()

hashingTF = HashingTF(inputCol="ngrams",outputCol="tf")
tf = hashingTF.transform(wordsData)
#tf.show()
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF(inputCol="tf",outputCol="tfidf",minDocFreq=5).fit(tf)
tfidf = idf.transform(tf)




# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
#idfIgnore = IDF(inputCol="tf",outputCol="tfidf",minDocFreq=2).fit(tf)
#tfidfIgnore = idfIgnore.transform(tf)
#tfidfIgnore.show(n=2,vertical=True,truncate=False)


+----------+--------------------+--------------------+
|product_id|             reviews|              ngrams|
+----------+--------------------+--------------------+
|B00QN1T6NM|       [great, fit!]|        [great fit!]|
|B00QN1T6NM|[great, work, per...|[great work, work...|
|B00QN1T6NM|              [good]|                  []|
|B00QN1T6NM|    [cracked, week.]|     [cracked week.]|
|B00QN1T6NM|   [nice, protector]|    [nice protector]|
|B00QN1T6NM|[originally, beli...|[originally belie...|
|B00QN1T6NM|[like, reviewer, ...|[like reviewer, r...|
|B00QN1T6NM|[awesome, product...|[awesome product!...|
|B00QN1T6NM|[used, lot, diffe...|[used lot, lot di...|
|B00QN1T6NM|[easy, put, came,...|[easy put, put ca...|
|B00QN1T6NM|[work, great, rea...|[work great, grea...|
|B00QN1T6NM|[got, it-, easy, ...|[got it-, it- eas...|
|B00QN1T6NM|[put, correctly,,...|[put correctly,, ...|
|B00QN1T6NM|[previous, glass,...|[previous glass, ...|
|B00QN1T6NM|[safe, easy, inst...|[safe easy, easy ...|
|B00QN1T6N

In [23]:
tfidf.printSchema()
tfidf.select("tfidf").show(2,vertical=True,truncate=False)

root
 |-- product_id: string (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- tf: vector (nullable = true)
 |-- tfidf: vector (nullable = true)

-RECORD 0----------------------------------
 tfidf | (262144,[117225],[0.0])           
-RECORD 1----------------------------------
 tfidf | (262144,[61576,255925],[0.0,0.0]) 
only showing top 2 rows



In [33]:
from pyspark.sql.types import DoubleType

def extract_values_from_vec(vector):
    return vector.values.tolist()

#def extract_values_from_vec_udf(col):
#        return udf(extract_values_from_vec,ArrayType(DoubleType()))
extract_values_from_vec_udf = udf(extract_values_from_vec,ArrayType(DoubleType()))
test = tfidf.withColumn("ext",extract_values_from_vec_udf("tfidf")).select("product_id","ngrams","ext").sort("ext",ascending=False)
test.show(vertical=False)
#tfidf.select(,"tfidf").show()

+----------+--------------------+--------------------+
|product_id|              ngrams|                 ext|
+----------+--------------------+--------------------+
|B00QN1T6NM|[this is, is a, a...|[3.87120101090789...|
|B00QN1T6NM|[love this, this ...|[3.87120101090789...|
|B00QN1T6NM|[it is, is a, a g...|[3.87120101090789...|
|B00QN1T6NM|[great product!, ...|[3.87120101090789...|
|B00QN1T6NM|[no problem, prob...|[3.87120101090789...|
|B00QN1T6NM|[excellent produc...|[3.87120101090789...|
|B00QN1T6NM|[such bad, bad qu...|[3.87120101090789...|
|B00QN1T6NM|[screen wa, wa a,...|[3.71705033108063...|
|B00QN1T6NM|[it a, a very, ve...|[3.71705033108063...|
|B00QN1T6NM|[this wa, wa quit...|[3.58351893845611...|
|B00QN1T6NM|[to wa, wa easy, ...|[3.58351893845611...|
|B00QN1T6NM|[very impressive,...|[3.58351893845611...|
|B00QN1T6NM|[it fit, fit my, ...|[3.46573590279972...|
|B00QN1T6NM|[shattered when, ...|[3.46573590279972...|
|B00QN1T6NM|          [the best]|[3.4657359027997265]|
|B00QN1T6N

In [34]:
test.rdd.first()

Row(product_id='B00QN1T6NM', ngrams=['this is', 'is a', 'a sturdy', 'sturdy protector', 'protector and', 'and the', 'the lifetime', 'lifetime warrenty', 'warrenty is', 'is a', 'a plus.', 'plus. wish', 'wish it', 'it had', 'had just', 'just a', 'a little', 'little more', 'more over', 'over lap', 'lap on', 'on the', 'the screen', 'screen to', 'to make', 'make it', 'it easier', 'easier to', 'to line', 'line everything', 'everything up.', 'up. the', 'the included', 'included cleaner', 'cleaner helped', 'helped get', 'get it', 'it on', 'on perfectly.'], ext=[3.871201010907891, 3.58351893845611, 0.0, 0.0, 0.0, 0.0, 0.0, 3.1780538303479458, 0.0, 0.0, 0.0, 0.0, 0.0, 2.5274662642067964, 0.0, 3.2650652073375754, 2.954910279033736, 0.0, 3.871201010907891, 0.0, 0.0, 3.7170503310806327, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.7170503310806327, 1.7509374747078, 3.3603753871419, 0.0, 0.0, 2.2289732756508, 6.3561076606958915])

In [120]:
from pyspark.ml.feature import Word2Vec

inp = ngramDF.filter(ngramDF.product_id =="B00YL0EKWE").select("review_flat").rdd.take(1)[0].review_flat
word2vec = Word2Vec()

model = word2vec.fit(inp)
synonyms = model.findSynonyms()

AttributeError: 'list' object has no attribute '_jdf'

In [30]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = reviewData

tokenizer = Tokenizer(inputCol="review_body", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

wordsData.show(5)
#hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
#featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

#idf = IDF(inputCol="rawFeatures", outputCol="features")
#idfModel = idf.fit(featurizedData)
#rescaledData = idfModel.transform(featurizedData)

#rescaledData.select("review_id", "features").show(n=10,vertical=True,truncate=False)

IllegalArgumentException: requirement failed: Input type must be string type but got array<string>.

In [9]:
import pyspark.sql.functions as sqlFunc
from pyspark.sql.types import ArrayType
testRdd = reviewData_cleared.filter(reviewData_cleared.product_id == "B009V5X1CE").select("review_body","star_rating")
split_col = sqlFunc.split(testRdd.review_body, ' ')
testRdd = testRdd.withColumn("review_body", split_col)
testRdd.describe("star_rating").show()
#averageRating.take(1)
testRdd.first()

+-------+------------------+
|summary|       star_rating|
+-------+------------------+
|  count|                36|
|   mean| 4.611111111111111|
| stddev|1.0495652917219116|
|    min|                 1|
|    max|                 5|
+-------+------------------+



Row(review_body=['Great', 'charger.', '', 'I', 'easily', 'get', '3+', 'charges', 'on', 'a', 'Samsung', 'Galaxy', '3.', '', 'Works', 'perfectly', 'for', 'camping', 'trips', 'or', 'long', 'days', 'on', 'the', 'boat.'], star_rating=5)

In [122]:
from nltk.sentiment import SentimentAnalyzer
from nltk.sentiment.util import mark_negation
import re
#print(mark_negation(testRdd.first().review_body))
#k = mark_negation(testRdd.first().review_body)
#testRdd= reviewData
print(reviewData.first().review_body)
print(re.split("([,.!? ])", reviewData.first().review_body))
#print([tag_df(row.review_body) for row in testRdd.take(3)])
#print(treebankTagger.tag(k))

2 issues  -  Once I turned on the circle apps and installed this case,  my battery drained twice as fast as usual.  I ended up turning off the circle apps, which kind of makes the case just a case...  with a hole in it.  Second,  the wireless charging doesn't work.  I have a Motorola 360 watch and a Qi charging pad. The watch charges fine but this case doesn't. But hey, it looks nice.
['2', ' ', 'issues', ' ', '', ' ', '-', ' ', '', ' ', 'Once', ' ', 'I', ' ', 'turned', ' ', 'on', ' ', 'the', ' ', 'circle', ' ', 'apps', ' ', 'and', ' ', 'installed', ' ', 'this', ' ', 'case', ',', '', ' ', '', ' ', 'my', ' ', 'battery', ' ', 'drained', ' ', 'twice', ' ', 'as', ' ', 'fast', ' ', 'as', ' ', 'usual', '.', '', ' ', '', ' ', 'I', ' ', 'ended', ' ', 'up', ' ', 'turning', ' ', 'off', ' ', 'the', ' ', 'circle', ' ', 'apps', ',', '', ' ', 'which', ' ', 'kind', ' ', 'of', ' ', 'makes', ' ', 'the', ' ', 'case', ' ', 'just', ' ', 'a', ' ', 'case', '.', '', '.', '', '.', '', ' ', '', ' ', 'with', ' 

In [145]:
b = re.split("([.!?])", reviewData.first().review_body)
st = sc.parallelize(b).map(lambda x : x.lower()).filter(lambda x: x not in [".",",","!","?"]) 

print(st.collect())

['2 issues  -  once i turned on the circle apps and installed this case,  my battery drained twice as fast as usual', '  i ended up turning off the circle apps, which kind of makes the case just a case', '', '', '  with a hole in it', "  second,  the wireless charging doesn't work", '  i have a motorola 360 watch and a qi charging pad', " the watch charges fine but this case doesn't", ' but hey, it looks nice', '']


In [137]:
import nltk

ps = nltk.wordnet.WordNetLemmatizer()
stemmed = filtered_st.map(lambda w: ps.lemmatize(w))
#print(stemmed.collect())
tagged = sc.parallelize(tag_df(stemmed.collect())).filter(lambda x: x[1] in ["NN","VBD","VBG","VV","NNS"])
print(tagged.collect())


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\Andreas\AppData\Roaming\nltk_data...


['issue', 'turned', 'circle', 'apps', 'installed', 'case', 'battery', 'drained', 'twice', 'fast', 'usual', 'ended', 'turning', 'circle', 'apps', 'kind', 'make', 'case', 'case', 'hole', 'second', 'wireless', 'charging', 'work', 'motorola', 'watch', 'qi', 'charging', 'pad', 'watch', 'charge', 'fine', 'case', 'hey', 'look', 'nice']
[('issue', 'NN'), ('turned', 'VBD'), ('circle', 'NN'), ('apps', 'NNS'), ('installed', 'VBD'), ('case', 'NN'), ('battery', 'NN'), ('drained', 'VBD'), ('twice', 'NN'), ('fast', 'NN'), ('ended', 'VBD'), ('turning', 'VBG'), ('circle', 'NN'), ('apps', 'NNS'), ('case', 'NN'), ('case', 'NN'), ('hole', 'NN'), ('second', 'NN'), ('wireless', 'NN'), ('charging', 'VBG'), ('work', 'NN'), ('motorola', 'NN'), ('watch', 'NN'), ('qi', 'NN'), ('charging', 'VBG'), ('pad', 'NN'), ('watch', 'NN'), ('charge', 'NN'), ('fine', 'NN'), ('case', 'NN'), ('hey', 'NN'), ('look', 'NN'), ('nice', 'NN')]


In [51]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer 
st = ["good","bad"]
sent = SentimentIntensityAnalyzer()
for w in st:
    print(w)
    print(sent.polarity_scores(w)["compound"])

good
0.4404
bad
-0.5423
