Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 3 additions & 23 deletions mcp_plex/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
This package now centres its public API on :class:`LoaderOrchestrator`,
the coordination layer for the ingestion, enrichment, and persistence
stages. The historical pipeline implementation remains available as
``LegacyLoaderPipeline`` for the CLI while the original ``LoaderPipeline``
name resolves to the orchestrator with a deprecation warning.
``LoaderPipeline`` for reference, but new code should instantiate the
staged classes directly and coordinate them with
:class:`LoaderOrchestrator`.
"""
from __future__ import annotations

Expand Down Expand Up @@ -1346,12 +1347,6 @@ def _handle_upsert_batch(
)


# Preserve the legacy pipeline implementation for the CLI while exposing the
# orchestrator as the public API moving forward.
LegacyLoaderPipeline = LoaderPipeline
del LoaderPipeline


def _build_loader_orchestrator(
*,
client: AsyncQdrantClient,
Expand Down Expand Up @@ -1474,21 +1469,6 @@ def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None:
return orchestrator, items, persistence_stage.retry_queue


def __getattr__(name: str) -> Any:
"""Provide deprecated access to :class:`LoaderOrchestrator`."""

if name == "LoaderPipeline":
warnings.warn(
"LoaderPipeline has been renamed to LoaderOrchestrator; import "
"mcp_plex.loader.LoaderOrchestrator instead.",
DeprecationWarning,
stacklevel=2,
)
globals()[name] = LoaderOrchestrator
return LoaderOrchestrator
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


async def run(
plex_url: Optional[str],
plex_token: Optional[str],
Expand Down
17 changes: 9 additions & 8 deletions tests/test_loader_alias.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
"""Compatibility tests for loader public API aliases."""

import importlib
import warnings

import pytest

def test_loader_pipeline_alias_emits_deprecation_warning() -> None:
"""The legacy LoaderPipeline export should point at the orchestrator."""

def test_loader_pipeline_alias_removed() -> None:
"""Ensure the legacy alias no longer exposes the orchestrator."""

module = importlib.import_module("mcp_plex.loader")
module = importlib.reload(module)
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always", DeprecationWarning)
alias = getattr(module, "LoaderPipeline")

assert alias is module.LoaderOrchestrator
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
loader_cls = getattr(module, "LoaderPipeline")
assert hasattr(loader_cls, "execute"), "expected legacy LoaderPipeline class"

with pytest.raises(AttributeError):
getattr(module, "LegacyLoaderPipeline")
10 changes: 5 additions & 5 deletions tests/test_loader_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ async def upsert(self, *args, **kwargs):
def test_run_logs_upsert(monkeypatch, caplog):
monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient)
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
with caplog.at_level(logging.INFO, logger="mcp_plex.loader"):
with caplog.at_level(logging.INFO):
asyncio.run(loader.run(None, None, None, sample_dir, None, None))
assert "Starting staged loader (sample mode)" in caplog.text
assert "Upsert worker 0 handling 2 points" in caplog.text
assert "Upsert worker 0 processed 2 items" in caplog.text
assert "Loaded 2 items" in caplog.text
assert "Upsert worker" in caplog.text
assert "handling 2 points" in caplog.text
assert "processed 2 items" in caplog.text


def test_run_logs_no_points(monkeypatch, caplog):
monkeypatch.setattr(loader, "AsyncQdrantClient", DummyClient)
monkeypatch.setattr(loader, "_load_from_sample", lambda _: [])
sample_dir = Path(__file__).resolve().parents[1] / "sample-data"
with caplog.at_level(logging.INFO, logger="mcp_plex.loader"):
with caplog.at_level(logging.INFO):
asyncio.run(loader.run(None, None, None, sample_dir, None, None))
assert "Loaded 0 items" in caplog.text
assert "No points to upsert" in caplog.text
Expand Down
13 changes: 7 additions & 6 deletions tests/test_loader_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from mcp_plex import loader
from mcp_plex.loader.imdb_cache import IMDbCache
from mcp_plex.loader import (
LegacyLoaderPipeline as LoaderPipeline,
_build_plex_item,
_extract_external_ids,
_fetch_imdb,
Expand Down Expand Up @@ -721,7 +720,9 @@ def test_build_point_includes_metadata():


def test_loader_pipeline_processes_sample_batches(monkeypatch):
items = _load_from_sample(Path(__file__).resolve().parents[1] / "sample-data")
sample_items = _load_from_sample(
Path(__file__).resolve().parents[1] / "sample-data"
)

recorded_batches: list[list[models.PointStruct]] = []

Expand All @@ -730,13 +731,13 @@ async def record_upsert(client, collection_name: str, points, **kwargs):

monkeypatch.setattr(loader, "_upsert_in_batches", record_upsert)

pipeline = LoaderPipeline(
orchestrator, processed_items, _ = loader._build_loader_orchestrator(
client=object(),
collection_name="media-items",
dense_model_name="BAAI/bge-small-en-v1.5",
sparse_model_name="Qdrant/bm42-all-minilm-l6-v2-attentions",
tmdb_api_key=None,
sample_items=items,
sample_items=sample_items,
plex_server=None,
plex_chunk_size=10,
enrichment_batch_size=1,
Expand All @@ -745,9 +746,9 @@ async def record_upsert(client, collection_name: str, points, **kwargs):
max_concurrent_upserts=1,
)

asyncio.run(pipeline.execute())
asyncio.run(orchestrator.run())

assert len(pipeline.items) == len(items)
assert len(processed_items) == len(sample_items)
assert recorded_batches, "expected pipeline to emit upsert batches"
payload = recorded_batches[0][0].payload
assert payload["title"]
Expand Down