diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index e4221ee..bf17b2c 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -5,8 +5,9 @@ import json import logging import sys +from collections import deque from pathlib import Path -from typing import Awaitable, List, Optional, Sequence, TypeVar +from typing import Awaitable, Iterable, List, Optional, Sequence, TypeVar import click import httpx @@ -43,10 +44,42 @@ _imdb_cache: IMDbCache | None = None _imdb_max_retries: int = 3 _imdb_backoff: float = 1.0 -_imdb_retry_queue: asyncio.Queue[str] | None = None +_imdb_retry_queue: "_IMDbRetryQueue" | None = None _imdb_batch_limit: int = 5 _qdrant_batch_size: int = 1000 + +class _IMDbRetryQueue(asyncio.Queue[str]): + """Queue that tracks items in a deque for safe serialization.""" + + def __init__(self, initial: Iterable[str] | None = None): + super().__init__() + self._items: deque[str] = deque() + if initial: + for imdb_id in initial: + imdb_id_str = str(imdb_id) + super().put_nowait(imdb_id_str) + self._items.append(imdb_id_str) + + def put_nowait(self, item: str) -> None: # type: ignore[override] + super().put_nowait(item) + self._items.append(item) + + def get_nowait(self) -> str: # type: ignore[override] + if not self._items: + raise RuntimeError("Desynchronization: Queue is not empty but self._items is empty.") + try: + item = super().get_nowait() + except asyncio.QueueEmpty: + raise RuntimeError("Desynchronization: self._items is not empty but asyncio.Queue is empty.") + self._items.popleft() + return item + + def snapshot(self) -> list[str]: + """Return a list of the current queue contents.""" + + return list(self._items) + # Known Qdrant-managed dense embedding models with their dimensionality and # similarity metric. To support a new server-side embedding model, add an entry # here with the appropriate vector size and `models.Distance` value. @@ -182,14 +215,20 @@ def _load_imdb_retry_queue(path: Path) -> None: """Populate the retry queue from a JSON file if it exists.""" global _imdb_retry_queue - _imdb_retry_queue = asyncio.Queue() + ids: list[str] = [] if path.exists(): try: - ids = json.loads(path.read_text()) - for imdb_id in ids: - _imdb_retry_queue.put_nowait(str(imdb_id)) + data = json.loads(path.read_text()) + if isinstance(data, list): + ids = [str(imdb_id) for imdb_id in data] + else: + logger.warning( + "IMDb retry queue file %s did not contain a list; ignoring its contents", + path, + ) except Exception: logger.exception("Failed to load IMDb retry queue from %s", path) + _imdb_retry_queue = _IMDbRetryQueue(ids) async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None: @@ -210,8 +249,7 @@ def _persist_imdb_retry_queue(path: Path) -> None: if _imdb_retry_queue is None: return - ids = list(_imdb_retry_queue._queue) # type: ignore[attr-defined] - path.write_text(json.dumps(ids)) + path.write_text(json.dumps(_imdb_retry_queue.snapshot())) async def _upsert_in_batches( @@ -598,7 +636,7 @@ async def run( async with httpx.AsyncClient(timeout=30) as client: await _process_imdb_retry_queue(client) else: - _imdb_retry_queue = asyncio.Queue() + _imdb_retry_queue = _IMDbRetryQueue() items: List[AggregatedItem] if sample_dir is not None: diff --git a/pyproject.toml b/pyproject.toml index 9dde796..f66a8e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.28" +version = "0.26.29" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index 9ea0589..c08c1a4 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -370,6 +370,9 @@ async def first_run(): async def second_run(): _load_imdb_retry_queue(queue_path) + assert loader._imdb_retry_queue is not None + assert loader._imdb_retry_queue.qsize() == 1 + assert loader._imdb_retry_queue.snapshot() == ["tt0111161"] async with httpx.AsyncClient(transport=httpx.MockTransport(second_transport)) as client: await _process_imdb_retry_queue(client) _persist_imdb_retry_queue(queue_path) @@ -388,8 +391,7 @@ def test_load_imdb_retry_queue_invalid_json(tmp_path): def test_process_imdb_retry_queue_requeues(monkeypatch): - queue: asyncio.Queue[str] = asyncio.Queue() - queue.put_nowait("tt0111161") + queue = loader._IMDbRetryQueue(["tt0111161"]) monkeypatch.setattr(loader, "_imdb_retry_queue", queue) async def fake_fetch(client, imdb_id): @@ -403,6 +405,7 @@ async def run_test(): asyncio.run(run_test()) assert queue.qsize() == 1 + assert queue.snapshot() == ["tt0111161"] def test_resolve_tmdb_season_number_matches_name(): diff --git a/uv.lock b/uv.lock index 015b753..e7c7bea 100644 --- a/uv.lock +++ b/uv.lock @@ -690,7 +690,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.28" +version = "0.26.29" source = { editable = "." } dependencies = [ { name = "fastapi" },