# Develop a Scala Spark Model on Chicago Building Violations


Combine the strengths of Data Science Experience Cloud and DSX Local to model `Building Violations` in Chicago.
___________
This notebook runs on Scala 2.11 and Spark 2.0.

This notebook can be used as a companion to another [tutorial on our blog](https://medium.com/@adammassachi/dsx-hybrid-mode-91b580450c5b).   

* The data are <a href=https://data.cityofchicago.org/Buildings/Building-Violations/22u3-xenr target="_blank" rel="noopener noreferrer">Violations issued by the Chicago Department of Buildings</a>
 over the period from 2006 until present. The dataset contains instances of `violations`. Each violation is associated with an `inspection` and an `inspection status`. 


* Using Spark Machine Learning, we're going to develop a model for the data from 2006-2016 which provides a score in the interval $[0,1]$ for how likely we believe an individual building is to `Pass` or `Fail` an inspection. 

Download the data and bring it into your DSX project. 
______________
## Table of contents
1. [Wrangle data](#wrangle)
2. [Build a pipeline](#build)
3. [Train model](#train)
4. [Save model](#save)
5. [Deploy model](#deploy)
_________________

<a id="wrangle"></a>
### 1. Wrangle data 
First, upload the data to your DSX project. You can also access this data via API. Then we’ll read the data into a Spark DataFrame, which is straightforward on DSX.

In [3]:
// Import top level
import scala.sys.process._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.util._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.DateFormatClass
import com.ibm.ibmos2spark.bluemix

In [4]:
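// Get your credentials.
// `configurationname` and `credentials` are placeholders: define them with your own
// Object Storage values before running this cell.
val bmos = new bluemix(sc, configurationname, credentials)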

In [5]:
// Get the data
// Add your credentials when you Insert the DataFrame
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder().
    getOrCreate()
val violations = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("header", "true").
    option("inferSchema", "true").
    option("dateFormat", "MM/dd/yyyy").
    load(bmos.url("hybridDemos", "Building_Violations.csv"))
violations.printSchema()


 |-- ID: integer (nullable = true)
 |-- VIOLATION LAST MODIFIED DATE: string (nullable = true)
 |-- VIOLATION DATE: string (nullable = true)
 |-- VIOLATION CODE: string (nullable = true)
 |-- VIOLATION STATUS: string (nullable = true)
 |-- VIOLATION STATUS DATE: string (nullable = true)
 |-- VIOLATION DESCRIPTION: string (nullable = true)
 |-- VIOLATION LOCATION: string (nullable = true)
 |-- VIOLATION INSPECTOR COMMENTS: string (nullable = true)
 |-- VIOLATION ORDINANCE: string (nullable = true)
 |-- INSPECTOR ID: string (nullable = true)
 |-- INSPECTION NUMBER: string (nullable = true)
 |-- INSPECTION STATUS: string (nullable = true)
 |-- INSPECTION WAIVED: string (nullable = true)
 |-- INSPECTION CATEGORY: string (nullable = true)
 |-- DEPARTMENT BUREAU: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- PROPERTY GROUP: string (nullable = true)
 |-- SSA: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- L

We won’t build the model on all of the data. We’ll separate 2006–2016 from 2017, train on the earlier decade of data, and then test the model’s performance on the 2017 data. Notice that in the schema above, `VIOLATION DATE` is a string. This means we’ll need to do some wrangling before we can filter by date in an intuitive way.

In [6]:
// Create datetime column
val dated = violations.withColumn("timeStamp", to_date(unix_timestamp(
  $"VIOLATION DATE", "MM/dd/yyyy"
).cast("timestamp")))


Let’s make some more modifications. We’ll rename all of the columns, replacing the whitespace in each name with an underscore, so that we can reference them more easily later.

In [None]:
// Replace each whitespace character in the column names with `_`
var cleanDf = dated
for(col <- dated.columns){
    cleanDf = cleanDf.withColumnRenamed(col,col.replaceAll("\\s", "_"))
}
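
To see what the substitution does without a DataFrame, here is a minimal plain-Scala sketch applied to a few column names from the schema above. The `rawNames` and `cleanNames` values are names chosen for this illustration only.

```scala
// A few raw column names taken from the schema printed earlier
val rawNames = Seq("ID", "VIOLATION DATE", "VIOLATION LAST MODIFIED DATE")

// Same substitution as the loop above: every whitespace character becomes "_"
val cleanNames = rawNames.map(_.replaceAll("\\s", "_"))

cleanNames.foreach(println)
```

Because `\\s` matches one whitespace character at a time, a name with two consecutive spaces would end up with two underscores. The renaming loop could also be written without a `var`, for example as a `foldLeft` over `dated.columns`, if you prefer an immutable style.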

We’re modeling `INSPECTION_STATUS`, but a small number of records have a status that has not been resolved into `PASSED` or `FAILED`. We can keep only the records that meet our criteria with `SQLTransformer`.

In [None]:
import org.apache.spark.ml.feature.SQLTransformer
val df = new SQLTransformer().setStatement("SELECT * FROM __THIS__ WHERE INSPECTION_STATUS IN ('FAILED', 'PASSED')").transform(cleanDf)
val preppedFrame = df.withColumn("LATITUDE", df("LATITUDE").cast(DoubleType)).
                    withColumn("LONGITUDE", df("LONGITUDE").cast(DoubleType))

The cell above also casts `LATITUDE` and `LONGITUDE` from string to double. Now we’ll separate the data by year.

In [7]:
// Filter by date: train on years up to and including 2016, test on 2017 data
val trainingData2016 = preppedFrame.filter(year($"timeStamp").leq(lit(2016)))
val testingData2017 = preppedFrame.filter(year($"timeStamp").gt(lit(2016)))

The cell above filters on the `timeStamp` field we derived from `VIOLATION DATE`, which lets us compare years directly instead of comparing strings.
Notice that `leq` is less-than-or-equal-to, and `gt` is greater-than.
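
The same parse-then-split logic can be sketched in plain Scala with `java.time`, which may help when checking the date pattern outside Spark. This is an illustration only: the sample strings are dates that appear in this notebook's outputs, and `fmt`, `parsed`, `train`, and `test` are names made up for the sketch.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Same pattern string the notebook uses for VIOLATION DATE
val fmt = DateTimeFormatter.ofPattern("MM/dd/yyyy")

// Sample date strings as they appear in the raw data
val raw = Seq("08/15/2017", "07/24/2007", "04/01/2008", "02/28/2017")

// Parse each string into a real date value
val parsed = raw.map(s => LocalDate.parse(s, fmt))

// year <= 2016 goes to training, everything later goes to testing
val (train, test) = parsed.partition(_.getYear <= 2016)

println("train: " + train.mkString(", "))
println("test:  " + test.mkString(", "))
```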

In [8]:
// Take a peek
testingData2017.select("VIOLATION_DATE").show(3)

+--------------+
|VIOLATION_DATE|
+--------------+
|    08/15/2017|
|    05/09/2017|
|    02/28/2017|
+--------------+
only showing top 3 rows



In [9]:
// Peek at the training data too
trainingData2016.select("VIOLATION_DATE").show(3)

+--------------+
|VIOLATION_DATE|
+--------------+
|    07/24/2007|
|    04/01/2008|
|    04/01/2008|
+--------------+
only showing top 3 rows



For simplicity, we’re going to choose only a subset of the fields to use for modeling. Many of the other fields have numerous missing values, and handling them is beyond the scope of this tutorial. 
First, we’ll specify a subset of the columns. Then we’ll drop the rows that contain nulls.

In [10]:
val keepCols = Array("VIOLATION_CODE", "VIOLATION_DESCRIPTION", 
                   "INSPECTION_STATUS", "INSPECTOR_ID", 
                   "INSPECTION_CATEGORY", "DEPARTMENT_BUREAU", 
                   "LATITUDE", "LONGITUDE")
val dfTrain = trainingData2016.select(keepCols.head, keepCols.tail: _*).na.drop
val dfTest = testingData2017.select(keepCols.head, keepCols.tail: _*).na.drop
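
As a rough plain-Scala analogy for "cast, select, then drop incomplete rows": a failed numeric parse becomes `None` (standing in for the null a failed cast would typically produce), and a row survives only if every selected field is present. The inspector IDs and coordinates below are made-up values, and `Record`, `toDoubleOpt`, and `complete` are hypothetical names for this sketch.

```scala
import scala.util.Try

// Made-up raw records: every field arrives as a string and may be missing
val rawRows = Seq(
  Map("INSPECTOR_ID" -> "BL00123", "LATITUDE" -> "41.88", "LONGITUDE" -> "-87.63"),
  Map("INSPECTOR_ID" -> "BL00456", "LATITUDE" -> "", "LONGITUDE" -> "-87.70"),
  Map("LATITUDE" -> "41.90", "LONGITUDE" -> "-87.65")
)

case class Record(inspectorId: String, latitude: Double, longitude: Double)

// Parse a coordinate; None plays the role of a null from a failed cast
def toDoubleOpt(s: String): Option[Double] = Try(s.trim.toDouble).toOption

// Keep a row only when every selected field is present, similar in spirit to .na.drop
val complete = rawRows.flatMap { row =>
  for {
    id  <- row.get("INSPECTOR_ID")
    lat <- row.get("LATITUDE").flatMap(toDoubleOpt)
    lon <- row.get("LONGITUDE").flatMap(toDoubleOpt)
  } yield Record(id, lat, lon)
}

complete.foreach(println)
```

Only the first record survives here: the second has an unparseable latitude and the third has no inspector ID.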

In [21]:
/* save the testing df for later */
val written = dfTest.coalesce(1).write.option("header", "true").csv("buildingTest2017OK.csv")

0

In [12]:
dfTrain.printSchema()

root
 |-- VIOLATION_CODE: string (nullable = true)
 |-- VIOLATION_DESCRIPTION: string (nullable = true)
 |-- INSPECTION_STATUS: string (nullable = true)
 |-- INSPECTOR_ID: string (nullable = true)
 |-- INSPECTION_CATEGORY: string (nullable = true)
 |-- DEPARTMENT_BUREAU: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)



<a id="build"></a>
### 2. Build a pipeline
When deploying a model to Watson Machine Learning, we need to provide a `Spark Machine Learning Pipeline` which indicates how to transform raw data into the representation required by our model. Pipelines typically include a series of transformers and end with a model or, especially in classification tasks, a transformer that converts model predictions back into string labels.

In [13]:
/* Import transformers to build a pipeline [...] */ 
import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorAssembler}
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics


We’ll use the `StringIndexer` to convert strings into a numeric representation for the machine. You can read about many other transformers in the Spark documentation. We assign each transformer to a `val` because we’ll need to reference it later in the Pipeline.

In [14]:
// Label column
val labelCol = new StringIndexer().setInputCol("INSPECTION_STATUS").setOutputCol("STATUS_LABEL").fit(df)

// Feature cols with String Indexer => Vector Assembler //

//* VIOLATION CODE * //
val interCodeCol = new StringIndexer().setInputCol("VIOLATION_CODE").setOutputCol("CODE_X").setHandleInvalid("skip")


//* INSPECTOR ID * //
val interSpector = new StringIndexer().setInputCol("INSPECTOR_ID").setOutputCol("INSP_X").setHandleInvalid("skip")


//* INSPECTION CATEGORY * //
val interCatSpector = new StringIndexer().setInputCol("INSPECTION_CATEGORY").setOutputCol("INCAT_X").setHandleInvalid("skip")


//* DEPARTMENT BUREAU * //
val interBureau = new StringIndexer().setInputCol("DEPARTMENT_BUREAU").setHandleInvalid("skip").setOutputCol("BUR_X")


//** DEALING WITH TEXT **//
val regexTokenizer = new RegexTokenizer().setInputCol("VIOLATION_DESCRIPTION").setOutputCol("WORD_X").setPattern("\\W")
val hashingTF = new HashingTF().setInputCol("WORD_X").setOutputCol("DESCRIPTION").setNumFeatures(150) // experiment with numFeatures + regularization params

// LAT AND LONG ARE NUMERIC //


//** VECTOR ASSEMBLER **//

val vecAssembler = new VectorAssembler().setInputCols(Array("BUR_X", "INCAT_X", "CODE_X", "INSP_X", "DESCRIPTION", "LATITUDE", "LONGITUDE")).setOutputCol("FEATURES")
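
To make the indexing step less abstract, here is a small plain-Scala sketch of what a frequency-based string indexer does: the most frequent value gets index 0.0, the next gets 1.0, and so on. It is not Spark's implementation. The category values are made up, the alphabetical tie-break is a choice of this sketch, and `index` and `indexSkippingUnseen` are hypothetical names.

```scala
// Made-up training values for one categorical column
val trainValues = Seq("COMPLAINT", "PERIODIC", "COMPLAINT", "PERMIT", "COMPLAINT", "PERIODIC")

// "Fit": count each distinct value and give the most frequent one index 0.0.
// Ties are broken alphabetically here; that is a choice of this sketch.
val index: Map[String, Double] = trainValues
  .groupBy(identity)
  .toSeq
  .map { case (value, occurrences) => (value, occurrences.size) }
  .sortBy { case (value, count) => (-count, value) }
  .map { case (value, _) => value }
  .zipWithIndex
  .map { case (value, i) => value -> i.toDouble }
  .toMap

// "Transform" with skip semantics: values never seen during fitting are dropped
def indexSkippingUnseen(values: Seq[String]): Seq[Double] =
  values.flatMap(v => index.get(v))

println(index)
println(indexSkippingUnseen(Seq("PERMIT", "UNKNOWN", "COMPLAINT")))
```

Dropping unseen values mirrors the intent of `setHandleInvalid("skip")` in the cell above, where rows with labels that were not present at fit time are left out rather than causing an error.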



Notice that after creating a new instance of `StringIndexer`, we use `setInputCol` and `setOutputCol`. The output column will go into the `VectorAssembler`. Every feature we use for modeling is included in the `VectorAssembler`.
But what about string data that is not categorical? Sure, we can index all of the `INSPECTOR_ID` data, but does that make sense for the `VIOLATION_DESCRIPTION`, where almost every value is unique? 
<br>

For text data like this, Spark ML provides other handy transformers. For `RegexTokenizer` and `HashingTF` the general idea is simple. We’re going to take the text and break it into individual words, called `tokens`, with the tokenizer. Then we hash each token into one of a fixed number of buckets and count how many tokens of each violation description land in each bucket. Because any token can be hashed, this also lets the pipeline accept descriptions it has not seen before.
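
The tokenize-then-hash idea can be sketched in a few lines of plain Scala. This is an illustration under stated assumptions, not Spark's implementation: it uses the standard library's `MurmurHash3.stringHash` as a stand-in hash, so bucket numbers are not expected to match what `HashingTF` produces. The sample description and the names `tokenize`, `bucket`, and `termFrequencies` are made up for the sketch.

```scala
import scala.util.hashing.MurmurHash3

val numFeatures = 150 // same size as setNumFeatures(150) above

// Split on non-word characters, lowercase, and drop empty tokens
def tokenize(text: String): Seq[String] =
  text.toLowerCase.split("\\W").filter(_.nonEmpty).toSeq

// Map a token to a bucket in [0, numFeatures). The hash function is a stand-in:
// bucket numbers are not expected to match Spark's HashingTF.
def bucket(token: String): Int = {
  val raw = MurmurHash3.stringHash(token) % numFeatures
  if (raw < 0) raw + numFeatures else raw
}

// Sparse term-frequency vector: bucket -> number of tokens that landed there
def termFrequencies(text: String): Map[Int, Double] =
  tokenize(text).groupBy(t => bucket(t)).map { case (b, toks) => b -> toks.size.toDouble }

// Made-up violation description
val tf = termFrequencies("Repair exterior wall. Repair porch stairs!")
println(tf)
```

Two different words can collide in the same bucket, which is the price paid for a fixed-size vector; a larger `numFeatures` makes collisions less likely.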

In [15]:
import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression

//** Logistic Regression **//
val logitModel = new LogisticRegression().setLabelCol("STATUS_LABEL").setFeaturesCol("FEATURES").setRegParam(0.1)


//** Convert index prediction back to string **//
// Note: labelConverter is defined here but is not included in the pipeline stages below
val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("PREDICTED_LABEL").setLabels(labelCol.labels)
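
For intuition about where a score in $[0,1]$ comes from, here is a minimal plain-Scala sketch of the logistic function applied to a weighted sum of features, followed by a 0.5 threshold and a lookup back to a string label. The weights, intercept, feature values, and label order are all made up for illustration; a trained model learns its own coefficients, and the real label order comes from the fitted `StringIndexer`.

```scala
// Logistic (sigmoid) function: maps any real number into the open interval (0, 1)
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

// Score for one feature vector: sigmoid of the weighted sum plus an intercept
def score(weights: Seq[Double], intercept: Double, features: Seq[Double]): Double = {
  require(weights.size == features.size, "weights and features must align")
  sigmoid(weights.zip(features).map { case (w, x) => w * x }.sum + intercept)
}

// Made-up coefficients and features, for illustration only
val p = score(Seq(0.8, -1.2), 0.1, Seq(1.0, 0.5))

// Turn the score into a class index using a 0.5 threshold
val predictedIndex = if (p > 0.5) 1.0 else 0.0

// Map the index back to a string label; this label order is an assumption
val labels = Array("FAILED", "PASSED")
val predictedLabel = labels(predictedIndex.toInt)

println(s"score = $p, label = $predictedLabel")
```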

Build the modeling pipeline.

In [16]:
/* Logistic Regression Pipeline */ 
val logisticPipe = new Pipeline().setStages(
                                    Array(
                                        labelCol, 
                                        interCodeCol, 
                                        interSpector, 
                                        interCatSpector,
                                        interBureau,
                                        regexTokenizer, hashingTF,
                                        vecAssembler,
                                        logitModel                                                                  
                                    )
                                )

<a id="train"></a>
### 3. Train model
To train, call `.fit()` on the pipeline with the training data.

In [17]:
val trainedLogit = logisticPipe.fit(dfTrain)
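
Conceptually, fitting a pipeline means fitting each stage in order on the training data as transformed by the stages before it, then chaining the fitted stages for prediction. The plain-Scala sketch below illustrates that idea with toy stages. The types `Rec`, `Fitted`, and `Estimator` and the two stages are hypothetical and far simpler than Spark's classes.

```scala
type Rec = Map[String, Double]
// A fitted stage is just a function from records to records
type Fitted = Seq[Rec] => Seq[Rec]
// An estimator learns something from data and returns a fitted stage
type Estimator = Seq[Rec] => Fitted

// Toy estimator: learns the mean of "x" and adds a centered column
val centerX: Estimator = data => {
  val mean = data.map(_("x")).sum / data.size
  rows => rows.map(r => r + ("x_centered" -> (r("x") - mean)))
}

// Toy estimator with nothing to learn: squares the centered column
val squareCentered: Estimator = _ => rows =>
  rows.map(r => r + ("x_sq" -> math.pow(r("x_centered"), 2)))

def fitPipeline(stages: Seq[Estimator], train: Seq[Rec]): Fitted = {
  // Fit each stage on the training data as transformed by the earlier stages
  val (fitted, _) = stages.foldLeft((Vector.empty[Fitted], train)) {
    case ((acc, current), stage) =>
      val model = stage(current)
      (acc :+ model, model(current))
  }
  // The fitted pipeline applies every fitted stage in the same order
  rows => fitted.foldLeft(rows)((current, model) => model(current))
}

val pipelineModel = fitPipeline(Seq(centerX, squareCentered), Seq(Map("x" -> 1.0), Map("x" -> 3.0)))
val out = pipelineModel(Seq(Map("x" -> 4.0)))
println(out)
```

Note that the mean learned from the training records (2.0) is reused when transforming the new record, which is the same reason a fitted pipeline can be applied to the 2017 test data.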



We can make predictions and get metrics.

In [18]:
// Predict on the held-out 2017 data
val predictionsLogistic = trainedLogit.transform(dfTest)


// Prepare (prediction, label) pairs for the metrics class.
// Note: "prediction" holds hard 0.0/1.0 class predictions, not probabilities.
val predictionAndLabels = predictionsLogistic.select("STATUS_LABEL", "prediction").rdd.map(row => 
            (row.getAs[Double]("prediction"), row.getAs[Double]("STATUS_LABEL")))

val metrics = new BinaryClassificationMetrics(predictionAndLabels)

We now have a `metrics` object that exposes several evaluation measures, including the area under the ROC curve.

In [19]:
// AUC
metrics.areaUnderROC



0.6834321232738194
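
One way to read this number: AUC is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as half. The plain-Scala sketch below computes it by brute force over all pairs. It is an illustration on made-up data, not the routine Spark uses, and `auc`, `ranked`, and `hard` are hypothetical names.

```scala
// AUC as the probability that a random positive outscores a random negative
// (ties count as half). Input pairs are (score, label) with labels 0.0 or 1.0.
def auc(scoreAndLabels: Seq[(Double, Double)]): Double = {
  val pos = scoreAndLabels.collect { case (s, l) if l == 1.0 => s }
  val neg = scoreAndLabels.collect { case (s, l) if l == 0.0 => s }
  require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
  val wins = for (p <- pos; n <- neg) yield {
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  }
  wins.sum / (pos.size.toLong * neg.size)
}

// Continuous scores that rank every positive above every negative
val ranked = Seq((0.9, 1.0), (0.8, 1.0), (0.3, 0.0), (0.1, 0.0))

// Hard 0/1 predictions: two of three positives and two of three negatives are correct
val hard = Seq((1.0, 1.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (0.0, 0.0), (1.0, 0.0))

println(auc(ranked))
println(auc(hard))
```

With hard 0/1 predictions, as in the cell above, there are only two distinct score values, so the figure reduces to the average of the true positive rate and the true negative rate. Feeding the metrics the model's probability for the positive class instead would likely give a more informative curve.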

<a id="save"></a>
### 4. Save Model
You’ll need an instance of `Watson Machine Learning` on Bluemix. You can create a new instance directly from within DSX, but you’ll need to log in to Bluemix for your credentials. Check the companion [blog](https://medium.com/p/91b580450c5b/edit) for more details.

In [16]:
import com.ibm.analytics.ngp.repository._

// Helper libraries

import scalaj.http.{Http, HttpOptions}
import scala.util.{Success, Failure}
import java.util.Base64
import java.nio.charset.StandardCharsets
import play.api.libs.json._

In [23]:
//@hidden_cell


In [18]:
// Authorize
val client = MLRepositoryClient(service_path)
client.authorize(username, password)

Success(())

In [19]:
// model artifact
val model_artifact = MLRepositoryArtifact(trainedLogit, dfTrain, "VIOLATIONS_SCALA211_SPARK20")
val saved = client.models.save(model_artifact)

In [20]:
saved

Success(com.ibm.analytics.ngp.repository.MLRepositoryClient$ModelAdapter$$anon$1@edd63bce)
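
The `Success(...)` wrapper in the output indicates the result is a `scala.util.Try`. Calling `.get` on a `Try` throws if the operation failed, so it can be safer to pattern match on both outcomes. The sketch below shows the pattern with a hypothetical `saveArtifact` function standing in for a remote save; it does not call the repository client.

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical stand-in for a call that returns a Try, such as a save to a remote repository
def saveArtifact(name: String): Try[String] =
  if (name.nonEmpty) Success("saved:" + name)
  else Failure(new IllegalArgumentException("artifact name must not be empty"))

// Describe the outcome without calling .get, which would throw on Failure
def describe(result: Try[String]): String = result match {
  case Success(id) => "OK " + id
  case Failure(e)  => "save failed: " + e.getMessage
}

println(describe(saveArtifact("VIOLATIONS")))
println(describe(saveArtifact("")))
```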

In [21]:
println("modelType: " + saved.get.meta.prop("modelType"))
println("trainingDataSchema: " + saved.get.meta.prop("trainingDataSchema"))
println("creationTime: " + saved.get.meta.prop("creationTime"))
println("modelVersionHref: " + saved.get.meta.prop("modelVersionHref"))
println("label: " + saved.get.meta.prop("label"))
println("runtime: "+ saved.get.meta.prop("runtime"))
0

modelType: Some(sparkml-model-2.0)
trainingDataSchema: Some({"type":"struct","fields":[{"name":"VIOLATION_CODE","type":"string","nullable":true,"metadata":{}},{"name":"VIOLATION_DESCRIPTION","type":"string","nullable":true,"metadata":{}},{"name":"INSPECTION_STATUS","type":"string","nullable":true,"metadata":{}},{"name":"INSPECTOR_ID","type":"string","nullable":true,"metadata":{}},{"name":"INSPECTION_CATEGORY","type":"string","nullable":true,"metadata":{}},{"name":"DEPARTMENT_BUREAU","type":"string","nullable":true,"metadata":{}},{"name":"LATITUDE","type":"double","nullable":true,"metadata":{}},{"name":"LONGITUDE","type":"double","nullable":true,"metadata":{}}]})
creationTime: Some(2017-10-05T17:48:26.849Z)
modelVersionHref: Some(https://ibm-watson-ml.mybluemix.net/v2/artifacts/models/bcbf46f6-a5bc-4ed9-bc78-a90940711c6b/versions/887800a7-ffc3-4a82-885d-428fe63a2461)
label: Some(INSPECTION_STATUS)
runtime: Some(spark-2.0)


0

<a id="deploy"></a>
### 5. Deploy

If you're following along in the [blog](https://medium.com/p/91b580450c5b/edit), this is where we'll deploy through the UI. 

____________

### Author
Adam Massachi is a Data Science Intern with the Data Science Experience and Watson Data Platform teams at IBM. Before IBM, he worked on political campaigns, building and managing large volunteer operations and organizing campaign finance initiatives. Say hello [@adammassach](https://twitter.com/adammassach?lang=en)!

### Citations

City of Chicago (2017). Building Violations <a href=https://data.cityofchicago.org/Buildings/Building-Violations/22u3-xenr target="_blank" rel="noopener noreferrer">https://data.cityofchicago.org/Buildings/Building-Violations/22u3-xenr</a>  Chicago, IL: Chicago City Data Portal

Copyright © IBM Corp. 2017. This notebook and its source code are released under the terms of the MIT License.