Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ Run continuously with a delay between runs:
uv run load-data --continuous --delay 600
```

### IMDb Retry Queue
When IMDb lookups continue to return HTTP 429 after the configured retries,
their IDs are added to a small queue (`imdb_queue.json` by default). The queue
is persisted after each run and reloaded on the next run so pending IDs are
retried before normal processing.

### Run the MCP Server
Start the FastMCP server over stdio (default):
```bash
Expand Down
68 changes: 67 additions & 1 deletion mcp_plex/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
_imdb_cache: IMDbCache | None = None
_imdb_max_retries: int = 3
_imdb_backoff: float = 1.0
_imdb_retry_queue: asyncio.Queue[str] | None = None


async def _gather_in_batches(
Expand Down Expand Up @@ -70,6 +71,8 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT
resp = await client.get(url)
if resp.status_code == 429:
if attempt == _imdb_max_retries:
if _imdb_retry_queue is not None:
await _imdb_retry_queue.put(imdb_id)
return None
await asyncio.sleep(delay)
delay *= 2
Expand All @@ -83,6 +86,42 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT
return None


def _load_imdb_retry_queue(path: Path) -> None:
    """Populate the retry queue from a JSON file if it exists.

    Always installs a fresh ``asyncio.Queue`` into the module global; when
    *path* is present, each persisted ID is coerced to ``str`` and enqueued.
    A corrupt or unreadable file is logged and otherwise ignored so a bad
    queue file never blocks a run.
    """

    global _imdb_retry_queue
    _imdb_retry_queue = asyncio.Queue()
    if not path.exists():
        return
    try:
        for imdb_id in json.loads(path.read_text()):
            _imdb_retry_queue.put_nowait(str(imdb_id))
    except Exception:
        logger.exception("Failed to load IMDb retry queue from %s", path)


async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None:
    """Attempt to fetch queued IMDb IDs, re-queueing failures.

    Each ID present at the start of the pass gets one attempt through
    ``_fetch_imdb``. On a final HTTP 429, ``_fetch_imdb`` already pushes the
    ID back onto ``_imdb_retry_queue`` itself, so we only re-enqueue here
    when the queue did not grow during the fetch — otherwise every
    rate-limited run would double the number of queued copies of the same
    ID, ballooning both the API traffic and the persisted queue file.
    """

    if _imdb_retry_queue is None or _imdb_retry_queue.empty():
        return
    # Snapshot the size so IDs re-queued during this pass wait for the
    # next run instead of being retried again immediately.
    for _ in range(_imdb_retry_queue.qsize()):
        imdb_id = await _imdb_retry_queue.get()
        size_before = _imdb_retry_queue.qsize()
        title = await _fetch_imdb(client, imdb_id)
        if title is None and _imdb_retry_queue.qsize() == size_before:
            # Failed for a reason other than a final 429 (which would have
            # re-enqueued it already) — keep it for the next run.
            await _imdb_retry_queue.put(imdb_id)
Comment on lines +108 to +113

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Avoid doubling IMDb retry queue on repeated 429s

When processing the persisted queue, _process_imdb_retry_queue re-enqueues any ID that still returns None, but _fetch_imdb already pushes the same ID back onto _imdb_retry_queue when it hits the final 429 retry (see lines 72‑75). With a rate-limited IMDb API, every failed run doubles the number of queued entries for the same ID, so a single title can balloon to thousands of retries and a large imdb_queue.json, causing unnecessary API calls and potential disk growth once the service recovers. Consider only re-enqueuing here when _fetch_imdb did not already do so.

Useful? React with 👍 / 👎.



def _persist_imdb_retry_queue(path: Path) -> None:
    """Persist the retry queue to disk as a JSON list of IMDb IDs.

    Uses the public ``asyncio.Queue`` API (drain with ``get_nowait`` and
    restore with ``put_nowait``) instead of reading the private ``_queue``
    attribute, which is an implementation detail and required a
    ``type: ignore``. The queue contents and order are unchanged on return.
    """

    if _imdb_retry_queue is None:
        return
    ids: list[str] = []
    while not _imdb_retry_queue.empty():
        ids.append(_imdb_retry_queue.get_nowait())
    # Put the IDs back so persisting is a read-only snapshot of the queue.
    for imdb_id in ids:
        _imdb_retry_queue.put_nowait(imdb_id)
    path.write_text(json.dumps(ids))


async def _fetch_tmdb_movie(
client: httpx.AsyncClient, tmdb_id: str, api_key: str
) -> Optional[TMDBMovie]:
Expand Down Expand Up @@ -207,6 +246,10 @@ async def _augment_episode(
_fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None)
)
season = getattr(episode, "parentIndex", None)
if season is None:
title = getattr(episode, "parentTitle", "")
if isinstance(title, str) and title.isdigit():
season = int(title)
ep_num = getattr(episode, "index", None)
tmdb_task = (
_fetch_tmdb_episode(client, show_tmdb.id, season, ep_num, tmdb_api_key)
Expand Down Expand Up @@ -352,13 +395,20 @@ async def run(
imdb_cache_path: Path | None = None,
imdb_max_retries: int = 3,
imdb_backoff: float = 1.0,
imdb_queue_path: Path | None = None,
) -> None:
"""Core execution logic for the CLI."""

global _imdb_cache, _imdb_max_retries, _imdb_backoff
global _imdb_cache, _imdb_max_retries, _imdb_backoff, _imdb_retry_queue
_imdb_cache = IMDbCache(imdb_cache_path) if imdb_cache_path else None
_imdb_max_retries = imdb_max_retries
_imdb_backoff = imdb_backoff
if imdb_queue_path:
_load_imdb_retry_queue(imdb_queue_path)
async with httpx.AsyncClient(timeout=30) as client:
await _process_imdb_retry_queue(client)
else:
_imdb_retry_queue = asyncio.Queue()

items: List[AggregatedItem]
if sample_dir is not None:
Expand Down Expand Up @@ -497,6 +547,9 @@ async def run(
else:
logger.info("No points to upsert")

if imdb_queue_path:
_persist_imdb_retry_queue(imdb_queue_path)

json.dump([item.model_dump(mode="json") for item in items], fp=sys.stdout, indent=2)
sys.stdout.write("\n")

Expand Down Expand Up @@ -643,6 +696,15 @@ async def run(
show_default=True,
help="Initial backoff delay in seconds for IMDb retries",
)
@click.option(
"--imdb-queue",
envvar="IMDB_QUEUE",
show_envvar=True,
type=click.Path(path_type=Path),
default=Path("imdb_queue.json"),
show_default=True,
help="Path to persistent IMDb retry queue",
)
def main(
plex_url: Optional[str],
plex_token: Optional[str],
Expand All @@ -662,6 +724,7 @@ def main(
imdb_cache: Path,
imdb_max_retries: int,
imdb_backoff: float,
imdb_queue: Path,
) -> None:
"""Entry-point for the ``load-data`` script."""

Expand All @@ -685,6 +748,7 @@ def main(
imdb_cache,
imdb_max_retries,
imdb_backoff,
imdb_queue,
)
)

Expand All @@ -708,6 +772,7 @@ async def load_media(
imdb_cache: Path,
imdb_max_retries: int,
imdb_backoff: float,
imdb_queue: Path,
) -> None:
"""Orchestrate one or more runs of :func:`run`."""

Expand All @@ -729,6 +794,7 @@ async def load_media(
imdb_cache,
imdb_max_retries,
imdb_backoff,
imdb_queue,
)
if not continuous:
break
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "0.26.14"
version = "0.26.15"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
37 changes: 37 additions & 0 deletions tests/test_loader_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
_fetch_tmdb_movie,
_fetch_tmdb_show,
_load_from_sample,
_load_imdb_retry_queue,
_persist_imdb_retry_queue,
_process_imdb_retry_queue,
)


Expand Down Expand Up @@ -203,3 +206,37 @@ async def main():
asyncio.run(main())
assert call_count == 3
assert delays == [0.1, 0.2]


def test_imdb_retry_queue_persists_and_retries(tmp_path, monkeypatch):
    """A 429-failed ID is persisted, then drained once the API recovers."""
    queue_path = tmp_path / "queue.json"
    monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(tmp_path / "cache.json"))
    monkeypatch.setattr(loader, "_imdb_max_retries", 0)
    monkeypatch.setattr(loader, "_imdb_backoff", 0)

    async def rate_limited(request):
        return httpx.Response(429)

    async def recovered(request):
        return httpx.Response(
            200, json={"id": "tt1", "type": "movie", "primaryTitle": "T"}
        )

    async def run_once(handler, fetch_after=False):
        # Mirror one loader run: load queue, process it, persist it.
        _load_imdb_retry_queue(queue_path)
        transport = httpx.MockTransport(handler)
        async with httpx.AsyncClient(transport=transport) as client:
            await _process_imdb_retry_queue(client)
            if fetch_after:
                await _fetch_imdb(client, "tt1")
        _persist_imdb_retry_queue(queue_path)

    # First run: the lookup is rate-limited, so the ID lands in the queue.
    asyncio.run(run_once(rate_limited, fetch_after=True))
    assert json.loads(queue_path.read_text()) == ["tt1"]

    # Second run: the API recovers, the queue drains and the cache fills.
    asyncio.run(run_once(recovered))
    assert json.loads(queue_path.read_text()) == []
    assert loader._imdb_cache.get("tt1") is not None
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.