# 创建管道

可以通过使用 Azure ML SDK 来运行基于脚本的试验，从而执行引入数据、训练模型和注册模型各自所需的各个步骤。但是，在企业环境中，通常根据用户需求或按计划在自动生成过程中将生成机器学习解决方案所需的各个步骤序列封装到可在一个或多个计算目标上运行的管道中。

在该笔记本中，你将把所有这些元素组合在一起，以创建一个简单的管道，该管道可以预处理数据、训练和注册模型。

## 连接到工作区

首先，请连接到你的工作区。

> **备注**：如果尚未与 Azure 订阅建立经过身份验证的会话，则系统将提示你通过执行以下操作进行身份验证：单击链接，输入验证码，然后登录到 Azure。

In [None]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## 准备数据

在该管道中，你将使用包含糖尿病患者详细信息的数据集。运行以下单元格以创建此数据集（如果之前创建了此数据集，则代码将查找现有版本）

In [None]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## 创建管道步骤的脚本

管道由一个或多个步骤组成，这些步骤可以是 Python 脚本，也可以是专用步骤（例如将数据从一个位置复制到另一位置的数据传输步骤）。每个步骤都可以在自己的计算上下文中运行。在本练习中，你将构建一个简单的管道，其中包含两个 Python 脚本步骤：一个用于预处理一些训练数据，另一个用于使用预处理的数据来训练和注册模型。

首先，我们为将在管道步骤中使用的脚本文件创建一个文件夹。

In [None]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

现在，让我们创建第一个脚本，该脚本将从糖尿病数据集中读取数据，并进行一些简单的预处理，以删除所有缺少数据的行，并对数值特征进行规范化，使其具有相似的范围。

该脚本包含名为 **--prepped-data** 的参数，该参数可引用其中应保存结果数据的文件夹。

In [None]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

现在即可为第二个步骤创建脚本，该脚本可训练模型。该脚本包含名为 **--training-data** 的参数，该参数可引用上一步在其中保存准备好的数据的位置。

In [None]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

## 准备管道步骤的计算环境

在本练习中，两个步骤都将使用相同计算，但务必要意识到每个步骤都是独立运行的；因此，可以根据需要为每个步骤指定不同的计算上下文。

首先，获取你在上一个实验室中创建的计算目标（如果不存在，将创建它）。

> **重要提示**：在运行以下代码之前，请先将代码中的 *your-compute-cluster* 更改为你的计算群集的名称！群集名称必须是长度在 2 到 16 个字符之间的全局唯一名称。有效字符是字母、数字和 - 字符。

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

> **备注**：计算实例和群集是基于标准的 Azure 虚拟机映像。对于本练习，建议使用 *Standard_DS11_v2* 映像来实现成本和性能的最佳平衡。如果你的订阅配额不包含此映像，请选择其他映像；但请记住，较大的映像可能会产生更高的成本，但较小的映像可能不够完成任务。或者，可以请 Azure 管理员扩展你的配额。

计算将需要安装有必要数据包依赖项的 Python 环境。

In [None]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

现在，有了 Conda 配置文件，你可以创建一个环境并在管道的运行配置中使用它。

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## 创建和运行管道

现在即可创建并运行管道。

首先，需要定义管道步骤以及需在步骤间传递的所有数据引用。这种情况下，第一个步骤必须将准备好的数据写入到可从第二个步骤中读取的文件夹。由于这些步骤将在远程计算上运行（实际上，每个步骤都可以在不同的计算上运行），因此必须将文件夹路径作为数据引用传递到工作区的数据存储中的某个位置。**OutputFileDatasetConfig** 对象是特殊的数据引用，用于可在管道步骤之间传递的临时存储位置，因此你将创建该对象并用作第一步骤的输出和第二步骤的输入。请注意，你需要将其作为脚本参数传递，以便代码可以访问数据引用所引用的数据存储位置。

In [None]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

现在即可根据已定义的步骤构建管道，然后以试验形式运行。

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

当管道试验运行时，小组件中会显示它的图形表示形式。请注意页面右上方的内核指示器，当它从 **&#9899;** 变为 **&#9711;** 时，表明代码已运行完毕。你还可以在 [Azure 机器学习工作室](https://ml.azure.com)的“**试验**”页面监视管道运行。

管道完成后，你可以查看其子运行记录的指标。

In [None]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

假设管道成功，应使用训练上下文标签注册新模型，表示其已在管道中训练过。运行以下代码对此进行验证。

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

## 发布管道

创建并测试管道后，可以将其发布为 REST 服务。

In [None]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

请注意，发布的管道具有一个终结点，可以在 [Azure 机器学习工作室](https://ml.azure.com)的“**终结点**”页（“**管道终结点**”选项卡上）中进行查看。你还可以查找其 URI，其形式为发布管道对象的属性：

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

## 调用管道终结点

要使用该终结点，客户端应用程序需要通过 HTTP 进行 REST 调用。此请求必须经过身份验证，因此需要授权标头。实际的应用程序需要使用服务主体进行身份验证，但为了测试这一点，我们将使用当前连接到 Azure 工作区的授权标头（可使用以下代码获取该标头）：

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

现在可以调用 REST 接口。管道异步运行，因此我们将获得一个标识符，可用于跟踪运行中的管道试验：

In [None]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

由于你具有运行 ID，因此可以使用它来等待运行完成。

> **备注**：由于每个步骤都已配置为允许输出重用，因此管道应该会迅速完成。这主要是为了方便并节省课程时间。实际上，你可能希望每次都运行第一步，以防数据已发生更改，并且希望仅当第一步输出发生更改时才触发后续步骤。

In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

## 计划管道

假设糖尿病患者的诊所每周收集新数据，并将其添加到数据集。可以每周运行管道以使用新数据重训练模型。

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

可以检索工作区中定义的计划，如下所示：

In [None]:
schedules = Schedule.list(ws)
schedules

可以检查最新运行，如下所示：

In [None]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

这是一个简单示例，旨在演示原理。实际上，可以在管道步骤中构建更为复杂的逻辑 - 例如，根据用于计算 AUC 或准确性等性能指标的一些测试数据来评估模型、将指标与任何先前注册版本的模型中的指标进行对比，并仅注册性能更好的新模型。

可以使用[适用于 Azure DevOps 的 Azure 机器学习扩展](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml)将 Azure ML 管道与 Azure DevOps 管道结合（是的，同名令人困惑！），还可以将模型重训练集成到持续集成/连续部署 (CI/CD) 流程。例如，可以使用 Azure DevOps 生成管道来触发可训练和注册模型的 Azure ML 管道，并且在注册模型时，还可触发 Azure Devops 发布管道，该管道可将模型部署为 Web 服务，同时部署使用该模型的应用程序或服务。