In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import Word2Vec
import re

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1558519364302_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
#STAGE 1

# Obtain the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName("MusicDataAnalysis")
    .getOrCreate()
)

# Read the tab-separated Amazon music reviews from S3; the first row is the header.
music_data = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
musics = spark.read.csv(music_data, header=True, sep='\t')

# Drop the columns that no later stage reads, then cache the slimmed frame
# because it is scanned repeatedly below.
musics = musics.drop(
    'marketplace',
    'product_parent',
    'product_title',
    'product_category',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
    'review_headline',
    'review_date',
).cache()

VBox()

In [3]:
# Number of items in the dataset = total count of the reviews.
# Each row of `musics` is one review, so the row count is the review count.
musics.count()

VBox()

4751577

In [4]:
# Number of unique customers: distinct customer_id values across all reviews.
musics.select("customer_id").distinct().count()

VBox()

1940732

In [5]:
# Number of unique products: distinct product_id values across all reviews.
musics.select("product_id").distinct().count()

VBox()

782326

In [6]:
# Largest number of reviews written by a single user:
# count reviews per customer, order by that count descending, show the top value.
(
    musics.groupby('customer_id')
    .count()
    .orderBy(f.col("count").desc())
    .select("count")
    .limit(1)
    .show()
)

VBox()

+-----+
|count|
+-----+
| 7168|
+-----+

In [7]:
# Reviews per customer. Computed once and reused for both the ranking and the
# median, instead of repeating the group-by for each.
user_review_counts = musics.groupby('customer_id').count()

#Top 10 users ranked by reviews
user_review_counts.sort(f.col("count"), ascending = False).select("customer_id").limit(10).show()

#Median number of reviews for a user
# np.median does not require sorted input, so the per-customer counts are
# collected to the driver without a cluster-wide sort first.
count_list_user = user_review_counts.select("count").selectExpr("count as _count")
count_array_user = [int(row._count) for row in count_list_user.collect()]
# median_user is reused in Stage 2 as the threshold for filtering customers.
median_user = np.median(count_array_user)
print(median_user)

VBox()

+-----------+
|customer_id|
+-----------+
|   50736950|
|   38214553|
|   51184997|
|   18116317|
|   23267387|
|   50345651|
|   14539589|
|   15725862|
|   19380211|
|   20018062|
+-----------+

1.0

In [8]:
# Largest number of reviews received by a single product:
# count reviews per product, order by that count descending, show the top value.
(
    musics.groupby('product_id')
    .count()
    .orderBy(f.col("count").desc())
    .select("count")
    .limit(1)
    .show()
)

VBox()

+-----+
|count|
+-----+
| 3936|
+-----+

In [9]:
# Reviews per product. Computed once and reused for both the ranking and the
# median, instead of repeating the group-by for each.
product_review_counts = musics.groupby('product_id').count()

#Top 10 products ranked by reviews
product_review_counts.sort(f.col("count"), ascending = False).select("product_id").limit(10).show()

#Median number of reviews for a product
# np.median does not require sorted input, so the per-product counts are
# collected to the driver without a cluster-wide sort first.
count_list_product = product_review_counts.select("count").selectExpr("count as _count")
count_array_product = [int(row._count) for row in count_list_product.collect()]
# median_product is reused in Stage 2 as the threshold for filtering products.
median_product = np.median(count_array_product)
print(median_product)

VBox()

+----------+
|product_id|
+----------+
|B00008OWZG|
|B0000AGWEC|
|B00MIA0KGY|
|B00NEJ7MMI|
|B000089RVX|
|B004EBT5CU|
|B0026P3G12|
|B00009PRZF|
|B00004XONN|
|B00006J6VG|
+----------+

2.0

In [10]:
#STAGE 2

#split review_body
def splitreviews(x):
    """Split a review body into its non-empty sentence fragments.

    The text is cut at every '.', '?' or '!'. Empty fragments (for example
    those produced by consecutive or trailing delimiters) are dropped;
    fragments are not stripped, so leading spaces are preserved.

    Args:
        x: Review text, or None when the review has no body.

    Returns:
        list of str: The fragments in their original order. An empty list is
        returned when ``x`` is None or is not a string.
    """
    # A missing or non-text body has no sentences. Handling this explicitly
    # replaces the former bare ``except: pass``, which could leave the result
    # variable unbound and hid every other error.
    if not isinstance(x, str):
        return []
    return [fragment for fragment in re.split("[.?!]", x) if fragment != '']
# Expose the sentence splitter as a Spark UDF returning an array of strings,
# and replace each review body by its list of sentences.
filter_udf = f.udf(splitreviews, ArrayType(StringType()))
music_f = musics.withColumn("review_body", filter_udf(f.col("review_body")))

# Keep only the reviews that contain at least `givenlength` sentences
# (i.e. discard reviews with fewer than two sentences).
givenlength = 2
has_enough_sentences = f.size(f.col("review_body")) >= givenlength
music_s = music_f.filter(has_enough_sentences).cache()
# music_s.show()
# music_s.select('review_body').show()

VBox()

In [11]:
#Filter by customer id and product id
# Keep only reviews written by customers whose review count (within music_s)
# exceeds median_user, the per-user median computed in Stage 1.
# The former unassigned `withColumn("count", ... cast(IntegerType()))` calls
# were removed: withColumn returns a new DataFrame, so discarding its result
# had no effect on c_group / p_group.
c_group = music_s.groupby('customer_id').count().sort(f.col("count"), ascending = False)
c_group_filter = c_group.filter(f.col("count")> median_user).sort(f.col('count'))
# A left-semi join keeps the rows of music_s whose customer_id appears in
# c_group_filter, without adding any column from the right side.
music_filter_c = music_s.join(c_group_filter, ["customer_id"], "leftsemi")

# Same idea for products: keep reviews of products whose review count (after
# the customer filter) exceeds median_product from Stage 1.
p_group = music_filter_c.groupby('product_id').count().sort(f.col("count"), ascending = False)
p_group_filter = p_group.filter(f.col("count")>median_product)
music_filtered = music_filter_c.join(p_group_filter, ["product_id"], "leftsemi")

# Top 10 users ranked by the median number of sentences in their reviews.
# Step 1: attach each review's sentence count (length of the review_body array).
music_filtered_modified = music_filtered.withColumn(
    'num_sentences',
    f.size(f.col("review_body")),
)
# Step 2: gather the sentence counts of every review written by each customer.
music_filtered_modified_user = (
    music_filtered_modified
    .groupby('customer_id')
    .agg(f.collect_list("num_sentences").alias("list_num_sentences"))
)
# music_filtered_modified_user.show(5)

def find_median(values_list):
    """Return the median of ``values_list`` as an int, or None on failure.

    The median is computed with ``np.median`` and converted with ``int()``,
    so a fractional median (e.g. 1.5) is truncated toward zero.

    Args:
        values_list: Sequence of numbers.

    Returns:
        int or None: The truncated median, or None if computing or
        converting it raises (for example for an empty sequence).
    """
    try:
        result = int(np.median(values_list))
    except Exception:
        result = None
    return result

# Spark UDF wrapper so find_median can be applied to the collected lists.
median_finder = f.udf(find_median, IntegerType())

# Median sentence count per customer, then the 10 customers with the highest median.
music_filtered_modified_user2 = music_filtered_modified_user.withColumn(
    'median',
    median_finder("list_num_sentences"),
)
(
    music_filtered_modified_user2
    .sort(f.col("median"), ascending = False)
    .select("customer_id")
    .limit(10)
    .show()
)

# Top 10 products ranked by median number of sentences in the reviews they have received
music_filtered_modified_product = (
    music_filtered_modified
    .groupby('product_id')
    .agg(f.collect_list("num_sentences").alias("list_num_sentences"))
)
music_filtered_modified_product2 = music_filtered_modified_product.withColumn(
    'median',
    median_finder("list_num_sentences"),
)
(
    music_filtered_modified_product2
    .sort(f.col("median"), ascending = False)
    .select("product_id")
    .limit(10)
    .show()
)

VBox()

+-----------+
|customer_id|
+-----------+
|   40611822|
|   25628286|
|   51865782|
|   37118941|
|   50595705|
|   23717536|
|   43879820|
|   17821650|
|   46097534|
|   37733322|
+-----------+

+----------+
|product_id|
+----------+
|B00061NSU2|
|B003Z4Y5HW|
|B007USIWPU|
|B0000A2ZV8|
|B000000SI5|
|B00T5GY470|
|B00O1BC2FU|
|B00PA9A1AU|
|B00386FG0M|
|B005SGZBHI|
+----------+

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 46514)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    sel

In [12]:
#STAGE 3

#Choose product ID and create two class dataframe
# Restrict to the reviews of one product and drop the two id columns that are
# no longer needed; the remaining columns include review_id, star_rating and
# review_body.
music_product = music_filtered.filter(f.col('product_id') == 'B00006J6VG').drop('product_id','customer_id')
# Positive class: rating above 3. Negative class: rating below 3.
# Reviews rated exactly 3 belong to neither class.
# NOTE(review): the CSV was read without a schema, so star_rating is presumably
# a string column that Spark casts for the comparison — confirm.
music_positive = music_product.filter(f.col('star_rating') > 3).cache()
music_negative = music_product.filter(f.col('star_rating') < 3).cache()
# The rating has served its purpose once the classes are separated.
music_positive = music_positive.drop('star_rating')
music_negative = music_negative.drop('star_rating')
# music_positive.show()

#create rdd for two class
# NOTE(review): limiting each frame to its own row count keeps every row;
# presumably a leftover from experimenting with a smaller sample — confirm
# whether the limit (and the extra count job it triggers) is still wanted.
music_p_small = music_positive.limit(music_positive.count())
music_p_small_rdd = music_p_small.rdd.cache()
music_n_small = music_negative.limit(music_negative.count())
music_n_small_rdd = music_n_small.rdd.cache()

VBox()

In [13]:
#encode function
def review_encode(x):
    """Embed every sentence of every review in one partition.

    Written for ``RDD.mapPartitions``: the Universal Sentence Encoder module
    is loaded from TF-Hub once per call, all sentences of the partition are
    embedded in a single session run, and each sentence is paired with its
    embedding.

    Args:
        x: Iterator over rows whose first field is the review id and whose
            second field is the list of sentences of that review.

    Returns:
        list: One ``[review_id, sentence, embedding]`` entry per sentence, in
        input order. An empty list when the partition contains no sentences.
    """
    # Flatten the partition into (review_id, sentence) pairs, preserving order
    # so that position n in `pairs` matches row n of the embedding matrix.
    pairs = [(row[0], sentence) for row in x for sentence in row[1]]

    # An empty partition has nothing to embed: return before loading the model
    # and before handing an empty batch to the encoder.
    if not pairs:
        return []

    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
    embed = hub.Module(module_url)
    sentences = [sentence for _, sentence in pairs]

    with tf.Session() as session:
        # The hub module needs both its variables and its lookup tables initialised.
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        embeddings = session.run(embed(sentences))

    return [
        [rid, sentence, embeddings[n]]
        for n, (rid, sentence) in enumerate(pairs)
    ]

#encode for two class
# Run the sentence encoder over each partition of the positive and negative
# review RDDs. Each resulting element is [review_id, sentence, embedding].
# The results are cached because several later cells reuse them.
review_embedding_p_s3 = music_p_small_rdd.mapPartitions(review_encode).cache()
# review_embedding_p_s3.count()
review_embedding_n_s3 = music_n_small_rdd.mapPartitions(review_encode).cache()
# review_embedding_n_s3.count()


VBox()

In [14]:
#create numpy array for all vectors
def return_list(x):
    """Extract the third field (the vector) of every element in a partition.

    Args:
        x: Iterable of indexable elements laid out as
            (review_id, sentence, vector).

    Returns:
        list: The vectors, in the order the elements were received.
    """
    return [entry[2] for entry in x]
#positive array
# Keep only the embedding of each positive sentence (as an RDD), then collect
# all of them to the driver as one numpy array.
vec_list_p_s3 = review_embedding_p_s3.mapPartitions(return_list).cache()
# vec_list_p_s3.count()
vec_array_p_s3 = np.array(vec_list_p_s3.collect())
# print(vec_array_p_s3.shape)
#negative array
# Same two steps for the negative class.
vec_list_n_s3 = review_embedding_n_s3.mapPartitions(return_list).cache()
# vec_list_n_s3.count()
vec_array_n_s3 = np.array(vec_list_n_s3.collect())
# print(vec_array_n_s3.shape)

VBox()

In [15]:
#calculate positive average distance
def calculate_p_s3_distance(x):
    """Mean cosine distance from ``x`` to every positive-class embedding.

    Reads the module-level array ``vec_array_p_s3``.

    Args:
        x: Embedding vector of one sentence.

    Returns:
        list: A single-element list holding the mean of ``1 - cos(x, v)``
        over every ``v`` in ``vec_array_p_s3``.
    """
    query = x
    distances = [
        1 - float((np.dot(query, other)) / (np.linalg.norm(query)*np.linalg.norm(other)))
        for other in vec_array_p_s3
    ]
    return [np.mean(distances)]

#calculate negative average distance
def calculate_n_s3_distance(x):
    """Mean cosine distance from ``x`` to every negative-class embedding.

    Reads the module-level array ``vec_array_n_s3``.

    Args:
        x: Embedding vector of one sentence.

    Returns:
        list: A single-element list holding the mean of ``1 - cos(x, v)``
        over every ``v`` in ``vec_array_n_s3``.
    """
    query = x
    distances = [
        1 - float((np.dot(query, other)) / (np.linalg.norm(query)*np.linalg.norm(other)))
        for other in vec_array_n_s3
    ]
    return [np.mean(distances)]

#positive average distance
# For every positive sentence vector, its mean cosine distance to all positive
# vectors; each element is a one-item list [mean_distance].
average_distance_p_s3 = vec_list_p_s3.map(calculate_p_s3_distance).cache()
# average_distance_p_s3.take(1)

#negative average distance
# Same computation within the negative class.
average_distance_n_s3 = vec_list_n_s3.map(calculate_n_s3_distance).cache()
# average_distance_n_s3.take(1)

#zip two rdd: (review_id, review_sentence, vector, average_distance)
# zip pairs element i of the embedding RDD with element i of the distance RDD;
# the distance RDD is derived element-for-element from the embedding RDD
# (mapPartitions(return_list) then map), so the two line up.
#positive zip
zip_rdd_p_s3 = review_embedding_p_s3.zip(average_distance_p_s3).map(lambda x: (x[0][0],x[0][1],x[0][2],x[1][0]))
# zip_rdd_p_s3.take(5)
#negative zip
zip_rdd_n_s3 = review_embedding_n_s3.zip(average_distance_n_s3).map(lambda x: (x[0][0],x[0][1],x[0][2],x[1][0]))
# zip_rdd_n_s3.take(5)

VBox()

In [16]:
#sort and find center vector
# The class center is the sentence with the smallest average distance to all
# sentences of its class: sort ascending by the last tuple field and take the first.
#positive class center
zip_sort_p_s3 = zip_rdd_p_s3.sortBy(lambda x: x[-1])
p_center_list_s3 = zip_sort_p_s3.take(1)
# (review_id, sentence) of the center, kept for display.
p_center_s3 = (p_center_list_s3[0][0],p_center_list_s3[0][1])
# Embedding of the center, used by the nearest-neighbour search in the next cell.
p_center_vec_s3 = p_center_list_s3[0][2]
print(p_center_s3)
print("-----------------")
#negative class center
zip_sort_n_s3 = zip_rdd_n_s3.sortBy(lambda x: x[-1])
n_center_list_s3 = zip_sort_n_s3.take(1)
n_center_s3 = (n_center_list_s3[0][0],n_center_list_s3[0][1])
n_center_vec_s3 = n_center_list_s3[0][2]
print(n_center_s3)

VBox()

('R1YEXVPMKYXOVC', ' just an awesome song')
-----------------
('R2L4PZC7CHGQ4R', ' This album is much too insipid for people who like to listen to real music')

In [17]:
#find positive nearest 10 vector
def neareast_p_s3_10(x):
    """Cosine distance between one positive sentence and the positive center.

    Reads the module-level vector ``p_center_vec_s3``.

    Args:
        x: Element laid out as (review_id, sentence, vector).

    Returns:
        tuple: ``(review_id, distance)`` where distance is
        ``1 - cos(p_center_vec_s3, vector)``.
    """
    vec = x[2]
    review_id = x[0]
    cosine = (np.dot(p_center_vec_s3, vec)) / (np.linalg.norm(p_center_vec_s3)*np.linalg.norm(vec))
    return (review_id, 1 - float(cosine))

#positive neareast 10
id_dis_p_s3 = review_embedding_p_s3.map(neareast_p_s3_10)
near_10_p_s3 = id_dis_p_s3.sortBy(lambda x: x[1])
near_10_p_s3_list = near_10_p_s3.take(11)
near_10_p_s3_list = near_10_p_s3_list[1::]
for i in near_10_p_s3_list:
    print(i[0])

print("-----------------")

#find negative nearest 10 vector
def neareast_n_s3_10(x):
    """Cosine distance between one negative sentence and the negative center.

    Reads the module-level vector ``n_center_vec_s3``.

    Args:
        x: Element laid out as (review_id, sentence, vector).

    Returns:
        tuple: ``(review_id, distance)`` where distance is
        ``1 - cos(n_center_vec_s3, vector)``.
    """
    vec = x[2]
    review_id = x[0]
    cosine = (np.dot(n_center_vec_s3, vec)) / (np.linalg.norm(n_center_vec_s3)*np.linalg.norm(vec))
    return (review_id, 1 - float(cosine))


    
#negative neareast 10
id_dis_n_s3 = review_embedding_n_s3.map(neareast_n_s3_10)
near_10_n_s3 = id_dis_n_s3.sortBy(lambda x: x[1])
near_10_n_s3_list = near_10_n_s3.take(11)
near_10_n_s3_list = near_10_n_s3_list[1::]
for i in near_10_n_s3_list:
    print(i[0])

VBox()

R3OSCPFNN3SAAS
R2EKY5I5KJW2PB
R2H1YNHUCT31TS
RY3PCDQ80U0O8
R3OSCPFNN3SAAS
R2RZANPP1RFQ7M
R1BMO598TXFB7R
R1W2RU1V7J3IGI
R2RZANPP1RFQ7M
RM2Z9V7TKMH7P
-----------------
R36BAXRVCR2PFB
R2UBEJ5JX4PKX1
R195DYX83KWYXU
R3QG1GKJO8IL4K
R1DP63VJ4NXY44
R1DP63VJ4NXY44
R1CA8OIEZ0B22Y
RKN17VFTZQ69P
RATB9UCW9ZV0B
RKEOBZVEWY7RW

In [18]:
#STAGE 4

#preprocess function before encode
def review_encode_preprocess(x):
    """Flatten a partition of reviews into one entry per sentence.

    Args:
        x: Iterator over rows whose first field is the review id and whose
            second field is the list of sentences of that review.

    Returns:
        list: ``[review_id, sentence]`` for every sentence, in input order,
        skipping sentences that are exactly one space character.
    """
    rows = list(x)
    return [
        [row[0], sentence]
        for row in rows
        for sentence in row[1]
        if sentence != " "
    ]

# Flatten the Stage 3 review RDDs (positive and negative) into one
# [review_id, sentence] entry per sentence.
music_p_small_preprocess = music_p_small_rdd.mapPartitions(review_encode_preprocess).cache()
music_n_small_preprocess = music_n_small_rdd.mapPartitions(review_encode_preprocess).cache()

# Turn each flattened RDD into a DataFrame with named columns, then strip
# leading and trailing spaces from every sentence.
music_p_preprocess_reformat = (
    spark.createDataFrame(music_p_small_preprocess)
    .withColumnRenamed('_1', 'review_id')
    .withColumnRenamed('_2', 'review_body')
)
music_p_preprocess_reformat = music_p_preprocess_reformat.withColumn(
    'review_body',
    f.rtrim(f.ltrim(f.col('review_body'))),
)

music_n_preprocess_reformat = (
    spark.createDataFrame(music_n_small_preprocess)
    .withColumnRenamed('_1', 'review_id')
    .withColumnRenamed('_2', 'review_body')
)
music_n_preprocess_reformat = music_n_preprocess_reformat.withColumn(
    'review_body',
    f.rtrim(f.ltrim(f.col('review_body'))),
)
# music_n_preprocess_reformat.show(5)

# Tokenize every sentence into words, then keep only sentences with at least
# two tokens (single-word and empty sentences are dropped).
# The pattern is a raw string so that the backslash in \w reaches the regex
# engine unchanged instead of being treated as an invalid Python string escape.
# gaps=False makes the pattern describe the tokens themselves rather than the
# separators between them.
regexTokenizer = RegexTokenizer(gaps = False, pattern = r'\w+', inputCol = 'review_body', outputCol = 'review_token')
music_p_preprocess_reformat_token = regexTokenizer.transform(music_p_preprocess_reformat)
music_p_preprocess_reformat_token_filter = music_p_preprocess_reformat_token.filter(f.size('review_token') > 1)
# music_p_preprocess_reformat_token_filter.show(5)
music_n_preprocess_reformat_token = regexTokenizer.transform(music_n_preprocess_reformat)
music_n_preprocess_reformat_token_filter = music_n_preprocess_reformat_token.filter(f.size('review_token') > 1)
# music_n_preprocess_reformat_token_filter.show(5)

#Create an average word vector for each sentence
# One Word2Vec estimator (512-dimensional vectors, every token kept via
# minCount=1) is fitted separately on the positive and on the negative
# sentences, so the two classes get independent models.
# NOTE(review): no seed is passed to Word2Vec, so the fitted vectors may differ
# between runs — confirm whether reproducibility is required.
word2vec = Word2Vec(vectorSize = 512, minCount = 1, inputCol = 'review_token', outputCol = 'vector')
model_p = word2vec.fit(music_p_preprocess_reformat_token_filter)
result_p = model_p.transform(music_p_preprocess_reformat_token_filter)
# result_p.count()
# result_p.show(5)
model_n = word2vec.fit(music_n_preprocess_reformat_token_filter)
result_n = model_n.transform(music_n_preprocess_reformat_token_filter)

#Drop the unwanted column and transfer to rdd type
# After dropping the token list the columns are review_id, review_body, vector.
result_p = result_p.drop('review_token')
result_p_rdd = result_p.rdd
result_n = result_n.drop('review_token')
result_n_rdd = result_n.rdd

VBox()

In [19]:
#reformat function after encode
def rdd_reformat(x):
    """Convert each row's vector field to a numpy array of doubles.

    Args:
        x: Iterable of rows laid out as (review_id, review_body, vector),
            where ``vector`` provides a ``toArray()`` method.

    Returns:
        list: ``[review_id, review_body, array]`` per row, in input order,
        with ``array`` being ``vector.toArray()`` cast to ``np.double``.
    """
    return [
        [row[0], row[1], row[2].toArray().astype(np.double)]
        for row in x
    ]

#Re-format the rdd to review_id, review_body, vector
# Converts the Spark ML vector in every row to a numpy double array; the
# results are cached because they are reused by several later steps.
result_p_rdd_reformat = result_p_rdd.mapPartitions(rdd_reformat).cache()
# result_p_rdd_reformat.count()
# result_p_rdd_reformat.take(2)
result_n_rdd_reformat = result_n_rdd.mapPartitions(rdd_reformat).cache()
# result_n_rdd_reformat.count()
# result_n_rdd_reformat.take(2)

#create numpy array for all vectors
def np_array_vectors(x):
    """Extract the third field (the vector) of every element in a partition.

    Args:
        x: Iterable of indexable elements laid out as
            (review_id, review_body, vector).

    Returns:
        list: The vectors, in the order the elements were received.
    """
    return [entry[2] for entry in x]

#positive array
# Keep only the vector of each positive sentence (as an RDD), then collect all
# of them to the driver as one numpy array.
result_p_vec_list = result_p_rdd_reformat.mapPartitions(np_array_vectors).cache()
result_p_vec_array = np.array(result_p_vec_list.collect())
# print(result_p_vec_array.shape)
# result_p_vec_array[0]
#negative array
# Same two steps for the negative class.
result_n_vec_list = result_n_rdd_reformat.mapPartitions(np_array_vectors).cache()
result_n_vec_array = np.array(result_n_vec_list.collect())
# print(result_n_vec_array.shape)
# result_n_vec_array[0]

#calculate positive average distance
def calculate_p_w2v_distance(x):
    """Mean cosine distance from ``x`` to every positive-class Word2Vec vector.

    Reads the module-level array ``result_p_vec_array``.

    Args:
        x: Averaged word vector of one sentence.

    Returns:
        list: A single-element list holding the mean of ``1 - cos(x, v)``
        over every ``v`` in ``result_p_vec_array``.
    """
    query = x
    distances = [
        1 - float((np.dot(query, other)) / (np.linalg.norm(query)*np.linalg.norm(other)))
        for other in result_p_vec_array
    ]
    return [np.mean(distances)]

#calculate negative average distance
def calculate_n_w2v_distance(x):
    """Mean cosine distance from ``x`` to every negative-class Word2Vec vector.

    Reads the module-level array ``result_n_vec_array``.

    Args:
        x: Averaged word vector of one sentence.

    Returns:
        list: A single-element list holding the mean of ``1 - cos(x, v)``
        over every ``v`` in ``result_n_vec_array``.
    """
    query = x
    distances = [
        1 - float((np.dot(query, other)) / (np.linalg.norm(query)*np.linalg.norm(other)))
        for other in result_n_vec_array
    ]
    return [np.mean(distances)]

#positive average distance (take longer time)
# For every positive sentence vector, its mean cosine distance to all positive
# vectors; each element is a one-item list [mean_distance].
average_distance_p_w2v = result_p_vec_list.map(calculate_p_w2v_distance).cache()
# average_distance_p_w2v.take(5)
#negative average distance (take longer time)
average_distance_n_w2v = result_n_vec_list.map(calculate_n_w2v_distance).cache()
# average_distance_n_w2v.take(5)

#zip two rdd: (review_id, review_sentence, vector, average_distance)
# zip pairs element i of the reformatted RDD with element i of the distance
# RDD; the distance RDD is derived element-for-element from the reformatted
# RDD (mapPartitions(np_array_vectors) then map), so the two line up.
#positive zip
zip_rdd_p_w2v = result_p_rdd_reformat.zip(average_distance_p_w2v).map(lambda x: (x[0][0],x[0][1],x[0][2],x[1][0]))
# zip_rdd_p_w2v.take(5)
#negative zip
zip_rdd_n_w2v = result_n_rdd_reformat.zip(average_distance_n_w2v).map(lambda x: (x[0][0],x[0][1],x[0][2],x[1][0]))
# zip_rdd_n_w2v.take(5)

#sort and find center vector
# The class center is the sentence with the smallest average distance to all
# sentences of its class: sort ascending by the last tuple field and take the first.
#positive class center
zip_sort_p_w2v = zip_rdd_p_w2v.sortBy(lambda x: x[-1])
p_center_list_w2v = zip_sort_p_w2v.take(1)
# (review_id, sentence) of the center, kept for display.
p_center_w2v = (p_center_list_w2v[0][0], p_center_list_w2v[0][1])
# Vector of the center, used by the nearest-neighbour search in the next cell.
p_center_vec_w2v = p_center_list_w2v[0][2]
print(p_center_w2v)
print("-----------------")
#negative class center
zip_sort_n_w2v = zip_rdd_n_w2v.sortBy(lambda x: x[-1])
n_center_list_w2v = zip_sort_n_w2v.take(1)
n_center_w2v = (n_center_list_w2v[0][0], n_center_list_w2v[0][1])
n_center_vec_w2v = n_center_list_w2v[0][2]
print(n_center_w2v)

VBox()

('R2JHEVBBH3TIJZ', 'Bloody Valentine has to be about the best song on the cd, even though its about killing someone its an awesome song  and ive come to this conclusion because so many people i know have told me how much they love it')
-----------------
('RV9TLI1TKAOYP', 'They have made the average high school idiot think that being punk is about your clothes and how many peircings you have, and again I can understand why punks want to go Columbine on the preppy people who think Good Charlotte is rated in the same class as Anti Flag or The Misfits')

In [20]:
#find positive nearest 10 vector
def neareast_p_w2v_10(x):
    """Cosine distance between one positive sentence and the positive center.

    Reads the module-level vector ``p_center_vec_w2v``.

    Args:
        x: Element laid out as (review_id, review_body, vector).

    Returns:
        tuple: ``(review_id, distance)`` where distance is
        ``1 - cos(p_center_vec_w2v, vector)``.
    """
    vec = x[2]
    review_id = x[0]
    cosine = (np.dot(p_center_vec_w2v, vec)) / (np.linalg.norm(p_center_vec_w2v)*np.linalg.norm(vec))
    return (review_id, 1 - float(cosine))

#positive neareast 10
id_dis_p_w2v = result_p_rdd_reformat.map(neareast_p_w2v_10)
near_10_p_w2v = id_dis_p_w2v.sortBy(lambda x: x[1])
near_10_p_w2v_list = near_10_p_w2v.take(11)
near_10_p_w2v_list = near_10_p_w2v_list[1::]
for i in near_10_p_w2v_list:
    print(i[0])

print("-----------------")

#find negative nearest 10 vector
def neareast_n_w2v_10(x):
    """Cosine distance between one negative sentence and the negative center.

    Reads the module-level vector ``n_center_vec_w2v``.

    Args:
        x: Element laid out as (review_id, review_body, vector).

    Returns:
        tuple: ``(review_id, distance)`` where distance is
        ``1 - cos(n_center_vec_w2v, vector)``.
    """
    vec = x[2]
    review_id = x[0]
    cosine = (np.dot(n_center_vec_w2v, vec)) / (np.linalg.norm(n_center_vec_w2v)*np.linalg.norm(vec))
    return (review_id, 1 - float(cosine))

#negative neareast 10
id_dis_n_w2v = result_n_rdd_reformat.map(neareast_n_w2v_10)
near_10_n_w2v = id_dis_n_w2v.sortBy(lambda x: x[1])
near_10_n_w2v_list = near_10_n_w2v.take(11)
near_10_n_w2v_list = near_10_n_w2v_list[1::]
for i in near_10_n_w2v_list:
    print(i[0])

VBox()

R11K39GJQIHVM7
R1HWMPUYUW9KMT
RY3PCDQ80U0O8
R36KVYD91K4JDD
R2TWK4QDP00KYK
RWHWS0A2IKWLB
RK9SGITJHUAEX
R1ZXWNL7DIQNK4
R2K78UJNJE4RKO
R22CD2HMRRLJ7L
-----------------
RBPUHIYNI8FF9
R2KFKTCQFRJI05
R302XPK0GQ0VWP
R2E3X3CIMYF5JT
RV9TLI1TKAOYP
R4JQMJ7S6K92X
R1E4HC0RBS6O2Q
R3MGK1ZXH61TIS
R24DPPKVHC5Q7G
RTWYBI9VBUZCY