Download libraries and Initialise spark session

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.estointernet.in/apache/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf /content/spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Download dataset

In [2]:
!curl -L http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Musical_Instruments_5.json.gz -o data.json.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2402k  100 2402k    0     0  2479k      0 --:--:-- --:--:-- --:--:-- 2477k


Spark specific imports

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

We train and test our sentiment analysis model on musical instruments dataset and use the model on a separate but similar amazon review dataset for demonstration purposes 

Read and display dataset

In [4]:
data = spark.read.json('data.json.gz')
data = data.select('overall','reviewText')
data = data.withColumnRenamed('overall', 'label')
data.show(5)

+-----+--------------------+
|label|          reviewText|
+-----+--------------------+
|  5.0|Not much to write...|
|  5.0|The product does ...|
|  5.0|The primary job o...|
|  5.0|Nice windscreen p...|
|  5.0|This pop filter i...|
+-----+--------------------+
only showing top 5 rows



## **Sentiment Analysis**

In [5]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import HashingTF, IDF

# we tokenize review text column in dataset to split review text by words
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")
'''
we use standard list of stopwords available in spark, note that we use the
input from regex tokenizer which splits string into words
'''
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

'''
After removing stopwords, we use the filtered column to convert list of words
into raw features basically it means that given a list of words as
filtered = [I, saw, the, red, balloon,I,am,human] 
It is converted into a vector of **fixed-size**
for the above example if 'I' is hashed to 24 ,then vector[24]=2 just stores the 
frequency of 'I'
This fixed size by default is 2^18   
'''
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures")
'''
minDocFreq: remove sparse terms
IDF is used to remove importance of words which occur often in document
refer : https://spark.apache.org/docs/latest/ml-features#feature-extractors
'''
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) 

Setup pipeline and transform the data

In [6]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

(trainingData, testData) = dataset.randomSplit([0.8, 0.2])

Train logistic regression classifier on the data and find predictions for test data

In [7]:
from pyspark.ml.classification import LogisticRegression
 
# parameters can be tuned
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
# find predictions
predictions = lrModel.transform(testData)
predictions.show(n=1, truncate=True, vertical=True)

-RECORD 0-----------------------------
 label         | 1.0                  
 reviewText    | ...what is the so... 
 words         | [what, is, the, s... 
 filtered      | [sound, backgroun... 
 rawFeatures   | (262144,[4714,744... 
 features      | (262144,[4714,744... 
 rawPrediction | [-5.5069815393259... 
 probability   | [2.20309578445644... 
 prediction    | 3.0                  
only showing top 1 row



Evaluate the results of LR model

In [8]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.5992091221281566

## **ALS Matrix factorization**
 
Note that different dataset is used here namely the Amazon digital music dataset

In [9]:
!curl -L http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Musical_Instruments_5.json.gz -o data_als.json.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2402k  100 2402k    0     0  2696k      0 --:--:-- --:--:-- --:--:-- 2696k


Pre-processing data

In [10]:
df = spark.read.json('data_als.json.gz')
# only select the necessary rows
df = df.select('asin','overall','reviewText','reviewerID','helpful')
# Convert unique strings to integer using StringIndexer
asin_indexer = StringIndexer(inputCol="asin", outputCol="itemId")
reviewerID_indexer = StringIndexer(inputCol="reviewerID", outputCol="userId")

df = asin_indexer.fit(df).transform(df)
df = reviewerID_indexer.fit(df).transform(df)

# drop unnecessary columns 
df = df.select('userId','itemId','reviewText','overall','helpful')
df = df.withColumnRenamed('overall', 'orig_rating')

df.show(5)

+------+------+--------------------+-----------+--------+
|userId|itemId|          reviewText|orig_rating| helpful|
+------+------+--------------------+-----------+--------+
|  66.0| 703.0|Not much to write...|        5.0|  [0, 0]|
| 266.0| 703.0|The product does ...|        5.0|[13, 14]|
| 395.0| 703.0|The primary job o...|        5.0|  [1, 1]|
|1048.0| 703.0|Nice windscreen p...|        5.0|  [0, 0]|
|1311.0| 703.0|This pop filter i...|        5.0|  [0, 0]|
+------+------+--------------------+-----------+--------+
only showing top 5 rows



Transform the data using the pipeline used earlier,
and calculate rating predicted by our sentiment classifcation model(*sent_rating*)

In [11]:
df = pipelineFit.transform(df)
df = lrModel.transform(df)

In [12]:
# remove unnecessary columns
df = df.select('userId','itemId','orig_rating','helpful','prediction')
df = df.withColumnRenamed('prediction', 'sent_rating')

df.show(2)

+------+------+-----------+--------+-----------+
|userId|itemId|orig_rating| helpful|sent_rating|
+------+------+-----------+--------+-----------+
|  66.0| 703.0|        5.0|  [0, 0]|        5.0|
| 266.0| 703.0|        5.0|[13, 14]|        5.0|
+------+------+-----------+--------+-----------+
only showing top 2 rows



### **Normalising rating** - 


Using formula -

> orig_rating * 0.8 + sent_rating * 0.2 + overall_effect

Where overall_effect is calculated as 
 
```
  if pos > neg :
    overall_effect = 1+(pos)/(pos+neg)
  else :
    overall_effect = 1-(neg)/(pos+neg)
  
  overall_effect *= 0.5
```
Where 

>  pos = number of people who found that review helpful \
neg = number of people who found that review not-helpful







In [13]:
from pyspark.sql.functions import udf,col
from pyspark.sql.types import DoubleType

def normalise_rating(orig_rating,sent_rating,helpful):
  # return helpful[0]
  pos = helpful[0]+1
  neg = helpful[1]+1
  
  if pos > neg :
    effect = 1+(pos)/(pos+neg)
  else :
    effect = 1-(neg)/(pos+neg)
  
  effect *= 0.5

  return effect+orig_rating*0.8+sent_rating*0.2
  
# register it as a spark user-defined function with double return type
normalise_udf = udf(normalise_rating,DoubleType())

In [14]:
# Normalise the rating and store result in df2

df2 = df.withColumn("norm_rating", normalise_udf(col("orig_rating"),col("sent_rating"),col("helpful")))
df2.show(5)
df2.printSchema()

# split data into train-test
(train, test) = df2.randomSplit([0.8, 0.2], seed = 1234)

+------+------+-----------+--------+-----------+-----------------+
|userId|itemId|orig_rating| helpful|sent_rating|      norm_rating|
+------+------+-----------+--------+-----------+-----------------+
|  66.0| 703.0|        5.0|  [0, 0]|        5.0|             5.25|
| 266.0| 703.0|        5.0|[13, 14]|        5.0|5.241379310344827|
| 395.0| 703.0|        5.0|  [1, 1]|        5.0|             5.25|
|1048.0| 703.0|        5.0|  [0, 0]|        4.0|             5.05|
|1311.0| 703.0|        5.0|  [0, 0]|        5.0|             5.25|
+------+------+-----------+--------+-----------+-----------------+
only showing top 5 rows

root
 |-- userId: double (nullable = false)
 |-- itemId: double (nullable = false)
 |-- orig_rating: double (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- sent_rating: double (nullable = false)
 |-- norm_rating: double (nullable = true)



Run ALS on 3 rating columns we have obtained 


1.   orig_rating = original rating given by user
2.   sent_rating = rating obtained by using LR sentiment analysis
3. norm_rating = normalised rating caluclated by us, using the orig_rating,sent_rating,helpfulness where more helpful ratings of more helpful reviews are increased and weighted average of orig_rating and sent_rating is 



In [15]:
rating_columns = ['orig_rating','norm_rating']

# using each of the rating columns run ALS matrix factorization and obtain RMSE

for rating_column in rating_columns:

  als = ALS(maxIter=20, regParam=0.1, rank=50,userCol="userId", itemCol="itemId", ratingCol=rating_column,coldStartStrategy="drop")
  model = als.fit(train)

  # Evaluate the model by computing the RMSE on the test data
  predictions = model.transform(test)
  evaluator = RegressionEvaluator(metricName="rmse", labelCol=rating_column,predictionCol="prediction")
  rmse = evaluator.evaluate(predictions)
  print(f"For {rating_column} Root-mean-square error = {rmse}")


For orig_rating Root-mean-square error = 1.096972208241755
For norm_rating Root-mean-square error = 0.9640859879724919
