6 changes: 6 additions & 0 deletions src/unstract/sdk/exceptions.py
@@ -8,3 +8,9 @@ def __init__(self, message: str = DEFAULT_MESSAGE):

def __str__(self) -> str:
return self.message


class IndexingError(SdkError):
def __init__(self, message: str = ""):
prefix = "Error with indexing. "
super().__init__(prefix + message)
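As a quick illustration of the new exception (a minimal sketch; the detail string is invented), the class simply prepends its prefix to whatever message the caller passes, so call sites only supply the specifics:

from unstract.sdk.exceptions import IndexingError

try:
    raise IndexingError("extraction adapter returned no text")  # hypothetical detail message
except IndexingError as e:
    print(str(e))  # -> Error with indexing. extraction adapter returned no text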
177 changes: 95 additions & 82 deletions src/unstract/sdk/index.py
@@ -2,13 +2,19 @@

from llama_index import Document, StorageContext, VectorStoreIndex
from llama_index.node_parser import SimpleNodeParser
from llama_index.vector_stores import VectorStoreQuery, VectorStoreQueryResult
from llama_index.vector_stores import (
FilterOperator,
MetadataFilter,
MetadataFilters,
VectorStoreQuery,
VectorStoreQueryResult,
)
from unstract.adapters.exceptions import AdapterError
from unstract.adapters.x2text.x2text_adapter import X2TextAdapter

from unstract.sdk.constants import LogLevel, ToolEnv
from unstract.sdk.embedding import ToolEmbedding
from unstract.sdk.exceptions import SdkError
from unstract.sdk.exceptions import IndexingError, SdkError
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils import ToolUtils
from unstract.sdk.utils.service_context import ServiceContext
@@ -172,18 +178,21 @@ def index_file(
)
raise SdkError(f"Error loading {vector_db}")

filter = [{"field_name": "doc_id", "operator": "=", "value": doc_id}]
doc_id_eq_filter = MetadataFilter.from_dict(
{"key": "doc_id", "operator": FilterOperator.EQ, "value": doc_id}
)
filters = MetadataFilters(filters=[doc_id_eq_filter])
q = VectorStoreQuery(
query_embedding=embedding_li.get_query_embedding(" "),
doc_ids=[doc_id],
filters=filter,
filters=filters,
)

doc_id_not_found = True
doc_id_found = False
try:
n: VectorStoreQueryResult = vector_db_li.query(query=q)
if len(n.nodes) > 0:
doc_id_not_found = False
doc_id_found = True
self.tool.stream_log(f"Found {len(n.nodes)} nodes for {doc_id}")
else:
self.tool.stream_log(f"No nodes found for {doc_id}")
@@ -192,7 +201,7 @@ def index_file(
f"Error querying {vector_db}: {e}", level=LogLevel.ERROR
)

if doc_id_not_found is False and reindex:
if doc_id_found and reindex:
# Delete the nodes for the doc_id
try:
vector_db_li.delete(ref_doc_id=doc_id)
@@ -203,87 +212,91 @@ def index_file(
level=LogLevel.ERROR,
)
raise SdkError(f"Error deleting nodes for {doc_id}: {e}")
doc_id_not_found = True
doc_id_found = False

if doc_id_not_found:
self.tool.stream_log("Extracting text from input file")
full_text = []
extracted_text = ""
try:
x2text = X2Text(tool=self.tool)
x2text_adapter_inst: X2TextAdapter = x2text.get_x2text(
adapter_instance_id=x2text_adapter
)
extracted_text = x2text_adapter_inst.process(
input_file_path=file_path, output_file_path=output_file_path
)
except AdapterError as e:
# Wrapping AdapterErrors with SdkError
raise SdkError(str(e)) from e
full_text.append(
{
"section": "full",
"text_contents": self._cleanup_text(extracted_text),
}
if doc_id_found:
self.tool.stream_log(f"File was indexed already under {doc_id}")
return doc_id

# Extract text and index
self.tool.stream_log("Extracting text from input file")
full_text = []
extracted_text = ""
try:
x2text = X2Text(tool=self.tool)
x2text_adapter_inst: X2TextAdapter = x2text.get_x2text(
adapter_instance_id=x2text_adapter
)
extracted_text = x2text_adapter_inst.process(
input_file_path=file_path, output_file_path=output_file_path
)
except AdapterError as e:
# Wrapping AdapterErrors with SdkError
raise IndexingError(str(e)) from e
full_text.append(
{
"section": "full",
"text_contents": self._cleanup_text(extracted_text),
}
)

# Check if chunking is required
documents = []
for item in full_text:
text = item["text_contents"]
self.tool.stream_log("Indexing file...")
document = Document(
text=text,
doc_id=doc_id,
metadata={"section": item["section"]},
)
document.id_ = doc_id
documents.append(document)
self.tool.stream_log(f"Number of documents: {len(documents)}")
if chunk_size == 0:
parser = SimpleNodeParser.from_defaults(
chunk_size=len(documents[0].text) + 10, chunk_overlap=0
)
nodes = parser.get_nodes_from_documents(
documents, show_progress=True
)
node = nodes[0]
node.embedding = embedding_li.get_query_embedding(" ")
vector_db_li.add(nodes=[node])
self.tool.stream_log("Added node to vector db")
else:
storage_context = StorageContext.from_defaults(
vector_store=vector_db_li
)
parser = SimpleNodeParser.from_defaults(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)
# Check if chunking is required
documents = []
for item in full_text:
text = item["text_contents"]
self.tool.stream_log("Indexing file...")
document = Document(
text=text,
doc_id=doc_id,
metadata={"section": item["section"]},
)
document.id_ = doc_id
documents.append(document)
self.tool.stream_log(f"Number of documents: {len(documents)}")
if chunk_size == 0:
parser = SimpleNodeParser.from_defaults(
chunk_size=len(documents[0].text) + 10, chunk_overlap=0
)
nodes = parser.get_nodes_from_documents(
documents, show_progress=True
)
node = nodes[0]
node.embedding = embedding_li.get_query_embedding(" ")
vector_db_li.add(nodes=[node])
self.tool.stream_log("Added node to vector db")
else:
storage_context = StorageContext.from_defaults(
vector_store=vector_db_li
)
parser = SimpleNodeParser.from_defaults(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)

service_context = ServiceContext.get_service_context(
platform_api_key=self.tool.get_env_or_die(
ToolEnv.PLATFORM_API_KEY
),
embed_model=embedding_li,
node_parser=parser,
)
service_context = ServiceContext.get_service_context(
platform_api_key=self.tool.get_env_or_die(
ToolEnv.PLATFORM_API_KEY
),
embed_model=embedding_li,
node_parser=parser,
)

self.tool.stream_log("Adding nodes to vector db...")
try:
VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
show_progress=True,
service_context=service_context,
)
except Exception as e:
self.tool.stream_log(
f"Error adding nodes to vector db: {e}",
level=LogLevel.ERROR,
)
raise SdkError(f"Error adding nodes to vector db: {e}")
self.tool.stream_log("Added nodes to vector db")
self.tool.stream_log("Adding nodes to vector db...")
try:
VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
show_progress=True,
service_context=service_context,
)
except Exception as e:
self.tool.stream_log(
f"Error adding nodes to vector db: {e}",
level=LogLevel.ERROR,
)
raise SdkError(f"Error adding nodes to vector db: {e}")
self.tool.stream_log("Added nodes to vector db")

self.tool.stream_log("File has been indexed successfully")
self.tool.stream_log("File has been indexed successfully")
return doc_id

@staticmethod
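For comparison of the two branches in the hunk above, here is a minimal standalone sketch of the chunk_size == 0 path (llama_index 0.9.x assumed; vector_store and embed_model are placeholders for the adapters the SDK resolves): the chunk size is padded past the text length so the parser emits exactly one node, which is then stored with a placeholder embedding.

from llama_index import Document
from llama_index.node_parser import SimpleNodeParser

def index_without_chunking(vector_store, embed_model, doc_id: str, text: str) -> None:
    document = Document(text=text, doc_id=doc_id, metadata={"section": "full"})
    document.id_ = doc_id
    # A chunk size larger than the text guarantees a single node.
    parser = SimpleNodeParser.from_defaults(chunk_size=len(text) + 10, chunk_overlap=0)
    nodes = parser.get_nodes_from_documents([document])
    node = nodes[0]
    # Placeholder embedding: lookups for this node go through the doc_id
    # metadata filter rather than similarity search.
    node.embedding = embed_model.get_query_embedding(" ")
    vector_store.add(nodes=[node])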
9 changes: 6 additions & 3 deletions src/unstract/sdk/tool/executor.py
@@ -4,6 +4,8 @@
from pathlib import Path
from typing import Any

from unstract.adapters import get_adapter_version

from unstract.sdk import get_sdk_version
from unstract.sdk.constants import Command
from unstract.sdk.tool.base import BaseTool
@@ -55,9 +57,10 @@ def execute_run(self, args: argparse.Namespace) -> None:

self.tool.stream_log(
f"Running tool for "
f"Workflow ID: {self.tool.workflow_id} "
f"Execution ID: {self.tool.execution_id} "
f"SDK Version: {get_sdk_version()}"
f"Workflow ID: {self.tool.workflow_id}, "
f"Execution ID: {self.tool.execution_id}, "
f"SDK Version: {get_sdk_version()}, "
f"adapter Version: {get_adapter_version()}"
)
self.tool.run(
settings=settings,
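Purely as an illustration of the reworked log line (the IDs and version strings below are invented), the comma-separated fields now include the adapters package version alongside the SDK version:

workflow_id, execution_id = "wf-123", "exec-456"   # hypothetical values
sdk_version, adapter_version = "0.20.1", "0.5.2"   # hypothetical values
print(
    f"Running tool for "
    f"Workflow ID: {workflow_id}, "
    f"Execution ID: {execution_id}, "
    f"SDK Version: {sdk_version}, "
    f"adapter Version: {adapter_version}"
)
# -> Running tool for Workflow ID: wf-123, Execution ID: exec-456, SDK Version: 0.20.1, adapter Version: 0.5.2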