# Basic classification: Classify images of clothing
A tensorflow serving style service example using MLFlow


![Impression](https://www.google-analytics.com/collect?v=1&tid=UA-112879361-3&cid=555&t=event&ec=tensorflow&ea=tensorflow_2_fashion_mnist&dt=tensorflow_2_fashion_mnist)

In [3]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

# add venv PATH to shell command PATH
import sys, os
if sys.base_prefix not in os.environ['PATH']:
    os.environ['PATH'] = f"{sys.base_prefix}/bin:{os.environ['PATH']}"

In [4]:
from __future__ import absolute_import, division, print_function, unicode_literals

import io

# TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import matplotlib.pyplot as plt
print(tf.__version__)

2.1.0


In [5]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(_train_images, train_labels), (_test_images, test_labels) = fashion_mnist.load_data()
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
train_images = _train_images / 255.0
test_images = _test_images / 255.0

# pick up a test image
d_test_img = _test_images[0]
print(class_names[test_labels[0]])

plt.imshow(255.0 - d_test_img, cmap='gray')
plt.imsave("test.png", 255.0 - d_test_img, cmap='gray')

In [6]:
class FashionMnist(tf.keras.Model):
    def __init__(self):
        super(FashionMnist, self).__init__()
        self.cnn = tf.keras.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])
    
    @staticmethod
    def image_bytes2tensor(inputs):
        inputs = tf.map_fn(lambda i: tf.io.decode_png(i, channels=1), inputs, dtype=tf.uint8)
        inputs = tf.cast(inputs, tf.float32)
        inputs = (255.0 - inputs) / 255.0
        inputs = tf.reshape(inputs, [-1, 28, 28])
        return inputs

    @tf.function(input_signature=[tf.TensorSpec(shape=(None,), dtype=tf.string)])
    def predict_image(self, inputs):
        inputs = self.image_bytes2tensor(inputs)
        return self(inputs)
    
    def call(self, inputs):
        return self.cnn(inputs)

In [None]:
model = FashionMnist()
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.fit(train_images, train_labels, epochs=1)

Train on 60000 samples

## test the model

In [36]:
predict = model(tf.constant(np.reshape(test_images[:3], (-1, 28 * 28))))
klass = tf.argmax(predict, axis=1)
[class_names[k] for k in klass]



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



['Ankle boot', 'Pullover', 'Trouser']

And the model predicts a label as expected.

# Define & save MLFlow Pyfunc model

In [9]:
tmpdir = 'mlflow_tmp'
tf_model_path = os.path.join(str(tmpdir), "tf.pkl")
tf.saved_model.save(model, tf_model_path)

INFO:tensorflow:Assets written to: mlflow_tmp/tf.pkl/assets


In [28]:
%%writefile benchmark_mlflow_pyfunc.py
from __future__ import print_function

import os
import pickle

import base64
import pandas as pd
import numpy as np
import pytest
import six

import tensorflow as tf

import mlflow
import mlflow.pyfunc
import mlflow.pyfunc.model
from mlflow.models import Model


def _load_pyfunc(path):
    tf_model = tf.saved_model.load(path)
    class Model:
        class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                       'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
        def predict(self, inputs):
            _bytes = [base64.b64decode(i) for i in inputs['str'].to_numpy().tolist()]
            inputs = tf.constant(_bytes, dtype=tf.string)
            outputs = tf_model.predict_image(inputs)
            output_classes = tf.math.argmax(outputs, axis=1)
            return [self.class_names[i] for i in output_classes]
    return Model()
        


if __name__ == '__main__':
    tmpdir = 'mlflow_tmp'
    tf_model_path = os.path.join(str(tmpdir), "tf.pkl")
    model_path = os.path.join(str(tmpdir), "model")

    model_config = Model(run_id="test")
    mlflow.pyfunc.save_model(path=model_path,
                             data_path=tf_model_path,
                             loader_module=os.path.basename(__file__)[:-3],
                             code_path=[__file__],
                             mlflow_model=model_config)

    reloaded_model = mlflow.pyfunc.load_pyfunc(model_path)
    print(reloaded_model)


Overwriting benchmark_mlflow_pyfunc.py


In [29]:
!rm -r {tmpdir}/model
!python benchmark_mlflow_pyfunc.py

  reloaded_model = mlflow.pyfunc.load_pyfunc(model_path)
2020-03-12 17:33:38.765326: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-03-12 17:33:38.788000: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2712000000 Hz
2020-03-12 17:33:38.788289: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x5579bd3561c0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-03-12 17:33:38.788312: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2020-03-12 17:33:38.788417: I tensorflow/core/common_runtime/process_util.cc:147] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
<benchmark_mlflow_pyfunc._load_pyfunc.<locals>.Model object at 0x7fcb5c754a90>


In [30]:
!mlflow models serve -m mlflow_tmp/model

  import imp
2020/03/12 17:33:40 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/03/12 17:33:40 INFO mlflow.pyfunc.backend: === Running command 'gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-03-12 17:33:40 +0800] [42524] [INFO] Starting gunicorn 20.0.4
[2020-03-12 17:33:40 +0800] [42524] [INFO] Listening at: http://127.0.0.1:5000 (42524)
[2020-03-12 17:33:40 +0800] [42524] [INFO] Using worker: sync
[2020-03-12 17:33:40 +0800] [42527] [INFO] Booting worker with pid: 42527
  import imp
2020-03-12 17:33:42.313731: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2 AVX AVX2 FMA
2020-03-12 17:33:42.339998: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2712000000 Hz
2020-03-12 17:33:42.340276: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55e71f6a70c0 init

# Build & Run in Docker

In [None]:
# replace PIP_INDEX_URL with your prefer pypi mirror
NAME = saved_path.split('/')[-1].lower()
!docker build -t {NAME} \
    --build-arg PIP_TRUSTED_HOST=192.168.138.2 \
    --build-arg PIP_INDEX_URL=http://192.168.138.2/simple \
    {saved_path}

In [None]:
from bentoml.utils import detect_free_port
PORT = detect_free_port()
print(PORT)

!docker run -itd -p {PORT}:5000 --cpus 1 -e FLAGS="--workers 1 --enable-microbatch" {NAME}:latest

# Test with requests

In [38]:
import base64
import json
import requests
import pandas as pd

with open("test.png", "rb") as f:
    img_bytes = f.read()
img_b64 = base64.b64encode(img_bytes).decode()


headers = {"content-type": "application/json"}
raw_data = np.array([img_b64])
data = pd.DataFrame(raw_data, columns=['str']).to_json(orient='split')

json_response = requests.post(f'http://127.0.0.1:5000/invocations', data=data, headers=headers)
print(json_response)
print(json_response.json())

<Response [200]>
['Ankle boot']


# Benchmark with locust

In [33]:
%%writefile benchmark_mlflow_b64.py
from locust import HttpLocust, TaskSet, task, constant
from functools import lru_cache

import math
import random
import numpy as np
import pandas as pd
import json
import base64
import requests


@lru_cache(maxsize=1)
def data_producer():

    with open("test.png", "rb") as f:
        img_bytes = f.read()
    img_b64 = base64.b64encode(img_bytes).decode()

    def _gen_data(size=3):
        headers = {"content-type": "application/json"}
        raw_data = np.array([img_b64] * size)
        data = pd.DataFrame(raw_data, columns=['str']).to_json(orient='split')
        return headers, data

    return _gen_data


class WebsiteTasks(TaskSet):

    @staticmethod
    def get_data():
        headers, data = data_producer()(1)
        return headers, data
        
    @task
    def index(self):
        headers, data = self.get_data()
        self.client.post("/invocations", data, headers=headers)

class WebsiteUser(HttpLocust):
    task_set = WebsiteTasks
    wait_time = constant(0.5)

Writing benchmark_mlflow_b64.py


In [34]:
PORT = 5000

In [35]:
!locust -f benchmark_mlflow_b64.py -H http://127.0.0.1:{PORT}

[2020-03-12 17:37:05,684] beta/INFO/locust.main: Starting web monitor at http://*:8089
[2020-03-12 17:37:05,684] beta/INFO/locust.main: Starting Locust 0.14.4
[2020-03-12 17:37:25,036] beta/INFO/locust.runners: Hatching and swarming 1000 users at the rate 1000 users/s (0 users already running)...
[2020-03-12 17:37:26,967] beta/INFO/locust.runners: All locusts hatched: WebsiteUser: 1000 (0 already running)
[2020-03-12 17:40:07,359] beta/INFO/locust.runners: Hatching and swarming 100 users at the rate 1000 users/s (0 users already running)...
[2020-03-12 17:40:07,524] beta/INFO/locust.runners: All locusts hatched: WebsiteUser: 100 (0 already running)
^C
[2020-03-12 17:44:13,804] beta/ERROR/stderr: KeyboardInterrupt
[2020-03-12 17:44:13,805] beta/ERROR/stderr: 2020-03-12T09:44:13Z
[2020-03-12 17:44:13,805] beta/ERROR/stderr: 
[2020-03-12 17:44:13,805] beta/INFO/locust.main: Shutting down (exit code 0), bye.
[2020-03-12 17:44:13,805] beta/INFO/locust.main: Cleaning up runner...
[2020-03-12