From 0948b55b3a849be9cc2a9b2bd95105e5816ae2a3 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 4 Jun 2025 08:21:41 +0330
Subject: [PATCH 1/2] feat: Implement batch processing for document ingestion in MediawikiETL

---
 hivemind_etl/mediawiki/etl.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/hivemind_etl/mediawiki/etl.py b/hivemind_etl/mediawiki/etl.py
index f42a6e6..3d8a978 100644
--- a/hivemind_etl/mediawiki/etl.py
+++ b/hivemind_etl/mediawiki/etl.py
@@ -100,7 +100,11 @@ def load(self, documents: list[Document]) -> None:
         ingestion_pipeline = CustomIngestionPipeline(
             self.community_id, collection_name=self.platform_id
         )
-        ingestion_pipeline.run_pipeline(documents)
+        # batch documents into chunks of 1000
+        for i in range(0, len(documents), 1000):
+            logging.info(f"Loading batch {i} of {len(documents)} documents into Qdrant!")
+            ingestion_pipeline.run_pipeline(documents[i : i + 1000])
+
         logging.info(f"Loaded {len(documents)} documents into Qdrant!")
 
         if self.delete_dump_after_load:

From d3778015aaac3bf061ccce841637369836acc303 Mon Sep 17 00:00:00 2001
From: Mohammad Amin
Date: Wed, 4 Jun 2025 08:26:50 +0330
Subject: [PATCH 2/2] feat: Parallel loading for each MediaWiki document batch!

---
 hivemind_etl/mediawiki/etl.py | 26 ++++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/hivemind_etl/mediawiki/etl.py b/hivemind_etl/mediawiki/etl.py
index 3d8a978..dadc1db 100644
--- a/hivemind_etl/mediawiki/etl.py
+++ b/hivemind_etl/mediawiki/etl.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import shutil
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 from llama_index.core import Document
 from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
@@ -100,10 +101,27 @@ def load(self, documents: list[Document]) -> None:
         ingestion_pipeline = CustomIngestionPipeline(
             self.community_id, collection_name=self.platform_id
         )
-        # batch documents into chunks of 1000
-        for i in range(0, len(documents), 1000):
-            logging.info(f"Loading batch {i} of {len(documents)} documents into Qdrant!")
-            ingestion_pipeline.run_pipeline(documents[i : i + 1000])
+
+        # Process batches in parallel using ThreadPoolExecutor
+        batch_size = 1000
+        batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]
+
+        with ThreadPoolExecutor() as executor:
+            # Submit all batch processing tasks
+            future_to_batch = {
+                executor.submit(ingestion_pipeline.run_pipeline, batch): i
+                for i, batch in enumerate(batches)
+            }
+
+            # Process completed batches and handle any errors
+            for future in as_completed(future_to_batch):
+                batch_idx = future_to_batch[future]
+                try:
+                    future.result()  # This will raise any exceptions that occurred
+                    logging.info(f"Successfully loaded batch {batch_idx + 1} of {len(batches)} into Qdrant!")
+                except Exception as e:
+                    logging.error(f"Error processing batch {batch_idx}: {e}")
+                    raise  # Re-raise the exception to stop the process
 
         logging.info(f"Loaded {len(documents)} documents into Qdrant!")
 
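
For anyone who wants to exercise the end state of load() without a Qdrant instance, below is a minimal standalone sketch of the behavior this series arrives at after PATCH 2/2. It is an illustration, not the project's code: FakePipeline is a hypothetical stand-in for CustomIngestionPipeline (the real class lives in tc_hivemind_backend.ingest_qdrant), and batch_size is a parameter here purely for demonstration.

    import logging
    from concurrent.futures import ThreadPoolExecutor, as_completed

    logging.basicConfig(level=logging.INFO)


    class FakePipeline:
        """Hypothetical stand-in: counts documents instead of writing to Qdrant."""

        def run_pipeline(self, docs: list[str]) -> int:
            return len(docs)


    def load(documents: list[str], batch_size: int = 1000) -> None:
        pipeline = FakePipeline()
        # Same batching as the patch: slice the document list into fixed-size chunks
        batches = [
            documents[i : i + batch_size] for i in range(0, len(documents), batch_size)
        ]

        with ThreadPoolExecutor() as executor:
            # Map each future back to its batch index so failures are attributable
            future_to_batch = {
                executor.submit(pipeline.run_pipeline, batch): i
                for i, batch in enumerate(batches)
            }
            for future in as_completed(future_to_batch):
                batch_idx = future_to_batch[future]
                try:
                    future.result()  # re-raises any exception from the worker thread
                    logging.info(f"Loaded batch {batch_idx + 1} of {len(batches)}")
                except Exception as e:
                    logging.error(f"Error processing batch {batch_idx}: {e}")
                    raise  # fail fast, as in the patch


    if __name__ == "__main__":
        load([f"doc-{n}" for n in range(2500)])  # 2500 docs -> 3 batches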
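
Two properties of this pattern are worth flagging in review. ThreadPoolExecutor() with no max_workers argument defaults to min(32, os.cpu_count() + 4) threads on Python 3.8+, so up to that many batches can hit Qdrant at once, and the patch implicitly assumes a single shared CustomIngestionPipeline instance is safe to call from multiple threads. Also, the raise after the first failed future stops the iteration, but batches already submitted keep running: leaving the with block calls shutdown(wait=True), so the process aborts only after in-flight work drains.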