From cdc69bb5d9e2c2b6f0f02ca5b8ff4623fbda77e9 Mon Sep 17 00:00:00 2001
From: Teagan Glenn
Date: Mon, 6 Oct 2025 19:32:13 -0600
Subject: [PATCH] fix(loader): enqueue batches using nowait fallback

---
 docker/pyproject.deps.toml              |  2 +-
 mcp_plex/loader/pipeline/channels.py    | 10 ++++++++++
 mcp_plex/loader/pipeline/enrichment.py  | 11 ++++++-----
 mcp_plex/loader/pipeline/ingestion.py   | 15 +++++++++------
 mcp_plex/loader/pipeline/persistence.py | 13 +++++++++----
 pyproject.toml                          |  2 +-
 tests/test_enrichment_stage.py          |  4 ++++
 uv.lock                                 |  2 +-
 8 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml
index edda86e..0ddfed0 100644
--- a/docker/pyproject.deps.toml
+++ b/docker/pyproject.deps.toml
@@ -1,6 +1,6 @@
 [project]
 name = "mcp-plex"
-version = "1.0.14"
+version = "1.0.15"
 requires-python = ">=3.11,<3.13"
 dependencies = [
     "fastmcp>=2.11.2",
diff --git a/mcp_plex/loader/pipeline/channels.py b/mcp_plex/loader/pipeline/channels.py
index 2529f0a..91f9069 100644
--- a/mcp_plex/loader/pipeline/channels.py
+++ b/mcp_plex/loader/pipeline/channels.py
@@ -127,6 +127,15 @@ def snapshot(self) -> list[str]:
         return list(self._items)
 
 
+async def enqueue_nowait(queue: asyncio.Queue[T], item: T) -> None:
+    """Place *item* onto *queue* using ``put_nowait`` with fallback backpressure."""
+
+    try:
+        queue.put_nowait(item)
+    except asyncio.QueueFull:
+        await queue.put(item)
+
+
 __all__ = [
     "MovieBatch",
     "EpisodeBatch",
@@ -140,4 +149,5 @@ def snapshot(self) -> list[str]:
     "PersistenceQueue",
     "chunk_sequence",
     "IMDbRetryQueue",
+    "enqueue_nowait",
 ]
diff --git a/mcp_plex/loader/pipeline/enrichment.py b/mcp_plex/loader/pipeline/enrichment.py
index 666436e..f4a4051 100644
--- a/mcp_plex/loader/pipeline/enrichment.py
+++ b/mcp_plex/loader/pipeline/enrichment.py
@@ -29,6 +29,7 @@
     PersistenceQueue,
     SampleBatch,
     chunk_sequence,
+    enqueue_nowait,
 )
 
 from ...common.validation import coerce_plex_tag_id, require_positive
@@ -379,7 +380,7 @@ async def run(self) -> None:
             if got_item:
                 self._ingest_queue.task_done()
 
-        await self._persistence_queue.put(PERSIST_DONE)
+        await enqueue_nowait(self._persistence_queue, PERSIST_DONE)
         self._logger.info(
             "Enrichment stage completed; persistence sentinel emitted (retry queue=%d).",
             self._imdb_retry_queue.qsize(),
@@ -470,7 +471,7 @@ async def _emit_persistence_batch(
         if not aggregated:
             return
         payload = list(aggregated)
-        await self._persistence_queue.put(payload)
+        await enqueue_nowait(self._persistence_queue, payload)
         self._logger.debug(
             "Enqueued %d aggregated item(s) for persistence (queue size=%d).",
             len(payload),
@@ -676,7 +677,7 @@ async def _retry_imdb_batches(self) -> bool:
             imdb_id for imdb_id in imdb_ids if imdb_results.get(imdb_id) is None
         ]
         for imdb_id in stalled_ids:
-            await self._imdb_retry_queue.put(imdb_id)
+            self._imdb_retry_queue.put_nowait(imdb_id)
 
         self._logger.info(
             "Retried IMDb batch with %d updates (retry queue=%d)",
@@ -758,7 +759,7 @@ async def _fetch_imdb(
         if response.status_code == 429:
             if attempt == max_retries:
                 if retry_queue is not None:
-                    await retry_queue.put(imdb_id)
+                    retry_queue.put_nowait(imdb_id)
                 return None
             await asyncio.sleep(delay)
             delay *= 2
@@ -825,7 +826,7 @@ async def _fetch_imdb_batch(
             if attempt == max_retries:
                 if retry_queue is not None:
                     for imdb_id in chunk:
-                        await retry_queue.put(imdb_id)
+                        retry_queue.put_nowait(imdb_id)
                 for imdb_id in chunk:
                     results[imdb_id] = None
                 break
diff --git a/mcp_plex/loader/pipeline/ingestion.py b/mcp_plex/loader/pipeline/ingestion.py
index f611379..5898029 100644
--- a/mcp_plex/loader/pipeline/ingestion.py
+++ b/mcp_plex/loader/pipeline/ingestion.py
@@ -20,6 +20,7 @@
     MovieBatch,
     SampleBatch,
     chunk_sequence,
+    enqueue_nowait,
 )
 
 from plexapi.server import PlexServer
@@ -79,8 +80,8 @@ async def run(self) -> None:
             await self._run_plex_ingestion()
 
         self._logger.debug("Publishing ingestion completion sentinels to downstream stages.")
-        await self._output_queue.put(None)
-        await self._output_queue.put(self._completion_sentinel)
+        await enqueue_nowait(self._output_queue, None)
+        await enqueue_nowait(self._output_queue, self._completion_sentinel)
         self._logger.info(
             "Ingestion stage finished after queuing %d batch(es) covering %d item(s).",
             self._batches_ingested,
@@ -223,7 +224,7 @@ def _fetch_movies(start: int) -> Sequence[Movie]:
                 continue
 
             batch = MovieBatch(movies=list(batch_movies))
-            await output_queue.put(batch)
+            await enqueue_nowait(output_queue, batch)
             self._items_ingested += len(batch_movies)
             self._batches_ingested += 1
             movie_batches += 1
@@ -292,7 +293,7 @@ def _fetch_shows(start: int) -> Sequence[Show]:
                     show=show,
                     episodes=list(batch_episodes),
                 )
-                await output_queue.put(batch)
+                await enqueue_nowait(output_queue, batch)
                 self._items_ingested += len(batch_episodes)
                 self._batches_ingested += 1
                 episode_batches += 1
@@ -315,7 +316,7 @@ def _fetch_shows(start: int) -> Sequence[Show]:
                     show=show,
                     episodes=list(pending_episodes),
                 )
-                await output_queue.put(batch)
+                await enqueue_nowait(output_queue, batch)
                 self._items_ingested += len(pending_episodes)
                 self._batches_ingested += 1
                 episode_batches += 1
@@ -357,7 +358,9 @@ async def _enqueue_sample_batches(
             if not batch_items:
                 continue
 
-            await self._output_queue.put(SampleBatch(items=batch_items))
+            await enqueue_nowait(
+                self._output_queue, SampleBatch(items=batch_items)
+            )
             self._items_ingested += len(batch_items)
             self._batches_ingested += 1
             self._logger.debug(
diff --git a/mcp_plex/loader/pipeline/persistence.py b/mcp_plex/loader/pipeline/persistence.py
index 83f030f..3a821be 100644
--- a/mcp_plex/loader/pipeline/persistence.py
+++ b/mcp_plex/loader/pipeline/persistence.py
@@ -10,6 +10,7 @@
     PERSIST_DONE,
     PersistenceQueue,
     chunk_sequence,
+    enqueue_nowait,
 )
 
 from ...common.validation import require_positive
@@ -176,7 +177,7 @@ async def enqueue_points(
                 continue
             await self._upsert_semaphore.acquire()
             try:
-                await self._persistence_queue.put(batch)
+                await enqueue_nowait(self._persistence_queue, batch)
             except BaseException:
                 self._upsert_semaphore.release()
                 raise
@@ -206,12 +207,14 @@ async def run(self, worker_id: int) -> None:
                 if drained_retry:
                     self._retry_flush_attempted = True
                     for _ in range(sentinel_budget):
-                        await self._persistence_queue.put(PERSIST_DONE)
+                        await enqueue_nowait(
+                            self._persistence_queue, PERSIST_DONE
+                        )
                     continue
 
                 drained_sentinels = max(sentinel_budget - 1, 0)
                 for _ in range(drained_sentinels):
-                    await self._persistence_queue.put(PERSIST_DONE)
+                    await enqueue_nowait(self._persistence_queue, PERSIST_DONE)
                 self._shutdown_tokens_seen += 1
 
                 outstanding_workers = max(
@@ -222,7 +225,9 @@
                 )
                 if additional_tokens:
                     for _ in range(additional_tokens):
-                        await self._persistence_queue.put(PERSIST_DONE)
+                        await enqueue_nowait(
+                            self._persistence_queue, PERSIST_DONE
+                        )
                 self._logger.debug(
                     "Persistence queue sentinel received; finishing run for worker %d.",
                     worker_id,
diff --git a/pyproject.toml b/pyproject.toml
index 006272d..6fb0d5a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "mcp-plex"
-version = "1.0.14"
+version = "1.0.15"
 description = "Plex-Oriented Model Context Protocol Server"
 requires-python = ">=3.11,<3.13"
 
diff --git a/tests/test_enrichment_stage.py b/tests/test_enrichment_stage.py
index 124ec20..be31cf8 100644
--- a/tests/test_enrichment_stage.py
+++ b/tests/test_enrichment_stage.py
@@ -338,6 +338,10 @@ async def put(self, item: Any) -> None:  # type: ignore[override]
         self.put_payloads.append(item)
         await super().put(item)
 
+    def put_nowait(self, item: Any) -> None:  # type: ignore[override]
+        self.put_payloads.append(item)
+        super().put_nowait(item)
+
 
 def test_enrichment_stage_caches_tmdb_show_results(monkeypatch):
     show_requests: list[tuple[str, str]] = []
diff --git a/uv.lock b/uv.lock
index 0854bc5..4246f0c 100644
--- a/uv.lock
+++ b/uv.lock
@@ -730,7 +730,7 @@ wheels = [
 
 [[package]]
 name = "mcp-plex"
-version = "1.0.14"
+version = "1.0.15"
 source = { editable = "." }
 dependencies = [
     { name = "fastapi" },
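Note: below is a minimal, standalone sketch of the fallback behaviour the new
enqueue_nowait() helper in channels.py is meant to provide, using only the
standard-library asyncio.Queue; the maxsize and the driver coroutine are
illustrative, not taken from the loader:

    import asyncio


    async def enqueue_nowait(queue: asyncio.Queue, item) -> None:
        # Fast path: try a non-blocking put. If the queue is full, fall back
        # to an awaited put so the producer is backpressured instead of
        # surfacing QueueFull to callers.
        try:
            queue.put_nowait(item)
        except asyncio.QueueFull:
            await queue.put(item)


    async def main() -> None:
        q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)
        await enqueue_nowait(q, 1)               # room available: fast path
        consumer = asyncio.create_task(q.get())  # will drain one item
        await enqueue_nowait(q, 2)               # full: falls back to await put()
        assert await consumer == 1
        assert q.get_nowait() == 2


    asyncio.run(main())

The fast path skips the await (and the scheduling round-trip) whenever the
queue has capacity, while a full queue still applies backpressure through the
awaited put() rather than raising.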