Skip to content

Commit

Permalink
fix(persistence): db race condition between spans and evals (#2905)
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerHYang committed Apr 16, 2024
1 parent 379e336 commit 2666464
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions src/phoenix/db/bulk_inserter.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,32 @@ def _queue_evaluation(self, evaluation: pb.Evaluation) -> None:
self._evaluations.append(evaluation)

async def _bulk_insert(self) -> None:
spans_buffer, evaluations_buffer = None, None
next_run_at = time() + self._run_interval_seconds
while self._spans or self._evaluations or self._running:
await asyncio.sleep(next_run_at - time())
next_run_at = time() + self._run_interval_seconds
# It's important to grab the buffers at the same time so there's
# no race condition, since an eval insertion will fail if the span
# it references doesn't exist. Grabbing the eval buffer later may
# include an eval whose span is in the queue but missed being
# included in the span buffer that was grabbed previously.
if self._spans:
await self._insert_spans()
spans_buffer = self._spans
self._spans = []
if self._evaluations:
await self._insert_evaluations()
evaluations_buffer = self._evaluations
self._evaluations = []
# Spans should be inserted before the evaluations, since an evaluation
# insertion will fail if the span it references doesn't exist.
if spans_buffer:
await self._insert_spans(spans_buffer)
spans_buffer = None
if evaluations_buffer:
await self._insert_evaluations(evaluations_buffer)
evaluations_buffer = None

async def _insert_spans(self) -> None:
spans = self._spans
self._spans = []
async def _insert_spans(self, spans: List[Tuple[Span, str]]) -> None:
for i in range(0, len(spans), self._max_num_per_transaction):
try:
async with self._db() as session:
Expand All @@ -103,9 +117,7 @@ async def _insert_spans(self) -> None:
except Exception:
logger.exception("Failed to insert spans")

async def _insert_evaluations(self) -> None:
evaluations = self._evaluations
self._evaluations = []
async def _insert_evaluations(self, evaluations: List[pb.Evaluation]) -> None:
for i in range(0, len(evaluations), self._max_num_per_transaction):
try:
async with self._db() as session:
Expand Down

0 comments on commit 2666464

Please sign in to comment.