Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/pyproject.deps.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "0.26.69"
version = "0.26.70"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
Expand Down
3 changes: 2 additions & 1 deletion mcp_plex/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .pipeline.channels import (
IMDbRetryQueue,
INGEST_DONE,
PERSIST_DONE,
IngestBatch,
IngestQueue,
MovieBatch,
Expand Down Expand Up @@ -1035,7 +1036,7 @@ async def execute(self) -> None:
error = exc
finally:
for _ in range(self._max_concurrent_upserts):
await self._points_queue.put(None)
await self._points_queue.put(PERSIST_DONE)
upsert_results = await asyncio.gather(
*upsert_tasks, return_exceptions=True
)
Expand Down
8 changes: 7 additions & 1 deletion mcp_plex/loader/pipeline/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
sentinel so legacy listeners that only check for ``None`` continue to work.
"""

PERSIST_DONE: Final = object()
"""Sentinel object signaling that persistence has completed."""

if TYPE_CHECKING:
PersistencePayload: TypeAlias = list[models.PointStruct]
else: # pragma: no cover - runtime fallback for typing-only alias
Expand Down Expand Up @@ -63,7 +66,7 @@ class SampleBatch:
IngestBatch = MovieBatch | EpisodeBatch | SampleBatch

IngestQueueItem: TypeAlias = IngestBatch | None | object
PersistenceQueueItem: TypeAlias = PersistencePayload | None
PersistenceQueueItem: TypeAlias = PersistencePayload | None | object

IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem]
PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem]
Expand Down Expand Up @@ -127,6 +130,7 @@ def snapshot(self) -> list[str]:
_SampleBatch = SampleBatch
_IngestBatch = IngestBatch
_INGEST_DONE = INGEST_DONE
_PERSIST_DONE = PERSIST_DONE
_IngestQueue = IngestQueue
_PersistenceQueue = PersistenceQueue
_require_positive = require_positive
Expand All @@ -139,6 +143,7 @@ def snapshot(self) -> list[str]:
"SampleBatch",
"IngestBatch",
"INGEST_DONE",
"PERSIST_DONE",
"IngestQueue",
"PersistenceQueue",
"require_positive",
Expand All @@ -149,6 +154,7 @@ def snapshot(self) -> list[str]:
"_SampleBatch",
"_IngestBatch",
"_INGEST_DONE",
"_PERSIST_DONE",
"_IngestQueue",
"_PersistenceQueue",
"_require_positive",
Expand Down
3 changes: 2 additions & 1 deletion mcp_plex/loader/pipeline/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
EpisodeBatch,
IMDbRetryQueue,
INGEST_DONE,
PERSIST_DONE,
IngestQueue,
MovieBatch,
PersistenceQueue,
Expand Down Expand Up @@ -155,7 +156,7 @@ async def run(self) -> None:
if got_item:
self._ingest_queue.task_done()

await self._persistence_queue.put(None)
await self._persistence_queue.put(PERSIST_DONE)

async def _handle_movie_batch(self, batch: MovieBatch) -> None:
"""Enrich and forward Plex movie batches to the persistence stage."""
Expand Down
76 changes: 73 additions & 3 deletions mcp_plex/loader/pipeline/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import logging
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Sequence

from .channels import PersistenceQueue, chunk_sequence, require_positive
from .channels import (
PERSIST_DONE,
PersistenceQueue,
chunk_sequence,
require_positive,
)

if TYPE_CHECKING: # pragma: no cover - typing helpers only
from qdrant_client import AsyncQdrantClient, models
Expand Down Expand Up @@ -47,6 +52,7 @@ def __init__(
self._upsert_fn = upsert_fn
self._on_batch_complete = on_batch_complete
self._logger = logging.getLogger("mcp_plex.loader.persistence")
self._retry_flush_attempted = False

@property
def logger(self) -> logging.Logger:
Expand Down Expand Up @@ -102,6 +108,51 @@ def upsert_buffer_size(self) -> int:

return self._upsert_buffer_size

async def _flush_retry_queue(self) -> int:
"""Re-enqueue retry batches so they are persisted before shutdown."""

drained_count = 0
while True:
try:
retry_payload = self._retry_queue.get_nowait()
except asyncio.QueueEmpty:
break

drained_count += 1
try:
await self.enqueue_points(retry_payload)
finally:
self._retry_queue.task_done()

if drained_count:
self._logger.debug(
"Re-enqueued %d retry batch(es) before persistence shutdown.",
drained_count,
)

return drained_count

def _drain_additional_sentinels(self) -> int:
    """Remove queued shutdown sentinels so real payloads are handled first.

    Pops items off the persistence queue without blocking. Consecutive
    sentinel tokens (``None`` or ``PERSIST_DONE``) are discarded and
    counted; the first non-sentinel payload encountered is returned to the
    queue and draining stops.

    Returns:
        The number of sentinel tokens removed from the queue.
    """
    drained = 0
    while True:
        try:
            queued_item = self._persistence_queue.get_nowait()
        except asyncio.QueueEmpty:
            break

        if queued_item in (None, PERSIST_DONE):
            drained += 1
            self._persistence_queue.task_done()
            continue

        # Non-sentinel payload encountered; put it back and stop draining.
        # NOTE(review): put_nowait appends at the BACK of the queue, so this
        # payload may now run after items queued behind it — confirm the
        # persistence workers are order-insensitive.
        self._persistence_queue.put_nowait(queued_item)
        # put_nowait() increments the queue's unfinished-task counter even
        # though this is the same item we just removed without a task_done().
        # Mark one task done so Queue.join() is not left waiting on a
        # phantom task for every put-back.
        self._persistence_queue.task_done()
        break

    return drained

async def enqueue_points(
self, points: Sequence["models.PointStruct"]
) -> None:
Expand All @@ -127,11 +178,30 @@ async def run(self, worker_id: int) -> None:
while True:
payload = await self._persistence_queue.get()
try:
if payload is None:
if payload is None or payload is PERSIST_DONE:
sentinel_budget = 1 + self._drain_additional_sentinels()
drained_retry = 0
if not self._retry_flush_attempted:
drained_retry = await self._flush_retry_queue()
if drained_retry:
self._retry_flush_attempted = True
for _ in range(sentinel_budget):
await self._persistence_queue.put(PERSIST_DONE)
continue

remaining_tokens = max(sentinel_budget - 1, 0)
if remaining_tokens:
for _ in range(remaining_tokens):
await self._persistence_queue.put(PERSIST_DONE)
self._logger.debug(
"Persistence queue sentinel received; finishing run for worker %d.",
worker_id,
)
if drained_retry and not self._retry_queue.empty():
self._logger.warning(
"Retry queue still contains %d batch(es) after flush.",
self._retry_queue.qsize(),
)
return

queue_size = self._persistence_queue.qsize()
Expand All @@ -148,5 +218,5 @@ async def run(self, worker_id: int) -> None:
)
finally:
self._persistence_queue.task_done()
if payload is not None:
if payload not in (None, PERSIST_DONE):
self._upsert_semaphore.release()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "0.26.69"
version = "0.26.70"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
19 changes: 10 additions & 9 deletions tests/test_enrichment_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
EpisodeBatch,
IMDbRetryQueue,
INGEST_DONE,
PERSIST_DONE,
MovieBatch,
SampleBatch,
)
Expand Down Expand Up @@ -238,7 +239,7 @@ async def scenario() -> list[list[AggregatedItem] | None]:
while True:
payload = await persistence_queue.get()
emitted.append(payload)
if payload is None:
if payload in (None, PERSIST_DONE):
break
return emitted

Expand All @@ -252,7 +253,7 @@ async def scenario() -> list[list[AggregatedItem] | None]:
first, second, sentinel = emitted_batches
assert isinstance(first, list)
assert isinstance(second, list)
assert sentinel is None
assert sentinel is PERSIST_DONE
assert [item.plex.rating_key for item in first] == ["1", "2"]
assert [item.plex.rating_key for item in second] == ["3"]
assert all(item.imdb is not None for item in first + second)
Expand Down Expand Up @@ -420,7 +421,7 @@ async def scenario() -> list[list[AggregatedItem] | None]:
while True:
payload = await persistence_queue.get()
payloads.append(payload)
if payload is None:
if payload in (None, PERSIST_DONE):
break
return payloads

Expand All @@ -431,7 +432,7 @@ async def scenario() -> list[list[AggregatedItem] | None]:

assert len(payloads) == 3
first, second, sentinel = payloads
assert sentinel is None
assert sentinel is PERSIST_DONE
assert [item.plex.rating_key for item in first] == ["e1", "e2"]
assert [item.plex.rating_key for item in second] == ["e3"]
assert all(item.tmdb for item in first + second)
Expand Down Expand Up @@ -489,7 +490,7 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], list[Any], list
while True:
payload = await persistence_queue.get()
payloads.append(payload)
if payload is None:
if payload in (None, PERSIST_DONE):
break
return payloads, persistence_queue.put_payloads, items

Expand All @@ -498,11 +499,11 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], list[Any], list
assert any("Processed sample batch" in message for message in handler.messages)
assert len(payloads) == 2
batch, sentinel = payloads
assert sentinel is None
assert sentinel is PERSIST_DONE
assert isinstance(batch, list)
assert batch == items
assert put_payloads[0] == batch
assert put_payloads[-1] is None
assert put_payloads[-1] is PERSIST_DONE


def test_enrichment_stage_retries_imdb_queue_when_idle(monkeypatch):
Expand Down Expand Up @@ -601,7 +602,7 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], int, list[list[
while True:
payload = await persistence_queue.get()
payloads.append(payload)
if payload is None:
if payload in (None, PERSIST_DONE):
break
return payloads, retry_queue.qsize(), calls

Expand All @@ -611,7 +612,7 @@ async def scenario() -> tuple[list[list[AggregatedItem] | None], int, list[list[
assert remaining == 0
assert len(payloads) == 3
first_batch, second_batch, sentinel = payloads
assert sentinel is None
assert sentinel is PERSIST_DONE
assert isinstance(first_batch, list)
assert isinstance(second_batch, list)
assert first_batch[0].imdb is None
Expand Down
Loading