In [None]:
import os

# Folder that receives the generated scoring script (score.py).
directory = './scripts'

# exist_ok=True makes the cell idempotent and removes the check-then-create race
# that the separate os.path.exists() test had.
os.makedirs(directory, exist_ok=True)

In [17]:
%%writefile scripts/score.py

import argparse
import os
import tempfile

import joblib
import pandas as pd
from azureml.core import Dataset, Datastore, Workspace
from azureml.core.model import Model
from azureml.core.run import Run


def get_model(ws, model_name='MaxAbsScalerExtremeRandomTrees', version=1):
    """Resolve a registered model in the workspace and deserialize it.

    Args:
        ws: Workspace passed to ``Model.get_model_path`` as ``_workspace``.
        model_name: Name of the registered model. Defaults to the name that
            used to be hard-coded, so existing callers are unaffected.
        version: Registered model version. Defaults to the previously
            hard-coded version 1.

    Returns:
        The object returned by ``joblib.load`` for the resolved model path.
    """
    print("Get the model")
    print("Model name=", model_name)
    model_path = Model.get_model_path(model_name, version=version, _workspace=ws)
    # NOTE(review): joblib.load unpickles the file; only load models from a
    # registry you trust.
    return joblib.load(model_path)


def get_dataset(run_ctx):
    """Return the run's 'CustomerInference_dataset' input as a pandas DataFrame.

    Args:
        run_ctx: Run context whose ``input_datasets`` mapping holds the
            dataset under the key ``'CustomerInference_dataset'``.

    Returns:
        The result of calling ``to_pandas_dataframe()`` on that dataset.
    """
    print("Get the dataset")
    input_dataset = run_ctx.input_datasets['CustomerInference_dataset']
    frame = input_dataset.to_pandas_dataframe()
    # The dataset object itself is echoed first, then the frame's column index.
    print('Dataset= ', input_dataset)
    print('Columns= ', frame.columns)
    return frame


def get_local_dataset(ws):
    """Fetch the registered 'CustomerDataDataset' and return it as a pandas DataFrame.

    Args:
        ws: Workspace passed to ``Dataset.get_by_name``.

    Returns:
        The result of calling ``to_pandas_dataframe()`` on the registered dataset.
    """
    print("Get the dataset")
    registered = Dataset.get_by_name(ws, name='CustomerDataDataset')
    frame = registered.to_pandas_dataframe()
    # The dataset object itself is echoed first, then the frame's column index.
    print('Dataset= ', registered)
    print('Columns= ', frame.columns)
    return frame


def score(data, model):
    """Predict income for every row of ``data`` and key the result by customer id.

    Args:
        data: pandas DataFrame containing ``PostCode`` and ``RewardPoints``
            columns (both are cast to float64). ``Income`` and ``IncomeAML``
            are dropped when present. Exactly 26 columns must remain after
            the drop because feature names are assigned by position.
        model: Object exposing ``predict(frame)``; expected to return one
            prediction per input row, in row order.

    Returns:
        DataFrame indexed by ``CustomerID`` with one ``IncomePrediction`` column.

    Raises:
        ValueError: Raised by pandas when the number of remaining columns
            differs from the 26 feature names.
    """
    feature_names = [
        'CustomerId', 'FirstName', 'LastName', 'FullName', 'DateOfBirth', 'Gender', 'EMail',
        'EmailSubscriber60352436', 'Telephone', 'PostCode', 'StreetAddress', 'City', 'State', 'Country',
        'CreatedOn', 'Headshot', 'LoyaltyTier2147257320', 'Occupation', 'CustomerSatisfaction',
        'pos_posCustomer_LoyaltyId', 'RewardPoints', 'CreditCard', 'ecommerce_eCommerceContacts_ContactId',
        'ecommerce_eCommerceContacts_ContactId_Alternate', 'PoS_PoSContacts_LoyaltyId',
        'pos_posCustomer_LoyaltyId_Alternate',
    ]
    # Chained, non-mutating preparation: the caller's frame is left untouched.
    df = (
        data.astype({'PostCode': 'float64', 'RewardPoints': 'float64'})
        .fillna(0)
        .drop(columns=['Income', 'IncomeAML'], errors='ignore')
    )
    print("Origin columns=", df.columns)
    # Positional rename: the incoming column order must match feature_names.
    df.columns = feature_names
    print("Columns used for prediction ", df.columns)
    predictions = model.predict(df)
    # Predictions are positional. Binding them to df's own index keeps each
    # prediction on its customer's row; a default RangeIndex would misalign in
    # concat whenever ``data`` carries a non-default index.
    results = pd.concat(
        [df['CustomerId'].rename("CustomerID"),
         pd.Series(predictions, index=df.index, name="IncomePrediction")],
        axis=1)
    results_df = results.set_index("CustomerID")
    print("Result columns are ", results_df.columns)
    return results_df


def save_results(ws, output_datastore, output_path, results_df):
    """Write the results to a CSV file and upload it to a datastore folder.

    Args:
        ws: Workspace used to look up the datastore.
        output_datastore: Name of the datastore passed to ``Datastore.get``.
        output_path: Target path; its directory part becomes the upload
            target folder and its base name becomes the CSV file name.
        results_df: Object exposing ``to_csv(path)`` (the scored DataFrame).
    """
    datastore = Datastore.get(ws, output_datastore)
    directory_name = os.path.dirname(output_path)
    print("Extracting Directory {} from path {}".format(directory_name, output_path))
    # The context manager removes the staging folder deterministically, even if
    # the CSV write or the upload raises. The platform default temp location is
    # used instead of a hard-coded "/tmp".
    with tempfile.TemporaryDirectory() as output_folder:
        filename = os.path.join(output_folder, os.path.basename(output_path))
        print("Filename =", filename)
        results_df.to_csv(filename)
        Dataset.File.upload_directory(src_dir=output_folder, target=(datastore, directory_name), overwrite=True)


def parse_arguments():
    """Parse the scoring script's command-line options and echo them.

    Recognized options:
        --input_data / --input_dataset: optional input reference (only echoed).
        --output_path: required target path of the result file.
        --output_datastore: required name of the target datastore.
        --mode: optional, defaults to "RUN".

    Unrecognized options are ignored (``parse_known_args``).

    Returns:
        Tuple ``(output_path, output_datastore, mode)``.
    """
    parser = argparse.ArgumentParser("Start scoring")
    # The pipeline step passes "--input_dataset"; accept it as an alias so the
    # value is no longer silently discarded as an unknown argument.
    parser.add_argument("--input_data", "--input_dataset", dest='input_data', type=str, help="input data")
    parser.add_argument('--output_path', dest='output_path', required=True)
    parser.add_argument('--output_datastore', dest='output_datastore', required=True)
    parser.add_argument('--mode', dest='mode', required=False, default="RUN")
    args, _ = parser.parse_known_args()
    input_data = args.input_data
    output_path = args.output_path
    output_datastore = args.output_datastore
    mode = args.mode
    print("Input data=", input_data)
    print("Output Datastore=", output_datastore)
    print("Output Path=", output_path)
    print("Running the scoring script in a mode", mode)
    return output_path, output_datastore, mode


def get_workspace(run_ctx):
    """Return the workspace of the experiment that the given run context belongs to."""
    experiment = run_ctx.experiment
    return experiment.workspace


def get_run_context():
    """Return the current run context as reported by ``Run.get_context()``."""
    context = Run.get_context()
    return context


if __name__ == '__main__':
    print("Version: 0.0.8")
    output_path, output_datastore, mode = parse_arguments()
    if mode == 'RUN':
        # Submitted run: workspace and input dataset come from the run context.
        run_ctx = get_run_context()
        ws = get_workspace(run_ctx)
        data = get_dataset(run_ctx)
    else:
        # Any other mode (e.g. DEBUG): use the local workspace config and the
        # registered dataset instead of a run context.
        ws = Workspace.from_config()
        data = get_local_dataset(ws)

    model = get_model(ws)
    result = score(data, model)
    # The trailing bare ``result`` expression was removed: it has no effect in
    # a script (only a notebook cell would display it).
    save_results(ws, output_datastore, output_path, result)


Overwriting scripts/score.py


In [15]:
# First run and debug mode

!python scripts/score.py --mode DEBUG --output_datastore workspaceblobstore --output_path CustomerIncomeOutput/CustomerIncomeOutput.csv

Version: 0.0.7
Input data= None
Output Datastore= workspaceblobstore
Output Path= CustomerIncomeOutput/CustomerIncomeOutput.csv
Mode is DEBUG
Get the dataset
Dataset=  TabularDataset
{
  "source": [
    "('ci_5091543d_bb80_4c41_aa07_4cfb02c3381d', 'intelligence/fe28789e-8926-413e-937a-6a2fdc3fd47e/input/Customer/Customer_1.csv')"
  ],
  "definition": [
    "GetDatastoreFiles",
    "ParseDelimited",
    "DropColumns",
    "SetColumnTypes"
  ],
  "registration": {
    "id": "0c928c21-ce27-4b72-b5f1-32a9159faaf8",
    "name": "BugMonday",
    "version": 1,
    "workspace": "Workspace.create(name='ci_aml_main_workspace', subscription_id='202fb1eb-7a41-4c55-8a8d-7f47e105246c', resource_group='ci_aml_custom_model')"
  }
}
Columns=  Index(['CustomerId', 'FirstName', 'LastName', 'FullName', 'DateOfBirth',
       'Gender', 'EMail', 'EmailSubscriber60352436', 'Telephone', 'PostCode',
       'StreetAddress', 'City', 'State', 'Country', 'CreatedOn', 'Headshot',
       'LoyaltyTier2147257320', 'Inc

In [18]:
import os

import joblib
from azureml.core import Dataset, Environment, Experiment, Workspace, ScriptRunConfig
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.model import Model
from azureml.core.runconfig import RunConfiguration
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import Pipeline, PipelineEndpoint, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

# Name of the compute target that the pipeline step runs on.
aml_compute_target = "cluster"

# Workspace handle used by every later cell.
ws = Workspace.from_config()

In [19]:
# Reuse the named compute target when it can be looked up; otherwise provision it.
try:
    aml_compute = AmlCompute(workspace=ws, name=aml_compute_target)
except ComputeTargetException:
    print("Creating new compute target: {}".format(aml_compute_target))

    compute_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D2_V2",
        min_nodes=1,
        max_nodes=2,
    )
    aml_compute = ComputeTarget.create(ws, aml_compute_target, compute_config)
    # Block until provisioning finishes (up to 20 minutes).
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
else:
    print("Found existing compute target: {}".format(aml_compute_target))


Found existing compute target: cluster


In [22]:
# Input dataset, exposed as a pipeline parameter so callers can swap it per run.
CustomerInference_dataset = Dataset.get_by_name(ws, name='CustomerDataDataset')
CustomerInference_pipeline_param = PipelineParameter(name="CustomerInference_pipeline_param",
                                                     default_value=CustomerInference_dataset)
# The consumption name must match the key score.py reads from run_ctx.input_datasets.
CustomerInference_ds_consumption = DatasetConsumptionConfig("CustomerInference_dataset",
                                                            CustomerInference_pipeline_param)

# Output location parameters, forwarded to score.py on the command line.
OutputPathParameter = PipelineParameter(name="output_path",
                                        default_value="CustomerIncomeOutput/CustomerIncomeOutput.csv")
OutputDatastoreParameter = PipelineParameter(name="output_datastore", default_value="workspaceblobstore")

env = Environment.from_conda_specification(name='env', file_path='./CustomerIncomeInferencePipeline.yml')

run_config = RunConfiguration()
run_config.environment = env

scoring = PythonScriptStep(
    name="Scoring_Step",
    script_name="score.py",
    # score.py declares "--input_data"; the former "--input_dataset" spelling
    # was silently ignored by its parse_known_args call.
    arguments=["--input_data", CustomerInference_ds_consumption, "--output_path", OutputPathParameter,
               "--output_datastore", OutputDatastoreParameter],
    inputs=[CustomerInference_ds_consumption],
    compute_target=aml_compute_target,
    source_directory='./scripts',
    runconfig=run_config,
    allow_reuse=False)

print("Scoring step created")

pipeline = Pipeline(workspace=ws, steps=[scoring])

print("Pipeline is built")

Scoring step created
Pipeline is built


In [24]:
# To retire a previously published pipeline, call published_pipeline.disable().
pipeline_endpoint_name = 'IncomePrediction'
experiment_name = 'ci-aml-custom-models'

# NOTE(review): the pipeline is published twice here (once through
# PipelineEndpoint.publish, once through pipeline.publish), and re-running the
# cell may fail if an endpoint with this name already exists - confirm intent.
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name=pipeline_endpoint_name, pipeline=pipeline,
                                             description="Income Endpoint")
published_pipeline = pipeline.publish(name=pipeline_endpoint_name, description="published pipeline")
pipeline_endpoint = PipelineEndpoint.get(workspace=ws, name=pipeline_endpoint_name)
pipeline_endpoint.add_default(pipeline=published_pipeline)
# The bare ``published_pipeline`` expression was removed: it was not the last
# statement of the cell, so it displayed nothing.

# Smoke-test the pipeline with the default dataset and wait for the run to finish.
pipeline_test = Experiment(ws, experiment_name).submit(pipeline, pipeline_parameters={
    "CustomerInference_pipeline_param": CustomerInference_dataset})
pipeline_test.get_details()
pipeline_test.wait_for_completion(show_output=True)

Created step Scoring_Step [62764377][94d3bb55-523f-4692-8f1f-2fe768c969ea], (This step will run and generate new outputs)
Submitted PipelineRun 5889e70d-7252-4be5-8ee5-112c1d9b5a8f
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/5889e70d-7252-4be5-8ee5-112c1d9b5a8f?wsid=/subscriptions/202fb1eb-7a41-4c55-8a8d-7f47e105246c/resourcegroups/ci_aml_custom_model/workspaces/ci_aml_main_workspace&tid=5e5d1d17-c4b2-4b93-8e7f-1370db4f1d6c
PipelineRunId: 5889e70d-7252-4be5-8ee5-112c1d9b5a8f
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/5889e70d-7252-4be5-8ee5-112c1d9b5a8f?wsid=/subscriptions/202fb1eb-7a41-4c55-8a8d-7f47e105246c/resourcegroups/ci_aml_custom_model/workspaces/ci_aml_main_workspace&tid=5e5d1d17-c4b2-4b93-8e7f-1370db4f1d6c
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: e618e5a4-3f07-41cd-a385-b1ac5d5dd310
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/e618e5a4-3f07-41cd-a385-b1ac5d5dd310?wsid=/subscriptions/

'Finished'