In [1]:
# initiating spark session
import pyspark as ps

conf = ps.SparkConf() \
    .setAppName("My App") \
    .setMaster("local[*]")
sc = ps.SparkContext(conf=conf)

spark = ps.sql.SparkSession(sc)

In [2]:
# Loading amazon reviews json file
reviews = spark.read.json("data/reviews_Musical_Instruments_5.json.gz")

In [3]:
reviews.take(3)

[Row(asin=u'1384719342', helpful=[0, 0], overall=5.0, reviewText=u"Not much to write about here, but it does exactly what it's supposed to. filters out the pop sounds. now my recordings are much more crisp. it is one of the lowest prices pop filters on amazon so might as well buy it, they honestly work the same despite their pricing,", reviewTime=u'02 28, 2014', reviewerID=u'A2IBPI20UZIR0U', reviewerName=u'cassandra tu "Yeah, well, that\'s just like, u...', summary=u'good', unixReviewTime=1393545600),
 Row(asin=u'1384719342', helpful=[13, 14], overall=5.0, reviewText=u"The product does exactly as it should and is quite affordable.I did not realized it was double screened until it arrived, so it was even better than I had expected.As an added bonus, one of the screens carries a small hint of the smell of an old grape candy I used to buy, so for reminiscent's sake, I cannot stop putting the pop filter next to my nose and smelling it after recording. :DIf you needed a pop filter, this wil

Checking the structure of that dataframe, and the column detected in the json content

In [4]:
reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



For this analysis, we will be using the columns reviewText and overall rating

In [5]:
reviews_pro = reviews.select("reviewText", "overall")

In [6]:
reviews_pro.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)



using group by, the number of reviews per rating was calculated

In [7]:
import pyspark.sql.functions as F
reviews_count = reviews_pro.groupBy(reviews_pro.overall).agg(F.count(reviews_pro.reviewText).alias('RatingCount'))

In [8]:
reviews_count.show()

+-------+-----------+
|overall|RatingCount|
+-------+-----------+
|    1.0|        217|
|    4.0|       2084|
|    3.0|        772|
|    2.0|        250|
|    5.0|       6938|
+-------+-----------+



In this case, I will be using only extreme ratings {1.0, 5.0}. Based on the table above, the number of samples needed to build a balanced set were calculated below to have the same number of reviews in 1.0 and 5.0.



In [28]:
neg = reviews_pro.filter(reviews_pro["overall"] <= 1.0)

In [29]:
neg.count()

217

In [30]:
pos = reviews_pro.filter(reviews_pro["overall"] >= 5.0)

In [31]:
from pyspark.mllib.random import RandomRDDs

In [32]:
neg_data = neg.sample(False, 0.92)

In [33]:
print "number of samples in negative rating: ", neg_data.count()

number of samples in negative rating:  194


In [34]:
pos_data = pos.sample(False, 0.031)

In [35]:
print "number of samples in positive rating: ", pos_data.count()

number of samples in positive rating:  208


creating a single dataframe containing the samples from both the balanced neg and pos classes.

In [36]:
data = neg_data.union(pos_data)

In [37]:
data.count()

402

creating a function func_rating to derive the target variable, label

In [38]:
def func_rating(rating):
    if rating == 1.0:
        return 0
    else:
        return 1

In [39]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
func_udf = udf(func_rating, IntegerType())
result_df = data.withColumn("label", func_udf(data.overall))

In [40]:
result_df.take(10)

[Row(reviewText=u"It hums, crackles, and I think I'm having problems with my equipment.  As soon as I use any of my other cords then the problem is gone.  Hosa makes some other products that have good value.  But based on my experience I don't recommend this one.", overall=1.0, label=0),
 Row(reviewText=u"I'm a pro-cheapo and I hated this thing. They're noisy, and the cables feel really cheap, gummy-like. Drop few more bucks and get something else!", overall=1.0, label=0),
 Row(reviewText=u'Received it in time, standard blister packaging but the cable stopped working after 45 days. Since I was out of 30 days return time, sent an email to the manufacturer but no response till today.', overall=1.0, label=0),
 Row(reviewText=u"These things are terrible. One wouldn't fit in my soundboard, another wouldn't lock in my mic. Plain and simple....if your looking for crap that doesn't work....buy this.", overall=1.0, label=0),
 Row(reviewText=u"This is a cheap piece of junk that does what it says

In [41]:
# printing the schema of final dataframe
result_df.printSchema()

root
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- label: integer (nullable = true)



The indexing_pipeline function is used below that takes a dataframe and a column to specify which field to apply the TFIDF on as inputs. It returns two things: first, the same DataFrame with a field 'features' and second, the vocabulary.

In [42]:
from nlp_pipeline import indexing_pipeline

In [43]:
df, voc = indexing_pipeline(result_df, inputCol="reviewText")

In [44]:
df.first()

Row(reviewText=u"It hums, crackles, and I think I'm having problems with my equipment.  As soon as I use any of my other cords then the problem is gone.  Hosa makes some other products that have good value.  But based on my experience I don't recommend this one.", overall=1.0, label=0, bow=[u'crackl', u'problem', u'equip', u'other', u'cord', u'problem', u'hosa', u'other', u'product', u'good', u'valu', u'experi'], vector_tf=SparseVector(1124, {2: 1.0, 6: 2.0, 9: 1.0, 14: 2.0, 64: 1.0, 242: 1.0, 277: 1.0, 351: 1.0, 887: 1.0}), features=SparseVector(1124, {2: 1.414, 6: 3.4725, 9: 1.8881, 14: 4.4295, 64: 3.2909, 242: 3.6964, 277: 3.6964, 351: 3.9195, 887: 4.9003}))

### Applying naive bayes algorithm for sentiment analysis

In [45]:
from pyspark.ml.classification import NaiveBayes

splitting into training and test datasets

In [46]:
train_df, test_df = df.randomSplit([0.7, 0.3])

In [47]:
train_df.count()

280

In [48]:
test_df.count()

122

Implementing NaiveBayes and specifying the columns for features (featureCol), labels (labelCol) and prediction (predictionCol). Then applying .fit() method on training set to obtain and use a model on the testing test.

In [49]:
nb = NaiveBayes(featuresCol="features", labelCol="label", predictionCol="prediction")

In [50]:
# training the model
model = nb.fit(train_df)

In [51]:
# applying the model on the test set
prediction = model.transform(test_df)

In [52]:
prediction.select('reviewText', 'prediction', 'probability').show()

+--------------------+----------+--------------------+
|          reviewText|prediction|         probability|
+--------------------+----------+--------------------+
|...for something,...|       1.0|[4.82112594208346...|
|A great little pa...|       0.0|[0.99994898159018...|
|At the time I bou...|       0.0|[1.0,3.4785709140...|
|Be careful; I hav...|       1.0|[0.25986707347735...|
|Bought it after r...|       0.0|[0.99999999995016...|
|Bought this a whi...|       1.0|[0.01638607509435...|
|Can't believe the...|       0.0|[0.96362221099522...|
|Could not get to ...|       1.0|[0.00349439710197...|
|DOWNGRADE TO 1-O ...|       0.0|[1.0,2.2325069633...|
|Disappointing; wa...|       1.0|[0.25947854189704...|
|Doesn't reduce so...|       1.0|[2.03052760753931...|
|Go build your own...|       0.0|[0.99999999928353...|
|I got one of thes...|       0.0|[1.0,6.6954417461...|
|I had some issues...|       0.0|[0.99825573233647...|
|I have always lov...|       0.0|[0.99999999999995...|
|I have ha

Using the MulticlassClassificationEvaluator to obtain an evaluation of the accuracy of the classification.

Applying the instance on the prediction and label columns, by using .evaluate(). It will compute accuracy (or any other given metric) based on the differences observed between these two columns.


In [53]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [54]:
mce = MulticlassClassificationEvaluator()

In [55]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(prediction)

0.6967824440471745

#### Interpretation of the NaiveBayes results

The NaiveBayes model provides an internal matrix model.theta that can be converted into a numpy array with model.theta.toArray(). This matrix contains two columns corresponding to the two classes: 0 for neg and 1 for pos.

The values inside that matrix correspond, for each class, to the prior probabilities used to compute the likelihood of a document to belong to the class. In this implementation, the model.theta matrix doesn't provide probabilities, but log of probabilities.

In [56]:
model.theta.toArray().T

array([[-4.93572547, -4.67371078],
       [-5.23820439, -4.49589982],
       [-5.57048511, -5.02677174],
       ..., 
       [-9.43459814, -6.95831888],
       [-7.65959083, -7.56291775],
       [-9.43459814, -9.33792506]])

Obtaining words that are related to the pos class, and words that are related to the neg class. Ranking these words based on decreasing prior probabilities

In [57]:
import numpy as np

thetaarray = model.theta.toArray().T

vocab_size = len(voc)

dtype = [('label', 'S10'), ('neg', float), ('pos', float)]
prob_values = [ (voc[i],
                 np.exp(thetaarray[i,0])*(1-np.exp(thetaarray[i,1])),
                 (1-np.exp(thetaarray[i,0]))*np.exp(thetaarray[i,1]))
               for i in range(vocab_size) ]

a = np.array(prob_values, dtype=dtype)

In [58]:
prob_values[0]

(u'guitar', 0.0071181537240466494, 0.0092704629488804986)

In [59]:
np.sort(a, order='pos')[::-1][0:20]

array([('string',   5.25055388e-03,  0.01109541),
       ('guitar',   7.11815372e-03,  0.00927046),
       ('great',   2.14154191e-03,  0.00783009),
       ('pedal',   5.97409159e-03,  0.00744584),
       ('capo',   1.61514003e-03,  0.00717606),
       ('sound',   4.45302925e-03,  0.00687892),
       ('good',   3.78364791e-03,  0.00653497),
       ('microphon',   1.01312105e-03,  0.00629162),
       ('best',   1.12448134e-03,  0.0061038 ),
       ('voic',   7.94716234e-05,  0.00549679),
       ('strap',   6.52520934e-03,  0.00528761),
       ('easi',   4.68080934e-04,  0.00503271),
       ('year',   2.46737561e-03,  0.00495576),
       ('other',   4.91219607e-03,  0.00480189),
       ('tuner',   1.84837020e-03,  0.00472864),
       ('pick',   3.22103449e-03,  0.00454266),
       ('condens',   7.95487633e-05,  0.00453154),
       ('set',   2.51230195e-03,  0.0044814 ),
       ('much',   1.58538169e-03,  0.00412629),
       ('i',   3.04198753e-03,  0.00410816)], 
      dtype=[('label', '

In [60]:
np.sort(a, order='neg')[::-1][0:20]

array([('guitar',  0.00711815,  0.00927046),
       ('amp',  0.00680733,  0.00409939),
       ('strap',  0.00652521,  0.00528761),
       ('pedal',  0.00597409,  0.00744584),
       ('product',  0.00564209,  0.00355777),
       ('string',  0.00525055,  0.01109541),
       ('problem',  0.00520585,  0.00125114),
       ('other',  0.0049122 ,  0.00480189),
       ('way',  0.00482289,  0.00108855),
       ('time',  0.00458384,  0.00344471),
       ('thing',  0.00457128,  0.0024647 ),
       ('few',  0.00448679,  0.00307074),
       ('batteri',  0.00445613,  0.00150124),
       ('sound',  0.00445303,  0.00687892),
       ('cabl',  0.00436015,  0.00274829),
       ('cord',  0.00427724,  0.00239507),
       ('mic',  0.00422665,  0.00345088),
       ('stand',  0.00417273,  0.00269395),
       ('cheap',  0.0039904 ,  0.00213509),
       ('review',  0.00398323,  0.00051614)], 
      dtype=[('label', 'S10'), ('neg', '<f8'), ('pos', '<f8')])

Some words that clearly carry out a positive/negative feeling are observed. But they are mixed with other words that are only related to the products. It's because this analysis was performed on a dataset based on Instruments only. Thus, the positive/negative concept it biased by the terms related to the products people generally evaluate positively (or negatively).