From af699c500a2e366f929c2f6f606724af4314cb68 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Tue, 23 Sep 2025 08:49:26 -0600 Subject: [PATCH 1/5] refactor(loader): track imdb retry queue for persistence --- mcp_plex/loader.py | 52 ++++++++++++++++++++++++++++++++------- pyproject.toml | 2 +- tests/test_loader_unit.py | 7 ++++-- uv.lock | 2 +- 4 files changed, 50 insertions(+), 13 deletions(-) diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index e4221ee..d8eaf61 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -5,8 +5,9 @@ import json import logging import sys +from collections import deque from pathlib import Path -from typing import Awaitable, List, Optional, Sequence, TypeVar +from typing import Awaitable, Iterable, List, Optional, Sequence, TypeVar import click import httpx @@ -43,10 +44,38 @@ _imdb_cache: IMDbCache | None = None _imdb_max_retries: int = 3 _imdb_backoff: float = 1.0 -_imdb_retry_queue: asyncio.Queue[str] | None = None +_imdb_retry_queue: "_IMDbRetryQueue" | None = None _imdb_batch_limit: int = 5 _qdrant_batch_size: int = 1000 + +class _IMDbRetryQueue(asyncio.Queue[str]): + """Queue that tracks items in a deque for safe serialization.""" + + def __init__(self, initial: Iterable[str] | None = None): + super().__init__() + self._items: deque[str] = deque() + if initial: + for imdb_id in initial: + imdb_id_str = str(imdb_id) + super().put_nowait(imdb_id_str) + self._items.append(imdb_id_str) + + def put_nowait(self, item: str) -> None: # type: ignore[override] + super().put_nowait(item) + self._items.append(item) + + def get_nowait(self) -> str: # type: ignore[override] + item = super().get_nowait() + if self._items: + self._items.popleft() + return item + + def snapshot(self) -> list[str]: + """Return a list of the current queue contents.""" + + return list(self._items) + # Known Qdrant-managed dense embedding models with their dimensionality and # similarity metric. To support a new server-side embedding model, add an entry # here with the appropriate vector size and `models.Distance` value. @@ -182,14 +211,20 @@ def _load_imdb_retry_queue(path: Path) -> None: """Populate the retry queue from a JSON file if it exists.""" global _imdb_retry_queue - _imdb_retry_queue = asyncio.Queue() + ids: list[str] = [] if path.exists(): try: - ids = json.loads(path.read_text()) - for imdb_id in ids: - _imdb_retry_queue.put_nowait(str(imdb_id)) + data = json.loads(path.read_text()) + if isinstance(data, list): + ids = [str(imdb_id) for imdb_id in data] + else: + logger.warning( + "IMDb retry queue file %s did not contain a list; ignoring its contents", + path, + ) except Exception: logger.exception("Failed to load IMDb retry queue from %s", path) + _imdb_retry_queue = _IMDbRetryQueue(ids) async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None: @@ -210,8 +245,7 @@ def _persist_imdb_retry_queue(path: Path) -> None: if _imdb_retry_queue is None: return - ids = list(_imdb_retry_queue._queue) # type: ignore[attr-defined] - path.write_text(json.dumps(ids)) + path.write_text(json.dumps(_imdb_retry_queue.snapshot())) async def _upsert_in_batches( @@ -598,7 +632,7 @@ async def run( async with httpx.AsyncClient(timeout=30) as client: await _process_imdb_retry_queue(client) else: - _imdb_retry_queue = asyncio.Queue() + _imdb_retry_queue = _IMDbRetryQueue() items: List[AggregatedItem] if sample_dir is not None: diff --git a/pyproject.toml b/pyproject.toml index 9dde796..f66a8e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.28" +version = "0.26.29" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index 9ea0589..c08c1a4 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -370,6 +370,9 @@ async def first_run(): async def second_run(): _load_imdb_retry_queue(queue_path) + assert loader._imdb_retry_queue is not None + assert loader._imdb_retry_queue.qsize() == 1 + assert loader._imdb_retry_queue.snapshot() == ["tt0111161"] async with httpx.AsyncClient(transport=httpx.MockTransport(second_transport)) as client: await _process_imdb_retry_queue(client) _persist_imdb_retry_queue(queue_path) @@ -388,8 +391,7 @@ def test_load_imdb_retry_queue_invalid_json(tmp_path): def test_process_imdb_retry_queue_requeues(monkeypatch): - queue: asyncio.Queue[str] = asyncio.Queue() - queue.put_nowait("tt0111161") + queue = loader._IMDbRetryQueue(["tt0111161"]) monkeypatch.setattr(loader, "_imdb_retry_queue", queue) async def fake_fetch(client, imdb_id): @@ -403,6 +405,7 @@ async def run_test(): asyncio.run(run_test()) assert queue.qsize() == 1 + assert queue.snapshot() == ["tt0111161"] def test_resolve_tmdb_season_number_matches_name(): diff --git a/uv.lock b/uv.lock index 015b753..e7c7bea 100644 --- a/uv.lock +++ b/uv.lock @@ -690,7 +690,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.28" +version = "0.26.29" source = { editable = "." } dependencies = [ { name = "fastapi" }, From ac09dfa0b168955709bfbb05a0c98aa1d1053c13 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Tue, 23 Sep 2025 08:50:56 -0600 Subject: [PATCH 2/5] Update mcp_plex/loader.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- mcp_plex/loader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index d8eaf61..05d5ccf 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -67,8 +67,10 @@ def put_nowait(self, item: str) -> None: # type: ignore[override] def get_nowait(self) -> str: # type: ignore[override] item = super().get_nowait() - if self._items: + try: self._items.popleft() + except IndexError: + raise RuntimeError("Desynchronization: asyncio.Queue has items but self._items is empty.") return item def snapshot(self) -> list[str]: From 15b45f56bdd3e6d43ef7ccfdc96d6d35f6d66006 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Tue, 23 Sep 2025 08:52:00 -0600 Subject: [PATCH 3/5] Update mcp_plex/loader.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- mcp_plex/loader.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index 05d5ccf..2ca4c4f 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -66,11 +66,13 @@ def put_nowait(self, item: str) -> None: # type: ignore[override] self._items.append(item) def get_nowait(self) -> str: # type: ignore[override] - item = super().get_nowait() + if not self._items: + raise RuntimeError("Desynchronization: self._items is empty but asyncio.Queue is not.") try: - self._items.popleft() - except IndexError: - raise RuntimeError("Desynchronization: asyncio.Queue has items but self._items is empty.") + item = super().get_nowait() + except asyncio.QueueEmpty: + raise RuntimeError("Desynchronization: asyncio.Queue is empty but self._items is not.") + self._items.popleft() return item def snapshot(self) -> list[str]: From 9bdd27c2032caa7fcc7f16ec3f3e43d6310299ef Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Tue, 23 Sep 2025 08:53:22 -0600 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- mcp_plex/loader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index 2ca4c4f..85542e0 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -67,11 +67,11 @@ def put_nowait(self, item: str) -> None: # type: ignore[override] def get_nowait(self) -> str: # type: ignore[override] if not self._items: - raise RuntimeError("Desynchronization: self._items is empty but asyncio.Queue is not.") + raise RuntimeError("Desynchronization: self._items is empty but asyncio.Queue is not empty.") try: item = super().get_nowait() except asyncio.QueueEmpty: - raise RuntimeError("Desynchronization: asyncio.Queue is empty but self._items is not.") + raise RuntimeError("Desynchronization: asyncio.Queue is empty but self._items is not empty.") self._items.popleft() return item From d0cf6fe5d3040be490370cdcc6a7e1df7528d929 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Tue, 23 Sep 2025 08:54:37 -0600 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- mcp_plex/loader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index 85542e0..bf17b2c 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -67,11 +67,11 @@ def put_nowait(self, item: str) -> None: # type: ignore[override] def get_nowait(self) -> str: # type: ignore[override] if not self._items: - raise RuntimeError("Desynchronization: self._items is empty but asyncio.Queue is not empty.") + raise RuntimeError("Desynchronization: Queue is not empty but self._items is empty.") try: item = super().get_nowait() except asyncio.QueueEmpty: - raise RuntimeError("Desynchronization: asyncio.Queue is empty but self._items is not empty.") + raise RuntimeError("Desynchronization: self._items is not empty but asyncio.Queue is empty.") self._items.popleft() return item