Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

## リアルタイム コンテンツ ベース の パーソナライゼーション モデルをデプロイする

このノートブックでは、企業が機械学習を使用したレコメンデーション システムを使用し、顧客に対してコンテンツ ベースのパーソナライゼーションを自動化する方法を示します。Azure Databricks は、ユーザーがアイテムに関与する確率を予測するモデルをトレーニングするために使用されます。さらにこの推論を使用して、ユーザーが最も消費する可能性が高いコンテンツに基づいてアイテムをランク付けできます。<br><br>
このノートブックは、[MMLSpark-LightGBM-Criteo notebook](../02_model/mmlspark_lightgbm_criteo.ipynb)でトレーニングされたコンテンツベースのパーソナライゼーションモデルなど、Spark ベースのモデル用のスケーラブルなリアルタイム スコアリング サービスを作成します。

<br><br>
### アーキテクチャ
<img src="https://github.com/c-nova/recommenders/tree/master/notebooks/05_operationalize/lightgbm_criteo_arch_jp.svg" alt="アーキテクチャ">

### 構成要素
このアーキテクチャでは、以下の構成要素を使用します:<br>
- [Azure Blob Storage](https://azure.microsoft.com/ja-jp/services/storage/blobs/) は非構造データの大量の蓄積に最適化されたストレージです。このケースでは、入力データの格納に使用されます。<br>
- [Azure Databricks](https://azure.microsoft.com/ja-jp/services/databricks/) はマネージドな Apache Spark クラスタであり、このケースではモデルのトレーニング及び評価に使用されます。<br>
- [Azure Machine Learning service](https://azure.microsoft.com/ja-jp/services/machine-learning-service/) はこのシナリオでは機械学習モデルの登録に使用されます。<br>
- [Azure Container Registry](https://azure.microsoft.com/ja-jp/services/container-registry/) は本番環境でモデルの提供に使用するために、コンテナとしてスコアリング スクリプトをパッケージするために使用されます。<br>
- [Azure Kubernetes Service](https://azure.microsoft.com/ja-jp/services/kubernetes-service/) はトレーニングされたモデルを Web または App Services として展開するために使用されます。<br>

## 前提条件
このノートブックを実行するには、以下の項目を前提としています:

1. モデルは[mmlspark_lightgbm_criteo](https://aka.ms/recommenders/lgbm-criteo-training)ノートブックに示すように事前にトレーニングされています。
2. このノートブックは、前提 1 でノートブックを実行するために使用されたものと同様に、 Azure Databricks ワークスペースで実行することを想定しています。
3. (MML Spark と reco_utils が両方ともインストールされている)運用化のために準備された Databricks クラスタを使用します。 
- 詳細については、[セットアップ](https://github.com/c-nova/Recommenders/blob/master/SETUP.md) の手順を参照してください。
4. Azure Machine Learning Service ワークスペースは、モデルトのレーニングに使用される Azure Databricks ワークスペースと同じリージョンにセットアップされる必要があります。
- 詳細については、[ワークスペースの作成](https://docs.microsoft.com/ja-jp/azure/machine-learning/service/setup-create-workspace)を参照してください。
5. Azure ML ワークスペース config.json は、Databricks では `dbfs:/aml_config/config.json` にアップロードされます。
- [環境の構成](https://docs.microsoft.com/ja-jp/azure/machine-learning/service/how-to-configure-environment)および[Databricks CLI](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html#access-dbfs-with-the-databricks-cli)を参照してください。
6. Azure Container Instance (ACI) は使用中の Azure サブスクリプションで登録されます
- 詳細については、[サポートされるサービス](https://docs.microsoft.com/ja-jp/azure/azure-resource-manager/resource-manager-supported-services#portal)を参照してください。

## スコア サービスのステップ
この例では、"スコアリング サービス" は、docker コンテナーによって実行されるファンクションです。JSON 書式のペイロードを使用して POST 要求を受け取り、事前に推定されたモデルに基づいてスコアを生成します。この例では、数値とカテゴリの一連のフィーチャーに基づいてユーザー アイテムの相互作用の確率を予測する、事前に推定したモデルを使用します。そのモデルは PySpark を使用してトレーニングされているため、[MML Spark 提供](https://github.com/Azure/mmlspark/blob/master/docs/mmlspark-serving.md) を使用してモデルを実行する単一インスタンス(Dcoker コンテナ内)にSparkセッションを作成します。受信した入力データに対して、相互作用の確率を返します。Azure Machine Learning を使用して、Docker コンテナーを作成して実行します。

スコアリング サービスを作成するには、次の手順を実行します。

1. Azure Machine KLearning ワークスペースのセットアップと認証
2. 以前にトレーニングされたモデルをシリアル化し、Azure モデル 登録に追加する
3. モデルを実行する 'スコアリング サービス' スクリプトを定義する
4. スクリプトに必要なすべての前提条件を定義する
5. モデル、ドライバー スクリプト、および前提条件を使用して Azure コンテナ イメージを作成する
6. スケーラブルなプラットフォームである Azure Kubernetes サービスにコンテナ イメージをデプロイする 
7. サービスのテスト

### ライブラリと変数のセットアップ

以下のいくつかのセルは、環境と変数を初期化します: ここで関連するライブラリをインポートし、変数を設定します。

In [1]:
import os
import json
import shutil

from reco_utils.dataset.criteo import get_spark_schema, load_spark_df
from reco_utils.azureml.aks_utils import qps_to_replicas, replicas_to_qps, nodes_to_replicas

from azureml.core import Workspace
from azureml.core import VERSION as azureml_version

from azureml.core.model import Model
from azureml.core.conda_dependencies import CondaDependencies 
from azureml.core.webservice import Webservice, AksWebservice
from azureml.core.image import ContainerImage
from azureml.core.compute import AksCompute, ComputeTarget

from math import floor

# Check core SDK version number
print("Azure ML SDK version: {}".format(azureml_version))

Azure ML SDK version: 1.0.45


## スコアリング サービス 変数の構成

In [10]:
MODEL_NAME = 'lightgbm_criteo.mml'  # この名前は、推論用ノートブックにパイプライン モデルを保存するために使用される名前と完全に一致する必要があります
MODEL_DESCRIPTION = 'LightGBM Criteo Model'

# AzureML アセットのセットアップ (名前はスペースを使用しない小文字の英数字で、3 ～ 32 文字の間でなければなりません)
# Azure ML Web サービス
SERVICE_NAME = 'lightgbm-criteo'
# Azure ML コンテナ イメージ
CONTAINER_NAME = SERVICE_NAME
CONTAINER_RUN_TIME = 'spark-PY'
# Azure Kubernetes Service (AKS)
AKS_NAME = 'predict-aks'

# 使用するその他のファイルの名前
CONDA_FILE = "deploy_conda.yaml"
DRIVER_FILE = "mmlspark_serving.py"

## AzureML ワークスペースのセットアップ
ワークスペース構成は、ポータルから取得し、Databricks にアップロードできます<br>
[AzureML on Databricks](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-configure-environment#azure-databricks)を参照してください。

In [None]:
ws = Workspace.from_config('/dbfs/aml_config/config.json')

## シリアル化されたモデルを準備する

docker コンテナを作成するには、最初に作成する docker コンテナーがアクセスできるように、前の手順で推定したモデルを準備します。これを行うには、モデルをワークスペースに *登録します* (詳細については、Azure ML [ドキュメント](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-model-management-and-deployment)を参照してください)。

モデルは dbfs のディレクトリとして保存されており、登録する前にプロセスを容易にするためにいくつかの追加手順を実行します。

### 入力スキーマ

Spark サービングには、生の入力データのスキーマが必要です。したがってスキーマを取得し、モデル ディレクトリに追加のファイルとして格納します。


In [11]:
raw_schema = get_spark_schema()
with open(os.path.join('/dbfs', MODEL_NAME, 'schema.json'), 'w') as f:
  f.write(raw_schema.json())

### dbfs からローカルにモデルをコピーする

ローカル ファイル API を使用して DBFS 上のファイルにアクセスできますが、ローカル ファイル API は 2 GB 未満のファイルにのみアクセスできるため、dbfs との間で保存されたモデルを明示的にコピーする方が安全です (詳細は[こちら](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html#access-dbfs-using-local-file-apis)を参照してください。

In [13]:
model_local = os.path.join(os.getcwd(), MODEL_NAME)
dbutils.fs.cp('dbfs:/' + MODEL_NAME, 'file:' + model_local, recurse=True)

### モデルの登録

これで、Azure Machine Learning ワークスペースにモデルを登録する準備ができました。

In [15]:
# まずデータ転送を最小限に抑えるためにモデル ディレクトリを圧縮します
zip_file = shutil.make_archive(base_name=MODEL_NAME, format='zip', root_dir=model_local)

# モデルの登録
model = Model.register(model_path=zip_file,  # ここではローカル ファイルをポイントします
                       model_name=MODEL_NAME,  # これはモデルの登録名です
                       description=MODEL_DESCRIPTION,
                       workspace=ws)

print(model.name, model.description, model.version)

## スコアリング スクリプトの定義

次に、サービスが呼び出されたときに実行されるドライバー スクリプトを作成する必要があります。スコアリングのために定義する必要がある関数は `init()` と `run()` です。`init()` 関数は、サービスの作成時に実行され、サービスが呼び出されるたびに `run()` 関数が実行されます。

この例では、`init()` 関数を使用してすべてのライブラリを読み込み、Spark セッションを初期化し、Spark ストリーミング サービスを開始し、モデル パイプラインを読み込みます。`run()` メソッドを使用して入力を Spark ストリーミング サービスにルーティングし、予測 (この場合はインタラクションの確率) を生成し、出力を返します。

In [17]:
driver_file = '''
import os
import json
from time import sleep
from uuid import uuid4
from zipfile import ZipFile

from azureml.core.model import Model
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import requests


def init():
    """One time initialization of pyspark and model server"""

    spark = SparkSession.builder.appName("Model Server").getOrCreate()
    import mmlspark  # this is needed to load mmlspark libraries

    # extract and load model
    model_path = Model.get_model_path('{model_name}')
    with ZipFile(model_path, 'r') as f:
        f.extractall('model')
    model = PipelineModel.load('model')

    # load data schema saved with model
    with open(os.path.join('model', 'schema.json'), 'r') as f:
        schema = StructType.fromJson(json.load(f))

    input_df = (
        spark.readStream.continuousServer()
        .address("localhost", 8089, "predict")
        .load()
        .parseRequest(schema)
    )

    output_df = (
        model.transform(input_df)
        .makeReply("probability")
    )

    checkpoint = os.path.join('/tmp', 'checkpoints', uuid4().hex)
    server = (
        output_df.writeStream.continuousServer()
        .trigger(continuous="30 seconds")
        .replyTo("predict")
        .queryName("prediction")
        .option("checkpointLocation", checkpoint)
        .start()
    )

    # let the server finish starting
    sleep(1)


def run(input_json):
    try:
        response = requests.post(data=input_json, url='http://localhost:8089/predict')
        result = response.json()['probability']['values'][1]
    except Exception as e:
        result = str(e)
    
    return json.dumps({{"result": result}})
    
'''.format(model_name=MODEL_NAME)

# check syntax
exec(driver_file)

with open(DRIVER_FILE, "w") as f:
    f.write(driver_file)

## 依存関係の定義

次に、ドライバー スクリプトで必要な依存関係を定義します。

In [19]:
# azureml-sdk は登録されたモデルを読み込むために必要です
conda_file = CondaDependencies.create(pip_packages=['azureml-sdk', 'requests']).serialize_to_string()

with open(CONDA_FILE, "w") as f:
    f.write(conda_file)

## イメージを作成する

`ContainerImage` クラスを使用して、最初に定義されたドライバーと依存関係を使用してイメージを構成し、次に後で使用するイメージを作成します。<br>
イメージをビルドすると、Docker を使用してローカルにダウンロードおよびデバッグすることができます。[トラブルシューティング方法](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-troubleshoot-deployment)を参照してください。

In [21]:
image_config = ContainerImage.image_configuration(execution_script=DRIVER_FILE, 
                                                  runtime=CONTAINER_RUN_TIME,
                                                  conda_file=CONDA_FILE,
                                                  tags={"runtime":CONTAINER_RUN_TIME, "model": MODEL_NAME})

image = ContainerImage.create(name=CONTAINER_NAME,
                              models=[model],
                              image_config=image_config,
                              workspace=ws)

image.wait_for_creation(show_output=True)

## サービスの作成

イメージを作成したら、Azure Kubernetes サービス (AKS) を構成し、イメージを AKS Web Service としてデプロイします。

**注** 私たちは `Webservice.deploy_from_model()` ファンクションを使用して、登録されたモデルとimage_configuration から直接サービスを作成することが*可能*です。ここではイメージを明示的に作成し、次の 3 つの理由から `deploy_from_image()` を使用します。

1. 実際に行われているステップの面でより透明性を提供します。
2. これにより多くの柔軟性および制御を提供します。たとえば、作成するサービスとは無関係な名前のイメージを作成できます。これは、イメージを複数のサービスで使用する場合に役立ちます。
3. これにより、潜在的により速い反復およびより多くの移植性を得られます。イメージが作成されると、まったく同じコードで新しいデプロイを作成することができます。

### セットアップと計画

本番サービスを設定する際には、まずサポートする負荷量を見積もる必要があります。それを見積るためには、1 回の呼び出しにかかる時間を見積もる必要があります。この例では、いくつかのローカル テストを行い、1 つのクエリの処理に約 100 ミリ秒掛かると見積もっています。

いくつかの追加の仮定に基づいて、1 秒あたりの目標数のクエリ(qps) をサポートするために必要なレプリカの数を見積もることができます。

**注:** この見積もりは開始点としての概算数として使用する必要があり、我々はより良い見積りにブラッシュアップするために、その後のロードテストでパフォーマンスを検証することができます。詳細については、こちらの[ドキュメント](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where#aks)を参照してください。


この種の計算をサポートするヘルパー関数をいくつか記述し、1 つのクエリを完了する時間の見積もりとして 100 ミリ秒を使用して、1 秒あたり 25、50、100、200、および 350 クエリの読み込みをサポートするために必要なレプリカの数を推定します。

In [11]:
all_target_qps = [25, 50, 100, 200, 350]
query_processing_time = 0.1  ## 処理/秒
replica_estimates = {t: qps_to_replicas(t, query_processing_time) for t in all_target_qps}

顧客ベースの規模やその他の考慮事項(トラフィックを増やす可能性のある今後の予定など)に基づいて、サポートする最大負荷を決定します。この例では、1 秒あたり 100 のクエリをサポートする必要があり、対応するレプリカの数 (上記の見積もりに基づいて 15 個) を使用する必要があることを示します。

レプリカの数がわかったら、Azure Kubernetes サービス内に十分なリソース (コアとメモリ) があり、その数のレプリカをサポートする必要があります。その数を見積もるためには、各レプリカに割り当てられるコアの数を知る必要があります。コアごとに複数のレプリカがあるユース ケースが多いため、この数は小数点数になる可能性があります。詳細は[こちら](https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units)をご覧ください。以下の Web サービスを作成するときは、各レプリカに 0.3 の 'cpu_cores' と 0.5 GB のメモリを割り当てます。15 のレプリカをサポートするには、`15*0.3` コアと `15*0.5` GB のメモリが必要です。


In [12]:
cpu_cores_per_replica = 0.3
print('{} cores required'.format(replica_estimates[100]*cpu_cores_per_replica))
print('{} GB of memory required'.format(replica_estimates[100]*0.5))

4.5 cores required
7.5 GB of memory required


### Azure Kubernetes サービスをプロビジョニングする

必要なコア数とメモリ量の見積もりができたので、AKS クラスターを構成して作成します。デフォルトでは、`AksCompute.provisioning_configuration()` は `vm_size='Standard_D3_v2'` を持つ 3 つのエージェントを持つ構成を作成します。各 Standard_D3_v2 仮想マシンには 4 つのコアと 14 GB のメモリがあるので、デフォルトでは 12 コアと 42 GB のメモリを組み合わせたクラスタが作成され、推定負荷要件を満たすのに十分です。

**注**: この特定のケースでは、負荷要件が 4.5 コアに過ぎない場合でも、AKS クラスターの 12 コアを下回 **らない** 必要があります。12 コアは、Web サービスに必要な AKS のコアの最小数です。[詳細](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where#aks)については、ドキュメントを参照してください。`agent_count` パラメーターと `vm_size` パラメーターを使用して、負荷要件で必要な場合はコア数を 12 を超える値を増やすことができますが、それらを使用して減らすことはしないでください。

In [23]:
# 最初に AKS コンピュートを作成

# 既定の構成を使用する (カスタマイズするパラメーターを提供することもできます)
prov_config = AksCompute.provisioning_configuration()

# クラスタの作成
aks_target = ComputeTarget.create(
  workspace=ws, 
  name=AKS_NAME, 
  provisioning_configuration=prov_config
)

aks_target.wait_for_completion(show_output=True)

print(aks_target.provisioning_state)
print(aks_target.provisioning_errors)

### 検討事項

推定される負荷要件は Azure Machine Learning によって設定された最小値より小さいため、Web サービスで使用するレプリカの数を見積もる別の方法を検討する必要があります。これが AKS クラスターで実行される唯一のサービスである場合、すべてのコンピューティング リソースを活用しないことでリソースを浪費している可能性があります。最初は、予想される負荷を使用して、使用するレプリカの数を見積もります。このアプローチの代わりに、クラスター内のコア数を使用して、サポートできるレプリカの最大数を推定することもできます。

レプリカの最大数を見積もるためには、ベースとなる kubernetes の操作とノードのオペレーティング システムとコア機能の各ノードにオーバーヘッドがあることを考慮する必要があります。この場合は 10% のオーバーヘッドを想定していますが、詳細については[こちら](https://docs.microsoft.com/en-us/azure/aks/concepts-clusters-workloads)を参照してください。

**注** この例ではコアを使用していますが、代わりにメモリ要件を活用することもできます。


In [13]:
max_replicas_12_cores = nodes_to_replicas(
    n_cores_per_node=4, n_nodes=3, cpu_cores_per_replica=cpu_cores_per_replica
)

クラスターがサポートするレプリカの数が判明すると、AKS クラスタがサポートできると考えられる 1 秒あたりのクエリを推定できます。

In [14]:
replicas_to_qps(max_replicas_12_cores, query_processing_time)

140

### Web サービスを作成する

次に、Web サービスを構成して作成します。この構成では、各レプリカが `cpu_cores=cpu_cores_per_replica` (デフォルトは `cpu_cores=0.1`) を設定するとします。この値はこのサービスの経験と事前のテストに基づいて調整しています。

`AksWebservice.deploy_configuration()` に引数が渡されない場合は、`autoscale_enabled=True` と共に `autoscale_min_replicas=1` と `autoscale_max_replicas=10` が設定されます。最大値は、1 秒あたり 100 クエリをサポートするための最小要件を満たしていないため、調整する必要があります。この値は、負荷 (15) に基づいて見積もりを調整するか、AKS クラスター (36) でサポートできる数に基づいて見積もりを調整できます。この例では、AKS クラスターを他のタスクまたはサービスに使用できるように、負荷に基づく値に設定します。

In [24]:
webservice_config = AksWebservice.deploy_configuration(cpu_cores=cpu_cores_per_replica,
                                                       autoscale_enabled=True,
                                                       autoscale_max_replicas=replica_estimates[100])

# 作成したイメージを使用してサービスを展開する
aks_service = Webservice.deploy_from_image(
  workspace=ws, 
  name=SERVICE_NAME,
  deployment_config=webservice_config,
  image=image,
  deployment_target=aks_target
)

aks_service.wait_for_deployment(show_output=True)

## サービスをテストする

次に、`サンプル` データのデータを使用してサービスをテストできます。

このサービスは JSON をペイロードとして想定しているので、サンプル データを取得し、ディクショナリに変換してからサービス エンドポイントに送信します。

In [26]:
# URI を表示
url = aks_service.scoring_uri
print('AKS URI: {}'.format(url))

# aks_service のキーのいずれかを使用した認証のセットアップ
headers = dict(Authorization='Bearer {}'.format(aks_service.get_keys()[0]))

In [27]:
#いくつかのサンプルデータを使用
df = load_spark_df(size='sample', spark=spark, dbutils=dbutils)
data = df.head().asDict()
print(data)

In [28]:
# AKS クラスターに要求を送信
response = requests.post(url=url, json=data, headers=headers)
print(response.json())

### サービスを削除する

全てが完了後、コストを最小限に抑えるためにサービスを削除できます。上記の同じコマンドを使用して、イメージからいつでも再デプロイできます。

In [30]:
# Web サービスを削除するには、次の行のコメントを解除します
# aks_service.delete()

In [31]:
aks_service.state