minor fixes
sauyon committed Sep 20, 2023
1 parent 28d209a commit 56088fe
Showing 9 changed files with 231 additions and 38 deletions.
14 changes: 8 additions & 6 deletions docs/source/concepts/runner.rst
@@ -365,9 +365,10 @@ To explicitly disable or control adaptive batching behaviors at runtime, configu
enabled: true
max_batch_size: 100
max_latency_ms: 500
-strategy: intelligent_wait
-strategy_options:
-  decay: 0.95
+strategy:
+  name: intelligent_wait
+  options:
+    decay: 0.95
.. tab-item:: Individual Runner
:sync: individual_runner
@@ -381,9 +382,10 @@ To explicitly disable or control adaptive batching behaviors at runtime, configu
enabled: true
max_batch_size: 100
max_latency_ms: 500
-strategy: intelligent_wait
-strategy_options:
-  decay: 0.95
+strategy:
+  name: intelligent_wait
+  options:
+    decay: 0.95
Resource Allocation
^^^^^^^^^^^^^^^^^^^
7 changes: 4 additions & 3 deletions docs/source/guides/batching.rst
@@ -125,9 +125,10 @@ them into a batch before dispatching it to begin execution. There are three opti
enabled: true
max_batch_size: 100
max_latency_ms: 500
-strategy: target_latency
-strategy_options:
-  latency_ms: 200
+strategy:
+  name: target_latency
+  options:
+    latency_ms: 200
Monitoring
----------
1 change: 0 additions & 1 deletion src/bentoml/_internal/configuration/containers.py
@@ -150,7 +150,6 @@ def _finalize(self):
"metrics",
"traffic",
"strategy",
"strategy_options",
"workers_per_resource",
]
global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
20 changes: 14 additions & 6 deletions src/bentoml/_internal/configuration/v1/__init__.py
@@ -142,16 +142,24 @@
},
}
_RUNNER_CONFIG = {
s.Optional("optimizer"): {
s.Optional("name"): str,
s.Optional("options"): dict,
},
s.Optional("optimizer"): s.Or(
str,
{
s.Optional("name"): str,
s.Optional("options"): dict,
},
),
s.Optional("batching"): {
s.Optional("enabled"): bool,
s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
s.Optional("strategy"): str,
s.Optional("strategy_options"): dict,
s.Optional("strategy"): s.Or(
str,
{
s.Optional("name"): str,
s.Optional("options"): dict,
},
),
},
# NOTE: there is a distinction between being unset and None here; if set to 'None'
# in configuration for a specific runner, it will override the global configuration.
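A quick sanity check, not part of this commit, of what the reworked schema now accepts: both the shorthand string form and the expanded name/options mapping should validate. The sketch below assumes the standalone `schema` package, which this module uses under the alias `s`.

import schema as s

# Mirror of the new "strategy" entry: either a bare strategy name or a mapping.
strategy_schema = s.Schema(
    s.Or(
        str,
        {
            s.Optional("name"): str,
            s.Optional("options"): dict,
        },
    )
)

strategy_schema.validate("adaptive")  # shorthand form
strategy_schema.validate({"name": "adaptive", "options": {"decay": 0.95}})  # expanded form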
@@ -84,9 +84,10 @@ runners:
# there are currently two available options:
# - target_latency: attempt to ensure requests are served within a certain amount of time
# - adaptive: wait a variable amount of time in order to optimize for minimal average latency
-strategy: adaptive
-strategy_options:
-  decay: 0.95
+strategy:
+  name: adaptive
+  options:
+    decay: 0.95
max_latency_ms: 60000
logging:
access:
167 changes: 162 additions & 5 deletions src/bentoml/_internal/marshal/dispatcher.py
@@ -15,7 +15,6 @@
import numpy as np

from ...exceptions import BadInput
from ..utils import cached_property
from ..utils.alg import TokenBucket

logger = logging.getLogger(__name__)
@@ -58,6 +57,7 @@ class Optimizer(ABC):
optimizer_id: str
n_skipped_sample: int = 0

@abstractmethod
def __init__(self, options: dict[str, t.Any]):
pass

@@ -92,6 +92,9 @@ def __init__(self, options: dict[str, t.Any]):
self.time = options["time_ms"]

def predict(self, batch_size: int):
# explicitly unused parameters
del batch_size

return self.time


@@ -171,14 +174,165 @@ def trigger_refresh(self):
T_OUT = t.TypeVar("T_OUT")


class CorkDispatcher:
BATCHING_STRATEGY_REGISTRY = {}


class BatchingStrategy(ABC):
strategy_id: str

@abstractmethod
def __init__(self, optimizer: Optimizer, options: dict[t.Any, t.Any]):
pass

@abstractmethod
async def batch(
self,
optimizer: Optimizer,
queue: t.Deque[Job],
max_latency: float,
max_batch_size: int,
tick_interval: float,
dispatch: t.Callable[[t.Sequence[Job], int], None],
):
pass

def __init_subclass__(cls, strategy_id: str):
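# each concrete subclass registers itself in BATCHING_STRATEGY_REGISTRY under its strategy_id at class-definition time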
BATCHING_STRATEGY_REGISTRY[strategy_id] = cls
cls.strategy_id = strategy_id


class TargetLatencyStrategy(BatchingStrategy, strategy_id="target_latency"):
latency: float = 1.0

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "latency":
self.latency = options[key] / 1000.0
else:
logger.warning(
f"Strategy 'target_latency' ignoring unknown configuration key '{key}'."
)

async def batch(
self,
optimizer: Optimizer,
queue: t.Deque[Job],
max_latency: float,
max_batch_size: int,
tick_interval: float,
dispatch: t.Callable[[t.Sequence[Job], int], None],
):
# explicitly unused parameters
del max_latency

n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.predict(n)
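# keep waiting for more requests while the oldest request is still predicted
# to finish within the target latency and the batch has room to grow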

while latency_0 < self.latency and n < max_batch_size:
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.predict(n)

await asyncio.sleep(tick_interval)

# call
n_call_out = 0
batch_size = 0
for job in queue:
if batch_size + job.data.sample.batch_size <= max_batch_size:
n_call_out += 1
batch_size += job.data.sample.batch_size
inputs_info = tuple(queue.pop() for _ in range(n_call_out))
dispatch(inputs_info, batch_size)


class AdaptiveStrategy(BatchingStrategy, strategy_id="adaptive"):
decay: float = 0.95

n_kept_samples = 50
avg_wait_times: collections.deque[float]
avg_req_wait: float = 0

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "decay":
self.decay = options[key]
elif key == "n_kept_samples":
self.n_kept_samples = options[key]
else:
logger.warning(
f"Strategy 'adaptive' ignoring unknown configuration key '{key}'."
)

self.avg_wait_times = collections.deque(maxlen=self.n_kept_samples)

async def batch(
self,
optimizer: Optimizer,
queue: t.Deque[Job],
max_latency: float,
max_batch_size: int,
tick_interval: float,
dispatch: t.Callable[[t.Sequence[Job], int], None],
):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
wn = now - queue[-1].enqueue_time
latency_0 = w0 + optimizer.predict(n)
while (
# if we don't already have enough requests,
n < max_batch_size
# we are not about to cancel the first request,
and latency_0 + tick_interval <= max_latency * 0.95
# and waiting will cause average latency to decrease
and n * (wn + tick_interval + optimizer.predict_diff(n, n + 1))
<= self.avg_req_wait * self.decay
):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.predict(n)

# wait for additional requests to arrive
await asyncio.sleep(tick_interval)

# dispatch the batch
inputs_info: list[Job] = []
n_call_out = 0
batch_size = 0
for job in queue:
if batch_size + job.data.sample.batch_size <= max_batch_size:
n_call_out += 1

for _ in range(n_call_out):
job = queue.pop()
batch_size += job.data.sample.batch_size
new_wait = (now - job.enqueue_time) / self.n_kept_samples
if len(self.avg_wait_times) == self.n_kept_samples:
oldest_wait = self.avg_wait_times.popleft()
self.avg_req_wait = self.avg_req_wait - oldest_wait + new_wait
else:
# avg deliberately undercounts until we hit n_kept_samples, for simplicity
self.avg_req_wait += new_wait
inputs_info.append(job)

dispatch(inputs_info, batch_size)


class Dispatcher:
"""
A decorator that:
* wraps the batch function
* implements the CORK algorithm to cork & release calls to the wrapped function
The wrapped function should be an async function.
"""

background_tasks: set[asyncio.Task[None]] = set()

def __init__(
self,
max_latency_in_ms: int,
@@ -296,15 +450,18 @@ async def train_optimizer(
else:
n_call_out = 0
batch_size = 0
for job in queue:
if batch_size + job.data.sample.batch_size <= max_batch_size:
for job in self._queue:
if (
batch_size + job.data.sample.batch_size
<= self.max_batch_size
):
n_call_out += 1
batch_size += job.data.sample.batch_size

req_count += 1
# call
inputs_info = tuple(self._queue.pop() for _ in range(n_call_out))
dispatch(inputs_info, batch_size)
self._dispatch(inputs_info, batch_size)
except Exception as e: # pylint: disable=broad-except
logger.error(traceback.format_exc(), exc_info=e)
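Because __init_subclass__ keys every strategy class by its strategy_id, resolving a configured strategy reduces to a dictionary lookup. A rough sketch, not part of this diff (the helper name strategy_from_config and the "adaptive" fallback are hypothetical), matching the concrete __init__(options) signatures shown above:

from __future__ import annotations

import typing as t

from bentoml._internal.marshal.dispatcher import BATCHING_STRATEGY_REGISTRY
from bentoml._internal.marshal.dispatcher import BatchingStrategy


def strategy_from_config(cfg: str | dict[str, t.Any]) -> BatchingStrategy:
    # Accept both config forms: a bare name, or a {"name": ..., "options": ...} mapping.
    if isinstance(cfg, str):
        name, options = cfg, {}
    else:
        name, options = cfg.get("name", "adaptive"), cfg.get("options", {})
    # BATCHING_STRATEGY_REGISTRY is populated by BatchingStrategy.__init_subclass__.
    return BATCHING_STRATEGY_REGISTRY[name](options)


adaptive = strategy_from_config({"name": "adaptive", "options": {"decay": 0.9}})
target = strategy_from_config("target_latency")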

4 changes: 4 additions & 0 deletions src/bentoml/_internal/models/model.py
@@ -40,6 +40,8 @@
from ..utils import normalize_labels_value

if t.TYPE_CHECKING:
from ..marshal.dispatcher import BatchingStrategy
from ..marshal.dispatcher import Optimizer
from ..runner import Runnable
from ..runner import Runner
from ..runner.strategy import Strategy
@@ -319,6 +321,7 @@ def to_runner(
name: str = "",
max_batch_size: int | None = None,
max_latency_ms: int | None = None,
optimizer: Optimizer | None = None,
batching_strategy: BatchingStrategy | None = None,
method_configs: dict[str, dict[str, int]] | None = None,
embedded: bool = False,
@@ -356,6 +359,7 @@
models=[self],
max_batch_size=max_batch_size,
max_latency_ms=max_latency_ms,
optimizer=optimizer,
batching_strategy=batching_strategy,
method_configs=method_configs,
embedded=embedded,
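For reference, a hypothetical call-site sketch (the model tag, option values, and choice of strategy are illustrative, not taken from this diff) showing the new batching_strategy argument being passed through to_runner:

import bentoml

from bentoml._internal.marshal.dispatcher import AdaptiveStrategy

# Illustrative only: hand an explicit batching strategy to the runner.
runner = bentoml.models.get("iris_clf:latest").to_runner(
    max_batch_size=100,
    max_latency_ms=500,
    batching_strategy=AdaptiveStrategy({"decay": 0.95, "n_kept_samples": 50}),
)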
