# MLOps with CLI V2

- Creating Scripts for MLOps Pipeline


In [36]:
#import required libraries
import pandas as pd
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml.entities import Environment, BuildContext

In [37]:
import os

# Workspace coordinates used when no Azure ML config file can be loaded.
# Each value can be overridden through an environment variable so the notebook
# can target a different workspace without editing this cell; the defaults
# below are the values the notebook was originally run with.
subscription_id = os.environ.get('AZURE_SUBSCRIPTION_ID', '5da07161-3770-4a4b-aa43-418cbbb627cf')
resource_group = os.environ.get('AZURE_RESOURCE_GROUP', 'aml-workspace-rg')
workspace = os.environ.get('AZURE_ML_WORKSPACE', 'aml-workspace')

In [38]:
import os

# Create one folder per pipeline stage to hold the generated files.
# script_folder is left pointing at the last folder created.
for stage in ('prep', 'train', 'deploy', 'pipeline'):
    script_folder = './src/' + stage
    os.makedirs(script_folder, exist_ok=True)
    print(script_folder, stage + ' folder created')

./src/prep prep folder created
./src/train train folder created
./src/deploy deploy folder created
./src/pipeline pipeline folder created


In [39]:
import os

# Absolute path of the folder that receives the conda environment file.
conda_folder_name = "conda-yamls"
working_dir = os.getcwd()
script_folder = os.path.join(working_dir, conda_folder_name)
print(script_folder)
os.makedirs(script_folder, exist_ok=True)

/mnt/batch/tasks/shared/LS_root/mounts/clusters/memasanzsymtest/code/Users/memasanz/ML-Engineering-with-Azure-Machine-Learning-Service/Chapter8/conda-yamls


In [40]:
%%writefile $script_folder/job_env.yml
# Conda environment specification written to conda-yamls/job_env.yml.
name: job_env
dependencies:
  # The python interpreter version.
  # Currently Azure ML only supports 3.5.2 and later.
- python=3.8.5
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
  # The nested list below is installed with pip rather than conda.
- pip:
  - azureml-defaults
  - pyarrow
  # The only version-pinned package in this file besides python itself.
  - azureml-mlflow==1.43.0.post1
  - azure-ai-ml
  - mltable

Writing /mnt/batch/tasks/shared/LS_root/mounts/clusters/memasanzsymtest/code/Users/memasanz/ML-Engineering-with-Azure-Machine-Learning-Service/Chapter8/conda-yamls/job_env.yml


# Data Preparation

## Connecting to your workspace

In [41]:
# Try DefaultAzureCredential first; if it cannot obtain a token, fall back to
# an interactive browser login.
try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work.
    # This will open a browser page for the user to sign in.
    credential = InteractiveBrowserCredential()

In [42]:
# Connect to the workspace. A config file that can already be loaded is used
# as-is; otherwise one is written from the values defined in this notebook and
# then loaded.
try:
    ml_client = MLClient.from_config(credential=credential)
except Exception as ex:
    import json, os

    # NOTE: Update following workspace information if not correctly configure before
    client_config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace,
    }

    # Guard clause: a value still starting with "<" is an unfilled placeholder,
    # so report it and re-raise the original failure.
    if client_config["subscription_id"].startswith("<"):
        print(
            "please update your <SUBSCRIPTION_ID> <RESOURCE_GROUP> <AML_WORKSPACE_NAME> in notebook cell"
        )
        raise ex

    # write and reload from config file
    config_path = "../.azureml/config.json"
    config_dir = os.path.dirname(config_path)
    os.makedirs(config_dir, exist_ok=True)
    with open(config_path, "w") as config_file:
        json.dump(client_config, config_file)
    ml_client = MLClient.from_config(credential=credential, path=config_path)
print(ml_client)

Found the config file in: /mnt/batch/tasks/shared/LS_root/mounts/clusters/memasanzsymtest/code/Users/memasanz/.azureml/config.json


MLClient(credential=<azure.identity._credentials.default.DefaultAzureCredential object at 0x7ff444f7f6a0>,
         subscription_id=5da07161-3770-4a4b-aa43-418cbbb627cf,
         resource_group_name=aml-workspace-rg,
         workspace_name=aml-workspace)


In [43]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

In [44]:
# Load the local Titanic CSV and show its shape and column names.
df = pd.read_csv('./data/titanic.csv')
print(df.shape)
print(df.columns)

# Register the CSV as data asset 'titanic_raw' version 1 unless the lookup succeeds.
try:
    # The version is passed as a string, matching the version used at registration below.
    registered_data_asset = ml_client.data.get(name='titanic_raw', version='1')
    print('data asset is registered')
except Exception as ex:
    # Catch Exception rather than using a bare except so KeyboardInterrupt and
    # SystemExit still propagate. The reason is printed because the lookup can
    # fail for causes other than a missing asset.
    print('data asset lookup failed:', ex)
    print('register data asset')
    my_data = Data(
        path="./data/titanic.csv",
        type=AssetTypes.URI_FILE,
        description="Titanic CSV",
        name="titanic_raw",
        version="1",
    )

    # Keep the result so registered_data_asset is defined on both paths.
    registered_data_asset = ml_client.data.create_or_update(my_data)

(891, 12)
Index(['PassengerId', 'Survived', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp',
       'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked'],
      dtype='object')
data asset is registered


## Preparing Scripts for Training Pipelines

## Prep Data.py

In [45]:
%%writefile ./src/prep/prep.py


import argparse

import pandas as pd


def prepare_features(df):
    """Return a feature-engineered copy of the raw Titanic frame.

    Steps, in order:
      * fill missing ``Age`` with the median age of the row's (Pclass, Sex) group;
      * reduce ``Sex`` to its first character ('X' when missing);
      * derive ``Loc`` from the first character of ``Cabin`` ('X' when missing);
      * fill missing ``Embarked`` with 'S';
      * derive ``GroupSize`` as 1 + SibSp + Parch;
      * drop Cabin, Ticket, Name, SibSp, Parch and PassengerId.

    The input frame is not modified.
    """
    prepped = df.copy()

    # transform() returns a result aligned to the original index, so the fill
    # does not depend on how groupby().apply() shapes its output.
    group_median_age = prepped.groupby(['Pclass', 'Sex'])['Age'].transform('median')
    prepped['Age'] = prepped['Age'].fillna(group_median_age)

    prepped['Sex'] = prepped['Sex'].apply(lambda x: x[0] if pd.notnull(x) else 'X')
    prepped['Loc'] = prepped['Cabin'].apply(lambda x: x[0] if pd.notnull(x) else 'X')
    prepped['Embarked'] = prepped['Embarked'].fillna('S')
    prepped['GroupSize'] = 1 + prepped['SibSp'] + prepped['Parch']

    return prepped.drop(['Cabin', 'Ticket', 'Name', 'SibSp', 'Parch', 'PassengerId'], axis=1)


def parse_args():
    """Parse the --raw_data and --prep_data command-line arguments."""
    parser = argparse.ArgumentParser("prep")
    parser.add_argument("--raw_data", type=str, required=True, help="Path to raw data")
    parser.add_argument("--prep_data", type=str, required=True, help="Path of prepped data")
    return parser.parse_args()


def main():
    """Read the raw CSV, engineer features and write the prepped CSV."""
    args = parse_args()

    print(args.raw_data)
    print(args.prep_data)

    df = pd.read_csv(args.raw_data)
    # index=False keeps the row index out of the file, so it is not read back
    # as an extra unnamed column.
    prepare_features(df).to_csv(args.prep_data, index=False)


if __name__ == "__main__":
    main()

Writing ./src/prep/prep.py


## Create train.py file

In [46]:
%%writefile ./src/train/train.py
import os
import mlflow
import argparse
from mlflow.tracking import MlflowClient
import pandas as pd
import matplotlib.pyplot as plt
import mlflow
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
from sklearn.metrics import roc_auc_score,roc_curve

import mlflow
import mlflow.sklearn
import shutil
from sklearn.metrics import accuracy_score, precision_score, recall_score


# define functions
def main(args):
    """Train the model inside an MLflow run and persist its outputs.

    Reads these attributes from ``args``:
      * ``training_data`` - path of the prepped CSV to train on;
      * ``randomstate``   - seed forwarded to ``model_train``;
      * ``model_output``  - directory that receives ``titanic_model.pkl``;
      * ``test_data``     - where the held-out feature rows are written as CSV.
    """
    # joblib is not among this script's top-level imports, so import it here.
    import joblib

    # The context manager ends the MLflow run when the block exits, including
    # when training raises.
    with mlflow.start_run():
        #mlflow.sklearn.autolog()

        # read in data
        df = pd.read_csv(args.training_data)
        model, X_test = model_train('Survived', df, args.randomstate)

        # The local outputs folder is not guaranteed to exist before the dump.
        os.makedirs('outputs', exist_ok=True)
        model_file = os.path.join('outputs', 'titanic_model.pkl')
        joblib.dump(value=model, filename=model_file)

        os.makedirs(args.model_output, exist_ok=True)
        shutil.copy(model_file, os.path.join(args.model_output, "titanic_model.pkl"))

        # If the output location is an existing directory, write a file inside
        # it; otherwise treat it as the file path itself.
        test_data_path = args.test_data
        if os.path.isdir(test_data_path):
            test_data_path = os.path.join(test_data_path, 'test_data.csv')
        X_test.to_csv(test_data_path)

def model_train(LABEL, df, randomstate):
    """Fit the preprocessing + logistic-regression pipeline and log test metrics.

    Args:
        LABEL: name of the target column in ``df``.
        df: frame containing ``LABEL`` and the feature columns listed below.
        randomstate: seed passed to ``train_test_split``.

    Returns:
        ``(model, X_test)`` - the fitted pipeline and the held-out feature rows.

    Metrics are logged to the active MLflow run. This function does not start
    or end the run; that is left to the caller.
    """
    print('df.columns = ')
    print(df.columns)
    y_raw = df[LABEL]

    # Feature columns and the dtype each is cast to. astype() returns a new
    # frame, so the casts never write into a slice of df.
    feature_dtypes = {
        'Embarked': object,
        'Loc': object,
        'Sex': object,
        'Pclass': float,
        'Age': float,
        'Fare': float,
        'GroupSize': float,
    }
    X_raw = df[list(feature_dtypes)].astype(feature_dtypes)

    print(X_raw.columns)
    # Train test split
    X_train, X_test, y_train, y_test = train_test_split(X_raw, y_raw, test_size=0.2, random_state=randomstate)

    #use Logistic Regression estimator from scikit learn
    lg = LogisticRegression(penalty='l2', C=1.0, solver='liblinear')
    preprocessor = buildpreprocessorpipeline(X_train)

    #estimator instance
    clf = Pipeline(steps=[('preprocessor', preprocessor),
                               ('regressor', lg)], verbose=True)

    model = clf.fit(X_train, y_train)

    print('type of X_test = ' + str(type(X_test)))

    print('*****X_test************')
    print(X_test)

    # NOTE(review): eval_and_log_metrics is an MLflow 1.x API - confirm the
    # MLflow version installed in the job environment still provides it.
    metrics = mlflow.sklearn.eval_and_log_metrics(model, X_test, y_test, prefix="test_")

    #get the active run.
    run = mlflow.active_run()
    print("Active run_id: {}".format(run.info.run_id))
    # NOTE(review): "metric" is logged with a hard-coded value (0.22); it looks
    # like a placeholder rather than a computed result - confirm.
    MlflowClient().log_metric(run.info.run_id, "metric", 0.22)

    return model, X_test


def buildpreprocessorpipeline(X_raw):
    """Build the ColumnTransformer that runs ahead of the estimator.

    Columns of dtype object/bool are one-hot encoded, columns of dtype
    float/int64 are mean-imputed, and every other column is dropped.
    """
    numeric_features = X_raw.select_dtypes(include=['float','int64']).columns
    categorical_features = X_raw.select_dtypes(include=['object', 'bool']).columns

    mean_imputer = SimpleImputer(missing_values=np.nan, strategy = 'mean')
    one_hot = OneHotEncoder(categories='auto', sparse=False, handle_unknown='ignore')

    # Each transformer is wrapped in its own single-step Pipeline.
    column_steps = [
        ('numeric1', Pipeline(steps=[('scaler1', mean_imputer)]), numeric_features),
        ('categorical', Pipeline(steps=[('onehotencoder', one_hot)]), categorical_features),
    ]

    return ColumnTransformer(transformers=column_steps, remainder='drop')



def parse_args(argv=None):
    """Parse the training script's command-line arguments.

    Args:
        argv: optional list of argument strings; when None, argparse reads
            the process command line.

    Returns:
        Namespace with ``training_data``, ``randomstate`` (default 42),
        ``test_data`` and ``model_output``.
    """
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    # Options use two leading dashes so they match the way they are passed on
    # the command line (--training_data, --randomstate).
    parser.add_argument("--training_data", type=str, help="Path of prepped training data")
    parser.add_argument("--randomstate", type=int, default=42)
    parser.add_argument("--test_data", type=str, help="Path of output test data")
    parser.add_argument("--model_output", type=str, help="Path of output model")

    # parse args
    args = parser.parse_args(argv)
    print(args)
    # return args
    return args


# run script
# Entry point: the body runs only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    # parse args
    args = parse_args()

    # run main function
    main(args)

Writing ./src/train/train.py


In [47]:
%%writefile ./src/pipeline/train_and_eval_pipeline.yml

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
# Two-step pipeline: prep_job engineers features, train_job fits the model on
# prep_job's output. Relative paths below are written relative to this file,
# which lives in src/pipeline.
type: pipeline
display_name: Training_and_eval_pipeline
experiment_name: Training_and_eval_pipeline
compute: azureml:cpu-cluster

jobs:
  prep_job:
    type: command
    code: ../prep
    command: >-
      python prep.py
      --raw_data ${{inputs.raw_data}}
      --prep_data ${{outputs.prep_data}}
    inputs:
      raw_data:
        # titanic_raw is registered as a single CSV file (URI_FILE) and prep.py
        # reads this path with pandas.read_csv, so the input is a file.
        type: uri_file
        path: azureml:titanic_raw:1
        mode: ro_mount
    outputs:
      prep_data:
        type: uri_file
        path: azureml://datastores/workspaceblobstore/paths/titanic_prep_data/titanic_prepped.csv
        mode: rw_mount
    environment:
      # job_env.yml is written to <notebook dir>/conda-yamls, two levels above
      # this file's folder (src/pipeline).
      conda_file: ../../conda-yamls/job_env.yml
      image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest
    description: Feature Engineering

  train_job:
    type: command
    inputs:
      training_data: ${{parent.jobs.prep_job.outputs.prep_data}}
      randomstate: 0
    outputs:
      # No type is declared for these outputs.
      # NOTE(review): Azure ML is expected to default them to uri_folder - confirm.
      model_output:
      test_data:
        mode: upload
    code: ../train
    environment:
      conda_file: ../../conda-yamls/job_env.yml
      image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest
    compute: azureml:cpu-cluster
    command: >-
      python train.py
      --training_data ${{inputs.training_data}}
      --randomstate ${{inputs.randomstate}}
      --test_data ${{outputs.test_data}}
      --model_output ${{outputs.model_output}}

Writing ./src/pipeline/train_and_eval_pipeline.yml


In [48]:
%%writefile ./src/pipeline/AzureDevOpsPipeline.yml

# Azure DevOps pipeline: on a push to main, install tooling and submit the
# Azure ML training pipeline through the az ml CLI extension.
resources:
  containers:
  - container: mlops
    image: mcr.microsoft.com/mlops/python:latest

pr: none
trigger:
  branches:
    include:
    - main

variables:
- group: xmmdevops-variable-group-non-prod
- group: xmmdevops-variable-group-qa

pool:
  vmImage: ubuntu-latest

stages:
- stage: 'RunPipline'
  variables:
  - group: xmmdevops-variable-group-qa
  displayName: 'Register Model QA'
  jobs:
  - job: "RegisterQA"
    steps:
      - task: UsePythonVersion@0
        inputs:
          versionSpec: '3.8'
          addToPath: true
      - script: |
          python -m pip install --upgrade pip
          pip install jupyter
          pip install nbconvert
          pip install --upgrade azureml-core
          pip install --upgrade azureml-sdk[automl]

      - task: AzureCLI@1
        env:
          tenantId: $(tenantId)
          servicePrincipalId: $(servicePrincipalId)
          servicePrincipalPassword: $(servicePrincipalPassword)
          wsName: $(wsName)
          subscriptionId: $(subscriptionId)
          resourceGroup: $(resourceGroup)
          location: $(location)
        inputs:
          azureSubscription: dev-aml-workspace-connection
          scriptLocation: inlineScript
          workingDirectory: '$(Build.SourcesDirectory)'
          # Submit the Azure ML pipeline definition (train_and_eval_pipeline.yml),
          # not this Azure DevOps file, which is not an Azure ML job spec.
          inlineScript: |
            echo "files:"
            ls
            az version
            az extension add -n ml -y
            az configure --defaults group=$(resourceGroup) workspace=$(wsName) location=$(location)
            az ml job create -s -f src/pipeline/train_and_eval_pipeline.yml
        displayName: 'Training Pipeline'
        

Writing ./src/pipeline/AzureDevOpsPipeline.yml


In [None]:
## Create eval.py file

In [None]:
# # Creating a unique endpoint name with current datetime to avoid conflicts
# import datetime

# online_endpoint_name = "endpoint-" + datetime.datetime.now().strftime("%m%d%H%M%f")

# # create an online endpoint
# endpoint = ManagedOnlineEndpoint(
#     name=online_endpoint_name,
#     description="titanic online endpoint for mlflow model",
#     auth_mode="key",
#     tags={"oneline endpoint": "titanic"},
# )

In [None]:
# %%writefile $script_folder/endpoint.yml
# $schema: https://azuremlschemas.azureedge.net/latest/managedOnlineEndpoint.schema.json
# name: titanic-managed-online-endpoint
# description: "CLI V2 titanic online endpoint for mlflow model"
# auth_mode: key
# tags : {"CLIV2": "titanic"}

In [None]:
# %%writefile $script_folder/deployment.yml

# $schema: https://azuremlschemas.azureedge.net/latest/managedOnlineDeployment.schema.json
# name: blue
# endpoint_name: titanic-managed-online-endpoint
# model: azureml:chapter6_titanic_model:1
# code_configuration:
#   code: 
#     local_path: .
#   scoring_script: score.py
# environment: azureml:job_base_env:1
# instance_type: Standard_F2s_v2
# instance_count: 1

In [None]:
# %%writefile $script_folder/score.py

# import os 
# import json
# import joblib
# from pandas import json_normalize
# import pandas as pd

# # Called when the service is loaded
# def init():
#     global model
#     # Get the path to the deployed model file and load it
#     model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'titanic_model.pkl')
#     model = joblib.load(model_path)

# # Called when a request is received
# def run(raw_data):
#     dict= json.loads(raw_data)
#     df = json_normalize(dict['raw_data']) 
#     y_pred = model.predict(df)
#     print(type(y_pred))
    
#     result = {"result": y_pred.tolist()}
#     return result