Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 87 additions & 48 deletions src/unstract/sdk/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from unstract.sdk.exceptions import IndexingError, SdkError
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils import ToolUtils
from unstract.sdk.utils.common_utils import log_elapsed
from unstract.sdk.vector_db import VectorDB
from unstract.sdk.x2txt import X2Text

Expand Down Expand Up @@ -104,6 +105,80 @@ def query_index(
finally:
vector_db.close()

@log_elapsed(operation="EXTRACTION")
def extract_text(
self,
x2text_instance_id: str,
file_path: str,
output_file_path: Optional[str] = None,
enable_highlight: bool = False,
usage_kwargs: dict[Any, Any] = {},
process_text: Optional[Callable[[str], str]] = None,
) -> str:
"""Extracts text from a document.

Uses the configured service to perform the extraction
- LLM Whisperer
- Unstructured IO Community / Enterprise
- Llama Parse

Args:
x2text_instance_id (str): UUID of the text extractor
file_path (str): Path to the file
output_file_path (Optional[str], optional): File path to write
the extracted contents into. Defaults to None.
enable_highlight (bool, optional): Flag to provide highlighting metadata.
Defaults to False.
usage_kwargs (dict[Any, Any], optional): Dict to capture usage.
Defaults to {}.
process_text (Optional[Callable[[str], str]], optional): Optional function
to post-process the text. Defaults to None.

Raises:
IndexingError: Errors during text extraction
"""
self.tool.stream_log("Extracting text from input file")
extracted_text = ""
try:
x2text = X2Text(
tool=self.tool,
adapter_instance_id=x2text_instance_id,
usage_kwargs=usage_kwargs,
)
if enable_highlight and isinstance(x2text._x2text_instance, LLMWhisperer):
process_response: TextExtractionResult = x2text.process(
input_file_path=file_path,
output_file_path=output_file_path,
enable_highlight=enable_highlight,
)
whisper_hash_value = process_response.extraction_metadata.whisper_hash

metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value}

self.tool.update_exec_metadata(metadata)

else:
process_response: TextExtractionResult = x2text.process(
input_file_path=file_path,
output_file_path=output_file_path,
)

extracted_text = process_response.extracted_text
except AdapterError as e:
# Wrapping AdapterErrors with SdkError
raise IndexingError(str(e)) from e
if process_text:
try:
result = process_text(extracted_text)
if isinstance(result, str):
extracted_text = result
else:
logger.warning("'process_text' is expected to return an 'str'")
except Exception as e:
logger.error(f"Error occured inside function 'process_text': {e}")
return extracted_text

@log_elapsed(operation="INDEXING(might include EXTRACTION)")
def index(
self,
tool_id: str,
Expand Down Expand Up @@ -207,58 +282,23 @@ def index(
self.tool.stream_log(f"File was indexed already under {doc_id}")
return doc_id

# Extract text and index
self.tool.stream_log("Extracting text from input file")
full_text = []
extracted_text = ""
try:
x2text = X2Text(
tool=self.tool,
adapter_instance_id=x2text_instance_id,
usage_kwargs=usage_kwargs,
)
if enable_highlight and isinstance(
x2text._x2text_instance, LLMWhisperer
):
process_response: TextExtractionResult = x2text.process(
input_file_path=file_path,
output_file_path=output_file_path,
enable_highlight=enable_highlight,
)
whisper_hash_value = (
process_response.extraction_metadata.whisper_hash
)

metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value}

self.tool.update_exec_metadata(metadata)

else:
process_response: TextExtractionResult = x2text.process(
input_file_path=file_path,
output_file_path=output_file_path,
)
extracted_text = self.extract_text(
x2text_instance_id=x2text_instance_id,
file_path=file_path,
output_file_path=output_file_path,
enable_highlight=enable_highlight,
usage_kwargs=usage_kwargs,
process_text=process_text,
)
if not extracted_text:
raise IndexingError("No text available to index")

extracted_text = process_response.extracted_text
except AdapterError as e:
# Wrapping AdapterErrors with SdkError
raise IndexingError(str(e)) from e
if process_text:
try:
result = process_text(extracted_text)
if isinstance(result, str):
extracted_text = result
except Exception as e:
logger.error(f"Error occured inside function 'process_text': {e}")
full_text.append(
full_text = [
{
"section": "full",
"text_contents": extracted_text,
}
)

if not extracted_text:
raise IndexingError("No text available to index")
]

# Check if chunking is required
documents = []
Expand Down Expand Up @@ -324,7 +364,6 @@ def index(
level=LogLevel.ERROR,
)
raise IndexingError(str(e)) from e
self.tool.stream_log("Added nodes to vector db")

self.tool.stream_log("File has been indexed successfully")
return doc_id
Expand Down
26 changes: 26 additions & 0 deletions src/unstract/sdk/utils/common_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import functools
import logging
import time
import uuid

from unstract.sdk.constants import LogLevel
Expand All @@ -20,3 +22,27 @@ def generate_uuid() -> str:
logging.WARNING: LogLevel.WARN,
logging.ERROR: LogLevel.ERROR,
}


def log_elapsed(operation):
"""Adds an elapsed time log.

Args:
operation (str): Operation being measured
"""

def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
finally:
end_time = time.time()
elapsed_time = end_time - start_time
logger.info(f"Time taken for '{operation}': {elapsed_time:.3f}s")
return result

return wrapper

return decorator