In [None]:
import pyspark
import os
from __future__ import print_function
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, Normalizer
from pyspark import SparkContext, SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import array, struct, split, explode, udf, col, collect_list
from pyspark.sql import functions as f
from pyspark.sql.functions import concat, col, lit, monotonically_increasing_id
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans, LDA, LDAModel

In [7]:
sc = SparkContext.getOrCreate()
ss = SparkSession(sc)

In [1]:
pyspark_submit_args = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.executor.memory", "5g") \
    .config("spark.driver.memory", "5g") \
    .config('spark.mongodb.input.uri', "mongodb://34.220.42.106/monskr.review1")\
    .getOrCreate()

In [4]:
spark2 = SparkSession \
    .builder \
    .appName("myApp") \
    .config('spark.mongodb.input.uri', "mongodb://34.220.42.106/monskr.meta1")\
    .getOrCreate()

# Loading the Datasets 

In [3]:
# Loading first dataset corresponding to the reviews
review = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [5]:
# Loading the meta dataset including the titles and the description of the books
meta = spark2.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [55]:
print(review.columns)
print(meta.columns)

['_id', 'asin', 'helpful', 'overall', 'reviewText', 'reviewTime', 'reviewerID', 'reviewerName', 'summary', 'unixReviewTime']
['asin', 'title', 'description']


In [None]:
# Filtering the data

In [9]:
# extract the data subset coresponding to the 2012 year
df2012 = review.where(review.reviewTime.contains('2012'))
# extract the data subset coresponding to the 2013 year
df2013 = review.where(review.reviewTime.contains('2013'))
# extract the data subset coresponding to the 2014 year
df2014 = review.where(review.reviewTime.contains('2014'))
# Join the three sub-datasets
reviews = df2012.union(df2013).union(df2014)  

In [10]:
# Concatinating the review Text and the description text together in the review data
reviews = reviews.select('asin',
                        concat(col("reviewText"),lit(""),col("summary")).alias('textss'))

In [11]:
# Selecting the columns needed from meta dataset
meta = meta.drop_duplicates(['asin']).select('asin', 'title', 'description') #dropped categories

In [12]:
#Getting all the reviews per book_id (asin)
reviews_gb = reviews.groupBy("asin").agg(f.concat_ws(" ", f.collect_list('textss')).alias('textss'))

In [13]:
#Joining the two data sets
df = reviews_gb.join(meta, 'asin', 'inner').cache()

In [14]:
#Again joining the text columns from the meta data afyer the join
df = df.select('asin','title',
                concat(col("textss"),lit(""),col("description")).alias('reviewText'))

In [16]:
# we delete the columns with no texts entry
dff = df.na.drop(subset=["reviewText"]) 

In [17]:
# Create a new cilumn for indexing
dff = dff.select("*").withColumn("id", monotonically_increasing_id())

In [18]:
# Final data set ready for modeling!!
dff.show(5)

+----------+--------------------+--------------------+---+
|      asin|               title|          reviewText| id|
+----------+--------------------+--------------------+---+
|0670869961|     The Pasta Bible|This is a wonderf...|  0|
|0671019880|Upon a Midnight C...|This is a book fi...|  1|
|0671617478|           Red Baker|I'm not sure how ...|  2|
|0671732188|Bestial: The Sava...|Interesting how t...|  3|
|0671736450|The Way of Energy...|I can't remember ...|  4|
+----------+--------------------+--------------------+---+
only showing top 5 rows



# Tokenization 

In [19]:
# Create a Tokenizer and apply it to the data
tokenizer = Tokenizer(inputCol="reviewText", outputCol="words") 
df_w_words = tokenizer.transform(dff)
# Remove the stop words
remover = StopWordsRemover(inputCol="words", outputCol="mf_words")  
df_w_mfwords = remover.transform(df_w_words)
df_w_mfwords.show(3)

+----------+--------------------+--------------------+---+--------------------+--------------------+
|      asin|               title|          reviewText| id|               words|            mf_words|
+----------+--------------------+--------------------+---+--------------------+--------------------+
|0670869961|     The Pasta Bible|This is a wonderf...|  0|[this, is, a, won...|[wonderful, book,...|
|0671019880|Upon a Midnight C...|This is a book fi...|  1|[this, is, a, boo...|[book, filled, sh...|
|0671617478|           Red Baker|I'm not sure how ...|  2|[i'm, not, sure, ...|[sure, (or, why),...|
+----------+--------------------+--------------------+---+--------------------+--------------------+
only showing top 3 rows



# TFIDF

In [20]:
# Apply the TFIDF algorithm to get 
hashingTF = HashingTF(inputCol="mf_words", outputCol="tf", numFeatures=200) #need to know how to choose the numFeatures
tf = hashingTF.transform(df_w_mfwords)  # reviews_w_feature  == tf
idf = IDF(inputCol="tf", outputCol="features").fit(tf)
tfidf = idf.transform(tf)
tfidf.show(3)

+----------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|      asin|               title|          reviewText| id|               words|            mf_words|                  tf|            features|
+----------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+
|0670869961|     The Pasta Bible|This is a wonderf...|  0|[this, is, a, won...|[wonderful, book,...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|
|0671019880|Upon a Midnight C...|This is a book fi...|  1|[this, is, a, boo...|[book, filled, sh...|(200,[0,1,3,5,8,1...|(200,[0,1,3,5,8,1...|
|0671617478|           Red Baker|I'm not sure how ...|  2|[i'm, not, sure, ...|[sure, (or, why),...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|
+----------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+

In [56]:
 tfidf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- id: long (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mf_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# persist the features data frame to save time during clustering
tfidf.cache()

# Clustering

## Kmeans

In [29]:
# Normalize the features
normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
l2NormData = normalizer.transform(tfidf)

In [None]:
# Apply kmeans for 10 topics and 50 as maximum iterartin numbers
kmeans = KMeans().setK(10).setMaxIter(50)
km_model = kmeans.fit(l2NormData)
clustersTable = km_model.transform(l2NormData)

In [58]:
clustersTable.printSchema()

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- id: long (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mf_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- normFeatures: vector (nullable = true)
 |-- prediction: integer (nullable = false)



In [39]:
clustersTable.show(5)

+----------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|      asin|               title|          reviewText| id|               words|            mf_words|                  tf|            features|        normFeatures|prediction|
+----------+--------------------+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|0670869961|     The Pasta Bible|This is a wonderf...|  0|[this, is, a, won...|[wonderful, book,...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|(200,[0,1,2,3,4,5...|         0|
|0671019880|Upon a Midnight C...|This is a book fi...|  1|[this, is, a, boo...|[book, filled, sh...|(200,[0,1,3,5,8,1...|(200,[0,1,3,5,8,1...|(200,[0,1,3,5,8,1...|         0|
|0671617478|           Red Baker|I'm not sure how ...|  2|[i'm, not, sure, ...|[sure, (or, why),...|(200,[0,1,2,3,4,5...|(200

In [30]:
# Explore the number of books per topic for k=10
clustersTable.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  208|
|         6| 1367|
|         3|   79|
|         5|  582|
|         9|   42|
|         4|   17|
|         8|13413|
|         7|   11|
|         2| 3951|
|         0|60931|
+----------+-----+



In [52]:
# Aplly kmeans with 6 categories
kmeans = KMeans().setK(6).setMaxIter(50)
km_model = kmeans.fit(l2NormData)
clustersTable = km_model.transform(l2NormData)

In [53]:
# Explore the number of books per topic for k=6
clustersTable.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|  192|
|         3|   20|
|         5|11848|
|         4| 3007|
|         2|  893|
|         0|64641|
+----------+-----+



## LDA

In [47]:
# Apply LDA for k=10
lda = LDA(k=10)
model = lda.fit(l2NormData)

In [51]:
# Explore the number of books per topic for k=10
model.describeTopics().rdd.map(lambda x: (x[0],sum(x[1]))).toDF().withColumnRenamed("_1","topic").withColumnRenamed("_2","count").show()

+-----+-----+
|topic|count|
+-----+-----+
|    0|  973|
|    1|  809|
|    2| 1178|
|    3|  871|
|    4| 1046|
|    5| 1009|
|    6| 1297|
|    7|  733|
|    8|  878|
|    9|  632|
+-----+-----+



In [None]:
#Apply LDA for k=6
lda = LDA(k=6)
model = lda.fit(l2NormData)

In [60]:
# Explore the number of books per topic for k=6
model.describeTopics().rdd.map(lambda x: (x[0],sum(x[1]))).toDF().withColumnRenamed("_1","topic").withColumnRenamed("_2","count").show()

+-----+-----+
|topic|count|
+-----+-----+
|    0|  906|
|    1|  819|
|    2| 1171|
|    3|  869|
|    4| 1046|
|    5| 1066|
+-----+-----+



In [46]:
model.topicsMatrix()

DenseMatrix(200, 10, [4447.7533, 4066.6514, 6946.4871, 2954.0353, 2727.0803, 3766.1738, 3913.7805, 3815.6799, ..., 3141.578, 1696.8065, 3513.166, 3303.174, 2629.1893, 2707.0874, 3013.417, 3145.8837], 0)