This notebook is based on the Azure ML & Azure Databricks notebooks by Parashar Shah
(ref. https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/azure-databricks/amlsdk )

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

The AML SDK can be installed as a library from the Databricks GUI. To attach the library, follow https://docs.databricks.com/user-guide/libraries.html and enter the string below as your PyPI package. You can attach the library to all clusters or to a single cluster.

**install azureml-sdk**
* Source: Upload Python Egg or PyPi
* PyPi Name: `azureml-sdk`
* Select Install Library

In [3]:
import azureml.core

# Check core SDK version number - based on build number of preview/master.
print("SDK version:", azureml.core.VERSION)

Please specify the Azure subscription Id, resource group name, workspace name, and the region in which you want to create the Azure Machine Learning Workspace.

You can find your Azure subscription ID in the Azure Portal by selecting Subscriptions from the menu on the left.

For the resource_group, use the name of the resource group that contains your Azure Databricks Workspace.

NOTE: If you provide a resource group name that does not exist, the resource group will be automatically created. This may or may not succeed in your environment, depending on the permissions you have on your Azure Subscription.
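
As a minimal sketch, the four settings could also be read from environment variables instead of being typed into the notebook. The variable names (`AML_SUBSCRIPTION_ID` and so on) and the helper `load_workspace_settings` are hypothetical and are not part of the Azure ML SDK.

```python
import os

# Hypothetical environment variable names; not defined by the Azure ML SDK.
REQUIRED_SETTINGS = {
    "subscription_id": "AML_SUBSCRIPTION_ID",
    "resource_group": "AML_RESOURCE_GROUP",
    "workspace_name": "AML_WORKSPACE_NAME",
    "workspace_region": "AML_WORKSPACE_REGION",
}


def load_workspace_settings(env=None):
    """Return the four workspace settings, failing early if any is missing."""
    env = os.environ if env is None else env
    settings = {name: env.get(var, "").strip() for name, var in REQUIRED_SETTINGS.items()}
    missing = [REQUIRED_SETTINGS[name] for name, value in settings.items() if not value]
    if missing:
        raise ValueError("missing settings: " + ", ".join(sorted(missing)))
    return settings
```

Failing early on a missing value avoids a less obvious error later, when the workspace is created.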

In [5]:
# subscription_id = "<your-subscription-id>"
# resource_group = "<your-existing-resource-group>"
# workspace_name = "<a-new-or-existing-workspace; it is unrelated to Databricks workspace>"
# workspace_region = "<your-resource group-region>"

In [6]:
# Set auth to be used by workspace related APIs.
# For automation or CI/CD ServicePrincipalAuthentication can be used.
# https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.authentication.serviceprincipalauthentication?view=azure-ml-py
auth = None
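
For automation, the comment above points to `ServicePrincipalAuthentication`. The sketch below shows one way it might be wired in, assuming the credentials are supplied through environment variables. The variable names and the helper `build_auth` are hypothetical; when the credentials are absent the helper returns `None`, which matches the interactive default used in this notebook.

```python
import os


def build_auth(env=None):
    """Return a ServicePrincipalAuthentication, or None for interactive login."""
    env = os.environ if env is None else env
    # Hypothetical variable names, chosen for this sketch only.
    tenant_id = env.get("AML_TENANT_ID")
    client_id = env.get("AML_CLIENT_ID")
    client_secret = env.get("AML_CLIENT_SECRET")
    if not (tenant_id and client_id and client_secret):
        return None  # same as `auth = None` above
    # Imported lazily so the helper also works when no credentials are configured.
    from azureml.core.authentication import ServicePrincipalAuthentication
    return ServicePrincipalAuthentication(
        tenant_id=tenant_id,
        service_principal_id=client_id,
        service_principal_password=client_secret,
    )


auth = build_auth()
```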

In [7]:
# Import the Workspace class and create the workspace.
# With exist_ok=True, create() does not fail if the workspace already exists.

from azureml.core import Workspace

ws = Workspace.create(name = workspace_name,
                      subscription_id = subscription_id,
                      resource_group = resource_group, 
                      location = workspace_region,
                      auth = auth,
                      exist_ok=True)

In [8]:
#get workspace details
ws.get_details()

In [9]:
ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group,
               auth = auth)

In [10]:
# find out what you can do with Azure ML class Workspace
help(Workspace)

# 02 Ingest Data

# Data Ingestion

In [13]:
import os
import urllib.request  # "import urllib" alone does not guarantee the request submodule is loaded

In [14]:
# Download AdultCensusIncome.csv from Azure CDN. This file has 32,561 rows.
dataurl = "https://amldockerdatasets.azureedge.net/AdultCensusIncome.csv"
datafile = "AdultCensusIncome.csv"
datafile_dbfs = os.path.join("/dbfs", datafile)

if os.path.isfile(datafile_dbfs):
    print("found {} at {}".format(datafile, datafile_dbfs))
else:
    print("downloading {} to {}".format(datafile, datafile_dbfs))
    urllib.request.urlretrieve(dataurl, datafile_dbfs)
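
The check-then-download pattern in the cell above can be wrapped in a small reusable function. This is a sketch only: `download_if_missing` is a hypothetical helper, and it assumes the target filesystem supports renaming a file in place, which should be verified for a DBFS mount.

```python
import os
import urllib.request


def download_if_missing(url, dest_path):
    """Download url to dest_path unless the file already exists.

    Returns True if a download happened, False if the file was already there.
    """
    if os.path.isfile(dest_path):
        return False
    # Download to a temporary name first so an interrupted transfer
    # never leaves a partial file that later looks like a complete one.
    tmp_path = dest_path + ".part"
    urllib.request.urlretrieve(url, tmp_path)
    os.replace(tmp_path, dest_path)
    return True
```

With the names from the cell above, the call would be `download_if_missing(dataurl, datafile_dbfs)`.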

In [15]:
# Create a Spark dataframe out of the csv file.
data_all = (sqlContext.read.format('csv')
            .options(header='true', inferSchema='true',
                     ignoreLeadingWhiteSpace='true', ignoreTrailingWhiteSpace='true')
            .load(datafile))
print("({}, {})".format(data_all.count(), len(data_all.columns)))
data_all.printSchema()

In [16]:
#renaming columns
columns_new = [col.replace("-", "_") for col in data_all.columns]
data_all = data_all.toDF(*columns_new)
data_all.printSchema()

In [17]:
display(data_all.limit(5))

# Data Preparation

In [19]:
# Choose feature columns and the label column.
label = "income"
xvars = set(data_all.columns) - {label}

print("label = {}".format(label))
print("features = {}".format(xvars))

data = data_all.select([*xvars, label])

# Split data into train and test.
train, test = data.randomSplit([0.75, 0.25], seed=123)

print("train ({}, {})".format(train.count(), len(train.columns)))
print("test ({}, {})".format(test.count(), len(test.columns)))

# Data Persistence

In [21]:
# Write the train and test data sets to intermediate storage
train_data_path = "AdultCensusIncomeTrain"
test_data_path = "AdultCensusIncomeTest"

train_data_path_dbfs = os.path.join("/dbfs", train_data_path)
test_data_path_dbfs = os.path.join("/dbfs", test_data_path)

train.write.mode('overwrite').parquet(train_data_path)
test.write.mode('overwrite').parquet(test_data_path)
print("train and test datasets saved to {} and {}".format(train_data_path_dbfs, test_data_path_dbfs))

# 03 Model Building

# Model Building

In [24]:
import os
import pprint
import numpy as np

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [25]:
#get the train and test datasets
train_data_path = "AdultCensusIncomeTrain"
test_data_path = "AdultCensusIncomeTest"

train = spark.read.parquet(train_data_path)
test = spark.read.parquet(test_data_path)

print("train: ({}, {})".format(train.count(), len(train.columns)))
print("test: ({}, {})".format(test.count(), len(test.columns)))

train.printSchema()

# Define Model

In [27]:
label = "income"
dtypes = dict(train.dtypes)
dtypes.pop(label)

si_xvars = []
ohe_xvars = []
featureCols = []
for key in dtypes:
    if dtypes[key] == "string":
        featureCol = "-".join([key, "encoded"])
        featureCols.append(featureCol)
        
        tmpCol = "-".join([key, "tmp"])
        # string-index and one-hot encode the string column
        #https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
        #handleInvalid: Param for how to handle invalid data (unseen labels or NULL values). 
        #Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), 
        #or 'keep' (put invalid data in a special additional bucket, at index numLabels). Default: "error"
        si_xvars.append(StringIndexer(inputCol=key, outputCol=tmpCol, handleInvalid="skip"))
        ohe_xvars.append(OneHotEncoder(inputCol=tmpCol, outputCol=featureCol))
    else:
        featureCols.append(key)

# string-index the label column into a column named "label"
si_label = StringIndexer(inputCol=label, outputCol='label')

# assemble the encoded feature columns in to a column named "features"
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
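
To make the two encoding stages concrete, here is a plain-Python illustration of what string indexing followed by one-hot encoding does to a single column. It is not Spark code: the real stages are distributed and emit sparse vectors. It assumes the Spark defaults of ordering labels by descending frequency and dropping the last category; the alphabetical tie-break and the sample values are assumptions of this sketch.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    # Ties are broken alphabetically here; treat that as an assumption of this sketch.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}


def one_hot(index, num_labels, drop_last=True):
    """Encode an index as a 0/1 list; with drop_last the last label is all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    return [1.0 if i == index else 0.0 for i in range(size)]


# Made-up column values, for illustration only.
workclass = ["Private", "Private", "State-gov", "Private", "Self-emp", "State-gov"]
mapping = string_index(workclass)
encoded = [one_hot(mapping[v], len(mapping)) for v in workclass]
```

Here `Private` is the most frequent value and gets index 0, while the least frequent value, `Self-emp`, is represented by the all-zero vector.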

In [28]:
from azureml.core.run import Run
from azureml.core.experiment import Experiment
import numpy as np
import os
import shutil

model_name = "AdultCensus_runHistory.mml"
model_dbfs = os.path.join("/dbfs", model_name)
run_history_name = 'spark-ml-notebook'

# start a training run by defining an experiment
myexperiment = Experiment(ws, "Ignite_AI_Talk")
root_run = myexperiment.start_logging()

# Regularization Rates - 
regs = [0.0001, 0.001, 0.01, 0.1]
 
# try a bunch of regularization rate in a Logistic Regression model
for reg in regs:
    print("Regularization rate: {}".format(reg))
    # create a bunch of child runs
    with root_run.child_run("reg-" + str(reg)) as run:
        # create a new Logistic Regression model.
        lr = LogisticRegression(regParam=reg)
        
        # put together the pipeline
        pipe = Pipeline(stages=[*si_xvars, *ohe_xvars, si_label, assembler, lr])

        # train the model
        model_p = pipe.fit(train)
        
        # make prediction
        pred = model_p.transform(test)
        
        # evaluate. note only 2 metrics are supported out of the box by Spark ML.
        bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
        au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
        au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)

        print("Area under ROC: {}".format(au_roc))
        print("Area Under PR: {}".format(au_prc))
      
        # log reg, au_roc, au_prc and feature names in run history
        run.log("reg", reg)
        run.log("au_roc", au_roc)
        run.log("au_prc", au_prc)
        run.log_list("columns", train.columns)

        # save model
        model_p.write().overwrite().save(model_name)
        
        # upload the serialized model into run history record
        mdl, ext = model_name.split(".")
        model_zip = mdl + ".zip"
        shutil.make_archive(mdl, 'zip', model_dbfs)
        run.upload_file("outputs/" + model_name, model_zip)        
        #run.upload_file("outputs/" + model_name, path_or_stream = model_dbfs) #cannot deal with folders

        # now delete the serialized model from local folder since it is already uploaded to run history 
        shutil.rmtree(model_dbfs)
        os.remove(model_zip)
        
# Declare run completed
root_run.complete()
root_run_id = root_run.id
print("run id:", root_run_id)

In [29]:
metrics = root_run.get_metrics(recursive=True)
best_run_id = max(metrics, key = lambda k: metrics[k]['au_roc'])
print(best_run_id, metrics[best_run_id]['au_roc'], metrics[best_run_id]['reg'])
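
The selection above relies on every entry in `metrics` having an `au_roc` value. A slightly more defensive variant is sketched below, assuming `metrics` is a dictionary keyed by run id, as it is used in the cell above. The helper `best_run_by_metric` and the sample data are hypothetical; whether a parent run without metrics actually appears in the dictionary is not confirmed here, the helper simply tolerates it.

```python
def best_run_by_metric(metrics, metric_name):
    """Return (run_id, value) for the run with the highest metric_name.

    Runs that did not log the metric are ignored.
    """
    scored = {
        run_id: values[metric_name]
        for run_id, values in metrics.items()
        if metric_name in values
    }
    if not scored:
        raise ValueError("no run logged " + repr(metric_name))
    best_id = max(scored, key=scored.get)
    return best_id, scored[best_id]


# Made-up run ids and metric values, for illustration only.
sample_metrics = {
    "root": {},
    "child-1": {"reg": 0.0001, "au_roc": 0.90},
    "child-2": {"reg": 0.01, "au_roc": 0.88},
}
best_id, best_value = best_run_by_metric(sample_metrics, "au_roc")
```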

In [30]:
#Get the best run
child_runs = {}

for r in root_run.get_children():
    child_runs[r.id] = r
   
best_run = child_runs[best_run_id]

In [31]:
#Download the model from the best run to a local folder
best_model_file_name = "best_model.zip"
best_run.download_file(name = 'outputs/' + model_name, output_file_path = best_model_file_name)

# Model Evaluation

In [33]:
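## unzip the model to dbfs (as load() seems to require that) and load it.
# Remove any previous copy first: rmtree only works on folders, so handle a plain file separately.
if os.path.isdir(model_dbfs):
    shutil.rmtree(model_dbfs)
elif os.path.isfile(model_dbfs):
    os.remove(model_dbfs)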
shutil.unpack_archive(best_model_file_name, model_dbfs)

model_p_best = PipelineModel.load(model_name)
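
A saved Spark model is a folder rather than a single file, which is why the notebook zips it before upload and unpacks it after download. The round trip can be tried in isolation with the standard library alone; the folder and file names below are stand-ins, not the real model layout.

```python
import os
import shutil
import tempfile

work_dir = tempfile.mkdtemp()

# Stand-in for a saved Spark model: a folder containing nested files.
model_dir = os.path.join(work_dir, "example_model")
os.makedirs(os.path.join(model_dir, "metadata"))
with open(os.path.join(model_dir, "metadata", "part-00000"), "w") as f:
    f.write("{}")

# make_archive appends ".zip" to base_name and zips the contents of root_dir.
zip_path = shutil.make_archive(os.path.join(work_dir, "example_model"), "zip", model_dir)

# unpack_archive recreates the folder contents under extract_dir.
restored_dir = os.path.join(work_dir, "restored_model")
shutil.unpack_archive(zip_path, restored_dir)

restored_file = os.path.join(restored_dir, "metadata", "part-00000")
```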

In [34]:
# make prediction
pred = model_p_best.transform(test)
output = pred[['hours_per_week','age','workclass','marital_status','income','prediction']]
display(output.limit(5))

In [35]:
# evaluate. note only 2 metrics are supported out of the box by Spark ML.
bce = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')
au_roc = bce.setMetricName('areaUnderROC').evaluate(pred)
au_prc = bce.setMetricName('areaUnderPR').evaluate(pred)

print("Area under ROC: {}".format(au_roc))
print("Area Under PR: {}".format(au_prc))

# Model Persistence

In [37]:
##NOTE: by default the model is saved to and loaded from /dbfs/ instead of cwd!
model_p_best.write().overwrite().save(model_name)
print("saved model to {}".format(model_dbfs))

In [38]:
%sh

# check the model has been saved

ls -la /dbfs/AdultCensus_runHistory.mml/*

# 04 Deploy Model

# Register Azure Databricks trained model and deploy it to ACI

Please ensure you have run all previous notebooks in sequence before running this.

Please register Azure Container Instance (ACI) in your subscription using the Azure Portal (https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-supported-services#portal) before using the SDK to deploy your ML model to ACI.

In [42]:
##NOTE: service deployment always gets the model from the current working dir.
import os

model_name = "AdultCensus_runHistory.mml"
model_name_dbfs = os.path.join("/dbfs", model_name)

print("copy model from dbfs to local")
model_local = "file:" + os.getcwd() + "/" + model_name
dbutils.fs.cp(model_name, model_local, True)

In [43]:
#Register the model
from azureml.core.model import Model
mymodel = Model.register(model_path = model_name, # this points to a local file
                       model_name = model_name, # this is the name the model is registered as, am using same name for both path and name.                 
                       description = "ADB trained model by Parashar",
                       workspace = ws)

print(mymodel.name, mymodel.description, mymodel.version)

In [44]:
#%%writefile score_sparkml.py
score_sparkml = """
 
import json
 
def init():
    # One-time initialization of PySpark and predictive model
    import pyspark
    import os
    from azureml.core.model import Model
    from pyspark.ml import PipelineModel
 
    global trainedModel
    global spark
 
    spark = pyspark.sql.SparkSession.builder.appName("ADB and AML notebook by Parashar").getOrCreate()
    model_name = "{model_name}" #interpolated
    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
    # For multiple models, it points to the folder containing all deployed models (./azureml-models)
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), model_name)
    trainedModel = PipelineModel.load(model_path)
    
def run(input_json):
    if isinstance(trainedModel, Exception):
        return json.dumps({{"trainedModel":str(trainedModel)}})
      
    try:
        sc = spark.sparkContext
        input_list = json.loads(input_json)
        input_rdd = sc.parallelize(input_list)
        input_df = spark.read.json(input_rdd)
    
        # Compute prediction
        prediction = trainedModel.transform(input_df)
        #result = prediction.first().prediction
        predictions = prediction.collect()
 
        #Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        result = ",".join(preds)
        # you can return any data type as long as it is JSON-serializable
        # return result.tolist()
        return result
    except Exception as e:
        result = str(e)
        return result
    
""".format(model_name=model_name)
 
exec(score_sparkml)
 
with open("score_sparkml.py", "w") as file:
    file.write(score_sparkml)
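
The scoring script above is built with `str.format`, so the single-brace `{model_name}` is replaced while literal braces in the generated code must be doubled as `{{` and `}}`. A reduced illustration of the same technique follows; the template and the model name are made up for this sketch, and the generated code is executed in its own namespace rather than in the notebook's globals.

```python
template = """
def run(input_json):
    model_name = "{model_name}"
    return {{"model": model_name, "input": input_json}}
"""

# "example.mml" is a hypothetical model name.
source = template.format(model_name="example.mml")

# Execute the generated source in a separate namespace to keep globals clean.
namespace = {}
exec(source, namespace)
result = namespace["run"]("[]")
```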

In [45]:
#deploy to ACI
from azureml.core.webservice import AciWebservice, Webservice
from azureml.exceptions import WebserviceException
from azureml.core.model import InferenceConfig
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies


myaci_config = AciWebservice.deploy_configuration(cpu_cores = 2, 
                                                  memory_gb = 2, 
                                                  tags = {'name':'Databricks Azure ML ACI'}, 
                                                  description = 'This is for ADB and AML example.')

service_name = 'aciws'

# Remove any existing service under the same name.
try:
    Webservice(ws, service_name).delete()
except WebserviceException:
    pass

myenv = Environment.get(ws, name='AzureML-PySpark-MmlSpark-0.15')
# Start from a curated environment. To deploy an amended copy of a
# curated environment (for example, with extra packages), it must be renamed.
myenv.name = 'myenv'
inference_config = InferenceConfig(entry_script='score_sparkml.py', environment=myenv)

myservice = Model.deploy(ws, service_name, [mymodel], inference_config, myaci_config)
myservice.wait_for_deployment(show_output=True)

In [46]:
# learn about Webservice
help(Webservice)

In [47]:
#for using the Web HTTP API 
print(myservice.scoring_uri)

In [48]:
import json

# get some sample data
test_data_path = "AdultCensusIncomeTest"
test = spark.read.parquet(test_data_path).limit(5)

test_json = json.dumps(test.toJSON().collect())

print(test_json)
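
The payload built above is a JSON array whose elements are themselves JSON strings, one per row. In the scoring script, `json.loads` undoes the outer layer and `spark.read.json` parses each row string. The shape can be inspected without Spark; the rows below are made up and contain only a few of the dataset's columns.

```python
import json

# Two made-up rows standing in for test.toJSON().collect(),
# which returns one JSON document (a string) per row.
rows_as_json = [
    json.dumps({"age": 39, "workclass": "State-gov", "hours_per_week": 40}),
    json.dumps({"age": 50, "workclass": "Private", "hours_per_week": 13}),
]

# The request body is a JSON array whose elements are JSON strings.
payload = json.dumps(rows_as_json)

# Decoding therefore takes two steps: first the array, then each row.
decoded_strings = json.loads(payload)
decoded_rows = [json.loads(s) for s in decoded_strings]
```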

In [49]:
#using data defined above predict if income is >50K (1) or <=50K (0)
myservice.run(input_data=test_json)
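
The same prediction can be requested over plain HTTP using the scoring URI printed earlier. The sketch below only builds the request and does not send it. It assumes the service accepts a JSON body via POST and, if key-based authentication has been enabled, a bearer token in the `Authorization` header. The helper `build_scoring_request` and the placeholder URI are hypothetical.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, payload, key=None):
    """Build (but do not send) a POST request for a scoring endpoint."""
    headers = {"Content-Type": "application/json"}
    if key:
        # Only needed when key-based authentication is enabled on the service.
        headers["Authorization"] = "Bearer " + key
    return urllib.request.Request(
        scoring_uri, data=payload.encode("utf-8"), headers=headers, method="POST"
    )


# Placeholder URI; in the notebook this would be myservice.scoring_uri.
request = build_scoring_request("http://example.invalid/score", json.dumps(["{}"]))
# To send it: urllib.request.urlopen(request).read()
```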

In [50]:
# uncomment the line below to delete the web service
# myservice.delete()

## Deploying to other types of computes

In order to learn how to deploy to other types of compute targets, such as AKS, please take a look at the set of notebooks in the [deployment](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/deployment) folder.