In [1]:
# Imports
import json, re, numpy as np, numpy.linalg as nplin, matplotlib.pyplot as plt, matplotlib.mlab as mlab, scipy.stats as spstat
from __future__ import division
from pyspark.mllib.feature import HashingTF, IDF, Normalizer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, RandomForest, GradientBoostedTrees
from operator import itemgetter
from time import time
%matplotlib inline

In [2]:
# Load data, split into test and training set
all_reviews = sc.textFile("s3n://stat-37601/ratings.json", minPartitions=1000).map(json.loads)
reviews, reviews_test = all_reviews.randomSplit([.7, .3])
reviews.cache()

PythonRDD[2] at RDD at PythonRDD.scala:42

Get the variable we are regressing on: a continious score out of 1

In [3]:
# Get the variable we are regressing on, the review as a score out of 1
def getLabel(review):
    """Get the overall rating from a review"""
    label, total = review["review_overall"].split("/")
    return float(label) / float(total)

labels      = reviews.map(getLabel)
labels_test = reviews_test.map(getLabel)

#### (a) Generating features (hashed TF-IDF)
We generate a list of words (the features) for each review, transform them into hashes, calculate term frequency-inverse document frequency accross corpus, and normalize

In [4]:
# Parser, mostly from earlier problem using SGD on tweets, except without code for emoticions

# words to ignore
stop = set(['the', 'and', 'you', 'your', 'for', 'por', 'que', 'las', 'los', 'les',\
       'una', 'del', 'este', 'usted', 'para', 'con', 'this', 'that', 'was', 'have', 'like',\
       'would', 'could', 'should', 'will', 'can', 'shall', 'just', 'all', 'it', 'its', 'per'])
eng_stop = set(['i', 'me', 'my', 'myself', 'we', 'our', \
             'ours', 'ourselves', 'you', 'your', 'yours', \
             'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', 'her', 'hers', 'herself', \
             'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', \
             'themselves', 'what', 'which', 'who', 'whom', 'this', \
             'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were', \
             'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do',\
             'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', \
             'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at',\
             'by', 'for', 'with', 'about', 'against', 'between', 'into', \
             'through', 'during', 'before', 'after', \
            'above', 'below', 'to', 'from', 'up', 'down', 'in',\
            'out', 'on', 'off', 'over', 'under', 'again', 'further', \
            'then', 'once', 'here', 'there', 'when', 'where', 'why', \
            'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most',\
            'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', \
            'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', \
            'just', 'dont', 'should', 'now','on'])
spa_stop = set()
all_stop = stop|eng_stop|spa_stop

# word processor function
def splitter(s,ignore=all_stop):
    s = re.sub("([a-zA-Z])'([a-zA-Z])","\g<1>\g<2>",s) # standardize to no apostrophe
    s = re.sub('[^a-zA-Z!\?]',' ',s)           # get rid of most punctuation 
    s = re.sub('\?![\?!]*|!\?[\?!]*',' !? ',s) # standardize ?!?!?!
    s = re.sub('!+','!',s)                    # standardize to single !
    s = re.sub('\?+','?',s)                   # standarize to single ?
    s = re.sub('([a-zA-z]{2,})([?!]+)(\s|$)','\g<1> \g<2> ',s) # single out punctuation
    s = re.sub('(?!http://)www\.\S+|http://\S+','',s) # get rid of urls
    return list([w.lower() for w in s.split() if w not in ignore])

I only take 200 features for the hashing space. This means there are going to be a huge number of collisions, resulting in a large but unintelligent dimmensionality reduction. The computational time and space issues become prohibitive without a large reduction though. We still end up with some predictive power anyway though, so we aren't totally ruining the regression.

In [5]:
# Do the hashing transform.
revHTF = HashingTF(numFeatures=200)
reviewFrequency      = revHTF.transform(     reviews.map(lambda review: splitter(review["review_text"]))).cache()
review_testFrequency = revHTF.transform(reviews_test.map(lambda review: splitter(review["review_text"]))).cache()

# Do the inverse document frequency transform
revIDF = IDF().fit(reviewFrequency)
nor = Normalizer(p=2)
features      = nor.transform(revIDF.transform(reviewFrequency)).cache()
features_test = nor.transform(revIDF.transform(review_testFrequency)).cache()

# Un-cache unneeded data sets
reviewFrequency.unpersist()
review_testFrequency.unpersist()

PythonRDD[4] at RDD at PythonRDD.scala:42

In [6]:
# Join the labels back with the features
data = features.zip(labels).map(lambda (feature, label): LabeledPoint(label, feature)).cache()

#### (b) The regression model and evaluation
Write a function to compute MSE for training and test datasets

In [14]:
def treeMSE(tree,train_feat=features,train_label=labels,test_feat=features_test,test_label=labels_test,log=False):
    '''
    Evaluates training and test error for a pyspark mllib tree model
    '''
    if log:
        train_MSE = train_label.zip(tree.predict(train_feat)).map(lambda (l,p):(l-np.log(p))**2).sum() / train_label.count()
        test_MSE  = test_label.zip(tree.predict(test_feat)).map(lambda (l,p):(l-np.log(p))**2).sum() / test_label.count()
    else:
        train_MSE = train_label.zip(tree.predict(train_feat)).map(lambda (l,p):(l-p)**2).sum() / train_label.count()
        test_MSE  = test_label.zip(tree.predict(test_feat)).map(lambda (l,p):(l-p)**2).sum() / test_label.count()
    return(train_MSE,test_MSE)

####(c) train and test some trees
First, random forrests

In [9]:
model=RandomForest.trainRegressor(data=data,categoricalFeaturesInfo={},numTrees=2,impurity='variance',maxDepth=4,maxBins=25)
trerr,teerr = treeMSE(model)
print 'With %d trees, got %.4f training error MSE and %.4f testing error' % (2,trerr,teerr)

With 2 trees, got 0.0252 training error MSE and 0.0252 testing error


In [42]:
# Try with max depth 4 and max bins 25
# Suprisingly, I don't see a way to add additional trees to a random forrest once its initially fit, so I'm
# averaging accross the forrests on my own to save computation
marg_trees = [10,15,25,50]
model_set1 = []
ct=0
for i in range(len(marg_trees)):
    model_set1.append(RandomForest.trainRegressor(data=data,categoricalFeaturesInfo={},numTrees=marg_trees[i],impurity='variance',maxDepth=4,maxBins=25))
    if i==0:
        train_fit = labels.zip(model_set1[i].predict(features))
        test_fit  = labels_test.zip(model_set1[i].predict(features_test))
    else:
        cw        =            ct/(ct+marg_trees[i])
        mw        = marg_trees[i]/(ct+marg_trees[i])
        train_fit = train_fit.zip(model_set1[i].predict(features)     ).map(lambda ((l,cp),mp): (l,cw*cp+mw*mp) )
        test_fit  =  test_fit.zip(model_set1[i].predict(features_test)).map(lambda ((l,cp),mp): (l,cw*cp+mw*mp) )
    ct += marg_trees[i]
    # Training then test error
    trerr = train_fit.map(lambda (l,p):(l-p)**2).sum()/train_fit.count()
    teerr = test_fit.map(lambda (l,p):(l-p)**2).sum()/test_fit.count()
    print 'With %d trees, got %.4f training error MSE and %.4f testing error' % (ct,trerr,teerr)
    

With 10 trees, got 0.0247 training error MSE and 0.0247 testing error
With 25 trees, got 0.0246 training error MSE and 0.0246 testing error
With 50 trees, got 0.0246 training error MSE and 0.0246 testing error
With 100 trees, got 0.0246 training error MSE and 0.0246 testing error


It doesn't seem to change much even after only 10 trees, probably because it's relatively shallow and on highly reduced data. Lets see if ramping up the complexity improves things. Also, the testing and training error only appear to be the same due to the rounding, they are actually different.

In [43]:
# Max depth 6, max bins 50, 25 trees
model=RandomForest.trainRegressor(data=data,categoricalFeaturesInfo={},numTrees=25,impurity='variance',maxDepth=6,maxBins=50)
trerr,teerr = treeMSE(model)
print '%.4f training error MSE and %.4f testing error' % (trerr,teerr)

0.0239 training error MSE and 0.0239 testing error


Now some gradient boosted trees. Start by looking out how error changes over number of iterations.

In [9]:
# Least squares loss, max depth of 1
model_set2 = []
for itnum in [10,50,100]:
    model_set2.append(GradientBoostedTrees.trainRegressor(data=data,categoricalFeaturesInfo={},numIterations=itnum,loss='leastSquaresError',maxDepth=1))
    trerr,teerr = treeMSE(model_set2[-1])
    print 'With %d iterations, got %.4f training error MSE and %.4f testing error' % (itnum,trerr,teerr)

With 10 iterations, got 0.0257 training error MSE and 0.0258 testing error
With 50 iterations, got 0.0237 training error MSE and 0.0237 testing error
With 100 iterations, got 0.0228 training error MSE and 0.0229 testing error


In [8]:
# max depth of 2, 25 iterations, still least square loss
model=GradientBoostedTrees.trainRegressor(data=data,categoricalFeaturesInfo={},numIterations=25,loss='leastSquaresError',maxDepth=2)
trerr,teerr = treeMSE(model)
print '%.4f training error MSE and %.4f testing error' % (trerr,teerr)

0.0235 training error MSE and 0.0236 testing error


In [10]:
# max depth of 1, 50 iterations, exponential loss
model=GradientBoostedTrees.trainRegressor(data=data,categoricalFeaturesInfo={},numIterations=50,loss='logLoss',maxDepth=1)
trerr,teerr = treeMSE(model)
print '%.4f training error MSE and %.4f testing error' % (trerr,teerr)

2.1660 training error MSE and 2.1668 testing error


As it turns out, if you one uses log loss, one has to take the log of the prediction. Go figure. Here is the actual training error:

In [15]:
trerr,teerr = treeMSE(model, log=True)
print '%.4f training error MSE and %.4f testing error' % (trerr,teerr)

0.0368 training error MSE and 0.0370 testing error
