# 创建管道

在先前的实验中，你使用 Azure 机器学习 SDK 了解了整个模型训练流程，该流程涉及从访问数据到运行训练试验，再到注册机器学习模型。目前为止，你已执行了以交互方式创建机器学习解决方案所需的各个步骤。在本实验中，你将使用*管道*了解这些步骤的自动化。

## 连接到工作区

你首先需要使用 Azure ML SDK 连接到工作区。

> **注意**：如果 Azure 订阅的身份验证会话在你完成上一练习后已过期，系统将提示你重新进行身份验证。

In [None]:
import azureml.core
from azureml.core import Workspace

# 从保存的配置文件加载工作区
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## 准备试验数据

在本实验室中，你将使用包含糖尿病患者详细信息的数据集。运行以下单元格以创建此数据集（如果在上一个实验室中创建了此数据集，则代码会找到现有版本）

In [None]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## 创建管道步骤的脚本

管道由一个或多个*步骤*组成，这些步骤可以是 Python 脚本，也可以是专用步骤（例如自动机器学习训练估算器或将数据从一个位置复制到另一位置的数据传输步骤）。每个步骤都可以在自己的计算上下文中运行。

在本练习中，你将构建简单管道，其中包含估算器步骤（用于训练模型）和 Python 脚本步骤（用于注册训练后模型）。

In [None]:
import os
# 为管道步骤文件创建文件夹
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

现在即可为第一步骤创建脚本，该脚本可训练模型。该脚本包含名为 **output_folder** 的参数，该参数可引用其中应保存训练后模型的文件夹。

In [None]:
%%writefile $experiment_folder/train_diabetes.py
# 导入库
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# 获取参数
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# 获取试验运行上下文
run = Run.get_context()

# 加载糖尿病数据（作为输入数据集传递）
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# 分隔特征和标签
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# 将数据拆分为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# 训练决策树模型
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# 计算精度
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# 计算 AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# 绘制 ROC 曲线
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# 绘制 50% 对角线
plt.plot([0, 1], [0, 1], 'k--')
# 绘制模型实现的 FPR 和 TPnplt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# 保存训练的模型
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

管道第二步骤的脚本将从模型的原保存位置加载模型，然后在工作区中注册它。它包含一个 **model_folder** 参数，该参数包含指向模型原保存位置的路径。

In [None]:
%%writefile $experiment_folder/register_diabetes.py
# 导入库
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# 获取参数
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# 获取试验运行上下文
run = Run.get_context()

# 加载模型
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()

## 准备管道步骤的计算环境

在本练习中，两个步骤都将使用相同计算，但务必要意识到每个步骤都是独立运行的；因此，可以根据需要为每个步骤指定不同的计算上下文。

首先，获取你在上一个实验室中创建的计算目标（如果不存在，将创建它）。

> **重要事项**：请在运行之前，在以下代码中将 *your-compute-cluster* 更改为你的计算群集的名称！

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

# 验证群集是否存在
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If not, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS2_V2', 
                                                           max_nodes=2)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

pipeline_cluster.wait_for_completion(show_output=True)

计算将需要安装有必要数据包依赖项的 Python 环境，因此需要创建运行配置。

In [None]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# 创建用于试验的 Python 环境
diabetes_env = Environment("diabetes-experiment-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# 创建一组包依赖项
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib', 'pandas'],
                                             pip_packages=['azureml-sdk','pyarrow'])

# 将依赖项添加到环境
diabetes_env.python.conda_dependencies = diabetes_packages

# 注册环境（以防上一实验未完成）
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-experiment-env')

# 为管道新建 runconfig 对象
pipeline_run_config = RunConfiguration()

# 使用上面创建的计算。 
pipeline_run_config.target = pipeline_cluster

# 将环境分配给运行配置
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## 创建并运行管道

现在即可创建并运行管道。

首先，需要定义管道步骤以及需在步骤间传递的所有数据引用。这种情况下，第一步骤必须将模型写入到可从第二步中读取的文件夹。由于这些步骤将在远程计算上运行（实际上，每个步骤都可以在不同的计算上运行），因此必须将文件夹路径作为数据引用传递到工作区的数据存储中。**PipelineData** 对象是特殊的数据引用，用于可在管道步骤之间传递的临时存储位置，因此你将创建该对象并用作第一步骤的输出和第二步骤的输入。请注意，你还需要将其作为脚本参数传递，以便代码可以访问数据引用所引用的数据存储位置。

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# 获取训练数据集
diabetes_ds = ws.datasets.get("diabetes dataset")

# 为模型文件夹创建 PipelineData（临时数据引用）
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# 步骤 2，运行模型注册脚本
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

现在即可根据已定义的步骤构建管道，然后将其作为试验来运行。

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# 构建管道
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# 创建试验并运行管道
experiment = Experiment(workspace = ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

上方的小组件可显示管道在运行时的详细信息。你还可以在 [Azure 机器学习工作室](https://ml.azure.com)的 **“试验”** 页监视管道运行。

管道完成后，应使用*训练上下文*标签注册新模型，表示其已在管道中训练过。运行以下代码对此进行验证。

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

这是一个简单示例，旨在演示原理。实际上，可以在管道步骤中构建更为复杂的逻辑 - 例如，根据用于计算 AUC 或准确性等性能指标的一些测试数据来评估模型、将指标与任何先前注册版本的模型中的指标进行对比，并仅注册性能更好的新模型。

可以使用[适用于 Azure DevOps 的 Azure 机器学习扩展](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml)将 Azure ML 管道与 Azure DevOps 管道结合（是的，同名*令人*困惑！），还可以将模型重训练集成到*持续集成/连续部署 (CI/CD)* 流程。例如，可以使用 Azure DevOps *生成*管道来触发可训练和注册模型的 Azure ML 管道，并且在注册模型时，还可触发 Azure Devops *发布*管道，该管道可将模型部署为 Web 服务，同时部署使用该模型的应用程序或服务。