diff --git a/AGENTS.md b/AGENTS.md index 4ee55c4..0f6d7cb 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -22,6 +22,7 @@ accurate episode lookups. - Plex metadata is fetched in batches using `fetchItems` to reduce repeated network calls when loading library items. +- IMDb metadata is fetched via `titles:batchGet` to minimize repeated API calls. ## User Queries The project should handle natural-language searches and recommendations such as: diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index a4fe593..e851a82 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -51,9 +51,11 @@ async def _gather_in_batches( """Gather awaitable tasks in fixed-size batches.""" results: List[T] = [] - for i in range(0, len(tasks), batch_size): + total = len(tasks) + for i in range(0, total, batch_size): batch = tasks[i : i + batch_size] results.extend(await asyncio.gather(*batch)) + logger.info("Processed %d/%d items", min(i + batch_size, total), total) return results @@ -86,6 +88,54 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT return None +async def _fetch_imdb_batch( + client: httpx.AsyncClient, imdb_ids: Sequence[str] +) -> dict[str, Optional[IMDbTitle]]: + """Fetch metadata for multiple IMDb IDs in a single request.""" + + results: dict[str, Optional[IMDbTitle]] = {} + ids_to_fetch: list[str] = [] + for imdb_id in imdb_ids: + if _imdb_cache: + cached = _imdb_cache.get(imdb_id) + if cached: + results[imdb_id] = IMDbTitle.model_validate(cached) + continue + ids_to_fetch.append(imdb_id) + + if not ids_to_fetch: + return results + + delay = _imdb_backoff + params = [("titleIds", i) for i in ids_to_fetch] + for attempt in range(_imdb_max_retries + 1): + resp = await client.get("https://api.imdbapi.dev/titles:batchGet", params=params) + if resp.status_code == 429: + if attempt == _imdb_max_retries: + if _imdb_retry_queue is not None: + for imdb_id in ids_to_fetch: + await _imdb_retry_queue.put(imdb_id) + break + await asyncio.sleep(delay) + delay *= 2 + continue + if resp.is_success: + data = resp.json() + for title_data in data.get("titles", []): + imdb_title = IMDbTitle.model_validate(title_data) + results[imdb_title.id] = imdb_title + if _imdb_cache: + _imdb_cache.set(imdb_title.id, title_data) + for missing in set(ids_to_fetch) - set(results): + results[missing] = None + break + for imdb_id in ids_to_fetch: + results[imdb_id] = None + break + + return results + + def _load_imdb_retry_queue(path: Path) -> None: """Populate the retry queue from a JSON file if it exists.""" @@ -279,46 +329,29 @@ async def _load_from_plex( server: PlexServer, tmdb_api_key: str, *, batch_size: int = 50 ) -> List[AggregatedItem]: """Load items from a live Plex server.""" - - async def _augment_movie(client: httpx.AsyncClient, movie: PlexPartialObject) -> AggregatedItem: - ids = _extract_external_ids(movie) - imdb_task = ( - _fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None) - ) - tmdb_task = ( - _fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key) - if ids.tmdb - else asyncio.sleep(0, result=None) - ) - imdb, tmdb = await asyncio.gather(imdb_task, tmdb_task) - return AggregatedItem(plex=_build_plex_item(movie), imdb=imdb, tmdb=tmdb) - - async def _augment_episode( - client: httpx.AsyncClient, - episode: PlexPartialObject, - show_tmdb: Optional[TMDBShow], - ) -> AggregatedItem: - ids = _extract_external_ids(episode) - imdb_task = ( - _fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None) - ) - season = resolve_tmdb_season_number(show_tmdb, episode) - ep_num = getattr(episode, "index", None) - tmdb_task = ( - _fetch_tmdb_episode(client, show_tmdb.id, season, ep_num, tmdb_api_key) - if show_tmdb and season is not None and ep_num is not None - else asyncio.sleep(0, result=None) - ) - imdb, tmdb_episode = await asyncio.gather(imdb_task, tmdb_task) - tmdb: Optional[TMDBItem] = tmdb_episode or show_tmdb - return AggregatedItem(plex=_build_plex_item(episode), imdb=imdb, tmdb=tmdb) - results: List[AggregatedItem] = [] async with httpx.AsyncClient(timeout=30) as client: movie_section = server.library.section("Movies") movie_keys = [int(m.ratingKey) for m in movie_section.all()] movies = server.fetchItems(movie_keys) if movie_keys else [] - movie_tasks = [_augment_movie(client, movie) for movie in movies] + movie_imdb_ids = [ + _extract_external_ids(m).imdb for m in movies if _extract_external_ids(m).imdb + ] + movie_imdb_map = ( + await _fetch_imdb_batch(client, movie_imdb_ids) if movie_imdb_ids else {} + ) + + async def _augment_movie(movie: PlexPartialObject) -> AggregatedItem: + ids = _extract_external_ids(movie) + imdb = movie_imdb_map.get(ids.imdb) if ids.imdb else None + tmdb = ( + await _fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key) + if ids.tmdb + else None + ) + return AggregatedItem(plex=_build_plex_item(movie), imdb=imdb, tmdb=tmdb) + + movie_tasks = [_augment_movie(movie) for movie in movies] if movie_tasks: results.extend(await _gather_in_batches(movie_tasks, batch_size)) @@ -332,9 +365,33 @@ async def _augment_episode( show_tmdb = await _fetch_tmdb_show(client, show_ids.tmdb, tmdb_api_key) episode_keys = [int(e.ratingKey) for e in full_show.episodes()] episodes = server.fetchItems(episode_keys) if episode_keys else [] - episode_tasks = [ - _augment_episode(client, episode, show_tmdb) for episode in episodes + ep_imdb_ids = [ + _extract_external_ids(e).imdb + for e in episodes + if _extract_external_ids(e).imdb ] + ep_imdb_map = ( + await _fetch_imdb_batch(client, ep_imdb_ids) if ep_imdb_ids else {} + ) + + async def _augment_episode(episode: PlexPartialObject) -> AggregatedItem: + ids = _extract_external_ids(episode) + imdb = ep_imdb_map.get(ids.imdb) if ids.imdb else None + season = resolve_tmdb_season_number(show_tmdb, episode) + ep_num = getattr(episode, "index", None) + tmdb_episode = ( + await _fetch_tmdb_episode( + client, show_tmdb.id, season, ep_num, tmdb_api_key + ) + if show_tmdb and season is not None and ep_num is not None + else None + ) + tmdb: Optional[TMDBItem] = tmdb_episode or show_tmdb + return AggregatedItem( + plex=_build_plex_item(episode), imdb=imdb, tmdb=tmdb + ) + + episode_tasks = [_augment_episode(ep) for ep in episodes] if episode_tasks: results.extend(await _gather_in_batches(episode_tasks, batch_size)) return results diff --git a/pyproject.toml b/pyproject.toml index 03a86d9..b398f3f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.19" +version = "0.26.21" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_gather_in_batches.py b/tests/test_gather_in_batches.py index 8863b08..3dcff39 100644 --- a/tests/test_gather_in_batches.py +++ b/tests/test_gather_in_batches.py @@ -1,4 +1,5 @@ import asyncio +import logging from mcp_plex import loader @@ -8,7 +9,7 @@ async def _echo(value: int) -> int: return value -def test_gather_in_batches(monkeypatch): +def test_gather_in_batches(monkeypatch, caplog): calls: list[int] = [] orig_gather = asyncio.gather @@ -19,8 +20,12 @@ async def fake_gather(*coros): monkeypatch.setattr(asyncio, "gather", fake_gather) tasks = [_echo(i) for i in range(5)] - results = asyncio.run(loader._gather_in_batches(tasks, 2)) + with caplog.at_level(logging.INFO, logger="mcp_plex.loader"): + results = asyncio.run(loader._gather_in_batches(tasks, 2)) assert results == list(range(5)) assert calls == [2, 2, 1] + assert "Processed 2/5 items" in caplog.text + assert "Processed 4/5 items" in caplog.text + assert "Processed 5/5 items" in caplog.text diff --git a/tests/test_load_from_plex.py b/tests/test_load_from_plex.py index 7945fb6..6ec3164 100644 --- a/tests/test_load_from_plex.py +++ b/tests/test_load_from_plex.py @@ -62,33 +62,34 @@ async def handler(request): url = str(request.url) if "themoviedb.org" in url: assert request.headers.get("Authorization") == "Bearer key" - if "tt1375666" in url: - return httpx.Response( - 200, - json={ - "id": "tt1375666", - "type": "movie", - "primaryTitle": "Inception", - }, - ) - if "tt0959621" in url: - return httpx.Response( - 200, - json={ - "id": "tt0959621", - "type": "episode", - "primaryTitle": "Pilot", - }, - ) - if "tt0959622" in url: - return httpx.Response( - 200, - json={ - "id": "tt0959622", - "type": "episode", - "primaryTitle": "Cat's in the Bag...", - }, - ) + if "titles:batchGet" in url: + ids = request.url.params.get_list("titleIds") + titles = [] + if "tt1375666" in ids: + titles.append( + { + "id": "tt1375666", + "type": "movie", + "primaryTitle": "Inception", + } + ) + if "tt0959621" in ids: + titles.append( + { + "id": "tt0959621", + "type": "episode", + "primaryTitle": "Pilot", + } + ) + if "tt0959622" in ids: + titles.append( + { + "id": "tt0959622", + "type": "episode", + "primaryTitle": "Cat's in the Bag...", + } + ) + return httpx.Response(200, json={"titles": titles}) if "/movie/27205" in url: return httpx.Response(200, json={"id": 27205, "title": "Inception"}) if "/tv/1396/season/1/episode/1" in url: diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index 7d88588..967cde2 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -11,6 +11,7 @@ _build_plex_item, _extract_external_ids, _fetch_imdb, + _fetch_imdb_batch, _fetch_tmdb_episode, _fetch_tmdb_movie, _fetch_tmdb_show, @@ -176,6 +177,34 @@ async def main(): asyncio.run(main()) +def test_fetch_imdb_batch(tmp_path, monkeypatch): + cache_path = tmp_path / "cache.json" + monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) + + async def imdb_mock(request): + params = request.url.params + assert sorted(params.get_list("titleIds")) == ["tt1", "tt2"] + return httpx.Response( + 200, + json={ + "titles": [ + {"id": "tt1", "type": "movie", "primaryTitle": "A"}, + {"id": "tt2", "type": "movie", "primaryTitle": "B"}, + ] + }, + ) + + async def main(): + async with httpx.AsyncClient(transport=httpx.MockTransport(imdb_mock)) as client: + result = await _fetch_imdb_batch(client, ["tt1", "tt2"]) + assert result["tt1"] and result["tt1"].primaryTitle == "A" + assert result["tt2"] and result["tt2"].primaryTitle == "B" + + asyncio.run(main()) + data = json.loads(cache_path.read_text()) + assert set(data.keys()) == {"tt1", "tt2"} + + def test_fetch_imdb_retries_on_429(monkeypatch, tmp_path): cache_path = tmp_path / "cache.json" monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) diff --git a/uv.lock b/uv.lock index f684ca9..52e3894 100644 --- a/uv.lock +++ b/uv.lock @@ -690,7 +690,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.19" +version = "0.26.21" source = { editable = "." } dependencies = [ { name = "fastapi" },