# Machine Learning and Featurization with Spark

## Imported Libraries

In [1]:
import bz2
import json
import time
from pyspark.ml import Pipeline
from pyspark.ml.feature import * # CountVectorizer, Tokenizer, RegexTokenizer, HashingTF
from pyspark.ml.regression import * # RandomForestRegressor, LinearRegression, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

## Helper Functions

The timeit decorator measures how long a function takes to run. Timing each step on a small sample gives a rough sense of how it will scale to the full dataset. Because Spark evaluates transformations lazily, the measured time covers only the work that actually executes inside the decorated function.

In [2]:
def timeit(method):
    '''
    Decorator to time functions.
    '''
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()

        print('%r took %2.2f sec\n' % (method.__name__, te - ts))
        return result
    return timed
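
A variant of this decorator can be sketched under the assumption of Python 3. `functools.wraps` (which preserves the wrapped function's name and docstring) and `time.perf_counter` (a monotonic clock) are additions not present in the original, and `slow_square` is a hypothetical function used only for illustration.

```python
import functools
import time


def timeit(method):
    """Decorator that prints how long each call to `method` takes."""
    @functools.wraps(method)  # keep the wrapped function's name and docstring
    def timed(*args, **kwargs):
        start = time.perf_counter()
        result = method(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print('%r took %2.2f sec\n' % (method.__name__, elapsed))
        return result
    return timed


@timeit
def slow_square(x):
    """Hypothetical workload: sleep briefly, then square the input."""
    time.sleep(0.05)
    return x * x


value = slow_square(4)
```

The decorated function still returns its normal result, so it can be dropped onto any of the helper functions in this notebook without changing how they are called.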

In [3]:
# SQL example 

# wordsFilteredDF.createOrReplaceTempView("comments_table")
# filtered_words = sqlContext.sql("SELECT filtered_words FROM comments_table")
# filtered_words.show(5, False)

## Prepare Data

### Load data from file

Initially, only a portion of the 30 GB dataset is used for testing. The data can be loaded in two ways: by passing the filename directly to spark.read.json, or by opening the file with bz2 and reading a limited number of lines.

In [4]:
@timeit
def load_data(filename, test=True, mb=1):
    '''
    Returns a DataFrame containing either all the comments or a test sample of
    roughly mb megabytes of comments, with deleted comments removed.
    
    @params:
        filename - path to the bz2-compressed comment file
        test - boolean, if True return test DataFrame
        mb - the number of megabytes to load from the data set
    '''
    
    # load compressed file
    comments_file = bz2.BZ2File(filename, "r")
    
    # convert the megabytes to bytes
    size = mb * (1024 ** 2)
    
    # load a test dataset of size mb
    if test:
        # create RDD using string returned by reading the comments file
        # specify bytesize of file to read
        commentRDD = sc.parallelize(comments_file.readlines(size))
        # read RDD as json and convert to a DataFrame
        df = spark.read.json(commentRDD)
    # load full dataset
    else:
        df = spark.read.json(filename)
    
    # return a new DataFrame that doesn't contain deleted comments
    return df.filter("body != '[deleted]'")
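
The bounded read used for the test sample can be tried without Spark. The sketch below assumes Python 3 and builds a tiny stand-in file with hypothetical records; `read_sample` is an illustrative helper, not part of the notebook. It shows that `readlines` with a size hint returns whole lines, so every line remains valid JSON.

```python
import bz2
import json
import os
import tempfile

# Build a tiny stand-in for the compressed comment dump (hypothetical records).
records = [
    {"id": "a1", "body": "first comment", "ups": 3},
    {"id": "a2", "body": "[deleted]", "ups": 1},
    {"id": "a3", "body": "third comment", "ups": 7},
]
path = os.path.join(tempfile.mkdtemp(), "sample.bz2")
with bz2.open(path, "wt", encoding="utf-8") as handle:
    for record in records:
        handle.write(json.dumps(record) + "\n")


def read_sample(filename, mb=1):
    """Parse whole JSON lines until roughly `mb` megabytes have been read."""
    size_hint = int(mb * 1024 ** 2)
    with bz2.BZ2File(filename, "r") as comments_file:
        # readlines(hint) stops once the total size of the lines read so far
        # exceeds the hint, so no line is cut in half
        lines = comments_file.readlines(size_hint)
    return [json.loads(line) for line in lines]


comments = read_sample(path, mb=1)
# same filter as the DataFrame version: drop deleted comments
kept = [c for c in comments if c["body"] != "[deleted]"]
```

Here the whole file is far smaller than one megabyte, so all three records are read and the deleted one is filtered out afterwards.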

In [5]:
filename = 'RC_2015-01.bz2'

# load the comments into a DataFrame
commentDF = load_data(filename, mb=1)

# Display comments and information
print("Snippet of Comment DataFrame:")
commentDF.select('id', 'body', 'ups', 'downs', 'gilded', 'subreddit').show(5)
print("Column names of comment DataFrame:")
print(commentDF.columns)
print("\nThe total number of comments: %s (deleted comments removed)" % commentDF.count())

'load_data' took 15.37 sec

Snippet of Comment DataFrame:
+-------+--------------------+---+-----+------+--------------+
|     id|                body|ups|downs|gilded|     subreddit|
+-------+--------------------+---+-----+------+--------------+
|cnas8zv|Most of us have s...| 14|    0|     0|      exmormon|
|cnas8zw|But Mill's career...|  3|    0|     0|CanadaPolitics|
|cnas8zx|Mine uses a strai...|  1|    0|     0| AdviceAnimals|
|cnas8zz|Very fast, thank ...|  2|    0|     0|    freedonuts|
|cnas900|The guy is a prof...|  6|    0|     0|           WTF|
+-------+--------------------+---+-----+------+--------------+
only showing top 5 rows

Column names of comment DataFrame:
['archived', 'author', 'author_flair_css_class', 'author_flair_text', 'body', 'controversiality', 'created_utc', 'distinguished', 'downs', 'edited', 'gilded', 'id', 'link_id', 'name', 'parent_id', 'retrieved_on', 'score', 'score_hidden', 'subreddit', 'subreddit_id', 'ups']

The total number of comments: 1627 (deleted comments removed)

Now we will create a subset of the comment DataFrame containing only the id, upvotes (ups), and body. The regression task is to predict the number of upvotes from the text of the comment.

In [6]:
sentenceDF = commentDF.select('id','ups','body')
sentenceDF.show(n=5)

+-------+---+--------------------+
|     id|ups|                body|
+-------+---+--------------------+
|cnas8zv| 14|Most of us have s...|
|cnas8zw|  3|But Mill's career...|
|cnas8zx|  1|Mine uses a strai...|
|cnas8zz|  2|Very fast, thank ...|
|cnas900|  6|The guy is a prof...|
+-------+---+--------------------+
only showing top 5 rows



Tokenize the body of the comment:
* Use the tokenizer to convert the comment bodies to arrays
* Remove stopwords from words column
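
The effect of the tokenizer choice can be sketched in plain Python. This is a simplified stand-in, not Spark's implementation: `STOP_WORDS` is a hypothetical subset of a real stop-word list, the helper names are illustrative, and the sentence is made up for the example. Splitting on whitespace leaves punctuation attached to tokens, while splitting on the non-word pattern `\W` strips it.

```python
import re

# Hypothetical stop-word subset; a real stop-word list is much longer.
STOP_WORDS = {"very", "the", "a", "is", "you"}


def whitespace_tokens(text):
    """Lowercase and split on whitespace; punctuation stays attached."""
    return text.lower().split()


def regex_tokens(text, pattern=r"\W"):
    """Lowercase and split on every match of `pattern`, dropping empty tokens."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


def remove_stop_words(tokens):
    """Keep only the tokens that are not in the stop-word set."""
    return [tok for tok in tokens if tok not in STOP_WORDS]


body = "Very fast, thank you!"
simple = remove_stop_words(whitespace_tokens(body))
cleaned = remove_stop_words(regex_tokens(body))
```

With whitespace splitting, `you!` survives stop-word removal because it does not match the stop word `you` exactly; with the regex pattern it is recognized and removed.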

In [7]:
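# pattern for the alternative RegexTokenizer (commented out), which splits on non-word characters
pattern = "\\W"
# tokenizer = RegexTokenizer(inputCol="body", outputCol="words", pattern=pattern)
# Tokenizer lowercases the body and splits it on whitespace into an array of words
tokenizer = Tokenizer(inputCol="body", outputCol="words")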
wordsDF = tokenizer.transform(sentenceDF)

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
wordsFilteredDF = remover.transform(wordsDF)

# Remove body and words since they will no longer be used
wordsFilteredDF = wordsFilteredDF.select('id','ups','filtered_words')

We will compare two methods of featurization, CountVectorizer and HashingTF. CountVectorizer builds a vocabulary from the corpus and creates a bag-of-words representation with one column per vocabulary term. HashingTF applies feature hashing: it also produces a bag-of-words representation, but maps each word to one of a fixed number of buckets with a hash function. This limits the size of the matrix, at the cost of occasionally placing unrelated words in the same bucket.
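
The difference between the two representations can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: the documents are made up, the vocabulary is ordered alphabetically for simplicity, and `zlib.crc32` is a deterministic stand-in for the MurmurHash3 function that Spark's HashingTF uses.

```python
import zlib
from collections import Counter

# two made-up tokenized documents
docs = [
    ["spark", "makes", "hashing", "fast"],
    ["hashing", "trades", "collisions", "for", "fixed", "size"],
]


def count_vectorize(documents):
    """Build a vocabulary from the corpus, then count terms per document."""
    vocab = sorted({term for doc in documents for term in doc})
    index = {term: i for i, term in enumerate(vocab)}
    vectors = []
    for doc in documents:
        vec = [0] * len(vocab)
        for term, n in Counter(doc).items():
            vec[index[term]] = n
        vectors.append(vec)
    return vocab, vectors


def hashing_tf(documents, num_features):
    """Map each term to a bucket with a hash; no vocabulary pass is needed."""
    vectors = []
    for doc in documents:
        vec = [0] * num_features
        for term in doc:
            # crc32 is a stand-in hash chosen only because it is deterministic
            bucket = zlib.crc32(term.encode("utf-8")) % num_features
            vec[bucket] += 1
        vectors.append(vec)
    return vectors


vocab, cv_vectors = count_vectorize(docs)
hashed_vectors = hashing_tf(docs, num_features=8)
```

The count vectors have one position per distinct term (nine here), while the hashed vectors always have `num_features` positions. Nine distinct terms in eight buckets means at least one collision is unavoidable, which is the trade-off that feature hashing accepts for a fixed width.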

In [8]:
@timeit
def term_frequency(df, inputCol, outputCol, hashFeatures=None):
    '''
    Returns a DataFrame object containing a new column with the extracted features. 
    Passing hashFeatures will return a Feature Hashed matrix.
    
    @params:
        df - DataFrame
        inputCol - name of input column from DataFrame to find features
        outputCol - name of the column to save the features
        hashFeatures - number of features for HashingTF, if None will perform 
            CountVectorization
    '''
    
    # since the number of features was not passed perform standard CountVectorization
    if hashFeatures is None:
        cv = CountVectorizer(inputCol=inputCol, outputCol=outputCol)
        # learn the vocabulary from the DataFrame passed in
        feature_extractor = cv.fit(df)
    # otherwise hash the terms into hashFeatures buckets with HashingTF
    else:
        feature_extractor = HashingTF(
            inputCol=inputCol, outputCol=outputCol, numFeatures=hashFeatures)
    
    # create a new DataFrame using either feature extraction method
    return feature_extractor.transform(df)

In [9]:
# Find the occurrence of each word in the comment content
cvDF = term_frequency(
    df=wordsFilteredDF, inputCol="filtered_words", outputCol="features")

# Display snippet of new DataFrame
cvDF.select('filtered_words','features').show(5)

'term_frequency' took 2.47 sec

+--------------------+--------------------+
|      filtered_words|            features|
+--------------------+--------------------+
|[us, family, memb...|(13306,[2,94,169,...|
|[mill's, career, ...|(13306,[37,476,51...|
|[mine, uses, stra...|(13306,[0,4,13,26...|
|[fast,, thank, you!]|(13306,[95,574,13...|
|[guy, professiona...|(13306,[18,152,38...|
+--------------------+--------------------+
only showing top 5 rows



In [10]:
# Feature Hash the comment content
# number of features for Feature Hash matrix; a power of 2 is recommended
hashDF = term_frequency(
    df=wordsFilteredDF, inputCol="filtered_words", outputCol="features", hashFeatures=1024)

# Display snippet of new DataFrame
hashDF.select('filtered_words','features').show(5)

'term_frequency' took 0.10 sec

+--------------------+--------------------+
|      filtered_words|            features|
+--------------------+--------------------+
|[us, family, memb...|(1024,[368,386,45...|
|[mill's, career, ...|(1024,[102,211,22...|
|[mine, uses, stra...|(1024,[112,120,18...|
|[fast,, thank, you!]|(1024,[206,220,36...|
|[guy, professiona...|(1024,[95,358,366...|
+--------------------+--------------------+
only showing top 5 rows



## Machine Learning

### Regression: Term Frequency vs. Feature Hashing

In this first example we compare the computation time and accuracy of training a Random Forest Regressor on each representation. The expected result is comparable accuracy, with the Feature Hashed matrix being much faster to train on.

First we will create a function that will return the predicted DataFrame with the timeit decorator to keep track of run time.

In [11]:
@timeit
def random_forest_regression(df, featuresCol, labelCol):
    '''
    Returns a DataFrame containing a column of predicted values of the labelCol.
    Predict the output of labelCol using values in featuresCol y = rf(x).
    
    @params:
        df - DataFrame
        featuresCol - input features, x
        labelCol - output variable, y
    '''
    # split the training and test data using the holdout method
    (trainingData, testData) = df.randomSplit([0.8, 0.2])
    
    # create the random forest regressor with Spark's default hyperparameters
    rf = RandomForestRegressor(
        featuresCol=featuresCol, labelCol=labelCol)
    
    # fit the training data to the regressor to create the model
    model = rf.fit(trainingData)
    
    # create a DataFrame containing a column with predicted values of the labelCol
    predictions = model.transform(testData)
    
    return predictions
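
The holdout split in this function is unseeded, so the rows that land in the test set, and therefore the reported error, can change from run to run. The idea of a seeded random split can be sketched in plain Python. `random_split` here is a hypothetical helper that assigns each row independently, which is why the split sizes are only approximately 80/20; it is not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by drawing from a seeded random generator."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # cumulative boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # the last split also catches any floating-point rounding slack
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train_a, test_a = random_split(rows, [0.8, 0.2], seed=42)
train_b, test_b = random_split(rows, [0.8, 0.2], seed=42)
```

Repeating the call with the same seed returns the same partition. In Spark, `DataFrame.randomSplit` accepts a `seed` argument for the same purpose, for example `df.randomSplit([0.8, 0.2], seed=42)`, where the value 42 is arbitrary.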

Using the Feature Hashed DataFrame:

In [12]:
# train random forest regression
predictions = random_forest_regression(df=hashDF,featuresCol="features",labelCol="ups")

# compute the error
evaluator = RegressionEvaluator(labelCol="ups", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

'random_forest_regression' took 7.46 sec

Root Mean Squared Error (RMSE) on test data = 6.54565


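The following cell was run later in the session (cell 17); the 13306-dimensional feature vectors indicate that it shows predictions from the CountVectorized DataFrame.

In [17]: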
predictions.show(10)

+-------+---+--------------------+--------------------+-----------------+
|     id|ups|      filtered_words|            features|       prediction|
+-------+---+--------------------+--------------------+-----------------+
|cnas8zv| 14|[us, family, memb...|(13306,[2,94,169,...|2.846309230468395|
|cnas8zz|  2|[fast,, thank, you!]|(13306,[95,574,13...|2.846309230468395|
|cnas908|  1|      [love, music!]|(13306,[78,9501],...|2.846309230468395|
|cnas90l|  2|[roofers,, people...|(13306,[16,370,50...|2.846309230468395|
|cnas90o|  1|[agreed!, get, it...|(13306,[4,7,2347,...|2.846309230468395|
|cnas911|  1|[don't, know, 100...|(13306,[6,10,15,2...|2.846309230468395|
|cnas912|  3|[i'll, try, find,...|(13306,[18,90,96,...|2.846309230468395|
|cnas916|  0|[wish, google, dr...|(13306,[335,573,5...|2.846309230468395|
|cnas917|  2|[[](/hellohuman)m...|(13306,[102,3059,...|2.846309230468395|
|cnas918|  3|[haha, guilty., i...|(13306,[2,16,23,2...|2.846309230468395|
+-------+---+--------------------+--------------------+-----------------+
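
Every row in the table above receives the same predicted value. The sketch below, assuming Python 3, shows how RMSE is computed (the square root of the mean squared difference between label and prediction) using only these ten rows, and compares the result with a baseline that always predicts the mean label of the same rows. The numbers it produces describe these ten rows only and are not the RMSE reported for the full test set.

```python
import math

# labels and the single predicted value taken from the ten rows shown above
ups = [14, 2, 1, 2, 1, 1, 3, 0, 2, 3]
prediction = 2.846309230468395


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


model_rmse = rmse(ups, [prediction] * len(ups))

# baseline: always predict the mean label of the same ten rows
mean_ups = sum(ups) / len(ups)
baseline_rmse = rmse(ups, [mean_ups] * len(ups))

print("model RMSE on ten rows:    %g" % model_rmse)
print("baseline RMSE on ten rows: %g" % baseline_rmse)
```

A constant prediction cannot beat the mean of the labels it is scored on, so comparing against a mean baseline is a quick way to check whether a model is using its features at all.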

Using the CountVectorized DataFrame:

In [14]:
# train random forest regression
predictions = random_forest_regression(df=cvDF,featuresCol="features",labelCol="ups")

# compute the error
evaluator = RegressionEvaluator(labelCol="ups", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

'random_forest_regression' took 27.53 sec

Root Mean Squared Error (RMSE) on test data = 7.04472
