Skip to content

Commit

Permalink
[Refactor] Add API output adapters (#783)
Browse files Browse the repository at this point in the history
* init adapters

* Add DataframeOutput

* Gen pb

* Add JsonOutput

* [Style]: cleaning $ renaming

* revert & freeze versioneer.py
  • Loading branch information
bojiang committed Jun 12, 2020
1 parent 2624fb7 commit 8d49f11
Show file tree
Hide file tree
Showing 26 changed files with 1,238 additions and 543 deletions.
2 changes: 0 additions & 2 deletions bentoml/clipper/__init__.py
Expand Up @@ -34,8 +34,6 @@


CLIPPER_ENTRY = """\
from __future__ import print_function
import rpc # this is clipper's rpc.py module
import os
import sys
Expand Down
14 changes: 14 additions & 0 deletions bentoml/handlers/adapter/__init__.py
@@ -0,0 +1,14 @@
from bentoml.handlers.adapter.dataframe_output import DataframeOutput
from bentoml.handlers.adapter.tf_tensor_output import TfTensorOutput
from bentoml.handlers.adapter.base_output import BaseOutputAdapter
from bentoml.handlers.adapter.default_output import DefaultOutput
from bentoml.handlers.adapter.json_output import JsonserializableOutput


__all__ = [
'DefaultOutput',
'DataframeOutput',
'BaseOutputAdapter',
'TfTensorOutput',
'JsonserializableOutput',
]
81 changes: 81 additions & 0 deletions bentoml/handlers/adapter/base_output.py
@@ -0,0 +1,81 @@
# Copyright 2019 Atalaya Tech, Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable

import flask

from bentoml.marshal.utils import SimpleResponse, SimpleRequest


class BaseOutputAdapter:
"""OutputAdapter is an layer between result of user defined API callback function
and final output in a variety of different forms,
such as HTTP response, command line stdout or AWS Lambda event object.
"""

def __init__(self, cors='*'):
self.cors = cors

@property
def config(self):
return dict(cors=self.cors,)

def to_response(self, result, request: flask.Request) -> flask.Response:
"""Converts corresponding data into an HTTP response
:param result: result of user API function
:param request: request object
"""
simple_req = SimpleRequest(headers=request.headers, data=request.get_data())
simple_resp = self.to_batch_response((result,), requests=(simple_req,))[0]
return flask.Response(
response=simple_resp.data,
status=simple_resp.status,
headers=simple_resp.headers,
)

def to_batch_response(
self, result_conc, slices=None, fallbacks=None, requests=None,
) -> Iterable[SimpleResponse]:
"""Converts corresponding data merged by batching service into HTTP responses
:param result_conc: result of user API function
:param slices: auto-batching slices
:param requests: request objects
"""
raise NotImplementedError()

def to_cli(self, result, args):
"""Converts corresponding data into an CLI output.
:param result: result of user API function
:param args: CLI args
"""
raise NotImplementedError()

def to_aws_lambda_event(self, result, event):
"""Converts corresponding data into a Lambda event.
:param result: result of user API function
:param event: input event
"""
raise NotImplementedError

@property
def pip_dependencies(self):
"""
:return: List of PyPI package names required by this OutputAdapter
"""
return []
150 changes: 150 additions & 0 deletions bentoml/handlers/adapter/dataframe_output.py
@@ -0,0 +1,150 @@
# Copyright 2019 Atalaya Tech, Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable

import argparse

from bentoml.exceptions import BentoMLException
from bentoml.marshal.utils import SimpleResponse, SimpleRequest
from bentoml.handlers.adapter.base_output import BaseOutputAdapter

PANDAS_DATAFRAME_TO_DICT_ORIENT_OPTIONS = [
'dict',
'list',
'series',
'split',
'records',
'index',
]


def df_to_json(result, pandas_dataframe_orient="records"):
import pandas as pd

assert (
pandas_dataframe_orient in PANDAS_DATAFRAME_TO_DICT_ORIENT_OPTIONS
), f"unkown pandas dataframe orient '{pandas_dataframe_orient}'"

if isinstance(result, pd.DataFrame):
return result.to_json(orient=pandas_dataframe_orient)

if isinstance(result, pd.Series):
return pd.DataFrame(result).to_dict(orient=pandas_dataframe_orient)
raise BentoMLException("DataframeOutput only accepts pd.Series or pd.DataFrame.")


class DataframeOutput(BaseOutputAdapter):
"""
Converts result of use defined API function into specific output.
Args:
output_orient (str): Prefer json orient format for output result. Default is
records.
cors (str): The value of the Access-Control-Allow-Origin header set in the
AWS Lambda response object. Default is "*". If set to None,
the header will not be set.
"""

def __init__(self, output_orient='records', **kwargs):
super(DataframeOutput, self).__init__(**kwargs)
self.output_orient = output_orient

assert self.output_orient in PANDAS_DATAFRAME_TO_DICT_ORIENT_OPTIONS, (
f"Invalid 'output_orient'='{self.orient}', valid options are "
f"{PANDAS_DATAFRAME_TO_DICT_ORIENT_OPTIONS}"
)

@property
def config(self):
base_config = super(DataframeOutput, self).config
return dict(base_config, output_orient=self.output_orient,)

def to_batch_response(
self,
result_conc,
slices=None,
fallbacks=None,
requests: Iterable[SimpleRequest] = None,
) -> Iterable[SimpleResponse]:
# TODO(bojiang): header content_type

if slices is None:
slices = [i for i, _ in enumerate(result_conc)]
if fallbacks is None:
fallbacks = [None] * len(slices)
responses = [None] * len(slices)

for i, (s, f) in enumerate(zip(slices, fallbacks)):
if s is None:
responses[i] = f
continue
result = result_conc[s]
try:
json_output = df_to_json(
result, pandas_dataframe_orient=self.output_orient
)
responses[i] = SimpleResponse(
200, (("Content-Type", "application/json"),), json_output
)
except AssertionError as e:
responses[i] = SimpleResponse(400, None, str(e))
except Exception as e: # pylint: disable=broad-except
responses[i] = SimpleResponse(500, None, str(e))
return responses

def to_cli(self, result, args):
"""Handles an CLI command call, convert CLI arguments into
corresponding data format that user API function is expecting, and
prints the API function result to console output
:param args: CLI arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument("-o", "--output", default="str", choices=["str", "json"])
parser.add_argument(
"--output_orient",
default=self.output_orient,
choices=PANDAS_DATAFRAME_TO_DICT_ORIENT_OPTIONS,
)
parsed_args = parser.parse_args(args)

if parsed_args.output == 'json':
result = df_to_json(
result, pandas_dataframe_orient=parsed_args.output_orient
)
else:
result = str(result)
print(result)

def to_aws_lambda_event(self, result, event):

result = df_to_json(result, pandas_dataframe_orient=self.output_orient)

# Allow disabling CORS by setting it to None
if self.cors:
return {
"statusCode": 200,
"body": result,
"headers": {"Access-Control-Allow-Origin": self.cors},
}

return {"statusCode": 200, "body": result}

@property
def pip_dependencies(self):
"""
:return: List of PyPI package names required by this BentoHandler
"""
return ['pandas']
106 changes: 106 additions & 0 deletions bentoml/handlers/adapter/default_output.py
@@ -0,0 +1,106 @@
# Copyright 2019 Atalaya Tech, Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable

from bentoml.marshal.utils import SimpleResponse, SimpleRequest
from .base_output import BaseOutputAdapter


def detect_suitable_adapter(result, slices=None):
if slices is not None:
for s in slices:
if s:
return detect_suitable_adapter(result[s])
if isinstance(result, (list, tuple)):
return detect_suitable_adapter(result[0])

try:
import pandas as pd

if isinstance(result, (pd.DataFrame, pd.Series)):
from .dataframe_output import DataframeOutput

return DataframeOutput
except ImportError:
pass

try:
import tensorflow as tf

if isinstance(result, tf.Tensor):
from .tf_tensor_output import TfTensorOutput

return TfTensorOutput
except ImportError:
pass

from .json_output import JsonserializableOutput

return JsonserializableOutput


class DefaultOutput(BaseOutputAdapter):
"""
Detect suitable output adapter automatically and
converts result of use defined API function into specific output.
Args:
cors (str): The value of the Access-Control-Allow-Origin header set in the
AWS Lambda response object. Default is "*". If set to None,
the header will not be set.
"""

def __init__(self, **kwargs):
super(DefaultOutput, self).__init__(**kwargs)
self.actual_adapter = None

def to_batch_response(
self,
result_conc,
slices=None,
fallbacks=None,
requests: Iterable[SimpleRequest] = None,
) -> Iterable[SimpleResponse]:
"""Converts corresponding data merged by batching service into HTTP responses
:param result_conc: result of user API function
:param slices: auto-batching slices
:param requests: request objects
"""
if self.actual_adapter is None:
self.actual_adapter = detect_suitable_adapter(result_conc, slices)()
return self.actual_adapter.to_batch_response(
result_conc, slices, fallbacks, requests
)

def to_cli(self, result, args):
"""Converts corresponding data into an CLI output.
:param result: result of user API function
:param args: CLI args
"""
if self.actual_adapter is None:
self.actual_adapter = detect_suitable_adapter(result)()
return self.actual_adapter.to_cli(result, args)

def to_aws_lambda_event(self, result, event):
"""Converts corresponding data into a Lambda event.
:param result: result of user API function
:param event: input event
"""
if self.actual_adapter is None:
self.actual_adapter = detect_suitable_adapter(result)()
return self.actual_adapter.to_aws_lambda_event(result, event)

0 comments on commit 8d49f11

Please sign in to comment.