In [None]:
# Mohit Bhasin
# HW4 Q4
# Beer Reviews
# These data contain 2,924,163 reviews by 40,213 unique users on 110,419 unique types of beer.

# J. McAuley and J. Leskovec. Hidden factors and hidden topics: understanding rating dimensions with review text. RecSys, 2013.

In [31]:
from pyspark.mllib.feature import HashingTF, IDF, Normalizer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
import re

In [33]:
import json

# Load every review (one JSON object per line) and parse each into a dict.
# `sc` is the SparkContext provided by the notebook environment (assumed -- it is not created here).
all_reviews = sc.textFile("s3n://stat-37601/ratings.json", minPartitions=1000).map(json.loads)

# Work with a 1% sample for speed; the remaining 99% is kept as `reviews_test`.
# A fixed seed makes the split -- and every number downstream -- reproducible.
reviews, reviews_test = all_reviews.randomSplit([.01, .99], seed=42)

# Persist the sample: later cells (labels, word counts, features) all reuse it, and an
# uncached RDD would re-read and re-parse the S3 file for every action.
reviews.cache()
reviews.count()

29376

In [25]:
# Peek at one parsed review to see the available fields. In the sample output every
# value is a string, including ratings such as u'11/20' and the ABV.
reviews.take(1)[0]

{u'beer_ABV': u'6.4',
 u'beer_beerId': u'19539',
 u'beer_brewerId': u'2495',
 u'beer_name': u'Erebuni',
 u'beer_style': u'Strong Pale Lager/Imperial Pils',
 u'review_appearance': u'4/5',
 u'review_aroma': u'5/10',
 u'review_overall': u'11/20',
 u'review_palate': u'3/5',
 u'review_profileName': u'JPDIPSO',
 u'review_taste': u'5/10',
 u'review_text': u'Strong grassy hops abound and are detected from a great distance. Hints of dry pale and caramel malt. As the hops fade, there seems to very little left. Golden liquid with a moderate off-white head and nice lace.  Solid, yet, somewhat grain malt in the front that give way to a herbal grassy bitterness in the middle. Finishes slightly crisp, but there is a touch of metal/mineral that distracts in the linger. Certainly not one of the worst euro strongs out there.',
 u'review_time': u'1188950400'}

In [34]:
def getLabel(review):
    """
    Return a review's overall rating as a fraction of the maximum score.

    ``review["review_overall"]`` is a "score/max" string such as ``u'11/20'``;
    that example yields ``11 / 20 = 0.55``.
    """
    numerator, denominator = (float(part) for part in review["review_overall"].split("/"))
    return numerator / denominator
# One normalized rating per review, in the same order as `reviews` (zipped with the features later).
labels = reviews.map(getLabel)

##### Map each review's `review_text` to a vocabulary of words with word counts

In [35]:
# Simple tokenizer: lowercase the text, replace punctuation with spaces and split on
# whitespace; then count the occurrences of each word.
def removePunctuation(text):
    """
    Replace every run of characters that is not a lowercase letter (a-z), a space,
    or an apostrophe with a single space.

    Callers lowercase the text first. Uppercase letters, digits and non-ASCII
    characters (including accented letters) are all treated as punctuation.

    Parameters
    ----------
    text : str
        Text to clean.

    Returns
    -------
    str
        The cleaned text.
    """
    # Raw string avoids the invalid '\ ' escape in the old literal (DeprecationWarning /
    # SyntaxWarning on Python 3); the character class itself is unchanged.
    return re.sub(r"[^a-z ']+", " ", text)

# (word, total occurrences across all sampled reviews)
word_counts = (
    reviews
    .map(lambda review: review["review_text"].lower())
    .map(removePunctuation)
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)
# No trailing comma: on Python 3, `print(...),` builds a `(None,)` tuple that the notebook displays.
print(word_counts.take(10))

[(u'jblauvs', 1), (u'flavouor', 1), (u'bourdon', 1), (u'opera', 1), (u'nun', 5), (u'inintressante', 1), (u'montreals', 1), (u'wasent', 1), (u'istllet', 2), (u'stopping', 4)]


##### See what words show up often so that we can filter to the words we want

In [36]:
# Flip (word, count) -> (count, word) and sort by count, most frequent first.
# Index access instead of `lambda (a, b): ...`, which is a syntax error on Python 3 (PEP 3113).
count_words = word_counts.map(lambda word_count: (word_count[1], word_count[0])).sortByKey(False)
# One element per distinct word, so this is the vocabulary size before filtering.
wordcount = count_words.count()

In [37]:
# Number of distinct words in the sampled reviews.
wordcount

32589

In [38]:
# Score = a word's total count divided by the number of *distinct* words (`wordcount`).
# NOTE: this is a relative-frequency score, not a percentage or document frequency;
# values above 1 are normal (see "a", "and", "the" in the output).
# Scores are rounded to 2 decimals, and the next cell thresholds the rounded value.
percent_words = count_words.map(
    lambda count_word: (round(float(count_word[0]) / wordcount, 2), count_word[1])
)
print(percent_words.take(10))

[(2.09, u'a'), (1.71, u'and'), (1.48, u'the'), (1.27, u'with'), (1.12, u'of'), (0.92, u'is'), (0.63, u'head'), (0.54, u'aroma'), (0.51, u'to'), (0.45, u'in')]


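The score above is each word's total count divided by the number of distinct words (32,589). It is a relative frequency, not a true percentage, which is why some scores exceed 1. The highest-scoring words (score ≥ 0.85: "a", "and", "the", "with", "of", "is") are stop words, so we filter them out along with the rarest words.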

In [39]:
# Keep mid-frequency words. Drop the most frequent ones (score >= .85; in the sample output
# these are stop words such as "a", "and", "the") and the rare tail. Because scores were
# rounded to 2 decimals, `score > .001` is effectively `score >= .01`.
percent_words_small = percent_words.filter(lambda score_word: .001 < score_word[0] < .85)
print(percent_words_small.take(5))

[(0.63, u'head'), (0.54, u'aroma'), (0.51, u'to'), (0.45, u'in'), (0.45, u'this')]


In [49]:
# Vocabulary: the words that survived the frequency filter, collected to the driver.
# `.values()` keeps the second element of each (score, word) pair.
wordlist = percent_words_small.values().collect()
n_features = len(wordlist)
print(n_features)

845


##### We now have a vocabulary of 845 words on which to compute TF-IDF

In [59]:
def reviewParser(review_text, vocabulary=None):
    """
    Tokenize a review and keep only the words in the vocabulary.

    Tokenization matches the word-count step: lowercase, replace every run of
    characters other than a-z, space or apostrophe with a space, then split
    on whitespace.

    Parameters
    ----------
    review_text : str
        Raw review text.
    vocabulary : iterable of str, optional
        Words to keep. Defaults to the notebook-level ``wordlist``.

    Returns
    -------
    list of str
        In-vocabulary tokens in their original order. Duplicates are kept so that
        term frequencies survive into HashingTF.
    """
    if vocabulary is None:
        vocabulary = wordlist
    # Set membership is O(1) per token; testing against a list is O(len(vocabulary)).
    allowed = vocabulary if isinstance(vocabulary, (set, frozenset)) else set(vocabulary)
    words = re.sub(r"[^a-z ']+", " ", review_text.lower()).split()
    return [word for word in words if word in allowed]

# Per review: the list of in-vocabulary tokens (input to HashingTF below).
filtered_reviews = reviews.map(lambda x: x['review_text']).map(reviewParser)

##### The term-frequency step below creates a sparse vector of word counts for each review. `HashingTF` does this efficiently by hashing each word to an index instead of looking it up in a dictionary. The vector length equals `n_features` (845, the size of our vocabulary). Because distinct words can hash to the same index, however, a review may have fewer non-zero entries than distinct vocabulary words; this is a side effect of hashing.

In [60]:
# Hash each token list into a term-frequency SparseVector of length n_features.
# NOTE(review): with only one bucket per vocabulary word, distinct words can hash to the
# same index, so their counts get merged. A larger numFeatures would reduce collisions.
hashingTF = HashingTF(n_features)
tf = hashingTF.transform(filtered_reviews)
tf.take(1)

[SparseVector(845, {15: 1.0, 88: 1.0, 116: 1.0, 239: 1.0, 246: 1.0, 338: 1.0, 364: 1.0, 379: 1.0, 381: 1.0, 427: 1.0, 430: 1.0, 449: 1.0, 450: 1.0, 475: 1.0, 547: 1.0, 573: 1.0, 578: 1.0, 675: 1.0, 676: 1.0, 712: 1.0, 723: 1.0, 754: 1.0, 762: 1.0, 820: 1.0, 828: 1.0}),
 SparseVector(845, {8: 1.0, 65: 1.0, 86: 1.0, 230: 4.0, 244: 1.0, 269: 2.0, 277: 2.0, 305: 1.0, 307: 2.0, 396: 1.0, 422: 1.0, 430: 1.0, 444: 1.0, 470: 3.0, 475: 2.0, 489: 1.0, 554: 3.0, 573: 1.0, 576: 1.0, 578: 2.0, 602: 1.0, 642: 3.0, 676: 2.0, 685: 1.0, 694: 2.0, 789: 2.0, 835: 2.0})]

In [61]:
# tf feeds both the IDF fit and the transform below, so persist it instead of recomputing.
tf.cache()
# minDocFreq=2: per the MLlib docs, terms seen in fewer than 2 training documents are
# filtered out (IDF 0) -- TODO confirm for this Spark version.
# Reuse this fitted `idf` (and `hashingTF`) for any held-out data; never fit a new one.
idf = IDF(minDocFreq=2).fit(tf)
features = idf.transform(tf)
features.take(1)

[SparseVector(845, {15: 1.7704, 88: 1.8539, 116: 2.3001, 239: 1.3149, 246: 4.0341, 338: 1.6621, 364: 2.2524, 379: 1.2307, 381: 2.2059, 427: 0.9955, 430: 1.5026, 449: 2.5131, 450: 2.1482, 475: 0.3765, 547: 1.1584, 573: 1.1663, 578: 0.5884, 675: 3.8994, 676: 1.3091, 712: 2.8632, 723: 1.0717, 754: 2.6435, 762: 2.3449, 820: 1.4662, 828: 3.4045}),
 SparseVector(845, {8: 1.8187, 65: 4.1225, 86: 2.7108, 230: 6.1245, 244: 1.1272, 269: 2.6166, 277: 2.7833, 305: 1.0882, 307: 6.4601, 396: 2.3158, 422: 2.6092, 430: 1.5026, 444: 2.3322, 470: 3.4889, 475: 0.7529, 489: 2.9108, 554: 4.0597, 573: 1.1663, 576: 4.4415, 578: 1.1769, 602: 1.6842, 642: 4.8026, 676: 2.6181, 685: 2.9241, 694: 2.8388, 789: 1.9759, 835: 5.0279})]

In [62]:
# Pair each TF-IDF vector (x) with its normalized rating (y). zip() relies on `features` and
# `labels` both being element-wise maps of the same `reviews` RDD (same partitions and order).
# Index access instead of `lambda (feature, label): ...`, which is a syntax error on Python 3.
data = features.zip(labels).map(lambda feature_label: LabeledPoint(feature_label[1], feature_label[0]))

In [None]:
def meanSquaredError(lAndP):
    """
    Mean squared error of an RDD of (label, prediction) pairs.

    Parameters
    ----------
    lAndP : RDD of (float, float)
        ``(actual, predicted)`` pairs, e.g. ``labels.zip(predictions)``.

    Returns
    -------
    float
        Average of ``(actual - predicted) ** 2`` over all pairs.
    """
    # RDD.mean() averages in one pass instead of a reduce plus a separate count() job.
    # Index access instead of `lambda (v, p): ...`, which is a syntax error on Python 3.
    return lAndP.map(lambda pair: (pair[0] - pair[1]) ** 2).mean()

## Decision Trees
Build a regression tree on the (x, y) data constructed above (x = TF-IDF vector, y = normalized rating). Use it to make predictions and inspect a few of them.
Finally, compute the mean squared error of the predictions.

In [63]:
# Regression tree on the TF-IDF features. maxDepth=2 allows at most 4 leaves, and maxBins=2
# discretizes each continuous feature into only 2 candidate bins when choosing splits.
model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=2, maxBins=2)
predictions = model.predict(data.map(lambda x: x.features)) # Predict on in-sample
# NOTE(review): these are training-set predictions, so the MSE below is in-sample;
# `reviews_test` is never featurized or scored.
labelsAndPredictions = data.map(lambda lp: lp.label).zip(predictions) 

In [65]:
# First five (actual, predicted) pairs from the training data.
labelsAndPredictions.take(5)

[(0.65, 0.6861130374479889),
 (0.75, 0.6164411725693068),
 (0.4, 0.6164411725693068),
 (0.7, 0.7398477157360409),
 (0.45, 0.6164411725693068)]

### Calculate the mean squared error of our predictions

In [66]:
# In-sample MSE of the tree; labels are ratings divided by their maximum score (see getLabel).
meanSquaredError(labelsAndPredictions)


0.02553340558226822

## Random Forests

In [67]:
# Random forest of 2 regression trees, with the same depth/bin limits as the single tree.
model_rf = RandomForest.trainRegressor(
    data,
    categoricalFeaturesInfo={},
    numTrees=2,
    impurity='variance',
    maxDepth=2,
    maxBins=2,
    seed=42,
)
# Again in-sample: the forest is scored on the same points it was trained on.
predictions_rf = model_rf.predict(data.map(lambda point: point.features))
labelsAndPredictions_rf = data.map(lambda point: point.label).zip(predictions_rf)

In [68]:
# First five (actual, predicted) pairs from the forest, on the training data.
labelsAndPredictions_rf.take(5)

[(0.65, 0.6281735032893854),
 (0.75, 0.6281735032893854),
 (0.4, 0.6281735032893854),
 (0.7, 0.7257670974132193),
 (0.45, 0.6723545769158192)]

In [70]:
# In-sample MSE of the random forest (compare with the single tree above).
meanSquaredError(labelsAndPredictions_rf)

0.025505651267428674

Questions
Q:Why do we need a normalizer?
Q: Do we need to specify the sparsevector length?
That specifies my feature vector length (generally smaller than my vocab because of hashing).
Q:minDocFreq?
Q: Ensemble?

5/21
Q: what does cache do?
Q: what does max Bins do?
Q: What is the CV shortcut that was mentioned?

Consider keeping 1K+ words
Never create a new idf, (i.e same tf, same idf)
x is 1663 length vector of tfidf values per review
y is review rating

In [29]:
# Appendix

def squaredError(preds):
    """
    Squared error for a single (label, prediction) pair.

    Parameters
    ----------
    preds : sequence of two numbers
        ``(actual, predicted)``.

    Returns
    -------
    number
        ``(predicted - actual) ** 2``.
    """
    # `**` is exponentiation; `^` is bitwise XOR in Python and raises TypeError on floats.
    return (preds[1] - preds[0]) ** 2



def parser(review_text):
    """
    Generate feature vector of words from Beer review 
    """
    words = review_text.lower().split()
    .map(lambda word: (word, 1)) 
             .reduceByKey(lambda a, b: a + b)
    return (words)

1072312