From 742339e42e7a936462e2c622de596cd1a6260e33 Mon Sep 17 00:00:00 2001 From: Teagan Glenn Date: Sun, 5 Oct 2025 21:42:41 -0600 Subject: [PATCH] refactor(loader): remove legacy pipeline alias --- mcp_plex/loader/__init__.py | 26 +++----------------------- tests/test_loader_alias.py | 17 +++++++++-------- tests/test_loader_logging.py | 10 +++++----- tests/test_loader_unit.py | 13 +++++++------ 4 files changed, 24 insertions(+), 42 deletions(-) diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index 5534843..bf88211 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -3,8 +3,9 @@ This package now centres its public API on :class:`LoaderOrchestrator`, the coordination layer for the ingestion, enrichment, and persistence stages. The historical pipeline implementation remains available as -``LegacyLoaderPipeline`` for the CLI while the original ``LoaderPipeline`` -name resolves to the orchestrator with a deprecation warning. +``LoaderPipeline`` for reference, but new code should instantiate the +staged classes directly and coordinate them with +:class:`LoaderOrchestrator`. """ from __future__ import annotations @@ -1346,12 +1347,6 @@ def _handle_upsert_batch( ) -# Preserve the legacy pipeline implementation for the CLI while exposing the -# orchestrator as the public API moving forward. -LegacyLoaderPipeline = LoaderPipeline -del LoaderPipeline - - def _build_loader_orchestrator( *, client: AsyncQdrantClient, @@ -1474,21 +1469,6 @@ def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None: return orchestrator, items, persistence_stage.retry_queue -def __getattr__(name: str) -> Any: - """Provide deprecated access to :class:`LoaderOrchestrator`.""" - - if name == "LoaderPipeline": - warnings.warn( - "LoaderPipeline has been renamed to LoaderOrchestrator; import " - "mcp_plex.loader.LoaderOrchestrator instead.", - DeprecationWarning, - stacklevel=2, - ) - globals()[name] = LoaderOrchestrator - return LoaderOrchestrator - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - - async def run( plex_url: Optional[str], plex_token: Optional[str], diff --git a/tests/test_loader_alias.py b/tests/test_loader_alias.py index e15f95c..f13c461 100644 --- a/tests/test_loader_alias.py +++ b/tests/test_loader_alias.py @@ -1,17 +1,18 @@ """Compatibility tests for loader public API aliases.""" import importlib -import warnings +import pytest -def test_loader_pipeline_alias_emits_deprecation_warning() -> None: - """The legacy LoaderPipeline export should point at the orchestrator.""" + +def test_loader_pipeline_alias_removed() -> None: + """Ensure the legacy alias no longer exposes the orchestrator.""" module = importlib.import_module("mcp_plex.loader") module = importlib.reload(module) - with warnings.catch_warnings(record=True) as caught: - warnings.simplefilter("always", DeprecationWarning) - alias = getattr(module, "LoaderPipeline") - assert alias is module.LoaderOrchestrator - assert any(issubclass(w.category, DeprecationWarning) for w in caught) + loader_cls = getattr(module, "LoaderPipeline") + assert hasattr(loader_cls, "execute"), "expected legacy LoaderPipeline class" + + with pytest.raises(AttributeError): + getattr(module, "LegacyLoaderPipeline") diff --git a/tests/test_loader_logging.py b/tests/test_loader_logging.py index 80fdaf3..c5626c2 100644 --- a/tests/test_loader_logging.py +++ b/tests/test_loader_logging.py @@ -31,19 +31,19 @@ async def upsert(self, *args, **kwargs): def test_run_logs_upsert(monkeypatch, caplog): monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient) sample_dir = Path(__file__).resolve().parents[1] / "sample-data" - with caplog.at_level(logging.INFO, logger="mcp_plex.loader"): + with caplog.at_level(logging.INFO): asyncio.run(loader.run(None, None, None, sample_dir, None, None)) + assert "Starting staged loader (sample mode)" in caplog.text + assert "Upsert worker 0 handling 2 points" in caplog.text + assert "Upsert worker 0 processed 2 items" in caplog.text assert "Loaded 2 items" in caplog.text - assert "Upsert worker" in caplog.text - assert "handling 2 points" in caplog.text - assert "processed 2 items" in caplog.text def test_run_logs_no_points(monkeypatch, caplog): monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient) monkeypatch.setattr(loader, "_load_from_sample", lambda _: []) sample_dir = Path(__file__).resolve().parents[1] / "sample-data" - with caplog.at_level(logging.INFO, logger="mcp_plex.loader"): + with caplog.at_level(logging.INFO): asyncio.run(loader.run(None, None, None, sample_dir, None, None)) assert "Loaded 0 items" in caplog.text assert "No points to upsert" in caplog.text diff --git a/tests/test_loader_unit.py b/tests/test_loader_unit.py index d3108a9..6dffa39 100644 --- a/tests/test_loader_unit.py +++ b/tests/test_loader_unit.py @@ -13,7 +13,6 @@ from mcp_plex import loader from mcp_plex.loader.imdb_cache import IMDbCache from mcp_plex.loader import ( - LegacyLoaderPipeline as LoaderPipeline, _build_plex_item, _extract_external_ids, _fetch_imdb, @@ -721,7 +720,9 @@ def test_build_point_includes_metadata(): def test_loader_pipeline_processes_sample_batches(monkeypatch): - items = _load_from_sample(Path(__file__).resolve().parents[1] / "sample-data") + sample_items = _load_from_sample( + Path(__file__).resolve().parents[1] / "sample-data" + ) recorded_batches: list[list[models.PointStruct]] = [] @@ -730,13 +731,13 @@ async def record_upsert(client, collection_name: str, points, **kwargs): monkeypatch.setattr(loader, "_upsert_in_batches", record_upsert) - pipeline = LoaderPipeline( + orchestrator, processed_items, _ = loader._build_loader_orchestrator( client=object(), collection_name="media-items", dense_model_name="BAAI/bge-small-en-v1.5", sparse_model_name="Qdrant/bm42-all-minilm-l6-v2-attentions", tmdb_api_key=None, - sample_items=items, + sample_items=sample_items, plex_server=None, plex_chunk_size=10, enrichment_batch_size=1, @@ -745,9 +746,9 @@ async def record_upsert(client, collection_name: str, points, **kwargs): max_concurrent_upserts=1, ) - asyncio.run(pipeline.execute()) + asyncio.run(orchestrator.run()) - assert len(pipeline.items) == len(items) + assert len(processed_items) == len(sample_items) assert recorded_batches, "expected pipeline to emit upsert batches" payload = recorded_batches[0][0].payload assert payload["title"]