# dp100_14 パイプラインを使用して機械学習を調整する

パイプラインという用語は機械学習で広く使用されており、多くの場合意味はさまざま。  
たとえば、scikit-learnでは、パイプラインを定義して、データ前処理変換とトレーニングアルゴリズムを組み合わせることができる。  
また、Azure DevOpsでは、ビルドまたはリリースのパイプラインを定義して、ソフトウェアの配布に必要なビルドと厚生のタスクを実行できる。  
この子ジュールで焦点を当てるのは**AzureMLパイプラインで、これは実験として実行できるステップをカプセル化するため**のもの。  
ただし、念頭に置くべき点として、Azure DevOpsパイプラインのなkのタスクによってAzureMLパイプラインを開始させ、  
次いでその中にscikit-learnパイプラインを基にモデルをトレーニングするステップを含めても一向に差し支えがない。

## パイプラインの概要

パイプラインは、各タスクが1つの"ステップ"として実装される機械学習タスクのワークフロー。

ステップを順番にまたは並列で配置することで、機械学習操作を調整する高度なフローロジックを作成できる。  
各ステップは特定のコンピューティング先で実行できるため、必要に応じてさまざまな種類のプロセスを組み合わせて、全体的な目標を達成することができる。

パイプラインは1つのプロセスとして実行できる。そのためには、パイプラインを実験として実行する。  
パイプライン実行の各ステップは、実験実行全体の一部として、それぞれに割り当てられたコンピューティング先で実行される。

パイプラインをRESTエンドポイントとして発行し、クライアントアプリケーションでパイプライン実行を開始できるようにすることができる。  
また、パイプラインのスケジュールを定義し、定期的に自動で実行するように設定することもできる。

### パイプラインのステップ

パイプラインは、タスクを実行する1つ以上の"ステップ"で構成される。  
AzureMLパイプラインではさまざまな種類のステップがサポートされており、それぞれに固有の目的と構成のオプションがある。

AzureMLパイプラインでの一般的なステップは次の通り。

- PythonScriptStep:
    - 指定されたPythonスクリプトを実行する
- DataTransferStep:
    - Azure Data Factoryを使用してデータストア間でデータコピー
- DatabriksStep:
    - Databricksクラスt-あでノートブック、スクリプト、またはコンパイル済みJARを実行
- AdlaStep:
    - Azure Data Lake AnalyticsでU-SQLジョブを実行する
- ParallelRunStep:
    - 複数のコンピューティングノードで分散タスクとしてPythonスクリプトを実行
    
パイプラインを作成するには、まず各ステップを定義してから、それらのステップを含むパイプラインを作成する必要がある。  
各ステップの特定の構成は、ステップの種類によって異なる。  
たとえば、以下のコードでは、データを準備してモデルをトレーニングするための2つの**PythonScriptStep**ステップが定義される。

```
from azureml.pipeline.steps import PythonScriptStep

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster')

# Step to train a model
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster')
```

ステップを定義したら、それらをパイプラインに割り当てて、実験として実行できる。

```
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# パイプライン構築
train_pipeline = Pipeline(workspace = ws, steps = [step1,step2])

# 実験を行い、パイプラインを実行する
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run = experiment.submit(train_pipeline)
```

## パイプラインのステップ間でデータを渡す

多くの場合、パイプラインの線には、前のステップの出力に依存するステップが少なくとも1つ含まれている。  
例えば、Pythonスクリプトを実行してデータを事前に処理するステップを使用し、これを後続のステップでモデルをトレーニングするために使用する必要がある、という場合が考えられる。

### PipelineDataオブジェクト

**PipelineData**オブジェクトは、特殊な種類の**DataReference**であり、用途は次の通り。

- データストア内の場所を参照する
- パイプラインステップ間のデータ依存関係を作成する

**PipelineData**オブジェクトは、あるステップから後続のステップに渡す必要があるデータの中間ストアとみなせる。

### PipelineDataステップの入力と出力

**PipelineData**オブジェクトを使用してステップ間でデータを渡すには、次の操作を行う必要がある。

1. データストア内の場所を参照する、名前付き**PipelineData**オブジェクトを定義する
2. スクリプトを実行するステップで、**PipelineData**オブジェクトをスクリプト引数として渡す
    - また、データの読み取りまたは書き込みを行うためのコードをスクリプトに含める
3. 必要に応じて、ステップの"入力"または"出力"として**PipelineData**オブジェクトを指定する

たとえば、次のコードではステップ間で渡される必要がある前処理されたデータを表す**PipelineData**オブジェクトが定義される。

```
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

# Get a dataset for the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# Define a PipelineData object to pass data between steps
data_store = ws.get_default_datastore()
prepped_data = PipelineData('prepped',  datastore=data_store)

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # Script arguments include PipelineData
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),
                                      '--out_folder', prepped_data],
                         # Specify PipelineData as output
                         outputs=[prepped_data])

# Step to run an estimator
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # Pass as script argument
                         arguments=['--in_folder', prepped_data],
                         # Specify PipelineData as input
                         inputs=[prepped_data])
```

スクリプト自体の中では、スクリプト引数から**PipelineData**オブジェクトへの参照を取得し、ローカルフォルダのように使用することができる。

```
# code in data_prep.py
from azureml.core import Run
import argparse
import os

# Get the experiment run context
run = Run.get_context()

# Get arguments
parser = argparse.ArgumentParser()
parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id')
parser.add_argument('--out_folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

# Get input dataset as dataframe
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

# code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']]

# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)
```

## パイプラインのステップの再利用

実行時間の長いステップが複数あるパイプラインは、完了までにかなりの時間をかかることがある。  
AzureMLには、この時間を短縮するためのキャッシュと再利用の機能が含まれている。

### ステップの出力の再利用を管理する

規定では、前のパイプライン実行のステップ出力は、そのステップのスクリプト、ソースディレクトリ、  
その他のパラメータが変更されていない場合に、ステップを再実行せずに再利用される。  
ステップを再利用すると、パイプラインの実行にかかる時間を短縮できるが、  
ダウンストリームのデータソースへの変更が反映されていない場合は、古い結果になる恐れがある。

個々のステップの再利用を制御するには、次のようにステップの構成で**allow_reuse**パラメータを設定する。

```
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         runconfig = run_config,
                         inputs=[raw_ds.as_named_input('raw_data')],
                         outputs=[prepped_data],
                         arguments = ['--folder', prepped_data]),
                         # Disable step reuse
                         allow_reuse = False)
```

### すべてのステップを強制的に実行する

複数のステップがある場合、パイプライン実験を送信するときに**regenerate_outputs**パラメータを設定することによって、  
個々の再利用の構成に関係なく、すべてのステップを強制的に実行できる。

```
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)
```

## パイプラインを発行する

パイプラインを作成したらそれを発行し、必要に応じてパイプラインを実行できるようにRESTエンドポイントを作成できる。

### パイプラインを発行する

パイプラインを発行するには、**publish**メソッドを呼び出すことができる。

```
published_pipeline = pipeline.publish(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')
```

または、パイプラインが正常に実行されたときに**publish**メソッドを呼び出すこともできる。

```
# パイプラインの最新の実行結果を取得
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]

# Runからのパイプラインの公開
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')
```

パイプラインが発行されると、AzureMLスタジオで確認できる。  
以下のようにエンドポイントのURIを決定することもできる。

```
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)
```

### 発行されたパイプラインを使用する

発行されたエンドポイントを開始するには、RESTエンドポイントに対してHTTP要求を行い、  
パイプラインを実行するアクセス許可を持つサービスプリンシパルのトークンを指定したAuthorizationヘッダーと、  
実験名を指定したJSONペイロードを渡す。  
パイプラインは非同期に実行されるため、正常なREST呼び出しからの応答には実行IDが含まれる。これを使用してAzureML下地おでの実行を追跡できる。

例えば、次のPythonコードでは、パイプラインを実行するためのREST要求が行われ、返された実行IDが表示される。

```
import requests

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})
run_id = response.json()["Id"]
print(run_id)
```

## パイプラインパラメータを使用する

パラメータを定義することで、パイプラインの柔軟性を高めることができる。

### パイプラインのパラメータを定義する

パイプラインのパラメータを定義するには、パラメータごとに**PipelineParameter**オブジェクトを作成し、  
各パラメータを少なくとも1つのステップで指定する。

たとえば、次のコードを使用して、推定機によって使用されるスクリプトに正規化率を表すパラメータを含めることができる。

```
from azureml.pipeline.core.graph import PipelineParameter

reg_param = PipelineParameter(name='reg_rate', default_value=0.01)

...

step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # Pass parameter as script argument
                         arguments=['--in_folder', prepped_data,
                                    '--reg', reg_param],
                         inputs=[prepped_data])
```

> 注:パイプラインを発行する前に、そのパラメータを定義する必要がある

### パイプラインを使用してパイプラインを実行する

パラメータ化されたパイプラインを発行した後、RESTインターフェイスのJSONペイロードに入れてパラメータ値を渡すことができる。

```
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline",
                               "ParameterAssignments": {"reg_rate": 0.1}})
```

## パイプラインのスケジュール設定

パイプラインを発行した後は、RESTエンドポイントを使用して必要に応じてこれを開始できる。  
また、定期的なスケジュールに基づいて、またはデータ更新に応じてパイプラインを自動的に実行することもできる。

### 定期的な間隔でパイプラインのスケジュールを設定する

パイプラインが定期的に実行されるようにスケジュールを設定するには、実行頻度を決定する**ScheduleRecurrence**を定義し、  
それを使用して**Schedule**を作成する必要がある。

例えば、次のコードを使用すると、発行されたパイプラインが毎日実行されるようスケジュールが設定される。

```
from azureml.pipeline.core import ScheduleRecurrence, Schedule

daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws, name='Daily Training',
                                        description='trains model every day',
                                        pipeline_id=published_pipeline.id,
                                        experiment_name='Training_Pipeline',
                                        recurrence=daily)
```

### データ変更時にパイプライン実行をトリガーする

データが変更されるたびにパイプラインが実行されるようにスケジュールを設定するには、  
次のようにデータストアの指定されたパスを監視する**Schedule**を作成する必要がある。

```
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='trains model on data change',
                                    pipeline_id=published_pipeline_id,
                                    experiment_name='Training_Pipeline',
                                    datastore=training_datastore,
                                    path_on_datastore='data/training')
```

## 演習 パイプラインを作成する

このノートブックでは、機械学習ソリューションの構築に必要な一連の個別のステップをまとめ、データの前処理、  
モデルのトレーニングと登録を行うシンプルなパイプラインを作成する。

### ワークスペースの接続

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.28.0 to work with 20210613


### データの準備

In [2]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

Dataset already registered.


### パイプラインステップのスクリプト作成

パイプラインは1つまたは複数のステップで構成されており、Pythonスクリプトや、データを有る場所から別の場所にコピーするデータ転送ステップなどの特殊なステップがある。  
各ステップは、独自のコンピュートコンテキストで実行できる。  
この演習では、2つのPythonスクリプトステップを含むシンプルなパイプラインを構築する。  
1つはトレーニングデータを前処理するステップで、もう1つは前処理したデータを使ってモデルをトレーニングして登録するステップ。

まず、パイプラインのステップで使用するスクリプトファイルのためのフォルダを作成する。

In [3]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes_pipeline


最初のスクリプトを作成する。このスクリプトは、糖尿病データセットからデータを読み込み、簡単な前処理を適用して、  
データが欠落している行を削除し、数値特徴が同じようなスケールになるように正規化する。

このスクリプトには、**--prepped-data**という引数があり、これは結果のデータを保存するフォルダを参照する。

In [4]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Writing diabetes_pipeline/prep_diabetes.py


次に、2つ目のスクリプトを作成する。  
スクリプトには、**--training-folder**という引数があり、前のステップで準備したデータが保存されているフォルダを参照する。

In [5]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-folder", type=str, dest='training_folder', help='training data folder')
args = parser.parse_args()
training_folder = args.training_folder

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_folder,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train adecision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': np.float(auc), 'Accuracy': np.float(acc)})


run.complete()

Writing diabetes_pipeline/train_diabetes.py


### パイプラインステップのためのコンピューティング環境の準備

必要に応じて各ステップに異なるコンピューティングコンテキストを指定することができる。  
まず、前回作成したコンピューティングターゲットを取得する。

In [6]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "msl-20210613b"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


このコンピューティングには、必要なパッケージの依存関係がインストールされたPython環境が必要となるため、  
実行設定を作成する必要がある。

In [7]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-pipeline-env")

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib','pandas','pip'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]','pyarrow'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment 
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


### パイプラインの作成と実行

まずパイプラインのステップを定義し、それらの間で渡す必要のあるデータ参照を定義する必要がある。  
今回の例では、最初のステップが準備したデータをフォルダに書き込み、2番目のステップがそれを読み込めるようにする。  
各ステップはリモートコンピュートで実行されるため(実際にはそれぞれ別のコンピュートで実行される可能性もある)、  
フォルダのパスはワークスペース内のデータストアの場所へのデータ参照として渡される必要がある。  
PipelineDataオブジェクトは、パイプラインステップ間で渡すことができる中間ストレージの場所に使用される特別な種類のデータ参照なので、  
これを作成して最初のステップの出力と2番目のステップの入力として使用する。  
また、データ参照によって参照されるデータストアの場所にコードがアクセスできるように、スクリプト引数として渡す必要があることに注意する。

In [8]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

In [9]:
# Create a PipelineData (temporary Data Reference) for the model folder
prepped_data_folder = PipelineData("prepped_data_folder", datastore=ws.get_default_datastore())

In [10]:
# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data_folder],
                                outputs=[prepped_data_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

In [12]:
# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-folder', prepped_data_folder],
                                inputs=[prepped_data_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

これで、定義したステップからパイプラインを構築し、実験として実行する準備が整った。

In [13]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

Pipeline is built.


In [14]:
# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Created step Prepare Data [d29111bb][33517913-ded8-4a97-a34d-83f76102c1bd], (This step will run and generate new outputs)
Created step Train and Register Model [b742241d][c2f0785e-c28f-4464-9c23-381bebb1c2f6], (This step will run and generate new outputs)
Submitted PipelineRun 0268e2e0-be38-4866-82da-aa4a93d1a249
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/0268e2e0-be38-4866-82da-aa4a93d1a249?wsid=/subscriptions/153404fd-72ab-4092-b50e-de490c5509fc/resourcegroups/20210613/workspaces/20210613&tid=5456e8d8-0223-4619-ba5b-e313627da53d
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: 0268e2e0-be38-4866-82da-aa4a93d1a249
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/0268e2e0-be38-4866-82da-aa4a93d1a249?wsid=/subscriptions/153404fd-72ab-4092-b50e-de490c5509fc/resourcegroups/20210613/workspaces/20210613&tid=5456e8d8-0223-4619-ba5b-e313627da53d
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: cc53dec4-6cb1-42c7-bd35-8f3213d63bc2
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/cc53dec4-6cb1-42c7-bd35-8f3213d63bc2?wsid=/subscriptions/153404fd-72ab-4092-b50e-de490c5509fc/resourcegroups/20210613/workspaces/20210613&tid=5456e8d8-0223-4619-ba5b-e313627da53d
StepRun( Prepare Data ) Status: NotStarted
StepRun( Prepare Data ) Status: Running

Streaming azureml-logs/20_image_build_log.txt
2021/06/14 17:43:37 Downloading source code...
2021/06/14 17:43:38 Finished downloading source code
2021/06/14 17:43:38 Creating Docker network: acb_default_network, driver: 'bridge'
2021/06/14 17:43:39 Successfully set u

'Finished'

パイプラインが終了すると、その子実行が記録したメトリックを調べることができる。

In [15]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

Train and Register Model :
	 Accuracy : 0.9004444444444445
	 AUC : 0.8854221505732166
	 ROC : aml://artifactId/ExperimentRun/dcid.e774011e-35ac-41be-a84c-ce20bdc56a52/ROC_1623693541.png
Prepare Data :
	 raw_rows : 15000
	 processed_rows : 15000


パイプラインが成功すると、新しいモデルが、パイプラインでトレーニングされたことを示すTraining contextタグとともに登録される。  
以下のコードを実行して確認してみる。

In [16]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 9
	 Training context : Pipeline
	 AUC : 0.8854221505732166
	 Accuracy : 0.9004444444444445


diabetes_model version: 8
	 Training context : Compute cluster
	 AUC : 0.8840918562273435
	 Accuracy : 0.8991111111111111


diabetes_model version: 7
	 Training context : File dataset
	 AUC : 0.8568743524381947
	 Accuracy : 0.7891111111111111


diabetes_model version: 6
	 Training context : Tabular dataset
	 AUC : 0.8568509052814499
	 Accuracy : 0.7891111111111111


diabetes_model version: 5
	 Training context : Tabular dataset
	 AUC : 0.8568509052814499
	 Accuracy : 0.7891111111111111


diabetes_model version: 4
	 Training context : Tabular dataset
	 AUC : 0.8568509052814499
	 Accuracy : 0.7891111111111111


diabetes_model version: 3
	 Training context : Tabular dataset
	 AUC : 0.8568509052814499
	 Accuracy : 0.7891111111111111


diabetes_model version: 2
	 Training context : Parameterized script
	 AUC : 0.8483198169063138
	 Accuracy : 0.774


diabetes_model version: 1


### パイプラインの登録

パイプラインを作成してテストした後は、RESTサービスとして公開することができる。

In [17]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

Name,Id,Status,Endpoint
diabetes-training-pipeline,4c67cd90-bc1f-4e96-b252-04f3c5b3d88d,Active,REST Endpoint


公開されたパイプラインにはエンドポイントがあり、オブジェクトのプロパティとして確認することができる。

In [18]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://westus2.api.azureml.ms/pipelines/v1.0/subscriptions/153404fd-72ab-4092-b50e-de490c5509fc/resourceGroups/20210613/providers/Microsoft.MachineLearningServices/workspaces/20210613/PipelineRuns/PipelineSubmit/4c67cd90-bc1f-4e96-b252-04f3c5b3d88d


### パイプラインのエンドポイントを呼び出す

エンドポイントを利用するには、クライアントアプリケーションがHTTPでRESTコールを行う必要がある。  
このリクエストは認証されなければならないので、authorizationヘッダーが必要。  
実際のアプリケーションでは、認証されるためのサービス・プリンシパルが必要だが、これをテストするために、  
Azureワークスペースへの源氏亜の接続からの認証ヘッダーを使用する。

なお、以下のコードで取得できる。

In [19]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

Authentication header ready.


パイプラインは非同期的に実行されるので、識別子が返ってくる。  
この識別子を使って、パイプラインの実験が実行されているかどうかを追跡することができる。

In [21]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

'374b26ab-73e4-4b8c-a9b2-ad5c103ea138'

RunIDをもっているため、それを用いてRunが完了するのを待つことができる。

> 注:各ステップの出力は再利用できるように設定されているため、パイプラインはすぐ完了するはずで、  
これはこのコースの利便性と時間短縮のために行ったもの。  
実際には、データが変更された場合に備えて、最初のステップを毎回実行し、ステップ1からの出力が変更された場合にのみ、  
後続のステップをトリガーするようにすると良い。

In [22]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 374b26ab-73e4-4b8c-a9b2-ad5c103ea138
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/374b26ab-73e4-4b8c-a9b2-ad5c103ea138?wsid=/subscriptions/153404fd-72ab-4092-b50e-de490c5509fc/resourcegroups/20210613/workspaces/20210613&tid=5456e8d8-0223-4619-ba5b-e313627da53d

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '374b26ab-73e4-4b8c-a9b2-ad5c103ea138', 'status': 'Completed', 'startTimeUtc': '2021-06-14T18:02:40.22368Z', 'endTimeUtc': '2021-06-14T18:02:43.098834Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.pipelineid': '4c67cd90-bc1f-4e96-b252-04f3c5b3d88d'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://202106138491592323.blob.core.windows.net/azureml/ExperimentRun/dcid.374b26ab-73e4-4b8c-a9b2-ad5c103ea138/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=QS8g7vuv%2FDuR6x9OMkC7aME

'Finished'

### パイプラインをスケジューリングする

糖尿病患者のクリニックが毎週新しいデータを収集し、データセットに追加しているとする。  
パイプラインを毎週実行して、新しいデータでモデルを再学習することができる。

In [23]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# 毎週月曜0時(UTC時刻)に、パイプラインを実行する
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Pipeline scheduled.


ワークスペースに定義されているスケジュールを取得するには、以下のようにする。

In [24]:
schedules = Schedule.list(ws)
schedules

[Pipeline(Name: weekly-diabetes-training,
 Id: bc8e7161-8c3a-4ae8-8222-a8cccac95d14,
 Status: Active,
 Pipeline Id: 4c67cd90-bc1f-4e96-b252-04f3c5b3d88d,
 Pipeline Endpoint Id: None,
 Recurrence Details: Runs at 0:00 on Monday every Week)]

以下のようにして、最終実行履歴を確認することができる。

In [25]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = [0]

latest_run.get_details()

{'runId': 'd2a5da1b-7f9d-4adf-acac-2403dbae0272',
 'status': 'Completed',
 'startTimeUtc': '2021-06-14T18:06:28.973779Z',
 'endTimeUtc': '2021-06-14T18:06:31.859857Z',
 'properties': {'azureml.git.repository_uri': 'https://github.com/iguru0331/mslearn-dp100.git',
  'mlflow.source.git.repoURL': 'https://github.com/iguru0331/mslearn-dp100.git',
  'azureml.git.branch': 'main',
  'mlflow.source.git.branch': 'main',
  'azureml.git.commit': 'ed510154034e8c2bf830bb52bd28e078b6c98d13',
  'mlflow.source.git.commit': 'ed510154034e8c2bf830bb52bd28e078b6c98d13',
  'azureml.git.dirty': 'True',
  'azureml.runsource': 'azureml.PipelineRun',
  'runSource': 'Unavailable',
  'runType': 'Schedule',
  'azureml.parameters': '{}',
  'azureml.pipelineid': '4c67cd90-bc1f-4e96-b252-04f3c5b3d88d'},
 'inputDatasets': [],
 'outputDatasets': [],
 'logFiles': {'logs/azureml/executionlogs.txt': 'https://202106138491592323.blob.core.windows.net/azureml/ExperimentRun/dcid.d2a5da1b-7f9d-4adf-acac-2403dbae0272/logs/azur

## 知識チェック

1. 2 つのステップを含むパイプラインを作成しているとします。 ステップ 1 ではいくつかのデータが前処理され、  
ステップ 2 では前処理されたデータを使用してモデルがトレーニングされます。  
ステップ 1 からステップ 2 にデータを渡し、これらのステップの間に依存関係を作成するために使用するオブジェクトの種類を指定してください。

    - データストア
    - PipelineData
    - データ参照


2. 毎週実行するパイプラインを発行しました。 Schedule.create メソッドを使用してスケジュールを作成することを計画しているとします。  
パイプライン実行の頻度を構成するために最初に作成する必要のあるオブジェクトの種類を指定してください。

    - データストア
    - PipelineParameter
    - ScheduleRecurrence

↓解答

1. PipelineData
    - パイプライン内のステップ間でデータを渡すには、PipelineData オブジェクトを使用します。
2. ScheduleRecurrence
    - 定期的に実行されるスケジュールを作成するには、ScheduleRecurrence オブジェクトが必要です。