116 changes: 116 additions & 0 deletions mcp_plex/loader/__init__.py
@@ -1351,6 +1351,122 @@ def _handle_upsert_batch(
del LoaderPipeline


def _build_loader_orchestrator(
*,
client: AsyncQdrantClient,
collection_name: str,
dense_model_name: str,
sparse_model_name: str,
tmdb_api_key: str | None,
sample_items: Sequence[AggregatedItem] | None,
plex_server: PlexServer | None,
plex_chunk_size: int,
enrichment_batch_size: int,
enrichment_workers: int,
upsert_buffer_size: int,
max_concurrent_upserts: int,
) -> tuple[LoaderOrchestrator, list[AggregatedItem], asyncio.Queue[list[models.PointStruct]]]:
"""Wire the staged loader pipeline and return the orchestrator helpers."""

from .pipeline.ingestion import IngestionStage
from .pipeline.enrichment import EnrichmentStage

ingest_queue: IngestQueue = IngestQueue(maxsize=enrichment_workers * 2)
persistence_queue: PersistenceQueue = PersistenceQueue()

global _imdb_retry_queue
if _imdb_retry_queue is None:
_imdb_retry_queue = _IMDbRetryQueue()

upsert_capacity = asyncio.Semaphore(max_concurrent_upserts)
qdrant_retry_queue: asyncio.Queue[list[models.PointStruct]] = asyncio.Queue()
items: list[AggregatedItem] = []
upserted = 0
upsert_start = time.perf_counter()

async def _upsert_aggregated(
batch: Sequence[AggregatedItem],
) -> None:
if not batch:
return
items.extend(batch)
points = [
build_point(item, dense_model_name, sparse_model_name)
for item in batch
]
await _upsert_in_batches(
client,
collection_name,
points,
retry_queue=qdrant_retry_queue,
)

def _record_upsert(worker_id: int, batch_size: int, queue_size: int) -> None:
nonlocal upserted, upsert_start
if upserted == 0:
upsert_start = time.perf_counter()
upserted += batch_size
elapsed = time.perf_counter() - upsert_start
rate = upserted / elapsed if elapsed > 0 else 0.0
logger.info(
"Upsert worker %d processed %d items (%.2f items/sec, queue size=%d)",
worker_id,
upserted,
rate,
queue_size,
)

ingestion_stage = IngestionStage(
plex_server=plex_server,
sample_items=sample_items,
movie_batch_size=plex_chunk_size,
episode_batch_size=plex_chunk_size,
sample_batch_size=enrichment_batch_size,
output_queue=ingest_queue,
completion_sentinel=INGEST_DONE,
)

enrichment_stage = EnrichmentStage(
http_client_factory=lambda: httpx.AsyncClient(timeout=30),
tmdb_api_key=tmdb_api_key or "",
ingest_queue=ingest_queue,
persistence_queue=persistence_queue,
imdb_retry_queue=_imdb_retry_queue,
movie_batch_size=enrichment_batch_size,
episode_batch_size=enrichment_batch_size,
imdb_cache=_imdb_cache,
imdb_max_retries=_imdb_max_retries,
imdb_backoff=_imdb_backoff,
imdb_batch_limit=_imdb_batch_limit,
imdb_requests_per_window=_imdb_requests_per_window,
imdb_window_seconds=_imdb_window_seconds,
)

persistence_stage = _PersistenceStage(
client=client,
collection_name=collection_name,
dense_vector_name=dense_model_name,
sparse_vector_name=sparse_model_name,
persistence_queue=persistence_queue,
retry_queue=qdrant_retry_queue,
upsert_semaphore=upsert_capacity,
upsert_buffer_size=upsert_buffer_size,
upsert_fn=_upsert_aggregated,
on_batch_complete=_record_upsert,
Comment on lines +1387 to +1455

P1: Treat retry payloads as AggregatedItem batches

The new orchestrator wiring feeds EnrichmentStage’s aggregated items directly into PersistenceStage and converts them to Qdrant points inside _upsert_aggregated. That works for the first attempt, but when _upsert_in_batches fails it enqueues the raw PointStruct batch onto the shared retry queue. On shutdown PersistenceStage._flush_retry_queue() re-enqueues those point batches via enqueue_points, after which the persistence workers call _upsert_aggregated again. _upsert_aggregated always assumes batch contains AggregatedItem instances and passes each element to build_point, so any retried payload will raise attribute errors and the failed points can never be persisted. Any upsert failure therefore permanently breaks the retry path. The upsert helper needs to detect already-built PointStructs or reuse PersistenceStage’s native point-based enqueueing instead.
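
A minimal sketch of one possible fix along those lines, reusing the names already in this PR (items, build_point, dense_model_name, sparse_model_name, client, collection_name, qdrant_retry_queue, _upsert_in_batches) and assuming retried batches reach the same upsert callable as first-attempt batches:

    async def _upsert_aggregated(
        batch: Sequence[AggregatedItem] | Sequence[models.PointStruct],
    ) -> None:
        if not batch:
            return
        if isinstance(batch[0], models.PointStruct):
            # Retried payloads are already-built points; skip aggregation
            # bookkeeping and re-conversion so the retry path cannot fail
            # on build_point expecting AggregatedItem attributes.
            points = list(batch)
        else:
            items.extend(batch)
            points = [
                build_point(item, dense_model_name, sparse_model_name)
                for item in batch
            ]
        await _upsert_in_batches(
            client,
            collection_name,
            points,
            retry_queue=qdrant_retry_queue,
        )

Alternatively, PersistenceStage could re-drive retried PointStruct batches through its own point-based enqueueing rather than the aggregated-item callable, which avoids the isinstance check entirely.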

)

orchestrator = LoaderOrchestrator(
ingestion_stage=ingestion_stage,
enrichment_stage=enrichment_stage,
persistence_stage=persistence_stage,
ingest_queue=ingest_queue,
persistence_queue=persistence_queue,
persistence_worker_count=max_concurrent_upserts,
)

return orchestrator, items, persistence_stage.retry_queue


def __getattr__(name: str) -> Any:
"""Provide deprecated access to :class:`LoaderOrchestrator`."""
