<a href="https://colab.research.google.com/github/Akihito-IRIE/google-colab-book/blob/main/Armadillo_IoT_G4_model_create.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# はじめに

本ドキュメントではArmadillo-IoT ゲートウェイ G4上で使用できるTFLite形式の推論モデルを、既存のモデルをベースに学習する転移学習を行い作成するサンプルを紹介します。

内容はほぼTensorFlowのObject Detection APIのサンプルそのままですが、最後にSavedModelをTFLite形式に変換する手順を追加しています。

# セットアップ



## Imports

In [None]:
!pip install -U --pre tensorflow=="2.7.0"

In [None]:
import os
import pathlib

# 存在しなければtensoorflowのmodelsリポジトリをクローンします
if "models" in pathlib.Path.cwd().parts:
  while "models" in pathlib.Path.cwd().parts:
    os.chdir('..')
elif not pathlib.Path('models').exists():
  !git clone --depth 1 https://github.com/tensorflow/models

In [None]:
# Object Detection APIをインストールします
%%bash
cd models/research/
protoc object_detection/protos/*.proto --python_out=.
cp object_detection/packages/tf2/setup.py .
python -m pip install . --use-deprecated=legacy-resolver

In [None]:
# 以後の作業に必要なライブラリをimportします
import matplotlib
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage

import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.utils import colab_utils
from object_detection.utils import config_util
from object_detection.builders import model_builder

%matplotlib inline

##ユーティリティ

以後の作業に必要な関数を定義します。

In [None]:
def load_image_into_numpy_array(path):
  """Load an image from file into a numpy array.

  Puts image into numpy array to feed into tensorflow graph.
  Note that by convention we put it into a numpy array with shape
  (height, width, channels), where channels=3 for RGB.

  Args:
    path: a file path.

  Returns:
    uint8 numpy array with shape (img_height, img_width, 3)
  """
  img_data = tf.io.gfile.GFile(path, 'rb').read()
  image = Image.open(BytesIO(img_data))
  (im_width, im_height) = image.size
  return np.array(image.getdata()).reshape(
      (im_height, im_width, 3)).astype(np.uint8)

def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    figsize=(12, 16),
                    image_name=None):
  """Wrapper function to visualize detections.

  Args:
    image_np: uint8 numpy array with shape (img_height, img_width, 3)
    boxes: a numpy array of shape [N, 4]
    classes: a numpy array of shape [N]. Note that class indices are 1-based,
      and match the keys in the label map.
    scores: a numpy array of shape [N] or None.  If scores=None, then
      this function assumes that the boxes to be plotted are groundtruth
      boxes and plot all boxes as black with no classes or scores.
    category_index: a dict containing category dictionaries (each holding
      category index `id` and category name `name`) keyed by category indices.
    figsize: size for the figure.
    image_name: a name for the image file.
  """
  image_np_with_annotations = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      image_np_with_annotations,
      boxes,
      classes,
      scores,
      category_index,
      use_normalized_coordinates=True,
      min_score_thresh=0.6)
  if image_name:
    plt.imsave(image_name, image_np_with_annotations)
  else:
    plt.imshow(image_np_with_annotations)


## 教師データの準備

転移学習をするために必要な、教師データを作成します。

物体検出において、以下の2つを合わせたデータを教師データとしています。
* 認識させたい物が写った画像群
* 画像のどこに物があるかを表すアノテーションデータ

そのため、まずは画像データを収集します。

今回のサンプルアプリケーションでは、TensorFlow公式の[githubリポジトリ内の画像](https://github.com/tensorflow/models/tree/master/research/object_detection/test_images/ducky/train)(ゴム製のアヒルのおもちゃ)を使用します。

なお、本手順で使用する訓練済み物体検出推論モデル元となった[COCOデータセット](https://cocodataset.org/#explore)には、ラバーダックも動物としてのアヒルも含まれていません。

In [None]:
# 画像データをロードし、表示します
train_image_dir = 'models/research/object_detection/test_images/ducky/train/'
train_images_np = []
for i in range(1, 6):
  image_path = os.path.join(train_image_dir, 'robertducky' + str(i) + '.jpg')
  train_images_np.append(load_image_into_numpy_array(image_path))

plt.rcParams['axes.grid'] = False
plt.rcParams['xtick.labelsize'] = False
plt.rcParams['ytick.labelsize'] = False
plt.rcParams['xtick.top'] = False
plt.rcParams['xtick.bottom'] = False
plt.rcParams['ytick.left'] = False
plt.rcParams['ytick.right'] = False
plt.rcParams['figure.figsize'] = [14, 7]

for idx, train_image_np in enumerate(train_images_np):
  plt.subplot(2, 3, idx+1)
  plt.imshow(train_image_np)
plt.show()

## データのアノテーション

次に、画像のどこに何が写っているかを表すアノテーションデータを作成します。

TODO: VoTTなどのアノテーションツールの使い方についてまとめる

```
colab_utils.annotate
```
を用いることで、Google Colaboratory上でもアノテーションを行うことができます。

今回は事前にアノテーションしたバウンディングボックスを用いて教師データを作成します。


In [None]:
# 以下の2行をアンコメントすることで、Googlel Colaboratory上でも画像にアノテーションを行うことができます
# gt_boxes = []
# colab_utils.annotate(train_images_np, box_storage_pointer=gt_boxes)

# 上2行の方法でアノテーションを行う場合は以下をコメントアウトしてください
gt_boxes = [
            np.array([[0.436, 0.591, 0.629, 0.712]], dtype=np.float32),
            np.array([[0.539, 0.583, 0.73, 0.71]], dtype=np.float32),
            np.array([[0.464, 0.414, 0.626, 0.548]], dtype=np.float32),
            np.array([[0.313, 0.308, 0.648, 0.526]], dtype=np.float32),
            np.array([[0.256, 0.444, 0.484, 0.629]], dtype=np.float32)
]


次に、クラスのアノテーションを追加します。
このサンプルではわかりやすくするために、単一の「rubber_dukey」クラスを指定していますが、複数のクラスを扱うように拡張することも可能です。

In [None]:
# 慣習的にクラスIDは1から数え始めます
duck_class_id = 1
num_classes = 1

category_index = {duck_class_id: {'id': duck_class_id, 'name': 'rubber_ducky'}}

入力データ(教師データや画像)をトレーニンググループが期待するフォーマットに変換します。  
期待するフォーマットはモデルによって異なります(tensor型やワンショット表現など)。


In [None]:
label_id_offset = 1
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []
for (train_image_np, gt_box_np) in zip(
    train_images_np, gt_boxes):
  train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor(
      train_image_np, dtype=tf.float32), axis=0))
  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))
  zero_indexed_groundtruth_classes = tf.convert_to_tensor(
      np.ones(shape=[gt_box_np.shape[0]], dtype=np.int32) - label_id_offset)
  gt_classes_one_hot_tensors.append(tf.one_hot(
      zero_indexed_groundtruth_classes, num_classes))
print('データの事前準備が完了しました。')

ラバーダックを正しくアノテーションできたかを確認します。

In [None]:
dummy_scores = np.array([1.0], dtype=np.float32)  # ダミーのスコアとして100%(=1.0)に設定します。

plt.figure(figsize=(30, 15))
for idx in range(5):
  plt.subplot(2, 3, idx+1)
  plot_detections(
      train_images_np[idx],
      gt_boxes[idx],
      np.ones(shape=[gt_boxes[idx].shape[0]], dtype=np.int32),
      dummy_scores, category_index)
plt.show()

## モバイル端末向け学習済みモデルのロード

ここでは、モバイル端末向けの物体検出モデルのアーキテクチャである「SSD MobileNet V2 FPN-Lite」をビルドし、最上位の分類層を除く全ての層を復元します。

**執筆時点では、最終的に出力するTFLite形式の物体検出推論モデルは、SSDモデル以外サポートしていないことに注意してください。**

本サンプルでは、今回使用するSSDアーキテクチャに合わせていくつかの項目をハードコーディングしています(入力画像サイズが常に320x320であることを想定しているなど)。

In [None]:
# checkpointファイルをダウンロードしてmodels/research/object_detection/test_data/に移動させます。

!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz
!tar -xf ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz
!if [ -d "models/research/object_detection/test_data/checkpoint" ]; then rm -Rf models/research/object_detection/test_data/checkpoint; fi
!mkdir models/research/object_detection/test_data/checkpoint
!mv ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8/checkpoint models/research/object_detection/test_data/

In [None]:
tf.keras.backend.clear_session()

print('モデルをビルドし、ファインチューニングの為に重みパラメータを復元します。', flush=True)
num_classes = 1
pipeline_config = 'models/research/object_detection/configs/tf2/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# 後でTFLiteファイルを作成するために、outpuy_directoryにcheckpointファイルとconfigファイルを保存します。
output_directory = 'output/'
output_checkpoint_dir = os.path.join(output_directory, 'checkpoint')

# パイプラインコンフィグをロードし、物体検出モデルをビルドします。
#
# 90個のクラスを予測するCOCOアーキテクチャを使用しているので、
# num_classフィールドを上書きして1個の新しいクラス(rubber_ducky)を作成します。
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
      model_config=model_config, is_training=True)
# 新しいパイプラインコンフィグを保存します。
pipeline_proto = config_util.create_pipeline_proto_from_configs(configs)
config_util.save_pipeline_config(pipeline_proto, output_directory)

# オブジェクトベースのチェックポイントリストアの設定 
# SSDには2つの予測ヘッドがあります。
# 1つは分類用、もう1つはボックス回帰用です。 
# ここではボックス回帰ヘッドを復元し、分類ヘッドを最初から初期化します。
fake_box_predictor = tf.compat.v2.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    # 以下の行をアンコメントすることで、両方のヘッドを復元することができます。
    # _prediction_heads=detection_model._box_predictor._prediction_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )
fake_model = tf.compat.v2.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.compat.v2.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path).expect_partial()

# TFLiteへの変換のために、checkpointファイルを保存します。
exported_ckpt = tf.compat.v2.train.Checkpoint(model=detection_model)
ckpt_manager = tf.train.CheckpointManager(
    exported_ckpt, output_checkpoint_dir, max_to_keep=1)

# ダミーの画像を使用して推論を実行します。
image, shapes = detection_model.preprocess(tf.zeros([1, 320, 320, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
print('重みパラメータを復元しました。')

## 転移学習

ロードした学習済みモデルに対して、用意した教師データを使って転移学習を行います。

この手順でのいくつかのパラメータは実験的に設定されています。
例えばSGDの場合、"learning rate"、"num_batches"、"momentum"パラメータなどです。

これらはあくまでもサンプルのパラメータ設定であり、最良の結果を得るためには、データやモデルのアーキテクチャに合わせてチューニングする必要があります。

In [None]:
tf.keras.backend.set_learning_phase(True)

# これらのパラメータは チューニング可能です。
# 今回の例では学習用の画像は5枚なので、これ以上バッチサイズを大きくする意味はありませんが、
# 必要であれば設定可能です。
batch_size = 5
learning_rate = 0.15
num_batches = 1000

# 最上位のレイヤーのファインチューニングする変数を選択します。
trainable_variables = detection_model.trainable_variables
to_fine_tune = []
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
for var in trainable_variables:
  if any([var.name.startswith(prefix) for prefix in prefixes_to_train]):
    to_fine_tune.append(var)

# 学習のために、前後方のパスを設定します。
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):

  # 少しだけ実行速度が速くなるため、tf.functionデコレーターを使用しています。
  # "@tf.function"をコメントアウトすることで、中でどのような処理が行われているか見ることができます。
  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    """A single training iteration.

    Args:
      image_tensors: A list of [1, height, width, 3] Tensor of type tf.float32.
        Note that the height and width can vary across images, as they are
        reshaped within this function to be 320x320.
      groundtruth_boxes_list: A list of Tensors of shape [N_i, 4] with type
        tf.float32 representing groundtruth boxes for each image in the batch.
      groundtruth_classes_list: A list of Tensors of shape [N_i, num_classes]
        with type tf.float32 representing groundtruth boxes for each image in
        the batch.

    Returns:
      A scalar tensor representing the total loss for the input batch.
    """
    shapes = tf.constant(batch_size * [[320, 320, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)
    with tf.GradientTape() as tape:
      preprocessed_images = tf.concat(
          [detection_model.preprocess(image_tensor)[0]
           for image_tensor in image_tensors], axis=0)
      prediction_dict = model.predict(preprocessed_images, shapes)
      losses_dict = model.loss(prediction_dict, shapes)
      total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']
      gradients = tape.gradient(total_loss, vars_to_fine_tune)
      optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return total_loss

  return train_step_fn

optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

print('ファインチューニングを開始します。', flush=True)
for idx in range(num_batches):
  # 教師データとなる画像をランダムに取得
  all_keys = list(range(len(train_images_np)))
  random.shuffle(all_keys)
  example_keys = all_keys[:batch_size]

  # なお、このデモではデータのかさ増しはしていません。
  # ランダムに画像を回転させたり、切り抜いたりしたものを教師データとすることで
  # より精度が高いモデルが作成できることがあります。
  gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
  gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
  image_tensors = [train_image_tensors[key] for key in example_keys]

  # トレーニングを行います。
  total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

  if idx % 100 == 0:
    print('batch ' + str(idx) + ' of ' + str(num_batches)
    + ', loss=' +  str(total_loss.numpy()), flush=True)

print('ファインチューニングが完了しました。')

ckpt_manager.save()
print('checkpointファイルを保存しました。')

# TensorFlow Lite形式にエクスポート&実行



## モデルの変換

上記手順で作成した推論モデルはTensorFlow形式ではありますが、Armadillo-IoT ゲートウェイ G4では使用できない形式です。

モバイル端末向けに軽量化されたTFLite形式に変換しなければなりませんので、以下に手順を紹介します。

最初に、`export_tflite_graph_tf2.py`を実行し、TFLite形式に適した中間ファイルを生成します。

この中間ファイルがTensorFlow Lite Converterに渡され、最終的にTFLiteモデルが生成されることになります。

このプロセスについての詳細は[こちら](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/running_on_mobile_tf2.md)を参照してください。

In [None]:
%%bash
python models/research/object_detection/export_tflite_graph_tf2.py \
  --pipeline_config_path output/pipeline.config \
  --trained_checkpoint_dir output/checkpoint \
  --output_directory tflite

## TFliteモデルのダウンロード

TensorFlow Lite Converterを使用してTFLite形式のモデル(model.tflite)を生成します。

In [None]:
!tflite_convert --saved_model_dir=tflite/saved_model --output_file=tflite/model.tflite

## TFLiteモデルをテストする

出来上がったTFLiteモデルをロードし、テストしてみます。

In [None]:
test_image_dir = 'models/research/object_detection/test_images/ducky/test/'
test_images_np = []
for i in range(1, 50):
  image_path = os.path.join(test_image_dir, 'out' + str(i) + '.jpg')
  test_images_np.append(np.expand_dims(
      load_image_into_numpy_array(image_path), axis=0))

In [None]:
def detect(interpreter, input_tensor):
  """Run detection on an input image.

  Args:
    interpreter: tf.lite.Interpreter
    input_tensor: A [1, height, width, 3] Tensor of type tf.float32.
      Note that height and width can be anything since the image will be
      immediately resized according to the needs of the model within this
      function.

  Returns:
    A dict containing 3 Tensors (`detection_boxes`, `detection_classes`,
      and `detection_scores`).
  """
  input_details = interpreter.get_input_details()
  output_details = interpreter.get_output_details()

  # TFLiteモデルには前処理が含まれていないので、前処理部分は元のモデルを使用しています。
  preprocessed_image, shapes = detection_model.preprocess(input_tensor)
  interpreter.set_tensor(input_details[0]['index'], preprocessed_image.numpy())

  interpreter.invoke()

  boxes = interpreter.get_tensor(output_details[1]['index'])
  classes = interpreter.get_tensor(output_details[3]['index'])
  scores = interpreter.get_tensor(output_details[0]['index'])

  return boxes, classes, scores

In [None]:
# TFLiteモデルをロードし、メモリ上に展開
interpreter = tf.lite.Interpreter(model_path="tflite/model.tflite")
interpreter.allocate_tensors()

# なお、最初の1フレームはtf.functionのトレースが行われて時間がかかりますが、その後の推論は速くなるはずです。

label_id_offset = 1
for i in range(len(test_images_np)):
  input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32)
  boxes, classes, scores = detect(interpreter, input_tensor)

  plot_detections(
      test_images_np[i][0],
      boxes[0],
      classes[0].astype(np.uint32) + label_id_offset,
      scores[0],
      category_index, figsize=(15, 20), image_name="gif_frame_" + ('%02d' % i) + ".jpg")

In [None]:
# 推論した結果を描画したjpgファイルをつなげてGIFファイルとして出力します。
imageio.plugins.freeimage.download()

anim_file = 'duckies_test.gif'

filenames = glob.glob('gif_frame_*.jpg')
filenames = sorted(filenames)
last = -1
images = []
for filename in filenames:
  image = imageio.imread(filename)
  images.append(image)

imageio.mimsave(anim_file, images, 'GIF-FI', fps=5)

display(IPyImage(open(anim_file, 'rb').read()))

## モデルをダウンロードします

本手順で生成したTFLiteモデルは、Armadillo-IoT ゲートウェイ G4上でも動作します。
以下を実行することで生成したTFLite形式のモデルである、tfliteファイルをダウンロードできます。

その後の実機上での実行方法については、Armadillo Base OS開発ガイドを参照してください。

In [None]:
from google.colab import files
files.download('tflite/model.tflite') 