# Pipeline Components
The objective of this notebook is to create a pipeline for preprocessing and traning of a model. 

    + The prepprocessing is a separate component that is run on a sepcific node
    + The train component is also executing on a separate node.

## Important for using Copilot
Copilot disrupts the internal features such as intelliscence feautres, .e.g., autocompletion. To avoide its suggestion automatically, from the setting disable automatic inline suggestion.
Then, you can get the suggestion by alt + \ .

In [None]:
# !pip uninstall azure-ai-ml
!pip install azure-ai-ml

!pip show azure-ai-ml

In [25]:
# Connecting to workspace
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

In [26]:
# Get a handler to the workspace and Datastore
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


In [7]:
# Get the list of compute nodes and print them
compute_list = ml_client.compute.list()
for compute in compute_list:
    print(compute.name)

# get the list of datasets and print them
dataset_list = ml_client.datastores.list()
for dataset in dataset_list:
    print(dataset.name)

# Get the list of environment
evs = ml_client.environments.list()
for e in evs:
    print(e.name)


farbodtaymouri1
farbodtaymouri2
azureml_globaldatasets
workspaceworkingdirectory
workspaceartifactstore
workspacefilestore
workspaceblobstore
AzureML-ACPT-pytorch-1.13-py38-cuda11.7-gpu
AzureML-ACPT-pytorch-1.12-py38-cuda11.6-gpu
AzureML-ACPT-pytorch-1.12-py39-cuda11.6-gpu
AzureML-ACPT-pytorch-1.11-py38-cuda11.5-gpu
AzureML-ACPT-pytorch-1.11-py38-cuda11.3-gpu
AzureML-responsibleai-0.21-ubuntu20.04-py38-cpu
AzureML-responsibleai-0.20-ubuntu20.04-py38-cpu
AzureML-tensorflow-2.5-ubuntu20.04-py38-cuda11-gpu
AzureML-tensorflow-2.6-ubuntu20.04-py38-cuda11-gpu
AzureML-tensorflow-2.7-ubuntu20.04-py38-cuda11-gpu
AzureML-sklearn-1.0-ubuntu20.04-py38-cpu
AzureML-pytorch-1.10-ubuntu18.04-py38-cuda11-gpu
AzureML-pytorch-1.9-ubuntu18.04-py37-cuda11-gpu
AzureML-pytorch-1.8-ubuntu18.04-py37-cuda11-gpu
AzureML-sklearn-0.24-ubuntu18.04-py37-cpu
AzureML-lightgbm-3.2-ubuntu18.04-py37-cpu
AzureML-pytorch-1.7-ubuntu18.04-py37-cuda11-gpu
AzureML-tensorflow-2.4-ubuntu18.04-py37-cuda11-gpu
AzureML-Triton
Azure

In [33]:
# Getting the status of compute nodes
from azure.ai.ml.entities import ComputeInstance, AmlCompute

# Creat a function from the above code that takes an input paramter for turning nodes on or off
def compute_node_status(status):
    
    compute_list = ml_client.compute.list()
    for compute in compute_list:
        ci_name = compute.name
        ci_basic_state = ml_client.compute.get(ci_name)
        print(compute.name, ci_basic_state.state)

        # Running if a node is stopped
        if status == 'on':
            if ci_basic_state.state == "Stopped":
                ml_client.compute.begin_start(ci_name).wait()
                print(ci_name, "is starting")
        else:
            if ci_basic_state.state == "Running":
                ml_client.compute.begin_stop(ci_name).wait()
                print(ci_name, "is stopping")



compute_node_status(status = 'on')


farbodtaymouri1 Running
farbodtaymouri2 Running


In [21]:
import os

# create a folder for the script files
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

src folder created


In [19]:
%%writefile $script_folder/prep-data.py
# import libraries
import argparse
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler
import mlflow

def main(args):
    # read data
    df = get_data(args.input_data)

    cleaned_data = clean_data(df)

    normalized_data = normalize_data(cleaned_data)
    print('args.output_data: ', args.output_data)

    # See https://docs.python.org/3/library/pathlib.html#concrete-paths
    output_df = normalized_data.to_csv((Path(args.output_data) / "diabetes.csv"), index = False)

# function that reads the data
def get_data(path):
    df = pd.read_csv(path)

    # Count the rows and print the result
    row_count = (len(df))
    print('Preparing {} rows of data'.format(row_count))
    
    return df

# function that removes missing values
def clean_data(df):
    df = df.dropna()
    
    return df

# function that normalizes the data
def normalize_data(df):
    scaler = MinMaxScaler()
    num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
    df[num_cols] = scaler.fit_transform(df[num_cols])

    return df

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--input_data", dest='input_data',
                        type=str)
    parser.add_argument("--output_data", dest='output_data',
                        type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Overwriting src/prep-data.py


In [23]:
# Reading the data asset
registered_data_asset  = ml_client.data.get(name ='diabetes-local', version = 1)
df = pd.read_csv(registered_data_asset.path)
df.head(10)


NameError: name 'pd' is not defined

In [7]:
%%writefile $script_folder/train-model.py
# import libraries
import mlflow
import argparse
import glob
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt
from mlflow.models import infer_signature



from mlflow.pyfunc import PythonModel, PythonModelContext
# https://learn.microsoft.com/en-us/azure/machine-learning/how-to-log-mlflow-models?view=azureml-api-2&tabs=wrapper#logging-custom-models
class ModelWrapper(PythonModel):
    def __init__(self, model):
        self._model = model

    def predict(self, context: PythonModelContext, data):
        # You don't have to keep the semantic meaning of `predict`. You can use here model.recommend(), model.forecast(), etc
        return self._model.predict_proba(data)

    # You can even add extra functions if you need to. Since the model is serialized,
    # all of them will be available when you load your model back.
    def predict_batch(self, data):
        pass





def main(args):
    # read data
    df = get_data(args.training_data)

    # split data
    X_train, X_test, y_train, y_test = split_data(df)

    # train model
    model = train_model(args.n_estimators, args.max_depth, args.model_output, X_train, X_test, y_train, y_test)

    # evaluate model
    eval_model(model, X_test, y_test)

# # function that reads the data
# def get_data(path):
#     print("Reading data...")
#     df = pd.read_csv(path)
    
#     return df

# function that reads the data
def get_data(data_path):

    # Read all csv files in the data directory
    all_files = glob.glob(data_path + "/*.csv")
    print('all_files in get-data(): ', all_files)
    df = pd.concat((pd.read_csv(f) for f in all_files), sort=False)
    
    return df

# function that splits the data
def split_data(df):
    print("Splitting data...")
    X, y = df[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness',
    'SerumInsulin','BMI','DiabetesPedigree','Age']].values, df['Diabetic'].values

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

    return X_train, X_test, y_train, y_test

# function that trains the model
def train_model(n_estimators, max_depth, model_output, X_train, X_test, y_train, y_test):

    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)


    clf = RandomForestClassifier(max_depth = max_depth, n_estimators = n_estimators, random_state=0)
    model = clf.fit(X_train, y_train)


    # mlflow.log_param("Regularization rate", reg_rate)
    # print("Training model...")
    # model = LogisticRegression(C=1/reg_rate, solver="liblinear").fit(X_train, y_train)

    y_probs = model.predict_proba(X_test)

    #Logging the model artifact
    signature = infer_signature(X_test, y_probs)
    mlflow.pyfunc.log_model("RFclassifier", 
                            python_model=ModelWrapper(model),
                            signature=signature)
    

    return model

# function that evaluates the model
def eval_model(model, X_test, y_test):
    # calculate accuracy
    y_hat = model.predict(X_test)
    acc = np.average(y_hat == y_test)
    print('Accuracy:', acc)
    mlflow.log_metric("Accuracy", acc)

    # calculate AUC
    y_scores = model.predict_proba(X_test)
    auc = roc_auc_score(y_test,y_scores[:,1])
    print('AUC: ' + str(auc))
    mlflow.log_metric("AUC", auc)

    # plot ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
    fig = plt.figure(figsize=(6, 4))
    # Plot the diagonal 50% line
    plt.plot([0, 1], [0, 1], 'k--')
    # Plot the FPR and TPR achieved by our model
    plt.plot(fpr, tpr)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.savefig("ROC-Curve.png")
    mlflow.log_artifact("ROC-Curve.png")    

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--training_data", dest='training_data',
                        type=str)
    parser.add_argument("--n_estimators", dest='n_estimators',
                        type=int, default=10)
    parser.add_argument("--max_depth", dest='max_depth',
                        type=int, default=3)
    parser.add_argument("--model_output", dest='model_output',
                        type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Overwriting src/train-model.py


### Define the Yaml file for the components


In [34]:
%%writefile prep-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data: 
    type: uri_file
outputs:
  output_data:
    type: uri_folder
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python prep-data.py 
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

Overwriting prep-data.yml


In [33]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a decision tree classifier model
version: 1
type: command
inputs:
  training_data: 
    type: uri_folder
  n_estimators:
    type: integer
    default: 5
  max_depth:
    type: integer
    default: 3
outputs:
  model_output:
    type: mlflow_model
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}} 
  --n_estimators  ${{inputs.n_estimators}}
  --max_depth ${{inputs.max_depth}}
  --model_output ${{outputs.model_output}} 

Overwriting train-model.yml


## Loading components

In [9]:
from azure.ai.ml import load_component
parent_dir = ""

prep_data = load_component(source=parent_dir + "./prep-data.yml")
train_decision_tree = load_component(source=parent_dir + "./train-model.yml")


## Build Pipeline

In [10]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

@pipeline()
def diabetes_classification(pipeline_job_input):
    clean_data = prep_data(input_data=pipeline_job_input)
    clean_data.compute = 'farbodtaymouri1' # Assigning the component to a compute node

    train_model = train_decision_tree(training_data=clean_data.outputs.output_data, n_estimators=10, max_depth=3)
    train_model.compute = 'farbodtaymouri2'

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }


pipeline_job = diabetes_classification(Input(type=AssetTypes.URI_FILE, path="azureml:diabetes-local:1"))

In [44]:
print(pipeline_job)

display_name: diabetes_classification
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:diabetes-local:1
outputs:
  pipeline_job_transformed_data:
    type: uri_folder
  pipeline_job_trained_model:
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    compute: azureml:farbodtaymouri1
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-p

In [11]:
# change the output mode (VERY IMPORTANT)
pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"
# set pipeline level datastore
pipeline_job.settings.default_datastore = "workspaceblobstore"

# # print the pipeline job again to review the changes
# print(pipeline_job)

## Submit the job

In [12]:
# submit job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_diabetes1"
)
pipeline_job

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Experiment,Name,Type,Status,Details Page
pipeline_diabetes1,coral_guava_tdwg7lxjf2,pipeline,Preparing,Link to Azure Machine Learning studio


## Schedule a pipeline

In [None]:
import datetime
from azure.ai.ml import RecurrencePattern, RecurrenceTrigger, TimeZone, JobSchedule

schedule_name = "simple_sdk_create_schedule_recurrence"

schedule_start_time = datetime.datetime.utcnow()
schedule_start_time = schedule_start_time + datetime.timedelta(minutes=2)

recurrence_trigger = RecurrenceTrigger(
    frequency="day",
    interval=1,
    schedule=RecurrencePattern(hours=10, minutes=[0, 1]),
    start_time=schedule_start_time,
    time_zone=TimeZone.UTC,
)

job_schedule = JobSchedule(
    name=schedule_name, trigger=recurrence_trigger, create_job=pipeline_job
)

In [None]:
job_schedule = ml_client.schedules.begin_create_or_update(
    schedule=job_schedule
).result()
print(job_schedule)

In [None]:
compute_node_status(status= 'off')