In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


# Amazon Musical Instruments Reviews

Web portals like Bhuvan receive a vast amount of user feedback, and going through all of it by hand is tedious. A feedback management system can help by categorizing the opinions expressed in feedback forums. Here we classify individual comments/reviews and determine an overall rating from them, so that a company gets a complete picture of the feedback its customers provide and can address the areas that need attention. This in turn helps build customer loyalty and can increase business, reputation, brand value and profits.

## 1. Problem Definition

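In our case, the problem we will be exploring is multiclass classification.

This is because we're going to use a number of different features of the reviews to predict their overall rating, and the model trained below (logistic regression) treats each rating value as a separate class.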

## 2. Data 

The data we're using is from Kaggle: https://www.kaggle.com/eswarchandt/amazon-music-reviews

## 3. Features

The file contains the reviewer ID, product ID, reviewer name, review text, helpfulness votes, summary (obtained from the review text), overall rating on a 5-point scale, and review time.

Description of columns in the file:

* reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
* asin - ID of the product, e.g. 0000013714
* reviewerName - name of the reviewer
* helpful - helpfulness rating of the review, e.g. 2/3
* reviewText - text of the review
* overall - rating of the product
* summary - summary of the review
* unixReviewTime - time of the review (unix time)
* reviewTime - time of the review (raw)

## Preparing the tools


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, from_unixtime
from pyspark.sql.types import IntegerType, DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import re

Let's create a Spark Session to develop and train a model.

In [3]:
# create a spark session
spark = SparkSession \
    .builder \
    .appName("Spark Session") \
    .getOrCreate()  # reuse the existing session if one has already been created

Load the CSV file into a Spark DataFrame.

In [4]:
data = spark.read.csv('Musical_instruments_reviews.csv', header=True)
data.show()

+--------------+----------+--------------------+--------+--------------------+-------+--------------------+--------------------+-----------+
|    reviewerID|      asin|        reviewerName| helpful|          reviewText|overall|             summary|      unixReviewTime| reviewTime|
+--------------+----------+--------------------+--------+--------------------+-------+--------------------+--------------------+-----------+
|A2IBPI20UZIR0U|1384719342|"cassandra tu ""Yeah|    well|    that's just like|  u..."|              [0, 0]|Not much to write...|        5.0|
|A14VAT5EAX3D9S|1384719342|                Jake|[13, 14]|The product does ...|    5.0|                Jake|          1363392000|03 16, 2013|
|A195EZSQDW3E21|1384719342|"Rick Bennette ""...|  [1, 1]|The primary job o...|    5.0|It Does The Job Well|          1377648000|08 28, 2013|
|A2C00NNG1ZQQG2|1384719342|"RustyBill ""Sund...|  [0, 0]|Nice windscreen p...|    5.0|GOOD WINDSCREEN F...|          1392336000|02 14, 2014|
| A94QU4C90B1
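The first row above looks shifted: `reviewerName` stops at `"cassandra tu ""Yeah` and the rest of the name spills into `helpful`, `reviewText` and `overall`. That is the pattern one would expect when a quoted field contains commas and doubled quotes (`""`) and the reader does not treat `""` as an escaped quote. The sketch below uses Python's standard `csv` module on a hypothetical raw line reconstructed from that row (the real line is not shown in this notebook, so its exact content is an assumption). It only illustrates the quoting convention and is not the Spark reader.

```python
import csv
import io

# Hypothetical raw line, reconstructed from the shifted row above (an assumption).
raw_line = 'A2IBPI20UZIR0U,1384719342,"cassandra tu ""Yeah, well, that\'s just like, u...""","[0, 0]"\n'

# Naive splitting on commas ignores quoting, so one field is cut into several.
naive_fields = raw_line.strip().split(",")

# csv.reader treats "" inside a quoted field as one literal quote character.
parsed_fields = next(csv.reader(io.StringIO(raw_line)))

print(len(naive_fields), len(parsed_fields))
print(parsed_fields[2])  # the full reviewer name, kept in a single field
```

In PySpark, `spark.read.csv` accepts `quote` and `escape` options; passing `escape='"'` is the setting one would likely try for a file quoted this way. That variant was not run here, so the outputs in the rest of this notebook still reflect the default reader.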

In [5]:
data.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- helpful: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: string (nullable = true)
 |-- reviewTime: string (nullable = true)



## Parse Data

To train our model, the data must be in numeric format.

### Clean Data

We should avoid NULLs in the data. So, let's keep only the rows where no column is NULL.


In [6]:
# keep only the rows in which every column is non-null
df = data.dropna(how='any')

print(f'Number of rows in input data = {data.count()}\nNumber of row after removing NULL = { df.count()}')

Number of rows in input data = 10261
Number of row after removing NULL = 10226


Convert `unixReviewTime` to numeric format.

In [7]:
data_to_int = udf(lambda x : int(x) if x.isdigit() else 0, IntegerType())

df = df.withColumn("reviewTimeNumeric", data_to_int(data['unixReviewTime']))
df.head()

Row(reviewerID='A2IBPI20UZIR0U', asin='1384719342', reviewerName='"cassandra tu ""Yeah', helpful=' well', reviewText=" that's just like", overall=' u..."', summary='[0, 0]', unixReviewTime="Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime='5.0', reviewTimeNumeric=0)
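The conversion rule inside the UDF can be checked in plain Python. The sketch below mirrors the lambda (the helper name `to_int_or_zero` is made up for illustration) and applies it to one clean value from the table above and to text like the one that landed in `unixReviewTime` for the first row. It also converts the clean value to a calendar date, assuming the number is seconds since the epoch in UTC.

```python
from datetime import datetime, timezone

def to_int_or_zero(value: str) -> int:
    """Mirror of the lambda in the UDF: digit-only strings become ints, anything else 0."""
    return int(value) if value.isdigit() else 0

clean = to_int_or_zero("1363392000")           # value from the second row of data.show()
shifted = to_int_or_zero("Not much to write")  # review text that ended up in unixReviewTime

# Interpret the clean value as seconds since the Unix epoch (UTC).
review_date = datetime.fromtimestamp(clean, tz=timezone.utc).date()
print(clean, shifted, review_date)  # 1363392000 0 2013-03-16
```

The date agrees with the `reviewTime` value `03 16, 2013` on the same row, while the shifted row silently becomes `0`, which is why `reviewTimeNumeric=0` appears in the output above.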

### Tokenization

Tokenization splits strings into separate words. Spark has a Tokenizer class as well as RegexTokenizer, which allows for more control over the tokenization process.

In [8]:
# for column "reviewText"
regexTokenizer_reviewText = RegexTokenizer(inputCol="reviewText", 
                                           outputCol="review_words",
                                           pattern="\\W") #only words, without punctuation
# for column "summary"
regexTokenizer_summary = RegexTokenizer(inputCol="summary", 
                                           outputCol="summary_words",
                                           pattern="\\W") #only words, without punctuation

df = regexTokenizer_reviewText.transform(df)
df = regexTokenizer_summary.transform(df)
df.head()

Row(reviewerID='A2IBPI20UZIR0U', asin='1384719342', reviewerName='"cassandra tu ""Yeah', helpful=' well', reviewText=" that's just like", overall=' u..."', summary='[0, 0]', unixReviewTime="Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime='5.0', reviewTimeNumeric=0, review_words=['that', 's', 'just', 'like'], summary_words=['0', '0'])
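As a rough plain-Python picture of what the tokenizer does with `pattern="\\W"`: the text is lowercased, split wherever a non-word character occurs, and empty pieces are dropped. The function below is a sketch under those assumptions (it relies on the `RegexTokenizer` defaults of lowercasing and treating the pattern as a gap) and is not Spark's implementation.

```python
import re

def regex_tokenize(text: str, pattern: str = r"\W") -> list:
    # Lowercase, split on the pattern (it marks the gaps between tokens),
    # and drop the empty strings produced by consecutive separators.
    pieces = re.split(pattern, text.lower(), flags=re.ASCII)
    return [piece for piece in pieces if piece]

print(regex_tokenize(" that's just like"))  # ['that', 's', 'just', 'like']
print(regex_tokenize("[0, 0]"))             # ['0', '0']
```

Both results match the `review_words` and `summary_words` values in the row above. Note that the apostrophe in `that's` is also a split point, which is why `s` shows up as its own token.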

### Count Vectorizer

Set how many words we would like to keep.

`vocabSize`: we keep the 1000 most common words.

In [9]:
# find the term frequencies of the words
cv_review = CountVectorizer(inputCol="review_words", outputCol="TF_review", vocabSize=1000)
cv_summary = CountVectorizer(inputCol="summary_words", outputCol="TF_summary", vocabSize=1000)

df = cv_review.fit(df).transform(df)
df = cv_summary.fit(df).transform(df)
df.head()

Row(reviewerID='A2IBPI20UZIR0U', asin='1384719342', reviewerName='"cassandra tu ""Yeah', helpful=' well', reviewText=" that's just like", overall=' u..."', summary='[0, 0]', unixReviewTime="Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime='5.0', reviewTimeNumeric=0, review_words=['that', 's', 'just', 'like'], summary_words=['0', '0'], TF_review=SparseVector(1000, {12: 1.0, 19: 1.0, 32: 1.0, 38: 1.0}), TF_summary=SparseVector(1000, {67: 2.0}))
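The idea behind the count vectors can be sketched in plain Python: build a vocabulary of the most frequent terms, then describe each document by how often each vocabulary term occurs in it. The toy documents and helper names below are invented for illustration, and the alphabetical tie-breaking is a choice of this sketch rather than a statement about Spark.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size):
    # Rank terms by their total count over all documents (ties: alphabetical).
    totals = Counter(token for doc in docs for token in doc)
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    return {term: index for index, (term, _) in enumerate(ranked[:vocab_size])}

def term_frequencies(doc, vocab):
    # Sparse representation: only vocabulary indices that occur are stored.
    counts = Counter(vocab[token] for token in doc if token in vocab)
    return dict(sorted(counts.items()))

docs = [["good", "strings", "good", "tone"], ["good", "cable"], ["tone", "cable", "cheap"]]
vocab = fit_vocabulary(docs, vocab_size=3)
print(vocab)                             # {'good': 0, 'cable': 1, 'tone': 2}
print(term_frequencies(docs[0], vocab))  # {0: 2, 2: 1}
```

Words outside the vocabulary (`strings` here) are simply dropped, which is the effect `vocabSize=1000` has on rarer words in the reviews.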

### Inverse Document Frequency

Let's compute TF-IDF, which weights each word by how important it is: words that appear in many reviews get a lower weight.

In [10]:
idf_review = IDF(inputCol='TF_review', outputCol='TFIDF_review')
idf_summary = IDF(inputCol='TF_summary', outputCol='TFIDF_summary')

df = idf_review.fit(df).transform(df)
df = idf_summary.fit(df).transform(df)
df.head()

Row(reviewerID='A2IBPI20UZIR0U', asin='1384719342', reviewerName='"cassandra tu ""Yeah', helpful=' well', reviewText=" that's just like", overall=' u..."', summary='[0, 0]', unixReviewTime="Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime='5.0', reviewTimeNumeric=0, review_words=['that', 's', 'just', 'like'], summary_words=['0', '0'], TF_review=SparseVector(1000, {12: 1.0, 19: 1.0, 32: 1.0, 38: 1.0}), TF_summary=SparseVector(1000, {67: 2.0}), TFIDF_review=SparseVector(1000, {12: 0.8816, 19: 1.2298, 32: 1.3726, 38: 1.3972}), TFIDF_summary=SparseVector(1000, {67: 8.7305}))
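For reference, Spark's documentation gives the smoothed inverse document frequency as `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term. The sketch below applies that formula to toy sparse count vectors in plain Python; the data and helper names are made up for illustration.

```python
import math

def idf_weights(doc_term_counts, vocab_size):
    # Document frequency: in how many documents does each term index occur?
    n_docs = len(doc_term_counts)
    doc_freq = [0] * vocab_size
    for counts in doc_term_counts:
        for index in counts:
            doc_freq[index] += 1
    # Smoothed IDF: log((m + 1) / (df + 1)).
    return [math.log((n_docs + 1) / (df + 1)) for df in doc_freq]

def tfidf(counts, idf):
    # Scale each term count by the IDF weight of that term.
    return {index: tf * idf[index] for index, tf in counts.items()}

tf_docs = [{0: 2, 2: 1}, {0: 1, 1: 1}, {0: 1}]
idf = idf_weights(tf_docs, vocab_size=3)
print(idf)
print(tfidf(tf_docs[0], idf))
```

Term 0 occurs in every toy document, so its weight is `log(4 / 4) = 0`, while the rarer terms 1 and 2 get `log(4 / 2)`.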

### StringIndexer

We want numeric labels for our `reviewerID`, `asin` and `overall` columns. By default `StringIndexer` orders labels by frequency: index 0 is the most common value, and larger indices are rarer values.

In [11]:
indexer = StringIndexer(inputCols=['reviewerID', 'asin', 'overall'],
                        outputCols=['reviewerID_indexed', 'asin_indexed', 'overall_indexed'])
df = indexer.fit(df).transform(df)
df.head()

Row(reviewerID='A2IBPI20UZIR0U', asin='1384719342', reviewerName='"cassandra tu ""Yeah', helpful=' well', reviewText=" that's just like", overall=' u..."', summary='[0, 0]', unixReviewTime="Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime='5.0', reviewTimeNumeric=0, review_words=['that', 's', 'just', 'like'], summary_words=['0', '0'], TF_review=SparseVector(1000, {12: 1.0, 19: 1.0, 32: 1.0, 38: 1.0}), TF_summary=SparseVector(1000, {67: 2.0}), TFIDF_review=SparseVector(1000, {12: 0.8816, 19: 1.2298, 32: 1.3726, 38: 1.3972}), TFIDF_summary=SparseVector(1000, {67: 8.7305}), reviewerID_indexed=66.0, asin_indexed=701.0, overall_indexed=6.0)
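One consequence worth spelling out is that `overall_indexed` is a frequency rank, not the rating itself, and that any stray string in the column becomes a class of its own. The plain-Python sketch below imitates frequency-descending indexing on a made-up list of values; breaking ties alphabetically is an assumption of the sketch.

```python
from collections import Counter

def fit_string_index(values):
    # Most frequent label gets index 0.0; ties are broken alphabetically here.
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

# Made-up column content, including one mis-parsed value like the first row above.
ratings = ["5.0", "5.0", "4.0", "5.0", "3.0", "4.0", ' u..."']
index = fit_string_index(ratings)
print(index)
```

In this toy column `"5.0"` maps to `0.0` because it is the most frequent value, and the mis-parsed string `' u..."'` receives its own index. That matches what happens in the row above, where `overall=' u..."'` is mapped to `overall_indexed=6.0`.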

### Prepare data for training

Let's choose a LogisticRegression model and train it. For this we need a `label` column and a `features` column.

* `label` - what we want to predict
* `features` - the values we use to make the prediction

In [12]:
assembler = (VectorAssembler()
    .setInputCols(["reviewerID_indexed", "asin_indexed", "TFIDF_review", "TFIDF_summary", "reviewTimeNumeric"])
    .setOutputCol("features"))
    
df = assembler.transform(df)
df.head()

Row(reviewerID='A2IBPI20UZIR0U', asin='1384719342', reviewerName='"cassandra tu ""Yeah', helpful=' well', reviewText=" that's just like", overall=' u..."', summary='[0, 0]', unixReviewTime="Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime='5.0', reviewTimeNumeric=0, review_words=['that', 's', 'just', 'like'], summary_words=['0', '0'], TF_review=SparseVector(1000, {12: 1.0, 19: 1.0, 32: 1.0, 38: 1.0}), TF_summary=SparseVector(1000, {67: 2.0}), TFIDF_review=SparseVector(1000, {12: 0.8816, 19: 1.2298, 32: 1.3726, 38: 1.3972}), TFIDF_summary=SparseVector(1000, {67: 8.7305}), reviewerID_indexed=66.0, asin_indexed=701.0, overall_indexed=6.0, features=SparseVector(2003, {0: 66.0, 1: 701.0, 14: 0.8816, 21: 1.2298, 34: 1.3726, 40: 1.3972, 1069: 8.7305}))
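The indices in the assembled vector follow from simple offsets: the input columns are concatenated in the order given, so the two indexed IDs occupy positions 0 and 1, `TFIDF_review` starts at 2, `TFIDF_summary` starts at 1002, and `reviewTimeNumeric` is the last position, for a total size of 1 + 1 + 1000 + 1000 + 1 = 2003. The sketch below reproduces that arithmetic in plain Python with the values from the row above; the `assemble` helper is hypothetical and only mimics the concatenation.

```python
def assemble(parts):
    """Concatenate (size, {index: value}) blocks left to right into one sparse vector."""
    features, offset = {}, 0
    for size, values in parts:
        for index, value in values.items():
            if value != 0:  # zeros are not stored in a sparse vector
                features[offset + index] = value
        offset += size
    return offset, features

parts = [
    (1, {0: 66.0}),                                            # reviewerID_indexed
    (1, {0: 701.0}),                                           # asin_indexed
    (1000, {12: 0.8816, 19: 1.2298, 32: 1.3726, 38: 1.3972}),  # TFIDF_review
    (1000, {67: 8.7305}),                                      # TFIDF_summary
    (1, {0: 0}),                                               # reviewTimeNumeric
]
size, features = assemble(parts)
print(size, features)
```

For example, review term 12 lands at 2 + 12 = 14 and summary term 67 at 2 + 1000 + 67 = 1069, as in the `features` column above.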

Let's split our data into train and test sets:

* Train set = 80 % of all data
* Test set = 20 % of all data

In [13]:
# create a data frame which has only feature and target (label)
data_lr = df.select(col("overall_indexed").alias("label"), col("features"))

# split data
train_df, test_df = data_lr.randomSplit([0.8, 0.2], seed=42)
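The 80/20 weights are probabilities rather than exact quotas, so the resulting sets are only approximately that size. The plain-Python sketch below shows the general idea of a weighted random split: each row draws a random number and goes to the bucket whose cumulative weight range contains it. It is an illustration under that assumption; Spark's own sampling differs in detail, and seed 42 here will not reproduce Spark's rows.

```python
import random

def random_split(rows, weights, seed):
    # Normalize the weights into cumulative boundaries, e.g. [0.8, 1.0].
    total = sum(weights)
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # Put the row in the first bucket whose boundary exceeds the draw.
        for i, bound in enumerate(bounds):
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10000), [0.8, 0.2], seed=42)
print(len(train), len(test))
```

Every row ends up in exactly one of the two sets, and the proportions are close to, but usually not exactly, 80 % and 20 %.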

## Training and predicting

Define a Logistic Regression Model.

In [14]:
lr_model = LogisticRegression(maxIter=5, regParam=0.0, elasticNetParam=0)
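Because the label has more than two distinct values, Spark's `LogisticRegression` with its default `family="auto"` fits a multinomial model: each class gets a linear score, the scores are turned into probabilities with a softmax, and the class with the highest probability is predicted. The sketch below shows that last step in plain Python; the scores are invented numbers, not values from the trained model.

```python
import math

def softmax(scores):
    # Subtract the maximum before exponentiating for numerical stability.
    shift = max(scores)
    exps = [math.exp(score - shift) for score in scores]
    total = sum(exps)
    return [value / total for value in exps]

def predict(scores):
    # Return the index of the most probable class together with all probabilities.
    probs = softmax(scores)
    best = max(range(len(probs)), key=probs.__getitem__)
    return best, probs

label, probs = predict([2.0, 1.0, 0.1])  # hypothetical per-class scores
print(label, [round(p, 3) for p in probs])
```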

In [15]:
# fit the model
LR = lr_model.fit(train_df)

# predicting
res_LR = LR.transform(test_df)

In [16]:
print(f'Number of right predicted values equals {res_LR.filter(res_LR.label == res_LR.prediction).count()} from {res_LR.count()}')
print(f'Training accuracy equals {LR.summary.accuracy}')

Number of right predicted values equals 1279 from 1961
Training accuracy equals 0.7283726557773744
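Note that `LR.summary` holds metrics computed on the training data, so the accuracy value printed above describes the fit on `train_df`. The accuracy on the held-out test set follows from the two counts on the first output line, as the small calculation below shows.

```python
# Counts taken from the output above: correct predictions and rows in the test set.
correct, total = 1279, 1961

test_accuracy = correct / total
print(f"Test accuracy = {test_accuracy:.4f}")  # Test accuracy = 0.6522
```

The same figure could presumably be obtained in Spark with the already imported `MulticlassClassificationEvaluator` (with `metricName="accuracy"`) applied to `res_LR`; that call was not run in this notebook.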
