Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Contrib: update to LanceDB contrib #533

Merged
merged 3 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions contrib/hamilton/contrib/user/zilto/lancedb_vdb/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# Purpose of this module

This module implements simple operations to interact with LanceDB.
This module implements vector and full-text search using LanceDB.

# Configuration Options
This module doesn't receive any configuration.

## Inputs:
- `url`: The url to the local LanceDB instance.
- `table_name`: The name of the table to interact with.
- `schema`: To create a new table, you need to specify a pyarrow schema.
- `overwrite_table`: Allows you to overwrite an existing table.

- `vector_query`: The embedding vector of the text query.
- `full_text_query`: The text content to search for in the columns `full_text_index`.

# Limitations
- `push_data()` and `delete_data()` currently return the number of rows added and deleted, which requires reading the table in a Pyarrow table. This could impact performance if the table gets very large or push / delete are highly frequent.
- Full-text search needs to rebuild the index to include newly added data. By default `rebuild_index=True` will rebuild the index on each call to `full_text_search()` for safety. Pass `rebuild_index=False` when making multiple search queries without adding new data.
- `insert()` and `delete()` returns the number of rows added and deleted, which requires reading the table in a Pyarrow table. This could impact performance if the table gets very large or push / delete are highly frequent.
119 changes: 107 additions & 12 deletions contrib/hamilton/contrib/user/zilto/lancedb_vdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Any, Dict, List
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Union

logger = logging.getLogger(__name__)

Expand All @@ -8,19 +9,43 @@
with contrib.catch_import_errors(__name__, __file__, logger):
import pyarrow as pa
import lancedb
import numpy as np
import pandas as pd
from lancedb.pydantic import LanceModel

from hamilton.function_modifiers import tag

VectorType = Union[list, np.ndarray, pa.Array, pa.ChunkedArray]
DataType = Union[Dict, List[Dict], pd.DataFrame, pa.Table, Iterable[pa.RecordBatch]]
TableSchema = Union[pa.Schema, LanceModel]

def vdb_client(uri: str = "./.lancedb") -> lancedb.DBConnection:

def client(uri: Union[str, Path] = "./.lancedb") -> lancedb.DBConnection:
"""Create a LanceDB connection.

:param uri: path to local LanceDB
:return: connection to LanceDB instance.
"""
return lancedb.connect(uri=uri)


def _create_table(
client: lancedb.DBConnection,
table_name: str,
schema: Optional[TableSchema] = None,
overwrite_table: bool = False,
) -> lancedb.db.LanceTable:
"""Create a new table based on schema."""
mode = "overwrite" if overwrite_table else "create"
table = client.create_table(name=table_name, schema=schema, mode=mode)
return table


@tag(side_effect="True")
def table_ref(
vdb_client: lancedb.DBConnection,
client: lancedb.DBConnection,
table_name: str,
schema: pa.Schema,
schema: Optional[TableSchema] = None,
overwrite_table: bool = False,
) -> lancedb.db.LanceTable:
"""Create or reference a LanceDB table
Expand All @@ -33,31 +58,38 @@ def table_ref(
"""

try:
table = vdb_client.open_table(table_name)
table = client.open_table(table_name)
except FileNotFoundError:
mode = "overwrite" if overwrite_table else "create"
table = vdb_client.create_table(name=table_name, schema=schema, mode=mode)
if schema is None:
raise ValueError("`schema` must be provided to create table.")

table = _create_table(
client=client,
table_name=table_name,
schema=schema,
overwrite_table=overwrite_table,
)

return table


@tag(side_effect="True")
def reset_vdb(vdb_client: lancedb.DBConnection) -> Dict[str, List[str]]:
def reset(client: lancedb.DBConnection) -> Dict[str, List[str]]:
"""Drop all existing tables.

:param vdb_client: LanceDB connection.
:return: dictionary containing all the dropped tables.
"""
tables_dropped = []
for table_name in vdb_client.table_names():
vdb_client.drop_table(table_name)
for table_name in client.table_names():
client.drop_table(table_name)
tables_dropped.append(table_name)

return dict(tables_dropped=tables_dropped)


@tag(side_effect="True")
def push_data(table_ref: lancedb.db.LanceTable, data: Any) -> Dict:
def insert(table_ref: lancedb.db.LanceTable, data: DataType) -> Dict:
"""Push new data to the specified table.

:param table_ref: Reference to the LanceDB table.
Expand All @@ -72,7 +104,7 @@ def push_data(table_ref: lancedb.db.LanceTable, data: Any) -> Dict:


@tag(side_effect="True")
def delete_data(table_ref: lancedb.db.LanceTable, delete_expression: str) -> Dict:
def delete(table_ref: lancedb.db.LanceTable, delete_expression: str) -> Dict:
"""Delete existing data using an SQL expression.

:param table_ref: Reference to the LanceDB table.
Expand All @@ -84,3 +116,66 @@ def delete_data(table_ref: lancedb.db.LanceTable, delete_expression: str) -> Dic
n_rows_after = table_ref.to_arrow().shape[0]
n_rows_deleted = n_rows_before - n_rows_after
return dict(table=table_ref, n_rows_deleted=n_rows_deleted)


def vector_search(
table_ref: lancedb.db.LanceTable,
vector_query: VectorType,
columns: Optional[List[str]] = None,
where: Optional[str] = None,
prefilter_where: bool = False,
limit: int = 10,
) -> pd.DataFrame:
"""Search database using an embedding vector.

:param table_ref: table to search
:param vector_query: embedding of the query
:param columns: columns to include in the results
:param where: SQL where clause to pre- or post-filter results
:param prefilter_where: If True filter rows before search else filter after search
:param limit: number of rows to return
:return: A dataframe of results
"""
query_ = (
table_ref.search(
query=vector_query,
query_type="vector",
vector_column_name="vector",
)
.select(columns=columns)
.where(where, prefilter=prefilter_where)
.limit(limit=limit)
)
return query_.to_pandas()


def full_text_search(
table_ref: lancedb.db.LanceTable,
full_text_query: str,
full_text_index: Union[str, List[str]],
where: Optional[str] = None,
limit: int = 10,
rebuild_index: bool = True,
) -> pd.DataFrame:
"""Search database using an embedding vector.

:param table_ref: table to search
:param full_text_query: text query
:param full_text_index: one or more text columns to search
:param where: SQL where clause to pre- or post-filter results
:param limit: number of rows to return
:param rebuild_index: If True rebuild the index
:return: A dataframe of results
"""
# NOTE. Currently, the index needs to be recreated whenever data is added
# ref: https://lancedb.github.io/lancedb/fts/#installation
if rebuild_index:
table_ref.create_fts_index(full_text_index)

query_ = (
table_ref.search(query=full_text_query, query_type="fts")
.select(full_text_index)
.where(where)
.limit(limit)
)
return query_.to_pandas()
Binary file modified contrib/hamilton/contrib/user/zilto/lancedb_vdb/dag.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
lancedb
numpy
pyarrow
sf-hamilton[visualization]
Loading