In [None]:
import azureml.core
from azureml.core import Workspace, Environment, Experiment, Datastore, Dataset, ScriptRunConfig
from azureml.core.compute import ComputeTarget, AmlCompute, DatabricksCompute
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.steps import HyperDriveStep, HyperDriveStepRun, PythonScriptStep, DatabricksStep
from azureml.pipeline.core import Pipeline, PipelineData, TrainingOutput
from azureml.train.hyperdrive import RandomParameterSampling, BanditPolicy, HyperDriveConfig, PrimaryMetricGoal
from azureml.train.hyperdrive import choice, loguniform
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineParameter

import os
import shutil
import urllib
import numpy as np
import matplotlib.pyplot as plt

import mlflow
import mlflow.sklearn


# Report which azureml-core SDK version this kernel is running against.
sdk_version = azureml.core.VERSION
print("SDK version:", sdk_version)


In [None]:
# Load the workspace described by the local config.json and echo its coordinates,
# one per line.
ws = Workspace.from_config()
workspace_fields = (ws.name, ws.resource_group, ws.location, ws.subscription_id)
print(*workspace_fields, sep='\n')

In [None]:
# Root folder holding the per-step source directories (databricks/, train/,
# register/) and the local datasets/ folder used below.
script_folder = './pipeline-folder-all'

# Experiment under which the pipeline runs are recorded.
experiment_name = 'AML_Pipeline_Comp'
exp = Experiment(workspace=ws, name=experiment_name)

In [None]:
# Replace with your account info before running, or set the DATABRICKS_* environment variables.

db_compute_name = os.getenv("DATABRICKS_COMPUTE_NAME", "ADBCluster")  # Databricks compute name
db_resource_group = os.getenv("DATABRICKS_RESOURCE_GROUP", "<my-db-resource-group>")  # Databricks resource group
db_workspace_name = os.getenv("DATABRICKS_WORKSPACE_NAME", "<my-db-workspace-name>")  # Databricks workspace name
db_access_token = os.getenv("DATABRICKS_ACCESS_TOKEN", "<my-access-token>")  # Databricks access token (secret!)

try:
    # Reuse the compute target if it is already attached to this workspace.
    databricks_compute = DatabricksCompute(workspace=ws, name=db_compute_name)
    print('Compute target {} already exists'.format(db_compute_name))
except ComputeTargetException:
    print('Compute not found, will use below parameters to attach new one')
    print('db_compute_name {}'.format(db_compute_name))
    print('db_resource_group {}'.format(db_resource_group))
    print('db_workspace_name {}'.format(db_workspace_name))
    # Never echo the token itself: notebook outputs are saved with the file and
    # end up in version control / shared copies. Only report whether it is set.
    token_is_set = bool(db_access_token) and db_access_token != "<my-access-token>"
    print('db_access_token {}'.format('<set>' if token_is_set else '<not set>'))

    config = DatabricksCompute.attach_configuration(
        resource_group=db_resource_group,
        workspace_name=db_workspace_name,
        access_token=db_access_token)
    databricks_compute = ComputeTarget.attach(ws, db_compute_name, config)
    databricks_compute.wait_for_completion(True)


In [None]:
import pandas as pd
from sklearn.datasets import load_iris

# Build a flat frame: the four iris feature columns plus the integer class label.
iris = load_iris()
df = (
    pd.DataFrame(iris['data'], columns=iris['feature_names'])
    .assign(target=iris['target'])
)

display(df.head())

In [None]:
# Persist the iris frame locally so the next cell can upload it to the datastore.
# Create datasets/ first: to_csv does not create missing parent directories, and
# the os.makedirs call for script_folder is commented out earlier in the notebook.
datasets_dir = os.path.join(script_folder, 'datasets')
os.makedirs(datasets_dir, exist_ok=True)
df.to_csv(os.path.join(datasets_dir, 'iris.csv'), index=False)

In [None]:
# Use the datastore registered in the workspace as "generalpurposeaccount".
# Note: this is a named datastore, NOT the workspace default
# (ws.get_default_datastore() would return that one).
datastore = Datastore(ws, "generalpurposeaccount")
print('Datastore {} will be used'.format(datastore.name))

# Upload the local sample CSV so the pipeline can read it through a DataReference.
# overwrite=False: a blob already present under target_path is not replaced, so
# local edits to iris.csv will not be re-uploaded on later runs.
local_iris_csv = os.path.join(script_folder, 'datasets', "iris.csv")
datastore.upload_files(files=[local_iris_csv], target_path="pipeline_inputdataset", overwrite=False)


In [None]:
# Raw input for the Databricks prep step: the folder uploaded to the datastore
# in the previous cell, exposed to the step under the reference name "input".
step_1_input = DataReference(
    datastore=datastore,
    path_on_datastore="pipeline_inputdataset",
    data_reference_name="input",
)

# Intermediate output written by the prep step and consumed by the HyperDrive step.
step_1_processed_data = PipelineData("processed_data", datastore=datastore)

In [None]:
from azureml.core.runconfig import EggLibrary, PyPiLibrary

# Databricks data-prep step: runs data_prep.py from <script_folder>/databricks,
# reading step_1_input and writing step_1_processed_data.
source_directory = os.path.join(script_folder, 'databricks')
python_script_name = "data_prep.py"

prep_script_args = [
    '--input_filename', 'iris.csv',
    '--output_filename', 'iris_prep.parquet',
]

dbNbStep = DatabricksStep(
    name="DatabricksDataPrep",
    run_name='DatabricksDataPrep',
    source_directory=source_directory,
    python_script_name=python_script_name,
    python_script_params=prep_script_args,
    inputs=[step_1_input],
    outputs=[step_1_processed_data],
    compute_target=databricks_compute,
    # NOTE(review): no existing_cluster_id is passed, so with num_workers set this
    # presumably provisions a new job cluster per run - confirm that is intended.
    num_workers=1,
    pypi_libraries=[PyPiLibrary('azureml-sdk')],
    # Re-execute on every submit instead of reusing a cached result.
    allow_reuse=False,
)
In [None]:
%%writefile conda_dependencies.yml

# Conda spec for a custom scikit-learn training environment.
# NOTE(review): nothing in this notebook currently reads this file - the
# Environment.from_conda_specification(...) call that would use it is commented out,
# and the curated environment is used instead.
dependencies:
- python=3.6.2
- scikit-learn
- pip:
  - azureml-defaults


In [None]:
from azureml.core import Environment

# Fetch an environment already registered in the workspace by name (an Azure ML
# curated sklearn image, judging by the "AzureML-" prefix). A custom alternative is
# Environment.from_conda_specification(name='sklearn-env', file_path='./conda_dependencies.yml').
sklearn_env_name = "AzureML-sklearn-0.24-ubuntu18.04-py37-cpu"
sklearn_env = Environment.get(workspace=ws, name=sklearn_env_name)


In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# AmlCompute cluster that runs the HyperDrive trials.
cluster_name = "hd-cluster"

# Look the cluster up first; the constructor raises ComputeTargetException when
# no target with that name exists, in which case a new one is provisioned.
try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
except ComputeTargetException:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D2_V2',
        min_nodes=0,
        max_nodes=4,
    )
    compute_target = ComputeTarget.create(ws, cluster_name, provisioning_config)
else:
    print('Found existing compute target')

# Block until provisioning settles (up to 20 minutes). min_node_count=None means
# "use the cluster's own scale settings".
compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

# Detailed cluster status as a serializable dict.
cluster_status = compute_target.get_status()
print(cluster_status.serialize())


In [None]:
from azureml.core import ScriptRunConfig

# Folder containing train_iris.py, the per-trial HyperDrive training script.
project_folder = os.path.join(script_folder, 'train')

# Template run configuration that HyperDrive clones for each trial.
scriptConf = ScriptRunConfig(
    source_directory=project_folder,
    script='train_iris.py',
    compute_target=compute_target,
    environment=sklearn_env,
)


In [None]:
# run = Experiment(ws, 'IrisStep').submit(scriptConf)

In [None]:
from azureml.train.hyperdrive.runconfig import HyperDriveConfig
from azureml.train.hyperdrive.sampling import RandomParameterSampling
from azureml.train.hyperdrive.run import PrimaryMetricGoal
from azureml.train.hyperdrive.parameter_expressions import choice

# Discrete search space; keys are passed to train_iris.py as command-line flags.
search_space = {
    "--kernel": choice('linear', 'rbf', 'poly', 'sigmoid'),
    "--penalty": choice(0.5, 1, 1.5),
}
param_sampling = RandomParameterSampling(search_space)

# Random search: 4 trials, all running concurrently, maximizing the metric the
# training script logs as 'Accuracy' (NOTE(review): name must match the script's log call).
# No early-termination policy is configured.
hyperdrive_config = HyperDriveConfig(
    run_config=scriptConf,
    hyperparameter_sampling=param_sampling,
    primary_metric_name='Accuracy',
    primary_metric_goal=PrimaryMetricGoal.MAXIMIZE,
    max_total_runs=4,
    max_concurrent_runs=4,
)


In [None]:
# Named pipeline outputs of the HyperDrive step, surfaced via TrainingOutput:
# the trial metrics and the model file (presumably from the best child run).
metrics_output_name = 'metrics_output'
metrics_data = PipelineData(
    name='metrics_data',
    datastore=datastore,
    pipeline_output_name=metrics_output_name,
    training_output=TrainingOutput("Metrics"),
)

model_output_name = 'model_output'
saved_model = PipelineData(
    name='saved_model',
    datastore=datastore,
    pipeline_output_name=model_output_name,
    training_output=TrainingOutput("Model", model_file="outputs/model.joblib"),
)

# NOTE(review): hyperdrive_config wraps a ScriptRunConfig; confirm that
# estimator_entry_script_arguments is honored in that case, or move these
# arguments into ScriptRunConfig(arguments=...).
hd_step_name = 'hd_step01'
hd_step = HyperDriveStep(
    name=hd_step_name,
    hyperdrive_config=hyperdrive_config,
    estimator_entry_script_arguments=[
        '--input_data', step_1_processed_data,
        '--input_filename', 'iris_prep.parquet',
    ],
    inputs=[step_1_processed_data],
    outputs=[metrics_data, saved_model],
    allow_reuse=False,
)

In [None]:
# Run configuration for the registration script: conda env with azureml-sdk via pip.
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-sdk")
rcfg = RunConfiguration(conda_dependencies=conda_dep)

# Look up the existing compute target named 'singlenode'. The constructor only
# looks it up (see the try/except create pattern used for hd-cluster), so it
# must already exist in the workspace.
single_node_compute_target = ComputeTarget(workspace=ws, name='singlenode')

source_directory_m_reg = os.path.join(script_folder, 'register')
python_script_name = "eval_register_model.py"

register_model_step = PythonScriptStep(
    name="register_model_step",
    source_directory=source_directory_m_reg,
    script_name=python_script_name,
    compute_target=single_node_compute_target,
    runconfig=rcfg,
    # Consumes both outputs of hd_step; this data dependency also orders the
    # step after hd_step, so no explicit run_after() is needed.
    inputs=[saved_model, metrics_data],
    arguments=[
        "--saved-model", saved_model,
        '--metrics', metrics_data,
        '--model_name', 'iris_model_pipeline',
    ],
    allow_reuse=True,
)

In [None]:
# Assemble and submit the pipeline. Execution order follows the PipelineData
# links: dbNbStep -> hd_step (processed_data) -> register_model_step (model/metrics).
steps = [dbNbStep, hd_step, register_model_step]
pipeline = Pipeline(workspace=ws, steps=steps)
run = exp.submit(pipeline)
run

In [None]:
from azureml.pipeline.core import PipelineEndpoint

# Expose the pipeline behind a stable REST endpoint. Endpoint names are unique
# per workspace, so calling PipelineEndpoint.publish again on a re-run would
# collide with the existing endpoint. When it already exists, publish a new
# pipeline version and make that the endpoint's default instead.
endpoint_name = "prep_train_reg"
existing_endpoints = {ep.name: ep for ep in PipelineEndpoint.list(workspace=ws)}

if endpoint_name in existing_endpoints:
    p_endpoint = existing_endpoints[endpoint_name]
    published_pipeline = pipeline.publish(name=endpoint_name, description="")
    p_endpoint.add_default(published_pipeline)
else:
    p_endpoint = PipelineEndpoint.publish(workspace=ws, name=endpoint_name, pipeline=pipeline, description="")