diff --git a/mcp_plex/loader/__init__.py b/mcp_plex/loader/__init__.py index 03b0713..47106dd 100644 --- a/mcp_plex/loader/__init__.py +++ b/mcp_plex/loader/__init__.py @@ -26,9 +26,12 @@ from .imdb_cache import IMDbCache from .pipeline.channels import ( IMDbRetryQueue, + INGEST_DONE, IngestBatch, + IngestQueue, MovieBatch, EpisodeBatch, + PersistenceQueue, SampleBatch, chunk_sequence, require_positive, @@ -81,9 +84,12 @@ _EpisodeBatch = EpisodeBatch _SampleBatch = SampleBatch _IngestBatch = IngestBatch +_IngestQueue = IngestQueue +_PersistenceQueue = PersistenceQueue _require_positive = require_positive _chunk_sequence = chunk_sequence _IMDbRetryQueue = IMDbRetryQueue +_INGEST_DONE = INGEST_DONE def _is_local_qdrant(client: AsyncQdrantClient) -> bool: @@ -1000,12 +1006,10 @@ def __init__( if self._sample_items is None and not self._tmdb_api_key: raise RuntimeError("TMDB API key required for live ingestion") - self._ingest_queue: asyncio.Queue[_IngestBatch | None] = asyncio.Queue( + self._ingest_queue: IngestQueue = asyncio.Queue( maxsize=self._enrichment_workers * 2 ) - self._points_queue: asyncio.Queue[list[models.PointStruct] | None] = ( - asyncio.Queue() - ) + self._points_queue: PersistenceQueue = asyncio.Queue() self._upsert_capacity = asyncio.Semaphore(self._max_concurrent_upserts) self._items: list[AggregatedItem] = [] self._qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = ( @@ -1161,7 +1165,7 @@ async def _ingest_from_plex(self) -> None: async def _enrichment_worker(self, worker_id: int) -> None: while True: batch = await self._ingest_queue.get() - if batch is None: + if batch is None or batch is INGEST_DONE: self._ingest_queue.task_done() break try: diff --git a/mcp_plex/loader/pipeline/channels.py b/mcp_plex/loader/pipeline/channels.py index 6c605c2..a26aab3 100644 --- a/mcp_plex/loader/pipeline/channels.py +++ b/mcp_plex/loader/pipeline/channels.py @@ -1,10 +1,16 @@ -"""Batch container and helper utilities shared across loader stages.""" +"""Batch container and helper utilities shared across loader stages. + +The queues exported here intentionally share sentinel objects so stage-specific +integration tests can assert on hand-off behavior without duplicating +constants. The loader still emits ``None`` as a completion token for +compatibility while downstream components migrate to sentinel-only signaling. +""" from __future__ import annotations import asyncio from collections import deque from dataclasses import dataclass -from typing import Iterable, Sequence, TypeVar +from typing import TYPE_CHECKING, Any, Final, Iterable, Sequence, TypeVar, TypeAlias from ...common.types import AggregatedItem @@ -15,6 +21,22 @@ T = TypeVar("T") +if TYPE_CHECKING: + from qdrant_client import models + + +INGEST_DONE: Final = object() +"""Sentinel object signaling that ingestion has completed. + +The loader currently places ``None`` on ingestion queues in addition to this +sentinel so legacy listeners that only check for ``None`` continue to work. +""" + +if TYPE_CHECKING: + PersistencePayload: TypeAlias = list[models.PointStruct] +else: # pragma: no cover - runtime fallback for typing-only alias + PersistencePayload: TypeAlias = list[Any] + @dataclass(slots=True) class MovieBatch: @@ -40,6 +62,12 @@ class SampleBatch: IngestBatch = MovieBatch | EpisodeBatch | SampleBatch +IngestQueueItem: TypeAlias = IngestBatch | None | object +PersistenceQueueItem: TypeAlias = PersistencePayload | None + +IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem] +PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem] + def require_positive(value: int, *, name: str) -> int: """Return *value* if positive, otherwise raise a ``ValueError``.""" @@ -98,6 +126,9 @@ def snapshot(self) -> list[str]: _EpisodeBatch = EpisodeBatch _SampleBatch = SampleBatch _IngestBatch = IngestBatch +_INGEST_DONE = INGEST_DONE +_IngestQueue = IngestQueue +_PersistenceQueue = PersistenceQueue _require_positive = require_positive _chunk_sequence = chunk_sequence _IMDbRetryQueue = IMDbRetryQueue @@ -107,6 +138,9 @@ def snapshot(self) -> list[str]: "EpisodeBatch", "SampleBatch", "IngestBatch", + "INGEST_DONE", + "IngestQueue", + "PersistenceQueue", "require_positive", "chunk_sequence", "IMDbRetryQueue", @@ -114,6 +148,9 @@ def snapshot(self) -> list[str]: "_EpisodeBatch", "_SampleBatch", "_IngestBatch", + "_INGEST_DONE", + "_IngestQueue", + "_PersistenceQueue", "_require_positive", "_chunk_sequence", "_IMDbRetryQueue",