## Background

This notebook demonstrates the code required to train two models (linear regression and random forest)
and deploy them to an MLeap server.

The dataset used for the demo was pulled together from individual cities' data found here: http://insideairbnb.com/get-the-data.html

The sample code has the following sections:
* Step 1: Load Data: Can be done from a flat file or from an S3 path
* Step 2: Define Dependent and Independent (continuous and categorical) variables + Prep the data
* Step 3: Train a linear regression and random forest model
* Step 4: Convert the Spark Model -> MLeap Model
* Step 5: Save the serialized models to file system
* Step 6: Start MLeap Server and run sample requests against the models

In [5]:
// imports
import java.io.File
import com.esotericsoftware.kryo.io.Output
import com.truecar.mleap.serialization.ml.v1.MlJsonSerializer
import com.truecar.mleap.runtime.transformer.Transformer
import com.truecar.mleap.spark.MleapSparkSupport._
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.{RandomForestRegressor, LinearRegression}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import ml.bundle.fs.DirectoryBundle

### Step 1: Load Data - Can be done from a flat file or from an S3 path

In [6]:
// Step 1. Load the Airbnb dataset and choose where the serialized models will be written

// Location of the Avro input, handed to the reader unchanged
val inputFile = "file:///tmp/airbnb"

// Directories that will receive the random forest and linear regression pipelines
val outputFileRf = "/tmp/transformer.rf.ml"
val outputFileLr = "/tmp/transformer.lr.ml"

// Kept as a var: Step 2 reassigns it after selecting columns and filtering nulls
var dataset = sqlContext.read.format("com.databricks.spark.avro").load(inputFile)

### Step 2: Define Dependent and Independent (continuous and categorical) variables + Prep the Data

In [4]:
// Step 2. Build the feature pipeline and fit it on the entire (null-filtered) dataset

// Numeric input columns; assembled into a single vector and then scaled
val continuousFeatures = Array(
  "bathrooms",
  "bedrooms",
  "security_deposit",
  "cleaning_fee",
  "extra_people",
  "number_of_reviews",
  "review_scores_rating"
)

// String input columns; each one gets its own StringIndexer below
val categoricalFeatures = Array(
  "room_type",
  "host_is_superhost",
  "cancellation_policy",
  "instant_bookable"
)

val allFeatures = continuousFeatures ++ categoricalFeatures

// Keep only the feature columns plus the "price" label, and drop every row
// that has a null in any of those columns
val allCols = (allFeatures :+ "price").map(dataset.col)
val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
dataset = dataset.select(allCols: _*).filter(nullFilter).persist()

// 70/30 random split; no seed is given, so the split differs between runs
val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array(0.7, 0.3))

// Continuous branch: assemble the raw columns, then scale the assembled vector
val continuousFeatureAssembler = new VectorAssembler().
  setInputCols(continuousFeatures).
  setOutputCol("unscaled_continuous_features")
val continuousFeatureScaler = new StandardScaler().
  setInputCol("unscaled_continuous_features").
  setOutputCol("scaled_continuous_features")

// Categorical branch: one indexer per column, writing to "<column>_index"
val categoricalFeatureIndexers = categoricalFeatures.map { feature =>
  new StringIndexer().
    setInputCol(feature).
    setOutputCol(s"${feature}_index")
}

// Final feature vector: indexed categoricals first, scaled continuous vector last
val featureCols = categoricalFeatureIndexers.map(_.getOutputCol) :+ "scaled_continuous_features"
val featureAssembler = new VectorAssembler().
  setInputCols(featureCols).
  setOutputCol("features")

// Stage order: continuous assembler, scaler, categorical indexers, final assembler
val estimators: Array[PipelineStage] =
  Array[PipelineStage](continuousFeatureAssembler, continuousFeatureScaler) ++
    categoricalFeatureIndexers ++
    Array[PipelineStage](featureAssembler)

val featurePipeline = new Pipeline().setStages(estimators)
val sparkFeaturePipelineModel = featurePipeline.fit(dataset)

Name: Compile Error
Message: <console>:48: error: not found: type PipelineStage
         val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler).
                               ^
StackTrace: 

### Step 3: Train a linear regression and random forest model

In [4]:
// Step 3.1 Train the random forest model
// Reads the assembled "features" vector, learns "price", writes "price_prediction"
val randomForest = new RandomForestRegressor().
  setFeaturesCol("features").
  setLabelCol("price").
  setPredictionCol("price_prediction")

// Chain the already-fitted feature pipeline with the regressor, then fit on the training split
val sparkPipelineEstimatorRf = new Pipeline().
  setStages(Array(sparkFeaturePipelineModel, randomForest))
val sparkPipelineRf = sparkPipelineEstimatorRf.fit(trainingDataset)

In [5]:
// Step 3.2 Create our linear regression model
// The Spark ML estimator is LinearRegression (imported from org.apache.spark.ml.regression);
// it reads the assembled "features" vector, learns "price", and writes "price_prediction".
val linearRegression = new LinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("price")
    .setPredictionCol("price_prediction")
// Chain the already-fitted feature pipeline with the regressor, then fit on the training split
val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, linearRegression))
val sparkPipelineLr = sparkPipelineEstimatorLr.fit(trainingDataset)

### Step 4: Convert the Spark Model -> MLeap Model

In [6]:
// Step 4.1 Assemble the final pipeline (random forest) by implicit conversion to MLeap models
// The explicit Transformer type is what triggers the conversion of the fitted Spark pipeline;
// the implicit is presumably provided by the MleapSparkSupport._ import (TODO confirm).
val mleapPipelineRf: Transformer = sparkPipelineRf

In [7]:
// Step 4.2 Assemble the final pipeline (linear regression) by implicit conversion to MLeap models
// The explicit Transformer type is what triggers the conversion of the fitted Spark pipeline;
// the implicit is presumably provided by the MleapSparkSupport._ import (TODO confirm).
val mleapPipelineLr: Transformer = sparkPipelineLr

### Step 5: Save the serialized models to file system

In [8]:
// Step 5. Save our MLeap pipelines to directories on the local file system
// One target directory per model, using the paths chosen in Step 1
val mleapFileRf = new File(outputFileRf)
val mleapFileLr = new File(outputFileLr)

// if you want to save to S3
// val bundleWriter = S3BundleWriter(s3Path)
// NOTE(review): DirectoryBundle presumably just wraps the directory handle, since the
// directories are only created below — confirm it does not touch the file system here
val bundleWriterRf = DirectoryBundle(mleapFileRf)
val bundleWriterLr = DirectoryBundle(mleapFileLr)

// Create the target directories (including missing parents); the boolean result is ignored
mleapFileRf.mkdirs()
mleapFileLr.mkdirs()

val serializer = MlJsonSerializer

// Serialize each MLeap pipeline through its bundle writer
serializer.serializeWithClass(mleapPipelineRf, bundleWriterRf)
serializer.serializeWithClass(mleapPipelineLr, bundleWriterLr)

### Step 6: Start MLeap Server and run sample requests against the models

In [None]:
// sbt "server/run /tmp/transformer.rf.ml 8080"
// sbt "server/run /tmp/transformer.lr.ml 8081"
// curl -v -XPOST \
//   -H "content-type: application/json" \
//   -d @/Users/hollinwilkins/Workspace/scratch/frame.json http://localhost:8080/transform
// curl -v -XPOST \
//   -H "content-type: application/json" \
//   -d @/Users/hollinwilkins/Workspace/scratch/frame.json http://localhost:8081/transform

In [None]:
/*
{
  "schema": {
    "fields": [{
      "name": "bathrooms",
      "dataType": "double"
    }, {
      "name": "bedrooms",
      "dataType": "double"
    }, {
      "name": "security_deposit",
      "dataType": "double"
    }, {
      "name": "cleaning_fee",
      "dataType": "double"
    }, {
      "name": "extra_people",
      "dataType": "double"
    }, {
      "name": "number_of_reviews",
      "dataType": "double"
    }, {
      "name": "review_scores_rating",
      "dataType": "double"
    }, {
      "name": "room_type",
      "dataType": "string"
    }, {
      "name": "host_is_superhost",
      "dataType": "string"
    }, {
      "name": "cancellation_policy",
      "dataType": "string"
    }, {
      "name": "instant_bookable",
      "dataType": "string"
    }]
  },
  "rows": [[2.0, 3.0, 50.0, 30.0, 2.0, 56.0, 90.0, "Entire home/apt", "1.0", "strict", "1.0"]]
}
*/