Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
accurate episode lookups.
- Plex metadata is fetched in batches using `fetchItems` to reduce repeated
network calls when loading library items.
- IMDb metadata is fetched via `titles:batchGet` to minimize repeated API calls.

## User Queries
The project should handle natural-language searches and recommendations such as:
Expand Down
133 changes: 95 additions & 38 deletions mcp_plex/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ async def _gather_in_batches(
"""Gather awaitable tasks in fixed-size batches."""

results: List[T] = []
for i in range(0, len(tasks), batch_size):
total = len(tasks)
for i in range(0, total, batch_size):
batch = tasks[i : i + batch_size]
results.extend(await asyncio.gather(*batch))
logger.info("Processed %d/%d items", min(i + batch_size, total), total)
return results


Expand Down Expand Up @@ -86,6 +88,54 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT
return None


async def _fetch_imdb_batch(
    client: httpx.AsyncClient, imdb_ids: Sequence[str]
) -> dict[str, Optional[IMDbTitle]]:
    """Fetch metadata for multiple IMDb IDs in a single batched request.

    Cached titles are served from ``_imdb_cache``; the remaining IDs are
    requested together via the ``titles:batchGet`` endpoint.  HTTP 429
    responses are retried with exponential backoff, and IDs that still cannot
    be fetched are pushed onto ``_imdb_retry_queue`` (when configured) for a
    later attempt.

    Args:
        client: HTTP client used to perform the API request.
        imdb_ids: IMDb title IDs to resolve.

    Returns:
        A mapping with an entry for *every* requested ID; the value is
        ``None`` for titles that could not be retrieved.
    """

    results: dict[str, Optional[IMDbTitle]] = {}
    ids_to_fetch: list[str] = []
    for imdb_id in imdb_ids:
        if _imdb_cache:
            cached = _imdb_cache.get(imdb_id)
            if cached:
                results[imdb_id] = IMDbTitle.model_validate(cached)
                continue
        # Preserve input order but skip duplicates so an ID is neither
        # requested nor queued for retry more than once.
        if imdb_id not in ids_to_fetch:
            ids_to_fetch.append(imdb_id)

    if not ids_to_fetch:
        return results

    delay = _imdb_backoff
    params = [("titleIds", i) for i in ids_to_fetch]
    for attempt in range(_imdb_max_retries + 1):
        resp = await client.get(
            "https://api.imdbapi.dev/titles:batchGet", params=params
        )
        if resp.status_code == 429:
            if attempt == _imdb_max_retries:
                # Retries exhausted: hand the IDs to the retry queue and fall
                # through so they are still reported as unresolved below.
                if _imdb_retry_queue is not None:
                    for imdb_id in ids_to_fetch:
                        await _imdb_retry_queue.put(imdb_id)
                break
            await asyncio.sleep(delay)
            delay *= 2
            continue
        if resp.is_success:
            data = resp.json()
            for title_data in data.get("titles", []):
                imdb_title = IMDbTitle.model_validate(title_data)
                results[imdb_title.id] = imdb_title
                if _imdb_cache:
                    _imdb_cache.set(imdb_title.id, title_data)
        # Success handled above; any other status is a non-retryable failure.
        break

    # Guarantee an entry for every requested ID regardless of outcome, so the
    # return contract is identical across success, failure, and retry paths.
    for imdb_id in ids_to_fetch:
        results.setdefault(imdb_id, None)
    return results


def _load_imdb_retry_queue(path: Path) -> None:
"""Populate the retry queue from a JSON file if it exists."""

Expand Down Expand Up @@ -279,46 +329,29 @@ async def _load_from_plex(
server: PlexServer, tmdb_api_key: str, *, batch_size: int = 50
) -> List[AggregatedItem]:
"""Load items from a live Plex server."""

async def _augment_movie(client: httpx.AsyncClient, movie: PlexPartialObject) -> AggregatedItem:
ids = _extract_external_ids(movie)
imdb_task = (
_fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None)
)
tmdb_task = (
_fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key)
if ids.tmdb
else asyncio.sleep(0, result=None)
)
imdb, tmdb = await asyncio.gather(imdb_task, tmdb_task)
return AggregatedItem(plex=_build_plex_item(movie), imdb=imdb, tmdb=tmdb)

async def _augment_episode(
client: httpx.AsyncClient,
episode: PlexPartialObject,
show_tmdb: Optional[TMDBShow],
) -> AggregatedItem:
ids = _extract_external_ids(episode)
imdb_task = (
_fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None)
)
season = resolve_tmdb_season_number(show_tmdb, episode)
ep_num = getattr(episode, "index", None)
tmdb_task = (
_fetch_tmdb_episode(client, show_tmdb.id, season, ep_num, tmdb_api_key)
if show_tmdb and season is not None and ep_num is not None
else asyncio.sleep(0, result=None)
)
imdb, tmdb_episode = await asyncio.gather(imdb_task, tmdb_task)
tmdb: Optional[TMDBItem] = tmdb_episode or show_tmdb
return AggregatedItem(plex=_build_plex_item(episode), imdb=imdb, tmdb=tmdb)

results: List[AggregatedItem] = []
async with httpx.AsyncClient(timeout=30) as client:
movie_section = server.library.section("Movies")
movie_keys = [int(m.ratingKey) for m in movie_section.all()]
movies = server.fetchItems(movie_keys) if movie_keys else []
movie_tasks = [_augment_movie(client, movie) for movie in movies]
movie_imdb_ids = [
_extract_external_ids(m).imdb for m in movies if _extract_external_ids(m).imdb
]
movie_imdb_map = (
await _fetch_imdb_batch(client, movie_imdb_ids) if movie_imdb_ids else {}
)

async def _augment_movie(movie: PlexPartialObject) -> AggregatedItem:
ids = _extract_external_ids(movie)
imdb = movie_imdb_map.get(ids.imdb) if ids.imdb else None
tmdb = (
await _fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key)
if ids.tmdb
else None
)
return AggregatedItem(plex=_build_plex_item(movie), imdb=imdb, tmdb=tmdb)

movie_tasks = [_augment_movie(movie) for movie in movies]
if movie_tasks:
results.extend(await _gather_in_batches(movie_tasks, batch_size))

Expand All @@ -332,9 +365,33 @@ async def _augment_episode(
show_tmdb = await _fetch_tmdb_show(client, show_ids.tmdb, tmdb_api_key)
episode_keys = [int(e.ratingKey) for e in full_show.episodes()]
episodes = server.fetchItems(episode_keys) if episode_keys else []
episode_tasks = [
_augment_episode(client, episode, show_tmdb) for episode in episodes
ep_imdb_ids = [
_extract_external_ids(e).imdb
for e in episodes
if _extract_external_ids(e).imdb
]
ep_imdb_map = (
await _fetch_imdb_batch(client, ep_imdb_ids) if ep_imdb_ids else {}
)

async def _augment_episode(episode: PlexPartialObject) -> AggregatedItem:
ids = _extract_external_ids(episode)
imdb = ep_imdb_map.get(ids.imdb) if ids.imdb else None
season = resolve_tmdb_season_number(show_tmdb, episode)
ep_num = getattr(episode, "index", None)
tmdb_episode = (
await _fetch_tmdb_episode(
client, show_tmdb.id, season, ep_num, tmdb_api_key
)
if show_tmdb and season is not None and ep_num is not None
else None
)
tmdb: Optional[TMDBItem] = tmdb_episode or show_tmdb
return AggregatedItem(
plex=_build_plex_item(episode), imdb=imdb, tmdb=tmdb
)

episode_tasks = [_augment_episode(ep) for ep in episodes]
if episode_tasks:
results.extend(await _gather_in_batches(episode_tasks, batch_size))
return results
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
version = "0.26.19"
version = "0.26.21"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
Expand Down
9 changes: 7 additions & 2 deletions tests/test_gather_in_batches.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging

from mcp_plex import loader

Expand All @@ -8,7 +9,7 @@ async def _echo(value: int) -> int:
return value


def test_gather_in_batches(monkeypatch):
def test_gather_in_batches(monkeypatch, caplog):
calls: list[int] = []
orig_gather = asyncio.gather

Expand All @@ -19,8 +20,12 @@ async def fake_gather(*coros):
monkeypatch.setattr(asyncio, "gather", fake_gather)

tasks = [_echo(i) for i in range(5)]
results = asyncio.run(loader._gather_in_batches(tasks, 2))
with caplog.at_level(logging.INFO, logger="mcp_plex.loader"):
results = asyncio.run(loader._gather_in_batches(tasks, 2))

assert results == list(range(5))
assert calls == [2, 2, 1]
assert "Processed 2/5 items" in caplog.text
assert "Processed 4/5 items" in caplog.text
assert "Processed 5/5 items" in caplog.text

55 changes: 28 additions & 27 deletions tests/test_load_from_plex.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,33 +62,34 @@ async def handler(request):
url = str(request.url)
if "themoviedb.org" in url:
assert request.headers.get("Authorization") == "Bearer key"
if "tt1375666" in url:
return httpx.Response(
200,
json={
"id": "tt1375666",
"type": "movie",
"primaryTitle": "Inception",
},
)
if "tt0959621" in url:
return httpx.Response(
200,
json={
"id": "tt0959621",
"type": "episode",
"primaryTitle": "Pilot",
},
)
if "tt0959622" in url:
return httpx.Response(
200,
json={
"id": "tt0959622",
"type": "episode",
"primaryTitle": "Cat's in the Bag...",
},
)
if "titles:batchGet" in url:
ids = request.url.params.get_list("titleIds")
titles = []
if "tt1375666" in ids:
titles.append(
{
"id": "tt1375666",
"type": "movie",
"primaryTitle": "Inception",
}
)
if "tt0959621" in ids:
titles.append(
{
"id": "tt0959621",
"type": "episode",
"primaryTitle": "Pilot",
}
)
if "tt0959622" in ids:
titles.append(
{
"id": "tt0959622",
"type": "episode",
"primaryTitle": "Cat's in the Bag...",
}
)
return httpx.Response(200, json={"titles": titles})
if "/movie/27205" in url:
return httpx.Response(200, json={"id": 27205, "title": "Inception"})
if "/tv/1396/season/1/episode/1" in url:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_loader_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
_build_plex_item,
_extract_external_ids,
_fetch_imdb,
_fetch_imdb_batch,
_fetch_tmdb_episode,
_fetch_tmdb_movie,
_fetch_tmdb_show,
Expand Down Expand Up @@ -176,6 +177,34 @@ async def main():
asyncio.run(main())


def test_fetch_imdb_batch(tmp_path, monkeypatch):
    """Both uncached IDs are fetched in one batchGet call and written to the cache."""
    cache_file = tmp_path / "cache.json"
    monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_file))

    async def imdb_mock(request):
        # The two IDs must arrive together in a single batched request.
        requested = sorted(request.url.params.get_list("titleIds"))
        assert requested == ["tt1", "tt2"]
        payload = {
            "titles": [
                {"id": "tt1", "type": "movie", "primaryTitle": "A"},
                {"id": "tt2", "type": "movie", "primaryTitle": "B"},
            ]
        }
        return httpx.Response(200, json=payload)

    async def main():
        transport = httpx.MockTransport(imdb_mock)
        async with httpx.AsyncClient(transport=transport) as client:
            fetched = await _fetch_imdb_batch(client, ["tt1", "tt2"])
            assert fetched["tt1"] is not None
            assert fetched["tt1"].primaryTitle == "A"
            assert fetched["tt2"] is not None
            assert fetched["tt2"].primaryTitle == "B"

    asyncio.run(main())
    cached = json.loads(cache_file.read_text())
    assert set(cached) == {"tt1", "tt2"}


def test_fetch_imdb_retries_on_429(monkeypatch, tmp_path):
cache_path = tmp_path / "cache.json"
monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path))
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.