In [1]:
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col,sum

import os
import json
import gzip
from urllib.request import urlopen

In [2]:
spark = SparkSession.builder.appName('App-Review').getOrCreate()

In [3]:
#!wget http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Grocery_and_Gourmet_Food.json.gz

--2020-04-29 01:38:53--  http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Grocery_and_Gourmet_Food.json.gz
Resolving deepyeti.ucsd.edu (deepyeti.ucsd.edu)... 169.228.63.50
Connecting to deepyeti.ucsd.edu (deepyeti.ucsd.edu)|169.228.63.50|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 586910426 (560M) [application/octet-stream]
Saving to: ‘Grocery_and_Gourmet_Food.json.gz’


2020-04-29 01:40:37 (5.39 MB/s) - ‘Grocery_and_Gourmet_Food.json.gz’ saved [586910426/586910426]



In [35]:
df = spark.read.json("Grocery_and_Gourmet_Food.json.gz")

In [36]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Design:: string (nullable = true)
 |    |-- Display Height:: string (nullable = true)
 |    |-- Edition:: string (nullable = true)
 |    |-- Flavor Name:: string (nullable = true)
 |    |-- Flavor:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Item Display Weight:: string (nullable = true)
 |    |-- Item Package Quantity:: string (nullable = true)
 |    |-- Material Type:: string (nullable = true)
 |    |-- Material:: string (nullable = true)
 |    |-- Number of Items:: string (nullable = true)
 |    |-- Package Quantity:: string (nulla

In [37]:
df.head()

Row(asin='1888861614', image=None, overall=5.0, reviewText='Very pleased with my purchase. Looks exactly like the picture and will look great on my cake. It definitely will sparkle.', reviewTime='06 4, 2013', reviewerID='ALP49FBWT4I7V', reviewerName='Lori', style=None, summary='Love it', unixReviewTime=1370304000, verified=True, vote=None)

In [38]:
df.show()

+----------+-----+-------+--------------------+-----------+--------------+-------------------+-----+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-------------------+-----+--------------------+--------------+--------+----+
|1888861614| null|    5.0|Very pleased with...| 06 4, 2013| ALP49FBWT4I7V|               Lori| null|             Love it|    1370304000|    true|null|
|1888861614| null|    4.0|Very nicely craft...|05 23, 2014|A1KPIZOCLB9FZ8|         BK Shopper| null|      Nice but small|    1400803200|    true|null|
|1888861614| null|    4.0|still very pretty...| 05 9, 2014|A2W0FA06IYAYQE|     daninethequeen| null|the "s" looks lik...|    1399593600|    true|null|
|1888861614| null|    5.0|I got this for ou...|04 20, 2014|A2PTZTCH2QUYBC|            Tammara|

In [39]:
df_dataset = df.limit(3)
df_dataset.toPandas()

Unnamed: 0,asin,image,overall,reviewText,reviewTime,reviewerID,reviewerName,style,summary,unixReviewTime,verified,vote
0,1888861614,,5.0,Very pleased with my purchase. Looks exactly l...,"06 4, 2013",ALP49FBWT4I7V,Lori,,Love it,1370304000,True,
1,1888861614,,4.0,Very nicely crafted but too small. Am going to...,"05 23, 2014",A1KPIZOCLB9FZ8,BK Shopper,,Nice but small,1400803200,True,
2,1888861614,,4.0,still very pretty and well made...i am super p...,"05 9, 2014",A2W0FA06IYAYQE,daninethequeen,,"the ""s"" looks like a 5, kina",1399593600,True,


In [8]:
df_describe = df.describe().toPandas().transpose()

In [9]:
df_describe

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
asin,5074160,7.235929823505674E9,2.5676460997660003E9,0681727810,B01HJHSVG6
overall,5074160,4.314708247276396,1.2493033560071378,1.0,5.0
reviewText,5071277,6.94467343837058E9,9.984502391787317E10,\n\n\n\n\n\nI,~~~I love this!!! I just don't like the liquid...
reviewTime,5074160,,,"01 1, 2004","12 9, 2017"
reviewerID,5074160,,,A000013090ZI3HIT9N5V,AZZZYAYJQSDOJ
reviewerName,5073805,Infinity,,,
summary,5072833,1.1431184167809786E87,1.1879635061594103E88,,
unixReviewTime,5074160,1.446592262935343E9,6.227839116083965E7,961372800,1538870400
vote,659472,6.160340246646972,16.67664117913327,1023,99


In [10]:
df.select("reviewText").distinct().show()

+--------------------+
|          reviewText|
+--------------------+
|"Delicious" is in...|
|"Draught of the L...|
|"Fudge Drizzled C...|
|"Healthy and deli...|
|"I think this is ...|
|"Mentos Watermelo...|
|"Nielsen-Massey" ...|
|"Rich Tea" are th...|
|$10.95 per pound ...|
|$17.99 for one pa...|
|$3ea for a tiny p...|
|$4 for 6 tiny ste...|
|$42 for a few oun...|
|$50.00 for a 16 o...|
|'The cake topper ...|
|(As per medical r...|
|(Janelle here)  t...|
|(My daughter is w...|
|*** I RAN OUT OF ...|
|****

Celestial S...|
+--------------------+
only showing top 20 rows



In [40]:
df.groupBy ('overall').count().show()

+-------+-------+
|overall|  count|
+-------+-------+
|    1.0| 405330|
|    4.0| 553201|
|    3.0| 322134|
|    2.0| 219497|
|    5.0|3573998|
+-------+-------+



In [13]:
!pip install gensim

Collecting gensim
  Downloading gensim-3.8.2-cp37-cp37m-manylinux1_x86_64.whl (24.2 MB)
[K     |████████████████████████████████| 24.2 MB 51 kB/s  eta 0:00:01
Collecting smart-open>=1.8.1
  Downloading smart_open-2.0.0.tar.gz (103 kB)
[K     |████████████████████████████████| 103 kB 46.4 MB/s eta 0:00:01
Collecting boto
  Downloading boto-2.49.0-py2.py3-none-any.whl (1.4 MB)
[K     |████████████████████████████████| 1.4 MB 39.4 MB/s eta 0:00:01
[?25hCollecting boto3
  Downloading boto3-1.12.48-py2.py3-none-any.whl (128 kB)
[K     |████████████████████████████████| 128 kB 39.5 MB/s eta 0:00:01
Collecting s3transfer<0.4.0,>=0.3.0
  Downloading s3transfer-0.3.3-py2.py3-none-any.whl (69 kB)
[K     |████████████████████████████████| 69 kB 4.8 MB/s  eta 0:00:01
[?25hCollecting botocore<1.16.0,>=1.15.48
  Downloading botocore-1.15.48-py2.py3-none-any.whl (6.1 MB)
[K     |████████████████████████████████| 6.1 MB 37.5 MB/s eta 0:00:01
[?25hCollecting jmespath<1.0.0,>=0.7.1
  Downloadin

In [41]:
import gensim.parsing.preprocessing as gsp
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from gensim import utils


filters = [
           gsp.strip_tags, 
           gsp.strip_punctuation,
           gsp.strip_multiple_whitespaces,
           gsp.strip_numeric,
           gsp.remove_stopwords, 
           gsp.strip_short, 
           gsp.stem_text
          ]

def clean_text(x):
    s = x[1]
    s = s.lower()
    s = utils.to_unicode(s)
    for f in filters:
        s = f(s)
    return (x[0],s)

In [42]:
#data=df.select(df.columns[2:4])
data = df.rdd\
    .map(lambda x: (x["overall"], x["reviewText"]))\
    .toDF(["overall", "reviewText"])

data.show()

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    5.0|Very pleased with...|
|    4.0|Very nicely craft...|
|    4.0|still very pretty...|
|    5.0|I got this for ou...|
|    4.0|It was just what ...|
|    1.0|The S is beautifu...|
|    5.0|Omg.. The S was i...|
|    3.0|It was a nice siz...|
|    5.0|Perfect!!! Can no...|
|    5.0|This was exactly ...|
|    5.0|This arrived in t...|
|    5.0| No adverse comment.|
|    5.0|These are hard to...|
|    5.0|Best black tea in...|
|    5.0|if you like stron...|
|    5.0|I first tasted th...|
|    5.0|Truly the finest ...|
|    4.0|Tried this while ...|
|    5.0|We first came acr...|
|    5.0|I first tasted it...|
+-------+--------------------+
only showing top 20 rows



In [43]:
data.take(1)[0][1]

'Very pleased with my purchase. Looks exactly like the picture and will look great on my cake. It definitely will sparkle.'

In [44]:
clean_text(data.take(1)[0])[1]

'pleas purchas look exactli like pictur look great cake definit sparkl'

In [45]:
cleaned_rdd = data.rdd.map(lambda x : clean_text(x))
cleaned_df = cleaned_rdd.toDF()
cleaned_df.show()

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|5.0|pleas purchas loo...|
|4.0|nice craft small ...|
|4.0|pretti super pick...|
|5.0|got wed cake pers...|
|4.0|want wed cake lov...|
|1.0|beauti checkout i...|
|5.0|omg inexpens exac...|
|3.0|nice size cake to...|
|5.0|perfect wait us w...|
|5.0|exactli look cake...|
|5.0|arriv mail packag...|
|5.0|      advers comment|
|5.0|hard local amazon...|
|5.0|best black tea hi...|
|5.0|like strong flavo...|
|5.0|tast tea far east...|
|5.0|truli finest tea ...|
|4.0|tri oversea year ...|
|5.0|came lipton yello...|
|5.0|tast caraca busi ...|
+---+--------------------+
only showing top 20 rows



## Working in ML pipeline

In [47]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="_2", outputCol="tokens")
w2v = Word2Vec(vectorSize=1000, minCount=0, inputCol="tokens", outputCol="features")
doc2vec_pipeline = Pipeline(stages=[tokenizer,w2v])
doc2vec_model = doc2vec_pipeline.fit(cleaned_df)
doc2vecs_df = doc2vec_model.transform(cleaned_df)

Py4JJavaError: An error occurred while calling o299.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 53.0 failed 1 times, most recent failure: Lost task 0.0 in stage 53.0 (TID 434, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-45-ec41a6c0871d>", line 1, in <lambda>
  File "<ipython-input-41-9271e8cecbb4>", line 19, in clean_text
AttributeError: 'NoneType' object has no attribute 'lower'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.mllib.feature.Word2Vec.learnVocab(Word2Vec.scala:196)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:311)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:186)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:126)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 400, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-45-ec41a6c0871d>", line 1, in <lambda>
  File "<ipython-input-41-9271e8cecbb4>", line 19, in clean_text
AttributeError: 'NoneType' object has no attribute 'lower'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
