In [1]:
# NOTE(review): unpinned, and later cells import `sparknlp`, which this line does not install —
# pin pyspark to the cluster's Spark version and install the matching Spark NLP package.
!pip install pyspark



In [2]:
# Download straight into ../data/input, which is where the spark.read.json cell reads it from
# (a bare wget saves to the working directory). -nc skips the download when the file is
# already present, so re-running the notebook does not create reviews_...json.gz.1 copies.
!wget --no-verbose -nc -P ../data/input http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Clothing_Shoes_and_Jewelry_5.json.gz

2018-02-20 03:42:29 URL:http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Clothing_Shoes_and_Jewelry_5.json.gz [47285200/47285200] -> "reviews_Clothing_Shoes_and_Jewelry_5.json.gz" [1]


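### +/-Effect Lexicon ([MPQA](http://mpqa.cs.pitt.edu/lexicons/effect_lexicon/)): its `+Effect` / `-Effect` synsets are used below as the positive and negative word lists for the sentiment model.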

In [22]:
# -nc: skip the download if effectwordnet.zip already exists, so a re-run does not
# leave an effectwordnet.zip.1 duplicate next to the archive the unzip cell uses.
!wget --no-verbose -nc http://mpqa.cs.pitt.edu/data/effectwordnet.zip

2018-02-20 04:30:50 URL:http://mpqa.cs.pitt.edu/data/effectwordnet.zip [528829/528829] -> "effectwordnet.zip" [1]


In [23]:
# Extract into ../data/input so the .tff files land at ../data/input/effectwordnet/,
# the paths the lexicon-loading cells read. -o overwrites existing files without the
# interactive prompt that would otherwise hang the cell on a re-run.
!unzip -o effectwordnet.zip -d ../data/input

Archive:  effectwordnet.zip
   creating: effectwordnet/
  inflating: effectwordnet/EffectWordNet.tff  
  inflating: effectwordnet/goldStandard.tff  
  inflating: effectwordnet/README    


In [2]:
import sys
# Put the parent directory first on the import path so the project helper module resolves.
sys.path.insert(0, "../")
# NOTE(review): star import. `spark`, `F`, `time` and the %%sql_display cell magic are used
# below but never defined in this notebook, so they presumably come from here — TODO confirm
# and replace with explicit imports.
from garrens_utils import *

In [3]:
from pyspark.ml import Pipeline, PipelineModel
from sparknlp.annotator import *
from sparknlp.base import DocumentAssembler, Finisher

In [5]:
reviews_path = "../data/input/reviews_Clothing_Shoes_and_Jewelry_5.json.gz"
reviews = spark.read.json(reviews_path)

# cache() only marks the DataFrame; the count() action materializes it so later queries
# reuse the in-memory copy instead of re-reading and re-parsing the gzipped JSON.
reviews.cache()
reviews.count()

# Expose the DataFrame to %%sql_display queries as `reviews`.
reviews.createOrReplaceTempView("reviews")

In [18]:
# Column names and types as inferred by spark.read.json from the raw review records.
reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [16]:
%%sql_display
-- A random slice of reviews for eyeballing the data; RAND is seeded so a re-run shows the same rows.
SELECT *
FROM reviews
ORDER BY RAND(42)

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,B0057D57NK,"[0, 0]",5.0,This is pretty! The color was slightly differe...,"02 26, 2014",A2PTDSWNW0OMGH,MansonGirl,Nice!,1393372800
1,B000PHI5L4,"[6, 7]",4.0,"I ordered the 3XL and despite my size, there i...","08 14, 2012",AKD481B2E5V8L,"P. Klein ""librarian56""",Nice Shirt True to Photo,1344902400
2,B000W4IGHS,"[0, 0]",5.0,Got this as a surprise anniversary gift. It is...,"06 20, 2014",A38JGKFVRJRK1,"M. Rusk ""St.PatsDayFan""",Hubby loves it,1403222400
3,B003DIPCC2,"[0, 1]",5.0,el reloj esta bastante bien para el precio lo ...,"07 23, 2012",A1Q2TG5RLK66QL,Rodrigo,Buen producto,1343001600
4,B001VEIDBW,"[2, 2]",5.0,A perfect length and they stay up (unlike most...,"08 15, 2012",ABAPF3XY1W6JX,TucsonShopper,Rare decent socks,1344988800
5,B008J9Z310,"[0, 0]",5.0,"These glasses are well made, they came in a ni...","01 15, 2014",A3J75TDL94UHM3,"Lisa D. Sample-Page ""Leesa =)""",Lovely Eyewear,1389744000
6,B000OCSJN4,"[0, 0]",5.0,These are super comfortable Birks. Perfect for...,"01 12, 2013",A1S23OIVOLZ1HS,Mike P,Best Birks,1357948800
7,B0059DSSEI,"[0, 0]",4.0,Fit is great and I love the color! Not as warm...,"03 9, 2014",A2FJI1URV5ABLD,authorinak,love it!,1394323200
8,B000T9VK56,"[0, 0]",2.0,some of these parts are really cheap and pract...,"02 7, 2014",A2NVNMLC0AFL08,Environmentally Conscious Bargain Shopper,really cheap stuff. much of it barely works. ...,1391731200
9,B00DV19WBS,"[0, 0]",5.0,this robe is so soft and fluffy and comfortabl...,"02 21, 2014",A3LY749Q2Q3W7S,jessi,loooove,1392940800


In [17]:
# One row per product with its review count; the number of rows is the number of distinct
# products, so the same aggregate drives both the headline figure and the summary stats.
reviews_per_item = reviews.groupBy("asin").count()

print("Unique Items: {items}".format(items=reviews_per_item.count()))
print("Reviews per Item:")
reviews_per_item.select("count").describe().show()

Unique Items: 23033
Reviews per Item:
+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|             23033|
|   mean|12.099031823904832|
| stddev|13.968255188902537|
|    min|                 5|
|    max|               441|
+-------+------------------+



In [6]:
# Gold-standard +/-Effect synsets: tab-separated, read without a header row, column types
# inferred, then given explicit column names.
gold_sent = (
    spark.read
    .csv("../data/input/effectwordnet/goldStandard.tff", sep="\t", inferSchema=True)
    .toDF("synset_offset", "sent_type", "synset", "gloss")
)

In [7]:
# Full EffectWordNet synset list: same tab-separated, header-less layout as the gold file.
rest_sent = (
    spark.read
    .csv("../data/input/effectwordnet/EffectWordNet.tff", sep="\t", inferSchema=True)
    .toDF("synset_offset", "sent_type", "synset", "gloss")
)

### ETL: extract positive and negative words from the lexicon

In [9]:
def _split_synset_words(sent_df):
    """Explode the comma-separated `synset` column into one (`sent_type`, `word`) row per lemma.

    The string is trimmed and split on commas with any surrounding whitespace, and empty
    lemmas (e.g. from a trailing comma) are dropped. This keeps stray spaces from producing
    words that can never match a token. A null `synset` yields no rows.
    """
    lemmas = F.explode(F.split(F.trim(F.col("synset")), r"\s*,\s*")).alias("word")
    return sent_df.select("sent_type", lemmas).filter(F.col("word") != "")


def _words_with_effect(words_df, effect_label):
    """Keep only the `word` column of rows whose `sent_type` equals `effect_label`."""
    return words_df.filter(F.col("sent_type") == effect_label).select("word")


gold_words = _split_synset_words(gold_sent)
gold_negative_words = _words_with_effect(gold_words, "-Effect")
gold_positive_words = _words_with_effect(gold_words, "+Effect")

rest_words = _split_synset_words(rest_sent)
rest_negative_words = _words_with_effect(rest_words, "-Effect")
rest_positive_words = _words_with_effect(rest_words, "+Effect")

# NOTE(review): nothing here removes a word that appears under both labels (e.g. via
# different senses); such a word ends up in both lists — confirm that is intended.
negative_words = gold_negative_words.union(rest_negative_words).distinct()
positive_words = gold_positive_words.union(rest_positive_words).distinct()

In [29]:
# Write each word list as a header-less, index-less CSV (one value per line); these files
# are the positive/negative sources handed to the sentiment annotator.
for words_df, csv_path in (
    (negative_words, "../data/input/negative_words.csv"),
    (positive_words, "../data/input/positive_words.csv"),
):
    words_df.toPandas().to_csv(csv_path, index=False, header=False)

### Spark NLP Estimators and Transformers

In [30]:
# Annotator chain: review text -> document -> sentences -> tokens -> normalized tokens
# -> spell-checked tokens -> sentiment, flattened to plain columns by the Finisher.

# Output column left at its default; the sentence detector reads "document", so that is
# presumably the assembler's default output name — TODO confirm.
document_assembler = DocumentAssembler().setInputCol("reviewText")

sentence_detector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normal")
)

spell_checker = (
    NorvigSweetingApproach()
    .setInputCols(["normal"])
    .setOutputCol("spell")
)

# Positive/negative sources are the lexicon CSVs exported earlier in the notebook.
sentiment_detector = (
    ViveknSentimentApproach()
    .setInputCols(["spell", "sentence"])
    .setOutputCol("sentiment")
    .setPruneCorpus(0)
    .setPositiveSource("../data/input/positive_words.csv")
    .setNegativeSource("../data/input/negative_words.csv")
)

# includeKeys keeps a "result->" prefix on each finished value (stripped downstream).
finisher = (
    Finisher()
    .setInputCols(["sentiment"])
    .setIncludeKeys(True)
)

stages = [
    document_assembler,
    sentence_detector,
    tokenizer,
    normalizer,
    spell_checker,
    sentiment_detector,
    finisher,
]
pipeline = Pipeline(stages=stages)

### Fit the pipeline (train the estimators), then apply the fitted model to a random sample of reviews

In [31]:
import time

# Seeded so the same reviews are drawn on every run. With an unseeded rand(), every action
# on the result re-evaluated the sort and silently scored a different sample.
SAMPLE_SEED = 42
SAMPLE_SIZE = 1000
# NOTE(review): the parquet loaded in the next cell (sentimentalized_10k) holds 10,000 rows
# and is not written by this cell — confirm how it was produced.

review_sample = reviews.orderBy(F.rand(SAMPLE_SEED)).limit(SAMPLE_SIZE)

start = time.perf_counter()
sentiment_model = pipeline.fit(reviews)
# transform() is lazy: without an action, the timer below would only cover fitting plus
# plan construction. cache() + count() forces the annotators to run on the sample (and keeps
# the scored rows for reuse), so the elapsed time actually includes inference.
sentiment_data = sentiment_model.transform(review_sample).cache()
sentiment_data.count()
end = time.perf_counter()
print("Time elapsed pipeline process: " + str(end - start))

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+--------------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|  finished_sentiment|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+--------------------+
|0000031887| [0, 0]|    5.0|This is a great t...|02 12, 2011|A1KLRMWW2FWPL4|Amazon Customer "...|Great tutu-  not ...|    1297468800|result->positive@...|
|0000031887| [0, 0]|    5.0|I bought this for...|01 19, 2013|A2G5TCU2WDFZ65|     Amazon Customer|         Very Cute!!|    1358553600|result->positive@...|
|0000031887| [0, 0]|    5.0|What can I say......| 01 4, 2013|A1RLQXYNCMWRWN|              Carola|I have buy more t...|    1357257600|result->negative@...|
|0000031887| [0, 0]|    5.0|We bought several...|04 27, 2014| A8U3FAMS

In [11]:
# Load previously scored reviews (10,000 rows, judging by the rating counts below) and
# expose them to %%sql_display queries as `sentimentalized`.
# NOTE(review): no visible cell writes ../data/output/sentimentalized_10k/ — the pipeline cell
# above only scores a 1,000-review sample in memory; confirm how this parquet was produced.
sentimentalized = spark.read.parquet("../data/output/sentimentalized_10k/")
sentimentalized.createOrReplaceTempView("sentimentalized")

### Confirm the sample is representative of the population (star-rating distribution)

In [12]:
%%sql_display
-- Star-rating distribution of the scored sample; pct_total divides each group's count by the
-- grand total, computed here as a window over the grouped counts.
SELECT overall,
       COUNT(1),
       ROUND(COUNT(1) / SUM(COUNT(1)) OVER (), 2) AS pct_total
FROM sentimentalized
GROUP BY overall
ORDER BY overall

Unnamed: 0,overall,count(1),pct_total
0,1.0,441,0.04
1,2.0,534,0.05
2,3.0,1058,0.11
3,4.0,2100,0.21
4,5.0,5867,0.59


In [76]:
%%sql_display
-- Star-rating distribution of the full review set, for comparison with the sample.
SELECT overall,
       COUNT(1),
       ROUND(COUNT(1) / SUM(COUNT(1)) OVER (), 2) AS pct_total
FROM reviews
GROUP BY overall
ORDER BY overall

Unnamed: 0,overall,count(1),pct_total
0,1.0,11192,0.04
1,2.0,15463,0.06
2,3.0,30425,0.11
3,4.0,58357,0.21
4,5.0,163240,0.59


In [37]:
from pyspark.sql.functions import udf

Given an array of per-sentence sentiment labels, compute the fraction that are positive.

In [38]:
from pyspark.sql.types import DoubleType


@udf(returnType=DoubleType())
def sentiment_ratio(sentiment_result_array):
    """Fraction of per-sentence sentiment labels equal to 'positive', rounded to 1 decimal.

    Declared DoubleType: a bare @udf defaults to StringType, which produced a string
    column that SQL had to cast back to a number for AVG.

    Args:
        sentiment_result_array: list of sentiment label strings, one per sentence.

    Returns:
        The rounded ratio as a float, or None (SQL NULL) for a null or empty array.
        Previously those inputs raised TypeError / ZeroDivisionError and failed the Spark task.
    """
    if not sentiment_result_array:
        return None
    positive_count = sum(1 for label in sentiment_result_array if label == "positive")
    return round(positive_count / len(sentiment_result_array), 1)

#### Break down sentence-level sentiment per review: positive ratio, sentence count, and first/last sentence sentiment

In [61]:
# Per review: drop the "result->" key prefix the Finisher emitted, split the '@'-joined
# labels into an array, then derive the sentence count, the positive ratio, and the
# sentiment of the first and last sentence.
sentence_labels = F.split(
    F.regexp_replace("finished_sentiment", "result->", ""),
    "@",
)

sentenced_to_sentimentalization = (
    sentimentalized
    .select(F.col("overall"), sentence_labels.alias("split_sentiments"))
    .withColumn("num_sentences", F.size("split_sentiments"))
    .withColumn("sentiment_ratio", sentiment_ratio(F.col("split_sentiments")))
    .withColumn("first_sentiment", F.expr("split_sentiments[0]"))
    .withColumn("last_sentiment", F.expr("split_sentiments[num_sentences-1]"))
)

In [63]:
# Expose the per-review sentence breakdown to %%sql_display queries as `sentencing`.
sentenced_to_sentimentalization.createOrReplaceTempView("sentencing")

In [73]:
%%sql_display
SELECT overall, AVG(sentiment_ratio) AS avg_sent_ratio, AVG(num_sentences)
FROM sentencing
GROUP BY overall
ORDER BY overall

Unnamed: 0,overall,avg_sent_ratio,avg(num_sentences)
0,1.0,0.590703,4.519274
1,2.0,0.615918,4.507491
2,3.0,0.639225,3.943289
3,4.0,0.67419,4.12619
4,5.0,0.720112,4.13022
