Spark Machine Learning Library (MLlib)

* Objectives:
    * How to preprocess data with MLlib
    * How to munge data with MLlib
    * How to train models with MLlib
    * How to make predictions at scale on data with MLlib

1) **Machine Learning Library (MLlib)** - perform large-scale machine learning with a built-in library of machine learning algorithms
* Example: Retail Store with series of transformations
    * Prepping retail store data into numerical representation
    ```scala
    %scala
    import org.apache.spark.sql.functions.date_format
    val preppedDataFrame = staticDataFrame
        .na.fill(0)
        .withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
        .coalesce(5) // reduce to 5 partitions without a full shuffle: existing partitions are kept and data moves only off the partitions being dropped
    ```
    ```python
    %python
    from pyspark.sql.functions import date_format, col
    preppedDataFrame = staticDataFrame \
        .na.fill(0) \
        .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE")) \
        .coalesce(5)
    ```
    * Split the data into training and test sets manually, by the date on which a purchase occurred
        * Alternatively, MLlib's train-validation splits or cross-validation can be used to create the training/test sets
    ```scala
    %scala
    val trainDataFrame = preppedDataFrame
       .where("InvoiceDate < '2011-07-01'")
    val testDataFrame = preppedDataFrame
       .where("InvoiceDate >= '2011-07-01'")
    ```
    ```python
    %python
    trainDataFrame = preppedDataFrame \
        .where("InvoiceDate < '2011-07-01'")
    testDataFrame = preppedDataFrame \
        .where("InvoiceDate >= '2011-07-01'")
    ```
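
    The date-based split above can be illustrated without Spark. The following is a minimal pure-Python sketch, not the notebook's code: the sample rows and the `InvoiceNo` values are hypothetical, and only the cutoff date comes from the example above.

    ```python
    from datetime import date

    # Hypothetical sample rows; only the cutoff date comes from the example above.
    invoices = [
        {"InvoiceNo": "A1", "InvoiceDate": date(2011, 3, 14)},
        {"InvoiceNo": "A2", "InvoiceDate": date(2011, 6, 30)},
        {"InvoiceNo": "A3", "InvoiceDate": date(2011, 7, 1)},
        {"InvoiceNo": "A4", "InvoiceDate": date(2011, 11, 2)},
    ]

    cutoff = date(2011, 7, 1)

    # Everything strictly before the cutoff trains the model;
    # everything on or after it is held out for testing.
    train = [row for row in invoices if row["InvoiceDate"] < cutoff]
    test = [row for row in invoices if row["InvoiceDate"] >= cutoff]
    ```

    Because the two conditions are complementary, every row lands in exactly one of the two sets.
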
    * Turn the days of the week into corresponding numerical values (for example, Monday as 1 and Saturday as 6)
        * However, this numbering implicitly states that Saturday is greater than Monday purely by numerical value, which is incorrect
    ```scala
    %scala
    import org.apache.spark.ml.feature.StringIndexer
    val indexer = new StringIndexer()
        .setInputCol("day_of_week")
        .setOutputCol("day_of_week_index")
    ```
    ```python
    %python
    from pyspark.ml.feature import StringIndexer
    indexer = StringIndexer() \
        .setInputCol("day_of_week") \
        .setOutputCol("day_of_week_index")
    ```
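
    To make the indexing step concrete, here is a minimal pure-Python sketch of label indexing. It assumes labels are ordered by descending frequency (the default `frequencyDesc` ordering of `StringIndexer`) with ties broken alphabetically; the helper name `fit_string_index` and the sample data are hypothetical.

    ```python
    from collections import Counter

    def fit_string_index(labels):
        """Map each distinct label to an index, most frequent label first."""
        counts = Counter(labels)
        # Sort by descending count, then alphabetically to break ties (an assumption of this sketch).
        ordered = sorted(counts, key=lambda label: (-counts[label], label))
        return {label: float(i) for i, label in enumerate(ordered)}

    # Hypothetical sample of the day_of_week column.
    days = ["Monday", "Saturday", "Monday", "Friday", "Saturday", "Monday"]

    day_index = fit_string_index(days)
    indexed = [day_index[d] for d in days]
    ```

    Under this ordering the index reflects how often a day appears, not its position in the week, which is one more reason the raw index should not be treated as an ordered quantity.
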
    * To fix this, one-hot encode the index so that each day of the week becomes its own column of boolean flags
    ```scala
    %scala
    import org.apache.spark.ml.feature.OneHotEncoder
    val encoder = new OneHotEncoder()
        .setInputCol("day_of_week_index")    // numeric index produced by the StringIndexer
        .setOutputCol("day_of_week_encoded") // column consumed by the VectorAssembler
    ```
    ```python
    %python
    from pyspark.ml.feature import OneHotEncoder
    # Reads the StringIndexer output and writes the column the VectorAssembler expects
    encoder = OneHotEncoder() \
        .setInputCol("day_of_week_index") \
        .setOutputCol("day_of_week_encoded")
    ```
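
    As an illustration of what the boolean flags look like, here is a minimal pure-Python sketch of one-hot encoding a category index. The helper `one_hot` is hypothetical; its `drop_last` argument is modeled on the encoder's `dropLast` parameter, under the assumption that the last category is represented by an all-zero vector when it is enabled.

    ```python
    def one_hot(index, num_categories, drop_last=True):
        """Return a list of 0.0/1.0 flags with at most one 1.0, at position `index`."""
        # With drop_last, the final category gets no slot and is encoded as all zeros.
        size = num_categories - 1 if drop_last else num_categories
        vector = [0.0] * size
        if index < size:
            vector[index] = 1.0
        return vector

    first = one_hot(0, 3)                       # first of three categories
    last = one_hot(2, 3)                        # last category, dropped slot
    last_kept = one_hot(2, 3, drop_last=False)  # last category, all slots kept
    ```

    No flag is numerically "greater" than another, so the encoding removes the false ordering introduced by the index.
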
    * Each of these will result in a set of columns that we will "assemble" into a vector. All machine learning algorithms in Spark take as input a `Vector` type, which must be a set of numerical values.
    ```scala
    %scala
    import org.apache.spark.ml.feature.VectorAssembler
    val vectorAssembler = new VectorAssembler()
        .setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
        .setOutputCol("features")
    ```
    ```python
    %python
    from pyspark.ml.feature import VectorAssembler
    vectorAssembler = VectorAssembler() \
        .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"]) \
        .setOutputCol("features")
    ```
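
    Conceptually, assembling means concatenating the chosen columns into one flat numeric vector per row. The sketch below shows that idea in plain Python; the helper `assemble` and the sample row are hypothetical and do not reproduce Spark's `Vector` type.

    ```python
    def assemble(row, input_cols):
        """Concatenate scalar and list-valued columns into one flat feature list."""
        features = []
        for name in input_cols:
            value = row[name]
            if isinstance(value, (list, tuple)):
                # A vector-valued column (e.g. a one-hot encoding) contributes all its entries.
                features.extend(float(v) for v in value)
            else:
                features.append(float(value))
        return features

    # Hypothetical row after indexing and one-hot encoding.
    row = {"UnitPrice": 2.55, "Quantity": 6, "day_of_week_encoded": [0.0, 1.0, 0.0]}
    features = assemble(row, ["UnitPrice", "Quantity", "day_of_week_encoded"])
    ```
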
    * Here we have three key features: the price, the quantity, and the day of week. Now we’ll set this up as a pipeline so that any future data we need to transform can go through the exact same process.
    ```scala
    %scala
    import org.apache.spark.ml.Pipeline
    val transformationPipeline = new Pipeline()
        .setStages(Array(indexer, encoder, vectorAssembler))
    ```
    ```python
    %python
    from pyspark.ml import Pipeline
    transformationPipeline = Pipeline() \
        .setStages([indexer, encoder, vectorAssembler])
    ```
    * Preparing for training is a two-step process. We first fit our transformers to the training dataset. Once they are fitted, we are ready to take that fitted pipeline and use it to transform all of our data in a consistent and repeatable way.
    ```scala
    %scala
    val fittedPipeline = transformationPipeline.fit(trainDataFrame)
    val transformedTraining = fittedPipeline.transform(trainDataFrame)
    ```
    ```python
    %python
    fittedPipeline = transformationPipeline.fit(trainDataFrame)
    transformedTraining = fittedPipeline.transform(trainDataFrame)
    ```
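
    The fit-then-transform pattern can be sketched in plain Python. The classes `LabelIndexer` and `LabelIndexerModel` below are hypothetical stand-ins, not Spark classes: `fit` learns a mapping from the training data only, and the resulting model applies that same mapping to any dataset.

    ```python
    class LabelIndexerModel:
        """Hypothetical fitted model: applies the mapping learned during fit()."""

        def __init__(self, mapping):
            self.mapping = mapping

        def transform(self, labels):
            return [self.mapping[label] for label in labels]


    class LabelIndexer:
        """Hypothetical estimator: learns a label-to-index mapping in fit()."""

        def fit(self, labels):
            distinct = sorted(set(labels))
            return LabelIndexerModel({label: i for i, label in enumerate(distinct)})


    train_labels = ["Monday", "Friday", "Monday"]
    test_labels = ["Friday", "Friday", "Monday"]

    model = LabelIndexer().fit(train_labels)   # step 1: learn from training data only
    train_out = model.transform(train_labels)  # step 2: apply the learned mapping
    test_out = model.transform(test_labels)    # the same mapping is reused on new data
    ```

    Because the mapping is learned once and then frozen, training and test rows with the same label always receive the same index.
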
    * It’s worth mentioning that we could have included our model training in our pipeline. We chose not to in order to demonstrate a use case for caching the data. We are about to perform some hyperparameter tuning on the model, and since we do not want to repeat the exact same transformations over and over again, we’ll instead cache our training set. Putting it into memory lets us efficiently and repeatedly access it in an already transformed state. If you’re curious to see how much of a difference this makes, skip this line and run the training without caching the data, then try it after caching; the difference is significant.
    ```scala
    %scala
    transformedTraining.cache()
    ```
    ```python
    %python
    transformedTraining.cache()
    ```
    * Now that we have a training set, it’s time to train the model. First we initialize an untrained model, then we train it. Every algorithm in MLlib’s DataFrame API has two types: the untrained algorithm (here `KMeans`) and the trained version (here `KMeansModel`). We can then compute the resulting cost, which is quite high, likely because we did not scale or otherwise transform our data.
    ```scala
    %scala
    import org.apache.spark.ml.clustering.KMeans
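    val kmeans = new KMeans()
        .setK(20)     // number of clusters
        .setSeed(1L)  // fixed seed for reproducible initialization
    val kmModel = kmeans.fit(transformedTraining)
    // Cost = sum of squared distances from each point to its nearest cluster center.
    // Note: computeCost was deprecated in Spark 2.4; on Spark 3.x use ClusteringEvaluator instead.
    kmModel.computeCost(transformedTraining)
    // Apply the pipeline fitted on the training data to the test data, then score it
    val transformedTest = fittedPipeline.transform(testDataFrame)
    kmModel.computeCost(transformedTest)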
    ```
    ```python
    %python
    from pyspark.ml.clustering import KMeans
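    # k = 20 clusters; fixed seed for reproducible initialization
    # (Python 3 has no `1L` long literal, so the seed is a plain int)
    kmeans = KMeans() \
        .setK(20) \
        .setSeed(1)
    kmModel = kmeans.fit(transformedTraining)
    # Note: computeCost was deprecated in Spark 2.4; on Spark 3.x use ClusteringEvaluator instead
    kmModel.computeCost(transformedTraining)
    # Apply the pipeline fitted on the training data to the test data, then score it
    transformedTest = fittedPipeline.transform(testDataFrame)
    kmModel.computeCost(transformedTest)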
    ```
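
    To show what the cost measures and why unscaled features can inflate it, here is a minimal pure-Python sketch. It assumes the cost is the sum of squared Euclidean distances from each point to its nearest cluster center; the helper names and the sample points are hypothetical.

    ```python
    def squared_distance(a, b):
        """Squared Euclidean distance between two equal-length vectors."""
        return sum((x - y) ** 2 for x, y in zip(a, b))

    def kmeans_cost(points, centers):
        """Sum of squared distances from each point to its nearest center."""
        return sum(min(squared_distance(p, c) for c in centers) for p in points)

    # Hypothetical 2-D points and cluster centers.
    points = [[0.0, 0.0], [0.0, 2.0], [10.0, 10.0]]
    centers = [[0.0, 1.0], [10.0, 10.0]]
    cost = kmeans_cost(points, centers)

    def stretch(v):
        """Multiply the second feature by 10, as if it were recorded in a larger unit."""
        return [v[0], v[1] * 10]

    # Same clustering, but with one feature on a 10x larger scale.
    scaled_cost = kmeans_cost([stretch(p) for p in points], [stretch(c) for c in centers])
    ```

    In this sketch, stretching one feature by a factor of 10 multiplies its contribution to the cost by 100 even though the cluster assignments are unchanged, which is consistent with the high cost seen on unscaled data.
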