Skip to content

Commit d704841

Browse files
authored
feat(loader): add persistence stage placeholder (#96)
1 parent b032f24 commit d704841

File tree

2 files changed

+171
-1
lines changed

2 files changed

+171
-1
lines changed
Lines changed: 110 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1 +1,110 @@
1-
"""Placeholder module for the loader pipeline."""
1+
"""Persistence stage placeholder used by the loader pipeline."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import logging
7+
from typing import TYPE_CHECKING, Any
8+
9+
from .channels import PersistenceQueue
10+
11+
if TYPE_CHECKING:  # pragma: no cover - typing helpers only
    # Seen only by static type checkers: import the real Qdrant types so the
    # aliases used in annotations resolve precisely.
    from qdrant_client import AsyncQdrantClient, models

    PersistencePayload = list[models.PointStruct]
else:  # pragma: no cover - runtime aliases; qdrant_client is not imported here
    # TYPE_CHECKING is False at runtime, so this branch always executes and
    # binds permissive stand-ins for the names referenced in annotations.
    PersistencePayload = list[Any]
    AsyncQdrantClient = Any  # type: ignore[assignment]
18+
19+
20+
class PersistenceStage:
    """Pipeline stage that consumes batches from the persistence queue.

    This is a placeholder implementation: ``run`` logs every batch it receives
    and acknowledges it on the queue, but issues no Qdrant upsert. The client,
    collection/vector names, retry queue and semaphore are stored at
    construction and exposed through read-only properties.
    """

    def __init__(
        self,
        *,
        client: AsyncQdrantClient,
        collection_name: str,
        dense_vector_name: str,
        sparse_vector_name: str,
        persistence_queue: PersistenceQueue,
        retry_queue: asyncio.Queue[PersistencePayload],
        upsert_semaphore: asyncio.Semaphore,
    ) -> None:
        """Store the stage's collaborators (all arguments are keyword-only).

        The three name arguments are coerced with ``str``; every other
        argument is kept by reference.
        """
        # Collaborators kept by reference.
        self._client = client
        self._persistence_queue = persistence_queue
        self._retry_queue = retry_queue
        self._upsert_semaphore = upsert_semaphore

        # Names are normalised to plain strings.
        self._collection_name = str(collection_name)
        self._dense_vector_name = str(dense_vector_name)
        self._sparse_vector_name = str(sparse_vector_name)

        self._logger = logging.getLogger("mcp_plex.loader.persistence")

    @property
    def logger(self) -> logging.Logger:
        """The ``mcp_plex.loader.persistence`` logger held by this stage."""
        return self._logger

    @property
    def qdrant_client(self) -> AsyncQdrantClient:
        """The Qdrant client supplied at construction."""
        return self._client

    @property
    def collection_name(self) -> str:
        """The Qdrant collection name, as a string."""
        return self._collection_name

    @property
    def dense_vector_name(self) -> str:
        """The dense vector name, as a string."""
        return self._dense_vector_name

    @property
    def sparse_vector_name(self) -> str:
        """The sparse vector name, as a string."""
        return self._sparse_vector_name

    @property
    def persistence_queue(self) -> PersistenceQueue:
        """The queue that ``run`` drains."""
        return self._persistence_queue

    @property
    def retry_queue(self) -> asyncio.Queue[PersistencePayload]:
        """The retry queue supplied at construction."""
        return self._retry_queue

    @property
    def upsert_semaphore(self) -> asyncio.Semaphore:
        """The upsert semaphore supplied at construction."""
        return self._upsert_semaphore

    async def run(self) -> None:
        """Consume queue items until a ``None`` sentinel arrives.

        Every item taken from the queue, the sentinel included, is
        acknowledged with ``task_done()`` even if logging it raises.
        """
        queue = self._persistence_queue
        log = self._logger

        while True:
            batch = await queue.get()
            is_sentinel = batch is None
            try:
                if is_sentinel:
                    log.debug(
                        "Persistence queue sentinel received; finishing placeholder run."
                    )
                else:
                    log.debug(
                        "Placeholder persistence stage received batch with %d items.",
                        len(batch),
                    )
            finally:
                # Always balance the get() above so queue.join() can complete.
                queue.task_done()

            if is_sentinel:
                return

            # Yield to the event loop between batches.
            await asyncio.sleep(0)

tests/test_persistence_stage.py

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,61 @@
1+
import asyncio
2+
3+
from mcp_plex.loader.pipeline.channels import PersistenceQueue
4+
from mcp_plex.loader.pipeline.persistence import PersistenceStage
5+
6+
7+
class _FakeQdrantClient:
    """Empty stand-in passed as the ``client`` argument in these tests."""
9+
10+
11+
def test_persistence_stage_logger_name() -> None:
    """The stage's logger is named ``mcp_plex.loader.persistence``."""

    async def read_logger_name() -> str:
        # Queues and the semaphore are created inside a running event loop.
        inbound: PersistenceQueue = asyncio.Queue()
        retries: asyncio.Queue = asyncio.Queue()

        built = PersistenceStage(
            client=_FakeQdrantClient(),
            collection_name="media-items",
            dense_vector_name="dense",
            sparse_vector_name="sparse",
            persistence_queue=inbound,
            retry_queue=retries,
            upsert_semaphore=asyncio.Semaphore(3),
        )
        return built.logger.name

    assert asyncio.run(read_logger_name()) == "mcp_plex.loader.persistence"
32+
33+
34+
def test_persistence_stage_holds_dependencies() -> None:
    """Each constructor argument is exposed unchanged via its property."""

    async def build() -> tuple[
        PersistenceStage,
        _FakeQdrantClient,
        PersistenceQueue,
        asyncio.Queue,
        asyncio.Semaphore,
    ]:
        # Queues and the semaphore are created inside a running event loop.
        fake_client = _FakeQdrantClient()
        inbound: PersistenceQueue = asyncio.Queue()
        retries: asyncio.Queue = asyncio.Queue()
        limiter = asyncio.Semaphore(5)

        built = PersistenceStage(
            client=fake_client,
            collection_name="media-items",
            dense_vector_name="dense",
            sparse_vector_name="sparse",
            persistence_queue=inbound,
            retry_queue=retries,
            upsert_semaphore=limiter,
        )
        return built, fake_client, inbound, retries, limiter

    stage, client, persistence_queue, retry_queue, semaphore = asyncio.run(build())

    # Collaborators must be the very same objects that were passed in.
    assert stage.qdrant_client is client
    assert stage.persistence_queue is persistence_queue
    assert stage.retry_queue is retry_queue
    assert stage.upsert_semaphore is semaphore

    # Names round-trip as equal strings.
    assert stage.collection_name == "media-items"
    assert stage.dense_vector_name == "dense"
    assert stage.sparse_vector_name == "sparse"

0 commit comments

Comments
 (0)