In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
from pyspark import SparkContext
import numpy as np
import pyspark.sql.functions as F
import tensorflow as tf
import tensorflow_hub as hub
import re
import nltk
from pyspark.sql.functions import desc
from pyspark.sql.types import FloatType
from nltk.stem import PorterStemmer

VBox()

Starting Spark application


KeyboardInterrupt: 

In [None]:
spark = (
    SparkSession.builder
    .appName("Spark Text Encoder Music")
    .getOrCreate()
)
# Amazon US Music reviews: tab-separated file with a header row.
rev_data = "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Music_v1_00.tsv.gz"
revs = spark.read.csv(rev_data, sep='\t', header=True)

In [None]:
# Project the review columns of interest.
organized_data = revs.select(
    'customer_id', 'product_id', 'review_id', 'star_rating', 'review_body'
)

In [None]:
# Number of reviews per product.
product_review_number = organized_data.groupby('product_id').count()

In [None]:
# Products ordered from most- to least-reviewed. The secondary key on
# product_id makes the order deterministic when products tie on count, so
# taking the first row always selects the same product across runs.
sort_by_reviews = product_review_number.sort(desc('count'), 'product_id')

In [None]:
# Single-row DataFrame: the most-reviewed product and its review count.
product_with_most_reviews = sort_by_reviews.limit(num=1)

In [None]:
# Attach every review of the most-reviewed product. An inner join keeps only
# the matching reviews directly, instead of producing null-padded rows for
# every other product (as a right join does) that must then be filtered out.
Top_product = product_with_most_reviews.join(
    organized_data,
    product_with_most_reviews.product_id == organized_data.product_id,
    how='inner',
)

In [None]:
# Keep rows matched to the top product and the columns needed for the
# sentiment split. The left-side product_id is selected explicitly because
# both join inputs carry a product_id column.
Top_product_filter = (
    Top_product
    .where(product_with_most_reviews.product_id.isNotNull())
    .select(product_with_most_reviews.product_id, 'review_body', 'star_rating', 'review_id')
)

In [None]:
# star_rating >= 4 counts as positive, <= 2 as negative; 3 is in neither set.
positive_reviews = Top_product_filter.filter(F.col('star_rating') >= 4).select('review_id', 'review_body')
negative_reviews = Top_product_filter.filter(F.col('star_rating') <= 2).select('review_id', 'review_body')

In [None]:
# NOTE(review): `ps` is not referenced by any visible cell, so no stemming is
# applied to the review text. Remove it, or call ps.stem() if stemming was intended.
ps = PorterStemmer()
# Stopwords dropped from review sentences. Built once as a frozenset for O(1)
# membership tests instead of re-creating the list on every call.
# "br" is presumably there to catch HTML "<br />" tags left in review text.
# NOTE(review): negations ("no", "not", "nor", "don't", ...) are removed too,
# which can hide the sentiment of a sentence.
_STOPWORDS = frozenset([
    "a", "about", "above", "after", "again", "against", "all", "am", "an", "and",
    "any", "are", "aren't", "as", "at", "be", "because", "been", "before", "being",
    "below", "between", "both", "but", "by", "can't", "cannot", "could", "couldn't",
    "did", "didn't", "do", "does", "doesn't", "doing", "don't", "down", "during",
    "each", "few", "for", "from", "further", "had", "hadn't", "has", "hasn't",
    "have", "haven't", "having", "he", "he'd", "he'll", "he's", "her", "here",
    "here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i",
    "i'd", "i'll", "i'm", "i've", "if", "in", "into", "is", "isn't", "it", "it's",
    "its", "itself", "let's", "me", "more", "most", "mustn't", "my", "myself", "no",
    "nor", "not", "of", "off", "on", "once", "only", "or", "other", "ought", "our",
    "ours", "out", "over", "own", "same", "shan't", "she", "she'd", "she'll",
    "she's", "should", "shouldn't", "so", "some", "such", "than", "that", "that's",
    "the", "their", "theirs", "them", "themselves", "then", "there", "there's",
    "these", "they", "they'd", "they'll", "they're", "they've", "this", "those",
    "through", "to", "too", "under", "until", "up", "very", "was", "wasn't", "we",
    "we'd", "we'll", "we're", "we've", "were", "weren't", "what", "what's", "when",
    "when's", "where", "where's", "which", "while", "who", "who's", "whom", "why",
    "why's", "with", "won't", "would", "wouldn't", "you", "you'd", "you'll",
    "you're", "you've", "your", "yours", "yourself", "yourselves", "br",
])


def stopword(line):
    """Remove stopwords and non-alphanumeric characters from one sentence.

    The sentence is split into whitespace-separated words. A word is dropped
    if its lowercase form is a stopword, either as written or after stripping
    punctuation. Every kept word has all characters outside ``[A-Za-z0-9]``
    removed, and words that become empty are skipped.

    :param line: sentence text.
    :return: the surviving words joined by single spaces ('' if none survive).
    """
    kept = []
    # Split into words; iterating the string directly would yield characters.
    for word in line.split():
        # Check the raw word first so contractions like "don't" match the list.
        if word.lower() in _STOPWORDS:
            continue
        cleaned = re.sub('[^A-Za-z0-9]+', '', word)
        # Re-check after stripping punctuation: "it," -> "it", "<br" -> "br".
        if cleaned and cleaned.lower() not in _STOPWORDS:
            kept.append(cleaned)
    return ' '.join(kept)

In [None]:
def filterlines(line):
    """Split a review into sentences and clean each one with ``stopword``.

    The text is split on '?', '.' and '!'. Each fragment is stripped of
    surrounding whitespace and passed through ``stopword``. Fragments that end
    up empty are discarded.

    :param line: review text; may be None (a null review_body value), in
        which case no sentences are returned.
    :return: list of cleaned, non-empty sentences.
    """
    # Without this guard, str(None) would turn a missing review into "None".
    if line is None:
        return []
    sentences = []
    for sentence in re.split('[?.!]', str(line)):
        cleaned = stopword(sentence.strip())
        if cleaned != '':
            sentences.append(cleaned)
    return sentences

In [None]:
# From here on, work on the underlying RDDs of (review_id, review_body) Rows.
positive_reviews = positive_reviews.rdd
negative_reviews = negative_reviews.rdd

In [None]:
# (review_id, [cleaned sentences]) for every review.
positive_reviews_list = positive_reviews.mapValues(filterlines)
negative_reviews_list = negative_reviews.mapValues(filterlines)

In [None]:
def flatlist(listitem):
    """Concatenate an iterable of iterables into one flat list, keeping order.

    :param listitem: outer iterable whose elements are themselves iterable.
    :return: a new list holding every inner element in order.
    """
    flattened = []
    for inner in listitem:
        flattened.extend(inner)
    return flattened

In [None]:
# Collect every sentence list that shares a review_id into a list of lists.
positive_reviews_list = positive_reviews_list.groupByKey().mapValues(list)
negative_reviews_list = negative_reviews_list.groupByKey().mapValues(list)

In [None]:
# Flatten the grouped lists into one sentence list per review_id.
positive_reviews_list = positive_reviews_list.mapValues(flatlist)
negative_reviews_list = negative_reviews_list.mapValues(flatlist)

In [None]:
# Back to DataFrames: one row per review with its cleaned sentence list.
final_positive_df = spark.createDataFrame(positive_reviews_list, schema=['review_id', 'review_body'])
final_negative_df = spark.createDataFrame(negative_reviews_list, schema=['review_id', 'review_body'])

In [None]:
# (review_id, review text) pairs for the sentence encoder.
# Rows whose sentence list is null or empty are dropped *before* converting
# to text; a None check after str() could never match.
# The cleaned sentences are joined with ". " so the encoder sees plain text
# rather than a Python list repr such as "['great album', 'love it']".
rev_pos_text_rdd = (
    final_positive_df.rdd
    .filter(lambda row: row[1])
    .map(lambda row: (str(row[0]), '. '.join(row[1])))
    .cache()
)
rev_neg_text_rdd = (
    final_negative_df.rdd
    .filter(lambda row: row[1])
    .map(lambda row: (str(row[0]), '. '.join(row[1])))
    .cache()
)

In [None]:
def review_embed(rev_text_partition):
    """Embed every review of one RDD partition with the Universal Sentence Encoder.

    Meant for ``RDD.mapPartitions``. The partition arrives as an iterator, so
    it is first collected into parallel id/text lists. The texts are then
    embedded in a single TensorFlow session run.

    :param rev_text_partition: iterable of ``(review_id, review_text)`` pairs.
    :return: list of ``(review_id, review_text, embedding)`` tuples, where
        ``embedding`` is a list of floats; ``[]`` for an empty partition.
    """
    rev_id_list = []
    rev_text_list = []
    for text in rev_text_partition:
        rev_id_list.append(text[0])
        rev_text_list.append(text[1])

    # Skip empty partitions before loading the TF-Hub module and running an
    # empty batch through it.
    if not rev_text_list:
        return []

    # Alternative: "https://tfhub.dev/google/universal-sentence-encoder-large/3"
    module_url = "https://tfhub.dev/google/universal-sentence-encoder/2"
    # Build the module in a fresh graph. Spark may reuse a Python worker for
    # several partitions, and adding a new hub.Module to the shared default
    # graph on every call would keep growing that graph.
    with tf.Graph().as_default():
        embed = hub.Module(module_url)
        embeddings_op = embed(rev_text_list)
        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            message_embeddings = session.run(embeddings_op)

    result = []
    for i, me in enumerate(np.array(message_embeddings).tolist()):
        print("Message: {}".format(rev_text_list[i]))
        print("Review_id: {}".format(rev_id_list[i]))
        print("Embedding size: {}".format(len(me)))
        result.append((rev_id_list[i], rev_text_list[i], me))

    return result

In [None]:
# Encode each partition of review texts; cached because the result is reused
# when building the embedding DataFrames.
pos_review_embedding = rev_pos_text_rdd.mapPartitions(f=review_embed).cache()
neg_review_embedding = rev_neg_text_rdd.mapPartitions(f=review_embed).cache()

In [None]:
pos_review_embedding_df = spark.createDataFrame(
    pos_review_embedding,
    schema=['review_id', 'review_body', 'review_vector'],
)

In [None]:
neg_review_embedding_df = spark.createDataFrame(
    neg_review_embedding,
    schema=['review_id', 'review_body', 'review_vector'],
)

In [None]:
import numpy as np

def cos_sim(vector_a, vector_b):
    """Cosine *distance* between two vectors, rounded to 4 decimal places.

    Despite the name, this returns ``1 - cos(theta)``, not the cosine
    similarity itself. The result is 0 for vectors pointing the same way, 1
    for orthogonal vectors and 2 for opposite ones.

    :param vector_a: vector a (a sequence of numbers).
    :param vector_b: vector b, same length as ``vector_a``.
    :return: ``round(1 - cos, 4)`` as a Python float, or NaN if either vector
        has zero norm (the cosine is undefined).
    """
    # Plain 1-D arrays instead of the deprecated np.mat / np.matrix.
    a = np.asarray(vector_a, dtype=float).ravel()
    b = np.asarray(vector_b, dtype=float).ravel()
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        # Explicit NaN instead of a 0/0 RuntimeWarning.
        return float('nan')
    cos = float(np.dot(a, b)) / denom
    return round(float(1 - cos), 4)

In [None]:
import itertools;

def cartesian_product(a, b):
    """Return an iterator over every ``(x, y)`` pair with x from ``a`` and y from ``b``.

    NOTE(review): not called by any visible cell; the pairwise step uses
    ``RDD.cartesian`` instead.
    """
    pairs = itertools.product(a, b)
    return pairs

# RDD of (review_id, review_body, review_vector) Rows for positive reviews.
pos_review_embedding_rdd = pos_review_embedding_df.rdd

In [None]:
# RDD of (review_id, review_body, review_vector) Rows for negative reviews.
neg_review_embedding_rdd = neg_review_embedding_df.rdd

In [None]:
# Every ordered pair of positive-review rows, including self-pairs.
num_list_pos = pos_review_embedding_rdd.cartesian(
    pos_review_embedding_rdd
)

In [None]:
# Every ordered pair of negative-review rows, including self-pairs.
num_list_neg = neg_review_embedding_rdd.cartesian(
    neg_review_embedding_rdd
)

In [None]:
# Cosine distances between distinct positive reviews.
# A pair is kept only when the left review_id sorts before the right one.
# This drops self-pairs and the mirrored duplicate of every pair, halving the
# work; cos_sim is symmetric, so the mean is unchanged.
# Filtering on ids rather than on a zero distance also keeps pairs of
# distinct reviews that happen to have identical embeddings.
num_list_pos2 = (
    num_list_pos
    .filter(lambda car: car[0][0] < car[1][0])
    .map(lambda car: cos_sim(car[0][2], car[1][2]))
)

In [None]:
# Cosine distances between distinct negative reviews.
# A pair is kept only when the left review_id sorts before the right one.
# This drops self-pairs and the mirrored duplicate of every pair, halving the
# work; cos_sim is symmetric, so the mean is unchanged.
# Filtering on ids rather than on a zero distance also keeps pairs of
# distinct reviews that happen to have identical embeddings.
num_list_neg2 = (
    num_list_neg
    .filter(lambda car: car[0][0] < car[1][0])
    .map(lambda car: cos_sim(car[0][2], car[1][2]))
)

In [None]:
# Single-column ("value") DataFrame of positive-pair distances.
num_list_pos3 = spark.createDataFrame(num_list_pos2, schema=FloatType())

In [None]:
# Single-column ("value") DataFrame of negative-pair distances.
num_list_neg3 = spark.createDataFrame(num_list_neg2, schema=FloatType())

In [None]:
from pyspark.sql.functions import col, avg
# Mean pairwise cosine distance among positive reviews.
pos_mean_cosdis = num_list_pos3.agg(avg('value'))

In [None]:
# Mean pairwise cosine distance among negative reviews.
neg_mean_cosdis = num_list_neg3.agg(avg('value'))

In [None]:
pos_mean_cosdis.head(1)

In [None]:
neg_mean_cosdis.head(1)