Skip to content

Commit

Permalink
fix: create index from single document (#1073)
Browse files Browse the repository at this point in the history
Use a runtime `row_number` to build the index by incorporating design
discussions from #912 and
#868.
  • Loading branch information
jiashenC committed Sep 8, 2023
1 parent f8eb83a commit f0dd533
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 31 deletions.
5 changes: 5 additions & 0 deletions evadb/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@

from evadb.utils.generic_utils import is_postgres_uri, parse_config_yml

# Permanent identifier column.
IDENTIFIER_COLUMN = "_row_id"

# Runtime generated column.
ROW_NUM_COLUMN = "_row_number"
ROW_NUM_MAGIC = 0xFFFFFFFF

CATALOG_TABLES = [
"column_catalog",
"table_catalog",
Expand Down
6 changes: 3 additions & 3 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import pandas as pd

from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
Expand Down Expand Up @@ -87,7 +87,7 @@ def _create_index(self):
# array. Use zero index to get the actual numpy array.
feat = input_batch.column_as_numpy_array(feat_col_name)

row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN)
row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)

for i in range(len(input_batch)):
row_feat = feat[i].reshape(1, -1)
Expand All @@ -103,7 +103,7 @@ def _create_index(self):
self.index.create(input_dim)

# Row identifier used to map an index entry back to its row.
self.index.add([FeaturePayload(row_id[i], row_feat)])
self.index.add([FeaturePayload(row_num[i], row_feat)])

# Persist index.
self.index.persist()
Expand Down
22 changes: 11 additions & 11 deletions evadb/executor/vector_index_scan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import pandas as pd

from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import handle_vector_store_params
Expand All @@ -27,11 +27,11 @@
from evadb.utils.logging_manager import logger


# Helper function for getting row_id column alias.
def get_row_id_column_alias(column_list):
# Helper function for getting row_num column alias.
def get_row_num_column_alias(column_list):
for column in column_list:
alias, col_name = column.split(".")
if col_name == IDENTIFIER_COLUMN:
if col_name == ROW_NUM_COLUMN:
return alias


Expand Down Expand Up @@ -74,10 +74,10 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
)
# todo support queries over distance as well
# distance_list = index_result.similarities
row_id_np = index_result.ids
row_num_np = index_result.ids

# Load projected columns from disk and join with search results.
row_id_col_name = None
row_num_col_name = None

# Handle the case where the index returns fewer results than self.limit_count.value.
num_required_results = self.limit_count.value
Expand All @@ -90,14 +90,14 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
res_row_list = [None for _ in range(num_required_results)]
for batch in self.children[0].exec(**kwargs):
column_list = batch.columns
if not row_id_col_name:
row_id_alias = get_row_id_column_alias(column_list)
row_id_col_name = "{}.{}".format(row_id_alias, IDENTIFIER_COLUMN)
if not row_num_col_name:
row_num_alias = get_row_num_column_alias(column_list)
row_num_col_name = "{}.{}".format(row_num_alias, ROW_NUM_COLUMN)

# Nested join.
for _, row in batch.frames.iterrows():
for idx, rid in enumerate(row_id_np):
if rid == row[row_id_col_name]:
for idx, row_num in enumerate(row_num_np):
if row_num == row[row_num_col_name]:
res_row = dict()
for col_name in column_list:
res_row[col_name] = row[col_name]
Expand Down
3 changes: 3 additions & 0 deletions evadb/readers/decord_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import numpy as np

from evadb.catalog.catalog_type import VideoColumnName
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.constants import AUDIORATE, IFRAMES
from evadb.expression.abstract_expression import AbstractExpression
from evadb.expression.expression_utils import extract_range_list_from_predicate
Expand Down Expand Up @@ -126,6 +127,7 @@ def __get_video_frame(self, frame_id):

return {
VideoColumnName.id.name: frame_id,
ROW_NUM_COLUMN: frame_id,
VideoColumnName.data.name: frame_video,
VideoColumnName.seconds.name: round(timestamp, 2),
}
Expand All @@ -136,6 +138,7 @@ def __get_audio_frame(self, frame_id):

return {
VideoColumnName.id.name: frame_id,
ROW_NUM_COLUMN: frame_id,
VideoColumnName.data.name: np.empty(0),
VideoColumnName.seconds.name: 0.0,
VideoColumnName.audio.name: frame_audio,
Expand Down
9 changes: 8 additions & 1 deletion evadb/readers/document/document_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pathlib import Path
from typing import Dict, Iterator

from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.readers.abstract_reader import AbstractReader
from evadb.readers.document.registry import (
_lazy_import_loader,
Expand Down Expand Up @@ -44,8 +45,14 @@ def _read(self) -> Iterator[Dict]:
chunk_size=self._chunk_size, chunk_overlap=self._chunk_overlap
)

row_num = 0
for data in loader.load():
for chunk_id, row in enumerate(
langchain_text_splitter.split_documents([data])
):
yield {"chunk_id": chunk_id, "data": row.page_content}
yield {
"chunk_id": chunk_id,
"data": row.page_content,
ROW_NUM_COLUMN: row_num,
}
row_num += 1
4 changes: 4 additions & 0 deletions evadb/readers/pdf_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
from typing import Dict, Iterator

from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.readers.abstract_reader import AbstractReader
from evadb.utils.generic_utils import try_to_import_fitz

Expand All @@ -35,6 +36,7 @@ def _read(self) -> Iterator[Dict]:
doc = fitz.open(self.file_url)

# PAGE ID, PARAGRAPH ID, STRING
row_num = 0
for page_no, page in enumerate(doc):
blocks = page.get_text("dict")["blocks"]
# iterate through the text blocks
Expand All @@ -51,7 +53,9 @@ def _read(self) -> Iterator[Dict]:
if span["text"].strip():
block_string += span["text"]
yield {
ROW_NUM_COLUMN: row_num,
"page": page_no + 1,
"paragraph": paragraph_no + 1,
"data": block_string,
}
row_num += 1
6 changes: 5 additions & 1 deletion evadb/storage/document_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.readers.document.document_reader import DocumentReader
Expand All @@ -28,7 +29,7 @@ def __init__(self, db: EvaDBDatabase):

def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]:
for doc_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, file_name) in doc_files.iterrows():
for _, (row_id, file_name, _) in doc_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
doc_file = Path(table.file_url) / system_file_name
# setting batch_mem_size = 1, we need to fix it
Expand All @@ -38,4 +39,7 @@ def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]:
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(file_name)
batch.frames[ROW_NUM_COLUMN] = (
row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN]
)
yield batch
4 changes: 3 additions & 1 deletion evadb/storage/image_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.readers.image.opencv_image_reader import CVImageReader
Expand All @@ -28,12 +29,13 @@ def __init__(self, db: EvaDBDatabase):

def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
for image_files in self._rdb_handler.read(self._get_metadata_table(table)):
for _, (row_id, file_name) in image_files.iterrows():
for _, (row_id, file_name, _) in image_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
image_file = Path(table.file_url) / system_file_name
# setting batch_mem_size = 1, we need fix it
reader = CVImageReader(str(image_file), batch_mem_size=1)
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(file_name)
batch.frames[ROW_NUM_COLUMN] = batch.frames[table.columns[0].name]
yield batch
6 changes: 5 additions & 1 deletion evadb/storage/pdf_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.readers.pdf_reader import PDFReader
Expand All @@ -28,12 +29,15 @@ def __init__(self, db: EvaDBDatabase):

def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
for image_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, file_name) in image_files.iterrows():
for _, (row_id, file_name, _) in image_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
image_file = Path(table.file_url) / system_file_name
# setting batch_mem_size = 1, we need to fix it
reader = PDFReader(str(image_file), batch_mem_size=1)
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(file_name)
batch.frames[ROW_NUM_COLUMN] = (
row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN]
)
yield batch
19 changes: 15 additions & 4 deletions evadb/storage/sqlite_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.schema_utils import SchemaUtils
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, ROW_NUM_COLUMN
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.parser.table_ref import TableInfo
Expand Down Expand Up @@ -67,6 +67,7 @@ def _deserialize_sql_row(self, sql_row: dict, columns: List[ColumnCatalogEntry])
dict_row[col.name] = self._serializer.deserialize(sql_row[col.name])
else:
dict_row[col.name] = sql_row[col.name]
dict_row[ROW_NUM_COLUMN] = dict_row[IDENTIFIER_COLUMN]
return dict_row

def _try_loading_table_via_reflection(self, table_name: str):
Expand Down Expand Up @@ -94,7 +95,11 @@ def create(self, table: TableCatalogEntry, **kwargs):

# During table creation, assume row_id is automatically handled by
# the sqlalchemy engine.
table_columns = [col for col in table.columns if col.name != IDENTIFIER_COLUMN]
table_columns = [
col
for col in table.columns
if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)
]
sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table_columns)
attr_dict.update(sqlalchemy_schema)

Expand Down Expand Up @@ -148,12 +153,18 @@ def write(self, table: TableCatalogEntry, rows: Batch):
# the sqlalchemy engine. Another assumption we make here is that the
# updated data need not take care of row_id.
table_columns = [
col for col in table.columns if col.name != IDENTIFIER_COLUMN
col
for col in table.columns
if (col.name != IDENTIFIER_COLUMN and col.name != ROW_NUM_COLUMN)
]

# Todo: validate the data type before inserting into the table
for record in rows.frames.values:
row_data = {col: record[idx] for idx, col in enumerate(columns)}
row_data = {
col: record[idx]
for idx, col in enumerate(columns)
if col != ROW_NUM_COLUMN
}
data.append(self._dict_to_sql_row(row_data, table_columns))
self._sql_session.execute(table_to_update.insert(), data)
self._sql_session.commit()
Expand Down
6 changes: 5 additions & 1 deletion evadb/storage/video_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Iterator

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import ROW_NUM_COLUMN, ROW_NUM_MAGIC
from evadb.database import EvaDBDatabase
from evadb.expression.abstract_expression import AbstractExpression
from evadb.models.storage.batch import Batch
Expand All @@ -39,7 +40,7 @@ def read(
read_video: bool = True,
) -> Iterator[Batch]:
for video_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, video_file_name) in video_files.iterrows():
for _, (row_id, video_file_name, _) in video_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(video_file_name)
video_file = Path(table.file_url) / system_file_name
# increase batch size when reading audio so that
Expand All @@ -58,4 +59,7 @@ def read(
for batch in reader.read():
batch.frames[table.columns[0].name] = row_id
batch.frames[table.columns[1].name] = str(video_file_name)
batch.frames[ROW_NUM_COLUMN] = (
row_id * ROW_NUM_MAGIC + batch.frames[ROW_NUM_COLUMN]
)
yield batch
Loading

0 comments on commit f0dd533

Please sign in to comment.