diff --git a/docs/source/concepts/runner.rst b/docs/source/concepts/runner.rst
index 8dd0ae1f32c..ce9ffc80950 100644
--- a/docs/source/concepts/runner.rst
+++ b/docs/source/concepts/runner.rst
@@ -365,9 +365,10 @@ To explicitly disable or control adaptive batching behaviors at runtime, configu
             enabled: true
             max_batch_size: 100
             max_latency_ms: 500
-            strategy: intelligent_wait
-            strategy_options:
-              decay: 0.95
+            strategy:
+              name: adaptive
+              options:
+                decay: 0.95
 
     .. tab-item:: Individual Runner
         :sync: individual_runner
@@ -381,9 +382,10 @@ To explicitly disable or control adaptive batching behaviors at runtime, configu
             enabled: true
             max_batch_size: 100
             max_latency_ms: 500
-            strategy: intelligent_wait
-            strategy_options:
-              decay: 0.95
+            strategy:
+              name: adaptive
+              options:
+                decay: 0.95
 
 Resource Allocation
 ^^^^^^^^^^^^^^^^^^^
diff --git a/docs/source/guides/batching.rst b/docs/source/guides/batching.rst
index 15fb55572ac..10822090c74 100644
--- a/docs/source/guides/batching.rst
+++ b/docs/source/guides/batching.rst
@@ -125,9 +125,10 @@ them into a batch before dispatching it to begin execution. There are three opti
         enabled: true
         max_batch_size: 100
         max_latency_ms: 500
-        strategy: target_latency
-        strategy_options:
-          latency_ms: 200
+        strategy:
+          name: target_latency
+          options:
+            latency_ms: 200
 
 Monitoring
 ----------
diff --git a/src/bentoml/_internal/configuration/containers.py b/src/bentoml/_internal/configuration/containers.py
index 12747b4f96a..93bc4973c70 100644
--- a/src/bentoml/_internal/configuration/containers.py
+++ b/src/bentoml/_internal/configuration/containers.py
@@ -150,7 +150,6 @@ def _finalize(self):
             "metrics",
             "traffic",
             "strategy",
-            "strategy_options",
             "workers_per_resource",
         ]
         global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
diff --git a/src/bentoml/_internal/configuration/v1/__init__.py b/src/bentoml/_internal/configuration/v1/__init__.py
index 430c966b35e..9ff05dadf49 100644
--- a/src/bentoml/_internal/configuration/v1/__init__.py
+++ b/src/bentoml/_internal/configuration/v1/__init__.py
@@ -142,16 +142,24 @@
     },
 }
 _RUNNER_CONFIG = {
-    s.Optional("optimizer"): {
-        s.Optional("name"): str,
-        s.Optional("options"): dict,
-    },
+    s.Optional("optimizer"): s.Or(
+        str,
+        {
+            s.Optional("name"): str,
+            s.Optional("options"): dict,
+        },
+    ),
     s.Optional("batching"): {
         s.Optional("enabled"): bool,
         s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
         s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
-        s.Optional("strategy"): str,
-        s.Optional("strategy_options"): dict,
+        s.Optional("strategy"): s.Or(
+            str,
+            {
+                s.Optional("name"): str,
+                s.Optional("options"): dict,
+            },
+        ),
     },
     # NOTE: there is a distinction between being unset and None here; if set to 'None'
     # in configuration for a specific runner, it will override the global configuration.
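
Note (not part of the patch): the schema change above lets `optimizer` and `batching.strategy` validate either as a bare string or as a `name`/`options` mapping, so downstream code has to collapse both spellings into a name plus an options dict, which is what runner.py does further down in this diff. A minimal sketch of that normalization, using a hypothetical `normalize_strategy` helper that is not part of the patch:

    import typing as t

    def normalize_strategy(
        conf: t.Union[str, t.Dict[str, t.Any]]
    ) -> t.Tuple[str, t.Dict[str, t.Any]]:
        # Hypothetical helper: reduce the two accepted config forms
        # to a (name, options) pair.
        if isinstance(conf, str):
            return conf, {}
        return conf["name"], conf.get("options", {})

    # Both spellings of the same configuration normalize identically.
    assert normalize_strategy("adaptive") == ("adaptive", {})
    assert normalize_strategy({"name": "adaptive", "options": {"decay": 0.95}}) == (
        "adaptive",
        {"decay": 0.95},
    )
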
diff --git a/src/bentoml/_internal/configuration/v1/default_configuration.yaml b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
index b5588949966..82cee747bbb 100644
--- a/src/bentoml/_internal/configuration/v1/default_configuration.yaml
+++ b/src/bentoml/_internal/configuration/v1/default_configuration.yaml
@@ -84,9 +84,10 @@ runners:
     # there are currently two available options:
     # - target_latency: attempt to ensure requests are served within a certain amount of time
     # - adaptive: wait a variable amount of time in order to optimize for minimal average latency
-    strategy: adaptive
-    strategy_options:
-      decay: 0.95
+    strategy:
+      name: adaptive
+      options:
+        decay: 0.95
     max_latency_ms: 60000
   logging:
     access:
diff --git a/src/bentoml/_internal/marshal/dispatcher.py b/src/bentoml/_internal/marshal/dispatcher.py
index d8abfca147b..649032728c9 100644
--- a/src/bentoml/_internal/marshal/dispatcher.py
+++ b/src/bentoml/_internal/marshal/dispatcher.py
@@ -15,7 +15,6 @@
 import numpy as np
 
 from ...exceptions import BadInput
-from ..utils import cached_property
 from ..utils.alg import TokenBucket
 
 logger = logging.getLogger(__name__)
@@ -58,6 +57,7 @@ class Optimizer(ABC):
     optimizer_id: str
     n_skipped_sample: int = 0
 
+    @abstractmethod
    def __init__(self, options: dict[str, t.Any]):
         pass
 
@@ -92,6 +92,9 @@ def __init__(self, options: dict[str, t.Any]):
         self.time = options["time_ms"]
 
     def predict(self, batch_size: int):
+        # explicitly unused parameters
+        del batch_size
+
         return self.time
 
 
@@ -171,7 +174,156 @@ def trigger_refresh(self):
 T_OUT = t.TypeVar("T_OUT")
 
 
-class CorkDispatcher:
+BATCHING_STRATEGY_REGISTRY = {}
+
+
+class BatchingStrategy(ABC):
+    strategy_id: str
+
+    @abstractmethod
+    def __init__(self, optimizer: Optimizer, options: dict[t.Any, t.Any]):
+        pass
+
+    @abstractmethod
+    async def batch(
+        self,
+        optimizer: Optimizer,
+        queue: t.Deque[Job],
+        max_latency: float,
+        max_batch_size: int,
+        tick_interval: float,
+        dispatch: t.Callable[[t.Sequence[Job], int], None],
+    ):
+        pass
+
+    def __init_subclass__(cls, strategy_id: str):
+        BATCHING_STRATEGY_REGISTRY[strategy_id] = cls
+        cls.strategy_id = strategy_id
+
+
+class TargetLatencyStrategy(BatchingStrategy, strategy_id="target_latency"):
+    latency: float = 1.0
+
+    def __init__(self, options: dict[t.Any, t.Any]):
+        for key in options:
+            if key == "latency_ms":
+                self.latency = options[key] / 1000.0
+            else:
+                logger.warning(
+                    f"Strategy 'target_latency' ignoring unknown configuration key '{key}'."
+                )
+
+    async def batch(
+        self,
+        optimizer: Optimizer,
+        queue: t.Deque[Job],
+        max_latency: float,
+        max_batch_size: int,
+        tick_interval: float,
+        dispatch: t.Callable[[t.Sequence[Job], int], None],
+    ):
+        # explicitly unused parameters
+        del max_latency
+
+        n = len(queue)
+        now = time.time()
+        w0 = now - queue[0].enqueue_time
+        latency_0 = w0 + optimizer.predict(n)
+
+        while latency_0 < self.latency and n < max_batch_size:
+            n = len(queue)
+            now = time.time()
+            w0 = now - queue[0].enqueue_time
+            latency_0 = w0 + optimizer.predict(n)
+
+            await asyncio.sleep(tick_interval)
+
+        # call
+        n_call_out = 0
+        batch_size = 0
+        for job in queue:
+            if batch_size + job.data.sample.batch_size <= max_batch_size:
+                n_call_out += 1
+                batch_size += job.data.sample.batch_size
+        inputs_info = tuple(queue.pop() for _ in range(n_call_out))
+        dispatch(inputs_info, batch_size)
+
+
+class AdaptiveStrategy(BatchingStrategy, strategy_id="adaptive"):
+    decay: float = 0.95
+
+    n_kept_samples = 50
+    avg_wait_times: collections.deque[float]
+    avg_req_wait: float = 0
+
+    def __init__(self, options: dict[t.Any, t.Any]):
+        for key in options:
+            if key == "decay":
+                self.decay = options[key]
+            elif key == "n_kept_samples":
+                self.n_kept_samples = options[key]
+            else:
+                logger.warning(
+                    "Strategy 'adaptive' ignoring unknown configuration value"
+                )
+
+        self.avg_wait_times = collections.deque(maxlen=self.n_kept_samples)
+
+    async def batch(
+        self,
+        optimizer: Optimizer,
+        queue: t.Deque[Job],
+        max_latency: float,
+        max_batch_size: int,
+        tick_interval: float,
+        dispatch: t.Callable[[t.Sequence[Job], int], None],
+    ):
+        n = len(queue)
+        now = time.time()
+        w0 = now - queue[0].enqueue_time
+        wn = now - queue[-1].enqueue_time
+        latency_0 = w0 + optimizer.predict(n)
+        while (
+            # if we don't already have enough requests,
+            n < max_batch_size
+            # we are not about to cancel the first request,
+            and latency_0 + tick_interval <= max_latency * 0.95
+            # and waiting will cause average latency to decrease
+            and n * (wn + tick_interval + optimizer.predict_diff(n, n + 1))
+            <= self.avg_req_wait * self.decay
+        ):
+            n = len(queue)
+            now = time.time()
+            w0 = now - queue[0].enqueue_time
+            latency_0 = w0 + optimizer.predict(n)
+
+            # wait for additional requests to arrive
+            await asyncio.sleep(tick_interval)
+
+        # dispatch the batch
+        inputs_info: list[Job] = []
+        n_call_out = 0
+        batch_size = 0
+        for job in queue:
+            if batch_size + job.data.sample.batch_size <= max_batch_size:
+                n_call_out += 1
+
+        for _ in range(n_call_out):
+            job = queue.pop()
+            batch_size += job.data.sample.batch_size
+            new_wait = (now - job.enqueue_time) / self.n_kept_samples
+            if len(self.avg_wait_times) == self.n_kept_samples:
+                oldest_wait = self.avg_wait_times.popleft()
+                self.avg_req_wait = self.avg_req_wait - oldest_wait + new_wait
+            else:
+                # avg deliberately undercounts until we hit n_kept_samples for simplicity
+                self.avg_req_wait += new_wait
+            inputs_info.append(job)
+
+        dispatch(inputs_info, batch_size)
+
+
+class Dispatcher:
     """
     A decorator that:
         * wrap batch function
@@ -179,6 +331,8 @@ class CorkDispatcher:
     The wrapped function should be an async function.
""" + background_tasks: set[asyncio.Task[None]] = set() + def __init__( self, max_latency_in_ms: int, @@ -296,15 +450,18 @@ async def train_optimizer( else: n_call_out = 0 batch_size = 0 - for job in queue: - if batch_size + job.data.sample.batch_size <= max_batch_size: + for job in self._queue: + if ( + batch_size + job.data.sample.batch_size + <= self.max_batch_size + ): n_call_out += 1 batch_size += job.data.sample.batch_size req_count += 1 # call inputs_info = tuple(self._queue.pop() for _ in range(n_call_out)) - dispatch(inputs_info, batch_size) + self._dispatch(inputs_info, batch_size) except Exception as e: # pylint: disable=broad-except logger.error(traceback.format_exc(), exc_info=e) diff --git a/src/bentoml/_internal/models/model.py b/src/bentoml/_internal/models/model.py index 78c1a72f9f0..443339cf25e 100644 --- a/src/bentoml/_internal/models/model.py +++ b/src/bentoml/_internal/models/model.py @@ -40,6 +40,8 @@ from ..utils import normalize_labels_value if t.TYPE_CHECKING: + from ..marshal.dispatcher import BatchingStrategy + from ..marshal.dispatcher import Optimizer from ..runner import Runnable from ..runner import Runner from ..runner.strategy import Strategy @@ -319,6 +321,7 @@ def to_runner( name: str = "", max_batch_size: int | None = None, max_latency_ms: int | None = None, + optimizer: Optimizer | None = None, batching_strategy: BatchingStrategy | None = None, method_configs: dict[str, dict[str, int]] | None = None, embedded: bool = False, @@ -356,6 +359,7 @@ def to_runner( models=[self], max_batch_size=max_batch_size, max_latency_ms=max_latency_ms, + optimizer=optimizer, batching_strategy=batching_strategy, method_configs=method_configs, embedded=embedded, diff --git a/src/bentoml/_internal/runner/runner.py b/src/bentoml/_internal/runner/runner.py index 91e92822606..1901b0bd1ee 100644 --- a/src/bentoml/_internal/runner/runner.py +++ b/src/bentoml/_internal/runner/runner.py @@ -10,6 +10,7 @@ from simple_di import Provide from simple_di import inject +from ...exceptions import BentoMLConfigException from ...exceptions import StateException from ..configuration.containers import BentoMLContainer from ..marshal.dispatcher import BATCHING_STRATEGY_REGISTRY @@ -253,32 +254,50 @@ def __init__( "batching_strategy" ) - if config["batching"]["strategy"] not in BATCHING_STRATEGY_REGISTRY: - raise BentoMLConfigException( - f"Unknown batching strategy '{config['batching']['strategy']}'. Available strategies are: {','.join(BATCHING_STRATEGY_REGISTRY.keys())}.", - ) + optimizer_conf = config["optimizer"] + if isinstance(optimizer_conf, str): + optimizer_name = optimizer_conf + optimizer_opts = {} + else: + optimizer_name = optimizer_conf["name"] + optimizer_opts = optimizer_conf["options"] - if config["optimizer"]["name"] not in OPTIMIZER_REGISTRY: + if optimizer_name not in OPTIMIZER_REGISTRY: raise BentoMLConfigException( - f"Unknown optimizer '{config['optimizer']['name']}'. Available optimizers are: {','.join(OPTIMIZER_REGISTRY.keys())}." + f"Unknown optimizer '{optimizer_name}'. Available optimizers are: {','.join(OPTIMIZER_REGISTRY.keys())}." ) try: - default_optimizer = OPTIMIZER_REGISTRY[config["optimizer"]["name"]]( - config["optimizer"]["options"] - ) + default_optimizer = OPTIMIZER_REGISTRY[optimizer_name](optimizer_opts) except Exception as e: raise BentoMLConfigException( - f"Initializing strategy '{config['optimizer']['name']}' with configured options ({pprint(config['optimizer']['options'])}) failed." 
+                f"Initializing strategy '{optimizer_name}' with configured options ({pprint(optimizer_opts)}) failed."
             ) from e
 
+        strategy_conf = config["batching"]["strategy"]
+        if isinstance(strategy_conf, str):
+            strategy_name = strategy_conf
+            strategy_opts = {}
+        else:
+            strategy_name = strategy_conf["name"]
+            strategy_opts = strategy_conf["options"]
+
+        if strategy_name not in BATCHING_STRATEGY_REGISTRY:
+            raise BentoMLConfigException(
+                f"Unknown batching strategy '{strategy_name}'. Available strategies are: {','.join(BATCHING_STRATEGY_REGISTRY.keys())}.",
+            )
+
         try:
-            default_batching_strategy = BATCHING_STRATEGY_REGISTRY[
-                config["batching"]["strategy"]
-            ](config["batching"]["strategy_options"])
+            default_batching_strategy = BATCHING_STRATEGY_REGISTRY[strategy_name](
+                strategy_opts
+            )
         except Exception as e:
             raise BentoMLConfigException(
-                f"Initializing strategy '{config['batching']['strategy']}' with configured options ({pprint(config['batching']['strategy_options'])}) failed."
+                f"Initializing strategy '{strategy_name}' with configured options ({pprint(strategy_opts)}) failed."
             ) from e
 
         runner_method_map[method_name] = RunnerMethod(
diff --git a/src/bentoml/_internal/server/runner_app.py b/src/bentoml/_internal/server/runner_app.py
index 8e33a0ae863..3a86a3ad03a 100644
--- a/src/bentoml/_internal/server/runner_app.py
+++ b/src/bentoml/_internal/server/runner_app.py
@@ -4,6 +4,7 @@
 import json
 import logging
 import pickle
+import traceback
 import typing as t
 from typing import TYPE_CHECKING
 
@@ -16,6 +17,7 @@
 from ..configuration.containers import BentoMLContainer
 from ..context import component_context
 from ..context import trace_context
+from ..marshal.dispatcher import Dispatcher
 from ..runner.container import AutoContainer
 from ..runner.container import Payload
 from ..runner.utils import PAYLOAD_META_HEADER
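
Note (not part of the patch): the registration mechanism added in dispatcher.py works through `BatchingStrategy.__init_subclass__`, which records every subclass in `BATCHING_STRATEGY_REGISTRY` under its `strategy_id`; runner.py then resolves the configured strategy name against that same registry. A minimal sketch, assuming this branch and a hypothetical `fixed_wait` strategy that is not shipped in the diff:

    import typing as t

    from bentoml._internal.marshal.dispatcher import (
        BATCHING_STRATEGY_REGISTRY,
        BatchingStrategy,
    )

    class FixedWaitStrategy(BatchingStrategy, strategy_id="fixed_wait"):
        # Hypothetical example: wait a fixed amount of time before dispatching.
        wait_ms: float = 10.0

        def __init__(self, options: t.Dict[str, t.Any]):
            # Mirrors the built-in strategies: consume known options, ignore the rest.
            self.wait_ms = options.get("wait_ms", self.wait_ms)

        async def batch(
            self, optimizer, queue, max_latency, max_batch_size, tick_interval, dispatch
        ):
            # A real implementation would sleep, size the batch, and call dispatch().
            ...

    # Defining the class is enough; no explicit registration call is needed.
    assert BATCHING_STRATEGY_REGISTRY["fixed_wait"] is FixedWaitStrategy
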