2 changes: 2 additions & 0 deletions AGENTS.md
@@ -25,6 +25,8 @@
- IMDb metadata is fetched via `titles:batchGet` to minimize repeated API calls.
- The `titles:batchGet` endpoint accepts at most five IDs, so IMDb lookups are
split into batches of five.
+- Qdrant upserts are batched and network errors are logged so large loads can
+  proceed even when individual batches fail.

## User Queries
The project should handle natural-language searches and recommendations such as:
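The five-ID limit noted above is enforced by simple chunking. A minimal sketch of that rule, assuming a hypothetical `chunk_ids` helper (the loader itself only defines the `_imdb_batch_limit` constant):

```python
# Hypothetical helper illustrating the batching rule documented above:
# titles:batchGet accepts at most five IDs per request.
from typing import Iterator

IMDB_BATCH_LIMIT = 5  # mirrors _imdb_batch_limit in mcp_plex/loader.py


def chunk_ids(ids: list[str], size: int = IMDB_BATCH_LIMIT) -> Iterator[list[str]]:
    """Yield successive batches of at most `size` IMDb IDs."""
    for i in range(0, len(ids), size):
        yield ids[i : i + size]


# Nine IDs become two requests: one with five IDs, one with four.
assert [len(batch) for batch in chunk_ids([f"tt{n}" for n in range(9)])] == [5, 4]
```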
65 changes: 58 additions & 7 deletions mcp_plex/loader.py
@@ -45,6 +45,7 @@
_imdb_backoff: float = 1.0
_imdb_retry_queue: asyncio.Queue[str] | None = None
_imdb_batch_limit: int = 5
+_qdrant_batch_size: int = 1000


async def _gather_in_batches(
@@ -72,7 +73,11 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbTitle]:
    url = f"https://api.imdbapi.dev/titles/{imdb_id}"
    delay = _imdb_backoff
    for attempt in range(_imdb_max_retries + 1):
-        resp = await client.get(url)
+        try:
+            resp = await client.get(url)
+        except httpx.HTTPError:
+            logger.exception("HTTP error fetching IMDb ID %s", imdb_id)
+            return None
        if resp.status_code == 429:
            if attempt == _imdb_max_retries:
                if _imdb_retry_queue is not None:
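The 429 branch is truncated in this hunk. A sketch of the backoff pattern it implies (retry with growing delays, then park the ID on the retry queue), assuming illustrative names and delays rather than the loader's verbatim code:

```python
# Sketch only: retry on HTTP 429 with exponential backoff, queueing the
# ID for a later run once the retry budget is exhausted.
import asyncio

import httpx


async def fetch_with_backoff(
    client: httpx.AsyncClient,
    imdb_id: str,
    retry_queue: asyncio.Queue[str],
    max_retries: int = 3,
    initial_delay: float = 1.0,
) -> httpx.Response | None:
    url = f"https://api.imdbapi.dev/titles/{imdb_id}"
    delay = initial_delay
    for attempt in range(max_retries + 1):
        resp = await client.get(url)
        if resp.status_code != 429:
            return resp
        if attempt == max_retries:
            retry_queue.put_nowait(imdb_id)  # give up for now; retry on a later run
            return None
        await asyncio.sleep(delay)
        delay *= 2  # double the wait between attempts
    return None
```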
@@ -114,7 +119,13 @@ async def _fetch_imdb_batch(
        params = [("titleIds", imdb_id) for imdb_id in chunk]
        delay = _imdb_backoff
        for attempt in range(_imdb_max_retries + 1):
-            resp = await client.get(url, params=params)
+            try:
+                resp = await client.get(url, params=params)
+            except httpx.HTTPError:
+                logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk))
+                for imdb_id in chunk:
+                    results[imdb_id] = None
+                break
            if resp.status_code == 429:
                if attempt == _imdb_max_retries:
                    if _imdb_retry_queue is not None:
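For reference, the `params` list of tuples built above is how httpx expresses a repeated query key, which is what `titles:batchGet` expects:

```python
# httpx repeats the key once per value when given a list of tuples.
import httpx

chunk = ["tt0111161", "tt0068646"]
params = httpx.QueryParams([("titleIds", imdb_id) for imdb_id in chunk])
assert str(params) == "titleIds=tt0111161&titleIds=tt0068646"
```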
@@ -181,13 +192,39 @@ def _persist_imdb_retry_queue(path: Path) -> None:
    path.write_text(json.dumps(ids))


+async def _upsert_in_batches(
+    client: AsyncQdrantClient,
+    collection_name: str,
+    points: Sequence[models.PointStruct],
+) -> None:
+    """Upsert points into Qdrant in batches, logging HTTP errors."""
+
+    total = len(points)
+    for i in range(0, total, _qdrant_batch_size):
+        batch = points[i : i + _qdrant_batch_size]
+        try:
+            await client.upsert(collection_name=collection_name, points=batch)
+        except Exception:
+            logger.exception(
+                "Failed to upsert batch %d-%d", i, i + len(batch)
+            )
+        else:
+            logger.info(
+                "Upserted %d/%d points", min(i + len(batch), total), total
+            )
+
+
async def _fetch_tmdb_movie(
    client: httpx.AsyncClient, tmdb_id: str, api_key: str
) -> Optional[TMDBMovie]:
    url = (
        f"https://api.themoviedb.org/3/movie/{tmdb_id}?append_to_response=reviews"
    )
-    resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
+    try:
+        resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
+    except httpx.HTTPError:
+        logger.exception("HTTP error fetching TMDb movie %s", tmdb_id)
+        return None
    if resp.is_success:
        return TMDBMovie.model_validate(resp.json())
    return None
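The slicing in `_upsert_in_batches` is worth a worked example: with the default `_qdrant_batch_size` of 1000, 2,500 points yield three upserts, and a failing middle batch is logged and skipped rather than aborting the load:

```python
# Pure arithmetic mirroring the slicing in _upsert_in_batches.
batch_size = 1000  # default _qdrant_batch_size
total = 2500
ranges = [(i, min(i + batch_size, total)) for i in range(0, total, batch_size)]
assert ranges == [(0, 1000), (1000, 2000), (2000, 2500)]
# If the (1000, 2000) batch raises, logger.exception records it and the
# loop continues with (2000, 2500), which is the behavior the new tests assert.
```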
@@ -199,7 +236,11 @@ async def _fetch_tmdb_show(
    url = (
        f"https://api.themoviedb.org/3/tv/{tmdb_id}?append_to_response=reviews"
    )
-    resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
+    try:
+        resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
+    except httpx.HTTPError:
+        logger.exception("HTTP error fetching TMDb show %s", tmdb_id)
+        return None
    if resp.is_success:
        return TMDBShow.model_validate(resp.json())
    return None
@@ -217,7 +258,16 @@ async def _fetch_tmdb_episode(
    url = (
        f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}"
    )
-    resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
+    try:
+        resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"})
+    except httpx.HTTPError:
+        logger.exception(
+            "HTTP error fetching TMDb episode %s S%sE%s",
+            show_id,
+            season_number,
+            episode_number,
+        )
+        return None
    if resp.is_success:
        return TMDBEpisode.model_validate(resp.json())
    return None
@@ -657,11 +707,12 @@ async def run(

    if points:
        logger.info(
-            "Upserting %d points into Qdrant collection %s",
+            "Upserting %d points into Qdrant collection %s in batches of %d",
            len(points),
            collection_name,
+            _qdrant_batch_size,
        )
-        await client.upsert(collection_name=collection_name, points=points)
+        await _upsert_in_batches(client, collection_name, points)
    else:
        logger.info("No points to upsert")

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mcp-plex"
-version = "0.26.24"
+version = "0.26.25"

description = "Plex-Oriented Model Context Protocol Server"
requires-python = ">=3.11,<3.13"
23 changes: 21 additions & 2 deletions tests/test_loader_integration.py
@@ -13,14 +13,18 @@
class CaptureClient(AsyncQdrantClient):
    instance: "CaptureClient" | None = None
    captured_points: list[models.PointStruct] = []
+    upsert_calls: int = 0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        CaptureClient.instance = self

    async def upsert(self, collection_name: str, points, **kwargs):
-        CaptureClient.captured_points = points
-        return await super().upsert(collection_name=collection_name, points=points, **kwargs)
+        CaptureClient.upsert_calls += 1
+        CaptureClient.captured_points.extend(points)
+        return await super().upsert(
+            collection_name=collection_name, points=points, **kwargs
+        )


async def _run_loader(sample_dir: Path) -> None:
@@ -36,6 +40,8 @@ async def _run_loader(sample_dir: Path) -> None:

def test_run_writes_points(monkeypatch):
    monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient)
+    CaptureClient.captured_points = []
+    CaptureClient.upsert_calls = 0
    sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
    asyncio.run(_run_loader(sample_dir))
    client = CaptureClient.instance
@@ -56,6 +62,8 @@ def test_run_writes_points(monkeypatch):

def test_run_processes_imdb_queue(monkeypatch, tmp_path):
    monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient)
+    CaptureClient.captured_points = []
+    CaptureClient.upsert_calls = 0
    queue_file = tmp_path / "queue.json"
    queue_file.write_text(json.dumps(["tt0111161"]))
    sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
@@ -78,3 +86,14 @@ async def fake_fetch(client, imdb_id):
)

    assert json.loads(queue_file.read_text()) == ["tt0111161"]
+
+
+def test_run_upserts_in_batches(monkeypatch):
+    monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient)
+    monkeypatch.setattr(loader, "_qdrant_batch_size", 1)
+    CaptureClient.captured_points = []
+    CaptureClient.upsert_calls = 0
+    sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
+    asyncio.run(_run_loader(sample_dir))
+    assert CaptureClient.upsert_calls == 2
+    assert len(CaptureClient.captured_points) == 2
36 changes: 36 additions & 0 deletions tests/test_loader_unit.py
@@ -6,6 +6,7 @@
from pathlib import Path

import httpx
+from qdrant_client import models

from mcp_plex import loader
from mcp_plex.imdb_cache import IMDbCache
@@ -151,6 +152,24 @@ async def main():
asyncio.run(main())


+def test_fetch_functions_handle_http_error():
+    def raise_error(request: httpx.Request) -> httpx.Response:  # type: ignore[override]
+        raise httpx.ConnectError("boom", request=request)
+
+    async def main() -> None:
+        transport = httpx.MockTransport(raise_error)
+        async with httpx.AsyncClient(transport=transport) as client:
+            assert await _fetch_imdb(client, "tt1") is None
+        async with httpx.AsyncClient(transport=transport) as client:
+            assert await _fetch_tmdb_movie(client, "1", "k") is None
+        async with httpx.AsyncClient(transport=transport) as client:
+            assert await _fetch_tmdb_show(client, "1", "k") is None
+        async with httpx.AsyncClient(transport=transport) as client:
+            assert await _fetch_tmdb_episode(client, 1, 1, 1, "k") is None
+
+    asyncio.run(main())
+
+
def test_fetch_imdb_cache_miss(tmp_path, monkeypatch):
cache_path = tmp_path / "cache.json"
monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path))
@@ -446,3 +465,20 @@ def test_resolve_tmdb_season_number_parent_index_str():
def test_resolve_tmdb_season_number_parent_title_digit():
episode = types.SimpleNamespace(parentTitle="4")
assert resolve_tmdb_season_number(None, episode) == 4


+def test_upsert_in_batches_handles_errors(monkeypatch):
+    class DummyClient:
+        def __init__(self):
+            self.calls = 0
+
+        async def upsert(self, collection_name: str, points, **kwargs):
+            self.calls += 1
+            if self.calls == 2:
+                raise httpx.ConnectError("fail", request=httpx.Request("POST", ""))
+
+    client = DummyClient()
+    points = [models.PointStruct(id=i, vector={}, payload={}) for i in range(3)]
+    monkeypatch.setattr(loader, "_qdrant_batch_size", 1)
+    asyncio.run(loader._upsert_in_batches(client, "c", points))
+    assert client.calls == 3
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default.