diff --git a/hivemind_etl/mediawiki/etl.py b/hivemind_etl/mediawiki/etl.py index f42a6e6..dadc1db 100644 --- a/hivemind_etl/mediawiki/etl.py +++ b/hivemind_etl/mediawiki/etl.py @@ -1,6 +1,7 @@ import logging import os import shutil +from concurrent.futures import ThreadPoolExecutor, as_completed from llama_index.core import Document from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline @@ -100,7 +101,28 @@ def load(self, documents: list[Document]) -> None: ingestion_pipeline = CustomIngestionPipeline( self.community_id, collection_name=self.platform_id ) - ingestion_pipeline.run_pipeline(documents) + + # Process batches in parallel using ThreadPoolExecutor + batch_size = 1000 + batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)] + + with ThreadPoolExecutor() as executor: + # Submit all batch processing tasks + future_to_batch = { + executor.submit(ingestion_pipeline.run_pipeline, batch): i + for i, batch in enumerate(batches) + } + + # Process completed batches and handle any errors + for future in as_completed(future_to_batch): + batch_idx = future_to_batch[future] + try: + future.result() # This will raise any exceptions that occurred + logging.info(f"Successfully loaded batch {batch_idx} of {len(batches)} documents into Qdrant!") + except Exception as e: + logging.error(f"Error processing batch {batch_idx}: {e}") + raise # Re-raise the exception to stop the process + logging.info(f"Loaded {len(documents)} documents into Qdrant!") if self.delete_dump_after_load: