In [1]:
from pyspark.sql.functions import lit

# Load the three product categories (Video Games, Books, Home and Kitchen) and tag every
# row with a `category` column so the sources remain distinguishable after combining.

df1 = spark.sql("select * from default.video_games_5")
df1 = df1.withColumn("category", lit("video_games"))

df2 = spark.sql("select * from default.books_5_small")
df2 = df2.withColumn("category", lit("books"))

df3 = spark.sql("select * from default.home_and_kitchen_5_small")
df3 = df3.withColumn("category", lit("home_kitchen"))

# Combine the three data sets into one data set for data cleaning.
# unionByName matches columns by name. union() matches by position and would silently
# misalign values if the source tables declare their columns in a different order.
df = df1.unionByName(df2).unionByName(df3)

# There are 3,487,331 observations and 12 original features and 1 added feature
print((df.count(), len(df.columns)))

In [2]:
# Quick summary statistics of the combined Amazon Reviews data set (`df` from the load cell)

df.describe().show()

In [3]:
# Display raw amazon review data (rich notebook preview of the combined DataFrame)

display(df)

In [4]:
# Deduplicate on the (reviewerID, asin) pair so each reviewer contributes at most one
# review per product. Roughly 8.3 % of rows go away (3,487,331 - 3,199,174 = 288,157).
# NOTE(review): no ordering is specified, so which row of a duplicate group survives is not controlled here.

print("Before duplication removal: ", df.count())

df_distinct = df.dropDuplicates(subset=["reviewerID", "asin"])

print("After duplication removal: ", df_distinct.count())

In [5]:
# Replace reviewTime with a DateType value derived from the Unix timestamp in
# unixReviewTime, then drop the now-redundant timestamp column.
# NOTE(review): from_unixtime renders in the Spark session time zone — confirm that is intended.

from pyspark.sql.functions import from_unixtime, to_date

df_date = (
    df_distinct
    .withColumn("reviewTime", to_date(from_unixtime(df_distinct["unixReviewTime"])))
    .drop("unixReviewTime")
)

In [6]:
# Convert the vote column to an integer count and fill missing votes with 0.

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType

# Strip thousands separators before casting. A plain cast turns a string such as "1,234"
# into null, and fillna would then silently report that review as having 0 votes.
# NOTE(review): confirm the raw vote strings use "," as the thousands separator.
df_fill_vote = (
    df_date
    .withColumn("vote", regexp_replace(df_date.vote, ",", "").cast(IntegerType()))
    .fillna(0, subset=["vote"])
)

In [7]:
# Install nltk
# NOTE(review): `! pip` runs a shell on the notebook's host and this install is unpinned;
# Python UDFs executed on cluster workers may not see packages installed this way — confirm.
# NOTE(review): textblob is imported in the sentiment cell but is not installed here — confirm it is preinstalled.

! pip install nltk

# Install Spark NLP (Python package pinned to 2.4.5; the matching JVM package must also be available — TODO confirm)

! pip install --ignore-installed spark-nlp==2.4.5

In [8]:
# Adapted from https://github.com/maobedkova/TopicModelling_PySpark_SparkNLP/blob/master/Topic_Modelling_with_PySpark_and_Spark_NLP.ipynb
# Converts reviewText data into Spark NLP annotation format

from sparknlp.base import DocumentAssembler

documentAssembler = DocumentAssembler() \
                    .setInputCol("reviewText") \
                    .setOutputCol("reviewDocument")

In [9]:
# Tokenize data using Tokenizer

from sparknlp.annotator import Tokenizer

tokenizer = Tokenizer() \
            .setInputCols(["reviewDocument"]) \
            .setOutputCol("reviewTokenized")

In [10]:
# Download stop words from nltk package

import nltk
nltk.download("stopwords")

from nltk.corpus import stopwords

eng_stopwords = stopwords.words("english")

In [11]:
# Remove stop words 

from sparknlp.annotator import StopWordsCleaner

stopwords_cleaner = StopWordsCleaner() \
                    .setInputCols(["reviewTokenized"]) \
                    .setOutputCol("reviewSWRemoved") \
                    .setStopWords(eng_stopwords)

In [12]:
# Normalize data to lowercase

from sparknlp.annotator import Normalizer

normalizer = Normalizer() \
             .setInputCols(["reviewSWRemoved"]) \
             .setOutputCol("reviewNormalized") \
             .setLowercase(True)

In [13]:
# Lemmatize data

from sparknlp.annotator import LemmatizerModel

lemmatizer = LemmatizerModel.pretrained() \
             .setInputCols(["reviewNormalized"]) \
             .setOutputCol("reviewUnigrams")

In [14]:
# Create n-grams

from sparknlp.annotator import NGramGenerator

ngrammer = NGramGenerator() \
           .setInputCols(["reviewUnigrams"]) \
           .setOutputCol("reviewNgrams") \
           .setN(2) \
           .setEnableCumulative(True) \
           .setDelimiter('_')

In [15]:
# Use Part-of-Speech (POS) tagger

from sparknlp.annotator import PerceptronModel

pos_tagger = PerceptronModel.pretrained("pos_anc") \
             .setInputCols(["reviewDocument", "reviewUnigrams"]) \
             .setOutputCol("reviewPOS")

In [16]:
# Transform data with Finisher

from sparknlp.base import Finisher

finisher = Finisher() \
           .setInputCols(["reviewUnigrams", "reviewNgrams", "reviewPOS"])

In [17]:
# Create a Preprocessing Pipeline

from pyspark.ml import Pipeline

pipeline = Pipeline() \
           .setStages([documentAssembler,
                      tokenizer,
                      stopwords_cleaner,
                      normalizer,
                      lemmatizer,
                      ngrammer,
                      pos_tagger,
                      finisher])

In [18]:
# Fit the pipeline to training data

pipeline_fit = pipeline.fit(df_fill_vote)
df_transformed = pipeline_fit.transform(df_fill_vote)
df_transformed.show(5)

In [19]:
display(df_transformed)

In [20]:
# Join each review's POS tags into one space-separated string so the tags can be fed
# through a second Spark NLP pipeline (to build POS n-grams).

from pyspark.sql import types as T
from pyspark.sql import functions as F

# Python-UDF joiner, now null-safe; kept so the name remains available to other cells.
udf_join_arr = F.udf(lambda x: " ".join(x) if x is not None else "", T.StringType())

# The built-in concat_ws performs the same join without Python serialization overhead,
# and yields "" for a null array instead of failing inside a UDF.
df_transformed = df_transformed.withColumn(
    "finished_reviewPOS", F.concat_ws(" ", F.col("finished_reviewPOS"))
)

In [21]:
# Converts POS Tags into Spark NLP annotation format 

pos_documentAssembler = DocumentAssembler() \
                        .setInputCol("finished_reviewPOS") \
                        .setOutputCol("posDocument")

In [22]:
# Tokenize POS tags using Tokenizer

pos_tokenizer = Tokenizer() \
                .setInputCols(["posDocument"]) \
                .setOutputCol("posTokenized")

In [23]:
# Generate N-grams for POS Tags

pos_ngrammer = NGramGenerator() \
               .setInputCols(["posTokenized"]) \
               .setOutputCol("posNgrams") \
               .setN(2) \
               .setEnableCumulative(True) \
               .setDelimiter("_")

In [24]:
# Transform POS Tags with Finisher

pos_finisher = Finisher() \
               .setInputCols(["posTokenized", "posNgrams"])

In [25]:
# Create a Preprocessing Pipeline

pos_pipeline = Pipeline() \
               .setStages([pos_documentAssembler,
                          pos_tokenizer,
                          pos_ngrammer,
                          pos_finisher])

In [26]:
# Fit the Pipeline to Training Data

df_processed = pos_pipeline.fit(df_transformed).transform(df_transformed)
df_processed.show(5)

In [27]:
# List the columns produced by both pipelines
df_processed.columns

In [28]:
# Compare word unigrams with their POS tags for a few reviews
df_processed.select("finished_reviewUnigrams", "finished_posTokenized").limit(5).show()

In [29]:
# Compare word n-grams with their POS-tag n-grams for a few reviews
df_processed.select("finished_reviewNgrams", "finished_posNgrams").limit(5).show()

In [30]:
# Create function to keep only words tagged as adjectives, common nouns, or base/present verbs
def filter_pos(words, pos_tags):
    """Return the words whose positionally aligned POS tag is JJ, NN, NNS, VB or VBP.

    Parameters
    ----------
    words : list of str or None
        Tokens of one review (in this notebook, the lemmatized unigrams).
    pos_tags : list of str or None
        POS tags aligned index-by-index with ``words``.

    Returns
    -------
    list of str
        The kept words in their original order; empty when either input is None.
    """
    # Spark hands a null array cell to a Python UDF as None; zip(None, ...) would raise.
    if words is None or pos_tags is None:
        return []
    # NOTE(review): zip silently truncates to the shorter list if the two arrays ever
    # have different lengths, which would also misalign words and tags.
    return [word for word, pos in zip(words, pos_tags)
            if pos in {"JJ", "NN", "NNS", "VB", "VBP"}]

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))

In [31]:
df_processed = df_processed.withColumn("filtered_unigrams",
                                               udf_filter_pos(F.col("finished_reviewUnigrams"), 
                                                              F.col("finished_posTokenized")))

In [32]:
df_processed.select("filtered_unigrams").limit(5).show(truncate = 90)

In [33]:
def filter_pos_combs(words, pos_tags):
  """Return the n-grams whose POS pattern is exactly two tags in an allowed combination.

  Parameters
  ----------
  words : list of str or None
      N-grams of one review, joined with "_".
  pos_tags : list of str or None
      POS-tag n-grams ("_"-joined), aligned index-by-index with ``words``.

  Returns
  -------
  list of str
      N-grams whose tag pattern is FIRST_SECOND with FIRST in {JJ, NN, NNS, VB, VBP}
      and SECOND in {JJ, NN, NNS}; empty when either input is None.
  """
  # Spark hands a null array cell to a Python UDF as None; zip(None, ...) would raise.
  if words is None or pos_tags is None:
    return []
  kept = []
  for word, pos in zip(words, pos_tags):
    parts = pos.split("_")  # split once instead of once per condition
    if (len(parts) == 2
        and parts[0] in {"JJ", "NN", "NNS", "VB", "VBP"}
        and parts[1] in {"JJ", "NN", "NNS"}):
      kept.append(word)
  return kept

udf_filter_pos_combs = F.udf(filter_pos_combs, T.ArrayType(T.StringType()))

In [34]:
df_processed = df_processed.withColumn("filtered_ngrams",
                                      udf_filter_pos_combs(F.col("finished_reviewNgrams"), F.col("finished_posNgrams")))

In [35]:
df_processed.select("filtered_ngrams").limit(5).show(truncate = 90)

In [36]:
#Combine Unigram and Ngrams

from pyspark.sql.functions import concat

df_processed = df_processed.withColumn("reviewFinal",
                                          concat(F.col("filtered_unigrams"),
                                                 F.col("filtered_ngrams")))

In [37]:
# Dropping temporary columns, and cache results (note that cache is also a lazy operation)

df_cleaned = df_processed.drop("finished_reviewUnigrams",
 "finished_reviewNgrams",
 "finished_reviewPOS",
 "finished_posTokenized",
 "finished_posNgrams",
 ).cache()

display(df_cleaned)

In [38]:
df_cleaned.printSchema()

In [39]:
#Convert filter_unigrams array to string for sentiment analysis
df_cleaned = df_cleaned.withColumn("reviewString", F.concat_ws(" ", "filtered_unigrams"))

In [40]:
display(df_cleaned)

In [41]:
# Sentiment polarity of each review string, computed with TextBlob inside a Python UDF

from pyspark.sql.types import FloatType

from textblob import TextBlob

def sentiment_analysis(text):
  """Return TextBlob's polarity score for ``text``, or None when ``text`` is None."""
  # TextBlob rejects a None argument; propagate SQL null instead of failing the task.
  if text is None:
    return None
  return TextBlob(text).sentiment.polarity

# A bare `udf` is never imported in this notebook; go through the functions alias F.
sentiment_analysis_udf = F.udf(sentiment_analysis, FloatType())

In [42]:
df_cleaned = df_cleaned.withColumn("sentiment_score", sentiment_analysis_udf(df_cleaned["reviewString"]))
df_cleaned.show(5, True)

In [43]:
# Review Length
df_cleaned = df_cleaned.withColumn("reviewLength", F.length("reviewString"))

In [44]:
# Review word count: number of space-separated tokens in reviewString (column reviewWordcount).
# split("") returns [""], whose size is 1, so an empty reviewString (no tokens survived the
# POS filter) would be counted as one word; map the empty string to 0 explicitly.
df_cleaned = df_cleaned.withColumn(
    "reviewWordcount",
    F.when(F.col("reviewString") == "", 0)
     .otherwise(F.size(F.split(F.col("reviewString"), " "))),
)

In [45]:
# Preview df_cleaned including sentiment_score, reviewLength and reviewWordcount
display(df_cleaned)

In [46]:
df_cleaned.columns

In [47]:
#Convert df_cleaned dataframe to sql table
# (a temporary view named df_cleaned_table, queried by the %sql cell below)
df_cleaned.createOrReplaceTempView("df_cleaned_table")

In [48]:
%sql
-- Persist the cleaned review columns as table mma2021w_islington.df_cleaned.
-- NOTE(review): plain CREATE TABLE fails if the table already exists, so this cell is not re-runnable as is.
-- NOTE(review): reviewID and label are not created in this notebook; presumed to come from the source tables.
CREATE TABLE mma2021w_islington.df_cleaned AS
SELECT
  reviewID,
  overall,
  vote,
  verified,
  reviewTime,
  reviewerID,
  asin,
  label,
  category,
  reviewString,
  sentiment_score,
  reviewLength,
  reviewWordcount
FROM df_cleaned_table

In [49]:
# Create a cleaned dataframe for books with label = 1
df_books_cleaned_1 = df_cleaned.filter((F.col("category") == "books") & (F.col("label") == 1))

In [50]:
display(df_books_cleaned_1).show(5)

In [51]:
counts_books_1 = df_books_cleaned_1.select(F.explode("filtered_unigrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [52]:
display(counts_books_1)

In [53]:
counts_books_2grams_1 = df_books_cleaned_1.select(F.explode("filtered_ngrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [54]:
display(counts_books_2grams_1).show(10)

In [55]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol = "reviewFinal", outputCol = "tf_Features")
tf_model = tfizer.fit(df_books_cleaned_1)
tf_result = tf_model.transform(df_books_cleaned_1)

In [56]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol = "tf_Features", outputCol = "tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [57]:
from pyspark.ml.clustering import LDA

num_topics = 8
max_iter = 10

lda = LDA(k = num_topics, maxIter = max_iter, featuresCol = "tf_idf_features")
lda_model = lda.fit(tfidf_result)

In [58]:
vocab = tf_model.vocabulary

def get_words(token_list):
  return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [59]:
num_top_words = 8

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate = 90)

In [60]:
# Create a cleaned dataframe for books with label = 1
df_books_cleaned_0 = df_cleaned.filter((F.col("category") == "books") & (F.col("label") == 0))

In [61]:
display(df_books_cleaned_0).show(5)

In [62]:
counts_books_0 = df_books_cleaned_0.select(F.explode("filtered_unigrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [63]:
display(counts_books_0)

In [64]:
counts_books_2grams_0 = df_books_cleaned_0.select(F.explode("filtered_ngrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [65]:
display(counts_books_2grams_0).show(10)

In [66]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol = "reviewFinal", outputCol = "tf_Features")
tf_model = tfizer.fit(df_books_cleaned_0)
tf_result = tf_model.transform(df_books_cleaned_0)

In [67]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol = "tf_Features", outputCol = "tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [68]:
from pyspark.ml.clustering import LDA

num_topics = 8
max_iter = 10

lda = LDA(k = num_topics, maxIter = max_iter, featuresCol = "tf_idf_features")
lda_model = lda.fit(tfidf_result)

In [69]:
vocab = tf_model.vocabulary

def get_words(token_list):
  return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [70]:
num_top_words = 8

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate = 90)

In [71]:
# Create a cleaned dataframe for home & kitchen with label = 1
df_home_kitchen_cleaned_1 = df_cleaned.filter((F.col("category") == "home_kitchen") & (F.col("label") == 1))

In [72]:
counts_home_kitchen_1 = df_home_kitchen_cleaned_1.select(F.explode("filtered_unigrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [73]:
display(counts_home_kitchen_1)

In [74]:
counts_home_kitchen_2grams_1 = df_home_kitchen_cleaned_1.select(F.explode("filtered_ngrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [75]:
display(counts_home_kitchen_2grams_1)

In [76]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol = "reviewFinal", outputCol = "tf_Features")
tf_model = tfizer.fit(df_home_kitchen_cleaned_1)
tf_result = tf_model.transform(df_home_kitchen_cleaned_1)

In [77]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol = "tf_Features", outputCol = "tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [78]:
from pyspark.ml.clustering import LDA

num_topics = 8
max_iter = 10

lda = LDA(k = num_topics, maxIter = max_iter, featuresCol = "tf_idf_features")
lda_model = lda.fit(tfidf_result)

In [79]:
vocab = tf_model.vocabulary

def get_words(token_list):
  return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [80]:
num_top_words = 8

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate = 90)

In [81]:
# Create a cleaned dataframe for home & kitchen with label = 1
df_home_kitchen_cleaned_0 = df_cleaned.filter((F.col("category") == "home_kitchen") & (F.col("label") == 0))

In [82]:
counts_home_kitchen_0 = df_home_kitchen_cleaned_0.select(F.explode("filtered_unigrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [83]:
display(counts_home_kitchen_0)

In [84]:
counts_home_kitchen_2grams_0 = df_home_kitchen_cleaned_0.select(F.explode("filtered_ngrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [85]:
display(counts_home_kitchen_2grams_0)

col,count
work_great,71472
easy_use,33814
good_quality,25876
look_great,25420
great_product,24868
great_price,17957
easy_clean,16802
look_nice,15268
perfect_size,14416
work_fine,14286


In [86]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol = "reviewFinal", outputCol = "tf_Features")
tf_model = tfizer.fit(df_home_kitchen_cleaned_0)
tf_result = tf_model.transform(df_home_kitchen_cleaned_0)

In [87]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol = "tf_Features", outputCol = "tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [88]:
from pyspark.ml.clustering import LDA

num_topics = 8
max_iter = 10

lda = LDA(k = num_topics, maxIter = max_iter, featuresCol = "tf_idf_features")
lda_model = lda.fit(tfidf_result)

In [89]:
vocab = tf_model.vocabulary

def get_words(token_list):
  return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [90]:
num_top_words = 8

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate = 90)

In [91]:
# Create a cleaned dataframe for home & kitchen with label = 1
df_video_games_cleaned_1 = df_cleaned.filter((F.col("category") == "video_games") & (F.col("label") == 1))

In [92]:
counts_video_games_1 = df_video_games_cleaned_1.select(F.explode("filtered_unigrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [93]:
display(counts_video_games_1)

col,count
game,558511
play,169729
get,154221
good,101725
time,98473
make,92805
go,78342
use,72834
great,62990
character,62253


In [94]:
counts_video_games_2grams_1 = df_video_games_cleaned_1.select(F.explode("filtered_ngrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [95]:
display(counts_video_games_2grams_1)

col,count
play_game,20834
game_play,10626
video_game,8790
good_game,8743
great_game,8411
buy_game,7882
make_game,7740
get_game,6107
single_player,5930
final_fantasy,5727


In [96]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol = "reviewFinal", outputCol = "tf_Features")
tf_model = tfizer.fit(df_video_games_cleaned_1)
tf_result = tf_model.transform(df_video_games_cleaned_1)

In [97]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol = "tf_Features", outputCol = "tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [98]:
from pyspark.ml.clustering import LDA

num_topics = 8
max_iter = 10

lda = LDA(k = num_topics, maxIter = max_iter, featuresCol = "tf_idf_features")
lda_model = lda.fit(tfidf_result)

In [99]:
vocab = tf_model.vocabulary

def get_words(token_list):
  return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [100]:
num_top_words = 8

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate = 90)

In [101]:
# Create a cleaned dataframe for home & kitchen with label = 1
df_video_games_cleaned_0 = df_cleaned.filter((F.col("category") == "video_games") & (F.col("label") == 0))

In [102]:
counts_video_games_0 = df_video_games_cleaned_0.select(F.explode("filtered_unigrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [103]:
display(counts_video_games_0)

col,count
game,601818
play,193263
get,159670
good,147828
great,127033
time,93046
fun,83890
make,82711
use,80718
love,71810


In [104]:
counts_video_games_2grams_0 = df_video_games_cleaned_0.select(F.explode("filtered_ngrams").alias("col")).groupBy("col").count().sort(F.desc("count")).collect()

In [105]:
display(counts_video_games_2grams_0)

col,count
play_game,23249
great_game,22058
good_game,18100
game_play,13982
love_game,12548
fun_game,10576
video_game,9902
work_great,9584
buy_game,8658
game_great,8259


In [106]:
from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol = "reviewFinal", outputCol = "tf_Features")
tf_model = tfizer.fit(df_video_games_cleaned_0)
tf_result = tf_model.transform(df_video_games_cleaned_0)

In [107]:
from pyspark.ml.feature import IDF

idfizer = IDF(inputCol = "tf_Features", outputCol = "tf_idf_features")
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [108]:
from pyspark.ml.clustering import LDA

num_topics = 8
max_iter = 10

lda = LDA(k = num_topics, maxIter = max_iter, featuresCol = "tf_idf_features")
lda_model = lda.fit(tfidf_result)

In [109]:
vocab = tf_model.vocabulary

def get_words(token_list):
  return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [110]:
num_top_words = 8

topics = lda_model.describeTopics(num_top_words).withColumn("topicWords", udf_to_words(F.col("termIndices")))
topics.select("topic", "topicWords").show(truncate = 90)

In [111]:
#Count of each overall rating 

#from pyspark.sql.functions import col
#df_cleaned.groupBy("overall").count().orderBy(col("overall").asc()).show()

In [112]:
#The most common product IDs

#df.groupBy("asin").count().orderBy(col("count").desc()).show(10)