[Refactor] fork server processes earlier #1476

Merged (10 commits) on Mar 10, 2021

Changes from 7 commits
55 changes: 24 additions & 31 deletions bentoml/cli/bento_service.py
@@ -1,34 +1,32 @@
import argparse
import click
import sys

import json
import re
import sys

import click
import psutil

from bentoml import __version__
from bentoml.configuration import BENTOML_CONFIG
from bentoml.configuration.containers import BentoMLConfiguration, BentoMLContainer
from bentoml.utils.lazy_loader import LazyLoader
from bentoml.server import start_dev_server, start_prod_server
from bentoml.server.open_api import get_open_api_spec_json
from bentoml.utils import (
    ProtoMessageToDict,
    resolve_bundle_path,
)
from bentoml.cli.click_utils import (
    CLI_COLOR_SUCCESS,
    _echo,
    BentoMLCommandGroup,
    _echo,
    conditional_argument,
)
from bentoml.cli.utils import Spinner
from bentoml.configuration import BENTOML_CONFIG
from bentoml.configuration.containers import BentoMLConfiguration, BentoMLContainer
from bentoml.entrypoint import start_prod_server
from bentoml.saved_bundle import (
    load_from_dir,
    load_bento_service_api,
    load_bento_service_metadata,
    load_from_dir,
)
from bentoml.server import start_dev_server
from bentoml.server.open_api import get_open_api_spec_json
from bentoml.utils import ProtoMessageToDict, resolve_bundle_path
from bentoml.utils.docker_utils import validate_tag
from bentoml.utils.lazy_loader import LazyLoader
from bentoml.yatai.client import get_yatai_client

try:
@@ -310,23 +308,18 @@ def serve_gunicorn(
        bento, pip_installed_bundle_path, yatai_url
    )

    container = BentoMLContainer()
    config = BentoMLConfiguration(override_config_file=config)
    config.override(["api_server", "port"], port)
    config.override(["api_server", "workers"], workers)
    config.override(["api_server", "timeout"], timeout)
    config.override(["api_server", "enable_microbatch"], enable_microbatch)
    config.override(["api_server", "enable_swagger"], enable_swagger)
    config.override(["marshal_server", "max_batch_size"], mb_max_batch_size)
    config.override(["marshal_server", "max_latency"], mb_max_latency)
    config.override(["marshal_server", "workers"], microbatch_workers)
    container.config.from_dict(config.as_dict())

    from bentoml import marshal, server

    container.wire(packages=[marshal, server])

    start_prod_server(saved_bundle_path)
    start_prod_server(
        saved_bundle_path,
        port=port,
        workers=workers,
        timeout=timeout,
        enable_microbatch=enable_microbatch,
        enable_swagger=enable_swagger,
        mb_max_batch_size=mb_max_batch_size,
        mb_max_latency=mb_max_latency,
        microbatch_workers=microbatch_workers,
        config_file=config,
    )

@bentoml_cli.command(
help="Install shell command completion",
3 changes: 2 additions & 1 deletion bentoml/configuration/containers.py
@@ -18,7 +18,7 @@

from deepmerge import always_merger
from dependency_injector import containers, providers
from schema import Schema, SchemaError, And, Or
from schema import And, Or, Schema, SchemaError

from bentoml.configuration import config
from bentoml.exceptions import BentoMLConfigException
@@ -48,6 +48,7 @@
"yatai": {"url": Or(str, None)},
"tracing": {"zipkin_api_url": Or(str, None)},
"instrument": {"namespace": str},
"logging": {"level": str},
}
)

3 changes: 3 additions & 0 deletions bentoml/configuration/default_bentoml.yml
@@ -40,3 +40,6 @@ tracing:

instrument:
  namespace: BENTOML

logging:
  level: INFO
133 changes: 133 additions & 0 deletions bentoml/entrypoint/__init__.py
@@ -0,0 +1,133 @@
# Copyright 2019 Atalaya Tech, Inc.
Collaborator: What do you envision will be added under the /entrypoint package? What do we gain by moving it to a new package?

Member Author: The previous location was /bentoml/server/__init__.py. I don't think that is a proper place to wire packages when the wiring includes bentoml.server itself.

Collaborator: It should be fine to wire one's own package. Taking a step back, we should think about how we'd like to expose our public APIs. Having a module like BentoMLController is one choice. What are some of the best practices in Python?

Member: @bojiang @ssheng I think what @bojiang suggested last time, having an /api/ module for exposing public APIs, is probably the convention among Python-based DS/ML tools:

/api
├── server.py
├── __init__.py
├── yatai.py
├── bundle.py

Collaborator: Discussed offline, we will keep these in the original location under server/__init__.py.


# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import multiprocessing
from typing import Optional

from bentoml.configuration.containers import BentoMLConfiguration, BentoMLContainer
from bentoml.utils import reserve_free_port

logger = logging.getLogger(__name__)


def start_prod_server(
    saved_bundle_path: str,
    port: Optional[int] = None,
    workers: Optional[int] = None,
    timeout: Optional[int] = None,
    enable_microbatch: Optional[bool] = None,
    enable_swagger: Optional[bool] = None,
    mb_max_batch_size: Optional[int] = None,
    mb_max_latency: Optional[int] = None,
    microbatch_workers: Optional[int] = None,
    config_file: Optional[str] = None,
):
    import psutil

    assert (
        psutil.POSIX
    ), "BentoML API Server production mode only supports POSIX platforms"

    config = BentoMLConfiguration(override_config_file=config_file)
    config.override(["api_server", "port"], port)
    config.override(["api_server", "workers"], workers)
    config.override(["api_server", "timeout"], timeout)
    config.override(["api_server", "enable_microbatch"], enable_microbatch)
    config.override(["api_server", "enable_swagger"], enable_swagger)
    config.override(["marshal_server", "max_batch_size"], mb_max_batch_size)
    config.override(["marshal_server", "max_latency"], mb_max_latency)
    config.override(["marshal_server", "workers"], microbatch_workers)

    if config.config['api_server'].get('enable_microbatch'):
        prometheus_lock = multiprocessing.Lock()
        with reserve_free_port() as api_server_port:
            pass
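        # reserve_free_port binds a socket just long enough to pick an unused port
        # number and releases it on exit (based on how the helper is used elsewhere
        # in BentoML); only the number is handed to the forked processes below,
        # which bind it themselves.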

        model_server_job = multiprocessing.Process(
            target=_start_prod_server,
            kwargs=dict(
                saved_bundle_path=saved_bundle_path,
                port=api_server_port,
                config=config,
                prometheus_lock=prometheus_lock,
            ),
            daemon=True,
        )
        model_server_job.start()

        try:
            _start_prod_batching_server(
                saved_bundle_path=saved_bundle_path,
                config=config,
                api_server_port=api_server_port,
                prometheus_lock=prometheus_lock,
            )
        finally:
            model_server_job.terminate()
    else:
        _start_prod_server(saved_bundle_path=saved_bundle_path, config=config)


def _start_prod_server(
    saved_bundle_path: str,
    config: BentoMLConfiguration,
    port: Optional[int] = None,
Collaborator: Why do we need to explicitly have port here instead of taking it from config?

Member Author (@bojiang, Mar 5, 2021): I do not want to change the value of config.api_server.port. Seeing the batching app and the model app together as the whole API server, it makes sense to keep config.api_server rather than overriding it with the randomly picked port.

Collaborator (@ssheng, Mar 6, 2021): I understand the concern. This is why we should restructure the config keys and terminology. Before we do that, I think there is an opportunity here to make the intermediate port injectable as well. First, we can introduce an intermediate port (for lack of a better name):

api_server:
    marshal:
        intermediate_port: Null # or an actual port e.g. 6000

In the container, we can introduce a provider that first checks whether an intermediate port is defined and, if not, reserves a random free port.

intermediate_port = providers.Callable(
    lambda port: port if port else reserve_free_port(),
    config.marshal.intermediate_port,
)

Basically, a lot of the logic here can be refactored as a provider in containers.py.

Member Author (@bojiang, Mar 6, 2021): It's great to have a provider for the intermediate_port. But we have to wire after creating the new process. In your solution, reserve_free_port would be called twice and give intermediate_port a different value in each process.

Collaborator: Good call that reserve_free_port might be called twice, but it should be a solvable problem. We can use a singleton provider for creating the intermediate port, so all users of the provider get the same port. We will need to move the container creation out, however. Happy to discuss with you over a Zoom call.
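
A minimal sketch of that idea (not part of this PR), assuming dependency_injector's providers.Singleton and the reserve_free_port helper already used above; ExampleContainer and _pick_intermediate_port are illustrative names only:

from dependency_injector import containers, providers

from bentoml.utils import reserve_free_port


def _pick_intermediate_port(configured_port):
    # Use the configured port when given; otherwise reserve a free one.
    if configured_port:
        return configured_port
    with reserve_free_port() as port:
        return port


class ExampleContainer(containers.DeclarativeContainer):
    config = providers.Configuration()

    # Singleton: every consumer in this process resolves to the same port.
    intermediate_port = providers.Singleton(
        _pick_intermediate_port, config.marshal.intermediate_port,
    )

Since a Singleton is only shared within one process, the container creation would indeed have to move out before the fork for both processes to agree on the port, as noted above.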

Collaborator: UDS is superior, but we might have to leave the TCP option open to make remote marshaling a possibility. We can structure the config meaningfully to reflect this.

Member Author (quoting the above): "to leave the TCP option open to make remote marshaling a possibility. We can structure the config meaningfully to reflect this."

api_server:
  port: 5000
  enable_microbatch: False
  run_with_ngrok: False
  enable_swagger: True
  enable_metrics: True
  enable_feedback: True
  max_request_size: 20971520
  workers: 1
  timeout: 60

model_server:
  port: Null  # default: api_server.port when enable_marshal=False

marshal_server:
  port: Null  # default: api_server.port when enable_marshal=True


Like this?

Collaborator (@ssheng, Mar 8, 2021): I'm thinking of something like the following, where the user can choose the connector type and provide the related configs. Basically, the schema for connector is a union of the UDS and TCP connector schemas.

If UDS is chosen,

api_server:
  marshal:
    connector:
      type: UDS
      uds_related_key1: value1
      uds_related_key2: value2

Or, if TCP is chosen,

api_server:
  marshal:
    connector:
      type: TCP
      port: None # or some configured port
      tcp_related_key1: value1
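
A rough sketch of how that union could be expressed with the schema package already used in bentoml/configuration/containers.py; the key names below just mirror the placeholders above and are not real config keys:

from schema import Or, Schema

UDS_CONNECTOR = Schema({"type": "UDS", "path": str})
TCP_CONNECTOR = Schema({"type": "TCP", "port": Or(int, None)})

# The connector entry accepts either shape.
CONNECTOR_SCHEMA = Or(UDS_CONNECTOR, TCP_CONNECTOR)

CONNECTOR_SCHEMA.validate({"type": "UDS", "path": "/tmp/bentoml-marshal.sock"})
CONNECTOR_SCHEMA.validate({"type": "TCP", "port": 6000})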

Member Author: For UDS support, I'd like to have a simple host field in URI format, just like gunicorn and Nginx do:

  • 127.0.0.1:5000
  • unix:/tmp/gunicorn.sock

Your schema is fancy but looks too powerful for me.
I think you can draft a new PR to demonstrate your idea.
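
A hypothetical helper (not part of this PR) showing how such a single host field could be mapped onto the aiohttp connectors that marshal.py already branches between; the function name and parsing are illustrative only:

import aiohttp


def connector_from_host(host: str) -> aiohttp.BaseConnector:
    # "unix:/tmp/gunicorn.sock" -> Unix domain socket connector
    if host.startswith("unix:"):
        return aiohttp.UnixConnector(path=host[len("unix:"):])
    # "127.0.0.1:5000" -> plain TCP; the host:port part stays in the request URL
    return aiohttp.TCPConnector(limit=30)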

Member Author: After all, the config structure reflects our system design. We can continue the discussion in the new PR/issue.

    prometheus_lock: Optional[multiprocessing.Lock] = None,
):

logger.info("Starting BentoML API server in production mode..")

container = BentoMLContainer()
container.config.from_dict(config.as_dict())

from bentoml import server

container.wire(packages=[server])

if port is None:
gunicorn_app = server.gunicorn_server.GunicornBentoServer(
saved_bundle_path, prometheus_lock=prometheus_lock,
)
else:
gunicorn_app = server.gunicorn_server.GunicornBentoServer(
saved_bundle_path, port=port, prometheus_lock=prometheus_lock,
)
gunicorn_app.run()


def _start_prod_batching_server(
    saved_bundle_path: str,
    api_server_port: int,
Collaborator: Same question here. Why do we need to explicitly have api_server_port here instead of taking it from config?

Member Author: Same as above.

    config: BentoMLConfiguration,
    prometheus_lock: Optional[multiprocessing.Lock] = None,
):

logger.info("Starting BentoML Batching server in production mode..")

container = BentoMLContainer()
container.config.from_dict(config.as_dict())

from bentoml import marshal, server

container.wire(packages=[server, marshal])

# avoid load model before gunicorn fork
marshal_server = server.marshal_server.GunicornMarshalServer(
bundle_path=saved_bundle_path,
prometheus_lock=prometheus_lock,
outbound_host="localhost",
outbound_port=api_server_port,
)
marshal_server.run()
32 changes: 26 additions & 6 deletions bentoml/marshal/marshal.py
@@ -150,7 +150,9 @@ def __init__(
            BentoMLContainer.config.api_server.max_request_size
        ],
        zipkin_api_url: str = Provide[BentoMLContainer.config.tracing.zipkin_api_url],
        outbound_unix_socket: str = None,
    ):
        self.outbound_unix_socket = outbound_unix_socket
        self.outbound_host = outbound_host
        self.outbound_port = outbound_port
        self.outbound_workers = outbound_workers
@@ -178,6 +180,7 @@ def __init__(
"or launch more microbatch instances to accept more concurrent connection.",
self.CONNECTION_LIMIT,
)
self._client = None
Collaborator: We probably don't need to lazily initialize the client here. The client is pretty much always needed, correct?

Member Author (@bojiang, Mar 6, 2021), quoting "Client is pretty much always needed, correct?": Yeah. But IMO we'd best only do value assignments in __init__. In addition, in this case, an aiohttp client session should be initialized with a running asyncio event loop. See aio-libs/aiohttp#3331.
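
A minimal illustration of that point (not from this PR; the URL is made up): the ClientSession is constructed inside a coroutine, so a running event loop exists when it is created.

import asyncio

import aiohttp


async def main():
    # Constructed while the event loop is running, as aiohttp expects.
    async with aiohttp.ClientSession() as session:
        async with session.get("http://127.0.0.1:5000/healthz") as resp:
            print(resp.status)


asyncio.run(main())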


    def set_outbound_port(self, outbound_port):
        self.outbound_port = outbound_port
@@ -187,6 +190,22 @@ def fetch_sema(self):
        self._outbound_sema = NonBlockSema(self.outbound_workers)
        return self._outbound_sema

    def get_client(self):
        if self._client is None:
            jar = aiohttp.DummyCookieJar()
            if self.outbound_unix_socket:
                conn = aiohttp.UnixConnector(path=self.outbound_unix_socket,)
Collaborator: +1 for UDS.

            else:
                conn = aiohttp.TCPConnector(limit=30)
            self._client = aiohttp.ClientSession(
                connector=conn, auto_decompress=False, cookie_jar=jar,
            )
        return self._client

    def __del__(self):
        if self._client is not None and not self._client.closed:
            self._client.close()

    def add_batch_handler(self, api_route, max_latency, max_batch_size):
        '''
        Params:
@@ -268,11 +287,14 @@ async def relay_handler(self, request):
span_name=f"[2]{url.path} relay",
) as trace_ctx:
headers.update(make_http_headers(trace_ctx))
async with aiohttp.ClientSession(auto_decompress=False) as client:
try:
client = self.get_client()
async with client.request(
request.method, url, data=data, headers=request.headers
) as resp:
body = await resp.read()
except aiohttp.client_exceptions.ClientConnectionError:
return aiohttp.web.Response(status=503, body=b"Service Unavailable")
return aiohttp.web.Response(
status=resp.status, body=body, headers=resp.headers,
)
@@ -298,11 +320,9 @@ async def _batch_handler_template(self, requests, api_route):
headers.update(make_http_headers(trace_ctx))
reqs_s = DataLoader.merge_requests(requests)
try:
async with aiohttp.ClientSession(auto_decompress=False) as client:
async with client.post(
api_url, data=reqs_s, headers=headers
) as resp:
raw = await resp.read()
client = self.get_client()
async with client.post(api_url, data=reqs_s, headers=headers) as resp:
raw = await resp.read()
except aiohttp.client_exceptions.ClientConnectionError as e:
raise RemoteException(
e, payload=HTTPResponse(status=503, body=b"Service Unavailable")