In [1]:
from pyspark.sql import SparkSession

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1558448443060_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark = SparkSession \
    .builder \
    .appName("COMP5349_ASS2") \
    .getOrCreate()

VBox()

## Stage 1

In [3]:
#load data
music_data = 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz'
mus = spark.read.csv(music_data, header=True,sep='\t').cache()

VBox()

In [4]:
# Total number of reviews
mus.count()

VBox()

4751577

In [5]:
# The number of unique users
mus.dropDuplicates(["customer_id"]).count()

VBox()

1940732

In [6]:
# The number of unique products
mus.dropDuplicates(["product_id"]).count()

VBox()

782326

In [7]:
# The largest number of reviews published by a single user
user_rev = mus.groupBy("customer_id").count().orderBy("count", ascending=False).cache()
user_rev.select("count").first()[0]

VBox()

7168

In [8]:
# The top10 users ranked by the number of reviews they publish
user_rev.select("customer_id").take(10)

VBox()

[Row(customer_id='50736950'), Row(customer_id='38214553'), Row(customer_id='51184997'), Row(customer_id='18116317'), Row(customer_id='23267387'), Row(customer_id='50345651'), Row(customer_id='14539589'), Row(customer_id='15725862'), Row(customer_id='19380211'), Row(customer_id='20018062')]

In [9]:
# The median number of reviews published by a user
user_rev.approxQuantile("count", [0.5], 0)[0]

VBox()

1.0

In [10]:
# The largest number of reviews written for a single product
prod_rev = mus.groupBy("product_id").count().orderBy("count", ascending=False).cache()
prod_rev.select("count").first()[0]

VBox()

3936

In [11]:
# The top10 products ranked by the number of reviews they have
prod_rev.select("product_id").take(10)

VBox()

[Row(product_id='B00008OWZG'), Row(product_id='B0000AGWEC'), Row(product_id='B00MIA0KGY'), Row(product_id='B00NEJ7MMI'), Row(product_id='B000089RVX'), Row(product_id='B004EBT5CU'), Row(product_id='B0026P3G12'), Row(product_id='B00009PRZF'), Row(product_id='B00004XONN'), Row(product_id='B00006J6VG')]

In [12]:
# The median number of reviews a product has
prod_rev.approxQuantile("count", [0.5], 0)[0]

VBox()

2.0

## Stage 2

In [13]:
# Remove reviews published by users with less than median number of reviews published

#but we got median number of reviews per user is 1.0, we decided to remove anyway.
from pyspark.sql.window import Window
from pyspark.sql.functions import count
window = Window \
    .partitionBy("customer_id") \
    #.orderBy("ts")
mus_cus = mus.withColumn("n", count("customer_id") \
    .over(window)) \
    .filter("n > 1") \
    .drop("n").cache()
mus_cus.count()

VBox()

3408839

In [14]:
# Remove reviews from products with less than median number of reviews received
window = Window \
    .partitionBy("product_id") \
    #.orderBy("ts")
mus_pro = mus_cus.withColumn("n", count("product_id") \
    .over(window)) \
    .filter("n > 2") \
    .drop("n").cache()

mus_pro.count()

VBox()

2858386

In [20]:
# Using "… . ! ?" to split sentences

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import re

def split_and_remove_empty (x):
    x = str(x)
    x_row = re.split(r"[.!?…]", x)
    x_row_r = [x for x in x_row if x != "" and x != " "]
    return x_row_r

split_re = udf(split_and_remove_empty, ArrayType(StringType(), containsNull=False))
# mus_pro.select(split_re(mus_pro.review_body).alias('split_re'))

mus_pro_split = mus_pro.withColumn("split_re", split_re(mus_pro.review_body).alias('review_body')).cache()
mus_pro_split.count()

VBox()

2858386

In [16]:
# Remove reviews with less than two sentences in the review body. 

from pyspark.sql.functions import col, size

mus_processed = mus_pro_split.filter(size(col("split_re")) > 2).cache()
mus_processed.count()

VBox()

2187835

In [17]:
# Add a column to record the median of sentences
mus_pro_split_count = mus_processed.withColumn("split_count", size(col("split_re"))-1).cache()

VBox()

In [18]:
# Top 10 users ranked by median number of sentences in the reviews they have published

from pyspark.sql import Window
import pyspark.sql.functions as F

#grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(split_count, 0.5)')

#df.withColumn('med_val', magic_percentile.over(grp_window))

mg = mus_pro_split_count.groupBy("customer_id").agg(magic_percentile.alias('med_val'))

mg.orderBy("med_val", ascending=False).show(10)

VBox()

+-----------+-------+
|customer_id|med_val|
+-----------+-------+
|   25628286|    272|
|   50595705|    222|
|   23717536|    219|
|   43879820|    196|
|   17821650|    184|
|   15585529|    181|
|   37733322|    167|
|   49916132|    158|
|   22109829|    157|
|   44348856|    157|
|   27767670|    154|
|   12365011|    152|
|   36306364|    150|
|   35279755|    146|
|   14544989|    146|
|   21065408|    144|
|   19493393|    143|
|   53024265|    140|
|   29061899|    139|
|   16273696|    139|
+-----------+-------+
only showing top 20 rows

In [19]:
# Top 10 products ranked by median number of sentences in the reviews they have published

from pyspark.sql import Window
import pyspark.sql.functions as F

#grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(split_count, 0.5)')

#df.withColumn('med_val', magic_percentile.over(grp_window))

mg_p = mus_pro_split_count.groupBy("product_id").agg(magic_percentile.alias('med_val'))

mg_p.orderBy("med_val", ascending=False).show(10)

VBox()

+----------+-------+
|product_id|med_val|
+----------+-------+
|B000003G29|    466|
|B005ZHBBU6|    320|
|B008LA8E9K|    205|
|B00IROI9BS|    187|
|B00AP5M4WM|    175|
|B000S5LQBO|    163|
|B000ASAEI0|    163|
|B007929F9E|    160|
|B00LF0GLXO|    155|
|B000KJTCOG|    153|
|B0074B10ES|    151|
|B00L47E4VE|    147|
|B000002B6S|    144|
|B00644JI1S|    137|
|B00N3B2290|    133|
|B00TILNZFY|    133|
|B00061NSU2|    128|
|B0000BHIKV|    127|
|B00KFTW0ZE|    126|
|B003Z4Y5HW|    125|
+----------+-------+
only showing top 20 rows

## Stage 3

In [20]:
# Select 1 product from top 10 product
mus_1_pro = mus_processed.filter(mus_processed['product_id']=='B00006J6VG').cache()

VBox()

787

In [21]:
# Divide into positive and negative
mus_1_pro_pos = mus_1_pro.filter("star_rating >= 4").cache()
mus_1_pro_neg = mus_1_pro.filter("star_rating <= 2").cache()

VBox()

In [22]:
# Make index RDD to find back the review id and review body 

def flat_id(tup):
    for val in tup[1]:
        yield tup[0]

def flat_val(tup):
    for val in tup[1]:
        yield val

### Negative
mus_1_pro_neg_id_rdd = mus_1_pro_neg.rdd \
                    .map(lambda p: (p.review_id, p.split_re)) \
                    .flatMap(flat_id)

mus_1_pro_neg_val_rdd = mus_1_pro_neg.rdd \
                    .map(lambda p: (p.review_id, p.split_re)) \
                    .flatMap(flat_val)

id_list_neg = mus_1_pro_neg_id_rdd.collect()
val_list_neg = mus_1_pro_neg_val_rdd.collect()

neg_input = sc.parallelize(val_list_neg)

### positive
mus_1_pro_pos_id_rdd = mus_1_pro_pos.rdd \
                    .map(lambda p: (p.review_id, p.split_re)) \
                    .flatMap(flat_id)

mus_1_pro_pos_val_rdd = mus_1_pro_pos.rdd \
                    .map(lambda p: (p.review_id, p.split_re)) \
                    .flatMap(flat_val)

id_list_pos = mus_1_pro_pos_id_rdd.collect()
val_list_pos = mus_1_pro_pos_val_rdd.collect()

pos_input = sc.parallelize(val_list_pos)

VBox()

In [23]:
### Sentence embedding

import tensorflow as tf
import tensorflow_hub as hub
def review_embed(rev_text_partition):
    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
    embed = hub.Module(module_url)
    # mapPartition would supply element inside a partition using generator stype
    # this does not fit tensorflow stype
    rev_text_list = [text for text in rev_text_partition]
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        message_embeddings = session.run(embed(rev_text_list))
    return message_embeddings

VBox()

In [24]:
# Cache embedded data
mus_1_pro_neg_embedding = neg_input.mapPartitions(review_embed)

mus_1_pro_pos_embedding = pos_input.mapPartitions(review_embed)

VBox()

In [25]:
#Using numpy to calculate cosine distance
#Cosine distance = 1 - cosine similarity
#Since google universal sentence encoder returns normalized value, 
#the inner product of encodings can be treated as a cosine similarity matrix
#                       x * y
#cosine similarity = -----------
#                     |x| * |y|
#|x| == |y| == 1
import numpy as np

neg_embedding_list = mus_1_pro_neg_embedding.collect()
neg_matrix = np.inner(neg_embedding_list, neg_embedding_list)
pos_embedding_list = mus_1_pro_pos_embedding.collect()
pos_matrix = np.inner(pos_embedding_list, pos_embedding_list)

#Use identity matrix - similarity matrix to calculate distance matrix
i_matrix_neg = np.ones(neg_matrix.shape)
neg_matrix_distance = i_matrix_neg - neg_matrix
i_matrix_pos = np.ones(pos_matrix.shape)
pos_matrix_distance = i_matrix_pos - pos_matrix

#Calculate average distance of one sentence to all other sentences
line_distance_neg = neg_matrix_distance.sum(axis=0)/len(neg_matrix_distance)
line_distance_pos = pos_matrix_distance.sum(axis=0)/len(pos_matrix_distance)

#Find out center sentence in each class
cen_neg = np.argmin(line_distance_neg)
cen_pos = np.argmin(line_distance_pos)

#Find out 10 nearest sentences to center sentence in each class
cen_ten_neg = np.argsort(neg_matrix_distance[cen_neg])
cen_ten_pos = np.argsort(pos_matrix_distance[cen_pos])

#Print output
# print("negative class average distance:")
# print(neg_matrix_distance.mean())
# print("positive class average distance:")
# print(pos_matrix_distance.mean())

#Print output
print("negative center sentence:")
print(id_list_neg[cen_neg],val_list_neg[cen_neg])
print("negative nearest 10 sentences:")
for x in cen_ten_neg[1:11]:
  print(id_list_neg[x],val_list_neg[x].strip())

print("positive center sentence:")
print(id_list_neg[cen_neg],val_list_pos[cen_pos])
print("positive nearest 10 sentences:")
for x in cen_ten_pos[1:11]:
  print(id_list_pos[x],val_list_pos[x].strip())


VBox()

negative center sentence:
R2L4PZC7CHGQ4R  This album is much too insipid for people who like to listen to real music
negative nearest 10 sentences:
R36BAXRVCR2PFB I bet most people who like this album don't even like real punk
R2UBEJ5JX4PKX1 Almost everything that is wrong with mainstream music today, is represented in this album
R195DYX83KWYXU Simply put, this album is mediocre at best
R3QG1GKJO8IL4K This album has in no way let down that genre
R1DP63VJ4NXY44 This album absolutely sucks
R1DP63VJ4NXY44 This album is full of teeny-bopper pop songs disguised as rock
R1CA8OIEZ0B22Y I do like Blink 182 but most of this type of this music is junk
RKN17VFTZQ69P This album is horrid
RATB9UCW9ZV0B This album is a waste of time and money
RKEOBZVEWY7RW those sad people saying this is 'the greatest album in years' need to listen to some proper music, not watch Mtv all day
positive center sentence:
R2L4PZC7CHGQ4R  just an awesome song
positive nearest 10 sentences:
R3OSCPFNN3SAAS AWESOME SONG
R2EK

## Stage 4

In [26]:
##convert to df to process
from pyspark.sql import Row
row = Row("val") # Or some other column name
neg_input_df = neg_input.map(row).toDF()
pos_input_df = pos_input.map(row).toDF()

VBox()

In [27]:
##define a function remove punctuation and tab and split token 
import re
from pyspark.sql import Row
def normalize_name(name):
    name = re.sub(r"[.,\/#!$%\^\*;:{}=\_`~()@]", ' ', name)
    name = re.sub(r'\s+', ' ', name).strip()
    name = [tok for tok in re.split(" ",name) if tok]
    return name

VBox()

In [28]:
##use udf to preprocessing the review.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,StringType
normal = udf(normalize_name,ArrayType(StringType()))
neg_input_df_token = neg_input_df.withColumn("vec",normal(neg_input_df['val'])).drop("val")
neg_input_df_token.show()
pos_input_df_token = pos_input_df.withColumn("vec",normal(pos_input_df['val'])).drop("val")
pos_input_df_token.show()
# from pyspark.ml.feature import StopWordsRemover
# swr = StopWordsRemover(inputCol = 'vec', outputCol = 'data')

# neg_input_df_token_swr = swr.transform(neg_input_df_token).drop("vec")
# neg_input_df_token_swr.show(3)

VBox()

+--------------------+
|                 vec|
+--------------------+
|               [yep]|
|[I, guess, the, l...|
|[the, happiest, d...|
|[this, group, is,...|
|[its, another, bo...|
|[its, all, an, im...|
|[I'm, not, sure, ...|
|[oh, hell, they'r...|
|[Stop, infesting,...|
|[<br, >, <br, >Lo...|
|[realise, there, ...|
|[I, HATE, this, a...|
|[Whoever, said, t...|
|[The, songs, are,...|
|[Don't, waste, yo...|
|[<BR>I'd, have, g...|
|[This, album, abs...|
|[Whoever, said, t...|
|[The, songs, on, ...|
|[This, album, is,...|
+--------------------+
only showing top 20 rows

+--------------------+
|                 vec|
+--------------------+
|[The, Young, And,...|
|[A, New, Beginnin...|
|[The, Anthem, -, ...|
|[Lifestyles, of, ...|
|[Wondering, -, 2,...|
|[The, Story, of, ...|
|[Girls, &, Boys, ...|
|[My, Bloody, Vale...|
|[Hold, On, -, 5, ...|
|[Riot, Girl, -, 5...|
|[Say, Anything, -...|
|[The, Day, that, ...|
|[The, Young, and,...|
|[Emotionless, -, ...|
|  [Movin, On, -4, 5]|
|[I, rea

In [29]:
##add index to all review_split
##convert df to rdd to tfidf and cos-sim easily
neg_list_id = [i for i in range(len(id_list_neg))]
pos_list_id = [i for i in range(len(id_list_pos))]

neg_data_array = [row['vec'] for row in neg_input_df_token.collect()]
pos_data_array = [row['vec'] for row in pos_input_df_token.collect()]

neg_data_rdd = sc.parallelize(neg_data_array)
pos_data_rdd = sc.parallelize(pos_data_array)

neg_ind_rdd = sc.parallelize(neg_list_id)
pos_ind_rdd = sc.parallelize(pos_list_id)
##using HashingTF, hashing 100000 dimension vector, reflect all word on it.
from pyspark.mllib.feature import HashingTF, IDF
hashingTF = HashingTF(100000)
tf_neg = hashingTF.transform(neg_data_rdd)
tf_pos = hashingTF.transform(pos_data_rdd)
tf_neg.cache()
tf_pos.cache()
###compute tfidf and convert all sentence to vecotor
idf_neg = IDF().fit(tf_neg)
tfidf_neg = idf_neg.transform(tf_neg)
idf_pos = IDF().fit(tf_pos)
tfidf_pos = idf_pos.transform(tf_pos)

### all vector from tfidf normalized and compute cos similarity
from pyspark.mllib.feature import Normalizer
normalizer = Normalizer()
data_neg = neg_ind_rdd.zip(normalizer.transform(tfidf_neg))
data_pos = pos_ind_rdd.zip(normalizer.transform(tfidf_pos))
##using cartesian product get all comination with each vector point and compute dot with them separately.
neg_data_cos = data_neg.cartesian(data_neg)\
                .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
                .cache()

pos_data_cos = data_pos.cartesian(data_pos)\
                .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
                .cache()
##convert to df and rename column
neg_data_cos_rdd = neg_data_cos.map(lambda j: (j[0][0], j[0][1], j[1])).map(lambda j : (int(j[0]),int(j[1]), float(j[2])))
test_neg_df = neg_data_cos_rdd.toDF()
neg_cos_mar = test_neg_df.selectExpr("_1 as i", "_2 as j" ,"_3 as cos")


pos_data_cos_rdd = pos_data_cos.map(lambda j: (j[0][0], j[0][1], j[1])).map(lambda j : (int(j[0]),int(j[1]), float(j[2])))
test_pos_df = pos_data_cos_rdd.toDF()
pos_cos_mar = test_pos_df.selectExpr("_1 as i", "_2 as j" ,"_3 as cos")



VBox()

In [30]:
###compute 1-cos and get the center sentence
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType
def red_1(val):
    return 1-val

reduce_1 = udf(red_1, DoubleType())
neg_cos_df_cos_1 = neg_cos_mar.withColumn("1-cos", reduce_1(neg_cos_mar.cos).alias("cos"))\
                .drop("cos")

pos_cos_df_cos_1 = pos_cos_mar.withColumn("1-cos", reduce_1(pos_cos_mar.cos).alias("cos"))\
                .drop("cos")

neg_cos_df_cos_1.groupBy('i').sum('1-cos').orderBy("sum(1-cos)").show(10)
pos_cos_df_cos_1.groupBy('i').sum('1-cos').orderBy("sum(1-cos)").show(10)
print("negative reviews center ", id_list_neg[1954], val_list_neg[1954])
print("---------------")
print("positive reviews center ", id_list_pos[4909], val_list_pos[4909])
print("-----------------------")


VBox()

+----+------------------+
|   i|        sum(1-cos)|
+----+------------------+
|1954| 3003.172262305627|
|2018|3006.2644375244627|
|1361|3012.1878366297365|
|2962|3013.6289608350694|
|2726|3016.7240764834323|
|1503| 3017.455062704513|
|1084| 3019.195963646621|
|3051| 3020.394893524665|
|1545| 3022.601971291658|
|1826|3023.2426404057132|
+----+------------------+
only showing top 10 rows

+----+-----------------+
|   i|       sum(1-cos)|
+----+-----------------+
| 823|4909.972432796806|
|3062|4922.339457731504|
|3850|4928.213983743272|
| 291|4931.992705301235|
| 968|4933.544159004289|
|4040|4937.001879685455|
|4927|4937.547664683688|
|4484|4939.725924378351|
|4159|4941.830710425537|
|3834|4941.884629920205|
+----+-----------------+
only showing top 10 rows

negative reviews center  RV9TLI1TKAOYP  I play music and I am rating Good Charlotte on their music which is just plain horrible, I mean they have five people in their band, the lyrics are terrible but catchy, and if you are in high sc

In [31]:
##show top 10 sentence arount the neg/pos center 
print("negative reviews center ", id_list_neg[1954], val_list_neg[1954])
print("---------------")
neg_stage4_list = neg_cos_df_cos_1.filter("i==1954").orderBy("1-cos").select("j").head(11)
neg_stage4_list_10 = [int(row.j) for row in neg_stage4_list[1:11]]
print("top 10 sentence of negative center")
for k in neg_stage4_list_10:
    print(id_list_neg[k], val_list_neg[k])
print("---------------------")    
    
    
print("positive reviews center ", id_list_pos[4909], val_list_pos[4909])
print("-----------------------")
pos_stage4_list = pos_cos_df_cos_1.filter("i==4909").orderBy("1-cos").select("j").head(11)
pos_stage4_list_10 = [int(row.j) for row in pos_stage4_list[1:11]]
print("top 10 sentence of postive center")
for k in pos_stage4_list_10:
    print(id_list_pos[k], val_list_pos[k])

VBox()

negative reviews center  RV9TLI1TKAOYP  I play music and I am rating Good Charlotte on their music which is just plain horrible, I mean they have five people in their band, the lyrics are terrible but catchy, and if you are in high school this should be out of your CD player, if your in middle school then it is understandable
---------------
top 10 sentence of negative center
R2VP9WX4XP2YSB   Just go out and buy their new cd and be happy that you can now \\"fit in\\" with the punk rock kiddies in your high school
R317FWCY8Q6I4V   <br /> <br />Also, when you look at how unfortunate so many people in this country and this world are, you might see that you really don't have it all that bad in high school
R1P6AETM3HJIDO Good Charlotte used to be a good band but they sold out and now they are horrible
R3SW6OMLQW51ZJ  Their music is much more pop-like, their songs aren't about how they got picked on in high school and hating the &quot;in-crowd&quot; anymore
RV9TLI1TKAOYP  They have made the 