diff --git a/src/unstract/sdk/index.py b/src/unstract/sdk/index.py index b5656f45..a8b99c48 100644 --- a/src/unstract/sdk/index.py +++ b/src/unstract/sdk/index.py @@ -23,6 +23,7 @@ from unstract.sdk.exceptions import IndexingError, SdkError from unstract.sdk.tool.base import BaseTool from unstract.sdk.utils import ToolUtils +from unstract.sdk.utils.common_utils import log_elapsed from unstract.sdk.vector_db import VectorDB from unstract.sdk.x2txt import X2Text @@ -104,6 +105,80 @@ def query_index( finally: vector_db.close() + @log_elapsed(operation="EXTRACTION") + def extract_text( + self, + x2text_instance_id: str, + file_path: str, + output_file_path: Optional[str] = None, + enable_highlight: bool = False, + usage_kwargs: dict[Any, Any] = {}, + process_text: Optional[Callable[[str], str]] = None, + ) -> str: + """Extracts text from a document. + + Uses the configured service to perform the extraction + - LLM Whisperer + - Unstructured IO Community / Enterprise + - Llama Parse + + Args: + x2text_instance_id (str): UUID of the text extractor + file_path (str): Path to the file + output_file_path (Optional[str], optional): File path to write + the extracted contents into. Defaults to None. + enable_highlight (bool, optional): Flag to provide highlighting metadata. + Defaults to False. + usage_kwargs (dict[Any, Any], optional): Dict to capture usage. + Defaults to {}. + process_text (Optional[Callable[[str], str]], optional): Optional function + to post-process the text. Defaults to None. + + Raises: + IndexingError: Errors during text extraction + """ + self.tool.stream_log("Extracting text from input file") + extracted_text = "" + try: + x2text = X2Text( + tool=self.tool, + adapter_instance_id=x2text_instance_id, + usage_kwargs=usage_kwargs, + ) + if enable_highlight and isinstance(x2text._x2text_instance, LLMWhisperer): + process_response: TextExtractionResult = x2text.process( + input_file_path=file_path, + output_file_path=output_file_path, + enable_highlight=enable_highlight, + ) + whisper_hash_value = process_response.extraction_metadata.whisper_hash + + metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value} + + self.tool.update_exec_metadata(metadata) + + else: + process_response: TextExtractionResult = x2text.process( + input_file_path=file_path, + output_file_path=output_file_path, + ) + + extracted_text = process_response.extracted_text + except AdapterError as e: + # Wrapping AdapterErrors with SdkError + raise IndexingError(str(e)) from e + if process_text: + try: + result = process_text(extracted_text) + if isinstance(result, str): + extracted_text = result + else: + logger.warning("'process_text' is expected to return an 'str'") + except Exception as e: + logger.error(f"Error occured inside function 'process_text': {e}") + return extracted_text + + @log_elapsed(operation="INDEXING(might include EXTRACTION)") def index( self, tool_id: str, @@ -207,58 +282,23 @@ def index( self.tool.stream_log(f"File was indexed already under {doc_id}") return doc_id - # Extract text and index - self.tool.stream_log("Extracting text from input file") - full_text = [] - extracted_text = "" - try: - x2text = X2Text( - tool=self.tool, - adapter_instance_id=x2text_instance_id, - usage_kwargs=usage_kwargs, - ) - if enable_highlight and isinstance( - x2text._x2text_instance, LLMWhisperer - ): - process_response: TextExtractionResult = x2text.process( - input_file_path=file_path, - output_file_path=output_file_path, - enable_highlight=enable_highlight, - ) - whisper_hash_value = ( - process_response.extraction_metadata.whisper_hash - ) - - metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value} - - self.tool.update_exec_metadata(metadata) - - else: - process_response: TextExtractionResult = x2text.process( - input_file_path=file_path, - output_file_path=output_file_path, - ) + extracted_text = self.extract_text( + x2text_instance_id=x2text_instance_id, + file_path=file_path, + output_file_path=output_file_path, + enable_highlight=enable_highlight, + usage_kwargs=usage_kwargs, + process_text=process_text, + ) + if not extracted_text: + raise IndexingError("No text available to index") - extracted_text = process_response.extracted_text - except AdapterError as e: - # Wrapping AdapterErrors with SdkError - raise IndexingError(str(e)) from e - if process_text: - try: - result = process_text(extracted_text) - if isinstance(result, str): - extracted_text = result - except Exception as e: - logger.error(f"Error occured inside function 'process_text': {e}") - full_text.append( + full_text = [ { "section": "full", "text_contents": extracted_text, } - ) - - if not extracted_text: - raise IndexingError("No text available to index") + ] # Check if chunking is required documents = [] @@ -324,7 +364,6 @@ def index( level=LogLevel.ERROR, ) raise IndexingError(str(e)) from e - self.tool.stream_log("Added nodes to vector db") self.tool.stream_log("File has been indexed successfully") return doc_id diff --git a/src/unstract/sdk/utils/common_utils.py b/src/unstract/sdk/utils/common_utils.py index 72015c5c..568b9e2c 100644 --- a/src/unstract/sdk/utils/common_utils.py +++ b/src/unstract/sdk/utils/common_utils.py @@ -1,4 +1,6 @@ +import functools import logging +import time import uuid from unstract.sdk.constants import LogLevel @@ -20,3 +22,27 @@ def generate_uuid() -> str: logging.WARNING: LogLevel.WARN, logging.ERROR: LogLevel.ERROR, } + + +def log_elapsed(operation): + """Adds an elapsed time log. + + Args: + operation (str): Operation being measured + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start_time = time.time() + try: + result = func(*args, **kwargs) + finally: + end_time = time.time() + elapsed_time = end_time - start_time + logger.info(f"Time taken for '{operation}': {elapsed_time:.3f}s") + return result + + return wrapper + + return decorator