In [1]:
from ray import serve

import os
import tempfile
import numpy as np
import requests

In [2]:
TRAINED_MODEL_PATH = os.path.join(tempfile.gettempdir(), "mnist_model.h5")


def train_and_save_model():
    import tensorflow as tf
    # Load mnist dataset
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Train a simple neural net model
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])
    model.fit(x_train, y_train, epochs=1)

    model.evaluate(x_test, y_test, verbose=2)
    model.summary()

    # Save the model in h5 format in local file system
    model.save(TRAINED_MODEL_PATH)


if not os.path.exists(TRAINED_MODEL_PATH):
    train_and_save_model()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
313/313 - 1s - loss: 0.1428 - accuracy: 0.9570
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten (Flatten)            (None, 784)               0         
_________________________________________________________________
dense (Dense)                (None, 128)               100480    
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_1 (Dense)              (None, 10)                1290      
Total params: 101,770
Trainable params: 101,770
Non-trainable params: 0
_________________________________________________________________


In [11]:
class TFMnistModel:
    def __init__(self, model_path):
        import tensorflow as tf
        self.model_path = model_path
        self.model = tf.keras.models.load_model(model_path)

    def __call__(self, flask_request):
        # Step 1: transform HTTP request -> tensorflow input
        # Here we define the request schema to be a json array.
        input_array = np.array(flask_request.json["array"])
        reshaped_array = input_array.reshape((1, 28, 28))

        # Step 2: tensorflow input -> tensorflow output
        prediction = self.model(reshaped_array)

        # Step 3: tensorflow output -> web output
        return {
            "prediction": prediction.numpy().tolist(),
            "file": self.model_path
        }

In [15]:
client = serve.start(http_port=8088)
client.create_backend("tf:v1", TFMnistModel, TRAINED_MODEL_PATH)
client.create_endpoint("tf_classifier", backend="tf:v1", route="/mnist_test")

[2m[36m(pid=2986)[0m 2020-10-07 00:15:30,344	INFO controller.py:194 -- Starting router with name 'KxJHRE:SERVE_CONTROLLER_ACTOR:SERVE_PROXY_ACTOR-node:10.221.0.4-0' on node 'node:10.221.0.4-0' listening on '127.0.0.1:8088'


RayTaskError(RayServeException): [36mray::ServeController.create_backend()[39m (pid=2986, ip=10.221.0.4)
  File "python/ray/_raylet.pyx", line 482, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 433, in ray._raylet.execute_task.function_executor
  File "python/ray/_raylet.pyx", line 1419, in ray._raylet.CoreWorker.run_async_func_in_event_loop
  File "/anaconda/envs/py37_tensorflow/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/anaconda/envs/py37_tensorflow/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/anaconda/envs/py37_tensorflow/lib/python3.7/site-packages/ray/serve/controller.py", line 783, in create_backend
    raise e
  File "/anaconda/envs/py37_tensorflow/lib/python3.7/site-packages/ray/serve/controller.py", line 780, in create_backend
    self._scale_replicas(backend_tag, backend_config.num_replicas)
  File "/anaconda/envs/py37_tensorflow/lib/python3.7/site-packages/ray/serve/controller.py", line 553, in _scale_replicas
    num_possible, current_num_replicas + num_possible))
ray.serve.exceptions.RayServeException: Cannot scale backend tf:v1 to 1 replicas. Ray Serve tried to add 1 replicas but the resources only allows 0 to be added. To fix this, consider scaling to replica to 0 or add more resources to the cluster. You can check avaiable resources with ray.nodes().

2020-10-07 00:15:37,129	INFO (unknown file):0 -- gc.collect() freed 82 refs in 0.19147202300155186 seconds
[2m[36m(pid=2454)[0m 2020-10-07 00:15:37,060	INFO (unknown file):0 -- gc.collect() freed 73 refs in 0.12369591999959084 seconds
[2m[36m(pid=2986)[0m 2020-10-07 00:15:37,058	INFO (unknown file):0 -- gc.collect() freed 108 refs in 0.1252408309992461 seconds


In [14]:
resp = requests.get(
    "http://localhost:8088/mnist_test",
    json={"array": np.random.randn(28 * 28).tolist()})
print(resp.json())

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

2020-10-07 00:15:17,043	INFO (unknown file):0 -- gc.collect() freed 53 refs in 0.16772314999980154 seconds
[2m[36m(pid=2454)[0m 2020-10-07 00:15:17,007	INFO (unknown file):0 -- gc.collect() freed 8 refs in 0.1270497469995462 seconds
