# Azure Machine Learning Pipeline

 In this notebook, we create an Azure Machine Learning pipeline that covers the complete machine learning lifecycle:
 
 1. Data Engineering
 2. Model Training
 3. Model Management (model selection and registration)
 4. Model Deployment (to the same environment)
 
![Data Engineering](./images/00-Pipeline.jpg)

## Data Engineering 

**Input**: Raw data (Amazon `Software_5` product reviews, downloaded as gzipped JSON lines)

**Output**: Registered dataset (`ProductReview`)

In [1]:
import os

# Folder that holds the data-engineering step's script. exist_ok=True makes
# re-running this cell a no-op instead of a check-then-create.
os.makedirs('data_engineering', exist_ok=True)

In [2]:
%%writefile data_engineering/data_engineering.py
import os
import json
import gzip
import pandas as pd
from urllib.request import urlopen
import requests

from azureml.core.run import Run
from azureml.core import Dataset, Datastore, Workspace
from azureml.data.datapath import DataPath

# get run context
run = Run.get_context()

# Download data from source
url = "http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Software_5.json.gz"
response = requests.get(url, stream=True)

with open("Software_5.json.gz", "wb") as handle:
    for data in response.iter_content():
        handle.write(data)

### load the meta data
data = []
with gzip.open('Software_5.json.gz') as f:
    for l in f:
        data.append(json.loads(l.strip()))
    
# total length of list, this number equals total number of products
print(len(data))

# first row of the list
print(data[0])
df = pd.DataFrame.from_dict(data)

### remove rows with unformatted title (i.e. some 'title' may still contain html style content)
df3 = df.fillna('')
df3.iloc[2]

# register dataset
workspace = run.experiment.workspace
default_datastore = Datastore.get_default(workspace)

ds_name = 'ProductReview'
data_path = DataPath(datastore=default_datastore, path_on_datastore='product_review')

ds = Dataset.Tabular.register_pandas_dataframe(df3, 
                                    default_datastore, 
                                    ds_name, 
                                    description=None, 
                                    tags=None, 
                                    show_progress=True)


Overwriting data_engineering/data_engineering.py


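## Model Training

We reuse the `train_2.py` training script from the previous demo. It trains a multinomial Naive Bayes classifier that predicts the `overall` column from `reviewText` in the registered `ProductReview` dataset. It logs an **F1** metric and uploads the fitted vectorizer and model as run artifacts.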

In [3]:
import os

# Folder that holds the training step's script. exist_ok=True makes
# re-running this cell a no-op instead of a check-then-create.
os.makedirs('train', exist_ok=True)

In [4]:
%%writefile train/train_2.py
# General libraries.
import numpy as np
from sklearn.naive_bayes import MultinomialNB
from sklearn import metrics
from sklearn.metrics import classification_report,plot_confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from azureml.core.run import Run
from sklearn.model_selection import GridSearchCV
from azureml.core import Workspace, Dataset
import matplotlib.pyplot as plt
from joblib import dump
from sklearn.feature_extraction.text import CountVectorizer

run = Run.get_context()

# Get workspace from run context
workspace = run.experiment.workspace

# Load Data
dataset = Dataset.get_by_name(workspace, name='ProductReview')
data = dataset.to_pandas_dataframe()[['overall', 'reviewText']]

# Prepare X & Y
Y = data.pop('overall').to_numpy()
X = data.pop('reviewText').to_numpy()
train_x, test_x, train_y, test_y = train_test_split(X,Y, test_size = 0.1, random_state=1)


vec = CountVectorizer()
fitted_train_data = vec.fit_transform(train_x)
fitted_test_data = vec.transform(test_x)

model = MultinomialNB()
params = {'alpha': [1.0e-5, 0.0001, 0.001, 0.01, 0.1, 1.0, 10.0]}

clf = GridSearchCV(model, params, scoring = "f1_macro", verbose=0, cv = 5)
clf_result = clf.fit(fitted_train_data, train_y)
run.log("Best alpha",clf_result.best_estimator_.alpha)
pred = clf.predict(fitted_test_data)
run.log("F1", metrics.f1_score(test_y, pred, average='weighted'))

plot_confusion_matrix(clf, fitted_test_data, test_y)  
plt.savefig('confusion_matrix.png')
run.log_image(name='Confusion-Matrix', path='./confusion_matrix.png')

# Save trained model
dump(vec, './vec.pkl')
dump(clf, './mnb.pkl')
run.upload_file(name='vec.pkl', path_or_stream='./vec.pkl')
run.upload_file(name='mnb.pkl', path_or_stream='./mnb.pkl')

Overwriting train/train_2.py


## Model Selection

In this step, we use a predefined metric, **F1** (the weighted F1 on the held-out test split, as logged by the training script). We list all of today's runs, select the run with the highest F1 score, and register its model and vectorizer in the Model Registry in preparation for deployment.

In [5]:
import os

# Folder that holds the model-selection step's script. exist_ok=True makes
# re-running this cell a no-op instead of a check-then-create.
os.makedirs('model_selection', exist_ok=True)

In [6]:
%%writefile model_selection/model_select.py
import sklearn
from datetime import datetime, date
from azureml.core.run import Run
# from azureml.core import Experiment
# from azureml.core.model import Model

# get run context
run = Run.get_context()
workspace = run.experiment.workspace

# Get Experiment and runs for model select
# In this step, we will use F1
exp = run.experiment # Experiment.list(workspace, experiment_name='MLOps-Workshop')

today = date.today()

select_run = None
F1 = 0
for r in Run.list(exp, type="azureml.StepRun", include_children=True):
    run_starttime = datetime.strptime(r.get_details()['startTimeUtc'][:10], '%Y-%m-%d').date()
    if run_starttime==today and 'F1' in r.get_metrics().keys() and F1<r.get_metrics()['F1']:
        F1=r.get_metrics()['F1']
        select_run = r
    
if select_run != None:
    # Load Data
    mnb_model = select_run.register_model("ProductReview-NaiveBayes",
                            model_path="./mnb.pkl",
                            )

    vector    = select_run.register_model("ProductReview-CountVector", 
                            model_path="./vec.pkl",
                            )

Overwriting model_selection/model_select.py


## Prepare Model Packaging and Deployment

In [7]:
import os

# Folder that holds the scoring and deployment scripts. exist_ok=True makes
# re-running this cell a no-op instead of a check-then-create.
os.makedirs('model_deploy', exist_ok=True)

In [8]:
%%writefile model_deploy/score.py
import json, os, joblib
from azureml.core.model import Model

def init(): 
  global vec, clf
  print(Model.get_model_path('ProductReview-NaiveBayes'))
  vec = joblib.load(Model.get_model_path('ProductReview-CountVector'))
  clf = joblib.load(Model.get_model_path('ProductReview-NaiveBayes'))

def run(data): 
  input_data = json.loads(data)['data'] 
  fitted_data = vec.transform(input_data)
  pred = clf.predict(fitted_data)
  return json.dumps(pred.tolist())


Overwriting model_deploy/score.py


In [9]:
%%writefile model_deploy/deploy.py
from azureml.core.model import InferenceConfig, Model
from azureml.core import Environment
from azureml.core.run import Run
from azureml.core.webservice import AciWebservice

# get run context
run = Run.get_context()
workspace = run.experiment.workspace

service_name = 'product-review-service'
env = Environment.get(workspace=workspace, name="AzureML-sklearn-0.24-ubuntu18.04-py37-cpu")

inference_config = InferenceConfig(entry_script='score.py', 
                            source_directory='.',
                            environment=env)

deployment_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

nb_model = Model(workspace, 'ProductReview-NaiveBayes')
vectorizor = Model(workspace, 'ProductReview-CountVector')

service = Model.deploy(
    workspace,
    name = service_name,
    models=[nb_model, vectorizor],
    inference_config= inference_config,
    deployment_config= deployment_config,
    overwrite=True,
)
service.wait_for_deployment(show_output=True)

Overwriting model_deploy/deploy.py


In [10]:
import os

import azureml.core
from azureml.core import Environment, Experiment, Workspace
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep


def _get_or_create_compute(ws, name):
    """Return the AmlCompute cluster `name` in `ws`, provisioning it when absent."""
    try:
        cluster = AmlCompute(ws, name)
    except ComputeTargetException:
        print("creating new compute target")
        cluster_config = AmlCompute.provisioning_configuration(
            vm_size="STANDARD_D2_V2",
            min_nodes=1,  # the cluster never scales below one node
            max_nodes=4,
        )
        cluster = ComputeTarget.create(ws, name, cluster_config)
        cluster.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    else:
        print("found existing compute target.")
    return cluster


workspace = Workspace.from_config()

# Cluster shared by every pipeline step.
aml_compute_target = "cpu-cluster"
aml_compute = _get_or_create_compute(workspace, aml_compute_target)
print("Azure Machine Learning Compute attached")

# Every step runs in the curated sklearn 0.24 environment.
env = Environment.get(workspace=workspace, name="AzureML-sklearn-0.24-ubuntu18.04-py37-cpu")
runconfig = RunConfiguration()
runconfig.environment = env


found existing compute target.
Azure Machine Learning Compute attached


In [11]:
def _make_script_step(name, script_name, source_directory):
    """Build a PythonScriptStep on the shared cluster and run configuration."""
    return PythonScriptStep(
        name=name,
        script_name=script_name,
        source_directory=source_directory,
        compute_target=aml_compute_target,
        runconfig=runconfig,
        # Always re-run: the steps read and write workspace state (datasets,
        # runs, models, services) that is not declared as step inputs/outputs.
        allow_reuse=False,
    )


# The steps exchange no PipelineData, so nothing here orders them. Their order
# comes from the StepSequence used at submission (step.run_after(...) would also work).
dataprep_step = _make_script_step("prep_data", "data_engineering.py", "data_engineering")
train_step = _make_script_step("train", "train_2.py", "train")
select_step = _make_script_step("select_model", "model_select.py", "model_selection")
deploy_step = _make_script_step("deploy_model", "deploy.py", "model_deploy")

In [12]:
from azureml.pipeline.core import StepSequence

experiment_name = 'MLOps-Workshop'

# No data dependencies link the steps, so StepSequence fixes the run order:
# prep -> train -> select -> deploy.
ordered_steps = [dataprep_step, train_step, select_step, deploy_step]
step_sequence = StepSequence(steps=ordered_steps)
pipeline = Pipeline(workspace=workspace, steps=step_sequence)

experiment = Experiment(workspace, experiment_name)
pipeline_run = experiment.submit(pipeline)
print("Pipeline is submitted for execution")

Class KubernetesCompute: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Created step prep_data [88fdee95][4c0b75f6-7a45-4b70-b9a7-b7c602120b97], (This step will run and generate new outputs)
Created step train [b09b8f1b][3d2f614f-08cb-4d84-90e4-7d4da556831f], (This step will run and generate new outputs)
Created step select_model [edfb6c9e][73f707ac-9a6e-4bb5-be0b-ea1aa6a1f607], (This step will run and generate new outputs)
Created step deploy_model [2583151a][91e3aa2d-3e59-412d-8653-e0f55cdc4558], (This step will run and generate new outputs)
Submitted PipelineRun 858cb501-32b2-4d8c-897f-7e87afa85028
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/858cb501-32b2-4d8c-897f-7e87afa85028?wsid=/subscriptions/2fadeb06-9775-43ec-a256-ae5922c67d60/resourcegroups/mlops/workspaces/datamlops&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
Pipeline is submitted for execution


## Test Endpoint

In [19]:
import requests, json
from azureml.core.webservice import Webservice

# Look up the endpoint of the service deployed by the pipeline rather than
# hard-coding a URI that belongs to one particular deployment.
# Run this only after the pipeline's deploy_model step has finished.
uri = Webservice(workspace, name='product-review-service').scoring_uri

headers = {"Content-Type": "application/json"}
comments = """
"I've been using Dreamweaver (and it's predecessor Macromedia's UltraDev) for many years.  
For someone who is an experienced web designer, this course is a high-level review of the CS5 version of Dreamweaver,
 but it doesn't go into a great enough level of detail to find it very useful.\n\nOn the other hand, 
 this is a great tool for someone who is a relative novice at web design.  
 It starts off with a basic overview of HTML and continues through the concepts necessary to build a modern web site.  
 Someone who goes through this course should exit with enough knowledge to create something that does what 
 you want it do do...within reason.  Don't expect to go off and build an entire e-commerce system with only this class 
 under your belt.\n\nIt's important to note that there's a long gap from site design to actual implementation.  
 This course teaches you how to implement a design.  The user interface and overall user experience is a different 
 subject that isn't covered here...it's possible to do a great implementation of an absolutely abysmal design.  
 I speak from experience.  :)\n\nAs I said above, if you're a novice, a relative newcomer or just an experienced web 
 designer who wants a refresher course, this is a good way to do it."
"""
# score.py expects {"data": [text, ...]}.
sample_input = json.dumps({
    'data': [comments]
})
response = requests.post(uri, data=sample_input, headers=headers, timeout=60)
# Surface HTTP errors (e.g. a failing scoring script) instead of a JSON decode error.
response.raise_for_status()
print(response.json())

[4.0]


## Publish Pipeline

In [14]:
# Publishing gives the pipeline a stable ID/endpoint that can be re-run or scheduled.
publish_settings = {
    "name": "MLOps-Workshop-Pipeline",
    "description": "Published Pipeline for MLOps Workshop",
}
published_pipeline = pipeline.publish(**publish_settings)


In [15]:
# ID of the published pipeline; a Schedule is created against this value.
pipeline_id = published_pipeline.id
pipeline_id

'00288b87-9585-4a50-9c9b-a3eaae47f01a'

## Schedule Pipeline runs

https://docs.microsoft.com/en-us/azure/machine-learning/how-to-trigger-published-pipeline


In [16]:
# Disabled on purpose: uncomment to create a monthly recurring schedule for the
# published pipeline. Schedule.create registers the schedule in the workspace,
# so running this cell repeatedly would create additional schedules.
#from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

## create time-based pipeline
## Frequency can be Minute / Hour / Day / Week / Month
#recurrence = ScheduleRecurrence(frequency="Month", interval=1)
#recurring_schedule = Schedule.create(workspace, name="MonthlySchedule", 
#                            description="Based on time",
#                            pipeline_id=published_pipeline.id, 
#                            experiment_name=experiment_name, 
#                            recurrence=recurrence)

In [17]:
# Uncomment to display the schedule (only defined once the schedule-creation cell is enabled).
#recurring_schedule


## Pipeline Schedule Management

- Enable / Disable (set the schedule to 'Active' and available to run, or turn it off)

enable(wait_for_provisioning=False, wait_timeout=3600)

disable(wait_for_provisioning=False, wait_timeout=3600)

- Get (get the schedule with the given ID)

get(workspace, id, _workflow_provider=None, _service_endpoint=None)

- List (get all schedules in the current workspace)

list(workspace, active_only=True, pipeline_id=None, pipeline_endpoint_id=None, _workflow_provider=None, _service_endpoint=None)

https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-setup-schedule-for-a-published-pipeline.ipynb


In [18]:
# Uncomment to disable the schedule so it no longer triggers pipeline runs.
#recurring_schedule.disable(wait_for_provisioning=True)