In [None]:
%matplotlib inline
import matplotlib
import seaborn as sns
matplotlib.rcParams['savefig.dpi'] = 144

# PySpark MLlib
<!-- requirement: small_data/gutenberg -->
*Official documentation [here](https://spark.apache.org/docs/latest/mllib-guide.html).*

In [None]:
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")
print(sc.version)

In [None]:
# needed to convert RDDs into DataFrames
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [None]:
# Tells Spark to look on the local filesystem
import os
def localpath(path):
    return 'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

## Algorithms

Spark supports a number of machine-learning algorithms.

- Classification and Regression
    - Linear SVM, logistic regression
    - Linear regression
    - Naive Bayes
    - Decision Trees
    - Random Forests and Gradient-Boosted Trees
- Clustering
    - K-means (and streaming K-means)
    - Gaussian Mixture Models
    - Latent Dirichlet Allocation
- Dimensionality Reduction
    - SVD and PCA
- Lower-level optimization primitives
    - Stochastic Gradient Descent
    - Limited-memory BFGS (L-BFGS)

### Parallelized SGD

For linear models like SVM, Linear Regression, and Logistic Regression, the cost function we're trying to optimize is essentially an average of the individual error terms from each data point, which makes it well suited to parallelization. For example, in linear regression, recall that the gradient is

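$$\begin{align}
\frac{\partial \log L(\beta)}{\partial \beta} &= \frac{\partial}{\partial \beta} \left( -\frac{1}{2}\sum_j \left(y_j - X_{j \cdot} \cdot \beta\right)^2 \right) \\
&= -\frac{1}{2}\sum_j \frac{\partial}{\partial \beta} \left(y_j - X_{j \cdot} \cdot \beta\right)^2 \\
&= \sum_j \left(y_j - X_{j \cdot} \cdot \beta\right) X_{j \cdot}^T \\
&\approx \frac{n}{m} \sum_{j \in \text{sample}} \left(y_j - X_{j \cdot} \cdot \beta\right) X_{j \cdot}^T
\end{align}$$

Here we assume Gaussian errors with unit variance, so $\log L(\beta)$ equals $-\frac{1}{2}\sum_j (y_j - X_{j \cdot} \cdot \beta)^2$ up to an additive constant. The sample contains $m$ of the $n$ training points, and the factor $\frac{n}{m}$ makes the sampled sum an unbiased estimate of the full gradient.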

The key *mathematical properties* we have used are:

1. the error functions are the sum of error contributions of different training instances
1. linearity of the derivative
1. associativity of addition
1. downsampling giving an unbiased estimator

Since the last sum runs over training instances, which are partitioned across different nodes, we can parallelize the computation of the gradient in SGD: each node computes a partial sum over its own instances, and the partial sums are then added together. Of course, the current weights $\beta$ must still be available on every node (through a broadcast variable that is updated each iteration). Notice that SVM, Linear Regression, and Logistic Regression all have error functions that are sums over training instances, so SGD can be used for all of these algorithms.

Spark's [implementation](http://spark.apache.org/docs/latest/mllib-optimization.html#stochastic-gradient-descent-sgd) uses a tunable minibatch parameter (`miniBatchFraction`) to sample a fraction of the training data RDD in each iteration. In each iteration, the current weights are broadcast to the executors, the gradient contribution of each sampled data point is computed, and the results are sent back and aggregated into a single update.

Parallelization scales well as the number of sampled data points $m$ grows, since there are no interaction terms and each point's contribution to the gradient is computed independently. Controlling how the algorithm iterates to convergence also matters, and is done through parameters for the total number of iterations and the step size (`iterations` and `step` in `LinearRegressionWithSGD.train`).
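A minimal pure-Python sketch of the scheme described above, assuming the data is already split into in-memory "partitions" (plain lists standing in for RDD partitions). Each partition computes its partial gradient sum over a random sample independently (the map step), the partial sums are added (the reduce step), and the driver updates $\beta$. The function names, the constant step size, and averaging over the sample size $m$ are illustrative choices for this sketch, not Spark's actual implementation.

```python
import random


def partial_gradient(partition, beta, fraction, rng):
    """'Map' step: sum (y_j - x_j . beta) * x_j over a random sample of one partition."""
    grad = [0.0] * len(beta)
    count = 0
    for x, y in partition:
        if rng.random() < fraction:  # keep each point with probability `fraction`
            residual = y - sum(xi * bi for xi, bi in zip(x, beta))
            for k, xk in enumerate(x):
                grad[k] += residual * xk
            count += 1
    return grad, count


def parallel_sgd(partitions, dim, iterations=200, step=0.5, fraction=0.5, seed=0):
    """Minibatch SGD where each partition's gradient contribution is computed independently."""
    rng = random.Random(seed)
    beta = [0.0] * dim  # in Spark, the current weights would be broadcast to executors
    for _ in range(iterations):
        results = [partial_gradient(p, beta, fraction, rng) for p in partitions]
        # 'Reduce' step: addition is associative, so partial sums combine in any order
        total = [sum(g[k] for g, _ in results) for k in range(dim)]
        m = sum(count for _, count in results)
        if m == 0:
            continue  # empty minibatch: skip this iteration
        # Step along the sample-averaged gradient of log L (downhill on squared error)
        beta = [b + step * t / m for b, t in zip(beta, total)]
    return beta


# Usage: noiseless data y = 2*x0 - 3*x1, split into 4 "partitions"
data_rng = random.Random(1)
points = []
for _ in range(400):
    x = [data_rng.uniform(-1, 1), data_rng.uniform(-1, 1)]
    points.append((x, 2.0 * x[0] - 3.0 * x[1]))
partitions = [points[i::4] for i in range(4)]

beta = parallel_sgd(partitions, dim=2)
print(beta)  # should be close to [2.0, -3.0]
```

Because each partition only needs the current $\beta$ and its own rows, the map step never moves training data between nodes; only the small gradient vectors travel back to be summed.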

## ML vs. MLlib packages

Confusingly, there are two machine learning APIs in Spark, the `mllib` package based on RDDs and the `ml` package based on DataFrames. For years these have been developed somewhat in parallel, resulting in duplication and asymmetry in functionality.

With Spark 2.0+, `mllib` is in maintenance mode and will likely be deprecated in the future in favor of the DataFrame-based API, which more closely resembles libraries like scikit-learn. Below is one example of the RDD-based API; the rest of the notebook focuses on DataFrames.

In [None]:
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel, LabeledPoint
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.linalg import Vector, Vectors
import random

# parameters
TRAINING_ITERATIONS = 10
TRAINING_FRACTION = 0.6

# generate the data
data = sc.parallelize(range(1, 10001)) \
    .map(lambda x: LabeledPoint(random.random(), [random.random(), random.random(), random.random()]))

# split the training and test sets
splits = data.randomSplit([TRAINING_FRACTION, 1 - TRAINING_FRACTION], seed=42)
training, test = (splits[0].cache(), splits[1])

# train the model
model = LinearRegressionWithSGD.train(training, iterations=TRAINING_ITERATIONS)

# get r2 score
predictions = test.map(lambda x: (float(model.predict(x.features)), x.label))
print(RegressionMetrics(predictions).r2)

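The labels here are random numbers unrelated to the features, so the score is poor. Fitting an intercept may still help, since a model without one cannot simply predict the average label. Use `<shift-tab>` inside the arguments to bring up the docstring for `LinearRegressionWithSGD`, and rerun the training with an intercept term.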
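To see why predicting the average label matters, here is a minimal pure-Python sketch of the coefficient of determination, assuming `RegressionMetrics.r2` reports the standard definition $R^2 = 1 - SS_{res}/SS_{tot}$. A constant prediction equal to the mean label scores exactly 0, while a worse constant scores below 0. The `r2_score` helper and the four labels are hypothetical.

```python
def r2_score(pairs):
    """R^2 = 1 - SS_res / SS_tot for a list of (prediction, label) pairs."""
    labels = [label for _, label in pairs]
    mean = sum(labels) / len(labels)
    ss_res = sum((label - pred) ** 2 for pred, label in pairs)
    ss_tot = sum((label - mean) ** 2 for label in labels)
    return 1.0 - ss_res / ss_tot


labels = [0.2, 0.4, 0.6, 0.8]
print(r2_score([(0.0, y) for y in labels]))  # always predicting 0: about -5
print(r2_score([(0.5, y) for y in labels]))  # always predicting the mean: 0.0
```

The pairs are ordered `(prediction, label)` to match the `predictions` RDD built above.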

To introspect these objects, the `<tab>` completion and `<shift-tab>` docstrings are useful. You can also call Python's built-in `dir` to list an object's attributes.

In [None]:
dir(test.take(2)[0])

## Spark ML
Spark ML standardizes APIs across machine learning algorithms around the ideas of transformers, estimators, and pipelines. This can streamline more complex workflows.

The core functionality includes:
* DataFrames - built on Spark SQL; can be created either directly from local data or from RDDs
* Transformers - algorithms that accept a DataFrame as input and return a DataFrame as output
* Estimators - algorithms that accept a DataFrame as input and return a Transformer as output
* Pipelines - chaining together Transformers and Estimators
* Parameters - common API for specifying hyperparameters

For example, a learning algorithm can be implemented as an Estimator which trains on a DataFrame of features and returns a Transformer which can output predictions based on a test DataFrame.

Full documentation can be found [here](http://spark.apache.org/docs/latest/ml-guide.html).
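The Estimator/Transformer contract can be illustrated without Spark. The following is a toy pure-Python sketch, not PySpark's implementation: "DataFrames" are lists of dicts, and `SplitWords`, `LengthScaler`, and `ToyPipeline` are hypothetical stand-ins for `Tokenizer`, a learned feature transform, and `Pipeline`. Fitting the pipeline fits each Estimator on the output of the stages before it and returns a model made only of Transformers.

```python
class Transformer:
    """Maps a 'DataFrame' (here: a list of dicts) to a new one."""
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    """Learns from a 'DataFrame' and returns a fitted Transformer."""
    def fit(self, rows):
        raise NotImplementedError


class SplitWords(Transformer):
    def __init__(self, inputCol, outputCol):
        self.inputCol, self.outputCol = inputCol, outputCol

    def transform(self, rows):
        return [dict(r, **{self.outputCol: r[self.inputCol].lower().split()}) for r in rows]


class LengthScalerModel(Transformer):
    def __init__(self, inputCol, outputCol, max_len):
        self.inputCol, self.outputCol, self.max_len = inputCol, outputCol, max_len

    def transform(self, rows):
        return [dict(r, **{self.outputCol: len(r[self.inputCol]) / self.max_len}) for r in rows]


class LengthScaler(Estimator):
    def __init__(self, inputCol, outputCol):
        self.inputCol, self.outputCol = inputCol, outputCol

    def fit(self, rows):
        # "Training" just records the longest list seen in the input column
        max_len = max(len(r[self.inputCol]) for r in rows)
        return LengthScalerModel(self.inputCol, self.outputCol, max_len)


class ToyPipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)   # Estimator -> Transformer
            rows = stage.transform(rows)  # feed the output to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


class ToyPipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


toy_train = [{"title": "Nice Headstart to Spark"}, {"title": "Great Overview"}]
toy_model = ToyPipeline([SplitWords("title", "words"), LengthScaler("words", "scaled_len")]).fit(toy_train)
print(toy_model.transform([{"title": "Very good book"}]))
```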

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

reviews = [("Prose is well-written, but style is an impediment to learning. Should be called 'Reviewing Spark,' not 'Learning Spark'", 0.0),
            ("Nice Headstart to Spark", 1.0),
            ("Start here: Excellent reference for Spark", 1.0),
            ("Insightful and so Spark-tastic!", 1.0),
            ("Good intro but wordy and lacking details in areas", 0.0),
            ("Best of the Books Currently Available", 1.0),
            ("A good resource for people interested in learning Spark", 1.0),
            ("Great Overview", 1.0)]

test_reviews = [("A decent guided tour of Spark and its major components.", 0.0),
                ("10/10 would buy again", 1.0),
                ("it is simple to follow. well organized. straight ...", 1.0),
                ("Just what you need to get started in Apache Spark.", 1.0),
                ("Very good book for learning Spark", 1.0)]

training = sqlContext.createDataFrame(reviews, ["title", "label"]).cache()
test = sqlContext.createDataFrame(test_reviews, ["title", "label"])

tokenizer = Tokenizer(inputCol="title", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
logreg = LogisticRegression(maxIter=10, regParam=0.01)

tokens = tokenizer.transform(training)
hashes = hashingTF.transform(tokens)
model = logreg.fit(hashes)

# Make predictions on test documents
test_tokens = tokenizer.transform(test)
test_hashes = hashingTF.transform(test_tokens)

prediction = model.transform(test_hashes)
selected = prediction.select("title", "label", "prediction")
for row in selected.collect():
    print(row)

In [None]:
# Note that if you use a PipelineModel it won't have a coefficients attribute.
model.coefficients

In [None]:
prediction.select(["features", "probability", "prediction"]).show(2)

**Exercise**: Rewrite the above using a Pipeline.

## Cross-validation and grid search

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

In [None]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, logreg])

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(logreg.regParam, [0.1, 0.01]) \
    .build()

In [None]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

*Note*: For a single train/validation split instead of k-fold cross-validation, use `TrainValidationSplit`.
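What `CrossValidator` does can be sketched in plain Python: expand the grid into every parameter combination (as `ParamGridBuilder().build()` does), score each combination by its average metric over $k$ held-out folds, then refit the best combination on all of the data. This is a simplified illustration under stated assumptions: folds are assigned round-robin rather than randomly, larger scores are assumed to be better, and the 1-D ridge model, `fit`/`score` callbacks, and data are hypothetical.

```python
import itertools


def build_grid(grid):
    """Every combination of parameter values, one dict per combination."""
    names = sorted(grid)
    return [dict(zip(names, values)) for values in itertools.product(*(grid[n] for n in names))]


def cross_validate(data, grid, k, fit, score):
    """Return (best_params, best_avg_score, model refit on all data)."""
    folds = [list(range(i, len(data), k)) for i in range(k)]  # round-robin fold assignment
    best_params, best_score = None, float("-inf")
    for params in build_grid(grid):
        fold_scores = []
        for held_out in folds:
            held = set(held_out)
            train = [d for i, d in enumerate(data) if i not in held]
            valid = [data[i] for i in held_out]
            fold_scores.append(score(fit(train, params), valid))
        avg = sum(fold_scores) / k
        if avg > best_score:  # this sketch assumes larger is better (hence negated MSE)
            best_params, best_score = params, avg
    return best_params, best_score, fit(data, best_params)


def fit_ridge_1d(train, params):
    """Hypothetical model: slope of y ~ x with L2 penalty params['reg']."""
    sxy = sum(x * y for x, y in train)
    sxx = sum(x * x for x, _ in train)
    return sxy / (sxx + params["reg"])


def neg_mse(slope, valid):
    return -sum((y - slope * x) ** 2 for x, y in valid) / len(valid)


data = [(float(x), 3.0 * x) for x in range(1, 11)]  # noiseless y = 3x
best_params, best_score, best_slope = cross_validate(
    data, {"reg": [0.0, 1.0, 10.0]}, k=3, fit=fit_ridge_1d, score=neg_mse)
print(best_params, best_slope)
```

The cost grows multiplicatively: the notebook's grid has 3 × 2 = 6 combinations, so with `numFolds=3` the pipeline is fit 18 times during the search, plus once more for the final refit.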

In [None]:
cvModel = crossval.fit(training)

In [None]:
better_prediction = cvModel.transform(test)

In [None]:
selected = better_prediction.select("title", "label", "prediction")
for row in selected.collect():
    print(row)

In [None]:
cvModel.bestModel.stages

In [None]:
cvModel.bestModel.stages[2].coefficients

In [None]:
training.unpersist()

### Example algorithm: Word2Vec

In [None]:
from pyspark.ml.feature import Word2Vec

# text = sc.parallelize(reviews + test_reviews).map(lambda pair: (pair[0].split(" "), pair[1])).toDF(['text', 'score'])
gutenberg = sc.textFile(localpath('small_data/gutenberg/')).map(lambda line: (line.split(" "), 1)).toDF(['text', 'score'])
w2v = Word2Vec(inputCol="text", outputCol="vectors")
model = w2v.fit(gutenberg)
result = model.transform(gutenberg)

In [None]:
vectors = model.getVectors().rdd.map(lambda x: (x.word, x.vector))
print(model.findSynonyms('woman', 10).rdd.take(10))

In [None]:
king_vec = vectors.lookup('king')[0]
queen_vec = vectors.lookup('queen')[0]
man_vec = vectors.lookup('man')[0]
woman_vec = vectors.lookup('woman')[0]

In [None]:
print(king_vec)

In [None]:
print(queen_vec.squared_distance(king_vec))
print(queen_vec.squared_distance(woman_vec))
print(queen_vec.squared_distance(man_vec))
print(queen_vec.squared_distance(king_vec + man_vec - woman_vec))
print(queen_vec.squared_distance(king_vec - man_vec + woman_vec))
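The last two lines test the classic analogy: if the embedding captures the relationship, $\text{king} - \text{man} + \text{woman}$ should land closer to $\text{queen}$ than $\text{king} + \text{man} - \text{woman}$ does. A minimal sketch of that comparison, using made-up 2-D vectors (not real Word2Vec output) and a hypothetical `squared_distance` helper that mirrors the vector method used above:

```python
def squared_distance(a, b):
    """Sum of squared coordinate differences."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def add(a, b):
    return [x + y for x, y in zip(a, b)]


def sub(a, b):
    return [x - y for x, y in zip(a, b)]


# Made-up vectors: axis 0 ~ "royalty", axis 1 ~ "gender"
king, queen = [1.0, 1.0], [1.0, -1.0]
man, woman = [0.0, 1.0], [0.0, -1.0]

analogy = add(sub(king, man), woman)    # king - man + woman
wrong_way = sub(add(king, man), woman)  # king + man - woman
print(squared_distance(queen, analogy))    # 0.0
print(squared_distance(queen, wrong_way))  # 16.0
```

Real embeddings trained on a small corpus will only approximate this, so expect the first distance to be smaller rather than zero.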

## Feature processing

In [None]:
vdf = vectors.toDF(["word", "vector"])

In [None]:
vdf.show(5)

In [None]:
sample_vector = vdf.select("vector").take(1)[0][0]
print(len(sample_vector))

In [None]:
from pyspark.ml.feature import VectorSlicer

first_slicer = VectorSlicer(inputCol="vector", outputCol="first", indices=[0])
last_slicer = VectorSlicer(inputCol="vector", outputCol="last", indices=[len(sample_vector) - 1])
med_slicer = VectorSlicer(inputCol="vector", outputCol="med", indices=list(range(45, 55)))

In [None]:
output = med_slicer.transform(last_slicer.transform(first_slicer.transform(vdf)))
output.columns

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["first", "last", "med"],
    outputCol="features")

new_output = assembler.transform(output)
new_output.columns

In [None]:
from pyspark.ml.feature import Binarizer

binarizer = Binarizer(threshold=0.05, inputCol="features", outputCol="bin_features")
new_output_b = binarizer.transform(new_output)
print(new_output_b.select(["features", "bin_features"]).take(1)[0])
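Row by row, the three transformers above amount to simple list operations. A minimal pure-Python sketch, assuming a short made-up vector in place of a Word2Vec vector (100-dimensional by default); `slice_vector` and `binarize` are hypothetical helpers. It follows `Binarizer`'s rule that values strictly greater than the threshold become 1.0 and all others become 0.0, and `VectorAssembler`'s behavior of concatenating its input columns in order.

```python
def slice_vector(vec, indices):
    """Like VectorSlicer: keep only the entries at the given indices, in that order."""
    return [vec[i] for i in indices]


def binarize(vec, threshold):
    """Like Binarizer: 1.0 where value > threshold, else 0.0."""
    return [1.0 if v > threshold else 0.0 for v in vec]


vector = [0.3, -0.2, 0.04, 0.1, -0.5, 0.06]  # made-up stand-in for a Word2Vec vector

first = slice_vector(vector, [0])
last = slice_vector(vector, [len(vector) - 1])
middle = slice_vector(vector, range(2, 4))

features = first + last + middle  # like VectorAssembler(inputCols=["first", "last", "med"])
print(features)                   # [0.3, 0.06, 0.04, 0.1]
print(binarize(features, 0.05))   # [1.0, 1.0, 0.0, 1.0]
```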

### Exercise: Use SVM to predict colon cancer from gene expressions
You can start getting a feel for the MLlib operations by following the [Spark docs example](https://spark.apache.org/docs/1.3.0/mllib-linear-methods.html#linear-support-vector-machines-svms) on this dataset.

#### About the data format: LibSVM
MLlib provides a loader, `MLUtils.loadLibSVMFile()`, for the LibSVM format, which many other tools (R, MATLAB, etc.) can also read.  
A dataset with *n* features has one row per data point, containing the label followed by `index:value` pairs for the features (indices start at 1, and zero-valued features may be omitted):
>{label} 1:{value} 2:{value} ... n:{value}

For example, here are two data points with six features each, labeled -1 and 1 respectively:
>-1.000000  1:2.080750 2:1.099070 3:0.927763 4:1.029080 5:-0.130763 6:1.265460  
1.000000  1:1.109460 2:0.786453 3:0.445560 4:-0.146323 5:-0.996316 6:0.555759 
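A minimal pure-Python sketch of parsing one LibSVM line, assuming well-formed input; `parse_libsvm_line` and `to_dense` are hypothetical helpers, not MLlib's loader. Like `MLUtils.loadLibSVMFile()`, it converts the 1-based feature indices in the file to 0-based indices, and it keeps a sparse representation (indices plus values) because omitted features are zero.

```python
def parse_libsvm_line(line):
    """Parse 'label i1:v1 i2:v2 ...' into (label, zero_based_indices, values)."""
    parts = line.split()
    label = float(parts[0])
    indices, values = [], []
    for item in parts[1:]:
        index, value = item.split(":")
        indices.append(int(index) - 1)  # LibSVM indices start at 1
        values.append(float(value))
    return label, indices, values


def to_dense(indices, values, num_features):
    """Expand the sparse form, filling omitted features with 0.0."""
    dense = [0.0] * num_features
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


line = "-1.000000  1:2.080750 2:1.099070 3:0.927763 4:1.029080 5:-0.130763 6:1.265460"
label, indices, values = parse_libsvm_line(line)
print(label, indices)                                # -1.0 [0, 1, 2, 3, 4, 5]
print(to_dense([0, 3], [0.5, 2.0], num_features=6))  # [0.5, 0.0, 0.0, 2.0, 0.0, 0.0]
```

For the colon-cancer data, most rows list all 2000 features, but the sparse form matters for datasets where most values are zero.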

#### About the colon-cancer dataset
This dataset was introduced in the 1999 paper "Broad patterns of gene expression revealed by clustering analysis of tumor and normal colon tissues probed by oligonucleotide arrays," published in PNAS.

Here's the abstract of the paper:  
> *Oligonucleotide arrays can provide a broad picture of the state of the cell, by monitoring the expression level of thousands of genes at the same time. It is of interest to develop techniques for extracting useful information from the resulting data sets. Here we report the application of a two-way clustering method for analyzing a data set consisting of the expression patterns of different cell types. Gene expression in 40 tumor and 22 normal colon tissue samples was analyzed with an Affymetrix oligonucleotide array complementary to more than 6,500 human genes. An efficient two-way clustering algorithm was applied to both the genes and the tissues, revealing broad coherent patterns that suggest a high degree of organization underlying gene expression in these tissues. Coregulated families of genes clustered together, as demonstrated for the ribosomal proteins. Clustering also separated cancerous from noncancerous tissue and cell lines from in vivo tissues on the basis of subtle distributed patterns of genes even when expression of individual genes varied only slightly between the tissues. Two-way clustering thus may be of use both in classifying genes into functional groups and in classifying tissues based on gene expression.*

The colon-cancer dataset has 2000 features and 62 data points in 2 classes: 40 tumor (label=0) and 22 normal (label=1).

#### Exit Tickets
1. When would you use `org.apache.spark.mllib.linalg.Vector` versus `breeze.linalg.DenseVector`?
1. Why can SVM, Linear Regression, and Logistic Regression be parallelized?  How would you parallelize KMeans?


In [None]:
sc.stop()

*Copyright &copy; 2015 The Data Incubator.  All rights reserved.*