In [2]:
from pyspark import Row
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf, concat, lit, struct
from pyspark.sql.functions import monotonically_increasing_id, size, length
import re
from string import digits
from pyspark.sql import Window
from pyspark.sql import functions as F
from nltk.corpus import stopwords
import nltk
import pandas as pd
from pyspark.ml.feature import Tokenizer,CountVectorizer 
from nltk.stem.porter import *
from pyspark.ml.clustering import LDA
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint,Vectors,array
from operator import itemgetter
from textblob import TextBlob

In [3]:
#Creating the Spark and SQL Context
# getOrCreate() reuses an already-running SparkContext, so re-running this cell (or Run All
# on a kernel that already has one) does not fail because a context already exists.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
#Load the json file
# NOTE(review): absolute, machine-specific path — consider making it configurable.
REVIEWS_PATH = '/home/ubuntu/Cell_Phones_and_Accessories_5.json'
amazon_reviews = sqlContext.read.json(REVIEWS_PATH)
# createOrReplaceTempView replaces the deprecated registerTempTable
amazon_reviews.createOrReplaceTempView("reviews")
sqlContext.sql("SELECT * FROM reviews LIMIT 2").toPandas()

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,120401325X,"[0, 0]",4.0,They look good and stick good! I just don't li...,"05 21, 2014",A30TL5EWN6DFXT,christina,Looks Good,1400630400
1,120401325X,"[0, 0]",5.0,These stickers work like the review says they ...,"01 14, 2014",ASY55RVNIL0UD,emily l.,Really great product.,1389657600


In [5]:
# Same first two rows, via the DataFrame API instead of SQL
amazon_reviews.limit(2).toPandas()

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime
0,120401325X,"[0, 0]",4.0,They look good and stick good! I just don't li...,"05 21, 2014",A30TL5EWN6DFXT,christina,Looks Good,1400630400
1,120401325X,"[0, 0]",5.0,These stickers work like the review says they ...,"01 14, 2014",ASY55RVNIL0UD,emily l.,Really great product.,1389657600


In [6]:
#Helpful column represents the number of people who found the review helpful out of the number of people who rated it
#Calculating Helpfulness
# Guard the ratio explicitly: reviews with zero votes get 0.0 instead of relying on
# division-by-zero producing null (which raises an error when Spark runs in ANSI mode).
amazon_reviews = amazon_reviews.withColumn(
    'helpfulness',
    F.when(amazon_reviews.helpful[1] > 0,
           amazon_reviews.helpful[0] / amazon_reviews.helpful[1]).otherwise(0.0))
# Fill any remaining nulls in numeric columns with 0 (kept from the original cell)
amazon_reviews = amazon_reviews.fillna(0)

In [7]:
#Summary of the data
# describe() reports count/mean/stddev/min/max; only numeric columns appear in the output below
amazon_reviews.describe().show()

+-------+------------------+--------------------+-------------------+
|summary|           overall|      unixReviewTime|        helpfulness|
+-------+------------------+--------------------+-------------------+
|  count|            194439|              194439|             194439|
|   mean| 4.129912208970422|1.3687136230571027E9|0.19305457870106593|
| stddev|1.2224991825084577|3.2300324670308113E7| 0.3724174398848259|
|    min|               1.0|           982800000|                0.0|
|    max|               5.0|          1406073600|                1.0|
+-------+------------------+--------------------+-------------------+



In [8]:
#Understanding the proportion of reviews which is helpful to understand the skewness in the dataset
# Number of reviews with a non-zero helpfulness ratio (the proportion is printed in the next cell)
helpful_reviews = amazon_reviews.filter(amazon_reviews.helpfulness > 0).count()

In [9]:
# Denominator is computed from the data instead of the hard-coded 194439,
# so the proportion stays correct if the input file changes.
total_reviews = amazon_reviews.count()
print ("Propotion of reviews having helpfulness greater than zero is" + " " + str(helpful_reviews/float(total_reviews)))

Propotion of reviews having helpfulness greater than zero is 0.22444057005024712


In [10]:
# Creating a Unique ID for each review
# NOTE(review): per the Spark docs these IDs are unique and increasing but not guaranteed
# to be consecutive.
amazon_reviews = amazon_reviews.withColumn("id", monotonically_increasing_id())
amazon_reviews.limit(2).toPandas()

Unnamed: 0,asin,helpful,overall,reviewText,reviewTime,reviewerID,reviewerName,summary,unixReviewTime,helpfulness,id
0,120401325X,"[0, 0]",4.0,They look good and stick good! I just don't li...,"05 21, 2014",A30TL5EWN6DFXT,christina,Looks Good,1400630400,0.0,0
1,120401325X,"[0, 0]",5.0,These stickers work like the review says they ...,"01 14, 2014",ASY55RVNIL0UD,emily l.,Really great product.,1389657600,0.0,1


In [11]:
#Removing special characters and digits from the review
# Translation table deleting the ASCII digits 0-9; built once instead of on every call.
_REMOVE_DIGITS = str.maketrans("", "", digits)


def remove_spl_numeric(review):
    """Normalise a review for tokenisation.

    Removes apostrophes (so "don't" -> "dont"), lower-cases, replaces every run of
    non-word characters with a single space and deletes the digits 0-9.

    :param review: raw review text; ``None`` (a null ``reviewText``) is treated as empty text.
    :return: the cleaned string ("" for ``None``).
    """
    if review is None:
        # Spark passes nulls to Python UDFs as None; avoid failing the whole task.
        return ""
    review = review.replace("'", "").lower()
    # Raw string: '\W' in a plain literal is an invalid escape sequence.
    clean_string = re.sub(r'\W+', ' ', review)
    return clean_string.translate(_REMOVE_DIGITS)

# Wrap the cleaner as a string-returning Spark UDF and apply it to every review
review_clean = udf(remove_spl_numeric,StringType())
amazon_reviews = amazon_reviews.withColumn("reviewText_clean",review_clean(amazon_reviews.reviewText))

# Show 4 cleaned reviews without truncation
amazon_reviews.select("reviewText_clean").show(4,False)
    
  

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|reviewText_clean                                                                                                                                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|they look good and stick good i just dont like the rounded shape because i was always bumping it and siri kept popping up and it was irritating i just wont buy a product like this aga

In [13]:
# Calculate the number of reviews for each product
# Each Row from select("asin") is used as the key, so toDF() infers `_1` as a struct wrapping
# asin (see the printed schema below) and `_2` as the review count.
# NOTE(review): amazon_reviews.groupBy("asin").count() gives the same counts with a flat schema,
# but the following cells rely on this `_1`/`_2` layout.
product_count = amazon_reviews.select("asin").rdd.map(lambda asin:(asin,1)).reduceByKey(lambda a,b:a+b).toDF()

In [14]:
# Inspect the schema inferred by toDF(): `_1` is a struct holding asin, `_2` the count
product_count.printSchema()

root
 |-- _1: struct (nullable = true)
 |    |-- asin: string (nullable = true)
 |-- _2: long (nullable = true)



In [15]:
def extract_one(asin):
    """Return the first field of ``asin`` (a one-field Row wrapping the product id)."""
    first_field = asin[0]
    return first_field

# Replace the struct-typed `_1` key with its plain asin string, then name the column `asin`
extract = udf(extract_one,StringType())
product_count = product_count.withColumn("_1",extract(product_count._1)).withColumnRenamed("_1","asin")

In [16]:
# Give the review-count column a meaningful name
product_count = product_count.withColumnRenamed("_2","count")

In [17]:
#Create a cumulative sum column to identify products with ample amount of reviews
# createOrReplaceTempView replaces the deprecated registerTempTable.
product_count.createOrReplaceTempView("prod_count")
# `asin` breaks ties between products with equal review counts, so the running total
# (and the 70% cut-off applied later) is deterministic across runs.
# The window has no PARTITION BY, so Spark evaluates it in a single partition.
product_count = sqlContext.sql(
    "SELECT *, SUM(`count`) OVER (ORDER BY `count` DESC, asin ROWS UNBOUNDED PRECEDING) AS CUM_SUM "
    "FROM prod_count"
)
product_count.limit(10).toPandas()

Unnamed: 0,asin,count,CUM_SUM
0,B005SUHPO6,837,837
1,B0042FV2SI,694,1531
2,B008OHNZI0,657,2188
3,B009RXU59C,636,2824
4,B000S5Q9CA,628,3452
5,B007FHX9OK,557,4009
6,B0073FCPSK,526,4535
7,B008DJIIG8,510,5045
8,B0088LYCZC,488,5533
9,B0015RB39O,466,5999


In [18]:
#Calculate the cumulative share (fraction 0-1) of total reviews covered by products so far
# Denominator is taken from the data rather than hard-coded (previously 194439).
total_reviews = amazon_reviews.count()
product_count = product_count.withColumn("per_reviews", (product_count.CUM_SUM / float(total_reviews)))
product_count.limit(2).toPandas()

Unnamed: 0,asin,count,CUM_SUM,per_reviews
0,B005SUHPO6,837,837,0.004305
1,B0042FV2SI,694,1531,0.007874


In [19]:
# Number of distinct products before filtering
product_count.count()

10429

In [20]:
# Keep the most-reviewed products while their running share of all reviews stays within 70%
MAX_CUMULATIVE_SHARE = 0.70
product_count = product_count.filter(F.col("per_reviews") <= MAX_CUMULATIVE_SHARE)

In [21]:
# Number of products that remain after the cumulative-share filter
product_count.count()

3104

## About 30% of products (3,104 of 10,429) account for ~70% of all reviews

In [22]:
#Filtering for the required columns
amazon_reviews = amazon_reviews.select("id","asin","reviewText_clean","helpfulness","reviewText")

#Stopwords
#Removing the stop words from the reviews
nltk.download('stopwords')
# A frozenset makes each membership test O(1); stop_words_removal checks every word of
# every review against this collection.
cachedStopWords = frozenset(stopwords.words("english"))

#Stopwords removal
def stop_words_removal(review):
    """Drop every whitespace-separated word of ``review`` found in ``cachedStopWords``.

    :param review: cleaned review text; ``None`` (null input) is treated as empty text.
    :return: the remaining words joined by single spaces ("" for ``None``).
    """
    if review is None:
        return ""
    return ' '.join(word for word in review.split() if word not in cachedStopWords)
# Spark UDF applying stop_words_removal; the result is stored in `review_wo_stop`
stop_words = udf(stop_words_removal,StringType())
amazon_reviews = amazon_reviews.withColumn("review_wo_stop",stop_words(amazon_reviews.reviewText_clean))

[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [21]:
#Tokenize
# Split the stop-word-free text into the `review_token` array column
tokenizer = Tokenizer(inputCol="review_wo_stop",outputCol="review_token")
tokenized = tokenizer.transform(amazon_reviews)

# (No lemmatization step is applied; tokens are only stemmed below.)

#Stemming
# Porter stemmer used by stem_words
stemmer = PorterStemmer()

#Function for stemming
def stem_words(rev_tok):
    """Porter-stem each token and keep only stems longer than two characters.

    :param rev_tok: list of tokens from the Tokenizer; ``None`` (null input) is treated as an
        empty list, so the per-product ``a + b`` list concatenation later still works.
    :return: list of stems in input order.
    """
    if rev_tok is None:
        return []
    stems = (stemmer.stem(word) for word in rev_tok)
    return [stem_word for stem_word in stems if len(stem_word) > 2]

# UDF returning an array of stems, applied to the tokenized reviews
stem = udf(stem_words,ArrayType(StringType()))

stemmed = tokenized.withColumn("rev_stemmed",stem(tokenized.review_token))

# Compare tokens before and after stemming for two reviews
stemmed.select("review_token","rev_stemmed").limit(2).toPandas()  


Unnamed: 0,review_token,rev_stemmed
0,"[look, good, stick, good, dont, like, rounded,...","[look, good, stick, good, dont, like, round, s..."
1,"[stickers, work, like, review, says, stick, gr...","[sticker, work, like, review, say, stick, grea..."


# Topic modelling works on documents, so the stemmed tokens of all reviews of a product are concatenated into one document per product.

In [22]:
# Concatenate the stemmed token lists of all reviews of each product into one document.
# toDF() names the columns `_1` (asin) and `_2` (combined tokens); they are renamed below.
product_review = stemmed.select("asin","rev_stemmed").rdd.map(lambda row:(row[0],row[1])).reduceByKey(lambda a,b :a +b).toDF()
product_review.limit(2).toPandas()

Unnamed: 0,_1,_2
0,B001UO45SI,"[complain, someth, would, buy, long, term, use..."
1,B00IZOU1JS,"[work, flawlessli, howev, enjoy, sleep, dark, ..."


In [23]:
# Meaningful column names; count() = number of products with at least one review
product_review = product_review.withColumnRenamed("_1","asin")
product_review = product_review.withColumnRenamed("_2","Reviews_combined")
product_review.count()

10429

In [24]:
# Number of top products, for comparison with the join result below
product_count.count()

3104

# Merging combined reviews with top products

In [25]:
# Attach the combined review tokens to each top product; the left join keeps only top products
top_rev_prod = product_count.join(product_review,"asin", how = "left" )
top_rev_prod.count()

3104

In [26]:
# Creating a unique ID for each document
# (this product-level `id` is unrelated to the review-level `id` column of amazon_reviews)
top_rev_prod = top_rev_prod.withColumn("id", monotonically_increasing_id())
top_rev_prod.columns

['asin', 'count', 'CUM_SUM', 'per_reviews', 'Reviews_combined', 'id']

# Create a sparse vector to be used as an input for Topic Modelling

In [27]:
# Count vectorize to create a vector which contains the count of words by each word in the vocabulary
# The vocabulary is learned from the top-product documents only
cv = CountVectorizer(inputCol="Reviews_combined", outputCol="vectors")
count_vectorizer_model = cv.fit(top_rev_prod)
final_review = count_vectorizer_model.transform(top_rev_prod)

# Filter for ID and sparse vector

In [35]:
#Creating the corpus
# LDA input: the document id as a long `label` plus the sparse term-count `features` vector
corpus = final_review.select(
    F.col("id").cast("long").alias("label"),
    F.col("vectors").alias("features"),
)
 

In [41]:

#training data
# Fit LDA with 3 topics and a fixed seed on the corpus.
# NOTE(review): only the topics at index 1 and 2 are used later in this notebook.
lda = LDA(k=3, seed=12, maxIter=20)
model = lda.fit(corpus)

In [42]:
# extracting topics
# Top 10 term indices per topic
topics = model.describeTopics(maxTermsPerTopic=10)


In [43]:
# Extract the CountVectorizer vocabulary; term index i corresponds to vocabulary[i]
vocabulary = count_vectorizer_model.vocabulary


In [44]:
# Translate each topic's term indices back into vocabulary words (one list per topic)
indices_rdd = (topics.select("termIndices").rdd
               .map(lambda topic_row: [vocabulary[term_idx] for term_idx in topic_row[0]]))

In [1]:
# NOTE(review): the saved output below (2 topics, 6 unstemmed words each) does not match the
# 10 stemmed words per topic shown further down; it appears stale or hand-edited.
indices_rdd.collect()

[['case', 'screen', 'protect', 'get', 'work', 'well'], ['charge', 'battery', 'charger', 'device', 'work', 'time']]


**Filter the sentences in the review based on the identified topics**

In [47]:
# Collect all topic word lists once and keep the topics at index 1 and 2 (index 0 is unused)
topic_words = indices_rdd.collect()
topic_1 = topic_words[1]
topic_2 = topic_words[2]


In [50]:
# Words of the first selected topic
topic_1

['case',
 'phone',
 'use',
 'screen',
 'one',
 'protect',
 'get',
 'like',
 'work',
 'well']

In [51]:
# Words of the second selected topic
topic_2

['charg',
 'phone',
 'use',
 'batteri',
 'one',
 'charger',
 'devic',
 'like',
 'work',
 'time']

## Extract the keyword-bearing sentences of each review into a separate column per topic

In [18]:
# Filter for key words from the reviews
# NOTE(review): these hard-coded lists overwrite the LDA-derived topic_1/topic_2 above.
# They use unstemmed words ('charge', 'battery', 'device'), matching the raw review text that
# assign_topic_1/2 search, whereas the LDA terms are stems ('charg', 'batteri', 'devic').
topic_1 = ['case','screen','protect','get','work','well']
topic_2 = ['charge','battery','charger','device','work','time']



In [23]:
# Schema before the per-topic sentence columns are added
amazon_reviews.printSchema()

root
 |-- id: long (nullable = false)
 |-- asin: string (nullable = true)
 |-- reviewText_clean: string (nullable = true)
 |-- helpfulness: double (nullable = false)
 |-- reviewText: string (nullable = true)
 |-- review_wo_stop: string (nullable = true)



In [24]:
def assign_topic_1(rev, keywords=None):
    """Return the sentences of ``rev`` that mention at least one topic-1 keyword.

    The review is stripped of apostrophes, lower-cased and split on '.'. A sentence
    matches when any of its words (after replacing non-word characters with spaces and
    deleting digits) is a keyword. Matching sentences are returned as they appear after
    lower-casing and apostrophe removal (not whitespace-stripped).

    :param rev: raw review text; ``None`` yields an empty list.
    :param keywords: optional iterable of keywords; defaults to the notebook-level ``topic_1``.
    :return: list of matching sentences (possibly empty).
    """
    if rev is None:
        return []
    # Loop invariants, computed once per review instead of once per sentence.
    keyword_set = set(topic_1 if keywords is None else keywords)
    remove_digits = str.maketrans("", "", digits)

    rev = rev.replace("'", "").lower()
    top_1_sent = []
    #split it by full stop
    for sent in rev.split('.'):
        words = re.sub(r'\W+', ' ', sent).translate(remove_digits).split()
        if keyword_set.intersection(words):
            top_1_sent.append(sent)
    return top_1_sent

# NOTE(review): assign_topic_1 returns a Python list but the UDF is declared StringType, so the
# `topic_1` column holds a string rendering of the list (e.g. "[]" when nothing matches, as in
# the results table further down) rather than an array.
top_1 = udf(assign_topic_1,StringType())
amazon_reviews = amazon_reviews.withColumn("topic_1",top_1(amazon_reviews.reviewText))
            
amazon_reviews.printSchema()

root
 |-- id: long (nullable = false)
 |-- asin: string (nullable = true)
 |-- reviewText_clean: string (nullable = true)
 |-- helpfulness: double (nullable = false)
 |-- reviewText: string (nullable = true)
 |-- review_wo_stop: string (nullable = true)
 |-- topic_1: string (nullable = true)



In [25]:
def assign_topic_2(rev, keywords=None):
    """Return the sentences of ``rev`` that mention at least one topic-2 keyword.

    The review is stripped of apostrophes, lower-cased and split on '.'. A sentence
    matches when any of its words (after replacing non-word characters with spaces and
    deleting digits) is a keyword. Matching sentences are returned as they appear after
    lower-casing and apostrophe removal (not whitespace-stripped).

    The debug ``print`` of the original was removed: it ran once per row on the executors.

    :param rev: raw review text; ``None`` yields an empty list.
    :param keywords: optional iterable of keywords; defaults to the notebook-level ``topic_2``.
    :return: list of matching sentences (possibly empty).
    """
    if rev is None:
        return []
    # Loop invariants, computed once per review instead of once per sentence.
    keyword_set = set(topic_2 if keywords is None else keywords)
    remove_digits = str.maketrans("", "", digits)

    rev = rev.replace("'", "").lower()
    top_2_sent = []
    #split it by full stop
    for sent in rev.split('.'):
        words = re.sub(r'\W+', ' ', sent).translate(remove_digits).split()
        if keyword_set.intersection(words):
            top_2_sent.append(sent)
    return top_2_sent

# NOTE(review): assign_topic_2 returns a Python list but the UDF is declared StringType, so the
# `topic_2` column holds a string rendering of the list (e.g. "[]" when nothing matches).
top_2 = udf(assign_topic_2,StringType())
amazon_reviews = amazon_reviews.withColumn("topic_2",top_2(amazon_reviews.reviewText))
            
amazon_reviews.printSchema()

root
 |-- id: long (nullable = false)
 |-- asin: string (nullable = true)
 |-- reviewText_clean: string (nullable = true)
 |-- helpfulness: double (nullable = false)
 |-- reviewText: string (nullable = true)
 |-- review_wo_stop: string (nullable = true)
 |-- topic_1: string (nullable = true)
 |-- topic_2: string (nullable = true)



In [26]:
def senti(rev):
    """TextBlob polarity score of the text form (``str``) of ``rev``."""
    blob = TextBlob(str(rev))
    return blob.sentiment.polarity


def sub(rev):
    """TextBlob subjectivity score of the text form (``str``) of ``rev``."""
    blob = TextBlob(str(rev))
    return blob.sentiment.subjectivity

# Polarity/subjectivity UDFs. Scores are stored as FloatType, which is why values such as
# 0.40000000596... appear in the results table.
sentiment = udf(senti,FloatType())
subjectivity = udf(sub,FloatType())
# Score the topic-matched sentences of each review (string rendering of the sentence list)
amazon_reviews = amazon_reviews.withColumn("topic_1_pol",sentiment(amazon_reviews.topic_1))
amazon_reviews = amazon_reviews.withColumn("topic_1_sub",subjectivity(amazon_reviews.topic_1))
amazon_reviews = amazon_reviews.withColumn("topic_2_pol",sentiment(amazon_reviews.topic_2))
amazon_reviews = amazon_reviews.withColumn("topic_2_sub",subjectivity(amazon_reviews.topic_2))

In [27]:
# Schema including the four sentiment feature columns
amazon_reviews.printSchema()

root
 |-- id: long (nullable = false)
 |-- asin: string (nullable = true)
 |-- reviewText_clean: string (nullable = true)
 |-- helpfulness: double (nullable = false)
 |-- reviewText: string (nullable = true)
 |-- review_wo_stop: string (nullable = true)
 |-- topic_1: string (nullable = true)
 |-- topic_2: string (nullable = true)
 |-- topic_1_pol: float (nullable = true)
 |-- topic_1_sub: float (nullable = true)
 |-- topic_2_pol: float (nullable = true)
 |-- topic_2_sub: float (nullable = true)



In [28]:
# Keep reviews with non-zero polarity on at least one topic; rows where both polarities are 0
# (or null, since a null comparison is not true) are dropped.
amazon_rev_fil = amazon_reviews.filter((amazon_reviews.topic_1_pol != 0) | (amazon_reviews.topic_2_pol != 0))


In [29]:
# Number of reviews with non-zero polarity on at least one topic
amazon_rev_fil.count()

134076

In [30]:
from pyspark.ml.regression import LinearRegression
# NOTE(review): VectorIndexer, OneHotEncoder and StringIndexer are not used in the visible notebook.
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

# 90/10 train/test split with a fixed seed
train, test = amazon_rev_fil.randomSplit([0.9, 0.1], seed=12345)

# Predict helpfulness from the per-topic polarity and subjectivity scores
numeric_cols = ["topic_1_pol","topic_1_sub","topic_2_pol","topic_2_sub"]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
lr = LinearRegression(maxIter=10, featuresCol="features",labelCol="helpfulness")
training = assembler.transform(train)
testing = assembler.transform(test)
# Fit the model
lrModel = lr.fit(training)

In [31]:
# Predictions on the held-out 10% split (no test metric is computed in this notebook)
results = lrModel.transform(testing)
results.show(2)

+---+----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+-----------+--------------------+------------------+
| id|      asin|    reviewText_clean|helpfulness|          reviewText|      review_wo_stop|             topic_1|             topic_2|topic_1_pol|topic_1_sub|topic_2_pol|topic_2_sub|            features|        prediction|
+---+----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+-----------+--------------------+------------------+
|  9|3998899561|this is a fantast...|        1.0|This is a fantast...|fantastic case st...|[this is a fantas...|                  []|        0.4|        0.9|        0.0|        0.0|[0.40000000596046...|0.1938141631563738|
| 13|3998899561|unlike most of th...|       0.75|Unlike Most of th...|unlike rechargeab...|[ it doesnt heat ...|

In [33]:
# Print the coefficients and intercept for linear regression
# Coefficient order follows numeric_cols: topic_1_pol, topic_1_sub, topic_2_pol, topic_2_sub
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
# NOTE(review): these residuals describe the training data only, not the test split.
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
trainingSummary.residuals.show()


Coefficients: [-0.0545240120062,0.0509638244982,-0.0765281410936,0.166000231385]
Intercept: 0.1697563274505073
numIterations: 1
+--------------------+
|           residuals|
+--------------------+
|   0.744641065055044|
|  0.3415044942755673|
| 0.22914286596482009|
|  0.4336411245087861|
|-0.16619613994253157|
|  0.4281258830087288|
|  0.8269273826385848|
|  0.8145133898550703|
| 0.47458440838266863|
|-0.19488222208265238|
|  0.4291770648565912|
|-0.26254103693695696|
| -0.3981430827242153|
| 0.47740754246139183|
|-0.16886628057351338|
|-0.21177017178781282|
|-0.19831624693620326|
|-0.21130748370680935|
|-0.28056886733244074|
|-0.22812000855951745|
+--------------------+
only showing top 20 rows

