# ML Pipeline example

## Setup and Initialization

In [1]:
// Reduce console noise for the rest of the notebook: ask the SparkContext (`sc`)
// to log at ERROR level only.
sc.setLogLevel("ERROR")

Intitializing Scala interpreter ...

Spark Web UI available at http://252f7d7c2f69:4040
SparkContext available as 'sc' (version = 2.4.2, master = local[*], app id = local-1559548536709)
SparkSession available as 'spark'


### Importing Libraries

In [2]:
// Import libs
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row


## Data Sources

### Training Set

In [3]:
// Labelled training set as a DataFrame with columns (id, text, label).
// In this toy data the rows whose text contains "spark" carry label 1.0, the others 0.0.
val training = {
  val labeledDocs = Seq(
    (0L, "a b c d e spark", 1.0),
    (1L, "b d b d b d b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)
  )
  spark.createDataFrame(labeledDocs).toDF("id", "text", "label")
}

// Print the rows to confirm the contents and column names.
training.show()

+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1| b d b d b d b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+



training: org.apache.spark.sql.DataFrame = [id: bigint, text: string ... 1 more field]


### Test Set

In [4]:
// Unlabelled test set as a DataFrame with columns (id, text); there is no label column
// because these are the documents the fitted model will be asked to classify.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n o p q"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// `show()` prints to the console, so it is called with explicit parentheses, matching
// `training.show()` above and avoiding reliance on auto-application of an empty-paren method.
test.show()

+---+------------------+
| id|              text|
+---+------------------+
|  4|       spark i j k|
|  5|       l m n o p q|
|  6|spark hadoop spark|
|  7|     apache hadoop|
+---+------------------+



test: org.apache.spark.sql.DataFrame = [id: bigint, text: string]


## Pipeline

### Transformer Components

In [5]:
// First of the three pipeline stages (tokenizer -> hashingTF -> lr):
// reads the "text" column and writes its tokens to a new "words" column.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")

tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_0b3ccea7c12a


In [6]:
// Preview the tokenizer on its own: the training rows are printed with the added "words" column.
tokenizer.transform(training).show()

+---+----------------+-----+--------------------+
| id|            text|label|               words|
+---+----------------+-----+--------------------+
|  0| a b c d e spark|  1.0|[a, b, c, d, e, s...|
|  1| b d b d b d b d|  0.0|[b, d, b, d, b, d...|
|  2|     spark f g h|  1.0|    [spark, f, g, h]|
|  3|hadoop mapreduce|  0.0| [hadoop, mapreduce]|
+---+----------------+-----+--------------------+



In [7]:
// Second pipeline stage: hashes the tokenizer's output column into a "features" vector.
// The input column is taken from the tokenizer itself so the two stages cannot drift apart;
// the feature count is kept at 20 for this small example.
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(20)

hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_e964a2089549


In [8]:
// Preview the first two stages chained by hand: tokenize the training text,
// then hash the tokens and print the result, including the new "features" column.
{
  val tokenized = tokenizer.transform(training)
  hashingTF.transform(tokenized).show()
}

+---+----------------+-----+--------------------+--------------------+
| id|            text|label|               words|            features|
+---+----------------+-----+--------------------+--------------------+
|  0| a b c d e spark|  1.0|[a, b, c, d, e, s...|(20,[1,2,5,10,14,...|
|  1| b d b d b d b d|  0.0|[b, d, b, d, b, d...|(20,[1,14],[4.0,4...|
|  2|     spark f g h|  1.0|    [spark, f, g, h]|(20,[2,5,8,17],[1...|
|  3|hadoop mapreduce|  0.0| [hadoop, mapreduce]|(20,[1,13],[1.0,1...|
+---+----------------+-----+--------------------+--------------------+



### Estimator Component

In [9]:
// Third pipeline stage, the estimator: logistic regression with a regularization
// parameter of 0.001 and at most 10 iterations.
val lr = new LogisticRegression()
  .setRegParam(0.001)
  .setMaxIter(10)

lr: org.apache.spark.ml.classification.LogisticRegression = logreg_1e8bbe3b6377


### The Pipeline

In [10]:
// Assemble the stages in the order they run: tokenizer -> hashingTF -> lr.
val pipeline: Pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

pipeline: org.apache.spark.ml.Pipeline = pipeline_fa791550969a


### Fitting The Pipeline

In [11]:
// Train on the labelled documents; fitting the Pipeline produces a PipelineModel.
val model: PipelineModel = pipeline.fit(training)

model: org.apache.spark.ml.PipelineModel = pipeline_fa791550969a


### Saving The Pipeline

In [12]:
// Optionally persist the fitted pipeline to disk; overwrite() is requested so that
// re-running this cell does not fail on a path that already exists.
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

In [13]:
// The unfitted Pipeline (the stage configuration, before any training) can be saved the same way.
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

## Evaluation

### Loading Pipeline

In [14]:
// Reload the fitted pipeline from the path it was saved to, as a production job would.
val sameModel: PipelineModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

sameModel: org.apache.spark.ml.PipelineModel = pipeline_fa791550969a


In [15]:
// Score the training set with the in-memory model and then with the copy reloaded from
// disk, printing one table per model; the two tables should match if the save/load
// round trip preserved the model.
// `select` already returns a DataFrame, so no `.toDF` conversion is needed before `show()`.
Seq(model, sameModel).foreach { fitted =>
  fitted.transform(training)
    .select("id", "text", "probability", "prediction", "label")
    .show()
}

+---+----------------+--------------------+----------+-----+
| id|            text|         probability|prediction|label|
+---+----------------+--------------------+----------+-----+
|  0| a b c d e spark|[0.00169244465497...|       1.0|  1.0|
|  1| b d b d b d b d|[0.99827725601696...|       0.0|  0.0|
|  2|     spark f g h|[0.00128335308370...|       1.0|  1.0|
|  3|hadoop mapreduce|[0.99700847846575...|       0.0|  0.0|
+---+----------------+--------------------+----------+-----+

+---+----------------+--------------------+----------+-----+
| id|            text|         probability|prediction|label|
+---+----------------+--------------------+----------+-----+
|  0| a b c d e spark|[0.00169244465497...|       1.0|  1.0|
|  1| b d b d b d b d|[0.99827725601696...|       0.0|  0.0|
|  2|     spark f g h|[0.00128335308370...|       1.0|  1.0|
|  3|hadoop mapreduce|[0.99700847846575...|       0.0|  0.0|
+---+----------------+--------------------+----------+-----+



### Evaluating Test Set

In [16]:
// Score the unlabelled test documents and print one formatted line per document.
// collect() brings the selected rows back as a local array; each Row is destructured
// into its four typed fields, rendered to text, and then printed.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .map { case Row(docId: Long, body: String, probs: Vector, label: Double) =>
    s"($docId, $body)\t --> prob=$probs\t prediction=$label"
  }
  .foreach(println)

(4, spark i j k)	 --> prob=[0.20800534226347644,0.7919946577365236]	 prediction=1.0
(5, l m n o p q)	 --> prob=[0.31872314837045623,0.6812768516295437]	 prediction=1.0
(6, spark hadoop spark)	 --> prob=[0.04813200761161769,0.9518679923883824]	 prediction=1.0
(7, apache hadoop)	 --> prob=[0.8822485586556412,0.11775144134435879]	 prediction=0.0
