Merged
docker/pyproject.deps.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
[project]
name = "mcp-plex"
version = "1.0.14"
version = "1.0.15"
requires-python = ">=3.11,<3.13"
dependencies = [
"fastmcp>=2.11.2",
mcp_plex/loader/pipeline/channels.py (10 changes: 10 additions & 0 deletions)
@@ -127,6 +127,15 @@ def snapshot(self) -> list[str]:
return list(self._items)


async def enqueue_nowait(queue: asyncio.Queue[T], item: T) -> None:
"""Place *item* onto *queue* using ``put_nowait`` with fallback backpressure."""

try:
queue.put_nowait(item)
except asyncio.QueueFull:
await queue.put(item)


__all__ = [
"MovieBatch",
"EpisodeBatch",
@@ -140,4 +149,5 @@ def snapshot(self) -> list[str]:
"PersistenceQueue",
"chunk_sequence",
"IMDbRetryQueue",
"enqueue_nowait",
]
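As a quick illustration of the new helper's backpressure fallback, here is a self-contained sketch; the bounded demo queue and the consumer task are hypothetical and exist only to exercise both code paths:

```python
import asyncio
from typing import TypeVar

T = TypeVar("T")


async def enqueue_nowait(queue: asyncio.Queue[T], item: T) -> None:
    """Place *item* onto *queue* using ``put_nowait`` with fallback backpressure."""
    try:
        queue.put_nowait(item)
    except asyncio.QueueFull:
        await queue.put(item)


async def demo() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=1)

    # A slot is free, so put_nowait succeeds immediately.
    await enqueue_nowait(queue, 1)

    # The queue is now full: put_nowait raises QueueFull and the helper falls
    # back to ``await queue.put``, blocking until the consumer drains a slot.
    consumer = asyncio.create_task(queue.get())
    await enqueue_nowait(queue, 2)

    assert await consumer == 1
    assert queue.get_nowait() == 2


asyncio.run(demo())
```

Items land on the queue immediately whenever there is capacity, while a full queue still exerts backpressure through the awaited put fallback.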
mcp_plex/loader/pipeline/enrichment.py (11 changes: 6 additions & 5 deletions)
@@ -29,6 +29,7 @@
PersistenceQueue,
SampleBatch,
chunk_sequence,
enqueue_nowait,
)
from ...common.validation import coerce_plex_tag_id, require_positive

@@ -379,7 +380,7 @@ async def run(self) -> None:
if got_item:
self._ingest_queue.task_done()

await self._persistence_queue.put(PERSIST_DONE)
await enqueue_nowait(self._persistence_queue, PERSIST_DONE)
self._logger.info(
"Enrichment stage completed; persistence sentinel emitted (retry queue=%d).",
self._imdb_retry_queue.qsize(),
@@ -470,7 +471,7 @@ async def _emit_persistence_batch(
if not aggregated:
return
payload = list(aggregated)
await self._persistence_queue.put(payload)
await enqueue_nowait(self._persistence_queue, payload)
self._logger.debug(
"Enqueued %d aggregated item(s) for persistence (queue size=%d).",
len(payload),
@@ -676,7 +677,7 @@ async def _retry_imdb_batches(self) -> bool:
imdb_id for imdb_id in imdb_ids if imdb_results.get(imdb_id) is None
]
for imdb_id in stalled_ids:
await self._imdb_retry_queue.put(imdb_id)
self._imdb_retry_queue.put_nowait(imdb_id)

self._logger.info(
"Retried IMDb batch with %d updates (retry queue=%d)",
@@ -758,7 +759,7 @@ async def _fetch_imdb(
if response.status_code == 429:
if attempt == max_retries:
if retry_queue is not None:
await retry_queue.put(imdb_id)
retry_queue.put_nowait(imdb_id)
return None
await asyncio.sleep(delay)
delay *= 2
@@ -825,7 +826,7 @@ async def _fetch_imdb_batch(
if attempt == max_retries:
if retry_queue is not None:
for imdb_id in chunk:
await retry_queue.put(imdb_id)
retry_queue.put_nowait(imdb_id)
for imdb_id in chunk:
results[imdb_id] = None
break
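The retry-queue calls above switch from ``await retry_queue.put(imdb_id)`` to the synchronous ``retry_queue.put_nowait(imdb_id)``. A minimal sketch of why that is a safe swap, assuming the IMDb retry queue is unbounded like a default ``asyncio.Queue`` (the ``IMDbRetryQueue`` implementation is not shown in this diff):

```python
import asyncio


async def demo() -> None:
    # Hypothetical stand-in for IMDbRetryQueue. A queue created without a
    # maxsize can never be full, so put_nowait never raises QueueFull and
    # enqueues the item immediately.
    retry_queue: asyncio.Queue[str] = asyncio.Queue()

    for imdb_id in ("tt0111161", "tt0068646"):
        retry_queue.put_nowait(imdb_id)

    assert retry_queue.qsize() == 2


asyncio.run(demo())
```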
mcp_plex/loader/pipeline/ingestion.py (15 changes: 9 additions & 6 deletions)
@@ -20,6 +20,7 @@
MovieBatch,
SampleBatch,
chunk_sequence,
enqueue_nowait,
)

from plexapi.server import PlexServer
@@ -79,8 +80,8 @@ async def run(self) -> None:
await self._run_plex_ingestion()

self._logger.debug("Publishing ingestion completion sentinels to downstream stages.")
await self._output_queue.put(None)
await self._output_queue.put(self._completion_sentinel)
await enqueue_nowait(self._output_queue, None)
await enqueue_nowait(self._output_queue, self._completion_sentinel)
self._logger.info(
"Ingestion stage finished after queuing %d batch(es) covering %d item(s).",
self._batches_ingested,
@@ -223,7 +224,7 @@ def _fetch_movies(start: int) -> Sequence[Movie]:
continue

batch = MovieBatch(movies=list(batch_movies))
await output_queue.put(batch)
await enqueue_nowait(output_queue, batch)
self._items_ingested += len(batch_movies)
self._batches_ingested += 1
movie_batches += 1
@@ -292,7 +293,7 @@ def _fetch_shows(start: int) -> Sequence[Show]:
show=show,
episodes=list(batch_episodes),
)
await output_queue.put(batch)
await enqueue_nowait(output_queue, batch)
self._items_ingested += len(batch_episodes)
self._batches_ingested += 1
episode_batches += 1
@@ -315,7 +316,7 @@ def _fetch_shows(start: int) -> Sequence[Show]:
show=show,
episodes=list(pending_episodes),
)
await output_queue.put(batch)
await enqueue_nowait(output_queue, batch)
self._items_ingested += len(pending_episodes)
self._batches_ingested += 1
episode_batches += 1
@@ -357,7 +358,9 @@ async def _enqueue_sample_batches(
if not batch_items:
continue

await self._output_queue.put(SampleBatch(items=batch_items))
await enqueue_nowait(
self._output_queue, SampleBatch(items=batch_items)
)
self._items_ingested += len(batch_items)
self._batches_ingested += 1
self._logger.debug(
mcp_plex/loader/pipeline/persistence.py (13 changes: 9 additions & 4 deletions)
@@ -10,6 +10,7 @@
PERSIST_DONE,
PersistenceQueue,
chunk_sequence,
enqueue_nowait,
)
from ...common.validation import require_positive

@@ -176,7 +177,7 @@ async def enqueue_points(
continue
await self._upsert_semaphore.acquire()
try:
await self._persistence_queue.put(batch)
await enqueue_nowait(self._persistence_queue, batch)
except BaseException:
self._upsert_semaphore.release()
raise
@@ -206,12 +207,14 @@ async def run(self, worker_id: int) -> None:
if drained_retry:
self._retry_flush_attempted = True
for _ in range(sentinel_budget):
await self._persistence_queue.put(PERSIST_DONE)
await enqueue_nowait(
self._persistence_queue, PERSIST_DONE
)
continue

drained_sentinels = max(sentinel_budget - 1, 0)
for _ in range(drained_sentinels):
await self._persistence_queue.put(PERSIST_DONE)
await enqueue_nowait(self._persistence_queue, PERSIST_DONE)

self._shutdown_tokens_seen += 1
outstanding_workers = max(
@@ -222,7 +225,9 @@
)
if additional_tokens:
for _ in range(additional_tokens):
await self._persistence_queue.put(PERSIST_DONE)
await enqueue_nowait(
self._persistence_queue, PERSIST_DONE
)
self._logger.debug(
"Persistence queue sentinel received; finishing run for worker %d.",
worker_id,
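For reference, the ``enqueue_points`` change keeps the existing acquire-before-enqueue shape. The sketch below restates that pattern with hypothetical names, inlining the ``enqueue_nowait`` fallback, and assumes (as the diff suggests but does not show) that the permit is released elsewhere once a batch has been persisted:

```python
import asyncio
from typing import Any


async def enqueue_with_backpressure(
    queue: asyncio.Queue[list[Any]],
    semaphore: asyncio.Semaphore,
    batch: list[Any],
) -> None:
    # Reserve a permit first so the number of in-flight batches stays bounded.
    await semaphore.acquire()
    try:
        # Same fallback as enqueue_nowait: try the synchronous path, then
        # fall back to an awaited put() when the queue is full.
        try:
            queue.put_nowait(batch)
        except asyncio.QueueFull:
            await queue.put(batch)
    except BaseException:
        # If enqueuing itself fails, return the permit so capacity is not
        # leaked; the success-path release happens after persistence
        # (an assumption about the surrounding stage, not shown here).
        semaphore.release()
        raise
```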
pyproject.toml (2 changes: 1 addition & 1 deletion)
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "1.0.14"
version = "1.0.15"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
tests/test_enrichment_stage.py (4 changes: 4 additions & 0 deletions)
@@ -338,6 +338,10 @@ async def put(self, item: Any) -> None: # type: ignore[override]
self.put_payloads.append(item)
await super().put(item)

def put_nowait(self, item: Any) -> None: # type: ignore[override]
self.put_payloads.append(item)
super().put_nowait(item)


def test_enrichment_stage_caches_tmdb_show_results(monkeypatch):
show_requests: list[tuple[str, str]] = []
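For context, the two overridden methods above read like part of a recording test double along these lines; the class name, base class, and constructor are assumptions, since only ``put`` and ``put_nowait`` appear in the diff:

```python
import asyncio
from typing import Any


class RecordingQueue(asyncio.Queue):
    """Hypothetical test double that records every enqueued payload."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.put_payloads: list[Any] = []

    async def put(self, item: Any) -> None:  # type: ignore[override]
        self.put_payloads.append(item)
        await super().put(item)

    def put_nowait(self, item: Any) -> None:  # type: ignore[override]
        self.put_payloads.append(item)
        super().put_nowait(item)
```

Adding the ``put_nowait`` override keeps the recorded payloads complete now that production code may enqueue through ``enqueue_nowait``'s synchronous path instead of ``put``.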
uv.lock (2 changes: 1 addition & 1 deletion)

Some generated files are not rendered by default.