From 15d11f40fd3b78b06782bb0d794a0a74a9000595 Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Thu, 23 May 2024 18:16:20 +0530 Subject: [PATCH 01/13] Exception handling for Prompt Service --- src/unstract/sdk/__init__.py | 2 +- src/unstract/sdk/prompt.py | 46 ++++++++++++++++++++++++++---------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/unstract/sdk/__init__.py b/src/unstract/sdk/__init__.py index a0ae66de..610a2805 100644 --- a/src/unstract/sdk/__init__.py +++ b/src/unstract/sdk/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.27.0" +__version__ = "0.28.0" def get_sdk_version(): diff --git a/src/unstract/sdk/prompt.py b/src/unstract/sdk/prompt.py index a03d8915..1cf3e04b 100644 --- a/src/unstract/sdk/prompt.py +++ b/src/unstract/sdk/prompt.py @@ -1,7 +1,14 @@ from typing import Any, Optional import requests -from requests import RequestException, Response +from requests import ( + ConnectionError, + HTTPError, + RequestException, + Response, + Timeout, + TooManyRedirects, +) from unstract.sdk.constants import LogLevel, PromptStudioKeys, ToolEnv from unstract.sdk.helper import SdkHelper @@ -25,9 +32,7 @@ def __init__( """ self.tool = tool - self.base_url = SdkHelper.get_platform_base_url( - prompt_host, prompt_port - ) + self.base_url = SdkHelper.get_platform_base_url(prompt_host, prompt_port) self.bearer_token = tool.get_env_or_die(ToolEnv.PLATFORM_API_KEY) def answer_prompt(self, payload: dict[str, Any]) -> dict[str, Any]: @@ -36,9 +41,7 @@ def answer_prompt(self, payload: dict[str, Any]) -> dict[str, Any]: def single_pass_extraction(self, payload: dict[str, Any]) -> dict[str, Any]: return self._post_call("single-pass-extraction", payload) - def _post_call( - self, url_path: str, payload: dict[str, Any] - ) -> dict[str, Any]: + def _post_call(self, url_path: str, payload: dict[str, Any]) -> dict[str, Any]: """Invokes and communicates to prompt service to fetch response for the prompt. @@ -63,17 +66,26 @@ def _post_call( "structure_output": "", } url: str = f"{self.base_url}/{url_path}" - headers: dict[str, str] = { - "Authorization": f"Bearer {self.bearer_token}" - } + headers: dict[str, str] = {"Authorization": f"Bearer {self.bearer_token}"} + response: Response = Response() try: # TODO: Review timeout value - response: Response = requests.post( - url, json=payload, headers=headers, timeout=600 - ) + response = requests.post(url, json=payload, headers=headers, timeout=600) response.raise_for_status() result["status"] = "OK" result["structure_output"] = response.text + except ConnectionError as connect_err: + msg = "Unable to connect to prompt service. Please contact admin." + result["error"] = self._stringify_and_stream_err(connect_err, msg) + except Timeout as time_out: + msg = "Request to run prompt has timed out" + result["error"] = self._stringify_and_stream_err(time_out, msg) + except TooManyRedirects as too_many_redirects: + msg = "Too many redirects while connecting to prompt service." + result["error"] = self._stringify_and_stream_err(too_many_redirects, msg) + except HTTPError as http_err: + msg = "Error while fetching prompt response." 
+ result["error"] = self._stringify_and_stream_err(http_err, msg) except RequestException as e: # Extract error information from the response if available error_message = str(e) @@ -91,6 +103,14 @@ def _post_call( ) return result + def _stringify_and_stream_err(self, err: RequestException, msg: str) -> str: + error_message = str(err) + self.tool.stream_log( + f"{msg}: {error_message}", + level=LogLevel.ERROR, + ) + return error_message + @staticmethod def get_exported_tool( tool: BaseTool, prompt_registry_id: str From 1d43206aebbbdc2e1848ec66bd8c9b6d3f02990b Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Thu, 6 Mar 2025 21:30:28 +0530 Subject: [PATCH 02/13] refactor: Indexing API segregation --- src/unstract/sdk/dto.py | 39 ++++++ src/unstract/sdk/extract.py | 20 +++ src/unstract/sdk/index_v2.py | 236 +++++++++++++++++++++++++++++++++++ 3 files changed, 295 insertions(+) create mode 100644 src/unstract/sdk/dto.py create mode 100644 src/unstract/sdk/extract.py create mode 100644 src/unstract/sdk/index_v2.py diff --git a/src/unstract/sdk/dto.py b/src/unstract/sdk/dto.py new file mode 100644 index 00000000..ce973f08 --- /dev/null +++ b/src/unstract/sdk/dto.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass, field +from typing import Any, Optional + + +@dataclass +class InstanceIdentifiers: + embedding_instance_id: str + vector_db_instance_id: str + x2text_instance_id: str + llm_instance_id: str + tool_id: str + tags: Optional[list[str]] = None + + +@dataclass +class FileInfo: + file_path: str + output_file_path: str + file_hash: str + + +@dataclass +class ChunkingConfig: + chunk_size: int + chunk_overlap: int + + def __post_init__(self) -> None: + if self.chunk_size == 0: + raise ValueError( + "Indexing cannot be done for zero chunks." + "Please provide a valid chunk_size." 
+ ) + + +@dataclass +class ProcessingOptions: + reindex: bool = False + enable_highlight: bool = False + usage_kwargs: dict[Any, Any] = field(default_factory=dict) diff --git a/src/unstract/sdk/extract.py b/src/unstract/sdk/extract.py new file mode 100644 index 00000000..ccd564c7 --- /dev/null +++ b/src/unstract/sdk/extract.py @@ -0,0 +1,20 @@ +from typing import Optional + +from unstract.sdk.tool.stream import StreamMixin + + +class Extract: + def __init__( + self, + tool: StreamMixin, + run_id: Optional[str] = None, + capture_metrics: bool = False, + ): + # TODO: Inherit from StreamMixin and avoid using BaseTool + self.tool = tool + self._run_id = run_id + self._capture_metrics = capture_metrics + self._metrics = {} + + def extract(self): + pass diff --git a/src/unstract/sdk/index_v2.py b/src/unstract/sdk/index_v2.py new file mode 100644 index 00000000..527fd11d --- /dev/null +++ b/src/unstract/sdk/index_v2.py @@ -0,0 +1,236 @@ +import json +from typing import Optional + +from llama_index.core import Document +from llama_index.core.vector_stores import ( + FilterOperator, + MetadataFilter, + MetadataFilters, + VectorStoreQuery, + VectorStoreQueryResult, +) + +from unstract.sdk.adapter import ToolAdapter +from unstract.sdk.adapters.vectordb.no_op.src.no_op_custom_vectordb import ( + NoOpCustomVectorDB, +) +from unstract.sdk.constants import LogLevel +from unstract.sdk.dto import ( + ChunkingConfig, + FileInfo, + InstanceIdentifiers, + ProcessingOptions, +) +from unstract.sdk.embedding import Embedding +from unstract.sdk.exceptions import IndexingError, SdkError +from unstract.sdk.file_storage.impl import FileStorage +from unstract.sdk.file_storage.provider import FileStorageProvider +from unstract.sdk.tool.stream import StreamMixin +from unstract.sdk.utils.common_utils import capture_metrics, log_elapsed +from unstract.sdk.utils.tool_utils import ToolUtils +from unstract.sdk.vector_db import VectorDB + + +class Index: + def __init__( + self, + tool: StreamMixin, + run_id: Optional[str] = None, + capture_metrics: bool = False, + ): + # TODO: Inherit from StreamMixin and avoid using BaseTool + self.tool = tool + self._run_id = run_id + self._capture_metrics = capture_metrics + self._metrics = {} + + def generate_index_key( + self, + chuking_config: ChunkingConfig, + file_info: FileInfo, + instance_identifiers: InstanceIdentifiers, + fs: FileStorage = FileStorage(provider=FileStorageProvider.LOCAL), + ) -> str: + """Generates a unique index key based on the provided configuration, + file information, instance identifiers, and processing options. + + Args: + chunking_config : ChunkingConfig + file_info (FileInfo): Contains file-related + information such as path and hash. + instance_identifiers (InstanceIdentifiers): Identifiers for + embedding, vector DB, tool, etc. + processing_options (ProcessingOptions): Options related to reindexing, + highlighting, and processing text. + fs (FileStorage, optional): File storage for remote storage. + + Returns: + str: A unique index key used for indexing the document. + """ + if not file_info.file_path and not file_info.file_hash: + raise ValueError("One of `file_path` or `file_hash` need to be provided") + + if not file_info.file_hash: + file_hash = fs.get_hash_from_file(path=file_info.file_path) + + # Whole adapter config is used currently even though it contains some keys + # which might not be relevant to indexing. This is easier for now than + # marking certain keys of the adapter config as necessary. 
+ index_key = { + "file_hash": file_hash, + "vector_db_config": ToolAdapter.get_adapter_config( + self.tool, instance_identifiers.vector_db_instance_id + ), + "embedding_config": ToolAdapter.get_adapter_config( + self.tool, instance_identifiers.embedding_instance_id + ), + "x2text_config": ToolAdapter.get_adapter_config( + self.tool, instance_identifiers.x2text_instance_id + ), + # Typed and hashed as strings since the final hash is persisted + # and this is required to be backward compatible + "chunk_size": str(chuking_config.chunk_size), + "chunk_overlap": str(chuking_config.chunk_overlap), + } + # JSON keys are sorted to ensure that the same key gets hashed even in + # case where the fields are reordered. + hashed_index_key = ToolUtils.hash_str(json.dumps(index_key, sort_keys=True)) + return hashed_index_key + + @capture_metrics + def is_document_indexed( + self, + doc_id: str, + instance_identifiers: InstanceIdentifiers, + processing_options: ProcessingOptions, + embedding: Embedding, + vector_db: VectorDB, + ) -> bool: + """Checks if nodes are already present in the vector database for a + given doc_id. + + Returns: + str: The document ID. + """ + try: + # Checking if document is already indexed against doc_id + doc_id_eq_filter = MetadataFilter.from_dict( + {"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id} + ) + filters = MetadataFilters(filters=[doc_id_eq_filter]) + q = VectorStoreQuery( + query_embedding=embedding.get_query_embedding(" "), + doc_ids=[doc_id], + filters=filters, + ) + + doc_id_found = False + try: + n: VectorStoreQueryResult = vector_db.query(query=q) + if len(n.nodes) > 0: + doc_id_found = True + self.tool.stream_log(f"Found {len(n.nodes)} nodes for {doc_id}") + else: + self.tool.stream_log(f"No nodes found for {doc_id}") + except Exception as e: + self.tool.stream_log( + f"Error querying {instance_identifiers.vector_db_instance_id}: {e}," + " proceeding to index", + level=LogLevel.ERROR, + ) + + if doc_id_found and not processing_options.reindex: + self.tool.stream_log(f"File was indexed already under {doc_id}") + return doc_id_found + + except Exception as e: + self.tool.stream_log( + f"Unexpected error during indexing check: {e}", level=LogLevel.ERROR + ) + + return doc_id_found + + @log_elapsed(operation="INDEX") + @capture_metrics + def perform_indexing( + self, + instance_identifiers: InstanceIdentifiers, + chunking_config: ChunkingConfig, + vector_db: VectorDB, + doc_id: str, + extracted_text: str, + doc_id_found: bool, + ): + self._is_no_op_adapter(instance_identifiers, vector_db, doc_id) + + self.tool.stream_log("Indexing file...") + full_text = [ + { + "section": "full", + "text_contents": extracted_text, + } + ] + # Convert raw text to llama index usage Document + documents = self._prepare_documents(doc_id, full_text) + self._delete_existing_nodes_on_reindex(vector_db, doc_id, doc_id_found) + self._trigger_indexing(chunking_config, vector_db, documents) + return doc_id + + def _trigger_indexing(self, chunking_config, vector_db, documents): + self.tool.stream_log("Adding nodes to vector db...") + try: + vector_db.index_document( + documents, + chunk_size=chunking_config.chunk_overlap, + chunk_overlap=chunking_config.chunk_overlap, + show_progress=True, + ) + self.tool.stream_log("File has been indexed successfully") + except Exception as e: + self.tool.stream_log( + f"Error adding nodes to vector db: {e}", + level=LogLevel.ERROR, + ) + raise IndexingError(str(e)) from e + + def _delete_existing_nodes_on_reindex(self, vector_db, doc_id, 
doc_id_found): + if doc_id_found: + # Delete the nodes for the doc_id + try: + vector_db.delete(ref_doc_id=doc_id) + self.tool.stream_log(f"Deleted nodes for {doc_id}") + except Exception as e: + self.tool.stream_log( + f"Error deleting nodes for {doc_id}: {e}", + level=LogLevel.ERROR, + ) + raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e + + def _prepare_documents(self, doc_id, full_text) -> list: + documents = [] + for item in full_text: + text = item["text_contents"] + document = Document( + text=text, + doc_id=doc_id, + metadata={"section": item["section"]}, + ) + document.id_ = doc_id + documents.append(document) + self.tool.stream_log(f"Number of documents: {len(documents)}") + return documents + + def _is_no_op_adapter( + self, + instance_identifiers: InstanceIdentifiers, + vector_db: VectorDB, + doc_id: str, + ): + if isinstance( + vector_db.get_vector_db( + adapter_instance_id=instance_identifiers.vector_db_instance_id, + embedding_dimension=1, + ), + (NoOpCustomVectorDB), + ): + return doc_id From 071dcb3c3918b4f7fc4ec36b7802000dd6cf3ba4 Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Mon, 10 Mar 2025 14:48:52 +0530 Subject: [PATCH 03/13] refactor: Indexing API segregation --- src/unstract/sdk/dto.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/unstract/sdk/dto.py b/src/unstract/sdk/dto.py index ce973f08..3cf5cbaa 100644 --- a/src/unstract/sdk/dto.py +++ b/src/unstract/sdk/dto.py @@ -15,7 +15,6 @@ class InstanceIdentifiers: @dataclass class FileInfo: file_path: str - output_file_path: str file_hash: str From 92ad0bddc59e5882888e425306658002fe8dee3a Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Mon, 10 Mar 2025 15:20:59 +0530 Subject: [PATCH 04/13] refactor: Indexing API segregation --- src/unstract/sdk/index_v2.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/unstract/sdk/index_v2.py b/src/unstract/sdk/index_v2.py index 527fd11d..0f46683b 100644 --- a/src/unstract/sdk/index_v2.py +++ b/src/unstract/sdk/index_v2.py @@ -56,7 +56,7 @@ def generate_index_key( Args: chunking_config : ChunkingConfig - file_info (FileInfo): Contains file-related + file_info (FileInfo): Contains file-related information such as path and hash. instance_identifiers (InstanceIdentifiers): Identifiers for embedding, vector DB, tool, etc. 
@@ -208,17 +208,25 @@ def _delete_existing_nodes_on_reindex(self, vector_db, doc_id, doc_id_found): def _prepare_documents(self, doc_id, full_text) -> list: documents = [] - for item in full_text: - text = item["text_contents"] - document = Document( - text=text, - doc_id=doc_id, - metadata={"section": item["section"]}, + try : + for item in full_text: + text = item["text_contents"] + document = Document( + text=text, + doc_id=doc_id, + metadata={"section": item["section"]}, + ) + document.id_ = doc_id + documents.append(document) + self.tool.stream_log(f"Number of documents: {len(documents)}") + return documents + except Exception as e: + self.tool.stream_log( + f"Error deleting nodes for {doc_id}: {e}", + level=LogLevel.ERROR, ) - document.id_ = doc_id - documents.append(document) - self.tool.stream_log(f"Number of documents: {len(documents)}") - return documents + raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e + def _is_no_op_adapter( self, From 025b0cebafc1ea2ec3964de411ffde7c533fdcca Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Wed, 12 Mar 2025 19:08:13 +0530 Subject: [PATCH 05/13] Retrievers - Subquestion & Simple --- src/unstract/sdk/exceptions.py | 9 +++ src/unstract/sdk/prompt.py | 26 ++++++++ src/unstract/sdk/retrieval/base_retriever.py | 13 ++++ src/unstract/sdk/retrieval/simple.py | 53 ++++++++++++++++ src/unstract/sdk/retrieval/subquestion.py | 66 ++++++++++++++++++++ 5 files changed, 167 insertions(+) create mode 100644 src/unstract/sdk/retrieval/base_retriever.py create mode 100644 src/unstract/sdk/retrieval/simple.py create mode 100644 src/unstract/sdk/retrieval/subquestion.py diff --git a/src/unstract/sdk/exceptions.py b/src/unstract/sdk/exceptions.py index b5dd3753..dddff56c 100644 --- a/src/unstract/sdk/exceptions.py +++ b/src/unstract/sdk/exceptions.py @@ -97,3 +97,12 @@ class FileOperationError(SdkError): "Please check specific storage error for " "further information" ) + + +class RetrievalError(SdkError): + """Custom exception raised for errors during retrieval from VectorDB.""" + + DEFAULT_MESSAGE = ( + "Error while retrieving data from the VectorDB. " + "Please review the query parameters and ensure the database is accessible." 
+ ) diff --git a/src/unstract/sdk/prompt.py b/src/unstract/sdk/prompt.py index 0d908e6c..29f345eb 100644 --- a/src/unstract/sdk/prompt.py +++ b/src/unstract/sdk/prompt.py @@ -46,6 +46,32 @@ def answer_prompt( payload=payload, params=params, ) + + @log_elapsed(operation="INDEX") + def index( + self, payload: dict[str, Any], params: Optional[dict[str, str]] = None + ) -> dict[str, Any]: + url_path = "index" + if self.is_public_call: + url_path = "index-public" + return self._post_call( + url_path=url_path, + payload=payload, + params=params, + ) + + @log_elapsed(operation="EXTRACT") + def extract( + self, payload: dict[str, Any], params: Optional[dict[str, str]] = None + ) -> dict[str, Any]: + url_path = "extract" + if self.is_public_call: + url_path = "extract-public" + return self._post_call( + url_path=url_path, + payload=payload, + params=params, + ) def single_pass_extraction( self, payload: dict[str, Any], params: Optional[dict[str, str]] = None diff --git a/src/unstract/sdk/retrieval/base_retriever.py b/src/unstract/sdk/retrieval/base_retriever.py new file mode 100644 index 00000000..a53bfbf1 --- /dev/null +++ b/src/unstract/sdk/retrieval/base_retriever.py @@ -0,0 +1,13 @@ +from unstract.sdk.vector_db import VectorDB + + +class BaseRetriever: + def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): + self.vector_db = vector_db + self.prompt = prompt + self.doc_id = doc_id + self.top_k = top_k + + @staticmethod + def retrieve() -> set[str]: + return set() diff --git a/src/unstract/sdk/retrieval/simple.py b/src/unstract/sdk/retrieval/simple.py new file mode 100644 index 00000000..626d4593 --- /dev/null +++ b/src/unstract/sdk/retrieval/simple.py @@ -0,0 +1,53 @@ +import logging +import time + +from llama_index.core import VectorStoreIndex +from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters + +from unstract.sdk.retrieval.base_retriever import BaseRetriever +from unstract.sdk.vector_db import VectorDB + +logger = logging.getLogger(__name__) + + +class SimpleRetriever(BaseRetriever): + def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): + self.vector_db = vector_db + self.prompt = prompt + self.doc_id = doc_id + self.top_k = top_k + + def retrieve(self) -> set[str]: + vector_query_engine: VectorStoreIndex = self.vector_db.get_vector_store_index() + retriever = vector_query_engine.as_retriever( + similarity_top_k=self.top_k, + filters=MetadataFilters( + filters=[ + ExactMatchFilter(key="doc_id", value=self.doc_id), + ], + ), + ) + nodes = retriever.retrieve(self.prompt) + context: set[str] = set() + for node in nodes: + # ToDo: May have to fine-tune this value for node score or keep it + # configurable at the adapter level + if node.score > 0: + context.add(node.get_content()) + else: + logger.info( + "Node score is less than 0. " + f"Ignored: {node.node_id} with score {node.score}" + ) + + if not context: + # UN-1288 For Pinecone, we are seeing an inconsistent case where + # query with doc_id fails even though indexing just happened. + # This causes the following retrieve to return no text. + # To rule out any lag on the Pinecone vector DB write, + # the following sleep is added + # Note: This will not fix the issue. Since this issue is inconsistent + # and not reproducible easily, this is just a safety net. 
+ time.sleep(2) + context = self.retrieve(self.prompt, self.doc_id, self.top_k) + return context diff --git a/src/unstract/sdk/retrieval/subquestion.py b/src/unstract/sdk/retrieval/subquestion.py new file mode 100644 index 00000000..e50291d1 --- /dev/null +++ b/src/unstract/sdk/retrieval/subquestion.py @@ -0,0 +1,66 @@ +import logging + +from llama_index.core.query_engine import SubQuestionQueryEngine +from llama_index.core.tools import QueryEngineTool, ToolMetadata + +from unstract.sdk.exceptions import RetrievalError +from unstract.sdk.retrieval.base_retriever import BaseRetriever +from unstract.sdk.vector_db import VectorDB + +logger = logging.getLogger(__name__) + + +class SubquestionRetrieval(BaseRetriever): + """SubquestionRetrieval class for querying VectorDB using LlamaIndex's + SubQuestionQueryEngine.""" + + def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): + """Initialize the SubquestionRetrieval class. + + Args: + vector_db (VectorDB): The vector database instance. + prompt (str): The query prompt. + doc_id (str): Document identifier for query context. + top_k (int): Number of top results to retrieve. + """ + self.vector_db = vector_db + self.prompt = prompt + self.doc_id = doc_id + self.top_k = top_k + + def retrieve(self) -> set[str]: + """Retrieve text chunks from the VectorDB based on the provided prompt. + + Returns: + set[str]: A set of text chunks retrieved from the database. + """ + try: + vector_query_engine = ( + self.vector_db.get_vector_store_index().as_query_engine() + ) + + query_engine_tools = [ + QueryEngineTool( + query_engine=vector_query_engine, + metadata=ToolMetadata(name=self.doc_id), + ), + ] + + query_engine = SubQuestionQueryEngine.from_defaults( + query_engine_tools=query_engine_tools, + use_async=True, + ) + + response = query_engine.query( + prompt=self.prompt, + top_k=self.top_k, + return_full_response=True, + ) + + chunks: set[str] = {node.text for node in response.source_nodes} + logger.info(f"Successfully retrieved {len(chunks)} chunks.") + return chunks + + except Exception as e: + logger.error(f"Error during retrieving chunks {self.doc_id}: {e}") + raise RetrievalError(str(e)) from e From 1a0dbb4f7af81630c009f591a23db3d6cbef14f3 Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Mon, 17 Mar 2025 14:43:11 +0530 Subject: [PATCH 06/13] Addressing review comments --- src/unstract/sdk/exceptions.py | 2 +- src/unstract/sdk/extract.py | 20 ----------- src/unstract/sdk/index_v2.py | 36 ++++++++++---------- src/unstract/sdk/retrieval/base_retriever.py | 8 +++++ src/unstract/sdk/retrieval/simple.py | 6 ---- src/unstract/sdk/retrieval/subquestion.py | 14 -------- 6 files changed, 27 insertions(+), 59 deletions(-) delete mode 100644 src/unstract/sdk/extract.py diff --git a/src/unstract/sdk/exceptions.py b/src/unstract/sdk/exceptions.py index dddff56c..8803000c 100644 --- a/src/unstract/sdk/exceptions.py +++ b/src/unstract/sdk/exceptions.py @@ -104,5 +104,5 @@ class RetrievalError(SdkError): DEFAULT_MESSAGE = ( "Error while retrieving data from the VectorDB. " - "Please review the query parameters and ensure the database is accessible." + "Please contact the admin for further assistance." 
) diff --git a/src/unstract/sdk/extract.py b/src/unstract/sdk/extract.py deleted file mode 100644 index ccd564c7..00000000 --- a/src/unstract/sdk/extract.py +++ /dev/null @@ -1,20 +0,0 @@ -from typing import Optional - -from unstract.sdk.tool.stream import StreamMixin - - -class Extract: - def __init__( - self, - tool: StreamMixin, - run_id: Optional[str] = None, - capture_metrics: bool = False, - ): - # TODO: Inherit from StreamMixin and avoid using BaseTool - self.tool = tool - self._run_id = run_id - self._capture_metrics = capture_metrics - self._metrics = {} - - def extract(self): - pass diff --git a/src/unstract/sdk/index_v2.py b/src/unstract/sdk/index_v2.py index 0f46683b..2f30ca0b 100644 --- a/src/unstract/sdk/index_v2.py +++ b/src/unstract/sdk/index_v2.py @@ -172,7 +172,8 @@ def perform_indexing( ] # Convert raw text to llama index usage Document documents = self._prepare_documents(doc_id, full_text) - self._delete_existing_nodes_on_reindex(vector_db, doc_id, doc_id_found) + if doc_id_found: + self.delete_nodes(vector_db, doc_id) self._trigger_indexing(chunking_config, vector_db, documents) return doc_id @@ -193,22 +194,20 @@ def _trigger_indexing(self, chunking_config, vector_db, documents): ) raise IndexingError(str(e)) from e - def _delete_existing_nodes_on_reindex(self, vector_db, doc_id, doc_id_found): - if doc_id_found: - # Delete the nodes for the doc_id - try: - vector_db.delete(ref_doc_id=doc_id) - self.tool.stream_log(f"Deleted nodes for {doc_id}") - except Exception as e: - self.tool.stream_log( - f"Error deleting nodes for {doc_id}: {e}", - level=LogLevel.ERROR, - ) - raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e + def delete_nodes(self, vector_db: VectorDB, doc_id: str): + try: + vector_db.delete(ref_doc_id=doc_id) + self.tool.stream_log(f"Deleted nodes for {doc_id}") + except Exception as e: + self.tool.stream_log( + f"Error deleting nodes for {doc_id}: {e}", + level=LogLevel.ERROR, + ) + raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e - def _prepare_documents(self, doc_id, full_text) -> list: + def _prepare_documents(self, doc_id: str, full_text: str) -> list: documents = [] - try : + try: for item in full_text: text = item["text_contents"] document = Document( @@ -222,11 +221,12 @@ def _prepare_documents(self, doc_id, full_text) -> list: return documents except Exception as e: self.tool.stream_log( - f"Error deleting nodes for {doc_id}: {e}", + f"Error while processing documents {doc_id}: {e}", level=LogLevel.ERROR, ) - raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e - + raise SdkError( + f"Error while processing documents for indexing {doc_id}: {e}" + ) from e def _is_no_op_adapter( self, diff --git a/src/unstract/sdk/retrieval/base_retriever.py b/src/unstract/sdk/retrieval/base_retriever.py index a53bfbf1..5dc26bd6 100644 --- a/src/unstract/sdk/retrieval/base_retriever.py +++ b/src/unstract/sdk/retrieval/base_retriever.py @@ -3,6 +3,14 @@ class BaseRetriever: def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): + """Initialize the Retrieval class. + + Args: + vector_db (VectorDB): The vector database instance. + prompt (str): The query prompt. + doc_id (str): Document identifier for query context. + top_k (int): Number of top results to retrieve. 
+ """ self.vector_db = vector_db self.prompt = prompt self.doc_id = doc_id diff --git a/src/unstract/sdk/retrieval/simple.py b/src/unstract/sdk/retrieval/simple.py index 626d4593..87ac7b14 100644 --- a/src/unstract/sdk/retrieval/simple.py +++ b/src/unstract/sdk/retrieval/simple.py @@ -11,12 +11,6 @@ class SimpleRetriever(BaseRetriever): - def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): - self.vector_db = vector_db - self.prompt = prompt - self.doc_id = doc_id - self.top_k = top_k - def retrieve(self) -> set[str]: vector_query_engine: VectorStoreIndex = self.vector_db.get_vector_store_index() retriever = vector_query_engine.as_retriever( diff --git a/src/unstract/sdk/retrieval/subquestion.py b/src/unstract/sdk/retrieval/subquestion.py index e50291d1..7fc2d051 100644 --- a/src/unstract/sdk/retrieval/subquestion.py +++ b/src/unstract/sdk/retrieval/subquestion.py @@ -14,20 +14,6 @@ class SubquestionRetrieval(BaseRetriever): """SubquestionRetrieval class for querying VectorDB using LlamaIndex's SubQuestionQueryEngine.""" - def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): - """Initialize the SubquestionRetrieval class. - - Args: - vector_db (VectorDB): The vector database instance. - prompt (str): The query prompt. - doc_id (str): Document identifier for query context. - top_k (int): Number of top results to retrieve. - """ - self.vector_db = vector_db - self.prompt = prompt - self.doc_id = doc_id - self.top_k = top_k - def retrieve(self) -> set[str]: """Retrieve text chunks from the VectorDB based on the provided prompt. From 431391fed08126b9caefe2bffc6f3977c50e3460 Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Mon, 17 Mar 2025 15:19:14 +0530 Subject: [PATCH 07/13] Addressing review comments --- src/unstract/sdk/index_v2.py | 118 ++++++++++++++++------------------- 1 file changed, 53 insertions(+), 65 deletions(-) diff --git a/src/unstract/sdk/index_v2.py b/src/unstract/sdk/index_v2.py index 2f30ca0b..b344bb69 100644 --- a/src/unstract/sdk/index_v2.py +++ b/src/unstract/sdk/index_v2.py @@ -1,4 +1,5 @@ import json +import logging from typing import Optional from llama_index.core import Document @@ -26,15 +27,20 @@ from unstract.sdk.file_storage.impl import FileStorage from unstract.sdk.file_storage.provider import FileStorageProvider from unstract.sdk.tool.stream import StreamMixin -from unstract.sdk.utils.common_utils import capture_metrics, log_elapsed +from unstract.sdk.utils.common_utils import capture_metrics from unstract.sdk.utils.tool_utils import ToolUtils from unstract.sdk.vector_db import VectorDB +logger = logging.getLogger(__name__) + class Index: def __init__( self, tool: StreamMixin, + instance_identifiers: InstanceIdentifiers, + chunking_config: ChunkingConfig, + processing_options: ProcessingOptions, run_id: Optional[str] = None, capture_metrics: bool = False, ): @@ -42,13 +48,14 @@ def __init__( self.tool = tool self._run_id = run_id self._capture_metrics = capture_metrics + self.instance_identifiers = instance_identifiers + self.chunking_config = chunking_config + self.processing_options = processing_options self._metrics = {} def generate_index_key( self, - chuking_config: ChunkingConfig, file_info: FileInfo, - instance_identifiers: InstanceIdentifiers, fs: FileStorage = FileStorage(provider=FileStorageProvider.LOCAL), ) -> str: """Generates a unique index key based on the provided configuration, @@ -79,18 +86,18 @@ def generate_index_key( index_key = { "file_hash": file_hash, "vector_db_config": 
ToolAdapter.get_adapter_config( - self.tool, instance_identifiers.vector_db_instance_id + self.tool, self.instance_identifiers.vector_db_instance_id ), "embedding_config": ToolAdapter.get_adapter_config( - self.tool, instance_identifiers.embedding_instance_id + self.tool, self.instance_identifiers.embedding_instance_id ), "x2text_config": ToolAdapter.get_adapter_config( - self.tool, instance_identifiers.x2text_instance_id + self.tool, self.instance_identifiers.x2text_instance_id ), # Typed and hashed as strings since the final hash is persisted # and this is required to be backward compatible - "chunk_size": str(chuking_config.chunk_size), - "chunk_overlap": str(chuking_config.chunk_overlap), + "chunk_size": str(self.chunking_config.chunk_size), + "chunk_overlap": str(self.chunking_config.chunk_overlap), } # JSON keys are sorted to ensure that the same key gets hashed even in # case where the fields are reordered. @@ -101,8 +108,6 @@ def generate_index_key( def is_document_indexed( self, doc_id: str, - instance_identifiers: InstanceIdentifiers, - processing_options: ProcessingOptions, embedding: Embedding, vector_db: VectorDB, ) -> bool: @@ -112,56 +117,54 @@ def is_document_indexed( Returns: str: The document ID. """ + # Checking if document is already indexed against doc_id + doc_id_eq_filter = MetadataFilter.from_dict( + {"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id} + ) + filters = MetadataFilters(filters=[doc_id_eq_filter]) + q = VectorStoreQuery( + query_embedding=embedding.get_query_embedding(" "), + doc_ids=[doc_id], + filters=filters, + ) + + doc_id_found = False try: - # Checking if document is already indexed against doc_id - doc_id_eq_filter = MetadataFilter.from_dict( - {"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id} - ) - filters = MetadataFilters(filters=[doc_id_eq_filter]) - q = VectorStoreQuery( - query_embedding=embedding.get_query_embedding(" "), - doc_ids=[doc_id], - filters=filters, - ) - - doc_id_found = False - try: - n: VectorStoreQueryResult = vector_db.query(query=q) - if len(n.nodes) > 0: - doc_id_found = True - self.tool.stream_log(f"Found {len(n.nodes)} nodes for {doc_id}") - else: - self.tool.stream_log(f"No nodes found for {doc_id}") - except Exception as e: - self.tool.stream_log( - f"Error querying {instance_identifiers.vector_db_instance_id}: {e}," - " proceeding to index", - level=LogLevel.ERROR, - ) - - if doc_id_found and not processing_options.reindex: - self.tool.stream_log(f"File was indexed already under {doc_id}") - return doc_id_found - + n: VectorStoreQueryResult = vector_db.query(query=q) + if len(n.nodes) > 0: + doc_id_found = True + self.tool.stream_log(f"Found {len(n.nodes)} nodes for {doc_id}") + else: + self.tool.stream_log(f"No nodes found for {doc_id}") except Exception as e: - self.tool.stream_log( - f"Unexpected error during indexing check: {e}", level=LogLevel.ERROR + logger.warning( + f"Error querying {self.instance_identifiers.vector_db_instance_id}:" + f" {str(e)}, proceeding to index", + exc_info=True, ) + if doc_id_found and not self.processing_options.reindex: + self.tool.stream_log(f"File was indexed already under {doc_id}") + return doc_id_found + return doc_id_found - @log_elapsed(operation="INDEX") @capture_metrics def perform_indexing( self, - instance_identifiers: InstanceIdentifiers, - chunking_config: ChunkingConfig, vector_db: VectorDB, doc_id: str, extracted_text: str, doc_id_found: bool, ): - self._is_no_op_adapter(instance_identifiers, vector_db, doc_id) + if isinstance( + 
vector_db.get_vector_db( + adapter_instance_id=self.instance_identifiers.vector_db_instance_id, + embedding_dimension=1, + ), + (NoOpCustomVectorDB), + ): + return doc_id self.tool.stream_log("Indexing file...") full_text = [ @@ -174,16 +177,16 @@ def perform_indexing( documents = self._prepare_documents(doc_id, full_text) if doc_id_found: self.delete_nodes(vector_db, doc_id) - self._trigger_indexing(chunking_config, vector_db, documents) + self._trigger_indexing(vector_db, documents) return doc_id - def _trigger_indexing(self, chunking_config, vector_db, documents): + def _trigger_indexing(self, vector_db, documents): self.tool.stream_log("Adding nodes to vector db...") try: vector_db.index_document( documents, - chunk_size=chunking_config.chunk_overlap, - chunk_overlap=chunking_config.chunk_overlap, + chunk_size=self.chunking_config.chunk_overlap, + chunk_overlap=self.chunking_config.chunk_overlap, show_progress=True, ) self.tool.stream_log("File has been indexed successfully") @@ -227,18 +230,3 @@ def _prepare_documents(self, doc_id: str, full_text: str) -> list: raise SdkError( f"Error while processing documents for indexing {doc_id}: {e}" ) from e - - def _is_no_op_adapter( - self, - instance_identifiers: InstanceIdentifiers, - vector_db: VectorDB, - doc_id: str, - ): - if isinstance( - vector_db.get_vector_db( - adapter_instance_id=instance_identifiers.vector_db_instance_id, - embedding_dimension=1, - ), - (NoOpCustomVectorDB), - ): - return doc_id From 6372fc600d29fa9c7b26524c7cc87d38eec50f64 Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Mon, 17 Mar 2025 18:45:20 +0530 Subject: [PATCH 08/13] Moving helpers to application --- src/unstract/sdk/dto.py | 38 --- src/unstract/sdk/index_v2.py | 232 ------------------- src/unstract/sdk/retrieval/base_retriever.py | 21 -- src/unstract/sdk/retrieval/simple.py | 47 ---- src/unstract/sdk/retrieval/subquestion.py | 52 ----- 5 files changed, 390 deletions(-) delete mode 100644 src/unstract/sdk/dto.py delete mode 100644 src/unstract/sdk/index_v2.py delete mode 100644 src/unstract/sdk/retrieval/base_retriever.py delete mode 100644 src/unstract/sdk/retrieval/simple.py delete mode 100644 src/unstract/sdk/retrieval/subquestion.py diff --git a/src/unstract/sdk/dto.py b/src/unstract/sdk/dto.py deleted file mode 100644 index 3cf5cbaa..00000000 --- a/src/unstract/sdk/dto.py +++ /dev/null @@ -1,38 +0,0 @@ -from dataclasses import dataclass, field -from typing import Any, Optional - - -@dataclass -class InstanceIdentifiers: - embedding_instance_id: str - vector_db_instance_id: str - x2text_instance_id: str - llm_instance_id: str - tool_id: str - tags: Optional[list[str]] = None - - -@dataclass -class FileInfo: - file_path: str - file_hash: str - - -@dataclass -class ChunkingConfig: - chunk_size: int - chunk_overlap: int - - def __post_init__(self) -> None: - if self.chunk_size == 0: - raise ValueError( - "Indexing cannot be done for zero chunks." - "Please provide a valid chunk_size." 
- ) - - -@dataclass -class ProcessingOptions: - reindex: bool = False - enable_highlight: bool = False - usage_kwargs: dict[Any, Any] = field(default_factory=dict) diff --git a/src/unstract/sdk/index_v2.py b/src/unstract/sdk/index_v2.py deleted file mode 100644 index b344bb69..00000000 --- a/src/unstract/sdk/index_v2.py +++ /dev/null @@ -1,232 +0,0 @@ -import json -import logging -from typing import Optional - -from llama_index.core import Document -from llama_index.core.vector_stores import ( - FilterOperator, - MetadataFilter, - MetadataFilters, - VectorStoreQuery, - VectorStoreQueryResult, -) - -from unstract.sdk.adapter import ToolAdapter -from unstract.sdk.adapters.vectordb.no_op.src.no_op_custom_vectordb import ( - NoOpCustomVectorDB, -) -from unstract.sdk.constants import LogLevel -from unstract.sdk.dto import ( - ChunkingConfig, - FileInfo, - InstanceIdentifiers, - ProcessingOptions, -) -from unstract.sdk.embedding import Embedding -from unstract.sdk.exceptions import IndexingError, SdkError -from unstract.sdk.file_storage.impl import FileStorage -from unstract.sdk.file_storage.provider import FileStorageProvider -from unstract.sdk.tool.stream import StreamMixin -from unstract.sdk.utils.common_utils import capture_metrics -from unstract.sdk.utils.tool_utils import ToolUtils -from unstract.sdk.vector_db import VectorDB - -logger = logging.getLogger(__name__) - - -class Index: - def __init__( - self, - tool: StreamMixin, - instance_identifiers: InstanceIdentifiers, - chunking_config: ChunkingConfig, - processing_options: ProcessingOptions, - run_id: Optional[str] = None, - capture_metrics: bool = False, - ): - # TODO: Inherit from StreamMixin and avoid using BaseTool - self.tool = tool - self._run_id = run_id - self._capture_metrics = capture_metrics - self.instance_identifiers = instance_identifiers - self.chunking_config = chunking_config - self.processing_options = processing_options - self._metrics = {} - - def generate_index_key( - self, - file_info: FileInfo, - fs: FileStorage = FileStorage(provider=FileStorageProvider.LOCAL), - ) -> str: - """Generates a unique index key based on the provided configuration, - file information, instance identifiers, and processing options. - - Args: - chunking_config : ChunkingConfig - file_info (FileInfo): Contains file-related - information such as path and hash. - instance_identifiers (InstanceIdentifiers): Identifiers for - embedding, vector DB, tool, etc. - processing_options (ProcessingOptions): Options related to reindexing, - highlighting, and processing text. - fs (FileStorage, optional): File storage for remote storage. - - Returns: - str: A unique index key used for indexing the document. - """ - if not file_info.file_path and not file_info.file_hash: - raise ValueError("One of `file_path` or `file_hash` need to be provided") - - if not file_info.file_hash: - file_hash = fs.get_hash_from_file(path=file_info.file_path) - - # Whole adapter config is used currently even though it contains some keys - # which might not be relevant to indexing. This is easier for now than - # marking certain keys of the adapter config as necessary. 
- index_key = { - "file_hash": file_hash, - "vector_db_config": ToolAdapter.get_adapter_config( - self.tool, self.instance_identifiers.vector_db_instance_id - ), - "embedding_config": ToolAdapter.get_adapter_config( - self.tool, self.instance_identifiers.embedding_instance_id - ), - "x2text_config": ToolAdapter.get_adapter_config( - self.tool, self.instance_identifiers.x2text_instance_id - ), - # Typed and hashed as strings since the final hash is persisted - # and this is required to be backward compatible - "chunk_size": str(self.chunking_config.chunk_size), - "chunk_overlap": str(self.chunking_config.chunk_overlap), - } - # JSON keys are sorted to ensure that the same key gets hashed even in - # case where the fields are reordered. - hashed_index_key = ToolUtils.hash_str(json.dumps(index_key, sort_keys=True)) - return hashed_index_key - - @capture_metrics - def is_document_indexed( - self, - doc_id: str, - embedding: Embedding, - vector_db: VectorDB, - ) -> bool: - """Checks if nodes are already present in the vector database for a - given doc_id. - - Returns: - str: The document ID. - """ - # Checking if document is already indexed against doc_id - doc_id_eq_filter = MetadataFilter.from_dict( - {"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id} - ) - filters = MetadataFilters(filters=[doc_id_eq_filter]) - q = VectorStoreQuery( - query_embedding=embedding.get_query_embedding(" "), - doc_ids=[doc_id], - filters=filters, - ) - - doc_id_found = False - try: - n: VectorStoreQueryResult = vector_db.query(query=q) - if len(n.nodes) > 0: - doc_id_found = True - self.tool.stream_log(f"Found {len(n.nodes)} nodes for {doc_id}") - else: - self.tool.stream_log(f"No nodes found for {doc_id}") - except Exception as e: - logger.warning( - f"Error querying {self.instance_identifiers.vector_db_instance_id}:" - f" {str(e)}, proceeding to index", - exc_info=True, - ) - - if doc_id_found and not self.processing_options.reindex: - self.tool.stream_log(f"File was indexed already under {doc_id}") - return doc_id_found - - return doc_id_found - - @capture_metrics - def perform_indexing( - self, - vector_db: VectorDB, - doc_id: str, - extracted_text: str, - doc_id_found: bool, - ): - if isinstance( - vector_db.get_vector_db( - adapter_instance_id=self.instance_identifiers.vector_db_instance_id, - embedding_dimension=1, - ), - (NoOpCustomVectorDB), - ): - return doc_id - - self.tool.stream_log("Indexing file...") - full_text = [ - { - "section": "full", - "text_contents": extracted_text, - } - ] - # Convert raw text to llama index usage Document - documents = self._prepare_documents(doc_id, full_text) - if doc_id_found: - self.delete_nodes(vector_db, doc_id) - self._trigger_indexing(vector_db, documents) - return doc_id - - def _trigger_indexing(self, vector_db, documents): - self.tool.stream_log("Adding nodes to vector db...") - try: - vector_db.index_document( - documents, - chunk_size=self.chunking_config.chunk_overlap, - chunk_overlap=self.chunking_config.chunk_overlap, - show_progress=True, - ) - self.tool.stream_log("File has been indexed successfully") - except Exception as e: - self.tool.stream_log( - f"Error adding nodes to vector db: {e}", - level=LogLevel.ERROR, - ) - raise IndexingError(str(e)) from e - - def delete_nodes(self, vector_db: VectorDB, doc_id: str): - try: - vector_db.delete(ref_doc_id=doc_id) - self.tool.stream_log(f"Deleted nodes for {doc_id}") - except Exception as e: - self.tool.stream_log( - f"Error deleting nodes for {doc_id}: {e}", - level=LogLevel.ERROR, - ) - 
raise SdkError(f"Error deleting nodes for {doc_id}: {e}") from e - - def _prepare_documents(self, doc_id: str, full_text: str) -> list: - documents = [] - try: - for item in full_text: - text = item["text_contents"] - document = Document( - text=text, - doc_id=doc_id, - metadata={"section": item["section"]}, - ) - document.id_ = doc_id - documents.append(document) - self.tool.stream_log(f"Number of documents: {len(documents)}") - return documents - except Exception as e: - self.tool.stream_log( - f"Error while processing documents {doc_id}: {e}", - level=LogLevel.ERROR, - ) - raise SdkError( - f"Error while processing documents for indexing {doc_id}: {e}" - ) from e diff --git a/src/unstract/sdk/retrieval/base_retriever.py b/src/unstract/sdk/retrieval/base_retriever.py deleted file mode 100644 index 5dc26bd6..00000000 --- a/src/unstract/sdk/retrieval/base_retriever.py +++ /dev/null @@ -1,21 +0,0 @@ -from unstract.sdk.vector_db import VectorDB - - -class BaseRetriever: - def __init__(self, vector_db: VectorDB, prompt: str, doc_id: str, top_k: int): - """Initialize the Retrieval class. - - Args: - vector_db (VectorDB): The vector database instance. - prompt (str): The query prompt. - doc_id (str): Document identifier for query context. - top_k (int): Number of top results to retrieve. - """ - self.vector_db = vector_db - self.prompt = prompt - self.doc_id = doc_id - self.top_k = top_k - - @staticmethod - def retrieve() -> set[str]: - return set() diff --git a/src/unstract/sdk/retrieval/simple.py b/src/unstract/sdk/retrieval/simple.py deleted file mode 100644 index 87ac7b14..00000000 --- a/src/unstract/sdk/retrieval/simple.py +++ /dev/null @@ -1,47 +0,0 @@ -import logging -import time - -from llama_index.core import VectorStoreIndex -from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters - -from unstract.sdk.retrieval.base_retriever import BaseRetriever -from unstract.sdk.vector_db import VectorDB - -logger = logging.getLogger(__name__) - - -class SimpleRetriever(BaseRetriever): - def retrieve(self) -> set[str]: - vector_query_engine: VectorStoreIndex = self.vector_db.get_vector_store_index() - retriever = vector_query_engine.as_retriever( - similarity_top_k=self.top_k, - filters=MetadataFilters( - filters=[ - ExactMatchFilter(key="doc_id", value=self.doc_id), - ], - ), - ) - nodes = retriever.retrieve(self.prompt) - context: set[str] = set() - for node in nodes: - # ToDo: May have to fine-tune this value for node score or keep it - # configurable at the adapter level - if node.score > 0: - context.add(node.get_content()) - else: - logger.info( - "Node score is less than 0. " - f"Ignored: {node.node_id} with score {node.score}" - ) - - if not context: - # UN-1288 For Pinecone, we are seeing an inconsistent case where - # query with doc_id fails even though indexing just happened. - # This causes the following retrieve to return no text. - # To rule out any lag on the Pinecone vector DB write, - # the following sleep is added - # Note: This will not fix the issue. Since this issue is inconsistent - # and not reproducible easily, this is just a safety net. 
- time.sleep(2) - context = self.retrieve(self.prompt, self.doc_id, self.top_k) - return context diff --git a/src/unstract/sdk/retrieval/subquestion.py b/src/unstract/sdk/retrieval/subquestion.py deleted file mode 100644 index 7fc2d051..00000000 --- a/src/unstract/sdk/retrieval/subquestion.py +++ /dev/null @@ -1,52 +0,0 @@ -import logging - -from llama_index.core.query_engine import SubQuestionQueryEngine -from llama_index.core.tools import QueryEngineTool, ToolMetadata - -from unstract.sdk.exceptions import RetrievalError -from unstract.sdk.retrieval.base_retriever import BaseRetriever -from unstract.sdk.vector_db import VectorDB - -logger = logging.getLogger(__name__) - - -class SubquestionRetrieval(BaseRetriever): - """SubquestionRetrieval class for querying VectorDB using LlamaIndex's - SubQuestionQueryEngine.""" - - def retrieve(self) -> set[str]: - """Retrieve text chunks from the VectorDB based on the provided prompt. - - Returns: - set[str]: A set of text chunks retrieved from the database. - """ - try: - vector_query_engine = ( - self.vector_db.get_vector_store_index().as_query_engine() - ) - - query_engine_tools = [ - QueryEngineTool( - query_engine=vector_query_engine, - metadata=ToolMetadata(name=self.doc_id), - ), - ] - - query_engine = SubQuestionQueryEngine.from_defaults( - query_engine_tools=query_engine_tools, - use_async=True, - ) - - response = query_engine.query( - prompt=self.prompt, - top_k=self.top_k, - return_full_response=True, - ) - - chunks: set[str] = {node.text for node in response.source_nodes} - logger.info(f"Successfully retrieved {len(chunks)} chunks.") - return chunks - - except Exception as e: - logger.error(f"Error during retrieving chunks {self.doc_id}: {e}") - raise RetrievalError(str(e)) from e From dc041353b9ce440dc42e0b70cc2d20dbecf4da52 Mon Sep 17 00:00:00 2001 From: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com> Date: Tue, 18 Mar 2025 10:24:23 +0530 Subject: [PATCH 09/13] Remove unused exceptions Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com> --- src/unstract/sdk/exceptions.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/unstract/sdk/exceptions.py b/src/unstract/sdk/exceptions.py index 8803000c..b5dd3753 100644 --- a/src/unstract/sdk/exceptions.py +++ b/src/unstract/sdk/exceptions.py @@ -97,12 +97,3 @@ class FileOperationError(SdkError): "Please check specific storage error for " "further information" ) - - -class RetrievalError(SdkError): - """Custom exception raised for errors during retrieval from VectorDB.""" - - DEFAULT_MESSAGE = ( - "Error while retrieving data from the VectorDB. " - "Please contact the admin for further assistance." 
- ) From d2e5ae6d42ce32f037c9a41fd527890401116bed Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Tue, 18 Mar 2025 11:49:02 +0530 Subject: [PATCH 10/13] Adding Index util to generate index key --- src/unstract/sdk/utils/indexing_utils.py | 54 ++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 src/unstract/sdk/utils/indexing_utils.py diff --git a/src/unstract/sdk/utils/indexing_utils.py b/src/unstract/sdk/utils/indexing_utils.py new file mode 100644 index 00000000..8f2f0325 --- /dev/null +++ b/src/unstract/sdk/utils/indexing_utils.py @@ -0,0 +1,54 @@ +import json +from typing import Optional + +from unstract.sdk.adapter import ToolAdapter +from unstract.sdk.file_storage import FileStorage, FileStorageProvider +from unstract.sdk.tool.base import BaseTool +from unstract.sdk.utils import ToolUtils + + +class IndexingUtils: + @staticmethod + def generate_index_key( + vector_db: str, + embedding: str, + x2text: str, + chunk_size: str, + chunk_overlap: str, + tool: BaseTool, + file_path: Optional[str] = None, + file_hash: Optional[str] = None, + fs: FileStorage = FileStorage(provider=FileStorageProvider.LOCAL), + ) -> str: + """Generates a unique index key based on the provided configuration, + file information, instance identifiers, and processing options. + + Args: + fs (FileStorage, optional): File storage for remote storage. + + Returns: + str: A unique index key used for indexing the document. + """ + if not file_path and not file_hash: + raise ValueError("One of `file_path` or `file_hash` need to be provided") + + if not file_hash: + file_hash = fs.get_hash_from_file(path=file_path) + + # Whole adapter config is used currently even though it contains some keys + # which might not be relevant to indexing. This is easier for now than + # marking certain keys of the adapter config as necessary. + index_key = { + "file_hash": file_hash, + "vector_db_config": ToolAdapter.get_adapter_config(tool, vector_db), + "embedding_config": ToolAdapter.get_adapter_config(tool, embedding), + "x2text_config": ToolAdapter.get_adapter_config(tool, x2text), + # Typed and hashed as strings since the final hash is persisted + # and this is required to be backward compatible + "chunk_size": str(chunk_size), + "chunk_overlap": str(chunk_overlap), + } + # JSON keys are sorted to ensure that the same key gets hashed even in + # case where the fields are reordered. 
+ hashed_index_key = ToolUtils.hash_str(json.dumps(index_key, sort_keys=True)) + return hashed_index_key From 176aa972659bae4a05f8a954fa3fefad9067510c Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Wed, 19 Mar 2025 10:01:51 +0530 Subject: [PATCH 11/13] Version bump --- src/unstract/sdk/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/unstract/sdk/__init__.py b/src/unstract/sdk/__init__.py index 16e1590a..dfe2c455 100644 --- a/src/unstract/sdk/__init__.py +++ b/src/unstract/sdk/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.60.1" +__version__ = "0.60.2" def get_sdk_version(): From aa1d8ce7037e811a1c656dc60398aa9f2ac82a90 Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Wed, 19 Mar 2025 10:13:19 +0530 Subject: [PATCH 12/13] Version bump --- src/unstract/sdk/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/unstract/sdk/__init__.py b/src/unstract/sdk/__init__.py index dfe2c455..b7790927 100644 --- a/src/unstract/sdk/__init__.py +++ b/src/unstract/sdk/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.60.2" +__version__ = "0.61.0" def get_sdk_version(): From 3f4c2a4a0bf0dd660cf133d0fb1d952badd6a0cc Mon Sep 17 00:00:00 2001 From: harini-venkataraman Date: Wed, 19 Mar 2025 10:25:08 +0530 Subject: [PATCH 13/13] Adding headers for API --- src/unstract/sdk/prompt.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/unstract/sdk/prompt.py b/src/unstract/sdk/prompt.py index 29f345eb..27eed7fc 100644 --- a/src/unstract/sdk/prompt.py +++ b/src/unstract/sdk/prompt.py @@ -49,7 +49,10 @@ def answer_prompt( @log_elapsed(operation="INDEX") def index( - self, payload: dict[str, Any], params: Optional[dict[str, str]] = None + self, + payload: dict[str, Any], + params: Optional[dict[str, str]] = None, + headers: Optional[dict[str, str]] = None, ) -> dict[str, Any]: url_path = "index" if self.is_public_call: @@ -58,11 +61,15 @@ def index( url_path=url_path, payload=payload, params=params, + headers=headers, ) @log_elapsed(operation="EXTRACT") def extract( - self, payload: dict[str, Any], params: Optional[dict[str, str]] = None + self, + payload: dict[str, Any], + params: Optional[dict[str, str]] = None, + headers: Optional[dict[str, str]] = None, ) -> dict[str, Any]: url_path = "extract" if self.is_public_call: @@ -71,6 +78,7 @@ def extract( url_path=url_path, payload=payload, params=params, + headers=headers, ) def single_pass_extraction(
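

Below is a minimal usage sketch of the API surface this series leaves in place: IndexingUtils.generate_index_key from PATCH 10/13 and the PromptTool.extract call that gains optional params and headers in PATCH 05/13 and PATCH 13/13. The sketch is illustrative only; the adapter instance IDs, prompt service host and port, payload keys, the X-Request-ID header value, and the status/error handling (which assumes the result dict shape returned by _post_call in PATCH 01/13) are placeholder assumptions rather than values taken from the patches.

from unstract.sdk.constants import LogLevel
from unstract.sdk.prompt import PromptTool
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils.indexing_utils import IndexingUtils


def run_extraction(tool: BaseTool) -> None:
    # Deterministic index key derived from adapter configs and chunking settings
    # (PATCH 10/13). Instance IDs, chunk values and the file hash are placeholders.
    index_key = IndexingUtils.generate_index_key(
        vector_db="<vector-db-instance-id>",
        embedding="<embedding-instance-id>",
        x2text="<x2text-instance-id>",
        chunk_size="1024",
        chunk_overlap="128",
        tool=tool,
        file_hash="0f343b0931126a20f133d67c2b018a3b",  # or pass file_path instead
    )

    # Constructor arguments as in PATCH 01/13; host and port are assumed values.
    prompt_tool = PromptTool(
        tool=tool,
        prompt_host="http://unstract-prompt-service",
        prompt_port=3003,
    )

    # extract() accepts optional params and headers after PATCH 05/13 and 13/13.
    result = prompt_tool.extract(
        payload={"doc_id": index_key},         # payload keys are assumptions
        headers={"X-Request-ID": "run-123"},   # illustrative header
    )

    # Result handling assumes the status/error keys populated in PATCH 01/13.
    if result.get("status") != "OK":
        tool.stream_log(
            f"Extraction failed: {result.get('error')}", level=LogLevel.ERROR
        )
        return
    tool.stream_log("Extraction completed successfully")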