From 48721b23a572c03d5923311cd44d64f73ec6585a Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Mon, 30 Sep 2024 23:18:52 +0530 Subject: [PATCH 1/3] Changes to measure time taken in index and support for other operations --- src/unstract/sdk/__init__.py | 2 +- src/unstract/sdk/index.py | 136 ++++++++++++++++--------- src/unstract/sdk/utils/common_utils.py | 40 ++++++++ 3 files changed, 129 insertions(+), 49 deletions(-) diff --git a/src/unstract/sdk/__init__.py b/src/unstract/sdk/__init__.py index 47cfb4b2..f83eb845 100644 --- a/src/unstract/sdk/__init__.py +++ b/src/unstract/sdk/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.49.0" +__version__ = "0.50.0" def get_sdk_version(): diff --git a/src/unstract/sdk/index.py b/src/unstract/sdk/index.py index b5656f45..f10326df 100644 --- a/src/unstract/sdk/index.py +++ b/src/unstract/sdk/index.py @@ -23,6 +23,7 @@ from unstract.sdk.exceptions import IndexingError, SdkError from unstract.sdk.tool.base import BaseTool from unstract.sdk.utils import ToolUtils +from unstract.sdk.utils.common_utils import log_elapsed from unstract.sdk.vector_db import VectorDB from unstract.sdk.x2txt import X2Text @@ -104,6 +105,81 @@ def query_index( finally: vector_db.close() + @log_elapsed(operation="EXTRACTION") + def extract_text( + self, + x2text_instance_id: str, + file_path: str, + output_file_path: Optional[str] = None, + enable_highlight: bool = False, + usage_kwargs: dict[Any, Any] = {}, + process_text: Optional[Callable[[str], str]] = None, + ) -> str: + """Extracts text from a document. + + Uses the configured service to perform the extraction + - LLM Whisperer + - Unstructured IO Community / Enterprise + - Llama Parse + + Args: + x2text_instance_id (str): UUID of the text extractor + file_path (str): Path to the file + output_file_path (Optional[str], optional): File path to write + the extracted contents into. Defaults to None. + enable_highlight (bool, optional): Flag to provide highlighting metadata. + Defaults to False. + usage_kwargs (dict[Any, Any], optional): Dict to capture usage. + Defaults to {}. + process_text (Optional[Callable[[str], str]], optional): Optional function + to post-process the text. Defaults to None. + + Raises: + IndexingError: _description_ + """ + # Extract text and index + self.tool.stream_log("Extracting text from input file") + extracted_text = "" + try: + x2text = X2Text( + tool=self.tool, + adapter_instance_id=x2text_instance_id, + usage_kwargs=usage_kwargs, + ) + if enable_highlight and isinstance(x2text._x2text_instance, LLMWhisperer): + process_response: TextExtractionResult = x2text.process( + input_file_path=file_path, + output_file_path=output_file_path, + enable_highlight=enable_highlight, + ) + whisper_hash_value = process_response.extraction_metadata.whisper_hash + + metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value} + + self.tool.update_exec_metadata(metadata) + + else: + process_response: TextExtractionResult = x2text.process( + input_file_path=file_path, + output_file_path=output_file_path, + ) + + extracted_text = process_response.extracted_text + except AdapterError as e: + # Wrapping AdapterErrors with SdkError + raise IndexingError(str(e)) from e + if process_text: + try: + result = process_text(extracted_text) + if isinstance(result, str): + extracted_text = result + else: + logger.warning("'process_text' is expected to return an 'str'") + except Exception as e: + logger.error(f"Error occured inside function 'process_text': {e}") + return extracted_text + + @log_elapsed(operation="INDEXING(might include EXTRACTION)") def index( self, tool_id: str, @@ -207,58 +283,23 @@ def index( self.tool.stream_log(f"File was indexed already under {doc_id}") return doc_id - # Extract text and index - self.tool.stream_log("Extracting text from input file") - full_text = [] - extracted_text = "" - try: - x2text = X2Text( - tool=self.tool, - adapter_instance_id=x2text_instance_id, - usage_kwargs=usage_kwargs, - ) - if enable_highlight and isinstance( - x2text._x2text_instance, LLMWhisperer - ): - process_response: TextExtractionResult = x2text.process( - input_file_path=file_path, - output_file_path=output_file_path, - enable_highlight=enable_highlight, - ) - whisper_hash_value = ( - process_response.extraction_metadata.whisper_hash - ) - - metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value} - - self.tool.update_exec_metadata(metadata) - - else: - process_response: TextExtractionResult = x2text.process( - input_file_path=file_path, - output_file_path=output_file_path, - ) + extracted_text = self.extract_text( + x2text_instance_id=x2text_instance_id, + file_path=file_path, + output_file_path=output_file_path, + enable_highlight=enable_highlight, + usage_kwargs=usage_kwargs, + process_text=process_text, + ) + if not extracted_text: + raise IndexingError("No text available to index") - extracted_text = process_response.extracted_text - except AdapterError as e: - # Wrapping AdapterErrors with SdkError - raise IndexingError(str(e)) from e - if process_text: - try: - result = process_text(extracted_text) - if isinstance(result, str): - extracted_text = result - except Exception as e: - logger.error(f"Error occured inside function 'process_text': {e}") - full_text.append( + full_text = [ { "section": "full", "text_contents": extracted_text, } - ) - - if not extracted_text: - raise IndexingError("No text available to index") + ] # Check if chunking is required documents = [] @@ -324,7 +365,6 @@ def index( level=LogLevel.ERROR, ) raise IndexingError(str(e)) from e - self.tool.stream_log("Added nodes to vector db") self.tool.stream_log("File has been indexed successfully") return doc_id diff --git a/src/unstract/sdk/utils/common_utils.py b/src/unstract/sdk/utils/common_utils.py index 20a05f5b..cab5373a 100644 --- a/src/unstract/sdk/utils/common_utils.py +++ b/src/unstract/sdk/utils/common_utils.py @@ -1,8 +1,48 @@ +import functools +import logging +import time import uuid +from unstract.sdk.constants import LogLevel + +logger = logging.getLogger(__name__) + class CommonUtils: @staticmethod def generate_uuid() -> str: """Class method to get uuid.""" return str(uuid.uuid4()) + + +# Mapping from python log level to Unstract counterpart +PY_TO_UNSTRACT_LOG_LEVEL = { + logging.DEBUG: LogLevel.DEBUG, + logging.INFO: LogLevel.INFO, + logging.WARNING: LogLevel.WARN, + logging.ERROR: LogLevel.ERROR, +} + + +def log_elapsed(operation): + """Adds an elapsed time log. + + Args: + operation (str): Operation being measured + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start_time = time.time() + try: + result = func(*args, **kwargs) + finally: + end_time = time.time() + elapsed_time = end_time - start_time + logger.debug(f"Time taken for '{operation}': {elapsed_time:.3f}s") + return result + + return wrapper + + return decorator From 18f5f80a5efbaafbbb12ec2cf6da3082f376a6bf Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Tue, 1 Oct 2024 09:37:34 +0530 Subject: [PATCH 2/3] Minor docstring fix --- src/unstract/sdk/index.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/unstract/sdk/index.py b/src/unstract/sdk/index.py index f10326df..a8b99c48 100644 --- a/src/unstract/sdk/index.py +++ b/src/unstract/sdk/index.py @@ -135,9 +135,8 @@ def extract_text( to post-process the text. Defaults to None. Raises: - IndexingError: _description_ + IndexingError: Errors during text extraction """ - # Extract text and index self.tool.stream_log("Extracting text from input file") extracted_text = "" try: From 5d4ef6f9146a5d668c88aa73b4009c596b733b81 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Tue, 1 Oct 2024 11:58:04 +0530 Subject: [PATCH 3/3] Updated debug log to info for logging timing --- src/unstract/sdk/utils/common_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/unstract/sdk/utils/common_utils.py b/src/unstract/sdk/utils/common_utils.py index cab5373a..568b9e2c 100644 --- a/src/unstract/sdk/utils/common_utils.py +++ b/src/unstract/sdk/utils/common_utils.py @@ -40,7 +40,7 @@ def wrapper(*args, **kwargs): finally: end_time = time.time() elapsed_time = end_time - start_time - logger.debug(f"Time taken for '{operation}': {elapsed_time:.3f}s") + logger.info(f"Time taken for '{operation}': {elapsed_time:.3f}s") return result return wrapper