In [None]:
%pylab inline
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql import Row, functions as F
from pyspark.ml.feature import Tokenizer, HashingTF, Binarizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

# Introduction
In this tutorial, we will use `spark.ml` to build an ML pipeline on the Reddit comment dataset.

We are going to build a model to classify reddit comments as positive or negative based on the content of each comment.



## Read in the Reddit data
#### Reading in this dataset takes approximately 17 minutes. A good time for a coffee.

In [None]:
# Schema of the raw Reddit comment dump, written as (column name, Spark type) pairs.
# Every column is declared nullable. The schema is only needed for the JSON read below;
# parquet files carry their own schema.
fields = list(
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("archived", BooleanType()),
        ("author", StringType()),
        ("author_flair_css_class", StringType()),
        ("body", StringType()),
        ("controversiality", LongType()),
        ("created_utc", StringType()),
        ("day", LongType()),
        ("distinguished", StringType()),
        ("downs", LongType()),
        ("edited", StringType()),
        ("gilded", LongType()),
        ("id", StringType()),
        ("link_id", StringType()),
        ("month", LongType()),
        ("name", StringType()),
        ("parent_id", StringType()),
        ("retrieved_on", LongType()),
        ("score", LongType()),
        ("score_hidden", BooleanType()),
        ("subreddit", StringType()),
        ("subreddit_id", StringType()),
        ("ups", LongType()),
        ("year", LongType()),
    ]
)
# You can try the json read later, but it's mainly to understand the performance benefit between json and parquet file formats
#rawDF = sqlContext.read.json("s3a://reddit-comments/2009", StructType(fields)).persist(StorageLevel.MEMORY_AND_DISK_SER)

# Load the 2009 comments from parquet and mark the DataFrame for serialized
# memory-and-disk caching, since it is reused by several cells below.
rawDF = sqlContext.read.parquet("s3a://reddit-comments-parquet/year=2009")
rawDF.persist(StorageLevel.MEMORY_AND_DISK_SER)

In [None]:
# count() is an action: it forces rawDF to be read (filling the cache requested by
# persist() above) and returns the number of rows.
rawDF.count()

In [None]:
# Print the column names and types of the loaded DataFrame.
rawDF.printSchema()

## Selecting a subset of the data
To keep the tutorial fast, we'll just use the reddit.com subreddit comments here. You could easily extend this to a general model for comments across all subreddits.

We clean the data by removing deleted comments and comments whose score is hidden. We also cast the `score` column to a double so that it can be fed to Spark's Transformers.


In [None]:
# Select columns that are needed for the training and testing
# Cast columns to the correct datatype for Transformers
# Only use comments that have been upvoted or downvoted
def cast_col(df, col, cast_type):
    '''
    Return a new DataFrame in which column `col` has been cast to `cast_type`.

    DataFrames are immutable, so nothing is changed in place: withColumn() builds
    a new DataFrame, and because a column named `col` already exists it is
    replaced rather than appended. No temporary column is needed, so other
    columns (whatever their names) are left untouched and the cast column keeps
    its original position.

    :param df: DataFrame containing the column to cast
    :param col: name of the column to cast
    :param cast_type: target type, passed straight to Column.cast()
    :return: new DataFrame with `col` cast to `cast_type`
    '''
    return df.withColumn(col, df[col].cast(cast_type))

# Keep only the columns used downstream, then retain a row only if the comment
# is not deleted, its score is not hidden, and it belongs to the reddit.com subreddit.
filteredDF = (
    rawDF.select("id", "body", "score", "score_hidden", "subreddit")
         .filter((rawDF.body != "[deleted]")
                 & (rawDF.score_hidden == False)
                 & (rawDF.subreddit == "reddit.com"))
)

# The score is cast to a double before it is handed to the Transformers
castedDF = cast_col(filteredDF, "score", DoubleType())

print("Sample size: {}".format(castedDF.count()))

# Expose the cleaned data to Spark SQL as table "rc" and count comments per score;
# the small aggregated result is pulled into pandas for plotting.
castedDF.registerTempTable("rc")
query = sqlContext.sql("""
    SELECT score, COUNT(*) as cnt FROM rc
    GROUP BY score
    ORDER BY cnt DESC
    """)
result = query.toPandas()


Let's plot the distribution of comment scores to see what the data looks like:

In [None]:
# Scatter plot of comment count per score, zoomed in on scores between -20 and 20
ax = result.plot(x="score", y="cnt", kind="scatter")
ax.set_xlim([-20, 20])
ax.set_ylim([0, 800000])

Most comments have a score of 1, and the number of comments drops very rapidly on either side of this.

## Creating a labeled dataset
Now that we have cleaned the data, let's create labels for our positive comments and negative comments. 

Since the majority of comments have a score of 0-3, in this example we are going to assume that a comment needs a score < 0 to be a negative comment and > 10 to be a positive comment (10% of the data falls into this classification scheme). 

As the counts printed by the next cell show, the number of positive comments and the number of negative comments are conveniently about equal. A balanced labeled dataset will help simplify training and validating our model.


In [None]:
# Negative class: score strictly below 0. Positive class: score strictly above 10.
negativeDF = castedDF.where(castedDF.score < 0)
positiveDF = castedDF.where(castedDF.score > 10)

# Sizes of the negative class, the positive class and the whole cleaned sample
print("{} {} {}".format(negativeDF.count(), positiveDF.count(), castedDF.count()))

## Split into training and testing data
Now let's combine the positive comments and negative comments, and randomly split them into training and testing datasets (80% in the training set, 20% in the testing set). 

We'll put the testing dataset aside for now and use the training dataset to train our model. Once the model is trained, we can use the testing dataset to validate the model.



In [None]:
# Split dataset into training and testing
mergedDF = negativeDF.unionAll(positiveDF)

# Pass a fixed seed so the 80/20 split - and therefore every metric computed
# from it further down - is repeatable instead of changing on each run.
splitDF = mergedDF.randomSplit([0.8, 0.2], seed=42)
trainingDF = splitDF[0]
testingDF = splitDF[1]

# Both halves are reused many times (fit, transform, evaluate, cross-validation)
trainingDF.persist(StorageLevel.MEMORY_AND_DISK)
testingDF.persist(StorageLevel.MEMORY_AND_DISK)

# mergedDF only holds scores < 0 and scores > 10, so "score > 0" selects the positive class
print("training size: {}".format(trainingDF.count()))
print("negative sentiment: {}".format(trainingDF.filter(trainingDF.score<0).count()))
print("positive sentiment: {}".format(trainingDF.filter(trainingDF.score>0).count()))
print("testing size: {}".format(testingDF.count()))


## Training a model
As features for our model we are going to use the frequency of each word in a comment.

We build an ML pipeline by chaining together the Binarizer, Tokenizer, HashingTF and LogisticRegression stages. Then we fit the model to the training dataset and make predictions on the testing dataset.


In [None]:
# Stage 1 - Binarizer: derive a 0/1 'label' column from the numeric 'score',
# using the threshold to separate the two classes
binarizer = Binarizer(inputCol="score", outputCol="label", threshold=0.0)

# Stage 2 - Tokenizer: break the comment text into individual words
tokenizer = Tokenizer(outputCol="words", inputCol="body")

# Stage 3 - HashingTF: compute term frequencies of the tokenizer's output
# and store them in the 'features' column
hashingTF = HashingTF(outputCol="features", inputCol=tokenizer.getOutputCol())

# Stage 4 - logistic regression
#   maxIter:  maximum number of iterations run by fit
#   regParam: regularization strength
lr = LogisticRegression(regParam=0.01, maxIter=10)

# Chain the stages into a single estimator, fit it on the training data
# and score the held-out testing data
pipeline = Pipeline(stages=[binarizer, tokenizer, hashingTF, lr])
model = pipeline.fit(trainingDF)
prediction = model.transform(testingDF)

## Model validation
### Let's check how well we did:

In [None]:
# Area under the RoC curve is the evaluator's default metric; it is spelled out here for clarity
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

# Score the fitted pipeline on the data it was trained on and on the held-out data
train_RoC = evaluator.evaluate(model.transform(trainingDF))
test_RoC = evaluator.evaluate(model.transform(testingDF))

print("The area under the RoC curve for the training set is {} and for the test set is {}".format(train_RoC, test_RoC))

### Let's also explore the errors

In [None]:
# Columns needed to inspect individual predictions on the testing set
selected = prediction.select("id", "body", "prediction", "label")

# Fraction of comments labelled positive in the full labelled set. 'label' is 0/1,
# so its average is that fraction. A DataFrame aggregate is used instead of
# DataFrame.map(), which is only available in Spark 1.x.
positive_score_rate = binarizer.transform(mergedDF).agg(F.avg("label")).first()[0]

def typeI_II(row):
    '''
    Classify one scored row into its confusion-matrix cell.

    Reads row.prediction and row.label and returns a Row with
    error_type in {"true_neg", "false_neg", "false_pos", "true_pos"} and cnt=1,
    so the rows can later be summed per error_type.
    '''
    # (prediction, label) -> outcome; any pair not listed counts as a true positive
    outcomes = {
        (0, 0): "true_neg",
        (0, 1): "false_neg",
        (1, 0): "false_pos",
    }
    outcome = outcomes.get((row.prediction, row.label), "true_pos")
    return Row(error_type=outcome, cnt=1)

# Tag every test prediction with its confusion-matrix cell. Going through .rdd
# explicitly keeps this working beyond Spark 1.x, where DataFrame.map() is gone.
typeI_II_DF = selected.rdd.map(typeI_II).toDF()

# Alias the aggregate explicitly instead of renaming the auto-generated column:
# its name (e.g. "SUM(cnt)") is version-dependent, and a rename that does not
# match would leave no "cnt" column for the pandas code below.
type_error_pd = typeI_II_DF.groupBy("error_type")\
                           .agg(F.sum("cnt").alias("cnt"))\
                           .toPandas()

# Share of each outcome among all test predictions
type_error_pd["tot"] = type_error_pd["cnt"].sum(axis=0)
type_error_pd["perc"] = type_error_pd["cnt"]/type_error_pd["tot"]
print(type_error_pd)
print("percentage of comments with positive score in full set: {0:.2f}".format(positive_score_rate))



## Task 1: Based on the training and test area under RoC, we've really overfit! Consider why. Which other models or analysis steps do we need?
Look into the `elasticNetParam` option to determine what type of regularization our default model uses.
Additionally, one could consider trying a Naive Bayes model, or using the pyspark.ml.feature.IDF transformer, or removing stopwords.

### We could also tune the hyperparameters using cross-validation:

In [None]:
# Hyperparameter search over a small grid. Careful: run time grows quickly with
# the number of grid points, so a large grid could take a very long time to run!
lr_grid = LogisticRegression()
pipeline = Pipeline(stages=[binarizer, tokenizer, hashingTF, lr_grid])

# 2 x 2 x 2 candidate settings, all sharing the same label column
grid = (ParamGridBuilder()
        .baseOn({lr_grid.labelCol: 'label'})
        .addGrid(lr_grid.regParam, [0.01, 0.1])
        .addGrid(lr_grid.elasticNetParam, [0.5, 0.75])
        .addGrid(lr_grid.maxIter, [1, 2])
        .build())

print('optimized over a grid of {} parameter combinations'.format(len(grid)))

# Candidates are compared with the evaluator's default metric (area under the RoC curve)
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(numFolds=4, estimator=pipeline, evaluator=evaluator, estimatorParamMaps=grid)
cvModel = cv.fit(trainingDF)


In [None]:
# Area under the RoC curve of the cross-validated model: training set first, then testing set
print(evaluator.evaluate(cvModel.transform(trainingDF)))
print(evaluator.evaluate(cvModel.transform(testingDF)))

#### We've reduced the overfitting and also have a simple model that performs a bit better than random