# パイプラインを作成する

Azure ML SDK を使用してスクリプトベースの実験を実行すると、データの取り込み、モデルのトレーニング、モデルの登録に必要なさまざまな手順を個々に実行できます。ただし、エンタープライズ環境では、機械学習ソリューションの構築に必要な不連続手順のシーケンスは通常、*パイプライン*にカプセル化されます。このパイプラインは、ユーザーからのデマンド、自動構築プロセス、またはスケジュールに従って、ひとつ以上のコンピューティング先で実行できます。

このノートブックでは、あらゆる要素をまとめてシンプルなパイプラインを作成し、データを前処理して、モデルのトレーニングと登録を行います。

## ワークスペースに接続する

作業を開始するには、ワークスペースに接続します。

> **注**: Azure サブスクリプションでまだ認証済みのセッションを確立していない場合は、リンクをクリックして認証コードを入力し、Azure にサインインして認証するよう指示されます。

In [None]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

## データを準備する

パイプラインでは、糖尿病患者の詳細を含むデータセットを使用します。次のセルを実行して、このデータセットを作成します (以前に作成していた場合は、コードが既存のバージョンを検索します)。

In [None]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

## パイプライン手順のスクリプトを作成する

パイプラインはひとつ以上の*手順*で構成されます。Python スクリプトの場合もあれば、データをひとつの場所から別の場所にコピーするデータ転送手順のように特別な手順の場合もあります。各ステップは、独自のコンピューティング コンテキストで実行できます。この演習では、2 つの Python スクリプトの手順を含むシンプルなパイプラインを構築します。ひとつの手順では一部のトレーニング データを前処理し、もうひとつの手順では前処理されたデータを使用してモデルのトレーニングと登録を行います。

まず、パイプラインの手順で使用するスクリプト ファイル用のフォルダーを作成しましょう。

In [None]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

糖尿病データセットからデータを読み取り、一部のシンプルな前処理を適用してデータが欠落している行を削除し、類似したスケールになるように数値の特徴を正規化する最初のスクリプトを作成しましょう。

スクリプトには **--prepped-data** という名前の引数が含まれます。これは、結果的に生じるデータを保存するフォルダーを参照します。

In [None]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

これで、モデルをトレーニングする 2 番目の手順のスクリプトを作成できます。スクリプトには **--training-data** という名前の引数が含まれます。これは、準備済みのデータが以前の手順で保存された場所を参照します。

In [None]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

## パイプライン手順用のコンピューティング環境を準備する

この演習では、両方のステップで同じコンピューティングを使用しますが、各ステップは独立して実行されることを認識することが重要です。必要に応じて、各ステップに異なるコンピューティング コンテキストを指定できます。

最初に、前のラボで作成したコンピューティング ターゲットを取得します (存在しない場合は作成されます)。

> **重要**: 実行する前に、以下のコードで *your-compute-cluster* をコンピューティング クラスターの名前に変更してください。クラスター名は、長さが 2 〜 16 文字のグローバルに一意の名前である必要があります。英字、数字、- の文字が有効です。

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "your-compute-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

> **注**: コンピューティング インスタンスとクラスターは、スタンダードの Azure 仮想マシンのイメージに基づいています。この演習では、コストとパフォーマンスの最適なバランスを実現するために、*Standard_DS11_v2* イメージが推薦されます。サブスクリプションにこのイメージを含まないクォータがある場合は、別のイメージを選択してください。 ただし、画像が大きいほどコストが高くなり、小さすぎるとタスクが完了できない場合があることに注意してください。Azure 管理者にクォータを拡張するように依頼していただくことも可能です。

コンピューティングには、必要なパッケージの依存関係がインストールされた Python 環境が必要です。

In [None]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

Conda 構成ファイルができたら、環境を作成してパイプラインの実行構成で使用することができます。

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## パイプラインを作成して実行する

これで、パイプラインを作成および実行する準備が整いました。

まず、パイプラインの手順と、パイプライン間で渡す必要があるデータ参照を定義する必要があります。この場合、最初の手順では、2 番目の手順で読み取ることができるフォルダーに準備済みのデータを書き込む必要があります。これらの手順はリモート コンピューティングで実行されるため (実際には、それぞれ異なるコンピューティングで実行できます)、ワークスペース内のデータストア内の場所に、フォルダー パスをデータ参照として渡す必要があります。**OutputFileDatasetConfig** オブジェクトは、パイプラインの手順間で渡すことができる中間ストレージの場所で使われる特殊な種類のデータ参照なので、1 つ作成して、最初の手順の出力と 2 番目の手順の入力として使用します。コードがデータ参照によって参照されるデータストアの場所にアクセスできるように、スクリプト引数として渡す必要があることに注意してください。

In [None]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

これで、定義した手順からパイプラインを構築し、実験として実行できます。

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

実行中のパイプライン実験のグラフィカル表示がウィジェットに表示されます。ページの右上にあるカーネル インジケータに注目してください。**&#9899;** から **&#9711;** に変わると、コードの実行が終了します。[Azure Machine Learning Studio](https://ml.azure.com) の**実験**ページでパイプラインの実行を監視することもできます。

パイプラインが終了すると、子実行により記録された指標を確認できます。

In [None]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

パイプラインが成功すると、新しいモデルを*トレーニング コンテキスト* タグに登録して、パイプラインでトレーニングされたことを示す必要があります。これを確認するには、次のコードを実行します。

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

## パイプラインを発行する

パイプラインを作成してテストした後、REST サービスとして公開できます。

In [None]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

公開されたパイプラインにはエンドポイントがあり、[Azure Machine Learning Studio](https://ml.azure.com) の**エンドポイント** ページ (**パイプライン エンドポイント** タブ) に表示されます。また、公開されたパイプライン オブジェクトのプロパティとして URI を見つけることもできます。

In [None]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

## パイプライン エンドポイントを呼び出す

エンドポイントを使用するには、クライアント アプリケーションが HTTP 経由で REST 呼び出しを行う必要があります。この要求は認証される必要があるため、Authorization ヘッダーが必要です。実際のアプリケーションには、認証に使用するサービス プリンシパルが必要ですが、これをテストするには、現在の接続から Azure ワークスペースへの Authorization ヘッダーを使用します。これは、次のコードを使用して取得できます。

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

これで、REST インターフェイスを呼び出す準備ができました。パイプラインは非同期で実行されるため、識別子を取得します。実行中のパイプライン実験を追跡するために使用できます。

In [None]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

実行 ID を持っているので、その ID を使用し、実行が完了するのを待ちます。

> **注**: 各ステップは出力の再利用を許可するように構成されているため、パイプラインは迅速に完了するはずです。これは、主に利便性とこのコースの時間を節約するために行われました。実際には、データが変更された場合に備えて、最初のステップを毎回実行し、ステップ 1 の出力が変更された場合にのみ後続のステップをトリガーする必要があります。

In [None]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

## パイプラインをスケジュールする

たとえば、糖尿病患者の診療所が毎週新しいデータを収集し、データセットに追加したとします。パイプラインを毎週実行して、新しいデータでモデルを再トレーニングできます。

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

以下のように、ワークスペースで定義されているスケジュールを取得できます。

In [None]:
schedules = Schedule.list(ws)
schedules

次のように最新の実行を確認できます。

In [None]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

これは、原理を示すために設計された簡単な例です。実際には、より高度なロジックをパイプライン ステップに組み込むことができます。たとえば、一部のテスト データに対してモデルを評価して AUC や精度などのパフォーマンス メトリックを計算し、以前に登録したモデルのバージョンのメトリックと比較して、パフォーマンスが向上した場合のみ新しいモデルを登録します。

[Azure DevOps 用 Azure Machine Learning 拡張機能](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml)を使用して、Azure ML パイプラインと Azure DevOps パイプラインを組み合わせて (そう、同じ名前委の場合は混乱*します*)、モデルの再トレーニングを*継続的統合/継続的デプロイ (CI/CD) プロセス*に統合します。たとえば、Azure DevOps *ビルド* パイプラインを使用して、モデルをトレーニングおよび登録する Azure ML パイプラインをトリガーし、モデルが登録されると、モデルを Web サービスとしてデプロイする Azure Devops *リリース* パイプラインとモデルを使用するアプリケーションまたはサービスをトリガーできます。