<a href="https://colab.research.google.com/github/Chikuji/AzureML/blob/master/2020_09_22_05_OrquestrarMLcomPipilines.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Objetivos de aprendizado

- Crie um pipeline do Azure Machine Learning.
-Publique um pipeline do Azure Machine Learning.
-Agende um pipeline do Azure Machine Learning.

# Tipos de passo

- PythonScriptStep : executa um script Python especificado.
-EstimatorStep : executa um estimador.
-DataTransferStep : usa o Azure Data Factory para copiar dados entre armazenamentos de dados.
-DatabricksStep : executa um bloco de notas, script ou JAR compilado em um cluster de databricks.
-AdlaStep : executa um trabalho U-SQL no Azure Data Lake Analytics.

Definição de etapas em um pipeline

Para criar um pipeline, você deve primeiro definir cada etapa e, em seguida, criar um pipeline que inclua as etapas.

In [None]:
# Criando as etapas

from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config)

# Step to run an estimator
step2 = EstimatorStep(name = 'train model',
                      estimator = sk_estimator,
                      compute_target = 'aml-cluster')


In [None]:
# Atribuindo as etapas a um pipiline

from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Construct the pipeline
train_pipeline = Pipeline(workspace = ws, steps = [step1,step2])

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run = experiment.submit(train_pipeline)

O objeto PipelineData

Para usar um objeto PipelineData para passar dados entre as etapas, você deve:

1. Defina um objeto PipelineData nomeado que faz referência a um local em um armazenamento de dados.
2. Especifique o objeto PipelineData como uma entrada ou saída para as etapas que o utilizam.
3. Passe o objeto PipelineData como um parâmetro de script nas etapas que executam scripts (e inclua o código nesses scripts para ler ou gravar dados)

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

# Get a dataset for the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# Define a PipelineData object to pass data between steps
data_store = ws.get_default_datastore()
prepped_data = PipelineData('prepped',  datastore=data_store)

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         # Specify dataset as initial input
                         inputs=[raw_ds.as_named_input('raw_data')],
                         # Specify PipelineData as output
                         outputs=[prepped_data],
                         # Also pass as data reference to script
                         arguments = ['--folder', prepped_data])

# Step to run an estimator
step2 = EstimatorStep(name = 'train model',
                      estimator = sk_estimator,
                      compute_target = 'aml-cluster',
                      # Specify PipelineData as input
                      inputs=[prepped_data],
                      # Pass as data reference to estimator script
                      estimator_entry_script_arguments=['--folder', prepped_data])

Nos próprios scripts, você pode obter uma referência ao objeto PipelineData do argumento do script e usá-lo como uma pasta local.

In [None]:
# code in data_prep.py
from azureml.core import Run
import argparse
import os

# Get the experiment run context
run = Run.get_context()

# Get input dataset as dataframe
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

# Get PipelineData argument
parser = argparse.ArgumentParser()
parser.add_argument('--folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

# code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']]

# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)

# Gerenciando a reutilização da saída da etapa

Para controlar a reutilização de uma etapa individual, você pode definir o parâmetro allow_reuse na configuração da etapa, assim:

In [None]:
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         inputs=[raw_ds.as_named_input('raw_data')],
                         outputs=[prepped_data],
                         arguments = ['--folder', prepped_data]),
                         # Disable step reuse
                         allow_reuse = False)

Forçando todas as etapas a correr

In [None]:
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)

# Publicar um pipeline

In [None]:
published_pipeline = pipeline.publish(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')

Como alternativa, você pode chamar o método de publicação em uma execução bem-sucedida do pipeline:

In [None]:
# Get the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]

# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')

Após a publicação do pipeline, você pode exibi-lo no Azure Machine Learning Studio. Você também pode determinar o URI de seu ponto de extremidade desta forma:

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

# Usando um pipeline publicado

Por exemplo, o código Python a seguir faz uma solicitação REST para executar um pipeline e exibe o ID de execução retornado.

In [None]:
import requests

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})
run_id = response.json()["Id"]
print(run_id)