1 change: 1 addition & 0 deletions hivemind_etl/activities.py
@@ -15,6 +15,7 @@
)
from hivemind_etl.simple_ingestion.pipeline import (
    process_document,
    process_documents_batch,
)

from temporalio import activity
187 changes: 187 additions & 0 deletions hivemind_etl/simple_ingestion/README.md
@@ -0,0 +1,187 @@
# Simple Ingestion Workflows

This module provides Temporal workflows for ingesting documents into the vector database (Qdrant) using the TC Hivemind backend.

## Available Workflows

### 1. VectorIngestionWorkflow

A workflow for processing single-document ingestion requests.

**Usage:**
```python
from hivemind_etl.simple_ingestion.schema import IngestionRequest
from temporalio.client import Client

# Create single ingestion request
request = IngestionRequest(
    communityId="my_community",
    platformId="my_platform",
    text="Document content here...",
    metadata={
        "title": "Document Title",
        "author": "Author Name"
    }
)

# Execute workflow
client = await Client.connect("localhost:7233")
await client.execute_workflow(
    "VectorIngestionWorkflow",
    request,
    id="single-ingestion-123",
    task_queue="hivemind-etl"
)
```

### 2. BatchVectorIngestionWorkflow

A workflow for processing multiple document ingestion requests in parallel batches for improved efficiency.

**Key Features:**
- **Automatic Chunking**: Large batches are automatically split into smaller parallel chunks
- **Parallel Processing**: Multiple `process_documents_batch` activities run simultaneously
- **Fixed Chunk Size**: Each processing chunk holds up to 10 documents (hardcoded in the current workflow)
- **Same Collection**: All documents in a batch request must belong to the same community and collection
- **Error Handling**: The same retry policy as the single-document workflow, with a longer activity timeout for batch processing

**Usage:**
```python
from hivemind_etl.simple_ingestion.schema import BatchIngestionRequest, BatchDocument
from temporalio.client import Client

# Create batch ingestion request
batch_request = BatchIngestionRequest(
    communityId="my_community",
    platformId="my_platform",
    collectionName="optional_custom_collection",  # Optional
    document=[
        BatchDocument(
            docId="doc_1",
            text="First document content...",
            metadata={"title": "Document 1"},
            excludedEmbedMetadataKeys=["some_key"],
            excludedLlmMetadataKeys=["other_key"]
        ),
        BatchDocument(
            docId="doc_2",
            text="Second document content...",
            metadata={"title": "Document 2"}
        ),
        # ... more documents
    ]
)

# Execute batch workflow
client = await Client.connect("localhost:7233")
await client.execute_workflow(
    "BatchVectorIngestionWorkflow",
    batch_request,
    id="batch-ingestion-123",
    task_queue="hivemind-etl"
)
```

## Schema Reference

### IngestionRequest (Single Document)

```python
class IngestionRequest(BaseModel):
    communityId: str                           # Community identifier
    platformId: str                            # Platform identifier
    text: str                                  # Document text content
    metadata: dict                             # Document metadata
    docId: str = str(uuid4())                  # Unique document ID (auto-generated)
    excludedEmbedMetadataKeys: list[str] = []  # Keys to exclude from embedding
    excludedLlmMetadataKeys: list[str] = []    # Keys to exclude from LLM processing
    collectionName: str | None = None          # Optional custom collection name
```
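
Note that a plain `str(uuid4())` class-level default is evaluated once when the class is defined, so every request that omits `docId` would share the same ID. A minimal sketch of a per-request default using Pydantic's `default_factory` (an illustration of the alternative, not the current implementation):

```python
from uuid import uuid4
from pydantic import BaseModel, Field

class IngestionRequestSketch(BaseModel):
    # default_factory runs per instance, so each request gets a fresh ID
    docId: str = Field(default_factory=lambda: str(uuid4()))
```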

### BatchIngestionRequest (Multiple Documents)

```python
class BatchIngestionRequest(BaseModel):
    communityId: str                   # Community identifier
    platformId: str                    # Platform identifier
    collectionName: str | None = None  # Optional custom collection name
    document: list[BatchDocument]      # List of documents to process

class BatchDocument(BaseModel):
    docId: str                                 # Unique document ID
    text: str                                  # Document text content
    metadata: dict                             # Document metadata
    excludedEmbedMetadataKeys: list[str] = []  # Keys to exclude from embedding
    excludedLlmMetadataKeys: list[str] = []    # Keys to exclude from LLM processing
```

## Collection Naming

- **Default**: `{communityId}_{platformId}`
- **Custom**: `{communityId}_{collectionName}` (when `collectionName` is provided)

The collection name reconstruction is handled automatically by the `CustomIngestionPipeline`.
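
The naming rule can be summarized in a few lines (a hypothetical helper for illustration; the actual concatenation happens inside `CustomIngestionPipeline`):

```python
def final_collection_name(
    community_id: str, platform_id: str, collection_name: str | None = None
) -> str:
    # default: "{communityId}_{platformId}"; custom: "{communityId}_{collectionName}"
    suffix = collection_name if collection_name is not None else platform_id
    return f"{community_id}_{suffix}"
```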

## Performance Considerations

### When to Use Batch vs Single Workflows

**Use BatchVectorIngestionWorkflow when:**
- Processing multiple documents from the same community/collection
- Bulk importing large datasets
- You have 10+ documents to process together
- You want to maximize throughput with parallel processing

**Use VectorIngestionWorkflow when:**
- Processing single documents in real-time
- Documents arrive individually
- You need immediate processing
- Simple use cases with occasional documents

### Batch Processing Optimizations

The batch workflow automatically optimizes performance by:

1. **Parallel Chunking**: Large batches are split into smaller chunks that process simultaneously (see the sketch after this list)
2. **Fixed Chunk Size**: Each chunk holds up to 10 documents (hardcoded in the current workflow)
3. **Pipeline Reuse**: One `CustomIngestionPipeline` instance per chunk
4. **Bulk Operations**: All documents in a chunk are processed together
5. **Concurrent Execution**: Multiple chunks can run in parallel using asyncio.gather()
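
For example, a request with 25 documents and a chunk size of 10 produces three parallel activities handling 10, 10, and 5 documents. The chunking itself is plain slicing, as in this standalone sketch:

```python
from hivemind_etl.simple_ingestion.schema import BatchDocument

def chunk_documents(
    documents: list[BatchDocument], batch_size: int = 10
) -> list[list[BatchDocument]]:
    # one slice per parallel activity, mirroring the loop in pipeline.py
    return [
        documents[i : i + batch_size]
        for i in range(0, len(documents), batch_size)
    ]
```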

## Error Handling

Both workflows implement the same retry policy:
- **Initial retry interval**: 1 second
- **Maximum retry interval**: 1 minute
- **Maximum attempts**: 3
- **Timeout**: 5 minutes (single), 10 minutes (batch)
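
In code, this is the `RetryPolicy` configured in `pipeline.py`:

```python
from datetime import timedelta
from temporalio.common import RetryPolicy

retry_policy = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    maximum_interval=timedelta(minutes=1),
    maximum_attempts=3,
)
```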

## Testing

Use the provided test script to verify functionality:

```bash
python test_batch_workflow.py
```

The test script demonstrates:
- Batch processing with multiple documents
- Mixed collection handling
- Comparison between single and batch workflows

## Integration

Both workflows are automatically registered in the Temporal worker through `registry.py`. Ensure your worker includes:

```python
from registry import WORKFLOWS, ACTIVITIES
from temporalio.worker import Worker

# Worker setup includes both workflows and activities
worker = Worker(
    client=client,
    task_queue="hivemind-etl",
    workflows=WORKFLOWS,
    activities=ACTIVITIES
)
```
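
Once constructed, start the worker so it polls the task queue (assuming an async entry point):

```python
# blocks until the worker is shut down
await worker.run()
```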
119 changes: 118 additions & 1 deletion hivemind_etl/simple_ingestion/pipeline.py
@@ -1,15 +1,21 @@
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.workflow import execute_activity
from .schema import IngestionRequest
from .schema import IngestionRequest, BatchIngestionRequest, BatchDocument

with workflow.unsafe.imports_passed_through():
    from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
    from llama_index.core import Document


class BatchChunk(BatchIngestionRequest):
    """A smaller chunk of a BatchIngestionRequest for parallel processing."""


@workflow.defn
class VectorIngestionWorkflow:
"""A Temporal workflow for processing document ingestion requests.
@@ -50,6 +56,69 @@ async def run(self, ingestion_request: IngestionRequest) -> None:
)


@workflow.defn
class BatchVectorIngestionWorkflow:
    """A Temporal workflow for processing batch document ingestion requests.

    This workflow handles the orchestration of batch document processing activities,
    including retry logic and timeout configurations for multiple documents.
    """

    @workflow.run
    async def run(self, ingestion_requests: BatchIngestionRequest) -> None:
        """Execute the batch ingestion workflow.

        Parameters
        ----------
        ingestion_requests : BatchIngestionRequest
            The batch request containing all necessary information for document processing,
            including community ID, platform ID, text content, and metadata for each document.

        Notes
        -----
        The workflow splits documents into smaller batches and processes them in parallel.
        Each batch implements a retry policy with the following configuration:
        - Initial retry interval: 1 second
        - Maximum retry interval: 1 minute
        - Maximum retry attempts: 3
        - Activity timeout: 10 minutes
        """
        batch_size: int = 10
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=1),
            maximum_attempts=3,
        )

        # Split documents into smaller batches
        document_chunks = []
        for i in range(0, len(ingestion_requests.document), batch_size):
            chunk_documents = ingestion_requests.document[i : i + batch_size]

            # Create a BatchChunk for this subset of documents
            batch_chunk = BatchChunk(
                communityId=ingestion_requests.communityId,
                platformId=ingestion_requests.platformId,
                collectionName=ingestion_requests.collectionName,
                document=chunk_documents,
            )
            document_chunks.append(batch_chunk)

        # Process all chunks in parallel
        batch_activities = []
        for chunk in document_chunks:
            activity_task = workflow.execute_activity(
                process_documents_batch,
                chunk,
                retry_policy=retry_policy,
                start_to_close_timeout=timedelta(minutes=10),
            )
            batch_activities.append(activity_task)

        # Wait for all batches to complete
        await asyncio.gather(*batch_activities)


@activity.defn
async def process_document(
    ingestion_request: IngestionRequest,
@@ -84,6 +153,54 @@ async def process_document(
        doc_id=ingestion_request.docId,
        text=ingestion_request.text,
        metadata=ingestion_request.metadata,
        excluded_embed_metadata_keys=ingestion_request.excludedEmbedMetadataKeys,
        excluded_llm_metadata_keys=ingestion_request.excludedLlmMetadataKeys,
    )

    pipeline.run_pipeline(docs=[document])


@activity.defn
async def process_documents_batch(
    batch_chunk: BatchChunk,
) -> None:
    """Process a batch chunk of documents according to the ingestion request specifications.

    Parameters
    ----------
    batch_chunk : BatchChunk
        A chunk containing a subset of documents from the original batch request,
        including community ID, platform ID, text content, and metadata for each document.

    Notes
    -----
    This activity processes a subset of documents from the larger batch,
    allowing for parallel processing and better resource management.
    """
    if batch_chunk.collectionName is None:
        collection_name = batch_chunk.platformId
    else:
        collection_name = batch_chunk.collectionName

    # Initialize the ingestion pipeline
    # the collection name will be reconstructed in `CustomIngestionPipeline`
    # in the format of `[communityId]_[collection_name]`
    pipeline = CustomIngestionPipeline(
        community_id=batch_chunk.communityId,
        collection_name=collection_name,
    )

    # Convert all documents in this chunk to Document objects
    documents = []
    for doc in batch_chunk.document:
        document = Document(
            doc_id=doc.docId,
            text=doc.text,
            metadata=doc.metadata,
            excluded_embed_metadata_keys=doc.excludedEmbedMetadataKeys,
            excluded_llm_metadata_keys=doc.excludedLlmMetadataKeys,
        )
        documents.append(document)

    # Process all documents in this chunk as a batch
    pipeline.run_pipeline(docs=documents)
24 changes: 24 additions & 0 deletions hivemind_etl/simple_ingestion/schema.py
@@ -37,3 +37,27 @@ class IngestionRequest(BaseModel):
    excludedEmbedMetadataKeys: list[str] = []
    excludedLlmMetadataKeys: list[str] = []
    collectionName: str | None = None

class BatchDocument(BaseModel):
    """A model representing a document for batch ingestion."""

    docId: str
    text: str
    metadata: dict
    excludedEmbedMetadataKeys: list[str] = []
    excludedLlmMetadataKeys: list[str] = []


class BatchIngestionRequest(BaseModel):
    """A model representing a batch of ingestion requests for document processing.

    Parameters
    ----------
    communityId : str
        Community identifier.
    platformId : str
        Platform identifier.
    collectionName : str | None
        Optional custom collection name.
    document : list[BatchDocument]
        The documents to process.
    """

    communityId: str
    platformId: str
    collectionName: str | None = None
    document: list[BatchDocument]