diff --git a/docker/pyproject.deps.toml b/docker/pyproject.deps.toml index 6fb4ca7..4284262 100644 --- a/docker/pyproject.deps.toml +++ b/docker/pyproject.deps.toml @@ -1,6 +1,6 @@ [project] name = "mcp-plex" -version = "0.26.78" +version = "0.26.79" requires-python = ">=3.11,<3.13" dependencies = [ "fastmcp>=2.11.2", diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index f31e7c3..24dd5ac 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -7,6 +7,7 @@ import sys import time import warnings +from dataclasses import dataclass, field from pathlib import Path from typing import Any, List, Optional, Sequence, TypeVar @@ -19,12 +20,8 @@ from .pipeline.channels import ( IMDbRetryQueue, INGEST_DONE, - IngestBatch, IngestQueue, - MovieBatch, - EpisodeBatch, PersistenceQueue, - SampleBatch, chunk_sequence, require_positive, ) @@ -59,31 +56,48 @@ T = TypeVar("T") -_imdb_cache: IMDbCache | None = None -_imdb_max_retries: int = 3 -_imdb_backoff: float = 1.0 -_imdb_retry_queue: "_IMDbRetryQueue" | None = None -_imdb_batch_limit: int = 5 -_imdb_requests_per_window: int | None = None -_imdb_window_seconds: float = 1.0 -_imdb_throttle: Any = None -_qdrant_batch_size: int = 1000 -_qdrant_upsert_buffer_size: int = 200 -_qdrant_max_concurrent_upserts: int = 4 -_qdrant_retry_attempts: int = 3 -_qdrant_retry_backoff: float = 1.0 - -# Backwards-compatible aliases while callers migrate to shared pipeline exports. -_MovieBatch = MovieBatch -_EpisodeBatch = EpisodeBatch -_SampleBatch = SampleBatch -_IngestBatch = IngestBatch -_IngestQueue = IngestQueue -_PersistenceQueue = PersistenceQueue -_require_positive = require_positive -_chunk_sequence = chunk_sequence -_IMDbRetryQueue = IMDbRetryQueue -_INGEST_DONE = INGEST_DONE +IMDB_BATCH_LIMIT: int = 5 +DEFAULT_QDRANT_BATCH_SIZE: int = 1000 +DEFAULT_QDRANT_UPSERT_BUFFER_SIZE: int = 200 +DEFAULT_QDRANT_MAX_CONCURRENT_UPSERTS: int = 4 +DEFAULT_QDRANT_RETRY_ATTEMPTS: int = 3 +DEFAULT_QDRANT_RETRY_BACKOFF: float = 1.0 + + +@dataclass(slots=True) +class IMDbRuntimeConfig: + """Runtime configuration for IMDb enrichment helpers.""" + + cache: IMDbCache | None + max_retries: int + backoff: float + retry_queue: IMDbRetryQueue + requests_per_window: int | None + window_seconds: float + _throttle: Any = field(default=None, init=False, repr=False) + + def get_throttle(self) -> Any: + """Return the shared rate limiter, creating it on first use.""" + + if self.requests_per_window is None: + return None + if self._throttle is None: + from .pipeline import enrichment as enrichment_mod + + self._throttle = enrichment_mod._RequestThrottler( + limit=self.requests_per_window, + interval=float(self.window_seconds), + ) + return self._throttle + + +@dataclass(slots=True) +class QdrantRuntimeConfig: + """Runtime configuration for Qdrant persistence helpers.""" + + batch_size: int = DEFAULT_QDRANT_BATCH_SIZE + retry_attempts: int = DEFAULT_QDRANT_RETRY_ATTEMPTS + retry_backoff: float = DEFAULT_QDRANT_RETRY_BACKOFF def _is_local_qdrant(client: AsyncQdrantClient) -> bool: @@ -118,23 +132,11 @@ def _resolve_dense_model_params(model_name: str) -> tuple[int, models.Distance]: ) from exc -def _get_imdb_throttle() -> Any: - """Return the shared IMDb rate limiter instance if configured.""" - - global _imdb_throttle - if _imdb_requests_per_window is None: - return None - from .pipeline import enrichment as enrichment_mod - - if _imdb_throttle is None: - _imdb_throttle = enrichment_mod._RequestThrottler( - limit=_imdb_requests_per_window, - interval=float(_imdb_window_seconds), - ) - return _imdb_throttle - - -async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbTitle]: +async def _fetch_imdb( + client: httpx.AsyncClient, + imdb_id: str, + config: IMDbRuntimeConfig, +) -> Optional[IMDbTitle]: """Fetch metadata for an IMDb ID with caching, retry, and throttling.""" from .pipeline import enrichment as enrichment_mod @@ -142,18 +144,17 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT return await enrichment_mod._fetch_imdb( client, imdb_id, - cache=_imdb_cache, - throttle=_get_imdb_throttle(), - max_retries=_imdb_max_retries, - backoff=_imdb_backoff, - retry_queue=_imdb_retry_queue, + cache=config.cache, + throttle=config.get_throttle(), + max_retries=config.max_retries, + backoff=config.backoff, + retry_queue=config.retry_queue, ) -def _load_imdb_retry_queue(path: Path) -> None: +def _load_imdb_retry_queue(path: Path) -> IMDbRetryQueue: """Populate the retry queue from a JSON file if it exists.""" - global _imdb_retry_queue ids: list[str] = [] if path.exists(): try: @@ -167,28 +168,29 @@ def _load_imdb_retry_queue(path: Path) -> None: ) except Exception: logger.exception("Failed to load IMDb retry queue from %s", path) - _imdb_retry_queue = _IMDbRetryQueue(ids) + return IMDbRetryQueue(ids) -async def _process_imdb_retry_queue(client: httpx.AsyncClient) -> None: +async def _process_imdb_retry_queue( + client: httpx.AsyncClient, + config: IMDbRuntimeConfig, +) -> None: """Attempt to fetch queued IMDb IDs, re-queueing failures.""" - if _imdb_retry_queue is None or _imdb_retry_queue.empty(): + if config.retry_queue.empty(): return - size = _imdb_retry_queue.qsize() + size = config.retry_queue.qsize() for _ in range(size): - imdb_id = await _imdb_retry_queue.get() - title = await _fetch_imdb(client, imdb_id) + imdb_id = await config.retry_queue.get() + title = await _fetch_imdb(client, imdb_id, config) if title is None: - await _imdb_retry_queue.put(imdb_id) + await config.retry_queue.put(imdb_id) -def _persist_imdb_retry_queue(path: Path) -> None: +def _persist_imdb_retry_queue(path: Path, queue: IMDbRetryQueue) -> None: """Persist the retry queue to disk.""" - if _imdb_retry_queue is None: - return - path.write_text(json.dumps(_imdb_retry_queue.snapshot())) + path.write_text(json.dumps(queue.snapshot())) async def _upsert_in_batches( @@ -196,13 +198,14 @@ async def _upsert_in_batches( collection_name: str, points: Sequence[models.PointStruct], *, + batch_size: int, retry_queue: asyncio.Queue[list[models.PointStruct]] | None = None, ) -> None: """Upsert points into Qdrant in batches, logging HTTP errors.""" total = len(points) - for i in range(0, total, _qdrant_batch_size): - batch = points[i : i + _qdrant_batch_size] + for i in range(0, total, batch_size): + batch = points[i : i + batch_size] try: await client.upsert(collection_name=collection_name, points=batch) except Exception: @@ -221,6 +224,8 @@ async def _process_qdrant_retry_queue( client: AsyncQdrantClient, collection_name: str, retry_queue: asyncio.Queue[list[models.PointStruct]], + *, + config: QdrantRuntimeConfig, ) -> None: """Retry failed Qdrant batches with exponential backoff.""" @@ -232,7 +237,7 @@ async def _process_qdrant_retry_queue( while not retry_queue.empty(): batch = await retry_queue.get() attempt = 1 - while attempt <= _qdrant_retry_attempts: + while attempt <= config.retry_attempts: try: await client.upsert( collection_name=collection_name, @@ -242,18 +247,18 @@ async def _process_qdrant_retry_queue( logger.exception( "Retry %d/%d failed for Qdrant batch of %d points", attempt, - _qdrant_retry_attempts, + config.retry_attempts, len(batch), ) attempt += 1 - if attempt > _qdrant_retry_attempts: + if attempt > config.retry_attempts: logger.error( "Giving up on Qdrant batch after %d attempts; %d points were not indexed", - _qdrant_retry_attempts, + config.retry_attempts, len(batch), ) break - await asyncio.sleep(_qdrant_retry_backoff * attempt) + await asyncio.sleep(config.retry_backoff * attempt) continue else: logger.info( @@ -591,7 +596,8 @@ def _build_loader_orchestrator( enrichment_workers: int, upsert_buffer_size: int, max_concurrent_upserts: int, - imdb_retry_queue: IMDbRetryQueue | None = None, + imdb_config: IMDbRuntimeConfig, + qdrant_config: QdrantRuntimeConfig, ) -> tuple[LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]]: """Wire the staged loader pipeline and return the orchestrator helpers.""" @@ -600,13 +606,7 @@ def _build_loader_orchestrator( ingest_queue: IngestQueue = IngestQueue(maxsize=enrichment_workers * 2) persistence_queue: PersistenceQueue = PersistenceQueue() - - imdb_queue = imdb_retry_queue - if imdb_queue is None: - global _imdb_retry_queue - if _imdb_retry_queue is None: - _imdb_retry_queue = _IMDbRetryQueue() - imdb_queue = _imdb_retry_queue + imdb_queue = imdb_config.retry_queue upsert_capacity = asyncio.Semaphore(max_concurrent_upserts) qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue() @@ -629,6 +629,7 @@ async def _upsert_aggregated( client, collection_name, list(point_chunk), + batch_size=qdrant_config.batch_size, retry_queue=qdrant_retry_queue, ) @@ -665,12 +666,12 @@ def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None: imdb_retry_queue=imdb_queue, movie_batch_size=enrichment_batch_size, episode_batch_size=enrichment_batch_size, - imdb_cache=_imdb_cache, - imdb_max_retries=_imdb_max_retries, - imdb_backoff=_imdb_backoff, - imdb_batch_limit=_imdb_batch_limit, - imdb_requests_per_window=_imdb_requests_per_window, - imdb_window_seconds=_imdb_window_seconds, + imdb_cache=imdb_config.cache, + imdb_max_retries=imdb_config.max_retries, + imdb_backoff=imdb_config.backoff, + imdb_batch_limit=IMDB_BATCH_LIMIT, + imdb_requests_per_window=imdb_config.requests_per_window, + imdb_window_seconds=imdb_config.window_seconds, ) persistence_stage = _PersistenceStage( @@ -719,41 +720,52 @@ async def run( imdb_queue_path: Path | None = None, imdb_requests_per_window: int | None = None, imdb_window_seconds: float = 1.0, - upsert_buffer_size: int = _qdrant_upsert_buffer_size, + upsert_buffer_size: int = DEFAULT_QDRANT_UPSERT_BUFFER_SIZE, plex_chunk_size: int = 200, enrichment_batch_size: int = 100, enrichment_workers: int = 4, + qdrant_batch_size: int = DEFAULT_QDRANT_BATCH_SIZE, + max_concurrent_upserts: int = DEFAULT_QDRANT_MAX_CONCURRENT_UPSERTS, + qdrant_retry_attempts: int = DEFAULT_QDRANT_RETRY_ATTEMPTS, + qdrant_retry_backoff: float = DEFAULT_QDRANT_RETRY_BACKOFF, ) -> None: """Core execution logic for the CLI.""" - global _imdb_cache - global _imdb_max_retries - global _imdb_backoff - global _imdb_retry_queue - global _imdb_requests_per_window - global _imdb_window_seconds - global _imdb_throttle - _imdb_cache = IMDbCache(imdb_cache_path) if imdb_cache_path else None - _imdb_max_retries = imdb_max_retries - _imdb_backoff = imdb_backoff + imdb_cache = IMDbCache(imdb_cache_path) if imdb_cache_path else None if imdb_requests_per_window is not None: - _require_positive(imdb_requests_per_window, name="imdb_requests_per_window") + require_positive(imdb_requests_per_window, name="imdb_requests_per_window") if imdb_window_seconds <= 0: raise ValueError("imdb_window_seconds must be positive") - _imdb_requests_per_window = imdb_requests_per_window - _imdb_window_seconds = imdb_window_seconds - _imdb_throttle = None + if qdrant_retry_backoff <= 0: + raise ValueError("qdrant_retry_backoff must be positive") + + require_positive(upsert_buffer_size, name="upsert_buffer_size") + require_positive(plex_chunk_size, name="plex_chunk_size") + require_positive(enrichment_batch_size, name="enrichment_batch_size") + require_positive(enrichment_workers, name="enrichment_workers") + require_positive(qdrant_batch_size, name="qdrant_batch_size") + require_positive(max_concurrent_upserts, name="max_concurrent_upserts") + require_positive(qdrant_retry_attempts, name="qdrant_retry_attempts") + + imdb_retry_queue = _load_imdb_retry_queue(imdb_queue_path) if imdb_queue_path else IMDbRetryQueue() + imdb_config = IMDbRuntimeConfig( + cache=imdb_cache, + max_retries=imdb_max_retries, + backoff=imdb_backoff, + retry_queue=imdb_retry_queue, + requests_per_window=imdb_requests_per_window, + window_seconds=imdb_window_seconds, + ) + if imdb_queue_path: - _load_imdb_retry_queue(imdb_queue_path) async with httpx.AsyncClient(timeout=30) as client: - await _process_imdb_retry_queue(client) - else: - _imdb_retry_queue = _IMDbRetryQueue() + await _process_imdb_retry_queue(client, imdb_config) - _require_positive(upsert_buffer_size, name="upsert_buffer_size") - _require_positive(plex_chunk_size, name="plex_chunk_size") - _require_positive(enrichment_batch_size, name="enrichment_batch_size") - _require_positive(enrichment_workers, name="enrichment_workers") + qdrant_config = QdrantRuntimeConfig( + batch_size=qdrant_batch_size, + retry_attempts=qdrant_retry_attempts, + retry_backoff=qdrant_retry_backoff, + ) dense_size, dense_distance = _resolve_dense_model_params(dense_model_name) if qdrant_url is None and qdrant_host is None: @@ -791,8 +803,16 @@ async def run( enrichment_batch_size=enrichment_batch_size, enrichment_workers=enrichment_workers, upsert_buffer_size=upsert_buffer_size, - max_concurrent_upserts=_qdrant_max_concurrent_upserts, - imdb_retry_queue=_IMDbRetryQueue(), + max_concurrent_upserts=max_concurrent_upserts, + imdb_config=IMDbRuntimeConfig( + cache=imdb_config.cache, + max_retries=imdb_config.max_retries, + backoff=imdb_config.backoff, + retry_queue=IMDbRetryQueue(), + requests_per_window=imdb_config.requests_per_window, + window_seconds=imdb_config.window_seconds, + ), + qdrant_config=qdrant_config, ) logger.info("Starting staged loader (sample mode)") await orchestrator.run() @@ -817,7 +837,9 @@ async def run( enrichment_batch_size=enrichment_batch_size, enrichment_workers=enrichment_workers, upsert_buffer_size=upsert_buffer_size, - max_concurrent_upserts=_qdrant_max_concurrent_upserts, + max_concurrent_upserts=max_concurrent_upserts, + imdb_config=imdb_config, + qdrant_config=qdrant_config, ) logger.info("Starting staged loader (Plex mode)") await orchestrator.run() @@ -826,11 +848,14 @@ async def run( logger.info("No points to upsert") await _process_qdrant_retry_queue( - client, collection_name, qdrant_retry_queue + client, + collection_name, + qdrant_retry_queue, + config=qdrant_config, ) if imdb_queue_path: - _persist_imdb_retry_queue(imdb_queue_path) + _persist_imdb_retry_queue(imdb_queue_path, imdb_config.retry_queue) json.dump([item.model_dump(mode="json") for item in items], fp=sys.stdout, indent=2) sys.stdout.write("\n") @@ -924,7 +949,7 @@ async def run( envvar="QDRANT_UPSERT_BUFFER_SIZE", show_envvar=True, type=int, - default=_qdrant_upsert_buffer_size, + default=DEFAULT_QDRANT_UPSERT_BUFFER_SIZE, show_default=True, help="Number of media items to buffer before scheduling an async upsert", ) @@ -1126,6 +1151,10 @@ async def load_media( plex_chunk_size: int, enrichment_batch_size: int, enrichment_workers: int, + qdrant_batch_size: int = DEFAULT_QDRANT_BATCH_SIZE, + max_concurrent_upserts: int = DEFAULT_QDRANT_MAX_CONCURRENT_UPSERTS, + qdrant_retry_attempts: int = DEFAULT_QDRANT_RETRY_ATTEMPTS, + qdrant_retry_backoff: float = DEFAULT_QDRANT_RETRY_BACKOFF, ) -> None: """Orchestrate one or more runs of :func:`run`.""" @@ -1154,6 +1183,10 @@ async def load_media( plex_chunk_size, enrichment_batch_size, enrichment_workers, + qdrant_batch_size, + max_concurrent_upserts, + qdrant_retry_attempts, + qdrant_retry_backoff, ) if not continuous: break diff --git a/mcp_plex/loader/pipeline/channels.py b/mcp_plex/loader/pipeline/channels.py index 271c1e1..8de20bb 100644 --- a/mcp_plex/loader/pipeline/channels.py +++ b/mcp_plex/loader/pipeline/channels.py @@ -124,19 +124,6 @@ def snapshot(self) -> list[str]: return list(self._items) -# Backwards-compatible aliases for private imports while callers migrate. -_MovieBatch = MovieBatch -_EpisodeBatch = EpisodeBatch -_SampleBatch = SampleBatch -_IngestBatch = IngestBatch -_INGEST_DONE = INGEST_DONE -_PERSIST_DONE = PERSIST_DONE -_IngestQueue = IngestQueue -_PersistenceQueue = PersistenceQueue -_require_positive = require_positive -_chunk_sequence = chunk_sequence -_IMDbRetryQueue = IMDbRetryQueue - __all__ = [ "MovieBatch", "EpisodeBatch", @@ -149,15 +136,4 @@ def snapshot(self) -> list[str]: "require_positive", "chunk_sequence", "IMDbRetryQueue", - "_MovieBatch", - "_EpisodeBatch", - "_SampleBatch", - "_IngestBatch", - "_INGEST_DONE", - "_PERSIST_DONE", - "_IngestQueue", - "_PersistenceQueue", - "_require_positive", - "_chunk_sequence", - "_IMDbRetryQueue", ] diff --git a/mcp_plex/loader/pipeline/ingestion.py b/mcp_plex/loader/pipeline/ingestion.py index dc56b0e..54eccee 100644 --- a/mcp_plex/loader/pipeline/ingestion.py +++ b/mcp_plex/loader/pipeline/ingestion.py @@ -18,7 +18,6 @@ MovieBatch, SampleBatch, chunk_sequence, - _chunk_sequence, ) @@ -122,7 +121,7 @@ async def _ingest_plex( movies_source = movies_attr() if callable(movies_attr) else movies_attr movies = list(movies_source) for batch_index, chunk in enumerate( - _chunk_sequence(movies, movie_batch_size), start=1 + chunk_sequence(movies, movie_batch_size), start=1 ): batch_movies = list(chunk) if not batch_movies: @@ -150,7 +149,7 @@ async def _ingest_plex( ) episodes = list(episodes_source) for batch_index, chunk in enumerate( - _chunk_sequence(episodes, episode_batch_size), start=1 + chunk_sequence(episodes, episode_batch_size), start=1 ): batch_episodes = list(chunk) if not batch_episodes: diff --git a/pyproject.toml b/pyproject.toml index c3a53d8..c449a5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-plex" -version = "0.26.78" +version = "0.26.79" description = "Plex-Oriented Model Context Protocol Server" requires-python = ">=3.11,<3.13" diff --git a/tests/test_loader_integration.py b/tests/test_loader_integration.py index f352066..e0a3605 100644 --- a/tests/test_loader_integration.py +++ b/tests/test_loader_integration.py @@ -45,7 +45,7 @@ async def create_payload_index( ) -async def _run_loader(sample_dir: Path) -> None: +async def _run_loader(sample_dir: Path, **kwargs) -> None: await loader.run( None, None, @@ -53,6 +53,7 @@ async def _run_loader(sample_dir: Path) -> None: sample_dir, None, None, + **kwargs, ) @@ -113,7 +114,7 @@ def test_run_processes_imdb_queue(monkeypatch, tmp_path): queue_file.write_text(json.dumps(["tt0111161"])) sample_dir = Path(__file__).resolve().parents[1] / "sample-data" - async def fake_fetch(client, imdb_id): + async def fake_fetch(client, imdb_id, config): return None monkeypatch.setattr(loader, "_fetch_imdb", fake_fetch) @@ -135,11 +136,10 @@ async def fake_fetch(client, imdb_id): def test_run_upserts_in_batches(monkeypatch): monkeypatch.setattr(loader, "AsyncQdrantClient", CaptureClient) - monkeypatch.setattr(loader, "_qdrant_batch_size", 1) CaptureClient.captured_points = [] CaptureClient.upsert_calls = 0 sample_dir = Path(__file__).resolve().parents[1] / "sample-data" - asyncio.run(_run_loader(sample_dir)) + asyncio.run(_run_loader(sample_dir, qdrant_batch_size=1)) assert CaptureClient.upsert_calls == 2 assert len(CaptureClient.captured_points) == 2 diff --git a/tests/test_loader_logging.py b/tests/test_loader_logging.py index c5626c2..a0a8262 100644 --- a/tests/test_loader_logging.py +++ b/tests/test_loader_logging.py @@ -71,7 +71,6 @@ async def invoke(): def test_run_limits_concurrent_upserts(monkeypatch): monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient) sample_dir = Path(__file__).resolve().parents[1] / "sample-data" - monkeypatch.setattr(loader, "_qdrant_max_concurrent_upserts", 1) concurrency = {"current": 0, "max": 0} started = asyncio.Queue() @@ -101,7 +100,16 @@ async def fake_upsert(client, collection_name, points, **kwargs): async def invoke(): run_task = asyncio.create_task( - loader.run(None, None, None, sample_dir, None, None, upsert_buffer_size=1) + loader.run( + None, + None, + None, + sample_dir, + None, + None, + upsert_buffer_size=1, + max_concurrent_upserts=1, + ) ) await asyncio.wait_for(started.get(), timeout=1) assert not third_requested.is_set() diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index 8c02cef..d0adaee 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -12,6 +12,9 @@ from mcp_plex import loader from mcp_plex.loader.imdb_cache import IMDbCache from mcp_plex.loader import ( + IMDbRuntimeConfig, + QdrantRuntimeConfig, + _build_loader_orchestrator, _fetch_imdb, _load_from_sample, _load_imdb_retry_queue, @@ -20,6 +23,7 @@ _resolve_dense_model_params, build_point, ) +from mcp_plex.loader.pipeline.channels import IMDbRetryQueue from mcp_plex.common.types import ( AggregatedItem, IMDbName, @@ -32,6 +36,25 @@ ) +def make_imdb_config( + *, + cache: IMDbCache | None = None, + max_retries: int = 3, + backoff: float = 1.0, + retry_queue: IMDbRetryQueue | None = None, + requests_per_window: int | None = None, + window_seconds: float = 1.0, +) -> IMDbRuntimeConfig: + return IMDbRuntimeConfig( + cache=cache, + max_retries=max_retries, + backoff=backoff, + retry_queue=retry_queue or IMDbRetryQueue(), + requests_per_window=requests_per_window, + window_seconds=window_seconds, + ) + + def test_loader_import_fallback(monkeypatch): real_import = builtins.__import__ @@ -52,9 +75,9 @@ def test_load_from_sample_returns_items(): assert {i.plex.type for i in items} == {"movie", "episode"} -def test_fetch_imdb_cache_miss(tmp_path, monkeypatch): +def test_fetch_imdb_cache_miss(tmp_path): cache_path = tmp_path / "cache.json" - monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) + config = make_imdb_config(cache=IMDbCache(cache_path)) calls = 0 @@ -67,7 +90,7 @@ async def imdb_mock(request): async def main(): async with httpx.AsyncClient(transport=httpx.MockTransport(imdb_mock)) as client: - result = await _fetch_imdb(client, "tt1") + result = await _fetch_imdb(client, "tt1", config) assert result is not None asyncio.run(main()) @@ -76,19 +99,19 @@ async def main(): assert data["tt1"]["id"] == "tt1" -def test_fetch_imdb_cache_hit(tmp_path, monkeypatch): +def test_fetch_imdb_cache_hit(tmp_path): cache_path = tmp_path / "cache.json" cache_path.write_text( json.dumps({"tt1": {"id": "tt1", "type": "movie", "primaryTitle": "T"}}) ) - monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) + config = make_imdb_config(cache=IMDbCache(cache_path)) async def error_mock(request): raise AssertionError("network should not be called") async def main(): async with httpx.AsyncClient(transport=httpx.MockTransport(error_mock)) as client: - result = await _fetch_imdb(client, "tt1") + result = await _fetch_imdb(client, "tt1", config) assert result is not None assert result.id == "tt1" @@ -97,9 +120,11 @@ async def main(): def test_fetch_imdb_retries_on_429(monkeypatch, tmp_path): cache_path = tmp_path / "cache.json" - monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) - monkeypatch.setattr(loader, "_imdb_max_retries", 5) - monkeypatch.setattr(loader, "_imdb_backoff", 0.1) + config = make_imdb_config( + cache=IMDbCache(cache_path), + max_retries=5, + backoff=0.1, + ) call_count = 0 @@ -121,7 +146,7 @@ async def fake_sleep(seconds: float) -> None: async def main(): async with httpx.AsyncClient(transport=httpx.MockTransport(imdb_mock)) as client: - result = await _fetch_imdb(client, "tt1") + result = await _fetch_imdb(client, "tt1", config) assert result is not None asyncio.run(main()) @@ -129,12 +154,9 @@ async def main(): assert delays == [0.1, 0.2] -def test_imdb_retry_queue_persists_and_retries(tmp_path, monkeypatch): +def test_imdb_retry_queue_persists_and_retries(tmp_path): cache_path = tmp_path / "cache.json" queue_path = tmp_path / "queue.json" - monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path)) - monkeypatch.setattr(loader, "_imdb_max_retries", 0) - monkeypatch.setattr(loader, "_imdb_backoff", 0) async def first_transport(request): return httpx.Response(429) @@ -149,50 +171,63 @@ async def second_transport(request): }, ) - async def first_run(): - _load_imdb_retry_queue(queue_path) + async def first_run() -> IMDbRuntimeConfig: + queue = _load_imdb_retry_queue(queue_path) + config = make_imdb_config( + cache=IMDbCache(cache_path), + max_retries=0, + backoff=0, + retry_queue=queue, + ) async with httpx.AsyncClient(transport=httpx.MockTransport(first_transport)) as client: - await _process_imdb_retry_queue(client) - await _fetch_imdb(client, "tt0111161") - _persist_imdb_retry_queue(queue_path) + await _process_imdb_retry_queue(client, config) + await _fetch_imdb(client, "tt0111161", config) + _persist_imdb_retry_queue(queue_path, config.retry_queue) + return config asyncio.run(first_run()) assert json.loads(queue_path.read_text()) == ["tt0111161"] - async def second_run(): - _load_imdb_retry_queue(queue_path) - assert loader._imdb_retry_queue is not None - assert loader._imdb_retry_queue.qsize() == 1 - assert loader._imdb_retry_queue.snapshot() == ["tt0111161"] + async def second_run() -> IMDbRuntimeConfig: + queue = _load_imdb_retry_queue(queue_path) + config = make_imdb_config( + cache=IMDbCache(cache_path), + max_retries=0, + backoff=0, + retry_queue=queue, + ) + assert config.retry_queue.qsize() == 1 + assert config.retry_queue.snapshot() == ["tt0111161"] async with httpx.AsyncClient(transport=httpx.MockTransport(second_transport)) as client: - await _process_imdb_retry_queue(client) - _persist_imdb_retry_queue(queue_path) + await _process_imdb_retry_queue(client, config) + _persist_imdb_retry_queue(queue_path, config.retry_queue) + return config - asyncio.run(second_run()) + second_config = asyncio.run(second_run()) assert json.loads(queue_path.read_text()) == [] - assert loader._imdb_cache.get("tt0111161") is not None + assert second_config.cache is not None + assert second_config.cache.get("tt0111161") is not None def test_load_imdb_retry_queue_invalid_json(tmp_path): path = tmp_path / "queue.json" path.write_text("not json") - _load_imdb_retry_queue(path) - assert loader._imdb_retry_queue is not None - assert loader._imdb_retry_queue.qsize() == 0 + queue = _load_imdb_retry_queue(path) + assert queue.qsize() == 0 def test_process_imdb_retry_queue_requeues(monkeypatch): - queue = loader._IMDbRetryQueue(["tt0111161"]) - monkeypatch.setattr(loader, "_imdb_retry_queue", queue) + queue = IMDbRetryQueue(["tt0111161"]) + config = make_imdb_config(retry_queue=queue, max_retries=0, backoff=0) - async def fake_fetch(client, imdb_id): + async def fake_fetch(client, imdb_id, runtime_config): return None monkeypatch.setattr(loader, "_fetch_imdb", fake_fetch) async def run_test(): async with httpx.AsyncClient() as client: - await _process_imdb_retry_queue(client) + await _process_imdb_retry_queue(client, config) asyncio.run(run_test()) assert queue.qsize() == 1 @@ -209,8 +244,14 @@ async def upsert(self, collection_name: str, points, **kwargs): client = DummyClient() points = [models.PointStruct(id=i, vector={}, payload={}) for i in range(3)] - monkeypatch.setattr(loader, "_qdrant_batch_size", 1) - asyncio.run(loader._upsert_in_batches(client, "c", points)) + asyncio.run( + loader._upsert_in_batches( + client, + "c", + points, + batch_size=1, + ) + ) assert client.calls == 3 @@ -227,13 +268,13 @@ async def upsert(self, collection_name: str, points, **kwargs): client = DummyClient() points = [models.PointStruct(id=i, vector={}, payload={}) for i in range(2)] retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue() - monkeypatch.setattr(loader, "_qdrant_batch_size", 1) async def main() -> None: await loader._upsert_in_batches( client, "collection", points, + batch_size=1, retry_queue=retry_queue, ) @@ -258,8 +299,7 @@ async def main() -> None: retry_queue.put_nowait( [models.PointStruct(id=1, vector={}, payload={})] ) - monkeypatch.setattr(loader, "_qdrant_retry_attempts", 2) - monkeypatch.setattr(loader, "_qdrant_retry_backoff", 0.01) + config = QdrantRuntimeConfig(retry_attempts=2, retry_backoff=0.01) sleeps: list[float] = [] @@ -268,7 +308,12 @@ async def fake_sleep(delay: float) -> None: monkeypatch.setattr(loader.asyncio, "sleep", fake_sleep) - await loader._process_qdrant_retry_queue(client, "collection", retry_queue) + await loader._process_qdrant_retry_queue( + client, + "collection", + retry_queue, + config=config, + ) assert sleeps == [0.02] asyncio.run(main()) @@ -288,22 +333,22 @@ def test_resolve_dense_model_params_unknown_model(): def test_imdb_retry_queue_desync_errors(): - queue = loader._IMDbRetryQueue(["tt1"]) + queue = IMDbRetryQueue(["tt1"]) queue._items.clear() with pytest.raises(RuntimeError, match="Queue is not empty"): queue.get_nowait() - queue = loader._IMDbRetryQueue(["tt2"]) + queue = IMDbRetryQueue(["tt2"]) queue._queue.clear() # type: ignore[attr-defined] with pytest.raises(RuntimeError, match="asyncio.Queue is empty"): queue.get_nowait() -def test_persist_imdb_retry_queue_noop(tmp_path, monkeypatch): - monkeypatch.setattr(loader, "_imdb_retry_queue", None) +def test_persist_imdb_retry_queue_writes_snapshot(tmp_path): path = tmp_path / "retry.json" - loader._persist_imdb_retry_queue(path) - assert not path.exists() + queue = IMDbRetryQueue(["tt1"]) + _persist_imdb_retry_queue(path, queue) + assert json.loads(path.read_text()) == ["tt1"] def test_ensure_collection_skips_existing(): @@ -387,7 +432,10 @@ async def record_upsert(client, collection_name: str, points, **kwargs): monkeypatch.setattr(loader, "_upsert_in_batches", record_upsert) - orchestrator, processed_items, _ = loader._build_loader_orchestrator( + imdb_config = make_imdb_config() + qdrant_config = QdrantRuntimeConfig(batch_size=1) + + orchestrator, processed_items, _ = _build_loader_orchestrator( client=object(), collection_name="media-items", dense_model_name="BAAI/bge-small-en-v1.5", @@ -400,6 +448,8 @@ async def record_upsert(client, collection_name: str, points, **kwargs): enrichment_workers=2, upsert_buffer_size=1, max_concurrent_upserts=1, + imdb_config=imdb_config, + qdrant_config=qdrant_config, ) asyncio.run(orchestrator.run()) diff --git a/tests/test_persistence_stage.py b/tests/test_persistence_stage.py index d12dd45..b8c1de8 100644 --- a/tests/test_persistence_stage.py +++ b/tests/test_persistence_stage.py @@ -148,6 +148,7 @@ async def upsert_fn(batch: list[list[int]]) -> None: _FailingClient(), "media-items", batch, + batch_size=1, retry_queue=retry_queue, ) diff --git a/uv.lock b/uv.lock index ba8e51e..0140fa8 100644 --- a/uv.lock +++ b/uv.lock @@ -730,7 +730,7 @@ wheels = [ [[package]] name = "mcp-plex" -version = "0.26.78" +version = "0.26.79" source = { editable = "." } dependencies = [ { name = "fastapi" },