Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/docs/core/flow_def.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,44 @@ Following metrics are supported:
| L2Distance | [L2 distance (a.k.a. Euclidean distance)](https://en.wikipedia.org/wiki/Euclidean_distance) | Smaller is more similar |
| InnerProduct | [Inner product](https://en.wikipedia.org/wiki/Inner_product_space) | Larger is more similar |

### Full-Text Search (FTS) Index

*Full-text search index* is specified by `fts_indexes` (`Sequence[FtsIndexDef]`). `FtsIndexDef` has the following fields:

* `field_name`: the field to create FTS index.
* `parameters` (optional): a dictionary of parameters to pass to the target's FTS index creation. The supported parameters vary by target.

For example, with LanceDB:

<Tabs>
<TabItem value="python" label="Python" default>

```python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
...
demo_collector = data_scope.add_collector()
...
demo_collector.export(
"demo_target", DemoTargetSpec(...),
primary_key_fields=["id"],
fts_indexes=[
# Basic FTS index with default tokenizer
cocoindex.FtsIndexDef("content"),
# FTS index with custom tokenizer
cocoindex.FtsIndexDef("description", parameters={"language": "English"})
])
```

</TabItem>
</Tabs>

:::note

FTS indexes are currently only supported for LanceDB target on its enterprise edition. Other targets will raise an error if FTS indexes are specified.

:::

## Miscellaneous

### Getting App Namespace
Expand Down
45 changes: 39 additions & 6 deletions docs/docs/targets/lancedb.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Here's how CocoIndex data elements map to LanceDB elements during export:
| a collected row | a row |
| a field | a column |


::::info Installation and import

This target is provided via an optional dependency `[lancedb]`:
Expand All @@ -41,14 +40,15 @@ import cocoindex.targets.lancedb as coco_lancedb

The spec `coco_lancedb.LanceDB` takes the following fields:

* `db_uri` (`str`, required): The LanceDB database location (e.g. `./lancedb_data`).
* `table_name` (`str`, required): The name of the table to export the data to.
* `db_options` (`coco_lancedb.DatabaseOptions`, optional): Advanced database options.
* `storage_options` (`dict[str, Any]`, optional): Passed through to LanceDB when connecting.
* `db_uri` (`str`, required): The LanceDB database location (e.g. `./lancedb_data`).
* `table_name` (`str`, required): The name of the table to export the data to.
* `db_options` (`coco_lancedb.DatabaseOptions`, optional): Advanced database options.
* `storage_options` (`dict[str, Any]`, optional): Passed through to LanceDB when connecting.

Additional notes:

* Exactly one primary key field is required for LanceDB targets. We create B-Tree index on this key column.
* Exactly one primary key field is required for LanceDB targets. We create B-Tree index on this key column.
* **Full-Text Search (FTS) indexes** are supported via the `fts_indexes` parameter. Note that FTS functionality requires [LanceDB Enterprise](https://lancedb.com/docs/indexing/fts-index/). You can pass any parameters supported by the target's FTS index creation API (e.g., `tokenizer_name` for LanceDB). See [LanceDB FTS documentation](https://lancedb.com/docs/indexing/fts-index/) for full parameter details.

:::info

Expand All @@ -59,6 +59,38 @@ If you want to use vector indexes, you can run the flow once to populate the tar

You can find an end-to-end example here: [examples/text_embedding_lancedb](https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding_lancedb).

### FTS Index Example

```python
import cocoindex
import cocoindex.targets.lancedb as coco_lancedb

@cocoindex.flow_def(name="DocumentSearchFlow")
def document_search_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# ... source and transformations ...

doc_collector = data_scope.add_collector()
# ... collect document data ...

doc_collector.export(
"documents",
coco_lancedb.LanceDB(
db_uri="./lancedb_data",
table_name="documents"
),
primary_key_fields=["id"],
# Add FTS indexes for full-text search
fts_indexes=[
# Basic FTS index with default tokenizer
cocoindex.FtsIndexDef("content"),
# FTS index with stemming for better search recall
cocoindex.FtsIndexDef("description", parameters={"tokenizer_name": "en_stem"}),
# FTS index with position tracking for phrase searches
cocoindex.FtsIndexDef("title", parameters={"tokenizer_name": "default", "with_position": True})
]
)
```

## `connect_async()` helper

We provide a helper to obtain a shared `AsyncConnection` that is reused across your process and shared with CocoIndex's writer for strong read-after-write consistency:
Expand All @@ -85,6 +117,7 @@ Once `db_uri` matches, it automatically reuses the same connection instance with
This achieves strong consistency between your indexing and querying logic, if they run in the same process.

## Example

<ExampleButton
href="https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding_lancedb"
text="Text Embedding LanceDB Example"
Expand Down
5 changes: 5 additions & 0 deletions examples/text_embedding_lancedb/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ def text_embedding_flow(
coco_lancedb.LanceDB(db_uri=LANCEDB_URI, table_name=LANCEDB_TABLE),
primary_key_fields=["id"],
vector_indexes=vector_indexes,
fts_indexes=[
cocoindex.FtsIndexDef(
field_name="text", parameters={"tokenizer_name": "simple"}
)
],
)


Expand Down
2 changes: 2 additions & 0 deletions python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .lib import settings, init, start_server, stop
from .llm import LlmSpec, LlmApiType
from .index import (
FtsIndexDef,
VectorSimilarityMetric,
VectorIndexDef,
IndexOptions,
Expand Down Expand Up @@ -95,6 +96,7 @@
# Index
"VectorSimilarityMetric",
"VectorIndexDef",
"FtsIndexDef",
"IndexOptions",
"HnswVectorIndexMethod",
"IvfFlatVectorIndexMethod",
Expand Down
2 changes: 2 additions & 0 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ def export(
primary_key_fields: Sequence[str],
attachments: Sequence[op.TargetAttachmentSpec] = (),
vector_indexes: Sequence[index.VectorIndexDef] = (),
fts_indexes: Sequence[index.FtsIndexDef] = (),
vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (),
setup_by_user: bool = False,
) -> None:
Expand All @@ -432,6 +433,7 @@ def export(
index_options = index.IndexOptions(
primary_key_fields=primary_key_fields,
vector_indexes=vector_indexes,
fts_indexes=fts_indexes,
)
self._flow_builder_state.engine_flow_builder.export(
target_name,
Expand Down
16 changes: 15 additions & 1 deletion python/cocoindex/index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
from dataclasses import dataclass
from typing import Sequence, Union
from typing import Sequence, Union, Any


class VectorSimilarityMetric(Enum):
Expand Down Expand Up @@ -40,6 +40,19 @@ class VectorIndexDef:
method: VectorIndexMethod | None = None


@dataclass
class FtsIndexDef:
"""
Define a full-text search index on a field.

The parameters field can contain any keyword arguments supported by the target's
FTS index creation API (e.g., tokenizer_name for LanceDB).
"""

field_name: str
parameters: dict[str, Any] | None = None


@dataclass
class IndexOptions:
"""
Expand All @@ -48,3 +61,4 @@ class IndexOptions:

primary_key_fields: Sequence[str]
vector_indexes: Sequence[VectorIndexDef] = ()
fts_indexes: Sequence[FtsIndexDef] = ()
46 changes: 45 additions & 1 deletion python/cocoindex/targets/lancedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
VectorTypeSchema,
TableType,
)
from ..index import VectorIndexDef, IndexOptions, VectorSimilarityMetric
from ..index import VectorIndexDef, FtsIndexDef, IndexOptions, VectorSimilarityMetric

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,11 +48,19 @@ class _VectorIndex:
metric: VectorSimilarityMetric


@dataclasses.dataclass
class _FtsIndex:
name: str
field_name: str
parameters: dict[str, Any] | None = None


@dataclasses.dataclass
class _State:
key_field_schema: FieldSchema
value_fields_schema: list[FieldSchema]
vector_indexes: list[_VectorIndex] | None = None
fts_indexes: list[_FtsIndex] | None = None
db_options: DatabaseOptions | None = None


Expand Down Expand Up @@ -318,6 +326,18 @@ def get_setup_state(
if index_options.vector_indexes is not None
else None
),
fts_indexes=(
[
_FtsIndex(
name=f"__{index.field_name}__fts__idx",
field_name=index.field_name,
parameters=index.parameters,
)
for index in index_options.fts_indexes
]
if index_options.fts_indexes is not None
else None
),
)

@staticmethod
Expand Down Expand Up @@ -412,6 +432,30 @@ async def apply_setup_change(
if vector_index_name in existing_vector_indexes:
await table.drop_index(vector_index_name)

# Handle FTS indexes
unseen_prev_fts_indexes = {
index.name for index in (previous and previous.fts_indexes) or []
}
existing_fts_indexes = {index.name for index in await table.list_indices()}

for fts_index in current.fts_indexes or []:
if fts_index.name in unseen_prev_fts_indexes:
unseen_prev_fts_indexes.remove(fts_index.name)
else:
try:
# Create FTS index using create_fts_index() API
# Pass parameters as kwargs to support any future FTS index options
kwargs = fts_index.parameters if fts_index.parameters else {}
await table.create_fts_index(fts_index.field_name, **kwargs)
except Exception as e: # pylint: disable=broad-exception-caught
raise RuntimeError(
f"Exception in creating FTS index on field {fts_index.field_name}: {e}"
) from e

for fts_index_name in unseen_prev_fts_indexes:
if fts_index_name in existing_fts_indexes:
await table.drop_index(fts_index_name)

@staticmethod
async def prepare(
spec: LanceDB,
Expand Down
32 changes: 31 additions & 1 deletion rust/cocoindex/src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,33 @@ impl fmt::Display for VectorIndexDef {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FtsIndexDef {
pub field_name: FieldName,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
}

impl fmt::Display for FtsIndexDef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.parameters {
None => write!(f, "{}", self.field_name),
Some(params) => {
let params_str = serde_json::to_string(params).unwrap_or_else(|_| "{}".to_string());
write!(f, "{}:{}", self.field_name, params_str)
}
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexOptions {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub primary_key_fields: Option<Vec<FieldName>>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub vector_indexes: Vec<VectorIndexDef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub fts_indexes: Vec<FtsIndexDef>,
}

impl IndexOptions {
Expand All @@ -490,7 +511,16 @@ impl fmt::Display for IndexOptions {
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(",");
write!(f, "keys={primary_keys}, indexes={vector_indexes}")
let fts_indexes = self
.fts_indexes
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",");
write!(
f,
"keys={primary_keys}, vector_indexes={vector_indexes}, fts_indexes={fts_indexes}"
)
}
}

Expand Down
3 changes: 3 additions & 0 deletions rust/cocoindex/src/ops/targets/kuzu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,9 @@ impl TargetFactoryBase for Factory {
if !data_coll.index_options.vector_indexes.is_empty() {
api_bail!("Vector indexes are not supported for Kuzu yet");
}
if !data_coll.index_options.fts_indexes.is_empty() {
api_bail!("FTS indexes are not supported for Kuzu target");
}
fn to_dep_table(
field_mapping: &AnalyzedGraphElementFieldMapping,
) -> Result<ReferencedNodeTable> {
Expand Down
3 changes: 3 additions & 0 deletions rust/cocoindex/src/ops/targets/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,9 @@ impl SetupState {
.iter()
.map(|f| (f.name.as_str(), &f.value_type.typ))
.collect::<HashMap<_, _>>();
if !index_options.fts_indexes.is_empty() {
api_bail!("FTS indexes are not supported for Neo4j target");
}
for index_def in index_options.vector_indexes.iter() {
sub_components.push(ComponentState {
object_label: schema.elem_type.clone(),
Expand Down
3 changes: 3 additions & 0 deletions rust/cocoindex/src/ops/targets/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ impl SetupState {
index_options: &IndexOptions,
column_options: &HashMap<String, ColumnOptions>,
) -> Result<Self> {
if !index_options.fts_indexes.is_empty() {
api_bail!("FTS indexes are not supported for Postgres target");
}
Ok(Self {
columns: TableColumnsSchema {
key_columns: key_fields_schema
Expand Down
3 changes: 3 additions & 0 deletions rust/cocoindex/src/ops/targets/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,9 @@ impl TargetFactoryBase for Factory {
});
}

if !d.index_options.fts_indexes.is_empty() {
api_bail!("FTS indexes are not supported for Qdrant target");
}
let mut specified_vector_fields = HashSet::new();
for vector_index in d.index_options.vector_indexes {
match vector_def.get_mut(&vector_index.field_name) {
Expand Down
Loading