Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/pipeline-style-transfer/pipeline-style-transfer.png)

## Initialize Workspace

Initialize a workspace object from persisted configuration.

In [27]:
# Check core SDK version number
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.0.85


In [28]:
from azureml.core import Workspace, Experiment

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

Workspace name: ignite
Azure region: eastus2
Subscription id: 15ae9cb6-95c1-483d-a0e3-b1a1a3b06324
Resource group: ignite


In [29]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.core.compute_target import ComputeTargetException
from azureml.core import Dataset
import pandas as pd
import os

In [30]:
attrition_dataset = Dataset.get_by_name(ws, name='attrition-big')
df = attrition_dataset.to_pandas_dataframe()

In [31]:
data = df.head(5)

# Test score.R

In [32]:
%cd scripts
from score import init, run
os.environ["AZUREML_MODEL_DIR"] = os.getcwd()
%cd ..

/mnt/batch/tasks/shared/LS_root/mounts/clusters/controlignite/code/Users/danielsc/basf/scripts
/mnt/batch/tasks/shared/LS_root/mounts/clusters/controlignite/code/Users/danielsc/basf


In [33]:
init()

/mnt/batch/tasks/shared/LS_root/mounts/clusters/controlignite/code/Users/danielsc/basf/scripts






In [34]:
data

Unnamed: 0,Age,Attrition,BusinessTravel,DailyRate,Department,DistanceFromHome,Education,EducationField,EmployeeCount,EmployeeNumber,...,RelationshipSatisfaction,StandardHours,StockOptionLevel,TotalWorkingYears,TrainingTimesLastYear,WorkLifeBalance,YearsAtCompany,YearsInCurrentRole,YearsSinceLastPromotion,YearsWithCurrManager
0,41,1,Travel_Rarely,1102,Sales,1,2,Life Sciences,1,1,...,1,80,0,8,0,1,6,4,0,5
1,49,0,Travel_Frequently,279,Research & Development,8,1,Life Sciences,1,2,...,4,80,1,10,3,3,10,7,1,7
2,37,1,Travel_Rarely,1373,Research & Development,2,2,Other,1,4,...,2,80,0,7,3,3,0,0,0,0
3,33,0,Travel_Frequently,1392,Research & Development,3,4,Life Sciences,1,5,...,3,80,0,8,3,3,8,7,3,0
4,27,0,Travel_Rarely,591,Research & Development,2,1,Medical,1,7,...,4,80,1,6,3,3,2,2,2,2


In [35]:
predictions = run(data)

In [36]:
predictions

['1', '0', '0', '0', '0']

# Create or use existing compute

In [14]:
# AmlCompute
cpu_cluster_name = "batch-scoring"
try:
    cpu_cluster = AmlCompute(ws, cpu_cluster_name)
    print("found existing cluster.")
except ComputeTargetException:
    print("creating new cluster")
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_v2",
                                                                    max_nodes = 4)

    # create the cluster
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, provisioning_config)
    cpu_cluster.wait_for_completion(show_output=True)
    

found existing cluster.


In [15]:

from azureml.core.model import Model

# register downloaded model
model = Model.register(model_path = "scripts/model.rds",
                       model_name = "AttritionR", # this is the name the model is registered as
                       tags = {'language': "R"},
                       workspace = ws)

Registering model AttritionR


In [37]:
scripts_folder = "scripts"
script_file = "score.py"
script_file_r = "score.R"

# peek at contents
with open(os.path.join(scripts_folder, script_file)) as inference_file:
    print(inference_file.read())
    
with open(os.path.join(scripts_folder, script_file_r)) as inference_file:
    print(inference_file.read())

# This is auto-generated python wrapper.
import rpy2.robjects as robjects
import os
import json
from rpy2.robjects import r, pandas2ri
pandas2ri.activate()

def init():
    global r_run
    #os.environ["AZUREML_MODEL_DIR"] = os.path.dirname(os.path.realpath(__file__))
    #print(os.path.dirname(os.path.realpath(__file__)))
    
    score_r_path = os.path.join(os.path.dirname(
      os.path.realpath(__file__)),
      "score.R")

    # handle path for windows os
    score_r_path = score_r_path.replace('\\', '/')
    robjects.r.source("{}".format(score_r_path))
    r_run = robjects.r['init']()

def run(input_data):
    dataR = r_run(input_data)
    return list(dataR)

#' Copyright(c) Microsoft Corporation.
#' Licensed under the MIT license.



init <- function()
{
  model_path <- Sys.getenv("AZUREML_MODEL_DIR")
  message(paste(list.files(path = model_path, recursive = TRUE), collapse=', '))
  
  model <- readRDS(file.path(model_path,"model.rds"))
  message("model is loaded")
  
  function

In [54]:
from azureml.core import Environment
env = Environment.from_conda_specification('RScoring', file_path='environment.yaml')


In [73]:
from azureml.pipeline.core import PipelineData

datastore = ws.get_default_datastore()
output_folder = PipelineData(name='batch_inferences', datastore=datastore).as_dataset().register('batch_output')


In [74]:
named_attrition = attrition_dataset.as_named_input('attrition')

In [75]:
from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig

# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.
parallel_run_config = ParallelRunConfig(
                    source_directory=scripts_folder,
                    entry_script=script_file,  # the user script to run against each input
                    mini_batch_size='100KB',
                    error_threshold=3,
                    output_action='append_row',
                    environment=env,
                    compute_target=cpu_cluster, 
                    node_count=3,
                    run_invocation_timeout=600)

In [76]:
distributed_step = ParallelRunStep(
    name='parallel-attrition',
    inputs=[named_attrition],
    output=output_folder,
    parallel_run_config=parallel_run_config,
    models=[model],
    arguments=[],
    allow_reuse=False
)

In [77]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[distributed_step])

pipeline_run = Experiment(ws, 'basf_parallel').submit(pipeline)

Created step parallel-attrition [8e1ccd61][4980fc35-9abf-495c-b08f-4543c0f9ad9b], (This step will run and generate new outputs)
Submitted PipelineRun 005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d
Link to Azure Machine Learning studio: https://ml.azure.com/experiments/basf_parallel/runs/005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d?wsid=/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourcegroups/ignite/workspaces/ignite


In [78]:
# this will output a table with link to the run details in azure portal
pipeline_run

Experiment,Id,Type,Status,Details Page,Docs Page
basf_parallel,005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d,azureml.PipelineRun,NotStarted,Link to Azure Machine Learning studio,Link to Documentation


In [79]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show() 

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

In [80]:
# Console logs
pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d
Link to Portal: https://ml.azure.com/experiments/basf_parallel/runs/005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d?wsid=/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourcegroups/ignite/workspaces/ignite
PipelineRun Status: Running


StepRunId: 878a1ce3-08ca-4d62-9cb3-30bb7632b075
Link to Portal: https://ml.azure.com/experiments/basf_parallel/runs/878a1ce3-08ca-4d62-9cb3-30bb7632b075?wsid=/subscriptions/15ae9cb6-95c1-483d-a0e3-b1a1a3b06324/resourcegroups/ignite/workspaces/ignite
StepRun( parallel-attrition ) Status: NotStarted
StepRun( parallel-attrition ) Status: Queued
StepRun( parallel-attrition ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_f327004d872339e8c909c66f112ab742f97ffe01d1d2510efabe32da82d294d8_d.txt
2020-02-06T06:02:27Z Starting output-watcher...
Login Succeeded
Using default tag: latest
latest: Pulling from azureml/azureml_517b514f69792dda164c9897d3a71dc8
a1298f4ce990: Pulling fs layer
04a3282d9c4b:

INFO - Sent "{"id": 200, "minibatch": [200], "location": "attrition", "partition_size": 102400}" to queue.
INFO - Scheduled 200 mini batches in 10.021727626000029 seconds.
INFO - Scheduled 300 mini batches in 11.981899825000028 seconds.
INFO - Sent "{"id": 300, "minibatch": [300], "location": "attrition", "partition_size": 102400}" to queue.
INFO - TaskManager - Total number of items: -1.
INFO - Master - Task queue job completed: Scheduled 312 mini batches with -1 items. Provider init time: 0.24997708099999727, first task creation time: 6.422456700999987, total queue time: 12.193584773999987.
INFO - Progress update stopped for reasons: ['All 312 mini batches have been processed.'].
INFO - Sending stop signal. Reason: Progress monitor is stopping..
INFO - Master sent stop signal.
INFO - Added reason to /mnt/batch/tasks/shared/LS_root/jobs/ignite/azureml/878a1ce3-08ca-4d62-9cb3-30bb7632b075/mounts/workspaceblobstore/azureml/878a1ce3-08ca-4d62-9cb3-30bb7632b075/job/stop_signal.txt.
INFO -


Streaming azureml-logs/75_job_post-tvmps_f5f0dcf293681417ce01193f94de4f718c57ca567012422a22f9e899ce7d8169_d.txt

Streaming azureml-logs/75_job_post-tvmps_f327004d872339e8c909c66f112ab742f97ffe01d1d2510efabe32da82d294d8_d.txt

StepRun(parallel-attrition) Execution Summary
StepRun( parallel-attrition ) Status: Finished
{'runId': '878a1ce3-08ca-4d62-9cb3-30bb7632b075', 'target': 'batch-scoring', 'status': 'Completed', 'startTimeUtc': '2020-02-06T06:02:19.9899Z', 'endTimeUtc': '2020-02-06T06:06:46.792611Z', 'properties': {'azureml.runsource': 'azureml.StepRun', 'ContentSnapshotId': '27c5f962-5f32-42c7-aec2-76aba3779d61', 'StepType': 'PythonScriptStep', 'ComputeTargetType': 'AmlCompute', 'azureml.pipelinerunid': '005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d', '_azureml.ComputeTargetType': 'amlcompute', 'AzureML.DerivedImageName': 'azureml/azureml_517b514f69792dda164c9897d3a71dc8', 'ProcessInfoFile': 'azureml-logs/process_info.json', 'ProcessStatusFile': 'azureml-logs/process_status.json'}, 'inputD



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d', 'status': 'Completed', 'startTimeUtc': '2020-02-06T05:55:34.792577Z', 'endTimeUtc': '2020-02-06T06:06:53.826431Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{"aml_node_count":"3"}'}, 'inputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://ignite6981724196.blob.core.windows.net/azureml/ExperimentRun/dcid.005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=cEoqqalnxliOsJGfiPnUIHoemUyyBDv2KDH%2Bs0cN8X0%3D&st=2020-02-06T05%3A56%3A55Z&se=2020-02-06T14%3A06%3A55Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://ignite6981724196.blob.core.windows.net/azureml/ExperimentRun/dcid.005f9f97-1ae3-4ccc-b48c-08e0fbf1c95d/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=TUaVbAJkFXt40PsOfzeiK2Bi8nFqdUU5cS7YTZ%2FYJdg%3D&st=2020-02-06T05%3A56%3A55Z&se=2020-02-

'Finished'

In [63]:
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data("inferences")
prediction_output.download(local_path="attrition_scores")


1