# Copyright (c) 2021, Microsoft

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

DO NOT USE IN PRODUCTION ENVIRONMENTS.

# Open Workspace

In [1]:
import azureml.core
from azureml.core import Workspace
from azureml.core.experiment import Experiment

# Resolve the workspace from its saved configuration, then grab the two
# workspace-level handles every later cell relies on: the default datastore
# and the experiment under which pipeline runs are submitted.
ws = Workspace.from_config()
datastore = ws.get_default_datastore()
experiment = Experiment(workspace=ws, name='trump-tweets-classification')

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


# Create Computes

In [2]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Name of the CPU cluster used for training runs.
cpu_cluster_name = "trump-tweets-cpu"

# Look the cluster up first; only provision a new one when the lookup fails.
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
except ComputeTargetException:
    # Not found: autoscaling cluster of 0-10 nodes that scales down after
    # 900 idle seconds.
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D2_V2',
        min_nodes=0,
        max_nodes=10,
        idle_seconds_before_scaledown=900,
    )
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
else:
    print('Found existing cluster, use it.')

# Block until the cluster reports completion (for both new and reused clusters).
cpu_cluster.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


In [3]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Compute cluster that runs the pipeline's script steps (prep, register, deploy).
cluster_name = "trump-tweets-pl"
try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it: 0-2 nodes, scaling down after
    # 900 idle seconds.
    #
    # Provisioning errors are intentionally NOT caught here. Printing and
    # continuing would leave `pipeline_cluster` undefined, so the cells that
    # build the pipeline steps would fail later with a misleading NameError
    # instead of the real provisioning error.
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2',
                                                           max_nodes=2,
                                                           min_nodes=0,
                                                           idle_seconds_before_scaledown=900)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
    pipeline_cluster.wait_for_completion(show_output=True)

Found existing cluster, use it.


In [4]:
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# Name of the AKS cluster that hosts the inference endpoint.
aks_name = 'trump-tweets-inf'

try:
    # Reuse the AKS target when the workspace already has one under this name.
    aks_target = ComputeTarget(workspace=ws, name=aks_name)
except ComputeTargetException:
    # Otherwise provision one with the SDK's default AKS configuration
    # (parameters can be passed to customize it).
    prov_config = AksCompute.provisioning_configuration()
    aks_target = ComputeTarget.create(
        workspace=ws,
        name=aks_name,
        provisioning_configuration=prov_config,
    )
else:
    print('Found existing cluster, use it.')

# Only wait when the target has not already reached the "Succeeded" state.
if not aks_target.get_status() == "Succeeded":
    aks_target.wait_for_completion(show_output=True)

Found existing cluster, use it.


# Create DataPath Pipeline Parameter

In [5]:
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

# Default input location: the 'upload' folder on the workspace's default datastore.
datapath = DataPath(
    datastore=datastore,
    path_on_datastore='upload',
)

# Expose that location as the "input_data" pipeline parameter so a different
# path can be supplied per run.
data_path_pipeline_param = PipelineParameter(
    name="input_data",
    default_value=datapath,
)

# Pair the parameter with a mount binding; this tuple is what the step consumes.
datapath_input = (
    data_path_pipeline_param,
    DataPathComputeBinding(mode='mount'),
)

# Define PrepStep

In [6]:
from azureml.core import Dataset
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.pipeline.steps import PythonScriptStep
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.core.datastore import Datastore
from azureml.pipeline.steps import PythonScriptStep

# Python environment for the data-preparation step: pandas plus the Azure ML SDK.
conda_dep = CondaDependencies()
for pip_pkg in ("pandas", "azureml-sdk"):
    conda_dep.add_pip_package(pip_pkg)

data_prep_rcfg = RunConfiguration(conda_dependencies=conda_dep)

# Runs prepare.py on the pipeline cluster; the mounted data path is handed to
# the script through its --input argument.
prep_step = PythonScriptStep(
    name="data_prep_step",
    source_directory="./script_folder/01_prep",
    script_name="prepare.py",
    arguments=["--input", datapath_input],
    inputs=[datapath_input],
    runconfig=data_prep_rcfg,
    compute_target=pipeline_cluster,
    allow_reuse=True,
)

# Define HyperDriveStep

In [7]:
from azureml.core.dataset import Dataset
from azureml.core import ScriptRunConfig, Environment
from azureml.train.sklearn import SKLearn
from azureml.train.hyperdrive import (GridParameterSampling, RandomParameterSampling, BanditPolicy,
                                      HyperDriveConfig, PrimaryMetricGoal, BanditPolicy, normal,
                                      uniform, choice)
from azureml.pipeline.steps import HyperDriveStep
from azureml.pipeline.core import PipelineData, TrainingOutput

# Registered dataset handed to the training script as a named input.
# NOTE(review): the dataset is looked up when this cell runs, not when the
# pipeline executes -- confirm this is the version the prep step is meant to
# feed into training.
proper_ds = Dataset.get_by_name(ws, name='trump-tweets-prepared')
proper_ds_named = proper_ds.as_named_input('trumptweetsprepared')

# Environment intended for the ScriptRunConfig variant below; the SKLearn
# estimator that is actually used does not reference it.
environment = Environment("proper-training")
environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=["scikit-learn", "pandas"],
    pip_packages=["azureml-defaults"],
)

# ScriptRunConfig alternative to the (deprecated) SKLearn estimator, kept for reference:
#src = ScriptRunConfig(source_directory="./script_folder/02_train",
#                      script='training.py',
#                      compute_target=cpu_cluster_name,
#                      environment=environment)

# Estimator that runs training.py on the CPU cluster.
est = SKLearn(
    source_directory="./script_folder/02_train",
    entry_script='training.py',
    compute_target=cpu_cluster_name,
    conda_packages=['matplotlib'],
    pip_packages=['azureml-dataset-runtime[pandas,fuse]'],
)

# Hyperparameter search space, sampled at random.
search_space = {
    "min_doc_freq": choice(range(2,10)),
    "max_doc_freq": uniform(0.4,0.7),
    "n_estimators": choice(range(1,1000)),
    "min_class_frequency": choice(range(1,50)),
}
param_sampling = RandomParameterSampling(search_space)

# Bandit early-termination policy: evaluated every 3 intervals, slack of 0.05.
early_termination_policy = BanditPolicy(evaluation_interval=3, slack_amount=0.05)

# 10 runs in total, at most 4 at a time, maximizing the 'Accuracy' metric.
hd_config = HyperDriveConfig(
    estimator=est,
    hyperparameter_sampling=param_sampling,
    policy=early_termination_policy,
    primary_metric_name='Accuracy',
    primary_metric_goal=PrimaryMetricGoal.MAXIMIZE,
    max_total_runs=10,
    max_concurrent_runs=4,
)

# Step outputs: the metrics of the sweep and the model directory.
metrics_data = PipelineData(
    name='metrics_data',
    datastore=datastore,
    pipeline_output_name='metrics_output',
    training_output=TrainingOutput(type="Metrics"),
)

saved_model = PipelineData(
    name='saved_model',
    datastore=datastore,
    pipeline_output_name='model_output',
    is_directory=True,
    training_output=TrainingOutput(type='Model'),
)

# The sweep itself; the named dataset reaches training.py through --input-data.
hd_step = HyperDriveStep(
    name='hyperdrive_step',
    hyperdrive_config=hd_config,
    estimator_entry_script_arguments=['--input-data', proper_ds_named],
    inputs=[proper_ds_named],
    outputs=[metrics_data, saved_model],
    allow_reuse=True,
)

# No data dependency links the two steps, so order them explicitly.
hd_step.run_after(prep_step)

'SKLearn' estimator is deprecated. Please use 'ScriptRunConfig' from 'azureml.core.script_run_config' with your own defined environment or the AzureML-Tutorial curated environment.


# Define Register Model and Deploy Endpoint Step

In [8]:
# The registration script only needs the Azure ML SDK.
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-sdk")
register_model_rcfg = RunConfiguration(conda_dependencies=conda_dep)

# Runs register_model.py on the pipeline cluster.
register_model_step = PythonScriptStep(
    name="register_model_step",
    source_directory="./script_folder/03_register_model",
    script_name='register_model.py',
    runconfig=register_model_rcfg,
    compute_target=pipeline_cluster,
    allow_reuse=True,
)

# Explicit ordering: run once the hyperparameter sweep has finished.
register_model_step.run_after(hd_step)

In [9]:
# The deployment script only needs the Azure ML SDK.
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-sdk")
deploy_rcfg = RunConfiguration(conda_dependencies=conda_dep)

# Runs deploy.py on the pipeline cluster; the step takes no arguments or inputs.
deploy_step = PythonScriptStep(
    name="deploy_step",
    source_directory="./script_folder/04_deploy",
    script_name="deploy.py",
    arguments=[],
    inputs=[],
    runconfig=deploy_rcfg,
    compute_target=pipeline_cluster,
    allow_reuse=True,
)

# Explicit ordering: deploy only after the model registration step.
deploy_step.run_after(register_model_step)

# Build and Run / Publish Pipeline

In [10]:
# Assemble the four steps; their execution order comes from the run_after()
# links declared when each step was defined.
steps = [
    prep_step,
    hd_step,
    register_model_step,
    deploy_step,
]
pipeline = Pipeline(steps=steps, workspace=ws)

# Submit under the experiment and block until the run ends.
pipeline_run = experiment.submit(pipeline)
pipeline_run.wait_for_completion()

# Optional: publish the pipeline from this run (left disabled).
#published_pipeline = pipeline_run.publish_pipeline(
#     name="Trump Tweets Classification Pipeline",
#     description="Re-run prep, training and deploying when new data arrives.",
#     version="0.1")

t /mnt/batch/tasks/shared/LS_root/jobs/mlw-test/azureml/1ab865ad-7093-49c5-b97e-5829e0486ba0/mounts/workspaceblobstore
>>>   2021/04/19 07:59:33 Successfully mounted azureml-blobstore-235e526e-18a0-41b8-81aa-3b9bf4be3c17 container from mlwteststorage6cc78d867d account at /mnt/batch/tasks/shared/LS_root/jobs/mlw-test/azureml/1ab865ad-7093-49c5-b97e-5829e0486ba0/mounts/workspaceblobstore
>>>   2021/04/19 07:59:33 No unmanaged file systems configured
>>>   2021/04/19 07:59:33 Start to getting gpu count by running nvidia-smi command
>>>   2021/04/19 07:59:33 From the policy service, the filtering patterns is: , data store is 
>>>   2021/04/19 07:59:33 Creating directory /mnt/batch/tasks/shared/LS_root/jobs/mlw-test/azureml/1ab865ad-7093-49c5-b97e-5829e0486ba0/mounts/workspaceblobstore/azureml/1ab865ad-7093-49c5-b97e-5829e0486ba0/azureml_compute_logs
>>>   2021/04/19 07:59:33 Creating directory /mnt/batch/tasks/shared/LS_root/jobs/mlw-test/azureml/1ab865ad-7093-49c5-b97e-5829e0486ba0/mounts

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "AzureMLCompute job failed.\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\n\tReason: Job failed with non-zero exit Code",
        "messageFormat": "{Message}",
        "messageParameters": {
            "Message": "AzureMLCompute job failed.\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\n\tReason: Job failed with non-zero exit Code"
        },
        "details": [],
        "innerError": {
            "code": "UserTrainingScriptFailed"
        }
    },
    "correlation": {
        "operation": null,
        "request": "a5a74abd014c2d93"
    },
    "environment": "westeurope",
    "location": "westeurope",
    "time": "2021-04-19T08:00:04.140642Z",
    "componentName": "execution-worker"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"AzureMLCompute job failed.\\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\\n\\tReason: Job failed with non-zero exit Code\",\n        \"messageFormat\": \"{Message}\",\n        \"messageParameters\": {\n            \"Message\": \"AzureMLCompute job failed.\\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\\n\\tReason: Job failed with non-zero exit Code\"\n        },\n        \"details\": [],\n        \"innerError\": {\n            \"code\": \"UserTrainingScriptFailed\"\n        }\n    },\n    \"correlation\": {\n        \"operation\": null,\n        \"request\": \"a5a74abd014c2d93\"\n    },\n    \"environment\": \"westeurope\",\n    \"location\": \"westeurope\",\n    \"time\": \"2021-04-19T08:00:04.140642Z\",\n    \"componentName\": \"execution-worker\"\n}"
    }
}

In [11]:
# Disabled housekeeping snippet (uncomment to use): prints the name, id and
# version of every published pipeline in the workspace, then disables the
# published pipeline with the hard-coded id below.
#published_pipelines = PublishedPipeline.list(ws)
#for published_pipeline in published_pipelines:
#    print(f"{published_pipeline.name}, '{published_pipeline.id}', { published_pipeline.version}")
#PublishedPipeline.get(ws, id="d37e9c60-5316-40b5-87af-0e9de702b500").disable()

Sample 1: Regression - Automobile Price Prediction (Basic) 08-11-2020-02-12, '46184478-663f-4a88-86c1-f092aeadf6b9', None
