diff --git a/src/unstract/sdk/exceptions.py b/src/unstract/sdk/exceptions.py
index 9b077a5b..408e8500 100644
--- a/src/unstract/sdk/exceptions.py
+++ b/src/unstract/sdk/exceptions.py
@@ -8,3 +8,9 @@ def __init__(self, message: str = DEFAULT_MESSAGE):
 
     def __str__(self) -> str:
         return self.message
+
+
+class IndexingError(SdkError):
+    def __init__(self, message: str = ""):
+        prefix = "Error with indexing. "
+        super().__init__(prefix + message)
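For illustration, a minimal sketch of how the new exception behaves (the message is a made-up example). Since `IndexingError` subclasses `SdkError`, existing broad handlers keep working:

```python
from unstract.sdk.exceptions import IndexingError, SdkError

try:
    # Hypothetical failure raised somewhere inside indexing
    raise IndexingError("x2text adapter timed out")
except SdkError as e:
    # IndexingError subclasses SdkError, so a broad handler still catches it
    print(e)  # -> Error with indexing. x2text adapter timed out
```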
diff --git a/src/unstract/sdk/index.py b/src/unstract/sdk/index.py
index 73599ece..e2634d4b 100644
--- a/src/unstract/sdk/index.py
+++ b/src/unstract/sdk/index.py
@@ -2,13 +2,19 @@
 from llama_index import Document, StorageContext, VectorStoreIndex
 from llama_index.node_parser import SimpleNodeParser
-from llama_index.vector_stores import VectorStoreQuery, VectorStoreQueryResult
+from llama_index.vector_stores import (
+    FilterOperator,
+    MetadataFilter,
+    MetadataFilters,
+    VectorStoreQuery,
+    VectorStoreQueryResult,
+)
 
 from unstract.adapters.exceptions import AdapterError
 from unstract.adapters.x2text.x2text_adapter import X2TextAdapter
 
 from unstract.sdk.constants import LogLevel, ToolEnv
 from unstract.sdk.embedding import ToolEmbedding
-from unstract.sdk.exceptions import SdkError
+from unstract.sdk.exceptions import IndexingError, SdkError
 from unstract.sdk.tool.base import BaseTool
 from unstract.sdk.utils import ToolUtils
 from unstract.sdk.utils.service_context import ServiceContext
@@ -172,18 +178,21 @@ def index_file(
             )
             raise SdkError(f"Error loading {vector_db}")
 
-        filter = [{"field_name": "doc_id", "operator": "=", "value": doc_id}]
+        doc_id_eq_filter = MetadataFilter.from_dict(
+            {"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id}
+        )
+        filters = MetadataFilters(filters=[doc_id_eq_filter])
         q = VectorStoreQuery(
             query_embedding=embedding_li.get_query_embedding(" "),
             doc_ids=[doc_id],
-            filters=filter,
+            filters=filters,
         )
 
-        doc_id_not_found = True
+        doc_id_found = False
         try:
             n: VectorStoreQueryResult = vector_db_li.query(query=q)
             if len(n.nodes) > 0:
-                doc_id_not_found = False
+                doc_id_found = True
                 self.tool.stream_log(f"Found {len(n.nodes)} nodes for {doc_id}")
             else:
                 self.tool.stream_log(f"No nodes found for {doc_id}")
@@ -192,7 +201,7 @@
                 f"Error querying {vector_db}: {e}", level=LogLevel.ERROR
             )
 
-        if doc_id_not_found is False and reindex:
+        if doc_id_found and reindex:
             # Delete the nodes for the doc_id
             try:
                 vector_db_li.delete(ref_doc_id=doc_id)
@@ -203,87 +212,91 @@
                     level=LogLevel.ERROR,
                 )
                 raise SdkError(f"Error deleting nodes for {doc_id}: {e}")
-            doc_id_not_found = True
+            doc_id_found = False
 
-        if doc_id_not_found:
-            self.tool.stream_log("Extracting text from input file")
-            full_text = []
-            extracted_text = ""
-            try:
-                x2text = X2Text(tool=self.tool)
-                x2text_adapter_inst: X2TextAdapter = x2text.get_x2text(
-                    adapter_instance_id=x2text_adapter
-                )
-                extracted_text = x2text_adapter_inst.process(
-                    input_file_path=file_path, output_file_path=output_file_path
-                )
-            except AdapterError as e:
-                # Wrapping AdapterErrors with SdkError
-                raise SdkError(str(e)) from e
-            full_text.append(
-                {
-                    "section": "full",
-                    "text_contents": self._cleanup_text(extracted_text),
-                }
+        if doc_id_found:
+            self.tool.stream_log(f"File was indexed already under {doc_id}")
+            return doc_id
+
+        # Extract text and index
+        self.tool.stream_log("Extracting text from input file")
+        full_text = []
+        extracted_text = ""
+        try:
+            x2text = X2Text(tool=self.tool)
+            x2text_adapter_inst: X2TextAdapter = x2text.get_x2text(
+                adapter_instance_id=x2text_adapter
+            )
+            extracted_text = x2text_adapter_inst.process(
+                input_file_path=file_path, output_file_path=output_file_path
             )
+        except AdapterError as e:
+            # Wrapping AdapterErrors with IndexingError
+            raise IndexingError(str(e)) from e
+        full_text.append(
+            {
+                "section": "full",
+                "text_contents": self._cleanup_text(extracted_text),
+            }
+        )
 
-            # Check if chunking is required
-            documents = []
-            for item in full_text:
-                text = item["text_contents"]
-                self.tool.stream_log("Indexing file...")
-                document = Document(
-                    text=text,
-                    doc_id=doc_id,
-                    metadata={"section": item["section"]},
-                )
-                document.id_ = doc_id
-                documents.append(document)
-            self.tool.stream_log(f"Number of documents: {len(documents)}")
-            if chunk_size == 0:
-                parser = SimpleNodeParser.from_defaults(
-                    chunk_size=len(documents[0].text) + 10, chunk_overlap=0
-                )
-                nodes = parser.get_nodes_from_documents(
-                    documents, show_progress=True
-                )
-                node = nodes[0]
-                node.embedding = embedding_li.get_query_embedding(" ")
-                vector_db_li.add(nodes=[node])
-                self.tool.stream_log("Added node to vector db")
-            else:
-                storage_context = StorageContext.from_defaults(
-                    vector_store=vector_db_li
-                )
-                parser = SimpleNodeParser.from_defaults(
-                    chunk_size=chunk_size, chunk_overlap=chunk_overlap
-                )
+        # Check if chunking is required
+        documents = []
+        for item in full_text:
+            text = item["text_contents"]
+            self.tool.stream_log("Indexing file...")
+            document = Document(
+                text=text,
+                doc_id=doc_id,
+                metadata={"section": item["section"]},
+            )
+            document.id_ = doc_id
+            documents.append(document)
+        self.tool.stream_log(f"Number of documents: {len(documents)}")
+        if chunk_size == 0:
+            parser = SimpleNodeParser.from_defaults(
+                chunk_size=len(documents[0].text) + 10, chunk_overlap=0
+            )
+            nodes = parser.get_nodes_from_documents(
+                documents, show_progress=True
+            )
+            node = nodes[0]
+            node.embedding = embedding_li.get_query_embedding(" ")
+            vector_db_li.add(nodes=[node])
+            self.tool.stream_log("Added node to vector db")
+        else:
+            storage_context = StorageContext.from_defaults(
+                vector_store=vector_db_li
+            )
+            parser = SimpleNodeParser.from_defaults(
+                chunk_size=chunk_size, chunk_overlap=chunk_overlap
+            )
 
-                service_context = ServiceContext.get_service_context(
-                    platform_api_key=self.tool.get_env_or_die(
-                        ToolEnv.PLATFORM_API_KEY
-                    ),
-                    embed_model=embedding_li,
-                    node_parser=parser,
-                )
+            service_context = ServiceContext.get_service_context(
+                platform_api_key=self.tool.get_env_or_die(
+                    ToolEnv.PLATFORM_API_KEY
+                ),
+                embed_model=embedding_li,
+                node_parser=parser,
+            )
 
-                self.tool.stream_log("Adding nodes to vector db...")
-                try:
-                    VectorStoreIndex.from_documents(
-                        documents,
-                        storage_context=storage_context,
-                        show_progress=True,
-                        service_context=service_context,
-                    )
-                except Exception as e:
-                    self.tool.stream_log(
-                        f"Error adding nodes to vector db: {e}",
-                        level=LogLevel.ERROR,
-                    )
-                    raise SdkError(f"Error adding nodes to vector db: {e}")
-                self.tool.stream_log("Added nodes to vector db")
+            self.tool.stream_log("Adding nodes to vector db...")
+            try:
+                VectorStoreIndex.from_documents(
+                    documents,
+                    storage_context=storage_context,
+                    show_progress=True,
+                    service_context=service_context,
+                )
+            except Exception as e:
+                self.tool.stream_log(
+                    f"Error adding nodes to vector db: {e}",
+                    level=LogLevel.ERROR,
+                )
+                raise SdkError(f"Error adding nodes to vector db: {e}")
+            self.tool.stream_log("Added nodes to vector db")
 
-            self.tool.stream_log("File has been indexed successfully")
+        self.tool.stream_log("File has been indexed successfully")
         return doc_id
 
     @staticmethod
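For illustration, a minimal sketch of the typed filter that replaces the old raw-dict filter. The `doc_id` value and the query embedding are placeholders; the real code builds the embedding via `get_query_embedding(" ")`, and the diff itself constructs the filter through `MetadataFilter.from_dict`:

```python
from llama_index.vector_stores import (
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
    VectorStoreQuery,
)

doc_id = "example-doc-id"  # placeholder value

# Equivalent to the diff's MetadataFilter.from_dict(
#     {"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id})
filters = MetadataFilters(
    filters=[MetadataFilter(key="doc_id", operator=FilterOperator.EQ, value=doc_id)]
)
query = VectorStoreQuery(
    query_embedding=[0.0] * 1536,  # placeholder; dimension depends on the embedding model
    doc_ids=[doc_id],
    filters=filters,
)
```

Unlike the old raw dict (`{"field_name": "doc_id", "operator": "=", ...}`), the typed filter is validated at construction time, so field names and operators cannot silently drift from what the vector store expects.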
diff --git a/src/unstract/sdk/tool/executor.py b/src/unstract/sdk/tool/executor.py
index 0bbf6d29..08660bae 100644
--- a/src/unstract/sdk/tool/executor.py
+++ b/src/unstract/sdk/tool/executor.py
@@ -4,6 +4,8 @@
 from pathlib import Path
 from typing import Any
 
+from unstract.adapters import get_adapter_version
+
 from unstract.sdk import get_sdk_version
 from unstract.sdk.constants import Command
 from unstract.sdk.tool.base import BaseTool
@@ -55,9 +57,10 @@ def execute_run(self, args: argparse.Namespace) -> None:
         self.tool.stream_log(
             f"Running tool for "
-            f"Workflow ID: {self.tool.workflow_id} "
-            f"Execution ID: {self.tool.execution_id} "
-            f"SDK Version: {get_sdk_version()}"
+            f"Workflow ID: {self.tool.workflow_id}, "
+            f"Execution ID: {self.tool.execution_id}, "
+            f"SDK Version: {get_sdk_version()}, "
+            f"Adapter Version: {get_adapter_version()}"
         )
 
         self.tool.run(
             settings=settings,
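For reference, a rough sketch of the banner the updated `execute_run` now emits (the workflow and execution IDs are placeholders standing in for the real tool context):

```python
from unstract.adapters import get_adapter_version
from unstract.sdk import get_sdk_version

workflow_id, execution_id = "wf-123", "ex-456"  # placeholder IDs

print(
    f"Running tool for "
    f"Workflow ID: {workflow_id}, "
    f"Execution ID: {execution_id}, "
    f"SDK Version: {get_sdk_version()}, "
    f"Adapter Version: {get_adapter_version()}"
)
```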