In [1]:
# Show the installed Azure ML SDK version.
import azureml.core
azureml.core.VERSION

In [2]:
import os

# Azure ML workspace coordinates. Each value can be overridden through an
# environment variable, so the notebook can target another workspace without
# editing (or committing) the subscription id; the defaults keep the original values.
workspace_name = os.environ.get('AML_WORKSPACE_NAME', 'AMLWorkspace1')
subscription_id = os.environ.get('AML_SUBSCRIPTION_ID', 'f78802b9-4646-4cf6-b7f1-6b2ee63219f4')
resource_group = os.environ.get('AML_RESOURCE_GROUP', 'ADBDevRg')

In [3]:
import os
import shutil
from azureml.core.workspace import Workspace

# Connect to the workspace identified by workspace_name / subscription_id / resource_group.
ws = Workspace(
    workspace_name = workspace_name,
    subscription_id = subscription_id,
    resource_group = resource_group)

# Persist the subscription id, resource group name, and workspace name in aml_config/config.json.
# Remove any stale copy first: shutil.rmtree only accepts directories, so a
# plain file at that path must be deleted with os.remove instead.
aml_config = 'aml_config'
if os.path.isdir(aml_config):
    shutil.rmtree(aml_config)
elif os.path.isfile(aml_config):
    os.remove(aml_config)
ws.write_config()

In [4]:
%sh
# Print the workspace config file on the driver (contains the subscription id).
cat /databricks/driver/aml_config/config.json

In [5]:
# Display the workspace details as a connectivity sanity check.
ws.get_details()

In [6]:
# Persist the config folder to DBFS so that it can be used by the other notebooks.
aml_config_local = 'file:' + os.getcwd() + '/' + aml_config
aml_config_dbfs = '/dbfs/' + aml_config

# Clear any previous copy through the local /dbfs path first. shutil.rmtree only
# handles directories, so a plain file at that path needs os.remove.
if os.path.isdir(aml_config_dbfs):
    shutil.rmtree(aml_config_dbfs)
elif os.path.isfile(aml_config_dbfs):
    os.remove(aml_config_dbfs)

# NOTE(review): the destination is the relative path in `aml_config`; presumably
# dbutils resolves it to dbfs:/aml_config (read back later as '/aml_config') — confirm.
dbutils.fs.cp(aml_config_local, aml_config, recurse=True)

In [7]:
import os
#import urllib
#import pandas as pd

from pyspark.ml import PipelineModel

In [8]:
# NOTE: service deployment always gets the model from the current working dir,
# so copy the saved model folder from DBFS into the local working directory.
model_name = "modelMain"
model_path_dbfs = "/" + model_name + "/"    # DBFS folder holding the saved model
model_path_local = "file:" + os.getcwd() + "/" + model_name + "/"

print("copy model from dbfs {} to local {}".format(model_path_dbfs, model_path_local))
dbutils.fs.cp(model_path_dbfs, model_path_local, recurse=True)

In [9]:
from azureml.core.workspace import Workspace
import azureml.core

# Bring the config folder back from DBFS into the local working directory,
# then build the Workspace object from it.
aml_config = '/aml_config'
local_config_target = 'file:{}{}'.format(os.getcwd(), aml_config)
dbutils.fs.cp(aml_config, local_config_target, recurse=True)

ws = Workspace.from_config()

In [10]:
#Register the model
from azureml.core.model import Model
mymodel = Model.register(model_path = model_name, # this points to a local file or folder in the current working dir
                       model_name = model_name, # this is the name the model is registered with                 
                       description = "Taxi Fare Prediction Model",
                       workspace = ws)

print(mymodel.name, mymodel.description, mymodel.version)

In [11]:
%%writefile score_sparkml.py

import json

def init():
    try:
        # One-time initialization of PySpark and predictive model
        import pyspark
        from pyspark.ml import PipelineModel
        from azureml.core.model import Model
        from mmlspark import LightGBMRegressor
        global trainedModel
        global spark
        
        spark = pyspark.sql.SparkSession.builder.appName("Scoring").getOrCreate()
      
        model_name = "modelMain" 
        
        model_path = Model.get_model_path(model_name)

        trainedModel = PipelineModel.load(model_path)

    except Exception as e:
        json.dumps("Exception in init: " + str(e))
        trainedModel = e

def run(input_df):
    response = ''    

    if isinstance(trainedModel, Exception):
        return json.dumps({"Exception":trainedModel})

    try:
#         print("received: " + input_df)
        
        sc = spark.sparkContext
      
        # Set inferSchema=true to prevent the float values from being seen as strings
        # which can later cause the VectorAssembler to throw an error: 'Data type StringType is not supported.'
        df = spark.read.option("inferSchema", "true").json(sc.parallelize([input_df]))
      
        #Get prediction results for the dataframe
        score = trainedModel.transform(df)
        predictions = score.collect()
        
        #Get each scored result (prediction and confidence)
        preds = [{"prediction":str(result['prediction'])} for result in predictions]
        
        response = preds[0]
        
#         print("response: " + str(response))
        
    except Exception as e:
        print("Exception in run: " + str(e))
        return json.dumps(str(e))

    # Return results
    return json.dumps(response)

Test the scoring logic locally in this notebook and confirm that it returns predictions before deploying it as a web service.

In [13]:
# Mirror the scoring script's init() in the notebook: look up the model path via
# Model.get_model_path and load the Spark ML pipeline from it.
# mmlspark is not referenced directly; presumably imported so the LightGBM stage
# of the saved pipeline can be deserialized — TODO confirm before removing.
from mmlspark import LightGBMRegressor
model_name = "modelMain"         
model_path = Model.get_model_path(model_name)
trainedModel = PipelineModel.load(model_path)

In [14]:
import json

# Build a one-row DataFrame from a sample record, the same way run() parses its
# input. spark.read.json expects JSON strings, so serialize the dict explicitly;
# a raw dict would be turned into its Python repr, which is not valid JSON once
# values such as True/False/None appear.
sample_record = {"pickup_longitude":-73.9667739868164, "pickup_latitude":40.75720977783203,
    "dropoff_longitude":-73.98005676269531, "dropoff_latitude":40.7390251159668, "passenger_count":1, "year":2015, "month":4, "day":1, "hour":18, "trip_distance_km":5}
dfx = (spark.read.option("inferSchema", "true")
       .json(spark.sparkContext.parallelize([json.dumps(sample_record)])))

In [15]:
import json

# Score the sample DataFrame and show the first prediction as JSON, building the
# response the same way run() in the scoring script does.
score = trainedModel.transform(dfx)
predictions = score.collect()
preds = []
for row in predictions:
    preds.append({"prediction": str(row['prediction'])})
json.dumps(preds[0])

In [16]:
import importlib.util
import json

# Create two records for testing the prediction
test_input1 = {"pickup_longitude":-73.9667739868164, "pickup_latitude":40.75720977783203,
    "dropoff_longitude":-73.98005676269531, "dropoff_latitude":40.7390251159668, "passenger_count":1, "year":2015, "month":4, "day":1, "hour":18, "trip_distance_km":5}

test_input2 = {"pickup_longitude":-73.9667739868164, "pickup_latitude":40.75720977783203,
    "dropoff_longitude":-73.98005676269531, "dropoff_latitude":40.7390251159668, "passenger_count":4, "year":2019, "month":1, "day":23, "hour":11, "trip_distance_km":8}

# %%writefile only writes score_sparkml.py; it does not define init()/run() in
# this kernel. Load them from the written file so the test exercises exactly the
# code that will be deployed (and survives Restart & Run All).
_score_spec = importlib.util.spec_from_file_location("score_sparkml", "score_sparkml.py")
score_sparkml = importlib.util.module_from_spec(_score_spec)
_score_spec.loader.exec_module(score_sparkml)
init, run = score_sparkml.init, score_sparkml.run

# test init() in local notebook
init()

# package the inputs into a JSON string and test run() in local notebook
test_inputs = [test_input1, test_input2]
json_str_test_inputs = json.dumps(test_inputs)
run(json_str_test_inputs)

In [17]:
from azureml.core.conda_dependencies import CondaDependencies 

# Conda environment for the scoring image, saved to ./mydeployenv.yml.
# NOTE(review): score_sparkml.py imports mmlspark (LightGBM), which is not listed
# here, and it imports none of these three packages — confirm the deployed image
# actually provides mmlspark, otherwise init() will fail inside the service.
myacienv = CondaDependencies.create(conda_packages=['scikit-learn','numpy','pandas'])
myacienv.save_to_file(".", "mydeployenv.yml")

In [18]:
# Display the serialized conda environment for inspection.
myacienv.serialize_to_string()

In [19]:
from azureml.core.webservice import AciWebservice, Webservice

# ACI deployment settings: 1 CPU core, 1 GB of memory, plus descriptive metadata.
# NOTE(review): 'dns-name-label' is passed as a free-form property here; if the
# intent is to set the container's DNS label, check whether the installed SDK's
# deploy_configuration offers a dedicated argument for it.
aci_config = AciWebservice.deploy_configuration(
    description='Predicts a taxi fare.',
    cpu_cores=1,
    memory_gb=1,
    tags={'name': 'Taxi Fare Prediction'},
    properties={'dns-name-label': 'taxipr'},
)

In [20]:
from azureml.core.image import ContainerImage

# Settings for the scoring container image.
service_name = "sparkmlservicedb09"
runtime = "spark-py"             # alternative runtime: "python"
driver_file = "score_sparkml.py"
conda_file = "mydeployenv.yml"

image_config = ContainerImage.image_configuration(
    runtime=runtime,
    execution_script=driver_file,
    conda_file=conda_file,
)

In [21]:
# Deploy the registered model as an ACI web service using the image and
# deployment settings above, then wait for the deployment with progress output.
webservice = Webservice.deploy_from_model(
    workspace=ws,
    name=service_name,
    models=[mymodel],
    image_config=image_config,
    deployment_config=aci_config,
)
webservice.wait_for_deployment(show_output=True)

In [22]:
import json


def score_remote(input_json=None):
    """Send a JSON payload to the deployed web service and decode its reply.

    Args:
        input_json: JSON string of records to score. Defaults to the two local
            test records in ``json_str_test_inputs``.

    Returns:
        The service response parsed with ``json.loads``.
    """
    payload = json_str_test_inputs if input_json is None else input_json
    return json.loads(webservice.run(input_data=payload))


def get_area(radius=None):
    """Backward-compatible alias for ``score_remote()``.

    ``radius`` is accepted but ignored; the service is always called with the
    default test payload.
    """
    return score_remote()


score_remote()

In [23]:
import json
import requests

# Call the scoring endpoint directly over HTTP with a single JSON record.
# requests has no default timeout, so set one to avoid hanging forever on an
# unresponsive service.
headers = {'Content-Type':'application/json'}
resp = requests.post(webservice.scoring_uri, json.dumps(test_input1), headers=headers, timeout=60)

print("POST to url", webservice.scoring_uri)
print("status:", resp.status_code)
print("prediction:", resp.text)

In [24]:
# Show the JSON payload for the second sample record. test_input2 is defined
# once, together with test_input1; redefining it here could silently drift
# from that definition.
json.dumps(test_input2)

In [25]:
# Display the JSON payload for the second sample record.
json.dumps(test_input2)

In [26]:
# Display the JSON string of both test records passed to run() and the web service.
json_str_test_inputs