# Create a Machine Learning Pipeline using Azure SDK

In this hands-on lab scenario you are a Data Scientist for Awesome Company. Recently the company has been using Azure Machine Learning Service to implement and run their machine learning workloads. As part of this effort, you are learning to create pipelines programmatically using the Azure SDK. You'll be utilizing an open dataset on diabetes in order to help Awesome Company build a pipeline for diabetes research.

To accomplish your goal, the following should be completed:
* Create a workspace
* Create compute resources
* Create and run a pipeline

## Connect to your workspace

To get started, connect to your workspace.

In [7]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.27.0 to work with lkftest


## Prepare data

Awesome Company is using an open dataset on diabetes patients as part of their research. The cell below will create the required dataset.

In [8]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

Dataset already registered.


## Create a folder for our python scripts

In [9]:
import os
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes_pipeline


## Create a data preparation script

In [10]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# Load the data
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# Remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Writing diabetes_pipeline/prep_diabetes.py


## Create a script to train the model

In [12]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-folder", type=str, dest='training_folder', help='training data folder')
args = parser.parse_args()
training_folder = args.training_folder

# Get the experiment run context
run = Run.get_context()

# Load the prepared data file
print("Loading Data...")
file_path = os.path.join(training_folder,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# Calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# Calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

Overwriting diabetes_pipeline/train_diabetes.py


## Assign the compute target

In [13]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "ac-aml"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


## Ensure the necessary packages are installed on your compute

In [14]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-pipeline-env")

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib','pandas','pip'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]','pyarrow'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment 
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Define the pipeline

In [15]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData for the model folder
prepped_data_folder = PipelineData("prepped_data_folder", datastore=ws.get_default_datastore())

# Step 1: Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data_folder],
                                outputs=[prepped_data_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2: Run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-folder', prepped_data_folder],
                                inputs=[prepped_data_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


## Build the pipeline from the defined steps

In [16]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step Prepare Data [b30ff547][9b56d1e0-5d20-4a99-8c82-4444010bfd1b], (This step will run and generate new outputs)Created step Train and Register Model [b0be7842][0f96d972-d4ff-454d-bb1b-a5d35331c7d3], (This step will run and generate new outputs)

Submitted PipelineRun 047ec2f9-08ed-48b0-b903-f4695490d92f
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/047ec2f9-08ed-48b0-b903-f4695490d92f?wsid=/subscriptions/b7bf924e-67c3-4e5b-a5a7-3a2988311e4c/resourcegroups/dp-100/workspaces/lkftest&tid=e54f6a3d-b8e3-4e00-bbbf-c7a65d1e76b0
Pipeline submitted for execution.
PipelineRunId: 047ec2f9-08ed-48b0-b903-f4695490d92f
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/047ec2f9-08ed-48b0-b903-f4695490d92f?wsid=/subscriptions/b7bf924e-67c3-4e5b-a5a7-3a2988311e4c/resourcegroups/dp-100/workspaces/lkftest&tid=e54f6a3d-b8e3-4e00-bbbf-c7a65d1e76b0
PipelineRun Status: NotStarted


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRun Status: Running


StepRunId: 8b7674e5-b587-4fac-aeff-7b0d59fc577e
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/8b7674e5-b587-4fac-aeff-7b0d59fc577e?wsid=/subscriptions/b7bf924e-67c3-4e5b-a5a7-3a2988311e4c/resourcegroups/dp-100/workspaces/lkftest&tid=e54f6a3d-b8e3-4e00-bbbf-c7a65d1e76b0
StepRun( Prepare Data ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_b8452806c64ead682d779f49c85ab1dde905589d561602cce286aa7980a922bd_d.txt
2021-05-18T15:00:48Z Successfully mounted a/an Blobfuse File System at /mnt/batch/tasks/shared/LS_root/jobs/lkftest/azureml/8b7674e5-b587-4fac-aeff-7b0d59fc577e/mounts/workspaceblobstore
2021-05-18T15:00:49Z Starting output-watcher...
2021-05-18T15:00:49Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
2021-05-18T15:00:49Z Executing 'Copy ACR Details file' on 10.0.0.5
2021-05-18T15:00:49Z Copy ACR Details file succeeded on 10.0.0.5. Output: 
>>>   
>>>   
Login Succeeded
Using default tag: latest
lat

'Finished'