# 创建批处理推理服务

假设一个健康诊所收集患者的全天测量值，将每位患者的详细信息保存在单独的文件中。然后在夜间，糖尿病预测模型可用于批量处理一整天的患者数据，生成第二天早上需要的预测，这样诊所就可以跟踪预测有患糖尿病风险的患者。借助 Azure 机器学习，可以通过创建批处理推理管道来做到这一点，你将在本练习中完成此操作。

## 连接到工作区

首先，请连接到你的工作区。

> **备注**：如果尚未与 Azure 订阅建立经过身份验证的会话，则系统将提示你通过执行以下操作进行身份验证：单击链接，输入验证码，然后登录到 Azure。

In [None]:
import azureml.core
from azureml.core import Workspace

# 从保存的配置文件加载工作区
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## 训练和注册模型

现在我们来训练并注册模型以部署至分支推理管道中。

In [None]:
from azureml.core import Experiment
from azureml.core import Model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

# 在工作区中创建 Azure ML 试验
experiment = Experiment(workspace=ws, name='mslearn-train-diabetes')
run = experiment.start_logging()
print("Starting experiment:", experiment.name)

# 加载糖尿病数据集
print("Loading Data...")
diabetes = pd.read_csv('data/diabetes.csv')

# 分隔特征和标签
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# 将数据拆分为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# 训练决策树模型
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# 计算精度
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# 计算 AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# 保存训练的模型
model_file = 'diabetes_model.pkl'
joblib.dump(value=model, filename=model_file)
run.upload_file(name = 'outputs/' + model_file, path_or_stream = './' + model_file)

# 完成运行
run.complete()

# 注册模型
run.register_model(model_path='outputs/diabetes_model.pkl', model_name='diabetes_model',
                   tags={'Training context':'Inline Training'},
                   properties={'AUC': run.get_metrics()['AUC'], 'Accuracy': run.get_metrics()['Accuracy']})

print('Model trained and registered.')

## 生成和上传批处理数据

由于我们实际上没有人员齐备的诊所，无法获取用于此练习的患者新数据，因此你需要从糖尿病 CSV 文件生成随机样本，将这些数据上传至 Azure 机器学习工作区的数据存储中，并为其注册一个数据集。

In [None]:
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# 设置默认数据存储
ws.set_default_datastore('workspaceblobstore')
default_ds = ws.get_default_datastore()

# 枚举所有数据存储，指出默认项
for ds_name in ws.datastores:
    print(ds_name, "- Default =", ds_name == default_ds.name)

# 加载糖尿病数据
diabetes = pd.read_csv('data/diabetes2.csv')
# 获取特征列（而非糖尿病标签）的 100 个项的样本
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# 创建文件夹
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# 将每个样本保存为单独的文件
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# 将文件上传到默认数据存储
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# 为输入数据注册数据集
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=ws, 
                                             name='batch-data',
                                             description='batch data',
                                             create_new_version=True)
except Exception as ex:
    print(ex)

print("Done!")

## 创建计算

我们需要管道的计算上下文，因此我们将使用以下代码指定一个 Azure 机器学习计算群集（如果不存在，则会创建该群集）。

> **重要事项**：在运行以下代码之前，请先将代码中的 your-compute-cluster **更改为你的计算群集的名称！群集名称必须是长度在 2 到 16 个字符之间的全局唯一名称。有效字符是字母、数字和 - 字符。

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

## 创建用于批处理推理的管道

现在，我们可以开始定义将用于批量推理的管道。我们的管道将需要 Python 代码来执行批量推理，因此我们来创建一个可存储该管道使用的所有文件的文件夹：

In [None]:
import os
# 为试验文件创建文件夹
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

现在，我们将创建一个 Python 脚本来完成实际工作，并将其保存在管道文件夹中：

In [None]:
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('diabetes_model')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

接下来，我们将定义一个包括脚本所需的依赖项的运行上下文

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.core.runconfig import CondaDependencies

# 添加模型所需的依赖项
# 对于 scikit-learn 模型，你需要 scikit-learn
# 对于并行管道步骤，你需要 azureml-core 和 azureml-dataprep[fuse]
cd = CondaDependencies.create(conda_packages=['scikit-learn','pip'],
                              pip_packages=['azureml-defaults','azureml-core','azureml-dataprep[fuse]'])

batch_env = Environment(name='batch_environment')
batch_env.python.conda_dependencies = cd
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

你将使用管道来运行批量预测脚本、从输入数据生成预测以及将结果作为文本文件保存在 outputs 文件夹中。为此，可以使用 **ParallelRunStep**，通过它可以并行处理批量数据，并将结果整理到名为 *parallel_run_step.txt* 的单个输出文件中。

> **备注**：可能会显示 *“*不建议使用“已启用””* 警告，你可以忽略此警告。

In [None]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.pipeline.core import PipelineData
from azureml.core.runconfig import DockerConfiguration

default_ds = ws.get_default_datastore()

output_dir = PipelineData(name='inferences', 
                          datastore=default_ds, 
                          output_path_on_compute='diabetes/results')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

现在应将此步骤放入管道中并运行它。

> **备注**：这可能需要一些时间！

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'mslearn-diabetes-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

管道运行完成后，生成的预测将保存在与管道中第一个（也是唯一一个）步骤相关联的试验输出中。你可按照以下步骤对其进行检索：

In [None]:
import pandas as pd
import shutil

# 如果之前的运行遗留了本地结果文件夹，请删除该文件夹
shutil.rmtree('diabetes-results', ignore_errors=True)

# 获取第一步骤的运行并下载其输出
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# 遍历文件夹层次结构并找到结果文件
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# 清理输出格式
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# 显示前 20 个结果
df.head(20)

## 发布管道并使用其 REST 接口

拥有用于批量推理的工作管道后，你可以发布它并使用 REST 终结点从应用程序中运行它。

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name='diabetes-batch-pipeline', description='Batch scoring of diabetes data', version='1.0')

published_pipeline

请注意，已发布的管道具有一个终结点，可在 Azure 门户中看到此终结点。还可以将其视为已发布管道对象的属性：

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

要使用该终结点，客户端应用程序需要通过 HTTP 进行 REST 调用。此请求必须经过身份验证，因此需要授权标头。为了测试这一点，我们将使用当前连接到 Azure 工作区的授权标头（可使用以下代码获取该标头）：

> **备注**：实际的应用程序需要使用服务主体进行身份验证。

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

现在可以调用 REST 接口。管道异步运行，因此我们将获得一个标识符，可用于跟踪运行中的管道试验：

In [None]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "mslearn-diabetes-batch"})
run_id = response.json()["Id"]
run_id

由于具有运行 ID，因此可以使用“**RunDetails**”小组件查看运行中的试验：

In [None]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments['mslearn-diabetes-batch'], run_id)

# 阻止至运行完成
published_pipeline_run.wait_for_completion(show_output=True)

等待管道完成运行，然后运行以下单元格以查看结果。

如前所述，结果位于第一个管道步骤的输出中：

In [None]:
import pandas as pd
import shutil

# 如果之前的运行遗留了本地结果文件夹，请删除该文件夹
shutil.rmtree('diabetes-results', ignore_errors=True)

# 获取第一步骤的运行并下载其输出
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')

# 遍历文件夹层次结构并找到结果文件
for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# 清理输出格式
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# 显示前 20 个结果
df.head(20)

现在你具有一个可用于批量处理每日患者数据的管道。

**详细信息**：若要详细了解如何使用管道进行批量推理，请参阅 Azure 机器学习文档中的[如何运行批量预测](https://docs.microsoft.com/azure/machine-learning/how-to-run-batch-predictions)。