# パイプラインを作成する

前のラボでは、Azure Machine Learning SDK を使用して、データへのアクセスからトレーニング実験の実行、機械学習モデルの登録に至るモデル トレーニング プロセス全体を説明しました。これまでは、機械学習ソリューションを対話形式で作成するために必要なさまざまな手順を実行してきました。このラボでは、*パイプライン*を使用してこれらの手順を自動化する方法について説明します。

## ワークスペースに接続する

まず、Azure ML SDK を使用してワークスペースに接続する必要があります。

> **注**: 前回の演習を完了してから Azure サブスクリプションとの認証済みセッションの有効期限が切れている場合は、再認証を求めるメッセージが表示されます。

In [None]:
import azureml.core
from azureml.core import Workspace

# 保存した構成ファイルからワークスペースを読み込む
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## 実験用データを準備する

このラボでは、糖尿病患者の詳細を含むデータセットを使用します。次のセルを実行してこのデータセットを作成します (前のラボで作成した場合は、コードによって既存のバージョンが検索されます)。

In [None]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## パイプライン ステップ用スクリプトを作成する

パイプラインは、Python スクリプト、自動 ML トレーニング Estimator のような特殊なステップ、データをある場所から別の場所にコピーするデータ転送ステップなど、1 つ以上の*ステップ*で構成されます。各ステップは、独自のコンピューティング コンテキストで実行できます。

この演習では、(モデルをトレーニングするための) Estimator ステップと (トレーニング済みモデルを登録するための) Python スクリプト ステップを含む単純なパイプラインを構築します。

In [None]:
import os
# パイプライン ステップ ファイル用フォルダーを作成する
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

これで、モデルをトレーニングする最初のステップのスクリプトを作成できます。スクリプトには、トレーニング済みモデルを保存するフォルダーを参照する **output_folder** という名前のパラメーターが含まれます。

In [None]:
%%writefile $experiment_folder/train_diabetes.py
# ライブラリをインポートする
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# パラメーターを取得する
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# 実験実行コンテキストを取得する
run = Run.get_context()

# 糖尿病データを読み込む (入力データセットとして渡される)
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# フィーチャーとラベルを分離する
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# データをトレーニング セットとテスト セットに分割する
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# デシジョン ツリー モデルをトレーニングする
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# AUC を計算する
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# ROC 曲線をプロットする
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# 対角 50% ラインをプロットする
plt.plot([0, 1], [0, 1], 'k--')
# モデルによって達成された FPR と TPR をプロットする
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# トレーニング済みモデルを保存する
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

パイプラインの 2 番目のステップのスクリプトは、保存された場所からモデルを読み込み、ワークスペースに登録します。これには、モデルが保存されたパスを含む単一の **model_folder** パラメーターが含まれます。

In [None]:
%%writefile $experiment_folder/register_diabetes.py
# ライブラリをインポートする
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# パラメーターを取得する
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# 実験実行コンテキストを取得する
run = Run.get_context()

# モデルを読み込む
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()

## パイプライン ステップ用コンピューティング環境を準備する

この演習では、両方のステップで同じコンピューティングを使用しますが、各ステップは独立して実行されることを認識することが重要です。必要に応じて、各ステップに異なるコンピューティング コンテキストを指定できます。

最初に、前のラボで作成したコンピューティング ターゲットを取得します (存在しない場合は作成されます)。

> **重要**: 実行する前に、*コンピューティング クラスター*を以下のコードでコンピューティング クラスターの名前に変更してください。

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

# クラスターが存在することを確認する
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If not, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS2_V2', 
                                                           max_nodes=2)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

pipeline_cluster.wait_for_completion(show_output=True)

このコンピューティングには、必要なパッケージの依存関係がインストールされた Python 環境が必要となるため、実行構成を作成する必要があります。

In [None]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# 実験用 Python 環境を作成する
diabetes_env = Environment("diabetes-experiment-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# パッケージの依存関係のセットを作成する
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib', 'pandas'],
                                             pip_packages=['azureml-sdk','pyarrow'])

# 環境に依存関係を追加する
diabetes_env.python.conda_dependencies = diabetes_packages

# 環境を登録する (前のラボが完了していない場合)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-experiment-env')

# パイプライン用の新しい runconfig オブジェクトを作成する
pipeline_run_config = RunConfiguration()

# 上記で作成したコンピューティングを使用します。 
pipeline_run_config.target = pipeline_cluster

# 環境を実行構成に割り当てる
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## パイプラインを作成および実行する

これで、パイプラインを作成および実行する準備が整いました。

まず、パイプライン用ステップと、パイプライン間で渡す必要があるデータ参照を定義する必要があります。この場合、最初のステップでは、2 番目のステップで読み取ることができるフォルダーにモデルを書き込む必要があります。ステップはリモート コンピューティングで実行されるため (実際には、それぞれ異なるコンピューティングで実行できます)、ワークスペース内のデータストア内の場所に、フォルダー パスをデータ参照として渡す必要があります。**PipelineData** オブジェクトは、パイプライン ステップ間で渡すことができる中間ストレージの場所に使用される特殊な種類のデータ参照であるため、1 つを作成し、最初のステップの出力と 2 番目のステップの入力として使用します。また、コードがデータ参照によって参照されるデータストアの場所にアクセスできるように、スクリプト引数として渡す必要があることに注意してください。

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# トレーニング データセットを取得する
diabetes_ds = ws.datasets.get("diabetes dataset")

# モデル フォルダー用パイプラインデータ (一時データ参照) を作成する
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# 手順 2、モデル登録スクリプトを実行する
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

これで、定義した手順からパイプラインを構築し、実験として実行できます。

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# パイプラインを構築する
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# 実験を作成してパイプラインを実行する
experiment = Experiment(workspace = ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

上のウィジェットは、実行時のパイプラインの詳細を示しています。[Azure Machine Learning Studio](https://ml.azure.com) の**実験**ページでパイプラインの実行を監視することもできます。

パイプラインが終了したら、新しいモデルを*トレーニング コンテキスト* タグに登録して、パイプラインでトレーニングされたことを示します。これを確認するには、次のコードを実行します。

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

これは、原理を示すために設計された簡単な例です。実際には、より高度なロジックをパイプライン ステップに組み込むことができます。たとえば、一部のテスト データに対してモデルを評価して AUC や精度などのパフォーマンス メトリックを計算し、以前に登録したモデルのバージョンのメトリックと比較して、パフォーマンスが向上した場合のみ新しいモデルを登録します。

[Azure DevOps 用 Azure Machine Learning 拡張機能](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml)を使用して、Azure ML パイプラインと Azure DevOps パイプラインを組み合わせて (そう、同じ名前委の場合は混乱*します*)、モデルの再トレーニングを*継続的統合/継続的デプロイ (CI/CD) プロセス*に統合します。たとえば、Azure DevOps *ビルド* パイプラインを使用して、モデルをトレーニングおよび登録する Azure ML パイプラインをトリガーし、モデルが登録されると、モデルを Web サービスとしてデプロイする Azure Devops *リリース* パイプラインとモデルを使用するアプリケーションまたはサービスをトリガーできます。