Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# 予測 パイプライン - カスタム スクリプト
---

このノートブックでは、前回のステップでトレーニングしたモデルを使用して販売のバッチ予測を行うパイプラインを作成します。設定する予測パイプラインは、前回の手順で作成したトレーニング パイプラインに似ていますので、ドキュメントを簡易的に記載します。手順と機能の詳細については、以前のノートブックを参照してください。

### 前提条件
この時点で、次の内容が完了している必要があります:

1. [00_Setup_AML_Workspace notebook](../../00_Setup_AML_Workspace.ipynb) を使用して AML ワークスペースが作成作成済みであること
2. [01_Data_Preparation.ipynb](../../01_Data_Preparation.ipynb) を実行してデータセットが作成済みであること
3. [02_CustomScript_Training_Pipeline.ipynb](02_CustomScript_Training_Pipeline.ipynb) を実行してモデルがトレーニング済みであること

#### Azure ML SDK の最新バージョンを使用していることを確認し、パイプライン ステップ パッケージをインストールしてください。

In [None]:
#!pip install --upgrade azureml-sdk

In [None]:
# !pip install azureml-pipeline-steps

## 1.0 ワークスペースとデータストアへの接続

In [None]:
from azureml.core import Workspace
from azureml.core import Datastore

ws = Workspace.from_config()

# データストアのセットアップ
dstore = ws.get_default_datastore()

print('Workspace Name: ' + ws.name, 
      'Azure Region: ' + ws.location, 
      'Subscription Id: ' + ws.subscription_id, 
      'Resource Group: ' + ws.resource_group, sep='\n')

## 2.0 実験の作成

In [None]:
from azureml.core import Experiment

experiment = Experiment(ws, 'forecasting_pipeline')

## 3.0 データセットの取得

[データ準備ノートブック](../01_Data_Preparation.ipynb)では、予測目的でオレンジジュースのサブセットを登録しました。ここではデータストアから、そのデータセットへの参照を取得します。[モデリング ノートブック](02_CustomScript_Training_Pipeline.ipynb)でトレーニングされたモデルを使用して、各推論ファイルのすべての行に対する予測を生成します。

11,973行の時系列データのファイルのサブセット、または完全なデータセットでパイプラインを実行することを選択できます。ファイルのサブセットのみを使用するように選択した場合は、トレーニング データセット名を `oj_data_small_inference` と指定します。それ以外の場合は、`oj_data_inference` を指定します。小さいデータセットから始めることをお勧めします。

In [None]:
dataset_name = 'oj_data_small_inference'

In [None]:
from azureml.core.dataset import Dataset

dataset = Dataset.get_by_name(ws, name=dataset_name)
dataset_input = dataset.as_named_input(dataset_name)

## 4.0 予測パイプライン用の ParallelRunStep の作成
トレーニング パイプラインで行ったように、ParallelRunStep を作成して予測プロセスを並列化します。このコードは、前回の手順と基本的に同じですが、train.py ではなく [**forecast.py**](scripts/forecast.py) を並列化します。引き続き時系列スキーマ (タイムスタンプ列名、時系列 ID 列名など) を予測スクリプトに渡す必要があることに注意してください。

ただしトレーニング スクリプトとは異なり、予測スクリプトにターゲット列の名前は必要ありません。実際の予測シナリオではターゲットの実際の値は存在しないため、予測パイプラインは予測値を返します。ただし、予測パイプラインは、推論データセットに存在する場合であっても、実績値を返すことができます。

### 4.1 ParallelRunStep の環境を設定する

In [None]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies

forecast_env = Environment(name="many_models_environment")
forecast_conda_deps = CondaDependencies.create(pip_packages=['sklearn', 'pandas', 'joblib', 'azureml-defaults', 'azureml-core', 'azureml-dataprep[fuse]'])
forecast_env.python.conda_dependencies = forecast_conda_deps

### 4.2 コンピュート ターゲットの選択

これは[セットアップ ノートブック](../00_Setup_AML_Workspace.ipynb#3.0-Create-compute-cluster)で作成したコンピュート クラスタです。

In [None]:
cpu_cluster_name = "cpucluster"

In [None]:
from azureml.core.compute import AmlCompute

compute = AmlCompute(ws, cpu_cluster_name)

### 4.3 ParallelRunConfig の設定

In [None]:
from azureml.pipeline.steps import ParallelRunConfig 

process_count_per_node = 6
node_count = 1
timeout = 180

parallel_run_config = ParallelRunConfig(
    source_directory='./scripts',
    entry_script='forecast.py',
    mini_batch_size='1',
    run_invocation_timeout=timeout, 
    error_threshold=10,
    output_action='append_row', 
    environment=forecast_env, 
    process_count_per_node=process_count_per_node, 
    compute_target=compute, 
    node_count=node_count
)

### 4.4 ParallelRunStep の構成

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import ParallelRunStep 

output_dir = PipelineData(name='forecasting_output', datastore=dstore)

parallel_run_step = ParallelRunStep(
    name="many-models-forecasting",
    parallel_run_config=parallel_run_config,
    inputs=[dataset_input],
    output=output_dir,
    allow_reuse=False,
    arguments=['--timestamp_column', 'WeekStarting',
               '--timeseries_id_columns', 'Store', 'Brand',
               '--model_type', 'lr']
)

## 5.0 予測をコピーするステップの作成

予測パイプラインには、*parallel_run_step.txt* から別のコンテナー内の CSV ファイルに予測をコピーする 2 番目の手順が含まれています。この手順は簡単ですが、パイプラインにステップを追加して、予測を別のデータストアにアップロードしたり、出力に追加の変換を行う方法を示します。

### 5.1 データ参照の作成
まず、パイプラインの出力を保持し、それに対する参照を取得する **predictions** という名前のデータストアを作成します。

In [None]:
from azureml.data.data_reference import DataReference

output_dstore = Datastore.register_azure_blob_container(
    workspace=ws, 
    datastore_name="predictions",
    container_name="predictions",
    account_name=dstore.account_name,
    account_key=dstore.account_key,
    create_if_not_exists=True
)

output_dref = DataReference(output_dstore)

### 5.2 PythonScriptStep の作成
次に、[PythonScriptStep](https://docs.microsoft.com/ja-jp/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py) を定義し、新しく作成したデータストアと *parallel_run_step.txt* の場所を指定します。コピー スクリプトは時系列スキーマも使用します。その理由は、コピー スクリプトが予測データのヘッダー行を作成するため、列名を知る必要があるためです。ターゲット列は、推論に使用されたデータに存在していたため、ここに渡されます。

In [None]:
from azureml.pipeline.steps import PythonScriptStep

upload_predictions_step = PythonScriptStep(
    name="copy_predictions",
    script_name="copy_predictions.py",
    compute_target=compute,
    source_directory='./scripts',
    inputs=[output_dref, output_dir],
    allow_reuse=False,
    arguments=['--parallel_run_step_output', output_dir,
               '--output_dir', output_dref,
               '--target_column', 'Quantity',
               '--timestamp_column', 'WeekStarting',
               '--timeseries_id_columns', 'Store', 'Brand']
)

## 6.0 パイプラインの実行

In [None]:
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallel_run_step, upload_predictions_step])
run = experiment.submit(pipeline)

In [None]:
# 前の予測実行への参照を取得するには、次のコードをコメント解除します。
#from azureml.pipeline.core import PipelineRun
#run = PipelineRun(experiment, '<pipeline run id>')

In [None]:
# 実行が完了するまで待機
run.wait_for_completion(show_output=False, raise_on_error=True)

## 7.0 予測パイプラインの結果を表示する
予測を確認するために、*parallel_run_step.txt* をダウンロードし、結果をデータフレームに読み込み、予測を視覚化します。上記で作成した予測コンテナから結果をダウンロードすることもできます。

### 7.1 ローカルに parallel_run_step.txt をダウンロードする
Azure Machine Learning クラスターに送信された実行が完了するまで待つ必要があります。実行ステータスは、https://ml.azure.com でモニターできます。

In [None]:
import os
from pathlib import Path

def download_predictions(run, target_dir=None, step_name='many-models-forecasting', output_name='forecasting_output'):
    stitch_run = run.find_step_run(step_name)[0]
    port_data = stitch_run.get_output_data(output_name)
    port_data.download(target_dir, show_progress=True, overwrite=True)
    return os.path.join(target_dir, 'azureml', stitch_run.id, output_name)

file_path = download_predictions(run, 'output')
file_path

### 7.2 ファイルを dataframe に変換する

In [None]:
import pandas as pd

df = pd.read_csv(file_path + '/parallel_run_step.txt', sep=" ", header=None)
df.columns = ['WeekStarting', 'Predictions', 'Quantity', 'Store', 'Brand']
df['WeekStarting'] = pd.to_datetime(df['WeekStarting'])
df.head()

### 7.3 予測を視覚化する
まず、ブランド別予測数量の分布を見てみましょう：

In [None]:
import seaborn as sns
fig = sns.violinplot(x=df['Brand'], y=df['Predictions'], data=df)
fig.set_title('Predictions by Brand')

次に、これらの予測を時間の経過とともに見てみましょう：

In [None]:
import matplotlib.pyplot as plt

week = df.groupby(['WeekStarting', 'Brand'])
week = week['Predictions'].sum()
week = pd.DataFrame(week.unstack(level=1))

week.plot()
plt.title('Total Predictions by Brand')
plt.xticks(rotation=40)
plt.legend(loc='upper right')
plt.xlabel('Week')
plt.ylabel('Total Predictions')
plt.show()

そこから、結果をトリミングして個々のブランドを見ることができます：

In [None]:
store = 1001
df_1001 = df[df['Store'] == store]

brands = df_1001.groupby(['WeekStarting','Brand'])
brands= brands['Predictions'].sum()
brands= pd.DataFrame(brands.unstack(level=1))

brands.plot()
plt.legend(loc='upper right', labels=brands.columns.values)
plt.xticks(rotation=40)
plt.title('Predictions for Store 1001')
plt.xlabel('Week')
plt.ylabel('Predicted Quantity')
plt.show()

## 8.0 パイプラインの発行とスケジュール (オプション)


### 8.1 パイプラインを発行する
満足できるパイプラインを作成したら、パイプラインを発行して、後からプログラムで呼び出すことができます。パイプラインの発行と呼び出しの詳細については、この[チュートリアル](https://docs.microsoft.com/ja-jp/azure/machine-learning/how-to-create-machine-learning-pipelines#publish-a-pipeline)を参照してください。

In [None]:
# published_pipeline = pipeline.publish(name = 'forecast_many_models',
#                                      description = 'forecast many models',
#                                      version = '1',
#                                      continue_on_step_failure = False)

### 8.2 パイプラインのスケジュール実行
また、時間ベースまたは変更ベースのスケジュールで実行するように[パイプラインをスケジュール](https://docs.microsoft.com/ja-jp/azure/machine-learning/how-to-trigger-published-pipeline)することもできます。これは、毎月、またはデータドリフトなどの別のトリガーに基づいて、モデルを自動的に再トレーニングするために使用できます。

In [None]:
# from azureml.pipeline.core import Schedule, ScheduleRecurrence
    
# training_pipeline_id = published_pipeline.id

# recurrence = ScheduleRecurrence(frequency="Week", interval=1, start_time="2020-01-01T09:00:00")
# recurring_schedule = Schedule.create(ws, name="forecasting_pipeline_recurring_schedule", 
#                             description="Schedule Forecasting Pipeline to run on the first day of every week",
#                             pipeline_id=training_pipeline_id, 
#                             experiment_name=experiment.name, 
#                             recurrence=recurrence)