From c62fb6b26003a521fcf161b2995ecc7a665f73f0 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 14 Sep 2025 04:18:09 -0600 Subject: [PATCH] feat(loader): batch upserts and handle http errors --- AGENTS.md | 2 + mcp_plex/loader.py | 65 ++++++++++++++++++++++++++++---- pyproject.toml | 2 +- tests/test_loader_integration.py | 23 ++++++++++- tests/test_loader_unit.py | 36 ++++++++++++++++++ uv.lock | 2 +- 6 files changed, 119 insertions(+), 11 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 9a2c6b9..1b1d034 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -25,6 +25,8 @@ - IMDb metadata is fetched via `titles:batchGet` to minimize repeated API calls. - The `titles:batchGet` endpoint accepts at most five IDs, so IMDb lookups are split into batches of five. +- Qdrant upserts are batched and network errors are logged so large loads can + proceed even when individual batches fail. ## User Queries The project should handle natural-language searches and recommendations such as: diff --git a/mcp_plex/loader.py b/mcp_plex/loader.py index 11e2145..d69d4f6 100644 --- a/mcp_plex/loader.py +++ b/mcp_plex/loader.py @@ -45,6 +45,7 @@ _imdb_backoff: float = 1.0 _imdb_retry_queue: asyncio.Queue[str] | None = None _imdb_batch_limit: int = 5 +_qdrant_batch_size: int = 1000 async def _gather_in_batches( @@ -72,7 +73,11 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT url = f"https://api.imdbapi.dev/titles/{imdb_id}" delay = _imdb_backoff for attempt in range(_imdb_max_retries + 1): - resp = await client.get(url) + try: + resp = await client.get(url) + except httpx.HTTPError: + logger.exception("HTTP error fetching IMDb ID %s", imdb_id) + return None if resp.status_code == 429: if attempt == _imdb_max_retries: if _imdb_retry_queue is not None: @@ -114,7 +119,13 @@ async def _fetch_imdb_batch( params = [("titleIds", imdb_id) for imdb_id in chunk] delay = _imdb_backoff for attempt in range(_imdb_max_retries + 1): - resp = await client.get(url, params=params) + try: + resp = await client.get(url, params=params) + except httpx.HTTPError: + logger.exception("HTTP error fetching IMDb IDs %s", ",".join(chunk)) + for imdb_id in chunk: + results[imdb_id] = None + break if resp.status_code == 429: if attempt == _imdb_max_retries: if _imdb_retry_queue is not None: @@ -181,13 +192,39 @@ def _persist_imdb_retry_queue(path: Path) -> None: path.write_text(json.dumps(ids)) +async def _upsert_in_batches( + client: AsyncQdrantClient, + collection_name: str, + points: Sequence[models.PointStruct], +) -> None: + """Upsert points into Qdrant in batches, logging HTTP errors.""" + + total = len(points) + for i in range(0, total, _qdrant_batch_size): + batch = points[i : i + _qdrant_batch_size] + try: + await client.upsert(collection_name=collection_name, points=batch) + except Exception: + logger.exception( + "Failed to upsert batch %d-%d", i, i + len(batch) + ) + else: + logger.info( + "Upserted %d/%d points", min(i + len(batch), total), total + ) + + async def _fetch_tmdb_movie( client: httpx.AsyncClient, tmdb_id: str, api_key: str ) -> Optional[TMDBMovie]: url = ( f"https://api.themoviedb.org/3/movie/{tmdb_id}?append_to_response=reviews" ) - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception("HTTP error fetching TMDb movie %s", tmdb_id) + return None if resp.is_success: return TMDBMovie.model_validate(resp.json()) return None @@ -199,7 +236,11 @@ async def _fetch_tmdb_show( url = ( f"https://api.themoviedb.org/3/tv/{tmdb_id}?append_to_response=reviews" ) - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception("HTTP error fetching TMDb show %s", tmdb_id) + return None if resp.is_success: return TMDBShow.model_validate(resp.json()) return None @@ -217,7 +258,16 @@ async def _fetch_tmdb_episode( url = ( f"https://api.themoviedb.org/3/tv/{show_id}/season/{season_number}/episode/{episode_number}" ) - resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + try: + resp = await client.get(url, headers={"Authorization": f"Bearer {api_key}"}) + except httpx.HTTPError: + logger.exception( + "HTTP error fetching TMDb episode %s S%sE%s", + show_id, + season_number, + episode_number, + ) + return None if resp.is_success: return TMDBEpisode.model_validate(resp.json()) return None @@ -657,11 +707,12 @@ async def run( if points: logger.info( - "Upserting %d points into Qdrant collection %s", + "Upserting %d points into Qdrant collection %s in batches of %d", len(points), collection_name, + _qdrant_batch_size, ) - await client.upsert(collection_name=collection_name, points=points) + await _upsert_in_batches(client, collection_name, points) else: logger.info("No points to upsert") diff --git a/pyproject.toml b/pyproject.toml index 1fa27df..45c2b3f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.24" +version = "0.26.25" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_loader_integration.py b/tests/test_loader_integration.py index 573bafd..3359461 100644 --- a/tests/test_loader_integration.py +++ b/tests/test_loader_integration.py @@ -13,14 +13,18 @@ class CaptureClient(AsyncQdrantClient): instance: "CaptureClient" | None = None captured_points: list[models.PointStruct] = [] + upsert_calls: int = 0 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) CaptureClient.instance = self async def upsert(self, collection_name: str, points, **kwargs): - CaptureClient.captured_points = points - return await super().upsert(collection_name=collection_name, points=points, **kwargs) + CaptureClient.upsert_calls += 1 + CaptureClient.captured_points.extend(points) + return await super().upsert( + collection_name=collection_name, points=points, **kwargs + ) async def _run_loader(sample_dir: Path) -> None: @@ -36,6 +40,8 @@ async def _run_loader(sample_dir: Path) -> None: def test_run_writes_points(monkeypatch): monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient) + CaptureClient.captured_points = [] + CaptureClient.upsert_calls = 0 sample_dir = Path(__file__).resolve().parents[1] / "sample-data" asyncio.run(_run_loader(sample_dir)) client = CaptureClient.instance @@ -56,6 +62,8 @@ def test_run_writes_points(monkeypatch): def test_run_processes_imdb_queue(monkeypatch, tmp_path): monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient) + CaptureClient.captured_points = [] + CaptureClient.upsert_calls = 0 queue_file = tmp_path / "queue.json" queue_file.write_text(json.dumps(["tt0111161"])) sample_dir = Path(__file__).resolve().parents[1] / "sample-data" @@ -78,3 +86,14 @@ async def fake_fetch(client, imdb_id): ) assert json.loads(queue_file.read_text()) == ["tt0111161"] + + +def test_run_upserts_in_batches(monkeypatch): + monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient) + monkeypatch.setattr(loader, "_qdrant_batch_size", 1) + CaptureClient.captured_points = [] + CaptureClient.upsert_calls = 0 + sample_dir = Path(__file__).resolve().parents[1] / "sample-data" + asyncio.run(_run_loader(sample_dir)) + assert CaptureClient.upsert_calls == 2 + assert len(CaptureClient.captured_points) == 2 diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index d01387f..cbaf974 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -6,6 +6,7 @@ from pathlib import Path import httpx +from qdrant_client import models from mcp_plex import loader from mcp_plex.imdb_cache import IMDbCache @@ -151,6 +152,24 @@ async def main(): asyncio.run(main()) +def test_fetch_functions_handle_http_error(): + def raise_error(request: httpx.Request) -> httpx.Response: # type: ignore[override] + raise httpx.ConnectError("boom", request=request) + + async def main() -> None: + transport = httpx.MockTransport(raise_error) + async with httpx.AsyncClient(transport=transport) as client: + assert await _fetch_imdb(client, "tt1") is None + async with httpx.AsyncClient(transport=transport) as client: + assert await _fetch_tmdb_movie(client, "1", "k") is None + async with httpx.AsyncClient(transport=transport) as client: + assert await _fetch_tmdb_show(client, "1", "k") is None + async with httpx.AsyncClient(transport=transport) as client: + assert await _fetch_tmdb_episode(client, 1, 1, 1, "k") is None + + asyncio.run(main()) + + def test_fetch_imdb_cache_miss(tmp_path, monkeypatch): cache_path = tmp_path / "cache.json" monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) @@ -446,3 +465,20 @@ def test_resolve_tmdb_season_number_parent_index_str(): def test_resolve_tmdb_season_number_parent_title_digit(): episode = types.SimpleNamespace(parentTitle="4") assert resolve_tmdb_season_number(None, episode) == 4 + + +def test_upsert_in_batches_handles_errors(monkeypatch): + class DummyClient: + def __init__(self): + self.calls = 0 + + async def upsert(self, collection_name: str, points, **kwargs): + self.calls += 1 + if self.calls == 2: + raise httpx.ConnectError("fail", request=httpx.Request("POST", "")) + + client = DummyClient() + points = [models.PointStruct(id=i, vector={}, payload={}) for i in range(3)] + monkeypatch.setattr(loader, "_qdrant_batch_size", 1) + asyncio.run(loader._upsert_in_batches(client, "c", points)) + assert client.calls == 3 diff --git a/uv.lock b/uv.lock index e40423d..2958ce4 100644 --- a/uv.lock +++ b/uv.lock @@ -690,7 +690,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.24" +version = "0.26.25" source = { editable = "." } dependencies = [ { name = "fastapi" },