In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType
import nltk
# `import nltk as nltk.data` is a SyntaxError (an alias cannot be a dotted name);
# importing the submodule is what makes `nltk.data.load` available later.
import nltk.data
import numpy as np

# Create (or reuse) the Spark session for this notebook.
spark = SparkSession \
    .builder \
    .appName("Overall Summary Statistic") \
    .getOrCreate()

# Amazon US reviews, Music category: gzipped, tab-separated, first line is a header.
data = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
allData = spark.read.csv(data, header=True, sep='\t')

# Project down to the five columns of interest for the summary statistics.
allData = allData.select(allData.customer_id, allData.product_id, allData.star_rating, allData.review_id, allData.review_body)


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1558495087812_0006,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# One Spark action over the whole projected review table.
totalReviews = allData.count()
print('the total number of reviews is',
      totalReviews)

VBox()

the total number of reviews is 4751577

In [3]:
# Distinct customer_id values (a null id, if any, counts as one value).
totalUsers = allData.select('customer_id').distinct().count()
print('the number of unique users is',
      totalUsers)

VBox()

the number of unique users is 1940732

In [4]:
# Distinct product_id values (a null id, if any, counts as one value).
totalProducts = allData.select('product_id').distinct().count()
print('the number of unique products is',
      totalProducts)

VBox()

the number of unique products is 782326

In [5]:
# Reviews per customer: one row per customer_id plus a 'count(review_id)' column.
reviewsUser = allData.groupBy('customer_id').agg({'review_id': 'count'})

# Most prolific customers first (negating the count sorts descending).
countCol = reviewsUser['count(review_id)']
orderReviewsData = reviewsUser.orderBy(-countCol)
del countCol

# Single customer with the most reviews, with that review count.
print('the largest number of reviews published by a single user')
orderReviewsData.show(1)

VBox()

the largest number of reviews published by a single user
+-----------+----------------+
|customer_id|count(review_id)|
+-----------+----------------+
|   50736950|            7168|
+-----------+----------------+
only showing top 1 row

In [6]:
# the top 10 users ranked by the number of reviews they publish
print('the top 10 users ranked by the number of reviews they publish')
# DataFrame.show() only prints and returns None, so assigning its result left
# top10Users = None. Keep the 10-row DataFrame itself and display it separately.
top10Users = orderReviewsData.select('customer_id').limit(10)
top10Users.show()


VBox()

the top 10 users ranked by the number of reviews they publish
+-----------+
|customer_id|
+-----------+
|   50736950|
|   38214553|
|   51184997|
|   18116317|
|   23267387|
|   50345651|
|   14539589|
|   15725862|
|   19380211|
|   20018062|
+-----------+
only showing top 10 rows

In [7]:
# the median number of reviews published by a user
# A quantile does not depend on row order, so compute it on the unsorted
# per-user counts (reviewsUser) rather than orderReviewsData, which would
# make Spark execute a global sort that contributes nothing to the result.
# relativeError=0 requests the exact median.
medianReviews = reviewsUser.approxQuantile("count(review_id)", [0.5], 0)
print('the median number of reviews published by a user is', medianReviews[0])


VBox()

the median number of reviews published by a user is 1.0

In [8]:
# Reviews per product (one row per product_id plus 'count(review_id)'),
# then products ordered from most to fewest reviews.
reviewsProd = allData.groupBy('product_id').agg({'review_id': 'count'})
orderReviewsProdData = reviewsProd.orderBy(-reviewsProd['count(review_id)'])


VBox()

In [9]:
# the largest number of reviews written for a single product
# product_id & largest number of reviews
print('the largest number of reviews written for a single product is')
# show() returns None, so keep the one-row DataFrame and display it separately.
largestReviewsProdHead = orderReviewsProdData.limit(1)
largestReviewsProdHead.show()

VBox()

the largest number of reviews written for a single product is
+----------+----------------+
|product_id|count(review_id)|
+----------+----------------+
|B00008OWZG|            3936|
+----------+----------------+
only showing top 1 row

In [10]:
# the top 10 products ranked by the number of reviews they have
print('the top 10 products ranked by the number of reviews they have')
# show() returns None; keep the 10-row DataFrame in top10Prod and display it.
top10Prod = orderReviewsProdData.select('product_id').limit(10)
top10Prod.show()

VBox()

the top 10 products ranked by the number of reviews they have
+----------+
|product_id|
+----------+
|B00008OWZG|
|B0000AGWEC|
|B00MIA0KGY|
|B00NEJ7MMI|
|B000089RVX|
|B004EBT5CU|
|B0026P3G12|
|B00009PRZF|
|B00004XONN|
|B00006J6VG|
+----------+
only showing top 10 rows

In [11]:
# the median number of reviews a product has
# Computed on the unsorted per-product counts (reviewsProd): the median does
# not depend on row order, and orderReviewsProdData would force a global sort.
# relativeError=0 requests the exact median.
medianReviewsProd = reviewsProd.approxQuantile("count(review_id)", [0.5], 0)
print('the median number of reviews a product has is', medianReviewsProd[0])

print('part1 done')

VBox()

the median number of reviews a product has is 2.0
part1 done

In [12]:
# Keep users and products whose review count is strictly above the medians
# computed in part 1. Using medianReviews / medianReviewsProd rather than
# hard-coded 1.0 / 2.0 keeps this cell correct if the input data changes.
user = reviewsUser.filter(reviewsUser['count(review_id)'] > medianReviews[0])
product = reviewsProd.filter(reviewsProd['count(review_id)'] > medianReviewsProd[0])
# Inner joins drop reviews whose customer / product did not pass the filter.
# Columns are selected by name, in the order crop_sentences expects:
# (customer_id, product_id, review_body).
filtData = allData.join(user, "customer_id").select('customer_id', 'product_id', 'review_body')
filtAll = filtData.join(product, "product_id").select('customer_id', 'product_id', 'review_body')

VBox()

In [13]:
# Sentence segmentation with NLTK's English Punkt model.
# crop_sentences (below) uses this tokenizer and produces pairs keyed by customer_id:
#   (customer_id, (product_id, review_body, seg_sentences, sentences_count))
tokenizer = nltk.data.load(
    'tokenizers/punkt/english.pickle'
)

def crop_sentences(data, sentence_tokenizer=None):
    """Split one review into sentences and key the result by customer_id.

    Args:
        data: row-like sequence whose first three items are
            (customer_id, product_id, review_body).
        sentence_tokenizer: object with a ``tokenize(str) -> list`` method.
            Defaults to the module-level ``tokenizer`` so existing
            ``rdd.map(crop_sentences)`` calls are unchanged.

    Returns:
        ``(customer_id, (product_id, review_body, sentences, sentences_count))``.
        A final "sentence" consisting only of '.', '!' or '?' is dropped
        before counting.
    """
    tok = tokenizer if sentence_tokenizer is None else sentence_tokenizer
    # str() turns a null review_body (None) into the text 'None'.
    sentences = tok.tokenize(str(data[2]))
    # The tokenizer can return an empty list (e.g. empty review text); the old
    # unguarded sentences[-1] raised IndexError and failed the Spark task.
    if sentences and sentences[-1] in ('.', '!', '?'):
        sentences.pop()
    return (data[0], (data[1], data[2], sentences, len(sentences)))

rddData = filtAll.rdd
# Tokenize each review, keeping only reviews with at least two sentences.
# NOTE(review): `maped` is not persisted, so the tokenization is recomputed for
# every action that depends on it (the per-user and the per-product rankings).
maped = (
    rddData
    .map(crop_sentences)
    .filter(lambda pair: pair[1][3] >= 2)
)
# Gather each customer's review records under their customer_id.
user_maped = maped.groupByKey()


def sort_by_sentences(data):
    """Return the median sentence count of a group of review records.

    Despite the name, nothing is sorted here: this is the per-key reducer
    passed to ``mapValues``. Each record carries its sentences_count at
    index 3, and the median is truncated to an int (e.g. 2.5 -> 2).
    """
    sentence_counts = [record[3] for record in data]
    return int(np.median(sentence_counts))

VBox()

In [14]:
# Top 10 users by median sentence count, step 1: one
# (customer_id, median sentences per review) pair per user, not yet ranked.
sorted_maped = user_maped.mapValues(
    sort_by_sentences
)

VBox()

In [15]:
# Step 2: rank users from the highest median sentence count down; show the top 10.
sorted_maped = sorted_maped.sortBy(
    lambda user_median: user_median[1],
    ascending=False,
)
sorted_maped.take(10)

VBox()

[('25628286', 234), ('37118941', 227), ('51865782', 226), ('29580246', 201), ('50595705', 191), ('17821650', 183), ('43879820', 180), ('15585529', 177), ('23717536', 157), ('46097534', 154)]
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 60312)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip

In [16]:
# Re-key a per-customer record by product:
#   (customer_id, (product_id, review_body, seg_sentences, sentences_count))
#   -> (product_id, (customer_id, review_body, seg_sentences, sentences_count))
def transform_map(data):
    """Swap customer_id and product_id so the pair is keyed by product_id."""
    customer_id = data[0]
    details = data[1]
    product_id = details[0]
    return product_id, (customer_id, details[1], details[2], details[3])

VBox()

In [17]:
# Top 10 products ranked by the median number of sentences in their reviews:
# re-key each review by product_id, group, reduce to the median, rank descending.
product_maped = maped.map(transform_map).groupByKey()
sorted_product_maped = (
    product_maped
    .mapValues(sort_by_sentences)
    .sortBy(lambda product_median: product_median[1], ascending=False)
)
sorted_product_maped.take(10)

VBox()

[('B00LTQ5EVY', 984), ('B009SF2GZU', 321), ('B000003G29', 267), ('B00T7TYTCK', 252), ('B000BCH5PK', 209), ('B0000C0FEW', 200), ('B00AP5M4WM', 160), ('B009SF2IRG', 157), ('B008LA8E9K', 149), ('B005ZHBBU6', 147)]