From 21d7e4b05622a02c4bd28e4b2dc883a47123aebf Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 15:47:57 -0600 Subject: [PATCH] feat(loader): propagate persistence sentinel --- docker/pyproject.deps.toml | 2 +- mcp_plex/loader/__init__.py | 3 +- mcp_plex/loader/pipeline/channels.py | 8 ++- mcp_plex/loader/pipeline/enrichment.py | 3 +- mcp_plex/loader/pipeline/persistence.py | 76 +++++++++++++++++++++- pyproject.toml | 2 +- tests/test_enrichment_stage.py | 19 +++--- tests/test_persistence_stage.py | 86 +++++++++++++++++++++++-- uv.lock | 2 +- 9 files changed, 179 insertions(+), 22 deletions(-) diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index 1a12bf8..c0eca25 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "0.26.69" +version = "0.26.70" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index 1e658b4..ab2dfc4 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -28,6 +28,7 @@ from .pipeline.channels import ( IMDbRetryQueue, INGEST_DONE, + PERSIST_DONE, IngestBatch, IngestQueue, MovieBatch, @@ -1035,7 +1036,7 @@ async def execute(self) -> None: error = exc finally: for _ in range(self._max_concurrent_upserts): - await self._points_queue.put(None) + await self._points_queue.put(PERSIST_DONE) upsert_results = await asyncio.gather( *upsert_tasks, return_exceptions=True ) diff --git a/mcp_plex/loader/pipeline/channels.py b/mcp_plex/loader/pipeline/channels.py index a26aab3..271c1e1 100644 --- a/mcp_plex/loader/pipeline/channels.py +++ b/mcp_plex/loader/pipeline/channels.py @@ -32,6 +32,9 @@ sentinel so legacy listeners that only check for ``None`` continue to work. """ +PERSIST_DONE: Final = object() +"""Sentinel object signaling that persistence has completed.""" + if TYPE_CHECKING: PersistencePayload: TypeAlias = list[models.PointStruct] else: # pragma: no cover - runtime fallback for typing-only alias @@ -63,7 +66,7 @@ class SampleBatch: IngestBatch = MovieBatch | EpisodeBatch | SampleBatch IngestQueueItem: TypeAlias = IngestBatch | None | object -PersistenceQueueItem: TypeAlias = PersistencePayload | None +PersistenceQueueItem: TypeAlias = PersistencePayload | None | object IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem] PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem] @@ -127,6 +130,7 @@ def snapshot(self) -> list[str]: _SampleBatch = SampleBatch _IngestBatch = IngestBatch _INGEST_DONE = INGEST_DONE +_PERSIST_DONE = PERSIST_DONE _IngestQueue = IngestQueue _PersistenceQueue = PersistenceQueue _require_positive = require_positive @@ -139,6 +143,7 @@ def snapshot(self) -> list[str]: "SampleBatch", "IngestBatch", "INGEST_DONE", + "PERSIST_DONE", "IngestQueue", "PersistenceQueue", "require_positive", @@ -149,6 +154,7 @@ def snapshot(self) -> list[str]: "_SampleBatch", "_IngestBatch", "_INGEST_DONE", + "_PERSIST_DONE", "_IngestQueue", "_PersistenceQueue", "_require_positive", diff --git a/mcp_plex/loader/pipeline/enrichment.py b/mcp_plex/loader/pipeline/enrichment.py index ea3c376..15c14f8 100644 --- a/mcp_plex/loader/pipeline/enrichment.py +++ b/mcp_plex/loader/pipeline/enrichment.py @@ -23,6 +23,7 @@ EpisodeBatch, IMDbRetryQueue, INGEST_DONE, + PERSIST_DONE, IngestQueue, MovieBatch, PersistenceQueue, @@ -155,7 +156,7 @@ async def run(self) -> None: if got_item: self._ingest_queue.task_done() - await self._persistence_queue.put(None) + await self._persistence_queue.put(PERSIST_DONE) async def _handle_movie_batch(self, batch: MovieBatch) -> None: """Enrich and forward Plex movie batches to the persistence stage.""" diff --git a/mcp_plex/loader/pipeline/persistence.py b/mcp_plex/loader/pipeline/persistence.py index 4fc380b..b0e7c0f 100644 --- a/mcp_plex/loader/pipeline/persistence.py +++ b/mcp_plex/loader/pipeline/persistence.py @@ -6,7 +6,12 @@ import logging from typing import TYPE_CHECKING, Any, Awaitable, Callable, Sequence -from .channels import PersistenceQueue, chunk_sequence, require_positive +from .channels import ( + PERSIST_DONE, + PersistenceQueue, + chunk_sequence, + require_positive, +) if TYPE_CHECKING: # pragma: no cover - typing helpers only from qdrant_client import AsyncQdrantClient, models @@ -47,6 +52,7 @@ def __init__( self._upsert_fn = upsert_fn self._on_batch_complete = on_batch_complete self._logger = logging.getLogger("mcp_plex.loader.persistence") + self._retry_flush_attempted = False @property def logger(self) -> logging.Logger: @@ -102,6 +108,51 @@ def upsert_buffer_size(self) -> int: return self._upsert_buffer_size + async def _flush_retry_queue(self) -> int: + """Re-enqueue retry batches so they are persisted before shutdown.""" + + drained_count = 0 + while True: + try: + retry_payload = self._retry_queue.get_nowait() + except asyncio.QueueEmpty: + break + + drained_count += 1 + try: + await self.enqueue_points(retry_payload) + finally: + self._retry_queue.task_done() + + if drained_count: + self._logger.debug( + "Re-enqueued %d retry batch(es) before persistence shutdown.", + drained_count, + ) + + return drained_count + + def _drain_additional_sentinels(self) -> int: + """Remove queued sentinel tokens so payloads run before shutdown.""" + + drained = 0 + while True: + try: + queued_item = self._persistence_queue.get_nowait() + except asyncio.QueueEmpty: + break + + if queued_item in (None, PERSIST_DONE): + drained += 1 + self._persistence_queue.task_done() + continue + + # Non-sentinel payload encountered; put it back and stop draining. + self._persistence_queue.put_nowait(queued_item) + break + + return drained + async def enqueue_points( self, points: Sequence["models.PointStruct"] ) -> None: @@ -127,11 +178,30 @@ async def run(self, worker_id: int) -> None: while True: payload = await self._persistence_queue.get() try: - if payload is None: + if payload is None or payload is PERSIST_DONE: + sentinel_budget = 1 + self._drain_additional_sentinels() + drained_retry = 0 + if not self._retry_flush_attempted: + drained_retry = await self._flush_retry_queue() + if drained_retry: + self._retry_flush_attempted = True + for _ in range(sentinel_budget): + await self._persistence_queue.put(PERSIST_DONE) + continue + + remaining_tokens = max(sentinel_budget - 1, 0) + if remaining_tokens: + for _ in range(remaining_tokens): + await self._persistence_queue.put(PERSIST_DONE) self._logger.debug( "Persistence queue sentinel received; finishing run for worker %d.", worker_id, ) + if drained_retry and not self._retry_queue.empty(): + self._logger.warning( + "Retry queue still contains %d batch(es) after flush.", + self._retry_queue.qsize(), + ) return queue_size = self._persistence_queue.qsize() @@ -148,5 +218,5 @@ async def run(self, worker_id: int) -> None: ) finally: self._persistence_queue.task_done() - if payload is not None: + if payload not in (None, PERSIST_DONE): self._upsert_semaphore.release() diff --git a/pyproject.toml b/pyproject.toml index 94133ab..41b4c70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.69" +version = "0.26.70" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_enrichment_stage.py b/tests/test_enrichment_stage.py index 6bd1505..124ec20 100644 --- a/tests/test_enrichment_stage.py +++ b/tests/test_enrichment_stage.py @@ -14,6 +14,7 @@ EpisodeBatch, IMDbRetryQueue, INGEST_DONE, + PERSIST_DONE, MovieBatch, SampleBatch, ) @@ -238,7 +239,7 @@ async def scenario() -> list[list[AggregatedItem] | None]: while True: payload = await persistence_queue.get() emitted.append(payload) - if payload is None: + if payload in (None, PERSIST_DONE): break return emitted @@ -252,7 +253,7 @@ async def scenario() -> list[list[AggregatedItem] | None]: first, second, sentinel = emitted_batches assert isinstance(first, list) assert isinstance(second, list) - assert sentinel is None + assert sentinel is PERSIST_DONE assert [item.plex.rating_key for item in first] == ["1", "2"] assert [item.plex.rating_key for item in second] == ["3"] assert all(item.imdb is not None for item in first + second) @@ -420,7 +421,7 @@ async def scenario() -> list[list[AggregatedItem] | None]: while True: payload = await persistence_queue.get() payloads.append(payload) - if payload is None: + if payload in (None, PERSIST_DONE): break return payloads @@ -431,7 +432,7 @@ async def scenario() -> list[list[AggregatedItem] | None]: assert len(payloads) == 3 first, second, sentinel = payloads - assert sentinel is None + assert sentinel is PERSIST_DONE assert [item.plex.rating_key for item in first] == ["e1", "e2"] assert [item.plex.rating_key for item in second] == ["e3"] assert all(item.tmdb for item in first + second) @@ -489,7 +490,7 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], list[Any], list while True: payload = await persistence_queue.get() payloads.append(payload) - if payload is None: + if payload in (None, PERSIST_DONE): break return payloads, persistence_queue.put_payloads, items @@ -498,11 +499,11 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], list[Any], list assert any("Processed sample batch" in message for message in handler.messages) assert len(payloads) == 2 batch, sentinel = payloads - assert sentinel is None + assert sentinel is PERSIST_DONE assert isinstance(batch, list) assert batch == items assert put_payloads[0] == batch - assert put_payloads[-1] is None + assert put_payloads[-1] is PERSIST_DONE def test_enrichment_stage_retries_imdb_queue_when_idle(monkeypatch): @@ -601,7 +602,7 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], int, list[list[ while True: payload = await persistence_queue.get() payloads.append(payload) - if payload is None: + if payload in (None, PERSIST_DONE): break return payloads, retry_queue.qsize(), calls @@ -611,7 +612,7 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], int, list[list[ assert remaining == 0 assert len(payloads) == 3 first_batch, second_batch, sentinel = payloads - assert sentinel is None + assert sentinel is PERSIST_DONE assert isinstance(first_batch, list) assert isinstance(second_batch, list) assert first_batch[0].imdb is None diff --git a/tests/test_persistence_stage.py b/tests/test_persistence_stage.py index a552c43..d12dd45 100644 --- a/tests/test_persistence_stage.py +++ b/tests/test_persistence_stage.py @@ -4,7 +4,7 @@ import pytest from mcp_plex.loader import _upsert_in_batches -from mcp_plex.loader.pipeline.channels import PersistenceQueue +from mcp_plex.loader.pipeline.channels import PERSIST_DONE, PersistenceQueue from mcp_plex.loader.pipeline.persistence import PersistenceStage @@ -117,7 +117,7 @@ def on_batch_complete(worker_id: int, batch_size: int, queue_size: int) -> None: await persistence_queue.join() for _ in workers: - await persistence_queue.put(None) + await persistence_queue.put(PERSIST_DONE) await asyncio.gather(*workers) @@ -168,8 +168,8 @@ async def upsert_fn(batch: list[list[int]]) -> None: await stage.enqueue_points([[1, 2]]) await persistence_queue.join() - await persistence_queue.put(None) - await asyncio.gather(worker) + await persistence_queue.put(PERSIST_DONE) + await asyncio.wait_for(asyncio.gather(worker), timeout=1) failures: list[list[int]] = [] while not retry_queue.empty(): @@ -218,3 +218,81 @@ async def failing_upsert(batch: list[int]) -> None: semaphore_value = asyncio.run(scenario()) assert semaphore_value == 1 + + +def test_persistence_stage_flushes_retry_queue_before_exit() -> None: + async def scenario() -> tuple[list[list[int]], bool, bool]: + persistence_queue: PersistenceQueue = asyncio.Queue() + retry_queue: asyncio.Queue[list[list[int]]] = asyncio.Queue() + semaphore = asyncio.Semaphore(1) + + processed: list[list[int]] = [] + + async def upsert(batch: list[int]) -> None: + processed.append(list(batch)) + + stage = PersistenceStage( + client=_FakeQdrantClient(), + collection_name="media-items", + dense_vector_name="dense", + sparse_vector_name="sparse", + persistence_queue=persistence_queue, + retry_queue=retry_queue, + upsert_semaphore=semaphore, + upsert_buffer_size=5, + upsert_fn=upsert, + on_batch_complete=None, + ) + + worker = asyncio.create_task(stage.run(0)) + + await retry_queue.put([1, 2, 3]) + await persistence_queue.put(PERSIST_DONE) + + await asyncio.wait_for(worker, timeout=1) + + return processed, persistence_queue.empty(), retry_queue.empty() + + processed, persistence_empty, retry_empty = asyncio.run(scenario()) + + assert processed == [[1, 2, 3]] + assert persistence_empty + assert retry_empty + + +def test_persistence_stage_leaves_no_lingering_queue_items() -> None: + async def scenario() -> tuple[int, int]: + persistence_queue: PersistenceQueue = asyncio.Queue() + retry_queue: asyncio.Queue[list[int]] = asyncio.Queue() + semaphore = asyncio.Semaphore(2) + + async def upsert(batch: list[int]) -> None: + await asyncio.sleep(0) + + stage = PersistenceStage( + client=_FakeQdrantClient(), + collection_name="media-items", + dense_vector_name="dense", + sparse_vector_name="sparse", + persistence_queue=persistence_queue, + retry_queue=retry_queue, + upsert_semaphore=semaphore, + upsert_buffer_size=2, + upsert_fn=upsert, + on_batch_complete=None, + ) + + worker = asyncio.create_task(stage.run(0)) + + await stage.enqueue_points([1, 2, 3, 4]) + await asyncio.wait_for(persistence_queue.join(), timeout=1) + await persistence_queue.put(PERSIST_DONE) + + await asyncio.wait_for(worker, timeout=1) + + return persistence_queue.qsize(), retry_queue.qsize() + + persistence_remaining, retry_remaining = asyncio.run(scenario()) + + assert persistence_remaining == 0 + assert retry_remaining == 0 diff --git a/uv.lock b/uv.lock index 8f1fd6e..23b6113 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.69" +version = "0.26.70" source = { editable = "." } dependencies = [ { name = "fastapi" },