In [40]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, udf, col
from pyspark.sql.types import ArrayType, StringType
import string
import unicodedata

import nltk
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.stem.snowball import SnowballStemmer
from nltk.util import ngrams
from nltk import pos_tag
from nltk import RegexpParser

from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import sys

In [13]:
#initialize spark instance
spark = SparkSession.builder.appName('Lecture').getOrCreate()

In [15]:
#load review data file from AWS S3
df = spark.read.json('reviews_Musical_Instruments_5.json')

In [16]:
df.printSchema()


root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [17]:
#Data cleanup
#keep only overall rating and review text columns:
review = df.select('overall','reviewText')
review.printSchema()
cnt = review.groupby('overall').count()
cnt.show()

root
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)

+-------+-----+
|overall|count|
+-------+-----+
|    1.0|  217|
|    4.0| 2084|
|    3.0|  772|
|    2.0|  250|
|    5.0| 6938|
+-------+-----+



In [68]:
cnt_dict = dict(cnt.collect())
balanced_size = min(cnt_dict[1.0],cnt_dict[5.0])
print(balanced_size)

review_1 = review.filter(review['overall']=='1.0').orderBy(rand()).limit(balanced_size)
review_5 = review.filter(review['overall']=='5.0').orderBy(rand()).limit(balanced_size)

217


In [69]:
review_1.show(5)
review_5.show(5)
review_comb = review_1.union(review_5)
review_w_label = review_comb.withColumn('label',(review_comb['overall']-1)/4)
review_w_label.show(5)

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    1.0|I am a long-time ...|
|    1.0|I bought one of e...|
|    1.0|At the time I bou...|
|    1.0|I might have done...|
|    1.0|It was Not what I...|
+-------+--------------------+
only showing top 5 rows

+-------+--------------------+
|overall|          reviewText|
+-------+--------------------+
|    5.0|It's as good as i...|
|    5.0|To everyone who p...|
|    5.0|A great add on to...|
|    5.0|Nice microphone s...|
|    5.0|I typically use p...|
+-------+--------------------+
only showing top 5 rows

+-------+--------------------+-----+
|overall|          reviewText|label|
+-------+--------------------+-----+
|    1.0|I am a long-time ...|  0.0|
|    1.0|I bought one of e...|  0.0|
|    1.0|At the time I bou...|  0.0|
|    1.0|I might have done...|  0.0|
|    1.0|It was Not what I...|  0.0|
+-------+--------------------+-----+
only showing top 5 rows



In [70]:
def extract_bow_from_raw_text(text_as_string):
    """Extracts bag-of-words from a raw text string.
    Parameters
    ----------
    text (str): a text document given as a string
    Returns
    -------
    list : the list of the tokens extracted and filtered from the text
    """
    if (text_as_string == None):
        return []

    if (len(text_as_string) < 1):
        return []

    if sys.version_info[0] < 3:
        nfkd_form = unicodedata.normalize('NFKD', unicode(text_as_string))
    else:
        nfkd_form = unicodedata.normalize('NFKD', str(text_as_string))

    text_input = str(nfkd_form.encode('ASCII', 'ignore'))

    sent_tokens = sent_tokenize(text_input)

    tokens = list(map(word_tokenize, sent_tokens))

    sent_tags = list(map(pos_tag, tokens))

    grammar = r"""
        SENT: {<(J|N).*>}                # chunk sequences of proper nouns
    """

    cp = RegexpParser(grammar)
    ret_tokens = list()
    stemmer_snowball = SnowballStemmer('english')

    for sent in sent_tags:
        tree = cp.parse(sent)
        for subtree in tree.subtrees():
            if subtree.label() == 'SENT':
                t_tokenlist = [tpos[0].lower() for tpos in subtree.leaves()]
                t_tokens_stemsnowball = list(map(stemmer_snowball.stem, t_tokenlist))
                #t_token = "-".join(t_tokens_stemsnowball)
                #ret_tokens.append(t_token)
                ret_tokens.extend(t_tokens_stemsnowball)
            #if subtree.label() == 'V2V': print(subtree)
    #tokens_lower = [map(string.lower, sent) for sent in tokens]

    return(ret_tokens)


def indexing_pipeline(input_df, **kwargs):
    """Runs a full text indexing pipeline on a collection of texts contained in a DataFrame.
    Parameters
    ----------
    input_df (DataFrame): a DataFrame that contains a field called 'text'
    Returns
    -------
    df : the same DataFrames with a column called 'features' for each document
    wordlist : the list of words in the vocabulary with their corresponding IDF
    """
    inputCol_ = kwargs.get("inputCol", "text")
    vocabSize_ = kwargs.get("vocabSize", 5000)
    minDF_ = kwargs.get("minDF", 2.0)

    # ugly: to add that to our slave nodes so that it finds the bootstrapped nltk_data
    nltk.data.path.append('/home/hadoop/nltk_data')

    extract_bow_from_raw_text("")  # ugly: for instanciating all dependencies of this function
    tokenizer_udf = udf(extract_bow_from_raw_text, ArrayType(StringType()))
    df_tokens = input_df.withColumn("bow", tokenizer_udf(col(inputCol_)))

    cv = CountVectorizer(inputCol="bow", outputCol="vector_tf", vocabSize=vocabSize_, minDF=minDF_)
    cv_model = cv.fit(df_tokens)
    df_features_tf = cv_model.transform(df_tokens)

    idf = IDF(inputCol="vector_tf", outputCol="features")
    idfModel = idf.fit(df_features_tf)
    df_features = idfModel.transform(df_features_tf)

    return(df_features, cv_model.vocabulary)

In [71]:
review_out, vocab =indexing_pipeline(review_w_label, inputCol = 'reviewText')
review_out.printSchema()
print('vocabulary: {}'.format(vocab[:10]))


root
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- label: double (nullable = true)
 |-- bow: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector_tf: vector (nullable = true)
 |-- features: vector (nullable = true)

vocabulary: ['guitar', 'b', 'string', 'pedal', 'good', 'sound', 'great', 'other', 'amp', 'time']


In [72]:
#fit Naive Bayes model
train, test = review_out.randomSplit([0.7,0.3])
train.persist()
print (train.count(), test.count())

NB=NaiveBayes(featuresCol = 'features', labelCol = 'label' )
m= NB.fit(train)
results = m.transform(test)

results.printSchema()

278 156
root
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- label: double (nullable = true)
 |-- bow: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector_tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [73]:
evaluator = MulticlassClassificationEvaluator()
evaluator.evaluate(results.select('prediction','label'),{evaluator.metricName: 'accuracy'})



0.7628205128205128

In [74]:
theta = m.theta.toArray().T
exp_theta = np.exp(theta)

#calculate likelihood:
prob_neg = exp_theta[:,0] *(1-exp_theta[:,1])
prob_pos = exp_theta[:,1] *(1-exp_theta[:,0])

In [84]:
#top 20 words related to positive/negative reviews:
pos_reviews = np.array(vocab)[np.argsort(prob_pos)[-20:][::-1]]
neg_reviews = np.array(vocab)[np.argsort(prob_neg)[-20:][::-1]]

In [85]:
pos_reviews


array(['pedal', 'great', 'string', 'guitar', 'sound', 'price', 'tuner',
       'i\\', 'acoust', 'good', 'pick', 'qualiti', 'amp', 'nice', 'fx',
       'mic', 'time', 'other', 'best', 'elixir'],
      dtype='<U12')

In [86]:
neg_reviews

array(['guitar', 'amp', 'other', 'strap', 'problem', 'plug', 'pedal',
       'mic', 'string', 'cheap', 'product', 'i', 'cabl', 'sound', 'issu',
       'way', 'time', 'stand', 'pickup', 'thing'],
      dtype='<U12')

Looking at both positive and negative review lists of words, we can observe some clear words related to both feelings such as "great","nice","best" for positive reviews and "problem", "cheap","issu(e)" for negative reviews. 

Note that in order to keep a balanced sample with 1 and 5 star reviews, we used the lower number of 217 reviews for the 1 star, which significantly shrank our sample. We could potentially include 2 and 3 star reviews in order to increase the overall sample size, which might show slightly different results.