feat: add batch processing capabilities with BatchVectorIngestionWorkflow and related activities! #62
Conversation
- Introduced BatchVectorIngestionWorkflow for processing multiple document ingestion requests in parallel.
- Added process_documents_batch activity for handling batch document processing.
- Updated schema to include BatchIngestionRequest and BatchDocument models.
- Enhanced README with usage examples and performance considerations for batch processing.
Walkthrough

These changes introduce batch ingestion capabilities for document processing in the hivemind_etl module. New schema models, activities, and a Temporal workflow enable parallel processing of multiple documents in chunks. The registry and documentation are updated to support and describe both single and batch ingestion workflows, with batch operations now fully integrated.
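The review comments below spell out the fields on the new models. Reconstructed from those docstring suggestions, the schema plausibly looks like the following sketch (not the actual schema.py):

```python
# Sketch of the new schema models, reconstructed from the docstring
# suggestions in the review below; the real schema.py may differ.
from pydantic import BaseModel, Field


class BatchDocument(BaseModel):
    """A model representing a document for batch ingestion."""

    docId: str
    text: str
    metadata: dict
    excludedEmbedMetadataKeys: list[str] = Field(default_factory=list)
    excludedLlmMetadataKeys: list[str] = Field(default_factory=list)


class BatchIngestionRequest(BaseModel):
    """A batch of documents to ingest for one community/platform pair."""

    communityId: str
    platformId: str
    # None means the collection name defaults to "[communityId]_[platformId]"
    collectionName: str | None = None
    document: list[BatchDocument]
```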
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant TemporalWorker
    participant BatchVectorIngestionWorkflow
    participant process_documents_batch
    participant CustomIngestionPipeline
    Client->>TemporalWorker: Start BatchVectorIngestionWorkflow(batchRequest)
    TemporalWorker->>BatchVectorIngestionWorkflow: Run(batchRequest)
    loop For each chunk in batchRequest
        BatchVectorIngestionWorkflow->>process_documents_batch: Process(chunk)
        process_documents_batch->>CustomIngestionPipeline: Ingest documents in chunk
        CustomIngestionPipeline-->>process_documents_batch: Return
    end
    BatchVectorIngestionWorkflow-->>TemporalWorker: Done
    TemporalWorker-->>Client: Workflow complete
```
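In code, the loop in the diagram corresponds to roughly the following Temporal workflow shape. This is a minimal sketch assuming chunks are dispatched concurrently; the real implementation lives in hivemind_etl/simple_ingestion/pipeline.py (lines 60-119 per the code graph analysis below) and may differ:

```python
# Minimal sketch of the chunk-and-fan-out pattern the diagram shows; the
# actual BatchVectorIngestionWorkflow may differ in structure and naming.
import asyncio
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class BatchVectorIngestionWorkflowSketch:
    @workflow.run
    async def run(self, batch_request: dict) -> None:
        documents = batch_request["document"]
        batch_size = 10  # hardcoded in this PR, per the nitpick below

        # Split the request's documents into fixed-size chunks.
        chunks = [
            documents[i : i + batch_size]
            for i in range(0, len(documents), batch_size)
        ]

        # One process_documents_batch activity per chunk, run in parallel.
        await asyncio.gather(
            *(
                workflow.execute_activity(
                    "process_documents_batch",
                    {**batch_request, "document": chunk},
                    start_to_close_timeout=timedelta(minutes=10),
                )
                for chunk in chunks
            )
        )
```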
Actionable comments posted: 0
🧹 Nitpick comments (5)
hivemind_etl/simple_ingestion/schema.py (2)
41-49: Complete the docstring with parameter descriptions.

The BatchDocument docstring is incomplete and doesn't follow the same format as IngestionRequest. Add parameter descriptions for consistency.

```diff
 class BatchDocument(BaseModel):
-    """A model representing a document for batch ingestion.
-
-    """
+    """A model representing a document for batch ingestion.
+
+    Parameters
+    ----------
+    docId : str
+        Unique identifier for the document.
+    text : str
+        The text content to be processed.
+    metadata : dict
+        Additional metadata associated with the document.
+    excludedEmbedMetadataKeys : list[str], optional
+        List of metadata keys to exclude from embedding process.
+        Default is an empty list.
+    excludedLlmMetadataKeys : list[str], optional
+        List of metadata keys to exclude from LLM processing.
+        Default is an empty list.
+    """
```
52-63: Fix the docstring parameter description.

The docstring incorrectly describes the parameter as ingestion_requests : list[IngestionRequest], but the actual field is document: list[BatchDocument].

```diff
 class BatchIngestionRequest(BaseModel):
     """A model representing a batch of ingestion requests for document processing.

     Parameters
     ----------
-    ingestion_requests : list[IngestionRequest]
-        A list of ingestion requests.
+    communityId : str
+        The unique identifier of the community.
+    platformId : str
+        The unique identifier of the platform.
+    collectionName : str | None, optional
+        The name of the collection to use for the documents.
+        Default is `None`, meaning it follows the default pattern of `[communityId]_[platformId]`.
+    document : list[BatchDocument]
+        A list of batch documents to process.
     """
```

hivemind_etl/simple_ingestion/README.md (1)
75-84: Fix the batch workflow usage example.

The usage example incorrectly suggests that batch_size is a parameter passed to the workflow execution. Based on the workflow implementation, batch_size is hardcoded as 10 within the workflow and is not configurable via parameters.

```diff
 # Execute batch workflow
 client = await Client.connect("localhost:7233")
 await client.execute_workflow(
     "BatchVectorIngestionWorkflow",
     batch_request,
-    10,  # batch_size: optional, default is 10
     id="batch-ingestion-123",
     task_queue="hivemind-etl"
 )
```

hivemind_etl/simple_ingestion/pipeline.py (2)
86-86: Consider making batch_size configurable.

The batch size is currently hardcoded to 10. Consider making it configurable through the workflow input or environment variables for better flexibility.
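If the team takes this suggestion, one way to do it is to carry the size on the request model. The batchSize field and class name below are hypothetical, sketched only to illustrate the idea:

```python
# Hypothetical sketch only: neither this field nor this class name is in
# the PR; it just illustrates the suggested configurability.
from pydantic import BaseModel, Field


class BatchIngestionRequestWithSize(BaseModel):
    batchSize: int = Field(default=10, ge=1)  # keeps today's default of 10
    # ... plus the existing fields (communityId, platformId, document, ...)

# The workflow would then read batch_request.batchSize instead of a literal 10.
```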
109-109: Replace unused loop variable with underscore.

The loop variable i is not used within the loop body. Apply this diff:

```diff
-        for i, chunk in enumerate(document_chunks):
+        for chunk in document_chunks:
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- hivemind_etl/activities.py (1 hunks)
- hivemind_etl/simple_ingestion/README.md (1 hunks)
- hivemind_etl/simple_ingestion/pipeline.py (3 hunks)
- hivemind_etl/simple_ingestion/schema.py (1 hunks)
- registry.py (4 hunks)
- workflows.py (1 hunks)
🧰 Additional context used
🧠 Learnings (5)
hivemind_etl/activities.py (1)
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#1
File: hivemind_etl/activities.py:69-71
Timestamp: 2024-11-25T12:06:15.391Z
Learning: The `say_hello` function in `hivemind_etl/activities.py` is an example and does not require documentation.
workflows.py (3)
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#36
File: workflows.py:13-15
Timestamp: 2025-05-08T06:37:34.094Z
Learning: In the temporal-worker-python project, workflows are defined in domain-specific modules, imported into the root-level workflows.py file, and then imported from workflows.py into registry.py where they're registered in the WORKFLOWS list. Therefore, imports in workflows.py are necessary even if they appear unused within that file itself.
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#1
File: test_run_workflow.py:0-0
Timestamp: 2024-11-25T11:49:42.951Z
Learning: The file `test_run_workflow.py` is used for testing, and code change suggestions are not required for this file.
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#39
File: hivemind_summarizer/activities.py:97-100
Timestamp: 2025-05-12T11:24:54.953Z
Learning: In the temporal-worker-python project, `CustomIngestionPipeline` expects collection names WITHOUT the community_id prefix (e.g., `{platform_id}_summary`), while direct Qdrant queries use collection names WITH the community_id prefix (e.g., `{community_id}_{platform_id}_summary`).
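A quick illustration of that naming split (the IDs here are made up):

```python
# Illustration of the collection-naming convention from the learning above;
# community and platform IDs are hypothetical.
community_id = "comm123"
platform_id = "telegram"

# CustomIngestionPipeline expects the name WITHOUT the community prefix:
pipeline_collection = f"{platform_id}_summary"               # "telegram_summary"

# Direct Qdrant queries use the name WITH the community prefix:
qdrant_collection = f"{community_id}_{platform_id}_summary"  # "comm123_telegram_summary"
```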
registry.py (1)
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#36
File: workflows.py:13-15
Timestamp: 2025-05-08T06:37:34.094Z
Learning: In the temporal-worker-python project, workflows are defined in domain-specific modules, imported into the root-level workflows.py file, and then imported from workflows.py into registry.py where they're registered in the WORKFLOWS list. Therefore, imports in workflows.py are necessary even if they appear unused within that file itself.
hivemind_etl/simple_ingestion/README.md (2)
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#39
File: hivemind_summarizer/activities.py:97-100
Timestamp: 2025-05-12T11:24:54.953Z
Learning: In the temporal-worker-python project, `CustomIngestionPipeline` expects collection names WITHOUT the community_id prefix (e.g., `{platform_id}_summary`), while direct Qdrant queries use collection names WITH the community_id prefix (e.g., `{community_id}_{platform_id}_summary`).
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#36
File: workflows.py:13-15
Timestamp: 2025-05-08T06:37:34.094Z
Learning: In the temporal-worker-python project, workflows are defined in domain-specific modules, imported into the root-level workflows.py file, and then imported from workflows.py into registry.py where they're registered in the WORKFLOWS list. Therefore, imports in workflows.py are necessary even if they appear unused within that file itself.
hivemind_etl/simple_ingestion/pipeline.py (2)
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#39
File: hivemind_summarizer/activities.py:97-100
Timestamp: 2025-05-12T11:24:54.953Z
Learning: In the temporal-worker-python project, `CustomIngestionPipeline` expects collection names WITHOUT the community_id prefix (e.g., `{platform_id}_summary`), while direct Qdrant queries use collection names WITH the community_id prefix (e.g., `{community_id}_{platform_id}_summary`).
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#30
File: hivemind_summarizer/schema.py:10-14
Timestamp: 2025-04-21T10:12:30.711Z
Learning: For the Telegram summaries feature in the temporal-worker-python project, the user plans to add date validation to ensure end_date is after start_date and dates are in the correct format as a future improvement. This validation would be added to the TelegramSummariesRangeActivityInput class.
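For reference, that planned validation could be sketched as a Pydantic v2 model validator; the field names and class shape are assumptions, since the change hasn't landed:

```python
# Hypothetical sketch of the planned date validation; not part of this PR.
from datetime import date

from pydantic import BaseModel, model_validator


class TelegramSummariesRangeActivityInputSketch(BaseModel):
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def check_range(self) -> "TelegramSummariesRangeActivityInputSketch":
        if self.end_date <= self.start_date:
            raise ValueError("end_date must be after start_date")
        return self
```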
🧬 Code Graph Analysis (1)
workflows.py (1)
hivemind_etl/simple_ingestion/pipeline.py (1)
BatchVectorIngestionWorkflow (60-119)
🪛 Ruff (0.12.2)
hivemind_etl/activities.py
18-18: hivemind_etl.simple_ingestion.pipeline.process_documents_batch imported but unused
Remove unused import
(F401)
workflows.py
18-18: hivemind_etl.simple_ingestion.pipeline.BatchVectorIngestionWorkflow imported but unused
Remove unused import
(F401)
hivemind_etl/simple_ingestion/pipeline.py
7-7: .schema.BatchDocument imported but unused
Remove unused import: .schema.BatchDocument
(F401)
109-109: Loop control variable i not used within loop body
(B007)
🔇 Additional comments (11)
hivemind_etl/activities.py (1)
16-19: Import correctly added for batch processing activity.

The process_documents_batch import follows the established pattern for activities that are imported here and then registered in registry.py. The static analysis warning can be ignored as this is the expected architecture.
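Since the import is deliberate, the F401 warning could also be silenced explicitly. Neither line below is in the PR; they are just the standard Ruff idioms for intentional re-exports:

```python
# Standard ways to mark an import as an intentional re-export for Ruff;
# shown as suggestions only, the PR leaves the warning as-is.
from hivemind_etl.simple_ingestion.pipeline import process_documents_batch  # noqa: F401

# or declare the module's public surface, which Ruff also treats as a re-export:
__all__ = ["process_documents_batch"]
```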
workflows.py (1)

16-19: Import correctly added for batch processing workflow.

The BatchVectorIngestionWorkflow import follows the established pattern for workflows that are imported here and then registered in registry.py. The static analysis warning can be ignored as this is the expected architecture.
registry.py (4)

12-12: Batch activity correctly registered.

The process_documents_batch activity is properly imported and will be registered in the ACTIVITIES list, enabling it for use in the Temporal worker.

28-28: Batch workflow correctly registered.

The BatchVectorIngestionWorkflow workflow is properly imported and will be registered in the WORKFLOWS list, enabling it for use in the Temporal worker.

41-41: Batch workflow correctly added to registry.

The BatchVectorIngestionWorkflow is properly added to the WORKFLOWS list, completing the registration process.

58-58: Batch activity correctly added to registry.

The process_documents_batch activity is properly added to the ACTIVITIES list, completing the registration process.
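Put together, the registration pattern these four comments describe looks roughly like this; the single-document names (VectorIngestionWorkflow, process_document) are assumptions inferred from the batch names, not copied from the source:

```python
# Sketch of registry.py's registration pattern as described above; ordering
# and the non-batch names are assumptions.
from workflows import BatchVectorIngestionWorkflow, VectorIngestionWorkflow
from hivemind_etl.activities import process_document, process_documents_batch

WORKFLOWS = [
    VectorIngestionWorkflow,
    BatchVectorIngestionWorkflow,  # added in this PR
]

ACTIVITIES = [
    process_document,
    process_documents_batch,  # added in this PR
]
```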
hivemind_etl/simple_ingestion/README.md (1)

1-187: Comprehensive documentation for batch processing workflows.

The README provides excellent documentation for both single and batch ingestion workflows, including:
- Clear usage examples with code snippets
- Detailed schema reference
- Performance considerations for choosing between workflows
- Error handling and retry policies
- Integration instructions
This documentation will be very helpful for developers using the batch processing capabilities.
hivemind_etl/simple_ingestion/pipeline.py (4)
7-7: Skip.

While static analysis indicates BatchDocument is unused, it's actually part of the type definition for BatchIngestionRequest.document list items and is correctly imported.
14-17: LGTM!

Clean implementation of the chunk class that properly inherits from BatchIngestionRequest.
156-158: LGTM!

The activity correctly handles the new excluded metadata keys from the ingestion request.
163-206: Well-implemented batch processing activity!

The activity correctly processes multiple documents in a single pipeline run, which is more efficient than processing them individually. The implementation properly handles the conversion from BatchDocument to Document objects.
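The conversion the comment praises presumably maps each BatchDocument onto a llama_index Document along these lines (a hedged sketch; the actual activity may differ):

```python
# Hedged sketch of the BatchDocument -> Document conversion; the real
# activity in pipeline.py (lines 163-206) may structure this differently.
from llama_index.core import Document


def to_documents(batch_docs) -> list[Document]:
    return [
        Document(
            doc_id=doc.docId,
            text=doc.text,
            metadata=doc.metadata,
            excluded_embed_metadata_keys=doc.excludedEmbedMetadataKeys,
            excluded_llm_metadata_keys=doc.excludedLlmMetadataKeys,
        )
        for doc in batch_docs
    ]
```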
Summary by CodeRabbit
New Features
Documentation
Chores