# 白黒画像の自動色付けサンプル

## 事前学習済みモデルをダウンロード

In [None]:
!python3 $INTEL_OPENVINO_DIR/deployment_tools/tools/model_downloader/downloader.py --list $INTEL_OPENVINO_DIR/inference_engine/demos/python_demos/colorization_demo/models.lst

## 事前学習済みモデルをOpenVINOのIRに変換
事前学習モデルは今回のように元のディープラーニングフレームワークのファイル形式で提供されている場合もあります。その場合は下記コマンドを使用して、OpenVINOの独自形式であるIR形式に変換ください。

In [None]:
!python3 $INTEL_OPENVINO_DIR/deployment_tools/tools/model_downloader/converter.py --name colorization-v2

## ライブラリをインポート

In [None]:
import os
import logging as log
import sys
import io

from openvino.inference_engine import IECore
import cv2
import numpy as np

from PIL import Image
import PIL

import IPython.display
from IPython.display import clear_output

## 実装

In [None]:
class Colorization:
    def __init__(self, model_path):
        # Plugin initialization
        ie = IECore()
    
        # Setup OpenVINO's IE
        model = model_path
        net = ie.read_network(model, os.path.splitext(model)[0] + ".bin")
        self.input_blob = next(iter(net.input_info))
        self.output_blob = next(iter(net.outputs))
        self.input_batch_size, self.input_channel, self.input_height, self.input_width = net.input_info[self.input_blob].input_data.shape
        self.exec_net = ie.load_network(network=net, device_name="CPU")
        
        # Setup coeffs
        coeffs = "public/colorization-v2/colorization-v2.npy"
        self.color_coeff = np.load(coeffs).astype(np.float32)
        assert self.color_coeff.shape == (313, 2), "Current shape of color coefficients does not match required shape"
        
    def __preprocess_input__(self, original_frame):
        if original_frame.shape[2] > 1:
            frame = cv2.cvtColor(cv2.cvtColor(original_frame, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2RGB)
        else:
            frame = cv2.cvtColor(original_frame, cv2.COLOR_GRAY2RGB)

        img_rgb = frame.astype(np.float32) / 255
        img_lab = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2Lab)
        img_l_rs = cv2.resize(img_lab.copy(), (self.input_width, self.input_height))[:, :, 0]
        
        return img_lab, img_l_rs 
    
    def infer(self, original_frame):
        # Read and pre-process input image (NOTE: one image only)
        img_lab, img_l_rs = self.__preprocess_input__(original_frame)
        
        res = self.exec_net.infer(inputs={self.input_blob: [img_l_rs]})
        
        update_res = (res[self.output_blob] * self.color_coeff.transpose()[:, :, np.newaxis, np.newaxis]).sum(1)

        out = update_res.transpose((1, 2, 0))
        (h_orig, w_orig) = original_frame.shape[:2]
        out = cv2.resize(out, (w_orig, h_orig))
        img_lab_out = np.concatenate((img_lab[:, :, 0][:, :, np.newaxis], out), axis=2)
        img_bgr_out = np.clip(cv2.cvtColor(img_lab_out, cv2.COLOR_Lab2BGR), 0, 1)
        
        return img_bgr_out

In [None]:
def create_output_image(original_frame, img_bgr_out):
    (h_orig, w_orig) = original_frame.shape[:2]
    imshowSize = (int(w_orig*(200/h_orig)), 200)
    original_image = cv2.resize(original_frame, imshowSize)
    colorize_image = (cv2.resize(img_bgr_out, imshowSize) * 255).astype(np.uint8)

    original_image = cv2.putText(original_image, 'Original', (25, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
    colorize_image = cv2.putText(colorize_image, 'Colorize', (25, 50),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

    ir_image = [cv2.hconcat([original_image, colorize_image])]
    final_image = cv2.vconcat(ir_image)
    final_image = cv2.cvtColor(final_image, cv2.COLOR_BGR2RGB)
    
    return final_image

## メイン処理を実装（単一画像の推論）

In [None]:
colorization = Colorization("public/colorization-v2/FP32/colorization-v2.xml")

input_source = "sample.jpg"
original_frame = cv2.imread(input_source)

img_bgr_out = colorization.infer(original_frame)

final_image = create_output_image(original_frame, img_bgr_out)

f = io.BytesIO()
PIL.Image.fromarray(final_image).save(f, 'jpeg')
IPython.display.display(IPython.display.Image(data=f.getvalue()))

## メイン処理を実装（動画の推論）

In [None]:
colorization = Colorization("public/colorization-v2/FP32/colorization-v2.xml")

input_source = "https://github.com/hiouchiy/IntelAI_and_Cloud/raw/master/Azure/handson/bw.mp4"
cap = cv2.VideoCapture(input_source)
if not cap.isOpened():
    assert "{} not exist".format(input_source)

while True:
    hasFrame, original_frame = cap.read()
    if not hasFrame:
        break

    img_bgr_out = colorization.infer(original_frame)

    final_image = create_output_image(original_frame, img_bgr_out)

    clear_output(wait=True)

    f = io.BytesIO()
    PIL.Image.fromarray(final_image).save(f, 'jpeg')
    IPython.display.display(IPython.display.Image(data=f.getvalue()))

    if not cv2.waitKey(1) < 0:
        break

---

## OpenVINO™ Model Serverを使用してモデルをマイクロサービス化
ここからは手書き文字認識モデルをOpenVINO Model Serverでマイクロサービス化して外出しにします。Model ServerとはgRPCを介してコミュニケーションを取ります。

### OpenVINO Model Serverのセットアップ
OpenVINO Model ServerをホストOS上で稼働させる手順です。こちらを実行後に以降の作業を進めてください。
1. Model ServerのDockerイメージをダウンロード
```Bash
docker pull openvino/model_server:latest
```
1. 自動色付け用モデル（XMLファイルとBINファイル）を適当なフォルダへ格納
1. Model Serverを起動
```Bash
docker run -d --rm -v C:\Users\hiroshi\model\colorization:/models/colorization/1 -p 9000:9000 openvino/model_server:latest --model_path /models/colorization --model_name colorization --port 9000 --log_level DEBUG  --shape auto
```
※"C:\Users\hiroshi\model\colorization"にモデルが格納されているとした場合

### 実装

In [None]:
import cv2
import datetime
import grpc
import numpy as np
import os
import io
from tensorflow import make_tensor_proto, make_ndarray
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc, get_model_metadata_pb2

from PIL import Image
import PIL

import IPython.display
from IPython.display import clear_output

class RemoteColorization:
    def __init__(self, grpc_address='localhost', grpc_port=9000, model_name='colorization', model_version=None):
        
        #Settings for accessing model server
        self.grpc_address = grpc_address
        self.grpc_port = grpc_port
        self.model_name = model_name
        self.model_version = model_version
        channel = grpc.insecure_channel("{}:{}".format(self.grpc_address, self.grpc_port))
        self.stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
        
        # Get input shape info from Model Server
        self.input_name, input_shape, self.output_name, output_shape = self.__get_input_name_and_shape__()
        self.input_batchsize = input_shape[0]
        self.input_channel = input_shape[1]
        self.input_height = input_shape[2]
        self.input_width = input_shape[3]
        
        # Setup coeffs
        coeffs = "public/colorization-v2/colorization-v2.npy"
        self.color_coeff = np.load(coeffs).astype(np.float32)
        assert self.color_coeff.shape == (313, 2), "Current shape of color coefficients does not match required shape"

    def __get_input_name_and_shape__(self):
        metadata_field = "signature_def"
        request = get_model_metadata_pb2.GetModelMetadataRequest()
        request.model_spec.name = self.model_name
        if self.model_version is not None:
            request.model_spec.version.value = self.model_version
        request.metadata_field.append(metadata_field)

        result = self.stub.GetModelMetadata(request, 10.0) # result includes a dictionary with all model outputs
        input_metadata, output_metadata = self.__get_input_and_output_meta_data__(result)
        input_blob = next(iter(input_metadata.keys()))
        output_blob = next(iter(output_metadata.keys()))
        return input_blob, input_metadata[input_blob]['shape'], output_blob, output_metadata[output_blob]['shape']
    
    def __get_input_and_output_meta_data__(self, response):
        signature_def = response.metadata['signature_def']
        signature_map = get_model_metadata_pb2.SignatureDefMap()
        signature_map.ParseFromString(signature_def.value)
        serving_default = signature_map.ListFields()[0][1]['serving_default']
        serving_inputs = serving_default.inputs
        input_blobs_keys = {key: {} for key in serving_inputs.keys()}
        tensor_shape = {key: serving_inputs[key].tensor_shape
                        for key in serving_inputs.keys()}
        for input_blob in input_blobs_keys:
            inputs_shape = [d.size for d in tensor_shape[input_blob].dim]
            tensor_dtype = serving_inputs[input_blob].dtype
            input_blobs_keys[input_blob].update({'shape': inputs_shape})
            input_blobs_keys[input_blob].update({'dtype': tensor_dtype})
        
        serving_outputs = serving_default.outputs
        output_blobs_keys = {key: {} for key in serving_outputs.keys()}
        tensor_shape = {key: serving_outputs[key].tensor_shape
                        for key in serving_outputs.keys()}
        for output_blob in output_blobs_keys:
            outputs_shape = [d.size for d in tensor_shape[output_blob].dim]
            tensor_dtype = serving_outputs[output_blob].dtype
            output_blobs_keys[output_blob].update({'shape': outputs_shape})
            output_blobs_keys[output_blob].update({'dtype': tensor_dtype})

        return input_blobs_keys, output_blobs_keys
        
    def __preprocess_input__(self, original_frame):
        if original_frame.shape[2] > 1:
            frame = cv2.cvtColor(cv2.cvtColor(original_frame, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2RGB)
        else:
            frame = cv2.cvtColor(original_frame, cv2.COLOR_GRAY2RGB)

        img_rgb = frame.astype(np.float32) / 255
        img_lab = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2Lab)
        img_l_rs = cv2.resize(img_lab.copy(), (self.input_width, self.input_height))[:, :, 0]
        
        return img_lab, img_l_rs 
    
    def infer(self, original_frame):
        # Read and pre-process input image (NOTE: one image only)
        img_lab, img_l_rs = self.__preprocess_input__(original_frame)
        input_image = img_l_rs.reshape(self.input_batchsize, self.input_channel, self.input_height, self.input_width).astype(np.float32)
        
        #res = self.exec_net.infer(inputs={self.input_blob: [img_l_rs]})
        # Model ServerにgRPCでアクセスしてモデルをコール
        request = predict_pb2.PredictRequest()
        request.model_spec.name = self.model_name
        request.inputs[self.input_name].CopyFrom(make_tensor_proto(input_image, shape=(input_image.shape)))
        result = self.stub.Predict(request, 10.0) # result includes a dictionary with all model outputs
        res = make_ndarray(result.outputs[self.output_name])
        
        update_res = (res * self.color_coeff.transpose()[:, :, np.newaxis, np.newaxis]).sum(1)

        out = update_res.transpose((1, 2, 0))
        (h_orig, w_orig) = original_frame.shape[:2]
        out = cv2.resize(out, (w_orig, h_orig))
        img_lab_out = np.concatenate((img_lab[:, :, 0][:, :, np.newaxis], out), axis=2)
        img_bgr_out = np.clip(cv2.cvtColor(img_lab_out, cv2.COLOR_Lab2BGR), 0, 1)
        
        return img_bgr_out

### 実行

In [None]:
colorization = RemoteColorization(grpc_address='192.168.145.33', grpc_port='9000', model_name='colorization')

input_source = "sample.jpg"
original_frame = cv2.imread(input_source)

img_bgr_out = colorization.infer(original_frame)

final_image = create_output_image(original_frame, img_bgr_out)

f = io.BytesIO()
PIL.Image.fromarray(final_image).save(f, 'jpeg')
IPython.display.display(IPython.display.Image(data=f.getvalue()))