# 分散モデル推論を用いた OpenVINO の使用
このノートブックは、[torchvision.models](https://pytorch.org/docs/stable/torchvision/models.html#torchvision.models.resnet50)からResNet-50モデルを使用して、OpenVINOで分散モデル推論を行う方法をデモンストレーションします。入力データとして画像ファイルを使用します。

このガイドは以下のセクションで構成されています：

* **推論のための訓練済みモデルの準備。**
* **Spark DataFrames に databricks-dataset からデータをロード。**
* **Pandas UDF を用いたモデル推論の実行。**

**注意:**
* CPU対応のApache Sparkクラスタ上でノートブックを実行するには、変数`cuda = False`に変更してください。
* GPU対応のApache Sparkクラスタ上でノートブックを実行するには、変数`cuda = True`に変更してください。
* DBR 13.3 LTS ML, Standard_D16s_v5（ドライバー１台、ワーカー４台）

In [0]:
%pip install openvino

In [0]:
dbutils.library.restartPython()

### 必要なライブラリをインポート

In [0]:
import os
import shutil
import uuid
from typing import Iterator, Tuple

import pandas as pd

import torch
from torch.utils.data import Dataset
from torchvision import datasets, models, transforms
from torchvision.datasets.folder import default_loader  # private API

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import ArrayType, FloatType, IntegerType

import numpy as np

import openvino as ov

### GPUを使用する場合はTrueに設定

In [0]:
cuda = False

use_cuda = cuda and torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

### 学習済みモデルを推論用に準備する

ドライバーノードにResNet50をロードし、OpenVINOのフォーマットに変換してから、ワークスペースに保存する。

In [0]:
# Save openvino.runtime.Model object on disk
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model.eval()

# Create OpenVINO Core object instance
core = ov.Core()

# Convert model to openvino.runtime.Model object
ov_model = ov.convert_model(model)

OV_MODEL_PATH = '/Workspace/Users/hiroshi.ouchiyama@databricks.com/ov_model/ov_resnet50_dynamic.xml'
ov.save_model(ov_model, OV_MODEL_PATH)

モデルインスタンス作成用の関数を定義

In [0]:
def get_model_for_eval():
  # Create OpenVINO Core object instance
  core = ov.Core()
  ov_model = core.read_model(model=OV_MODEL_PATH)

  # Load OpenVINO model on device
  compiled_model = core.compile_model(ov_model, 'CPU')

  return compiled_model

### Spark DataFramesへのdatabricks-datasetからのデータのロード
例として、TensorFlowチームによる[flowers dataset](https://www.tensorflow.org/datasets/catalog/tf_flowers)を使用します。これには、クラスごとに1つずつ、計5つのサブディレクトリの下に保存された花の写真が含まれています。これは、簡単にアクセスできるようにDatabricks Datasetsの`dbfs:/databricks-datasets/flower_photos`の下にホストされています。

In [0]:
dataset_dir = "/dbfs/databricks-datasets/flower_photos/"
output_file_path = "/tmp/predictions"
files = [os.path.join(dp, f) for dp, dn, filenames in os.walk(dataset_dir) for f in filenames if os.path.splitext(f)[1] == '.jpg']
print(f'画像ファイルの総数は　{len(files)}　枚です。')

### シングルノード、または、ドライバーノード上でのみ推論させる

In [0]:
transform = transforms.Compose([
  transforms.Resize(224),
  transforms.CenterCrop(224),
  transforms.ToTensor(),
  transforms.Normalize(mean=[0.485, 0.456, 0.406],
                      std=[0.229, 0.224, 0.225])
])

model = get_model_for_eval()

predictions = []
for image_path in files:
  image = default_loader(image_path)
  image = transform(image)
  batch = image.unsqueeze(0)

  prediction = model(batch)[0]
  class_id = prediction.argmax(axis=1)
  score = prediction[np.arange(prediction.shape[0]), class_id]
  predictions.append((class_id, score))

print(list(zip(files, predictions)))

### ここからようやく本題
### 次にワーカーのノードを利用して分散モデル推論を実施する
### Databricksでは分散モデル推論にはPandas UDF（別名：Vectorized UDF）を使うことが推奨されています。

分散処理のために、画像パスのDataFrameを作成する。

再パーティショニングの際のパーティション数はワーカーノード数とおなじ、または最小限の倍数にすべし。このサンプルでは４つのワーカーノード使用しているので４に設定。

In [0]:
files_df = spark.createDataFrame(
  map(lambda path: (path,), files), ["path"]
).repartition(4)  # number of partitions should be a small multiple of total number of nodes　

display(files_df.limit(10))

各画像パスのDataFrameが４つのパーティションにほぼ等分されているのが確認できます。

In [0]:
from pyspark.sql.functions import spark_partition_id

display(files_df.withColumn('partition', spark_partition_id()).groupBy('partition').count().orderBy('partition'))

### Pandas UDFを作成します。

Pandas UDFはメモリ内のデータ形式にApache Arrowを使っているので、明示的にArrowの使用をOnにします。

In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

かつ、Pandas UDFに一度に渡すレコード数（バッチ）の最大値を`spark.sql.execution.arrow.maxRecordsPerBatch`に指定します。今回は５１２を設定しています。

In [0]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "５１２")

カスタム PyTorch データセットクラスを作成します。

以下のドキュメントの通りですが、パフォーマンスチューニングのヒントとして、PyTorchであれば、データロード用に`torch.utils.data.DataLoader`の使用が推奨されているので、そのお作法に則ります。
- https://docs.databricks.com/en/machine-learning/model-inference/dl-model-inference.html
- https://docs.databricks.com/en/machine-learning/model-inference/model-inference-performance.html

In [0]:
class ImageDataset(Dataset):
  def __init__(self, paths, transform=None):
    self.paths = paths
    self.transform = transform
  def __len__(self):
    return len(self.paths)
  def __getitem__(self, index):
    image = default_loader(self.paths[index])
    if self.transform is not None:
      image = self.transform(image)
    return image

モデル推論のための関数を定義する。

今回のサンプル画像データは全部で3670枚で、ワーカーノードの数と同じ４つのパーティションに分割しているので、各ワーカーノードが９１０〜９２０枚ほどの画像を処理します。その中から512枚の画像（正確には画像パス）を取り出してきて、Pandas UDFにpadas.Seriesデータとして入力します。Pandas UDF内では、その５１２個の画像パスからバッチサイズごとに画像パスを取り出し、当該画像ファイルをロードしてTensor化して、それをモデルで推論します。推論結果から欲しい情報を取り出した上で、それをPandas.Seriesとしてパックして、返します。

In [0]:
@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(paths: pd.Series) -> pd.Series:
  transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                       std=[0.229, 0.224, 0.225])
  ])

  images = ImageDataset(paths, transform=transform)
  loader = torch.utils.data.DataLoader(images, batch_size=1, num_workers=8)
  model = get_model_for_eval()

  all_predictions = []
  with torch.no_grad():
    for batch in loader:
      predictions = model(batch)[0]
      class_id = predictions.argmax(axis=1)
      score = predictions[np.arange(predictions.shape[0]), class_id]

      for result in np.stack((class_id, score), axis=1):
        all_predictions.append(result)

  return pd.Series(all_predictions)

モデル推論を実行し、結果をDisplayします。

In [0]:
predictions_df = files_df.withColumn('prediction', predict_batch_udf(col('path')))
display(predictions_df)

以上、Databricks上で分散モデル推論を実施する際に、Pandas UDFを使用する方法をご紹介しました。