# バッチ推論サービスの作成

以前のラボでは、Azure ML *パイプライン*を使用してモデルのトレーニングと登録を自動化し、リアルタイム*推論*（モデルから予測を取得する）のWebサービスとしてモデルを公開しました。次に、これら2つの概念を組み合わせて、*バッチ推論*のパイプラインを作成します。どういう意味ですか？ヘルスクリニックが1日中患者の測定を行い、各患者の詳細を個別のファイルに保存するとします。その後、夜通し、糖尿病予測モデルを使用して、その日の患者データをすべてバッチとして処理し、翌朝待機する予測を生成して、クリニックが糖尿病のリスクがあると予測される患者を追跡できるようにします。それがこの演習で実装するものです。

## ワークスペースに接続する

最初に行う必要があるのは、Azure ML SDKを使用してワークスペースに接続することです。

> **Note**: 前の演習を完了してから、Azureサブスクリプションとの認証済みセッションの有効期限が切れた場合、再認証するように求められます。

In [None]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## モデルのトレーニングと登録

このコースの以前の実習ラボを修了している場合、**diabetes_model**モデルの多くのバージョンを登録しているので、すぐに使用できます。

そうでない場合は、下のセルを実行してモデルをトレーニングおよび登録できます。

In [None]:
from azureml.core import Experiment
from azureml.core import Model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

# Create an Azure ML experiment in your workspace
experiment = Experiment(workspace = ws, name = "diabetes-training")
run = experiment.start_logging()
print("Starting experiment:", experiment.name)

# load the diabetes dataset
print("Loading Data...")
diabetes = pd.read_csv('data/diabetes.csv')

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Save the trained model
model_file = 'diabetes_model.pkl'
joblib.dump(value=model, filename=model_file)
run.upload_file(name = 'outputs/' + model_file, path_or_stream = './' + model_file)

# Complete the run
run.complete()

# Register the model
run.register_model(model_path='outputs/diabetes_model.pkl', model_name='diabetes_model',
                   tags={'Training context':'Inline Training'},
                   properties={'AUC': run.get_metrics()['AUC'], 'Accuracy': run.get_metrics()['Accuracy']})

print('Model trained and registered.')

## バッチデータの生成とアップロード

実際には、このコースの新しいデータを取得する患者がいる完全なスタッフの診療所がないため、糖尿病CSVファイルからランダムなサンプルを生成し、それらを使用してパイプラインをテストします。次に、そのデータをAzure Machine Learningワークスペースのデータストアにアップロードし、そのデータセットを登録します。

In [None]:
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# Load the diabetes data
diabetes = pd.read_csv('data/diabetes2.csv')
# Get a 100-item sample of the feature columns (not the diabetic label)
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# Create a folder
batch_folder = './batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save each sample as a separate file
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="batch-data", target_path="batch-data", overwrite=True, show_progress=True)

# Register a dataset for the input data
batch_data_set = Dataset.File.from_files(path=(default_ds, 'batch-data/'), validate=False)
batch_data_set = batch_data_set.register(workspace=ws, 
                                         name='batch-data',
                                         description='batch data',
                                         create_new_version=True)

print("Done!")

## Computeを作成

パイプラインの計算コンテキストが必要になるため、前の演習で使用したAzure ML計算クラスターを使用します（まだ存在しない場合は作成されます）。

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "aml-cluster"

# Check for existing cluster
try:
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # Create an AzureMl Compute resource
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS12_V2', max_nodes=4)
    inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

inference_cluster.wait_for_completion(show_output=True)

## バッチ推論用のパイプラインを作成する

これで、バッチ推論に使用するパイプラインを定義する準備が整いました。パイプラインでは、バッチ推論を実行するためにPythonコードが必要になるため、パイプラインで使用されるすべてのファイルを保持できるフォルダーを作成しましょう:

In [None]:
import os
# Create a folder for the experiment files
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

次に、実際の作業を行うPythonスクリプトを作成し、パイプラインフォルダーに保存します:

In [None]:
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('diabetes_model')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

次に、スクリプトに必要な依存関係を含む実行コンテキストを定義します

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.core.runconfig import CondaDependencies

# Add dependencies required by the model
# For scikit-learn models, you need scikit-learn
cd = CondaDependencies.create(conda_packages=['pip==19.3.1'], pip_packages=['scikit-learn'])

batch_env = Environment(name='batch_environment')
batch_env.python.conda_dependencies = cd
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

パイプラインを使用してバッチ予測スクリプトを実行し、入力データから予測を生成し、結果をテキストファイルとして出力フォルダーに保存します。これを行うには、**ParallelRunStep**を使用します。これにより、バッチデータを並列処理し、結果を*parallel_run_step.txt*という名前の単一の出力ファイルで照合できます。

したがって、**ParallelRunStep**クラスを含むライブラリをインポートする必要があります。

In [None]:
!pip install --upgrade azureml-contrib-pipeline-steps

OK、**ParallelRunStep**パイプラインステップを定義する準備ができました。

In [None]:
from azureml.contrib.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.pipeline.core import PipelineData

default_ds = ws.get_default_datastore()
model = ws.models['diabetes_model']

output_dir = PipelineData(name='inferences', 
                          datastore=default_ds, 
                          output_path_on_compute='diabetes/results')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=4)

parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    models=[model],
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

次に、ステップをパイプラインに入れて実行します。

> **Note**: これには時間がかかる場合があります！

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

パイプラインの実行が終了すると、結果の予測は、パイプラインの最初の（そして唯一の）ステップに関連付けられた実験の出力に保存されます。次のように取得できます:

In [None]:
import pandas as pd
import shutil

shutil.rmtree('diabetes-results', ignore_errors=True)

prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-results')


for root, dirs, files in os.walk('diabetes-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

## パイプラインを公開し、RESTインターフェイスを使用します

バッチ推論用の作業パイプラインができたので、それを公開し、RESTエンドポイントを使用してアプリケーションから実行できます。

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name='Diabetes_Parallel_Batch_Pipeline', description='Batch scoring of diabetes data', version='1.0')

published_pipeline

公開されたパイプラインにはエンドポイントがあり、Azureポータルで確認できます。また、公開されたパイプラインオブジェクトのプロパティとして見つけることもできます。:

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

エンドポイントを使用するには、クライアントアプリケーションがHTTP経由でREST呼び出しを行う必要があります。この要求は認証される必要があるため、認証ヘッダーが必要です。これをテストするために、現在のAzureワークスペースへの接続の認証ヘッダーを使用します。これは、次のコードを使用して取得できます。

> **Note**: 実際のアプリケーションでは、認証に使用するサービスプリンシパルが必要です。

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print(auth_header)

これで、RESTインターフェースを呼び出す準備ができました。パイプラインは非同期に実行されるため、識別子を取得します。これを使用して、実行中のパイプラインの実験を追跡できます:

In [None]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Batch_Pipeline_via_REST"})
run_id = response.json()["Id"]
run_id

実行IDがあるため、**RunDetails**ウィジェットを使用して、実行中の実験を表示できます:

In [None]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["Batch_Pipeline_via_REST"], run_id)
RunDetails(published_pipeline_run).show()

前と同様に、結果は最初のパイプラインステップの出力にあります。:

In [None]:
import pandas as pd
import shutil

shutil.rmtree("diabetes-results", ignore_errors=True)

prediction_run = next(published_pipeline_run.get_children())
prediction_output = prediction_run.get_output_data("inferences")
prediction_output.download(local_path="diabetes-results")


for root, dirs, files in os.walk("diabetes-results"):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

これで、毎日の患者データをバッチ処理するために使用できるパイプラインができました。

**詳しくは**: バッチ推論にパイプラインを使用する方法の詳細については、Azure Machineの[バッチ予測の実行方法](https://docs.microsoft.com/azure/machine-learning/how-to-run-batch-predictions)を参照してください。学習ドキュメント。