Add the PersistenceStage placeholder to the loader pipeline.

mcp_plex/loader/pipeline/persistence.py: replace the one-line placeholder
module docstring with a PersistenceStage class that stores its injected
dependencies, exposes them through properties, and drains the persistence
queue until a None sentinel is received.

tests/test_persistence_stage.py: new tests covering the stage's logger name
and the dependencies it holds.

diff --git a/mcp_plex/loader/pipeline/persistence.py b/mcp_plex/loader/pipeline/persistence.py
index 45363e7..539708f 100644
--- a/mcp_plex/loader/pipeline/persistence.py
+++ b/mcp_plex/loader/pipeline/persistence.py
@@ -1 +1,110 @@
-"""Placeholder module for the loader pipeline."""
+"""Persistence stage placeholder used by the loader pipeline."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Any
+
+from .channels import PersistenceQueue
+
+if TYPE_CHECKING:  # pragma: no cover - typing helpers only
+    from qdrant_client import AsyncQdrantClient, models
+
+    PersistencePayload = list[models.PointStruct]
+else:  # pragma: no cover - runtime fallback when qdrant_client is absent
+    AsyncQdrantClient = Any  # type: ignore[assignment]
+    PersistencePayload = list[Any]
+
+
+class PersistenceStage:
+    """Drain the persistence queue and coordinate Qdrant upserts."""
+
+    def __init__(
+        self,
+        *,
+        client: AsyncQdrantClient,
+        collection_name: str,
+        dense_vector_name: str,
+        sparse_vector_name: str,
+        persistence_queue: PersistenceQueue,
+        retry_queue: asyncio.Queue[PersistencePayload],
+        upsert_semaphore: asyncio.Semaphore,
+    ) -> None:
+        self._client = client
+        self._collection_name = str(collection_name)
+        self._dense_vector_name = str(dense_vector_name)
+        self._sparse_vector_name = str(sparse_vector_name)
+        self._persistence_queue = persistence_queue
+        self._retry_queue = retry_queue
+        self._upsert_semaphore = upsert_semaphore
+        self._logger = logging.getLogger("mcp_plex.loader.persistence")
+
+    @property
+    def logger(self) -> logging.Logger:
+        """Logger used by the persistence stage."""
+
+        return self._logger
+
+    @property
+    def qdrant_client(self) -> AsyncQdrantClient:
+        """Return the Qdrant client used for persistence."""
+
+        return self._client
+
+    @property
+    def collection_name(self) -> str:
+        """Name of the Qdrant collection targeted by persistence."""
+
+        return self._collection_name
+
+    @property
+    def dense_vector_name(self) -> str:
+        """Name of the dense vector configuration in the collection."""
+
+        return self._dense_vector_name
+
+    @property
+    def sparse_vector_name(self) -> str:
+        """Name of the sparse vector configuration in the collection."""
+
+        return self._sparse_vector_name
+
+    @property
+    def persistence_queue(self) -> PersistenceQueue:
+        """Queue providing batches destined for Qdrant."""
+
+        return self._persistence_queue
+
+    @property
+    def retry_queue(self) -> asyncio.Queue[PersistencePayload]:
+        """Queue used to persist batches that require retries."""
+
+        return self._retry_queue
+
+    @property
+    def upsert_semaphore(self) -> asyncio.Semaphore:
+        """Semaphore limiting concurrent Qdrant upserts."""
+
+        return self._upsert_semaphore
+
+    async def run(self) -> None:
+        """Drain the persistence queue until a sentinel is received."""
+
+        while True:
+            payload = await self._persistence_queue.get()
+            try:
+                if payload is None:
+                    self._logger.debug(
+                        "Persistence queue sentinel received; finishing placeholder run."
+                    )
+                    return
+
+                self._logger.debug(
+                    "Placeholder persistence stage received batch with %d items.",
+                    len(payload),
+                )
+            finally:
+                self._persistence_queue.task_done()
+
+            await asyncio.sleep(0)
diff --git a/tests/test_persistence_stage.py b/tests/test_persistence_stage.py
new file mode 100644
index 0000000..d7e0c54
--- /dev/null
+++ b/tests/test_persistence_stage.py
@@ -0,0 +1,61 @@
+import asyncio
+
+from mcp_plex.loader.pipeline.channels import PersistenceQueue
+from mcp_plex.loader.pipeline.persistence import PersistenceStage
+
+
+class _FakeQdrantClient:
+    pass
+
+
+def test_persistence_stage_logger_name() -> None:
+    async def scenario() -> str:
+        client = _FakeQdrantClient()
+        persistence_queue: PersistenceQueue = asyncio.Queue()
+        retry_queue: asyncio.Queue = asyncio.Queue()
+        semaphore = asyncio.Semaphore(3)
+
+        stage = PersistenceStage(
+            client=client,
+            collection_name="media-items",
+            dense_vector_name="dense",
+            sparse_vector_name="sparse",
+            persistence_queue=persistence_queue,
+            retry_queue=retry_queue,
+            upsert_semaphore=semaphore,
+        )
+        return stage.logger.name
+
+    logger_name = asyncio.run(scenario())
+
+    assert logger_name == "mcp_plex.loader.persistence"
+
+
+def test_persistence_stage_holds_dependencies() -> None:
+    async def scenario() -> tuple[PersistenceStage, _FakeQdrantClient, PersistenceQueue, asyncio.Queue, asyncio.Semaphore]:
+        client = _FakeQdrantClient()
+        persistence_queue: PersistenceQueue = asyncio.Queue()
+        retry_queue: asyncio.Queue = asyncio.Queue()
+        semaphore = asyncio.Semaphore(5)
+
+        stage = PersistenceStage(
+            client=client,
+            collection_name="media-items",
+            dense_vector_name="dense",
+            sparse_vector_name="sparse",
+            persistence_queue=persistence_queue,
+            retry_queue=retry_queue,
+            upsert_semaphore=semaphore,
+        )
+
+        return stage, client, persistence_queue, retry_queue, semaphore
+
+    stage, client, persistence_queue, retry_queue, semaphore = asyncio.run(scenario())
+
+    assert stage.qdrant_client is client
+    assert stage.collection_name == "media-items"
+    assert stage.dense_vector_name == "dense"
+    assert stage.sparse_vector_name == "sparse"
+    assert stage.persistence_queue is persistence_queue
+    assert stage.retry_queue is retry_queue
+    assert stage.upsert_semaphore is semaphore