# Name- Prathamesh Namjoshi   
# Net id- PXN180025

In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1588654013111_0008,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7f80293b5a90>

In [2]:
from pyspark.sql import functions as F

In [3]:
# Load Data Set
df = spark.read\
          .option("header", "true")\
          .option("inferSchema", "true")\
          .option("basePath", "hdfs:///hive/amazon-reviews-pds/parquet/")\
          .parquet("hdfs:///hive/amazon-reviews-pds/parquet/*")

In [4]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

In [5]:
df.columns

['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date', 'year', 'product_category']

## Excluding data before 2005

In [6]:
df_limited = df.filter(F.col("year")>2004)

## Perform aggreagtion to get high level Stats


In [8]:
from pyspark.sql.functions import countDistinct, approx_count_distinct, count, sum, mean, round
df_limited.groupBy("product_category").agg(round(mean("star_rating"),3).alias("Average-star_rating"),
                         round(sum("helpful_votes"),3).alias("Sum-helpful_votes"),
                         round(mean("helpful_votes"),3).alias("Average-helpful_votes"))\
                     .sort("product_category", ascending=False).show(20, False)

+----------------------+-------------------+-----------------+---------------------+
|product_category      |Average-star_rating|Sum-helpful_votes|Average-helpful_votes|
+----------------------+-------------------+-----------------+---------------------+
|Wireless              |3.893              |7934542          |0.879                |
|Video_DVD             |4.336              |16816126         |2.594                |
|PC                    |4.091              |10044848         |1.444                |
|Mobile_Apps           |4.035              |18098912         |2.659                |
|Digital_Video_Download|4.208              |2538073          |0.491                |
|Digital_Ebook_Purchase|4.312              |19141273         |0.998                |
|Books                 |4.364              |53565320         |2.994                |
+----------------------+-------------------+-----------------+---------------------+

# To filter multiple reviews by same user on same product

In [9]:
from pyspark.sql.window import *
from pyspark.sql.functions import row_number
temp=df_limited.withColumn("rownum",row_number().over(Window.partitionBy("customer_id","product_id").orderBy("customer_id","product_id")))

In [10]:
Filter = temp.rownum.isin(1)
filtered=temp.where(Filter)
filtered.persist()


DataFrame[marketplace: string, customer_id: string, review_id: string, product_id: string, product_parent: string, product_title: string, star_rating: int, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string, review_headline: string, review_body: string, review_date: date, year: int, product_category: string, rownum: int]

In [11]:
filtered.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- rownum: integer (nullable = true)

# 1. Explore the dataset and provide analysis by product-category and year:

## 1.1 Number of reviews

In [12]:
filtered.groupby("year","product_category").agg(F.countDistinct("review_id").alias('Number_of_Review')).show(5)

+----+--------------------+----------------+
|year|    product_category|Number_of_Review|
+----+--------------------+----------------+
|2014|               Books|         3540828|
|2010|Digital_Ebook_Pur...|          102515|
|2015|               Books|         2860743|
|2013|            Wireless|         1767132|
|2014|         Mobile_Apps|         1728280|
+----+--------------------+----------------+
only showing top 5 rows

## 1.2 Number of distinct users

In [13]:
filtered.groupby("year","product_category").agg(F.countDistinct("customer_id").alias('Number_of_distinct_Users')) \
.sort("year",ascending=True).show(10)

+----+--------------------+------------------------+
|year|    product_category|Number_of_distinct_Users|
+----+--------------------+------------------------+
|2005|            Wireless|                   10585|
|2005|Digital_Ebook_Pur...|                      17|
|2005|           Video_DVD|                   95195|
|2005|Digital_Video_Dow...|                       6|
|2005|               Books|                  290584|
|2005|                  PC|                   15780|
|2006|            Wireless|                   17984|
|2006|Digital_Ebook_Pur...|                      33|
|2006|           Video_DVD|                  105660|
|2006|                  PC|                   23174|
+----+--------------------+------------------------+
only showing top 10 rows

## 1.3. Average and Median review stars


In [14]:
filtered.groupby("year","product_category").agg(F.avg("star_rating").alias('Average-Ratings'),
                                         F.expr('percentile_approx(star_rating,0.5)').alias('Median-Ratings')) \
.sort("year","product_category",ascending=True).show()

+----+--------------------+------------------+--------------+
|year|    product_category|   Average-Ratings|Median-Ratings|
+----+--------------------+------------------+--------------+
|2005|               Books| 4.148046708559013|             5|
|2005|Digital_Ebook_Pur...|3.5789473684210527|             4|
|2005|Digital_Video_Dow...|              3.75|             4|
|2005|                  PC| 3.616240022020369|             4|
|2005|           Video_DVD| 4.004813687570013|             5|
|2005|            Wireless| 3.414026193493874|             4|
|2006|               Books| 4.196543242891375|             5|
|2006|Digital_Ebook_Pur...| 4.027777777777778|             5|
|2006|Digital_Video_Dow...|3.6324324324324326|             4|
|2006|                  PC| 3.717627814972611|             4|
|2006|           Video_DVD|4.0803634706894805|             5|
|2006|            Wireless|3.5095432341239867|             4|
|2007|               Books| 4.258164000084097|             5|
|2007|Di

## 1.4. Percentiles of length of the review. Use the following percentiles: [0.1, 0.25, 0.5, 0.75,
## 0.9, 0.95]

In [15]:
from pyspark.sql.functions import stddev_pop,min,max,length,count,mean
dataframeL1=filtered.withColumn('length',length(df.review_body))
dataframeL2=dataframeL1.groupby("year","product_category").agg(F.avg("length").alias('average_of_Reviews'))
columnName = "average_of_Reviews"
quantileProbs = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
Error = 0.01
dataframeL2.stat.approxQuantile("average_of_Reviews",quantileProbs,Error)

[188.95367161124744, 349.1557032255313, 586.5676289328576, 845.374313191258, 961.9873203920449, 1170.0692938515842]

## 1.5. Percentiles for number of reviews per product. For example, 10% of books got 5 or less## reviews. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]

In [16]:
from pyspark.sql.functions import stddev_pop, min, max,length,count, mean
df1=filtered.groupby("year","product_id","product_category").agg(F.countDistinct("review_id").alias('Number_of_Review'))

quantile = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
Err = 0.01
df1.stat.approxQuantile("Number_of_Review", quantile,Err)

[1.0, 1.0, 1.0, 3.0, 9.0, 21.0]

## 1.6. Identify week number (each year has 52 weeks) for each year and product category## with most positive reviews (4 and 5 star)

In [18]:
from pyspark.sql.functions import *
rating4 = filtered.star_rating.isin(4)
rating5 = filtered.star_rating.isin(5)
dataframef_Q6=filtered.select("product_category","year","review_date") \
.withColumn("week_number",weekofyear("review_date")).where(rating4 | rating5)
dataf_2 = dataframef_Q6.groupby("product_category","year","week_number").agg(F.countDistinct("week_number").alias("count"))
dataf_2.drop('count').show()

+--------------------+----+-----------+
|    product_category|year|week_number|
+--------------------+----+-----------+
|           Video_DVD|2015|         12|
|               Books|2011|         36|
|Digital_Ebook_Pur...|2015|         16|
|           Video_DVD|2011|         37|
|Digital_Ebook_Pur...|2014|         11|
|               Books|2008|         48|
|               Books|2007|         37|
|Digital_Ebook_Pur...|2015|         11|
|         Mobile_Apps|2013|          2|
|Digital_Ebook_Pur...|2013|         49|
|Digital_Ebook_Pur...|2013|         19|
|               Books|2014|          6|
|               Books|2009|         36|
|                  PC|2010|         20|
|               Books|2010|          3|
|           Video_DVD|2009|          9|
|               Books|2010|         12|
|               Books|2006|         12|
|                  PC|2012|         24|
|         Mobile_Apps|2011|         32|
+--------------------+----+-----------+
only showing top 20 rows

# 2. Provide detailed analysis of "Digital eBook Purchase" versus Books.


## 2.1. Using Spark Pivot functionality, produce DataFrame with following columns:

In [19]:
to_pivot=['Digital_Ebook_Purchase','Books']
pivoted=filtered.groupBy("year",F.month(F.col("review_date"))).pivot("product_category",to_pivot)\
 .agg((F.count("review_id")).alias("count_of_reviews"),
 F.round(F.mean("star_rating"),3).alias("Avg_star_rating")).sort("year","month(review_date)",ascending=True).show()


+----+------------------+---------------------------------------+--------------------------------------+----------------------+---------------------+
|year|month(review_date)|Digital_Ebook_Purchase_count_of_reviews|Digital_Ebook_Purchase_Avg_star_rating|Books_count_of_reviews|Books_Avg_star_rating|
+----+------------------+---------------------------------------+--------------------------------------+----------------------+---------------------+
|2005|                 1|                                      1|                                   5.0|                 40428|                4.121|
|2005|                 2|                                   null|                                  null|                 33722|                4.125|
|2005|                 3|                                      2|                                   4.5|                 38878|                4.122|
|2005|                 4|                                      1|                                   

## 2.2 Produce two graphs to demonstrate aggregations from #1:

Graphs have been produced in the report

## 3. Identify similar products (books) in both categories. Use "product_title" to match products. To account for potential differences in naming of products, compare titles after stripping spaces and converting to lower case.

## 3.1. Is there a difference in average rating for the similar books in digital and printed form?

In [22]:
prodfilter=['Digital_Ebook_Purchase']
digital_e_book=filtered.groupBy("product_title","product_category")\
 .agg((F.count("review_id")).alias("dig_book_count_of_reviews"),
 F.round(F.mean("star_rating"),3).alias("dig_book_Avg_star_rating")).filter(F.col("product_category").isin(prodfilter))

In [23]:
trimmeed_dig_book=digital_e_book.select(F.lower(F.trim(F.col("product_title"))).alias("product_title"),F.col("dig_book_count_of_reviews") \
                      ,F.col("dig_book_Avg_star_rating"))

In [24]:
var=['Books']
book=filtered.groupBy("product_title","product_category")\
 .agg((F.count("review_id")).alias("book_count_of_reviews"),
 F.round(F.mean("star_rating"),3).alias("book_Avg_star_rating")).filter(F.col("product_category").isin(var))

In [25]:
trimmed_book=book.select(F.lower(F.trim(F.col("product_title"))).alias("product_title"),F.col("book_count_of_reviews") \
                      ,F.col("book_Avg_star_rating"))

In [26]:
joinExpression = trimmed_book["product_title"] == trimmeed_dig_book["product_title"]
joinType = "inner"
final=trimmed_book.join(trimmeed_dig_book, joinExpression, joinType)

final.show()

+--------------------+---------------------+--------------------+--------------------+-------------------------+------------------------+
|       product_title|book_count_of_reviews|book_Avg_star_rating|       product_title|dig_book_count_of_reviews|dig_book_Avg_star_rating|
+--------------------+---------------------+--------------------+--------------------+-------------------------+------------------------+
|"rays of light": ...|                    2|                 5.0|"rays of light": ...|                        1|                     5.0|
|"the siege of khe...|                   19|               4.316|"the siege of khe...|                      156|                   3.327|
|          'dem bon'z|                    4|                 5.0|          'dem bon'z|                        2|                     5.0|
|   0400 roswell time|                    1|                 5.0|   0400 roswell time|                        6|                   3.667|
|10 smart things g...|            

In [110]:
star=F.col("book_Avg_star_rating")>4
final.where(star).count()

276595

In [112]:
star1=F.col("dig_book_Avg_star_rating")>4
final.where(star1).count()

245529

## We can see that printed book has got more number of higher rating i.e count of more than 4 star ratings is higher for printed books as compared to digital book star ratings.

# 4. Using provided LDA starter notebook, perform LDA topic modeling for the reviews in Digital_Ebook_Purchase and Books categories. Consider reviews for the January of 2015 only.

In [27]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import CountVectorizer, IDF,RegexTokenizer, Tokenizer
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import struct
import re
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

In [29]:

df_ml = filtered.filter((F.col("product_category")=="Digital_Ebook_Purchase") | (F.col("product_category")=="Books") \
                   & (F.col("year")==2015) \
                   & (F.col("review_date")<'2015-02-01')
                   & (F.col("star_rating")>3))

In [30]:
df1 = df_ml.withColumn('review_text', 
                       F.concat(F.col('review_headline'),F.lit(' '), F.col('review_body')))
corpus =df1.select('review_text')

# This will return a new DF with all the columns + id
corpus_df = corpus.withColumn("id", F.monotonically_increasing_id())
# Remove records with no review text
corpus_df = corpus_df.dropna()

corpus_df.persist()
print('Corpus size:', corpus_df.count())
corpus_df.show(5)

corpus_df.printSchema()

Corpus size: 18287530
+--------------------+---+
|         review_text| id|
+--------------------+---+
|Nice Story but ve...|  0|
|Beautiful and hea...|  1|
|Worth The Wait. T...|  2|
|written before. I...|  3|
|Entertaining Rev....|  4|
+--------------------+---+
only showing top 5 rows

root
 |-- review_text: string (nullable = true)
 |-- id: long (nullable = false)

In [31]:
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
countTokens = udf(lambda words: len(words), IntegerType())
'''
tokenized_df = tokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words").withColumn("tokens", countTokens(col("words"))).show() 
'''
regexTokenizer = RegexTokenizer(inputCol="review_text", 
                                outputCol="words",pattern="\\w+", gaps=False)
# alternatively, pattern="\\w+", gaps(False) pattern="\\W"

tokenized_df = regexTokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words") \
    .withColumn("tokens", countTokens(F.col("words"))).show()

+--------------------+--------------------+------+
|         review_text|               words|tokens|
+--------------------+--------------------+------+
|Nice Story but ve...|[nice, story, but...|    40|
|Beautiful and hea...|[beautiful, and, ...|    76|
|Worth The Wait. T...|[worth, the, wait...|    77|
|written before. I...|[written, before,...|   327|
|Entertaining Rev....|[entertaining, re...|    51|
|Fastest 600 page ...|[fastest, 600, pa...|    45|
|Amazing It is a c...|[amazing, it, is,...|    27|
|Huge impact Profo...|[huge, impact, pr...|    27|
|LOVED LOVED LOVED...|[loved, loved, lo...|    25|
|Five Stars very h...|[five, stars, ver...|     4|
|This is an awesom...|[this, is, an, aw...|    26|
|Kept me intereste...|[kept, me, intere...|    29|
|So many of these ...|[so, many, of, th...|    50|
|she is an incredi...|[she, is, an, inc...|    43|
|Thoroughly enjoye...|[thoroughly, enjo...|    42|
|This book has mad...|[this, book, has,...|    39|
|Not as good as th...|[not, as,

## 2. Adding stop words to the standard list as needed. 

In [32]:
stop_words = ['a', 'about', 'above', 'across', 'after', 'afterwards', 'again', 'against', 'all', 'almost', 'alone', 'along', 'already', 'also', 'although', 'always', 'am', 'among', 'amongst', 'amoungst', 'amount', 'an', 'and', 'another', 'any', 'anyhow', 'anyone', 'anything', 'anyway', 'anywhere', 'are', 'around', 'as', 'at', 'back', 'be', 'became', 'because', 'become', 'becomes', 'becoming', 'been', 'before', 'beforehand', 'behind', 'being', 'below', 'beside', 'besides', 'between', 'beyond', 'bill', 'both', 'bottom', 'but', 'by', 'call', 'can', 'cannot', 'cant', 'co', 'computer', 'con', 'could', 'couldnt', 'cry', 'de', 'describe', 'detail', 'do', 'done', 'down', 'due', 'during', 'each', 'eg', 'eight', 'either', 'eleven', 'else', 'elsewhere', 'empty', 'enough', 'etc', 'even', 'ever', 'every', 'everyone', 'everything', 'everywhere', 'except', 'few', 'fifteen', 'fify', 'fill', 'find', 'fire', 'first', 'five', 'for', 'former', 'formerly', 'forty', 'found', 'four', 'from', 'front', 'full', 'further', 'get', 'give', 'go', 'had', 'has', 'hasnt', 'have', 'he', 'hence', 'her', 'here', 'hereafter', 'hereby', 'herein', 'hereupon', 'hers', 'herself', 'him', 'himself', 'his', 'how', 'however', 'hundred', 'i', 'ie', 'if', 'in', 'inc', 'indeed', 'interest', 'into', 'is', 'it', 'its', 'itself', 'keep', 'last', 'latter', 'latterly', 'least', 'less', 'ltd', 'made', 'many', 'may', 'me', 'meanwhile', 'might', 'mill', 'mine', 'more', 'moreover', 'most', 'mostly', 'move', 'much', 'must', 'my', 'myself', 'name', 'namely', 'neither', 'never', 'nevertheless', 'next', 'nine', 'no', 'nobody', 'none', 'noone', 'nor', 'not', 'nothing', 'now', 'nowhere', 'of', 'off', 'often', 'on', 'once', 'one', 'only', 'onto', 'or', 'other', 'others', 'otherwise', 'our', 'ours', 'ourselves', 'out', 'over', 'own', 'part', 'per', 'perhaps', 'please', 'put', 'rather', 're', 'same', 'see', 'seem', 'seemed', 'seeming', 'seems', 'serious', 'several', 'she', 'should', 'show', 'side', 'since', 'sincere', 'six', 'sixty', 'so', 'some', 'somehow', 'someone', 'something', 'sometime', 'sometimes', 'somewhere', 'still', 'such', 'system', 'take', 'ten', 'than', 'that', 'the', 'their', 'them', 'themselves', 'then', 'thence', 'there', 'thereafter', 'thereby', 'therefore', 'therein', 'thereupon', 'these', 'they', 'thick', 'thin', 'third', 'this', 'those', 'though', 'three', 'through', 'throughout', 'thru', 'thus', 'to', 'together', 'too', 'top', 'toward', 'towards', 'twelve', 'twenty', 'two', 'un', 'under', 'until', 'up', 'upon', 'us', 'very', 'via', 'was', 'we', 'well', 'were', 'what', 'whatever', 'when', 'whence', 'whenever', 'where', 'whereafter', 'whereas', 'whereby', 'wherein', 'whereupon', 'wherever', 'whether', 'which', 'while', 'whither', 'who', 'whoever', 'whole', 'whom', 'whose', 'why', 'will', 'with', 'within', 'without', 'would', 'yet', 'you', 'your', 'yours', 'yourself', 'yourselves', '']
stop_words = stop_words + ['br','book','34','y','m','ich','zu']

In [33]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized_df1 = remover.transform(tokenized_df)
tokenized_df1.show(5)

stopwordList = stop_words

remover=StopWordsRemover(inputCol="filtered", outputCol="filtered_more" ,stopWords=stopwordList)
tokenized_df2 = remover.transform(tokenized_df1)
tokenized_df2.show(5)

+--------------------+---+--------------------+--------------------+
|         review_text| id|               words|            filtered|
+--------------------+---+--------------------+--------------------+
|Nice Story but ve...|  0|[nice, story, but...|[nice, story, rus...|
|Beautiful and hea...|  1|[beautiful, and, ...|[beautiful, heart...|
|Worth The Wait. T...|  2|[worth, the, wait...|[worth, wait, sto...|
|written before. I...|  3|[written, before,...|[written, really,...|
|Entertaining Rev....|  4|[entertaining, re...|[entertaining, re...|
+--------------------+---+--------------------+--------------------+
only showing top 5 rows

+--------------------+---+--------------------+--------------------+--------------------+
|         review_text| id|               words|            filtered|       filtered_more|
+--------------------+---+--------------------+--------------------+--------------------+
|Nice Story but ve...|  0|[nice, story, but...|[nice, story, rus...|[nice, story, ru

In [34]:
cv = CountVectorizer(inputCol="filtered_more", outputCol="features", vocabSize = 10000)
cvmodel = cv.fit(tokenized_df2)
featurized_df = cvmodel.transform(tokenized_df2)
vocab = cvmodel.vocabulary
featurized_df.select('filtered_more','features','id').show(5)

countVectors = featurized_df.select('features','id')
countVectors.persist()
print('Records in the DF:', countVectors.count())

+--------------------+--------------------+---+
|       filtered_more|            features| id|
+--------------------+--------------------+---+
|[nice, story, rus...|(10000,[0,1,4,5,7...|  0|
|[beautiful, heart...|(10000,[1,5,9,12,...|  1|
|[worth, wait, sto...|(10000,[1,28,57,8...|  2|
|[written, really,...|(10000,[0,4,5,9,1...|  3|
|[entertaining, re...|(10000,[0,22,37,4...|  4|
+--------------------+--------------------+---+
only showing top 5 rows

Records in the DF: 18287530

## Performing LDA

In [36]:
#k=10 means 10 words per topic
lda = LDA(k=10, maxIter=10)
model = lda.fit(countVectors)



In [37]:
topics = model.describeTopics(5)   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print ("topic: ", idx)
    print ("----------")
    for word in topic:
       print (word)
    print ("----------")

topic:  0
----------
story
love
characters
series
read
----------
topic:  1
----------
good
read
story
great
really
----------
topic:  2
----------
read
series
books
love
great
----------
topic:  3
----------
story
life
read
love
world
----------
topic:  4
----------
read
story
good
great
characters
----------
topic:  5
----------
read
like
great
time
interesting
----------
topic:  6
----------
read
characters
reading
story
great
----------
topic:  7
----------
great
read
good
life
information
----------
topic:  8
----------
love
story
like
really
life
----------
topic:  9
----------
read
author
good
books
like
----------

### We can see that for ratings greater than 3 there are more of positive words

## Topic modelling for reviews with stars less than 3

In [53]:
df_ml1 = filtered.filter((F.col("product_category")=="Digital_Ebook_Purchase") | (F.col("product_category")=="Books") \
                   & (F.col("year")==2015) \
                   & (F.col("review_date")<'2015-02-01')
                   & (F.col("star_rating")<3))

In [54]:

df1 = df_ml1.withColumn('review_text', 
                       F.concat(F.col('review_headline'),F.lit(' '), F.col('review_body')))
corpus =df1.select('review_text')

# This will return a new DF with all the columns + id
corpus_df = corpus.withColumn("id", F.monotonically_increasing_id())
# Remove records with no review text
corpus_df = corpus_df.dropna()

corpus_df.persist()
print('Corpus size:', corpus_df.count())
corpus_df.show(5)

corpus_df.printSchema()

Corpus size: 17950046
+--------------------+---+
|         review_text| id|
+--------------------+---+
|Nice Story but ve...|  0|
|Beautiful and hea...|  1|
|Worth The Wait. T...|  2|
|written before. I...|  3|
|Entertaining Rev....|  4|
+--------------------+---+
only showing top 5 rows

root
 |-- review_text: string (nullable = true)
 |-- id: long (nullable = false)

In [55]:
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
countTokens = udf(lambda words: len(words), IntegerType())
'''
tokenized_df = tokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words").withColumn("tokens", countTokens(col("words"))).show() 
'''
regexTokenizer = RegexTokenizer(inputCol="review_text", 
                                outputCol="words",pattern="\\w+", gaps=False)
# alternatively, pattern="\\w+", gaps(False) pattern="\\W"

tokenized_df = regexTokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words") \
    .withColumn("tokens", countTokens(F.col("words"))).show()

+--------------------+--------------------+------+
|         review_text|               words|tokens|
+--------------------+--------------------+------+
|Nice Story but ve...|[nice, story, but...|    40|
|Beautiful and hea...|[beautiful, and, ...|    76|
|Worth The Wait. T...|[worth, the, wait...|    77|
|written before. I...|[written, before,...|   327|
|Entertaining Rev....|[entertaining, re...|    51|
|Fastest 600 page ...|[fastest, 600, pa...|    45|
|Amazing It is a c...|[amazing, it, is,...|    27|
|Huge impact Profo...|[huge, impact, pr...|    27|
|LOVED LOVED LOVED...|[loved, loved, lo...|    25|
|Five Stars very h...|[five, stars, ver...|     4|
|This is an awesom...|[this, is, an, aw...|    26|
|Kept me intereste...|[kept, me, intere...|    29|
|So many of these ...|[so, many, of, th...|    50|
|she is an incredi...|[she, is, an, inc...|    43|
|Thoroughly enjoye...|[thoroughly, enjo...|    42|
|Not as good as th...|[not, as, good, a...|    33|
|Writer's Block Wo...|[writer, 

In [56]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized_df1 = remover.transform(tokenized_df)
tokenized_df1.show(5)

stopwordList = stop_words

remover=StopWordsRemover(inputCol="filtered", outputCol="filtered_more" ,stopWords=stopwordList)
tokenized_df2 = remover.transform(tokenized_df1)
tokenized_df2.show(5)

+--------------------+---+--------------------+--------------------+
|         review_text| id|               words|            filtered|
+--------------------+---+--------------------+--------------------+
|Nice Story but ve...|  0|[nice, story, but...|[nice, story, rus...|
|Beautiful and hea...|  1|[beautiful, and, ...|[beautiful, heart...|
|Worth The Wait. T...|  2|[worth, the, wait...|[worth, wait, sto...|
|written before. I...|  3|[written, before,...|[written, really,...|
|Entertaining Rev....|  4|[entertaining, re...|[entertaining, re...|
+--------------------+---+--------------------+--------------------+
only showing top 5 rows

+--------------------+---+--------------------+--------------------+--------------------+
|         review_text| id|               words|            filtered|       filtered_more|
+--------------------+---+--------------------+--------------------+--------------------+
|Nice Story but ve...|  0|[nice, story, but...|[nice, story, rus...|[nice, story, ru

In [57]:
cv = CountVectorizer(inputCol="filtered_more", outputCol="features", vocabSize = 10000)
cvmodel = cv.fit(tokenized_df2)
featurized_df = cvmodel.transform(tokenized_df2)
vocab = cvmodel.vocabulary
featurized_df.select('filtered_more','features','id').show(5)

countVectors = featurized_df.select('features','id')
countVectors.persist()
print('Records in the DF:', countVectors.count())

+--------------------+--------------------+---+
|       filtered_more|            features| id|
+--------------------+--------------------+---+
|[nice, story, rus...|(10000,[0,1,4,5,7...|  0|
|[beautiful, heart...|(10000,[1,5,9,12,...|  1|
|[worth, wait, sto...|(10000,[1,27,56,8...|  2|
|[written, really,...|(10000,[0,4,5,9,1...|  3|
|[entertaining, re...|(10000,[0,22,37,4...|  4|
+--------------------+--------------------+---+
only showing top 5 rows

Records in the DF: 17950046

In [58]:
countVectors = featurized_df.select('features','id')
countVectors.persist()
print('Records in the DF:', countVectors.count())

Records in the DF: 17950046

# Performing LDA

In [59]:
lda = LDA(k=10, maxIter=5)
model = lda.fit(countVectors)

In [60]:
topics = model.describeTopics()   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print ("topic: ", idx)
    print ("----------")
    for word in topic:
       print (word)
    print ("----------")

topic:  0
----------
story
good
characters
read
series
love
author
time
like
great
----------
topic:  1
----------
good
read
story
great
stars
really
like
love
characters
series
----------
topic:  2
----------
read
series
books
great
like
love
reading
loved
story
wait
----------
topic:  3
----------
story
read
love
characters
written
like
great
life
novel
way
----------
topic:  4
----------
read
good
like
great
books
reading
new
story
easy
author
----------
topic:  5
----------
read
great
time
like
reading
good
history
life
know
stars
----------
topic:  6
----------
read
love
loved
series
characters
story
reading
great
wait
books
----------
topic:  7
----------
great
read
reading
life
recommend
story
god
series
good
stars
----------
topic:  8
----------
story
love
really
life
like
read
characters
loved
stars
know
----------
topic:  9
----------
read
good
author
characters
enjoyed
story
books
like
great
reading
----------

# 4.4. Does topic modeling provides good approximation to number of stars given in the review?

## We can see that there are some positive words even with reviews with star ratings less than 3. In this case topic modelling might not be so effective