docker/pyproject.deps.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 [project]
 name = "mcp-plex"
-version = "0.26.68"
+version = "0.26.69"
 requires-python = ">=3.11,<3.13"
 dependencies = [
     "fastmcp>=2.11.2",
mcp_plex/loader/__init__.py (79 changes: 37 additions & 42 deletions)
@@ -37,6 +37,7 @@
     chunk_sequence,
     require_positive,
 )
+from .pipeline.persistence import PersistenceStage as _PersistenceStage
 from ..common.types import (
     AggregatedItem,
     ExternalIDs,
@@ -985,6 +986,18 @@ def __init__(
         self._ingest_start = now
         self._enrich_start = now
         self._upsert_start = now
+        self._persistence_stage = _PersistenceStage(
+            client=self._client,
+            collection_name=self._collection_name,
+            dense_vector_name=self._dense_model_name,
+            sparse_vector_name=self._sparse_model_name,
+            persistence_queue=self._points_queue,
+            retry_queue=self._qdrant_retry_queue,
+            upsert_semaphore=self._upsert_capacity,
+            upsert_buffer_size=self._upsert_buffer_size,
+            upsert_fn=self._perform_upsert,
+            on_batch_complete=self._handle_upsert_batch,
+        )

     @property
     def qdrant_retry_queue(self) -> asyncio.Queue[list[models.PointStruct]]:
@@ -1009,7 +1022,7 @@ async def execute(self) -> None:
             for worker_id in range(self._enrichment_workers)
         ]
         upsert_tasks = [
-            asyncio.create_task(self._upsert_worker(worker_id))
+            asyncio.create_task(self._persistence_stage.run(worker_id))
             for worker_id in range(self._max_concurrent_upserts)
         ]
         error: BaseException | None = None
@@ -1297,48 +1310,30 @@ async def _emit_points(self, aggregated: Sequence[AggregatedItem]) -> None:
             build_point(item, self._dense_model_name, self._sparse_model_name)
             for item in aggregated
         ]
-        for chunk in _chunk_sequence(points, self._upsert_buffer_size):
-            batch = list(chunk)
-            if not batch:
-                continue
-            await self._upsert_capacity.acquire()
-            try:
-                await self._points_queue.put(batch)
-            except BaseException:
-                self._upsert_capacity.release()
-                raise
+        await self._persistence_stage.enqueue_points(points)

-    async def _upsert_worker(self, worker_id: int) -> None:
-        while True:
-            batch = await self._points_queue.get()
-            if batch is None:
-                self._points_queue.task_done()
-                break
-            logger.info(
-                "Upsert worker %d handling %d points (queue size=%d)",
-                worker_id,
-                len(batch),
-                self._points_queue.qsize(),
-            )
-            try:
-                if self._upserted_points == 0:
-                    self._upsert_start = time.perf_counter()
-                await _upsert_in_batches(
-                    self._client,
-                    self._collection_name,
-                    batch,
-                    retry_queue=self._qdrant_retry_queue,
-                )
-                self._upserted_points += len(batch)
-                self._log_progress(
-                    f"Upsert worker {worker_id}",
-                    self._upserted_points,
-                    self._upsert_start,
-                    self._points_queue.qsize(),
-                )
-            finally:
-                self._points_queue.task_done()
-                self._upsert_capacity.release()
+    async def _perform_upsert(
+        self, batch: Sequence[models.PointStruct]
+    ) -> None:
+        await _upsert_in_batches(
+            self._client,
+            self._collection_name,
+            list(batch),
+            retry_queue=self._qdrant_retry_queue,
+        )
+
+    def _handle_upsert_batch(
+        self, worker_id: int, batch_size: int, queue_size: int
+    ) -> None:
+        if self._upserted_points == 0:
+            self._upsert_start = time.perf_counter()
+        self._upserted_points += batch_size
+        self._log_progress(
+            f"Upsert worker {worker_id}",
+            self._upserted_points,
+            self._upsert_start,
+            queue_size,
+        )
 async def run(
     plex_url: Optional[str],
     plex_token: Optional[str],
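With this change the loader no longer owns the queue-draining loop; it only supplies two callbacks to the stage: an async `upsert_fn` (`_perform_upsert`) that performs the actual write, and a synchronous `on_batch_complete` hook (`_handle_upsert_batch`) that updates the progress counters. Below is a minimal, self-contained sketch of that callback split; `ToyStage`, `fake_upsert`, and `report_progress` are illustrative names, not code from this repository.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


class ToyStage:
    """Illustrative stand-in for a persistence stage driven by injected callbacks."""

    def __init__(
        self,
        queue: asyncio.Queue,
        upsert_fn: Callable[[Sequence[dict]], Awaitable[None]],
        on_batch_complete: Callable[[int, int, int], None],
    ) -> None:
        self._queue = queue
        self._upsert_fn = upsert_fn
        self._on_batch_complete = on_batch_complete

    async def run(self, worker_id: int) -> None:
        # Drain batches until a None sentinel arrives.
        while True:
            batch = await self._queue.get()
            try:
                if batch is None:
                    return
                await self._upsert_fn(batch)  # injected I/O callback
                self._on_batch_complete(worker_id, len(batch), self._queue.qsize())
            finally:
                self._queue.task_done()


async def fake_upsert(batch: Sequence[dict]) -> None:
    await asyncio.sleep(0)  # stand-in for a real vector-store write


def report_progress(worker_id: int, batch_size: int, queue_size: int) -> None:
    print(f"worker {worker_id}: wrote {batch_size} points, {queue_size} batches queued")


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    stage = ToyStage(queue, fake_upsert, report_progress)
    workers = [asyncio.create_task(stage.run(i)) for i in range(2)]
    for batch in ([{"id": 1}, {"id": 2}], [{"id": 3}]):
        await queue.put(batch)
    for _ in workers:
        await queue.put(None)  # one sentinel per worker shuts the pool down
    await asyncio.gather(*workers)


asyncio.run(main())
```

Keeping the store-specific write and the progress bookkeeping behind injected callbacks leaves the worker loop itself free of orchestrator state, which is what allows it to be exercised in isolation.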
mcp_plex/loader/pipeline/persistence.py (58 changes: 50 additions & 8 deletions)
@@ -4,9 +4,9 @@

 import asyncio
 import logging
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Awaitable, Callable, Sequence

-from .channels import PersistenceQueue
+from .channels import PersistenceQueue, chunk_sequence, require_positive

 if TYPE_CHECKING:  # pragma: no cover - typing helpers only
     from qdrant_client import AsyncQdrantClient, models
@@ -30,6 +30,9 @@ def __init__(
         persistence_queue: PersistenceQueue,
         retry_queue: asyncio.Queue[PersistencePayload],
         upsert_semaphore: asyncio.Semaphore,
+        upsert_buffer_size: int,
+        upsert_fn: Callable[[PersistencePayload], Awaitable[None]],
+        on_batch_complete: Callable[[int, int, int], None] | None = None,
     ) -> None:
         self._client = client
         self._collection_name = str(collection_name)
@@ -38,6 +41,11 @@
         self._persistence_queue = persistence_queue
         self._retry_queue = retry_queue
         self._upsert_semaphore = upsert_semaphore
+        self._upsert_buffer_size = require_positive(
+            upsert_buffer_size, name="upsert_buffer_size"
+        )
+        self._upsert_fn = upsert_fn
+        self._on_batch_complete = on_batch_complete
         self._logger = logging.getLogger("mcp_plex.loader.persistence")

     @property
@@ -88,23 +96,57 @@ def upsert_semaphore(self) -> asyncio.Semaphore:

         return self._upsert_semaphore

-    async def run(self) -> None:
+    @property
+    def upsert_buffer_size(self) -> int:
+        """Maximum number of points per persistence batch."""
+
+        return self._upsert_buffer_size
+
+    async def enqueue_points(
+        self, points: Sequence["models.PointStruct"]
+    ) -> None:
+        """Chunk *points* and place them on the persistence queue."""
+
+        if not points:
+            return
+
+        for chunk in chunk_sequence(list(points), self._upsert_buffer_size):
+            batch = list(chunk)
+            if not batch:
+                continue
+            await self._upsert_semaphore.acquire()
+            try:
+                await self._persistence_queue.put(batch)
+            except BaseException:
+                self._upsert_semaphore.release()
+                raise
+
+    async def run(self, worker_id: int) -> None:
         """Drain the persistence queue until a sentinel is received."""

         while True:
             payload = await self._persistence_queue.get()
             try:
                 if payload is None:
                     self._logger.debug(
-                        "Persistence queue sentinel received; finishing placeholder run."
+                        "Persistence queue sentinel received; finishing run for worker %d.",
+                        worker_id,
                     )
                     return

-                self._logger.debug(
-                    "Placeholder persistence stage received batch with %d items.",
+                queue_size = self._persistence_queue.qsize()
+                self._logger.info(
+                    "Upsert worker %d handling %d points (queue size=%d)",
+                    worker_id,
                     len(payload),
+                    queue_size,
                 )
+                await self._upsert_fn(payload)
+                if self._on_batch_complete is not None:
+                    self._on_batch_complete(
+                        worker_id, len(payload), self._persistence_queue.qsize()
+                    )
             finally:
                 self._persistence_queue.task_done()
-
-            await asyncio.sleep(0)
+            if payload is not None:
+                self._upsert_semaphore.release()
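Read together, `enqueue_points` and `run` implement a simple backpressure contract: the producer acquires one semaphore slot per batch before calling `put`, and a worker releases that slot only after the batch has been fully handled, skipping the release when it sees the `None` shutdown sentinel. The number of buffered batches therefore cannot exceed the semaphore's capacity. The sketch below is a self-contained illustration of that contract; `produce`, `consume`, and `MAX_IN_FLIGHT` are assumed names, not part of this codebase.

```python
import asyncio

MAX_IN_FLIGHT = 2  # assumed capacity; the real semaphore is supplied by the caller


async def produce(queue: asyncio.Queue, slots: asyncio.Semaphore) -> None:
    for batch_id in range(5):
        await slots.acquire()  # blocks once MAX_IN_FLIGHT batches are buffered
        try:
            await queue.put([batch_id])
        except BaseException:
            slots.release()  # do not leak the slot if the put fails
            raise
    await queue.put(None)  # sentinel: enqueued without holding a slot


async def consume(queue: asyncio.Queue, slots: asyncio.Semaphore) -> None:
    while True:
        payload = await queue.get()
        try:
            if payload is None:
                return
            await asyncio.sleep(0.01)  # stand-in for the real upsert call
        finally:
            queue.task_done()
            if payload is not None:
                slots.release()  # free the slot only for real batches


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    slots = asyncio.Semaphore(MAX_IN_FLIGHT)
    await asyncio.gather(produce(queue, slots), consume(queue, slots))


asyncio.run(main())
```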
pyproject.toml (2 changes: 1 addition & 1 deletion)
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "mcp-plex"
-version = "0.26.68"
+version = "0.26.69"

 description = "Plex-Oriented Model Context Protocol Server"
 requires-python = ">=3.11,<3.13"