# How to create a batch web service for a Spark model on Azure

Before running the tutorial, you must configure your DSVM as specified in the README on the [Machine Learning Operationalization](https://aka.ms/o16ncli) GitHub repo. If you have previously configured your DSVM, check the GitHub repo to make sure that you are using the most recent instructions.

In this tutorial, you use [Apache Spark](http://spark.apache.org/) to create a model that uses a logistic regression learner to predict food inspection results. To do this, you call the Spark Python API ([PySpark](http://spark.apache.org/docs/0.9.0/python-programming-guide.html)) to load a dataset, train a model on that dataset, and publish a batch scoring API for the model.

You then use the Azure CLI to operationalize the model and to call the web service.

## Load the data

The tutorial uses the *Food Inspections Data Set*, which contains the results of food inspections conducted in Chicago. To facilitate this tutorial, we have placed a copy of the data in the `azureml/datasets` folder. The original dataset is available from the [City of Chicago data portal](https://data.cityofchicago.org/Health-Human-Services/Food-Inspections/4ijn-s7e5).

### Import the relevant PySpark bindings

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

### Parse the food inspections dataset and create numerical labels for training

In [2]:
inspections = spark.read.csv("../datasets/food_inspections1.csv",mode='DROPMALFORMED',inferSchema=False)

schema = StructType([StructField("id", IntegerType(), False), 
                     StructField("name", StringType(), False), 
                     StructField("results", StringType(), False), 
                     StructField("violations", StringType(), True)])

df = sqlContext.createDataFrame(inspections.rdd.map(lambda l: (int(l[0]), l[1], l[12], l[13] if l[13] else '')), schema) 
df.registerTempTable('CountResults')

def labelForResults(s):
    if s == 'Fail':
        return 0.0
    elif s == 'Pass w/ Conditions' or s == 'Pass':
        return 1.0
    else:
        return -1.0
    
label = UserDefinedFunction(labelForResults, DoubleType())
labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
labeledData.write.format('parquet').mode('overwrite').save('foo')
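
The projection and labeling logic above can be tried without a Spark session. The following is a minimal plain-Python sketch, not part of the tutorial's pipeline. The `make_row` helper and the sample rows are made up for illustration; only the column positions (0, 1, 12, 13) and the label mapping come from the cell above.

```python
# Column positions used by the tutorial's lambda: id, name, results, violations.
ID_COL, NAME_COL, RESULTS_COL, VIOLATIONS_COL = 0, 1, 12, 13


def project_row(row):
    """Keep the four columns the tutorial uses; missing violations become ''."""
    violations = row[VIOLATIONS_COL] if row[VIOLATIONS_COL] else ''
    return (int(row[ID_COL]), row[NAME_COL], row[RESULTS_COL], violations)


def label_for_results(result):
    """Same mapping as labelForResults: fail -> 0.0, pass -> 1.0, other -> -1.0."""
    if result == 'Fail':
        return 0.0
    if result in ('Pass', 'Pass w/ Conditions'):
        return 1.0
    return -1.0


def make_row(id_, name, result, violations):
    """Build a hypothetical 14-column CSV row; unused columns are left empty."""
    row = [''] * 14
    row[ID_COL], row[NAME_COL] = id_, name
    row[RESULTS_COL], row[VIOLATIONS_COL] = result, violations
    return row


# Made-up sample rows, not taken from the real dataset.
sample_rows = [
    make_row('1', 'Cafe A', 'Pass', 'sample violation text'),
    make_row('2', 'Cafe B', 'Fail', None),
    make_row('3', 'Cafe C', 'Out of Business', None),
]

projected = [project_row(r) for r in sample_rows]

# Mirror .where('label >= 0'): rows with any other result are dropped.
labeled = [(label_for_results(results), violations)
           for (_id, _name, results, violations) in projected
           if label_for_results(results) >= 0]
print(labeled)
```

Rows whose result is neither a pass nor a fail receive the label -1.0 and are filtered out, so the model is trained only on inspections with a clear outcome.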

## Create and save the model
Next, train a logistic regression model to predict inspection results. The following code tokenizes each "violations" string to get the individual words in the string. It then uses a HashingTF to convert each set of tokens into a feature vector, which is passed to the logistic regression algorithm to construct a model.

In [3]:
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)
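
To make the three pipeline stages more concrete, here is a minimal plain-Python sketch of the same idea: tokenize, hash tokens into a fixed-size count vector, and score with a logistic function. It is an illustration only, not Spark's implementation. It assumes a tiny 16-bucket vector (Spark's `HashingTF` defaults to 2^18 features) and uses `zlib.crc32` as a stand-in for the hash function that Spark uses internally. The weights are hypothetical, untrained values.

```python
import math
import zlib

NUM_FEATURES = 16  # tiny on purpose; Spark's HashingTF defaults to 2**18


def tokenize(text):
    """Lowercase and split on whitespace, roughly what Tokenizer does."""
    return text.lower().split()


def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Count tokens per hashed bucket (crc32 is a stand-in hash, an assumption)."""
    vector = [0.0] * num_features
    for token in tokens:
        index = zlib.crc32(token.encode('utf-8')) % num_features
        vector[index] += 1.0
    return vector


def predict_proba(features, weights, bias):
    """Logistic regression scoring: sigmoid of the weighted feature sum."""
    z = bias + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))


tokens = tokenize("Food NOT stored properly food")
features = hashing_tf(tokens)
weights = [0.0] * NUM_FEATURES  # hypothetical untrained weights

print(tokens)
print(features)
print(predict_proba(features, weights, bias=0.0))
```

Because the vector has a fixed size, distinct words can collide in the same bucket. A larger feature count makes collisions less likely at the cost of a wider vector.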

Finally, you save the model to use when deploying the web service.

In [4]:
model.write().overwrite().save("food_inspection.model")
print("Model saved")

Model saved


## Create a batch web service

In this section, you create and deploy a batch web service that makes predictions on the data it is given, using the model that you trained.

### Create a prediction script 

Your goal is to create a web service that you can call to make predictions based on the input data. To create a web service using the model you saved, you start by authoring a function to do the scoring.

In the provided sample, the function takes a Spark DataFrame as its input-data argument, uses the model that the user supplies as its trained-model argument, and makes predictions on the data by running the model. The function then saves the predictions as a Parquet file to the path provided through the output-data argument.

In [5]:
# Import Azure ML API SDK. The SDK is installed implicitly with the latest
# version of the CLI in your default python environment
from azure.ml.api.schema.dataTypes import DataTypes
from azure.ml.api.schema.sampleDefinition import SampleDefinition
from azure.ml.api.batch.batch_handler import prepare

In [6]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

def run(input_data, trained_model, output_data):
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    sqlContext = SQLContext.getOrCreate(sc)
    
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False),
                         StructField("results", StringType(), False),
                         StructField("violations", StringType(), True)])

    testDf = sqlContext.createDataFrame(input_data.rdd.map(lambda l: (int(l[0]), l[1], l[12], l[13] if l[13] else '')), schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
    
    predictionsDf = trained_model.transform(testDf)
    predictionsDf.write.format("parquet").mode('overwrite').save(str(output_data))

### Create a schema file 

To generate a schema for the inputs and the model, define a dictionary that maps each input name to a tuple of sample data and a boolean. The boolean indicates whether the data file is expected to have a header row. Outputs currently map only to a standard sample data type. Parameters cover any non-file inputs that the function expects, and they also map to a standard sample type.

The names of the inputs, outputs, and parameters must exactly match the argument names of the run function. For the samples, use the data structures that you created and used to test the model after training. Finally, list any dependencies of your function that the default environment does not already provide.
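
Because a name mismatch is easy to introduce, it can help to check the dictionaries against the function signature before generating the schema. The following is a minimal sketch using only the Python standard library. The `check_names` helper is hypothetical and not part of the Azure ML SDK, the `run` stub only mirrors the argument names of the scoring function, and the `'<sample>'` strings are placeholders for real sample definitions.

```python
import inspect


def run(input_data, trained_model, output_data):
    """Stub with the same argument names as the tutorial's run function."""


def check_names(func, inputs, outputs, parameters):
    """Return (missing, unexpected) names relative to func's arguments."""
    expected = set(inspect.signature(func).parameters)
    declared = set(inputs) | set(outputs) | set(parameters)
    return sorted(expected - declared), sorted(declared - expected)


# Placeholder definitions: each input maps to (sample, has_header).
inputs = {'input_data': ('<sample>', True), 'trained_model': ('<sample>', True)}
outputs = {'output_data': '<sample>'}
parameters = {}

# Matching names: nothing missing, nothing unexpected.
print(check_names(run, inputs, outputs, parameters))

# A hyphen instead of an underscore is reported on both sides.
print(check_names(run, inputs, {'output-data': '<sample>'}, parameters))
```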

### Create the driver and schema files

Finally, put all of this together by calling the prepare function with the run function and the input, output, and parameter definitions.

This creates two files in the current working directory: the driver program, named *service_driver.py*, and a schema file, named *batch_schema_{timestamp}.json*.

The function also prints the `az ml` CLI command that you use to publish the batch service.

In [17]:
inputs = {'input_data': (SampleDefinition(DataTypes.SPARK, labeledData), True),
          'trained_model': (SampleDefinition(DataTypes.SPARK, labeledData), True)}
outputs = {'output_data': SampleDefinition(DataTypes.STANDARD, 'output.parquet')}
parameters = {}
dependencies = []

prepare(run_func=run, inputs=inputs, outputs=outputs, parameters=parameters, dependencies=dependencies, service_name='batch_score')

az ml service create batch -f service_driver.py -n batch_score --in=--input-data --in=--trained-model --out=--output-data -d batch_schema_20170630232457.json
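
The printed command appears to follow a simple pattern: each argument name of the run function becomes a flag with the underscores replaced by hyphens, inputs are passed with `--in=` and outputs with `--out=`. The sketch below rebuilds the command from those pieces to make the mapping explicit. It is inferred from the output shown above, not taken from the SDK, and `build_create_command` is a hypothetical helper. In practice, copy the command that prepare prints.

```python
def to_flag(arg_name):
    """Turn a run() argument name into a CLI flag, e.g. input_data -> --input-data."""
    return '--' + arg_name.replace('_', '-')


def build_create_command(driver, service_name, inputs, outputs, schema_file):
    """Assemble the create command from the same pieces that were given to prepare."""
    parts = ['az ml service create batch', '-f', driver, '-n', service_name]
    parts += ['--in=' + to_flag(name) for name in inputs]
    parts += ['--out=' + to_flag(name) for name in outputs]
    parts += ['-d', schema_file]
    return ' '.join(parts)


command = build_create_command(
    driver='service_driver.py',
    service_name='batch_score',
    inputs=['input_data', 'trained_model'],
    outputs=['output_data'],
    schema_file='batch_schema_20170630232457.json',
)
print(command)
```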


## Use the CLI to deploy and manage your batch web service

You can deploy an operationalized model as a web service either locally or to a cluster.

Open an SSH session to your DSVM and change to the folder notebooks/azureml/batch.

```
cd ~/notebooks/azureml/batch
```

#### Deploy to local machine

To create the batch web service locally on the DSVM, set your CLI environment to run in local mode.

```
az ml env local
```

The following command creates a web service local to the DSVM. It reads the model from local storage and specifies that the output is written to local storage.

```
az ml service create batch -f batch_score.py -n batchwebservice --in=--input-data --in=--trained-model:food_inspection.model --out=--output-data:food_inspection_predictions.parquet
```

Once the web service is successfully created, the following command runs a job against the web service:

```
az ml service run batch -n batchwebservice --out=--output-data:output.parquet -w
```

#### Deploy to a cluster

In the following example, the input data is stored remotely. When you create the batch web service, you can use data that is stored in a private blob in Azure storage. In this scenario, the blob must be in the storage account that was created during your environment setup, and that environment must be the active one in your az ml CLI. When you create the service, the CLI uses the credentials stored in your environment to retrieve the data.

Download the *food_inspections2.csv* data file and upload it to a container in the storage account that was created when you set up your environment.

To find the name of the storage account, open your *.amlenvrc* file and find the AML_STORAGE_ACCT_NAME variable.
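
If you prefer to look up the variable from a script, the following is a minimal sketch. It assumes that *.amlenvrc* consists of shell-style `export NAME=value` lines, which is an assumption about the file format and is not confirmed here. The `read_env_var` helper is hypothetical, and the demonstration runs against a throwaway file with made-up values rather than your real environment file.

```python
import os
import tempfile


def read_env_var(path, name):
    """Return the value of `name` from shell-style `export NAME=value` lines."""
    with open(path) as handle:
        for line in handle:
            line = line.strip()
            if line.startswith('export '):
                line = line[len('export '):]
            key, sep, value = line.partition('=')
            if sep and key.strip() == name:
                # Drop optional surrounding quotes.
                return value.strip().strip('"\'')
    return None


# Demonstration against a temporary file with a made-up account name.
with tempfile.NamedTemporaryFile('w', suffix='.amlenvrc', delete=False) as tmp:
    tmp.write('export SOME_OTHER_SETTING="value"\n')
    tmp.write('export AML_STORAGE_ACCT_NAME=examplestorageacct\n')
demo_path = tmp.name

storage_account = read_env_var(demo_path, 'AML_STORAGE_ACCT_NAME')
os.remove(demo_path)
print(storage_account)
```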

To create the batch web service on the cluster, set your CLI environment to run in cluster mode.

```
az ml env cluster
```

To create the web service, run the following command (replace the placeholders with your storage account and container names):

```
az ml service create batch -f batch_score.py --in=--trained-model:food_inspection.model --in=--input-data:https://<yourStorageAccount>.blob.core.windows.net/<containerName>/food_inspections2.csv --out=--output-data -v -n samplebatch
```

Once the web service is successfully created, use the following command to run the job. The output is stored remotely using the wasb protocol, with a URL of the form `wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>`.

```
az ml service run batch --out=--output-data:wasbs://<containerName>@<StorageAccountName>.blob.core.windows.net/output.parquet -v -n samplebatch 
```
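
The two URL styles used in this section place the container and account names in different positions, which is easy to get wrong by hand. The sketch below builds a `wasbs` URL from its parts and converts an `https` blob URL to the `wasbs` form. Both helpers are hypothetical, and the account and container names are made up for illustration.

```python
from urllib.parse import urlparse


def wasbs_url(container, account, path, secure=True):
    """Build wasb[s]://<container>@<account>.blob.core.windows.net/<path>."""
    scheme = 'wasbs' if secure else 'wasb'
    return '{}://{}@{}.blob.core.windows.net/{}'.format(
        scheme, container, account, path.lstrip('/'))


def https_to_wasbs(https_url):
    """Convert https://<account>.blob.core.windows.net/<container>/<path>."""
    parsed = urlparse(https_url)
    account = parsed.netloc.split('.')[0]
    container, _, path = parsed.path.lstrip('/').partition('/')
    return wasbs_url(container, account, path)


print(wasbs_url('mycontainer', 'mystorageacct', 'output.parquet'))
print(https_to_wasbs(
    'https://mystorageacct.blob.core.windows.net/mycontainer/food_inspections2.csv'))
```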

#### View a list of jobs running against your web service

View the list of jobs running against your web service to get the name of the job:

```
az ml service listjobs batch -n batchwebservice
```

Use the job name to view the status of the job with the following command:

```
az ml service viewjob batch -n batchwebservice -j <paste job name here>
```