fix: Always wait a small amount of time between inserts (#3168)
* Always wait a small amount of time between inserts

* Ruff 🐶

* Parametrize sleep and sleep on empty queue

* Ruff 🐶

* Await sleep

* Update src/phoenix/db/bulk_inserter.py

Co-authored-by: Roger Yang <80478925+RogerHYang@users.noreply.github.com>

* fix: unflatten attributes when loading spans from `trace_dataset` (#3170)

* chore(main): release arize-phoenix 4.0.2 (#3152)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

---------

Co-authored-by: Roger Yang <80478925+RogerHYang@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
3 people committed May 13, 2024
1 parent 62f0b9a commit 6e18e3c
Showing 1 changed file with 6 additions and 12 deletions.
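In short: before this commit the inserter anchored each bulk insert to the previous insertion time (a 2-second default interval); after it, the loop simply awaits a small, configurable sleep between iterations. A minimal sketch of the two pacing strategies, using illustrative drain loops rather than Phoenix's actual inserter class:

```python
# Illustrative sketch only; names like drain_on_interval are not Phoenix APIs.
import asyncio
from time import time


async def drain_on_interval(queue: list, run_interval_seconds: float = 2.0) -> None:
    # Before: anchor each run to the previous insertion time, so a small
    # batch could wait up to run_interval_seconds before being inserted.
    last_insertion_time = time() - run_interval_seconds  # first run starts immediately
    while queue:
        next_run_at = last_insertion_time + run_interval_seconds
        await asyncio.sleep(max(next_run_at - time(), 0))
        print("inserted", queue.pop())  # stand-in for one bulk insert
        last_insertion_time = time()


async def drain_with_fixed_sleep(queue: list, sleep: float = 0.1) -> None:
    # After: always pause a small, fixed amount between inserts, which keeps
    # the loop responsive without any wall-clock bookkeeping.
    while queue:
        print("inserted", queue.pop())  # stand-in for one bulk insert
        await asyncio.sleep(sleep)


asyncio.run(drain_with_fixed_sleep(["span-a", "span-b", "span-c"]))
```

With the 0.1 s default, the worst-case wait before a small batch is flushed drops from roughly 2 s to roughly 0.1 s, at the cost of slightly less aggressive batching under steady load.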
src/phoenix/db/bulk_inserter.py (6 additions, 12 deletions)

```diff
--- a/src/phoenix/db/bulk_inserter.py
+++ b/src/phoenix/db/bulk_inserter.py
@@ -3,7 +3,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
 from itertools import islice
-from time import perf_counter, time
+from time import perf_counter
 from typing import (
     Any,
     AsyncContextManager,
@@ -48,25 +48,20 @@ def __init__(
         cache_for_dataloaders: Optional[CacheForDataLoaders] = None,
         initial_batch_of_spans: Optional[Iterable[Tuple[Span, str]]] = None,
         initial_batch_of_evaluations: Optional[Iterable[pb.Evaluation]] = None,
-        run_interval_in_seconds: float = 2,
+        sleep: float = 0.1,
         max_num_per_transaction: int = 1000,
         enable_prometheus: bool = False,
     ) -> None:
         """
         :param db: A function to initiate a new database session.
         :param initial_batch_of_spans: Initial batch of spans to insert.
-        :param run_interval_in_seconds: The time interval between the starts of each
-            bulk insert. If there's nothing to insert, the inserter goes back to sleep.
+        :param sleep: The time to sleep between bulk insertions
         :param max_num_per_transaction: The maximum number of items to insert in a single
             transaction. Multiple transactions will be used if there are more items in the batch.
         """
         self._db = db
         self._running = False
-        self._run_interval_seconds = run_interval_in_seconds
-
-        # tracks time between insertions to improve responsiveness for small batches
-        self._last_insertion_time = time() - self._run_interval_seconds
-
+        self._sleep = sleep
         self._max_num_per_transaction = max_num_per_transaction
         self._spans: List[Tuple[Span, str]] = (
             [] if initial_batch_of_spans is None else list(initial_batch_of_spans)
@@ -104,9 +99,8 @@ async def _bulk_insert(self) -> None:
         spans_buffer, evaluations_buffer = None, None
         # start first insert immediately if the inserter has not run recently
         while self._spans or self._evaluations or self._running:
-            next_run_at = self._last_insertion_time + self._run_interval_seconds
-            await asyncio.sleep(max(next_run_at - time(), 0))
             if not (self._spans or self._evaluations):
+                await asyncio.sleep(self._sleep)
                 continue
             # It's important to grab the buffers at the same time so there's
             # no race condition, since an eval insertion will fail if the span
@@ -132,7 +126,7 @@
                 evaluations_buffer = None
             for project_rowid in transaction_result.updated_project_rowids:
                 self._last_updated_at_by_project[project_rowid] = datetime.now(timezone.utc)
-            self._last_insertion_time = time()
+            await asyncio.sleep(self._sleep)
 
     async def _insert_spans(self, spans: List[Tuple[Span, str]]) -> TransactionResult:
         transaction_result = TransactionResult()
```
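Note the sleep added inside the empty-queue branch: without it, the `continue` would spin the event loop at full CPU whenever `self._running` is true but there are no spans or evaluations to insert. A rough, self-contained sketch of the loop's new shape, where `queue` and `stop` are stand-ins for `self._spans`/`self._evaluations` and `self._running`:

```python
# Rough sketch of _bulk_insert's new control flow; not the actual class.
import asyncio


async def bulk_insert_loop(queue: list, sleep: float = 0.1) -> None:
    stop = False
    while queue or not stop:
        if not queue:
            # Sleeping here keeps the empty-queue `continue` from
            # busy-spinning the event loop.
            await asyncio.sleep(sleep)
            stop = True  # demo only: exit after one idle pass
            continue
        batch, queue[:] = list(queue), []  # grab the whole buffer at once
        print(f"inserted {len(batch)} items")
        await asyncio.sleep(sleep)  # always wait a little between inserts


asyncio.run(bulk_insert_loop([1, 2, 3]))
```

In the real class the running flag stays set for the inserter's lifetime, so the idle branch executes repeatedly; the parametrized `sleep` (default 0.1 s) bounds how often that idle loop wakes.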
