In [2]:
# Pin PySpark to 2.2.1, the release shown in this cell's recorded install output,
# so a fresh environment does not pull a newer Spark that the spark-nlp 1.4.0
# package configured on the SparkSession may not support.
!pip install pyspark==2.2.1

Collecting pyspark
  Downloading pyspark-2.2.1.tar.gz (188.2MB)
[K    100% |████████████████████████████████| 188.2MB 4.3kB/s eta 0:00:01  7% |██▍                             | 14.3MB 17.7MB/s eta 0:00:10    13% |████▏                           | 24.6MB 11.2MB/s eta 0:00:15    14% |████▊                           | 27.5MB 9.5MB/s eta 0:00:17    20% |██████▌                         | 38.1MB 12.9MB/s eta 0:00:12    21% |██████▉                         | 40.3MB 12.7MB/s eta 0:00:12    23% |███████▋                        | 44.8MB 16.7MB/s eta 0:00:09    32% |██████████▍                     | 61.3MB 9.4MB/s eta 0:00:14    32% |██████████▌                     | 61.8MB 9.9MB/s eta 0:00:13    43% |██████████████                  | 81.8MB 16.4MB/s eta 0:00:07    55% |█████████████████▊              | 103.9MB 20.8MB/s eta 0:00:05    58% |██████████████████▊             | 110.4MB 15.4MB/s eta 0:00:06    59% |███████████████████▏            | 112.9MB 9.0MB/s eta 0:00:09    60% |█████████████████

In [2]:
# Fetch the gzipped Amazon "Clothing, Shoes and Jewelry" review file (~47 MB per
# the recorded output). --no-clobber skips the download when the file is already
# present, so re-running the notebook neither re-fetches it nor leaves ".1" copies.
!wget --no-verbose --no-clobber http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Clothing_Shoes_and_Jewelry_5.json.gz

2018-02-20 03:42:29 URL:http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Clothing_Shoes_and_Jewelry_5.json.gz [47285200/47285200] -> "reviews_Clothing_Shoes_and_Jewelry_5.json.gz" [1]


Download the +/-Effect Lexicon (EffectWordNet) from MPQA: http://mpqa.cs.pitt.edu/lexicons/effect_lexicon/

In [22]:
# Fetch the EffectWordNet lexicon archive. --no-clobber skips the download when
# the zip is already present, so re-runs do not create "effectwordnet.zip.1".
!wget --no-verbose --no-clobber http://mpqa.cs.pitt.edu/data/effectwordnet.zip

2018-02-20 04:30:50 URL:http://mpqa.cs.pitt.edu/data/effectwordnet.zip [528829/528829] -> "effectwordnet.zip" [1]


In [23]:
# Extract the lexicon into ./effectwordnet/. -o overwrites existing files without
# asking, so re-running the cell does not stop at unzip's interactive "replace?"
# prompt, which a notebook shell escape cannot answer.
!unzip -o effectwordnet.zip

Archive:  effectwordnet.zip
   creating: effectwordnet/
  inflating: effectwordnet/EffectWordNet.tff  
  inflating: effectwordnet/goldStandard.tff  
  inflating: effectwordnet/README    


### Register special SQL magics

In [3]:
from IPython.core.magic import register_line_cell_magic

# See: https://github.com/LucaCanali/Miscellaneous/blob/master/Pyspark_SQL_Magic_Jupyter/IPython_Pyspark_SQL_Magic.ipynb
# Configuration parameters (module-level globals looked up by the magics each time they run)
max_show_lines = 50         # Row limit applied by %sql_show (rows printed) and %sql_display (rows converted to pandas)
detailed_explain = True     # Passed to DataFrame.explain() by %sql_explain; set to False to see only the physical plan


@register_line_cell_magic
def sql(line, cell=None):
    """Turn a SQL statement into a lazily evaluated Spark DataFrame.

    Usage: ``%sql <statement>`` (statement taken from ``line``) or ``%%sql``
    (statement taken from the cell body, ``cell``).

    Relies on the notebook-level ``spark`` session existing when the magic runs.
    """
    # As a line magic there is no cell body, so the statement is the line itself.
    query = line if cell is None else cell
    return spark.sql(query)

@register_line_cell_magic
def sql_show(line, cell=None):
    """Run a SQL statement and print up to ``max_show_lines`` rows of the result.

    Usage: ``%sql_show <statement>`` or ``%%sql_show`` with the statement in the
    cell body. Returns whatever ``DataFrame.show`` returns.
    """
    # The cell body wins when present; otherwise fall back to the magic's line.
    if cell is None:
        statement = line
    else:
        statement = cell
    result = spark.sql(statement)
    return result.show(max_show_lines)

@register_line_cell_magic
def sql_display(line, cell=None):
    """Run a SQL statement and return at most ``max_show_lines`` rows as pandas.

    Usage: ``%sql_display <statement>`` or ``%%sql_display`` with the statement
    in the cell body. The pandas DataFrame gives a prettier notebook rendering
    and can be processed further on the driver.
    """
    statement = line if cell is None else cell
    # Cap the row count in Spark before collecting to the driver.
    limited = spark.sql(statement).limit(max_show_lines)
    return limited.toPandas()

@register_line_cell_magic
def sql_explain(line, cell=None):
    """Show the execution plan Spark builds for a SQL statement.

    Usage: ``%sql_explain <statement>`` or ``%%sql_explain`` with the statement
    in the cell body. ``detailed_explain`` is forwarded to ``DataFrame.explain``.
    """
    if cell is not None:
        statement = cell
    else:
        statement = line
    planned = spark.sql(statement)
    return planned.explain(detailed_explain)

### Build a SparkSession, the gateway to everything in Spark (2.x)

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import numpy as np
import time
import plotly
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, iplot
init_notebook_mode(connected=True)

In [5]:
# Create (or reuse) the notebook-wide SparkSession: local mode on all cores,
# with the spark-nlp 1.4.0 package requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName(name="PySpark NLP")
    .master("local[*]")
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.4.0")
    .getOrCreate()
)

In [6]:
from pyspark.ml import Pipeline, PipelineModel
from sparknlp.annotator import *
from sparknlp.base import DocumentAssembler, Finisher

In [13]:
# Load the gzipped review file fetched by the wget cell above; no schema is
# passed, so Spark infers the column types from the JSON records.
reviews = spark.read.json("reviews_Clothing_Shoes_and_Jewelry_5.json.gz")

In [18]:
# Inspect the column names and types Spark inferred for the reviews.
reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



#### FAST is GOOD! Cache the reviews so that repeated queries are fast

In [15]:
# cache() only marks the DataFrame for caching; count() is the action that
# actually reads the file and populates the cache (and reports the row count).
reviews.cache().count()

278677

In [16]:
# Expose the DataFrame to Spark SQL under the name "reviews" so the %sql* magics can query it.
reviews.createOrReplaceTempView("reviews")

In [16]:
%%sql_display
-- Random sample of reviews: rows are shuffled with RAND() and the sql_display
-- magic keeps at most max_show_lines of them.
SELECT * 
FROM reviews
ORDER BY RAND()

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,B0057D57NK,"[0, 0]",5.0,This is pretty! The color was slightly differe...,"02 26, 2014",A2PTDSWNW0OMGH,MansonGirl,Nice!,1393372800
1,B000PHI5L4,"[6, 7]",4.0,"I ordered the 3XL and despite my size, there i...","08 14, 2012",AKD481B2E5V8L,"P. Klein ""librarian56""",Nice Shirt True to Photo,1344902400
2,B000W4IGHS,"[0, 0]",5.0,Got this as a surprise anniversary gift. It is...,"06 20, 2014",A38JGKFVRJRK1,"M. Rusk ""St.PatsDayFan""",Hubby loves it,1403222400
3,B003DIPCC2,"[0, 1]",5.0,el reloj esta bastante bien para el precio lo ...,"07 23, 2012",A1Q2TG5RLK66QL,Rodrigo,Buen producto,1343001600
4,B001VEIDBW,"[2, 2]",5.0,A perfect length and they stay up (unlike most...,"08 15, 2012",ABAPF3XY1W6JX,TucsonShopper,Rare decent socks,1344988800
5,B008J9Z310,"[0, 0]",5.0,"These glasses are well made, they came in a ni...","01 15, 2014",A3J75TDL94UHM3,"Lisa D. Sample-Page ""Leesa =)""",Lovely Eyewear,1389744000
6,B000OCSJN4,"[0, 0]",5.0,These are super comfortable Birks. Perfect for...,"01 12, 2013",A1S23OIVOLZ1HS,Mike P,Best Birks,1357948800
7,B0059DSSEI,"[0, 0]",4.0,Fit is great and I love the color! Not as warm...,"03 9, 2014",A2FJI1URV5ABLD,authorinak,love it!,1394323200
8,B000T9VK56,"[0, 0]",2.0,some of these parts are really cheap and pract...,"02 7, 2014",A2NVNMLC0AFL08,Environmentally Conscious Bargain Shopper,really cheap stuff. much of it barely works. ...,1391731200
9,B00DV19WBS,"[0, 0]",5.0,this robe is so soft and fluffy and comfortabl...,"02 21, 2014",A3LY749Q2Q3W7S,jessi,loooove,1392940800


In [17]:
# How many distinct products (asin values) appear in the reviews.
print("Unique Items: {items}".format(
    items=reviews.select("asin").distinct().count()
))

# Summary statistics (count/mean/stddev/min/max) of the number of reviews per product.
print("Reviews per Item:")
(
    reviews
    .groupBy("asin")
    .count()
    .select("count")
    .describe()
    .show()
)

Unique Items: 23033
Reviews per Item:
+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|             23033|
|   mean|12.099031823904832|
| stddev|13.968255188902537|
|    min|                 5|
|    max|               441|
+-------+------------------+



In [7]:
# Gold-standard part of the lexicon: a tab-separated file without a header row,
# so the four columns are named explicitly after loading.
gold_sent = (
    spark.read
    .option("inferSchema", "true")
    .option("sep", "\t")
    .csv("effectwordnet/goldStandard.tff")
    .toDF("synset_offset", "sent_type", "synset", "gloss")
)

In [24]:
# Remaining (non-gold) part of the lexicon, loaded with the same tab-separated
# layout and column names as the gold-standard file.
rest_sent = (
    spark.read
    .option("inferSchema", "true")
    .option("sep", "\t")
    .csv("effectwordnet/EffectWordNet.tff")
    .toDF("synset_offset", "sent_type", "synset", "gloss")
)

In [25]:
# One row per (effect label, word): each comma-separated synset is split and exploded.
gold_words = gold_sent.select(
    "sent_type",
    F.explode(F.split("synset", ",")).alias("word"),
)
rest_words = rest_sent.select(
    "sent_type",
    F.explode(F.split("synset", ",")).alias("word"),
)

# Words labelled '-Effect' in each source.
gold_negative_words = gold_words.where("sent_type = '-Effect'").select("word")
rest_negative_words = rest_words.where("sent_type = '-Effect'").select("word")

# Words labelled '+Effect' in each source.
gold_positive_words = gold_words.where("sent_type = '+Effect'").select("word")
rest_positive_words = rest_words.where("sent_type = '+Effect'").select("word")

# Merge both sources per polarity and drop duplicate words.
negative_words = gold_negative_words.union(rest_negative_words).distinct()
positive_words = gold_positive_words.union(rest_positive_words).distinct()

In [28]:
# Sanity check: number of distinct negative-effect words after merging both lexicon files.
negative_words.count()

3582

In [29]:
# Write each word list to a local one-column CSV (no header row, no index column);
# these are the files handed to the sentiment annotator as its negative/positive sources.
negative_words.toPandas().to_csv("negative_words.csv", header=False, index=False)
positive_words.toPandas().to_csv("positive_words.csv", header=False, index=False)

In [30]:
# --- Annotation stages, listed in the order they run -------------------------

# Entry point: wraps the raw review text. No output column is set here; the next
# stage reads "document" (presumably the assembler's default output — verify).
document_assembler = DocumentAssembler().setInputCol("reviewText")

# document -> sentence
sentence_detector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# sentence -> token
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# token -> normal
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normal")
)

# normal -> spell
spell_checker = (
    NorvigSweetingApproach()
    .setInputCols(["normal"])
    .setOutputCol("spell")
)

# (spell, sentence) -> sentiment, using the word lists exported to CSV earlier.
sentiment_detector = (
    ViveknSentimentApproach()
    .setInputCols(["spell", "sentence"])
    .setOutputCol("sentiment")
    .setPruneCorpus(0)
    .setPositiveSource("positive_words.csv")
    .setNegativeSource("negative_words.csv")
)

# Converts the "sentiment" annotations into a plain output column, keys included.
finisher = (
    Finisher()
    .setInputCols(["sentiment"])
    .setIncludeKeys(True)
)

# --- Assemble the Spark ML pipeline -------------------------------------------
pipeline = Pipeline(
    stages=[
        document_assembler,
        sentence_detector,
        tokenizer,
        normalizer,
        spell_checker,
        sentiment_detector,
        finisher,
    ]
)

In [31]:
# Wall-clock timing of fitting the pipeline and displaying a sample of its output.
start = time.time()
# Fit the pipeline on the reviews, then apply the fitted model to the same reviews.
sentiment_data = pipeline.fit(reviews).transform(reviews)
# NOTE(review): show() displays only the first rows (20 by default), so the elapsed
# time printed below presumably covers fitting plus producing those rows, not
# annotating the whole dataset — confirm before quoting it as full-pipeline cost.
sentiment_data.show()
end = time.time()
print("Time elapsed pipeline process: " + str(end - start))

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+--------------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|  finished_sentiment|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+--------------------+
|0000031887| [0, 0]|    5.0|This is a great t...|02 12, 2011|A1KLRMWW2FWPL4|Amazon Customer "...|Great tutu-  not ...|    1297468800|result->positive@...|
|0000031887| [0, 0]|    5.0|I bought this for...|01 19, 2013|A2G5TCU2WDFZ65|     Amazon Customer|         Very Cute!!|    1358553600|result->positive@...|
|0000031887| [0, 0]|    5.0|What can I say......| 01 4, 2013|A1RLQXYNCMWRWN|              Carola|I have buy more t...|    1357257600|result->negative@...|
|0000031887| [0, 0]|    5.0|We bought several...|04 27, 2014| A8U3FAMS

In [32]:
# Compare the star rating with the predicted sentiment for the first 100 reviews;
# the second argument (False) turns off column truncation so the full text is printed.
sentiment_data.select("overall", "finished_sentiment", "reviewText").show(100, False)

+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Mark the annotated DataFrame for caching and force it to be computed with count().
# NOTE(review): this presumably runs the pipeline over every review, so it can be slow
# and memory-hungry — confirm before re-running.
sentiment_data.cache().count()

In [None]:
# Report the machine's memory usage (in MiB) after the caching step above.
!free -m