In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
from pyspark.sql import SparkSession

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1558397948671_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
#New SparkSession
#can not overwrite the configuration in cluster
spark = SparkSession \
    .builder \
    .appName("Spark Text Encoder example") \
    .getOrCreate()

VBox()

In [3]:
rev_data = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
revs = spark.read.csv(rev_data,header=True,sep='\t')

VBox()

# Stage3 - 5.1 Postive vs. Negative Reviews
## 1. Remove HTML tag

In [4]:
# select one of the top10 product
productRecords = revs.filter(revs.product_id == 'B004EBT5CU')

VBox()

In [5]:
# select the column that matters 
refinedProRec = productRecords.select("review_id", "review_body", "star_rating")

VBox()

In [6]:
# remove the None review_body
rNoneContent = refinedProRec.filter(refinedProRec.review_body != '')

VBox()

In [7]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import lit
from pyspark.sql import functions as F

def filterout_htmltag(x):
    tag_pattern = re.compile('<.*?>')
    cleantag = re.sub(tag_pattern, ' ', x)
    return cleantag

clean_htmltag = udf(lambda x: filterout_htmltag(x), StringType())

removeHTag = rNoneContent.withColumn("review_body", lit(clean_htmltag(rNoneContent['review_body'])))

VBox()

## 2. Split review_body

**In the next cell, I use regular expression to split review_body into sentences and use the udf(user defined function) to apply it to the dataframe**

In [11]:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

pattern = re.compile(r'[\.\?\!]+ ')
def segmentReview(x):
    contentList = re.split(pattern, x)
    return contentList

contentOfSen = udf(lambda x: segmentReview(x), ArrayType(StringType()))#set type is important!!!!!!!!!!

VBox()

In [13]:
#filter the star_rating greate than 4 and add a new column "sentences" to the dataframe
posClass = removeHTag.filter(removeHTag.star_rating >= 4).withColumn("sentences", contentOfSen(removeHTag.review_body)).cache()

VBox()

In [14]:
#explode the "sentence" column into the different rows
from pyspark.sql.functions import split, explode
senPosClass = posClass.withColumn('sentence', explode(posClass.sentences))

VBox()

In [15]:
#filter the star_rating less than 2 and add a new column "sentences" to the dataframe
negClass = removeHTag.filter(removeHTag.star_rating <= 2).withColumn("sentences", contentOfSen(removeHTag.review_body)).cache()

VBox()

In [16]:
#explode the "sentence" column into the different rows
senNegClass = negClass.withColumn('sentence', explode(negClass.sentences))

VBox()

## 3. Google Pre-trained universal sentence encoder
embedded with Google Pre-trained universal sen- tence encoder. The result is a 512 dimension vector.
### Transfer DataFrame to RDD

In [17]:
#row[5] is the sentence column
posClassRdd = senPosClass.rdd.map(lambda row: str(row[4]))
negClassRdd = senNegClass.rdd.map(lambda row: str(row[4]))

VBox()

### Embed Universal-sentence-encoder

In [18]:
# embed function using Google pre-defined universal-sentence-encoder transfer sentence into Vectors
def senEmbed(sen):
    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
    embed = hub.Module(module_url)
    # mapPartition would supply element inside a partition using generator stype
    # this does not fit tensorflow stype
    # sen : a iterable object contains the elements in the rev_text_partition
    # second loop is for the each sentence in the list
    # create a list for all sentence
    rev_text_list = [text for text in sen]
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        message_embeddings = session.run(embed(rev_text_list))
    return message_embeddings

VBox()

In [19]:
#transfer the sentence of positive class & negitive class into vector
posSentenceEmbedding = posClassRdd.mapPartitions(senEmbed).cache()
negSentenceEmbedding = negClassRdd.mapPartitions(senEmbed).cache()

VBox()

# Stage3 - 5.2 Intra-Class Similarity
We want to find out if sentences in the same category are closely related with each other. The closeness is measured by average distance between points in the class. In our case, point refers to the sentence encoding and pair-wise distance is measured by Cosine distance. Cosine distance is computed as “1 − CosineSimilarity”. It has a value between 0 and 2.

In [22]:
# calculate the cosine similarity
def calCosine(vector1, vector2):
    result = np.dot(vector1,vector2)
    norm = np.linalg.norm(vector1)*np.linalg.norm(vector2)
    cos = result/norm
    return (1-cos)

VBox()

In [23]:
# calculate the average cosine similarity
def calAverage(list):
    list2 = []
    for x in list:
        list1 = []
        for y in list:
            if ((x == y).all()) == False:
                list1.append(calCosine(x, y))
        average = sum(list1)/len(list1)
        list2.append(average)
    return list2

VBox()

## positive class

In [24]:
embedLenPos = posSentenceEmbedding.count()

VBox()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 45446)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authenticate_and_accum_updates
    received_to

In [31]:
#transfer RDD to a list of Vecters
posSentenceEmbeddingVectors = posSentenceEmbedding.take(embedLenPos)

VBox()

In [None]:
#calculate the average dictance of every point
averListPos = calAverage(posSentenceEmbeddingVectors)

VBox()

### find back records
* using monotonically_increasing_id() to generate consecutive sentenceID, which is used to find back the record related to the point.
* Create a list contains sentenceID 
* Create a dictionary, key: average distance for each point, value: sentenceID

In [33]:
from pyspark.sql.functions import monotonically_increasing_id
senIDPos = senPosClass.withColumn("sentenceID", monotonically_increasing_id())
senIDListPos = [x for x in range(senIDPos.count())]
dicPos = dict(map(lambda x,y:[x,y],averListPos, senIDListPos))

VBox()

In [34]:
minAveragePos = min(averListPos)
minSenIDPos = dicPos[minAveragePos]
print(minSenIDPos)

VBox()

3240

In [35]:
#output the center
outputListPos = senIDPos.filter(senIDPos.sentenceID == minSenIDPos).select("review_id", "sentence").collect()

VBox()

**Find the Top10 Neighbor:**
1. vRDD - the rdd contains vectors
2. sID - the sentenceID of center
3. length - the length of RDD

In [36]:
#recalculate the cosine distance and put every distance into a dictionary as the key. The value is the sentenceID
def findTen(vRDD, sID, length):
    vList = vRDD.take(length) # list of vector
    dList = [] #list of cosine distance
    dic = {} 
    i = 0 # dictionary value
    tList =[] # top10 list
    for x in vList:
        if ((x == vList[sID]).all()) == False:
            cosine = calCosine(x, vList[sID])
            dList.append(cosine)
            dic[cosine] = i
            i += 1
    dList.sort() # sort the distance list
    for cos in dList[:10]:
        tList.append(dic[cos])
    return tList

VBox()

In [37]:
listIDPos = findTen(posSentenceEmbedding, minSenIDPos, embedLenPos)

VBox()

In [39]:
for i in listIDPos:
    tempListPos = []
    tempListPos = senIDPos.filter(senIDPos.sentenceID == i).select("review_id", "sentence").collect()
    outputListPos.append(tempListPos[0])

VBox()

In [40]:
count = 0
for i in outputListPos:
    if count == 0:
        print('Center Sentence:')
        print('review_id: ' + i.review_id)
        print('content of sentence: ' + i.sentence)
        print('----------------------------------------------------------------------------')
        print('Top 10  Neighbor:')
        count += 1
    else:
        print('review_id: ' + i.review_id)
        print('content of sentence: ' + i.sentence)
        print()

VBox()

Center Sentence:
review_id: R3SBLDHLGWU3CQ
content of sentence: I love Adele and this CD is fantastic
----------------------------------------------------------------------------
Top 10  Neighbor:
review_id: R3LNZ080XD5U72
content of sentence: I love Adele and this CD is just wonderful

review_id: R1A4J1EM06YPN2
content of sentence:  For fun, watch the SNL skit with Emma Stone as Adele's song is the topic.

review_id: R1ZCPBTGSZGCOB
content of sentence: Just buy it !

review_id: R3KAX2RX0I8QC4
content of sentence: I love this cd and I love all of the music on it if you like Adele get this CD.!!!!

review_id: R3QSLNGLJ04CTH
content of sentence: Great Singer-singing w/ thought provoking lyrics.

review_id: R7PWW0ZRAZ4EN
content of sentence: I love this CD because I think Adele is perfect

review_id: RTDKHB1K5H1NB
content of sentence: Love this CD from Adele - ALL the songs are great

review_id: RMIJKZJ4ZAYKB
content of sentence: I love Adele and was not disappointed with this CD

review_

## negitive class

In [41]:
embedLenNeg = negSentenceEmbedding.count()
negSentenceEmbeddingList = negSentenceEmbedding.take(embedLenNeg)
averListNeg = calAverage(negSentenceEmbeddingList)

VBox()

In [42]:
print(embedLenNeg)

VBox()

455

In [43]:
senIDNeg = senNegClass.withColumn("sentenceID", monotonically_increasing_id())
senIDListNeg = [x for x in range(senIDNeg.count())]
dicNeg = dict(map(lambda x,y:[x,y],averListNeg, senIDListNeg))
minAverageNeg = min(averListNeg)
minSenIDNeg = dicNeg[minAverageNeg]

VBox()

In [44]:
outputListNeg = senIDNeg.filter(senIDNeg.sentenceID == minSenIDNeg).select("review_id", "sentence").collect()

VBox()

In [45]:
listIDNeg = findTen(negSentenceEmbedding, minSenIDNeg, embedLenNeg)

VBox()

In [46]:
for i in listIDNeg:
    tempListPos = []
    tempListPos = senIDNeg.filter(senIDNeg.sentenceID == i).select("review_id", "sentence").collect()
    outputListNeg.append(tempListPos[0])

VBox()

In [47]:
count = 0
for i in outputListNeg:
    if count == 0:
        print('Center Sentence:')
        print('review_id: ' + i.review_id)
        print('content of sentence: ' + i.sentence)
        print('----------------------------------------------------------------------------')
        print('Top 10  Neighbor:')
        count += 1
    else:
        print('review_id: ' + i.review_id)
        print('content of sentence: ' + i.sentence)
        print()

VBox()

Center Sentence:
review_id: R14GO9494QD0UP
content of sentence: Adele's music has it all
----------------------------------------------------------------------------
Top 10  Neighbor:
review_id: R13TD1FE6KGHTY
content of sentence: Adele has the pipes, but the music is VERY derivative

review_id: RAV33939RLHCF
content of sentence:  I am a frequent Amazon shopper and this is the first time I have had any issues, but I am bummed that I have to give this as a gift.

review_id: R26FGK6YKP8C9H
content of sentence:  For those of us who remember the truly great performers, Adele is really average

review_id: R17HRZ4TJ0A8BX
content of sentence: PLEASE LISTEN TO THE BRITISH SINGER \\"RUMER\\" TO UNDERSTAND THE DIFFERENCE BETWEEN SINGING AND SHOUTING

review_id: R31NJOJB47HYS4
content of sentence: I must be in the minority-- I couldn't get to this album, or the artist either

review_id: R3BYX4UBOFLKQO
content of sentence: You want good music

review_id: RCYY788RCLY3E
content of sentence: Sorry if