# How to create a batch web service for a Spark model on Azure

Before running the tutorial, you must configure your DSVM as specified in the README on the [Deploying Spark ML Models on Azure (Preview)](https://github.com/Azure/Spark-Operationalization-On-Azure) GitHub repo. If you have previously configured your DSVM, you may want to check the GitHub repo to ensure that you are using the most recent instructions.


In this tutorial, you will use [Apache Spark](http://spark.apache.org/) to create a model that uses a logistic regression learner to predict food inspection results. To do this, you will call the Spark Python API ([PySpark](http://spark.apache.org/docs/0.9.0/python-programming-guide.html)) to load a dataset, train a model on it, and publish a batch scoring API for the model.

## Load the data

The tutorial uses the *Food Inspections Data Set*, which contains the results of food inspections conducted in Chicago. For convenience, a copy of the data is in the ```azureml/datasets``` folder. The original dataset is available from the [City of Chicago data portal](https://data.cityofchicago.org/Health-Human-Services/Food-Inspections/4ijn-s7e5).

In [None]:
### Import the relevant PySpark bindings
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

### Parse the food inspections dataset and create numerical labels for training

In [None]:
inspections = spark.read.csv("../datasets/food_inspections1.csv",mode='DROPMALFORMED',inferSchema=False)

schema = StructType([StructField("id", IntegerType(), False), 
                     StructField("name", StringType(), False), 
                     StructField("results", StringType(), False), 
                     StructField("violations", StringType(), True)])

df = sqlContext.createDataFrame(inspections.rdd.map(lambda l: (int(l[0]), l[1], l[12], l[13] if l[13] else '')), schema) 
df.registerTempTable('CountResults')

def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
    
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
labeledData.write.format('parquet').mode('overwrite').save('foo')
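
The labeling rule in the cell above can be checked without Spark. The following is a minimal plain-Python sketch of the same mapping and of the `label >= 0` filter; the sample result strings are hypothetical values chosen only for illustration.

```python
# Plain-Python restatement of the labeling rule (no Spark required).
def label_for_results(result):
    """Map an inspection result string to a numeric training label."""
    if result == 'Fail':
        return 0.0
    if result in ('Pass w/ Conditions', 'Pass'):
        return 1.0
    return -1.0  # any other outcome is marked for removal

# Hypothetical sample values, for illustration only.
sample_results = ['Pass', 'Fail', 'Pass w/ Conditions', 'Not Ready']

labels = [label_for_results(r) for r in sample_results]

# Equivalent of .where('label >= 0'): keep only rows with a usable label.
kept = [(r, l) for r, l in zip(sample_results, labels) if l >= 0]

print(labels)
print(kept)
```

Rows whose result is neither a pass nor a fail receive `-1.0` and are dropped, so the model trains on a two-class problem.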

#### Create and save the model
Next, you train a logistic regression model to predict inspection results. The following code tokenizes each "violations" string to get the individual words in each string. It then uses a HashingTF to convert each set of tokens into a feature vector which is passed to the logistic regression algorithm to construct a model. 

In [None]:
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)
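
To make the tokenize-then-hash step more concrete, here is a rough plain-Python sketch of the idea. It is not Spark's implementation: `zlib.crc32` is a stand-in hash, `num_features=16` is an arbitrarily small bucket count, and the violation text is a hypothetical example.

```python
import zlib
from collections import Counter

def tokenize(text):
    # Roughly what Spark's Tokenizer does: lowercase, then split on whitespace.
    return text.lower().split()

def hashing_tf(tokens, num_features=16):
    # Hashing trick: hash each token to a bucket index and count how many
    # tokens land in each bucket. zlib.crc32 is only a stand-in here;
    # Spark uses its own hash function and far more buckets by default.
    counts = Counter(zlib.crc32(t.encode('utf-8')) % num_features for t in tokens)
    return dict(sorted(counts.items()))

# Hypothetical violation text, for illustration only.
tokens = tokenize("Food not protected FOOD not labeled")
features = hashing_tf(tokens)

print(tokens)
print(features)  # sparse vector as {bucket index: term count}
```

Because the bucket index comes from a hash, no vocabulary has to be built or stored, at the cost that two different words can occasionally share a bucket.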

Finally, you save the model to use when deploying the web service.

In [None]:
model.write().overwrite().save("food_inspection.model")
print("Model saved")

## Creating a Batch Web Service

In this section, you will create and deploy a batch web service that uses the model you trained to make predictions on the data you supply.

### Create a prediction script 

Your goal is to create an endpoint that you can call to make predictions based on the input data. To create a web service using the model you saved, you start by authoring a script to do the scoring.

In the provided sample, the script takes a data file as its input-data argument, uses the model specified by the user as model input, and makes predictions on the data by running the model. The script then saves the predictions as a parquet file to the path provided through the output-data argument.

The save file call (```%%save_file -f batch_score.py```) in the first line of the cell saves the contents of the cell to a local file with the name supplied by the ```-f``` argument.

When calling the batch web service create command for the scoring script using the AML CLI, you must provide the parameters that you identified in the script as command line arguments.

You can parameterize your scoring script files at your discretion.

In [None]:
%%save_file -f batch_score.py
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import argparse

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext.getOrCreate(sc)


parser = argparse.ArgumentParser()
parser.add_argument("--input-data")
parser.add_argument("--trained-model")
parser.add_argument("--output-data")

args = parser.parse_args()
print(str(args.input_data))
print(str(args.trained_model))
print(str(args.output_data))

testdata = spark.read.csv(str(args.input_data),mode='DROPMALFORMED',inferSchema=False)

schema = StructType([StructField("id", IntegerType(), False), 
                     StructField("name", StringType(), False), 
                     StructField("results", StringType(), False), 
                     StructField("violations", StringType(), True)])

testDf = sqlContext.createDataFrame(testdata.rdd.map(lambda l: (int(l[0]), l[1], l[12], l[13] if l[13] else '')), schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")

model = PipelineModel.load(args.trained_model)

predictionsDf = model.transform(testDf)
predictionsDf.write.format("parquet").mode("overwrite").save(str(args.output_data))
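
The script's three parameters can be exercised on their own, without Spark. This short sketch parses a simulated command line with the same `argparse` setup; the argument values are the file names used elsewhere in this tutorial. Note that `argparse` exposes `--input-data` as the attribute `input_data`.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--input-data")
parser.add_argument("--trained-model")
parser.add_argument("--output-data")

# Simulated command line instead of sys.argv, so the sketch runs anywhere.
args = parser.parse_args([
    "--input-data=../datasets/food_inspections2.csv",
    "--trained-model=food_inspection.model",
    "--output-data=food_inspection_predictions.parquet",
])

# Dashes in option names become underscores in attribute names.
print(args.input_data)
print(args.trained_model)
print(args.output_data)
```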

## Use the CLI to deploy and manage your batch web service

Open an SSH session to your DSVM and change to the folder notebooks/azureml/batch.
```
cd ~/notebooks/azureml/batch
```

#### Deploy to local VM

To create the batch web service locally on the DSVM, set your CLI environment to run in local mode.
```
aml env local
```

To create the web service, run the following command:

```
aml service create batch -f batch_score.py -n batchwebservice -i --input-data -i --trained-model=food_inspection.model -o --output-data=food_inspection_predictions.parquet
```
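
If you prefer to assemble this command from a script, the following sketch builds the same argument list in Python. The helper name `build_create_command` is hypothetical, and the reading of the flags (`-i` for an input parameter, `-o` for an output parameter, `=value` for a default) is inferred from the command above rather than from CLI documentation.

```python
import shlex

def build_create_command(script, service_name, inputs, outputs):
    """Assemble the argument list for `aml service create batch`.

    inputs and outputs are parameter strings such as "--input-data"
    (no default) or "--trained-model=food_inspection.model" (with default).
    """
    argv = ["aml", "service", "create", "batch", "-f", script, "-n", service_name]
    for param in inputs:
        argv += ["-i", param]
    for param in outputs:
        argv += ["-o", param]
    return argv

argv = build_create_command(
    "batch_score.py",
    "batchwebservice",
    inputs=["--input-data", "--trained-model=food_inspection.model"],
    outputs=["--output-data=food_inspection_predictions.parquet"],
)

# Print a shell-safe version of the command for inspection.
print(" ".join(shlex.quote(a) for a in argv))
```

The list form can be passed to `subprocess.run(argv)` on a machine where the CLI is installed, which avoids shell quoting problems.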

Once the web service is successfully created, the following command runs a job against the web service:

```
aml service run batch -n batchwebservice -w -i --input-data=../datasets/food_inspections2.csv 
```
The run command provides a dataset as input and uses the defaults specified during web service creation for the remaining parameters. You can also specify your own parameters.

The **-w** parameter specifies that the job is run synchronously. If you do not specify **-w**, you can view the status of the job using the following commands:

View the list of jobs running against your web service to get the name of the job:

```
aml service listjobs batch -n batchwebservice
```
Use the Job Name to view the status with the following command:
```
aml service viewjob batch -n batchwebservice -j <paste job name here>
```

#### Deploy to HDInsight Cluster

If you haven't already, provision an Azure HDInsight Spark 2.0 cluster. 

To provision an HDInsight Spark 2.0 cluster:

1. Sign in to the [Azure portal](https://portal.azure.com).
2. Click **New** and type HDInsight.
3. Select HDInsight from the list of returned results.
4. Click **Create**.
5. Enter a **Cluster name**.
6. Click **Cluster configuration**.
	1. In **Cluster type**, select **Spark**. 
	2. In **Version**, select **Spark 2.0.1 (HDI 3.5)**.
	3. Click **Select**.
7. Click **Credentials** and configure the credentials for the cluster. To access Jupyter notebooks, the SSH user name must be all lower case and you must select password authentication.
8. Click **Cluster size**, then click **Select** to accept the default options.
9. Select a resource group to contain the cluster.
10. Click **Create**.

After the cluster deployment is complete, you must install the Azure Machine Learning Batch application so that the cluster can execute the commands submitted through the Azure ML CLI from your local machine.

To install the AMLBatch app on your HDInsight cluster, click the following link: 

<a href="https://portal.azure.com/#create/Microsoft.Template/uri/https%3A%2F%2Fazuremlbatchtest.blob.core.windows.net%2Ftemplates%2FinstallTemplate.json" target="_blank">AML Batch Install Template</a>

When the template opens, provide the resource group and the name of the HDInsight cluster where the web service will be deployed. Leave the node size and count fields as they are. Accept the license agreement and click **Purchase**.

Open an SSH session to your DSVM and change to the folder notebooks/azureml/batch.
Set your CLI environment to run in cluster mode.

```
aml env cluster
```
You will see the below prompt:
```
Would you like to set up port forwarding to your ACS cluster (Y/n)? n
```
Type **n** at the above prompt since you are not running the RRS scenario on ACS.

Type **y** to continue with cluster mode at the below prompt:

```
Could not connect to ACS cluster. Continue with cluster mode anyway (y/N)? y
```
To target the HDInsight cluster and its associated storage, set the following environment variables in the terminal. They are required to create the web service.
```
	export AML_STORAGE_ACCT_NAME=<your storage account name>
	export AML_STORAGE_ACCT_KEY=<your storage account key>
	export AML_HDI_CLUSTER=<the url to your hdinsight cluster>
	export AML_HDI_USER=<your hdinsight user name>
	export AML_HDI_PW=<your hdinsight user password>
```
**Important**: Make sure the storage account you use is the one that's associated with your HDInsight Cluster.
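
Before creating the service, it can help to confirm that all five variables are set. The following is a small sketch of such a check; the helper name `missing_settings` and the example values are hypothetical.

```python
import os

REQUIRED_SETTINGS = [
    "AML_STORAGE_ACCT_NAME",
    "AML_STORAGE_ACCT_KEY",
    "AML_HDI_CLUSTER",
    "AML_HDI_USER",
    "AML_HDI_PW",
]

def missing_settings(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]

# Example with a hypothetical, partially configured environment.
example_env = {"AML_STORAGE_ACCT_NAME": "mystorage", "AML_HDI_USER": "admin"}
missing = missing_settings(example_env)
print(missing)
```

Calling `missing_settings()` with no argument checks the real environment of the current process.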

To create the web service on the HDInsight cluster, run the following command:

```
aml service create batch -f batch_score.py -n batchwebservice -i --input-data -i --trained-model=food_inspection.model -o --output-data=wasb:///HdiSamples/HdiSamples/food_inspection_predictions.parquet
```
The `wasb:///` output path in this command writes the predictions to the storage associated with the cluster instead of the local file system of the DSVM.
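
The output path uses the `wasb` URI scheme. As an illustrative sketch, the code below splits two such URIs with the standard library: the one from the command above, which has an empty authority and therefore names no container or account explicitly, and a fully qualified one whose container and account names are placeholders. The helper name `describe_wasb_path` is hypothetical.

```python
from urllib.parse import urlsplit

def describe_wasb_path(uri):
    """Split a wasb URI into scheme, container, storage host, and path."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        # None when the authority is empty, as in wasb:///...
        "container": parts.username,
        "host": parts.hostname,
        "path": parts.path,
    }

default_form = describe_wasb_path(
    "wasb:///HdiSamples/HdiSamples/food_inspection_predictions.parquet")

# Placeholder container and account names, for illustration only.
qualified_form = describe_wasb_path(
    "wasb://mycontainer@myaccount.blob.core.windows.net/out/predictions.parquet")

print(default_form)
print(qualified_form)
```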

Once the web service is successfully created, the following command runs a job against the web service:

```
aml service run batch -n batchwebservice -w -i --input-data=../datasets/food_inspections2.csv 
```

This run command provides a dataset as input and uses the defaults specified during web service creation for the remaining parameters. You can also specify your own parameters.

The **-w** parameter specifies that the job is run synchronously. If you do not specify **-w**, you can view the status of the job using the following commands:

View the list of jobs running against your web service to get the name of the job:

```
aml service listjobs batch -n batchwebservice
```
Use the Job Name to view the status with the following command:
```
aml service viewjob batch -n batchwebservice -j <paste job name here>
```
---  
Created by a Microsoft Employee.  
Copyright (C) Microsoft. All Rights Reserved.