Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions mcp_plex/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
from .imdb_cache import IMDbCache
from .pipeline.channels import (
IMDbRetryQueue,
INGEST_DONE,
IngestBatch,
IngestQueue,
MovieBatch,
EpisodeBatch,
PersistenceQueue,
SampleBatch,
chunk_sequence,
require_positive,
Expand Down Expand Up @@ -81,9 +84,12 @@
_EpisodeBatch = EpisodeBatch
_SampleBatch = SampleBatch
_IngestBatch = IngestBatch
_IngestQueue = IngestQueue
_PersistenceQueue = PersistenceQueue
_require_positive = require_positive
_chunk_sequence = chunk_sequence
_IMDbRetryQueue = IMDbRetryQueue
_INGEST_DONE = INGEST_DONE


def _is_local_qdrant(client: AsyncQdrantClient) -> bool:
Expand Down Expand Up @@ -1000,12 +1006,10 @@ def __init__(
if self._sample_items is None and not self._tmdb_api_key:
raise RuntimeError("TMDB API key required for live ingestion")

self._ingest_queue: asyncio.Queue[_IngestBatch | None] = asyncio.Queue(
self._ingest_queue: IngestQueue = asyncio.Queue(
maxsize=self._enrichment_workers * 2
)
self._points_queue: asyncio.Queue[list[models.PointStruct] | None] = (
asyncio.Queue()
)
self._points_queue: PersistenceQueue = asyncio.Queue()
self._upsert_capacity = asyncio.Semaphore(self._max_concurrent_upserts)
self._items: list[AggregatedItem] = []
self._qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = (
Expand Down Expand Up @@ -1161,7 +1165,7 @@ async def _ingest_from_plex(self) -> None:
async def _enrichment_worker(self, worker_id: int) -> None:
while True:
batch = await self._ingest_queue.get()
if batch is None:
if batch is None or batch is INGEST_DONE:
self._ingest_queue.task_done()
break
try:
Expand Down
41 changes: 39 additions & 2 deletions mcp_plex/loader/pipeline/channels.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
"""Batch container and helper utilities shared across loader stages."""
"""Batch container and helper utilities shared across loader stages.

The queues exported here intentionally share sentinel objects so stage-specific
integration tests can assert on hand-off behavior without duplicating
constants. The loader still emits ``None`` as a completion token for
compatibility while downstream components migrate to sentinel-only signaling.
"""
from __future__ import annotations

import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Iterable, Sequence, TypeVar
from typing import TYPE_CHECKING, Any, Final, Iterable, Sequence, TypeVar, TypeAlias

from ...common.types import AggregatedItem

Expand All @@ -15,6 +21,22 @@

T = TypeVar("T")

if TYPE_CHECKING:
from qdrant_client import models


INGEST_DONE: Final = object()
"""Sentinel object signaling that ingestion has completed.

The loader currently places ``None`` on ingestion queues in addition to this
sentinel so legacy listeners that only check for ``None`` continue to work.
"""

if TYPE_CHECKING:
PersistencePayload: TypeAlias = list[models.PointStruct]
else: # pragma: no cover - runtime fallback for typing-only alias
PersistencePayload: TypeAlias = list[Any]


@dataclass(slots=True)
class MovieBatch:
Expand All @@ -40,6 +62,12 @@ class SampleBatch:

IngestBatch = MovieBatch | EpisodeBatch | SampleBatch

IngestQueueItem: TypeAlias = IngestBatch | None | object
PersistenceQueueItem: TypeAlias = PersistencePayload | None

IngestQueue: TypeAlias = asyncio.Queue[IngestQueueItem]
PersistenceQueue: TypeAlias = asyncio.Queue[PersistenceQueueItem]


def require_positive(value: int, *, name: str) -> int:
"""Return *value* if positive, otherwise raise a ``ValueError``."""
Expand Down Expand Up @@ -98,6 +126,9 @@ def snapshot(self) -> list[str]:
_EpisodeBatch = EpisodeBatch
_SampleBatch = SampleBatch
_IngestBatch = IngestBatch
_INGEST_DONE = INGEST_DONE
_IngestQueue = IngestQueue
_PersistenceQueue = PersistenceQueue
_require_positive = require_positive
_chunk_sequence = chunk_sequence
_IMDbRetryQueue = IMDbRetryQueue
Expand All @@ -107,13 +138,19 @@ def snapshot(self) -> list[str]:
"EpisodeBatch",
"SampleBatch",
"IngestBatch",
"INGEST_DONE",
"IngestQueue",
"PersistenceQueue",
"require_positive",
"chunk_sequence",
"IMDbRetryQueue",
"_MovieBatch",
"_EpisodeBatch",
"_SampleBatch",
"_IngestBatch",
"_INGEST_DONE",
"_IngestQueue",
"_PersistenceQueue",
"_require_positive",
"_chunk_sequence",
"_IMDbRetryQueue",
Expand Down