Refactor loader to stream Qdrant upserts #70
Conversation
Pull Request Overview
This PR refactors the Qdrant loader to use a streaming architecture for improved efficiency and reliability. The changes implement asynchronous buffering to handle large media collections without memory spikes, and ensure the collection is set up before ingestion begins.
- Converts the loader from batch-processing to streaming with configurable upsert buffers
- Moves Qdrant collection and index creation to occur before media processing
- Adds CLI configuration for buffer size with validation and comprehensive test coverage
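To illustrate the buffering idea described above, here is a minimal sketch of a streaming buffer helper. The name `stream_in_buffers` and its exact shape are hypothetical, not the PR's actual code; it only shows the general pattern of accumulating items from an async stream into fixed-size buffers so the whole collection never has to sit in memory at once.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def stream_in_buffers(
    items: AsyncIterator[T],
    buffer_size: int,
) -> AsyncIterator[List[T]]:
    """Yield fixed-size buffers from an async stream of items.

    The final buffer may be smaller than buffer_size; it is flushed
    so no trailing items are lost.
    """
    if buffer_size <= 0:
        raise ValueError("buffer_size must be positive")
    buffer: List[T] = []
    async for item in items:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            yield buffer
            buffer = []
    if buffer:  # flush the final partial buffer
        yield buffer
```

Each yielded buffer would then be handed to an upsert routine, so memory usage is bounded by `buffer_size` rather than by the size of the library.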
Reviewed Changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| mcp_plex/loader.py | Core refactor implementing streaming iterator functions, collection setup, buffered upserts, and CLI option |
| tests/test_loader_logging.py | Adds validation tests for buffer size parameter and collection setup ordering |
| tests/test_load_from_plex.py | Updates test to use new streaming iterator function name |
| pyproject.toml | Version bump to 0.26.39 |
| docker/pyproject.deps.toml | Version bump to 0.26.39 |
| AGENTS.md | Documentation update describing the streaming improvements |
mcp_plex/loader.py (outdated)
```diff
 async def _iter_gather_in_batches(
     tasks: Sequence[Awaitable[T]], batch_size: int
-) -> List[T]:
-    """Gather awaitable tasks in fixed-size batches."""
+) -> AsyncIterator[T]:
+    """Yield results from awaitable tasks in fixed-size batches."""
+    if batch_size <= 0:
+        raise ValueError("batch_size must be positive")
```
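The diff above turns the batched gather into an async iterator. A self-contained sketch of that pattern (a hypothetical reimplementation, not the PR's exact code) looks like this: awaitables are gathered `batch_size` at a time, and results are yielded as each batch completes instead of being collected into one large list.

```python
import asyncio
from typing import AsyncIterator, Awaitable, List, Sequence, TypeVar

T = TypeVar("T")


async def iter_gather_in_batches(
    tasks: Sequence[Awaitable[T]], batch_size: int
) -> AsyncIterator[T]:
    """Yield results from awaitables, gathering at most batch_size at a time."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(tasks), batch_size):
        batch = tasks[start : start + batch_size]
        # Run one batch concurrently, then stream its results out.
        for result in await asyncio.gather(*batch):
            yield result
```

Because results are yielded incrementally, a consumer can start upserting the first batch while later batches are still pending, which is what enables the streaming behavior described in the PR overview.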
Copilot AI commented on Oct 4, 2025
The validation logic for `batch_size` is duplicated. Consider extracting it into a shared helper function, since it is used both in `_iter_gather_in_batches` and in the `upsert_buffer_size` validation in the `run` function.
```python
def _schedule_upsert(batch: List[models.PointStruct]) -> None:
    logger.info(
        "Upserting %d points into Qdrant collection %s in batches of %d",
        len(batch),
        collection_name,
        _qdrant_batch_size,
    )
    upsert_tasks.append(
        asyncio.create_task(
            _upsert_in_batches(
                client,
                collection_name,
                batch,
            )
        )
    )
```
Copilot AI commented on Oct 4, 2025
The `_schedule_upsert` function creates unbounded async tasks without any concurrency control. For large datasets, this could lead to memory issues and resource exhaustion. Consider implementing a semaphore or task pool to limit concurrent upsert operations.
Suggested change:

```diff
-def _schedule_upsert(batch: List[models.PointStruct]) -> None:
-    logger.info(
-        "Upserting %d points into Qdrant collection %s in batches of %d",
-        len(batch),
-        collection_name,
-        _qdrant_batch_size,
-    )
-    upsert_tasks.append(
-        asyncio.create_task(
-            _upsert_in_batches(
-                client,
-                collection_name,
-                batch,
-            )
-        )
-    )
+upsert_semaphore = asyncio.Semaphore(10)  # Limit to 10 concurrent upserts
+def _schedule_upsert(batch: List[models.PointStruct]) -> None:
+    async def upsert_with_semaphore():
+        async with upsert_semaphore:
+            logger.info(
+                "Upserting %d points into Qdrant collection %s in batches of %d",
+                len(batch),
+                collection_name,
+                _qdrant_batch_size,
+            )
+            await _upsert_in_batches(
+                client,
+                collection_name,
+                batch,
+            )
+    upsert_tasks.append(asyncio.create_task(upsert_with_semaphore()))
```
Coverage Report
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
points_buffer: List[models.PointStruct] = []
upsert_tasks: list[asyncio.Task[None]] = []


def _schedule_upsert(batch: List[models.PointStruct]) -> None:
    logger.info(
        "Upserting %d points into Qdrant collection %s in batches of %d",
        len(batch),
        collection_name,
        _qdrant_batch_size,
    )
    upsert_tasks.append(
        asyncio.create_task(
            _upsert_in_batches(
                client,
                collection_name,
                batch,
            )
        )
    )
```
Bound the number of parallel upsert tasks
The new streaming path fires an `asyncio.create_task` for every buffered batch and only awaits all of them after every item has been processed. On a large Plex library this schedules one `_upsert_in_batches` task per `upsert_buffer_size` items, so a 50,000-item load with the default buffer of 200 will launch 250 concurrent tasks, all hitting Qdrant at once. The previous implementation performed these upserts sequentially. This unbounded concurrency can exhaust connections or overwhelm Qdrant, causing timeouts and partial loads. Consider serialising the upserts or using a semaphore/queue to cap the number of in-flight tasks.
Pull Request Overview
Copilot reviewed 6 out of 7 changed files in this pull request and generated no new comments.
Summary
Testing
https://chatgpt.com/codex/tasks/task_e_68e0ec5f4b34832899897c764721ef16