# Machine learning in Spark: MLLib and Pipelines #

This lab introduces Spark's main abstractions for machine learning: MLLib and pipelines.

In [20]:
import re

print sc

sql_sc = SQLContext(sc)

df = sqlContext.read.csv(
    "./data/amazon_book_reviews/Andy-Weir-The-Martian.csv",
    sep="\t"
).toDF(*["score", "url", "title", "review"]) \


pos_word_set = set(file("./data/positive_word_list.txt").read().splitlines())
neg_word_set = set(file("./data/negative_word_list.txt").read().splitlines())

print list(pos_word_set)[:10]
print list(neg_word_set)[:10]
    


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

#Create the conversion function
def convert_to_int(x):
    return int(float(x))

assert convert_to_int("1.0") == 1

#Wrap it as a sparkSQL udf
convert_to_int_udf = udf(convert_to_int, IntegerType())

get_len_udf = udf(len, IntegerType())

def positive_sentiment_score(x):
    token_set = set(re.findall('\w+', x.lower()))
    return len(pos_word_set.intersection(token_set))

assert positive_sentiment_score("The saver is sleek and desirable and worked.") == 4

positive_sentiment_score_udf = udf(positive_sentiment_score, IntegerType())

def negative_sentiment_score(x):
    token_set = set(re.findall('\w+', x.lower()))
    return len(neg_word_set.intersection(token_set))

negative_sentiment_score_udf = udf(negative_sentiment_score, IntegerType())

assert negative_sentiment_score("It's a cussed limited intereference threatening to subtract.") == 4

df = df \
    .withColumn("score_int", convert_to_int_udf(df.score)) \
    .withColumn("review_len", get_len_udf(df.review)) \
    .withColumn("positive_sentiment", positive_sentiment_score_udf(df.review)) \
    .withColumn("negative_sentiment", negative_sentiment_score_udf(df.review))

<pyspark.context.SparkContext object at 0x112ec4ed0>
['unencumbered', 'pardon', 'saver', 'desirable', 'encouragingly', 'sleek', 'pamperedly', 'worked', 'undisputed', 'sagely']
['limited', 'subtract', 'suicidal', 'cussed', 'interference', 'dissolution', 'refutes', 'threatening', 'foul', 'obstruction']


## Pipelines ##

Spark's Pipeline abstraction provides helper functions that simplify machine learning.

To start off with, we need to create a `label` column, since that's where MLLib classifiers expect labels to be stored.

In [21]:
def get_binary_label(x):
    return int(x >= 4)

get_binary_label_udf = udf(get_binary_label, IntegerType())


df4 = df.withColumn('label', get_binary_label_udf(df.score_int))

Now we create a pipeline. We're going to use a basic example with three stages:
1. A tokenizer
2. A hashingTF to translate string tokens into feature space
3. Logistic regression

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="review", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=40, regParam=0.001)

pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

Lazy loading means that our pipeline doesn't do anything yet.

Let's train it against all our data. (This should take a few seconds to run.)

In [23]:
model = pipeline.fit(df4)

Now let's apply the same transformation to our data, to get predicted scores for each review.

(Note: we're using our training set as our testing set. This is a no-no. We'll correct it in a sec.)

In [24]:
prediction = model.transform(df4)

AttributeError: 'DataFrame' object has no attribute 'setTrainRatio'

Here's how our predicted scores turn out:

In [15]:
prediction.groupby('prediction').count().collect()

[Row(prediction=0.0, count=3198), Row(prediction=1.0, count=19373)]

For reference, here's what the features look like.

In [16]:
prediction.select("features").take(20)

[Row(features=SparseVector(262144, {733: 1.0, 1524: 2.0, 2437: 4.0, 4081: 1.0, 4200: 1.0, 4282: 1.0, 5315: 1.0, 8630: 1.0, 8804: 1.0, 9616: 3.0, 9639: 14.0, 9781: 2.0, 9862: 2.0, 10879: 2.0, 12213: 1.0, 12659: 1.0, 13148: 1.0, 13396: 1.0, 14009: 1.0, 14610: 1.0, 14898: 2.0, 15885: 1.0, 15889: 10.0, 16332: 5.0, 18485: 1.0, 18976: 1.0, 21028: 1.0, 21471: 1.0, 23212: 1.0, 23893: 1.0, 24016: 1.0, 24417: 9.0, 24847: 1.0, 24980: 1.0, 25570: 2.0, 26343: 1.0, 26395: 1.0, 26512: 1.0, 27222: 1.0, 29643: 1.0, 31704: 1.0, 33254: 1.0, 33532: 2.0, 33672: 1.0, 33933: 2.0, 33977: 1.0, 34116: 1.0, 36543: 1.0, 36584: 1.0, 38266: 1.0, 38796: 1.0, 39081: 1.0, 40448: 1.0, 41085: 1.0, 42791: 1.0, 46199: 1.0, 46209: 1.0, 46498: 1.0, 48448: 4.0, 50940: 1.0, 52620: 1.0, 53570: 1.0, 54375: 1.0, 54961: 1.0, 55103: 2.0, 55242: 2.0, 56326: 2.0, 57341: 1.0, 58127: 1.0, 59853: 1.0, 62794: 1.0, 64643: 1.0, 66535: 1.0, 66980: 3.0, 68004: 1.0, 68707: 1.0, 68867: 1.0, 69353: 1.0, 69983: 1.0, 70527: 1.0, 71524: 2.0, 7182

Here are labels, probabilities, and predictions for 20 cases.

In [17]:
selected = prediction.select("label", "review", "probability", "prediction")
for row in selected.sample(False, .01).take(20):
    label, text, prob, predicted = row
#     print("(%s) --> prob=%s, prediction=%f" % (text, str(prob), prediction))
    print("label=%f, prob=%s, prediction=%f" % (label, str(prob), predicted))

label=0.000000, prob=[0.985319622253,0.0146803777471], prediction=0.000000
label=0.000000, prob=[1.0,3.05341156721e-21], prediction=0.000000
label=1.000000, prob=[0.000117390370384,0.99988260963], prediction=1.000000
label=1.000000, prob=[1.49743965929e-07,0.999999850256], prediction=1.000000
label=1.000000, prob=[4.21748243921e-06,0.999995782518], prediction=1.000000
label=1.000000, prob=[0.00123833498374,0.998761665016], prediction=1.000000
label=1.000000, prob=[3.76862989875e-08,0.999999962314], prediction=1.000000
label=1.000000, prob=[5.4223280485e-10,0.999999999458], prediction=1.000000
label=1.000000, prob=[2.00260885238e-18,1.0], prediction=1.000000
label=1.000000, prob=[0.00201283737055,0.997987162629], prediction=1.000000
label=1.000000, prob=[0.0388762448605,0.96112375514], prediction=1.000000
label=1.000000, prob=[6.08526327783e-05,0.999939147367], prediction=1.000000
label=1.000000, prob=[0.00409271910503,0.995907280895], prediction=1.000000
label=1.000000, prob=[4.4323236

And finally, here's our confusion matrix.

In [18]:
prediction.groupby(['label', 'prediction']).count().collect()

[Row(label=1, prediction=0.0, count=22),
 Row(label=0, prediction=0.0, count=3176),
 Row(label=1, prediction=1.0, count=19017),
 Row(label=0, prediction=1.0, count=356)]

Only 22+356 incorrect cases out of 22571 = 98.3% accuracy!

Of course, we cheated by mixing training and testing data...

## Exercise 1 - Split training and testing sets ##

Repeat the exercise for another book of your choice. Are the results substantively similar?

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit



df = sqlContext.read.csv(
    "./data/amazon_book_reviews/Andy-Weir-The-Martian.csv",
    sep="\t"
)

train, test = df.randomSplit([0.9, 0.1], seed=12345)

tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)


# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)\
    .select("features", "label", "prediction")\
    .show()

KeyError: Param(parent=u'TrainValidationSplit_482f8763316d8c127224', name='estimatorParamMaps', doc='estimator param maps')

## Exercise 2 - Graphs ##

Graph the distributions
* of predicted scores.
* of predicted scores for false positives.
* of predicted scores for false negatives.
* of review lengths
* of predicted scores for the shortest 10% of reviews.

## Exercise 3 - Split training and testing sets ##

Let's stop cheating on our training accuracy.

Retrain our logistic regression with a proper training and testing set split. Does it make a substantive difference?

(Hint: [https://spark.apache.org/docs/latest/ml-tuning.html#example-model-selection-via-train-validation-split](https://spark.apache.org/docs/latest/ml-tuning.html#example-model-selection-via-train-validation-split))

## Bonus Exercise - Different model specifications ##

MLLib has functions for many other models: [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html)

Try some other model and feature types for training your model. YMMV: some models are not fully reliable, so deployment is still sometimes tricky.

Can you improve prediction accuracy?