diff --git a/hivemind_etl/activities.py b/hivemind_etl/activities.py index 9ad8b3f..60fa969 100644 --- a/hivemind_etl/activities.py +++ b/hivemind_etl/activities.py @@ -15,6 +15,7 @@ ) from hivemind_etl.simple_ingestion.pipeline import ( process_document, + process_documents_batch, ) from temporalio import activity diff --git a/hivemind_etl/simple_ingestion/README.md b/hivemind_etl/simple_ingestion/README.md new file mode 100644 index 0000000..5d0fbc6 --- /dev/null +++ b/hivemind_etl/simple_ingestion/README.md @@ -0,0 +1,187 @@ +# Simple Ingestion Workflows + +This module provides Temporal workflows for ingesting documents into the vector database (Qdrant) using the TC Hivemind backend. + +## Available Workflows + +### 1. VectorIngestionWorkflow + +A workflow for processing single document ingestion requests. + +**Usage:** +```python +from hivemind_etl.simple_ingestion.schema import IngestionRequest +from temporalio.client import Client + +# Create single ingestion request +request = IngestionRequest( + communityId="my_community", + platformId="my_platform", + text="Document content here...", + metadata={ + "title": "Document Title", + "author": "Author Name" + } +) + +# Execute workflow +client = await Client.connect("localhost:7233") +await client.execute_workflow( + "VectorIngestionWorkflow", + request, + id="single-ingestion-123", + task_queue="hivemind-etl" +) +``` + +### 2. BatchVectorIngestionWorkflow + +A workflow for processing multiple document ingestion requests in parallel batches for improved efficiency. 
+
+**Key Features:**
+- **Automatic Chunking**: Large batches are automatically split into smaller parallel chunks
+- **Parallel Processing**: Multiple `process_documents_batch` activities run simultaneously
+- **Fixed Chunk Size**: Documents are processed in chunks of at most 10 (set inside the workflow)
+- **Same Collection**: All documents in a batch request must belong to the same community and collection
+- **Error Handling**: Same retry policy as single document workflow but with longer timeout for batch processing
+
+**Usage:**
+```python
+from hivemind_etl.simple_ingestion.schema import BatchIngestionRequest, BatchDocument
+from temporalio.client import Client
+
+# Create batch ingestion request
+batch_request = BatchIngestionRequest(
+    communityId="my_community",
+    platformId="my_platform",
+    collectionName="optional_custom_collection",  # Optional
+    document=[
+        BatchDocument(
+            docId="doc_1",
+            text="First document content...",
+            metadata={"title": "Document 1"},
+            excludedEmbedMetadataKeys=["some_key"],
+            excludedLlmMetadataKeys=["other_key"]
+        ),
+        BatchDocument(
+            docId="doc_2",
+            text="Second document content...",
+            metadata={"title": "Document 2"}
+        ),
+        # ... 
more documents
+    ]
+)
+
+# Execute batch workflow
+client = await Client.connect("localhost:7233")
+await client.execute_workflow(
+    "BatchVectorIngestionWorkflow",
+    batch_request,
+    # note: chunk size is fixed at 10 inside the workflow, not a workflow argument
+    id="batch-ingestion-123",
+    task_queue="hivemind-etl"
+)
+```
+
+## Schema Reference
+
+### IngestionRequest (Single Document)
+
+```python
+class IngestionRequest(BaseModel):
+    communityId: str                          # Community identifier
+    platformId: str                           # Platform identifier
+    text: str                                 # Document text content
+    metadata: dict                            # Document metadata
+    docId: str = str(uuid4())                 # Unique document ID (auto-generated)
+    excludedEmbedMetadataKeys: list[str] = [] # Keys to exclude from embedding
+    excludedLlmMetadataKeys: list[str] = []   # Keys to exclude from LLM processing
+    collectionName: str | None = None         # Optional custom collection name
+```
+
+### BatchIngestionRequest (Multiple Documents)
+
+```python
+class BatchIngestionRequest(BaseModel):
+    communityId: str                          # Community identifier
+    platformId: str                           # Platform identifier
+    collectionName: str | None = None         # Optional custom collection name
+    document: list[BatchDocument]             # List of documents to process
+
+class BatchDocument(BaseModel):
+    docId: str                                # Unique document ID
+    text: str                                 # Document text content
+    metadata: dict                            # Document metadata
+    excludedEmbedMetadataKeys: list[str] = [] # Keys to exclude from embedding
+    excludedLlmMetadataKeys: list[str] = []   # Keys to exclude from LLM processing
+```
+
+## Collection Naming
+
+- **Default**: `{communityId}_{platformId}`
+- **Custom**: `{communityId}_{collectionName}` (when `collectionName` is provided)
+
+The collection name reconstruction is handled automatically by the `CustomIngestionPipeline`. 
+
+## Performance Considerations
+
+### When to Use Batch vs Single Workflows
+
+**Use BatchVectorIngestionWorkflow when:**
+- Processing multiple documents from the same community/collection
+- Bulk importing large datasets
+- You have 10+ documents to process together
+- You want to maximize throughput with parallel processing
+
+**Use VectorIngestionWorkflow when:**
+- Processing single documents in real-time
+- Documents arrive individually
+- You need immediate processing
+- Simple use cases with occasional documents
+
+### Batch Processing Optimizations
+
+The batch workflow automatically optimizes performance by:
+
+1. **Parallel Chunking**: Large batches are split into smaller chunks that process simultaneously
+2. **Fixed Chunk Size**: Each chunk holds at most 10 documents (set inside the workflow)
+3. **Pipeline Reuse**: One `CustomIngestionPipeline` instance per chunk
+4. **Bulk Operations**: All documents in a chunk are processed together
+5. **Concurrent Execution**: Multiple chunks can run in parallel using asyncio.gather()
+
+## Error Handling
+
+Both workflows implement the same retry policy:
+- **Initial retry interval**: 1 second
+- **Maximum retry interval**: 1 minute
+- **Maximum attempts**: 3
+- **Timeout**: 5 minutes (single), 10 minutes (batch)
+
+## Testing
+
+Use the provided test script to verify functionality:
+
+```bash
+python test_batch_workflow.py
+```
+
+The test script demonstrates:
+- Batch processing with multiple documents
+- Mixed collection handling
+- Comparison between single and batch workflows
+
+## Integration
+
+Both workflows are automatically registered in the Temporal worker through `registry.py`. 
Ensure your worker includes: + +```python +from registry import WORKFLOWS, ACTIVITIES + +# Worker setup includes both workflows and activities +worker = Worker( + client=client, + task_queue="hivemind-etl", + workflows=WORKFLOWS, + activities=ACTIVITIES +) +``` \ No newline at end of file diff --git a/hivemind_etl/simple_ingestion/pipeline.py b/hivemind_etl/simple_ingestion/pipeline.py index 1fa03d6..1b954be 100644 --- a/hivemind_etl/simple_ingestion/pipeline.py +++ b/hivemind_etl/simple_ingestion/pipeline.py @@ -1,15 +1,21 @@ +import asyncio from datetime import timedelta from temporalio import activity, workflow from temporalio.common import RetryPolicy from temporalio.workflow import execute_activity -from .schema import IngestionRequest +from .schema import IngestionRequest, BatchIngestionRequest, BatchDocument with workflow.unsafe.imports_passed_through(): from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline from llama_index.core import Document +class BatchChunk(BatchIngestionRequest): + """A smaller chunk of a BatchIngestionRequest for parallel processing.""" + pass + + @workflow.defn class VectorIngestionWorkflow: """A Temporal workflow for processing document ingestion requests. @@ -50,6 +56,69 @@ async def run(self, ingestion_request: IngestionRequest) -> None: ) +@workflow.defn +class BatchVectorIngestionWorkflow: + """A Temporal workflow for processing batch document ingestion requests. + + This workflow handles the orchestration of batch document processing activities, + including retry logic and timeout configurations for multiple documents. + """ + + @workflow.run + async def run(self, ingestion_requests: BatchIngestionRequest) -> None: + """Execute the batch ingestion workflow. + + Parameters + ---------- + ingestion_requests : BatchIngestionRequest + The batch request containing all necessary information for document processing, + including community ID, platform ID, text content, and metadata for each document. 
+ + Notes + ----- + The workflow splits documents into smaller batches and processes them in parallel. + Each batch implements a retry policy with the following configuration: + - Initial retry interval: 1 second + - Maximum retry interval: 1 minute + - Maximum retry attempts: 3 + - Activity timeout: 10 minutes + """ + batch_size: int = 10 + retry_policy = RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(minutes=1), + maximum_attempts=3, + ) + + # Split documents into smaller batches + document_chunks = [] + for i in range(0, len(ingestion_requests.document), batch_size): + chunk_documents = ingestion_requests.document[i:i + batch_size] + + # Create a BatchChunk for this subset of documents + batch_chunk = BatchChunk( + communityId=ingestion_requests.communityId, + platformId=ingestion_requests.platformId, + collectionName=ingestion_requests.collectionName, + document=chunk_documents + ) + document_chunks.append(batch_chunk) + + # Process all chunks in parallel + batch_activities = [] + for i, chunk in enumerate(document_chunks): + activity_task = workflow.execute_activity( + process_documents_batch, + chunk, + retry_policy=retry_policy, + start_to_close_timeout=timedelta(minutes=10), + ) + batch_activities.append(activity_task) + + # Wait for all batches to complete + await asyncio.gather(*batch_activities) + + @activity.defn async def process_document( ingestion_request: IngestionRequest, @@ -84,6 +153,54 @@ async def process_document( doc_id=ingestion_request.docId, text=ingestion_request.text, metadata=ingestion_request.metadata, + excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys, + excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys, ) pipeline.run_pipeline(docs=[document]) + + +@activity.defn +async def process_documents_batch( + batch_chunk: BatchChunk, +) -> None: + """Process a batch chunk of documents according to the ingestion request specifications. 
+ + Parameters + ---------- + batch_chunk : BatchChunk + A chunk containing a subset of documents from the original batch request, + including community ID, platform ID, text content, and metadata for each document. + + Notes + ----- + This activity processes a subset of documents from the larger batch, + allowing for parallel processing and better resource management. + """ + if batch_chunk.collectionName is None: + collection_name = batch_chunk.platformId + else: + collection_name = batch_chunk.collectionName + + # Initialize the ingestion pipeline + # the collection name will be reconstructed in `CustomIngestionPipeline` + # in the format of `[communityId]_[collection_name]` + pipeline = CustomIngestionPipeline( + community_id=batch_chunk.communityId, + collection_name=collection_name, + ) + + # Convert all documents in this chunk to Document objects + documents = [] + for doc in batch_chunk.document: + document = Document( + doc_id=doc.docId, + text=doc.text, + metadata=doc.metadata, + excluded_embed_metadata_keys=doc.excludedEmbedMetadataKeys, + excluded_llm_metadata_keys=doc.excludedLlmMetadataKeys, + ) + documents.append(document) + + # Process all documents in this chunk as a batch + pipeline.run_pipeline(docs=documents) diff --git a/hivemind_etl/simple_ingestion/schema.py b/hivemind_etl/simple_ingestion/schema.py index 9a9920e..4443f98 100644 --- a/hivemind_etl/simple_ingestion/schema.py +++ b/hivemind_etl/simple_ingestion/schema.py @@ -37,3 +37,27 @@ class IngestionRequest(BaseModel): excludedEmbedMetadataKeys: list[str] = [] excludedLlmMetadataKeys: list[str] = [] collectionName: str | None = None + +class BatchDocument(BaseModel): + """A model representing a document for batch ingestion. + + """ + docId: str + text: str + metadata: dict + excludedEmbedMetadataKeys: list[str] = [] + excludedLlmMetadataKeys: list[str] = [] + + +class BatchIngestionRequest(BaseModel): + """A model representing a batch of ingestion requests for document processing. 
+
+    Parameters
+    ----------
+    document : list[BatchDocument]
+        The documents to ingest for the given ``communityId``/``platformId``.
+    """
+    communityId: str
+    platformId: str
+    collectionName: str | None = None
+    document: list[BatchDocument]
diff --git a/registry.py b/registry.py
index b51e9dc..b958f80 100644
--- a/registry.py
+++ b/registry.py
@@ -9,6 +9,7 @@
     transform_mediawiki_data,
     load_mediawiki_data,
     process_document,
+    process_documents_batch,
 )
 from hivemind_summarizer.activities import (
     fetch_platform_summaries_by_date,
@@ -24,6 +25,7 @@
     LoadMediaWikiWorkflow,
     PlatformSummariesWorkflow,
     VectorIngestionWorkflow,
+    BatchVectorIngestionWorkflow,
     RealTimeSummaryWorkflow,
 )
 
@@ -36,6 +38,7 @@
     LoadMediaWikiWorkflow,
     PlatformSummariesWorkflow,
     VectorIngestionWorkflow,
+    BatchVectorIngestionWorkflow,
     RealTimeSummaryWorkflow,
 ]
 
@@ -52,5 +55,6 @@
     fetch_platform_summaries_by_date,
     fetch_platform_summaries_by_date_range,
     process_document,
+    process_documents_batch,
     fetch_and_summarize_realtime_data,
 ]
diff --git a/workflows.py b/workflows.py
index cbb9834..de3f0f2 100644
--- a/workflows.py
+++ b/workflows.py
@@ -15,6 +15,7 @@
 )
 from hivemind_etl.simple_ingestion.pipeline import (
     VectorIngestionWorkflow,
+    BatchVectorIngestionWorkflow,
 )
 from hivemind_summarizer.summarizer_workflow import PlatformSummariesWorkflow
 from hivemind_summarizer.real_time_summary_workflow import RealTimeSummaryWorkflow