In [1]:
import numpy as np
import nltk
import tensorflow as tf
import tensorflow_hub as hub
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,mean
import pyspark.sql.functions as f
from pyspark.ml.feature import PCA,VectorAssembler
from pyspark.ml.linalg import Vectors,VectorUDT,DenseVector
from pyspark.sql.types import DoubleType,ArrayType,IntegerType,StringType

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1558344694965_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Build (or reuse) the Spark session and request the executor resources used
# by the embedding / distance jobs below.
# The number of executors is controlled by 'spark.executor.instances';
# the previous key 'spark.num.executor' is not a Spark property, so the
# requested executor count was never applied.
# NOTE(review): if a session already exists (the notebook reports one as
# 'spark'), getOrCreate() returns it - confirm these settings are also set
# in the session/cluster configuration.
spark = (
    SparkSession.builder
    .appName("Spark Text Encoder example")
    .config('spark.executor.memory', '6g')
    .config('spark.executor.cores', '3')
    .config('spark.executor.instances', '20')
    .getOrCreate()
)

VBox()

In [10]:
# Location of the gzipped, tab-separated review file (first row is the header)
rev_data = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
revs = spark.read.csv(path=rev_data, sep='\t', header=True)

VBox()

In [12]:
# UDF that splits a paragraph into its list of sentences, using the NLTK
# English Punkt tokenizer loaded from 'tokenizers/punkt/english.pickle'.
tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')


def sentences(paragraph):
    '''Return the sentences of `paragraph` as produced by `tokenizer`.'''
    return tokenizer.tokenize(paragraph)


udf_sentences_list = udf(sentences, ArrayType(StringType()))

VBox()

In [13]:
# Keep the reviews of a single product (taken from the top 10 products list)
# that have a non-null body, then derive for each review:
#   stars     - the star rating cast to an integer
#   sentences - the review body split into a list of sentences
top_reviews = (
    revs
    .where((revs.product_id == 'B00006J6VG') & (revs.review_body.isNotNull()))
    .withColumn("stars", revs.star_rating.cast(IntegerType()))
    .select('review_body', 'stars', 'review_id')
    .withColumn("sentences", udf_sentences_list(revs.review_body))
    .drop('review_body')
)

VBox()

In [4]:
def review_embed(rev_partition):
    '''
    Embed the sentences of one RDD partition into 512-d vectors with the
    Universal Sentence Encoder (TensorFlow Hub module, graph/session API).

    rev_partition: iterable of [review_id, sentence] pairs.
    returns: list of (review_id, embedding, sentence) tuples in input order,
             where embedding is the encoder output converted with .tolist();
             an empty list when the partition holds no sentence.
    '''
    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2" #@param ["https://tfhub.dev/google/universal-sentence-encoder/2", "https://tfhub.dev/google/universal-sentence-encoder-large/3"]
    ids = []
    sen = []
    for item in rev_partition:
        ids.append(item[0])
        sen.append(item[1])
    # Empty partition: return before touching TensorFlow, so the module is not
    # loaded for nothing and the encoder is never fed an empty batch.
    if not sen:
        return []
    # Build the model inside a private graph: the function runs once per
    # partition, and adding the module to the process-wide default graph on
    # every call would keep growing that graph.
    with tf.Graph().as_default():
        embed = hub.Module(module_url)
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            embedding_sen = session.run(embed(sen))
    return list(zip(ids, embedding_sen.tolist(), sen))

VBox()

In [5]:
def _embedded_sentences(star_filter):
    '''
    Build the cached RDD of embedded sentences for the reviews of
    `top_reviews` that satisfy the SQL condition `star_filter`.

    Each review is expanded into one [review_id, sentence] pair per sentence
    and every partition is passed through `review_embed`.
    '''
    return (
        top_reviews.filter(star_filter)
        .drop('stars')
        .rdd.map(lambda row: [row.review_id, row.sentences])
        .flatMap(lambda pair: [[pair[0], sentence] for sentence in pair[1]])
        .mapPartitions(review_embed)
        .cache()
    )


# positive class: stars >= 4, negative class: stars <= 2
positive_class = _embedded_sentences("stars >= 4")
negative_class = _embedded_sentences("stars <= 2")

VBox()

In [7]:
# Turn both embedded RDDs into cached DataFrames and tag every sentence with
# an 'index' column. monotonically_increasing_id() yields unique, increasing
# ids; Spark does not guarantee that they are consecutive.
positive_class_df = (
    spark.createDataFrame(positive_class, schema=['review_id', 'array', 'sentence'])
    .withColumn('index', f.monotonically_increasing_id())
    .cache()
)
negative_class_df = (
    spark.createDataFrame(negative_class, schema=['review_id', 'array', 'sentence'])
    .withColumn('index', f.monotonically_increasing_id())
    .cache()
)

VBox()

In [19]:
# Convert the embedding column from a plain list to an ML dense vector
list_to_vector_udf = udf(lambda values: Vectors.dense(values), VectorUDT())


def _as_vectors(class_df):
    '''Keep review_id and expose the 'array' column as a 'vector' column.'''
    return class_df.select(
        class_df['review_id'],
        list_to_vector_udf(class_df['array']).alias("vector"),
    )


positive_class_vectors = _as_vectors(positive_class_df)
negative_class_vectors = _as_vectors(negative_class_df)

VBox()

In [20]:
# Reduce the dimension of the embeddings with PCA.
# Both classes are projected onto 125 components (stated by the author to
# retain 95% of the information).
pca = PCA(k=125, inputCol="vector", outputCol="pca")


def pca_result(vectors):
    '''Fit `pca` on `vectors` and return only the projected 'pca' column.'''
    model = pca.fit(vectors)
    return model.transform(vectors).select('pca')


# the projected frames get their own 'index' column
positive_pca_result = pca_result(positive_class_vectors).withColumn(
    'index', f.monotonically_increasing_id())
negative_pca_result = pca_result(negative_class_vectors).withColumn(
    'index', f.monotonically_increasing_id())
# drop the names that are not needed any more
del positive_class_vectors, negative_class_vectors

VBox()

In [21]:
# Collect the (index, PCA vector) pairs of each class on the driver and
# broadcast them to all nodes.
# The SparkContext is taken from the `spark` session built above instead of
# relying on an implicit `sc` global that this notebook never defines, and
# each name is bound once (to the Broadcast) rather than first to a list.
positive_pca_broadcast = spark.sparkContext.broadcast(
    positive_pca_result.rdd.map(lambda item: (item[1], np.array(item[0]))).collect()
)
negative_pca_broadcast = spark.sparkContext.broadcast(
    negative_pca_result.rdd.map(lambda item: (item[1], np.array(item[0]))).collect()
)

VBox()

In [22]:
#calculating the cosine distance
def _cosine_dis(item, pca_bdt):
    '''
    Cosine distances from one point to every point with a larger index.

    item: (index, vector) pair of the current point.
    pca_bdt: list of (index, vector) pairs of all points of the class.
    returns: list of [(index, other_index), distance] with
             distance = 1 - cosine similarity, as a Python float.

    Each unordered pair is produced once, by the member with the smaller
    index. The partner points are chosen by comparing index values, not by
    using the index as a position in `pca_bdt`: the indices come from
    monotonically_increasing_id(), which is not guaranteed to be consecutive,
    so positional slicing would silently drop pairs.
    '''
    index, vector = item[0], item[1]
    norm = np.linalg.norm(vector)  # loop invariant
    distances = []
    for other_index, other_vector in pca_bdt:
        if other_index <= index:
            continue
        cos = np.dot(vector, other_vector) / (norm * np.linalg.norm(other_vector))
        distances.append([(index, other_index), float(1 - cos)])
    return distances
def positive_cosine_dis(item):
    '''Pairwise cosine distances of `item` within the positive class.'''
    return _cosine_dis(item, positive_pca_broadcast.value)
def negative_cosine_dis(item):
    '''Pairwise cosine distances of `item` within the negative class.'''
    return _cosine_dis(item, negative_pca_broadcast.value)

VBox()

In [23]:
# Pairwise distances of each class: the PCA rows are spread over 120
# partitions, turned into (index, vector) pairs and expanded into
# [(index, other_index), distance] records, which are cached and then
# exposed as a DataFrame with columns 'point' and 'distance'.
positive_distance_rdd = (
    positive_pca_result.rdd
    .repartition(120)
    .map(lambda row: (row[1], np.array(row[0])))
    .flatMap(positive_cosine_dis)
    .cache()
)
positive_distance_df = spark.createDataFrame(
    positive_distance_rdd, schema=['point', 'distance'])

negative_distance_rdd = (
    negative_pca_result.rdd
    .repartition(120)
    .map(lambda row: (row[1], np.array(row[0])))
    .flatMap(negative_cosine_dis)
    .cache()
)
negative_distance_df = spark.createDataFrame(
    negative_distance_rdd, schema=['point', 'distance'])

VBox()

In [24]:
#calculate mean for the distance for each class
# DataFrame.show() only prints and returns None, so the aggregated frames are
# kept to display them and the scalar averages are read separately; before,
# avg_positive_distance / avg_negative_distance were always None.
positive_avg_df = positive_distance_df.select(mean(positive_distance_df.distance))
negative_avg_df = negative_distance_df.select(mean(negative_distance_df.distance))
positive_avg_df.show()
negative_avg_df.show()
avg_positive_distance = positive_avg_df.first()[0]
avg_negative_distance = negative_avg_df.first()[0]

VBox()

+------------------+
|     avg(distance)|
+------------------+
|0.6807390314585274|
+------------------+

+------------------+
|     avg(distance)|
+------------------+
|0.7035416887820123|
+------------------+

In [25]:
#build the index -> distance_list mapping: for each point, every other point it is paired with and the distance
def transform_distance_rdd(df):
    '''
    Regroup the pairwise distances of `df` by point.

    Every ((a, b), distance) row is emitted twice - keyed by `a` with the
    entry (b, distance) and keyed by `b` with the entry (a, distance) - and
    the entries sharing a key are concatenated into a single list.
    Returns the resulting RDD of (index, [(other_index, distance), ...]).
    '''
    def both_directions(row):
        first, second = row[0][0], row[0][1]
        distance = row[1]
        return [[first, (second, distance)], [second, (first, distance)]]

    def as_singleton(entry):
        return [entry]

    def concatenate(left, right):
        return left + right

    return df.rdd.flatMap(both_directions).mapValues(as_singleton).reduceByKey(concatenate)
# one row per point: its index and the list of (other_index, distance) entries
negative_distance_list_df = spark.createDataFrame(
    transform_distance_rdd(negative_distance_df),
    schema=['index', 'distance_list'],
)
positive_distance_list_df = spark.createDataFrame(
    transform_distance_rdd(positive_distance_df),
    schema=['index', 'distance_list'],
)

VBox()

In [26]:
def min_avg_distance(df):
    '''
    Locate the class centre.

    For every row of `df` the mean of the distances stored in its
    distance_list is computed; the index of the row with the smallest mean
    is returned.
    '''
    def mean_of_distances(distance_list):
        # column 1 of each (other_index, distance) entry is the distance
        return float(np.mean(np.array(distance_list)[:, 1], axis=0))

    def smaller_average(left, right):
        # rows are (index, avg_distance); on a tie the right-hand row wins
        if left[1] < right[1]:
            return (left[0], left[1])
        return (right[0], right[1])

    avg_distance_udf = udf(mean_of_distances, DoubleType())
    averaged = df.withColumn('avg_distance', avg_distance_udf(df.distance_list)) \
                 .drop('distance_list')
    return averaged.rdd.reduce(smaller_average)[0]


positive_centre_index = min_avg_distance(positive_distance_list_df)
negative_centre_index = min_avg_distance(negative_distance_list_df)

VBox()

In [27]:
def get_neighbors_index(index,df,class_df,k=10):
    '''
    input: index    - index of the class centre
           df       - DataFrame with columns 'index' and 'distance_list'
           class_df - unused; kept so existing calls keep working
           k        - number of neighbors to return (default 10)
    output: list of the indices of the k nearest neighbors of `index`,
            ordered by increasing distance; fewer than k when the
            distance_list is shorter, and [] when no row matches `index`

    '''
    pairs = df.filter(df.index == index).select('distance_list') \
              .rdd.flatMap(lambda data: [(item[0], item[1]) for item in data.distance_list])
    # takeOrdered returns the k smallest entries directly; the former
    # RDD -> DataFrame -> orderBy/limit/collect round trip needed a schema
    # inference pass, which fails on an empty RDD.
    nearest = pairs.takeOrdered(k, key=lambda pair: pair[1])
    return [neighbor_index for neighbor_index, _ in nearest]
# nearest neighbors of each class centre
positive_neighbors_index = get_neighbors_index(
    index=positive_centre_index,
    df=positive_distance_list_df,
    class_df=positive_class_df,
)
negative_neighbors_index = get_neighbors_index(
    index=negative_centre_index,
    df=negative_distance_list_df,
    class_df=negative_class_df,
)

VBox()

In [28]:
#display the centre sentence of each class, then the sentences of its nearest neighbors
def show_centre_sentence(df,index):
    '''Print review_id and sentence of the row of `df` whose index equals `index`.'''
    centre = df.filter(df['index'] == index)
    centre.select('review_id', 'sentence').show(n=1, truncate=False)
def show_neighbors(df,index_list):
    '''Print review_id and sentence (at most 10 rows) of the rows of `df` whose index is in `index_list`.'''
    neighbors = df.filter(df['index'].isin(index_list))
    neighbors.select('review_id', 'sentence').show(n=10, truncate=False)
show_centre_sentence(positive_class_df, positive_centre_index)
show_centre_sentence(negative_class_df, negative_centre_index)
show_neighbors(positive_class_df, positive_neighbors_index)
show_neighbors(negative_class_df, negative_neighbors_index)

VBox()

+--------------+----------------------+
|review_id     |sentence              |
+--------------+----------------------+
|R32NXZWC3RKH76|Every song is awesome.|
+--------------+----------------------+

+--------------+---------------------------------------------------------------------------+
|review_id     |sentence                                                                   |
+--------------+---------------------------------------------------------------------------+
|R2L4PZC7CHGQ4R|This album is much too insipid for people who like to listen to real music.|
+--------------+---------------------------------------------------------------------------+

+--------------+-----------------------------------------------+
|review_id     |sentence                                       |
+--------------+-----------------------------------------------+
|R2L30H6HKZXUK7|every song is really good.                     |
|R144NZND4C5S5A|Every single song is GREAT!                    |
|R1VR2BO