<h1>Machine Learning Library (MLlib)</h1>

[MLlib](http://spark.apache.org/docs/latest/ml-guide.html) is Spark’s machine learning (ML) library. It provides:

- *ML Algorithms*: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- *Featurization*: feature extraction, transformation, dimensionality reduction, and selection
- *Pipelines*: tools for constructing, evaluating, and tuning ML Pipelines
- *Persistence*: saving and loading algorithms, models, and Pipelines
- *Utilities*: linear algebra, statistics, data handling, etc.

As usual, we set the Spark version, add the dependencies to the classpath and import the classes we need, this time including <tt>MLlib</tt>.

In [None]:
val sparkVersion = "2.0.1"
val scalaVersion = scala.util.Properties.versionNumberString

In [None]:
classpath.add(
    "org.apache.spark" %% "spark-yarn" % sparkVersion,
    "org.apache.spark" %% "spark-mllib" % sparkVersion
)

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.util.MLUtils

// imports for the text document pipeline
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, StopWordsRemover, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

In [None]:
// Create Spark session
val sparkSession = SparkSession.builder
    .master("local[1]")
    .appName("Spark dataframes and datasets")
    .getOrCreate()

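<tt>MLlib</tt> provides standardized APIs for machine learning algorithms, which makes it easy to combine several algorithms into a single pipeline. The key concepts are:

- **DataFrame**. The dataset used by ML algorithms; its columns can hold a variety of data types, such as text, feature vectors, labels and predictions.
- **Transformer**. An algorithm that transforms one DataFrame into another, e.g. a trained model that adds a column of predictions.
- **Estimator**. An algorithm that can be fit on a DataFrame to produce a Transformer, e.g. a learning algorithm that trains on a DataFrame and produces a model.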
- **Pipeline**. A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
- **Parameter**. Transformers and Estimators share a common API for specifying parameters.

These are described in more detail below; a list of the available feature extractors and transformers is given in the [MLlib feature documentation](http://spark.apache.org/docs/latest/ml-features.html).

<h2>Datasets and Dataframes</h2>

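Along with the introduction of <tt>SparkSession</tt>, Spark 2.0 made the [Dataset](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) the main programming interface in place of the <tt>resilient distributed dataset</tt> (RDD), although RDDs are still available underneath. A DataFrame is simply a Dataset of generic <tt>Row</tt> objects (<tt>Dataset[Row]</tt>). As with RDDs, these are collections that can be processed in parallel. The available operations are:

- **transformations**: produce new Datasets from existing ones; they are lazy, so nothing is computed until an action is called
- **actions**: trigger the computation and return a result to the driver program (or write it out)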
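As a minimal sketch of the difference (the numbers are arbitrary): <tt>filter</tt> and <tt>select</tt> below are transformations and only describe a new Dataset, while <tt>count()</tt> and <tt>collect()</tt> are actions that make Spark run the job. The cell builds its own session with <tt>getOrCreate()</tt>, which returns the existing one if the notebook has already created it.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[1]").appName("Transformations and actions").getOrCreate()
import spark.implicits._

// range() creates a Dataset with a single "id" column holding 1, 2, ..., 10
val nums = spark.range(1, 11)

// Transformations: each returns a new Dataset; nothing is computed yet
val evens = nums.filter($"id" % 2 === 0)
val doubled = evens.select(($"id" * 2).as("doubled"))

// Actions: these trigger the computation and return results to the driver
val howMany = doubled.count()            // 5
val values = doubled.as[Long].collect()  // Array(4, 8, 12, 16, 20)
```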

We start by creating DataFrames and Datasets and printing their contents. The cell below creates a DataFrame from a JSON file and prints some information about it; as it shows, we can also select, modify, filter and group the data before printing:

In [None]:
// create a dataframe based on the contents of a JSON file
val peopleDF = sparkSession.read.json("files/people.json")

peopleDF.show()

// Print the schema in a tree format
peopleDF.printSchema()

// Select only the "name" column
peopleDF.select("name").show()

// This import is needed to use the $-notation
import sparkSession.implicits._

// Select everybody, but increment the age by 1
peopleDF.select($"name", $"age" + 1).show()

// Select people older than 21
peopleDF.filter($"age" > 21).show()

// Count people by age
peopleDF.groupBy("age").count().show()

The cell below shows a Dataset example:

In [None]:
// create a Dataset with sparkSession.range: 5, 10, ..., 95 (the end value 100 is exclusive)
val numDS = sparkSession.range(5, 100, 5)

// order by the "id" column (ascending) and show the first 5 rows
numDS.orderBy("id").show(5)

import sparkSession.implicits._

numDS.orderBy($"id".desc).show(5)

// compute descriptive stats and display them
numDS.describe().show()

Another DataFrame example, showing how to rename columns and refer to them:

In [None]:
// create a DataFrame using sparkSession.createDataFrame from a List or Seq
val langPercentDF = sparkSession.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

// rename the columns
val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")

// order the DataFrame in descending order of percentage
lpDF.orderBy($"percent".desc).show(false)

<h3>Reading text</h3>

Aside from creating a Dataset by transforming an existing one, we can also read data from a file directly, here from a CSV file:

In [None]:
// Read a CSV file, taking the column names from its header line
val dfCrime = sparkSession.read.option("header","true").csv("files/SacramentocrimeJanuary2006.csv")
dfCrime.show()
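By default the CSV reader treats every column as a string, so all columns of <tt>dfCrime</tt> above are strings. A minimal sketch of the <tt>inferSchema</tt> option, using a small temporary file with hypothetical columns <tt>code</tt> and <tt>district</tt> rather than the Sacramento data:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[1]").appName("CSV schema").getOrCreate()

// Write a tiny CSV file to a temporary location (hypothetical columns and values)
val csvPath = Files.createTempFile("crime-sample", ".csv")
Files.write(csvPath, "code,district\n2404,3\n2299,5\n2404,1\n".getBytes(StandardCharsets.UTF_8))

// Without inferSchema every column is read as a string
val asStrings = spark.read.option("header", "true").csv(csvPath.toString)
asStrings.printSchema()

// With inferSchema Spark makes an extra pass over the data to guess the column types
val typed = spark.read.option("header", "true").option("inferSchema", "true").csv(csvPath.toString)
typed.printSchema()
```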

To read plain text as a typed Dataset of lines, we need the implicits <tt>import</tt>, which provides the encoder used by <tt>.as[String]</tt>. Once the text is read in, we can compute line lengths, the total length of the text, or anything else we need:

In [None]:
// Read a plain text file
import sparkSession.implicits._

// read.text returns a DataFrame with one "value" column; .as[String] turns it into a Dataset[String]
val bookDS = sparkSession.read.text("files/TaleOfTwoCities.txt").as[String]
bookDS.show()

val lineLengths = bookDS.map(s => s.length)

// Uncomment to keep lineLengths in memory if it will be reused by several actions
// lineLengths.persist()

val totalLength = lineLengths.reduce((a, b) => a + b)
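The same total can also be computed with Spark's built-in column functions instead of Scala lambdas. A minimal sketch, using two hypothetical lines in place of the book; both totals should be equal:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, sum}

val spark = SparkSession.builder.master("local[1]").appName("Text length").getOrCreate()
import spark.implicits._

// A small in-memory stand-in for the book; its single column is called "value"
val lines = Seq("It was the best of times,", "it was the worst of times").toDS()

// Typed API: a Scala lambda per element, as in the cell above
val typedTotal = lines.map(_.length).reduce(_ + _)

// Untyped API: built-in column functions; sum() of an integer column is a Long
val untypedTotal = lines.select(sum(length($"value"))).first().getLong(0)
```

The typed version is closer to the RDD style, while the column-function version lets Spark's optimizer see the whole computation.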

<h3>Transformations</h3>

We create new Datasets from existing ones using **transformations**. A list of common transformations is given in the [Spark programming guide](http://spark.apache.org/docs/latest/programming-guide.html#transformations); some examples follow:

In [None]:
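// split each line on runs of whitespace, giving one row per word
val words = bookDS.flatMap(value => value.split("\\s+"))
words.show()

// group the words case-insensitively; this is lazy and returns a KeyValueGroupedDataset
val groupedWords = words.groupByKey(_.toLowerCase)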

<h3>Actions</h3>

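Some of the most common actions are listed in the [Spark programming guide](http://spark.apache.org/docs/latest/programming-guide.html#actions). For example, on a Dataset, <tt>count()</tt> is an action that returns the number of elements. Calling <tt>count()</tt> on the grouped words below is different: it returns a new Dataset of (word, count) pairs, and it is <tt>show()</tt> that triggers the computation.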

In [None]:
val counts = groupedWords.count()
counts.show()
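The grouped counts can be turned into a frequency table by naming the columns and sorting. A minimal sketch on two hypothetical lines (not the book); the secondary sort on <tt>word</tt> only makes the order of ties deterministic:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[1]").appName("Word frequencies").getOrCreate()
import spark.implicits._

// Hypothetical input lines standing in for the book
val lines = Seq("It was the best of times", "it was the worst of times").toDS()

val wordCounts = lines
  .flatMap(_.split("\\s+"))        // transformation: one row per word
  .filter(_.nonEmpty)              // drop empty tokens caused by leading whitespace
  .groupByKey(_.toLowerCase)       // transformation: group by lower-cased word
  .count()                         // transformation: Dataset[(String, Long)]
  .toDF("word", "count")           // give the columns readable names
  .orderBy($"count".desc, $"word") // most frequent first, ties alphabetically

// Actions: show() prints rows, take(n) returns the first n rows to the driver
wordCounts.show()
val top3 = wordCounts.as[(String, Long)].take(3)
```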

<h2>Pipelines</h2>

It is common to run a sequence of algorithms on the same data. MLlib lets us encode such a sequence as a [pipeline](http://spark.apache.org/docs/latest/ml-pipeline.html), which takes care of passing the output of each stage to the next.

We demonstrate a simple pipeline using the task of stop word removal.

In [None]:
// Prepare dataset consisting of (id, text) tuples.
val dataSet = sparkSession.createDataFrame(Seq(
  (0, "I saw the red balloon"),
  (1, "Mary had a little lamb")
)).toDF("id", "text")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val wordsData = tokenizer.transform(dataSet)
wordsData.select("words").show()

As you may have noticed in the previous notebook's exercises, the most common words in a text are often words such as *and* and *so*. These *stop words* carry little information and can be removed. Our pipeline has two stages:

1. tokenizer
2. stop word removal

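The two stages run in that order, and the input DataFrame is transformed as it passes through each of them. Both stages are Transformers, so <tt>fit()</tt> has nothing to train: it simply returns a <tt>PipelineModel</tt> containing the two stages, whose <tt>transform()</tt> method applies each stage's <tt>transform()</tt> to the DataFrame in turn.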

In [None]:
// Configure an ML pipeline, which consists of two stages: tokenizer and remover.

val tokenizer = new Tokenizer()
    .setInputCol("text")
    .setOutputCol("words")

val remover = new StopWordsRemover()
    .setInputCol("words")
    .setOutputCol("filtered")

val pipeline = new Pipeline()
  .setStages(Array(tokenizer,remover))

val model = pipeline.fit(dataSet)
val result = model.transform(dataSet)
result.show()
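<tt>StopWordsRemover</tt> uses a default English stop-word list unless told otherwise. A minimal sketch showing how to inspect that list with <tt>StopWordsRemover.loadDefaultStopWords</tt> (available since Spark 2.0) and extend it with <tt>setStopWords</tt>; treating <tt>lamb</tt> as a stop word is purely illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{StopWordsRemover, Tokenizer}

val spark = SparkSession.builder.master("local[1]").appName("Stop words").getOrCreate()
import spark.implicits._

// Spark ships default stop-word lists; this loads the English one
val english = StopWordsRemover.loadDefaultStopWords("english")
println(s"${english.length} default English stop words, e.g. ${english.take(5).mkString(", ")}")

// Extend the defaults with a hypothetical domain-specific stop word
val remover = new StopWordsRemover()
  .setInputCol("words")
  .setOutputCol("filtered")
  .setStopWords(english :+ "lamb")

// Tokenizer lower-cases the text and splits it on whitespace
val docs = Seq((0, "Mary had a little lamb")).toDF("id", "text")
val tokens = new Tokenizer().setInputCol("text").setOutputCol("words").transform(docs)
val filtered = remover.transform(tokens).select("filtered").as[Seq[String]].head()
```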

To see the full power of pipelines, we present a second example, one that includes an Estimator: logistic regression. This pipeline has three stages:

1. split each document's text into words (<i>tokenizer</i>)
2. convert each document's words into a feature vector (<i>hashingTF</i>)
3. learn a prediction model using the feature vectors and labels (<i>logistic regression</i>)

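For Estimator stages, the <tt>fit()</tt> method is called to produce a Transformer, and that Transformer’s <tt>transform()</tt> method is called on the DataFrame. Fitting the whole pipeline therefore returns a <tt>PipelineModel</tt>, which is itself a Transformer; below we use it to make predictions on unlabeled test documents.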
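Stage 2 relies on the hashing trick: rather than building a vocabulary, each term is mapped to a column index by hashing it modulo the number of features, and the vector holds the term counts. The plain-Scala sketch below only illustrates the idea. It uses <tt>scala.util.hashing.MurmurHash3.stringHash</tt> as an assumed stand-in hash, so its indices will not match those produced by Spark's <tt>HashingTF</tt>; <tt>hashingTF</tt> here is a hypothetical helper, not a Spark API.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical helper: map each term to an index in [0, numFeatures) and count occurrences.
// Returns a sparse representation: index -> term frequency.
def hashingTF(terms: Seq[String], numFeatures: Int): Map[Int, Double] = {
  // % can be negative for negative hashes, so shift the result into [0, mod)
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }
  terms
    .groupBy(term => nonNegativeMod(MurmurHash3.stringHash(term), numFeatures))
    .map { case (index, hits) => index -> hits.size.toDouble }
}

val features = hashingTF("a b c d e spark".split(" ").toSeq, 1000)
```

Different terms can hash to the same index, in which case their counts are added together; a larger <tt>setNumFeatures</tt> value makes such collisions less likely.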

In [None]:
// Prepare training documents from a list of (id, text, label) tuples.
val training = sparkSession.createDataFrame(Seq(
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
    .setInputCol("text")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

val lr = new LogisticRegression()
    .setMaxIter(10)
    .setRegParam(0.01)

val pipeline = new Pipeline()
    .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = sparkSession.createDataFrame(Seq(
    (4L, "spark i j k"),
    (5L, "l m n"),
    (6L, "mapreduce spark"),
    (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
    .select("id", "text", "probability", "prediction")
    .collect()
    .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
    }
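The introduction listed *Persistence* among MLlib's features. A minimal sketch: a fitted <tt>PipelineModel</tt> (here the two-stage stop-word pipeline, to keep the cell short) is saved to a temporary directory and loaded back with <tt>PipelineModel.load</tt>. The temporary path is only for illustration; in practice it would be a location that later jobs can read.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StopWordsRemover, Tokenizer}

val spark = SparkSession.builder.master("local[1]").appName("Pipeline persistence").getOrCreate()
import spark.implicits._

val docs = Seq((0, "I saw the red balloon"), (1, "Mary had a little lamb")).toDF("id", "text")

val pipeline = new Pipeline().setStages(Array(
  new Tokenizer().setInputCol("text").setOutputCol("words"),
  new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
))
val fitted = pipeline.fit(docs)

// Save the fitted PipelineModel under a fresh temporary directory (illustrative location)
val modelPath = Files.createTempDirectory("mllib-pipeline").resolve("model").toString
fitted.write.overwrite().save(modelPath)

// Load it back and use it exactly like the original model
val reloaded = PipelineModel.load(modelPath)
reloaded.transform(docs).select("id", "filtered").show(false)
```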

<h2>Exercises</h2>

<h3>Exercise 1</h3>

In the CSV file above, [<tt>SacramentocrimeJanuary2006.csv</tt>](http://samplecsvs.s3.amazonaws.com/SacramentocrimeJanuary2006.csv), the <tt>ucr_ncic_code</tt> column represents the type of crime committed. Use any transformations and actions to output the crime types in descending order of frequency. You should write this as a standalone program.

<h3>Exercise 2</h3>

As well as the "[TaleOfTwoCities.txt](files/TaleOfTwoCities.txt)", the files directory contains the file "[GreatExpectations.txt](files/GreatExpectations.txt)". Read in both files, and find the top 20 most frequent (overall) words that appear in both documents. (You will need to convert the documents to lower case, but you can assume that ends of line and whitespace indicate word boundaries.)

<h3>Exercise 3</h3>

There are [many transformers and estimators](http://spark.apache.org/docs/latest/ml-features.html) implemented in Spark that can be used as pipeline stages. Create a pipeline that prints the n-grams of the [TaleOfTwoCities.txt](files/TaleOfTwoCities.txt) and [GreatExpectations.txt](files/GreatExpectations.txt) files.