# Spark ML/MLlib - Machine Learning with Spark for beginners

### Agenda

1. About Spark (with projects)
2. About these notebooks
3. *RDD* vs *DataFrame* and *MLlib* vs *ML*
4. ML overview
5. Pipelines
6. Supervised learning
7. We need to go deeper!
8. Where to go from there

### test of environment

In [None]:
println("hello from Scala")


### about these notebooks

This is a fork of the popular [Docker](http://docker.com) image ["All spark notebook"](https://hub.docker.com/r/jupyter/all-spark-notebook/) by [Jupyter](http://jupyter.org/).

I used the image mentioned above as a baseline and added this notebook and a couple of datasets.

### why notebooks?

Notebooks let you mix code and documentation (written in Markdown). Results can contain code output, graphs, etc. I hope you also find notebooks convenient as an easy-to-start, easy-to-reproduce environment.

#### Spark overview

**Apache Spark** is a general-purpose engine for distributed data processing. The foundation of Spark is **Spark Core**, a software layer that handles task dispatching, scheduling, and I/O operations. The central concept of the Spark Core API is the Resilient Distributed Dataset (**RDD**), a collection that is distributed across the Spark cluster.

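There are 4 official projects on top of **Spark Core**. Three of them are described below; the fourth, **GraphX** (graph processing), is not covered in this notebook: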

* **Spark SQL**: the high-level API of Spark that provides the DataFrame and SQL abstractions on top of Dataset. You should always try the high-level API first.

* **Spark (Structured) Streaming**: provides an interface for stream processing. Structured Streaming offers a high-level DataFrame interface. It ingests data in micro-batches and performs transformations on them. Because the high-level interface is built on DataFrames, the same code can be reused for both streaming and batch processing, which helps when building a [lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture)

* **Spark MLlib (ML)**: a distributed machine learning framework that provides:
    * [summary statistics](https://en.wikipedia.org/wiki/Summary_statistics), [correlations](https://en.wikipedia.org/wiki/Correlation_and_dependence), [stratified sampling](https://en.wikipedia.org/wiki/Stratified_sampling), [hypothesis testing](https://en.wikipedia.org/wiki/Statistical_hypothesis_testing), random data generation
    * [classification](https://en.wikipedia.org/wiki/Statistical_classification) and [regression](https://en.wikipedia.org/wiki/Regression_analysis): [support vector machines](https://en.wikipedia.org/wiki/Support_vector_machines), [logistic regression](https://en.wikipedia.org/wiki/Logistic_regression), [linear regression](https://en.wikipedia.org/wiki/Linear_regression), decision trees, [naive Bayes classification](https://en.wikipedia.org/wiki/Naive_Bayes_classifier)
    * [collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering) techniques including alternating least squares (ALS)
    * [cluster analysis](https://en.wikipedia.org/wiki/Cluster_analysis) methods including [k-means](https://en.wikipedia.org/wiki/K-means_clustering), and [Latent Dirichlet Allocation (LDA)](https://en.wikipedia.org/wiki/Latent_Dirichlet_allocation)
    * [dimensionality reduction](https://en.wikipedia.org/wiki/Dimensionality_reduction) techniques such as [singular value decomposition (SVD)](https://en.wikipedia.org/wiki/Singular-value_decomposition), and [principal component analysis (PCA)](https://en.wikipedia.org/wiki/Principal_component_analysis)
    * [feature extraction](https://en.wikipedia.org/wiki/Feature_extraction) and [transformation functions](https://en.wikipedia.org/wiki/Data_transformation_(statistics))
    * optimization algorithms such as [stochastic gradient descent](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [limited-memory BFGS (L-BFGS)](https://en.wikipedia.org/wiki/Limited-memory_BFGS)
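
As a small illustration of the statistics utilities in the MLlib list above, here is a minimal sketch that computes a Pearson correlation matrix with `Correlation.corr` from the DataFrame-based API. The data is made up, and the local `SparkSession` setup is an assumption so the cell can run on its own (inside this notebook, `getOrCreate()` should simply return the existing session).

```scala
import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

// Assumption: reuse the notebook's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("mllib-stats-sketch").getOrCreate()

// Made-up observations: the second component is always twice the first.
val observations = Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(2.0, 4.0),
  Vectors.dense(3.0, 6.0)
).map(Tuple1.apply)

val statsDf = spark.createDataFrame(observations).toDF("features")

// Pearson correlation between the components of the "features" vectors.
val Row(pearson: Matrix) = Correlation.corr(statsDf, "features").head
println(s"Pearson correlation matrix:\n$pearson")
```

Because the two components are exactly proportional, the off-diagonal entries of the matrix should be very close to 1.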



![Apache Spark Architecture](https://spark.apache.org/images/spark-stack.png)


### _RDD_ vs _DataFrame_ and _MLlib_ vs _ML_

Spark has 2 APIs:
* **Structured API**: the high-level API (includes DataFrame, Dataset, Spark SQL, views, and various interfaces for manipulating all sorts of data, from unstructured log files to semi-structured CSV to highly structured relational tables or Parquet files)
* **Low-level API**: you should always favor the high-level API, since it is well suited for most scenarios. However, there are times when high-level manipulations will not fit the business or engineering problem you are trying to solve. For those cases you can use Spark's low-level API, specifically RDD, SparkContext, and distributed shared variables such as accumulators and broadcast variables.
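
To make the difference between the two APIs concrete, the sketch below counts words twice: once with DataFrame column expressions and once with explicit RDD transformations. The input lines and variable names are made up for illustration, and the local session setup is an assumption so the cell is self-contained.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, split}

// Assumption: reuse the notebook's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("api-levels-sketch").getOrCreate()

// Made-up input lines.
val lines = Seq("spark ml", "spark sql", "hadoop")

// Structured API: declarative column expressions on a DataFrame.
val linesDf = spark.createDataFrame(lines.map(Tuple1.apply)).toDF("line")
val dfCounts = linesDf
  .select(explode(split(col("line"), " ")).as("word"))
  .groupBy("word")
  .count()
  .collect()
  .map(row => row.getString(0) -> row.getLong(1))
  .toMap

// Low-level API: the same computation as explicit RDD transformations.
val rddCounts = spark.sparkContext
  .parallelize(lines)
  .flatMap(_.split(" "))
  .map(word => (word, 1L))
  .reduceByKey(_ + _)
  .collect()
  .toMap

println(s"DataFrame result: $dfCounts")
println(s"RDD result:       $rddCounts")
```

Both versions produce the same counts. The DataFrame version describes what to compute and leaves the execution plan to Spark, while the RDD version spells out each step by hand.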

Spark's machine learning library has two implementations: **RDD-based** and **DataFrame-based**.
As of Spark 2.0 the RDD-based API is in maintenance mode, so you should look for new features in the DataFrame-based MLlib API. It is often called Spark ML (after the _org.apache.spark.ml_ package).
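
In code, the two implementations live in different packages, and each has its own vector type. The following sketch (with made-up values and alias names chosen only for this example) shows both types side by side and how to convert between them when old RDD-based code has to meet new DataFrame-based code.

```scala
import org.apache.spark.ml.linalg.{Vector => MLVector, Vectors => MLVectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}

// DataFrame-based API (Spark ML): package org.apache.spark.ml
val mlVector: MLVector = MLVectors.dense(1.0, 0.0, 3.0)

// RDD-based API (maintenance mode): package org.apache.spark.mllib
val oldVector: OldVector = OldVectors.dense(1.0, 0.0, 3.0)

// Conversions between the two vector types.
val convertedToMl: MLVector = oldVector.asML
val convertedToOld: OldVector = OldVectors.fromML(mlVector)

println(s"ml: $convertedToMl, mllib: $convertedToOld")
```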


### ML overview

Machine learning is a set of techniques aimed at deriving insights and making predictions or recommendations based on a dataset.
Common tasks include:
1. **Recommendation engines**: transform existing information about customer choices into suggestions for new or existing customers
2. **Unsupervised learning**: clustering, anomaly detection, topic modeling, etc. These are techniques for discovering the structure of the data
3. **Supervised learning**: classification and regression. The goal is to predict a label for a data point using its features.
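
Supervised learning is covered by the pipeline example later in this notebook. For the unsupervised case from the list above, here is a minimal clustering sketch with `KMeans` from `org.apache.spark.ml.clustering`. The points are made up, `k = 2` and the seed are arbitrary choices, and the local session setup is an assumption so the cell runs on its own.

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Assumption: reuse the notebook's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("kmeans-sketch").getOrCreate()

// Made-up, unlabeled points that form two well separated groups.
val points = Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.2), Vectors.dense(0.2, 0.1),
  Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.2), Vectors.dense(9.2, 9.1)
).map(Tuple1.apply)
val pointsDf = spark.createDataFrame(points).toDF("features")

// No labels are involved: KMeans only looks at the "features" column.
val kmeans = new KMeans().setK(2).setSeed(1L)
val kmeansModel = kmeans.fit(pointsDf)

// Each point is assigned a cluster id in the "prediction" column.
val clustered = kmeansModel.transform(pointsDf)
kmeansModel.clusterCenters.foreach(center => println(s"center: $center"))
clustered.show()
```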

At a very high level, a typical machine learning process looks like this:

![Machine learning process](https://i.imgur.com/0XAG2c0.png)


MLlib is a package, built on and included in Spark, that provides interfaces for: gathering and cleaning data, feature engineering and feature selection, training and tuning large-scale supervised and unsupervised machine learning models, and using those models in production.

### Why should anyone use Spark MLlib?

Basically, there are a lot of ML frameworks, but not all of them scale as well as Spark MLlib.
Spark is a distributed-first framework that leverages distributed collections to do the heavy lifting for you: it reduces the time needed to train a model and lets you use datasets larger than a single machine can hold.

### Pipelines

The Spark ML library uses DataFrames as its data source via the **ML Pipelines API**.
A Pipeline helps the developer create a sequence of data transformations.

Main Pipeline concepts:
* **DataFrame**: represents our data. Can hold a variety of data types (text, feature vectors, ground truth labels, predictions)
* **Transformer**: an algorithm that transforms one DataFrame into another DataFrame.
    Examples:
    * Normalizer: transforms raw data into normalized data
    * ML model: transforms a DataFrame into a DataFrame with predictions

* **Estimator**: an algorithm that can be fit on a DataFrame to produce a Transformer. Example:
    * Learning algorithm: takes a DataFrame to fit (the training set) and returns an ML model (a Transformer)

* **Pipeline**: a chain of Transformers and Estimators. A Pipeline represents an ML workflow

* **Parameter**: a common API, shared by all Transformers and Estimators, for specifying parameters
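
The concepts above can be sketched in a few lines. This example uses `Normalizer` as a Transformer, `StandardScaler` as an Estimator (chosen here only for illustration), and a `ParamMap` to override a parameter at call time. The data is made up, and the local session setup is an assumption so the cell is self-contained.

```scala
import org.apache.spark.ml.feature.{Normalizer, StandardScaler, StandardScalerModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.SparkSession

// Assumption: reuse the notebook's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("pipeline-concepts-sketch").getOrCreate()

// DataFrame: made-up rows with an id and a feature vector.
val raw = spark.createDataFrame(Seq(
  (0L, Vectors.dense(3.0, 4.0)),
  (1L, Vectors.dense(6.0, 8.0))
)).toDF("id", "features")

// Transformer: nothing to fit, transform() just adds a new column.
val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalized").setP(2.0)
val normalizedDf = normalizer.transform(raw)

// Estimator: fit() learns statistics from the data and returns a Transformer.
val scaler = new StandardScaler().setInputCol("features").setOutputCol("scaled")
val scalerModel: StandardScalerModel = scaler.fit(raw)
val scaledDf = scalerModel.transform(raw)

// Parameter: set via setters (as above) or passed per call with a ParamMap.
val l1Df = normalizer.transform(raw, ParamMap(normalizer.p -> 1.0))

normalizedDf.show(false)
scaledDf.show(false)
l1Df.show(false)
```

Note that `scaler` itself cannot transform data; only the `StandardScalerModel` returned by `fit()` can. That is the Estimator/Transformer split in practice.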

A Pipeline is a sequence of stages, each of which is either a Transformer or an Estimator. When the _fit()_ method is called on a Pipeline, the stages run in order:
* For each Transformer stage, _transform()_ is called on the DataFrame.
* For each Estimator stage, _fit()_ is called to produce a Transformer. If more Estimators follow, _transform()_ is also called on that new Transformer so that its output can be passed to the next stage.
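
To see what these rules mean in practice, the sketch below performs by hand roughly what _fit()_ does for a three-stage pipeline (Tokenizer, HashingTF, logistic regression). It is an illustration of the rules above under simplified assumptions, not Spark's actual implementation; the tiny dataset and variable names are made up, and the local session setup is an assumption so the cell is self-contained.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.SparkSession

// Assumption: reuse the notebook's session if present, otherwise start a local one.
val spark = SparkSession.builder().master("local[*]").appName("manual-pipeline-sketch").getOrCreate()

// Made-up labeled documents.
val docs = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0)
)).toDF("id", "text", "label")

val tok = new Tokenizer().setInputCol("text").setOutputCol("words")
val tf = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features")
val logReg = new LogisticRegression().setMaxIter(10)

// Transformer stages: transform() is applied and the result is passed on.
val withWords = tok.transform(docs)
val withFeatures = tf.transform(withWords)

// Estimator stage: fit() produces a Transformer (a LogisticRegressionModel).
val logRegModel = logReg.fit(withFeatures)

// Applying the fitted stages in order is what the resulting model does at prediction time.
val scored = logRegModel.transform(tf.transform(tok.transform(docs)))
scored.select("id", "text", "prediction").show(false)
```

A Pipeline saves you from writing this chaining yourself and keeps the training-time and prediction-time steps identical.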

##### Example Pipeline
before fit:
![before fit](https://spark.apache.org/docs/latest/img/ml-Pipeline.png)

1. Tokenizer — splits text into words
2. HashingTF — converts words to feature vectors (adding new column to Dataframe)
3. Logistic regression — classifies input vector

After calling _fit()_ on the Pipeline, it becomes a _PipelineModel_ (a Transformer).

after fit:
![after fit](https://spark.apache.org/docs/latest/img/ml-PipelineModel.png)

### Let's write an example pipeline :)

In [None]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")

val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001)

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
